Commit 0a125d3c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 728c8e4f
......@@ -509,22 +509,22 @@ class tFile:
defer(_)
cached = t.cached()[blk]
assert cached in (0, 1)
shouldPin = False # whether at least one wlink should receive a pin
assert cached in (0, 1) # every check access a block in full
shouldPin = False # whether at least one wlink should receive a pin
# watches must be notified if access goes to @head/file; not if to @rev/file. XXX text
# watches that must be notified if access goes to @head/file
wpin = {} # tWatchLink -> pinok
for wlink in t.tdb._wlinks:
pinok = {}
if t.at is None: # @head/...
wat = wlink._watching.get(t.zf)
if wat is not None and wat < blkrev:
w = wlink._watching.get(t.zf)
if w is not None and w.at < blkrev:
if cached == 1:
# XXX assert blk already pinned on that watch
pass
else:
# XXX and watch not already pinned on the watch
pinok = {blk: t.tdb._blkRev(t.zf, blk, wat)}
pinok = {blk: t.tdb._blkRev(t.zf, blk, w.at)}
shouldPin = True
wpin[wlink] = pinok
......@@ -603,6 +603,12 @@ class tFile:
t.assertCache([1]*len(dataokv))
# tWatch represents watch for one file setup on a tWatchLink.
class tWatch:
def __init__(w):
w.at = None # None only initially - always concrete after setup
w.pinned = {} # blk -> rev
# tWatchLink provides testing environment for /head/watch link opened on wcfs.
#
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
......@@ -640,8 +646,8 @@ class tWatchLink:
t._serveWG = sync.WorkGroup(serveCtx)
t._serveWG.go(t._serveRX)
# this tWatchLink currently watches files at particular state.
t._watching = {} # {} ZBigFile -> @at
# this tWatchLink currently watches the following files at particular state.
t._watching = {} # {} ZBigFile -> tWatch
tdb._wlinks.add(t)
......@@ -868,7 +874,11 @@ def _pinAt(twlink, zf, at): # -> pin = {} blk -> rev
@func(tWatchLink)
def watch(twlink, zf, at, pinok=None): # XXX -> ?
t = twlink.tdb
at_prev = twlink._watching.get(zf) # we were previously watching zf @at_prev
w = twlink._watching.get(zf)
if w is None:
w = tWatch()
at_prev = w.at # we were previously watching zf @at_prev
at_from = ''
if at_prev is not None:
at_from = '(%s ->) ' % t.hat(at_prev)
......@@ -927,6 +937,10 @@ def watch(twlink, zf, at, pinok=None): # XXX -> ?
# changed > at. FIXME "in-cache" is currently not handled
twlink._watch(zf, at, pinok, "ok")
w.at = at
# XXX update pinned
twlink._watching[zf] = w
# _watch sends watch request for zf@at, expects initial pins specified by pinok and final reply.
#
# pinok: {} blk -> at that have to be pinned.
......@@ -942,7 +956,6 @@ def _watch(twlink, zf, at, pinok, replyok):
assert reply == replyok
doCheckingPin(_, {twlink: (zf, pinok)})
twlink._watching[zf] = at
# doCheckingPin calls f and verifies that wcfs sends expected pins during the
......@@ -1159,36 +1172,36 @@ def test_wcfs():
"""
# XXX move before setup watch?
print('\n\n\n\nWATCH+COMMIT\n\n\n\n')
print('\n\n\n\nWATCH+COMMIT\n\n')
# watched + commit -> read -> receive pin messages; read is stuck until pins are acknowledged
wl = t.openwatch()
wl.watch(zf, at3); assert at3 == t.head
wl1 = t.openwatch()
wl2 = t.openwatch()
wl1.watch(zf, at3); assert at3 == t.head
wl2.watch(zf, at2)
f.assertCache([1,1,1,1])
t.change(zf, { 2: '4c', 5: '4f'}) # FIXME + 4a after δbtree works
at4 = t.commit()
f.assertCache([1,1,0,1,0,0]) # FIXME a must be invalidated - see δbtree ^^^
t.dump_history()
return
f.assertBlk(0, '', {wl: {}}) # XXX 0, {0, at3} after δbtree works
f.assertBlk(1, '', {wl: {}})
f.assertBlk(2, '4c', {wl: {2: at3}})
f.assertBlk(0, '', {wl1: {}, wl2: {}}) # XXX + {0, at3} after δbtree works
f.assertBlk(1, '', {wl1: {}, wl2: {}})
f.assertBlk(2, '4c', {wl1: {2: at3}, wl2: {}})
# blk4 is hole @head - the same as at earlier db view - not pinned
# XXX or do not allow hole past .size ?
f.assertBlk(4, '', {wl: {}})
f.assertBlk(5, '4f', {wl: {5: at0}}) # XXX at0 -> ø XXX also triggers access to #4 ?
f.assertBlk(4, '', {wl1: {}, wl2: {}})
f.assertBlk(5, '4f', {wl1: {5: at0}, wl2: {5: at0}}) # XXX at0 -> ø XXX also triggers access to #4 ?
# XXX commit again, but block is already pinned - not notified
# XXX wlink close -> watch no longer notified
# XXX 2 opened watchs for 1 file at the same time
# XXX watch with at="-" -> watch no longer notified
# XXX 2 (or more) opened watch for 1 file at the same time
# XXX watch for 2 files via single watch open
wl.close()
wl1.close()
wl2.close() # XXX temp
# XXX commit after current file size -> watch
......@@ -1203,6 +1216,8 @@ def test_wcfs():
# XXX going not only up, but also down at1 <- at2 <- at3 ? -> forbid?
# XXX watch for 2 files via single watch open
# XXX watch with @at > head - must wait for head to become >= at
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment