Commit efa808f3 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Serialize only the last part of the 2PC.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2564 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e0e81079
...@@ -45,8 +45,6 @@ class Application(object): ...@@ -45,8 +45,6 @@ class Application(object):
last_transaction = ZERO_TID last_transaction = ZERO_TID
def __init__(self, config): def __init__(self, config):
self._queued_events = []
# always use default connector for now # always use default connector for now
self.connector_handler = getConnectorHandler() self.connector_handler = getConnectorHandler()
...@@ -584,9 +582,8 @@ class Application(object): ...@@ -584,9 +582,8 @@ class Application(object):
getByUUID(storage_uuid).getConnection().notify(notify_unlock) getByUUID(storage_uuid).getConnection().notify(notify_unlock)
# remove transaction from manager # remove transaction from manager
self.tm.remove(tid) self.tm.remove(transaction_node.getUUID(), tid)
self.setLastTransaction(tid) self.setLastTransaction(tid)
self.executeQueuedEvent()
def getLastTransaction(self): def getLastTransaction(self):
return self.last_transaction return self.last_transaction
...@@ -605,17 +602,3 @@ class Application(object): ...@@ -605,17 +602,3 @@ class Application(object):
def isStorageReady(self, uuid): def isStorageReady(self, uuid):
return uuid in self.storage_readiness return uuid in self.storage_readiness
def queueEvent(self, func, conn, *args, **kw):
msg_id = conn.getPeerId()
self._queued_events.append((func, msg_id, conn, args, kw))
def executeQueuedEvent(self):
queue = self._queued_events
while queue:
func, msg_id, conn, args, kw = queue.pop(0)
if conn.isAborted() or conn.isClosed():
continue
conn.setPeerId(msg_id)
func(conn, *args, **kw)
break
...@@ -51,11 +51,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -51,11 +51,8 @@ class ClientServiceHandler(MasterHandler):
""" """
A client request a TID, nothing is kept about it until the finish. A client request a TID, nothing is kept about it until the finish.
""" """
try: conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(
conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin( conn.getUUID(), tid)))
conn.getUUID(), tid)))
except DelayedError:
self.app.queueEvent(self.askBeginTransaction, conn, tid)
def askNewOIDs(self, conn, num_oids): def askNewOIDs(self, conn, num_oids):
app = self.app app = self.app
...@@ -88,13 +85,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -88,13 +85,8 @@ class ClientServiceHandler(MasterHandler):
partitions = app.pt.getPartitions() partitions = app.pt.getPartitions()
peer_id = conn.getPeerId() peer_id = conn.getPeerId()
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
try: tid = app.tm.prepare(node, ttid, partitions, oid_list,
tid = app.tm.prepare(node, ttid, partitions, oid_list, usable_uuid_set, peer_id)
usable_uuid_set, peer_id)
except DelayedError:
app.queueEvent(self.askFinishTransaction, conn, ttid,
oid_list)
return
# check if greater and foreign OID was stored # check if greater and foreign OID was stored
if app.tm.updateLastOID(oid_list): if app.tm.updateLastOID(oid_list):
...@@ -124,7 +116,5 @@ class ClientServiceHandler(MasterHandler): ...@@ -124,7 +116,5 @@ class ClientServiceHandler(MasterHandler):
self.app.getLastTransaction())) self.app.getLastTransaction()))
def abortTransaction(self, conn, tid): def abortTransaction(self, conn, tid):
app = self.app self.app.tm.remove(conn.getUUID(), tid)
app.tm.remove(tid)
app.executeQueuedEvent()
...@@ -187,10 +187,6 @@ class TransactionManager(object): ...@@ -187,10 +187,6 @@ class TransactionManager(object):
Manage current transactions Manage current transactions
""" """
_last_tid = ZERO_TID _last_tid = ZERO_TID
# Transaction serialisation
# We don't need to use a real lock, as we are mono-threaded.
_locked = None
_next_ttid = 0 _next_ttid = 0
def __init__(self, on_commit): def __init__(self, on_commit):
...@@ -200,6 +196,7 @@ class TransactionManager(object): ...@@ -200,6 +196,7 @@ class TransactionManager(object):
self._node_dict = {} self._node_dict = {}
self._last_oid = None self._last_oid = None
self._on_commit = on_commit self._on_commit = on_commit
self._queue = []
def __getitem__(self, tid): def __getitem__(self, tid):
""" """
...@@ -336,46 +333,36 @@ class TransactionManager(object): ...@@ -336,46 +333,36 @@ class TransactionManager(object):
# No TID requested, generate a temporary one # No TID requested, generate a temporary one
tid = self.getTTID() tid = self.getTTID()
else: else:
# TID requested, take commit lock immediately self._queue.append((uuid, tid))
if self._locked is not None:
raise DelayedError()
self._locked = (uuid, tid)
return tid return tid
def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id): def prepare(self, node, ttid, divisor, oid_list, uuid_list, msg_id):
""" """
Prepare a transaction to be finished Prepare a transaction to be finished
""" """
locked = self._locked # XXX: not efficient but the list should be often small
uuid = node.getUUID() for _, tid in self._queue:
if locked is not None and locked[1] == ttid: if ttid == tid:
assert locked[0] == uuid break
# Transaction requested some TID upon begin, and it owns the commit
# lock since then.
tid = ttid
else: else:
# Otherwise, acquire lock and allocate a new TID.
if locked is not None:
raise DelayedError()
tid = self._nextTID(ttid, divisor) tid = self._nextTID(ttid, divisor)
self._locked = (uuid, tid) self._queue.append((node.getUUID(), tid))
self.setLastTID(tid) self.setLastTID(tid)
neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id) txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self._tid_dict[tid] = txn self._tid_dict[tid] = txn
self._node_dict.setdefault(node, {})[tid] = txn self._node_dict.setdefault(node, {})[tid] = txn
return tid return tid
def remove(self, tid): def remove(self, uuid, tid):
""" """
Remove a transaction, commited or aborted Remove a transaction, commited or aborted
""" """
locked = self._locked try:
if locked is not None and tid == locked[1]: self._queue.remove((uuid, tid))
# If TID has the lock, release it. except ValueError:
# It might legitimately not have the lock (ex: a transaction # finish might not have been started
# aborting, which didn't request a TID upon begin) pass
self._locked = None
tid_dict = self._tid_dict tid_dict = self._tid_dict
if tid in tid_dict: if tid in tid_dict:
# ...and tried to finish # ...and tried to finish
...@@ -392,7 +379,7 @@ class TransactionManager(object): ...@@ -392,7 +379,7 @@ class TransactionManager(object):
txn = self._tid_dict[tid] txn = self._tid_dict[tid]
if txn.lock(uuid): if txn.lock(uuid):
# all storage are locked # all storage are locked
self._on_commit(tid, txn) self._unlockPending()
def forget(self, uuid): def forget(self, uuid):
""" """
...@@ -401,22 +388,37 @@ class TransactionManager(object): ...@@ -401,22 +388,37 @@ class TransactionManager(object):
""" """
for tid, txn in self._tid_dict.items(): for tid, txn in self._tid_dict.items():
if txn.forget(uuid): if txn.forget(uuid):
self._unlockPending()
def _unlockPending(self):
# unlock pending transactions
while self._queue:
tid = self._queue[0][1]
# _queue can contain un-prepared transactions
txn = self._tid_dict.get(tid, None)
if txn is not None and txn.locked():
self._queue.pop()
self._on_commit(tid, txn) self._on_commit(tid, txn)
else:
break
def abortFor(self, node): def abortFor(self, node):
""" """
Abort pending transactions initiated by a node Abort pending transactions initiated by a node
""" """
locked = self._locked neo.logging.debug('Abort for %s', node)
if locked is not None and locked[0] == node.getUUID():
self._locked = None
# nothing to do # nothing to do
if node not in self._node_dict: if node not in self._node_dict:
return return
# remove transactions # remove transactions
uuid = node.getUUID()
remove = self.remove remove = self.remove
for tid in self._node_dict[node].keys(): for tid in self._node_dict[node].keys():
remove(tid) remove(uuid, tid)
# the code below is usefull only during an import
for nuuid, ntid in list(self._queue):
if nuuid == uuid:
self._queue.remove((uuid, tid))
# discard node entry # discard node entry
del self._node_dict[node] del self._node_dict[node]
......
...@@ -82,16 +82,6 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -82,16 +82,6 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# Client asks for a TID # Client asks for a TID
self.app.tm = tm_org self.app.tm = tm_org
service.askBeginTransaction(conn, tid1) service.askBeginTransaction(conn, tid1)
# If asking again for a TID, call is queued
call_marker = []
def queueEvent(*args, **kw):
call_marker.append((args, kw))
self.app.queueEvent = queueEvent
service.askBeginTransaction(conn, tid2)
self.assertEqual(len(call_marker), 1)
args, kw = call_marker[0]
self.assertEqual(kw, {})
self.assertEqual(args, (service.askBeginTransaction, conn, tid2))
def test_08_askNewOIDs(self): def test_08_askNewOIDs(self):
service = self.service service = self.service
......
...@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -231,7 +231,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
[node1.getUUID(), node2.getUUID()], msg_id_2) [node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting # T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False) self.checkNoPacketSent(cconn2, check_notify=False)
tm.remove(tid2) tm.remove(node1.getUUID(), tid2)
# Transaction 3: 1 storage node involved, which won't die # Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3 msg_id_3 = 3
...@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -240,7 +240,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
[node2.getUUID(), ], msg_id_3) [node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response # T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False) self.checkNoPacketSent(cconn3, check_notify=False)
tm.remove(tid3) tm.remove(node1.getUUID(), tid3)
def test_answerPack(self): def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False. # Note: incomming status has no meaning here, so it's left to False.
......
...@@ -84,7 +84,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -84,7 +84,7 @@ class testTransactionManager(NeoUnitTestBase):
txnman.lock(tid, uuid2) txnman.lock(tid, uuid2)
self.assertEqual(len(callback.getNamedCalls('__call__')), 1) self.assertEqual(len(callback.getNamedCalls('__call__')), 1)
# transaction finished # transaction finished
txnman.remove(tid) txnman.remove(client_uuid, tid)
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
def testAbortFor(self): def testAbortFor(self):
...@@ -108,7 +108,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -108,7 +108,7 @@ class testTransactionManager(NeoUnitTestBase):
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending()) self.assertFalse(txnman.hasPending())
# ...and the lock is available # ...and the lock is available
txnman.begin(client_uuid, self.getNextTID()) txnman.begin(self.getNextTID())
def test_getNextOIDList(self): def test_getNextOIDList(self):
txnman = TransactionManager(lambda tid, txn: None) txnman = TransactionManager(lambda tid, txn: None)
...@@ -146,7 +146,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -146,7 +146,7 @@ class testTransactionManager(NeoUnitTestBase):
# t1 is over # t1 is over
self.assertTrue(t1.forget(storage_1_uuid)) self.assertTrue(t1.forget(storage_1_uuid))
self.assertEqual(t1.getUUIDList(), [storage_2_uuid]) self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
tm.remove(tid1) tm.remove(client_uuid, tid1)
# Transaction 2: 2 storage nodes involved, one will die # Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2 msg_id_2 = 2
...@@ -160,7 +160,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -160,7 +160,7 @@ class testTransactionManager(NeoUnitTestBase):
self.assertFalse(t2.forget(storage_1_uuid)) self.assertFalse(t2.forget(storage_1_uuid))
self.assertEqual(t2.getUUIDList(), [storage_2_uuid]) self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
self.assertTrue(t2.lock(storage_2_uuid)) self.assertTrue(t2.lock(storage_2_uuid))
tm.remove(tid2) tm.remove(client_uuid, tid2)
# Transaction 3: 1 storage node involved, which won't die # Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3 msg_id_3 = 3
...@@ -174,7 +174,7 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -174,7 +174,7 @@ class testTransactionManager(NeoUnitTestBase):
self.assertFalse(t3.forget(storage_1_uuid)) self.assertFalse(t3.forget(storage_1_uuid))
self.assertEqual(t3.getUUIDList(), [storage_2_uuid]) self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
self.assertTrue(t3.lock(storage_2_uuid)) self.assertTrue(t3.lock(storage_2_uuid))
tm.remove(tid3) tm.remove(client_uuid, tid3)
def testTIDUtils(self): def testTIDUtils(self):
""" """
...@@ -215,16 +215,13 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -215,16 +215,13 @@ class testTransactionManager(NeoUnitTestBase):
ttid2 = self.getNextTID() ttid2 = self.getNextTID()
tid1 = tm.begin(client_uuid, ttid1) tid1 = tm.begin(client_uuid, ttid1)
self.assertEqual(tid1, ttid1) self.assertEqual(tid1, ttid1)
self.assertRaises(DelayedError, tm.begin, client_uuid, ttid2) tm.remove(client_uuid, tid1)
tm.remove(tid1)
tm.remove(tm.begin(client_uuid, ttid2))
# Without a requested TID, lock spans from prepare to remove only # Without a requested TID, lock spans from prepare to remove only
ttid3 = tm.begin(client_uuid) ttid3 = tm.begin(client_uuid)
ttid4 = tm.begin(client_uuid) # Doesn't raise ttid4 = tm.begin(client_uuid) # Doesn't raise
node = Mock({'getUUID': client_uuid, '__hash__': 0}) node = Mock({'getUUID': client_uuid, '__hash__': 0})
tid4 = tm.prepare(node, ttid4, 1, [], [], 0) tid4 = tm.prepare(node, ttid4, 1, [], [], 0)
self.assertRaises(DelayedError, tm.prepare, node, ttid3, 1, [], [], 0) tm.remove(client_uuid, tid4)
tm.remove(tid4)
tm.prepare(node, ttid3, 1, [], [], 0) tm.prepare(node, ttid3, 1, [], [], 0)
def testClientDisconectsAfterBegin(self): def testClientDisconectsAfterBegin(self):
...@@ -233,11 +230,10 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -233,11 +230,10 @@ class testTransactionManager(NeoUnitTestBase):
tm = TransactionManager(lambda tid, txn: None) tm = TransactionManager(lambda tid, txn: None)
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
tm.begin(client1_uuid, tid1) tm.begin(tid1)
self.assertRaises(DelayedError, tm.begin, client2_uuid, tid2)
node1 = Mock({'getUUID': client1_uuid, '__hash__': 0}) node1 = Mock({'getUUID': client1_uuid, '__hash__': 0})
tm.abortFor(node1) tm.abortFor(node1)
tm.begin(client2_uuid, tid2) tm.begin(tid2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment