Commit c1a59460 authored by Kirill Smelkov

.

parent 622e0a43
@@ -210,14 +210,18 @@ class tDB:
         t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
         t._wc_zheadv = []

-        # tracked tFiles & tWatches
-        t._tracked = set()
+        # tracked opened tFiles & tWatches
+        t._files  = set()
+        t._wlinks = set()

-    # close closes test database as well as all tracked files, watches and wcfs.
+    # close closes test database as well as all tracked files, watch links and wcfs.
     def close(t):
-        for tf in t._tracked.copy():
+        for tf in t._files.copy():
             tf.close()
-        assert len(t._tracked) == 0
+        for tw in t._wlinks.copy():
+            tw.close()
+        assert len(t._files)  == 0
+        assert len(t._wlinks) == 0
         t._wc_zheadfh.close()
         t.wc.close()
         dbclose(t.root)
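
The bookkeeping this hunk switches to (one registry per kind of tracked object, all torn down in close) can be sketched standalone as follows; a minimal hypothetical sketch, not wcfs_test.py code, with only the _files/_wlinks names mirroring the diff:

    # owner keeps separate registries for two kinds of tracked children and
    # closes all of them when it is closed itself
    class Owner:
        def __init__(self):
            self._files  = set()    # registered file-like children
            self._wlinks = set()    # registered watch-link-like children

        def close(self):
            # iterate over copies: a child's close() removes it from its registry
            for f in self._files.copy():
                f.close()
            for w in self._wlinks.copy():
                w.close()
            assert len(self._files)  == 0
            assert len(self._wlinks) == 0

    class Child:
        def __init__(self, owner, registry):
            self._registry = registry
            registry.add(self)

        def close(self):
            self._registry.remove(self)

Iterating over a copy matters because each child's close() mutates the registry while the loop is still running.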
@@ -333,6 +337,8 @@ class tDB:
         return open(path, mode, 0)  # unbuffered

+    # XXX vvv misc -> tail?
+
     # hat returns string for at.
     # it gives both symbolic version and raw hex for at, for example:
     #   @at2 (03cf7850500b5f66)
@@ -370,7 +376,9 @@ class tDB:
         #print(' '*level, 'zzz', tail)
         yield ([dF.rev] + tail)

-    # _blkData returns expected blk data and revision for @at database view.
+    # _blkData returns expected zf[blk] data and revision as of @at database state.
+    # XXX ret for blk does not exists?
+    # XXX kill at=None?
     def _blkData(t, zf, blk, at=None): # -> (data, rev)
         if at is None:
             at = t.head
@@ -394,6 +402,10 @@ class tDB:
         return data, rev

+    # _blkRev returns expected zf[blk] revision as of @at database state?
+    def _blkRev(t, zf, blk, at): # -> rev
+        _, rev = t._blkData(zf, blk, at)
+        return rev
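
_blkRev is a thin projection of _blkData; a usage sketch under the same names (zf, blk and at1 are placeholders for whatever the calling test already has at hand):

    data, rev = t._blkData(zf, blk, at1)   # expected data and revision of zf[blk] as of @at1
    rev       = t._blkRev(zf, blk, at1)    # just the revision, computed the same way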
@@ -406,7 +418,7 @@
 class tFile:
     # maximum number of pages we mmap for 1 file.
     # this should be not big not to exceed mlock limit.
-    _max_tracked = 8
+    _max_tracked_pages = 8

     def __init__(t, tdb, zf, at=None):
         assert isinstance(zf, ZBigFile)
@@ -416,24 +428,24 @@ class tFile:
         t.f = tdb._open(zf, at=at)
         t.blksize = zf.blksize

-        # mmap the file past the end up to _max_tracked pages and lock the
+        # mmap the file past the end up to _max_tracked_pages pages and lock the
         # pages with MLOCK_ONFAULT. This way when a page is read by mmap access
         # we have the guarantee from kernel that the page will stay in
         # pagecache. We rely on this to verify OS cache state.
         assert t.blksize % mm.PAGE_SIZE == 0
-        t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked*t.blksize)
+        t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize)
         mm.lock(t.fmmap, mm.MLOCK_ONFAULT)

-        tdb._tracked.add(t)
+        tdb._files.add(t)

     def close(t):
-        t.tdb._tracked.remove(t)
+        t.tdb._files.remove(t)
         mm.unmap(t.fmmap)
         t.f.close()

     # blk returns memoryview of file[blk].
     def blk(t, blk):
-        assert blk <= t._max_tracked
+        assert blk <= t._max_tracked_pages
         return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])

     # cached returns [] with indicating whether a file block is cached or not.
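
The MLOCK_ONFAULT scheme described in the comment can be illustrated outside the test suite. Below is a rough sketch assuming Linux with glibc's mlock2(2) wrapper; wcfs_test itself goes through the wendelin mm helpers shown in the hunk, not through ctypes, and the function and path here are hypothetical:

    import ctypes, os

    PROT_READ, MAP_SHARED, MLOCK_ONFAULT = 0x1, 0x1, 0x1   # <sys/mman.h> values on Linux

    libc = ctypes.CDLL(None, use_errno=True)
    libc.mmap.restype    = ctypes.c_void_p
    libc.mmap.argtypes   = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int,
                            ctypes.c_int, ctypes.c_int, ctypes.c_long]
    libc.mlock2.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_uint]

    def map_and_pin(path, npages, pagesize=4096):
        # map npages of the file read-only, possibly past its current end, and
        # ask the kernel to lock each page as it is faulted in: once a page has
        # been read through the mapping it is guaranteed to stay in pagecache.
        fd   = os.open(path, os.O_RDONLY)
        size = npages * pagesize
        addr = libc.mmap(None, size, PROT_READ, MAP_SHARED, fd, 0)
        if addr == ctypes.c_void_p(-1).value:
            raise OSError(ctypes.get_errno(), "mmap failed")
        if libc.mlock2(addr, size, MLOCK_ONFAULT) != 0:
            raise OSError(ctypes.get_errno(), "mlock2 failed")
        return addr, size

With pages pinned this way, a later check of OS cache state cannot be invalidated by the kernel evicting a page between the read and the check.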
@@ -458,7 +470,7 @@ class tFile:
     def _sizeinblk(t):
         st = os.fstat(t.f.fileno())
         assert st.st_size % t.blksize == 0
-        assert st.st_size // t.blksize <= t._max_tracked
+        assert st.st_size // t.blksize <= t._max_tracked_pages
         return st.st_size // t.blksize

     # assertCache asserts state of OS cache for file.
@@ -471,15 +483,35 @@ class tFile:
     #
     # Expected data may be given with size < t.blksize. In such case the data
     # is implicitly appended with trailing zeros. Data can be both bytes and unicode.
-    def assertBlk(t, blk, data):
+    #
+    # pinokByWLink: {} tWatchLink -> (zf, {} blk -> at).
+    # pinokByWLink can be None - in that case it is computed automatically.
+    def assertBlk(t, blk, data, pinokByWLink=None):
         if not isinstance(data, bytes):
             data = data.encode('utf-8')
         assert len(data) <= t.blksize
-        dataOk, _ = t.tdb._blkData(t.zf, blk, t.at)
-        assert data == dataOk, "explicit vs computed data"
+        dataOk, blkrev = t.tdb._blkData(t.zf, blk, t.at)
+        assert dataOk == data, "computed vs explicit data"
         data += b'\0'*(t.blksize - len(data))  # tailing zeros
         assert blk < t._sizeinblk()

+        # if access goes to @head/file watches must be notified. if to @rev/file - silent.
+        wpin = {}  # tWatchLink -> (zf, pinok)
+        for wlink in t.tdb._wlinks:
+            pinok = {}
+            if t.at is None:  # @head/...
+                wat = wlink._watching.get(t.zf)
+                if wat is not None and wat < blkrev:
+                    # XXX and block not cached and watch not already pinned on the watch
+                    pinok = {blk: t.tdb._blkRev(t.zf, blk, wat)}
+            wpin[wlink] = (t.zf, pinok)
+
+        if pinokByWLink is not None:
+            assert wpin == pinokByWLink, "computed vs explicit pinokByWLink"
+        pinokByWLink = wpin
+
         # XXX assert individually for every block's page? (easier debugging?)
         assert t.blk(blk) == data, ("#blk: %d" % blk)
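
Read as a standalone rule, the expected-pin computation that assertBlk now performs looks roughly like the following. This is a hypothetical helper distilled from the hunk; wlinks, wlink.watching and blkrev_at stand in for tdb._wlinks, tWatchLink._watching and tdb._blkRev:

    def expected_pins(wlinks, zf, blk, blkrev, blkrev_at, at=None):
        # at=None means the access goes to head/file; accesses to @rev/file stay silent
        wpin = {}
        for wlink in wlinks:
            pinok = {}
            if at is None:
                wat = wlink.watching.get(zf)    # state at which wlink watches zf, or None
                if wat is not None and wat < blkrev:
                    # the block changed after the watched state, so wcfs has to pin it
                    # back to its revision as of that state
                    pinok = {blk: blkrev_at(zf, blk, wat)}
            wpin[wlink] = (zf, pinok)
        return wpin

Each watch link gets an entry even when nothing needs pinning, which lets the caller also assert that no unexpected pin messages arrive.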
@@ -545,10 +577,10 @@ class tWatchLink:
         # this tWatchLink currently watches files at particular state.
         t._watching = {}  # {} ZBigFile -> @at

-        tdb._tracked.add(t)
+        tdb._wlinks.add(t)

     def close(t):
-        t.tdb._tracked.remove(t)
+        t.tdb._wlinks.remove(t)
         t._serveCancel()
         # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
@@ -819,7 +851,7 @@ def watch(twlink, zf, at, pinok=None): # XXX -> ?
     # {} blk -> at that have to be pinned.
     # XXX also check that head/file[blk] is in cache - else no need to pin
     if pinok is not None:
-        assert pinok == pin, "explicit pinok != computed pinok"
+        assert pin == pinok, "computed vs explicit pinok"
     pinok = pin
     print('# pinok: %s' % pinstr(pinok))
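
Both watch() here and assertBlk() above follow the same convention for expectations: the caller may pass an explicit value, which is then cross-checked against the computed one, or pass None and accept the computed value. A tiny hypothetical distillation of that pattern:

    def resolve_expectation(computed, explicit=None):
        # cross-check an optional explicit expectation against the computed one
        if explicit is not None:
            assert computed == explicit, "computed vs explicit"
        return computed

This keeps tests readable where an explicit expectation is worth spelling out, while still catching drift between what the test states and what the model computes.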