#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import ZODB
import unittest
import transaction
from Persistence import Persistent

from neo.tests.functional import NEOCluster, NEOFunctionalTest
from neo.client.Storage import Storage as NEOStorage
from neo import protocol
from neo.protocol import ClusterStates

class PObject(Persistent):
    """A minimal persistent object holding a single value, used to
    populate the test database."""

    def __init__(self, value):
        # value: the payload stored on the object (tests store ints)
        self.value = value


OBJECT_NUMBER = 100

class StorageTests(NEOFunctionalTest):
    """Functional tests covering the storage node lifecycle in a NEO
    cluster: replication, pending/running/unavailable states, partition
    table reorganisation and cluster state transitions."""

    def setUp(self):
        NEOFunctionalTest.setUp(self)
        # the cluster is created lazily by __setup(); tearDown relies on
        # self.neo staying None if no cluster was started
        self.neo = None

    def tearDown(self):
        # stop the cluster, if any test started one
        if self.neo is not None:
            self.neo.stop()

    def queryCount(self, db, query):
        """ Execute a scalar SQL query (e.g. a count(*)) on db and return
        the single value of the first row """
        db.query(query)
        result = db.store_result().fetch_row()[0][0]
        return result

    def __setup(self, storage_number=2, pending_number=0, replicas=1,
            partitions=10, master_node_count=2):
        """ Create and start a NEO cluster, leaving pending_number storage
        processes stopped; return a (started, stopped) tuple of storage
        process lists """
        # create a neo cluster
        self.neo = NEOCluster(['test_neo%d' % i for i in xrange(storage_number)],
            port_base=20000, master_node_count=master_node_count,
            partitions=partitions, replicas=replicas,
            temp_dir=self.getTempDirectory(),
        )
        self.neo.setupDB()
        # too many pending storage nodes requested
        assert pending_number <= storage_number
        storage_processes = self.neo.getStorageProcessList()
        start_storage_number = len(storage_processes) - pending_number
        # return a tuple of storage processes lists
        started_processes = storage_processes[:start_storage_number]
        stopped_processes = storage_processes[start_storage_number:]
        self.neo.start(except_storages=stopped_processes)
        return (started_processes, stopped_processes)

    def __populate(self):
        """ Store OBJECT_NUMBER persistent objects through a ZODB
        connection and commit them in a single transaction """
        db, conn = self.neo.getZODBConnection()
        root = conn.root()
        for i in xrange(OBJECT_NUMBER):
            root[i] = PObject(i)
        transaction.commit()
        conn.close()
        db.close()

    def __checkDatabase(self, db_name):
        """ Check that the SQL database db_name contains exactly the
        objects committed by __populate() """
        db = self.neo.getSQLConnection(db_name)
        # wait for the sql transaction to be commited
        def callback(last_try):
            object_number = self.queryCount(db, 'select count(*) from obj')
            return object_number == OBJECT_NUMBER + 2, object_number
        self.neo.expectCondition(callback)
        # no more temporarily objects
        t_objects = self.queryCount(db, 'select count(*) from tobj')
        self.assertEqual(t_objects, 0)
        # One revision per object and two for the root, before and after
        revisions = self.queryCount(db, 'select count(*) from obj')
        self.assertEqual(revisions, OBJECT_NUMBER + 2)
        # One object more for the root
        query = 'select count(*) from (select * from obj group by oid) as t'
        objects = self.queryCount(db, query)
        self.assertEqual(objects, OBJECT_NUMBER + 1)

    def __checkReplicationDone(self):
        """ Wait for replication to finish, then check that every database
        holds the populated content and the storages are running """
        # wait for replication to finish
        def expect_all_storages(last_try):
            storage_number = len(self.neo.getStorageList())
            return storage_number == len(self.neo.db_list), storage_number
        self.neo.expectCondition(expect_all_storages, timeout=10)
        self.neo.expectOudatedCells(number=0, timeout=10)
        # check databases
        for db_name in self.neo.db_list:
            self.__checkDatabase(db_name)
        # check storages state
        storage_list = self.neo.getStorageList(protocol.RUNNING_STATE)
        self.assertEqual(len(storage_list), 2)

    def __expectRunning(self, process):
        """ Expect the given storage process to be in RUNNING state """
        self.neo.expectStorageState(process.getUUID(), protocol.RUNNING_STATE)

    def __expectPending(self, process):
        """ Expect the given storage process to be in PENDING state """
        self.neo.expectStorageState(process.getUUID(), protocol.PENDING_STATE)

    def __expectUnavailable(self, process):
        """ Expect the given storage process to be TEMPORARILY DOWN """
        self.neo.expectStorageState(process.getUUID(),
                protocol.TEMPORARILY_DOWN_STATE)

    def __expectNotKnown(self, process):
        """ Expect the given storage process to be absent from the node
        list known by the cluster """
        def expected_storage_not_known(last_try):
            storage_list = self.neo.getStorageList()
            for storage in storage_list:
                # storage[2] holds the node UUID in the node list rows
                if storage[2] == process.getUUID():
                    return False, storage
            return True, None
        self.neo.expectCondition(expected_storage_not_known)

    def testReplicationWithoutBreak(self):
        """ Start a cluster with two storages and one replica, the two
        databases must have the same content """
        # populate the cluster then check the databases
        self.__setup(storage_number=2, replicas=1)
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.__checkReplicationDone()

    def testNewNodesInPendingState(self):
        """ Check that new storage nodes are set as pending, the cluster
        remains running """
        # start with the first storage
        processes = self.__setup(storage_number=3, replicas=1, pending_number=2)
        started, stopped = processes
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
        # start the second then the third
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.expectClusterRunning()
        stopped[1].start()
        self.__expectPending(stopped[1])
        self.neo.expectClusterRunning()

    def testReplicationWithNewStorage(self):
        """ Create a cluster with one storage, populate it, add a new
        storage then check the database content to ensure the replication
        process is well done """
        # populate one storage
        processes = self.__setup(storage_number=2, replicas=1, pending_number=1,
                partitions=10)
        started, stopped = processes
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0].getUUID(), number=10)
        # start the second
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.expectClusterRunning()
        # add it to the partition table
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectAssignedCells(stopped[0].getUUID(), number=10)
        self.neo.expectClusterRunning()
        # wait for replication to finish then check
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

    def testOudatedCellsOnDownStorage(self):
        """ Check that the storage cells are set as outdated when the node
        is down, the cluster remains up since there is a replica """
        # populate the two storages
        (started, _) = self.__setup(storage_number=2, replicas=1)
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()
        # stop one storage and check outdated cells
        started[0].stop()
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()

    def testVerificationTriggered(self):
        """ Check that the verification stage is executed when a storage
        node required to be operational is lost, and the cluster comes
        back to running state when the storage is up again """
        # start neo with one storage
        (started, _) = self.__setup(replicas=0, storage_number=1)
        self.__expectRunning(started[0])
        self.neo.expectOudatedCells(number=0)
        # stop it, the cluster must switch to verification
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.neo.expectClusterVeryfing()
        # restart it, the cluster must come back to running state
        started[0].start()
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()

    def testSequentialStorageKill(self):
        """ Check that the cluster remains running until the last storage
        node died when all are replicas """
        # start neo with three storages / two replicas
        (started, _) = self.__setup(replicas=2, storage_number=3, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        # stop one storage, cluster must remain running
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.__expectRunning(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()
        # stop a second storage, cluster is still running
        started[1].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterRunning()
        # stop the last, cluster died
        started[2].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.__expectUnavailable(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterVeryfing()

    def testConflictingStorageRejected(self):
        """ Check that a storage coming after the recovery process with the
        same UUID as another already running is refused """
        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, pending_number=1)
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        # start the second with the same UUID as the first
        stopped[0].setUUID(started[0].getUUID())
        stopped[0].start()
        self.neo.expectOudatedCells(number=0)
        # check the first and the cluster are still running
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
        # XXX: should wait for the storage rejection
        # check that no node were added
        storage_number = len(self.neo.getStorageList())
        self.assertEqual(storage_number, 1)

    def testPartitionTableReorganizedWithNewStorage(self):
        """ Check if the partition table changes when adding a new storage
        to a cluster with one storage and no replicas """
        # start with one storage and no replicas
        (started, stopped) = self.__setup(storage_number=2, pending_number=1,
            partitions=10, replicas=0)
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0].getUUID(), 10)
        self.neo.expectOudatedCells(number=0)
        # start the second and add it to the partition table
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        # the partition table must change, each node should be assigned to
        # five partitions
        self.neo.expectAssignedCells(started[0].getUUID(), 5)
        self.neo.expectAssignedCells(stopped[0].getUUID(), 5)

    def testPartitionTableReorganizedAfterDrop(self):
        """ Check that the partition table changes when dropping a replica
        from a cluster with two storages """
        # start with two storages / one replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                partitions=10, pending_number=0)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0].getUUID(), 10)
        self.neo.expectAssignedCells(started[1].getUUID(), 10)
        # kill one storage, it should be set as unavailable
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.__expectRunning(started[1])
        # and the partition table must not change
        self.neo.expectAssignedCells(started[0].getUUID(), 10)
        self.neo.expectAssignedCells(started[1].getUUID(), 10)
        # ask neoctl to drop it
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.__expectNotKnown(started[0])
        self.neo.expectAssignedCells(started[0].getUUID(), 0)
        self.neo.expectAssignedCells(started[1].getUUID(), 10)

    def testReplicationThenRunningWithReplicas(self):
        """ Add a replica to a cluster, wait for the replication to finish,
        shutdown the first storage then check the new storage content """
        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.__expectRunning(started[0])
        self.__expectNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)
        # populate the cluster with some data
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0].getUUID(), 10)
        self.__checkDatabase(self.neo.db_list[0])
        # add a second storage
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0].getUUID(), 10)
        self.neo.expectAssignedCells(stopped[0].getUUID(), 10)
        # wait for replication to finish
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        self.__checkReplicationDone()
        # kill the first storage
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectAssignedCells(started[0].getUUID(), 10)
        self.neo.expectAssignedCells(stopped[0].getUUID(), 10)
        self.neo.expectClusterRunning()
        self.__checkDatabase(self.neo.db_list[0])
        # drop it from partition table
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.__expectNotKnown(started[0])
        self.__expectRunning(stopped[0])
        self.neo.expectAssignedCells(started[0].getUUID(), 0)
        self.neo.expectAssignedCells(stopped[0].getUUID(), 10)
        self.__checkDatabase(self.neo.db_list[1])

    def testStartWithManyPartitions(self):
        """ Just tests that cluster can start with more than 1000 partitions.
        1000, because currently there is an arbitrary packet split at
        every 1000 partition when sending a partition table. """
        self.__setup(storage_number=2, partitions=5000, master_node_count=1)
        neoctl = self.neo.getNEOCTL()
        self.neo.expectClusterState(ClusterStates.RUNNING)

    def testDropNodeThenRestartCluster(self):
        """ Start a cluster with more than one storage, down one, shutdown
        the cluster then restart it. The partition table recovered must not
        include the dropped node """
        # start with two storages / one replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                master_node_count=1, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        # drop one
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.__expectNotKnown(started[0])
        self.__expectRunning(started[1])
        # restart all nodes except the dropped, it must not be known
        self.neo.stop()
        self.neo.start(except_storages=[started[0]])
        self.__expectNotKnown(started[0])
        self.__expectRunning(started[1])
        # then restart it, it must be in pending state
        started[0].start()
        self.__expectPending(started[0])
        self.__expectRunning(started[1])

        
if __name__ == "__main__":
    # Run all functional storage tests with the default unittest runner.
    unittest.main()