Commit 8048e9c3 authored by Leonard Hecker's avatar Leonard Hecker

proxy: Added unbuffered request optimization

If only one upstream is defined we don't need to buffer the body.
Instead we directly stream the body to the upstream host,
which reduces memory usage as well as latency.
Furthermore this enables different kinds of HTTP streaming
applications like gRPC for instance.
parent fab3b5bf
...@@ -39,6 +39,9 @@ type Upstream interface { ...@@ -39,6 +39,9 @@ type Upstream interface {
// Gets how long to wait between selecting upstream // Gets how long to wait between selecting upstream
// hosts in the case of cascading failures. // hosts in the case of cascading failures.
GetTryInterval() time.Duration GetTryInterval() time.Duration
// Gets the number of upstream hosts.
GetHostCount() int
} }
// UpstreamHostDownFunc can be used to customize how Down behaves. // UpstreamHostDownFunc can be used to customize how Down behaves.
...@@ -94,13 +97,26 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { ...@@ -94,13 +97,26 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// outreq is the request that makes a roundtrip to the backend // outreq is the request that makes a roundtrip to the backend
outreq := createUpstreamRequest(r) outreq := createUpstreamRequest(r)
// record and replace outreq body // If we have more than one upstream host defined and if retrying is enabled
body, err := newBufferedBody(outreq.Body) // by setting try_duration to a non-zero value, caddy will try to
if err != nil { // retry the request at a different host if the first one failed.
return http.StatusBadRequest, errors.New("failed to read downstream request body") //
} // This requires us to possibly rewind and replay the request body though,
if body != nil { // which in turn requires us to buffer the request body first.
outreq.Body = body //
// An unbuffered request is usually preferrable, because it reduces latency
// as well as memory usage. Furthermore it enables different kinds of
// HTTP streaming applications like gRPC for instance.
requiresBuffering := upstream.GetHostCount() > 1 && upstream.GetTryDuration() != 0
if requiresBuffering {
body, err := newBufferedBody(outreq.Body)
if err != nil {
return http.StatusBadRequest, errors.New("failed to read downstream request body")
}
if body != nil {
outreq.Body = body
}
} }
// The keepRetrying function will return true if we should // The keepRetrying function will return true if we should
...@@ -173,15 +189,25 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { ...@@ -173,15 +189,25 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer) downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer)
} }
// rewind request body to its beginning // Before we retry the request we have to make sure
if err := body.rewind(); err != nil { // that the body is rewound to it's beginning.
return http.StatusInternalServerError, errors.New("unable to rewind downstream request body") if bb, ok := outreq.Body.(*bufferedBody); ok {
if err := bb.rewind(); err != nil {
return http.StatusInternalServerError, errors.New("unable to rewind downstream request body")
}
} }
// tell the proxy to serve the request // tell the proxy to serve the request
atomic.AddInt64(&host.Conns, 1) //
backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn) // NOTE:
atomic.AddInt64(&host.Conns, -1) // The call to proxy.ServeHTTP can theoretically panic.
// To prevent host.Conns from getting out-of-sync we thus have to
// make sure that it's _always_ correctly decremented afterwards.
func() {
atomic.AddInt64(&host.Conns, 1)
defer atomic.AddInt64(&host.Conns, -1)
backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
}()
// if no errors, we're done here // if no errors, we're done here
if backendErr == nil { if backendErr == nil {
......
...@@ -423,6 +423,10 @@ func (u *staticUpstream) GetTryInterval() time.Duration { ...@@ -423,6 +423,10 @@ func (u *staticUpstream) GetTryInterval() time.Duration {
return u.TryInterval return u.TryInterval
} }
func (u *staticUpstream) GetHostCount() int {
return len(u.Hosts)
}
// RegisterPolicy adds a custom policy to the proxy. // RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) { func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy supportedPolicies[name] = policy
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment