Commit 06685e0b authored by Kirill Smelkov

.

parent b5b0419e
@@ -42,9 +42,9 @@ package neo
import (
"context"
stderrors "errors"
"errors"
"fmt"
"sync"
// "sync"
"time"
xxcontext "lab.nexedi.com/kirr/go123/xcontext"
@@ -181,7 +181,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master {
// NOTE upon successful return cluster is not yet in running state - the transition will
// take time and could be also automatically aborted due to cluster environment change (e.g.
// a storage node goes down).
func (m *Master) Start() error {
func (m *Master) _Start() error {
ech := make(chan error)
m.ctlStart <- ech
return <-ech
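Annotation: Start is renamed to _Start here; its body (unchanged) shows the request/reply idiom the master uses for control commands: the caller hands a fresh reply channel to the main loop over ctlStart and blocks until the loop answers on it. A minimal self-contained sketch of that idiom, with illustrative names (controller, run, start are not the real neo types):

```go
package main

import "fmt"

// controller mimics the master's main loop: it owns the state and
// answers start requests sent to it as reply channels.
type controller struct {
	ctlStart chan chan error
}

func (c *controller) run(ready bool) {
	for ech := range c.ctlStart {
		if ready {
			ech <- nil // ok to start
		} else {
			ech <- fmt.Errorf("start: cluster is non-operational")
		}
	}
}

// start is the request/reply wrapper, analogous to _Start above.
func (c *controller) start() error {
	ech := make(chan error)
	c.ctlStart <- ech
	return <-ech
}

func main() {
	c := &controller{ctlStart: make(chan chan error)}
	go c.run(true)
	fmt.Println("start:", c.start()) // start: <nil>
}
```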
@@ -388,9 +388,9 @@ func (m *Master) recovery(ctx context.Context) (err error) {
ctx, rcancel := context.WithCancel(ctx)
defer rcancel()
recoveryq := make(chan storRecovery)
inprogress := 0 // in-progress stor recoveries
wg := &sync.WaitGroup{}
recoveredq := make(chan storRecovery) // <- result of 1 stor recovery
inprogress := 0 // in-progress stor recoveries
// wg := &sync.WaitGroup{}
start := false // whether we were instructed to start
//trace:event traceMasterStartReady(m *Master, ready bool)
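Annotation: recoveryq becomes recoveredq and the sync.WaitGroup is commented out; completion is now tracked only by the inprogress counter plus one result message per task, which lets the loop handle each completion individually instead of waiting on the group at the end. A rough sketch of that bookkeeping pattern, using made-up task and result names:

```go
package main

import "fmt"

type result struct {
	id  int
	err error
}

func main() {
	recoveredq := make(chan result) // <- result of one task
	inprogress := 0                 // in-flight tasks

	// spawn a few tasks; each reports exactly one result
	for i := 0; i < 3; i++ {
		inprogress++
		go func(id int) { recoveredq <- result{id: id} }(i)
	}

	// the loop keeps running until every spawned task has reported,
	// reacting to each completion as it arrives (no WaitGroup needed)
	for inprogress > 0 {
		r := <-recoveredq
		inprogress--
		fmt.Println("task done:", r.id, r.err)
	}
}
```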
@@ -418,15 +418,15 @@ func (m *Master) recovery(ctx context.Context) (err error) {
}
}
// XXX set cluster state = RECOVERY
// XXX down clients
// TODO (?) set cluster state = RECOVERY
// TODO down clients
// goStorCtlRecovery spawns recovery task on a storage peer.
goStorCtlRecovery := func(stor *_MasteredPeer) {
inprogress++
wg.Add(1)
// wg.Add(1)
stor.wg.Go(func(peerCtx context.Context) error {
defer wg.Done()
// defer wg.Done()
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
defer cancel()
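Annotation: goStorCtlRecovery derives the task context by merging the recovery ctx with the peer's own context, so the task stops when either of them is canceled. The sketch below shows one way to approximate such a merge with the standard library only (Go 1.21+); it is an illustration, not the go123 xcontext implementation:

```go
package main

import "context"

// mergeCancel returns a context that is canceled as soon as either a or b
// is done (a is also used as the value parent). Illustrative only.
func mergeCancel(a, b context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(a)
	stop := context.AfterFunc(b, cancel) // propagate b's cancellation to ctx
	return ctx, func() { stop(); cancel() }
}

func main() {
	parent, cancelParent := context.WithCancel(context.Background())
	peer, cancelPeer := context.WithCancel(context.Background())
	defer cancelPeer()

	ctx, cancel := mergeCancel(parent, peer)
	defer cancel()

	cancelParent() // canceling either input ...
	<-ctx.Done()   // ... cancels the merged context
}
```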
@@ -438,7 +438,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
})
ack := make(chan struct{})
recoveryq <- storRecovery{stor: stor, partTab: pt, err: err, ack: ack}
recoveredq <- storRecovery{stor: stor, partTab: pt, err: err, ack: ack}
<-ack
// canceled recovery does not mean we should down the storage node
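Annotation: each recovery task does not just send its result, it also blocks until the receiver closes the ack channel. That handshake guarantees that anything the task does after the send (for example reporting that the peer left) is observed by the main loop only after the result itself has been processed. A small sketch of the pattern with hypothetical names:

```go
package main

import "fmt"

type result struct {
	value string
	ack   chan struct{} // closed by the receiver once the result is handled
}

func main() {
	results := make(chan result)
	done := make(chan struct{})

	go func() {
		ack := make(chan struct{})
		results <- result{value: "recovered", ack: ack}
		<-ack // blocks until the main loop has processed the result
		// anything reported from here on is ordered after that processing
		close(done)
	}()

	r := <-results
	fmt.Println("handling:", r.value)
	close(r.ack) // let the sender proceed
	<-done
}
```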
@@ -456,16 +456,34 @@ func (m *Master) recovery(ctx context.Context) (err error) {
}
}
// during stop: react only to task completion and node leaving
ctxDone := ctx.Done()
ctlStart := m.ctlStart
ctlStop := m.ctlStop
nodeComeq := m.nodeComeq
err = nil
stop := func(stopErr error) {
if err != nil {
return
}
err = stopErr
rcancel()
ctxDone = nil
ctlStart = nil
ctlStop = nil
nodeComeq = nil
}
loop:
for !(inprogress == 0 && readyToStart && start) {
for inprogress > 0 || !(
/*start*/(readyToStart && start) || /*stop*/(err != nil)) {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case <-ctxDone:
stop(ctx.Err())
// request to start the cluster - if ok we exit replying ok
// if not ok - we just reply not ok
case ech := <-m.ctlStart:
case ech := <-ctlStart:
if readyToStart {
log.Infof(ctx, "start command - we are ready")
// reply "ok to start" after whole recovery finishes
@@ -481,13 +499,13 @@ loop:
// XXX (depending on storages state)
ech <- nil
}()
break loop
break loop // FIXME
}
log.Infof(ctx, "start command - err - we are not ready")
ech <- fmt.Errorf("start: cluster is non-operational")
case ech := <-m.ctlStop:
case ech := <-ctlStop:
close(ech) // ok; we are already recovering
// peer (should be) disconnected
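Annotation: in the ctlStop case above the reply is given by closing ech rather than sending on it; a receive from a closed error channel yields nil immediately, which here means "ok, we are already recovering". A one-line sketch of that reply style:

```go
package main

import "fmt"

func main() {
	ech := make(chan error)
	go func() { close(ech) }() // reply "ok" without sending an explicit value
	if err := <-ech; err == nil {
		fmt.Println("stop acknowledged")
	}
}
```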
@@ -496,7 +514,7 @@ loop:
updateReadyToStart()
// node comes in and asks to be identified
case n := <-m.nodeComeq:
case n := <-nodeComeq:
peer, ok := m.identify(ctx, n,
// XXX only accept:
// - S -> PENDING
@@ -513,7 +531,7 @@ loop:
// a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-recoveryq:
case r := <-recoveredq:
close(r.ack) // for <-"node leave" to happen after <-recovery in case of err
inprogress--
@@ -533,11 +551,12 @@ loop:
}
}
/*
// wait all workers to finish (which should come without delay since it was cancelled)
// XXX not good - some of the rest of the storages can fail in the
// meantime and this will lead to partTab to become non-operational.
// XXX also: some of the recoveries could still _succeed_ (e.g.
// successful recovery send was already queued to recoveryq but not
// successful recovery send was already queued to recoveredq but not
// yet received) - this successful recovery could bring us newer
// partTab from which we should reconsider whether we have all needed
// nodes up and running.
@@ -550,7 +569,7 @@ loop:
loop2:
for {
select {
case r := <-recoveryq:
case r := <-recoveredq:
close(r.ack)
log.Error(ctx, r.err)
@@ -559,6 +578,7 @@ loop2:
break loop2
}
}
*/
if err != nil {
return err
@@ -618,8 +638,8 @@ func storCtlRecovery(ctx context.Context, stor *_MasteredPeer) (_ *xneo.Partitio
}
var errStopRequested = stderrors.New("stop requested")
var errClusterDegraded = stderrors.New("cluster became non-operatonal")
var errStopRequested = errors.New("stop requested")
var errClusterDegraded = errors.New("cluster became non-operatonal")
// Cluster Verification (data recovery)
@@ -648,8 +668,8 @@ func (m *Master) verify(ctx context.Context) (err error) {
ctx, vcancel := context.WithCancel(ctx)
defer vcancel()
verifyq := make(chan storVerify) // <- result of stor verify task
inprogress := 0 // in-progress verify tasks
verifiedq := make(chan storVerify) // <- result of stor verify task
inprogress := 0 // in-progress verify tasks
// NOTE we don't reset m.lastOid / m.lastTid to 0 in the beginning of verification
// XXX (= py), rationale=?
@@ -667,7 +687,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
})
ack := make(chan struct{})
verifyq <- storVerify{stor: stor, lastOid: lastOid, lastTid: lastTid, err: err, ack: ack}
verifiedq <- storVerify{stor: stor, lastOid: lastOid, lastTid: lastTid, err: err, ack: ack}
<-ack
// canceled verify does not mean we should down the storage node
@@ -747,7 +767,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
// on error check - whether cluster became non-operational and stop verification if so
//
// FIXME actually implement logic to decide to finish/rollback transactions
case v := <-verifyq:
case v := <-verifiedq:
close(v.ack) // XXX explain why (see recovery)
inprogress--
@@ -857,7 +877,7 @@ func (m *Master) serve(ctx context.Context) (err error) {
defer cancel()
servedq := make(chan serveDone) // <- result of a serve task
inprogress := 0 // in-progress serve tasks
inprogress := 0 // in-progress serve tasks
// goServe spawns serve task for a peer.
goServe := func(peer *_MasteredPeer) {
@@ -83,7 +83,7 @@ type tNode struct {
// ITestMaster represents tested master node.
type ITestMaster interface {
Start() error
_Start() error
}
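Annotation: the test interface is updated to match the rename. Note that _Start, like any Go identifier that does not begin with an upper-case letter, is unexported, so an interface listing it can only be satisfied by types declared in the same package (including its _test.go files that use the same package name). A tiny illustration of that visibility rule, with made-up names:

```go
package neo

// ITester lists an unexported method, so only types inside package neo
// can satisfy it; code in other packages cannot implement _Start.
type ITester interface {
	_Start() error
}

type fakeMaster struct{}

func (fakeMaster) _Start() error { return nil } // ok: same package

var _ ITester = fakeMaster{} // compile-time check
```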
// ITestStorage represents tested storage node.