Commit e7420a37 authored by Furkan Ayhan's avatar Furkan Ayhan

Implement allowing empty needs for DAG pipeline jobs

This will allow jobs to have a "needs: []" configuration.
Those jobs will start immediately, without waiting for previous stages to complete.
parent 0a8d441b
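For illustration, a minimal `.gitlab-ci.yml` sketch of the new behavior (job names here are hypothetical):

```yaml
build:
  stage: build
  script: make build

lint:
  stage: test
  needs: []           # empty needs: starts immediately, without waiting for the build stage
  script: make lint

test:
  stage: test
  script: make test   # regular stage scheduling: waits for the build stage to complete
```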
......@@ -818,7 +818,7 @@ module Ci
depended_jobs = depends_on_builds
# find all jobs that are needed
if Feature.enabled?(:ci_dag_support, project, default_enabled: true) && needs.exists?
if Feature.enabled?(:ci_dag_support, project, default_enabled: true) && scheduling_type_dag?
depended_jobs = depended_jobs.where(name: needs.artifacts.select(:name))
end
......
......@@ -12,6 +12,18 @@ module Ci
scope :preload_needs, -> { preload(:needs) }
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
where('EXISTS (?)', needs).preload(:needs)
end
scope :without_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
where('NOT EXISTS (?)', needs)
end
def self.select_with_aggregated_needs(project)
return all unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
......@@ -26,6 +38,18 @@ module Ci
)
end
# Old processables may have scheduling_type as nil,
# so we need to ensure the data exists before using it.
def self.populate_scheduling_type!
needs = Ci::BuildNeed.scoped_build.select(1)
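# Single UPDATE over processables with a nil scheduling_type: rows that have at least
# one ci_build_needs record become :dag, all others become :stage.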
where(scheduling_type: nil).update_all(
"scheduling_type = CASE WHEN (EXISTS (#{needs.to_sql}))
THEN #{scheduling_types[:dag]}
ELSE #{scheduling_types[:stage]}
END"
)
end
validates :type, presence: true
validates :scheduling_type, presence: true, on: :create, if: :validate_scheduling_type?
......@@ -53,6 +77,11 @@ module Ci
raise NotImplementedError
end
# Overriding scheduling_type enum's method for nil `scheduling_type`s
def scheduling_type_dag?
super || find_legacy_scheduling_type == :dag
end
# The scheduling_type column of previous builds/bridges has not been populated,
# so we calculate this value at runtime when we need it.
def find_legacy_scheduling_type
......
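The body of `find_legacy_scheduling_type` is cut off in the hunk above; a minimal sketch of the fallback, assuming it simply derives the type from the presence of `needs` rows (memoization helper assumed available):

```ruby
# Sketch only -- the exact implementation is elided in the diff above.
def find_legacy_scheduling_type
  strong_memoize(:find_legacy_scheduling_type) do
    needs.exists? ? :dag : :stage
  end
end
```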
......@@ -62,18 +62,6 @@ class CommitStatus < ApplicationRecord
preload(project: :namespace)
end
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
where('EXISTS (?)', needs).preload(:needs)
end
scope :without_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
where('NOT EXISTS (?)', needs)
end
scope :match_id_and_lock_version, -> (slice) do
# it expects that items are an array of attributes to match
# each hash needs to have `id` and `lock_version`
......
......@@ -61,7 +61,7 @@ module Ci
Ci::ProcessPipelineService
.new(pipeline)
.execute
.execute(nil, initial_process: true)
end
end
......
......@@ -93,9 +93,9 @@ module Ci
end
def processable_status(processable)
if needs_names = processable.aggregated_needs_names
if Feature.enabled?(:ci_dag_support, project, default_enabled: true) && processable.scheduling_type_dag?
# Processable uses DAG, get status of all dependent needs
@collection.status_for_names(needs_names)
@collection.status_for_names(processable.aggregated_needs_names.to_a)
else
# Processable uses Stages, get status of prior stage
@collection.status_for_prior_stage_position(processable.stage_idx.to_i)
......
......@@ -11,12 +11,13 @@ module Ci
@pipeline = pipeline
end
def execute(trigger_build_ids = nil)
success = process_stages_without_needs
def execute(trigger_build_ids = nil, initial_process: false)
success = process_stages_for_stage_scheduling
# we evaluate dependent needs
# only when another job has finished
success = process_builds_with_needs(trigger_build_ids) || success
success = process_dag_builds_without_needs || success if initial_process
success = process_dag_builds_with_needs(trigger_build_ids) || success
@pipeline.update_legacy_status
......@@ -25,23 +26,31 @@ module Ci
private
def process_stages_without_needs
stage_indexes_of_created_processables_without_needs.flat_map do |index|
process_stage_without_needs(index)
def process_stages_for_stage_scheduling
stage_indexes_of_created_stage_scheduled_processables.flat_map do |index|
process_stage_for_stage_scheduling(index)
end.any?
end
def process_stage_without_needs(index)
def process_stage_for_stage_scheduling(index)
current_status = status_for_prior_stages(index)
return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
created_processables_in_stage_without_needs(index).find_each.select do |build|
created_stage_scheduled_processables_in_stage(index).find_each.select do |build|
process_build(build, current_status)
end.any?
end
def process_builds_with_needs(trigger_build_ids)
def process_dag_builds_without_needs
return false unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
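# Processables with `needs: []` have nothing to wait for, so they are processed
# with a 'success' prior status and can start right away on the initial pass.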
created_processables.scheduling_type_dag.without_needs.each do |build|
process_build(build, 'success')
end
end
def process_dag_builds_with_needs(trigger_build_ids)
return false unless trigger_build_ids.present?
return false unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
......@@ -56,14 +65,15 @@ module Ci
# Each found processable is guaranteed here to have completed status
created_processables
.scheduling_type_dag
.with_needs(trigger_build_names)
.without_needs(incomplete_build_names)
.find_each
.map(&method(:process_build_with_needs))
.map(&method(:process_dag_build_with_needs))
.any?
end
def process_build_with_needs(build)
def process_dag_build_with_needs(build)
current_status = status_for_build_needs(build.needs.map(&:name))
return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
......@@ -87,23 +97,23 @@ module Ci
end
# rubocop: disable CodeReuse/ActiveRecord
def stage_indexes_of_created_processables_without_needs
created_processables_without_needs.order(:stage_idx)
def stage_indexes_of_created_stage_scheduled_processables
created_stage_scheduled_processables.order(:stage_idx)
.pluck(Arel.sql('DISTINCT stage_idx'))
end
# rubocop: enable CodeReuse/ActiveRecord
def created_processables_in_stage_without_needs(index)
created_processables_without_needs
def created_stage_scheduled_processables_in_stage(index)
created_stage_scheduled_processables
.with_preloads
.for_stage(index)
end
def created_processables_without_needs
def created_stage_scheduled_processables
if Feature.enabled?(:ci_dag_support, project, default_enabled: true)
pipeline.processables.created.without_needs
created_processables.scheduling_type_stage
else
pipeline.processables.created
created_processables
end
end
......
......@@ -8,8 +8,9 @@ module Ci
@pipeline = pipeline
end
def execute(trigger_build_ids = nil)
def execute(trigger_build_ids = nil, initial_process: false)
update_retried
ensure_scheduling_type_for_processables
if Feature.enabled?(:ci_atomic_processing, pipeline.project)
Ci::PipelineProcessing::AtomicProcessingService
......@@ -18,7 +19,7 @@ module Ci
else
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
.execute(trigger_build_ids, initial_process: initial_process)
end
end
......@@ -43,5 +44,17 @@ module Ci
.update_all(retried: true) if latest_statuses.any?
end
# rubocop: enable CodeReuse/ActiveRecord
# Set scheduling type of processables if they were created before scheduling_type
# data was deployed (https://gitlab.com/gitlab-org/gitlab/-/merge_requests/22246).
# Given that this service runs multiple times during the pipeline
# life cycle, we need to ensure we populate the data only once.
# See more: https://gitlab.com/gitlab-org/gitlab/issues/205426
def ensure_scheduling_type_for_processables
lease = Gitlab::ExclusiveLease.new("set-scheduling-types:#{pipeline.id}", timeout: 1.hour.to_i)
return unless lease.try_obtain
pipeline.processables.populate_scheduling_type!
end
end
end
......@@ -36,7 +36,7 @@ module Ci
Ci::ProcessPipelineService
.new(pipeline)
.execute(completed_build_ids)
.execute(completed_build_ids, initial_process: true)
end
end
end
---
title: Implement allowing empty needs for jobs in DAG pipelines
merge_request: 22246
author:
type: added
......@@ -2248,6 +2248,7 @@ and bring back the old behavior.
> - [Introduced](https://gitlab.com/gitlab-org/gitlab-foss/issues/47063) in GitLab 12.2.
> - In GitLab 12.3, maximum number of jobs in `needs` array raised from five to 50.
> - [Introduced](https://gitlab.com/gitlab-org/gitlab/issues/30631) in GitLab 12.8, `needs: []` lets jobs start immediately.
The `needs:` keyword enables executing jobs out-of-order, allowing you to implement
a [directed acyclic graph](../directed_acyclic_graph/index.md) in your `.gitlab-ci.yml`.
......@@ -2264,6 +2265,10 @@ linux:build:
mac:build:
stage: build
lint:
stage: test
needs: []
linux:rspec:
stage: test
needs: ["linux:build"]
......@@ -2284,7 +2289,9 @@ production:
stage: deploy
```
This example creates three paths of execution:
This example creates four paths of execution:
- Linter: the `lint` job will run immediately without waiting for the `build` stage to complete because it has no needs (`needs: []`).
- Linux path: the `linux:rspec` and `linux:rubocop` jobs will be run as soon
as the `linux:build` job finishes without waiting for `mac:build` to finish.
......@@ -2308,9 +2315,6 @@ This example creates three paths of execution:
- For self-managed instances, the limit is:
- 10, if the `ci_dag_limit_needs` feature flag is enabled (default).
- 50, if the `ci_dag_limit_needs` feature flag is disabled.
- It is impossible for now to have `needs: []` (empty needs), the job always needs to
depend on something, unless this is the job in the first stage. However, support for
an empty needs array [is planned](https://gitlab.com/gitlab-org/gitlab/issues/30631).
- If `needs:` refers to a job that is marked as `parallel:`,
the current job will depend on all parallel jobs created.
- `needs:` is similar to `dependencies:` in that it needs to use jobs from prior stages,
......
......@@ -32,7 +32,7 @@ module Ci
Ci::ProcessPipelineService
.new(pipeline)
.execute
.execute(nil, initial_process: true)
pipeline_created_counter.increment(source: :webide)
end
......
......@@ -55,4 +55,29 @@ describe Ci::CreatePipelineService do
expect(bridge.scheduling_type).to eq('dag')
end
context 'when needs is empty array' do
let(:config) do
<<~YAML
regular_job:
stage: build
script: echo 'hello'
bridge_dag_job:
stage: test
needs: []
trigger: 'some/project'
YAML
end
it 'creates a pipeline with regular_job and bridge_dag_job pending' do
processables = execute.processables
regular_job = processables.find { |processable| processable.name == 'regular_job' }
bridge_dag_job = processables.find { |processable| processable.name == 'bridge_dag_job' }
expect(execute).to be_persisted
expect(regular_job.status).to eq('pending')
expect(bridge_dag_job.status).to eq('pending')
end
end
end
......@@ -11,12 +11,14 @@ module Gitlab
include ::Gitlab::Config::Entry::Validatable
validations do
validates :config, presence: true
validate do
unless config.is_a?(Hash) || config.is_a?(Array)
errors.add(:config, 'can only be a Hash or an Array')
end
if config.is_a?(Hash) && config.empty?
errors.add(:config, 'can not be an empty Hash')
end
end
validate on: :composed do
......
......@@ -2,7 +2,7 @@
FactoryBot.define do
factory :ci_build_need, class: 'Ci::BuildNeed' do
build factory: :ci_build
build factory: :ci_build, scheduling_type: :dag
sequence(:name) { |n| "build_#{n}" }
end
end
......@@ -762,8 +762,10 @@ describe Ci::Build do
let(:needs) { }
let!(:final) do
scheduling_type = needs.present? ? :dag : :stage
create(:ci_build,
pipeline: pipeline, name: 'final',
pipeline: pipeline, name: 'final', scheduling_type: scheduling_type,
stage_idx: 3, stage: 'deploy', options: {
dependencies: dependencies
}
......
......@@ -120,4 +120,29 @@ describe Ci::Processable do
end
end
end
describe '.populate_scheduling_type!' do
let!(:build_without_needs) { create(:ci_build, project: project, pipeline: pipeline) }
let!(:build_with_needs) { create(:ci_build, project: project, pipeline: pipeline) }
let!(:needs_relation) { create(:ci_build_need, build: build_with_needs) }
let!(:another_build) { create(:ci_build, project: project) }
before do
Ci::Processable.update_all(scheduling_type: nil)
end
it 'populates scheduling_type of processables' do
expect do
pipeline.processables.populate_scheduling_type!
end.to change(pipeline.processables.where(scheduling_type: nil), :count).from(2).to(0)
expect(build_without_needs.reload.scheduling_type).to eq('stage')
expect(build_with_needs.reload.scheduling_type).to eq('dag')
end
it 'does not affect processables from other pipelines' do
pipeline.processables.populate_scheduling_type!
expect(another_build.reload.scheduling_type).to be_nil
end
end
end
......@@ -175,5 +175,67 @@ describe Ci::CreatePipelineService do
.to eq('jobs:test_a:needs:need artifacts should be a boolean value')
end
end
context 'when needs is empty array' do
let(:config) do
<<~YAML
build_a:
stage: build
script: ls
test_a:
stage: test
script: ls
test_b:
stage: test
script: ls
needs: []
deploy_a:
stage: deploy
script: ls
needs: [test_a]
deploy_b:
stage: deploy
script: ls
when: manual
needs: []
YAML
end
it 'creates a pipeline with build_a and test_b pending; deploy_b manual' do
processables = pipeline.processables
build_a = processables.find { |processable| processable.name == 'build_a' }
test_a = processables.find { |processable| processable.name == 'test_a' }
test_b = processables.find { |processable| processable.name == 'test_b' }
deploy_a = processables.find { |processable| processable.name == 'deploy_a' }
deploy_b = processables.find { |processable| processable.name == 'deploy_b' }
expect(pipeline).to be_persisted
expect(build_a.status).to eq('pending')
expect(test_a.status).to eq('created')
expect(test_b.status).to eq('pending')
expect(deploy_a.status).to eq('created')
expect(deploy_b.status).to eq('manual')
end
end
context 'when needs is empty hash' do
let(:config) do
<<~YAML
regular_job:
stage: build
script: echo 'hello'
invalid_dag_job:
stage: test
script: ls
needs: {}
YAML
end
it 'raises error' do
expect(pipeline.yaml_errors)
.to eq('jobs:invalid_dag_job:needs config can not be an empty hash')
end
end
end
end
......@@ -9,4 +9,10 @@ describe Ci::PipelineProcessing::AtomicProcessingService do
end
it_behaves_like 'Pipeline Processing Service'
private
def process_pipeline(initial_process: false)
described_class.new(pipeline).execute
end
end
......@@ -9,4 +9,10 @@ describe Ci::PipelineProcessing::LegacyProcessingService do
end
it_behaves_like 'Pipeline Processing Service'
private
def process_pipeline(initial_process: false)
described_class.new(pipeline).execute(initial_process: initial_process)
end
end
......@@ -710,10 +710,10 @@ shared_examples 'Pipeline Processing Service' do
context 'when pipeline with needs is created', :sidekiq_inline do
let!(:linux_build) { create_build('linux:build', stage: 'build', stage_idx: 0) }
let!(:mac_build) { create_build('mac:build', stage: 'build', stage_idx: 0) }
let!(:linux_rspec) { create_build('linux:rspec', stage: 'test', stage_idx: 1) }
let!(:linux_rubocop) { create_build('linux:rubocop', stage: 'test', stage_idx: 1) }
let!(:mac_rspec) { create_build('mac:rspec', stage: 'test', stage_idx: 1) }
let!(:mac_rubocop) { create_build('mac:rubocop', stage: 'test', stage_idx: 1) }
let!(:linux_rspec) { create_build('linux:rspec', stage: 'test', stage_idx: 1, scheduling_type: :dag) }
let!(:linux_rubocop) { create_build('linux:rubocop', stage: 'test', stage_idx: 1, scheduling_type: :dag) }
let!(:mac_rspec) { create_build('mac:rspec', stage: 'test', stage_idx: 1, scheduling_type: :dag) }
let!(:mac_rubocop) { create_build('mac:rubocop', stage: 'test', stage_idx: 1, scheduling_type: :dag) }
let!(:deploy) { create_build('deploy', stage: 'deploy', stage_idx: 2) }
let!(:linux_rspec_on_build) { create(:ci_build_need, build: linux_rspec, name: 'linux:build') }
......@@ -795,7 +795,7 @@ shared_examples 'Pipeline Processing Service' do
end
context 'when one of the jobs is run on a failure' do
let!(:linux_notify) { create_build('linux:notify', stage: 'deploy', stage_idx: 2, when: 'on_failure') }
let!(:linux_notify) { create_build('linux:notify', stage: 'deploy', stage_idx: 2, when: 'on_failure', scheduling_type: :dag) }
let!(:linux_notify_on_build) { create(:ci_build_need, build: linux_notify, name: 'linux:build') }
......@@ -837,10 +837,47 @@ shared_examples 'Pipeline Processing Service' do
end
end
end
end
def process_pipeline
described_class.new(pipeline).execute
context 'when there is a job scheduled with dag but no need (needs: [])' do
let!(:deploy_pages) { create_build('deploy_pages', stage: 'deploy', stage_idx: 2, scheduling_type: :dag) }
it 'runs deploy_pages without waiting for prior stages' do
# Ci::PipelineProcessing::LegacyProcessingService requires :initial_process parameter
expect(process_pipeline(initial_process: true)).to be_truthy
expect(stages).to eq(%w(pending created pending))
expect(builds.pending).to contain_exactly(linux_build, mac_build, deploy_pages)
linux_build.reset.success!
deploy_pages.reset.success!
expect(stages).to eq(%w(running pending running))
expect(builds.success).to contain_exactly(linux_build, deploy_pages)
expect(builds.pending).to contain_exactly(mac_build, linux_rspec, linux_rubocop)
linux_rspec.reset.success!
linux_rubocop.reset.success!
mac_build.reset.success!
mac_rspec.reset.success!
mac_rubocop.reset.success!
expect(stages).to eq(%w(success success running))
expect(builds.pending).to contain_exactly(deploy)
end
context 'when ci_dag_support is disabled' do
before do
stub_feature_flags(ci_dag_support: false)
end
it 'does not run deploy_pages at the start' do
expect(process_pipeline).to be_truthy
expect(stages).to eq(%w(pending created created))
expect(builds.pending).to contain_exactly(linux_build, mac_build)
end
end
end
end
def all_builds
......
......@@ -33,6 +33,25 @@ describe Ci::ProcessPipelineService do
end
end
context 'with a pipeline which has processables with nil scheduling_type', :clean_gitlab_redis_shared_state do
let!(:build1) { create_build('build1') }
let!(:build2) { create_build('build2') }
let!(:build3) { create_build('build3', scheduling_type: :dag) }
let!(:build3_on_build2) { create(:ci_build_need, build: build3, name: 'build2') }
before do
pipeline.processables.update_all(scheduling_type: nil)
end
it 'populates scheduling_type before processing' do
process_pipeline
expect(build1.scheduling_type).to eq('stage')
expect(build2.scheduling_type).to eq('stage')
expect(build3.scheduling_type).to eq('dag')
end
end
def process_pipeline
described_class.new(pipeline).execute
end
......
......@@ -95,7 +95,7 @@ describe Ci::RetryPipelineService, '#execute' do
before do
create_build('build', :success, 0)
create_build('build2', :success, 0)
test_build = create_build('test', :failed, 1)
test_build = create_build('test', :failed, 1, scheduling_type: :dag)
create(:ci_build_need, build: test_build, name: 'build')
create(:ci_build_need, build: test_build, name: 'build2')
end
......@@ -108,6 +108,21 @@ describe Ci::RetryPipelineService, '#execute' do
expect(build('test')).to be_pending
expect(build('test').needs.map(&:name)).to match_array(%w(build build2))
end
context 'when there is a failed DAG test without needs' do
before do
create_build('deploy', :failed, 2, scheduling_type: :dag)
end
it 'retries the test' do
service.execute(pipeline)
expect(build('build')).to be_success
expect(build('build2')).to be_success
expect(build('test')).to be_pending
expect(build('deploy')).to be_pending
end
end
end
context 'when the last stage was skipped' do
......