Commit e5d3866a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2a4ad58d
......@@ -64,6 +64,8 @@ pair<WatchLink, error> WCFS::_openwatch() {
error _WatchLink::closeWrite() {
_WatchLink &wlink = *this;
// XXX err ctx?
wlink._txclose1.do_([&]() {
// ask wcfs to close its tx & rx sides; wcfs.close(tx) wakes up
// _serveRX on client (= on us). The connection can be already closed
......@@ -208,6 +210,8 @@ error _WatchLink::_serveRX(context::Context ctx) {
// _send sends raw message via specified stream.
//
// multiple _send can be called in parallel - _send serializes writes.
// msg must not include \n.
//
// XXX +ctx?
error _WatchLink::_send(StreamID stream, const string &msg) {
_WatchLink *wlink = this;
......@@ -241,7 +245,7 @@ error _WatchLink::_write(const string &pkt) {
// XXX -> reply | None when EOF
pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) {
_WatchLink *wlink = this;
xerr::Contextf E("%s: sendReq", v(wlink));
xerr::Contextf E("%s: sendReq", v(wlink)); // XXX + streamID
//printf("wlink sendReq '%s'\n", v(req));
......@@ -272,7 +276,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
_WatchLink *wlink = this;
// XXX errctx?
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
wlink->_txmu.lock(); // TODO ._req_next -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next;
wlink->_req_next = (wlink->_req_next + 2); // wraparound at uint64 max
wlink->_txmu.unlock();
......@@ -309,7 +313,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
// XXX document EOF. XXX -> no EOF here - only ErrUnexpectedEOF
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink *wlink = this;
xerr::Contextf E("%s: replyReq", v(wlink));
xerr::Contextf E("%s: replyReq .%d", v(wlink), req.stream);
//print('C: reply %s <- %r ...' % (req, answer))
wlink->_rxmu.lock();
......
......@@ -83,7 +83,7 @@ class _WatchLink : public object {
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
StreamID _req_next; // stream ID for next client-originated request XXX -> atomic
StreamID _req_next; // stream ID for next client-originated request TODO -> atomic
sync::Mutex _txmu; // serializes writes
sync::Once _txclose1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment