Commit 14ca848c authored by Kirill Smelkov

.

parent 87a6b2d4
@@ -100,8 +100,6 @@ type Conn struct {
 	rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
 	rxclosed  int32     // whether CloseRecv was called
 	txclosed  int32     // whether CloseSend was called
-	// closed int32     // 1 if Close was called or "connection closed" entry
-	//                  // incremented during every replyNoConn() in progress
 	errMsg *Error // error message for replyNoConn
 }
@@ -113,10 +111,6 @@ var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
 var ErrLinkManyConn = errors.New("too many opened connections")
 var ErrClosedConn = errors.New("connection is closed")

-// XXX unify LinkError & ConnError -> NetError?
-// (think from point of view how user should be handling errors)
-// XXX or it is good to be able to distinguish between only conn error vs whole-link error?
-
 // LinkError is returned by NodeLink operations
 type LinkError struct {
 	Link *NodeLink
@@ -182,7 +176,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
 	switch role &^ linkFlagsMask {
 	case LinkServer:
 		nextConnId = 0 // all initiated by us connId will be even
-		acceptq = make(chan *Conn) // accept queue; TODO use backlog?
+		acceptq = make(chan *Conn) // accept queue; TODO use backlog
 	case LinkClient:
 		nextConnId = 1 // ----//---- odd
 		acceptq = nil // not accepting incoming connections
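The even/odd connId assignment above is what later lets serveRecv classify a connId that is missing from connTab: an id with local parity must belong to a connection this side already closed, while an id with peer parity announces a new incoming stream. A minimal standalone sketch of the convention (nodeLink, newConnId and fromOurSpace are illustrative names, not the actual NodeLink API):

```go
package main

import "fmt"

type nodeLink struct {
	nextConnId uint32 // even on the server side of a link, odd on the client side
}

// newConnId allocates the next connId from this side's id space.
func (nl *nodeLink) newConnId() uint32 {
	id := nl.nextConnId
	nl.nextConnId += 2 // parity is preserved: even stays even, odd stays odd
	return id
}

// fromOurSpace reports whether connId has the parity of locally
// initiated connections.
func (nl *nodeLink) fromOurSpace(connId uint32) bool {
	return connId%2 == nl.nextConnId%2
}

func main() {
	server := &nodeLink{nextConnId: 0}
	client := &nodeLink{nextConnId: 1}
	fmt.Println(server.newConnId(), server.newConnId()) // 0 2
	fmt.Println(client.newConnId(), client.newConnId()) // 1 3
	fmt.Println(server.fromOurSpace(4), server.fromOurSpace(5)) // true false
}
```

Because both sides allocate from disjoint parity classes, they can open new connections concurrently without ever colliding on an id.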
@@ -317,7 +311,7 @@ func (c *Conn) shutdownRX(errMsg *Error) {

 // time to keep record of a closed connection so that we can properly reply
-// "connection closed" if a packet comes in with this connection's connID.
+// "connection closed" if a packet comes in with same connID.
 var connKeepClosed = 1*time.Minute

 // CloseRecv closes reading end of connection.
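connKeepClosed bounds how long the record of a closed connection is retained, so a straggler packet carrying its connId still draws a proper "connection closed" reply instead of looking like a fresh stream. The diff does not show the expiry mechanism itself; the sketch below assumes a time.AfterFunc-driven cleanup purely for illustration:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var connKeepClosed = 1 * time.Minute

type link struct {
	mu      sync.Mutex
	connTab map[uint32]string // connId -> entry; string stands in for *Conn
}

// markClosed keeps a "closed" tombstone for connId and schedules its
// removal once connKeepClosed has elapsed.
func (l *link) markClosed(connId uint32) {
	l.mu.Lock()
	l.connTab[connId] = "closed"
	l.mu.Unlock()

	time.AfterFunc(connKeepClosed, func() {
		l.mu.Lock()
		delete(l.connTab, connId)
		l.mu.Unlock()
	})
}

func main() {
	l := &link{connTab: make(map[uint32]string)}
	l.markClosed(4)
	fmt.Println(l.connTab) // map[4:closed] until the timer fires
}
```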
@@ -504,7 +498,6 @@ func (nl *NodeLink) serveRecv() {
 		// resetting it waits for us to finish.
 		conn := nl.connTab[connId]
-		//fmt.Printf("RX .%d -> %v\n", connId, conn)

 		if conn == nil {
 			// "new" connection will be needed in all cases - e.g.
 			// even temporarily to reply "connection refused"
@@ -512,30 +505,25 @@ func (nl *NodeLink) serveRecv() {
 			// message with connid that should be initiated by us
 			if connId % 2 == nl.nextConnId % 2 {
-				//conn.errMsg = errConnClosed
 				conn.shutdownRX(errConnClosed)

 			// message with connid for a stream initiated by peer
 			} else {
 				if nl.acceptq == nil {
-					//conn.errMsg = errConnRefused
 					conn.shutdownRX(errConnRefused)
 				} else {
 					// we are accepting new incoming connection
 					accept = true
 				}
 			}
-		}

-		/*
-		// we are not accepting packet in any way
-		if conn.errMsg != nil {
-			atomic.AddInt32(&conn.closed, 1)
-			nl.connMu.Unlock()
-			go conn.replyNoConn()
-			continue
-		}
-		*/
+			// delete temporary conn from .connTab - this way the
+			// connection will be automatically garbage-collected
+			// after its final use.
+			if !accept {
+				delete(nl.connTab, conn.connId)
+			}
+		}

 		nl.connMu.Unlock()
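Two things happen above for an unknown connId: the packet is classified by connId parity and by whether an accept queue exists, and, when it is not accepted, the temporary conn is deleted from connTab right away so the reply path holds its only remaining reference. A compact sketch of the classification (the classify helper and its types are illustrative; the error values mirror the diff):

```go
package main

import "fmt"

type Error struct{ Msg string }

var (
	errConnClosed  = &Error{"connection closed"}
	errConnRefused = &Error{"connection refused"}
)

// classify returns whether the packet opens an acceptable new stream,
// and otherwise which error reply its temporary conn should carry.
func classify(connId, nextConnId uint32, listening bool) (accept bool, errMsg *Error) {
	if connId%2 == nextConnId%2 {
		// id from our own space: we must have already closed this connection
		return false, errConnClosed
	}
	if !listening {
		// peer-initiated stream, but no accept queue
		return false, errConnRefused
	}
	return true, nil
}

func main() {
	// client link: nextConnId is odd, so even ids are peer-initiated
	for _, id := range []uint32{3, 4} {
		accept, errMsg := classify(id, 1, false)
		fmt.Println(id, accept, errMsg)
	}
}
```

With the connTab entry gone up front, nothing but the error-reply sender references the temporary conn, so the garbage collector reclaims it after its final use; that is also what lets replyNoConn drop its reference counting in the next hunk.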
@@ -601,19 +589,8 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"}
 var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}

 // replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
-// and removes connection from nodeLink connTab if ekeep==false.	XXX ekeep gone
-//func (c *Conn) replyNoConn(e Msg, ekeep bool) {
 func (c *Conn) replyNoConn() {
 	c.Send(c.errMsg) // ignore errors
-	//fmt.Println("errsend:", err)
-
-	// remove connTab entry - if all users of this temporary conn created
-	// only to send the error are now gone.
-	c.nodeLink.connMu.Lock()
-	if atomic.AddInt32(&c.closed, -1) == 0 {
-		delete(c.nodeLink.connTab, c.connId)
-	}
-	c.nodeLink.connMu.Unlock()
 }

 // ---- transmit ----
@@ -163,7 +163,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
 }

 // XXX temp for cluster_test.go
-var NodeLinkPipe = nodeLinkPipe
+// var NodeLinkPipe = nodeLinkPipe

 func TestNodeLink(t *testing.T) {
 	// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
@@ -457,6 +457,7 @@ func TestNodeLink(t *testing.T) {
 	// Conn accept + exchange
 	nl1, nl2 = nodeLinkPipe()
 	wg = &xsync.WorkGroup{}
+	closed := make(chan int)
 	wg.Gox(func() {
 		c := xaccept(nl2)
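The closed channel introduced here is a plain happens-before handshake: the accepting goroutine signals right after it closes its end, and the main goroutine receives before sending the packet that must be answered with errConnClosed, so the test cannot race the close. The pattern in isolation:

```go
package main

import "fmt"

func main() {
	closed := make(chan int)
	go func() {
		// ... peer goroutine: exchange packets, then close its end ...
		closed <- 1 // signal: the close has happened by now
	}()
	<-closed // this receive strictly happens-after the peer's close
	fmt.Println("peer closed its end; errConnClosed replies expected from here on")
}
```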
@@ -472,48 +473,58 @@ func TestNodeLink(t *testing.T) {
 		xsendPkt(c, mkpkt(36, []byte("pong2")))
 		xclose(c)
+		closed <- 1

-		//println("B.111")
+		// once again as ^^^ but finish only with CloseRecv
+		c2 := xaccept(nl2)
+		pkt = xrecvPkt(c2)
+		xverifyPkt(pkt, c2.connId, 41, []byte("ping5"))
+		xsendPkt(c2, mkpkt(42, []byte("pong5")))
+		c2.CloseRecv()
+		closed <- 2

 		// "connection refused" when trying to connect to not-listening peer
 		c = xnewconn(nl2) // XXX should get error here?
 		xsendPkt(c, mkpkt(38, []byte("pong3")))
 		pkt = xrecvPkt(c)
 		xverifyMsg(pkt, c.connId, errConnRefused)

-		//println("B.222")
 		xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again
 		pkt = xrecvPkt(c)
 		xverifyMsg(pkt, c.connId, errConnRefused)
-		//println("B.333")
 		xclose(c)
 	})

-	//println("A.111")
-	c = xnewconn(nl1)
-	xsendPkt(c, mkpkt(33, []byte("ping")))
-	pkt = xrecvPkt(c)
-	xverifyPkt(pkt, c.connId, 34, []byte("pong"))
-	xsendPkt(c, mkpkt(35, []byte("ping2")))
-	pkt = xrecvPkt(c)
-	xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
-
-	xwait(wg)
-	//println()
-	//println()
-	//println("A.222")
+	c1 := xnewconn(nl1)
+	xsendPkt(c1, mkpkt(33, []byte("ping")))
+	pkt = xrecvPkt(c1)
+	xverifyPkt(pkt, c1.connId, 34, []byte("pong"))
+	xsendPkt(c1, mkpkt(35, []byte("ping2")))
+	pkt = xrecvPkt(c1)
+	xverifyPkt(pkt, c1.connId, 36, []byte("pong2"))

 	// "connection closed" after peer closed its end
-	xsendPkt(c, mkpkt(37, []byte("ping3")))
-	//println("A.qqq")
-	pkt = xrecvPkt(c)
-	xverifyMsg(pkt, c.connId, errConnClosed)
-	//println("A.zzz")
-	xsendPkt(c, mkpkt(39, []byte("ping4"))) // once again
-	pkt = xrecvPkt(c)
-	xverifyMsg(pkt, c.connId, errConnClosed)
+	<-closed
+	xsendPkt(c1, mkpkt(37, []byte("ping3")))
+	pkt = xrecvPkt(c1)
+	xverifyMsg(pkt, c1.connId, errConnClosed)
+	xsendPkt(c1, mkpkt(39, []byte("ping4"))) // once again
+	pkt = xrecvPkt(c1)
+	xverifyMsg(pkt, c1.connId, errConnClosed)
 	// XXX also should get EOF on recv

-	//println("A.333")
+	// one more time but now peer does only .CloseRecv()
+	c2 := xnewconn(nl1)
+	xsendPkt(c2, mkpkt(41, []byte("ping5")))
+	pkt = xrecvPkt(c2)
+	xverifyPkt(pkt, c2.connId, 42, []byte("pong5"))
+	<-closed
+	xsendPkt(c2, mkpkt(41, []byte("ping6")))
+	pkt = xrecvPkt(c2)
+	xverifyMsg(pkt, c2.connId, errConnClosed)
+
+	xwait(wg)

 	// make sure entry for closed nl2.1 stays in nl2.connTab
 	nl2.connMu.Lock()
@@ -530,7 +541,8 @@ func TestNodeLink(t *testing.T) {
 	}
 	nl2.connMu.Unlock()

-	xclose(c)
+	xclose(c1)
+	xclose(c2)
 	xclose(nl1)
 	xclose(nl2)
 	connKeepClosed = saveKeepClosed
@@ -570,8 +582,8 @@ func TestNodeLink(t *testing.T) {
 	}
 	})

-	c1 := xnewconn(nl1)
-	c2 := xnewconn(nl1)
+	c1 = xnewconn(nl1)
+	c2 = xnewconn(nl1)
 	xsendPkt(c1, mkpkt(1, []byte("")))
 	xsendPkt(c2, mkpkt(2, []byte("")))
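The last hunk only swaps := for = because c1 and c2 are now declared earlier in the test: Go rejects := when it introduces no new variable in the same scope, and inside a nested scope it would silently create a new shadowing variable instead of reusing the old one. In isolation:

```go
package main

import "fmt"

func main() {
	c1 := "first stream" // := declares c1
	fmt.Println(c1)
	// c1 := "reused"    // would not compile: no new variables on left side of :=
	c1 = "second stream" // plain = reassigns the existing variable
	fmt.Println(c1)
}
```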