Commit 931ff815 authored by Yorick Peterse's avatar Yorick Peterse

Re-organize queues to use for Sidekiq

Dumping too many jobs in the same queue (e.g. the "default" queue) is a
dangerous setup. Jobs that take a long time to process can effectively
block any other work from being performed given there are enough of
these jobs.

Furthermore it becomes harder to monitor the jobs as a single queue
could contain jobs for different workers. In such a setup the only
reliable way of getting counts per job is to iterate over all jobs in a
queue, which is a rather time consuming process.

By using separate queues for various workers we have better control over
throughput, we can add weight to queues, and we can monitor queues
better. Some workers still use the same queue whenever their work is
related. For example, the various CI pipeline workers use the same
"pipeline" queue.

This commit includes a Rails migration that moves Sidekiq jobs from the
old queues to the new ones. This migration also takes care of doing the
inverse if ever needed. This does require downtime as otherwise new jobs
could be scheduled in the old queues after this migration completes.

This commit also includes an RSpec test that blacklists the use of the
"default" queue and ensures cron workers use the "cronjob" queue.

Fixes gitlab-org/gitlab-ce#23370
parent c41ed4d1
......@@ -2,6 +2,9 @@ Please view this file on the master branch, on stable branches it's out of date.
## 8.13.0 (2016-10-22)
- Fix save button on project pipeline settings page. (!6955)
- All Sidekiq workers now use their own queue
- Avoid race condition when asynchronously removing expired artifacts. (!6881)
- Improve Merge When Build Succeeds triggers and execute on pipeline success. (!6675)
- Respond with 404 Not Found for non-existent tags (Linus Thiel)
- Truncate long labels with ellipsis in labels page
......
class AdminEmailWorker
include Sidekiq::Worker
sidekiq_options retry: false # this job auto-repeats via sidekiq-cron
include CronjobQueue
def perform
repository_check_failed_count = Project.where(last_repository_check_failed: true).count
......
class BuildCoverageWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include BuildQueue
def perform(build_id)
Ci::Build.find_by(id: build_id)
......
class BuildEmailWorker
include Sidekiq::Worker
include BuildQueue
def perform(build_id, recipients, push_data)
recipients.each do |recipient|
......
class BuildFinishedWorker
include Sidekiq::Worker
include BuildQueue
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
class BuildHooksWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include BuildQueue
def perform(build_id)
Ci::Build.find_by(id: build_id)
......
class BuildSuccessWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include BuildQueue
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
......
# This worker clears all cache fields in the database, working in batches.
class ClearDatabaseCacheWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
BATCH_SIZE = 1000
......
# Concern for setting Sidekiq settings for the various CI build workers.
module BuildQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :build
end
end
# Concern that sets various Sidekiq settings for workers executed using a
# cronjob.
module CronjobQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :cronjob, retry: false
end
end
# Concern that sets the queue of a Sidekiq worker based on the worker's class
# name/namespace.
module DedicatedSidekiqQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: name.sub(/Worker\z/, '').underscore.tr('/', '_')
end
end
# Concern for setting Sidekiq settings for the various CI pipeline workers.
module PipelineQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :pipeline
end
end
# Concern for setting Sidekiq settings for the various repository check workers.
module RepositoryCheckQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :repository_check, retry: false
end
end
class DeleteUserWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id)
......
class EmailReceiverWorker
include Sidekiq::Worker
sidekiq_options queue: :incoming_email
include DedicatedSidekiqQueue
def perform(raw)
return unless Gitlab::IncomingEmail.enabled?
......
class EmailsOnPushWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :mailers
attr_reader :email, :skip_premailer
def perform(project_id, recipients, push_data, options = {})
......
class ExpireBuildArtifactsWorker
include Sidekiq::Worker
include CronjobQueue
def perform
Rails.logger.info 'Scheduling removal of build artifacts'
......
class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(build_id)
build = Ci::Build.with_expired_artifacts.reorder(nil).find_by(id: build_id)
......
class GitGarbageCollectWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell, retry: false
sidekiq_options retry: false
def perform(project_id)
project = Project.find(project_id)
......
class GitlabShellWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
sidekiq_options queue: :gitlab_shell
include DedicatedSidekiqQueue
def perform(action, *arg)
gitlab_shell.send(action, *arg)
......
class GroupDestroyWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include DedicatedSidekiqQueue
def perform(group_id, user_id)
begin
......
class ImportExportProjectCleanupWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include CronjobQueue
def perform
ImportExportCleanUpService.new.execute
......
......@@ -3,6 +3,7 @@ require 'socket'
class IrkerWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id)
......
class MergeWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include DedicatedSidekiqQueue
def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access
......
class NewNoteWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include DedicatedSidekiqQueue
def perform(note_id, note_params)
note = Note.find(note_id)
......
class PipelineHooksWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include PipelineQueue
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
......
class PipelineMetricsWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include PipelineQueue
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
class PipelineProcessWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include PipelineQueue
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
......
class PipelineSuccessWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include PipelineQueue
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
class PipelineUpdateWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include PipelineQueue
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
......
class PostReceive
include Sidekiq::Worker
include DedicatedSidekiqQueue
extend Gitlab::CurrentSettings
sidekiq_options queue: :post_receive
def perform(repo_path, identifier, changes)
if path = Gitlab.config.repositories.storages.find { |p| repo_path.start_with?(p[1].to_s) }
repo_path.gsub!(path[1].to_s, "")
......
class ProjectCacheWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include DedicatedSidekiqQueue
def perform(project_id)
project = Project.find(project_id)
......
class ProjectDestroyWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include DedicatedSidekiqQueue
def perform(project_id, user_id, params)
begin
......
class ProjectExportWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
sidekiq_options queue: :gitlab_shell, retry: 3
sidekiq_options retry: 3
def perform(current_user_id, project_id)
current_user = User.find(current_user_id)
......
class ProjectServiceWorker
include Sidekiq::Worker
sidekiq_options queue: :project_web_hook
include DedicatedSidekiqQueue
def perform(hook_id, data)
data = data.with_indifferent_access
......
class ProjectWebHookWorker
include Sidekiq::Worker
sidekiq_options queue: :project_web_hook
include DedicatedSidekiqQueue
def perform(hook_id, data, hook_name)
data = data.with_indifferent_access
......
class PruneOldEventsWorker
include Sidekiq::Worker
include CronjobQueue
def perform
# Contribution calendar shows maximum 12 months of events.
......
class RemoveExpiredGroupLinksWorker
include Sidekiq::Worker
include CronjobQueue
def perform
ProjectGroupLink.expired.destroy_all
......
class RemoveExpiredMembersWorker
include Sidekiq::Worker
include CronjobQueue
def perform
Member.expired.find_each do |member|
......
class RepositoryArchiveCacheWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include CronjobQueue
def perform
RepositoryArchiveCleanUpService.new.execute
......
module RepositoryCheck
class BatchWorker
include Sidekiq::Worker
include CronjobQueue
RUN_TIME = 3600
sidekiq_options retry: false
def perform
start = Time.now
......
module RepositoryCheck
class ClearWorker
include Sidekiq::Worker
sidekiq_options retry: false
include RepositoryCheckQueue
def perform
# Do small batched updates because these updates will be slow and locking
......
module RepositoryCheck
class SingleRepositoryWorker
include Sidekiq::Worker
sidekiq_options retry: false
include RepositoryCheckQueue
def perform(project_id)
project = Project.find(project_id)
......
class RepositoryForkWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
sidekiq_options queue: :gitlab_shell
include DedicatedSidekiqQueue
def perform(project_id, forked_from_repository_storage_path, source_path, target_path)
Gitlab::Metrics.add_event(:fork_repository,
......
class RepositoryImportWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
sidekiq_options queue: :gitlab_shell
include DedicatedSidekiqQueue
attr_accessor :project, :current_user
......
class RequestsProfilesWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include CronjobQueue
def perform
Gitlab::RequestProfiler.remove_all_profiles
......
class StuckCiBuildsWorker
include Sidekiq::Worker
include CronjobQueue
BUILD_STUCK_TIMEOUT = 1.day
......
class SystemHookWorker
include Sidekiq::Worker
sidekiq_options queue: :system_hook
include DedicatedSidekiqQueue
def perform(hook_id, data, hook_name)
SystemHook.find(hook_id).execute(data, hook_name)
......
class TrendingProjectsWorker
include Sidekiq::Worker
sidekiq_options queue: :trending_projects
include CronjobQueue
def perform
Rails.logger.info('Refreshing trending projects')
......
class UpdateMergeRequestsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(project_id, user_id, oldrev, newrev, ref)
project = Project.find_by(id: project_id)
......
......@@ -4,6 +4,7 @@ cd $(dirname $0)/..
app_root=$(pwd)
sidekiq_pidfile="$app_root/tmp/pids/sidekiq.pid"
sidekiq_logfile="$app_root/log/sidekiq.log"
sidekiq_config="$app_root/config/sidekiq_queues.yml"
gitlab_user=$(ls -l config.ru | awk '{print $3}')
warn()
......@@ -37,7 +38,7 @@ start_no_deamonize()
start_sidekiq()
{
exec bundle exec sidekiq -q post_receive -q mailers -q archive_repo -q system_hook -q project_web_hook -q gitlab_shell -q incoming_email -q runner -q common -q pages -q default -q elasticsearch -e $RAILS_ENV -P $sidekiq_pidfile "$@"
exec bundle exec sidekiq -C "${sidekiq_config}" -e $RAILS_ENV -P $sidekiq_pidfile "$@"
}
load_ok()
......
......@@ -24,7 +24,8 @@ module Gitlab
#{config.root}/app/models/ci
#{config.root}/app/models/hooks
#{config.root}/app/models/members
#{config.root}/app/models/project_services))
#{config.root}/app/models/project_services
#{config.root}/app/workers/concerns))
config.generators.templates.push("#{config.root}/generator_templates")
......
# This configuration file should be exclusively used to set queue settings for
# Sidekiq. Any other setting should be specified using the Sidekiq CLI or the
# Sidekiq Ruby API (see config/initializers/sidekiq.rb).
---
# All the queues to process and their weights. Every queue _must_ have a weight
# defined.
#
# The available weights are as follows
#
# 1: low priority
# 2: medium priority
# 3: high priority
# 5: _super_ high priority, this should only be used for _very_ important queues
#
# As per http://stackoverflow.com/a/21241357/290102 the formula for calculating
# the likelihood of a job being popped off a queue (given all queues have work
# to perform) is:
#
# chance = (queue weight / total weight of all queues) * 100
:queues:
- [post_receive, 5]
- [merge, 5]
- [update_merge_requests, 3]
- [new_note, 2]
- [build, 2]
- [pipeline, 2]
- [gitlab_shell, 2]
- [email_receiver, 2]
- [emails_on_push, 2]
- [repository_fork, 1]
- [repository_import, 1]
- [project_service, 1]
- [clear_database_cache, 1]
- [delete_user, 1]
- [expire_build_instance_artifacts, 1]
- [group_destroy, 1]
- [irker, 1]
- [project_cache, 1]
- [project_destroy, 1]
- [project_export, 1]
- [project_web_hook, 1]
- [repository_check, 1]
- [system_hook, 1]
- [git_garbage_collect, 1]
- [cronjob, 1]
- [default, 1]
require 'json'
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateSidekiqQueuesFromDefault < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = true
DOWNTIME_REASON = <<-EOF
Moving Sidekiq jobs from queues requires Sidekiq to be stopped. Not stopping
Sidekiq will result in the loss of jobs that are scheduled after this
migration completes.
EOF
disable_ddl_transaction!
# Jobs for which the queue names have been changed (e.g. multiple workers
# using the same non-default queue).
#
# The keys are the old queue names, the values the jobs to move and their new
# queue names.
RENAMED_QUEUES = {
gitlab_shell: {
'GitGarbageCollectorWorker' => :git_garbage_collector,
'ProjectExportWorker' => :project_export,
'RepositoryForkWorker' => :repository_fork,
'RepositoryImportWorker' => :repository_import
},
project_web_hook: {
'ProjectServiceWorker' => :project_service
},
incoming_email: {
'EmailReceiverWorker' => :email_receiver
},
mailers: {
'EmailsOnPushWorker' => :emails_on_push
},
default: {
'AdminEmailWorker' => :cronjob,
'BuildCoverageWorker' => :build,
'BuildEmailWorker' => :build,
'BuildFinishedWorker' => :build,
'BuildHooksWorker' => :build,
'BuildSuccessWorker' => :build,
'ClearDatabaseCacheWorker' => :clear_database_cache,
'DeleteUserWorker' => :delete_user,
'ExpireBuildArtifactsWorker' => :cronjob,
'ExpireBuildInstanceArtifactsWorker' => :expire_build_instance_artifacts,
'GroupDestroyWorker' => :group_destroy,
'ImportExportProjectCleanupWorker' => :cronjob,
'IrkerWorker' => :irker,
'MergeWorker' => :merge,
'NewNoteWorker' => :new_note,
'PipelineHooksWorker' => :pipeline,
'PipelineMetricsWorker' => :pipeline,
'PipelineProcessWorker' => :pipeline,
'PipelineSuccessWorker' => :pipeline,
'PipelineUpdateWorker' => :pipeline,
'ProjectCacheWorker' => :project_cache,
'ProjectDestroyWorker' => :project_destroy,
'PruneOldEventsWorker' => :cronjob,
'RemoveExpiredGroupLinksWorker' => :cronjob,
'RemoveExpiredMembersWorker' => :cronjob,
'RepositoryArchiveCacheWorker' => :cronjob,
'RepositoryCheck::BatchWorker' => :cronjob,
'RepositoryCheck::ClearWorker' => :repository_check,
'RepositoryCheck::SingleRepositoryWorker' => :repository_check,
'RequestsProfilesWorker' => :cronjob,
'StuckCiBuildsWorker' => :cronjob,
'UpdateMergeRequestsWorker' => :update_merge_requests
}
}
def up
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |queue, jobs|
migrate_from_queue(redis, queue, jobs)
end
end
end
def down
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |dest_queue, jobs|
jobs.each do |worker, from_queue|
migrate_from_queue(redis, from_queue, worker => dest_queue)
end
end
end
end
def migrate_from_queue(redis, queue, job_mapping)
while job = redis.lpop("queue:#{queue}")
payload = JSON.load(job)
new_queue = job_mapping[payload['class']]
# If we have no target queue to migrate to we're probably dealing with
# some ancient job for which the worker no longer exists. In that case
# there's no sane option we can take, other than just dropping the job.
next unless new_queue
payload['queue'] = new_queue
redis.lpush("queue:#{new_queue}", JSON.dump(payload))
end
end
end
......@@ -14,7 +14,8 @@
- [Testing standards and style guidelines](testing.md)
- [UI guide](ui_guide.md) for building GitLab with existing CSS styles and elements
- [Frontend guidelines](frontend.md)
- [SQL guidelines](sql.md) for SQL guidelines
- [SQL guidelines](sql.md) for working with SQL queries
- [Sidekiq guidelines](sidekiq_style_guide.md) for working with Sidekiq workers
## Process
......
# Sidekiq Style Guide
This document outlines various guidelines that should be followed when adding or
modifying Sidekiq workers.
## Default Queue
Use of the "default" queue is not allowed. Every worker should use a queue that
matches the worker's purpose the closest. For example, workers that are to be
executed periodically should use the "cronjob" queue.
A list of all available queues can be found in `config/sidekiq_queues.yml`.
## Dedicated Queues
Most workers should use their own queue. To ease this process a worker can
include the `DedicatedSidekiqQueue` concern as follows:
```ruby
class ProcessSomethingWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
```
This will set the queue name based on the class' name, minus the `Worker`
suffix. In the above example this would lead to the queue being
`process_something`.
In some cases multiple workers do use the same queue. For example, the various
workers for updating CI pipelines all use the `pipeline` queue. Adding workers
to existing queues should be done with care, as adding more workers can lead to
slow jobs blocking work (even for different jobs) on the shared queue.
## Tests
Each Sidekiq worker must be tested using RSpec, just like any other class. These
tests should be placed in `spec/workers`.
require 'spec_helper'
describe BuildQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include BuildQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('build')
end
end
require 'spec_helper'
describe CronjobQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include CronjobQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
it 'disables retrying of failed jobs' do
expect(worker.sidekiq_options['retry']).to eq(false)
end
end
require 'spec_helper'
describe DedicatedSidekiqQueue do
let(:worker) do
Class.new do
def self.name
'Foo::Bar::DummyWorker'
end
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
end
describe 'queue names' do
it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end
end
end
require 'spec_helper'
describe PipelineQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include PipelineQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('pipeline')
end
end
require 'spec_helper'
describe RepositoryCheckQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include RepositoryCheckQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('repository_check')
end
it 'disables retrying of failed jobs' do
expect(worker.sidekiq_options['retry']).to eq(false)
end
end
require 'spec_helper'
describe 'Every Sidekiq worker' do
let(:workers) do
root = Rails.root.join('app', 'workers')
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')].
reject { |path| path.start_with?(concerns) }
workers.map do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
end
it 'does not use the default queue' do
workers.each do |worker|
expect(worker.sidekiq_options['queue'].to_s).not_to eq('default')
end
end
it 'uses the cronjob queue when the worker runs as a cronjob' do
cron_workers = Settings.cron_jobs.
map { |job_name, options| options['job_class'].constantize }.
to_set
workers.each do |worker|
next unless cron_workers.include?(worker)
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
end
it 'defines the queue in the Sidekiq configuration file' do
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s)
queue_names = config[:queues].map { |(queue, _)| queue }.to_set
workers.each do |worker|
expect(queue_names).to include(worker.sidekiq_options['queue'].to_s)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment