Commit 7fff11f6 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix write-locking bug when a deadlock happens at the end of a replication

During rebase, writes could stay lockless although the partition was
replicated. Another transaction could then take locks prematurely, leading to
the following crash:

  Traceback (most recent call last):
    File "neo/lib/handler.py", line 72, in dispatch
      method(conn, *args, **kw)
    File "neo/storage/handlers/master.py", line 36, in notifyUnlockInformation
      self.app.tm.unlock(ttid)
    File "neo/storage/transactions.py", line 329, in unlock
      self.abort(ttid, even_if_locked=True)
    File "neo/storage/transactions.py", line 573, in abort
      not self._replicated.get(self.getPartition(oid))), x
  AssertionError: ('\x00\x00\x00\x00\x00\x03\x03v', '\x03\xca\xb44J\x13\x99\x88', '\x03\xca\xb44J\xe0\xdcU', {}, set(['\x00\x00\x00\x00\x00\x03\x03v']))
parent efaae043
......@@ -157,8 +157,6 @@ class TransactionManager(EventQueue):
# which will delay new transactions.
for txn, ttid in sorted((txn, ttid) for ttid, txn in
self._transaction_dict.iteritems()):
if txn.locking_tid == MAX_TID:
break # all remaining transactions are resolving a deadlock
assert txn.lockless.issubset(txn.serial_dict), (
txn.lockless, txn.serial_dict)
for oid in txn.lockless:
......@@ -224,17 +222,22 @@ class TransactionManager(EventQueue):
if locked == ttid:
del self._store_lock_dict[oid]
lockless = transaction.lockless
# There's nothing to rebase for lockless stores to replicating
# partitions because there's no lock taken yet. In other words,
# rebasing such stores would do nothing. Other lockless stores
# become normal ones: this transaction does not block anymore
# replicated partitions from being marked as UP_TO_DATE.
oid = [oid
for oid in lockless.intersection(transaction.serial_dict)
if self.getPartition(oid) not in self._replicating]
if oid:
lockless.difference_update(oid)
self._notifyReplicated()
if locking_tid == MAX_TID:
if lockless:
lockless.clear()
self._notifyReplicated()
else:
# There's nothing to rebase for lockless stores to replicating
# partitions because there's no lock taken yet. In other words,
# rebasing such stores would do nothing. Other lockless stores
# become normal ones: this transaction does not block anymore
# replicated partitions from being marked as UP_TO_DATE.
oid = [oid
for oid in lockless.intersection(transaction.serial_dict)
if self.getPartition(oid) not in self._replicating]
if oid:
lockless.difference_update(oid)
self._notifyReplicated()
# Some locks were released, some pending locks may now succeed.
# We may even have delayed stores for this transaction, like the one
# that triggered the deadlock. They must also be sorted again because
......
......@@ -46,6 +46,7 @@ from neo.storage.database import DatabaseFailure
from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.identification import IdentificationHandler
from neo.storage.handlers.initialization import InitializationHandler
from neo.storage.replicator import Replicator
class PCounter(Persistent):
value = 0
......@@ -2208,6 +2209,62 @@ class Test(NEOThreadedTest):
t2.begin()
self.assertEqual([4, 6], [r[x].value for x in 'ab'])
@with_cluster(replicas=1, partitions=2)
def testNotifyReplicated3(self, cluster):
s0, s1 = cluster.storage_list
t1, c1 = cluster.getTransaction()
r = c1.root()
r[''] = PCounter()
t1.commit()
s1.stop()
cluster.join((s1,))
s1.resetNode()
nonlocal_ = [0]
class Abort(Exception):
pass
with cluster.newClient(1) as db, Patch(Replicator,
_nextPartitionSortKey=lambda orig, self, offset: offset):
t3, c3 = cluster.getTransaction(db)
with ConnectionFilter() as f, self.noConnection(c3, s0):
@f.delayAnswerFetchObjects
def delay(_):
if nonlocal_:
return nonlocal_.pop()
s1.start()
self.tic()
r[''].value += 1
r._p_changed = 1
t2, c2 = cluster.getTransaction()
c2.root()._p_changed = 1
def t1_rebase(*args, **kw):
self.tic()
f.remove(delay)
yield 0
@self.newPausedThread
def commit23():
t2.commit()
c3.root()[''].value += 3
with self.assertRaises(Abort) as cm:
t3.commit()
self.assertTrue(*cm.exception.args)
def t3_commit(txn):
# Check that the storage hasn't answered to the store,
# which means that a lock is still taken for r[''] by t1.
self.tic()
txn_context = db.storage.app._txn_container.get(txn)
raise Abort(txn_context.queue.empty())
TransactionalResource(t3, 1, commit=t3_commit)
with self.thread_switcher((commit23,),
(1, 1, 0, 0, t1_rebase, 0, 0, 0, 1, 1, 1, 1, 0),
('tpc_begin', 'tpc_begin', 0, 1, 0,
'RebaseTransaction', 'RebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
'StoreTransaction', 'tpc_begin', 1, 'tpc_abort')) as end:
self.assertRaises(POSException.ConflictError, t1.commit)
commit23.join()
self.assertEqual(end, {0: ['tpc_abort']})
self.assertPartitionTable(cluster, 'UU|UU')
@with_cluster(storage_count=2, partitions=2)
def testDeadlockAvoidanceBeforeInvolvingAnotherNode(self, cluster):
t1, c1 = cluster.getTransaction()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment