Commit be0f459e authored by Yorick Peterse's avatar Yorick Peterse

Re-organize queues for EE workers

This builds on
https://gitlab.com/gitlab-org/gitlab-ce/merge_requests/7006 and adjusts
various parts to take care of EE specific workers.
parent 931ff815
......@@ -12,6 +12,7 @@ Please view this file on the master branch, on stable branches it's out of date.
- Decrease maximum time that GitLab waits for a mirror to finish !791 (Borja Aparicio)
- User groups (that can be assigned as approvers)
- Fix a search for non-default branches when ES is enabled
- Re-organized the Sidekiq queues for EE specific workers
## 8.12.7
......
class AdminEmailsWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
def perform(recipient_id, subject, body)
recipient_list(recipient_id).pluck(:id).each do |user_id|
......
# Concern for setting Sidekiq settings for the various GitLab GEO workers.
module GeoQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: :geo
end
end
class GeoBulkNotifyWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include CronjobQueue
def perform
Geo::NotifyNodesService.new.execute
......
class GeoKeyRefreshWorker
include Sidekiq::Worker
include ::GeoDynamicBackoff
sidekiq_options queue: :default
include GeoQueue
def perform(key_id, key, action)
action = action.to_sym
......
class GeoRepositoryDestroyWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include GeoQueue
def perform(id, name, path_with_namespace)
# We don't have access to the original model anymore, so we are
......
class GeoRepositoryMoveWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include GeoQueue
def perform(id, name, old_path_with_namespace, new_path_with_namespace)
Geo::MoveRepositoryService.new(id, name, old_path_with_namespace, new_path_with_namespace).execute
......
class GeoRepositoryUpdateWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
sidekiq_options queue: :default
include GeoQueue
attr_accessor :project
......
class GeoWikiRepositoryUpdateWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
sidekiq_options queue: :default
include GeoQueue
attr_accessor :project
......
......@@ -2,11 +2,9 @@ class GitlabUsagePingWorker
LEASE_TIMEOUT = 86400
include Sidekiq::Worker
include CronjobQueue
include HTTParty
# This is not guaranteed to succeed, so don't retry on failure
sidekiq_options queue: :default, retry: false
def perform
return unless current_application_settings.usage_ping_enabled
......
class HistoricalDataWorker
include Sidekiq::Worker
include CronjobQueue
def perform
return if Gitlab::Geo.secondary?
......
class LdapGroupSyncWorker
include Sidekiq::Worker
sidekiq_options retry: false
include CronjobQueue
def perform(group_id = nil)
if group_id
......
class LdapSyncWorker
include Sidekiq::Worker
sidekiq_options retry: false
include CronjobQueue
def perform
return unless Gitlab.config.ldap.enabled
......
class ProjectUpdateRepositoryStorageWorker
include Sidekiq::Worker
sidekiq_options queue: :default
include DedicatedSidekiqQueue
def perform(project_id, new_repository_storage_key)
project = Project.find(project_id)
......
class RebaseWorker
include Sidekiq::Worker
sidekiq_options queue: :default
sidekiq_options queue: :merge
def perform(merge_request_id, current_user_id)
current_user = User.find(current_user_id)
......
class UpdateAllMirrorsWorker
include Sidekiq::Worker
include CronjobQueue
LEASE_TIMEOUT = 3600
......
class UpdateAllRemoteMirrorsWorker
include Sidekiq::Worker
include CronjobQueue
def perform
fail_stuck_mirrors!
......
......@@ -44,3 +44,9 @@
- [git_garbage_collect, 1]
- [cronjob, 1]
- [default, 1]
# EE specific queues
- [pages, 1]
- [elasticsearch, 1]
- [geo, 1]
- [project_update_repository_storage, 1]
- [admin_emails, 1]
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateEeSidekiqQueuesFromDefault < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = true
DOWNTIME_REASON = <<-EOF
Moving Sidekiq jobs from queues requires Sidekiq to be stopped. Not stopping
Sidekiq will result in the loss of jobs that are scheduled after this
migration completes.
EOF
disable_ddl_transaction!
# Jobs for which the queue names have been changed (e.g. multiple workers
# using the same non-default queue).
#
# The keys are the old queue names, the values the jobs to move and their new
# queue names.
RENAMED_QUEUES = {
default: {
'LdapGroupSyncWorker' => :cronjob,
'LdapSyncWorker' => :cronjob,
}
}
def up
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |queue, jobs|
migrate_from_queue(redis, queue, jobs)
end
end
end
def down
Sidekiq.redis do |redis|
RENAMED_QUEUES.each do |dest_queue, jobs|
jobs.each do |worker, from_queue|
migrate_from_queue(redis, from_queue, worker => dest_queue)
end
end
end
end
def migrate_from_queue(redis, queue, job_mapping)
while job = redis.lpop("queue:#{queue}")
payload = JSON.load(job)
new_queue = job_mapping[payload['class']]
# If we have no target queue to migrate to we're probably dealing with
# some ancient job for which the worker no longer exists. In that case
# there's no sane option we can take, other than just dropping the job.
next unless new_queue
payload['queue'] = new_queue
redis.lpush("queue:#{new_queue}", JSON.dump(payload))
end
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20161007133303) do
ActiveRecord::Schema.define(version: 20161021185735) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -1020,9 +1020,9 @@ ActiveRecord::Schema.define(version: 20161007133303) do
t.boolean "only_allow_merge_if_build_succeeds", default: false, null: false
t.boolean "has_external_issue_tracker"
t.string "repository_storage", default: "default", null: false
t.boolean "repository_read_only"
t.boolean "request_access_enabled", default: true, null: false
t.boolean "has_external_wiki"
t.boolean "repository_read_only"
t.boolean "lfs_enabled"
t.text "description_html"
t.integer "repository_size_limit"
......@@ -1049,7 +1049,7 @@ ActiveRecord::Schema.define(version: 20161007133303) do
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "user_id"
t.integer "group_id"
t.integer "group_id"
end
add_index "protected_branch_merge_access_levels", ["protected_branch_id"], name: "index_protected_branch_merge_access", using: :btree
......@@ -1061,7 +1061,7 @@ ActiveRecord::Schema.define(version: 20161007133303) do
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.integer "user_id"
t.integer "group_id"
t.integer "group_id"
end
add_index "protected_branch_push_access_levels", ["protected_branch_id"], name: "index_protected_branch_push_access", using: :btree
......@@ -1388,8 +1388,8 @@ ActiveRecord::Schema.define(version: 20161007133303) do
t.boolean "note_events", default: false, null: false
t.boolean "enable_ssl_verification", default: true
t.boolean "build_events", default: false, null: false
t.boolean "wiki_page_events", default: false, null: false
t.string "token"
t.boolean "wiki_page_events", default: false, null: false
t.boolean "pipeline_events", default: false, null: false
t.boolean "confidential_issues_events", default: false, null: false
end
......
require 'spec_helper'
describe GeoQueue do
let(:worker) do
Class.new do
include Sidekiq::Worker
include GeoQueue
end
end
it 'sets the queue name of a worker' do
expect(worker.sidekiq_options['queue'].to_s).to eq('geo')
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment