Commit 2c3bea29 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Master transaction manager use TTID as index.

- AnswerInformationLocked give ttid instead of tid
- Master transaction manager always use ttid in data structures
- It's no more makes sense to check if the tid is greater than the last
generated as it never comes back from a storage, just check if the ttid is
well known by the transaction manager.
- Rename all tid variable that now hold a ttid
- Transaction manager's queue contains ttids but the corresponding tids are
increasing to keep commit order.
- Adjust tests

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2613 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent cf86ab60
...@@ -220,7 +220,7 @@ class EventHandler(object): ...@@ -220,7 +220,7 @@ class EventHandler(object):
def askLockInformation(self, conn, ttid, tid, oid_list): def askLockInformation(self, conn, ttid, tid, oid_list):
raise UnexpectedPacketError raise UnexpectedPacketError
def answerInformationLocked(self, conn, tid): def answerInformationLocked(self, conn, ttid):
raise UnexpectedPacketError raise UnexpectedPacketError
def invalidateObjects(self, conn, tid, oid_list): def invalidateObjects(self, conn, tid, oid_list):
......
...@@ -560,11 +560,12 @@ class Application(object): ...@@ -560,11 +560,12 @@ class Application(object):
neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state)) neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
return (uuid, node, state, handler, node_ctor) return (uuid, node, state, handler, node_ctor)
def onTransactionCommitted(self, tid, txn): def onTransactionCommitted(self, txn):
# I have received all the lock answers now: # I have received all the lock answers now:
# - send a Notify Transaction Finished to the initiated client node # - send a Notify Transaction Finished to the initiated client node
# - Invalidate Objects to the other client nodes # - Invalidate Objects to the other client nodes
ttid = txn.getTTID() ttid = txn.getTTID()
tid = txn.getTID()
transaction_node = txn.getNode() transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList()) invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
transaction_finished = Packets.AnswerTransactionFinished(ttid, tid) transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
...@@ -582,7 +583,7 @@ class Application(object): ...@@ -582,7 +583,7 @@ class Application(object):
getByUUID(storage_uuid).getConnection().notify(notify_unlock) getByUUID(storage_uuid).getConnection().notify(notify_unlock)
# remove transaction from manager # remove transaction from manager
self.tm.remove(transaction_node.getUUID(), tid) self.tm.remove(transaction_node.getUUID(), ttid)
self.setLastTransaction(tid) self.setLastTransaction(tid)
def getLastTransaction(self): def getLastTransaction(self):
......
...@@ -62,16 +62,12 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -62,16 +62,12 @@ class StorageServiceHandler(BaseServiceHandler):
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList()) p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
conn.answer(p) conn.answer(p)
def answerInformationLocked(self, conn, tid): def answerInformationLocked(self, conn, ttid):
tm = self.app.tm tm = self.app.tm
if ttid not in tm:
# If the given transaction ID is later than the last TID, the peer raise ProtocolError('Unknown transaction')
# is crazy.
if tid > tm.getLastTID():
raise ProtocolError('TID too big')
# transaction locked on this storage node # transaction locked on this storage node
tm.lock(tid, conn.getUUID()) self.app.tm.lock(ttid, conn.getUUID())
def notifyReplicationDone(self, conn, offset): def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
......
...@@ -190,26 +190,27 @@ class TransactionManager(object): ...@@ -190,26 +190,27 @@ class TransactionManager(object):
_next_ttid = 0 _next_ttid = 0
def __init__(self, on_commit): def __init__(self, on_commit):
# tid -> transaction # ttid -> transaction
self._tid_dict = {} self._ttid_dict = {}
# node -> transactions mapping # node -> transactions mapping
self._node_dict = {} self._node_dict = {}
self._last_oid = None self._last_oid = None
self._on_commit = on_commit self._on_commit = on_commit
# queue filled with ttids pointing to transactions with increasing tids
self._queue = [] self._queue = []
def __getitem__(self, tid): def __getitem__(self, ttid):
""" """
Return the transaction object for this TID Return the transaction object for this TID
""" """
# XXX: used by unit tests only # XXX: used by unit tests only
return self._tid_dict[tid] return self._ttid_dict[ttid]
def __contains__(self, tid): def __contains__(self, ttid):
""" """
Returns True if this is a pending transaction Returns True if this is a pending transaction
""" """
return tid in self._tid_dict return ttid in self._ttid_dict
def getNextOIDList(self, num_oids): def getNextOIDList(self, num_oids):
""" Generate a new OID list """ """ Generate a new OID list """
...@@ -306,20 +307,20 @@ class TransactionManager(object): ...@@ -306,20 +307,20 @@ class TransactionManager(object):
Discard all manager content Discard all manager content
This doesn't reset the last TID. This doesn't reset the last TID.
""" """
self._tid_dict = {} self._ttid_dict = {}
self._node_dict = {} self._node_dict = {}
def hasPending(self): def hasPending(self):
""" """
Returns True if some transactions are pending Returns True if some transactions are pending
""" """
return bool(self._tid_dict) return bool(self._ttid_dict)
def getPendingList(self): def getPendingList(self):
""" """
Return the list of pending transaction IDs Return the list of pending transaction IDs
""" """
return self._tid_dict.keys() return [txn.getTID() for txn in self._ttid_dict.values()]
def begin(self, uuid, tid=None): def begin(self, uuid, tid=None):
""" """
...@@ -345,38 +346,41 @@ class TransactionManager(object): ...@@ -345,38 +346,41 @@ class TransactionManager(object):
break break
else: else:
tid = self._nextTID(ttid, divisor) tid = self._nextTID(ttid, divisor)
self._queue.append((node.getUUID(), tid)) self._queue.append((node.getUUID(), ttid))
neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid)) neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id) txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self._tid_dict[tid] = txn self._ttid_dict[ttid] = txn
self._node_dict.setdefault(node, {})[tid] = txn self._node_dict.setdefault(node, {})[ttid] = txn
return tid return tid
def remove(self, uuid, tid): def remove(self, uuid, ttid):
""" """
Remove a transaction, commited or aborted Remove a transaction, commited or aborted
""" """
try: try:
self._queue.remove((uuid, tid)) # only in case of an import:
self._queue.remove((uuid, ttid))
except ValueError: except ValueError:
# finish might not have been started # finish might not have been started
pass pass
tid_dict = self._tid_dict ttid_dict = self._ttid_dict
if tid in tid_dict: if ttid in ttid_dict:
txn = ttid_dict[ttid]
tid = txn.getTID()
node = txn.getNode()
# ...and tried to finish # ...and tried to finish
node = tid_dict[tid].getNode() del ttid_dict[ttid]
del tid_dict[tid] del self._node_dict[node][ttid]
del self._node_dict[node][tid]
def lock(self, tid, uuid): def lock(self, ttid, uuid):
""" """
Set that a node has locked the transaction. Set that a node has locked the transaction.
If transaction is completely locked, calls function given at If transaction is completely locked, calls function given at
instanciation time. instanciation time.
""" """
assert tid in self._tid_dict, "Transaction not started" assert ttid in self._ttid_dict, "Transaction not started"
txn = self._tid_dict[tid] txn = self._ttid_dict[ttid]
if txn.lock(uuid) and self._queue[0][1] == tid: if txn.lock(uuid) and self._queue[0][1] == ttid:
# all storage are locked and we unlock the commit queue # all storage are locked and we unlock the commit queue
self._unlockPending() self._unlockPending()
...@@ -387,8 +391,8 @@ class TransactionManager(object): ...@@ -387,8 +391,8 @@ class TransactionManager(object):
""" """
unlock = False unlock = False
# iterate over a copy because _unlockPending may alter the dict # iterate over a copy because _unlockPending may alter the dict
for tid, txn in self._tid_dict.items(): for ttid, txn in self._ttid_dict.items():
if txn.forget(uuid) and self._queue[0][1] == tid: if txn.forget(uuid) and self._queue[0][1] == ttid:
unlock = True unlock = True
if unlock: if unlock:
self._unlockPending() self._unlockPending()
...@@ -399,15 +403,15 @@ class TransactionManager(object): ...@@ -399,15 +403,15 @@ class TransactionManager(object):
pop = queue.pop pop = queue.pop
insert = queue.insert insert = queue.insert
on_commit = self._on_commit on_commit = self._on_commit
get = self._tid_dict.get get = self._ttid_dict.get
while queue: while queue:
uuid, tid = pop(0) uuid, ttid = pop(0)
txn = get(tid, None) txn = get(ttid, None)
# _queue can contain un-prepared transactions # _queue can contain un-prepared transactions
if txn is not None and txn.locked(): if txn is not None and txn.locked():
on_commit(tid, txn) on_commit(txn)
else: else:
insert(0, (uuid, tid)) insert(0, (uuid, ttid))
break break
def abortFor(self, node): def abortFor(self, node):
...@@ -423,13 +427,13 @@ class TransactionManager(object): ...@@ -423,13 +427,13 @@ class TransactionManager(object):
if node in self._node_dict: if node in self._node_dict:
# remove transactions # remove transactions
remove = self.remove remove = self.remove
for tid in self._node_dict[node].keys(): for ttid in self._node_dict[node].keys():
remove(uuid, tid) remove(uuid, ttid)
# discard node entry # discard node entry
del self._node_dict[node] del self._node_dict[node]
def log(self): def log(self):
neo.logging.info('Transactions:') neo.logging.info('Transactions:')
for txn in self._tid_dict.itervalues(): for txn in self._ttid_dict.itervalues():
neo.logging.info(' %r', txn) neo.logging.info(' %r', txn)
...@@ -58,7 +58,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -58,7 +58,7 @@ class MasterOperationHandler(BaseMasterHandler):
raise ProtocolError('Unknown transaction') raise ProtocolError('Unknown transaction')
self.app.tm.lock(ttid, tid, oid_list) self.app.tm.lock(ttid, tid, oid_list)
if not conn.isClosed(): if not conn.isClosed():
conn.answer(Packets.AnswerInformationLocked(tid)) conn.answer(Packets.AnswerInformationLocked(ttid))
def notifyUnlockInformation(self, conn, ttid): def notifyUnlockInformation(self, conn, ttid):
if not ttid in self.app.tm: if not ttid in self.app.tm:
......
...@@ -127,24 +127,24 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -127,24 +127,24 @@ class MasterClientHandlerTests(NeoUnitTestBase):
], ],
'getPartitions': 2, 'getPartitions': 2,
}) })
service.askBeginTransaction(conn, None) ttid = self.getNextTID()
service.askBeginTransaction(conn, ttid)
oid_list = [] oid_list = []
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address) conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn) self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
# No packet sent if storage node is not ready # No packet sent if storage node is not ready
self.assertFalse(self.app.isStorageReady(storage_uuid)) self.assertFalse(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, tid, oid_list) service.askFinishTransaction(conn, ttid, oid_list)
self.checkNoPacketSent(storage_conn) self.checkNoPacketSent(storage_conn)
self.app.tm.abortFor(self.app.nm.getByUUID(client_uuid)) self.app.tm.abortFor(self.app.nm.getByUUID(client_uuid))
# ...but AskLockInformation is sent if it is ready # ...but AskLockInformation is sent if it is ready
self.app.setStorageReady(storage_uuid) self.app.setStorageReady(storage_uuid)
self.assertTrue(self.app.isStorageReady(storage_uuid)) self.assertTrue(self.app.isStorageReady(storage_uuid))
service.askFinishTransaction(conn, tid, oid_list) service.askFinishTransaction(conn, ttid, oid_list)
self.checkAskLockInformation(storage_conn) self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1) self.assertEquals(len(self.app.tm.getPendingList()), 1)
apptid = self.app.tm.getPendingList()[0] txn = self.app.tm[ttid]
txn = self.app.tm[apptid] self.assertEquals(txn.getTID(), self.app.tm.getPendingList()[0])
self.assertEquals(len(txn.getOIDList()), 0) self.assertEquals(len(txn.getOIDList()), 0)
self.assertEquals(len(txn.getUUIDList()), 1) self.assertEquals(len(txn.getUUIDList()), 1)
......
...@@ -104,15 +104,15 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -104,15 +104,15 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
ttid = self.app.tm.begin(client_1.getUUID()) ttid = self.app.tm.begin(client_1.getUUID())
tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list, tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
msg_id) msg_id)
self.assertTrue(tid in self.app.tm) self.assertTrue(ttid in self.app.tm)
# the first storage acknowledge the lock # the first storage acknowledge the lock
self.service.answerInformationLocked(storage_conn_1, tid) self.service.answerInformationLocked(storage_conn_1, ttid)
self.checkNoPacketSent(client_conn_1) self.checkNoPacketSent(client_conn_1)
self.checkNoPacketSent(client_conn_2) self.checkNoPacketSent(client_conn_2)
self.checkNoPacketSent(storage_conn_1) self.checkNoPacketSent(storage_conn_1)
self.checkNoPacketSent(storage_conn_2) self.checkNoPacketSent(storage_conn_2)
# then the second # then the second
self.service.answerInformationLocked(storage_conn_2, tid) self.service.answerInformationLocked(storage_conn_2, ttid)
self.checkAnswerTransactionFinished(client_conn_1) self.checkAnswerTransactionFinished(client_conn_1)
self.checkInvalidateObjects(client_conn_2) self.checkInvalidateObjects(client_conn_2)
self.checkNotifyUnlockInformation(storage_conn_1) self.checkNotifyUnlockInformation(storage_conn_1)
...@@ -211,7 +211,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -211,7 +211,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
ttid1 = tm.begin(node1.getUUID()) ttid1 = tm.begin(node1.getUUID())
tid1 = tm.prepare(client1, ttid1, 1, oid_list, tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1) [node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(tid1, node2.getUUID()) tm.lock(ttid1, node2.getUUID())
self.checkNoPacketSent(cconn1) self.checkNoPacketSent(cconn1)
# Storage 1 dies # Storage 1 dies
node1.setTemporarilyDown() node1.setTemporarilyDown()
...@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
[node1.getUUID(), node2.getUUID()], msg_id_2) [node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting # T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False) self.checkNoPacketSent(cconn2, check_notify=False)
tm.remove(node1.getUUID(), tid2) tm.remove(node1.getUUID(), ttid2)
# Transaction 3: 1 storage node involved, which won't die # Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3 msg_id_3 = 3
...@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
[node2.getUUID(), ], msg_id_3) [node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response # T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False) self.checkNoPacketSent(cconn3, check_notify=False)
tm.remove(node1.getUUID(), tid3) tm.remove(node1.getUUID(), ttid3)
def test_answerPack(self): def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False. # Note: incomming status has no meaning here, so it's left to False.
......
...@@ -79,17 +79,17 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -79,17 +79,17 @@ class testTransactionManager(NeoUnitTestBase):
tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id) tid = txnman.prepare(node, ttid, 1, oid_list, uuid_list, msg_id)
self.assertTrue(txnman.hasPending()) self.assertTrue(txnman.hasPending())
self.assertEqual(txnman.getPendingList()[0], tid) self.assertEqual(txnman.getPendingList()[0], tid)
self.assertEqual(txnman[tid].getTID(), tid) txn = txnman[ttid]
txn = txnman[tid] self.assertEqual(txn.getTID(), tid)
self.assertEqual(txn.getUUIDList(), list(uuid_list)) self.assertEqual(txn.getUUIDList(), list(uuid_list))
self.assertEqual(txn.getOIDList(), list(oid_list)) self.assertEqual(txn.getOIDList(), list(oid_list))
# lock nodes # lock nodes
txnman.lock(tid, uuid1) txnman.lock(ttid, uuid1)
self.assertEqual(len(callback.getNamedCalls('__call__')), 0) self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
txnman.lock(tid, uuid2) txnman.lock(ttid, uuid2)
self.assertEqual(len(callback.getNamedCalls('__call__')), 1) self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished # transaction finished
txnman.remove(client_uuid, tid) txnman.remove(client_uuid, ttid)
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
def testAbortFor(self): def testAbortFor(self):
...@@ -144,8 +144,8 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -144,8 +144,8 @@ class testTransactionManager(NeoUnitTestBase):
ttid1 = tm.begin(client_uuid) ttid1 = tm.begin(client_uuid)
tid1 = tm.prepare(client1, ttid1, 1, oid_list, tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_1) [storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(tid1, storage_2_uuid) tm.lock(ttid1, storage_2_uuid)
t1 = tm[tid1] t1 = tm[ttid1]
self.assertFalse(t1.locked()) self.assertFalse(t1.locked())
# Storage 1 dies: # Storage 1 dies:
# t1 is over # t1 is over
...@@ -158,7 +158,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -158,7 +158,7 @@ class testTransactionManager(NeoUnitTestBase):
ttid2 = tm.begin(client_uuid) ttid2 = tm.begin(client_uuid)
tid2 = tm.prepare(client2, ttid2, 1, oid_list, tid2 = tm.prepare(client2, ttid2, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_2) [storage_1_uuid, storage_2_uuid], msg_id_2)
t2 = tm[tid2] t2 = tm[ttid2]
self.assertFalse(t2.locked()) self.assertFalse(t2.locked())
# Storage 1 dies: # Storage 1 dies:
# t2 still waits for storage 2 # t2 still waits for storage 2
...@@ -172,7 +172,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -172,7 +172,7 @@ class testTransactionManager(NeoUnitTestBase):
ttid3 = tm.begin(client_uuid) ttid3 = tm.begin(client_uuid)
tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ], tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
msg_id_3) msg_id_3)
t3 = tm[tid3] t3 = tm[ttid3]
self.assertFalse(t3.locked()) self.assertFalse(t3.locked())
# Storage 1 dies: # Storage 1 dies:
# t3 doesn't care # t3 doesn't care
...@@ -249,10 +249,10 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -249,10 +249,10 @@ class testTransactionManager(NeoUnitTestBase):
ttid2 = tm.begin(uuid2) ttid2 = tm.begin(uuid2)
tid1 = tm.prepare(node1, ttid1, 1, [], [storage_uuid], 0) tid1 = tm.prepare(node1, ttid1, 1, [], [storage_uuid], 0)
tid2 = tm.prepare(node2, ttid2, 1, [], [storage_uuid], 0) tid2 = tm.prepare(node2, ttid2, 1, [], [storage_uuid], 0)
tm.lock(tid2, storage_uuid) tm.lock(ttid2, storage_uuid)
# txn 2 is still blocked by txn 1 # txn 2 is still blocked by txn 1
self.assertEqual(len(callback.getNamedCalls('__call__')), 0) self.assertEqual(len(callback.getNamedCalls('__call__')), 0)
tm.lock(tid1, storage_uuid) tm.lock(ttid1, storage_uuid)
# both transactions are unlocked when txn 1 is fully locked # both transactions are unlocked when txn 1 is fully locked
self.assertEqual(len(callback.getNamedCalls('__call__')), 2) self.assertEqual(len(callback.getNamedCalls('__call__')), 2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment