Commit 30284372 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 91e13fd5
......@@ -60,6 +60,7 @@ pair<WatchLink, error> WCFS::_openwatch() {
wlink->_f = f;
wlink->_acceptq = makechan<rxPkt>();
wlink->_down = false;
wlink->_rxeof = false;
wlink->_req_next = 1;
wlink->rx_eof = makechan<structZ>();
......@@ -127,12 +128,15 @@ error _WatchLink::_serveRX(context::Context ctx) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: serve rx", v(wlink));
bool rxeof = false;
// when finishing - wakeup everyone waiting for rx
defer([&]() {
wlink._acceptq.close();
wlink._rxmu.lock();
wlink._down = true; // don't allow new rxtab registers; mark the link as down
wlink._rxeof = rxeof;
wlink._down = true; // don't allow new rxtab registers; mark the link as down
wlink._rxmu.unlock();
wlink._acceptq.close();
for (auto _ : wlink._rxtab) { // FIXME iterates without lock
auto rxq = _.second;
rxq.close();
......@@ -150,6 +154,7 @@ error _WatchLink::_serveRX(context::Context ctx) {
// peer closed its tx
if (err == io::EOF_) {
err = nil;
rxeof = true;
wlink.rx_eof.close();
}
return E(err);
......@@ -230,12 +235,12 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
if (!ok) {
wlink._rxmu.lock();
bool down = wlink._down;
bool rxeof = wlink._rxeof;
wlink._rxmu.unlock();
if (down)
return E(ErrLinkDown);
return io::EOF_; // NOTE EOF goes without E
if (rxeof)
return io::EOF_; // NOTE EOF goes without E
return E(ErrLinkDown);
}
return E(_parsePinReq(prx, &pkt));
......
......@@ -77,8 +77,9 @@ class _WatchLink : public object {
// iso.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
sync::Mutex _rxmu; // XXX -> _mu ?
bool _down; // y when the link is no-longer operational
bool _rxeof; // y if EOF was received from server
dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
......
......@@ -1804,7 +1804,7 @@ func (wlink *WatchLink) _serve() (err error) {
return e
})
// XXX recheck that it is safe to hanle multiple simultaneous watch requests.
// XXX recheck that it is safe to handle multiple simultaneous watch requests.
for {
l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS
......
......@@ -1240,7 +1240,8 @@ def test_wcfs_watch_robust():
#assert wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
# "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
# "pin #%d @%s: context canceled" % (2, h(at1))
with raises(error, match="unexpected EOF"):
#with raises(error, match="unexpected EOF"):
with raises(error, match="recvReply: link is down"):
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
wg.go(_)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment