Commit b978935d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 94001fa9
...@@ -53,7 +53,7 @@ type NodeLink struct { ...@@ -53,7 +53,7 @@ type NodeLink struct {
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
//handleWg sync.WaitGroup // for spawned handlers XXX do we need this + .Wait() ? handleWg sync.WaitGroup // for spawned handlers
handleNewConn func(conn *Conn) // handler for new connections handleNewConn func(conn *Conn) // handler for new connections
txreq chan txReq // tx requests from Conns go via here txreq chan txReq // tx requests from Conns go via here
...@@ -159,6 +159,9 @@ func (nl *NodeLink) Close() error { ...@@ -159,6 +159,9 @@ func (nl *NodeLink) Close() error {
nl.serveWg.Wait() nl.serveWg.Wait()
//fmt.Printf("%p\t (wait) -> woken up\n", nl) //fmt.Printf("%p\t (wait) -> woken up\n", nl)
// XXX do we want to also Wait for handlers here?
// (problem is peerLink is closed first so this might cause handlers to see errors)
return err return err
} }
...@@ -293,7 +296,11 @@ func (nl *NodeLink) serveRecv() { ...@@ -293,7 +296,11 @@ func (nl *NodeLink) serveRecv() {
// TODO avoid spawning goroutine for each new Ask request - // TODO avoid spawning goroutine for each new Ask request -
// - by keeping pool of read inactive goroutine / conn pool ? // - by keeping pool of read inactive goroutine / conn pool ?
//println("+ handle", connId) //println("+ handle", connId)
go handleNewConn(conn) go func() {
nl.handleWg.Add(1)
defer nl.handleWg.Done()
handleNewConn(conn)
}()
} }
// route packet to serving goroutine handler // route packet to serving goroutine handler
...@@ -301,6 +308,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -301,6 +308,12 @@ func (nl *NodeLink) serveRecv() {
} }
} }
// wait for all handlers spawned for accepted connections to complete
// XXX naming -> WaitHandlers ?
func (nl *NodeLink) Wait() {
nl.handleWg.Wait()
}
// request to transmit a packet. Result error goes back to errch // request to transmit a packet. Result error goes back to errch
type txReq struct { type txReq struct {
......
...@@ -264,7 +264,6 @@ func TestNodeLink(t *testing.T) { ...@@ -264,7 +264,6 @@ func TestNodeLink(t *testing.T) {
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
hdone := make(chan struct{})
nl2.HandleNewConn(func(c *Conn) { nl2.HandleNewConn(func(c *Conn) {
// TODO raised err -> errch // TODO raised err -> errch
pkt := xrecv(c) pkt := xrecv(c)
...@@ -279,7 +278,6 @@ func TestNodeLink(t *testing.T) { ...@@ -279,7 +278,6 @@ func TestNodeLink(t *testing.T) {
xsend(c, mkpkt(36, []byte("pong2"))) xsend(c, mkpkt(36, []byte("pong2")))
xclose(c) xclose(c)
close(hdone)
}) })
c = nl1.NewConn() c = nl1.NewConn()
xsend(c, mkpkt(33, []byte("ping"))) xsend(c, mkpkt(33, []byte("ping")))
...@@ -288,10 +286,10 @@ func TestNodeLink(t *testing.T) { ...@@ -288,10 +286,10 @@ func TestNodeLink(t *testing.T) {
xsend(c, mkpkt(35, []byte("ping2"))) xsend(c, mkpkt(35, []byte("ping2")))
pkt = xrecv(c) pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 36, []byte("pong2")) xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
<-hdone
xclose(c) xclose(c)
xclose(nl1) xclose(nl1)
nl2.Wait()
xclose(nl2) xclose(nl2)
// test 2 channels with replies comming in reversed time order // test 2 channels with replies comming in reversed time order
...@@ -324,25 +322,20 @@ func TestNodeLink(t *testing.T) { ...@@ -324,25 +322,20 @@ func TestNodeLink(t *testing.T) {
c1 := nl1.NewConn() c1 := nl1.NewConn()
c2 := nl1.NewConn() c2 := nl1.NewConn()
println("111")
xsend(c1, mkpkt(1, []byte(""))) xsend(c1, mkpkt(1, []byte("")))
println("222")
xsend(c2, mkpkt(2, []byte(""))) xsend(c2, mkpkt(2, []byte("")))
println("333")
// replies must be coming in reverse order // replies must be coming in reverse order
xechoWait := func(c *Conn, msgCode uint16) { xechoWait := func(c *Conn, msgCode uint16) {
pkt := xrecv(c) pkt := xrecv(c)
xverifyPkt(pkt, c.connId, msgCode, []byte("")) xverifyPkt(pkt, c.connId, msgCode, []byte(""))
} }
println("aaa")
xechoWait(c2, 2) xechoWait(c2, 2)
println("bbb")
xechoWait(c1, 1) xechoWait(c1, 1)
println("ccc")
xclose(c1) xclose(c1)
xclose(c2) xclose(c2)
xclose(nl1) xclose(nl1)
nl2.Wait()
xclose(nl2) xclose(nl2)
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment