# testClientHandler.py
#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Standard library
import os
import unittest
from struct import pack, unpack

# Third-party
from mock import Mock

# NEO
from neo import logging
from neo.tests import NeoTestBase
import neo.master
from neo import protocol
from neo.protocol import Packet, INVALID_UUID
from neo.master.handlers.client import ClientServiceHandler
from neo.master.app import Application
from neo.protocol import (
    ERROR, PING, PONG, ANNOUNCE_PRIMARY_MASTER,
    REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION,
    ASK_LAST_IDS, ANSWER_LAST_IDS, NOTIFY_PARTITION_CHANGES,
    ASK_UNFINISHED_TRANSACTIONS, ASK_BEGIN_TRANSACTION, FINISH_TRANSACTION,
    NOTIFY_INFORMATION_LOCKED, ASK_NEW_OIDS, ABORT_TRANSACTION,
    STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE,
    RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE,
    UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE,
)
from neo.exception import OperationFailure, ElectionFailure
from neo.node import MasterNode, StorageNode, ClientNode


class MasterClientHandlerTests(NeoTestBase):
    """Unit tests for the master node's ClientServiceHandler.

    Each test feeds the handler fake connections (obtained from
    getFakeConnection) and then checks the state of the master
    Application: node states, partition table ID, last OID/TID and the
    dictionary of finishing transactions.

    Test methods are described with comments rather than docstrings so
    that the names reported by the unittest runner stay unchanged.
    """

    def setUp(self):
        # create an application object
        config = self.getMasterConfiguration(master_number=1, replicas=1)
        self.app = Application(**config)
        app = self.app
        app.pt.clear()
        app.pt.setID(pack('!Q', 1))
        # replace the event manager by a mock exposing no connection
        app.em = Mock({"getConnectionList" : []})
        app.loid = '\0' * 8
        app.ltid = '\0' * 8
        app.finishing_transaction_dict = {}
        for server in app.master_node_list:
            app.nm.add(MasterNode(server = server))
        self.service = ClientServiceHandler(app)
        # define some variables to simulate client and storage nodes
        host = '127.0.0.1'
        self.client_port = 11022
        self.storage_port = 10021
        self.master_port = 10010
        self.master_address = (host, self.master_port)
        self.client_address = (host, self.client_port)
        self.storage_address = (host, self.storage_port)
        # register the storage
        # NOTE(review): this storage node is registered under
        # master_address, not storage_address -- confirm this is intended.
        app.nm.add(StorageNode(uuid=self.getNewUUID(),
                               server=self.master_address))

    def tearDown(self):
        """Delegate the clean-up to NeoTestBase."""
        NeoTestBase.tearDown(self)

    def getLastUUID(self):
        """Return self.uuid.

        NOTE(review): self.uuid is never assigned in this class; it is
        presumably maintained by NeoTestBase.getNewUUID -- confirm.
        """
        return self.uuid

    def identifyToMasterNode(self, node_type=STORAGE_NODE_TYPE, ip="127.0.0.1",
                             port=10021):
        """Register a new node in the master's node manager.

        A MasterNode, ClientNode or (for any other node_type) StorageNode
        is created at (ip, port) with a freshly generated UUID and added
        to self.app.nm. Returns that UUID.
        """
        uuid = self.getNewUUID()
        address = (ip, port)
        # storage is the fallback for every type that is not listed here
        node_class = StorageNode
        for known_type, klass in ((MASTER_NODE_TYPE, MasterNode),
                                  (CLIENT_NODE_TYPE, ClientNode)):
            if node_type == known_type:
                node_class = klass
                break
        self.app.nm.add(node_class(address, uuid))
        return uuid

    # Helpers
    def _notifyNodeInformation(self, uuid, packet, node_type, port, node_uuid,
                               state):
        """Send a one-entry node list to handleNotifyNodeInformation.

        The notification comes from a fresh fake connection of the storage
        node `uuid`; afterwards every call recorded on that connection must
        be named "getUUID".
        """
        conn = self.getFakeConnection(uuid, self.storage_address)
        node_list = [(node_type, ('127.0.0.1', port), node_uuid, state)]
        self.service.handleNotifyNodeInformation(conn, packet, node_list)
        for call in conn.mockGetAllCalls():
            self.assertEqual(call.getName(), "getUUID")

    def _checkNodeLost(self, notify_loss, lost_state, ptid_changes):
        """Shared scenario of the peerBroken / timeoutExpired /
        connectionClosed tests.

        notify_loss  -- bound handler method called with the lost connection
        lost_state   -- state expected for a storage node once it is lost
        ptid_changes -- True if losing a storage node must increase the
                        partition table ID, False if it must leave it as is
        """
        app = self.app
        nm = app.nm
        uuid = self.identifyToMasterNode()
        # add more storage nodes, one of them will be lost first
        self.identifyToMasterNode(port=self.storage_port + 2)
        storage_uuid = self.identifyToMasterNode(port=self.storage_port + 1)
        # fill the partition table
        app.pt.make(nm.getStorageNodeList())
        self.assertTrue(app.pt.filled())
        self.assertTrue(app.pt.operational())

        def checkPTID(previous):
            if ptid_changes:
                self.assertTrue(previous < app.pt.getID())
            else:
                self.assertEqual(previous, app.pt.getID())

        # lose the extra storage node
        conn = self.getFakeConnection(storage_uuid,
                                      ('127.0.0.1', self.storage_port + 1))
        lptid = app.pt.getID()
        self.assertEqual(nm.getNodeByUUID(storage_uuid).getState(),
                         RUNNING_STATE)
        notify_loss(conn)
        self.assertEqual(nm.getNodeByUUID(storage_uuid).getState(), lost_state)
        checkPTID(lptid)
        # lose the first storage node: must raise as no other storage node
        # is available
        conn = self.getFakeConnection(uuid, self.storage_address)
        lptid = app.pt.getID()
        self.assertEqual(nm.getNodeByUUID(uuid).getState(), RUNNING_STATE)
        self.assertRaises(OperationFailure, notify_loss, conn)
        self.assertEqual(nm.getNodeByUUID(uuid).getState(), lost_state)
        checkPTID(lptid)
        # lose a client node which has unfinished transactions
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        lptid = app.pt.getID()
        packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
        for _ in range(3):
            # explicit None tid, as in the other tests of this class
            self.service.handleAskBeginTransaction(conn, packet, None)
        self.assertEqual(nm.getNodeByUUID(client_uuid).getState(),
                         RUNNING_STATE)
        self.assertEqual(len(app.finishing_transaction_dict), 3)
        notify_loss(conn)
        # the node must have been removed and no transaction must remain
        self.assertEqual(nm.getNodeByUUID(client_uuid), None)
        self.assertEqual(lptid, app.pt.getID())
        self.assertEqual(len(app.finishing_transaction_dict), 0)

    # Tests
    def test_05_handleNotifyNodeInformation(self):
        uuid = self.identifyToMasterNode()
        packet = Packet(msg_type=NOTIFY_NODE_INFORMATION)
        notify = self._notifyNodeInformation
        # tell the master node that it is not running any longer: must raise
        conn = self.getFakeConnection(uuid, self.storage_address)
        node_list = [(MASTER_NODE_TYPE, ('127.0.0.1', self.master_port),
                      self.app.uuid, DOWN_STATE)]
        self.assertRaises(RuntimeError,
                          self.service.handleNotifyNodeInformation,
                          conn, packet, node_list)
        # tell the master node that it is running, nothing changes
        notify(uuid, packet, MASTER_NODE_TYPE, self.master_port,
               self.app.uuid, RUNNING_STATE)
        # notify about a client node, don't care
        new_uuid = self.getNewUUID()
        notify(uuid, packet, CLIENT_NODE_TYPE, self.client_port,
               new_uuid, BROKEN_STATE)
        # notify about an unknown node, don't care
        notify(uuid, packet, STORAGE_NODE_TYPE, 11010, new_uuid, BROKEN_STATE)
        # notify about a known node but with a bad address, don't care
        self.app.nm.add(StorageNode(("127.0.0.1", 11011), self.getNewUUID()))
        notify(uuid, packet, STORAGE_NODE_TYPE, 11012, uuid, BROKEN_STATE)
        # notify the node is running: the PMN already knows it, nothing is done
        notify(uuid, packet, STORAGE_NODE_TYPE, self.storage_port,
               uuid, RUNNING_STATE)
        # notify the node is temporarily down, must be taken into account
        ptid = self.app.pt.getID()
        notify(uuid, packet, STORAGE_NODE_TYPE, self.storage_port,
               uuid, TEMPORARILY_DOWN_STATE)
        sn = self.app.nm.getStorageNodeList()[0]
        self.assertEqual(sn.getState(), TEMPORARILY_DOWN_STATE)
        self.assertEqual(ptid, self.app.pt.getID())
        # notify the node is broken, must be taken into account and the
        # partition table must change
        notify(uuid, packet, STORAGE_NODE_TYPE, self.storage_port,
               uuid, BROKEN_STATE)
        sn = self.app.nm.getStorageNodeList()[0]
        self.assertEqual(sn.getState(), BROKEN_STATE)
        self.assertTrue(ptid < self.app.pt.getID())

    def test_06_handleAnswerLastIDs(self):
        service = self.service
        uuid = self.identifyToMasterNode()
        packet = Packet(msg_type=ANSWER_LAST_IDS)
        loid = self.app.loid
        ltid = self.app.ltid
        lptid = self.app.pt.getID()
        # a client node sending it is unexpected, nothing must change
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        self.checkUnexpectedPacketRaised(service.handleAnswerLastIDs, conn,
                                         packet, None, None, None)
        self.assertEqual(loid, self.app.loid)
        self.assertEqual(ltid, self.app.ltid)
        self.assertEqual(lptid, self.app.pt.getID())
        # send a ptid which is later than what the PMN knows: must raise
        conn = self.getFakeConnection(uuid, self.storage_address)
        new_ptid = pack('!Q', unpack('!Q', lptid)[0] + 1)
        self.assertTrue(new_ptid > self.app.pt.getID())
        self.assertRaises(OperationFailure, service.handleAnswerLastIDs,
                          conn, packet, None, None, new_ptid)
        self.assertEqual(loid, self.app.loid)
        self.assertEqual(ltid, self.app.ltid)
        self.assertEqual(lptid, self.app.pt.getID())

    def test_07_handleAskBeginTransaction(self):
        service = self.service
        self.identifyToMasterNode()
        packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
        ltid = self.app.ltid
        # a client asks for a transaction without supplying a tid
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        service.handleAskBeginTransaction(conn, packet, None)
        # the last tid moved forward and is registered as finishing
        self.assertTrue(ltid < self.app.ltid)
        self.assertEqual(len(self.app.finishing_transaction_dict), 1)
        tid = list(self.app.finishing_transaction_dict)[0]
        self.assertEqual(tid, self.app.ltid)

    def test_08_handleAskNewOIDs(self):
        service = self.service
        self.identifyToMasterNode()
        packet = Packet(msg_type=ASK_NEW_OIDS)
        loid = self.app.loid
        # a client asks for one new oid: the last oid must move forward
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        service.handleAskNewOIDs(conn, packet, 1)
        self.assertTrue(loid < self.app.loid)

    def test_09_handleFinishTransaction(self):
        service = self.service
        uuid = self.identifyToMasterNode()
        packet = Packet(msg_type=FINISH_TRANSACTION)
        packet.setId(9)
        # give a tid ahead of the last one known by the PMN (ltid + 10):
        # it must be rejected as an unexpected packet
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        upper, lower = unpack('!LL', self.app.ltid)
        new_tid = pack('!LL', upper, lower + 10)
        self.checkUnexpectedPacketRaised(service.handleFinishTransaction,
                                         conn, packet, [], new_tid)
        # forget the first storage node before the nominal case
        old_node = self.app.nm.getNodeByUUID(uuid)
        self.app.nm.remove(old_node)
        self.app.pt.dropNode(old_node)

        # do the right job
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        storage_uuid = self.identifyToMasterNode()
        storage_conn = self.getFakeConnection(storage_uuid,
                                              self.storage_address)
        self.assertNotEqual(uuid, client_uuid)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        service.handleAskBeginTransaction(conn, packet, None)
        tid = self.app.ltid
        conn = self.getFakeConnection(client_uuid, self.client_address)
        # the event manager now exposes the client and storage connections
        self.app.em = Mock({"getConnectionList" : [conn, storage_conn]})
        service.handleFinishTransaction(conn, packet, [], tid)
        self.checkLockInformation(storage_conn)
        finishing = self.app.finishing_transaction_dict
        self.assertEqual(len(finishing), 1)
        self.assertEqual(tid, list(finishing)[0])
        txn = list(finishing.values())[0]
        self.assertEqual(len(txn.getOIDList()), 0)
        self.assertEqual(len(txn.getUUIDSet()), 1)
        # the message id is the one set on the packet above
        self.assertEqual(txn.getMessageId(), 9)

    def test_11_handleAbortTransaction(self):
        service = self.service
        self.identifyToMasterNode()
        packet = Packet(msg_type=ABORT_TRANSACTION)
        finishing = self.app.finishing_transaction_dict
        # give a bad tid: it must not fail, it is just ignored
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        self.assertEqual(len(finishing), 0)
        service.handleAbortTransaction(conn, packet, None)
        self.assertEqual(len(finishing), 0)
        # give a known tid: it must be dropped
        conn = self.getFakeConnection(client_uuid, self.client_address)
        tid = self.app.ltid
        finishing[tid] = None
        self.assertEqual(len(finishing), 1)
        service.handleAbortTransaction(conn, packet, tid)
        self.assertEqual(len(finishing), 0)

    def test_12_handleAskLastIDs(self):
        service = self.service
        uuid = self.identifyToMasterNode()
        packet = Packet(msg_type=ASK_LAST_IDS)
        # ask from a storage node connection
        conn = self.getFakeConnection(uuid, self.storage_address)
        ptid = self.app.pt.getID()
        self.app.ltid = '\1' * 8
        self.app.loid = '\1' * 8
        service.handleAskLastIDs(conn, packet)
        # keep the request and its answer under distinct names
        answer = self.checkAnswerLastIDs(conn, answered_packet=packet)
        loid, ltid, lptid = protocol._decodeAnswerLastIDs(answer._body)
        self.assertEqual(loid, self.app.loid)
        self.assertEqual(ltid, self.app.ltid)
        self.assertEqual(lptid, ptid)

    def test_13_handleAskUnfinishedTransactions(self):
        service = self.service
        uuid = self.identifyToMasterNode()
        packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS)
        # no transaction yet: the answer carries an empty list
        conn = self.getFakeConnection(uuid, self.storage_address)
        service.handleAskUnfinishedTransactions(conn, packet)
        answer = self.checkAnswerUnfinishedTransactions(conn,
                                                        answered_packet=packet)
        tid_list = protocol._decodeAnswerUnfinishedTransactions(answer._body)[0]
        self.assertEqual(len(tid_list), 0)
        # create some transactions
        client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
                                                port=self.client_port)
        conn = self.getFakeConnection(client_uuid, self.client_address)
        for _ in range(3):
            service.handleAskBeginTransaction(conn, packet, None)
        # they must all be reported
        conn = self.getFakeConnection(uuid, self.storage_address)
        service.handleAskUnfinishedTransactions(conn, packet)
        answer = self.checkAnswerUnfinishedTransactions(conn,
                                                        answered_packet=packet)
        tid_list = protocol._decodeAnswerUnfinishedTransactions(answer._body)[0]
        self.assertEqual(len(tid_list), 3)

    def test_15_peerBroken(self):
        # a broken storage node becomes BROKEN and the partition table
        # ID must increase
        self._checkNodeLost(self.service.peerBroken, BROKEN_STATE, True)

    def test_16_timeoutExpired(self):
        # a timed out storage node becomes TEMPORARILY_DOWN and the
        # partition table ID must not change
        self._checkNodeLost(self.service.timeoutExpired,
                            TEMPORARILY_DOWN_STATE, False)

    def test_17_connectionClosed(self):
        # a disconnected storage node becomes TEMPORARILY_DOWN and the
        # partition table ID must not change
        self._checkNodeLost(self.service.connectionClosed,
                            TEMPORARILY_DOWN_STATE, False)




# Run the tests of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()