Minimize the amount of work during tpc_finish

NEO did not ensure that all data and metadata are written on disk before
tpc_finish, and it was for example vulnerable to ENOSPC errors.
In other words, some work had to be moved to tpc_vote:

- In tpc_vote, all involved storage nodes are now asked to write all metadata
  to ttrans/tobj and _commit_. Because the final tid is not known yet, the tid
  column of ttrans and tobj now contains NULL and the ttid respectively.

- In tpc_finish, AskLockInformation is still required for read locking,
  ttrans.tid is updated with the final value and this change is _committed_.

- The verification phase is greatly simplified, more reliable and faster. For
  all voted transactions, we can know if a tpc_finish was started by getting
  the final tid from the ttid, either from ttrans or from trans. And we know
  that such transactions can't be partial so we don't need to check oids.

So in addition to minimizing the risk of failures during tpc_finish, we also
fix a bug causing the verification phase to discard transactions with objects
for which readCurrent was called.

On performance side:

- Although tpc_vote now asks all involved storages, instead of only those
  storing the transaction metadata, the client has been improved to do this
  in parallel. The additional commits are also all done in parallel.

- A possible improvement to compensate the additional commits is to delay the
  commit done by the unlock.

- By minimizing the time to lock transactions, objects are read-locked for a
  much shorter period. This is even more important that locked transactions
  must be unlocked in the same order.

