Commit 9cb8a1c6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3dd45621
...@@ -120,8 +120,10 @@ type _MasteredPeer struct { ...@@ -120,8 +120,10 @@ type _MasteredPeer struct {
accept *proto.AcceptIdentification // identify decided to accept this peer with .accept accept *proto.AcceptIdentification // identify decided to accept this peer with .accept
// all tasks are spawned under wg. If any task fails - whole wg is canceled. // all tasks are spawned under wg. If any task fails - whole wg is canceled.
wg *xsync.WorkGroup // cancel signals all tasks under wg to stop.
// XXX +cancel wg *xsync.WorkGroup
cancel func()
// snapshot of nodeTab/partTab/stateCode when peer was accepted by main. // snapshot of nodeTab/partTab/stateCode when peer was accepted by main.
state0 *xneo.ClusterStateSnapshot state0 *xneo.ClusterStateSnapshot
// main -> peerWG.notify δnodeTab/δpartTab/δstateCode. // main -> peerWG.notify δnodeTab/δpartTab/δstateCode.
...@@ -1049,19 +1051,18 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn ...@@ -1049,19 +1051,18 @@ func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xn
event := &_ΔNodeTab{nodeInfo} event := &_ΔNodeTab{nodeInfo}
// XXX locking // XXX locking
// for nid, ch := range m.notifyTab { for nid, p := range m.peerTab {
for nid, w := range m.peerWorkTab {
// TODO change limiting by buffer size to limiting by time - // TODO change limiting by buffer size to limiting by time -
// - i.e. detach peer if event queue grows more than 30s of time. // - i.e. detach peer if event queue grows more than 30s of time.
select { select {
case w.notifyq <- event: case p.notifyq <- event:
continue // ok continue // ok
default: default:
} }
log.Warningf(ctx, "peer %s is slow -> detaching it", nid) log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
close(w.notifyqOverflow) close(p.notifyqOverflow)
// TODO delete(m.peerWorkTab, nid) // TODO delete(m.peerTab, nid) -> p.cancel()
// XXX what else? // XXX what else?
panic("TODO") panic("TODO")
} }
...@@ -1261,26 +1262,31 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredNode, ...@@ -1261,26 +1262,31 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredNode,
node = m.updateNodeTab(ctx, nodeInfo) node = m.updateNodeTab(ctx, nodeInfo)
node.SetLink(n.req.Link()) node.SetLink(n.req.Link())
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
m.peerWorkTab[node.NID] = &_MasteredPeer{ // create peer with nodeTab/partTab snapshot to push to accepted node
peer: node, // and subscribe it for updates.
peerCtx, peerCancel := context.WithCancel(m.runCtx)
peer = &_MasteredPeer{
node: node,
accept: accept, accept: accept,
wg: xsync.NewWorkGroup(m.runCtx), // XXX wrong -> per peer ctx (derived from runCtx) wg: xsync.NewWorkGroup(peerCtx),
cancel: peerCancel,
state0: m.node.State.Snapshot(), state0: m.node.State.Snapshot(),
// TODO change limiting by buffer size -> to limiting by time // TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details) // (see updateNodeTab for details)
notifyq: make(chan _ΔClusterState, 1024), notifyq: make(chan _ΔClusterState, 1024),
notifyqOverflow: make(chan struct{}), notifyqOverflow: make(chan struct{}),
} }
m.peerTab[node.NID] = peer
return peer, true return peer, true
} }
// accept sends acceptance to just identified peer, sends nodeTab and partTab // accept sends acceptance to just identified peer, sends nodeTab and partTab
// and spawns task to proxy their updates to the peer. XXX // and spawns task to proxy their updates to the peer. XXX
func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request, idResp proto.Msg) error { func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request) error {
// XXX errctx? // XXX errctx?
err := idReq.Reply(idResp) err := idReq.Reply(p.accept)
if err != nil { if err != nil {
return fmt.Errorf("send accept: %w", err) return fmt.Errorf("send accept: %w", err)
} }
...@@ -1305,8 +1311,8 @@ func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request, idResp proto.Ms ...@@ -1305,8 +1311,8 @@ func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request, idResp proto.Ms
// XXX send clusterState too? (NEO/py does not send it) // XXX send clusterState too? (NEO/py does not send it)
// spawn p.notify to proxy δnodeTab/δpartTab/δcluterState to peer p.wg.Go(p.notify) // main -> peer δnodeTab/δpartTab/δcluterState to proxy to peer link
p.wg.Go(p.notify) m.mainWG.Go(p.waitAll) // main <- peer "peer (should be) disconnected"
return nil return nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment