Commit 4253d24f authored by Julien Muchembled's avatar Julien Muchembled

Stop using 'background' mode in threaded tests

This makes tests easier to write, with more determinism.
If only I had the idea to monkey-patch SimpleQueue several years ago.
parent 7025db52
...@@ -29,6 +29,7 @@ import transaction, ZODB ...@@ -29,6 +29,7 @@ import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app import neo.client.app, neo.neoctl.app
from neo.client import Storage from neo.client import Storage
from neo.client.container import SimpleQueue
from neo.client.poll import _ThreadedPoll from neo.client.poll import _ThreadedPoll
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection from neo.lib.connection import BaseConnection, Connection
...@@ -96,7 +97,8 @@ class Serialized(object): ...@@ -96,7 +97,8 @@ class Serialized(object):
@classmethod @classmethod
def background(cls, background): def background(cls, background):
if cls._background != background: prev = cls._background
if prev != background:
if background: if background:
cls._background = background cls._background = background
cls._sched_lock.release() cls._sched_lock.release()
...@@ -107,6 +109,7 @@ class Serialized(object): ...@@ -107,6 +109,7 @@ class Serialized(object):
cls._epoll.unregister(fd) cls._epoll.unregister(fd)
cls.idle(None) cls.idle(None)
cls._background = background cls._background = background
return prev
@classmethod @classmethod
def idle(cls, owner): def idle(cls, owner):
...@@ -530,6 +533,7 @@ class NEOCluster(object): ...@@ -530,6 +533,7 @@ class NEOCluster(object):
BaseConnection_getTimeout = staticmethod(BaseConnection.getTimeout) BaseConnection_getTimeout = staticmethod(BaseConnection.getTimeout)
CONNECT_LIMIT = SocketConnector.CONNECT_LIMIT CONNECT_LIMIT = SocketConnector.CONNECT_LIMIT
SimpleQueue__init__ = staticmethod(SimpleQueue.__init__)
SocketConnector_bind = staticmethod(SocketConnector._bind) SocketConnector_bind = staticmethod(SocketConnector._bind)
SocketConnector_connect = staticmethod(SocketConnector._connect) SocketConnector_connect = staticmethod(SocketConnector._connect)
_ThreadedPoll_run = staticmethod(_ThreadedPoll.run) _ThreadedPoll_run = staticmethod(_ThreadedPoll.run)
...@@ -550,6 +554,16 @@ class NEOCluster(object): ...@@ -550,6 +554,16 @@ class NEOCluster(object):
cls._patch_count += 1 cls._patch_count += 1
if cls._patch_count > 1: if cls._patch_count > 1:
return return
def __init__(self):
cls.SimpleQueue__init__(self)
lock = self._lock
def _lock(blocking=True):
if blocking:
while not lock(False):
Serialized.tic(step=1)
return True
return lock(False)
self._lock = _lock
def start(self): def start(self):
Serialized(self) Serialized(self)
cls._ThreadedPoll_start(self) cls._ThreadedPoll_start(self)
...@@ -559,6 +573,7 @@ class NEOCluster(object): ...@@ -559,6 +573,7 @@ class NEOCluster(object):
finally: finally:
self.em.epoll.exit() self.em.epoll.exit()
BaseConnection.getTimeout = lambda self: None BaseConnection.getTimeout = lambda self: None
SimpleQueue.__init__ = __init__
SocketConnector.CONNECT_LIMIT = 0 SocketConnector.CONNECT_LIMIT = 0
SocketConnector._bind = lambda self, addr: \ SocketConnector._bind = lambda self, addr: \
cls.SocketConnector_bind(self, BIND) cls.SocketConnector_bind(self, BIND)
...@@ -569,13 +584,15 @@ class NEOCluster(object): ...@@ -569,13 +584,15 @@ class NEOCluster(object):
Serialized.init() Serialized.init()
@staticmethod @staticmethod
def _unpatch(): def _unpatch(background):
cls = NEOCluster cls = NEOCluster
assert cls._patch_count > 0 assert cls._patch_count > 0
cls._patch_count -= 1 cls._patch_count -= 1
if cls._patch_count: if cls._patch_count:
Serialized.background(background)
return return
BaseConnection.getTimeout = cls.BaseConnection_getTimeout BaseConnection.getTimeout = cls.BaseConnection_getTimeout
SimpleQueue.__init__ = cls.SimpleQueue__init__
SocketConnector.CONNECT_LIMIT = cls.CONNECT_LIMIT SocketConnector.CONNECT_LIMIT = cls.CONNECT_LIMIT
SocketConnector._bind = cls.SocketConnector_bind SocketConnector._bind = cls.SocketConnector_bind
SocketConnector._connect = cls.SocketConnector_connect SocketConnector._connect = cls.SocketConnector_connect
...@@ -673,6 +690,7 @@ class NEOCluster(object): ...@@ -673,6 +690,7 @@ class NEOCluster(object):
def start(self, storage_list=None, fast_startup=False): def start(self, storage_list=None, fast_startup=False):
self._patch() self._patch()
self.client._thread_container.__init__()
for node_type in 'master', 'admin': for node_type in 'master', 'admin':
for node in getattr(self, node_type + '_list'): for node in getattr(self, node_type + '_list'):
node.start() node.start()
...@@ -719,7 +737,7 @@ class NEOCluster(object): ...@@ -719,7 +737,7 @@ class NEOCluster(object):
def stop(self): def stop(self):
logging.debug("stopping %s", self) logging.debug("stopping %s", self)
Serialized.background(True) background = Serialized.background(True)
self.__dict__.pop('_db', self.client).close() self.__dict__.pop('_db', self.client).close()
node_list = self.admin_list + self.storage_list + self.master_list node_list = self.admin_list + self.storage_list + self.master_list
for node in node_list: for node in node_list:
...@@ -731,7 +749,7 @@ class NEOCluster(object): ...@@ -731,7 +749,7 @@ class NEOCluster(object):
if client.is_alive(): if client.is_alive():
client.join() client.join()
logging.debug("stopped %s", self) logging.debug("stopped %s", self)
self._unpatch() self._unpatch(background)
def getNodeState(self, node): def getNodeState(self, node):
uuid = node.uuid uuid = node.uuid
...@@ -745,8 +763,6 @@ class NEOCluster(object): ...@@ -745,8 +763,6 @@ class NEOCluster(object):
if cell[1] == CellStates.OUT_OF_DATE] if cell[1] == CellStates.OUT_OF_DATE]
def getZODBStorage(self, **kw): def getZODBStorage(self, **kw):
# automatically let nodes running in the background
Serialized.background(True)
return Storage.Storage(None, self.name, _app=self.client, **kw) return Storage.Storage(None, self.name, _app=self.client, **kw)
def importZODB(self, dummy_zodb=None, random=random): def importZODB(self, dummy_zodb=None, random=random):
...@@ -807,7 +823,6 @@ class NEOThreadedTest(NeoTestBase): ...@@ -807,7 +823,6 @@ class NEOThreadedTest(NeoTestBase):
db.execute("UPDATE packet SET body=NULL") db.execute("UPDATE packet SET body=NULL")
db.execute("VACUUM") db.execute("VACUUM")
background = Serialized.background
tic = Serialized.tic tic = Serialized.tic
def getUnpickler(self, conn): def getUnpickler(self, conn):
......
...@@ -360,7 +360,6 @@ class Test(NEOThreadedTest): ...@@ -360,7 +360,6 @@ class Test(NEOThreadedTest):
try: try:
cluster.start() cluster.start()
cluster.db # open DB cluster.db # open DB
self.background(0)
s0, s1 = cluster.client.nm.getStorageList() s0, s1 = cluster.client.nm.getStorageList()
conn = s0.getConnection() conn = s0.getConnection()
self.assertFalse(conn.isClosed()) self.assertFalse(conn.isClosed())
...@@ -532,7 +531,6 @@ class Test(NEOThreadedTest): ...@@ -532,7 +531,6 @@ class Test(NEOThreadedTest):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
c.root()[''] = '' c.root()[''] = ''
t.commit() t.commit()
self.background(0)
# tell admin to shutdown the cluster # tell admin to shutdown the cluster
cluster.neoctl.setClusterState(ClusterStates.STOPPING) cluster.neoctl.setClusterState(ClusterStates.STOPPING)
self.tic() self.tic()
...@@ -716,7 +714,6 @@ class Test(NEOThreadedTest): ...@@ -716,7 +714,6 @@ class Test(NEOThreadedTest):
y = c1._storage.load(y._p_oid)[0] y = c1._storage.load(y._p_oid)[0]
# close connections to master & storage # close connections to master & storage
self.background(0)
c, = cluster.master.nm.getClientList() c, = cluster.master.nm.getClientList()
c.getConnection().close() c.getConnection().close()
c, = cluster.storage.nm.getClientList() c, = cluster.storage.nm.getClientList()
...@@ -726,7 +723,6 @@ class Test(NEOThreadedTest): ...@@ -726,7 +723,6 @@ class Test(NEOThreadedTest):
# modify x with another client # modify x with another client
client = ClientApplication(name=cluster.name, client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes) master_nodes=cluster.master_nodes)
self.background(1)
txn = transaction.Transaction() txn = transaction.Transaction()
client.tpc_begin(txn) client.tpc_begin(txn)
client.store(x1._p_oid, x1._p_serial, y, '', txn) client.store(x1._p_oid, x1._p_serial, y, '', txn)
...@@ -752,7 +748,6 @@ class Test(NEOThreadedTest): ...@@ -752,7 +748,6 @@ class Test(NEOThreadedTest):
try: try:
cluster.start() cluster.start()
client = cluster.client client = cluster.client
self.background(1)
txn = transaction.Transaction() txn = transaction.Transaction()
client.tpc_begin(txn) client.tpc_begin(txn)
txn_context = client._txn_container.get(txn) txn_context = client._txn_container.get(txn)
...@@ -815,13 +810,11 @@ class Test(NEOThreadedTest): ...@@ -815,13 +810,11 @@ class Test(NEOThreadedTest):
with cluster.master.filterConnection(cluster.storage) as m2s: with cluster.master.filterConnection(cluster.storage) as m2s:
m2s.add(delayNotifyInformation) m2s.add(delayNotifyInformation)
cluster.client.master_conn.close() cluster.client.master_conn.close()
self.background(0)
client = ClientApplication(name=cluster.name, client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes) master_nodes=cluster.master_nodes)
p = Patch(client.storage_bootstrap_handler, notReady=notReady) p = Patch(client.storage_bootstrap_handler, notReady=notReady)
try: try:
p.apply() p.apply()
self.background(1)
x = client.load(ZERO_TID) x = client.load(ZERO_TID)
finally: finally:
del p del p
......
...@@ -89,7 +89,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -89,7 +89,6 @@ class ReplicationTests(NEOThreadedTest):
upstream.start() upstream.start()
importZODB = upstream.importZODB() importZODB = upstream.importZODB()
importZODB(3) importZODB(3)
self.background(0)
backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5, backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
upstream=upstream) upstream=upstream)
try: try:
...@@ -100,7 +99,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -100,7 +99,6 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(np*nr, self.checkBackup(backup)) self.assertEqual(np*nr, self.checkBackup(backup))
# Normal case, following upstream cluster closely. # Normal case, following upstream cluster closely.
importZODB(17) importZODB(17)
self.background(0)
self.tic() self.tic()
self.assertEqual(np*nr, self.checkBackup(backup)) self.assertEqual(np*nr, self.checkBackup(backup))
# Check that a backup cluster can be restarted. # Check that a backup cluster can be restarted.
...@@ -112,7 +110,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -112,7 +110,6 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(backup.neoctl.getClusterState(), self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.BACKINGUP) ClusterStates.BACKINGUP)
importZODB(17) importZODB(17)
self.background(0)
self.tic() self.tic()
self.assertEqual(np*nr, self.checkBackup(backup)) self.assertEqual(np*nr, self.checkBackup(backup))
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None) backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
...@@ -138,7 +135,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -138,7 +135,6 @@ class ReplicationTests(NEOThreadedTest):
f.add(delaySecondary) f.add(delaySecondary)
while not f.filtered_count: while not f.filtered_count:
importZODB(1) importZODB(1)
self.background(0)
self.tic() self.tic()
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic() self.tic()
...@@ -157,7 +153,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -157,7 +153,6 @@ class ReplicationTests(NEOThreadedTest):
isinstance(packet, Packets.AddObject)) isinstance(packet, Packets.AddObject))
while not f.filtered_count: while not f.filtered_count:
importZODB(1) importZODB(1)
self.background(0)
self.tic() self.tic()
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic() self.tic()
...@@ -200,7 +195,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -200,7 +195,6 @@ class ReplicationTests(NEOThreadedTest):
# Do not start with an empty DB so that 'primary_dict' below is not # Do not start with an empty DB so that 'primary_dict' below is not
# empty on the first iteration. # empty on the first iteration.
importZODB(1) importZODB(1)
self.background(0)
backup = NEOCluster(partitions=np, replicas=2, storage_count=4, backup = NEOCluster(partitions=np, replicas=2, storage_count=4,
upstream=upstream) upstream=upstream)
try: try:
...@@ -229,7 +223,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -229,7 +223,6 @@ class ReplicationTests(NEOThreadedTest):
fetchObjects=fetchObjects) fetchObjects=fetchObjects)
with p: with p:
importZODB(lambda x: counts[0] > 1) importZODB(lambda x: counts[0] > 1)
self.background(0)
if event > 5: if event > 5:
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None) backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic() self.tic()
...@@ -274,7 +267,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -274,7 +267,6 @@ class ReplicationTests(NEOThreadedTest):
f.add(lambda conn, packet: f.add(lambda conn, packet:
isinstance(packet, Packets.InvalidateObjects)) isinstance(packet, Packets.InvalidateObjects))
upstream.importZODB()(1) upstream.importZODB()(1)
self.background(0)
count = [0] count = [0]
def _connect(orig, conn): def _connect(orig, conn):
count[0] += 1 count[0] += 1
...@@ -305,7 +297,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -305,7 +297,6 @@ class ReplicationTests(NEOThreadedTest):
f.add(lambda conn, packet: f.add(lambda conn, packet:
isinstance(packet, Packets.NotifyUnlockInformation)) isinstance(packet, Packets.NotifyUnlockInformation))
upstream.importZODB()(1) upstream.importZODB()(1)
self.background(0)
self.tic() self.tic()
self.tic() self.tic()
self.assertEqual(1, self.checkBackup(backup)) self.assertEqual(1, self.checkBackup(backup))
...@@ -342,7 +333,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -342,7 +333,6 @@ class ReplicationTests(NEOThreadedTest):
try: try:
cluster.start([s0]) cluster.start([s0])
cluster.populate([range(np*2)] * np) cluster.populate([range(np*2)] * np)
self.background(0)
s1.start() s1.start()
s2.start() s2.start()
self.tic() self.tic()
...@@ -386,7 +376,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -386,7 +376,6 @@ class ReplicationTests(NEOThreadedTest):
checker.CHECK_COUNT = 2 checker.CHECK_COUNT = 2
cluster.start() cluster.start()
cluster.populate([range(np*2)] * tid_count) cluster.populate([range(np*2)] * tid_count)
self.background(0)
storage_dict = {x.uuid: x for x in cluster.storage_list} storage_dict = {x.uuid: x for x in cluster.storage_list}
cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None) cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic() self.tic()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment