Commit 402e1c9c authored by Tomasz Maczukin's avatar Tomasz Maczukin

Unexport newQueue and newQueueMetrics; improve function documentation

parent 07b40d17
...@@ -23,7 +23,13 @@ type queueMetrics struct { ...@@ -23,7 +23,13 @@ type queueMetrics struct {
queueingErrors *prometheus.CounterVec queueingErrors *prometheus.CounterVec
} }
func NewQueueMetrics(name string, timeout time.Duration) *queueMetrics { // newQueueMetrics prepares Prometheus metrics for queueing mechanism
// name specifies name of the queue, used to label metrics with ConstLabel `queue_name`
// Don't call newQueueMetrics twice with the same name argument!
// timeout specifies the timeout of storing a request in queue - queueMetrics
// uses it to calculate histogram buckets for gitlab_workhorse_queueing_waiting_time
// metric
func newQueueMetrics(name string, timeout time.Duration) *queueMetrics {
waitingTimeBuckets := []float64{ waitingTimeBuckets := []float64{
timeout.Seconds() * 0.01, timeout.Seconds() * 0.01,
timeout.Seconds() * 0.05, timeout.Seconds() * 0.05,
...@@ -121,12 +127,14 @@ type Queue struct { ...@@ -121,12 +127,14 @@ type Queue struct {
timeout time.Duration timeout time.Duration
} }
// NewQueue creates a new queue // newQueue creates a new queue
// name specifies name used to label queue metrics.
// Don't call newQueue twice with the same name argument!
// limit specifies number of requests run concurrently // limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued // queueLimit specifies maximum number of requests that can be queued
// timeout specifies the time limit of storing the request in the queue // timeout specifies the time limit of storing the request in the queue
// if the number of requests is above the limit // if the number of requests is above the limit
func NewQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue { func newQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue {
queue := &Queue{ queue := &Queue{
name: name, name: name,
busyCh: make(chan struct{}, limit), busyCh: make(chan struct{}, limit),
...@@ -134,7 +142,7 @@ func NewQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue ...@@ -134,7 +142,7 @@ func NewQueue(name string, limit, queueLimit uint, timeout time.Duration) *Queue
timeout: timeout, timeout: timeout,
} }
queue.queueMetrics = NewQueueMetrics(name, timeout) queue.queueMetrics = newQueueMetrics(name, timeout)
queue.queueingLimit.Set(float64(limit)) queue.queueingLimit.Set(float64(limit))
queue.queueingQueueLimit.Set(float64(queueLimit)) queue.queueingQueueLimit.Set(float64(queueLimit))
queue.queueingQueueTimeout.Set(timeout.Seconds()) queue.queueingQueueTimeout.Set(timeout.Seconds())
......
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
) )
func TestNormalQueueing(t *testing.T) { func TestNormalQueueing(t *testing.T) {
q := NewQueue("queue 1", 2, 1, time.Microsecond) q := newQueue("queue 1", 2, 1, time.Microsecond)
err1 := q.Acquire() err1 := q.Acquire()
if err1 != nil { if err1 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
...@@ -31,7 +31,7 @@ func TestNormalQueueing(t *testing.T) { ...@@ -31,7 +31,7 @@ func TestNormalQueueing(t *testing.T) {
} }
func TestQueueLimit(t *testing.T) { func TestQueueLimit(t *testing.T) {
q := NewQueue("queue 2", 1, 0, time.Microsecond) q := newQueue("queue 2", 1, 0, time.Microsecond)
err1 := q.Acquire() err1 := q.Acquire()
if err1 != nil { if err1 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
...@@ -44,7 +44,7 @@ func TestQueueLimit(t *testing.T) { ...@@ -44,7 +44,7 @@ func TestQueueLimit(t *testing.T) {
} }
func TestQueueProcessing(t *testing.T) { func TestQueueProcessing(t *testing.T) {
q := NewQueue("queue 3", 1, 1, time.Second) q := newQueue("queue 3", 1, 1, time.Second)
err1 := q.Acquire() err1 := q.Acquire()
if err1 != nil { if err1 != nil {
t.Fatal("we should acquire a new slot") t.Fatal("we should acquire a new slot")
......
...@@ -9,6 +9,13 @@ import ( ...@@ -9,6 +9,13 @@ import (
const DefaultTimeout = 30 * time.Second const DefaultTimeout = 30 * time.Second
// QueueRequests creates a new request queue
// name specifies the name of queue, used to label Prometheus metrics
// Don't call QueueRequests twice with the same name argument!
// h specifies a http.Handler which will handle the queue requests
// limit specifies number of requests run concurrently
// queueLimit specifies maximum number of requests that can be queued
// queueTimeout specifies the time limit of storing the request in the queue
func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler { func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTimeout time.Duration) http.Handler {
if limit == 0 { if limit == 0 {
return h return h
...@@ -17,7 +24,7 @@ func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTim ...@@ -17,7 +24,7 @@ func QueueRequests(name string, h http.Handler, limit, queueLimit uint, queueTim
queueTimeout = DefaultTimeout queueTimeout = DefaultTimeout
} }
queue := NewQueue(name, limit, queueLimit, queueTimeout) queue := newQueue(name, limit, queueLimit, queueTimeout)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := queue.Acquire() err := queue.Acquire()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment