Commit 5e7f34d2 authored by Julien Muchembled

New algorithm for deadlock avoidance

The time complexity of the previous one was far too high. With several
tens of concurrent transactions, we saw commits take minutes to
complete and the whole application looked frozen.

This new algorithm is much simpler. Instead of asking the oldest
transaction to partially restart (we used the term "rebase" because the
concept was similar to what git-rebase does), the storage now gives it
priority and asks the newest transaction to relock (this request is
ignored if the vote already happened, which means there was actually
no deadlock).
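
For illustration, a minimal sketch of this priority rule; the locks
mapping (oid -> ttid of the current holder) and the notify_deadlock
callback are hypothetical helpers, not NEO code:

    def on_store(locks, notify_deadlock, oid, ttid):
        """Handle a write-lock request on oid by transaction ttid."""
        holder = locks.get(oid)
        if holder is None:
            # The lock is free: grant it to the requesting transaction.
            locks[oid] = ttid
        elif ttid < holder:
            # An older transaction (smaller ttid) wants a lock held by a
            # newer one: give priority to the older one by asking the
            # newer client to relock. The client ignores this request if
            # it has already voted (no real deadlock in that case).
            notify_deadlock(holder, oid)
        # else: the newer transaction simply waits for the older holder.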

testLocklessWriteDuringConflictResolution was initially more complex
because Transaction.written (client) ignored KeyError (which is not the
case anymore since commit 8ef1ddba).
parent d98b576c
......@@ -524,59 +524,37 @@ class Application(ThreadedApplication):
while 1:
# We iterate over conflict_dict, and clear it,
# because new items may be added by calls to _store.
# This is also done atomically, to avoid race conditions
# with PrimaryNotificationsHandler.notifyDeadlock
try:
oid, serial = pop_conflict()
except KeyError:
return
data, old_serial, _ = data_dict.pop(oid)
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid,
serials=(serial, old_serial))
# TODO: data can be None if a conflict happens during undo
if data:
txn_context.data_size -= len(data)
if self.last_tid < serial:
self.sync() # possible late invalidation (very rare)
try:
data, old_serial, _ = data_dict.pop(oid)
except KeyError:
assert oid is None, (oid, serial)
# Storage refused us from taking object lock, to avoid a
# possible deadlock. TID is actually used for some kind of
# "locking priority": when a higher value has the lock,
# this means we stored objects "too late", and we would
# otherwise cause a deadlock.
# To recover, we must ask storages to release locks we
# hold (to let possibly-competing transactions acquire
# them), and requeue our already-sent store requests.
ttid = txn_context.ttid
logging.info('Deadlock avoidance triggered for TXN %s'
' with new locking TID %s', dump(ttid), dump(serial))
txn_context.locking_tid = serial
packet = Packets.AskRebaseTransaction(ttid, serial)
for uuid in txn_context.conn_dict:
self._askStorageForWrite(txn_context, uuid, packet)
else:
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid,
serials=(serial, old_serial))
# TODO: data can be None if a conflict happens during undo
if data:
txn_context.data_size -= len(data)
if self.last_tid < serial:
self.sync() # possible late invalidation (very rare)
try:
data = tryToResolveConflict(oid, serial, old_serial, data)
except ConflictError:
logging.info(
'Conflict resolution failed for %s@%s with %s',
dump(oid), dump(old_serial), dump(serial))
# With recent ZODB, get_pickle_metadata (from ZODB.utils)
# does not support empty values, so do not pass 'data'
# in this case.
raise ConflictError(oid=oid, serials=(serial, old_serial),
data=data or None)
else:
logging.info(
'Conflict resolution succeeded for %s@%s with %s',
dump(oid), dump(old_serial), dump(serial))
# Mark this conflict as resolved
resolved_dict[oid] = serial
# Try to store again
self._store(txn_context, oid, serial, data)
data = tryToResolveConflict(oid, serial, old_serial, data)
except ConflictError:
logging.info(
'Conflict resolution failed for %s@%s with %s',
dump(oid), dump(old_serial), dump(serial))
# With recent ZODB, get_pickle_metadata (from ZODB.utils)
# does not support empty values, so do not pass 'data'
# in this case.
raise ConflictError(oid=oid, serials=(serial, old_serial),
data=data or None)
logging.info(
'Conflict resolution succeeded for %s@%s with %s',
dump(oid), dump(old_serial), dump(serial))
# Mark this conflict as resolved
resolved_dict[oid] = serial
# Try to store again
self._store(txn_context, oid, serial, data)
def _askStorageForWrite(self, txn_context, uuid, packet):
conn = txn_context.conn_dict[uuid]
......@@ -609,6 +587,7 @@ class Application(ThreadedApplication):
"""Store current transaction."""
txn_context = self._txn_container.get(transaction)
self.waitStoreResponses(txn_context)
txn_context.stored = True
ttid = txn_context.ttid
ext = transaction._extension
ext = dumps(ext, _protocol) if ext else ''
......@@ -640,10 +619,8 @@ class Application(ThreadedApplication):
if not (uuid in failed or uuid in uuid_set):
break
else:
# Very unlikely. Instead of raising, we could recover
# the transaction by doing something similar to
# deadlock avoidance; that would be done before voting.
# But it's not worth the complexity.
# Very unlikely. Trying to recover
# is not worth the complexity.
raise NEOStorageError(
'partition %s not fully write-locked' % offset)
failed = [uuid for uuid, running in failed.iteritems() if running]
......
......@@ -149,13 +149,6 @@ class PrimaryNotificationsHandler(MTEventHandler):
if node and node.isConnected():
node.getConnection().close()
def notifyDeadlock(self, conn, ttid, locking_tid):
for txn_context in self.app.txn_contexts():
if txn_context.ttid == ttid:
txn_context.conflict_dict[None] = locking_tid
txn_context.wakeup(conn)
break
class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """
......
......@@ -28,11 +28,24 @@ from ..transactions import Transaction
from ..exception import NEOStorageError, NEOStorageNotFoundError
from ..exception import NEOStorageReadRetry, NEOStorageDoesNotExistError
@apply
class _DeadlockPacket(object):
handler_method_name = 'notifyDeadlock'
_args = ()
getId = int
class StorageEventHandler(MTEventHandler):
def _acceptIdentification(*args):
pass
def notifyDeadlock(self, conn, ttid, oid):
for txn_context in self.app.txn_contexts():
if txn_context.ttid == ttid:
txn_context.queue.put((conn, _DeadlockPacket, {'oid': oid}))
break
class StorageBootstrapHandler(AnswerBaseHandler):
""" Handler used when connecting to a storage node """
......@@ -72,22 +85,28 @@ class StorageAnswersHandler(AnswerBaseHandler):
answerCheckCurrentSerial = answerStoreObject
def answerRebaseTransaction(self, conn, oid_list):
def notifyDeadlock(self, conn, oid):
# To avoid a possible deadlock, storage nodes are waiting for our
# lock to be cancelled, so that a transaction that started earlier
# can complete. This is done by acquiring the lock again.
txn_context = self.app.getHandlerData()
if txn_context.stored:
return
ttid = txn_context.ttid
queue = txn_context.queue
logging.info('Deadlock avoidance triggered for %s:%s',
dump(oid), dump(ttid))
assert conn.getUUID() not in txn_context.data_dict.get(oid, ((),))[-1]
try:
for oid in oid_list:
# We could have an extra parameter to tell the storage if we
# still have the data, and in this case revert what was done
# in Transaction.written. This would save bandwidth in case of
# conflict.
conn.ask(Packets.AskRebaseObject(ttid, oid),
queue=queue, oid=oid)
# We could have an extra parameter to tell the storage if we
# still have the data, and in this case revert what was done
# in Transaction.written. This would save bandwidth in case of
# conflict.
conn.ask(Packets.AskRelockObject(ttid, oid),
queue=txn_context.queue, oid=oid)
except ConnectionClosed:
txn_context.conn_dict[conn.getUUID()] = None
def answerRebaseObject(self, conn, conflict, oid):
def answerRelockObject(self, conn, conflict, oid):
if conflict:
txn_context = self.app.getHandlerData()
serial, conflict, data = conflict
......@@ -105,7 +124,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
assert oid in txn_context.data_dict
if serial <= txn_context.conflict_dict.get(oid, ''):
# Another node already reported this conflict or a newer,
# by answering to this rebase or to the previous store.
# by answering to this relock or to the previous store.
return
# A node has not answered a previous store yet. Do not wait for
# it to report the conflict because it may fail before.
......@@ -119,8 +138,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
compression, checksum, data = data
if checksum != makeChecksum(data):
raise NEOStorageError(
'wrong checksum while getting back data for'
' object %s during rebase of transaction %s'
'wrong checksum while getting back data for %s:%s'
' (deadlock avoidance)'
% (dump(oid), dump(txn_context.ttid)))
data = decompress_list[compression](data)
size = len(data)
......
......@@ -22,19 +22,12 @@ from neo.lib.protocol import Packets
from neo.lib.util import dump
from .exception import NEOStorageError
@apply
class _WakeupPacket(object):
handler_method_name = 'pong'
_args = ()
getId = int
class Transaction(object):
cache_size = 0 # size of data in cache_dict
data_size = 0 # size of data in data_dict
error = None
locking_tid = None
stored = False
voted = False
ttid = None # XXX: useless, except for testBackupReadOnlyAccess
lockless_dict = None # {partition: {uuid}}
......@@ -55,16 +48,13 @@ class Transaction(object):
def __repr__(self):
error = self.error
return ("<%s ttid=%s locking_tid=%s voted=%u"
return ("<%s ttid=%s voted=%u"
" #queue=%s #writing=%s #written=%s%s>") % (
self.__class__.__name__,
dump(self.ttid), dump(self.locking_tid), self.voted,
dump(self.ttid), self.voted,
len(self.queue._queue), len(self.data_dict), len(self.cache_dict),
' error=%r' % error if error else '')
def wakeup(self, conn):
self.queue.put((conn, _WakeupPacket, {}))
def write(self, app, packet, object_id, **kw):
uuid_list = []
pt = app.pt
......@@ -78,14 +68,6 @@ class Transaction(object):
conn = conn_dict[uuid]
except KeyError:
conn = conn_dict[uuid] = app.getStorageConnection(node)
if self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is to
# distinguish whether we're writing an oid or
# transaction metadata.
conn.ask(Packets.AskRebaseTransaction(
self.ttid, self.locking_tid), queue=self.queue)
conn.ask(packet, queue=self.queue, **kw)
uuid_list.append(uuid)
except AttributeError:
......@@ -128,8 +110,8 @@ class Transaction(object):
lockless = self.lockless_dict = defaultdict(set)
lockless[app.pt.getPartition(oid)].add(uuid)
if oid in self.conflict_dict:
# In the case of a rebase, uuid_list may not contain the id
# of the node reporting a conflict.
# In case of deadlock avoidance, uuid_list may not contain the
# id of the node reporting a conflict.
return
if uuid_list:
return
......
......@@ -322,8 +322,8 @@ class EventQueue(object):
# Stable sort when 2 keys are equal.
# XXX: Is it really useful to keep events with same key ordered
# chronologically ? The caller could use more specific keys. For
# write-locks (by the storage node), the locking tid seems enough.
# chronologically ? The caller could use more specific keys.
# For write-locks (by the storage node), the ttid seems enough.
sortQueuedEvents = (lambda key=itemgetter(0): lambda self:
self._event_queue.sort(key=key))()
......@@ -334,14 +334,6 @@ class EventQueue(object):
if key is not None:
self.sortQueuedEvents()
def sortAndExecuteQueuedEvents(self):
if self._executing_event < 0:
self.sortQueuedEvents()
self.executeQueuedEvents()
else:
# We can't sort events when they're being processed.
self._executing_event = 1
def executeQueuedEvents(self):
# Not reentrant. When processing a queued event, calling this method
# only tells the caller to retry all events from the beginning, because
......@@ -369,10 +361,6 @@ class EventQueue(object):
while done:
del queue[done.pop()]
self._executing_event = 0
# What sortAndExecuteQueuedEvents could not do immediately
# is done here:
if event[0] is not None:
self.sortQueuedEvents()
self._executing_event = -1
def logQueuedEvents(self):
......
......@@ -20,7 +20,7 @@ from msgpack import packb
# The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes.
PROTOCOL_VERSION = 1
PROTOCOL_VERSION = 2
# By encoding the handshake packet with msgpack, the whole NEO stream can be
# decoded with msgpack. The first byte is 0x92, which is different from TLS
# Handshake (0x16).
......@@ -532,28 +532,17 @@ class Packets(dict):
""")
NotifyDeadlock = notify("""
Ask master to generate a new TTID that will be used by the client to
solve a deadlock by rebasing the transaction on top of concurrent
changes.
A client acquired a write-lock before another one that has a smaller
TTID, leading to a possible deadlock. In order to solve it, this asks
the client with the greatest TTID to lock again if it can't vote.
:nodes: S -> M -> C
:nodes: S -> C
""")
AskRebaseTransaction, AnswerRebaseTransaction = request("""
Rebase a transaction to solve a deadlock.
AskRelockObject, AnswerRelockObject = request("""
Relock an object change to solve a deadlock.
:nodes: C -> S
""")
AskRebaseObject, AnswerRebaseObject = request("""
Rebase an object change to solve a deadlock.
:nodes: C -> S
XXX: It is a request packet to simplify the implementation. For more
efficiency, this should be turned into a notification, and the
RebaseTransaction should answered once all objects are rebased
(so that the client can still wait on something).
""", data_path=(1, 0, 2, 0))
AskStoreObject, AnswerStoreObject = request("""
......
......@@ -68,9 +68,6 @@ class StorageServiceHandler(BaseServiceHandler):
conn.answer(p)
app.pt.updatable(conn.getUUID(), offset_list)
def notifyDeadlock(self, conn, *args):
self.app.tm.deadlock(conn.getUUID(), *args)
def answerInformationLocked(self, conn, ttid):
self.app.tm.lock(ttid, conn.getUUID())
......
......@@ -19,15 +19,13 @@ from time import time
from struct import pack, unpack
from neo.lib import logging
from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.protocol import Packets, ProtocolError, uuid_str, \
ZERO_OID, ZERO_TID
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID
from neo.lib.util import dump, u64, addTID, tidFromTime
class Transaction(object):
"""
A pending transaction
"""
locking_tid = ZERO_TID
_tid = None
_msg_id = None
_oid_list = None
......@@ -292,19 +290,6 @@ class TransactionManager(EventQueue):
logging.debug('Begin %s', txn)
return tid
def deadlock(self, storage_id, ttid, locking_tid):
try:
txn = self._ttid_dict[ttid]
except KeyError:
return
if txn.locking_tid <= locking_tid:
client = txn.getNode()
txn.locking_tid = locking_tid = self._nextTID()
logging.info('Deadlock avoidance triggered by %s for %s:'
' new locking tid for TXN %s is %s', uuid_str(storage_id),
uuid_str(client.getUUID()), dump(ttid), dump(locking_tid))
client.send(Packets.NotifyDeadlock(ttid, locking_tid))
def vote(self, app, ttid, uuid_list):
"""
Check that the transaction can be voted
......
......@@ -133,25 +133,23 @@ class ClientOperationHandler(BaseHandler):
compression, checksum, data, data_serial, ttid, time.time()),
*e.args)
def askRebaseTransaction(self, conn, *args):
conn.answer(Packets.AnswerRebaseTransaction(
self.app.tm.rebase(conn, *args)))
def askRebaseObject(self, conn, ttid, oid):
def askRelockObject(self, conn, ttid, oid):
try:
self._askRebaseObject(conn, ttid, oid, None)
self.app.tm.relockObject(ttid, oid, True)
except DelayEvent, e:
# locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askRebaseObject,
self.app.tm.queueEvent(self._askRelockObject,
conn, (ttid, oid, time.time()), *e.args)
else:
conn.answer(Packets.AnswerRelockObject(None))
def _askRebaseObject(self, conn, ttid, oid, request_time):
conflict = self.app.tm.rebaseObject(ttid, oid)
def _askRelockObject(self, conn, ttid, oid, request_time):
conflict = self.app.tm.relockObject(ttid, oid, False)
if request_time and SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
logging.info('RebaseObject delay: %.02fs', duration)
conn.answer(Packets.AnswerRebaseObject(conflict))
logging.info('RelockObject delay: %.02fs', duration)
conn.answer(Packets.AnswerRelockObject(conflict))
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
......@@ -251,8 +249,7 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler):
askVoteTransaction = _readOnly
askStoreObject = _readOnly
askFinalTID = _readOnly
askRebaseObject = _readOnly
askRebaseTransaction = _readOnly
askRelockObject = _readOnly
# takes write lock & is only used when going to commit
askCheckCurrentSerial = _readOnly
......
......@@ -26,9 +26,8 @@ AnswerPack(bool)
AnswerPartitionList(int,int,[[(int,CellStates)]])
AnswerPartitionTable(int,int,[[(int,CellStates)]])
AnswerPrimary(int)
AnswerRebaseObject(?(p64,p64,?(int,bin,bin)))
AnswerRebaseTransaction([p64])
AnswerRecovery(?int,?p64,?p64)
AnswerRelockObject(?(p64,p64,?(int,bin,bin)))
AnswerStoreObject(?p64)
AnswerStoreTransaction()
AnswerTIDs([p64])
......@@ -61,9 +60,8 @@ AskPack(p64)
AskPartitionList(int,int,?)
AskPartitionTable()
AskPrimary()
AskRebaseObject(p64,p64)
AskRebaseTransaction(p64,p64)
AskRecovery()
AskRelockObject(p64,p64)
AskStoreObject(p64,p64,int,bin,bin,?p64,?p64)
AskStoreTransaction(p64,bin,bin,bin,[p64])
AskTIDs(int,int,int)
......