Commit 6b799777 authored by Vincent Pelletier's avatar Vincent Pelletier

Serialise the whole 2PC at cluster level.

This ensures invalidations are sent in strict ascending TID values order.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2530 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 6adc92fb
......@@ -601,11 +601,10 @@ class Application(object):
if self.local_var.txn is not None:
raise NeoException, 'local_var is not clean in tpc_begin'
# use the given TID or request a new one to the master
self.local_var.tid = tid
if tid is None:
self._askPrimary(Packets.AskBeginTransaction())
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
self._askPrimary(Packets.AskBeginTransaction(tid))
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
assert tid in (None, self.local_var.tid), (tid, self.local_var.tid)
self.local_var.txn = transaction
@profiler_decorator
......@@ -837,6 +836,7 @@ class Application(object):
except:
neo.logging.error('Exception in tpc_abort while notifying ' \
'storage node %r of abortion, ignoring.', conn, exc_info=1)
self._getMasterConnection().notify(p)
# Just wait for responses to arrive. If any leads to an exception,
# log it and continue: we *must* eat all answers to not disturb the
......
......@@ -196,7 +196,7 @@ class EventHandler(object):
def commitTransaction(self, conn, tid):
raise UnexpectedPacketError
def askBeginTransaction(self, conn):
def askBeginTransaction(self, conn, tid):
raise UnexpectedPacketError
def answerBeginTransaction(self, conn, tid):
......
......@@ -45,6 +45,7 @@ class Application(object):
last_transaction = ZERO_TID
def __init__(self, config):
self._queued_events = []
# always use default connector for now
self.connector_handler = getConnectorHandler()
......@@ -578,3 +579,17 @@ class Application(object):
def isStorageReady(self, uuid):
return uuid in self.storage_readiness
def queueEvent(self, func, conn, *args, **kw):
msg_id = conn.getPeerId()
self._queued_events.append((func, msg_id, conn, args, kw))
def executeQueuedEvent(self):
queue = self._queued_events
while queue:
func, msg_id, conn, args, kw = queue.pop(0)
if conn.isAborted() or conn.isClosed():
continue
conn.setPeerId(msg_id)
func(conn, *args, **kw)
break
......@@ -20,7 +20,7 @@ import neo
from neo.protocol import NodeStates, Packets, ProtocolError
from neo.master.handlers import MasterHandler
from neo.util import dump
from neo.master.transactions import DelayedError
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
......@@ -47,11 +47,14 @@ class ClientServiceHandler(MasterHandler):
conn.notify(Packets.NotifyNodeInformation(node_list))
conn.answer(Packets.AnswerNodeInformation())
def askBeginTransaction(self, conn):
def askBeginTransaction(self, conn, tid):
"""
A client request a TID, nothing is kept about it until the finish.
"""
conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin()))
try:
conn.answer(Packets.AnswerBeginTransaction(self.app.tm.begin(tid)))
except DelayedError:
self.app.queueEvent(self.askBeginTransaction, conn, tid)
def askNewOIDs(self, conn, num_oids):
app = self.app
......@@ -108,3 +111,8 @@ class ClientServiceHandler(MasterHandler):
conn.answer(Packets.AnswerLastTransaction(
self.app.getLastTransaction()))
def abortTransaction(self, conn, tid):
app = self.app
app.tm.remove(tid)
app.executeQueuedEvent()
......@@ -107,6 +107,7 @@ class StorageServiceHandler(BaseServiceHandler):
# remove transaction from manager
tm.remove(tid)
app.setLastTransaction(tid)
app.executeQueuedEvent()
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
......
......@@ -84,6 +84,9 @@ def addTID(ptid, offset):
higher = (d.year, d.month, d.day, d.hour, d.minute)
return packTID((higher, lower))
class DelayedError(Exception):
pass
class Transaction(object):
"""
A pending transaction
......@@ -177,6 +180,9 @@ class TransactionManager(object):
Manage current transactions
"""
_last_tid = ZERO_TID
# Transaction serialisation
# We don't need to use a real lock, as we are mono-threaded.
_locked = None
def __init__(self):
# tid -> transaction
......@@ -272,11 +278,16 @@ class TransactionManager(object):
"""
return self._tid_dict.keys()
def begin(self):
def begin(self, tid=None):
"""
Generate a new TID
"""
return self._nextTID()
if self._locked is not None:
raise DelayedError()
if tid is None:
tid = self._nextTID()
self._locked = tid
return tid
def prepare(self, node, tid, oid_list, uuid_list, msg_id):
"""
......@@ -291,9 +302,14 @@ class TransactionManager(object):
"""
Remove a transaction, commited or aborted
"""
node = self._tid_dict[tid].getNode()
del self._tid_dict[tid]
del self._node_dict[node][tid]
assert self._locked == tid, (self._locked, tid)
self._locked = None
tid_dict = self._tid_dict
if tid in tid_dict:
# ...and tried to finish
node = tid_dict[tid].getNode()
del tid_dict[tid]
del self._node_dict[node][tid]
def lock(self, tid, uuid):
"""
......
......@@ -742,6 +742,11 @@ class AskBeginTransaction(Packet):
"""
Ask to begin a new transaction. C -> PM.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
return (_decodeTID(unpack('8s', body)[0]), )
class AnswerBeginTransaction(Packet):
"""
......
......@@ -70,7 +70,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.askBeginTransaction(conn)
service.askBeginTransaction(conn, None)
self.assertTrue(ltid < self.app.tm.getLastTID())
def test_08_askNewOIDs(self):
......@@ -102,7 +102,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
'getPartition': 0,
'getCellList': [Mock({'getUUID': storage_uuid})],
})
service.askBeginTransaction(conn)
service.askBeginTransaction(conn, None)
oid_list = []
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
......
......@@ -201,9 +201,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
client1, cconn1 = self._getClient()
client2, cconn2 = self._getClient()
client3, cconn3 = self._getClient()
tid1 = self.getNextTID()
tid2 = self.getNextTID(tid1)
tid3 = self.getNextTID(tid2)
oid_list = [self.getOID(), ]
# Some shortcuts to simplify test code
......@@ -215,25 +212,13 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
tm.prepare(client1, tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
tid1 = tm.begin()
tm.prepare(client1, tid1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(tid1, node2.getUUID())
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
tm.prepare(client2, tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
tm.prepare(client3, tid3, oid_list, [node2.getUUID(), ], msg_id_3)
# Assert initial state
self.checkNoPacketSent(cconn1)
self.checkNoPacketSent(cconn2)
self.checkNoPacketSent(cconn3)
# Storage 1 dies
node1.setTemporarilyDown()
self.service.nodeLost(conn1, node1)
# Check state after node lost
# T1: last locking node lost, client receives AnswerTransactionFinished
self.checkAnswerTransactionFinished(cconn1)
self.checkNotifyUnlockInformation(conn2)
......@@ -241,10 +226,24 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
# ...and notifications are sent to other clients
self.checkInvalidateObjects(cconn2)
self.checkInvalidateObjects(cconn3)
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
tid2 = tm.begin()
tm.prepare(client2, tid2, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
tm.remove(tid2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
tid3 = tm.begin()
tm.prepare(client3, tid3, oid_list,
[node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
tm.remove(tid3)
def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False.
......
......@@ -22,7 +22,7 @@ from neo.tests import NeoUnitTestBase
from neo.protocol import ZERO_TID
from neo.master.transactions import Transaction, TransactionManager
from neo.master.transactions import packTID, unpackTID, addTID
from neo.master.transactions import packTID, unpackTID, addTID, DelayedError
class testTransactionManager(NeoUnitTestBase):
......@@ -90,26 +90,19 @@ class testTransactionManager(NeoUnitTestBase):
storage_2_uuid = self.makeUUID(2)
txnman = TransactionManager()
# register 4 transactions made by two nodes
tid11 = txnman.begin()
txnman.prepare(node1, tid11, oid_list, [storage_1_uuid], 1)
tid12 = txnman.begin()
txnman.prepare(node1, tid12, oid_list, [storage_1_uuid], 2)
tid21 = txnman.begin()
txnman.prepare(node2, tid21, oid_list, [storage_2_uuid], 3)
tid22 = txnman.begin()
txnman.prepare(node2, tid22, oid_list, [storage_2_uuid], 4)
self.assertTrue(tid11 < tid12 < tid21 < tid22)
self.assertEqual(len(txnman.getPendingList()), 4)
# abort transactions of one node
txnman.abortFor(node1)
tid_list = txnman.getPendingList()
self.assertEqual(len(tid_list), 2)
self.assertTrue(tid21 in tid_list)
self.assertTrue(tid22 in tid_list)
# then the other
self.assertEqual(txnman.getPendingList(), [])
tid1 = txnman.begin()
txnman.prepare(node1, tid1, oid_list, [storage_1_uuid], 1)
self.assertEqual(txnman.getPendingList(), [tid1])
# abort transactions of another node, transaction stays
txnman.abortFor(node2)
self.assertEqual(txnman.getPendingList(), [tid1])
# abort transactions of requesting node, transaction is removed
txnman.abortFor(node1)
self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending())
# ...and we can start another transaction
tid2 = txnman.begin()
def test_getNextOIDList(self):
txnman = TransactionManager()
......@@ -138,6 +131,9 @@ class testTransactionManager(NeoUnitTestBase):
txnman.setLastTID(ntid)
self.assertEqual(txnman.getLastTID(), ntid)
self.assertTrue(ntid > tid1)
# If a new TID is generated, DelayedError is raised
self.assertRaises(DelayedError, txnman.begin)
txnman.remove(tid1)
# new trancation
node2 = Mock({'__hash__': 2})
tid2 = txnman.begin()
......@@ -159,36 +155,39 @@ class testTransactionManager(NeoUnitTestBase):
tid1 = tm.begin()
tm.prepare(client1, tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
tm.lock(tid1, storage_2_uuid)
t1 = tm[tid1]
self.assertFalse(t1.locked())
# Storage 1 dies:
# t1 is over
self.assertTrue(t1.forget(storage_1_uuid))
self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
tm.remove(tid1)
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
tid2 = tm.begin()
tm.prepare(client2, tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
tid3 = tm.begin()
tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
t1 = tm[tid1]
t2 = tm[tid2]
t3 = tm[tid3]
# Assert initial state
self.assertFalse(t1.locked())
self.assertFalse(t2.locked())
self.assertFalse(t3.locked())
# Storage 1 dies:
# t1 is over
self.assertTrue(t1.forget(storage_1_uuid))
self.assertEqual(t1.getUUIDList(), [storage_2_uuid])
# t2 still waits for storage 2
self.assertFalse(t2.forget(storage_1_uuid))
self.assertEqual(t2.getUUIDList(), [storage_2_uuid])
self.assertTrue(t2.lock(storage_2_uuid))
tm.remove(tid2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
tid3 = tm.begin()
tm.prepare(client3, tid3, oid_list, [storage_2_uuid, ], msg_id_3)
t3 = tm[tid3]
self.assertFalse(t3.locked())
# Storage 1 dies:
# t3 doesn't care
self.assertFalse(t3.forget(storage_1_uuid))
self.assertEqual(t3.getUUIDList(), [storage_2_uuid])
self.assertTrue(t3.lock(storage_2_uuid))
tm.remove(tid3)
def testTIDUtils(self):
"""
......@@ -216,5 +215,19 @@ class testTransactionManager(NeoUnitTestBase):
unpackTID(addTID(packTID(((2010, 11, 30, 23, 59), 2**32 - 1)), 1)),
((2010, 12, 1, 0, 0), 0))
def testTransactionLock(self):
"""
Transaction lock is present to ensure invalidation TIDs are sent in
strictly increasing order.
Note: this implementation might change later, to allow more paralelism.
"""
tm = TransactionManager()
tid1 = tm.begin()
# Further calls fail with DelayedError
self.assertRaises(DelayedError, tm.begin)
# ...until tid1 gets removed
tm.remove(tid1)
tid2 = tm.begin()
if __name__ == '__main__':
unittest.main()
......@@ -236,8 +236,10 @@ class ProtocolTests(NeoUnitTestBase):
def test_32_askBeginTransaction(self):
p = Packets.AskBeginTransaction()
self.assertEqual(p.decode(), ())
tid = self.getNextTID()
p = Packets.AskBeginTransaction(tid)
ptid = p.decode()[0]
self.assertEqual(tid, ptid)
def test_33_answerBeginTransaction(self):
tid = self.getNextTID()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment