Commit 557d3bfc authored by Kerri Miller's avatar Kerri Miller

Merge branch 'allow-migrating-sidekiq-scheduled-and-retry-jobs' into 'master'

Allow migrating scheduled and to-be-retried Sidekiq jobs

See merge request gitlab-org/gitlab!60724
parents 083498e7 03b5e73a
---
title: Allow migrating scheduled and retried Sidekiq jobs to new queues
merge_request: 60724
author:
type: added
...@@ -41,7 +41,8 @@ The following Rake tasks are available for use with GitLab: ...@@ -41,7 +41,8 @@ The following Rake tasks are available for use with GitLab:
| [Praefect Rake tasks](../administration/raketasks/praefect.md) | [Praefect](../administration/gitaly/praefect.md)-related tasks. | | [Praefect Rake tasks](../administration/raketasks/praefect.md) | [Praefect](../administration/gitaly/praefect.md)-related tasks. |
| [Project import/export](../administration/raketasks/project_import_export.md) | Prepare for [project exports and imports](../user/project/settings/import_export.md). | | [Project import/export](../administration/raketasks/project_import_export.md) | Prepare for [project exports and imports](../user/project/settings/import_export.md). |
| [Sample Prometheus data](generate_sample_prometheus_data.md) | Generate sample Prometheus data. | | [Sample Prometheus data](generate_sample_prometheus_data.md) | Generate sample Prometheus data. |
| [SPDX license list import](spdx.md) | Import a local copy of the [SPDX license list](https://spdx.org/licenses/) for matching [License Compliance policies](../user/compliance/license_compliance/index.md). | | | [Sidekiq job migration](sidekiq_job_migration.md) | Migrate Sidekiq jobs scheduled for future dates to a new queue. |
| [SPDX license list import](spdx.md) | Import a local copy of the [SPDX license list](https://spdx.org/licenses/) for matching [License Compliance policies](../user/compliance/license_compliance/index.md). |
| [Repository storage](../administration/raketasks/storage.md) | List and migrate existing projects and attachments from legacy storage to hashed storage. | | [Repository storage](../administration/raketasks/storage.md) | List and migrate existing projects and attachments from legacy storage to hashed storage. |
| [Uploads migrate](../administration/raketasks/uploads/migrate.md) | Migrate uploads between local storage and object storage. | | [Uploads migrate](../administration/raketasks/uploads/migrate.md) | Migrate uploads between local storage and object storage. |
| [Uploads sanitize](../administration/raketasks/uploads/sanitize.md) | Remove EXIF data from images uploaded to earlier versions of GitLab. | | [Uploads sanitize](../administration/raketasks/uploads/sanitize.md) | Remove EXIF data from images uploaded to earlier versions of GitLab. |
......
---
stage: none
group: unassigned
info: To determine the technical writer assigned to the Stage/Group associated with this page, see https://about.gitlab.com/handbook/engineering/ux/technical-writing/#assignments
---
# Sidekiq job migration **(FREE SELF)**
WARNING:
This operation should be very uncommon. We do not recommend it for the vast majority of GitLab instances.
Sidekiq routing rules allow administrators to re-route certain background jobs from their regular queue to an alternative queue. By default, GitLab uses one queue per background job type. GitLab has over 400 background job types, and so correspondingly it has over 400 queues.
Most administrators will not need to change this setting. In some cases with particularly large background job processing workloads, Redis performance may suffer due to the number of queues that GitLab listens to.
If the Sidekiq routing rules are changed, administrators need to take care with the migration to avoid losing jobs entirely. The basic migration steps are:
1. Listen to both the old and new queues.
1. Update the routing rules.
1. Wait until there are no publishers dispatching jobs to the old queues.
1. Run the [Rake tasks for future jobs](#future-jobs).
1. Wait for the old queues to be empty.
1. Stop listening to the old queues.
## Future jobs
Step 4 involves rewriting some Sidekiq job data for jobs that are already stored in Redis, but due to run in future. There are two sets of jobs to run in future: scheduled jobs and jobs to be retried. We provide a separate Rake task to migrate each set:
- `gitlab:sidekiq:migrate_jobs:retry` for jobs to be retried.
- `gitlab:sidekiq:migrate_jobs:scheduled` for scheduled jobs.
Most of the time, running both at the same time is the correct choice. There are two separate tasks to allow for more fine-grained control where needed. To run both at once:
```shell
# omnibus-gitlab
sudo gitlab-rake gitlab:sidekiq:migrate_jobs:retry gitlab:sidekiq:migrate_jobs:schedule
# source installations
bundle exec rake gitlab:sidekiq:migrate_jobs:retry gitlab:sidekiq:migrate_jobs:schedule RAILS_ENV=production
```
# frozen_string_literal: true
module Gitlab
class SidekiqMigrateJobs
LOG_FREQUENCY = 1_000
attr_reader :sidekiq_set, :logger
def initialize(sidekiq_set, logger: nil)
@sidekiq_set = sidekiq_set
@logger = logger
end
# mappings is a hash of WorkerClassName => target_queue_name
def execute(mappings)
source_queues_regex = Regexp.union(mappings.keys)
cursor = 0
scanned = 0
migrated = 0
estimated_size = Sidekiq.redis { |c| c.zcard(sidekiq_set) }
logger&.info("Processing #{sidekiq_set} set. Estimated size: #{estimated_size}.")
begin
cursor, jobs = Sidekiq.redis { |c| c.zscan(sidekiq_set, cursor) }
jobs.each do |(job, score)|
if scanned > 0 && scanned % LOG_FREQUENCY == 0
logger&.info("In progress. Scanned records: #{scanned}. Migrated records: #{migrated}.")
end
scanned += 1
next unless job.match?(source_queues_regex)
job_hash = Sidekiq.load_json(job)
destination_queue = mappings[job_hash['class']]
next unless mappings.has_key?(job_hash['class'])
next if job_hash['queue'] == destination_queue
job_hash['queue'] = destination_queue
migrated += migrate_job(job, score, job_hash)
end
end while cursor.to_i != 0
logger&.info("Done. Scanned records: #{scanned}. Migrated records: #{migrated}.")
{
scanned: scanned,
migrated: migrated
}
end
private
def migrate_job(job, score, job_hash)
Sidekiq.redis do |connection|
removed = connection.zrem(sidekiq_set, job)
if removed
connection.zadd(sidekiq_set, score, Sidekiq.dump_json(job_hash))
1
else
0
end
end
end
end
end
...@@ -8,6 +8,29 @@ namespace :gitlab do ...@@ -8,6 +8,29 @@ namespace :gitlab do
File.write(path, banner + YAML.dump(object).gsub(/ *$/m, '')) File.write(path, banner + YAML.dump(object).gsub(/ *$/m, ''))
end end
namespace :migrate_jobs do
def mappings
::Gitlab::SidekiqConfig
.workers
.reject { |worker| worker.klass.is_a?(Gitlab::SidekiqConfig::DummyWorker) }
.to_h { |worker| [worker.klass.to_s, ::Gitlab::SidekiqConfig::WorkerRouter.global.route(worker.klass)] }
end
desc 'GitLab | Sidekiq | Migrate jobs in the scheduled set to new queue names'
task schedule: :environment do
::Gitlab::SidekiqMigrateJobs
.new('schedule', logger: Logger.new($stdout))
.execute(mappings)
end
desc 'GitLab | Sidekiq | Migrate jobs in the retry set to new queue names'
task retry: :environment do
::Gitlab::SidekiqMigrateJobs
.new('retry', logger: Logger.new($stdout))
.execute(mappings)
end
end
namespace :all_queues_yml do namespace :all_queues_yml do
desc 'GitLab | Sidekiq | Generate all_queues.yml based on worker definitions' desc 'GitLab | Sidekiq | Generate all_queues.yml based on worker definitions'
task generate: :environment do task generate: :environment do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMigrateJobs, :clean_gitlab_redis_queues do
def clear_queues
Sidekiq::Queue.new('authorized_projects').clear
Sidekiq::Queue.new('post_receive').clear
Sidekiq::RetrySet.new.clear
Sidekiq::ScheduledSet.new.clear
end
around do |example|
clear_queues
Sidekiq::Testing.disable!(&example)
clear_queues
end
describe '#execute', :aggregate_failures do
shared_examples 'processing a set' do
let(:migrator) { described_class.new(set_name) }
let(:set_after) do
Sidekiq.redis { |c| c.zrange(set_name, 0, -1, with_scores: true) }
.map { |item, score| [Sidekiq.load_json(item), score] }
end
context 'when the set is empty' do
it 'returns the number of scanned and migrated jobs' do
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 0, migrated: 0)
end
end
context 'when the set is not empty' do
it 'returns the number of scanned and migrated jobs' do
create_jobs
expect(migrator.execute({})).to eq(scanned: 4, migrated: 0)
end
end
context 'when there are no matching jobs' do
it 'does not change any queue names' do
create_jobs(include_post_receive: false)
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 3, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects',
'class' => 'AuthorizedProjectsWorker'))
end
end
context 'when there are matching jobs' do
it 'migrates only the workers matching the given worker from the set' do
freeze_time do
create_jobs
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue')).to eq(scanned: 4, migrated: 3)
set_after.each.with_index do |(item, score), i|
if item['class'] == 'AuthorizedProjectsWorker'
expect(item).to include('queue' => 'new_queue', 'args' => [i])
else
expect(item).to include('queue' => 'post_receive', 'args' => [i])
end
expect(score).to eq(i.succ.hours.from_now.to_i)
end
end
end
it 'allows migrating multiple workers at once' do
freeze_time do
create_jobs
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'another_queue'))
.to eq(scanned: 4, migrated: 4)
set_after.each.with_index do |(item, score), i|
if item['class'] == 'AuthorizedProjectsWorker'
expect(item).to include('queue' => 'new_queue', 'args' => [i])
else
expect(item).to include('queue' => 'another_queue', 'args' => [i])
end
expect(score).to eq(i.succ.hours.from_now.to_i)
end
end
end
it 'allows migrating multiple workers to the same queue' do
freeze_time do
create_jobs
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
.to eq(scanned: 4, migrated: 4)
set_after.each.with_index do |(item, score), i|
expect(item).to include('queue' => 'new_queue', 'args' => [i])
expect(score).to eq(i.succ.hours.from_now.to_i)
end
end
end
it 'does not try to migrate jobs that are removed from the set during the migration' do
freeze_time do
create_jobs
allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
Sidekiq.redis { |c| c.zrem(set_name, args.first) }
meth.call(*args)
end
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 0)
expect(set_after.length).to eq(3)
expect(set_after.map(&:first)).to all(include('queue' => 'authorized_projects'))
end
end
it 'does not try to migrate unmatched jobs that are added to the set during the migration' do
create_jobs
calls = 0
allow(migrator).to receive(:migrate_job).and_wrap_original do |meth, *args|
if calls == 0
travel_to(5.hours.from_now) { create_jobs(include_post_receive: false) }
end
calls += 1
meth.call(*args)
end
expect(migrator.execute('PostReceive' => 'new_queue')).to eq(scanned: 4, migrated: 1)
expect(set_after.group_by { |job| job.first['queue'] }.transform_values(&:count))
.to eq('authorized_projects' => 6, 'new_queue' => 1)
end
it 'iterates through the entire set of jobs' do
50.times do |i|
travel_to(i.hours.from_now) { create_jobs }
end
expect(migrator.execute('NonExistentWorker' => 'new_queue')).to eq(scanned: 200, migrated: 0)
expect(set_after.length).to eq(200)
end
it 'logs output at the start, finish, and every LOG_FREQUENCY jobs' do
freeze_time do
create_jobs
stub_const("#{described_class}::LOG_FREQUENCY", 2)
logger = Logger.new(StringIO.new)
migrator = described_class.new(set_name, logger: logger)
expect(logger).to receive(:info).with(a_string_matching('Processing')).once.ordered
expect(logger).to receive(:info).with(a_string_matching('In progress')).once.ordered
expect(logger).to receive(:info).with(a_string_matching('Done')).once.ordered
expect(migrator.execute('AuthorizedProjectsWorker' => 'new_queue', 'PostReceive' => 'new_queue'))
.to eq(scanned: 4, migrated: 4)
end
end
end
end
context 'scheduled jobs' do
let(:set_name) { 'schedule' }
def create_jobs(include_post_receive: true)
AuthorizedProjectsWorker.perform_in(1.hour, 0)
AuthorizedProjectsWorker.perform_in(2.hours, 1)
PostReceive.perform_in(3.hours, 2) if include_post_receive
AuthorizedProjectsWorker.perform_in(4.hours, 3)
end
it_behaves_like 'processing a set'
end
context 'retried jobs' do
let(:set_name) { 'retry' }
# Try to mimic as closely as possible what Sidekiq will actually
# do to retry a job.
def retry_in(klass, time, args)
# In Sidekiq 6, this argument will become a JSON string
message = { 'class' => klass, 'args' => [args], 'retry' => true }
allow(klass).to receive(:sidekiq_retry_in_block).and_return(proc { time })
begin
Sidekiq::JobRetry.new.local(klass, message, klass.queue) { raise 'boom' }
rescue Sidekiq::JobRetry::Skip
# Sidekiq scheduled the retry
end
end
def create_jobs(include_post_receive: true)
retry_in(AuthorizedProjectsWorker, 1.hour, 0)
retry_in(AuthorizedProjectsWorker, 2.hours, 1)
retry_in(PostReceive, 3.hours, 2) if include_post_receive
retry_in(AuthorizedProjectsWorker, 4.hours, 3)
end
it_behaves_like 'processing a set'
end
end
end
# frozen_string_literal: true
require 'rake_helper'
RSpec.describe 'sidekiq.rake', :aggregate_failures do
before do
Rake.application.rake_require 'tasks/gitlab/sidekiq'
stub_warn_user_is_not_gitlab
end
shared_examples 'migration rake task' do
it 'runs the migrator with a mapping of workers to queues' do
test_routes = [
['urgency=high', 'default'],
['*', nil]
]
test_router = ::Gitlab::SidekiqConfig::WorkerRouter.new(test_routes)
migrator = ::Gitlab::SidekiqMigrateJobs.new(sidekiq_set, logger: Logger.new($stdout))
allow(::Gitlab::SidekiqConfig::WorkerRouter)
.to receive(:global).and_return(test_router)
expect(::Gitlab::SidekiqMigrateJobs)
.to receive(:new).with(sidekiq_set, logger: an_instance_of(Logger)).and_return(migrator)
expect(migrator)
.to receive(:execute)
.with(a_hash_including('PostReceive' => 'default',
'MergeWorker' => 'default',
'DeleteDiffFilesWorker' => 'delete_diff_files'))
.and_call_original
run_rake_task("gitlab:sidekiq:migrate_jobs:#{sidekiq_set}")
expect($stdout.string).to include("Processing #{sidekiq_set}")
expect($stdout.string).to include('Done')
end
end
describe 'gitlab:sidekiq:migrate_jobs:schedule rake task' do
let(:sidekiq_set) { 'schedule' }
it_behaves_like 'migration rake task'
end
describe 'gitlab:sidekiq:migrate_jobs:retry rake task' do
let(:sidekiq_set) { 'retry' }
it_behaves_like 'migration rake task'
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment