#
# Copyright (C) 2009-2012  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

17
import time
18 19
import unittest
import transaction
20
from persistent import Persistent
21

22
from . import NEOCluster, NEOFunctionalTest
23
from neo.lib.protocol import ClusterStates, NodeStates
24
from ZODB.tests.StorageTestBase import zodb_pickle
25 26

class PObject(Persistent):
    """Minimal persistent object used as test payload.

    Holds a single integer so tests can verify, after replication or
    reconnection, that stored content survived intact.
    """

    def __init__(self, value):
        # value is read back later to check database integrity
        self.value = value


32 33
# Number of PObject instances committed by the populate step; database
# checks expect this many objects plus the root object.
OBJECT_NUMBER = 100

34
class StorageTests(NEOFunctionalTest):
35 36

    def setUp(self):
        # Per-test initialisation: run the generic functional-test setup
        # and mark that no cluster has been started yet (checked in
        # _tearDown before stopping).
        NEOFunctionalTest.setUp(self)
        self.neo = None
39

40
    def _tearDown(self, success):
        # Stop the cluster started by the test (if any) before the
        # generic functional-test teardown.
        if self.neo is not None:
            self.neo.stop()
        NEOFunctionalTest._tearDown(self, success)
44 45

    def queryCount(self, db, query):
        """Run a COUNT(*) *query* on *db* and return the count as an int.

        Works with both MySQLdb connections (query/store_result API) and
        sqlite3 connections (execute/fetchone API).
        """
        if hasattr(db, 'query'):
            # MySQLdb-style connection
            db.query(query)
            return db.store_result().fetch_row()[0][0]
        # sqlite3-style connection
        return db.execute(query).fetchone()[0]
51

52
    def __setup(self, storage_number=2, pending_number=0, replicas=1,
            partitions=10, master_count=2):
        """Create and start a NEO cluster for the test.

        *pending_number* storages are deliberately left stopped so that
        tests can start them later and observe how the cluster reacts.
        Returns a (started, stopped) pair of storage process lists.
        """
        self.neo = NEOCluster(
            ['test_neo%d' % i for i in xrange(storage_number)],
            master_count=master_count,
            partitions=partitions,
            replicas=replicas,
            temp_dir=self.getTempDirectory(),
            clear_databases=True,
        )
        # it makes no sense to keep more storages pending than exist
        assert pending_number <= storage_number
        processes = self.neo.getStorageProcessList()
        boundary = len(processes) - pending_number
        started = processes[:boundary]
        stopped = processes[boundary:]
        self.neo.start(except_storages=stopped)
        return (started, stopped)

    def __populate(self):
        """Commit OBJECT_NUMBER PObject instances through a ZODB connection."""
        database, connection = self.neo.getZODBConnection()
        root = connection.root()
        for index in xrange(OBJECT_NUMBER):
            root[index] = PObject(index)
        transaction.commit()
        connection.close()
        database.close()
79

80
    def __checkDatabase(self, db_name):
        """Verify one storage database, first at the SQL level then
        through a ZODB connection."""
        sql = self.neo.getSQLConnection(db_name)
        # wait until the storage has committed the sql transaction
        def check(last_try):
            count = self.queryCount(sql, 'select count(*) from obj')
            return count == OBJECT_NUMBER + 2, count
        self.neo.expectCondition(check)
        # the temporary object table must be empty once committed
        self.assertEqual(
            self.queryCount(sql, 'select count(*) from tobj'), 0)
        # one revision per object, plus two for the root (before/after)
        self.assertEqual(
            self.queryCount(sql, 'select count(*) from obj'),
            OBJECT_NUMBER + 2)
        # distinct objects: every PObject plus the root itself
        self.assertEqual(
            self.queryCount(sql,
                'select count(*) from (select * from obj group by oid) as t'),
            OBJECT_NUMBER + 1)
        # finally, check object content through ZODB
        database, connection = self.neo.getZODBConnection()
        root = connection.root()
        for index in xrange(OBJECT_NUMBER):
            self.assertEqual(root[index].value, index)
        transaction.abort()
        connection.close()
        database.close()
106

107
    def __checkReplicationDone(self):
        """Wait until replication finishes, then validate every database
        and check that both storages are running."""
        # wait for every storage to be known to the cluster
        def expect_all_storages(last_try):
            count = len(self.neo.getStorageList())
            return count == len(self.neo.db_list), count
        self.neo.expectCondition(expect_all_storages, timeout=10)
        self.neo.expectOudatedCells(number=0, timeout=10)
        # every database must hold the full replicated content
        for db_name in self.neo.db_list:
            self.__checkDatabase(db_name)
        # both storage nodes must be in RUNNING state
        self.assertEqual(
            len(self.neo.getStorageList(NodeStates.RUNNING)), 2)

    def testNewNodesInPendingState(self):
        """Check that new storage nodes join in PENDING state while the
        cluster keeps running."""
        # boot the cluster with a single storage out of three
        started, stopped = self.__setup(
            storage_number=3, replicas=1, pending_number=2)
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()
        # each late-started storage must be seen as pending without
        # disturbing the running cluster
        for storage in stopped:
            storage.start()
            self.neo.expectPending(storage)
            self.neo.expectClusterRunning()

    def testReplicationWithNewStorage(self):
        """Populate a single-storage cluster, add a second storage, then
        check the database content to ensure replication completed."""
        # populate the first storage
        started, stopped = self.__setup(storage_number=2, replicas=1,
            pending_number=1, partitions=10)
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], number=10)
        # bring up the second storage: it joins as pending
        new_storage = stopped[0]
        new_storage.start()
        self.neo.expectPending(new_storage)
        self.neo.expectClusterRunning()
        # enable it and rebalance the partition table
        self.neo.neoctl.enableStorageList([new_storage.getUUID()])
        self.neo.expectRunning(new_storage)
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectAssignedCells(new_storage, number=10)
        self.neo.expectClusterRunning()
        # wait for replication to finish, then check content
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

    def testOudatedCellsOnDownStorage(self):
        """Check that a down storage's cells are marked outdated; the
        cluster stays up as long as a replica remains."""
        started, _ = self.__setup(partitions=3, replicas=1, storage_number=3)
        for storage in started:
            self.neo.expectRunning(storage)
        self.neo.expectOudatedCells(number=0)
        # kill the first storage: the cluster is still operational and
        # all cells of that storage become outdated
        self.neo.neoctl.killNode(started[0].getUUID())
        self.neo.expectUnavailable(started[0])
        self.neo.expectOudatedCells(2)
        self.neo.expectClusterRunning()
        # killing a second storage through neoctl is refused, as it
        # would make the cluster non-operational
        self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
            started[1].getUUID())
        # stop it anyway: only the cells it shared with the third
        # storage become outdated, and verification starts
        started[1].stop()
        self.neo.expectUnavailable(started[1])
        self.neo.expectClusterVerifying()
        self.neo.expectOudatedCells(3)

197
    def testVerificationTriggered(self):
        """ Check that the verification stage is executed when a storage node
        required to be operational is lost, and that the cluster comes back
        to the running state when the storage is up again """

        # start neo with one storage
        (started, _) = self.__setup(replicas=0, storage_number=1)
        self.neo.expectRunning(started[0])
        self.neo.expectOudatedCells(number=0)
        # add a client node and commit a small change through it
        db, conn = self.neo.getZODBConnection()
        # (fixed: the previous chained assignment bound a local `root`
        # to the string 'ok', which was misleading and never used)
        conn.root()['test'] = 'ok'
        transaction.commit()
        self.assertEqual(len(self.neo.getClientlist()), 1)

        # stop the storage, the cluster must switch to verification
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectClusterVerifying()
        # the client must have been disconnected
        self.assertEqual(len(self.neo.getClientlist()), 0)
        conn.close()
        db.close()

        # restart the storage, the cluster must come back to running state
        started[0].start()
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()

    def testSequentialStorageKill(self):
        """With two replicas, the cluster must remain running until the
        very last storage node dies."""
        # three storages, two replicas: every storage holds every cell
        (started, _) = self.__setup(replicas=2, storage_number=3,
            partitions=10)
        for storage in started:
            self.neo.expectRunning(storage)
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        # first storage down: cluster must keep running
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectRunning(started[2])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()
        # second storage down: one full replica remains, still running
        started[1].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectUnavailable(started[1])
        self.neo.expectRunning(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterRunning()
        # last storage down: the cluster cannot operate anymore
        started[2].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectUnavailable(started[1])
        self.neo.expectUnavailable(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterVerifying()
261 262

    def testConflictingStorageRejected(self):
        """Check that a storage joining after recovery with the same UUID
        as a running one is refused."""
        # one running storage, one kept aside
        (started, stopped) = self.__setup(storage_number=2, pending_number=1)
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        # start the second storage with the UUID of the first
        conflicting = stopped[0]
        conflicting.setUUID(started[0].getUUID())
        conflicting.start()
        self.neo.expectOudatedCells(number=0)
        # the first storage and the cluster must not be disturbed
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()
        # XXX: should wait for the storage rejection
        # the conflicting node must not have been added
        self.assertEqual(len(self.neo.getStorageList()), 1)
286

287 288 289 290 291
    def testPartitionTableReorganizedWithNewStorage(self):
        """Check that partitions are rebalanced when a storage is added
        to a single-storage cluster without replicas."""
        # one storage, no replicas: it owns all ten partitions
        (started, stopped) = self.__setup(storage_number=2, pending_number=1,
            partitions=10, replicas=0)
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectOudatedCells(number=0)
        # bring the second storage in and tweak the partition table
        new_storage = stopped[0]
        new_storage.start()
        self.neo.expectPending(new_storage)
        self.neo.neoctl.enableStorageList([new_storage.getUUID()])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectRunning(new_storage)
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        # the table must have been rebalanced: five partitions each
        self.neo.expectAssignedCells(started[0], 5)
        self.neo.expectAssignedCells(new_storage, 5)
312 313 314 315 316 317 318 319

    def testPartitionTableReorganizedAfterDrop(self):
        """Check that the partition table changes when a replica is
        dropped from a two-storage cluster."""
        # two storages, one replica: both own every partition
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
            partitions=10, pending_number=0)
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(started[1], 10)
        # stop one storage: it becomes unavailable but keeps its cells
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(started[1], 10)
        # drop it via neoctl: its cells must all be unassigned
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.neo.expectAssignedCells(started[0], 0)
        self.neo.expectAssignedCells(started[1], 10)
        # dropping the last remaining storage must be refused
        self.assertRaises(RuntimeError, self.neo.neoctl.dropNode,
                          started[1].getUUID())
        self.neo.expectClusterRunning()
342

343 344 345 346
    def testReplicationThenRunningWithReplicas(self):
        """ Add a replica to a cluster, wait for the replication to finish,
        shutdown the first storage then check the new storage content """

        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectStorageNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)

        # populate the cluster with some data
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0], 10)
        self.__checkDatabase(self.neo.db_list[0])

        # add a second storage
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        # both storages are now assigned every partition
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)

        # wait for replication to finish
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        self.__checkReplicationDone()

        # kill the first storage: its 10 cells become outdated but the
        # cluster keeps running on the replica
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)
        self.neo.expectClusterRunning()
        self.__checkDatabase(self.neo.db_list[0])

        # drop it from partition table; the replica must hold everything
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.neo.expectRunning(stopped[0])
        self.neo.expectAssignedCells(started[0], 0)
        self.neo.expectAssignedCells(stopped[0], 10)
        self.__checkDatabase(self.neo.db_list[1])

393
    def testStartWithManyPartitions(self):
        """ Just tests that cluster can start with more than 1000 partitions.
        1000, because currently there is an arbitrary packet split at
        every 1000 partition when sending a partition table. """
        # 5000 partitions: the partition table spans several packets
        self.__setup(storage_number=2, partitions=5000, master_count=1)
        self.neo.expectClusterState(ClusterStates.RUNNING)
399

400 401 402 403
    def testRecoveryWithMultiplePT(self):
        """ Stop both storages of a replicated cluster at different moments
        (so they may hold different partition tables), then restart and
        check that the cluster recovers to a running state. """
        # start a cluster with 2 storages and a replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=0, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # drop the first then the second storage
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectOudatedCells(number=10)
        started[1].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectUnavailable(started[1])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterVerifying()
        # XXX: need to sync with storages first
        self.neo.stop()

        # restart the cluster with the first storage killed
        self.neo.run(except_storages=[started[1]])
        self.neo.expectPending(started[0])
        self.neo.expectUnknown(started[1])
        self.neo.expectClusterRecovering()
        # Cluster doesn't know there are outdated cells
        self.neo.expectOudatedCells(number=0)
        started[1].start()
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
434

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
    def testReplicationBlockedByUnfinished(self):
        """ Check that replication to a new storage does not finish while a
        transaction is left unfinished (begun and stored but not voted), and
        that it completes once the transaction is finished. """
        # start a cluster with 1 of 2 storages and a replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectStorageNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        self.__populate()
        self.neo.expectOudatedCells(number=0)

        # start a transaction that will block the end of the replication
        db, conn = self.neo.getZODBConnection()
        st = conn._storage
        t = transaction.Transaction()
        t.user  = 'user'
        t.description = 'desc'
        oid = st.new_oid()
        # zero serial: the object is stored as a new one
        rev = '\0' * 8
        data = zodb_pickle(PObject(42))
        st.tpc_begin(t)
        st.store(oid, rev, data, '', t)

        # start the oudated storage
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)
        # wait a bit, replication must not happen. This hack is required
        # because we cannot gather informations directly from the storages
        time.sleep(10)
        self.neo.expectOudatedCells(number=10)

        # finish the transaction, the replication must happen and finish
        st.tpc_vote(t)
        st.tpc_finish(t)
        self.neo.expectOudatedCells(number=0, timeout=10)

477 478
# Allow running this functional test module standalone with the default
# unittest runner.
if __name__ == "__main__":
    unittest.main()