Commit 4a46d011 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9648f72f
......@@ -569,31 +569,6 @@ class tWatch:
return tSrvReq(t, stream, msg)
# expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
def expectPin(t, ctx, zf, expect):
expected = set() # of expected pin messages
for blk, at in expect.items():
msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, h(at))
assert msg not in expected
expected.add(msg)
reqv = [] # of received requests
while len(expected) > 0:
try:
req = t.recvReq(ctx)
except Exception as e:
raise RuntimeError("%s\nnot all pin missages received - pending:\n%s" % (e, expected))
assert req is not None # channel not closed
assert req.msg in expected
expected.remove(req.msg)
reqv.append(req)
return reqv
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch.
class tSrvReq:
def __init__(req, twatch, stream, msg):
......@@ -658,19 +633,45 @@ def watch(w, zf, at): # XXX -> ?
ctx, cancel = context.with_cancel(context.background())
wg = sync.WorkGroup(ctx)
def _(ctx):
pinv = w.expectPin(ctx, zf, pinok)
pinv = w._expectPin(ctx, zf, pinok)
for p in pinv:
p.reply(b"ack") # XXX -> return to caller?
wg.go(_)
def _(ctx):
assert w.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at))) == "ok"
# cancel expectPin waiting upon receiving "ok" from wcfs
# cancel _expectPin waiting upon receiving "ok" from wcfs
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
# _expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
@func(tWatch)
def _expectPin(w, ctx, zf, expect):
expected = set() # of expected pin messages
for blk, at in expect.items():
msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, h(at))
assert msg not in expected
expected.add(msg)
reqv = [] # of received requests
while len(expected) > 0:
try:
req = w.recvReq(ctx)
except Exception as e:
raise RuntimeError("%s\nnot all pin missages received - pending:\n%s" % (e, expected))
assert req is not None # channel not closed
assert req.msg in expected
expected.remove(req.msg)
reqv.append(req)
return reqv
# test_wcfs exercises wcfs functionality.
@func
......@@ -753,59 +754,10 @@ def test_wcfs():
# >>> invalidation protocol
print('\n\n inv. protocol \n\n')
"""
# checkSetupWatch verifies setting up new watch for zf@at.
def checkSetupWatch(zf, at):
print('\nC: check setup watch f<%s> %s' % (h(zf._p_oid), t.hat(at)))
# all changes to zf
vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]
# {} blk -> at that have to be pinned
# XXX also check that head/file[blk] is in cache - else no need to pin
pinok = {}
for df in [_ for _ in vdf if _.rev > at]:
for blk in df.ddata:
if blk in pinok:
continue
# history of blk changes <= at
blkhistoryat = [_.rev for _ in vdf if blk in _.ddata and _.rev <= at]
if len(blkhistoryat) == 0:
pinrev = at0 # was hole XXX -> pin to @00?
else:
pinrev = max(blkhistoryat)
pinok[blk] = pinrev
pinokv = ['%d: %s' % (blk, t.hat(pinok[blk])) for blk in sorted(pinok.keys())]
print('# pinok: {%s}' % ', '.join(pinokv))
# open watch, send watch request and check that we receive pins for
# in-cache blocks changed > at.
w = t.openwatch()
# defer(w.close) XXX close here, or checkSetupWatch -> tDB.topenwatch(zf, at) ?
ctx, cancel = context.with_cancel(bg)
wg = sync.WorkGroup(ctx)
def _(ctx):
pinv = w.expectPin(ctx, zf, pinok)
for p in pinv:
p.reply(b"ack")
wg.go(_)
def _(ctx):
assert w.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at))) == "ok"
# cancel expectPin waiting upon receiving "ok" from wcfs
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
"""
for zf in t.zfiles():
for dF in t.dFtail:
w = t.openwatch()
w.watch(zf, dF.rev)
#checkSetupWatch(zf, dF.rev)
print()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment