Commit 3229af24 authored by Kirill Smelkov's avatar Kirill Smelkov

X NodeTab tracing; IdTimestamp currently flaky

parent 5adc637c
...@@ -81,9 +81,12 @@ type NodeTable struct { ...@@ -81,9 +81,12 @@ type NodeTable struct {
//ver int // ↑ for versioning XXX do we need this? //ver int // ↑ for versioning XXX do we need this?
} }
//trace:event traceNodeChanged(nt *NodeTable, n *Node)
// Node represents a node entry in NodeTable // Node represents a node entry in NodeTable
type Node struct { type Node struct {
NodeInfo NodeInfo
// XXX have Node point to -> NodeTable?
// link to this node; =nil if not connected // link to this node; =nil if not connected
Link *NodeLink Link *NodeLink
...@@ -120,7 +123,11 @@ func (nt *NodeTable) Update(nodeInfo NodeInfo, conn *Conn /*XXX better link *Nod ...@@ -120,7 +123,11 @@ func (nt *NodeTable) Update(nodeInfo NodeInfo, conn *Conn /*XXX better link *Nod
node.NodeInfo = nodeInfo node.NodeInfo = nodeInfo
node.Conn = conn node.Conn = conn
if conn != nil {
node.Link = conn.Link() node.Link = conn.Link()
}
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo) nt.notify(node.NodeInfo)
return node return node
...@@ -142,6 +149,7 @@ func (nt *NodeTable) GetByLink(link *NodeLink) *Node { ...@@ -142,6 +149,7 @@ func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
// XXX doc // XXX doc
func (nt *NodeTable) SetNodeState(node *Node, state NodeState) { func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
node.NodeState = state node.NodeState = state
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo) nt.notify(node.NodeInfo)
} }
......
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"net" "net"
//"reflect" //"reflect"
"testing" "testing"
"unsafe"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
//"lab.nexedi.com/kirr/neo/go/neo/client" //"lab.nexedi.com/kirr/neo/go/neo/client"
...@@ -74,6 +75,7 @@ func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) {} // { t.Trace1(ev) } ...@@ -74,6 +75,7 @@ func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) {} // { t.Trace1(ev) }
//type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg} //type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg}
//func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Trace1(&traceNeoRecv{c, msg}) } //func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Trace1(&traceNeoRecv{c, msg}) }
// tx via neo.Conn
type traceNeoSend struct { type traceNeoSend struct {
Src, Dst net.Addr Src, Dst net.Addr
ConnID uint32 ConnID uint32
...@@ -83,62 +85,27 @@ func (t *MyTracer) traceNeoConnSendPre(c *neo.Conn, msg neo.Msg) { ...@@ -83,62 +85,27 @@ func (t *MyTracer) traceNeoConnSendPre(c *neo.Conn, msg neo.Msg) {
t.Trace1(&traceNeoSend{c.Link().LocalAddr(), c.Link().RemoteAddr(), c.ConnID(), msg}) t.Trace1(&traceNeoSend{c.Link().LocalAddr(), c.Link().RemoteAddr(), c.ConnID(), msg})
} }
type traceNeoClusterState struct { // cluster state changed
type traceClusterState struct {
Ptr *neo.ClusterState // pointer to variable which holds the state Ptr *neo.ClusterState // pointer to variable which holds the state
State neo.ClusterState State neo.ClusterState
} }
func (t *MyTracer) traceNeoClusterState(cs *neo.ClusterState) { func (t *MyTracer) traceClusterState(cs *neo.ClusterState) {
t.Trace1(&traceNeoClusterState{cs, *cs}) t.Trace1(&traceClusterState{cs, *cs})
} }
func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceNeoClusterState { func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceClusterState {
return &traceNeoClusterState{cs, v} return &traceClusterState{cs, v}
} }
/* // nodetab entry changed
func (tc *TraceChecker) ExpectNetDial(dst string) { type traceNode struct {
tc.t.Helper() NodeTab unsafe.Pointer // *neo.NodeTable XXX not to noise test diff output
NodeInfo neo.NodeInfo
var ev *xnet.TraceDial
msg := tc.xget1(&ev)
if ev.Dst != dst {
tc.t.Fatalf("net dial: have %v; want: %v", ev.Dst, dst)
}
close(msg.ack)
} }
func (t *MyTracer) traceNode(nt *neo.NodeTable, n *neo.Node) {
func (tc *TraceChecker) ExpectNetListen(laddr string) { t.Trace1(&traceNode{unsafe.Pointer(nt), n.NodeInfo})
tc.t.Helper()
var ev *xnet.TraceListen
msg := tc.xget1(&ev)
if ev.Laddr != laddr {
tc.t.Fatalf("net listen: have %v; want %v", ev.Laddr, laddr)
}
close(msg.ack)
} }
func (tc *TraceChecker) ExpectNetTx(src, dst string, pkt string) {
tc.t.Helper()
var ev *xnet.TraceTx
msg := tc.xget1(&ev)
pktb := []byte(pkt)
if !(ev.Src.String() == src &&
ev.Dst.String() == dst &&
bytes.Equal(ev.Pkt, pktb)) {
// TODO also print all (?) previous events
tc.t.Fatalf("expect:\nhave: %s -> %s %v\nwant: %s -> %s %v",
ev.Src, ev.Dst, ev.Pkt, src, dst, pktb)
}
close(msg.ack)
}
*/
//trace:import "lab.nexedi.com/kirr/neo/go/neo" //trace:import "lab.nexedi.com/kirr/neo/go/neo"
...@@ -155,7 +122,8 @@ func TestMasterStorage(t *testing.T) { ...@@ -155,7 +122,8 @@ func TestMasterStorage(t *testing.T) {
tracing.Lock() tracing.Lock()
//neo_traceConnRecv_Attach(pg, tracer.traceNeoConnRecv) //neo_traceConnRecv_Attach(pg, tracer.traceNeoConnRecv)
neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre) neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre)
neo_traceClusterStateChanged_Attach(pg, tracer.traceNeoClusterState) neo_traceClusterStateChanged_Attach(pg, tracer.traceClusterState)
neo_traceNodeChanged_Attach(pg, tracer.traceNode)
tracing.Unlock() tracing.Unlock()
...@@ -165,6 +133,11 @@ func TestMasterStorage(t *testing.T) { ...@@ -165,6 +133,11 @@ func TestMasterStorage(t *testing.T) {
exc.Raiseif(err) exc.Raiseif(err)
return a return a
} }
xnaddr := func(addr string) neo.Address {
a, err := neo.Addr(xaddr(addr))
exc.Raiseif(err)
return a
}
// shortcut for net tx event // shortcut for net tx event
// XXX -> NetTx ? // XXX -> NetTx ?
...@@ -187,6 +160,20 @@ func TestMasterStorage(t *testing.T) { ...@@ -187,6 +160,20 @@ func TestMasterStorage(t *testing.T) {
return &traceNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg} return &traceNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg}
} }
// shortcut for nodetab change
node := func(nt *neo.NodeTable, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtstamp float64) *traceNode {
return &traceNode{
NodeTab: unsafe.Pointer(nt),
NodeInfo: neo.NodeInfo{
NodeType: typ,
Address: xnaddr(laddr),
NodeUUID: neo.UUID(typ, num),
NodeState: state,
IdTimestamp: idtstamp,
},
}
}
Mhost := xnet.NetTrace(net.Host("m"), tracer) Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer) Shost := xnet.NetTrace(net.Host("s"), tracer)
...@@ -206,7 +193,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -206,7 +193,7 @@ func TestMasterStorage(t *testing.T) {
// expect: // expect:
tc.Expect(netlisten("m:1")) tc.Expect(netlisten("m:1"))
// XXX M.nodeTab <- Node(M) tc.Expect(node(M.nodeTab, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0))
tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering)) tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering))
// start storage // start storage
...@@ -232,12 +219,12 @@ func TestMasterStorage(t *testing.T) { ...@@ -232,12 +219,12 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{ tc.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{
NodeType: neo.STORAGE, NodeType: neo.STORAGE,
NodeUUID: 0, NodeUUID: 0,
Address: neo.Address{"s", 1}, //XXX "s:1", Address: xnaddr("s:1"),
ClusterName: "abc1", ClusterName: "abc1",
IdTimestamp: 0, IdTimestamp: 0,
})) }))
// XXX M.nodeTab <- Node(S1) tc.Expect(node(M.nodeTab, "s:1", neo.STORAGE, 1, neo.PENDING, 0.0)) // XXX t
tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{ tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
......
...@@ -147,9 +147,6 @@ func (m *Master) setClusterState(state neo.ClusterState) { ...@@ -147,9 +147,6 @@ func (m *Master) setClusterState(state neo.ClusterState) {
// Run starts master node and runs it until ctx is cancelled or fatal error // Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) (err error) { func (m *Master) Run(ctx context.Context) (err error) {
m.node.MyInfo.NodeUUID = m.allocUUID(neo.MASTER)
// TODO update nodeTab with self
// start listening // start listening
l, err := m.node.Listen() l, err := m.node.Listen()
if err != nil { if err != nil {
...@@ -159,6 +156,24 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -159,6 +156,24 @@ func (m *Master) Run(ctx context.Context) (err error) {
defer runningf(&ctx, "master(%v)", l.Addr())(&err) defer runningf(&ctx, "master(%v)", l.Addr())(&err)
m.node.MasterAddr = l.Addr().String() m.node.MasterAddr = l.Addr().String()
naddr, err := neo.Addr(l.Addr())
if err != nil {
// must be ok since l.Addr() is valid since it is listening
// XXX panic -> errors.Wrap?
panic(err)
}
m.node.MyInfo = neo.NodeInfo{
NodeType: neo.MASTER,
Address: naddr,
NodeUUID: m.allocUUID(neo.MASTER),
NodeState: neo.RUNNING,
IdTimestamp: 0, // XXX ok?
}
// update nodeTab with self
m.nodeTab.Update(m.node.MyInfo, nil /*XXX ok? we are not connecting to self*/)
// accept incoming connections and pass them to main driver // accept incoming connections and pass them to main driver
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
// traceimport: "lab.nexedi.com/kirr/neo/go/neo" // traceimport: "lab.nexedi.com/kirr/neo/go/neo"
// rerun "gotrace gen" if you see link failure ↓↓↓ // rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_46f45c4a2306b317d62d3cded6f5ec228f0cf669 //go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_ab325b43be064a06d1c80db96d5bf50678b5b037
func neo_trace_exporthash() func neo_trace_exporthash()
func init() { neo_trace_exporthash() } func init() { neo_trace_exporthash() }
...@@ -26,3 +26,6 @@ func neo_traceConnRecv_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg neo.Msg ...@@ -26,3 +26,6 @@ func neo_traceConnRecv_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg neo.Msg
//go:linkname neo_traceConnSendPre_Attach lab.nexedi.com/kirr/neo/go/neo.traceConnSendPre_Attach //go:linkname neo_traceConnSendPre_Attach lab.nexedi.com/kirr/neo/go/neo.traceConnSendPre_Attach
func neo_traceConnSendPre_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg neo.Msg)) *tracing.Probe func neo_traceConnSendPre_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg neo.Msg)) *tracing.Probe
//go:linkname neo_traceNodeChanged_Attach lab.nexedi.com/kirr/neo/go/neo.traceNodeChanged_Attach
func neo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *neo.NodeTable, n *neo.Node)) *tracing.Probe
...@@ -89,5 +89,32 @@ func traceConnSendPre_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg ...@@ -89,5 +89,32 @@ func traceConnSendPre_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg
return &p.Probe return &p.Probe
} }
// traceevent: traceNodeChanged(nt *NodeTable, n *Node)
type _t_traceNodeChanged struct {
tracing.Probe
probefunc func(nt *NodeTable, n *Node)
}
var _traceNodeChanged *_t_traceNodeChanged
func traceNodeChanged(nt *NodeTable, n *Node) {
if _traceNodeChanged != nil {
_traceNodeChanged_run(nt, n)
}
}
func _traceNodeChanged_run(nt *NodeTable, n *Node) {
for p := _traceNodeChanged; p != nil; p = (*_t_traceNodeChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(nt, n)
}
}
func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n *Node)) *tracing.Probe {
p := _t_traceNodeChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceNodeChanged)), &p.Probe)
return &p.Probe
}
// trace export signature // trace export signature
func _trace_exporthash_46f45c4a2306b317d62d3cded6f5ec228f0cf669() {} func _trace_exporthash_ab325b43be064a06d1c80db96d5bf50678b5b037() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment