Commit dd1540ff authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e6e0ecb9
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"sync" "sync"
"time" "time"
xxcontext "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
...@@ -378,6 +379,7 @@ loop: ...@@ -378,6 +379,7 @@ loop:
// FIXME if identify=ok -> subscribe to δ(nodeTab) and send initial nodeTab right after accept (accept should do it?) // FIXME if identify=ok -> subscribe to δ(nodeTab) and send initial nodeTab right after accept (accept should do it?)
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */) node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
// XXX -> move to identify (spawn under peerWork.wg) ?
if node == nil { if node == nil {
goreject(ctx, wg, n.req, resp) goreject(ctx, wg, n.req, resp)
break break
...@@ -389,15 +391,17 @@ loop: ...@@ -389,15 +391,17 @@ loop:
go func() { go func() {
defer wg.Done() defer wg.Done()
/*
// start recovery // start recovery
var pt *xneo.PartitionTable var pt *xneo.PartitionTable
err := acceptAndRun(func(node *_PeerNode) error { err := acceptAndRun(func(workCtx context.Context, node *xneo.PeerNode) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, workCtx)
defer cancel()
pt, err = storCtlRecovery(ctx, node) pt, err = storCtlRecovery(ctx, node)
return err
}) })
recevery <- storRecovery{stor: node, partTab: pt, err: err} recovery <- storRecovery{stor: node, partTab: pt, err: err}
*/
/*
err := m.accept(node, state0, n.req, resp) err := m.accept(node, state0, n.req, resp)
if err != nil { if err != nil {
recovery <- storRecovery{stor: node, err: err} recovery <- storRecovery{stor: node, err: err}
...@@ -406,6 +410,7 @@ loop: ...@@ -406,6 +410,7 @@ loop:
// start recovery // start recovery
storCtlRecovery(ctx, node, recovery) storCtlRecovery(ctx, node, recovery)
*/
}() }()
// XXX <-m.nodeLeave // XXX <-m.nodeLeave
...@@ -1266,10 +1271,11 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, ...@@ -1266,10 +1271,11 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot,
// XXX send clusterState too? (NEO/py does not send it) // XXX send clusterState too? (NEO/py does not send it)
var w *peerWork // XXX stub <- = .peerWorkTab[peer.NID] set from main
// go proxy δstate ... XXX // go proxy δstate ... XXX
// XXX under which wg? -> under per-peer wg // XXX under which wg? -> under per-peer wg
wg.Go(func(ctx context.Context) (err error) { w.wg.Go(func(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "send cluster updates")(&err) defer task.Runningf(&ctx, "send cluster updates")(&err)
stateCode := state0.Code stateCode := state0.Code
...@@ -1281,7 +1287,7 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, ...@@ -1281,7 +1287,7 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot,
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case δstate = <-notifyq: // XXX could be also closed? case δstate = <-w.notifyq: // XXX could be also closed?
} }
var msg proto.Msg var msg proto.Msg
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment