Commit 9026a353 authored by Jacob Vosmaer's avatar Jacob Vosmaer

Merge branch 'cat-workhorse-geo-unset-url-panic' into 'master'

Prevent Workhorse panics when Geo proxy URL is unset

See merge request gitlab-org/gitlab!71285
parents 9d1e7ef8 b3d88bda
...@@ -50,7 +50,7 @@ type upstream struct { ...@@ -50,7 +50,7 @@ type upstream struct {
geoLocalRoutes []routeEntry geoLocalRoutes []routeEntry
geoProxyCableRoute routeEntry geoProxyCableRoute routeEntry
geoProxyRoute routeEntry geoProxyRoute routeEntry
geoProxyTestChannel chan struct{} geoProxyPollSleep func(time.Duration)
accessLogger *logrus.Logger accessLogger *logrus.Logger
enableGeoProxyFeature bool enableGeoProxyFeature bool
mu sync.RWMutex mu sync.RWMutex
...@@ -68,6 +68,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback ...@@ -68,6 +68,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1", enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1",
geoProxyBackend: &url.URL{}, geoProxyBackend: &url.URL{},
} }
if up.geoProxyPollSleep == nil {
up.geoProxyPollSleep = time.Sleep
}
if up.Backend == nil { if up.Backend == nil {
up.Backend = DefaultBackend up.Backend = DefaultBackend
} }
...@@ -205,13 +208,7 @@ func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *route ...@@ -205,13 +208,7 @@ func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *route
func (u *upstream) pollGeoProxyAPI() { func (u *upstream) pollGeoProxyAPI() {
for { for {
u.callGeoProxyAPI() u.callGeoProxyAPI()
u.geoProxyPollSleep(geoProxyApiPollingInterval)
// Notify tests when callGeoProxyAPI() finishes
if u.geoProxyTestChannel != nil {
u.geoProxyTestChannel <- struct{}{}
}
time.Sleep(geoProxyApiPollingInterval)
} }
} }
...@@ -234,6 +231,11 @@ func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) { ...@@ -234,6 +231,11 @@ func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) {
defer u.mu.Unlock() defer u.mu.Unlock()
u.geoProxyBackend = geoProxyURL u.geoProxyBackend = geoProxyURL
if u.geoProxyBackend.String() == "" {
return
}
geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode) geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper) geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper)
u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream) u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream)
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -71,10 +72,10 @@ func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) { ...@@ -71,10 +72,10 @@ func TestGeoProxyFeatureDisabledOnGeoSecondarySite(t *testing.T) {
defer rsDeferredClose() defer rsDeferredClose()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose() defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false)
defer wsDeferredClose() defer wsDeferredClose()
testCases := []testCase{ testCases := []testCase{
...@@ -91,10 +92,10 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { ...@@ -91,10 +92,10 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) {
defer rsDeferredClose() defer rsDeferredClose()
geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL) geoProxyEndpointResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose() defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose() defer wsDeferredClose()
testCases := []testCase{ testCases := []testCase{
...@@ -109,10 +110,10 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) { ...@@ -109,10 +110,10 @@ func TestGeoProxyFeatureEnabledOnGeoSecondarySite(t *testing.T) {
// This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed // This test can be removed when the environment variable `GEO_SECONDARY_PROXY` is removed
func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) {
geoProxyEndpointResponseBody := "{}" geoProxyEndpointResponseBody := "{}"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose() defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, false) ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, false)
defer wsDeferredClose() defer wsDeferredClose()
testCases := []testCase{ testCases := []testCase{
...@@ -126,10 +127,10 @@ func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) { ...@@ -126,10 +127,10 @@ func TestGeoProxyFeatureDisabledOnNonGeoSecondarySite(t *testing.T) {
func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) {
geoProxyEndpointResponseBody := "{}" geoProxyEndpointResponseBody := "{}"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose() defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose() defer wsDeferredClose()
testCases := []testCase{ testCases := []testCase{
...@@ -143,10 +144,10 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) { ...@@ -143,10 +144,10 @@ func TestGeoProxyFeatureEnabledOnNonGeoSecondarySite(t *testing.T) {
func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) { func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) {
geoProxyEndpointResponseBody := "Invalid response" geoProxyEndpointResponseBody := "Invalid response"
railsServer, deferredClose := startRailsServer("Local Rails server", geoProxyEndpointResponseBody) railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose() defer deferredClose()
ws, wsDeferredClose := startWorkhorseServer(railsServer.URL, true) ws, wsDeferredClose, _ := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose() defer wsDeferredClose()
testCases := []testCase{ testCases := []testCase{
...@@ -158,6 +159,49 @@ func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) { ...@@ -158,6 +159,49 @@ func TestGeoProxyFeatureEnabledButWithAPIError(t *testing.T) {
runTestCases(t, ws, testCases) runTestCases(t, ws, testCases)
} }
func TestGeoProxyFeatureEnablingAndDisabling(t *testing.T) {
remoteServer, rsDeferredClose := startRemoteServer("Geo primary")
defer rsDeferredClose()
geoProxyEndpointEnabledResponseBody := fmt.Sprintf(`{"geo_proxy_url":"%v"}`, remoteServer.URL)
geoProxyEndpointDisabledResponseBody := "{}"
geoProxyEndpointResponseBody := geoProxyEndpointEnabledResponseBody
railsServer, deferredClose := startRailsServer("Local Rails server", &geoProxyEndpointResponseBody)
defer deferredClose()
ws, wsDeferredClose, waitForNextApiPoll := startWorkhorseServer(railsServer.URL, true)
defer wsDeferredClose()
testCasesLocal := []testCase{
{"jobs request is served locally", "/api/v4/jobs/request", "Local Rails server received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is served locally", "/anything", "Local Rails server received request to path /anything"},
}
testCasesProxied := []testCase{
{"jobs request is forwarded", "/api/v4/jobs/request", "Geo primary received request to path /api/v4/jobs/request"},
{"health check is served locally", "/-/health", "Local Rails server received request to path /-/health"},
{"unknown route is forwarded", "/anything", "Geo primary received request to path /anything"},
}
// Enabled initially, run tests
runTestCases(t, ws, testCasesProxied)
// Disable proxying and run tests. It's safe to write to
// geoProxyEndpointResponseBody because the polling goroutine is blocked.
geoProxyEndpointResponseBody = geoProxyEndpointDisabledResponseBody
waitForNextApiPoll()
runTestCases(t, ws, testCasesLocal)
// Re-enable proxying and run tests
geoProxyEndpointResponseBody = geoProxyEndpointEnabledResponseBody
waitForNextApiPoll()
runTestCases(t, ws, testCasesProxied)
}
func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) { func runTestCases(t *testing.T, ws *httptest.Server, testCases []testCase) {
t.Helper() t.Helper()
for _, tc := range testCases { for _, tc := range testCases {
...@@ -195,13 +239,13 @@ func startRemoteServer(serverName string) (*httptest.Server, func()) { ...@@ -195,13 +239,13 @@ func startRemoteServer(serverName string) (*httptest.Server, func()) {
return ts, ts.Close return ts, ts.Close
} }
func startRailsServer(railsServerName string, geoProxyEndpointResponseBody string) (*httptest.Server, func()) { func startRailsServer(railsServerName string, geoProxyEndpointResponseBody *string) (*httptest.Server, func()) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var body string var body string
if r.URL.Path == geoProxyEndpoint { if r.URL.Path == geoProxyEndpoint {
w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json") w.Header().Set("Content-Type", "application/vnd.gitlab-workhorse+json")
body = geoProxyEndpointResponseBody body = *geoProxyEndpointResponseBody
} else { } else {
body = railsServerName + " received request to path " + r.URL.Path body = railsServerName + " received request to path " + r.URL.Path
} }
...@@ -213,15 +257,19 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin ...@@ -213,15 +257,19 @@ func startRailsServer(railsServerName string, geoProxyEndpointResponseBody strin
return ts, ts.Close return ts, ts.Close
} }
func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func()) { func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*httptest.Server, func(), func()) {
geoProxyTestChannel := make(chan struct{}) geoProxySleepC := make(chan struct{})
geoProxySleep := func(time.Duration) {
geoProxySleepC <- struct{}{}
<-geoProxySleepC
}
myConfigureRoutes := func(u *upstream) { myConfigureRoutes := func(u *upstream) {
// Enable environment variable "feature flag" // Enable environment variable "feature flag"
u.enableGeoProxyFeature = enableGeoProxyFeature u.enableGeoProxyFeature = enableGeoProxyFeature
// An empty message will be sent to this channel after every callGeoProxyAPI() // Replace the time.Sleep function with geoProxySleep
u.geoProxyTestChannel = geoProxyTestChannel u.geoProxyPollSleep = geoProxySleep
// call original // call original
configureRoutes(u) configureRoutes(u)
...@@ -231,13 +279,20 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h ...@@ -231,13 +279,20 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h
ws := httptest.NewServer(upstreamHandler) ws := httptest.NewServer(upstreamHandler)
testhelper.ConfigureSecret() testhelper.ConfigureSecret()
waitForNextApiPoll := func() {}
if enableGeoProxyFeature { if enableGeoProxyFeature {
// Wait for an empty message from callGeoProxyAPI(). This should be done on // Wait for geoProxySleep to be entered for the first time
// all tests where enableGeoProxyFeature is true, including the ones where <-geoProxySleepC
// we expect geoProxyURL to be nil or error, to ensure the tests do not pass
// by coincidence. waitForNextApiPoll = func() {
<-geoProxyTestChannel // Cause geoProxySleep to return
geoProxySleepC <- struct{}{}
// Wait for geoProxySleep to be entered again
<-geoProxySleepC
}
} }
return ws, ws.Close return ws, ws.Close, waitForNextApiPoll
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment