Commit cb81bfe3 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix replication when clients already committed transactions

Similar to r2777 but for transactions instead of objects.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2780 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ce645dc5
...@@ -423,12 +423,11 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -423,12 +423,11 @@ class BTreeDatabaseManager(DatabaseManager):
except KeyError: except KeyError:
pass pass
def deleteTransactionsAbove(self, num_partitions, partition, tid): def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
tid = util.u64(tid)
def same_partition(key, _): def same_partition(key, _):
return key % num_partitions == partition return key % num_partitions == partition
batchDelete(self._trans, same_partition, batchDelete(self._trans, same_partition,
iter_kw={'min': tid}) iter_kw={'min': util.u64(tid), 'max': util.u64(max_tid)})
def deleteObject(self, oid, serial=None): def deleteObject(self, oid, serial=None):
u64 = util.u64 u64 = util.u64
......
...@@ -391,9 +391,10 @@ class DatabaseManager(object): ...@@ -391,9 +391,10 @@ class DatabaseManager(object):
an oid list""" an oid list"""
raise NotImplementedError raise NotImplementedError
def deleteTransactionsAbove(self, num_partitions, partition, tid): def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
"""Delete all transactions above given TID (inclued) in given """Delete all transactions above given TID (inclued) in given
partition.""" partition, but never above max_tid (in case transactions are committed
during replication)."""
raise NotImplementedError raise NotImplementedError
def deleteObject(self, oid, serial=None): def deleteObject(self, oid, serial=None):
......
...@@ -541,13 +541,14 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -541,13 +541,14 @@ class MySQLDatabaseManager(DatabaseManager):
raise raise
self.commit() self.commit()
def deleteTransactionsAbove(self, num_partitions, partition, tid): def deleteTransactionsAbove(self, num_partitions, partition, tid, max_tid):
self.begin() self.begin()
try: try:
self.query('DELETE FROM trans WHERE partition=%(partition)d AND ' self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
'tid >= %(tid)d' % { '%(tid)d <= tid AND tid <= %(max_tid)d' % {
'partition': partition, 'partition': partition,
'tid': util.u64(tid), 'tid': util.u64(tid),
'max_tid': util.u64(max_tid),
}) })
except: except:
self.rollback() self.rollback()
......
...@@ -256,6 +256,7 @@ class ReplicationHandler(EventHandler): ...@@ -256,6 +256,7 @@ class ReplicationHandler(EventHandler):
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum, def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
max_tid): max_tid):
pkt_min_tid = min_tid
ask = conn.ask ask = conn.ask
app = self.app app = self.app
replicator = app.replicator replicator = app.replicator
...@@ -264,6 +265,7 @@ class ReplicationHandler(EventHandler): ...@@ -264,6 +265,7 @@ class ReplicationHandler(EventHandler):
replicator.getTIDCheckResult(min_tid, length) == ( replicator.getTIDCheckResult(min_tid, length) == (
count, tid_checksum, max_tid), min_tid, next_tid, length, count, tid_checksum, max_tid), min_tid, next_tid, length,
count) count)
critical_tid = replicator.getCurrentCriticalTID()
if action == CHECK_REPLICATE: if action == CHECK_REPLICATE:
(min_tid, ) = params (min_tid, ) = params
ask(self._doAskTIDsFrom(min_tid, count)) ask(self._doAskTIDsFrom(min_tid, count))
...@@ -272,23 +274,26 @@ class ReplicationHandler(EventHandler): ...@@ -272,23 +274,26 @@ class ReplicationHandler(EventHandler):
params = (next_tid, ) params = (next_tid, )
if action == CHECK_CHUNK: if action == CHECK_CHUNK:
(min_tid, count) = params (min_tid, count) = params
if min_tid >= replicator.getCurrentCriticalTID(): if min_tid >= critical_tid:
# Stop if past critical TID # Stop if past critical TID
action = CHECK_DONE action = CHECK_DONE
params = (next_tid, ) params = (next_tid, )
else: else:
max_tid = replicator.getCurrentCriticalTID() ask(self._doAskCheckTIDRange(min_tid, critical_tid, count))
ask(self._doAskCheckTIDRange(min_tid, max_tid, count))
if action == CHECK_DONE: if action == CHECK_DONE:
# Delete all transactions we might have which are beyond what peer # Delete all transactions we might have which are beyond what peer
# knows. # knows.
(last_tid, ) = params (last_tid, ) = params
offset = replicator.getCurrentOffset()
neo.lib.logging.debug("TID range checked (offset=%s, min_tid=%x,"
" length=%s, count=%s, max_tid=%x, last_tid=%x,"
" critical_tid=%x)", offset, u64(pkt_min_tid), length, count,
u64(max_tid), u64(last_tid), u64(critical_tid))
app.dm.deleteTransactionsAbove(app.pt.getPartitions(), app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
replicator.getCurrentOffset(), last_tid) offset, last_tid, critical_tid)
# If no more TID, a replication of transactions is finished. # If no more TID, a replication of transactions is finished.
# So start to replicate objects now. # So start to replicate objects now.
max_tid = replicator.getCurrentCriticalTID() ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID, critical_tid))
ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID, max_tid))
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count, def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
......
...@@ -380,7 +380,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -380,7 +380,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
# ...and delete partition tail # ...and delete partition tail
calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove') calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1)) calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1), ZERO_TID)
def test_answerCheckTIDRangeDifferentBigChunk(self): def test_answerCheckTIDRangeDifferentBigChunk(self):
min_tid = self.getNextTID() min_tid = self.getNextTID()
......
...@@ -346,7 +346,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -346,7 +346,7 @@ class StorageDBTests(NeoUnitTestBase):
txn, objs = self.getTransaction([oid1]) txn, objs = self.getTransaction([oid1])
self.db.storeTransaction(tid, objs, txn) self.db.storeTransaction(tid, objs, txn)
self.db.finishTransaction(tid) self.db.finishTransaction(tid)
self.db.deleteTransactionsAbove(2, 0, tid2) self.db.deleteTransactionsAbove(2, 0, tid2, tid3)
# Right partition, below cutoff # Right partition, below cutoff
self.assertNotEqual(self.db.getTransaction(tid1, True), None) self.assertNotEqual(self.db.getTransaction(tid1, True), None)
# Wrong partition, above cutoff # Wrong partition, above cutoff
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment