Commit 560e4fb1 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix an AssertionError in internal replication

Traceback (most recent call last):
  ...
  File "neo/storage/handlers/storage.py", line 111, in answerFetchObjects
    self.app.replicator.finish()
  File "neo/storage/replicator.py", line 370, in finish
    self._nextPartition()
  File "neo/storage/replicator.py", line 279, in _nextPartition
    assert app.pt.getCell(offset, app.uuid).isOutOfDate()
AssertionError

The scenario is:
1. partition A: start of replication, with unfinished transactions
2. partition A: all unfinished transactions are finished
3. partition A: end of replication with ReplicationDone notification
4. replication of partition B
5. partition A: AssertionError when starting replication

The bug is that in 3, the partition A is partially replicated and the storage
node must not notify the master.
parent 61f72f9b
...@@ -276,7 +276,8 @@ class Replicator(object): ...@@ -276,7 +276,8 @@ class Replicator(object):
try: try:
addr, name = self.source_dict[offset] addr, name = self.source_dict[offset]
except KeyError: except KeyError:
assert app.pt.getCell(offset, app.uuid).isOutOfDate() assert app.pt.getCell(offset, app.uuid).isOutOfDate(), (
offset, app.pt.getCell(offset, app.uuid).getState())
node = random.choice([cell.getNode() node = random.choice([cell.getNode()
for cell in app.pt.getCellList(offset, readable=True) for cell in app.pt.getCellList(offset, readable=True)
if cell.getNodeState() == NodeStates.RUNNING]) if cell.getNodeState() == NodeStates.RUNNING])
...@@ -360,7 +361,8 @@ class Replicator(object): ...@@ -360,7 +361,8 @@ class Replicator(object):
p = self.partition_dict[offset] p = self.partition_dict[offset]
p.next_obj = add64(tid, 1) p.next_obj = add64(tid, 1)
self.updateBackupTID() self.updateBackupTID()
if p.max_ttid: if p.max_ttid or offset in self.replicate_dict and \
offset not in self.source_dict:
logging.debug("unfinished transactions: %r", self.ttid_set) logging.debug("unfinished transactions: %r", self.ttid_set)
else: else:
self.app.tm.replicated(offset, tid) self.app.tm.replicated(offset, tid)
......
...@@ -32,7 +32,7 @@ from neo.lib.connection import ClientConnection ...@@ -32,7 +32,7 @@ from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager from neo.lib.event import EventManager
from neo.lib.protocol import CellStates, ClusterStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64 from neo.lib.util import p64, u64
from .. import expectedFailure, Patch from .. import expectedFailure, Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \ from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
predictable_random, with_cluster predictable_random, with_cluster
...@@ -452,39 +452,53 @@ class ReplicationTests(NEOThreadedTest): ...@@ -452,39 +452,53 @@ class ReplicationTests(NEOThreadedTest):
cluster.join((s0,)) cluster.join((s0,))
t0, t1, t2 = c.db().storage.iterator() t0, t1, t2 = c.db().storage.iterator()
@with_cluster(start_cluster=0, replicas=1) @with_cluster(start_cluster=0, replicas=1, partitions=2)
def testReplicationBlockedByUnfinished(self, cluster): def testReplicationBlockedByUnfinished1(self, cluster,
if 1: delay_replication=False):
s0, s1 = cluster.storage_list s0, s1 = cluster.storage_list
cluster.start(storage_list=(s0,)) cluster.start(storage_list=(s0,))
storage = cluster.getZODBStorage() storage = cluster.getZODBStorage()
oid = storage.new_oid() oid = storage.new_oid()
with ConnectionFilter() as f, cluster.moduloTID(1 - u64(oid) % 2):
if delay_replication:
delay_replication = f.delayAnswerFetchObjects()
tid = None tid = None
expected = 'UO' expected = 'U|U'
for n in 1, 0: for n in xrange(3):
# On first iteration, the transaction will block replication # On second iteration, the transaction will block replication
# until tpc_finish. # until tpc_finish.
# We do a second iteration as a quick check that the cluster # We do a last iteration as a quick check that the cluster
# remains functional after such a scenario. # remains functional after such a scenario.
txn = transaction.Transaction() txn = transaction.Transaction()
storage.tpc_begin(txn) storage.tpc_begin(txn)
tid = storage.store(oid, tid, 'foo', '', txn) tid = storage.store(oid, tid, str(n), '', txn)
if n: if n == 1:
# Start the outdated storage. # Start the outdated storage.
s1.start() s1.start()
self.tic() self.tic()
cluster.enableStorageList((s1,)) cluster.enableStorageList((s1,))
cluster.neoctl.tweakPartitionTable() cluster.neoctl.tweakPartitionTable()
expected = 'UO|UO'
self.tic() self.tic()
self.assertPartitionTable(cluster, expected) self.assertPartitionTable(cluster, expected)
storage.tpc_vote(txn) storage.tpc_vote(txn)
self.assertPartitionTable(cluster, expected) self.assertPartitionTable(cluster, expected)
tid = storage.tpc_finish(txn) tid = storage.tpc_finish(txn)
if n == 1:
if delay_replication:
self.tic()
self.assertPartitionTable(cluster, expected)
f.remove(delay_replication)
delay_replication = None
self.tic() # replication resumes and ends self.tic() # replication resumes and ends
expected = 'UU' expected = 'UU|UU'
self.assertPartitionTable(cluster, expected) self.assertPartitionTable(cluster, expected)
self.assertEqual(cluster.neoctl.getClusterState(), self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING) ClusterStates.RUNNING)
self.checkPartitionReplicated(s0, s1, 0)
def testReplicationBlockedByUnfinished2(self):
self.testReplicationBlockedByUnfinished1(True)
@with_cluster(partitions=5, replicas=2, storage_count=3) @with_cluster(partitions=5, replicas=2, storage_count=3)
def testCheckReplicas(self, cluster): def testCheckReplicas(self, cluster):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment