#
# Copyright (C) 2009-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import unittest
import transaction
from Persistence import Persistent

from neo.tests.functional import NEOCluster, NEOFunctionalTest
from neo.protocol import ClusterStates, NodeStates

class PObject(Persistent):

    def __init__(self, value):
        self.value = value


OBJECT_NUMBER = 100

class StorageTests(NEOFunctionalTest):

    def setUp(self):
        NEOFunctionalTest.setUp(self)
        self.neo = None

    def tearDown(self):
        if self.neo is not None:
            self.neo.stop()

    def queryCount(self, db, query):
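        """ Run the given SQL query on db and return the first column of
        the first result row """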
        db.query(query)
        result = db.store_result().fetch_row()[0][0]
        return result

    def __setup(self, storage_number=2, pending_number=0, replicas=1,
            partitions=10, master_node_count=2):
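        """ Create a NEO cluster with storage_number storage nodes, start
        all of them except pending_number, and return the tuple
        (started_processes, stopped_processes) """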
        # create a neo cluster
        self.neo = NEOCluster(['test_neo%d' % i for i in xrange(storage_number)],
            port_base=20000, master_node_count=master_node_count,
            partitions=partitions, replicas=replicas,
            temp_dir=self.getTempDirectory(),
            clear_databases=True,
        )
        # too many pending storage nodes requested
        assert pending_number <= storage_number
        storage_processes = self.neo.getStorageProcessList()
        start_storage_number = len(storage_processes) - pending_number
        # return a tuple of storage process lists: (started, stopped)
        started_processes = storage_processes[:start_storage_number]
        stopped_processes = storage_processes[start_storage_number:]
        self.neo.start(except_storages=stopped_processes)
        return (started_processes, stopped_processes)

    def __populate(self):
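        """ Fill the root object with OBJECT_NUMBER persistent objects
        through a ZODB connection, then commit """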
        db, conn = self.neo.getZODBConnection()
        root = conn.root()
        for i in xrange(OBJECT_NUMBER):
            root[i] = PObject(i)
        transaction.commit()
        conn.close()
        db.close()

    def __checkDatabase(self, db_name):
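        """ Check the content of one storage database: no temporary object
        left and the expected numbers of object revisions and distinct
        objects """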
        db = self.neo.getSQLConnection(db_name, autocommit=True)
        # wait for the SQL transaction to be committed
        def callback(last_try):
            object_number = self.queryCount(db, 'select count(*) from obj')
            return object_number == OBJECT_NUMBER + 2, object_number
        self.neo.expectCondition(callback)
        # no more temporary objects
        t_objects = self.queryCount(db, 'select count(*) from tobj')
        self.assertEqual(t_objects, 0)
        # One revision per object and two for the root, before and after
        revisions = self.queryCount(db, 'select count(*) from obj')
        self.assertEqual(revisions, OBJECT_NUMBER + 2)
        # One more object for the root
        query = 'select count(*) from (select * from obj group by oid) as t'
        objects = self.queryCount(db, query)
        self.assertEqual(objects, OBJECT_NUMBER + 1)

    def __checkReplicationDone(self):
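        """ Wait until all storages are known and no cell is outdated any
        more, then check every database content and the storage states """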
        # wait for replication to finish
        def expect_all_storages(last_try):
            storage_number = len(self.neo.getStorageList())
            return storage_number == len(self.neo.db_list), storage_number
        self.neo.expectCondition(expect_all_storages, timeout=10)
        self.neo.expectOudatedCells(number=0, timeout=10)
        # check databases
        for db_name in self.neo.db_list:
            self.__checkDatabase(db_name)

        # check storage node states
        storage_list = self.neo.getStorageList(NodeStates.RUNNING)
        self.assertEqual(len(storage_list), 2)

    def __checkReplicateCount(self, db_name, target_count, timeout=0, delay=1):
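        """ Wait until the partition table stored in db_name references
        target_count distinct storage UUIDs """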
        db = self.neo.getSQLConnection(db_name, autocommit=True)
        def callback(last_try):
            replicate_count = self.queryCount(db,
                'select count(distinct uuid) from pt')
            if last_try is not None and last_try < replicate_count:
                raise AssertionError('Regression: %s became %s' %
                    (last_try, replicate_count))
            return replicate_count == target_count, replicate_count
        self.neo.expectCondition(callback, timeout, delay)

    def __expectRunning(self, process):
        self.neo.expectStorageState(process.getUUID(), NodeStates.RUNNING)

    def __expectPending(self, process):
        self.neo.expectStorageState(process.getUUID(), NodeStates.PENDING)

    def __expectUnknown(self, process):
        self.neo.expectStorageState(process.getUUID(), NodeStates.UNKNOWN)

    def __expectUnavailable(self, process):
        self.neo.expectStorageState(process.getUUID(),
                NodeStates.TEMPORARILY_DOWN)

    def testReplicationWithoutBreak(self):
        """ Start a cluster with two storage, one replicas, the two databasqes
        must have the same content """

        # populate the cluster then check the databases
        (started, _) = self.__setup(storage_number=2, replicas=1)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.__checkReplicationDone()

    def testNewNodesInPendingState(self):
        """ Check that new storage nodes are set as pending, the cluster remains
        running """

        # start with the first storage
        processes = self.__setup(storage_number=3, replicas=1, pending_number=2)
        started, stopped = processes
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()

        # start the second then the third
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.expectClusterRunning()
        stopped[1].start()
        self.__expectPending(stopped[1])
        self.neo.expectClusterRunning()

    def testReplicationWithNewStorage(self):
        """ create a cluster with one storage, populate it, add a new storage
        then check the database content to ensure the replication process is
        well done """

        # populate one storage
        processes = self.__setup(storage_number=2, replicas=1, pending_number=1,
                partitions=10)
        started, stopped = processes
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], number=10)

        # start the second
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.expectClusterRunning()

        # add it to the partition table
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectAssignedCells(stopped[0], number=10)
        self.neo.expectClusterRunning()

        # wait for replication to finish then check
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

    def testOudatedCellsOnDownStorage(self):
        """ Check that the storage cells are set as oudated when the node is
        down, the cluster remains up since there is a replica """

        # populate the two storages
        (started, _) = self.__setup(storage_number=2, replicas=1)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

        # stop one storage and check outdated cells
        started[0].stop()
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()

    def testVerificationTriggered(self):
        """ Check that the verification stage is executed when a storage node
        required to be operationnal is lost, and the cluster come back in
        running state when the storage is up again """

        # start neo with one storage
        (started, _) = self.__setup(replicas=0, storage_number=1)
        self.__expectRunning(started[0])
        self.neo.expectOudatedCells(number=0)
        # add a client node
        db, conn = self.neo.getZODBConnection()
        root = conn.root()['test'] = 'ok'
        transaction.commit()
        self.assertEqual(len(self.neo.getClientlist()), 1)

        # stop it, the cluster must switch to verification
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.neo.expectClusterVeryfing()
        # client must have been disconnected
        self.assertEqual(len(self.neo.getClientlist()), 0)
        conn.close()
        db.close()

        # restart it, the cluster must come back to running state
        started[0].start()
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()

    def testSequentialStorageKill(self):
        """ Check that the cluster remains running until the last storage node
        died when all are replicas """

        # start neo with three storages / two replicas
        (started, _) = self.__setup(replicas=2, storage_number=3, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # stop one storage, the cluster must remain running
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.__expectRunning(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()

        # stop a second storage, cluster is still running
        started[1].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterRunning()

        # stop the last one, the cluster falls back to verification
        started[2].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.__expectUnavailable(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterVeryfing()

    def testConflictingStorageRejected(self):
        """ Check that a storage coming after the recovery process with the same
        UUID as another already running is refused """

        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, pending_number=1)
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)

        # start the second with the same UUID as the first
        stopped[0].setUUID(started[0].getUUID())
        stopped[0].start()
        self.neo.expectOudatedCells(number=0)

        # check that the first storage and the cluster are still running
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()

        # XXX: should wait for the storage rejection

        # check that no node was added
        storage_number = len(self.neo.getStorageList())
        self.assertEqual(storage_number, 1)

    def testPartitionTableReorganizedWithNewStorage(self):
        """ Check if the partition change when adding a new storage to a cluster
        with one storage and no replicas """

        # start with one storage and no replicas
        (started, stopped) = self.__setup(storage_number=2, pending_number=1,
            partitions=10, replicas=0)
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectOudatedCells(number=0)

        # start the second and add it to the partition table
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)

        # the partition table must change, each node should be assigned to
        # five partitions
        self.neo.expectAssignedCells(started[0], 5)
        self.neo.expectAssignedCells(stopped[0], 5)

    def testPartitionTableReorganizedAfterDrop(self):
        """ Check that the partition change when dropping a replicas from a
        cluster with two storages """

        # start with two storages / one replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                partitions=10, pending_number=0)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(started[1], 10)

        # kill one storage, it should be set as unavailable
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.__expectRunning(started[1])
        # and the partition table must not change
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(started[1], 10)

        # ask neoctl to drop it
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.neo.expectAssignedCells(started[0], 0)
        self.neo.expectAssignedCells(started[1], 10)

    def testReplicationThenRunningWithReplicas(self):
        """ Add a replicas to a cluster, wait for the replication to finish,
        shutdown the first storage then check the new storage content """

        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.__expectRunning(started[0])
        self.neo.expectStorageNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)

        # populate the cluster with some data
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0], 10)
        self.__checkDatabase(self.neo.db_list[0])

        # add a second storage
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)

        # wait for replication to finish
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        self.__checkReplicationDone()

        # kill the first storage
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)
        self.neo.expectClusterRunning()
        self.__checkDatabase(self.neo.db_list[0])

        # drop it from partition table
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.__expectRunning(stopped[0])
        self.neo.expectAssignedCells(started[0], 0)
        self.neo.expectAssignedCells(stopped[0], 10)
        self.__checkDatabase(self.neo.db_list[1])

    def testStartWithManyPartitions(self):
        """ Just tests that cluster can start with more than 1000 partitions.
        1000, because currently there is an arbitrary packet split at
        every 1000 partition when sending a partition table. """
        self.__setup(storage_number=2, partitions=5000, master_node_count=1)
        self.neo.expectClusterState(ClusterStates.RUNNING)

    def testDropNodeThenRestartCluster(self):
        """ Start a cluster with more than one storage, down one, shutdown the
        cluster then restart it. The partition table recovered must not include
        the dropped node """

        # start with two storages / one replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                master_node_count=1, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)

        # drop one
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.__expectRunning(started[1])

        # wait for the running storage to store the new partition table
        self.__checkReplicateCount(self.neo.db_list[1], 1)

        # restart all nodes except the dropped one, it must not be known
        self.neo.stop()
        self.neo.start(except_storages=[started[0]])
        self.neo.expectStorageNotKnown(started[0])
        self.__expectRunning(started[1])

        # then restart it, it must be in pending state
        started[0].start()
        self.__expectPending(started[0])
        self.__expectRunning(started[1])

    def testAcceptFirstEmptyStorageAfterStartupAllowed(self):
        """ Create a new cluster with no storage node, allow it to starts
        then run the first empty storage, it must be accepted """
        (started, stopped) = self.__setup(storage_number=1, replicas=0,
                pending_number=1, partitions=10)
        # start without storage
        self.neo.expectClusterRecovering()
        self.neo.expectStorageNotKnown(stopped[0])
        # start the empty storage, it must be accepted
        stopped[0].start(with_uuid=False)
        self.neo.expectClusterRunning()
        self.assertEqual(len(self.neo.getStorageList()), 1)
        self.neo.expectOudatedCells(number=0)

    def testDropNodeWithOtherPending(self):
        """ Ensure we can drop a node """
        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.__expectRunning(started[0])
        self.neo.expectStorageNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # set the second storage in pending state and drop the first
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.__expectPending(stopped[0])

    def testRestartWithMissingStorage(self):
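        """ Stop a replicated two-storage cluster then restart it with only
        one storage; the cluster must come back running and the missing
        storage must be reported as unknown """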
        # start a cluster with a replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=0, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        # XXX: need to sync with storages first
        self.neo.stop()

        # restart it with one storage only
        self.neo.start(except_storages=(started[1], ))
        self.__expectRunning(started[0])
        self.__expectUnknown(started[1])
        self.neo.expectClusterRunning()

    def testRecoveryWithMultiplePT(self):
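        """ Stop the storages of a replicated cluster one after the other
        so that they store different partition tables, then restart the
        cluster and check its state and the outdated cells as each storage
        comes back """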
        # start a cluster with 2 storages and a replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=0, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # stop the first then the second storage
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.__expectRunning(started[1])
        self.neo.expectOudatedCells(number=10)
        started[1].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterVeryfing()
        # XXX: need to sync with storages first
        self.neo.stop()

        # restart the cluster without the second storage
        self.neo.run(except_storages=[started[1]])
        self.__expectRunning(started[0])
        self.__expectUnknown(started[1])
        self.neo.expectClusterRecovering()
        self.neo.expectOudatedCells(number=0)
        started[1].start()
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.neo.expectClusterRecovering()
        self.neo.expectOudatedCells(number=10)

if __name__ == "__main__":
    unittest.main()