Commit 1800377d authored by Nikola Milojevic's avatar Nikola Milojevic Committed by Bob Van Landuyt

Enable load_balancing for sidekiq

- Introduce server and client sidekiq middleware
- Introduce data consistencies for workers
parent c557d48c
......@@ -11,6 +11,8 @@ module WorkerAttributes
# Urgencies that workers can declare through the `urgencies` attribute
VALID_URGENCIES = [:high, :low, :throttled].freeze
VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze
NAMESPACE_WEIGHTS = {
auto_devops: 2,
auto_merge: 3,
......@@ -69,6 +71,35 @@ module WorkerAttributes
class_attributes[:urgency] || :low
end
def data_consistency(data_consistency, feature_flag: nil)
raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag
class_attributes[:data_consistency] = data_consistency
validate_worker_attributes!
end
def validate_worker_attributes!
# Since the deduplication should always take into account the latest binary replication pointer into account,
# not the first one, the deduplication will not work with sticky or delayed.
# Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291
if idempotent? && get_data_consistency != :always
raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always"
end
end
def get_data_consistency
class_attributes[:data_consistency] || :always
end
def get_data_consistency_feature_flag_enabled?
return true unless class_attributes[:data_consistency_feature_flag]
Feature.enabled?(class_attributes[:data_consistency_feature_flag], default_enabled: :yaml)
end
# Set this attribute on a job when it will call to services outside of the
# application, such as 3rd party applications, other k8s clusters etc See
# doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for
......@@ -96,6 +127,8 @@ module WorkerAttributes
def idempotent!
class_attributes[:idempotent] = true
validate_worker_attributes!
end
def idempotent?
......
# frozen_string_literal: true
module EE
module Gitlab
# The SidekiqMiddleware class is responsible for configuring the
# middleware stacks used in the client and server middlewares
module SidekiqMiddleware
extend ::Gitlab::Utils::Override
override :server_configurator
def server_configurator(metrics: true, arguments_logger: true, memory_killer: true)
lambda do |chain|
super.call(chain)
if load_balancing_enabled?
chain.insert_after(::Labkit::Middleware::Sidekiq::Server,
::Gitlab::Database::LoadBalancing::SidekiqServerMiddleware)
end
end
end
override :client_configurator
def client_configurator
lambda do |chain|
super.call(chain)
chain.add ::Gitlab::Database::LoadBalancing::SidekiqClientMiddleware if load_balancing_enabled?
end
end
private
def load_balancing_enabled?
::Gitlab::Database::LoadBalancing.enable?
end
end
end
end
......@@ -84,7 +84,8 @@ module Gitlab
# Returns true if load balancing is to be enabled.
def self.enable?
return false if Gitlab::Runtime.rake? || Gitlab::Runtime.sidekiq?
return false if Gitlab::Runtime.rake?
return false if Gitlab::Runtime.sidekiq? && !Gitlab::Utils.to_boolean(ENV['ENABLE_LOAD_BALANCING_FOR_SIDEKIQ'], default: false)
return false unless self.configured?
true
......@@ -113,7 +114,6 @@ module Gitlab
# -> Set @feature_available to true
# -> return true
# - Second call: return @feature_available right away
return @feature_available if defined?(@feature_available)
@feature_available = false
......
# frozen_string_literal: true
module Gitlab
module Database
module LoadBalancing
class SidekiqClientMiddleware
def call(worker_class, job, _queue, _redis_pool)
worker_class = worker_class.to_s.safe_constantize
mark_data_consistency_location(worker_class, job)
yield
end
private
def mark_data_consistency_location(worker_class, job)
# Mailers can't be constantized
return unless worker_class
return unless worker_class.include?(::ApplicationWorker)
return unless worker_class.get_data_consistency_feature_flag_enabled?
return if worker_class.get_data_consistency == :always
if Session.current.performed_write?
job['database_write_location'] = load_balancer.primary_write_location
else
# It is possible that the current replica has a different write-ahead
# replication log location from the sidekiq server replica.
# In the follow-up issue https://gitlab.com/gitlab-org/gitlab/-/issues/325519,
# we want to pass database replica location as well
job['database_replica_location'] = true
end
end
def load_balancer
LoadBalancing.proxy.load_balancer
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module LoadBalancing
class SidekiqServerMiddleware
JobReplicaNotUpToDate = Class.new(StandardError)
def call(worker, job, _queue)
if requires_primary?(worker.class, job)
Session.current.use_primary!
end
yield
ensure
clear
end
private
def clear
load_balancer.release_host
Session.clear_session
end
def requires_primary?(worker_class, job)
return true unless worker_class.include?(::ApplicationWorker)
job[:worker_data_consistency] = worker_class.get_data_consistency
return true if worker_class.get_data_consistency == :always
return true unless worker_class.get_data_consistency_feature_flag_enabled?
if job['database_replica_location'] || replica_caught_up?(job['database_write_location'] )
false
elsif worker_class.get_data_consistency == :delayed && job['retry_count'].to_i == 0
raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
" Replica was not up to date."
else
true
end
end
def load_balancer
LoadBalancing.proxy.load_balancer
end
def replica_caught_up?(location)
return true unless location
load_balancer.host.caught_up?(location)
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware do
let(:job_args) { [0.01] }
let(:disabled_sidekiq_middlewares) { [] }
let(:chain) { Sidekiq::Middleware::Chain.new }
let(:queue) { 'test' }
let(:enabled_sidekiq_middlewares) { all_sidekiq_middlewares - disabled_sidekiq_middlewares }
let(:worker_class) do
Class.new do
def self.name
'TestWorker'
end
include ApplicationWorker
def perform(*args)
end
end
end
before do
stub_const('TestWorker', worker_class)
end
shared_examples "a middleware chain" do |load_balancing_enabled|
before do
allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(load_balancing_enabled)
configurator.call(chain)
end
it "passes through the right middlewares", :aggregate_failures do
enabled_sidekiq_middlewares.each do |middleware|
expect_next_instances_of(middleware, 1, true) do |middleware_instance|
expect(middleware_instance).to receive(:call).with(*middleware_expected_args).once.and_call_original
end
end
expect { |b| chain.invoke(*worker_args, &b) }.to yield_control.once
end
end
shared_examples "a middleware chain for mailer" do |load_balancing_enabled|
let(:worker_class) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper }
it_behaves_like "a middleware chain", load_balancing_enabled
end
describe '.server_configurator' do
let(:configurator) { described_class.server_configurator }
let(:worker_args) { [worker_class.new, { 'args' => job_args }, queue] }
let(:middleware_expected_args) { [a_kind_of(worker_class), hash_including({ 'args' => job_args }), queue] }
let(:all_sidekiq_middlewares) do
[
::Gitlab::SidekiqMiddleware::Monitor,
::Gitlab::SidekiqMiddleware::ServerMetrics,
::Gitlab::SidekiqMiddleware::ArgumentsLogger,
::Gitlab::SidekiqMiddleware::MemoryKiller,
::Gitlab::SidekiqMiddleware::RequestStoreMiddleware,
::Gitlab::SidekiqMiddleware::ExtraDoneLogMetadata,
::Gitlab::SidekiqMiddleware::BatchLoader,
::Labkit::Middleware::Sidekiq::Server,
::Gitlab::Database::LoadBalancing::SidekiqServerMiddleware,
::Gitlab::SidekiqMiddleware::InstrumentationLogger,
::Gitlab::SidekiqMiddleware::AdminMode::Server,
::Gitlab::SidekiqVersioning::Middleware,
::Gitlab::SidekiqStatus::ServerMiddleware,
::Gitlab::SidekiqMiddleware::WorkerContext::Server,
::Gitlab::SidekiqMiddleware::DuplicateJobs::Server
]
end
context "when load balancing is enabled" do
before do
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :release_host)
end
it_behaves_like "a middleware chain", true
it_behaves_like "a middleware chain for mailer", true
end
context "when load balancing is disabled" do
let(:disabled_sidekiq_middlewares) do
[
Gitlab::Database::LoadBalancing::SidekiqServerMiddleware
]
end
it_behaves_like "a middleware chain", false
it_behaves_like "a middleware chain for mailer", false
end
end
describe '.client_configurator' do
let(:configurator) { described_class.client_configurator }
let(:redis_pool) { Sidekiq.redis_pool }
let(:middleware_expected_args) { [worker_class, hash_including({ 'args' => job_args }), queue, redis_pool] }
let(:worker_args) { [worker_class, { 'args' => job_args }, queue, redis_pool] }
let(:all_sidekiq_middlewares) do
[
::Gitlab::SidekiqMiddleware::WorkerContext::Client,
::Labkit::Middleware::Sidekiq::Client,
::Gitlab::SidekiqMiddleware::DuplicateJobs::Client,
::Gitlab::SidekiqStatus::ClientMiddleware,
::Gitlab::SidekiqMiddleware::AdminMode::Client,
::Gitlab::SidekiqMiddleware::SizeLimiter::Client,
::Gitlab::SidekiqMiddleware::ClientMetrics,
::Gitlab::Database::LoadBalancing::SidekiqClientMiddleware
]
end
context "when load balancing is disabled" do
let(:disabled_sidekiq_middlewares) do
[
Gitlab::Database::LoadBalancing::SidekiqClientMiddleware
]
end
it_behaves_like "a middleware chain", false
it_behaves_like "a middleware chain for mailer", false
end
context "when load balancing is enabled" do
it_behaves_like "a middleware chain", true
it_behaves_like "a middleware chain for mailer", true
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::LoadBalancing::SidekiqClientMiddleware do
let(:middleware) { described_class.new }
after do
Gitlab::Database::LoadBalancing::Session.clear_session
end
describe '#call' do
shared_context 'data consistency worker class' do |data_consistency, feature_flag|
let(:worker_class) do
Class.new do
def self.name
'TestDataConsistencyWorker'
end
include ApplicationWorker
data_consistency data_consistency, feature_flag: feature_flag
def perform(*args)
end
end
end
before do
stub_const('TestDataConsistencyWorker', worker_class)
end
end
shared_examples_for 'mark database_replica_location' do
it 'passes database_replica_location' do
expect(middleware).not_to receive(:load_balancer)
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 }
expect(job['database_replica_location']).to be_truthy
end
end
shared_examples_for 'does not pass database locations' do
it 'does not pass database locations', :aggregate_failures do
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 }
expect(job['database_replica_location']).to be_nil
expect(job['database_write_location']).to be_nil
end
end
shared_examples_for 'mark data consistency location' do |data_consistency|
include_context 'data consistency worker class', data_consistency, :load_balancing_for_test_data_consistency_worker
let(:location) { '0/D525E3A8' }
context 'when feature flag load_balancing_for_sidekiq is disabled' do
before do
stub_feature_flags(load_balancing_for_test_data_consistency_worker: false)
end
include_examples 'does not pass database locations'
end
context 'when write was not performed' do
before do
allow(Gitlab::Database::LoadBalancing::Session.current).to receive(:performed_write?).and_return(false)
end
include_examples 'mark database_replica_location'
end
context 'when write was performed' do
before do
allow(Gitlab::Database::LoadBalancing::Session.current).to receive(:performed_write?).and_return(true)
end
it 'passes primary write location', :aggregate_failures do
expect(middleware).to receive_message_chain(:load_balancer, :primary_write_location).and_return(location)
middleware.call(worker_class, job, double(:queue), redis_pool) { 10 }
expect(job['database_write_location']).to eq(location)
end
end
end
let(:queue) { 'default' }
let(:redis_pool) { Sidekiq.redis_pool }
let(:worker_class) { 'TestDataConsistencyWorker' }
let(:job) { { "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e" } }
before do
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
end
context 'when worker cannot be constantized' do
let(:worker_class) { 'ActionMailer::MailDeliveryJob' }
include_examples 'does not pass database locations'
end
context 'when worker class does not include ApplicationWorker' do
let(:worker_class) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper }
include_examples 'does not pass database locations'
end
context 'when worker data consistency is :always' do
include_context 'data consistency worker class', :always, :load_balancing_for_test_data_consistency_worker
include_examples 'does not pass database locations'
end
context 'when worker data consistency is :delayed' do
include_examples 'mark data consistency location', :delayed
end
context 'when worker data consistency is :sticky' do
include_examples 'mark data consistency location', :sticky
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
let(:middleware) { described_class.new }
after do
Gitlab::Database::LoadBalancing::Session.clear_session
end
describe '#call' do
shared_context 'data consistency worker class' do |data_consistency, feature_flag|
let(:worker_class) do
Class.new do
def self.name
'TestDataConsistencyWorker'
end
include ApplicationWorker
data_consistency data_consistency, feature_flag: feature_flag
def perform(*args)
end
end
end
before do
stub_const('TestDataConsistencyWorker', worker_class)
end
end
shared_examples_for 'stick to the primary' do
it 'sticks to the primary' do
middleware.call(worker, job, double(:queue)) do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).to be_truthy
end
end
end
shared_examples_for 'sticks based on data consistency' do |data_consistency|
include_context 'data consistency worker class', data_consistency, :load_balancing_for_test_data_consistency_worker
context 'when load_balancing_for_test_data_consistency_worker is disabled' do
before do
stub_feature_flags(load_balancing_for_test_data_consistency_worker: false)
end
include_examples 'stick to the primary'
end
context 'database replica location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'database_replica_location' => 'true' } }
it 'do not stick to the primary' do
middleware.call(worker, job, double(:queue)) do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
end
end
end
context 'write was not performed' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } }
it 'do not stick to the primary' do
middleware.call(worker, job, double(:queue)) do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
end
end
end
context 'replica is up to date' do
before do
allow(middleware).to receive(:replica_caught_up?).and_return(true)
end
it 'do not stick to the primary' do
middleware.call(worker, job, double(:queue)) do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
end
end
end
end
let(:queue) { 'default' }
let(:redis_pool) { Sidekiq.redis_pool }
let(:worker) { worker_class.new }
let(:job) { { "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", 'primary_write_location' => '0/D525E3A8' } }
let(:block) { 10 }
before do
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
allow(middleware).to receive(:clear)
allow(Gitlab::Database::LoadBalancing::Session.current).to receive(:performed_write?).and_return(true)
end
context 'when worker class does not include ApplicationWorker' do
let(:worker) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new }
include_examples 'stick to the primary'
end
context 'when worker data consistency is :always' do
include_context 'data consistency worker class', :always, :load_balancing_for_test_data_consistency_worker
include_examples 'stick to the primary'
end
context 'when worker data consistency is :delayed' do
include_examples 'sticks based on data consistency', :delayed
context 'when replica is not up to date' do
before do
allow(middleware).to receive(:replica_caught_up?).and_return(false)
end
context 'when job is retried once' do
it 'raise an error and retries' do
expect { middleware.call(worker, job, double(:queue)) { block } }.to raise_error(Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate)
end
end
context 'when job is retried more then once' do
before do
job['retry_count'] = 1
end
include_examples 'stick to the primary'
end
end
end
context 'when worker data consistency is :sticky' do
include_examples 'sticks based on data consistency', :sticky
context 'when replica is not up to date' do
before do
allow(middleware).to receive(:replica_caught_up?).and_return(false)
end
include_examples 'stick to the primary'
end
end
end
end
......@@ -142,7 +142,6 @@ RSpec.describe Gitlab::Database::LoadBalancing do
end
it 'returns false when Sidekiq is being used' do
allow(described_class).to receive(:hosts).and_return(%w(foo))
allow(Gitlab::Runtime).to receive(:sidekiq?).and_return(true)
expect(described_class.enable?).to eq(false)
......@@ -171,6 +170,18 @@ RSpec.describe Gitlab::Database::LoadBalancing do
expect(described_class.enable?).to eq(true)
end
context 'when ENABLE_LOAD_BALANCING_FOR_SIDEKIQ environment variable is set' do
before do
stub_env('ENABLE_LOAD_BALANCING_FOR_SIDEKIQ', 'true')
end
it 'returns true when Sidekiq is being used' do
allow(Gitlab::Runtime).to receive(:sidekiq?).and_return(true)
expect(described_class.enable?).to eq(true)
end
end
context 'without a license' do
before do
License.destroy_all # rubocop: disable Cop/DestroyAll
......
......@@ -16,6 +16,7 @@ module Gitlab
:elasticsearch_calls,
:elasticsearch_duration_s,
:elasticsearch_timed_out_count,
:worker_data_consistency,
*::Gitlab::Memory::Instrumentation::KEY_MAPPING.values,
*::Gitlab::Instrumentation::Redis.known_payload_keys,
*::Gitlab::Metrics::Subscribers::ActiveRecord::DB_COUNTERS,
......
......@@ -43,3 +43,5 @@ module Gitlab
end
end
end
Gitlab::SidekiqMiddleware.singleton_class.prepend_if_ee('EE::Gitlab::SidekiqMiddleware')
......@@ -17,6 +17,7 @@ RSpec.describe Gitlab::InstrumentationHelper do
:elasticsearch_calls,
:elasticsearch_duration_s,
:elasticsearch_timed_out_count,
:worker_data_consistency,
:mem_objects,
:mem_bytes,
:mem_mallocs,
......
......@@ -2,25 +2,26 @@
module NextInstanceOf
def expect_next_instance_of(klass, *new_args, &blk)
stub_new(expect(klass), nil, *new_args, &blk)
stub_new(expect(klass), nil, false, *new_args, &blk)
end
def expect_next_instances_of(klass, number, *new_args, &blk)
stub_new(expect(klass), number, *new_args, &blk)
def expect_next_instances_of(klass, number, ordered = false, *new_args, &blk)
stub_new(expect(klass), number, ordered, *new_args, &blk)
end
def allow_next_instance_of(klass, *new_args, &blk)
stub_new(allow(klass), nil, *new_args, &blk)
stub_new(allow(klass), nil, false, *new_args, &blk)
end
def allow_next_instances_of(klass, number, *new_args, &blk)
stub_new(allow(klass), number, *new_args, &blk)
def allow_next_instances_of(klass, number, ordered = false, *new_args, &blk)
stub_new(allow(klass), number, ordered, *new_args, &blk)
end
private
def stub_new(target, number, *new_args, &blk)
def stub_new(target, number, ordered = false, *new_args, &blk)
receive_new = receive(:new)
receive_new.ordered if ordered
receive_new.exactly(number).times if number
receive_new.with(*new_args) if new_args.any?
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe WorkerAttributes do
let(:worker) do
Class.new do
def self.name
"TestWorker"
end
include ApplicationWorker
end
end
describe '.data_consistency' do
context 'with valid data_consistency' do
it 'returns correct data_consistency' do
worker.data_consistency(:sticky)
expect(worker.get_data_consistency).to eq(:sticky)
end
end
context 'when data_consistency is not provided' do
it 'defaults to :always' do
expect(worker.get_data_consistency).to eq(:always)
end
end
context 'with invalid data_consistency' do
it 'raise exception' do
expect { worker.data_consistency(:invalid) }
.to raise_error('Invalid data consistency: invalid')
end
end
context 'when job is idempotent' do
context 'when data_consistency is not :always' do
it 'raise exception' do
worker.idempotent!
expect { worker.data_consistency(:sticky) }
.to raise_error("Class can't be marked as idempotent if data_consistency is not set to :always")
end
end
context 'when feature_flag is provided' do
before do
stub_feature_flags(test_feature_flag: false)
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
end
it 'returns correct feature flag value' do
worker.data_consistency(:sticky, feature_flag: :test_feature_flag)
expect(worker.get_data_consistency_feature_flag_enabled?).not_to be_truthy
end
end
end
end
describe '.idempotent!' do
context 'when data consistency is not :always' do
it 'raise exception' do
worker.data_consistency(:sticky)
expect { worker.idempotent! }
.to raise_error("Class can't be marked as idempotent if data_consistency is not set to :always")
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment