Commit 323ba50c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 97befba7
...@@ -211,6 +211,60 @@ error _WatchLink::_serveRX(context::Context ctx) { ...@@ -211,6 +211,60 @@ error _WatchLink::_serveRX(context::Context ctx) {
} }
} }
// recvReq receives client <- server request.
//
// it returns EOF when server closes the link.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: recvReq", v(wlink));
rxPkt pkt;
bool ok;
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.recvs(&pkt, &ok), // 1
});
if (_ == 0)
return E(ctx->err());
if (!ok)
return io::EOF_; // NOTE EOF goes without E
return E(_parsePinReq(prx, &pkt));
}
// replyReq sends reply to client <- server request received by recvReq.
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: replyReq .%d", v(wlink), req->stream);
//print('C: reply %s <- %r ...' % (req, answer))
wlink._rxmu.lock();
bool ok = wlink._accepted.has(req->stream);
wlink._rxmu.unlock();
if (!ok)
panic("reply to not accepted stream");
error err = wlink._send(req->stream, answer);
wlink._rxmu.lock();
ok = wlink._accepted.has(req->stream);
if (ok)
wlink._accepted.erase(req->stream);
wlink._rxmu.unlock();
if (!ok)
panic("BUG: stream vanished from wlink._accepted while reply was in progress");
// TODO also track as answered? (and don't accept with the same ID ?)
return E(err);
}
// _send sends raw message via specified stream. // _send sends raw message via specified stream.
// //
// multiple _send can be called in parallel - _send serializes writes. // multiple _send can be called in parallel - _send serializes writes.
...@@ -322,55 +376,6 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons ...@@ -322,55 +376,6 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
return make_tuple(rxq, err); return make_tuple(rxq, err);
} }
// replyReq sends reply to client <- server request received by recvReq.
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: replyReq .%d", v(wlink), req->stream);
//print('C: reply %s <- %r ...' % (req, answer))
wlink._rxmu.lock();
bool ok = wlink._accepted.has(req->stream);
wlink._rxmu.unlock();
if (!ok)
panic("reply to not accepted stream");
error err = wlink._send(req->stream, answer);
wlink._rxmu.lock();
ok = wlink._accepted.has(req->stream);
if (ok)
wlink._accepted.erase(req->stream);
wlink._rxmu.unlock();
if (!ok)
panic("BUG: stream vanished from wlink._accepted while reply was in progress");
// TODO also track as answered? (and don't accept with the same ID ?)
return E(err);
}
// recvReq receives client <- server request.
//
// it returns EOF when server closes the link.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: recvReq", v(wlink));
rxPkt pkt;
bool ok;
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.recvs(&pkt, &ok), // 1
});
if (_ == 0)
return E(ctx->err());
if (!ok)
return io::EOF_; // NOTE EOF goes without E
return E(_parsePinReq(prx, &pkt));
}
// _parsePinReq parses message into PinReq according to wcfs isolation protocol. // _parsePinReq parses message into PinReq according to wcfs isolation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) { static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment