Commit faf882d1 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: make Server Handler's Request.Context be done on conn errors

This CL changes how the http1 Server reads from the client.

The goal of this change is to make the Request.Context given to Server
Handlers become done when the TCP connection dies (has seen any read
or write error). I didn't finish that for Go 1.7 when Context was
added to http package.

We can't notice the peer disconnect unless we're blocked in a Read
call, though, and previously we were only doing read calls as needed,
when reading the body or the next request. One exception to that was
the old pre-context CloseNotifier mechanism.

The implementation of CloseNotifier has always been tricky. The past
few releases have contained the complexity and moved the
reading-from-TCP-conn logic into the "connReader" type. This CL
extends connReader to make sure that it's always blocked in a Read
call, at least once the request body has been fully consumed.

In the process, this deletes all the old CloseNotify code and unifies
it with the context cancelation code. The two notification mechanisms
are nearly identical, except the CloseNotify path always notifies on
the arrival of pipelined HTTP/1 requests. We might want to change that
in a subsequent commit. I left a TODO for that. For now there's no
change in behavior except that the context now cancels as it was
supposed to.

As a bonus that fell out for free, a Handler can now use CloseNotifier
and Hijack together in the same request now.

Fixes #15224 (make http1 Server always in a Read, like http2)
Fixes #15927 (cancel context when underlying connection closes)
Updates #9763 (CloseNotifier + Hijack)

Change-Id: I972cf6ecbab7f1230efe8cc971e89f8e6e56196b
Reviewed-on: https://go-review.googlesource.com/31173
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarIan Lance Taylor <iant@golang.org>
parent 35e5fd0c
...@@ -6,6 +6,7 @@ package http ...@@ -6,6 +6,7 @@ package http
import ( import (
"strings" "strings"
"time"
"unicode/utf8" "unicode/utf8"
"golang_org/x/net/lex/httplex" "golang_org/x/net/lex/httplex"
...@@ -15,6 +16,10 @@ import ( ...@@ -15,6 +16,10 @@ import (
// Transport's byte-limiting readers. // Transport's byte-limiting readers.
const maxInt64 = 1<<63 - 1 const maxInt64 = 1<<63 - 1
// aLongTimeAgo is a non-zero time, far in the past, used for
// immediate cancelation of network operations.
var aLongTimeAgo = time.Unix(233431200, 0)
// TODO(bradfitz): move common stuff here. The other files have accumulated // TODO(bradfitz): move common stuff here. The other files have accumulated
// generic http stuff in random places. // generic http stuff in random places.
......
...@@ -4202,13 +4202,11 @@ func testServerRequestContextCancel_ServeHTTPDone(t *testing.T, h2 bool) { ...@@ -4202,13 +4202,11 @@ func testServerRequestContextCancel_ServeHTTPDone(t *testing.T, h2 bool) {
} }
} }
// Tests that the Request.Context available to the Handler is canceled
// if the peer closes their TCP connection. This requires that the server
// is always blocked in a Read call so it notices the EOF from the client.
// See issues 15927 and 15224.
func TestServerRequestContextCancel_ConnClose(t *testing.T) { func TestServerRequestContextCancel_ConnClose(t *testing.T) {
// Currently the context is not canceled when the connection
// is closed because we're not reading from the connection
// until after ServeHTTP for the previous handler is done.
// Until the server code is modified to always be in a read
// (Issue 15224), this test doesn't work yet.
t.Skip("TODO(bradfitz): this test doesn't yet work; golang.org/issue/15224")
defer afterTest(t) defer afterTest(t)
inHandler := make(chan struct{}) inHandler := make(chan struct{})
handlerDone := make(chan struct{}) handlerDone := make(chan struct{})
...@@ -4237,7 +4235,7 @@ func TestServerRequestContextCancel_ConnClose(t *testing.T) { ...@@ -4237,7 +4235,7 @@ func TestServerRequestContextCancel_ConnClose(t *testing.T) {
select { select {
case <-handlerDone: case <-handlerDone:
case <-time.After(3 * time.Second): case <-time.After(4 * time.Second):
t.Fatalf("timeout waiting to see ServeHTTP exit") t.Fatalf("timeout waiting to see ServeHTTP exit")
} }
} }
......
...@@ -208,6 +208,9 @@ type conn struct { ...@@ -208,6 +208,9 @@ type conn struct {
// Immutable; never nil. // Immutable; never nil.
server *Server server *Server
// cancelCtx cancels the connection-level context.
cancelCtx context.CancelFunc
// rwc is the underlying network connection. // rwc is the underlying network connection.
// This is never wrapped by other types and is the value given out // This is never wrapped by other types and is the value given out
// to CloseNotifier callers. It is usually of type *net.TCPConn or // to CloseNotifier callers. It is usually of type *net.TCPConn or
...@@ -247,6 +250,8 @@ type conn struct { ...@@ -247,6 +250,8 @@ type conn struct {
// mu guards hijackedv, use of bufr, (*response).closeNotifyCh. // mu guards hijackedv, use of bufr, (*response).closeNotifyCh.
mu sync.Mutex mu sync.Mutex
curReq atomic.Value // of *response (which has a Request in it)
// hijackedv is whether this connection has been hijacked // hijackedv is whether this connection has been hijacked
// by a Handler with the Hijacker interface. // by a Handler with the Hijacker interface.
// It is guarded by mu. // It is guarded by mu.
...@@ -264,8 +269,12 @@ func (c *conn) hijackLocked() (rwc net.Conn, buf *bufio.ReadWriter, err error) { ...@@ -264,8 +269,12 @@ func (c *conn) hijackLocked() (rwc net.Conn, buf *bufio.ReadWriter, err error) {
if c.hijackedv { if c.hijackedv {
return nil, nil, ErrHijacked return nil, nil, ErrHijacked
} }
c.r.abortPendingRead()
c.hijackedv = true c.hijackedv = true
rwc = c.rwc rwc = c.rwc
rwc.SetDeadline(time.Time{})
buf = bufio.NewReadWriter(c.bufr, bufio.NewWriter(rwc)) buf = bufio.NewReadWriter(c.bufr, bufio.NewWriter(rwc))
c.setState(rwc, StateHijacked) c.setState(rwc, StateHijacked)
return return
...@@ -415,9 +424,11 @@ type response struct { ...@@ -415,9 +424,11 @@ type response struct {
dateBuf [len(TimeFormat)]byte dateBuf [len(TimeFormat)]byte
clenBuf [10]byte clenBuf [10]byte
// closeNotifyCh is non-nil once CloseNotify is called. // closeNotifyCh is the channel returned by CloseNotify.
// Guarded by conn.mu // TODO(bradfitz): this is currently (for Go 1.8) always
closeNotifyCh <-chan bool // non-nil. Make this lazily-created again as it used to be.
closeNotifyCh chan bool
didCloseNotify int32 // atomic (only 0->1 winner should send)
} }
type atomicBool int32 type atomicBool int32
...@@ -550,60 +561,148 @@ type readResult struct { ...@@ -550,60 +561,148 @@ type readResult struct {
// call blocked in a background goroutine to wait for activity and // call blocked in a background goroutine to wait for activity and
// trigger a CloseNotifier channel. // trigger a CloseNotifier channel.
type connReader struct { type connReader struct {
r io.Reader conn *conn
remain int64 // bytes remaining
mu sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
bgErr error // non-nil means error happened on background read
cond *sync.Cond
inRead bool
aborted bool // set true before conn.rwc deadline is set to past
remain int64 // bytes remaining
}
// ch is non-nil if a background read is in progress. func (cr *connReader) lock() {
// It is guarded by conn.mu. cr.mu.Lock()
ch chan readResult if cr.cond == nil {
cr.cond = sync.NewCond(&cr.mu)
}
}
func (cr *connReader) unlock() { cr.mu.Unlock() }
func (cr *connReader) startBackgroundRead() {
cr.lock()
defer cr.unlock()
if cr.inRead {
panic("invalid concurrent Body.Read call")
}
cr.inRead = true
go cr.backgroundRead()
}
func (cr *connReader) backgroundRead() {
n, err := cr.conn.rwc.Read(cr.byteBuf[:])
cr.lock()
if n == 1 {
cr.hasByte = true
// We were at EOF already (since we wouldn't be in a
// background read otherwise), so this is a pipelined
// HTTP request.
cr.closeNotifyFromPipelinedRequest()
}
if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
// Ignore this error. It's the expected error from
// another goroutine calling abortPendingRead.
} else if err != nil {
cr.handleReadError(err)
}
cr.aborted = false
cr.inRead = false
cr.unlock()
cr.cond.Broadcast()
}
func (cr *connReader) abortPendingRead() {
cr.lock()
defer cr.unlock()
if !cr.inRead {
return
}
cr.aborted = true
cr.conn.rwc.SetReadDeadline(aLongTimeAgo)
for cr.inRead {
cr.cond.Wait()
}
cr.conn.rwc.SetReadDeadline(time.Time{})
} }
func (cr *connReader) setReadLimit(remain int64) { cr.remain = remain } func (cr *connReader) setReadLimit(remain int64) { cr.remain = remain }
func (cr *connReader) setInfiniteReadLimit() { cr.remain = maxInt64 } func (cr *connReader) setInfiniteReadLimit() { cr.remain = maxInt64 }
func (cr *connReader) hitReadLimit() bool { return cr.remain <= 0 } func (cr *connReader) hitReadLimit() bool { return cr.remain <= 0 }
// may be called from multiple goroutines.
func (cr *connReader) handleReadError(err error) {
cr.conn.cancelCtx()
cr.closeNotify()
}
// closeNotifyFromPipelinedRequest simply calls closeNotify.
//
// This method wrapper is here for documentation. The callers are the
// cases where we send on the closenotify channel because of a
// pipelined HTTP request, per the previous Go behavior and
// documentation (that this "MAY" happen).
//
// TODO: consider changing this behavior and making context
// cancelation and closenotify work the same.
func (cr *connReader) closeNotifyFromPipelinedRequest() {
cr.closeNotify()
}
// may be called from multiple goroutines.
func (cr *connReader) closeNotify() {
res, _ := cr.conn.curReq.Load().(*response)
if res != nil {
if atomic.CompareAndSwapInt32(&res.didCloseNotify, 0, 1) {
res.closeNotifyCh <- true
}
}
}
func (cr *connReader) Read(p []byte) (n int, err error) { func (cr *connReader) Read(p []byte) (n int, err error) {
cr.lock()
if cr.inRead {
cr.unlock()
panic("invalid concurrent Body.Read call")
}
if cr.hitReadLimit() { if cr.hitReadLimit() {
cr.unlock()
return 0, io.EOF return 0, io.EOF
} }
if cr.bgErr != nil {
err = cr.bgErr
cr.unlock()
return 0, err
}
if len(p) == 0 { if len(p) == 0 {
return cr.unlock()
return 0, nil
} }
if int64(len(p)) > cr.remain { if int64(len(p)) > cr.remain {
p = p[:cr.remain] p = p[:cr.remain]
} }
if cr.hasByte {
// Is a background read (started by CloseNotifier) already in p[0] = cr.byteBuf[0]
// flight? If so, wait for it and use its result. cr.hasByte = false
ch := cr.ch cr.unlock()
if ch != nil { return 1, nil
cr.ch = nil
res := <-ch
if res.n == 1 {
p[0] = res.b
cr.remain -= 1
}
return res.n, res.err
} }
n, err = cr.r.Read(p) cr.inRead = true
cr.remain -= int64(n) cr.unlock()
return n, err = cr.conn.rwc.Read(p)
}
func (cr *connReader) startBackgroundRead(onReadComplete func()) { cr.lock()
if cr.ch != nil { cr.inRead = false
// Background read already started. if err != nil {
return cr.handleReadError(err)
} }
cr.ch = make(chan readResult, 1) cr.remain -= int64(n)
go cr.closeNotifyAwaitActivityRead(cr.ch, onReadComplete) cr.unlock()
}
func (cr *connReader) closeNotifyAwaitActivityRead(ch chan<- readResult, onReadComplete func()) { cr.cond.Broadcast()
var buf [1]byte return n, err
n, err := cr.r.Read(buf[:1])
onReadComplete()
ch <- readResult{n, err, buf[0]}
} }
var ( var (
...@@ -818,6 +917,7 @@ func (c *conn) readRequest(ctx context.Context) (w *response, err error) { ...@@ -818,6 +917,7 @@ func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
reqBody: req.Body, reqBody: req.Body,
handlerHeader: make(Header), handlerHeader: make(Header),
contentLength: -1, contentLength: -1,
closeNotifyCh: make(chan bool, 1),
// We populate these ahead of time so we're not // We populate these ahead of time so we're not
// reading from req.Header after their Handler starts // reading from req.Header after their Handler starts
...@@ -1358,6 +1458,8 @@ func (w *response) finishRequest() { ...@@ -1358,6 +1458,8 @@ func (w *response) finishRequest() {
w.cw.close() w.cw.close()
w.conn.bufw.Flush() w.conn.bufw.Flush()
w.conn.r.abortPendingRead()
// Close the body (regardless of w.closeAfterReply) so we can // Close the body (regardless of w.closeAfterReply) so we can
// re-use its bufio.Reader later safely. // re-use its bufio.Reader later safely.
w.reqBody.Close() w.reqBody.Close()
...@@ -1525,13 +1627,14 @@ func (c *conn) serve(ctx context.Context) { ...@@ -1525,13 +1627,14 @@ func (c *conn) serve(ctx context.Context) {
// HTTP/1.x from here on. // HTTP/1.x from here on.
c.r = &connReader{r: c.rwc}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
ctx, cancelCtx := context.WithCancel(ctx) ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx() defer cancelCtx()
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
for { for {
w, err := c.readRequest(ctx) w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() { if c.r.remain != c.server.initialReadLimitSize() {
...@@ -1575,11 +1678,24 @@ func (c *conn) serve(ctx context.Context) { ...@@ -1575,11 +1678,24 @@ func (c *conn) serve(ctx context.Context) {
return return
} }
c.curReq.Store(w)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
if w.conn.bufr.Buffered() > 0 {
w.conn.r.closeNotifyFromPipelinedRequest()
}
w.conn.r.startBackgroundRead()
}
// HTTP cannot have multiple simultaneous active requests.[*] // HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another, // Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine. // so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process // [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized. // in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
serverHandler{c.server}.ServeHTTP(w, w.req) serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx() w.cancelCtx()
if c.hijacked() { if c.hijacked() {
...@@ -1593,6 +1709,7 @@ func (c *conn) serve(ctx context.Context) { ...@@ -1593,6 +1709,7 @@ func (c *conn) serve(ctx context.Context) {
return return
} }
c.setState(c.rwc, StateIdle) c.setState(c.rwc, StateIdle)
c.curReq.Store((*response)(nil))
} }
} }
...@@ -1628,10 +1745,6 @@ func (w *response) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) { ...@@ -1628,10 +1745,6 @@ func (w *response) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if w.closeNotifyCh != nil {
return nil, nil, errors.New("http: Hijack is incompatible with use of CloseNotifier in same ServeHTTP call")
}
// Release the bufioWriter that writes to the chunk writer, it is not // Release the bufioWriter that writes to the chunk writer, it is not
// used after a connection has been hijacked. // used after a connection has been hijacked.
rwc, buf, err = c.hijackLocked() rwc, buf, err = c.hijackLocked()
...@@ -1646,50 +1759,7 @@ func (w *response) CloseNotify() <-chan bool { ...@@ -1646,50 +1759,7 @@ func (w *response) CloseNotify() <-chan bool {
if w.handlerDone.isSet() { if w.handlerDone.isSet() {
panic("net/http: CloseNotify called after ServeHTTP finished") panic("net/http: CloseNotify called after ServeHTTP finished")
} }
c := w.conn return w.closeNotifyCh
c.mu.Lock()
defer c.mu.Unlock()
if w.closeNotifyCh != nil {
return w.closeNotifyCh
}
ch := make(chan bool, 1)
w.closeNotifyCh = ch
if w.conn.hijackedv {
// CloseNotify is undefined after a hijack, but we have
// no place to return an error, so just return a channel,
// even though it'll never receive a value.
return ch
}
var once sync.Once
notify := func() { once.Do(func() { ch <- true }) }
if requestBodyRemains(w.reqBody) {
// They're still consuming the request body, so we
// shouldn't notify yet.
registerOnHitEOF(w.reqBody, func() {
c.mu.Lock()
defer c.mu.Unlock()
startCloseNotifyBackgroundRead(c, notify)
})
} else {
startCloseNotifyBackgroundRead(c, notify)
}
return ch
}
// c.mu must be held.
func startCloseNotifyBackgroundRead(c *conn, notify func()) {
if c.bufr.Buffered() > 0 {
// They've consumed the request body, so anything
// remaining is a pipelined request, which we
// document as firing on.
notify()
} else {
c.r.startBackgroundRead(notify)
}
} }
func registerOnHitEOF(rc io.ReadCloser, fn func()) { func registerOnHitEOF(rc io.ReadCloser, fn func()) {
...@@ -2725,6 +2795,7 @@ func (w checkConnErrorWriter) Write(p []byte) (n int, err error) { ...@@ -2725,6 +2795,7 @@ func (w checkConnErrorWriter) Write(p []byte) (n int, err error) {
n, err = w.c.rwc.Write(p) n, err = w.c.rwc.Write(p)
if err != nil && w.c.werr == nil { if err != nil && w.c.werr == nil {
w.c.werr = err w.c.werr = err
w.c.cancelCtx()
} }
return return
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment