testStorage.py 7.85 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import ZODB
import MySQLdb
import unittest
import transaction
from Persistence import Persistent

from neo.tests.functional import NEOCluster
from neo.client.Storage import Storage as NEOStorage
26
from neo import protocol
27 28 29 30 31 32 33

class PObject(Persistent):
    
    def __init__(self, value):
        self.value = value


34 35 36
OBJECT_NUMBER = 100

class StorageTests(unittest.TestCase):
37 38

    def setUp(self):
39
        self.neo = None
40 41

    def tearDown(self):
42 43
        if self.neo is not None:
            self.neo.stop()
44 45 46 47 48

    def queryCount(self, db, query):
        db.query(query)
        result = db.store_result().fetch_row()[0][0]
        return result
49

50 51
    def __setup(self, storage_number=2, pending_number=0, replicas=1, partitions=10):
        # create a neo cluster
52 53
        self.neo = NEOCluster(['test_neo%d' % i for i in xrange(storage_number)],
            port_base=20000, 
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
            master_node_count=2,
            partitions=10, replicas=replicas,
        )
        self.neo.setupDB()
        # too many pending storage nodes requested
        if pending_number > storage_number:
            pending_number = storage_number
        storage_processes  = self.neo.getStorageProcessList()
        start_storage_number = len(storage_processes) - pending_number
        # return a tuple of storage processes lists
        started_processes = storage_processes[:start_storage_number]
        stopped_processes = storage_processes[start_storage_number:]
        self.neo.start(except_storages=stopped_processes)
        return (started_processes, stopped_processes)

    def __populate(self):
70
        db, conn = self.neo.getZODBConnection()
71 72 73 74 75
        root = conn.root()
        for i in xrange(OBJECT_NUMBER):
            root[i] = PObject(i) 
        transaction.commit()
        conn.close()
76
        db.close()
77

78
    def __checkDatabase(self, db_name):
79 80 81 82
        db = MySQLdb.connect(db=db_name, user='test')
        # wait for the sql transaction to be commited
        def callback(last_try):
            object_number = self.queryCount(db, 'select count(*) from obj')
83 84
            return object_number == OBJECT_NUMBER + 2, object_number
        self.neo.expectCondition(callback, 0, 1)
85 86 87 88 89 90 91 92 93 94 95
        # no more temporarily objects
        t_objects = self.queryCount(db, 'select count(*) from tobj')
        self.assertEqual(t_objects, 0)
        # One revision per object and two for the root, before and after
        revisions = self.queryCount(db, 'select count(*) from obj')
        self.assertEqual(revisions, OBJECT_NUMBER + 2)
        # One object more for the root 
        query = 'select count(*) from (select * from obj group by oid) as t'
        objects = self.queryCount(db, query)
        self.assertEqual(objects, OBJECT_NUMBER + 1)

96
    def __checkReplicationDone(self):
97
        # wait for replication to finish
98
        self.neo.expectOudatedCells(number=0, timeout=10)
99
        # check databases
100 101
        for db_name in self.neo.db_list:
            self.__checkDatabase(db_name)
102 103

        # check storages state
104 105 106 107 108 109 110 111 112 113 114 115
        storage_list = self.neo.getStorageNodeList(protocol.RUNNING_STATE)
        self.assertEqual(len(storage_list), 2)

    def __expectRunning(self, process):
        self.neo.expectStorageState(process.getUUID(), protocol.RUNNING_STATE)

    def __expectPending(self, process):
        self.neo.expectStorageState(process.getUUID(), protocol.PENDING_STATE)
    
    def __expectUnavailable(self, process):
        self.neo.expectStorageState(process.getUUID(),
                protocol.TEMPORARILY_DOWN_STATE)
116 117 118
    
    def testReplicationWithoutBreak(self):

119 120 121 122
        # populate the cluster then check the databases
        self.__setup(storage_number=2, replicas=1)
        self.__populate()
        self.__checkReplicationDone()
123

124
    def testNewNodesInPendingState(self):
125

126 127 128 129 130
        # start with the first storage
        processes = self.__setup(storage_number=3, replicas=1, pending_number=2)
        started, stopped = processes
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()
131

132 133 134 135 136 137 138 139 140
        # start the second then the third
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.expectClusterRunning()
        stopped[1].start()
        self.__expectPending(stopped[1])
        self.neo.expectClusterRunning()

    def testReplicationWithNewStorage(self):
141 142

        # populate one storage
143 144 145 146
        processes = self.__setup(storage_number=2, replicas=1, pending_number=1)
        started, stopped = processes
        self.__populate()
        self.neo.expectClusterRunning()
147 148

        # start the second
149 150 151
        stopped[0].start()
        self.__expectPending(stopped[0])
        self.neo.expectClusterRunning()
152 153

        # add it to the partition table
154 155 156
        self.neo.neoctl.enableStorageList([stopped[0].getUUID()])
        self.__expectRunning(stopped[0])
        self.neo.expectClusterRunning()
157

158
        # wait for replication to finish then check 
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

    def testOudatedCellsOnDownStorage(self):

        # populate the two storages
        (started, _) = self.__setup(storage_number=2, replicas=1)
        self.__populate()
        self.__checkReplicationDone()
        self.neo.expectClusterRunning()

        # stop one storage and check outdated cells
        started[0].stop()
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()

    def testVerificationTriggered(self):

        # start neo with one storages
        (started, _) = self.__setup(replicas=0, storage_number=1)
        self.__expectRunning(started[0])

        # stop it, the cluster must switch to verification
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.neo.expectClusterVeryfing()

        # restart it, the cluster must come back to running state
        started[0].start()
        self.__expectRunning(started[0])
        self.neo.expectClusterRunning()

    def testSequentialStorageKill(self):

        # start neo with three storages / two replicas
        (started, _) = self.__setup(replicas=2, storage_number=3, partitions=10)
        self.__expectRunning(started[0])
        self.__expectRunning(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=0)
        self.neo.expectClusterRunning()

        # stop one storage, cluster must remains running
        started[0].stop()
        self.__expectUnavailable(started[0])
        self.__expectRunning(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=10)
        self.neo.expectClusterRunning()

        # stop a second storage, cluster is still running
        started[1].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.__expectRunning(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterRunning()

        # stop the last, cluster died
        started[2].stop()
        self.__expectUnavailable(started[0])
        self.__expectUnavailable(started[1])
        self.__expectUnavailable(started[2])
        self.neo.expectOudatedCells(number=20)
        self.neo.expectClusterVeryfing()
224 225 226 227
    

if __name__ == "__main__":
    unittest.main()