Transactions with too many modified objects will now timeout inside tpc_vote
instead of tpc_finish. Of course, such transactions may still cause other
transaction to timeout in tpc_finish.
......@@ -58,8 +58,6 @@
committed by future transactions.
- Add a 'devid' storage configuration so that master do not distribute
replicated partitions on storages with same 'devid'.
- Make tpc_finish safer as described in its __doc__: moving work to
tpc_vote and recover from master failure when possible.
- Use libmysqld instead of a stand-alone MySQL server.
......@@ -612,18 +612,29 @@ class Application(ThreadedApplication):
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
add_involved_nodes = txn_context['involved_nodes'].add
queue = txn_context['queue']
trans_nodes = []
for node, conn in self.cp.iterateForObject(ttid):
logging.debug("voting transaction %s on %s", dump(ttid),
self._askStorage(conn, packet)
conn.ask(packet, queue=queue)
except ConnectionClosed:
# check at least one storage node accepted
if txn_context['involved_nodes']:
if trans_nodes:
involved_nodes = txn_context['involved_nodes']
packet = Packets.AskVoteTransaction(ttid)
for node in involved_nodes.difference(trans_nodes):
conn = self.cp.getConnForNode(node)
if conn is not None:
conn.ask(packet, queue=queue)
except ConnectionClosed:
txn_context['voted'] = None
# We must not go further if connection to master was lost since
# tpc_begin, to lower the probability of failing during tpc_finish.
......@@ -667,27 +678,14 @@ class Application(ThreadedApplication):
fail in tpc_finish. In particular, making a transaction permanent
should ideally be as simple as switching a bit permanently.
In NEO, tpc_finish breaks this promise by not ensuring earlier that all
data and metadata are written, and it is for example vulnerable to
ENOSPC errors. In other words, some work should be moved to tpc_vote.
TODO: - In tpc_vote, all involved storage nodes must be asked to write
all metadata to ttrans/tobj and _commit_. AskStoreTransaction
can be extended for this: for nodes that don't store anything
in ttrans, it can just contain the ttid. The final tid is not
known yet, so ttrans/tobj would contain the ttid.
- In tpc_finish, AskLockInformation is still required for read
locking, ttrans.tid must be updated with the final value and
ttrans _committed_.
- The Verification phase would need some change because
ttrans/tobj may contain data for which tpc_finish was not
called. The ttid is also in trans so a mapping ttid<->tid is
always possible and can be forwarded via the master so that all
storage are still able to update the tid column with the final
value when moving rows from tobj to obj.
The resulting cost is:
- additional RPCs in tpc_vote
- 1 updated row in ttrans + commit
In NEO, all the data (with the exception of the tid, simply because
it is not known yet) is already flushed on disk at the end on the vote.
During tpc_finish, all nodes storing the transaction metadata are asked
to commit by saving the new tid and flushing again: for SQL backends,
it's just an UPDATE of 1 cell. At last, the metadata is moved to
a final place so that the new transaction is readable, but this is
something that can always be replayed (during the verification phase)
if any failure happens.
TODO: We should recover from master failures when the transaction got
successfully committed. More precisely, we should not raise:
......@@ -112,9 +112,11 @@ class StorageAnswersHandler(AnswerBaseHandler):
answerCheckCurrentSerial = answerStoreObject
def answerStoreTransaction(self, conn, _):
def answerStoreTransaction(self, conn):
answerVoteTransaction = answerStoreTransaction
def answerTIDsFrom(self, conn, tid_list):
logging.debug('Get %u TIDs from %r', len(tid_list), conn)
......@@ -786,8 +786,8 @@ class StopOperation(Packet):
class UnfinishedTransactions(Packet):
Ask unfinished transactions PM -> S.
Answer unfinished transactions S -> PM.
Ask unfinished transactions S -> PM.
Answer unfinished transactions PM -> S.
_answer = PStruct('answer_unfinished_transactions',
......@@ -796,36 +796,36 @@ class UnfinishedTransactions(Packet):
class ObjectPresent(Packet):
class LockedTransactions(Packet):
Ask if an object is present. If not present, OID_NOT_FOUND should be
returned. PM -> S.
Answer that an object is present. PM -> S.
Ask locked transactions PM -> S.
Answer locked transactions S -> PM.
_fmt = PStruct('object_present',
_answer = PStruct('object_present',
_answer = PStruct('answer_locked_transactions',
class DeleteTransaction(Packet):
class FinalTID(Packet):
Delete a transaction. PM -> S.
Return final tid if ttid has been committed. * -> S.
_fmt = PStruct('delete_transaction',
_fmt = PStruct('final_tid',
_answer = PStruct('final_tid',
class CommitTransaction(Packet):
class ValidateTransaction(Packet):
Commit a transaction. PM -> S.
_fmt = PStruct('commit_transaction',
_fmt = PStruct('validate_transaction',
......@@ -878,11 +878,10 @@ class LockInformation(Packet):
_fmt = PStruct('ask_lock_informations',
_answer = PStruct('answer_information_locked',
class InvalidateObjects(Packet):
......@@ -899,7 +898,7 @@ class UnlockInformation(Packet):
Unlock information on a transaction. PM -> S.
_fmt = PStruct('notify_unlock_information',
class GenerateOIDs(Packet):
......@@ -961,10 +960,17 @@ class StoreTransaction(Packet):
_answer = PFEmpty
_answer = PStruct('answer_store_transaction',
class VoteTransaction(Packet):
Ask to store a transaction. C -> S.
Answer if transaction has been stored. S -> C.
_fmt = PStruct('ask_vote_transaction',
_answer = PFEmpty
class GetObject(Packet):
......@@ -1600,12 +1606,12 @@ class Packets(dict):
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
AskObjectPresent, AnswerObjectPresent = register(
DeleteTransaction = register(
CommitTransaction = register(
AskLockedTransactions, AnswerLockedTransactions = register(
AskFinalTID, AnswerFinalTID = register(
ValidateTransaction = register(
AskBeginTransaction, AnswerBeginTransaction = register(
AskFinishTransaction, AnswerTransactionFinished = register(
......@@ -1624,6 +1630,8 @@ class Packets(dict):
AskStoreTransaction, AnswerStoreTransaction = register(
AskVoteTransaction, AnswerVoteTransaction = register(
AskObject, AnswerObject = register(
AskTIDs, AnswerTIDs = register(
......@@ -59,9 +59,10 @@ class ClientServiceHandler(MasterHandler):
pt =
# Collect partitions related to this transaction.
lock_oid_list = oid_list + checked_list
partition_set = set(map(pt.getPartition, lock_oid_list))
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
# Collect the UUIDs of nodes related to this transaction.
uuid_list = filter(app.isStorageReady, {cell.getUUID()
......@@ -85,7 +86,6 @@ class ClientServiceHandler(MasterHandler):
{x.getUUID() for x in identified_node_list},
for node in identified_node_list:
node.ask(p, timeout=60)
......@@ -359,7 +359,12 @@ class TransactionManager(object):
def _unlockPending(self):
# unlock pending transactions
"""Serialize transaction unlocks
This should rarely delay unlocks since the time needed to lock a
transaction is roughly constant. The most common case where reordering
is required is when some storages are already busy by other tasks.
queue = self._queue
pop = queue.pop
insert = queue.insert
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <>.
from collections import defaultdict
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import ClusterStates, Packets, NodeStates
......@@ -37,10 +38,9 @@ class VerificationManager(BaseServiceHandler):
def __init__(self, app):
self._oid_set = set()
self._tid_set = set()
self._locked_dict = {}
self._voted_dict = defaultdict(set)
self._uuid_set = set()
self._object_present = False
def _askStorageNodesAndWait(self, packet, node_list):
poll =
......@@ -87,7 +87,6 @@ class VerificationManager(BaseServiceHandler):
def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary."""
app =
# wait for any missing node
......@@ -100,106 +99,59 @@ class VerificationManager(BaseServiceHandler):'start to verify data')
getIdentifiedList = app.nm.getIdentifiedList
# Gather all unfinished transactions.
# Gather all transactions that may have been partially finished.
[x for x in getIdentifiedList() if x.isStorage()])
# Gather OIDs for each unfinished TID, and verify whether the
# transaction can be finished or must be aborted. This could be
# in parallel in theory, but not so easy. Thus do it one-by-one
# at the moment.
for tid in self._tid_set:
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
packet = Packets.DeleteTransaction(tid, self._oid_set or [])
# Make sure that no node has this transaction.
for node in getIdentifiedList():
if node.isStorage():
# Some nodes may have already unlocked these transactions and
# _locked_dict is incomplete, but we can ask them the final tid.
for ttid, voted_set in self._voted_dict.iteritems():
if ttid in self._locked_dict:
partition =
for node in getIdentifiedList(pool_set={cell.getUUID()
# If an outdated cell had unlocked ttid, then either
# it is already in _locked_dict or a readable cell also
# unlocked it.
for cell in, readable=True)
} - voted_set):
self._askStorageNodesAndWait(Packets.AskFinalTID(ttid), (node,))
if self._tid is not None:
self._locked_dict[ttid] = self._tid
if app.getLastTransaction() < tid: # XXX: refactoring needed
packet = Packets.CommitTransaction(tid)
# Transaction not locked. No need to tell nodes to delete it,
# since they drop any unfinished data just before being
# operational.
# Finish all transactions for which we know that tpc_finish was called
# but not fully processed. This may include replicas with transactions
# that were not even locked.
for ttid, tid in self._locked_dict.iteritems():
uuid_set = self._voted_dict.get(ttid)
if uuid_set:
packet = Packets.ValidateTransaction(ttid, tid)
for node in getIdentifiedList(pool_set=uuid_set):
self._oid_set = set()
# If possible, send the packets now.
def verifyTransaction(self, tid):
nm =
uuid_set = set()
# Determine to which nodes I should ask.
partition =
uuid_list = [cell.getUUID() for cell \
in, readable=True)]
if len(uuid_list) == 0:
raise VerificationFailure
# Gather OIDs.
node_list =
if len(node_list) == 0:
raise VerificationFailure
if self._oid_set is None or len(self._oid_set) == 0:
# Not commitable.
return None
# Verify that all objects are present.
for oid in self._oid_set:
partition =
object_uuid_list = [cell.getUUID() for cell \
in, readable=True)]
if len(object_uuid_list) == 0:
raise VerificationFailure
self._object_present = True
self._askStorageNodesAndWait(Packets.AskObjectPresent(oid, tid),
if not self._object_present:
# Not commitable.
return None
return uuid_set
def answerUnfinishedTransactions(self, conn, max_tid, tid_list):'got unfinished transactions %s from %r',
map(dump, tid_list), conn)
def answerTransactionInformation(self, conn, tid,
user, desc, ext, packed, oid_list):
oid_set = set(oid_list)
if self._oid_set is None:
# Someone does not agree.
elif len(self._oid_set) == 0:
# This is the first answer.
elif self._oid_set != oid_set:
raise ValueError, "Inconsistent transaction %s" % \
(dump(tid, ))
def tidNotFound(self, conn, message):'TID not found: %s', message)
self._oid_set = None
def answerObjectPresent(self, conn, oid, tid):'object %s:%s found', dump(oid), dump(tid))
def oidNotFound(self, conn, message):'OID not found: %s', message)
if app.getLastTransaction() < tid: # XXX: refactoring needed
# If possible, send the packets now.
def answerLockedTransactions(self, conn, tid_dict):
uuid = conn.getUUID()
for ttid, tid in tid_dict.iteritems():
if tid:
self._locked_dict[ttid] = tid
def answerFinalTID(self, conn, tid):
self._object_present = False
self._tid = tid
def connectionCompleted(self, conn):
......@@ -53,7 +53,6 @@ UNIT_TEST_MODULES = [
'' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
# client application
......@@ -297,8 +297,9 @@ class ImporterDatabaseManager(DatabaseManager):
self.db = buildDatabaseManager(main['adapter'],
(main['database'], main.get('engine'), main['wait']))
for x in """query erase getConfiguration _setConfiguration
getPartitionTable changePartitionTable getUnfinishedTIDList
dropUnfinishedData storeTransaction finishTransaction
getPartitionTable changePartitionTable
getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction
storeData _pruneData
setattr(self, x, getattr(self.db, x))
......@@ -421,7 +422,7 @@ class ImporterDatabaseManager(DatabaseManager):
logging.warning("All data are imported. You should change"
" your configuration to use the native backend and restart.")
self._import = None
for x in """getObject objectPresent getReplicationTIDList
for x in """getObject getReplicationTIDList
setattr(self, x, getattr(self.db, x))
......@@ -434,23 +435,11 @@ class ImporterDatabaseManager(DatabaseManager):
zodb = self.zodb[bisect(self.zodb_index, oid) - 1]
return zodb, oid - zodb.shift_oid
def getLastIDs(self, all=True):
tid, _, _, oid = self.db.getLastIDs(all)
def getLastIDs(self):
tid, _, _, oid = self.db.getLastIDs()
return (max(tid, util.p64(self.zodb_ltid)), None, None,
max(oid, util.p64(self.zodb_loid)))
def objectPresent(self, oid, tid, all=True):
r = self.db.objectPresent(oid, tid, all)
if not r:
u_oid = util.u64(oid)
u_tid = util.u64(tid)
if self.inZodb(u_oid, u_tid):
zodb, oid = self.zodbFromOid(u_oid)
return zodb.loadSerial(util.p64(oid), tid)
except POSKeyError:
def getObject(self, oid, tid=None, before_tid=None):
u64 = util.u64
u_oid = u64(oid)
......@@ -511,6 +500,16 @@ class ImporterDatabaseManager(DatabaseManager):
return self.db.getTransaction(tid, all)
def getFinalTID(self, ttid):
if u64(ttid) <= self.zodb_ltid and self._import:
raise NotImplementedError
return self.db.getFinalTID(ttid)
def deleteTransaction(self, tid):
if u64(tid) <= self.zodb_ltid and self._import:
raise NotImplementedError
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
p64 = util.p64
tid = p64(self.zodb_tid)
......@@ -222,10 +222,10 @@ class DatabaseManager(object):
raise NotImplementedError
def _getLastIDs(self, all=True):
def _getLastIDs(self):
raise NotImplementedError
def getLastIDs(self, all=True):
def getLastIDs(self):
trans, obj, oid = self._getLastIDs()
if trans:
tid = max(trans.itervalues())
......@@ -241,16 +241,16 @@ class DatabaseManager(object):
trans = obj = {}
return tid, trans, obj, oid
def getUnfinishedTIDList(self):
"""Return a list of unfinished transaction's IDs."""
def _getUnfinishedTIDDict(self):
raise NotImplementedError
def objectPresent(self, oid, tid, all = True):
"""Return true iff an object specified by a given pair of an
object ID and a transaction ID is present in a database.
Otherwise, return false. If all is true, the object must be
searched from unfinished transactions as well."""
raise NotImplementedError
def getUnfinishedTIDDict(self):
trans, obj = self._getUnfinishedTIDDict()
obj = dict.fromkeys(obj)
p64 = util.p64
return {p64(ttid): None if tid is None else p64(tid)
for ttid, tid in obj.iteritems()}
def getLastObjectTID(self, oid):
......@@ -478,14 +478,18 @@ class DatabaseManager(object):
data_tid = p64(data_tid)
return p64(current_tid), data_tid, is_current
def finishTransaction(self, tid):
"""Finish a transaction specified by a given ID, by moving
temporarily data to a finished area."""
def lockTransaction(self, tid, ttid):
"""Mark voted transaction 'ttid' as committed with given 'tid'"""
raise NotImplementedError
def unlockTransaction(self, tid, ttid):
"""Finalize a transaction by moving data to a finished area."""
raise NotImplementedError
def abortTransaction(self, ttid):
raise NotImplementedError
def deleteTransaction(self, tid, oid_list=()):
"""Delete a transaction and its content specified by a given ID and
an oid list"""
def deleteTransaction(self, tid):
raise NotImplementedError
def deleteObject(self, oid, serial=None):
......@@ -214,7 +214,7 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "ttrans" stores information on uncommitted transactions.
......@@ -274,7 +274,7 @@ class MySQLDatabaseManager(DatabaseManager):
return self.query("SELECT MAX(t) FROM (SELECT MAX(tid) as t FROM trans"
" WHERE tid<=%s GROUP BY `partition`) as t" % max_tid)[0][0]
def _getLastIDs(self, all=True):
def _getLastIDs(self):
p64 = util.p64
q = self.query
trans = {partition: p64(tid)
......@@ -285,29 +285,21 @@ class MySQLDatabaseManager(DatabaseManager):
" FROM obj GROUP BY `partition`")}
oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj"
" GROUP BY `partition`) as t")[0][0]
if all:
tid = q("SELECT MAX(tid) FROM ttrans")[0][0]
if tid is not None:
trans[None] = p64(tid)
tid, toid = q("SELECT MAX(tid), MAX(oid) FROM tobj")[0]
if tid is not None:
obj[None] = p64(tid)
if toid is not None and (oid < toid or oid is None):
oid = toid
return trans, obj, None if oid is None else p64(oid)
def getUnfinishedTIDList(self):
p64 = util.p64
return [p64(t[0]) for t in self.query("SELECT tid FROM ttrans"
" UNION SELECT tid FROM tobj")]
def objectPresent(self, oid, tid, all = True):
oid = util.u64(oid)
tid = util.u64(tid)
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT 1 FROM obj WHERE `partition`=%d AND oid=%d AND tid=%d"
% (self._getPartition(oid), oid, tid)) or all and \
q("SELECT 1 FROM tobj WHERE tid=%d AND oid=%d" % (tid, oid))
return q("SELECT ttid, tid FROM ttrans"), (ttid
for ttid, in q("SELECT DISTINCT tid FROM tobj"))
def getFinalTID(self, ttid):
ttid = util.u64(ttid)
# MariaDB is smart enough to realize that 'ttid' is constant.
r = self.query("SELECT tid FROM trans"
" WHERE `partition`=%s AND tid>=ttid AND ttid=%s LIMIT 1"
% (self._getPartition(ttid), ttid))
if r:
return util.p64(r[0][0])
def getLastObjectTID(self, oid):
oid = util.u64(oid)
......@@ -450,9 +442,9 @@ class MySQLDatabaseManager(DatabaseManager):
oid_list, user, desc, ext, packed, ttid = transaction
partition = self._getPartition(tid)
assert packed in (0, 1)
q("REPLACE INTO %s VALUES (%d,%d,%i,'%s','%s','%s','%s',%d)" % (
trans_table, partition, tid, packed, e(''.join(oid_list)),
e(user), e(desc), e(ext), u64(ttid)))
q("REPLACE INTO %s VALUES (%s,%s,%s,'%s','%s','%s','%s',%s)" % (
trans_table, partition, 'NULL' if temporary else tid, packed,
e(''.join(oid_list)), e(user), e(desc), e(ext), u64(ttid)))
if temporary:
......@@ -544,40 +536,40 @@ class MySQLDatabaseManager(DatabaseManager):
r = self.query(sql)
return r[0] if r else (None, None)
def finishTransaction(self, tid):
def lockTransaction(self, tid, ttid):
u64 = util.u64
self.query("UPDATE ttrans SET tid=%d WHERE ttid=%d LIMIT 1"
% (u64(tid), u64(ttid)))
def unlockTransaction(self, tid, ttid):
q = self.query
tid = util.u64(tid)
sql = " FROM tobj WHERE tid=%d" % tid
u64 = util.u64
tid = u64(tid)
sql = " FROM tobj WHERE tid=%d" % u64(ttid)
data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
q("INSERT INTO obj SELECT *" + sql)
q("DELETE FROM tobj WHERE tid=%d" % tid)
q("INSERT INTO obj SELECT `partition`, oid, %d, data_id, value_tid %s"
% (tid, sql))
q("DELETE" + sql)
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
q("DELETE FROM ttrans WHERE tid=%d" % tid)
def deleteTransaction(self, tid, oid_list=()):
u64 = util.u64
tid = u64(tid)
getPartition = self._getPartition
def abortTransaction(self, ttid):
ttid = util.u64(ttid)
q = self.query
sql = " FROM tobj WHERE tid=%d" % tid
sql = " FROM tobj WHERE tid=%s" % ttid
data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
q("DELETE" + sql)
q("""DELETE FROM ttrans WHERE tid = %d""" % tid)
q("""DELETE FROM trans WHERE `partition` = %d AND tid = %d""" %
(getPartition(tid), tid))
# delete from obj using indexes
data_id_list = set(data_id_list)
for oid in oid_list:
oid = u64(oid)
sql = " FROM obj WHERE `partition`=%d AND oid=%d AND tid=%d" \
% (getPartition(oid), oid, tid)
data_id_list.update(*q("SELECT data_id" + sql))
q("DELETE" + sql)
q("DELETE FROM ttrans WHERE ttid=%s" % ttid)
self.releaseData(data_id_list, True)
def deleteTransaction(self, tid):
tid = util.u64(tid)
getPartition = self._getPartition