Commit ebabc92e authored by Julien Muchembled's avatar Julien Muchembled

Add threaded test decorator to help writing backup tests

parent 00b76f60
......@@ -20,6 +20,7 @@ import time
import threading
import transaction
import unittest
from functools import wraps
from neo.lib import logging
from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator
......@@ -34,6 +35,26 @@ from . import ConnectionFilter, NEOCluster, NEOThreadedTest, Patch, \
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped):
def wrapper(self):
upstream = NEOCluster(partitions, **upstream_kw)
try:
upstream.start()
backup = NEOCluster(partitions, upstream=upstream, **backup_kw)
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
wrapped(self, backup)
finally:
backup.stop()
finally:
upstream.stop()
return wraps(wrapped)(wrapper)
return decorator
class ReplicationTests(NEOThreadedTest):
def checksumPartition(self, storage, partition, max_tid=MAX_TID):
......@@ -220,7 +241,8 @@ class ReplicationTests(NEOThreadedTest):
finally:
upstream.stop()
def testBackupUpstreamMasterDead(self):
@backup_test()
def testBackupUpstreamMasterDead(self, backup):
"""Check proper behaviour when upstream master is unreachable
More generally, this checks that when a handler raises when a connection
......@@ -228,32 +250,19 @@ class ReplicationTests(NEOThreadedTest):
be, for example, closed again after the exception is catched, without
assertion failure.
"""
upstream = NEOCluster()
try:
upstream.start()
importZODB = upstream.importZODB()
backup = NEOCluster(upstream=upstream)
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
backup.tic()
conn, = backup.master.getConnectionList(upstream.master)
# trigger ping
conn.updateTimeout(1)
self.assertFalse(conn.isPending())
conn.checkTimeout(time.time())
self.assertTrue(conn.isPending())
# force ping to have expired
conn.updateTimeout(1)
# connection will be closed before upstream master has time
# to answer
backup.tic(force=1)
new_conn, = backup.master.getConnectionList(upstream.master)
self.assertFalse(new_conn is conn)
finally:
backup.stop()
finally:
upstream.stop()
conn, = backup.master.getConnectionList(backup.upstream.master)
# trigger ping
conn.updateTimeout(1)
self.assertFalse(conn.isPending())
conn.checkTimeout(time.time())
self.assertTrue(conn.isPending())
# force ping to have expired
conn.updateTimeout(1)
# connection will be closed before upstream master has time
# to answer
backup.tic(force=1)
new_conn, = backup.master.getConnectionList(backup.upstream.master)
self.assertFalse(new_conn is conn)
def testReplicationAbortedBySource(self):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment