Commit 18072adb authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: reuse HTTP/1 Transport conns more for gzipped responses

Flip around the composition order of the http.Response.Body's
gzip.Reader vs. the reader which keeps track of waiting to see the end
of the HTTP/1 response framing (whether that's a Content-Length or
HTTP/1.1 chunking).

Previously:

user -> http.Response.Body
     -> bodyEOFSignal
     -> gzipReader
     -> gzip.Reader
     -> bufio.Reader
   [ -> http/1.1 de-chunking reader ]   optional
     -> http1 framing *body

But because bodyEOFSignal was waiting to see an EOF from the
underlying gzip.Reader before reusing the connection, and gzip.Reader
(or more specifically: the flate.Reader) wasn't returning an early
io.EOF with the final chunk, the bodyEOfSignal was never releasing the
connection, because the EOF from the http1 framing was read by a party
who didn't care about it yet: the helper bufio.Reader created to do
byte-at-a-time reading in the flate.Reader.

Flip the read composition around to:

user -> http.Response.Body
     -> gzipReader
     -> gzip.Reader
     -> bufio.Reader
     -> bodyEOFSignal
   [ -> http/1.1 de-chunking reader ]   optional
     -> http1 framing *body

Now when gzip.Reader does its byte-at-a-time reading via the
bufio.Reader, the bufio.Reader will do its big reads against the
bodyEOFSignal reader instead, which will then see the underlying http1
framing EOF, and be able to reuse the connection.

Updates google/go-github#317
Updates #14867
And related abandoned fix to flate.Reader: https://golang.org/cl/21290

Change-Id: I3729dfdffe832ad943b84f4734b0f59b0e834749
Reviewed-on: https://go-review.googlesource.com/21291Reviewed-by: default avatarDavid Symonds <dsymonds@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent 7f067c87
...@@ -1147,17 +1147,15 @@ func (pc *persistConn) readLoop() { ...@@ -1147,17 +1147,15 @@ func (pc *persistConn) readLoop() {
continue continue
} }
if rc.addedGzip {
maybeUngzipResponse(resp)
}
resp.Body = &bodyEOFSignal{body: resp.Body}
waitForBodyRead := make(chan bool, 2) waitForBodyRead := make(chan bool, 2)
resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false waitForBodyRead <- false
return nil return nil
}
resp.Body.(*bodyEOFSignal).fn = func(err error) error { },
fn: func(err error) error {
isEOF := err == io.EOF isEOF := err == io.EOF
waitForBodyRead <- isEOF waitForBodyRead <- isEOF
if isEOF { if isEOF {
...@@ -1166,6 +1164,15 @@ func (pc *persistConn) readLoop() { ...@@ -1166,6 +1164,15 @@ func (pc *persistConn) readLoop() {
return errRequestCanceled return errRequestCanceled
} }
return err return err
},
}
resp.Body = body
if rc.addedGzip && resp.Header.Get("Content-Encoding") == "gzip" {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
} }
select { select {
...@@ -1199,15 +1206,6 @@ func (pc *persistConn) readLoop() { ...@@ -1199,15 +1206,6 @@ func (pc *persistConn) readLoop() {
} }
} }
func maybeUngzipResponse(resp *Response) {
if resp.Header.Get("Content-Encoding") == "gzip" {
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Body = &gzipReader{body: resp.Body}
}
}
func (pc *persistConn) readLoopPeekFailLocked(peekErr error) { func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
if pc.closed != nil { if pc.closed != nil {
return return
...@@ -1580,7 +1578,11 @@ func canonicalAddr(url *url.URL) string { ...@@ -1580,7 +1578,11 @@ func canonicalAddr(url *url.URL) string {
return addr return addr
} }
// bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most // bodyEOFSignal is used by the HTTP/1 transport when reading response
// bodies to make sure we see the end of a response body before
// proceeding and reading on the connection again.
//
// It wraps a ReadCloser but runs fn (if non-nil) at most
// once, right before its final (error-producing) Read or Close call // once, right before its final (error-producing) Read or Close call
// returns. fn should return the new error to return from Read or Close. // returns. fn should return the new error to return from Read or Close.
// //
...@@ -1596,12 +1598,14 @@ type bodyEOFSignal struct { ...@@ -1596,12 +1598,14 @@ type bodyEOFSignal struct {
earlyCloseFn func() error // optional alt Close func used if io.EOF not seen earlyCloseFn func() error // optional alt Close func used if io.EOF not seen
} }
var errReadOnClosedResBody = errors.New("http: read on closed response body")
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock() es.mu.Lock()
closed, rerr := es.closed, es.rerr closed, rerr := es.closed, es.rerr
es.mu.Unlock() es.mu.Unlock()
if closed { if closed {
return 0, errors.New("http: read on closed response body") return 0, errReadOnClosedResBody
} }
if rerr != nil { if rerr != nil {
return 0, rerr return 0, rerr
...@@ -1646,17 +1650,30 @@ func (es *bodyEOFSignal) condfn(err error) error { ...@@ -1646,17 +1650,30 @@ func (es *bodyEOFSignal) condfn(err error) error {
// gzipReader wraps a response body so it can lazily // gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read // call gzip.NewReader on the first call to Read
type gzipReader struct { type gzipReader struct {
body io.ReadCloser // underlying Response.Body body *bodyEOFSignal // underlying HTTP/1 response body framing
zr io.Reader // lazily-initialized gzip reader zr *gzip.Reader // lazily-initialized gzip reader
zerr error // any error from gzip.NewReader; sticky
} }
func (gz *gzipReader) Read(p []byte) (n int, err error) { func (gz *gzipReader) Read(p []byte) (n int, err error) {
if gz.zr == nil { if gz.zr == nil {
gz.zr, err = gzip.NewReader(gz.body) if gz.zerr == nil {
gz.zr, gz.zerr = gzip.NewReader(gz.body)
}
if gz.zerr != nil {
return 0, gz.zerr
}
}
gz.body.mu.Lock()
if gz.body.closed {
err = errReadOnClosedResBody
}
gz.body.mu.Unlock()
if err != nil { if err != nil {
return 0, err return 0, err
} }
}
return gz.zr.Read(p) return gz.zr.Read(p)
} }
......
...@@ -923,7 +923,9 @@ func TestTransportGzipRecursive(t *testing.T) { ...@@ -923,7 +923,9 @@ func TestTransportGzipRecursive(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
c := &Client{Transport: &Transport{}} tr := &Transport{}
defer tr.CloseIdleConnections()
c := &Client{Transport: tr}
res, err := c.Get(ts.URL) res, err := c.Get(ts.URL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -3044,6 +3046,50 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) { ...@@ -3044,6 +3046,50 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) {
<-handledPendingDial <-handledPendingDial
} }
func TestTransportReuseConnection_Gzip_Chunked(t *testing.T) {
testTransportReuseConnection_Gzip(t, true)
}
func TestTransportReuseConnection_Gzip_ContentLength(t *testing.T) {
testTransportReuseConnection_Gzip(t, false)
}
// Make sure we re-use underlying TCP connection for gzipped responses too.
func testTransportReuseConnection_Gzip(t *testing.T, chunked bool) {
defer afterTest(t)
addr := make(chan string, 2)
ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
addr <- r.RemoteAddr
w.Header().Set("Content-Encoding", "gzip")
if chunked {
w.(Flusher).Flush()
}
w.Write(rgz) // arbitrary gzip response
}))
defer ts.Close()
tr := &Transport{}
defer tr.CloseIdleConnections()
c := &Client{Transport: tr}
for i := 0; i < 2; i++ {
res, err := c.Get(ts.URL)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, len(rgz))
if n, err := io.ReadFull(res.Body, buf); err != nil {
t.Errorf("%d. ReadFull = %v, %v", i, n, err)
}
// Note: no res.Body.Close call. It should work without it,
// since the flate.Reader's internal buffering will hit EOF
// and that should be sufficient.
}
a1, a2 := <-addr, <-addr
if a1 != a2 {
t.Fatalf("didn't reuse connection")
}
}
var errFakeRoundTrip = errors.New("fake roundtrip") var errFakeRoundTrip = errors.New("fake roundtrip")
type funcRoundTripper func() type funcRoundTripper func()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment