Commit fb4d8efc authored by Kirill Smelkov's avatar Kirill Smelkov

fixup! neonet/newlink: Fix lost conn in encoding detector

parent 088502fd
// Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Copyright (C) 2016-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -59,6 +59,19 @@ type _HandshakeError struct {
Err error
}
// _VersionMismatchError is reported as cause when handshake detects that peer
// uses protocol version different than ours.
type _VersionMismatchError struct {
LocalVer uint32
RemoteVer uint32
}
// _EncodingMismatchError is reported as cause when handshake detects that peer does not accept our encoding.
type _EncodingMismatchError struct {
LocalEnc proto.Encoding
RemoteEnc proto.Encoding
}
func (e *_HandshakeError) Error() string {
role := ""
switch e.LocalRole {
......@@ -72,26 +85,32 @@ func (e *_HandshakeError) Error() string {
func (e *_HandshakeError) Cause() error { return e.Err }
func (e *_HandshakeError) Unwrap() error { return e.Err }
func (e *_VersionMismatchError) Error() string {
return fmt.Sprintf("protocol version mismatch: peer = %08x ; our side = %08x", e.RemoteVer, e.LocalVer)
}
func (e *_EncodingMismatchError) Error() string {
return fmt.Sprintf("protocol encoding mismatch: peer = %q ; our side = %q", e.RemoteEnc, e.LocalEnc)
}
// handshakeClient implements client-side NEO protocol handshake just after raw
// connection between 2 nodes was established.
//
// Client indicates its version and preferred encoding, but accepts any
// encoding chosen to use by server.
// Client indicates its version and encoding.
//
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (*NodeLink, error) {
enc, rxbuf, err := _handshakeClient(ctx, conn, version, encPrefer)
func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encoding proto.Encoding) (*NodeLink, error) {
rxbuf, err := _handshakeClient(ctx, conn, version, encoding)
if err != nil {
return nil, err
}
return newNodeLink(conn, enc, _LinkClient, rxbuf), nil
return newNodeLink(conn, encoding, _LinkClient, rxbuf), nil
}
// handshakeServer implements server-side NEO protocol handshake just after raw
// connection between 2 nodes was established.
//
// Server verifies that its version matches Client and accepts client preferred encoding.
// Server verifies that its version matches Client and accepts client encoding.
//
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
......@@ -103,7 +122,7 @@ func handshakeServer(ctx context.Context, conn net.Conn, version uint32) (*NodeL
return newNodeLink(conn, enc, _LinkServer, rxbuf), nil
}
func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (enc proto.Encoding, rxbuf *xbufReader, err error) {
func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encoding proto.Encoding) (rxbuf *xbufReader, err error) {
defer func() {
if err != nil {
err = &_HandshakeError{_LinkClient, conn.LocalAddr(), conn.RemoteAddr(), err}
......@@ -115,7 +134,7 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
var peerEnc proto.Encoding
err = xio.WithCloseOnErrCancel(ctx, conn, func() error {
// tx client hello
err := txHello("tx hello", conn, version, encPrefer)
err := txHello("tx hello", conn, version, encoding)
if err != nil {
return err
}
......@@ -126,13 +145,15 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
if err != nil {
return err
}
if peerEnc != encPrefer {
return ErrEncodingMismatch
}
// verify version
if peerVer != version {
return fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVer, version)
return &_VersionMismatchError{version, peerVer}
}
// verify encoding
if peerEnc != encoding {
return &_EncodingMismatchError{encoding, peerEnc}
}
return nil
......@@ -141,12 +162,10 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
if ctx.Err() != nil {
err = ctx.Err() // error was due to ctx cancel
}
return 0, nil, err
return nil, err
}
// use peer encoding (server should return the same, but we are ok if
// it asks to switch to different)
return peerEnc, rxbuf, nil
return rxbuf, nil
}
func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc proto.Encoding, rxbuf *xbufReader, err error) {
......@@ -179,7 +198,7 @@ func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc p
// verify version
if peerVer != version {
return fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVer, version)
return &_VersionMismatchError{version, peerVer}
}
return nil
......@@ -302,7 +321,6 @@ func init() {
// handshake and wraps the connection as NodeLink.
func DialLink(ctx context.Context, net xnet.Networker, addr string) (link *NodeLink, err error) {
for _, enc := range dialEncTryOrder {
peerConn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
......@@ -325,7 +343,8 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (link *NodeL
//
// -> in both cases retry with next encoding trying to autodetect and match server.
// -> stop trying on success, or on any other error.
if err == nil || !(errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, ErrEncodingMismatch)) {
var errEnc *_EncodingMismatchError
if err == nil || !(errors.Is(err, io.ErrUnexpectedEOF) || errors.As(err, &errEnc)) {
break
}
}
......@@ -334,9 +353,6 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (link *NodeL
return link, err
}
var ErrEncodingMismatch = errors.New("protocol encoding mismatch")
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
......
// Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Copyright (C) 2016-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -24,6 +24,7 @@ import (
"errors"
"io"
"net"
"reflect"
"testing"
"lab.nexedi.com/kirr/go123/exc"
......@@ -31,13 +32,10 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// _xhandshakeClient handshakes as client with encPrefer encoding and verifies that server accepts it.
func _xhandshakeClient(ctx context.Context, c net.Conn, version uint32, encPrefer proto.Encoding) {
enc, _, err := _handshakeClient(ctx, c, version, encPrefer)
// _xhandshakeClient handshakes as client with specified version and encoding and verifies that server accepts it.
func _xhandshakeClient(ctx context.Context, c net.Conn, version uint32, encoding proto.Encoding) {
_, err := _handshakeClient(ctx, c, version, encoding)
exc.Raiseif(err)
if enc != encPrefer {
exc.Raisef("enc (%c) != encPrefer (%c)", enc, encPrefer)
}
}
// _xhandshakeServer handshakes as server and verifies negotiated encoding to be encOK.
......@@ -72,7 +70,7 @@ func _TestHandshake(t *T) {
var err1, err2 error
wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) {
_, _, err1 = _handshakeClient(ctx, p1, 1, t.enc)
_, err1 = _handshakeClient(ctx, p1, 1, t.enc)
})
gox(wg, func(ctx context.Context) {
_, _, err2 = _handshakeServer(ctx, p2, 2)
......@@ -81,22 +79,84 @@ func _TestHandshake(t *T) {
xclose(p1)
xclose(p2)
err1Want := "pipe - pipe: handshake (client): protocol version mismatch: peer = 00000002 ; our side = 00000001"
err2Want := "pipe - pipe: handshake (server): protocol version mismatch: peer = 00000001 ; our side = 00000002"
e1ok := &_HandshakeError{
LocalRole: _LinkClient,
LocalAddr: p1.LocalAddr(),
RemoteAddr: p1.RemoteAddr(),
Err: &_VersionMismatchError{
LocalVer: 1,
RemoteVer: 2,
},
}
e2ok := &_HandshakeError{
LocalRole: _LinkServer,
LocalAddr: p2.LocalAddr(),
RemoteAddr: p2.RemoteAddr(),
Err: &_VersionMismatchError{
LocalVer: 2,
RemoteVer: 1,
},
}
if !(err1 != nil && err1.Error() == err1Want) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want)
if !reflect.DeepEqual(err1, e1ok) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %#v\n %q\nwant: %#v\n %q",
err1, estr(err1), e1ok, e1ok.Error())
}
if !(err2 != nil && err2.Error() == err2Want) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want)
if !reflect.DeepEqual(err2, e2ok) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %#v\n %q\nwant: %#v\n %q",
err2, estr(err2), e2ok, e2ok.Error())
}
// tx & rx problem (client)
// encoding mismatch (mimic behaviour of NEO/py server who does not accept client-proposed encoding)
p1, p2 = net.Pipe()
var err error
var srvEnc proto.Encoding
switch t.enc {
case 'N': srvEnc = 'M'
case 'M': srvEnc = 'N'
default: panic("bug")
}
wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) {
_, err = _handshakeClient(ctx, p1, 1, t.enc)
})
gox(wg, func(ctx context.Context) {
wg := xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) {
err := txHello("tx hello", p2, 1, srvEnc)
exc.Raiseif(err)
})
gox(wg, func(ctx context.Context) {
rxbuf := newXBufReader(p2, 1024)
_, _, err := rxHello("rx hello", rxbuf)
exc.Raiseif(err)
})
xwait(wg)
})
xwait(wg)
xclose(p1)
xclose(p2)
eok := &_HandshakeError{
LocalRole: _LinkClient,
LocalAddr: p1.LocalAddr(),
RemoteAddr: p1.RemoteAddr(),
Err: &_EncodingMismatchError{
LocalEnc: t.enc,
RemoteEnc: srvEnc,
},
}
if !reflect.DeepEqual(err, eok) {
t.Errorf("handshake encoding mismatch: client: unexpected error:\nhave: %#v\n %q\nwant: %#v\n %q",
err, estr(err), eok, eok.Error())
}
// tx & rx problem (client)
p1, p2 = net.Pipe()
wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) {
_, _, err = _handshakeClient(ctx, p1, 1, t.enc)
_, err = _handshakeClient(ctx, p1, 1, t.enc)
})
gox(wg, func(_ context.Context) {
xclose(p2)
......@@ -131,7 +191,7 @@ func _TestHandshake(t *T) {
ctx, cancel := context.WithCancel(bg)
wg = xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) {
_, _, err = _handshakeClient(ctx, p1, 1, t.enc)
_, err = _handshakeClient(ctx, p1, 1, t.enc)
})
tdelay()
cancel()
......@@ -162,3 +222,12 @@ func _TestHandshake(t *T) {
t.Errorf("handshake (server): cancel: unexpected error: %#v", err)
}
}
// estr returns err.Error() or "<nil>".
func estr(err error) string {
if err == nil {
return "<nil>"
} else {
return err.Error()
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment