Commit 18501e37 authored by Matthias Kaeppler

Refactor db selection in SidekiqServerMiddleware

This change separates deciding how the database should
be accessed in Sidekiq workers (primary vs. replica nodes)
from acting on that decision. This makes for a cleaner
split between state decisions and the side effects we
perform based on them.

This also fixes a problem introduced earlier where the
data_consistency job field was injected twice, and renames
database_chosen to load_balancing_strategy.

Changelog: changed
parent 11731870
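The pattern behind the refactor is small: one pure method maps job metadata to a strategy symbol, and the caller performs whatever side effect that strategy implies. The sketch below only illustrates that split; `pick_strategy` and `apply_strategy` are hypothetical names, not the middleware's actual API (the real change to `SidekiqServerMiddleware` is in the diff further down).

```ruby
# Sketch only: pick_strategy/apply_strategy are hypothetical names used to
# illustrate the decision-vs-side-effect split, not the middleware's real API.

# Decision: a pure function mapping job state to a strategy symbol.
def pick_strategy(replica_caught_up:, delayed:, retried:)
  return :replica       if replica_caught_up
  return :retry_replica if delayed && !retried
  return :retry_primary if delayed

  :primary
end

# Action: every side effect is keyed off the decision, in one place.
def apply_strategy(strategy, job)
  job['load_balancing_strategy'] = strategy.to_s

  case strategy
  when :primary, :retry_primary
    job['use_primary'] = true # stand-in for Session.current.use_primary!
  when :retry_replica
    raise "replica not up to date for job #{job['jid']}"
  when :replica
    nil # an up-to-date replica was selected; nothing to do
  end
end

job = { 'jid' => 'abc123' }
apply_strategy(pick_strategy(replica_caught_up: true, delayed: true, retried: false), job)
puts job.inspect # => {"jid"=>"abc123", "load_balancing_strategy"=>"replica"}
```

Keeping the decision side-effect free is what makes the new `select_load_balancing_strategy` straightforward to test in isolation.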
@@ -12,6 +12,7 @@ module WorkerAttributes
     VALID_URGENCIES = [:high, :low, :throttled].freeze
     VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze
+    DEFAULT_DATA_CONSISTENCY = :always

     NAMESPACE_WEIGHTS = {
       auto_devops: 2,
@@ -110,7 +111,7 @@ module WorkerAttributes
     end

     def get_data_consistency
-      class_attributes[:data_consistency] || :always
+      class_attributes[:data_consistency] || DEFAULT_DATA_CONSISTENCY
     end

     def get_data_consistency_feature_flag_enabled?
...
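For reference, a worker opts into a weaker consistency with the `data_consistency` class method (the spec below defines an `LBTestWorker` this way); workers that declare nothing fall back to `DEFAULT_DATA_CONSISTENCY`. A minimal, hypothetical worker sketch, not a class from this changeset:

```ruby
# Hypothetical example worker; the data_consistency declaration is the point here.
class ReportWorker
  include ApplicationWorker

  # Jobs may read from a replica that lags slightly behind the primary;
  # the server middleware falls back to the primary if the replica is stale.
  data_consistency :delayed

  def perform(report_id)
    # ...
  end
end

# A worker that declares nothing keeps the default:
#   SomeWorker.get_data_consistency # => :always (DEFAULT_DATA_CONSISTENCY)
```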
@@ -265,7 +265,7 @@ The following metrics are available:

 | Metric                          | Type    | Since | Description | Labels |
 |:------------------------------- |:------- |:----- |:----------- |:------ |
 | `db_load_balancing_hosts`       | Gauge   | [12.3](https://gitlab.com/gitlab-org/gitlab/-/issues/13630) | Current number of load balancing hosts | |
-| `sidekiq_load_balancing_count`  | Counter | 13.11 | Sidekiq jobs using load balancing with data consistency set to :sticky or :delayed | `queue`, `boundary`, `external_dependencies`, `feature_category`, `job_status`, `urgency`, `data_consistency`, `database_chosen` |
+| `sidekiq_load_balancing_count`  | Counter | 13.11 | Sidekiq jobs using load balancing with data consistency set to :sticky or :delayed | `queue`, `boundary`, `external_dependencies`, `feature_category`, `job_status`, `urgency`, `data_consistency`, `load_balancing_strategy` |

 ## Database partitioning metrics **(PREMIUM SELF)**
...
@@ -7,8 +7,21 @@ module Gitlab
        JobReplicaNotUpToDate = Class.new(StandardError)

        def call(worker, job, _queue)
-          if requires_primary?(worker.class, job)
-            Session.current.use_primary!
+          worker_class = worker.class
+          strategy = select_load_balancing_strategy(worker_class, job)
+
+          # This is consumed by ServerMetrics and StructuredLogger to emit metrics so we only
+          # make this available when load-balancing is actually utilized.
+          job['load_balancing_strategy'] = strategy.to_s if load_balancing_available?(worker_class)
+
+          case strategy
+          when :primary, :retry_primary
+            Session.current.use_primary!
+          when :retry_replica
+            raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
+              " Replica was not up to date."
+          when :replica
+            # this means we selected an up-to-date replica, but there is nothing to do in this case.
          end

          yield
@@ -23,31 +36,27 @@ module Gitlab
          Session.clear_session
        end

-        def requires_primary?(worker_class, job)
-          return true unless worker_class.include?(::ApplicationWorker)
-          return true unless worker_class.utilizes_load_balancing_capabilities?
-          return true unless worker_class.get_data_consistency_feature_flag_enabled?
+        def select_load_balancing_strategy(worker_class, job)
+          return :primary unless load_balancing_available?(worker_class)

          location = job['database_write_location'] || job['database_replica_location']
-
-          return true unless location
-
-          job_data_consistency = worker_class.get_data_consistency
-          job[:data_consistency] = job_data_consistency.to_s
+          return :primary unless location

          if replica_caught_up?(location)
-            job[:database_chosen] = 'replica'
-            false
-          elsif job_data_consistency == :delayed && not_yet_retried?(job)
-            job[:database_chosen] = 'retry'
-            raise JobReplicaNotUpToDate, "Sidekiq job #{worker_class} JID-#{job['jid']} couldn't use the replica."\
-              " Replica was not up to date."
+            :replica
+          elsif worker_class.get_data_consistency == :delayed
+            not_yet_retried?(job) ? :retry_replica : :retry_primary
          else
-            job[:database_chosen] = 'primary'
-            true
+            :primary
          end
        end

+        def load_balancing_available?(worker_class)
+          worker_class.include?(::ApplicationWorker) &&
+            worker_class.utilizes_load_balancing_capabilities? &&
+            worker_class.get_data_consistency_feature_flag_enabled?
+        end
+
        def not_yet_retried?(job)
          # if `retry_count` is `nil` it indicates that this job was never retried
          # the `0` indicates that this is a first retry
...
@@ -68,7 +68,7 @@ module Gitlab
        message = base_message(payload)

-        payload['database_chosen'] = job[:database_chosen] if job[:database_chosen]
+        payload['load_balancing_strategy'] = job['load_balancing_strategy'] if job['load_balancing_strategy']

        if job_exception
          payload['message'] = "#{message}: fail: #{payload['duration_s']} sec"
...
@@ -74,10 +74,10 @@ module Gitlab
          @metrics[:sidekiq_elasticsearch_requests_total].increment(labels, get_elasticsearch_calls(instrumentation))
          @metrics[:sidekiq_elasticsearch_requests_duration_seconds].observe(labels, get_elasticsearch_time(instrumentation))

-          if ::Gitlab::Database::LoadBalancing.enable? && job[:database_chosen]
+          with_load_balancing_settings(job) do |settings|
            load_balancing_labels = {
-              database_chosen: job[:database_chosen],
-              data_consistency: job[:data_consistency]
+              load_balancing_strategy: settings['load_balancing_strategy'],
+              data_consistency: settings['worker_data_consistency']
            }

            @metrics[:sidekiq_load_balancing_count].increment(labels.merge(load_balancing_labels), 1)
@@ -105,6 +105,15 @@ module Gitlab
        private

+        def with_load_balancing_settings(job)
+          return unless ::Gitlab::Database::LoadBalancing.enable?
+
+          keys = %w[load_balancing_strategy worker_data_consistency]
+          return unless keys.all? { |k| job.key?(k) }
+
+          yield job.slice(*keys)
+        end
+
        def get_thread_cputime
          defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0
        end
...
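The new `with_load_balancing_settings` helper only yields when load balancing is enabled and the job payload actually carries both keys, so the counter is never incremented for jobs that did not pass through the load-balancing middleware. A standalone sketch of that guard, assuming load balancing is enabled; this is not the actual `ServerMetrics` class:

```ruby
# Standalone illustration of the guard inside with_load_balancing_settings.
REQUIRED_KEYS = %w[load_balancing_strategy worker_data_consistency].freeze

def with_load_balancing_settings(job)
  # Skip jobs that never received the load-balancing fields.
  return unless REQUIRED_KEYS.all? { |key| job.key?(key) }

  yield job.slice(*REQUIRED_KEYS)
end

job = { 'jid' => 'abc', 'load_balancing_strategy' => 'replica', 'worker_data_consistency' => 'delayed' }
with_load_balancing_settings(job) do |settings|
  puts settings # => {"load_balancing_strategy"=>"replica", "worker_data_consistency"=>"delayed"}
end

# Jobs enqueued outside the client/server middleware lack the keys, so the block is skipped:
with_load_balancing_settings('jid' => 'abc') { raise 'not reached' }
```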
@@ -5,6 +5,14 @@ require 'spec_helper'
 RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
   let(:middleware) { described_class.new }

+  let(:load_balancer) { double.as_null_object }
+  let(:has_replication_lag) { false }
+
+  before do
+    allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
+    allow(load_balancer).to receive(:select_up_to_date_host).and_return(!has_replication_lag)
+  end
+
   after do
     Gitlab::Database::LoadBalancing::Session.clear_session
   end
@@ -39,7 +47,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
     end
   end

-  shared_examples_for 'replica is up to date' do |location, data_consistency|
+  shared_examples_for 'replica is up to date' do |location|
     it 'does not stick to the primary', :aggregate_failures do
       expect(middleware).to receive(:replica_caught_up?).with(location).and_return(true)
@@ -47,13 +55,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
        expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
      end

-      expect(job[:database_chosen]).to eq('replica')
-    end
-
-    it "updates job hash with data_consistency :#{data_consistency}" do
-      middleware.call(worker, job, double(:queue)) do
-        expect(job).to include(data_consistency: data_consistency.to_s)
-      end
+      expect(job['load_balancing_strategy']).to eq('replica')
     end
   end
@@ -75,7 +77,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
      allow(middleware).to receive(:replica_caught_up?).and_return(true)
    end

-    it_behaves_like 'replica is up to date', '0/D525E3A8', data_consistency
+    it_behaves_like 'replica is up to date', '0/D525E3A8'
  end

  context 'when database primary location is set' do
@@ -85,13 +87,13 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
      allow(middleware).to receive(:replica_caught_up?).and_return(true)
    end

-    it_behaves_like 'replica is up to date', '0/D525E3A8', data_consistency
+    it_behaves_like 'replica is up to date', '0/D525E3A8'
  end

  context 'when database location is not set' do
    let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } }

-    it_behaves_like 'stick to the primary', nil
+    it_behaves_like 'stick to the primary'
  end
 end
@@ -124,10 +126,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
  include_examples 'sticks based on data consistency', :delayed

  context 'when replica is not up to date' do
-    before do
-      allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :release_host)
-      allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :select_up_to_date_host).and_return(false)
-    end
+    let(:has_replication_lag) { true }

    around do |example|
      with_sidekiq_server_middleware do |chain|
@@ -143,7 +142,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
      end.to raise_error(Sidekiq::JobRetry::Skip)

      expect(job['error_class']).to eq('Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate')
-      expect(job[:database_chosen]).to eq('retry')
+      expect(job['load_balancing_strategy']).to eq('retry_replica')
    end
  end
@@ -154,7 +153,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
      end.to raise_error(Sidekiq::JobRetry::Skip)

      process_job(job)
-      expect(job[:database_chosen]).to eq('primary')
+      expect(job['load_balancing_strategy']).to eq('retry_primary')
    end
  end
@@ -163,7 +162,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
      stub_feature_flags(sidekiq_load_balancing_rotate_up_to_date_replica: false)
    end

-    it 'uses different implmentation' do
+    it 'uses different implementation' do
      expect(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer, :host, :caught_up?).and_return(false)

      expect do
@@ -185,9 +184,9 @@ RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
    include_examples 'stick to the primary'

    it 'updates job hash with primary database chosen', :aggregate_failures do
-      expect { |b| middleware.call(worker, job, double(:queue), &b) }.to yield_control
-
-      expect(job[:database_chosen]).to eq('primary')
+      middleware.call(worker, job, double(:queue)) do
+        expect(job['load_balancing_strategy']).to eq('primary')
+      end
    end
  end
 end
...
@@ -342,7 +342,7 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do
      end

      context 'when the job uses load balancing capabilities' do
-        let(:expected_payload) { { 'database_chosen' => 'retry' } }
+        let(:expected_payload) { { 'load_balancing_strategy' => 'retry' } }

        before do
          allow(Time).to receive(:now).and_return(timestamp)
@@ -354,7 +354,7 @@ RSpec.describe Gitlab::SidekiqLogging::StructuredLogger do
          expect(logger).to receive(:info).with(include(expected_payload)).ordered

          call_subject(job, 'test_queue') do
-            job[:database_chosen] = 'retry'
+            job['load_balancing_strategy'] = 'retry'
          end
        end
      end
...
@@ -109,22 +109,20 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
   end

   context 'DB load balancing' do
-    using RSpec::Parameterized::TableSyntax
-
     subject { described_class.new }

     let(:queue) { :test }
     let(:worker_class) { worker.class }
-    let(:job) { {} }
-    let(:job_status) { :done }
-    let(:labels_with_job_status) { default_labels.merge(job_status: job_status.to_s) }
-    let(:default_labels) do
-      { queue: queue.to_s,
-        worker: worker_class.to_s,
-        boundary: "",
-        external_dependencies: "no",
-        feature_category: "",
-        urgency: "low" }
+    let(:worker) { TestWorker.new }
+    let(:client_middleware) { Gitlab::Database::LoadBalancing::SidekiqClientMiddleware.new }
+    let(:load_balancer) { double.as_null_object }
+    let(:load_balancing_metric) { double('load balancing metric') }
+    let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e" } }
+
+    def process_job
+      client_middleware.call(worker_class, job, queue, double) do
+        worker_class.process_job(job)
+      end
     end

     before do
@@ -132,84 +130,93 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
       TestWorker.class_eval do
         include Sidekiq::Worker
         include WorkerAttributes
+
+        def perform(*args)
+        end
       end
-    end

-    let(:worker) { TestWorker.new }
-
-    include_context 'server metrics with mocked prometheus'
-
-    context 'when load_balancing is enabled' do
-      let(:load_balancing_metric) { double('load balancing metric') }
-
-      include_context 'clear DB Load Balancing configuration'
-
-      before do
-        allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(true)
-        allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric)
-      end
+      allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
+      allow(load_balancing_metric).to receive(:increment)
+      allow(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric)
+    end

-      describe '#initialize' do
-        it 'sets load_balancing metrics' do
-          expect(Gitlab::Metrics).to receive(:counter).with(:sidekiq_load_balancing_count, anything).and_return(load_balancing_metric)
-
-          subject
-        end
+    around do |example|
+      with_sidekiq_server_middleware do |chain|
+        chain.add Gitlab::Database::LoadBalancing::SidekiqServerMiddleware
+        chain.add described_class
+        Sidekiq::Testing.inline! { example.run }
       end
+    end

-      describe '#call' do
-        include_context 'server metrics call'
+    include_context 'server metrics with mocked prometheus'
+    include_context 'server metrics call'
+    include_context 'clear DB Load Balancing configuration'

-        context 'when :database_chosen is provided' do
-          where(:database_chosen) do
-            %w[primary retry replica]
-          end
+    shared_context 'worker declaring data consistency' do
+      let(:worker_class) { LBTestWorker }

-          with_them do
-            context "when #{params[:database_chosen]} is used" do
-              let(:labels_with_load_balancing) do
-                labels_with_job_status.merge(database_chosen: database_chosen, data_consistency: 'delayed')
-              end
+      before do
+        stub_const('LBTestWorker', Class.new(TestWorker))
+        LBTestWorker.class_eval do
+          include ApplicationWorker

-              before do
-                job[:database_chosen] = database_chosen
-                job[:data_consistency] = 'delayed'
-                allow(load_balancing_metric).to receive(:increment)
-              end
+          data_consistency :delayed
+        end
+      end
+    end

-              it 'increment sidekiq_load_balancing_count' do
-                expect(load_balancing_metric).to receive(:increment).with(labels_with_load_balancing, 1)
+    context 'when load_balancing is enabled' do
+      before do
+        allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(true)
+      end

-                described_class.new.call(worker, job, :test) { nil }
-              end
-            end
+      describe '#call' do
+        context 'when worker declares data consistency' do
+          include_context 'worker declaring data consistency'
+
+          it 'increments load balancing counter' do
+            process_job
+
+            expect(load_balancing_metric).to have_received(:increment).with(
+              a_hash_including(
+                data_consistency: :delayed,
+                load_balancing_strategy: 'replica'
+              ), 1)
          end
        end

-        context 'when :database_chosen is not provided' do
-          it 'does not increment sidekiq_load_balancing_count' do
-            expect(load_balancing_metric).not_to receive(:increment)
-
-            described_class.new.call(worker, job, :test) { nil }
+        context 'when worker does not declare data consistency' do
+          it 'does not increment load balancing counter' do
+            process_job
+
+            expect(load_balancing_metric).not_to have_received(:increment)
          end
        end
      end
    end

    context 'when load_balancing is disabled' do
-      include_context 'clear DB Load Balancing configuration'
+      include_context 'worker declaring data consistency'

      before do
        allow(::Gitlab::Database::LoadBalancing).to receive(:enable?).and_return(false)
      end

      describe '#initialize' do
-        it 'doesnt set load_balancing metrics' do
+        it 'does not set load_balancing metrics' do
          expect(Gitlab::Metrics).not_to receive(:counter).with(:sidekiq_load_balancing_count, anything)

          subject
        end
      end
+
+      describe '#call' do
+        it 'does not increment load balancing counter' do
+          process_job
+
+          expect(load_balancing_metric).not_to have_received(:increment)
+        end
+      end
    end
  end
 end
...