Commit d3c8b76d authored by Julien Muchembled's avatar Julien Muchembled

Safer DB truncation, new 'truncate' ctl command

With the previous commit, the request to truncate the DB was not stored
persistently, which means that this operation was still vulnerable to the case
where the master is restarted after some nodes, but not all, have already
truncated. The master didn't have the information to fix this and the result
was a DB partially truncated.

-> On a Truncate packet, a storage node only stores the tid somewhere, to send
   it back to the master, which stays in RECOVERING state as long as any node
   has a different value than that of the node with the latest partition table.

We also want to make sure that there is no unfinished data, because a user may
truncate at a tid higher than a locked one.

-> Truncation is now effective at the end on the VERIFYING phase, just before
   returning the last ids to the master.

At last all nodes should be truncated, to avoid that an offline node comes back
with a different history. Currently, this would not be an issue since
replication is always restart from the beginning, but later we'd like they
remember where they stopped to replicate.

-> If a truncation is requested, the master waits for all nodes to be pending,
   even if it was previously started (the user can still force the cluster to
   start with neoctl). And any lost node during verification also causes the
   master to go back to recovery.

Obviously, the protocol has been changed to split the LastIDs packet and
introduce a new Recovery, since it does not make sense anymore to ask last ids
during recovery.
parent 3e3eab5b
......@@ -141,9 +141,7 @@
- Make admin node able to monitor multiple clusters simultaneously
- Send notifications (ie: mail) when a storage or master node is lost
- Add ctl command to truncate DB at arbitrary TID. 'Truncate' message
can be reused. There should also be a way to list last transactions,
like fstail for FileStorage.
- Add ctl command to list last transactions, like fstail for FileStorage.
- Use another mock library: Python 3.3+ has unittest.mock, which is
......@@ -65,10 +65,12 @@ class AdminEventHandler(EventHandler):
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
askRecovery = forward_ask(Packets.AskRecovery)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
class MasterEventHandler(EventHandler):
......@@ -102,11 +102,17 @@ class PrimaryNotificationsHandler(MTEventHandler):
if app.master_conn is None:
oid_list = app._cache.clear_current()
db = app.getDB()
if db is not None:
db.invalidate(app.last_tid and
add64(app.last_tid, 1), oid_list)
if app.last_tid < ltid:
oid_list = app._cache.clear_current()
db is None or db.invalidate(
app.last_tid and add64(app.last_tid, 1),
# The DB was truncated. It happens so
# rarely that we don't need to optimize.
db is None or db.invalidateCache()
app.last_tid = ltid
......@@ -23,7 +23,7 @@ class ElectionFailure(NeoException):
class PrimaryFailure(NeoException):
class OperationFailure(NeoException):
class StoppedOperation(NeoException):
class DatabaseFailure(NeoException):
......@@ -722,16 +722,24 @@ class ReelectPrimary(Packet):
Force a re-election of a primary master node. M -> M.
class Recovery(Packet):
Ask all data needed by master to recover. PM -> S, S -> PM.
_answer = PStruct('answer_recovery',
class LastIDs(Packet):
Ask the last OID, the last TID and the last Partition Table ID so that
a master recover. PM -> S, S -> PM.
Ask the last OID/TID so that a master can initialize its TransactionManager.
PM -> S, S -> PM.
_answer = PStruct('answer_last_ids',
class PartitionTable(Packet):
......@@ -1470,13 +1478,14 @@ class ReplicationDone(Packet):
class Truncate(Packet):
XXX: Used for both make storage consistent and leave backup mode
M -> S
Request DB to be truncated. Also used to leave backup mode.
_fmt = PStruct('truncate',
_answer = Error
StaticRegistry = {}
def register(request, ignore_when_closed=None):
......@@ -1594,6 +1603,8 @@ class Packets(dict):
NotifyNodeInformation = register(
AskRecovery, AnswerRecovery = register(
AskLastIDs, AnswerLastIDs = register(
AskPartitionTable, AnswerPartitionTable = register(
......@@ -24,7 +24,7 @@ from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation
class StateChangedException(Exception): pass
......@@ -45,6 +45,7 @@ class Application(BaseApplication):
backup_tid = None
backup_app = None
uuid = None
truncate_tid = None
def __init__(self, config):
super(Application, self).__init__(
......@@ -331,12 +332,9 @@ class Application(BaseApplication):
# machines but must not start automatically: otherwise, each storage
# node would diverge.
self._startup_allowed = False
self.truncate_tid = None
while True:
# Automatic restart if we become non-operational.
self._startup_allowed = True
if not self.backup_tid:
......@@ -346,10 +344,13 @@ class Application(BaseApplication):
if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
self.truncate_tid = self.backup_app.provideService()
except OperationFailure:
truncate = Packets.Truncate(
except StoppedOperation, e:
logging.critical('No longer operational')
self.truncate_tid = None
truncate = Packets.Truncate(*e.args) if e.args else None
# Automatic restart except if we truncate or retry to.
self._startup_allowed = not (self.truncate_tid or truncate)
node_list = []
for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient():
......@@ -357,7 +358,10 @@ class Application(BaseApplication):
if node.isClient():
elif node.isRunning():
if truncate:
if node.isRunning():
......@@ -475,7 +479,7 @@ class Application(BaseApplication):
# wait for all transaction to be finished
except OperationFailure:
except StoppedOperation:
logging.critical('No longer operational')"asking remaining nodes to shutdown")
......@@ -152,17 +152,19 @@ class BackupApplication(object):
assert tid != ZERO_TID
logging.warning("Truncating at %s (last_tid was %s)",
dump(app.backup_tid), dump(last_tid))
# We will really truncate so do not start automatically
# if there's any missing storage.
app._startup_allowed = False
# We will do a dummy truncation, just to leave backup mode,
# so it's fine to start automatically if there's any
# missing storage.
# XXX: Consider using another method to leave backup mode,
# at least when there's nothing to truncate. Because
# in case of StoppedOperation during VERIFYING state,
# this flag will be wrongly set to False.
app._startup_allowed = True
# If any error happened before reaching this line, we'd go back
# to backup mode, which is the right mode to recover.
del app.backup_tid
# We will go through a recovery phase in order to reset the
# transaction manager and this is only possible if storages
# already know that we left backup mode. To that purpose, we
# always stop operation with a tid, even if there's nothing to
# truncate.
# Now back to RECOVERY...
return tid
del self.primary_partition_dict, self.tid_list
......@@ -15,7 +15,7 @@
# along with this program. If not, see <>.
from neo.lib import logging
from neo.lib.exception import OperationFailure
from neo.lib.exception import StoppedOperation
from neo.lib.handler import EventHandler
from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
......@@ -66,13 +66,16 @@ class MasterHandler(EventHandler):
state =
def askLastIDs(self, conn):
def askRecovery(self, conn):
app =
app.backup_tid and,
def askLastIDs(self, conn):
tm =
conn.answer(Packets.AnswerLastIDs(tm.getLastOID(), tm.getLastTID()))
def askLastTransaction(self, conn):
......@@ -130,9 +133,11 @@ class BaseServiceHandler(MasterHandler):'drop a pending node from the node manager')
if app.truncate_tid:
raise StoppedOperation
if not
raise OperationFailure("cannot continue operation")
raise StoppedOperation
def notifyReady(self, conn):
......@@ -19,6 +19,7 @@ import random
from . import MasterHandler
from import StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
......@@ -159,6 +160,13 @@ class AdministrationHandler(MasterHandler):
map(app.nm.getByUUID, uuid_list)))
def truncate(self, conn, tid):
app =
if app.cluster_state != ClusterStates.RUNNING:
raise ProtocolError('Can not truncate in this state')
raise StoppedOperation(tid)
def checkReplicas(self, conn, partition_dict, min_tid, max_tid):
app =
pt =
......@@ -16,7 +16,7 @@
from neo.lib import logging
from neo.lib.protocol import CellStates, ClusterStates, Packets, ProtocolError
from neo.lib.exception import OperationFailure
from neo.lib.exception import StoppedOperation
from import PartitionTableException
from . import BaseServiceHandler
......@@ -76,7 +76,7 @@ class StorageServiceHandler(BaseServiceHandler):
if not
raise OperationFailure('cannot continue operation')
raise StoppedOperation
def notifyReplicationDone(self, conn, offset, tid):
app =
......@@ -15,7 +15,6 @@
# along with this program. If not, see <>.
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from .handlers import MasterHandler
......@@ -30,6 +29,7 @@ class RecoveryManager(MasterHandler):
self.target_ptid = None
self.ask_pt = []
self.backup_tid_dict = {}
self.truncate_dict = {}
def getHandler(self):
return self
......@@ -49,7 +49,6 @@ class RecoveryManager(MasterHandler):
"""'begin the recovery of the status')
app =
pt =
......@@ -64,8 +63,12 @@ class RecoveryManager(MasterHandler):
node_list = pt.getOperationalNodeSet()
if app._startup_allowed:
node_list = [node for node in node_list if node.isPending()]
elif not all(node.isPending() for node in node_list):
elif node_list:
# we want all nodes to be there if we're going to truncate
if app.truncate_tid:
node_list = pt.getNodeSet()
if not all(node.isPending() for node in node_list):
elif app._startup_allowed or app.autostart:
# No partition table and admin allowed startup, we are
# creating a new cluster out of all pending nodes.
......@@ -77,6 +80,17 @@ class RecoveryManager(MasterHandler):
if node_list and not any(node.getConnection().isPending()
for node in node_list):
if pt.filled():
if app.truncate_tid:
node_list = app.nm.getIdentifiedList(pool_set={uuid
for uuid, tid in self.truncate_dict.iteritems()
if not tid or app.truncate_tid < tid})
if node_list:
truncate = Packets.Truncate(app.truncate_tid)
for node in node_list:
conn = node.getConnection()
self.connectionCompleted(conn, False)
node_list = pt.getConnectedNodeList()
......@@ -101,12 +115,13 @@ class RecoveryManager(MasterHandler):
app.backup_tid = pt.getBackupTid()
logging.debug('cluster starts with loid=%s and this partition table :',
logging.debug('cluster starts this partition table:')
def connectionLost(self, conn, new_state):
uuid = conn.getUUID()
self.backup_tid_dict.pop(uuid, None)
self.truncate_dict.pop(uuid, None)
node =
i = self.ask_pt.index(uuid)
......@@ -129,40 +144,38 @@ class RecoveryManager(MasterHandler):[node])
def connectionCompleted(self, conn, new):
tid =
if tid:
# ask the last IDs to perform the recovery
def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
tm =
def answerRecovery(self, conn, ptid, backup_tid, truncate_tid):
uuid = conn.getUUID()
if self.target_ptid <= lptid:
if self.target_ptid <= ptid:
# Maybe a newer partition table.
if self.target_ptid == lptid and self.ask_pt:
if self.target_ptid == ptid and self.ask_pt:
# Another node is already asked.
elif self.target_ptid < lptid or self.ask_pt is not ():
elif self.target_ptid < ptid or self.ask_pt is not ():
# No node asked yet for the newest partition table.
self.target_ptid = lptid
self.target_ptid = ptid
self.ask_pt = [uuid]
self.backup_tid_dict[uuid] = backup_tid
self.truncate_dict[uuid] = truncate_tid
def answerPartitionTable(self, conn, ptid, row_list):
# If this is not from a target node, ignore it.
if ptid == self.target_ptid:
app =
new_nodes =, row_list,
new_nodes =, row_list, app.nm)
except IndexError:
raise ProtocolError('Invalid offset')
Packets.SendPartitionTable(ptid, row_list))
self.ask_pt = () = self.backup_tid_dict[conn.getUUID()]
uuid = conn.getUUID()
app.backup_tid = self.backup_tid_dict[uuid]
app.truncate_tid = self.truncate_dict[uuid]
def _notifyAdmins(self, *packets):
for node in
......@@ -59,9 +59,18 @@ class VerificationManager(BaseServiceHandler):
def run(self):
app =
if not app.backup_tid:
# This is where storages truncate if requested:
# - we make sure all nodes are running with a truncate_tid value saved
# - there's no unfinished data
# - just before they return the last tid/oid
[x for x in app.nm.getIdentifiedList() if x.isStorage()])
# Just to not return meaningless information in AnswerRecovery.
app.truncate_tid = None
def verifyData(self):
app =
......@@ -97,33 +106,18 @@ class VerificationManager(BaseServiceHandler):
# Finish all transactions for which we know that tpc_finish was called
# but not fully processed. This may include replicas with transactions
# that were not even locked.
all_set = set()
for ttid, tid in self._locked_dict.iteritems():
uuid_set = self._voted_dict.get(ttid)
if uuid_set:
all_set |= uuid_set
packet = Packets.ValidateTransaction(ttid, tid)
for node in getIdentifiedList(pool_set=uuid_set):
# Ask last oid/tid again for nodes that recovers locked transactions.
# In fact, this is mainly for the last oid since the last tid can be
# deduced from max(self._locked_dict.values()).
# If getLastIDs is not always instantaneous for some backends, we
# should split AskLastIDs to not ask the last oid/tid at the end of
# recovery phase (and instead ask all nodes once, here).
# With this request, we also prefer to make sure all nodes validate
# successfully before switching to RUNNING state.
def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
def answerLastIDs(self, conn, loid, ltid):
tm =
ptid =
assert lptid < ptid if None != lptid != ptid else not backup_tid
def answerLockedTransactions(self, conn, tid_dict):
uuid = conn.getUUID()
......@@ -37,6 +37,7 @@ action_dict = {
'tweak': 'tweakPartitionTable',
'drop': 'dropNode',
'kill': 'killNode',
'truncate': 'truncate',
uuid_int = (lambda ns: lambda uuid:
......@@ -85,11 +86,14 @@ class TerminalNeoCTL(object):
Get last ids.
assert not params
r = self.neoctl.getLastIds()
if r[3]:
return "last_tid = 0x%x" % u64(self.neoctl.getLastTransaction())
return "last_oid = 0x%x\nlast_tid = 0x%x\nlast_ptid = %u" % (
u64(r[0]), u64(r[1]), r[2])
ptid, backup_tid, truncate_tid = self.neoctl.getRecovery()
if backup_tid:
ltid = self.neoctl.getLastTransaction()
r = "backup_tid = 0x%x" % u64(backup_tid)
loid, ltid = self.neoctl.getLastIds()
r = "last_oid = 0x%x" % u64(loid)
return r + "\nlast_tid = 0x%x\nlast_ptid = %u" % (u64(ltid), ptid)
def getPartitionRowList(self, params):
......@@ -193,6 +197,19 @@ class TerminalNeoCTL(object):
return uuid_str(self.neoctl.getPrimary())
def truncate(self, params):
Truncate the database at the given tid.
The cluster must be in RUNNING state, without any pending transaction.
This causes the cluster to go back in RECOVERING state, waiting all
nodes to be pending (do not use 'start' command unless you're sure
the missing nodes don't need to be truncated).
Parameters: tid
def checkReplicas(self, params):
Test whether partitions have corrupted metadata
......@@ -61,3 +61,4 @@ class CommandEventHandler(EventHandler):
answerPrimary = __answer(Packets.AnswerPrimary)
answerLastIDs = __answer(Packets.AnswerLastIDs)
answerLastTransaction = __answer(Packets.AnswerLastTransaction)
answerRecovery = __answer(Packets.AnswerRecovery)
......@@ -120,6 +120,12 @@ class NeoCTL(BaseApplication):
raise RuntimeError(response)
return response[1]
def getRecovery(self):
response = self.__ask(Packets.AskRecovery())
if response[0] != Packets.AnswerRecovery:
raise RuntimeError(response)
return response[1:]
def getNodeList(self, node_type=None):
Get a list of nodes, filtering with given type.
......@@ -163,6 +169,12 @@ class NeoCTL(BaseApplication):
raise RuntimeError(response)
return response[1]
def truncate(self, tid):
response = self.__ask(Packets.Truncate(tid))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
def checkReplicas(self, *args):
response = self.__ask(Packets.CheckReplicas(*args))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
......@@ -23,7 +23,7 @@ from neo.lib.protocol import uuid_str, \
CellStates, ClusterStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.connection import ListeningConnection
from neo.lib.exception import OperationFailure, PrimaryFailure
from neo.lib.exception import StoppedOperation, PrimaryFailure
from import PartitionTable
from neo.lib.util import dump
from neo.lib.bootstrap import BootstrapManager
......@@ -196,7 +196,7 @@ class Application(BaseApplication):
raise RuntimeError, 'should not reach here'
except OperationFailure, msg:
except StoppedOperation, msg:
logging.error('operation stopped: %s', msg)
except PrimaryFailure, msg:
logging.error('primary master is down: %s', msg)
......@@ -194,10 +194,18 @@ class DatabaseManager(object):
def getBackupTID(self):
return util.bin(self.getConfiguration('backup_tid'))
def setBackupTID(self, backup_tid):
tid = util.dump(backup_tid)
def _setBackupTID(self, tid):
tid = util.dump(tid)
logging.debug('backup_tid = %s', tid)
return self.setConfiguration('backup_tid', tid)
return self._setConfiguration('backup_tid', tid)
def getTruncateTID(self):
return util.bin(self.getConfiguration('truncate_tid'))
def _setTruncateTID(self, tid):
tid = util.dump(tid)
logging.debug('truncate_tid = %s', tid)
return self._setConfiguration('truncate_tid', tid)
def _setPackTID(self, tid):
self._setConfiguration('_pack_tid', tid)
......@@ -502,11 +510,14 @@ class DatabaseManager(object):
and max_tid (included)"""
raise NotImplementedError
def truncate(self, tid):
assert tid not in (None, ZERO_TID), tid
for partition in xrange(self.getNumPartitions()):
self._deleteRange(partition, tid)
self.setBackupTID(None) # this also commits
def truncate(self):
tid = self.getTruncateTID()
if tid:
assert tid != ZERO_TID, tid
for partition in xrange(self.getNumPartitions()):
self._deleteRange(partition, tid)
def getTransaction(self, tid, all = False):
"""Return a tuple of the list of OIDs, user information,
......@@ -16,7 +16,7 @@
from neo.lib import logging
from neo.lib.handler import EventHandler
from neo.lib.exception import PrimaryFailure, OperationFailure
from neo.lib.exception import PrimaryFailure, StoppedOperation
from neo.lib.protocol import uuid_str, NodeStates, NodeTypes, Packets
class BaseMasterHandler(EventHandler):
......@@ -27,7 +27,7 @@ class BaseMasterHandler(EventHandler):
raise PrimaryFailure('connection lost')
def stopOperation(self, conn):
raise OperationFailure('operation stopped')
raise StoppedOperation
def reelectPrimary(self, conn):
raise PrimaryFailure('re-election occurs')
......@@ -48,7 +48,7 @@ class BaseMasterHandler(EventHandler):
erase = state == NodeStates.DOWN
elif state == NodeStates.HIDDEN:
raise OperationFailure
raise StoppedOperation
elif node_type == NodeTypes.CLIENT and state != NodeStates.RUNNING:'Notified of non-running client, abort (%s)',
......@@ -46,16 +46,23 @@ class InitializationHandler(BaseMasterHandler):, cell_list, reset=True)
def truncate(self, conn, tid):
dm =
def askLastIDs(self, conn):
def askRecovery(self, conn):
app =
ltid, _, _, loid =
def askLastIDs(self, conn):
dm =
ltid, _, _, loid = dm.getLastIDs()
conn.answer(Packets.AnswerLastIDs(loid, ltid))
def askPartitionTable(self, conn):
pt =
......@@ -80,4 +87,5 @@ class InitializationHandler(BaseMasterHandler):
tid = dm.getLastIDs()[0] or ZERO_TID
tid = None
......@@ -28,7 +28,8 @@ class MasterOperationHandler(BaseMasterHandler):
assert and backup
dm =
if not dm.getBackupTID():
dm.setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
def notifyTransactionFinished(self, conn, *args, **kw):*args, **kw)
......@@ -128,7 +128,8 @@ class Replicator(object):
if tid:
new_tid = self.getBackupTID()
if tid != new_tid:
def populate(self):
app =
......@@ -67,29 +67,6 @@ class MasterRecoveryTests(NeoUnitTestBase):
def test_09_answerLastIDs(self):
recovery = self.recovery
uuid = self.identifyToMasterNode()
oid1 = self.getOID(1)
oid2 = self.getOID(2)
tid1 = self.getNextTID()
tid2 = self.getNextTID(tid1)
ptid1 = self.getPTID(1)
ptid2 = self.getPTID(2)
# send information which are later to what PMN knows, this must update target node
conn = self.getFakeConnection(uuid, self.storage_port)
self.assertTrue(ptid2 >
self.assertTrue(oid2 >
self.assertTrue(tid2 >