Commit 0e00297d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 37bb05e1
...@@ -59,14 +59,15 @@ func tracetestMasterStorage(t0 *tracetest.T) { ...@@ -59,14 +59,15 @@ func tracetestMasterStorage(t0 *tracetest.T) {
M := t.NewMaster("m") M := t.NewMaster("m")
zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs") zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
S := t.NewStorage("s", "m:1", zback) // XXX do we need to provide Mlist here? _ = t.NewStorage("s", "m:1", zback) // XXX do we need to provide Mlist here?
C := t.NewClient("c", "m:1") C := t.NewClient("c", "m:1")
lastTid, err := zstor.Sync(bg); X(err) lastTid, err := zstor.Sync(bg); X(err)
/*
// start nodes XXX move starting to TestCluster? // start nodes XXX move starting to TestCluster?
gwg := xsync.NewWorkGroup(bg) gwg := xsync.NewWorkGroup(bg)
//defer xwait(gwg) XXX not yet correctly stopped on context cancel defer xwait(gwg) // FIXME not yet correctly stopped on context cancel
gwg.Go(func(ctx context.Context) error { gwg.Go(func(ctx context.Context) error {
return M.Run(ctx) return M.Run(ctx)
...@@ -77,6 +78,7 @@ func tracetestMasterStorage(t0 *tracetest.T) { ...@@ -77,6 +78,7 @@ func tracetestMasterStorage(t0 *tracetest.T) {
gwg.Go(func(ctx context.Context) error { gwg.Go(func(ctx context.Context) error {
return C.Run(ctx) return C.Run(ctx)
}) })
*/
tM := t.Checker("m") tM := t.Checker("m")
tS := t.Checker("s") tS := t.Checker("s")
......
...@@ -27,7 +27,9 @@ import ( ...@@ -27,7 +27,9 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet" "lab.nexedi.com/kirr/go123/xnet/pipenet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest" "lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/neo/go/neo/storage" "lab.nexedi.com/kirr/neo/go/neo/storage"
...@@ -45,11 +47,13 @@ type tCluster struct { ...@@ -45,11 +47,13 @@ type tCluster struct {
gotracer *TraceCollector // for tracing go nodes XXX -> GoTracer gotracer *TraceCollector // for tracing go nodes XXX -> GoTracer
//tpy *PyTracer // for tracing py nodes //tpy *PyTracer // for tracing py nodes
erouter *EventRouter // to which stream an event should go
erouter *EventRouter
tabMu sync.Mutex tabMu sync.Mutex
nodeTab map[string/*node*/]*tNode nodeTab map[string/*node*/]*tNode
runWG *xsync.WorkGroup // started nodes are .Run under runWG
runCancel func()
} }
// tNode represents information about a test node ... XXX // tNode represents information about a test node ... XXX
...@@ -80,11 +84,12 @@ type ITestClient interface { ...@@ -80,11 +84,12 @@ type ITestClient interface {
// XXX defer t.Stop() // XXX defer t.Stop()
func tNewCluster(ttest *tracetest.T, name string) *tCluster { func tNewCluster(ttest *tracetest.T, name string) *tCluster {
t := &tCluster{ t := &tCluster{
name: name, T: ttest,
name: name,
network: pipenet.New("testnet"), // test network network: pipenet.New("testnet"), // test network
nodeTab: make(map[string]*tNode), nodeTab: make(map[string]*tNode),
T: ttest,
} }
t.erouter = NewEventRouter() t.erouter = NewEventRouter()
...@@ -92,6 +97,11 @@ func tNewCluster(ttest *tracetest.T, name string) *tCluster { ...@@ -92,6 +97,11 @@ func tNewCluster(ttest *tracetest.T, name string) *tCluster {
ttest.SetEventRouter(t.erouter.RouteEvent) ttest.SetEventRouter(t.erouter.RouteEvent)
t.gotracer.Attach() t.gotracer.Attach()
runCtx, runCancel := context.WithCancel(context.Background())
t.runWG = xsync.NewWorkGroup(runCtx)
t.runCancel = runCancel
return t return t
} }
...@@ -99,8 +109,18 @@ func tNewCluster(ttest *tracetest.T, name string) *tCluster { ...@@ -99,8 +109,18 @@ func tNewCluster(ttest *tracetest.T, name string) *tCluster {
// //
// All processes of the cluster are stopped ... XXX // All processes of the cluster are stopped ... XXX
func (t *tCluster) Stop() { func (t *tCluster) Stop() {
t.Helper()
t.gotracer.Detach() t.gotracer.Detach()
//XXX t.pytracer.Detach() //XXX t.pytracer.Detach()
t.runCancel()
err := t.runWG.Wait()
if xcontext.Canceled(err) { // cancel is expected
err = nil
}
if err != nil {
t.Fatalf("run shutdown: %s", err)
}
} }
// Checker returns tracechecker corresponding to name. // Checker returns tracechecker corresponding to name.
...@@ -181,6 +201,11 @@ func (t *tCluster) NewMaster(name string) ITestMaster { ...@@ -181,6 +201,11 @@ func (t *tCluster) NewMaster(name string) ITestMaster {
// let tracer know how to map state addresses to node names // let tracer know how to map state addresses to node names
t.gotracer.RegisterNode(m.node, name) t.gotracer.RegisterNode(m.node, name)
// start the node
t.runWG.Go(func(ctx context.Context) error {
return m.Run(ctx)
})
return m return m
} }
...@@ -188,6 +213,9 @@ func (t *tCluster) NewStorage(name, masterAddr string, back storage.Backend) ITe ...@@ -188,6 +213,9 @@ func (t *tCluster) NewStorage(name, masterAddr string, back storage.Backend) ITe
node := t.registerNewNode(name) node := t.registerNewNode(name)
s := tNewStorage(t.name, masterAddr, ":1", node.net, back) s := tNewStorage(t.name, masterAddr, ":1", node.net, back)
t.gotracer.RegisterNode(s.node, name) t.gotracer.RegisterNode(s.node, name)
t.runWG.Go(func(ctx context.Context) error {
return s.Run(ctx)
})
return s return s
} }
...@@ -195,6 +223,9 @@ func (t *tCluster) NewClient(name, masterAddr string) ITestClient { ...@@ -195,6 +223,9 @@ func (t *tCluster) NewClient(name, masterAddr string) ITestClient {
node := t.registerNewNode(name) node := t.registerNewNode(name)
c := NewClient(t.name, masterAddr, node.net) c := NewClient(t.name, masterAddr, node.net)
t.gotracer.RegisterNode(c.node, name) t.gotracer.RegisterNode(c.node, name)
t.runWG.Go(func(ctx context.Context) error {
return c.Run(ctx)
})
return c return c
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment