Commit 7204a481 authored by Grzegorz Bizon's avatar Grzegorz Bizon

Add tracking of running shared runner builds

This commit adds support for tracking running builds that have been
picked by shared runners. This will make it easier to retrieve this
information to performe more efficient queuing of the builds. This
information is going to be used by the fair scheduling algorithm.

Changelog: performance
parent 90d0ad42
......@@ -39,6 +39,7 @@ module Ci
has_one :deployment, as: :deployable, class_name: 'Deployment'
has_one :pending_state, class_name: 'Ci::BuildPendingState', inverse_of: :build
has_one :queuing_entry, class_name: 'Ci::PendingBuild', foreign_key: :build_id
has_one :shared_runner_metadata, class_name: 'Ci::SharedRunnerBuild', foreign_key: :build_id
has_many :trace_sections, class_name: 'Ci::BuildTraceSection'
has_many :trace_chunks, class_name: 'Ci::BuildTraceChunk', foreign_key: :build_id, inverse_of: :build
has_many :report_results, class_name: 'Ci::BuildReportResult', inverse_of: :build
......@@ -1068,16 +1069,22 @@ module Ci
options.dig(:allow_failure_criteria, :exit_codes).present?
end
def all_queuing_entries
# We can have only one queuing entry, because there is a unique index on
# `build_id`, but we need a relation to remove this single queuing entry
# more efficiently in a single statement without actually load data.
def create_queuing_entry!
::Ci::PendingBuild.upsert_from_build!(self)
end
##
# We can have only one queuing entry or shared runner build tracking entry,
# because there is a unique index on `build_id` in each table, but we need
# a relation to remove these entries more efficiently in a single statement
# without actually loading data.
#
def all_queuing_entries
::Ci::PendingBuild.where(build_id: self.id)
end
def create_queuing_entry!
::Ci::PendingBuild.upsert_from_build!(self)
def all_shared_runner_metadata
::Ci::SharedRunnerBuild.where(build_id: self.id)
end
protected
......
# frozen_string_literal: true
module Ci
class SharedRunnerBuild < ApplicationRecord
extend Gitlab::Ci::Model
belongs_to :project
belongs_to :build, class_name: 'Ci::Build'
belongs_to :runner, class_name: 'Ci::Runner'
def self.upsert_from_build!(build)
unless build.runner && build.runner.instance_type?
raise ArgumentError, 'build has not been picked by a shared runner'
end
entry = self.new(build: build, project: build.project, runner: build.runner)
entry.validate!
self.upsert(entry.attributes.compact, returning: %w[build_id], unique_by: :build_id)
end
end
end
......@@ -48,6 +48,44 @@ module Ci
end
end
##
# Add shared runner build tracking entry (used for queuing).
#
def track(build, transition)
return unless Feature.enabled?(:ci_track_shared_runner_builds, build.project, default_enabled: :yaml)
raise InvalidQueueTransition unless transition.to == 'running'
transition.within_transaction do
result = ::Ci::SharedRunnerBuild.upsert_from_build!(build)
unless result.empty?
metrics.increment_queue_operation(:shared_runner_build_new)
result.rows.dig(0, 0)
end
end
end
##
# Remove a shared runner build tracking entry (used for queuing).
#
def untrack(build, transition)
return unless Feature.enabled?(:ci_untrack_shared_runner_builds, build.project, default_enabled: :yaml)
raise InvalidQueueTransition unless transition.from == 'running'
transition.within_transaction do
removed = build.all_shared_runner_metadata.delete_all
if removed > 0
metrics.increment_queue_operation(:shared_runner_build_done)
build.id
end
end
end
##
# Unblock runner associated with given project / build
#
......
---
name: ci_track_shared_runner_builds
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/62912
rollout_issue_url:
milestone: '14.0'
type: development
group: group::pipeline execution
default_enabled: false
---
name: ci_untrack_shared_runner_builds
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/62912
rollout_issue_url:
milestone: '14.0'
type: development
group: group::pipeline execution
default_enabled: false
......@@ -33,7 +33,9 @@ module Gitlab
:queue_replication_lag,
:runner_pre_assign_checks_failed,
:runner_pre_assign_checks_success,
:runner_queue_tick
:runner_queue_tick,
:shared_runner_build_new,
:shared_runner_build_done
].to_set.freeze
QUEUE_DEPTH_HISTOGRAMS = [
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::SharedRunnerBuild do
let_it_be(:project) { create(:project) }
let_it_be(:pipeline) { create(:ci_pipeline, project: project) }
let(:runner) { create(:ci_runner, :instance_type) }
let(:build) { create(:ci_build, :running, runner: runner, pipeline: pipeline) }
describe '.upsert_from_build!' do
context 'another pending entry does not exist' do
it 'creates a new pending entry' do
result = described_class.upsert_from_build!(build)
expect(result.rows.dig(0, 0)).to eq build.id
expect(build.reload.shared_runner_metadata).to be_present
end
end
context 'when another queuing entry exists for given build' do
before do
described_class.create!(build: build, project: project, runner: runner)
end
it 'returns a build id as a result' do
result = described_class.upsert_from_build!(build)
expect(result.rows.dig(0, 0)).to eq build.id
end
end
context 'when build has been picked by a specific runner' do
let(:runner) { create(:ci_runner, :project) }
it 'raises an error' do
expect { described_class.upsert_from_build!(build) }
.to raise_error(ArgumentError, 'build has not been picked by a shared runner')
end
end
context 'when build has not been picked by a runner yet' do
let(:build) { create(:ci_build, pipeline: pipeline) }
it 'raises an error' do
expect { described_class.upsert_from_build!(build) }
.to raise_error(ArgumentError, 'build has not been picked by a shared runner')
end
end
end
end
......@@ -60,7 +60,7 @@ RSpec.describe Ci::RetryBuildService do
artifacts_file artifacts_metadata artifacts_size commands
resource resource_group_id processed security_scans author
pipeline_id report_results pending_state pages_deployments
queuing_entry].freeze
queuing_entry shared_runner_metadata].freeze
shared_examples 'build duplication' do
let_it_be(:another_pipeline) { create(:ci_empty_pipeline, project: project) }
......
......@@ -4,101 +4,206 @@ require 'spec_helper'
RSpec.describe Ci::UpdateBuildQueueService do
let(:project) { create(:project, :repository) }
let(:build) { create(:ci_build, pipeline: pipeline) }
let(:pipeline) { create(:ci_pipeline, project: project) }
let(:build) { create(:ci_build, pipeline: pipeline) }
describe '#push' do
let(:transition) { double('transition') }
describe 'pending builds queue push / pop' do
describe '#push' do
let(:transition) { double('transition') }
before do
allow(transition).to receive(:to).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
before do
allow(transition).to receive(:to).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
context 'when pending build can be created' do
it 'creates a new pending build in transaction' do
queued = subject.push(build, transition)
context 'when pending build can be created' do
it 'creates a new pending build in transaction' do
queued = subject.push(build, transition)
expect(queued).to eq build.id
end
expect(queued).to eq build.id
end
it 'increments queue push metric' do
metrics = spy('metrics')
it 'increments queue push metric' do
metrics = spy('metrics')
described_class.new(metrics).push(build, transition)
described_class.new(metrics).push(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_push)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_push)
end
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('created')
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('created')
expect { subject.push(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
expect { subject.push(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
context 'when duplicate entry exists' do
before do
::Ci::PendingBuild.create!(build: build, project: project)
end
it 'does nothing and returns build id' do
queued = subject.push(build, transition)
expect(queued).to eq build.id
end
end
end
context 'when duplicate entry exists' do
describe '#pop' do
let(:transition) { double('transition') }
before do
::Ci::PendingBuild.create!(build: build, project: project)
allow(transition).to receive(:from).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
context 'when pending build exists' do
before do
Ci::PendingBuild.create!(build: build, project: project)
end
it 'removes pending build in a transaction' do
dequeued = subject.pop(build, transition)
expect(dequeued).to eq build.id
end
it 'increments queue pop metric' do
metrics = spy('metrics')
described_class.new(metrics).pop(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_pop)
end
end
it 'does nothing and returns build id' do
queued = subject.push(build, transition)
context 'when pending build does not exist' do
it 'does nothing if there is no pending build to remove' do
dequeued = subject.pop(build, transition)
expect(queued).to eq build.id
expect(dequeued).to be_nil
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('created')
expect { subject.pop(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
end
end
describe '#pop' do
let(:transition) { double('transition') }
describe 'shared runner builds tracking' do
let(:runner) { create(:ci_runner, :instance_type) }
let(:build) { create(:ci_build, runner: runner, pipeline: pipeline) }
before do
allow(transition).to receive(:from).and_return('pending')
allow(transition).to receive(:within_transaction).and_yield
end
describe '#track' do
let(:transition) { double('transition') }
context 'when pending build exists' do
before do
Ci::PendingBuild.create!(build: build, project: project)
allow(transition).to receive(:to).and_return('running')
allow(transition).to receive(:within_transaction).and_yield
end
context 'when a shared runner build can be tracked' do
it 'creates a new shared runner build tracking entry' do
build_id = subject.track(build, transition)
expect(build_id).to eq build.id
end
it 'increments new shared runner build metric' do
metrics = spy('metrics')
described_class.new(metrics).track(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:shared_runner_build_new)
end
end
it 'removes pending build in a transaction' do
dequeued = subject.pop(build, transition)
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:to).and_return('pending')
expect(dequeued).to eq build.id
expect { subject.track(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
it 'increments queue pop metric' do
metrics = spy('metrics')
context 'when duplicate entry exists' do
before do
::Ci::SharedRunnerBuild
.create!(build: build, project: project, runner: runner)
end
described_class.new(metrics).pop(build, transition)
it 'does nothing and returns build id' do
build_id = subject.track(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:build_queue_pop)
expect(build_id).to eq build.id
end
end
end
context 'when pending build does not exist' do
it 'does nothing if there is no pending build to remove' do
dequeued = subject.pop(build, transition)
describe '#untrack' do
let(:transition) { double('transition') }
expect(dequeued).to be_nil
before do
allow(transition).to receive(:from).and_return('running')
allow(transition).to receive(:within_transaction).and_yield
end
end
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('created')
context 'when shared runner build tracking entry exists' do
before do
Ci::SharedRunnerBuild
.create!(build: build, project: project, runner: runner)
end
it 'removes shared runner build' do
build_id = subject.untrack(build, transition)
expect(build_id).to eq build.id
end
it 'increments shared runner build done metric' do
metrics = spy('metrics')
described_class.new(metrics).untrack(build, transition)
expect(metrics)
.to have_received(:increment_queue_operation)
.with(:shared_runner_build_done)
end
end
context 'when tracking entry does not exist' do
it 'does nothing if there is no tracking entry to remove' do
build_id = subject.untrack(build, transition)
expect(build_id).to be_nil
end
end
expect { subject.pop(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
context 'when invalid transition is detected' do
it 'raises an error' do
allow(transition).to receive(:from).and_return('pending')
expect { subject.untrack(build, transition) }
.to raise_error(described_class::InvalidQueueTransition)
end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment