Commit 27229793 authored by Julien Muchembled

master: fix resumption of backup replication (internal or not)

Before, resumption waited for upstream activity until all partitions had been touched.
As a consequence, when upstream is idle, a backup cluster that was interrupted
while some cells were still late could remain stuck forever.
parent 7b2e6752
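
In short, the master now restores the backup state even when upstream reports no new
transaction, so that interrupted replication (to primary or secondary cells) is resumed
instead of waiting for the next upstream commit. Below is a minimal standalone sketch of
that decision, with hypothetical names rather than the real NEO API; the actual change is
in BackupHandler.answerLastTransaction and BackupApplication.invalidatePartitions, shown
in the diff that follows.

    def on_answer_last_transaction(prev_tid, tid):
        # Hypothetical helper: prev_tid is the last transaction already known
        # to the backup cluster, tid is the last transaction committed upstream.
        if prev_tid < tid:
            # Upstream moved on while the backup was away: force replication
            # of all partitions up to 'tid'.
            return 'replicate all partitions up to %x' % tid
        if prev_tid == tid:
            # Upstream is idle. Before this commit nothing happened here, so a
            # backup interrupted with late cells stayed stuck; now the backup
            # state is restored the same way, which resumes replication.
            return 'replicate all partitions up to %x' % tid
        # prev_tid > tid can only mean the upstream database was truncated.
        raise RuntimeError("upstream DB truncated")

    print(on_answer_last_transaction(10, 10))  # resumes work even though upstream is idle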
@@ -206,12 +206,21 @@ class BackupApplication(object):
             except IndexError:
                 last_max_tid = prev_tid
             if offset in partition_set:
-                self.tid_list[offset].append(tid)
+                primary_list = []
                 node_list = []
-                for cell in pt.getCellList(offset, readable=True):
+                cell_list = pt.getCellList(offset, readable=True)
+                for cell in cell_list:
                     node = cell.getNode()
                     assert node.isConnected(), node
                     if cell.backup_tid == prev_tid:
+                        if prev_tid == tid:
+                            # Connecting to upstream: any node that is
+                            # up-to-date wrt upstream is a candidate for being
+                            # primary.
+                            assert self.ignore_invalidations
+                            if app.isStorageReady(node.getUUID()):
+                                primary_list.append(node)
+                            continue
                         # Let's given 4 TID t0,t1,t2,t3: if a cell is only
                         # modified by t0 & t3 and has all data for t0, 4 values
                         # are possible for its 'backup_tid' until it replicates
@@ -231,12 +240,19 @@ class BackupApplication(object):
                             cell.backup_tid, last_max_tid, prev_tid, tid)
                     if app.isStorageReady(node.getUUID()):
                         node_list.append(node)
-                assert node_list
-                trigger_set.update(node_list)
                 # Make sure we have a primary storage for this partition.
                 if offset not in self.primary_partition_dict:
                     self.primary_partition_dict[offset] = \
-                        random.choice(node_list)
+                        random.choice(primary_list or node_list)
+                if node_list:
+                    self.tid_list[offset].append(tid)
+                    if primary_list:
+                        # Resume replication to secondary cells.
+                        self._triggerSecondary(
+                            self.primary_partition_dict[offset],
+                            offset, tid, cell_list)
+                    else:
+                        trigger_set.update(node_list)
             else:
                 # Partition not touched, so increase 'backup_tid' of all
                 # "up-to-date" replicas, without having to replicate.
@@ -335,15 +351,18 @@ class BackupApplication(object):
         if app.getClusterState() == ClusterStates.BACKINGUP:
             self.triggerBackup(node)
         if primary:
-            # Notify secondary storages that they can replicate from
-            # primary ones, even if they are already replicating.
-            p = Packets.Replicate(tid, '', {offset: node.getAddress()})
-            for cell in cell_list:
-                if max(cell.backup_tid, cell.replicating) < tid:
-                    cell.replicating = tid
-                    logging.debug(
-                        "ask %s to replicate partition %u up to %s from %s",
-                        uuid_str(cell.getUUID()), offset,
-                        dump(tid), uuid_str(node.getUUID()))
-                    cell.getNode().send(p)
+            self._triggerSecondary(node, offset, tid, cell_list)
         return result
+
+    def _triggerSecondary(self, node, offset, tid, cell_list):
+        # Notify secondary storages that they can replicate from
+        # primary ones, even if they are already replicating.
+        p = Packets.Replicate(tid, '', {offset: node.getAddress()})
+        for cell in cell_list:
+            if max(cell.backup_tid, cell.replicating) < tid:
+                cell.replicating = tid
+                logging.debug(
+                    "ask %s to replicate partition %u up to %s from %s",
+                    uuid_str(cell.getUUID()), offset,
+                    dump(tid), uuid_str(node.getUUID()))
+                cell.getNode().send(p)
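
A detail worth noting in the hunks above: random.choice(primary_list or node_list)
relies on an empty list being falsy, so a node already in sync with upstream is
preferred as primary and any ready node is only a fallback. A small self-contained
illustration (the 'S1'/'S2'/'S3' names are placeholders, not real node UUIDs):

    import random

    node_list = ['S1', 'S2']   # nodes that are at least ready
    primary_list = []          # none of them is up-to-date wrt upstream yet
    print(random.choice(primary_list or node_list))  # picks 'S1' or 'S2'

    primary_list = ['S3']      # one node is already in sync with upstream
    print(random.choice(primary_list or node_list))  # always picks 'S3'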
@@ -35,7 +35,7 @@ class BackupHandler(EventHandler):
     def answerLastTransaction(self, conn, tid):
         app = self.app
         prev_tid = app.app.getLastTransaction()
-        if prev_tid < tid:
+        if prev_tid <= tid:
             # Since we don't know which partitions were modified during our
             # absence, we must force replication on all storages. As long as
             # they haven't done this first check, our backup tid will remain
@@ -43,8 +43,12 @@ class BackupHandler(EventHandler):
             #   >= app.app.getLastTransaction()
             #   < tid
             # but passing 'tid' is good enough.
+            # A special case is when prev_tid == tid: even in this case, we
+            # must restore the state of the backup app so that any interrupted
+            # replication (internal or not) is resumed, otherwise the global
+            # backup_tid could remain stuck to an old tid if upstream is idle.
             app.invalidatePartitions(tid, tid, xrange(app.pt.getPartitions()))
-        elif prev_tid != tid:
+        else:
             raise RuntimeError("upstream DB truncated")
         app.ignore_invalidations = False
@@ -132,10 +132,14 @@ class ReplicationTests(NEOThreadedTest):
         self.assertEqual(backup.neoctl.getClusterState(),
             ClusterStates.RUNNING)
+        # Restart and switch to BACKINGUP mode again.
         backup.stop()
         backup.start()
         backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
         self.tic()
+        # Leave BACKINGUP mode when 1 replica is late. The cluster
+        # remains in STOPPING_BACKUP state until it catches up.
         with backup.master.filterConnection(*backup.storage_list) as f:
             f.add(delaySecondary)
             while not f.filtered_count:
@@ -147,6 +151,8 @@ class ReplicationTests(NEOThreadedTest):
         self.assertEqual(np*nr, self.checkBackup(backup,
             max_tid=backup.last_tid))
+        # Again but leave BACKINGUP mode when a storage node is
+        # receiving data from the upstream cluster.
         backup.stop()
         backup.start()
         backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
@@ -162,6 +168,48 @@ class ReplicationTests(NEOThreadedTest):
         self.assertEqual(np*nr, self.checkBackup(backup,
             max_tid=backup.last_tid))
+        storage = upstream.getZODBStorage()
+        # Check that replication from upstream is resumed even if
+        # upstream is idle.
+        backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
+        self.tic()
+        x = backup.master.backup_app.primary_partition_dict
+        new_oid_storage = x[0]
+        with upstream.moduloTID(next(p for p, n in x.iteritems()
+                                       if n is not new_oid_storage)), \
+             ConnectionFilter() as f:
+            f.delayAddObject()
+            # Transaction that touches 2 primary cells on 2 different
+            # nodes.
+            txn = transaction.Transaction()
+            tid = storage.load(ZERO_OID)[1]
+            storage.tpc_begin(txn)
+            storage.store(ZERO_OID, tid, '', '', txn)
+            storage.tpc_vote(txn)
+            storage.tpc_finish(txn)
+            self.tic()
+            # Stop when exactly 1 of the 2 cells is synced with
+            # upstream.
+            backup.stop()
+        backup.start()
+        self.assertEqual(np*nr, self.checkBackup(backup,
+            max_tid=backup.last_tid))
+        # Check that replication to secondary cells is resumed even if
+        # upstream is idle.
+        with backup.master.filterConnection(*backup.storage_list) as f:
+            f.add(delaySecondary)
+            txn = transaction.Transaction()
+            storage.tpc_begin(txn)
+            storage.tpc_finish(txn)
+            self.tic()
+            backup.stop()
+        backup.start()
+        self.assertEqual(np*nr, self.checkBackup(backup,
+            max_tid=backup.last_tid))
 
     @predictable_random()
     def testBackupNodeLost(self):
         """Check backup cluster can recover after random connection loss