Commit f0744a6c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 09bdef24
...@@ -38,7 +38,7 @@ import logging as log ...@@ -38,7 +38,7 @@ import logging as log
from os.path import dirname from os.path import dirname
from errno import ENOENT, EEXIST from errno import ENOENT, EEXIST
from golang import chan, select, default, func from golang import chan, select, default, func, defer
from golang import sync, context from golang import sync, context
from golang.gcompat import qq from golang.gcompat import qq
import threading import threading
...@@ -151,6 +151,7 @@ class WatchLink(object): ...@@ -151,6 +151,7 @@ class WatchLink(object):
# test code - we currently get stuck here. # test code - we currently get stuck here.
# #
# XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour? # XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
# XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments?
try: try:
wlink._serveWG.wait() wlink._serveWG.wait()
except Exception as e: except Exception as e:
...@@ -166,6 +167,60 @@ class WatchLink(object): ...@@ -166,6 +167,60 @@ class WatchLink(object):
w.pinned = {} w.pinned = {}
wlink._watching = {} wlink._watching = {}
# _serveRX receives messages from ._wrx and dispatches them according to streamID.
@func
def _serveRX(wlink, ctx):
# when finishing - wakeup everyone waiting for rx
def _():
wlink._acceptq.close()
with wlink._rxmu:
rxtab = wlink._rxtab
wlink._rxtab = None # don't allow new rxtab registers
for rxq in rxtab.values():
rxq.close()
defer(_)
while 1:
# NOTE: .close() makes sure ._wrx.read*() will wake up
l = wlink._wrx.readline()
print('C: watch : rx: %r' % l)
if len(l) == 0: # peer closed its tx
wlink.rx_eof.close()
break
# <stream> ... \n
stream, msg = l.split(' ', 1)
stream = int(stream)
msg = msg.rstrip('\n')
if stream == 0: # control/fatal message from wcfs
# XXX print -> receive somewhere?
print('C: watch : rx fatal: %r' % msg)
wlink.fatalv.append(msg)
continue
reply = bool(stream % 2)
if reply:
with wlink._rxmu:
assert stream in wlink._rxtab # XXX !test assert - recheck
rxq = wlink._rxtab.pop(stream)
_, _rx = select(
ctx.done().recv, # 0
(rxq.send, msg), # 1
)
if _ == 0:
raise ctx.err()
else:
with wlink._rxmu:
assert stream not in wlink._accepted # XXX !test assert - recheck
wlink._accepted.add(stream)
_, _rx = select(
ctx.done().recv, # 0
(wlink._acceptq.send, (stream, msg)), # 1
)
if _ == 0:
raise ctx.err()
# ---- WCFS raw file access ---- # ---- WCFS raw file access ----
......
...@@ -719,7 +719,6 @@ class tWatch: ...@@ -719,7 +719,6 @@ class tWatch:
# tWatchLink provides testing environment for /head/watch link opened on wcfs. # tWatchLink provides testing environment for /head/watch link opened on wcfs.
# #
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages. XXX kill here?
# .watch() setups/adjusts a watch for a file and verifies that wcfs correctly sends initial pins. # .watch() setups/adjusts a watch for a file and verifies that wcfs correctly sends initial pins.
class tWatchLink(wcfs.WatchLink): class tWatchLink(wcfs.WatchLink):
...@@ -735,60 +734,6 @@ class tWatchLink(wcfs.WatchLink): ...@@ -735,60 +734,6 @@ class tWatchLink(wcfs.WatchLink):
# ---- message IO ---- # ---- message IO ----
# _serveRX receives messages from ._wrx and dispatches them according to streamID.
@func
def _serveRX(t, ctx):
# when finishing - wakeup everyone waiting for rx
def _():
t._acceptq.close()
with t._rxmu:
rxtab = t._rxtab
t._rxtab = None # don't allow new rxtab registers
for rxq in rxtab.values():
rxq.close()
defer(_)
while 1:
# NOTE: .close() makes sure ._wrx.read*() will wake up
l = t._wrx.readline()
print('C: watch : rx: %r' % l)
if len(l) == 0: # peer closed its tx
t.rx_eof.close()
break
# <stream> ... \n
stream, msg = l.split(' ', 1)
stream = int(stream)
msg = msg.rstrip('\n')
if stream == 0: # control/fatal message from wcfs
# XXX print -> receive somewhere?
print('C: watch : rx fatal: %r' % msg)
t.fatalv.append(msg)
continue
reply = bool(stream % 2)
if reply:
with t._rxmu:
assert stream in t._rxtab
rxq = t._rxtab.pop(stream)
_, _rx = select(
ctx.done().recv, # 0
(rxq.send, msg), # 1
)
if _ == 0:
raise ctx.err()
else:
with t._rxmu:
assert stream not in t._accepted
t._accepted.add(stream)
_, _rx = select(
ctx.done().recv, # 0
(t._acceptq.send, (stream, msg)), # 1
)
if _ == 0:
raise ctx.err()
# _send sends raw message via specified stream. # _send sends raw message via specified stream.
# #
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment