Commit f2335127 authored by Kirill Smelkov's avatar Kirill Smelkov

racetest: Review

- Factor common logic to spawn and run group of test threads into
  TestGroup. This way the extra checks and robustness improvements, that
  Dieter just added to check_race_external_invalidate_vs_disconnect,
  become available to all tests in racetest module.

  Rework moved code so that nwork is not fixed beforehand and test threads
  can be added dynamically.

- fix waiting logic in Finished:
  * on py2 Condition.wait does not return True/False as it does on py3 -
    we need to manually inspect the state.
  * fix race for when wait is called with already met condition:
    previously in such case it was waiting indefinitely and reporting
    failure on timeout

- rename Finished to well-established concept of WaitGroup and adjust
  its interface accordingly (see
  https://pkg.go.dev/sync#WaitGroup,
  https://lab.nexedi.com/nexedi/pygolang/blob/master/golang/sync.py and
  https://lab.nexedi.com/nexedi/pygolang/blob/39dde7eb/golang/sync.cpp#L153-200)

- no need to wrap try/except with additional try/finally, as
  try/except/finally works out of the box.
parent 017928c0
...@@ -160,10 +160,7 @@ class RaceTests(object): ...@@ -160,10 +160,7 @@ class RaceTests(object):
# Access to half of the objects is organized to always trigger loading # Access to half of the objects is organized to always trigger loading
# from zstor. Access to the other half goes through zconn cache and so # from zstor. Access to the other half goes through zconn cache and so
# verifies whether the cache is not stale. # verifies whether the cache is not stale.
failed = threading.Event() def verify(tg):
failure = [None]
def verify():
transaction.begin() transaction.begin()
zconn = db.open() zconn = db.open()
root = zconn.root() root = zconn.root()
...@@ -177,8 +174,7 @@ class RaceTests(object): ...@@ -177,8 +174,7 @@ class RaceTests(object):
except AssertionError as e: except AssertionError as e:
msg = "verify: %s\n" % e msg = "verify: %s\n" % e
msg += _state_details(root) msg += _state_details(root)
failure[0] = msg tg.fail(msg)
failed.set()
# we did not changed anything; also fails with commit: # we did not changed anything; also fails with commit:
transaction.abort() transaction.abort()
...@@ -187,7 +183,7 @@ class RaceTests(object): ...@@ -187,7 +183,7 @@ class RaceTests(object):
# `modify` changes objects in the database by executing "next" step. # `modify` changes objects in the database by executing "next" step.
# #
# Spec invariant should be preserved. # Spec invariant should be preserved.
def modify(): def modify(tg):
transaction.begin() transaction.begin()
zconn = db.open() zconn = db.open()
...@@ -200,32 +196,21 @@ class RaceTests(object): ...@@ -200,32 +196,21 @@ class RaceTests(object):
# `xrun` runs f in a loop until either N iterations, or until failed is # `xrun` runs f in a loop until either N iterations, or until failed is
# set. # set.
def xrun(f, N): def xrun(tg, tx, f, N):
try:
for i in range(N): for i in range(N):
# print('%s.%d' % (f.__name__, i)) # print('%s.%d' % (f.__name__, i))
f() f(tg)
if failed.is_set(): if tg.failed.is_set():
break break
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
# loop verify and modify concurrently. # loop verify and modify concurrently.
init() init()
N = 500 N = 500
tverify = Daemon( tg = TestGroup(self)
name='Tverify', target=xrun, args=(verify, N)) tg.go(xrun, verify, N, name='Tverify')
tmodify = Daemon( tg.go(xrun, modify, N, name='Tmodify')
name='Tmodify', target=xrun, args=(modify, N)) tg.wait(60)
tverify.start()
tmodify.start()
tverify.join(60)
tmodify.join(60)
if failed.is_set():
self.fail(failure[0])
# client-server storages like ZEO, NEO and RelStorage allow several storage # client-server storages like ZEO, NEO and RelStorage allow several storage
# clients to be connected to single storage server. # clients to be connected to single storage server.
...@@ -286,10 +271,7 @@ class RaceTests(object): ...@@ -286,10 +271,7 @@ class RaceTests(object):
# #
# Once in a while T tries to modify the database executing spec "next" # Once in a while T tries to modify the database executing spec "next"
# as test source of changes for other workers. # as test source of changes for other workers.
failed = threading.Event() def T(tg, tx, N):
failure = [None] * nwork # [tx] is failure from T(tx)
def T(tx, N):
db = self.dbopen() db = self.dbopen()
def t_(): def t_():
...@@ -306,8 +288,7 @@ class RaceTests(object): ...@@ -306,8 +288,7 @@ class RaceTests(object):
except AssertionError as e: except AssertionError as e:
msg = "T%s: %s\n" % (tx, e) msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root) msg += _state_details(root)
failure[tx] = msg tg.fail(msg)
failed.set()
# change objects once in a while # change objects once in a while
if randint(0, 4) == 0: if randint(0, 4) == 0:
...@@ -327,11 +308,8 @@ class RaceTests(object): ...@@ -327,11 +308,8 @@ class RaceTests(object):
for i in range(N): for i in range(N):
# print('T%s.%d' % (tx, i)) # print('T%s.%d' % (tx, i))
t_() t_()
if failed.is_set(): if tg.failed.is_set():
break break
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
finally: finally:
db.close() db.close()
...@@ -339,17 +317,10 @@ class RaceTests(object): ...@@ -339,17 +317,10 @@ class RaceTests(object):
init() init()
N = 100 N = 100
tg = [] tg = TestGroup(self)
for x in range(nwork): for _ in range(nwork):
t = Daemon(name='T%d' % x, target=T, args=(x, N)) tg.go(T, N)
t.start() tg.wait(60)
tg.append(t)
for t in tg:
t.join(60)
if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
# verify storage for race in between client disconnect and external # verify storage for race in between client disconnect and external
# invalidations. https://github.com/zopefoundation/ZEO/issues/209 # invalidations. https://github.com/zopefoundation/ZEO/issues/209
...@@ -382,11 +353,7 @@ class RaceTests(object): ...@@ -382,11 +353,7 @@ class RaceTests(object):
# `T` is similar to the T from _check_race_load_vs_external_invalidate # `T` is similar to the T from _check_race_load_vs_external_invalidate
# but reconnects to the database often. # but reconnects to the database often.
failed = threading.Event() def T(tg, tx, N):
failure = [None] * nwork # [tx] is failure from T(tx)
finished = Finished(nwork)
def T(tx, N):
def t_(): def t_():
def work1(db): def work1(db):
transaction.begin() transaction.begin()
...@@ -402,8 +369,7 @@ class RaceTests(object): ...@@ -402,8 +369,7 @@ class RaceTests(object):
except AssertionError as e: except AssertionError as e:
msg = "T%s: %s\n" % (tx, e) msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root) msg += _state_details(root)
failure[tx] = msg tg.fail(msg)
failed.set()
zconn.close() zconn.close()
transaction.abort() transaction.abort()
...@@ -426,53 +392,26 @@ class RaceTests(object): ...@@ -426,53 +392,26 @@ class RaceTests(object):
db = self.dbopen() db = self.dbopen()
try: try:
for i in range(4): for i in range(4):
if failed.is_set(): if tg.failed.is_set():
break break
work1(db) work1(db)
finally: finally:
db.close() db.close()
try:
try:
for i in range(N): for i in range(N):
# print('T%s.%d' % (tx, i)) # print('T%s.%d' % (tx, i))
if failed.is_set(): if tg.failed.is_set():
break break
t_() t_()
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
finally:
finished()
# run the workers concurrently. # run the workers concurrently.
init() init()
N = 100 // (2*4) # N reduced to save time N = 100 // (2*4) # N reduced to save time
tg = [] tg = TestGroup(self)
for x in range(nwork): for _ in range(nwork):
t = Daemon(name='T%d' % x, target=T, args=(x, N)) tg.go(T, N)
t.start() tg.wait(60)
tg.append(t)
time_to_finish = 60 # seconds
if not finished.wait(time_to_finish):
failed.set()
failure.append("test did not finish within %s seconds"
% time_to_finish)
failed_to_finish = []
for t in tg:
try:
t.join(1)
except AssertionError:
failed.set()
failed_to_finish.append(t.name)
if failed_to_finish:
failure.append("threads did not finish: %s" % failed_to_finish)
if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
# `_state_init` initializes the database according to the spec. # `_state_init` initializes the database according to the spec.
...@@ -544,6 +483,68 @@ def _state_details(root): # -> txt ...@@ -544,6 +483,68 @@ def _state_details(root): # -> txt
return txt return txt
class TestGroup(object):
"""TestGroup represents group of threads that run together to verify
somthing.
- .go() adds test thread to the group.
- .wait() waits for all spawned threads to finish and reports all
collected failures to containing testcase.
- a test should indicate failure by call to .fail()
"""
def __init__(tg, testcase):
tg.testcase = testcase
tg.failed = threading.Event()
tg.fail_mu = threading.Lock()
tg.failv = [None] # failures registerd by .fail
tg.threadv = [] # spawned threads
tg.waitg = WaitGroup() # to wait for spawned threads
def fail(tg, msg):
"""fail adds failure to test result."""
with tg.fail_mu:
tg.failv.append(msg)
tg.failed.set()
def go(tg, f, *argv, **kw):
"""go spawns f(tg, #thread, *argv, **kw) in new test thread."""
tg.waitg.add(1)
tx = len(tg.threadv)
tname = kw.pop('name', 'T%d' % tx)
t = Daemon(name=tname, target=tg._run, args=(f, tx, argv, kw))
t.start()
tg.threadv.append(t)
def _run(tg, f, tx, argv, kw):
try:
f(tg, tx, *argv, **kw)
except Exception as e:
tg.fail("Unhandled exception %r" % (e,))
raise
finally:
tg.waitg.done()
def wait(tg, timeout):
"""wait waits for all test threads to complete and reports all
collected failures to containing testcase."""
if not tg.waitg.wait(timeout):
tg.fail("test did not finish within %s seconds" % timeout)
failed_to_finish = []
for t in tg.threadv:
try:
t.join(1)
except AssertionError:
tg.failed.set()
failed_to_finish.append(t.name)
if failed_to_finish:
tg.fail("threads did not finish: %s" % failed_to_finish)
if tg.failed.is_set():
tg.testcase.fail('\n\n'.join([_ for _ in tg.failv if _]))
class Daemon(threading.Thread): class Daemon(threading.Thread):
"""auxiliary class to create daemon threads and fail if not stopped. """auxiliary class to create daemon threads and fail if not stopped.
...@@ -597,20 +598,33 @@ class Daemon(threading.Thread): ...@@ -597,20 +598,33 @@ class Daemon(threading.Thread):
exc_lock = threading.Lock() exc_lock = threading.Lock()
class Finished(object): class WaitGroup(object):
"""Auxiliary class to wait for n threads to finish.""" """WaitGroup provides service to wait for spawned workers to be done.
def __init__(self, no_threads):
"""initialize to wait for *no_threads* to finish.""" - .add() adds workers
self.no_threads = no_threads - .done() indicates that one worker is done
- .wait() waits until all workers are done
"""
def __init__(self):
self.n = 0
self.condition = threading.Condition() self.condition = threading.Condition()
def __call__(self): def add(self, delta):
"""report that one thread finished."""
with self.condition: with self.condition:
self.no_threads -= 1 self.n += delta
if self.no_threads <= 0: if self.n < 0:
raise AssertionError("#workers is negative")
if self.n == 0:
self.condition.notify() self.condition.notify()
def wait(self, timeout): def done(self):
self.add(-1)
def wait(self, timeout): # -> ok
with self.condition: with self.condition:
return self.condition.wait(timeout) if self.n == 0:
return True
ok = self.condition.wait(timeout)
if ok is None: # py2
ok = (self.n == 0)
return ok
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment