Commit 9648f72f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e29f9384
...@@ -408,24 +408,28 @@ class tFile: ...@@ -408,24 +408,28 @@ class tFile:
# tWatch provides testing environment for /head/watch opened on wcfs. # tWatch provides testing environment for /head/watch opened on wcfs.
#
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
# .watch() setups a watch for a file and verifies ... XXX
class tWatch: class tWatch:
def __init__(t, tdb): def __init__(t, tdb):
t.tdb = tdb t.tdb = tdb
# open new head/watch handle. # head/watch handle.
# #
# python/stdio lock file object on read/write, however we need both # python/stdio lock file object on read/write, however we need both
# read and write to be working simultaneously. # read and write to be working simultaneously.
# -> we use 2 separate file objects for rx and tx. # -> use 2 separate file objects for rx and tx.
# #
# fdopen takes ownership of file descriptor and closes it when file # fdopen takes ownership of file descriptor and closes it when file
# object is closed -> we dup fd so that each file object has its own fd. # object is closed -> dup fd so that each file object has its own fd.
wh = os.open(tdb.path("head/watch"), os.O_RDWR) wh = os.open(tdb.path("head/watch"), os.O_RDWR)
wh2 = os.dup(wh) wh2 = os.dup(wh)
t.wrx = os.fdopen(wh, 'rb') t.wrx = os.fdopen(wh, 'rb')
t.wtx = os.fdopen(wh2, 'wb') t.wtx = os.fdopen(wh2, 'wb')
# inv.protocol message IO
t._acceptq = chan() # (stream, msg) server originated messages go here t._acceptq = chan() # (stream, msg) server originated messages go here
t._rxmu = threading.Lock() t._rxmu = threading.Lock()
t._rxtab = {} # stream -> rxq server replies go via here t._rxtab = {} # stream -> rxq server replies go via here
...@@ -437,6 +441,9 @@ class tWatch: ...@@ -437,6 +441,9 @@ class tWatch:
t._serveWG = sync.WorkGroup(serveCtx) t._serveWG = sync.WorkGroup(serveCtx)
t._serveWG.go(t._serveRecv) t._serveWG.go(t._serveRecv)
# this tWatch currently watches files at particular state.
t._watching = {} # {} ZBigFile -> @at
tdb._tracked.add(t) tdb._tracked.add(t)
def close(t): def close(t):
...@@ -469,6 +476,9 @@ class tWatch: ...@@ -469,6 +476,9 @@ class tWatch:
for rxq in rxtab.values(): for rxq in rxtab.values():
rxq.close() rxq.close()
# ---- message IO ----
# _serveRecv receives messages from .w and dispatches them according to streamID. # _serveRecv receives messages from .w and dispatches them according to streamID.
@func @func
def _serveRecv(t, ctx): def _serveRecv(t, ctx):
...@@ -584,7 +594,6 @@ class tWatch: ...@@ -584,7 +594,6 @@ class tWatch:
return reqv return reqv
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch. # tSrvReq represents 1 server-initiated wcfs request received over /head/watch.
class tSrvReq: class tSrvReq:
def __init__(req, twatch, stream, msg): def __init__(req, twatch, stream, msg):
...@@ -607,6 +616,61 @@ class tSrvReq: ...@@ -607,6 +616,61 @@ class tSrvReq:
# XXX also track as answered? (and don't accept with the same ID ?) # XXX also track as answered? (and don't accept with the same ID ?)
# ---- watch setup/adjust ----
# watch sets up a watch for file@at.
# XXX and verifies that wcfs sends correct initial pins?
# XXX or adjusts
@func(tWatch)
def watch(w, zf, at): # XXX -> ?
at_prev = w._watching.get(zf) # we were previously watching zf @at_prev
if at_prev is not None:
assert False # TODO
t = w.tdb
print('\nC: setup watch f<%s> %s' % (h(zf._p_oid), t.hat(at)))
# all changes to zf
vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]
# {} blk -> at that have to be pinned
# XXX also check that head/file[blk] is in cache - else no need to pin
pinok = {}
for df in [_ for _ in vdf if _.rev > at]:
for blk in df.ddata:
if blk in pinok:
continue
# history of blk changes <= at
# XXX -> > at_prev?
blkhistoryat = [_.rev for _ in vdf if blk in _.ddata and _.rev <= at]
if len(blkhistoryat) == 0:
pinrev = t._headv[0] # was hole - at0 XXX -> pin to @00?
else:
pinrev = max(blkhistoryat)
pinok[blk] = pinrev
pinokv = ['%d: %s' % (blk, t.hat(pinok[blk])) for blk in sorted(pinok.keys())]
print('# pinok: {%s}' % ', '.join(pinokv))
# send watch request and check that we receive pins for in-cache blocks
# changed > at.
ctx, cancel = context.with_cancel(context.background())
wg = sync.WorkGroup(ctx)
def _(ctx):
pinv = w.expectPin(ctx, zf, pinok)
for p in pinv:
p.reply(b"ack") # XXX -> return to caller?
wg.go(_)
def _(ctx):
assert w.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at))) == "ok"
# cancel expectPin waiting upon receiving "ok" from wcfs
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
# test_wcfs exercises wcfs functionality. # test_wcfs exercises wcfs functionality.
@func @func
...@@ -688,8 +752,8 @@ def test_wcfs(): ...@@ -688,8 +752,8 @@ def test_wcfs():
# >>> invalidation protocol # >>> invalidation protocol
print('\n\n inv. protocol \n\n') print('\n\n inv. protocol \n\n')
bg = context.background()
"""
# checkSetupWatch verifies setting up new watch for zf@at. # checkSetupWatch verifies setting up new watch for zf@at.
def checkSetupWatch(zf, at): def checkSetupWatch(zf, at):
print('\nC: check setup watch f<%s> %s' % (h(zf._p_oid), t.hat(at))) print('\nC: check setup watch f<%s> %s' % (h(zf._p_oid), t.hat(at)))
...@@ -735,10 +799,13 @@ def test_wcfs(): ...@@ -735,10 +799,13 @@ def test_wcfs():
wg.go(_) wg.go(_)
wg.wait() wg.wait()
"""
for zf in t.zfiles(): for zf in t.zfiles():
for dF in t.dFtail: for dF in t.dFtail:
checkSetupWatch(zf, dF.rev) w = t.openwatch()
w.watch(zf, dF.rev)
#checkSetupWatch(zf, dF.rev)
print() print()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment