Commit 7ee9a615 authored by Kirill Smelkov

.

parent 4d71333f
@@ -21,6 +21,7 @@ package neo
 import (
 	"context"
+	"errors"
 	"flag"
 	"fmt"
 	"io"
@@ -54,7 +55,7 @@ type Master struct {
 	ctlStop     chan chan error // request to stop cluster
 	ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ?

-	// channels from various workers to main driver
+	// channels from workers directly serving peers to main driver
 	nodeCome  chan nodeCome  // node connected
 	nodeLeave chan nodeLeave // node disconnected
 }
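The hunk above renames the comment on nodeCome/nodeLeave: these channels carry events from goroutines that directly serve peers into the single main driver goroutine, which alone owns the mutable cluster state. A minimal sketch of that pattern, using hypothetical names (event, driver) rather than the NEO types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type event struct {
	peer      string
	connected bool
}

func driver(ctx context.Context, events <-chan event) {
	peers := map[string]bool{} // state owned exclusively by this goroutine
	for {
		select {
		case <-ctx.Done():
			fmt.Println("driver:", ctx.Err())
			return
		case ev := <-events:
			peers[ev.peer] = ev.connected
			fmt.Printf("driver: %s connected=%v\n", ev.peer, ev.connected)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	events := make(chan event)
	go driver(ctx, events)

	// a per-peer serving goroutine would send these on connect/disconnect
	events <- event{"stor1", true}
	events <- event{"stor1", false}
	<-ctx.Done()
}
```

Since only the driver touches its state, no locking is needed; workers communicate exclusively by sending events.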
@@ -72,6 +73,7 @@ type nodeLeave struct {
 	link *NodeLink // XXX better use uuid allocated on nodeCome ?
 }

+// NewMaster TODO ...
 func NewMaster(clusterName string) *Master {
 	m := &Master{clusterName: clusterName}
 	m.nodeUUID = m.allocUUID(MASTER)
@@ -121,7 +123,7 @@ func (m *Master) setClusterState(state ClusterState) {
 }

-// run is a process which implements main master cluster management logic: node tracking, cluster
+// run is the process which implements main master cluster management logic: node tracking, cluster
 // state updates, scheduling data movement between storage nodes etc
 func (m *Master) run(ctx context.Context) {
@@ -130,23 +132,30 @@ func (m *Master) run(ctx context.Context) {
 	for ctx.Err() == nil {
 		err := m.recovery(ctx)
 		if err != nil {
-			return // recovery cancelled XXX recheck
+			fmt.Println(err)
+			return // recovery cancelled
 		}

 		// successful recovery -> verify
 		err = m.verify(ctx)
 		if err != nil {
+			fmt.Println(err)
 			continue // -> recovery
 		}

 		// successful verify -> service
 		err = m.service(ctx)
 		if err != nil {
+			fmt.Println(err)
 			continue // -> recovery
 		}

+		// XXX could err be == nil here - after service finishes ?
 		// XXX shutdown ?
 	}
+
+	fmt.Printf("master: run: %v\n", ctx.Err())
 }
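The run loop above is a phase state machine: recovery must succeed before verify, verify before service, any later failure falls back to recovery, and ctx cancellation stops everything. A runnable sketch of that shape, with stub closures standing in for the real phases:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	attempt := 0
	verify := func(ctx context.Context) error {
		attempt++
		if attempt < 3 {
			return errors.New("verify: cluster not operational yet")
		}
		return nil
	}
	service := func(ctx context.Context) error {
		cancel() // pretend an operator stopped the cluster during service
		return errors.New("service: cluster degraded")
	}

	for ctx.Err() == nil {
		// recovery elided: assume it only fails when ctx is cancelled
		if err := verify(ctx); err != nil {
			fmt.Println(err)
			continue // -> back to recovery
		}
		if err := service(ctx); err != nil {
			fmt.Println(err)
			continue // -> back to recovery, unless ctx is already done
		}
	}
	fmt.Println("run:", ctx.Err())
}
```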
@@ -165,6 +174,7 @@ func (m *Master) run(ctx context.Context) {
 // - nil:  recovery was ok and a command came for cluster to start
 // - !nil: recovery was cancelled
 func (m *Master) recovery(ctx context.Context) (err error) {
+	fmt.Println("master: recovery")
 	defer errcontextf(&err, "master: recovery")

 	m.setClusterState(ClusterRecovering)
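errcontextf is defined elsewhere in the package; judging from how it is deferred with a pointer to the named return value, it likely prefixes whatever error the function ends up returning. A plausible sketch (an assumption about its shape, not the package's actual code):

```go
package main

import "fmt"

// errcontextf prefixes *errp, if non-nil, with formatted context.
// Deferred first thing in a function, it decorates every return path.
func errcontextf(errp *error, format string, argv ...interface{}) {
	if *errp != nil {
		*errp = fmt.Errorf(format+": %s", append(argv, *errp)...)
	}
}

func recovery() (err error) {
	defer errcontextf(&err, "master: recovery")
	return fmt.Errorf("no storage nodes")
}

func main() {
	fmt.Println(recovery()) // master: recovery: no storage nodes
}
```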
@@ -218,6 +228,9 @@ loop:
 		m.partTab = r.partTab
 	}

+	// XXX handle case of new cluster - when no storage reports valid parttab
+	// XXX -> create new parttab
+
 	// XXX update something indicating cluster currently can be operational or not ?
@@ -269,6 +282,7 @@ type storRecovery struct {
 // it retrieves various ids and partition table from as stored on the storage
 func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) {
 	var err error
+	// XXX where this close link on error should be ?
 	defer func() {
 		if err == nil {
 			return
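The deferred block in storCtlRecovery shows a recurring worker shape: every failure path assigns err, and a single defer decides the cleanup (close the connection, report the failure once). A self-contained sketch of the same shape, with an io.ReadCloser standing in for the NodeLink:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

func worker(r io.ReadCloser, res chan<- error) {
	var err error
	defer func() {
		if err == nil {
			return
		}
		r.Close()  // release the resource on any error path
		res <- err // report the failure to the driver exactly once
	}()

	buf := make([]byte, 4)
	_, err = r.Read(buf)
	if err != nil && err != io.EOF {
		return
	}
	err = errors.New("unexpected reply") // any later failure reuses err
}

func main() {
	res := make(chan error, 1)
	worker(io.NopCloser(strings.NewReader("hi")), res)
	fmt.Println(<-res)
}
```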
@@ -341,8 +355,13 @@ var errClusterDegraded = errors.New("cluster became non-operatonal")
 // verify drives cluster via verification phase
 //
+// when verify finishes error indicates:
+// - nil:  verification completed ok; cluster is ready to enter running state
+// - !nil: verification failed; cluster needs to be reset to recovery state
+//
 // prerequisite for start: .partTab is operational wrt .nodeTab
 func (m *Master) verify(ctx context.Context) (err error) {
+	fmt.Println("master: verify")
 	defer errcontextf(&err, "master: verify")

 	m.setClusterState(ClusterVerifying)
@@ -352,10 +371,12 @@ func (m *Master) verify(ctx context.Context) (err error) {
 	verify := make(chan storVerify)
 	inprogress := 0

+	// verification = ask every storage to verify and wait for all them to complete
+	// XXX "all them" -> "enough of them"?
+
 	// NOTE we don't reset m.lastOid / m.lastTid to 0 in the beginning of verification
-	// with the idea that XXX
-	// XXX ask every storage to verify and wait for _all_ them to complete?
+	// XXX (= py), rationale=?

 	// start verification on all storages we are currently in touch with
 	for _, stor := range m.nodeTab.StorageList() {
 		if stor.NodeState > DOWN { // XXX state cmp ok ?  XXX or stor.Link != nil ?
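The loop above fans verification out to every reachable storage, counting launched goroutines in inprogress so the driver later knows exactly how many results to collect. A runnable sketch of that fan-out/collect shape, with plain strings standing in for NEO's node table:

```go
package main

import "fmt"

type result struct {
	stor string
	err  error
}

func main() {
	storages := []string{"stor1", "stor2", "stor3"}
	res := make(chan result)
	inprogress := 0

	for _, stor := range storages {
		inprogress++
		go func(stor string) {
			// a real storCtlVerify would talk to the node here
			res <- result{stor: stor}
		}(stor)
	}

	for inprogress > 0 {
		r := <-res
		inprogress--
		fmt.Printf("%s done (err=%v), %d still in progress\n", r.stor, r.err, inprogress)
	}
}
```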
@@ -390,13 +411,17 @@ loop:
 			break loop
 		}

-		// a storage node came through verification - TODO
+		// a storage node came through verification - adjust our last{Oid,Tid} if ok
+		// on error check - whether cluster became non-operational and stop verification if so
 		case v := <-verify:
 			inprogress--

 			if v.err != nil {
 				fmt.Printf("master: verify: %v\n", v.err)

-				// XXX mark S as non-working in nodeTab
+				// mark storage as non-working in nodeTab
+				// FIXME better -> v.node.setState(DOWN) ?
+				m.nodeTab.UpdateLinkDown(v.link)

 				// check partTab is still operational
 				// if not -> cancel to go back to recovery
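On a verification error the driver now marks the failed storage's link down and re-checks whether partTab is still operational. As an assumption about what OperationalWith reduces to (not NEO's actual implementation): every partition must keep at least one replica on a node that is currently up, so one node going down can flip the whole cluster into the non-operational state that sends verify back to recovery:

```go
package main

import "fmt"

// operationalWith: every partition has at least one replica on an up node.
func operationalWith(partTab map[int][]string, up map[string]bool) bool {
	for _, replicas := range partTab {
		ok := false
		for _, stor := range replicas {
			if up[stor] {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	partTab := map[int][]string{0: {"stor1"}, 1: {"stor1", "stor2"}}
	up := map[string]bool{"stor1": true, "stor2": true}

	up["stor2"] = false // a replica fails: partition 1 still has stor1
	fmt.Println(operationalWith(partTab, up)) // true

	up["stor1"] = false // partition 0 lost its only replica
	fmt.Println(operationalWith(partTab, up)) // false
}
```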
@@ -441,18 +466,19 @@ loop:
 type storVerify struct {
 	lastOid zodb.Oid
 	lastTid zodb.Tid
+	link    *NodeLink
 	err     error
 }

 // storCtlVerify drives a storage node during cluster verifying (= starting) state
 func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
-	// XXX err context + link.Close on err
+	// XXX link.Close on err
 	// XXX cancel on ctx

 	var err error
 	defer func() {
 		if err != nil {
-			res <- storVerify{err: err}
+			res <- storVerify{link: link, err: err}
 		}
 	}()
 	defer errcontextf(&err, "%s: verify", link)
@@ -479,7 +505,7 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
 	}

 	// send results to driver
-	res <- storVerify{lastOid: last.LastOid, lastTid: last.LastTid}
+	res <- storVerify{link: link, lastOid: last.LastOid, lastTid: last.LastTid}
 }
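Threading link through storVerify, on both the deferred error send and the success send, lets the driver attribute every result to its peer and update the node table without guessing. A sketch of that result-tagging, with a string id standing in for *NodeLink:

```go
package main

import (
	"errors"
	"fmt"
)

type storVerify struct {
	lastTid uint64
	link    string // stands in for *NodeLink
	err     error
}

func storCtlVerify(link string, res chan<- storVerify) {
	var err error
	defer func() {
		if err != nil {
			res <- storVerify{link: link, err: err} // errors still carry link
		}
	}()

	if link == "stor2" {
		err = errors.New("handshake failed")
		return
	}
	res <- storVerify{link: link, lastTid: 42}
}

func main() {
	res := make(chan storVerify)
	for _, l := range []string{"stor1", "stor2"} {
		go storCtlVerify(l, res)
	}
	for i := 0; i < 2; i++ {
		v := <-res
		if v.err != nil {
			fmt.Printf("mark %s down: %v\n", v.link, v.err)
			continue
		}
		fmt.Printf("%s: lastTid=%d\n", v.link, v.lastTid)
	}
}
```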
@@ -494,20 +520,34 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
 // service drives cluster during running state
 //
-// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed (XXX)
+// TODO document error meanings on return
+//
+// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
 func (m *Master) service(ctx context.Context) (err error) {
+	fmt.Println("master: service")
+	defer errcontextf(&err, "master: service")
+
 	m.setClusterState(ClusterRunning)

 loop:
 	for {
 		select {
 		case n := <-m.nodeCome:
-			// TODO
-			_ = n
+			_, ok := m.accept(n, /* XXX accept everyone */)
+			if !ok {
+				break
+			}
+
+			// XXX what here ?

 		case n := <-m.nodeLeave:
-			// TODO
-			_ = n
+			m.nodeTab.UpdateLinkDown(n.link)
+
+			// if cluster became non-operational - cancel service
+			if !m.partTab.OperationalWith(&m.nodeTab) {
+				err = errClusterDegraded
+				break loop
+			}

 		// XXX what else ? (-> txn control at least)
@@ -526,10 +566,6 @@ loop:
 		}
 	}

-	if err != nil {
-		// TODO
-	}
-
 	return err
 }
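The service loop relies on the difference between plain break, which only leaves the select statement (the rejected-node case), and the labeled break loop, which exits the whole for (the cluster-degraded case). A runnable illustration of just that control flow:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	events := make(chan int, 3)
	events <- 1
	events <- -1 // rejected: plain break, the loop keeps running
	events <- 0  // degraded: labeled break, the loop exits

	var err error
loop:
	for {
		select {
		case n := <-events:
			if n < 0 {
				break // leaves only the select; for continues
			}
			if n == 0 {
				err = errors.New("cluster degraded")
				break loop // leaves the for loop entirely
			}
			fmt.Println("serving node", n)
		}
	}
	fmt.Println("service:", err)
}
```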