Commit f3effa6c authored by Kirill Smelkov's avatar Kirill Smelkov

X move tracetest to go123

kirr/go123@c8d9907e
parent 9d07a6b6
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package tracetest
// synchronous channels.
import (
"errors"
"flag"
"fmt"
"reflect"
"time"
)
var (
chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels")
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
)
// _Msg represents message with 1 event sent over _chan.
//
// The goroutine which sent the message will wait for Ack before continue.
type _Msg struct {
Event interface {}
ack chan<- error // nil on Ack; !nil on nak
}
// _chan provides synchronous channel associated with a stream.
//
// It comes with additional property that send blocks until receiving side
// explicitly acknowledges message was received and processed.
//
// New channels must be created via T.newChan.
//
// It is safe to use _chan from multiple goroutines simultaneously.
type _chan struct {
t *T // created for stream <.name> under <.t>
name string // name of the channel/stream
msgq chan *_Msg
down chan struct{} // becomes ready when closed
}
// Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics.
func (ch *_chan) Send(event interface{}) {
if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name, event, event)
}
ack := make(chan error)
select {
case <-ch.down:
ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name)
case ch.msgq <- &_Msg{event, ack}:
err := <-ack
if err != nil {
ch.t.fatalfInNonMain("%s: send: %s", ch.name, err)
}
}
}
// Close closes the sending side of the channel.
func (ch *_chan) Close() {
close(ch.down) // note - not .msgq
}
// Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
// Must be called from main testing thread.
func (ch *_chan) Recv() *_Msg {
t := ch.t; t.Helper()
msg := ch.recv()
if msg == nil {
t.Fatalf("%s: recv: deadlock\n", ch.name)
}
return msg
}
// RecvInto receives message from a producer, verifies that event type is the
// same as type of *event, and saves received event there.
//
// Must be called from main testing thread.
func (ch *_chan) RecvInto(eventp interface{}) *_Msg {
t := ch.t; t.Helper()
msg := ch.recv()
if msg == nil {
t.Fatalf("%s: recv: deadlock waiting for %T\n", ch.name, eventp)
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
t.queuenak(msg, "unexpected event type")
t.Fatalf("%s: expect: %s: got %T %v", ch.name, reventp.Elem().Type(), msg.Event, msg.Event)
}
// *eventp = msg.Event
reventp.Elem().Set(reflect.ValueOf(msg.Event))
return msg
}
func (ch *_chan) recv() *_Msg {
select {
case msg := <-ch.msgq:
return msg // ok
case <-time.After(*deadTime):
return nil // deadlock
}
}
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *_Msg) Ack() {
m.ack <- nil
}
// nak tells sender that event verification failed and why.
// it is called only by tracetest internals.
func (m *_Msg) nak(why string) {
m.ack <- errors.New(why)
}
// nak represents scheduled call to `msg.nak(why)`.
type nak struct {
msg *_Msg
why string
}
// queuenak schedules call to `msg.nak(why)`.
func (t *T) queuenak(msg *_Msg, why string) {
t.nakq = append(t.nakq, nak{msg, why})
}
// newChan creates new _chan channel.
func (t *T) newChan(name string) *_chan {
// NOTE T ensures not to create channels with duplicate names.
return &_chan{t: t, name: name, msgq: make(chan *_Msg), down: make(chan struct{})}
}
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package tracetest_test demonstrates how to use package tracetest.
//
// It also serves as set of testcases for tracetest itself.
package tracetest_test
//go:generate gotrace gen .
import (
"fmt"
"os"
"os/exec"
"regexp"
"strings"
"sync"
"testing"
"time"
"lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
)
// hi and hello are functions that emit "(Hi|Hello), <who>" and can be traced.
//trace:event traceHi(who string)
//trace:event traceHello(who string)
func hi(who string) {
traceHi(who)
fmt.Println("Hi,", who)
}
func hello(who string) {
traceHello(who)
fmt.Println("Hello,", who)
}
// we use tracing to attach probes to hi and hello, and emit corresponding
// eventHi and eventHello to tracetest.T from there.
type eventHi string
type eventHello string
func setupTracing(t *tracetest.T) *tracing.ProbeGroup {
pg := &tracing.ProbeGroup{}
tracing.Lock()
traceHi_Attach(pg, func(who string) {
t.RxEvent(eventHi(who))
})
traceHello_Attach(pg, func(who string) {
t.RxEvent(eventHello(who))
})
tracing.Unlock()
// NOTE pg.Done must be invoked by caller when setup tracing is no longer needed.
return pg
}
// routeEvent tells to which stream an event should go.
// Here, in example, we use the convention that who comes as "<threadID>·..."
// and we route the event to stream that corresponds to threadID.
func routeEvent(event interface{}) (stream string) {
who := ""
switch ev := event.(type) {
default:
panic(fmt.Sprintf("unexpected event type %T", event))
case eventHi:
who = string(ev)
case eventHello:
who = string(ev)
}
i := strings.Index(who, "·")
if i == -1 {
panic(fmt.Sprintf("who does not have threadID: %q", who))
}
return strings.ToLower(who[:i])
}
// verify calls tracetest.Verify on f with first preparing tracing setup and events delivery.
// It also verifies that tracetest detects errors as expected.
func verify(t *testing.T, f func(t *tracetest.T), targvExtra ...string) {
t.Helper()
verifyInSubprocess(t, func (t *testing.T) {
tracetest.Verify(t, func(t *tracetest.T) {
// setup tracing to deliver trace events to t.
pg := setupTracing(t)
defer pg.Done()
// tell t to which stream an event should go.
t.SetEventRouter(routeEvent)
// run test code
f(t)
})
}, targvExtra...)
}
// Test2ThreadsOK demonstrates verifying 2 threads that execute independently.
// There is no concurrency problem here.
func Test2ThreadsOK(t *testing.T) {
verify(t, func(t *tracetest.T) {
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(2)
go func() { // thread1
defer wg.Done()
hi("T1·A")
hello("T1·B")
}()
go func() { // thread2
defer wg.Done()
hello("T2·C")
hi("T2·D")
}()
// assert that events come as expected
// NOTE in checks t2 vs t1 order does not matter
t.Expect("t2", eventHello("T2·C"))
t.Expect("t2", eventHi("T2·D"))
t.Expect("t1", eventHi("T1·A"))
t.Expect("t1", eventHello("T1·B"))
})
}
// TestDeadlock demonstrates deadlock detection.
// XXX also test for wrong decomposition XXX or is it also covered by this test as well?
func TestDeadlock(t *testing.T) {
verify(t, func(t *tracetest.T) {
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() { // thread1
defer wg.Done()
hi("T1·A")
}()
// the checker expects something on stream "t2", but there is
// no event sent there -> deadlock.
t.Expect("t2", eventHi("zzz"))
}, "-tracetest.deadtime=0.5s")
}
// TestRace demonstrates detection of logical race.
func TestRace(t *testing.T) {
verify(t, func(t *tracetest.T) {
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(2)
// 2 threads should synchronize with each other and do step A before B.
// They do not properly synchronize though, and just happen to
// usually emit events in expected order due to sleep in T2.
// Tracetest detects that.
go func() { // thread1
defer wg.Done()
hi("x·A")
}()
go func() { // thread2
defer wg.Done()
time.Sleep(100*time.Millisecond)
hi("x·B")
}()
t.Expect("x", eventHi("x·A"))
t.Expect("x", eventHi("x·B"))
})
}
// other tests (mainly to verify tracetest itself)
// TestExpectType demonstrates Expect asserting with "unexpected event type".
func TestExpectType(t *testing.T) {
verify(t, func(t *tracetest.T) {
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() { // thread 1
defer wg.Done()
hi("T1·A")
}()
t.Expect("t1", eventHello("T1·A"))
})
}
// TestExpectValue demonstrates Expect asserting with "unexpected event value".
func TestExpectValue(t *testing.T) {
verify(t, func(t *tracetest.T) {
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() { // thread 1
defer wg.Done()
hi("T1·A")
}()
t.Expect("t1", eventHi("T1·B"))
})
}
// ----------------------------------------
// verifyInSubprocess runs f in subprocess and verifies that its output matches testExpectMap[t.Name].
func verifyInSubprocess(t *testing.T, f func(t *testing.T), targvExtra ...string) {
t.Helper()
if os.Getenv("TRACETEST_EX_VERIFY_IN_SUBPROCESS") == "1" {
f(t)
return
}
// spawn the test in subprocess and verify its output
expectOK, ok := testExpectMap[t.Name()]
if !ok {
panic(fmt.Sprintf("testExpectMap[%q] not defined", t.Name()))
}
outputOK := regexp.QuoteMeta(expectOK.output)
// empty line -> kind of "<BLANKLINE>"
for {
__ := strings.ReplaceAll(outputOK, "\n\n", "\n\\s*\n")
if __ == outputOK {
break
}
outputOK = __
}
outputOK = strings.ReplaceAll(outputOK, "<TIME>", ".+s")
outputOK = strings.ReplaceAll(outputOK, "<LINE>", "[0-9]+")
outputRe := regexp.MustCompile(outputOK)
argv := []string{"-test.run="+t.Name()}
argv = append(argv, targvExtra...)
cmd := exec.Command(os.Args[0], argv...)
cmd.Env = append(os.Environ(), "TRACETEST_EX_VERIFY_IN_SUBPROCESS=1")
bout, err := cmd.CombinedOutput() // NOTE `go test` itself combines everything to stdout only
out := string(bout)
ecode := 0
if testing.Verbose() {
t.Logf("stdout:\n%s\n", out)
}
if err != nil {
e, ok := err.(*exec.ExitError)
if !ok {
// e.g. could not respawn at all
t.Fatal(err)
}
ecode = e.ExitCode()
}
bad := ""
badf := func(format string, argv ...interface{}) {
bad += fmt.Sprintf(format+"\n", argv...)
}
if ecode != expectOK.exitCode {
badf("exit code: %d ; expected: %d", ecode, expectOK.exitCode)
}
if !outputRe.MatchString(out) {
badf("unexpected output:\n%s\nwant: ~\n%s\n", out, expectOK.output)
}
if bad != "" {
t.Fatal(bad)
}
}
// testExpect describes what result to expect from a test.
type testExpect struct {
exitCode int
output string
}
// testExpectMap maps <test name> -> testExpect.
var testExpectMap = map[string]testExpect{
"Test2ThreadsOK": {0, ""},
"TestDeadlock": {1,
`--- FAIL: TestDeadlock (<TIME>)
example_test.go:157: t2: recv: deadlock waiting for *tracetest_test.eventHi
example_test.go:157: test shutdown: #streams: 2, #(pending events): 1
t1 <- tracetest_test.eventHi T1·A
# t2
chan.go:<LINE>: t1: send: canceled (test failed)
`},
"TestRace": {1,
` --- FAIL: TestRace/delay@0(=x:0) (<TIME>)
example_test.go:183: x: expect: tracetest_test.eventHi:
want: x·A
have: x·B
diff:
-"x·A"
+"x·B"
`},
"TestExpectType": {1,
`--- FAIL: TestExpectType (<TIME>)
example_test.go:203: t1: expect: tracetest_test.eventHello: got tracetest_test.eventHi T1·A
example_test.go:203: test shutdown: #streams: 1, #(pending events): 0
# t1
chan.go:<LINE>: t1: send: unexpected event type
`},
"TestExpectValue": {1,
`--- FAIL: TestExpectValue (<TIME>)
example_test.go:219: t1: expect: tracetest_test.eventHi:
want: T1·B
have: T1·A
diff:
-"T1·B"
+"T1·A"
example_test.go:219: test shutdown: #streams: 1, #(pending events): 0
# t1
chan.go:<LINE>: t1: send: unexpected event data
`},
}
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package tracetest provides infrastructure for testing concurrent systems
// based on synchronous event tracing.
//
// A serial system can be verified by checking that its execution produces
// expected serial stream of events. But concurrent systems cannot be verified
// by exactly this way because events are only partly-ordered with respect to
// each other by causality or so called happens-before relation.
//
// However in a concurrent system one can decompose all events into serial
// streams in which events should be strictly ordered by causality with respect
// to each other. This decomposition in turn allows to verify that in every
// stream events happenned as expected.
//
// Verification of events for all streams can be done by one *sequential*
// process:
//
// - if events A and B in different streams are unrelated to each other by
// causality, the sequence of checks models a particular possible flow of
// time. Notably since events are delivered synchronously and sender is
// blocked until receiver/checker explicitly confirms event has been
// processed, by checking either A then B, or B then A allows to check
// for a particular race-condition.
//
// - if events A and B in different streams are related to each other by
// causality (i.e. there is some happens-before relation for them) the
// sequence of checking should represent that ordering relation.
//
// Basic package usage is as follows:
//
// func TestSomething(t *testing.T) {
// tracetest.Verify(t, func(t *tracetest.T) {
// // setup tracing so that events of test system are collected and
// // synchronously delivered to t.RxEvent. This can be done with e.g.
// // package lab.nexedi.com/kirr/go123/tracing or by other similar means.
// ...
//
// // tell t to which stream an event should go.
// t.SetEventRouter(...)
//
// // run the system and verify it produces expected events
//
// // <code to start the system>
// t.Expect("<stream₁>", eventOk₁)
// t.Expect("<stream₂>", eventOk₂)
// ...
//
// // <code to further control/affect the system>
// t.Expect("<stream₃>", eventOk₃)
// t.Expect("<stream₄>", eventOk₄)
// ...
// })
// }
//
// See example_test.go for more details.
package tracetest
// Note on detection of races
//
// Verify injects delays to empirically detect race conditions and if a test
// incorrectly decomposed its system into serial streams: consider unrelated to
// each other events A and B are incorrectly routed to the same channel. It
// could be so happening that the order of checks on the test side is almost
// always correct and so the error is not visible. However
//
// if we add delays to delivery of either A or B
// and test both combinations
//
// we will for sure detect the error as, if A and B are indeed
// unrelated, one of the delay combination will result in events
// delivered to test in different to what it expects order.
//
// the time for delay could be taken as follows:
//
// - run the test without delay; collect δt between events on particular stream
// - take delay = max(δt)·10
//
// to make sure there is indeed no different orderings possible on the
// stream, rerun the test N(event-on-stream) times, and during i'th run
// delay i'th event.
//
// See also on this topic:
// http://www.1024cores.net/home/relacy-race-detector
// http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
import (
"fmt"
"sort"
"strings"
"sync"
"reflect"
"runtime"
"runtime/debug"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
)
// _testing_TB is alias for testing.TB that is non-public when embedded into a struct.
type _testing_TB = testing.TB
// T is similar to testing.T and represents tracetest test environment.
//
// It is passed by Verify and Run to tested function.
//
// Besides testing.TB it provides
//
// .RxEvent -- to where events should be synchronously delivered by the test
// .SetEventRouter -- to tell T to which stream an event should go
// .Expect -- to assert expectation of an event on a stream
type T struct {
_testing_TB
mu sync.Mutex
streamTab map[/*stream*/string]*_chan // where events on stream are delivered; set to nil on test shutdown
routeEvent func(event interface{}) (stream string)
tracev []eventTrace // record of events as they happen
delayInjectTab map[/*stream*/string]*delayInjectState
nakq []nak // naks queued to be sent after Fatal
}
// eventTrace keeps information about one event T received via RxEvent.
type eventTrace struct {
t time.Time // time of receive; monotonic
stream string
event interface{}
}
// delayInjectState is used by delay-injector to find out for which event on a
// stream a delay should be injected.
type delayInjectState struct {
seqno int // current sequence number of event on this stream.
delayAt int // event with `seqno == delayAt` will be delayed
delayT time.Duration // by delayT time.
}
// Run runs f under tracetest environment.
//
// It is similar to Verify but f is ran only once.
// Run does not check for race conditions.
func Run(t testing.TB, f func(t *T)) {
run(t, f, nil)
}
// run serves Run and Verify: it creates T that wraps t, and runs f under T.
func run(t testing.TB, f func(t *T), delayInjectTab map[string]*delayInjectState) *T {
tT := &T{
_testing_TB: t,
streamTab: make(map[string]*_chan),
delayInjectTab: delayInjectTab,
}
// verify in the end that no events are left unchecked / unconsumed,
// e.g. sent to RxEvent, but not received. Nak them if they are and fail.
//
// NOTE this complements T.Fatal and friends, because a test might
// think it completes successfully, but leaves unconsumed events behind it.
defer func() {
nnak := tT.closeStreamTab()
if nnak != 0 {
t.Fatal()
}
}()
f(tT)
return tT
}
// Verify verifies a test system.
//
// It runs f under T environment, catching race conditions, deadlocks and
// unexpected events. f is rerun several times and should not alter its
// behaviour from run to run.
func Verify(t *testing.T, f func(t *T)) {
// run f once. This produces initial trace of events.
tT0 := run(t, f, nil)
// now, if f succeeds, verify f with injected delays.
if tT0.Failed() {
return
}
trace0 := tT0.tracev
if len(trace0) < 2 {
return
}
streams0 := streamsOfTrace(trace0)
// sort trace0 by time just in case - events might come from multiple
// CPUs simultaneously, and so for close events they might be added to
// tracev not in time order.
sort.Slice(trace0, func(i, j int) bool {
return trace0[i].t.Before(trace0[j].t)
})
// find out max(δt) in between events
var δtMax time.Duration
for i := 1; i < len(trace0); i++ {
δt := trace0[i].t.Sub(trace0[i-1].t)
if δt > δtMax {
δtMax = δt
}
}
// retest f with 10·δtMax delay injected at i'th event
delayT := 10*δtMax // TODO make sure it < deadTime
delayTmin := 10*time.Millisecond // make sure delayT ≥ 10ms
if delayT < delayTmin {
delayT = delayTmin
}
for i := 0; i < len(trace0); i++ {
// stream and on-stream sequence number for i'th global event
stream := trace0[i].stream
istream := -1
for j := 0; j <= i; j++ {
if trace0[j].stream == stream {
istream++
}
}
t.Run(fmt.Sprintf("delay@%d(=%s:%d)", i, stream, istream), func(t *testing.T) {
tT := run(t, f, map[string]*delayInjectState{
stream: &delayInjectState{
delayAt: istream,
delayT: delayT,
},
})
// verify that streams are the same from run to run
if tT.Failed() {
return
}
streams := streamsOfTrace(tT.tracev)
if !reflect.DeepEqual(streams, streams0) {
tT.Fatalf("streams are not the same as in the first run:\n"+
"first: %s\nnow: %s\ndiff:\n%s\n\n",
streams0, streams, pretty.Compare(streams0, streams))
}
})
}
}
// T overrides FailNow/Fatal/Fatalf to also cancel all in-progress sends.
func (t *T) FailNow() {
t.Helper()
_ = t.closeStreamTab()
t._testing_TB.FailNow()
}
func (t *T) Fatal(argv ...interface{}) {
t.Helper()
t.Log(argv...)
t.FailNow()
}
func (t *T) Fatalf(format string, argv ...interface{}) {
t.Helper()
t.Logf(format, argv...)
t.FailNow()
}
// closeStreamTab prints details about pending event on streamTab, naks them
// and closes all channels. It returns the number of naked messages.
func (t *T) closeStreamTab() (nnak int) {
t.Helper()
// mark streamTab no longer operational
t.mu.Lock()
streamTab := t.streamTab
t.streamTab = nil
t.mu.Unlock()
if streamTab == nil {
return // already closed
}
// print details about pending events and all streams
type sendInfo struct{ch *_chan; msg *_Msg}
var sendv []sendInfo // sends are pending here
var quietv []*_chan // this channels are quiet
// order channels by name
var streams []string
for __ := range streamTab {
streams = append(streams, __)
}
sort.Slice(streams, func(i, j int) bool {
return strings.Compare(streams[i], streams[j]) < 0
})
for _, stream := range streams {
ch := streamTab[stream]
// check whether someone is sending on channels without blocking.
select {
case msg := <-ch.msgq:
sendv = append(sendv, sendInfo{ch, msg})
default:
quietv = append(quietv, ch)
}
}
pending := fmt.Sprintf("test shutdown: #streams: %d, #(pending events): %d\n", len(streams), len(sendv))
for _, __ := range sendv {
pending += fmt.Sprintf("%s\t<- %T %v\n", __.ch.name, __.msg.Event, __.msg.Event)
}
for _, ch := range quietv {
pending += fmt.Sprintf("# %s\n", ch.name)
}
// log the details and nak senders that we received from.
// nak them only after details printout, so that our text comes first,
// and their "panics" don't get intermixed with it.
t.Log(pending)
for _, __ := range t.nakq {
__.msg.nak(__.why)
nnak++
}
t.nakq = nil
for _, __ := range sendv {
__.msg.nak("canceled (test failed)")
nnak++
}
// in any case close channel where future Sends may arrive so that they will "panic" too.
for _, ch := range streamTab {
ch.Close()
}
return nnak
}
// streamsOfTrace returns sorted list of all streams present in a trace.
func streamsOfTrace(tracev []eventTrace) []string {
streams := make(map[string]struct{})
for _, t := range tracev {
streams[t.stream] = struct{}{}
}
streamv := []string{}
for stream := range streams {
streamv = append(streamv, stream)
}
sort.Strings(streamv)
return streamv
}
// ---- events delivery + Expect ----
// SetEventRouter tells t to which stream an event should go.
//
// It should be called not more than once.
// Before SetEventRouter is called, all events go to "default" stream.
func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
t.mu.Lock()
defer t.mu.Unlock()
if t.routeEvent != nil {
panic("double call to SetEventRouter")
}
t.routeEvent = routeEvent
}
// chanForStream returns channel corresponding to stream.
// must be called under mu.
func (t *T) chanForStream(stream string) *_chan {
if t.streamTab == nil {
return nil // t is no longer operational after e.g. deadlock
}
ch, ok := t.streamTab[stream]
if !ok {
ch = t.newChan(stream)
t.streamTab[stream] = ch
}
return ch
}
// RxEvent should be synchronously called from test system when an event occurs.
//
// The sequential process of the test system where event originated should be
// paused until RxEvent returns. This requirement can be usually met via
// inserting t.RxEvent() call into the code that produces the event.
func (t *T) RxEvent(event interface{}) {
t0 := time.Now()
stream := ""
t.mu.Lock()
if t.routeEvent != nil {
stream = t.routeEvent(event)
}
if stream == "" {
stream = "default"
}
t.tracev = append(t.tracev, eventTrace{t0, stream, event})
ch := t.chanForStream(stream)
var delay time.Duration
d, ok := t.delayInjectTab[stream]
if ok {
if d.seqno == d.delayAt {
delay = d.delayT
}
d.seqno++
}
t.mu.Unlock()
if ch == nil {
t.fatalfInNonMain("%s: (pre)send: canceled (test failed)", stream)
}
if delay != 0 {
time.Sleep(delay)
}
ch.Send(event)
}
// xget1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
func (t *T) xget1(stream string, eventp interface{}) *_Msg {
t.Helper()
t.mu.Lock()
ch := t.chanForStream(stream)
t.mu.Unlock()
if ch == nil {
t.Fatalf("%s: recv: cancled (test failed)", stream)
}
return ch.RecvInto(eventp)
}
// Expect receives next event on stream and verifies it to be equal to eventOK.
//
// If check is successful ACK is sent back to event producer.
// If check does not pass - fatal testing error is raised.
func (t *T) Expect(stream string, eventOK interface{}) {
t.Helper()
msg := t.expect1(stream, eventOK)
msg.Ack()
}
// TODO ExpectNoACK? (then it would be possible to receive events from 2
// streams; have those 2 processes paused and inspect their state. After
// inspection unpause both)
// TODO Recv? (to receive an event for which we don't know type or value yet)
// TODO Select? (e.g. Select("a", "b") to fetch from either "a" or "b")
// expect1 receives next event on stream and verifies it to be equal to eventOK (both type and value).
//
// if checks do not pass - fatal testing error is raised.
func (t *T) expect1(stream string, eventExpect interface{}) *_Msg {
t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := t.xget1(stream, reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
t.queuenak(msg, "unexpected event data")
t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff:\n%s\n\n",
stream,
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
}
return msg
}
// fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic.
//
// we don't panic because it will stop the process and prevent the main
// goroutine to print detailed reason for e.g. deadlock or other error.
var fatalLogMu sync.Mutex
func (t *T) fatalfInNonMain(format string, argv ...interface{}) {
t.Helper()
// serialize fatal log+traceback printout, so that such printouts from
// multiple goroutines do not get intermixed.
fatalLogMu.Lock()
defer fatalLogMu.Unlock()
t.Logf(format, argv...)
t.Logf("%s\n", debug.Stack())
runtime.Goexit()
}
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package tracetest_test
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
)
// traceevent: traceHello(who string)
type _t_traceHello struct {
tracing.Probe
probefunc func(who string)
}
var _traceHello *_t_traceHello
func traceHello(who string) {
if _traceHello != nil {
_traceHello_run(who)
}
}
func _traceHello_run(who string) {
for p := _traceHello; p != nil; p = (*_t_traceHello)(unsafe.Pointer(p.Next())) {
p.probefunc(who)
}
}
func traceHello_Attach(pg *tracing.ProbeGroup, probe func(who string)) *tracing.Probe {
p := _t_traceHello{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceHello)), &p.Probe)
return &p.Probe
}
// traceevent: traceHi(who string)
type _t_traceHi struct {
tracing.Probe
probefunc func(who string)
}
var _traceHi *_t_traceHi
func traceHi(who string) {
if _traceHi != nil {
_traceHi_run(who)
}
}
func _traceHi_run(who string) {
for p := _traceHi; p != nil; p = (*_t_traceHi)(unsafe.Pointer(p.Next())) {
p.probefunc(who)
}
}
func traceHi_Attach(pg *tracing.ProbeGroup, probe func(who string)) *tracing.Probe {
p := _t_traceHi{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceHi)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_51bb0086e9435e499919853bae2dbb429f70d833() {}
......@@ -33,9 +33,8 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/tracing/tracetest"
"lab.nexedi.com/kirr/go123/xsync"
)
......
......@@ -26,6 +26,7 @@ import (
"sync"
"testing"
"lab.nexedi.com/kirr/go123/tracing/tracetest"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/lonet"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
......@@ -33,7 +34,6 @@ import (
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment