Commit 76f4f073 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 363d5cdb
......@@ -34,6 +34,8 @@ import (
"time"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xerr"
)
// NodeLink is a node-node link in NEO
......@@ -346,7 +348,7 @@ func (c *Conn) Close() error {
// ---- receive ----
// Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept(ctx context.Context) (c *Conn, err error) {
func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) {
defer func() {
if err != nil {
err = nl.err("accept", err)
......@@ -369,9 +371,11 @@ func (nl *NodeLink) Accept(ctx context.Context) (c *Conn, err error) {
// lower cases that are run at every select.
//
// XXX see xio.CloseWhenDone() for helper for this.
/*
// XXX ctx cancel tests
case <-ctx.Done():
return nil, ctx.Err()
*/
case c := <-nl.acceptq:
return c, nil
......@@ -1110,6 +1114,7 @@ func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
}
// Ask sends request and receives response.
//
// It expects response to be exactly of resp type and errors otherwise
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
......@@ -1130,3 +1135,107 @@ func (c *Conn) Ask(req Msg, resp Msg) error {
return err
}
// ---- exchange of 1-1 request-reply ----
// (impedance matche for current neo/py imlementation)
// Request is a message received from the link + connection handle to make a reply.
//
// Request represents 1 request - 0|1 reply interaction model XXX
type Request struct {
Msg Msg
conn *Conn
}
// Recv1 receives message from link and wraps it into Request.
//
// XXX doc
func (link *NodeLink) Recv1() (Request, error) {
conn, err := link.Accept(/*context.TODO()*/) // XXX remove context?
if err != nil {
return Request{}, nil // XXX or return *Request? (want to avoid alloc)
}
// NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq
msg, err := conn.Recv()
if err != nil {
conn.Close() // XXX -> lclose(conn)
return Request{}, nil
}
// noone will be reading from conn anymore - shutdown rx so that if
// peer sends any another packet with same .ConnID serveRecv does not
// deadlock trying to put it to conn.rxq.
// conn.CloseRead() // XXX err?
return Request{Msg: msg, conn: conn}, nil
}
// Reply sends response to request.
//
// XXX doc
func (req Request) Reply(resp Msg) error {
err1 := req.conn.Send(resp)
err2 := req.conn.Close()
return xerr.First(err1, err2)
}
// Close should be called to free request resources for requests without a reply
//
// XXX doc
// It is safe to call Close several times.
func (req Request) Close() error {
return req.conn.Close()
}
// Send1 sends one message over link
//
// XXX doc
func (link *NodeLink) Send1(msg Msg) error {
conn, err := link.NewConn()
if err != nil {
return err
}
// XXX conn.CloseRead() ?
err1 := conn.Send(msg)
err2 := conn.Close()
return xerr.First(err1, err2)
}
// Ask1 sends request and receives response. XXX in 1-1 model
//
// See Conn.Ask for semantic details.
// XXX doc
func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) {
conn, err := link.NewConn()
if err != nil {
return err
}
defer func() {
err2 := conn.Close()
if err == nil {
err = err2
}
}()
err = conn.Send(req)
if err != nil {
return err
}
nerr := &Error{}
which, err := conn.Expect(resp, nerr)
switch which {
case 0:
return nil
case 1:
return ErrDecode(nerr)
}
return err
}
......@@ -233,7 +233,7 @@ func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept(ctx)
conn, err := link.Accept(/*ctx*/)
if err != nil {
return nil, nil, err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment