diff --git a/wcfs/wcfs.go b/wcfs/wcfs.go index 7363d8b6ffae660d23bfd6ee41f0cd4387ae131c..31d13671ab5a002f1a2da4b90b6e98dbfcd3f52c 100644 --- a/wcfs/wcfs.go +++ b/wcfs/wcfs.go @@ -122,7 +122,7 @@ // clients, that had requested it (separately to each client), about the // changes: // -// S: 2 pin <bigfileX> #<blk> @<rev_max> +// S: 2 pin <bigfileX> #<blk> @<rev_max> XXX 2-> 2*k (multiple pins in parallel) // // and waits until all clients confirm that changed file block can be updated // in global OS cache. diff --git a/wcfs/wcfs_test.py b/wcfs/wcfs_test.py index 7033982b747ff79f7d8be6ca281118fca01dbcf5..1480537dcad4ac4e5782e68ef07cd691f89fd945 100644 --- a/wcfs/wcfs_test.py +++ b/wcfs/wcfs_test.py @@ -125,7 +125,7 @@ class tDB: # ZBigFile(s) scheduled for commit t._changed = {} # ZBigFile -> {} blk -> data - # commited: head + head history + # committed: head + head history t.head = None t._headv = [] @@ -133,7 +133,7 @@ class tDB: t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead") t._wc_zheadv = [] - # tracked tFiles + # tracked tFiles & tWatches t._tracked = set() def close(t): @@ -182,7 +182,7 @@ class tDB: def wcsync(t): while len(t._wc_zheadv) < len(t._headv): l = t._wc_zheadfh.readline() - #print '> zhead read: %r' % l + #print('> zhead read: %r' % l) l = l.rstrip('\n') wchead = fromhex(l) i = len(t._wc_zheadv) @@ -219,9 +219,9 @@ class tDB: return os.stat(path) # _open opens file corresponding to obj on wcfs. - def _open(t, obj, at=None): + def _open(t, obj, mode='rb', at=None): path = t.path(obj, at=at) - return open(path, 'rb', 0) # unbuffered + return open(path, mode, 0) # unbuffered # open opens wcfs file corresponding to zf@at and starts to track it. # see returned tFile for details. @@ -327,13 +327,12 @@ class tFile: assert blk < (st.st_size // t.blksize) assert st.st_size // t.blksize <= t._max_tracked - # XXX assert individually for every block page? (easier debugging?) + # XXX assert individually for every block's page? (easier debugging?) assert t.blk(blk) == data, ("#blk: %d" % blk) # we just accessed the block - it has to be in OS cache assert t.cached()[blk] == 1 - # assertData asserts that file has data blocks as specified. # # Expected blocks may be given with size < zf.blksize. In such case they @@ -355,8 +354,115 @@ class tFile: t.assertCache([1]*len(datav)) +# tWatch is testing environment for /head/watch opened on wcfs. +class tWatch: + + def __init__(t, tdb): + t.tdb = tdb + t.w = tdb._open("/head/watch", mode='rwb') + + t._acceptq = chan() # (stream, msg) server originated messages go here + t._rxmu = threading.Lock() + t._rxtab = {} # stream -> rxq server replies go via here + t._accepted = set() # of stream streams we accpted but did not replied yet + + t._txmu = threading.Lock() # serializes writes + + t._serveDone = chan() + go(t._serveRecv) + + tdb._tracked.add(t) + + def close(t): + # XXX close * from rxtab + t.tdb._tracked.remove(t) + t.w.close() + t._serveDone.recv() + + + # _serveRecv receives messages from .w and dispatches them according to streamID. + @func + def _serveRecv(t): + defer(t._serveDone.close) + while 1: + l = t.w.readline() + print('watch: rx: %r' % l) + if len(l) == 0: + break # closed + + # <stream> ... + stream, msg = l.split(' ', 1) + stream = int(stream) + + reply = bool(stream % 2) + if reply: + with t._mu: + assert stream in t._rxtab + rxq = t._rxtabs[stream] + rxq.send(msg) + else: + with t._mu: + assert stream not in t._accepted + t._accepted.add(stream) + t._acceptq.send((stream, msg)) + + + # _send sends raw message via specified stream. + # + # multiple _send can be called in parallel - _send serializes writes. + def _send(t, stream, msg): + assert '\n' not in msg + with t._txmu: + t.w.write("%d %s\n" % (stream, msg)) + + # sendReq sends client -> server request and returns server reply. + # + # only 1 sendReq must be used at a time. # XXX relax? + def sendReq(t, req): + stream = 1 + + rxq = chan() + with t._mu: + assert stream not in t._rxtab + t._rxtab[stream] = rxq + + t._send(stream, req) + return rxq.recv() + + # recvReq receives client <- server request. + # + # multiple recvReq could be used at a time. + def recvReq(t): # -> tSrvReq + rx = t._acceptq.recv() + if rx is None: + return rx -# XXX text ... + stream, msg = rx + return tSrvReq(t, stream, msg) + +# tSrvReq represents 1 server-initiated wcfs request received over /head/watch. +class tSrvReq: + def __init__(req, twatch, stream, msg): + req.twatch = twatch + req.stream = stream + req.msg = msg + + def reply(req, answer): + t = req.twatch + with t._mu: + assert stream in t._accepted + + t._send(req.stream, answer) + + with t._mu: + assert stream in t._accepted + t._accepted.delete(req.stream) + + # XXX also track as answered? (and don't accept with the same ID ?) + + + +# test_wcfs exercises wcfs functionality. @func def test_wcfs(): t = tDB() @@ -370,31 +476,27 @@ def test_wcfs(): assert tidtime(tid2) > tidtime(tid1) t.wcsync() - # >>> lookup non-BigFile -> must be rejected with raises(OSError) as exc: t.stat(nonfile) assert exc.value.errno == EINVAL - f = t.open(zf) - # >>> file initially empty + f = t.open(zf) f.assertCache([]) f.assertData ([], mtime=tid1) - # >>> commit data to zf -> verify we can see it on wcfs + # >>> (@at1) commit data -> we can see it on wcfs t.change(zf, {2: b'alpha'}) - t.commit() + at1 = t.commit() t.wcsync() f.assertCache([0,0,0]) # initially not cached f.assertData ([b'',b'',b'alpha'], mtime=t.head) - - # >>> commit data again -> verify we can see both latest and snapshotted states. - at1 = t.head + # >>> (@at2) commit again -> we can see both latest and snapshotted states t.change(zf, {2: b'beta', 3: b'gamma'}) - t.commit() + at2 = t.commit() t.wcsync() @@ -408,14 +510,12 @@ def test_wcfs(): f1.assertData ([b'',b'',b'alpha']) # XXX + mtime=at1? - # >>> commit again without changing zf size - at2 = t.head - f2 = t.open(zf, at=at2) + # >>> (@at3) commit again without changing zf size + f2 = t.open(zf, at=at2) t.change(zf, {2: b'kitty'}) - t.commit() + at3 = t.commit() t.wcsync() - f.assertCache([1,1,0,1]) # f @head is opened again -> cache must not be lost @@ -436,7 +536,6 @@ def test_wcfs(): f1.assertData ([b'',b'',b'alpha']) # XXX + mtime=at1? - # >>> f close / open again -> cache must not be lost # XXX bit flaky since OS can evict whole f cache under pressure f.assertCache([1,1,1,1]) @@ -448,6 +547,23 @@ def test_wcfs(): # >>> XXX commit data to not yet accessed f part - nothing happens + w = tdb.openwatch() + + # XXX vvv -> expectPin? + done = chan() + def _(): + r = w.recvReq(); assert r == ("pin %s #%d @%s" % zf, ...) + r.ack() + ... + done.close() + go(_) + assert w.sentReq("watch %s @%s", zf, at1) == "ok" + done.recv() + + tdb.watch(zf, at1) # XXX <- pin @at2 @at3 + + +