Commit 832d2bf3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2645c87a
...@@ -59,7 +59,7 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -59,7 +59,7 @@ pair<WatchLink, error> WCFS::_openwatch() {
wlink->_wc = wc; wlink->_wc = wc;
wlink->_f = f; wlink->_f = f;
wlink->_acceptq = makechan<rxPkt>(); wlink->_acceptq = makechan<rxPkt>();
wlink->_rxdown = false; wlink->_down = false;
wlink->_req_next = 1; wlink->_req_next = 1;
wlink->rx_eof = makechan<structZ>(); wlink->rx_eof = makechan<structZ>();
...@@ -131,7 +131,7 @@ error _WatchLink::_serveRX(context::Context ctx) { ...@@ -131,7 +131,7 @@ error _WatchLink::_serveRX(context::Context ctx) {
defer([&]() { defer([&]() {
wlink._acceptq.close(); wlink._acceptq.close();
wlink._rxmu.lock(); wlink._rxmu.lock();
wlink._rxdown = true; // don't allow new rxtab registers, mark the link as down wlink._down = true; // don't allow new rxtab registers; mark the link as down
wlink._rxmu.unlock(); wlink._rxmu.unlock();
for (auto _ : wlink._rxtab) { // FIXME iterates without lock for (auto _ : wlink._rxtab) { // FIXME iterates without lock
auto rxq = _.second; auto rxq = _.second;
...@@ -230,7 +230,7 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { ...@@ -230,7 +230,7 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
if (!ok) { if (!ok) {
wlink._rxmu.lock(); wlink._rxmu.lock();
bool down = wlink._rxdown; bool down = wlink._down;
wlink._rxmu.unlock(); wlink._rxmu.unlock();
if (down) if (down)
...@@ -248,7 +248,7 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string ...@@ -248,7 +248,7 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string
wlink._rxmu.lock(); wlink._rxmu.lock();
bool ok = wlink._accepted.has(req->stream); bool ok = wlink._accepted.has(req->stream);
bool down = wlink._rxdown; bool down = wlink._down;
wlink._rxmu.unlock(); wlink._rxmu.unlock();
if (!ok) if (!ok)
panic("reply to not accepted stream"); panic("reply to not accepted stream");
...@@ -296,7 +296,7 @@ pair</*reply*/string, error> _WatchLink::sendReq(context::Context ctx, const str ...@@ -296,7 +296,7 @@ pair</*reply*/string, error> _WatchLink::sendReq(context::Context ctx, const str
if (!ok) { if (!ok) {
wlink._rxmu.lock(); wlink._rxmu.lock();
bool down = wlink._rxdown; bool down = wlink._down;
wlink._rxmu.unlock(); wlink._rxmu.unlock();
return make_pair("", E(down ? ErrLinkDown : io::ErrUnexpectedEOF)); return make_pair("", E(down ? ErrLinkDown : io::ErrUnexpectedEOF));
...@@ -310,7 +310,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, Stre ...@@ -310,7 +310,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, Stre
auto rxq = makechan<rxPkt>(1); auto rxq = makechan<rxPkt>(1);
wlink._rxmu.lock(); wlink._rxmu.lock();
if (wlink._rxdown) { if (wlink._down) {
wlink._rxmu.unlock(); wlink._rxmu.unlock();
return make_tuple(nil, ErrLinkDown); return make_tuple(nil, ErrLinkDown);
} }
......
...@@ -78,7 +78,7 @@ class _WatchLink : public object { ...@@ -78,7 +78,7 @@ class _WatchLink : public object {
// iso.protocol message IO // iso.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu; sync::Mutex _rxmu;
bool _rxdown; // y when the link is no-longer operational bool _down; // y when the link is no-longer operational
dict<StreamID, chan<rxPkt>> dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here _rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet set<StreamID> _accepted; // streams we accepted but did not replied yet
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment