Commit 03a6d5f4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fc667f6c
...@@ -258,19 +258,10 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error { ...@@ -258,19 +258,10 @@ func (c *Client) recvMaster(ctx context.Context, mlink *neo.NodeLink) error {
// TODO // TODO
case *neo.NotifyNodeInformation: case *neo.NotifyNodeInformation:
// XXX msg.IdTimestamp ? c.node.UpdateNodeTab(ctx, msg)
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "rx node update: %v", nodeInfo)
c.node.NodeTab.Update(nodeInfo)
}
// FIXME logging under lock
log.Infof(ctx, "full nodetab:\n%s", c.node.NodeTab)
case *neo.NotifyClusterState: case *neo.NotifyClusterState:
// XXX loging under lock c.node.UpdateClusterState(ctx, msg)
log.Infof(ctx, "rx state update: %v", msg.State)
c.node.ClusterState.Set(msg.State)
} }
// update .operational + notify those who was waiting for it // update .operational + notify those who was waiting for it
......
...@@ -322,7 +322,27 @@ func (l *listener) Addr() net.Addr { ...@@ -322,7 +322,27 @@ func (l *listener) Addr() net.Addr {
return l.l.Addr() return l.l.Addr()
} }
// ----------------------------------------
// TODO functions to update: // TODO functions to update:
// .PartTab from NotifyPartitionTable msg // .PartTab from NotifyPartitionTable msg
// .NodeTab from NotifyNodeInformation msg
// .ClusterState from NotifyClusterState msg
// UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformation) {
// XXX msg.IdTimestamp ?
for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "rx node update: %v", nodeInfo)
app.NodeTab.Update(nodeInfo)
}
// FIXME logging under lock (if caller took e.g. .StateMu before applying updates)
log.Infof(ctx, "full nodetab:\n%s", app.NodeTab)
}
// UpdateClusterState applies update to .ClusterState from message and logs change appropriately.
func (app *NodeApp) UpdateClusterState(ctx context.Context, msg *NotifyClusterState) {
// XXX loging under lock
log.Infof(ctx, "rx state update: %v", msg.State)
app.ClusterState.Set(msg.State)
}
...@@ -143,12 +143,12 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -143,12 +143,12 @@ func (stor *Storage) Run(ctx context.Context) error {
return err // XXX err ctx return err // XXX err ctx
} }
// --- connect to master and let it direct us --- // --- connect to master and let it drive us ---
// talkMaster connects to master, announces self and receives commands and notifications. // talkMaster connects to master, announces self and receives commands and notifications.
// it tries to persist master link reconnecting as needed. // it tries to persist master link reconnecting as needed.
// //
// it always returns an error - either due to cancel or command from master to shutdown // it always returns an error - either due to cancel or command from master to shutdown.
func (stor *Storage) talkMaster(ctx context.Context) (err error) { func (stor *Storage) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err) defer task.Runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err)
...@@ -173,7 +173,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) { ...@@ -173,7 +173,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) {
// talkMaster1 does 1 cycle of connect/talk/disconnect to master. // talkMaster1 does 1 cycle of connect/talk/disconnect to master.
// //
// it returns error describing why such cycle had to finish // it returns error describing why such cycle had to finish.
// XXX distinguish between temporary problems and non-temporary ones? // XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) { func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX dup in Client.talkMaster1 ? // XXX dup in Client.talkMaster1 ?
...@@ -276,11 +276,10 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (req ...@@ -276,11 +276,10 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *neo.NodeLink) (req
case *neo.NotifyNodeInformation: case *neo.NotifyNodeInformation:
// XXX check for myUUID and consider it a command (like neo/py) does? // XXX check for myUUID and consider it a command (like neo/py) does?
// TODO update .nodeTab stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
case *neo.NotifyClusterState: case *neo.NotifyClusterState:
// TODO .clusterState = ... XXX what to do with it? stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
} }
// XXX move req.Reply here and ^^^ only prepare reply // XXX move req.Reply here and ^^^ only prepare reply
...@@ -336,6 +335,15 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er ...@@ -336,6 +335,15 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er
case *neo.StopOperation: case *neo.StopOperation:
return fmt.Errorf("stop requested") return fmt.Errorf("stop requested")
// XXX NotifyPartitionTable?
// XXX NotifyPartitionChanges?
case *neo.NotifyNodeInformation:
stor.node.UpdateNodeTab(ctx, msg) // XXX lock?
case *neo.NotifyClusterState:
stor.node.UpdateClusterState(ctx, msg) // XXX lock? what to do with it?
// TODO commit related messages // TODO commit related messages
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment