Commit 82b82457 authored by Kamil Trzcinski's avatar Kamil Trzcinski

Allow to queue API requests and limit given capacity

Expose three options to configure queueing limits:
- apiLimit
- apiQueueLimit
- apiQueueTimeout
parent 0b970386
......@@ -21,6 +21,18 @@ func LogError(r *http.Request, err error) {
printError(r, err)
}
func ServiceUnavailable(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
captureRavenError(r, err)
printError(r, err)
}
func TooManyRequests(w http.ResponseWriter, r *http.Request, err error) {
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
captureRavenError(r, err)
printError(r, err)
}
func printError(r *http.Request, err error) {
if r != nil {
log.Printf("error: %s %q: %v", r.Method, r.RequestURI, err)
......
package queueing
import (
"errors"
"time"
)
type errTooManyRequests struct{ error }
type errQueueingTimedout struct{ error }
var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
type Queue struct {
busyCh chan struct{}
waitingCh chan struct{}
}
// NewQueue creates a new queue
// limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued
// if the number of requests is above the limit
func NewQueue(limit, queueLimit int) *Queue {
return &Queue{
busyCh: make(chan struct{}, limit),
waitingCh: make(chan struct{}, queueLimit),
}
}
// Acquire takes one slot from the Queue
// and returns when a request should be processed
// it allows up to (limit) of requests running at a time
// it allows to queue up to (queue-limit) requests
func (s *Queue) Acquire(timeout time.Duration) (err error) {
// push item to a queue to claim your own slot (non-blocking)
select {
case s.waitingCh <- struct{}{}:
break
default:
return ErrTooManyRequests
}
defer func() {
if err != nil {
<-s.waitingCh
}
}()
// fast path: push item to current processed items (non-blocking)
select {
case s.busyCh <- struct{}{}:
return nil
default:
break
}
timer := time.NewTimer(timeout)
defer timer.Stop()
// push item to current processed items (blocking)
select {
case s.busyCh <- struct{}{}:
return nil
case <-timer.C:
return ErrQueueingTimedout
}
}
// Release marks the finish of processing of requests
// It triggers next request to be processed if it's in queue
func (s *Queue) Release() {
// dequeue from queue to allow next request to be processed
<-s.waitingCh
<-s.busyCh
}
package queueing_test
import (
"testing"
"time"
. "gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
)
func TestNormalQueueing(t *testing.T) {
q := NewQueue(2, 3)
err1 := q.Acquire(time.Microsecond)
if err1 != nil {
t.Fatal("we should acquire a new slot")
}
err2 := q.Acquire(time.Microsecond)
if err2 != nil {
t.Fatal("we should acquire a new slot")
}
err3 := q.Acquire(time.Microsecond)
if err3 != ErrQueueingTimedout {
t.Fatal("we should timeout")
}
q.Release()
err4 := q.Acquire(time.Microsecond)
if err4 != nil {
t.Fatal("we should acquire a new slot")
}
}
func TestQueueLimit(t *testing.T) {
q := NewQueue(1, 1)
err1 := q.Acquire(time.Microsecond)
if err1 != nil {
t.Fatal("we should acquire a new slot")
}
err2 := q.Acquire(time.Microsecond)
if err2 != ErrTooManyRequests {
t.Fatal("we should fail because of not enough slots in queue")
}
}
func TestQueueProcessing(t *testing.T) {
q := NewQueue(1, 2)
err1 := q.Acquire(time.Microsecond)
if err1 != nil {
t.Fatal("we should acquire a new slot")
}
go func() {
time.Sleep(50 * time.Microsecond)
q.Release()
}()
err2 := q.Acquire(time.Second)
if err2 != nil {
t.Fatal("we should acquire slot after the previous one finished")
}
}
package queueing
import (
"net/http"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
const DefaultTimeout = 30 * time.Second
func QueueRequests(h http.Handler, limit, queueLimit int, queueTimeout time.Duration) http.Handler {
if queueLimit == 0 || limit == 0 {
return h
}
if queueTimeout == 0 {
queueTimeout = DefaultTimeout
}
queue := NewQueue(limit, queueLimit)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := queue.Acquire(queueTimeout)
switch err {
case nil:
defer queue.Release()
h.ServeHTTP(w, r)
case ErrTooManyRequests:
helper.TooManyRequests(w, r, err)
case ErrQueueingTimedout:
helper.ServiceUnavailable(w, r, err)
default:
helper.Fail500(w, r, err)
}
})
}
package queueing_test
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
. "gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
)
var httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "OK")
})
func slowHttpHandler(closeCh chan struct{}) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-closeCh
fmt.Fprintln(w, "OK")
})
}
func TestQueueRequests(t *testing.T) {
w := httptest.NewRecorder()
h := QueueRequests(httpHandler, 1, 2, time.Second)
h.ServeHTTP(w, nil)
if w.Code != 200 {
t.Fatal("QueueRequests should process request")
}
}
func testSlowRequestProcessing(count, limit, queueLimit int, queueTimeout time.Duration) *httptest.ResponseRecorder {
closeCh := make(chan struct{})
defer close(closeCh)
handler := QueueRequests(slowHttpHandler(closeCh), limit, queueLimit, queueTimeout)
respCh := make(chan *httptest.ResponseRecorder, count)
// queue requests to use up the queue
for count > 0 {
go func() {
w := httptest.NewRecorder()
handler.ServeHTTP(w, nil)
respCh <- w
}()
count--
}
// dequeue first request
return <-respCh
}
func TestQueueingTimeout(t *testing.T) {
w := testSlowRequestProcessing(2, 1, 2, time.Microsecond)
if w.Code != 503 {
t.Fatal("QueueRequests should timeout queued request")
}
}
func TestQueuedRequests(t *testing.T) {
w := testSlowRequestProcessing(3, 1, 2, time.Minute)
if w.Code != 429 {
t.Fatal("QueueRequests should return immediately and return too many requests")
}
}
......@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/lfs"
proxypkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/sendfile"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/staticpages"
......@@ -55,6 +56,7 @@ func (u *Upstream) configureRoutes() {
git.SendPatch,
artifacts.SendEntry,
)
apiProxyQueue := queueing.QueueRequests(proxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
u.Routes = []route{
// Git Clone
......@@ -67,8 +69,8 @@ func (u *Upstream) configureRoutes() {
route{"POST", regexp.MustCompile(ciAPIPattern + `v1/builds/[0-9]+/artifacts\z`), contentEncodingHandler(artifacts.UploadArtifacts(api, proxy))},
// Explicitly proxy API requests
route{"", regexp.MustCompile(apiPattern), proxy},
route{"", regexp.MustCompile(ciAPIPattern), proxy},
route{"", regexp.MustCompile(apiPattern), apiProxyQueue},
route{"", regexp.MustCompile(ciAPIPattern), apiProxyQueue},
// Serve assets
route{"", regexp.MustCompile(`^/assets/`),
......
......@@ -20,30 +20,34 @@ import (
var DefaultBackend = helper.URLMustParse("http://localhost:8080")
type Upstream struct {
Backend *url.URL
Version string
SecretPath string
DocumentRoot string
DevelopmentMode bool
type Config struct {
Backend *url.URL
Version string
SecretPath string
DocumentRoot string
DevelopmentMode bool
Socket string
ProxyHeadersTimeout time.Duration
APILimit int
APIQueueLimit int
APIQueueTimeout time.Duration
}
type Upstream struct {
Config
URLPrefix urlprefix.Prefix
Routes []route
RoundTripper *badgateway.RoundTripper
}
func NewUpstream(backend *url.URL, socket, version, secretFile, documentRoot string, developmentMode bool, proxyHeadersTimeout time.Duration) *Upstream {
func NewUpstream(config Config) *Upstream {
up := Upstream{
Backend: backend,
Version: version,
SecretPath: secretFile,
DocumentRoot: documentRoot,
DevelopmentMode: developmentMode,
Config: config,
}
if backend == nil {
if up.Backend == nil {
up.Backend = DefaultBackend
}
up.RoundTripper = badgateway.NewRoundTripper(up.Backend, socket, proxyHeadersTimeout)
up.RoundTripper = badgateway.NewRoundTripper(up.Backend, up.Socket, up.ProxyHeadersTimeout)
up.configureURLPrefix()
up.configureRoutes()
return &up
......
......@@ -24,6 +24,7 @@ import (
"syscall"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream"
)
......@@ -41,6 +42,9 @@ var documentRoot = flag.String("documentRoot", "public", "Path to static files c
var proxyHeadersTimeout = flag.Duration("proxyHeadersTimeout", 5*time.Minute, "How long to wait for response headers when proxying the request")
var developmentMode = flag.Bool("developmentMode", false, "Allow to serve assets from Rails app")
var secretPath = flag.String("secretPath", "./.gitlab_workhorse_secret", "File with secret key to authenticate with authBackend")
var apiLimit = flag.Int("apiLimit", 0, "Number of API requests allowed at single time")
var apiQueueLimit = flag.Int("apiQueueLimit", 0, "Number of API requests allowed to be queued")
var apiQueueTimeout = flag.Duration("apiQueueDuration", queueing.DefaultTimeout, "Maximum queueing duration of requests")
func main() {
flag.Usage = func() {
......@@ -89,16 +93,20 @@ func main() {
}()
}
up := wrapRaven(
upstream.NewUpstream(
backendURL,
*authSocket,
Version,
*secretPath,
*documentRoot,
*developmentMode,
*proxyHeadersTimeout,
))
upConfig := upstream.Config{
Backend: backendURL,
Socket: *authSocket,
Version: Version,
SecretPath: *secretPath,
DocumentRoot: *documentRoot,
DevelopmentMode: *developmentMode,
ProxyHeadersTimeout: *proxyHeadersTimeout,
APILimit: *apiLimit,
APIQueueLimit: *apiQueueLimit,
APIQueueTimeout: *apiQueueTimeout,
}
up := wrapRaven(upstream.NewUpstream(upConfig))
log.Fatal(http.Serve(listener, up))
}
......@@ -868,15 +868,14 @@ func archiveOKServer(t *testing.T, archiveName string) *httptest.Server {
}
func startWorkhorseServer(authBackend string) *httptest.Server {
u := upstream.NewUpstream(
helper.URLMustParse(authBackend),
"",
"123",
testhelper.SecretPath(),
testDocumentRoot,
false,
0,
)
config := upstream.Config{
Backend: helper.URLMustParse(authBackend),
Version: "123",
SecretPath: testhelper.SecretPath(),
DocumentRoot: testDocumentRoot,
}
u := upstream.NewUpstream(config)
return httptest.NewServer(u)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment