Commit c5775014 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3ef9b607
...@@ -28,7 +28,6 @@ import ( ...@@ -28,7 +28,6 @@ import (
"sync" "sync"
"time" "time"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
...@@ -187,7 +186,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -187,7 +186,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
DevPath: nil, // XXX stub DevPath: nil, // XXX stub
NewNID: nil, // XXX stub NewNID: nil, // XXX stub
} }
mlink, accept, err := dialNode(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID) mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID)
if err != nil { if err != nil {
return err return err
} }
...@@ -462,51 +461,5 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -462,51 +461,5 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
} }
// XXX Dial + request identification + verify peer type
func dialNode(ctx context.Context, typ proto.NodeType, net xnet.Networker, addr string, reqID *proto.RequestIdentification) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %s (%s)", addr, typ)(&err)
link, err := neonet.DialLink(ctx, net, addr)
if err != nil {
return nil, nil, err
}
log.Info(ctx, "dialed ok; requesting identification...")
defer xerr.Contextf(&err, "%s: request identification", link)
accept := &proto.AcceptIdentification{}
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
//
// XXX solution could be:
// link.CloseAccept()
// link.Ask1(reqID, accept)
// link.Listen()
// XXX but there is a race window in between recv in ask and listen
// start, and if peer sends new connection in that window it will be rejected.
//
// TODO thinking.
err = link.Ask1(reqID, accept)
if err != nil {
return err
}
if accept.NodeType != typ {
// XXX send Error to peer?
return fmt.Errorf("accepted, but peer is not %v (identifies as %v)", typ, accept.NodeType)
}
return nil
})
if err != nil {
return nil, nil, err
}
log.Info(ctx, "identification accepted")
return link, accept, nil
}
// XXX = Dial node by NID, verify it accepts with "MyNID" == NID, YourNID == NID we sent // XXX = Dial node by NID, verify it accepts with "MyNID" == NID, YourNID == NID we sent
// func dialNID // func dialNID
...@@ -26,6 +26,10 @@ import ( ...@@ -26,6 +26,10 @@ import (
"net" "net"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext" "lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
...@@ -97,3 +101,48 @@ func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, msgID *proto. ...@@ -97,3 +101,48 @@ func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, msgID *proto.
func (l *listener) Close() error { return l.l.Close() } func (l *listener) Close() error { return l.l.Close() }
func (l *listener) Addr() net.Addr { return l.l.Addr() } func (l *listener) Addr() net.Addr { return l.l.Addr() }
// XXX Dial + request identification + verify peer type
func Dial(ctx context.Context, typ proto.NodeType, net xnet.Networker, addr string, reqID *proto.RequestIdentification) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %s (%s)", addr, typ)(&err)
link, err := neonet.DialLink(ctx, net, addr)
if err != nil {
return nil, nil, err
}
log.Info(ctx, "dialed ok; requesting identification...")
defer xerr.Contextf(&err, "%s: request identification", link)
accept := &proto.AcceptIdentification{}
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
//
// XXX solution could be:
// link.CloseAccept()
// link.Ask1(reqID, accept)
// link.Listen()
// XXX but there is a race window in between recv in ask and listen
// start, and if peer sends new connection in that window it will be rejected.
//
// TODO thinking.
err = link.Ask1(reqID, accept)
if err != nil {
return err
}
if accept.NodeType != typ {
// XXX send Error to peer?
return fmt.Errorf("accepted, but peer is not %v (identifies as %v)", typ, accept.NodeType)
}
return nil
})
if err != nil {
return nil, nil, err
}
log.Info(ctx, "identification accepted")
return link, accept, nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment