Commit 7f7e2a02 authored by Kirill Smelkov's avatar Kirill Smelkov

X Move net + net_trace to xcommon/xnet/

parent ad8bdfdd
......@@ -24,6 +24,7 @@ import (
"../../neo"
"../../zodb"
"../../xcommon/xnet"
)
// Client talks to NEO cluster and exposes access it via ZODB interfaces
......@@ -33,7 +34,7 @@ type Client struct {
myInfo neo.NodeInfo // XXX -> only NodeUUID
clusterName string
net neo.Network // network we are sending/receiving on
net xnet.Network // network we are sending/receiving on
masterAddr string // address of master XXX -> Address ?
// ---- 8< ----
......@@ -134,7 +135,7 @@ func NewClient(storLink *neo.NodeLink) (*Client, error) {
func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// XXX for now url is treated as storage node URL
// XXX check/use other url fields
net := neo.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storLink, err := neo.Dial(ctx, net, u.Host)
if err != nil {
return nil, err
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// XXX goes away - we don't need it.
package neo
// cluster state XXX
// ClusterInfo represents information about state and participants of a NEO cluster
//
// Usually ClusterInfo is Master's idea about the cluster which Master shares
// with other nodes. XXX text ok?
//
// XXX naming -> ClusterState ? (but conflict with proto.ClusterState)
type ClusterInfo struct {
State ClusterState // what is cluster currently doing: recovering/verification/service/...
NodeTab NodeTable // nodes participating in the cluster
PartTab PartitionTable // data space partitioning
// XXX do we want to put data movement scheduling plans here ?
}
......@@ -20,16 +20,16 @@ package neo
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"reflect"
"sync"
"sync/atomic"
"encoding/binary"
"fmt"
"reflect"
"../xcommon/xnet"
)
// NodeLink is a node-node link in NEO
......@@ -683,7 +683,7 @@ func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
// ---- for convenience: Dial ----
// Dial connects to address on given network, handshakes and wraps the connection as NodeLink
func Dial(ctx context.Context, net Network, addr string) (nl *NodeLink, err error) {
func Dial(ctx context.Context, net xnet.Network, addr string) (nl *NodeLink, err error) {
peerConn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
......
......@@ -26,8 +26,8 @@ import (
"io"
"os"
"../../neo"
"../../neo/server"
"../../xcommon/xnet"
)
const masterSummary = "run master node"
......@@ -64,7 +64,7 @@ func masterMain(argv []string) {
os.Exit(2)
}
net := neo.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
masterSrv := server.NewMaster(*cluster, *bind, net)
......
......@@ -27,9 +27,9 @@ import (
"os"
"strings"
"../../neo"
"../../neo/server"
"../../zodb/storage/fs1"
"../../xcommon/xnet"
)
const storageSummary = "run storage node"
......@@ -81,7 +81,7 @@ func storageMain(argv []string) {
log.Fatal(err)
}
net := neo.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storSrv := server.NewStorage(*cluster, master, *bind, net, zstor)
......
......@@ -4,6 +4,8 @@ package neo
import (
"fmt"
"net"
"strconv"
"strings"
)
......@@ -42,3 +44,48 @@ func (nodeUUID NodeUUID) String() string {
return s
}
// ----------------------------------------
// Addr converts network address string into NEO Address
// TODO make neo.Address just string without host:port split
func AddrString(network, addr string) (Address, error) {
// e.g. on unix, pipenet, etc networks there is no host/port split - the address there
// is single string -> we put it into .Host and set .Port=0 to indicate such cases
if strings.HasPrefix(network, "tcp") || strings.HasPrefix(network, "udp") {
// networks that have host:port split
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return Address{}, err
}
// XXX also lookup portstr in /etc/services (net.LookupPort) ?
port, err := strconv.ParseUint(portstr, 10, 16)
if err != nil {
return Address{}, &net.AddrError{Err: "invalid port", Addr: addr}
}
return Address{Host: host, Port: uint16(port)}, nil
}
return Address{Host: addr, Port: 0}, nil
}
// Addr converts net.Addr into NEO Address
func Addr(addr net.Addr) (Address, error) {
return AddrString(addr.Network(), addr.String())
}
// String formats Address to networked address string
func (addr Address) String() string {
// XXX in py if .Host == "" -> whole Address is assumed to be empty
// see Addr ^^^ about .Port=0 meaning no host:port split was applied
switch addr.Port {
case 0:
return addr.Host
default:
return net.JoinHostPort(addr.Host, fmt.Sprintf("%d", addr.Port))
}
}
......@@ -25,12 +25,12 @@ import (
//"reflect"
"testing"
"../../neo"
//"../../neo/client"
//"../../zodb"
"../../zodb/storage/fs1"
"../../xcommon/xnet/pipenet"
"../../xcommon/xsync"
"lab.nexedi.com/kirr/go123/exc"
......@@ -50,7 +50,7 @@ func xfs1stor(path string) *fs1.FileStorage {
// M drives cluster with 1 S through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) {
net := neo.NetPipe("") // test network
net := pipenet.New("") // test network
Maddr := "0"
Saddr := "1"
......@@ -91,11 +91,11 @@ func TestClientStorage(t *testing.T) {
/*
Cnl, Snl := NodeLinkPipe()
wg := neo.WorkGroup()
wg := &xsync.WorkGroup{}
Sctx, Scancel := context.WithCancel(context.Background())
net := neo.NetPipe("") // XXX here? (or a bit above?)
net := pipenet.New("") // XXX here? (or a bit above?)
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") // XXX +readonly
S := NewStorage("cluster", "Maddr", "Saddr", net, zstor)
wg.Gox(func() {
......
......@@ -28,6 +28,7 @@ import (
"../../neo"
"../../zodb"
"../../xcommon/xnet"
"lab.nexedi.com/kirr/go123/xerr"
)
......@@ -44,10 +45,13 @@ type Master struct {
// master manages node and partition tables and broadcast their updates
// to all nodes in cluster
///*
stateMu sync.RWMutex // XXX recheck: needed ?
nodeTab neo.NodeTable
partTab neo.PartitionTable
clusterState neo.ClusterState
//*/
clusterInfo neo.ClusterInfo
// channels controlling main driver
ctlStart chan chan error // request to start cluster
......@@ -73,7 +77,7 @@ type nodeLeave struct {
}
// NewMaster TODO ...
func NewMaster(clusterName, serveAddr string, net neo.Network) *Master {
func NewMaster(clusterName, serveAddr string, net xnet.Network) *Master {
// XXX serveAddr + net
m := &Master{clusterName: clusterName}
......@@ -115,11 +119,8 @@ func (m *Master) Shutdown() error {
// setClusterState sets .clusterState and notifies subscribers
func (m *Master) setClusterState(state neo.ClusterState) {
if state == m.clusterState { // <- XXX do we really need this ?
return
}
m.clusterState = state
// TODO notify subscribers
}
......@@ -316,7 +317,7 @@ func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecov
}
resp := neo.AnswerPartitionTable{}
err = conn.Ask(&neo.X_PartitionTable{}, &resp)
err = conn.Ask(&neo.AskPartitionTable{}, &resp)
if err != nil {
return
}
......
......@@ -25,6 +25,7 @@ import (
"../../neo"
"../../zodb"
"../../xcommon/xnet"
)
// XXX fmt -> log
......@@ -36,7 +37,7 @@ type Storage struct {
myInfo neo.NodeInfo // XXX -> only Address + NodeUUID ?
clusterName string
net neo.Network // network we are sending/receiving on
net xnet.Network // network we are sending/receiving on
masterAddr string // address of master
// ---- 8< ----
......@@ -46,7 +47,7 @@ type Storage struct {
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr
// The storage uses zstor as underlying backend for storing data.
// To actually start running the node - call Run. XXX text
func NewStorage(cluster, masterAddr, serveAddr string, net neo.Network, zstor zodb.IStorage) *Storage {
func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Network, zstor zodb.IStorage) *Storage {
// convert serveAddr into neo format
addr, err := neo.AddrString(net.Network(), serveAddr)
if err != nil {
......
......@@ -15,22 +15,17 @@
//
// See COPYING file for full licensing terms.
package neo
// unified interface for accessing various kinds of networks
// Package xnet provides addons to std package net
package xnet
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"crypto/tls"
"../xcommon/pipenet"
)
// Network represents interface to work with some kind of streaming network
// Network is the interface to work with various kinds of streaming networks
//
// NOTE in NEO a node usually needs to both 1) listen and serve incoming
// connections, and 2) dial peers. For this reason the interface is not split
......@@ -45,6 +40,8 @@ type Network interface {
// Listen starts listening on local address laddr on underlying network
// see net.Listen for semantic details
//
// XXX also introduce xnet.Listener in which Accept() accepts also ctx?
Listen(laddr string) (net.Listener, error)
}
......@@ -70,15 +67,10 @@ func (n netPlain) Listen(laddr string) (net.Listener, error) {
return net.Listen(string(n), laddr)
}
// NetPipe creates Network corresponding to in-memory pipenet
// name is passed directly to pipenet.New
func NetPipe(name string) Network {
return pipenet.New(name)
}
// NetTLS wraps underlying network with TLS layer according to config
// The config must be valid for both tls.Client and tls.Server for Dial and Listen to work
// The config must be valid:
// - for tls.Client -- for Dial to work,
// - for tls.Server -- for Listen to work.
func NetTLS(inner Network, config *tls.Config) Network {
return &netTLS{inner, config}
}
......@@ -107,48 +99,3 @@ func (n *netTLS) Listen(laddr string) (net.Listener, error) {
}
return tls.NewListener(l, n.config), nil
}
// ----------------------------------------
// Addr converts network address string into NEO Address
// TODO make neo.Address just string without host:port split
func AddrString(network, addr string) (Address, error) {
// e.g. on unix, pipenet, etc networks there is no host/port split - the address there
// is single string -> we put it into .Host and set .Port=0 to indicate such cases
if strings.HasPrefix(network, "tcp") || strings.HasPrefix(network, "udp") {
// networks that have host:port split
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return Address{}, err
}
// XXX also lookup portstr in /etc/services (net.LookupPort) ?
port, err := strconv.ParseUint(portstr, 10, 16)
if err != nil {
return Address{}, &net.AddrError{Err: "invalid port", Addr: addr}
}
return Address{Host: host, Port: uint16(port)}, nil
}
return Address{Host: addr, Port: 0}, nil
}
// Addr converts net.Addre into NEO Address
func Addr(addr net.Addr) (Address, error) {
return AddrString(addr.Network(), addr.String())
}
// String formats Address to networked address string
func (addr Address) String() string {
// XXX in py if .Host == "" -> whole Address is assumed to be empty
// see Addr ^^^ about .Port=0 meaning no host:port split was applied
switch addr.Port {
case 0:
return addr.Host
default:
return net.JoinHostPort(addr.Host, fmt.Sprintf("%d", addr.Port))
}
}
......@@ -15,52 +15,60 @@
//
// See COPYING file for full licensing terms.
// +build test
package neo
package xnet
// network tracing
// XXX move to xnet/trace ?
import (
"context"
"net"
)
// NetTrace wraps underlying network with IO tracing layer
//
// the tracing is done via calling trace func right before corresponding packet
// Tracing is done via calling trace func right before corresponding packet
// is sent for Tx to underlying network. No synchronization for notification is
// performed - if one is required tracing func must implement such
// synchronization itself.
//
// only Tx events are traced:
// - because Write, contrary to Read, never writes partial data on non-error
// - because in case of NetPipe tracing writes only is enough to get whole network exchange picture
func NetTrace(inner Network, trace func (t *traceTx)) Network {
&netTrace{inner, trace}
// - because in case of pipenet tracing writes only is enough to get whole network exchange picture
func NetTrace(inner Network, trace func (t *TraceTx)) Network {
return &netTrace{inner, trace}
}
// traceTx is event corresponding to network transmission
type traceTx struct {
src, dst net.Addr
pkt []byte
// TraceTx is event corresponding to network transmission
type TraceTx struct {
Src, Dst net.Addr
Pkt []byte
}
// netTrace wraps underlying Network such that whenever a connection is created
// it is wrapped with traceConn
type netTrace struct {
inner Network
trace func(t *traceTx)
trace func(t *TraceTx)
}
func (nt *netTrace) Network() string {
return nt.inner.Network() // XXX + "+trace" ?
}
func (nt *netTrace) Dial(ctx context.Context, addr string) (net.Conn, error) {
c, err := nt.inner.Dial(ctx, addr)
if err != nil {
return err
return nil, err
}
return &traceConn{nt, c}
return &traceConn{nt, c}, nil
}
func (nt *netTrace) Listen(laddr string) (net.Listener, error) {
l, err := nt.inner.Listen(laddr)
if err != nil {
return err
return nil, err
}
return &netTraceListener{nt, l}
return &netTraceListener{nt, l}, nil
}
// netTraceListener wraps net.Listener to wrap accepted connections with traceConn
......@@ -74,7 +82,7 @@ func (ntl *netTraceListener) Accept() (net.Conn, error) {
if err != nil {
return nil, err
}
return &traceConn{ntl.nt, c}
return &traceConn{ntl.nt, c}, nil
}
// traceConn wraps net.Conn and notifies tracer on Writes
......@@ -84,7 +92,7 @@ type traceConn struct {
}
func (tc *traceConn) Write(b []byte) (int, error) {
t := &traceTx{src: tc.LocalAddr(), dst: tc.RemoteAddr(), pkt: b}
t := &TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b}
tc.nt.trace(t)
return tc.Conn.Write(b)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment