Commit deb59cbe authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'feature/gb/add-pending-builds-table' into 'master'

Accelerate builds queuing using a denormalized accelerated table [RUN ALL RSPEC] [RUN AS-IF-FOSS]

See merge request gitlab-org/gitlab!61581
parents f66f448b e057e178
......@@ -38,6 +38,7 @@ module Ci
has_one :deployment, as: :deployable, class_name: 'Deployment'
has_one :pending_state, class_name: 'Ci::BuildPendingState', inverse_of: :build
has_one :queuing_entry, class_name: 'Ci::PendingBuild', foreign_key: :build_id
has_many :trace_sections, class_name: 'Ci::BuildTraceSection'
has_many :trace_chunks, class_name: 'Ci::BuildTraceChunk', foreign_key: :build_id, inverse_of: :build
has_many :report_results, class_name: 'Ci::BuildReportResult', inverse_of: :build
......@@ -305,12 +306,20 @@ module Ci
end
end
after_transition any => [:pending] do |build|
# rubocop:disable CodeReuse/ServiceClass
after_transition any => [:pending] do |build, transition|
Ci::UpdateBuildQueueService.new.push(build, transition)
build.run_after_commit do
BuildQueueWorker.perform_async(id)
end
end
after_transition pending: any do |build, transition|
Ci::UpdateBuildQueueService.new.pop(build, transition)
end
# rubocop:enable CodeReuse/ServiceClass
after_transition pending: :running do |build|
build.deployment&.run
......@@ -1067,6 +1076,14 @@ module Ci
options.dig(:allow_failure_criteria, :exit_codes).present?
end
def all_queuing_entries
# We can have only one queuing entry, because there is a unique index on
# `build_id`, but we need a relation to remove this single queuing entry
# more efficiently in a single statement without actually load data.
::Ci::PendingBuild.where(build_id: self.id)
end
protected
def run_status_commit_hooks!
......
# frozen_string_literal: true
module Ci
class PendingBuild < ApplicationRecord
extend Gitlab::Ci::Model
belongs_to :project
belongs_to :build, class_name: 'Ci::Build'
def self.upsert_from_build!(build)
entry = self.new(build: build, project: build.project)
entry.validate!
self.upsert(entry.attributes.compact, returning: %w[build_id], unique_by: :build_id)
end
end
end
......@@ -2,13 +2,62 @@
module Ci
class UpdateBuildQueueService
def execute(build, metrics = ::Gitlab::Ci::Queue::Metrics)
tick_for(build, build.project.all_runners, metrics)
InvalidQueueTransition = Class.new(StandardError)
attr_reader :metrics
def initialize(metrics = ::Gitlab::Ci::Queue::Metrics)
@metrics = metrics
end
##
# Add a build to the pending builds queue
#
def push(build, transition)
return unless maintain_pending_builds_queue?(build)
raise InvalidQueueTransition unless transition.to == 'pending'
transition.within_transaction do
result = ::Ci::PendingBuild.upsert_from_build!(build)
unless result.empty?
metrics.increment_queue_operation(:build_queue_push)
result.rows.dig(0, 0)
end
end
end
##
# Remove a build from the pending builds queue
#
def pop(build, transition)
return unless maintain_pending_builds_queue?(build)
raise InvalidQueueTransition unless transition.from == 'pending'
transition.within_transaction do
removed = build.all_queuing_entries.delete_all
if removed > 0
metrics.increment_queue_operation(:build_queue_pop)
build.id
end
end
end
##
# Unblock runner associated with given project / build
#
def tick(build)
tick_for(build, build.project.all_runners)
end
private
def tick_for(build, runners, metrics)
def tick_for(build, runners)
runners = runners.with_recent_runner_queue
runners = runners.with_tags if Feature.enabled?(:ci_preload_runner_tags, default_enabled: :yaml)
......@@ -20,5 +69,9 @@ module Ci
runner.pick_build!(build)
end
end
def maintain_pending_builds_queue?(build)
Feature.enabled?(:ci_pending_builds_queue_maintain, build.project, default_enabled: :yaml)
end
end
end
......@@ -14,7 +14,7 @@ class BuildQueueWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
Ci::UpdateBuildQueueService.new.execute(build)
Ci::UpdateBuildQueueService.new.tick(build)
end
end
# rubocop: enable CodeReuse/ActiveRecord
......
---
title: Accelerate builds queuing using a denormalized accelerated table
merge_request: 61581
author:
type: performance
---
name: ci_pending_builds_queue_maintain
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/61581
rollout_issue_url:
milestone: '13.12'
type: development
group: group::continuous integration
default_enabled: false
# frozen_string_literal: true
class AddPendingBuildsTable < ActiveRecord::Migration[6.0]
def up
create_table :ci_pending_builds do |t|
t.references :build, index: { unique: true }, null: false, foreign_key: { to_table: :ci_builds, on_delete: :cascade }
t.references :project, index: true, null: false, foreign_key: { on_delete: :cascade }
t.datetime_with_timezone :created_at, null: false, default: -> { 'NOW()' }
end
end
def down
drop_table :ci_pending_builds
end
end
1acc251417e3230c9b0a46e294cb9a6e8768f31978b8d4f439101f8de4e9269e
\ No newline at end of file
......@@ -10775,6 +10775,22 @@ CREATE SEQUENCE ci_namespace_monthly_usages_id_seq
ALTER SEQUENCE ci_namespace_monthly_usages_id_seq OWNED BY ci_namespace_monthly_usages.id;
CREATE TABLE ci_pending_builds (
id bigint NOT NULL,
build_id bigint NOT NULL,
project_id bigint NOT NULL,
created_at timestamp with time zone DEFAULT now() NOT NULL
);
CREATE SEQUENCE ci_pending_builds_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE ci_pending_builds_id_seq OWNED BY ci_pending_builds.id;
CREATE TABLE ci_pipeline_artifacts (
id bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
......@@ -19578,6 +19594,8 @@ ALTER TABLE ONLY ci_job_variables ALTER COLUMN id SET DEFAULT nextval('ci_job_va
ALTER TABLE ONLY ci_namespace_monthly_usages ALTER COLUMN id SET DEFAULT nextval('ci_namespace_monthly_usages_id_seq'::regclass);
ALTER TABLE ONLY ci_pending_builds ALTER COLUMN id SET DEFAULT nextval('ci_pending_builds_id_seq'::regclass);
ALTER TABLE ONLY ci_pipeline_artifacts ALTER COLUMN id SET DEFAULT nextval('ci_pipeline_artifacts_id_seq'::regclass);
ALTER TABLE ONLY ci_pipeline_chat_data ALTER COLUMN id SET DEFAULT nextval('ci_pipeline_chat_data_id_seq'::regclass);
......@@ -20757,6 +20775,9 @@ ALTER TABLE ONLY ci_job_variables
ALTER TABLE ONLY ci_namespace_monthly_usages
ADD CONSTRAINT ci_namespace_monthly_usages_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_pending_builds
ADD CONSTRAINT ci_pending_builds_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_pipeline_artifacts
ADD CONSTRAINT ci_pipeline_artifacts_pkey PRIMARY KEY (id);
......@@ -22673,6 +22694,10 @@ CREATE UNIQUE INDEX index_ci_job_variables_on_key_and_job_id ON ci_job_variables
CREATE UNIQUE INDEX index_ci_namespace_monthly_usages_on_namespace_id_and_date ON ci_namespace_monthly_usages USING btree (namespace_id, date);
CREATE UNIQUE INDEX index_ci_pending_builds_on_build_id ON ci_pending_builds USING btree (build_id);
CREATE INDEX index_ci_pending_builds_on_project_id ON ci_pending_builds USING btree (project_id);
CREATE INDEX index_ci_pipeline_artifacts_failed_verification ON ci_pipeline_artifacts USING btree (verification_retry_at NULLS FIRST) WHERE (verification_state = 3);
CREATE INDEX index_ci_pipeline_artifacts_needs_verification ON ci_pipeline_artifacts USING btree (verification_state) WHERE ((verification_state = 0) OR (verification_state = 3));
......@@ -26444,6 +26469,9 @@ ALTER TABLE ONLY vulnerability_feedback
ALTER TABLE ONLY user_custom_attributes
ADD CONSTRAINT fk_rails_47b91868a8 FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_pending_builds
ADD CONSTRAINT fk_rails_480669c3b3 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_pipeline_artifacts
ADD CONSTRAINT fk_rails_4a70390ca6 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
......@@ -26690,6 +26718,9 @@ ALTER TABLE ONLY list_user_preferences
ALTER TABLE ONLY project_custom_attributes
ADD CONSTRAINT fk_rails_719c3dccc5 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_pending_builds
ADD CONSTRAINT fk_rails_725a2644a3 FOREIGN KEY (build_id) REFERENCES ci_builds(id) ON DELETE CASCADE;
ALTER TABLE ONLY security_findings
ADD CONSTRAINT fk_rails_729b763a54 FOREIGN KEY (scanner_id) REFERENCES vulnerability_scanners(id) ON DELETE CASCADE;
......@@ -20,6 +20,8 @@ module Gitlab
:build_can_pick,
:build_not_pick,
:build_not_pending,
:build_queue_push,
:build_queue_pop,
:build_temporary_locked,
:build_conflict_lock,
:build_conflict_exception,
......@@ -77,11 +79,7 @@ module Gitlab
# rubocop: enable CodeReuse/ActiveRecord
def increment_queue_operation(operation)
if !Rails.env.production? && !OPERATION_COUNTERS.include?(operation)
raise ArgumentError, "unknown queue operation: #{operation}"
end
self.class.queue_operations_total.increment(operation: operation)
self.class.increment_queue_operation(operation)
end
def observe_queue_depth(queue, size)
......@@ -121,6 +119,14 @@ module Gitlab
result
end
def self.increment_queue_operation(operation)
if !Rails.env.production? && !OPERATION_COUNTERS.include?(operation)
raise ArgumentError, "unknown queue operation: #{operation}"
end
queue_operations_total.increment(operation: operation)
end
def self.observe_active_runners(runners_proc)
return unless Feature.enabled?(:gitlab_ci_builds_queuing_metrics, default_enabled: false)
......
......@@ -323,8 +323,6 @@ RSpec.describe Ci::Build do
describe '#enqueue' do
let(:build) { create(:ci_build, :created) }
subject { build.enqueue }
before do
allow(build).to receive(:any_unmet_prerequisites?).and_return(has_prerequisites)
allow(Ci::PrepareBuildService).to receive(:perform_async)
......@@ -334,28 +332,74 @@ RSpec.describe Ci::Build do
let(:has_prerequisites) { true }
it 'transitions to preparing' do
subject
build.enqueue
expect(build).to be_preparing
end
it 'does not push build to the queue' do
build.enqueue
expect(::Ci::PendingBuild.all.count).to be_zero
end
end
context 'build has no prerequisites' do
let(:has_prerequisites) { false }
it 'transitions to pending' do
subject
build.enqueue
expect(build).to be_pending
end
it 'pushes build to a queue' do
build.enqueue
expect(build.queuing_entry).to be_present
end
context 'when build status transition fails' do
before do
::Ci::Build.find(build.id).update_column(:lock_version, 100)
end
it 'does not push build to a queue' do
expect { build.enqueue! }
.to raise_error(ActiveRecord::StaleObjectError)
expect(build.queuing_entry).not_to be_present
end
end
context 'when there is a queuing entry already present' do
before do
::Ci::PendingBuild.create!(build: build, project: build.project)
end
it 'does not raise an error' do
expect { build.enqueue! }.not_to raise_error
expect(build.reload.queuing_entry).to be_present
end
end
context 'when both failure scenario happen at the same time' do
before do
::Ci::Build.find(build.id).update_column(:lock_version, 100)
::Ci::PendingBuild.create!(build: build, project: build.project)
end
it 'raises stale object error exception' do
expect { build.enqueue! }
.to raise_error(ActiveRecord::StaleObjectError)
end
end
end
end
describe '#enqueue_preparing' do
let(:build) { create(:ci_build, :preparing) }
subject { build.enqueue_preparing }
before do
allow(build).to receive(:any_unmet_prerequisites?).and_return(has_unmet_prerequisites)
end
......@@ -364,9 +408,10 @@ RSpec.describe Ci::Build do
let(:has_unmet_prerequisites) { false }
it 'transitions to pending' do
subject
build.enqueue_preparing
expect(build).to be_pending
expect(build.queuing_entry).to be_present
end
end
......@@ -374,9 +419,10 @@ RSpec.describe Ci::Build do
let(:has_unmet_prerequisites) { true }
it 'remains in preparing' do
subject
build.enqueue_preparing
expect(build).to be_preparing
expect(build.queuing_entry).not_to be_present
end
end
end
......@@ -405,6 +451,36 @@ RSpec.describe Ci::Build do
end
end
describe '#run' do
context 'when build has been just created' do
let(:build) { create(:ci_build, :created) }
it 'creates queuing entry and then removes it' do
build.enqueue!
expect(build.queuing_entry).to be_present
build.run!
expect(build.reload.queuing_entry).not_to be_present
end
end
context 'when build status transition fails' do
let(:build) { create(:ci_build, :pending) }
before do
::Ci::PendingBuild.create!(build: build, project: build.project)
::Ci::Build.find(build.id).update_column(:lock_version, 100)
end
it 'does not remove build from a queue' do
expect { build.run! }
.to raise_error(ActiveRecord::StaleObjectError)
expect(build.queuing_entry).to be_present
end
end
end
describe '#schedulable?' do
subject { build.schedulable? }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::PendingBuild do
let_it_be(:project) { create(:project) }
let_it_be(:pipeline) { create(:ci_pipeline, project: project) }
let(:build) { create(:ci_build, :created, pipeline: pipeline) }
describe '.upsert_from_build!' do
context 'another pending entry does not exist' do
it 'creates a new pending entry' do
result = described_class.upsert_from_build!(build)
expect(result.rows.dig(0, 0)).to eq build.id
expect(build.reload.queuing_entry).to be_present
end
end
context 'when another queuing entry exists for given build' do
before do
described_class.create!(build: build, project: project)
end
it 'returns a build id as a result' do
result = described_class.upsert_from_build!(build)
expect(result.rows.dig(0, 0)).to eq build.id
end
end
end
end
......@@ -2726,7 +2726,7 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do
pipeline2.cancel_running
end
extra_update_queries = 3 # transition ... => :canceled
extra_update_queries = 4 # transition ... => :canceled, queue pop
extra_generic_commit_status_validation_queries = 2 # name_uniqueness_across_types
expect(control2.count).to eq(control1.count + extra_update_queries + extra_generic_commit_status_validation_queries)
......
......@@ -59,7 +59,8 @@ RSpec.describe Ci::RetryBuildService do
metadata runner_session trace_chunks upstream_pipeline_id
artifacts_file artifacts_metadata artifacts_size commands
resource resource_group_id processed security_scans author
pipeline_id report_results pending_state pages_deployments].freeze
pipeline_id report_results pending_state pages_deployments
queuing_entry].freeze
shared_examples 'build duplication' do
let_it_be(:another_pipeline) { create(:ci_empty_pipeline, project: project) }
......
......@@ -7,15 +7,112 @@ RSpec.describe Ci::UpdateBuildQueueService do
let(:build) { create(:ci_build, pipeline: pipeline) }
let(:pipeline) { create(:ci_pipeline, project: project) }
describe '#push' do
let(:transition) { double('transition') }
before do
allow(transition).to receive(:to).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
context 'when pending build can be created' do
it 'creates a new pending build in transaction' do
queued = subject.push(build, transition)
expect(queued).to eq build.id
end
it 'increments queue push metric' do
metrics = spy('metrics')
described_class.new(metrics).push(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_push)
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('created')
expect { subject.push(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
context 'when duplicate entry exists' do
before do
::Ci::PendingBuild.create!(build: build, project: project)
end
it 'does nothing and returns build id' do
queued = subject.push(build, transition)
expect(queued).to eq build.id
end
end
end
describe '#pop' do
let(:transition) { double('transition') }
before do
allow(transition).to receive(:from).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
context 'when pending build exists' do
before do
Ci::PendingBuild.create!(build: build, project: project)
end
it 'removes pending build in a transaction' do
dequeued = subject.pop(build, transition)
expect(dequeued).to eq build.id
end
it 'increments queue pop metric' do
metrics = spy('metrics')
described_class.new(metrics).pop(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_pop)
end
end
context 'when pending build does not exist' do
it 'does nothing if there is no pending build to remove' do
dequeued = subject.pop(build, transition)
expect(dequeued).to be_nil
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('created')
expect { subject.pop(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
end
describe '#tick' do
shared_examples 'refreshes runner' do
it 'ticks runner queue value' do
expect { subject.execute(build) }.to change { runner.ensure_runner_queue_value }
expect { subject.tick(build) }.to change { runner.ensure_runner_queue_value }
end
end
shared_examples 'does not refresh runner' do
it 'ticks runner queue value' do
expect { subject.execute(build) }.not_to change { runner.ensure_runner_queue_value }
expect { subject.tick(build) }.not_to change { runner.ensure_runner_queue_value }
end
end
......@@ -30,7 +127,7 @@ RSpec.describe Ci::UpdateBuildQueueService do
it 'avoids running redundant queries' do
expect(Ci::Runner).not_to receive(:owned_or_instance_wide)
subject.execute(build)
subject.tick(build)
end
context 'when feature flag ci_reduce_queries_when_ticking_runner_queue is disabled' do
......@@ -42,7 +139,7 @@ RSpec.describe Ci::UpdateBuildQueueService do
it 'runs redundant queries using `owned_or_instance_wide` scope' do
expect(Ci::Runner).to receive(:owned_or_instance_wide).and_call_original
subject.execute(build)
subject.tick(build)
end
end
end
......@@ -130,11 +227,11 @@ RSpec.describe Ci::UpdateBuildQueueService do
end
it 'does execute the same amount of queries regardless of number of runners' do
control_count = ActiveRecord::QueryRecorder.new { subject.execute(build) }.count
control_count = ActiveRecord::QueryRecorder.new { subject.tick(build) }.count
create_list(:ci_runner, 10, :project, :online, projects: [project], tag_list: %w[b c d])
expect { subject.execute(build) }.not_to exceed_all_query_limit(control_count)
expect { subject.tick(build) }.not_to exceed_all_query_limit(control_count)
end
end
......@@ -147,11 +244,12 @@ RSpec.describe Ci::UpdateBuildQueueService do
end
it 'does execute more queries for more runners' do
control_count = ActiveRecord::QueryRecorder.new { subject.execute(build) }.count
control_count = ActiveRecord::QueryRecorder.new { subject.tick(build) }.count
create_list(:ci_runner, 10, :project, :online, projects: [project], tag_list: %w[b c d])
expect { subject.execute(build) }.to exceed_all_query_limit(control_count)
expect { subject.tick(build) }.to exceed_all_query_limit(control_count)
end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment