Commit 41da33b3 authored by pbair

Prepare for ci background migration worker

Change the existing BackgroundMigrationWorker to use SharedModel with
the correct connection for running BG migration jobs. Also make the
class and specs generic, in preparation for adding a new worker which
will process jobs specific to the ci database.
parent d178b976
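For context, the follow-up worker for the ci database that this commit prepares for would presumably only need to include the new concern and implement its two abstract class methods. The sketch below is illustrative only: the class name, the metric name, and the CI_DATABASE_NAME constant are assumptions, not part of this commit.

# Hypothetical follow-up worker (illustration, not part of this diff).
# Class name, metric name, and Gitlab::Database::CI_DATABASE_NAME are assumptions.
module BackgroundMigration
  class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
    include SingleDatabaseWorker

    def self.tracking_database
      @tracking_database ||= Gitlab::Database::CI_DATABASE_NAME.to_sym
    end

    def self.unhealthy_metric_name
      @unhealthy_metric_name ||= :background_migration_ci_database_health_reschedules
    end
  end
end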
......@@ -10,7 +10,6 @@ Database/MultipleDatabases:
- ee/lib/pseudonymizer/pager.rb
- ee/lib/system_check/geo/geo_database_configured_check.rb
- ee/spec/lib/pseudonymizer/dumper_spec.rb
- ee/spec/models/pg_replication_slot_spec.rb
- ee/spec/services/ee/merge_requests/update_service_spec.rb
- lib/backup/database.rb
- lib/backup/manager.rb
......
# frozen_string_literal: true
module Postgresql
class ReplicationSlot < ApplicationRecord
class ReplicationSlot < Gitlab::Database::SharedModel
self.table_name = 'pg_replication_slots'
# Returns true if there are any replication slots in use.
......
# frozen_string_literal: true
module BackgroundMigration
module SingleDatabaseWorker
extend ActiveSupport::Concern
include ApplicationWorker
MAX_LEASE_ATTEMPTS = 5
included do
data_consistency :always
sidekiq_options retry: 3
feature_category :database
urgency :throttled
loggable_arguments 0, 1
end
class_methods do
# The minimum amount of time between processing two jobs of the same migration
# class.
#
# This interval is set to 2 or 5 minutes so autovacuuming and other
# maintenance related tasks have plenty of time to clean up after a migration
# has been performed.
def minimum_interval
2.minutes.to_i
end
def tracking_database
raise NotImplementedError, "#{self.name} does not implement #{__method__}"
end
def unhealthy_metric_name
raise NotImplementedError, "#{self.name} does not implement #{__method__}"
end
end
# Performs the background migration.
#
# See Gitlab::BackgroundMigration.perform for more information.
#
# class_name - The class name of the background migration to run.
# arguments - The arguments to pass to the migration class.
# lease_attempts - The number of times we will try to obtain an exclusive
# lease on the class before giving up. See MR for more discussion.
# https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956
def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS)
job_coordinator.with_shared_connection do
perform_with_connection(class_name, arguments, lease_attempts)
end
end
private
def job_coordinator
@job_coordinator ||= Gitlab::BackgroundMigration.coordinator_for_database(self.class.tracking_database)
end
def perform_with_connection(class_name, arguments, lease_attempts)
with_context(caller_id: class_name.to_s) do
retried = lease_attempts != MAX_LEASE_ATTEMPTS
attempts_left = lease_attempts - 1
should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried)
break if should_perform.nil?
if should_perform
job_coordinator.perform(class_name, arguments)
else
# If the lease could not be obtained this means either another process is
# running a migration of this class or we ran one recently. In this case
# we'll reschedule the job in such a way that it is picked up again around
# the time the lease expires.
self.class
.perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left)
end
end
end
def perform_and_ttl(class_name, attempts_left, retried)
# In test environments `perform_in` will run right away. This can then
# lead to stack level errors in the above `#perform`. To work around this
# we'll just perform the migration right away in the test environment.
return [true, nil] if always_perform?
lease = lease_for(class_name, retried)
lease_obtained = !!lease.try_obtain
healthy_db = healthy_database?
perform = lease_obtained && healthy_db
database_unhealthy_counter.increment if lease_obtained && !healthy_db
# When the DB is unhealthy or the lease can't be obtained after several tries,
# then give up on the job and log a warning. Otherwise we could end up in
# an infinite rescheduling loop. Jobs can be tracked in the database with the
# use of Gitlab::Database::BackgroundMigrationJob
if !perform && attempts_left < 0
msg = if !lease_obtained
'Job could not get an exclusive lease after several tries. Giving up.'
else
'Database was unhealthy after several tries. Giving up.'
end
Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid)
return [nil, nil]
end
[perform, lease.ttl]
end
def lease_for(class_name, retried)
Gitlab::ExclusiveLease
.new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval)
end
def lease_key_for(class_name, retried)
key = "#{self.class.name}:#{class_name}"
# We use a different exclusive lock key for retried jobs to allow them running concurrently with the scheduled jobs.
# See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information.
key += ":retried" if retried
key
end
def always_perform?
Rails.env.test?
end
# Returns true if the database is healthy enough to allow the migration to be
# performed.
#
# class_name - The name of the background migration that we might want to
# run.
def healthy_database?
!Postgresql::ReplicationSlot.lag_too_great?
end
def database_unhealthy_counter
Gitlab::Metrics.counter(
self.class.unhealthy_metric_name,
'The number of times a background migration is rescheduled because the database is unhealthy.'
)
end
end
end
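Why ReplicationSlot must become a SharedModel: the concern runs the whole job, including the replication-lag health check, inside the job coordinator's shared-connection block, so the check executes against whichever database the worker tracks. A rough sketch of that interaction, using only methods visible in this diff (the note about SharedModel connection swapping is an assumption about what with_shared_connection does internally):

# Sketch only, assuming JobCoordinator#with_shared_connection swaps the
# connection used by Gitlab::Database::SharedModel subclasses for the block.
coordinator = Gitlab::BackgroundMigration.coordinator_for_database(:main)

coordinator.with_shared_connection do
  # Postgresql::ReplicationSlot now inherits from SharedModel, so this
  # lag check runs on the coordinator's (tracked) database connection.
  Postgresql::ReplicationSlot.lag_too_great?
end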
# frozen_string_literal: true
class BackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include BackgroundMigration::SingleDatabaseWorker
MAX_LEASE_ATTEMPTS = 5
data_consistency :always
sidekiq_options retry: 3
feature_category :database
urgency :throttled
loggable_arguments 0, 1
# The minimum amount of time between processing two jobs of the same migration
# class.
#
# This interval is set to 2 or 5 minutes so autovacuuming and other
# maintenance related tasks have plenty of time to clean up after a migration
# has been performed.
def self.minimum_interval
2.minutes.to_i
end
# Performs the background migration.
#
# See Gitlab::BackgroundMigration.perform for more information.
#
# class_name - The class name of the background migration to run.
# arguments - The arguments to pass to the migration class.
# lease_attempts - The number of times we will try to obtain an exclusive
# lease on the class before giving up. See MR for more discussion.
# https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956
def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS)
with_context(caller_id: class_name.to_s) do
retried = lease_attempts != MAX_LEASE_ATTEMPTS
attempts_left = lease_attempts - 1
should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried)
break if should_perform.nil?
if should_perform
Gitlab::BackgroundMigration.perform(class_name, arguments)
else
# If the lease could not be obtained this means either another process is
# running a migration of this class or we ran one recently. In this case
# we'll reschedule the job in such a way that it is picked up again around
# the time the lease expires.
self.class
.perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left)
end
end
end
def perform_and_ttl(class_name, attempts_left, retried)
# In test environments `perform_in` will run right away. This can then
# lead to stack level errors in the above `#perform`. To work around this
# we'll just perform the migration right away in the test environment.
return [true, nil] if always_perform?
lease = lease_for(class_name, retried)
lease_obtained = !!lease.try_obtain
healthy_db = healthy_database?
perform = lease_obtained && healthy_db
database_unhealthy_counter.increment if lease_obtained && !healthy_db
# When the DB is unhealthy or the lease can't be obtained after several tries,
# then give up on the job and log a warning. Otherwise we could end up in
# an infinite rescheduling loop. Jobs can be tracked in the database with the
# use of Gitlab::Database::BackgroundMigrationJob
if !perform && attempts_left < 0
msg = if !lease_obtained
'Job could not get an exclusive lease after several tries. Giving up.'
else
'Database was unhealthy after several tries. Giving up.'
end
Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid)
return [nil, nil]
end
[perform, lease.ttl]
end
def lease_for(class_name, retried)
Gitlab::ExclusiveLease
.new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval)
end
def lease_key_for(class_name, retried)
key = "#{self.class.name}:#{class_name}"
# We use a different exclusive lock key for retried jobs to allow them running concurrently with the scheduled jobs.
# See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information.
key += ":retried" if retried
key
end
def always_perform?
Rails.env.test?
end
# Returns true if the database is healthy enough to allow the migration to be
# performed.
#
# class_name - The name of the background migration that we might want to
# run.
def healthy_database?
!Postgresql::ReplicationSlot.lag_too_great?
end
def self.tracking_database
@tracking_database ||= Gitlab::Database::MAIN_DATABASE_NAME.to_sym
end
def database_unhealthy_counter
Gitlab::Metrics.counter(
:background_migration_database_health_reschedules,
'The number of times a background migration is rescheduled because the database is unhealthy.'
)
end
def self.unhealthy_metric_name
@unhealthy_metric_name ||= :background_migration_database_health_reschedules
end
end
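As a usage sketch, jobs are still enqueued against the concrete worker class through the standard Sidekiq API; the worker resolves the matching job coordinator internally. The migration class name below is a placeholder:

# Illustrative enqueue calls; 'CopySomeColumn' is a made-up migration class name.
BackgroundMigrationWorker.perform_async('CopySomeColumn', [1, 1000])

# Internal rescheduling path when the exclusive lease is held elsewhere:
# the worker re-enqueues itself with one fewer lease attempt remaining.
BackgroundMigrationWorker.perform_in(2.minutes, 'CopySomeColumn', [1, 1000], 4)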
......@@ -3,6 +3,8 @@
require 'spec_helper'
RSpec.describe Postgresql::ReplicationSlot do
it { is_expected.to be_a Gitlab::Database::SharedModel }
describe '.in_use?' do
it 'returns true when replication slots are present' do
expect(described_class).to receive(:exists?).and_return(true)
......@@ -73,28 +75,22 @@ RSpec.describe Postgresql::ReplicationSlot do
before(:all) do
skip('max_replication_slots too small') if skip_examples
@current_slot_count = ApplicationRecord
.connection
.execute("SELECT COUNT(*) FROM pg_replication_slots;")
.first
.fetch('count')
.to_i
@current_slot_count = described_class
.connection
.select_value("SELECT COUNT(*) FROM pg_replication_slots")
@current_unused_count = ApplicationRecord
.connection
.execute("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")
.first
.fetch('count')
.to_i
@current_unused_count = described_class
.connection
.select_value("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")
ApplicationRecord
described_class
.connection
.execute("SELECT * FROM pg_create_physical_replication_slot('test_slot');")
end
after(:all) do
unless skip_examples
ApplicationRecord
described_class
.connection
.execute("SELECT pg_drop_replication_slot('test_slot');")
end
......
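The spec change above replaces the manual execute/first/fetch/to_i chain with select_value, which returns the first column of the first row already type-cast by the adapter, so the count comes back as an Integer. A minimal comparison, assuming the postgresql adapter's usual casting:

# Old and new ways to read a single count; select_value is the form used above.
count_via_execute = described_class.connection
  .execute("SELECT COUNT(*) FROM pg_replication_slots;")
  .first
  .fetch('count')
  .to_i

count_via_select_value = described_class.connection
  .select_value("SELECT COUNT(*) FROM pg_replication_slots")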
# frozen_string_literal: true
RSpec.shared_examples 'it runs background migration jobs' do |tracking_database, metric_name|
describe 'defining the job attributes' do
it 'defines the data_consistency as always' do
expect(described_class.get_data_consistency).to eq(:always)
end
it 'defines the retry count in sidekiq_options' do
expect(described_class.sidekiq_options['retry']).to eq(3)
end
it 'defines the feature_category as database' do
expect(described_class.get_feature_category).to eq(:database)
end
it 'defines the urgency as throttled' do
expect(described_class.get_urgency).to eq(:throttled)
end
it 'defines the loggable_arguments' do
expect(described_class.loggable_arguments).to match_array([0, 1])
end
end
describe '.tracking_database' do
it 'does not raise an error' do
expect { described_class.tracking_database }.not_to raise_error
end
it 'overrides the method to return the tracking database' do
expect(described_class.tracking_database).to eq(tracking_database)
end
end
describe '.unhealthy_metric_name' do
it 'does not raise an error' do
expect { described_class.unhealthy_metric_name }.not_to raise_error
end
it 'overrides the method to return the unhealthy metric name' do
expect(described_class.unhealthy_metric_name).to eq(metric_name)
end
end
describe '.minimum_interval' do
it 'returns 2 minutes' do
expect(described_class.minimum_interval).to eq(2.minutes.to_i)
end
end
describe '#perform' do
let(:worker) { described_class.new }
before do
allow(worker).to receive(:jid).and_return(1)
allow(worker).to receive(:always_perform?).and_return(false)
allow(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false)
end
it 'performs jobs using the coordinator for the correct database' do
expect_next_instance_of(Gitlab::BackgroundMigration::JobCoordinator) do |coordinator|
allow(coordinator).to receive(:with_shared_connection).and_yield
expect(coordinator.database).to eq(tracking_database)
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
end
worker.perform('Foo', [10, 20])
end
context 'when lease can be obtained' do
let(:coordinator) { double('job coordinator') }
before do
allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database)
.with(tracking_database)
.and_return(coordinator)
allow(coordinator).to receive(:with_shared_connection).and_yield
end
it 'sets up the shared connection before checking replication' do
expect(coordinator).to receive(:with_shared_connection).and_yield.ordered
expect(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false).ordered
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
worker.perform('Foo', [10, 20])
end
it 'performs a background migration' do
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
worker.perform('Foo', [10, 20])
end
context 'when lease_attempts is 1' do
it 'performs a background migration' do
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
worker.perform('Foo', [10, 20], 1)
end
end
it 'can run scheduled job and retried job concurrently' do
expect(coordinator)
.to receive(:perform)
.with('Foo', [10, 20])
.exactly(2).time
worker.perform('Foo', [10, 20])
worker.perform('Foo', [10, 20], described_class::MAX_LEASE_ATTEMPTS - 1)
end
it 'sets the class that will be executed as the caller_id' do
expect(coordinator).to receive(:perform) do
expect(Gitlab::ApplicationContext.current).to include('meta.caller_id' => 'Foo')
end
worker.perform('Foo', [10, 20])
end
end
context 'when lease not obtained (migration of same class was performed recently)' do
let(:timeout) { described_class.minimum_interval }
let(:lease_key) { "#{described_class.name}:Foo" }
let(:coordinator) { double('job coordinator') }
before do
allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database)
.with(tracking_database)
.and_return(coordinator)
allow(coordinator).to receive(:with_shared_connection).and_yield
expect(coordinator).not_to receive(:perform)
Gitlab::ExclusiveLease.new(lease_key, timeout: timeout).try_obtain
end
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20], 5)
end
context 'when lease_attempts is 1' do
let(:lease_key) { "#{described_class.name}:Foo:retried" }
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 0)
worker.perform('Foo', [10, 20], 1)
end
end
context 'when lease_attempts is 0' do
let(:lease_key) { "#{described_class.name}:Foo:retried" }
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Job could not get an exclusive lease after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
context 'when database is not healthy' do
before do
expect(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(true)
end
it 'reschedules a migration if the database is not healthy' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20])
end
it 'increments the unhealthy counter' do
counter = Gitlab::Metrics.counter(metric_name, 'msg')
expect(described_class).to receive(:perform_in)
expect { worker.perform('Foo', [10, 20]) }.to change { counter.get }.by(1)
end
context 'when lease_attempts is 0' do
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Database was unhealthy after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
end
end
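Because the shared examples are parameterized by tracking database and metric name, the spec for the planned ci worker should reduce to a one-liner like the existing main-database spec below. The class and metric names here are illustrative assumptions:

# Hypothetical spec for the future ci worker (names are assumptions).
RSpec.describe BackgroundMigration::CiDatabaseWorker, :clean_gitlab_redis_shared_state do
  it_behaves_like 'it runs background migration jobs', :ci, :background_migration_ci_database_health_reschedules
end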
......@@ -3,148 +3,5 @@
require 'spec_helper'
RSpec.describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do
let(:worker) { described_class.new }
describe '.minimum_interval' do
it 'returns 2 minutes' do
expect(described_class.minimum_interval).to eq(2.minutes.to_i)
end
end
describe '#perform' do
before do
allow(worker).to receive(:jid).and_return(1)
allow(worker).to receive(:always_perform?).and_return(false)
end
it 'can run scheduled job and retried job concurrently' do
expect(Gitlab::BackgroundMigration)
.to receive(:perform)
.with('Foo', [10, 20])
.exactly(2).time
worker.perform('Foo', [10, 20])
worker.perform('Foo', [10, 20], described_class::MAX_LEASE_ATTEMPTS - 1)
end
context 'when lease can be obtained' do
before do
expect(Gitlab::BackgroundMigration)
.to receive(:perform)
.with('Foo', [10, 20])
end
it 'performs a background migration' do
worker.perform('Foo', [10, 20])
end
context 'when lease_attempts is 1' do
it 'performs a background migration' do
worker.perform('Foo', [10, 20], 1)
end
end
end
context 'when lease not obtained (migration of same class was performed recently)' do
before do
expect(Gitlab::BackgroundMigration).not_to receive(:perform)
worker.lease_for('Foo', false).try_obtain
end
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20], 5)
end
context 'when lease_attempts is 1' do
before do
worker.lease_for('Foo', true).try_obtain
end
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 0)
worker.perform('Foo', [10, 20], 1)
end
end
context 'when lease_attempts is 0' do
before do
worker.lease_for('Foo', true).try_obtain
end
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Job could not get an exclusive lease after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
context 'when database is not healthy' do
before do
allow(worker).to receive(:healthy_database?).and_return(false)
end
it 'reschedules a migration if the database is not healthy' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20])
end
context 'when lease_attempts is 0' do
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Database was unhealthy after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
it 'sets the class that will be executed as the caller_id' do
expect(Gitlab::BackgroundMigration).to receive(:perform) do
expect(Gitlab::ApplicationContext.current).to include('meta.caller_id' => 'Foo')
end
worker.perform('Foo', [10, 20])
end
end
describe '#healthy_database?' do
context 'when replication lag is too great' do
it 'returns false' do
allow(Postgresql::ReplicationSlot)
.to receive(:lag_too_great?)
.and_return(true)
expect(worker.healthy_database?).to eq(false)
end
context 'when replication lag is small enough' do
it 'returns true' do
allow(Postgresql::ReplicationSlot)
.to receive(:lag_too_great?)
.and_return(false)
expect(worker.healthy_database?).to eq(true)
end
end
end
end
it_behaves_like 'it runs background migration jobs', :main, :background_migration_database_health_reschedules
end