Commit 310b2283 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fac36e13
......@@ -10,7 +10,7 @@ fi
# executed under qemu-runlinux
#export WENDELIN_CORE_WCFS_OPTIONS="-d -alsologtostderr -v=1"
py.test \
`# diable cache & warnings that it cannot be updated since fs under qrun is read-only` \
`# disable cache & warnings that it cannot be updated since fs under qrun is read-only` \
`# https://stackoverflow.com/a/47893653/9456786` \
-p no:cacheprovider \
\
......
......@@ -1486,6 +1486,7 @@ func (wlink *WatchLink) _serveRX() (err error) {
}
e2 := wlink.sk.Close()
fmt.Printf("S: wlink %d: sk.close -> %v\n", wlink.id, e2)
if e == nil {
e = e2
}
......
......@@ -421,21 +421,25 @@ class tWatch:
t._txmu = threading.Lock() # serializes writes
t._serveDone = chan()
go(t._serveRecv)
serveCtx, t._serveCancel = context.with_cancel(context.background())
t._serveWG = sync.WorkGroup(serveCtx)
t._serveWG.go(t._serveRecv)
tdb._tracked.add(t)
def close(t):
t.tdb._tracked.remove(t)
t._serveCancel()
# ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
# _serveRecv on client (= on us).
t._send(1, b'bye')
# XXX we can get stuck here if wcfs does not behave as we want.
# XXX in particular if there is a silly - e.g. syntax or type error in
# test code - we curently get stuck here.
t._serveDone.recv()
print('# serveDone.recv() ...')
t._serveWG.wait()
print('# serveDone ready')
t.wtx.close()
t.wrx.close()
......@@ -450,9 +454,9 @@ class tWatch:
# _serveRecv receives messages from .w and dispatches them according to streamID.
@func
def _serveRecv(t):
defer(t._serveDone.close)
def _serveRecv(t, ctx):
while 1:
print('C: watch : rx wait ...')
l = t.wrx.readline()
print('C: watch : rx: %r' % l)
if len(l) == 0:
......@@ -468,12 +472,23 @@ class tWatch:
with t._rxmu:
assert stream in t._rxtab
rxq = t._rxtab.pop(stream)
rxq.send(msg)
_, _rx = select(
ctx.done().recv, # 0
(rxq.send, msg), # 1
)
if _ == 0:
raise ctx.err() # XXX ok? -> or just return?
else:
with t._rxmu:
assert stream not in t._accepted
t._accepted.add(stream)
t._acceptq.send((stream, msg))
_, _rx = select(
ctx.done().recv, # 0
(t._acceptq.send, (stream, msg)), # 1
)
if _ == 0:
raise ctx.err() # XXX ok? -> or just return?
# _send sends raw message via specified stream.
#
......@@ -657,9 +672,9 @@ def test_wcfs():
# XXX invalidation protocol ...
print('\n\n inv. protocol \n\n')
"""
w = t.openwatch()
"""
ctx = context.background() # XXX stub
wg = sync.WorkGroup(ctx)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment