Commit 31e260d6 authored by Marius Bobin's avatar Marius Bobin

Backfill CI queuing tables

Changelog: other
parent dfddb423
# frozen_string_literal: true
class StartBackfillCiQueuingTables < Gitlab::Database::Migration[1.0]
MIGRATION = 'BackfillCiQueuingTables'
BATCH_SIZE = 500
DELAY_INTERVAL = 2.minutes
disable_ddl_transaction!
def up
return if Gitlab.com?
queue_background_migration_jobs_by_range_at_intervals(
Gitlab::BackgroundMigration::BackfillCiQueuingTables::Ci::Build.pending,
MIGRATION,
DELAY_INTERVAL,
batch_size: BATCH_SIZE,
track_jobs: true)
end
def down
# no-op
end
end
dbe6760198b8fa068c30871a439298e56802867044a178baa6b8b009f8da13e6
\ No newline at end of file
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# Ensure queuing entries are present even if admins skip upgrades.
class BackfillCiQueuingTables
class Namespace < ActiveRecord::Base # rubocop:disable Style/Documentation
self.table_name = 'namespaces'
self.inheritance_column = :_type_disabled
end
class Project < ActiveRecord::Base # rubocop:disable Style/Documentation
self.table_name = 'projects'
belongs_to :namespace
has_one :ci_cd_settings, class_name: 'Gitlab::BackgroundMigration::BackfillCiQueuingTables::ProjectCiCdSetting'
def group_runners_enabled?
return false unless ci_cd_settings
ci_cd_settings.group_runners_enabled?
end
end
class ProjectCiCdSetting < ActiveRecord::Base # rubocop:disable Style/Documentation
self.table_name = 'project_ci_cd_settings'
end
class Taggings < ActiveRecord::Base # rubocop:disable Style/Documentation
self.table_name = 'taggings'
end
module Ci
class Build < ActiveRecord::Base # rubocop:disable Style/Documentation
include EachBatch
self.table_name = 'ci_builds'
self.inheritance_column = :_type_disabled
belongs_to :project
scope :pending, -> do
where(status: :pending, type: 'Ci::Build', runner_id: nil)
end
def self.each_batch(of: 1000, column: :id, order: { runner_id: :asc, id: :asc }, order_hint: nil)
start = except(:select).select(column).reorder(order)
start = start.take
return unless start
start_id = start[column]
arel_table = self.arel_table
1.step do |index|
start_cond = arel_table[column].gteq(start_id)
stop = except(:select).select(column).where(start_cond).reorder(order)
stop = stop.offset(of).limit(1).take
relation = where(start_cond)
if stop
stop_id = stop[column]
start_id = stop_id
stop_cond = arel_table[column].lt(stop_id)
relation = relation.where(stop_cond)
end
# Any ORDER BYs are useless for this relation and can lead to less
# efficient UPDATE queries, hence we get rid of it.
relation = relation.except(:order)
# Using unscoped is necessary to prevent leaking the current scope used by
# ActiveRecord to chain `each_batch` method.
unscoped { yield relation, index }
break unless stop
end
end
def tags_ids
BackfillCiQueuingTables::Taggings
.where(taggable_id: id, taggable_type: 'CommitStatus')
.pluck(:tag_id)
end
end
class PendingBuild < ActiveRecord::Base # rubocop:disable Style/Documentation
self.table_name = 'ci_pending_builds'
class << self
def upsert_from_build!(build)
entry = self.new(args_from_build(build))
self.upsert(
entry.attributes.compact,
returning: %w[build_id],
unique_by: :build_id)
end
def args_from_build(build)
project = build.project
{
build_id: build.id,
project_id: build.project_id,
protected: build.protected?,
namespace_id: project.namespace_id,
tag_ids: build.tags_ids,
instance_runners_enabled: project.shared_runners_enabled?,
namespace_traversal_ids: namespace_traversal_ids(project)
}
end
def namespace_traversal_ids(project)
if project.group_runners_enabled?
project.namespace.traversal_ids
else
[]
end
end
end
end
end
BATCH_SIZE = 100
def perform(start_id, end_id)
scope = BackfillCiQueuingTables::Ci::Build.pending.where(id: start_id..end_id)
pending_builds_query = BackfillCiQueuingTables::Ci::PendingBuild
.where('ci_builds.id = ci_pending_builds.build_id')
.select(1)
scope.each_batch(of: BATCH_SIZE) do |builds|
builds = builds.where('NOT EXISTS (?)', pending_builds_query)
builds = builds.includes(:project, project: [:namespace, :ci_cd_settings])
builds.each do |build|
BackfillCiQueuingTables::Ci::PendingBuild.upsert_from_build!(build)
end
end
mark_job_as_succeeded(start_id, end_id)
end
private
def mark_job_as_succeeded(*arguments)
Gitlab::Database::BackgroundMigrationJob.mark_all_as_succeeded(
self.class.name.demodulize,
arguments)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::BackfillCiQueuingTables, :migration, schema: 20220208115439 do
let(:namespaces) { table(:namespaces) }
let(:projects) { table(:projects) }
let(:ci_cd_settings) { table(:project_ci_cd_settings) }
let(:builds) { table(:ci_builds) }
let(:queuing_entries) { table(:ci_pending_builds) }
let(:tags) { table(:tags) }
let(:taggings) { table(:taggings) }
subject { described_class.new }
describe '#perform' do
let!(:namespace) do
namespaces.create!(
id: 10,
name: 'namespace10',
path: 'namespace10',
traversal_ids: [10])
end
let!(:other_namespace) do
namespaces.create!(
id: 11,
name: 'namespace11',
path: 'namespace11',
traversal_ids: [11])
end
let!(:project) do
projects.create!(id: 5, namespace_id: 10, name: 'test1', path: 'test1')
end
let!(:ci_cd_setting) do
ci_cd_settings.create!(id: 5, project_id: 5, group_runners_enabled: true)
end
let!(:other_project) do
projects.create!(id: 7, namespace_id: 11, name: 'test2', path: 'test2')
end
let!(:other_ci_cd_setting) do
ci_cd_settings.create!(id: 7, project_id: 7, group_runners_enabled: false)
end
let!(:another_project) do
projects.create!(id: 9, namespace_id: 10, name: 'test3', path: 'test3', shared_runners_enabled: false)
end
let!(:ruby_tag) do
tags.create!(id: 22, name: 'ruby')
end
let!(:postgres_tag) do
tags.create!(id: 23, name: 'postgres')
end
it 'creates ci_pending_builds for all pending builds in range' do
builds.create!(id: 50, status: :pending, name: 'test1', project_id: 5, type: 'Ci::Build')
builds.create!(id: 51, status: :created, name: 'test2', project_id: 5, type: 'Ci::Build')
builds.create!(id: 52, status: :pending, name: 'test3', project_id: 5, protected: true, type: 'Ci::Build')
taggings.create!(taggable_id: 52, taggable_type: 'CommitStatus', tag_id: 22)
taggings.create!(taggable_id: 52, taggable_type: 'CommitStatus', tag_id: 23)
builds.create!(id: 60, status: :pending, name: 'test1', project_id: 7, type: 'Ci::Build')
builds.create!(id: 61, status: :running, name: 'test2', project_id: 7, protected: true, type: 'Ci::Build')
builds.create!(id: 62, status: :pending, name: 'test3', project_id: 7, type: 'Ci::Build')
taggings.create!(taggable_id: 60, taggable_type: 'CommitStatus', tag_id: 23)
taggings.create!(taggable_id: 62, taggable_type: 'CommitStatus', tag_id: 22)
builds.create!(id: 70, status: :pending, name: 'test1', project_id: 9, protected: true, type: 'Ci::Build')
builds.create!(id: 71, status: :failed, name: 'test2', project_id: 9, type: 'Ci::Build')
builds.create!(id: 72, status: :pending, name: 'test3', project_id: 9, type: 'Ci::Build')
taggings.create!(taggable_id: 71, taggable_type: 'CommitStatus', tag_id: 22)
subject.perform(1, 100)
expect(queuing_entries.all).to contain_exactly(
an_object_having_attributes(
build_id: 50,
project_id: 5,
namespace_id: 10,
protected: false,
instance_runners_enabled: true,
minutes_exceeded: false,
tag_ids: [],
namespace_traversal_ids: [10]),
an_object_having_attributes(
build_id: 52,
project_id: 5,
namespace_id: 10,
protected: true,
instance_runners_enabled: true,
minutes_exceeded: false,
tag_ids: [22, 23],
namespace_traversal_ids: [10]),
an_object_having_attributes(
build_id: 60,
project_id: 7,
namespace_id: 11,
protected: false,
instance_runners_enabled: true,
minutes_exceeded: false,
tag_ids: [23],
namespace_traversal_ids: []),
an_object_having_attributes(
build_id: 62,
project_id: 7,
namespace_id: 11,
protected: false,
instance_runners_enabled: true,
minutes_exceeded: false,
tag_ids: [22],
namespace_traversal_ids: []),
an_object_having_attributes(
build_id: 70,
project_id: 9,
namespace_id: 10,
protected: true,
instance_runners_enabled: false,
minutes_exceeded: false,
tag_ids: [],
namespace_traversal_ids: []),
an_object_having_attributes(
build_id: 72,
project_id: 9,
namespace_id: 10,
protected: false,
instance_runners_enabled: false,
minutes_exceeded: false,
tag_ids: [],
namespace_traversal_ids: [])
)
end
it 'skips builds that already have ci_pending_builds' do
builds.create!(id: 50, status: :pending, name: 'test1', project_id: 5, type: 'Ci::Build')
builds.create!(id: 51, status: :created, name: 'test2', project_id: 5, type: 'Ci::Build')
builds.create!(id: 52, status: :pending, name: 'test3', project_id: 5, protected: true, type: 'Ci::Build')
taggings.create!(taggable_id: 50, taggable_type: 'CommitStatus', tag_id: 22)
taggings.create!(taggable_id: 52, taggable_type: 'CommitStatus', tag_id: 23)
queuing_entries.create!(build_id: 50, project_id: 5, namespace_id: 10)
subject.perform(1, 100)
expect(queuing_entries.all).to contain_exactly(
an_object_having_attributes(
build_id: 50,
project_id: 5,
namespace_id: 10,
protected: false,
instance_runners_enabled: false,
minutes_exceeded: false,
tag_ids: [],
namespace_traversal_ids: []),
an_object_having_attributes(
build_id: 52,
project_id: 5,
namespace_id: 10,
protected: true,
instance_runners_enabled: true,
minutes_exceeded: false,
tag_ids: [23],
namespace_traversal_ids: [10])
)
end
it 'upserts values in case of conflicts' do
builds.create!(id: 50, status: :pending, name: 'test1', project_id: 5, type: 'Ci::Build')
queuing_entries.create!(build_id: 50, project_id: 5, namespace_id: 10)
build = described_class::Ci::Build.find(50)
described_class::Ci::PendingBuild.upsert_from_build!(build)
expect(queuing_entries.all).to contain_exactly(
an_object_having_attributes(
build_id: 50,
project_id: 5,
namespace_id: 10,
protected: false,
instance_runners_enabled: true,
minutes_exceeded: false,
tag_ids: [],
namespace_traversal_ids: [10])
)
end
end
context 'Ci::Build' do
describe '.each_batch' do
let(:model) { described_class::Ci::Build }
before do
builds.create!(id: 1, status: :pending, name: 'test1', project_id: 5, type: 'Ci::Build')
builds.create!(id: 2, status: :pending, name: 'test2', project_id: 5, type: 'Ci::Build')
builds.create!(id: 3, status: :pending, name: 'test3', project_id: 5, type: 'Ci::Build')
builds.create!(id: 4, status: :pending, name: 'test4', project_id: 5, type: 'Ci::Build')
builds.create!(id: 5, status: :pending, name: 'test5', project_id: 5, type: 'Ci::Build')
end
it 'yields an ActiveRecord::Relation when a block is given' do
model.each_batch do |relation|
expect(relation).to be_a_kind_of(ActiveRecord::Relation)
end
end
it 'yields a batch index as the second argument' do
model.each_batch do |_, index|
expect(index).to eq(1)
end
end
it 'accepts a custom batch size' do
amount = 0
model.each_batch(of: 1) { amount += 1 }
expect(amount).to eq(5)
end
it 'does not include ORDER BYs in the yielded relations' do
model.each_batch do |relation|
expect(relation.to_sql).not_to include('ORDER BY')
end
end
it 'orders ascending' do
ids = []
model.each_batch(of: 1) { |rel| ids.concat(rel.ids) }
expect(ids).to eq(ids.sort)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
require_migration!
RSpec.describe StartBackfillCiQueuingTables do
let(:namespaces) { table(:namespaces) }
let(:projects) { table(:projects) }
let(:builds) { table(:ci_builds) }
let!(:namespace) do
namespaces.create!(name: 'namespace1', path: 'namespace1')
end
let!(:project) do
projects.create!(namespace_id: namespace.id, name: 'test1', path: 'test1')
end
let!(:pending_build_1) do
builds.create!(status: :pending, name: 'test1', type: 'Ci::Build', project_id: project.id)
end
let!(:running_build) do
builds.create!(status: :running, name: 'test2', type: 'Ci::Build', project_id: project.id)
end
let!(:pending_build_2) do
builds.create!(status: :pending, name: 'test3', type: 'Ci::Build', project_id: project.id)
end
before do
stub_const("#{described_class.name}::BATCH_SIZE", 1)
end
it 'schedules jobs for builds that are pending' do
Sidekiq::Testing.fake! do
freeze_time do
migrate!
expect(described_class::MIGRATION).to be_scheduled_delayed_migration(
2.minutes, pending_build_1.id, pending_build_1.id)
expect(described_class::MIGRATION).to be_scheduled_delayed_migration(
4.minutes, pending_build_2.id, pending_build_2.id)
expect(BackgroundMigrationWorker.jobs.size).to eq(2)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment