#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import os
import sys
import time
21
import ZODB
22
import signal
23
import random
24
import MySQLdb
25
import unittest
26 27 28
import tempfile
import traceback

29 30
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.protocol import ClusterStates, NodeTypes, CellStates
31
from neo.client.Storage import Storage
Vincent Pelletier's avatar
Vincent Pelletier committed
32 33
from neo.tests import getNewUUID
from neo.util import dump
34 35 36 37 38

# Program names of the three kinds of NEO nodes spawned by NEOCluster.
# They are handed to os.execlp(), which resolves them through PATH.
NEO_MASTER, NEO_STORAGE, NEO_ADMIN = 'neomaster', 'neostorage', 'neoadmin'

# Extra seconds added to every timeout given to NEOCluster.expectCondition().
DELAY_SAFETY_MARGIN = 10
# Maximum number of seconds NEOCluster.start() waits for the cluster.
MAX_START_TIME = 30
41

42 43 44 45 46
class AlreadyRunning(Exception):
    """Raised when a process is started while its previous run is not reaped."""

class AlreadyStopped(Exception):
    """Raised when killing or waiting for a process that has no known PID."""
47 48 49 50

class NEOProcess:
    """Handle on a single NEO node executed as a forked child process.

    The command line is built from ``arg_dict``: each key is emitted as an
    argument, followed by ``str(value)`` unless the value is None.
    """

    # PID of the forked child; 0 means "not started" or "already reaped".
    pid = 0

    def __init__(self, command, uuid, arg_dict):
        """Remember the program name, its arguments and the node UUID.

        ``arg_dict`` is kept by reference and gets a '--uuid' entry.
        """
        self.command = command
        self.arg_dict = arg_dict
        self.setUUID(uuid)

    def start(self):
        """Fork and exec the command.

        Raises AlreadyRunning if a previous child was forked and wait() has
        not been called for it yet.
        """
        # Prevent starting when already forked and wait wasn't called.
        if self.pid != 0:
            raise AlreadyRunning('Already running with PID %r' % (self.pid, ))
        command = self.command
        args = []
        for arg, param in self.arg_dict.items():
            args.append(arg)
            if param is not None:
                args.append(str(param))
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            try:
                os.execlp(command, command, *args)
            except:
                print(traceback.format_exc())
            # If we reach this line, exec call failed (is it possible to reach
            # it without going through above "except" branch ?).
            print('Error executing %r.' % (command + ' ' + ' '.join(args), ))
            # KeyboardInterrupt is not intercepted by test runner (it is still
            # above us in the stack), and we do want to exit.
            # To avoid polluting test foreground output with induced
            # traceback, replace stdout & stderr.
            sys.stdout = sys.stderr = open('/dev/null', 'w')
            raise KeyboardInterrupt

    def kill(self, sig=signal.SIGTERM):
        """Send ``sig`` to the child.

        An OSError from os.kill() is reported on stderr and otherwise ignored.
        Raises AlreadyStopped when no child PID is recorded.
        """
        if not self.pid:
            raise AlreadyStopped
        try:
            os.kill(self.pid, sig)
        except OSError:
            # print_exc() reports the exception being handled; print_last()
            # only knows about sys.last_* and may itself fail here.
            traceback.print_exc()

    def __del__(self):
        # If we get killed, kill subprocesses aswell.
        try:
            self.kill(signal.SIGKILL)
            self.wait()
        except:
            # We can ignore all exceptions at this point, since there is no
            # guaranteed way to handle them (other objects we would depend on
            # might already have been deleted).
            pass

    def wait(self, options=0):
        """Reap the child and return its exit status (os.WEXITSTATUS).

        ``options`` is passed to os.waitpid().  When it contains os.WNOHANG
        and the child has not terminated yet, None is returned and the PID is
        kept so that the process can still be killed or waited for later.
        Raises AlreadyStopped when no child PID is recorded.
        """
        if self.pid == 0:
            raise AlreadyStopped
        reaped_pid, status = os.waitpid(self.pid, options)
        if reaped_pid == 0:
            # waitpid() reports pid 0 when WNOHANG is set and nothing exited.
            return None
        self.pid = 0
        return os.WEXITSTATUS(status)

    def stop(self):
        """Send SIGTERM to the child, then wait for it to terminate."""
        self.kill()
        self.wait()

    def getUUID(self):
        """Return the UUID given to the constructor or to setUUID()."""
        return self.uuid

    def setUUID(self, uuid):
        """
          Note: for this change to take effect, the node must be restarted.
        """
        self.uuid = uuid
        self.arg_dict['--uuid'] = dump(uuid)
Vincent Pelletier's avatar
Vincent Pelletier committed
123

124

125
class NEOCluster(object):
    """Local NEO cluster (admin, masters, storages) for functional tests.

    All nodes bind to 127.0.0.1 on consecutive ports starting at
    ``port_base`` and are driven through a NeoCTL connected to the admin
    node.  Node processes are only created by the constructor; start() runs
    them.
    """

    def __init__(self, db_list, master_node_count=1,
                 partitions=1, replicas=0, port_base=10000,
                 db_user='neo', db_password='neo',
                 db_super_user='root', db_super_password=None,
                 cleanup_on_delete=False, temp_dir=None,
                 clear_databases=True):
        """Describe the cluster; one storage node is created per database.

        db_list -- MySQL database names, one storage node each.
        master_node_count -- number of master nodes.
        partitions, replicas -- passed to every master node.
        port_base -- first port; the admin node gets it, then the masters,
            then the storages.
        db_user, db_password -- account used by the storage nodes.
        db_super_user, db_super_password -- account used by setupDB().
        cleanup_on_delete -- remove temp_dir in __del__.
        temp_dir -- directory receiving node log files; a fresh one is
            created (and printed) when None.
        clear_databases -- call setupDB() right away.
        """
        self.cleanup_on_delete = cleanup_on_delete
        self.uuid_set = set()
        self.db_super_user = db_super_user
        self.db_super_password = db_super_password
        self.db_user = db_user
        self.db_password = db_password
        self.db_list = db_list
        if clear_databases:
            self.setupDB()
        self.process_dict = {}
        self.last_port = port_base
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
            print('Using temp directory %r.' % (temp_dir, ))
        self.temp_dir = temp_dir
        admin_port = self.__allocatePort()
        self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
        master_node_list = [self.__allocatePort()
                            for _ in range(master_node_count)]
        self.master_nodes = '/'.join('127.0.0.1:%s' % (x, )
                                     for x in master_node_list)
        # create admin node
        self.__newProcess(NEO_ADMIN, {
            '--cluster': self.cluster_name,
            '--name': 'admin',
            '--bind': '127.0.0.1:%d' % (admin_port, ),
            '--masters': self.master_nodes,
        })
        # create master nodes
        for index, port in enumerate(master_node_list):
            self.__newProcess(NEO_MASTER, {
                '--cluster': self.cluster_name,
                '--name': 'master_%d' % index,
                '--bind': '127.0.0.1:%d' % (port, ),
                '--masters': self.master_nodes,
                '--replicas': replicas,
                '--partitions': partitions,
            })
        # create storage nodes
        for index, db in enumerate(db_list):
            port = self.__allocatePort()
            self.__newProcess(NEO_STORAGE, {
                '--cluster': self.cluster_name,
                '--name': 'storage_%d' % index,
                '--bind': '127.0.0.1:%d' % (port, ),
                '--masters': self.master_nodes,
                '--database': '%s:%s@%s' % (db_user, db_password, db),
            })
        # create neoctl
        self.neoctl = NeoCTL('127.0.0.1', admin_port,
                             'SocketConnector')

    def __newProcess(self, command, arguments):
        """Register a (not yet started) NEOProcess for ``command``.

        ``arguments`` is completed in place with a fresh UUID, the verbose
        flag and a log file named after its '--name' entry in temp_dir.
        """
        uuid = self.__allocateUUID()
        arguments['--uuid'] = uuid
        arguments['--verbose'] = None
        logfile = arguments['--name']
        arguments['--logfile'] = os.path.join(self.temp_dir,
                                              '%s.log' % (logfile, ))
        self.process_dict.setdefault(command, []).append(
            NEOProcess(command, uuid, arguments))

    def __allocatePort(self):
        """Return the next unused port number and reserve it."""
        port = self.last_port
        self.last_port += 1
        return port

    def __allocateUUID(self):
        """Return a UUID from getNewUUID() not yet handed out by this cluster."""
        uuid_set = self.uuid_set
        uuid = None
        while uuid is None or uuid in uuid_set:
            uuid = getNewUUID()
        uuid_set.add(uuid)
        return uuid

    def setupDB(self):
        """Drop and recreate every database of db_list, granting db_user access.

        Connects as db_super_user.  The cursor and the connection are closed
        even if one of the statements fails.
        """
        # Cleanup or bootstrap databases
        connect_arg_dict = {'user': self.db_super_user}
        password = self.db_super_password
        if password is not None:
            connect_arg_dict['passwd'] = password
        sql_connection = MySQLdb.Connect(**connect_arg_dict)
        try:
            cursor = sql_connection.cursor()
            try:
                for database in self.db_list:
                    cursor.execute('DROP DATABASE IF EXISTS `%s`' % (database, ))
                    cursor.execute('CREATE DATABASE `%s`' % (database, ))
                    cursor.execute('GRANT ALL ON `%s`.* TO "%s"@"localhost" '\
                                   'IDENTIFIED BY "%s"' % (database, self.db_user,
                                   self.db_password))
            finally:
                cursor.close()
            sql_connection.commit()
        finally:
            sql_connection.close()

    def start(self, except_storages=()):
        """Start all processes and bring the cluster up.

        Processes listed in ``except_storages`` are not started.  Raises
        AssertionError if, within MAX_START_TIME seconds overall, the cluster
        could not be started or the expected number of storage nodes did not
        show up.
        """
        neoctl = self.neoctl
        assert len(self.process_dict)
        for process_list in self.process_dict.values():
            for process in process_list:
                if process not in except_storages:
                    process.start()
        # Try to put cluster in running state. This will succeed as soon as
        # admin node could connect to the primary master node.
        end_time = time.time() + MAX_START_TIME
        while True:
            if time.time() > end_time:
                raise AssertionError('Timeout when starting cluster')
            try:
                neoctl.startCluster()
            except NotReadyException:
                time.sleep(0.5)
            else:
                break
        # Both waiting loops share the same deadline.
        target_count = len(self.db_list) - len(except_storages)
        while True:
            if time.time() > end_time:
                raise AssertionError('Timeout when starting cluster')
            storage_node_list = neoctl.getNodeList(
                node_type=NodeTypes.STORAGE)
            if len(storage_node_list) == target_count:
                break
            time.sleep(0.5)
        # Third item of each node entry is its UUID.
        neoctl.enableStorageList([x[2] for x in storage_node_list])

    def stop(self):
        """Terminate every process, skipping those which are not running."""
        for process_list in self.process_dict.values():
            for process in process_list:
                try:
                    process.stop()
                except AlreadyStopped:
                    pass

    def getNEOCTL(self):
        """Return the NeoCTL instance connected to the admin node."""
        return self.neoctl

    def getZODBStorage(self):
        """Return a new client Storage connected to this cluster."""
        return Storage(
            master_nodes=self.master_nodes,
            name=self.cluster_name,
            connector='SocketConnector')

    def getZODBConnection(self):
        """ Return a tuple with the database and a connection """
        db = ZODB.DB(storage=self.getZODBStorage())
        return (db, db.open())

    def getSQLConnection(self, db):
        """Return a MySQL connection to ``db``, which must be in db_list."""
        assert db in self.db_list
        return MySQLdb.Connect(user=self.db_user, passwd=self.db_password,
                               db=db)

    def _getProcessList(self, type):
        """Return the processes registered for command ``type``, or None."""
        return self.process_dict.get(type)

    def getMasterProcessList(self):
        """Return the master NEOProcess list (None if there is none)."""
        return self._getProcessList(NEO_MASTER)

    def getStorageProcessList(self):
        """Return the storage NEOProcess list (None if there is none)."""
        return self._getProcessList(NEO_STORAGE)

    def getAdminProcessList(self):
        """Return the admin NEOProcess list (None if there is none)."""
        return self._getProcessList(NEO_ADMIN)

    def _killMaster(self, primary=False, all=False):
        """Kill master processes and return the list of their UUIDs.

        With ``primary`` true only the primary master (as reported by
        neoctl) is targeted, otherwise only the other masters.  Unless
        ``all`` is true, at most one process is killed.
        """
        killed_uuid_list = []
        primary_uuid = self.neoctl.getPrimary()
        # "or ()" tolerates a cluster without any master process.
        for master in self.getMasterProcessList() or ():
            master_uuid = master.getUUID()
            is_primary = master_uuid == primary_uuid
            wanted = is_primary if primary else not is_primary
            if wanted:
                killed_uuid_list.append(master_uuid)
                master.kill()
                master.wait()
                if not all:
                    break
        return killed_uuid_list

    def killPrimary(self):
        """Kill the primary master; return the list of killed UUIDs."""
        return self._killMaster(primary=True)

    def killSecondaryMaster(self, all=False):
        """Kill one (or ``all``) non-primary masters; return killed UUIDs."""
        return self._killMaster(primary=False, all=all)

    def killMasters(self):
        """Kill the non-primary masters, then the primary; return all UUIDs."""
        secondary_list = self.killSecondaryMaster(all=True)
        primary_list = self.killPrimary()
        return secondary_list + primary_list

    def killStorage(self, all=False):
        """Kill the first (or ``all``) storage process; return killed UUIDs."""
        killed_uuid_list = []
        # "or ()" tolerates a cluster created without any storage node.
        for storage in self.getStorageProcessList() or ():
            killed_uuid_list.append(storage.getUUID())
            storage.kill()
            storage.wait()
            if not all:
                break
        return killed_uuid_list

    def __getNodeList(self, node_type, state=None):
        """Return neoctl's node entries of ``node_type``.

        When ``state`` is given, only entries whose fourth item equals it
        are kept.
        """
        return [x for x in self.neoctl.getNodeList(node_type)
                if state is None or x[3] == state]

    def getMasterList(self, state=None):
        """Return the master node entries, optionally filtered by state."""
        return self.__getNodeList(NodeTypes.MASTER, state)

    def getStorageList(self, state=None):
        """Return the storage node entries, optionally filtered by state."""
        return self.__getNodeList(NodeTypes.STORAGE, state)

    def __getNodeState(self, node_type, uuid):
        """Return the state of node ``uuid``, or None if it is not listed."""
        # Entries are unpacked as (type, address, uuid, state).
        for _, _, node_uuid, state in self.__getNodeList(node_type):
            if node_uuid == uuid:
                return state
        return None

    def getMasterNodeState(self, uuid):
        """Return the state of master ``uuid``, or None if it is not listed."""
        return self.__getNodeState(NodeTypes.MASTER, uuid)

    def getPrimary(self):
        """Return the primary master as reported by neoctl, None if not ready."""
        try:
            current_try = self.neoctl.getPrimary()
        except NotReadyException:
            current_try = None
        return current_try

    def expectCondition(self, condition, timeout=0, delay=1):
        """Poll ``condition`` until it is reached.

        ``condition`` is called with the opaque value it returned on the
        previous call (None the first time) and must return a
        (reached, opaque) pair.  It is polled every ``delay`` seconds for
        ``timeout`` + DELAY_SAFETY_MARGIN seconds; afterwards AssertionError
        is raised with the history of opaque values.
        """
        end = time.time() + timeout + DELAY_SAFETY_MARGIN
        opaque = None
        opaque_history = []
        while time.time() < end:
            reached, opaque = condition(opaque)
            if reached:
                break
            else:
                opaque_history.append(opaque)
                time.sleep(delay)
        else:
            raise AssertionError('Timeout while expecting condition. '
                                 'History: %s' % (opaque_history, ))

    def expectAllMasters(self, node_count, state=None, timeout=0, delay=1):
        """Wait until ``node_count`` masters are listed (in ``state`` if given).

        Raises AssertionError if the count decreases between two polls.
        """
        def callback(last_try):
            current_try = len(self.getMasterList(state=state))
            if last_try is not None and current_try < last_try:
                raise AssertionError('Regression: %s became %s' %
                                     (last_try, current_try))
            return (current_try == node_count, current_try)
        self.expectCondition(callback, timeout, delay)

    def __expectNodeState(self, node_type, uuid, state, timeout=0, delay=1):
        """Wait until node ``uuid`` is in ``state`` (one value, tuple or list)."""
        if not isinstance(state, (tuple, list)):
            state = (state, )
        def callback(last_try):
            current_try = self.__getNodeState(node_type, uuid)
            return current_try in state, current_try
        self.expectCondition(callback, timeout, delay)

    def expectMasterState(self, uuid, state, timeout=0, delay=1):
        """Wait until master ``uuid`` reaches (one of) ``state``."""
        self.__expectNodeState(NodeTypes.MASTER, uuid, state, timeout,
                delay)

    def expectStorageState(self, uuid, state, timeout=0, delay=1):
        """Wait until storage ``uuid`` reaches (one of) ``state``."""
        self.__expectNodeState(NodeTypes.STORAGE, uuid, state,
                timeout, delay)

    def expectPrimary(self, uuid=None, timeout=0, delay=1):
        """Wait for a primary master.

        Without ``uuid`` the condition is reached on the first poll.  With
        ``uuid`` it is reached once that node is primary; AssertionError is
        raised as soon as a different node is reported as primary.
        """
        def callback(last_try):
            current_try = self.getPrimary()
            if None not in (uuid, current_try) and uuid != current_try:
                raise AssertionError('An unexpected primary arised: %r, '
                    'expected %r' % (dump(current_try), dump(uuid)))
            return uuid is None or uuid == current_try, current_try
        self.expectCondition(callback, timeout, delay)

    def expectOudatedCells(self, number, timeout=0, delay=1):
        """Wait until exactly ``number`` cells are OUT_OF_DATE."""
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            number_of_oudated = 0
            for row in row_list:
                # row[1] holds the cells; cell[1] is compared to the state.
                for cell in row[1]:
                    if cell[1] == CellStates.OUT_OF_DATE:
                        number_of_oudated += 1
            return number_of_oudated == number, number_of_oudated
        self.expectCondition(callback, timeout, delay)

    def expectAssignedCells(self, uuid, number, timeout=0, delay=1):
        """Wait until exactly ``number`` cells are assigned to node ``uuid``."""
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            assigned_cells_number = 0
            for row in row_list:
                # row[1] holds the cells; cell[0] is compared to the UUID.
                for cell in row[1]:
                    if cell[0] == uuid:
                        assigned_cells_number += 1
            return assigned_cells_number == number, assigned_cells_number
        self.expectCondition(callback, timeout, delay)

    def expectClusterState(self, state, timeout=0, delay=1):
        """Wait until neoctl reports cluster state ``state``."""
        def callback(last_try):
            current_try = self.neoctl.getClusterState()
            return current_try == state, current_try
        self.expectCondition(callback, timeout, delay)

    def expectClusterRecovering(self, timeout=0, delay=1):
        """Wait until the cluster state is RECOVERING."""
        self.expectClusterState(ClusterStates.RECOVERING, timeout, delay)

    def expectClusterVeryfing(self, timeout=0, delay=1):
        """Wait until the cluster state is VERIFYING."""
        self.expectClusterState(ClusterStates.VERIFYING, timeout, delay)

    def expectClusterRunning(self, timeout=0, delay=1):
        """Wait until the cluster state is RUNNING."""
        self.expectClusterState(ClusterStates.RUNNING, timeout, delay)

    def __del__(self):
        if self.cleanup_on_delete:
            # NOTE(review): os.removedirs() only removes empty directories,
            # so this presumably fails once log files exist -- confirm
            # whether a recursive removal is intended.
            os.removedirs(self.temp_dir)
446

447 448 449 450 451 452 453 454

class NEOFunctionalTest(unittest.TestCase):

    def getTempDirectory(self):
        # get the current temp directory or a new one
        temp_dir = os.environ.get('TEMP', None)
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
455
            print 'Using temp directory %r.' % (temp_dir, )
456 457 458 459 460 461 462 463 464 465 466
        # build the full path based on test case and current test method
        test_case_name = self.__class__.__name__
        test_method_name = self._TestCase__testMethodName
        temp_dir = os.path.join(temp_dir, test_case_name)
        temp_dir = os.path.join(temp_dir, test_method_name)
        # build the path if needed
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        return temp_dir