Commit b6e0d39a authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: Transport socket late binding

Implement what Chrome calls socket "late binding". See:
https://insouciant.org/tech/connection-management-in-chromium/

In a nutshell, if our HTTP client needs a TCP connection to a
remote host and there's not an idle one available, rather than
kick off a dial and wait for that specific dial, we instead
kick off a dial and wait for either our own dial to finish, or
any other TCP connection to that same host to become
available.

The implementation looks like a classic "Learning Go
Concurrency" slide.

Chrome's commit and numbers:
http://src.chromium.org/viewvc/chrome?view=rev&revision=36230

R=golang-dev, daniel.morsing, adg
CC=golang-dev
https://golang.org/cl/7587043
parent a566deac
...@@ -41,12 +41,13 @@ const DefaultMaxIdleConnsPerHost = 2 ...@@ -41,12 +41,13 @@ const DefaultMaxIdleConnsPerHost = 2
// https, and http proxies (for either http or https with CONNECT). // https, and http proxies (for either http or https with CONNECT).
// Transport can also cache connections for future re-use. // Transport can also cache connections for future re-use.
type Transport struct { type Transport struct {
idleMu sync.Mutex idleMu sync.Mutex
idleConn map[string][]*persistConn idleConn map[string][]*persistConn
reqMu sync.Mutex idleConnCh map[string]chan *persistConn
reqConn map[*Request]*persistConn reqMu sync.Mutex
altMu sync.RWMutex reqConn map[*Request]*persistConn
altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper altMu sync.RWMutex
altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper
// TODO: tunable on global max cached connections // TODO: tunable on global max cached connections
// TODO: tunable on timeout on cached connections // TODO: tunable on timeout on cached connections
...@@ -279,6 +280,17 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { ...@@ -279,6 +280,17 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
max = DefaultMaxIdleConnsPerHost max = DefaultMaxIdleConnsPerHost
} }
t.idleMu.Lock() t.idleMu.Lock()
select {
case t.idleConnCh[key] <- pconn:
// We're done with this pconn and somebody else is
// currently waiting for a conn of this type (they're
// actively dialing, but this conn is ready
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
t.idleMu.Unlock()
return true
default:
}
if t.idleConn == nil { if t.idleConn == nil {
t.idleConn = make(map[string][]*persistConn) t.idleConn = make(map[string][]*persistConn)
} }
...@@ -297,8 +309,23 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { ...@@ -297,8 +309,23 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
return true return true
} }
func (t *Transport) getIdleConnCh(cm *connectMethod) chan *persistConn {
key := cm.key()
t.idleMu.Lock()
defer t.idleMu.Unlock()
if t.idleConnCh == nil {
t.idleConnCh = make(map[string]chan *persistConn)
}
ch, ok := t.idleConnCh[key]
if !ok {
ch = make(chan *persistConn)
t.idleConnCh[key] = ch
}
return ch
}
func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) { func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
key := cm.String() key := cm.key()
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
if t.idleConn == nil { if t.idleConn == nil {
...@@ -354,6 +381,37 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { ...@@ -354,6 +381,37 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
return pc, nil return pc, nil
} }
type dialRes struct {
pc *persistConn
err error
}
dialc := make(chan dialRes)
go func() {
pc, err := t.dialConn(cm)
dialc <- dialRes{pc, err}
}()
idleConnCh := t.getIdleConnCh(cm)
select {
case v := <-dialc:
// Our dial finished.
return v.pc, v.err
case pc := <-idleConnCh:
// Another request finished first and its net.Conn
// became available before our dial. Or somebody
// else's dial that they didn't use.
// But our dial is still going, so give it away
// when it finishes:
go func() {
if v := <-dialc; v.err == nil {
t.putIdleConn(v.pc)
}
}()
return pc, nil
}
}
func (t *Transport) dialConn(cm *connectMethod) (*persistConn, error) {
conn, err := t.dial("tcp", cm.addr()) conn, err := t.dial("tcp", cm.addr())
if err != nil { if err != nil {
if cm.proxyURL != nil { if cm.proxyURL != nil {
...@@ -366,7 +424,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { ...@@ -366,7 +424,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
pconn := &persistConn{ pconn := &persistConn{
t: t, t: t,
cacheKey: cm.String(), cacheKey: cm.key(),
conn: conn, conn: conn,
reqch: make(chan requestAndChan, 50), reqch: make(chan requestAndChan, 50),
writech: make(chan writeRequest, 50), writech: make(chan writeRequest, 50),
...@@ -516,6 +574,10 @@ type connectMethod struct { ...@@ -516,6 +574,10 @@ type connectMethod struct {
targetAddr string // Not used if proxy + http targetScheme (4th example in table) targetAddr string // Not used if proxy + http targetScheme (4th example in table)
} }
func (ck *connectMethod) key() string {
return ck.String() // TODO: use a struct type instead
}
func (ck *connectMethod) String() string { func (ck *connectMethod) String() string {
proxyStr := "" proxyStr := ""
targetAddr := ck.targetAddr targetAddr := ck.targetAddr
......
...@@ -1324,6 +1324,64 @@ func TestTransportNoHost(t *testing.T) { ...@@ -1324,6 +1324,64 @@ func TestTransportNoHost(t *testing.T) {
} }
} }
func TestTransportSocketLateBinding(t *testing.T) {
defer checkLeakedTransports(t)
mux := NewServeMux()
fooGate := make(chan bool, 1)
mux.HandleFunc("/foo", func(w ResponseWriter, r *Request) {
w.Header().Set("foo-ipport", r.RemoteAddr)
w.(Flusher).Flush()
<-fooGate
})
mux.HandleFunc("/bar", func(w ResponseWriter, r *Request) {
w.Header().Set("bar-ipport", r.RemoteAddr)
})
ts := httptest.NewServer(mux)
defer ts.Close()
dialGate := make(chan bool, 1)
tr := &Transport{
Dial: func(n, addr string) (net.Conn, error) {
<-dialGate
return net.Dial(n, addr)
},
DisableKeepAlives: false,
}
defer tr.CloseIdleConnections()
c := &Client{
Transport: tr,
}
dialGate <- true // only allow one dial
fooRes, err := c.Get(ts.URL + "/foo")
if err != nil {
t.Fatal(err)
}
fooAddr := fooRes.Header.Get("foo-ipport")
if fooAddr == "" {
t.Fatal("No addr on /foo request")
}
time.AfterFunc(200*time.Millisecond, func() {
// let the foo response finish so we can use its
// connection for /bar
fooGate <- true
io.Copy(ioutil.Discard, fooRes.Body)
fooRes.Body.Close()
})
barRes, err := c.Get(ts.URL + "/bar")
if err != nil {
t.Fatal(err)
}
barAddr := barRes.Header.Get("bar-ipport")
if barAddr != fooAddr {
t.Fatalf("/foo came from conn %q; /bar came from %q instead", fooAddr, barAddr)
}
barRes.Body.Close()
dialGate <- true
}
type proxyFromEnvTest struct { type proxyFromEnvTest struct {
req string // URL to fetch; blank means "http://example.com" req string // URL to fetch; blank means "http://example.com"
env string env string
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment