Commit 51421f25 authored by Mike Kozono's avatar Mike Kozono

Implement Geo proxy routing

By default, a Geo proxy will proxy all requests to the URL returned by
the API endpoint `/api/v4/geo/proxy`.

Certain routes, defined in `GeoLocalRoutes`, can be served locally for
improved performance. Many more local routes will be added later.
parent b5438e74
...@@ -337,6 +337,42 @@ func configureRoutes(u *upstream) { ...@@ -337,6 +337,42 @@ func configureRoutes(u *upstream) {
u.route("", "", defaultUpstream), u.route("", "", defaultUpstream),
} }
// Routes which should actually be served locally by a Geo Proxy. If none
// matches, then then proxy the request.
u.geoLocalRoutes = []routeEntry{
// Git and LFS requests
//
// Note that Geo already redirects pushes, with special terminal output.
// Note that excessive secondary lag can cause unexpected behavior since
// pulls are performed against a different source of truth. Ideally, we'd
// proxy/redirect pulls as well, when the secondary is not up-to-date.
//
u.route("GET", gitProjectPattern+`info/refs\z`, git.GetInfoRefsHandler(api)),
u.route("POST", gitProjectPattern+`git-upload-pack\z`, contentEncodingHandler(git.UploadPack(api)), withMatcher(isContentType("application/x-git-upload-pack-request"))),
u.route("POST", gitProjectPattern+`git-receive-pack\z`, contentEncodingHandler(git.ReceivePack(api)), withMatcher(isContentType("application/x-git-receive-pack-request"))),
u.route("PUT", gitProjectPattern+`gitlab-lfs/objects/([0-9a-f]{64})/([0-9]+)\z`, lfs.PutStore(api, signingProxy, preparers.lfs), withMatcher(isContentType("application/octet-stream"))),
// Serve health checks from this Geo secondary
u.route("", "^/-/(readiness|liveness)$", static.DeployPage(probeUpstream)),
u.route("", "^/-/health$", static.DeployPage(healthUpstream)),
u.route("", "^/-/metrics$", defaultUpstream),
// Authentication routes
u.route("", "^/users/(sign_in|sign_out)$", defaultUpstream),
u.route("", "^/oauth/geo/(auth|callback|logout)$", defaultUpstream),
// Admin Area > Geo routes
u.route("", "^/admin/geo$", defaultUpstream),
u.route("", "^/admin/geo/", defaultUpstream),
// Geo API routes
u.route("", "^/api/v4/geo_nodes", defaultUpstream),
u.route("", "^/api/v4/geo_replication", defaultUpstream),
// Don't define a catch-all route. If a route does not match, then we know
// the request should be proxied.
}
} }
func createUploadPreparers(cfg config.Config) uploadPreparers { func createUploadPreparers(cfg config.Config) uploadPreparers {
......
...@@ -9,8 +9,10 @@ package upstream ...@@ -9,8 +9,10 @@ package upstream
import ( import (
"fmt" "fmt"
"os" "os"
"sync"
"net/http" "net/http"
"net/url"
"strings" "strings"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
...@@ -21,6 +23,7 @@ import ( ...@@ -21,6 +23,7 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log" "gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
proxypkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/rejectmethods" "gitlab.com/gitlab-org/gitlab/workhorse/internal/rejectmethods"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upload" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
...@@ -41,8 +44,13 @@ type upstream struct { ...@@ -41,8 +44,13 @@ type upstream struct {
RoundTripper http.RoundTripper RoundTripper http.RoundTripper
CableRoundTripper http.RoundTripper CableRoundTripper http.RoundTripper
APIClient *apipkg.API APIClient *apipkg.API
geoProxyBackend *url.URL
geoLocalRoutes []routeEntry
geoProxyCableRoute routeEntry
geoProxyRoute routeEntry
accessLogger *logrus.Logger accessLogger *logrus.Logger
enableGeoProxyFeature bool enableGeoProxyFeature bool
mu sync.RWMutex
} }
func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler { func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler {
...@@ -119,25 +127,9 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -119,25 +127,9 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
// Look for a matching route cleanedPath := prefix.Strip(URIPath)
var route *routeEntry
if u.enableGeoProxyFeature { route := u.findRoute(cleanedPath, r)
geoProxyURL, err := u.APIClient.GetGeoProxyURL()
if err == nil {
log.WithRequest(r).WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Info("Geo Proxy: Set route according to Geo Proxy logic")
} else if err != apipkg.ErrNotGeoSecondary {
log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing")
}
}
for _, ro := range u.Routes {
if ro.isMatch(prefix.Strip(URIPath), r) {
route = &ro
break
}
}
if route == nil { if route == nil {
// The protocol spec in git/Documentation/technical/http-protocol.txt // The protocol spec in git/Documentation/technical/http-protocol.txt
...@@ -152,3 +144,65 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { ...@@ -152,3 +144,65 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
route.handler.ServeHTTP(w, r) route.handler.ServeHTTP(w, r)
} }
func (u *upstream) findRoute(cleanedPath string, r *http.Request) *routeEntry {
if u.enableGeoProxyFeature {
if route := u.findGeoProxyRoute(cleanedPath, r); route != nil {
return route
}
}
for _, ro := range u.Routes {
if ro.isMatch(cleanedPath, r) {
return &ro
}
}
return nil
}
func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry {
geoProxyURL, err := u.APIClient.GetGeoProxyURL()
if err == nil {
u.setGeoProxyRoutes(geoProxyURL)
return u.matchGeoProxyRoute(cleanedPath, r)
} else if err != apipkg.ErrNotGeoSecondary {
log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing")
}
return nil
}
func (u *upstream) matchGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry {
// Some routes are safe to serve from this GitLab instance
for _, ro := range u.geoLocalRoutes {
if ro.isMatch(cleanedPath, r) {
log.WithRequest(r).Debug("Geo Proxy: Handle this request locally")
return &ro
}
}
log.WithRequest(r).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Debug("Geo Proxy: Forward this request")
u.mu.RLock()
defer u.mu.RUnlock()
if cleanedPath == "/-/cable" {
return &u.geoProxyCableRoute
}
return &u.geoProxyRoute
}
func (u *upstream) setGeoProxyRoutes(geoProxyURL *url.URL) {
u.mu.Lock()
defer u.mu.Unlock()
if u.geoProxyBackend == nil || u.geoProxyBackend.String() != geoProxyURL.String() {
log.WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Debug("Geo Proxy: Update GeoProxyRoute")
u.geoProxyBackend = geoProxyURL
geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper)
u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream)
u.geoProxyRoute = u.route("", "", geoProxyUpstream)
}
}
package upstream package upstream
import ( import (
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
...@@ -11,8 +12,21 @@ import ( ...@@ -11,8 +12,21 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/testhelper"
) )
const (
geoProxyEndpoint = "/api/v4/geo/proxy"
testDocumentRoot = "testdata/public"
)
type testCase struct {
desc string
path string
expectedResponse string
}
func TestRouting(t *testing.T) { func TestRouting(t *testing.T) {
handle := func(u *upstream, regex string) routeEntry { handle := func(u *upstream, regex string) routeEntry {
handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
...@@ -34,15 +48,10 @@ func TestRouting(t *testing.T) { ...@@ -34,15 +48,10 @@ func TestRouting(t *testing.T) {
handle(u, main), handle(u, main),
} }
}) })
ts := httptest.NewServer(u) ts := httptest.NewServer(u)
defer ts.Close() defer ts.Close()
testCases := []struct { testCases := []testCase{
desc string
path string
route string
}{
{"main route works", "/", main}, {"main route works", "/", main},
{"foobar route works", "/foobar", foobar}, {"foobar route works", "/foobar", foobar},
{"quxbaz route works", "/quxbaz", quxbaz}, {"quxbaz route works", "/quxbaz", quxbaz},
...@@ -51,9 +60,109 @@ func TestRouting(t *testing.T) { ...@@ -51,9 +60,109 @@ func TestRouting(t *testing.T) {
{"double escaped path traversal does not match any route", "/foobar%252f%252e%252e%252fquxbaz", main}, {"double escaped path traversal does not match any route", "/foobar%252f%252e%252e%252fquxbaz", main},
} }
runTestCases(t, ts, testCases)
}
// This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed
func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) {
// We could just not set up the primary, but then we'd have to assert
// that the internal API call isn't made. This is easier.
remoteServer, rsDeferredClose := startRemoteServer("Geo primary")
defer rsDeferredClose()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false)
defer wsDeferredClose()
testCases := []testCase{
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
}
runTestCases(t, ws, testCases)
}
func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) {
remoteServer, rsDeferredClose := startRemoteServer("Geo primary")
defer rsDeferredClose()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCases := []testCase{
{"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"},
}
runTestCases(t, ws, testCases)
}
// This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed
func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) {
geoProxyEndpointResponseBody := "{}"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false)
defer wsDeferredClose()
testCases := []testCase{
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
}
runTestCases(t, ws, testCases)
}
func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) {
geoProxyEndpointResponseBody := "{}"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCases := []testCase{
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
}
runTestCases(t, ws, testCases)
}
func TestGeoProxyWithAPIError(t *testing.T) {
geoProxyEndpointResponseBody := "Invalid response"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCases := []testCase{
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
}
runTestCases(t, ws, testCases)
}
func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) {
t.Helper()
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
resp, err := http.Get(ts.URL + tc.path) resp, err := http.Get(ws.URL + tc.path)
require.NoError(t, err) require.NoError(t, err)
defer resp.Body.Close() defer resp.Body.Close()
...@@ -61,7 +170,61 @@ func TestRouting(t *testing.T) { ...@@ -61,7 +170,61 @@ func TestRouting(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode, "response code") require.Equal(t, 200, resp.StatusCode, "response code")
require.Equal(t, tc.route, string(body)) require.Equal(t, tc.expectedResponse, string(body))
}) })
} }
} }
func newUpstreamConfig(authBackend string) *config.Config {
return &config.Config{
Version: "123",
DocumentRoot: testDocumentRoot,
Backend: helper.URLMustParse(authBackend),
ImageResizerConfig: config.DefaultImageResizerConfig,
}
}
func startRemoteServer(serverName string) (*httptest.Server, func()) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body := serverName + " received request to path " + r.URL.Path
w.WriteHeader(200)
fmt.Fprint(w, body)
}))
return ts, ts.Close
}
func startRailsServer(railsServerName string, geoProxyEndpointResponseBody string) (*httptest.Server, func()) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var body string
if r.URL.Path == geoProxyEndpoint {
w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json")
body = geoProxyEndpointResponseBody
} else {
body = railsServerName + " received request to path " + r.URL.Path
}
w.WriteHeader(200)
fmt.Fprint(w, body)
}))
return ts, ts.Close
}
func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) {
myConfigureRoutes := func(u *upstream) {
// Enable environment variable "feature flag"
u.enableGeoProxyFeature = enableGeoProxyFeature
// call original
configureRoutes(u)
}
cfg := newUpstreamConfig(railsServerURL)
upstreamHandler := newUpstream(*cfg, logrus.StandardLogger(), myConfigureRoutes)
ws := httptest.NewServer(upstreamHandler)
testhelper.ConfigureSecret()
return ws, ws.Close
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment