Commit 7e7af30c authored by Julien Muchembled

qa: hack to make threaded tests pass on a single-core CPU

In reality, this was tested with
  taskset 1 neotestrunner ...
parent 180e8f6a
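
The failure mode described in the message can be reproduced outside NEO. A minimal standalone sketch, assuming nothing but the Python 2 standard library (no NEO code involved): with a zero timeout, epoll reports nothing if the writing thread has not been scheduled yet, which is far more likely on a single core.

  import os, select, threading, time

  r, w = os.pipe()
  ep = select.epoll()
  ep.register(r, select.EPOLLIN)

  def worker():
      time.sleep(.0005)  # still busy: nothing sent on the "network" yet
      os.write(w, 'x')

  threading.Thread(target=worker).start()
  print ep.poll(0)     # usually []: a zero timeout returns prematurely
  print ep.poll(.001)  # a small timeout usually catches the event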
@@ -117,10 +117,6 @@ class Serialized(object):
     The epoll object of each node is hooked so that thread switching happens
     before polling for network activity. An extra epoll object is used to
     detect which node has a readable epoll object.
-
-    XXX: It seems wrong to rely only on epoll as way to know if there are
-         pending network messages. I had rare random failures due to tic()
-         returning prematurely.
     """
 
     check_timeout = False
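
As a reminder of how this scheduler works (see the docstring above), an epoll file descriptor is itself pollable, so one extra epoll object can watch the epoll of every node at once. A hedged sketch of that idea; HookedNode and master_epoll are illustrative names, not the real Serialized implementation:

  import select

  master_epoll = select.epoll()

  class HookedNode(object):
      # Each node's own epoll is registered with the master epoll; polling
      # the master then tells the scheduler which node has pending network
      # activity and should be switched to next.
      def __init__(self):
          self.epoll = select.epoll()
          master_epoll.register(self.epoll.fileno(), select.EPOLLIN)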
@@ -169,7 +165,13 @@ class Serialized(object):
             p.set_trace(sys._getframe(3))
 
     @classmethod
-    def tic(cls, step=-1, check_timeout=(), quiet=False):
+    def tic(cls, step=-1, check_timeout=(), quiet=False,
+            # BUG: We overuse epoll as a way to know if there are pending
+            #      network messages. Sometimes, and this is more visible with
+            #      a single-core CPU, other threads are still busy and haven't
+            #      sent anything yet on the network. This causes tic() to
+            #      return prematurely. Passing a non-zero value is a hack.
+            timeout=0):
         # If you're in a pdb here, 'n' switches to another thread
         # (the following lines are not supposed to be debugged into)
         with cls._tic_lock, cls.pdb():
@@ -189,7 +191,7 @@ class Serialized(object):
                 app.em.wakeup()
             del app
         while step:
-            event_list = cls._epoll.poll(0)
+            event_list = cls._epoll.poll(timeout)
             if not event_list:
                 break
             step -= 1
@@ -259,7 +261,7 @@ class TestSerialized(Serialized):
                 r = self._epoll.poll(0)
                 if r:
                     return r
-                Serialized.tic(step=1)
+                Serialized.tic(step=1, timeout=.001)
             raise Exception("tic is looping forever")
         return self._epoll.poll(timeout)
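
This hunk and the next one change the same bounded-retry pattern. A hedged sketch of that pattern, where tic_once stands in for Serialized.tic(step=1, timeout=.001) and the bound of 1000 is an assumption:

  def poll_with_retries(ep, tic_once, retries=1000):
      # Fast non-blocking check first; if nothing is ready yet, let
      # another node process one event (now with a tiny epoll timeout),
      # then check again. Fail loudly instead of spinning forever.
      for _ in xrange(retries):
          r = ep.poll(0)
          if r:
              return r
          tic_once()
      raise Exception("tic is looping forever")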
@@ -593,7 +595,7 @@ class NEOCluster(object):
                 for i in TIC_LOOP:
                     if lock(False):
                         return True
-                    Serialized.tic(step=1, quiet=True)
+                    Serialized.tic(step=1, quiet=True, timeout=.001)
                 raise Exception("tic is looping forever")
             return lock(False)
         self._lock = _lock
@@ -841,13 +843,15 @@ class NEOCluster(object):
         self.neoctl.enableStorageList([x.uuid for x in storage_list])
         Serialized.tic()
         for node in storage_list:
-            assert self.getNodeState(node) == NodeStates.RUNNING
+            state = self.getNodeState(node)
+            assert state == NodeStates.RUNNING, state
 
     def join(self, thread_list, timeout=5):
         timeout += time.time()
         while thread_list:
-            assert time.time() < timeout, thread_list
-            Serialized.tic()
+            # Map with repr before the threads become unprintable.
+            assert time.time() < timeout, map(repr, thread_list)
+            Serialized.tic(timeout=.001)
             thread_list = [t for t in thread_list if t.is_alive()]
 
     def getNodeState(self, node):
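
A hedged standalone sketch of the join() pattern above; join_all is an illustrative name and time.sleep stands in for the tic() call. Note that under Python 2, map() returns a list, so each repr() is captured eagerly; Python 3 would need list(map(...)) for a readable assertion message.

  import time

  def join_all(thread_list, timeout=5):
      deadline = time.time() + timeout
      while thread_list:
          # Capture repr() now, while the thread objects still print well.
          assert time.time() < deadline, map(repr, thread_list)
          time.sleep(.001)  # stands in for Serialized.tic(timeout=.001)
          thread_list = [t for t in thread_list if t.is_alive()]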
...
@@ -732,6 +732,7 @@ class Test(NEOThreadedTest):
             c.root()._p_changed = 1
             with Patch(storage.tm, lock=lambda *_: sys.exit()):
                 self.commitWithStorageFailure(cluster.client, t)
+            cluster.join((storage,))
         self.assertRaises(DatabaseFailure, storage.resetNode)
 
     @with_cluster(replicas=1)
...