Commit 871d11af authored by Matt Holt's avatar Matt Holt Committed by GitHub

Merge pull request #1135 from mholt/proxyerrs

proxy: Improve failover logic and retries
parents 4adbcd25 6397a85e
...@@ -13,23 +13,32 @@ import ( ...@@ -13,23 +13,32 @@ import (
"github.com/mholt/caddy/caddyhttp/httpserver" "github.com/mholt/caddy/caddyhttp/httpserver"
) )
var errUnreachable = errors.New("unreachable backend")
// Proxy represents a middleware instance that can proxy requests. // Proxy represents a middleware instance that can proxy requests.
type Proxy struct { type Proxy struct {
Next httpserver.Handler Next httpserver.Handler
Upstreams []Upstream Upstreams []Upstream
} }
// Upstream manages a pool of proxy upstream hosts. Select should return a // Upstream manages a pool of proxy upstream hosts.
// suitable upstream host, or nil if no such hosts are available.
type Upstream interface { type Upstream interface {
// The path this upstream host should be routed on // The path this upstream host should be routed on
From() string From() string
// Selects an upstream host to be routed to.
// Selects an upstream host to be routed to. It
// should return a suitable upstream host, or nil
// if no such hosts are available.
Select(*http.Request) *UpstreamHost Select(*http.Request) *UpstreamHost
// Checks if subpath is not an ignored path // Checks if subpath is not an ignored path
AllowedPath(string) bool AllowedPath(string) bool
// Gets how long to try selecting upstream hosts
// in the case of cascading failures.
GetTryDuration() time.Duration
// Gets how long to wait between selecting upstream
// hosts in the case of cascading failures.
GetTryInterval() time.Duration
} }
// UpstreamHostDownFunc can be used to customize how Down behaves. // UpstreamHostDownFunc can be used to customize how Down behaves.
...@@ -71,10 +80,6 @@ func (uh *UpstreamHost) Available() bool { ...@@ -71,10 +80,6 @@ func (uh *UpstreamHost) Available() bool {
return !uh.Down() && !uh.Full() return !uh.Down() && !uh.Full()
} }
// tryDuration is how long to try upstream hosts; failures result in
// immediate retries until this duration ends or we get a nil host.
var tryDuration = 60 * time.Second
// ServeHTTP satisfies the httpserver.Handler interface. // ServeHTTP satisfies the httpserver.Handler interface.
func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// start by selecting most specific matching upstream config // start by selecting most specific matching upstream config
...@@ -89,13 +94,33 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { ...@@ -89,13 +94,33 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// outreq is the request that makes a roundtrip to the backend // outreq is the request that makes a roundtrip to the backend
outreq := createUpstreamRequest(r) outreq := createUpstreamRequest(r)
// since Select() should give us "up" hosts, keep retrying // The keepRetrying function will return true if we should
// hosts until timeout (or until we get a nil host). // loop and try to select another host, or false if we
// should break and stop retrying.
start := time.Now() start := time.Now()
for time.Now().Sub(start) < tryDuration { keepRetrying := func() bool {
// if we've tried long enough, break
if time.Since(start) >= upstream.GetTryDuration() {
return false
}
// otherwise, wait and try the next available host
time.Sleep(upstream.GetTryInterval())
return true
}
var backendErr error
for {
// since Select() should give us "up" hosts, keep retrying
// hosts until timeout (or until we get a nil host).
host := upstream.Select(r) host := upstream.Select(r)
if host == nil { if host == nil {
return http.StatusBadGateway, errUnreachable if backendErr == nil {
backendErr = errors.New("no hosts available upstream")
}
if !keepRetrying() {
break
}
continue
} }
if rr, ok := w.(*httpserver.ResponseRecorder); ok && rr.Replacer != nil { if rr, ok := w.(*httpserver.ResponseRecorder); ok && rr.Replacer != nil {
rr.Replacer.Set("upstream", host.Name) rr.Replacer.Set("upstream", host.Name)
...@@ -141,29 +166,35 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) { ...@@ -141,29 +166,35 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// tell the proxy to serve the request // tell the proxy to serve the request
atomic.AddInt64(&host.Conns, 1) atomic.AddInt64(&host.Conns, 1)
backendErr := proxy.ServeHTTP(w, outreq, downHeaderUpdateFn) backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
atomic.AddInt64(&host.Conns, -1) atomic.AddInt64(&host.Conns, -1)
// if no errors, we're done here; otherwise failover // if no errors, we're done here
if backendErr == nil { if backendErr == nil {
return 0, nil return 0, nil
} }
// failover; remember this failure for some time if
// request failure counting is enabled
timeout := host.FailTimeout timeout := host.FailTimeout
if timeout == 0 { if timeout > 0 {
timeout = 10 * time.Second atomic.AddInt32(&host.Fails, 1)
go func(host *UpstreamHost, timeout time.Duration) {
time.Sleep(timeout)
atomic.AddInt32(&host.Fails, -1)
}(host, timeout)
}
// if we've tried long enough, break
if !keepRetrying() {
break
} }
atomic.AddInt32(&host.Fails, 1)
go func(host *UpstreamHost, timeout time.Duration) {
time.Sleep(timeout)
atomic.AddInt32(&host.Fails, -1)
}(host, timeout)
} }
return http.StatusBadGateway, errUnreachable return http.StatusBadGateway, backendErr
} }
// match finds the best match for a proxy config based // match finds the best match for a proxy config based on r.
// on r.
func (p Proxy) match(r *http.Request) Upstream { func (p Proxy) match(r *http.Request) Upstream {
var u Upstream var u Upstream
var longestMatch int var longestMatch int
......
...@@ -25,10 +25,6 @@ import ( ...@@ -25,10 +25,6 @@ import (
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
func init() {
tryDuration = 50 * time.Millisecond // prevent tests from hanging
}
func TestReverseProxy(t *testing.T) { func TestReverseProxy(t *testing.T) {
log.SetOutput(ioutil.Discard) log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr) defer log.SetOutput(os.Stderr)
...@@ -792,9 +788,9 @@ func (u *fakeUpstream) Select(r *http.Request) *UpstreamHost { ...@@ -792,9 +788,9 @@ func (u *fakeUpstream) Select(r *http.Request) *UpstreamHost {
return u.host return u.host
} }
func (u *fakeUpstream) AllowedPath(requestPath string) bool { func (u *fakeUpstream) AllowedPath(requestPath string) bool { return true }
return true func (u *fakeUpstream) GetTryDuration() time.Duration { return 1 * time.Second }
} func (u *fakeUpstream) GetTryInterval() time.Duration { return 250 * time.Millisecond }
// newWebSocketTestProxy returns a test proxy that will // newWebSocketTestProxy returns a test proxy that will
// redirect to the specified backendAddr. The function // redirect to the specified backendAddr. The function
...@@ -834,9 +830,9 @@ func (u *fakeWsUpstream) Select(r *http.Request) *UpstreamHost { ...@@ -834,9 +830,9 @@ func (u *fakeWsUpstream) Select(r *http.Request) *UpstreamHost {
} }
} }
func (u *fakeWsUpstream) AllowedPath(requestPath string) bool { func (u *fakeWsUpstream) AllowedPath(requestPath string) bool { return true }
return true func (u *fakeWsUpstream) GetTryDuration() time.Duration { return 1 * time.Second }
} func (u *fakeWsUpstream) GetTryInterval() time.Duration { return 250 * time.Millisecond }
// recorderHijacker is a ResponseRecorder that can // recorderHijacker is a ResponseRecorder that can
// be hijacked. // be hijacked.
......
...@@ -31,6 +31,8 @@ type staticUpstream struct { ...@@ -31,6 +31,8 @@ type staticUpstream struct {
FailTimeout time.Duration FailTimeout time.Duration
MaxFails int32 MaxFails int32
TryDuration time.Duration
TryInterval time.Duration
MaxConns int64 MaxConns int64
HealthCheck struct { HealthCheck struct {
Client http.Client Client http.Client
...@@ -53,8 +55,8 @@ func NewStaticUpstreams(c caddyfile.Dispenser) ([]Upstream, error) { ...@@ -53,8 +55,8 @@ func NewStaticUpstreams(c caddyfile.Dispenser) ([]Upstream, error) {
downstreamHeaders: make(http.Header), downstreamHeaders: make(http.Header),
Hosts: nil, Hosts: nil,
Policy: &Random{}, Policy: &Random{},
FailTimeout: 10 * time.Second,
MaxFails: 1, MaxFails: 1,
TryInterval: 250 * time.Millisecond,
MaxConns: 0, MaxConns: 0,
KeepAlive: http.DefaultMaxIdleConnsPerHost, KeepAlive: http.DefaultMaxIdleConnsPerHost,
} }
...@@ -114,11 +116,6 @@ func NewStaticUpstreams(c caddyfile.Dispenser) ([]Upstream, error) { ...@@ -114,11 +116,6 @@ func NewStaticUpstreams(c caddyfile.Dispenser) ([]Upstream, error) {
return upstreams, nil return upstreams, nil
} }
// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy
}
func (u *staticUpstream) From() string { func (u *staticUpstream) From() string {
return u.from return u.from
} }
...@@ -141,8 +138,7 @@ func (u *staticUpstream) NewHost(host string) (*UpstreamHost, error) { ...@@ -141,8 +138,7 @@ func (u *staticUpstream) NewHost(host string) (*UpstreamHost, error) {
if uh.Unhealthy { if uh.Unhealthy {
return true return true
} }
if uh.Fails >= u.MaxFails && if uh.Fails >= u.MaxFails {
u.MaxFails != 0 {
return true return true
} }
return false return false
...@@ -237,7 +233,28 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { ...@@ -237,7 +233,28 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
if err != nil { if err != nil {
return err return err
} }
if n < 1 {
return c.Err("max_fails must be at least 1")
}
u.MaxFails = int32(n) u.MaxFails = int32(n)
case "try_duration":
if !c.NextArg() {
return c.ArgErr()
}
dur, err := time.ParseDuration(c.Val())
if err != nil {
return err
}
u.TryDuration = dur
case "try_interval":
if !c.NextArg() {
return c.ArgErr()
}
interval, err := time.ParseDuration(c.Val())
if err != nil {
return err
}
u.TryInterval = interval
case "max_conns": case "max_conns":
if !c.NextArg() { if !c.NextArg() {
return c.ArgErr() return c.ArgErr()
...@@ -397,3 +414,18 @@ func (u *staticUpstream) AllowedPath(requestPath string) bool { ...@@ -397,3 +414,18 @@ func (u *staticUpstream) AllowedPath(requestPath string) bool {
} }
return true return true
} }
// GetTryDuration returns u.TryDuration.
func (u *staticUpstream) GetTryDuration() time.Duration {
return u.TryDuration
}
// GetTryInterval returns u.TryInterval.
func (u *staticUpstream) GetTryInterval() time.Duration {
return u.TryInterval
}
// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment