Commit 9b001898 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fd1140a6
...@@ -57,7 +57,15 @@ import ( ...@@ -57,7 +57,15 @@ import (
// ---- events used in tests ---- // ---- events used in tests ----
// NOTE to ease testing we use strings only to reprsent addresses or where
// event happenned - not e.g. net.Addr or *NodeTab.
// xnet.TraceConnect // xnet.TraceConnect
// event: network connection was made
type eventNetConnect struct {
Src, Dst string
Dialed string
}
// xnet.TraceListen // xnet.TraceListen
// event: node starts listening // event: node starts listening
...@@ -67,7 +75,7 @@ type eventNetListen struct { ...@@ -67,7 +75,7 @@ type eventNetListen struct {
// event: tx via neo.Conn // event: tx via neo.Conn
type eventNeoSend struct { type eventNeoSend struct {
Src, Dst net.Addr // XXX -> string? Src, Dst string
ConnID uint32 ConnID uint32
Msg neo.Msg Msg neo.Msg
} }
...@@ -110,14 +118,14 @@ type EventRouter struct { ...@@ -110,14 +118,14 @@ type EventRouter struct {
// events specific to particular node - e.g. node starts listening // events specific to particular node - e.g. node starts listening
byNode map[string /*host*/]*tsync.SyncChan byNode map[string /*host*/]*tsync.SyncChan
byLink map[string /*host-host*/]*tsync.SyncChan
//byLink
} }
func NewEventRouter() *EventRouter { func NewEventRouter() *EventRouter {
return &EventRouter{ return &EventRouter{
defaultq: tsync.NewSyncChan("default"), defaultq: tsync.NewSyncChan("default"),
byNode: make(map[string]*tsync.SyncChan), byNode: make(map[string]*tsync.SyncChan),
byLink: make(map[string]*tsync.SyncChan),
} }
} }
...@@ -127,7 +135,9 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan { ...@@ -127,7 +135,9 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan {
for _, dst := range r.byNode { for _, dst := range r.byNode {
rtset[dst] = 1 rtset[dst] = 1
} }
// XXX byLink for _, dst := range r.byLink {
rtset[dst] = 1
}
var rtv []*tsync.SyncChan var rtv []*tsync.SyncChan
for dst := range rtset { for dst := range rtset {
...@@ -149,15 +159,34 @@ func hostport(addr string) (host string, port string) { ...@@ -149,15 +159,34 @@ func hostport(addr string) (host string, port string) {
return host, port return host, port
} }
// host returns hostname-only part from addr.
//
// see also hostport
func host(addr string) string {
host, _ := hostport(addr)
return host
}
// Route routes events according to rules specified via Branch*()
func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
switch ev := event.(type) { switch ev := event.(type) {
// networking
case *eventNetListen: case *eventNetListen:
host, _ := hostport(ev.Laddr) dst = r.byNode[host(ev.Laddr)]
dst = r.byNode[host]
case *eventNetConnect:
link := host(ev.Src) + "-" + host(ev.Dst)
dst = r.byLink[link]
case *eventNeoSend:
// XXX adjust link according to ConnID/rest
link := host(ev.Src) + "-" + host(ev.Dst)
dst = r.byLink[link]
// state changes
case *eventNodeTab: case *eventNodeTab:
dst = r.byNode[ev.Where] dst = r.byNode[ev.Where]
...@@ -171,6 +200,7 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { ...@@ -171,6 +200,7 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
return dst return dst
} }
// XXX doc
func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -182,6 +212,20 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { ...@@ -182,6 +212,20 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
r.byNode[host] = dst r.byNode[host] = dst
} }
// BranchLink branches events corresponding to link in between a-b with cause root on a to dst.
//
// link should be of "a-b" form.
func (r *EventRouter) BranchLink(link string, dst *tsync.SyncChan) {
r.mu.Lock()
defer r.mu.Unlock()
if _, already := r.byLink[link]; already {
panic(fmt.Sprintf("event router: link %q already branched", link))
}
r.byLink[link] = dst
}
// ---- trace probes, etc -> events -> dispatcher ---- // ---- trace probes, etc -> events -> dispatcher ----
// TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher. // TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher.
...@@ -235,7 +279,13 @@ func (t *TraceCollector) RegisterNode(node *neo.NodeApp, name string) { ...@@ -235,7 +279,13 @@ func (t *TraceCollector) RegisterNode(node *neo.NodeApp, name string) {
} }
func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) { t.d.Dispatch(ev) } func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) {
t.d.Dispatch(&eventNetConnect{
Src: ev.Src.String(),
Dst: ev.Dst.String(),
Dialed: ev.Dialed,
})
}
func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) { func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) {
t.d.Dispatch(&eventNetListen{Laddr: ev.Laddr.String()}) t.d.Dispatch(&eventNetListen{Laddr: ev.Laddr.String()})
...@@ -244,7 +294,7 @@ func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) { ...@@ -244,7 +294,7 @@ func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) {
func (t *TraceCollector) TraceNetTx(ev *xnet.TraceTx) {} // we use traceNeoMsgSend instead func (t *TraceCollector) TraceNetTx(ev *xnet.TraceTx) {} // we use traceNeoMsgSend instead
func (t *TraceCollector) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg neo.Msg) { func (t *TraceCollector) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg neo.Msg) {
t.d.Dispatch(&eventNeoSend{l.LocalAddr(), l.RemoteAddr(), connID, msg}) t.d.Dispatch(&eventNeoSend{l.LocalAddr().String(), l.RemoteAddr().String(), connID, msg})
} }
func (t *TraceCollector) traceClusterState(cs *neo.ClusterState) { func (t *TraceCollector) traceClusterState(cs *neo.ClusterState) {
...@@ -277,7 +327,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -277,7 +327,7 @@ func TestMasterStorage(t *testing.T) {
defer tracer.Detach() defer tracer.Detach()
// by default events go to g // by default events go to g
g := tsync.NewEventChecker(t, dispatch, rt.defaultq) //g := tsync.NewEventChecker(t, dispatch, rt.defaultq)
...@@ -304,8 +354,11 @@ func TestMasterStorage(t *testing.T) { ...@@ -304,8 +354,11 @@ func TestMasterStorage(t *testing.T) {
// shortcut for net connect event // shortcut for net connect event
// XXX -> NetConnect ? // XXX -> NetConnect ?
netconnect := func(src, dst, dialed string) *xnet.TraceConnect { //netconnect := func(src, dst, dialed string) *xnet.TraceConnect {
return &xnet.TraceConnect{Src: xaddr(src), Dst: xaddr(dst), Dialed: dialed} // return &xnet.TraceConnect{Src: xaddr(src), Dst: xaddr(dst), Dialed: dialed}
//}
netconnect := func(src, dst, dialed string) *eventNetConnect {
return &eventNetConnect{Src: src, Dst: dst, Dialed: dialed}
} }
//netlisten := func(laddr string) *xnet.TraceListen { //netlisten := func(laddr string) *xnet.TraceListen {
...@@ -317,8 +370,11 @@ func TestMasterStorage(t *testing.T) { ...@@ -317,8 +370,11 @@ func TestMasterStorage(t *testing.T) {
} }
// shortcut for net tx event over nodelink connection // shortcut for net tx event over nodelink connection
//conntx := func(src, dst string, connid uint32, msg neo.Msg) *eventNeoSend {
// return &eventNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg}
//}
conntx := func(src, dst string, connid uint32, msg neo.Msg) *eventNeoSend { conntx := func(src, dst string, connid uint32, msg neo.Msg) *eventNeoSend {
return &eventNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg} return &eventNeoSend{Src: src, Dst: dst, ConnID: connid, Msg: msg}
} }
// shortcut for NodeInfo // shortcut for NodeInfo
...@@ -347,27 +403,38 @@ func TestMasterStorage(t *testing.T) { ...@@ -347,27 +403,38 @@ func TestMasterStorage(t *testing.T) {
} }
} }
// XXX -> M = testenv.NewMaster("m") (mkhost, chan, register to tracer ...)
// XXX ----//---- S, C
Mhost := xnet.NetTrace(net.Host("m"), tracer) Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer) Shost := xnet.NetTrace(net.Host("s"), tracer)
// Chost := xnet.NetTrace(net.Host("c"), tracer) // Chost := xnet.NetTrace(net.Host("c"), tracer)
cm := tsync.NewSyncChan("m.local") // trace of events local to M cM := tsync.NewSyncChan("m.main") // trace of events local to M
cs := tsync.NewSyncChan("s.local") // trace of events local to S XXX with cause root also on S cS := tsync.NewSyncChan("s.main") // trace of events local to S XXX with cause root also on S
tm := tsync.NewEventChecker(t, dispatch, cm) cMS := tsync.NewSyncChan("m-s") // trace of events with cause root being m -> s send
ts := tsync.NewEventChecker(t, dispatch, cs) cSM := tsync.NewSyncChan("s-m") // trace of events with cause root being s -> m send
rt.BranchNode("m", cm)
rt.BranchNode("s", cs) tM := tsync.NewEventChecker(t, dispatch, cM)
tS := tsync.NewEventChecker(t, dispatch, cS)
tMS := tsync.NewEventChecker(t, dispatch, cMS)
tSM := tsync.NewEventChecker(t, dispatch, cSM)
rt.BranchNode("m", cM)
rt.BranchNode("s", cS)
rt.BranchLink("s-m", cSM)
// cluster nodes // cluster nodes
M := NewMaster("abc1", ":1", Mhost) M := NewMaster("abc1", ":1", Mhost)
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor) S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
// register them // let tracer know how to map state addresses to node names
tracer.RegisterNode(M.node, "m") // XXX better Mhost.Name() tracer.RegisterNode(M.node, "m") // XXX better Mhost.Name() ?
tracer.RegisterNode(S.node, "s") tracer.RegisterNode(S.node, "s")
gwg := &errgroup.Group{} gwg := &errgroup.Group{}
// start master // start master
...@@ -388,20 +455,21 @@ func TestMasterStorage(t *testing.T) { ...@@ -388,20 +455,21 @@ func TestMasterStorage(t *testing.T) {
exc.Raiseif(err) exc.Raiseif(err)
}) })
// >>> trace >>>
// M starts listening // M starts listening
tm.Expect(netlisten("m:1")) tM.Expect(netlisten("m:1"))
tm.Expect(node("m", "m:1", neo.MASTER, 1, neo.RUNNING, neo.IdTimeNone)) tM.Expect(node("m", "m:1", neo.MASTER, 1, neo.RUNNING, neo.IdTimeNone))
tm.Expect(clusterState("m", neo.ClusterRecovering)) tM.Expect(clusterState("m", neo.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational") // TODO create C; C tries connect to master - rejected ("not yet operational")
// S starts listening // S starts listening
ts.Expect(netlisten("s:1")) tS.Expect(netlisten("s:1"))
// S connects M // S connects M
g.Expect(netconnect("s:2", "m:2", "m:1")) tSM.Expect(netconnect("s:2", "m:2", "m:1"))
g.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{ tSM.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{
NodeType: neo.STORAGE, NodeType: neo.STORAGE,
UUID: 0, UUID: 0,
Address: xnaddr("s:1"), Address: xnaddr("s:1"),
...@@ -409,9 +477,10 @@ func TestMasterStorage(t *testing.T) { ...@@ -409,9 +477,10 @@ func TestMasterStorage(t *testing.T) {
IdTime: neo.IdTimeNone, IdTime: neo.IdTimeNone,
})) }))
g.Expect(node("m", "s:1", neo.STORAGE, 1, neo.PENDING, 0.01)) // XXX try chaning order ^ v
tM.Expect(node("m", "s:1", neo.STORAGE, 1, neo.PENDING, 0.01))
g.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{ tSM.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
MyUUID: neo.UUID(neo.MASTER, 1), MyUUID: neo.UUID(neo.MASTER, 1),
NumPartitions: 1, NumPartitions: 1,
...@@ -422,22 +491,24 @@ func TestMasterStorage(t *testing.T) { ...@@ -422,22 +491,24 @@ func TestMasterStorage(t *testing.T) {
// TODO test ID rejects (uuid already registered, ...) // TODO test ID rejects (uuid already registered, ...)
// M starts recovery on S // M starts recovery on S
g.Expect(conntx("m:2", "s:2", 0, &neo.Recovery{})) tMS.Expect(conntx("m:2", "s:2", 0, &neo.Recovery{}))
g.Expect(conntx("s:2", "m:2", 0, &neo.AnswerRecovery{ tMS.Expect(conntx("s:2", "m:2", 0, &neo.AnswerRecovery{
// empty new node // empty new node
PTid: 0, PTid: 0,
BackupTid: neo.INVALID_TID, BackupTid: neo.INVALID_TID,
TruncateTid: neo.INVALID_TID, TruncateTid: neo.INVALID_TID,
})) }))
g.Expect(conntx("m:2", "s:2", 2, &neo.AskPartitionTable{})) tMS.Expect(conntx("m:2", "s:2", 2, &neo.AskPartitionTable{}))
g.Expect(conntx("s:2", "m:2", 2, &neo.AnswerPartitionTable{ tMS.Expect(conntx("s:2", "m:2", 2, &neo.AnswerPartitionTable{
PTid: 0, PTid: 0,
RowList: []neo.RowInfo{}, RowList: []neo.RowInfo{},
})) }))
// M ready to start: new cluster, no in-progress S recovery // M ready to start: new cluster, no in-progress S recovery
g.Expect(masterStartReady(M, true)) tM.Expect(masterStartReady(M, true))
// <<< trace <<<
_ = Mcancel _ = Mcancel
_ = Scancel _ = Scancel
......
...@@ -249,7 +249,7 @@ func (evc *EventChecker) deadlock(eventp interface{}) { ...@@ -249,7 +249,7 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
if len(sendv) == 0 { if len(sendv) == 0 {
bad += fmt.Sprintf("noone is sending\n") bad += fmt.Sprintf("noone is sending\n")
} else { } else {
bad += fmt.Sprintf("there are %d sender(s) on other channels:\n", len(sendv)) bad += fmt.Sprintf("there are %d sender(s) on other channel(s):\n", len(sendv))
for _, __ := range sendv { for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name, __.event, __.event) bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name, __.event, __.event)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment