Commit 0f980f31 authored by Julien Muchembled

Merge "client: limit memory usage when committing big transactions"

parents 4456b532 24e83358
...@@ -516,20 +516,19 @@ class Application(object): ...@@ -516,20 +516,19 @@ class Application(object):
assert data_serial is None assert data_serial is None
compression = self.compress compression = self.compress
compressed_data = data compressed_data = data
size = len(data)
if self.compress: if self.compress:
compressed_data = compress(data) compressed_data = compress(data)
if len(compressed_data) > len(data): if size < len(compressed_data):
compressed_data = data compressed_data = data
compression = 0 compression = 0
else: else:
compression = 1 compression = 1
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
txn_context['data_size'] += size
on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid) on_timeout = OnTimeout(self.onStoreTimeout, txn_context, oid)
# Store object in tmp cache # Store object in tmp cache
data_dict = txn_context['data_dict'] txn_context['data_dict'][oid] = data
if oid not in data_dict:
txn_context['data_list'].append(oid)
data_dict[oid] = data
# Store data on each node # Store data on each node
txn_context['object_stored_counter_dict'][oid] = {} txn_context['object_stored_counter_dict'][oid] = {}
object_base_serial_dict = txn_context['object_base_serial_dict'] object_base_serial_dict = txn_context['object_base_serial_dict']
...@@ -550,6 +549,8 @@ class Application(object): ...@@ -550,6 +549,8 @@ class Application(object):
if not involved_nodes: if not involved_nodes:
raise NEOStorageError("Store failed") raise NEOStorageError("Store failed")
while txn_context['data_size'] >= self._cache._max_size:
self._waitAnyTransactionMessage(txn_context)
self._waitAnyTransactionMessage(txn_context, False) self._waitAnyTransactionMessage(txn_context, False)
def onStoreTimeout(self, conn, msg_id, txn_context, oid): def onStoreTimeout(self, conn, msg_id, txn_context, oid):
...@@ -577,12 +578,18 @@ class Application(object): ...@@ -577,12 +578,18 @@ class Application(object):
for oid, conflict_serial_set in conflict_serial_dict.iteritems(): for oid, conflict_serial_set in conflict_serial_dict.iteritems():
conflict_serial = max(conflict_serial_set) conflict_serial = max(conflict_serial_set)
serial = object_serial_dict[oid] serial = object_serial_dict[oid]
data = data_dict[oid]
if ZERO_TID in conflict_serial_set: if ZERO_TID in conflict_serial_set:
if 1: if 1:
# XXX: disable deadlock avoidance code until it is fixed # XXX: disable deadlock avoidance code until it is fixed
neo.lib.logging.info('Deadlock avoidance on %r:%r', neo.lib.logging.info('Deadlock avoidance on %r:%r',
dump(oid), dump(serial)) dump(oid), dump(serial))
# 'data' parameter of ConflictError is only used to report the
# class of the object. It doesn't matter if 'data' is None
# because the transaction is too big.
try:
data = data_dict[oid]
except KeyError:
data = txn_context['cache_dict'][oid]
else: else:
# Storage refused us from taking object lock, to avoid a # Storage refused us from taking object lock, to avoid a
# possible deadlock. TID is actually used for some kind of # possible deadlock. TID is actually used for some kind of
...@@ -594,6 +601,7 @@ class Application(object): ...@@ -594,6 +601,7 @@ class Application(object):
# them), and requeue our already-sent store requests. # them), and requeue our already-sent store requests.
# XXX: currently, brute-force is implemented: we send # XXX: currently, brute-force is implemented: we send
# object data again. # object data again.
# WARNING: not maintained code
neo.lib.logging.info('Deadlock avoidance triggered on %r:%r', neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
dump(oid), dump(serial)) dump(oid), dump(serial))
for store_oid, store_data in data_dict.iteritems(): for store_oid, store_data in data_dict.iteritems():
...@@ -613,7 +621,13 @@ class Application(object): ...@@ -613,7 +621,13 @@ class Application(object):
store_data, unlock=True) store_data, unlock=True)
else: else:
continue continue
elif data is not CHECKED_SERIAL: else:
data = data_dict.pop(oid)
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial,
serial))
if data: # XXX: can 'data' be None ???
txn_context['data_size'] -= len(data)
resolved_serial_set = resolved_conflict_serial_dict.setdefault( resolved_serial_set = resolved_conflict_serial_dict.setdefault(
oid, set()) oid, set())
if resolved_serial_set and conflict_serial <= max( if resolved_serial_set and conflict_serial <= max(
...@@ -639,12 +653,6 @@ class Application(object): ...@@ -639,12 +653,6 @@ class Application(object):
neo.lib.logging.info('Conflict resolution failed for ' \ neo.lib.logging.info('Conflict resolution failed for ' \
'%r:%r with %r', dump(oid), dump(serial), '%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial)) dump(conflict_serial))
# XXX: Is it really required to remove from data_dict ?
del data_dict[oid]
txn_context['data_list'].remove(oid)
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, serials=(conflict_serial,
serial))
raise ConflictError(oid=oid, serials=(txn_context['ttid'], raise ConflictError(oid=oid, serials=(txn_context['ttid'],
serial), data=data) serial), data=data)
return result return result
...@@ -702,9 +710,10 @@ class Application(object): ...@@ -702,9 +710,10 @@ class Application(object):
ttid = txn_context['ttid'] ttid = txn_context['ttid']
# Store data on each node # Store data on each node
txn_stored_counter = 0 txn_stored_counter = 0
assert not txn_context['data_dict'], txn_context
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension), str(transaction.description), dumps(transaction._extension),
txn_context['data_list']) txn_context['cache_dict'])
add_involved_nodes = txn_context['involved_nodes'].add add_involved_nodes = txn_context['involved_nodes'].add
for node, conn in self.cp.iterateForObject(ttid, writable=True): for node, conn in self.cp.iterateForObject(ttid, writable=True):
neo.lib.logging.debug("voting object %s on %s", dump(ttid), neo.lib.logging.debug("voting object %s on %s", dump(ttid),
...@@ -773,9 +782,9 @@ class Application(object): ...@@ -773,9 +782,9 @@ class Application(object):
self._load_lock_acquire() self._load_lock_acquire()
try: try:
# Call finish on master # Call finish on master
oid_list = txn_context['data_list'] cache_dict = txn_context['cache_dict']
p = Packets.AskFinishTransaction(txn_context['ttid'], oid_list) tid = self._askPrimary(Packets.AskFinishTransaction(
tid = self._askPrimary(p) txn_context['ttid'], cache_dict))
# Call function given by ZODB # Call function given by ZODB
if f is not None: if f is not None:
...@@ -785,7 +794,7 @@ class Application(object): ...@@ -785,7 +794,7 @@ class Application(object):
self._cache_lock_acquire() self._cache_lock_acquire()
try: try:
cache = self._cache cache = self._cache
for oid, data in txn_context['data_dict'].iteritems(): for oid, data in cache_dict.iteritems():
if data is CHECKED_SERIAL: if data is CHECKED_SERIAL:
# this is just a remnant of # this is just a remnant of
# checkCurrentSerialInTransaction call, ignore (no data # checkCurrentSerialInTransaction call, ignore (no data
...@@ -1082,11 +1091,10 @@ class Application(object): ...@@ -1082,11 +1091,10 @@ class Application(object):
# Placeholders # Placeholders
queue = txn_context['queue'] queue = txn_context['queue']
txn_context['object_stored_counter_dict'][oid] = {} txn_context['object_stored_counter_dict'][oid] = {}
data_dict = txn_context['data_dict'] # ZODB.Connection performs calls 'checkCurrentSerialInTransaction'
if oid not in data_dict: # after stores, and skips oids that have been successfully stored.
# Marker value so we don't try to resolve conflicts. assert oid not in txn_context['cache_dict'], (oid, txn_context)
data_dict[oid] = CHECKED_SERIAL txn_context['data_dict'].setdefault(oid, CHECKED_SERIAL)
txn_context['data_list'].append(oid)
packet = Packets.AskCheckCurrentSerial(ttid, serial, oid) packet = Packets.AskCheckCurrentSerial(ttid, serial, oid)
for node, conn in self.cp.iterateForObject(oid, writable=True): for node, conn in self.cp.iterateForObject(oid, writable=True):
try: try:
......
...@@ -70,7 +70,9 @@ class TransactionContainer(ContainerBase): ...@@ -70,7 +70,9 @@ class TransactionContainer(ContainerBase):
'txn': txn, 'txn': txn,
'ttid': None, 'ttid': None,
'data_dict': {}, 'data_dict': {},
'data_list': [], 'data_size': 0,
'cache_dict': {},
'cache_size': 0,
'object_base_serial_dict': {}, 'object_base_serial_dict': {},
'object_serial_dict': {}, 'object_serial_dict': {},
'object_stored_counter_dict': {}, 'object_stored_counter_dict': {},
......
...@@ -91,7 +91,28 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -91,7 +91,28 @@ class StorageAnswersHandler(AnswerBaseHandler):
conflict_serial_dict = txn_context['conflict_serial_dict'] conflict_serial_dict = txn_context['conflict_serial_dict']
conflict_serial_dict.setdefault(oid, set()).add(serial) conflict_serial_dict.setdefault(oid, set()).add(serial)
else: else:
uuid_set = object_stored_counter_dict.setdefault(serial, set()) uuid_set = object_stored_counter_dict.get(serial)
if uuid_set is None: # store to first storage node
object_stored_counter_dict[serial] = uuid_set = set()
try:
data = txn_context['data_dict'].pop(oid)
except KeyError: # multiple undo
assert txn_context['cache_dict'][oid] is None, oid
else:
if type(data) is str:
size = len(data)
txn_context['data_size'] -= size
size += txn_context['cache_size']
if size < self.app._cache._max_size:
txn_context['cache_size'] = size
else:
# Do not cache data past cache max size, as it
# would just flush it on tpc_finish. This also
# prevents memory errors for big transactions.
data = None
txn_context['cache_dict'][oid] = data
else: # replica
assert oid not in txn_context['data_dict'], oid
uuid_set.add(conn.getUUID()) uuid_set.add(conn.getUUID())
answerCheckCurrentSerial = answerStoreObject answerCheckCurrentSerial = answerStoreObject
......
...@@ -330,7 +330,6 @@ class PList(PStructItem): ...@@ -330,7 +330,6 @@ class PList(PStructItem):
self._item = item self._item = item
def _encode(self, writer, items): def _encode(self, writer, items):
assert isinstance(items, (list, tuple, set)), (type(items), items)
writer(self.pack(len(items))) writer(self.pack(len(items)))
item = self._item item = self._item
for value in items: for value in items:
......
...@@ -314,7 +314,6 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -314,7 +314,6 @@ class ClientApplicationTests(NeoUnitTestBase):
app.nm.createStorage(address=storage_address) app.nm.createStorage(address=storage_address)
data_dict = txn_context['data_dict'] data_dict = txn_context['data_dict']
data_dict[oid] = 'BEFORE' data_dict[oid] = 'BEFORE'
txn_context['data_list'].append(oid)
app.store(oid, tid, '', None, txn) app.store(oid, tid, '', None, txn)
txn_context['queue'].put((conn, packet, {})) txn_context['queue'].put((conn, packet, {}))
self.assertRaises(ConflictError, app.waitStoreResponses, txn_context, self.assertRaises(ConflictError, app.waitStoreResponses, txn_context,
...@@ -346,7 +345,8 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -346,7 +345,8 @@ class ClientApplicationTests(NeoUnitTestBase):
app.waitStoreResponses(txn_context, resolving_tryToResolveConflict) app.waitStoreResponses(txn_context, resolving_tryToResolveConflict)
self.assertEqual(txn_context['object_stored_counter_dict'][oid], self.assertEqual(txn_context['object_stored_counter_dict'][oid],
{tid: set([uuid])}) {tid: set([uuid])})
self.assertEqual(txn_context['data_dict'].get(oid, None), 'DATA') self.assertEqual(txn_context['cache_dict'][oid], 'DATA')
self.assertFalse(oid in txn_context['data_dict'])
self.assertFalse(oid in txn_context['conflict_serial_dict']) self.assertFalse(oid in txn_context['conflict_serial_dict'])
def test_tpc_vote1(self): def test_tpc_vote1(self):
......
...@@ -174,9 +174,10 @@ class StorageAnswerHandlerTests(NeoUnitTestBase): ...@@ -174,9 +174,10 @@ class StorageAnswerHandlerTests(NeoUnitTestBase):
object_stored_counter_dict = {oid: {}} object_stored_counter_dict = {oid: {}}
conflict_serial_dict = {} conflict_serial_dict = {}
resolved_conflict_serial_dict = {} resolved_conflict_serial_dict = {}
self._getAnswerStoreObjectHandler(object_stored_counter_dict, h = self._getAnswerStoreObjectHandler(object_stored_counter_dict,
conflict_serial_dict, resolved_conflict_serial_dict, conflict_serial_dict, resolved_conflict_serial_dict)
).answerStoreObject(conn, 0, oid, tid) h.app.getHandlerData()['cache_dict'] = {oid: None}
h.answerStoreObject(conn, 0, oid, tid)
self.assertFalse(oid in conflict_serial_dict) self.assertFalse(oid in conflict_serial_dict)
self.assertFalse(oid in resolved_conflict_serial_dict) self.assertFalse(oid in resolved_conflict_serial_dict)
self.assertEqual(object_stored_counter_dict[oid], {tid: set([uuid])}) self.assertEqual(object_stored_counter_dict[oid], {tid: set([uuid])})
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment