Commit 7504862e authored by comp500's avatar comp500 Committed by Łukasz Nowak

Add upstream header replacements (TODO: tests, docs)

parent 80dfb8b2
......@@ -92,8 +92,10 @@ type UpstreamHost struct {
// This is an int32 so that we can use atomic operations to do concurrent
// reads & writes to this value. The default value of 0 indicates that it
// is healthy and any non-zero value indicates unhealthy.
Unhealthy int32
HealthCheckResult atomic.Value
Unhealthy int32
HealthCheckResult atomic.Value
UpstreamHeaderReplacements headerReplacements
DownstreamHeaderReplacements headerReplacements
}
// Down checks whether the upstream host is down or not.
......@@ -220,7 +222,7 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// set headers for request going upstream
if host.UpstreamHeaders != nil {
// modify headers for request that will be sent to the upstream host
mutateHeadersByRules(outreq.Header, host.UpstreamHeaders, replacer)
mutateHeadersByRules(outreq.Header, host.UpstreamHeaders, replacer, host.UpstreamHeaderReplacements)
if hostHeaders, ok := outreq.Header["Host"]; ok && len(hostHeaders) > 0 {
outreq.Host = hostHeaders[len(hostHeaders)-1]
}
......@@ -230,7 +232,7 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// headers coming back downstream
var downHeaderUpdateFn respUpdateFn
if host.DownstreamHeaders != nil {
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer)
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer, host.UpstreamHeaderReplacements)
}
// Before we retry the request we have to make sure
......@@ -376,13 +378,13 @@ func createUpstreamRequest(rw http.ResponseWriter, r *http.Request) (*http.Reque
return outreq, cancel
}
func createRespHeaderUpdateFn(rules http.Header, replacer httpserver.Replacer) respUpdateFn {
func createRespHeaderUpdateFn(rules http.Header, replacer httpserver.Replacer, replacements headerReplacements) respUpdateFn {
return func(resp *http.Response) {
mutateHeadersByRules(resp.Header, rules, replacer)
mutateHeadersByRules(resp.Header, rules, replacer, replacements)
}
}
func mutateHeadersByRules(headers, rules http.Header, repl httpserver.Replacer) {
func mutateHeadersByRules(headers, rules http.Header, repl httpserver.Replacer, replacements headerReplacements) {
for ruleField, ruleValues := range rules {
if strings.HasPrefix(ruleField, "+") {
for _, ruleValue := range ruleValues {
......@@ -400,6 +402,17 @@ func mutateHeadersByRules(headers, rules http.Header, repl httpserver.Replacer)
}
}
}
for ruleField, ruleValues := range replacements {
for _, ruleValue := range ruleValues {
replacement := repl.Replace(ruleValue.to)
original := headers.Get(ruleField)
if len(replacement) > 0 && len(original) > 0 {
replaced := ruleValue.regexp.ReplaceAllString(original, replacement)
headers.Set(ruleField, replaced)
}
}
}
}
const CustomStatusContextCancelled = 499
......@@ -23,8 +23,10 @@ import (
"io/ioutil"
"net"
"net/http"
"net/textproto"
"net/url"
"path"
"regexp"
"strconv"
"strings"
"sync"
......@@ -71,12 +73,30 @@ type staticUpstream struct {
MaxFails int32
resolver srvResolver
CaCertPool *x509.CertPool
upstreamHeaderReplacements headerReplacements
downstreamHeaderReplacements headerReplacements
}
type srvResolver interface {
LookupSRV(context.Context, string, string, string) (string, []*net.SRV, error)
}
type headerReplacement struct {
regexp *regexp.Regexp
to string
}
type headerReplacements map[string][]headerReplacement
func (h headerReplacements) Add(key string, value headerReplacement) {
key = textproto.CanonicalMIMEHeaderKey(key)
h[key] = append(h[key], value)
}
func (h headerReplacements) Del(key string) {
delete(h, textproto.CanonicalMIMEHeaderKey(key))
}
// NewStaticUpstreams parses the configuration input and sets up
// static upstreams for the proxy middleware. The host string parameter,
// if not empty, is used for setting the upstream Host header for the
......@@ -86,18 +106,20 @@ func NewStaticUpstreams(c caddyfile.Dispenser, host string) ([]Upstream, error)
for c.Next() {
upstream := &staticUpstream{
from: "",
stop: make(chan struct{}),
upstreamHeaders: make(http.Header),
downstreamHeaders: make(http.Header),
Hosts: nil,
Policy: &Random{},
MaxFails: 1,
TryInterval: 250 * time.Millisecond,
MaxConns: 0,
KeepAlive: http.DefaultMaxIdleConnsPerHost,
Timeout: 30 * time.Second,
resolver: net.DefaultResolver,
from: "",
stop: make(chan struct{}),
upstreamHeaders: make(http.Header),
downstreamHeaders: make(http.Header),
Hosts: nil,
Policy: &Random{},
MaxFails: 1,
TryInterval: 250 * time.Millisecond,
MaxConns: 0,
KeepAlive: http.DefaultMaxIdleConnsPerHost,
Timeout: 30 * time.Second,
resolver: net.DefaultResolver,
upstreamHeaderReplacements: make(headerReplacements),
downstreamHeaderReplacements: make(headerReplacements),
}
if !c.Args(&upstream.from) {
......@@ -220,9 +242,11 @@ func (u *staticUpstream) NewHost(host string) (*UpstreamHost, error) {
return false
}
}(u),
WithoutPathPrefix: u.WithoutPathPrefix,
MaxConns: u.MaxConns,
HealthCheckResult: atomic.Value{},
WithoutPathPrefix: u.WithoutPathPrefix,
MaxConns: u.MaxConns,
HealthCheckResult: atomic.Value{},
UpstreamHeaderReplacements: u.upstreamHeaderReplacements,
DownstreamHeaderReplacements: u.downstreamHeaderReplacements,
}
baseURL, err := url.Parse(uh.Name)
......@@ -431,23 +455,47 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream, hasSrv bool) error {
}
u.HealthCheck.ContentString = c.Val()
case "header_upstream":
var header, value string
if !c.Args(&header, &value) {
// When removing a header, the value can be optional.
if !strings.HasPrefix(header, "-") {
var header, value, replaced string
if c.Args(&header, &value, &replaced) {
// Don't allow - or + in replacements
if strings.HasPrefix(header, "-") || strings.HasPrefix(header, "+") {
return c.ArgErr()
}
r, err := regexp.Compile(value)
if err != nil {
return c.ArgErr()
}
u.upstreamHeaderReplacements.Add(header, headerReplacement{r, replaced})
} else {
if !c.Args(&header, &value) {
// When removing a header, the value can be optional.
if !strings.HasPrefix(header, "-") {
return c.ArgErr()
}
}
u.upstreamHeaders.Add(header, value)
}
u.upstreamHeaders.Add(header, value)
case "header_downstream":
var header, value string
if !c.Args(&header, &value) {
// When removing a header, the value can be optional.
if !strings.HasPrefix(header, "-") {
var header, value, replaced string
if c.Args(&header, &value, &replaced) {
// Don't allow - or + in replacements
if strings.HasPrefix(header, "-") || strings.HasPrefix(header, "+") {
return c.ArgErr()
}
r, err := regexp.Compile(value)
if err != nil {
return c.ArgErr()
}
u.upstreamHeaderReplacements.Add(header, headerReplacement{r, replaced})
} else {
if !c.Args(&header, &value) {
// When removing a header, the value can be optional.
if !strings.HasPrefix(header, "-") {
return c.ArgErr()
}
}
u.downstreamHeaders.Add(header, value)
}
u.downstreamHeaders.Add(header, value)
case "transparent":
// Note: X-Forwarded-For header is always being appended for proxy connections
// See implementation of createUpstreamRequest in proxy.go
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment