Commit f70489a3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e744ca3e
...@@ -661,6 +661,7 @@ func (m *Master) verify(ctx context.Context) (err error) { ...@@ -661,6 +661,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
err := stor.run(ctx, func() error { err := stor.run(ctx, func() error {
var err error var err error
lastOid, lastTid, err = storCtlVerify(ctx, stor, m.node.State.PartTab) lastOid, lastTid, err = storCtlVerify(ctx, stor, m.node.State.PartTab)
return err
}) })
ack := make(chan struct{}) ack := make(chan struct{})
...@@ -1242,8 +1243,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1242,8 +1243,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
log.Infof(ctx, "%s: rejecting: %s", subj, err) log.Infof(ctx, "%s: rejecting: %s", subj, err)
m.mainWG.Go(func(ctx context.Context) error { m.mainWG.Go(func(ctx context.Context) error {
// XXX close link on ctx cancel? // XXX close link on ctx cancel?
idReq.Reply(err) n.req.Reply(err)
idReq.Link.Close() n.req.Link().Close()
// XXX log err (if any) // XXX log err (if any)
return nil // not to cancel main by a failing reject return nil // not to cancel main by a failing reject
}) })
...@@ -1274,10 +1275,10 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1274,10 +1275,10 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
Addr: n.idReq.Address, Addr: n.idReq.Address,
NID: nid, NID: nid,
State: nodeState, State: nodeState,
IdTime: proto.IdTime(m.timeMono()), IdTime: proto.IdTime(m.xtimeMono()),
} }
node = m.updateNodeTab(ctx, nodeInfo) node := m.updateNodeTab(ctx, nodeInfo)
node.SetLink(n.req.Link()) node.SetLink(n.req.Link())
...@@ -1305,26 +1306,26 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1305,26 +1306,26 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
// accept sends acceptance to just identified peer, sends nodeTab and partTab // accept sends acceptance to just identified peer, sends nodeTab and partTab
// and spawns task to proxy their updates to the peer. XXX // and spawns task to proxy their updates to the peer. XXX
// XXX +ctx? // XXX +ctx?
func (m *Master) __accept(p *_MasteredPeer, idReq *neonet.Request) error { func (m *Master) __accept(peer *_MasteredPeer, idReq *neonet.Request) error {
// XXX errctx? // XXX errctx?
err := idReq.Reply(p.accept) err := idReq.Reply(peer.accept)
if err != nil { if err != nil {
return fmt.Errorf("send accept: %w", err) return fmt.Errorf("send accept: %w", err)
} }
// XXX idReq close? // XXX idReq close?
// send initial state snapshot to accepted node // send initial state snapshot to accepted node
link := p.peer.Link() // XXX -> idReq.Link() instead? link := peer.node.Link() // XXX -> idReq.Link() instead?
// nodeTab // nodeTab
err = link.Send1(&p.state0.NodeTab) err = link.Send1(&peer.state0.NodeTab)
if err != nil { if err != nil {
return fmt.Errorf("send nodeTab: %w", err) return fmt.Errorf("send nodeTab: %w", err)
} }
// partTab (not to S until cluster is RUNNING) // partTab (not to S until cluster is RUNNING)
if !(peer.Type == proto.STORAGE && state0.Code != proto.ClusterRunning) { if !(peer.node.Type == proto.STORAGE && peer.state0.Code != proto.ClusterRunning) {
err = link.Send1(&p.state0.PartTab) err = link.Send1(&peer.state0.PartTab)
if err != nil { if err != nil {
return fmt.Errorf("send partTab: %w", err) return fmt.Errorf("send partTab: %w", err)
} }
...@@ -1334,8 +1335,8 @@ func (m *Master) __accept(p *_MasteredPeer, idReq *neonet.Request) error { ...@@ -1334,8 +1335,8 @@ func (m *Master) __accept(p *_MasteredPeer, idReq *neonet.Request) error {
// TODO indicate that initial phase of accept is done // TODO indicate that initial phase of accept is done
p.wg.Go(p.notify) // main -> peer δnodeTab/δpartTab/δcluterState to proxy to peer link peer.wg.Go(peer.notify) // main -> peer δnodeTab/δpartTab/δcluterState to proxy to peer link
m.mainWG.Go(p.waitAll) // main <- peer "peer (should be) disconnected" m.mainWG.Go(peer.waitAll) // main <- peer "peer (should be) disconnected"
return nil return nil
} }
...@@ -1346,7 +1347,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) { ...@@ -1346,7 +1347,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
stateCode := p.state0.Code stateCode := p.state0.Code
// XXX vvv right? // XXX vvv right?
return xxcontext.WithCloseOnErrCancel(ctx, link, func() error { return xcontext.WithCloseOnErrCancel(ctx, p.node.Link(), func() error {
for { for {
var δstate _ΔClusterState var δstate _ΔClusterState
...@@ -1371,7 +1372,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) { ...@@ -1371,7 +1372,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
case *_ΔPartTab: case *_ΔPartTab:
// don't send δpartTab to S unless cluster is RUNNING // don't send δpartTab to S unless cluster is RUNNING
if peer.Type == proto.STORAGE && stateCode != proto.ClusterRunning { if p.node.Type == proto.STORAGE && stateCode != proto.ClusterRunning {
continue continue
} }
msg = &proto.NotifyPartitionChanges{ msg = &proto.NotifyPartitionChanges{
...@@ -1392,7 +1393,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) { ...@@ -1392,7 +1393,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
panic("bug") panic("bug")
} }
err := link.Send1(msg) err := p.node.Link().Send1(msg)
if err != nil { if err != nil {
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment