#
# Copyright (C) 2011-2015  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# XXX: Consider using ClusterStates.STOPPING to stop clusters

import os, random, select, socket, sys, tempfile, threading, time, weakref
import traceback
from collections import deque
from ConfigParser import SafeConfigParser
from contextlib import contextmanager
from itertools import count
from functools import wraps
from zlib import decompress
from mock import Mock
import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, ConnectorException
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import cached_property, parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
    ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_USER

BIND = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE], 0
LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])


class FairLock(deque):
    """Same as a threading.Lock except that waiting threads are queued, so that
    the first one waiting for the lock is the first to get it. This is useful
    when several concurrent threads fight for the same resource in a loop:
    the owner could give the others too little time to get a chance to
    acquire it, blocking them for a long time with bad luck.
    """

    def __enter__(self, _allocate_lock=threading.Lock):
        me = _allocate_lock()
        me.acquire()
        self.append(me)
        other = self[0]
        while me is not other:
            with other:
                other = self[0]

    def __exit__(self, t, v, tb):
        self.popleft().release()


class Serialized(object):
    """
    "Threaded" tests run all nodes in the same process as the test itself,
    and threads are scheduled by this class, which mainly provides 2 features:
    - more determinism, by minimizing the number of active threads and
      switching them in round-robin order;
    - a tic() method, to wait only as long as necessary for the cluster to
      be idle.

    The basic concept is that each thread has a lock that is always acquired
    by the thread itself. The following pattern is used to yield the processor
    to the next thread:
        release(); acquire()
    It should be noted that this is not atomic, i.e. all other threads
    sometimes complete before a thread tries to acquire its lock: so that
    the previous thread does not fail by releasing an un-acquired lock,
    we actually use Semaphores instead of Locks.

    The epoll object of each node is hooked so that thread switching happens
    before polling for network activity. An extra epoll object is used to
    detect which node has a readable epoll object.
    """
    check_timeout = False

    @classmethod
    def init(cls):
        cls._busy = set()
        cls._busy_cond = threading.Condition(threading.Lock())
        cls._epoll = select.epoll()
        cls._pdb = None
        cls._sched_lock = threading.Semaphore(0)
        cls._tic_lock = FairLock()
        cls._fd_dict = {}

    @classmethod
    def idle(cls, owner):
        with cls._busy_cond:
            cls._busy.discard(owner)
            cls._busy_cond.notify_all()

    @classmethod
    def stop(cls):
        assert not cls._fd_dict, cls._fd_dict
        del(cls._busy, cls._busy_cond, cls._epoll, cls._fd_dict,
            cls._pdb, cls._sched_lock, cls._tic_lock)

    @classmethod
    def _sort_key(cls, fd_event):
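        # descending by last-run time: tic() wakes threads starting from the
        # end of the sorted list, so the least recently run node goes first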
        return -cls._fd_dict[fd_event[0]]._last

    @classmethod
    @contextmanager
    def pdb(cls):
        try:
            cls._pdb = sys._getframe(2).f_trace.im_self
            cls._pdb.set_continue()
        except AttributeError:
            pass
        yield
        p = cls._pdb
        if p is not None:
            cls._pdb = None
            t = threading.currentThread()
            p.stdout.write(getattr(t, 'node_name', t.name))
            p.set_trace(sys._getframe(3))

    @classmethod
    def tic(cls, step=-1, check_timeout=()):
        # If you're in a pdb here, 'n' switches to another thread
        # (the following lines are not supposed to be debugged into)
        with cls._tic_lock, cls.pdb():
            f = sys._getframe(1)
            try:
                logging.info('tic (%s:%u) ...',
                    f.f_code.co_filename, f.f_lineno)
            finally:
                del f
            if cls._busy:
                with cls._busy_cond:
                    while cls._busy:
                        cls._busy_cond.wait()
            for app in check_timeout:
                app.em.epoll.check_timeout = True
                app.em.wakeup()
                del app
            while step:
                event_list = cls._epoll.poll(0)
                if not event_list:
                    break
                step -= 1
                event_list.sort(key=cls._sort_key)
                next_lock = cls._sched_lock
                for fd, event in event_list:
                    self = cls._fd_dict[fd]
                    self._release_next = next_lock.release
                    next_lock = self._lock
                del self
                next_lock.release()
                cls._sched_lock.acquire()
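
    # Illustrative use in a test (NEOThreadedTest aliases this as 'tic'):
    #
    #   cluster.start()
    #   ... # provoke some network activity
    #   Serialized.tic() # process events until the cluster is idle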

    def __init__(self, app, busy=True):
        self._epoll = app.em.epoll
        app.em.epoll = self
        # XXX: It may have been initialized before the SimpleQueue is patched.
        thread_container = getattr(app, '_thread_container', None)
        thread_container is None or thread_container.__init__()
        if busy:
            self._busy.add(self) # block tic until app waits for polling

    def __getattr__(self, attr):
        if attr in ('close', 'modify', 'register', 'unregister'):
            return getattr(self._epoll, attr)
        return self.__getattribute__(attr)

    def poll(self, timeout):
        if self.check_timeout:
            assert timeout >= 0, (self, timeout)
            del self.check_timeout
        elif timeout:
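            # yield the processor: wake the next thread in the chain built by
            # tic(), then sleep until this node's own semaphore is released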
            with self.pdb(): # same as in tic()
                release = self._release_next
                self._release_next = None
                release()
                self._lock.acquire()
                self._last = time.time()
        return self._epoll.poll(timeout)

    def _release_next(self):
        self._last = time.time()
        self._lock = threading.Semaphore(0)
        fd = self._epoll.fileno()
        cls = self.__class__
        cls._fd_dict[fd] = self
        cls._epoll.register(fd)
        cls.idle(self)

    def exit(self):
        fd = self._epoll.fileno()
        cls = self.__class__
        if cls._fd_dict.pop(fd, None) is None:
            cls.idle(self)
        else:
            cls._epoll.unregister(fd)
            self._release_next()

class TestSerialized(Serialized):

    def __init__(*args):
        Serialized.__init__(busy=False, *args)

    def poll(self, timeout):
        if timeout:
            while 1:
                r = self._epoll.poll(0)
                if r:
                    return r
                Serialized.tic(step=1)
        return self._epoll.poll(timeout)


class Node(object):

    def getConnectionList(self, *peers):
        addr = lambda c: c and (c.addr if c.is_server else c.getAddress())
        addr_set = {addr(c.connector) for peer in peers
            for c in peer.em.connection_dict.itervalues()
            if isinstance(c, Connection)}
        addr_set.discard(None)
        return (c for c in self.em.connection_dict.itervalues()
            if isinstance(c, Connection) and addr(c.connector) in addr_set)

    def filterConnection(self, *peers):
        return ConnectionFilter(self.getConnectionList(*peers))

class ServerNode(Node):

    _server_class_dict = {}

    class __metaclass__(type):
        def __init__(cls, name, bases, d):
            if Node not in bases and threading.Thread not in cls.__mro__:
                cls.__bases__ = bases + (threading.Thread,)
                cls.node_type = getattr(NodeTypes, name[:-11].upper())
                cls._node_list = []
                cls._virtual_ip = socket.inet_ntop(ADDRESS_TYPE,
                    LOCAL_IP[:-1] + chr(2 + len(cls._server_class_dict)))
                cls._server_class_dict[cls._virtual_ip] = cls

    @staticmethod
    def resetPorts():
        for cls in ServerNode._server_class_dict.itervalues():
            del cls._node_list[:]

    @classmethod
    def newAddress(cls):
        address = cls._virtual_ip, len(cls._node_list)
        cls._node_list.append(None)
        return address

    @classmethod
    def resolv(cls, address):
        try:
            cls = cls._server_class_dict[address[0]]
        except KeyError:
            return address
        return cls._node_list[address[1]].getListeningAddress()

    def __init__(self, cluster=None, address=None, **kw):
        if not address:
            address = self.newAddress()
        if cluster is None:
            master_nodes = kw['master_nodes']
            name = kw['name']
        else:
            master_nodes = kw.get('master_nodes', cluster.master_nodes)
            name = kw.get('name', cluster.name)
        port = address[1]
        self._node_list[port] = weakref.proxy(self)
        self._init_args = init_args = kw.copy()
        init_args['cluster'] = cluster
        init_args['address'] = address
        threading.Thread.__init__(self)
        self.daemon = True
        self.node_name = '%s_%u' % (self.node_type, port)
        kw.update(getCluster=name, getBind=address,
                  getMasters=parseMasterList(master_nodes, address))
        super(ServerNode, self).__init__(Mock(kw))

    def getVirtualAddress(self):
        return self._init_args['address']

    def resetNode(self):
        assert not self.is_alive()
        kw = self._init_args
        self.close()
        self.__init__(**kw)

    def start(self):
        Serialized(self)
        threading.Thread.start(self)

    def run(self):
        try:
            super(ServerNode, self).run()
        finally:
            self._afterRun()
            logging.debug('stopping %r', self)
            self.em.epoll.exit()

    def _afterRun(self):
        try:
            self.listening_conn.close()
        except AttributeError:
            pass

    def getListeningAddress(self):
        try:
            return self.listening_conn.getAddress()
        except AttributeError:
            raise ConnectorException

class AdminApplication(ServerNode, neo.admin.app.Application):
    pass

class MasterApplication(ServerNode, neo.master.app.Application):
    pass

class StorageApplication(ServerNode, neo.storage.app.Application):

    dm = type('', (), {'close': lambda self: None})()

    def resetNode(self, clear_database=False):
        self._init_args['getReset'] = clear_database
        super(StorageApplication, self).resetNode()

    def _afterRun(self):
        super(StorageApplication, self)._afterRun()
        try:
            self.dm.close()
            del self.dm
        except StandardError: # AttributeError & ProgrammingError
            pass

    def getAdapter(self):
        return self._init_args['getAdapter']

    def switchTables(self):
        q = self.dm.query
        for table in 'trans', 'obj':
            q('ALTER TABLE %s RENAME TO tmp' % table)
            q('ALTER TABLE t%s RENAME TO %s' % (table, table))
            q('ALTER TABLE tmp RENAME TO t%s' % table)

    def getDataLockInfo(self):
        dm = self.dm
        index = tuple(dm.query("SELECT id, hash, compression FROM data"))
        assert set(dm._uncommitted_data).issubset(x[0] for x in index)
        get = dm._uncommitted_data.get
        return {(str(h), c & 0x7f): get(i, 0) for i, h, c in index}

    def sqlCount(self, table):
        (r,), = self.dm.query("SELECT COUNT(*) FROM " + table)
        return r

class ClientApplication(Node, neo.client.app.Application):

    def __init__(self, master_nodes, name, **kw):
        super(ClientApplication, self).__init__(master_nodes, name, **kw)
        self.poll_thread.node_name = name

    def _run(self):
        try:
            super(ClientApplication, self)._run()
        finally:
            self.em.epoll.exit()

    def start(self):
        isinstance(self.em.epoll, Serialized) or Serialized(self)
        super(ClientApplication, self).start()

    def getConnectionList(self, *peers):
        for peer in peers:
            if isinstance(peer, MasterApplication):
                conn = self._getMasterConnection()
            else:
                assert isinstance(peer, StorageApplication)
                conn = self.cp.getConnForNode(self.nm.getByUUID(peer.uuid))
            yield conn

class NeoCTL(neo.neoctl.app.NeoCTL):

    def __init__(self, *args, **kw):
        super(NeoCTL, self).__init__(*args, **kw)
        TestSerialized(self)


class LoggerThreadName(str):

    def __new__(cls, default='TEST'):
        return str.__new__(cls, default)

    def __getattribute__(self, attr):
        return getattr(str(self), attr)

    def __hash__(self):
        return id(self)

    def __str__(self):
        try:
            return threading.currentThread().node_name
        except AttributeError:
            return str.__str__(self)


class ConnectionFilter(object):

    filtered_count = 0
    filter_list = []
    filter_queue = weakref.WeakKeyDictionary()
    lock = threading.Lock()
    _addPacket = Connection._addPacket

    @contextmanager
    def __new__(cls, conn_list=()):
        self = object.__new__(cls)
        self.filter_dict = {}
        self.conn_list = frozenset(conn_list)
        if not cls.filter_list:
            def _addPacket(conn, packet):
                with cls.lock:
                    try:
                        queue = cls.filter_queue[conn]
                    except KeyError:
                        for self in cls.filter_list:
                            if self(conn, packet):
                                self.filtered_count += 1
                                break
                        else:
                            return cls._addPacket(conn, packet)
                        cls.filter_queue[conn] = queue = deque()
                    p = packet.__new__(packet.__class__)
                    p.__dict__.update(packet.__dict__)
                    queue.append(p)
            Connection._addPacket = _addPacket
        try:
            cls.filter_list.append(self)
            yield self
        finally:
            del cls.filter_list[-1:]
            if not cls.filter_list:
                Connection._addPacket = cls._addPacket.im_func
        with cls.lock:
            cls._retry()

    def __call__(self, conn, packet):
        if not self.conn_list or conn in self.conn_list:
            for filter in self.filter_dict:
                if filter(conn, packet):
                    return True
        return False

    @classmethod
    def _retry(cls):
        for conn, queue in cls.filter_queue.items():
            while queue:
                packet = queue.popleft()
                for self in cls.filter_list:
                    if self(conn, packet):
                        queue.appendleft(packet)
                        break
                else:
                    cls._addPacket(conn, packet)
                    continue
                break
            else:
                del cls.filter_queue[conn]

    def add(self, filter, *patches):
        with self.lock:
            self.filter_dict[filter] = patches
            for p in patches:
                p.apply()

    def remove(self, *filters):
        with self.lock:
            for filter in filters:
                del self.filter_dict[filter]
            self._retry()

    def discard(self, *filters):
        try:
            self.remove(*filters)
        except KeyError:
            pass

    def __contains__(self, filter):
        return filter in self.filter_dict
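
# Illustrative use (the predicate below delays every packet):
#
#   with cluster.master.filterConnection(cluster.storage) as f:
#       f.add(lambda conn, packet: True) # matching packets are queued
#       ... # do something while the traffic is held back
#   # on exit, queued packets are retried, and sent if no longer filtered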

class NEOCluster(object):

    def __init__(orig, self): # temporary definition for SimpleQueue patch
        orig(self)
        lock = self._lock
        def _lock(blocking=True):
            if blocking:
                while not lock(False):
                    Serialized.tic(step=1)
                return True
            return lock(False)
        self._lock = _lock
    _patches = (
        Patch(BaseConnection, getTimeout=lambda orig, self: None),
        Patch(SimpleQueue, __init__=__init__),
        Patch(SocketConnector, CONNECT_LIMIT=0),
        Patch(SocketConnector, _bind=lambda orig, self, addr: orig(self, BIND)),
        Patch(SocketConnector, _connect = lambda orig, self, addr:
            orig(self, ServerNode.resolv(addr))))
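
    # Note on the SimpleQueue patch above: a blocking acquire is turned into
    # a polling loop around Serialized.tic(step=1), so a thread waiting on a
    # queue keeps scheduling the node threads instead of deadlocking.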
    _patch_count = 0
    _resource_dict = weakref.WeakValueDictionary()

    def _allocate(self, resource, new):
        result = resource, new()
        while result in self._resource_dict:
            result = resource, new()
        self._resource_dict[result] = self
        return result[1]

    @staticmethod
    def _patch():
        cls = NEOCluster
        cls._patch_count += 1
        if cls._patch_count > 1:
            return
        for patch in cls._patches:
            patch.apply()
        Serialized.init()

    @staticmethod
    def _unpatch():
        cls = NEOCluster
        assert cls._patch_count > 0
        cls._patch_count -= 1
        if cls._patch_count:
            return
        for patch in cls._patches:
            patch.revert()
        Serialized.stop()

    def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
                       adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
                       storage_count=None, db_list=None, clear_databases=True,
                       db_user=DB_USER, db_password='', compress=True,
                       importer=None, autostart=None):
        self.name = 'neo_%s' % self._allocate('name',
            lambda: random.randint(0, 100))
        self.compress = compress
        master_list = [MasterApplication.newAddress()
                       for _ in xrange(master_count)]
        self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
        weak_self = weakref.proxy(self)
        kw = dict(cluster=weak_self, getReplicas=replicas, getAdapter=adapter,
                  getPartitions=partitions, getReset=clear_databases)
        if upstream is not None:
            self.upstream = weakref.proxy(upstream)
            kw.update(getUpstreamCluster=upstream.name,
                getUpstreamMasters=parseMasterList(upstream.master_nodes))
        self.master_list = [MasterApplication(getAutostart=autostart,
                                              address=x, **kw)
                            for x in master_list]
        if db_list is None:
            if storage_count is None:
                storage_count = replicas + 1
            index = count().next
            db_list = ['%s%u' % (DB_PREFIX, self._allocate('db', index))
                       for _ in xrange(storage_count)]
        if adapter == 'MySQL':
            setupMySQLdb(db_list, db_user, db_password, clear_databases)
            db = '%s:%s@%%s' % (db_user, db_password)
        elif adapter == 'SQLite':
            db = os.path.join(getTempDirectory(), '%s.sqlite')
        else:
            assert False, adapter
        if importer:
            cfg = SafeConfigParser()
            cfg.add_section("neo")
            cfg.set("neo", "adapter", adapter)
            cfg.set("neo", "database", db % tuple(db_list))
            for name, zodb in importer:
                cfg.add_section(name)
                for x in zodb.iteritems():
                    cfg.set(name, *x)
            db = os.path.join(getTempDirectory(), '%s.conf')
            with open(db % tuple(db_list), "w") as f:
                cfg.write(f)
            kw["getAdapter"] = "Importer"
        self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
                             for x in db_list]
        self.admin_list = [AdminApplication(**kw)]
        self.neoctl = NeoCTL(self.admin.getVirtualAddress())

    def __repr__(self):
        return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
                                     self.name, id(self))

    # A few shortcuts that work when there's only 1 master/storage/admin
    @property
    def master(self):
        master, = self.master_list
        return master
    @property
    def storage(self):
        storage, = self.storage_list
        return storage
    @property
    def admin(self):
        admin, = self.admin_list
        return admin
    ###

    @property
    def primary_master(self):
        master, = [master for master in self.master_list if master.primary]
        return master

    def reset(self, clear_database=False):
        for node_type in 'master', 'storage', 'admin':
            kw = {}
            if node_type == 'storage':
                kw['clear_database'] = clear_database
            for node in getattr(self, node_type + '_list'):
                node.resetNode(**kw)
        self.neoctl.close()
        self.neoctl = NeoCTL(self.admin.getVirtualAddress())

    def start(self, storage_list=None, fast_startup=False):
        self._patch()
        for node_type in 'master', 'admin':
            for node in getattr(self, node_type + '_list'):
                node.start()
        Serialized.tic()
        if fast_startup:
            self.startCluster()
        if storage_list is None:
            storage_list = self.storage_list
        for node in storage_list:
            node.start()
        Serialized.tic()
        if not fast_startup:
            self.startCluster()
            Serialized.tic()
        state = self.neoctl.getClusterState()
        assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
        self.enableStorageList(storage_list)
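
    # Illustrative lifecycle in a test (getTransaction/stop are defined below):
    #
    #   cluster = NEOCluster()
    #   try:
    #       cluster.start()
    #       t, c = cluster.getTransaction()
    #       c.root()['x'] = 0
    #       t.commit()
    #   finally:
    #       cluster.stop()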

    @cached_property
    def client(self):
        client = ClientApplication(name=self.name,
            master_nodes=self.master_nodes, compress=self.compress)
        # Make sure client won't be reused after it was closed.
        def close():
            client = self.client
            del self.client, client.close
            client.close()
        client.close = close
        return client

    @cached_property
    def db(self):
        return ZODB.DB(storage=self.getZODBStorage())

    def startCluster(self):
        try:
            self.neoctl.startCluster()
        except RuntimeError:
            Serialized.tic()
            if self.neoctl.getClusterState() not in (
                      ClusterStates.BACKINGUP,
                      ClusterStates.RUNNING,
                      ClusterStates.VERIFYING,
                  ):
                raise

    def enableStorageList(self, storage_list):
        self.neoctl.enableStorageList([x.uuid for x in storage_list])
        Serialized.tic()
        for node in storage_list:
            assert self.getNodeState(node) == NodeStates.RUNNING

    def join(self, thread_list, timeout=5):
        timeout += time.time()
        while thread_list:
            assert time.time() < timeout
            Serialized.tic()
            thread_list = [t for t in thread_list if t.is_alive()]

    def stop(self):
        logging.debug("stopping %s", self)
        client = self.__dict__.get("client")
        client is None or self.__dict__.pop("db", client).close()
        node_list = self.admin_list + self.storage_list + self.master_list
        for node in node_list:
            node.em.wakeup(True)
        try:
            node_list.append(client.poll_thread)
        except AttributeError: # client is None or thread is already stopped
            pass
        self.join(node_list)
        logging.debug("stopped %s", self)
        self._unpatch()

    def getNodeState(self, node):
        uuid = node.uuid
        for node in self.neoctl.getNodeList(node.node_type):
            if node[2] == uuid:
                return node[3]

    def getOudatedCells(self):
        return [cell for row in self.neoctl.getPartitionRowList()[1]
                     for cell in row[1]
                     if cell[1] == CellStates.OUT_OF_DATE]

    def getZODBStorage(self, **kw):
        return Storage.Storage(None, self.name, _app=self.client, **kw)

    def importZODB(self, dummy_zodb=None, random=random):
        if dummy_zodb is None:
            from ..stat_zodb import PROD1
            dummy_zodb = PROD1(random)
        preindex = {}
        as_storage = dummy_zodb.as_storage
        return lambda count: self.getZODBStorage().importFrom(
            as_storage(count), preindex=preindex)

    def populate(self, transaction_list, tid=lambda i: p64(i+1),
                                         oid=lambda i: p64(i+1)):
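        # transaction_list is a sequence of sequences of oid indexes, e.g.
        # populate([(0, 1), (0,)]) commits 2 transactions: the first stores
        # objects for oids p64(1) & p64(2), the second a new revision of the
        # first one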
        storage = self.getZODBStorage()
        tid_dict = {}
        for i, oid_list in enumerate(transaction_list):
            txn = transaction.Transaction()
            storage.tpc_begin(txn, tid(i))
            for o in oid_list:
                storage.store(oid(o), tid_dict.get(o), repr((i, o)), '', txn)
            storage.tpc_vote(txn)
            i = storage.tpc_finish(txn)
            for o in oid_list:
                tid_dict[o] = i

    def getTransaction(self):
        txn = transaction.TransactionManager()
        return txn, self.db.open(transaction_manager=txn)

    def __del__(self, __print_exc=traceback.print_exc):
        try:
            self.neoctl.close()
            for node_type in 'admin', 'storage', 'master':
                for node in getattr(self, node_type + '_list'):
                    node.close()
        except:
            __print_exc()
            raise
    def extraCellSortKey(self, key):
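        # Illustrative use: add a secondary criterion to the client's cell
        # sort key, e.g. extraCellSortKey(lambda cell: random.random());
        # the returned Patch is applied/reverted like the ones in _patches.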
        return Patch(self.client.cp, getCellSortKey=lambda orig, cell:
            (orig(cell), key(cell)))


class NEOThreadedTest(NeoTestBase):

    def setupLog(self):
        log_file = os.path.join(getTempDirectory(), self.id() + '.log')
        logging.setup(log_file)
        return LoggerThreadName()

    def _tearDown(self, success):
        super(NEOThreadedTest, self)._tearDown(success)
        ServerNode.resetPorts()
        if success:
            with logging as db:
                db.execute("UPDATE packet SET body=NULL")
                db.execute("VACUUM")

    tic = Serialized.tic

    def getUnpickler(self, conn):
        reader = conn._reader
        def unpickler(data, compression=False):
            if compression:
                data = decompress(data)
            obj = reader.getGhost(data)
            reader.setGhostState(obj, data)
            return obj
        return unpickler

    class newThread(threading.Thread):

        def __init__(self, func, *args, **kw):
            threading.Thread.__init__(self)
            self.__target = func, args, kw
            self.daemon = True
            self.start()

        def run(self):
            try:
                apply(*self.__target)
                self.__exc_info = None
            except:
                self.__exc_info = sys.exc_info()

        def join(self, timeout=None):
            threading.Thread.join(self, timeout)
            if not self.is_alive() and self.__exc_info:
                etype, value, tb = self.__exc_info
                del self.__exc_info
                raise etype, value, tb


def predictable_random(seed=None):
    # Because there are 2 running threads when the client works, we can't
    # patch neo.client.pool (and the cluster should have 1 storage).
    from neo.master import backup_app
    from neo.master.handlers import administration
    from neo.storage import replicator
    def decorator(wrapped):
        def wrapper(*args, **kw):
            s = repr(time.time()) if seed is None else seed
            logging.info("using seed %r", s)
            r = random.Random(s)
            try:
                administration.random = backup_app.random = replicator.random \
                    = r
                return wrapped(*args, **kw)
            finally:
                administration.random = backup_app.random = replicator.random \
                    = random
        return wraps(wrapped)(wrapper)
    return decorator
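
# Illustrative use (the test name is hypothetical):
#
#   @predictable_random()
#   def testBackupReplication(self):
#       ... # master/storage randomness then derives from one logged seed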