Commit 155993d9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent dc59c1da
......@@ -118,8 +118,6 @@ type nodeLeave struct {
type _MasteredPeer struct {
node *xneo.PeerNode
accept *proto.AcceptIdentification // identify decided to accept this peer with .accept
// all tasks are spawned under wg. If any task fails - whole wg is canceled.
// cancel signals all tasks under wg to stop.
wg *xsync.WorkGroup
......@@ -132,6 +130,8 @@ type _MasteredPeer struct {
// notifyqOverflow becomes ready if main detects that peer is to slow to consume updates
// XXX no need? (peer.notify is canceled via peerWork.cancel)
notifyqOverflow chan struct{}
acceptDone chan struct{} // ready after initial accept sequence is sent to the peer
}
// _ΔClusterState represents δnodeTab/δpartTab/δClusterState.
......@@ -1100,101 +1100,16 @@ func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state
m.updateNodeTab(ctx, nodei)
}
/*
// keepPeerUpdated sends cluster state updates to peer on the link.
func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (err error) {
// link should be already in parent ctx (XXX and closed on cancel ?)
defer task.Runningf(&ctx, "keep updated")(&err)
// first lock cluster state to get its first consistent snapshot and
// atomically subscribe to updates
m.node.StateMu.RLock()
//clusterState := m.node.ClusterState
// XXX ^^^ + subscribe
nodev := m.node.State.NodeTab.All()
nodeiv := make([]proto.NodeInfo, len(nodev))
for i, node := range nodev {
// NOTE .NodeInfo is data not pointers - so won't change after we copy it to nodeiv
nodeiv[i] = node.NodeInfo
}
ptid := m.node.State.PartTab.PTid
ptnr := uint32(0) // FIXME hardcoded NumReplicas; NEO/py keeps this as n(replica)-1
ptv := m.node.State.PartTab.Dump()
// XXX RLock is not enough for subscribe - right?
nodech, nodeUnsubscribe := m.node.State.NodeTab.SubscribeBuffered()
m.node.StateMu.RUnlock()
// don't forget to unsubscribe when we are done
defer func() {
m.node.StateMu.RLock() // XXX rlock not enough for unsubscribe
// XXX ClusterState unsubscribe
nodeUnsubscribe()
m.node.StateMu.RUnlock()
}()
// ok now we have state snapshot and subscription channels.
// first send the snapshot.
// XXX +ClusterState
err = link.Send1(&proto.NotifyNodeInformation{
IdTime: proto.IdTimeNone, // XXX what here?
NodeList: nodeiv,
})
if err != nil {
return err
}
err = link.Send1(&proto.SendPartitionTable{ // XXX to C, but not to S?
PTid: ptid,
NumReplicas: ptnr,
RowList: ptv,
})
if err != nil {
return err
}
// now proxy the updates until we are done
for {
var msg proto.Msg
select {
case <-ctx.Done():
return ctx.Err()
// XXX ClusterState
case nodeiv = <-nodech:
msg = &proto.NotifyNodeInformation{
IdTime: proto.IdTimeNone, // XXX what here?
NodeList: nodeiv,
}
}
// XXX vvv don't allow it to send very slowly and thus our
// buffered subscription channel to grow up indefinitely.
// XXX -> if it is too slow - just close the link.
err = link.Send1(msg)
if err != nil {
return err
}
}
}
*/
// ----------------------------------------
// identify processes identification request of just connected node and either accepts or declines it.
//
// If node identification is accepted .nodeTab and .peerTab are updated and
// corresponding peer entry is returned. XXX
// Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification. XXX via .accept()
// corresponding peer entry is returned. New task is spawned to reply with
// either accept or reject.
//
// XXX If the peer is accepted (run something after initial accept completes)
func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ok bool) {
// XXX also verify ? :
// - NodeType valid
......@@ -1212,7 +1127,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
nid = m.allocNID(nodeType)
}
// XXX nid < 0 (temporary) -> reallocate if conflict ?
// XXX check nid matches NodeType
node := m.node.State.NodeTab.Get(nid)
......@@ -1245,7 +1159,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err)
m.mainWG.Go(func(ctx context.Context) error {
// XXX close link on ctx cancel?
// XXX close link on ctx cancel
n.req.Reply(err)
n.req.Link().Close()
// XXX log err (if any)
......@@ -1288,9 +1202,9 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
// create peer with nodeTab/partTab snapshot to push to accepted node
// and subscribe it for updates.
peerCtx, peerCancel := context.WithCancel(m.runCtx)
// XXX add accept.NID to peerCtx task?
peer = &_MasteredPeer{
node: node,
accept: accept,
wg: xsync.NewWorkGroup(peerCtx),
cancel: peerCancel,
state0: m.node.State.Snapshot(),
......@@ -1298,27 +1212,37 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
// (see updateNodeTab for details)
notifyq: make(chan _ΔClusterState, 1024),
notifyqOverflow: make(chan struct{}),
acceptDone: make(chan struct{}),
}
m.peerTab[node.NID] = peer
// XXX peer.wg.Go(m.accept)
// spawn task to send accept and proxy δnodeTab/δpartTab to the peer
peer.wg.Go(func(ctx context.Context) error {
// go main <- peer "peer (should be) disconnected" when all peer's task finish
m.mainWG.Go(func(_ context.Context) error {
// wait for all tasks related to peer to complete and then
// notify main that peer node should go. Don't take ctx into
// account - it is ~ runCtx and should be parent of context
// under which per-peer tasks are spawned. This way if runCtx
// is canceled -> any per-peer ctx should be canceled too and
// wg.Wait should not block.
err := peer.wg.Wait()
m.nodeLeaveq <- nodeLeave{peer, err} // XXX detect if if main is already done
return nil // XXX or ctx.Err() ?
})
return peer, true
}
// XXX err -> indicated that accept0 failed ?
// XXX close link on ctx cancel?
link := peer.node.Link()
// accept sends acceptance to just identified peer, sends nodeTab and partTab
// and spawns task to proxy their updates to the peer. XXX
// XXX +ctx?
func (m *Master) __accept(peer *_MasteredPeer, idReq *neonet.Request) error {
// XXX errctx?
err := idReq.Reply(peer.accept)
// send acceptance to just identified peer
err := n.req.Reply(accept)
if err != nil {
return fmt.Errorf("send accept: %w", err)
}
// XXX idReq close?
// send initial state snapshot to accepted node
link := peer.node.Link() // XXX -> idReq.Link() instead?
// nodeTab
err = link.Send1(&peer.state0.NodeTab)
......@@ -1336,23 +1260,35 @@ func (m *Master) __accept(peer *_MasteredPeer, idReq *neonet.Request) error {
// XXX send clusterState too? (NEO/py does not send it)
// TODO indicate that initial phase of accept is done
// indicate to run that initial acceptance is done
close(peer.acceptDone)
peer.wg.Go(peer.notify) // main -> peer δnodeTab/δpartTab/δcluterState to proxy to peer link
// go main <- peer "peer (should be) disconnected"
m.mainWG.Go(func (_ context.Context) error {
// wait for all tasks related to peer to complete and then
// notify main that peer node should go. Don't take ctx into
// account - it is ~ runCtx and should be parent of context
// under which per-peer tasks are spawned. This way if runCtx
// is canceled -> any per-peer ctx should be canceled too and
// wg.Wait should not block.
err := peer.wg.Wait()
m.nodeLeaveq <- nodeLeave{peer, err} // XXX detect if if main is already done
return nil // XXX or ctx.Err() ?
// proxy δnodeTab,δpartTab/δclusterState from main to the peer
return peer.notify(ctx)
})
return nil
return peer, true
}
// XXX run runs f after initial phase of peer acceptance is over.
//
// XXX this is very similar if a separate Accept call would return peers
// already identified and answered with initial accept message sequence.
// However identification needs decisions from main task (to e.g. consult
// nodeTab to see if peer laddr is not overlapping with anyone's, and to assign
// nid). Because main is involved we cannot move it to completely separate task
// and give main only one Accept entry point to call.
func (p *_MasteredPeer) run(ctx context.Context, f func() error) error {
select {
case <-ctx.Done():
return ctx.Err()
// XXX in general we should also wait for if "accept0 failed". However
// as that means accept0 task error, it would cancel ctx for all other
// tasks run through p.wg . And run is called with contexts whose
// cancel is derived from wg cancel - so we don't check for that. XXX
case <-p.acceptDone:
return f()
}
}
// notify proxies δnodeTab/δpeerTab/δClusterState update to the peer.
......@@ -1419,19 +1355,6 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
}
// XXX run runs f after initial phase of peer acceptance is over.
//
// XXX this is very similar if a separate Accept call would return peers
// already identified and answered with initial accept message sequence.
// However identification needs decisions from main task (to e.g. consult
// nodeTab to see if peer laddr is not overlapping with anyone's, and to assign
// nid). Because main is involved we cannot move it to completely separate task
// and give main only one Accept entry point to call.
func (p *_MasteredPeer) run(ctx context.Context, f func() error) error {
// XXX wait p.acceptDone
return f()
}
// allocNID allocates new node ID for a node of kind nodeType.
// XXX it is bad idea for master to assign node ID to coming node
// -> better nodes generate really unique UUID themselves and always show with them
......
......@@ -1195,7 +1195,7 @@ func (c *Conn) sendPktDirect(pkt *pktBuf) error {
// ---- raw IO ----
const dumpio = false
const dumpio = true
// sendPkt sends raw packet to peer.
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment