Commit 0b93b1fb authored by Julien Muchembled

Fix occasional deadlocks in threaded tests

Deadlocks mainly happened while stopping a cluster, hence the complete review
of NEOCluster.stop().

A major change is to make the client node handle its lock like other nodes
(i.e. in the polling thread itself) to better know when to call
Serialized.background() (there was a race condition with the test of
'self.poll_thread.isAlive()' in ClientApplication.close).
parent 1ab594b4
...@@ -160,7 +160,6 @@ ...@@ -160,7 +160,6 @@
See http://garybernhardt.github.com/python-mock-comparison/ See http://garybernhardt.github.com/python-mock-comparison/
for a comparison of available mocking libraries/frameworks. for a comparison of available mocking libraries/frameworks.
- Fix epoll descriptor leak. - Fix epoll descriptor leak.
- Fix occasional deadlocks in threaded tests.
Later Later
- Consider auto-generating cluster name upon initial startup (it might - Consider auto-generating cluster name upon initial startup (it might
......
...@@ -29,6 +29,7 @@ import transaction, ZODB ...@@ -29,6 +29,7 @@ import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app import neo.client.app, neo.neoctl.app
from neo.client import Storage from neo.client import Storage
from neo.client.poll import _ThreadedPoll
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \ from neo.lib.connector import SocketConnector, \
...@@ -44,43 +45,59 @@ LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE]) ...@@ -44,43 +45,59 @@ LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])
class Serialized(object): class Serialized(object):
"""
Collection of variables/functions to schedule threads in a way that the
first to be suspended is also the first to be awoken. They also track
whether there's still pending activity in the cluster. This mainly provides
2 features to test cases:
- more determinism, by minimizing the number of active threads, and
switching them in a round-robin;
- tic() method to wait only the necessary time for the cluster to be idle.
The basic concept is that each thread has a lock that always gets acquired
by itself. The following pattern is used to yield the processor to the next
thread:
release(); acquire()
It should be noted that this is not atomic, i.e. all other threads
sometimes complete before a thread tries to acquire its lock: in order that
the previous thread does not fail by releasing an un-acquired lock,
we actually use Semaphores instead of Locks.
Thread switching usually happens before polling for network activity.
"""
@classmethod @classmethod
def init(cls): def init(cls):
cls._global_lock = threading.Lock() cls._global_lock = threading.Semaphore(0)
cls._global_lock.acquire() cls._lock_list = deque() # FIFO of Semaphore
cls._lock_list = deque()
cls._lock_lock = threading.Lock() cls._lock_lock = threading.Lock()
cls._pdb = False cls._pdb = False
cls.pending = 0 cls.pending = 0
@classmethod
def stop(cls, node_list):
cls.pending = frozenset(node_list) if node_list else 0
@classmethod @classmethod
def release(cls, lock=None, wake_other=True, stop=None): def release(cls, lock=None, wake_other=True, stop=None):
"""Suspend lock owner and resume first suspended thread""" """Resume first suspended thread"""
if lock is None: if lock is None:
lock = cls._global_lock lock = cls._global_lock
if stop: cls.stop(stop)
cls.pending = frozenset(stop)
else:
cls.pending = 0
try: try:
sys._getframe(1).f_trace.im_self.set_continue() sys._getframe(1).f_trace.im_self.set_continue()
cls._pdb = True cls._pdb = True
except AttributeError: except AttributeError:
pass pass
q = cls._lock_list q = cls._lock_list
l = cls._lock_lock with cls._lock_lock:
l.acquire()
try:
q.append(lock) q.append(lock)
if wake_other: if wake_other:
q.popleft().release() q.popleft().release()
finally:
l.release()
@classmethod @classmethod
def acquire(cls, lock=None): def acquire(cls, lock=None):
"""Suspend all threads except lock owner""" """Suspend lock owner"""
if lock is None: if lock is None:
lock = cls._global_lock lock = cls._global_lock
lock.acquire() lock.acquire()
...@@ -109,6 +126,8 @@ class Serialized(object): ...@@ -109,6 +126,8 @@ class Serialized(object):
with cls._lock_lock: with cls._lock_lock:
if cls._lock_list: if cls._lock_list:
cls._lock_list.popleft().release() cls._lock_list.popleft().release()
else:
cls._global_lock.release()
class SerializedEventManager(EventManager): class SerializedEventManager(EventManager):
...@@ -243,8 +262,7 @@ class ServerNode(Node): ...@@ -243,8 +262,7 @@ class ServerNode(Node):
def start(self): def start(self):
Serialized.pending = 1 Serialized.pending = 1
self.em._lock = l = threading.Lock() self.em._lock = l = threading.Semaphore(0)
l.acquire()
Serialized.release(l, wake_other=0) Serialized.release(l, wake_other=0)
threading.Thread.start(self) threading.Thread.start(self)
...@@ -325,7 +343,7 @@ class ClientApplication(Node, neo.client.app.Application): ...@@ -325,7 +343,7 @@ class ClientApplication(Node, neo.client.app.Application):
@SerializedEventManager.decorate @SerializedEventManager.decorate
def __init__(self, master_nodes, name, **kw): def __init__(self, master_nodes, name, **kw):
super(ClientApplication, self).__init__(master_nodes, name, **kw) super(ClientApplication, self).__init__(master_nodes, name, **kw)
self.em._lock = threading.Lock() self.em._lock = threading.Semaphore(0)
def setPoll(self, master=False): def setPoll(self, master=False):
"""Set whether this thread runs freely or not """Set whether this thread runs freely or not
...@@ -336,20 +354,14 @@ class ClientApplication(Node, neo.client.app.Application): ...@@ -336,20 +354,14 @@ class ClientApplication(Node, neo.client.app.Application):
processe packets upon NEOCluster.tic() calls. processe packets upon NEOCluster.tic() calls.
""" """
if master: if master:
assert not self.em._blocking
self.em._blocking = 1 self.em._blocking = 1
if not self.em._lock.acquire(0):
Serialized.background() Serialized.background()
else: else:
Serialized.release(wake_other=0); Serialized.acquire() Serialized.release(wake_other=0)
self.em._blocking = 0 self.em._blocking = 0
self.em.wakeup()
def __del__(self): Serialized.acquire()
try:
super(ClientApplication, self).__del__()
finally:
if self.poll_thread.isAlive():
Serialized.background()
close = __del__
def getConnectionList(self, *peers): def getConnectionList(self, *peers):
for peer in peers: for peer in peers:
...@@ -380,8 +392,9 @@ class LoggerThreadName(str): ...@@ -380,8 +392,9 @@ class LoggerThreadName(str):
return id(self) return id(self)
def __str__(self): def __str__(self):
t = threading.currentThread()
try: try:
return threading.currentThread().node_name return t.name if isinstance(t, _ThreadedPoll) else t.node_name
except AttributeError: except AttributeError:
return str.__str__(self) return str.__str__(self)
...@@ -472,6 +485,7 @@ class NEOCluster(object): ...@@ -472,6 +485,7 @@ class NEOCluster(object):
SocketConnector_connect = staticmethod(SocketConnector._connect) SocketConnector_connect = staticmethod(SocketConnector._connect)
SocketConnector_receive = staticmethod(SocketConnector.receive) SocketConnector_receive = staticmethod(SocketConnector.receive)
SocketConnector_send = staticmethod(SocketConnector.send) SocketConnector_send = staticmethod(SocketConnector.send)
_ThreadedPoll_run = staticmethod(_ThreadedPoll.run)
_patch_count = 0 _patch_count = 0
_resource_dict = weakref.WeakValueDictionary() _resource_dict = weakref.WeakValueDictionary()
...@@ -507,6 +521,14 @@ class NEOCluster(object): ...@@ -507,6 +521,14 @@ class NEOCluster(object):
if data: if data:
return data return data
raise raise
def run(self):
l = self.em._lock
Serialized.release(l, wake_other=0)
try:
Serialized.acquire(l)
cls._ThreadedPoll_run(self)
finally:
Serialized.background()
BaseConnection.getTimeout = lambda self: None BaseConnection.getTimeout = lambda self: None
SocketConnector.CONNECT_LIMIT = 0 SocketConnector.CONNECT_LIMIT = 0
SocketConnector._bind = lambda self, addr: \ SocketConnector._bind = lambda self, addr: \
...@@ -515,6 +537,7 @@ class NEOCluster(object): ...@@ -515,6 +537,7 @@ class NEOCluster(object):
cls.SocketConnector_connect(self, ServerNode.resolv(addr)) cls.SocketConnector_connect(self, ServerNode.resolv(addr))
SocketConnector.receive = receive SocketConnector.receive = receive
SocketConnector.send = send SocketConnector.send = send
_ThreadedPoll.run = run
Serialized.init() Serialized.init()
@staticmethod @staticmethod
...@@ -530,6 +553,7 @@ class NEOCluster(object): ...@@ -530,6 +553,7 @@ class NEOCluster(object):
SocketConnector._connect = cls.SocketConnector_connect SocketConnector._connect = cls.SocketConnector_connect
SocketConnector.receive = cls.SocketConnector_receive SocketConnector.receive = cls.SocketConnector_receive
SocketConnector.send = cls.SocketConnector_send SocketConnector.send = cls.SocketConnector_send
_ThreadedPoll.run = cls._ThreadedPoll_run
def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None, def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'), adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
...@@ -584,6 +608,10 @@ class NEOCluster(object): ...@@ -584,6 +608,10 @@ class NEOCluster(object):
master_nodes=self.master_nodes, compress=compress) master_nodes=self.master_nodes, compress=compress)
self.neoctl = NeoCTL(self.admin.getVirtualAddress()) self.neoctl = NeoCTL(self.admin.getVirtualAddress())
def __repr__(self):
return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
self.name, id(self))
# A few shortcuts that work when there's only 1 master/storage/admin # A few shortcuts that work when there's only 1 master/storage/admin
@property @property
def master(self): def master(self):
...@@ -662,18 +690,29 @@ class NEOCluster(object): ...@@ -662,18 +690,29 @@ class NEOCluster(object):
return db return db
def stop(self): def stop(self):
if hasattr(self, '_db') and self.client.em._blocking == 0: logging.debug("stopping %s", self)
self.client.setPoll(True) client = self.client.em._blocking
if client:
self.client.em._blocking = 0
self.__dict__.pop('_db', self.client).close() self.__dict__.pop('_db', self.client).close()
try: try:
Serialized.release(stop= Serialized.stop(
self.admin_list + self.storage_list + self.master_list) self.admin_list + self.storage_list + self.master_list)
client or Serialized.background()
for node_type in 'admin', 'storage', 'master': for node_type in 'admin', 'storage', 'master':
for node in getattr(self, node_type + '_list'): for node in getattr(self, node_type + '_list'):
if node.isAlive(): if node.isAlive():
node.join() node.join()
client = self.client.poll_thread
if client.isAlive():
client.join()
finally: finally:
Serialized.acquire() # Acquire again in case there's another cluster running, otherwise
# the test would continue with too many simultaneously awoken threads.
# If we're the last cluster, the last call to Serialized.background
# does a dummy release so that we don't get stuck here.
Serialized.release(wake_other=0); Serialized.acquire()
logging.debug("stopped %s", self)
self._unpatch() self._unpatch()
@staticmethod @staticmethod
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment