Commit 07798a21 authored by Kirill Smelkov

.

parent 48f3f0bb
@@ -338,7 +338,7 @@ func (m *Master) main(ctx context.Context) (err error) {
}
// provide service as long as partition table stays operational
err = m.service(ctx)
err = m.serve(ctx)
if err != nil {
//log.Error(ctx, err)
continue // -> recovery
@@ -390,6 +390,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
inprogress := 0 // in-progress stor recoveries
wg := &sync.WaitGroup{}
start := false // whether we were instructed to start
//trace:event traceMasterStartReady(m *Master, ready bool)
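// (//trace:event is a directive, presumably consumed by the project's tracing
//  code generator, which generates the traceMasterStartReady probe)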
readyToStart := false // whether cluster currently can be operational or not
updateReadyToStart := func() {
@@ -454,7 +455,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
}
loop:
for {
for !(inprogress == 0 && readyToStart && start) {
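// (keep looping until there is no in-progress recovery task, the partition
//  table can become operational (readyToStart), and start was requested)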
select {
case <-ctx.Done():
err = ctx.Err()
@@ -645,9 +646,8 @@ func (m *Master) verify(ctx context.Context) (err error) {
ctx, vcancel := context.WithCancel(ctx)
defer vcancel()
verifyq := make(chan storVerify)
inprogress := 0
wg := &sync.WaitGroup{}
verifyq := make(chan storVerify) // <- result of stor verify task
inprogress := 0 // in-progress verify tasks
// NOTE we don't reset m.lastOid / m.lastTid to 0 in the beginning of verification
// XXX (= py), rationale=?
@@ -655,10 +655,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
// goStorCtlVerify spawns verify task on a storage peer.
goStorCtlVerify := func(stor *_MasteredPeer) {
inprogress++
wg.Add(1)
stor.wg.Go(func(peerCtx context.Context) error {
defer wg.Done()
var lastOid zodb.Oid
var lastTid zodb.Tid
err := stor.run(ctx, func() error {
@@ -686,20 +683,35 @@ func (m *Master) verify(ctx context.Context) (err error) {
}
}
loop:
// during stop: react only to task completion and node leaving
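// (the channels are copied into local variables so that stop() can set them
//  to nil: a receive from a nil channel blocks forever, which disables the
//  corresponding select cases and leaves only verifyq and nodeLeaveq active
//  while in-progress verify tasks drain)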
ctxDone := ctx.Done()
ctlStart := m.ctlStart
ctlStop := m.ctlStop
nodeComeq := m.nodeComeq
err = nil
stop := func(stopErr error) {
if err != nil {
return
}
err = stopErr
vcancel()
ctxDone = nil
ctlStart = nil
ctlStop = nil
nodeComeq = nil
}
for inprogress > 0 {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case <-ctxDone:
stop(ctx.Err())
case ech := <-m.ctlStart:
case ech := <-ctlStart:
ech <- nil // we are already starting
case ech := <-m.ctlStop:
case ech := <-ctlStop:
close(ech) // ok
err = errStopRequested
break loop
stop(errStopRequested)
// peer (should be) disconnected
case n := <-m.nodeLeaveq:
@@ -709,13 +721,11 @@ loop:
if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
// XXX ok to instantly cancel? or better
// graceful shutdown in-flight verifications?
vcancel()
err = errClusterDegraded
break loop
stop(errClusterDegraded)
}
// node comes in and asks to be identified
case n := <-m.nodeComeq:
case n := <-nodeComeq:
peer, ok := m.identify(ctx, n,
// XXX only accept:
// - S -> known ? RUNNING : PENDING
@@ -745,9 +755,7 @@ loop:
// check partTab is still operational
// if not -> cancel to go back to recovery
if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
vcancel()
err = errClusterDegraded
break loop
stop(errClusterDegraded)
}
} else {
if v.lastOid > m.lastOid {
@@ -760,26 +768,6 @@ loop:
}
}
// wait all workers to finish (which should come without delay since it was cancelled)
// XXX not good - see loop2 in recovery about why
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
loop2:
for {
select {
case v := <-verifyq:
close(v.ack)
log.Error(ctx, v.err)
case <-done:
break loop2
}
}
return err
}
@@ -846,39 +834,41 @@ func storCtlVerify(ctx context.Context, stor *_MasteredPeer, pt *xneo.PartitionT
//
// TODO also plan data movement on new storage nodes appearing
// serviceDone is the error returned after service-phase node handling is finished.
type serviceDone struct {
// serveDone is the result reported after serving a peer has finished.
type serveDone struct {
peer *_MasteredPeer
err error
ack chan struct{}
}
// service drives cluster during running state.
// serve drives cluster during running state.
//
// TODO document error meanings on return
//
// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
// XXX naming -> serve?
func (m *Master) service(ctx context.Context) (err error) {
defer task.Running(&ctx, "service")(&err)
func (m *Master) serve(ctx context.Context) (err error) {
defer task.Running(&ctx, "serve")(&err)
m.setClusterState(proto.ClusterRunning)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
servedq := make(chan serviceDone)
wg := &sync.WaitGroup{}
servedq := make(chan serveDone) // <- result of a serve task
inprogress := 0 // in-progress serve tasks
println("M: serve")
// goServe spawns serve task for a peer.
goServe := func(peer *_MasteredPeer) {
inprogress++
peer.wg.Go(func(peerCtx context.Context) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
defer cancel()
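// (the merged context is canceled when either the serve-phase ctx or the
//  peer's own peerCtx is canceled, so the per-peer task stops in both cases)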
switch peer.node.Type {
case proto.STORAGE:
err = storCtlService(ctx, peer)
err = storCtlServe(ctx, peer)
case proto.CLIENT:
err = m.serveClient(ctx, peer)
@@ -886,14 +876,11 @@ func (m *Master) service(ctx context.Context) (err error) {
// XXX ADMIN
}
// XXX do we need vvv ?
// FIXME deadlock wrt when loop is over and noone reads from servedq
// (XXX review similar places in recovery and verify)
ack := make(chan struct{})
servedq <- serviceDone{peer: peer, err: err, ack: ack}
servedq <- serveDone{peer: peer, err: err, ack: ack}
<-ack
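// (the ack handshake appears intended to block this task until the main
//  loop has received the completion from servedq and closed ack, so that
//  inprogress is decremented before the peer's workgroup can finish and
//  report the peer as gone)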
// canceled service does not necessarily mean we should down the peer
// canceled serve does not necessarily mean we should down the peer
if xcontext.Canceled(err) {
err = nil
}
@@ -904,41 +891,55 @@ func (m *Master) service(ctx context.Context) (err error) {
// spawn peer serve driver (it should be only storages on entry here?)
for _, peer := range m.peerTab {
// XXX clients? other nodes?
// XXX note PENDING - not adding to service; ok?
// XXX note PENDING - not adding to serve; ok?
if peer.node.Type == proto.STORAGE && peer.node.State == proto.RUNNING {
goServe(peer)
}
}
loop:
for {
// during stop: react only to task completion and node leaving
ctxDone := ctx.Done()
ctlStart := m.ctlStart
ctlStop := m.ctlStop
nodeComeq := m.nodeComeq
err = nil
stop := func(stopErr error) {
if err != nil {
return
}
err = stopErr
cancel()
ctxDone = nil
ctlStart = nil
ctlStop = nil
nodeComeq = nil
// XXX tell storages to stop
}
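// (same technique as in verify: nil-ing the control channels disables their
//  select cases, so after stop() the loop below only drains servedq and
//  reacts to nodeLeaveq until inprogress reaches 0)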
for inprogress > 0 {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case <-ctxDone:
stop(ctx.Err())
case ech := <-m.ctlStart:
case ech := <-ctlStart:
ech <- nil // we are already started
case ech := <-m.ctlStop:
case ech := <-ctlStop:
close(ech) // ok
err = fmt.Errorf("stop requested")
// XXX tell storages to stop
break loop
stop(fmt.Errorf("stop requested"))
// peer (should be) disconnected
case n := <-m.nodeLeaveq:
m.disconnectPeer(ctx, n.peer)
// if cluster became non-operational - cancel service
// XXX cancel() ?
// if cluster became non-operational - cancel serve
if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
err = errClusterDegraded
break loop
stop(errClusterDegraded)
}
// new connection comes in
case n := <-m.nodeComeq:
// new connection comes in -> start serving it
case n := <-nodeComeq:
peer, ok := m.identify(ctx, n, /* XXX accept everyone */)
if !ok {
break
@@ -946,25 +947,19 @@ loop:
goServe(peer)
// serving a peer is done
case d := <-servedq:
// TODO if S goes away -> check partTab still operational -> if not - recovery
_ = d
// XXX what else ? (-> txn control at least)
close(d.ack) // XXX explain why (see recovery)
inprogress--
}
}
// XXX wait all spawned service workers
wg.Wait()
return err
}
// storCtlService drives a storage node during cluster service state
func storCtlService(ctx context.Context, stor *_MasteredPeer) (err error) {
defer task.Runningf(&ctx, "%s: stor service", stor.node.NID)(&err)
// storCtlServe drives a storage node during cluster serve state
func storCtlServe(ctx context.Context, stor *_MasteredPeer) (err error) {
defer task.Runningf(&ctx, "%s: stor serve", stor.node.NID)(&err)
slink := stor.node.Link()
// XXX current neo/py does StartOperation / NotifyReady as separate
@@ -1008,7 +1003,7 @@ func storCtlService(ctx context.Context, stor *_MasteredPeer) (err error) {
// serveClient serves incoming client link.
func (m *Master) serveClient(ctx context.Context, cli *_MasteredPeer) (err error) {
defer task.Runningf(&ctx, "%s: client service", cli.node.NID)(&err)
defer task.Runningf(&ctx, "%s: serve client", cli.node.NID)(&err)
clink := cli.node.Link()
// wg, ctx := errgroup.WithContext(ctx) // XXX -> sync.WorkGroup
@@ -1209,7 +1204,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
node: node,
wg: xsync.NewWorkGroup(peerCtx),
cancel: peerCancel,
state0: m.node.State.Snapshot(),
state0: m.node.State.Snapshot(), // XXX don't need .state0 if vvv is not moved to .acceptPeer
// TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details)
notifyq: make(chan _ΔClusterState, 1024),
@@ -1219,6 +1214,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
m.peerTab[node.NID] = peer
// spawn task to send accept and proxy δnodeTab/δpartTab to the peer
// XXX -> func m.acceptPeer ?
peer.wg.Go(func(ctx context.Context) error {
// go main <- peer "peer (should be) disconnected" when all the peer's tasks finish
m.mainWG.Go(func(ctx context.Context) error {
@@ -1299,6 +1295,7 @@ func (p *_MasteredPeer) run(ctx context.Context, f func() error) error {
}
// notify proxies δnodeTab/δpeerTab/δClusterState update to the peer.
// XXX merge into m.acceptPeer ?
func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "notify")(&err)
......