Commit aeb8549b authored by Julien Muchembled's avatar Julien Muchembled

client: simplify handling of conflicts

parent 8bf149d0
...@@ -80,14 +80,12 @@ class TransactionContainer(dict): ...@@ -80,14 +80,12 @@ class TransactionContainer(dict):
# data stored: this will go to the cache on tpc_finish # data stored: this will go to the cache on tpc_finish
'cache_dict': {}, 'cache_dict': {},
'cache_size': 0, 'cache_size': 0,
# serial being stored
'object_serial_dict': {}, # {oid: serial}
# track successful stores/checks # track successful stores/checks
'object_stored_counter_dict': {}, # {oid: {serial: {storage_id}}} 'object_stored_counter_dict': {}, # {oid: {serial: {storage_id}}}
# conflicts to resolve # conflicts to resolve
'conflict_serial_dict': {}, # {oid: {serial}} 'conflict_dict': {}, # {oid: (base_serial, serial)}
# resolved conflicts # resolved conflicts
'resolved_conflict_serial_dict': {}, # {oid: {serial}} 'resolved_dict': {}, # {oid: serial}
# nodes with at least 1 store (object or transaction) # nodes with at least 1 store (object or transaction)
'involved_nodes': set(), # {node} 'involved_nodes': set(), # {node}
# nodes with at least 1 check # nodes with at least 1 check
...@@ -441,7 +439,6 @@ class Application(ThreadedApplication): ...@@ -441,7 +439,6 @@ class Application(ThreadedApplication):
txn_context['data_dict'][oid] = data txn_context['data_dict'][oid] = data
# Store data on each node # Store data on each node
txn_context['object_stored_counter_dict'][oid] = {} txn_context['object_stored_counter_dict'][oid] = {}
txn_context['object_serial_dict'][oid] = serial
queue = txn_context['queue'] queue = txn_context['queue']
involved_nodes = txn_context['involved_nodes'] involved_nodes = txn_context['involved_nodes']
add_involved_nodes = involved_nodes.add add_involved_nodes = involved_nodes.add
...@@ -461,19 +458,17 @@ class Application(ThreadedApplication): ...@@ -461,19 +458,17 @@ class Application(ThreadedApplication):
self._waitAnyTransactionMessage(txn_context, False) self._waitAnyTransactionMessage(txn_context, False)
def _handleConflicts(self, txn_context, tryToResolveConflict): def _handleConflicts(self, txn_context, tryToResolveConflict):
result = []
append = result.append
# Check for conflicts
data_dict = txn_context['data_dict'] data_dict = txn_context['data_dict']
object_serial_dict = txn_context['object_serial_dict'] pop_conflict = txn_context['conflict_dict'].popitem
conflict_serial_dict = txn_context['conflict_serial_dict'].copy() resolved_dict = txn_context['resolved_dict']
txn_context['conflict_serial_dict'].clear() while 1:
resolved_conflict_serial_dict = txn_context[ # We iterate over conflict_dict, and clear it,
'resolved_conflict_serial_dict'] # because new items may be added by calls to _store.
for oid, conflict_serial_set in conflict_serial_dict.iteritems(): try:
conflict_serial = max(conflict_serial_set) oid, (serial, conflict_serial) = pop_conflict()
serial = object_serial_dict[oid] except KeyError:
if ZERO_TID in conflict_serial_set: return
if conflict_serial == ZERO_TID:
if 1: if 1:
# XXX: disable deadlock avoidance code until it is fixed # XXX: disable deadlock avoidance code until it is fixed
logging.info('Deadlock avoidance on %r:%r', logging.info('Deadlock avoidance on %r:%r',
...@@ -484,6 +479,7 @@ class Application(ThreadedApplication): ...@@ -484,6 +479,7 @@ class Application(ThreadedApplication):
try: try:
data = data_dict[oid] data = data_dict[oid]
except KeyError: except KeyError:
# succesfully stored on another storage node
data = txn_context['cache_dict'][oid] data = txn_context['cache_dict'][oid]
else: else:
# Storage refused us from taking object lock, to avoid a # Storage refused us from taking object lock, to avoid a
...@@ -505,13 +501,6 @@ class Application(ThreadedApplication): ...@@ -505,13 +501,6 @@ class Application(ThreadedApplication):
# TODO: data can be None if a conflict happens during undo # TODO: data can be None if a conflict happens during undo
if data: if data:
txn_context['data_size'] -= len(data) txn_context['data_size'] -= len(data)
resolved_serial_set = resolved_conflict_serial_dict.setdefault(
oid, set())
if resolved_serial_set and conflict_serial <= max(
resolved_serial_set):
# A later serial has already been resolved, skip.
resolved_serial_set.update(conflict_serial_set)
continue
if self.last_tid < conflict_serial: if self.last_tid < conflict_serial:
self.sync() # possible late invalidation (very rare) self.sync() # possible late invalidation (very rare)
try: try:
...@@ -526,16 +515,14 @@ class Application(ThreadedApplication): ...@@ -526,16 +515,14 @@ class Application(ThreadedApplication):
'%r:%r with %r', dump(oid), dump(serial), '%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial)) dump(conflict_serial))
# Mark this conflict as resolved # Mark this conflict as resolved
resolved_serial_set.update(conflict_serial_set) resolved_dict[oid] = conflict_serial
# Try to store again # Try to store again
self._store(txn_context, oid, conflict_serial, new_data) self._store(txn_context, oid, conflict_serial, new_data)
append(oid)
continue continue
# With recent ZODB, get_pickle_metadata (from ZODB.utils) does # With recent ZODB, get_pickle_metadata (from ZODB.utils) does
# not support empty values, so do not pass 'data' in this case. # not support empty values, so do not pass 'data' in this case.
raise ConflictError(oid=oid, serials=(conflict_serial, raise ConflictError(oid=oid, serials=(conflict_serial,
serial), data=data or None) serial), data=data or None)
return result
def waitResponses(self, queue): def waitResponses(self, queue):
"""Wait for all requests to be answered (or their connection to be """Wait for all requests to be answered (or their connection to be
...@@ -546,24 +533,17 @@ class Application(ThreadedApplication): ...@@ -546,24 +533,17 @@ class Application(ThreadedApplication):
_waitAnyMessage(queue) _waitAnyMessage(queue)
def waitStoreResponses(self, txn_context, tryToResolveConflict): def waitStoreResponses(self, txn_context, tryToResolveConflict):
result = []
append = result.append
resolved_oid_set = set()
update = resolved_oid_set.update
_handleConflicts = self._handleConflicts _handleConflicts = self._handleConflicts
queue = txn_context['queue'] queue = txn_context['queue']
conflict_serial_dict = txn_context['conflict_serial_dict'] conflict_dict = txn_context['conflict_dict']
pending = self.dispatcher.pending pending = self.dispatcher.pending
_waitAnyTransactionMessage = self._waitAnyTransactionMessage _waitAnyTransactionMessage = self._waitAnyTransactionMessage
while pending(queue) or conflict_serial_dict: while pending(queue) or conflict_dict:
# Note: handler data can be overwritten by _handleConflicts # Note: handler data can be overwritten by _handleConflicts
# so we must set it for each iteration. # so we must set it for each iteration.
_waitAnyTransactionMessage(txn_context) _waitAnyTransactionMessage(txn_context)
if conflict_serial_dict: if conflict_dict:
conflicts = _handleConflicts(txn_context, _handleConflicts(txn_context, tryToResolveConflict)
tryToResolveConflict)
if conflicts:
update(conflicts)
# Check for never-stored objects, and update result for all others # Check for never-stored objects, and update result for all others
for oid, store_dict in \ for oid, store_dict in \
...@@ -571,9 +551,10 @@ class Application(ThreadedApplication): ...@@ -571,9 +551,10 @@ class Application(ThreadedApplication):
if not store_dict: if not store_dict:
logging.error('tpc_store failed') logging.error('tpc_store failed')
raise NEOStorageError('tpc_store failed') raise NEOStorageError('tpc_store failed')
elif oid in resolved_oid_set: if OLD_ZODB:
append((oid, ResolvedSerial) if OLD_ZODB else oid) return [(oid, ResolvedSerial)
return result for oid in txn_context['resolved_dict']]
return txn_context['resolved_dict']
def tpc_vote(self, transaction, tryToResolveConflict): def tpc_vote(self, transaction, tryToResolveConflict):
"""Store current transaction.""" """Store current transaction."""
...@@ -950,7 +931,6 @@ class Application(ThreadedApplication): ...@@ -950,7 +931,6 @@ class Application(ThreadedApplication):
def _checkCurrentSerialInTransaction(self, txn_context, oid, serial): def _checkCurrentSerialInTransaction(self, txn_context, oid, serial):
ttid = txn_context['ttid'] ttid = txn_context['ttid']
txn_context['object_serial_dict'][oid] = serial
# Placeholders # Placeholders
queue = txn_context['queue'] queue = txn_context['queue']
txn_context['object_stored_counter_dict'][oid] = {} txn_context['object_stored_counter_dict'][oid] = {}
......
...@@ -72,17 +72,17 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -72,17 +72,17 @@ class StorageAnswersHandler(AnswerBaseHandler):
# receive the conflict answer from the first store on S2. # receive the conflict answer from the first store on S2.
logging.info('%r report a conflict for %r with %r', logging.info('%r report a conflict for %r with %r',
conn, dump(oid), dump(conflict)) conn, dump(oid), dump(conflict))
# If this conflict is not already resolved, mark it for if conflict != ZERO_TID:
# resolution. # If this conflict is not already resolved, mark it for
if conflict not in txn_context[ # resolution.
'resolved_conflict_serial_dict'].get(oid, ()): if conflict <= txn_context['resolved_dict'].get(oid, ZERO_TID):
if ZERO_TID != conflict in object_stored_counter_dict: return
if conflict in object_stored_counter_dict:
raise NEOStorageError('Storages %s accepted object %s' raise NEOStorageError('Storages %s accepted object %s'
' for serial %s but %s reports a conflict for it.' % ( ' for serial %s but %s reports a conflict for it.' % (
map(dump, object_stored_counter_dict[conflict]), map(dump, object_stored_counter_dict[conflict]),
dump(oid), dump(conflict), dump(conn.getUUID()))) dump(oid), dump(conflict), dump(conn.getUUID())))
conflict_serial_dict = txn_context['conflict_serial_dict'] txn_context['conflict_dict'][oid] = serial, conflict
conflict_serial_dict.setdefault(oid, set()).add(conflict)
else: else:
uuid_set = object_stored_counter_dict.get(serial) uuid_set = object_stored_counter_dict.get(serial)
if uuid_set is None: # store to first storage node if uuid_set is None: # store to first storage node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment