Commit 70c2e81a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 76f4f073
...@@ -92,8 +92,10 @@ type Conn struct { ...@@ -92,8 +92,10 @@ type Conn struct {
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit results for this Conn go back here txerr chan error // transmit results for this Conn go back here
down chan struct{} // ready when Conn is marked as no longer operational txdown chan struct{} // ready when Conn TX is marked as no longer operational
downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown rxdown chan struct{} // ----//---- RX
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
rxdownOnce sync.Once // ----//----
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
closed int32 // 1 if Close was called or "connection closed" entry closed int32 // 1 if Close was called or "connection closed" entry
...@@ -209,7 +211,8 @@ func (nl *NodeLink) newConn(connId uint32) *Conn { ...@@ -209,7 +211,8 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
connId: connId, connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
down: make(chan struct{}), txdown: make(chan struct{}),
rxdown: make(chan struct{}),
} }
nl.connTab[connId] = c nl.connTab[connId] = c
return c return c
...@@ -293,16 +296,64 @@ func (nl *NodeLink) Close() error { ...@@ -293,16 +296,64 @@ func (nl *NodeLink) Close() error {
// shutdown marks connection as no longer operational // shutdown marks connection as no longer operational
func (c *Conn) shutdown() { func (c *Conn) shutdown() {
c.downOnce.Do(func() { c.shutdownTX()
close(c.down) c.shutdownRX(errConnClosed)
}
func (c *Conn) shutdownTX() {
c.txdownOnce.Do(func() {
close(c.txdown)
})
}
func (c *Conn) shutdownRX(errMsg *Error) {
c.rxdownOnce.Do(func() {
c.errMsg = errMsg
close(c.rxdown)
}) })
} }
var connKeepClosed = 1*time.Minute var connKeepClosed = 1*time.Minute
// CloseRecv closes reading end of connection.
//
// A peer will receive "connection closed" if it tries to send to connection
// with closed reading end.
//
// XXX no race wrt in-progress c.rxq <- ... ?
//
// Any blocked Recv*() will be unblocked and return error.
//
// It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() {
// XXX c.closedRecv = 1?
c.shutdownRX(errConnClosed) // XXX ok?
// deque all pakcets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into
// c.rxq but something finite could be already there)
i := 0
loop:
for {
select {
case <-c.rxq:
i++
default:
break loop
}
}
// if something was queued already there - reply "connection closed"
if i != 0 {
go c.replyNoConn()
}
}
// Close closes connection. // Close closes connection.
// Any blocked Send*() or Recv*() will be unblocked and return error //
// Any blocked Send*() or Recv*() will be unblocked and return error.
// //
// NOTE for Send() - once transmission was started - it will complete in the // NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break node-node link framing. // background on the wire not to break node-node link framing.
...@@ -324,10 +375,11 @@ func (c *Conn) Close() error { ...@@ -324,10 +375,11 @@ func (c *Conn) Close() error {
// "connection closed" if another packet comes to it. // "connection closed" if another packet comes to it.
} else { } else {
cc := nl.newConn(c.connId) cc := nl.newConn(c.connId)
// cc.closed=1 so that cc is not freed by replyNoConn cc.shutdownRX(errConnClosed)
// NOTE cc.down stays not closed so Send could work // // cc.closed=1 so that cc is not freed by replyNoConn
atomic.StoreInt32(&cc.closed, 1) // // NOTE cc.down stays not closed so Send could work
cc.errMsg = errConnClosed // atomic.StoreInt32(&cc.closed, 1)
// cc.errMsg = errConnClosed
time.AfterFunc(connKeepClosed, func() { time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock() nl.connMu.Lock()
delete(nl.connTab, cc.connId) delete(nl.connTab, cc.connId)
...@@ -382,12 +434,16 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) { ...@@ -382,12 +434,16 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) {
} }
} }
// errRecvShutdown returns appropriate error when c.down is found ready in recvPkt // errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
// XXX adjust for CloseRead (shutdownRX)
case atomic.LoadInt32(&c.closed) != 0: case atomic.LoadInt32(&c.closed) != 0:
return ErrClosedConn return ErrClosedConn
case c.errMsg != nil:
return ErrClosedConn // shutdownRX
case atomic.LoadUint32(&c.nodeLink.closed) != 0: case atomic.LoadUint32(&c.nodeLink.closed) != 0:
return ErrLinkClosed return ErrLinkClosed
...@@ -411,8 +467,7 @@ func (c *Conn) errRecvShutdown() error { ...@@ -411,8 +467,7 @@ func (c *Conn) errRecvShutdown() error {
// recvPkt receives raw packet from connection // recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) { func (c *Conn) recvPkt() (*PktBuf, error) {
select { select {
// XXX maybe possible to detect "down" by seeing c.rxq is closed? case <-c.rxdown:
case <-c.down:
return nil, c.err("recv", c.errRecvShutdown()) return nil, c.err("recv", c.errRecvShutdown())
case pkt := <-c.rxq: case pkt := <-c.rxq:
...@@ -455,53 +510,73 @@ func (nl *NodeLink) serveRecv() { ...@@ -455,53 +510,73 @@ func (nl *NodeLink) serveRecv() {
//fmt.Printf("RX .%d -> %v\n", connId, conn) //fmt.Printf("RX .%d -> %v\n", connId, conn)
if conn == nil { if conn == nil {
// "new" connection will be needed in all cases - e.g. // "new" connection will be needed in all cases - e.g.
// temporarily to reply "connection refused" // even temporarily to reply "connection refused"
conn = nl.newConn(connId) conn = nl.newConn(connId)
//fmt.Printf("connId: %d (%d)\n", connId, connId % 2)
//fmt.Printf("nextConnId: %d (%d)\n", nl.nextConnId, nl.nextConnId % 2)
// message with connid that should be initiated by us // message with connid that should be initiated by us
if connId % 2 == nl.nextConnId % 2 { if connId % 2 == nl.nextConnId % 2 {
conn.errMsg = errConnClosed //conn.errMsg = errConnClosed
conn.shutdownRX(errConnClosed)
// message with connid for a stream initiated by peer // message with connid for a stream initiated by peer
} else { } else {
if nl.acceptq == nil { if nl.acceptq == nil {
conn.errMsg = errConnRefused //conn.errMsg = errConnRefused
conn.shutdownRX(errConnRefused)
} else { } else {
// we are accepting new incoming connection // we are accepting new incoming connection
accept = true accept = true
} }
//fmt.Println("ZZZ", conn.errMsg, accept)
} }
} }
/*
// we are not accepting packet in any way // we are not accepting packet in any way
if conn.errMsg != nil { if conn.errMsg != nil {
//fmt.Printf(".%d EMSG: %v\n", connId, conn.errMsg)
atomic.AddInt32(&conn.closed, 1) atomic.AddInt32(&conn.closed, 1)
nl.connMu.Unlock() nl.connMu.Unlock()
go conn.replyNoConn() go conn.replyNoConn()
continue continue
} }
*/
nl.connMu.Unlock()
// don't even try to `conn.rxq <- ...` if .rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv
// could receve something on rxdown Conn half sometimes )
rxdown := false
select {
case <-conn.rxdown:
rxdown = true
default:
// ok
}
// route packet to serving goroutine handler // route packet to serving goroutine handler
// //
// TODO backpressure when Recv is not keeping up with Send on peer side? // TODO backpressure when Recv is not keeping up with Send on peer side?
// (not to let whole nodelink starve because of one connection) // (not to let whole link starve because of one connection)
// //
// NOTE rxq must be buffered with at least 1 element so that // NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet // queuing pkt succeeds for incoming connection that is not yet
// there in acceptq. // there in acceptq.
conn.rxq <- pkt if !rxdown {
select {
case <-conn.rxdown:
rxdown = true
case conn.rxq <- pkt:
// ok
}
}
// we are not accepting packet in any way
if rxdown {
go conn.replyNoConn()
continue
}
// keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be
// sure conn stays not down e.g. closed by Conn.Close or NodeLink.shutdown
//
// XXX try to release connMu earlier - before `rxq <- pkt`
nl.connMu.Unlock()
if accept { if accept {
select { select {
...@@ -529,7 +604,7 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"} ...@@ -529,7 +604,7 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"} var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection // replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
// and removes connection from nodeLink connTab if ekeep==false. // and removes connection from nodeLink connTab if ekeep==false. XXX ekeep gone
//func (c *Conn) replyNoConn(e Msg, ekeep bool) { //func (c *Conn) replyNoConn(e Msg, ekeep bool) {
func (c *Conn) replyNoConn() { func (c *Conn) replyNoConn() {
c.Send(c.errMsg) // ignore errors c.Send(c.errMsg) // ignore errors
...@@ -552,7 +627,7 @@ type txReq struct { ...@@ -552,7 +627,7 @@ type txReq struct {
errch chan error errch chan error
} }
// errSendShutdown returns appropriate error when c.down is found ready in Send // errSendShutdown returns appropriate error when c.txdown is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
case atomic.LoadInt32(&c.closed) != 0: case atomic.LoadInt32(&c.closed) != 0:
...@@ -582,7 +657,7 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error { ...@@ -582,7 +657,7 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error {
var err error var err error
select { select {
case <-c.down: case <-c.txdown:
return c.errSendShutdown() return c.errSendShutdown()
case c.nodeLink.txq <- txReq{pkt, c.txerr}: case c.nodeLink.txq <- txReq{pkt, c.txerr}:
...@@ -592,15 +667,15 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error { ...@@ -592,15 +667,15 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error {
// we cannot interrupt it as the only way to interrupt is // we cannot interrupt it as the only way to interrupt is
// .nodeLink.Close() which will close all other Conns. // .nodeLink.Close() which will close all other Conns.
// //
// That's why we are also checking for c.down while waiting // That's why we are also checking for c.txdown while waiting
// for reply from serveSend (and leave pkt to finish transmitting). // for reply from serveSend (and leave pkt to finish transmitting).
// //
// NOTE after we return straight here serveSend won't be later // NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one. // blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.down: case <-c.txdown:
// also poll c.txerr here because: when there is TX error, // also poll c.txerr here because: when there is TX error,
// serveSend sends to c.txerr _and_ closes c.down . // serveSend sends to c.txerr _and_ closes c.txdown .
// We still want to return actual transmission error to caller. // We still want to return actual transmission error to caller.
select { select {
case err = <-c.txerr: case err = <-c.txerr:
...@@ -629,6 +704,9 @@ func (nl *NodeLink) serveSend() { ...@@ -629,6 +704,9 @@ func (nl *NodeLink) serveSend() {
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
//fmt.Printf("sendPkt -> %v\n", err) //fmt.Printf("sendPkt -> %v\n", err)
// FIXME if several goroutines call conn.Send
// simultaneously - c.txerr even if buffered(1) will be
// overflown and thus deadlock here.
txreq.errch <- err txreq.errch <- err
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
...@@ -1166,7 +1244,7 @@ func (link *NodeLink) Recv1() (Request, error) { ...@@ -1166,7 +1244,7 @@ func (link *NodeLink) Recv1() (Request, error) {
// noone will be reading from conn anymore - shutdown rx so that if // noone will be reading from conn anymore - shutdown rx so that if
// peer sends any another packet with same .ConnID serveRecv does not // peer sends any another packet with same .ConnID serveRecv does not
// deadlock trying to put it to conn.rxq. // deadlock trying to put it to conn.rxq.
// conn.CloseRead() // XXX err? conn.CloseRead()
return Request{Msg: msg, conn: conn}, nil return Request{Msg: msg, conn: conn}, nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment