Commit 8a8ddf1e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 55756877
...@@ -49,17 +49,17 @@ import ( ...@@ -49,17 +49,17 @@ import (
// //
// It is safe to use NodeLink from multiple goroutines simultaneously. // It is safe to use NodeLink from multiple goroutines simultaneously.
type NodeLink struct { type NodeLink struct {
peerLink net.Conn // raw conn to peer peerLink net.Conn // raw conn to peer
connMu sync.Mutex connMu sync.Mutex
connTab map[uint32]*Conn // connId -> Conn associated with connId connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
// = nil if NodeLink is not accepting connections // = nil if NodeLink is not accepting connections
txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq) // (rx packets are routed to Conn.rxq)
down chan struct{} // ready when NodeLink is marked as no longer operational down chan struct{} // ready when NodeLink is marked as no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error downOnce sync.Once // shutdown may be due to both Close and IO error
...@@ -87,10 +87,11 @@ type Conn struct { ...@@ -87,10 +87,11 @@ type Conn struct {
down chan struct{} // ready when Conn is marked as no longer operational down chan struct{} // ready when Conn is marked as no longer operational
downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown
rxerrOnce sync.Once // IO error is reported only once - then it is link down or closed rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
closed uint32 // whether Close was called closed uint32 // whether Close was called
} }
var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
...@@ -180,7 +181,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -180,7 +181,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
} }
// shutdown closes peerLink and marks NodeLink as no longer operational // shutdown closes peerLink and marks NodeLink as no longer operational
// it also shutdowns and all active Conns. // it also shutdowns and all opened connections over this node link.
func (nl *NodeLink) shutdown() { func (nl *NodeLink) shutdown() {
nl.downOnce.Do(func() { nl.downOnce.Do(func() {
close(nl.down) close(nl.down)
...@@ -231,7 +232,7 @@ func (c *Conn) shutdown() { ...@@ -231,7 +232,7 @@ func (c *Conn) shutdown() {
// Any blocked Send() or Recv() will be unblocked and return error // Any blocked Send() or Recv() will be unblocked and return error
// //
// NOTE for Send() - once transmission was started - it will complete in the // NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break framing. // background on the wire not to break node-node link framing.
func (c *Conn) Close() error { func (c *Conn) Close() error {
// adjust nodeLink.connTab // adjust nodeLink.connTab
c.nodeLink.connMu.Lock() c.nodeLink.connMu.Lock()
...@@ -257,7 +258,7 @@ func (nl *NodeLink) Accept() (*Conn, error) { ...@@ -257,7 +258,7 @@ func (nl *NodeLink) Accept() (*Conn, error) {
} }
return nil, ErrLinkDown // XXX test return nil, ErrLinkDown // XXX test
case c := <-nl.acceptq: // XXX -> only c, ok := <-nl.acceptq ? case c := <-nl.acceptq:
return c, nil return c, nil
} }
} }
......
...@@ -329,15 +329,15 @@ func TestNodeLink(t *testing.T) { ...@@ -329,15 +329,15 @@ func TestNodeLink(t *testing.T) {
xclose(c12) xclose(c12)
xclose(nl2) xclose(nl2)
// NodeLink.Close vs Conn.Send/Recv on another side // NodeLink.Close vs Conn.Send/Recv and Accept on another side
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, 0)
c11 = xnewconn(nl1) c21 := xnewconn(nl2)
c12 = xnewconn(nl1) c22 := xnewconn(nl2)
c13 := xnewconn(nl1) c23 := xnewconn(nl2)
wg = WorkGroup() wg = WorkGroup()
var errRecv error var errRecv error
wg.Gox(func() { wg.Gox(func() {
pkt, err := c11.Recv() pkt, err := c21.Recv()
want1 := io.EOF // if recvPkt wakes up due to peer close want1 := io.EOF // if recvPkt wakes up due to peer close
want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1 want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1
if !(pkt == nil && (err == want1 || err == want2)) { if !(pkt == nil && (err == want1 || err == want2)) {
...@@ -348,81 +348,81 @@ func TestNodeLink(t *testing.T) { ...@@ -348,81 +348,81 @@ func TestNodeLink(t *testing.T) {
}) })
wg.Gox(func() { wg.Gox(func() {
pkt := &PktBuf{[]byte("data")} pkt := &PktBuf{[]byte("data")}
err := c12.Send(pkt) err := c22.Send(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl1 want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
if err != want { if err != want {
exc.Raisef("Conn.Send after peer NodeLink shutdown: %v", err) exc.Raisef("Conn.Send after peer NodeLink shutdown: %v", err)
} }
}) })
tdelay() tdelay()
xclose(nl2) xclose(nl1)
xwait(wg) xwait(wg)
// XXX denoise vvv // XXX denoise vvv
// NewConn after NodeLink stop // NewConn after NodeLink stop
c, err = nl1.NewConn() c, err = nl2.NewConn()
if err != ErrLinkDown { if err != ErrLinkDown {
t.Fatalf("NewConn after NodeLink stop: %v", err) t.Fatalf("NewConn after NodeLink stop: %v", err)
} }
// Recv/Send on another Conn // Recv/Send on another Conn
pkt, err = c13.Recv() pkt, err = c23.Recv()
if !(pkt == nil && err == errRecv) { if !(pkt == nil && err == errRecv) {
t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c13.Send(&PktBuf{[]byte("data")}) err = c23.Send(&PktBuf{[]byte("data")})
if err != ErrLinkDown { if err != ErrLinkDown {
t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err) t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err)
} }
// Recv/Send error on second call // Recv/Send error on second call
pkt, err = c11.Recv() pkt, err = c21.Recv()
if !(pkt == nil && err == ErrLinkDown) { if !(pkt == nil && err == ErrLinkDown) {
t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err)
} }
err = c12.Send(&PktBuf{[]byte("data")}) err = c22.Send(&PktBuf{[]byte("data")})
if err != ErrLinkDown { if err != ErrLinkDown {
t.Fatalf("Conn.Send after NodeLink stop: %v", err) t.Fatalf("Conn.Send after NodeLink stop: %v", err)
} }
xclose(c13) xclose(c23)
// Recv/Send on closed Conn but not closed NodeLink // Recv/Send on closed Conn but not closed NodeLink
pkt, err = c13.Recv() pkt, err = c23.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && err == ErrClosedConn) {
t.Fatalf("Conn.Recv after close but only stopped NodeLink: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after close but only stopped NodeLink: pkt = %v err = %v", pkt, err)
} }
err = c13.Send(&PktBuf{[]byte("data")}) err = c23.Send(&PktBuf{[]byte("data")})
if err != ErrClosedConn { if err != ErrClosedConn {
t.Fatalf("Conn.Send after close but only stopped NodeLink: %v", err) t.Fatalf("Conn.Send after close but only stopped NodeLink: %v", err)
} }
xclose(nl1) xclose(nl2)
// Recv/Send error after NodeLink close // Recv/Send error after NodeLink close
pkt, err = c11.Recv() pkt, err = c21.Recv()
if !(pkt == nil && err == ErrLinkClosed) { if !(pkt == nil && err == ErrLinkClosed) {
t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err)
} }
err = c12.Send(&PktBuf{[]byte("data")}) err = c22.Send(&PktBuf{[]byte("data")})
if err != ErrLinkClosed { if err != ErrLinkClosed {
t.Fatalf("Conn.Send after NodeLink stop: %v", err) t.Fatalf("Conn.Send after NodeLink stop: %v", err)
} }
// NewConn after NodeLink close // NewConn after NodeLink close
c, err = nl1.NewConn() c, err = nl2.NewConn()
if err != ErrLinkClosed { if err != ErrLinkClosed {
t.Fatalf("NewConn after NodeLink close: %v", err) t.Fatalf("NewConn after NodeLink close: %v", err)
} }
xclose(c11) xclose(c21)
xclose(c12) xclose(c22)
// Recv/Send error after Close & NodeLink shutdown // Recv/Send error after Close & NodeLink shutdown
pkt, err = c11.Recv() pkt, err = c21.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && err == ErrClosedConn) {
t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err)
} }
err = c12.Send(&PktBuf{[]byte("data")}) err = c22.Send(&PktBuf{[]byte("data")})
if err != ErrClosedConn { if err != ErrClosedConn {
t.Fatalf("Conn.Send after close and NodeLink close: %v", err) t.Fatalf("Conn.Send after close and NodeLink close: %v", err)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment