#
# Copyright (C) 2009-2016  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import unittest
import transaction
from persistent import Persistent

from . import NEOCluster, NEOFunctionalTest
from neo.lib.protocol import ClusterStates, NodeStates
from ZODB.tests.StorageTestBase import zodb_pickle
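
# These functional tests drive a real NEO cluster through the NEOCluster
# helper: each scenario builds a cluster, starts some or all of its storage
# processes, then asserts on node and cluster states. A minimal sketch of
# the pattern used throughout (all calls appear in the tests below):
#
#   neo = NEOCluster(['test_neo0'], partitions=10, replicas=0,
#       temp_dir=self.getTempDirectory(), clear_databases=True)
#   neo.start()
#   neo.expectClusterRunning()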

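# Simple persistent object used to populate the test database.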
class PObject(Persistent):

    def __init__(self, value):
        self.value = value


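# Number of objects written to the root by __populate().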
OBJECT_NUMBER = 100

class StorageTests(NEOFunctionalTest):

    def _tearDown(self, success):
        if hasattr(self, "neo"):
            self.neo.stop()
            del self.neo
        NEOFunctionalTest._tearDown(self, success)

    def __setup(self, storage_number=2, pending_number=0, replicas=1,
            partitions=10, master_count=2):
        # create a NEO cluster
        self.neo = NEOCluster(['test_neo%d' % i for i in xrange(storage_number)],
            master_count=master_count,
            partitions=partitions, replicas=replicas,
            temp_dir=self.getTempDirectory(),
            clear_databases=True,
        )
        # do not request more pending storage nodes than there are storages
        assert pending_number <= storage_number
        storage_processes = self.neo.getStorageProcessList()
        start_storage_number = len(storage_processes) - pending_number
        # return a tuple of storage process lists: (started, kept stopped)
        started_processes = storage_processes[:start_storage_number]
        stopped_processes = storage_processes[start_storage_number:]
        self.neo.start(except_storages=stopped_processes)
        return (started_processes, stopped_processes)

    def __populate(self):
        # write OBJECT_NUMBER objects under the root, in one transaction
        db, conn = self.neo.getZODBConnection()
        root = conn.root()
        for i in xrange(OBJECT_NUMBER):
            root[i] = PObject(i)
        transaction.commit()
        conn.close()
        db.close()

    def __checkDatabase(self, db_name):
        db = self.neo.getSQLConnection(db_name)
        # wait for the SQL transaction to be committed
        def callback(last_try):
            db.commit() # commit to get a fresh view
            # one revision per object, plus two for the root (before and after)
            (object_number,), = db.query('SELECT count(*) FROM obj')
            return object_number == OBJECT_NUMBER + 2, object_number
        self.neo.expectCondition(callback)
        # no more temporary objects
        (t_objects,), = db.query('SELECT count(*) FROM tobj')
        self.assertEqual(t_objects, 0)
        # one more object, for the root
        query = 'SELECT count(*) FROM (SELECT * FROM obj GROUP BY oid) AS t'
        (objects,), = db.query(query)
        self.assertEqual(objects, OBJECT_NUMBER + 1)
        # check object contents
        db, conn = self.neo.getZODBConnection()
        root = conn.root()
        for i in xrange(OBJECT_NUMBER):
            obj = root[i]
            self.assertEqual(obj.value, i)
        transaction.abort()
        conn.close()
        db.close()

    def __checkReplicationDone(self):
        # wait for replication to finish
        def expect_all_storages(last_try):
            storage_number = len(self.neo.getStorageList())
            return storage_number == len(self.neo.db_list), storage_number
        self.neo.expectCondition(expect_all_storages, timeout=10)
        self.neo.expectOudatedCells(number=0, timeout=10)
        # check databases
        for db_name in self.neo.db_list:
            self.__checkDatabase(db_name)

        # check storage states
        storage_list = self.neo.getStorageList(NodeStates.RUNNING)
        self.assertEqual(len(storage_list), 2)

    def testNewNodesInPendingState(self):
        """ Check that new storage nodes are set as pending while the cluster
        remains running """

        # start with the first storage
        processes = self.__setup(storage_number=3, replicas=1, pending_number=2)
        started, stopped = processes
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()

        # start the second storage, then the third
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.expectClusterRunning()
        stopped[1].start()
        self.neo.expectPending(stopped[1])
        self.neo.expectClusterRunning()

    def testReplicationWithNewStorage(self):
        """ Create a cluster with one storage, populate it, add a new storage,
        then check the database content to ensure the replication process
        completed correctly """

        # populate one storage
        processes = self.__setup(storage_number=2, replicas=1, pending_number=1,
                partitions=10)
        started, stopped = processes
        self.neo.expectOudatedCells(number=0)
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], number=10)

        # start the second storage
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.expectClusterRunning()

        # add it to the partition table
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.neo.expectRunning(stopped[0])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectAssignedCells(stopped[0], number=10)
        self.neo.expectClusterRunning()

        # wait for replication to finish, then check
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

    def testOudatedCellsOnDownStorage(self):
        """ Check that the cells of a storage are set as outdated when the node
        goes down; the cluster remains up since there is a replica """

        # start with three storages
        started, _ = self.__setup(partitions=3, replicas=1, storage_number=3)
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectRunning(started[2])
        self.neo.expectOudatedCells(number=0)

        self.neo.neoctl.killNode(started[0].getUUID())
        # Cluster still operational. All cells of the first storage should be
        # outdated.
        self.neo.expectUnavailable(started[0])
        self.neo.expectOudatedCells(2)
        self.neo.expectClusterRunning()

        self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
            started[1].getUUID())
        started[1].stop()
        # Cluster not operational anymore. Only the cells of the second storage
        # that were shared with the third one should become outdated.
        self.neo.expectUnavailable(started[1])
        self.neo.expectClusterRecovering()
        self.neo.expectOudatedCells(3)

    def testVerificationTriggered(self):
        """ Check that the verification stage is executed when a storage node
        required to be operational is lost, and that the cluster comes back to
        the running state when the storage is up again """

        # start neo with one storage
        (started, _) = self.__setup(replicas=0, storage_number=1)
        self.neo.expectRunning(started[0])
        self.neo.expectOudatedCells(number=0)
        # add a client node
        db, conn = self.neo.getZODBConnection()
        root = conn.root()['test'] = 'ok'
        transaction.commit()
        self.assertEqual(len(self.neo.getClientlist()), 1)

        # stop it, the cluster must switch to verification
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectClusterRecovering()
        # the client must have been disconnected
        self.assertEqual(len(self.neo.getClientlist()), 0)
        conn.close()
        db.close()

        # restart it, the cluster must come back to the running state
        started[0].start()
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()

    def testSequentialStorageKill(self):
        """ Check that the cluster remains running until the last storage node
        dies, since every partition is replicated on all nodes """

        # start neo with three storages / two replicas
        (started, _) = self.__setup(replicas=2, storage_number=3, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectRunning(started[2])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # stop one storage, the cluster must remain running
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectRunning(started[2])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()

        # stop a second storage, the cluster is still running
        started[1].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectUnavailable(started[1])
        self.neo.expectRunning(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterRunning()

        # stop the last one, the cluster dies
        started[2].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectUnavailable(started[1])
        self.neo.expectUnavailable(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterRecovering()

    def testConflictingStorageRejected(self):
        """ Check that a storage node that comes up after the recovery process
        with the same UUID as an already running one is rejected """

        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, pending_number=1)
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)

        # start the second with the same UUID as the first
        stopped[0].setUUID(started[0].getUUID())
        stopped[0].start()
        self.neo.expectOudatedCells(number=0)

        # check that the first storage and the cluster are still running
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()

        # XXX: should wait for the storage rejection

        # check that no node was added
        storage_number = len(self.neo.getStorageList())
        self.assertEqual(storage_number, 1)

    def testPartitionTableReorganizedWithNewStorage(self):
        """ Check that the partition table changes when adding a new storage
        to a cluster with one storage and no replicas """

        # start with one storage and no replicas
        (started, stopped) = self.__setup(storage_number=2, pending_number=1,
            partitions=10, replicas=0)
        self.neo.expectRunning(started[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectOudatedCells(number=0)

        # start the second storage and add it to the partition table
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)

        # the partition table must change; each node should be assigned
        # five partitions
        self.neo.expectAssignedCells(started[0], 5)
        self.neo.expectAssignedCells(stopped[0], 5)

    def testPartitionTableReorganizedAfterDrop(self):
        """ Check that the partition table changes when dropping a replica
        from a cluster with two storages """

        # start with two storages / one replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                partitions=10, pending_number=0)
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(started[1], 10)

        # kill one storage, it should be set as unavailable
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectRunning(started[1])
        # and the partition table must not change
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(started[1], 10)

        # ask neoctl to drop it
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.neo.expectAssignedCells(started[0], 0)
        self.neo.expectAssignedCells(started[1], 10)
        self.assertRaises(RuntimeError, self.neo.neoctl.dropNode,
                          started[1].getUUID())
        self.neo.expectClusterRunning()

    def testReplicationThenRunningWithReplicas(self):
        """ Add a replica to a cluster, wait for the replication to finish,
        shut down the first storage, then check the content of the new
        storage """

        # start with one storage
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectStorageNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)

        # populate the cluster with some data
        self.__populate()
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)
        self.neo.expectAssignedCells(started[0], 10)
        self.__checkDatabase(self.neo.db_list[0])

        # add a second storage
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)

        # wait for replication to finish
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        self.__checkReplicationDone()

        # kill the first storage
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)
        self.neo.expectClusterRunning()
        self.__checkDatabase(self.neo.db_list[0])

        # drop it from the partition table
        self.neo.neoctl.dropNode(started[0].getUUID())
        self.neo.expectStorageNotKnown(started[0])
        self.neo.expectRunning(stopped[0])
        self.neo.expectAssignedCells(started[0], 0)
        self.neo.expectAssignedCells(stopped[0], 10)
        self.__checkDatabase(self.neo.db_list[1])

    def testStartWithManyPartitions(self):
        """ Just test that the cluster can start with more than 1000
        partitions; 1000, because there is currently an arbitrary packet
        split every 1000 partitions when sending the partition table """
        self.__setup(storage_number=2, partitions=5000, master_count=1)
        self.neo.expectClusterState(ClusterStates.RUNNING)

    def testRecoveryWithMultiplePT(self):
        """ Check that the cluster recovers from a state where the storage
        nodes were stopped at different times and thus hold different
        versions of the partition table """
        # start a cluster with 2 storages and a replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=0, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # drop the first storage, then the second
        started[0].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectOudatedCells(number=10)
        started[1].stop()
        self.neo.expectUnavailable(started[0])
        self.neo.expectUnavailable(started[1])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRecovering()
        # XXX: need to sync with storages first
        self.neo.stop()

        # restart the cluster without the second storage
        self.neo.run(except_storages=[started[1]])
        self.neo.expectPending(started[0])
        self.neo.expectUnknown(started[1])
        self.neo.expectClusterRecovering()
        # the cluster doesn't know yet that some cells are outdated
        self.neo.expectOudatedCells(number=0)
        started[1].start()
        self.neo.expectRunning(started[0])
        self.neo.expectRunning(started[1])
        self.neo.expectClusterRunning()
        self.neo.expectOudatedCells(number=0)

    def testReplicationBlockedByUnfinished(self):
        """ Check that replication cannot complete while a transaction is
        still unfinished, and that it resumes once the transaction is
        committed """
        # start a cluster with 1 of 2 storages and a replica
        (started, stopped) = self.__setup(storage_number=2, replicas=1,
                pending_number=1, partitions=10)
        self.neo.expectRunning(started[0])
        self.neo.expectStorageNotKnown(stopped[0])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()
        self.__populate()
        self.neo.expectOudatedCells(number=0)

        # start a transaction that will block the end of the replication
        db, conn = self.neo.getZODBConnection()
        st = conn._storage
        t = transaction.Transaction()
        t.user = 'user'
        t.description = 'desc'
        oid = st.new_oid()
        rev = '\0' * 8
        data = zodb_pickle(PObject(42))
        st.tpc_begin(t)
        st.store(oid, rev, data, '', t)

        # start the outdated storage
        stopped[0].start()
        self.neo.expectPending(stopped[0])
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.neo.neoctl.tweakPartitionTable()
        self.neo.expectRunning(stopped[0])
        self.neo.expectClusterRunning()
        self.neo.expectAssignedCells(started[0], 10)
        self.neo.expectAssignedCells(stopped[0], 10)
        # wait a bit: replication must not happen. This hack is required
        # because we cannot gather information directly from the storages.
        time.sleep(10)
        self.neo.expectOudatedCells(number=10)

        # finish the transaction; replication must happen and complete
        st.tpc_vote(t)
        st.tpc_finish(t)
        self.neo.expectOudatedCells(number=0, timeout=10)

if __name__ == "__main__":
    unittest.main()
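
# Note: this module lives inside the NEO functional test package (see the
# relative import above), so running it standalone is typically done as a
# module, e.g. "python -m neo.tests.functional.testStorage" (module path
# assumed from the usual NEO source layout).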