Commit 8457a6a9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c00d94c7
......@@ -172,7 +172,7 @@ class DFile:
# tDB provides database/wcfs testing environment.
#
# BigFiles opened under tDB are represented as tFile - see .open for details.
# Watches opened under tDB are represented as tWatch - see .openwatch for details.
# Watches opened under tDB are represented as tWatchLink - see .openwatch for details.
#
# XXX .open -> .topen
# XXX .openwatch -> .topenwatch ?
......@@ -213,9 +213,9 @@ class tDB:
return tFile(t, zf, at=at)
# openwatch opens /head/watch on wcfs.
# see returned tWatch for details.
def openwatch(t): # -> tWatch
return tWatch(t)
# see returned tWatchLink for details.
def openwatch(t): # -> tWatchLink
return tWatchLink(t)
# change schedules zf to be changed according changeDelta at commit.
#
......@@ -338,7 +338,7 @@ class tDB:
# iter_revv iterates through all possible at_i -> at_j -> at_k ... sequences.
# at_i < at_j
# XXX all sequences go till head.
# NOTE all sequences go till head.
def iter_revv(t, start=z64, level=0):
dFtail = [_ for _ in t.dFtail if _.rev > start]
#print(' '*level, 'iter_revv', t.hat(start), [t.hat(_.rev) for _ in dFtail])
......@@ -455,13 +455,11 @@ class tFile:
t.assertCache([1]*len(datav))
# tWatch provides testing environment for /head/watch link opened on wcfs.
# tWatchLink provides testing environment for /head/watch link opened on wcfs.
#
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
# .watch() setups a watch for a file and verifies ... XXX
#
# XXX -> tWatchLink ?
class tWatch:
class tWatchLink:
def __init__(t, tdb):
t.tdb = tdb
......@@ -489,9 +487,9 @@ class tWatch:
serveCtx, t._serveCancel = context.with_cancel(context.background())
t._serveWG = sync.WorkGroup(serveCtx)
t._serveWG.go(t._serveRecv)
t._serveWG.go(t._serveRX)
# this tWatch currently watches files at particular state.
# this tWatchLink currently watches files at particular state.
t._watching = {} # {} ZBigFile -> @at
tdb._tracked.add(t)
......@@ -501,7 +499,7 @@ class tWatch:
t._serveCancel()
# ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
# _serveRecv on client (= on us).
# _serveRX on client (= on us).
t._send(1, b'bye')
# XXX we can get stuck here if wcfs does not behave as we want.
# XXX in particular if there is a silly - e.g. syntax or type error in
......@@ -521,9 +519,9 @@ class tWatch:
# ---- message IO ----
# _serveRecv receives messages from .w and dispatches them according to streamID.
# _serveRX receives messages from .wrx and dispatches them according to streamID.
@func
def _serveRecv(t, ctx):
def _serveRX(t, ctx):
# when finishing - wakeup everyone waiting for rx
def _():
t._acceptq.close()
......@@ -621,16 +619,16 @@ class tWatch:
return tSrvReq(t, stream, msg)
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch.
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch link.
class tSrvReq:
def __init__(req, twatch, stream, msg):
req.twatch = twatch
def __init__(req, twlink, stream, msg):
req.twlink = twlink
req.stream = stream
req.msg = msg
def reply(req, answer):
#print('C: reply %s <- %r ...' % (req, answer))
t = req.twatch
t = req.twlink
with t._rxmu:
assert req.stream in t._accepted
......@@ -648,10 +646,10 @@ class tSrvReq:
# _pinAt returns which blocks needs to be pinned for zf@at.
#
# it does not take into account whether blocks are in cache or not and computes
# pin from all changes. XXX desired behaviour?
@func(tWatch)
def _pinAt(w, zf, at): # -> pin = {} blk -> rev
t = w.tdb
# pin from all changes. XXX is it desired behaviour?
@func(tWatchLink)
def _pinAt(twlink, zf, at): # -> pin = {} blk -> rev
t = twlink.tdb
# all changes to zf
vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]
......@@ -676,10 +674,10 @@ def _pinAt(w, zf, at): # -> pin = {} blk -> rev
# watch sets up a watch for file@at.
# XXX and verifies that wcfs sends correct initial pins?
# XXX or adjusts
@func(tWatch)
def watch(w, zf, at): # XXX -> ?
t = w.tdb
at_prev = w._watching.get(zf) # we were previously watching zf @at_prev
@func(tWatchLink)
def watch(twlink, zf, at): # XXX -> ?
t = twlink.tdb
at_prev = twlink._watching.get(zf) # we were previously watching zf @at_prev
at_from = ''
if at_prev is not None:
at_from = '(%s ->) ' % t.hat(at_prev)
......@@ -700,9 +698,9 @@ def watch(w, zf, at): # XXX -> ?
pin_prev = {}
if at_prev is not None:
assert at_prev <= at, 'TODO %s -> %s' % (t.hat(at_prev), t.hat(at))
pin_prev = w._pinAt(zf, at_prev)
pin_prev = twlink._pinAt(zf, at_prev)
pin = w._pinAt(zf, at)
pin = twlink._pinAt(zf, at)
if at_prev != at and at_prev is not None:
print('# pin@old: %s\n# pin@new: %s' % (pinstr(pin_prev), pinstr(pin)))
......@@ -743,12 +741,12 @@ def watch(w, zf, at): # XXX -> ?
wg = sync.WorkGroup(ctx)
def _(ctx):
pinv = w._expectPin(ctx, zf, pinok)
pinv = twlink._expectPin(ctx, zf, pinok)
for p in pinv:
p.reply(b"ack") # XXX -> return to caller?
# check that we don't get extra pins before "ok" reply to "watch"
try:
req = w.recvReq(ctx)
req = twlink.recvReq(ctx)
except Exception as e:
if e is context.canceled:
return # cancel is expected after seeing "ok"
......@@ -758,21 +756,21 @@ def watch(w, zf, at): # XXX -> ?
wg.go(_)
def _(ctx):
assert w.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at))) == "ok"
assert twlink.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at))) == "ok"
# cancel _expectPin waiting upon receiving "ok" from wcfs
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
w._watching[zf] = at
twlink._watching[zf] = at
# _expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
@func(tWatch)
def _expectPin(w, ctx, zf, expect):
@func(tWatchLink)
def _expectPin(twlink, ctx, zf, expect):
expected = set() # of expected pin messages
for blk, at in expect.items():
hat = h(at) if at is not None else 'head'
......@@ -783,7 +781,7 @@ def _expectPin(w, ctx, zf, expect):
reqv = [] # of received requests
while len(expected) > 0:
try:
req = w.recvReq(ctx)
req = twlink.recvReq(ctx)
except Exception as e:
raise RuntimeError("%s\nnot all pin missages received - pending:\n%s" % (e, expected))
assert req is not None # channel not closed
......@@ -877,26 +875,23 @@ def test_wcfs():
# XXX invalid requests -> wcfs replies error
# XXX -> separate test?
w = t.openwatch()
print('\n\nzzzzzzz\n\n')
assert w.sendReq(context.background(), b'bla bla') == ""
wl = t.openwatch()
assert wl.sendReq(context.background(), b'bla bla') == ""
# assert w closed
print('\n\n0000000\n\n')
w.close()
print('\n\n1111111\n\n')
# assert wl closed
wl.close()
for zf in t.zfiles():
# watch going at_i -> at_j -> ...
for revv in t.iter_revv():
print('\n--------')
#print(' -> '.join([t.hat(_) for _ in revv]))
w = t.openwatch()
w.watch(zf, revv[0])
w.watch(zf, revv[0]) # verify at_i -> at_i
wl = t.openwatch()
wl.watch(zf, revv[0])
wl.watch(zf, revv[0]) # verify at_i -> at_i
for at in revv[1:]:
w.watch(zf, at)
w.close()
wl.watch(zf, at)
wl.close()
print()
......@@ -941,22 +936,6 @@ def test_wcfs():
# setupWatch must send pins.
@func
def test_wcfs_invproto():
# XXX temp debug
import sys, traceback
def _():
print('BBB')
1/0
xdefer(_)
#raise RuntimeError('zzz')
print('\nAAA')
assert 1 == 2
# ---- misc ---
# readfile reads file @ path.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment