Commit 23d34fb0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 16371832
...@@ -474,8 +474,11 @@ class tWatchLink: ...@@ -474,8 +474,11 @@ class tWatchLink:
# object is closed -> dup fd so that each file object has its own fd. # object is closed -> dup fd so that each file object has its own fd.
wh = os.open(tdb.path("head/watch"), os.O_RDWR) wh = os.open(tdb.path("head/watch"), os.O_RDWR)
wh2 = os.dup(wh) wh2 = os.dup(wh)
t.wrx = os.fdopen(wh, 'rb') t._wrx = os.fdopen(wh, 'rb')
t.wtx = os.fdopen(wh2, 'wb') t._wtx = os.fdopen(wh2, 'wb')
# .rx_eof becomes ready when wcfs closes its tx side
t.rx_eof = chan()
# inv.protocol message IO # inv.protocol message IO
t._acceptq = chan() # (stream, msg) server originated messages go here t._acceptq = chan() # (stream, msg) server originated messages go here
...@@ -513,17 +516,18 @@ class tWatchLink: ...@@ -513,17 +516,18 @@ class tWatchLink:
if e != context.canceled: if e != context.canceled:
reraise(e, None, e.__traceback__) reraise(e, None, e.__traceback__)
t.wtx.close() t._wtx.close()
t.wrx.close() t._wrx.close()
# ---- message IO ---- # ---- message IO ----
# _serveRX receives messages from .wrx and dispatches them according to streamID. # _serveRX receives messages from ._wrx and dispatches them according to streamID.
@func @func
def _serveRX(t, ctx): def _serveRX(t, ctx):
# when finishing - wakeup everyone waiting for rx # when finishing - wakeup everyone waiting for rx
def _(): def _():
t.rx_eof.close()
t._acceptq.close() t._acceptq.close()
with t._rxmu: with t._rxmu:
rxtab = t._rxtab rxtab = t._rxtab
...@@ -533,8 +537,8 @@ class tWatchLink: ...@@ -533,8 +537,8 @@ class tWatchLink:
defer(_) defer(_)
while 1: while 1:
# NOTE: .close() makes sure .wrx.read*() will wake up # NOTE: .close() makes sure ._wrx.read*() will wake up
l = t.wrx.readline() l = t._wrx.readline()
print('C: watch : rx: %r' % l) print('C: watch : rx: %r' % l)
if len(l) == 0: if len(l) == 0:
break # closed break # closed
...@@ -576,8 +580,8 @@ class tWatchLink: ...@@ -576,8 +580,8 @@ class tWatchLink:
with t._txmu: with t._txmu:
pkt = b"%d %s\n" % (stream, msg) pkt = b"%d %s\n" % (stream, msg)
#print('C: watch : tx: %r' % pkt) #print('C: watch : tx: %r' % pkt)
t.wtx.write(pkt) t._wtx.write(pkt)
t.wtx.flush() t._wtx.flush()
# sendReq sends client -> server request and returns server reply. # sendReq sends client -> server request and returns server reply.
# #
...@@ -873,13 +877,20 @@ def test_wcfs(): ...@@ -873,13 +877,20 @@ def test_wcfs():
# >>> invalidation protocol # >>> invalidation protocol
print('\n\n inv. protocol \n\n') print('\n\n inv. protocol \n\n')
# XXX invalid requests -> wcfs replies error # invalid requests -> wcfs replies error
# XXX -> separate test?
wl = t.openwatch() wl = t.openwatch()
assert wl.sendReq(context.background(), b'bla bla') == b"error bad watch: not a watch request" assert wl.sendReq(context.background(), b'bla bla') == b"error bad watch: not a watch request"
# XXX assert wl closed # wcfs must close watch link after invalid request
_, _rx = select(
time.after(3*time.second).recv,
wl.rx_eof.recv,
)
if _ == 0:
raise RuntimeError("%s: did not rx EOF after invalid watch request" % wl)
wl.close() wl.close()
return
for zf in t.zfiles(): for zf in t.zfiles():
# watch going at_i -> at_j -> ... # watch going at_i -> at_j -> ...
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment