Commit 7fa98467 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: tighten protocol between Transport.roundTrip and persistConn.readLoop

In debugging the flaky test in #13825, I discovered that my previous
change to tighten and simplify the communication protocol between
Transport.roundTrip and persistConn.readLoop in
https://golang.org/cl/17890 wasn't complete.

This change simplifies it further: the buffered-vs-unbuffered
complexity goes away, and we no longer need to re-try channel reads in
the select case. It was trying to prioritize channels in the case that
two were readable in the select. (it was only failing in the race builder
because the race builds randomize select scheduling)

The problem was that in the bodyless response case we had to return
the idle connection before replying to roundTrip. But putIdleConn
previously both added it to the free list (which we wanted), but also
closed the connection, which made the caller goroutine
(Transport.roundTrip) have two readable cases: pc.closech, and the
response. We guarded against similar conditions in the caller's select
for two readable channels, but such a fix wasn't possible here, and would
be overly complicated.

Instead, switch to unbuffered channels. The unbuffered channels were only
to prevent goroutine leaks, so address that differently: add a "callerGone"
channel closed by the caller on exit, and select on that during any unbuffered
sends.

As part of the fix, split putIdleConn into two halves: a part that
just returns to the freelist, and a part that also closes. Update the
four callers to the variants each wanted.

