Commit 7025db52 authored by Julien Muchembled

Rewrite of scheduler for threaded tests

The previous implementation was built around a 'pending' global variable that
was set by a few monkey-patches whenever some network activity was pending
between nodes. All this is replaced by an extra epoll object that is used to
wait for nodes with pending network events: this is simpler, and faster since
it significantly reduces the number of context switches.
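
A minimal, self-contained sketch of the scheduling scheme (not code from this
patch; the Scheduler/FakeNode names are made up, and it only runs on Linux
because it relies on select.epoll): each node thread blocks on a private
semaphore instead of polling freely, while a single scheduler thread registers
every node's epoll file descriptor in one extra epoll object. Since an epoll
fd becomes readable when it has events ready, the scheduler can wake exactly
the node that has something to process, one at a time.

```python
import os, select, socket, threading, time

class Scheduler(object):
    """Wait on one extra epoll for nodes whose own epoll became readable."""
    def __init__(self):
        self.epoll = select.epoll()
        self.idle = threading.Semaphore(0)  # released by a node when it is done
        self.nodes = {}
        t = threading.Thread(target=self._run, name='scheduler')
        t.daemon = True
        t.start()

    def register(self, node):
        fd = node.epoll.fileno()            # an epoll fd is itself pollable
        self.nodes[fd] = node
        self.epoll.register(fd, select.EPOLLIN)

    def _run(self):
        while True:
            for fd, _ in self.epoll.poll():     # which nodes have pending events?
                self.nodes[fd].wakeup.release() # wake exactly that node ...
                self.idle.acquire()             # ... and wait until it blocks again

class FakeNode(object):
    """A 'node' thread that only runs when the scheduler wakes it up."""
    def __init__(self, scheduler, name):
        self.name = name
        self.scheduler = scheduler
        self.epoll = select.epoll()
        # A socket pair stands in for the node's real network connections.
        self.rsock, self.wsock = socket.socketpair()
        self.epoll.register(self.rsock.fileno(), select.EPOLLIN)
        self.wakeup = threading.Semaphore(0)
        scheduler.register(self)
        t = threading.Thread(target=self._run, name=name)
        t.daemon = True
        t.start()

    def _run(self):
        while True:
            self.wakeup.acquire()               # suspended until scheduled
            for fd, _ in self.epoll.poll(0):
                print('%s handles %r' % (self.name, os.read(fd, 8)))
            self.scheduler.idle.release()       # hand control back to the scheduler

if __name__ == '__main__':
    scheduler = Scheduler()
    nodes = [FakeNode(scheduler, 'node%u' % i) for i in range(3)]
    for node in nodes:
        node.wsock.send(b'ping')                # pending network activity
    time.sleep(.5)                              # let the daemon threads drain it
```

The patch implements roughly this with Serialized._sched_lock and per-node
semaphores, plus extra bookkeeping (_busy, _trigger, _fd_dict) so that tic()
knows when the whole cluster is idle and background() can let threads run
freely.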
parent 61009341
......@@ -16,7 +16,7 @@
# XXX: Consider using ClusterStates.STOPPING to stop clusters
import os, random, socket, sys, tempfile, threading, time, types, weakref
import os, random, select, socket, sys, tempfile, threading, time, weakref
import traceback
from collections import deque
from ConfigParser import SafeConfigParser
......@@ -33,8 +33,7 @@ from neo.client.poll import _ThreadedPoll
from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException, ConnectorTryAgainException
from neo.lib.event import EventManager
ConnectorConnectionRefusedException
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
......@@ -46,10 +45,8 @@ LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])
class Serialized(object):
"""
Collection of variables/functions to schedule threads in a way that the
first to be suspended is also the first to be awoken. They also track
whether there's still pending activity in the cluster. This mainly provides
2 features to test cases:
"Threaded" tests run all nodes in the same process as the test itself,
and threads are scheduled by this class, which mainly provides 2 features:
- more determinism, by minimizing the number of active threads, and
switching them in a round-robin;
- tic() method to wait only the necessary time for the cluster to be idle.
......@@ -63,131 +60,201 @@ class Serialized(object):
the previous thread does not fail by releasing an un-acquired lock,
we actually use Semaphores instead of Locks.
Thread switching usually happens before polling for network activity.
The epoll object of each node is hooked so that thread switching happens
before polling for network activity. An extra thread acts as a scheduler
and uses an epoll object to detect which node has a readable epoll object.
"""
check_timeout = False
class _Trigger(object):
_last = 0
def __init__(self):
self._lock = l = threading.Lock()
self._fd, w = os.pipe()
os.close(w)
l.acquire()
def __del__(self):
os.close(self._fd)
@classmethod
def init(cls):
cls._global_lock = threading.Semaphore(0)
cls._lock_list = deque() # FIFO of Semaphore
cls._lock_lock = threading.Lock()
cls._pdb = False
cls.blocking = threading.Event()
cls.pending = 0
cls._background = 0
cls._busy = set()
cls._busy_cond = threading.Condition(threading.Lock())
cls._epoll = select.epoll()
cls._pdb = None
cls._sched_lock = threading.Semaphore(0)
cls._step = -1
cls._trigger = t = cls._Trigger()
cls._fd_dict = {t._fd: t}
cls._thread = t = threading.Thread(target=cls._run, name=cls.__name__)
t.daemon = True
t.start()
@classmethod
def stop(cls, node_list):
cls.pending = frozenset(node_list) if node_list else 0
def background(cls, background):
if cls._background != background:
if background:
cls._background = background
cls._sched_lock.release()
else:
fd = cls._trigger._fd
cls._epoll.register(fd)
cls._trigger._lock.acquire()
cls._epoll.unregister(fd)
cls.idle(None)
cls._background = background
@classmethod
def release(cls, lock=None, wake_other=True, stop=None):
"""Resume first suspended thread"""
if lock is None:
lock = cls._global_lock
cls.stop(stop)
try:
sys._getframe(1).f_trace.im_self.set_continue()
cls._pdb = True
except AttributeError:
pass
q = cls._lock_list
with cls._lock_lock:
q.append(lock)
if wake_other:
q.popleft().release()
def idle(cls, owner):
with cls._busy_cond:
cls._busy.discard(owner)
cls._busy_cond.notify_all()
@classmethod
def acquire(cls, lock=None):
"""Suspend lock owner"""
if lock is None:
lock = cls._global_lock
lock.acquire()
pending = cls.pending # XXX: getattr once to avoid race conditions
if type(pending) is frozenset:
if lock is cls._global_lock:
cls.pending = 0
elif threading.currentThread() in pending:
sys.exit()
if cls._pdb:
cls._pdb = False
try:
sys.stdout.write(threading.currentThread().node_name)
except AttributeError:
pass
pdb(1)
def stop(cls):
assert cls._background
fd = cls._trigger._fd
cls._epoll.register(fd)
cls._trigger._lock.acquire()
del cls._trigger, cls._fd_dict[fd]
assert not cls._fd_dict
cls._sched_lock.release()
cls._thread.join()
del(cls._background, cls._busy, cls._busy_cond, cls._epoll,
cls._fd_dict, cls._pdb, cls._sched_lock, cls._step, cls._thread)
@classmethod
def tic(cls, lock=None):
# switch to another thread
# (the following calls are not supposed to be debugged into)
cls.release(lock); cls.acquire(lock)
def _run(cls):
sched_lock = cls._sched_lock
fd_dict = cls._fd_dict
sort_key = lambda fd_event: -fd_dict[fd_event[0]]._last
while 1:
sched_lock.acquire()
event_list = cls._step and cls._epoll.poll(0)
cls._step -= 1
if not event_list:
cls.idle(None)
if not cls._background:
continue
if not fd_dict:
break
event_list = cls._epoll.poll(-1)
cls._busy.add(None)
event_list.sort(key=sort_key)
next_lock = sched_lock
for fd, event in event_list:
self = fd_dict[fd]
self._release_next = next_lock.release
next_lock = self._lock
del self
next_lock.release()
@classmethod
def background(cls):
with cls._lock_lock:
if cls._lock_list:
cls._lock_list.popleft().release()
else:
cls._global_lock.release()
class SerializedEventManager(EventManager):
_lock = None
_blocking = 0
@contextmanager
def pdb(cls):
if cls._background:
yield
return
try:
cls._pdb = sys._getframe(2).f_trace.im_self
cls._pdb.set_continue()
except AttributeError:
pass
yield
p = cls._pdb
if p is not None:
cls._pdb = None
t = threading.currentThread()
p.stdout.write(getattr(t, 'node_name', t.name))
p.set_trace(sys._getframe(3))
@classmethod
def decorate(cls, func):
def decorator(*args, **kw):
def tic(cls, step=-1, check_timeout=()):
# If you're in a pdb here, 'n' switches to another thread
# (the following lines are not supposed to be debugged into)
with cls.pdb(): # does nothing if background(1) was called
f = sys._getframe(1)
try:
EventManager.__init__ = types.MethodType(
cls.__init__.im_func, None, EventManager)
return func(*args, **kw)
logging.info('tic (%s:%u) ...',
f.f_code.co_filename, f.f_lineno)
finally:
EventManager.__init__ = types.MethodType(
cls._super__init__.im_func, None, EventManager)
return wraps(func)(decorator)
_super__init__ = EventManager.__init__.im_func
del f
if cls._background:
assert step == -1 and not check_timeout
else:
with cls._busy_cond:
while cls._busy:
cls._busy_cond.wait()
cls._busy.add(None)
cls._step = step
for app in check_timeout:
app.em.epoll.check_timeout = True
app.em.wakeup()
del app
cls._sched_lock.release()
with cls._busy_cond:
while cls._busy:
cls._busy_cond.wait()
def __init__(self, app, busy=True):
self._epoll = app.em.epoll
app.em.epoll = self
if busy:
self._busy.add(self) # block tic until app waits for polling
def __getattr__(self, attr):
if attr in ('close', 'modify', 'register', 'unregister'):
return getattr(self._epoll, attr)
return self.__getattribute__(attr)
def poll(self, timeout):
if self.check_timeout:
assert timeout >= 0, (self, timeout)
del self.check_timeout
elif timeout:
with self.pdb(): # same as in tic()
release = self._release_next
self._release_next = None
release()
self._lock.acquire()
self._last = time.time()
return self._epoll.poll(timeout)
def _release_next(self):
self._last = time.time()
self._lock = threading.Semaphore(0)
fd = self._epoll.fileno()
cls = self.__class__
cls._fd_dict[fd] = self
cls._epoll.register(fd)
cls.idle(self)
def __init__(self):
def exit(self):
fd = self._epoll.fileno()
cls = self.__class__
assert cls is EventManager
self.__class__ = SerializedEventManager
self._super__init__()
def _poll(self, blocking):
if self._pending_processing:
assert blocking == 0, blocking
elif 0 == self._blocking == blocking == Serialized.pending == len(
self.writer_set):
return
if cls._fd_dict.pop(fd, None) is None:
cls.idle(self)
else:
if self.writer_set and Serialized.pending == 0:
Serialized.pending = 1
# Jump to another thread before polling, so that when a message is
# sent on the network, one can debug immediately the receiving part.
# XXX: Unfortunately, this means we have a useless full-cycle
# before the first message is sent.
# TODO: Detect where a message is sent to jump immediately to nodes
# that will do something.
Serialized.tic(self._lock)
if blocking != 0:
blocking = self._blocking
if blocking != 0:
if Serialized.pending == 1:
Serialized.pending = blocking = 0
else:
Serialized.blocking.set()
try:
EventManager._poll(self, blocking)
finally:
if blocking:
Serialized.blocking.clear()
cls._epoll.unregister(fd)
self._release_next()
class TestSerialized(Serialized):
def __init__(*args):
Serialized.__init__(busy=False, *args)
def addReader(self, conn):
EventManager.addReader(self, conn)
if type(Serialized.pending) is not frozenset:
Serialized.pending = 1
def poll(self, timeout):
if timeout and not self._background:
while 1:
r = self._epoll.poll(0)
if r:
return r
Serialized.tic(step=1)
return self._epoll.poll(timeout)
class Node(object):
......@@ -237,7 +304,6 @@ class ServerNode(Node):
return address
return cls._node_list[address[1]].getListeningAddress()
@SerializedEventManager.decorate
def __init__(self, cluster=None, address=None, **kw):
if not address:
address = self.newAddress()
......@@ -269,19 +335,16 @@ class ServerNode(Node):
self.__init__(**kw)
def start(self):
Serialized.pending = 1
self.em._lock = l = threading.Semaphore(0)
Serialized.release(l, wake_other=0)
Serialized(self)
threading.Thread.start(self)
def run(self):
try:
Serialized.acquire(self.em._lock)
super(ServerNode, self).run()
finally:
self._afterRun()
logging.debug('stopping %r', self)
Serialized.background()
self.em.epoll.exit()
def _afterRun(self):
try:
......@@ -289,13 +352,6 @@ class ServerNode(Node):
except AttributeError:
pass
def stop(self):
try:
Serialized.release(stop=(self,))
self.join()
finally:
Serialized.acquire()
def getListeningAddress(self):
try:
return self.listening_conn.getAddress()
......@@ -348,28 +404,8 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
class ClientApplication(Node, neo.client.app.Application):
@SerializedEventManager.decorate
def __init__(self, master_nodes, name, **kw):
super(ClientApplication, self).__init__(master_nodes, name, **kw)
self.em._lock = threading.Semaphore(0)
def setPoll(self, master=False):
"""Set whether this thread runs freely or not
The client must be switched in master mode (by passing True) before
doing any synchronous call to the ZODB, otherwise the test deadlocks.
When called with master=False, the cluster is suspended and will only
process packets upon NEOCluster.tic() calls.
"""
if master:
assert not self.em._blocking
self.em._blocking = 1
Serialized.background()
else:
Serialized.release(wake_other=0)
self.em._blocking = 0
self.em.wakeup()
Serialized.acquire()
def getConnectionList(self, *peers):
for peer in peers:
......@@ -382,10 +418,9 @@ class ClientApplication(Node, neo.client.app.Application):
class NeoCTL(neo.neoctl.app.NeoCTL):
@SerializedEventManager.decorate
def __init__(self, *args, **kw):
super(NeoCTL, self).__init__(*args, **kw)
self.em._blocking = 1
TestSerialized(self)
class LoggerThreadName(str):
......@@ -497,9 +532,8 @@ class NEOCluster(object):
CONNECT_LIMIT = SocketConnector.CONNECT_LIMIT
SocketConnector_bind = staticmethod(SocketConnector._bind)
SocketConnector_connect = staticmethod(SocketConnector._connect)
SocketConnector_receive = staticmethod(SocketConnector.receive)
SocketConnector_send = staticmethod(SocketConnector.send)
_ThreadedPoll_run = staticmethod(_ThreadedPoll.run)
_ThreadedPoll_start = staticmethod(_ThreadedPoll.start)
_patch_count = 0
_resource_dict = weakref.WeakValueDictionary()
......@@ -516,42 +550,22 @@ class NEOCluster(object):
cls._patch_count += 1
if cls._patch_count > 1:
return
def send(self, msg):
result = cls.SocketConnector_send(self, msg)
if type(Serialized.pending) is not frozenset:
Serialized.pending = 1
return result
def receive(self):
# If the peer sent an entire packet, make sure we read it entirely,
# otherwise Serialize.pending would be reset to 0.
data = ''
try:
while True:
d = cls.SocketConnector_receive(self)
if not d:
return data
data += d
except ConnectorTryAgainException:
if data:
return data
raise
def start(self):
Serialized(self)
cls._ThreadedPoll_start(self)
def run(self):
l = self.em._lock
Serialized.release(l, wake_other=0)
try:
Serialized.acquire(l)
cls._ThreadedPoll_run(self)
finally:
Serialized.background()
self.em.epoll.exit()
BaseConnection.getTimeout = lambda self: None
SocketConnector.CONNECT_LIMIT = 0
SocketConnector._bind = lambda self, addr: \
cls.SocketConnector_bind(self, BIND)
SocketConnector._connect = lambda self, addr: \
cls.SocketConnector_connect(self, ServerNode.resolv(addr))
SocketConnector.receive = receive
SocketConnector.send = send
_ThreadedPoll.run = run
_ThreadedPoll.start = start
Serialized.init()
@staticmethod
......@@ -565,9 +579,9 @@ class NEOCluster(object):
SocketConnector.CONNECT_LIMIT = cls.CONNECT_LIMIT
SocketConnector._bind = cls.SocketConnector_bind
SocketConnector._connect = cls.SocketConnector_connect
SocketConnector.receive = cls.SocketConnector_receive
SocketConnector.send = cls.SocketConnector_send
_ThreadedPoll.run = cls._ThreadedPoll_run
_ThreadedPoll.start = cls._ThreadedPoll_start
Serialized.stop()
def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
......@@ -662,17 +676,17 @@ class NEOCluster(object):
for node_type in 'master', 'admin':
for node in getattr(self, node_type + '_list'):
node.start()
self.tic()
Serialized.tic()
if fast_startup:
self.startCluster()
if storage_list is None:
storage_list = self.storage_list
for node in storage_list:
node.start()
self.tic()
Serialized.tic()
if not fast_startup:
self.startCluster()
self.tic()
Serialized.tic()
state = self.neoctl.getClusterState()
assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
self.enableStorageList(storage_list)
......@@ -681,7 +695,7 @@ class NEOCluster(object):
try:
self.neoctl.startCluster()
except RuntimeError:
self.tic()
Serialized.tic()
if self.neoctl.getClusterState() not in (
ClusterStates.BACKINGUP,
ClusterStates.RUNNING,
......@@ -691,7 +705,7 @@ class NEOCluster(object):
def enableStorageList(self, storage_list):
self.neoctl.enableStorageList([x.uuid for x in storage_list])
self.tic()
Serialized.tic()
for node in storage_list:
assert self.getNodeState(node) == NodeStates.RUNNING
......@@ -705,46 +719,20 @@ class NEOCluster(object):
def stop(self):
logging.debug("stopping %s", self)
client = self.client.em._blocking
if client:
self.client.em._blocking = 0
Serialized.background(True)
self.__dict__.pop('_db', self.client).close()
try:
Serialized.stop(
self.admin_list + self.storage_list + self.master_list)
client or Serialized.background()
for node_type in 'admin', 'storage', 'master':
for node in getattr(self, node_type + '_list'):
if node.is_alive():
node.join()
client = self.client.poll_thread
if client.is_alive():
client.join()
finally:
# Acquire again in case there's another cluster running, otherwise
# the test would continue with too many simultaneously awoken threads.
# If we're the last cluster, the last call to Serialized.background
# does a dummy release so that we don't get stuck here.
Serialized.release(wake_other=0); Serialized.acquire()
node_list = self.admin_list + self.storage_list + self.master_list
for node in node_list:
node.em.wakeup(True)
for node in node_list:
if node._Thread__started.is_set():
node.join()
client = self.client.poll_thread
if client.is_alive():
client.join()
logging.debug("stopped %s", self)
self._unpatch()
@staticmethod
def tic(force=False, slave=False):
f = sys._getframe(1)
try:
logging.info('tic (%s:%u) ...', f.f_code.co_filename, f.f_lineno)
finally:
del f
if slave:
return Serialized.blocking.wait()
if force:
Serialized.tic()
logging.info('forced tic')
while Serialized.pending:
Serialized.tic()
logging.info('tic')
def getNodeState(self, node):
uuid = node.uuid
for node in self.neoctl.getNodeList(node.node_type):
......@@ -757,9 +745,8 @@ class NEOCluster(object):
if cell[1] == CellStates.OUT_OF_DATE]
def getZODBStorage(self, **kw):
# automatically put client in master mode
if self.client.em._blocking == 0:
self.client.setPoll(True)
# automatically let nodes run in the background
Serialized.background(True)
return Storage.Storage(None, self.name, _app=self.client, **kw)
def importZODB(self, dummy_zodb=None, random=random):
......@@ -820,6 +807,9 @@ class NEOThreadedTest(NeoTestBase):
db.execute("UPDATE packet SET body=NULL")
db.execute("VACUUM")
background = Serialized.background
tic = Serialized.tic
def getUnpickler(self, conn):
reader = conn._reader
def unpickler(data, compression=False):
......
......@@ -77,7 +77,7 @@ class Test(NEOThreadedTest):
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
serial = storage.tpc_finish(txn)
data_info[key] = 0
cluster.tic(slave=1)
self.tic()
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
self.assertEqual((data, serial), storage.load(oid, ''))
storage._cache.clear()
......@@ -184,7 +184,7 @@ class Test(NEOThreadedTest):
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
tid1 = storage.tpc_finish(txn[2])
cluster.tic(slave=1)
self.tic()
data_info[key] -= 1
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
......@@ -360,7 +360,7 @@ class Test(NEOThreadedTest):
try:
cluster.start()
cluster.db # open DB
cluster.client.setPoll(0)
self.background(0)
s0, s1 = cluster.client.nm.getStorageList()
conn = s0.getConnection()
self.assertFalse(conn.isClosed())
......@@ -403,7 +403,7 @@ class Test(NEOThreadedTest):
t, c = cluster.getTransaction()
c.root()[0] = 'ok'
t.commit()
cluster.tic(slave=1)
self.tic()
data_info = cluster.storage.getDataLockInfo()
self.assertEqual(data_info.values(), [0, 0])
# (obj|trans) become t(obj|trans)
......@@ -475,7 +475,7 @@ class Test(NEOThreadedTest):
# drop one
cluster.neoctl.dropNode(s1.uuid)
checkNodeState(None)
cluster.tic() # Let node state update reach remaining storage
self.tic() # Let node state update reach remaining storage
checkNodeState(None)
self.assertEqual([], cluster.getOudatedCells())
# restart with s2 only
......@@ -487,7 +487,7 @@ class Test(NEOThreadedTest):
checkNodeState(None)
# then restart it, it must be in pending state
s1.start()
cluster.tic()
self.tic()
checkNodeState(NodeStates.PENDING)
finally:
cluster.stop()
......@@ -517,7 +517,7 @@ class Test(NEOThreadedTest):
storage.connectToPrimary = sys.exit
# send an unexpected packet to master so it aborts connection to storage
storage.master_conn.answer(Packets.Pong())
cluster.tic(force=1)
self.tic()
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.VERIFYING)
finally:
......@@ -532,10 +532,10 @@ class Test(NEOThreadedTest):
t, c = cluster.getTransaction()
c.root()[''] = ''
t.commit()
cluster.client.setPoll(0)
self.background(0)
# tell admin to shutdown the cluster
cluster.neoctl.setClusterState(ClusterStates.STOPPING)
cluster.tic()
self.tic()
# all nodes except clients should exit
for master in cluster.master_list:
master.join(5)
......@@ -609,10 +609,8 @@ class Test(NEOThreadedTest):
# (at this time, we still have x=0 and y=1)
t2, c2 = cluster.getTransaction()
# Copy y to x using a different Master-Client connection
cluster.client.setPoll(0)
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
client.setPoll(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn)
......@@ -621,8 +619,6 @@ class Test(NEOThreadedTest):
m2c.add(lambda conn, packet:
isinstance(packet, Packets.InvalidateObjects))
tid = client.tpc_finish(txn, None)
client.setPoll(0)
cluster.client.setPoll(1)
# Change to x is committed. Testing connection must ask the
# storage node to return original value of x, even if we
# haven't processed yet any invalidation for x.
......@@ -657,14 +653,10 @@ class Test(NEOThreadedTest):
# from the storage (which is <value=1, next_tid=None>) is about
# to be processed.
# Now modify x to receive an invalidation for it.
cluster.client.setPoll(0)
client.setPoll(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x2._p_oid, tid, x, '', txn) # value=0
tid = client.tpc_finish(txn, None)
client.setPoll(0)
cluster.client.setPoll(1)
t1.begin() # make sure invalidation is processed
finally:
del p
......@@ -690,15 +682,11 @@ class Test(NEOThreadedTest):
p.apply()
t = self.newThread(t1.begin)
l1.acquire()
cluster.client.setPoll(0)
client.setPoll(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x2._p_oid, tid, y, '', txn)
tid = client.tpc_finish(txn, None)
client.close()
client.setPoll(0)
cluster.client.setPoll(1)
finally:
del p
l2.release()
......@@ -728,24 +716,23 @@ class Test(NEOThreadedTest):
y = c1._storage.load(y._p_oid)[0]
# close connections to master & storage
cluster.client.setPoll(0)
self.background(0)
c, = cluster.master.nm.getClientList()
c.getConnection().close()
c, = cluster.storage.nm.getClientList()
c.getConnection().close()
cluster.tic(force=1)
self.tic()
# modify x with another client
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
client.setPoll(1)
self.background(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn)
tid = client.tpc_finish(txn, None)
client.close()
client.setPoll(0)
cluster.client.setPoll(1)
self.tic()
# Check reconnection to storage.
with Patch(cluster.client.cp, getConnForNode=getConnForNode):
......@@ -765,7 +752,7 @@ class Test(NEOThreadedTest):
try:
cluster.start()
client = cluster.client
client.setPoll(1)
self.background(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
txn_context = client._txn_container.get(txn)
......@@ -828,13 +815,13 @@ class Test(NEOThreadedTest):
with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayNotifyInformation)
cluster.client.master_conn.close()
cluster.client.setPoll(0)
self.background(0)
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
p = Patch(client.storage_bootstrap_handler, notReady=notReady)
try:
p.apply()
client.setPoll(1)
self.background(1)
x = client.load(ZERO_TID)
finally:
del p
......
......@@ -204,6 +204,7 @@ class ImporterTests(NEOThreadedTest):
t.commit()
if cluster.storage.dm._import:
last_import = i
self.tic()
self.assertTrue(last_import and not cluster.storage.dm._import)
i = len(src_root) + 1
self.assertEqual(sorted(r.walk()), sorted(
......
......@@ -28,8 +28,7 @@ from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64
from .. import Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
predictable_random, Serialized
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
......@@ -42,7 +41,7 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
self.tic()
wrapped(self, backup)
finally:
backup.stop()
......@@ -90,19 +89,19 @@ class ReplicationTests(NEOThreadedTest):
upstream.start()
importZODB = upstream.importZODB()
importZODB(3)
upstream.client.setPoll(0)
self.background(0)
backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
upstream=upstream)
try:
backup.start()
# Initialize & catch up.
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup))
# Normal case, following upstream cluster closely.
importZODB(17)
upstream.client.setPoll(0)
backup.tic()
self.background(0)
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup))
# Check that a backup cluster can be restarted.
finally:
......@@ -113,14 +112,14 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.BACKINGUP)
importZODB(17)
upstream.client.setPoll(0)
backup.tic()
self.background(0)
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup))
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
backup.tic()
self.tic()
# Stop backing up, nothing truncated.
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
backup.tic()
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup))
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.RUNNING)
......@@ -134,16 +133,16 @@ class ReplicationTests(NEOThreadedTest):
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
self.tic()
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
while not f.filtered_count:
importZODB(1)
upstream.client.setPoll(0)
backup.tic()
self.background(0)
self.tic()
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
backup.tic()
backup.tic(force=1)
self.tic()
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
finally:
......@@ -152,17 +151,17 @@ class ReplicationTests(NEOThreadedTest):
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
self.tic()
with ConnectionFilter() as f:
f.add(lambda conn, packet: conn.getUUID() is None and
isinstance(packet, Packets.AddObject))
while not f.filtered_count:
importZODB(1)
upstream.client.setPoll(0)
backup.tic()
self.background(0)
self.tic()
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
backup.tic()
backup.tic(force=1)
self.tic()
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
finally:
......@@ -201,13 +200,13 @@ class ReplicationTests(NEOThreadedTest):
# Do not start with an empty DB so that 'primary_dict' below is not
# empty on the first iteration.
importZODB(1)
upstream.client.setPoll(0)
self.background(0)
backup = NEOCluster(partitions=np, replicas=2, storage_count=4,
upstream=upstream)
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
self.tic()
storage_list = [x.uuid for x in backup.storage_list]
slave = set(xrange(len(storage_list))).difference
for event in xrange(10):
......@@ -230,10 +229,10 @@ class ReplicationTests(NEOThreadedTest):
fetchObjects=fetchObjects)
with p:
importZODB(lambda x: counts[0] > 1)
upstream.client.setPoll(0)
self.background(0)
if event > 5:
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
backup.tic()
self.tic()
self.assertEqual(np*3, self.checkBackup(backup))
finally:
backup.stop()
......@@ -264,7 +263,7 @@ class ReplicationTests(NEOThreadedTest):
else:
orig(self, blocking)
with Patch(EventManager, _poll=_poll) as p:
backup.tic(force=1)
self.tic()
new_conn, = backup.master.getConnectionList(backup.upstream.master)
self.assertIsNot(new_conn, conn)
......@@ -275,23 +274,24 @@ class ReplicationTests(NEOThreadedTest):
f.add(lambda conn, packet:
isinstance(packet, Packets.InvalidateObjects))
upstream.importZODB()(1)
upstream.client.setPoll(0)
self.background(0)
count = [0]
def _connect(orig, conn):
count[0] += 1
orig(conn)
with Patch(ClientConnection, _connect=_connect):
upstream.storage.listening_conn.close()
Serialized.tic(); self.assertEqual(count[0], 0)
SocketConnector.CONNECT_LIMIT = 1
Serialized.tic(); count[0] or Serialized.tic()
t = time.time()
# XXX: review API for checking timeouts
backup.storage.em._blocking = 1
Serialized.tic(); self.assertEqual(count[0], 2)
Serialized.tic(); self.assertEqual(count[0], 2)
Serialized.tic(); self.assertEqual(count[0], 3)
self.assertTrue(t + 1 <= time.time())
self.tic(step=2)
self.assertEqual(count[0], 0)
t = SocketConnector.CONNECT_LIMIT = .5
t += time.time()
self.tic()
# 1st attempt failed, 2nd is deferred
self.assertEqual(count[0], 2)
self.tic(check_timeout=(backup.storage,))
# 2nd failed, 3rd deferred
self.assertEqual(count[0], 4)
self.assertTrue(t <= time.time())
@backup_test()
def testBackupDelayedUnlockTransaction(self, backup):
......@@ -305,9 +305,9 @@ class ReplicationTests(NEOThreadedTest):
f.add(lambda conn, packet:
isinstance(packet, Packets.NotifyUnlockInformation))
upstream.importZODB()(1)
upstream.client.setPoll(0)
backup.tic()
backup.tic(force=1)
self.background(0)
self.tic()
self.tic()
self.assertEqual(1, self.checkBackup(backup))
def testReplicationAbortedBySource(self):
......@@ -342,10 +342,10 @@ class ReplicationTests(NEOThreadedTest):
try:
cluster.start([s0])
cluster.populate([range(np*2)] * np)
cluster.client.setPoll(0)
self.background(0)
s1.start()
s2.start()
cluster.tic()
self.tic()
cluster.neoctl.enableStorageList([s1.uuid, s2.uuid])
cluster.neoctl.tweakPartitionTable()
offset, = [offset for offset, row in enumerate(
......@@ -354,9 +354,9 @@ class ReplicationTests(NEOThreadedTest):
with ConnectionFilter() as connection_filter:
connection_filter.add(delayAskFetch,
Patch(s0.dm, changePartitionTable=changePartitionTable))
cluster.tic()
self.tic()
self.assertEqual(1, connection_filter.filtered_count)
cluster.tic()
self.tic()
self.checkPartitionReplicated(s1, s2, offset)
finally:
cluster.stop()
......@@ -386,24 +386,24 @@ class ReplicationTests(NEOThreadedTest):
checker.CHECK_COUNT = 2
cluster.start()
cluster.populate([range(np*2)] * tid_count)
cluster.client.setPoll(0)
self.background(0)
storage_dict = {x.uuid: x for x in cluster.storage_list}
cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
cluster.tic()
self.tic()
check(ClusterStates.RUNNING, 0)
source = corrupt(0)
cluster.neoctl.checkReplicas(check_dict, p64(corrupt_tid+1), None)
cluster.tic()
self.tic()
check(ClusterStates.RUNNING, 0)
cluster.neoctl.checkReplicas({0: source}, ZERO_TID, None)
cluster.tic()
self.tic()
check(ClusterStates.RUNNING, 1)
corrupt(1)
cluster.neoctl.checkReplicas(check_dict, p64(corrupt_tid+1), None)
cluster.tic()
self.tic()
check(ClusterStates.RUNNING, 1)
cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
cluster.tic()
self.tic()
check(ClusterStates.VERIFYING, 4)
finally:
checker.CHECK_COUNT = CHECK_COUNT
......