Commit c2efb8ac authored by Grzegorz Bizon's avatar Grzegorz Bizon

Use ActiveRecord 5 batching to schedule bg migration

parent 945cdf32
...@@ -14,7 +14,7 @@ class BackgroundMigrationWorker ...@@ -14,7 +14,7 @@ class BackgroundMigrationWorker
'args' => jobs) 'args' => jobs)
end end
# Schedules a number of jobs in bulk, with a delay. # Schedules multiple jobs in bulk, with a delay.
# #
def self.perform_bulk_in(delay, jobs) def self.perform_bulk_in(delay, jobs)
now = Time.now.to_f now = Time.now.to_f
......
...@@ -15,7 +15,7 @@ module ActiveRecord ...@@ -15,7 +15,7 @@ module ActiveRecord
relation = relation.where(arel_table[primary_key].lteq(finish)) if finish relation = relation.where(arel_table[primary_key].lteq(finish)) if finish
batch_relation = relation batch_relation = relation
loop do 1.step do |index|
if load if load
records = batch_relation.records records = batch_relation.records
ids = records.map(&:id) ids = records.map(&:id)
...@@ -31,7 +31,7 @@ module ActiveRecord ...@@ -31,7 +31,7 @@ module ActiveRecord
primary_key_offset = ids.last primary_key_offset = ids.last
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
yield yielded_relation yield yielded_relation, index
break if ids.length < of break if ids.length < of
batch_relation = relation.where(arel_table[primary_key].gt(primary_key_offset)) batch_relation = relation.where(arel_table[primary_key].gt(primary_key_offset))
......
...@@ -13,10 +13,9 @@ class MigrateStageIdReferenceInBackground < ActiveRecord::Migration ...@@ -13,10 +13,9 @@ class MigrateStageIdReferenceInBackground < ActiveRecord::Migration
def up def up
Build.where(stage_id: nil) Build.where(stage_id: nil)
.find_in_batches(batch_size: BATCH_SIZE) .in_batches(of: BATCH_SIZE) do |relation, index|
.with_index do |builds, batch| schedule = index * 5.minutes
builds.each do |build| relation.each do |build|
schedule = (batch - 1) * 5.minutes
BackgroundMigrationWorker.perform_at(schedule, MIGRATION, [build.id]) BackgroundMigrationWorker.perform_at(schedule, MIGRATION, [build.id])
end end
end end
......
require 'spec_helper' require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20170628080858_migrate_stage_id_reference_in_background') require Rails.root.join('db', 'post_migrate', '20170628080858_migrate_stage_id_reference_in_background')
RSpec::Matchers.define :have_migrated do |*expected| describe MigrateStageIdReferenceInBackground, :migration, :sidekiq do
match do |migration| matcher :have_migrated do |*expected|
BackgroundMigrationWorker.jobs.any? do |job| match do |migration|
job['enqueued_at'].present? && job['args'] == [migration, expected] BackgroundMigrationWorker.jobs.any? do |job|
job['enqueued_at'].present? && job['args'] == [migration, expected]
end
end end
end
failure_message do |migration| failure_message do |migration|
<<-EOS "Migration `#{migration}` with args `#{expected.inspect}` not executed!"
Background migration `#{migration}` with args `#{expected.inspect}` end
not migrated!
EOS
end end
end
RSpec::Matchers.define :have_scheduled_migration do |time, *expected| matcher :have_scheduled_migration do |delay, *expected|
match do |migration| match do |migration|
BackgroundMigrationWorker.jobs.any? do |job| BackgroundMigrationWorker.jobs.any? do |job|
job['args'] == [migration, expected] && job['at'] >= time job['args'] == [migration, expected] &&
job['at'].to_f == (delay.to_f + Time.now.to_f)
end
end end
end
failure_message do |migration| failure_message do |migration|
<<-EOS "Migration `#{migration}` with args `#{expected.inspect}` not scheduled!"
Background migration `#{migration}` with args `#{expected.inspect}` end
not scheduled!
EOS
end end
end
describe MigrateStageIdReferenceInBackground, :migration do
let(:jobs) { table(:ci_builds) } let(:jobs) { table(:ci_builds) }
let(:stages) { table(:ci_stages) } let(:stages) { table(:ci_stages) }
let(:pipelines) { table(:ci_pipelines) } let(:pipelines) { table(:ci_pipelines) }
let(:projects) { table(:projects) } let(:projects) { table(:projects) }
before do before do
stub_const('MigrateStageIdReferenceInBackground::BATCH_SIZE', 1) stub_const('MigrateStageIdReferenceInBackground::BATCH_SIZE', 2)
projects.create!(id: 123, name: 'gitlab1', path: 'gitlab1') projects.create!(id: 123, name: 'gitlab1', path: 'gitlab1')
pipelines.create!(id: 1, project_id: 123, ref: 'master', sha: 'adf43c3a') pipelines.create!(id: 1, project_id: 123, ref: 'master', sha: 'adf43c3a')
...@@ -55,12 +50,14 @@ describe MigrateStageIdReferenceInBackground, :migration do ...@@ -55,12 +50,14 @@ describe MigrateStageIdReferenceInBackground, :migration do
it 'correctly schedules background migrations' do it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do Sidekiq::Testing.fake! do
migrate! Timecop.freeze do
migrate!
expect(described_class::MIGRATION).to have_migrated(1) expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 1)
expect(described_class::MIGRATION).to have_migrated(2) expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 2)
expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 3) expect(described_class::MIGRATION).to have_scheduled_migration(10.minutes, 3)
expect(described_class::MIGRATION).to have_scheduled_migration(5.minutes, 4) expect(described_class::MIGRATION).to have_scheduled_migration(10.minutes, 4)
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment