Commit fc5edaca authored by Ben Burkert's avatar Ben Burkert Committed by Brad Fitzpatrick

net: use splice(2) on Linux when reading from UnixConn, rework splice tests

Rework the splice tests and benchmarks. Move the reading and writing of
the spliced connections to child processes so that the I/O is not part
of benchmarks or profiles.

Enable the use of splice(2) when reading from a unix connection and
writing to a TCP connection. The updated benchmarks show a performance
gain when using splice(2) to copy large chunks of data that the original
benchmark did not capture.

  name                          old time/op    new time/op    delta
  Splice/tcp-to-tcp/1024-8        5.01µs ± 2%    5.08µs ± 3%      ~     (p=0.068 n=8+10)
  Splice/tcp-to-tcp/2048-8        4.76µs ± 5%    4.65µs ± 3%    -2.36%  (p=0.015 n=9+8)
  Splice/tcp-to-tcp/4096-8        4.91µs ± 2%    4.98µs ± 5%      ~     (p=0.315 n=9+10)
  Splice/tcp-to-tcp/8192-8        5.50µs ± 4%    5.44µs ± 3%      ~     (p=0.758 n=7+9)
  Splice/tcp-to-tcp/16384-8       7.65µs ± 7%    6.53µs ± 3%   -14.65%  (p=0.000 n=10+9)
  Splice/tcp-to-tcp/32768-8       15.3µs ± 7%     8.5µs ± 5%   -44.21%  (p=0.000 n=10+10)
  Splice/tcp-to-tcp/65536-8       30.0µs ± 6%    15.7µs ± 1%   -47.58%  (p=0.000 n=10+8)
  Splice/tcp-to-tcp/131072-8      59.2µs ± 2%    27.4µs ± 5%   -53.75%  (p=0.000 n=9+9)
  Splice/tcp-to-tcp/262144-8       121µs ± 4%      54µs ±19%   -55.56%  (p=0.000 n=9+10)
  Splice/tcp-to-tcp/524288-8       247µs ± 6%     108µs ±12%   -56.34%  (p=0.000 n=10+10)
  Splice/tcp-to-tcp/1048576-8      490µs ± 4%     199µs ±12%   -59.31%  (p=0.000 n=8+10)
  Splice/unix-to-tcp/1024-8       1.20µs ± 2%    1.35µs ± 7%   +12.47%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/2048-8       1.33µs ±12%    1.57µs ± 4%   +17.85%  (p=0.000 n=9+10)
  Splice/unix-to-tcp/4096-8       2.24µs ± 4%    1.67µs ± 4%   -25.14%  (p=0.000 n=9+10)
  Splice/unix-to-tcp/8192-8       4.59µs ± 8%    2.20µs ±10%   -52.01%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/16384-8      8.46µs ±13%    3.48µs ± 6%   -58.91%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/32768-8      18.5µs ± 9%     6.1µs ± 9%   -66.99%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/65536-8      35.9µs ± 7%    13.5µs ± 6%   -62.40%  (p=0.000 n=10+9)
  Splice/unix-to-tcp/131072-8     79.4µs ± 6%    25.7µs ± 4%   -67.62%  (p=0.000 n=10+9)
  Splice/unix-to-tcp/262144-8      157µs ± 4%      54µs ± 8%   -65.63%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/524288-8      311µs ± 3%     107µs ± 8%   -65.74%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/1048576-8     643µs ± 4%     185µs ±32%   -71.21%  (p=0.000 n=10+10)

  name                          old speed      new speed      delta
  Splice/tcp-to-tcp/1024-8       204MB/s ± 2%   202MB/s ± 3%      ~     (p=0.068 n=8+10)
  Splice/tcp-to-tcp/2048-8       430MB/s ± 5%   441MB/s ± 3%    +2.39%  (p=0.014 n=9+8)
  Splice/tcp-to-tcp/4096-8       833MB/s ± 2%   823MB/s ± 5%      ~     (p=0.315 n=9+10)
  Splice/tcp-to-tcp/8192-8      1.49GB/s ± 4%  1.51GB/s ± 3%      ~     (p=0.758 n=7+9)
  Splice/tcp-to-tcp/16384-8     2.14GB/s ± 7%  2.51GB/s ± 3%   +17.03%  (p=0.000 n=10+9)
  Splice/tcp-to-tcp/32768-8     2.15GB/s ± 7%  3.85GB/s ± 5%   +79.11%  (p=0.000 n=10+10)
  Splice/tcp-to-tcp/65536-8     2.19GB/s ± 5%  4.17GB/s ± 1%   +90.65%  (p=0.000 n=10+8)
  Splice/tcp-to-tcp/131072-8    2.22GB/s ± 2%  4.79GB/s ± 4%  +116.26%  (p=0.000 n=9+9)
  Splice/tcp-to-tcp/262144-8    2.17GB/s ± 4%  4.93GB/s ±17%  +127.25%  (p=0.000 n=9+10)
  Splice/tcp-to-tcp/524288-8    2.13GB/s ± 6%  4.89GB/s ±13%  +130.15%  (p=0.000 n=10+10)
  Splice/tcp-to-tcp/1048576-8   2.09GB/s ±10%  5.29GB/s ±11%  +153.36%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/1024-8      850MB/s ± 2%   757MB/s ± 7%   -10.94%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/2048-8     1.54GB/s ±11%  1.31GB/s ± 3%   -15.32%  (p=0.000 n=9+10)
  Splice/unix-to-tcp/4096-8     1.83GB/s ± 4%  2.45GB/s ± 4%   +33.59%  (p=0.000 n=9+10)
  Splice/unix-to-tcp/8192-8     1.79GB/s ± 9%  3.73GB/s ± 9%  +108.05%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/16384-8    1.95GB/s ±13%  4.68GB/s ± 3%  +139.80%  (p=0.000 n=10+9)
  Splice/unix-to-tcp/32768-8    1.78GB/s ± 9%  5.38GB/s ±10%  +202.71%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/65536-8    1.83GB/s ± 8%  4.85GB/s ± 6%  +165.70%  (p=0.000 n=10+9)
  Splice/unix-to-tcp/131072-8   1.65GB/s ± 6%  5.10GB/s ± 4%  +208.77%  (p=0.000 n=10+9)
  Splice/unix-to-tcp/262144-8   1.67GB/s ± 4%  4.87GB/s ± 7%  +191.19%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/524288-8   1.69GB/s ± 3%  4.93GB/s ± 7%  +192.38%  (p=0.000 n=10+10)
  Splice/unix-to-tcp/1048576-8  1.63GB/s ± 3%  5.60GB/s ±44%  +243.26%  (p=0.000 n=10+9)

Change-Id: I1eae4c3459c918558c70fc42283db22ff7e0442c
Reviewed-on: https://go-review.googlesource.com/113997Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
parent f94de9c9
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
// splice transfers data from r to c using the splice system call to minimize // splice transfers data from r to c using the splice system call to minimize
// copies from and to userspace. c must be a TCP connection. Currently, splice // copies from and to userspace. c must be a TCP connection. Currently, splice
// is only enabled if r is also a TCP connection. // is only enabled if r is a TCP or Unix connection.
// //
// If splice returns handled == false, it has performed no work. // If splice returns handled == false, it has performed no work.
func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) {
...@@ -23,11 +23,17 @@ func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -23,11 +23,17 @@ func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) {
return 0, nil, true return 0, nil, true
} }
} }
s, ok := r.(*TCPConn)
if !ok { var s *netFD
if tc, ok := r.(*TCPConn); ok {
s = tc.fd
} else if uc, ok := r.(*UnixConn); ok {
s = uc.fd
} else {
return 0, nil, false return 0, nil, false
} }
written, handled, sc, err := poll.Splice(&c.pfd, &s.fd.pfd, remain)
written, handled, sc, err := poll.Splice(&c.pfd, &s.pfd, remain)
if lr != nil { if lr != nil {
lr.N -= written lr.N -= written
} }
......
...@@ -7,239 +7,103 @@ ...@@ -7,239 +7,103 @@
package net package net
import ( import (
"bytes"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"os"
"os/exec"
"strconv"
"sync" "sync"
"testing" "testing"
) )
func TestSplice(t *testing.T) { func TestSplice(t *testing.T) {
t.Run("simple", testSpliceSimple) t.Run("tcp-to-tcp", func(t *testing.T) { testSplice(t, "tcp", "tcp") })
t.Run("multipleWrite", testSpliceMultipleWrite) t.Run("unix-to-tcp", func(t *testing.T) { testSplice(t, "unix", "tcp") })
t.Run("big", testSpliceBig)
t.Run("honorsLimitedReader", testSpliceHonorsLimitedReader)
t.Run("readerAtEOF", testSpliceReaderAtEOF)
t.Run("issue25985", testSpliceIssue25985)
} }
func testSpliceSimple(t *testing.T) { func testSplice(t *testing.T, upNet, downNet string) {
srv, err := newSpliceTestServer() t.Run("simple", spliceTestCase{upNet, downNet, 128, 128, 0}.test)
if err != nil { t.Run("multipleWrite", spliceTestCase{upNet, downNet, 4096, 1 << 20, 0}.test)
t.Fatal(err) t.Run("big", spliceTestCase{upNet, downNet, 5 << 20, 1 << 30, 0}.test)
} t.Run("honorsLimitedReader", spliceTestCase{upNet, downNet, 4096, 1 << 20, 1 << 10}.test)
defer srv.Close() t.Run("updatesLimitedReaderN", spliceTestCase{upNet, downNet, 1024, 4096, 4096 + 100}.test)
copyDone := srv.Copy() t.Run("limitedReaderAtLimit", spliceTestCase{upNet, downNet, 32, 128, 128}.test)
msg := []byte("splice test") t.Run("readerAtEOF", func(t *testing.T) { testSpliceReaderAtEOF(t, upNet, downNet) })
if _, err := srv.Write(msg); err != nil { t.Run("issue25985", func(t *testing.T) { testSpliceIssue25985(t, upNet, downNet) })
t.Fatal(err)
}
got := make([]byte, len(msg))
if _, err := io.ReadFull(srv, got); err != nil {
t.Fatal(err)
}
if !bytes.Equal(got, msg) {
t.Errorf("got %q, wrote %q", got, msg)
}
srv.CloseWrite()
srv.CloseRead()
if err := <-copyDone; err != nil {
t.Errorf("splice: %v", err)
}
} }
func testSpliceMultipleWrite(t *testing.T) { type spliceTestCase struct {
srv, err := newSpliceTestServer() upNet, downNet string
if err != nil {
t.Fatal(err)
}
defer srv.Close()
copyDone := srv.Copy()
msg1 := []byte("splice test part 1 ")
msg2 := []byte(" splice test part 2")
if _, err := srv.Write(msg1); err != nil {
t.Fatalf("Write: %v", err)
}
if _, err := srv.Write(msg2); err != nil {
t.Fatal(err)
}
got := make([]byte, len(msg1)+len(msg2))
if _, err := io.ReadFull(srv, got); err != nil {
t.Fatal(err)
}
want := append(msg1, msg2...)
if !bytes.Equal(got, want) {
t.Errorf("got %q, wrote %q", got, want)
}
srv.CloseWrite()
srv.CloseRead()
if err := <-copyDone; err != nil {
t.Errorf("splice: %v", err)
}
}
func testSpliceBig(t *testing.T) { chunkSize, totalSize int
// The maximum amount of data that internal/poll.Splice will use in a limitReadSize int
// splice(2) call is 4 << 20. Use a bigger size here so that we test an
// amount that doesn't fit in a single call.
size := 5 << 20
srv, err := newSpliceTestServer()
if err != nil {
t.Fatal(err)
}
defer srv.Close()
big := make([]byte, size)
copyDone := srv.Copy()
type readResult struct {
b []byte
err error
}
readDone := make(chan readResult)
go func() {
got := make([]byte, len(big))
_, err := io.ReadFull(srv, got)
readDone <- readResult{got, err}
}()
if _, err := srv.Write(big); err != nil {
t.Fatal(err)
}
res := <-readDone
if res.err != nil {
t.Fatal(res.err)
}
got := res.b
if !bytes.Equal(got, big) {
t.Errorf("input and output differ")
}
srv.CloseWrite()
srv.CloseRead()
if err := <-copyDone; err != nil {
t.Errorf("splice: %v", err)
}
}
func testSpliceHonorsLimitedReader(t *testing.T) {
t.Run("stopsAfterN", testSpliceStopsAfterN)
t.Run("updatesN", testSpliceUpdatesN)
t.Run("readerAtLimit", testSpliceReaderAtLimit)
} }
func testSpliceStopsAfterN(t *testing.T) { func (tc spliceTestCase) test(t *testing.T) {
clientUp, serverUp, err := spliceTestSocketPair("tcp") clientUp, serverUp, err := spliceTestSocketPair(tc.upNet)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer clientUp.Close()
defer serverUp.Close() defer serverUp.Close()
clientDown, serverDown, err := spliceTestSocketPair("tcp") cleanup, err := startSpliceClient(clientUp, "w", tc.chunkSize, tc.totalSize)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer clientDown.Close() defer cleanup()
defer serverDown.Close() clientDown, serverDown, err := spliceTestSocketPair(tc.downNet)
count := 128
copyDone := make(chan error)
lr := &io.LimitedReader{
N: int64(count),
R: serverUp,
}
go func() {
_, err := io.Copy(serverDown, lr)
serverDown.Close()
copyDone <- err
}()
msg := make([]byte, 2*count)
if _, err := clientUp.Write(msg); err != nil {
t.Fatal(err)
}
clientUp.Close()
var buf bytes.Buffer
if _, err := io.Copy(&buf, clientDown); err != nil {
t.Fatal(err)
}
if buf.Len() != count {
t.Errorf("splice transferred %d bytes, want to stop after %d", buf.Len(), count)
}
clientDown.Close()
if err := <-copyDone; err != nil {
t.Errorf("splice: %v", err)
}
}
func testSpliceUpdatesN(t *testing.T) {
clientUp, serverUp, err := spliceTestSocketPair("tcp")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer clientUp.Close()
defer serverUp.Close()
clientDown, serverDown, err := spliceTestSocketPair("tcp")
if err != nil {
t.Fatal(err)
}
defer clientDown.Close()
defer serverDown.Close() defer serverDown.Close()
count := 128 cleanup, err = startSpliceClient(clientDown, "r", tc.chunkSize, tc.totalSize)
copyDone := make(chan error) if err != nil {
lr := &io.LimitedReader{
N: int64(100 + count),
R: serverUp,
}
go func() {
_, err := io.Copy(serverDown, lr)
copyDone <- err
}()
msg := make([]byte, count)
if _, err := clientUp.Write(msg); err != nil {
t.Fatal(err)
}
clientUp.Close()
got := make([]byte, count)
if _, err := io.ReadFull(clientDown, got); err != nil {
t.Fatal(err) t.Fatal(err)
} }
clientDown.Close() defer cleanup()
if err := <-copyDone; err != nil { var (
t.Errorf("splice: %v", err) r io.Reader = serverUp
} size = tc.totalSize
wantN := int64(100) )
if lr.N != wantN { if tc.limitReadSize > 0 {
t.Errorf("lr.N = %d, want %d", lr.N, wantN) if tc.limitReadSize < size {
} size = tc.limitReadSize
} }
func testSpliceReaderAtLimit(t *testing.T) { r = &io.LimitedReader{
clientUp, serverUp, err := spliceTestSocketPair("tcp") N: int64(tc.limitReadSize),
if err != nil { R: serverUp,
t.Fatal(err) }
defer serverUp.Close()
} }
defer clientUp.Close() n, err := io.Copy(serverDown, r)
defer serverUp.Close() serverDown.Close()
clientDown, serverDown, err := spliceTestSocketPair("tcp")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer clientDown.Close() if want := int64(size); want != n {
defer serverDown.Close() t.Errorf("want %d bytes spliced, got %d", want, n)
lr := &io.LimitedReader{
N: 0,
R: serverUp,
} }
_, err, handled := splice(serverDown.(*TCPConn).fd, lr)
if !handled { if tc.limitReadSize > 0 {
t.Errorf("exhausted LimitedReader: got err = %v, handled = %t, want handled = true", err, handled) wantN := 0
if tc.limitReadSize > size {
wantN = tc.limitReadSize - size
}
if n := r.(*io.LimitedReader).N; n != int64(wantN) {
t.Errorf("r.N = %d, want %d", n, wantN)
}
} }
} }
func testSpliceReaderAtEOF(t *testing.T) { func testSpliceReaderAtEOF(t *testing.T, upNet, downNet string) {
clientUp, serverUp, err := spliceTestSocketPair("tcp") clientUp, serverUp, err := spliceTestSocketPair(upNet)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer clientUp.Close() defer clientUp.Close()
clientDown, serverDown, err := spliceTestSocketPair("tcp") clientDown, serverDown, err := spliceTestSocketPair(downNet)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -265,7 +129,7 @@ func testSpliceReaderAtEOF(t *testing.T) { ...@@ -265,7 +129,7 @@ func testSpliceReaderAtEOF(t *testing.T) {
// get a goodbye signal. Test for the goodbye signal. // get a goodbye signal. Test for the goodbye signal.
msg := "bye" msg := "bye"
go func() { go func() {
serverDown.(*TCPConn).ReadFrom(serverUp) serverDown.(io.ReaderFrom).ReadFrom(serverUp)
io.WriteString(serverDown, msg) io.WriteString(serverDown, msg)
serverDown.Close() serverDown.Close()
}() }()
...@@ -280,13 +144,13 @@ func testSpliceReaderAtEOF(t *testing.T) { ...@@ -280,13 +144,13 @@ func testSpliceReaderAtEOF(t *testing.T) {
} }
} }
func testSpliceIssue25985(t *testing.T) { func testSpliceIssue25985(t *testing.T, upNet, downNet string) {
front, err := newLocalListener("tcp") front, err := newLocalListener(upNet)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer front.Close() defer front.Close()
back, err := newLocalListener("tcp") back, err := newLocalListener(downNet)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -300,7 +164,7 @@ func testSpliceIssue25985(t *testing.T) { ...@@ -300,7 +164,7 @@ func testSpliceIssue25985(t *testing.T) {
if err != nil { if err != nil {
return return
} }
dst, err := Dial("tcp", back.Addr().String()) dst, err := Dial(downNet, back.Addr().String())
if err != nil { if err != nil {
return return
} }
...@@ -318,7 +182,7 @@ func testSpliceIssue25985(t *testing.T) { ...@@ -318,7 +182,7 @@ func testSpliceIssue25985(t *testing.T) {
go proxy() go proxy()
toFront, err := Dial("tcp", front.Addr().String()) toFront, err := Dial(upNet, front.Addr().String())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -340,166 +204,71 @@ func testSpliceIssue25985(t *testing.T) { ...@@ -340,166 +204,71 @@ func testSpliceIssue25985(t *testing.T) {
wg.Wait() wg.Wait()
} }
func BenchmarkTCPReadFrom(b *testing.B) { func BenchmarkSplice(b *testing.B) {
testHookUninstaller.Do(uninstallTestHooks) testHookUninstaller.Do(uninstallTestHooks)
var chunkSizes []int b.Run("tcp-to-tcp", func(b *testing.B) { benchSplice(b, "tcp", "tcp") })
for i := uint(10); i <= 20; i++ { b.Run("unix-to-tcp", func(b *testing.B) { benchSplice(b, "unix", "tcp") })
chunkSizes = append(chunkSizes, 1<<i)
}
// To benchmark the genericReadFrom code path, set this to false.
useSplice := true
for _, chunkSize := range chunkSizes {
b.Run(fmt.Sprint(chunkSize), func(b *testing.B) {
benchmarkSplice(b, chunkSize, useSplice)
})
}
} }
func benchmarkSplice(b *testing.B, chunkSize int, useSplice bool) { func benchSplice(b *testing.B, upNet, downNet string) {
srv, err := newSpliceTestServer() for i := 0; i <= 10; i++ {
if err != nil { chunkSize := 1 << uint(i+10)
b.Fatal(err) tc := spliceTestCase{
} upNet: upNet,
defer srv.Close() downNet: downNet,
var copyDone <-chan error chunkSize: chunkSize,
if useSplice {
copyDone = srv.Copy()
} else {
copyDone = srv.CopyNoSplice()
}
chunk := make([]byte, chunkSize)
discardDone := make(chan struct{})
go func() {
for {
buf := make([]byte, chunkSize)
_, err := srv.Read(buf)
if err != nil {
break
}
} }
discardDone <- struct{}{}
}() b.Run(strconv.Itoa(chunkSize), tc.bench)
b.SetBytes(int64(chunkSize))
b.ResetTimer()
for i := 0; i < b.N; i++ {
srv.Write(chunk)
} }
srv.CloseWrite()
<-copyDone
srv.CloseRead()
<-discardDone
} }
type spliceTestServer struct { func (tc spliceTestCase) bench(b *testing.B) {
clientUp io.WriteCloser // To benchmark the genericReadFrom code path, set this to false.
clientDown io.ReadCloser useSplice := true
serverUp io.ReadCloser
serverDown io.WriteCloser
}
func newSpliceTestServer() (*spliceTestServer, error) { clientUp, serverUp, err := spliceTestSocketPair(tc.upNet)
// For now, both networks are hard-coded to TCP.
// If splice is enabled for non-tcp upstream connections,
// newSpliceTestServer will need to take a network parameter.
clientUp, serverUp, err := spliceTestSocketPair("tcp")
if err != nil { if err != nil {
return nil, err b.Fatal(err)
} }
clientDown, serverDown, err := spliceTestSocketPair("tcp") defer serverUp.Close()
cleanup, err := startSpliceClient(clientUp, "w", tc.chunkSize, tc.chunkSize*b.N)
if err != nil { if err != nil {
clientUp.Close() b.Fatal(err)
serverUp.Close()
return nil, err
} }
return &spliceTestServer{clientUp, clientDown, serverUp, serverDown}, nil defer cleanup()
}
// Read reads from the downstream connection.
func (srv *spliceTestServer) Read(b []byte) (int, error) {
return srv.clientDown.Read(b)
}
// Write writes to the upstream connection.
func (srv *spliceTestServer) Write(b []byte) (int, error) {
return srv.clientUp.Write(b)
}
// Close closes the server. clientDown, serverDown, err := spliceTestSocketPair(tc.downNet)
func (srv *spliceTestServer) Close() error { if err != nil {
err := srv.closeUp() b.Fatal(err)
err1 := srv.closeDown()
if err == nil {
return err1
} }
return err defer serverDown.Close()
}
// CloseWrite closes the client side of the upstream connection.
func (srv *spliceTestServer) CloseWrite() error {
return srv.clientUp.Close()
}
// CloseRead closes the client side of the downstream connection.
func (srv *spliceTestServer) CloseRead() error {
return srv.clientDown.Close()
}
// Copy copies from the server side of the upstream connection
// to the server side of the downstream connection, in a separate
// goroutine. Copy is done when the first send on the returned
// channel succeeds.
func (srv *spliceTestServer) Copy() <-chan error {
ch := make(chan error)
go func() {
_, err := io.Copy(srv.serverDown, srv.serverUp)
ch <- err
close(ch)
}()
return ch
}
// CopyNoSplice is like Copy, but ensures that the splice code path cleanup, err = startSpliceClient(clientDown, "r", tc.chunkSize, tc.chunkSize*b.N)
// is not reached. if err != nil {
func (srv *spliceTestServer) CopyNoSplice() <-chan error { b.Fatal(err)
type onlyReader struct {
io.Reader
} }
ch := make(chan error) defer cleanup()
go func() {
_, err := io.Copy(srv.serverDown, onlyReader{srv.serverUp})
ch <- err
close(ch)
}()
return ch
}
func (srv *spliceTestServer) closeUp() error { b.SetBytes(int64(tc.chunkSize))
var err, err1 error b.ResetTimer()
if srv.serverUp != nil {
err = srv.serverUp.Close()
}
if srv.clientUp != nil {
err1 = srv.clientUp.Close()
}
if err == nil {
return err1
}
return err
}
func (srv *spliceTestServer) closeDown() error { if useSplice {
var err, err1 error _, err := io.Copy(serverDown, serverUp)
if srv.serverDown != nil { if err != nil {
err = srv.serverDown.Close() b.Fatal(err)
} }
if srv.clientDown != nil { } else {
err1 = srv.clientDown.Close() type onlyReader struct {
} io.Reader
if err == nil { }
return err1 _, err := io.Copy(serverDown, onlyReader{serverUp})
if err != nil {
b.Fatal(err)
}
} }
return err
} }
func spliceTestSocketPair(net string) (client, server Conn, err error) { func spliceTestSocketPair(net string) (client, server Conn, err error) {
...@@ -530,3 +299,85 @@ func spliceTestSocketPair(net string) (client, server Conn, err error) { ...@@ -530,3 +299,85 @@ func spliceTestSocketPair(net string) (client, server Conn, err error) {
} }
return client, server, nil return client, server, nil
} }
func startSpliceClient(conn Conn, op string, chunkSize, totalSize int) (func(), error) {
f, err := conn.(interface{ File() (*os.File, error) }).File()
if err != nil {
return nil, err
}
cmd := exec.Command(os.Args[0], os.Args[1:]...)
cmd.Env = []string{
"GO_NET_TEST_SPLICE=1",
"GO_NET_TEST_SPLICE_OP=" + op,
"GO_NET_TEST_SPLICE_CHUNK_SIZE=" + strconv.Itoa(chunkSize),
"GO_NET_TEST_SPLICE_TOTAL_SIZE=" + strconv.Itoa(totalSize),
}
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
return nil, err
}
donec := make(chan struct{})
go func() {
cmd.Wait()
conn.Close()
f.Close()
close(donec)
}()
return func() { <-donec }, nil
}
func init() {
if os.Getenv("GO_NET_TEST_SPLICE") == "" {
return
}
defer os.Exit(0)
f := os.NewFile(uintptr(3), "splice-test-conn")
defer f.Close()
conn, err := FileConn(f)
if err != nil {
log.Fatal(err)
}
var chunkSize int
if chunkSize, err = strconv.Atoi(os.Getenv("GO_NET_TEST_SPLICE_CHUNK_SIZE")); err != nil {
log.Fatal(err)
}
buf := make([]byte, chunkSize)
var totalSize int
if totalSize, err = strconv.Atoi(os.Getenv("GO_NET_TEST_SPLICE_TOTAL_SIZE")); err != nil {
log.Fatal(err)
}
var fn func([]byte) (int, error)
switch op := os.Getenv("GO_NET_TEST_SPLICE_OP"); op {
case "r":
fn = conn.Read
case "w":
defer conn.Close()
fn = conn.Write
default:
log.Fatalf("unknown op %q", op)
}
var n int
for count := 0; count < totalSize; count += n {
if count+chunkSize > totalSize {
buf = buf[:totalSize-count]
}
var err error
if n, err = fn(buf); err != nil {
return
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment