Commit 6c8c30f0 authored by Bob Van Landuyt

Merge branch 'mmj-bulk-delays-for-bulk-perform-in-sidekiq' into 'master'

Limit Sidekiq `push_bulk` to a maximum of 1000 jobs in one go

See merge request gitlab-org/gitlab!72263
parents 1ac8a9d1 49141e3e
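
For context before the diff: this MR caps how many job payloads a single `Sidekiq::Client.push_bulk` call receives by slicing the argument list into chunks of at most `SAFE_PUSH_BULK_LIMIT` (1000). A minimal standalone sketch of that idea, not part of the MR diff (the worker class and job counts here are invented for illustration, and actually running it needs a reachable Redis):

    require 'sidekiq'

    SAFE_PUSH_BULK_LIMIT = 1000

    # Hypothetical worker used only for this illustration.
    class ExampleWorker
      include Sidekiq::Worker

      def perform(id); end
    end

    args_list = (1..2500).map { |id| [id] }

    # Push at most SAFE_PUSH_BULK_LIMIT argument arrays per push_bulk call, so no
    # single Redis round-trip enqueues an unbounded number of jobs.
    job_ids = args_list.each_slice(SAFE_PUSH_BULK_LIMIT).flat_map do |args_batch|
      Sidekiq::Client.push_bulk('class' => ExampleWorker, 'args' => args_batch)
    end

    puts job_ids.count # => 2500

`push_bulk` returns the job IDs it enqueued, which is why the helper added in the diff uses `flat_map`: callers still get back one flat array of IDs across all batches.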
@@ -14,6 +14,7 @@ module ApplicationWorker
   LOGGING_EXTRA_KEY = 'extra'
   DEFAULT_DELAY_INTERVAL = 1
+  SAFE_PUSH_BULK_LIMIT = 1000
 
   included do
     set_queue
@@ -135,24 +136,47 @@ module ApplicationWorker
     end
 
     def bulk_perform_async(args_list)
-      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+      if Feature.enabled?(:sidekiq_push_bulk_in_batches)
+        in_safe_limit_batches(args_list) do |args_batch, _|
+          Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch)
+        end
+      else
+        Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+      end
     end
 
     def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
       now = Time.now.to_i
-      schedule = now + delay.to_i
+      base_schedule_at = now + delay.to_i
 
-      if schedule <= now
-        raise ArgumentError, _('The schedule time must be in the future!')
+      if base_schedule_at <= now
+        raise ArgumentError, 'The schedule time must be in the future!'
       end
 
+      schedule_at = base_schedule_at
+
       if batch_size && batch_delay
-        args_list.each_slice(batch_size.to_i).with_index do |args_batch, idx|
-          batch_schedule = schedule + idx * batch_delay.to_i
-          Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
+        batch_size = batch_size.to_i
+        batch_delay = batch_delay.to_i
+
+        raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0
+        raise ArgumentError, 'batch_delay should be greater than 0' unless batch_delay > 0
+
+        # build an array of schedules corresponding to each item in `args_list`
+        bulk_schedule_at = Array.new(args_list.size) do |index|
+          batch_number = index / batch_size
+          base_schedule_at + (batch_number * batch_delay)
+        end
+
+        schedule_at = bulk_schedule_at
+      end
+
+      if Feature.enabled?(:sidekiq_push_bulk_in_batches)
+        in_safe_limit_batches(args_list, schedule_at) do |args_batch, schedule_at_for_batch|
+          Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => schedule_at_for_batch)
         end
       else
-        Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
+        Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule_at)
       end
     end
@@ -161,5 +185,34 @@ module ApplicationWorker
     def delay_interval
       DEFAULT_DELAY_INTERVAL.seconds
     end
+
+    private
+
+    def in_safe_limit_batches(args_list, schedule_at = nil, safe_limit = SAFE_PUSH_BULK_LIMIT)
+      # `schedule_at` could be one of
+      # - nil.
+      # - a single Numeric that represents time, like `30.minutes.from_now.to_i`.
+      # - an array, where each element is a Numeric that represents time.
+      #    - Each element in this array would correspond to the time at which
+      #      the job in `args_list` at the corresponding index needs to be scheduled.
+
+      # In the case where `schedule_at` is an array of Numeric, it needs to be sliced
+      # in the same manner as the `args_list`, with each slice containing `safe_limit`
+      # number of elements.
+      schedule_at = schedule_at.each_slice(safe_limit).to_a if schedule_at.is_a?(Array)
+
+      args_list.each_slice(safe_limit).with_index.flat_map do |args_batch, index|
+        schedule_at_for_batch = process_schedule_at_for_batch(schedule_at, index)
+
+        yield(args_batch, schedule_at_for_batch)
+      end
+    end
+
+    def process_schedule_at_for_batch(schedule_at, index)
+      return unless schedule_at
+      return schedule_at[index] if schedule_at.is_a?(Array) && schedule_at.all?(Array)
+
+      schedule_at
+    end
   end
 end
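
A small worked example of the per-job schedule computation introduced above in `bulk_perform_in`: each job's `at` timestamp is derived from its batch number up front, so the later `SAFE_PUSH_BULK_LIMIT` slicing can cut `args_list` and the schedule array in lockstep. The concrete delay, batch size, and batch delay values below are illustrative, not taken from the MR:

    # Plain-Ruby sketch of the bulk_schedule_at calculation.
    base_schedule_at = Time.now.to_i + 60 # delay of 1 minute
    batch_size  = 2
    batch_delay = 600                     # 10 minutes between batches

    args_list = [['Foo', [1]], ['Foo', [2]], ['Foo', [3]], ['Foo', [4]], ['Foo', [5]]]

    # Job at position `index` belongs to batch (index / batch_size); each later
    # batch is shifted by one extra batch_delay.
    bulk_schedule_at = Array.new(args_list.size) do |index|
      batch_number = index / batch_size
      base_schedule_at + (batch_number * batch_delay)
    end

    p bulk_schedule_at.map { |at| at - base_schedule_at }
    # => [0, 0, 600, 600, 1200]

Because `schedule_at` becomes an array with one entry per job, `in_safe_limit_batches` only has to slice it the same way as `args_list` and hand each slice to `push_bulk` as the `'at'` value.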
---
name: sidekiq_push_bulk_in_batches
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72263
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/343740
milestone: '14.5'
type: development
group: group::access
default_enabled: false
@@ -34237,9 +34237,6 @@ msgstr ""
 msgid "The same shared runner executes code from multiple projects, unless you configure autoscaling with %{link} set to 1 (which it is on GitLab.com)."
 msgstr ""
 
-msgid "The schedule time must be in the future!"
-msgstr ""
-
 msgid "The snippet can be accessed without any authentication."
 msgstr ""
@@ -285,48 +285,38 @@ RSpec.describe ApplicationWorker do
     end
   end
 
-  describe '.bulk_perform_async' do
-    before do
-      stub_const(worker.name, worker)
-    end
-
-    it 'enqueues jobs in bulk' do
-      Sidekiq::Testing.fake! do
-        worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
-
-        expect(worker.jobs.count).to eq 2
-        expect(worker.jobs).to all(include('enqueued_at'))
-      end
-    end
-  end
-
-  describe '.bulk_perform_in' do
-    before do
-      stub_const(worker.name, worker)
-    end
-
-    context 'when delay is valid' do
-      it 'correctly schedules jobs' do
-        Sidekiq::Testing.fake! do
-          worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
-
-          expect(worker.jobs.count).to eq 2
-          expect(worker.jobs).to all(include('at'))
-        end
-      end
-    end
-
-    context 'when delay is invalid' do
-      it 'raises an ArgumentError exception' do
-        expect { worker.bulk_perform_in(-60, [['Foo']]) }
-          .to raise_error(ArgumentError)
-      end
-    end
-
-    context 'with batches' do
-      let(:batch_delay) { 1.minute }
-
-      it 'correctly schedules jobs' do
+  context 'different kinds of push_bulk' do
+    shared_context 'disable the `sidekiq_push_bulk_in_batches` feature flag' do
+      before do
+        stub_feature_flags(sidekiq_push_bulk_in_batches: false)
+      end
+    end
+
+    shared_context 'set safe limit beyond the number of jobs to be enqueued' do
+      before do
+        stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", args.count + 1)
+      end
+    end
+
+    shared_context 'set safe limit below the number of jobs to be enqueued' do
+      before do
+        stub_const("#{described_class}::SAFE_PUSH_BULK_LIMIT", 2)
+      end
+    end
+
+    shared_examples_for 'returns job_id of all enqueued jobs' do
+      let(:job_id_regex) { /[0-9a-f]{12}/ }
+
+      it 'returns job_id of all enqueued jobs' do
+        job_ids = perform_action
+
+        expect(job_ids.count).to eq(args.count)
+        expect(job_ids).to all(match(job_id_regex))
+      end
+    end
+
+    shared_examples_for 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do
+      it 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit' do
         expect(Sidekiq::Client).to(
           receive(:push_bulk).with(hash_including('args' => [['Foo', [1]], ['Foo', [2]]]))
             .ordered
@@ -337,28 +327,257 @@ RSpec.describe ApplicationWorker do
             .and_call_original)
         expect(Sidekiq::Client).to(
           receive(:push_bulk).with(hash_including('args' => [['Foo', [5]]]))
            .ordered
            .and_call_original)
 
-        worker.bulk_perform_in(
-          1.minute,
-          [['Foo', [1]], ['Foo', [2]], ['Foo', [3]], ['Foo', [4]], ['Foo', [5]]],
-          batch_size: 2, batch_delay: batch_delay)
-
-        expect(worker.jobs.count).to eq 5
-        expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
-        expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
-        expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
-        expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
-      end
-
-      context 'when batch_size is invalid' do
-        it 'raises an ArgumentError exception' do
-          expect do
-            worker.bulk_perform_in(1.minute,
-                                   [['Foo']],
-                                   batch_size: -1, batch_delay: batch_delay)
-          end.to raise_error(ArgumentError)
+        perform_action
+
+        expect(worker.jobs.count).to eq args.count
+        expect(worker.jobs).to all(include('enqueued_at'))
+      end
+    end
+
+    shared_examples_for 'enqueues jobs in one go' do
+      it 'enqueues jobs in one go' do
+        expect(Sidekiq::Client).to(
+          receive(:push_bulk).with(hash_including('args' => args)).once.and_call_original)
+
+        perform_action
+
+        expect(worker.jobs.count).to eq args.count
+        expect(worker.jobs).to all(include('enqueued_at'))
+      end
+    end
+
+    before do
+      stub_const(worker.name, worker)
+    end
+
+    let(:args) do
+      [
+        ['Foo', [1]],
+        ['Foo', [2]],
+        ['Foo', [3]],
+        ['Foo', [4]],
+        ['Foo', [5]]
+      ]
+    end
+
+    describe '.bulk_perform_async' do
+      shared_examples_for 'does not schedule the jobs for any specific time' do
+        it 'does not schedule the jobs for any specific time' do
+          perform_action
+
+          expect(worker.jobs).to all(exclude('at'))
+        end
+      end
+
+      subject(:perform_action) do
+        worker.bulk_perform_async(args)
+      end
+
+      context 'push_bulk in safe limit batches' do
+        context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+          include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+          it_behaves_like 'enqueues jobs in one go'
+          it_behaves_like 'returns job_id of all enqueued jobs'
+          it_behaves_like 'does not schedule the jobs for any specific time'
+        end
+
+        context 'when the number of jobs to be enqueued exceeds safe limit' do
+          include_context 'set safe limit below the number of jobs to be enqueued'
+
+          it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
+          it_behaves_like 'returns job_id of all enqueued jobs'
+          it_behaves_like 'does not schedule the jobs for any specific time'
+        end
+
+        context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
+          include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
+
+          context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+            include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'does not schedule the jobs for any specific time'
+          end
+
+          context 'when the number of jobs to be enqueued exceeds safe limit' do
+            include_context 'set safe limit below the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'does not schedule the jobs for any specific time'
+          end
+        end
+      end
+    end
+
+    describe '.bulk_perform_in' do
+      context 'without batches' do
+        shared_examples_for 'schedules all the jobs at a specific time' do
+          it 'schedules all the jobs at a specific time' do
+            perform_action
+
+            worker.jobs.each do |job_detail|
+              expect(job_detail['at']).to be_within(3.seconds).of(expected_scheduled_at_time)
+            end
+          end
+        end
+
+        let(:delay) { 3.minutes }
+        let(:expected_scheduled_at_time) { Time.current.to_i + delay.to_i }
+
+        subject(:perform_action) do
+          worker.bulk_perform_in(delay, args)
+        end
+
+        context 'when the scheduled time falls in the past' do
+          let(:delay) { -60 }
+
+          it 'raises an ArgumentError exception' do
+            expect { perform_action }
+              .to raise_error(ArgumentError)
+          end
+        end
+
+        context 'push_bulk in safe limit batches' do
+          context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+            include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time'
+          end
+
+          context 'when the number of jobs to be enqueued exceeds safe limit' do
+            include_context 'set safe limit below the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time'
+          end
+
+          context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
+            include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
+
+            context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+              include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time'
+            end
+
+            context 'when the number of jobs to be enqueued exceeds safe limit' do
+              include_context 'set safe limit below the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time'
+            end
+          end
+        end
+      end
+
+      context 'with batches' do
+        shared_examples_for 'schedules all the jobs at a specific time, per batch' do
+          it 'schedules all the jobs at a specific time, per batch' do
+            perform_action
+
+            expect(worker.jobs[0]['at']).to eq(worker.jobs[1]['at'])
+            expect(worker.jobs[2]['at']).to eq(worker.jobs[3]['at'])
+            expect(worker.jobs[2]['at'] - worker.jobs[1]['at']).to eq(batch_delay)
+            expect(worker.jobs[4]['at'] - worker.jobs[3]['at']).to eq(batch_delay)
+          end
+        end
+
+        let(:delay) { 1.minute }
+        let(:batch_size) { 2 }
+        let(:batch_delay) { 10.minutes }
+
+        subject(:perform_action) do
+          worker.bulk_perform_in(delay, args, batch_size: batch_size, batch_delay: batch_delay)
+        end
+
+        context 'when the `batch_size` is invalid' do
+          context 'when `batch_size` is 0' do
+            let(:batch_size) { 0 }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+
+          context 'when `batch_size` is negative' do
+            let(:batch_size) { -3 }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+        end
+
+        context 'when the `batch_delay` is invalid' do
+          context 'when `batch_delay` is 0' do
+            let(:batch_delay) { 0.minutes }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+
+          context 'when `batch_delay` is negative' do
+            let(:batch_delay) { -3.minutes }
+
+            it 'raises an ArgumentError exception' do
+              expect { perform_action }
+                .to raise_error(ArgumentError)
+            end
+          end
+        end
+
+        context 'push_bulk in safe limit batches' do
+          context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+            include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues jobs in one go'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time, per batch'
+          end
+
+          context 'when the number of jobs to be enqueued exceeds safe limit' do
+            include_context 'set safe limit below the number of jobs to be enqueued'
+
+            it_behaves_like 'enqueues the jobs in a batched fashion, with each batch enqueing jobs as per the set safe limit'
+            it_behaves_like 'returns job_id of all enqueued jobs'
+            it_behaves_like 'schedules all the jobs at a specific time, per batch'
+          end
+
+          context 'when the feature flag `sidekiq_push_bulk_in_batches` is disabled' do
+            include_context 'disable the `sidekiq_push_bulk_in_batches` feature flag'
+
+            context 'when the number of jobs to be enqueued does not exceed the safe limit' do
+              include_context 'set safe limit beyond the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time, per batch'
+            end
+
+            context 'when the number of jobs to be enqueued exceeds safe limit' do
+              include_context 'set safe limit below the number of jobs to be enqueued'
+
+              it_behaves_like 'enqueues jobs in one go'
+              it_behaves_like 'returns job_id of all enqueued jobs'
+              it_behaves_like 'schedules all the jobs at a specific time, per batch'
+            end
+          end
+        end
+      end
         end
       end
     end