Commit 538ac6f8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8045d1e3
......@@ -207,6 +207,7 @@ public:
private:
void _closeTX();
void _serveRX(IContext *ctx);
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req);
......@@ -448,6 +449,67 @@ error WatchLink::close() {
return err;
}
// _serveRX receives messages from ._f and dispatches them according to streamID.
void WatchLink::_serveRX(IContext *ctx) { // XXX void -> ?
WatchLink& wlink = *this;
// when finishing - wakeup everyone waiting for rx
defer([&]() {
wlink._acceptq.close();
with wlink._rxmu:
rxtab = wlink._rxtab
wlink._rxtab = None // don't allow new rxtab registers
for rxq in rxtab.values():
rxq.close();
});
while (1) {
// NOTE: .close() makes sure .f.read*() will wake up
l = wlink.f.readline()
printf("C: watch : rx: %r" % l);
if (len(l) == 0) { // peer closed its tx
wlink.rx_eof.close();
break;
}
// <stream> ... \n
stream, msg = l.split(' ', 1)
stream = int(stream)
msg = msg.rstrip('\n')
if (stream == 0) { // control/fatal message from wcfs
// XXX print -> receive somewhere? XXX -> recvCtl ?
print("C: watch : rx fatal: %r" % msg);
wlink.fatalv.append(msg);
continue;
}
bool reply = (stream % 2 != 0);
if (reply) {
with wlink._rxmu:
assert stream in wlink._rxtab // XXX !test assert - recheck
rxq = wlink._rxtab.pop(stream)
_, _rx = select(
ctx.done().recv, // 0
(rxq.send, msg), // 1
)
if _ == 0:
raise ctx.err()
}
else {
with wlink._rxmu:
assert stream not in wlink._accepted // XXX !test assert - recheck
wlink._accepted.add(stream)
_, _rx = select(
ctx.done().recv, // 0
(wlink._acceptq.send, (stream, msg)), // 1
)
if _ == 0:
raise ctx.err()
}
}
}
// _send sends raw message via specified stream.
//
// multiple _send can be called in parallel - _send serializes writes.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment