Commit a7766c90 authored by Pieter Raubenheimer's avatar Pieter Raubenheimer

Add common method for checking host availability

parent ce8ee831
...@@ -25,11 +25,11 @@ type Random struct{} ...@@ -25,11 +25,11 @@ type Random struct{}
// Select selects an up host at random from the specified pool. // Select selects an up host at random from the specified pool.
func (r *Random) Select(pool HostPool) *UpstreamHost { func (r *Random) Select(pool HostPool) *UpstreamHost {
// instead of just generating a random index // instead of just generating a random index
// this is done to prevent selecting a down host // this is done to prevent selecting a unavailable host
var randHost *UpstreamHost var randHost *UpstreamHost
count := 0 count := 0
for _, host := range pool { for _, host := range pool {
if host.Down() { if !host.Available() {
continue continue
} }
count++ count++
...@@ -56,7 +56,7 @@ func (r *LeastConn) Select(pool HostPool) *UpstreamHost { ...@@ -56,7 +56,7 @@ func (r *LeastConn) Select(pool HostPool) *UpstreamHost {
count := 0 count := 0
leastConn := int64(1<<63 - 1) leastConn := int64(1<<63 - 1)
for _, host := range pool { for _, host := range pool {
if host.Down() { if !host.Available() {
continue continue
} }
hostConns := host.Conns hostConns := host.Conns
...@@ -90,11 +90,11 @@ func (r *RoundRobin) Select(pool HostPool) *UpstreamHost { ...@@ -90,11 +90,11 @@ func (r *RoundRobin) Select(pool HostPool) *UpstreamHost {
poolLen := uint32(len(pool)) poolLen := uint32(len(pool))
selection := atomic.AddUint32(&r.Robin, 1) % poolLen selection := atomic.AddUint32(&r.Robin, 1) % poolLen
host := pool[selection] host := pool[selection]
// if the currently selected host is down, just ffwd to up host // if the currently selected host is not available, just ffwd to up host
for i := uint32(1); host.Down() && i < poolLen; i++ { for i := uint32(1); !host.Available() && i < poolLen; i++ {
host = pool[(selection+i)%poolLen] host = pool[(selection+i)%poolLen]
} }
if host.Down() { if !host.Available() {
return nil return nil
} }
return host return host
......
...@@ -53,12 +53,23 @@ func TestRoundRobinPolicy(t *testing.T) { ...@@ -53,12 +53,23 @@ func TestRoundRobinPolicy(t *testing.T) {
if h != pool[2] { if h != pool[2] {
t.Error("Expected second round robin host to be third host in the pool.") t.Error("Expected second round robin host to be third host in the pool.")
} }
// mark host as down
pool[0].Unhealthy = true
h = rrPolicy.Select(pool) h = rrPolicy.Select(pool)
if h != pool[1] { if h != pool[0] {
t.Error("Expected third round robin host to be first host in the pool.") t.Error("Expected third round robin host to be first host in the pool.")
} }
// mark host as down
pool[1].Unhealthy = true
h = rrPolicy.Select(pool)
if h != pool[2] {
t.Error("Expected to skip down host.")
}
// mark host as full
pool[2].Conns = 1
pool[2].MaxConns = 1
h = rrPolicy.Select(pool)
if h != pool[0] {
t.Error("Expected to skip full host.")
}
} }
func TestLeastConnPolicy(t *testing.T) { func TestLeastConnPolicy(t *testing.T) {
......
...@@ -44,6 +44,7 @@ type UpstreamHost struct { ...@@ -44,6 +44,7 @@ type UpstreamHost struct {
ExtraHeaders http.Header ExtraHeaders http.Header
CheckDown UpstreamHostDownFunc CheckDown UpstreamHostDownFunc
WithoutPathPrefix string WithoutPathPrefix string
MaxConns int64
} }
// Down checks whether the upstream host is down or not. // Down checks whether the upstream host is down or not.
...@@ -57,6 +58,16 @@ func (uh *UpstreamHost) Down() bool { ...@@ -57,6 +58,16 @@ func (uh *UpstreamHost) Down() bool {
return uh.CheckDown(uh) return uh.CheckDown(uh)
} }
// Full checks whether the upstream host has reached its maximum connections
func (uh *UpstreamHost) Full() bool {
return uh.MaxConns > 0 && uh.Conns >= uh.MaxConns
}
// Available checks whether the upstream host is available for proxying to
func (uh *UpstreamHost) Available() bool {
return !uh.Down() && !uh.Full()
}
// tryDuration is how long to try upstream hosts; failures result in // tryDuration is how long to try upstream hosts; failures result in
// immediate retries until this duration ends or we get a nil host. // immediate retries until this duration ends or we get a nil host.
var tryDuration = 60 * time.Second var tryDuration = 60 * time.Second
......
...@@ -80,10 +80,6 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { ...@@ -80,10 +80,6 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) {
ExtraHeaders: upstream.proxyHeaders, ExtraHeaders: upstream.proxyHeaders,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool { return func(uh *UpstreamHost) bool {
if upstream.MaxConns != 0 &&
uh.Conns >= upstream.MaxConns {
return true
}
if uh.Unhealthy { if uh.Unhealthy {
return true return true
} }
...@@ -95,6 +91,7 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) { ...@@ -95,6 +91,7 @@ func NewStaticUpstreams(c parse.Dispenser) ([]Upstream, error) {
} }
}(upstream), }(upstream),
WithoutPathPrefix: upstream.WithoutPathPrefix, WithoutPathPrefix: upstream.WithoutPathPrefix,
MaxConns: upstream.MaxConns,
} }
if baseURL, err := url.Parse(uh.Name); err == nil { if baseURL, err := url.Parse(uh.Name); err == nil {
uh.ReverseProxy = NewSingleHostReverseProxy(baseURL, uh.WithoutPathPrefix) uh.ReverseProxy = NewSingleHostReverseProxy(baseURL, uh.WithoutPathPrefix)
...@@ -234,19 +231,19 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { ...@@ -234,19 +231,19 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) {
func (u *staticUpstream) Select() *UpstreamHost { func (u *staticUpstream) Select() *UpstreamHost {
pool := u.Hosts pool := u.Hosts
if len(pool) == 1 { if len(pool) == 1 {
if pool[0].Down() { if !pool[0].Available() {
return nil return nil
} }
return pool[0] return pool[0]
} }
allDown := true allUnavailable := true
for _, host := range pool { for _, host := range pool {
if !host.Down() { if host.Available() {
allDown = false allUnavailable = false
break break
} }
} }
if allDown { if allUnavailable {
return nil return nil
} }
......
...@@ -40,6 +40,19 @@ func TestSelect(t *testing.T) { ...@@ -40,6 +40,19 @@ func TestSelect(t *testing.T) {
if h := upstream.Select(); h == nil { if h := upstream.Select(); h == nil {
t.Error("Expected select to not return nil") t.Error("Expected select to not return nil")
} }
upstream.Hosts[0].Conns = 1
upstream.Hosts[0].MaxConns = 1
upstream.Hosts[1].Conns = 1
upstream.Hosts[1].MaxConns = 1
upstream.Hosts[2].Conns = 1
upstream.Hosts[2].MaxConns = 1
if h := upstream.Select(); h != nil {
t.Error("Expected select to return nil as all hosts are full")
}
upstream.Hosts[2].Conns = 0
if h := upstream.Select(); h == nil {
t.Error("Expected select to not return nil")
}
} }
func TestRegisterPolicy(t *testing.T) { func TestRegisterPolicy(t *testing.T) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment