Commit 9d959716 authored by Kirill Smelkov

.

parent 4c4a0340
...@@ -366,6 +366,9 @@ type storRecovery struct { ...@@ -366,6 +366,9 @@ type storRecovery struct {
err error err error
// XXX + backup_tid, truncate_tid ? // XXX + backup_tid, truncate_tid ?
// XXX naming cont? continue? unlock? unpause?
ack chan struct{} // main -> storCtlRecovery "thanks; please continue" XXX explain why
} }
// recovery drives cluster during recovery phase. // recovery drives cluster during recovery phase.
...@@ -380,22 +383,48 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -380,22 +383,48 @@ func (m *Master) recovery(ctx context.Context) (err error) {
ctx, rcancel := context.WithCancel(ctx) ctx, rcancel := context.WithCancel(ctx)
defer rcancel() defer rcancel()
recoveryq := make(chan storRecovery)
inprogress := 0 // in-progress stor recoveries
wg := &sync.WaitGroup{}
//trace:event traceMasterStartReady(m *Master, ready bool) //trace:event traceMasterStartReady(m *Master, ready bool)
readyToStart := false readyToStart := false
updateReadyToStart := func() {
// update indicator whether cluster currently can be operational or not
var ready bool
if m.node.State.PartTab.PTid == 0 {
// new cluster - allow startup if we have some storages passed
// recovery and there is no in-progress recovery running
nup := 0
for _, stor := range m.node.State.NodeTab.StorageList() {
if stor.State > proto.DOWN {
nup++
}
}
ready = (nup > 0 && inprogress == 0)
recovery := make(chan storRecovery) } else {
inprogress := 0 // in-progress stor recoveries ready = m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) // XXX + node state
wg := &sync.WaitGroup{} }
// start recovery on all storages we are currently in touch with if readyToStart != ready {
readyToStart = ready
traceMasterStartReady(m, ready)
}
}
// XXX set cluster state = RECOVERY
// XXX close links to clients // XXX close links to clients
// start recovery on all storages we are currently in touch with
for _, stor := range m.node.State.NodeTab.StorageList() { for _, stor := range m.node.State.NodeTab.StorageList() {
if stor.State > proto.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ? if stor.State > proto.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++ inprogress++
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
storCtlRecovery(ctx, stor, recovery) storCtlRecovery(ctx, stor, recoveryq)
// XXX acj
}() }()
} }
} }
...@@ -424,7 +453,9 @@ loop: ...@@ -424,7 +453,9 @@ loop:
pt, err = storCtlRecovery(ctx, node) pt, err = storCtlRecovery(ctx, node)
return err return err
}) })
recovery <- storRecovery{stor: peer.node, partTab: pt, err: err} ack := make(chan struct{})
recoveryq <- storRecovery{stor: peer.node, partTab: pt, err: err, ack: ack}
<-ack
/* /*
err := m.accept(node, state0, n.req, resp) err := m.accept(node, state0, n.req, resp)
...@@ -443,28 +474,33 @@ loop: ...@@ -443,28 +474,33 @@ loop:
// XXX -> move to func // XXX -> move to func
peer := n.peer peer := n.peer
// XXX link.Close peer.node.Link().Close() // XXX err
// XXX node.ResetLink peer.node.ResetLink()
delete(m.peerTab, peer.node.NID) delete(m.peerTab, peer.node.NID)
m.updateNodeState(ctx, peer.node, proto.DOWN) m.updateNodeState(ctx, peer.node, proto.DOWN)
updateReadyToStart() // XXX here yes?
// a storage node came through recovery - let's see whether // a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there // ptid ↑ and if so we should take partition table from there
// //
// FIXME after a storage recovers, it could go away while // FIXME after a storage recovers, it could go away while
// recovery for others is in progress -> monitor this. // recovery for others is in progress -> monitor this.
case r := <-recovery: case r := <-recoveryq:
close(r.ack) // for <-"node leave" to happen after <-recovery in case of err
inprogress-- inprogress--
if r.err != nil { if r.err != nil {
log.Error(ctx, r.err) log.Error(ctx, r.err)
/*
if !xcontext.Canceled(r.err) { if !xcontext.Canceled(r.err) {
r.stor.ResetLink(ctx) r.stor.ResetLink(ctx)
m.updateNodeState(ctx, r.stor, proto.DOWN) m.updateNodeState(ctx, r.stor, proto.DOWN)
// XXX stop sending nodeTab/partTab updates to this node // XXX stop sending nodeTab/partTab updates to this node
} }
*/
} else { } else {
// we are interested in latest partTab // we are interested in latest partTab
...@@ -475,6 +511,10 @@ loop: ...@@ -475,6 +511,10 @@ loop:
} }
} }
updateReadyToStart()
/*
// XXX move -> updateReadyToStart?
// update indicator whether cluster currently can be operational or not // update indicator whether cluster currently can be operational or not
var ready bool var ready bool
if m.node.State.PartTab.PTid == 0 { if m.node.State.PartTab.PTid == 0 {
...@@ -496,6 +536,7 @@ loop: ...@@ -496,6 +536,7 @@ loop:
readyToStart = ready readyToStart = ready
traceMasterStartReady(m, ready) traceMasterStartReady(m, ready)
} }
*/
// request to start the cluster - if ok we exit replying ok // request to start the cluster - if ok we exit replying ok
...@@ -541,7 +582,7 @@ loop: ...@@ -541,7 +582,7 @@ loop:
loop2: loop2:
for { for {
select { select {
case r := <-recovery: case r := <-recoveryq:
// XXX dup wrt <-recovery handler above // XXX dup wrt <-recovery handler above
log.Error(ctx, r.err) log.Error(ctx, r.err)
...@@ -1182,7 +1223,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er ...@@ -1182,7 +1223,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// corresponding peer entry is returned. XXX // corresponding peer entry is returned. XXX
// Response message is constructed but not sent back, so as not to block the caller - it is // Response message is constructed but not sent back, so as not to block the caller - it is
// the caller's responsibility to send the response to the node which requested identification. XXX via .accept() // the caller's responsibility to send the response to the node which requested identification. XXX via .accept()
func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredNode, ok bool) { func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ok bool) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
// - IdTime ? // - IdTime ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment