Commit d2973294 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 381ce6b2
......@@ -789,56 +789,16 @@ def watch(twlink, zf, at): # XXX -> ?
pinok = pin
print('# pinok: %s' % pinstr(pinok))
# send watch request and check that we receive pins for in-cache blocks
# changed > at. FIXME "in-cache" is currently not handled
twlink._watch(zf, at, pinok, "ok")
# _watch sends watch request for zf@at, expects initial pins specified by pinok, and final reply.
# _watch sends watch request for zf@at, expects initial pins specified by pinok and final reply.
#
# pinok: {} blk -> at that have to be pinned.
# if replyok ends with '…' only reply prefix until the dots is checked.
@func(tWatchLink)
def _watch(twlink, zf, at, pinok, replyok):
# send watch request and check that we receive pins for in-cache blocks
# changed > at. Use timeout to detect wcfs replying less pins than expected.
"""
#
# XXX detect not sent pins with timeout, or better via ack'ing previous
# pins as they come in (not waiting for all of them) and then seeing that
# we did not received expeced pin when wcfs sends final ok?
ctx, cancel = with_timeout()
wg = sync.WorkGroup(ctx)
def _(ctx):
pinv = twlink._expectPin(ctx, zf, pinok)
tdelay() # increase probability to receive erroneous extra pins
for p in pinv:
p.reply(b"ack") # XXX -> return to caller?
# check that we don't get extra pins before reply to "watch"
try:
req = twlink.recvReq(ctx)
except Exception as e:
if e is context.canceled:
return # cancel is expected after seeing "ok"
reraise(e, None, e.__traceback__)
assert False, "extra pin message received: %r" % req.msg
wg.go(_)
def _(ctx):
reply = twlink.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at)))
if replyok.endswith('…'):
rok = replyok[:-len('…')]
assert reply[:len(rok)] == rok
else:
assert reply == replyok
# cancel _expectPin waiting upon receiving reply from wcfs
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
"""
def _(ctx, ev):
reply = twlink.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at)))
if replyok.endswith('…'):
......@@ -846,8 +806,8 @@ def _watch(twlink, zf, at, pinok, replyok):
assert reply[:len(rok)] == rok
else:
assert reply == replyok
doCheckingPin(_, {twlink: (zf, pinok)})
doCheckingPin(_, {twlink: (zf, pinok)})
twlink._watching[zf] = at
......@@ -858,15 +818,15 @@ def _watch(twlink, zf, at, pinok, replyok):
# pinokByWLink: {} tWatchLink -> (zf, {} blk -> at).
# pinfunc(wlink, foid, blk, at) | None. XXX foid -> ZBigFile?
#
# pinfunc is called after pin request is received from wcfs but before pin ack
# is replied back. pinfunc must not block.
# pinfunc is called after pin request is received from wcfs, but before pin ack
# is replied back. Pinfunc must not block.
def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
# call f and check that we receive pins as specified.
# Use timeout to detect wcfs replying less pins than expected.
#
# XXX detect not sent pins with timeout, or better via ack'ing previous
# pins as they come in (not waiting for all of them) and then seeing that
# we did not received expeced pin when wcfs sends final ok?
# XXX detect not sent pins via ack'ing previous pins as they come in (not
# waiting for all of them) and then seeing that we did not received expeced
# pin when f completes?
ctx, cancel = with_timeout()
wg = sync.WorkGroup(ctx)
ev = []
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment