Commit 0e133ebb authored by Julien Muchembled's avatar Julien Muchembled

client: do not loop forever if conflicts happen on a big amount of data

parent aeb8549b
......@@ -455,6 +455,9 @@ class Application(ThreadedApplication):
while txn_context['data_size'] >= self._cache._max_size:
self._waitAnyTransactionMessage(txn_context)
# Do not loop forever if conflicts happen on a big amount of data.
if not self.dispatcher.pending(queue):
return
self._waitAnyTransactionMessage(txn_context, False)
def _handleConflicts(self, txn_context, tryToResolveConflict):
......
......@@ -66,6 +66,10 @@ class StorageAnswersHandler(AnswerBaseHandler):
object_stored_counter_dict = txn_context[
'object_stored_counter_dict'][oid]
if conflict:
# Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in
# 'conflict_dict') to avoid any deadlock with another thread that
# also resolves a conflict successfully to the same storage nodes.
# Warning: if a storage (S1) is much faster than another (S2), then
# we may process entirely a conflict with S1 (i.e. we received the
# answer to the store of the resolved object on S1) before we
......
......@@ -1362,5 +1362,22 @@ class Test(NEOThreadedTest):
ll()
t.join()
@with_cluster()
def testSameNewOidAndConflictOnBigValue(self, cluster):
storage = cluster.getZODBStorage()
oid = storage.new_oid()
txn = transaction.Transaction()
storage.tpc_begin(txn)
storage.store(oid, None, 'foo', '', txn)
storage.tpc_vote(txn)
storage.tpc_finish(txn)
txn = transaction.Transaction()
storage.tpc_begin(txn)
storage.store(oid, None, '*' * storage._cache._max_size, '', txn)
self.assertRaises(POSException.ConflictError, storage.tpc_vote, txn)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment