Commit 347c32d8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9b001898
...@@ -118,14 +118,18 @@ type EventRouter struct { ...@@ -118,14 +118,18 @@ type EventRouter struct {
// events specific to particular node - e.g. node starts listening // events specific to particular node - e.g. node starts listening
byNode map[string /*host*/]*tsync.SyncChan byNode map[string /*host*/]*tsync.SyncChan
byLink map[string /*host-host*/]*tsync.SyncChan
byLink map[string /*host-host*/]*linkDst
connected map[string /*addr-addr*/]bool
} }
func NewEventRouter() *EventRouter { func NewEventRouter() *EventRouter {
return &EventRouter{ return &EventRouter{
defaultq: tsync.NewSyncChan("default"), defaultq: tsync.NewSyncChan("default"),
byNode: make(map[string]*tsync.SyncChan), byNode: make(map[string]*tsync.SyncChan),
byLink: make(map[string]*tsync.SyncChan), byLink: make(map[string]*linkDst),
connected: make(map[string]bool),
} }
} }
...@@ -135,8 +139,9 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan { ...@@ -135,8 +139,9 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan {
for _, dst := range r.byNode { for _, dst := range r.byNode {
rtset[dst] = 1 rtset[dst] = 1
} }
for _, dst := range r.byLink { for _, ldst := range r.byLink {
rtset[dst] = 1 rtset[ldst.a] = 1
rtset[ldst.b] = 1
} }
var rtv []*tsync.SyncChan var rtv []*tsync.SyncChan
...@@ -179,12 +184,47 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { ...@@ -179,12 +184,47 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
case *eventNetConnect: case *eventNetConnect:
link := host(ev.Src) + "-" + host(ev.Dst) link := host(ev.Src) + "-" + host(ev.Dst)
dst = r.byLink[link] ldst := r.byLink[link]
if ldst != nil {
dst = ldst.a
}
// remember who dialed and who was listening so that when
// seeing eventNeoSend we can determine by connID who initiated
// the exchange.
//
// remember with full host:port addresses, since potentially
// there can be several connections and a->b and a<-b at the
// same time. Having port around will allow to see which one it
// actually is.
r.connected[ev.Src + "-" + ev.Dst] = true
case *eventNeoSend: case *eventNeoSend:
// XXX adjust link according to ConnID/rest var ldst *linkDst
link := host(ev.Src) + "-" + host(ev.Dst)
dst = r.byLink[link] // find out link and cause dst according to ConnID and who connected to who
a, b := host(ev.Src), host(ev.Dst)
switch {
case r.connected[ev.Src + "-" + ev.Dst]:
ldst = r.byLink[a+"-"+b]
case r.connected[ev.Dst + "-" + ev.Src]:
ldst = r.byLink[b+"-"+a]
default:
// FIXME bad - did not seen connect
panic("TODO")
}
if ldst == nil {
break // link not branched
}
if ev.ConnID % 2 == 1 {
dst = ldst.a
} else {
dst = ldst.b
}
// state changes // state changes
case *eventNodeTab: case *eventNodeTab:
...@@ -212,10 +252,13 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { ...@@ -212,10 +252,13 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
r.byNode[host] = dst r.byNode[host] = dst
} }
// BranchLink branches events corresponding to link in between a-b with cause root on a to dst. // BranchLink branches events corresponding to link in between a-b.
// //
// link should be of "a-b" form. // Link should be of "a-b" form with b listening and a dialing.
func (r *EventRouter) BranchLink(link string, dst *tsync.SyncChan) { //
// Event with networkiing cause root coming from a go to dsta, and with
// networking cause root coming from b - go to dstb.
func (r *EventRouter) BranchLink(link string, dsta, dstb *tsync.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -223,7 +266,14 @@ func (r *EventRouter) BranchLink(link string, dst *tsync.SyncChan) { ...@@ -223,7 +266,14 @@ func (r *EventRouter) BranchLink(link string, dst *tsync.SyncChan) {
panic(fmt.Sprintf("event router: link %q already branched", link)) panic(fmt.Sprintf("event router: link %q already branched", link))
} }
r.byLink[link] = dst // XXX verify b-a not registered too ?
r.byLink[link] = &linkDst{dsta, dstb}
}
type linkDst struct {
a *tsync.SyncChan // net cause was on dialer
b *tsync.SyncChan // net cause was on listener
} }
// ---- trace probes, etc -> events -> dispatcher ---- // ---- trace probes, etc -> events -> dispatcher ----
...@@ -422,7 +472,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -422,7 +472,7 @@ func TestMasterStorage(t *testing.T) {
rt.BranchNode("m", cM) rt.BranchNode("m", cM)
rt.BranchNode("s", cS) rt.BranchNode("s", cS)
rt.BranchLink("s-m", cSM) rt.BranchLink("s-m", cSM, cMS)
// cluster nodes // cluster nodes
M := NewMaster("abc1", ":1", Mhost) M := NewMaster("abc1", ":1", Mhost)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment