Commit 839d42a6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a6935abb
...@@ -985,6 +985,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms ...@@ -985,6 +985,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// ---------------------------------------- // ----------------------------------------
// updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers.
// XXX place // XXX place
// called from main master process. XXX // called from main master process. XXX
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode { func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
...@@ -1003,6 +1004,7 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn ...@@ -1003,6 +1004,7 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn
log.Warningf(ctx, "peer %s is slow -> detaching it", nid) log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
// TODO ^^^ // TODO ^^^
panic("TODO")
} }
return peer return peer
...@@ -1108,8 +1110,9 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er ...@@ -1108,8 +1110,9 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// identify processes identification request of just connected node and either accepts or declines it. // identify processes identification request of just connected node and either accepts or declines it.
// //
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned. // If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// XXX + state0. XXX +notifyTab.
// Response message is constructed but not send back not to block the caller - it is // Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification. // the caller responsibility to send the response to node which requested identification. XXX via .accept()
func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, resp proto.Msg) { func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, resp proto.Msg) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
...@@ -1189,7 +1192,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1189,7 +1192,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
IdTime: proto.IdTime(m.monotime()), IdTime: proto.IdTime(m.monotime()),
} }
// node = m.node.State.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
node = m.updateNodeTab(ctx, nodeInfo) node = m.updateNodeTab(ctx, nodeInfo)
node.SetLink(n.req.Link()) node.SetLink(n.req.Link())
...@@ -1199,13 +1201,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1199,13 +1201,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
// (see updateNodeTab for details) // (see updateNodeTab for details)
notifyq := make(chan _ΔClusterState, 1024) notifyq := make(chan _ΔClusterState, 1024)
m.notifyTab[node.NID] = notifyq m.notifyTab[node.NID] = notifyq
// XXX go not here - only after initial state is sent out
/*
m.notifyWG.Add(1)
go func() {
defer m.notifyWG.Done()
}()
*/
return node, state0, accept return node, state0, accept
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment