Commit c28ad4d0 authored by Kirill Smelkov's avatar Kirill Smelkov

X Conn.Recv: receive without select

- BenchmarkLinkNetPipeRTT-4         500000              3768 ns/op             225 B/op          5 allocs/op
+ BenchmarkLinkNetPipeRTT-4         500000              3555 ns/op             225 B/op          5 allocs/op
parent 0fa96338
...@@ -94,8 +94,10 @@ type Conn struct { ...@@ -94,8 +94,10 @@ type Conn struct {
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
rxqActive atomic32 // 1 while serveRecv is doing `rxq <- ...` rxqWrite atomic32 // 1 while serveRecv is doing `rxq <- ...`
rxqRead atomic32 // +1 while Conn.Recv is doing `... <- rxq`
rxdownFlag atomic32 // 1 when RX is marked no longer operational rxdownFlag atomic32 // 1 when RX is marked no longer operational
// XXX ^^^ split to different cache lines?
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light? rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light?
// errMsg *Error // error message for peer if rx is down XXX try to do without it // errMsg *Error // error message for peer if rx is down XXX try to do without it
...@@ -110,7 +112,7 @@ type Conn struct { ...@@ -110,7 +112,7 @@ type Conn struct {
// //
// everything below is used during !light mode only. // everything below is used during !light mode only.
rxdown chan struct{} // ready when RX is marked no longer operational // rxdown chan struct{} // ready when RX is marked no longer operational
rxdownOnce sync.Once // ----//---- XXX review rxdownOnce sync.Once // ----//---- XXX review
rxclosed atomic32 // whether CloseRecv was called rxclosed atomic32 // whether CloseRecv was called
...@@ -208,7 +210,7 @@ var connPool = sync.Pool{New: func() interface{} { ...@@ -208,7 +210,7 @@ var connPool = sync.Pool{New: func() interface{} {
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}), txdown: make(chan struct{}),
rxdown: make(chan struct{}), // rxdown: make(chan struct{}),
} }
}} }}
...@@ -232,14 +234,15 @@ func (c *Conn) reinit() { ...@@ -232,14 +234,15 @@ func (c *Conn) reinit() {
c.link = nil c.link = nil
c.connId = 0 c.connId = 0
// .rxq - set initially; does not change // .rxq - set initially; does not change
c.rxqActive.Set(0) // XXX store relaxed? c.rxqWrite.Set(0) // XXX store relaxed?
c.rxqRead.Set(0) // ----//----
c.rxdownFlag.Set(0) // ----//---- c.rxdownFlag.Set(0) // ----//----
c.rxerrOnce = sync.Once{} // XXX ok? c.rxerrOnce = sync.Once{} // XXX ok?
// XXX vvv not strictly needed for light mode? // XXX vvv not strictly needed for light mode?
ensureOpen(&c.rxdown) // ensureOpen(&c.rxdown)
c.rxdownOnce = sync.Once{} // XXX ok? c.rxdownOnce = sync.Once{} // XXX ok?
c.rxclosed.Set(0) c.rxclosed.Set(0)
...@@ -405,7 +408,7 @@ func (c *Conn) shutdownTX() { ...@@ -405,7 +408,7 @@ func (c *Conn) shutdownTX() {
// shutdownRX marks .rxq as no loner operational and interrupts Recv. // shutdownRX marks .rxq as no loner operational and interrupts Recv.
func (c *Conn) shutdownRX(errMsg *Error) { func (c *Conn) shutdownRX(errMsg *Error) {
c.rxdownOnce.Do(func() { c.rxdownOnce.Do(func() {
close(c.rxdown) // wakeup Conn.Recv // close(c.rxdown) // wakeup Conn.Recv
c.downRX(errMsg) c.downRX(errMsg)
}) })
} }
...@@ -429,7 +432,7 @@ func (c *Conn) downRX(errMsg *Error) { ...@@ -429,7 +432,7 @@ func (c *Conn) downRX(errMsg *Error) {
// rxq until next packet (where it will hit "before it"). // rxq until next packet (where it will hit "before it").
// //
// when serveRecv stopped sending we know we are done draining when rxq is empty. // when serveRecv stopped sending we know we are done draining when rxq is empty.
if c.rxqActive.Get() == 0 && len(c.rxq) == 0 { if c.rxqWrite.Get() == 0 && len(c.rxq) == 0 {
break break
} }
...@@ -446,6 +449,27 @@ func (c *Conn) downRX(errMsg *Error) { ...@@ -446,6 +449,27 @@ func (c *Conn) downRX(errMsg *Error) {
if i != 0 { if i != 0 {
go c.link.replyNoConn(c.connId, errMsg) go c.link.replyNoConn(c.connId, errMsg)
} }
// wakeup readers
for {
// similarly to above:
// we set .rxdownFlag=1
// now if recvPkt is outside `... <- .rxq` critical section we know that it is either:
// - before it -> it will eventually see .rxdownFlag=1 and won't try to read rxq.
// - after it -> it already read pktfrom rxq and won't touch
// rxq until next recvPkt (where it will het "before it").
if c.rxqRead.Get() == 0 {
break
}
select {
case c.rxq <- nil:
// ok - woken up
default:
// ok - continue spinning
}
}
} }
...@@ -584,6 +608,39 @@ func (c *Conn) errRecvShutdown() error { ...@@ -584,6 +608,39 @@ func (c *Conn) errRecvShutdown() error {
} }
} }
// recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) {
var pkt *PktBuf
var err error
// semantically equivalent to the following:
// (this is hot path and select is not used for performance reason)
//
// select {
// case <-c.rxdown:
// return nil, c.err("recv", c.errRecvShutdown())
//
// case pkt := <-c.rxq:
// return pkt, nil
// }
c.rxqRead.Add(1)
rxdown := c.rxdownFlag.Get() != 0
if !rxdown {
pkt = <-c.rxq
}
// decide about error while under rxqRead - if it was after the Conn can go away to be released
if rxdown || pkt == nil {
err = c.err("recv", c.errRecvShutdown())
}
c.rxqRead.Add(-1)
return pkt, err
}
/*
// recvPkt receives raw packet from connection // recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) { func (c *Conn) recvPkt() (*PktBuf, error) {
select { select {
...@@ -594,6 +651,7 @@ func (c *Conn) recvPkt() (*PktBuf, error) { ...@@ -594,6 +651,7 @@ func (c *Conn) recvPkt() (*PktBuf, error) {
return pkt, nil return pkt, nil
} }
} }
*/
// serveRecv handles incoming packets routing them to either appropriate // serveRecv handles incoming packets routing them to either appropriate
// already-established connection or, if node link is accepting incoming // already-established connection or, if node link is accepting incoming
...@@ -656,12 +714,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -656,12 +714,12 @@ func (nl *NodeLink) serveRecv() {
// NOTE rxq must be buffered with at least 1 element so that // NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet // queuing pkt succeeds for incoming connection that is not yet
// there in acceptq. // there in acceptq.
conn.rxqActive.Set(1) conn.rxqWrite.Set(1)
rxdown := conn.rxdownFlag.Get() != 0 rxdown := conn.rxdownFlag.Get() != 0
if !rxdown { if !rxdown {
conn.rxq <- pkt conn.rxq <- pkt
} }
conn.rxqActive.Set(0) conn.rxqWrite.Set(0)
//fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept) //fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept)
...@@ -1454,7 +1512,7 @@ type Request struct { ...@@ -1454,7 +1512,7 @@ type Request struct {
// //
// XXX doc // XXX doc
func (link *NodeLink) Recv1() (Request, error) { func (link *NodeLink) Recv1() (Request, error) {
conn, err := link.Accept(/*context.TODO()*/) // XXX remove context? conn, err := link.Accept()
if err != nil { if err != nil {
return Request{}, err // XXX or return *Request? (want to avoid alloc) return Request{}, err // XXX or return *Request? (want to avoid alloc)
} }
...@@ -1560,3 +1618,7 @@ func (a *atomic32) Get() int32 { ...@@ -1560,3 +1618,7 @@ func (a *atomic32) Get() int32 {
func (a *atomic32) Set(v int32) { func (a *atomic32) Set(v int32) {
atomic.StoreInt32(&a.v, v) atomic.StoreInt32(&a.v, v)
} }
func (a *atomic32) Add(δ int32) int32 {
return atomic.AddInt32(&a.v, δ)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment