Commit 3ce03313 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ce04e56e
...@@ -55,8 +55,9 @@ cdef extern from "wcfs_watchlink.h" nogil: ...@@ -55,8 +55,9 @@ cdef extern from "wcfs_watchlink.h" nogil:
cppclass _WatchLink: cppclass _WatchLink:
error close() error close()
error closeWrite() error closeWrite()
error recvReq(context.Context ctx, PinReq *prx)
pair[string, error] sendReq(context.Context ctx, const string &req) pair[string, error] sendReq(context.Context ctx, const string &req)
error recvReq(context.Context ctx, PinReq *prx)
error replyReq(context.Context ctx, const PinReq *req, const string& reply);
vector[string] fatalv vector[string] fatalv
chan[structZ] rx_eof chan[structZ] rx_eof
...@@ -65,8 +66,9 @@ cdef extern from "wcfs_watchlink.h" nogil: ...@@ -65,8 +66,9 @@ cdef extern from "wcfs_watchlink.h" nogil:
# WatchLink.X = WatchLink->X in C++ # WatchLink.X = WatchLink->X in C++
error close "_ptr()->close" () error close "_ptr()->close" ()
error closeWrite "_ptr()->closeWrite"() error closeWrite "_ptr()->closeWrite"()
error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx)
pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req) pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req)
error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx)
error replyReq "_ptr()->replyReq" (context.Context ctx, const PinReq *req, const string& reply);
vector[string] fatalv "_ptr()->fatalv" vector[string] fatalv "_ptr()->fatalv"
chan[structZ] rx_eof "_ptr()->rx_eof" chan[structZ] rx_eof "_ptr()->rx_eof"
...@@ -125,10 +127,27 @@ cdef class PyWatchLink: ...@@ -125,10 +127,27 @@ cdef class PyWatchLink:
raise RuntimeError(err.Error()) # XXX exc class? raise RuntimeError(err.Error()) # XXX exc class?
def sendReq(PyWatchLink pywlink, context.PyContext pyctx, string req): # -> reply(string)
with nogil:
_ = wlink_sendReq_pyexc(pywlink.wlink, pyctx.ctx, req)
reply = _.first
err = _.second
if err != nil:
# XXX -> common place? support for other errors? is it good idea?
if err.eq(context.canceled):
raise pycontext.canceled
if err.eq(context.deadlineExceeded):
raise pycontext.deadlineExceeded
raise RuntimeError(err.Error()) # XXX -> Xpy(err) ? pyraiseIf(err) ?
return reply
def recvReq(PyWatchLink pywlink, context.PyContext pyctx): # -> PinReq | None when EOF def recvReq(PyWatchLink pywlink, context.PyContext pyctx): # -> PinReq | None when EOF
cdef PinReq req cdef PyPinReq pyreq = PyPinReq.__new__(PyPinReq)
with nogil: with nogil:
err = wlink_recvReq_pyexc(pywlink.wlink, pyctx.ctx, &req) err = wlink_recvReq_pyexc(pywlink.wlink, pyctx.ctx, &pyreq.pinreq)
if err.eq(EOF): if err.eq(EOF):
return None return None
...@@ -142,15 +161,11 @@ cdef class PyWatchLink: ...@@ -142,15 +161,11 @@ cdef class PyWatchLink:
raise RuntimeError(err.Error()) # XXX exc class? raise RuntimeError(err.Error()) # XXX exc class?
cdef PyPinReq pyreq = PyPinReq.__new__(PyPinReq)
pyreq.pinreq = req
return pyreq return pyreq
def sendReq(PyWatchLink pywlink, context.PyContext pyctx, string req): # -> reply(string) def replyReq(PyWatchLink pywlink, context.PyContext pyctx, PyPinReq pyreq, string reply):
with nogil: with nogil:
_ = wlink_sendReq_pyexc(pywlink.wlink, pyctx.ctx, req) err = wlink_replyReq_pyexc(pywlink.wlink, pyctx.ctx, &pyreq.pinreq, reply)
reply = _.first
err = _.second
if err != nil: if err != nil:
# XXX -> common place? support for other errors? is it good idea? # XXX -> common place? support for other errors? is it good idea?
...@@ -161,7 +176,7 @@ cdef class PyWatchLink: ...@@ -161,7 +176,7 @@ cdef class PyWatchLink:
raise RuntimeError(err.Error()) # XXX -> Xpy(err) ? pyraiseIf(err) ? raise RuntimeError(err.Error()) # XXX -> Xpy(err) ? pyraiseIf(err) ?
return reply return
# XXX for tests # XXX for tests
...@@ -219,11 +234,14 @@ cdef nogil: ...@@ -219,11 +234,14 @@ cdef nogil:
error wlink_closeWrite_pyexc(WatchLink wlink) except +topyexc: error wlink_closeWrite_pyexc(WatchLink wlink) except +topyexc:
return wlink.closeWrite() return wlink.closeWrite()
pair[string, error] wlink_sendReq_pyexc(WatchLink wlink, context.Context ctx, const string &req) except +topyexc:
return wlink.sendReq(ctx, req)
error wlink_recvReq_pyexc(WatchLink wlink, context.Context ctx, PinReq *prx) except +topyexc: error wlink_recvReq_pyexc(WatchLink wlink, context.Context ctx, PinReq *prx) except +topyexc:
return wlink.recvReq(ctx, prx) return wlink.recvReq(ctx, prx)
pair[string, error] wlink_sendReq_pyexc(WatchLink wlink, context.Context ctx, const string &req) except +topyexc: error wlink_replyReq_pyexc(WatchLink wlink, context.Context ctx, const PinReq *req, const string& reply) except +topyexc:
return wlink.sendReq(ctx, req) return wlink.replyReq(ctx, req, reply)
error _twlinkwrite_pyexc(WatchLink wlink, const string& pkt) except +topyexc: error _twlinkwrite_pyexc(WatchLink wlink, const string& pkt) except +topyexc:
return _twlinkwrite(wlink, pkt) return _twlinkwrite(wlink, pkt)
...@@ -308,6 +308,33 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons ...@@ -308,6 +308,33 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
return make_tuple(rxq, err); return make_tuple(rxq, err);
} }
// replyReq sends reply to client <- server request received by recvReq.
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink *wlink = this;
// XXX err ctx?
//print('C: reply %s <- %r ...' % (req, answer))
wlink->_rxmu.lock();
bool ok = wlink->_accepted.has(req->stream);
wlink->_rxmu.unlock();
if (!ok)
panic("reply to not accepted stream");
error err = wlink->_send(req->stream, answer);
wlink->_rxmu.lock();
ok = wlink->_accepted.has(req->stream);
if (ok)
wlink->_accepted.erase(req->stream);
wlink->_rxmu.unlock();
if (!ok)
panic("BUG: stream vanished from wlink._accepted while reply was in progress");
// XXX also track as answered? (and don't accept with the same ID ?)
return err;
}
// recvReq receives client <- server request. // recvReq receives client <- server request.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt); static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
......
...@@ -104,8 +104,9 @@ public: ...@@ -104,8 +104,9 @@ public:
public: public:
error close(); error close();
error closeWrite(); error closeWrite();
error recvReq(context::Context ctx, PinReq *rx_into);
pair<string, error> sendReq(context::Context ctx, const string &req); pair<string, error> sendReq(context::Context ctx, const string &req);
error recvReq(context::Context ctx, PinReq *rx_into);
error replyReq(context::Context ctx, const PinReq *req, const string& reply);
private: private:
error _serveRX(context::Context ctx); error _serveRX(context::Context ctx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment