Commit c4ac45a8 authored by Julien Muchembled's avatar Julien Muchembled

Fix several random failures in tests that didn't wait for transaction to be unlocked

NEOCluster.tic() gets a new 'slave' parameter that must be True when a client
node is in 'master' mode (i.e. setPoll(True)). In this case, tic() will wait
that all nodes finish their work and the client polls with a non-zero timeout.

Here, tic(slave=1) is used to wait for the storage to process
NotifyUnlockInformation notification from the master.

Traceback (most recent call last):
  File "neo/tests/threaded/test.py", line 80, in testBasicStore
    self.assertEqual(data_info, cluster.storage.getDataLockInfo())
  File "neo/tests/__init__.py", line 170, in assertEqual
    return super(NeoTestBase, self).assertEqual(first, second, msg=msg)
failureException: {('\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', 0): 0} != {('\x0b\xee\xc7\xb5\xea?\x0f\xdb\xc9]\r\xd4\x7f<[\xc2u\xda\x8a3', 0): 1}
parent 5dc1f06c
......@@ -72,6 +72,7 @@ class Serialized(object):
cls._lock_list = deque() # FIFO of Semaphore
cls._lock_lock = threading.Lock()
cls._pdb = False
cls.blocking = threading.Event()
cls.pending = 0
@classmethod
......@@ -172,9 +173,16 @@ class SerializedEventManager(EventManager):
Serialized.tic(self._lock)
if blocking != 0:
blocking = self._blocking
if blocking != 0 and Serialized.pending == 1:
Serialized.pending = blocking = 0
EventManager._poll(self, blocking)
if blocking != 0:
if Serialized.pending == 1:
Serialized.pending = blocking = 0
else:
Serialized.blocking.set()
try:
EventManager._poll(self, blocking)
finally:
if blocking:
Serialized.blocking.clear()
def addReader(self, conn):
EventManager.addReader(self, conn)
......@@ -716,13 +724,14 @@ class NEOCluster(object):
self._unpatch()
@staticmethod
def tic(force=False):
# XXX: Should we automatically switch client in slave mode if it isn't ?
def tic(force=False, slave=False):
f = sys._getframe(1)
try:
logging.info('tic (%s:%u) ...', f.f_code.co_filename, f.f_lineno)
finally:
del f
if slave:
return Serialized.blocking.wait()
if force:
Serialized.tic()
logging.info('forced tic')
......
......@@ -77,6 +77,7 @@ class Test(NEOThreadedTest):
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
serial = storage.tpc_finish(txn)
data_info[key] = 0
cluster.tic(slave=1)
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
self.assertEqual((data, serial), storage.load(oid, ''))
storage._cache.clear()
......@@ -183,6 +184,7 @@ class Test(NEOThreadedTest):
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
tid1 = storage.tpc_finish(txn[2])
cluster.tic(slave=1)
data_info[key] -= 1
self.assertEqual(data_info, cluster.storage.getDataLockInfo())
......@@ -401,6 +403,7 @@ class Test(NEOThreadedTest):
t, c = cluster.getTransaction()
c.root()[0] = 'ok'
t.commit()
cluster.tic(slave=1)
data_info = cluster.storage.getDataLockInfo()
self.assertEqual(data_info.values(), [0, 0])
# (obj|trans) become t(obj|trans)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment