Commit 60df26bd authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0e21bfff
......@@ -178,10 +178,14 @@ class tDB:
# committed: (tail, head] + δF history
t.tail = t.root._p_jar.db().storage.lastTransaction()
t.head = None
t._headv = [] # XXX -> just use dFtail[·].rev ?
t.head = None # XXX -> property = dFtail[-1].rev ?
t._headv = [] # XXX -> just use dFtail[·].rev ?
t.dFtail = [] # of DF
# when ZBigFile(s) blocks were last accessed via wcfs.
# this is updated only explicitly via ._blkaccess() .
t._accessed = {} # ZBigFile -> {} blk -> head(when accessed)
# fh(.wcfs/zhead) + history of zhead read from there
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
t._wc_zheadv = []
......@@ -267,6 +271,12 @@ class tDB:
return head
# _blkaccess marks head/zf[blk] accessed.
def _blkaccess(t, zf, blk):
# XXX locking
zfAccessed = t._accessed.setdefault(zf, {})
zfAccessed[blk] = t.head
# _wcsync makes sure wcfs is synchronized to latest committed transaction.
def _wcsync(t):
while len(t._wc_zheadv) < len(t._headv):
......@@ -361,10 +371,24 @@ class tDB:
_, rev = t._blkData(zf, blk, at)
return rev
# XXX vvv -> not what we need, think again
# _blkHeadAccessed returns whether block state corresponding to zf[blk] at
# current head was accessed.
#
# for example - if head/<zf>[blk] was accessed and later changed, the answer is "no".
# buf if head/<zf>[blk] was accessed again after change, and was no longer
# changed, the answer is "yes"
# XXX text
"""
def _blkHeadAccessed(t, zf, blk):
zfAccessed = t._accessed.get(zf, {})
zfAccessed.get(blk) vs t._blkRev(zf, blk, t.head)
"""
# tFile provides testing environment for one bigfile on wcfs.
#
# .blk() provides access to data of a block. .cached() gives state of which
# ._blk() provides access to data of a block. .cached() gives state of which
# blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
# on state of cache and data.
class tFile:
......@@ -437,8 +461,8 @@ class tFile:
mm.unmap(t.fmmap)
t.f.close()
# blk returns memoryview of file[blk].
def blk(t, blk):
# _blk returns memoryview of file[blk].
def _blk(t, blk):
assert blk <= t._max_tracked_pages
return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])
......@@ -544,7 +568,7 @@ class tFile:
for wlink, pinok in pinokByWLink.items():
pinokByWLink[wlink] = (t.zf, pinok)
blkview = t.blk(blk)
blkview = t._blk(blk)
assert t.cached()[blk] == cached
def _(ctx, ev):
......@@ -559,6 +583,7 @@ class tFile:
def _():
b = read0_nogil(blkview)
have_read.send(b)
t.tdb._blkaccess(t.zf, blk)
go(_)
_, _rx = select(
ctx.done().recv, # 0
......@@ -863,7 +888,7 @@ class tSrvReq:
# _pinAt returns which blocks needs to be pinned for zf@at.
#
# it does not take into account whether blocks are in cache or not and computes
# pin from all changes. XXX is it desired behaviour?
# pins for all changes. XXX is it desired behaviour?
@func(tWatchLink)
def _pinAt(twlink, zf, at): # -> pin = {} blk -> rev
t = twlink.tdb
......@@ -926,7 +951,7 @@ def watch(twlink, zf, at, pinok=None): # XXX -> ?
if at_prev is not None:
assert at_prev <= at, 'TODO %s -> %s' % (t.hat(at_prev), t.hat(at))
pin_prev = twlink._pinAt(zf, at_prev)
assert w.pinned == pin_prev
assert w.pinned == pin_prev # XXX & blkHeadAccessedRev > w.at
pin = twlink._pinAt(zf, at)
......@@ -1237,6 +1262,7 @@ def test_wcfs():
wl.watch(zf, at1, {2: at1, 3: at0}) # -> at1 (new watch) XXX at0 -> ø (blk3 was hole)?
wl.watch(zf, at2, {2: at2, 3: None}) # at1 -> at2
wl.watch(zf, at3, {2: None}) # at2 -> at3 (current head)
# XXX + .watch in presence !accessed & changed [blk]
wl.close()
# all valid watch setup/update requests going at_i -> at_j -> ... with automatic pinok
......@@ -1256,6 +1282,9 @@ def test_wcfs():
# watched + commit -> read -> receive pin messages.
# read vs pin ordering is checked by assertBlk.
#
# 5(f) is kept not accessed to check later how wcfs.go handles δFtail
# rebuild after it sees not yet accessed ZBlk that has change history.
wl3 = t.openwatch(); wl3.watch(zf, at3); assert at3 == t.head
assert set(wl3._watching.keys()) == {zf}
w3 = wl3._watching[zf]
......@@ -1301,8 +1330,7 @@ def test_wcfs():
f.assertBlk(4, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
# 5(f) is kept unaccessed to test how wcfs.go handles δFtail rebuild after
# it sees not yet accessed ZBlk that has change history.
# 5(f) is kept unaccessed (see ^^^)
assert f.cached()[5] == 0
# f.assertBlk(5, 'f4', {wl3: {5:at0}, wl3_: {5:at0}, wl2: {5:at0}}) # XXX at0->ø?
# w_assertPin( {2:at3, 5:at0}, {2:at3, 5:at0}, {2:at2, 5:at0})
......@@ -1311,10 +1339,10 @@ def test_wcfs():
w_assertPin( {2:at3, 6:at0}, {2:at3, 6:at0}, {2:at2, 6:at0})
# commit again:
# - block is already pinned (#2) -> wl3 not notified
# - 2(c) is already pinned -> wl3 not notified
# - watch stopped (wl3_) -> watch no longer notified
# - wlink closed (wl2) -> watch no longer notified
# XXX about 5
# - 5(f) is still kept unaccessed (see ^^^)
f.assertCache([1,1,1,1,1,0,1])
t.change(zf, {2: 'c5', 3: 'd5', 5: 'f5'})
at5 = t.commit()
......@@ -1342,8 +1370,7 @@ def test_wcfs():
f.assertBlk(4, '', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 3:at2, 6:at0})
# 5(f) is kept still unaccessed, so that when it will be accessed
# corresponding ZBlk will have change history with len > 1.
# 5(f) is kept still unaccessed (see ^^^)
assert f.cached()[5] == 0
# f.assertBlk(5, 'f4', {wl3: {}, wl3_: {}})
# w_assertPin( {2:at3, 3:at2, 5:at0})
......@@ -1351,13 +1378,15 @@ def test_wcfs():
f.assertBlk(6, 'g4', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 3:at2, 6:at0})
return
# advance watch - receives new pins/unpins to @head.
# this is also tested ^^^ in `at_i -> at_j -> ...` watch setup/adjust.
wl3.watch(zf, at4, {2:at4, 5:None}) # at3 -> at4
# NOTE 5(f) is not affected because it was not pinned previously.
wl3.watch(zf, at4, {2:at4, 6:None}) # at3 -> at4
w_assertPin( {2:at4, 3:at2})
return
# XXX access 5(f) -> wl3 should be correctly pinned <- !!!
wl3.watch(zf, at5, {2:None, 3:None}) # at4 -> at5
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment