Commit 4b697c4b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1ca427c0
...@@ -124,40 +124,43 @@ func (m *Master) Shutdown() error { ...@@ -124,40 +124,43 @@ func (m *Master) Shutdown() error {
} }
func (m *Master) setClusterState(state ClusterState) { func (m *Master) setClusterState(state ClusterState) {
if state == m.clusterState {
return
}
m.clusterState = state m.clusterState = state
// TODO notify subscribers // TODO notify subscribers
} }
func (m *Master) xxx(ctx ...) { // run is a process which implements main master cluster management logic: node tracking, cluster
var err error // state updates, scheduling data movement between storage nodes etc
func (m *Master) run(ctx context.Context) {
// NOTE run's goroutine is the only mutator of nodeTab, partTab and other cluster state
for ctx.Err() == nil { for ctx.Err() == nil {
err = recovery(ctx) err := m.recovery(ctx)
if err != nil { if err != nil {
return // XXX return // recovery cancelled XXX recheck
} }
// successful recovery -> verify // successful recovery -> verify
err = verify(ctx) err = m.verify(ctx)
if err != nil { if err != nil {
continue // -> recovery continue // -> recovery
} }
// successful verify -> service // successful verify -> service
err = service(ctx) err = m.service(ctx)
if err != nil { if err != nil {
// XXX what about shutdown ?
continue // -> recovery continue // -> recovery
} }
}
}
// run is a process which implements main master cluster management logic: node tracking, cluster // XXX shutdown
// state updates, scheduling data movement between storage nodes etc }
func (m *Master) run(ctx context.Context) {
/*
go m.recovery(ctx) go m.recovery(ctx)
for { for {
...@@ -203,7 +206,7 @@ func (m *Master) run(ctx context.Context) { ...@@ -203,7 +206,7 @@ func (m *Master) run(ctx context.Context) {
// TODO // TODO
} }
} }
*/
} }
...@@ -221,7 +224,9 @@ func (m *Master) run(ctx context.Context) { ...@@ -221,7 +224,9 @@ func (m *Master) run(ctx context.Context) {
// when recovery finishes error indicates: // when recovery finishes error indicates:
// - nil: recovery was ok and a command came for cluster to start // - nil: recovery was ok and a command came for cluster to start
// - !nil: recovery was cancelled // - !nil: recovery was cancelled
func (m *Master) recovery(ctx context.Context) error { func (m *Master) recovery(ctx context.Context) (err error) {
m.setClusterState(ClusterRecovering)
recovery := make(chan storRecovery) recovery := make(chan storRecovery)
rctx, rcancel := context.WithCancel(ctx) rctx, rcancel := context.WithCancel(ctx)
defer rcancel() defer rcancel()
...@@ -274,9 +279,8 @@ loop: ...@@ -274,9 +279,8 @@ loop:
// XXX update something indicating cluster currently can be operational or not ? // XXX update something indicating cluster currently can be operational or not ?
// request from master: "I want to start - ok?" - if ok we reply ok and exit // request to start the cluster - if ok we exit replying ok
// if not ok - we just reply not ok // if not ok - we just reply not ok
//case s := <-m.wantToStart:
case c := <-m.ctlStart: case c := <-m.ctlStart:
if m.partTab.OperationalWith(&m.nodeTab) { if m.partTab.OperationalWith(&m.nodeTab) {
// reply "ok to start" after whole recovery finishes // reply "ok to start" after whole recovery finishes
...@@ -293,7 +297,7 @@ loop: ...@@ -293,7 +297,7 @@ loop:
break loop break loop
} }
s <- fmt.Errorf("start: cluster is non-operational") c.resp <- fmt.Errorf("start: cluster is non-operational")
case c := <-m.ctlStop: case c := <-m.ctlStop:
c.resp <- nil // we are already recovering c.resp <- nil // we are already recovering
...@@ -309,7 +313,7 @@ loop: ...@@ -309,7 +313,7 @@ loop:
<-recovery <-recovery
} }
// XXX err return err
} }
// storRecovery is result of a storage node passing recovery phase // storRecovery is result of a storage node passing recovery phase
...@@ -390,11 +394,11 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) ...@@ -390,11 +394,11 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
// - once we are done without loosing too much storages in the process (so that // - once we are done without loosing too much storages in the process (so that
// parttab is still operational) we are ready to enter servicing state. // parttab is still operational) we are ready to enter servicing state.
// verify is a process that drives cluster via verification phase // verify drives cluster via verification phase
// //
// prerequisite for start: .partTab is operational wrt .nodeTab // prerequisite for start: .partTab is operational wrt .nodeTab
func (m *Master) verify(ctx context.Context) error { //, storv []*NodeLink) error { func (m *Master) verify(ctx context.Context) error {
// XXX ask every storage for verify and wait for _all_ them to complete? m.setClusterState(ClusterVerifying)
var err error var err error
verify := make(chan storVerify) verify := make(chan storVerify)
...@@ -402,6 +406,7 @@ func (m *Master) verify(ctx context.Context) error { //, storv []*NodeLink) erro ...@@ -402,6 +406,7 @@ func (m *Master) verify(ctx context.Context) error { //, storv []*NodeLink) erro
defer vcancel() defer vcancel()
inprogress := 0 inprogress := 0
// XXX ask every storage for verify and wait for _all_ them to complete?
// XXX do we need to reset m.lastOid / m.lastTid to 0 in the beginning? // XXX do we need to reset m.lastOid / m.lastTid to 0 in the beginning?
// start verification on all storages we are currently in touch with // start verification on all storages we are currently in touch with
...@@ -415,9 +420,11 @@ loop: ...@@ -415,9 +420,11 @@ loop:
select { select {
case n := <-m.nodeCome: case n := <-m.nodeCome:
// TODO // TODO
_ = n
case n := <-m.nodeLeave: case n := <-m.nodeLeave:
// TODO // TODO
_ = n
case v := <-verify: case v := <-verify:
inprogress-- inprogress--
...@@ -526,18 +533,21 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) { ...@@ -526,18 +533,21 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
// //
// TODO also plan data movement on new storage nodes appearing // TODO also plan data movement on new storage nodes appearing
// service is the process that drives cluster during running state // service drives cluster during running state
// //
func (m *Master) service(ctx context.Context) { func (m *Master) service(ctx context.Context) (err error) {
m.setClusterState(ClusterRunning)
loop: loop:
for { for {
select { select {
case n := <-m.nodeCome: case n := <-m.nodeCome:
// TODO // TODO
_ = n
case n := <-m.nodeLeave: case n := <-m.nodeLeave:
// TODO // TODO
_ = n
// XXX what else ? (-> txn control at least) // XXX what else ? (-> txn control at least)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment