Commit 62cc33a5 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e2630ae4
...@@ -96,8 +96,9 @@ var tstart time.Time = time.Now() ...@@ -96,8 +96,9 @@ var tstart time.Time = time.Now()
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
func (m *Master) run(ctx context.Context) { func (m *Master) run(ctx context.Context) {
// current function to ask/control a storage depending on current cluster state and master idea // current function to ask/control a storage depending on current cluster state
// + associated context covering all storage nodes // + associated context covering all storage nodes
// XXX + waitgroup ?
storCtl := m.storCtlRecovery storCtl := m.storCtlRecovery
storCtlCtx, storCtlCancel := context.WithCancel(ctx) storCtlCtx, storCtlCancel := context.WithCancel(ctx)
...@@ -115,13 +116,10 @@ func (m *Master) run(ctx context.Context) { ...@@ -115,13 +116,10 @@ func (m *Master) run(ctx context.Context) {
} }
// new storage node joined cluster // new storage node joined cluster
switch m.clusterState {
case RECOVERING:
}
// XXX consider .clusterState change // XXX consider .clusterState change
// launch current storage control work on the new node // launch current storage control work on the joined node
go storCtl(storCtlCtx, n.link) go storCtl(storCtlCtx, n.link)
// TODO consider adjusting partTab // TODO consider adjusting partTab
...@@ -136,8 +134,10 @@ func (m *Master) run(ctx context.Context) { ...@@ -136,8 +134,10 @@ func (m *Master) run(ctx context.Context) {
if r.partTab.ptid > m.partTab.ptid { if r.partTab.ptid > m.partTab.ptid {
m.partTab = r.partTab m.partTab = r.partTab
// XXX also transfer subscribers ? // XXX also transfer subscribers ?
// XXX or during recovery no one must be subscribed to partTab ? // XXX -> during recovery no one must be subscribed to partTab
} }
// XXX consider clusterState change
} }
} }
...@@ -234,6 +234,7 @@ func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) { ...@@ -234,6 +234,7 @@ func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
if err != nil { if err != nil {
return return
} }
// XXX cancel on ctx
recovery := AnswerRecovery{} recovery := AnswerRecovery{}
err = Ask(conn, &Recovery{}, &recovery) err = Ask(conn, &Recovery{}, &recovery)
...@@ -311,7 +312,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -311,7 +312,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
}() }()
// identify // identify
// XXX add logic to verify/assign nodeID and do other requested identification checks // XXX -> change to use nodeCome
nodeInfo, err := IdentifyPeer(link, MASTER) nodeInfo, err := IdentifyPeer(link, MASTER)
if err != nil { if err != nil {
fmt.Printf("master: %v\n", err) fmt.Printf("master: %v\n", err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment