Commit d571d7e9 authored by Kirill Smelkov

.

parent 0800ed58
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neo
// a node driven by master
import (
"context"
"errors"
"fmt"
"sync"
"time"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
)
// _MasteredNode provides base functionality of a NEO node driven by master.
//
// talkMaster persists the connection to the master node, and receives updates from M
// about δNodeTab, δPartTab, ClusterState.
//
// XXX how to use
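//
// A tentative usage sketch (illustrative only, since the exact flow is still
// marked "XXX how to use" above; CLIENT and the cluster name are assumptions):
//
//	node := newMasteredNode(proto.CLIENT, "mycluster", net, masterAddr)
//	go node.talkMaster(ctx) // persist M link; apply δnodeTab/δpartTab/δClusterState
//	err := node.WhenOperational(ctx, func(ctx context.Context) error {
//		// cluster is operational here; state is rlocked during this call
//		return nil
//	})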
type _MasteredNode struct {
myInfo proto.NodeInfo // type, laddr, nid, state, idtime
clusterName string
net xnet.Networker // network AP we are sending/receiving on
masterAddr string // address of current master TODO -> masterRegistry
stateMu sync.RWMutex
nodeTab *xneo.NodeTable // information about nodes in the cluster
partTab *xneo.PartitionTable // information about data distribution in the cluster
clusterState proto.ClusterState // master idea about cluster state
// operational state in node is maintained by talkMaster.
// users retrieve it via WhenOperational(). XXX recheck
//
// NOTE being operational means:
// - link to master established and is ok
// - .partTab is operational wrt .nodeTab
// - .clusterState = RUNNING <- XXX needed?
//
// however master link is accessed separately (see ^^^ and masterLink)
//
// protected by .stateMu
operational bool
opReady chan struct{} // reinitialized each time state becomes non-operational
// TODO -> RecvM1 instead
// OnNotify, if !nil, is called when master notifies this node with a message.
// XXX not called for δstate
OnNotify func(msg proto.Msg) error
// OnNotifyδPartTab, if !nil, is called when master notifies this node
// with a change to partition table. (used by S to persist partTab)
OnNotifyδPartTab func(pt *xneo.PartitionTable) error
}
func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *_MasteredNode {
node := &_MasteredNode{
myInfo: proto.NodeInfo{
Type: typ,
Addr: proto.Address{},
NID: 0,
IdTime: proto.IdTimeNone,
},
clusterName: clusterName,
net: net,
masterAddr: masterAddr,
nodeTab: &xneo.NodeTable{},
partTab: &xneo.PartitionTable{},
clusterState: -1, // invalid
}
return node
}
// talkMaster dials master, identifies to it, and receives master updates to
// node/partition tables and cluster state.
//
// XXX connection to master is persisted (redial)
func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%s)", node.masterAddr)(&err)
for {
err := node.talkMaster1(ctx)
log.Warning(ctx, err) // XXX Warning ok? -> Error?
// TODO if err == "reject identification / protocol error" -> shutdown client
// TODO if err == shutdown -> return
// TODO if err == "not a primary" -> try redirected addresss
// exit on cancel / throttle reconnecting
select {
case <-ctx.Done():
return ctx.Err()
// XXX 1s hardcoded -> move out of here
case <-time.After(1*time.Second):
// ok
}
}
}
func (node *_MasteredNode) talkMaster1(ctx context.Context) (err error) {
reqID := &proto.RequestIdentification{
NodeType: node.myInfo.Type,
NID: node.myInfo.NID,
Address: node.myInfo.Addr,
ClusterName: node.clusterName,
IdTime: node.myInfo.IdTime, // XXX ok?
DevPath: nil, // XXX stub
NewNID: nil, // XXX stub
}
mlink, accept, err := dialNode(ctx, proto.MASTER, node.net, node.masterAddr, reqID)
if err != nil {
return err
}
err = xcontext.WithCloseOnErrCancel(ctx, mlink, func() error {
if accept.YourNID != node.myInfo.NID {
log.Info(ctx, "master told us to have nid=%s", accept.YourNID)
node.myInfo.NID = accept.YourNID // XXX locking ?
}
// master pushes whole nodeTab and partTab to us right after identification
// nodeTab
mnt := proto.NotifyNodeInformation{}
_, err = mlink.Expect1(&mnt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
// partTab XXX not to S and secondary M?
// https://lab.nexedi.com/nexedi/neoppod/blob/v1.12-69-gd98205d0/neo/master/handlers/__init__.py#L60-67
mpt := proto.SendPartitionTable{}
_, err = mlink.Expect1(&mpt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
pt := xneo.PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
// update cluster state
// XXX locking
err := node.updateNodeTab(ctx, &mnt)
if err != nil {
return err
}
node.partTab = pt
// XXX update "operational"
// XXX update .masterLink + notify waiters
return nil
})
if err != nil {
return err
}
// receive and handle notifications from master
// XXX put inside ^^^ ?
defer task.Running(&ctx, "rx")(&err)
for {
req, err := mlink.Recv1()
if err != nil {
return err
}
err = node.recvMaster1(ctx, req.Msg)
req.Close()
if err != nil {
return err
}
}
}
// recvMaster1 handles 1 message from master.
func (node *_MasteredNode) recvMaster1(ctx context.Context, msg proto.Msg) (err error) {
// messages for state changes are handled internally
δstate := true
switch msg.(type) {
default: δstate = false
case *proto.SendPartitionTable: // whole partTab
case *proto.NotifyPartitionChanges: // δ(partTab)
case *proto.NotifyNodeInformation: // δ(nodeTab)
case *proto.NotifyClusterState:
}
if δstate {
err = node.recvδstate(ctx, msg)
} else {
// XXX other messages? -> particular user
// XXX rework protocol so that M sends δstate on dedicated connection and other messages on other connections?
if node.OnNotify != nil {
err = node.OnNotify(msg)
} else {
err = fmt.Errorf("unexpected message: %T", msg)
}
}
return err
}
//trace:event traceClusterStateChanged(cs *proto.ClusterState)
// recvδstate handles reception of δstate messages.
func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err error) {
δpt := false
node.stateMu.Lock()
// XXX defer unlock ?
switch msg := msg.(type) {
default:
node.stateMu.Unlock()
panic(fmt.Sprintf("unexpected message: %T", msg))
// <- whole partTab
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock ok?
log.Infof(ctx, "parttab update: %s", pt)
node.partTab = pt
// <- δ(partTab)
case *proto.NotifyPartitionChanges:
panic("TODO δ(partTab)")
// <- δ(nodeTab)
case *proto.NotifyNodeInformation:
err = node.updateNodeTab(ctx, msg)
if err != nil {
node.stateMu.Unlock()
return err // cmdShutdown
}
case *proto.NotifyClusterState:
log.Infof(ctx, "state update: %s", msg.State)
node.clusterState = msg.State
traceClusterStateChanged(&node.clusterState)
}
if δpt && node.OnNotifyδPartTab != nil {
err = node.OnNotifyδPartTab(node.partTab)
// XXX err -> return without notify?
panic("TODO")
}
// update .operational + notify those who was waiting for it
opready := node.updateOperational()
node.stateMu.Unlock()
opready()
return nil
}
// updateOperational updates .operational from current state.
//
// Must be called with .stateMu lock held.
//
// Returned sendReady func must be called by updateOperational caller after
// .stateMu lock is released - it will close the current .opReady, this way
// notifying .operational waiters.
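//
// The intended calling pattern, as used by recvδstate above:
//
//	node.stateMu.Lock()
//	... // apply δstate
//	opready := node.updateOperational()
//	node.stateMu.Unlock()
//	opready()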
func (node *_MasteredNode) updateOperational() (sendReady func()) {
// XXX py client does not wait for cluster state = running
operational := // node.clusterState == proto.ClusterRunning &&
node.partTab.OperationalWith(node.nodeTab)
//fmt.Printf("\nupdateOperatinal: %v\n", operational)
//fmt.Println(node.partTab)
//fmt.Println(node.nodeTab)
var opready chan struct{}
if operational != node.operational {
node.operational = operational
if operational {
opready = node.opReady // don't close from under stateMu
} else {
node.opReady = make(chan struct{}) // remake for next operational waiters
}
}
return func() {
if opready != nil {
//fmt.Println("updateOperational - notifying %v\n", opready)
close(opready)
}
}
}
// WhenOperational waits until the cluster state is/becomes operational and
// then runs f.
// XXX state is rlocked during f run
// XXX -> WhenOperationalAndRLocked ?
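//
// A usage sketch (readState is a hypothetical callback):
//
//	err := node.WhenOperational(ctx, func(ctx context.Context) error {
//		// .nodeTab, .partTab and .clusterState can be read here
//		return readState(node.nodeTab, node.partTab)
//	})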
func (node *_MasteredNode) WhenOperational(ctx context.Context, f func(context.Context) error) error {
for {
node.stateMu.RLock()
if node.operational {
//fmt.Printf("withOperation -> ready\n");
break
}
ready := node.opReady
node.stateMu.RUnlock()
//fmt.Printf("withOperational - waiting on %v\n", ready)
select {
case <-ctx.Done():
return ctx.Err()
case <-ready:
// ok - try to relock and read again.
}
}
// node.operational=y and node.stateMu is rlocked
defer node.stateMu.RUnlock()
return f(ctx) // XXX do we need to pass ctx to f?
}
var cmdShutdown = errors.New("master told us to shutdown")
// updateNodeTab applies updates to .nodeTab from msg and logs changes appropriately.
//
// The only possible error is cmdShutdown.
// Must be called under .stateMu.
func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) error {
// XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "node update: %v", nodeInfo)
node.nodeTab.Update(nodeInfo)
// we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks that this is what master broadcast to them and, if not, replies "unknown by master")
if nodeInfo.NID == node.myInfo.NID {
// XXX recheck locking
// XXX do .myInfo = nodeInfo ?
node.myInfo.IdTime = nodeInfo.IdTime
// NEO/py currently employs this hack
// FIXME -> better it be separate command and handled cleanly
if nodeInfo.State == proto.DOWN {
err := cmdShutdown
log.Info(ctx, err)
return err
}
}
}
// FIXME logging under lock ok? (if caller took e.g. .stateMu before applying updates)
log.Infof(ctx, "full nodetab:\n%s", node.nodeTab)
return nil
}
// dialNode dials peer node at addr, requests identification, and verifies
// that the peer is of expected node type.
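//
// For example talkMaster1 above uses it to connect to M:
//
//	mlink, accept, err := dialNode(ctx, proto.MASTER, node.net, node.masterAddr, reqID)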
func dialNode(ctx context.Context, typ proto.NodeType, net xnet.Networker, addr string, reqID *proto.RequestIdentification) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %s (%s)", addr, typ)(&err)
link, err := neonet.DialLink(ctx, net, addr)
if err != nil {
return nil, nil, err
}
log.Info(ctx, "dialed ok; requesting identification...")
defer xerr.Contextf(&err, "%s: request identification", link)
accept := &proto.AcceptIdentification{}
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
//
// XXX solution could be:
// link.CloseAccept()
// link.Ask1(reqID, accept)
// link.Listen()
// XXX but there is a race window in between recv in ask and listen
// start, and if peer sends new connection in that window it will be rejected.
//
// TODO thinking.
err = link.Ask1(reqID, accept)
if err != nil {
return err
}
if accept.NodeType != typ {
// XXX send Error to peer?
return fmt.Errorf("accepted, but peer is not %v (identifies as %v)", typ, accept.NodeType)
}
return nil
})
if err != nil {
return nil, nil, err
}
log.Info(ctx, "identification accepted")
return link, accept, nil
}
// XXX = Dial node by NID, verify it accepts with "MyNID" == NID, YourNID == NID we sent
// func dialNID
@@ -61,6 +61,7 @@ func (e *Error) Error() string {
// Use Set instead of direct assignment for ClusterState tracing to work.
//
// XXX move this to neo.clusterState wrapping proto.ClusterState?
// XXX kill (_MasteredNode does it)
func (cs *ClusterState) Set(v ClusterState) {
*cs = v
traceClusterStateChanged(cs)
@@ -92,6 +92,8 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr
//
// XXX unexport after NodeApp += talkMaster <- used only to dial to M
// <- dialing to other nodes always goes through node.Dial
//
// XXX <- use dialNode instead
func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)
@@ -6,8 +6,37 @@ package neo
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// traceevent: traceClusterStateChanged(cs *proto.ClusterState)
type _t_traceClusterStateChanged struct {
tracing.Probe
probefunc func(cs *proto.ClusterState)
}
var _traceClusterStateChanged *_t_traceClusterStateChanged
func traceClusterStateChanged(cs *proto.ClusterState) {
if _traceClusterStateChanged != nil {
_traceClusterStateChanged_run(cs)
}
}
func _traceClusterStateChanged_run(cs *proto.ClusterState) {
for p := _traceClusterStateChanged; p != nil; p = (*_t_traceClusterStateChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(cs)
}
}
func traceClusterStateChanged_Attach(pg *tracing.ProbeGroup, probe func(cs *proto.ClusterState)) *tracing.Probe {
p := _t_traceClusterStateChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceClusterStateChanged)), &p.Probe)
return &p.Probe
}
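// A sketch of attaching a probe to this traceevent (assuming the usual
// go123/tracing convention of attaching under tracing.Lock and detaching
// via ProbeGroup.Done):
//
//	pg := &tracing.ProbeGroup{}
//	tracing.Lock()
//	traceClusterStateChanged_Attach(pg, func(cs *proto.ClusterState) {
//		fmt.Println("cluster state ->", *cs)
//	})
//	tracing.Unlock()
//	defer pg.Done()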
// traceevent: traceMasterStartReady(m *Master, ready bool)
type _t_traceMasterStartReady struct {
@@ -36,4 +65,4 @@ func traceMasterStartReady_Attach(pg *tracing.ProbeGroup, probe func(m *Master,
}
// trace export signature
func _trace_exporthash_fd2f9958709df62d1f79e16cfd88823a232f5771() {}
func _trace_exporthash_885c4ce269e66324d28c9dd9ffcd6959aecaf100() {}