Commit 0b4cf8c7 authored by Kirill Smelkov

.

parent 641b1326
@@ -44,7 +44,7 @@ import (
 // Master is a node overseeing and managing how whole NEO cluster works.
 type Master struct {
-    node *xneo.NodeApp
+    node *xneo.Node
     // master manages node and partition tables and broadcast their updates
     // to all nodes in cluster
@@ -74,7 +74,7 @@ type Master struct {
 // Use Run to actually start running the node.
 func NewMaster(clusterName string, net xnet.Networker) *Master {
     m := &Master{
-        node:     xneo.NewNodeApp(net, proto.MASTER, clusterName, ""),
+        node:     xneo.NewNode(net, proto.MASTER, clusterName, ""),
         ctlStart: make(chan chan error),
         ctlStop:  make(chan chan struct{}),
...
@@ -48,7 +48,7 @@ import (
 //
 // Storage implements only NEO protocol logic with data being persisted via provided storage.Backend.
 type Storage struct {
-    node *xneo.NodeApp
+    node *xneo.Node
     // context for providing operational service
     // it is renewed every time master tells us StartOpertion, so users
@@ -67,7 +67,7 @@ type Storage struct {
 // Use Run to actually start running the node.
 func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage.Backend) *Storage {
     stor := &Storage{
-        node: xneo.NewNodeApp(net, proto.STORAGE, clusterName, masterAddr),
+        node: xneo.NewNode(net, proto.STORAGE, clusterName, masterAddr),
         back: back,
     }
@@ -316,7 +316,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
     err = req.Reply(&proto.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
-    // XXX -> somehow to common part in NodeApp ?
+    // XXX -> somehow to common part in Node ?
 case *proto.SendPartitionTable:
     // TODO M sends us whole PT -> save locally
     stor.node.UpdatePartTab(ctx, msg) // XXX lock? XXX handle msg.NumReplicas
@@ -388,7 +388,7 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
 case *proto.StopOperation:
     return fmt.Errorf("stop requested")
-// should be served by NodeApp.commonRecv1
+// should be served by Node.commonRecv1
 // ---- 8< ----
 // XXX SendPartitionTable?
 // XXX NotifyPartitionChanges?
...
@@ -73,7 +73,7 @@ import (
 type NodeTable struct {
     // XXX for PeerNode.Dial to work. see also comments vvv near "peer link"
     // XXX move pointer to local node to PeerNode instead?
-    nodeApp *NodeApp
+    localNode *Node
     nodev   []*PeerNode           // all nodes
     notifyv []chan proto.NodeInfo // subscribers
@@ -326,21 +326,21 @@ func (p *PeerNode) ResetLink(ctx context.Context) {
 func (p *PeerNode) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
     defer task.Runningf(&ctx, "connect %s", p.NID)(&err) // XXX "connect" good word here?

-    app := p.nodeTab.nodeApp
-    link, accept, err := app.Dial(ctx, p.Type, p.Addr.String())
+    node := p.nodeTab.localNode
+    link, accept, err := node.Dial(ctx, p.Type, p.Addr.String())
     if err != nil {
         return nil, err
     }

     // verify peer identifies as what we expect
     switch {
-    // type is already checked by app.Dial
+    // type is already checked by node.Dial
     case accept.MyNID != p.NID:
         err = fmt.Errorf("connected, but peer's nid is not %v (identifies as %v)", p.NID, accept.MyNID)

-    case accept.YourNID != app.MyInfo.NID:
-        err = fmt.Errorf("connected, but peer gives us nid %v (our is %v)", accept.YourNID, app.MyInfo.NID)
+    case accept.YourNID != node.MyInfo.NID:
+        err = fmt.Errorf("connected, but peer gives us nid %v (our is %v)", accept.YourNID, node.MyInfo.NID)

     // XXX PeerNode.Dial is currently used by Client only.
     // XXX For Client it would be not correct to check #partition only at
...
@@ -52,14 +52,14 @@ func (cs *ClusterState) IsOperational() bool {
     return /* cs.Code == proto.ClusterRunning && */ cs.PartTab.OperationalWith(cs.NodeTab)
 }

-// NodeApp provides base functionality underlying any NEO node. XXX -> NodeBase? NodeSrv? NodeInstance?
+// Node provides base functionality underlying any NEO node.
 //
 // Every node knows how to talk to master and receives master idea about:
 //
 //  - current node table       (all nodes in the cluster),
 //  - current partition table  (how data is split around storage nodes),
 //  - current cluster state.
-type NodeApp struct {
+type Node struct {
     MyInfo      proto.NodeInfo
     ClusterName string
@@ -71,13 +71,13 @@ type NodeApp struct {
     PartTab      *PartitionTable    // information about data distribution in the cluster
     ClusterState proto.ClusterState // master idea about cluster state

-    // should be set by user so NodeApp can notify when master tells this node to shutdown
+    // should be set by user so Node can notify when master tells this node to shutdown
     OnShutdown func()
 }

-// NewNodeApp creates new node application
-func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr string) *NodeApp {
-    app := &NodeApp{
+// NewNode creates new node.
+func NewNode(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr string) *Node {
+    node := &Node{
         MyInfo:      proto.NodeInfo{Type: typ, Addr: proto.Address{}, NID: 0, IdTime: proto.IdTimeNone},
         ClusterName: clusterName,
         Net:         net,
@@ -88,8 +88,8 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr
         ClusterState: -1, // invalid
     }

-    app.NodeTab.nodeApp = app
-    return app
+    node.NodeTab.localNode = node
+    return node
 }

 // Dial connects to another node in the cluster.
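For orientation, here is a minimal usage sketch (not part of this commit) of the renamed constructor, mirroring how NewMaster and NewStorage call it in the hunks above. The import paths and the xnet.NetPlain helper are assumptions about the surrounding neo/go tree, not something this commit introduces.

```go
package example // sketch only; import paths below are assumed

import (
	"lab.nexedi.com/kirr/go123/xnet"

	"lab.nexedi.com/kirr/neo/go/neo/proto"
	"lab.nexedi.com/kirr/neo/go/neo/xneo"
)

// newStorageNode shows the renamed constructor in use (was xneo.NewNodeApp).
func newStorageNode(clusterName, masterAddr string) *xneo.Node {
	net := xnet.NetPlain("tcp") // assumed plain-TCP networker from go123/xnet
	return xneo.NewNode(net, proto.STORAGE, clusterName, masterAddr)
}
```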
@@ -102,14 +102,14 @@ func NewNodeApp(net xnet.Networker, typ proto.NodeType, clusterName, masterAddr
 // Dial does not update .NodeTab or its node entries in any way.
 // For establishing links to peers present in .NodeTab use Node.Dial.
 //
-// XXX unexport after NodeApp += talkMaster  <- used only to dial to M
+// XXX unexport after Node += talkMaster  <- used only to dial to M
 //     <- dialing to other nodes always go through node.Dial
 //
 // XXX <- use dialNode instead
-func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
+func (node *Node) Dial(ctx context.Context, peerType proto.NodeType, addr string) (_ *neonet.NodeLink, _ *proto.AcceptIdentification, err error) {
     defer task.Runningf(&ctx, "dial %v (%v)", addr, peerType)(&err)

-    link, err := neonet.DialLink(ctx, app.Net, addr)
+    link, err := neonet.DialLink(ctx, node.Net, addr)
     if err != nil {
         return nil, nil, err
     }
@@ -129,11 +129,11 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri
     }()

     req := &proto.RequestIdentification{
-        NodeType:    app.MyInfo.Type,
-        NID:         app.MyInfo.NID,
-        Address:     app.MyInfo.Addr,
-        ClusterName: app.ClusterName,
-        IdTime:      app.MyInfo.IdTime, // XXX ok?
+        NodeType:    node.MyInfo.Type,
+        NID:         node.MyInfo.NID,
+        Address:     node.MyInfo.Addr,
+        ClusterName: node.ClusterName,
+        IdTime:      node.MyInfo.IdTime, // XXX ok?
         DevPath:     nil, // XXX stub
         NewNID:      nil, // XXX stub
     }
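A hedged usage sketch for the renamed (*Node).Dial shown above: per the hunk, it dials the address, sends a RequestIdentification filled from node.MyInfo, and hands back the link together with the peer's AcceptIdentification. The dialMaster helper and import paths are illustrative assumptions, not code from this commit.

```go
package example // sketch only; import paths below are assumed

import (
	"context"

	"lab.nexedi.com/kirr/neo/go/neo/neonet"
	"lab.nexedi.com/kirr/neo/go/neo/proto"
	"lab.nexedi.com/kirr/neo/go/neo/xneo"
)

// dialMaster (hypothetical helper): identify this node to the master via the
// renamed (*xneo.Node).Dial and return the established link.
func dialMaster(ctx context.Context, node *xneo.Node, masterAddr string) (*neonet.NodeLink, error) {
	link, accept, err := node.Dial(ctx, proto.MASTER, masterAddr)
	if err != nil {
		return nil, err
	}
	_ = accept // accept.YourNID is the node ID the peer will address us by
	return link, nil
}
```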
@@ -237,24 +237,24 @@ func (l *listener) Addr() net.Addr { return l.l.Addr() }

 // ----------------------------------------

 // UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
-func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) {
+func (node *Node) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) {
     // XXX msg.IdTime ?
     for _, nodeInfo := range msg.NodeList {
         log.Infof(ctx, "node update: %v", nodeInfo)
-        app.NodeTab.Update(nodeInfo)
+        node.NodeTab.Update(nodeInfo)

         // XXX we have to provide IdTime when requesting identification to other peers
         // (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
-        if nodeInfo.NID == app.MyInfo.NID {
+        if nodeInfo.NID == node.MyInfo.NID {
             // XXX recheck locking
             // XXX do .MyInfo = nodeInfo ?
-            app.MyInfo.IdTime = nodeInfo.IdTime
+            node.MyInfo.IdTime = nodeInfo.IdTime

             // FIXME hack - better it be separate command and handled cleanly
             if nodeInfo.State == proto.DOWN {
                 log.Info(ctx, "master told us to shutdown")
                 log.Flush()
-                app.OnShutdown()
+                node.OnShutdown()
                 // os.Exit(1)
                 return
             }
@@ -262,20 +262,20 @@ func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *proto.NotifyNodeInfo
     }

     // FIXME logging under lock (if caller took e.g. .StateMu before applying updates)
-    log.Infof(ctx, "full nodetab:\n%s", app.NodeTab)
+    log.Infof(ctx, "full nodetab:\n%s", node.NodeTab)
 }

 // UpdatePartTab applies updates to .PartTab from message and logs changes appropriately.
-func (app *NodeApp) UpdatePartTab(ctx context.Context, msg *proto.SendPartitionTable) {
+func (node *Node) UpdatePartTab(ctx context.Context, msg *proto.SendPartitionTable) {
     pt := PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
     // XXX logging under lock
     log.Infof(ctx, "parttab update: %v", pt)
-    app.PartTab = pt
+    node.PartTab = pt
 }

 // UpdateClusterState applies update to .ClusterState from message and logs change appropriately.
-func (app *NodeApp) UpdateClusterState(ctx context.Context, msg *proto.NotifyClusterState) {
+func (node *Node) UpdateClusterState(ctx context.Context, msg *proto.NotifyClusterState) {
     // XXX loging under lock
     log.Infof(ctx, "state update: %v", msg.State)
-    app.ClusterState.Set(msg.State)
+    node.ClusterState.Set(msg.State)
 }
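To show how the three renamed Update* methods fit together, here is a hedged dispatch sketch in the spirit of Storage.m1initialize1 above. The handleMasterMsg helper, the proto.Msg switch, and the import paths are illustrative assumptions, not part of this commit.

```go
package example // sketch only; import paths below are assumed

import (
	"context"

	"lab.nexedi.com/kirr/neo/go/neo/proto"
	"lab.nexedi.com/kirr/neo/go/neo/xneo"
)

// handleMasterMsg (hypothetical) routes master notifications to the renamed
// Node update methods.
func handleMasterMsg(ctx context.Context, node *xneo.Node, msg proto.Msg) {
	switch msg := msg.(type) {
	case *proto.NotifyNodeInformation:
		node.UpdateNodeTab(ctx, msg) // may invoke node.OnShutdown if M marked us DOWN
	case *proto.SendPartitionTable:
		node.UpdatePartTab(ctx, msg)
	case *proto.NotifyClusterState:
		node.UpdateClusterState(ctx, msg)
	}
}
```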