Commit 6d0cd046 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ee12db7c
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/log" "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task" "lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio" "lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
...@@ -89,7 +90,8 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr ...@@ -89,7 +90,8 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr
// Dial does not update .NodeTab or its node entries in any way. // Dial does not update .NodeTab or its node entries in any way.
// For establishing links to peers present in .NodeTab use Node.Dial. // For establishing links to peers present in .NodeTab use Node.Dial.
// //
// XXX unexport after NodeApp += talkMaster // XXX unexport after NodeApp += talkMaster <- used only to dial to M
// <- dialing to other nodes always go through node.Dial
func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) { func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err) defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)
...@@ -100,7 +102,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri ...@@ -100,7 +102,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri
log.Info(ctx, "dialed ok; requesting identification...") log.Info(ctx, "dialed ok; requesting identification...")
defer xerr.Contextf(&err, "%s: request identification", link) defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error or FIXME: ctx cancel // close link on error or FIXME: ctx cancel XXX -> xcontext.WithCloseOnErrCancel
//cleanup := xio.CloseWhenDone(ctx, link) //cleanup := xio.CloseWhenDone(ctx, link)
defer func() { defer func() {
if err != nil { if err != nil {
...@@ -183,7 +185,7 @@ type listener struct { ...@@ -183,7 +185,7 @@ type listener struct {
l neonet.LinkListener l neonet.LinkListener
} }
func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, _ *proto.RequestIdentification, err error) { func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, msgID *proto.RequestIdentification, err error) {
link, err := l.l.Accept(ctx) link, err := l.l.Accept(ctx)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
...@@ -191,20 +193,29 @@ func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, _ *proto.Requ ...@@ -191,20 +193,29 @@ func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, _ *proto.Requ
// identify peer // identify peer
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify") // XXX -> task.ErrContext? defer xerr.Context(&err, "identify")
req, err := link.Recv1(/*XXX ctx*/) var req neonet.Request
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
var err error
req, err = link.Recv1(/*XXX ctx*/)
if err != nil {
return err
}
switch msg := req.Msg.(type) {
case *proto.RequestIdentification:
msgID = msg
return nil
}
emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // ignore err
return emsg
})
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return &req, msgID, err
switch msg := req.Msg.(type) {
case *proto.RequestIdentification:
return &req, msg, nil
}
emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // XXX err
return nil, nil, emsg
} }
func (l *listener) Close() error { return l.l.Close() } func (l *listener) Close() error { return l.l.Close() }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment