Commit 893d6866 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: update bundled http2, add h2 Transport.IdleConnTimeout tests

Updates bundled http2 to x/net git rev a333c53 for:

   http2: add Transport support for IdleConnTimeout
   https://golang.org/cl/30075

And add tests.

The bundled http2 also includes a change adding a Ping method to
http2.ClientConn, but that type isn't exposed in the standard
library. Nevertheless, the code gets compiled and adds a dependency on
"crypto/rand", requiring an update to go/build's dependency
test. Because net/http already depends on crypto/tls, which uses
crypto/rand, it's not really a new dependency.

Fixes #16808

Change-Id: I1ec8666ea74762f27c70a6f30a366a6647f923f7
Reviewed-on: https://go-review.googlesource.com/30078
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarIan Lance Taylor <iant@golang.org>
parent 44150215
...@@ -376,16 +376,21 @@ var pkgDeps = map[string][]string{ ...@@ -376,16 +376,21 @@ var pkgDeps = map[string][]string{
// HTTP, kingpin of dependencies. // HTTP, kingpin of dependencies.
"net/http": { "net/http": {
"L4", "NET", "OS", "L4", "NET", "OS",
"context", "compress/gzip", "container/list", "crypto/tls", "compress/gzip",
"mime/multipart", "runtime/debug", "container/list",
"net/http/internal", "context",
"crypto/rand",
"crypto/tls",
"golang_org/x/net/http2/hpack", "golang_org/x/net/http2/hpack",
"golang_org/x/net/idna", "golang_org/x/net/idna",
"golang_org/x/net/lex/httplex", "golang_org/x/net/lex/httplex",
"golang_org/x/text/unicode/norm", "golang_org/x/text/unicode/norm",
"golang_org/x/text/width", "golang_org/x/text/width",
"internal/nettrace", "internal/nettrace",
"mime/multipart",
"net/http/httptrace", "net/http/httptrace",
"net/http/internal",
"runtime/debug",
}, },
"net/http/internal": {"L4"}, "net/http/internal": {"L4"},
"net/http/httptrace": {"context", "internal/nettrace", "net", "reflect", "time"}, "net/http/httptrace": {"context", "internal/nettrace", "net", "reflect", "time"},
......
...@@ -100,6 +100,24 @@ func (t *Transport) IdleConnStrsForTesting() []string { ...@@ -100,6 +100,24 @@ func (t *Transport) IdleConnStrsForTesting() []string {
return ret return ret
} }
func (t *Transport) IdleConnStrsForTesting_h2() []string {
var ret []string
noDialPool := t.h2transport.ConnPool.(http2noDialClientConnPool)
pool := noDialPool.http2clientConnPool
pool.mu.Lock()
defer pool.mu.Unlock()
for k, cc := range pool.conns {
for range cc {
ret = append(ret, k)
}
}
sort.Strings(ret)
return ret
}
func (t *Transport) IdleConnCountForTesting(cacheKey string) int { func (t *Transport) IdleConnCountForTesting(cacheKey string) int {
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
......
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"context" "context"
"crypto/rand"
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
"errors" "errors"
...@@ -1255,7 +1256,7 @@ func (f *http2Framer) WriteSettings(settings ...http2Setting) error { ...@@ -1255,7 +1256,7 @@ func (f *http2Framer) WriteSettings(settings ...http2Setting) error {
return f.endWrite() return f.endWrite()
} }
// WriteSettings writes an empty SETTINGS frame with the ACK bit set. // WriteSettingsAck writes an empty SETTINGS frame with the ACK bit set.
// //
// It will perform exactly one Write to the underlying Writer. // It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility to not call other Write methods concurrently. // It is the caller's responsibility to not call other Write methods concurrently.
...@@ -2092,6 +2093,13 @@ type http2clientTrace httptrace.ClientTrace ...@@ -2092,6 +2093,13 @@ type http2clientTrace httptrace.ClientTrace
func http2reqContext(r *Request) context.Context { return r.Context() } func http2reqContext(r *Request) context.Context { return r.Context() }
func (t *http2Transport) idleConnTimeout() time.Duration {
if t.t1 != nil {
return t.t1.IdleConnTimeout
}
return 0
}
func http2setResponseUncompressed(res *Response) { res.Uncompressed = true } func http2setResponseUncompressed(res *Response) { res.Uncompressed = true }
func http2traceGotConn(req *Request, cc *http2ClientConn) { func http2traceGotConn(req *Request, cc *http2ClientConn) {
...@@ -2146,6 +2154,11 @@ func http2requestTrace(req *Request) *http2clientTrace { ...@@ -2146,6 +2154,11 @@ func http2requestTrace(req *Request) *http2clientTrace {
return (*http2clientTrace)(trace) return (*http2clientTrace)(trace)
} }
// Ping sends a PING frame to the server and waits for the ack.
func (cc *http2ClientConn) Ping(ctx context.Context) error {
return cc.ping(ctx)
}
func http2cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() } func http2cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() }
var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1"
...@@ -5013,6 +5026,9 @@ type http2ClientConn struct { ...@@ -5013,6 +5026,9 @@ type http2ClientConn struct {
readerDone chan struct{} // closed on error readerDone chan struct{} // closed on error
readerErr error // set before readerDone is closed readerErr error // set before readerDone is closed
idleTimeout time.Duration // or 0 for never
idleTimer *time.Timer
mu sync.Mutex // guards following mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow http2flow // our conn-level flow control quota (cs.flow is per stream) flow http2flow // our conn-level flow control quota (cs.flow is per stream)
...@@ -5023,6 +5039,7 @@ type http2ClientConn struct { ...@@ -5023,6 +5039,7 @@ type http2ClientConn struct {
goAwayDebug string // goAway frame's debug data, retained as a string goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*http2clientStream // client-initiated streams map[uint32]*http2clientStream // client-initiated
nextStreamID uint32 nextStreamID uint32
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer bw *bufio.Writer
br *bufio.Reader br *bufio.Reader
fr *http2Framer fr *http2Framer
...@@ -5293,6 +5310,11 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client ...@@ -5293,6 +5310,11 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
streams: make(map[uint32]*http2clientStream), streams: make(map[uint32]*http2clientStream),
singleUse: singleUse, singleUse: singleUse,
wantSettingsAck: true, wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
} }
if http2VerboseLogs { if http2VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
...@@ -5365,6 +5387,16 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { ...@@ -5365,6 +5387,16 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
cc.nextStreamID < math.MaxInt32 cc.nextStreamID < math.MaxInt32
} }
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
// connection. The timer could just call closeIfIdle, but this is more
// clear.
func (cc *http2ClientConn) onIdleTimeout() {
cc.closeIfIdle()
}
func (cc *http2ClientConn) closeIfIdle() { func (cc *http2ClientConn) closeIfIdle() {
cc.mu.Lock() cc.mu.Lock()
if len(cc.streams) > 0 { if len(cc.streams) > 0 {
...@@ -5499,6 +5531,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { ...@@ -5499,6 +5531,9 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
if err := http2checkConnHeaders(req); err != nil { if err := http2checkConnHeaders(req); err != nil {
return nil, err return nil, err
} }
if cc.idleTimer != nil {
cc.idleTimer.Stop()
}
trailers, err := http2commaSeparatedTrailers(req) trailers, err := http2commaSeparatedTrailers(req)
if err != nil { if err != nil {
...@@ -5848,7 +5883,7 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail ...@@ -5848,7 +5883,7 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail
cc.writeHeader(":method", req.Method) cc.writeHeader(":method", req.Method)
if req.Method != "CONNECT" { if req.Method != "CONNECT" {
cc.writeHeader(":path", path) cc.writeHeader(":path", path)
cc.writeHeader(":scheme", "https") cc.writeHeader(":scheme", req.URL.Scheme)
} }
if trailers != "" { if trailers != "" {
cc.writeHeader("trailer", trailers) cc.writeHeader("trailer", trailers)
...@@ -5966,6 +6001,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr ...@@ -5966,6 +6001,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr
if andRemove && cs != nil && !cc.closed { if andRemove && cs != nil && !cc.closed {
cc.lastActive = time.Now() cc.lastActive = time.Now()
delete(cc.streams, id) delete(cc.streams, id)
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done) close(cs.done)
cc.cond.Broadcast() cc.cond.Broadcast()
} }
...@@ -6022,6 +6060,10 @@ func (rl *http2clientConnReadLoop) cleanup() { ...@@ -6022,6 +6060,10 @@ func (rl *http2clientConnReadLoop) cleanup() {
defer cc.t.connPool().MarkDead(cc) defer cc.t.connPool().MarkDead(cc)
defer close(cc.readerDone) defer close(cc.readerDone)
if cc.idleTimer != nil {
cc.idleTimer.Stop()
}
err := cc.readerErr err := cc.readerErr
cc.mu.Lock() cc.mu.Lock()
if cc.goAway != nil && http2isEOFOrNetReadError(err) { if cc.goAway != nil && http2isEOFOrNetReadError(err) {
...@@ -6577,9 +6619,56 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er ...@@ -6577,9 +6619,56 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er
return nil return nil
} }
// Ping sends a PING frame to the server and waits for the ack.
// Public implementation is in go17.go and not_go17.go
func (cc *http2ClientConn) ping(ctx http2contextContext) error {
c := make(chan struct{})
// Generate a random payload
var p [8]byte
for {
if _, err := rand.Read(p[:]); err != nil {
return err
}
cc.mu.Lock()
if _, found := cc.pings[p]; !found {
cc.pings[p] = c
cc.mu.Unlock()
break
}
cc.mu.Unlock()
}
cc.wmu.Lock()
if err := cc.fr.WritePing(false, p); err != nil {
cc.wmu.Unlock()
return err
}
if err := cc.bw.Flush(); err != nil {
cc.wmu.Unlock()
return err
}
cc.wmu.Unlock()
select {
case <-c:
return nil
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
return cc.readerErr
}
}
func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error { func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error {
if f.IsAck() { if f.IsAck() {
cc := rl.cc
cc.mu.Lock()
defer cc.mu.Unlock()
if c, ok := cc.pings[f.Data]; ok {
close(c)
delete(cc.pings, f.Data)
}
return nil return nil
} }
cc := rl.cc cc := rl.cc
......
...@@ -3481,27 +3481,36 @@ func TestTransportMaxIdleConns(t *testing.T) { ...@@ -3481,27 +3481,36 @@ func TestTransportMaxIdleConns(t *testing.T) {
} }
} }
func TestTransportIdleConnTimeout(t *testing.T) { func TestTransportIdleConnTimeout_h1(t *testing.T) { testTransportIdleConnTimeout(t, h1Mode) }
func TestTransportIdleConnTimeout_h2(t *testing.T) { testTransportIdleConnTimeout(t, h2Mode) }
func testTransportIdleConnTimeout(t *testing.T, h2 bool) {
if testing.Short() { if testing.Short() {
t.Skip("skipping in short mode") t.Skip("skipping in short mode")
} }
defer afterTest(t) defer afterTest(t)
ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { const timeout = 1 * time.Second
cst := newClientServerTest(t, h2, HandlerFunc(func(w ResponseWriter, r *Request) {
// No body for convenience. // No body for convenience.
})) }))
defer ts.Close() defer cst.close()
tr := cst.tr
const timeout = 1 * time.Second tr.IdleConnTimeout = timeout
tr := &Transport{
IdleConnTimeout: timeout,
}
defer tr.CloseIdleConnections() defer tr.CloseIdleConnections()
c := &Client{Transport: tr} c := &Client{Transport: tr}
idleConns := func() []string {
if h2 {
return tr.IdleConnStrsForTesting_h2()
} else {
return tr.IdleConnStrsForTesting()
}
}
var conn string var conn string
doReq := func(n int) { doReq := func(n int) {
req, _ := NewRequest("GET", ts.URL, nil) req, _ := NewRequest("GET", cst.ts.URL, nil)
req = req.WithContext(httptrace.WithClientTrace(context.Background(), &httptrace.ClientTrace{ req = req.WithContext(httptrace.WithClientTrace(context.Background(), &httptrace.ClientTrace{
PutIdleConn: func(err error) { PutIdleConn: func(err error) {
if err != nil { if err != nil {
...@@ -3514,7 +3523,7 @@ func TestTransportIdleConnTimeout(t *testing.T) { ...@@ -3514,7 +3523,7 @@ func TestTransportIdleConnTimeout(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
res.Body.Close() res.Body.Close()
conns := tr.IdleConnStrsForTesting() conns := idleConns()
if len(conns) != 1 { if len(conns) != 1 {
t.Fatalf("req %v: unexpected number of idle conns: %q", n, conns) t.Fatalf("req %v: unexpected number of idle conns: %q", n, conns)
} }
...@@ -3530,7 +3539,7 @@ func TestTransportIdleConnTimeout(t *testing.T) { ...@@ -3530,7 +3539,7 @@ func TestTransportIdleConnTimeout(t *testing.T) {
time.Sleep(timeout / 2) time.Sleep(timeout / 2)
} }
time.Sleep(timeout * 3 / 2) time.Sleep(timeout * 3 / 2)
if got := tr.IdleConnStrsForTesting(); len(got) != 0 { if got := idleConns(); len(got) != 0 {
t.Errorf("idle conns = %q; want none", got) t.Errorf("idle conns = %q; want none", got)
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment