Commit 9fe0087c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cd923f51
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
# XXX doc # XXX doc
from golang cimport string, error, refptr from golang cimport chan, structZ, string, error, refptr, pychan
from golang cimport context from golang cimport context
from libcpp cimport nullptr_t, nullptr as nil from libcpp cimport nullptr_t, nullptr as nil
...@@ -54,7 +54,9 @@ cdef extern from "wcfs_watchlink.h" nogil: ...@@ -54,7 +54,9 @@ cdef extern from "wcfs_watchlink.h" nogil:
error closeWrite() error closeWrite()
error recvReq(context.Context ctx, PinReq *prx) error recvReq(context.Context ctx, PinReq *prx)
pair[string, error] sendReq(context.Context ctx, const string &req) pair[string, error] sendReq(context.Context ctx, const string &req)
vector[string] fatalv vector[string] fatalv
chan[structZ] rx_eof
cppclass WatchLink (refptr[_WatchLink]): cppclass WatchLink (refptr[_WatchLink]):
# WatchLink.X = WatchLink->X in C++ # WatchLink.X = WatchLink->X in C++
...@@ -62,7 +64,9 @@ cdef extern from "wcfs_watchlink.h" nogil: ...@@ -62,7 +64,9 @@ cdef extern from "wcfs_watchlink.h" nogil:
error closeWrite "_ptr()->closeWrite"() error closeWrite "_ptr()->closeWrite"()
error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx) error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx)
pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req) pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req)
vector[string] fatalv "_ptr()->fatalv" vector[string] fatalv "_ptr()->fatalv"
chan[structZ] rx_eof "_ptr()->rx_eof"
cppclass PinReq: cppclass PinReq:
Oid foid Oid foid
...@@ -151,6 +155,9 @@ cdef class PyWatchLink: ...@@ -151,6 +155,9 @@ cdef class PyWatchLink:
property fatalv: property fatalv:
def __get__(PyWatchLink pywlink): def __get__(PyWatchLink pywlink):
return pywlink.wlink.fatalv return pywlink.wlink.fatalv
property rx_eof:
def __get__(PyWatchLink pywlink):
return pychan.from_chan_structZ(pywlink.wlink.rx_eof)
cdef class PyPinReq: cdef class PyPinReq:
......
...@@ -53,11 +53,12 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -53,11 +53,12 @@ pair<WatchLink, error> WCFS::_openwatch() {
WatchLink wlink = adoptref(new(_WatchLink)); WatchLink wlink = adoptref(new(_WatchLink));
wlink->_wc = wc; wlink->_wc = wc;
wlink->_f = f; wlink->_f = f;
wlink->_rx_eof = makechan<structZ>();
wlink->_acceptq = makechan<rxPkt>(); wlink->_acceptq = makechan<rxPkt>();
wlink->_rxdown = false; wlink->_rxdown = false;
wlink->_req_next = 1; wlink->_req_next = 1;
wlink->rx_eof = makechan<structZ>();
context::Context serveCtx; context::Context serveCtx;
tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background()); tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background());
wlink->_serveWG = sync::NewWorkGroup(serveCtx); wlink->_serveWG = sync::NewWorkGroup(serveCtx);
...@@ -144,7 +145,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -144,7 +145,7 @@ error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
//printf(" readline -> woken up; l='%s' ; err='%s'\n", l.c_str(), v(err)); //printf(" readline -> woken up; l='%s' ; err='%s'\n", l.c_str(), v(err));
if (err == io::EOF_) { // peer closed its tx if (err == io::EOF_) { // peer closed its tx
// XXX what happens on other errors? // XXX what happens on other errors?
wlink._rx_eof.close(); wlink.rx_eof.close();
} }
if (err != nil) { if (err != nil) {
// XXX place=ok? // XXX place=ok?
......
...@@ -72,7 +72,6 @@ class _WatchLink : public object { ...@@ -72,7 +72,6 @@ class _WatchLink : public object {
WCFS *_wc; WCFS *_wc;
os::File _f; // head/watch file handle os::File _f; // head/watch file handle
string _rxbuf; // buffer for read data from _f string _rxbuf; // buffer for read data from _f
chan<structZ> _rx_eof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO // inv.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here chan<rxPkt> _acceptq; // server originated messages go here
...@@ -89,9 +88,10 @@ class _WatchLink : public object { ...@@ -89,9 +88,10 @@ class _WatchLink : public object {
func<void()> _serveCancel; func<void()> _serveCancel;
sync::WorkGroup _serveWG; sync::WorkGroup _serveWG;
// XXX for tests, ad-hoc, racy. TODO rework to send messeges to control channel // XXX for tests
public: public:
vector<string> fatalv; vector<string> fatalv; // ad-hoc, racy. TODO rework to send messeges to control channel
chan<structZ> rx_eof; // becomes ready when wcfs closes its tx side
// don't new - create only via WCFS._openwatch() // don't new - create only via WCFS._openwatch()
private: private:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment