Commit 23e2d875 authored by Kirill Smelkov

.

parent d571d7e9
@@ -28,17 +28,17 @@ import (
 	"os"
 	"strings"
 	"sync"
-	"time"
+	// "time"

 	"github.com/pkg/errors"
-	"golang.org/x/sync/errgroup"
+	// "golang.org/x/sync/errgroup"

 	"lab.nexedi.com/kirr/go123/mem"
 	"lab.nexedi.com/kirr/go123/xerr"
 	"lab.nexedi.com/kirr/go123/xnet"

-	"lab.nexedi.com/kirr/neo/go/internal/log"
+	// "lab.nexedi.com/kirr/neo/go/internal/log"
 	"lab.nexedi.com/kirr/neo/go/internal/task"
-	"lab.nexedi.com/kirr/neo/go/internal/xio"
+	// "lab.nexedi.com/kirr/neo/go/internal/xio"
 	"lab.nexedi.com/kirr/neo/go/internal/xurl"
 	"lab.nexedi.com/kirr/neo/go/internal/xzlib"
 	"lab.nexedi.com/kirr/neo/go/internal/xzodb"
@@ -51,7 +51,8 @@ import (
 // Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces.
 type Client struct {
-	node *xneo.NodeApp
+	// node *xneo.NodeApp
+	node *_MasteredNode

 	talkMasterCancel func()
......@@ -61,6 +62,7 @@ type Client struct {
mlink *neonet.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle
/*
// operational state in node is maintained by recvMaster.
// users retrieve it via withOperational().
//
@@ -74,6 +76,7 @@ type Client struct {
 	// protected by .node.StateMu
 	operational bool          // XXX <- somehow move to NodeApp?
 	opReady     chan struct{} // reinitialized each time state becomes non-operational
+	*/

 	// driver client <- watcher: database commits | errors.
 	watchq chan<- zodb.Event
@@ -97,10 +100,11 @@ var _ zodb.IStorageDriver = (*Client)(nil)
 // Use Run to actually start running the node.
 func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
 	return &Client{
-		node:       xneo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
+		// node:    xneo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
+		node:       newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
 		mlinkReady: make(chan struct{}),
-		operational: false,
-		opReady:     make(chan struct{}),
+		// operational: false,
+		// opReady:     make(chan struct{}),
 		at0Ready:   make(chan struct{}),
 	}
 }
@@ -111,7 +115,7 @@ func (cli *Client) Run(ctx context.Context) error {
 	// run process which performs master talk
 	ctx, cancel := context.WithCancel(ctx)
 	cli.talkMasterCancel = cancel
-	cli.node.OnShutdown = cancel // XXX ok?
+	// cli.node.OnShutdown = cancel // XXX ok?
 	return cli.talkMaster(ctx)
 }
@@ -156,6 +160,7 @@ func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) {
 	}
 }

+/*
 // updateOperational updates .operational from current state.
 //
 // Must be called with .node.StateMu lock held.
@@ -191,7 +196,9 @@ func (c *Client) updateOperational() (sendReady func()) {
 		}
 	}
 }
+*/

+/*
 // withOperational waits for cluster state to be operational.
 //
 // If successful it returns with operational state RLocked (c.node.StateMu) and
@@ -223,7 +230,9 @@ func (c *Client) withOperational(ctx context.Context) error {
 		}
 	}
 }
+*/

+/*
 // talkMaster connects to master, announces self and receives notifications.
 // it tries to persist master link reconnecting as needed.
 //
@@ -331,6 +340,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
 	return wg.Wait()
 }
+*/

 // initFromMaster asks M for DB head right after identification.
 func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
@@ -376,7 +386,7 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err er
 	defer task.Running(&ctx, "rx")(&err)

 	for {
-		req, err := mlink.Recv1()
+		req, err := mlink.Recv1() // XXX -> Recv1M
 		if err != nil {
 			return err
 		}
@@ -395,16 +405,18 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
 	// <- committed txn
 	case *proto.InvalidateObjects:
 		return c.invalidateObjects(msg)
+	default:
+		return fmt.Errorf("unexpected message: %T", msg)
 	}

+	/*
 	// messages for state changes
 	// XXX -> NodeApp into common code to handle NodeTab + PartTab updates from M?
 	c.node.StateMu.Lock()

 	switch msg := req.Msg.(type) {
 	default:
-		c.node.StateMu.Unlock()
+		c.node.statemu.unlock()
 		return fmt.Errorf("unexpected message: %T", msg)

 	// <- whole partTab
@@ -430,6 +442,7 @@ func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
 	opready()

 	return nil
+	*/
 }

 // invalidateObjects is called by recvMaster1 on receiving invalidateObjects notification.
@@ -520,6 +533,25 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
 		}
 	}()

+	// Retrieve storages we might need to access.
+	storv := make([]*xneo.Node, 0, 1)
+	err = c.node.WithOperational(ctx, func(cs *xneo.ClusterState) error {
+		for _, cell := range cs.PartTab.Get(xid.Oid) {
+			if cell.Readable() {
+				stor := cs.NodeTab.Get(cell.NID)
+				// this storage might not yet come up
+				if stor != nil && stor.State == proto.RUNNING {
+					storv = append(storv, stor)
+				}
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, 0, err
+	}
+
+	/*
 	err = c.withOperational(ctx)
 	if err != nil {
 		return nil, 0, err
@@ -538,6 +570,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
 		}
 	}
 	c.node.StateMu.RUnlock()
+	*/

 	if len(storv) == 0 {
 		// XXX recheck it adds traceback to log -> XXX it does not -> add our Bugf which always forces +v on such error print
......
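The client.go hunks above move Client.Load off direct StateMu locking and onto the new WithOperational API: the callback receives a consistent *xneo.ClusterState snapshot and runs with the state read-locked for its whole duration. Below is a minimal sketch of that access pattern pulled out into a standalone helper, as it would sit inside the same package as the code above; pickStorages is a hypothetical name for illustration, everything else is taken from the diff.

// pickStorages returns RUNNING storage nodes that hold readable cells
// for oid. Sketch only - condensed from the Load hunk above.
func pickStorages(ctx context.Context, node *_MasteredNode, oid zodb.Oid) ([]*xneo.Node, error) {
	storv := make([]*xneo.Node, 0, 1)
	err := node.WithOperational(ctx, func(cs *xneo.ClusterState) error {
		// cs is read-locked and stays consistent for the whole callback
		for _, cell := range cs.PartTab.Get(oid) {
			if cell.Readable() {
				stor := cs.NodeTab.Get(cell.NID)
				// the storage might not have come up yet
				if stor != nil && stor.State == proto.RUNNING {
					storv = append(storv, stor)
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return storv, nil
}

Passing the state snapshot into the callback, instead of returning with the lock held as the old withOperational did, keeps the lock scope explicit and makes it impossible to forget the RUnlock.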
@@ -46,14 +46,15 @@ import (
 // XXX how to use
 type _MasteredNode struct {
 	myInfo proto.NodeInfo // type, laddr, nid, state, idtime

-	clusterName string
-	net         xnet.Networker // network AP we are sending/receiving on
-	masterAddr  string         // address of current master  TODO -> masterRegistry
+	ClusterName string
+	Net         xnet.Networker // network AP we are sending/receiving on
+	MasterAddr  string         // address of current master  TODO -> masterRegistry

 	stateMu sync.RWMutex
-	nodeTab      *xneo.NodeTable      // information about nodes in the cluster
-	partTab      *xneo.PartitionTable // information about data distribution in the cluster
-	clusterState proto.ClusterState   // master idea about cluster state
+	state   xneo.ClusterState
+	// nodeTab      *xneo.NodeTable      // information about nodes in the cluster
+	// partTab      *xneo.PartitionTable // information about data distribution in the cluster
+	// clusterState proto.ClusterState   // master idea about cluster state

 	// operational state in node is maintained by talkMaster.
@@ -92,14 +93,16 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
 			NID:    0,
 			IdTime: proto.IdTimeNone,
 		},

-		clusterName: clusterName,
+		ClusterName: clusterName,
-		net:         net,
-		masterAddr:  masterAddr,
+		Net:         net,
+		MasterAddr:  masterAddr,

-		nodeTab:      &xneo.NodeTable{},
-		partTab:      &xneo.PartitionTable{},
-		clusterState: -1, // invalid
+		state: xneo.ClusterState{
+			NodeTab: &xneo.NodeTable{},
+			PartTab: &xneo.PartitionTable{},
+			Code:    -1, // invalid
+		},
 	}

 	return node
@@ -112,7 +115,7 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
 //
 // XXX connection to master is persisted (redial)
 func (node *_MasteredNode) talkMaster(ctx context.Context) (err error) {
-	defer task.Runningf(&ctx, "talk master(%s)", node.masterAddr)(&err)
+	defer task.Runningf(&ctx, "talk master(%s)", node.MasterAddr)(&err)

 	for {
 		err := node.talkMaster1(ctx)
@@ -138,12 +141,12 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) (err error) {
 		NodeType:    node.myInfo.Type,
 		NID:         node.myInfo.NID,
 		Address:     node.myInfo.Addr,
-		ClusterName: node.clusterName,
+		ClusterName: node.ClusterName,
 		IdTime:      node.myInfo.IdTime, // XXX ok?
 		DevPath:     nil, // XXX stub
 		NewNID:      nil, // XXX stub
 	}

-	mlink, accept, err := dialNode(ctx, proto.MASTER, node.net, node.masterAddr, reqID)
+	mlink, accept, err := dialNode(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID)
 	if err != nil {
 		return err
 	}
@@ -180,7 +183,7 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) (err error) {
 		if err != nil {
 			return err
 		}
-		node.partTab = pt
+		node.state.PartTab = pt

 		// XXX update "operational"
 		// XXX update .masterLink + notify waiters
@@ -253,7 +256,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err
 		pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
 		// XXX logging under lock ok?
 		log.Infof(ctx, "parttab update: %s", pt)
-		node.partTab = pt
+		node.state.PartTab = pt

 	// <- δ(partTab)
 	case *proto.NotifyPartitionChanges:
@@ -265,12 +268,12 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err
 	case *proto.NotifyClusterState:
 		log.Infof(ctx, "state update: %s", msg.State)
-		node.clusterState = msg.State
-		traceClusterStateChanged(&node.clusterState)
+		node.state.Code = msg.State
+		traceClusterStateChanged(&node.state.Code)
 	}

 	if δpt && node.OnNotifyδPartTab != nil {
-		err = node.OnNotifyδPartTab(node.partTab)
+		err = node.OnNotifyδPartTab(node.state.PartTab)
 		// XXX err -> return without notify?
 		panic("TODO")
 	}
@@ -291,9 +294,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err
 // .node.StateMu lock is released - it will close current .opReady this way
 // notifying .operational waiters.
 func (node *_MasteredNode) updateOperational() (sendReady func()) {
-	// XXX py client does not wait for cluster state = running
-	operational := // node.clusterState == proto.ClusterRunning &&
-		node.partTab.OperationalWith(node.nodeTab)
+	operational := node.state.IsOperational()

 	//fmt.Printf("\nupdateOperatinal: %v\n", operational)
 	//fmt.Println(node.partTab)
@@ -318,10 +319,9 @@ func (node *_MasteredNode) updateOperational() (sendReady func()) {
 	}
 }

-// WhenOperational runs f during when cluster state is/becomes operational ... XXX
-// XXX state is rlocked during f run
-// XXX -> WhenOperationalAndRLocked ?
-func (node *_MasteredNode) WhenOperational(ctx context.Context, f func(context.Context) error) error {
+// WithOperational runs f when cluster state is/becomes operational.
+// The cluster state is guaranteed not to change while f runs.
+func (node *_MasteredNode) WithOperational(ctx context.Context, f func(cs *xneo.ClusterState) error) error {
 	for {
 		node.stateMu.RLock()
 		if node.operational {
@@ -345,7 +345,7 @@ func (node *_MasteredNode) WhenOperational(ctx context.Context, f func(context.C
 	// node.operational=y and node.stateMu is rlocked
 	defer node.stateMu.RUnlock()
-	return f(ctx) // XXX do we need to pass ctx to f?
+	return f(&node.state)
 }

 var cmdShutdown = errors.New("master told us to shutdown")
@@ -357,7 +357,7 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
 	// XXX msg.IdTime ?
 	for _, nodeInfo := range msg.NodeList {
 		log.Infof(ctx, "node update: %v", nodeInfo)
-		node.nodeTab.Update(nodeInfo)
+		node.state.NodeTab.Update(nodeInfo)

 		// we have to provide IdTime when requesting identification to other peers
 		// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
@@ -377,7 +377,7 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
 	}

 	// FIXME logging under lock ok? (if caller took e.g. .stateMu before applying updates)
-	log.Infof(ctx, "full nodetab:\n%s", node.nodeTab)
+	log.Infof(ctx, "full nodetab:\n%s", node.state.NodeTab)

 	return nil
 }
......
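In mastered.go, updateOperational and WithOperational cooperate through the .opReady channel: when the cluster becomes operational the current channel is closed, which wakes every waiter at once, and a fresh channel is installed when operation stops. Below is a self-contained sketch of this broadcast idiom using only the standard library; names (opState, setOperational, wait) are illustrative, and unlike this simplified version the commit defers the close via the returned sendReady callback so it fires only after stateMu is released.

package opstate

import (
	"context"
	"sync"
)

// opState models the .operational/.opReady pair of _MasteredNode.
type opState struct {
	mu          sync.RWMutex
	operational bool
	opReady     chan struct{} // closed to wake waiters; re-made when operation stops
}

func newOpState() *opState {
	return &opState{opReady: make(chan struct{})}
}

// setOperational flips the flag; on the false->true edge it closes
// opReady, which unblocks every waiter at once.
func (s *opState) setOperational(op bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if op == s.operational {
		return
	}
	s.operational = op
	if op {
		close(s.opReady) // broadcast
	} else {
		s.opReady = make(chan struct{}) // re-arm for the next wait cycle
	}
}

// wait blocks until the state is operational or ctx is canceled.
func (s *opState) wait(ctx context.Context) error {
	for {
		s.mu.RLock()
		if s.operational {
			s.mu.RUnlock()
			return nil
		}
		ready := s.opReady // snapshot the channel before unlocking
		s.mu.RUnlock()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ready:
			// woken up; loop to re-check under the lock
		}
	}
}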
@@ -40,6 +40,18 @@ import (
 	"lab.nexedi.com/kirr/neo/go/neo/proto"
 )

+// ClusterState represents the state of a cluster.
+type ClusterState struct {
+	NodeTab *NodeTable         // information about nodes in the cluster
+	PartTab *PartitionTable    // information about data distribution in the cluster
+	Code    proto.ClusterState // master idea about cluster state
+}
+
+func (cs *ClusterState) IsOperational() bool {
+	// XXX py client does not wait for cluster state==RUNNING
+	return /* cs.Code == proto.ClusterRunning && */ cs.PartTab.OperationalWith(cs.NodeTab)
+}
+
 // NodeApp provides base functionality underlying any NEO node. XXX -> NodeBase? NodeSrv? NodeInstance?
 //
 // Every node knows how to talk to master and receives master idea about:
......
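The new xneo.ClusterState bundles what _MasteredNode previously kept as three separate fields (nodeTab, partTab, clusterState), so a consistent snapshot can be handed to WithOperational callbacks as one value. A short sketch of constructing and querying it, mirroring the newMasteredNode hunk above; the package name, the helper names and the xneo import path (assumed by analogy with "lab.nexedi.com/kirr/neo/go/neo/proto") are illustrative, not part of the commit.

package example

import "lab.nexedi.com/kirr/neo/go/neo/xneo"

// newEmptyClusterState mirrors the initialization in newMasteredNode:
// empty tables, and Code = -1 until the master sends NotifyClusterState.
func newEmptyClusterState() xneo.ClusterState {
	return xneo.ClusterState{
		NodeTab: &xneo.NodeTable{},
		PartTab: &xneo.PartitionTable{},
		Code:    -1, // invalid
	}
}

// clusterReady reports whether reads can be routed to storage nodes.
func clusterReady(cs *xneo.ClusterState) bool {
	// Operationality is derived from PartTab coverage by NodeTab only;
	// Code is deliberately not consulted (see the XXX remark above).
	return cs.IsOperational()
}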