diff --git a/neo/master/handlers/storage.py b/neo/master/handlers/storage.py
index 568813b2cd504f275caec5b28b0896c8332d7562..1723c20cfcab014ebf7fee90feb01cf25f58d129 100644
--- a/neo/master/handlers/storage.py
+++ b/neo/master/handlers/storage.py
@@ -41,6 +41,10 @@ class StorageServiceHandler(BaseServiceHandler):
         # this is intentionaly placed after the raise because the last cell in a
         # partition must not oudated to allows a cluster restart.
         self.app.outdateAndBroadcastPartition()
+        uuid = conn.getUUID()
+        for tid, transaction in self.app.tm.items():
+            if transaction.forget(uuid):
+                self._afterLock(tid)
 
     def askLastIDs(self, conn):
         app = self.app
@@ -65,10 +69,15 @@
         t = tm[tid]
         if not t.lock(uuid):
             return
+        self._afterLock(tid)
 
+    def _afterLock(self, tid):
         # I have received all the lock answers now:
         # - send a Notify Transaction Finished to the initiated client node
         # - Invalidate Objects to the other client nodes
+        app = self.app
+        tm = app.tm
+        t = tm[tid]
         nm = app.nm
         transaction_node = t.getNode()
         invalidate_objects = Packets.InvalidateObjects(tid, t.getOIDList())
diff --git a/neo/master/transactions.py b/neo/master/transactions.py
index d86f587215d96804f89f347eaf1ce3d5fcca75aa..44f3dc110d297e5098a2f56343cdd5644bc3dbd1 100644
--- a/neo/master/transactions.py
+++ b/neo/master/transactions.py
@@ -85,6 +85,18 @@
         self._uuid_dict = dict.fromkeys(uuid_list, False)
         self._msg_id = msg_id
 
+    def forget(self, uuid):
+        """
+        Given storage was lost while waiting for its lock, stop waiting
+        for it.
+        Does nothing if the node was not part of the transaction.
+        """
+        # XXX: We might lose information that a storage successfully locked
+        # data but was later found to be disconnected. This loss has no impact
+        # on current code, but it might be disturbing to reader or future code.
+        self._uuid_dict.pop(uuid, None)
+        return self.locked()
+
     def lock(self, uuid):
         """
         Define that a node has locked the transaction
@@ -126,6 +138,9 @@
         """
         return tid in self._tid_dict
 
+    def items(self):
+        return self._tid_dict.items()
+
     def _nextTID(self):
         """ Compute the next TID based on the current time and check collisions """
         tm = time()
diff --git a/neo/tests/master/testStorageHandler.py b/neo/tests/master/testStorageHandler.py
index 650d38bd9768d461ff8a60a55899d9ac79d1a619..7cc720b681101b17cb2daa77a7fcc33b82c39be3 100644
--- a/neo/tests/master/testStorageHandler.py
+++ b/neo/tests/master/testStorageHandler.py
@@ -192,6 +192,59 @@
         self._testWithMethod(self.service.connectionClosed,
             NodeStates.TEMPORARILY_DOWN)
 
+    def test_nodeLostAfterAskLockInformation(self):
+        # 2 storage nodes, one will die
+        node1, conn1 = self._getStorage()
+        node2, conn2 = self._getStorage()
+        # client nodes, to distinguish answers for the sample transactions
+        client1, cconn1 = self._getClient()
+        client2, cconn2 = self._getClient()
+        client3, cconn3 = self._getClient()
+        tid1 = self.getNextTID()
+        tid2 = self.getNextTID(tid1)
+        tid3 = self.getNextTID(tid2)
+        oid_list = [self.getOID(), ]
+
+        # Some shortcuts to simplify test code
+        self.app.pt = Mock({'operational': True})
+        self.app.outdateAndBroadcastPartition = lambda: None
+
+        # Register some transactions
+        tm = self.app.tm
+        # Transaction 1: 2 storage nodes involved, one will die and the other
+        # already answered node lock
+        msg_id_1 = 1
+        tm.begin(client1, tid1)
+        tm.prepare(tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
+        tm.lock(tid1, node2.getUUID())
+        # Transaction 2: 2 storage nodes involved, one will die
+        msg_id_2 = 2
+        tm.begin(client2, tid2)
+        tm.prepare(tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
+        # Transaction 3: 1 storage node involved, which won't die
+        msg_id_3 = 3
+        tm.begin(client3, tid3)
+        tm.prepare(tid3, oid_list, [node2.getUUID(), ], msg_id_3)
+
+        # Assert initial state
+        self.checkNoPacketSent(cconn1)
+        self.checkNoPacketSent(cconn2)
+        self.checkNoPacketSent(cconn3)
+
+        # Storage 1 dies
+        node1.setTemporarilyDown()
+        self.service.nodeLost(conn1, node1)
+
+        # Check state after node lost
+        # T1: last locking node lost, client receives AnswerTransactionFinished
+        self.checkAnswerTransactionFinished(cconn1)
+        # ...and notifications are sent to other clients
+        self.checkInvalidateObjects(cconn2)
+        self.checkInvalidateObjects(cconn3)
+        # T2: pending locking answer, client keeps waiting
+        self.checkNoPacketSent(cconn2, check_notify=False)
+        # T3: action not significant to this transaction, so no response
+        self.checkNoPacketSent(cconn3, check_notify=False)
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/neo/tests/master/testTransactions.py b/neo/tests/master/testTransactions.py
index 651a86f0b9a950dae7b5059f48019d94b2630e86..34e44bda96898f8b706efcaf34b4aec2f8e2e67d 100644
--- a/neo/tests/master/testTransactions.py
+++ b/neo/tests/master/testTransactions.py
@@ -119,7 +119,51 @@
         self.assertTrue(tid2 is not None)
         self.assertTrue(tid2 > ntid > tid1)
 
+    def test_forget(self):
+        client1 = Mock({'__hash__': 1})
+        client2 = Mock({'__hash__': 2})
+        client3 = Mock({'__hash__': 3})
+        storage_1_uuid = self.makeUUID(1)
+        storage_2_uuid = self.makeUUID(2)
+        tid1 = self.makeTID(1)
+        tid2 = self.makeTID(2)
+        tid3 = self.makeTID(3)
+        oid_list = [self.makeOID(1), ]
+        tm = TransactionManager()
+        # Transaction 1: 2 storage nodes involved, one will die and the other
+        # already answered node lock
+        msg_id_1 = 1
+        tm.begin(client1, tid1)
+        tm.prepare(tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
+        tm.lock(tid1, storage_2_uuid)
+        # Transaction 2: 2 storage nodes involved, one will die
+        msg_id_2 = 2
+        tm.begin(client2, tid2)
+        tm.prepare(tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
+        # Transaction 3: 1 storage node involved, which won't die
+        msg_id_3 = 3
+        tm.begin(client3, tid3)
+        tm.prepare(tid3, oid_list, [storage_2_uuid, ], msg_id_3)
+
+        t1 = tm[tid1]
+        t2 = tm[tid2]
+        t3 = tm[tid3]
+
+        # Assert initial state
+        self.assertFalse(t1.locked())
+        self.assertFalse(t2.locked())
+        self.assertFalse(t3.locked())
+
+        # Storage 1 dies:
+        # t1 is over
+        self.assertTrue(t1.forget(storage_1_uuid))
+        # t2 still waits for storage 2
+        self.assertFalse(t2.forget(storage_1_uuid))
+        self.assertTrue(t2.lock(storage_2_uuid))
+        # t3 doesn't care
+        self.assertFalse(t3.forget(storage_1_uuid))
+        self.assertTrue(t3.lock(storage_2_uuid))
 
 
 if __name__ == '__main__':
     unittest.main()