Commit 50e7fe52 authored by Julien Muchembled's avatar Julien Muchembled

client: discard late answers to lockless writes

This fixes:

  Traceback (most recent call last):
    File "neo/client/", line 108, in tpc_vote
    File "neo/client/", line 546, in tpc_vote
    File "neo/client/", line 539, in waitStoreResponses
    File "neo/client/", line 160, in _waitAnyTransactionMessage
    File "neo/client/", line 514, in _handleConflicts
      self._store(txn_context, oid, serial, data)
    File "neo/client/", line 452, in _store
      self._waitAnyTransactionMessage(txn_context, False)
    File "neo/client/", line 155, in _waitAnyTransactionMessage
      self._waitAnyMessage(queue, block=block)
    File "neo/client/", line 142, in _waitAnyMessage
      _handlePacket(conn, packet, kw)
    File "neo/lib/", line 133, in _handlePacket
      handler.dispatch(conn, packet, kw)
    File "neo/lib/", line 72, in dispatch
      method(conn, *args, **kw)
    File "neo/client/handlers/", line 143, in answerRebaseObject
      assert cached == data
parent 82672031
......@@ -445,7 +445,7 @@ class Application(ThreadedApplication):
packet = Packets.AskStoreObject(oid, serial, compression,
checksum, compressed_data, data_serial, ttid)
txn_context.data_dict[oid] = data, serial, txn_context.write(
self, packet, oid, oid=oid)
self, packet, oid, oid=oid, serial=serial)
while txn_context.data_size >= self._cache.max_size:
......@@ -951,6 +951,6 @@ class Application(ThreadedApplication):
assert oid not in txn_context.data_dict, oid
packet = Packets.AskCheckCurrentSerial(ttid, oid, serial)
txn_context.data_dict[oid] = CHECKED_SERIAL, serial, txn_context.write(
self, packet, oid, oid=oid)
self, packet, oid, oid=oid, serial=serial)
self._waitAnyTransactionMessage(txn_context, False)
......@@ -60,11 +60,11 @@ class StorageAnswersHandler(AnswerBaseHandler):
def answerObject(self, conn, oid, *args):
def answerStoreObject(self, conn, conflict, oid):
def answerStoreObject(self, conn, conflict, oid, serial):
txn_context =
if conflict:
if conflict == ZERO_TID:
txn_context.written(, conn.getUUID(), oid, True)
txn_context.written(, conn.getUUID(), oid, serial)
# Conflicts can not be resolved now because 'conn' is locked.
# We must postpone the resolution (by queuing the conflict in
......@@ -88,7 +88,7 @@ class Transaction(object):
raise NEOStorageError(
'no storage available for write to partition %s' % object_id)
def written(self, app, uuid, oid, lockless=False):
def written(self, app, uuid, oid, lockless=None):
# When a node is being disconnected by the master because it was
# not part of the transaction that caused a conflict, we may receive a
# positive answer (not to be confused with lockless stores) before the
......@@ -113,6 +113,10 @@ class Transaction(object):
# node that was being disconnected by the master
if lockless:
if lockless != serial: # late lockless write
assert lockless < serial, (lockless, serial)
# It's safe to do this after the above excepts: either the cell is
# already marked as lockless or the node will be reported as failed.
lockless = self.lockless_dict
......@@ -51,7 +51,9 @@ class Transaction(object):
self.uuid = uuid
self.serial_dict = {}
self.store_dict = {}
# We must distinguish lockless stores from deadlocks.
# Remember the oids for which we didn't check for conflict. This is
# used primarily to know when to declare a cell really readable once
# the replication is finished.
self.lockless = set()
def __repr__(self):
......@@ -221,6 +223,9 @@ class TransactionManager(EventQueue):
# fixing the store lock.
if locked == ttid:
del self._store_lock_dict[oid]
# However here, we don't try to remember lockless writes: later,
# we may give write-locks to oids that would normally conflict.
# Readable cells prevent such scenario to go wrong.
lockless = transaction.lockless
if locking_tid == MAX_TID:
if lockless:
......@@ -233,7 +238,7 @@ class TransactionManager(EventQueue):
# become normal ones: this transaction does not block anymore
# replicated partitions from being marked as UP_TO_DATE.
oid = [oid
for oid in lockless.intersection(transaction.serial_dict)
for oid in lockless
if self.getPartition(oid) not in self._replicating]
if oid:
......@@ -272,6 +277,8 @@ class TransactionManager(EventQueue):
self.lockObject(ttid, serial, oid)
except ConflictError:
except Exception, e: # pragma: no cover
raise AssertionError(e)
return recheck_set
def vote(self, ttid, txn_info=None):
......@@ -1048,6 +1048,10 @@ class NEOThreadedTest(NeoTestBase):
return Patch(jar.db(), getConnForNode=lambda orig, node:
None if node.getUUID() == storage.uuid else orig(node))
def getConnectionApp(conn):
return getattr(conn.getHandler(), 'app', None)
def readCurrent(ob):
......@@ -1472,11 +1472,11 @@ class Test(NEOThreadedTest):
reports a conflict after that this conflict was fully resolved with
another node.
def answerStoreObject(orig, conn, conflict, oid):
def answerStoreObject(orig, conn, conflict, oid, serial):
if not conflict:
orig(conn, conflict, oid)
orig(conn, conflict, oid, serial)
if 1:
s0, s1 = cluster.storage_list
t1, c1 = cluster.getTransaction()
......@@ -1774,7 +1774,7 @@ class Test(NEOThreadedTest):
f.add(delayAnswerStoreObject, Patch(Transaction, written=written))
def delayAnswerStoreObject(conn, packet):
return (isinstance(packet, Packets.AnswerStoreObject)
and getattr(conn.getHandler(), 'app', None) is s)
and self.getConnectionApp(conn) is s)
def written(orig, *args):
......@@ -2126,7 +2126,7 @@ class Test(NEOThreadedTest):
# Delay the conflict for the second store of 'a' by t3.
delay_conflict = {s0.uuid: [1], s1.uuid: [1,0]}
def delayConflict(conn, packet):
app = conn.getHandler().app
app = self.getConnectionApp(conn)
if (isinstance(packet, Packets.AnswerStoreObject)
and packet.decode()[0]):
conn, = cluster.client.getConnectionList(app)
......@@ -2265,6 +2265,102 @@ class Test(NEOThreadedTest):
self.assertEqual(end, {0: ['tpc_abort']})
self.assertPartitionTable(cluster, 'UU|UU')
@with_cluster(partitions=2, storage_count=2)
def testConflictAfterLocklessWrite(self, cluster):
Show that in case of a write to an outdated cell, the client must
discard the answer if it comes after a resolved conflict, as the client
would not have the data anymore to solve a second conflict (deadlock
avoidance). This test reproduces a case where the storage node can't
easily return the correct data back to the client.
The scenario focuses on object A (oid 1) and node s0, which is
initially replicating partition 1:
1. t1 writes A: s1 conflicts and the answer from s0 is delayed
2. t1 writes B: a deadlock is triggered by s0 and internally, the write
of A is not considered lockless anymore
3. replication of partition 1 finishes: A is marked as locked normally
(which can be considered wrong but discarding the write at that
point is not trivial and anyway another write is coming)
4. t1 resolves A: s1 is not yet notified of the deadlock and accepts
5. t1 receives the answer for the first write of A to s1: if it didn't
discard it, it would mark the write of A as completed on all nodes
6. t1 starts resolving the deadlock, s0 conflicts for the second store
and returns that A needs to be rebased (like in 3, questionable)
7. the answer of s0 for the rebasing of A contains data from the first
write and it is processed first: this is not an issue if the client
still has the data (i.e. not moved to transaction cache, or
discarded because the cache is too small)
t1, c1 = cluster.getTransaction()
r = c1.root()
for x in 'ab':
r[x] = PCounterWithResolution()
s0, s1 = cluster.sortStorageList()
t1, c1 = cluster.getTransaction()
r = c1.root()
r['a'].value += 1
r['b'].value += 2
with cluster.newClient(1) as db, ConnectionFilter() as f:
delayReplication = f.delayAnswerFetchObjects()
delayStore = f.delayAnswerStoreObject(lambda conn:
conn.uuid == cluster.client.uuid and
self.getConnectionApp(conn) is s0)
delayDeadlock = f.delayNotifyDeadlock()
delayRebase = f.delayAnswerRebaseObject(lambda conn:
# to first process the one from s0
self.getConnectionApp(conn) is s1)
t2, c2 = cluster.getTransaction(db)
r = c2.root()
r['a'].value += 3 # for a first conflict on t1/s1
r['b'].value += 4 # for a deadlock on t1/s0
r['a'].value += 5 # for a second conflict on t1/s0
def t1_b(*args, **kw):
self.tic() # make sure t2/b will be processed before t1/b
yield 0
def t1_resolve(*args, **kw):
yield 1
def t2_vote(*args, **kw):
yield 0
# From now own, prefer reading from s0,
# in case that packets from s1 are blocked by the filter.
def t1_end(*args, **kw):
yield 0
commit2 = self.newPausedThread(t2.commit)
no_read = []
with cluster.client.extraCellSortKey(
lambda cell: cell.getUUID() in no_read), \
(1, 1, 0, 0, t1_b, t1_resolve, 0, 0, 0, 0, 1, t2_vote, t1_end),
('tpc_begin', 'tpc_begin', 2, 1, 2, 1, 1,
'RebaseTransaction', 'RebaseTransaction',
'AnswerRebaseTransaction', 'AnswerRebaseTransaction',
'StoreTransaction')) as end:
r = c1.root()
self.assertEqual(r['a'].value, 9)
self.assertEqual(r['b'].value, 6)
t1 = end.pop(0)
self.assertEqual(t1.pop(), 'StoreTransaction')
self.assertEqual(sorted(t1), [1, 2])
self.assertPartitionTable(cluster, 'UU|UU')
@with_cluster(storage_count=2, partitions=2)
def testDeadlockAvoidanceBeforeInvolvingAnotherNode(self, cluster):
t1, c1 = cluster.getTransaction()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment