Commit 47e453f7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3af99b6f
...@@ -225,15 +225,20 @@ class tDB: ...@@ -225,15 +225,20 @@ class tDB:
# open opens wcfs file corresponding to zf@at and starts to track it. # open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details. # see returned tFile for details.
def open(t, zf, at=None): def open(t, zf, at=None): # -> tFile
return tFile(t, zf, at=at) return tFile(t, zf, at=at)
# openwatch opens /head/watch on wcfs.
# see returned tWatch for details.
def openwatch(t): # -> tWatch
return tWatch(t)
# tFile is testing environment for one bigfile on wcfs. # tFile is testing environment for one bigfile on wcfs.
class tFile: class tFile:
# maximum number of pages we mmap for 1 file. # maximum number of pages we mmap for 1 file.
# this should be not big not to exceed mlock limit. # this should be not big not to exceed mlock limit.
_max_tracked = 10 _max_tracked = 8
def __init__(t, tdb, zf, at=None): def __init__(t, tdb, zf, at=None):
assert isinstance(zf, ZBigFile) assert isinstance(zf, ZBigFile)
...@@ -359,7 +364,7 @@ class tWatch: ...@@ -359,7 +364,7 @@ class tWatch:
def __init__(t, tdb): def __init__(t, tdb):
t.tdb = tdb t.tdb = tdb
t.w = tdb._open("/head/watch", mode='rwb') t.w = tdb._open("head/watch", mode='rwb')
t._acceptq = chan() # (stream, msg) server originated messages go here t._acceptq = chan() # (stream, msg) server originated messages go here
t._rxmu = threading.Lock() t._rxmu = threading.Lock()
...@@ -374,11 +379,16 @@ class tWatch: ...@@ -374,11 +379,16 @@ class tWatch:
tdb._tracked.add(t) tdb._tracked.add(t)
def close(t): def close(t):
# XXX close * from rxtab
t.tdb._tracked.remove(t) t.tdb._tracked.remove(t)
t.w.close() t.w.close()
t._serveDone.recv() t._serveDone.recv()
# wakeup everyone waiting for rx
t._acceptq.close()
with t._rxmu:
rxtab = t._rxtab
t._rxtab = None # don't allow new rxtab registers
for rxq in rxtab.values():
rxq.close()
# _serveRecv receives messages from .w and dispatches them according to streamID. # _serveRecv receives messages from .w and dispatches them according to streamID.
@func @func
...@@ -406,14 +416,13 @@ class tWatch: ...@@ -406,14 +416,13 @@ class tWatch:
t._accepted.add(stream) t._accepted.add(stream)
t._acceptq.send((stream, msg)) t._acceptq.send((stream, msg))
# _send sends raw message via specified stream. # _send sends raw message via specified stream.
# #
# multiple _send can be called in parallel - _send serializes writes. # multiple _send can be called in parallel - _send serializes writes.
def _send(t, stream, msg): def _send(t, stream, msg):
assert '\n' not in msg assert '\n' not in msg
with t._txmu: with t._txmu:
t.w.write("%d %s\n" % (stream, msg)) t.w.write(b"%d %s\n" % (stream, msg))
# sendReq sends client -> server request and returns server reply. # sendReq sends client -> server request and returns server reply.
# #
...@@ -440,6 +449,29 @@ class tWatch: ...@@ -440,6 +449,29 @@ class tWatch:
stream, msg = rx stream, msg = rx
return tSrvReq(t, stream, msg) return tSrvReq(t, stream, msg)
# expectPin ... XXX
#
# expectv is [] of (zf, blk, at)
# returns [] of received pin requests.
def expectPin(t, expectv):
expected = set() # of expected pin messages
for zf, blk, at in expectv:
msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, h(at))
assert msg not in expected
expected.add(msg)
reqv = [] # of received requests
while len(expected) > 0:
req = t.recvReq()
assert req.msg in expected
expected.delete(req.msg)
reqv.append(req)
return reqv
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch. # tSrvReq represents 1 server-initiated wcfs request received over /head/watch.
class tSrvReq: class tSrvReq:
def __init__(req, twatch, stream, msg): def __init__(req, twatch, stream, msg):
...@@ -547,15 +579,17 @@ def test_wcfs(): ...@@ -547,15 +579,17 @@ def test_wcfs():
# >>> XXX commit data to not yet accessed f part - nothing happens # >>> XXX commit data to not yet accessed f part - nothing happens
w = tdb.openwatch() # XXX invalidation protocol ...
w = t.openwatch()
# XXX vvv -> expectPin?
done = chan() done = chan()
@func
def _(): def _():
r = w.recvReq(); assert r == ("pin %s #%d @%s" % zf, ...) defer(done.close)
r.ack() pinv = w.expectPin([(zf, blk, at), (zf, blk, at)])
... for p in pinv:
done.close() p.ack()
go(_) go(_)
assert w.sentReq("watch %s @%s", zf, at1) == "ok" assert w.sentReq("watch %s @%s", zf, at1) == "ok"
done.recv() done.recv()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment