Commit 51d8015d authored by Kirill Smelkov's avatar Kirill Smelkov

X "connection closed/refused" seems to be working

parent b00a904e
...@@ -321,9 +321,10 @@ func (c *Conn) Close() error { ...@@ -321,9 +321,10 @@ func (c *Conn) Close() error {
// "connection closed" if another packet comes to it. // "connection closed" if another packet comes to it.
} else { } else {
cc := nl.newConn(c.connId) cc := nl.newConn(c.connId)
// 1 so that cc is not freed by replyNoConn // cc.closed=1 so that cc is not freed by replyNoConn
atomic.StoreInt32(&cc.closed, 1)
// NOTE cc.down stays not closed so Send could work // NOTE cc.down stays not closed so Send could work
atomic.StoreInt32(&cc.closed, 1)
cc.errMsg = errConnClosed
time.AfterFunc(connKeepClosed, func() { time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock() nl.connMu.Lock()
delete(nl.connTab, cc.connId) delete(nl.connTab, cc.connId)
...@@ -437,14 +438,14 @@ func (nl *NodeLink) serveRecv() { ...@@ -437,14 +438,14 @@ func (nl *NodeLink) serveRecv() {
// resetting it waits for us to finish. // resetting it waits for us to finish.
conn := nl.connTab[connId] conn := nl.connTab[connId]
fmt.Printf("RX .%d -> %v\n", connId, conn) //fmt.Printf("RX .%d -> %v\n", connId, conn)
if conn == nil { if conn == nil {
// "new" connection will be needed in all cases - e.g. // "new" connection will be needed in all cases - e.g.
// temporarily to reply "connection refused" // temporarily to reply "connection refused"
conn = nl.newConn(connId) conn = nl.newConn(connId)
fmt.Printf("connId: %d (%d)\n", connId, connId % 2) //fmt.Printf("connId: %d (%d)\n", connId, connId % 2)
fmt.Printf("nextConnId: %d (%d)\n", nl.nextConnId, nl.nextConnId % 2) //fmt.Printf("nextConnId: %d (%d)\n", nl.nextConnId, nl.nextConnId % 2)
// message with connid that should be initiated by us // message with connid that should be initiated by us
if connId % 2 == nl.nextConnId % 2 { if connId % 2 == nl.nextConnId % 2 {
...@@ -459,13 +460,13 @@ func (nl *NodeLink) serveRecv() { ...@@ -459,13 +460,13 @@ func (nl *NodeLink) serveRecv() {
accept = true accept = true
} }
fmt.Println("ZZZ", conn.errMsg, accept) //fmt.Println("ZZZ", conn.errMsg, accept)
} }
} }
// we are not accepting packet in any way // we are not accepting packet in any way
if conn.errMsg != nil { if conn.errMsg != nil {
fmt.Printf(".%d EMSG: %v\n", connId, conn.errMsg) //fmt.Printf(".%d EMSG: %v\n", connId, conn.errMsg)
atomic.AddInt32(&conn.closed, 1) atomic.AddInt32(&conn.closed, 1)
nl.connMu.Unlock() nl.connMu.Unlock()
go conn.replyNoConn() go conn.replyNoConn()
...@@ -518,6 +519,7 @@ var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"} ...@@ -518,6 +519,7 @@ var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
//func (c *Conn) replyNoConn(e Msg, ekeep bool) { //func (c *Conn) replyNoConn(e Msg, ekeep bool) {
func (c *Conn) replyNoConn() { func (c *Conn) replyNoConn() {
c.Send(c.errMsg) // ignore errors c.Send(c.errMsg) // ignore errors
//fmt.Println("errsend:", err)
// remove connTab entry - if all users of this temporary conn created // remove connTab entry - if all users of this temporary conn created
// only to send the error are now gone. // only to send the error are now gone.
......
...@@ -449,7 +449,10 @@ func TestNodeLink(t *testing.T) { ...@@ -449,7 +449,10 @@ func TestNodeLink(t *testing.T) {
} }
println("\n---------------------\n") //println("\n---------------------\n")
saveKeepClosed := connKeepClosed
connKeepClosed = 10*time.Millisecond
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
...@@ -470,23 +473,23 @@ func TestNodeLink(t *testing.T) { ...@@ -470,23 +473,23 @@ func TestNodeLink(t *testing.T) {
xclose(c) xclose(c)
println("B.111") //println("B.111")
// "connection refused" when trying to connect to not-listening peer // "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here? c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, mkpkt(38, []byte("pong3"))) xsendPkt(c, mkpkt(38, []byte("pong3")))
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnRefused) xverifyMsg(pkt, c.connId, errConnRefused)
println("B.222") //println("B.222")
xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnRefused) xverifyMsg(pkt, c.connId, errConnRefused)
println("B.333") //println("B.333")
xclose(c) xclose(c)
}) })
println("A.111") //println("A.111")
c = xnewconn(nl1) c = xnewconn(nl1)
xsendPkt(c, mkpkt(33, []byte("ping"))) xsendPkt(c, mkpkt(33, []byte("ping")))
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
...@@ -496,25 +499,41 @@ func TestNodeLink(t *testing.T) { ...@@ -496,25 +499,41 @@ func TestNodeLink(t *testing.T) {
xverifyPkt(pkt, c.connId, 36, []byte("pong2")) xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
xwait(wg) xwait(wg)
println() //println()
println() //println()
println("A.222") //println("A.222")
// "connection closed" after peer closed its end // "connection closed" after peer closed its end
xsendPkt(c, mkpkt(37, []byte("ping3"))) xsendPkt(c, mkpkt(37, []byte("ping3")))
println("A.qqq") //println("A.qqq")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnClosed) xverifyMsg(pkt, c.connId, errConnClosed)
println("A.zzz") //println("A.zzz")
xsendPkt(c, mkpkt(39, []byte("ping4"))) // once again xsendPkt(c, mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnClosed) xverifyMsg(pkt, c.connId, errConnClosed)
// XXX also should get EOF on recv // XXX also should get EOF on recv
println("A.333") //println("A.333")
// make sure entry for closed nl2.1 stays in nl2.connTab
nl2.connMu.Lock()
if cnl2 := nl2.connTab[1]; cnl2 == nil {
t.Fatal("nl2.connTab[1] == nil ; want \"closed\" entry")
}
nl2.connMu.Unlock()
// make sure "closed" entry goes away after its time
time.Sleep(3*connKeepClosed)
nl2.connMu.Lock()
if cnl2 := nl2.connTab[1]; cnl2 != nil {
t.Fatalf("nl2.connTab[1] == %v after close time window ; want nil", cnl2)
}
nl2.connMu.Unlock()
xclose(c) xclose(c)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
connKeepClosed = saveKeepClosed
// test 2 channels with replies coming in reversed time order // test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment