Commit 4d25e431 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5cc06ec3
...@@ -27,7 +27,6 @@ import ( ...@@ -27,7 +27,6 @@ import (
"net/url" "net/url"
"os" "os"
"os/exec" "os/exec"
"strings"
"testing" "testing"
"time" "time"
...@@ -280,7 +279,7 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) { ...@@ -280,7 +279,7 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
if err == nil { if err == nil {
break break
} }
if !strings.HasSuffix(err.Error(), "start: cluster is non-operational") { // XXX if !errors.Is(err, ErrStartNonOperational) {
return nil, err return nil, err
} }
......
...@@ -180,6 +180,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master { ...@@ -180,6 +180,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master {
// NOTE upon successful return cluster is not yet in running state - the transition will // NOTE upon successful return cluster is not yet in running state - the transition will
// take time and could be also automatically aborted due to cluster environment change (e.g. // take time and could be also automatically aborted due to cluster environment change (e.g.
// a storage node goes down). // a storage node goes down).
var ErrStartNonOperational = errors.New("start: cluster is non-operational")
func (m *Master) Start() error { func (m *Master) Start() error {
ech := make(chan error) ech := make(chan error)
m.ctlStart <- ech m.ctlStart <- ech
...@@ -195,6 +196,7 @@ func (m *Master) Stop() { ...@@ -195,6 +196,7 @@ func (m *Master) Stop() {
// setClusterState sets .clusterState and notifies subscribers. // setClusterState sets .clusterState and notifies subscribers.
func (m *Master) setClusterState(ctx context.Context, state proto.ClusterState) { func (m *Master) setClusterState(ctx context.Context, state proto.ClusterState) {
log.Infof(ctx, "cluster state <- %s", state)
m.node.State.Code.Set(state) m.node.State.Code.Set(state)
//m.notifyAll(ctx, &_ΔStateCode{state}) TODO enable //m.notifyAll(ctx, &_ΔStateCode{state}) TODO enable
} }
...@@ -235,7 +237,6 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -235,7 +237,6 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
m.mainWG.Go(func(ctx context.Context) (err error) { m.mainWG.Go(func(ctx context.Context) (err error) {
defer task.Running(&ctx, "accept")(&err) defer task.Running(&ctx, "accept")(&err)
// XXX dup in storage
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()
...@@ -292,16 +293,11 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -292,16 +293,11 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
// main is the process that implements main master cluster management logic: node tracking, cluster // main is the process that implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes, etc. // state updates, scheduling data movement between storage nodes, etc.
//
// NOTE main's goroutine is the only mutator of nodeTab, partTab and other cluster state
func (m *Master) main(ctx context.Context) (err error) { func (m *Master) main(ctx context.Context) (err error) {
defer task.Running(&ctx, "main")(&err) defer task.Running(&ctx, "main")(&err)
// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
// XXX however since clients request state reading we should use node.StateMu?
// XXX -> better rework protocol so that master pushes itself (not
// being pulled) to clients everything they need.
// -> it was reworked (see bf240897)
for ctx.Err() == nil { for ctx.Err() == nil {
// recover partition table from storages and wait till enough // recover partition table from storages and wait till enough
// storages connects us so that we can see the partition table // storages connects us so that we can see the partition table
...@@ -311,22 +307,24 @@ func (m *Master) main(ctx context.Context) (err error) { ...@@ -311,22 +307,24 @@ func (m *Master) main(ctx context.Context) (err error) {
// a command came to us to start the cluster. // a command came to us to start the cluster.
err := m.recovery(ctx) err := m.recovery(ctx)
if err != nil { if err != nil {
//log.Error(ctx, err) log.Error(ctx, err)
return err // recovery cancelled return err // recovery cancelled
} }
log.Infof(ctx, "recovered ok; partTab:\n%s", m.node.State.PartTab)
// make sure transactions on storages are properly finished, in // make sure transactions on storages are properly finished, in
// case previously it was unclean shutdown. // case previously it was unclean shutdown.
err = m.verify(ctx) err = m.verify(ctx)
if err != nil { if err != nil {
//log.Error(ctx, err) log.Warning(ctx, err)
continue // -> recovery continue // -> recovery
} }
log.Info(ctx, "verified ok")
// provide service as long as partition table stays operational // provide service as long as partition table stays operational
err = m.serve(ctx) err = m.serve(ctx)
if err != nil { if err != nil {
//log.Error(ctx, err) log.Warning(ctx, err)
continue // -> recovery continue // -> recovery
} }
...@@ -378,9 +376,8 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -378,9 +376,8 @@ func (m *Master) recovery(ctx context.Context) (err error) {
// requests to .ctlStart received when readyToStart // requests to .ctlStart received when readyToStart
// on success answered when full recovery completes // on success answered when full recovery completes
startReqv := []chan error{} startReqv := []chan error{}
errStartNonOperational := fmt.Errorf("start: cluster is non-operational")
defer func() { defer func() {
errStart := errStartNonOperational errStart := ErrStartNonOperational
if err == nil { if err == nil {
errStart = nil errStart = nil
} }
...@@ -420,7 +417,7 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -420,7 +417,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
// cluster became non-operational - cancel previously queued start requests // cluster became non-operational - cancel previously queued start requests
if !ready { if !ready {
for _, ech := range startReqv { for _, ech := range startReqv {
ech <- errStartNonOperational ech <- ErrStartNonOperational
} }
startReqv = startReqv[:0] startReqv = startReqv[:0]
} }
...@@ -500,7 +497,7 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -500,7 +497,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
startReqv = append(startReqv, ech) startReqv = append(startReqv, ech)
} else { } else {
log.Infof(ctx, "start command - err - we are not ready") log.Infof(ctx, "start command - err - we are not ready")
ech <- errStartNonOperational ech <- ErrStartNonOperational
} }
case ech := <-ctlStop: case ech := <-ctlStop:
......
...@@ -268,8 +268,7 @@ func (stor *Storage) serve(ctx context.Context) (err error) { ...@@ -268,8 +268,7 @@ func (stor *Storage) serve(ctx context.Context) (err error) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
defer wg.Wait() defer wg.Wait()
// XXX dup from master -> Node.Listen() -> Accept() ? // XXX ? -> _MasteredNode.Accept(lli) (it will verify IdTime against .nodeTab[nid])
// XXX ? -> Node.Accept(lli) (it will verify IdTime against Node.nodeTab[nid])
// XXX ? -> Node.Serve(lli -> func(idReq)) // XXX ? -> Node.Serve(lli -> func(idReq))
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment