Commit df371d3f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 92002789
...@@ -167,7 +167,7 @@ error map_into(void *addr, size_t size, int prot, int flags, const os::File f, o ...@@ -167,7 +167,7 @@ error map_into(void *addr, size_t size, int prot, int flags, const os::File f, o
namespace strings { namespace strings {
bool has_prefix(const string &s, const string &prefix) { bool has_prefix(const string &s, const string &prefix) {
return s.compare(0, prefix.size(), prefix); return s.compare(0, prefix.size(), prefix) == 0; // XXX -> pygolang + tests
} }
bool has_prefix(const string &s, char prefix) { bool has_prefix(const string &s, char prefix) {
......
...@@ -61,7 +61,7 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -61,7 +61,7 @@ pair<WatchLink, error> WCFS::_openwatch() {
tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background()); tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background());
wlink->_serveWG = sync::NewWorkGroup(serveCtx); wlink->_serveWG = sync::NewWorkGroup(serveCtx);
wlink->_serveWG->go([wlink](context::Context ctx) -> error { wlink->_serveWG->go([wlink](context::Context ctx) -> error {
return wlink->_serveRX(ctx); return wlink->_serveRX(ctx);
}); });
return make_pair(wlink, nil); return make_pair(wlink, nil);
...@@ -117,6 +117,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -117,6 +117,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
// when finishing - wakeup everyone waiting for rx // when finishing - wakeup everyone waiting for rx
defer([&]() { defer([&]() {
printf("serveRX: close all chans\n");
wlink._acceptq.close(); wlink._acceptq.close();
wlink._rxmu.lock(); wlink._rxmu.lock();
wlink._rxdown = true; // don't allow new rxtab registers wlink._rxdown = true; // don't allow new rxtab registers
...@@ -135,7 +136,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -135,7 +136,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
// NOTE: .close() makes sure .f.read*() will wake up // NOTE: .close() makes sure .f.read*() will wake up
//printf("serveRX -> readline ...\n"); //printf("serveRX -> readline ...\n");
tie(l, err) = wlink._readline(); // XXX +maxlen tie(l, err) = wlink._readline(); // XXX +maxlen
//printf(" readline -> woken up; l='%s' ; err='%s'\n", l.c_str(), v(err)); printf(" readline -> woken up; l='%s' ; err='%s'\n", l.c_str(), v(err));
if (err == io::EOF_) { // peer closed its tx if (err == io::EOF_) { // peer closed its tx
// XXX what happens on other errors? // XXX what happens on other errors?
wlink._rx_eof.close(); wlink._rx_eof.close();
...@@ -149,12 +150,12 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -149,12 +150,12 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
printf("C: watch : rx: \"%s\"", l.c_str()); printf("C: watch : rx: \"%s\"", l.c_str());
err = pkt.from_string(l); err = pkt.from_string(l);
printf("line -> pkt: err='%s'\n", v(err)); printf("line -> pkt: err='%s'\n", v(err));
if (err != nil) if (err != nil)
return err; return err;
printf("pkt.stream: %lu\n", pkt.stream); printf("pkt.stream: %lu\n", pkt.stream);
printf("pkt.datalen: %u\n", pkt.datalen); printf("pkt.datalen: %u\n", pkt.datalen);
if (pkt.stream == 0) { // control/fatal message from wcfs if (pkt.stream == 0) { // control/fatal message from wcfs
// XXX print -> receive somewhere? XXX -> recvCtl ? // XXX print -> receive somewhere? XXX -> recvCtl ?
...@@ -181,7 +182,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -181,7 +182,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
ctx->done().recvs(), // 0 ctx->done().recvs(), // 0
rxq.sends(&pkt), // 1 rxq.sends(&pkt), // 1
}); });
printf("rxq <- pkt: -> sel #%d\n", _); printf("rxq <- pkt: -> sel #%d\n", _);
if (_ == 0) if (_ == 0)
return ctx->err(); return ctx->err();
} }
...@@ -317,19 +318,25 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { ...@@ -317,19 +318,25 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
if (!ok) if (!ok)
return io::EOF_; return io::EOF_;
pkt.to_string(); //pkt.to_string();
return _parsePinReq(prx, &pkt); return _parsePinReq(prx, &pkt);
} }
// _parsePinReq parses message into PinReq according to wcfs invalidation protocol. // _parsePinReq parses message into PinReq according to wcfs invalidation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) { static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
// XXX err ctx "bad pin" // XXX err ctx "bad pin"
printf("parse pinreq: stream=%lu msg='%s'\n", pkt->stream, &pkt->data[0]);
pin->stream = pkt->stream; pin->stream = pkt->stream;
auto msg = pkt->to_string(); string msg = pkt->to_string();
printf("'%s'\n", msg.c_str());
printf("has_prefix: %i\n", strings::has_prefix(msg, "pin "));
// pin <foid>) #<blk> @<at> // pin <foid>) #<blk> @<at>
if (!strings::has_prefix(msg, "pin ")) if (!strings::has_prefix(msg, "pin ")) {
printf("\n\n\nnot a pin request: '%s'\n", msg.c_str()); // XXX temp
abort();
return fmt::errorf("not a pin request: '%s'", msg.c_str()); // XXX msg -> errctx ? return fmt::errorf("not a pin request: '%s'", msg.c_str()); // XXX msg -> errctx ?
}
auto argv = strings::split(msg.substr(4), ' '); auto argv = strings::split(msg.substr(4), ' ');
if (argv.size() != 3) if (argv.size() != 3)
...@@ -371,7 +378,7 @@ tuple<string, error> _WatchLink::_readline() { ...@@ -371,7 +378,7 @@ tuple<string, error> _WatchLink::_readline() {
if (nl != string::npos) { if (nl != string::npos) {
auto line = wlink._rxbuf.substr(0, nl+1); auto line = wlink._rxbuf.substr(0, nl+1);
wlink._rxbuf = wlink._rxbuf.substr(nl+1); wlink._rxbuf = wlink._rxbuf.substr(nl+1);
//printf("\t_readline -> ret '%s'\n", line.c_str()); //printf("\t_readline -> ret '%s'\n", line.c_str());
return make_tuple(line, nil); return make_tuple(line, nil);
} }
nl_searchfrom = wlink._rxbuf.length(); nl_searchfrom = wlink._rxbuf.length();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment