Commit 716493af authored by Kirill Smelkov's avatar Kirill Smelkov

X First cut on unified interface for network access

- regular
- pipenet
- TLS over anything
parent c9fcbb0b
...@@ -27,12 +27,25 @@ import ( ...@@ -27,12 +27,25 @@ import (
"../zodb" "../zodb"
"../zodb/storage/fs1" "../zodb/storage/fs1"
"../xcommon/pipenet"
) )
// xfs1stor creates new NEO storage node backed by fs1
func xfs1stor(path string) *Storage {
// TODO +readonly ?
zstor, err := fs1.Open(context.Background(), path)
exc.Raiseif(err)
return NewStorage(zstor)
}
// M drives cluster with 1 S through recovery -> verification -> service -> shutdown // M drives cluster with 1 S through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) { func TestMasterStorage(t *testing.T) {
// TODO net = pipenet.New("") // test network XXX New registers to global table
S := xfs1stor("../zodb/storage/fs1/testdata/1.fs") // XXX +readonly
M := NewMaster("abc1")
} }
// basic interaction between Client -- Storage // basic interaction between Client -- Storage
...@@ -42,13 +55,7 @@ func TestClientStorage(t *testing.T) { ...@@ -42,13 +55,7 @@ func TestClientStorage(t *testing.T) {
Sctx, Scancel := context.WithCancel(context.Background()) Sctx, Scancel := context.WithCancel(context.Background())
// TODO +readonly ? S := xfs1stor("../zodb/storage/fs1/testdata/1.fs") // XXX +readonly
zstor, err := fs1.Open(context.Background(), "../zodb/storage/fs1/testdata/1.fs")
if err != nil {
t.Fatalf("zstor: %v", err) // XXX err ctx ?
}
S := NewStorage(zstor)
wg.Gox(func() { wg.Gox(func() {
S.ServeLink(Sctx, Snl) S.ServeLink(Sctx, Snl)
// XXX + test error return // XXX + test error return
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
package neo
// unified interface for accessing various kinds of networks
import (
"context"
"net"
"crypto/tls"
"../xcommon/pipenet"
)
// Network represents interface to work with some kind of streaming network
type Network interface {
// Dial connects to addr on underlying network
// see net.Dial for semantic details
Dial(ctx context.Context, addr string) (net.Conn, error)
// Listen starts listening on local address laddr on underlying network
// see net.Listen for semantic details
Listen(laddr string) (net.Listener, error)
}
// NetPlain creates Network corresponding to regular network
// network is "tcp", "tcp4", "tcp6", "unix", etc...
func NetPlain(network string) Network {
return netPlain(network)
}
type netPlain string
func (n netPlain) Dial(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, string(n), addr)
}
func (n netPlain) Listen(laddr string) (net.Listener, error) {
return net.Listen(string(n), laddr)
}
// NetPipe creates Network corresponding to in-memory pipenet
// name is anything valid according to pipenet.New rules
func NetPipe(name string) Network {
return pipenet.New(name)
}
// NetTLS wraps underlying network with TLS layer according to config
// The config must be valid for both tls.Client and tls.Server for Dial and Listen to work
func NetTLS(inner Network, config *tls.Config) Network {
return &netTLS{inner, config}
}
type netTLS struct {
inner Network
config *tls.Config
}
func (n *netTLS) Dial(ctx context.Context, addr string) (net.Conn, error) {
c, err := n.inner.Dial(ctx, addr)
if err != nil {
return nil, err
}
return tls.Client(c, n.config), nil
}
func (n *netTLS) Listen(laddr string) (net.Listener, error) {
l, err := n.inner.Listen(laddr)
if err != nil {
return nil, err
}
return tls.NewListener(l, n.config), nil
}
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
package pipenet package pipenet
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"net" "net"
...@@ -232,7 +233,7 @@ func (l *listener) Accept() (net.Conn, error) { ...@@ -232,7 +233,7 @@ func (l *listener) Accept() (net.Conn, error) {
} }
// Dial tries to connect to Accept called on listener corresponding to addr // Dial tries to connect to Accept called on listener corresponding to addr
func (n *Network) Dial(addr string) (net.Conn, error) { func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) {
derr := func(err error) error { derr := func(err error) error {
return &net.OpError{Op: "dial", Net: n.netname(), Addr: &Addr{n.netname(), addr}, Err: err} return &net.OpError{Op: "dial", Net: n.netname(), Addr: &Addr{n.netname(), addr}, Err: err}
} }
...@@ -261,6 +262,9 @@ func (n *Network) Dial(addr string) (net.Conn, error) { ...@@ -261,6 +262,9 @@ func (n *Network) Dial(addr string) (net.Conn, error) {
resp := make(chan net.Conn) resp := make(chan net.Conn)
select { select {
case <-ctx.Done():
return nil, derr(ctx.Err())
case <-l.down: case <-l.down:
return nil, derr(errConnRefused) return nil, derr(errConnRefused)
...@@ -362,13 +366,13 @@ func lookupNet(name string) (*Network, error) { ...@@ -362,13 +366,13 @@ func lookupNet(name string) (*Network, error) {
// Dial dials addr on a pipenet // Dial dials addr on a pipenet
// network should be full network name, e.g. "pipe" // network should be full network name, e.g. "pipe"
func Dial(network, addr string) (net.Conn, error) { func Dial(ctx context.Context, network, addr string) (net.Conn, error) {
n, err := lookupNet(network) n, err := lookupNet(network)
if err != nil { if err != nil {
return nil, &net.OpError{Op: "dial", Net: network, Addr: &Addr{network, addr}, Err: err} return nil, &net.OpError{Op: "dial", Net: network, Addr: &Addr{network, addr}, Err: err}
} }
return n.Dial(addr) return n.Dial(ctx, addr)
} }
// Listen starts listening on a pipenet address // Listen starts listening on a pipenet address
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package pipenet package pipenet
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net" "net"
...@@ -45,7 +46,7 @@ func xaccept(l net.Listener) net.Conn { ...@@ -45,7 +46,7 @@ func xaccept(l net.Listener) net.Conn {
} }
func xdial(network, addr string) net.Conn { func xdial(network, addr string) net.Conn {
c, err := Dial(network, addr) c, err := Dial(context.Background(), network, addr)
exc.Raiseif(err) exc.Raiseif(err)
return c return c
} }
...@@ -79,10 +80,10 @@ func assertEq(t *testing.T, a, b interface{}) { ...@@ -79,10 +80,10 @@ func assertEq(t *testing.T, a, b interface{}) {
func TestPipeNet(t *testing.T) { func TestPipeNet(t *testing.T) {
New("α") New("α")
_, err := Dial("α", "0") _, err := Dial(context.Background(), "α", "0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "α", Addr: &Addr{"α", "0"}, Err: errBadNetwork}) assertEq(t, err, &net.OpError{Op: "dial", Net: "α", Addr: &Addr{"α", "0"}, Err: errBadNetwork})
_, err = Dial("pipeα", "0") _, err = Dial(context.Background(), "pipeα", "0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipeα", Addr: &Addr{"pipeα", "0"}, Err: errConnRefused}) assertEq(t, err, &net.OpError{Op: "dial", Net: "pipeα", Addr: &Addr{"pipeα", "0"}, Err: errConnRefused})
l1 := xlisten("pipeα", "") l1 := xlisten("pipeα", "")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment