Commit 67f98d72 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 534b777c
...@@ -334,7 +334,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -334,7 +334,7 @@ func TestMasterStorage(t *testing.T) {
// TODO ^^^ should be sent to S // TODO ^^^ should be sent to S
tc.Expect(conntx("m:2", "s:2", 1, &neo.StartOperation{Backup: false})) tc.Expect(conntx("m:2", "s:2", 1, &neo.StartOperation{Backup: false}))
tc.Expect(conntx("s:2", "m:2", 1, &neo.NotifyReady{}) tc.Expect(conntx("s:2", "m:2", 1, &neo.NotifyReady{}))
// TODO S leave while service // TODO S leave while service
......
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"fmt" "fmt"
// "math" // "math"
"sync" "sync"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -759,9 +760,9 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable, ...@@ -759,9 +760,9 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
// //
// TODO also plan data movement on new storage nodes appearing // TODO also plan data movement on new storage nodes appearing
// nodeServiced is the error returned after service-phase node handling is finished // serviceDone is the error returned after service-phase node handling is finished
type nodeServiced { type serviceDone struct {
node *neoNode node *neo.Node
err error err error
} }
...@@ -777,7 +778,7 @@ func (m *Master) service(ctx context.Context) (err error) { ...@@ -777,7 +778,7 @@ func (m *Master) service(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
service := make( serviced := make(chan serviceDone)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
// spawn per-storage service driver // spawn per-storage service driver
...@@ -786,7 +787,7 @@ func (m *Master) service(ctx context.Context) (err error) { ...@@ -786,7 +787,7 @@ func (m *Master) service(ctx context.Context) (err error) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
storCtlService(ctx, stor, service) storCtlService(ctx, stor, serviced)
}() }()
} }
} }
...@@ -807,18 +808,18 @@ loop: ...@@ -807,18 +808,18 @@ loop:
go func() { go func() {
defer wg.Done() defer wg.Done()
err = accept(n.conn, resp) err = m.accept(ctx, n.conn, resp)
if err != nil { if err != nil {
service <- nodeService{node: node, err: err} serviced <- serviceDone{node: node, err: err}
return return
} }
switch node.Type { switch node.Type {
case neo.STORAGE: case neo.STORAGE:
storCtlService(ctx, node, service) storCtlService(ctx, node, serviced)
//case neo.CLIENT: //case neo.CLIENT:
// serveClient(ctx, node, service) // serveClient(ctx, node, serviced)
// XXX ADMIN // XXX ADMIN
} }
...@@ -857,16 +858,14 @@ loop: ...@@ -857,16 +858,14 @@ loop:
// storCtlService drives a storage node during cluster service state // storCtlService drives a storage node during cluster service state
// XXX text // XXX text
func storCtlService(ctx context.Context, stor *neo.Node, srv chan serviceErr) { func storCtlService(ctx context.Context, stor *neo.Node, done chan serviceDone) {
/* err := storCtlService1(ctx, stor)
var err error done <- serviceDone{node: stor, err: err}
defer func() { }
if err == nil {
return func storCtlService1(ctx context.Context, stor *neo.Node) (err error) {
} defer runningf(&ctx, "%s: stor service", stor.Link.RemoteAddr())(&err)
// on error provide feedback to main driver
*/
conn := stor.Conn conn := stor.Conn
// XXX send nodeTab ? // XXX send nodeTab ?
...@@ -888,7 +887,8 @@ func storCtlService(ctx context.Context, stor *neo.Node, srv chan serviceErr) { ...@@ -888,7 +887,8 @@ func storCtlService(ctx context.Context, stor *neo.Node, srv chan serviceErr) {
case <-ctx.Done(): case <-ctx.Done():
// XXX also send StopOperation? // XXX also send StopOperation?
break loop // XXX close link?
return ctx.Err() // XXX ok?
} }
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment