Commit ad01f379 authored by Julien Muchembled's avatar Julien Muchembled

Backup bugfixes

- fix stopping backup cluster
- fix leaving backup mode, including truncating to consistent TID
- fix backup_tid on master and storages
parent 96897224
......@@ -527,14 +527,13 @@ class Application(object):
# remove transaction from manager
self.tm.remove(transaction_node.getUUID(), ttid)
assert self.last_transaction < tid, (self.last_transaction, tid)
self.setLastTransaction(tid)
def getLastTransaction(self):
return self.last_transaction
def setLastTransaction(self, tid):
ltid = self.last_transaction
assert tid >= ltid, (tid, ltid)
self.last_transaction = tid
def setStorageNotReady(self, uuid):
......
......@@ -16,6 +16,7 @@
import random, weakref
from bisect import bisect
from collections import defaultdict
from neo.lib import logging
from neo.lib.bootstrap import BootstrapManager
from neo.lib.connector import getConnectorHandler
......@@ -38,10 +39,17 @@ but are managed by another master in a different cluster.
When the cluster is in BACKINGUP state, its master acts like a client to the
master of the main cluster. It gets notified of new data thanks to invalidation,
and notifies in turn its storage nodes what/when to replicate.
Storages stay in UP_TO_DATE state, even if partitions are synchronized up to
different tids. Storage nodes remember they are in such state and when
switching into RUNNING state, the cluster cuts the DB at the last TID for which
we have all data.
switching into RUNNING state, the cluster cuts the DB at the "backup TID", which
is the last TID for which we have all data. This TID can't be guessed from
'trans' and 'obj' tables, like it is done in normal mode, so:
- The master must even notify storages of transactions that don't modify their
partitions: see Replicate packets without any source.
- 'backup_tid' properties exist in many places, on the master and the storages,
so that the DB can be made consistent again at any moment, without losing
any (or little) data.
Out of backup storage nodes assigned to a partition, one is chosen as primary
for that partition. It means only this node will fetch data from the upstream
......@@ -126,12 +134,20 @@ class BackupApplication(object):
except AttributeError:
pass
except StateChangedException, e:
if e.args[0] != ClusterStates.STOPPING_BACKUP:
raise
app.changeClusterState(*e.args)
tid = app.backup_tid
# Wait for non-primary partitions to catch up,
# so that all UP_TO_DATE cells are really UP_TO_DATE.
# Any unfinished replication from upstream will be truncated.
while pt.getCheckTid(xrange(pt.getPartitions())) < tid:
poll(1)
last_tid = app.getLastTransaction()
if last_tid < app.backup_tid:
if tid < last_tid:
logging.warning("Truncating at %s (last_tid was %s)",
dump(app.backup_tid), dump(last_tid))
p = Packets.AskTruncate(app.backup_tid)
p = Packets.AskTruncate(tid)
connection_list = []
for node in app.nm.getStorageList(only_identified=True):
conn = node.getConnection()
......@@ -140,7 +156,9 @@ class BackupApplication(object):
for conn in connection_list:
while conn.isPending():
poll(1)
app.setLastTransaction(app.backup_tid)
app.setLastTransaction(tid)
# If any error happened before reaching this line, we'd go back
# to backup mode, which is the right mode to recover.
del app.backup_tid
break
finally:
......@@ -176,27 +194,22 @@ class BackupApplication(object):
pt = app.pt
getByUUID = app.nm.getByUUID
trigger_set = set()
untouched_dict = defaultdict(dict)
for offset in xrange(pt.getPartitions()):
try:
last_max_tid = self.tid_list[offset][-1]
except IndexError:
last_max_tid = INVALID_TID
last_max_tid = prev_tid
if offset in partition_set:
self.tid_list[offset].append(tid)
node_list = []
for cell in pt.getCellList(offset, readable=True):
node = cell.getNode()
assert node.isConnected()
if not app.isStorageReady(node.getUUID()):
continue
node_list.append(node)
if last_max_tid <= cell.backup_tid:
# This is the last time we can increase
# 'backup_tid' without replication.
logging.debug(
"partition %u: updating backup_tid of %r to %s",
offset, cell, dump(prev_tid))
cell.backup_tid = prev_tid
assert cell.backup_tid < last_max_tid or \
cell.backup_tid == prev_tid
if app.isStorageReady(node.getUUID()):
node_list.append(node)
assert node_list
trigger_set.update(node_list)
# Make sure we have a primary storage for this partition.
......@@ -209,9 +222,16 @@ class BackupApplication(object):
for cell in pt.getCellList(offset, readable=True):
if last_max_tid <= cell.backup_tid:
cell.backup_tid = tid
logging.debug(
"partition %u: updating backup_tid of %r to %s",
offset, cell, dump(tid))
untouched_dict[cell.getNode()][offset] = None
elif last_max_tid <= cell.replicating:
# Same for 'replicating' to avoid useless orders.
logging.debug("silently update replicating order"
" of %s for partition %u, up to %s",
uuid_str(cell.getUUID()), offset, dump(tid))
cell.replicating = tid
for node, untouched_dict in untouched_dict.iteritems():
if app.isStorageReady(node.getUUID()):
node.notify(Packets.Replicate(tid, '', untouched_dict))
for node in trigger_set:
self.triggerBackup(node)
count = sum(map(len, self.tid_list))
......@@ -263,7 +283,10 @@ class BackupApplication(object):
try:
tid = add64(tid_list[bisect(tid_list, tid)], -1)
except IndexError:
tid = app.getLastTransaction()
last_tid = app.getLastTransaction()
if tid < last_tid:
tid = last_tid
node.notify(Packets.Replicate(tid, '', {offset: None}))
logging.debug("partition %u: updating backup_tid of %r to %s",
offset, cell, dump(tid))
cell.backup_tid = tid
......@@ -273,31 +296,31 @@ class BackupApplication(object):
primary_node = self.primary_partition_dict.get(offset)
primary = primary_node is node
result = None if primary else app.pt.setUpToDate(node, offset)
if app.getClusterState() == ClusterStates.BACKINGUP:
assert cell.isReadable()
if result: # was out-of-date
max_tid, = [x.backup_tid for x in cell_list
if x.getNode() is primary_node]
if tid < max_tid:
cell.replicating = max_tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(node.getUUID()), offset, dump(max_tid),
uuid_str(primary_node.getUUID()))
node.getConnection().notify(Packets.Replicate(max_tid,
'', {offset: primary_node.getAddress()}))
else:
assert cell.isReadable()
if result: # was out-of-date
max_tid, = [x.backup_tid for x in cell_list
if x.getNode() is primary_node]
if tid < max_tid:
cell.replicating = max_tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(node.getUUID()), offset, dump(max_tid),
uuid_str(primary_node.getUUID()))
node.notify(Packets.Replicate(max_tid, '',
{offset: primary_node.getAddress()}))
else:
if app.getClusterState() == ClusterStates.BACKINGUP:
self.triggerBackup(node)
if primary:
# Notify secondary storages that they can replicate from
# primary ones, even if they are already replicating.
p = Packets.Replicate(tid, '', {offset: node.getAddress()})
for cell in cell_list:
if max(cell.backup_tid, cell.replicating) < tid:
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from"
" %s", uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().getConnection().notify(p)
if primary:
# Notify secondary storages that they can replicate from
# primary ones, even if they are already replicating.
p = Packets.Replicate(tid, '', {offset: node.getAddress()})
for cell in cell_list:
if max(cell.backup_tid, cell.replicating) < tid:
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().notify(p)
return result
......@@ -184,6 +184,7 @@ class Application(object):
# start the operation. This cycle will be executed permanently,
# until the user explicitly requests a shutdown.
while True:
self.cluster_state = None
self.ready = False
self.operational = False
if self.master_node is None:
......@@ -207,6 +208,8 @@ class Application(object):
raise RuntimeError, 'should not reach here'
except OperationFailure, msg:
logging.error('operation stopped: %s', msg)
if self.cluster_state == ClusterStates.STOPPING_BACKUP:
self.dm.setBackupTID(None)
except PrimaryFailure, msg:
logging.error('primary master is down: %s', msg)
finally:
......@@ -309,13 +312,15 @@ class Application(object):
_poll()
finally:
del self.task_queue
self.replicator = Replicator(self)
# Abort any replication, whether we are feeding or out-of-date.
for node in self.nm.getStorageList(only_identified=True):
node.getConnection().close()
def changeClusterState(self, state):
self.cluster_state = state
if state == ClusterStates.STOPPING_BACKUP:
self.dm.setBackupTID(None)
self.replicator.stop()
def wait(self):
# change handler
......
......@@ -175,7 +175,9 @@ class DatabaseManager(object):
return util.bin(self.getConfiguration('backup_tid'))
def setBackupTID(self, backup_tid):
return self.setConfiguration('backup_tid', util.dump(backup_tid))
tid = util.dump(backup_tid)
logging.debug('backup_tid = %s', tid)
return self.setConfiguration('backup_tid', tid)
def getPartitionTable(self):
"""Return a whole partition table as a sequence of rows. Each row
......
......@@ -65,10 +65,11 @@ class MasterOperationHandler(BaseMasterHandler):
def replicate(self, conn, tid, upstream_name, source_dict):
self.app.replicator.backup(tid,
dict((p, (a, upstream_name))
dict((p, a and (a, upstream_name))
for p, a in source_dict.iteritems()))
def askTruncate(self, conn, tid):
self.app.replicator.cancel()
self.app.dm.truncate(tid)
conn.answer(Packets.AnswerTruncate())
......
......@@ -53,8 +53,8 @@ TODO: Packing and replication currently fail when then happen at the same time.
import random
from neo.lib import bootstrap, logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, Packets, \
INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection
from neo.lib.util import add64, dump
from .handlers.storage import StorageOperationHandler
......@@ -117,6 +117,14 @@ class Replicator(object):
return add64(tid, -1)
return ZERO_TID
def updateBackupTID(self):
dm = self.app.dm
tid = dm.getBackupTID()
if tid:
new_tid = self.getBackupTID()
if tid != new_tid:
dm.setBackupTID(new_tid)
def populate(self):
app = self.app
pt = app.pt
......@@ -126,9 +134,8 @@ class Replicator(object):
self.source_dict = {}
self.ttid_set = set()
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
backup_tid = app.dm.getBackupTID()
if backup_tid and last_tid < backup_tid:
last_tid = backup_tid
next_tid = app.dm.getBackupTID() or last_tid
next_tid = add64(next_tid, 1) if next_tid else ZERO_TID
outdated_list = []
for offset in xrange(pt.getPartitions()):
for cell in pt.getCellList(offset):
......@@ -143,7 +150,7 @@ class Replicator(object):
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
p.max_ttid = INVALID_TID
else:
p.next_trans = p.next_obj = last_tid
p.next_trans = p.next_obj = next_tid
p.max_ttid = None
if outdated_list:
self.app.master_conn.ask(Packets.AskUnfinishedTransactions(),
......@@ -182,9 +189,23 @@ class Replicator(object):
self.abort()
def backup(self, tid, source_dict):
for offset in source_dict:
self.replicate_dict[offset] = tid
self.source_dict.update(source_dict)
next_tid = None
for offset, source in source_dict.iteritems():
if source:
self.source_dict[offset] = source
self.replicate_dict[offset] = tid
elif offset != self.current_partition and \
offset not in self.replicate_dict:
# The master did its best to avoid useless replication orders
# but there may still be a few, and we may receive redundant
# update notification of backup_tid.
# So, we do nothing here if we are already replicating.
p = self.partition_dict[offset]
if not next_tid:
next_tid = add64(tid, 1)
p.next_trans = p.next_obj = next_tid
if next_tid:
self.updateBackupTID()
self._nextPartition()
def _nextPartition(self):
......@@ -207,7 +228,7 @@ class Replicator(object):
try:
addr, name = self.source_dict[offset]
except KeyError:
assert self.app.pt.getCell(offset, self.app.uuid).isOutOfDate()
assert app.pt.getCell(offset, app.uuid).isOutOfDate()
node = random.choice([cell.getNode()
for cell in app.pt.getCellList(offset, readable=True)
if cell.getNodeState() == NodeStates.RUNNING])
......@@ -266,17 +287,10 @@ class Replicator(object):
p = self.partition_dict[offset]
max_tid = self.replicate_tid
if min_tid:
if p.next_obj < self.next_backup_tid:
self.app.dm.setBackupTID(min_tid)
p.next_obj = min_tid
else:
min_tid = p.next_obj
p.next_trans = p.next_obj = add64(max_tid, 1)
if self.app.dm.getBackupTID() is None or \
self.app.pt.getCell(offset, self.app.uuid).isOutOfDate():
self.next_backup_tid = ZERO_TID
else:
self.next_backup_tid = self.getBackupTID()
p.next_obj = min_tid
p.next_trans = add64(max_tid, 1)
object_dict = {}
for serial, oid in self.app.dm.getReplicationObjectList(min_tid,
max_tid, FETCH_COUNT, offset, min_oid):
......@@ -290,12 +304,10 @@ class Replicator(object):
def finish(self):
offset = self.current_partition
tid = self.replicate_tid
del self.current_partition, self.replicate_tid, self.next_backup_tid
del self.current_partition, self.replicate_tid
p = self.partition_dict[offset]
p.next_obj = add64(tid, 1)
dm = self.app.dm
if dm.getBackupTID() is not None:
dm.setBackupTID(self.getBackupTID())
self.updateBackupTID()
if not p.max_ttid:
p = Packets.NotifyReplicationDone(offset, tid)
self.app.master_conn.notify(p)
......@@ -310,8 +322,6 @@ class Replicator(object):
del self.current_partition
logging.warning('replication aborted for partition %u%s',
offset, message and ' (%s)' % message)
if self.app.master_node is None:
return
if offset in self.partition_dict:
# XXX: Try another partition if possible, to increase probability to
# connect to another node. It would be better to explicitely
......@@ -325,3 +335,30 @@ class Replicator(object):
self._nextPartition()
else: # partition removed
self._nextPartition()
def cancel(self):
offset = self.current_partition
if offset is not None:
logging.info('cancel replication of partition %u', offset)
del self.current_partition
try:
self.replicate_dict.setdefault(offset, self.replicate_tid)
del self.replicate_tid
except AttributeError:
pass
self.getCurrentConnection().close()
def stop(self):
d = None, None
# Cancel all replication orders from upstream cluster.
for offset in self.replicate_dict.keys():
addr, name = self.source_dict.get(offset, d)
if name:
tid = self.replicate_dict.pop(offset)
logging.info('cancel replication of partition %u from %r'
' up to %s', offset, addr, dump(tid))
# Close any open connection to an upstream storage,
# possibly aborting current replication.
node = self.current_node
if node is not None is node.getUUID():
self.cancel()
......@@ -22,29 +22,31 @@ import transaction
import unittest
from neo.lib import logging
from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator
from neo.storage.transactions import TransactionManager, \
DelayedError, ConflictError
from neo.lib.connection import MTClientConnection
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64
from . import NEOCluster, NEOThreadedTest, Patch, predictable_random
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, Patch, \
predictable_random
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
class ReplicationTests(NEOThreadedTest):
def checksumPartition(self, storage, partition):
def checksumPartition(self, storage, partition, max_tid=MAX_TID):
dm = storage.dm
args = partition, None, ZERO_TID, MAX_TID
args = partition, None, ZERO_TID, max_tid
return dm.checkTIDRange(*args), \
dm.checkSerialRange(min_oid=ZERO_OID, *args)
def checkPartitionReplicated(self, source, destination, partition):
self.assertEqual(self.checksumPartition(source, partition),
self.checksumPartition(destination, partition))
def checkPartitionReplicated(self, source, destination, partition, **kw):
self.assertEqual(self.checksumPartition(source, partition, **kw),
self.checksumPartition(destination, partition, **kw))
def checkBackup(self, cluster):
def checkBackup(self, cluster, **kw):
upstream_pt = cluster.upstream.primary_master.pt
pt = cluster.primary_master.pt
np = pt.getPartitions()
......@@ -56,7 +58,7 @@ class ReplicationTests(NEOThreadedTest):
for partition in pt.getAssignedPartitionList(storage.uuid):
cell_list = upstream_pt.getCellList(partition, readable=True)
source = source_dict[random.choice(cell_list).getUUID()]
self.checkPartitionReplicated(source, storage, partition)
self.checkPartitionReplicated(source, storage, partition, **kw)
checked += 1
return checked
......@@ -105,6 +107,47 @@ class ReplicationTests(NEOThreadedTest):
ClusterStates.RUNNING)
finally:
backup.stop()
def delaySecondary(conn, packet):
if isinstance(packet, Packets.Replicate):
tid, upstream_name, source_dict = packet.decode()
return not upstream_name and all(source_dict.itervalues())
backup.reset()
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
while not f.filtered_count:
importZODB(1)
upstream.client.setPoll(0)
backup.tic()
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
backup.tic()
backup.tic(force=1)
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
finally:
backup.stop()
backup.reset()
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
with ConnectionFilter() as f:
f.add(lambda conn, packet: conn.getUUID() is None and
isinstance(packet, Packets.AddObject))
while not f.filtered_count:
importZODB(1)
upstream.client.setPoll(0)
backup.tic()
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
backup.tic()
backup.tic(force=1)
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
finally:
backup.stop()
finally:
upstream.stop()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment