#
# Copyright (C) 2009-2015  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import errno
import os
import random
import shutil
import signal
import socket
import sqlite3
import sys
import tempfile
import threading
import time
import traceback
import unittest
from ConfigParser import SafeConfigParser

import MySQLdb
import psutil
import ZODB

import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \
    UUID_NAMESPACES
from neo.lib.util import dump
from .. import cluster, DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \
        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
from neo.client.Storage import Storage
from neo.storage.database import buildDatabaseManager

# Name of the neo.scripts module that implements each kind of node
# (NEOProcess imports 'neo.scripts.' + name and runs its main()).
command_dict = {
    NodeTypes.ADMIN: 'neoadmin',
    NodeTypes.MASTER: 'neomaster',
    NodeTypes.STORAGE: 'neostorage',
}

# Seconds added to every timeout passed to NEOCluster.expectCondition.
DELAY_SAFETY_MARGIN = 10
# Seconds allowed for a cluster to become available (run) or running (start).
MAX_START_TIME = 30

class NodeProcessError(Exception):
    """A node process exited with a non-zero status."""

class AlreadyRunning(Exception):
    """Starting a process that was forked and not waited for yet."""

class AlreadyStopped(Exception):
    """Killing or waiting for a process that is not running."""

class NotFound(Exception):
    """No neo.scripts module exists for the requested node command."""

class PortAllocator(object):
    """Reserve TCP ports by keeping listening sockets open on them.

    Ports are picked at random in [16384, 32768).  They stay reserved until
    release() closes the sockets, so that the port numbers can be handed to
    child processes first.
    """

    def __init__(self):
        # Listening sockets holding the ports allocated so far.
        self.socket_list = []
        # Every port number already tried, so that none is tried twice.
        self.tried_port_set = set()

    def allocate(self, address_type, local_ip):
        """Reserve and return a port on local_ip.

        address_type is the socket address family (e.g. socket.AF_INET).
        Raises RuntimeError("No free port") once every port of the range has
        been tried; any socket error other than EADDRINUSE is re-raised.
        A socket that does not end up reserving a port is always closed.
        """
        min_port = n = 16384
        max_port = min_port + n
        tried_port_set = self.tried_port_set
        while True:
            s = socket.socket(address_type, socket.SOCK_STREAM)
            reserved = False
            try:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # Find an unreserved port.
                while True:
                    # Do not let the system choose the port to avoid conflicts
                    # with other software. IOW, use a range different than:
                    # - /proc/sys/net/ipv4/ip_local_port_range on Linux
                    # - what IANA recommends (49152 to 65535)
                    port = random.randrange(min_port, max_port)
                    if port not in tried_port_set:
                        tried_port_set.add(port)
                        try:
                            s.bind((local_ip, port))
                            break
                        except socket.error as e:
                            if e.errno != errno.EADDRINUSE:
                                raise
                    elif len(tried_port_set) >= n:
                        raise RuntimeError("No free port")
                # Reserve port.
                try:
                    s.listen(1)
                except socket.error as e:
                    if e.errno != errno.EADDRINUSE:
                        raise
                    # Bound but not listenable: retry with a fresh socket.
                else:
                    self.socket_list.append(s)
                    reserved = True
                    return port
            finally:
                # Previously, abandoned sockets were left open.
                if not reserved:
                    s.close()

    def release(self):
        """Close all reserving sockets and forget the ports tried so far."""
        for s in self.socket_list:
            s.close()
        self.__init__()

    __del__ = release

class NEOProcess(object):
    """A NEO node run in a forked child process.

    The child runs neo.scripts.<command>.main() with sys.argv built from
    the option dict; the parent keeps the child's PID to signal and reap it.
    """

    # PID of the forked child; 0 when not started or already reaped.
    pid = 0

    def __init__(self, command, uuid, arg_dict):
        """Prepare (without starting) a node process.

        command: name of a module of the neo.scripts package.
        uuid: node UUID, passed as the --uuid option.
        arg_dict: option name (without the leading '--') -> value; a value
          of None produces an option without argument.
        Raises NotFound if neo.scripts.<command> cannot be imported.
        """
        try:
            __import__('neo.scripts.' + command)
        except ImportError:
            raise NotFound('%s not found' % (command))
        self.command = command
        self.arg_dict = {'--' + k: v for k, v in arg_dict.iteritems()}
        self.with_uuid = True
        self.setUUID(uuid)

    def start(self, with_uuid=True):
        """Fork and run the node in the child process.

        with_uuid: when False, the --uuid option is not passed.
        Raises AlreadyRunning if a child was forked and not waited for.
        """
        # Prevent starting when already forked and wait wasn't called.
        if self.pid != 0:
            raise AlreadyRunning('Already running with PID %r' % (self.pid, ))
        command = self.command
        args = []
        self.with_uuid = with_uuid
        for arg, param in self.arg_dict.iteritems():
            if with_uuid is False and arg == '--uuid':
                continue
            args.append(arg)
            if param is not None:
                args.append(str(param))
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            try:
                # release SQLite debug log
                logging.setup()
                sys.argv = [command] + args
                getattr(neo.scripts, command).main()
                status = 0
            except SystemExit as e:
                status = e.code
                if status is None:
                    status = 0
            except KeyboardInterrupt:
                status = 1
            except:
                # Deliberately catch everything: the child must never return
                # into the parent's code below.
                status = -1
                traceback.print_exc()
            finally:
                # prevent child from killing anything (cf __del__), or
                # running any other cleanup code normally done by the parent
                try:
                    os._exit(status)
                except:
                    # status is not an integer (e.g. sys.exit('message')).
                    sys.stderr.write('%s\n' % (status,))
                finally:
                    os._exit(1)
        logging.info('pid %u: %s %s',
            self.pid, command, ' '.join(map(repr, args)))

    def kill(self, sig=signal.SIGTERM):
        """Send sig to the child process.

        Raises AlreadyStopped if there is no child.  An OSError raised
        while signalling is printed, not propagated.
        """
        if self.pid:
            logging.info('kill pid %u', self.pid)
            try:
                # NOTE(review): `pdb` is not imported in this module; it is
                # presumably a builtin installed by a test helper module
                # (e.g. the `cluster` import) -- confirm.
                pdb.kill(self.pid, sig)
            except OSError:
                # traceback.print_last() only works after an exception
                # reached the interactive prompt (sys.last_* unset otherwise)
                # and would raise AttributeError here.
                traceback.print_exc()
        else:
            raise AlreadyStopped

    def __del__(self):
        # If we get killed, kill subprocesses aswell.
        try:
            self.kill(signal.SIGKILL)
            self.wait()
        except:
            # We can ignore all exceptions at this point, since there is no
            # garanteed way to handle them (other objects we would depend on
            # might already have been deleted).
            pass

    def wait(self, options=0):
        """Reap the child and return its exit status (0).

        Raises AlreadyStopped if there is no child, and NodeProcessError if
        os.WEXITSTATUS() of the child's status is non-zero.  Note that for a
        child terminated by a signal WEXITSTATUS() is 0 (on Linux), so it is
        not reported as an error.
        """
        if self.pid == 0:
            raise AlreadyStopped
        result = os.WEXITSTATUS(os.waitpid(self.pid, options)[1])
        self.pid = 0
        if result:
            raise NodeProcessError('%r %r exited with status %r' % (
                self.command, self.arg_dict, result))
        return result

    def stop(self):
        """Terminate the child with SIGTERM and reap it."""
        self.kill()
        self.wait()

    def getPID(self):
        return self.pid

    def getUUID(self):
        assert self.with_uuid, 'UUID disabled on this process'
        return self.uuid

    def setUUID(self, uuid):
        """
          Note: for this change to take effect, the node must be restarted.
        """
        self.uuid = uuid
        self.arg_dict['--uuid'] = str(uuid)

    def isAlive(self):
        """Return whether the child exists and is not a zombie."""
        if not self.pid:
            # Never started, or already reaped by wait(): PID 0 is not our
            # child, so do not ask psutil about it.
            return False
        try:
            return psutil.Process(self.pid).status() != psutil.STATUS_ZOMBIE
        except psutil.NoSuchProcess:
            return False

class NEOCluster(object):
    """A NEO cluster whose nodes run as forked NEOProcess children.

    It is made of one admin node, master_count master nodes and one storage
    node per entry of db_list.  It is driven through a NeoCTL connected to
    the admin node.
    """

    def __init__(self, db_list, master_count=1, partitions=1, replicas=0,
                 db_user=DB_USER, db_password='', name=None,
                 cleanup_on_delete=False, temp_dir=None, clear_databases=True,
                 adapter=os.getenv('NEO_TESTS_ADAPTER'),
                 address_type=ADDRESS_TYPE, bind_ip=None, logger=True,
                 importer=None):
        """Create (without starting) the processes of a cluster.

        db_list: one database per storage node.  For SQLite, None means an
          in-memory database, a value containing a path separator is used
          as is, and anything else becomes <temp_dir>/<value>.sqlite.
        name: cluster name ('neo_<random int>' if not given).
        cleanup_on_delete: remove temp_dir when this object is deleted.
        temp_dir: directory for log and SQLite files (created if None).
        adapter: 'MySQL' (default) or 'SQLite'.
        logger: when false, nodes are given no --logfile option.
        importer: optional iterable of (section, option dict) pairs.  When
          given, db_list must contain exactly one database and the storage
          node uses the Importer adapter configured with these sections.
        """
        if not adapter:
            adapter = 'MySQL'
        self.adapter = adapter
        self.zodb_storage_list = []
        self.cleanup_on_delete = cleanup_on_delete
        self.uuid_dict = {}
        self.db_list = db_list
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
            print('Using temp directory ' + temp_dir)
        if adapter == 'MySQL':
            self.db_user = db_user
            self.db_password = db_password
            self.db_template = ('%s:%s@%%s' % (db_user, db_password)).__mod__
        elif adapter == 'SQLite':
            self.db_template = (lambda t: lambda db:
                ':memory:' if db is None else db if os.sep in db else t % db
                )(os.path.join(temp_dir, '%s.sqlite'))
        else:
            assert False, adapter
        self.address_type = address_type
        self.local_ip = local_ip = bind_ip or \
            IP_VERSION_FORMAT_DICT[self.address_type]
        self.setupDB(clear_databases)
        if importer:
            cfg = SafeConfigParser()
            cfg.add_section("neo")
            cfg.set("neo", "adapter", adapter)
            cfg.set("neo", "database", self.db_template(*db_list))
            # Do not use 'name' as loop variable: it would overwrite the
            # cluster name parameter, which is read below.
            for section, zodb in importer:
                cfg.add_section(section)
                for option in zodb.iteritems():
                    cfg.set(section, *option)
            importer_conf = os.path.join(temp_dir, 'importer.cfg')
            with open(importer_conf, 'w') as f:
                cfg.write(f)
            adapter = "Importer"
            self.db_template = str
            db_list = importer_conf,
        self.process_dict = {}
        self.temp_dir = temp_dir
        self.port_allocator = PortAllocator()
        admin_port = self.port_allocator.allocate(address_type, local_ip)
        self.cluster_name = name or 'neo_%s' % random.randint(0, 100)
        master_node_list = [self.port_allocator.allocate(address_type, local_ip)
                            for _ in xrange(master_count)]
        self.master_nodes = ' '.join('%s:%s' % (
                buildUrlFromString(self.local_ip), x, )
                for x in master_node_list)
        # create admin node
        self._newProcess(NodeTypes.ADMIN, logger and 'admin', admin_port)
        # create master nodes
        for i, port in enumerate(master_node_list):
            self._newProcess(NodeTypes.MASTER, logger and 'master_%u' % i,
                             port, partitions=partitions, replicas=replicas)
        # create storage nodes (port 0: the system chooses)
        for i, db in enumerate(db_list):
            self._newProcess(NodeTypes.STORAGE, logger and 'storage_%u' % i,
                             0, adapter=adapter, database=self.db_template(db))
        # create neoctl
        self.neoctl = NeoCTL((self.local_ip, admin_port))

    def _newProcess(self, node_type, logfile=None, port=None, **kw):
        """Create (without starting) a node process of the given type.

        UUIDs are numbered from 1 per node type, within the namespace of
        that type.  logfile is the base name of a log file in temp_dir (no
        log if false); port is bound on local_ip unless None.  Remaining
        keyword arguments become command-line options.
        """
        self.uuid_dict[node_type] = uuid = 1 + self.uuid_dict.get(node_type, 0)
        uuid += UUID_NAMESPACES[node_type] << 24
        kw['uuid'] = uuid
        kw['cluster'] = self.cluster_name
        kw['masters'] = self.master_nodes
        if logfile:
            kw['logfile'] = os.path.join(self.temp_dir, logfile + '.log')
        if port is not None:
            kw['bind'] = '%s:%u' % (buildUrlFromString(self.local_ip), port)
        self.process_dict.setdefault(node_type, []).append(
            NEOProcess(command_dict[node_type], uuid, kw))

    def setupDB(self, clear_databases=True):
        """Prepare the storage databases, optionally emptying them."""
        if self.adapter == 'MySQL':
            setupMySQLdb(self.db_list, self.db_user, self.db_password,
                         clear_databases)
        elif self.adapter == 'SQLite':
            if clear_databases:
                for db in self.db_list:
                    if db is None:
                        # in-memory database: nothing to remove
                        continue
                    db = self.db_template(db)
                    try:
                        os.remove(db)
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            raise
                    else:
                        logging.debug('%r deleted', db)

    def run(self, except_storages=()):
        """ Start cluster processes except some storage nodes """
        assert self.process_dict
        # Free the reserved ports so that nodes can bind them.
        self.port_allocator.release()
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                if process not in except_storages:
                    process.start()
        # wait for the admin node availability
        def test():
            try:
                self.neoctl.getClusterState()
            except NotReadyException:
                return False
            return True
        if not pdb.wait(test, MAX_START_TIME):
            raise AssertionError('Timeout when starting cluster')

    def start(self, except_storages=()):
        """ Do a complete start of a cluster """
        self.run(except_storages=except_storages)
        neoctl = self.neoctl
        # Number of storage nodes expected to be pending before asking the
        # cluster to start; a list so that the closure can reset it.
        target = [len(self.db_list) - len(except_storages)]
        def test():
            try:
                state = neoctl.getClusterState()
                if state == ClusterStates.RUNNING:
                    return True
                if state == ClusterStates.RECOVERING and target[0]:
                    pending_count = 0
                    for x in neoctl.getNodeList(node_type=NodeTypes.STORAGE):
                        if x[3] != NodeStates.PENDING:
                            target[0] = None # cluster must start automatically
                            break
                        pending_count += 1
                    if pending_count == target[0]:
                        neoctl.startCluster()
            except (NotReadyException, RuntimeError):
                pass
        if not pdb.wait(test, MAX_START_TIME):
            raise AssertionError('Timeout when starting cluster')

    def stop(self, clients=True):
        """Kill all node processes (and close client storages if clients).

        Raises NodeProcessError listing the processes that reported an
        error when reaped.
        """
        # Suspend all processes to kill before actually killing them, so that
        # nodes don't log errors because they get disconnected from other nodes:
        # otherwise, storage nodes would often flush MB of logs just because we
        # killed the master first, and waste much file system space.
        stopped_list = []
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                try:
                    process.kill(signal.SIGSTOP)
                    stopped_list.append(process)
                except AlreadyStopped:
                    pass
        error_list = []
        for process in stopped_list:
            try:
                process.kill(signal.SIGKILL)
                process.wait()
            except NodeProcessError as e:
                error_list += e.args
        if clients:
            for zodb_storage in self.zodb_storage_list:
                zodb_storage.close()
            self.zodb_storage_list = []
        time.sleep(0.5)
        if error_list:
            raise NodeProcessError('\n'.join(error_list))

    def waitAll(self):
        """Reap every node process, ignoring stopped or failed ones."""
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                try:
                    process.wait()
                except (AlreadyStopped, NodeProcessError):
                    pass

    def getNEOCTL(self):
        return self.neoctl

    def getZODBStorage(self, **kw):
        """Return a new client Storage for this cluster (closed by stop)."""
        master_nodes = self.master_nodes.replace('/', ' ')
        result = Storage(
            master_nodes=master_nodes,
            name=self.cluster_name,
            **kw)
        self.zodb_storage_list.append(result)
        return result

    def getZODBConnection(self, **kw):
        """ Return a tuple with the database and a connection """
        db = ZODB.DB(storage=self.getZODBStorage(**kw))
        return (db, db.open())

    def getSQLConnection(self, db):
        """Return a database manager for one of the storage databases."""
        assert db is not None and db in self.db_list
        return buildDatabaseManager(self.adapter, (self.db_template(db),))

    def getMasterProcessList(self):
        return self.process_dict.get(NodeTypes.MASTER)

    def getStorageProcessList(self):
        return self.process_dict.get(NodeTypes.STORAGE)

    def getAdminProcessList(self):
        return self.process_dict.get(NodeTypes.ADMIN)

    def _killMaster(self, primary=False, all=False):
        """Kill the primary master (primary true) or secondary master(s).

        Only the first matching master is killed unless all is true.
        Return the list of killed UUIDs.
        """
        killed_uuid_list = []
        primary_uuid = self.neoctl.getPrimary()
        for master in self.getMasterProcessList():
            master_uuid = master.getUUID()
            is_primary = master_uuid == primary_uuid
            if primary and is_primary or not (primary or is_primary):
                killed_uuid_list.append(master_uuid)
                master.kill()
                master.wait()
                if not all:
                    break
        return killed_uuid_list

    def killPrimary(self):
        return self._killMaster(primary=True)

    def killSecondaryMaster(self, all=False):
        return self._killMaster(primary=False, all=all)

    def killMasters(self):
        secondary_list = self.killSecondaryMaster(all=True)
        primary_list = self.killPrimary()
        return secondary_list + primary_list

    def killStorage(self, all=False):
        """Kill the first (or every, if all) storage node; return UUIDs."""
        killed_uuid_list = []
        for storage in self.getStorageProcessList():
            killed_uuid_list.append(storage.getUUID())
            storage.kill()
            storage.wait()
            if not all:
                break
        return killed_uuid_list

    def __getNodeList(self, node_type, state=None):
        # Entries are indexed as (node type, address, uuid, state).
        return [x for x in self.neoctl.getNodeList(node_type)
                if state is None or x[3] == state]

    def getMasterList(self, state=None):
        return self.__getNodeList(NodeTypes.MASTER, state)

    def getStorageList(self, state=None):
        return self.__getNodeList(NodeTypes.STORAGE, state)

    def getClientlist(self, state=None):
        return self.__getNodeList(NodeTypes.CLIENT, state)

    def __getNodeState(self, node_type, uuid):
        """Return the state of the node with this uuid, or None if absent."""
        for _, _, node_uuid, state in self.__getNodeList(node_type):
            if node_uuid == uuid:
                return state
        return None

    def getMasterNodeState(self, uuid):
        return self.__getNodeState(NodeTypes.MASTER, uuid)

    def getPrimary(self):
        """Return the primary master UUID, or None if not known yet."""
        try:
            current_try = self.neoctl.getPrimary()
        except NotReadyException:
            current_try = None
        return current_try

    def expectCondition(self, condition, timeout=0, on_fail=None):
        """Wait for condition to be reached.

        condition(last_opaque) returns (reached, opaque), last_opaque being
        the opaque value of the previous unsuccessful call (None at first).
        pdb.wait() is given timeout + DELAY_SAFETY_MARGIN seconds.  On
        failure, on_fail (if any) receives the list of opaque values, then
        AssertionError is raised.
        """
        opaque_history = [None]
        def test():
            reached, opaque = condition(opaque_history[-1])
            if not reached:
                opaque_history.append(opaque)
            return reached
        if not pdb.wait(test, timeout + DELAY_SAFETY_MARGIN):
            # drop the initial None placeholder
            del opaque_history[0]
            if on_fail is not None:
                on_fail(opaque_history)
            raise AssertionError('Timeout while expecting condition. '
                                 'History: %s' % opaque_history)

    def expectAllMasters(self, node_count, state=None, *args, **kw):
        """Wait until node_count masters are known (in state, if given).

        The count must never decrease between two checks.
        """
        def callback(last_try):
            try:
                current_try = len(self.getMasterList(state=state))
            except NotReadyException:
                current_try = 0
            if last_try is not None and current_try < last_try:
                raise AssertionError('Regression: %s became %s' %
                    (last_try, current_try))
            return (current_try == node_count, current_try)
        self.expectCondition(callback, *args, **kw)

    def __expectNodeState(self, node_type, uuid, state, *args, **kw):
        # state may be a single state or a tuple/list of accepted states.
        if not isinstance(state, (tuple, list)):
            state = (state, )
        def callback(last_try):
            try:
                current_try = self.__getNodeState(node_type, uuid)
            except NotReadyException:
                current_try = None
            return current_try in state, current_try
        self.expectCondition(callback, *args, **kw)

    def expectMasterState(self, uuid, state, *args, **kw):
        self.__expectNodeState(NodeTypes.MASTER, uuid, state, *args, **kw)

    def expectStorageState(self, uuid, state, *args, **kw):
        self.__expectNodeState(NodeTypes.STORAGE, uuid, state, *args, **kw)

    def expectRunning(self, process, *args, **kw):
        self.expectStorageState(process.getUUID(), NodeStates.RUNNING,
                                *args, **kw)

    def expectPending(self, process, *args, **kw):
        self.expectStorageState(process.getUUID(), NodeStates.PENDING,
                                *args, **kw)

    def expectUnknown(self, process, *args, **kw):
        self.expectStorageState(process.getUUID(), NodeStates.UNKNOWN,
                                *args, **kw)

    def expectUnavailable(self, process, *args, **kw):
        self.expectStorageState(process.getUUID(),
                NodeStates.TEMPORARILY_DOWN, *args, **kw)

    def expectPrimary(self, uuid=None, *args, **kw):
        """Wait for a primary master (uuid, if given); fail on another."""
        def callback(last_try):
            current_try = self.getPrimary()
            if None not in (uuid, current_try) and uuid != current_try:
                raise AssertionError('An unexpected primary arised: %r, '
                    'expected %r' % (dump(current_try), dump(uuid)))
            return uuid is None or uuid == current_try, current_try
        self.expectCondition(callback, *args, **kw)

    def expectOudatedCells(self, number, *args, **kw):
        """Wait until exactly number cells are OUT_OF_DATE."""
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            number_of_oudated = 0
            for row in row_list:
                for cell in row[1]:
                    if cell[1] == CellStates.OUT_OF_DATE:
                        number_of_oudated += 1
            return number_of_oudated == number, number_of_oudated
        self.expectCondition(callback, *args, **kw)

    def expectAssignedCells(self, process, number, *args, **kw):
        """Wait until exactly number cells are assigned to process."""
        def callback(last_try):
            uuid = process.getUUID()
            row_list = self.neoctl.getPartitionRowList()[1]
            assigned_cells_number = 0
            for row in row_list:
                for cell in row[1]:
                    if cell[0] == uuid:
                        assigned_cells_number += 1
            return assigned_cells_number == number, assigned_cells_number
        self.expectCondition(callback, *args, **kw)

    def expectClusterState(self, state, *args, **kw):
        def callback(last_try):
            try:
                current_try = self.neoctl.getClusterState()
            except NotReadyException:
                current_try = None
            return current_try == state, current_try
        self.expectCondition(callback, *args, **kw)

    def expectClusterRecovering(self, *args, **kw):
        self.expectClusterState(ClusterStates.RECOVERING, *args, **kw)

    def expectClusterVerifying(self, *args, **kw):
        self.expectClusterState(ClusterStates.VERIFYING, *args, **kw)

    def expectClusterRunning(self, *args, **kw):
        self.expectClusterState(ClusterStates.RUNNING, *args, **kw)

    def expectAlive(self, process, *args, **kw):
        def callback(last_try):
            current_try = process.isAlive()
            return current_try, current_try
        self.expectCondition(callback, *args, **kw)

    def expectDead(self, process, *args, **kw):
        def callback(last_try):
            current_try = not process.isAlive()
            return current_try, current_try
        self.expectCondition(callback, *args, **kw)

    def expectStorageNotKnown(self, process, *args, **kw):
        # /!\ Not Known != Unknown
        process_uuid = process.getUUID()
        def expected_storage_not_known(last_try):
            for storage in self.getStorageList():
                if storage[2] == process_uuid:
                    return False, storage
            return True, None
        self.expectCondition(expected_storage_not_known, *args, **kw)

    def __del__(self):
        # Remove temp_dir with its content (logs, SQLite databases, importer
        # configuration) if requested.  os.removedirs() only removes empty
        # directories, so it failed as soon as any file had been written.
        # Attributes may be missing if __init__ failed early.
        if getattr(self, 'cleanup_on_delete', False):
            temp_dir = getattr(self, 'temp_dir', None)
            if temp_dir is not None:
                shutil.rmtree(temp_dir)

class NEOFunctionalTest(NeoTestBase):
637

638
    def setupLog(self):
639
        logging.setup(os.path.join(self.getTempDirectory(), 'test.log'))
640

641 642
    def getTempDirectory(self):
        # build the full path based on test case and current test method
643
        temp_dir = os.path.join(getTempDirectory(), self.id())
644 645 646 647
        # build the path if needed
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        return temp_dir
648

649 650 651
    def runWithTimeout(self, timeout, method, args=(), kwargs=None):
        if kwargs is None:
            kwargs = {}
652 653 654 655 656 657 658
        exc_list = []
        def excWrapper(*args, **kw):
            try:
                method(*args, **kw)
            except:
                exc_list.append(sys.exc_info())
        thread = threading.Thread(None, excWrapper, args=args, kwargs=kwargs)
659
        thread.daemon = True
660 661 662
        thread.start()
        thread.join(timeout)
        self.assertFalse(thread.isAlive(), 'Run timeout')
663 664 665 666
        if exc_list:
            assert len(exc_list) == 1, exc_list
            exc = exc_list[0]
            raise exc[0], exc[1], exc[2]
667