Commit 10316757 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: update bundled http2 for flow control window adjustment fix

Updates bundled http2 to x/net/http2 git rev 075e191 for:

   http2: adjust flow control on open streams when processing SETTINGS
   https://golang.org/cl/25508

Fixes #16612

Change-Id: Ib0513201bff44ab747a574ae6894479325c105d2
Reviewed-on: https://go-review.googlesource.com/25543
Run-TryBot: Chris Broadfoot <cbro@golang.org>
Reviewed-by: default avatarIan Lance Taylor <iant@golang.org>
Reviewed-by: default avatarChris Broadfoot <cbro@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent da070bed
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"net" "net"
"net/http/httptrace" "net/http/httptrace"
"net/textproto" "net/textproto"
...@@ -403,9 +404,17 @@ func (e http2ConnectionError) Error() string { ...@@ -403,9 +404,17 @@ func (e http2ConnectionError) Error() string {
type http2StreamError struct { type http2StreamError struct {
StreamID uint32 StreamID uint32
Code http2ErrCode Code http2ErrCode
Cause error // optional additional detail
}
func http2streamError(id uint32, code http2ErrCode) http2StreamError {
return http2StreamError{StreamID: id, Code: code}
} }
func (e http2StreamError) Error() string { func (e http2StreamError) Error() string {
if e.Cause != nil {
return fmt.Sprintf("stream error: stream ID %d; %v; %v", e.StreamID, e.Code, e.Cause)
}
return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code) return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code)
} }
...@@ -1366,7 +1375,7 @@ func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, err ...@@ -1366,7 +1375,7 @@ func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, err
if fh.StreamID == 0 { if fh.StreamID == 0 {
return nil, http2ConnectionError(http2ErrCodeProtocol) return nil, http2ConnectionError(http2ErrCodeProtocol)
} }
return nil, http2StreamError{fh.StreamID, http2ErrCodeProtocol} return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol)
} }
return &http2WindowUpdateFrame{ return &http2WindowUpdateFrame{
http2FrameHeader: fh, http2FrameHeader: fh,
...@@ -1444,7 +1453,7 @@ func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err er ...@@ -1444,7 +1453,7 @@ func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err er
} }
} }
if len(p)-int(padLength) <= 0 { if len(p)-int(padLength) <= 0 {
return nil, http2StreamError{fh.StreamID, http2ErrCodeProtocol} return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol)
} }
hf.headerFragBuf = p[:len(p)-int(padLength)] hf.headerFragBuf = p[:len(p)-int(padLength)]
return hf, nil return hf, nil
...@@ -1911,6 +1920,9 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr ...@@ -1911,6 +1920,9 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr
hdec.SetEmitEnabled(true) hdec.SetEmitEnabled(true)
hdec.SetMaxStringLength(fr.maxHeaderStringLen()) hdec.SetMaxStringLength(fr.maxHeaderStringLen())
hdec.SetEmitFunc(func(hf hpack.HeaderField) { hdec.SetEmitFunc(func(hf hpack.HeaderField) {
if http2VerboseLogs && http2logFrameReads {
log.Printf("http2: decoded hpack field %+v", hf)
}
if !httplex.ValidHeaderFieldValue(hf.Value) { if !httplex.ValidHeaderFieldValue(hf.Value) {
invalid = http2headerFieldValueError(hf.Value) invalid = http2headerFieldValueError(hf.Value)
} }
...@@ -1969,11 +1981,17 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr ...@@ -1969,11 +1981,17 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr
} }
if invalid != nil { if invalid != nil {
fr.errDetail = invalid fr.errDetail = invalid
return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol} if http2VerboseLogs {
log.Printf("http2: invalid header: %v", invalid)
}
return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol, invalid}
} }
if err := mh.checkPseudos(); err != nil { if err := mh.checkPseudos(); err != nil {
fr.errDetail = err fr.errDetail = err
return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol} if http2VerboseLogs {
log.Printf("http2: invalid pseudo headers: %v", err)
}
return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol, err}
} }
return mh, nil return mh, nil
} }
...@@ -3604,7 +3622,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { ...@@ -3604,7 +3622,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) {
case http2stateOpen: case http2stateOpen:
st.state = http2stateHalfClosedLocal st.state = http2stateHalfClosedLocal
errCancel := http2StreamError{st.id, http2ErrCodeCancel} errCancel := http2streamError(st.id, http2ErrCodeCancel)
sc.resetStream(errCancel) sc.resetStream(errCancel)
case http2stateHalfClosedRemote: case http2stateHalfClosedRemote:
sc.closeStream(st, http2errHandlerComplete) sc.closeStream(st, http2errHandlerComplete)
...@@ -3797,7 +3815,7 @@ func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error ...@@ -3797,7 +3815,7 @@ func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error
return nil return nil
} }
if !st.flow.add(int32(f.Increment)) { if !st.flow.add(int32(f.Increment)) {
return http2StreamError{f.StreamID, http2ErrCodeFlowControl} return http2streamError(f.StreamID, http2ErrCodeFlowControl)
} }
default: default:
if !sc.flow.add(int32(f.Increment)) { if !sc.flow.add(int32(f.Increment)) {
...@@ -3819,7 +3837,7 @@ func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error { ...@@ -3819,7 +3837,7 @@ func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error {
if st != nil { if st != nil {
st.gotReset = true st.gotReset = true
st.cancelCtx() st.cancelCtx()
sc.closeStream(st, http2StreamError{f.StreamID, f.ErrCode}) sc.closeStream(st, http2streamError(f.StreamID, f.ErrCode))
} }
return nil return nil
} }
...@@ -3922,13 +3940,13 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { ...@@ -3922,13 +3940,13 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error {
if !ok || st.state != http2stateOpen || st.gotTrailerHeader { if !ok || st.state != http2stateOpen || st.gotTrailerHeader {
if sc.inflow.available() < int32(f.Length) { if sc.inflow.available() < int32(f.Length) {
return http2StreamError{id, http2ErrCodeFlowControl} return http2streamError(id, http2ErrCodeFlowControl)
} }
sc.inflow.take(int32(f.Length)) sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) sc.sendWindowUpdate(nil, int(f.Length))
return http2StreamError{id, http2ErrCodeStreamClosed} return http2streamError(id, http2ErrCodeStreamClosed)
} }
if st.body == nil { if st.body == nil {
panic("internal error: should have a body in this state") panic("internal error: should have a body in this state")
...@@ -3936,19 +3954,19 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { ...@@ -3936,19 +3954,19 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error {
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
return http2StreamError{id, http2ErrCodeStreamClosed} return http2streamError(id, http2ErrCodeStreamClosed)
} }
if f.Length > 0 { if f.Length > 0 {
if st.inflow.available() < int32(f.Length) { if st.inflow.available() < int32(f.Length) {
return http2StreamError{id, http2ErrCodeFlowControl} return http2streamError(id, http2ErrCodeFlowControl)
} }
st.inflow.take(int32(f.Length)) st.inflow.take(int32(f.Length))
if len(data) > 0 { if len(data) > 0 {
wrote, err := st.body.Write(data) wrote, err := st.body.Write(data)
if err != nil { if err != nil {
return http2StreamError{id, http2ErrCodeStreamClosed} return http2streamError(id, http2ErrCodeStreamClosed)
} }
if wrote != len(data) { if wrote != len(data) {
panic("internal error: bad Writer") panic("internal error: bad Writer")
...@@ -4046,10 +4064,10 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { ...@@ -4046,10 +4064,10 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error {
if sc.unackedSettings == 0 { if sc.unackedSettings == 0 {
return http2StreamError{st.id, http2ErrCodeProtocol} return http2streamError(st.id, http2ErrCodeProtocol)
} }
return http2StreamError{st.id, http2ErrCodeRefusedStream} return http2streamError(st.id, http2ErrCodeRefusedStream)
} }
rw, req, err := sc.newWriterAndRequest(st, f) rw, req, err := sc.newWriterAndRequest(st, f)
...@@ -4083,18 +4101,18 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { ...@@ -4083,18 +4101,18 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error {
} }
st.gotTrailerHeader = true st.gotTrailerHeader = true
if !f.StreamEnded() { if !f.StreamEnded() {
return http2StreamError{st.id, http2ErrCodeProtocol} return http2streamError(st.id, http2ErrCodeProtocol)
} }
if len(f.PseudoFields()) > 0 { if len(f.PseudoFields()) > 0 {
return http2StreamError{st.id, http2ErrCodeProtocol} return http2streamError(st.id, http2ErrCodeProtocol)
} }
if st.trailer != nil { if st.trailer != nil {
for _, hf := range f.RegularFields() { for _, hf := range f.RegularFields() {
key := sc.canonicalHeader(hf.Name) key := sc.canonicalHeader(hf.Name)
if !http2ValidTrailerHeader(key) { if !http2ValidTrailerHeader(key) {
return http2StreamError{st.id, http2ErrCodeProtocol} return http2streamError(st.id, http2ErrCodeProtocol)
} }
st.trailer[key] = append(st.trailer[key], hf.Value) st.trailer[key] = append(st.trailer[key], hf.Value)
} }
...@@ -4148,18 +4166,18 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead ...@@ -4148,18 +4166,18 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead
isConnect := method == "CONNECT" isConnect := method == "CONNECT"
if isConnect { if isConnect {
if path != "" || scheme != "" || authority == "" { if path != "" || scheme != "" || authority == "" {
return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
} }
} else if method == "" || path == "" || } else if method == "" || path == "" ||
(scheme != "https" && scheme != "http") { (scheme != "https" && scheme != "http") {
return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
} }
bodyOpen := !f.StreamEnded() bodyOpen := !f.StreamEnded()
if method == "HEAD" && bodyOpen { if method == "HEAD" && bodyOpen {
return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
} }
var tlsState *tls.ConnectionState // nil if not scheme https var tlsState *tls.ConnectionState // nil if not scheme https
...@@ -4216,7 +4234,7 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead ...@@ -4216,7 +4234,7 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead
var err error var err error
url_, err = url.ParseRequestURI(path) url_, err = url.ParseRequestURI(path)
if err != nil { if err != nil {
return nil, nil, http2StreamError{f.StreamID, http2ErrCodeProtocol} return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
} }
requestURI = path requestURI = path
} }
...@@ -4993,11 +5011,11 @@ type http2ClientConn struct { ...@@ -4993,11 +5011,11 @@ type http2ClientConn struct {
br *bufio.Reader br *bufio.Reader
fr *http2Framer fr *http2Framer
lastActive time.Time lastActive time.Time
// Settings from peer: (also guarded by mu)
// Settings from peer:
maxFrameSize uint32 maxFrameSize uint32
maxConcurrentStreams uint32 maxConcurrentStreams uint32
initialWindowSize uint32 initialWindowSize uint32
hbuf bytes.Buffer // HPACK encoder writes into this hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder henc *hpack.Encoder
freeBuf [][]byte freeBuf [][]byte
...@@ -5244,10 +5262,6 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { ...@@ -5244,10 +5262,6 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
} }
func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) {
if http2VerboseLogs {
t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr())
}
cc := &http2ClientConn{ cc := &http2ClientConn{
t: t, t: t,
tconn: c, tconn: c,
...@@ -5260,6 +5274,10 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client ...@@ -5260,6 +5274,10 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
singleUse: singleUse, singleUse: singleUse,
wantSettingsAck: true, wantSettingsAck: true,
} }
if http2VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
}
cc.cond = sync.NewCond(&cc.mu) cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize)) cc.flow.add(int32(http2initialWindowSize))
...@@ -5324,7 +5342,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { ...@@ -5324,7 +5342,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
} }
return cc.goAway == nil && !cc.closed && return cc.goAway == nil && !cc.closed &&
int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
cc.nextStreamID < 2147483647 cc.nextStreamID < math.MaxInt32
} }
func (cc *http2ClientConn) closeIfIdle() { func (cc *http2ClientConn) closeIfIdle() {
...@@ -5334,9 +5352,13 @@ func (cc *http2ClientConn) closeIfIdle() { ...@@ -5334,9 +5352,13 @@ func (cc *http2ClientConn) closeIfIdle() {
return return
} }
cc.closed = true cc.closed = true
nextID := cc.nextStreamID
cc.mu.Unlock() cc.mu.Unlock()
if http2VerboseLogs {
cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
}
cc.tconn.Close() cc.tconn.Close()
} }
...@@ -5986,11 +6008,15 @@ func (rl *http2clientConnReadLoop) run() error { ...@@ -5986,11 +6008,15 @@ func (rl *http2clientConnReadLoop) run() error {
for { for {
f, err := cc.fr.ReadFrame() f, err := cc.fr.ReadFrame()
if err != nil { if err != nil {
cc.vlogf("Transport readFrame error: (%T) %v", err, err) cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
} }
if se, ok := err.(http2StreamError); ok { if se, ok := err.(http2StreamError); ok {
if cs := cc.streamByID(se.StreamID, true); cs != nil { if cs := cc.streamByID(se.StreamID, true); cs != nil {
rl.endStreamError(cs, cc.fr.errDetail) cs.cc.writeStreamReset(cs.ID, se.Code, err)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
rl.endStreamError(cs, se)
} }
continue continue
} else if err != nil { } else if err != nil {
...@@ -6034,6 +6060,9 @@ func (rl *http2clientConnReadLoop) run() error { ...@@ -6034,6 +6060,9 @@ func (rl *http2clientConnReadLoop) run() error {
cc.logf("Transport: unhandled response frame type %T", f) cc.logf("Transport: unhandled response frame type %T", f)
} }
if err != nil { if err != nil {
if http2VerboseLogs {
cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
}
return err return err
} }
if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 {
...@@ -6381,6 +6410,11 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err ...@@ -6381,6 +6410,11 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err
if http2isConnectionCloseRequest(cs.req) { if http2isConnectionCloseRequest(cs.req) {
rl.closeWhenIdle = true rl.closeWhenIdle = true
} }
select {
case cs.resc <- http2resAndError{err: err}:
default:
}
} }
func (cs *http2clientStream) copyTrailers() { func (cs *http2clientStream) copyTrailers() {
...@@ -6425,6 +6459,16 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error ...@@ -6425,6 +6459,16 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error
cc.maxConcurrentStreams = s.Val cc.maxConcurrentStreams = s.Val
case http2SettingInitialWindowSize: case http2SettingInitialWindowSize:
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
}
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
cs.flow.add(delta)
}
cc.cond.Broadcast()
cc.initialWindowSize = s.Val cc.initialWindowSize = s.Val
default: default:
...@@ -6475,7 +6519,7 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er ...@@ -6475,7 +6519,7 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er
case <-cs.peerReset: case <-cs.peerReset:
default: default:
err := http2StreamError{cs.ID, f.ErrCode} err := http2streamError(cs.ID, f.ErrCode)
cs.resetErr = err cs.resetErr = err
close(cs.peerReset) close(cs.peerReset)
cs.bufPipe.CloseWithError(err) cs.bufPipe.CloseWithError(err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment