Commit 03b5e73a authored by Sean McGivern's avatar Sean McGivern

Allow migrating scheduled and retried Sidekiq jobs to new queues

We now allow Sidekiq worker routing to be configured by administrators.
For example, they can say 'all jobs go to the default queue', or
'project export and import workers share a queue'. Right now, the only
really useful case is to re-route jobs to the default queue, but we will
support other options in future.

Migrating this sounds simple: listen to the old and new queues, update
the worker routing, wait for the old queue to be empty, and stop
listening to the old queue. But there's a catch: Sidekiq maintains two
sorted sets with jobs that are to be run in the future. There is the
scheduled set (for jobs that use `perform_in` or `perform_at` or
similar, where we choose to run a job in the future) and the retry
set (after failing, a job will get retried with some back-off).

Both of those sets are 'global' - there isn't one for each possible
destination queue. That means that the set entries themselves contain
information about their destination queue. And in the migration case
above, the destination queue might be the old queue and no longer
listened to.

This adds two Rake tasks (one for the retry set and one for the
scheduled set) to allow administrators to rewrite the job data in those
sorted sets

It uses these Redis commands:

1. ZCARD to get the initial size of the set, for showing to the
   operator. This is O(1) and is called once.
2. ZSCAN to iterate over the sets. This is O(1) per call, and provides
   useful guarantees about iterating over a set that may be changing as
   it's operated on.
3. ZREM to remove the old job hash. This is O(log(N)) per call, where N
   is the number of elements in the set.
4. ZADD to add the new job hash with the new queue name. This is also
   O(log(N)) per call.

ZREM and ZADD will each be called once per item to be migrated, so there
may be many invocations of these commands during this task's run.

Changelog: added
parent d125cf04
---
title: Allow migrating scheduled and retried Sidekiq jobs to new queues
merge_request: 60724
author:
type: added
......@@ -41,7 +41,8 @@ The following Rake tasks are available for use with GitLab:
| [Praefect Rake tasks](../administration/raketasks/praefect.md) | [Praefect](../administration/gitaly/praefect.md)-related tasks. |
| [Project import/export](../administration/raketasks/project_import_export.md) | Prepare for [project exports and imports](../user/project/settings/import_export.md). |
| [Sample Prometheus data](generate_sample_prometheus_data.md) | Generate sample Prometheus data. |
| [SPDX license list import](spdx.md) | Import a local copy of the [SPDX license list](https://spdx.org/licenses/) for matching [License Compliance policies](../user/compliance/license_compliance/index.md). | |
| [Sidekiq job migration](sidekiq_job_migration.md) | Migrate Sidekiq jobs scheduled for future dates to a new queue. |
| [SPDX license list import](spdx.md) | Import a local copy of the [SPDX license list](https://spdx.org/licenses/) for matching [License Compliance policies](../user/compliance/license_compliance/index.md). |
| [Repository storage](../administration/raketasks/storage.md) | List and migrate existing projects and attachments from legacy storage to hashed storage. |
| [Uploads migrate](../administration/raketasks/uploads/migrate.md) | Migrate uploads between local storage and object storage. |
| [Uploads sanitize](../administration/raketasks/uploads/sanitize.md) | Remove EXIF data from images uploaded to earlier versions of GitLab. |
......
---
stage: none
group: unassigned
info: To determine the technical writer assigned to the Stage/Group associated with this page, see https://about.gitlab.com/handbook/engineering/ux/technical-writing/#assignments
---
# Sidekiq job migration **(FREE SELF)**
WARNING:
This operation should be very uncommon. We do not recommend it for the vast majority of GitLab instances.
Sidekiq routing rules allow administrators to re-route certain background jobs from their regular queue to an alternative queue. By default, GitLab uses one queue per background job type. GitLab has over 400 background job types, and so correspondingly it has over 400 queues.
Most administrators will not need to change this setting. In some cases with particularly large background job processing workloads, Redis performance may suffer due to the number of queues that GitLab listens to.
If the Sidekiq routing rules are changed, administrators need to take care with the migration to avoid losing jobs entirely. The basic migration steps are:
1. Listen to both the old and new queues.
1. Update the routing rules.
1. Wait until there are no publishers dispatching jobs to the old queues.
1. Run the [Rake tasks for future jobs](#future-jobs).
1. Wait for the old queues to be empty.
1. Stop listening to the old queues.
## Future jobs
Step 4 involves rewriting some Sidekiq job data for jobs that are already stored in Redis, but due to run in future. There are two sets of jobs to run in future: scheduled jobs and jobs to be retried. We provide a separate Rake task to migrate each set:
- `gitlab:sidekiq:migrate_jobs:retry` for jobs to be retried.
- `gitlab:sidekiq:migrate_jobs:scheduled` for scheduled jobs.
Most of the time, running both at the same time is the correct choice. There are two separate tasks to allow for more fine-grained control where needed. To run both at once:
```shell
# omnibus-gitlab
sudo gitlab-rake gitlab:sidekiq:migrate_jobs:retry gitlab:sidekiq:migrate_jobs:schedule
# source installations
bundle exec rake gitlab:sidekiq:migrate_jobs:retry gitlab:sidekiq:migrate_jobs:schedule RAILS_ENV=production
```
# frozen_string_literal: true
module Gitlab
class SidekiqMigrateJobs
LOG_FREQUENCY = 1_000
attr_reader :sidekiq_set, :logger
def initialize(sidekiq_set, logger: nil)
@sidekiq_set = sidekiq_set
@logger = logger
end
# mappings is a hash of WorkerClassName => target_queue_name
def execute(mappings)
source_queues_regex = Regexp.union(mappings.keys)
cursor = 0
scanned = 0
migrated = 0
estimated_size = Sidekiq.redis { |c| c.zcard(sidekiq_set) }
logger&.info("Processing #{sidekiq_set} set. Estimated size: #{estimated_size}.")
begin
cursor, jobs = Sidekiq.redis { |c| c.zscan(sidekiq_set, cursor) }
jobs.each do |(job, score)|
if scanned > 0 && scanned % LOG_FREQUENCY == 0
logger&.info("In progress. Scanned records: #{scanned}. Migrated records: #{migrated}.")
end
scanned += 1
next unless job.match?(source_queues_regex)
job_hash = Sidekiq.load_json(job)
destination_queue = mappings[job_hash['class']]
next unless mappings.has_key?(job_hash['class'])
next if job_hash['queue'] == destination_queue
job_hash['queue'] = destination_queue
migrated += migrate_job(job, score, job_hash)
end
end while cursor.to_i != 0
logger&.info("Done. Scanned records: #{scanned}. Migrated records: #{migrated}.")
{
scanned: scanned,
migrated: migrated
}
end
private
def migrate_job(job, score, job_hash)
Sidekiq.redis do |connection|
removed = connection.zrem(sidekiq_set, job)
if removed
connection.zadd(sidekiq_set, score, Sidekiq.dump_json(job_hash))
1
else
0
end
end
end
end
end
......@@ -8,6 +8,29 @@ namespace :gitlab do
File.write(path, banner + YAML.dump(object).gsub(/ *$/m, ''))
end
namespace :migrate_jobs do
def mappings
::Gitlab::SidekiqConfig
.workers
.reject { |worker| worker.klass.is_a?(Gitlab::SidekiqConfig::DummyWorker) }
.to_h { |worker| [worker.klass.to_s, ::Gitlab::SidekiqConfig::WorkerRouter.global.route(worker.klass)] }
end
desc 'GitLab | Sidekiq | Migrate jobs in the scheduled set to new queue names'
task schedule: :environment do
::Gitlab::SidekiqMigrateJobs
.new('schedule', logger: Logger.new($stdout))
.execute(mappings)
end
desc 'GitLab | Sidekiq | Migrate jobs in the retry set to new queue names'
task retry: :environment do
::Gitlab::SidekiqMigrateJobs
.new('retry', logger: Logger.new($stdout))
.execute(mappings)
end
end
namespace :all_queues_yml do
desc 'GitLab | Sidekiq | Generate all_queues.yml based on worker definitions'
task generate: :environment do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
def clear_queues
Sidekiq::Queue.new('authorized_projects').clear
Sidekiq::Queue.new('post_receive').clear
Sidekiq::RetrySet.new.clear
Sidekiq::ScheduledSet.new.clear
end
around do |example|
clear_queues
Sidekiq::Testing.disable!(&example)
clear_queues
end
describe '#execute', :aggregate_failures do
shared_examples 'processing a set' do
let(:migrator) { described_class.new(set_name) }
let(:set_after) do
Sidekiq.redis { |c| c.zrange(set_name, 0, -1, with_scores: true) }
.map { |item, score| [Sidekiq.load_json(item), score] }
end
context 'when the set is empty' do
it 'returns the number of scanned and migrated jobs' do
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 0, migrated: 0)
end
end
context 'when the set is not empty' do
it 'returns the number of scanned and migrated jobs' do
create_jobs
expect(migrator.execute({})).to eq(scanned: 4, migrated: 0)
end
end
context 'when there are no matching jobs' do
it 'does not change any queue names' do
create_jobs(include_post_receive: false)
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 3, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects',
'class' => 'AuthorizedProjectsWorker'))
end
end
context 'when there are matching jobs' do
it 'migrates only the workers matching the given worker from the set' do
freeze_time do
create_jobs
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 4, migrated: 3)
set_after.each.with_index do |(item, score), i|
if item['class'] == 'AuthorizedProjectsWorker'
expect(item).to include('queue' => 'new_queue', 'args' => [i])
else
expect(item).to include('queue' => 'post_receive', 'args' => [i])
end
expect(score).to eq(i.succ.hours.from_now.to_i)
end
end
end
it 'allows migrating multiple workers at once' do
freeze_time do
create_jobs
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'another_queue'))
.to eq(scanned: 4, migrated: 4)
set_after.each.with_index do |(item, score), i|
if item['class'] == 'AuthorizedProjectsWorker'
expect(item).to include('queue' => 'new_queue', 'args' => [i])
else
expect(item).to include('queue' => 'another_queue', 'args' => [i])
end
expect(score).to eq(i.succ.hours.from_now.to_i)
end
end
end
it 'allows migrating multiple workers to the same queue' do
freeze_time do
create_jobs
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
.to eq(scanned: 4, migrated: 4)
set_after.each.with_index do |(item, score), i|
expect(item).to include('queue' => 'new_queue', 'args' => [i])
expect(score).to eq(i.succ.hours.from_now.to_i)
end
end
end
it 'does not try to migrate jobs that are removed from the set during the migration' do
freeze_time do
create_jobs
allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
Sidekiq.redis { |c| c.zrem(set_name, args.first) }
meth.call(*args)
end
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects'))
end
end
it 'does not try to migrate unmatched jobs that are added to the set during the migration' do
create_jobs
calls = 0
allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
if calls == 0
travel_to(5.hours.from_now) { create_jobs(include_post_receive: false) }
end
calls += 1
meth.call(*args)
end
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 1)
expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
.to eq('authorized_projects' => 6, 'new_queue' => 1)
end
it 'iterates through the entire set of jobs' do
50.times do |i|
travel_to(i.hours.from_now) { create_jobs }
end
expect(migrator.execute('NonExistentWorker' => 'new_queue')).to eq(scanned: 200, migrated: 0)
expect(set_after.length).to eq(200)
end
it 'logs output at the start, finish, and every LOG_FREQUENCY jobs' do
freeze_time do
create_jobs
stub_const("#{described_class}::LOG_FREQUENCY", 2)
logger = Logger.new(StringIO.new)
migrator = described_class.new(set_name, logger: logger)
expect(logger).to receive(:info).with(a_string_matching('Processing')).once.ordered
expect(logger).to receive(:info).with(a_string_matching('In progress')).once.ordered
expect(logger).to receive(:info).with(a_string_matching('Done')).once.ordered
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
.to eq(scanned: 4, migrated: 4)
end
end
end
end
context 'scheduled jobs' do
let(:set_name) { 'schedule' }
def create_jobs(include_post_receive: true)
AuthorizedProjectsWorker.perform_in(1.hour, 0)
AuthorizedProjectsWorker.perform_in(2.hours, 1)
PostReceive.perform_in(3.hours, 2) if include_post_receive
AuthorizedProjectsWorker.perform_in(4.hours, 3)
end
it_behaves_like 'processing a set'
end
context 'retried jobs' do
let(:set_name) { 'retry' }
# Try to mimic as closely as possible what Sidekiq will actually
# do to retry a job.
def retry_in(klass, time, args)
# In Sidekiq 6, this argument will become a JSON string
message = { 'class' => klass, 'args' => [args], 'retry' => true }
allow(klass).to receive(:sidekiq_retry_in_block).and_return(proc { time })
begin
Sidekiq::JobRetry.new.local(klass, message, klass.queue) { raise 'boom' }
rescue Sidekiq::JobRetry::Skip
# Sidekiq scheduled the retry
end
end
def create_jobs(include_post_receive: true)
retry_in(AuthorizedProjectsWorker, 1.hour, 0)
retry_in(AuthorizedProjectsWorker, 2.hours, 1)
retry_in(PostReceive, 3.hours, 2) if include_post_receive
retry_in(AuthorizedProjectsWorker, 4.hours, 3)
end
it_behaves_like 'processing a set'
end
end
end
# frozen_string_literal: true
require 'rake_helper'
RSpec.describe 'sidekiq.rake', :aggregate_failures do
before do
Rake.application.rake_require 'tasks/gitlab/sidekiq'
stub_warn_user_is_not_gitlab
end
shared_examples 'migration rake task' do
it 'runs the migrator with a mapping of workers to queues' do
test_routes = [
['urgency=high', 'default'],
['*', nil]
]
test_router = ::Gitlab::SidekiqConfig::WorkerRouter.new(test_routes)
migrator = ::Gitlab::SidekiqMigrateJobs.new(sidekiq_set, logger: Logger.new($stdout))
allow(::Gitlab::SidekiqConfig::WorkerRouter)
.to receive(:global).and_return(test_router)
expect(::Gitlab::SidekiqMigrateJobs)
.to receive(:new).with(sidekiq_set, logger: an_instance_of(Logger)).and_return(migrator)
expect(migrator)
.to receive(:execute)
.with(a_hash_including('PostReceive' => 'default',
'MergeWorker' => 'default',
'DeleteDiffFilesWorker' => 'delete_diff_files'))
.and_call_original
run_rake_task("gitlab:sidekiq:migrate_jobs:#{sidekiq_set}")
expect($stdout.string).to include("Processing #{sidekiq_set}")
expect($stdout.string).to include('Done')
end
end
describe 'gitlab:sidekiq:migrate_jobs:schedule rake task' do
let(:sidekiq_set) { 'schedule' }
it_behaves_like 'migration rake task'
end
describe 'gitlab:sidekiq:migrate_jobs:retry rake task' do
let(:sidekiq_set) { 'retry' }
it_behaves_like 'migration rake task'
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment