#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import os
import unittest
from neo import logging
from struct import pack, unpack
from mock import Mock
from collections import deque
from neo.tests import NeoTestBase
from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler
from neo.exception import PrimaryFailure, OperationFailure
from neo.pt import PartitionTable
from neo.protocol import CellStates, Packets, Packet
from neo.protocol import INVALID_TID, INVALID_OID

class StorageMasterHandlerTests(NeoTestBase):
    """Unit tests for the storage node's MasterOperationHandler.

    Each test drives one handler method with mock connections and packets,
    then checks the calls recorded on the mocks and the application state.
    """

    def checkHandleUnexpectedPacket(self, _call, _msg_type, _listening=True,
                                    **kwargs):
        """Check that `_call` rejects a packet of type `_msg_type`.

        A mock connection (answering `_listening` to "isServer") and a packet
        of the given type are built, then the check itself is delegated to
        checkUnexpectedPacketRaised. Extra keyword arguments are forwarded
        unchanged.
        """
        conn = Mock({
            "getAddress": ("127.0.0.1", self.master_port),
            "isServer": _listening,
        })
        packet = Packet(msg_type=_msg_type)
        # hook: replace the handler's peerBroken so that the call is recorded
        # on the mock connection instead of running the real implementation.
        # NOTE(review): the recorded name is spelled "peerBrokendCalled";
        # kept as is, since whatever inspects it is outside this file.
        self.operation.peerBroken = lambda c: c.peerBrokendCalled()
        self.checkUnexpectedPacketRaised(_call, conn=conn, packet=packet,
                                         **kwargs)

    def setUp(self):
        """Create a storage Application, its handler and a primary master."""
        self.prepareDatabase(number=1)
        # create an application object
        config = self.getStorageConfiguration(master_number=1)
        self.app = Application(**config)
        # start every test from empty transaction/lock/event state
        self.app.transaction_dict = {}
        self.app.store_lock_dict = {}
        self.app.load_lock_dict = {}
        self.app.event_queue = deque()
        for address in self.app.master_node_list:
            self.app.nm.createMaster(address=address)
        # handler
        self.operation = MasterOperationHandler(self.app)
        # set pmn: the first known master becomes the primary master node
        self.master_uuid = self.getNewUUID()
        pmn = self.app.nm.getMasterList()[0]
        pmn.setUUID(self.master_uuid)
        self.app.primary_master_node = pmn
        self.master_port = 10010

    def tearDown(self):
        """Delegate clean-up to NeoTestBase."""
        NeoTestBase.tearDown(self)

    def test_06_timeoutExpired(self):
        # timeout on the connection carrying the primary master's UUID
        conn = Mock({
            "getUUID": self.master_uuid,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        self.assertRaises(PrimaryFailure, self.operation.timeoutExpired, conn)
        self.checkNoPacketSent(conn)

    def test_07_connectionClosed2(self):
        # primary has closed the connection
        conn = Mock({
            "getUUID": self.master_uuid,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        self.assertRaises(PrimaryFailure, self.operation.connectionClosed,
                          conn)
        self.checkNoPacketSent(conn)

    def test_08_peerBroken(self):
        # broken peer on the connection carrying the primary master's UUID
        conn = Mock({
            "getUUID": self.master_uuid,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        self.assertRaises(PrimaryFailure, self.operation.peerBroken, conn)
        self.checkNoPacketSent(conn)

    def test_14_notifyPartitionChanges1(self):
        # old partition change -> do nothing
        app = self.app
        conn = Mock({
            "isServer": False,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        app.replicator = Mock({})
        packet = Packets.NotifyPartitionChanges()
        # the known partition table ID (1) is newer than the notified one (0)
        app.pt = Mock({'getID': 1})
        count = len(app.nm.getList())
        self.operation.notifyPartitionChanges(conn, packet, 0, ())
        # neither the partition table ID nor the node list changed
        self.assertEqual(app.pt.getID(), 1)
        self.assertEqual(len(app.nm.getList()), count)
        # and the replicator was left untouched
        calls = app.replicator.mockGetNamedCalls('removePartition')
        self.assertEqual(len(calls), 0)
        calls = app.replicator.mockGetNamedCalls('addPartition')
        self.assertEqual(len(calls), 0)

    def test_14_notifyPartitionChanges2(self):
        # newer partition change: one notified cell per cell state below
        uuid1, uuid2, uuid3 = [self.getNewUUID() for _ in range(3)]
        cells = (
            (0, uuid1, CellStates.UP_TO_DATE),
            (1, uuid2, CellStates.DISCARDED),
            (2, uuid3, CellStates.OUT_OF_DATE),
        )
        # context
        conn = Mock({
            "isServer": False,
            "getAddress": ("127.0.0.1", self.master_port),
        })
        packet = Packets.NotifyPartitionChanges()
        app = self.app
        # register nodes
        app.nm.createStorage(uuid=uuid1)
        app.nm.createStorage(uuid=uuid2)
        app.nm.createStorage(uuid=uuid3)
        ptid1, ptid2 = (1, 2)
        self.assertNotEqual(ptid1, ptid2)
        app.pt = PartitionTable(3, 1)
        app.dm = Mock({})
        app.replicator = Mock({})
        self.operation.notifyPartitionChanges(conn, packet, ptid2, cells)
        # ptid set
        self.assertEqual(app.pt.getID(), ptid2)
        # dm call
        calls = app.dm.mockGetNamedCalls('changePartitionTable')
        self.assertEqual(len(calls), 1)
        calls[0].checkArgs(ptid2, cells)

    def test_16_stopOperation1(self):
        # OperationFailure
        conn = Mock({'isServer': False})
        packet = Packets.StopOperation()
        self.assertRaises(OperationFailure, self.operation.stopOperation,
                          conn, packet)

    def test_22_lockInformation2(self):
        # load transaction informations
        conn = Mock({'isServer': False})
        self.app.dm = Mock({})
        packet = Packets.LockInformation()
        packet.setId(1)
        transaction = Mock({'getObjectList': ((0, ), )})
        self.app.transaction_dict[INVALID_TID] = transaction
        self.operation.lockInformation(conn, packet, INVALID_TID)
        # the object of the transaction is load-locked by that TID
        self.assertEqual(self.app.load_lock_dict[0], INVALID_TID)
        calls = self.app.dm.mockGetNamedCalls('storeTransaction')
        self.assertEqual(len(calls), 1)
        self.checkNotifyInformationLocked(conn, answered_packet=packet)
        # transaction not in transaction_dict: the handler is still expected
        # to notify that the information is locked
        conn = Mock({'isServer': False})
        self.operation.lockInformation(conn, packet, '\x01' * 8)
        self.checkNotifyInformationLocked(conn, answered_packet=packet)

    def test_23_unlockInformation2(self):
        # delete transaction informations
        conn = Mock({'isServer': False})
        self.app.dm = Mock({})
        # NOTE(review): a LockInformation packet is used in an unlock test;
        # this looks copy-pasted from test_22 -- confirm it is intended.
        packet = Packets.LockInformation()
        packet.setId(1)
        transaction = Mock({'getObjectList': ((0, ), )})
        self.app.transaction_dict[INVALID_TID] = transaction
        self.app.load_lock_dict[0] = transaction
        self.app.store_lock_dict[0] = transaction
        self.operation.unlockInformation(conn, packet, INVALID_TID)
        # both lock dictionaries were emptied
        self.assertEqual(len(self.app.load_lock_dict), 0)
        self.assertEqual(len(self.app.store_lock_dict), 0)
        calls = self.app.dm.mockGetNamedCalls('finishTransaction')
        self.assertEqual(len(calls), 1)
        calls[0].checkArgs(INVALID_TID)
        # transaction not in transaction_dict
        # NOTE(review): this part calls lockInformation, not
        # unlockInformation, exactly as at the end of test_22 -- confirm.
        conn = Mock({'isServer': False})
        self.operation.lockInformation(conn, packet, '\x01' * 8)
        self.checkNotifyInformationLocked(conn, answered_packet=packet)

    def test_30_answerLastIDs(self):
        # set critical TID on replicator
        conn = Mock()
        packet = Packets.AnswerLastIDs()
        self.app.replicator = Mock()
        self.operation.answerLastIDs(
            conn=conn,
            packet=packet,
            loid=INVALID_OID,
            ltid=INVALID_TID,
            lptid=INVALID_TID,
        )
        calls = self.app.replicator.mockGetNamedCalls('setCriticalTID')
        self.assertEqual(len(calls), 1)
        calls[0].checkArgs(packet, INVALID_TID)

    def test_31_answerUnfinishedTransactions(self):
        # set unfinished TID on replicator
        conn = Mock()
        packet = Packets.AnswerUnfinishedTransactions()
        self.app.replicator = Mock()
        self.operation.answerUnfinishedTransactions(
            conn=conn,
            packet=packet,
            tid_list=(INVALID_TID, ),
        )
        calls = self.app.replicator.mockGetNamedCalls('setUnfinishedTIDList')
        self.assertEqual(len(calls), 1)
        calls[0].checkArgs((INVALID_TID, ))

# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()