Commit 7eb7cf1b by Julien Muchembled

Minimize the amount of work during tpc_finish

NEO did not ensure that all data and metadata are written on disk before
tpc_finish, and it was for example vulnerable to ENOSPC errors.
In other words, some work had to be moved to tpc_vote:

- In tpc_vote, all involved storage nodes are now asked to write all metadata
  to ttrans/tobj and _commit_. Because the final tid is not known yet, the tid
  column of ttrans and tobj now contains NULL and the ttid respectively.

- In tpc_finish, AskLockInformation is still required for read locking,
  ttrans.tid is updated with the final value and this change is _committed_.

- The verification phase is greatly simplified, more reliable and faster. For
  all voted transactions, we can know if a tpc_finish was started by getting
  the final tid from the ttid, either from ttrans or from trans. And we know
  that such transactions can't be partial so we don't need to check oids.

So in addition to minimizing the risk of failures during tpc_finish, we also
fix a bug causing the verification phase to discard transactions with objects
for which readCurrent was called.

On performance side:

- Although tpc_vote now asks all involved storages, instead of only those
  storing the transaction metadata, the client has been improved to do this
  in parallel. The additional commits are also all done in parallel.

- A possible improvement to compensate the additional commits is to delay the
  commit done by the unlock.

- By minimizing the time to lock transactions, objects are read-locked for a
  much shorter period. This is even more important that locked transactions
  must be unlocked in the same order.

Transactions with too many modified objects will now timeout inside tpc_vote
instead of tpc_finish. Of course, such transactions may still cause other
transaction to timeout in tpc_finish.
1 parent 99ac542c
...@@ -58,8 +58,6 @@ ...@@ -58,8 +58,6 @@
committed by future transactions. committed by future transactions.
- Add a 'devid' storage configuration so that master do not distribute - Add a 'devid' storage configuration so that master do not distribute
replicated partitions on storages with same 'devid'. replicated partitions on storages with same 'devid'.
- Make tpc_finish safer as described in its __doc__: moving work to
tpc_vote and recover from master failure when possible.
Storage Storage
- Use libmysqld instead of a stand-alone MySQL server. - Use libmysqld instead of a stand-alone MySQL server.
......
...@@ -612,18 +612,29 @@ class Application(ThreadedApplication): ...@@ -612,18 +612,29 @@ class Application(ThreadedApplication):
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension), str(transaction.description), dumps(transaction._extension),
txn_context['cache_dict']) txn_context['cache_dict'])
add_involved_nodes = txn_context['involved_nodes'].add queue = txn_context['queue']
trans_nodes = []
for node, conn in self.cp.iterateForObject(ttid): for node, conn in self.cp.iterateForObject(ttid):
logging.debug("voting transaction %s on %s", dump(ttid), logging.debug("voting transaction %s on %s", dump(ttid),
dump(conn.getUUID())) dump(conn.getUUID()))
try: try:
self._askStorage(conn, packet) conn.ask(packet, queue=queue)
except ConnectionClosed: except ConnectionClosed:
continue continue
add_involved_nodes(node) trans_nodes.append(node)
# check at least one storage node accepted # check at least one storage node accepted
if txn_context['involved_nodes']: if trans_nodes:
involved_nodes = txn_context['involved_nodes']
packet = Packets.AskVoteTransaction(ttid)
for node in involved_nodes.difference(trans_nodes):
conn = self.cp.getConnForNode(node)
if conn is not None:
try:
conn.ask(packet, queue=queue)
except ConnectionClosed:
pass
involved_nodes.update(trans_nodes)
self.waitResponses(queue)
txn_context['voted'] = None txn_context['voted'] = None
# We must not go further if connection to master was lost since # We must not go further if connection to master was lost since
# tpc_begin, to lower the probability of failing during tpc_finish. # tpc_begin, to lower the probability of failing during tpc_finish.
...@@ -667,27 +678,14 @@ class Application(ThreadedApplication): ...@@ -667,27 +678,14 @@ class Application(ThreadedApplication):
fail in tpc_finish. In particular, making a transaction permanent fail in tpc_finish. In particular, making a transaction permanent
should ideally be as simple as switching a bit permanently. should ideally be as simple as switching a bit permanently.
In NEO, tpc_finish breaks this promise by not ensuring earlier that all In NEO, all the data (with the exception of the tid, simply because
data and metadata are written, and it is for example vulnerable to it is not known yet) is already flushed on disk at the end on the vote.
ENOSPC errors. In other words, some work should be moved to tpc_vote. During tpc_finish, all nodes storing the transaction metadata are asked
to commit by saving the new tid and flushing again: for SQL backends,
TODO: - In tpc_vote, all involved storage nodes must be asked to write it's just an UPDATE of 1 cell. At last, the metadata is moved to
all metadata to ttrans/tobj and _commit_. AskStoreTransaction a final place so that the new transaction is readable, but this is
can be extended for this: for nodes that don't store anything something that can always be replayed (during the verification phase)
in ttrans, it can just contain the ttid. The final tid is not if any failure happens.
known yet, so ttrans/tobj would contain the ttid.
- In tpc_finish, AskLockInformation is still required for read
locking, ttrans.tid must be updated with the final value and
ttrans _committed_.
- The Verification phase would need some change because
ttrans/tobj may contain data for which tpc_finish was not
called. The ttid is also in trans so a mapping ttid<->tid is
always possible and can be forwarded via the master so that all
storage are still able to update the tid column with the final
value when moving rows from tobj to obj.
The resulting cost is:
- additional RPCs in tpc_vote
- 1 updated row in ttrans + commit
TODO: We should recover from master failures when the transaction got TODO: We should recover from master failures when the transaction got
successfully committed. More precisely, we should not raise: successfully committed. More precisely, we should not raise:
......
...@@ -112,9 +112,11 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -112,9 +112,11 @@ class StorageAnswersHandler(AnswerBaseHandler):
answerCheckCurrentSerial = answerStoreObject answerCheckCurrentSerial = answerStoreObject
def answerStoreTransaction(self, conn, _): def answerStoreTransaction(self, conn):
pass pass
answerVoteTransaction = answerStoreTransaction
def answerTIDsFrom(self, conn, tid_list): def answerTIDsFrom(self, conn, tid_list):
logging.debug('Get %u TIDs from %r', len(tid_list), conn) logging.debug('Get %u TIDs from %r', len(tid_list), conn)
self.app.setHandlerData(tid_list) self.app.setHandlerData(tid_list)
......
...@@ -786,8 +786,8 @@ class StopOperation(Packet): ...@@ -786,8 +786,8 @@ class StopOperation(Packet):
class UnfinishedTransactions(Packet): class UnfinishedTransactions(Packet):
""" """
Ask unfinished transactions PM -> S. Ask unfinished transactions S -> PM.
Answer unfinished transactions S -> PM. Answer unfinished transactions PM -> S.
""" """
_answer = PStruct('answer_unfinished_transactions', _answer = PStruct('answer_unfinished_transactions',
PTID('max_tid'), PTID('max_tid'),
...@@ -796,36 +796,36 @@ class UnfinishedTransactions(Packet): ...@@ -796,36 +796,36 @@ class UnfinishedTransactions(Packet):
), ),
) )
class ObjectPresent(Packet): class LockedTransactions(Packet):
""" """
Ask if an object is present. If not present, OID_NOT_FOUND should be Ask locked transactions PM -> S.
returned. PM -> S. Answer locked transactions S -> PM.
Answer that an object is present. PM -> S.
""" """
_fmt = PStruct('object_present', _answer = PStruct('answer_locked_transactions',
POID('oid'), PDict('tid_dict',
PTID('tid'), PTID('ttid'),
) PTID('tid'),
),
_answer = PStruct('object_present',
POID('oid'),
PTID('tid'),
) )
class DeleteTransaction(Packet): class FinalTID(Packet):
""" """
Delete a transaction. PM -> S. Return final tid if ttid has been committed. * -> S.
""" """
_fmt = PStruct('delete_transaction', _fmt = PStruct('final_tid',
PTID('ttid'),
)
_answer = PStruct('final_tid',
PTID('tid'), PTID('tid'),
PFOidList,
) )
class CommitTransaction(Packet): class ValidateTransaction(Packet):
""" """
Commit a transaction. PM -> S. Commit a transaction. PM -> S.
""" """
_fmt = PStruct('commit_transaction', _fmt = PStruct('validate_transaction',
PTID('ttid'),
PTID('tid'), PTID('tid'),
) )
...@@ -878,11 +878,10 @@ class LockInformation(Packet): ...@@ -878,11 +878,10 @@ class LockInformation(Packet):
_fmt = PStruct('ask_lock_informations', _fmt = PStruct('ask_lock_informations',
PTID('ttid'), PTID('ttid'),
PTID('tid'), PTID('tid'),
PFOidList,
) )
_answer = PStruct('answer_information_locked', _answer = PStruct('answer_information_locked',
PTID('tid'), PTID('ttid'),
) )
class InvalidateObjects(Packet): class InvalidateObjects(Packet):
...@@ -899,7 +898,7 @@ class UnlockInformation(Packet): ...@@ -899,7 +898,7 @@ class UnlockInformation(Packet):
Unlock information on a transaction. PM -> S. Unlock information on a transaction. PM -> S.
""" """
_fmt = PStruct('notify_unlock_information', _fmt = PStruct('notify_unlock_information',
PTID('tid'), PTID('ttid'),
) )
class GenerateOIDs(Packet): class GenerateOIDs(Packet):
...@@ -961,10 +960,17 @@ class StoreTransaction(Packet): ...@@ -961,10 +960,17 @@ class StoreTransaction(Packet):
PString('extension'), PString('extension'),
PFOidList, PFOidList,
) )
_answer = PFEmpty
_answer = PStruct('answer_store_transaction', class VoteTransaction(Packet):
"""
Ask to store a transaction. C -> S.
Answer if transaction has been stored. S -> C.
"""
_fmt = PStruct('ask_vote_transaction',
PTID('tid'), PTID('tid'),
) )
_answer = PFEmpty
class GetObject(Packet): class GetObject(Packet):
""" """
...@@ -1600,12 +1606,12 @@ class Packets(dict): ...@@ -1600,12 +1606,12 @@ class Packets(dict):
StopOperation) StopOperation)
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register( AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
UnfinishedTransactions) UnfinishedTransactions)
AskObjectPresent, AnswerObjectPresent = register( AskLockedTransactions, AnswerLockedTransactions = register(
ObjectPresent) LockedTransactions)
DeleteTransaction = register( AskFinalTID, AnswerFinalTID = register(
DeleteTransaction) FinalTID)
CommitTransaction = register( ValidateTransaction = register(
CommitTransaction) ValidateTransaction)
AskBeginTransaction, AnswerBeginTransaction = register( AskBeginTransaction, AnswerBeginTransaction = register(
BeginTransaction) BeginTransaction)
AskFinishTransaction, AnswerTransactionFinished = register( AskFinishTransaction, AnswerTransactionFinished = register(
...@@ -1624,6 +1630,8 @@ class Packets(dict): ...@@ -1624,6 +1630,8 @@ class Packets(dict):
AbortTransaction) AbortTransaction)
AskStoreTransaction, AnswerStoreTransaction = register( AskStoreTransaction, AnswerStoreTransaction = register(
StoreTransaction) StoreTransaction)
AskVoteTransaction, AnswerVoteTransaction = register(
VoteTransaction)
AskObject, AnswerObject = register( AskObject, AnswerObject = register(
GetObject) GetObject)
AskTIDs, AnswerTIDs = register( AskTIDs, AnswerTIDs = register(
......
...@@ -59,9 +59,10 @@ class ClientServiceHandler(MasterHandler): ...@@ -59,9 +59,10 @@ class ClientServiceHandler(MasterHandler):
pt = app.pt pt = app.pt
# Collect partitions related to this transaction. # Collect partitions related to this transaction.
lock_oid_list = oid_list + checked_list getPartition = pt.getPartition
partition_set = set(map(pt.getPartition, lock_oid_list)) partition_set = set(map(getPartition, oid_list))
partition_set.add(pt.getPartition(ttid)) partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid))
# Collect the UUIDs of nodes related to this transaction. # Collect the UUIDs of nodes related to this transaction.
uuid_list = filter(app.isStorageReady, {cell.getUUID() uuid_list = filter(app.isStorageReady, {cell.getUUID()
...@@ -85,7 +86,6 @@ class ClientServiceHandler(MasterHandler): ...@@ -85,7 +86,6 @@ class ClientServiceHandler(MasterHandler):
{x.getUUID() for x in identified_node_list}, {x.getUUID() for x in identified_node_list},
conn.getPeerId(), conn.getPeerId(),
), ),
lock_oid_list,
) )
for node in identified_node_list: for node in identified_node_list:
node.ask(p, timeout=60) node.ask(p, timeout=60)
......
...@@ -359,7 +359,12 @@ class TransactionManager(object): ...@@ -359,7 +359,12 @@ class TransactionManager(object):
self._unlockPending() self._unlockPending()
def _unlockPending(self): def _unlockPending(self):
# unlock pending transactions """Serialize transaction unlocks
This should rarely delay unlocks since the time needed to lock a
transaction is roughly constant. The most common case where reordering
is required is when some storages are already busy by other tasks.
"""
queue = self._queue queue = self._queue
pop = queue.pop pop = queue.pop
insert = queue.insert insert = queue.insert
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import defaultdict
from neo.lib import logging from neo.lib import logging
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.protocol import ClusterStates, Packets, NodeStates from neo.lib.protocol import ClusterStates, Packets, NodeStates
...@@ -37,10 +38,9 @@ class VerificationManager(BaseServiceHandler): ...@@ -37,10 +38,9 @@ class VerificationManager(BaseServiceHandler):
""" """
def __init__(self, app): def __init__(self, app):
self._oid_set = set() self._locked_dict = {}
self._tid_set = set() self._voted_dict = defaultdict(set)
self._uuid_set = set() self._uuid_set = set()
self._object_present = False
def _askStorageNodesAndWait(self, packet, node_list): def _askStorageNodesAndWait(self, packet, node_list):
poll = self.app.em.poll poll = self.app.em.poll
...@@ -87,7 +87,6 @@ class VerificationManager(BaseServiceHandler): ...@@ -87,7 +87,6 @@ class VerificationManager(BaseServiceHandler):
self.app.broadcastPartitionChanges(self.app.pt.outdate()) self.app.broadcastPartitionChanges(self.app.pt.outdate())
def verifyData(self): def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary."""
app = self.app app = self.app
# wait for any missing node # wait for any missing node
...@@ -100,106 +99,59 @@ class VerificationManager(BaseServiceHandler): ...@@ -100,106 +99,59 @@ class VerificationManager(BaseServiceHandler):
logging.info('start to verify data') logging.info('start to verify data')
getIdentifiedList = app.nm.getIdentifiedList getIdentifiedList = app.nm.getIdentifiedList
# Gather all unfinished transactions. # Gather all transactions that may have been partially finished.
self._askStorageNodesAndWait(Packets.AskUnfinishedTransactions(), self._askStorageNodesAndWait(Packets.AskLockedTransactions(),
[x for x in getIdentifiedList() if x.isStorage()]) [x for x in getIdentifiedList() if x.isStorage()])
# Gather OIDs for each unfinished TID, and verify whether the # Some nodes may have already unlocked these transactions and
# transaction can be finished or must be aborted. This could be # _locked_dict is incomplete, but we can ask them the final tid.
# in parallel in theory, but not so easy. Thus do it one-by-one for ttid, voted_set in self._voted_dict.iteritems():
# at the moment. if ttid in self._locked_dict:
for tid in self._tid_set: continue
uuid_set = self.verifyTransaction(tid) partition = app.pt.getPartition(ttid)
if uuid_set is None: for node in getIdentifiedList(pool_set={cell.getUUID()
packet = Packets.DeleteTransaction(tid, self._oid_set or []) # If an outdated cell had unlocked ttid, then either
# Make sure that no node has this transaction. # it is already in _locked_dict or a readable cell also
for node in getIdentifiedList(): # unlocked it.
if node.isStorage(): for cell in app.pt.getCellList(partition, readable=True)
node.notify(packet) } - voted_set):
self._askStorageNodesAndWait(Packets.AskFinalTID(ttid), (node,))
if self._tid is not None:
self._locked_dict[ttid] = self._tid
break
else: else:
if app.getLastTransaction() < tid: # XXX: refactoring needed # Transaction not locked. No need to tell nodes to delete it,
app.setLastTransaction(tid) # since they drop any unfinished data just before being
app.tm.setLastTID(tid) # operational.
packet = Packets.CommitTransaction(tid) pass
# Finish all transactions for which we know that tpc_finish was called
# but not fully processed. This may include replicas with transactions
# that were not even locked.
for ttid, tid in self._locked_dict.iteritems():
uuid_set = self._voted_dict.get(ttid)
if uuid_set:
packet = Packets.ValidateTransaction(ttid, tid)
for node in getIdentifiedList(pool_set=uuid_set): for node in getIdentifiedList(pool_set=uuid_set):
node.notify(packet) node.notify(packet)
self._oid_set = set() if app.getLastTransaction() < tid: # XXX: refactoring needed
# If possible, send the packets now. app.setLastTransaction(tid)
app.em.poll(0) app.tm.setLastTID(tid)
def verifyTransaction(self, tid): # If possible, send the packets now.
nm = self.app.nm app.em.poll(0)
uuid_set = set()
def answerLockedTransactions(self, conn, tid_dict):
# Determine to which nodes I should ask. uuid = conn.getUUID()
partition = self.app.pt.getPartition(tid) self._uuid_set.remove(uuid)
uuid_list = [cell.getUUID() for cell \ for ttid, tid in tid_dict.iteritems():
in self.app.pt.getCellList(partition, readable=True)] if tid:
if len(uuid_list) == 0: self._locked_dict[ttid] = tid
raise VerificationFailure self._voted_dict[ttid].add(uuid)
uuid_set.update(uuid_list)
def answerFinalTID(self, conn, tid):
# Gather OIDs.
node_list = self.app.nm.getIdentifiedList(pool_set=uuid_list)
if len(node_list) == 0:
raise VerificationFailure
self._askStorageNodesAndWait(Packets.AskTransactionInformation(tid),
node_list)
if self._oid_set is None or len(self._oid_set) == 0:
# Not commitable.
return None
# Verify that all objects are present.
for oid in self._oid_set:
partition = self.app.pt.getPartition(oid)
object_uuid_list = [cell.getUUID() for cell \
in self.app.pt.getCellList(partition, readable=True)]
if len(object_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(object_uuid_list)
self._object_present = True
self._askStorageNodesAndWait(Packets.AskObjectPresent(oid, tid),
nm.getIdentifiedList(pool_set=object_uuid_list))
if not self._object_present:
# Not commitable.
return None
return uuid_set
def answerUnfinishedTransactions(self, conn, max_tid, tid_list):
logging.info('got unfinished transactions %s from %r',
map(dump, tid_list), conn)
self._uuid_set.remove(conn.getUUID())
self._tid_set.update(tid_list)
def answerTransactionInformation(self, conn, tid,
user, desc, ext, packed, oid_list):
self._uuid_set.remove(conn.getUUID())
oid_set = set(oid_list)
if self._oid_set is None:
# Someone does not agree.
pass
elif len(self._oid_set) == 0:
# This is the first answer.
self._oid_set.update(oid_set)
elif self._oid_set != oid_set:
raise ValueError, "Inconsistent transaction %s" % \
(dump(tid, ))
def tidNotFound(self, conn, message):
logging.info('TID not found: %s', message)
self._uuid_set.remove(conn.getUUID())
self._oid_set = None
def answerObjectPresent(self, conn, oid, tid):
logging.info('object %s:%s found', dump(oid), dump(tid))
self._uuid_set.remove(conn.getUUID())
def oidNotFound(self, conn, message):
logging.info('OID not found: %s', message)
self._uuid_set.remove(conn.getUUID()) self._uuid_set.remove(conn.getUUID())
self._object_present = False self._tid = tid
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
pass pass
......
...@@ -53,7 +53,6 @@ UNIT_TEST_MODULES = [ ...@@ -53,7 +53,6 @@ UNIT_TEST_MODULES = [
'neo.tests.storage.testMasterHandler', 'neo.tests.storage.testMasterHandler',
'neo.tests.storage.testStorageApp', 'neo.tests.storage.testStorageApp',
'neo.tests.storage.testStorage' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'), 'neo.tests.storage.testStorage' + os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
'neo.tests.storage.testVerificationHandler',
'neo.tests.storage.testIdentificationHandler', 'neo.tests.storage.testIdentificationHandler',
'neo.tests.storage.testTransactions', 'neo.tests.storage.testTransactions',
# client application # client application
......
...@@ -297,8 +297,9 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -297,8 +297,9 @@ class ImporterDatabaseManager(DatabaseManager):
self.db = buildDatabaseManager(main['adapter'], self.db = buildDatabaseManager(main['adapter'],
(main['database'], main.get('engine'), main['wait'])) (main['database'], main.get('engine'), main['wait']))
for x in """query erase getConfiguration _setConfiguration for x in """query erase getConfiguration _setConfiguration
getPartitionTable changePartitionTable getUnfinishedTIDList getPartitionTable changePartitionTable
dropUnfinishedData storeTransaction finishTransaction getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction unlockTransaction
storeData _pruneData storeData _pruneData
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
...@@ -421,7 +422,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -421,7 +422,7 @@ class ImporterDatabaseManager(DatabaseManager):
logging.warning("All data are imported. You should change" logging.warning("All data are imported. You should change"
" your configuration to use the native backend and restart.") " your configuration to use the native backend and restart.")
self._import = None self._import = None
for x in """getObject objectPresent getReplicationTIDList for x in """getObject getReplicationTIDList
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
...@@ -434,23 +435,11 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -434,23 +435,11 @@ class ImporterDatabaseManager(DatabaseManager):
zodb = self.zodb[bisect(self.zodb_index, oid) - 1] zodb = self.zodb[bisect(self.zodb_index, oid) - 1]
return zodb, oid - zodb.shift_oid return zodb, oid - zodb.shift_oid
def getLastIDs(self, all=True): def getLastIDs(self):
tid, _, _, oid = self.db.getLastIDs(all) tid, _, _, oid = self.db.getLastIDs()
return (max(tid, util.p64(self.zodb_ltid)), None, None, return (max(tid, util.p64(self.zodb_ltid)), None, None,
max(oid, util.p64(self.zodb_loid))) max(oid, util.p64(self.zodb_loid)))
def objectPresent(self, oid, tid, all=True):
r = self.db.objectPresent(oid, tid, all)
if not r:
u_oid = util.u64(oid)
u_tid = util.u64(tid)
if self.inZodb(u_oid, u_tid):
zodb, oid = self.zodbFromOid(u_oid)
try:
return zodb.loadSerial(util.p64(oid), tid)
except POSKeyError:
pass
def getObject(self, oid, tid=None, before_tid=None): def getObject(self, oid, tid=None, before_tid=None):
u64 = util.u64 u64 = util.u64
u_oid = u64(oid) u_oid = u64(oid)
...@@ -511,6 +500,16 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -511,6 +500,16 @@ class ImporterDatabaseManager(DatabaseManager):
else: else:
return self.db.getTransaction(tid, all) return self.db.getTransaction(tid, all)
def getFinalTID(self, ttid):
if u64(ttid) <= self.zodb_ltid and self._import:
raise NotImplementedError
return self.db.getFinalTID(ttid)
def deleteTransaction(self, tid):
if u64(tid) <= self.zodb_ltid and self._import:
raise NotImplementedError
self.db.deleteTransaction(tid)
def getReplicationTIDList(self, min_tid, max_tid, length, partition): def getReplicationTIDList(self, min_tid, max_tid, length, partition):
p64 = util.p64 p64 = util.p64
tid = p64(self.zodb_tid) tid = p64(self.zodb_tid)
......
...@@ -222,10 +222,10 @@ class DatabaseManager(object): ...@@ -222,10 +222,10 @@ class DatabaseManager(object):
""" """
raise NotImplementedError raise NotImplementedError
def _getLastIDs(self, all=True): def _getLastIDs(self):
raise NotImplementedError raise NotImplementedError
def getLastIDs(self, all=True): def getLastIDs(self):
trans, obj, oid = self._getLastIDs() trans, obj, oid = self._getLastIDs()
if trans: if trans:
tid = max(trans.itervalues()) tid = max(trans.itervalues())
...@@ -241,16 +241,16 @@ class DatabaseManager(object): ...@@ -241,16 +241,16 @@ class DatabaseManager(object):
trans = obj = {} trans = obj = {}
return tid, trans, obj, oid return tid, trans, obj, oid
def getUnfinishedTIDList(self): def _getUnfinishedTIDDict(self):
"""Return a list of unfinished transaction's IDs."""
raise NotImplementedError raise NotImplementedError
def objectPresent(self, oid, tid, all = True): def getUnfinishedTIDDict(self):
"""Return true iff an object specified by a given pair of an trans, obj = self._getUnfinishedTIDDict()
object ID and a transaction ID is present in a database. obj = dict.fromkeys(obj)
Otherwise, return false. If all is true, the object must be obj.update(trans)
searched from unfinished transactions as well.""" p64 = util.p64
raise NotImplementedError return {p64(ttid): None if tid is None else p64(tid)
for ttid, tid in obj.iteritems()}
@fallback @fallback
def getLastObjectTID(self, oid): def getLastObjectTID(self, oid):
...@@ -478,14 +478,18 @@ class DatabaseManager(object): ...@@ -478,14 +478,18 @@ class DatabaseManager(object):
data_tid = p64(data_tid) data_tid = p64(data_tid)
return p64(current_tid), data_tid, is_current return p64(current_tid), data_tid, is_current
def finishTransaction(self, tid): def lockTransaction(self, tid, ttid):
"""Finish a transaction specified by a given ID, by moving """Mark voted transaction 'ttid' as committed with given 'tid'"""
temporarily data to a finished area.""" raise NotImplementedError
def unlockTransaction(self, tid, ttid):
"""Finalize a transaction by moving data to a finished area."""
raise NotImplementedError
def abortTransaction(self, ttid):
raise NotImplementedError raise NotImplementedError
def deleteTransaction(self, tid, oid_list=()): def deleteTransaction(self, tid):
"""Delete a transaction and its content specified by a given ID and
an oid list"""
raise NotImplementedError raise NotImplementedError
def deleteObject(self, oid, serial=None): def deleteObject(self, oid, serial=None):
......
...@@ -214,7 +214,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -214,7 +214,7 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "ttrans" stores information on uncommitted transactions. # The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans ( q("""CREATE TABLE IF NOT EXISTS ttrans (
`partition` SMALLINT UNSIGNED NOT NULL, `partition` SMALLINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL, tid BIGINT UNSIGNED,
packed BOOLEAN NOT NULL, packed BOOLEAN NOT NULL,
oids MEDIUMBLOB NOT NULL, oids MEDIUMBLOB NOT NULL,
user BLOB NOT NULL, user BLOB NOT NULL,
...@@ -274,7 +274,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -274,7 +274,7 @@ class MySQLDatabaseManager(DatabaseManager):
return self.query("SELECT MAX(t) FROM (SELECT MAX(tid) as t FROM trans" return self.query("SELECT MAX(t) FROM (SELECT MAX(tid) as t FROM trans"
" WHERE tid<=%s GROUP BY `partition`) as t" % max_tid)[0][0] " WHERE tid<=%s GROUP BY `partition`) as t" % max_tid)[0][0]
def _getLastIDs(self, all=True): def _getLastIDs(self):
p64 = util.p64 p64 = util.p64
q = self.query q = self.query
trans = {partition: p64(tid) trans = {partition: p64(tid)
...@@ -285,29 +285,21 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -285,29 +285,21 @@ class MySQLDatabaseManager(DatabaseManager):
" FROM obj GROUP BY `partition`")} " FROM obj GROUP BY `partition`")}
oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj" oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj"
" GROUP BY `partition`) as t")[0][0] " GROUP BY `partition`) as t")[0][0]
if all:
tid = q("SELECT MAX(tid) FROM ttrans")[0][0]
if tid is not None:
trans[None] = p64(tid)
tid, toid = q("SELECT MAX(tid), MAX(oid) FROM tobj")[0]
if tid is not None:
obj[None] = p64(tid)
if toid is not None and (oid < toid or oid is None):
oid = toid
return trans, obj, None if oid is None else p64(oid) return trans, obj, None if oid is None else p64(oid)
def getUnfinishedTIDList(self): def _getUnfinishedTIDDict(self):
p64 = util.p64
return [p64(t[0]) for t in self.query("SELECT tid FROM ttrans"
" UNION SELECT tid FROM tobj")]
def objectPresent(self, oid, tid, all = True):
oid = util.u64(oid)
tid = util.u64(tid)
q = self.query q = self.query
return q("SELECT 1 FROM obj WHERE `partition`=%d AND oid=%d AND tid=%d" return q("SELECT ttid, tid FROM ttrans"), (ttid
% (self._getPartition(oid), oid, tid)) or all and \ for ttid, in q("SELECT DISTINCT tid FROM tobj"))
q("SELECT 1 FROM tobj WHERE tid=%d AND oid=%d" % (tid, oid))
def getFinalTID(self, ttid):
ttid = util.u64(ttid)
# MariaDB is smart enough to realize that 'ttid' is constant.
r = self.query("SELECT tid FROM trans"
" WHERE `partition`=%s AND tid>=ttid AND ttid=%s LIMIT 1"
% (self._getPartition(ttid), ttid))
if r:
return util.p64(r[0][0])
def getLastObjectTID(self, oid): def getLastObjectTID(self, oid):
oid = util.u64(oid) oid = util.u64(oid)
...@@ -450,9 +442,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -450,9 +442,9 @@ class MySQLDatabaseManager(DatabaseManager):
oid_list, user, desc, ext, packed, ttid = transaction oid_list, user, desc, ext, packed, ttid = transaction
partition = self._getPartition(tid) partition = self._getPartition(tid)
assert packed in (0, 1) assert packed in (0, 1)
q("REPLACE INTO %s VALUES (%d,%d,%i,'%s','%s','%s','%s',%d)" % ( q("REPLACE INTO %s VALUES (%s,%s,%s,'%s','%s','%s','%s',%s)" % (
trans_table, partition, tid, packed, e(''.join(oid_list)), trans_table, partition, 'NULL' if temporary else tid, packed,
e(user), e(desc), e(ext), u64(ttid))) e(''.join(oid_list)), e(user), e(desc), e(ext), u64(ttid)))
if temporary: if temporary:
self.commit() self.commit()
...@@ -544,40 +536,40 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -544,40 +536,40 @@ class MySQLDatabaseManager(DatabaseManager):
r = self.query(sql) r = self.query(sql)
return r[0] if r else (None, None) return r[0] if r else (None, None)
def finishTransaction(self, tid): def lockTransaction(self, tid, ttid):
u64 = util.u64
self.query("UPDATE ttrans SET tid=%d WHERE ttid=%d LIMIT 1"
% (u64(tid), u64(ttid)))
self.commit()
def unlockTransaction(self, tid, ttid):
q = self.query q = self.query
tid = util.u64(tid) u64 = util.u64
sql = " FROM tobj WHERE tid=%d" % tid tid = u64(tid)
sql = " FROM tobj WHERE tid=%d" % u64(ttid)
data_id_list = [x for x, in q("SELECT data_id" + sql) if x] data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
q("INSERT INTO obj SELECT *" + sql) q("INSERT INTO obj SELECT `partition`, oid, %d, data_id, value_tid %s"
q("DELETE FROM tobj WHERE tid=%d" % tid) % (tid, sql))
q("DELETE" + sql)
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid) q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
q("DELETE FROM ttrans WHERE tid=%d" % tid) q("DELETE FROM ttrans WHERE tid=%d" % tid)
self.releaseData(data_id_list) self.releaseData(data_id_list)
self.commit() self.commit()
def deleteTransaction(self, tid, oid_list=()): def abortTransaction(self, ttid):
u64 = util.u64 ttid = util.u64(ttid)
tid = u64(tid)
getPartition = self._getPartition
q = self.query q = self.query
sql = " FROM tobj WHERE tid=%d" % tid sql = " FROM tobj WHERE tid=%s" % ttid
data_id_list = [x for x, in q("SELECT data_id" + sql) if x] data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
self.releaseData(data_id_list)
q("DELETE" + sql) q("DELETE" + sql)
q("""DELETE FROM ttrans WHERE tid = %d""" % tid) q("DELETE FROM ttrans WHERE ttid=%s" % ttid)
q("""DELETE FROM trans WHERE `partition` = %d AND tid = %d""" % self.releaseData(data_id_list, True)
(getPartition(tid), tid))
# delete from obj using indexes def deleteTransaction(self, tid):
data_id_list = set(data_id_list) tid = util.u64(tid)
for oid in oid_list: getPartition = self._getPartition
oid = u64(oid) self.query("DELETE FROM trans WHERE `partition`=%s AND tid=%s" %