Commit 3af99b6f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8f497094
...@@ -122,7 +122,7 @@ ...@@ -122,7 +122,7 @@
// clients, that had requested it (separately to each client), about the // clients, that had requested it (separately to each client), about the
// changes: // changes:
// //
// S: 2 pin <bigfileX> #<blk> @<rev_max> // S: 2 pin <bigfileX> #<blk> @<rev_max> XXX 2-> 2*k (multiple pins in parallel)
// //
// and waits until all clients confirm that changed file block can be updated // and waits until all clients confirm that changed file block can be updated
// in global OS cache. // in global OS cache.
......
...@@ -125,7 +125,7 @@ class tDB: ...@@ -125,7 +125,7 @@ class tDB:
# ZBigFile(s) scheduled for commit # ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data t._changed = {} # ZBigFile -> {} blk -> data
# commited: head + head history # committed: head + head history
t.head = None t.head = None
t._headv = [] t._headv = []
...@@ -133,7 +133,7 @@ class tDB: ...@@ -133,7 +133,7 @@ class tDB:
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead") t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
t._wc_zheadv = [] t._wc_zheadv = []
# tracked tFiles # tracked tFiles & tWatches
t._tracked = set() t._tracked = set()
def close(t): def close(t):
...@@ -182,7 +182,7 @@ class tDB: ...@@ -182,7 +182,7 @@ class tDB:
def wcsync(t): def wcsync(t):
while len(t._wc_zheadv) < len(t._headv): while len(t._wc_zheadv) < len(t._headv):
l = t._wc_zheadfh.readline() l = t._wc_zheadfh.readline()
#print '> zhead read: %r' % l #print('> zhead read: %r' % l)
l = l.rstrip('\n') l = l.rstrip('\n')
wchead = fromhex(l) wchead = fromhex(l)
i = len(t._wc_zheadv) i = len(t._wc_zheadv)
...@@ -219,9 +219,9 @@ class tDB: ...@@ -219,9 +219,9 @@ class tDB:
return os.stat(path) return os.stat(path)
# _open opens file corresponding to obj on wcfs. # _open opens file corresponding to obj on wcfs.
def _open(t, obj, at=None): def _open(t, obj, mode='rb', at=None):
path = t.path(obj, at=at) path = t.path(obj, at=at)
return open(path, 'rb', 0) # unbuffered return open(path, mode, 0) # unbuffered
# open opens wcfs file corresponding to zf@at and starts to track it. # open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details. # see returned tFile for details.
...@@ -327,13 +327,12 @@ class tFile: ...@@ -327,13 +327,12 @@ class tFile:
assert blk < (st.st_size // t.blksize) assert blk < (st.st_size // t.blksize)
assert st.st_size // t.blksize <= t._max_tracked assert st.st_size // t.blksize <= t._max_tracked
# XXX assert individually for every block page? (easier debugging?) # XXX assert individually for every block's page? (easier debugging?)
assert t.blk(blk) == data, ("#blk: %d" % blk) assert t.blk(blk) == data, ("#blk: %d" % blk)
# we just accessed the block - it has to be in OS cache # we just accessed the block - it has to be in OS cache
assert t.cached()[blk] == 1 assert t.cached()[blk] == 1
# assertData asserts that file has data blocks as specified. # assertData asserts that file has data blocks as specified.
# #
# Expected blocks may be given with size < zf.blksize. In such case they # Expected blocks may be given with size < zf.blksize. In such case they
...@@ -355,8 +354,115 @@ class tFile: ...@@ -355,8 +354,115 @@ class tFile:
t.assertCache([1]*len(datav)) t.assertCache([1]*len(datav))
# tWatch is testing environment for /head/watch opened on wcfs.
class tWatch:
def __init__(t, tdb):
t.tdb = tdb
t.w = tdb._open("/head/watch", mode='rwb')
t._acceptq = chan() # (stream, msg) server originated messages go here
t._rxmu = threading.Lock()
t._rxtab = {} # stream -> rxq server replies go via here
t._accepted = set() # of stream streams we accpted but did not replied yet
t._txmu = threading.Lock() # serializes writes
t._serveDone = chan()
go(t._serveRecv)
tdb._tracked.add(t)
def close(t):
# XXX close * from rxtab
t.tdb._tracked.remove(t)
t.w.close()
t._serveDone.recv()
# _serveRecv receives messages from .w and dispatches them according to streamID.
@func
def _serveRecv(t):
defer(t._serveDone.close)
while 1:
l = t.w.readline()
print('watch: rx: %r' % l)
if len(l) == 0:
break # closed
# <stream> ...
stream, msg = l.split(' ', 1)
stream = int(stream)
reply = bool(stream % 2)
if reply:
with t._mu:
assert stream in t._rxtab
rxq = t._rxtabs[stream]
rxq.send(msg)
else:
with t._mu:
assert stream not in t._accepted
t._accepted.add(stream)
t._acceptq.send((stream, msg))
# _send sends raw message via specified stream.
#
# multiple _send can be called in parallel - _send serializes writes.
def _send(t, stream, msg):
assert '\n' not in msg
with t._txmu:
t.w.write("%d %s\n" % (stream, msg))
# sendReq sends client -> server request and returns server reply.
#
# only 1 sendReq must be used at a time. # XXX relax?
def sendReq(t, req):
stream = 1
rxq = chan()
with t._mu:
assert stream not in t._rxtab
t._rxtab[stream] = rxq
t._send(stream, req)
return rxq.recv()
# recvReq receives client <- server request.
#
# multiple recvReq could be used at a time.
def recvReq(t): # -> tSrvReq
rx = t._acceptq.recv()
if rx is None:
return rx
# XXX text ... stream, msg = rx
return tSrvReq(t, stream, msg)
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch.
class tSrvReq:
def __init__(req, twatch, stream, msg):
req.twatch = twatch
req.stream = stream
req.msg = msg
def reply(req, answer):
t = req.twatch
with t._mu:
assert stream in t._accepted
t._send(req.stream, answer)
with t._mu:
assert stream in t._accepted
t._accepted.delete(req.stream)
# XXX also track as answered? (and don't accept with the same ID ?)
# test_wcfs exercises wcfs functionality.
@func @func
def test_wcfs(): def test_wcfs():
t = tDB() t = tDB()
...@@ -370,31 +476,27 @@ def test_wcfs(): ...@@ -370,31 +476,27 @@ def test_wcfs():
assert tidtime(tid2) > tidtime(tid1) assert tidtime(tid2) > tidtime(tid1)
t.wcsync() t.wcsync()
# >>> lookup non-BigFile -> must be rejected # >>> lookup non-BigFile -> must be rejected
with raises(OSError) as exc: with raises(OSError) as exc:
t.stat(nonfile) t.stat(nonfile)
assert exc.value.errno == EINVAL assert exc.value.errno == EINVAL
f = t.open(zf)
# >>> file initially empty # >>> file initially empty
f = t.open(zf)
f.assertCache([]) f.assertCache([])
f.assertData ([], mtime=tid1) f.assertData ([], mtime=tid1)
# >>> commit data to zf -> verify we can see it on wcfs # >>> (@at1) commit data -> we can see it on wcfs
t.change(zf, {2: b'alpha'}) t.change(zf, {2: b'alpha'})
t.commit() at1 = t.commit()
t.wcsync() t.wcsync()
f.assertCache([0,0,0]) # initially not cached f.assertCache([0,0,0]) # initially not cached
f.assertData ([b'',b'',b'alpha'], mtime=t.head) f.assertData ([b'',b'',b'alpha'], mtime=t.head)
# >>> (@at2) commit again -> we can see both latest and snapshotted states
# >>> commit data again -> verify we can see both latest and snapshotted states.
at1 = t.head
t.change(zf, {2: b'beta', 3: b'gamma'}) t.change(zf, {2: b'beta', 3: b'gamma'})
t.commit() at2 = t.commit()
t.wcsync() t.wcsync()
...@@ -408,14 +510,12 @@ def test_wcfs(): ...@@ -408,14 +510,12 @@ def test_wcfs():
f1.assertData ([b'',b'',b'alpha']) # XXX + mtime=at1? f1.assertData ([b'',b'',b'alpha']) # XXX + mtime=at1?
# >>> commit again without changing zf size # >>> (@at3) commit again without changing zf size
at2 = t.head
f2 = t.open(zf, at=at2) f2 = t.open(zf, at=at2)
t.change(zf, {2: b'kitty'}) t.change(zf, {2: b'kitty'})
t.commit() at3 = t.commit()
t.wcsync() t.wcsync()
f.assertCache([1,1,0,1]) f.assertCache([1,1,0,1])
# f @head is opened again -> cache must not be lost # f @head is opened again -> cache must not be lost
...@@ -436,7 +536,6 @@ def test_wcfs(): ...@@ -436,7 +536,6 @@ def test_wcfs():
f1.assertData ([b'',b'',b'alpha']) # XXX + mtime=at1? f1.assertData ([b'',b'',b'alpha']) # XXX + mtime=at1?
# >>> f close / open again -> cache must not be lost # >>> f close / open again -> cache must not be lost
# XXX bit flaky since OS can evict whole f cache under pressure # XXX bit flaky since OS can evict whole f cache under pressure
f.assertCache([1,1,1,1]) f.assertCache([1,1,1,1])
...@@ -448,6 +547,23 @@ def test_wcfs(): ...@@ -448,6 +547,23 @@ def test_wcfs():
# >>> XXX commit data to not yet accessed f part - nothing happens # >>> XXX commit data to not yet accessed f part - nothing happens
w = tdb.openwatch()
# XXX vvv -> expectPin?
done = chan()
def _():
r = w.recvReq(); assert r == ("pin %s #%d @%s" % zf, ...)
r.ack()
...
done.close()
go(_)
assert w.sentReq("watch %s @%s", zf, at1) == "ok"
done.recv()
tdb.watch(zf, at1) # XXX <- pin @at2 @at3
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment