Commit f02f3651 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch '343047-add-ci-worker-for-bg-migrations' into 'master'

Prepare for ci background migration worker

See merge request gitlab-org/gitlab!72916
parents a60b2d4b 41da33b3
@@ -10,7 +10,6 @@ Database/MultipleDatabases:
- ee/lib/pseudonymizer/pager.rb
- ee/lib/system_check/geo/geo_database_configured_check.rb
- ee/spec/lib/pseudonymizer/dumper_spec.rb
- ee/spec/models/pg_replication_slot_spec.rb
- ee/spec/services/ee/merge_requests/update_service_spec.rb
- lib/backup/database.rb
- lib/backup/manager.rb
......
# frozen_string_literal: true
module Postgresql
class ReplicationSlot < Gitlab::Database::SharedModel
self.table_name = 'pg_replication_slots'
# Returns true if there are any replication slots in use.
......
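The change above — making Postgresql::ReplicationSlot inherit from Gitlab::Database::SharedModel instead of ApplicationRecord — is what lets the replication-lag health check run against whichever connection the job coordinator has shared for the duration of a job. A minimal sketch of the idea, assuming the SharedModel.using_connection block API used elsewhere in the codebase and a placeholder ci_connection handle:

# Illustrative only: ci_connection stands in for a connection checked out
# from the CI database's pool; it is not defined in this MR.
Gitlab::Database::SharedModel.using_connection(ci_connection) do
  # While the block is active, pg_replication_slots is queried through
  # ci_connection, so lag is measured on the database the job targets.
  Postgresql::ReplicationSlot.lag_too_great?
end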
# frozen_string_literal: true
module BackgroundMigration
module SingleDatabaseWorker
extend ActiveSupport::Concern
include ApplicationWorker
MAX_LEASE_ATTEMPTS = 5
included do
data_consistency :always
sidekiq_options retry: 3
feature_category :database
urgency :throttled
loggable_arguments 0, 1
end
class_methods do
# The minimum amount of time between processing two jobs of the same migration
# class.
#
# This interval is set to 2 or 5 minutes so autovacuuming and other
# maintenance related tasks have plenty of time to clean up after a migration
# has been performed.
def minimum_interval
2.minutes.to_i
end
def tracking_database
raise NotImplementedError, "#{self.name} does not implement #{__method__}"
end
def unhealthy_metric_name
raise NotImplementedError, "#{self.name} does not implement #{__method__}"
end
end
# Performs the background migration.
#
# See Gitlab::BackgroundMigration.perform for more information.
#
# class_name - The class name of the background migration to run.
# arguments - The arguments to pass to the migration class.
# lease_attempts - The number of times we will try to obtain an exclusive
# lease on the class before giving up. See MR for more discussion.
# https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956
def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS)
job_coordinator.with_shared_connection do
perform_with_connection(class_name, arguments, lease_attempts)
end
end
private
def job_coordinator
@job_coordinator ||= Gitlab::BackgroundMigration.coordinator_for_database(self.class.tracking_database)
end
def perform_with_connection(class_name, arguments, lease_attempts)
with_context(caller_id: class_name.to_s) do
retried = lease_attempts != MAX_LEASE_ATTEMPTS
attempts_left = lease_attempts - 1
should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried)
break if should_perform.nil?
if should_perform
job_coordinator.perform(class_name, arguments)
else
# If the lease could not be obtained, this means either another process is
# running a migration of this class or we ran one recently. In this case
# we'll reschedule the job in such a way that it is picked up again around
# the time the lease expires.
self.class
.perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left)
end
end
end
def perform_and_ttl(class_name, attempts_left, retried)
# In test environments `perform_in` will run right away. This can then
# lead to stack level errors in the above `#perform`. To work around this
# we'll just perform the migration right away in the test environment.
return [true, nil] if always_perform?
lease = lease_for(class_name, retried)
lease_obtained = !!lease.try_obtain
healthy_db = healthy_database?
perform = lease_obtained && healthy_db
database_unhealthy_counter.increment if lease_obtained && !healthy_db
# When the DB is unhealthy or the lease can't be obtained after several tries,
# then give up on the job and log a warning. Otherwise we could end up in
# an infinite rescheduling loop. Jobs can be tracked in the database with the
# use of Gitlab::Database::BackgroundMigrationJob
if !perform && attempts_left < 0
msg = if !lease_obtained
'Job could not get an exclusive lease after several tries. Giving up.'
else
'Database was unhealthy after several tries. Giving up.'
end
Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid)
return [nil, nil]
end
[perform, lease.ttl]
end
def lease_for(class_name, retried)
Gitlab::ExclusiveLease
.new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval)
end
def lease_key_for(class_name, retried)
key = "#{self.class.name}:#{class_name}"
# We use a different exclusive lock key for retried jobs to allow them to run concurrently with the scheduled jobs.
# See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information.
key += ":retried" if retried
key
end
def always_perform?
Rails.env.test?
end
# Returns true if the database is healthy enough to allow the migration to be
# performed.
#
# class_name - The name of the background migration that we might want to
# run.
def healthy_database?
!Postgresql::ReplicationSlot.lag_too_great?
end
def database_unhealthy_counter
Gitlab::Metrics.counter(
self.class.unhealthy_metric_name,
'The number of times a background migration is rescheduled because the database is unhealthy.'
)
end
end
end
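To make the lease and retry behaviour above concrete: each attempt tries to take an exclusive lease keyed on the worker and migration class; if the lease is held, or the database is unhealthy, the job is re-enqueued near the lease expiry with one fewer attempt, and once attempts_left drops below zero it logs a warning instead of rescheduling forever. An illustrative enqueue (the migration class name is made up):

# Illustrative only; 'CopyWidgetsColumn' is not a real migration class.
BackgroundMigrationWorker.perform_async('CopyWidgetsColumn', [1, 10_000])

# On a contended lease (or an unhealthy database) the worker effectively runs:
#   self.class.perform_in(lease_ttl, 'CopyWidgetsColumn', [1, 10_000], attempts_left)
# with the attempts argument counting down on each reschedule; once it goes
# negative, the job emits a Sidekiq warning and gives up.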
# frozen_string_literal: true

class BackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker
  include BackgroundMigration::SingleDatabaseWorker

  def self.tracking_database
    @tracking_database ||= Gitlab::Database::MAIN_DATABASE_NAME.to_sym
  end

  def self.unhealthy_metric_name
    @unhealthy_metric_name ||= :background_migration_database_health_reschedules
  end
end
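The source branch name ('343047-add-ci-worker-for-bg-migrations') points at the follow-up this MR prepares for: a background migration worker bound to the CI database. A sketch of what such a worker could look like on top of the new concern — the class name, database symbol, and metric name are assumptions, not part of this MR:

# frozen_string_literal: true

# Hypothetical follow-up worker; names are assumed for illustration.
module BackgroundMigration
  class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
    include BackgroundMigration::SingleDatabaseWorker

    def self.tracking_database
      :ci
    end

    def self.unhealthy_metric_name
      :background_migration_ci_database_health_reschedules
    end
  end
end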
@@ -3,6 +3,8 @@
require 'spec_helper'
RSpec.describe Postgresql::ReplicationSlot do
it { is_expected.to be_a Gitlab::Database::SharedModel }
describe '.in_use?' do
it 'returns true when replication slots are present' do
expect(described_class).to receive(:exists?).and_return(true)
@@ -73,28 +75,22 @@ RSpec.describe Postgresql::ReplicationSlot do
before(:all) do
skip('max_replication_slots too small') if skip_examples
@current_slot_count = described_class
  .connection
  .select_value("SELECT COUNT(*) FROM pg_replication_slots")

@current_unused_count = described_class
  .connection
  .select_value("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")

described_class
  .connection
  .execute("SELECT * FROM pg_create_physical_replication_slot('test_slot');")
end
after(:all) do
unless skip_examples
described_class
.connection
.execute("SELECT pg_drop_replication_slot('test_slot');")
end
......
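A side note on the spec change above: connection.select_value returns the first column of the first row, already typecast by the adapter, so it replaces the previous execute(...).first.fetch('count').to_i chain with a single call. For example, inside the same spec context:

# Returns an Integer count directly on the PostgreSQL adapter.
slot_count = described_class.connection.select_value("SELECT COUNT(*) FROM pg_replication_slots")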
# frozen_string_literal: true
RSpec.shared_examples 'it runs background migration jobs' do |tracking_database, metric_name|
describe 'defining the job attributes' do
it 'defines the data_consistency as always' do
expect(described_class.get_data_consistency).to eq(:always)
end
it 'defines the retry count in sidekiq_options' do
expect(described_class.sidekiq_options['retry']).to eq(3)
end
it 'defines the feature_category as database' do
expect(described_class.get_feature_category).to eq(:database)
end
it 'defines the urgency as throttled' do
expect(described_class.get_urgency).to eq(:throttled)
end
it 'defines the loggable_arguments' do
expect(described_class.loggable_arguments).to match_array([0, 1])
end
end
describe '.tracking_database' do
it 'does not raise an error' do
expect { described_class.tracking_database }.not_to raise_error
end
it 'overrides the method to return the tracking database' do
expect(described_class.tracking_database).to eq(tracking_database)
end
end
describe '.unhealthy_metric_name' do
it 'does not raise an error' do
expect { described_class.unhealthy_metric_name }.not_to raise_error
end
it 'overrides the method to return the unhealthy metric name' do
expect(described_class.unhealthy_metric_name).to eq(metric_name)
end
end
describe '.minimum_interval' do
it 'returns 2 minutes' do
expect(described_class.minimum_interval).to eq(2.minutes.to_i)
end
end
describe '#perform' do
let(:worker) { described_class.new }
before do
allow(worker).to receive(:jid).and_return(1)
allow(worker).to receive(:always_perform?).and_return(false)
allow(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false)
end
it 'performs jobs using the coordinator for the correct database' do
expect_next_instance_of(Gitlab::BackgroundMigration::JobCoordinator) do |coordinator|
allow(coordinator).to receive(:with_shared_connection).and_yield
expect(coordinator.database).to eq(tracking_database)
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
end
worker.perform('Foo', [10, 20])
end
context 'when lease can be obtained' do
let(:coordinator) { double('job coordinator') }
before do
allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database)
.with(tracking_database)
.and_return(coordinator)
allow(coordinator).to receive(:with_shared_connection).and_yield
end
it 'sets up the shared connection before checking replication' do
expect(coordinator).to receive(:with_shared_connection).and_yield.ordered
expect(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false).ordered
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
worker.perform('Foo', [10, 20])
end
it 'performs a background migration' do
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
worker.perform('Foo', [10, 20])
end
context 'when lease_attempts is 1' do
it 'performs a background migration' do
expect(coordinator).to receive(:perform).with('Foo', [10, 20])
worker.perform('Foo', [10, 20], 1)
end
end
it 'can run scheduled job and retried job concurrently' do
expect(coordinator)
.to receive(:perform)
.with('Foo', [10, 20])
.exactly(2).time
worker.perform('Foo', [10, 20])
worker.perform('Foo', [10, 20], described_class::MAX_LEASE_ATTEMPTS - 1)
end
it 'sets the class that will be executed as the caller_id' do
expect(coordinator).to receive(:perform) do
expect(Gitlab::ApplicationContext.current).to include('meta.caller_id' => 'Foo')
end
worker.perform('Foo', [10, 20])
end
end
context 'when lease not obtained (migration of same class was performed recently)' do
let(:timeout) { described_class.minimum_interval }
let(:lease_key) { "#{described_class.name}:Foo" }
let(:coordinator) { double('job coordinator') }
before do
allow(Gitlab::BackgroundMigration).to receive(:coordinator_for_database)
.with(tracking_database)
.and_return(coordinator)
allow(coordinator).to receive(:with_shared_connection).and_yield
expect(coordinator).not_to receive(:perform)
Gitlab::ExclusiveLease.new(lease_key, timeout: timeout).try_obtain
end
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20], 5)
end
context 'when lease_attempts is 1' do
let(:lease_key) { "#{described_class.name}:Foo:retried" }
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 0)
worker.perform('Foo', [10, 20], 1)
end
end
context 'when lease_attempts is 0' do
let(:lease_key) { "#{described_class.name}:Foo:retried" }
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Job could not get an exclusive lease after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
context 'when database is not healthy' do
before do
expect(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(true)
end
it 'reschedules a migration if the database is not healthy' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20])
end
it 'increments the unhealthy counter' do
counter = Gitlab::Metrics.counter(metric_name, 'msg')
expect(described_class).to receive(:perform_in)
expect { worker.perform('Foo', [10, 20]) }.to change { counter.get }.by(1)
end
context 'when lease_attempts is 0' do
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Database was unhealthy after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
end
end
@@ -3,148 +3,5 @@
require 'spec_helper'
RSpec.describe BackgroundMigrationWorker, :clean_gitlab_redis_shared_state do
let(:worker) { described_class.new }
describe '.minimum_interval' do
it 'returns 2 minutes' do
expect(described_class.minimum_interval).to eq(2.minutes.to_i)
end
end
describe '#perform' do
before do
allow(worker).to receive(:jid).and_return(1)
allow(worker).to receive(:always_perform?).and_return(false)
end
it 'can run scheduled job and retried job concurrently' do
expect(Gitlab::BackgroundMigration)
.to receive(:perform)
.with('Foo', [10, 20])
.exactly(2).time
worker.perform('Foo', [10, 20])
worker.perform('Foo', [10, 20], described_class::MAX_LEASE_ATTEMPTS - 1)
end
context 'when lease can be obtained' do
before do
expect(Gitlab::BackgroundMigration)
.to receive(:perform)
.with('Foo', [10, 20])
end
it 'performs a background migration' do
worker.perform('Foo', [10, 20])
end
context 'when lease_attempts is 1' do
it 'performs a background migration' do
worker.perform('Foo', [10, 20], 1)
end
end
end
context 'when lease not obtained (migration of same class was performed recently)' do
before do
expect(Gitlab::BackgroundMigration).not_to receive(:perform)
worker.lease_for('Foo', false).try_obtain
end
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20], 5)
end
context 'when lease_attempts is 1' do
before do
worker.lease_for('Foo', true).try_obtain
end
it 'reschedules the migration and decrements the lease_attempts' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 0)
worker.perform('Foo', [10, 20], 1)
end
end
context 'when lease_attempts is 0' do
before do
worker.lease_for('Foo', true).try_obtain
end
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Job could not get an exclusive lease after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
context 'when database is not healthy' do
before do
allow(worker).to receive(:healthy_database?).and_return(false)
end
it 'reschedules a migration if the database is not healthy' do
expect(described_class)
.to receive(:perform_in)
.with(a_kind_of(Numeric), 'Foo', [10, 20], 4)
worker.perform('Foo', [10, 20])
end
context 'when lease_attempts is 0' do
it 'gives up performing the migration' do
expect(described_class).not_to receive(:perform_in)
expect(Sidekiq.logger).to receive(:warn).with(
class: 'Foo',
message: 'Database was unhealthy after several tries. Giving up.',
job_id: 1)
worker.perform('Foo', [10, 20], 0)
end
end
end
it 'sets the class that will be executed as the caller_id' do
expect(Gitlab::BackgroundMigration).to receive(:perform) do
expect(Gitlab::ApplicationContext.current).to include('meta.caller_id' => 'Foo')
end
worker.perform('Foo', [10, 20])
end
end
describe '#healthy_database?' do
context 'when replication lag is too great' do
it 'returns false' do
allow(Postgresql::ReplicationSlot)
.to receive(:lag_too_great?)
.and_return(true)
expect(worker.healthy_database?).to eq(false)
end
context 'when replication lag is small enough' do
it 'returns true' do
allow(Postgresql::ReplicationSlot)
.to receive(:lag_too_great?)
.and_return(false)
expect(worker.healthy_database?).to eq(true)
end
end
end
end
it_behaves_like 'it runs background migration jobs', :main, :background_migration_database_health_reschedules
end
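For completeness, a spec for the hypothetical CI worker sketched earlier would presumably reuse the same shared examples with its own database and metric name (both assumed here):

# frozen_string_literal: true

require 'spec_helper'

# Hypothetical; mirrors the :main example above with assumed CI-side names.
RSpec.describe BackgroundMigration::CiDatabaseWorker, :clean_gitlab_redis_shared_state do
  it_behaves_like 'it runs background migration jobs', :ci, :background_migration_ci_database_health_reschedules
end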