Commit 3e488c7a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 22cb4a88
...@@ -197,9 +197,9 @@ class tDB: ...@@ -197,9 +197,9 @@ class tDB:
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead") t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
t._wc_zheadv = [] t._wc_zheadv = []
# when ZBigFile(s) blocks were last accessed via wcfs. # when head/ ZBigFile(s) blocks were last accessed via wcfs.
# this is updated only explicitly via ._blkaccess() . # this is updated only explicitly via ._blkheadaccess() .
t._accessed = {} # ZBigFile -> {} blk -> head(when accessed) t._blkAccessedAtHead = {} # ZBigFile -> {} blk -> head(when accessed)
# tracked opened tFiles & tWatches # tracked opened tFiles & tWatches
t._files = set() t._files = set()
...@@ -298,11 +298,14 @@ class tDB: ...@@ -298,11 +298,14 @@ class tDB:
# head/at = last txn of whole db # head/at = last txn of whole db
assert t._read("head/at") == h(t.head) assert t._read("head/at") == h(t.head)
# _blkaccess marks head/zf[blk] accessed. # _blkheadaccess marks head/zf[blk] accessed.
def _blkaccess(t, zf, blk): def _blkheadaccess(t, zf, blk):
# XXX locking? # XXX locking needed? or we do everything seriall?
zfAccessed = t._accessed.setdefault(zf, {}) t._headOfAccess(zf)[blk] = t.head
zfAccessed[blk] = t.head
# _headOfAccess returns {} describing when head/zf blocks were accessed.
def _headOfAccess(t, zf): # {} blk -> head-when-accessed
return t._blkAccessedAtHead.setdefault(zf, {})
# _path returns path for object on wcfs. # _path returns path for object on wcfs.
# - str: wcfs root + obj; # - str: wcfs root + obj;
...@@ -409,11 +412,15 @@ class tFile: ...@@ -409,11 +412,15 @@ class tFile:
t.f.close() t.f.close()
# _blk returns memoryview of file[blk]. # _blk returns memoryview of file[blk].
# when/if block memory is accessed, the user has to notify tDB with _blkaccess call. # when/if block memory is accessed, the user has to notify tFile with _blkaccess call.
def _blk(t, blk): def _blk(t, blk):
assert blk <= t._max_tracked_pages assert blk <= t._max_tracked_pages
return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize]) return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])
def _blkaccess(t, blk):
if t.at is None: # notify tDB only for head/file access
t.tdb._blkheadaccess(t.zf, blk)
# cached returns [] with indicating whether a file block is cached or not. # cached returns [] with indicating whether a file block is cached or not.
# 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached some not. # 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached some not.
def cached(t): def cached(t):
...@@ -536,7 +543,7 @@ class tFile: ...@@ -536,7 +543,7 @@ class tFile:
def _(): def _():
b = read_nogil(blkview[0:1]) b = read_nogil(blkview[0:1])
have_read.send(b) have_read.send(b)
t.tdb._blkaccess(t.zf, blk) t._blkaccess(blk)
go(_) go(_)
_, _rx = select( _, _rx = select(
ctx.done().recv, # 0 ctx.done().recv, # 0
...@@ -864,13 +871,15 @@ def watch(twlink, zf, at, pinok=None): # XXX -> tWatch ? ...@@ -864,13 +871,15 @@ def watch(twlink, zf, at, pinok=None): # XXX -> tWatch ?
at_from = '(%s ->) ' % t.hat(at_prev) at_from = '(%s ->) ' % t.hat(at_prev)
print('\nC: setup watch f<%s> %s%s' % (h(zf._p_oid), at_from, t.hat(at))) print('\nC: setup watch f<%s> %s%s' % (h(zf._p_oid), at_from, t.hat(at)))
headOfAccess = t._headOfAccess(zf)
pin_prev = {} pin_prev = {}
if at_prev is not None: if at_prev is not None:
assert at_prev <= at, 'TODO %s -> %s' % (t.hat(at_prev), t.hat(at)) assert at_prev <= at, 'TODO %s -> %s' % (t.hat(at_prev), t.hat(at))
pin_prev = t._needPinAt(zf, at_prev) pin_prev = t._needPinAt(zf, at_prev) # XXX & headOfBlkAccess(blk) > w.at
assert w.pinned == pin_prev # XXX & blkHeadAccessedRev > w.at assert w.pinned == pin_prev
pin = t._needPinAt(zf, at) pin = t._needPinAt(zf, at) # XXX & headOfBlkAccess > w.at
if at_prev != at and at_prev is not None: if at_prev != at and at_prev is not None:
print('# pin@old: %s\n# pin@new: %s' % (t.hpin(pin_prev), t.hpin(pin))) print('# pin@old: %s\n# pin@new: %s' % (t.hpin(pin_prev), t.hpin(pin)))
...@@ -886,10 +895,21 @@ def watch(twlink, zf, at, pinok=None): # XXX -> tWatch ? ...@@ -886,10 +895,21 @@ def watch(twlink, zf, at, pinok=None): # XXX -> tWatch ?
# blk ∈ pin_prev, blk ∉ pin -> unpin to head # blk ∈ pin_prev, blk ∉ pin -> unpin to head
elif blk in pin_prev and blk not in pin: elif blk in pin_prev and blk not in pin:
# blk ∈ pin_prev means blk was already accessed with rev > at_prev
assert blk in headOfAccess
assert t._blkRevAt(blk, headOfAccess[blk]) > at_prev
# blk ∉ pin means XXX
assert headOfAccess[blk] >= t._blkRevAt(blk, t.head)
pin[blk] = None # @head pin[blk] = None # @head
# blk ∈ pin_prev, blk ∈ pin -> if rev different: use pin # blk ∈ pin_prev, blk ∈ pin -> if rev different: use pin
elif blk in pin_prev and blk in pin: elif blk in pin_prev and blk in pin:
# XXX headOfAccess assert
assert pin_prev[blk] <= pin[blk] assert pin_prev[blk] <= pin[blk]
if pin_prev[blk] == pin[blk]: if pin_prev[blk] == pin[blk]:
del pin[blk] # would need to pin to what it is already pinned del pin[blk] # would need to pin to what it is already pinned
...@@ -908,7 +928,7 @@ def watch(twlink, zf, at, pinok=None): # XXX -> tWatch ? ...@@ -908,7 +928,7 @@ def watch(twlink, zf, at, pinok=None): # XXX -> tWatch ?
twlink._watch(zf, at, pinok, "ok") twlink._watch(zf, at, pinok, "ok")
w.at = at w.at = at
assert w.pinned == t._needPinAt(zf, at) assert w.pinned == t._needPinAt(zf, at) # XXX & headOfBlkAccess
# stop_watch instructs wlink to stop watching the file. # stop_watch instructs wlink to stop watching the file.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment