Commit fd1d1a5a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ca492f72
...@@ -193,13 +193,10 @@ func (m *Master) Stop() { ...@@ -193,13 +193,10 @@ func (m *Master) Stop() {
<-ech <-ech
} }
// setClusterState sets .clusterState and notifies subscribers. // setClusterState sets .clusterState and notifies subscribers.
func (m *Master) setClusterState(state proto.ClusterState) { func (m *Master) setClusterState(ctx context.Context, state proto.ClusterState) {
m.node.State.Code.Set(state) m.node.State.Code.Set(state)
//m.notifyAll(ctx, &_ΔStateCode{state}) TODO enable
// TODO notify subscribers
// <- _ΔStateCode{state}
} }
...@@ -377,7 +374,7 @@ type storRecovery struct { ...@@ -377,7 +374,7 @@ type storRecovery struct {
func (m *Master) recovery(ctx context.Context) (err error) { func (m *Master) recovery(ctx context.Context) (err error) {
defer task.Running(&ctx, "recovery")(&err) defer task.Running(&ctx, "recovery")(&err)
m.setClusterState(proto.ClusterRecovering) m.setClusterState(ctx, proto.ClusterRecovering)
ctx, rcancel := context.WithCancel(ctx) ctx, rcancel := context.WithCancel(ctx)
defer rcancel() defer rcancel()
...@@ -642,7 +639,7 @@ var errClusterDegraded = errors.New("cluster became non-operatonal") ...@@ -642,7 +639,7 @@ var errClusterDegraded = errors.New("cluster became non-operatonal")
func (m *Master) verify(ctx context.Context) (err error) { func (m *Master) verify(ctx context.Context) (err error) {
defer task.Running(&ctx, "verify")(&err) defer task.Running(&ctx, "verify")(&err)
m.setClusterState(proto.ClusterVerifying) m.setClusterState(ctx, proto.ClusterVerifying)
ctx, vcancel := context.WithCancel(ctx) ctx, vcancel := context.WithCancel(ctx)
defer vcancel() defer vcancel()
...@@ -849,7 +846,7 @@ type serveDone struct { ...@@ -849,7 +846,7 @@ type serveDone struct {
func (m *Master) serve(ctx context.Context) (err error) { func (m *Master) serve(ctx context.Context) (err error) {
defer task.Running(&ctx, "serve")(&err) defer task.Running(&ctx, "serve")(&err)
m.setClusterState(proto.ClusterRunning) m.setClusterState(ctx, proto.ClusterRunning)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
...@@ -1037,6 +1034,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms ...@@ -1037,6 +1034,7 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// disconnectPeer resets link to the peer and sets its state to DOWN in nodeTab. // disconnectPeer resets link to the peer and sets its state to DOWN in nodeTab.
// other peers are notified with δnodeTab about it. // other peers are notified with δnodeTab about it.
// must be called from main.
// XXX place=? // XXX place=?
func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) { func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) {
// XXX log? // XXX log?
...@@ -1045,13 +1043,10 @@ func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) { ...@@ -1045,13 +1043,10 @@ func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) {
m.updateNodeState(ctx, peer.node, proto.DOWN) m.updateNodeState(ctx, peer.node, proto.DOWN)
} }
// updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers. // notifyAll notifies all peers about event.
// must be called from main.
// XXX place // XXX place
// called from main master process. XXX func (m *Master) notifyAll(ctx context.Context, event _ΔClusterState) {
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
node := m.node.State.NodeTab.Update(nodeInfo)
event := &_ΔNodeTab{nodeInfo}
// XXX locking // XXX locking
for nid, peer := range m.peerTab { for nid, peer := range m.peerTab {
// TODO change limiting by buffer size to limiting by time - // TODO change limiting by buffer size to limiting by time -
...@@ -1070,6 +1065,14 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn ...@@ -1070,6 +1065,14 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn
panic("TODO") panic("TODO")
} }
}
// updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers.
// must be called from main.
// XXX place
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
node := m.node.State.NodeTab.Update(nodeInfo)
m.notifyAll(ctx, &_ΔNodeTab{nodeInfo})
return node return node
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment