Commit 8021a1d5 authored by Kirill Smelkov's avatar Kirill Smelkov

X rxghandoff

With some care it is possible for serveRecv to pause itself and
hand off execution to the goroutine for which a packet has arrived.

See comments in serveRecv about how it is done.

For BenchmarkTCPlosr this cuts RTT from 12.5μs to 6.6μs.
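
A minimal, self-contained sketch of the trick (my names, not the project
code): after waking the receiver via an unbuffered rxq send, the sender
blocks on a second unbuffered channel; blocking makes the scheduler run
the just-woken receiver from the current P's local runqueue, instead of
going through the global runqueue as runtime.Gosched() would.

	package main

	import "fmt"

	func main() {
		rxq := make(chan string)       // packet delivery (unbuffered)
		handoff := make(chan struct{}) // G-handoff ack (unbuffered)

		go func() { // serveRecv-style sender
			for i := 0; i < 3; i++ {
				rxq <- fmt.Sprintf("pkt%d", i) // wake receiver G
				<-handoff                      // block -> P switches to receiver now
			}
			close(rxq)
		}()

		for p := range rxq {
			handoff <- struct{}{} // ack first, so the sender can go prepare the next packet
			fmt.Println("received", p)
		}
	}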
parent 4df008a0
......@@ -387,7 +387,9 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
}
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) {
defer func() {
// defer func() ...
buf, serial, err = c._Load(ctx, xid)
switch err.(type) {
case nil:
// ok (avoid allocation in xerr.Contextf() call for no-error case)
......@@ -400,9 +402,12 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial
default:
xerr.Contextf(&err, "client: load %v", xid)
}
}()
err = c.withOperational(ctx)
return buf, serial, err
}
func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*zodb.Buf, zodb.Tid, error) {
err := c.withOperational(ctx)
if err != nil {
return nil, 0, err
}
......@@ -453,7 +458,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial
return nil, 0, err // XXX err context
}
buf = resp.Data
buf := resp.Data
//checksum := sha1.Sum(buf.Data)
//if checksum != resp.Checksum {
......
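The Load changes above follow a wrapper/worker split; here is a hedged
sketch of the pattern with made-up names (load/_load, errNoData), not
the project API. The exported wrapper attaches error context only when
something actually failed, so the common no-error path skips the
allocation that an unconditional context-wrapping call would pay.

	package main

	import (
		"errors"
		"fmt"
	)

	var errNoData = errors.New("no data")

	// load wraps _load, adding context to the error only on failure
	// (mirrors the `switch err.(type) { case nil: ... }` guard above).
	func load(id int) (data string, err error) {
		defer func() {
			if err != nil { // skip fmt allocation on the no-error fast path
				err = fmt.Errorf("load %d: %w", id, err)
			}
		}()
		return _load(id)
	}

	// _load carries the actual logic and returns plain errors.
	func _load(id int) (string, error) {
		if id < 0 {
			return "", errNoData
		}
		return fmt.Sprintf("object-%d", id), nil
	}

	func main() {
		fmt.Println(load(1))
		fmt.Println(load(-1))
	}

The FileStorage hunk further below applies the same split, there also
replacing defer dh.Free() with an explicit call after _Load returns,
keeping defer bookkeeping off the hot path.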
......@@ -86,8 +86,15 @@ type NodeLink struct {
closed atomic32 // whether Close was called
rxbuf rbuf.RingBuf // buffer for reading from peerLink
// scheduling optimization: whenever serveRecv sends to Conn.rxq
// receiving side must ack here to complete the G handoff.
// See comments in serveRecv for details.
rxghandoff chan struct{}
}
const rxghandoff = true // XXX whether to do rxghandoff trick
// Conn is a connection established over NodeLink
//
// Data can be sent and received over it.
......@@ -197,6 +204,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
nextConnId: nextConnId,
acceptq: make(chan *Conn), // XXX +buf
txq: make(chan txReq),
rxghandoff: make(chan struct{}),
// axdown: make(chan struct{}),
down: make(chan struct{}),
}
......@@ -475,6 +483,7 @@ func (c *Conn) downRX(errMsg *Error) {
select {
case <-c.rxq:
c.rxack()
i++
default:
......@@ -685,9 +694,23 @@ func (c *Conn) recvPkt() (*PktBuf, error) {
}
c.rxqRead.Add(-1)
if err == nil {
c.rxack()
}
return pkt, err
}
// rxack unblocks serveRecv after it has handed the G to us.
// see comments about rxghandoff in serveRecv.
func (c *Conn) rxack() {
if !rxghandoff {
return
}
//fmt.Printf("conn: rxack <- ...\n")
c.link.rxghandoff <- struct{}{}
//fmt.Printf("\tconn: rxack <- ... ok\n")
}
// serveRecv handles incoming packets, routing them either to the appropriate
// already-established connection or, if the node link is accepting incoming
// connections, to a new connection put onto the accept queue.
......@@ -781,6 +804,8 @@ func (nl *NodeLink) serveRecv() {
nl.connMu.Lock()
delete(nl.connTab, conn.connId)
nl.connMu.Unlock()
continue
}
}
......@@ -789,9 +814,29 @@ func (nl *NodeLink) serveRecv() {
// Normally the serveRecv G will continue to run, with the G woken up
// on rxq/acceptq merely being put into the runqueue of the current proc.
// By default that runq will execute only when serveRecv blocks
// next time deep in nl.recvPkt(), but let's force the switch
// again next time deep in nl.recvPkt(), but let's force the switch
// now without additional waiting to reduce latency.
// XXX bad - puts serveRecv to global runq, thus with high probability of switching M
//runtime.Gosched()
// handoff execution to the receiving goroutine via the channel;
// the rest of serveRecv is put onto the current P's local runq.
//
// details:
// - https://github.com/golang/go/issues/20168
// - https://github.com/golang/go/issues/15110
//
// see BenchmarkTCPlo* - there the serveRecv-style RX handoff
// cuts RTT from 12.5μs to 6.6μs (RTT without a serveRecv-style G is 4.8μs)
//
// TODO report upstream
if rxghandoff {
//fmt.Printf("serveRecv: <-rxghandoff\n")
<-nl.rxghandoff
//fmt.Printf("\tserveRecv: <-rxghandoff ok\n")
}
/*
// XXX goes away in favour of .rxdownFlag; reasons
// - no need to reallocate rxdown for light conn
......
......@@ -26,6 +26,7 @@ import (
"io"
"net"
"reflect"
"runtime"
"testing"
"time"
......@@ -911,9 +912,21 @@ func BenchmarkBufChanAXRXRTT(b *testing.B) {
}
var gosched = make(chan struct{})
// GoschedLocal is like runtime.Gosched but queues the current goroutine on the
// P-local runqueue instead of the global runqueue.
// FIXME does not work - in the end goroutines appear on different Ps/Ms
func GoschedLocal() {
go func() {
gosched <- struct{}{}
}()
<-gosched
}
// rtt over net.Conn Read/Write
// if serveRecv=t - do RX path with additional serveRecv-style goroutine
// if ghandoff=t - also do serveRecv-style G handoff on RX
func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool) {
func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool, ghandoff bool) {
buf1 := make([]byte, 1)
buf2 := make([]byte, 1)
......@@ -925,22 +938,51 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool) {
n int
erx error
}
rxq := make(chan rx)
go func() {
rxq := make(chan rx, 1)
rxghandoff := make(chan struct{})
var serveRx func()
serveRx = func() {
for {
n, erx := io.ReadFull(c, buf)
//fmt.Printf("(go) %p rx -> %v %v\n", c, n, erx)
rxq <- rx{n, erx}
// good: reduces switch-to-receiver-G latency;
// see the comment about rxghandoff in serveRecv.
// in case of TCP/loopback this saves ~5μs
if ghandoff {
<-rxghandoff
}
// stop on first error
if erx != nil {
return
}
if false {
// bad - puts G in global runq and so it changes M
runtime.Gosched()
}
if false {
// bad - same as runtime.Gosched
GoschedLocal()
}
}()
if false {
// bad - in the end Gs appear on different Ms
go serveRx()
return
}
}
}
go serveRx()
recv = func() (int, error) {
r := <-rxq
if ghandoff {
rxghandoff <- struct{}{}
}
return r.n, r.erx
}
......@@ -1019,12 +1061,17 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool) {
// rtt over net.Pipe - for comparison as base
func BenchmarkNetPipeRTT(b *testing.B) {
c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, false)
benchmarkNetConnRTT(b, c1, c2, false, false)
}
func BenchmarkNetPipeRTTsr(b *testing.B) {
c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, true)
benchmarkNetConnRTT(b, c1, c2, true, false)
}
func BenchmarkNetPipeRTTsrho(b *testing.B) {
c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, true, true)
}
// xtcpPipe creates two TCP connections connected to each other via loopback
......@@ -1046,12 +1093,17 @@ func xtcpPipe() (*net.TCPConn, *net.TCPConn) {
// rtt over TCP/loopback - for comparison as base
func BenchmarkTCPlo(b *testing.B) {
c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, false)
benchmarkNetConnRTT(b, c1, c2, false, false)
}
func BenchmarkTCPlosr(b *testing.B) {
c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, true)
benchmarkNetConnRTT(b, c1, c2, true, false)
}
func BenchmarkTCPlosrho(b *testing.B) {
c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, true, true)
}
......
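A hedged usage note (standard go test flags; the regexp is
illustrative): the three TCP/loopback variants can be compared side by
side with

	go test -run '^$' -bench 'TCPlo'

Per the commit message, BenchmarkTCPlosr comes out at ~12.5μs/op, the
handoff variant BenchmarkTCPlosrho at ~6.6μs/op, and plain
BenchmarkTCPlo (no serveRecv-style G at all) at ~4.8μs/op.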
......@@ -627,6 +627,8 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker) {
if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
b.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
}
cbuf1.Release()
}
// do first C.Load - this also implicitly waits for M & S to come up
......@@ -641,7 +643,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker) {
}
}
func BenchmarkGetObjectPipe(b *testing.B) {
func BenchmarkGetObjectNetPipe(b *testing.B) {
net := pipenet.New("testnet")
Mhost := net.Host("m")
Shost := net.Host("s")
......
......@@ -170,7 +170,9 @@ func (m *Master) Run(ctx context.Context) (err error) {
req, idReq, err := l.Accept(ctx)
if err != nil {
if !xcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle?
}
continue
}
......
......@@ -115,7 +115,9 @@ func (stor *Storage) Run(ctx context.Context) error {
req, idReq, err := l.Accept(ctx)
if err != nil {
if !xcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle?
}
continue
}
......
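Both Run loops above share the same accept pattern: swallow
context-cancellation errors, log everything else, and keep accepting. A
hedged stdlib-only sketch (the project uses l.Accept(ctx) and its
xcontext.Canceled helper; net.ErrClosed stands in for the shutdown
signal here):

	package main

	import (
		"errors"
		"log"
		"net"
	)

	func serveLoop(l net.Listener) {
		for {
			conn, err := l.Accept()
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return // listener closed - shutting down, not worth logging
				}
				log.Printf("accept: %v", err) // transient error; XXX throttle?
				continue
			}
			go func(c net.Conn) { c.Close() /* serve c instead */ }(conn)
		}
	}

	func main() {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			log.Fatal(err)
		}
		done := make(chan struct{})
		go func() { serveLoop(l); close(done) }()
		l.Close() // trigger the shutdown path for the demo
		<-done
	}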
......@@ -252,8 +252,13 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, tid
dh.Oid = xid.Oid
dh.Tid = zodb.TidMax
dh.PrevRevPos = dataPos
defer dh.Free()
//defer dh.Free()
buf, tid, err = fs._Load(dh, xid)
dh.Free()
return buf, tid, err
}
func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*zodb.Buf, zodb.Tid, error) {
tidBefore := xid.XTid.Tid
if !xid.XTid.TidBefore {
tidBefore++ // XXX recheck this is ok wrt overflow
......@@ -261,7 +266,7 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, tid
// search backwards for when we first have data record with tid satisfying xid.XTid
for dh.Tid >= tidBefore {
err = dh.LoadPrevRev(fs.file)
err := dh.LoadPrevRev(fs.file)
if err != nil {
if err == io.EOF {
// no such oid revision
......@@ -281,9 +286,9 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, tid
// even if we will scan back via backpointers, the tid returned should
// be of first-found transaction
tid = dh.Tid
tid := dh.Tid
buf, err = dh.LoadData(fs.file)
buf, err := dh.LoadData(fs.file)
if err != nil {
return nil, 0, &ErrXidLoad{xid, err}
}
......