Commit b129267b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cc4f3f2c
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package tracetest
// synchronous channels.
import (
"errors"
"flag"
"fmt"
)
var (
chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels")
)
// _Msg represents message with 1 event sent over _chan.
//
// The goroutine which sent the message will wait for Ack before continue.
type _Msg struct {
Event interface {}
ack chan<- error // nil on Ack; !nil on nak
}
// _chan provides synchronous channel associated with a stream.
//
// It comes with additional property that send blocks until receiving side
// explicitly acknowledges message was received and processed.
//
// New channels must be created via T.newChan.
//
// It is safe to use _chan from multiple goroutines simultaneously.
type _chan struct {
t *T // created for stream <.name> under <.t>
name string // name of the channel/stream
msgq chan *_Msg
down chan struct{} // becomes ready when closed
}
// Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics.
func (ch *_chan) Send(event interface{}) {
if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name, event, event)
}
ack := make(chan error)
select {
case <-ch.down:
ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name)
case ch.msgq <- &_Msg{event, ack}:
err := <-ack
if err != nil {
ch.t.fatalfInNonMain("%s: send: %s", ch.name, err)
}
}
}
// Close closes the sending side of the channel.
func (ch *_chan) Close() {
close(ch.down) // note - not .msgq
}
// Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
func (ch *_chan) Recv() *_Msg {
msg := <-ch.msgq
return msg
}
// XXX -> Unpause? Cont? Continue?
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *_Msg) Ack() {
m.ack <- nil
}
// nak tells sender that event verification failed and why.
// it is called only by tracetest internals.
func (m *_Msg) nak(why string) {
m.ack <- errors.New(why)
}
// newChan creates new _chan channel.
func (t *T) newChan(name string) *_chan {
// NOTE T ensures not to create channels with duplicate names.
return &_chan{t: t, name: name, msgq: make(chan *_Msg), down: make(chan struct{}), }
}
...@@ -302,7 +302,7 @@ var testExpectMap = map[string]testExpect{ ...@@ -302,7 +302,7 @@ var testExpectMap = map[string]testExpect{
t1 <- tracetest_test.eventHi T1·A t1 <- tracetest_test.eventHi T1·A
# t2 # t2
tracetest.go:<LINE>: t1: send: canceled (test failed) chan.go:<LINE>: t1: send: canceled (test failed)
`}, `},
"TestRace": {1, "TestRace": {1,
...@@ -321,7 +321,7 @@ var testExpectMap = map[string]testExpect{ ...@@ -321,7 +321,7 @@ var testExpectMap = map[string]testExpect{
example_test.go:203: test shutdown: #streams: 1, #(pending events): 0 example_test.go:203: test shutdown: #streams: 1, #(pending events): 0
# t1 # t1
tracetest.go:<LINE>: t1: send: unexpected event type chan.go:<LINE>: t1: send: unexpected event type
`}, `},
"TestExpectValue": {1, "TestExpectValue": {1,
...@@ -336,6 +336,6 @@ var testExpectMap = map[string]testExpect{ ...@@ -336,6 +336,6 @@ var testExpectMap = map[string]testExpect{
example_test.go:219: test shutdown: #streams: 1, #(pending events): 0 example_test.go:219: test shutdown: #streams: 1, #(pending events): 0
# t1 # t1
tracetest.go:<LINE>: t1: send: unexpected event data chan.go:<LINE>: t1: send: unexpected event data
`}, `},
} }
...@@ -74,7 +74,6 @@ ...@@ -74,7 +74,6 @@
package tracetest package tracetest
import ( import (
"errors"
"flag" "flag"
"fmt" "fmt"
"sort" "sort"
...@@ -90,88 +89,9 @@ import ( ...@@ -90,88 +89,9 @@ import (
) )
var ( var (
chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels")
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock") deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
) )
// _Msg represents message with 1 event sent over _chan.
//
// The goroutine which sent the message will wait for Ack before continue.
type _Msg struct {
Event interface {}
ack chan<- error // nil on Ack; !nil on nak
}
// _chan provides synchronous channel associated with a stream.
//
// It comes with additional property that send blocks until receiving side
// explicitly acknowledges message was received and processed.
//
// New channels must be created via T.newChan.
//
// It is safe to use _chan from multiple goroutines simultaneously.
type _chan struct {
t *T // created for stream <.name> under <.t>
name string // name of the channel/stream
msgq chan *_Msg
down chan struct{} // becomes ready when closed
}
// Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics.
func (ch *_chan) Send(event interface{}) {
if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name, event, event)
}
ack := make(chan error)
select {
case <-ch.down:
ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name)
case ch.msgq <- &_Msg{event, ack}:
err := <-ack
if err != nil {
ch.t.fatalfInNonMain("%s: send: %s", ch.name, err)
}
}
}
// Close closes the sending side of the channel.
func (ch *_chan) Close() {
close(ch.down) // note - not .msgq
}
// Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
func (ch *_chan) Recv() *_Msg {
msg := <-ch.msgq
return msg
}
// XXX -> Unpause? Cont? Continue?
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *_Msg) Ack() {
m.ack <- nil
}
// nak tells sender that event verification failed and why.
// it is called only by tracetest internals.
func (m *_Msg) nak(why string) {
m.ack <- errors.New(why)
}
// newChan creates new _chan channel.
func (t *T) newChan(name string) *_chan {
// NOTE T ensures not to create channels with duplicate names.
return &_chan{t: t, name: name, msgq: make(chan *_Msg), down: make(chan struct{}), }
}
// ----------------------------------------
// _testing_TB is alias for testing.TB that is non-public when embedded into a struct. // _testing_TB is alias for testing.TB that is non-public when embedded into a struct.
type _testing_TB = testing.TB type _testing_TB = testing.TB
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment