Commit 9cff604d authored by Heinrich Lee Yu's avatar Heinrich Lee Yu

Sleep instead of scheduling in the future

When handling Sidekiq workers that use replicas, we add a sleep in the
worker when the minimum delay is not yet reached. This replaces the
strategy we do now where we schedule jobs 1 second in the future.

We are testing this because Sidekiq scheduled set processing can be
expensive and the latencies are high.
parent 1bdfeab2
......@@ -93,9 +93,11 @@ module ApplicationWorker
end
def perform_async(*args)
return super if Gitlab::Database::LoadBalancing.primary_only?
# Worker execution for workers with data_consistency set to :delayed or :sticky
# will be delayed to give replication enough time to complete
if utilizes_load_balancing_capabilities?
if utilizes_load_balancing_capabilities? && Feature.disabled?(:skip_scheduling_workers_for_replicas, default_enabled: :yaml)
perform_in(delay_interval, *args)
else
super
......
---
name: skip_scheduling_workers_for_replicas
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/74532
rollout_issue_url: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1380
milestone: '14.5'
type: development
group: group::project management
default_enabled: false
......@@ -30,6 +30,10 @@ module Gitlab
end
end
def self.primary_only?
each_load_balancer.all?(&:primary_only?)
end
def self.release_hosts
each_load_balancer(&:release_host)
end
......
......@@ -6,6 +6,8 @@ module Gitlab
class SidekiqServerMiddleware
JobReplicaNotUpToDate = Class.new(StandardError)
MINIMUM_DELAY_INTERVAL = 1
def call(worker, job, _queue)
worker_class = worker.class
strategy = select_load_balancing_strategy(worker_class, job)
......@@ -42,7 +44,9 @@ module Gitlab
wal_locations = get_wal_locations(job)
return :primary_no_wal unless wal_locations
return :primary_no_wal if wal_locations.blank?
sleep_if_needed(job)
if databases_in_sync?(wal_locations)
# Happy case: we can read from a replica.
......@@ -56,6 +60,11 @@ module Gitlab
end
end
def sleep_if_needed(job)
time_diff = Time.current.to_f - job['created_at'].to_f
sleep time_diff if time_diff > 0 && time_diff < MINIMUM_DELAY_INTERVAL
end
def get_wal_locations(job)
job['dedup_wal_locations'] || job['wal_locations'] || legacy_wal_location(job)
end
......
......@@ -77,9 +77,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, :clean_
include_examples 'load balancing strategy', expected_strategy
end
shared_examples_for 'sticks based on data consistency' do |data_consistency|
include_context 'data consistency worker class', data_consistency, :load_balancing_for_test_data_consistency_worker
shared_examples_for 'sticks based on data consistency' do
context 'when load_balancing_for_test_data_consistency_worker is disabled' do
before do
stub_feature_flags(load_balancing_for_test_data_consistency_worker: false)
......@@ -136,6 +134,52 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, :clean_
end
end
shared_examples_for 'sleeps when necessary' do
context 'when WAL locations are blank', :freeze_time do
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", "wal_locations" => {}, "created_at" => Time.current.to_f - (described_class::MINIMUM_DELAY_INTERVAL - 0.3) } }
it 'does not sleep' do
expect(middleware).not_to receive(:sleep)
run_middleware
end
end
context 'when WAL locations are present', :freeze_time do
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", "database_replica_location" => "0/D525E3A8", "created_at" => Time.current.to_f - elapsed_time } }
context 'when delay interval has not elapsed' do
let(:elapsed_time) { described_class::MINIMUM_DELAY_INTERVAL - 0.3 }
it 'sleeps until the minimum delay is reached' do
expect(middleware).to receive(:sleep).with(be_within(0.01).of(elapsed_time))
run_middleware
end
end
context 'when delay interval has elapsed' do
let(:elapsed_time) { described_class::MINIMUM_DELAY_INTERVAL + 0.3 }
it 'does not sleep' do
expect(middleware).not_to receive(:sleep)
run_middleware
end
end
context 'when created_at is in the future' do
let(:elapsed_time) { -5 }
it 'does not sleep' do
expect(middleware).not_to receive(:sleep)
run_middleware
end
end
end
end
context 'when worker class does not include ApplicationWorker' do
let(:worker) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new }
......@@ -146,10 +190,24 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, :clean_
include_context 'data consistency worker class', :always, :load_balancing_for_test_data_consistency_worker
include_examples 'stick to the primary', 'primary'
context 'when delay interval has not elapsed', :freeze_time do
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", 'database_replica_location' => '0/D525E3A8', "created_at" => Time.current.to_f - elapsed_time } }
let(:elapsed_time) { described_class::MINIMUM_DELAY_INTERVAL - 0.3 }
it 'does not sleep' do
expect(middleware).not_to receive(:sleep)
run_middleware
end
end
end
context 'when worker data consistency is :delayed' do
include_examples 'sticks based on data consistency', :delayed
include_context 'data consistency worker class', :delayed, :load_balancing_for_test_data_consistency_worker
include_examples 'sticks based on data consistency'
include_examples 'sleeps when necessary'
context 'when replica is not up to date' do
before do
......@@ -195,7 +253,10 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, :clean_
end
context 'when worker data consistency is :sticky' do
include_examples 'sticks based on data consistency', :sticky
include_context 'data consistency worker class', :sticky, :load_balancing_for_test_data_consistency_worker
include_examples 'sticks based on data consistency'
include_examples 'sleeps when necessary'
context 'when replica is not up to date' do
before do
......@@ -255,7 +316,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, :clean_
end
def run_middleware
middleware.call(worker, job, double(:queue)) { yield }
middleware.call(worker, job, double(:queue)) { yield if block_given? }
rescue described_class::JobReplicaNotUpToDate
# we silence errors here that cause the job to retry
end
......
......@@ -38,6 +38,24 @@ RSpec.describe Gitlab::Database::LoadBalancing do
end
end
describe '.primary_only?' do
it 'returns true if all load balancers have no replicas' do
described_class.each_load_balancer do |lb|
allow(lb).to receive(:primary_only?).and_return(true)
end
expect(described_class.primary_only?).to eq(true)
end
it 'returns false if at least one has replicas' do
described_class.each_load_balancer.with_index do |lb, index|
allow(lb).to receive(:primary_only?).and_return(index != 0)
end
expect(described_class.primary_only?).to eq(false)
end
end
describe '.release_hosts' do
it 'releases the host of every load balancer' do
described_class.each_load_balancer do |lb|
......
......@@ -239,6 +239,8 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
shared_context 'worker declaring data consistency' do
let(:worker_class) { LBTestWorker }
let(:wal_locations) { { Gitlab::Database::MAIN_DATABASE_NAME.to_sym => 'AB/12345' } }
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", "wal_locations" => wal_locations } }
before do
stub_const('LBTestWorker', Class.new(TestWorker))
......
......@@ -23,14 +23,6 @@ RSpec.describe BuildHooksWorker do
end
end
describe '.perform_async' do
it 'delays scheduling a job by calling perform_in with default delay' do
expect(described_class).to receive(:perform_in).with(ApplicationWorker::DEFAULT_DELAY_INTERVAL.second, 123)
described_class.perform_async(123)
end
end
it_behaves_like 'worker with data consistency',
described_class,
data_consistency: :delayed
......
......@@ -248,39 +248,40 @@ RSpec.describe ApplicationWorker do
end
describe '.perform_async' do
before do
stub_const(worker.name, worker)
end
shared_examples_for 'worker utilizes load balancing capabilities' do |data_consistency|
before do
worker.data_consistency(data_consistency)
end
it 'call perform_in' do
expect(worker).to receive(:perform_in).with(described_class::DEFAULT_DELAY_INTERVAL.seconds, 123)
using RSpec::Parameterized::TableSyntax
worker.perform_async(123)
end
where(:primary_only?, :skip_scheduling_ff, :data_consistency, :schedules_job?) do
true | false | :sticky | false
true | false | :delayed | false
true | false | :always | false
true | true | :sticky | false
true | true | :delayed | false
true | true | :always | false
false | false | :sticky | true
false | false | :delayed | true
false | false | :always | false
false | true | :sticky | false
false | true | :delayed | false
false | true | :always | false
end
context 'when workers data consistency is :sticky' do
it_behaves_like 'worker utilizes load balancing capabilities', :sticky
end
before do
stub_const(worker.name, worker)
worker.data_consistency(data_consistency)
context 'when workers data consistency is :delayed' do
it_behaves_like 'worker utilizes load balancing capabilities', :delayed
allow(Gitlab::Database::LoadBalancing).to receive(:primary_only?).and_return(primary_only?)
stub_feature_flags(skip_scheduling_workers_for_replicas: skip_scheduling_ff)
end
context 'when workers data consistency is :always' do
before do
worker.data_consistency(:always)
end
it 'does not call perform_in' do
expect(worker).not_to receive(:perform_in)
with_them do
it 'schedules or enqueues the job correctly' do
if schedules_job?
expect(worker).to receive(:perform_in).with(described_class::DEFAULT_DELAY_INTERVAL.seconds, 123)
else
expect(worker).not_to receive(:perform_in)
end
worker.perform_async
worker.perform_async(123)
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment