Commit 4e33b5f7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 561c9f61
......@@ -64,6 +64,16 @@ type NodeTable struct {
notifyv []chan NodeInfo // subscribers
}
// Len returns N(entries) in the table.
func (nt *NodeTable) Len() int {
return len(nt.nodev)
}
// All returns all entries in the table as one slice.
// XXX -> better iter?
func (nt *NodeTable) All() []*Node {
return nt.nodev
}
// XXX vvv move -> peer.go?
......
......@@ -924,8 +924,7 @@ func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
// M -> C notifications about cluster state
wg.Go(func() error {
//return m.notifyPeer(ctx, clink) // XXX -> keepPeerUpdated?
return nil
return m.keepPeerUpdated(ctx, clink)
})
// M <- C requests handler
......@@ -970,6 +969,79 @@ func (m *Master) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
// ----------------------------------------
// keepPeerUpdated sends cluster state updates to peer on the link
func (m *Master) keepPeerUpdated(ctx context.Context, link *neo.NodeLink) (err error) {
// link should be already in parent ctx (XXX and closed on cancel ?)
defer task.Runningf(&ctx, "keep updated")(&err)
// first lock cluster state to get its first consistent snapshot and
// atomically subscribe to updates
m.node.StateMu.RLock()
//clusterState := m.node.ClusterState
// XXX ^^^ + subscribe
nodev := m.node.NodeTab.All()
nodeiv := make([]neo.NodeInfo, len(nodev))
for i, node := range nodev {
// NOTE .NodeInfo is data not pointers - so won't change after we copy it to nodeiv
nodeiv[i] = node.NodeInfo
}
// XXX RLock is not enough for subscribe - right?
nodech, nodeUnsubscribe := m.node.NodeTab.SubscribeBuffered()
m.node.StateMu.RUnlock()
// don't forget to unsubscribe when we are done
defer func() {
m.node.StateMu.RLock() // XXX rlock not enough for unsubscribe
// XXX ClusterState unsubscribe
nodeUnsubscribe()
m.node.StateMu.RUnlock()
}()
// ok now we have state snapshot and subscription channels.
// first send the snapshot.
// XXX +ClusterState
err = link.Send1(&neo.NotifyNodeInformation{
IdTimestamp: 0, // XXX what here?
NodeList: nodeiv,
})
if err != nil {
return err
}
// now proxy the updates until we are done
for {
var msg neo.Msg
select {
case <-ctx.Done():
return ctx.Err()
// XXX ClusterState
case nodeiv = <-nodech:
msg = &neo.NotifyNodeInformation{
IdTimestamp: 0, // XXX what here?
NodeList: nodeiv,
}
}
// XXX vvv don't allow it to send very slowly and thus our
// buffered subscription channel to grow up indefinitely.
// XXX -> if it is too slow - just close the link.
err = link.Send1(msg)
if err != nil {
return err
}
}
}
// ----------------------------------------
// identify processes identification request of just connected node and either accepts or declines it.
//
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment