Commit 7ecb8f14 authored by Kirill Smelkov's avatar Kirill Smelkov

X *: tests: don't hang on exception in non-main thread

Previously if an assert or something failed in spawned thread, the main
thread was usually spinning indefinitely = tests hang. -> Switch all
threading places to use sync.WorkGroup and this way if a thread fails,
all other threads are canceled and the exception is reported back to
wg.wait in main thread.
parent a4bfea98
# Wendeling.core.bigarray | Tests for ZBigArray
# Copyright (C) 2014-2019 Nexedi SA and Contributors.
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -28,8 +28,8 @@ import transaction
from transaction import TransactionManager
from ZODB.POSException import ConflictError
from numpy import dtype, uint8, all, array_equal, arange
from golang import defer, func
from threading import Thread
from golang import defer, func, chan
from golang import context, sync
from six.moves import _thread
from pytest import raises
......@@ -255,7 +255,9 @@ def test_zbigarray_vs_conn_migration():
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
T11ident = [None] # [0] = gettid(T11)
def T11(ctx):
T11ident[0] = _thread.get_ident()
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
......@@ -273,8 +275,8 @@ def test_zbigarray_vs_conn_migration():
# close conn, wait till T21 reopens it
del a11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
tell(ctx, 'T1-conn11_1-closed')
wait(ctx, 'T2-conn21-opened')
# open nother connection. it must be different
# (see appropriate place in zfile test about why)
......@@ -282,28 +284,31 @@ def test_zbigarray_vs_conn_migration():
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zarray2-modified')
wait(ctx, 'T2-zarray2-modified')
transaction.commit() # should be nothing
tell('T1-txn12-committed')
tell(ctx, 'T1-txn12-committed')
wait('T2-conn21-closed')
wait(ctx, 'T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
wait(ctx, 'T11-exit-command')
# open, modify, abort
def T21():
T21done = chan()
@func
def T21(ctx):
defer(T21done.close)
tell, wait = c21_1.tell, c12_1.wait
# wait until T1 finish setting up initial data and get its connection
# (see appropriate place in zfile tests for details)
wait('T1-conn11_1-closed')
wait(ctx, 'T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
tell(ctx, 'T2-conn21-opened')
# modify zarray and arrange timings so that T1 commits after zarray is
# modified, but before we commit/abort.
......@@ -312,21 +317,21 @@ def test_zbigarray_vs_conn_migration():
a21[0:1] = [21] # XXX -> [0] = 21 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn12-committed')
tell(ctx, 'T2-zarray2-modified')
wait(ctx, 'T1-txn12-committed')
# abort - zarray2 should stay unchanged
transaction.abort()
del a21, root21
conn21.close()
tell('T2-conn21-closed')
tell(ctx, 'T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
wg = sync.WorkGroup(context.background())
wg.go(T11)
wg.go(T21)
T21done.recv() # NOTE not joining t11 yet
# now verify that zarray2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
......@@ -346,62 +351,69 @@ def test_zbigarray_vs_conn_migration():
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
T12done = chan()
@func
def T12():
def T12(ctx):
defer(T12done.close)
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
wait(ctx, 'T2-conn22-opened')
conn12 = db.open()
defer(conn12.close)
tell('T1-conn12-opened')
wait('T2-zarray2-modified')
tell(ctx, 'T1-conn12-opened')
wait(ctx, 'T2-zarray2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
tell(ctx, 'T1-txn-aborted')
wait(ctx, 'T2-txn-committed')
# open, modify, commit
T22done = chan()
@func
def T22():
def T22(ctx):
defer(T22done.close)
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
assert _thread.get_ident() != T11ident[0]
conn22 = db.open()
defer(conn22.close)
assert conn22 is conn01
tell('T2-conn22-opened')
tell(ctx, 'T2-conn22-opened')
# modify zarray and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
wait(ctx, 'T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
a22[0:1] = [22] # XXX -> [0] = 22 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn-aborted')
tell(ctx, 'T2-zarray2-modified')
wait(ctx, 'T1-txn-aborted')
# commit - changes should propagate to zarray
transaction.commit()
tell('T2-txn-committed')
tell(ctx, 'T2-txn-committed')
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
wg.go(T12)
wg.go(T22)
T12done.recv()
T22done.recv()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
def _(ctx):
c21_1.tell(ctx, 'T11-exit-command')
wg.go(_)
wg.wait()
# now verify that zarray2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
......
......@@ -28,8 +28,8 @@ import transaction
from transaction import TransactionManager
from ZODB.POSException import ConflictError
from numpy import ndarray, array_equal, uint32, zeros, arange
from golang import defer, func
from threading import Thread
from golang import defer, func, chan
from golang import context, sync
from six.moves import _thread
from six import b
import struct
......@@ -229,7 +229,9 @@ def test_bigfile_filezodb_vs_conn_migration():
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
T11ident = [None] # [0] = gettid(T11)
def T11(ctx):
T11ident[0] = _thread.get_ident()
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
......@@ -253,8 +255,8 @@ def test_bigfile_filezodb_vs_conn_migration():
# close conn, wait till T21 reopens it
del vma11, fh11, a11, f11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
tell(ctx, 'T1-conn11_1-closed')
wait(ctx, 'T2-conn21-opened')
# open another connection (e.g. for handling next request) which does
# not touch zfile at all, and arrange timings so that T2 modifies
......@@ -263,33 +265,36 @@ def test_bigfile_filezodb_vs_conn_migration():
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zfile2-modified')
wait(ctx, 'T2-zfile2-modified')
# XXX do we want to also modify some other objects?
# (but this have side effect for joining conn11_2 to txn)
transaction.commit() # should be nothing
tell('T1-txn12-committed')
tell(ctx, 'T1-txn12-committed')
wait('T2-conn21-closed')
wait(ctx, 'T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
wait(ctx, 'T11-exit-command')
# open, modify, abort
def T21():
T21done = chan()
@func
def T21(ctx):
defer(T21done.close)
tell, wait = c21_1.tell, c12_1.wait
# - wait until T1 finish setting up initial data for zfile and closes connection.
# - open that connection before T1 is asleep - because ZODB organizes
# connection pool as stack (with correction for #active objects),
# we should get exactly the same connection T1 had.
wait('T1-conn11_1-closed')
wait(ctx, 'T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
tell(ctx, 'T2-conn21-opened')
# modify zfile and arrange timings so that T1 commits after zfile is
# modified, but before we commit/abort.
......@@ -301,21 +306,21 @@ def test_bigfile_filezodb_vs_conn_migration():
Blk(vma21, 0)[0] = 21
tell('T2-zfile2-modified')
wait('T1-txn12-committed')
tell(ctx, 'T2-zfile2-modified')
wait(ctx, 'T1-txn12-committed')
# abort - zfile2 should stay unchanged
transaction.abort()
del vma21, fh21, a21, root21
conn21.close()
tell('T2-conn21-closed')
tell(ctx, 'T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
wg = sync.WorkGroup(context.background())
wg.go(T11)
wg.go(T21)
T21done.recv() # NOTE not joining t11 yet
# now verify that zfile2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
......@@ -340,39 +345,45 @@ def test_bigfile_filezodb_vs_conn_migration():
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
def T12():
T12done = chan()
@func
def T12(ctx):
defer(T12done.close)
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
wait(ctx, 'T2-conn22-opened')
conn12 = db.open()
tell('T1-conn12-opened')
wait('T2-zfile2-modified')
tell(ctx, 'T1-conn12-opened')
wait(ctx, 'T2-zfile2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
tell(ctx, 'T1-txn-aborted')
wait(ctx, 'T2-txn-committed')
conn12.close()
# open, modify, commit
def T22():
T22done = chan()
@func
def T22(ctx):
defer(T22done.close)
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
assert _thread.get_ident() != T11ident[0]
conn22 = db.open()
assert conn22 is conn01
tell('T2-conn22-opened')
tell(ctx, 'T2-conn22-opened')
# modify zfile and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
wait(ctx, 'T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
......@@ -381,24 +392,27 @@ def test_bigfile_filezodb_vs_conn_migration():
Blk(vma22, 0)[0] = 22
tell('T2-zfile2-modified')
wait('T1-txn-aborted')
tell(ctx, 'T2-zfile2-modified')
wait(ctx, 'T1-txn-aborted')
# commit - changes should propagate to zfile
transaction.commit()
tell('T2-txn-committed')
tell(ctx, 'T2-txn-committed')
conn22.close()
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
wg.go(T12)
wg.go(T22)
T12done.recv()
T22done.recv()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
def _(ctx):
c21_1.tell(ctx, 'T11-exit-command')
wg.go(_)
wg.wait()
# now verify that zfile2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
......
# Wendelin.core.bigfile | Threading tests
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Copyright (C) 2014-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -18,11 +18,13 @@
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from wendelin.bigfile import BigFile, WRITEOUT_STORE
from threading import Thread, Lock
from threading import Lock
from time import sleep
from wendelin.bigfile.tests.test_basic import bord_py3
from six.moves import _thread
from golang import select, default
from golang import context, sync
# Notify channel for
# - one thread to .wait('condition'), until
......@@ -32,19 +34,23 @@ class NotifyChannel:
def __init__(self):
self.state = None
def tell(self, condition):
def tell(self, ctx, condition):
#print >>sys.stderr, ' tell %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
# wait until other thread reads previous tell
while self.state is not None:
if ready(ctx.done()):
raise ctx.err()
pass
self.state = condition
def wait(self, condition):
def wait(self, ctx, condition):
#print >>sys.stderr, ' wait %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
while self.state != condition:
if ready(ctx.done()):
raise ctx.err()
pass
#print >>sys.stderr, ' have %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
......@@ -87,16 +93,18 @@ def test_thread_lock_vs_virtmem_lock():
c21 = NotifyChannel() # T2 -> T1
class ZLockBigFile(BigFile):
# .t1ctx - set by T1
def __new__(cls, blksize):
obj = BigFile.__new__(cls, blksize)
return obj
def Zsync_and_lockunlock(self):
tell, wait = c12.tell, c21.wait
ctx = self.t1ctx
# synchronize with invalidate in T2
tell('T1-V-under')
wait('T2-Z-taken')
tell(ctx, 'T1-V-under')
wait(ctx, 'T2-Z-taken')
# this will deadlock, if V is plain lock and calling from under-virtmem
# is done with V held
......@@ -115,34 +123,35 @@ def test_thread_lock_vs_virtmem_lock():
vma = fh.mmap(0, 1)
m = memoryview(vma)
def T1():
def T1(ctx):
f.t1ctx = ctx
m[0] # calls ZLockBigFile.loadblk()
tell, wait = c12.tell, c21.wait
wait('T2-Z-released')
wait(ctx, 'T2-Z-released')
m[0] = bord_py3(b'1') # make page dirty
fh.dirty_writeout(WRITEOUT_STORE) # calls ZLockBigFile.storeblk()
def T2():
def T2(ctx):
tell, wait = c21.tell, c12.wait
# cycle 0: vs loadblk in T0
# cycle 1: vs storeblk in T0
for _ in range(2):
wait('T1-V-under')
wait(ctx, 'T1-V-under')
Z.acquire()
tell('T2-Z-taken')
tell(ctx, 'T2-Z-taken')
fh2.invalidate_page(0) # NOTE invalidating page _not_ of fh
Z.release()
tell('T2-Z-released')
tell(ctx, 'T2-Z-released')
t1, t2 = Thread(target=T1), Thread(target=T2)
t1.start(); t2.start()
t1.join(); t2.join()
wg = sync.WorkGroup(context.background())
wg.go(T1)
wg.go(T2)
wg.wait()
# multiple access from several threads to the same page - block loaded only once
......@@ -162,12 +171,13 @@ def test_thread_multiaccess_sameblk():
vma = fh.mmap(0, 1)
m = memoryview(vma)
def T():
def T(ctx):
m[0] # calls CountBigFile.loadblk()
t1, t2 = Thread(target=T), Thread(target=T)
t1.start(); t2.start()
t1.join(); t2.join()
wg = sync.WorkGroup(context.background())
wg.go(T)
wg.go(T)
wg.wait()
assert d[0] == 1
......@@ -175,14 +185,21 @@ def test_thread_multiaccess_sameblk():
def test_thread_multiaccess_parallel():
# tid -> (T0 -> T<tid>, T<tid> -> T0)
channels = {}
# [0] = channels<T1>
# [1] = channels<T2>
channelv = [None, None]
# tid -> ctx in T<tid>
tidctx = {}
class SyncBigFile(BigFile):
def loadblk(self, blk, buf):
# tell driver we are in loadblk and wait untill it says us to go
cin, cout = channels[_thread.get_ident()]
cout.tell('ready')
cin.wait('go')
tid = _thread.get_ident()
ctx = tidctx[tid]
cin, cout = channels[tid]
cout.tell(ctx, 'ready')
cin.wait(ctx, 'go')
f = SyncBigFile(PS)
fh = f.fileh_open()
......@@ -190,30 +207,39 @@ def test_thread_multiaccess_parallel():
m = memoryview(vma)
def T1():
channels[_thread.get_ident()] = (NotifyChannel(), NotifyChannel())
def T1(ctx):
tid = _thread.get_ident()
tidctx[tid] = ctx
channelv[0] = channels[tid] = (NotifyChannel(), NotifyChannel())
m[0*PS]
def T2():
channels[_thread.get_ident()] = (NotifyChannel(), NotifyChannel())
def T2(ctx):
tid = _thread.get_ident()
tidctx[tid] = ctx
channelv[1] = channels[tid] = (NotifyChannel(), NotifyChannel())
m[1*PS]
t1, t2 = Thread(target=T1), Thread(target=T2)
t1.start(); t2.start()
wg = sync.WorkGroup(context.background())
wg.go(T1)
wg.go(T2)
while len(channels) != 2:
pass
c01, c10 = channels[t1.ident]
c02, c20 = channels[t2.ident]
c01, c10 = channelv[0]
c02, c20 = channelv[1]
def _(ctx):
c10.wait(ctx, 'ready'); c20.wait(ctx, 'ready')
c01.tell(ctx, 'go'); c02.tell(ctx, 'go')
wg.go(_)
c10.wait('ready'); c20.wait('ready')
c01.tell('go'); c02.tell('go')
t1.join(); t2.join()
wg.wait()
# loading vs invalidate of same page in another thread
def test_thread_load_vs_invalidate():
c12 = NotifyChannel() # T1 -> T2
c21 = NotifyChannel() # T2 -> T1
tidctx = {} # tid -> ctx used in T<n>
class RetryBigFile(BigFile):
def __new__(cls, blksize):
......@@ -222,13 +248,14 @@ def test_thread_load_vs_invalidate():
return obj
def loadblk(self, blk, buf):
ctx = tidctx[_thread.get_ident()]
tell, wait = c12.tell, c21.wait
bufmem = memoryview(buf)
# on the first cycle we synchronize with invalidate in T2
if self.cycle == 0:
tell('T1-loadblk0-ready')
wait('T1-loadblk0-go')
tell(ctx, 'T1-loadblk0-ready')
wait(ctx, 'T1-loadblk0-go')
# here we know request to invalidate this page came in and this
# '1' should be ignored by virtmem
bufmem[0] = bord_py3(b'1')
......@@ -246,16 +273,32 @@ def test_thread_load_vs_invalidate():
vma = fh.mmap(0, 1)
m = memoryview(vma)
def T1():
def T1(ctx):
tidctx[_thread.get_ident()] = ctx
assert m[0] == bord_py3(b'2')
def T2():
def T2(ctx):
tidctx[_thread.get_ident()] = ctx
tell, wait = c21.tell, c12.wait
wait('T1-loadblk0-ready')
wait(ctx, 'T1-loadblk0-ready')
fh.invalidate_page(0)
tell('T1-loadblk0-go')
tell(ctx, 'T1-loadblk0-go')
wg = sync.WorkGroup(context.background())
wg.go(T1)
wg.go(T2)
wg.wait()
# ---- misc ----
t1, t2 = Thread(target=T1), Thread(target=T2)
t1.start(); t2.start()
t1.join(); t2.join()
def ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
if _ == 0:
return False
return True
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment