Commit 7db1fad6 authored by Vincent Pelletier's avatar Vincent Pelletier

Release commit lock if client disconnects after acquiring it.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2535 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 76214fbb
...@@ -52,7 +52,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -52,7 +52,8 @@ class ClientServiceHandler(MasterHandler):
A client request a TID, nothing is kept about it until the finish. A client request a TID, nothing is kept about it until the finish.
""" """
try: try:
conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(tid))) conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
conn.getUUID(), tid)))
except DelayedError: except DelayedError:
self.app.queueEvent(self.askBeginTransaction, conn, tid) self.app.queueEvent(self.askBeginTransaction, conn, tid)
......
...@@ -325,7 +325,7 @@ class TransactionManager(object): ...@@ -325,7 +325,7 @@ class TransactionManager(object):
""" """
return self._tid_dict.keys() return self._tid_dict.keys()
def begin(self, tid=None): def begin(self, uuid, tid=None):
""" """
Generate a new TID Generate a new TID
""" """
...@@ -336,7 +336,7 @@ class TransactionManager(object): ...@@ -336,7 +336,7 @@ class TransactionManager(object):
# TID requested, take commit lock immediately # TID requested, take commit lock immediately
if self._locked is not None: if self._locked is not None:
raise DelayedError() raise DelayedError()
self._locked = tid self._locked = (uuid, tid)
return tid return tid
def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id): def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
...@@ -344,7 +344,9 @@ class TransactionManager(object): ...@@ -344,7 +344,9 @@ class TransactionManager(object):
Prepare a transaction to be finished Prepare a transaction to be finished
""" """
locked = self._locked locked = self._locked
if locked == ttid: uuid = node.getUUID()
if locked is not None and locked[1] == ttid:
assert locked[0] == uuid
# Transaction requested some TID upon begin, and it owns the commit # Transaction requested some TID upon begin, and it owns the commit
# lock since then. # lock since then.
tid = ttid tid = ttid
...@@ -353,7 +355,7 @@ class TransactionManager(object): ...@@ -353,7 +355,7 @@ class TransactionManager(object):
if locked is not None: if locked is not None:
raise DelayedError() raise DelayedError()
tid = self._nextTID(ttid, divisor) tid = self._nextTID(ttid, divisor)
self._locked = tid self._locked = (uuid, tid)
self.setLastTID(tid) self.setLastTID(tid)
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id) txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
...@@ -365,7 +367,8 @@ class TransactionManager(object): ...@@ -365,7 +367,8 @@ class TransactionManager(object):
""" """
Remove a transaction, commited or aborted Remove a transaction, commited or aborted
""" """
if tid == self._locked: locked = self._locked
if locked is not None and tid == locked[1]:
# If TID has the lock, release it. # If TID has the lock, release it.
# It might legitimately not have the lock (ex: a transaction # It might legitimately not have the lock (ex: a transaction
# aborting, which didn't request a TID upon begin) # aborting, which didn't request a TID upon begin)
...@@ -389,6 +392,9 @@ class TransactionManager(object): ...@@ -389,6 +392,9 @@ class TransactionManager(object):
""" """
Abort pending transactions initiated by a node Abort pending transactions initiated by a node
""" """
locked = self._locked
if locked is not None and locked[0] == node.getUUID():
self._locked = None
# nothing to do # nothing to do
if node not in self._node_dict: if node not in self._node_dict:
return return
......
...@@ -78,7 +78,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -78,7 +78,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
service.askBeginTransaction(conn, None) service.askBeginTransaction(conn, None)
calls = tm.mockGetNamedCalls('begin') calls = tm.mockGetNamedCalls('begin')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(None) calls[0].checkArgs(client_uuid, None)
# Client asks for a TID # Client asks for a TID
self.app.tm = tm_org self.app.tm = tm_org
service.askBeginTransaction(conn, tid1) service.askBeginTransaction(conn, tid1)
......
...@@ -101,7 +101,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -101,7 +101,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
oid_list = self.getOID(), self.getOID() oid_list = self.getOID(), self.getOID()
msg_id = 1 msg_id = 1
# register a transaction # register a transaction
ttid = self.app.tm.begin() ttid = self.app.tm.begin(client_1.getUUID())
tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list, tid = self.app.tm.prepare(client_1, ttid, 1, oid_list, uuid_list,
msg_id) msg_id)
self.assertTrue(tid in self.app.tm) self.assertTrue(tid in self.app.tm)
...@@ -208,7 +208,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -208,7 +208,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# Transaction 1: 2 storage nodes involved, one will die and the other # Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock # already answered node lock
msg_id_1 = 1 msg_id_1 = 1
ttid1 = tm.begin() ttid1 = tm.begin(node1.getUUID())
tid1 = tm.prepare(client1, ttid1, 1, oid_list, tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1) [node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(tid1, node2.getUUID()) tm.lock(tid1, node2.getUUID())
...@@ -226,7 +226,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -226,7 +226,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# Transaction 2: 2 storage nodes involved, one will die # Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2 msg_id_2 = 2
ttid2 = tm.begin() ttid2 = tm.begin(node1.getUUID())
tid2 = tm.prepare(client2, ttid2, 1, oid_list, tid2 = tm.prepare(client2, ttid2, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_2) [node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting # T2: pending locking answer, client keeps waiting
...@@ -235,7 +235,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -235,7 +235,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# Transaction 3: 1 storage node involved, which won't die # Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3 msg_id_3 = 3
ttid3 = tm.begin() ttid3 = tm.begin(node1.getUUID())
tid3 = tm.prepare(client3, ttid3, 1, oid_list, tid3 = tm.prepare(client3, ttid3, 1, oid_list,
[node2.getUUID(), ], msg_id_3) [node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response # T3: action not significant to this transacion, so no response
......
...@@ -59,12 +59,13 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -59,12 +59,13 @@ class testTransactionManager(NeoUnitTestBase):
msg_id = 1 msg_id = 1
oid_list = (oid1, oid2) = self.makeOID(1), self.makeOID(2) oid_list = (oid1, oid2) = self.makeOID(1), self.makeOID(2)
uuid_list = (uuid1, uuid2) = self.makeUUID(1), self.makeUUID(2) uuid_list = (uuid1, uuid2) = self.makeUUID(1), self.makeUUID(2)
client_uuid = self.makeUUID(3)
# create transaction manager # create transaction manager
txnman = TransactionManager() txnman = TransactionManager()
self.assertFalse(txnman.hasPending()) self.assertFalse(txnman.hasPending())
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
# begin the transaction # begin the transaction
ttid = txnman.begin() ttid = txnman.begin(client_uuid)
self.assertTrue(ttid is not None) self.assertTrue(ttid is not None)
self.assertFalse(txnman.hasPending()) self.assertFalse(txnman.hasPending())
self.assertEqual(len(txnman.getPendingList()), 0) self.assertEqual(len(txnman.getPendingList()), 0)
...@@ -89,10 +90,11 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -89,10 +90,11 @@ class testTransactionManager(NeoUnitTestBase):
oid_list = [self.makeOID(1), ] oid_list = [self.makeOID(1), ]
storage_1_uuid = self.makeUUID(1) storage_1_uuid = self.makeUUID(1)
storage_2_uuid = self.makeUUID(2) storage_2_uuid = self.makeUUID(2)
client_uuid = self.makeUUID(3)
txnman = TransactionManager() txnman = TransactionManager()
# register 4 transactions made by two nodes # register 4 transactions made by two nodes
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
ttid1 = txnman.begin() ttid1 = txnman.begin(client_uuid)
tid1 = txnman.prepare(node1, ttid1, 1, oid_list, [storage_1_uuid], 1) tid1 = txnman.prepare(node1, ttid1, 1, oid_list, [storage_1_uuid], 1)
self.assertEqual(txnman.getPendingList(), [tid1]) self.assertEqual(txnman.getPendingList(), [tid1])
# abort transactions of another node, transaction stays # abort transactions of another node, transaction stays
...@@ -103,7 +105,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -103,7 +105,7 @@ class testTransactionManager(NeoUnitTestBase):
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending()) self.assertFalse(txnman.hasPending())
# ...and the lock is available # ...and the lock is available
txnman.begin(self.getNextTID()) txnman.begin(client_uuid, self.getNextTID())
def test_getNextOIDList(self): def test_getNextOIDList(self):
txnman = TransactionManager() txnman = TransactionManager()
...@@ -125,12 +127,13 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -125,12 +127,13 @@ class testTransactionManager(NeoUnitTestBase):
storage_1_uuid = self.makeUUID(1) storage_1_uuid = self.makeUUID(1)
storage_2_uuid = self.makeUUID(2) storage_2_uuid = self.makeUUID(2)
oid_list = [self.makeOID(1), ] oid_list = [self.makeOID(1), ]
client_uuid = self.makeUUID(3)
tm = TransactionManager() tm = TransactionManager()
# Transaction 1: 2 storage nodes involved, one will die and the other # Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock # already answered node lock
msg_id_1 = 1 msg_id_1 = 1
ttid1 = tm.begin() ttid1 = tm.begin(client_uuid)
tid1 = tm.prepare(client1, ttid1, 1, oid_list, tid1 = tm.prepare(client1, ttid1, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_1) [storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(tid1, storage_2_uuid) tm.lock(tid1, storage_2_uuid)
...@@ -144,7 +147,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -144,7 +147,7 @@ class testTransactionManager(NeoUnitTestBase):
# Transaction 2: 2 storage nodes involved, one will die # Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2 msg_id_2 = 2
ttid2 = tm.begin() ttid2 = tm.begin(client_uuid)
tid2 = tm.prepare(client2, ttid2, 1, oid_list, tid2 = tm.prepare(client2, ttid2, 1, oid_list,
[storage_1_uuid, storage_2_uuid], msg_id_2) [storage_1_uuid, storage_2_uuid], msg_id_2)
t2 = tm[tid2] t2 = tm[tid2]
...@@ -158,7 +161,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -158,7 +161,7 @@ class testTransactionManager(NeoUnitTestBase):
# Transaction 3: 1 storage node involved, which won't die # Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3 msg_id_3 = 3
ttid3 = tm.begin() ttid3 = tm.begin(client_uuid)
tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ], tid3 = tm.prepare(client3, ttid3, 1, oid_list, [storage_2_uuid, ],
msg_id_3) msg_id_3)
t3 = tm[tid3] t3 = tm[tid3]
...@@ -202,22 +205,36 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -202,22 +205,36 @@ class testTransactionManager(NeoUnitTestBase):
strictly increasing order. strictly increasing order.
Note: this implementation might change later, to allow more paralelism. Note: this implementation might change later, to allow more paralelism.
""" """
client_uuid = self.makeUUID(3)
tm = TransactionManager() tm = TransactionManager()
# With a requested TID, lock spans from begin to remove # With a requested TID, lock spans from begin to remove
ttid1 = self.getNextTID() ttid1 = self.getNextTID()
ttid2 = self.getNextTID() ttid2 = self.getNextTID()
tid1 = tm.begin(ttid1) tid1 = tm.begin(client_uuid, ttid1)
self.assertEqual(tid1, ttid1) self.assertEqual(tid1, ttid1)
self.assertRaises(DelayedError, tm.begin, ttid2) self.assertRaises(DelayedError, tm.begin, client_uuid, ttid2)
tm.remove(tid1) tm.remove(tid1)
tm.remove(tm.begin(ttid2)) tm.remove(tm.begin(client_uuid, ttid2))
# Without a requested TID, lock spans from prepare to remove only # Without a requested TID, lock spans from prepare to remove only
ttid3 = tm.begin() ttid3 = tm.begin(client_uuid)
ttid4 = tm.begin() # Doesn't raise ttid4 = tm.begin(client_uuid) # Doesn't raise
tid4 = tm.prepare(None, ttid4, 1, [], [], 0) node = Mock({'getUUID': client_uuid, '__hash__': 0})
self.assertRaises(DelayedError, tm.prepare, None, ttid3, 1, [], [], 0) tid4 = tm.prepare(node, ttid4, 1, [], [], 0)
self.assertRaises(DelayedError, tm.prepare, node, ttid3, 1, [], [], 0)
tm.remove(tid4) tm.remove(tid4)
tm.prepare(None, ttid3, 1, [], [], 0) tm.prepare(node, ttid3, 1, [], [], 0)
def testClientDisconectsAfterBegin(self):
client1_uuid = self.makeUUID(1)
client2_uuid = self.makeUUID(2)
tm = TransactionManager()
tid1 = self.getNextTID()
tid2 = self.getNextTID()
tm.begin(client1_uuid, tid1)
self.assertRaises(DelayedError, tm.begin, client2_uuid, tid2)
node1 = Mock({'getUUID': client1_uuid, '__hash__': 0})
tm.abortFor(node1)
tm.begin(client2_uuid, tid2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment