testElectionHandler.py 34.4 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import os
import unittest
20
from neo import logging
Aurel's avatar
Aurel committed
21 22
from mock import Mock
from struct import pack, unpack
23
from neo.tests import NeoTestBase
24
from neo import protocol
25
from neo.protocol import Packet, INVALID_UUID
26
from neo.master.handlers.election import ClientElectionHandler, ServerElectionHandler
Aurel's avatar
Aurel committed
27 28 29 30 31 32 33 34
from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
     PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
     REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
     STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
     ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
     ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
     ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
35
     DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
Aurel's avatar
Aurel committed
36 37 38 39 40 41 42
     FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
     NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
     ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
     ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
     ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
     ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
     ASK_OIDS, ANSWER_OIDS, \
43 44
     NOT_READY_CODE, OID_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
     PROTOCOL_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
Aurel's avatar
Aurel committed
45 46 47 48 49 50
     INTERNAL_ERROR_CODE, \
     STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE, \
     RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
     UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.exception import OperationFailure, ElectionFailure     
from neo.node import MasterNode, StorageNode
51
from neo.tests import DoNothingConnector
Aurel's avatar
Aurel committed
52 53
from neo.connection import ClientConnection

Grégory Wisniewski's avatar
Grégory Wisniewski committed
54
# Patch the connection so that _addPacket calls are registered on the
# connector, which is a mock object in these tests.
def _addPacket(self, packet):
    """Forward *packet* to the connector's ``_addPacket``.

    Nothing happens when the connection has no connector
    (``self.connector`` is None).
    """
    if self.connector is not None:
        self.connector._addPacket(packet)
Aurel's avatar
Aurel committed
59 60 61 62 63
def expectMessage(self, packet):
    """Relay *packet* to the connector's ``expectMessage``.

    A connection without connector (``self.connector`` is None) ignores
    the call.
    """
    if self.connector is None:
        return
    self.connector.expectMessage(packet)


64
class MasterClientElectionTests(NeoTestBase):
    """Exercise the connection callbacks of ClientElectionHandler."""

    # NOTE(review): identifyToMasterNode() and
    # checkCalledRequestNodeIdentification() are called below but are not
    # defined in this class; presumably NeoTestBase provides them -- confirm,
    # since MasterServerElectionTests in this file defines its own copies.

    def setUp(self):
        """Create a master Application with a mocked event manager and the
        handler under test, then monkey-patch ClientConnection."""
        # create an application object
        config = self.getConfigFile()
        self.app = Application(config, "master1")
        self.app.pt.clear()
        self.app.em = Mock({"getConnectionList" : []})
        self.app.finishing_transaction_dict = {}
        for server in self.app.master_node_list:
            self.app.nm.add(MasterNode(server = server))
        self.election = ClientElectionHandler(self.app)
        # every known master starts as "unconnected" and RUNNING
        self.app.unconnected_master_node_set = set()
        self.app.negotiating_master_node_set = set()
        for node in self.app.nm.getMasterNodeList():
            self.app.unconnected_master_node_set.add(node.getServer())
            node.setState(RUNNING_STATE)
        # define some variable to simulate client and storage node
        self.client_port = 11022
        self.storage_port = 10021
        self.master_port = 10011
        # apply monkey patches, keeping the originals for tearDown
        self._addPacket = ClientConnection._addPacket
        self.expectMessage = ClientConnection.expectMessage
        ClientConnection._addPacket = _addPacket
        ClientConnection.expectMessage = expectMessage

    def tearDown(self):
        """Put back the ClientConnection methods patched in setUp, then run
        the base tearDown."""
        # restore patched methods
        ClientConnection._addPacket = self._addPacket
        ClientConnection.expectMessage = self.expectMessage
        NeoTestBase.tearDown(self)

    def test_01_connectionStarted(self):
        # connectionStarted() is expected to move the peer from the
        # unconnected set to the negotiating set.
        uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        conn = Mock({"getUUID" : uuid,
                     "getAddress" : ("127.0.0.1", self.master_port)})
        self.assertEqual(len(self.app.unconnected_master_node_set), 1)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.election.connectionStarted(conn)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)

    def test_02_connectionCompleted(self):
        # connectionCompleted() is expected to ask the peer for its
        # identification.
        uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        conn = Mock({"getUUID" : uuid,
                     "getAddress" : ("127.0.0.1", self.master_port)})
        self.election.connectionCompleted(conn)
        self.checkCalledRequestNodeIdentification(conn)

    def test_03_connectionFailed(self):
        # After a started connection fails, the peer is expected back in the
        # unconnected set with its node marked TEMPORARILY_DOWN.
        uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        conn = Mock({"getUUID" : uuid,
                     "getAddress" : ("127.0.0.1", self.master_port)})
        self.assertEqual(len(self.app.unconnected_master_node_set), 1)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.election.connectionStarted(conn)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), RUNNING_STATE)
        self.election.connectionFailed(conn)
        self.assertEqual(len(self.app.unconnected_master_node_set), 1)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), TEMPORARILY_DOWN_STATE)


class MasterServerElectionTests(NeoTestBase):
Aurel's avatar
Aurel committed
133 134 135

    def setUp(self):
        """Create a master Application with a mocked event manager and a
        ServerElectionHandler, then monkey-patch ClientConnection."""
        # create an application object
        config = self.getConfigFile()
        self.app = Application(config, "master1")
        self.app.pt.clear()
        self.app.em = Mock({"getConnectionList" : []})
        self.app.finishing_transaction_dict = {}
        for server in self.app.master_node_list:
            self.app.nm.add(MasterNode(server = server))
        self.election = ServerElectionHandler(self.app)
        # every known master starts as "unconnected" and RUNNING
        self.app.unconnected_master_node_set = set()
        self.app.negotiating_master_node_set = set()
        for node in self.app.nm.getMasterNodeList():
            self.app.unconnected_master_node_set.add(node.getServer())
            node.setState(RUNNING_STATE)
        # define some variable to simulate client and storage node
        self.client_port = 11022
        self.storage_port = 10021
        self.master_port = 10011
        # apply monkey patches, keeping the originals for tearDown
        self._addPacket = ClientConnection._addPacket
        self.expectMessage = ClientConnection.expectMessage
        ClientConnection._addPacket = _addPacket
        ClientConnection.expectMessage = expectMessage
Aurel's avatar
Aurel committed
158 159
        
    def tearDown(self):
        """Run the base tearDown, then put back the ClientConnection methods
        patched in setUp."""
        try:
            NeoTestBase.tearDown(self)
        finally:
            # restore environment even if the base tearDown raises, so the
            # monkey patches cannot leak into other test cases
            ClientConnection._addPacket = self._addPacket
            ClientConnection.expectMessage = self.expectMessage
Aurel's avatar
Aurel committed
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185

    # Common methods
    def getNewUUID(self):
        """Return 16 fresh random bytes, never equal to INVALID_UUID.

        The value is also stored on ``self.uuid`` so that getLastUUID()
        can return it later.
        """
        while True:
            candidate = os.urandom(16)
            if candidate != INVALID_UUID:
                break
        self.uuid = candidate
        return candidate

    def getLastUUID(self):
        """Return the value currently stored in ``self.uuid``."""
        last_uuid = self.uuid
        return last_uuid

    def identifyToMasterNode(self, node_type=STORAGE_NODE_TYPE, ip="127.0.0.1",
                             port=10021):
        """Do first step of identification to MN.

        As written, this only generates a new UUID through getNewUUID() and
        returns it; ``node_type``, ``ip`` and ``port`` are accepted but not
        used.
        """
        return self.getNewUUID()

    # Method to test the kind of packet returned in answer
    def checkCalledRequestNodeIdentification(self, conn, packet_number=0):
        """Check that a Request Node Identification packet has been sent.

        ``conn`` is a mock connection: exactly one "ask" call and no "abort"
        call must have been recorded, and the first parameter of the "ask"
        call at index ``packet_number`` must be a Packet of type
        REQUEST_NODE_IDENTIFICATION.
        """
        self.assertEqual(len(conn.mockGetNamedCalls("ask")), 1)
        self.assertEqual(len(conn.mockGetNamedCalls("abort")), 0)
        call = conn.mockGetNamedCalls("ask")[packet_number]
        packet = call.getParam(0)
        self.assertTrue(isinstance(packet, Packet))
        self.assertEqual(packet.getType(), REQUEST_NODE_IDENTIFICATION)

        
    def checkCalledAskPrimaryMaster(self, conn, packet_number=0):
        """Check that an Ask Primary Master packet has been sent.

        Looks at the "_addPacket" call recorded on the mock ``conn`` at index
        ``packet_number`` and requires its first parameter to be a Packet of
        type ASK_PRIMARY_MASTER.
        """
        call = conn.mockGetNamedCalls("_addPacket")[packet_number]
        packet = call.getParam(0)
        self.assertTrue(isinstance(packet, Packet))
        self.assertEqual(packet.getType(), ASK_PRIMARY_MASTER)

Aurel's avatar
Aurel committed
201 202
    def checkCalledAnswerPrimaryMaster(self, conn, packet_number=0):
        """Check that an Answer Primary Master packet has been sent.

        Looks at the "answer" call recorded on the mock ``conn`` at index
        ``packet_number`` and requires its first parameter to be a Packet of
        type ANSWER_PRIMARY_MASTER.
        """
        call = conn.mockGetNamedCalls("answer")[packet_number]
        packet = call.getParam(0)
        self.assertTrue(isinstance(packet, Packet))
        self.assertEqual(packet.getType(), ANSWER_PRIMARY_MASTER)

Aurel's avatar
Aurel committed
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
    # Tests

    def test_04_connectionClosed(self):
        # Expects connectionClosed() to change the (unconnected, negotiating)
        # set sizes from (0, 1) to (1, 0) and the peer node state from
        # RUNNING to TEMPORARILY_DOWN.
        # The call is kept for its side effect; its result is not needed here.
        self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        peer_address = ("127.0.0.1", self.master_port)
        conn = ClientConnection(self.app.em, self.election, addr=peer_address,
                                connector_handler=DoNothingConnector)

        def set_sizes():
            return (len(self.app.unconnected_master_node_set),
                    len(self.app.negotiating_master_node_set))

        def peer_state():
            # look the node up again on every check
            return self.app.nm.getNodeByServer(conn.getAddress()).getState()

        self.assertEqual(set_sizes(), (0, 1))
        self.assertEqual(peer_state(), RUNNING_STATE)
        self.election.connectionClosed(conn)
        self.assertEqual(set_sizes(), (1, 0))
        self.assertEqual(peer_state(), TEMPORARILY_DOWN_STATE)

    def test_05_timeoutExpired(self):
        # Expects timeoutExpired() to change the (unconnected, negotiating)
        # set sizes from (0, 1) to (1, 0) and the peer node state from
        # RUNNING to TEMPORARILY_DOWN.
        # The call is kept for its side effect; its result is not needed here.
        self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        peer_address = ("127.0.0.1", self.master_port)
        conn = ClientConnection(self.app.em, self.election, addr=peer_address,
                                connector_handler=DoNothingConnector)

        def set_sizes():
            return (len(self.app.unconnected_master_node_set),
                    len(self.app.negotiating_master_node_set))

        def peer_state():
            # look the node up again on every check
            return self.app.nm.getNodeByServer(conn.getAddress()).getState()

        self.assertEqual(set_sizes(), (0, 1))
        self.assertEqual(peer_state(), RUNNING_STATE)
        self.election.timeoutExpired(conn)
        self.assertEqual(set_sizes(), (1, 0))
        self.assertEqual(peer_state(), TEMPORARILY_DOWN_STATE)

    def test_06_peerBroken1(self):
        # Expects peerBroken() on a client connection to leave both sets
        # empty, (0, 1) -> (0, 0), and to mark the peer node DOWN.
        # The call is kept for its side effect; its result is not needed here.
        self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        peer_address = ("127.0.0.1", self.master_port)
        conn = ClientConnection(self.app.em, self.election, addr=peer_address,
                                connector_handler=DoNothingConnector)

        def set_sizes():
            return (len(self.app.unconnected_master_node_set),
                    len(self.app.negotiating_master_node_set))

        def peer_state():
            # look the node up again on every check
            return self.app.nm.getNodeByServer(conn.getAddress()).getState()

        self.assertEqual(set_sizes(), (0, 1))
        self.assertEqual(peer_state(), RUNNING_STATE)
        self.election.peerBroken(conn)
        self.assertEqual(set_sizes(), (0, 0))
        self.assertEqual(peer_state(), DOWN_STATE)

    def test_06_peerBroken2(self):
        # Same scenario as peerBroken1 but with a mock server-side connection:
        # the peer is expected to stay in the negotiating set and its node to
        # become BROKEN.
        uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        # Without a client connection
        conn = Mock({"getUUID" : uuid,
                     "isServer" : True,
                     "getAddress" : ("127.0.0.1", self.master_port),})
        self.assertEqual(len(self.app.unconnected_master_node_set), 1)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.election.connectionStarted(conn)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), RUNNING_STATE)
        self.election.peerBroken(conn)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), BROKEN_STATE)

    def test_07_packetReceived(self):
        # Receiving a packet from a node flagged DOWN is expected to bring
        # the node back to RUNNING without touching the election sets.
        uuid = self.identifyToMasterNode(node_type=MASTER_NODE_TYPE, port=self.master_port)
        p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, uuid,
                       "127.0.0.1", self.master_port, 1009, 2, self.app.uuid)

        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        # force the node DOWN before the packet arrives
        node = self.app.nm.getNodeByServer(conn.getAddress())
        node.setState(DOWN_STATE)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), DOWN_STATE)
        self.election.packetReceived(conn, p)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getState(), RUNNING_STATE)

    def test_08_handleAcceptNodeIdentification1(self):
        # test with storage node, must be rejected: the connector is expected
        # to be dropped and the peer removed from the negotiating set
        uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        args = (MASTER_NODE_TYPE, uuid, '127.0.0.1', self.master_port,
                self.app.pt.getPartitions(), self.app.pt.getReplicas(), self.app.uuid)
        p = protocol.acceptNodeIdentification(*args)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getUUID(), None)
        self.assertEqual(conn.getUUID(), None)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        # the handler is told the peer is a storage node
        self.election.handleAcceptNodeIdentification(conn, p, STORAGE_NODE_TYPE,
                                                     uuid, "127.0.0.1", self.master_port,
                                                     self.app.pt.getPartitions(),
                                                     self.app.pt.getReplicas(),
                                                     self.app.uuid
                                                     )
        self.assertEqual(conn.getConnector(), None)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        
    def test_08_handleAcceptNodeIdentification2(self):
        # test with bad address, must be rejected
        # NOTE(review): the handler is also given STORAGE_NODE_TYPE here, so
        # the rejection may come from the node type rather than from the
        # address mismatch -- confirm whether MASTER_NODE_TYPE was intended.
        uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        args = (MASTER_NODE_TYPE, uuid, '127.0.0.1', self.master_port,
                self.app.pt.getPartitions(), self.app.pt.getReplicas(), self.app.uuid)
        p = protocol.acceptNodeIdentification(*args)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getUUID(), None)
        self.assertEqual(conn.getUUID(), None)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        # announced address (127.0.0.2) differs from the connection address
        self.election.handleAcceptNodeIdentification(conn, p, STORAGE_NODE_TYPE,
                                                     uuid, "127.0.0.2", self.master_port,
                                                     self.app.pt.getPartitions(),
                                                     self.app.pt.getReplicas(),
                                                     self.app.uuid)
        self.assertEqual(conn.getConnector(), None)

    def test_08_handleAcceptNodeIdentification3(self):
        # test with master node, must be ok: the UUID is expected to be
        # recorded on both the node and the connection, and a second packet
        # (Ask Primary Master) to be queued
        uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        args = (MASTER_NODE_TYPE, uuid, '127.0.0.1', self.master_port,
                self.app.pt.getPartitions(), self.app.pt.getReplicas(), self.app.uuid)
        p = protocol.acceptNodeIdentification(*args)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getUUID(), None)
        self.assertEqual(conn.getUUID(), None)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)

        self.election.handleAcceptNodeIdentification(conn, p, MASTER_NODE_TYPE,
                                                     uuid, "127.0.0.1", self.master_port,
                                                     self.app.pt.getPartitions(),
                                                     self.app.pt.getReplicas(),
                                                     self.app.uuid)
        self.assertEqual(self.app.nm.getNodeByServer(conn.getAddress()).getUUID(), uuid)
        self.assertEqual(conn.getUUID(), uuid)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),2)
        self.checkCalledAskPrimaryMaster(conn.getConnector(), 1)
Aurel's avatar
Aurel committed
346 347 348 349 350 351 352 353 354 355
        

    def test_09_handleAnswerPrimaryMaster1(self):
        # test with master node and greater uuid: with no primary announced,
        # app.primary is expected to become False
        uuid = self.getNewUUID()
        # draw again until the peer UUID is not lower than ours
        while uuid < self.app.uuid:
            uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        conn.setUUID(uuid)
        p = protocol.askPrimaryMaster()
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.election.handleAnswerPrimaryMaster(conn, p, INVALID_UUID, [])
        # no new packet must have been queued
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(self.app.primary, False)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)


    def test_09_handleAnswerPrimaryMaster2(self):
        # test with master node and lesser uuid: with no primary announced,
        # app.primary is expected to stay None
        uuid = self.getNewUUID()
        # draw again until the peer UUID is not greater than ours
        while uuid > self.app.uuid:
            uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        conn.setUUID(uuid)
        p = protocol.askPrimaryMaster()
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.election.handleAnswerPrimaryMaster(conn, p, INVALID_UUID, [])
        # no new packet must have been queued
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(self.app.primary, None)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)


    def test_09_handleAnswerPrimaryMaster3(self):
        # test with master node and given uuid for PMN: the announced primary
        # is a known node, so primary_master_node is expected to be set and
        # app.primary to become False
        uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        conn.setUUID(uuid)
        p = protocol.askPrimaryMaster()
        # register a node carrying the UUID that will be announced as primary
        self.app.nm.add(MasterNode(("127.0.0.1", self.master_port), uuid))
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 2)
        self.assertEqual(self.app.primary_master_node, None)
        self.election.handleAnswerPrimaryMaster(conn, p, uuid, [])
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 2)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.assertNotEqual(self.app.primary_master_node, None)
        self.assertEqual(self.app.primary, False)


    def test_09_handleAnswerPrimaryMaster4(self):
        # test with master node and unknown uuid for PMN: no node carries the
        # announced UUID, so neither primary_master_node nor app.primary is
        # expected to change
        uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        conn.setUUID(uuid)
        p = protocol.askPrimaryMaster()
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.assertEqual(self.app.primary_master_node, None)
        self.election.handleAnswerPrimaryMaster(conn, p, uuid, [])
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.assertEqual(self.app.primary_master_node, None)
        self.assertEqual(self.app.primary, None)


    def test_09_handleAnswerPrimaryMaster5(self):
        # test with master node and new uuid for PMN: the primary is announced
        # together with a master list containing it, so a third node is
        # expected to be registered and left in the unconnected set
        uuid = self.getNewUUID()
        conn = ClientConnection(self.app.em, self.election, addr = ("127.0.0.1", self.master_port),
                                connector_handler = DoNothingConnector)
        conn.setUUID(uuid)
        p = protocol.askPrimaryMaster()
        self.app.nm.add(MasterNode(("127.0.0.1", self.master_port), uuid))
        self.assertEqual(len(self.app.unconnected_master_node_set), 0)
        self.assertEqual(len(self.app.negotiating_master_node_set), 1)
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 2)
        self.assertEqual(self.app.primary_master_node, None)
        master_uuid = self.getNewUUID()
        self.election.handleAnswerPrimaryMaster(conn, p, master_uuid, [("127.0.0.1", self.master_port+1, master_uuid),])
        self.assertEqual(len(conn.getConnector().mockGetNamedCalls("_addPacket")),1)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 3)
        self.assertEqual(len(self.app.unconnected_master_node_set), 1)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        self.assertNotEqual(self.app.primary_master_node, None)
        self.assertEqual(self.app.primary, False)
        # Now tell it's another node which is primary, it must raise
        self.assertRaises(ElectionFailure, self.election.handleAnswerPrimaryMaster, conn, p, uuid, [])

        
    def test_10_handleRequestNodeIdentification(self):
        election = self.election
        uuid = self.getNewUUID()
