Commit ebf2ca71 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent dd1540ff
...@@ -104,6 +104,8 @@ type peerWork struct { ...@@ -104,6 +104,8 @@ type peerWork struct {
state0 *xneo.ClusterStateSnapshot state0 *xneo.ClusterStateSnapshot
// main sends δnodeTab/δpartTab/δstateCode to notifyq. // main sends δnodeTab/δpartTab/δstateCode to notifyq.
notifyq chan _ΔClusterState notifyq chan _ΔClusterState
// notifyqOverflow becomes ready if main detects that peer is to slow to consume updates
notifyqOverflow chan struct{}
} }
// _ΔClusterState represents δnodeTab/δpartTab/δClusterState. // _ΔClusterState represents δnodeTab/δpartTab/δClusterState.
...@@ -1039,7 +1041,9 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn ...@@ -1039,7 +1041,9 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn
} }
log.Warningf(ctx, "peer %s is slow -> detaching it", nid) log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
// TODO ^^^ close(w.notifyqOverflow)
// TODO delete(m.peerWorkTab, nid)
// XXX what else?
panic("TODO") panic("TODO")
} }
...@@ -1233,11 +1237,12 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1233,11 +1237,12 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates // make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
m.peerWorkTab[node.NID] = &peerWork{ m.peerWorkTab[node.NID] = &peerWork{
wg: xsync.NewWorkGroup(m.runCtx), wg: xsync.NewWorkGroup(m.runCtx),
state0: m.node.State.Snapshot(), state0: m.node.State.Snapshot(),
// TODO change limiting by buffer size -> to limiting by time // TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details) // (see updateNodeTab for details)
notifyq: make(chan _ΔClusterState, 1024), notifyq: make(chan _ΔClusterState, 1024),
notifyqOverflow: make(chan struct{}),
} }
return node, accept return node, accept
...@@ -1285,7 +1290,11 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, ...@@ -1285,7 +1290,11 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot,
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err() // XXX signal to nodeLeaveq ?
case <-w.notifyqOverflow:
// XXX err -> ? XXX signal to nodeLeaveq ?
return fmt.Errorf("detaching (peer is too slow to consume updates)")
case δstate = <-w.notifyq: // XXX could be also closed? case δstate = <-w.notifyq: // XXX could be also closed?
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment