Commit dec81519 authored by Julien Muchembled's avatar Julien Muchembled

master: last tid/oid after recovery/verification

The important bugfix is to update the last oid when the master verifies a
transaction with new oids.

By resetting the transaction manager at the beginning of the recovery phase,
it becomes possible to avoid tid/oid holes:
- by reallocating previously unused allocated oids
- when going back "in the past", i.e. reverting to an older version of the
  database (with fewer oids) and/or adjusting the clock
parent e1f9a7da
......@@ -264,8 +264,6 @@ class Application(BaseApplication):
"""'provide service')
poll = self.em.poll
# Now everything is passive.
......@@ -17,7 +17,6 @@
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from neo.lib.protocol import ZERO_OID
from .handlers import MasterHandler
......@@ -49,6 +48,7 @@ class RecoveryManager(MasterHandler):
"""'begin the recovery of the status')
app =
pt =
......@@ -88,8 +88,6 @@ class RecoveryManager(MasterHandler):
if pt.getID() is None:'creating a new partition table')
# reset IDs generators & build new partition with running nodes
pt.getID(), pt.getRowList()))
......@@ -102,7 +100,6 @@ class RecoveryManager(MasterHandler):
app.backup_tid = pt.getBackupTid()
logging.debug('cluster starts with loid=%s and this partition table :',
......@@ -121,11 +118,9 @@ class RecoveryManager(MasterHandler):
def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
# Get max values.
if loid is not None:
if ltid is not None:
tm =
if lptid > self.target_ptid:
# something newer
self.target_ptid = lptid
......@@ -17,7 +17,7 @@
from time import time
from struct import pack, unpack
from neo.lib import logging
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_TID
from neo.lib.protocol import ProtocolError, uuid_str, ZERO_OID, ZERO_TID
from neo.lib.util import dump, u64, addTID, tidFromTime
class DelayedError(Exception):
......@@ -155,15 +155,18 @@ class TransactionManager(object):
Manage current transactions
_last_tid = ZERO_TID
def __init__(self, on_commit):
self._on_commit = on_commit
def reset(self):
# ttid -> transaction
self._ttid_dict = {}
# node -> transactions mapping
self._node_dict = {}
self._last_oid = None
self._on_commit = on_commit
self._last_oid = ZERO_OID
self._last_tid = ZERO_TID
# queue filled with ttids pointing to transactions with increasing tids
self._queue = []
......@@ -182,8 +185,6 @@ class TransactionManager(object):
def getNextOIDList(self, num_oids):
""" Generate a new OID list """
if self._last_oid is None:
raise RuntimeError, 'I do not know the last OID'
oid = unpack('!Q', self._last_oid)[0] + 1
oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
self._last_oid = oid_list[-1]
......@@ -249,14 +250,6 @@ class TransactionManager(object):
if self._last_tid < tid:
self._last_tid = tid
def reset(self):
Discard all manager content
This doesn't reset the last TID.
self._ttid_dict = {}
self._node_dict = {}
def hasPending(self):
Returns True if some transactions are pending
......@@ -61,6 +61,7 @@ class VerificationManager(BaseServiceHandler):
if not app.backup_tid:
def verifyData(self):
app =
......@@ -96,18 +97,33 @@ class VerificationManager(BaseServiceHandler):
# Finish all transactions for which we know that tpc_finish was called
# but not fully processed. This may include replicas with transactions
# that were not even locked.
all_set = set()
for ttid, tid in self._locked_dict.iteritems():
uuid_set = self._voted_dict.get(ttid)
if uuid_set:
all_set |= uuid_set
packet = Packets.ValidateTransaction(ttid, tid)
for node in getIdentifiedList(pool_set=uuid_set):
if app.getLastTransaction() < tid: # XXX: refactoring needed
# If possible, send the packets now.
# Ask last oid/tid again for nodes that recover locked transactions.
# In fact, this is mainly for the last oid since the last tid can be
# deduced from max(self._locked_dict.values()).
# If getLastIDs is not always instantaneous for some backends, we
# should split AskLastIDs to not ask the last oid/tid at the end of
# recovery phase (and instead ask all nodes once, here).
# With this request, we also prefer to make sure all nodes validate
# successfully before switching to RUNNING state.
def answerLastIDs(self, conn, loid, ltid, lptid, backup_tid):
tm =
ptid =
assert lptid < ptid if None != lptid != ptid else not backup_tid
def answerLockedTransactions(self, conn, tid_dict):
uuid = conn.getUUID()
......@@ -112,19 +112,6 @@ class testTransactionManager(NeoUnitTestBase):
# ...and the lock is available
txnman.begin(client, self.getNextTID())
def test_getNextOIDList(self):
txnman = TransactionManager(lambda tid, txn: None)
# must raise as we don"t have one
self.assertEqual(txnman.getLastOID(), None)
self.assertRaises(RuntimeError, txnman.getNextOIDList, 1)
# ask list
oid_list = txnman.getNextOIDList(15)
self.assertEqual(len(oid_list), 15)
# begin from 1, so generated oid from 2 to 16
for i, oid in zip(xrange(len(oid_list)), oid_list):
self.assertEqual(oid, self.getOID(i+2))
def test_forget(self):
client1 = Mock({'__hash__': 1})
client2 = Mock({'__hash__': 2})
......@@ -948,8 +948,12 @@ class Test(NEOThreadedTest):
raise _UnexpectedSuccess
except ConnectionClosed, e:
e = type(e), None, None
# Also check that the master reset the last oid to a correct value.
self.assertIn('x', c.root())
self.assertEqual(1, u64(c.root()['x']._p_oid))
self.assertEqual(2, u64(cluster.client.new_oid()))
raise _ExpectedFailure(e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment