Commit a3f6517c authored by Kirill Smelkov

.

parent 2b37a365
@@ -425,7 +425,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
 		var pt *xneo.PartitionTable
 		err := stor.run(ctx, func(...) {
-			pt, err = storCtlRecovery(...)
+			pt, err = storCtlRecovery(ctx, stor)
 		})
 		ack := make(chan struct{})
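Note on the call-site change above: recovery() no longer collects the result from a storRecovery channel. The closure passed to stor.run captures pt and err, so the partition table is available directly once the run returns. A minimal sketch of that closure-capture pattern, with hypothetical runForPeer and fetchPartTab standing in for stor.run and storCtlRecovery (not the project's actual API):

package main

import (
	"context"
	"fmt"
)

// runForPeer stands in for stor.run: execute f under the peer's context
// and hand back whatever error f produced.
func runForPeer(ctx context.Context, f func(ctx context.Context) error) error {
	return f(ctx)
}

// fetchPartTab stands in for storCtlRecovery: return a result plus an error.
func fetchPartTab(ctx context.Context) (string, error) {
	return "pt{id: 1}", nil
}

func main() {
	ctx := context.Background()

	// the closure captures pt, so no result channel is needed
	var pt string
	err := runForPeer(ctx, func(ctx context.Context) error {
		var err error
		pt, err = fetchPartTab(ctx)
		return err
	})
	if err != nil {
		fmt.Println("recovery failed:", err)
		return
	}
	fmt.Println("recovered:", pt)
}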
@@ -590,18 +590,9 @@ loop2:
 // storCtlRecovery drives a storage node during cluster recovering state.
 // it retrieves various ids and partition table from as stored on the storage
-func storCtlRecovery(ctx context.Context, stor *xneo.PeerNode, res chan storRecovery) {
-	var err error
-	defer func() {
-		if err == nil {
-			return
-		}
-		// on error provide feedback to storRecovery chan
-		res <- storRecovery{stor: stor, err: err}
-	}()
-	slink := stor.Link()
-	defer task.Runningf(&ctx, "%s: stor recovery", slink.RemoteAddr())(&err)
+func storCtlRecovery(ctx context.Context, stor *_MasteredPeer) (_ *xneo.PartTab, err error) {
+	slink := stor.node.Link()
+	defer task.Runningf(&ctx, "%s: stor recovery", stor.node.NID)(&err)
 
 	// XXX cancel on ctx
 	// XXX close slink on err? (if yes -> xcontext.WithCloseOnErrCancel)
@@ -609,18 +600,18 @@ func storCtlRecovery(ctx context.Context, stor *xneo.PeerNode, res chan storReco
 	recovery := proto.AnswerRecovery{}
 	err = slink.Ask1(&proto.Recovery{}, &recovery)
 	if err != nil {
-		return
+		return nil, err
 	}
 
 	resp := proto.AnswerPartitionTable{}
 	err = slink.Ask1(&proto.AskPartitionTable{}, &resp)
 	if err != nil {
-		return
+		return nil, err
 	}
 
 	// reconstruct partition table from response
 	pt := xneo.PartTabFromDump(resp.PTid, resp.RowList) // TODO handle resp.NumReplicas
-	res <- storRecovery{stor: stor, partTab: pt}
+	return pt, nil
 }
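The storCtlRecovery rewrite above is the core of the commit: instead of reporting its outcome by sending a storRecovery value on a channel, with a deferred send covering the error paths, the function now returns (*xneo.PartTab, error) directly, so each early return carries its error and the defer bookkeeping disappears. A before/after sketch of that refactoring with generic names (result, workerChan and worker are illustrative only):

package main

import (
	"context"
	"fmt"
)

type result struct {
	val string
	err error
}

// workerChan reports its outcome over a channel; the error path needs a
// deferred send (roughly the shape storCtlRecovery had before the change).
func workerChan(ctx context.Context, res chan result) {
	var err error
	defer func() {
		if err != nil {
			res <- result{err: err} // feedback on error only
		}
	}()

	if err = ctx.Err(); err != nil {
		return
	}
	res <- result{val: "ok"}
}

// worker returns the outcome directly (the shape after the change); callers
// get the value and the error from the same call, no channel plumbing.
func worker(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	return "ok", nil
}

func main() {
	ctx := context.Background()

	res := make(chan result, 1)
	go workerChan(ctx, res)
	fmt.Println(<-res)

	val, err := worker(ctx)
	fmt.Println(val, err)
}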
@@ -807,7 +798,7 @@ func storCtlVerify(ctx context.Context, stor *xneo.PeerNode, pt *xneo.PartitionT
 		}
 	}()
 	slink := stor.Link()
-	defer task.Runningf(&ctx, "%s: stor verify", slink)(&err)
+	defer task.Runningf(&ctx, "%s: stor verify", stor.node.NID)(&err)
 
 	// send just recovered parttab so storage saves it
 	err = slink.Send1(&proto.SendPartitionTable{
@@ -961,7 +952,7 @@ loop:
 // storCtlService drives a storage node during cluster service state
 func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
 	slink := stor.Link()
-	defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
+	defer task.Runningf(&ctx, "%s: stor service", stor.node.NID)(&err)
 
 	// XXX current neo/py does StartOperation / NotifyReady as separate
 	// sends, not exchange on the same conn. - py draftly fixed: see
@@ -1003,9 +994,9 @@ func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
 }
 
 // serveClient serves incoming client link.
-func (m *Master) serveClient(ctx context.Context, cli *xneo.PeerNode) (err error) {
+func (m *Master) serveClient(ctx context.Context, cli *_MasteredPeer) (err error) {
 	clink := cli.Link()
-	defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)
+	defer task.Runningf(&ctx, "%s: client service", cli.node.NID)(&err)
 
 	// wg, ctx := errgroup.WithContext(ctx) // XXX -> sync.WorkGroup
 	defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.ResetLink? (better not here)
...
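Across all the hunks, the task.Runningf labels switch from the link's remote address (slink.RemoteAddr(), or the bare slink in storCtlVerify) to the peer's node ID (stor.node.NID, cli.node.NID), so traces name peers by their stable NEO node ID rather than by whichever address the current connection uses. The defer task.Runningf(&ctx, ...)(&err) idiom is also worth spelling out: the call runs immediately and returns a completion func, and deferring that func lets it observe the named error result when the surrounding function returns. A simplified stand-in, assuming nothing about the real task package beyond what the idiom itself shows (runningf and storCtlExample below are illustrative):

package main

import (
	"context"
	"errors"
	"fmt"
)

// runningf is a simplified stand-in for task.Runningf: log that the task is
// running and return a func which, when deferred, logs completion together
// with the caller's named error result (read via the pointer at return time).
func runningf(ctx *context.Context, format string, argv ...interface{}) func(*error) {
	name := fmt.Sprintf(format, argv...)
	fmt.Println("running:", name)
	return func(errp *error) {
		fmt.Println("done:   ", name, "err:", *errp)
	}
}

func storCtlExample(ctx context.Context, nid string) (err error) {
	defer runningf(&ctx, "%s: stor service", nid)(&err)

	// ... drive the storage node; whatever ends up in err is picked up
	// by the deferred completion hook above.
	return errors.New("example failure")
}

func main() {
	_ = storCtlExample(context.Background(), "S1")
}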