Commit c4776192 authored by Kirill Smelkov

X master, storage: Speak both NEO and HTTP protocols on the same port

We can detect which protocol a client speaks by analyzing the first few
bytes it sends. Then we can use HTTP for administrative & debugging tasks.

In particular, HTTP currently allows us to collect profiles and traces:

	https://golang.org/pkg/net/http/pprof/

and request tracing for the places where it is used:

	https://godoc.org/golang.org/x/net/trace
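
As an illustration of the approach (this is not the commit's code; the real
implementation is listenAndServe in the diff below), here is a minimal sketch
of sniffing the first bytes with github.com/soheilhy/cmux and routing one TCP
port to both a NEO-style handler and the default HTTP mux. The listen address
and the stand-in NEO accept loop are hypothetical:

	// Minimal sketch, assuming github.com/soheilhy/cmux: route one TCP port
	// to both NEO and HTTP. The listen address and the stand-in NEO accept
	// loop below are illustrative only.
	package main

	import (
		"encoding/binary"
		"io"
		"net"
		"net/http"
		_ "net/http/pprof" // registers /debug/pprof/... on the default mux

		"github.com/soheilhy/cmux"
	)

	// neoLike reports whether a stream begins with a 4-byte big-endian word
	// below 0xff, i.e. looks like a NEO handshake version "00 00 00 v".
	func neoLike(r io.Reader) bool {
		var b [4]byte
		if _, err := io.ReadFull(r, b[:]); err != nil {
			return false
		}
		return binary.BigEndian.Uint32(b[:]) < 0xff
	}

	func main() {
		l, err := net.Listen("tcp", "localhost:0")
		if err != nil {
			panic(err)
		}

		mux := cmux.New(l)
		neoL := mux.Match(neoLike)           // NEO-looking connections
		httpL := mux.Match(cmux.HTTP1Fast()) // plain HTTP/1 connections

		// stand-in for the real NEO serving loop
		go func() {
			for {
				conn, err := neoL.Accept()
				if err != nil {
					return
				}
				conn.Close()
			}
		}()

		// nil handler -> default mux, which now also serves /debug/pprof/...
		go http.Serve(httpL, nil)

		_ = mux.Serve()
	}

With HTTP multiplexed this way, standard tooling can be pointed at the node's
regular port, e.g. go tool pprof http://<node>:<port>/debug/pprof/profile.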

Even though it is a pity that such wrapping leaves an artifact (the connection
actually used is a wrapper over the accepted one), the performance impact is
negligible:

	name                             old time/object  new time/object  delta
	deco/neo/go/fs1(!sha1)/zhash.go      31.6µs ± 2%      31.6µs ± 2%   ~     (p=0.897 n=28+29)
parent f069dd10
......@@ -84,7 +84,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
func newClient(clusterName, masterAddr string, net xnet.Networker) *Client {
return &Client{
node: NewNodeApp(net, proto.CLIENT, clusterName, masterAddr, ""),
node: NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
mlinkReady: make(chan struct{}),
operational: false,
opReady: make(chan struct{}),
......
......@@ -38,6 +38,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -67,6 +68,9 @@ type eventNetConnect struct {
// xnet.TraceListen
// event: node starts listening
//
// XXX we don't actually need this event - nodes now always start with an already-provided listener
// TODO -> remove.
type eventNetListen struct {
Laddr string
}
......@@ -410,6 +414,54 @@ func (t *TraceCollector) traceMasterStartReady(m *Master, ready bool) {
t.d.Dispatch(&eventMStartReady{where, ready})
}
// ----------------------------------------
// test wrapper around Storage - listens at a given address by itself instead of taking a provided listener.
type tStorage struct {
*Storage
serveAddr string
}
func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage {
return &tStorage{
Storage: NewStorage(clusterName, masterAddr, net, back),
serveAddr: serveAddr,
}
}
func (s *tStorage) Run(ctx context.Context) error {
l, err := s.node.Net.Listen(s.serveAddr)
if err != nil {
return err
}
return s.Storage.Run(ctx, l)
}
// test wrapper around Master - listens at a given address by itself instead of taking a provided listener.
type tMaster struct {
*Master
serveAddr string
}
func tNewMaster(clusterName, serveAddr string, net xnet.Networker) *tMaster {
return &tMaster{
Master: NewMaster(clusterName, net),
serveAddr: serveAddr,
}
}
func (m *tMaster) Run(ctx context.Context) error {
l, err := m.node.Net.Listen(m.serveAddr)
if err != nil {
return err
}
return m.Master.Run(ctx, l)
}
// ----------------------------------------
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
......@@ -506,10 +558,10 @@ func TestMasterStorage(t *testing.T) {
rt.BranchState("c", cMC) // state on C is controlled by M
// cluster nodes
M := NewMaster("abc1", ":1", Mhost)
M := tNewMaster("abc1", ":1", Mhost)
zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zback)
S := tNewStorage("abc1", "m:1", ":1", Shost, zback)
C := newClient("abc1", "m:1", Chost)
// let tracer know how to map state addresses to node names
......@@ -928,7 +980,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
defer cancel()
// spawn M
M := NewMaster("abc1", "", Mnet)
M := tNewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
cG := tracetest.NewSyncChan("main")
......@@ -958,7 +1010,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
ev.Ack()
// now after we know Maddr create S & C and start S serving
S := NewStorage("abc1", Maddr, "", Snet, zback)
S := tNewStorage("abc1", Maddr, "", Snet, zback)
C := NewClient("abc1", Maddr, Cnet)
wg.Go(func() error {
......
......@@ -25,6 +25,7 @@ import (
"flag"
"fmt"
"io"
stdnet "net"
"os"
"lab.nexedi.com/kirr/go123/prog"
......@@ -57,15 +58,13 @@ func masterMain(argv []string) {
}
argv = flags.Args()
if len(argv) < 1 {
if len(argv) > 0 {
flags.Usage()
prog.Exit(2)
}
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
masterSrv := neo.NewMaster(*cluster, *bind, net)
ctx := context.Background()
/*
ctx, cancel := context.WithCancel(context.Background())
......@@ -75,7 +74,11 @@ func masterMain(argv []string) {
}()
*/
err := masterSrv.Run(ctx)
err := listenAndServe(ctx, net, *bind, func(ctx context.Context, l stdnet.Listener) error {
master := neo.NewMaster(*cluster, net)
return master.Run(ctx, l)
})
if err != nil {
prog.Fatal(err)
}
......
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// routines common to several subcommands
import (
"context"
"encoding/binary"
stdnet "net"
"net/http"
"io"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/soheilhy/cmux"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
_ "net/http/pprof"
)
// neoMatch tells whether an incoming stream starts like a NEO protocol handshake word.
func neoMatch(r io.Reader) bool {
var b [4]byte
n, _ := io.ReadFull(r, b[:])
if n < 4 {
return false
}
version := binary.BigEndian.Uint32(b[:])
return (version < 0xff) // so it looks like 00 00 00 v
}
// listenAndServe runs service on laddr.
//
// It starts listening, multiplexes incoming connections to NEO and HTTP
// protocols, passes NEO connections to service and passes HTTP connections to
// the default HTTP mux.
//
// The default HTTP mux can be assumed to contain /debug/pprof and the like.
func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve func(ctx context.Context, l stdnet.Listener) error) error {
l, err := net.Listen(laddr)
if err != nil {
return err
}
// XXX who closes l?
log.Infof(ctx, "listening at %s ...", l.Addr())
log.Flush() // XXX ok?
mux := cmux.New(l)
neoL := mux.Match(neoMatch)
httpL := mux.Match(cmux.HTTP1(), cmux.HTTP2()) // XXX verify http2 works
miscL := mux.Match(cmux.Any())
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
// XXX shutdown serve on ctx cancel
return mux.Serve()
})
wg.Go(func() error {
return serve(ctx, neoL)
})
wg.Go(func() error {
// XXX shutdown http on ctx cancel
return http.Serve(httpL, nil)
})
wg.Go(func() error {
// XXX shutdown on ctx cancel
for {
conn, err := miscL.Accept()
if err != nil {
return err
}
// got something unexpected - grab the header (which we
// already have read), log it and reject the
// connection.
b := make([]byte, 1024)
// must not block as some data is already there in cmux buffer
n, _ := conn.Read(b)
subj := fmt.Sprintf("strange connection from %s:", conn.RemoteAddr())
serr := "peer sent nothing"
if n > 0 {
serr = fmt.Sprintf("peer sent %q", b[:n])
}
log.Infof(ctx, "%s: %s", subj, serr)
conn.Close() // XXX lclose
}
})
err = wg.Wait()
return err
}
......@@ -24,6 +24,7 @@ import (
"context"
"flag"
"fmt"
stdnet "net"
"io"
"os"
"runtime"
......@@ -97,16 +98,6 @@ func storageMain(argv []string) {
maxprocs := runtime.GOMAXPROCS(0)
runtime.GOMAXPROCS(maxprocs*8) // XXX *8 is enough?
back, err := storage.OpenBackend(context.Background(), argv[0])
if err != nil {
prog.Fatal(err)
}
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storSrv := neo.NewStorage(*cluster, master, *bind, net, back)
ctx := context.Background()
/*
ctx, cancel := context.WithCancel(context.Background())
......@@ -116,7 +107,20 @@ func storageMain(argv []string) {
}()
*/
err = storSrv.Run(ctx)
back, err := storage.OpenBackend(ctx, argv[0])
if err != nil {
prog.Fatal(err)
}
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l stdnet.Listener) error {
stor := neo.NewStorage(*cluster, master, net, back)
return stor.Run(ctx, l)
})
// XXX back.Close
if err != nil {
prog.Fatal(err)
}
......
......@@ -24,6 +24,7 @@ import (
"context"
stderrors "errors"
"fmt"
stdnet "net"
"sync"
"time"
......@@ -69,12 +70,12 @@ type Master struct {
}
// NewMaster creates new master node that will listen on serveAddr.
// NewMaster creates new master node.
//
// Use Run to actually start running the node.
func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
func NewMaster(clusterName string, net xnet.Networker) *Master {
m := &Master{
node: NewNodeApp(net, proto.MASTER, clusterName, serveAddr, serveAddr),
node: NewNodeApp(net, proto.MASTER, clusterName, ""),
ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}),
......@@ -127,22 +128,18 @@ func (m *Master) setClusterState(state proto.ClusterState) {
// Run starts master node and runs it until ctx is cancelled or fatal error.
func (m *Master) Run(ctx context.Context) (err error) {
// start listening
l, err := m.node.Listen()
if err != nil {
return err // XXX err ctx
}
defer task.Runningf(&ctx, "master(%v)", l.Addr())(&err)
//
// The master will be serving incoming connections on l.
func (m *Master) Run(ctx context.Context, l stdnet.Listener) (err error) {
addr := l.Addr()
defer task.Runningf(&ctx, "master(%v)", addr)(&err)
m.node.MasterAddr = l.Addr().String()
naddr, err := proto.Addr(l.Addr())
// update our master & serving address in node
naddr, err := proto.Addr(addr)
if err != nil {
// must be ok since l.Addr() is valid since it is listening
panic(err)
return err
}
m.node.MasterAddr = addr.String()
m.node.MyInfo = proto.NodeInfo{
Type: proto.MASTER,
Addr: naddr,
......@@ -154,6 +151,9 @@ func (m *Master) Run(ctx context.Context) (err error) {
// update nodeTab with self
m.node.NodeTab.Update(m.node.MyInfo)
// wrap listener with link / identification hello checker
ll := neonet.NewLinkListener(l)
lli := requireIdentifyHello(ll)
// accept incoming connections and pass them to main driver
wg := sync.WaitGroup{}
......@@ -169,7 +169,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
return ctx.Err()
}
req, idReq, err := l.Accept(ctx)
req, idReq, err := lli.Accept(ctx)
if err != nil {
if !xcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle?
......@@ -207,7 +207,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
err = m.runMain(ctx)
serveCancel()
lclose(ctx, l)
lclose(ctx, lli)
wg.Wait()
return err
......
......@@ -61,15 +61,9 @@ type NodeApp struct {
}
// NewNodeApp creates new node application
func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr, serveAddr string) *NodeApp {
// convert serveAddr into neo format
addr, err := proto.AddrString(net.Network(), serveAddr)
if err != nil {
panic(err) // XXX
}
func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr string) *NodeApp {
app := &NodeApp{
MyInfo: proto.NodeInfo{Type: typ, Addr: addr, UUID: 0, IdTime: proto.IdTimeNone},
MyInfo: proto.NodeInfo{Type: typ, Addr: proto.Address{}, UUID: 0, IdTime: proto.IdTimeNone},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
......@@ -153,41 +147,8 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri
}
// Listen starts listening at node's listening address.
//
// If the address is empty one new free is automatically selected.
// The node information about where it listens at is appropriately updated.
func (app *NodeApp) Listen() (Listener, error) {
// start listening
ll, err := neonet.ListenLink(app.Net, app.MyInfo.Addr.String())
if err != nil {
return nil, err // XXX err ctx
}
// now we know our listening address (in case it was autobind before)
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty
addr, err := proto.Addr(ll.Addr())
if err != nil {
// XXX -> panic here ?
ll.Close()
return nil, err // XXX err ctx
}
app.MyInfo.Addr = addr
l := &listener{
l: ll,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
// XXX name -> idListener?
type Listener interface {
// from LinkListener:
Close() error
......@@ -208,6 +169,18 @@ type Listener interface {
Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error)
}
// requireIdentifyHello wraps inner LinkListener into ^^^ Listener.
func requireIdentifyHello(inner neonet.LinkListener) Listener {
l := &listener{
l: inner,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l
}
type listener struct {
l neonet.LinkListener
acceptq chan accepted
......
......@@ -155,14 +155,21 @@ func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
return nil, err
}
return NewLinkListener(rawl), nil
}
// NewLinkListener creates LinkListener which accepts connections from an inner
// net.Listener and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
func NewLinkListener(inner net.Listener) LinkListener {
l := &linkListener{
l: rawl,
l: inner,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
return l
}
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
......
......@@ -23,6 +23,7 @@ package neo
import (
"context"
"fmt"
stdnet "net"
"sync"
"time"
......@@ -58,13 +59,13 @@ type Storage struct {
//nodeCome chan nodeCome // node connected
}
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr.
// NewStorage creates new storage node that will talk to master on masterAddr.
//
// The storage uses back as underlying backend for storing data.
// Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *Storage {
func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage.Backend) *Storage {
stor := &Storage{
node: NewNodeApp(net, proto.STORAGE, clusterName, masterAddr, serveAddr),
node: NewNodeApp(net, proto.STORAGE, clusterName, masterAddr),
back: back,
}
......@@ -79,14 +80,22 @@ func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, b
// Run starts storage node and runs it until either ctx is cancelled or master
// commands it to shutdown.
func (stor *Storage) Run(ctx context.Context) error {
// start listening
l, err := stor.node.Listen()
//
// The storage will be serving incoming connections on l.
func (stor *Storage) Run(ctx context.Context, l stdnet.Listener) (err error) {
addr := l.Addr()
defer task.Runningf(&ctx, "storage(%v)", addr)(&err)
// update our serving address in node
naddr, err := proto.Addr(addr)
if err != nil {
return err // XXX err ctx
return err
}
stor.node.MyInfo.Addr = naddr
defer task.Runningf(&ctx, "storage(%v)", l.Addr())(&err)
// wrap listener with link / identification hello checker
ll := neonet.NewLinkListener(l)
lli := requireIdentifyHello(ll)
// start serving incoming connections
wg := sync.WaitGroup{}
......@@ -96,7 +105,7 @@ func (stor *Storage) Run(ctx context.Context) error {
// XXX hack: until ctx cancel is not handled properly by Recv/Send
stor.node.OnShutdown = func() {
serveCancel()
lclose(ctx, l)
lclose(ctx, lli)
}
wg.Add(1)
......@@ -110,7 +119,7 @@ func (stor *Storage) Run(ctx context.Context) error {
return ctx.Err()
}
req, idReq, err := l.Accept(ctx)
req, idReq, err := lli.Accept(ctx)
if err != nil {
if !xcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle?
......
......@@ -285,8 +285,6 @@ Spy() {
# Sgo <data.fs> - spawn NEO/go storage
Sgo() {
# -alsologtostderr
# -cpuprofile cpu.out
# -trace trace.out
exec -a Sgo \
neo -log_dir=$log storage -cluster=$neocluster -bind=$Sbind -masters=$Mbind "$@" &
}
......