Commit b84a150b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a3f6517c
......@@ -361,7 +361,7 @@ func (m *Master) main(ctx context.Context) (err error) {
// storRecovery is result of 1 storage node passing recovery phase.
type storRecovery struct {
stor *xneo.PeerNode
stor *_MasteredPeer
partTab *xneo.PartitionTable
err error
......@@ -424,8 +424,10 @@ func (m *Master) recovery(ctx context.Context) (err error) {
defer cancel()
var pt *xneo.PartitionTable
err := stor.run(ctx, func(...) {
err := stor.run(ctx, func() error {
var err error
pt, err = storCtlRecovery(ctx, stor)
return err
})
ack := make(chan struct{})
......@@ -486,18 +488,19 @@ loop:
// XXX -> move to func
peer := n.peer
peer.node.Link().Close() // XXX err
peer.node.ResetLink()
peer.node.ResetLink(ctx)
delete(m.peerTab, peer.node.NID)
m.updateNodeState(ctx, peer.node, proto.DOWN)
updateReadyToStart()
// new connection comes in and asks to be identified
// node comes in and asks to be identified
case n := <-m.nodeComeq:
peer, ok := m.identify(ctx, n,
/* XXX only accept storages -> PENDING | MASTER */
// XXX only accept:
// - S -> PENDING
// - M
)
if !ok {
break
......@@ -590,7 +593,7 @@ loop2:
// storCtlRecovery drives a storage node during cluster recovering state.
// it retrieves various ids and partition table as stored on the storage
func storCtlRecovery(ctx context.Context, stor *_MasteredPeer) (_ *xneo.PartTab, err error) {
func storCtlRecovery(ctx context.Context, stor *_MasteredPeer) (_ *xneo.PartitionTable, err error) {
slink := stor.node.Link()
defer task.Runningf(&ctx, "%s: stor recovery", stor.node.NID)(&err)
......@@ -645,22 +648,34 @@ func (m *Master) verify(ctx context.Context) (err error) {
ctx, vcancel := context.WithCancel(ctx)
defer vcancel()
verify := make(chan storVerify)
verifyq := make(chan storVerify)
inprogress := 0
wg := &sync.WaitGroup{}
// NOTE we don't reset m.lastOid / m.lastTid to 0 in the beginning of verification
// XXX (= py), rationale=?
goStorCtlVerify := func(stor *_MasteredPeer) {
// XXX rework
inprogress++
wg.Add(1)
go func() {
defer wg.Done()
err := m.accept(node, state0, n.req, resp)
if err != nil {
verifyq <- storVerify{stor: node, err: err}
return
}
storCtlVerify(ctx, node, m.node.State.PartTab, verifyq)
}()
}
// start verification on all storages we are currently in touch with
for _, stor := range m.node.State.NodeTab.StorageList() {
if stor.State > proto.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
wg.Add(1)
go func() {
defer wg.Done()
storCtlVerify(ctx, stor, m.node.State.PartTab, verify)
}()
for _, peer := range m.peerTab() {
if peer.node.Type == proto.STORAGE {
goStorCtlVerify(peer)
}
}
......@@ -679,31 +694,9 @@ loop:
err = errStopRequested
break loop
case n := <-m.nodeComeq:
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
if node == nil {
goreject(ctx, wg, n.req, resp)
break
}
// new storage arrived - start verification on it too
inprogress++
wg.Add(1)
go func() {
defer wg.Done()
err := m.accept(node, state0, n.req, resp)
if err != nil {
verify <- storVerify{stor: node, err: err}
return
}
storCtlVerify(ctx, node, m.node.State.PartTab, verify)
}()
/* XXX reenable
case n := <-m.nodeLeave:
// peer (should be) disconnected
case n := <-m.nodeLeaveq:
// XXX update
n.node.SetState(proto.DOWN)
// if cluster became non-operational - we cancel verification
......@@ -714,24 +707,35 @@ loop:
err = errClusterDegraded
break loop
}
*/
// node comes in and asks to be identified
case n := <-m.nodeComeq:
peer, ok := m.identify(ctx, n,
// XXX only accept:
// - S -> known ? RUNNING : PENDING
// - M
)
if !ok {
break
}
// S -> start verification on it too
if peer.node.Type == proto.STORAGE {
goStorCtlVerify(peer)
}
// a storage node came through verification - adjust our last{Oid,Tid} if ok
// on error check - whether cluster became non-operational and stop verification if so
//
// FIXME actually implement logic to decide to finish/rollback transactions
case v := <-verify:
case v := <-verifyq:
// XXX ack
inprogress--
if v.err != nil {
log.Error(ctx, v.err)
if !xcontext.Canceled(v.err) {
v.stor.ResetLink(ctx)
m.updateNodeState(ctx, v.stor, proto.DOWN)
// XXX nodeLeave <-
}
// check partTab is still operational
// if not -> cancel to go back to recovery
if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
......@@ -760,8 +764,8 @@ loop:
loop2:
for {
select {
case v := <-verify:
// XXX dup wrt <-verify handler above
case v := <-verifyq:
// XXX dup wrt <-verifyq handler above
log.Error(ctx, v.err)
if !xcontext.Canceled(v.err) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment