Commit 60dacc49 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f3985859
......@@ -239,7 +239,7 @@ error Contextf::operator() (error err) const {
if (err == nil)
return nil;
return fmt::errorf("%s: %s", c.errctx.c_str(), v(err));
return fmt::errorf("%s: %s", v(c.errctx), v(err));
}
} // xerr::
......
......@@ -200,6 +200,7 @@ template<typename T> string v_(T obj) {
return obj.String();
}
template<> inline string v_(string s) { return s; } // XXX -> const string& ?
template<> string v_(error);
template<> string v_(zodb::Tid);
template<> string v_(zodb::Oid);
......
......@@ -40,14 +40,14 @@ void _WatchLink::decref() {
// _openwatch opens new watch link on wcfs.
pair<WatchLink, error> WCFS::_openwatch() {
WCFS *wc = this;
// XXX errctx += "wcfs %s: openwatch", wc.mountpoint ?
xerr::Contextf E("wcfs %s: openwatch", v(wc->mountpoint));
// head/watch handle.
os::File f;
error err;
tie(f, err) = wc->_open("head/watch", O_RDWR);
if (err != nil)
return make_pair(nil, err);
return make_pair(nil, E(err));
WatchLink wlink = adoptref(new(_WatchLink));
wlink->_wc = wc;
......@@ -110,14 +110,13 @@ error _WatchLink::close() {
if (err == nil)
err = err3;
//return errctx(err);
//return E(errctx, err);
return E(err);
}
// _serveRX receives messages from ._f and dispatches them according to streamID.
error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
_WatchLink& wlink = *this;
xerr::Contextf E("wlink X: serve rx"); // XXX +wlink details
// when finishing - wakeup everyone waiting for rx
defer([&]() {
......@@ -149,14 +148,14 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
// XXX place=ok?
if (err == io::EOF_)
err = nil;
return err;
return E(err);
}
printf("C: watch : rx: \"%s\"", l.c_str());
err = pkt.from_string(l);
//printf("line -> pkt: err='%s'\n", v(err));
if (err != nil)
return err;
return E(err);
//printf("pkt.stream: %lu\n", pkt.stream);
//printf("pkt.datalen: %u\n", pkt.datalen);
......@@ -188,7 +187,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
});
//printf("rxq <- pkt: -> sel #%d\n", _);
if (_ == 0)
return ctx->err();
return E(ctx->err());
}
else {
wlink._rxmu.lock();
......@@ -206,7 +205,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
wlink._acceptq.sends(&pkt), // 1
});
if (_ == 0)
return ctx->err();
return E(ctx->err());
}
}
}
......@@ -217,6 +216,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
// XXX +ctx?
error _WatchLink::_send(StreamID stream, const string &msg) {
_WatchLink *wlink = this;
// XXX +errctx ?
if (msg.find('\n') != string::npos)
panic("msg has \\n");
string pkt = fmt::sprintf("%lu %s\n", stream, msg.c_str());
......@@ -228,6 +228,7 @@ error _twlinkwrite(WatchLink wlink, const string &pkt) {
}
error _WatchLink::_write(const string &pkt) {
_WatchLink *wlink = this;
// XXX +errctx ?
wlink->_txmu.lock();
defer([&]() {
......@@ -245,7 +246,7 @@ error _WatchLink::_write(const string &pkt) {
// XXX -> reply | None when EOF
pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) {
_WatchLink *wlink = this;
// XXX errctx
xerr::Contextf E("wlink X: sendReq"); // XXX wlink details
//printf("wlink sendReq '%s'\n", req.c_str());
......@@ -254,7 +255,7 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req)
error err;
tie(rxq, err) = wlink->_sendReq(ctx, req);
if (err != nil)
return make_pair("", err);
return make_pair("", E(err));
//printf("sendReq: wait ...\n");
int _ = select({
......@@ -263,10 +264,10 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req)
});
//printf("sendReq: woken up #%d\n", _);
if (_ == 0)
return make_pair("", ctx->err());
return make_pair("", E(ctx->err()));
if (!ok)
return make_pair("", io::ErrUnexpectedEOF); // XXX error ok?
return make_pair("", E(io::ErrUnexpectedEOF)); // XXX error ok?
string reply = rx.to_string();
//printf("sendReq: reply='%s'\n", reply.c_str());
return make_pair(reply, nil);
......@@ -313,7 +314,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
// XXX document EOF.
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink *wlink = this;
// XXX errctx?
xerr::Contextf E("wlink X: replyReq"); // XXX +wlink details
//print('C: reply %s <- %r ...' % (req, answer))
wlink->_rxmu.lock();
......@@ -334,14 +335,14 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string
panic("BUG: stream vanished from wlink._accepted while reply was in progress");
// XXX also track as answered? (and don't accept with the same ID ?)
return err;
return E(err);
}
// recvReq receives client <- server request.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
_WatchLink& wlink = *this;
// XXX errctx?
xerr::Contextf E("wlink X: recvReq"); // XXX +wlink details
rxPkt pkt;
bool ok;
......@@ -350,17 +351,16 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
wlink._acceptq.recvs(&pkt, &ok), // 1
});
if (_ == 0)
return ctx->err();
return E(ctx->err());
if (!ok)
return io::EOF_;
return io::EOF_; // NOTE EOF goes without E
return _parsePinReq(prx, &pkt);
return E(_parsePinReq(prx, &pkt));
}
// _parsePinReq parses message into PinReq according to wcfs invalidation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
// XXX errctx "bad pin"
//printf("parse pinreq: stream=%lu msg='%s'\n", pkt->stream, &pkt->data[0]);
pin->stream = pkt->stream;
string msg = pkt->to_string();
......@@ -368,37 +368,37 @@ static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
//printf("'%s'\n", msg.c_str());
//printf("has_prefix: %i\n", strings::has_prefix(msg, "pin "));
xerr::Contextf E("bad pin: '%s'", v(msg));
// pin <foid>) #<blk> @<at>
if (!strings::has_prefix(msg, "pin ")) {
//printf("\n\n\nnot a pin request: '%s'\n", msg.c_str()); // XXX temp
//abort();
return fmt::errorf("not a pin request: '%s'", msg.c_str()); // XXX msg -> errctx ?
return E(fmt::errorf("not a pin request"));
}
auto argv = strings::split(msg.substr(4), ' ');
if (argv.size() != 3)
return fmt::errorf("expected 3 arguments, got %zd", argv.size());
return E(fmt::errorf("expected 3 arguments, got %zd", argv.size()));
error err;
tie(pin->foid, err) = xstrconv::parseHex64(argv[0]);
if (err != nil)
return fmt::errorf("invalid foid");
return E(fmt::errorf("invalid foid"));
if (!strings::has_prefix(argv[1], '#'))
return fmt::errorf("invalid blk");
return E(fmt::errorf("invalid blk"));
tie(pin->blk, err) = xstrconv::parseInt(argv[1].substr(1));
if (err != nil)
return fmt::errorf("invalid blk");
return E(fmt::errorf("invalid blk"));
if (!strings::has_prefix(argv[2], '@'))
return fmt::errorf("invalid at");
return E(fmt::errorf("invalid at"));
auto at = argv[2].substr(1);
if (at == "head") {
pin->at = TidHead;
} else {
tie(pin->at, err) = xstrconv::parseHex64(at);
if (err != nil)
return fmt::errorf("invalid at");
return E(fmt::errorf("invalid at"));
}
return nil;
......@@ -407,6 +407,7 @@ static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
// _readline reads next raw line sent from wcfs.
tuple<string, error> _WatchLink::_readline() {
_WatchLink& wlink = *this;
// XXX errctx ?
char buf[128];
size_t nl_searchfrom = 0;
......@@ -441,24 +442,25 @@ tuple<string, error> _WatchLink::_readline() {
// from_string parses string into rxPkt.
error rxPkt::from_string(const string &rx) {
rxPkt& pkt = *this;
xerr::Contextf E("invalid pkt");
// <stream> ... \n
auto sp = rx.find(' ');
if (sp == string::npos)
return fmt::errorf("invalid pkt: no SP");
return E(fmt::errorf("no SP"));
if (!strings::has_suffix(rx, '\n'))
return fmt::errorf("invalid pkt: no LF");
return E(fmt::errorf("no LF"));
string sid = rx.substr(0, sp);
string smsg = strings::trim_suffix(rx.substr(sp+1), '\n');
error err;
tie(pkt.stream, err) = xstrconv::parseUint(sid);
if (err != nil)
return fmt::errorf("invalid pkt: invalid stream ID");
return E(fmt::errorf("invalid stream ID"));
auto msglen = smsg.length();
if (msglen > ARRAY_SIZE(pkt.data))
return fmt::errorf("invalid pkt: len(msg) > %zu", ARRAY_SIZE(pkt.data));
return E(fmt::errorf("len(msg) > %zu", ARRAY_SIZE(pkt.data)));
memcpy(pkt.data, smsg.c_str(), msglen);
pkt.datalen = msglen;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment