Commit 3e706636 authored by Kamil Trzciński's avatar Kamil Trzciński

Fetch build one-by-one

This is based on hypothesis:

- Long queues inflate memory allocation
- Long queues make us go very deep into a queue
  as that build could already be picked by another
  runner, thus increasing amount of retries
parent e294fa99
......@@ -9,6 +9,7 @@ module Ci
include FromUnion
include TokenAuthenticatable
include IgnorableColumns
include FeatureGate
add_authentication_token_field :token, encrypted: -> { Feature.enabled?(:ci_runners_tokens_optional_encryption, default_enabled: true) ? :optional : :required }
......
......@@ -23,36 +23,11 @@ module Ci
private
# rubocop: disable CodeReuse/ActiveRecord
def process_queue(params)
builds =
if runner.instance_type?
builds_for_shared_runner
elsif runner.group_type?
builds_for_group_runner
else
builds_for_project_runner
end
# pick builds that does not have other tags than runner's one
builds = builds.matches_tag_ids(runner.tags.ids)
# pick builds that have at least one tag
unless runner.run_untagged?
builds = builds.with_any_tags
end
# pick builds that older than specified age
if params.key?(:job_age)
builds = builds.queued_before(params[:job_age].seconds.ago)
end
@metrics.observe_queue_size(-> { builds.to_a.size })
valid = true
depth = 0
builds.each do |build|
each_build(params) do |build|
depth += 1
@metrics.increment_queue_operation(:queue_iteration)
......@@ -78,9 +53,53 @@ module Ci
Result.new(nil, nil, valid)
end
# rubocop: disable CodeReuse/ActiveRecord
def each_build(params, &blk)
builds =
if runner.instance_type?
builds_for_shared_runner
elsif runner.group_type?
builds_for_group_runner
else
builds_for_project_runner
end
# pick builds that does not have other tags than runner's one
builds = builds.matches_tag_ids(runner.tags.ids)
# pick builds that have at least one tag
unless runner.run_untagged?
builds = builds.with_any_tags
end
# pick builds that older than specified age
if params.key?(:job_age)
builds = builds.queued_before(params[:job_age].seconds.ago)
end
if Feature.enabled?(:ci_register_job_service_one_by_one, runner)
build_ids = builds.pluck(:id)
@metrics.observe_queue_size(-> { build_ids.size })
build_ids.each do |build_id|
yield Ci::Build.find(build_id)
end
else
@metrics.observe_queue_size(-> { builds.to_a.size })
builds.each(&blk)
end
end
# rubocop: enable CodeReuse/ActiveRecord
def process_build(build, params)
unless build.pending?
@metrics.increment_queue_operation(:build_not_pending)
return
end
if runner.can_pick?(build)
@metrics.increment_queue_operation(:build_can_pick)
else
......
---
name: ci_register_job_service_one_by_one
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/55194
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/323177
milestone: '13.10'
type: development
group: group::memory
default_enabled: false
......@@ -19,6 +19,7 @@ module Gitlab
OPERATION_COUNTERS = [
:build_can_pick,
:build_not_pick,
:build_not_pending,
:build_conflict_lock,
:build_conflict_exception,
:build_conflict_transition,
......
......@@ -13,6 +13,7 @@ module Ci
let!(:pending_job) { create(:ci_build, pipeline: pipeline) }
describe '#execute' do
shared_examples 'handles runner assignment' do
context 'runner follow tag list' do
it "picks build with the same tag" do
pending_job.update!(tag_list: ["linux"])
......@@ -581,6 +582,47 @@ module Ci
expect(pending_job).to be_running
end
end
context 'when only some builds can be matched by runner' do
let!(:specific_runner) { create(:ci_runner, :project, projects: [project], tag_list: %w[matching]) }
let!(:pending_job) { create(:ci_build, pipeline: pipeline, tag_list: %w[matching]) }
before do
# create additional matching and non-matching jobs
create_list(:ci_build, 2, pipeline: pipeline, tag_list: %w[matching])
create(:ci_build, pipeline: pipeline, tag_list: %w[non-matching])
end
it "observes queue size of only matching jobs" do
# pending_job + 2 x matching ones
expect(Gitlab::Ci::Queue::Metrics.queue_size_total).to receive(:observe).with({}, 3)
expect(execute(specific_runner)).to eq(pending_job)
end
end
end
context 'when ci_register_job_service_one_by_one is enabled' do
before do
stub_feature_flags(ci_register_job_service_one_by_one: true)
end
it 'picks builds one-by-one' do
expect(Ci::Build).to receive(:find).with(pending_job.id).and_call_original
expect(execute(specific_runner)).to eq(pending_job)
end
include_examples 'handles runner assignment'
end
context 'when ci_register_job_service_one_by_one is disabled' do
before do
stub_feature_flags(ci_register_job_service_one_by_one: false)
end
include_examples 'handles runner assignment'
end
end
describe '#register_success' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment