Commit efc51ca2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8c7ee360
...@@ -8,5 +8,5 @@ if test -z "$qrun_loglevel"; then ...@@ -8,5 +8,5 @@ if test -z "$qrun_loglevel"; then
fi fi
# executed under qemu-runlinux # executed under qemu-runlinux
export WENDELIN_CORE_WCFS_OPTIONS="-d -alsologtostderr -v=1" #export WENDELIN_CORE_WCFS_OPTIONS="-d -alsologtostderr -v=1"
py.test -vsx -k test_wcfs py.test -vsx -k test_wcfs
...@@ -1300,7 +1300,7 @@ func (w *Watcher) _serve() (err error) { ...@@ -1300,7 +1300,7 @@ func (w *Watcher) _serve() (err error) {
return err // XXX err ctx? return err // XXX err ctx?
} }
fmt.Printf("watch: rx: %q\n", l) fmt.Printf("S: watch: rx: %q\n", l)
stream, msg, err := parseWatchFrame(l) stream, msg, err := parseWatchFrame(l)
if err != nil { if err != nil {
......
...@@ -187,7 +187,7 @@ class tDB: ...@@ -187,7 +187,7 @@ class tDB:
wchead = fromhex(l) wchead = fromhex(l)
i = len(t._wc_zheadv) i = len(t._wc_zheadv)
if wchead != t._headv[i]: if wchead != t._headv[i]:
raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, wchead, t._headv[i])) raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, h(wchead), h(t._headv[i])))
t._wc_zheadv.append(wchead) t._wc_zheadv.append(wchead)
# head/at = last txn of whole db # head/at = last txn of whole db
...@@ -409,7 +409,7 @@ class tWatch: ...@@ -409,7 +409,7 @@ class tWatch:
defer(t._serveDone.close) defer(t._serveDone.close)
while 1: while 1:
l = t.wrx.readline() l = t.wrx.readline()
print('watch: rx: %r' % l) print('C: watch: rx: %r' % l)
if len(l) == 0: if len(l) == 0:
break # closed break # closed
...@@ -433,41 +433,30 @@ class tWatch: ...@@ -433,41 +433,30 @@ class tWatch:
# #
# multiple _send can be called in parallel - _send serializes writes. # multiple _send can be called in parallel - _send serializes writes.
def _send(t, stream, msg): def _send(t, stream, msg):
print('qqq')
assert '\n' not in msg assert '\n' not in msg
print('zzz')
with t._txmu: with t._txmu:
print('rrr')
a = b"%d %s\n" % (stream, msg) a = b"%d %s\n" % (stream, msg)
print(`a`)
t.wtx.write(b"%d %s\n" % (stream, msg)) # XXX read/write don't work in parallel? t.wtx.write(b"%d %s\n" % (stream, msg)) # XXX read/write don't work in parallel?
print('sss')
t.wtx.flush() t.wtx.flush()
print('ooo')
# sendReq sends client -> server request and returns server reply. # sendReq sends client -> server request and returns server reply.
# #
# only 1 sendReq must be used at a time. # XXX relax? # only 1 sendReq must be used at a time. # XXX relax?
def sendReq(t, req): def sendReq(t, req):
print("000")
stream = 1 stream = 1
rxq = chan() rxq = chan()
print("111")
with t._rxmu: with t._rxmu:
print("222")
assert stream not in t._rxtab assert stream not in t._rxtab
t._rxtab[stream] = rxq t._rxtab[stream] = rxq
print("333")
t._send(stream, req) t._send(stream, req)
print("444")
return rxq.recv() return rxq.recv()
# recvReq receives client <- server request. # recvReq receives client <- server request.
# #
# multiple recvReq could be used at a time. # multiple recvReq could be used at a time.
def recvReq(t): # -> tSrvReq def recvReq(t): # -> tSrvReq | None when EOF
rx = t._acceptq.recv() rx = t._acceptq.recv()
if rx is None: if rx is None:
return rx return rx
...@@ -476,7 +465,7 @@ class tWatch: ...@@ -476,7 +465,7 @@ class tWatch:
return tSrvReq(t, stream, msg) return tSrvReq(t, stream, msg)
# expectPin ... XXX # expectPin asserts that wcfs sends expected pin messages.
# #
# expectv is [] of (zf, blk, at) # expectv is [] of (zf, blk, at)
# returns [] of received pin requests. # returns [] of received pin requests.
...@@ -490,6 +479,7 @@ class tWatch: ...@@ -490,6 +479,7 @@ class tWatch:
reqv = [] # of received requests reqv = [] # of received requests
while len(expected) > 0: while len(expected) > 0:
req = t.recvReq() req = t.recvReq()
assert req is not None
assert req.msg in expected assert req.msg in expected
expected.delete(req.msg) expected.delete(req.msg)
reqv.append(req) reqv.append(req)
...@@ -607,7 +597,6 @@ def test_wcfs(): ...@@ -607,7 +597,6 @@ def test_wcfs():
# XXX invalidation protocol ... # XXX invalidation protocol ...
print('\n\n') print('\n\n')
w = t.openwatch() w = t.openwatch()
done = chan() done = chan()
...@@ -615,15 +604,11 @@ def test_wcfs(): ...@@ -615,15 +604,11 @@ def test_wcfs():
def _(): def _():
defer(done.close) defer(done.close)
pinv = w.expectPin([(zf, 2, at1), (zf, 3, at1)]) pinv = w.expectPin([(zf, 2, at1), (zf, 3, at1)])
#pinv = w.expectPin({zf: [(2, at1), (3, at1)]}) XXX <- this way better? (sugar)
for p in pinv: for p in pinv:
p.ack() p.ack()
go(_) go(_)
print('\nAAA\n')
try:
assert w.sendReq(b"watch %s @%s" % (h(zf._p_oid), h(at1))) == "ok" assert w.sendReq(b"watch %s @%s" % (h(zf._p_oid), h(at1))) == "ok"
except Exception, e:
print(e)
raise
print('\nBBB\n') print('\nBBB\n')
done.recv() done.recv()
print('\nCCC\n') print('\nCCC\n')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment