Commit 6cf09f94 authored by Kirill Smelkov's avatar Kirill Smelkov

X Master is ready to start in case of new cluster

parent eff3e3e4
...@@ -100,13 +100,25 @@ func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceClusterState { ...@@ -100,13 +100,25 @@ func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceClusterState {
// nodetab entry changed // nodetab entry changed
type traceNode struct { type traceNode struct {
NodeTab unsafe.Pointer // *neo.NodeTable XXX not to noise test diff output NodeTab unsafe.Pointer // *neo.NodeTable XXX not to noise test diff
NodeInfo neo.NodeInfo NodeInfo neo.NodeInfo
} }
func (t *MyTracer) traceNode(nt *neo.NodeTable, n *neo.Node) { func (t *MyTracer) traceNode(nt *neo.NodeTable, n *neo.Node) {
t.Trace1(&traceNode{unsafe.Pointer(nt), n.NodeInfo}) t.Trace1(&traceNode{unsafe.Pointer(nt), n.NodeInfo})
} }
// master ready to start changed
type traceMStartReady struct {
Master unsafe.Pointer // *Master XXX not to noise test diff
Ready bool
}
func (t *MyTracer) traceMasterStartReady(m *Master, ready bool) {
t.Trace1(masterStartReady(m, ready))
}
func masterStartReady(m *Master, ready bool) *traceMStartReady {
return &traceMStartReady{unsafe.Pointer(m), ready}
}
// vclock is a virtual clock // vclock is a virtual clock
// XXX place -> util? // XXX place -> util?
...@@ -144,6 +156,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -144,6 +156,7 @@ func TestMasterStorage(t *testing.T) {
neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre) neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre)
neo_traceClusterStateChanged_Attach(pg, tracer.traceClusterState) neo_traceClusterStateChanged_Attach(pg, tracer.traceClusterState)
neo_traceNodeChanged_Attach(pg, tracer.traceNode) neo_traceNodeChanged_Attach(pg, tracer.traceNode)
traceMasterStartReady_Attach(pg, tracer.traceMasterStartReady)
tracing.Unlock() tracing.Unlock()
...@@ -159,11 +172,11 @@ func TestMasterStorage(t *testing.T) { ...@@ -159,11 +172,11 @@ func TestMasterStorage(t *testing.T) {
return a return a
} }
// shortcut for net tx event // // shortcut for net tx event
// XXX -> NetTx ? // // XXX -> NetTx ?
nettx := func(src, dst, pkt string) *xnet.TraceTx { // nettx := func(src, dst, pkt string) *xnet.TraceTx {
return &xnet.TraceTx{Src: xaddr(src), Dst: xaddr(dst), Pkt: []byte(pkt)} // return &xnet.TraceTx{Src: xaddr(src), Dst: xaddr(dst), Pkt: []byte(pkt)}
} // }
// shortcut for net connect event // shortcut for net connect event
// XXX -> NetConnect ? // XXX -> NetConnect ?
...@@ -211,13 +224,13 @@ func TestMasterStorage(t *testing.T) { ...@@ -211,13 +224,13 @@ func TestMasterStorage(t *testing.T) {
_ = err // XXX _ = err // XXX
}) })
// M starts listening
// expect:
tc.Expect(netlisten("m:1")) tc.Expect(netlisten("m:1"))
tc.Expect(node(M.nodeTab, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0)) tc.Expect(node(M.nodeTab, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0))
tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering)) tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational")
// start storage // start storage
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor) S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
...@@ -228,16 +241,11 @@ func TestMasterStorage(t *testing.T) { ...@@ -228,16 +241,11 @@ func TestMasterStorage(t *testing.T) {
_ = err // XXX _ = err // XXX
}) })
// expect: // S starts listening
tc.Expect(netlisten("s:1")) tc.Expect(netlisten("s:1"))
tc.Expect(netconnect("s:2", "m:2", "m:1"))
//tc.ExpectPar(
// nettx("s:1", "m:1", "\x00\x00\x00\x01"), // handshake
// nettx("m:1", "s:1", "\x00\x00\x00\x01"),
//)
_ = nettx
// S connects M
tc.Expect(netconnect("s:2", "m:2", "m:1"))
tc.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{ tc.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{
NodeType: neo.STORAGE, NodeType: neo.STORAGE,
NodeUUID: 0, NodeUUID: 0,
...@@ -256,7 +264,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -256,7 +264,7 @@ func TestMasterStorage(t *testing.T) {
YourNodeUUID: neo.UUID(neo.STORAGE, 1), YourNodeUUID: neo.UUID(neo.STORAGE, 1),
})) }))
// TODO test ID rejects // TODO test ID rejects (uuid already registered, ...)
// M starts recovery on S // M starts recovery on S
tc.Expect(conntx("m:2", "s:2", 1, &neo.Recovery{})) tc.Expect(conntx("m:2", "s:2", 1, &neo.Recovery{}))
...@@ -273,6 +281,14 @@ func TestMasterStorage(t *testing.T) { ...@@ -273,6 +281,14 @@ func TestMasterStorage(t *testing.T) {
RowList: []neo.RowInfo{}, RowList: []neo.RowInfo{},
})) }))
// M ready to start: new cluster, no in-progress S recovery
tc.Expect(masterStartReady(M, true))
// XXX M.partTab = ø
// XXX M can start -> writes parttab to S and goes to verification
// XXX M.partTab <- ... // XXX M.partTab <- ...
// XXX updated something cluster currently can be operational // XXX updated something cluster currently can be operational
......
...@@ -66,7 +66,6 @@ type Master struct { ...@@ -66,7 +66,6 @@ type Master struct {
nodeCome chan nodeCome // node connected XXX -> acceptq? nodeCome chan nodeCome // node connected XXX -> acceptq?
nodeLeave chan nodeLeave // node disconnected XXX -> don't need nodeLeave chan nodeLeave // node disconnected XXX -> don't need
// so tests could override // so tests could override
monotime func() float64 monotime func() float64
} }
...@@ -268,6 +267,8 @@ func (m *Master) runMain(ctx context.Context) (err error) { ...@@ -268,6 +267,8 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// - retrieve and recover latest previously saved partition table from storages // - retrieve and recover latest previously saved partition table from storages
// - monitor whether partition table becomes operational wrt currently up nodeset // - monitor whether partition table becomes operational wrt currently up nodeset
// - if yes - finish recovering upon receiving "start" command XXX or autostart // - if yes - finish recovering upon receiving "start" command XXX or autostart
// - start is also allowed if storages connected and say there is no partition
// table saved to them (empty new cluster case).
// storRecovery is result of 1 storage node passing recovery phase // storRecovery is result of 1 storage node passing recovery phase
type storRecovery struct { type storRecovery struct {
...@@ -290,14 +291,18 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -290,14 +291,18 @@ func (m *Master) recovery(ctx context.Context) (err error) {
ctx, rcancel := context.WithCancel(ctx) ctx, rcancel := context.WithCancel(ctx)
defer rcancel() defer rcancel()
//trace:event traceMasterStartReady(m *Master, ready bool)
readyToStart := false readyToStart := false
recovery := make(chan storRecovery) recovery := make(chan storRecovery)
inprogress := 0 // in-progress stor recoveries
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
// start recovery on all storages we are currently in touch with // start recovery on all storages we are currently in touch with
// XXX close links to clients // XXX close links to clients
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ? if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
...@@ -314,16 +319,21 @@ loop: ...@@ -314,16 +319,21 @@ loop:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */) node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
// XXX set node.State = PENDING // XXX set node.State = PENDING
// if new storage arrived - start recovery on it too if node == nil {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if node == nil {
m.reject(ctx, n.conn, resp) m.reject(ctx, n.conn, resp)
}()
return return
} }
// if new storage arrived - start recovery on it too
inprogress++
wg.Add(1)
go func() {
defer wg.Done()
err := m.accept(ctx, n.conn, resp) err := m.accept(ctx, n.conn, resp)
if err != nil { if err != nil {
// XXX move this m.nodeLeave <- to accept() ? // XXX move this m.nodeLeave <- to accept() ?
...@@ -338,6 +348,8 @@ loop: ...@@ -338,6 +348,8 @@ loop:
// a storage node came through recovery - let's see whether // a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there // ptid ↑ and if so we should take partition table from there
case r := <-recovery: case r := <-recovery:
inprogress--
if r.err != nil { if r.err != nil {
log.Error(ctx, r.err) log.Error(ctx, r.err)
...@@ -359,10 +371,27 @@ loop: ...@@ -359,10 +371,27 @@ loop:
} }
// update indicator whether cluster currently can be operational or not // update indicator whether cluster currently can be operational or not
readyToStart = m.partTab.OperationalWith(m.nodeTab) // XXX + node state var ready bool
if m.partTab.PTid == 0 {
// new cluster - allow startup if we have some storages passed
// recovery and there is no in-progress recovery running
nok := 0
for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > neo.DOWN {
nok++
}
}
ready = (nok > 0 && inprogress == 0)
// XXX handle case of new cluster - when no storage reports valid parttab } else {
// XXX -> create new parttab ready = m.partTab.OperationalWith(m.nodeTab) // XXX + node state
}
if readyToStart != ready {
readyToStart = ready
traceMasterStartReady(m, ready)
}
// XXX -> create new parttab for new-cluster case
// request to start the cluster - if ok we exit replying ok // request to start the cluster - if ok we exit replying ok
......
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package server
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/neo/go/xcommon/tracing"
"unsafe"
)
// traceevent: traceMasterStartReady(m *Master, ready bool)
type _t_traceMasterStartReady struct {
tracing.Probe
probefunc func(m *Master, ready bool)
}
var _traceMasterStartReady *_t_traceMasterStartReady
func traceMasterStartReady(m *Master, ready bool) {
if _traceMasterStartReady != nil {
_traceMasterStartReady_run(m, ready)
}
}
func _traceMasterStartReady_run(m *Master, ready bool) {
for p := _traceMasterStartReady; p != nil; p = (*_t_traceMasterStartReady)(unsafe.Pointer(p.Next())) {
p.probefunc(m, ready)
}
}
func traceMasterStartReady_Attach(pg *tracing.ProbeGroup, probe func(m *Master, ready bool)) *tracing.Probe {
p := _t_traceMasterStartReady{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMasterStartReady)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_1002eef247af7731924a09f42e9c3f4131f5bccb() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment