Commit 50d25d00 authored by Julien Muchembled's avatar Julien Muchembled

Drop 'background' mode completely in threaded tests

It was still used to stop a cluster.
parent 4253d24f
......@@ -44,6 +44,27 @@ BIND = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE], 0
LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])
class FairLock(deque):
"""Same as a threading.Lock except that waiting threads are queued, so that
the first one waiting for the lock is the first to get it. This is useful
when several concurrent threads fight for the same resource in loop:
the owner could give too little time for other to get a chance to acquire,
blocking them for a long time with bad luck.
"""
def __enter__(self, _allocate_lock=threading.Lock):
me = _allocate_lock()
me.acquire()
self.append(me)
other = self[0]
while me is not other:
with other:
other = self[0]
def __exit__(self, t, v, tb):
self.popleft().release()
class Serialized(object):
"""
"Threaded" tests run all nodes in the same process as the test itself,
......@@ -62,54 +83,20 @@ class Serialized(object):
we actually use Semaphores instead of Locks.
The epoll object of each node is hooked so that thread switching happens
before polling for network activity. An extra thread act as a scheduler
and uses an epoll object to detect which node has a readable epoll object.
before polling for network activity. An extra epoll object is used to
detect which node has a readable epoll object.
"""
check_timeout = False
class _Trigger(object):
_last = 0
def __init__(self):
self._lock = l = threading.Lock()
self._fd, w = os.pipe()
os.close(w)
l.acquire()
def __del__(self):
os.close(self._fd)
@classmethod
def init(cls):
cls._background = 0
cls._busy = set()
cls._busy_cond = threading.Condition(threading.Lock())
cls._epoll = select.epoll()
cls._pdb = None
cls._sched_lock = threading.Semaphore(0)
cls._step = -1
cls._trigger = t = cls._Trigger()
cls._fd_dict = {t._fd: t}
cls._thread = t = threading.Thread(target=cls._run, name=cls.__name__)
t.daemon = True
t.start()
@classmethod
def background(cls, background):
prev = cls._background
if prev != background:
if background:
cls._background = background
cls._sched_lock.release()
else:
fd = cls._trigger._fd
cls._epoll.register(fd)
cls._trigger._lock.acquire()
cls._epoll.unregister(fd)
cls.idle(None)
cls._background = background
return prev
cls._tic_lock = FairLock()
cls._fd_dict = {}
@classmethod
def idle(cls, owner):
......@@ -119,49 +106,17 @@ class Serialized(object):
@classmethod
def stop(cls):
assert cls._background
fd = cls._trigger._fd
cls._epoll.register(fd)
cls._trigger._lock.acquire()
del cls._trigger, cls._fd_dict[fd]
assert not cls._fd_dict
cls._sched_lock.release()
cls._thread.join()
del(cls._background, cls._busy, cls._busy_cond, cls._epoll,
cls._fd_dict, cls._pdb, cls._sched_lock, cls._step, cls._thread)
assert not cls._fd_dict, cls._fd_dict
del(cls._busy, cls._busy_cond, cls._epoll, cls._fd_dict,
cls._pdb, cls._sched_lock, cls._tic_lock)
@classmethod
def _run(cls):
sched_lock = cls._sched_lock
fd_dict = cls._fd_dict
sort_key = lambda fd_event: -fd_dict[fd_event[0]]._last
while 1:
sched_lock.acquire()
event_list = cls._step and cls._epoll.poll(0)
cls._step -= 1
if not event_list:
cls.idle(None)
if not cls._background:
continue
if not fd_dict:
break
event_list = cls._epoll.poll(-1)
cls._busy.add(None)
event_list.sort(key=sort_key)
next_lock = sched_lock
for fd, event in event_list:
self = fd_dict[fd]
self._release_next = next_lock.release
next_lock = self._lock
del self
next_lock.release()
def _sort_key(cls, fd_event):
return -cls._fd_dict[fd_event[0]]._last
@classmethod
@contextmanager
def pdb(cls):
if cls._background:
yield
return
try:
cls._pdb = sys._getframe(2).f_trace.im_self
cls._pdb.set_continue()
......@@ -179,29 +134,35 @@ class Serialized(object):
def tic(cls, step=-1, check_timeout=()):
# If you're in a pdb here, 'n' switches to another thread
# (the following lines are not supposed to be debugged into)
with cls.pdb(): # does nothing if background(1) was called
with cls._tic_lock, cls.pdb():
f = sys._getframe(1)
try:
logging.info('tic (%s:%u) ...',
f.f_code.co_filename, f.f_lineno)
finally:
del f
if cls._background:
assert step == -1 and not check_timeout
else:
if cls._busy:
with cls._busy_cond:
while cls._busy:
cls._busy_cond.wait()
cls._busy.add(None)
cls._step = step
for app in check_timeout:
app.em.epoll.check_timeout = True
app.em.wakeup()
del app
cls._sched_lock.release()
with cls._busy_cond:
while cls._busy:
cls._busy_cond.wait()
while step:
event_list = cls._epoll.poll(0)
if not event_list:
break
step -= 1
event_list.sort(key=cls._sort_key)
next_lock = cls._sched_lock
for fd, event in event_list:
self = cls._fd_dict[fd]
self._release_next = next_lock.release
next_lock = self._lock
del self
next_lock.release()
cls._sched_lock.acquire()
def __init__(self, app, busy=True):
self._epoll = app.em.epoll
......@@ -251,7 +212,7 @@ class TestSerialized(Serialized):
Serialized.__init__(busy=False, *args)
def poll(self, timeout):
if timeout and not self._background:
if timeout:
while 1:
r = self._epoll.poll(0)
if r:
......@@ -584,12 +545,11 @@ class NEOCluster(object):
Serialized.init()
@staticmethod
def _unpatch(background):
def _unpatch():
cls = NEOCluster
assert cls._patch_count > 0
cls._patch_count -= 1
if cls._patch_count:
Serialized.background(background)
return
BaseConnection.getTimeout = cls.BaseConnection_getTimeout
SimpleQueue.__init__ = cls.SimpleQueue__init__
......@@ -735,21 +695,23 @@ class NEOCluster(object):
self._db = db = ZODB.DB(storage=self.getZODBStorage())
return db
def join(self, thread_list, timeout=5):
timeout += time.time()
while thread_list:
assert time.time() < timeout
Serialized.tic()
thread_list = [t for t in thread_list if t.is_alive()]
def stop(self):
logging.debug("stopping %s", self)
background = Serialized.background(True)
self.__dict__.pop('_db', self.client).close()
node_list = self.admin_list + self.storage_list + self.master_list
for node in node_list:
node.em.wakeup(True)
for node in node_list:
if node._Thread__started.is_set():
node.join()
client = self.client.poll_thread
if client.is_alive():
client.join()
node_list.append(self.client.poll_thread)
self.join(node_list)
logging.debug("stopped %s", self)
self._unpatch(background)
self._unpatch()
def getNodeState(self, node):
uuid = node.uuid
......
......@@ -533,16 +533,10 @@ class Test(NEOThreadedTest):
t.commit()
# tell admin to shutdown the cluster
cluster.neoctl.setClusterState(ClusterStates.STOPPING)
self.tic()
# all nodes except clients should exit
for master in cluster.master_list:
master.join(5)
self.assertFalse(master.is_alive())
for storage in cluster.storage_list:
storage.join(5)
self.assertFalse(storage.is_alive())
cluster.admin.join(5)
self.assertFalse(cluster.admin.is_alive())
cluster.join(cluster.master_list
+ cluster.storage_list
+ cluster.admin_list)
finally:
cluster.stop()
cluster.reset() # reopen DB to check partition tables
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment