Commit 622ee617 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2b8c87d4
...@@ -41,6 +41,10 @@ type Master struct { ...@@ -41,6 +41,10 @@ type Master struct {
partTab PartitionTable partTab PartitionTable
clusterState ClusterState clusterState ClusterState
// channels controlling main driver
ctlStart chan ctlStart // request to start cluster
ctlStop chan ctlStop // request to stop cluster
// channels from various workers to main driver // channels from various workers to main driver
nodeCome chan nodeCome // node connected nodeCome chan nodeCome // node connected
nodeLeave chan nodeLeave // node disconnected nodeLeave chan nodeLeave // node disconnected
...@@ -48,6 +52,15 @@ type Master struct { ...@@ -48,6 +52,15 @@ type Master struct {
storRecovery chan storRecovery // storage node passed recovery XXX better explicitly pass to worker as arg? storRecovery chan storRecovery // storage node passed recovery XXX better explicitly pass to worker as arg?
} }
type ctlStart struct {
// XXX +ctx ?
resp chan error
}
type ctlStop struct {
// XXX +ctx ?
resp chan error
}
// node connects // node connects
type nodeCome struct { type nodeCome struct {
...@@ -72,7 +85,8 @@ type storRecovery struct { ...@@ -72,7 +85,8 @@ type storRecovery struct {
func NewMaster(clusterName string) *Master { func NewMaster(clusterName string) *Master {
m := &Master{clusterName: clusterName} m := &Master{clusterName: clusterName}
m.SetClusterState(RECOVERING) // XXX no elections - we are the only master m.clusterState = RECOVERING // XXX no elections - we are the only master
go m.run(context.TODO()) // XXX ctx
return m return m
} }
...@@ -80,9 +94,11 @@ func NewMaster(clusterName string) *Master { ...@@ -80,9 +94,11 @@ func NewMaster(clusterName string) *Master {
// XXX NotifyNodeInformation to all nodes whenever nodetab changes // XXX NotifyNodeInformation to all nodes whenever nodetab changes
func (m *Master) SetClusterState(state ClusterState) { // XXX -> Start(), Stop()
m.clusterState = state func (m *Master) SetClusterState(state ClusterState) error {
// XXX actions ? ch := make(chan error)
m.ctlState <- ctlState{state, ch}
return <-ch
} }
// run implements main master cluster management logic: node tracking, cluster // run implements main master cluster management logic: node tracking, cluster
...@@ -100,6 +116,29 @@ func (m *Master) run(ctx context.Context) { ...@@ -100,6 +116,29 @@ func (m *Master) run(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
panic("TODO") panic("TODO")
// command to start cluster
case c := <-m.ctlStart:
if c.state == m.clusterState {
// already there
m.resp <- nil
break
}
switch c.state {
case RECOVERING:
case VERIFYING: // = RUNNING
case CLUSTER_RUNNING:
case STOPPING:
default:
// TODO
}
// command to stop cluster
case c := <-m.ctlStop:
// TODO
// node connects & requests identification // node connects & requests identification
case n := <-m.nodeCome: case n := <-m.nodeCome:
nodeInfo, ok := m.accept(n) nodeInfo, ok := m.accept(n)
...@@ -334,8 +373,8 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -334,8 +373,8 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
idResp := <-idRespCh idResp := <-idRespCh
// if master accepted this node - don't forget to notify when it leaves // if master accepted this node - don't forget to notify when it leaves
_, noaccept := idResp.(error) _, rejected := idResp.(error)
if !noaccept { if !rejected {
defer func() { defer func() {
m.nodeLeave <- nodeLeave{link} m.nodeLeave <- nodeLeave{link}
}() }()
...@@ -348,7 +387,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -348,7 +387,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
} }
// nothing to do more here if identification was not accepted // nothing to do more here if identification was not accepted
if noaccept { if rejected {
logf("identify: %v", idResp) logf("identify: %v", idResp)
return return
} }
...@@ -358,6 +397,8 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -358,6 +397,8 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
// ---------------------------------------- // ----------------------------------------
// XXX recheck vvv // XXX recheck vvv
// XXX temp hack
connNotify := conn
// subscribe to nodeTab/partTab/clusterState and notify peer with updates // subscribe to nodeTab/partTab/clusterState and notify peer with updates
m.stateMu.Lock() m.stateMu.Lock()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment