__init__.py 14.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import os
import random
import shutil
import signal
import sys
import tempfile
import time
import traceback

import MySQLdb

from neo import protocol
from neo.client.Storage import Storage
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.tests import getNewUUID
from neo.util import dump
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64

# Template for the beginning of the generated configuration file: the
# [DEFAULT] section followed by the [admin] section. Filled with a mapping
# providing master_nodes, replicas, partitions, name, user, password and port
# (the admin node port).
NEO_CONFIG_HEADER = """
[DEFAULT]
master_nodes: %(master_nodes)s
replicas: %(replicas)s
partitions: %(partitions)s
name: %(name)s
user: %(user)s
password: %(password)s
connector: SocketConnector

[admin]
server: 127.0.0.1:%(port)s
"""

# Template for one master node section; "id" is the section name and "port"
# the port the node is configured with.
NEO_CONFIG_MASTER = """
[%(id)s]
server: 127.0.0.1:%(port)s
"""

# Template for one storage node section; same keys as a master section plus
# "db", the database name taken from the cluster's db_list.
NEO_CONFIG_STORAGE = """
[%(id)s]
database: %(db)s
server: 127.0.0.1:%(port)s
"""

# Patterns for configuration section names; %s receives the node index.
NEO_MASTER_ID = 'master%s'
NEO_STORAGE_ID = 'storage%s'

# Executable names handed to os.execlp (hence looked up in PATH). They are
# also used as the keys of NEOCluster.process_dict.
NEO_MASTER = 'neomaster'
NEO_STORAGE = 'neostorage'
NEO_ADMIN = 'neoadmin'

65 66
# Extra time, in seconds, that NEOCluster.expectCondition adds on top of the
# timeout requested by its caller before giving up.
DELAY_SAFETY_MARGIN = 5

67 68 69 70 71
class AlreadyRunning(Exception):
    """Raised by NEOProcess.start() when a child process is still tracked."""

class AlreadyStopped(Exception):
    """Raised by NEOProcess.kill() and wait() when no child is tracked."""
72 73 74 75

class NEOProcess(object):
    """
      Handle on one NEO node executable run as a child process.

      The child is created with os.fork() followed by os.execlp(). At most
      one child is tracked at a time through the "pid" attribute.
    """

    # PID of the tracked child process; 0 means no child is tracked.
    pid = 0

    def __init__(self, command, uuid, port, arg_dict):
        """
          command: name of the executable (resolved through PATH by
            os.execlp).
          uuid: node UUID, given to the executable through the '-u' option.
          port: port associated with this node; only stored, see getPort.
          arg_dict: maps each command line option to its value, or to None
            for an option without value. The mapping is kept by reference
            and gets a '-u' entry added by setUUID.
        """
        self.command = command
        self.arg_dict = arg_dict
        self.setUUID(uuid)
        self.port = port

    def start(self):
        """
          Fork and execute the command with the options found in arg_dict.

          Raises AlreadyRunning when a previous child was started and wait
          was not called since.
        """
        # Prevent starting when already forked and wait wasn't called.
        if self.pid != 0:
            raise AlreadyRunning('Already running with PID %r' % (self.pid, ))
        command = self.command
        args = []
        for arg, param in self.arg_dict.items():
            args.append(arg)
            if param is not None:
                args.append(param)
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            try:
                os.execlp(command, command, *args)
            except:
                # Deliberately broad: whatever went wrong, report it and
                # fall through to the exit path below.
                print(traceback.format_exc())
            # If we reach this line, exec call failed (is it possible to reach
            # it without going through above "except" branch ?).
            print('Error executing %r.' % (command + ' ' + ' '.join(args), ))
            # KeyboardInterrupt is not intercepted by test runner (it is still
            # above us in the stack), and we do want to exit.
            # To avoid polluting test foreground output with induced
            # traceback, replace stdout & stderr.
            # NOTE(review): unwinding the stack in the forked child may run
            # cleanup code inherited from the parent; os._exit() would avoid
            # that - confirm before changing.
            sys.stdout = sys.stderr = open('/dev/null', 'w')
            raise KeyboardInterrupt

    def kill(self, sig=signal.SIGTERM):
        """
          Send signal "sig" to the tracked child.

          A failure of os.kill (OSError) is reported on stderr and otherwise
          ignored. Raises AlreadyStopped when no child is tracked.
        """
        if not self.pid:
            raise AlreadyStopped
        try:
            os.kill(self.pid, sig)
        except OSError:
            # Report the exception being handled. traceback.print_last()
            # is not suitable here: it reports sys.last_* values, which are
            # unrelated to (and may not even exist for) the current error.
            traceback.print_exc()

    def __del__(self):
        # If we get killed, kill subprocesses aswell.
        try:
            self.kill(signal.SIGKILL)
            self.wait()
        except:
            # We can ignore all exceptions at this point, since there is no
            # garanteed way to handle them (other objects we would depend on
            # might already have been deleted).
            pass

    def wait(self, options=0):
        """
          Wait for the tracked child with os.waitpid(pid, options).

          Returns os.WEXITSTATUS() of the collected status and stops
          tracking the child. When os.waitpid reports that the child did
          not change state yet (only possible with os.WNOHANG), the child
          stays tracked and None is returned.

          Raises AlreadyStopped when no child is tracked.
        """
        if self.pid == 0:
            raise AlreadyStopped
        waited_pid, status = os.waitpid(self.pid, options)
        if waited_pid == 0:
            # Child still running: forgetting its pid here would make it
            # impossible to kill or wait for it later.
            return None
        self.pid = 0
        return os.WEXITSTATUS(status)

    def getUUID(self):
        """
          Return the UUID last given to the constructor or to setUUID.
        """
        return self.uuid

    def setUUID(self, uuid):
        """
          Note: for this change to take effect, the node must be restarted.
        """
        self.uuid = uuid
        self.arg_dict['-u'] = dump(uuid)

    def getPort(self):
        """
          Return the port given to the constructor.
        """
        return self.port

149
class NEOCluster(object):
    """
      Local NEO cluster made of one admin node, some master nodes and one
      storage node per database, all bound to 127.0.0.1.

      The constructor writes the configuration file in a fresh temporary
      directory and prepares one NEOProcess per node; nothing is started
      before start() is called.
    """

    def __init__(self, db_list, master_node_count=1,
                 partitions=1, replicas=0, port_base=10000,
                 db_user='neo', db_password='neo',
                 db_super_user='root', db_super_password=None,
                 cleanup_on_delete=False):
        """
          db_list: database names; one storage node is declared for each.
          master_node_count: number of master nodes to declare.
          partitions, replicas: written as is in the configuration file.
          port_base: first port to use; following ports are allocated
            sequentially (admin node first, then masters, then storages).
          db_user, db_password: credentials written in the configuration
            file and granted access to the databases by setupDB.
          db_super_user, db_super_password: credentials used by setupDB to
            connect to MySQL.
          cleanup_on_delete: when true, the temporary directory is removed
            when this object is deleted.
        """
        self.cleanup_on_delete = cleanup_on_delete
        self.uuid_set = set()
        self.db_super_user = db_super_user
        self.db_super_password = db_super_password
        self.db_user = db_user
        self.db_password = db_password
        self.db_list = db_list
        self.process_dict = {}
        self.last_port = port_base
        self.temp_dir = temp_dir = tempfile.mkdtemp(prefix='neo_')
        print('Using temp directory %r.' % (temp_dir, ))
        self.config_file_path = config_file_path = os.path.join(
            temp_dir, 'neo.conf')
        neo_admin_port = self.__allocatePort()
        self.cluster_name = cluster_name = 'neo_%s' % (
            random.randint(0, 100), )
        master_node_dict = {}
        for master in range(master_node_count):
            master_node_dict[NEO_MASTER_ID % (master, )] = \
                self.__allocatePort()
        self.master_nodes = master_nodes = ' '.join(
            '127.0.0.1:%s' % (x, ) for x in master_node_dict.values())
        config_file = open(config_file_path, 'w')
        # Make sure the file is closed even if declaring a node fails.
        try:
            config_file.write(NEO_CONFIG_HEADER % {
                'master_nodes': master_nodes,
                'replicas': replicas,
                'partitions': partitions,
                'name': cluster_name,
                'user': db_user,
                'password': db_password,
                'port': neo_admin_port,
            })
            self.__newProcess(NEO_ADMIN, 'admin', neo_admin_port)
            for config_id, port in master_node_dict.items():
                config_file.write(NEO_CONFIG_MASTER % {
                    'id': config_id,
                    'port': port,
                })
                self.__newProcess(NEO_MASTER, config_id, port)
            for storage, db in enumerate(db_list):
                config_id = NEO_STORAGE_ID % (storage, )
                port = self.__allocatePort()
                config_file.write(NEO_CONFIG_STORAGE % {
                    'id': config_id,
                    'db': db,
                    'port': port,
                })
                self.__newProcess(NEO_STORAGE, config_id, port)
        finally:
            config_file.close()
        self.neoctl = NeoCTL('127.0.0.1', neo_admin_port,
                             'SocketConnector')

    def __newProcess(self, command, section, port):
        """
          Register (without starting it) a NEOProcess running "command"
          for configuration section "section", with a newly allocated UUID.
          Its log file is "<section>.log" in the temporary directory.
        """
        uuid = self.__allocateUUID()
        self.process_dict.setdefault(command, []).append(
            NEOProcess(command, uuid, port, {
                '-v': None,
                '-c': self.config_file_path,
                '-s': section,
                '-l': os.path.join(self.temp_dir, '%s.log' % (section, ))
            }))

    def __allocatePort(self):
        """
          Return the next unused port number and reserve it.
        """
        port = self.last_port
        self.last_port += 1
        return port

    def __allocateUUID(self):
        """
          Return a UUID from getNewUUID which was not yet handed out by
          this cluster.
        """
        uuid_set = self.uuid_set
        uuid = None
        while uuid is None or uuid in uuid_set:
            uuid = getNewUUID()
        uuid_set.add(uuid)
        return uuid

    def setupDB(self):
        """
          Drop and recreate every database of db_list, and grant db_user
          all privileges on them. Connects as db_super_user.
        """
        # Cleanup or bootstrap databases
        connect_arg_dict = {'user': self.db_super_user}
        password = self.db_super_password
        if password is not None:
            connect_arg_dict['passwd'] = password
        sql_connection = MySQLdb.Connect(**connect_arg_dict)
        # Release cursor and connection even when a statement fails.
        try:
            cursor = sql_connection.cursor()
            try:
                for database in self.db_list:
                    cursor.execute('DROP DATABASE IF EXISTS `%s`' %
                                   (database, ))
                    cursor.execute('CREATE DATABASE `%s`' % (database, ))
                    cursor.execute('GRANT ALL ON `%s`.* TO "%s"@"localhost" '
                                   'IDENTIFIED BY "%s"' % (database,
                                   self.db_user, self.db_password))
            finally:
                cursor.close()
        finally:
            sql_connection.close()

    def start(self, except_storages=()):
        """
          Start all registered processes and put the cluster in running
          state.

          except_storages: NEOProcess instances which must not be started.

          Note: both waiting loops below retry forever; this method does
          not return until the cluster is started and the expected number
          of storage nodes is listed.
        """
        neoctl = self.neoctl
        assert len(self.process_dict)
        for process_list in self.process_dict.values():
            for process in process_list:
                if process not in except_storages:
                    process.start()
        # Try to put cluster in running state. This will succeed as soon as
        # admin node could connect to the primary master node.
        while True:
            try:
                neoctl.startCluster()
            except NotReadyException:
                time.sleep(0.5)
            else:
                break
        target_count = len(self.db_list) - len(except_storages)
        while True:
            storage_node_list = neoctl.getNodeList(
                node_type=protocol.STORAGE_NODE_TYPE)
            if len(storage_node_list) == target_count:
                break
            time.sleep(0.5)
        # Third item of each node entry is the node UUID.
        neoctl.enableStorageList([x[2] for x in storage_node_list])

    def stop(self):
        """
          Kill and wait for every registered process, skipping the ones
          which are not running.
        """
        for process_list in self.process_dict.values():
            for process in process_list:
                try:
                    process.kill()
                    process.wait()
                except AlreadyStopped:
                    pass

    def getNEOCTL(self):
        """
          Return the NeoCTL instance bound to the admin node address.
        """
        return self.neoctl

    def getStorage(self):
        """
          Return a new client Storage configured for this cluster.
        """
        return Storage(
            master_nodes=self.master_nodes,
            name=self.cluster_name,
            connector='SocketConnector')

    def _getProcessList(self, type):
        """
          Return the list of NEOProcess registered for executable "type",
          or None when there is none.
        """
        return self.process_dict.get(type)

    def getMasterProcessList(self):
        return self._getProcessList(NEO_MASTER)

    def getStorageProcessList(self):
        return self._getProcessList(NEO_STORAGE)

    def getAdminProcessList(self):
        return self._getProcessList(NEO_ADMIN)

    def _killMaster(self, primary=False, all=False):
        """
          Kill and wait for master processes, returning the list of their
          UUIDs.

          primary: when true, target the master whose UUID is the one
            reported by neoctl as primary; otherwise target the others.
          all: when false, stop after the first matching master.
        """
        killed_uuid_list = []
        primary_uuid = self.neoctl.getPrimaryMaster()
        for master in self.getMasterProcessList():
            master_uuid = master.getUUID()
            is_primary = master_uuid == primary_uuid
            # Select the primary when asked for it, secondaries otherwise.
            if bool(primary) == bool(is_primary):
                killed_uuid_list.append(master_uuid)
                master.kill()
                master.wait()
                if not all:
                    break
        return killed_uuid_list

    def killPrimaryMaster(self):
        return self._killMaster(primary=True)

    def killSecondaryMaster(self, all=False):
        return self._killMaster(primary=False, all=all)

    def killMasters(self):
        """
          Kill all secondary masters, then the primary one. Return the
          UUIDs of killed masters, in that order.
        """
        secondary_list = self.killSecondaryMaster(all=True)
        primary_list = self.killPrimaryMaster()
        return secondary_list + primary_list

    def killStorage(self, all=False):
        """
          Kill and wait for the first registered storage process, or for
          all of them when "all" is true. Return the list of their UUIDs.
        """
        killed_uuid_list = []
        for storage in self.getStorageProcessList():
            killed_uuid_list.append(storage.getUUID())
            storage.kill()
            storage.wait()
            if not all:
                break
        return killed_uuid_list

    def __getNodeList(self, node_type, state=None):
        """
          Return node entries of given type as reported by neoctl,
          restricted to the ones in "state" unless it is None. The state
          is the fourth item of an entry.
        """
        return [x for x in self.neoctl.getNodeList(node_type)
                if state is None or x[3] == state]

    def getMasterNodeList(self, state=None):
        return self.__getNodeList(protocol.MASTER_NODE_TYPE, state)

    def getStorageNodeList(self, state=None):
        return self.__getNodeList(protocol.STORAGE_NODE_TYPE, state)

    def __getNodeState(self, node_type, uuid):
        """
          Return the state of the node of given type and UUID, or None if
          neoctl does not list such node.
        """
        for _, _, node_uuid, state in self.__getNodeList(node_type):
            if node_uuid == uuid:
                return state
        return None

    def getMasterNodeState(self, uuid):
        return self.__getNodeState(protocol.MASTER_NODE_TYPE, uuid)

    def getPrimaryMaster(self):
        """
          Return what neoctl reports as primary master, or None when
          neoctl raises NotReadyException.
        """
        try:
            current_try = self.neoctl.getPrimaryMaster()
        except NotReadyException:
            current_try = None
        return current_try

    def expectCondition(self, condition, timeout, delay):
        """
          Call "condition" repeatedly until it reports success.

          condition: callable receiving the opaque value returned by its
            previous call (None on first call) and returning a
            (reached, opaque) pair.
          timeout: time allowed, in seconds; DELAY_SAFETY_MARGIN is added.
          delay: time slept between two calls, in seconds.

          Raises AssertionError, listing the successive opaque values,
          when the condition is not reached in time.
        """
        end = time.time() + timeout + DELAY_SAFETY_MARGIN
        opaque = None
        opaque_history = []
        while time.time() < end:
            reached, opaque = condition(opaque)
            if reached:
                break
            else:
                opaque_history.append(opaque)
                time.sleep(delay)
        else:
            raise AssertionError('Timeout while expecting condition. '
                                 'History: %s' % (opaque_history, ))

    def expectAllMasters(self, node_count, state=None, timeout=0, delay=1):
        """
          Wait until exactly node_count master nodes are listed (in given
          state, if any). Fails as soon as the count decreases.
        """
        def callback(last_try):
            current_try = len(self.getMasterNodeList(state=state))
            if last_try is not None and current_try < last_try:
                raise AssertionError('Regression: %s became %s' %
                    (last_try, current_try))
            return (current_try == node_count, current_try)
        self.expectCondition(callback, timeout, delay)

    def __expectNodeState(self, node_type, uuid, state, timeout=0, delay=1):
        """
          Wait until the node of given type and UUID is in "state", which
          may be a single state or a tuple/list of acceptable states.
        """
        if not isinstance(state, (tuple, list)):
            state = (state, )
        def callback(last_try):
            current_try = self.__getNodeState(node_type, uuid)
            return current_try in state, current_try
        self.expectCondition(callback, timeout, delay)

    def expectMasterState(self, uuid, state, timeout=0, delay=1):
        self.__expectNodeState(protocol.MASTER_NODE_TYPE, uuid, state, timeout,
                delay)

    def expectStorageState(self, uuid, state, timeout=0, delay=1):
        self.__expectNodeState(protocol.STORAGE_NODE_TYPE, uuid, state,
                timeout, delay)

    def expectPrimaryMaster(self, uuid=None, timeout=0, delay=1):
        """
          Wait until getPrimaryMaster returns "uuid". With uuid=None the
          first call already satisfies the condition. Fails immediately
          when another primary than the expected one is reported.
        """
        def callback(last_try):
            current_try = self.getPrimaryMaster()
            if None not in (uuid, current_try) and uuid != current_try:
                raise AssertionError('An unexpected primary arised: %r, '
                    'expected %r' % (dump(current_try), dump(uuid)))
            return uuid is None or uuid == current_try, current_try
        self.expectCondition(callback, timeout, delay)

    def expectNoOudatedCells(self, timeout=0, delay=1):
        """
          Wait until every cell of every row of the partition row list
          reported by neoctl is in protocol.UP_TO_DATE_STATE.

          (The method name misses a "t"; it is kept for callers.)
        """
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            for row in row_list:
                for cell in row[1]:
                    if cell[1] != protocol.UP_TO_DATE_STATE:
                        return False, last_try
            return True, last_try
        self.expectCondition(callback, timeout, delay)

    def expectClusterState(self, state, timeout=0, delay=1):
        """
          Wait until neoctl reports "state" as the cluster state.
        """
        def callback(last_try):
            current_try = self.neoctl.getClusterState()
            return current_try == state, current_try
        self.expectCondition(callback, timeout, delay)

    def __del__(self):
        # Attributes may be missing if the constructor failed early.
        if getattr(self, 'cleanup_on_delete', False):
            temp_dir = getattr(self, 'temp_dir', None)
            if temp_dir is not None:
                # The directory holds the configuration file (and node
                # logs), so it must be removed recursively: os.removedirs
                # only handles empty directories.
                shutil.rmtree(temp_dir, ignore_errors=True)
434