Commit 59333e47 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a98c9d24
......@@ -346,7 +346,7 @@ func (c *Conn) Close() error {
// ---- receive ----
// Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept() (c *Conn, err error) {
func (nl *NodeLink) Accept(ctx context.Context) (c *Conn, err error) {
defer func() {
if err != nil {
err = nl.err("accept", err)
......@@ -365,6 +365,10 @@ func (nl *NodeLink) Accept() (c *Conn, err error) {
}
return nil, ErrLinkDown
// XXX ctx cancel tests
case <-ctx.Done():
return nil, ctx.Err()
case c := <-nl.acceptq:
return c, nil
}
......@@ -968,7 +972,7 @@ func (c *Conn) ConnID() uint32 {
}
// ---- for convenience: String / Error ----
// ---- for convenience: String / Error / Cause ----
func (nl *NodeLink) String() string {
s := fmt.Sprintf("%s - %s", nl.LocalAddr(), nl.RemoteAddr())
return s // XXX add "(closed)" if nl is closed ?
......@@ -988,6 +992,9 @@ func (e *ConnError) Error() string {
return fmt.Sprintf("%s: %s: %s", e.Conn, e.Op, e.Err)
}
func (e *LinkError) Cause() error { return e.Err }
func (e *ConnError) Cause() error { return e.Err }
func (nl *NodeLink) err(op string, e error) error {
if e == nil {
return nil
......
......@@ -158,7 +158,7 @@ type Listener interface {
// On success returned are:
// - primary link connection which carried identification
// - requested identification packet
Accept() (*Conn, *RequestIdentification, error)
Accept(ctx context.Context) (*Conn, *RequestIdentification, error)
}
type listener struct {
......@@ -197,7 +197,7 @@ func (l *listener) run() {
func (l *listener) accept(link *NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
conn, idReq, err := l.accept1(context.Background(), link, err) // XXX ctx cancel on l close?
res <- accepted{conn, idReq, err}
}()
......@@ -224,16 +224,16 @@ func (l *listener) accept(link *NodeLink, err error) {
}
}
func (l *listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
func (l *listener) accept1(ctx context.Context, link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
defer xerr.Context(&err, "identify")
defer xerr.Context(&err, "identify") // XXX -> task.ErrContext?
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
conn, err := link.Accept(ctx)
if err != nil {
return nil, nil, err
}
......@@ -254,13 +254,16 @@ func (l *listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdent
return conn, idReq, nil
}
func (l *listener) Accept() (*Conn, *RequestIdentification, error) {
func (l *listener) Accept(ctx context.Context) (*Conn, *RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case <-ctx.Done():
return nil, nil, ctx.Err()
case a := <-l.acceptq:
return a.conn, a.idReq, a.err
}
......
......@@ -191,7 +191,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
// XXX dup in storage
for serveCtx.Err() == nil {
conn, idReq, err := l.Accept()
conn, idReq, err := l.Accept(serveCtx)
if err != nil {
// TODO log / throttle
continue
......
......@@ -23,7 +23,7 @@ package server
// common parts for organizing network servers
import (
// "context"
"context"
// "fmt"
// "net"
......@@ -90,11 +90,11 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
// IdentifyPeer identifies peer on the link
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error.
func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.RequestIdentification, err error) {
func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.RequestIdentification, err error) {
defer xerr.Contextf(&err, "%s: identify", link)
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
conn, err := link.Accept(ctx)
if err != nil {
return nodeInfo, err
}
......
......@@ -106,7 +106,7 @@ func (stor *Storage) Run(ctx context.Context) error {
// XXX dup from master
for serveCtx.Err() == nil {
conn, idReq, err := l.Accept()
conn, idReq, err := l.Accept(serveCtx)
if err != nil {
// TODO log / throttle
continue
......@@ -214,7 +214,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
return
for {
conn, err := Mlink.Accept()
conn, err := Mlink.Accept(ctx)
select {
case acceptq <- accepted{conn, err}:
......@@ -433,7 +433,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
// XXX only accept clients
// XXX only accept when operational (?)
nodeInfo, err := IdentifyPeer(link, neo.STORAGE)
nodeInfo, err := IdentifyPeer(ctx, link, neo.STORAGE)
if err != nil {
log.Error(ctx, err)
return
......@@ -452,7 +452,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
// identification passed, now serve other requests
for {
conn, err := link.Accept()
conn, err := link.Accept(ctx)
if err != nil {
log.Error(ctx, err)
break
......@@ -476,6 +476,12 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
return xcontext.Merge(ctx, opCtx)
}
// serveClient serves incoming connection on which peer identified itself as client
// the connection is closed when serveClient returns
// XXX +error return?
//
// XXX version that reuses goroutine to serve next client requests
// XXX for py compatibility (py has no way to tell us Conn is closed)
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
......@@ -499,7 +505,8 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
conn, err = link.Accept(ctx)
if err != nil {
// lclose(link) XXX ?
return err
log.Error(ctx, "%v: %v", conn, err)
return
}
}
}
......@@ -507,6 +514,10 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
// serveClient serves incoming connection on which peer identified itself as client
// the connection is closed when serveClient returns
// XXX +error return?
//
// XXX version that keeps 1 goroutine per 1 Conn
// XXX unusable until Conn.Close signals peer
/*
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
......@@ -544,6 +555,7 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%v: closing client conn", conn)
conn.Close() // XXX err
}
*/
// serveClient1 serves 1 request from a client
func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error {
......@@ -564,7 +576,7 @@ func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error {
}
var reply neo.Msg
data, tid, err := stor.zstor.Load(xid)
data, tid, err := stor.zstor.Load(ctx, xid)
if err != nil {
// TODO translate err to NEO protocol error codes
reply = neo.ErrEncode(err)
......@@ -587,7 +599,7 @@ func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error {
case *neo.LastTransaction:
var reply neo.Msg
lastTid, err := stor.zstor.LastTid()
lastTid, err := stor.zstor.LastTid(ctx)
if err != nil {
reply = neo.ErrEncode(err)
} else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment