Commit c8be2150 authored by Kirill Smelkov

.

parent 4a9252a6
@@ -757,6 +757,7 @@ loop:
     }
     // wait all workers to finish (which should come without delay since it was cancelled)
+    // XXX not good - see loop2 in recovery about why
     done := make(chan struct{})
     go func() {
         wg.Wait()
@@ -767,15 +768,9 @@ loop2:
     for {
         select {
         case v := <-verifyq:
-            // XXX dup wrt <-verifyq handler above
+            close(v.ack)
             log.Error(ctx, v.err)
-            if !xcontext.Canceled(v.err) {
-                v.stor.ResetLink(ctx)
-                m.updateNodeState(ctx, v.stor, proto.DOWN)
-                // XXX nodeLeave <-
-            }
         case <-done:
             break loop2
         }
@@ -790,6 +785,8 @@ type storVerify struct {
     lastOid zodb.Oid
     lastTid zodb.Tid
     err     error
+    ack     chan struct{}
 }
 
 // storCtlVerify drives a storage node during cluster verifying (= starting) state.
@@ -797,7 +794,7 @@ func storCtlVerify(ctx context.Context, stor *_MasteredPeer, pt *xneo.PartitionT
     // XXX link.Close on err -> = xcontext.WithCloseOnErrCancel
     // XXX cancel on ctx -> = ^^^
-    slink := stor.Link()
+    slink := stor.node.Link()
     defer task.Runningf(&ctx, "%s: stor verify", stor.node.NID)(&err)
     lastOid = zodb.InvalidOid
@@ -864,9 +861,27 @@ func (m *Master) service(ctx context.Context) (err error) {
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
-    serviced := make(chan serviceDone)
+    servedq := make(chan serviceDone)
     wg := &sync.WaitGroup{}
 
+    // goServe spawns serve task for a peer.
+    goServe := func(peer *_MasteredPeer) {
+        peer.wg.Go(func(peerCtx context.Context) error {
+            ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
+            defer cancel()
+
+            ack := make(chan struct{})
+            servedq <- storRecovery{stor: stor, partTab: pt, err: err, ack: ack}
+            <-ack
+
+            // canceled service does not necessarily mean we should down the peer
+            if xcontext.Canceled(err) {
+                err = nil
+            }
+            return err
+        })
+    }
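The goServe helper added above follows a report-and-wait pattern: each peer's serve task pushes its completion onto servedq together with an ack channel, waits for the main loop to acknowledge, and treats cancellation of the service context as a clean shutdown rather than a peer failure. What follows is a hedged, stdlib-only sketch of that pattern; serviceDone and servedq follow the diff, while xxcontext.Merge, peer.wg.Go and the peer types are replaced with plain context, sync.WaitGroup and string purely for illustration:

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// serviceDone reports that a peer's serve task finished.
type serviceDone struct {
    peer string
    err  error
    ack  chan struct{}
}

// servePeer stands in for the real per-peer service loop: it runs
// until the service context is cancelled and returns the cause.
func servePeer(ctx context.Context, peer string) error {
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    servedq := make(chan serviceDone)
    wg := &sync.WaitGroup{}

    goServe := func(peer string) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            err := servePeer(ctx, peer)

            // report completion and wait until the main loop has seen it
            ack := make(chan struct{})
            servedq <- serviceDone{peer: peer, err: err, ack: ack}
            <-ack

            // canceled service does not necessarily mean the peer is down
            if errors.Is(err, context.Canceled) {
                err = nil
            }
            _ = err // in the real code this is returned via the peer's work group
        }()
    }

    goServe("S1")
    cancel() // stop the service; the serve task reports and exits

    d := <-servedq
    fmt.Println("served:", d.peer, d.err)
    close(d.ack)
    wg.Wait()
}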
     // spawn per-storage service driver
     for _, stor := range m.node.State.NodeTab.StorageList() {
         if stor.State == proto.RUNNING { // XXX note PENDING - not adding to service; ok?
@@ -895,6 +910,16 @@ loop:
             // XXX tell storages to stop
             break loop
+
+        // peer (should be) disconnected
+        case n := <-m.nodeLeave:
+            m.disconnectPeer(ctx, n.peer)
+
+            // if cluster became non-operational - cancel service
+            if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
+                err = errClusterDegraded
+                break loop
+            }
 
         // new connection comes in
         case n := <-m.nodeComeq:
             peer, ok := m.identify(ctx, n, /* XXX accept everyone */)
@@ -925,21 +950,10 @@ loop:
                 serviced <- serviceDone{node: node, err: err}
             }()
 
-        case d := <-serviced:
+        case d := <-servedq:
             // TODO if S goes away -> check partTab still operational -> if not - recovery
             _ = d
 
-/*
-        // XXX reenable? -> no - do it ^^^ in <-serviced (?)
-        case n := <-m.nodeLeave:
-            n.node.SetState(proto.DOWN)
-
-            // if cluster became non-operational - cancel service
-            if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
-                err = errClusterDegraded
-                break loop
-            }
-*/
 
         // XXX what else ? (-> txn control at least)
         }
@@ -947,6 +961,7 @@ loop:
     // XXX wait all spawned service workers
+    wg.Wait()
     return err
 }
@@ -1046,7 +1061,9 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
 // disconnectPeer resets link to the peer and sets its state to DOWN in nodeTab.
 // other peers are notified with δnodeTab about it.
+// XXX place=?
 func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) {
+    // XXX log?
     peer.node.ResetLink(ctx)
     delete(m.peerTab, peer.node.NID)
     m.updateNodeState(ctx, peer.node, proto.DOWN)
...