Commit 0beb359a authored by Kirill Smelkov

.

parent ff17df41
@@ -322,21 +322,22 @@ func TestMasterStorage(t *testing.T) {
LastTid: lastTid,
}))
// XXX M -> S ClusterInformation(VERIFICATION) ?
// expect:
// ? M -> S ClusterInformation(VERIFICATION)
// TODO there is actually txn to finish
// TODO S leave at verify
// TODO S join at verify
// TODO M.Stop() while verify
// + TODO there is actually txn to finish
// + TODO S leave at verify
// + TODO S join at verify
// + TODO M.Stop() while verify
// verification ok; M start service
// TODO
// expect:
// M.clusterState <- RUNNING + TODO it should be sent to S
// + TODO S leave while service
// + TODO S join while service
// + TODO M.Stop while service
// TODO S leave while service
// TODO S join while service
// TODO M.Stop while service
// + TODO Client connects here ?
@@ -296,7 +296,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
recovery := make(chan storRecovery)
inprogress := 0 // in-progress stor recoveries
wg := sync.WaitGroup{}
wg := &sync.WaitGroup{}
// start recovery on all storages we are currently in touch with
// XXX close links to clients
@@ -320,11 +320,7 @@ loop:
// XXX set node.State = PENDING
if node == nil {
wg.Add(1)
go func() {
defer wg.Done()
m.reject(ctx, n.conn, resp)
}()
goreject(ctx, wg, n.conn, resp)
return
}
@@ -336,7 +332,6 @@ loop:
err := m.accept(ctx, n.conn, resp)
if err != nil {
// XXX move this m.nodeLeave <- to accept() ?
recovery <- storRecovery{stor: node, err: err}
return
}
@@ -347,6 +342,9 @@ loop:
// a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there
//
// FIXME after a storage recovers, it could go away while
// recovery for others is in progress -> monitor this.
case r := <-recovery:
inprogress--
@@ -354,6 +352,7 @@ loop:
log.Error(ctx, r.err)
if !xcontext.Canceled(errors.Cause(r.err)) {
// XXX dup wrt vvv (loop2)
log.Infof(ctx, "%v: closing link", r.stor.Link)
// close stor link / update .nodeTab
@@ -391,7 +390,6 @@ loop:
readyToStart = ready
traceMasterStartReady(m, ready)
}
// XXX -> create new parttab for new-cluster case
// request to start the cluster - if ok we exit replying ok
@@ -442,8 +440,12 @@ loop2:
log.Error(ctx, r.err)
if !xcontext.Canceled(errors.Cause(r.err)) {
// XXX not so ok
// FIXME log / close node link; update NT
// XXX -> r.stor.CloseLink(ctx) ?
log.Infof(ctx, "%v: closing link", r.stor.Link)
// close stor link / update .nodeTab
lclose(ctx, r.stor.Link)
m.nodeTab.SetNodeState(r.stor, neo.DOWN)
}
case <-done:
@@ -542,7 +544,7 @@ var errClusterDegraded = stderrors.New("cluster became non-operatonal")
// - once we are done without losing too many storages in the process (so that
// partition table is still operational) we are ready to enter servicing state.
// verify drives cluster via verification phase
// verify drives cluster during verification phase
//
// when verify finishes error indicates:
// - nil: verification completed ok; cluster is ready to enter running state
@@ -553,14 +555,12 @@ func (m *Master) verify(ctx context.Context) (err error) {
defer running(&ctx, "verify")(&err)
m.setClusterState(neo.ClusterVerifying)
vctx, vcancel := context.WithCancel(ctx)
ctx, vcancel := context.WithCancel(ctx)
defer vcancel()
verify := make(chan storVerify)
inprogress := 0
	// verification = ask every storage to verify and wait for all of them to complete
	// XXX "all of them" -> "enough of them"?
wg := &sync.WaitGroup{}
// NOTE we don't reset m.lastOid / m.lastTid to 0 in the beginning of verification
// XXX (= py), rationale=?
@@ -569,7 +569,11 @@ func (m *Master) verify(ctx context.Context) (err error) {
for _, stor := range m.nodeTab.StorageList() {
if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
go storCtlVerify(vctx, stor, m.partTab, verify)
wg.Add(1)
go func() {
defer wg.Done()
storCtlVerify(ctx, stor, m.partTab, verify)
}()
}
}
@@ -578,16 +582,26 @@ loop:
select {
case n := <-m.nodeCome:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
// XXX handle resp ^^^ like in recover
_, ok := resp.(*neo.AcceptIdentification)
if !ok {
break
if node == nil {
goreject(ctx, wg, n.conn, resp)
return
}
// new storage arrived - start verification on it too
// XXX ok? or it must first go through recovery check?
inprogress++
go storCtlVerify(vctx, node, m.partTab, verify)
wg.Add(1)
go func() {
defer wg.Done()
err := m.accept(ctx, n.conn, resp)
if err != nil {
verify <- storVerify{stor: node, err: err}
return
}
storCtlVerify(ctx, node, m.partTab, verify)
}()
case n := <-m.nodeLeave:
m.nodeTab.SetNodeState(n.node, neo.DOWN)
@@ -603,14 +617,22 @@ loop:
// a storage node came through verification - adjust our last{Oid,Tid} if ok
// on error check - whether cluster became non-operational and stop verification if so
//
// FIXME actually implement logic to decide to finish/rollback transactions
case v := <-verify:
inprogress--
if v.err != nil {
log.Error(ctx, v.err)
// mark storage as non-working in nodeTab
m.nodeTab.SetNodeState(v.node, neo.DOWN)
if !xcontext.Canceled(errors.Cause(v.err)) {
// XXX dup wrt recovery ^^^
log.Infof(ctx, "%s: closing link", v.stor.Link)
// mark storage as non-working in nodeTab
lclose(ctx, v.stor.Link)
m.nodeTab.SetNodeState(v.stor, neo.DOWN)
}
// check partTab is still operational
// if not -> cancel to go back to recovery
@@ -628,9 +650,6 @@ loop:
}
}
// XXX if m.partTab.OperationalWith(&.nodeTab, RUNNING) -> break (ok)
case ech := <-m.ctlStart:
ech <- nil // we are already starting
@@ -645,20 +664,48 @@ loop:
}
}
/*
// consume left verify responses (which should come without delay since it was cancelled)
for ; inprogress > 0; inprogress-- {
<-verify
}
*/
// wait all workers to finish (which should come without delay since it was cancelled)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
loop2:
for {
select {
case v := <-verify:
// XXX dup wrt <-verify handler above
log.Error(ctx, v.err)
if !xcontext.Canceled(errors.Cause(v.err)) {
log.Infof(ctx, "%v: closing link", v.stor.Link)
// close stor link / update .nodeTab
lclose(ctx, v.stor.Link)
m.nodeTab.SetNodeState(v.stor, neo.DOWN)
}
case <-done:
break loop2
}
}
return err
}
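
For reference, a minimal self-contained sketch of the shutdown shape loop2 uses above: keep draining results while waiting on the WaitGroup, because workers parked on an unbuffered send would otherwise never reach wg.Done. The result type and worker count below are illustrative, not taken from the package.

package main

import (
	"fmt"
	"sync"
)

type result struct{ id int } // illustrative stand-in for storVerify

func main() {
	results := make(chan result) // unbuffered, like the verify channel
	wg := &sync.WaitGroup{}

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results <- result{id: i} // blocks until the drain loop receives it
		}(i)
	}

	// close done once every worker has called Done
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// drain pending results until all workers are finished; receiving here
	// is what unblocks workers still parked on their send
loop:
	for {
		select {
		case r := <-results:
			fmt.Println("result from worker", r.id)
		case <-done:
			break loop
		}
	}
}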
// storVerify is the result of a storage node passing the verification phase
type storVerify struct {
node *neo.Node
stor *neo.Node
lastOid zodb.Oid
lastTid zodb.Tid
// link *neo.NodeLink // XXX -> Node
err error
}
@@ -670,7 +717,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
var err error
defer func() {
if err != nil {
res <- storVerify{node: stor, err: err}
res <- storVerify{stor: stor, err: err}
}
}()
defer runningf(&ctx, "%s: stor verify", stor.Link)(&err)
@@ -705,7 +752,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
}
// send results to driver
res <- storVerify{node: stor, lastOid: last.LastOid, lastTid: last.LastTid}
res <- storVerify{stor: stor, lastOid: last.LastOid, lastTid: last.LastTid}
}
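
A small sketch of the report-via-defer shape storCtlVerify follows above: every error path feeds exactly one result back to the driver through the deferred send, while success reports explicitly. The result type and work function here are illustrative stand-ins.

package main

import (
	"errors"
	"fmt"
)

type result struct {
	id  int
	err error
}

// work mirrors storCtlVerify's shape: a deferred send reports any error
// back to the driver, so early returns cannot silently drop a result.
func work(id int, res chan<- result) {
	var err error
	defer func() {
		if err != nil {
			res <- result{id: id, err: err}
		}
	}()

	if id%2 == 1 {
		err = errors.New("verification failed")
		return // deferred send delivers the error to the driver
	}

	res <- result{id: id} // success is reported explicitly
}

func main() {
	res := make(chan result, 2)
	go work(0, res)
	go work(1, res)
	for i := 0; i < 2; i++ {
		fmt.Println(<-res)
	}
}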
@@ -984,7 +1031,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
}
// reject sends rejective identification response and closes associated link
func (m *Master) reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
func reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
// XXX cancel on ctx?
// XXX log?
err1 := conn.Send(resp)
@@ -996,6 +1043,13 @@ func (m *Master) reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
}
}
// goreject spawns reject in separate goroutine properly added/done on wg
func goreject(ctx context.Context, wg *sync.WaitGroup, conn *neo.Conn, resp neo.Msg) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		reject(ctx, conn, resp)
	}()
}
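
A compact, runnable illustration of how goreject is intended to be used from a driver loop: the caller threads one WaitGroup through every spawned reject and waits on it before the phase returns. The conn/msg types and the reject body below are stand-ins, not the neo package's types.

package main

import (
	"context"
	"fmt"
	"sync"
)

// conn and msg stand in for *neo.Conn and neo.Msg in this sketch.
type conn struct{ name string }
type msg struct{ text string }

func reject(ctx context.Context, c *conn, m msg) {
	// send the rejective response and close the link (stubbed out here)
	fmt.Printf("%s: rejected: %s\n", c.name, m.text)
}

// goreject spawns reject in a separate goroutine registered on wg.
func goreject(ctx context.Context, wg *sync.WaitGroup, c *conn, m msg) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		reject(ctx, c, m)
	}()
}

func main() {
	ctx := context.Background()
	wg := &sync.WaitGroup{}
	for _, c := range []*conn{{"s1"}, {"s2"}} {
		goreject(ctx, wg, c, msg{text: "identification rejected"})
	}
	wg.Wait() // the phase returns only after every reject has finished
}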
// accept sends acceptive identification response and closes conn
// XXX if problem -> .nodeLeave
// XXX spawn ping goroutine from here?