Commit caafdbaa authored by Kirill Smelkov's avatar Kirill Smelkov

X Split Handshake into HandshakeClient + HandshakeServer

Dial will use handshakeClient with options and retries.
parent 655906f5
...@@ -1141,12 +1141,12 @@ func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) { ...@@ -1141,12 +1141,12 @@ func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
wg := xsync.NewWorkGroup(context.Background()) wg := xsync.NewWorkGroup(context.Background())
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
l, err := _Handshake(ctx, c1, _LinkClient) l, err := _HandshakeClient(ctx, c1)
exc.Raiseif(err) exc.Raiseif(err)
l1 = l l1 = l
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
l, err := _Handshake(ctx, c2, _LinkServer) l, err := _HandshakeServer(ctx, c2)
exc.Raiseif(err) exc.Raiseif(err)
l2 = l l2 = l
}) })
......
...@@ -55,34 +55,16 @@ func init() { ...@@ -55,34 +55,16 @@ func init() {
// ---- Handshake ---- // ---- Handshake ----
// XXX _Handshake may be needed to become public in case when we have already // XXX _Handshake{Client,Server} may be needed to become public in case when we have already
// established raw connection and want to hand-over it to NEO. But currently we // established raw connection and want to hand-over it to NEO. But currently we
// do not have such uses. // do not have such uses.
// _Handshake performs NEO protocol handshake just after raw connection between func _HandshakeClient(ctx context.Context, conn net.Conn) (*NodeLink, error) {
// 2 nodes was established. return handshakeClient(ctx, conn, proto.Version, encDefault)
// }
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func _Handshake(ctx context.Context, conn net.Conn, role _LinkRole) (nl *NodeLink, err error) {
enc := encDefault // default encoding
var rxbuf *fwd.Reader
switch role &^ linkFlagsMask {
case _LinkServer:
enc, rxbuf, err = handshakeServer(ctx, conn, proto.Version)
case _LinkClient:
enc, rxbuf, err = handshakeClient(ctx, conn, proto.Version, enc)
default:
panic("bug")
}
if err != nil {
return nil, err
}
// handshake ok -> NodeLink func _HandshakeServer(ctx context.Context, conn net.Conn) (*NodeLink, error) {
return newNodeLink(conn, enc, role, rxbuf), nil return handshakeServer(ctx, conn, proto.Version)
} }
// _HandshakeError is returned when there is an error while performing handshake. // _HandshakeError is returned when there is an error while performing handshake.
...@@ -106,11 +88,38 @@ func (e *_HandshakeError) Error() string { ...@@ -106,11 +88,38 @@ func (e *_HandshakeError) Error() string {
func (e *_HandshakeError) Cause() error { return e.Err } func (e *_HandshakeError) Cause() error { return e.Err }
func (e *_HandshakeError) Unwrap() error { return e.Err } func (e *_HandshakeError) Unwrap() error { return e.Err }
// handshakeClient implements client-side handshake. // handshakeClient implements client-side NEO protocol handshake just after raw
// connection between 2 nodes was established.
// //
// Client indicates its version and preferred encoding, but accepts any // Client indicates its version and preferred encoding, but accepts any
// encoding choosen to use by server. // encoding chosen to use by server.
func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (enc proto.Encoding, rxbuf *fwd.Reader, err error) { //
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (*NodeLink, error) {
enc, rxbuf, err := _handshakeClient(ctx, conn, version, encPrefer)
if err != nil {
return nil, err
}
return newNodeLink(conn, enc, _LinkClient, rxbuf), nil
}
// handshakeServer implements server-side NEO protocol handshake just after raw
// connection between 2 nodes was established.
//
// Server verifies that its version matches Client and accepts client preferred encoding.
//
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func handshakeServer(ctx context.Context, conn net.Conn, version uint32) (*NodeLink, error) {
enc, rxbuf, err := _handshakeServer(ctx, conn, version)
if err != nil {
return nil, err
}
return newNodeLink(conn, enc, _LinkServer, rxbuf), nil
}
func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (enc proto.Encoding, rxbuf *fwd.Reader, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
err = &_HandshakeError{_LinkClient, conn.LocalAddr(), conn.RemoteAddr(), err} err = &_HandshakeError{_LinkClient, conn.LocalAddr(), conn.RemoteAddr(), err}
...@@ -150,10 +159,7 @@ func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPref ...@@ -150,10 +159,7 @@ func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPref
return peerEnc, rxbuf, nil return peerEnc, rxbuf, nil
} }
// handshakeServer implementss server-side handshake. func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc proto.Encoding, rxbuf *fwd.Reader, err error) {
//
// Server verifies that its version matches Client and accepts client preferred encoding.
func handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc proto.Encoding, rxbuf *fwd.Reader, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
err = &_HandshakeError{_LinkServer, conn.LocalAddr(), conn.RemoteAddr(), err} err = &_HandshakeError{_LinkServer, conn.LocalAddr(), conn.RemoteAddr(), err}
...@@ -285,9 +291,10 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink, ...@@ -285,9 +291,10 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink,
return nil, err return nil, err
} }
// TODO if handshake fails with "closed" (= might be unexpected encoding) // // TODO if handshake fails with "closed" (= might be unexpected encoding)
// -> try redial and handshaking with different encoding (= autodetect encoding) // // -> try redial and handshaking with different encoding (= autodetect encoding)
return _Handshake(ctx, peerConn, _LinkClient) // return _Handshake(ctx, peerConn, _LinkClient)
return handshakeClient(ctx, peerConn, proto.Version, encDefault)
} }
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink. // ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
...@@ -335,7 +342,7 @@ func (l *linkListener) Accept(ctx context.Context) (*NodeLink, error) { ...@@ -335,7 +342,7 @@ func (l *linkListener) Accept(ctx context.Context) (*NodeLink, error) {
} }
// NOTE Handshake closes conn in case of failure // NOTE Handshake closes conn in case of failure
link, err := _Handshake(ctx, conn, _LinkServer) link, err := _HandshakeServer(ctx, conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -31,18 +31,18 @@ import ( ...@@ -31,18 +31,18 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// xhandshakeClient handshakes as client with encPrefer encoding and verifies that server accepts it. // _xhandshakeClient handshakes as client with encPrefer encoding and verifies that server accepts it.
func xhandshakeClient(ctx context.Context, c net.Conn, version uint32, encPrefer proto.Encoding) { func _xhandshakeClient(ctx context.Context, c net.Conn, version uint32, encPrefer proto.Encoding) {
enc, _, err := handshakeClient(ctx, c, version, encPrefer) enc, _, err := _handshakeClient(ctx, c, version, encPrefer)
exc.Raiseif(err) exc.Raiseif(err)
if enc != encPrefer { if enc != encPrefer {
exc.Raisef("enc (%c) != encPrefer (%c)", enc, encPrefer) exc.Raisef("enc (%c) != encPrefer (%c)", enc, encPrefer)
} }
} }
// xhandshakeServer handshakes as server and verifies negotiated encoding to be encOK. // _xhandshakeServer handshakes as server and verifies negotiated encoding to be encOK.
func xhandshakeServer(ctx context.Context, c net.Conn, version uint32, encOK proto.Encoding) { func _xhandshakeServer(ctx context.Context, c net.Conn, version uint32, encOK proto.Encoding) {
enc, _, err := handshakeServer(ctx, c, version) enc, _, err := _handshakeServer(ctx, c, version)
exc.Raiseif(err) exc.Raiseif(err)
if enc != encOK { if enc != encOK {
exc.Raisef("enc (%c) != encOK (%c)", enc, encOK) exc.Raisef("enc (%c) != encOK (%c)", enc, encOK)
...@@ -58,10 +58,10 @@ func _TestHandshake(t *T) { ...@@ -58,10 +58,10 @@ func _TestHandshake(t *T) {
p1, p2 := net.Pipe() p1, p2 := net.Pipe()
wg := xsync.NewWorkGroup(bg) wg := xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
xhandshakeClient(ctx, p1, 1, t.enc) _xhandshakeClient(ctx, p1, 1, t.enc)
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
xhandshakeServer(ctx, p2, 1, t.enc) _xhandshakeServer(ctx, p2, 1, t.enc)
}) })
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
...@@ -72,10 +72,10 @@ func _TestHandshake(t *T) { ...@@ -72,10 +72,10 @@ func _TestHandshake(t *T) {
var err1, err2 error var err1, err2 error
wg = xsync.NewWorkGroup(bg) wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err1 = handshakeClient(ctx, p1, 1, t.enc) _, _, err1 = _handshakeClient(ctx, p1, 1, t.enc)
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err2 = handshakeServer(ctx, p2, 2) _, _, err2 = _handshakeServer(ctx, p2, 2)
}) })
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
...@@ -96,7 +96,7 @@ func _TestHandshake(t *T) { ...@@ -96,7 +96,7 @@ func _TestHandshake(t *T) {
err1, err2 = nil, nil err1, err2 = nil, nil
wg = xsync.NewWorkGroup(bg) wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err1 = handshakeClient(ctx, p1, 1, t.enc) _, _, err1 = _handshakeClient(ctx, p1, 1, t.enc)
}) })
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
xclose(p2) xclose(p2)
...@@ -110,16 +110,16 @@ func _TestHandshake(t *T) { ...@@ -110,16 +110,16 @@ func _TestHandshake(t *T) {
t.Errorf("handshake peer close: unexpected error: %#v", err1) t.Errorf("handshake peer close: unexpected error: %#v", err1)
} }
// XXX same for handshakeServer // XXX same for _handshakeServer
// ctx cancel // ctx cancel
// XXX same for handshakeServer // XXX same for _handshakeServer
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg) ctx, cancel := context.WithCancel(bg)
wg = xsync.NewWorkGroup(ctx) wg = xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err1 = handshakeClient(ctx, p1, 1, t.enc) _, _, err1 = _handshakeClient(ctx, p1, 1, t.enc)
}) })
tdelay() tdelay()
cancel() cancel()
......
...@@ -387,7 +387,7 @@ func Verify(t *testing.T, f func(*tEnv)) { ...@@ -387,7 +387,7 @@ func Verify(t *testing.T, f func(*tEnv)) {
// TODO verify M=(go|py) x S=(go|py) x ... // TODO verify M=(go|py) x S=(go|py) x ...
// for now we only verify for all combinations of network // for now we only verify for all combinations of network
// TODO verify enc=(M|N) // TODO verify enc=(M|N) ?
// for all networks // for all networks
for _, network := range []string{"pipenet", "lonet"} { for _, network := range []string{"pipenet", "lonet"} {
...@@ -395,7 +395,6 @@ func Verify(t *testing.T, f func(*tEnv)) { ...@@ -395,7 +395,6 @@ func Verify(t *testing.T, f func(*tEnv)) {
Network: network, Network: network,
} }
// TODO don't pass in opt -> instead pass in tEnv which will have .NewCluster() and use T.opt for it.
t.Run("net="+network, func (t *testing.T) { t.Run("net="+network, func (t *testing.T) {
tracetest.Verify(t, func(t *tracetest.T) { tracetest.Verify(t, func(t *tracetest.T) {
tenv := &tEnv{ tenv := &tEnv{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment