Commit 62ccef7f authored by Vincent Pelletier's avatar Vincent Pelletier

Fix replication when last TID range is empty. TO TEST

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2476 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e242ab2d
...@@ -440,7 +440,7 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -440,7 +440,7 @@ class BTreeDatabaseManager(DatabaseManager):
def same_partition(key, _): def same_partition(key, _):
return key % num_partitions == partition return key % num_partitions == partition
batchDelete(self._trans, same_partition, batchDelete(self._trans, same_partition,
iter_kw={'min': tid, 'excludemin': True}) iter_kw={'min': tid})
def deleteObject(self, oid, serial=None): def deleteObject(self, oid, serial=None):
u64 = util.u64 u64 = util.u64
...@@ -477,7 +477,7 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -477,7 +477,7 @@ class BTreeDatabaseManager(DatabaseManager):
pass pass
else: else:
batchDelete(tserial, lambda _, __: True, batchDelete(tserial, lambda _, __: True,
iter_kw={'min': serial, 'excludemin': True}) iter_kw={'min': serial})
def same_partition(key, _): def same_partition(key, _):
return key % num_partitions == partition return key % num_partitions == partition
batchDelete(obj, same_partition, batchDelete(obj, same_partition,
......
...@@ -389,7 +389,7 @@ class DatabaseManager(object): ...@@ -389,7 +389,7 @@ class DatabaseManager(object):
raise NotImplementedError raise NotImplementedError
def deleteTransactionsAbove(self, num_partitions, partition, tid): def deleteTransactionsAbove(self, num_partitions, partition, tid):
"""Delete all transactions above given TID (excluded) in given """Delete all transactions above given TID (inclued) in given
partition.""" partition."""
raise NotImplementedError raise NotImplementedError
...@@ -399,7 +399,7 @@ class DatabaseManager(object): ...@@ -399,7 +399,7 @@ class DatabaseManager(object):
raise NotImplementedError raise NotImplementedError
def deleteObjectsAbove(self, num_partitions, partition, oid, serial): def deleteObjectsAbove(self, num_partitions, partition, oid, serial):
"""Delete all objects above given OID and serial (excluded) in given """Delete all objects above given OID and serial (inclued) in given
partition.""" partition."""
raise NotImplementedError raise NotImplementedError
......
...@@ -561,7 +561,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -561,7 +561,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.begin() self.begin()
try: try:
self.query('DELETE FROM trans WHERE partition=%(partition)d AND ' self.query('DELETE FROM trans WHERE partition=%(partition)d AND '
'tid > %(tid)d' % { 'tid >= %(tid)d' % {
'partition': partition, 'partition': partition,
'tid': util.u64(tid), 'tid': util.u64(tid),
}) })
...@@ -595,7 +595,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -595,7 +595,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.begin() self.begin()
try: try:
self.query('DELETE FROM obj WHERE partition=%(partition)d AND (' self.query('DELETE FROM obj WHERE partition=%(partition)d AND ('
'oid > %(oid)d OR (oid = %(oid)d AND serial > %(serial)d))' % { 'oid > %(oid)d OR (oid = %(oid)d AND serial >= %(serial)d))' % {
'partition': partition, 'partition': partition,
'oid': u64(oid), 'oid': u64(oid),
'serial': u64(serial), 'serial': u64(serial),
......
...@@ -100,6 +100,7 @@ class ReplicationHandler(EventHandler): ...@@ -100,6 +100,7 @@ class ReplicationHandler(EventHandler):
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerTIDsFrom(self, conn, tid_list): def answerTIDsFrom(self, conn, tid_list):
assert tid_list
app = self.app app = self.app
ask = conn.ask ask = conn.ask
# If I have pending TIDs, check which TIDs I don't have, and # If I have pending TIDs, check which TIDs I don't have, and
...@@ -111,13 +112,10 @@ class ReplicationHandler(EventHandler): ...@@ -111,13 +112,10 @@ class ReplicationHandler(EventHandler):
deleteTransaction = app.dm.deleteTransaction deleteTransaction = app.dm.deleteTransaction
for tid in extra_tid_set: for tid in extra_tid_set:
deleteTransaction(tid) deleteTransaction(tid)
if tid_list: missing_tid_set = tid_set - my_tid_set
missing_tid_set = tid_set - my_tid_set for tid in missing_tid_set:
for tid in missing_tid_set: ask(Packets.AskTransactionInformation(tid), timeout=300)
ask(Packets.AskTransactionInformation(tid), timeout=300) ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH))
ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH))
else:
ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
...@@ -129,19 +127,17 @@ class ReplicationHandler(EventHandler): ...@@ -129,19 +127,17 @@ class ReplicationHandler(EventHandler):
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerObjectHistoryFrom(self, conn, object_dict): def answerObjectHistoryFrom(self, conn, object_dict):
assert object_dict
app = self.app app = self.app
ask = conn.ask ask = conn.ask
deleteObject = app.dm.deleteObject deleteObject = app.dm.deleteObject
my_object_dict = app.replicator.getObjectHistoryFromResult() my_object_dict = app.replicator.getObjectHistoryFromResult()
object_set = set() object_set = set()
if object_dict: max_oid = max(object_dict.iterkeys())
max_oid = max(object_dict.iterkeys()) max_serial = max(object_dict[max_oid])
max_serial = max(object_dict[max_oid]) for oid, serial_list in object_dict.iteritems():
for oid, serial_list in object_dict.iteritems(): for serial in serial_list:
for serial in serial_list: object_set.add((oid, serial))
object_set.add((oid, serial))
else:
max_oid = None
my_object_set = set() my_object_set = set()
for oid, serial_list in my_object_dict.iteritems(): for oid, serial_list in my_object_dict.iteritems():
filter = lambda x: True filter = lambda x: True
...@@ -156,14 +152,11 @@ class ReplicationHandler(EventHandler): ...@@ -156,14 +152,11 @@ class ReplicationHandler(EventHandler):
extra_object_set = my_object_set - object_set extra_object_set = my_object_set - object_set
for oid, serial in extra_object_set: for oid, serial in extra_object_set:
deleteObject(oid, serial) deleteObject(oid, serial)
if object_dict: missing_object_set = object_set - my_object_set
missing_object_set = object_set - my_object_set for oid, serial in missing_object_set:
for oid, serial in missing_object_set: ask(Packets.AskObject(oid, serial, None), timeout=300)
ask(Packets.AskObject(oid, serial, None), timeout=300) ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1),
ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1), RANGE_LENGTH))
RANGE_LENGTH))
else:
self.app.replicator.setReplicationDone()
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerObject(self, conn, oid, serial_start, def answerObject(self, conn, oid, serial_start,
...@@ -228,10 +221,15 @@ class ReplicationHandler(EventHandler): ...@@ -228,10 +221,15 @@ class ReplicationHandler(EventHandler):
else: else:
# No more chunks. # No more chunks.
action = CHECK_DONE action = CHECK_DONE
params = None params = (next_boundary, )
else: else:
# We must recheck current chunk. # We must recheck current chunk.
if length <= MIN_RANGE_LENGTH: if count == 0:
# Reference storage has no data for this chunk, stop and
# truncate.
action = CHECK_DONE
params = (current_boundary, )
elif length <= MIN_RANGE_LENGTH:
# We are already at minimum chunk length, replicate. # We are already at minimum chunk length, replicate.
action = CHECK_REPLICATE action = CHECK_REPLICATE
params = (recheck_min_boundary, ) params = (recheck_min_boundary, )
...@@ -259,18 +257,21 @@ class ReplicationHandler(EventHandler): ...@@ -259,18 +257,21 @@ class ReplicationHandler(EventHandler):
ask(self._doAskTIDsFrom(min_tid, count)) ask(self._doAskTIDsFrom(min_tid, count))
if length != count: if length != count:
action = CHECK_DONE action = CHECK_DONE
params = (next_tid, )
if action == CHECK_CHUNK: if action == CHECK_CHUNK:
(min_tid, count) = params (min_tid, count) = params
if min_tid >= replicator.getCurrentCriticalTID(): if min_tid >= replicator.getCurrentCriticalTID():
# Stop if past critical TID # Stop if past critical TID
action = CHECK_DONE action = CHECK_DONE
params = (next_tid, )
else: else:
ask(self._doAskCheckTIDRange(min_tid, count)) ask(self._doAskCheckTIDRange(min_tid, count))
if action == CHECK_DONE: if action == CHECK_DONE:
# Delete all transactions we might have which are beyond what peer # Delete all transactions we might have which are beyond what peer
# knows. # knows.
(last_tid, ) = params
app.dm.deleteTransactionsAbove(app.pt.getPartitions(), app.dm.deleteTransactionsAbove(app.pt.getPartitions(),
replicator.getCurrentRID(), max_tid) replicator.getCurrentRID(), last_tid)
# If no more TID, a replication of transactions is finished. # If no more TID, a replication of transactions is finished.
# So start to replicate objects now. # So start to replicate objects now.
ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID)) ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
...@@ -291,14 +292,16 @@ class ReplicationHandler(EventHandler): ...@@ -291,14 +292,16 @@ class ReplicationHandler(EventHandler):
ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count)) ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count))
if length != count: if length != count:
action = CHECK_DONE action = CHECK_DONE
params = (next_params, )
if action == CHECK_CHUNK: if action == CHECK_CHUNK:
((min_oid, min_serial), count) = params ((min_oid, min_serial), count) = params
ask(self._doAskCheckSerialRange(min_oid, min_serial, count)) ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
if action == CHECK_DONE: if action == CHECK_DONE:
# Delete all objects we might have which are beyond what peer # Delete all objects we might have which are beyond what peer
# knows. # knows.
((last_oid, last_serial), ) = params
app.dm.deleteObjectsAbove(app.pt.getPartitions(), app.dm.deleteObjectsAbove(app.pt.getPartitions(),
replicator.getCurrentRID(), max_oid, max_serial) replicator.getCurrentRID(), last_oid, last_serial)
# Nothing remains, so the replication for this partition is # Nothing remains, so the replication for this partition is
# finished. # finished.
replicator.setReplicationDone() replicator.setReplicationDone()
......
...@@ -207,13 +207,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -207,13 +207,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
calls = app.dm.mockGetNamedCalls('deleteTransaction') calls = app.dm.mockGetNamedCalls('deleteTransaction')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(tid_list[0]) calls[0].checkArgs(tid_list[0])
# Peer has no transaction above requested min, go on with object
# replication after deleting local transactions
conn = self.getFakeConnection()
known_tid_list = [tid_list[0], ]
app = self.getApp(conn=conn, tid_result=known_tid_list)
ReplicationHandler(app).answerTIDsFrom(conn, [])
self.checkAskPacket(conn, Packets.AskCheckSerialRange)
def test_answerTransactionInformation(self): def test_answerTransactionInformation(self):
conn = self.getFakeConnection() conn = self.getFakeConnection()
...@@ -276,22 +269,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -276,22 +269,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
(oid_4, tid_list[4]), (oid_4, tid_list[4]),
)) ))
self.assertEqual(actual_deletes, expected_deletes) self.assertEqual(actual_deletes, expected_deletes)
# Peer has no object above requested min, replication is over for this
# transaction once we deleted local content.
oid_dict = FakeDict(())
conn = self.getFakeConnection()
app = self.getApp(conn=conn, history_result={
oid_1: [tid_list[2]],
})
ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict)
calls = app.dm.mockGetNamedCalls('deleteObject')
actual_deletes = set(((x.getParam(0), x.getParam(1)) for x in calls))
expected_deletes = set((
(oid_1, tid_list[2]),
))
self.assertEqual(actual_deletes, expected_deletes)
calls = app.replicator.mockGetNamedCalls('setReplicationDone')
self.assertEqual(len(calls), 1)
def test_answerObject(self): def test_answerObject(self):
conn = self.getFakeConnection() conn = self.getFakeConnection()
...@@ -411,7 +388,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -411,7 +388,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
# ...and delete partition tail # ...and delete partition tail
calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove') calls = app.dm.mockGetNamedCalls('deleteTransactionsAbove')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(num_partitions, rid, max_tid) calls[0].checkArgs(num_partitions, rid, add64(max_tid, 1))
def test_answerCheckTIDRangeDifferentBigChunk(self): def test_answerCheckTIDRangeDifferentBigChunk(self):
min_tid = self.getNextTID() min_tid = self.getNextTID()
...@@ -562,7 +539,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -562,7 +539,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
# ...and delete partition tail # ...and delete partition tail
calls = app.dm.mockGetNamedCalls('deleteObjectsAbove') calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(num_partitions, rid, max_oid, max_serial) calls[0].checkArgs(num_partitions, rid, max_oid, add64(max_serial, 1))
def test_answerCheckSerialRangeDifferentBigChunk(self): def test_answerCheckSerialRangeDifferentBigChunk(self):
min_oid = self.getOID(1) min_oid = self.getOID(1)
...@@ -656,7 +633,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -656,7 +633,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
# ...and delete partition tail # ...and delete partition tail
calls = app.dm.mockGetNamedCalls('deleteObjectsAbove') calls = app.dm.mockGetNamedCalls('deleteObjectsAbove')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(num_partitions, rid, max_oid, max_serial) calls[0].checkArgs(num_partitions, rid, max_oid, add64(max_serial, 1))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -344,7 +344,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -344,7 +344,7 @@ class StorageDBTests(NeoUnitTestBase):
txn, objs = self.getTransaction([oid1]) txn, objs = self.getTransaction([oid1])
self.db.storeTransaction(tid, objs, txn) self.db.storeTransaction(tid, objs, txn)
self.db.finishTransaction(tid) self.db.finishTransaction(tid)
self.db.deleteTransactionsAbove(2, 0, tid1) self.db.deleteTransactionsAbove(2, 0, tid2)
# Right partition, below cutoff # Right partition, below cutoff
self.assertNotEqual(self.db.getTransaction(tid1, True), None) self.assertNotEqual(self.db.getTransaction(tid1, True), None)
# Wrong partition, above cutoff # Wrong partition, above cutoff
...@@ -381,7 +381,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -381,7 +381,7 @@ class StorageDBTests(NeoUnitTestBase):
txn, objs = self.getTransaction([oid1, oid2, oid3]) txn, objs = self.getTransaction([oid1, oid2, oid3])
self.db.storeTransaction(tid, objs, txn) self.db.storeTransaction(tid, objs, txn)
self.db.finishTransaction(tid) self.db.finishTransaction(tid)
self.db.deleteObjectsAbove(2, 0, oid1, tid1) self.db.deleteObjectsAbove(2, 0, oid1, tid2)
# Right partition, below cutoff # Right partition, below cutoff
self.assertNotEqual(self.db.getObject(oid1, tid=tid1), None) self.assertNotEqual(self.db.getObject(oid1, tid=tid1), None)
# Right partition, above tid cutoff # Right partition, above tid cutoff
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment