Commit 1280f73e authored by Julien Muchembled

storage: in deadlock avoidance, fix performance issue that could freeze the cluster

In the worst case, with many clients trying to lock the same oids,
the cluster could enter an infinite cascade of deadlocks.

Here is an overview with 3 storage nodes and 3 transactions:

 S1     S2     S3     order of locking tids          # abbreviations:
 l1     l1     l2     123                            #  l: lock
 q23    q23    d1q3   231                            #  d: deadlock triggered
 r1:l3  r1:l2  (r1)   # for S3, we still have l2     #  q: queued
 d2q1   q13    q13    312                            #  r: rebase

Above, we show what happens when a random transaction gets a lock just after
another one is rebased. Here, the result is that the last 2 lines are a
permutation of the first 2, and this can repeat indefinitely with bad luck.

This commit reduces the probability of deadlock by processing delayed
stores/checks in the order of their locking tid. In the above example,
S1 would give the lock to 2 when 1 is rebased, and 2 would vote successfully.
parent 1b9f8f72
...@@ -70,9 +70,9 @@ class EventHandler(object): ...@@ -70,9 +70,9 @@ class EventHandler(object):
raise UnexpectedPacketError('no handler found') raise UnexpectedPacketError('no handler found')
args = packet.decode() or () args = packet.decode() or ()
method(conn, *args, **kw) method(conn, *args, **kw)
except DelayEvent: except DelayEvent, e:
assert not kw, kw assert not kw, kw
self.getEventQueue().queueEvent(method, conn, args) self.getEventQueue().queueEvent(method, conn, args, *e.args)
except UnexpectedPacketError, e: except UnexpectedPacketError, e:
if not conn.isClosed(): if not conn.isClosed():
self.__unexpectedPacket(conn, packet, *e.args) self.__unexpectedPacket(conn, packet, *e.args)
...@@ -311,36 +311,55 @@ class _DelayedConnectionEvent(EventHandler): ...@@ -311,36 +311,55 @@ class _DelayedConnectionEvent(EventHandler):
class EventQueue(object): class EventQueue(object):
def __init__(self): def __init__(self):
self._event_queue = deque() self._event_queue = []
self._executing_event = -1 self._executing_event = -1
def queueEvent(self, func, conn=None, args=()): def queueEvent(self, func, conn=None, args=(), key=None):
self._event_queue.append(func if conn is None else assert self._executing_event < 0, self._executing_event
_DelayedConnectionEvent(func, conn, args)) self._event_queue.append((key, func if conn is None else
_DelayedConnectionEvent(func, conn, args)))
if key is not None:
self._event_queue.sort()
def sortAndExecuteQueuedEvents(self):
if self._executing_event < 0:
self._event_queue.sort()
self.executeQueuedEvents()
else:
# We can't sort events when they're being processed.
self._executing_event = 1
def executeQueuedEvents(self): def executeQueuedEvents(self):
# Not reentrant. When processing a queued event, calling this method # Not reentrant. When processing a queued event, calling this method
# only tells the caller to retry all events from the beginning, because # only tells the caller to retry all events from the beginning, because
# events for the same connection must be processed in chronological # events for the same connection must be processed in chronological
# order. # order.
self._executing_event += 1
if self._executing_event:
return
queue = self._event_queue queue = self._event_queue
n = len(queue) if queue: # return quickly if the queue is empty
while n: self._executing_event += 1
try:
queue[0]()
except DelayEvent:
queue.rotate(-1)
else:
del queue[0]
n -= 1
if self._executing_event: if self._executing_event:
return
done = []
while 1:
try:
for i, event in enumerate(queue):
try:
event[1]()
done.append(i)
except DelayEvent:
pass
if self._executing_event:
break
else:
break
finally:
while done:
del queue[done.pop()]
self._executing_event = 0 self._executing_event = 0
queue.rotate(-n) # What sortAndExecuteQueuedEvents could not do immediately is done here:
n = len(queue) if event[0] is not None:
self._executing_event = -1 queue.sort()
self._executing_event = -1
def logQueuedEvents(self): def logQueuedEvents(self):
if self._event_queue: if self._event_queue:
......
...@@ -109,7 +109,7 @@ class Checker(object): ...@@ -109,7 +109,7 @@ class Checker(object):
self.source = source self.source = source
def start(): def start():
if app.tm.isLockedTid(max_tid): if app.tm.isLockedTid(max_tid):
app.tm.queueEvent(start) app.tm.read_queue.queueEvent(start)
return return
args = partition, CHECK_COUNT, min_tid, max_tid args = partition, CHECK_COUNT, min_tid, max_tid
p = Packets.AskCheckTIDRange(*args) p = Packets.AskCheckTIDRange(*args)
......
...@@ -40,7 +40,7 @@ class ClientOperationHandler(BaseHandler): ...@@ -40,7 +40,7 @@ class ClientOperationHandler(BaseHandler):
def getEventQueue(self): def getEventQueue(self):
# for read rpc # for read rpc
return self.app.tm return self.app.tm.read_queue
def askObject(self, conn, oid, serial, tid): def askObject(self, conn, oid, serial, tid):
app = self.app app = self.app
...@@ -106,10 +106,11 @@ class ClientOperationHandler(BaseHandler): ...@@ -106,10 +106,11 @@ class ClientOperationHandler(BaseHandler):
try: try:
self._askStoreObject(conn, oid, serial, compression, self._askStoreObject(conn, oid, serial, compression,
checksum, data, data_serial, ttid, None) checksum, data, data_serial, ttid, None)
except DelayEvent: except DelayEvent, e:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial, self.app.tm.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, ttid, time.time())) compression, checksum, data, data_serial, ttid, time.time()),
*e.args)
def askRebaseTransaction(self, conn, *args): def askRebaseTransaction(self, conn, *args):
conn.answer(Packets.AnswerRebaseTransaction( conn.answer(Packets.AnswerRebaseTransaction(
...@@ -118,10 +119,10 @@ class ClientOperationHandler(BaseHandler): ...@@ -118,10 +119,10 @@ class ClientOperationHandler(BaseHandler):
def askRebaseObject(self, conn, ttid, oid): def askRebaseObject(self, conn, ttid, oid):
try: try:
self._askRebaseObject(conn, ttid, oid, None) self._askRebaseObject(conn, ttid, oid, None)
except DelayEvent: except DelayEvent, e:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askRebaseObject, self.app.tm.queueEvent(self._askRebaseObject,
conn, (ttid, oid, time.time())) conn, (ttid, oid, time.time()), *e.args)
def _askRebaseObject(self, conn, ttid, oid, request_time): def _askRebaseObject(self, conn, ttid, oid, request_time):
conflict = self.app.tm.rebaseObject(ttid, oid) conflict = self.app.tm.rebaseObject(ttid, oid)
...@@ -188,10 +189,10 @@ class ClientOperationHandler(BaseHandler): ...@@ -188,10 +189,10 @@ class ClientOperationHandler(BaseHandler):
self.app.tm.register(conn, ttid) self.app.tm.register(conn, ttid)
try: try:
self._askCheckCurrentSerial(conn, ttid, oid, serial, None) self._askCheckCurrentSerial(conn, ttid, oid, serial, None)
except DelayEvent: except DelayEvent, e:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
self.app.tm.queueEvent(self._askCheckCurrentSerial, self.app.tm.queueEvent(self._askCheckCurrentSerial,
conn, (ttid, oid, serial, time.time())) conn, (ttid, oid, serial, time.time()), *e.args)
def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time): def _askCheckCurrentSerial(self, conn, ttid, oid, serial, request_time):
try: try:
......
...@@ -84,14 +84,11 @@ class Transaction(object): ...@@ -84,14 +84,11 @@ class Transaction(object):
class TransactionManager(EventQueue): class TransactionManager(EventQueue):
""" """
Manage pending transaction and locks Manage pending transaction and locks
XXX: EventQueue is not very suited for deadlocks. It would be more
efficient to sort delayed packets by locking tid in order to minimize
cascaded deadlocks.
""" """
def __init__(self, app): def __init__(self, app):
EventQueue.__init__(self) EventQueue.__init__(self)
self.read_queue = EventQueue()
self._app = app self._app = app
self._transaction_dict = {} self._transaction_dict = {}
self._store_lock_dict = {} self._store_lock_dict = {}
...@@ -208,8 +205,9 @@ class TransactionManager(EventQueue): ...@@ -208,8 +205,9 @@ class TransactionManager(EventQueue):
self._notifyReplicated() self._notifyReplicated()
# Some locks were released, some pending locks may now succeed. # Some locks were released, some pending locks may now succeed.
# We may even have delayed stores for this transaction, like the one # We may even have delayed stores for this transaction, like the one
# that triggered the deadlock. # that triggered the deadlock. They must also be sorted again because
self.executeQueuedEvents() # our locking tid has changed.
self.sortAndExecuteQueuedEvents()
def rebase(self, conn, ttid, locking_tid): def rebase(self, conn, ttid, locking_tid):
self.register(conn, ttid) self.register(conn, ttid)
...@@ -337,7 +335,7 @@ class TransactionManager(EventQueue): ...@@ -337,7 +335,7 @@ class TransactionManager(EventQueue):
# but this is not a problem. EventQueue processes them in order # but this is not a problem. EventQueue processes them in order
# and only the last one will not result in conflicts (that are # and only the last one will not result in conflicts (that are
# already resolved). # already resolved).
raise DelayEvent raise DelayEvent(transaction)
if oid in transaction.lockless: if oid in transaction.lockless:
# This is a consequence of not having taken a lock during # This is a consequence of not having taken a lock during
# replication. After a ConflictError, we may be asked to "lock" # replication. After a ConflictError, we may be asked to "lock"
...@@ -358,7 +356,7 @@ class TransactionManager(EventQueue): ...@@ -358,7 +356,7 @@ class TransactionManager(EventQueue):
self._app.master_conn.send(Packets.NotifyDeadlock( self._app.master_conn.send(Packets.NotifyDeadlock(
ttid, transaction.locking_tid)) ttid, transaction.locking_tid))
self._rebase(transaction, ttid) self._rebase(transaction, ttid)
raise DelayEvent raise DelayEvent(transaction)
# If previous store was an undo, next store must be based on # If previous store was an undo, next store must be based on
# undo target. # undo target.
try: try:
...@@ -387,7 +385,7 @@ class TransactionManager(EventQueue): ...@@ -387,7 +385,7 @@ class TransactionManager(EventQueue):
return return
elif transaction.locking_tid == MAX_TID: elif transaction.locking_tid == MAX_TID:
# Deadlock avoidance. Still no new locking_tid from the client. # Deadlock avoidance. Still no new locking_tid from the client.
raise DelayEvent raise DelayEvent(transaction)
else: else:
previous_serial = self._app.dm.getLastObjectTID(oid) previous_serial = self._app.dm.getLastObjectTID(oid)
# Locking before reporting a conflict would speed up the case of # Locking before reporting a conflict would speed up the case of
...@@ -521,6 +519,7 @@ class TransactionManager(EventQueue): ...@@ -521,6 +519,7 @@ class TransactionManager(EventQueue):
if self._replicated: if self._replicated:
self._notifyReplicated() self._notifyReplicated()
# some locks were released, some pending locks may now succeed # some locks were released, some pending locks may now succeed
self.read_queue.executeQueuedEvents()
self.executeQueuedEvents() self.executeQueuedEvents()
def abortFor(self, uuid): def abortFor(self, uuid):
...@@ -551,6 +550,7 @@ class TransactionManager(EventQueue): ...@@ -551,6 +550,7 @@ class TransactionManager(EventQueue):
for oid, ttid in self._store_lock_dict.iteritems(): for oid, ttid in self._store_lock_dict.iteritems():
logging.info(' %s by %s', dump(oid), dump(ttid)) logging.info(' %s by %s', dump(oid), dump(ttid))
self.logQueuedEvents() self.logQueuedEvents()
self.read_queue.logQueuedEvents()
def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id): def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
lock_tid = self.getLockingTID(oid) lock_tid = self.getLockingTID(oid)
......
...@@ -2060,6 +2060,40 @@ class Test(NEOThreadedTest): ...@@ -2060,6 +2060,40 @@ class Test(NEOThreadedTest):
self.assertEqual(end, {0: ['AnswerRebaseTransaction', self.assertEqual(end, {0: ['AnswerRebaseTransaction',
'StoreTransaction', 'VoteTransaction']}) 'StoreTransaction', 'VoteTransaction']})
@with_cluster()
def testDelayedStoreOrdering(self, cluster):
"""
By processing delayed stores (EventQueue) in the order of their locking
tid, we minimize the number of deadlocks. Here, we trigger a first
deadlock, so that the delayed check for t1 is reordered after that of
t3.
"""
t1, c1 = cluster.getTransaction()
r = c1.root()
for x in 'abcd':
r[x] = PCounter()
t1.commit()
r['a'].value += 1
self.readCurrent(r['d'])
t2, c2 = cluster.getTransaction()
r = c2.root()
r['b'].value += 1
self.readCurrent(r['d'])
t3, c3 = cluster.getTransaction()
r = c3.root()
r['c'].value += 1
self.readCurrent(r['d'])
threads = map(self.newPausedThread, (t2.commit, t3.commit))
with self.thread_switcher(threads, (1, 2, 0, 1, 2, 1, 0, 2, 0, 1, 2),
('tpc_begin', 'tpc_begin', 'tpc_begin', 1, 2, 3, 4, 4, 4,
'RebaseTransaction', 'StoreTransaction')) as end:
t1.commit()
for t in threads:
t.join()
self.assertEqual(end, {
0: ['AnswerRebaseTransaction', 'StoreTransaction'],
2: ['StoreTransaction']})
@with_cluster(replicas=1) @with_cluster(replicas=1)
def testConflictAfterDeadlockWithSlowReplica1(self, cluster, def testConflictAfterDeadlockWithSlowReplica1(self, cluster,
slow_rebase=False): slow_rebase=False):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment