Commit 2c040f4b authored by Kirill Smelkov

X Start moving internal infrastructure -> xneo

parent db4a166b
......@@ -64,3 +64,14 @@ func CloseWhenDone(ctx context.Context, c io.Closer) func() {
}
})
}
// LClose closes c and logs the closing error if there was any.
// The error is otherwise ignored.
//
// XXX naming? -> CloseOrLog? CloseErrLog? CloseLogErr? CloseAndLogErr?
func LClose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
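
In practice this helper is used as a deferred, log-and-forget close. A minimal sketch of a caller (serveConn and its package are hypothetical; only xio.LClose is from this commit):

package example // hypothetical

import (
    "context"
    "net"

    "lab.nexedi.com/kirr/neo/go/internal/xio"
)

// serveConn handles one accepted connection. The deferred LClose keeps a
// failing Close visible in the log without forcing the serve path to
// propagate the close error.
func serveConn(ctx context.Context, conn net.Conn) {
    defer xio.LClose(ctx, conn)
    // ... talk on conn ...
}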
......@@ -45,12 +45,13 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/internal/xsha1"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// Client is a NEO node that talks to a NEO cluster and exposes access to it via ZODB interfaces.
type Client struct {
node *NodeApp
node *xneo.NodeApp
talkMasterCancel func()
......@@ -96,7 +97,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
return &Client{
node: NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
node: xneo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
mlinkReady: make(chan struct{}),
operational: false,
opReady: make(chan struct{}),
......@@ -288,7 +289,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
pt := PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
pt := xneo.PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock()
......@@ -526,7 +527,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
// here we have cluster state operational and rlocked. Retrieve
// storages we might need to access and release the lock.
storv := make([]*Node, 0, 1)
storv := make([]*xneo.Node, 0, 1)
for _, cell := range c.node.PartTab.Get(xid.Oid) {
if cell.Readable() {
stor := c.node.NodeTab.Get(cell.UUID)
......
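
The Load hunk above is the lookup pattern being moved under xneo: map the oid through the partition table to cells, keep the readable cells, and resolve each cell to its node. Condensed into one helper (readableStors is hypothetical; PartTab.Get, Readable and NodeTab.Get are the calls visible in the hunk):

package example // hypothetical

import (
    "lab.nexedi.com/kirr/neo/go/neo/xneo"
    "lab.nexedi.com/kirr/neo/go/zodb"
)

// readableStors returns storage nodes that may currently serve reads for oid.
// Like Client.Load, the caller is expected to hold the node state lock.
func readableStors(node *xneo.NodeApp, oid zodb.Oid) []*xneo.Node {
    storv := make([]*xneo.Node, 0, 1)
    for _, cell := range node.PartTab.Get(oid) {
        if cell.Readable() {
            if stor := node.NodeTab.Get(cell.UUID); stor != nil {
                storv = append(storv, stor)
            }
        }
    }
    return storv
}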
......@@ -33,6 +33,7 @@ import (
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
// for http://.../debug/pprof
......@@ -141,7 +142,7 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve
}
log.Infof(ctx, "%s: %s", subj, serr)
conn.Close() // XXX lclose
xio.LClose(ctx, conn)
}
})
......
......@@ -33,6 +33,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
......@@ -42,7 +43,7 @@ import (
// Master is a node overseeing and managing how the whole NEO cluster works.
type Master struct {
node *NodeApp
node *xneo.NodeApp
// master manages node and partition tables and broadcasts their updates
// to all nodes in the cluster
......@@ -72,7 +73,7 @@ type Master struct {
// Use Run to actually start running the node.
func NewMaster(clusterName string, net xnet.Networker) *Master {
m := &Master{
node: NewNodeApp(net, proto.MASTER, clusterName, ""),
node: xneo.NewNodeApp(net, proto.MASTER, clusterName, ""),
ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}),
......@@ -150,7 +151,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
// wrap listener with link / identification hello checker
ll := neonet.NewLinkListener(l)
lli := requireIdentifyHello(ll)
lli := xneo.NewListener(ll)
// accept incoming connections and pass them to main driver
wg := sync.WaitGroup{}
......@@ -194,7 +195,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
case <-ctx.Done():
// shutdown
lclose(ctx, req.Link())
xio.LClose(ctx, req.Link())
continue
}
}
......@@ -204,7 +205,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
err = m.runMain(ctx)
serveCancel()
lclose(ctx, lli)
xio.LClose(ctx, lli)
wg.Wait()
return err
......@@ -270,8 +271,8 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// storRecovery is result of 1 storage node passing recovery phase
type storRecovery struct {
stor *Node
partTab *PartitionTable
stor *xneo.Node
partTab *xneo.PartitionTable
err error
// XXX + backup_tid, truncate_tid ?
......@@ -459,13 +460,13 @@ loop2:
// if we are starting for new cluster - create partition table
if m.node.PartTab.PTid == 0 {
// XXX -> m.nodeTab.StorageList(State > DOWN)
storv := []*Node{}
storv := []*xneo.Node{}
for _, stor := range m.node.NodeTab.StorageList() {
if stor.State > proto.DOWN {
storv = append(storv, stor)
}
}
m.node.PartTab = MakePartTab(1 /* XXX hardcoded */, storv)
m.node.PartTab = xneo.MakePartTab(1 /* XXX hardcoded */, storv)
m.node.PartTab.PTid = 1
log.Infof(ctx, "creating new partition table: %s", m.node.PartTab)
}
......@@ -475,7 +476,7 @@ loop2:
// storCtlRecovery drives a storage node during cluster recovering state
// it retrieves various ids and the partition table as stored on the storage
func storCtlRecovery(ctx context.Context, stor *Node, res chan storRecovery) {
func storCtlRecovery(ctx context.Context, stor *xneo.Node, res chan storRecovery) {
var err error
defer func() {
if err == nil {
......@@ -504,7 +505,7 @@ func storCtlRecovery(ctx context.Context, stor *Node, res chan storRecovery) {
}
// reconstruct partition table from response
pt := PartTabFromDump(resp.PTid, resp.RowList) // TODO handle resp.NumReplicas
pt := xneo.PartTabFromDump(resp.PTid, resp.RowList) // TODO handle resp.NumReplicas
res <- storRecovery{stor: stor, partTab: pt}
}
......@@ -672,14 +673,14 @@ loop2:
// storVerify is result of a storage node passing verification phase
type storVerify struct {
stor *Node
stor *xneo.Node
lastOid zodb.Oid
lastTid zodb.Tid
err error
}
// storCtlVerify drives a storage node during cluster verifying (= starting) state
func storCtlVerify(ctx context.Context, stor *Node, pt *PartitionTable, res chan storVerify) {
func storCtlVerify(ctx context.Context, stor *xneo.Node, pt *xneo.PartitionTable, res chan storVerify) {
// XXX link.Close on err -> = xcontext.WithCloseOnErrCancel
// XXX cancel on ctx -> = ^^^
......@@ -737,7 +738,7 @@ func storCtlVerify(ctx context.Context, stor *Node, pt *PartitionTable, res chan
// serviceDone is the error returned after service-phase node handling is finished
type serviceDone struct {
node *Node
node *xneo.Node
err error
}
......@@ -844,7 +845,7 @@ loop:
}
// storCtlService drives a storage node during cluster service state
func storCtlService(ctx context.Context, stor *Node) (err error) {
func storCtlService(ctx context.Context, stor *xneo.Node) (err error) {
slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
......@@ -891,7 +892,7 @@ func storCtlService(ctx context.Context, stor *Node) (err error) {
}
// serveClient serves incoming client link
func (m *Master) serveClient(ctx context.Context, cli *Node) (err error) {
func (m *Master) serveClient(ctx context.Context, cli *xneo.Node) (err error) {
clink := cli.Link()
defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)
......@@ -1032,7 +1033,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// If node identification is accepted, .nodeTab is updated and the corresponding node entry is returned.
// The response message is constructed but not sent back, so as not to block the caller - it is
// the caller's responsibility to send the response to the node which requested identification.
func (m *Master) identify(ctx context.Context, n nodeCome) (node *Node, resp proto.Msg) {
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.Node, resp proto.Msg) {
// XXX also verify ? :
// - NodeType valid
// - IdTime ?
......
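
For orientation, the listener wiring that Run sets up reduces to the following accept loop (a sketch: acceptLoop and its reject-everything policy are made up; xneo.NewListener, Accept and proto.Error are as in this diff, while proto.NOT_READY is assumed to be one of the protocol error codes):

package example // hypothetical

import (
    "context"

    "lab.nexedi.com/kirr/go123/xnet"
    "lab.nexedi.com/kirr/neo/go/internal/xio"
    "lab.nexedi.com/kirr/neo/go/neo/neonet"
    "lab.nexedi.com/kirr/neo/go/neo/proto"
    "lab.nexedi.com/kirr/neo/go/neo/xneo"
)

// acceptLoop accepts links whose first message is RequestIdentification,
// the way Master.Run and Storage.Run do after this commit. Accept leaves
// the request unanswered on purpose - replying is the caller's job.
func acceptLoop(ctx context.Context, l xnet.Listener) error {
    lli := xneo.NewListener(neonet.NewLinkListener(l))
    defer xio.LClose(ctx, lli)

    for {
        req, idReq, err := lli.Accept(ctx)
        if err != nil {
            return err
        }

        // placeholder policy: reject every peer; a real node would reply
        // with AcceptIdentification after checking idReq.
        _ = idReq
        req.Reply(&proto.Error{proto.NOT_READY, "sketch: not accepting peers"}) // reply error ignored here
        xio.LClose(ctx, req.Link())
    }
}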
......@@ -23,234 +23,3 @@
package neo
//go:generate gotrace gen .
import (
"context"
"fmt"
"net"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
//"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// NodeApp is the base for implementing NEO node applications.
//
// XXX -> internal?
type NodeApp struct {
MyInfo proto.NodeInfo
ClusterName string
Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master XXX put under StateMu ?
StateMu sync.RWMutex // <- XXX just embed?
NodeTab *NodeTable // information about nodes in the cluster
PartTab *PartitionTable // information about data distribution in the cluster
ClusterState proto.ClusterState // master's idea about the cluster state
// should be set by user so NodeApp can notify when master tells this node to shutdown
OnShutdown func()
}
// NewNodeApp creates a new node application
func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr string) *NodeApp {
app := &NodeApp{
MyInfo: proto.NodeInfo{Type: typ, Addr: proto.Address{}, UUID: 0, IdTime: proto.IdTimeNone},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
NodeTab: &NodeTable{},
PartTab: &PartitionTable{},
ClusterState: -1, // invalid
}
app.NodeTab.nodeApp = app
return app
}
// Dial connects to another node in the cluster.
//
// It handshakes, requests identification and checks the peer type. If successful, returned are:
//
// - established link
// - accept identification reply
//
// Dial does not update .NodeTab or its node entries in any way.
// For establishing links to peers present in .NodeTab use Node.Dial.
//
// XXX unexport
func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)
link, err := neonet.DialLink(ctx, app.Net, addr)
if err != nil {
return nil, nil, err
}
log.Info(ctx, "dialed ok; requesting identification...")
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error or FIXME: ctx cancel
//cleanup := xio.CloseWhenDone(ctx, link)
defer func() {
if err != nil {
// FIXME wrong - err=nil -> goroutine still left hanging waiting
// for ctx and will close link if dial ctx closes
// cleanup()
lclose(ctx, link)
}
}()
req := &proto.RequestIdentification{
NodeType: app.MyInfo.Type,
UUID: app.MyInfo.UUID,
Address: app.MyInfo.Addr,
ClusterName: app.ClusterName,
IdTime: app.MyInfo.IdTime, // XXX ok?
DevPath: nil, // XXX stub
NewNID: nil, // XXX stub
}
accept := &proto.AcceptIdentification{}
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
//
// XXX solution could be:
// link.CloseAccept()
// link.Ask1(req, accept)
// link.Listen()
// XXX but there is a race window in between recv in ask and listen
// start, and if peer sends new connection in that window it will be rejected.
//
// TODO thinking.
err = link.Ask1(req, accept)
if err != nil {
return nil, nil, err
}
// XXX vvv move out of here (e.g. to DialPeer) if we are not checking everything in full here?
if accept.NodeType != peerType {
// XXX send Error to peer?
return nil, nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", peerType, accept.NodeType)
}
// XXX accept.MyUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M)
// XXX accept.YourUUID // XXX M can tell us to change UUID -> take in effect
// XXX accept.NumPartitions, ... wrt app.node.PartTab
log.Info(ctx, "identification accepted")
return link, accept, nil
}
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
type Listener interface {
// from LinkListener:
Close() error
Addr() net.Addr
// Accept accepts incoming client connection.
//
// On success the link has been handshaked and the peer sent us a RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - original peer request that carried identification
// - requested identification packet
//
// After a successful accept it is the caller's responsibility to reply to the request.
//
// NOTE established link is Request.Link().
Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error)
}
// requireIdentifyHello wraps inner LinkListener into ^^^ Listener.
// XXX naming -> NewListener ?
func requireIdentifyHello(inner neonet.LinkListener) Listener {
return &listener{l: inner}
}
type listener struct {
l neonet.LinkListener
}
func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, _ *proto.RequestIdentification, err error) {
link, err := l.l.Accept(ctx)
if err != nil {
return nil, nil, err
}
// identify peer
// the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify") // XXX -> task.ErrContext?
req, err := link.Recv1(/*ctx*/)
if err != nil {
return nil, nil, err
}
switch msg := req.Msg.(type) {
case *proto.RequestIdentification:
return &req, msg, nil
}
emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // XXX err
return nil, nil, emsg
}
func (l *listener) Close() error { return l.l.Close() }
func (l *listener) Addr() net.Addr { return l.l.Addr() }
// ----------------------------------------
// UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) {
// XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "node update: %v", nodeInfo)
app.NodeTab.Update(nodeInfo)
// XXX we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
if nodeInfo.UUID == app.MyInfo.UUID {
// XXX recheck locking
// XXX do .MyInfo = nodeInfo ?
app.MyInfo.IdTime = nodeInfo.IdTime
// FIXME hack - better it be separate command and handled cleanly
if nodeInfo.State == proto.DOWN {
log.Info(ctx, "master told us to shutdown")
log.Flush()
app.OnShutdown()
// os.Exit(1)
return
}
}
}
// FIXME logging under lock (if caller took e.g. .StateMu before applying updates)
log.Infof(ctx, "full nodetab:\n%s", app.NodeTab)
}
// UpdatePartTab applies updates to .PartTab from message and logs changes appropriately.
func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *proto.SendPartitionTable) {
pt := PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock
log.Infof(ctx, "parttab update: %v", pt)
app.PartTab = pt
}
// UpdateClusterState applies update to .ClusterState from message and logs change appropriately.
func (app *NodeApp) UpdateClusterState(ctx context.Context, msg *proto.NotifyClusterState) {
// XXX logging under lock
log.Infof(ctx, "state update: %v", msg.State)
app.ClusterState.Set(msg.State)
}
......@@ -31,6 +31,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
......@@ -47,7 +48,7 @@ import (
//
// Storage implements only NEO protocol logic with data being persisted via the provided storage.Backend.
type Storage struct {
node *NodeApp
node *xneo.NodeApp
// context for providing operational service
// it is renewed every time master tells us StartOperation, so users
......@@ -66,7 +67,7 @@ type Storage struct {
// Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage.Backend) *Storage {
stor := &Storage{
node: NewNodeApp(net, proto.STORAGE, clusterName, masterAddr),
node: xneo.NewNodeApp(net, proto.STORAGE, clusterName, masterAddr),
back: back,
}
......@@ -96,7 +97,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// wrap listener with link / identification hello checker
ll := neonet.NewLinkListener(l)
lli := requireIdentifyHello(ll)
lli := xneo.NewListener(ll)
// start serving incoming connections
wg := sync.WaitGroup{}
......@@ -107,7 +108,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// XXX -> xcontext.WithCloseOnRetCancel
stor.node.OnShutdown = func() {
serveCancel()
lclose(ctx, lli)
xio.LClose(ctx, lli)
}
wg.Add(1)
......@@ -143,7 +144,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
//
// case <-ctx.Done():
// // shutdown
// lclose(ctx, req.Link())
// xio.LClose(ctx, req.Link())
// continue
// }
}
......
......@@ -43,6 +43,7 @@ import (
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/zodb"
_ "lab.nexedi.com/kirr/neo/go/zodb/wks"
......@@ -430,7 +431,7 @@ func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ [
if err != nil {
for _, stor := range storv {
if stor != nil {
stor.Close() // XXX lclose
xio.LClose(ctx, stor)
}
}
return nil, err
......
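
The hunk above is the usual partial-construction teardown: on error, close whatever was already opened, with LClose keeping the close errors in the log. As a standalone sketch (openAll is hypothetical, and zodb.Open with OpenOptions is assumed from the zodb package API):

package example // hypothetical

import (
    "context"

    "lab.nexedi.com/kirr/neo/go/internal/xio"
    "lab.nexedi.com/kirr/neo/go/zodb"
)

// openAll opens n storages for url; if any open fails, the ones opened so
// far are closed via LClose so nothing is leaked.
func openAll(ctx context.Context, url string, n int) (_ []zodb.IStorage, err error) {
    storv := make([]zodb.IStorage, n)
    defer func() {
        if err != nil {
            for _, stor := range storv {
                if stor != nil {
                    xio.LClose(ctx, stor)
                }
            }
        }
    }()

    for i := range storv {
        storv[i], err = zodb.Open(ctx, url, &zodb.OpenOptions{ReadOnly: true}) // assumed zodb API
        if err != nil {
            return nil, err
        }
    }
    return storv, nil
}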
......@@ -20,14 +20,13 @@
package neo
// NEO/go event tracer
//go:generate gotrace gen .
import (
"lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
)
// TraceCollector collects events from NEO/go trace points and sends them to the event dispatcher.
......@@ -39,8 +38,8 @@ type TraceCollector struct {
pg *tracing.ProbeGroup
rx interface { RxEvent(interface{}) }
node2Name map[*NodeApp]string
nodeTab2Owner map[*NodeTable]string
node2Name map[*xneo.NodeApp]string
nodeTab2Owner map[*xneo.NodeTable]string
clusterState2Owner map[*proto.ClusterState]string
}
......@@ -49,14 +48,15 @@ func NewTraceCollector(rx interface { RxEvent(interface{}) }) *TraceCollector {
pg: &tracing.ProbeGroup{},
rx: rx,
node2Name: make(map[*NodeApp]string),
nodeTab2Owner: make(map[*NodeTable]string),
node2Name: make(map[*xneo.NodeApp]string),
nodeTab2Owner: make(map[*xneo.NodeTable]string),
clusterState2Owner: make(map[*proto.ClusterState]string),
}
}
//trace:import "lab.nexedi.com/kirr/neo/go/neo/neonet"
//trace:import "lab.nexedi.com/kirr/neo/go/neo/proto"
//trace:import "lab.nexedi.com/kirr/neo/go/neo/xneo"
// Attach attaches the tracer to appropriate trace points.
func (t *TraceCollector) Attach() {
......@@ -64,7 +64,7 @@ func (t *TraceCollector) Attach() {
//neo_traceMsgRecv_Attach(t.pg, t.traceNeoMsgRecv)
neonet_traceMsgSendPre_Attach(t.pg, t.traceNeoMsgSendPre)
proto_traceClusterStateChanged_Attach(t.pg, t.traceClusterState)
traceNodeChanged_Attach(t.pg, t.traceNode)
xneo_traceNodeChanged_Attach(t.pg, t.traceNode)
traceMasterStartReady_Attach(t.pg, t.traceMasterStartReady)
tracing.Unlock()
}
......@@ -77,7 +77,7 @@ func (t *TraceCollector) Detach() {
//
// This way it can translate e.g. *NodeTable -> owner node name when creating
// corresponding event.
func (t *TraceCollector) RegisterNode(node *NodeApp, name string) {
func (t *TraceCollector) RegisterNode(node *xneo.NodeApp, name string) {
tracing.Lock()
defer tracing.Unlock()
......@@ -119,7 +119,7 @@ func (t *TraceCollector) traceClusterState(cs *proto.ClusterState) {
t.rx.RxEvent(&eventClusterState{where, *cs})
}
func (t *TraceCollector) traceNode(nt *NodeTable, n *Node) {
func (t *TraceCollector) traceNode(nt *xneo.NodeTable, n *xneo.Node) {
//t.rx.RxEvent(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
where := t.nodeTab2Owner[nt]
t.rx.RxEvent(&eventNodeTab{where, n.NodeInfo})
......
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neo
import (
"context"
"io"
"lab.nexedi.com/kirr/neo/go/internal/log"
)
// lclose closes c and logs the closing error if there was any.
// The error is otherwise ignored.
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
......@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neo
package xneo
// node management & node table
import (
......@@ -32,6 +32,7 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xio"
)
// NodeTable represents known nodes in a cluster.
......@@ -363,8 +364,7 @@ func (p *Node) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
}
if err != nil {
//log.Errorif(ctx, link.Close())
lclose(ctx, link)
xio.LClose(ctx, link)
link = nil
}
......
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neo
package xneo
// partition table
import (
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
......@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neo
package xneo
import (
"testing"
......
// Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xneo provides base and common functionality for package neo.
package xneo
//go:generate gotrace gen .
import (
"context"
"fmt"
"net"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// NodeApp is the base for implementing NEO node applications.
//
// XXX -> internal?
type NodeApp struct {
MyInfo proto.NodeInfo
ClusterName string
Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master XXX put under StateMu ?
StateMu sync.RWMutex // <- XXX just embed?
NodeTab *NodeTable // information about nodes in the cluster
PartTab *PartitionTable // information about data distribution in the cluster
ClusterState proto.ClusterState // master's idea about the cluster state
// should be set by user so NodeApp can notify when master tells this node to shutdown
OnShutdown func()
}
// NewNodeApp creates a new node application
func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr string) *NodeApp {
app := &NodeApp{
MyInfo: proto.NodeInfo{Type: typ, Addr: proto.Address{}, UUID: 0, IdTime: proto.IdTimeNone},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
NodeTab: &NodeTable{},
PartTab: &PartitionTable{},
ClusterState: -1, // invalid
}
app.NodeTab.nodeApp = app
return app
}
// Dial connects to another node in the cluster.
//
// It handshakes, requests identification and checks the peer type. If successful, returned are:
//
// - established link
// - accept identification reply
//
// Dial does not update .NodeTab or its node entries in any way.
// For establishing links to peers present in .NodeTab use Node.Dial.
//
// XXX unexport
func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)
link, err := neonet.DialLink(ctx, app.Net, addr)
if err != nil {
return nil, nil, err
}
log.Info(ctx, "dialed ok; requesting identification...")
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error or FIXME: ctx cancel
//cleanup := xio.CloseWhenDone(ctx, link)
defer func() {
if err != nil {
// FIXME wrong - err=nil -> goroutine still left hanging waiting
// for ctx and will close link if dial ctx closes
// cleanup()
xio.LClose(ctx, link)
}
}()
req := &proto.RequestIdentification{
NodeType: app.MyInfo.Type,
UUID: app.MyInfo.UUID,
Address: app.MyInfo.Addr,
ClusterName: app.ClusterName,
IdTime: app.MyInfo.IdTime, // XXX ok?
DevPath: nil, // XXX stub
NewNID: nil, // XXX stub
}
accept := &proto.AcceptIdentification{}
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
//
// XXX solution could be:
// link.CloseAccept()
// link.Ask1(req, accept)
// link.Listen()
// XXX but there is a race window in between recv in ask and listen
// start, and if peer sends new connection in that window it will be rejected.
//
// TODO thinking.
err = link.Ask1(req, accept)
if err != nil {
return nil, nil, err
}
// XXX vvv move out of here (e.g. to DialPeer) if we are not checking everything in full here?
if accept.NodeType != peerType {
// XXX send Error to peer?
return nil, nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", peerType, accept.NodeType)
}
// XXX accept.MyUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M)
// XXX accept.YourUUID // XXX M can tell us to change UUID -> take in effect
// XXX accept.NumPartitions, ... wrt app.node.PartTab
log.Info(ctx, "identification accepted")
return link, accept, nil
}
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
type Listener interface {
// from LinkListener:
Close() error
Addr() net.Addr
// Accept accepts incoming client connection.
//
// On success the link has been handshaked and the peer sent us a RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - original peer request that carried identification
// - requested identification packet
//
// After a successful accept it is the caller's responsibility to reply to the request.
//
// NOTE established link is Request.Link().
Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error)
}
// NewListener wraps inner LinkListener into Listener.
func NewListener(inner neonet.LinkListener) Listener {
return &listener{l: inner}
}
type listener struct {
l neonet.LinkListener
}
func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, _ *proto.RequestIdentification, err error) {
link, err := l.l.Accept(ctx)
if err != nil {
return nil, nil, err
}
// identify peer
// the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify") // XXX -> task.ErrContext?
req, err := link.Recv1(/*ctx*/)
if err != nil {
return nil, nil, err
}
switch msg := req.Msg.(type) {
case *proto.RequestIdentification:
return &req, msg, nil
}
emsg := &proto.Error{proto.PROTOCOL_ERROR, fmt.Sprintf("unexpected message %T", req.Msg)}
req.Reply(emsg) // XXX err
return nil, nil, emsg
}
func (l *listener) Close() error { return l.l.Close() }
func (l *listener) Addr() net.Addr { return l.l.Addr() }
// ----------------------------------------
// UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) {
// XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "node update: %v", nodeInfo)
app.NodeTab.Update(nodeInfo)
// XXX we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
if nodeInfo.UUID == app.MyInfo.UUID {
// XXX recheck locking
// XXX do .MyInfo = nodeInfo ?
app.MyInfo.IdTime = nodeInfo.IdTime
// FIXME hack - better it be separate command and handled cleanly
if nodeInfo.State == proto.DOWN {
log.Info(ctx, "master told us to shutdown")
log.Flush()
app.OnShutdown()
// os.Exit(1)
return
}
}
}
// FIXME logging under lock (if caller took e.g. .StateMu before applying updates)
log.Infof(ctx, "full nodetab:\n%s", app.NodeTab)
}
// UpdatePartTab applies updates to .PartTab from message and logs changes appropriately.
func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *proto.SendPartitionTable) {
pt := PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock
log.Infof(ctx, "parttab update: %v", pt)
app.PartTab = pt
}
// UpdateClusterState applies update to .ClusterState from message and logs change appropriately.
func (app *NodeApp) UpdateClusterState(ctx context.Context, msg *proto.NotifyClusterState) {
// XXX logging under lock
log.Infof(ctx, "state update: %v", msg.State)
app.ClusterState.Set(msg.State)
}
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package xneo
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
)
// traceevent: traceNodeChanged(nt *NodeTable, n *Node)
type _t_traceNodeChanged struct {
tracing.Probe
probefunc func(nt *NodeTable, n *Node)
}
var _traceNodeChanged *_t_traceNodeChanged
func traceNodeChanged(nt *NodeTable, n *Node) {
if _traceNodeChanged != nil {
_traceNodeChanged_run(nt, n)
}
}
func _traceNodeChanged_run(nt *NodeTable, n *Node) {
for p := _traceNodeChanged; p != nil; p = (*_t_traceNodeChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(nt, n)
}
}
func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n *Node)) *tracing.Probe {
p := _t_traceNodeChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceNodeChanged)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_a393ecf34683256731eab893a4d035f1326c103e() {}
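
This generated pattern is gotrace's usual shape: a nil-guarded probe head, a run loop over attached probes, and an Attach helper. In-package attachment looks roughly like this (exampleAttach is hypothetical; tracing.Lock/Unlock and ProbeGroup are the go123/tracing API used by TraceCollector above, and ProbeGroup.Done is assumed for detach):

package xneo // sketch: traceNodeChanged_Attach is unexported, so this lives in xneo

import "lab.nexedi.com/kirr/go123/tracing"

// exampleAttach registers a probe that fires synchronously on every
// NodeTable change and returns the group that can later detach it.
func exampleAttach() *tracing.ProbeGroup {
    pg := &tracing.ProbeGroup{}
    tracing.Lock()
    traceNodeChanged_Attach(pg, func(nt *NodeTable, n *Node) {
        // inspect nt / n here; probes run synchronously, so keep it cheap
    })
    tracing.Unlock()
    return pg // pg.Done() would detach the probe (assumed API)
}

Cross-package users cannot call the unexported Attach directly; they go through the //go:linkname trampolines shown further below.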
......@@ -35,32 +35,5 @@ func traceMasterStartReady_Attach(pg *tracing.ProbeGroup, probe func(m *Master,
return &p.Probe
}
// traceevent: traceNodeChanged(nt *NodeTable, n *Node)
type _t_traceNodeChanged struct {
tracing.Probe
probefunc func(nt *NodeTable, n *Node)
}
var _traceNodeChanged *_t_traceNodeChanged
func traceNodeChanged(nt *NodeTable, n *Node) {
if _traceNodeChanged != nil {
_traceNodeChanged_run(nt, n)
}
}
func _traceNodeChanged_run(nt *NodeTable, n *Node) {
for p := _traceNodeChanged; p != nil; p = (*_t_traceNodeChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(nt, n)
}
}
func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n *Node)) *tracing.Probe {
p := _t_traceNodeChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceNodeChanged)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_ee76c0bfa710c94614a1fd0fe7a79e9cb723a340() {}
func _trace_exporthash_fd2f9958709df62d1f79e16cfd88823a232f5771() {}
......@@ -9,6 +9,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
)
// traceimport: "lab.nexedi.com/kirr/neo/go/neo/neonet"
......@@ -35,3 +36,14 @@ func init() { proto_trace_exporthash() }
//go:linkname proto_traceClusterStateChanged_Attach lab.nexedi.com/kirr/neo/go/neo/proto.traceClusterStateChanged_Attach
func proto_traceClusterStateChanged_Attach(*tracing.ProbeGroup, func(cs *proto.ClusterState)) *tracing.Probe
// traceimport: "lab.nexedi.com/kirr/neo/go/neo/xneo"
// rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname xneo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo/xneo._trace_exporthash_a393ecf34683256731eab893a4d035f1326c103e
func xneo_trace_exporthash()
func init() { xneo_trace_exporthash() }
//go:linkname xneo_traceNodeChanged_Attach lab.nexedi.com/kirr/neo/go/neo/xneo.traceNodeChanged_Attach
func xneo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *xneo.NodeTable, n *xneo.Node)) *tracing.Probe