Commit 64434884 authored by pbair's avatar pbair

Support auto-batching background migrations

Introduce a new background migration queueing system that tracks the
progress of each migration in the database. This allows the application
to queue background jobs per batch over time, rather than in a single
call when the migration is initially run. It also gives better control
over the execution, tracking job staus and allowing configuration
options like batch size to be adjusted on the fly.
parent 9458c49d
---
title: Create tables to track auto-batched background migrations
merge_request: 54628
author:
type: added
# frozen_string_literal: true
class CreateBackgroundMigrationTrackingTables < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
create_table_with_constraints :batched_background_migrations do |t|
t.timestamps_with_timezone
t.bigint :min_value, null: false, default: 1
t.bigint :max_value, null: false
t.integer :batch_size, null: false
t.integer :sub_batch_size, null: false
t.integer :interval, limit: 2, null: false
t.integer :status, limit: 2, null: false, default: 0
t.text :job_class_name, null: false
t.text :batch_class_name, null: false,
default: 'Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy'
t.text :table_name, null: false
t.text :column_name, null: false
t.jsonb :job_arguments, null: false, default: '[]'
t.text_limit :job_class_name, 100
t.text_limit :batch_class_name, 100
t.text_limit :table_name, 63
t.text_limit :column_name, 63
t.check_constraint :check_positive_min_value, 'min_value > 0'
t.check_constraint :check_max_value_in_range, 'max_value >= min_value'
t.check_constraint :check_positive_sub_batch_size, 'sub_batch_size > 0'
t.check_constraint :check_batch_size_in_range, 'batch_size >= sub_batch_size'
t.index %i[job_class_name table_name column_name], name: :index_batched_migrations_on_job_table_and_column_name
end
create_table :batched_background_migration_jobs do |t|
t.timestamps_with_timezone
t.datetime_with_timezone :started_at
t.datetime_with_timezone :finished_at
t.references :batched_background_migration, null: false, index: false, foreign_key: { on_delete: :cascade }
t.bigint :min_value, null: false
t.bigint :max_value, null: false
t.integer :batch_size, null: false
t.integer :sub_batch_size, null: false
t.integer :status, limit: 2, null: false, default: 0
t.integer :attempts, limit: 2, null: false, default: 0
t.index [:batched_background_migration_id, :created_at], name: :index_batched_jobs_by_batched_migration_id_created_at
end
end
def down
drop_table :batched_background_migration_jobs
drop_table :batched_background_migrations
end
end
1cf1305ad5eaaef51f99f057b8a2e81731d69a6d02629c0c9a7d94dfdecbea47
\ No newline at end of file
......@@ -9768,6 +9768,64 @@ CREATE SEQUENCE badges_id_seq
ALTER SEQUENCE badges_id_seq OWNED BY badges.id;
CREATE TABLE batched_background_migration_jobs (
id bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
started_at timestamp with time zone,
finished_at timestamp with time zone,
batched_background_migration_id bigint NOT NULL,
min_value bigint NOT NULL,
max_value bigint NOT NULL,
batch_size integer NOT NULL,
sub_batch_size integer NOT NULL,
status smallint DEFAULT 0 NOT NULL,
attempts smallint DEFAULT 0 NOT NULL
);
CREATE SEQUENCE batched_background_migration_jobs_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE batched_background_migration_jobs_id_seq OWNED BY batched_background_migration_jobs.id;
CREATE TABLE batched_background_migrations (
id bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
min_value bigint DEFAULT 1 NOT NULL,
max_value bigint NOT NULL,
batch_size integer NOT NULL,
sub_batch_size integer NOT NULL,
"interval" smallint NOT NULL,
status smallint DEFAULT 0 NOT NULL,
job_class_name text NOT NULL,
batch_class_name text DEFAULT 'Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy'::text NOT NULL,
table_name text NOT NULL,
column_name text NOT NULL,
job_arguments jsonb DEFAULT '"[]"'::jsonb NOT NULL,
CONSTRAINT check_5bb0382d6f CHECK ((char_length(column_name) <= 63)),
CONSTRAINT check_6b6a06254a CHECK ((char_length(table_name) <= 63)),
CONSTRAINT check_batch_size_in_range CHECK ((batch_size >= sub_batch_size)),
CONSTRAINT check_e6c75b1e29 CHECK ((char_length(job_class_name) <= 100)),
CONSTRAINT check_fe10674721 CHECK ((char_length(batch_class_name) <= 100)),
CONSTRAINT check_max_value_in_range CHECK ((max_value >= min_value)),
CONSTRAINT check_positive_min_value CHECK ((min_value > 0)),
CONSTRAINT check_positive_sub_batch_size CHECK ((sub_batch_size > 0))
);
CREATE SEQUENCE batched_background_migrations_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE batched_background_migrations_id_seq OWNED BY batched_background_migrations.id;
CREATE TABLE board_assignees (
id integer NOT NULL,
board_id integer NOT NULL,
......@@ -18811,6 +18869,10 @@ ALTER TABLE ONLY background_migration_jobs ALTER COLUMN id SET DEFAULT nextval('
ALTER TABLE ONLY badges ALTER COLUMN id SET DEFAULT nextval('badges_id_seq'::regclass);
ALTER TABLE ONLY batched_background_migration_jobs ALTER COLUMN id SET DEFAULT nextval('batched_background_migration_jobs_id_seq'::regclass);
ALTER TABLE ONLY batched_background_migrations ALTER COLUMN id SET DEFAULT nextval('batched_background_migrations_id_seq'::regclass);
ALTER TABLE ONLY board_assignees ALTER COLUMN id SET DEFAULT nextval('board_assignees_id_seq'::regclass);
ALTER TABLE ONLY board_group_recent_visits ALTER COLUMN id SET DEFAULT nextval('board_group_recent_visits_id_seq'::regclass);
......@@ -19892,6 +19954,12 @@ ALTER TABLE ONLY background_migration_jobs
ALTER TABLE ONLY badges
ADD CONSTRAINT badges_pkey PRIMARY KEY (id);
ALTER TABLE ONLY batched_background_migration_jobs
ADD CONSTRAINT batched_background_migration_jobs_pkey PRIMARY KEY (id);
ALTER TABLE ONLY batched_background_migrations
ADD CONSTRAINT batched_background_migrations_pkey PRIMARY KEY (id);
ALTER TABLE ONLY board_assignees
ADD CONSTRAINT board_assignees_pkey PRIMARY KEY (id);
......@@ -21637,6 +21705,10 @@ CREATE INDEX index_badges_on_group_id ON badges USING btree (group_id);
CREATE INDEX index_badges_on_project_id ON badges USING btree (project_id);
CREATE INDEX index_batched_jobs_by_batched_migration_id_created_at ON batched_background_migration_jobs USING btree (batched_background_migration_id, created_at);
CREATE INDEX index_batched_migrations_on_job_table_and_column_name ON batched_background_migrations USING btree (job_class_name, table_name, column_name);
CREATE INDEX index_board_assignees_on_assignee_id ON board_assignees USING btree (assignee_id);
CREATE UNIQUE INDEX index_board_assignees_on_board_id_and_assignee_id ON board_assignees USING btree (board_id, assignee_id);
......@@ -25379,6 +25451,9 @@ ALTER TABLE ONLY ci_resources
ALTER TABLE ONLY clusters_applications_fluentd
ADD CONSTRAINT fk_rails_4319b1dcd2 FOREIGN KEY (cluster_id) REFERENCES clusters(id) ON DELETE CASCADE;
ALTER TABLE ONLY batched_background_migration_jobs
ADD CONSTRAINT fk_rails_432153b86d FOREIGN KEY (batched_background_migration_id) REFERENCES batched_background_migrations(id) ON DELETE CASCADE;
ALTER TABLE ONLY operations_strategies_user_lists
ADD CONSTRAINT fk_rails_43241e8d29 FOREIGN KEY (strategy_id) REFERENCES operations_strategies(id) ON DELETE CASCADE;
......@@ -2,13 +2,11 @@
module Gitlab
module BackgroundMigration
# Background migration that extends CopyColumn to update the value of a
# Background migration that updates the value of a
# column using the value of another column in the same table.
#
# - The {start_id, end_id} arguments are at the start so that it can be used
# with `queue_background_migration_jobs_by_range_at_intervals`
# - Provides support for background job tracking through the use of
# Gitlab::Database::BackgroundMigrationJob
# with `queue_batched_background_migration`
# - Uses sub-batching so that we can keep each update's execution time at
# low 100s ms, while being able to update more records per 2 minutes
# that we allow background migration jobs to be scheduled one after the other
......@@ -22,28 +20,24 @@ module Gitlab
# start_id - The start ID of the range of rows to update.
# end_id - The end ID of the range of rows to update.
# table - The name of the table that contains the columns.
# primary_key - The primary key column of the table.
# copy_from - The column containing the data to copy.
# copy_to - The column to copy the data to.
# batch_table - The name of the table that contains the columns.
# batch_column - The name of the column we use to batch over the table.
# sub_batch_size - We don't want updates to take more than ~100ms
# This allows us to run multiple smaller batches during
# the minimum 2.minute interval that we can schedule jobs
def perform(start_id, end_id, table, primary_key, copy_from, copy_to, sub_batch_size)
# copy_from - The column containing the data to copy.
# copy_to - The column to copy the data to.
def perform(start_id, end_id, batch_table, batch_column, sub_batch_size, copy_from, copy_to)
quoted_copy_from = connection.quote_column_name(copy_from)
quoted_copy_to = connection.quote_column_name(copy_to)
parent_batch_relation = relation_scoped_to_range(table, primary_key, start_id, end_id)
parent_batch_relation = relation_scoped_to_range(batch_table, batch_column, start_id, end_id)
parent_batch_relation.each_batch(column: primary_key, of: sub_batch_size) do |sub_batch|
parent_batch_relation.each_batch(column: batch_column, of: sub_batch_size) do |sub_batch|
sub_batch.update_all("#{quoted_copy_to}=#{quoted_copy_from}")
sleep(PAUSE_SECONDS)
end
# We have to add all arguments when marking a job as succeeded as they
# are all used to track the job by `queue_background_migration_jobs_by_range_at_intervals`
mark_job_as_succeeded(start_id, end_id, table, primary_key, copy_from, copy_to, sub_batch_size)
end
private
......@@ -52,10 +46,6 @@ module Gitlab
ActiveRecord::Base.connection
end
def mark_job_as_succeeded(*arguments)
Gitlab::Database::BackgroundMigrationJob.mark_all_as_succeeded(self.class.name, arguments)
end
def relation_scoped_to_range(source_table, source_key_column, start_id, stop_id)
define_batchable_model(source_table).where(source_key_column => start_id..stop_id)
end
......
# frozen_string_literal: true
module Gitlab
module Database
module BackgroundMigration
class BatchedJob < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
self.table_name = :batched_background_migration_jobs
belongs_to :batched_migration, foreign_key: :batched_background_migration_id
enum status: {
pending: 0,
running: 1,
failed: 2,
succeeded: 3
}
delegate :aborted?, :job_class, :table_name, :column_name, :job_arguments,
to: :batched_migration, prefix: :migration
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module BackgroundMigration
class BatchedMigration < ActiveRecord::Base # rubocop:disable Rails/ApplicationRecord
self.table_name = :batched_background_migrations
has_many :batched_jobs, foreign_key: :batched_background_migration_id
has_one :last_job, -> { order(created_at: :desc) },
class_name: 'Gitlab::Database::BackgroundMigration::BatchedJob',
foreign_key: :batched_background_migration_id
scope :queue_order, -> { order(created_at: :asc) }
scope :for_batch_configuration, -> (job_class_name, table_name, column_name) do
where(job_class_name: remove_toplevel_prefix(job_class_name),
table_name: table_name,
column_name: column_name)
end
enum status: {
paused: 0,
active: 1,
aborted: 2,
finished: 3
}
def self.remove_toplevel_prefix(name)
name&.sub(/\A::/, '')
end
def interval_elapsed?
last_job.nil? || last_job.created_at <= Time.current - interval
end
def create_batched_job!(min, max)
batched_jobs.create!(min_value: min, max_value: max, batch_size: batch_size, sub_batch_size: sub_batch_size)
end
def next_min_value
last_job&.max_value&.next || min_value
end
def job_class
job_class_name.constantize
end
def batch_class
batch_class_name.constantize
end
def job_class_name=(class_name)
write_attribute(:job_class_name, self.class.remove_toplevel_prefix(class_name))
end
def batch_class_name=(class_name)
write_attribute(:batch_class_name, self.class.remove_toplevel_prefix(class_name))
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module BackgroundMigration
class BatchedMigrationWrapper
def perform(batch_tracking_record)
return if batch_tracking_record.migration_aborted?
begin
start_tracking_execution(batch_tracking_record)
execute_batch(batch_tracking_record)
batch_tracking_record.status = :succeeded
rescue => e
batch_tracking_record.status = :failed
raise e
ensure
finish_tracking_execution(batch_tracking_record)
end
end
private
def start_tracking_execution(tracking_record)
tracking_record.update!(attempts: tracking_record.attempts + 1, status: :running, started_at: Time.current)
end
def execute_batch(tracking_record)
job_instance = tracking_record.migration_job_class.new
job_instance.perform(
tracking_record.min_value,
tracking_record.max_value,
tracking_record.migration_table_name,
tracking_record.migration_column_name,
tracking_record.sub_batch_size,
*tracking_record.migration_job_arguments)
end
def finish_tracking_execution(tracking_record)
tracking_record.finished_at = Time.current
tracking_record.save!
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module BackgroundMigration
class PrimaryKeyBatchingStrategy
include Gitlab::Database::DynamicModelHelpers
def next_batch(table_name, column_name, batch_min_value:, batch_size:)
model_class = define_batchable_model(table_name)
quoted_column_name = model_class.connection.quote_column_name(column_name)
relation = model_class.where("#{quoted_column_name} >= ?", batch_min_value)
next_batch_bounds = nil
relation.each_batch(of: batch_size, column: column_name) do |batch| # rubocop:disable Lint/UnreachableLoop
next_batch_bounds = batch.pluck(Arel.sql("MIN(#{quoted_column_name}), MAX(#{quoted_column_name})")).first
break
end
next_batch_bounds
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Database
module BackgroundMigration
class Scheduler
def perform(migration_wrapper: BatchedMigrationWrapper.new)
active_migration = BatchedMigration.active.queue_order.first
return unless active_migration&.interval_elapsed?
next_batched_job = create_next_batched_job!(active_migration)
if next_batched_job.nil?
finish_active_migration(active_migration)
return
end
migration_wrapper.perform(next_batched_job)
end
private
def create_next_batched_job!(active_migration)
next_batch_range = find_next_batch_range(active_migration)
return if next_batch_range.nil?
active_migration.create_batched_job!(next_batch_range.min, next_batch_range.max)
end
def find_next_batch_range(active_migration)
batching_strategy = active_migration.batch_class.new
batch_min_value = active_migration.next_min_value
next_batch_bounds = batching_strategy.next_batch(
active_migration.table_name,
active_migration.column_name,
batch_min_value: batch_min_value,
batch_size: active_migration.batch_size)
return if next_batch_bounds.nil?
clamped_batch_range(active_migration, next_batch_bounds)
end
def clamped_batch_range(active_migration, next_bounds)
min_value, max_value = next_bounds
return if min_value > active_migration.max_value
max_value = max_value.clamp(min_value, active_migration.max_value)
(min_value..max_value)
end
def finish_active_migration(active_migration)
active_migration.finished!
end
end
end
end
end
......@@ -5,7 +5,9 @@ module Gitlab
module Migrations
module BackgroundMigrationHelpers
BACKGROUND_MIGRATION_BATCH_SIZE = 1_000 # Number of rows to process per job
BACKGROUND_MIGRATION_SUB_BATCH_SIZE = 100 # Number of rows to process per sub-batch
BACKGROUND_MIGRATION_JOB_BUFFER_SIZE = 1_000 # Number of jobs to bulk queue at a time
BACKGROUND_MIGRATION_BATCH_CLASS_NAME = 'Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy'
# Bulk queues background migration jobs for an entire table, batched by ID range.
# "Bulk" meaning many jobs will be pushed at a time for efficiency.
......@@ -55,6 +57,47 @@ module Gitlab
bulk_migrate_async(jobs) unless jobs.empty?
end
def queue_batched_background_migration( # rubocop:disable Metrics/ParameterLists
job_class_name,
batch_table_name,
batch_column_name,
job_interval:,
batch_min_value: 1,
batch_max_value: nil,
batch_class_name: BACKGROUND_MIGRATION_BATCH_CLASS_NAME,
batch_size: BACKGROUND_MIGRATION_BATCH_SIZE,
sub_batch_size: BACKGROUND_MIGRATION_SUB_BATCH_SIZE,
other_job_arguments: []
)
batch_max_value ||= connection.select_value(<<~SQL)
SELECT MAX(#{connection.quote_column_name(batch_column_name)})
FROM #{connection.quote_table_name(batch_table_name)}
SQL
return if batch_max_value.nil?
Gitlab::Database::BackgroundMigration::BatchedMigration.create!(
job_class_name: job_class_name,
table_name: batch_table_name,
column_name: batch_column_name,
interval: job_interval,
min_value: batch_min_value,
max_value: batch_max_value,
batch_class_name: batch_class_name,
batch_size: batch_size,
sub_batch_size: sub_batch_size,
job_arguments: other_job_arguments,
status: :active)
end
def abort_batched_background_migrations(job_class_name, table_name, column_name)
Gitlab::Database::BackgroundMigration::BatchedMigration
.for_batch_configuration(job_class_name, table_name, column_name)
.not_finished
.update_all(status: :aborted, updated_at: Time.current)
end
# Queues background migration jobs for an entire table in batches.
# The default batching column used is the standard primary key `id`.
# Each job is scheduled with a `delay_interval` in between.
......
# frozen_string_literal: true
FactoryBot.define do
factory :batched_background_migration_job, class: '::Gitlab::Database::BackgroundMigration::BatchedJob' do
batched_migration factory: :batched_background_migration
min_value { 1 }
max_value { 10 }
batch_size { 5 }
sub_batch_size { 1 }
end
end
# frozen_string_literal: true
FactoryBot.define do
factory :batched_background_migration, class: '::Gitlab::Database::BackgroundMigration::BatchedMigration' do
max_value { 10 }
batch_size { 5 }
sub_batch_size { 1 }
interval { 2.minutes }
job_class_name { 'Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob' }
table_name { :events }
column_name { :id }
end
end
......@@ -38,22 +38,9 @@ RSpec.describe Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJo
describe '#perform' do
let(:migration_class) { described_class.name }
let!(:job1) do
table(:background_migration_jobs).create!(
class_name: migration_class,
arguments: [1, 10, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size]
)
end
let!(:job2) do
table(:background_migration_jobs).create!(
class_name: migration_class,
arguments: [11, 20, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size]
)
end
it 'copies all primary keys in range' do
subject.perform(12, 15, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size)
subject.perform(12, 15, table_name, 'id', sub_batch_size, 'id', 'id_convert_to_bigint')
expect(test_table.where('id = id_convert_to_bigint').pluck(:id)).to contain_exactly(12, 15)
expect(test_table.where(id_convert_to_bigint: 0).pluck(:id)).to contain_exactly(11, 19)
......@@ -61,7 +48,7 @@ RSpec.describe Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJo
end
it 'copies all foreign keys in range' do
subject.perform(10, 14, table_name, 'id', 'fk', 'fk_convert_to_bigint', sub_batch_size)
subject.perform(10, 14, table_name, 'id', sub_batch_size, 'fk', 'fk_convert_to_bigint')
expect(test_table.where('fk = fk_convert_to_bigint').pluck(:id)).to contain_exactly(11, 12)
expect(test_table.where(fk_convert_to_bigint: 0).pluck(:id)).to contain_exactly(15, 19)
......@@ -71,21 +58,11 @@ RSpec.describe Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJo
it 'copies columns with NULLs' do
expect(test_table.where("name_convert_to_text = 'no name'").count).to eq(4)
subject.perform(10, 20, table_name, 'id', 'name', 'name_convert_to_text', sub_batch_size)
subject.perform(10, 20, table_name, 'id', sub_batch_size, 'name', 'name_convert_to_text')
expect(test_table.where('name = name_convert_to_text').pluck(:id)).to contain_exactly(11, 12, 19)
expect(test_table.where('name is NULL and name_convert_to_text is NULL').pluck(:id)).to contain_exactly(15)
expect(test_table.where("name_convert_to_text = 'no name'").count).to eq(0)
end
it 'tracks completion with BackgroundMigrationJob' do
expect do
subject.perform(11, 20, table_name, 'id', 'id', 'id_convert_to_bigint', sub_batch_size)
end.to change { Gitlab::Database::BackgroundMigrationJob.succeeded.count }.from(0).to(1)
expect(job1.reload.status).to eq(0)
expect(job2.reload.status).to eq(1)
expect(test_table.where('id = id_convert_to_bigint').count).to eq(4)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::BatchedJob, type: :model do
it_behaves_like 'having unique enum values'
describe 'associations' do
it { is_expected.to belong_to(:batched_migration).with_foreign_key(:batched_background_migration_id) }
end
describe 'delegated batched_migration attributes' do
let(:batched_job) { build(:batched_background_migration_job) }
let(:batched_migration) { batched_job.batched_migration }
describe '#migration_job_class' do
before do
batched_migration.status = :aborted
end
it 'returns the migration aborted?' do
expect(batched_job.migration_aborted?).to eq(batched_migration.aborted?)
end
end
describe '#migration_job_class' do
it 'returns the migration job_class' do
expect(batched_job.migration_job_class).to eq(batched_migration.job_class)
end
end
describe '#migration_table_name' do
it 'returns the migration table_name' do
expect(batched_job.migration_table_name).to eq(batched_migration.table_name)
end
end
describe '#migration_column_name' do
it 'returns the migration column_name' do
expect(batched_job.migration_column_name).to eq(batched_migration.column_name)
end
end
describe '#migration_job_arguments' do
it 'returns the migration job_arguments' do
expect(batched_job.migration_job_arguments).to eq(batched_migration.job_arguments)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigration, type: :model do
it_behaves_like 'having unique enum values'
describe 'associations' do
it { is_expected.to have_many(:batched_jobs).with_foreign_key(:batched_background_migration_id) }
describe '#last_job' do
let!(:batched_migration) { create(:batched_background_migration) }
let!(:batched_job1) { create(:batched_background_migration_job, batched_migration: batched_migration) }
let!(:batched_job2) { create(:batched_background_migration_job, batched_migration: batched_migration, created_at: 1.month.ago) }
it 'returns the most recently created batch_job' do
expect(batched_migration.last_job).to eq(batched_job1)
end
end
end
describe '.queue_order' do
let!(:migration1) { create(:batched_background_migration, created_at: 1.month.ago) }
let!(:migration2) { create(:batched_background_migration, created_at: 2.months.ago) }
let!(:migration3) { create(:batched_background_migration) }
it 'returns batched_migrations ordered by created_at' do
expect(described_class.queue_order.all).to eq([migration2, migration1, migration3])
end
end
describe '.for_batch_configuration' do
let(:job_class_name) { 'TestClass' }
let(:table_name) { :issues }
let(:column_name) { :project_id }
let!(:migration1) { create(:batched_background_migration, job_class_name: job_class_name, table_name: table_name) }
let!(:migration2) { create(:batched_background_migration, job_class_name: job_class_name, column_name: column_name) }
let!(:migration3) { create(:batched_background_migration, job_class_name: job_class_name, table_name: table_name, column_name: column_name) }
let!(:migration4) { create(:batched_background_migration, table_name: table_name, column_name: column_name) }
it 'returns records matching the migration configuration' do
relation = described_class.for_batch_configuration(job_class_name, table_name, column_name)
expect(relation.all).to eq([migration3])
end
it 'normalizes the job class name' do
relation = described_class.for_batch_configuration("::#{job_class_name}", table_name, column_name)
expect(relation.all).to eq([migration3])
end
end
describe '#interval_elapsed?' do
context 'when the migration has no last_job' do
let(:batched_migration) { build(:batched_background_migration) }
it 'returns false' do
expect(batched_migration.interval_elapsed?).to eq(true)
end
end
context 'when the migration has a last_job' do
let(:interval) { 2.minutes }
let(:batched_migration) { create(:batched_background_migration, interval: interval) }
context 'when the last_job is less than an interval old' do
it 'returns false' do
freeze_time do
create(:batched_background_migration_job,
batched_migration: batched_migration,
created_at: Time.current - 1.minute)
expect(batched_migration.interval_elapsed?).to eq(false)
end
end
end
context 'when the last_job is exactly an interval old' do
it 'returns true' do
freeze_time do
create(:batched_background_migration_job,
batched_migration: batched_migration,
created_at: Time.current - 2.minutes)
expect(batched_migration.interval_elapsed?).to eq(true)
end
end
end
context 'when the last_job is more than an interval old' do
it 'returns true' do
freeze_time do
create(:batched_background_migration_job,
batched_migration: batched_migration,
created_at: Time.current - 3.minutes)
expect(batched_migration.interval_elapsed?).to eq(true)
end
end
end
end
end
describe '#create_batched_job!' do
let(:batched_migration) { create(:batched_background_migration) }
it 'creates a batched_job with the correct batch configuration' do
batched_job = batched_migration.create_batched_job!(1, 5)
expect(batched_job).to have_attributes(
min_value: 1,
max_value: 5,
batch_size: batched_migration.batch_size,
sub_batch_size: batched_migration.sub_batch_size)
end
end
describe '#next_min_value' do
let!(:batched_migration) { create(:batched_background_migration) }
context 'when a previous job exists' do
let!(:batched_job) { create(:batched_background_migration_job, batched_migration: batched_migration) }
it 'returns the next value after the previous maximum' do
expect(batched_migration.next_min_value).to eq(batched_job.max_value + 1)
end
end
context 'when a previous job does not exist' do
it 'returns the migration minimum value' do
expect(batched_migration.next_min_value).to eq(batched_migration.min_value)
end
end
end
describe '#job_class' do
let(:job_class) { Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob }
let(:batched_migration) { build(:batched_background_migration) }
it 'returns the class of the job for the migration' do
expect(batched_migration.job_class).to eq(job_class)
end
end
describe '#batch_class' do
let(:batch_class) { Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy}
let(:batched_migration) { build(:batched_background_migration) }
it 'returns the class of the batch strategy for the migration' do
expect(batched_migration.batch_class).to eq(batch_class)
end
end
shared_examples_for 'an attr_writer that normalizes assigned class names' do |attribute_name|
let(:batched_migration) { build(:batched_background_migration) }
context 'when the toplevel namespace prefix exists' do
it 'removes the leading prefix' do
batched_migration.public_send(:"#{attribute_name}=", '::Foo::Bar')
expect(batched_migration[attribute_name]).to eq('Foo::Bar')
end
end
context 'when the toplevel namespace prefix does not exist' do
it 'does not change the given class name' do
batched_migration.public_send(:"#{attribute_name}=", '::Foo::Bar')
expect(batched_migration[attribute_name]).to eq('Foo::Bar')
end
end
end
describe '#job_class_name=' do
it_behaves_like 'an attr_writer that normalizes assigned class names', :job_class_name
end
describe '#batch_class_name=' do
it_behaves_like 'an attr_writer that normalizes assigned class names', :batch_class_name
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper, '#perform' do
let(:migration_wrapper) { described_class.new }
let(:job_class) { Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob }
let_it_be(:active_migration) { create(:batched_background_migration, :active, job_arguments: [:id, :other_id]) }
context 'when the migration is aborted' do
let!(:aborted_migration) { create(:batched_background_migration, :aborted) }
let!(:job_record) { create(:batched_background_migration_job, batched_migration: aborted_migration) }
it 'does not run the migration job instance' do
expect(job_record).not_to receive(:migration_job_class)
expect(job_class).not_to receive(:new)
migration_wrapper.perform(job_record)
end
it 'does not update the tracking record in the database' do
migration_wrapper.perform(job_record)
reloaded_job_record = job_record.reload
expect(reloaded_job_record).to be_pending
expect(reloaded_job_record.attempts).to eq(0)
end
end
context 'when the migration is not aborted' do
let!(:job_record) { create(:batched_background_migration_job, batched_migration: active_migration) }
it 'runs the migration job' do
expect_next_instance_of(job_class) do |job_instance|
expect(job_instance).to receive(:perform).with(1, 10, 'events', 'id', 1, 'id', 'other_id')
end
migration_wrapper.perform(job_record)
end
it 'updates the the tracking record in the database' do
expect(job_record).to receive(:update!).with(hash_including(attempts: 1, status: :running)).and_call_original
freeze_time do
migration_wrapper.perform(job_record)
reloaded_job_record = job_record.reload
expect(reloaded_job_record).not_to be_pending
expect(reloaded_job_record.attempts).to eq(1)
expect(reloaded_job_record.started_at).to eq(Time.current)
end
end
context 'when the migration job does not raise an error' do
it 'marks the tracking record as succeeded' do
expect_next_instance_of(job_class) do |job_instance|
expect(job_instance).to receive(:perform).with(1, 10, 'events', 'id', 1, 'id', 'other_id')
end
freeze_time do
migration_wrapper.perform(job_record)
reloaded_job_record = job_record.reload
expect(reloaded_job_record).to be_succeeded
expect(reloaded_job_record.finished_at).to eq(Time.current)
end
end
end
context 'when the migration job raises an error' do
it 'marks the tracking record as failed before raising the error' do
expect_next_instance_of(job_class) do |job_instance|
expect(job_instance).to receive(:perform)
.with(1, 10, 'events', 'id', 1, 'id', 'other_id')
.and_raise(RuntimeError, 'Something broke!')
end
freeze_time do
expect { migration_wrapper.perform(job_record) }.to raise_error(RuntimeError, 'Something broke!')
reloaded_job_record = job_record.reload
expect(reloaded_job_record).to be_failed
expect(reloaded_job_record.finished_at).to eq(Time.current)
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::PrimaryKeyBatchingStrategy, '#next_batch' do
let(:batching_strategy) { described_class.new }
let_it_be(:event1) { create(:event) }
let_it_be(:event2) { create(:event) }
let_it_be(:event3) { create(:event) }
let_it_be(:event4) { create(:event) }
context 'when starting on the first batch' do
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event1.id, batch_size: 3)
expect(batch_bounds).to eq([event1.id, event3.id])
end
end
context 'when additional batches remain' do
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event2.id, batch_size: 3)
expect(batch_bounds).to eq([event2.id, event4.id])
end
end
context 'when on the final batch' do
it 'returns the bounds of the next batch' do
batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event4.id, batch_size: 3)
expect(batch_bounds).to eq([event4.id, event4.id])
end
end
context 'when no additional batches remain' do
it 'returns nil' do
batch_bounds = batching_strategy.next_batch(:events, :id, batch_min_value: event4.id + 1, batch_size: 1)
expect(batch_bounds).to be_nil
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::BackgroundMigration::Scheduler, '#perform' do
let(:scheduler) { described_class.new }
shared_examples_for 'it has no jobs to run' do
it 'does not create and run a migration job' do
test_wrapper = double('test wrapper')
expect(test_wrapper).not_to receive(:perform)
expect do
scheduler.perform(migration_wrapper: test_wrapper)
end.not_to change { Gitlab::Database::BackgroundMigration::BatchedJob.count }
end
end
context 'when there are no active migrations' do
let!(:migration) { create(:batched_background_migration, :finished) }
it_behaves_like 'it has no jobs to run'
end
shared_examples_for 'it has completed the migration' do
it 'marks the migration as finished' do
relation = Gitlab::Database::BackgroundMigration::BatchedMigration.finished.where(id: first_migration.id)
expect { scheduler.perform }.to change { relation.count }.by(1)
end
end
context 'when there are active migrations' do
let!(:last_migration) { create(:batched_background_migration, :active, created_at: 1.month.ago) }
let!(:first_migration) { create(:batched_background_migration, :active, batch_size: 2, created_at: 2.months.ago) }
let(:job_relation) do
Gitlab::Database::BackgroundMigration::BatchedJob.where(batched_background_migration_id: first_migration.id)
end
context 'when the migration interval has not elapsed' do
before do
expect_next_found_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigration) do |migration|
expect(migration).to receive(:interval_elapsed?).and_return(false)
end
end
it_behaves_like 'it has no jobs to run'
end
context 'when the interval has elapsed' do
before do
expect_next_found_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigration) do |migration|
expect(migration).to receive(:interval_elapsed?).and_return(true)
end
end
context 'when the first migration has no previous jobs' do
context 'when the migration has batches to process' do
let!(:event1) { create(:event) }
let!(:event2) { create(:event) }
let!(:event3) { create(:event) }
it 'runs the job for the first batch' do
first_migration.update!(min_value: event1.id, max_value: event3.id)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper) do |wrapper|
expect(wrapper).to receive(:perform).and_wrap_original do |_, job_record|
expect(job_record).to eq(job_relation.first)
end
end
expect { scheduler.perform }.to change { job_relation.count }.by(1)
expect(job_relation.first).to have_attributes(
min_value: event1.id,
max_value: event2.id,
batch_size: first_migration.batch_size,
sub_batch_size: first_migration.sub_batch_size)
end
end
context 'when the migration has no batches to process' do
it_behaves_like 'it has no jobs to run'
it_behaves_like 'it has completed the migration'
end
end
context 'when the first migration has previous jobs' do
let!(:event1) { create(:event) }
let!(:event2) { create(:event) }
let!(:event3) { create(:event) }
let!(:previous_job) do
create(:batched_background_migration_job,
batched_migration: first_migration,
min_value: event1.id,
max_value: event2.id,
batch_size: 2,
sub_batch_size: 1)
end
context 'when the migration is ready to process another job' do
it 'runs the migration job for the next batch' do
first_migration.update!(min_value: event1.id, max_value: event3.id)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper) do |wrapper|
expect(wrapper).to receive(:perform).and_wrap_original do |_, job_record|
expect(job_record).to eq(job_relation.last)
end
end
expect { scheduler.perform }.to change { job_relation.count }.by(1)
expect(job_relation.last).to have_attributes(
min_value: event3.id,
max_value: event3.id,
batch_size: first_migration.batch_size,
sub_batch_size: first_migration.sub_batch_size)
end
end
context 'when the migration has no batches remaining' do
let!(:final_job) do
create(:batched_background_migration_job,
batched_migration: first_migration,
min_value: event3.id,
max_value: event3.id,
batch_size: 2,
sub_batch_size: 1)
end
it_behaves_like 'it has no jobs to run'
it_behaves_like 'it has completed the migration'
end
end
context 'when the bounds of the next batch exceed the migration maximum value' do
let!(:event1) { create(:event) }
let!(:event2) { create(:event) }
let!(:event3) { create(:event) }
context 'when the batch maximum exceeds the migration maximum' do
it 'clamps the batch maximum to the migration maximum' do
first_migration.update!(batch_size: 5, min_value: event1.id, max_value: event2.id)
expect_next_instance_of(Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper) do |wrapper|
expect(wrapper).to receive(:perform)
end
expect { scheduler.perform }.to change { job_relation.count }.by(1)
expect(job_relation.first).to have_attributes(
min_value: event1.id,
max_value: event2.id,
batch_size: first_migration.batch_size,
sub_batch_size: first_migration.sub_batch_size)
end
end
context 'when the batch minimum exceeds the migration maximum' do
let!(:previous_job) do
create(:batched_background_migration_job,
batched_migration: first_migration,
min_value: event1.id,
max_value: event2.id,
batch_size: 5,
sub_batch_size: 1)
end
before do
first_migration.update!(batch_size: 5, min_value: 1, max_value: event2.id)
end
it_behaves_like 'it has no jobs to run'
it_behaves_like 'it has completed the migration'
end
end
end
end
end
......@@ -79,6 +79,89 @@ RSpec.describe Gitlab::Database::Migrations::BackgroundMigrationHelpers do
end
end
describe '#queue_batched_background_migration' do
it 'creates the database record for the migration' do
expect do
model.queue_batched_background_migration(
'MyJobClass',
:projects,
:id,
job_interval: 5.minutes,
batch_min_value: 5,
batch_max_value: 1000,
batch_class_name: 'MyBatchClass',
batch_size: 100,
sub_batch_size: 10,
other_job_arguments: %w[my arguments])
end.to change { Gitlab::Database::BackgroundMigration::BatchedMigration.count }.from(0).to(1)
expect(Gitlab::Database::BackgroundMigration::BatchedMigration.first).to have_attributes(
job_class_name: 'MyJobClass',
table_name: 'projects',
column_name: 'id',
interval: 300,
min_value: 5,
max_value: 1000,
batch_class_name: 'MyBatchClass',
batch_size: 100,
sub_batch_size: 10,
job_arguments: %w[my arguments],
status: 'active')
end
context 'when the max_value is not given' do
context 'when records exist in the database' do
let!(:event1) { create(:event) }
let!(:event2) { create(:event) }
let!(:event3) { create(:event) }
it 'creates the record with current max value' do
expect do
model.queue_batched_background_migration('MyJobClass', :events, :id, job_interval: 5.minutes)
end.to change { Gitlab::Database::BackgroundMigration::BatchedMigration.count }.from(0).to(1)
expect(Gitlab::Database::BackgroundMigration::BatchedMigration.first).to have_attributes(
job_class_name: 'MyJobClass',
table_name: 'events',
column_name: 'id',
interval: 300,
min_value: 1,
max_value: event3.id,
batch_class_name: described_class::BACKGROUND_MIGRATION_BATCH_CLASS_NAME,
batch_size: described_class::BACKGROUND_MIGRATION_BATCH_SIZE,
sub_batch_size: described_class::BACKGROUND_MIGRATION_SUB_BATCH_SIZE,
job_arguments: [],
status: 'active')
end
end
context 'when the database is empty' do
it 'does not create the record' do
expect do
model.queue_batched_background_migration('MyJobClass', :events, :id, job_interval: 5.minutes)
end.not_to change { Gitlab::Database::BackgroundMigration::BatchedMigration.count }
end
end
end
end
describe '#abort_batched_background_migrations' do
let!(:migration1) { create(:batched_background_migration, :finished) }
let!(:migration2) { create(:batched_background_migration) }
let!(:migration3) { create(:batched_background_migration, job_class_name: 'SomeOtherJobClassName') }
let!(:migration4) { create(:batched_background_migration, table_name: 'some_other_table_name') }
it 'aborts unfinished migrations that match the job class and table configuration' do
job_class_name = 'Gitlab::BackgroundMigration::CopyColumnUsingBackgroundMigrationJob'
expect do
model.abort_batched_background_migrations(job_class_name, 'events', 'id')
end.to change { Gitlab::Database::BackgroundMigration::BatchedMigration.aborted.count }.from(0).to(1)
expect(migration2.reload).to be_aborted
end
end
describe '#queue_background_migration_jobs_by_range_at_intervals' do
context 'when the model has an ID column' do
let!(:id1) { create(:user).id }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment