diff --git a/neo/storage/handlers/client.py b/neo/storage/handlers/client.py index fce46507e19ad8bc28189b0635625060d4878479..4df754a1dde4430cc18ae6e00c33d3ab23de21ff 100644 --- a/neo/storage/handlers/client.py +++ b/neo/storage/handlers/client.py @@ -43,16 +43,23 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): def askStoreTransaction(self, conn, tid, user, desc, ext, oid_list): - uuid = conn.getUUID() - self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext, + self.app.tm.register(conn.getUUID(), tid) + self.app.tm.storeTransaction(tid, oid_list, user, desc, ext, False) conn.answer(Packets.AnswerStoreTransaction(tid)) def _askStoreObject(self, conn, oid, serial, compression, checksum, data, tid, request_time): - uuid = conn.getUUID() + if tid not in self.app.tm: + # transaction was aborted, cancel this event + logging.info('Forget store of %s:%s by %s delayed by %s', + dump(oid), dump(serial), dump(tid), + dump(self.app.tm.getLockingTID(oid))) + # send an answer as the client side is waiting for it + conn.answer(Packets.AnswerStoreObject(0, oid, serial)) + return try: - self.app.tm.storeObject(uuid, tid, serial, oid, compression, + self.app.tm.storeObject(tid, serial, oid, compression, checksum, data, None) except ConflictError, err: # resolvable or not @@ -71,6 +78,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): def askStoreObject(self, conn, oid, serial, compression, checksum, data, tid): + # register the transaction + self.app.tm.register(conn.getUUID(), tid) self._askStoreObject(conn, oid, serial, compression, checksum, data, tid, time.time()) diff --git a/neo/storage/transactions.py b/neo/storage/transactions.py index d731eec36a5ee6e96174e310ea2fbad8ece806fa..d722f042abaaf59bac93919db8a56c38071d9d68 100644 --- a/neo/storage/transactions.py +++ b/neo/storage/transactions.py @@ -123,9 +123,9 @@ class TransactionManager(object): """ return tid in self._transaction_dict - def _getTransaction(self, tid, uuid): + def register(self, uuid, tid): """ - Get or create the transaction object for this tid + Register a transaction, it may be already registered """ transaction = self._transaction_dict.get(tid, None) if transaction is None: @@ -193,17 +193,18 @@ class TransactionManager(object): self._loid = self._loid_seen self._app.dm.setLastOID(self._loid) - def storeTransaction(self, uuid, tid, oid_list, user, desc, ext, packed): + def storeTransaction(self, tid, oid_list, user, desc, ext, packed): """ Store transaction information received from client node """ - transaction = self._getTransaction(tid, uuid) + assert tid in self, "Transaction not registered" + transaction = self._transaction_dict[tid] transaction.prepare(oid_list, user, desc, ext, packed) def getLockingTID(self, oid): return self._store_lock_dict.get(oid) - def storeObject(self, uuid, tid, serial, oid, compression, checksum, data, + def storeObject(self, tid, serial, oid, compression, checksum, data, value_serial): """ Store an object received from client node @@ -235,7 +236,8 @@ class TransactionManager(object): raise ConflictError(locking_tid) # store object - transaction = self._getTransaction(tid, uuid) + assert tid in self, "Transaction not registered" + transaction = self._transaction_dict[tid] transaction.addObject(oid, compression, checksum, data, value_serial) # update loid diff --git a/neo/tests/storage/testClientHandler.py b/neo/tests/storage/testClientHandler.py index f2533d37f73da75d33befdc7ca7d1fdbce13f590..41bfce7a9cb625af24dacba18c8683ae09cfde54 100644 --- a/neo/tests/storage/testClientHandler.py +++ b/neo/tests/storage/testClientHandler.py @@ -44,7 +44,7 @@ class StorageClientHandlerTests(NeoTestBase): self.app.store_lock_dict = {} self.app.load_lock_dict = {} self.app.event_queue = deque() - self.app.tm = Mock() + self.app.tm = Mock({'__contains__': True}) # handler self.operation = ClientOperationHandler(self.app) # set pmn @@ -211,7 +211,7 @@ class StorageClientHandlerTests(NeoTestBase): oid, serial, comp, checksum, data = self._getObject() self.operation.askStoreObject(conn, oid, serial, comp, checksum, data, tid) - self._checkStoreObjectCalled(uuid, tid, serial, oid, comp, + self._checkStoreObjectCalled(tid, serial, oid, comp, checksum, data, None) self.checkAnswerStoreObject(conn) diff --git a/neo/tests/storage/testTransactions.py b/neo/tests/storage/testTransactions.py index 1f22f90893ce4e6a13c1b8ebfed9992a0a80b0ee..ff87277fddf18b5f57847e39a79972d25fd9284c 100644 --- a/neo/tests/storage/testTransactions.py +++ b/neo/tests/storage/testTransactions.py @@ -96,7 +96,7 @@ class TransactionManagerTests(NeoTestBase): def _storeTransactionObjects(self, tid, txn): for i, oid in enumerate(txn[0]): - self.manager.storeObject(self.getNewUUID(), tid, None, + self.manager.storeObject(tid, None, oid, 1, str(i), '0' + str(i), None) def _getObject(self, value): @@ -117,16 +117,17 @@ class TransactionManagerTests(NeoTestBase): def _checkQueuedEventExecuted(self, number=1): calls = self.app.mockGetNamedCalls('executeQueuedEvents') self.assertEqual(len(calls), number) - + def testSimpleCase(self): """ One node, one transaction, not abort """ uuid = self.getNewUUID() tid, txn = self._getTransaction() serial1, object1 = self._getObject(1) serial2, object2 = self._getObject(2) - self.manager.storeTransaction(uuid, tid, *txn) - self.manager.storeObject(uuid, tid, serial1, *object1) - self.manager.storeObject(uuid, tid, serial2, *object2) + self.manager.register(uuid,tid) + self.manager.storeTransaction(tid, *txn) + self.manager.storeObject(tid, serial1, *object1) + self.manager.storeObject(tid, serial2, *object2) self.assertTrue(tid in self.manager) self.manager.lock(tid, txn[0]) self._checkTransactionStored(tid, [object1, object2], txn) @@ -141,15 +142,17 @@ class TransactionManagerTests(NeoTestBase): tid2, txn2 = self._getTransaction() serial, obj = self._getObject(1) # first transaction lock the object - self.manager.storeTransaction(uuid, tid1, *txn1) + self.manager.register(uuid, tid1) + self.manager.storeTransaction(tid1, *txn1) self.assertTrue(tid1 in self.manager) self._storeTransactionObjects(tid1, txn1) self.manager.lock(tid1, txn1[0]) # the second is delayed - self.manager.storeTransaction(uuid, tid2, *txn2) + self.manager.register(uuid, tid2) + self.manager.storeTransaction(tid2, *txn2) self.assertTrue(tid2 in self.manager) - self.assertRaises(DelayedError, self.manager.storeObject, - uuid, tid2, serial, *obj) + self.assertRaises(DelayedError, self.manager.storeObject, + tid2, serial, *obj) def testUnresolvableConflict(self): """ A newer transaction has already modified an object """ @@ -158,16 +161,18 @@ class TransactionManagerTests(NeoTestBase): tid2, txn2 = self._getTransaction() serial, obj = self._getObject(1) # the (later) transaction lock (change) the object - self.manager.storeTransaction(uuid, tid2, *txn2) - self.manager.storeObject(uuid, tid2, serial, *obj) + self.manager.register(uuid, tid2) + self.manager.storeTransaction(tid2, *txn2) + self.manager.storeObject(tid2, serial, *obj) self.assertTrue(tid2 in self.manager) self._storeTransactionObjects(tid2, txn2) self.manager.lock(tid2, txn2[0]) # the previous it's not using the latest version - self.manager.storeTransaction(uuid, tid1, *txn1) + self.manager.register(uuid, tid1) + self.manager.storeTransaction(tid1, *txn1) self.assertTrue(tid1 in self.manager) - self.assertRaises(ConflictError, self.manager.storeObject, - uuid, tid1, serial, *obj) + self.assertRaises(ConflictError, self.manager.storeObject, + tid1, serial, *obj) def testResolvableConflict(self): """ Try to store an object with the lastest revision """ @@ -177,9 +182,10 @@ class TransactionManagerTests(NeoTestBase): next_serial = self.getNextTID(serial) # try to store without the last revision self.app.dm = Mock({'getObjectHistory': [next_serial]}) - self.manager.storeTransaction(uuid, tid, *txn) - self.assertRaises(ConflictError, self.manager.storeObject, - uuid, tid, serial, *obj) + self.manager.register(uuid, tid) + self.manager.storeTransaction(tid, *txn) + self.assertRaises(ConflictError, self.manager.storeObject, + tid, serial, *obj) def testLockDelayed(self): """ Check lock delaytion""" @@ -191,18 +197,20 @@ class TransactionManagerTests(NeoTestBase): serial1, obj1 = self._getObject(1) serial2, obj2 = self._getObject(2) # first transaction lock objects - self.manager.storeTransaction(uuid1, tid1, *txn1) + self.manager.register(uuid1, tid1) + self.manager.storeTransaction(tid1, *txn1) self.assertTrue(tid1 in self.manager) - self.manager.storeObject(uuid1, tid1, serial1, *obj1) - self.manager.storeObject(uuid1, tid1, serial1, *obj2) + self.manager.storeObject(tid1, serial1, *obj1) + self.manager.storeObject(tid1, serial1, *obj2) self.manager.lock(tid1, txn1[0]) # second transaction is delayed - self.manager.storeTransaction(uuid2, tid2, *txn2) + self.manager.register(uuid2, tid2) + self.manager.storeTransaction(tid2, *txn2) self.assertTrue(tid2 in self.manager) self.assertRaises(DelayedError, self.manager.storeObject, - uuid2, tid2, serial1, *obj1) + tid2, serial1, *obj1) self.assertRaises(DelayedError, self.manager.storeObject, - uuid2, tid2, serial2, *obj2) + tid2, serial2, *obj2) def testLockConflict(self): """ Check lock conflict """ @@ -214,26 +222,29 @@ class TransactionManagerTests(NeoTestBase): serial1, obj1 = self._getObject(1) serial2, obj2 = self._getObject(2) # the second transaction lock objects - self.manager.storeTransaction(uuid2, tid2, *txn2) - self.manager.storeObject(uuid2, tid2, serial1, *obj1) - self.manager.storeObject(uuid2, tid2, serial2, *obj2) + self.manager.register(uuid2, tid2) + self.manager.storeTransaction(tid2, *txn2) + self.manager.storeObject(tid2, serial1, *obj1) + self.manager.storeObject(tid2, serial2, *obj2) self.assertTrue(tid2 in self.manager) self.manager.lock(tid2, txn1[0]) # the first get a conflict - self.manager.storeTransaction(uuid1, tid1, *txn1) + self.manager.register(uuid1, tid1) + self.manager.storeTransaction(tid1, *txn1) self.assertTrue(tid1 in self.manager) self.assertRaises(ConflictError, self.manager.storeObject, - uuid1, tid1, serial1, *obj1) + tid1, serial1, *obj1) self.assertRaises(ConflictError, self.manager.storeObject, - uuid1, tid1, serial2, *obj2) + tid1, serial2, *obj2) def testAbortUnlocked(self): """ Abort a non-locked transaction """ uuid = self.getNewUUID() tid, txn = self._getTransaction() serial, obj = self._getObject(1) - self.manager.storeTransaction(uuid, tid, *txn) - self.manager.storeObject(uuid, tid, serial, *obj) + self.manager.register(uuid, tid) + self.manager.storeTransaction(tid, *txn) + self.manager.storeObject(tid, serial, *obj) self.assertTrue(tid in self.manager) # transaction is not locked self.manager.abort(tid) @@ -245,7 +256,8 @@ class TransactionManagerTests(NeoTestBase): """ Try to abort a locked transaction """ uuid = self.getNewUUID() tid, txn = self._getTransaction() - self.manager.storeTransaction(uuid, tid, *txn) + self.manager.register(uuid, tid) + self.manager.storeTransaction(tid, *txn) self._storeTransactionObjects(tid, txn) # lock transaction self.manager.lock(tid, txn[0]) @@ -255,7 +267,7 @@ class TransactionManagerTests(NeoTestBase): for oid in txn[0]: self.assertTrue(self.manager.loadLocked(oid)) self._checkQueuedEventExecuted(number=0) - + def testAbortForNode(self): """ Abort transaction for a node """ uuid1 = self.getNewUUID() @@ -264,10 +276,13 @@ class TransactionManagerTests(NeoTestBase): tid1, txn1 = self._getTransaction() tid2, txn2 = self._getTransaction() tid3, txn3 = self._getTransaction() - self.manager.storeTransaction(uuid1, tid1, *txn1) + self.manager.register(uuid1, tid1) + self.manager.register(uuid2, tid2) + self.manager.register(uuid2, tid3) + self.manager.storeTransaction(tid1, *txn1) # node 2 owns tid2 & tid3 and lock tid2 only - self.manager.storeTransaction(uuid2, tid2, *txn2) - self.manager.storeTransaction(uuid2, tid3, *txn3) + self.manager.storeTransaction(tid2, *txn2) + self.manager.storeTransaction(tid3, *txn3) self._storeTransactionObjects(tid2, txn2) self.manager.lock(tid2, txn2[0]) self.assertTrue(tid1 in self.manager) @@ -279,12 +294,13 @@ class TransactionManagerTests(NeoTestBase): self.assertTrue(tid2 in self.manager) self.assertFalse(tid3 in self.manager) self._checkQueuedEventExecuted(number=1) - + def testReset(self): """ Reset the manager """ uuid = self.getNewUUID() tid, txn = self._getTransaction() - self.manager.storeTransaction(uuid, tid, *txn) + self.manager.register(uuid, tid) + self.manager.storeTransaction(tid, *txn) self._storeTransactionObjects(tid, txn) self.manager.lock(tid, txn[0]) self.assertTrue(tid in self.manager) @@ -299,7 +315,8 @@ class TransactionManagerTests(NeoTestBase): tid2, txn2 = self._getTransaction() serial1, obj1 = self._getObject(1) serial2, obj2 = self._getObject(2) - self.manager.storeObject(uuid, tid1, serial1, *obj1) + self.manager.register(uuid, tid1) + self.manager.storeObject(tid1, serial1, *obj1) self.assertEqual(self.manager.getObjectFromTransaction(tid2, obj1[0]), None) self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj2[0]), @@ -313,7 +330,8 @@ class TransactionManagerTests(NeoTestBase): oid1 = obj1[0] tid1, txn1 = self._getTransaction() self.assertEqual(self.manager.getLockingTID(oid1), None) - self.manager.storeObject(uuid, tid1, serial1, *obj1) + self.manager.register(uuid, tid1) + self.manager.storeObject(tid1, serial1, *obj1) self.assertEqual(self.manager.getLockingTID(oid1), tid1) if __name__ == "__main__":