Commit 4f012c3c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ab3841e7
...@@ -565,7 +565,7 @@ class WatchLink(object): ...@@ -565,7 +565,7 @@ class WatchLink(object):
stream = wlink._req_next stream = wlink._req_next
wlink._req_next = (wlink._req_next + 2) & ((1<<64)-1) wlink._req_next = (wlink._req_next + 2) & ((1<<64)-1)
rxq = chan() rxq = chan() # -> XXX cap=1 so that we don't need to drain if _send fails
with wlink._rxmu: with wlink._rxmu:
assert stream not in wlink._rxtab # XXX !test assert - recheck assert stream not in wlink._rxtab # XXX !test assert - recheck
wlink._rxtab[stream] = rxq wlink._rxtab[stream] = rxq
......
...@@ -406,13 +406,12 @@ error WatchLink::_send(StreamID stream, const string &msg) { ...@@ -406,13 +406,12 @@ error WatchLink::_send(StreamID stream, const string &msg) {
//error WatchLink::_write(const Pkt *pkt) { //error WatchLink::_write(const Pkt *pkt) {
error WatchLink::_write(const string &pkt) { error WatchLink::_write(const string &pkt) {
WatchLink *wlink = this; WatchLink *wlink = this;
wlink->_txmu.lock(); wlink->_txmu.lock();
defer([&]() { defer([&]() {
wlink->_txmu.unlock(); wlink->_txmu.unlock();
}) })
//printf('C: watch : tx: %r' % pkt) //printf('C: watch : tx: %r' % pkt)
int n; int n;
error err; error err;
...@@ -425,28 +424,38 @@ error WatchLink::_write(const string &pkt) { ...@@ -425,28 +424,38 @@ error WatchLink::_write(const string &pkt) {
} }
// sendReq sends client -> server request and returns server reply. // sendReq sends client -> server request and returns server reply.
def WatchLink::sendReq(ctx, req) { // -> reply | None when EOF // XXX -> reply | None when EOF
tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this; WatchLink *wlink = this;
rxq = wlink._sendReq(ctx, req) // XXX err ctx
_, _rx = select( rxPkt rx;
ctx.done().recv, // 0 chan<rxPkt> rxq;
rxq.recv, // 1 error err;
) tie(rxq, err) = wlink._sendReq(ctx, req);
if _ == 0: // XXX err
raise ctx.err()
return _rx _ = select(
ctx.done().recvs(), // 0
rxq.recvs(&rx), // 1
);
if (_ == 0)
return make_tuple("", ctx.err());
// XXX check for EOF
string reply = rx.XXXtostring();
return make_tuple(reply, nil);
} }
def WatchLink::_sendReq(ctx, req) { // -> rxq tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this; WatchLink *wlink = this;
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock) wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next; StreamID stream = wlink->_req_next;
wlink->_req_next = (wlink->_req_next + 2) & ((1<<64)-1); wlink->_req_next = (wlink->_req_next + 2) & ((1ULL<<64)-1);
wlink->_txmu.unlock(); wlink->_txmu.unlock();
rxq = chan() auto rxq = makechan<rxPkt>(1);
wlink->_rxmu.lock(); wlink->_rxmu.lock();
if (has(wlink->rxtab, stream)) { if (has(wlink->rxtab, stream)) {
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
...@@ -456,9 +465,17 @@ def WatchLink::_sendReq(ctx, req) { // -> rxq ...@@ -456,9 +465,17 @@ def WatchLink::_sendReq(ctx, req) { // -> rxq
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
err = wlink->_send(stream, req); err = wlink->_send(stream, req);
// XXX err if (err != nil) {
// remove rxq from rxtab
wlink->_rxmu.lock();
wlink->rxtab.erase(stream)
wlink->_rxmu.unlock();
// no need to drain rxq - it was created with cap=1
rxq = nil;
}
return rxq; return make_tuple(rxq, err);
} }
// ---- WCFS raw file access ---- // ---- WCFS raw file access ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment