Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Stefane Fermigier
neo
Commits
06e18013
Commit
06e18013
authored
Feb 08, 2018
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
42bc63f7
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
185 additions
and
66 deletions
+185
-66
go/neo/server/cluster_test.go
go/neo/server/cluster_test.go
+106
-58
go/xcommon/xtracing/tsync/tsync.go
go/xcommon/xtracing/tsync/tsync.go
+79
-8
No files found.
go/neo/server/cluster_test.go
View file @
06e18013
...
@@ -23,35 +23,36 @@ package server
...
@@ -23,35 +23,36 @@ package server
//go:generate gotrace gen .
//go:generate gotrace gen .
import
(
import
(
"bytes"
//
"bytes"
"context"
"context"
"crypto/sha1"
//
"crypto/sha1"
"io"
//
"io"
"net"
"net"
"reflect"
//"reflect"
"sync"
"testing"
"testing"
"unsafe"
"unsafe"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/errgroup"
"github.com/kylelemons/godebug/pretty"
//
"github.com/kylelemons/godebug/pretty"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/client"
//
"lab.nexedi.com/kirr/neo/go/neo/client"
"lab.nexedi.com/kirr/neo/go/neo/internal/common"
//
"lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb"
//
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xt
esting
"
"lab.nexedi.com/kirr/neo/go/xcommon/xt
racing/tsync
"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/go123/xerr"
//
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
"fmt"
"fmt"
"time"
//
"time"
)
)
// ---- events used in tests ----
// ---- events used in tests ----
...
@@ -61,7 +62,7 @@ import (
...
@@ -61,7 +62,7 @@ import (
// event: tx via neo.Conn
// event: tx via neo.Conn
type
eventNeoSend
struct
{
type
eventNeoSend
struct
{
Src
,
Dst
net
.
Addr
Src
,
Dst
net
.
Addr
// XXX -> string?
ConnID
uint32
ConnID
uint32
Msg
neo
.
Msg
Msg
neo
.
Msg
}
}
...
@@ -92,58 +93,96 @@ func masterStartReady(m *Master, ready bool) *eventMStartReady {
...
@@ -92,58 +93,96 @@ func masterStartReady(m *Master, ready bool) *eventMStartReady {
return
&
eventMStartReady
{
unsafe
.
Pointer
(
m
),
ready
}
return
&
eventMStartReady
{
unsafe
.
Pointer
(
m
),
ready
}
}
}
// ---- events routing ----
// ----------------------------------------
// EventRouter implements NEO-specific routing of events to trace test channels.
type
EventRouter
struct
{
mu
sync
.
Mutex
defaultq
*
tsync
.
SyncChan
}
// XXX tracer which can collect tracing events from net + TODO master/storage/etc...
func
NewEventRouter
()
*
EventRouter
{
// XXX naming
return
&
EventRouter
{
defaultq
:
tsync
.
NewSyncChan
()}
type
MyTracer
struct
{
*
xtesting
.
SyncChan
}
}
func
(
t
*
MyTracer
)
TraceNetConnect
(
ev
*
xnet
.
TraceConnect
)
{
t
.
Send
(
ev
)
}
func
(
r
*
EventRouter
)
Route
(
event
interface
{})
*
tsync
.
SyncChan
{
func
(
t
*
MyTracer
)
TraceNetListen
(
ev
*
xnet
.
TraceListen
)
{
t
.
Send
(
ev
)
}
r
.
mu
.
Lock
()
func
(
t
*
MyTracer
)
TraceNetTx
(
ev
*
xnet
.
TraceTx
)
{}
// { t.Send(ev) }
defer
r
.
mu
.
Unlock
()
switch
event
.
(
type
)
{
// ...
}
return
r
.
defaultq
// use default XXX or better nil?
}
//type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg}
// ---- trace probes, etc -> events -> dispatcher ----
//func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Send(&traceNeoRecv{c, msg}) }
func
(
t
*
MyTracer
)
traceNeoMsgSendPre
(
l
*
neo
.
NodeLink
,
connID
uint32
,
msg
neo
.
Msg
)
{
// TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher.
t
.
Send
(
&
eventNeoSend
{
l
.
LocalAddr
(),
l
.
RemoteAddr
(),
connID
,
msg
})
type
TraceCollector
struct
{
pg
*
tracing
.
ProbeGroup
d
*
tsync
.
EventDispatcher
}
}
func
(
t
*
MyTracer
)
traceClusterState
(
cs
*
neo
.
ClusterState
)
{
func
NewTraceCollector
(
dispatch
*
tsync
.
EventDispatcher
)
*
TraceCollector
{
t
.
Send
(
&
eventClusterState
{
cs
,
*
cs
})
return
&
TraceCollector
{
pg
:
&
tracing
.
ProbeGroup
{},
d
:
dispatch
}
}
}
func
(
t
*
MyTracer
)
traceNode
(
nt
*
neo
.
NodeTable
,
n
*
neo
.
Node
)
{
//trace:import "lab.nexedi.com/kirr/neo/go/neo"
t
.
Send
(
&
eventNodeTab
{
unsafe
.
Pointer
(
nt
),
n
.
NodeInfo
})
// Attach attaches the tracer to appropriate trace points.
func
(
t
*
TraceCollector
)
Attach
()
{
tracing
.
Lock
()
//neo_traceMsgRecv_Attach(t.pg, t.traceNeoMsgRecv)
neo_traceMsgSendPre_Attach
(
t
.
pg
,
t
.
traceNeoMsgSendPre
)
neo_traceClusterStateChanged_Attach
(
t
.
pg
,
t
.
traceClusterState
)
neo_traceNodeChanged_Attach
(
t
.
pg
,
t
.
traceNode
)
traceMasterStartReady_Attach
(
t
.
pg
,
t
.
traceMasterStartReady
)
tracing
.
Unlock
()
}
}
func
(
t
*
MyTracer
)
traceMasterStartReady
(
m
*
Master
,
ready
bool
)
{
func
(
t
*
TraceCollector
)
Detach
(
)
{
t
.
Send
(
masterStartReady
(
m
,
ready
)
)
t
.
pg
.
Done
(
)
}
}
func
(
t
*
TraceCollector
)
TraceNetConnect
(
ev
*
xnet
.
TraceConnect
)
{
t
.
d
.
Dispatch
(
ev
)
}
func
(
t
*
TraceCollector
)
TraceNetListen
(
ev
*
xnet
.
TraceListen
)
{
t
.
d
.
Dispatch
(
ev
)
}
func
(
t
*
TraceCollector
)
TraceNetTx
(
ev
*
xnet
.
TraceTx
)
{}
// we use traceNeoMsgSend instead
//trace:import "lab.nexedi.com/kirr/neo/go/neo"
func
(
t
*
TraceCollector
)
traceNeoMsgSendPre
(
l
*
neo
.
NodeLink
,
connID
uint32
,
msg
neo
.
Msg
)
{
t
.
d
.
Dispatch
(
&
eventNeoSend
{
l
.
LocalAddr
(),
l
.
RemoteAddr
(),
connID
,
msg
})
}
func
(
t
*
TraceCollector
)
traceClusterState
(
cs
*
neo
.
ClusterState
)
{
t
.
d
.
Dispatch
(
&
eventClusterState
{
cs
,
*
cs
})
}
func
(
t
*
TraceCollector
)
traceNode
(
nt
*
neo
.
NodeTable
,
n
*
neo
.
Node
)
{
t
.
d
.
Dispatch
(
&
eventNodeTab
{
unsafe
.
Pointer
(
nt
),
n
.
NodeInfo
})
}
func
(
t
*
TraceCollector
)
traceMasterStartReady
(
m
*
Master
,
ready
bool
)
{
t
.
d
.
Dispatch
(
masterStartReady
(
m
,
ready
))
}
// ----------------------------------------
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
func
TestMasterStorage
(
t
*
testing
.
T
)
{
func
TestMasterStorage
(
t
*
testing
.
T
)
{
tracer
:=
&
MyTracer
{
xtesting
.
NewSyncChan
()}
rt
:=
NewEventRouter
()
tc
:=
xtesting
.
NewEventChecker
(
t
,
tracer
.
SyncChan
)
dispatch
:=
tsync
.
NewEventDispatcher
(
rt
)
tracer
:=
NewTraceCollector
(
dispatch
)
net
:=
pipenet
.
New
(
"testnet"
)
// test network
net
:=
pipenet
.
New
(
"testnet"
)
// test network
pg
:=
&
tracing
.
ProbeGroup
{}
tracer
.
Attach
()
defer
pg
.
Done
()
defer
tracer
.
Detach
()
// by default events go to g
g
:=
tsync
.
NewEventChecker
(
t
,
rt
.
defaultq
)
tracing
.
Lock
()
//neo_traceMsgRecv_Attach(pg, tracer.traceNeoMsgRecv)
neo_traceMsgSendPre_Attach
(
pg
,
tracer
.
traceNeoMsgSendPre
)
neo_traceClusterStateChanged_Attach
(
pg
,
tracer
.
traceClusterState
)
neo_traceNodeChanged_Attach
(
pg
,
tracer
.
traceNode
)
traceMasterStartReady_Attach
(
pg
,
tracer
.
traceMasterStartReady
)
tracing
.
Unlock
()
// shortcut for addresses
// shortcut for addresses
...
@@ -204,7 +243,7 @@ func TestMasterStorage(t *testing.T) {
...
@@ -204,7 +243,7 @@ func TestMasterStorage(t *testing.T) {
Mhost
:=
xnet
.
NetTrace
(
net
.
Host
(
"m"
),
tracer
)
Mhost
:=
xnet
.
NetTrace
(
net
.
Host
(
"m"
),
tracer
)
Shost
:=
xnet
.
NetTrace
(
net
.
Host
(
"s"
),
tracer
)
Shost
:=
xnet
.
NetTrace
(
net
.
Host
(
"s"
),
tracer
)
Chost
:=
xnet
.
NetTrace
(
net
.
Host
(
"c"
),
tracer
)
//
Chost := xnet.NetTrace(net.Host("c"), tracer)
gwg
:=
&
errgroup
.
Group
{}
gwg
:=
&
errgroup
.
Group
{}
...
@@ -220,9 +259,9 @@ func TestMasterStorage(t *testing.T) {
...
@@ -220,9 +259,9 @@ func TestMasterStorage(t *testing.T) {
})
})
// M starts listening
// M starts listening
tc
.
Expect
(
netlisten
(
"m:1"
))
g
.
Expect
(
netlisten
(
"m:1"
))
tc
.
Expect
(
node
(
M
.
node
,
"m:1"
,
neo
.
MASTER
,
1
,
neo
.
RUNNING
,
neo
.
IdTimeNone
))
g
.
Expect
(
node
(
M
.
node
,
"m:1"
,
neo
.
MASTER
,
1
,
neo
.
RUNNING
,
neo
.
IdTimeNone
))
tc
.
Expect
(
clusterState
(
&
M
.
node
.
ClusterState
,
neo
.
ClusterRecovering
))
g
.
Expect
(
clusterState
(
&
M
.
node
.
ClusterState
,
neo
.
ClusterRecovering
))
// TODO create C; C tries connect to master - rejected ("not yet operational")
// TODO create C; C tries connect to master - rejected ("not yet operational")
...
@@ -237,11 +276,11 @@ func TestMasterStorage(t *testing.T) {
...
@@ -237,11 +276,11 @@ func TestMasterStorage(t *testing.T) {
})
})
// S starts listening
// S starts listening
tc
.
Expect
(
netlisten
(
"s:1"
))
g
.
Expect
(
netlisten
(
"s:1"
))
// S connects M
// S connects M
tc
.
Expect
(
netconnect
(
"s:2"
,
"m:2"
,
"m:1"
))
g
.
Expect
(
netconnect
(
"s:2"
,
"m:2"
,
"m:1"
))
tc
.
Expect
(
conntx
(
"s:2"
,
"m:2"
,
1
,
&
neo
.
RequestIdentification
{
g
.
Expect
(
conntx
(
"s:2"
,
"m:2"
,
1
,
&
neo
.
RequestIdentification
{
NodeType
:
neo
.
STORAGE
,
NodeType
:
neo
.
STORAGE
,
UUID
:
0
,
UUID
:
0
,
Address
:
xnaddr
(
"s:1"
),
Address
:
xnaddr
(
"s:1"
),
...
@@ -249,9 +288,9 @@ func TestMasterStorage(t *testing.T) {
...
@@ -249,9 +288,9 @@ func TestMasterStorage(t *testing.T) {
IdTime
:
neo
.
IdTimeNone
,
IdTime
:
neo
.
IdTimeNone
,
}))
}))
tc
.
Expect
(
node
(
M
.
node
,
"s:1"
,
neo
.
STORAGE
,
1
,
neo
.
PENDING
,
0.01
))
g
.
Expect
(
node
(
M
.
node
,
"s:1"
,
neo
.
STORAGE
,
1
,
neo
.
PENDING
,
0.01
))
tc
.
Expect
(
conntx
(
"m:2"
,
"s:2"
,
1
,
&
neo
.
AcceptIdentification
{
g
.
Expect
(
conntx
(
"m:2"
,
"s:2"
,
1
,
&
neo
.
AcceptIdentification
{
NodeType
:
neo
.
MASTER
,
NodeType
:
neo
.
MASTER
,
MyUUID
:
neo
.
UUID
(
neo
.
MASTER
,
1
),
MyUUID
:
neo
.
UUID
(
neo
.
MASTER
,
1
),
NumPartitions
:
1
,
NumPartitions
:
1
,
...
@@ -262,23 +301,29 @@ func TestMasterStorage(t *testing.T) {
...
@@ -262,23 +301,29 @@ func TestMasterStorage(t *testing.T) {
// TODO test ID rejects (uuid already registered, ...)
// TODO test ID rejects (uuid already registered, ...)
// M starts recovery on S
// M starts recovery on S
tc
.
Expect
(
conntx
(
"m:2"
,
"s:2"
,
0
,
&
neo
.
Recovery
{}))
g
.
Expect
(
conntx
(
"m:2"
,
"s:2"
,
0
,
&
neo
.
Recovery
{}))
tc
.
Expect
(
conntx
(
"s:2"
,
"m:2"
,
0
,
&
neo
.
AnswerRecovery
{
g
.
Expect
(
conntx
(
"s:2"
,
"m:2"
,
0
,
&
neo
.
AnswerRecovery
{
// empty new node
// empty new node
PTid
:
0
,
PTid
:
0
,
BackupTid
:
neo
.
INVALID_TID
,
BackupTid
:
neo
.
INVALID_TID
,
TruncateTid
:
neo
.
INVALID_TID
,
TruncateTid
:
neo
.
INVALID_TID
,
}))
}))
tc
.
Expect
(
conntx
(
"m:2"
,
"s:2"
,
2
,
&
neo
.
AskPartitionTable
{}))
g
.
Expect
(
conntx
(
"m:2"
,
"s:2"
,
2
,
&
neo
.
AskPartitionTable
{}))
tc
.
Expect
(
conntx
(
"s:2"
,
"m:2"
,
2
,
&
neo
.
AnswerPartitionTable
{
g
.
Expect
(
conntx
(
"s:2"
,
"m:2"
,
2
,
&
neo
.
AnswerPartitionTable
{
PTid
:
0
,
PTid
:
0
,
RowList
:
[]
neo
.
RowInfo
{},
RowList
:
[]
neo
.
RowInfo
{},
}))
}))
// M ready to start: new cluster, no in-progress S recovery
// M ready to start: new cluster, no in-progress S recovery
tc
.
Expect
(
masterStartReady
(
M
,
true
))
g
.
Expect
(
masterStartReady
(
M
,
true
))
_
=
Mcancel
_
=
Scancel
return
}
/*
// M <- start cmd
// M <- start cmd
wg := &errgroup.Group{}
wg := &errgroup.Group{}
gox(wg, func() {
gox(wg, func() {
...
@@ -563,8 +608,10 @@ func TestMasterStorage(t *testing.T) {
...
@@ -563,8 +608,10 @@ func TestMasterStorage(t *testing.T) {
Scancel() // ---- // ----
Scancel() // ---- // ----
xwait(gwg)
xwait(gwg)
}
}
*/
/*
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
// create test cluster <- XXX factor to utility func
// create test cluster <- XXX factor to utility func
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
...
@@ -578,8 +625,8 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
...
@@ -578,8 +625,8 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
M := NewMaster("abc1", "", Mnet)
M := NewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
tracer
:=
&
MyTracer
{
xtesting
.
NewSyncChan
()}
tracer := &
TraceRouter{tsync
.NewSyncChan()}
tc
:=
xtesting
.
NewEventChecker
(
b
,
tracer
.
SyncChan
)
tc :=
tsync
.NewEventChecker(b, tracer.SyncChan)
pg := &tracing.ProbeGroup{}
pg := &tracing.ProbeGroup{}
tracing.Lock()
tracing.Lock()
pnode := neo_traceNodeChanged_Attach(nil, tracer.traceNode)
pnode := neo_traceNodeChanged_Attach(nil, tracer.traceNode)
...
@@ -697,3 +744,4 @@ func BenchmarkGetObjectTCPloParallel(b *testing.B) {
...
@@ -697,3 +744,4 @@ func BenchmarkGetObjectTCPloParallel(b *testing.B) {
net := xnet.NetPlain("tcp")
net := xnet.NetPlain("tcp")
benchmarkGetObjectParallel(b, net, net, net)
benchmarkGetObjectParallel(b, net, net, net)
}
}
*/
go/xcommon/xt
esting/xtesting
.go
→
go/xcommon/xt
racing/tsync/tsync
.go
View file @
06e18013
// Copyright (C) 2017 Nexedi SA and Contributors.
// Copyright (C) 2017
-2018
Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
Kirill Smelkov <kirr@nexedi.com>
//
//
// This program is free software: you can Use, Study, Modify and Redistribute
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// it under the terms of the GNU General Public License version 3, or (at your
...
@@ -17,8 +17,39 @@
...
@@ -17,8 +17,39 @@
// See COPYING file for full licensing terms.
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xtesting provides addons to std package testing.
// Package tsync provides infrastructure for synchronous testing based on program tracing.
package
xtesting
// XXX naming -> ttest?
//
// A serial system can be verified by checking that its execution produces
// expected serial stream of events. But concurrent systems cannot be verified
// by exactly this way because events are only partly-ordered with respect to
// each other by causality or so called happens-before relation.
//
// However in a concurrent system one can decompose all events into serial
// streams in which events are strictly ordered by causality with respect to
// each other. This decomposition in turn allows to verify that in every stream
// events were as expected.
//
// Verification of events for all streams can be done by one *sequential*
// process:
//
// - if events A and B in different streams are unrelated to each other by
// causality, the sequence of checks models a particular possible flow of
// time. Notably since events are delivered synchronously and sender is
// blocked until receiver/checker explicitly confirms event has been
// processed, by checking either A then B, or B then A allows to check
// for a particular race-condition.
//
// - if events A and B in different streams are related to each other by
// causality (i.e. there is some happens-before relation for them) the
// sequence of checking should represent that ordering relation.
//
// XXX more text describing how to use the package.
//
// XXX (if tested system is serial only there is no need to use Dispatcher and
// routing - the collector can send output directly to the only SyncChan with
// only one EventChecker connected to it).
package
tsync
import
(
import
(
"reflect"
"reflect"
...
@@ -75,7 +106,8 @@ func NewSyncChan() *SyncChan {
...
@@ -75,7 +106,8 @@ func NewSyncChan() *SyncChan {
// ----------------------------------------
// ----------------------------------------
// EventChecker is testing utility to verify events coming from a SyncChan are as expected.
// EventChecker is testing utility to verify that sequence of events coming
// from a single SyncChan are as expected.
type
EventChecker
struct
{
type
EventChecker
struct
{
t
testing
.
TB
t
testing
.
TB
in
*
SyncChan
in
*
SyncChan
...
@@ -92,6 +124,7 @@ func NewEventChecker(t testing.TB, in *SyncChan) *EventChecker {
...
@@ -92,6 +124,7 @@ func NewEventChecker(t testing.TB, in *SyncChan) *EventChecker {
// if checks do not pass - fatal testing error is raised
// if checks do not pass - fatal testing error is raised
func
(
evc
*
EventChecker
)
xget1
(
eventp
interface
{})
*
SyncMsg
{
func
(
evc
*
EventChecker
)
xget1
(
eventp
interface
{})
*
SyncMsg
{
evc
.
t
.
Helper
()
evc
.
t
.
Helper
()
// XXX handle deadlock timeout
msg
:=
evc
.
in
.
Recv
()
msg
:=
evc
.
in
.
Recv
()
reventp
:=
reflect
.
ValueOf
(
eventp
)
reventp
:=
reflect
.
ValueOf
(
eventp
)
...
@@ -151,9 +184,6 @@ func (evc *EventChecker) ExpectNoACK(expected interface{}) *SyncMsg {
...
@@ -151,9 +184,6 @@ func (evc *EventChecker) ExpectNoACK(expected interface{}) *SyncMsg {
return
msg
return
msg
}
}
// XXX goes away? (if there is no happens-before for events - just check them one by one in dedicated goroutines ?)
// XXX goes away? (if there is no happens-before for events - just check them one by one in dedicated goroutines ?)
/*
/*
// ExpectPar asks checker to expect next series of events to be from eventExpectV in no particular order
// ExpectPar asks checker to expect next series of events to be from eventExpectV in no particular order
...
@@ -185,3 +215,44 @@ loop:
...
@@ -185,3 +215,44 @@ loop:
}
}
}
}
*/
*/
// ----------------------------------------
// EventRouter is the interface used for routing events to appropriate output SyncChan.
type
EventRouter
interface
{
// Route should return appropriate destination for event.
//
// If nil is returned default destination is used. // XXX ok?
//
// It should be safe to call Route from multiple goroutines simultaneously.
Route
(
event
interface
{})
*
SyncChan
// AllDst() []*SyncChan
}
// EventDispatcher dispatches events to appropriate SyncChan for checking
// according to provided router.
type
EventDispatcher
struct
{
rt
EventRouter
}
// NewEventDispatcher creates new dispatcher and provides router to it.
func
NewEventDispatcher
(
router
EventRouter
)
*
EventDispatcher
{
return
&
EventDispatcher
{
rt
:
router
}
}
// Dispatch dispatches event to appropriate output channel.
//
// It is safe to use Dispatch from multiple goroutines simultaneously.
func
(
d
*
EventDispatcher
)
Dispatch
(
event
interface
{})
{
outch
:=
d
.
rt
.
Route
(
event
)
// XXX if nil?
// TODO timeout: deadlock? (print all-in-flight events on timout)
// XXX or better ^^^ to do on receiver side?
//
// XXX -> if deadlock detection is done on receiver side (so in
// EventChecker) -> we don't need EventDispatcher at all?
outch
.
Send
(
event
)
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment