Commit 3ca021ae authored by Grzegorz Bizon's avatar Grzegorz Bizon

Merge branch 'feature/gb/shared-runner-builds-table' into 'master'

Add shared runner builds table to the database [RUN ALL RSPEC] [RUN AS-IF-FOSS]

See merge request gitlab-org/gitlab!62912
parents 4ed3d12b fc9fbda2
......@@ -39,6 +39,7 @@ module Ci
has_one :deployment, as: :deployable, class_name: 'Deployment'
has_one :pending_state, class_name: 'Ci::BuildPendingState', inverse_of: :build
has_one :queuing_entry, class_name: 'Ci::PendingBuild', foreign_key: :build_id
has_one :runtime_metadata, class_name: 'Ci::RunningBuild', foreign_key: :build_id
has_many :trace_sections, class_name: 'Ci::BuildTraceSection'
has_many :trace_chunks, class_name: 'Ci::BuildTraceChunk', foreign_key: :build_id, inverse_of: :build
has_many :report_results, class_name: 'Ci::BuildReportResult', inverse_of: :build
......@@ -310,7 +311,22 @@ module Ci
after_transition pending: any do |build, transition|
Ci::UpdateBuildQueueService.new.pop(build, transition)
end
after_transition any => [:running] do |build, transition|
Ci::UpdateBuildQueueService.new.track(build, transition)
end
after_transition running: any do |build, transition|
Ci::UpdateBuildQueueService.new.untrack(build, transition)
Ci::BuildRunnerSession.where(build: build).delete_all
end
# rubocop:enable CodeReuse/ServiceClass
#
after_transition pending: :running do |build|
build.ensure_metadata.update_timeout_state
end
after_transition pending: :running do |build|
build.deployment&.run
......@@ -364,14 +380,6 @@ module Ci
end
end
after_transition pending: :running do |build|
build.ensure_metadata.update_timeout_state
end
after_transition running: any do |build|
Ci::BuildRunnerSession.where(build: build).delete_all
end
after_transition any => [:skipped, :canceled] do |build, transition|
if transition.to_name == :skipped
build.deployment&.skip
......@@ -1068,16 +1076,26 @@ module Ci
options.dig(:allow_failure_criteria, :exit_codes).present?
end
def all_queuing_entries
# We can have only one queuing entry, because there is a unique index on
# `build_id`, but we need a relation to remove this single queuing entry
# more efficiently in a single statement without actually load data.
def create_queuing_entry!
::Ci::PendingBuild.upsert_from_build!(self)
end
##
# We can have only one queuing entry or running build tracking entry,
# because there is a unique index on `build_id` in each table, but we need
# a relation to remove these entries more efficiently in a single statement
# without actually loading data.
#
def all_queuing_entries
::Ci::PendingBuild.where(build_id: self.id)
end
def create_queuing_entry!
::Ci::PendingBuild.upsert_from_build!(self)
def all_runtime_metadata
::Ci::RunningBuild.where(build_id: self.id)
end
def shared_runner_build?
runner&.instance_type?
end
protected
......
# frozen_string_literal: true
module Ci
class RunningBuild < ApplicationRecord
extend Gitlab::Ci::Model
belongs_to :project
belongs_to :build, class_name: 'Ci::Build'
belongs_to :runner, class_name: 'Ci::Runner'
enum runner_type: ::Ci::Runner.runner_types
def self.upsert_shared_runner_build!(build)
unless build.shared_runner_build?
raise ArgumentError, 'build has not been picked by a shared runner'
end
entry = self.new(build: build,
project: build.project,
runner: build.runner,
runner_type: build.runner.runner_type)
entry.validate!
self.upsert(entry.attributes.compact, returning: %w[build_id], unique_by: :build_id)
end
end
end
......@@ -48,6 +48,47 @@ module Ci
end
end
##
# Add shared runner build tracking entry (used for queuing).
#
def track(build, transition)
return unless Feature.enabled?(:ci_track_shared_runner_builds, build.project, default_enabled: :yaml)
return unless build.shared_runner_build?
raise InvalidQueueTransition unless transition.to == 'running'
transition.within_transaction do
result = ::Ci::RunningBuild.upsert_shared_runner_build!(build)
unless result.empty?
metrics.increment_queue_operation(:shared_runner_build_new)
result.rows.dig(0, 0)
end
end
end
##
# Remove a runtime build tracking entry for a shared runner build (used for
# queuing).
#
def untrack(build, transition)
return unless Feature.enabled?(:ci_untrack_shared_runner_builds, build.project, default_enabled: :yaml)
return unless build.shared_runner_build?
raise InvalidQueueTransition unless transition.from == 'running'
transition.within_transaction do
removed = build.all_runtime_metadata.delete_all
if removed > 0
metrics.increment_queue_operation(:shared_runner_build_done)
build.id
end
end
end
##
# Unblock runner associated with given project / build
#
......
---
name: ci_track_shared_runner_builds
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/62912
rollout_issue_url:
milestone: '14.0'
type: development
group: group::pipeline execution
default_enabled: false
---
name: ci_untrack_shared_runner_builds
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/62912
rollout_issue_url:
milestone: '14.0'
type: development
group: group::pipeline execution
default_enabled: false
# frozen_string_literal: true
class AddRunningBuildsTable < ActiveRecord::Migration[6.0]
def up
create_table :ci_running_builds do |t|
t.references :build, index: { unique: true }, null: false, foreign_key: { to_table: :ci_builds, on_delete: :cascade }
t.references :project, index: true, null: false, foreign_key: { on_delete: :cascade }
t.references :runner, index: true, null: false, foreign_key: { to_table: :ci_runners, on_delete: :cascade }
t.datetime_with_timezone :created_at, null: false, default: -> { 'NOW()' }
t.integer :runner_type, limit: 2, null: false
end
end
def down
drop_table :ci_running_builds
end
end
d4a0098c30cd1acea008fa5f1cfb4c23d5b5b894eab2b72f5004acc5233f2576
\ No newline at end of file
......@@ -11151,6 +11151,24 @@ CREATE SEQUENCE ci_runners_id_seq
ALTER SEQUENCE ci_runners_id_seq OWNED BY ci_runners.id;
CREATE TABLE ci_running_builds (
id bigint NOT NULL,
build_id bigint NOT NULL,
project_id bigint NOT NULL,
runner_id bigint NOT NULL,
created_at timestamp with time zone DEFAULT now() NOT NULL,
runner_type smallint NOT NULL
);
CREATE SEQUENCE ci_running_builds_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE ci_running_builds_id_seq OWNED BY ci_running_builds.id;
CREATE TABLE ci_sources_pipelines (
id integer NOT NULL,
project_id integer,
......@@ -19687,6 +19705,8 @@ ALTER TABLE ONLY ci_runner_projects ALTER COLUMN id SET DEFAULT nextval('ci_runn
ALTER TABLE ONLY ci_runners ALTER COLUMN id SET DEFAULT nextval('ci_runners_id_seq'::regclass);
ALTER TABLE ONLY ci_running_builds ALTER COLUMN id SET DEFAULT nextval('ci_running_builds_id_seq'::regclass);
ALTER TABLE ONLY ci_sources_pipelines ALTER COLUMN id SET DEFAULT nextval('ci_sources_pipelines_id_seq'::regclass);
ALTER TABLE ONLY ci_sources_projects ALTER COLUMN id SET DEFAULT nextval('ci_sources_projects_id_seq'::regclass);
......@@ -20892,6 +20912,9 @@ ALTER TABLE ONLY ci_runner_projects
ALTER TABLE ONLY ci_runners
ADD CONSTRAINT ci_runners_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_running_builds
ADD CONSTRAINT ci_running_builds_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_sources_pipelines
ADD CONSTRAINT ci_sources_pipelines_pkey PRIMARY KEY (id);
......@@ -22889,6 +22912,12 @@ CREATE INDEX index_ci_runners_on_token ON ci_runners USING btree (token);
CREATE INDEX index_ci_runners_on_token_encrypted ON ci_runners USING btree (token_encrypted);
CREATE UNIQUE INDEX index_ci_running_builds_on_build_id ON ci_running_builds USING btree (build_id);
CREATE INDEX index_ci_running_builds_on_project_id ON ci_running_builds USING btree (project_id);
CREATE INDEX index_ci_running_builds_on_runner_id ON ci_running_builds USING btree (runner_id);
CREATE INDEX index_ci_sources_pipelines_on_pipeline_id ON ci_sources_pipelines USING btree (pipeline_id);
CREATE INDEX index_ci_sources_pipelines_on_project_id ON ci_sources_pipelines USING btree (project_id);
......@@ -26709,6 +26738,9 @@ ALTER TABLE ONLY vulnerability_scanners
ALTER TABLE ONLY reviews
ADD CONSTRAINT fk_rails_5ca11d8c31 FOREIGN KEY (merge_request_id) REFERENCES merge_requests(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_running_builds
ADD CONSTRAINT fk_rails_5ca491d360 FOREIGN KEY (runner_id) REFERENCES ci_runners(id) ON DELETE CASCADE;
ALTER TABLE ONLY epic_issues
ADD CONSTRAINT fk_rails_5d942936b4 FOREIGN KEY (epic_id) REFERENCES epics(id) ON DELETE CASCADE;
......@@ -27414,6 +27446,9 @@ ALTER TABLE ONLY geo_hashed_storage_attachments_events
ALTER TABLE ONLY merge_request_reviewers
ADD CONSTRAINT fk_rails_d9fec24b9d FOREIGN KEY (merge_request_id) REFERENCES merge_requests(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_running_builds
ADD CONSTRAINT fk_rails_da45cfa165 FOREIGN KEY (build_id) REFERENCES ci_builds(id) ON DELETE CASCADE;
ALTER TABLE ONLY jira_imports
ADD CONSTRAINT fk_rails_da617096ce FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL;
......@@ -27426,6 +27461,9 @@ ALTER TABLE ONLY issues_prometheus_alert_events
ALTER TABLE ONLY board_user_preferences
ADD CONSTRAINT fk_rails_dbebdaa8fe FOREIGN KEY (board_id) REFERENCES boards(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_running_builds
ADD CONSTRAINT fk_rails_dc1d0801e8 FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE;
ALTER TABLE ONLY vulnerability_occurrence_pipelines
ADD CONSTRAINT fk_rails_dc3ae04693 FOREIGN KEY (occurrence_id) REFERENCES vulnerability_occurrences(id) ON DELETE CASCADE;
......@@ -33,7 +33,9 @@ module Gitlab
:queue_replication_lag,
:runner_pre_assign_checks_failed,
:runner_pre_assign_checks_success,
:runner_queue_tick
:runner_queue_tick,
:shared_runner_build_new,
:shared_runner_build_done
].to_set.freeze
QUEUE_DEPTH_HISTOGRAMS = [
......
......@@ -493,6 +493,34 @@ RSpec.describe Ci::Build do
expect(build.queuing_entry).to be_present
end
end
context 'when build has been picked by a shared runner' do
let(:build) { create(:ci_build, :pending) }
it 'creates runtime metadata entry' do
build.runner = create(:ci_runner, :instance_type)
build.run!
expect(build.reload.runtime_metadata).to be_present
end
end
end
describe '#drop' do
context 'when has a runtime tracking entry' do
let(:build) { create(:ci_build, :pending) }
it 'removes runtime tracking entry' do
build.runner = create(:ci_runner, :instance_type)
build.run!
expect(build.reload.runtime_metadata).to be_present
build.drop!
expect(build.reload.runtime_metadata).not_to be_present
end
end
end
describe '#schedulable?' do
......@@ -5181,4 +5209,34 @@ RSpec.describe Ci::Build do
it { expect(matcher.project).to eq(build.project) }
end
describe '#shared_runner_build?' do
context 'when build does not have a runner assigned' do
it 'is not a shared runner build' do
expect(build.runner).to be_nil
expect(build).not_to be_shared_runner_build
end
end
context 'when build has a project runner assigned' do
before do
build.runner = create(:ci_runner, :project)
end
it 'is not a shared runner build' do
expect(build).not_to be_shared_runner_build
end
end
context 'when build has an instance runner assigned' do
before do
build.runner = create(:ci_runner, :instance_type)
end
it 'is a shared runner build' do
expect(build).to be_shared_runner_build
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::RunningBuild do
let_it_be(:project) { create(:project) }
let_it_be(:pipeline) { create(:ci_pipeline, project: project) }
let(:runner) { create(:ci_runner, :instance_type) }
let(:build) { create(:ci_build, :running, runner: runner, pipeline: pipeline) }
describe '.upsert_shared_runner_build!' do
context 'another pending entry does not exist' do
it 'creates a new pending entry' do
result = described_class.upsert_shared_runner_build!(build)
expect(result.rows.dig(0, 0)).to eq build.id
expect(build.reload.runtime_metadata).to be_present
end
end
context 'when another queuing entry exists for given build' do
before do
described_class.create!(build: build,
project: project,
runner: runner,
runner_type: runner.runner_type)
end
it 'returns a build id as a result' do
result = described_class.upsert_shared_runner_build!(build)
expect(result.rows.dig(0, 0)).to eq build.id
end
end
context 'when build has been picked by a specific runner' do
let(:runner) { create(:ci_runner, :project) }
it 'raises an error' do
expect { described_class.upsert_shared_runner_build!(build) }
.to raise_error(ArgumentError, 'build has not been picked by a shared runner')
end
end
context 'when build has not been picked by a runner yet' do
let(:build) { create(:ci_build, pipeline: pipeline) }
it 'raises an error' do
expect { described_class.upsert_shared_runner_build!(build) }
.to raise_error(ArgumentError, 'build has not been picked by a shared runner')
end
end
end
end
......@@ -60,7 +60,7 @@ RSpec.describe Ci::RetryBuildService do
artifacts_file artifacts_metadata artifacts_size commands
resource resource_group_id processed security_scans author
pipeline_id report_results pending_state pages_deployments
queuing_entry].freeze
queuing_entry runtime_metadata].freeze
shared_examples 'build duplication' do
let_it_be(:another_pipeline) { create(:ci_empty_pipeline, project: project) }
......
......@@ -4,101 +4,208 @@ require 'spec_helper'
RSpec.describe Ci::UpdateBuildQueueService do
let(:project) { create(:project, :repository) }
let(:build) { create(:ci_build, pipeline: pipeline) }
let(:pipeline) { create(:ci_pipeline, project: project) }
let(:build) { create(:ci_build, pipeline: pipeline) }
describe '#push' do
let(:transition) { double('transition') }
describe 'pending builds queue push / pop' do
describe '#push' do
let(:transition) { double('transition') }
before do
allow(transition).to receive(:to).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
before do
allow(transition).to receive(:to).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
context 'when pending build can be created' do
it 'creates a new pending build in transaction' do
queued = subject.push(build, transition)
context 'when pending build can be created' do
it 'creates a new pending build in transaction' do
queued = subject.push(build, transition)
expect(queued).to eq build.id
end
expect(queued).to eq build.id
end
it 'increments queue push metric' do
metrics = spy('metrics')
it 'increments queue push metric' do
metrics = spy('metrics')
described_class.new(metrics).push(build, transition)
described_class.new(metrics).push(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_push)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_push)
end
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('created')
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('created')
expect { subject.push(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
expect { subject.push(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
context 'when duplicate entry exists' do
before do
::Ci::PendingBuild.create!(build: build, project: project)
end
it 'does nothing and returns build id' do
queued = subject.push(build, transition)
expect(queued).to eq build.id
end
end
end
context 'when duplicate entry exists' do
describe '#pop' do
let(:transition) { double('transition') }
before do
::Ci::PendingBuild.create!(build: build, project: project)
allow(transition).to receive(:from).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
it 'does nothing and returns build id' do
queued = subject.push(build, transition)
context 'when pending build exists' do
before do
Ci::PendingBuild.create!(build: build, project: project)
end
expect(queued).to eq build.id
it 'removes pending build in a transaction' do
dequeued = subject.pop(build, transition)
expect(dequeued).to eq build.id
end
it 'increments queue pop metric' do
metrics = spy('metrics')
described_class.new(metrics).pop(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_pop)
end
end
context 'when pending build does not exist' do
it 'does nothing if there is no pending build to remove' do
dequeued = subject.pop(build, transition)
expect(dequeued).to be_nil
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('created')
expect { subject.pop(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
end
end
describe '#pop' do
let(:transition) { double('transition') }
describe 'shared runner builds tracking' do
let(:runner) { create(:ci_runner, :instance_type) }
let(:build) { create(:ci_build, runner: runner, pipeline: pipeline) }
before do
allow(transition).to receive(:from).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
describe '#track' do
let(:transition) { double('transition') }
context 'when pending build exists' do
before do
Ci::PendingBuild.create!(build: build, project: project)
allow(transition).to receive(:to).and_return('running')
allow(transition).to receive(:within_transaction).and_yield
end
it 'removes pending build in a transaction' do
dequeued = subject.pop(build, transition)
context 'when a shared runner build can be tracked' do
it 'creates a new shared runner build tracking entry' do
build_id = subject.track(build, transition)
expect(build_id).to eq build.id
end
it 'increments new shared runner build metric' do
metrics = spy('metrics')
described_class.new(metrics).track(build, transition)
expect(dequeued).to eq build.id
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:shared_runner_build_new)
end
end
it 'increments queue pop metric' do
metrics = spy('metrics')
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('pending')
described_class.new(metrics).pop(build, transition)
expect { subject.track(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_pop)
context 'when duplicate entry exists' do
before do
::Ci::RunningBuild.create!(
build: build, project: project, runner: runner, runner_type: runner.runner_type
)
end
it 'does nothing and returns build id' do
build_id = subject.track(build, transition)
expect(build_id).to eq build.id
end
end
end
context 'when pending build does not exist' do
it 'does nothing if there is no pending build to remove' do
dequeued = subject.pop(build, transition)
describe '#untrack' do
let(:transition) { double('transition') }
expect(dequeued).to be_nil
before do
allow(transition).to receive(:from).and_return('running')
allow(transition).to receive(:within_transaction).and_yield
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('created')
context 'when shared runner build tracking entry exists' do
before do
Ci::RunningBuild.create!(
build: build, project: project, runner: runner, runner_type: runner.runner_type
)
end
it 'removes shared runner build' do
build_id = subject.untrack(build, transition)
expect(build_id).to eq build.id
end
it 'increments shared runner build done metric' do
metrics = spy('metrics')
described_class.new(metrics).untrack(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:shared_runner_build_done)
end
end
context 'when tracking entry does not exist' do
it 'does nothing if there is no tracking entry to remove' do
build_id = subject.untrack(build, transition)
expect(build_id).to be_nil
end
end
expect { subject.pop(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('pending')
expect { subject.untrack(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment