Commit 9b8080f3 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: update bundled http2 copy

Updates golang.org/x/net/http2 to git rev 438097d76

Fixes #13444

Change-Id: I699ac02d23b56db3e8a27d3f599ae56cd0a5b4b2
Reviewed-on: https://go-review.googlesource.com/17570Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
parent 616e45ea
...@@ -3858,6 +3858,7 @@ type http2clientStream struct { ...@@ -3858,6 +3858,7 @@ type http2clientStream struct {
inflow http2flow // guarded by cc.mu inflow http2flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody bool // stop writing req body; guarded by cc.mu
peerReset chan struct{} // closed on peer reset peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed resetErr error // populated before peerReset is closed
...@@ -3874,6 +3875,14 @@ func (cs *http2clientStream) checkReset() error { ...@@ -3874,6 +3875,14 @@ func (cs *http2clientStream) checkReset() error {
} }
} }
func (cs *http2clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
cs.stopReqBody = true
cc.cond.Broadcast()
cc.mu.Unlock()
}
type http2stickyErrWriter struct { type http2stickyErrWriter struct {
w io.Writer w io.Writer
err *error err *error
...@@ -4202,26 +4211,25 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { ...@@ -4202,26 +4211,25 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
return nil, werr return nil, werr
} }
var bodyCopyErrc chan error var bodyCopyErrc chan error // result of body copy
var gotResHeaders chan struct{} // closed on resheaders
if hasBody { if hasBody {
bodyCopyErrc = make(chan error, 1) bodyCopyErrc = make(chan error, 1)
gotResHeaders = make(chan struct{})
go func() { go func() {
bodyCopyErrc <- cs.writeRequestBody(req.Body, gotResHeaders) bodyCopyErrc <- cs.writeRequestBody(req.Body)
}() }()
} }
for { for {
select { select {
case re := <-cs.resc: case re := <-cs.resc:
if gotResHeaders != nil { res := re.res
close(gotResHeaders) if re.err != nil || res.StatusCode > 299 {
cs.abortRequestBodyWrite()
} }
if re.err != nil { if re.err != nil {
return nil, re.err return nil, re.err
} }
res := re.res
res.Request = req res.Request = req
res.TLS = cc.tlsState res.TLS = cc.tlsState
return res, nil return res, nil
...@@ -4233,45 +4241,49 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { ...@@ -4233,45 +4241,49 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
} }
} }
var http2errServerResponseBeforeRequestBody = errors.New("http2: server sent response while still writing request body") // errAbortReqBodyWrite is an internal error value.
// It doesn't escape to callers.
var http2errAbortReqBodyWrite = errors.New("http2: aborting request body write")
func (cs *http2clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan struct{}) error { func (cs *http2clientStream) writeRequestBody(body io.ReadCloser) (err error) {
cc := cs.cc cc := cs.cc
sentEnd := false sentEnd := false
buf := cc.frameScratchBuffer() buf := cc.frameScratchBuffer()
defer cc.putFrameScratchBuffer(buf) defer cc.putFrameScratchBuffer(buf)
for !sentEnd { defer func() {
cerr := body.Close()
if err == nil {
err = cerr
}
}()
var sawEOF bool var sawEOF bool
n, err := io.ReadFull(body, buf) for !sawEOF {
if err == io.ErrUnexpectedEOF { n, err := body.Read(buf)
if err == io.EOF {
sawEOF = true sawEOF = true
err = nil err = nil
} else if err == io.EOF {
break
} else if err != nil { } else if err != nil {
return err return err
} }
toWrite := buf[:n] remain := buf[:n]
for len(toWrite) > 0 && err == nil { for len(remain) > 0 && err == nil {
var allowed int32 var allowed int32
allowed, err = cs.awaitFlowControl(int32(len(toWrite))) allowed, err = cs.awaitFlowControl(len(remain))
if err != nil { if err != nil {
return err return err
} }
cc.wmu.Lock() cc.wmu.Lock()
select { data := remain[:allowed]
case <-gotResHeaders: remain = remain[allowed:]
err = http2errServerResponseBeforeRequestBody sentEnd = sawEOF && len(remain) == 0
case <-cs.peerReset:
err = cs.resetErr
default:
data := toWrite[:allowed]
toWrite = toWrite[allowed:]
sentEnd = sawEOF && len(toWrite) == 0
err = cc.fr.WriteData(cs.ID, sentEnd, data) err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
err = cc.bw.Flush()
} }
cc.wmu.Unlock() cc.wmu.Unlock()
} }
...@@ -4280,8 +4292,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, gotResHeaders <-ch ...@@ -4280,8 +4292,6 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, gotResHeaders <-ch
} }
} }
var err error
cc.wmu.Lock() cc.wmu.Lock()
if !sentEnd { if !sentEnd {
err = cc.fr.WriteData(cs.ID, true, nil) err = cc.fr.WriteData(cs.ID, true, nil)
...@@ -4298,7 +4308,7 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, gotResHeaders <-ch ...@@ -4298,7 +4308,7 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, gotResHeaders <-ch
// control tokens from the server. // control tokens from the server.
// It returns either the non-zero number of tokens taken or an error // It returns either the non-zero number of tokens taken or an error
// if the stream is dead. // if the stream is dead.
func (cs *http2clientStream) awaitFlowControl(maxBytes int32) (taken int32, err error) { func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc cc := cs.cc
cc.mu.Lock() cc.mu.Lock()
defer cc.mu.Unlock() defer cc.mu.Unlock()
...@@ -4306,13 +4316,17 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int32) (taken int32, err ...@@ -4306,13 +4316,17 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int32) (taken int32, err
if cc.closed { if cc.closed {
return 0, http2errClientConnClosed return 0, http2errClientConnClosed
} }
if cs.stopReqBody {
return 0, http2errAbortReqBodyWrite
}
if err := cs.checkReset(); err != nil { if err := cs.checkReset(); err != nil {
return 0, err return 0, err
} }
if a := cs.flow.available(); a > 0 { if a := cs.flow.available(); a > 0 {
take := a take := a
if take > maxBytes { if int(take) > maxBytes {
take = maxBytes
take = int32(maxBytes)
} }
if take > int32(cc.maxFrameSize) { if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize) take = int32(cc.maxFrameSize)
...@@ -4751,6 +4765,7 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er ...@@ -4751,6 +4765,7 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er
cs.resetErr = err cs.resetErr = err
close(cs.peerReset) close(cs.peerReset)
cs.bufPipe.CloseWithError(err) cs.bufPipe.CloseWithError(err)
cs.cc.cond.Broadcast()
} }
delete(rl.activeRes, cs.ID) delete(rl.activeRes, cs.ID)
return nil return nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment