Commit 07b40d17 authored by Tomasz Maczukin

Allow NewQueueMetrics() to be called multiple times

parent 6f9fa8c7
@@ -23,7 +23,7 @@ type queueMetrics struct {
 	queueingErrors *prometheus.CounterVec
 }
-func NewQueueMetrics(timeout time.Duration) *queueMetrics {
+func NewQueueMetrics(name string, timeout time.Duration) *queueMetrics {
 	waitingTimeBuckets := []float64{
 		timeout.Seconds() * 0.01,
 		timeout.Seconds() * 0.05,
@@ -43,31 +43,49 @@ func NewQueueMetrics(timeout time.Duration) *queueMetrics {
 		queueingLimit: prometheus.NewGauge(prometheus.GaugeOpts{
 			Name: "gitlab_workhorse_queueing_limit",
 			Help: "Current limit set for the queueing mechanism",
+			ConstLabels: prometheus.Labels{
+				"queue_name": name,
+			},
 		}),
 		queueingQueueLimit: prometheus.NewGauge(prometheus.GaugeOpts{
 			Name: "gitlab_workhorse_queueing_queue_limit",
 			Help: "Current queueLimit set for the queueing mechanism",
+			ConstLabels: prometheus.Labels{
+				"queue_name": name,
+			},
 		}),
 		queueingQueueTimeout: prometheus.NewGauge(prometheus.GaugeOpts{
 			Name: "gitlab_workhorse_queueing_queue_timeout",
 			Help: "Current queueTimeout set for the queueing mechanism",
+			ConstLabels: prometheus.Labels{
+				"queue_name": name,
+			},
 		}),
 		queueingBusy: prometheus.NewGauge(prometheus.GaugeOpts{
 			Name: "gitlab_workhorse_queueing_busy",
 			Help: "How many queued requests are now processed",
+			ConstLabels: prometheus.Labels{
+				"queue_name": name,
+			},
 		}),
 		queueingWaiting: prometheus.NewGauge(prometheus.GaugeOpts{
 			Name: "gitlab_workhorse_queueing_waiting",
 			Help: "How many requests are now queued",
+			ConstLabels: prometheus.Labels{
+				"queue_name": name,
+			},
 		}),
 		queueingWaitingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
 			Name: "gitlab_workhorse_queueing_waiting_time",
 			Help: "How many time a request spent in queue",
+			ConstLabels: prometheus.Labels{
+				"queue_name": name,
+			},
 			Buckets: waitingTimeBuckets,
 		}),
@@ -75,6 +93,9 @@ func NewQueueMetrics(timeout time.Duration) *queueMetrics {
 			prometheus.CounterOpts{
 				Name: "gitlab_workhorse_queueing_errors",
 				Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
+				ConstLabels: prometheus.Labels{
+					"queue_name": name,
+				},
 			},
 			[]string{"type"},
 		),
@@ -94,6 +115,7 @@ func NewQueueMetrics(timeout time.Duration) *queueMetrics {
 type Queue struct {
 	*queueMetrics
+	name      string
 	busyCh    chan struct{}
 	waitingCh chan time.Time
 	timeout   time.Duration
@@ -104,14 +126,15 @@ type Queue struct {
 // queueLimit specifies maximum number of requests that can be queued
 // timeout specifies the time limit of storing the request in the queue
 // if the number of requests is above the limit
-func NewQueue(limit, queueLimit uint, timeout time.Duration) *Queue {
+func NewQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
 	queue := &Queue{
+		name:      name,
 		busyCh:    make(chan struct{}, limit),
 		waitingCh: make(chan time.Time, limit+queueLimit),
 		timeout:   timeout,
 	}
-	queue.queueMetrics = NewQueueMetrics(timeout)
+	queue.queueMetrics = NewQueueMetrics(name, timeout)
 	queue.queueingLimit.Set(float64(limit))
 	queue.queueingQueueLimit.Set(float64(queueLimit))
 	queue.queueingQueueTimeout.Set(timeout.Seconds())
...
@@ -6,7 +6,7 @@ import (
 )
 func TestNormalQueueing(t *testing.T) {
-	q := NewQueue(2, 1, time.Microsecond)
+	q := NewQueue("queue 1", 2, 1, time.Microsecond)
 	err1 := q.Acquire()
 	if err1 != nil {
 		t.Fatal("we should acquire a new slot")
@@ -31,7 +31,7 @@ func TestNormalQueueing(t *testing.T) {
 }
 func TestQueueLimit(t *testing.T) {
-	q := NewQueue(1, 0, time.Microsecond)
+	q := NewQueue("queue 2", 1, 0, time.Microsecond)
 	err1 := q.Acquire()
 	if err1 != nil {
 		t.Fatal("we should acquire a new slot")
@@ -44,7 +44,7 @@ func TestQueueLimit(t *testing.T) {
 }
 func TestQueueProcessing(t *testing.T) {
-	q := NewQueue(1, 1, time.Second)
+	q := NewQueue("queue 3", 1, 1, time.Second)
 	err1 := q.Acquire()
 	if err1 != nil {
 		t.Fatal("we should acquire a new slot")
...
@@ -9,7 +9,7 @@ import (
 const DefaultTimeout = 30 * time.Second
-func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
+func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
 	if limit == 0 {
 		return h
 	}
@@ -17,7 +17,7 @@ func QueueRequests(h http.Handler, limit, queueLimit uint, queueTimeout time.Dur
 		queueTimeout = DefaultTimeout
 	}
-	queue := NewQueue(limit, queueLimit, queueTimeout)
+	queue := NewQueue(name, limit, queueLimit, queueTimeout)
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		err := queue.Acquire()
...
@@ -21,7 +21,7 @@ func pausedHttpHandler(pauseCh chan struct{}) http.Handler {
 func TestNormalRequestProcessing(t *testing.T) {
 	w := httptest.NewRecorder()
-	h := QueueRequests(httpHandler, 1, 1, time.Second)
+	h := QueueRequests("Normal request processing", httpHandler, 1, 1, time.Second)
 	h.ServeHTTP(w, nil)
 	if w.Code != 200 {
 		t.Fatal("QueueRequests should process request")
@@ -32,11 +32,11 @@ func TestNormalRequestProcessing(t *testing.T) {
 // then it runs a number of requests that are going through queue,
 // we return the response of first finished request,
 // where status of request can be 200, 429 or 503
-func testSlowRequestProcessing(count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder {
+func testSlowRequestProcessing(name string, count int, limit, queueLimit uint, queueTimeout time.Duration) *httptest.ResponseRecorder {
 	pauseCh := make(chan struct{})
 	defer close(pauseCh)
-	handler := QueueRequests(pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout)
+	handler := QueueRequests("Slow request processing: "+name, pausedHttpHandler(pauseCh), limit, queueLimit, queueTimeout)
 	respCh := make(chan *httptest.ResponseRecorder, count)
@@ -57,7 +57,7 @@ func testSlowRequestProcessing(count int, limit, queueLimit uint, queueTimeout t
 // the queue limit and length is 1,
 // the second request gets timed-out
 func TestQueueingTimeout(t *testing.T) {
-	w := testSlowRequestProcessing(2, 1, 1, time.Microsecond)
+	w := testSlowRequestProcessing("timeout", 2, 1, 1, time.Microsecond)
 	if w.Code != 503 {
 		t.Fatal("QueueRequests should timeout queued request")
@@ -68,7 +68,7 @@ func TestQueueingTimeout(t *testing.T) {
 // the queue limit and length is 1,
 // so the third request has to be rejected with 429
 func TestQueueingTooManyRequests(t *testing.T) {
-	w := testSlowRequestProcessing(3, 1, 1, time.Minute)
+	w := testSlowRequestProcessing("too many requests", 3, 1, 1, time.Minute)
 	if w.Code != 429 {
 		t.Fatal("QueueRequests should return immediately and return too many requests")
...
@@ -119,7 +119,7 @@ func (u *Upstream) configureRoutes() {
 	)
 	uploadAccelerateProxy := upload.Accelerate(path.Join(u.DocumentRoot, "uploads/tmp"), proxy)
-	ciAPIProxyQueue := queueing.QueueRequests(uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
+	ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
 	ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
 	u.Routes = []routeEntry{
...
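
The diff threads a queue name into every metric as a Prometheus const label. Below is a minimal, self-contained sketch — not part of this commit — of why that is enough for NewQueueMetrics() to be callable more than once: collectors that share a metric name but differ in a const label value are treated as distinct series by the Prometheus client registry, while repeating the same label value is still rejected as a duplicate. The newBusyGauge helper and the queue names here are illustrative only.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// newBusyGauge is an illustrative helper (not taken from the commit): it builds
// a gauge the same way the patched NewQueueMetrics does, with the queue name
// attached as the "queue_name" const label.
func newBusyGauge(queueName string) prometheus.Gauge {
	return prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "gitlab_workhorse_queueing_busy",
		Help: "How many queued requests are now processed",
		ConstLabels: prometheus.Labels{
			"queue_name": queueName,
		},
	})
}

func main() {
	// Same metric name, different queue_name values: both register cleanly,
	// which is what allows a second queue to create its own metrics.
	fmt.Println(prometheus.Register(newBusyGauge("queue 1"))) // <nil>
	fmt.Println(prometheus.Register(newBusyGauge("queue 2"))) // <nil>

	// Re-using an already registered queue_name value still fails.
	fmt.Println(prometheus.Register(newBusyGauge("queue 1"))) // AlreadyRegisteredError
}

On the scrape side, the shared metric name then yields one time series per queue, e.g. gitlab_workhorse_queueing_busy{queue_name="queue 1"} and gitlab_workhorse_queueing_busy{queue_name="queue 2"}.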