Incidentally, the connections were closing on return to the pool due
to MaxIdleConnsPerHost (somewhat related: #13801), but this bug
could've manifested for plenty of other reasons.

Fixes #13825

Change-Id: I6fa7136e2c52909d57a22ea4b74d0155fdf0e6fa
Reviewed-on: https://go-review.googlesource.com/18282
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarAndrew Gerrand <adg@golang.org>
parent 59ca8789
......@@ -121,12 +121,12 @@ func (t *Transport) RequestIdleConnChForTesting() {
func (t *Transport) PutIdleTestConn() bool {
c, _ := net.Pipe()
return t.putIdleConn(&persistConn{
return t.tryPutIdleConn(&persistConn{
t: t,
conn: c, // dummy
closech: make(chan struct{}), // so it can be closed
cacheKey: connectMethodKey{"", "http", "example.com"},
})
}) == nil
}
// All test hooks must be non-nil so they can be called directly,
......
......@@ -365,7 +365,7 @@ func (t *Transport) CloseIdleConnections() {
t.idleMu.Unlock()
for _, conns := range m {
for _, pconn := range conns {
pconn.close()
pconn.close(errCloseIdleConns)
}
}
}
......@@ -450,17 +450,34 @@ func (cm *connectMethod) proxyAuth() string {
return ""
}
// putIdleConn adds pconn to the list of idle persistent connections awaiting
// error values for debugging and testing, not seen by users.
var (
errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
errCloseIdleConns = errors.New("http: CloseIdleConnections called")
errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
errServerClosedIdle = errors.New("http: server closed idle conn")
)
func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
if err := t.tryPutIdleConn(pconn); err != nil {
pconn.close(err)
}
}
// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, putIdleConn
// returns false.
func (t *Transport) putIdleConn(pconn *persistConn) bool {
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
// an error explaining why it wasn't registered.
// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
pconn.close()
return false
return errKeepAlivesDisabled
}
if pconn.isBroken() {
return false
return errConnBroken
}
key := pconn.cacheKey
max := t.MaxIdleConnsPerHost
......@@ -479,7 +496,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
t.idleMu.Unlock()
return true
return nil
default:
if waitingDialer != nil {
// They had populated this, but their dial won
......@@ -489,16 +506,14 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
}
if t.wantIdle {
t.idleMu.Unlock()
pconn.close()
return false
return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
if len(t.idleConn[key]) >= max {
t.idleMu.Unlock()
pconn.close()
return false
return errTooManyIdle
}
for _, exist := range t.idleConn[key] {
if exist == pconn {
......@@ -507,7 +522,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
}
t.idleConn[key] = append(t.idleConn[key], pconn)
t.idleMu.Unlock()
return true
return nil
}
// getIdleConnCh returns a channel to receive and return idle
......@@ -626,7 +641,7 @@ func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putIdleConn(v.pc)
t.putOrCloseIdleConn(v.pc)
}
testHookPostPendingDial()
}()
......@@ -929,10 +944,10 @@ type persistConn struct {
lk sync.Mutex // guards following fields
numExpectedResponses int
closed bool // whether conn has been closed
broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest
reused bool // whether conn has had successful request/response and is being reused.
closed error // set non-nil when conn is closed, before closech is closed
broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest
reused bool // whether conn has had successful request/response and is being reused.
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
......@@ -966,11 +981,20 @@ func (pc *persistConn) cancelRequest() {
pc.lk.Lock()
defer pc.lk.Unlock()
pc.canceled = true
pc.closeLocked()
pc.closeLocked(errRequestCanceled)
}
func (pc *persistConn) readLoop() {
defer pc.close()
closeErr := errReadLoopExiting // default value, if not changed below
defer func() { pc.close(closeErr) }()
tryPutIdleConn := func() bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
return false
}
return true
}
// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
......@@ -1016,7 +1040,11 @@ func (pc *persistConn) readLoop() {
if checkTransportResend(err, rc.req, pc) != nil {
pc.t.setReqCanceler(rc.req, nil)
}
rc.ch <- responseAndError{err: err}
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
......@@ -1035,8 +1063,6 @@ func (pc *persistConn) readLoop() {
if !hasBody {
pc.t.setReqCanceler(rc.req, nil)
resc := make(chan *Response) // unbuffered matters; see below
rc.ch <- responseAndError{ch: resc} // buffered send
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
......@@ -1047,9 +1073,13 @@ func (pc *persistConn) readLoop() {
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
pc.t.putIdleConn(pc)
tryPutIdleConn()
resc <- resp // unbuffered send
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
......@@ -1079,7 +1109,11 @@ func (pc *persistConn) readLoop() {
return err
}
rc.ch <- responseAndError{r: resp}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
......@@ -1091,7 +1125,7 @@ func (pc *persistConn) readLoop() {
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
pc.t.putIdleConn(pc)
tryPutIdleConn()
if bodyEOF {
eofc <- struct{}{}
}
......@@ -1116,14 +1150,19 @@ func maybeUngzipResponse(resp *Response) {
}
func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
if pc.closed {
if pc.closed != nil {
return
}
if n := pc.br.Buffered(); n > 0 {
buf, _ := pc.br.Peek(n)
log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
}
pc.closeLocked()
if peekErr == io.EOF {
// common case.
pc.closeLocked(errServerClosedIdle)
} else {
pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr))
}
}
// readResponse reads an HTTP response (or two, in the case of "Expect:
......@@ -1227,25 +1266,13 @@ func (pc *persistConn) wroteRequest() bool {
// responseAndError is how the goroutine reading from an HTTP/1 server
// communicates with the goroutine doing the RoundTrip.
type responseAndError struct {
ch chan *Response // if non-nil, res should be read from here
r *Response // else use this response (see res method)
res *Response // else use this response (see res method)
err error
}
func (re responseAndError) res() *Response {
switch {
case re.err != nil:
return nil
case re.ch != nil:
return <-re.ch
default:
return re.r
}
}
type requestAndChan struct {
req *Request
ch chan responseAndError
ch chan responseAndError // unbuffered; always send in select on callerGone
// did the Transport (as opposed to the client code) add an
// Accept-Encoding gzip header? only if it we set it do
......@@ -1257,6 +1284,8 @@ type requestAndChan struct {
// the server responds 100 Continue, readLoop send a value
// to writeLoop via this chan.
continueCh chan<- struct{}
callerGone <-chan struct{} // closed when roundTrip caller has returned
}
// A writeRequest is sent by the readLoop's goroutine to the
......@@ -1283,7 +1312,7 @@ func (e *httpError) Timeout() bool { return e.timeout }
func (e *httpError) Temporary() bool { return true }
var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
var errClosed error = &httpError{err: "net/http: transport closed before response was received"}
var errClosed error = &httpError{err: "net/http: server closed connection before response was received"}
var errRequestCanceled = errors.New("net/http: request canceled")
func nop() {}
......@@ -1309,7 +1338,7 @@ type beforeRespHeaderError struct {
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
pc.t.putIdleConn(pc)
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.lk.Lock()
......@@ -1355,14 +1384,23 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
req.extraHeaders().Set("Connection", "close")
}
gone := make(chan struct{})
defer close(gone)
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
resc := make(chan responseAndError, 1)
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip, continueCh}
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
var re responseAndError
var respHeaderTimer <-chan time.Time
......@@ -1372,22 +1410,9 @@ WaitResponse:
testHookWaitResLoop()
select {
case err := <-writeErrCh:
if isNetWriteError(err) {
// Issue 11745. If we failed to write the request
// body, it's possible the server just heard enough
// and already wrote to us. Prioritize the server's
// response over returning a body write error.
select {
case re = <-resc:
pc.close()
break WaitResponse
case <-time.After(50 * time.Millisecond):
// Fall through.
}
}
if err != nil {
re = responseAndError{err: beforeRespHeaderError{err}}
pc.close()
pc.close(fmt.Errorf("write error: %v", err))
break WaitResponse
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
......@@ -1400,12 +1425,12 @@ WaitResponse:
if pc.isCanceled() {
err = errRequestCanceled
} else {
err = beforeRespHeaderError{errClosed}
err = beforeRespHeaderError{fmt.Errorf("net/http: HTTP/1 transport connection broken: %v", pc.closed)}
}
re = responseAndError{err: err}
break WaitResponse
case <-respHeaderTimer:
pc.close()
pc.close(errTimeout)
re = responseAndError{err: errTimeout}
break WaitResponse
case re = <-resc:
......@@ -1419,7 +1444,10 @@ WaitResponse:
if re.err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
return re.res(), re.err
if (re.res == nil) == (re.err == nil) {
panic("internal error: exactly one of res or err should be set")
}
return re.res, re.err
}
// markBroken marks a connection as broken (so it's not reused).
......@@ -1439,17 +1467,25 @@ func (pc *persistConn) markReused() {
pc.lk.Unlock()
}
func (pc *persistConn) close() {
// close closes the underlying TCP connection and closes
// the pc.closech channel.
//
// The provided err is only for testing and debugging; in normal
// circumstances it should never be seen by users.
func (pc *persistConn) close(err error) {
pc.lk.Lock()
defer pc.lk.Unlock()
pc.closeLocked()
pc.closeLocked(err)
}
func (pc *persistConn) closeLocked() {
func (pc *persistConn) closeLocked(err error) {
if err == nil {
panic("nil error")
}
pc.broken = true
if !pc.closed {
if pc.closed == nil {
pc.conn.Close()
pc.closed = true
pc.closed = err
close(pc.closech)
}
pc.mutateHeaderFunc = nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment