Commit d0547293 authored by Kamil Trzciński's avatar Kamil Trzciński

Merge branch 'feature/remove-ci-build-from-queuing-query' into 'master'

Use pending build table as a source for builds queue [RUN ALL RSPEC] [RUN AS-IF-FOSS]

See merge request gitlab-org/gitlab!64093
parents 4c088b55 3886bfc0
......@@ -11,7 +11,6 @@ module Ci
include Importable
include Ci::HasRef
include IgnorableColumns
include TaggableQueries
BuildArchivedError = Class.new(StandardError)
......@@ -179,25 +178,6 @@ module Ci
joins(:metadata).where("ci_builds_metadata.config_options -> 'artifacts' -> 'reports' ?| array[:job_types]", job_types: job_types)
end
scope :matches_tag_ids, -> (tag_ids) do
matcher = ::ActsAsTaggableOn::Tagging
.where(taggable_type: CommitStatus.name)
.where(context: 'tags')
.where('taggable_id = ci_builds.id')
.where.not(tag_id: tag_ids).select('1')
where("NOT EXISTS (?)", matcher)
end
scope :with_any_tags, -> do
matcher = ::ActsAsTaggableOn::Tagging
.where(taggable_type: CommitStatus.name)
.where(context: 'tags')
.where('taggable_id = ci_builds.id').select('1')
where("EXISTS (?)", matcher)
end
scope :queued_before, ->(time) { where(arel_table[:queued_at].lt(time)) }
scope :preload_project_and_pipeline_project, -> do
......
......@@ -7,6 +7,9 @@ module Ci
belongs_to :project
belongs_to :build, class_name: 'Ci::Build'
scope :ref_protected, -> { where(protected: true) }
scope :queued_before, ->(time) { where(arel_table[:created_at].lt(time)) }
def self.upsert_from_build!(build)
entry = self.new(build: build, project: build.project, protected: build.protected?)
......
......@@ -7,6 +7,7 @@ class CommitStatus < ApplicationRecord
include Presentable
include EnumWithNil
include BulkInsertableAssociations
include TaggableQueries
self.table_name = 'ci_builds'
......
......@@ -12,5 +12,26 @@ module TaggableQueries
.where(taggings: { context: context, taggable_type: polymorphic_name })
.select('COALESCE(array_agg(tags.name ORDER BY name), ARRAY[]::text[])')
end
def matches_tag_ids(tag_ids, table: quoted_table_name, column: 'id')
matcher = ::ActsAsTaggableOn::Tagging
.where(taggable_type: CommitStatus.name)
.where(context: 'tags')
.where("taggable_id = #{connection.quote_table_name(table)}.#{connection.quote_column_name(column)}") # rubocop:disable GitlabSecurity/SqlInjection
.where.not(tag_id: tag_ids)
.select('1')
where("NOT EXISTS (?)", matcher)
end
def with_any_tags(table: quoted_table_name, column: 'id')
matcher = ::ActsAsTaggableOn::Tagging
.where(taggable_type: CommitStatus.name)
.where(context: 'tags')
.where("taggable_id = #{connection.quote_table_name(table)}.#{connection.quote_column_name(column)}") # rubocop:disable GitlabSecurity/SqlInjection
.select('1')
where("EXISTS (?)", matcher)
end
end
end
# frozen_string_literal: true
module Ci
module Queue
class BuildQueueService
include ::Gitlab::Utils::StrongMemoize
attr_reader :runner
def initialize(runner)
@runner = runner
end
def new_builds
strategy.new_builds
end
##
# This is overridden in EE
#
def builds_for_shared_runner
strategy.builds_for_shared_runner
end
# rubocop:disable CodeReuse/ActiveRecord
def builds_for_group_runner
# Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)
hierarchy_groups = Gitlab::ObjectHierarchy
.new(groups, options: { use_distinct: ::Feature.enabled?(:use_distinct_in_register_job_object_hierarchy) })
.base_and_descendants
projects = Project.where(namespace_id: hierarchy_groups)
.with_group_runners_enabled
.with_builds_enabled
.without_deleted
relation = new_builds.where(project: projects)
order(relation)
end
def builds_for_project_runner
relation = new_builds
.where(project: runner.projects.without_deleted.with_builds_enabled)
order(relation)
end
def builds_queued_before(relation, time)
relation.queued_before(time)
end
def builds_for_protected_runner(relation)
relation.ref_protected
end
def builds_matching_tag_ids(relation, ids)
strategy.builds_matching_tag_ids(relation, ids)
end
def builds_with_any_tags(relation)
strategy.builds_with_any_tags(relation)
end
def order(relation)
strategy.order(relation)
end
def execute(relation)
strategy.build_ids(relation)
end
private
def strategy
strong_memoize(:strategy) do
if ::Feature.enabled?(:ci_pending_builds_queue_source, runner, default_enabled: :yaml)
Queue::PendingBuildsStrategy.new(runner)
else
Queue::BuildsTableStrategy.new(runner)
end
end
end
end
end
end
Ci::Queue::BuildQueueService.prepend_mod_with('Ci::Queue::BuildQueueService')
# frozen_string_literal: true
module Ci
module Queue
class BuildsTableStrategy
attr_reader :runner
def initialize(runner)
@runner = runner
end
# rubocop:disable CodeReuse/ActiveRecord
def builds_for_shared_runner
relation = new_builds
# don't run projects which have not enabled shared runners and builds
.joins('INNER JOIN projects ON ci_builds.project_id = projects.id')
.where(projects: { shared_runners_enabled: true, pending_delete: false })
.joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
.where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')
if Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
# if disaster recovery is enabled, we fallback to FIFO scheduling
relation.order('ci_builds.id ASC')
else
# Implement fair scheduling
# this returns builds that are ordered by number of running builds
# we prefer projects that don't use shared runners at all
relation
.joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id = project_builds.project_id")
.order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_builds.id ASC')
end
end
def builds_matching_tag_ids(relation, ids)
# pick builds that does not have other tags than runner's one
relation.matches_tag_ids(ids)
end
def builds_with_any_tags(relation)
# pick builds that have at least one tag
relation.with_any_tags
end
def order(relation)
relation.order('id ASC')
end
def new_builds
::Ci::Build.pending.unstarted
end
def build_ids(relation)
relation.pluck(:id)
end
private
def running_builds_for_shared_runners
::Ci::Build.running
.where(runner: ::Ci::Runner.instance_type)
.group(:project_id)
.select(:project_id, 'COUNT(*) AS running_builds')
end
# rubocop:enable CodeReuse/ActiveRecord
end
end
end
# frozen_string_literal: true
module Ci
module Queue
class PendingBuildsStrategy
attr_reader :runner
def initialize(runner)
@runner = runner
end
# rubocop:disable CodeReuse/ActiveRecord
def builds_for_shared_runner
relation = new_builds
# don't run projects which have not enabled shared runners and builds
.joins('INNER JOIN projects ON ci_pending_builds.project_id = projects.id')
.where(projects: { shared_runners_enabled: true, pending_delete: false })
.joins('LEFT JOIN project_features ON ci_pending_builds.project_id = project_features.project_id')
.where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')
if Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
# if disaster recovery is enabled, we fallback to FIFO scheduling
relation.order('ci_pending_builds.build_id ASC')
else
# Implement fair scheduling
# this returns builds that are ordered by number of running builds
# we prefer projects that don't use shared runners at all
relation
.joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_pending_builds.project_id=project_builds.project_id")
.order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_pending_builds.build_id ASC')
end
end
def builds_matching_tag_ids(relation, ids)
relation.merge(CommitStatus.matches_tag_ids(ids, table: 'ci_pending_builds', column: 'build_id'))
end
def builds_with_any_tags(relation)
relation.merge(CommitStatus.with_any_tags(table: 'ci_pending_builds', column: 'build_id'))
end
def order(relation)
relation.order('build_id ASC')
end
def new_builds
::Ci::PendingBuild.all
end
def build_ids(relation)
relation.pluck(:build_id)
end
private
def running_builds_for_shared_runners
::Ci::RunningBuild
.instance_type
.group(:project_id)
.select(:project_id, 'COUNT(*) AS running_builds')
end
# rubocop:enable CodeReuse/ActiveRecord
end
end
end
......@@ -103,35 +103,40 @@ module Ci
# rubocop: disable CodeReuse/ActiveRecord
def each_build(params, &blk)
builds =
queue = ::Ci::Queue::BuildQueueService.new(runner)
builds = begin
if runner.instance_type?
builds_for_shared_runner
queue.builds_for_shared_runner
elsif runner.group_type?
builds_for_group_runner
queue.builds_for_group_runner
else
builds_for_project_runner
queue.builds_for_project_runner
end
end
if runner.ref_protected?
builds = queue.builds_for_protected_runner(builds)
end
# pick builds that does not have other tags than runner's one
builds = builds.matches_tag_ids(runner.tags.ids)
builds = queue.builds_matching_tag_ids(builds, runner.tags.ids)
# pick builds that have at least one tag
unless runner.run_untagged?
builds = builds.with_any_tags
builds = queue.builds_with_any_tags(builds)
end
# pick builds that older than specified age
if params.key?(:job_age)
builds = builds.queued_before(params[:job_age].seconds.ago)
builds = queue.builds_queued_before(builds, params[:job_age].seconds.ago)
end
build_ids = retrieve_queue(-> { builds.pluck(:id) })
build_ids = retrieve_queue(-> { queue.execute(builds) })
@metrics.observe_queue_size(-> { build_ids.size }, @runner.runner_type)
build_ids.each do |build_id|
yield Ci::Build.find(build_id)
end
build_ids.each { |build_id| yield Ci::Build.find(build_id) }
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -259,63 +264,6 @@ module Ci
)
end
# rubocop: disable CodeReuse/ActiveRecord
def builds_for_shared_runner
relation = new_builds.
# don't run projects which have not enabled shared runners and builds
joins(:project).where(projects: { shared_runners_enabled: true, pending_delete: false })
.joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
.where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')
if Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
# if disaster recovery is enabled, we fallback to FIFO scheduling
relation.order('ci_builds.id ASC')
else
# Implement fair scheduling
# this returns builds that are ordered by number of running builds
# we prefer projects that don't use shared runners at all
relation
.joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id=project_builds.project_id")
.order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_builds.id ASC')
end
end
def builds_for_project_runner
new_builds.where(project: runner.projects.without_deleted.with_builds_enabled).order('id ASC')
end
def builds_for_group_runner
# Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)
hierarchy_groups = Gitlab::ObjectHierarchy.new(groups, options: { use_distinct: Feature.enabled?(:use_distinct_in_register_job_object_hierarchy) }).base_and_descendants
projects = Project.where(namespace_id: hierarchy_groups)
.with_group_runners_enabled
.with_builds_enabled
.without_deleted
new_builds.where(project: projects).order('id ASC')
end
def running_builds_for_shared_runners
Ci::Build.running.where(runner: Ci::Runner.instance_type)
.group(:project_id).select(:project_id, 'count(*) AS running_builds')
end
def all_builds
if Feature.enabled?(:ci_pending_builds_queue_join, runner, default_enabled: :yaml)
Ci::Build.joins(:queuing_entry)
else
Ci::Build.all
end
end
# rubocop: enable CodeReuse/ActiveRecord
def new_builds
builds = all_builds.pending.unstarted
builds = builds.ref_protected if runner.ref_protected?
builds
end
def pre_assign_runner_checks
{
missing_dependency_failure: -> (build, _) { !build.has_valid_build_dependencies? },
......
---
name: ci_pending_builds_queue_join
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/62195
name: ci_pending_builds_queue_source
introduced_by_url:
rollout_issue_url:
milestone: '13.12'
milestone: '14.0'
type: development
group: group::pipeline execution
default_enabled: false
# frozen_string_literal: true
module EE
module Ci
module Queue
module BuildQueueService
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
override :builds_for_shared_runner
def builds_for_shared_runner
# if disaster recovery is enabled, we disable quota
if ::Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
super
else
enforce_minutes_based_on_cost_factors(super)
end
end
# rubocop: disable CodeReuse/ActiveRecord
def enforce_minutes_based_on_cost_factors(relation)
visibility_relation = ::CommitStatus.where(
projects: { visibility_level: runner.visibility_levels_without_minutes_quota })
enforce_limits_relation = ::CommitStatus.where('EXISTS (?)', builds_check_limit)
relation.merge(visibility_relation.or(enforce_limits_relation))
end
def builds_check_limit
all_namespaces
.joins('LEFT JOIN namespace_statistics ON namespace_statistics.namespace_id = namespaces.id')
.where('COALESCE(namespaces.shared_runners_minutes_limit, ?, 0) = 0 OR ' \
'COALESCE(namespace_statistics.shared_runners_seconds, 0) < ' \
'COALESCE('\
'(namespaces.shared_runners_minutes_limit + COALESCE(namespaces.extra_shared_runners_minutes_limit, 0)), ' \
'(? + COALESCE(namespaces.extra_shared_runners_minutes_limit, 0)), ' \
'0) * 60',
application_shared_runners_minutes, application_shared_runners_minutes)
.select('1')
end
def all_namespaces
if traversal_ids_enabled?
::Namespace
.where('namespaces.id = project_namespaces.traversal_ids[1]')
.joins('INNER JOIN namespaces as project_namespaces ON project_namespaces.id = projects.namespace_id')
else
namespaces = ::Namespace.reorder(nil).where('namespaces.id = projects.namespace_id')
::Gitlab::ObjectHierarchy.new(namespaces, options: { skip_ordering: true }).roots
end
end
# rubocop: enable CodeReuse/ActiveRecord
def application_shared_runners_minutes
::Gitlab::CurrentSettings.shared_runners_minutes
end
def traversal_ids_enabled?
::Feature.enabled?(:sync_traversal_ids, default_enabled: :yaml) &&
::Feature.enabled?(:traversal_ids_for_quota_calculation, type: :development, default_enabled: :yaml)
end
end
end
end
end
......@@ -2,71 +2,9 @@
module EE
module Ci
# RegisterJobService EE mixin
#
# This module is intended to encapsulate EE-specific service logic
# and be included in the `RegisterJobService` service
module RegisterJobService
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
def builds_for_shared_runner
# if disaster recovery is enabled, we disable quota
if ::Feature.enabled?(:ci_queueing_disaster_recovery, runner, type: :ops, default_enabled: :yaml)
super
else
enforce_minutes_based_on_cost_factors(super)
end
end
# rubocop: disable CodeReuse/ActiveRecord
def enforce_minutes_based_on_cost_factors(relation)
visibility_relation = ::Ci::Build.where(
projects: { visibility_level: runner.visibility_levels_without_minutes_quota })
enforce_limits_relation = ::Ci::Build.where('EXISTS (?)', builds_check_limit)
relation.merge(visibility_relation.or(enforce_limits_relation))
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def builds_check_limit
all_namespaces
.joins('LEFT JOIN namespace_statistics ON namespace_statistics.namespace_id = namespaces.id')
.where('COALESCE(namespaces.shared_runners_minutes_limit, ?, 0) = 0 OR ' \
'COALESCE(namespace_statistics.shared_runners_seconds, 0) < ' \
'COALESCE('\
'(namespaces.shared_runners_minutes_limit + COALESCE(namespaces.extra_shared_runners_minutes_limit, 0)), ' \
'(? + COALESCE(namespaces.extra_shared_runners_minutes_limit, 0)), ' \
'0) * 60',
application_shared_runners_minutes, application_shared_runners_minutes)
.select('1')
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def all_namespaces
if traversal_ids_enabled?
::Namespace
.where('namespaces.id = project_namespaces.traversal_ids[1]')
.joins('INNER JOIN namespaces as project_namespaces ON project_namespaces.id = projects.namespace_id')
else
namespaces = ::Namespace.reorder(nil).where('namespaces.id = projects.namespace_id')
::Gitlab::ObjectHierarchy.new(namespaces, options: { skip_ordering: true }).roots
end
end
# rubocop: enable CodeReuse/ActiveRecord
def application_shared_runners_minutes
::Gitlab::CurrentSettings.shared_runners_minutes
end
def traversal_ids_enabled?
::Feature.enabled?(:sync_traversal_ids, default_enabled: :yaml) &&
::Feature.enabled?(:traversal_ids_for_quota_calculation, type: :development, default_enabled: :yaml)
end
override :pre_assign_runner_checks
def pre_assign_runner_checks
super.merge({
......
......@@ -2,14 +2,14 @@
require 'spec_helper'
RSpec.describe Ci::RegisterJobService do
RSpec.describe Ci::RegisterJobService, '#execute' do
let_it_be_with_refind(:shared_runner) { create(:ci_runner, :instance) }
let!(:project) { create(:project, shared_runners_enabled: true) }
let!(:pipeline) { create(:ci_empty_pipeline, project: project) }
let!(:pending_build) { create(:ci_build, :pending, :queued, pipeline: pipeline) }
describe '#execute' do
shared_examples 'namespace minutes quota' do
context 'shared runners minutes limit' do
subject { described_class.new(shared_runner).execute.build }
......@@ -267,4 +267,20 @@ RSpec.describe Ci::RegisterJobService do
end
end
end
context 'when legacy queuing is being used' do
before do
stub_feature_flags(ci_pending_builds_queue_source: false)
end
include_examples 'namespace minutes quota'
end
context 'when new pending builds table is used' do
before do
stub_feature_flags(ci_pending_builds_queue_source: true)
end
include_examples 'namespace minutes quota'
end
end
......@@ -297,7 +297,13 @@ RSpec.describe API::Ci::Runner, :clean_gitlab_redis_shared_state do
end
context 'when job filtered by job_age' do
let!(:job) { create(:ci_build, :pending, :queued, :tag, pipeline: pipeline, name: 'spinach', stage: 'test', stage_idx: 0, queued_at: 60.seconds.ago) }
let!(:job) do
create(:ci_build, :pending, :queued, :tag, pipeline: pipeline, name: 'spinach', stage: 'test', stage_idx: 0, queued_at: 60.seconds.ago)
end
before do
job.queuing_entry&.update!(created_at: 60.seconds.ago)
end
context 'job is queued less than job_age parameter' do
let(:job_age) { 120 }
......
......@@ -269,25 +269,27 @@ module Ci
let!(:unrelated_group_runner) { create(:ci_runner, :group, groups: [unrelated_group]) }
it 'does not consider builds from other group runners' do
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 6
queue = ::Ci::Queue::BuildQueueService.new(group_runner)
expect(queue.builds_for_group_runner.size).to eq 6
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 5
expect(queue.builds_for_group_runner.size).to eq 5
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 4
expect(queue.builds_for_group_runner.size).to eq 4
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 3
expect(queue.builds_for_group_runner.size).to eq 3
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 2
expect(queue.builds_for_group_runner.size).to eq 2
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 1
expect(queue.builds_for_group_runner.size).to eq 1
execute(group_runner)
expect(described_class.new(group_runner).send(:builds_for_group_runner).count).to eq 0
expect(queue.builds_for_group_runner.size).to eq 0
expect(execute(group_runner)).to be_nil
end
end
......@@ -299,7 +301,9 @@ module Ci
end
it 'calls DISTINCT' do
expect(described_class.new(group_runner).send(:builds_for_group_runner).to_sql).to include("DISTINCT")
queue = ::Ci::Queue::BuildQueueService.new(group_runner)
expect(queue.builds_for_group_runner.to_sql).to include("DISTINCT")
end
end
......@@ -310,7 +314,9 @@ module Ci
end
it 'does not call DISTINCT' do
expect(described_class.new(group_runner).send(:builds_for_group_runner).to_sql).not_to include("DISTINCT")
queue = ::Ci::Queue::BuildQueueService.new(group_runner)
expect(queue.builds_for_group_runner.to_sql).not_to include("DISTINCT")
end
end
......@@ -349,8 +355,9 @@ module Ci
let!(:other_build) { create(:ci_build, :pending, :queued, pipeline: pipeline) }
before do
allow_any_instance_of(Ci::RegisterJobService).to receive(:builds_for_project_runner)
.and_return(Ci::Build.where(id: [pending_job, other_build]))
allow_any_instance_of(::Ci::Queue::BuildQueueService)
.to receive(:execute)
.and_return(Ci::Build.where(id: [pending_job, other_build]).pluck(:id))
end
it "receives second build from the queue" do
......@@ -361,8 +368,9 @@ module Ci
context 'when single build is in queue' do
before do
allow_any_instance_of(Ci::RegisterJobService).to receive(:builds_for_project_runner)
.and_return(Ci::Build.where(id: pending_job))
allow_any_instance_of(::Ci::Queue::BuildQueueService)
.to receive(:execute)
.and_return(Ci::Build.where(id: pending_job).pluck(:id))
end
it "does not receive any valid result" do
......@@ -372,8 +380,9 @@ module Ci
context 'when there is no build in queue' do
before do
allow_any_instance_of(Ci::RegisterJobService).to receive(:builds_for_project_runner)
.and_return(Ci::Build.none)
allow_any_instance_of(::Ci::Queue::BuildQueueService)
.to receive(:execute)
.and_return([])
end
it "does not receive builds but result is valid" do
......@@ -721,17 +730,17 @@ module Ci
include_examples 'handles runner assignment'
end
context 'when joining with pending builds table' do
context 'when using pending builds table' do
before do
stub_feature_flags(ci_pending_builds_queue_join: true)
stub_feature_flags(ci_pending_builds_queue_source: true)
end
include_examples 'handles runner assignment'
end
context 'when not joining with pending builds table' do
context 'when not using pending builds table' do
before do
stub_feature_flags(ci_pending_builds_queue_join: false)
stub_feature_flags(ci_pending_builds_queue_source: false)
end
include_examples 'handles runner assignment'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment