Blame view

neo/tests/functional/__init__.py 24.1 KB
Vincent Pelletier committed
1
#
Julien Muchembled committed
2
# Copyright (C) 2009-2015  Nexedi SA
Grégory Wisniewski committed
3
#
Vincent Pelletier committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
Grégory Wisniewski committed
8
#
Vincent Pelletier committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
Julien Muchembled committed
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Vincent Pelletier committed
16

Julien Muchembled committed
17
import errno
Vincent Pelletier committed
18 19 20
import os
import sys
import time
Grégory Wisniewski committed
21
import ZODB
Grégory Wisniewski committed
22
import socket
Vincent Pelletier committed
23
import signal
Vincent Pelletier committed
24
import random
Vincent Pelletier committed
25
import MySQLdb
Julien Muchembled committed
26
import sqlite3
Grégory Wisniewski committed
27
import unittest
Vincent Pelletier committed
28 29
import tempfile
import traceback
Vincent Pelletier committed
30
import threading
Julien Muchembled committed
31
import psutil
Julien Muchembled committed
32
from ConfigParser import SafeConfigParser
Vincent Pelletier committed
33

Julien Muchembled committed
34
import neo.scripts
Grégory Wisniewski committed
35
from neo.neoctl.neoctl import NeoCTL, NotReadyException
Julien Muchembled committed
36
from neo.lib import logging
Julien Muchembled committed
37 38
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \
    UUID_NAMESPACES
Julien Muchembled committed
39
from neo.lib.util import dump
Julien Muchembled committed
40
from .. import cluster, DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \
Julien Muchembled committed
41
        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
Vincent Pelletier committed
42
from neo.client.Storage import Storage
Julien Muchembled committed
43
from neo.storage.database import buildDatabaseManager
Vincent Pelletier committed
44

Julien Muchembled committed
45 46 47 48 49
# Maps each node type to the name of the module of the neo.scripts package
# that runs it: NEOProcess imports 'neo.scripts.' + name and calls its main().
command_dict = {
    NodeTypes.MASTER: 'neomaster',
    NodeTypes.STORAGE: 'neostorage',
    NodeTypes.ADMIN: 'neoadmin',
}
Vincent Pelletier committed
50

Vincent Pelletier committed
51
# Added to every timeout given to NEOCluster.expectCondition before it is
# passed to pdb.wait (unit presumably seconds -- confirm against pdb.wait).
DELAY_SAFETY_MARGIN = 10
Grégory Wisniewski committed
52
# Timeout passed to pdb.wait by NEOCluster.run and NEOCluster.start while
# waiting for the cluster to come up.
MAX_START_TIME = 30
Grégory Wisniewski committed
53

Vincent Pelletier committed
54 55 56
class NodeProcessError(Exception):
    """Raised by NEOProcess.wait when a node exits with a non-zero status."""

Vincent Pelletier committed
57 58 59 60 61
class AlreadyRunning(Exception):
    """Raised by NEOProcess.start when a child process already exists."""

class AlreadyStopped(Exception):
    """Raised by NEOProcess.kill and NEOProcess.wait when there is no child."""
Vincent Pelletier committed
62

Olivier Cros committed
63 64 65
class NotFound(Exception):
    """Raised by NEOProcess when the requested node script can't be imported."""

Julien Muchembled committed
66 67 68 69
class PortAllocator(object):
    """Reserve TCP ports by keeping a listening socket bound to each of them.

    Ports stay reserved until release() is called, which closes every socket
    and forgets which ports were already tried.
    """

    def __init__(self):
        # Listening sockets holding the ports handed out so far.
        self.socket_list = []
        # Every port number already tried, whether binding succeeded or not.
        self.tried_port_set = set()

    def allocate(self, address_type, local_ip):
        """Reserve a port on `local_ip` and return its number.

        address_type -- socket address family (first argument of
                        socket.socket)
        local_ip -- address the reserved socket is bound to

        Raises RuntimeError("No free port") once every port of the range has
        been tried, and re-raises any socket error other than EADDRINUSE.
        The socket is always closed when it does not end up reserved.
        """
        min_port = n = 16384
        max_port = min_port + n
        tried_port_set = self.tried_port_set
        while True:
            s = socket.socket(address_type, socket.SOCK_STREAM)
            reserved = False
            try:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # Find an unreserved port.
                while True:
                    # Do not let the system choose the port to avoid conflicts
                    # with other software. IOW, use a range different than:
                    # - /proc/sys/net/ipv4/ip_local_port_range on Linux
                    # - what IANA recommends (49152 to 65535)
                    port = random.randrange(min_port, max_port)
                    if port not in tried_port_set:
                        tried_port_set.add(port)
                        try:
                            s.bind((local_ip, port))
                            break
                        except socket.error as e:
                            if e.errno != errno.EADDRINUSE:
                                raise
                    elif len(tried_port_set) >= n:
                        raise RuntimeError("No free port")
                # Reserve port.
                try:
                    s.listen(1)
                    reserved = True
                except socket.error as e:
                    # On EADDRINUSE, start over with a new socket.
                    if e.errno != errno.EADDRINUSE:
                        raise
            finally:
                # Do not rely on garbage collection to close a socket that
                # is not going to be kept.
                if not reserved:
                    s.close()
            if reserved:
                self.socket_list.append(s)
                return port

    def release(self):
        """Close all reserved sockets and reset the allocator state."""
        for s in self.socket_list:
            s.close()
        self.__init__()

    # Reserved ports are also released when the allocator is destroyed.
    __del__ = release
Julien Muchembled committed
111

Julien Muchembled committed
112

Grégory Wisniewski committed
113
class NEOProcess(object):
Vincent Pelletier committed
114 115
    pid = 0

Grégory Wisniewski committed
116
    def __init__(self, command, uuid, arg_dict):
        """Prepare, without starting it, a node running neo.scripts.<command>.

        command -- name of a module of the neo.scripts package
        uuid -- UUID passed to the node through its '--uuid' option
        arg_dict -- maps option names (without the leading '--') to values

        Raises NotFound if the script module cannot be imported.
        """
        module_name = 'neo.scripts.' + command
        try:
            __import__(module_name)
        except ImportError:
            raise NotFound('%s not found' % (command))
        self.command = command
        # Store options the way they appear on the command line.
        self.arg_dict = dict(('--' + key, value)
                             for key, value in arg_dict.iteritems())
        self.with_uuid = True
        self.setUUID(uuid)
Vincent Pelletier committed
125

Grégory Wisniewski committed
126
    def start(self, with_uuid=True):
        """Fork a child process that runs the node script's main().

        with_uuid -- when exactly False, the '--uuid' option is left out of
                     the child's command line

        Raises AlreadyRunning if a previous child was not waited for.
        """
        # Prevent starting when already forked and wait wasn't called.
        if self.pid != 0:
            raise AlreadyRunning('Already running with PID %r' % (self.pid, ))
        command = self.command
        self.with_uuid = with_uuid
        skip_uuid = with_uuid is False
        # Flatten the option dict into an argv-like list; an option whose
        # value is None is passed alone.
        args = []
        for option, value in self.arg_dict.iteritems():
            if skip_uuid and option == '--uuid':
                continue
            args.append(option)
            if value is not None:
                args.append(str(value))
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            try:
                # release SQLite debug log
                logging.setup()
                sys.argv = [command] + args
                getattr(neo.scripts,  command).main()
                status = 0
            except SystemExit as e:
                status = e.code
                if status is None:
                    status = 0
            except KeyboardInterrupt:
                status = 1
            except:
                status = -1
                traceback.print_exc()
            finally:
                # prevent child from killing anything (cf __del__), or
                # running any other cleanup code normally done by the parent
                try:
                    os._exit(status)
                except:
                    # os._exit rejected `status`: report it and fall back to
                    # exit code 1 below.
                    print >>sys.stderr, status
                finally:
                    os._exit(1)
        # Parent only: the child never gets past os._exit above.
        logging.info('pid %u: %s %s',
            self.pid, command, ' '.join(map(repr, args)))
Vincent Pelletier committed
168

Vincent Pelletier committed
169 170
    def kill(self, sig=signal.SIGTERM):
        """Send signal `sig` to the child process.

        An OSError raised while signalling is printed on stderr instead of
        being propagated.

        Raises AlreadyStopped if there is no child (PID is 0).
        """
        if not self.pid:
            raise AlreadyStopped
        logging.info('kill pid %u', self.pid)
        try:
            # NOTE(review): `pdb` is not imported in this file; presumably it
            # is made available by the test package -- confirm.
            pdb.kill(self.pid, sig)
        except OSError:
            # print_exc() reports the exception being handled here, whereas
            # print_last() relies on sys.last_type/value/traceback, which are
            # not set for an exception that is caught.
            traceback.print_exc()
Vincent Pelletier committed
178 179 180 181 182

    def __del__(self):
        # If we get killed, kill subprocesses aswell.
        try:
            self.kill(signal.SIGKILL)
Grégory Wisniewski committed
183
            self.wait()
Vincent Pelletier committed
184 185 186 187 188 189 190
        except:
            # We can ignore all exceptions at this point, since there is no
            # garanteed way to handle them (other objects we would depend on
            # might already have been deleted).
            pass

    def wait(self, options=0):
        """Wait for the child with os.waitpid and check its exit status.

        options -- passed as is to os.waitpid

        Returns the exit status, which can only be 0 since any other value
        raises NodeProcessError. Raises AlreadyStopped if there is no child.
        """
        pid = self.pid
        if pid == 0:
            raise AlreadyStopped
        wait_status = os.waitpid(pid, options)[1]
        exit_code = os.WEXITSTATUS(wait_status)
        # NOTE(review): the PID is forgotten even if `options` made waitpid
        # return before the child exited (e.g. os.WNOHANG) -- confirm that
        # callers never rely on that case.
        self.pid = 0
        if not exit_code:
            return exit_code
        raise NodeProcessError('%r %r exited with status %r' % (
            self.command, self.arg_dict, exit_code))
Vincent Pelletier committed
199

Grégory Wisniewski committed
200 201 202 203
    def stop(self):
        self.kill()
        self.wait()

Grégory Wisniewski committed
204 205 206
    def getPID(self):
        return self.pid

Vincent Pelletier committed
207
    def getUUID(self):
Grégory Wisniewski committed
208
        assert self.with_uuid, 'UUID disabled on this process'
Vincent Pelletier committed
209 210 211 212 213 214 215
        return self.uuid

    def setUUID(self, uuid):
        """
          Note: for this change to take effect, the node must be restarted.
        """
        self.uuid = uuid
Julien Muchembled committed
216
        self.arg_dict['--uuid'] = str(uuid)
Vincent Pelletier committed
217

Grégory Wisniewski committed
218
    def isAlive(self):
        """Tell whether psutil sees the child as existing and not a zombie."""
        try:
            process_status = psutil.Process(self.pid).status()
        except psutil.NoSuchProcess:
            return False
        return process_status != psutil.STATUS_ZOMBIE
Vincent Pelletier committed
223

Vincent Pelletier committed
224
class NEOCluster(object):
Grégory Wisniewski committed
225

Julien Muchembled committed
226
    def __init__(self, db_list, master_count=1, partitions=1, replicas=0,
                 db_user=DB_USER, db_password='', name=None,
                 cleanup_on_delete=False, temp_dir=None, clear_databases=True,
                 adapter=os.getenv('NEO_TESTS_ADAPTER'),
                 address_type=ADDRESS_TYPE, bind_ip=None, logger=True,
                 importer=None):
        """Describe a cluster: one admin, `master_count` masters, storages.

        Nothing is started here; processes are only prepared.

        db_list -- database names; one storage node is created per entry,
                   unless `importer` is given
        master_count -- number of master nodes
        partitions, replicas -- options passed to every master node
        db_user, db_password -- credentials, used by the MySQL adapter only
        name -- cluster name (default: 'neo_' + a random integer in 0..100)
        cleanup_on_delete -- whether __del__ removes `temp_dir`
        temp_dir -- working directory (default: a new 'neo_*' directory)
        clear_databases -- passed to setupDB
        adapter -- 'MySQL' or 'SQLite'; the default comes from the
                   NEO_TESTS_ADAPTER environment variable, read when this
                   module is imported, and falls back to 'MySQL'
        address_type, bind_ip -- nodes use `bind_ip` or, by default,
                   IP_VERSION_FORMAT_DICT[address_type]
        logger -- if true, each node logs to its own file in `temp_dir`
        importer -- optional iterable of (section name, options dict)
                   written to 'importer.cfg'; a single storage node using
                   the 'Importer' adapter is then created from that file
        """
        if not adapter:
            adapter = 'MySQL'
        self.adapter = adapter
        self.zodb_storage_list = []
        self.cleanup_on_delete = cleanup_on_delete
        self.uuid_dict = {}
        self.db_list = db_list
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
            print('Using temp directory ' + temp_dir)
        # db_template turns a database name into the value of the storage
        # node's 'database' option.
        if adapter == 'MySQL':
            self.db_user = db_user
            self.db_password = db_password
            self.db_template = ('%s:%s@%%s' % (db_user, db_password)).__mod__
        elif adapter == 'SQLite':
            self.db_template = (lambda t: lambda db:
                ':memory:' if db is None else db if os.sep in db else t % db
                )(os.path.join(temp_dir, '%s.sqlite'))
        else:
            assert False, adapter
        self.address_type = address_type
        self.local_ip = local_ip = bind_ip or \
            IP_VERSION_FORMAT_DICT[self.address_type]
        self.setupDB(clear_databases)
        if importer:
            cfg = SafeConfigParser()
            cfg.add_section("neo")
            cfg.set("neo", "adapter", adapter)
            cfg.set("neo", "database", self.db_template(*db_list))
            # The loop variable must not be called `name`: that would
            # overwrite the cluster name requested by the caller.
            for section, zodb in importer:
                cfg.add_section(section)
                for option in zodb.iteritems():
                    cfg.set(section, *option)
            importer_conf = os.path.join(temp_dir, 'importer.cfg')
            with open(importer_conf, 'w') as f:
                cfg.write(f)
            adapter = "Importer"
            self.db_template = str
            db_list = importer_conf,
        self.process_dict = {}
        self.temp_dir = temp_dir
        self.port_allocator = PortAllocator()
        admin_port = self.port_allocator.allocate(address_type, local_ip)
        self.cluster_name = name or 'neo_%s' % random.randint(0, 100)
        master_node_list = [self.port_allocator.allocate(address_type, local_ip)
                            for i in xrange(master_count)]
        self.master_nodes = ' '.join('%s:%s' % (
                buildUrlFromString(self.local_ip), x, )
                for x in master_node_list)
        # create admin node
        self._newProcess(NodeTypes.ADMIN, logger and 'admin', admin_port)
        # create master nodes
        for i, port in enumerate(master_node_list):
            self._newProcess(NodeTypes.MASTER, logger and 'master_%u' % i,
                             port, partitions=partitions, replicas=replicas)
        # create storage nodes
        for i, db in enumerate(db_list):
            self._newProcess(NodeTypes.STORAGE, logger and 'storage_%u' % i,
                             0, adapter=adapter, database=self.db_template(db))
        # create neoctl
        self.neoctl = NeoCTL((self.local_ip, admin_port))
Julien Muchembled committed
293

Julien Muchembled committed
294
    def _newProcess(self, node_type, logfile=None, port=None, **kw):
        """Register a new NEOProcess of the given type in process_dict.

        node_type -- key of command_dict and UUID_NAMESPACES
        logfile -- if true, base name (without '.log') of a file in temp_dir
        port -- if not None, port of the node's 'bind' option
        kw -- extra options of the node
        """
        # Nodes of a same type are numbered from 1; the namespace of the
        # type goes in the bits above the low 24.
        index = self.uuid_dict.get(node_type, 0) + 1
        self.uuid_dict[node_type] = index
        uuid = index + (UUID_NAMESPACES[node_type] << 24)
        kw['uuid'] = uuid
        kw['cluster'] = self.cluster_name
        kw['masters'] = self.master_nodes
        if logfile:
            kw['logfile'] = os.path.join(self.temp_dir, logfile + '.log')
        if port is not None:
            kw['bind'] = '%s:%u' % (buildUrlFromString(self.local_ip), port)
        process = NEOProcess(command_dict[node_type], uuid, kw)
        self.process_dict.setdefault(node_type, []).append(process)
Vincent Pelletier committed
306

Julien Muchembled committed
307
    def setupDB(self, clear_databases=True):
        """Prepare the databases of the storage nodes.

        MySQL: delegate to setupMySQLdb. SQLite: if `clear_databases` is
        true, delete the file of every database of db_list that is not None;
        a missing file is not an error.
        """
        if self.adapter == 'MySQL':
            setupMySQLdb(self.db_list, self.db_user, self.db_password,
                         clear_databases)
            return
        if self.adapter != 'SQLite' or not clear_databases:
            return
        for db_name in self.db_list:
            if db_name is None:
                continue
            path = self.db_template(db_name)
            try:
                os.remove(path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
            else:
                logging.debug('%r deleted', path)
Vincent Pelletier committed
324

Grégory Wisniewski committed
325 326
    def run(self, except_storages=()):
        """ Start cluster processes except some storage nodes

        The reserved ports are released first, then every process not in
        `except_storages` is started. Raises AssertionError if neoctl does
        not become ready within MAX_START_TIME.
        """
        assert len(self.process_dict)
        self.port_allocator.release()
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                if process in except_storages:
                    continue
                process.start()
        # wait for the admin node availability
        def admin_ready():
            try:
                self.neoctl.getClusterState()
            except NotReadyException:
                return False
            return True
        started = pdb.wait(admin_ready, MAX_START_TIME)
        if not started:
            raise AssertionError('Timeout when starting cluster')
Grégory Wisniewski committed
342

Julien Muchembled committed
343
    def start(self, except_storages=()):
        """ Do a complete start of a cluster

        After run(), poll until the cluster is RUNNING. While it is
        RECOVERING, startCluster() is requested once the number of storage
        nodes, all PENDING, equals the number of started storages.
        """
        self.run(except_storages=except_storages)
        ctl = self.neoctl
        # One-item list so that the closure below can update it.
        expected = [len(self.db_list) - len(except_storages)]
        def is_running():
            try:
                state = ctl.getClusterState()
                if state == ClusterStates.RUNNING:
                    return True
                if state == ClusterStates.RECOVERING and expected[0]:
                    pending = 0
                    for node in ctl.getNodeList(node_type=NodeTypes.STORAGE):
                        if node[3] != NodeStates.PENDING:
                            expected[0] = None # cluster must start automatically
                            break
                        pending += 1
                    else:
                        if pending == expected[0]:
                            ctl.startCluster()
            except (NotReadyException, RuntimeError):
                pass
        if not pdb.wait(is_running, MAX_START_TIME):
            raise AssertionError('Timeout when starting cluster')
Vincent Pelletier committed
366

Vincent Pelletier committed
367
    def stop(self, clients=True):
        """Kill all node processes and, if `clients`, close ZODB storages.

        Raises NodeProcessError, with one line per failed node, if some node
        exited with a non-zero status.
        """
        # Suspend all processes to kill before actually killing them, so that
        # nodes don't log errors because they get disconnected from other nodes:
        # otherwise, storage nodes would often flush MB of logs just because we
        # killed the master first, and waste much file system space.
        suspended = []
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                try:
                    process.kill(signal.SIGSTOP)
                except AlreadyStopped:
                    continue
                suspended.append(process)
        error_list = []
        for process in suspended:
            try:
                process.kill(signal.SIGKILL)
                process.wait()
            except NodeProcessError as e:
                error_list.extend(e.args)
        if clients:
            for zodb_storage in self.zodb_storage_list:
                zodb_storage.close()
            self.zodb_storage_list = []
        time.sleep(0.5)
        if error_list:
            raise NodeProcessError('\n'.join(error_list))
Vincent Pelletier committed
394

Julien Muchembled committed
395 396 397 398 399 400 401 402
    def waitAll(self):
        """Wait for every node process, ignoring stopped or failed ones."""
        ignored = (AlreadyStopped, NodeProcessError)
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                try:
                    process.wait()
                except ignored:
                    continue

Vincent Pelletier committed
403 404 405
    def getNEOCTL(self):
        """Return the NeoCTL instance connected to the admin node."""
        ctl = self.neoctl
        return ctl

Julien Muchembled committed
406
    def getZODBStorage(self, **kw):
        """Return a new client Storage for this cluster.

        `kw` is passed to Storage. The storage is remembered so that stop()
        can close it.
        """
        storage = Storage(
            master_nodes=self.master_nodes.replace('/', ' '),
            name=self.cluster_name,
            **kw)
        self.zodb_storage_list.append(storage)
        return storage
Vincent Pelletier committed
414

Grégory Wisniewski committed
415
    def getZODBConnection(self, **kw):
        """ Return a tuple with the database and a connection

        `kw` is passed to getZODBStorage.
        """
        storage = self.getZODBStorage(**kw)
        database = ZODB.DB(storage=storage)
        connection = database.open()
        return (database, connection)

Julien Muchembled committed
420
    def getSQLConnection(self, db):
        """Return a database manager for `db`, which must be in db_list."""
        assert db is not None and db in self.db_list
        manager_args = (self.db_template(db),)
        return buildDatabaseManager(self.adapter, manager_args)
Vincent Pelletier committed
423

Vincent Pelletier committed
424
    def getMasterProcessList(self):
        """Return the list of master processes (None if there is none)."""
        master_list = self.process_dict.get(NodeTypes.MASTER)
        return master_list
Vincent Pelletier committed
426 427

    def getStorageProcessList(self):
        """Return the list of storage processes (None if there is none)."""
        storage_list = self.process_dict.get(NodeTypes.STORAGE)
        return storage_list
Vincent Pelletier committed
429 430

    def getAdminProcessList(self):
        """Return the list of admin processes (None if there is none)."""
        admin_list = self.process_dict.get(NodeTypes.ADMIN)
        return admin_list
Vincent Pelletier committed
432

Grégory Wisniewski committed
433 434
    def _killMaster(self, primary=False, all=False):
        """Kill master processes and return the list of their UUIDs.

        primary -- if true, only the primary master is a candidate;
                   otherwise only the other masters are
        all -- if false, stop after the first killed process
        """
        primary_uuid = self.neoctl.getPrimary()
        killed_uuid_list = []
        for master in self.getMasterProcessList():
            master_uuid = master.getUUID()
            is_primary = master_uuid == primary_uuid
            selected = is_primary if primary else not is_primary
            if not selected:
                continue
            killed_uuid_list.append(master_uuid)
            master.kill()
            master.wait()
            if not all:
                break
        return killed_uuid_list

Grégory Wisniewski committed
447
    def killPrimary(self):
        """Kill the primary master; return the list of killed UUIDs."""
        killed_uuid_list = self._killMaster(primary=True)
        return killed_uuid_list

    def killSecondaryMaster(self, all=False):
        """Kill one (or, if `all`, every) non-primary master.

        Returns the list of killed UUIDs.
        """
        killed_uuid_list = self._killMaster(primary=False, all=all)
        return killed_uuid_list

    def killMasters(self):
        """Kill all non-primary masters, then the primary.

        Returns the UUIDs of the killed masters, in that order.
        """
        killed_uuid_list = self.killSecondaryMaster(all=True)
        return killed_uuid_list + self.killPrimary()

Vincent Pelletier committed
458 459 460 461 462 463 464 465 466 467
    def killStorage(self, all=False):
        """Kill the first (or, if `all`, every) storage process.

        Returns the list of killed UUIDs.
        """
        uuid_list = []
        for process in self.getStorageProcessList():
            uuid_list.append(process.getUUID())
            process.kill()
            process.wait()
            if not all:
                break
        return uuid_list

Grégory Wisniewski committed
468 469
    def __getNodeList(self, node_type, state=None):
        """Return the nodes of `node_type` reported by neoctl.

        If `state` is not None, only nodes whose 4th field equals it are
        kept.
        """
        node_list = self.neoctl.getNodeList(node_type)
        if state is None:
            return list(node_list)
        return [node for node in node_list if node[3] == state]

Grégory Wisniewski committed
472
    def getMasterList(self, state=None):
        """Return the master nodes known to neoctl, filtered by `state`."""
        node_type = NodeTypes.MASTER
        return self.__getNodeList(node_type, state)
Grégory Wisniewski committed
474

Grégory Wisniewski committed
475
    def getStorageList(self, state=None):
        """Return the storage nodes known to neoctl, filtered by `state`."""
        node_type = NodeTypes.STORAGE
        return self.__getNodeList(node_type, state)
Grégory Wisniewski committed
477

Grégory Wisniewski committed
478 479 480
    def getClientlist(self, state=None):
        """Return the client nodes known to neoctl, filtered by `state`."""
        node_type = NodeTypes.CLIENT
        return self.__getNodeList(node_type, state)

Grégory Wisniewski committed
481 482
    def __getNodeState(self, node_type, uuid):
        """Return the state of node `uuid`, or None if neoctl doesn't list it."""
        for _, _, node_uuid, node_state in self.__getNodeList(node_type):
            if node_uuid == uuid:
                return node_state
        return None

Grégory Wisniewski committed
490
    def getMasterNodeState(self, uuid):
        """Return the state of master `uuid`, or None if it is not listed."""
        node_type = NodeTypes.MASTER
        return self.__getNodeState(node_type, uuid)
Grégory Wisniewski committed
492

Grégory Wisniewski committed
493
    def getPrimary(self):
        """Return what neoctl.getPrimary() returns, or None if not ready."""
        try:
            return self.neoctl.getPrimary()
        except NotReadyException:
            return None

Julien Muchembled committed
500
    def expectCondition(self, condition, timeout=0, on_fail=None):
        """Poll `condition` through pdb.wait until it is reached.

        condition -- called with the opaque value of its previous
                     unsuccessful call (None the first time); must return a
                     (reached, opaque) pair
        timeout -- DELAY_SAFETY_MARGIN is added to it before it is passed to
                   pdb.wait
        on_fail -- if not None, called with the list of opaque values
                   collected so far, before the failure is raised

        Raises AssertionError if pdb.wait returns a false value.
        """
        # Starts with a None placeholder so that the first call of
        # `condition` has something to receive.
        opaque_history = [None]
        def test():
            reached, opaque = condition(opaque_history[-1])
            if not reached:
                opaque_history.append(opaque)
            return reached
        if not pdb.wait(test, timeout + DELAY_SAFETY_MARGIN):
            # Drop the placeholder before reporting.
            del opaque_history[0]
            if on_fail is not None:
                on_fail(opaque_history)
            raise AssertionError('Timeout while expecting condition. '
                                 'History: %s' % opaque_history)
Grégory Wisniewski committed
514

Julien Muchembled committed
515
    def expectAllMasters(self, node_count, state=None, *args, **kw):
        """Wait until exactly `node_count` masters (in `state`) are listed.

        Extra arguments go to expectCondition. The condition itself raises
        AssertionError if the count decreases between two polls.
        """
        def count_masters(previous_count):
            try:
                count = len(self.getMasterList(state=state))
            except NotReadyException:
                count = 0
            if previous_count is not None and count < previous_count:
                raise AssertionError('Regression: %s became %s' %
                    (previous_count, count))
            return (count == node_count, count)
        self.expectCondition(count_masters, *args, **kw)
Grégory Wisniewski committed
526

Julien Muchembled committed
527
    def __expectNodeState(self, node_type, uuid, state, *args, **kw):
        """Wait until node `uuid` is in `state`, or in one of them.

        `state` may be a single state or a tuple/list of accepted states.
        Extra arguments go to expectCondition.
        """
        if isinstance(state, (tuple, list)):
            accepted = state
        else:
            accepted = (state, )
        def check_state(last_try):
            try:
                node_state = self.__getNodeState(node_type, uuid)
            except NotReadyException:
                node_state = None
            return node_state in accepted, node_state
        self.expectCondition(check_state, *args, **kw)
Grégory Wisniewski committed
537

Julien Muchembled committed
538 539
    def expectMasterState(self, uuid, state, *args, **kw):
        """Wait until master `uuid` is in `state` (or in one of them)."""
        node_type = NodeTypes.MASTER
        self.__expectNodeState(node_type, uuid, state, *args, **kw)
Grégory Wisniewski committed
540

Julien Muchembled committed
541 542
    def expectStorageState(self, uuid, state, *args, **kw):
        """Wait until storage `uuid` is in `state` (or in one of them)."""
        node_type = NodeTypes.STORAGE
        self.__expectNodeState(node_type, uuid, state, *args, **kw)
Grégory Wisniewski committed
543

Julien Muchembled committed
544 545 546
    def expectRunning(self, process, *args, **kw):
        """Wait until the storage node of `process` is RUNNING."""
        uuid = process.getUUID()
        self.expectStorageState(uuid, NodeStates.RUNNING, *args, **kw)
Grégory Wisniewski committed
547

Julien Muchembled committed
548 549 550
    def expectPending(self, process, *args, **kw):
        """Wait until the storage node of `process` is PENDING."""
        uuid = process.getUUID()
        self.expectStorageState(uuid, NodeStates.PENDING, *args, **kw)
Grégory Wisniewski committed
551

Julien Muchembled committed
552 553 554
    def expectUnknown(self, process, *args, **kw):
        """Wait until the storage node of `process` is UNKNOWN."""
        uuid = process.getUUID()
        self.expectStorageState(uuid, NodeStates.UNKNOWN, *args, **kw)
Grégory Wisniewski committed
555

Julien Muchembled committed
556
    def expectUnavailable(self, process, *args, **kw):
        """Wait until the storage node of `process` is TEMPORARILY_DOWN."""
        uuid = process.getUUID()
        self.expectStorageState(uuid, NodeStates.TEMPORARILY_DOWN,
                                *args, **kw)
Grégory Wisniewski committed
559

Julien Muchembled committed
560
    def expectPrimary(self, uuid=None, *args, **kw):
        """Wait for a primary master: `uuid`, or any when `uuid` is None.

        Extra arguments go to expectCondition. The condition itself raises
        AssertionError if a primary other than `uuid` shows up.
        """
        def check_primary(last_try):
            current = self.getPrimary()
            if None not in (uuid, current) and uuid != current:
                raise AssertionError('An unexpected primary arised: %r, '
                    'expected %r' % (dump(current), dump(uuid)))
            reached = uuid is None or uuid == current
            return reached, current
        self.expectCondition(check_primary, *args, **kw)
Grégory Wisniewski committed
568

Julien Muchembled committed
569
    def expectOudatedCells(self, number, *args, **kw):
        """Wait until exactly `number` cells are OUT_OF_DATE.

        Cells are counted over all rows of the partition table returned by
        neoctl. Extra arguments go to expectCondition.
        """
        def count_outdated(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            outdated = sum(1
                for row in row_list
                for cell in row[1]
                if cell[1] == CellStates.OUT_OF_DATE)
            return outdated == number, outdated
        self.expectCondition(count_outdated, *args, **kw)
Grégory Wisniewski committed
579

Julien Muchembled committed
580
    def expectAssignedCells(self, process, number, *args, **kw):
        """Wait until exactly `number` cells are assigned to `process`.

        A cell counts when its first field equals the UUID of `process`.
        Extra arguments go to expectCondition.
        """
        def count_assigned(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            assigned = sum(1
                for row in row_list
                for cell in row[1]
                if cell[0] == process.getUUID())
            return assigned == number, assigned
        self.expectCondition(count_assigned, *args, **kw)
Grégory Wisniewski committed
590

Julien Muchembled committed
591
    def expectClusterState(self, state, *args, **kw):
        """Wait until neoctl reports the cluster state `state`.

        A neoctl that is not ready counts as state None. Extra arguments go
        to expectCondition.
        """
        def check_state(last_try):
            try:
                cluster_state = self.neoctl.getClusterState()
            except NotReadyException:
                cluster_state = None
            return cluster_state == state, cluster_state
        self.expectCondition(check_state, *args, **kw)
Grégory Wisniewski committed
599

Julien Muchembled committed
600 601
    def expectClusterRecovering(self, *args, **kw):
        """Wait until the cluster state is RECOVERING."""
        expected_state = ClusterStates.RECOVERING
        self.expectClusterState(expected_state, *args, **kw)
Grégory Wisniewski committed
602

Julien Muchembled committed
603 604
    def expectClusterVerifying(self, *args, **kw):
        """Wait until the cluster state is VERIFYING."""
        expected_state = ClusterStates.VERIFYING
        self.expectClusterState(expected_state, *args, **kw)
Grégory Wisniewski committed
605

Julien Muchembled committed
606 607
    def expectClusterRunning(self, *args, **kw):
        """Wait until the cluster state is RUNNING."""
        expected_state = ClusterStates.RUNNING
        self.expectClusterState(expected_state, *args, **kw)
Grégory Wisniewski committed
608

Julien Muchembled committed
609
    def expectAlive(self, process, *args, **kw):
        """Wait until `process.isAlive()` is true.

        Extra arguments go to expectCondition.
        """
        def check_alive(last_try):
            alive = process.isAlive()
            return alive, alive
        self.expectCondition(check_alive, *args, **kw)
Grégory Wisniewski committed
614

Julien Muchembled committed
615 616 617 618 619 620
    def expectDead(self, process, *args, **kw):
        """Wait until `process.isAlive()` is false.

        Extra arguments go to expectCondition.
        """
        def check_dead(last_try):
            dead = not process.isAlive()
            return dead, dead
        self.expectCondition(check_dead, *args, **kw)

Julien Muchembled committed
621
    def expectStorageNotKnown(self, process, *args, **kw):
        """Wait until the storage list no longer contains `process`.

        Extra arguments go to expectCondition.
        """
        # /!\ Not Known != Unknown
        process_uuid = process.getUUID()
        def storage_is_gone(last_try):
            for node in self.getStorageList():
                if node[2] == process_uuid:
                    # Still listed: report the node as the opaque value.
                    return False, node
            return True, None
        self.expectCondition(storage_is_gone, *args, **kw)
Vincent Pelletier committed
630

Vincent Pelletier committed
631 632 633
    def __del__(self):
        """Remove the temporary directory if cleanup_on_delete is set."""
        if not self.cleanup_on_delete:
            return
        # NOTE(review): os.removedirs only removes empty directories, so
        # this fails if temp_dir still holds logs or databases -- confirm
        # that this is intended.
        os.removedirs(self.temp_dir)
Vincent Pelletier committed
634

Grégory Wisniewski committed
635

Vincent Pelletier committed
636
class NEOFunctionalTest(NeoTestBase):
Grégory Wisniewski committed
637

Julien Muchembled committed
638
    def setupLog(self):
        """Send the logs of the test to 'test.log' in its temp directory."""
        log_path = os.path.join(self.getTempDirectory(), 'test.log')
        logging.setup(log_path)
Julien Muchembled committed
640

Grégory Wisniewski committed
641 642
    def getTempDirectory(self):
        """Return the directory of the current test, creating it if needed."""
        # The full path is based on the test case and current test method:
        # self.id() inside the module-level getTempDirectory().
        test_dir = os.path.join(getTempDirectory(), self.id())
        # build the path if needed
        if not os.path.exists(test_dir):
            os.makedirs(test_dir)
        return test_dir
Grégory Wisniewski committed
648

Vincent Pelletier committed
649 650 651
    def runWithTimeout(self, timeout, method, args=(), kwargs=None):
        if kwargs is None:
            kwargs = {}
Vincent Pelletier committed
652 653 654 655 656 657 658
        exc_list = []
        def excWrapper(*args, **kw):
            try:
                method(*args, **kw)
            except:
                exc_list.append(sys.exc_info())
        thread = threading.Thread(None, excWrapper, args=args, kwargs=kwargs)
Julien Muchembled committed
659
        thread.daemon = True
Grégory Wisniewski committed
660 661 662
        thread.start()
        thread.join(timeout)
        self.assertFalse(thread.isAlive(), 'Run timeout')
Vincent Pelletier committed
663 664 665 666
        if exc_list:
            assert len(exc_list) == 1, exc_list
            exc = exc_list[0]
            raise exc[0], exc[1], exc[2]
Grégory Wisniewski committed
667