Commit 24e83358 authored by Julien Muchembled's avatar Julien Muchembled

client: limit memory usage when committing big transactions

parent 7719e87b
......@@ -515,20 +515,19 @@ class Application(object):
assert data_serial is None
compression = self.compress
compressed_data = data
size = len(data)
if self.compress:
compressed_data = compress(data)
if len(compressed_data) > len(data):
if size < len(compressed_data):
compressed_data = data
compression = 0
else:
compression = 1
checksum = makeChecksum(compressed_data)
txn_context['data_size'] += size
on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
# Store object in tmp cache
data_dict = txn_context['data_dict']
if oid not in data_dict:
txn_context['data_list'].append(oid)
data_dict[oid] = data
txn_context['data_dict'][oid] = data
# Store data on each node
txn_context['object_stored_counter_dict'][oid] = {}
object_base_serial_dict = txn_context['object_base_serial_dict']
......@@ -549,6 +548,8 @@ class Application(object):
if not involved_nodes:
raise NEOStorageError("Store failed")
while txn_context['data_size'] >= self._cache._max_size:
self._waitAnyTransactionMessage(txn_context)
self._waitAnyTransactionMessage(txn_context, False)
def onStoreTimeout(self, conn, msg_id, txn_context, oid):
......@@ -576,12 +577,18 @@ class Application(object):
for oid, conflict_serial_set in conflict_serial_dict.iteritems():
conflict_serial = max(conflict_serial_set)
serial = object_serial_dict[oid]
data = data_dict[oid]
if ZERO_TID in conflict_serial_set:
if 1:
# XXX: disable deadlock avoidance code until it is fixed
neo.lib.logging.info('Deadlock avoidance on %r:%r',
dump(oid), dump(serial))
# 'data' parameter of ConflictError is only used to report the
# class of the object. It doesn't matter if 'data' is None
# because the transaction is too big.
try:
data = data_dict[oid]
except KeyError:
data = txn_context['cache_dict'][oid]
else:
# Storage refused us from taking object lock, to avoid a
# possible deadlock. TID is actually used for some kind of
......@@ -593,6 +600,7 @@ class Application(object):
# them), and requeue our already-sent store requests.
# XXX: currently, brute-force is implemented: we send
# object data again.
# WARNING: not maintained code
neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
dump(oid), dump(serial))
for store_oid, store_data in data_dict.iteritems():
......@@ -612,7 +620,13 @@ class Application(object):
store_data, unlock=True)
else:
continue
elif data is not CHECKED_SERIAL:
else:
data = data_dict.pop(oid)
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial,
serial))
if data: # XXX: can 'data' be None ???
txn_context['data_size'] -= len(data)
resolved_serial_set = resolved_conflict_serial_dict.setdefault(
oid, set())
if resolved_serial_set and conflict_serial <= max(
......@@ -638,12 +652,6 @@ class Application(object):
neo.lib.logging.info('Conflict resolution failed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
# XXX: Is it really required to remove from data_dict ?
del data_dict[oid]
txn_context['data_list'].remove(oid)
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial,
serial))
raise ConflictError(oid=oid, serials=(txn_context['ttid'],
serial), data=data)
return result
......@@ -702,9 +710,10 @@ class Application(object):
ttid = txn_context['ttid']
# Store data on each node
txn_stored_counter = 0
assert not txn_context['data_dict'], txn_context
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
txn_context['data_list'])
txn_context['cache_dict'])
add_involved_nodes = txn_context['involved_nodes'].add
for node, conn in self.cp.iterateForObject(ttid, writable=True):
neo.lib.logging.debug("voting object %s on %s", dump(ttid),
......@@ -773,9 +782,9 @@ class Application(object):
self._load_lock_acquire()
try:
# Call finish on master
oid_list = txn_context['data_list']
p = Packets.AskFinishTransaction(txn_context['ttid'], oid_list)
tid = self._askPrimary(p)
cache_dict = txn_context['cache_dict']
tid = self._askPrimary(Packets.AskFinishTransaction(
txn_context['ttid'], cache_dict))
# Call function given by ZODB
if f is not None:
......@@ -785,7 +794,7 @@ class Application(object):
self._cache_lock_acquire()
try:
cache = self._cache
for oid, data in txn_context['data_dict'].iteritems():
for oid, data in cache_dict.iteritems():
if data is CHECKED_SERIAL:
# this is just a remain of
# checkCurrentSerialInTransaction call, ignore (no data
......@@ -1088,11 +1097,10 @@ class Application(object):
# Placeholders
queue = txn_context['queue']
txn_context['object_stored_counter_dict'][oid] = {}
data_dict = txn_context['data_dict']
if oid not in data_dict:
# Marker value so we don't try to resolve conflicts.
data_dict[oid] = CHECKED_SERIAL
txn_context['data_list'].append(oid)
# ZODB.Connection performs calls 'checkCurrentSerialInTransaction'
# after stores, and skips oids that have been succeessfully stored.
assert oid not in txn_context['cache_dict'], (oid, txn_context)
txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
for node, conn in self.cp.iterateForObject(oid, writable=True):
try:
......
......@@ -70,7 +70,9 @@ class TransactionContainer(ContainerBase):
'txn': txn,
'ttid': None,
'data_dict': {},
'data_list': [],
'data_size': 0,
'cache_dict': {},
'cache_size': 0,
'object_base_serial_dict': {},
'object_serial_dict': {},
'object_stored_counter_dict': {},
......
......@@ -92,7 +92,28 @@ class StorageAnswersHandler(AnswerBaseHandler):
conflict_serial_dict = txn_context['conflict_serial_dict']
conflict_serial_dict.setdefault(oid, set()).add(serial)
else:
uuid_set = object_stored_counter_dict.setdefault(serial, set())
uuid_set = object_stored_counter_dict.get(serial)
if uuid_set is None: # store to first storage node
object_stored_counter_dict[serial] = uuid_set = set()
try:
data = txn_context['data_dict'].pop(oid)
except KeyError: # multiple undo
assert txn_context['cache_dict'][oid] is None, oid
else:
if type(data) is str:
size = len(data)
txn_context['data_size'] -= size
size += txn_context['cache_size']
if size < self.app._cache._max_size:
txn_context['cache_size'] = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
txn_context['cache_dict'][oid] = data
else: # replica
assert oid not in txn_context['data_dict'], oid
uuid_set.add(conn.getUUID())
answerCheckCurrentSerial = answerStoreObject
......
......@@ -330,7 +330,6 @@ class PList(PStructItem):
self._item = item
def _encode(self, writer, items):
assert isinstance(items, (list, tuple, set)), (type(items), items)
writer(self.pack(len(items)))
item = self._item
for value in items:
......
......@@ -313,7 +313,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.nm.createStorage(address=storage_address)
data_dict = txn_context['data_dict']
data_dict[oid] = 'BEFORE'
txn_context['data_list'].append(oid)
app.store(oid, tid, '', None, txn)
txn_context['queue'].put((conn, packet))
self.assertRaises(ConflictError, app.waitStoreResponses, txn_context,
......@@ -348,7 +347,8 @@ class ClientApplicationTests(NeoUnitTestBase):
app.waitStoreResponses(txn_context, resolving_tryToResolveConflict)
self.assertEqual(txn_context['object_stored_counter_dict'][oid],
{tid: set([uuid])})
self.assertEqual(txn_context['data_dict'].get(oid, None), 'DATA')
self.assertEqual(txn_context['cache_dict'][oid], 'DATA')
self.assertFalse(oid in txn_context['data_dict'])
self.assertFalse(oid in txn_context['conflict_serial_dict'])
def test_tpc_vote1(self):
......
......@@ -175,9 +175,10 @@ class StorageAnswerHandlerTests(NeoUnitTestBase):
object_stored_counter_dict = {oid: {}}
conflict_serial_dict = {}
resolved_conflict_serial_dict = {}
self._getAnswerStoreObjectHandler(object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict,
).answerStoreObject(conn, 0, oid, tid)
h = self._getAnswerStoreObjectHandler(object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict)
h.app.getHandlerData()['cache_dict'] = {oid: None}
h.answerStoreObject(conn, 0, oid, tid)
self.assertFalse(oid in conflict_serial_dict)
self.assertFalse(oid in resolved_conflict_serial_dict)
self.assertEqual(object_stored_counter_dict[oid], {tid: set([uuid])})
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment