Commit 0b18819a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 213b9f14
...@@ -62,17 +62,16 @@ type NodeLink struct { ...@@ -62,17 +62,16 @@ type NodeLink struct {
txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
errMu sync.Mutex errMu sync.Mutex
// errSend error // error got from sendPkt, if any errRecv error // error got from recvPkt on shutdown
errRecv error // error got from recvPkt, if any
// once because: NodeLink has to be explicitly closed by user; it can also // once because: NodeLink has to be explicitly closed by user; it can also
// be "closed" by IO errors on peerLink // be "closed" by IO errors on peerLink
closeOnce sync.Once closeOnce sync.Once
closed chan struct{} // XXX text down chan struct{} // ready when NodeLink is marked as no longer operational
closeCalled uint32 // whether Close was called; ^^^ can be from IO error closeCalled uint32 // whether Close was called; ^^^ can be from IO error
closeWg sync.WaitGroup // XXX for close waiter shutdownWg sync.WaitGroup // for activities at shutdown
errClose error // error got from peerLink.Close errClose error // error got from peerLink.Close
} }
...@@ -89,18 +88,17 @@ type Conn struct { ...@@ -89,18 +88,17 @@ type Conn struct {
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit errors for this Conn go back here txerr chan error // transmit errors for this Conn go back here
closed chan struct{} // whether Conn is marked as no longer operational down chan struct{} // ready when Conn is marked as no longer operational
downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown
closeCalled uint32 // whether Close was called; ^^^ can be from IO error on node link closeCalled uint32 // whether Close was called; ^^^ can be from IO error on node link
rxerrOnce sync.Once // XXX whether actual RX error was already reported to caller rxerrOnce sync.Once // XXX whether actual RX error was already reported to caller
// once because: Conn has to be explicitly closed by user; it can also
// be closed by NodeLink.Close .
closeOnce sync.Once
} }
// ErrLinkClosed is the error indicated for operations on closed NodeLink // ErrLinkClosed is the error indicated for operations on closed NodeLink
var ErrLinkClosed = errors.New("node link is closed") // XXX -> read/write but also Accept ? var ErrLinkClosed = errors.New("node link is closed") // XXX -> read/write but also Accept ?
var ErrLinkStopped = errors.New("node link was stopped") // XXX due to IO errors? var ErrLinkDown = errors.New("node link is down") // XXX due to IO errors?
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("read/write on closed connection") var ErrClosedConn = errors.New("read/write on closed connection")
...@@ -149,7 +147,7 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -149,7 +147,7 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
nextConnId: nextConnId, nextConnId: nextConnId,
acceptq: acceptq, acceptq: acceptq,
txq: make(chan txReq), txq: make(chan txReq),
closed: make(chan struct{}), down: make(chan struct{}),
} }
if role&linkNoRecvSend == 0 { if role&linkNoRecvSend == 0 {
nl.serveWg.Add(2) nl.serveWg.Add(2)
...@@ -166,7 +164,7 @@ func (nl *NodeLink) newConn(connId uint32) *Conn { ...@@ -166,7 +164,7 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
connId: connId, connId: connId,
rxq: make(chan *PktBuf), // TODO buffering rxq: make(chan *PktBuf), // TODO buffering
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
closed: make(chan struct{}), down: make(chan struct{}),
} }
nl.connTab[connId] = c nl.connTab[connId] = c
return c return c
...@@ -180,35 +178,38 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -180,35 +178,38 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
if atomic.LoadUint32(&nl.closeCalled) != 0 { if atomic.LoadUint32(&nl.closeCalled) != 0 {
return nil, ErrLinkClosed return nil, ErrLinkClosed
} }
return nil, ErrLinkStopped return nil, ErrLinkDown
} }
c := nl.newConn(nl.nextConnId) c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2 nl.nextConnId += 2
return c, nil return c, nil
} }
// close is worker for Close & friends. // shutdown closes peerLink and marks NodeLink as no longer operational
// It marks NodeLink and all active Conns as closed. // it also shutdowns and all active Conns.
func (nl *NodeLink) close() { func (nl *NodeLink) shutdown() {
nl.closeOnce.Do(func() { nl.closeOnce.Do(func() {
close(nl.closed) close(nl.down)
// close actual link to peer. this will wakeup serve{Send,Recv} // close actual link to peer. this will wakeup {send,recv}Pkt
// NOTE we need it here so that e.g. aborting on error serveSend wakes up serveRecv // NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
nl.errClose = nl.peerLink.Close() nl.errClose = nl.peerLink.Close()
nl.closeWg.Add(1) nl.shutdownWg.Add(1)
go func() { go func() {
defer nl.closeWg.Done() defer nl.shutdownWg.Done()
// wait for serve{Send,Recv} to complete before signalling to Conns // wait for serve{Send,Recv} to complete before shutting connections down
//
// we have to do it so that e.g. serveSend has chance
// to return last error from sendPkt to requester.
nl.serveWg.Wait() nl.serveWg.Wait()
nl.connMu.Lock() nl.connMu.Lock()
for _, conn := range nl.connTab { for _, conn := range nl.connTab {
// NOTE anything waking up on Conn.closed must not lock // NOTE anything waking up on Conn.closed must not lock
// connMu - else it will deadlock. // connMu - else it will deadlock.
conn.close() conn.shutdown()
} }
nl.connTab = nil // clear + mark closed nl.connTab = nil // clear + mark closed
nl.connMu.Unlock() nl.connMu.Unlock()
...@@ -220,15 +221,15 @@ func (nl *NodeLink) close() { ...@@ -220,15 +221,15 @@ func (nl *NodeLink) close() {
// IO on connections established over it is automatically interrupted with an error. // IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closeCalled, 1) atomic.StoreUint32(&nl.closeCalled, 1)
nl.close() nl.shutdown()
nl.closeWg.Wait() // wait for close to complete nl.shutdownWg.Wait()
return nl.errClose return nl.errClose
} }
// worker for Close() & co // shutdown marks connection as no longer operational
func (c *Conn) close() { func (c *Conn) shutdown() {
c.closeOnce.Do(func() { c.downOnce.Do(func() {
close(c.closed) close(c.down)
}) })
} }
...@@ -244,7 +245,7 @@ func (c *Conn) Close() error { ...@@ -244,7 +245,7 @@ func (c *Conn) Close() error {
c.nodeLink.connMu.Unlock() c.nodeLink.connMu.Unlock()
atomic.StoreUint32(&c.closeCalled, 1) atomic.StoreUint32(&c.closeCalled, 1)
c.close() c.shutdown()
return nil return nil
} }
...@@ -256,16 +257,19 @@ func (nl *NodeLink) Accept() (*Conn, error) { ...@@ -256,16 +257,19 @@ func (nl *NodeLink) Accept() (*Conn, error) {
} }
select { select {
case <-nl.closed: case <-nl.down:
if atomic.LoadUint32(&nl.closeCalled) != 0 {
return nil, ErrLinkClosed // XXX + op = Accept ? return nil, ErrLinkClosed // XXX + op = Accept ?
}
return nil, ErrLinkDown // XXX test
case c := <-nl.acceptq: // XXX -> only c, ok := <-nl.acceptq ? case c := <-nl.acceptq: // XXX -> only c, ok := <-nl.acceptq ?
return c, nil return c, nil
} }
} }
// errRecvClosed returns appropriate error when c.closed is found ready in Recv // errRecvShutdown returns appropriate error when c.closed is found ready in Recv
func (c *Conn) errRecvClosed() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closeCalled) != 0: case atomic.LoadUint32(&c.closeCalled) != 0:
return ErrClosedConn return ErrClosedConn
...@@ -274,8 +278,8 @@ func (c *Conn) errRecvClosed() error { ...@@ -274,8 +278,8 @@ func (c *Conn) errRecvClosed() error {
return ErrLinkClosed return ErrLinkClosed
default: default:
// we have to check what was particular RX error on nodelink // we have to check what was particular RX error on nodelink shutdown
// only do that once - after reportin RX error the first time // only do that once - after reporting RX error the first time
// tell client the node link is no longer operational. // tell client the node link is no longer operational.
var err error var err error
c.rxerrOnce.Do(func() { c.rxerrOnce.Do(func() {
...@@ -284,7 +288,7 @@ func (c *Conn) errRecvClosed() error { ...@@ -284,7 +288,7 @@ func (c *Conn) errRecvClosed() error {
c.nodeLink.errMu.Unlock() c.nodeLink.errMu.Unlock()
}) })
if err == nil { if err == nil {
err = ErrLinkStopped err = ErrLinkDown
} }
return err return err
} }
...@@ -293,11 +297,11 @@ func (c *Conn) errRecvClosed() error { ...@@ -293,11 +297,11 @@ func (c *Conn) errRecvClosed() error {
// Recv receives packet from connection // Recv receives packet from connection
func (c *Conn) Recv() (*PktBuf, error) { func (c *Conn) Recv() (*PktBuf, error) {
select { select {
case <-c.closed: case <-c.down:
return nil, c.errRecvClosed() return nil, c.errRecvShutdown()
case pkt := <-c.rxq: // XXX try to leave only pkt, ok := <-c.rxq case pkt := <-c.rxq:
return pkt, nil // XXX error = ? return pkt, nil
} }
} }
...@@ -312,25 +316,13 @@ func (nl *NodeLink) serveRecv() { ...@@ -312,25 +316,13 @@ func (nl *NodeLink) serveRecv() {
//fmt.Printf("recvPkt -> %v, %v\n", pkt, err) //fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
if err != nil { if err != nil {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
// so we mark node link and all connections as closed and stop service // so we shut down node link and all connections over it.
/*
select {
case <-nl.closed:
// error was due to closing NodeLink
err = ErrLinkClosed
default:
}
*/
nl.errMu.Lock() nl.errMu.Lock()
nl.errRecv = err nl.errRecv = err
nl.errMu.Unlock() nl.errMu.Unlock()
// wake-up all conns & mark node link as closed nl.shutdown()
// NOTE this also wakeups serveSend/sendPkt
nl.close()
return return
} }
...@@ -374,21 +366,21 @@ type txReq struct { ...@@ -374,21 +366,21 @@ type txReq struct {
errch chan error errch chan error
} }
// errSendClosed returns approproate error when c.closed is found ready in Send // errSendShutdown returns approproate error when c.down is found ready in Send
func (c *Conn) errSendClosed() error { func (c *Conn) errSendShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closeCalled) != 0: case atomic.LoadUint32(&c.closeCalled) != 0:
return ErrClosedConn return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that // the only other error possible besides Conn being .Close()'ed is that
// NodeLink was closed/stopped itself - on actual IO problems corresponding // NodeLink was closed/shutdowned itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it. // error is delivered to particular Send that caused it.
case atomic.LoadUint32(&c.nodeLink.closeCalled) != 0: case atomic.LoadUint32(&c.nodeLink.closeCalled) != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
return ErrLinkStopped return ErrLinkDown
} }
} }
...@@ -399,8 +391,8 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -399,8 +391,8 @@ func (c *Conn) Send(pkt *PktBuf) error {
var err error var err error
select { select {
case <-c.closed: case <-c.down:
return c.errSendClosed() return c.errSendShutdown()
case c.nodeLink.txq <- txReq{pkt, c.txerr}: case c.nodeLink.txq <- txReq{pkt, c.txerr}:
select { select {
...@@ -409,30 +401,27 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -409,30 +401,27 @@ func (c *Conn) Send(pkt *PktBuf) error {
// we cannot interrupt it as the only way to interrupt is // we cannot interrupt it as the only way to interrupt is
// .nodeLink.Close() which will close all other Conns. // .nodeLink.Close() which will close all other Conns.
// //
// That's why we are also checking for c.closed while waiting // That's why we are also checking for c.down while waiting
// for reply from serveSend (and leave pkt to finish transmitting). // for reply from serveSend (and leave pkt to finish transmitting).
// //
// NOTE after we return straight here serveSend won't be later // NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one. // blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed: case <-c.down:
// also poll c.txerr here because: when there is TX error, // also poll c.txerr here because: when there is TX error,
// serveSend sends to c.txerr _and_ closes c.closed . // serveSend sends to c.txerr _and_ closes c.down .
// We still want to return actual transmission error to caller. // We still want to return actual transmission error to caller.
select { select {
case err = <-c.txerr: case err = <-c.txerr:
return err // XXX if nil ? return err
default: default:
return c.errSendClosed() return c.errSendShutdown()
} }
case err = <-c.txerr: case err = <-c.txerr:
//fmt.Printf("%v <- c.txerr\n", err) return err
return err // XXX if nil ?
} }
} }
// return err
} }
// serveSend handles requests to transmit packets from client connections and // serveSend handles requests to transmit packets from client connections and
...@@ -441,47 +430,19 @@ func (nl *NodeLink) serveSend() { ...@@ -441,47 +430,19 @@ func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
for { for {
select { select {
case <-nl.closed: case <-nl.down:
return return
case txreq := <-nl.txq: case txreq := <-nl.txq:
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
//fmt.Printf("sendPkt -> %v\n", err) //fmt.Printf("sendPkt -> %v\n", err)
// on IO error framing over peerLink becomes broken txreq.errch <- err
// so mark node link and all connections as closed and stop service
/*
if err != nil {
select {
case <-nl.closed:
// error was due to closing NodeLink
err = ErrLinkClosed
default:
}
}
*/
txreq.errch <- err // XXX recheck wakeup logic for err case
// XXX we need to first wait till _both_ serveRecv & serveSend complete
// and only then close all Conns. Reason: e.g. when remote shutdowns
// both sendPkt and recvPkt get error. If recvPkt was
// first serveRecv will be first to mark connections as
// closed and even though sendPkt will return proper IO
// error it won't be delivered as Conn.Send waiting for
// it already waked up on c.closed without seeing
// txreq.errch being ready.
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
if err != nil { if err != nil {
// XXX use errMu to lock vvv if needed nl.shutdown()
// nl.sendErr = err
// wake-up all conns & mark node link as closed
// NOTE this also wakeups serveRecv/recvPkt
nl.close()
return return
} }
} }
...@@ -489,14 +450,6 @@ func (nl *NodeLink) serveSend() { ...@@ -489,14 +450,6 @@ func (nl *NodeLink) serveSend() {
} }
// XXX used ?
func errClosedConn(err error) error {
if err != nil {
return err
}
return ErrClosedConn
}
// ---- raw IO ---- // ---- raw IO ----
// sendPkt sends raw packet to peer // sendPkt sends raw packet to peer
......
...@@ -363,7 +363,7 @@ func TestNodeLink(t *testing.T) { ...@@ -363,7 +363,7 @@ func TestNodeLink(t *testing.T) {
// NewConn after NodeLink stop // NewConn after NodeLink stop
c, err = nl1.NewConn() c, err = nl1.NewConn()
if err != ErrLinkStopped { if err != ErrLinkDown {
t.Fatalf("NewConn after NodeLink stop: %v", err) t.Fatalf("NewConn after NodeLink stop: %v", err)
} }
...@@ -373,17 +373,17 @@ func TestNodeLink(t *testing.T) { ...@@ -373,17 +373,17 @@ func TestNodeLink(t *testing.T) {
t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c13.Send(&PktBuf{[]byte("data")}) err = c13.Send(&PktBuf{[]byte("data")})
if err != ErrLinkStopped { if err != ErrLinkDown {
t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err) t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err)
} }
// Recv/Send error on second call // Recv/Send error on second call
pkt, err = c11.Recv() pkt, err = c11.Recv()
if !(pkt == nil && err == ErrLinkStopped) { if !(pkt == nil && err == ErrLinkDown) {
t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err)
} }
err = c12.Send(&PktBuf{[]byte("data")}) err = c12.Send(&PktBuf{[]byte("data")})
if err != ErrLinkStopped { if err != ErrLinkDown {
t.Fatalf("Conn.Send after NodeLink stop: %v", err) t.Fatalf("Conn.Send after NodeLink stop: %v", err)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment