Commit 17af7916 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ad889d0b
...@@ -50,12 +50,14 @@ cdef extern from "wcfs_misc.h" nogil: ...@@ -50,12 +50,14 @@ cdef extern from "wcfs_misc.h" nogil:
cdef extern from "wcfs_watchlink.h" nogil: cdef extern from "wcfs_watchlink.h" nogil:
cppclass _WatchLink: cppclass _WatchLink:
error close() error close()
error closeWrite()
error recvReq(context.Context ctx, PinReq *prx) error recvReq(context.Context ctx, PinReq *prx)
pair[string, error] sendReq(context.Context ctx, const string &req) pair[string, error] sendReq(context.Context ctx, const string &req)
cppclass WatchLink (refptr[_WatchLink]): cppclass WatchLink (refptr[_WatchLink]):
# WatchLink.X = WatchLink->X in C++ # WatchLink.X = WatchLink->X in C++
error close "_ptr()->close" () error close "_ptr()->close" ()
error closeWrite "_ptr()->closeWrite"()
error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx) error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx)
pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req) pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req)
...@@ -104,6 +106,12 @@ cdef class PyWatchLink: ...@@ -104,6 +106,12 @@ cdef class PyWatchLink:
if err != nil: if err != nil:
raise RuntimeError(err.Error()) # XXX exc class? raise RuntimeError(err.Error()) # XXX exc class?
def closeWrite(PyWatchLink pywlink):
with nogil:
err = wlink_closeWrite_pyexc(pywlink.wlink)
if err != nil:
raise RuntimeError(err.Error()) # XXX exc class?
def recvReq(PyWatchLink pywlink, context.PyContext pyctx): # -> PinReq | None when EOF def recvReq(PyWatchLink pywlink, context.PyContext pyctx): # -> PinReq | None when EOF
cdef PinReq req cdef PinReq req
...@@ -164,12 +172,15 @@ from golang cimport topyexc ...@@ -164,12 +172,15 @@ from golang cimport topyexc
cdef nogil: cdef nogil:
pair[WatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc: pair[WatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc:
return wcfs._openwatch() return wcfs._openwatch()
error wlink_close_pyexc(WatchLink wlink) except +topyexc: error wlink_close_pyexc(WatchLink wlink) except +topyexc:
return wlink.close() return wlink.close()
error wlink_closeWrite_pyexc(WatchLink wlink) except +topyexc:
return wlink.closeWrite()
error wlink_recvReq_pyexc(WatchLink wlink, context.Context ctx, PinReq *prx) except +topyexc: error wlink_recvReq_pyexc(WatchLink wlink, context.Context ctx, PinReq *prx) except +topyexc:
return wlink.recvReq(ctx, prx) return wlink.recvReq(ctx, prx)
......
...@@ -68,13 +68,14 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -68,13 +68,14 @@ pair<WatchLink, error> WCFS::_openwatch() {
return make_pair(wlink, nil); return make_pair(wlink, nil);
} }
void _WatchLink::_closeTX() { error _WatchLink::closeWrite() {
_WatchLink &wlink = *this; _WatchLink &wlink = *this;
error err;
wlink._txclose1.do_([&]() { wlink._txclose1.do_([&]() {
// ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up // ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
// _serveRX on client (= on us). The connection can be already closed // _serveRX on client (= on us). The connection can be already closed
// by wcfs - so ignore errors when sending bye. // by wcfs - so ignore errors when sending bye.
(void)wlink._send(1, "bye"); // XXX stream ok? err = wlink._send(1, "bye"); // XXX stream ok?
// XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does // XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does
// not work for non-socket file descriptors. And even if we dup link // not work for non-socket file descriptors. And even if we dup link
// fd, and close only one used for TX, peer's RX will still be blocked // fd, and close only one used for TX, peer's RX will still be blocked
...@@ -82,13 +83,14 @@ void _WatchLink::_closeTX() { ...@@ -82,13 +83,14 @@ void _WatchLink::_closeTX() {
// state. So just use ^^^ "bye" as "TX closed" message. // state. So just use ^^^ "bye" as "TX closed" message.
// wlink._wtx.close(); // wlink._wtx.close();
}); });
return err;
} }
// close closes the link. // close closes the link.
error _WatchLink::close() { error _WatchLink::close() {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
wlink._closeTX(); error err = wlink.closeWrite();
wlink._serveCancel(); wlink._serveCancel();
// XXX we can get stuck here if wcfs does not behave as we want. // XXX we can get stuck here if wcfs does not behave as we want.
// XXX in particular if there is a silly - e.g. syntax or type error in // XXX in particular if there is a silly - e.g. syntax or type error in
...@@ -96,18 +98,20 @@ error _WatchLink::close() { ...@@ -96,18 +98,20 @@ error _WatchLink::close() {
// //
// XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour? // XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
// XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments? // XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments?
error err = wlink._serveWG->wait(); error err2 = wlink._serveWG->wait();
// canceled is expected and ok // canceled is expected and ok
if (err == context::canceled) if (err2 == context::canceled)
err = nil; err2 = nil;
//printf("close -> err =%s\n", (err != nil ? err->Error().c_str() : "nil")); //printf("close -> err =%s\n", (err != nil ? err->Error().c_str() : "nil"));
//printf("close -> err =%s\n", v(err)); //printf("close -> err =%s\n", v(err));
error err2 = wlink._f->close(); error err3 = wlink._f->close();
//printf("close -> err2=%s\n", (err != nil ? err->Error().c_str() : "nil")); //printf("close -> err2=%s\n", (err != nil ? err->Error().c_str() : "nil"));
if (err == nil) if (err == nil)
err = err2; err = err2;
if (err == nil)
err = err3;
return err; return err;
} }
......
...@@ -99,11 +99,11 @@ public: ...@@ -99,11 +99,11 @@ public:
public: public:
error close(); error close();
error closeWrite();
error recvReq(context::Context ctx, PinReq *rx_into); error recvReq(context::Context ctx, PinReq *rx_into);
pair<string, error> sendReq(context::Context ctx, const string &req); pair<string, error> sendReq(context::Context ctx, const string &req);
private: private:
void _closeTX();
error _serveRX(context::Context ctx); error _serveRX(context::Context ctx);
tuple<string, error> _readline(); tuple<string, error> _readline();
error _send(StreamID stream, const string &msg); error _send(StreamID stream, const string &msg);
......
...@@ -1224,7 +1224,7 @@ def test_wcfs_watch_robust(): ...@@ -1224,7 +1224,7 @@ def test_wcfs_watch_robust():
assert req is not None assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1)) assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# don't reply to req - close instead # don't reply to req - close instead
wl._closeTX() wl.closeWrite()
wg.go(_) wg.go(_)
wg.wait() wg.wait()
wl.close() wl.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment