Commit 36d7ed95 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1947eb4d
...@@ -32,6 +32,8 @@ from libcpp.utility cimport pair ...@@ -32,6 +32,8 @@ from libcpp.utility cimport pair
cdef extern from *: cdef extern from *:
ctypedef bint cbool "bool" ctypedef bint cbool "bool"
from libc.stdio cimport printf # XXX temp
cdef extern from "wcfs_watchlink.h" nogil: cdef extern from "wcfs_watchlink.h" nogil:
cppclass _WatchLink: cppclass _WatchLink:
error close() error close()
...@@ -90,9 +92,12 @@ cdef class PyWatchLink: ...@@ -90,9 +92,12 @@ cdef class PyWatchLink:
reply = _.first reply = _.first
err = _.second err = _.second
printf("pyxsendReq: reply='%s'\n", reply.c_str())
if err != nil: if err != nil:
raise RuntimeError(err.Error()) # XXX -> Xpy(err) ? pyraiseIf(err) ? raise RuntimeError(err.Error()) # XXX -> Xpy(err) ? pyraiseIf(err) ?
printf("pyxsendReq: reply -> ok\n")
return reply return reply
......
...@@ -124,7 +124,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -124,7 +124,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
// NOTE: .close() makes sure .f.read*() will wake up // NOTE: .close() makes sure .f.read*() will wake up
printf("serveRX -> readline ...\n"); printf("serveRX -> readline ...\n");
tie(l, err) = wlink._readline(); // XXX +maxlen tie(l, err) = wlink._readline(); // XXX +maxlen
printf("\treadline -> woken up; l='%s' ; err='%s'\n", l.c_str(), printf(" readline -> woken up; l='%s' ; err='%s'\n", l.c_str(),
(err != nil ? err->Error().c_str() : "nil")); (err != nil ? err->Error().c_str() : "nil"));
if (err == io::EOF_) { // peer closed its tx if (err == io::EOF_) { // peer closed its tx
// XXX what happens on other errors? // XXX what happens on other errors?
...@@ -135,9 +135,13 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -135,9 +135,13 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
printf("C: watch : rx: %s", l.c_str()); printf("C: watch : rx: %s", l.c_str());
err = pkt.from_string(l); err = pkt.from_string(l);
printf("line -> pkt: err='%s'\n", (err != nil ? err->Error().c_str() : "nil"));
if (err != nil) if (err != nil)
return err; return err;
printf("pkt.stream: %lu\n", pkt.stream);
printf("pkt.datalen: %u\n", pkt.datalen);
if (pkt.stream == 0) { // control/fatal message from wcfs if (pkt.stream == 0) { // control/fatal message from wcfs
// XXX print -> receive somewhere? XXX -> recvCtl ? // XXX print -> receive somewhere? XXX -> recvCtl ?
printf("C: watch : rx fatal: %s\n", l.c_str()); printf("C: watch : rx fatal: %s\n", l.c_str());
...@@ -163,6 +167,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -163,6 +167,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
ctx->done().recvs(), // 0 ctx->done().recvs(), // 0
rxq.sends(&pkt), // 1 rxq.sends(&pkt), // 1
}); });
printf("rxq <- pkt: -> sel #%d\n", _);
if (_ == 0) if (_ == 0)
return ctx->err(); return ctx->err();
} }
...@@ -229,17 +234,18 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) ...@@ -229,17 +234,18 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req)
if (err != nil) if (err != nil)
return make_pair("", err); return make_pair("", err);
printf("\twait ...\n"); printf("sendReq: wait ...\n");
int _ = select({ int _ = select({
ctx->done().recvs(), // 0 ctx->done().recvs(), // 0
rxq.recvs(&rx), // 1 rxq.recvs(&rx), // 1
}); });
printf("\twoken up #%d\n", _); printf("sendReq: woken up #%d\n", _);
if (_ == 0) if (_ == 0)
return make_pair("", ctx->err()); return make_pair("", ctx->err());
// XXX check for EOF // XXX check for EOF
string reply = rx.to_string(); string reply = rx.to_string();
printf("sendReq: reply='%s'\n", reply.c_str());
return make_pair(reply, nil); return make_pair(reply, nil);
} }
...@@ -351,15 +357,16 @@ tuple<string, error> _WatchLink::_readline() { ...@@ -351,15 +357,16 @@ tuple<string, error> _WatchLink::_readline() {
if (nl != string::npos) { if (nl != string::npos) {
auto line = wlink._rxbuf.substr(0, nl+1); auto line = wlink._rxbuf.substr(0, nl+1);
wlink._rxbuf = wlink._rxbuf.substr(nl+1); wlink._rxbuf = wlink._rxbuf.substr(nl+1);
printf("\t_readline -> ret '%s'\n", line.c_str());
return make_tuple(line, nil); return make_tuple(line, nil);
} }
nl_searchfrom = wlink._rxbuf.length(); nl_searchfrom = wlink._rxbuf.length();
int n; int n;
error err; error err;
printf(" _readline -> read ...\n"); printf("\t_readline -> read ...\n");
tie(n, err) = wlink._f->read(buf, sizeof(buf)); tie(n, err) = wlink._f->read(buf, sizeof(buf));
printf(" _readline -> read: n=%d err='%s'\n", n, (err != nil ? err->Error().c_str() : "nil")); printf("\t_readline -> read: n=%d err='%s'\n", n, (err != nil ? err->Error().c_str() : "nil"));
if (n > 0) { if (n > 0) {
// XXX limit line length to avoid DoS // XXX limit line length to avoid DoS
wlink._rxbuf += string(buf, n); wlink._rxbuf += string(buf, n);
......
...@@ -1201,6 +1201,11 @@ def test_wcfs_watch_robust(): ...@@ -1201,6 +1201,11 @@ def test_wcfs_watch_robust():
print('\n\nAAA') print('\n\nAAA')
wl = t.openwatch() wl = t.openwatch()
print('\n\nBBB') print('\n\nBBB')
assert "zzz" == "qqq"
print('\n\n\nalpha\n\n\n')
x = wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1)))
print(x)
print('\n\nBBB 2')
assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \ assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
"error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \ "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
"file not yet known to wcfs or is not a ZBigFile" "file not yet known to wcfs or is not a ZBigFile"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment