Commit 89b26760 authored by Mikio Hara's avatar Mikio Hara Committed by Russ Cox

net: implement TCP connection setup with fast failover

This CL adds minimal support of Happy Eyeballs-like TCP connection
setup to Dialer API. Happy Eyeballs and derivation techniques are
described in the following:

- Happy Eyeballs: Success with Dual-Stack Hosts
  http://tools.ietf.org/html/rfc6555

- Analysing Dual Stack Behaviour and IPv6 Quality
  http://www.potaroo.net/presentations/2012-04-17-dual-stack-quality.pdf

Usually, the techniques consist of three components below.

- DNS query racers, that run A and AAAA queries in parallel or series
- A short list of destination addresses
- TCP SYN racers, that run IPv4 and IPv6 transport in parallel or series

This CL implements only the latter two. The existing DNS query
component gathers together A and AAAA records in series, so we don't
touch it here. This CL just uses extended resolveInternetAddr and makes
it possible to run multiple Dial racers in parallel.

For example, when the given destination is a DNS name and the name has
multiple address family A and AAAA records, and it happens on the TCP
wildcard network "tcp" with DualStack=true like the following:

(&net.Dialer{DualStack: true}).Dial("tcp", "www.example.com:80")

The function will return a first established connection either TCP over
IPv4 or TCP over IPv6, and close the other connection internally.

Fixes #3610.
Fixes #5267.

Benchmark results on freebsd/amd64 virtual machine, tip vs. tip+12416043:

benchmark                           old ns/op    new ns/op    delta
BenchmarkTCP4OneShot                    50696        52141   +2.85%
BenchmarkTCP4OneShotTimeout             65775        66426   +0.99%
BenchmarkTCP4Persistent                 10986        10457   -4.82%
BenchmarkTCP4PersistentTimeout          11207        10445   -6.80%
BenchmarkTCP6OneShot                    62009        63718   +2.76%
BenchmarkTCP6OneShotTimeout             78351        79138   +1.00%
BenchmarkTCP6Persistent                 14695        14659   -0.24%
BenchmarkTCP6PersistentTimeout          15032        14646   -2.57%
BenchmarkTCP4ConcurrentReadWrite         7215         6217  -13.83%
BenchmarkTCP6ConcurrentReadWrite         7528         7493   -0.46%

benchmark                          old allocs   new allocs    delta
BenchmarkTCP4OneShot                       36           36    0.00%
BenchmarkTCP4OneShotTimeout                36           36    0.00%
BenchmarkTCP4Persistent                     0            0     n/a%
BenchmarkTCP4PersistentTimeout              0            0     n/a%
BenchmarkTCP6OneShot                       37           37    0.00%
BenchmarkTCP6OneShotTimeout                37           37    0.00%
BenchmarkTCP6Persistent                     0            0     n/a%
BenchmarkTCP6PersistentTimeout              0            0     n/a%
BenchmarkTCP4ConcurrentReadWrite            0            0     n/a%
BenchmarkTCP6ConcurrentReadWrite            0            0     n/a%

benchmark                           old bytes    new bytes    delta
BenchmarkTCP4OneShot                     2500         2503    0.12%
BenchmarkTCP4OneShotTimeout              2508         2505   -0.12%
BenchmarkTCP4Persistent                     0            0     n/a%
BenchmarkTCP4PersistentTimeout              0            0     n/a%
BenchmarkTCP6OneShot                     2713         2707   -0.22%
BenchmarkTCP6OneShotTimeout              2722         2720   -0.07%
BenchmarkTCP6Persistent                     0            0     n/a%
BenchmarkTCP6PersistentTimeout              0            0     n/a%
BenchmarkTCP4ConcurrentReadWrite            0            0     n/a%
BenchmarkTCP6ConcurrentReadWrite            0            0     n/a%

R=golang-dev, bradfitz, nightlyone, rsc
CC=golang-dev
https://golang.org/cl/12416043
parent e6a49555
......@@ -37,6 +37,13 @@ type Dialer struct {
// network being dialed.
// If nil, a local address is automatically chosen.
LocalAddr Addr
// DualStack allows a single dial to attempt to establish
// multiple IPv4 and IPv6 connections and to return the first
// established connection when the network is "tcp" and the
// destination is a host name that has multiple address family
// DNS records.
DualStack bool
}
// Return either now+Timeout or Deadline, whichever comes first.
......@@ -143,10 +150,72 @@ func DialTimeout(network, address string, timeout time.Duration) (Conn, error) {
// See func Dial for a description of the network and address
// parameters.
func (d *Dialer) Dial(network, address string) (Conn, error) {
return resolveAndDial(network, address, d.LocalAddr, d.deadline())
ra, err := resolveAddr("dial", network, address, d.deadline())
if err != nil {
return nil, &OpError{Op: "dial", Net: network, Addr: nil, Err: err}
}
dialer := func(deadline time.Time) (Conn, error) {
return dialSingle(network, address, d.LocalAddr, ra.toAddr(), deadline)
}
if ras, ok := ra.(addrList); ok && d.DualStack && network == "tcp" {
dialer = func(deadline time.Time) (Conn, error) {
return dialMulti(network, address, d.LocalAddr, ras, deadline)
}
}
return dial(network, ra.toAddr(), dialer, d.deadline())
}
func dial(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
// dialMulti attempts to establish connections to each destination of
// the list of addresses. It will return the first established
// connection and close the other connections. Otherwise it returns
// error on the last attempt.
func dialMulti(net, addr string, la Addr, ras addrList, deadline time.Time) (Conn, error) {
type racer struct {
Conn
Addr
error
}
// Sig controls the flow of dial results on lane. It passes a
// token to the next racer and also indicates the end of flow
// by using closed channel.
sig := make(chan bool, 1)
lane := make(chan racer, 1)
for _, ra := range ras {
go func(ra Addr) {
c, err := dialSingle(net, addr, la, ra, deadline)
if _, ok := <-sig; ok {
lane <- racer{c, ra, err}
} else if err == nil {
// We have to return the resources
// that belong to the other
// connections here for avoiding
// unnecessary resource starvation.
c.Close()
}
}(ra.toAddr())
}
defer close(sig)
var failAddr Addr
lastErr := errTimeout
nracers := len(ras)
for nracers > 0 {
sig <- true
select {
case racer := <-lane:
if racer.error == nil {
return racer.Conn, nil
}
failAddr = racer.Addr
lastErr = racer.error
nracers--
}
}
return nil, &OpError{Op: "dial", Net: net, Addr: failAddr, Err: lastErr}
}
// dialSingle attempts to establish and returns a single connection to
// the destination address.
func dialSingle(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
if la != nil && la.Network() != ra.Network() {
return nil, &OpError{Op: "dial", Net: net, Addr: ra, Err: errors.New("mismatched local address type " + la.Network())}
}
......@@ -168,13 +237,6 @@ func dial(net, addr string, la, ra Addr, deadline time.Time) (Conn, error) {
}
}
type stringAddr struct {
net, addr string
}
func (a stringAddr) Network() string { return a.net }
func (a stringAddr) String() string { return a.addr }
// Listen announces on the local network address laddr.
// The network net must be a stream-oriented network: "tcp", "tcp4",
// "tcp6", "unix" or "unixpacket".
......
......@@ -12,62 +12,35 @@ import (
var testingIssue5349 bool // used during tests
// resolveAndDialChannel is the simple pure-Go implementation of
// resolveAndDial, still used on operating systems where the deadline
// hasn't been pushed down into the pollserver. (Plan 9 and some old
// versions of Windows)
func resolveAndDialChannel(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
// dialChannel is the simple pure-Go implementation of dial, still
// used on operating systems where the deadline hasn't been pushed
// down into the pollserver. (Plan 9 and some old versions of Windows)
func dialChannel(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
var timeout time.Duration
if !deadline.IsZero() {
timeout = deadline.Sub(time.Now())
}
if timeout <= 0 {
ra, err := resolveAddr("dial", net, addr, noDeadline)
if err != nil {
return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
}
return dial(net, addr, localAddr, ra.toAddr(), noDeadline)
return dialer(noDeadline)
}
t := time.NewTimer(timeout)
defer t.Stop()
type pair struct {
type racer struct {
Conn
error
}
ch := make(chan pair, 1)
resolvedAddr := make(chan Addr, 1)
ch := make(chan racer, 1)
go func() {
if testingIssue5349 {
time.Sleep(time.Millisecond)
}
ra, err := resolveAddr("dial", net, addr, noDeadline)
if err != nil {
ch <- pair{nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}}
return
}
resolvedAddr <- ra.toAddr() // in case we need it for OpError
c, err := dial(net, addr, localAddr, ra.toAddr(), noDeadline)
ch <- pair{c, err}
c, err := dialer(noDeadline)
ch <- racer{c, err}
}()
select {
case <-t.C:
// Try to use the real Addr in our OpError, if we resolved it
// before the timeout. Otherwise we just use stringAddr.
var ra Addr
select {
case a := <-resolvedAddr:
ra = a
default:
ra = &stringAddr{net, addr}
}
err := &OpError{
Op: "dial",
Net: net,
Addr: ra,
Err: errTimeout,
}
return nil, err
case p := <-ch:
return p.Conn, p.error
return nil, &OpError{Op: "dial", Net: net, Addr: ra, Err: errTimeout}
case racer := <-ch:
return racer.Conn, racer.error
}
}
......@@ -5,13 +5,17 @@
package net
import (
"bytes"
"flag"
"fmt"
"io"
"os"
"os/exec"
"reflect"
"regexp"
"runtime"
"strconv"
"sync"
"testing"
"time"
)
......@@ -314,6 +318,96 @@ func TestDialTimeoutFDLeak(t *testing.T) {
}
}
func numTCP() (ntcp, nopen, nclose int, err error) {
lsof, err := exec.Command("lsof", "-n", "-p", strconv.Itoa(os.Getpid())).Output()
if err != nil {
return 0, 0, 0, err
}
ntcp += bytes.Count(lsof, []byte("TCP"))
for _, state := range []string{"LISTEN", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED"} {
nopen += bytes.Count(lsof, []byte(state))
}
for _, state := range []string{"CLOSED", "CLOSE_WAIT", "LAST_ACK", "FIN_WAIT_1", "FIN_WAIT_2", "CLOSING", "TIME_WAIT"} {
nclose += bytes.Count(lsof, []byte(state))
}
return ntcp, nopen, nclose, nil
}
func TestDialMultiFDLeak(t *testing.T) {
if !supportsIPv4 || !supportsIPv6 {
t.Skip("neither ipv4 nor ipv6 is supported")
}
halfDeadServer := func(dss *dualStackServer, ln Listener) {
for {
if c, err := ln.Accept(); err != nil {
return
} else {
// It just keeps established
// connections like a half-dead server
// does.
dss.putConn(c)
}
}
}
dss, err := newDualStackServer([]streamListener{
{net: "tcp4", addr: "127.0.0.1"},
{net: "tcp6", addr: "[::1]"},
})
if err != nil {
t.Fatalf("newDualStackServer failed: %v", err)
}
defer dss.teardown()
if err := dss.buildup(halfDeadServer); err != nil {
t.Fatalf("dualStackServer.buildup failed: %v", err)
}
_, before, _, err := numTCP()
if err != nil {
t.Skipf("skipping test; error finding or running lsof: %v", err)
}
var wg sync.WaitGroup
portnum, _, _ := dtoi(dss.port, 0)
ras := addrList{
// Losers that will fail to connect, see RFC 6890.
&TCPAddr{IP: IPv4(198, 18, 0, 254), Port: portnum},
&TCPAddr{IP: ParseIP("2001:2::254"), Port: portnum},
// Winner candidates of this race.
&TCPAddr{IP: IPv4(127, 0, 0, 1), Port: portnum},
&TCPAddr{IP: IPv6loopback, Port: portnum},
// Losers that will have established connections.
&TCPAddr{IP: IPv4(127, 0, 0, 1), Port: portnum},
&TCPAddr{IP: IPv6loopback, Port: portnum},
}
const T1 = 10 * time.Millisecond
const T2 = 2 * T1
const N = 10
for i := 0; i < N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if c, err := dialMulti("tcp", "fast failover test", nil, ras, time.Now().Add(T1)); err == nil {
c.Close()
}
}()
}
wg.Wait()
time.Sleep(T2)
ntcp, after, nclose, err := numTCP()
if err != nil {
t.Skipf("skipping test; error finding or running lsof: %v", err)
}
t.Logf("tcp sessions: %v, open sessions: %v, closing sessions: %v", ntcp, after, nclose)
if after != before {
t.Fatalf("got %v open sessions; expected %v", after, before)
}
}
func numFD() int {
if runtime.GOOS == "linux" {
f, err := os.Open("/proc/self/fd")
......@@ -404,3 +498,46 @@ func TestDialer(t *testing.T) {
t.Error(err)
}
}
func TestDialDualStackLocalhost(t *testing.T) {
if ips, err := LookupIP("localhost"); err != nil {
t.Fatalf("LookupIP failed: %v", err)
} else if len(ips) < 2 || !supportsIPv4 || !supportsIPv6 {
t.Skip("localhost doesn't have a pair of different address family IP addresses")
}
touchAndByeServer := func(dss *dualStackServer, ln Listener) {
for {
if c, err := ln.Accept(); err != nil {
return
} else {
c.Close()
}
}
}
dss, err := newDualStackServer([]streamListener{
{net: "tcp4", addr: "127.0.0.1"},
{net: "tcp6", addr: "[::1]"},
})
if err != nil {
t.Fatalf("newDualStackServer failed: %v", err)
}
defer dss.teardown()
if err := dss.buildup(touchAndByeServer); err != nil {
t.Fatalf("dualStackServer.buildup failed: %v", err)
}
d := &Dialer{DualStack: true}
for _ = range dss.lns {
if c, err := d.Dial("tcp", "localhost:"+dss.port); err != nil {
t.Errorf("Dial failed: %v", err)
} else {
if addr := c.LocalAddr().(*TCPAddr); addr.IP.To4() != nil {
dss.teardownNetwork("tcp4")
} else if addr.IP.To16() != nil && addr.IP.To4() == nil {
dss.teardownNetwork("tcp6")
}
c.Close()
}
}
}
......@@ -41,6 +41,34 @@ func TestResolveGoogle(t *testing.T) {
}
}
func TestDialGoogle(t *testing.T) {
if testing.Short() || !*testExternal {
t.Skip("skipping test to avoid external network")
}
d := &Dialer{DualStack: true}
for _, network := range []string{"tcp", "tcp4", "tcp6"} {
if network == "tcp" && !supportsIPv4 && !supportsIPv6 {
t.Logf("skipping test; both ipv4 and ipv6 are not supported")
continue
} else if network == "tcp4" && !supportsIPv4 {
t.Logf("skipping test; ipv4 is not supported")
continue
} else if network == "tcp6" && !supportsIPv6 {
t.Logf("skipping test; ipv6 is not supported")
continue
} else if network == "tcp6" && !*testIPv6 {
t.Logf("test disabled; use -ipv6 to enable")
continue
}
if c, err := d.Dial(network, "www.google.com:http"); err != nil {
t.Errorf("Dial failed: %v", err)
} else {
c.Close()
}
}
}
// fd is already connected to the destination, port 80.
// Run an HTTP request to fetch the appropriate page.
func fetchGoogle(t *testing.T, fd Conn, network, addr string) {
......
......@@ -21,10 +21,10 @@ type netFD struct {
func sysInit() {
}
func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
// On plan9, use the relatively inefficient
// goroutine-racing implementation.
return resolveAndDialChannel(net, addr, localAddr, deadline)
return dialChannel(net, ra, dialer, deadline)
}
func newFD(proto, name string, ctl, data *os.File, laddr, raddr Addr) *netFD {
......
......@@ -36,12 +36,8 @@ type netFD struct {
func sysInit() {
}
func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
ra, err := resolveAddr("dial", net, addr, deadline)
if err != nil {
return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
}
return dial(net, addr, localAddr, ra.toAddr(), deadline)
func dial(network string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
return dialer(deadline)
}
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
......
......@@ -83,17 +83,13 @@ func canUseConnectEx(net string) bool {
return syscall.LoadConnectEx() == nil
}
func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, error) {
func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) {
if !canUseConnectEx(net) {
// Use the relatively inefficient goroutine-racing
// implementation of DialTimeout.
return resolveAndDialChannel(net, addr, localAddr, deadline)
return dialChannel(net, ra, dialer, deadline)
}
ra, err := resolveAddr("dial", net, addr, deadline)
if err != nil {
return nil, &OpError{Op: "dial", Net: net, Addr: nil, Err: err}
}
return dial(net, addr, localAddr, ra.toAddr(), deadline)
return dialer(deadline)
}
// operation contains superset of data necessary to perform all async IO.
......
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
import "sync"
type streamListener struct {
net, addr string
ln Listener
}
type dualStackServer struct {
lnmu sync.RWMutex
lns []streamListener
port string
cmu sync.RWMutex
cs []Conn // established connections at the passive open side
}
func (dss *dualStackServer) buildup(server func(*dualStackServer, Listener)) error {
for i := range dss.lns {
go server(dss, dss.lns[i].ln)
}
return nil
}
func (dss *dualStackServer) putConn(c Conn) error {
dss.cmu.Lock()
dss.cs = append(dss.cs, c)
dss.cmu.Unlock()
return nil
}
func (dss *dualStackServer) teardownNetwork(net string) error {
dss.lnmu.Lock()
for i := range dss.lns {
if net == dss.lns[i].net && dss.lns[i].ln != nil {
dss.lns[i].ln.Close()
dss.lns[i].ln = nil
}
}
dss.lnmu.Unlock()
return nil
}
func (dss *dualStackServer) teardown() error {
dss.lnmu.Lock()
for i := range dss.lns {
if dss.lns[i].ln != nil {
dss.lns[i].ln.Close()
}
}
dss.lnmu.Unlock()
dss.cmu.Lock()
for _, c := range dss.cs {
c.Close()
}
dss.cmu.Unlock()
return nil
}
func newDualStackServer(lns []streamListener) (*dualStackServer, error) {
dss := &dualStackServer{lns: lns, port: "0"}
for i := range dss.lns {
ln, err := Listen(dss.lns[i].net, dss.lns[i].addr+":"+dss.port)
if err != nil {
dss.teardown()
return nil, err
}
dss.lns[i].ln = ln
if dss.port == "0" {
if _, dss.port, err = SplitHostPort(ln.Addr().String()); err != nil {
dss.teardown()
return nil, err
}
}
}
return dss, nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment