Commit 24c420c0 authored by Kirill Smelkov

.

parent 3c25e926
@@ -414,7 +414,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
}
// XXX set cluster state = RECOVERY
// XXX close links to clients
// XXX down clients
// start recovery on all storages we are currently in touch with
for _, stor := range m.node.State.NodeTab.StorageList() {
@@ -463,54 +463,41 @@ loop:
case ech := <-m.ctlStop:
close(ech) // ok; we are already recovering
// new connection comes in
// new connection comes in and asks to be identified
case n := <-m.nodeComeq:
peer, ok := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
peer, ok := m.identify(ctx, n,
/* XXX only accept storages -> PENDING | MASTER */
)
if !ok {
break
}
// if new storage arrived - start recovery on it too
inprogress++
peer.wg.Go(func(peerCtx context.Context) error {
err := m.accept(peerCtx, peer)
if err != nil {
return err // XXX -> recoveryq
}
// S -> start recovery
if peer.node.Type != proto.STORAGE {
break
}
// XXX wg.Add(1) + defer wg.Done() ?
peer.wg.Go(func(peerCtx context.Context) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
//defer cancel()
})
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
// start recovery
var pt *xneo.PartitionTable
err := acceptAndRun(func(workCtx context.Context, node *xneo.PeerNode) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, workCtx)
defer cancel()
pt, err = storCtlRecovery(ctx, node)
return err
err := peer.run(ctx, func(...) {
pt, err = storCtlRecovery(...)
})
ack := make(chan struct{})
recoveryq <- storRecovery{stor: peer.node, partTab: pt, err: err, ack: ack}
recoveryq <- storRecovery{stor: peer, partTab: pt, err: err, ack: ack}
<-ack
/*
err := m.accept(node, state0, n.req, resp)
if err != nil {
recovery <- storRecovery{stor: node, err: err}
return
// canceled recovery does not mean we should down the peer
if xcontext.Canceled(err) {
err = nil
}
return err
})
// start recovery
storCtlRecovery(ctx, node, recovery)
*/
}()
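// Roughly, the flow this case converges to once the superseded lines above are
// dropped - a sketch only; it assumes peer.run(ctx, f) and storCtlRecovery(ctx,
// node) with the signatures they are used with in this hunk, which may differ
// from the actual code:
//
//	inprogress++
//	peer.wg.Go(func(peerCtx context.Context) error {
//		if err := m.accept(peerCtx, peer); err != nil {
//			return err // XXX -> recoveryq ?
//		}
//
//		// only storages take part in recovery
//		if peer.node.Type != proto.STORAGE {
//			return nil
//		}
//
//		var pt *xneo.PartitionTable
//		err := peer.run(peerCtx, func(ctx context.Context) (err error) {
//			pt, err = storCtlRecovery(ctx, peer.node)
//			return err
//		})
//
//		ack := make(chan struct{})
//		recoveryq <- storRecovery{stor: peer, partTab: pt, err: err, ack: ack}
//		<-ack
//
//		// canceled recovery does not mean we should down the peer
//		if xcontext.Canceled(err) {
//			err = nil
//		}
//		return err
//	})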
// XXX move up (before nodeComeq) ?
case n := <-m.nodeLeaveq:
@@ -1316,13 +1303,15 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
}
m.peerTab[node.NID] = peer
// XXX peer.wg.Go(m.accept)
return peer, true
}
// accept sends acceptance to the just identified peer, sends nodeTab and partTab
// and spawns a task to proxy their updates to the peer. XXX
// XXX +ctx?
func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request) error {
func (m *Master) __accept(p *_MasteredPeer, idReq *neonet.Request) error {
// XXX errctx?
err := idReq.Reply(p.accept)
if err != nil {
@@ -1349,6 +1338,8 @@ func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request) error {
// XXX send clusterState too? (NEO/py does not send it)
// TODO indicate that initial phase of accept is done
p.wg.Go(p.notify) // main -> peer δnodeTab/δpartTab/δclusterState to proxy to peer link
m.mainWG.Go(p.waitAll) // main <- peer "peer (should be) disconnected"
return nil
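// A possible way to address the TODO above - a sketch assuming _MasteredPeer
// carries an acceptDone channel created during identification (both the field
// form and its placement are assumptions here):
//
//	close(p.acceptDone) // unblock peer.run: initial accept sequence is fully sent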
@@ -1420,6 +1411,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
// waitAll waits for all tasks related to the peer to complete and then notifies
// main that the peer node should go. It is spawned under mainWG.
// XXX naming -> wait?
// XXX inline into identify
func (p *_MasteredPeer) waitAll(_ context.Context) error {
// don't take our ctx into account - it is ~ runCtx and should be
// parent of context under which per-peer tasks are spawned. This way
@@ -1431,6 +1423,19 @@ func (p *_MasteredPeer) waitAll(_ context.Context) error {
return nil // XXX or ctx.Err() ?
}
// XXX run runs f after the initial phase of peer acceptance is over.
//
// XXX this is very similar to what a separate Accept call would provide: peers
// already identified and already answered with the initial accept message
// sequence. However identification needs decisions from the main task (e.g. to
// consult nodeTab to verify that the peer's laddr does not overlap with anyone
// else's, and to assign the nid). Because main is involved we cannot move
// identification into a completely separate task and give main only one Accept
// entry point to call.
func (p *_MasteredPeer) run(f) error {
// XXX wait p.acceptDone
// XXX f()
}
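// A sketch of how run might look, assuming acceptDone is a channel that accept
// closes once the initial accept message sequence has been sent to the peer
// (the channel form and the f signature are assumptions, not the actual code):
//
//	func (p *_MasteredPeer) run(ctx context.Context, f func(ctx context.Context) error) error {
//		// wait till the initial phase of acceptance is over
//		select {
//		case <-ctx.Done():
//			return ctx.Err()
//		case <-p.acceptDone:
//			// ok - peer got its accept + initial nodeTab/partTab
//		}
//		return f(ctx)
//	}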
// allocNID allocates new node ID for a node of kind nodeType.
// XXX it is a bad idea for the master to assign a node ID to a coming node
// -> better: nodes generate a really unique UUID themselves and always show up with it
......