Commit 084908f3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6fe55763
...@@ -263,132 +263,6 @@ func Verify(t *testing.T, f func(t *T)) { ...@@ -263,132 +263,6 @@ func Verify(t *testing.T, f func(t *T)) {
} }
// SetEventRouter tells t to which stream an event should go.
//
// It should be called not more than once.
// Before SetEventRouter is called, all events go to "default" stream.
func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
t.mu.Lock()
defer t.mu.Unlock()
if t.routeEvent != nil {
panic("double call to SetEventRouter")
}
t.routeEvent = routeEvent
}
// RxEvent should be synchronously called from test system when an event occurs.
//
// The sequential process of the test system where event originated should be
// paused until RxEvent returns. This requirement can be usually met via
// inserting t.RxEvent() call into the code that produces the event.
func (t *T) RxEvent(event interface{}) {
t0 := time.Now()
stream := ""
t.mu.Lock()
if t.routeEvent != nil {
stream = t.routeEvent(event)
}
if stream == "" {
stream = "default"
}
t.tracev = append(t.tracev, eventTrace{t0, stream, event})
ch := t.chanForStream(stream)
var delay time.Duration
d, ok := t.delayInjectTab[stream]
if ok {
if d.seqno == d.delayAt {
delay = d.delayT
}
d.seqno++
}
t.mu.Unlock()
if ch == nil {
t.fatalfInNonMain("%s: (pre)send: canceled (test failed)", stream)
}
if delay != 0 {
time.Sleep(delay)
}
ch.Send(event)
}
// XXX Chan.Send
// XXX Chan.Close
// XXX Chan.Recv + RecvInto ?
// xget1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
func (t *T) xget1(stream string, eventp interface{}) *_Msg {
t.Helper()
t.mu.Lock()
ch := t.chanForStream(stream)
t.mu.Unlock()
// XXX ch == nil -> no longer operational
return ch.RecvInto(eventp)
}
// Expect receives next event on stream and verifies it to be equal to eventOK.
//
// If check is successful ACK is sent back to event producer.
// If check does not pass - fatal testing error is raised.
func (t *T) Expect(stream string, eventOK interface{}) {
t.Helper()
msg := t.expect1(stream, eventOK)
msg.Ack()
}
// TODO ExpectNoACK? (then it would be possible to receive events from 2
// streams; have those 2 processes paused and inspect their state. After
// inspection unpause both)
// TODO Recv? (to receive an event for which we don't know type or value yet)
// TODO Select? (e.g. Select("a", "b") to fetch from either "a" or "b")
// expect1 receives next event on stream and verifies it to be equal to eventOK (both type and value).
//
// if checks do not pass - fatal testing error is raised.
func (t *T) expect1(stream string, eventExpect interface{}) *_Msg {
t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := t.xget1(stream, reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
t.queuenak(msg, "unexpected event data")
t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff:\n%s\n\n",
stream,
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
}
return msg
}
// fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic. // fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic.
// //
// we don't panic because it will stop the process and prevent the main // we don't panic because it will stop the process and prevent the main
...@@ -496,6 +370,36 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -496,6 +370,36 @@ func (t *T) closeStreamTab() (nnak int) {
} }
// streamsOfTrace returns sorted list of all streams present in a trace.
func streamsOfTrace(tracev []eventTrace) []string {
streams := make(map[string]struct{})
for _, t := range tracev {
streams[t.stream] = struct{}{}
}
streamv := []string{}
for stream := range streams {
streamv = append(streamv, stream)
}
sort.Strings(streamv)
return streamv
}
// ---- events delivery + Expect ----
// SetEventRouter tells t to which stream an event should go.
//
// It should be called not more than once.
// Before SetEventRouter is called, all events go to "default" stream.
func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
t.mu.Lock()
defer t.mu.Unlock()
if t.routeEvent != nil {
panic("double call to SetEventRouter")
}
t.routeEvent = routeEvent
}
// chanForStream returns channel corresponding to stream. // chanForStream returns channel corresponding to stream.
...@@ -513,17 +417,98 @@ func (t *T) chanForStream(stream string) *_chan { ...@@ -513,17 +417,98 @@ func (t *T) chanForStream(stream string) *_chan {
return ch return ch
} }
// RxEvent should be synchronously called from test system when an event occurs.
//
// The sequential process of the test system where event originated should be
// paused until RxEvent returns. This requirement can be usually met via
// inserting t.RxEvent() call into the code that produces the event.
func (t *T) RxEvent(event interface{}) {
t0 := time.Now()
stream := ""
t.mu.Lock()
if t.routeEvent != nil {
stream = t.routeEvent(event)
}
if stream == "" {
stream = "default"
}
t.tracev = append(t.tracev, eventTrace{t0, stream, event})
ch := t.chanForStream(stream)
// streamsOfTrace returns sorted list of all streams present in a trace. var delay time.Duration
func streamsOfTrace(tracev []eventTrace) []string { d, ok := t.delayInjectTab[stream]
streams := make(map[string]struct{}) if ok {
for _, t := range tracev { if d.seqno == d.delayAt {
streams[t.stream] = struct{}{} delay = d.delayT
}
d.seqno++
} }
streamv := []string{}
for stream := range streams { t.mu.Unlock()
streamv = append(streamv, stream)
if ch == nil {
t.fatalfInNonMain("%s: (pre)send: canceled (test failed)", stream)
} }
sort.Strings(streamv)
return streamv if delay != 0 {
time.Sleep(delay)
}
ch.Send(event)
}
// xget1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
func (t *T) xget1(stream string, eventp interface{}) *_Msg {
t.Helper()
t.mu.Lock()
ch := t.chanForStream(stream)
t.mu.Unlock()
// XXX ch == nil -> no longer operational
return ch.RecvInto(eventp)
}
// Expect receives next event on stream and verifies it to be equal to eventOK.
//
// If check is successful ACK is sent back to event producer.
// If check does not pass - fatal testing error is raised.
func (t *T) Expect(stream string, eventOK interface{}) {
t.Helper()
msg := t.expect1(stream, eventOK)
msg.Ack()
}
// TODO ExpectNoACK? (then it would be possible to receive events from 2
// streams; have those 2 processes paused and inspect their state. After
// inspection unpause both)
// TODO Recv? (to receive an event for which we don't know type or value yet)
// TODO Select? (e.g. Select("a", "b") to fetch from either "a" or "b")
// expect1 receives next event on stream and verifies it to be equal to eventOK (both type and value).
//
// if checks do not pass - fatal testing error is raised.
func (t *T) expect1(stream string, eventExpect interface{}) *_Msg {
t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := t.xget1(stream, reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
t.queuenak(msg, "unexpected event data")
t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff:\n%s\n\n",
stream,
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
}
return msg
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment