Commit aef12542 authored by Patrick Bair

Merge branch '351585-support-multiple-databases-for-batched-background-migrations' into 'master'

Resolve "Support multiple databases for batched background migrations"

See merge request gitlab-org/gitlab!79742
parents 10b474ba a053fcef
......@@ -309,6 +309,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:database_batched_background_migration_ci_database
:worker_name: Database::BatchedBackgroundMigration::CiDatabaseWorker
:feature_category: :database
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:database_drop_detached_partitions
:worker_name: Database::DropDetachedPartitionsWorker
:feature_category: :database
......
# frozen_string_literal: true
module Database
  module BatchedBackgroundMigration
    # Cron worker that runs batched background migrations tracked in the CI
    # database. The execution logic comes from SingleDatabaseWorker; this class
    # only names the database to use.
    class CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
      include SingleDatabaseWorker

      class << self
        # Returns the name of the database this worker operates on. The value
        # is memoized in a class-level instance variable.
        def tracking_database
          @tracking_database ||= Gitlab::Database::CI_DATABASE_NAME
        end
      end
    end
  end
end
# frozen_string_literal: true
module Database
  module BatchedBackgroundMigration
    # Shared implementation for cron workers that execute batched background
    # migrations against one specific tracking database.
    #
    # An including worker class must define `.tracking_database`, returning the
    # key used to look up its base model in
    # `Gitlab::Database.database_base_models`. All model access inside
    # `#perform` is wrapped in `Gitlab::Database::SharedModel.using_connection`
    # with that base model's connection.
    module SingleDatabaseWorker
      extend ActiveSupport::Concern

      include ApplicationWorker
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

      # The exclusive lease is held for `interval * LEASE_TIMEOUT_MULTIPLIER`,
      # but never for less than MINIMUM_LEASE_TIMEOUT.
      LEASE_TIMEOUT_MULTIPLIER = 3
      MINIMUM_LEASE_TIMEOUT = 10.minutes.freeze
      # Passed as `variance:` to `interval_elapsed?` on the active migration.
      INTERVAL_VARIANCE = 5.seconds.freeze

      included do
        data_consistency :always
        feature_category :database
        idempotent!
      end

      class_methods do
        # Must be overridden by the including worker class.
        #
        # Raises NotImplementedError when the including class does not
        # override it.
        # :nocov:
        def tracking_database
          raise NotImplementedError, "#{self.name} does not implement #{__method__}"
        end
        # :nocov:

        # Key of the exclusive lease, derived from the worker class name
        # (demodulized and underscored).
        def lease_key
          name.demodulize.underscore
        end
      end

      # Runs the active batched migration's next job, if there is one to run.
      #
      # Nothing is executed when:
      # - no base model is found for the tracking database,
      # - the `execute_batched_migrations_on_schedule` ops flag is disabled,
      # - there is no active migration,
      # - the exclusive lease cannot be obtained, or
      # - after reloading, the migration is no longer active or its interval
      #   has not yet elapsed.
      def perform
        # The lookup yields nil when there is no base model for the tracking
        # database (presumably because that database is not configured on this
        # installation -- confirm against Gitlab::Database.database_base_models).
        # Without this guard the cron job would fail with NoMethodError on
        # `nil.connection` on every run.
        return unless base_model

        Gitlab::Database::SharedModel.using_connection(base_model.connection) do
          # `break` leaves the `using_connection` block early.
          break unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops, default_enabled: :yaml) && active_migration

          with_exclusive_lease(active_migration.interval) do
            # Now that we have the exclusive lease, reload migration in case another process has changed it.
            # This is a temporary solution until we have better concurrency handling around job execution
            #
            # We also have to disable this cop, because ApplicationRecord aliases reset to reload, but our database
            # models don't inherit from ApplicationRecord
            active_migration.reload # rubocop:disable Cop/ActiveRecordAssociationReload

            run_active_migration if active_migration.active? && active_migration.interval_elapsed?(variance: INTERVAL_VARIANCE)
          end
        end
      end

      private

      # Memoized active migration, as returned by
      # `BatchedMigration.active_migration`.
      def active_migration
        @active_migration ||= Gitlab::Database::BackgroundMigration::BatchedMigration.active_migration
      end

      # Hands the active migration to a runner bound to the tracking
      # database's connection.
      def run_active_migration
        Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new(connection: base_model.connection).run_migration_job(active_migration)
      end

      # Memoized base model registered for this worker's tracking database.
      def base_model
        @base_model ||= Gitlab::Database.database_base_models[self.class.tracking_database]
      end

      # Yields only if the exclusive lease for this worker class is obtained.
      #
      # interval - The active migration's interval; the lease timeout is
      #            `interval * LEASE_TIMEOUT_MULTIPLIER`, floored at
      #            MINIMUM_LEASE_TIMEOUT.
      #
      # `cancel` is invoked on the lease afterwards in every case, including
      # when the block raises.
      def with_exclusive_lease(interval)
        timeout = [interval * LEASE_TIMEOUT_MULTIPLIER, MINIMUM_LEASE_TIMEOUT].max
        lease = Gitlab::ExclusiveLease.new(self.class.lease_key, timeout: timeout)

        yield if lease.try_obtain
      ensure
        lease&.cancel
      end
    end
  end
end
# frozen_string_literal: true
module Database
class BatchedBackgroundMigrationWorker
include ApplicationWorker
class BatchedBackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker
include BatchedBackgroundMigration::SingleDatabaseWorker
data_consistency :always
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
feature_category :database
idempotent!
LEASE_TIMEOUT_MULTIPLIER = 3
MINIMUM_LEASE_TIMEOUT = 10.minutes.freeze
INTERVAL_VARIANCE = 5.seconds.freeze
def perform
return unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops, default_enabled: :yaml) && active_migration
with_exclusive_lease(active_migration.interval) do
# Now that we have the exclusive lease, reload migration in case another process has changed it.
# This is a temporary solution until we have better concurrency handling around job execution
#
# We also have to disable this cop, because ApplicationRecord aliases reset to reload, but our database
# models don't inherit from ApplicationRecord
active_migration.reload # rubocop:disable Cop/ActiveRecordAssociationReload
run_active_migration if active_migration.active? && active_migration.interval_elapsed?(variance: INTERVAL_VARIANCE)
end
end
private
def active_migration
@active_migration ||= Gitlab::Database::BackgroundMigration::BatchedMigration.active_migration
end
def run_active_migration
Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new.run_migration_job(active_migration)
end
def with_exclusive_lease(interval)
timeout = [interval * LEASE_TIMEOUT_MULTIPLIER, MINIMUM_LEASE_TIMEOUT].max
lease = Gitlab::ExclusiveLease.new(lease_key, timeout: timeout)
yield if lease.try_obtain
ensure
lease&.cancel
end
def lease_key
self.class.name.demodulize.underscore
def self.tracking_database
@tracking_database ||= Gitlab::Database::MAIN_DATABASE_NAME.to_sym
end
end
end
......@@ -611,6 +611,9 @@ Settings.cron_jobs['ci_delete_unit_tests_worker']['job_class'] = 'Ci::DeleteUnit
# Cron entry for Database::BatchedBackgroundMigrationWorker.
# `||=` keeps an already-configured schedule; the default '* * * * *' runs
# the job every minute. The job class is always set explicitly.
Settings.cron_jobs['batched_background_migrations_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['batched_background_migrations_worker']['cron'] ||= '* * * * *'
Settings.cron_jobs['batched_background_migrations_worker']['job_class'] = 'Database::BatchedBackgroundMigrationWorker'
# Cron entry for the CI-database counterpart,
# Database::BatchedBackgroundMigration::CiDatabaseWorker, with the same
# every-minute default schedule.
Settings.cron_jobs['batched_background_migration_worker_ci_database'] ||= Settingslogic.new({})
Settings.cron_jobs['batched_background_migration_worker_ci_database']['cron'] ||= '* * * * *'
Settings.cron_jobs['batched_background_migration_worker_ci_database']['job_class'] = 'Database::BatchedBackgroundMigration::CiDatabaseWorker'
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances'] ||= Settingslogic.new({})
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances']['cron'] ||= '*/15 * * * *'
Settings.cron_jobs['issues_reschedule_stuck_issue_rebalances']['job_class'] = 'Issues::RescheduleStuckIssueRebalancesWorker'
......
# frozen_string_literal: true
module Gitlab
  module BackgroundMigration
    module BatchingStrategies
      # Minimal parent class for batching strategy job classes.
      #
      # Every strategy inheriting from it is constructed with the connection to
      # the tracking database and can read it through the private `connection`
      # reader.
      class BaseStrategy
        attr_reader :connection
        private :connection

        # connection - The database connection the strategy should use
        def initialize(connection:)
          @connection = connection
        end

        # Abstract hook: concrete strategies override this method.
        #
        # Raises NotImplementedError when called on a class that has not
        # overridden it.
        def next_batch(*arguments)
          raise(NotImplementedError, "#{self.class} does not implement #{__method__}")
        end
      end
    end
  end
end
......@@ -8,7 +8,7 @@ module Gitlab
# values for the next batch as an array.
#
# If no more batches exist in the table, returns nil.
class PrimaryKeyBatchingStrategy
class PrimaryKeyBatchingStrategy < BaseStrategy
include Gitlab::Database::DynamicModelHelpers
# Finds and returns the next batch in the table.
......@@ -19,7 +19,7 @@ module Gitlab
# batch_size - The size of the next batch
# job_arguments - The migration job arguments
def next_batch(table_name, column_name, batch_min_value:, batch_size:, job_arguments:)
model_class = define_batchable_model(table_name, connection: ActiveRecord::Base.connection)
model_class = define_batchable_model(table_name, connection: connection)
quoted_column_name = model_class.connection.quote_column_name(column_name)
relation = model_class.where("#{quoted_column_name} >= ?", batch_min_value)
......
......@@ -3,7 +3,7 @@
module Gitlab
module Database
module BackgroundMigration
class BatchedJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
class BatchedJob < SharedModel
include EachBatch
include FromUnion
......@@ -87,7 +87,7 @@ module Gitlab
raise 'Job cannot be split further' if new_batch_size < 1
batching_strategy = batched_migration.batch_class.new
batching_strategy = batched_migration.batch_class.new(connection: self.class.connection)
next_batch_bounds = batching_strategy.next_batch(
batched_migration.table_name,
batched_migration.column_name,
......
......@@ -3,7 +3,7 @@
module Gitlab
module Database
module BackgroundMigration
class BatchedMigration < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
class BatchedMigration < SharedModel
JOB_CLASS_MODULE = 'Gitlab::BackgroundMigration'
BATCH_CLASS_MODULE = "#{JOB_CLASS_MODULE}::BatchingStrategies"
......
......@@ -6,12 +6,13 @@ module Gitlab
class BatchedMigrationRunner
FailedToFinalize = Class.new(RuntimeError)
def self.finalize(job_class_name, table_name, column_name, job_arguments)
new.finalize(job_class_name, table_name, column_name, job_arguments)
def self.finalize(job_class_name, table_name, column_name, job_arguments, connection: ApplicationRecord.connection)
new(connection: connection).finalize(job_class_name, table_name, column_name, job_arguments)
end
def initialize(migration_wrapper = BatchedMigrationWrapper.new)
def initialize(migration_wrapper = BatchedMigrationWrapper.new, connection: ApplicationRecord.connection)
@migration_wrapper = migration_wrapper
@connection = connection
end
# Runs the next batched_job for a batched_background_migration.
......@@ -77,7 +78,7 @@ module Gitlab
private
attr_reader :migration_wrapper
attr_reader :migration_wrapper, :connection
def find_or_create_next_batched_job(active_migration)
if next_batch_range = find_next_batch_range(active_migration)
......@@ -88,7 +89,7 @@ module Gitlab
end
def find_next_batch_range(active_migration)
batching_strategy = active_migration.batch_class.new
batching_strategy = active_migration.batch_class.new(connection: connection)
batch_min_value = active_migration.next_min_value
next_batch_bounds = batching_strategy.next_batch(
......
......@@ -29,7 +29,12 @@ module Gitlab
# The worker classes aren't constants here, because that would force
# Application Settings to be loaded earlier causing failures loading
# the environment in rake tasks
EXEMPT_WORKER_NAMES = %w[BackgroundMigrationWorker BackgroundMigration::CiDatabaseWorker Database::BatchedBackgroundMigrationWorker].to_set
EXEMPT_WORKER_NAMES = %w[BackgroundMigrationWorker
BackgroundMigration::CiDatabaseWorker
Database::BatchedBackgroundMigrationWorker
Database::BatchedBackgroundMigration::CiDatabaseWorker].to_set
JOB_STATUS_KEY = 'size_limiter'
class << self
......
......@@ -15,7 +15,7 @@ RSpec.describe Gitlab::BackgroundMigration::BatchingStrategies::BackfillProjectN
let!(:project2) { projects.create!(name: 'project2', path: 'project2', namespace_id: namespace2.id, visibility_level: 20) }
let!(:project3) { projects.create!(name: 'project3', path: 'project3', namespace_id: namespace3.id, visibility_level: 20) }
let!(:project4) { projects.create!(name: 'project4', path: 'project4', namespace_id: namespace3.id, visibility_level: 20) }
let!(:batching_strategy) { described_class.new }
let!(:batching_strategy) { described_class.new(connection: ActiveRecord::Base.connection) }
let(:job_arguments) { [namespace1.id, 'up'] }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::BatchingStrategies::BaseStrategy, '#next_batch' do
  # An anonymous subclass that does not override #next_batch, built with a
  # stand-in connection.
  subject(:base_strategy) do
    Class.new(described_class).new(connection: double(:connection))
  end

  describe '#next_batch' do
    it 'raises an error if not overridden by a subclass' do
      expect do
        base_strategy.next_batch
      end.to raise_error(NotImplementedError, /does not implement next_batch/)
    end
  end
end
......@@ -3,7 +3,7 @@
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::BatchingStrategies::PrimaryKeyBatchingStrategy, '#next_batch' do
let(:batching_strategy) { described_class.new }
let(:batching_strategy) { described_class.new(connection: ActiveRecord::Base.connection) }
let(:namespaces) { table(:namespaces) }
let!(:namespace1) { namespaces.create!(name: 'batchtest1', path: 'batch-test1') }
......@@ -11,6 +11,8 @@ RSpec.describe Gitlab::BackgroundMigration::BatchingStrategies::PrimaryKeyBatchi
let!(:namespace3) { namespaces.create!(name: 'batchtest3', path: 'batch-test3') }
let!(:namespace4) { namespaces.create!(name: 'batchtest4', path: 'batch-test4') }
it { expect(described_class).to be < Gitlab::BackgroundMigration::BatchingStrategies::BaseStrategy }
context 'when starting on the first batch' do
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy.next_batch(:namespaces, :id, batch_min_value: namespace1.id, batch_size: 3, job_arguments: nil)
......
......@@ -5,6 +5,8 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::BatchedJob, type: :model do
it_behaves_like 'having unique enum values'
it { is_expected.to be_a Gitlab::Database::SharedModel }
describe 'associations' do
it { is_expected.to belong_to(:batched_migration).with_foreign_key(:batched_background_migration_id) }
it { is_expected.to have_many(:batched_job_transition_logs).with_foreign_key(:batched_background_migration_job_id) }
......
......@@ -428,4 +428,27 @@ RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationRunner do
end
end
end
describe '.finalize' do
context 'when the connection is passed' do
let(:connection) { double('connection') }
let(:table_name) { :_test_batched_migrations_test_table }
let(:column_name) { :some_id }
let(:job_arguments) { [:some, :other, :arguments] }
let(:batched_migration) { create(:batched_background_migration, table_name: table_name, column_name: column_name) }
it 'initializes the object with the given connection' do
expect(described_class).to receive(:new).with(connection: connection).and_call_original
described_class.finalize(
batched_migration.job_class_name,
table_name,
column_name,
job_arguments,
connection: connection
)
end
end
end
end
......@@ -5,6 +5,8 @@ require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigration, type: :model do
it_behaves_like 'having unique enum values'
it { is_expected.to be_a Gitlab::Database::SharedModel }
describe 'associations' do
it { is_expected.to have_many(:batched_jobs).with_foreign_key(:batched_background_migration_id) }
......
# frozen_string_literal: true
# Shared examples for workers that run batched background migrations against a
# single database.
#
# tracking_database - The value `described_class.tracking_database` is expected
#                     to return; it is also used as the key into
#                     `Gitlab::Database.database_base_models` when checking
#                     which connection the worker uses.
RSpec.shared_examples 'it runs batched background migration jobs' do |tracking_database|
include ExclusiveLeaseHelpers
# Worker attributes the described class is expected to declare.
describe 'defining the job attributes' do
it 'defines the data_consistency as always' do
expect(described_class.get_data_consistency).to eq(:always)
end
it 'defines the feature_category as database' do
expect(described_class.get_feature_category).to eq(:database)
end
it 'defines the idempotency as true' do
expect(described_class.idempotent?).to be_truthy
end
end
describe '.tracking_database' do
it 'does not raise an error' do
expect { described_class.tracking_database }.not_to raise_error
end
it 'overrides the method to return the tracking database' do
expect(described_class.tracking_database).to eq(tracking_database)
end
end
describe '.lease_key' do
# Expected key: the demodulized, underscored worker class name.
let(:lease_key) { described_class.name.demodulize.underscore }
it 'does not raise an error' do
expect { described_class.lease_key }.not_to raise_error
end
it 'returns the lease key' do
expect(described_class.lease_key).to eq(lease_key)
end
end
describe '#perform' do
subject(:worker) { described_class.new }
context 'when the feature flag is disabled' do
before do
stub_feature_flags(execute_batched_migrations_on_schedule: false)
end
it 'does nothing' do
# With the flag off, the worker must not even look up the active migration.
expect(worker).not_to receive(:active_migration)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the feature flag is enabled' do
before do
stub_feature_flags(execute_batched_migrations_on_schedule: true)
# Default for this context: no active migration. Nested contexts override it.
allow(Gitlab::Database::BackgroundMigration::BatchedMigration).to receive(:active_migration).and_return(nil)
end
context 'when no active migrations exist' do
it 'does nothing' do
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when active migrations exist' do
let(:job_interval) { 5.minutes }
# Three times job_interval, which is above the 10 minute minimum lease timeout.
let(:lease_timeout) { 15.minutes }
let(:lease_key) { described_class.name.demodulize.underscore }
let(:migration) { build(:batched_background_migration, :active, interval: job_interval) }
let(:interval_variance) { described_class::INTERVAL_VARIANCE }
before do
allow(Gitlab::Database::BackgroundMigration::BatchedMigration).to receive(:active_migration)
.and_return(migration)
allow(migration).to receive(:interval_elapsed?).with(variance: interval_variance).and_return(true)
# The migration is only built, so reload is stubbed out.
allow(migration).to receive(:reload)
end
context 'when the reloaded migration is no longer active' do
it 'does not run the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
expect(migration).to receive(:reload)
expect(migration).to receive(:active?).and_return(false)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the interval has not elapsed' do
it 'does not run the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
expect(migration).to receive(:interval_elapsed?).with(variance: interval_variance).and_return(false)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the reloaded migration is still active and the interval has elapsed' do
it 'runs the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationRunner) do |instance|
expect(instance).to receive(:run_migration_job).with(migration)
end
expect(worker).to receive(:run_active_migration).and_call_original
worker.perform
end
end
context 'when the calculated timeout is less than the minimum allowed' do
let(:minimum_timeout) { described_class::MINIMUM_LEASE_TIMEOUT }
# Three times 2 minutes is 6 minutes, below the worker's minimum lease timeout.
let(:job_interval) { 2.minutes }
it 'sets the lease timeout to the minimum value' do
expect_to_obtain_exclusive_lease(lease_key, timeout: minimum_timeout)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationRunner) do |instance|
expect(instance).to receive(:run_migration_job).with(migration)
end
expect(worker).to receive(:run_active_migration).and_call_original
worker.perform
end
end
it 'always cleans up the exclusive lease' do
# Start from a stubbed lease, then make try_obtain succeed so the block runs
# and raises; cancel must still be called.
lease = stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
expect(lease).to receive(:try_obtain).and_return(true)
expect(worker).to receive(:run_active_migration).and_raise(RuntimeError, 'I broke')
expect(lease).to receive(:cancel)
expect { worker.perform }.to raise_error(RuntimeError, 'I broke')
end
it 'receives the correct connection' do
# The worker must wrap its work in the connection of the base model
# registered for the tracking database.
base_model = Gitlab::Database.database_base_models[tracking_database]
expect(Gitlab::Database::SharedModel).to receive(:using_connection).with(base_model.connection).and_yield
worker.perform
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Runs the shared batched-migration worker examples for the CI database
# worker. The group is only enabled when a `ci` database configuration exists.
RSpec.describe(
  Database::BatchedBackgroundMigration::CiDatabaseWorker,
  :clean_gitlab_redis_shared_state,
  if: Gitlab::Database.has_config?(:ci)
) do
  it_behaves_like 'it runs batched background migration jobs', 'ci'
end
......@@ -2,120 +2,6 @@
require 'spec_helper'
RSpec.describe Database::BatchedBackgroundMigrationWorker, '#perform', :clean_gitlab_redis_shared_state do
include ExclusiveLeaseHelpers
let(:worker) { described_class.new }
context 'when the feature flag is disabled' do
before do
stub_feature_flags(execute_batched_migrations_on_schedule: false)
end
it 'does nothing' do
expect(worker).not_to receive(:active_migration)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the feature flag is enabled' do
before do
stub_feature_flags(execute_batched_migrations_on_schedule: true)
allow(Gitlab::Database::BackgroundMigration::BatchedMigration).to receive(:active_migration).and_return(nil)
end
context 'when no active migrations exist' do
it 'does nothing' do
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when active migrations exist' do
let(:job_interval) { 5.minutes }
let(:lease_timeout) { 15.minutes }
let(:lease_key) { 'batched_background_migration_worker' }
let(:migration) { build(:batched_background_migration, :active, interval: job_interval) }
let(:interval_variance) { described_class::INTERVAL_VARIANCE }
before do
allow(Gitlab::Database::BackgroundMigration::BatchedMigration).to receive(:active_migration)
.and_return(migration)
allow(migration).to receive(:interval_elapsed?).with(variance: interval_variance).and_return(true)
allow(migration).to receive(:reload)
end
context 'when the reloaded migration is no longer active' do
it 'does not run the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
expect(migration).to receive(:reload)
expect(migration).to receive(:active?).and_return(false)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the interval has not elapsed' do
it 'does not run the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
expect(migration).to receive(:interval_elapsed?).with(variance: interval_variance).and_return(false)
expect(worker).not_to receive(:run_active_migration)
worker.perform
end
end
context 'when the reloaded migration is still active and the interval has elapsed' do
it 'runs the migration' do
expect_to_obtain_exclusive_lease(lease_key, timeout: lease_timeout)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationRunner) do |instance|
expect(instance).to receive(:run_migration_job).with(migration)
end
expect(worker).to receive(:run_active_migration).and_call_original
worker.perform
end
end
context 'when the calculated timeout is less than the minimum allowed' do
let(:minimum_timeout) { described_class::MINIMUM_LEASE_TIMEOUT }
let(:job_interval) { 2.minutes }
it 'sets the lease timeout to the minimum value' do
expect_to_obtain_exclusive_lease(lease_key, timeout: minimum_timeout)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationRunner) do |instance|
expect(instance).to receive(:run_migration_job).with(migration)
end
expect(worker).to receive(:run_active_migration).and_call_original
worker.perform
end
end
it 'always cleans up the exclusive lease' do
lease = stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
expect(lease).to receive(:try_obtain).and_return(true)
expect(worker).to receive(:run_active_migration).and_raise(RuntimeError, 'I broke')
expect(lease).to receive(:cancel)
expect { worker.perform }.to raise_error(RuntimeError, 'I broke')
end
end
end
RSpec.describe Database::BatchedBackgroundMigrationWorker do
it_behaves_like 'it runs batched background migration jobs', :main
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment