Commit 8a4f8840 authored by Tomasz Maczukin's avatar Tomasz Maczukin

Add metrics for queueing mechanism

parent 7378c05b
...@@ -3,6 +3,8 @@ package queueing ...@@ -3,6 +3,8 @@ package queueing
import ( import (
"errors" "errors"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
) )
type errTooManyRequests struct{ error } type errTooManyRequests struct{ error }
...@@ -11,16 +13,57 @@ type errQueueingTimedout struct{ error } ...@@ -11,16 +13,57 @@ type errQueueingTimedout struct{ error }
var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")} var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")} var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}
var (
queueingLimit = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_limit",
Help: "Current limit set for the queueing mechanism",
})
queueingQueueLimit = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_queue_limit",
Help: "Current queueLimit set for the queueing mechanism",
})
queueingBusy = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_busy",
Help: "How many queued requests are now processed",
})
queueingWaiting = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gitlab_workhorse_queueing_waiting",
Help: "How many requests are now queued",
})
queueingErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_queueing_errors",
Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
},
[]string{"type"},
)
)
type Queue struct { type Queue struct {
busyCh chan struct{} busyCh chan struct{}
waitingCh chan struct{} waitingCh chan struct{}
} }
func init() {
prometheus.MustRegister(queueingErrors)
prometheus.MustRegister(queueingLimit)
prometheus.MustRegister(queueingBusy)
prometheus.MustRegister(queueingWaiting)
prometheus.MustRegister(queueingQueueLimit)
}
// NewQueue creates a new queue // NewQueue creates a new queue
// limit specifies number of requests run concurrently // limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued // queueLimit specifies maximum number of requests that can be queued
// if the number of requests is above the limit // if the number of requests is above the limit
func NewQueue(limit, queueLimit uint) *Queue { func NewQueue(limit, queueLimit uint) *Queue {
queueingLimit.Set(float64(limit))
queueingQueueLimit.Set(float64(queueLimit))
return &Queue{ return &Queue{
busyCh: make(chan struct{}, limit), busyCh: make(chan struct{}, limit),
waitingCh: make(chan struct{}, limit+queueLimit), waitingCh: make(chan struct{}, limit+queueLimit),
...@@ -35,20 +78,24 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) { ...@@ -35,20 +78,24 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) {
// push item to a queue to claim your own slot (non-blocking) // push item to a queue to claim your own slot (non-blocking)
select { select {
case s.waitingCh <- struct{}{}: case s.waitingCh <- struct{}{}:
queueingWaiting.Inc()
break break
default: default:
queueingErrors.WithLabelValues("too_many_requests").Inc()
return ErrTooManyRequests return ErrTooManyRequests
} }
defer func() { defer func() {
if err != nil { if err != nil {
<-s.waitingCh <-s.waitingCh
queueingWaiting.Dec()
} }
}() }()
// fast path: push item to current processed items (non-blocking) // fast path: push item to current processed items (non-blocking)
select { select {
case s.busyCh <- struct{}{}: case s.busyCh <- struct{}{}:
queueingBusy.Inc()
return nil return nil
default: default:
break break
...@@ -60,9 +107,11 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) { ...@@ -60,9 +107,11 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) {
// push item to current processed items (blocking) // push item to current processed items (blocking)
select { select {
case s.busyCh <- struct{}{}: case s.busyCh <- struct{}{}:
queueingBusy.Inc()
return nil return nil
case <-timer.C: case <-timer.C:
queueingErrors.WithLabelValues("queueing_timedout").Inc()
return ErrQueueingTimedout return ErrQueueingTimedout
} }
} }
...@@ -72,5 +121,7 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) { ...@@ -72,5 +121,7 @@ func (s *Queue) Acquire(timeout time.Duration) (err error) {
func (s *Queue) Release() { func (s *Queue) Release() {
// dequeue from queue to allow next request to be processed // dequeue from queue to allow next request to be processed
<-s.waitingCh <-s.waitingCh
queueingWaiting.Dec()
<-s.busyCh <-s.busyCh
queueingBusy.Dec()
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment