Commit e2183483 authored by Julien Muchembled

qa: make closure of NEOCluster more reliable in threaded tests

Instances of NEOCluster were not deleted as soon as the only referrers were
weak proxies (at least that's what a quick check with the 'gc' module showed
at the beginning of tearDown). In some cases, __del__ was called while the next
test was logging a message, which led to deadlocks.

Without those proxies, deletion may be reliable, but only on CPython. See
  http://doc.pypy.org/en/latest/cpython_differences.html#differences-related-to-garbage-collection-strategies

Relying on __del__ to close a cluster was wrong. NEOCluster is now a context
manager that closes it explicitly on exit, automatically stopping it first.
The NEOCluster.stop method combines the previous stop/__del__/reset methods.
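In other words (a rough summary of the new behaviour, as implemented in the diff below):

    cluster.stop()        # stop the nodes, then reset them (former stop + reset)
    cluster.stop(1)       # same, but also clear the storage databases
    cluster.stop(None)    # stop the nodes, then close them for good
                          # (former stop + __del__); this is what __exit__ does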

A new 'with_cluster' decorator is also added to avoid excessive indentation
in tests. Unindentation of existing tests will be done later.
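
For illustration, here is a minimal sketch of how a test can use the new API. The test class, method names and stored values are hypothetical, and the import path is assumed; only NEOCluster, NEOThreadedTest and with_cluster come from the test framework:

    from neo.tests.threaded import NEOCluster, NEOThreadedTest, with_cluster

    class ExampleTest(NEOThreadedTest):

        @with_cluster(storage_count=2, replicas=1)
        def testWithDecorator(self, cluster):
            # the decorator created and started the cluster; it is stopped
            # and closed automatically when the method returns
            t, c = cluster.getTransaction()
            c.root()['x'] = 'foo'
            t.commit()

        def testWithContextManager(self):
            # same thing without the decorator: __exit__ calls stop(None),
            # which stops the nodes (if started) and then closes them
            with NEOCluster(replicas=1) as cluster:
                cluster.start()
                t, c = cluster.getTransaction()
                self.assertEqual([], cluster.getOutdatedCells())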
parent 1d738521
......@@ -624,6 +624,8 @@ class NEOCluster(object):
            patch.revert()
        Serialized.stop()

    started = False

    def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
                       adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
                       storage_count=None, db_list=None, clear_databases=True,
......@@ -632,6 +634,7 @@ class NEOCluster(object):
        self.name = 'neo_%s' % self._allocate('name',
            lambda: random.randint(0, 100))
        self.compress = compress
        self.num_partitions = partitions
        master_list = [MasterApplication.newAddress()
                       for _ in xrange(master_count)]
        self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
......@@ -675,7 +678,6 @@ class NEOCluster(object):
        self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
                             for x in db_list]
        self.admin_list = [AdminApplication(**kw)]
        self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)

    def __repr__(self):
        return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
......@@ -711,18 +713,16 @@ class NEOCluster(object):
        return master

    ###

    def reset(self, clear_database=False):
        for node_type in 'master', 'storage', 'admin':
            kw = {}
            if node_type == 'storage':
                kw['clear_database'] = clear_database
            for node in getattr(self, node_type + '_list'):
                node.resetNode(**kw)
        self.neoctl.close()
        self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        self.stop(None)

    def start(self, storage_list=None, fast_startup=False):
        self.started = True
        self._patch()
        self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
        for node_type in 'master', 'admin':
            for node in getattr(self, node_type + '_list'):
                node.start()
......@@ -741,6 +741,40 @@ class NEOCluster(object):
        assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
        self.enableStorageList(storage_list)

    def stop(self, clear_database=False, __print_exc=traceback.print_exc):
        if self.started:
            del self.started
            logging.debug("stopping %s", self)
            client = self.__dict__.get("client")
            client is None or self.__dict__.pop("db", client).close()
            node_list = self.admin_list + self.storage_list + self.master_list
            for node in node_list:
                node.stop()
            try:
                node_list.append(client.poll_thread)
            except AttributeError: # client is None or thread is already stopped
                pass
            self.join(node_list)
            self.neoctl.close()
            del self.neoctl
            logging.debug("stopped %s", self)
            self._unpatch()
        if clear_database is None:
            try:
                for node_type in 'admin', 'storage', 'master':
                    for node in getattr(self, node_type + '_list'):
                        node.close()
            except:
                __print_exc()
                raise
        else:
            for node_type in 'master', 'storage', 'admin':
                kw = {}
                if node_type == 'storage':
                    kw['clear_database'] = clear_database
                for node in getattr(self, node_type + '_list'):
                    node.resetNode(**kw)

    def _newClient(self):
        return ClientApplication(name=self.name, master_nodes=self.master_nodes,
            compress=self.compress, ssl=self.SSL)
......@@ -801,21 +835,6 @@ class NEOCluster(object):
            Serialized.tic()
            thread_list = [t for t in thread_list if t.is_alive()]

    def stop(self):
        logging.debug("stopping %s", self)
        client = self.__dict__.get("client")
        client is None or self.__dict__.pop("db", client).close()
        node_list = self.admin_list + self.storage_list + self.master_list
        for node in node_list:
            node.stop()
        try:
            node_list.append(client.poll_thread)
        except AttributeError: # client is None or thread is already stopped
            pass
        self.join(node_list)
        logging.debug("stopped %s", self)
        self._unpatch()

    def getNodeState(self, node):
        uuid = node.uuid
        for node in self.neoctl.getNodeList(node.node_type):
......@@ -861,16 +880,6 @@ class NEOCluster(object):
        txn = transaction.TransactionManager()
        return txn, (self.db if db is None else db).open(txn)

    def __del__(self, __print_exc=traceback.print_exc):
        try:
            self.neoctl.close()
            for node_type in 'admin', 'storage', 'master':
                for node in getattr(self, node_type + '_list'):
                    node.close()
        except:
            __print_exc()
            raise

    def extraCellSortKey(self, key):
        return Patch(self.client.cp, getCellSortKey=lambda orig, cell:
            (orig(cell), key(cell)))
......@@ -1000,3 +1009,13 @@ def predictable_random(seed=None):
                    = random
        return wraps(wrapped)(wrapper)
    return decorator


def with_cluster(start_cluster=True, **cluster_kw):
    def decorator(wrapped):
        def wrapper(self, *args, **kw):
            with NEOCluster(**cluster_kw) as cluster:
                if start_cluster:
                    cluster.start()
                return wrapped(self, cluster, *args, **kw)
        return wraps(wrapped)(wrapper)
    return decorator
......@@ -33,7 +33,7 @@ from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID
from .. import expectedFailure, Patch
from . import LockLock, NEOCluster, NEOThreadedTest
from . import LockLock, NEOThreadedTest, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
......@@ -54,10 +54,9 @@ class PCounterWithResolution(PCounter):
class Test(NEOThreadedTest):
def testBasicStore(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testBasicStore(self, cluster):
if 1:
storage = cluster.getZODBStorage()
data_info = {}
compressible = 'x' * 20
......@@ -107,13 +106,10 @@ class Test(NEOThreadedTest):
if big:
self.assertFalse(cluster.storage.sqlCount('bigdata'))
self.assertFalse(cluster.storage.sqlCount('data'))
finally:
cluster.stop()
def testDeleteObject(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testDeleteObject(self, cluster):
if 1:
storage = cluster.getZODBStorage()
for clear_cache in 0, 1:
for tst in 'a.', 'bcd.':
......@@ -132,13 +128,10 @@ class Test(NEOThreadedTest):
storage._cache.clear()
self.assertRaises(POSException.POSKeyError,
storage.load, oid, '')
finally:
cluster.stop()
def testCreationUndoneHistory(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testCreationUndoneHistory(self, cluster):
if 1:
storage = cluster.getZODBStorage()
oid = storage.new_oid()
txn = transaction.Transaction()
......@@ -156,18 +149,15 @@ class Test(NEOThreadedTest):
for x in storage.history(oid, 10):
self.assertEqual((x['tid'], x['size']), expected.pop())
self.assertFalse(expected)
finally:
cluster.stop()
def testUndoConflict(self, conflict_during_store=False):
@with_cluster()
def testUndoConflict(self, cluster, conflict_during_store=False):
def waitResponses(orig, *args):
orig(*args)
p.revert()
ob.value += 3
t.commit()
cluster = NEOCluster()
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounterWithResolution()
t.commit()
......@@ -187,17 +177,14 @@ class Test(NEOThreadedTest):
undo.tpc_finish(txn)
t.begin()
self.assertEqual(ob.value, 3)
finally:
cluster.stop()
@expectedFailure(POSException.ConflictError)
def testUndoConflictDuringStore(self):
self.testUndoConflict(True)
def testStorageDataLock(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testStorageDataLock(self, cluster):
if 1:
storage = cluster.getZODBStorage()
data_info = {}
......@@ -243,10 +230,9 @@ class Test(NEOThreadedTest):
storage.sync()
data_info[key] -= 1
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
finally:
cluster.stop()
def testDelayedUnlockInformation(self):
@with_cluster(storage_count=1)
def testDelayedUnlockInformation(self, cluster):
except_list = []
def onStoreObject(orig, tm, ttid, serial, oid, *args):
if oid == resume_oid and delayUnlockInformation in m2s:
......@@ -256,9 +242,7 @@ class Test(NEOThreadedTest):
except Exception, e:
except_list.append(e.__class__)
raise
cluster = NEOCluster(storage_count=1)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounter()
with cluster.master.filterConnection(cluster.storage) as m2s:
......@@ -270,11 +254,10 @@ class Test(NEOThreadedTest):
ob._p_changed = 1
t.commit()
self.assertNotIn(delayUnlockInformation, m2s)
finally:
cluster.stop()
self.assertEqual(except_list, [DelayedError])
def _testDeadlockAvoidance(self, scenario):
@with_cluster(storage_count=2, replicas=1)
def _testDeadlockAvoidance(self, cluster, scenario):
except_list = []
delay = threading.Event(), threading.Event()
ident = get_ident()
......@@ -306,9 +289,7 @@ class Test(NEOThreadedTest):
delay[c2].clear()
delay[1-c2].set()
cluster = NEOCluster(storage_count=2, replicas=1)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounterWithResolution()
t.commit()
......@@ -331,8 +312,6 @@ class Test(NEOThreadedTest):
t2.begin()
self.assertEqual(o1.value, 3)
self.assertEqual(o2.value, 3)
finally:
cluster.stop()
return except_list
def testDelayedStore(self):
......@@ -354,11 +333,10 @@ class Test(NEOThreadedTest):
self.assertEqual(self._testDeadlockAvoidance([1, 3]),
[DelayedError, ConflictError, "???" ])
def testConflictResolutionTriggered2(self):
@with_cluster()
def testConflictResolutionTriggered2(self, cluster):
""" Check that conflict resolution works """
cluster = NEOCluster()
try:
cluster.start()
if 1:
# create the initial object
t, c = cluster.getTransaction()
c.root()['with_resolution'] = ob = PCounterWithResolution()
......@@ -426,10 +404,9 @@ class Test(NEOThreadedTest):
# check history
self.assertEqual([x['tid'] for x in c1.db().history(oid, size=10)],
[tid3, tid2, tid1, tid0])
finally:
cluster.stop()
def testDelayedLoad(self):
@with_cluster()
def testDelayedLoad(self, cluster):
"""
Check that a storage node delays reads from the database,
when the requested data may still be in a temporary place.
......@@ -441,9 +418,7 @@ class Test(NEOThreadedTest):
orig(*args)
idle.append(cluster.storage.em.isIdle())
l.release()
cluster = NEOCluster()
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
r = c.root()
r[''] = ''
......@@ -462,15 +437,12 @@ class Test(NEOThreadedTest):
load.join()
self.assertEqual(idle, [1, 0])
self.assertIn('', r)
finally:
cluster.stop()
def test_notifyNodeInformation(self):
@with_cluster(replicas=1)
def test_notifyNodeInformation(self, cluster):
# translated from MasterNotificationsHandlerTests
# (neo.tests.client.testMasterHandler)
cluster = NEOCluster(replicas=1)
try:
cluster.start()
if 1:
cluster.db # open DB
s0, s1 = cluster.client.nm.getStorageList()
conn = s0.getConnection()
......@@ -485,27 +457,21 @@ class Test(NEOThreadedTest):
# was called (even if it's useless in this case),
# but we would need an API to do that easily.
self.assertFalse(cluster.client.dispatcher.registered(conn))
finally:
cluster.stop()
def testRestartWithMissingStorage(self):
@with_cluster(replicas=1, partitions=10)
def testRestartWithMissingStorage(self, cluster):
# translated from neo.tests.functional.testStorage.StorageTest
cluster = NEOCluster(replicas=1, partitions=10)
s1, s2 = cluster.storage_list
try:
cluster.start()
if 1:
self.assertEqual([], cluster.getOutdatedCells())
finally:
cluster.stop()
cluster.stop()
# restart it with one storage only
cluster.reset()
try:
if 1:
cluster.start(storage_list=(s1,))
self.assertEqual(NodeStates.UNKNOWN, cluster.getNodeState(s2))
finally:
cluster.stop()
def testRestartStoragesWithReplicas(self):
@with_cluster(storage_count=2, partitions=2, replicas=1)
def testRestartStoragesWithReplicas(self, cluster):
"""
Check that the master must discard its partition table when the
cluster is not operational anymore. Which means that it must go back
......@@ -532,9 +498,7 @@ class Test(NEOThreadedTest):
self.assertNotEqual(getClusterState(), ClusterStates.RUNNING)
s0.resetNode()
s1.resetNode()
cluster = NEOCluster(storage_count=2, partitions=2, replicas=1)
try:
cluster.start()
if 1:
s0, s1 = cluster.storage_list
getClusterState = cluster.neoctl.getClusterState
if 1:
......@@ -557,10 +521,9 @@ class Test(NEOThreadedTest):
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertEqual(cluster.getOutdatedCells(),
[(0, s0.uuid), (1, s0.uuid)])
finally:
cluster.stop()
def testVerificationCommitUnfinishedTransactions(self):
@with_cluster(partitions=2, storage_count=2)
def testVerificationCommitUnfinishedTransactions(self, cluster):
""" Verification step should commit locked transactions """
def onLockTransaction(storage, die=False):
def lock(orig, *args, **kw):
......@@ -569,9 +532,7 @@ class Test(NEOThreadedTest):
orig(*args, **kw)
storage.master_conn.close()
return Patch(storage.tm, lock=lock)
cluster = NEOCluster(partitions=2, storage_count=2)
try:
cluster.start()
if 1:
s0, s1 = cluster.sortStorageList()
t, c = cluster.getTransaction()
r = c.root()
......@@ -612,9 +573,7 @@ class Test(NEOThreadedTest):
di0 = s0.getDataLockInfo()
with onLockTransaction(s1, die=True):
self.commitWithStorageFailure(cluster.client, t)
finally:
cluster.stop()
cluster.reset()
cluster.stop()
(k, v), = set(s0.getDataLockInfo().iteritems()
).difference(di0.iteritems())
self.assertEqual(v, 1)
......@@ -625,7 +584,7 @@ class Test(NEOThreadedTest):
k, = (k for k, v in di1.iteritems() if v == 1)
del di1[k] # x.value = 1
self.assertEqual(di1.values(), [0])
try:
if 1:
cluster.start()
t, c = cluster.getTransaction()
r = c.root()
......@@ -634,19 +593,16 @@ class Test(NEOThreadedTest):
self.assertEqual(r[2], 'ok')
self.assertEqual(di0, s0.getDataLockInfo())
self.assertEqual(di1, s1.getDataLockInfo())
finally:
cluster.stop()
def testVerificationWithNodesWithoutReadableCells(self):
@with_cluster(replicas=1)
def testVerificationWithNodesWithoutReadableCells(self, cluster):
def onLockTransaction(storage, die_after):
def lock(orig, *args, **kw):
if die_after:
orig(*args, **kw)
sys.exit()
return Patch(storage.tm, lock=lock)
cluster = NEOCluster(replicas=1)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()[0] = None
s0, s1 = cluster.storage_list
......@@ -672,10 +628,9 @@ class Test(NEOThreadedTest):
self.assertEqual(sorted(c.root()), [1])
self.tic()
t0, t1 = c.db().storage.iterator()
finally:
cluster.stop()
def testDropUnfinishedData(self):
@with_cluster(partitions=2, storage_count=2, replicas=1)
def testDropUnfinishedData(self, cluster):
def lock(orig, *args, **kw):
orig(*args, **kw)
storage.master_conn.close()
......@@ -684,9 +639,7 @@ class Test(NEOThreadedTest):
r.append(len(orig.__self__.getUnfinishedTIDDict()))
orig()
r.append(len(orig.__self__.getUnfinishedTIDDict()))
cluster = NEOCluster(partitions=2, storage_count=2, replicas=1)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()._p_changed = 1
storage = cluster.storage_list[0]
......@@ -695,13 +648,10 @@ class Test(NEOThreadedTest):
t.commit()
self.tic()
self.assertEqual(r, [1, 0])
finally:
cluster.stop()
def testStorageUpgrade1(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testStorageUpgrade1(self, cluster):
if 1:
storage = cluster.storage
t, c = cluster.getTransaction()
storage.dm.setConfiguration("version", None)
......@@ -718,42 +668,32 @@ class Test(NEOThreadedTest):
with Patch(storage.tm, lock=lambda *_: sys.exit()):
self.commitWithStorageFailure(cluster.client, t)
self.assertRaises(DatabaseFailure, storage.resetNode)
finally:
cluster.stop()
def testStorageReconnectDuringStore(self):
cluster = NEOCluster(replicas=1)
try:
cluster.start()
@with_cluster(replicas=1)
def testStorageReconnectDuringStore(self, cluster):
if 1:
t, c = cluster.getTransaction()
c.root()[0] = 'ok'
cluster.client.cp.closeAll()
t.commit() # store request
finally:
cluster.stop()
def testStorageReconnectDuringTransactionLog(self):
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
@with_cluster(storage_count=2, partitions=2)
def testStorageReconnectDuringTransactionLog(self, cluster):
if 1:
t, c = cluster.getTransaction()
cluster.client.cp.closeAll()
tid, (t1,) = cluster.client.transactionLog(
ZERO_TID, c.db().lastTransaction(), 10)
finally:
cluster.stop()
def testStorageReconnectDuringUndoLog(self):
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
@with_cluster(storage_count=2, partitions=2)
def testStorageReconnectDuringUndoLog(self, cluster):
if 1:
t, c = cluster.getTransaction()
cluster.client.cp.closeAll()
t1, = cluster.client.undoLog(0, 10)
finally:
cluster.stop()
def testDropNodeThenRestartCluster(self):
@with_cluster(storage_count=2, replicas=1)
def testDropNodeThenRestartCluster(self, cluster):
""" Start a cluster with more than one storage, down one, shutdown the
cluster then restart it. The partition table recovered must not include
the dropped node """
......@@ -762,10 +702,8 @@ class Test(NEOThreadedTest):
self.assertEqual(cluster.getNodeState(s2), NodeStates.RUNNING)
# start with two storage / one replica
cluster = NEOCluster(storage_count=2, replicas=1)
s1, s2 = cluster.storage_list
try:
cluster.start()
if 1:
checkNodeState(NodeStates.RUNNING)
self.assertEqual([], cluster.getOutdatedCells())
# drop one
......@@ -775,39 +713,29 @@ class Test(NEOThreadedTest):
checkNodeState(None)
self.assertEqual([], cluster.getOutdatedCells())
# restart with s2 only
finally:
cluster.stop()
cluster.reset()
try:
cluster.stop()
if 1:
cluster.start(storage_list=[s2])
checkNodeState(None)
# then restart it, it must be in pending state
s1.start()
self.tic()
checkNodeState(NodeStates.PENDING)
finally:
cluster.stop()
def test2Clusters(self):
cluster1 = NEOCluster()
cluster2 = NEOCluster()
try:
cluster1.start()
cluster2.start()
@with_cluster()
@with_cluster()
def test2Clusters(self, cluster1, cluster2):
if 1:
t1, c1 = cluster1.getTransaction()
t2, c2 = cluster2.getTransaction()
c1.root()['1'] = c2.root()['2'] = ''
t1.commit()
t2.commit()
finally:
cluster1.stop()
cluster2.stop()
def testAbortStorage(self):
cluster = NEOCluster(partitions=2, storage_count=2)
@with_cluster(partitions=2, storage_count=2)
def testAbortStorage(self, cluster):
storage = cluster.storage_list[0]
try:
cluster.start()
if 1:
# prevent storage to reconnect, in order to easily test
# that cluster becomes non-operational
with Patch(storage, connectToPrimary=sys.exit):
......@@ -821,17 +749,13 @@ class Test(NEOThreadedTest):
self.tic()
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
cluster.stop()
def testShutdown(self):
@with_cluster(master_count=3, partitions=10, replicas=1, storage_count=3)
def testShutdown(self, cluster):
# BUG: Due to bugs in election, master nodes sometimes crash, or they
# declare themselves primary too quickly. The consequence is
# often an endless tic loop.
cluster = NEOCluster(master_count=3, partitions=10,
replicas=1, storage_count=3)
try:
cluster.start()
if 1:
# fill DB a little
t, c = cluster.getTransaction()
c.root()[''] = ''
......@@ -842,9 +766,7 @@ class Test(NEOThreadedTest):
cluster.join(cluster.master_list
+ cluster.storage_list
+ cluster.admin_list)
finally:
cluster.stop()
cluster.reset() # reopen DB to check partition tables
cluster.stop() # stop and reopen DB to check partition tables
dm = cluster.storage_list[0].dm
self.assertEqual(1, dm.getPTID())
pt = list(dm.getPartitionTable())
......@@ -855,14 +777,13 @@ class Test(NEOThreadedTest):
self.assertEqual(s.dm.getPTID(), 1)
self.assertEqual(list(s.dm.getPartitionTable()), pt)
def testInternalInvalidation(self):
@with_cluster()
def testInternalInvalidation(self, cluster):
def _handlePacket(orig, conn, packet, kw={}, handler=None):
if type(packet) is Packets.AnswerTransactionFinished:
ll()
orig(conn, packet, kw, handler)
cluster = NEOCluster()
try:
cluster.start()
if 1:
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x1 = PCounter()
t1.commit()
......@@ -877,18 +798,9 @@ class Test(NEOThreadedTest):
t2.begin()
t.join()
self.assertEqual(x2.value, 1)
finally:
cluster.stop()
def testExternalInvalidation(self):
cluster = NEOCluster()
try:
self._testExternalInvalidation(cluster)
finally:
cluster.stop()
def _testExternalInvalidation(self, cluster):
cluster.start()
@with_cluster()
def testExternalInvalidation(self, cluster):
# Initialize objects
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x1 = PCounter()
......@@ -982,10 +894,9 @@ class Test(NEOThreadedTest):
self.assertFalse(invalidations(c1))
self.assertEqual(x1.value, 1)
def testReadVerifyingStorage(self):
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
@with_cluster(storage_count=2, partitions=2)
def testReadVerifyingStorage(self, cluster):
if 1:
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounter()
t1.commit()
......@@ -1015,13 +926,10 @@ class Test(NEOThreadedTest):
self.assertEqual(map(u64, t1.oid_list), [0, 1])
# Check oid 1 is part of transaction metadata.
self.assertEqual(t2.oid_list, t1.oid_list)
finally:
cluster.stop()
def testClientReconnection(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testClientReconnection(self, cluster):
if 1:
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x1 = PCounter()
c1.root()['y'] = y = PCounter()
......@@ -1051,13 +959,10 @@ class Test(NEOThreadedTest):
t1.begin()
self.assertEqual(x1._p_changed, None)
self.assertEqual(x1.value, 1)
finally:
cluster.stop()
def testInvalidTTID(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testInvalidTTID(self, cluster):
if 1:
client = cluster.client
txn = transaction.Transaction()
client.tpc_begin(txn)
......@@ -1065,16 +970,13 @@ class Test(NEOThreadedTest):
txn_context['ttid'] = add64(txn_context['ttid'], 1)
self.assertRaises(POSException.StorageError,
client.tpc_finish, txn, None)
finally:
cluster.stop()
def testStorageFailureDuringTpcFinish(self):
@with_cluster()
def testStorageFailureDuringTpcFinish(self, cluster):
def answerTransactionFinished(conn, packet):
if isinstance(packet, Packets.AnswerTransactionFinished):
raise StoppedOperation
cluster = NEOCluster()
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()['x'] = PCounter()
with cluster.master.filterConnection(cluster.client) as m2c:
......@@ -1088,10 +990,9 @@ class Test(NEOThreadedTest):
self.assertEqual(1, u64(c.root()['x']._p_oid))
self.assertFalse(cluster.client.new_oid_list)
self.assertEqual(2, u64(cluster.client.new_oid()))
finally:
cluster.stop()
def testClientFailureDuringTpcFinish(self):
@with_cluster()
def testClientFailureDuringTpcFinish(self, cluster):
"""
Third scenario:
......@@ -1126,9 +1027,7 @@ class Test(NEOThreadedTest):
self.tic()
s2m.remove(delayAnswerLockInformation)
return conn
cluster = NEOCluster()
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
r = c.root()
r['x'] = PCounter()
......@@ -1162,32 +1061,26 @@ class Test(NEOThreadedTest):
t.begin()
self.assertEqual(r['x'].value, 2)
self.assertTrue(tid2 < r['x']._p_serial)
finally:
cluster.stop()
def testMasterFailureBeforeVote(self):
@with_cluster(storage_count=2, partitions=2)
def testMasterFailureBeforeVote(self, cluster):
def waitStoreResponses(orig, *args):
result = orig(*args)
m2c, = cluster.master.getConnectionList(orig.__self__)
m2c.close()
self.tic()
return result
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()['x'] = PCounter() # 1 store() to each storage
with Patch(cluster.client, waitStoreResponses=waitStoreResponses):
self.assertRaises(POSException.StorageError, t.commit)
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
cluster.stop()
def testEmptyTransaction(self):
cluster = NEOCluster()
try:
cluster.start()
@with_cluster()
def testEmptyTransaction(self, cluster):
if 1:
txn = transaction.Transaction()
storage = cluster.getZODBStorage()
storage.tpc_begin(txn)
......@@ -1196,16 +1089,13 @@ class Test(NEOThreadedTest):
t, = storage.iterator()
self.assertEqual(t.tid, serial)
self.assertFalse(t.oid_list)
finally:
cluster.stop()
def testRecycledClientUUID(self):
@with_cluster()
def testRecycledClientUUID(self, cluster):
def notReady(orig, *args):
m2s.discard(delayNotifyInformation)
return orig(*args)
cluster = NEOCluster()
try:
cluster.start()
if 1:
cluster.getTransaction()
with cluster.master.filterConnection(cluster.storage) as m2s:
delayNotifyInformation = m2s.delayNotifyNodeInformation()
......@@ -1214,31 +1104,24 @@ class Test(NEOThreadedTest):
client.storage_bootstrap_handler, notReady=notReady):
x = client.load(ZERO_TID)
self.assertNotIn(delayNotifyInformation, m2s)
finally:
cluster.stop()
def testAutostart(self):
def startCluster():
@with_cluster(start_cluster=0, storage_count=3, autostart=3)
def testAutostart(self, cluster):
def startCluster(orig):
getClusterState = cluster.neoctl.getClusterState
self.assertEqual(ClusterStates.RECOVERING, getClusterState())
cluster.storage_list[2].start()
cluster = NEOCluster(storage_count=3, autostart=3)
try:
cluster.startCluster = startCluster
with Patch(cluster, startCluster=startCluster):
cluster.start(cluster.storage_list[:2])
finally:
cluster.stop()
del cluster.startCluster
def testAbortVotedTransaction(self):
@with_cluster(storage_count=2, partitions=2)
def testAbortVotedTransaction(self, cluster):
r = []
def tpc_finish(*args, **kw):
for storage in cluster.storage_list:
r.append(len(storage.dm.getUnfinishedTIDDict()))
raise NEOStorageError
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
c.root()['x'] = PCounter()
with Patch(cluster.client, tpc_finish=tpc_finish):
......@@ -1249,17 +1132,11 @@ class Test(NEOThreadedTest):
self.assertFalse(storage.dm.getUnfinishedTIDDict())
t.begin()
self.assertNotIn('x', c.root())
finally:
cluster.stop()
def testStorageLostDuringRecovery(self):
@with_cluster(storage_count=2, partitions=2)
def testStorageLostDuringRecovery(self, cluster):
# Initialize a cluster.
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
finally:
cluster.stop()
cluster.reset()
cluster.stop()
# Restart with a connection failure for the first AskPartitionTable.
# The master must not be stuck in RECOVERING state
# or re-make the partition table.
......@@ -1268,16 +1145,15 @@ class Test(NEOThreadedTest):
def askPartitionTable(orig, self, conn):
p.revert()
conn.close()
try:
if 1:
with Patch(cluster.master.pt, make=make), \
Patch(InitializationHandler,
askPartitionTable=askPartitionTable) as p:
cluster.start()
self.assertFalse(p.applied)
finally:
cluster.stop()
def testTruncate(self):
@with_cluster(replicas=1)
def testTruncate(self, cluster):
calls = [0, 0]
def dieFirst(i):
def f(orig, *args, **kw):
......@@ -1286,9 +1162,7 @@ class Test(NEOThreadedTest):
sys.exit()
return orig(*args, **kw)
return f
cluster = NEOCluster(replicas=1)
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
r = c.root()
tids = []
......@@ -1332,8 +1206,6 @@ class Test(NEOThreadedTest):
self.assertEqual(1, u64(c._storage.new_oid()))
for s in cluster.storage_list:
self.assertEqual(s.dm.getLastIDs()[0], truncate_tid)
finally:
cluster.stop()
def testConnectionTimeout(self):
conn = self.getLoopbackConnection()
......@@ -1348,14 +1220,13 @@ class Test(NEOThreadedTest):
conn.em.poll(1)
self.assertFalse(conn.isClosed())
def testClientDisconnectedFromMaster(self):
@with_cluster()
def testClientDisconnectedFromMaster(self, cluster):
def disconnect(conn, packet):
if isinstance(packet, Packets.AskObject):
m2c.close()
#return True
cluster = NEOCluster()
try:
cluster.start()
if 1:
t, c = cluster.getTransaction()
m2c, = cluster.master.getConnectionList(cluster.client)
cluster.client._cache.clear()
......@@ -1383,10 +1254,9 @@ class Test(NEOThreadedTest):
self.assertNotEqual(uuid, cluster.client.uuid)
# Second reconnection, for a successful load.
c.root
finally:
cluster.stop()
def testIdTimestamp(self):
@with_cluster()
def testIdTimestamp(self, cluster):
"""
Given a master M, a storage S, and 2 clients Ca and Cb.
......@@ -1412,9 +1282,7 @@ class Test(NEOThreadedTest):
ll()
def connectToStorage(client):
next(client.cp.iterateForObject(0))
cluster = NEOCluster()
try:
cluster.start()
if 1:
Ca = cluster.client
Ca.pt # only connect to the master
# In a separate thread, connect to the storage but suspend the
......@@ -1432,13 +1300,10 @@ class Test(NEOThreadedTest):
self.assertRaises(NEOPrimaryMasterLost, t.join)
self.assertTrue(s2c.isClosed())
connectToStorage(Cb)
finally:
cluster.stop()
def testPruneOrphan(self):
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
@with_cluster(storage_count=2, partitions=2)
def testPruneOrphan(self, cluster):
if 1:
cluster.importZODB()(3)
bad = []
ok = []
......@@ -1466,10 +1331,9 @@ class Test(NEOThreadedTest):
check(1, bad)
check(0, ok)
check(1, ok)
finally:
cluster.stop()
def testLateConflictOnReplica(self):
@with_cluster(replicas=1)
def testLateConflictOnReplica(self, cluster):
"""
Already resolved conflict: check the case of a storage node that
reports a conflict after that this conflict was fully resolved with
......@@ -1480,9 +1344,7 @@ class Test(NEOThreadedTest):
p.revert()
ll()
orig(conn, conflicting, *args)
cluster = NEOCluster(replicas=1)
try:
cluster.start()
if 1:
s0, s1 = cluster.storage_list
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounterWithResolution()
......@@ -1498,8 +1360,6 @@ class Test(NEOThreadedTest):
t = self.newThread(t1.commit)
ll()
t.join()
finally:
cluster.stop()
if __name__ == "__main__":
unittest.main()
......@@ -172,8 +172,7 @@ class ImporterTests(NEOThreadedTest):
c.db().close()
#del importer[0][1][importer.pop()[0]]
# Start NEO cluster with transparent import of a multi-base ZODB.
cluster = NEOCluster(compress=False, importer=importer)
try:
with NEOCluster(compress=False, importer=importer) as cluster:
# Suspend import for a while, so that import
# is finished in the middle of the below 'for' loop.
# Use a slightly different main loop for storage so that it
......@@ -220,8 +219,6 @@ class ImporterTests(NEOThreadedTest):
(x[i:] or '.', sorted(y), sorted(z))
for x, y, z in os.walk(src_root)))
t.commit()
finally:
cluster.stop()
if __name__ == "__main__":
......
......@@ -34,25 +34,21 @@ from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64
from .. import expectedFailure, Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
predictable_random, with_cluster
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
    def decorator(wrapped):
        def wrapper(self):
            upstream = NEOCluster(partitions, **upstream_kw)
            try:
            with NEOCluster(partitions, **upstream_kw) as upstream:
                upstream.start()
                backup = NEOCluster(partitions, upstream=upstream, **backup_kw)
                try:
                with NEOCluster(partitions, upstream=upstream,
                                **backup_kw) as backup:
                    backup.start()
                    backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
                    self.tic()
                    wrapped(self, backup)
                finally:
                    backup.stop()
            finally:
                upstream.stop()
        return wraps(wrapped)(wrapper)
    return decorator
......@@ -90,14 +86,17 @@ class ReplicationTests(NEOThreadedTest):
np = 7
nr = 2
check_dict = dict.fromkeys(xrange(np))
upstream = NEOCluster(partitions=np, replicas=nr-1, storage_count=3)
try:
with NEOCluster(partitions=np, replicas=nr-1, storage_count=3
) as upstream:
upstream.start()
importZODB = upstream.importZODB()
importZODB(3)
backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
upstream=upstream)
try:
def delaySecondary(conn, packet):
if isinstance(packet, Packets.Replicate):
tid, upstream_name, source_dict = packet.decode()
return not upstream_name and all(source_dict.itervalues())
with NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
upstream=upstream) as backup:
backup.start()
# Initialize & catch up.
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
......@@ -107,11 +106,9 @@ class ReplicationTests(NEOThreadedTest):
importZODB(17)
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup))
# Check that a backup cluster can be restarted.
finally:
# Check that a backup cluster can be restarted.
backup.stop()
backup.reset()
try:
backup.start()
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.BACKINGUP)
......@@ -126,14 +123,8 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(np*nr, self.checkBackup(backup))
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
backup.stop()
def delaySecondary(conn, packet):
if isinstance(packet, Packets.Replicate):
tid, upstream_name, source_dict = packet.decode()
return not upstream_name and all(source_dict.itervalues())
backup.reset()
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
......@@ -147,10 +138,8 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
finally:
backup.stop()
backup.reset()
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
......@@ -164,10 +153,6 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
finally:
backup.stop()
finally:
upstream.stop()
@predictable_random()
def testBackupNodeLost(self):
......@@ -193,16 +178,14 @@ class ReplicationTests(NEOThreadedTest):
node_list.remove(txn.getNode())
node_list[0].getConnection().close()
return orig(txn)
upstream = NEOCluster(partitions=np, replicas=0, storage_count=1)
try:
with NEOCluster(partitions=np, replicas=0, storage_count=1) as upstream:
upstream.start()
importZODB = upstream.importZODB(random=random)
# Do not start with an empty DB so that 'primary_dict' below is not
# empty on the first iteration.
importZODB(1)
backup = NEOCluster(partitions=np, replicas=2, storage_count=4,
upstream=upstream)
try:
with NEOCluster(partitions=np, replicas=2, storage_count=4,
upstream=upstream) as backup:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
......@@ -232,10 +215,6 @@ class ReplicationTests(NEOThreadedTest):
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic()
self.assertEqual(np*3, self.checkBackup(backup))
finally:
backup.stop()
finally:
upstream.stop()
@backup_test()
def testBackupUpstreamMasterDead(self, backup):
......@@ -305,16 +284,13 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
self.assertEqual(1, self.checkBackup(backup))
def testBackupEarlyInvalidation(self):
@with_cluster()
def testBackupEarlyInvalidation(self, upstream):
"""
The backup master must ignore notification before being fully
The backup master must ignore notifications before being fully
initialized.
"""
upstream = NEOCluster()
try:
upstream.start()
backup = NEOCluster(upstream=upstream)
try:
with NEOCluster(upstream=upstream) as backup:
backup.start()
with ConnectionFilter() as f:
f.delayAskPartitionTable(lambda conn:
......@@ -323,11 +299,7 @@ class ReplicationTests(NEOThreadedTest):
upstream.importZODB()(1)
self.tic()
self.tic()
self.assertTrue(backup.master.isAlive())
finally:
backup.stop()
finally:
upstream.stop()
self.assertTrue(backup.master.is_alive())
@backup_test()
def testBackupTid(self, backup):
......@@ -343,7 +315,6 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(last_tid, backup.backup_tid)
backup.stop()
importZODB(1)
backup.reset()
with ConnectionFilter() as f:
f.delayAskFetchTransactions()
backup.start()
......@@ -351,7 +322,8 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
self.assertEqual(1, self.checkBackup(backup))
def testSafeTweak(self):
@with_cluster(start_cluster=0, partitions=3, replicas=1, storage_count=3)
def testSafeTweak(self, cluster):
"""
Check that tweak always tries to keep a minimum of (replicas + 1)
readable cells, otherwise we have less/no redundancy as long as
......@@ -360,9 +332,8 @@ class ReplicationTests(NEOThreadedTest):
def changePartitionTable(orig, *args):
orig(*args)
sys.exit()
cluster = NEOCluster(partitions=3, replicas=1, storage_count=3)
s0, s1, s2 = cluster.storage_list
try:
if 1:
cluster.start([s0, s1])
s2.start()
self.tic()
......@@ -375,10 +346,9 @@ class ReplicationTests(NEOThreadedTest):
self.tic()
expectedFailure(self.assertEqual)(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
cluster.stop()
def testReplicationAbortedBySource(self):
@with_cluster(start_cluster=0, partitions=3, replicas=1, storage_count=3)
def testReplicationAbortedBySource(self, cluster):
"""
Check that a feeding node aborts replication when its partition is
dropped, and that the out-of-date node finishes to replicate from
......@@ -403,11 +373,12 @@ class ReplicationTests(NEOThreadedTest):
# default for performance reason
orig.im_self.dropPartitions((offset,))
return orig(ptid, cell_list)
np = 3
cluster = NEOCluster(partitions=np, replicas=1, storage_count=3)
np = cluster.num_partitions
s0, s1, s2 = cluster.storage_list
for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
try:
if cluster.started:
cluster.stop(1)
if 1:
cluster.start([s0])
cluster.populate([range(np*2)] * np)
s1.start()
......@@ -425,20 +396,17 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(1, connection_filter.filtered_count)
self.tic()
self.checkPartitionReplicated(s1, s2, offset)
finally:
cluster.stop()
cluster.reset(True)
def testClientReadingDuringTweak(self):
@with_cluster(start_cluster=0, partitions=2, storage_count=2)
def testClientReadingDuringTweak(self, cluster):
# XXX: Currently, the test passes because data of dropped cells are not
# deleted while the cluster is operational: this is only done
# during the RECOVERING phase. But we'll want to be able to free
# disk space without service interruption, and for this the client
# may have to retry reading data from the new cells. If s0 deleted
# all data for partition 1, the test would fail with a POSKeyError.
cluster = NEOCluster(partitions=2, storage_count=2)
s0, s1 = cluster.storage_list
try:
if 1:
cluster.start([s0])
storage = cluster.getZODBStorage()
oid = p64(1)
......@@ -455,12 +423,10 @@ class ReplicationTests(NEOThreadedTest):
m2c.delayNotifyPartitionChanges()
self.tic()
self.assertEqual('foo', storage.load(oid)[0])
finally:
cluster.stop()
def testResumingReplication(self):
cluster = NEOCluster(replicas=1)
try:
@with_cluster(start_cluster=0, replicas=1)
def testResumingReplication(self, cluster):
if 1:
s0, s1 = cluster.storage_list
cluster.start(storage_list=(s0,))
t, c = cluster.getTransaction()
......@@ -485,12 +451,10 @@ class ReplicationTests(NEOThreadedTest):
s0.stop()
cluster.join((s0,))
t0, t1, t2 = c.db().storage.iterator()
finally:
cluster.stop()
def testReplicationBlockedByUnfinished(self):
cluster = NEOCluster(replicas=1)
try:
@with_cluster(start_cluster=0, replicas=1)
def testReplicationBlockedByUnfinished(self, cluster):
if 1:
s0, s1 = cluster.storage_list
cluster.start(storage_list=(s0,))
storage = cluster.getZODBStorage()
......@@ -521,10 +485,9 @@ class ReplicationTests(NEOThreadedTest):
self.assertPartitionTable(cluster, expected)
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RUNNING)
finally:
cluster.stop()
def testCheckReplicas(self):
@with_cluster(partitions=5, replicas=2, storage_count=3)
def testCheckReplicas(self, cluster):
from neo.storage import checker
def corrupt(offset):
s0, s1, s2 = (storage_dict[cell.getUUID()]
......@@ -539,14 +502,11 @@ class ReplicationTests(NEOThreadedTest):
for cell in row[1]
if cell[1] == CellStates.CORRUPTED]))
self.assertEqual(expected_state, cluster.neoctl.getClusterState())
np = 5
np = cluster.num_partitions
tid_count = np * 3
corrupt_tid = tid_count // 2
check_dict = dict.fromkeys(xrange(np))
cluster = NEOCluster(partitions=np, replicas=2, storage_count=3)
try:
checker.CHECK_COUNT = 2
cluster.start()
with Patch(checker, CHECK_COUNT=2):
cluster.populate([range(np*2)] * tid_count)
storage_dict = {x.uuid: x for x in cluster.storage_list}
cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
......@@ -566,9 +526,6 @@ class ReplicationTests(NEOThreadedTest):
cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic()
check(ClusterStates.RECOVERING, 4)
finally:
checker.CHECK_COUNT = CHECK_COUNT
cluster.stop()
@backup_test()
def testBackupReadOnlyAccess(self, backup):
......
......@@ -44,7 +44,10 @@ class ZODBTestCase(TestCase):
    def _tearDown(self, success):
        self._storage.cleanup()
        try:
            self.neo.stop()
            if functional:
                self.neo.stop()
            else:
                self.neo.stop(None)
        except Exception:
            if success:
                raise
......