461
        args = (MASTER_NODE_TYPE, uuid, '127.0.0.1', self.storage_port, 'INVALID_NAME')
462
        packet = protocol.requestNodeIdentification(*args)
Aurel's avatar
Aurel committed
463
        # test alien cluster
Grégory Wisniewski's avatar
Grégory Wisniewski committed
464
        conn = Mock({"_addPacket" : None, "abort" : None,
465
                     "isServer" : True})
466 467 468 469 470 471 472 473 474
        self.checkProtocolErrorRaised(
                election.handleRequestNodeIdentification,
                conn,
                packet=packet,
                node_type=MASTER_NODE_TYPE,
                uuid=uuid,
                ip_address='127.0.0.1',
                port=self.storage_port,
                name="INVALID_NAME",)
Aurel's avatar
Aurel committed
475
        # test connection of a storage node
Grégory Wisniewski's avatar
Grégory Wisniewski committed
476
        conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None,
477
                    "isServer" : True})
478 479 480 481 482 483 484 485 486
        self.checkNotReadyErrorRaised(
                election.handleRequestNodeIdentification,
                conn,
                packet=packet,
                node_type=STORAGE_NODE_TYPE,
                uuid=uuid,
                ip_address='127.0.0.1',
                port=self.storage_port,
                name=self.app.name,)
Aurel's avatar
Aurel committed
487 488

        # known node
Grégory Wisniewski's avatar
Grégory Wisniewski committed
489
        conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None,
490
                    "isServer" : True})
Aurel's avatar
Aurel committed
491 492 493 494 495 496 497 498 499 500 501 502 503 504
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        node = self.app.nm.getMasterNodeList()[0]
        self.assertEqual(node.getUUID(), None)
        self.assertEqual(node.getState(), RUNNING_STATE)
        election.handleRequestNodeIdentification(conn,
                                                packet=packet,
                                                node_type=MASTER_NODE_TYPE,
                                                uuid=uuid,
                                                ip_address='127.0.0.1',
                                                port=self.master_port,
                                                name=self.app.name,)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.assertEqual(node.getUUID(), uuid)
        self.assertEqual(node.getState(), RUNNING_STATE)
505
        self.checkAcceptNodeIdentification(conn, answered_packet=packet)
Aurel's avatar
Aurel committed
506
        # unknown node
Grégory Wisniewski's avatar
Grégory Wisniewski committed
507
        conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None,
508
                    "isServer" : True})
Aurel's avatar
Aurel committed
509 510 511 512 513 514 515 516 517 518 519 520
        new_uuid = self.getNewUUID()
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.assertEqual(len(self.app.unconnected_master_node_set), 1)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        election.handleRequestNodeIdentification(conn,
                                                packet=packet,
                                                node_type=MASTER_NODE_TYPE,
                                                uuid=new_uuid,
                                                ip_address='127.0.0.1',
                                                port=self.master_port+1,
                                                name=self.app.name,)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 2)
521
        self.checkAcceptNodeIdentification(conn, answered_packet=packet)
Aurel's avatar
Aurel committed
522 523 524
        self.assertEqual(len(self.app.unconnected_master_node_set), 2)
        self.assertEqual(len(self.app.negotiating_master_node_set), 0)
        # broken node
Grégory Wisniewski's avatar
Grégory Wisniewski committed
525
        conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None,
526
                    "isServer" : True})
Aurel's avatar
Aurel committed
527 528 529 530 531
        node = self.app.nm.getNodeByServer(("127.0.0.1", self.master_port+1))
        self.assertEqual(node.getUUID(), new_uuid)
        self.assertEqual(node.getState(), RUNNING_STATE)
        node.setState(BROKEN_STATE)
        self.assertEqual(node.getState(), BROKEN_STATE)
532
        self.checkBrokenNodeDisallowedErrorRaised(
533 534 535 536 537 538 539 540
                election.handleRequestNodeIdentification,
                conn,
                packet=packet,
                node_type=MASTER_NODE_TYPE,
                uuid=new_uuid,
                ip_address='127.0.0.1',
                port=self.master_port+1,
                name=self.app.name,)        


    def test_11_handleAskPrimaryMaster(self):
        # An identified master node asks who the primary is: the handler is
        # expected to send exactly one answer and must not abort the
        # connection.
        election = self.election
        uuid = self.identifyToMasterNode(MASTER_NODE_TYPE,
                                         port=self.master_port)
        packet = protocol.askPrimaryMaster()
        conn = Mock({
            "_addPacket": None,
            "getUUID": uuid,
            "isServer": True,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        # Only the node registered by identifyToMasterNode is known so far.
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        election.handleAskPrimaryMaster(conn, packet)
        # assertEqual rather than the deprecated assertEquals alias, to match
        # the rest of this test case.
        self.assertEqual(len(conn.mockGetNamedCalls("answer")), 1)
        self.assertEqual(len(conn.mockGetNamedCalls("abort")), 0)
        self.checkCalledAnswerPrimaryMaster(conn, 0)

    def test_12_handleAnnouncePrimaryMaster(self):
        # Exercise handleAnnouncePrimaryMaster in three situations:
        #   1. the peer is not identified (no UUID on the connection),
        #   2. an identified peer announces itself as primary,
        #   3. an announce arrives while this node is flagged as primary.
        election = self.election
        uuid = self.identifyToMasterNode(MASTER_NODE_TYPE,
                                         port=self.master_port)
        packet = Packet(msg_type=ANNOUNCE_PRIMARY_MASTER)
        handler = election.handleAnnouncePrimaryMaster

        def make_conn(conn_uuid):
            # Fresh server-side connection mock reporting the given UUID.
            return Mock({
                "_addPacket": None,
                "getUUID": conn_uuid,
                "isServer": True,
                "getAddress": ("127.0.0.1", self.master_port),
            })

        # No uuid
        conn = make_conn(None)
        self.assertEqual(len(self.app.nm.getMasterNodeList()), 1)
        self.checkIdenficationRequired(handler, conn, packet)

        # announce
        conn = make_conn(uuid)
        self.assertEqual(self.app.primary, None)
        self.assertEqual(self.app.primary_master_node, None)
        handler(conn, packet)
        self.assertEqual(self.app.primary, False)
        self.assertNotEqual(self.app.primary_master_node, None)

        # set current as primary, and announce another, must raise
        conn = make_conn(uuid)
        self.app.primary = True
        self.assertEqual(self.app.primary, True)
        self.assertRaises(ElectionFailure, handler, conn, packet)
        

    def test_13_handleReelectPrimaryMaster(self):
        # handleReelectPrimaryMaster is expected to raise ElectionFailure.
        election = self.election
        # Called for its effect of registering a master node; the returned
        # UUID is not needed because the connection below reports no UUID.
        self.identifyToMasterNode(MASTER_NODE_TYPE, port=self.master_port)
        # NOTE(review): this builds an ask-primary-master packet although the
        # handler under test is the reelect one -- confirm whether a
        # REELECT_PRIMARY_MASTER packet was intended.
        packet = protocol.askPrimaryMaster()
        # No uuid
        conn = Mock({
            "_addPacket": None,
            "getUUID": None,
            "isServer": True,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        self.assertRaises(ElectionFailure,
                          election.handleReelectPrimaryMaster, conn, packet)

    def test_14_handleNotifyNodeInformation(self):
        # Feed handleNotifyNodeInformation with several node lists and check
        # how the node manager reacts: notifications from an unidentified
        # peer are rejected, information about this node itself, storage
        # nodes and client nodes is ignored, and other master nodes are
        # registered and have their state updated.
        election = self.election
        uuid = self.identifyToMasterNode(MASTER_NODE_TYPE,
                                         port=self.master_port)
        packet = Packet(msg_type=NOTIFY_NODE_INFORMATION)
        host = '127.0.0.1'
        lower_server = (host, self.master_port - 1)
        upper_server = (host, self.master_port + 1)
        node_manager = self.app.nm

        def make_conn(conn_uuid):
            # Connection mock coming from the already known master node.
            return Mock({"getUUID": conn_uuid,
                         "getAddress": (host, self.master_port)})

        def notify(conn, entry):
            # Deliver a single-entry node list to the handler under test.
            election.handleNotifyNodeInformation(conn, packet, [entry])

        # do not answer if no uuid
        conn = make_conn(None)
        self.checkIdenficationRequired(election.handleNotifyNodeInformation,
                                       conn, packet, [])

        # tell the master node about itself, must do nothing
        conn = make_conn(uuid)
        entry = (MASTER_NODE_TYPE, host, lower_server[1], self.app.uuid,
                 DOWN_STATE)
        self.assertEqual(node_manager.getNodeByServer(lower_server), None)
        notify(conn, entry)
        self.assertEqual(node_manager.getNodeByServer(lower_server), None)

        # tell about a storage node, do nothing
        conn = make_conn(uuid)
        entry = (STORAGE_NODE_TYPE, host, lower_server[1], self.getNewUUID(),
                 DOWN_STATE)
        self.assertEqual(len(node_manager.getStorageNodeList()), 0)
        notify(conn, entry)
        self.assertEqual(len(node_manager.getStorageNodeList()), 0)

        # tell about a client node, do nothing
        conn = make_conn(uuid)
        entry = (CLIENT_NODE_TYPE, host, lower_server[1], self.getNewUUID(),
                 DOWN_STATE)
        self.assertEqual(len(node_manager.getClientNodeList()), 0)
        notify(conn, entry)
        self.assertEqual(len(node_manager.getClientNodeList()), 0)

        # tell about another master node
        conn = make_conn(uuid)
        entry = (MASTER_NODE_TYPE, host, upper_server[1], self.getNewUUID(),
                 RUNNING_STATE)
        self.assertEqual(node_manager.getNodeByServer(upper_server), None)
        notify(conn, entry)
        node = node_manager.getNodeByServer(upper_server)
        self.assertNotEqual(node, None)
        self.assertEqual(node.getServer(), upper_server)
        self.assertEqual(node.getState(), RUNNING_STATE)

        # tell that node is down (same connection mock, same address, but a
        # freshly generated UUID in the notification)
        entry = (MASTER_NODE_TYPE, host, upper_server[1], self.getNewUUID(),
                 DOWN_STATE)
        notify(conn, entry)
        node = node_manager.getNodeByServer(upper_server)
        self.assertNotEqual(node, None)
        self.assertEqual(node.getServer(), upper_server)
        self.assertEqual(node.getState(), DOWN_STATE)

    
# Run the tests defined in this module when the file is executed directly as
# a script (nothing happens on a plain import).
if __name__ == '__main__':
    unittest.main()