Commit 3ef9b607 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent baf20c7d
// Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xneo
// dial/listen with request/accept identification
import (
"context"
"fmt"
"net"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
type Listener interface {
// from LinkListener:
Close() error
Addr() net.Addr
// Accept accepts incoming client connection.
//
// On success the link was handshaked and peer sent us RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - original peer request that carried identification
// - requested identification packet
//
// After successful accept it is the caller responsibility to reply the request.
//
// NOTE established link is Request.Link().
Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error)
}
// NewListener wraps inner LinkListener into Listener.
func NewListener(inner neonet.LinkListener) Listener {
return &listener{l: inner}
}
type listener struct {
l neonet.LinkListener
}
func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, msgID *proto.RequestIdentification, err error) {
link, err := l.l.Accept(ctx)
if err != nil {
return nil, nil, err
}
// identify peer
// the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify")
var req neonet.Request
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
var err error
req, err = link.Recv1(/*XXX ctx*/)
if err != nil {
return err
}
switch msg := req.Msg.(type) {
case *proto.RequestIdentification:
msgID = msg
return nil
}
emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // ignore err
return emsg
})
if err != nil {
return nil, nil, err
}
return &req, msgID, err
}
func (l *listener) Close() error { return l.l.Close() }
func (l *listener) Addr() net.Addr { return l.l.Addr() }
......@@ -25,7 +25,6 @@ package xneo
import (
"context"
"fmt"
"net"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
......@@ -33,7 +32,6 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
......@@ -168,72 +166,6 @@ func (node *Node) Dial(ctx context.Context, peerType proto.NodeType, addr string
}
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
type Listener interface {
// from LinkListener:
Close() error
Addr() net.Addr
// Accept accepts incoming client connection.
//
// On success the link was handshaked and peer sent us RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - original peer request that carried identification
// - requested identification packet
//
// After successful accept it is the caller responsibility to reply the request.
//
// NOTE established link is Request.Link().
Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error)
}
// NewListener wraps inner LinkListener into Listener.
func NewListener(inner neonet.LinkListener) Listener {
return &listener{l: inner}
}
type listener struct {
l neonet.LinkListener
}
func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, msgID *proto.RequestIdentification, err error) {
link, err := l.l.Accept(ctx)
if err != nil {
return nil, nil, err
}
// identify peer
// the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify")
var req neonet.Request
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
var err error
req, err = link.Recv1(/*XXX ctx*/)
if err != nil {
return err
}
switch msg := req.Msg.(type) {
case *proto.RequestIdentification:
msgID = msg
return nil
}
emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // ignore err
return emsg
})
if err != nil {
return nil, nil, err
}
return &req, msgID, err
}
func (l *listener) Close() error { return l.l.Close() }
func (l *listener) Addr() net.Addr { return l.l.Addr() }
// ----------------------------------------
// UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment