Commit 71c4c43f authored by Furkan Ayhan's avatar Furkan Ayhan Committed by Grzegorz Bizon

Remove legacy pipeline processing service and FF ci_atomic_processing

This is the first iteration of removing legacy pipeline processing.
With this, we will only have atomic processing for pipelines.
parent 89e6e12f
...@@ -418,22 +418,6 @@ module Ci ...@@ -418,22 +418,6 @@ module Ci
false false
end end
def ordered_stages
if ::Gitlab::Ci::Features.atomic_processing?(project)
# The `Ci::Stage` contains all up-to date data
# as atomic processing updates all data in-bulk
stages
elsif complete?
# The `Ci::Stage` contains up-to date data only for `completed` pipelines
# this is due to asynchronous processing of pipeline, and stages possibly
# not updated inline with processing of pipeline
stages
else
# In other cases, we need to calculate stages dynamically
legacy_stages
end
end
def legacy_stages_using_sql def legacy_stages_using_sql
# TODO, this needs refactoring, see gitlab-foss#26481. # TODO, this needs refactoring, see gitlab-foss#26481.
stages_query = statuses stages_query = statuses
...@@ -470,6 +454,7 @@ module Ci ...@@ -470,6 +454,7 @@ module Ci
triggered_pipelines.preload(:source_job) triggered_pipelines.preload(:source_job)
end end
# TODO: Remove usage of this method in templates
def legacy_stages def legacy_stages
if ::Gitlab::Ci::Features.composite_status?(project) if ::Gitlab::Ci::Features.composite_status?(project)
legacy_stages_using_composite_status legacy_stages_using_composite_status
...@@ -1048,10 +1033,6 @@ module Ci ...@@ -1048,10 +1033,6 @@ module Ci
@persistent_ref ||= PersistentRef.new(pipeline: self) @persistent_ref ||= PersistentRef.new(pipeline: self)
end end
def find_successful_build_ids_by_names(names)
statuses.latest.success.where(name: names).pluck(:id)
end
def cacheable? def cacheable?
Ci::PipelineEnums.ci_config_sources.key?(config_source.to_sym) Ci::PipelineEnums.ci_config_sources.key?(config_source.to_sym)
end end
......
...@@ -100,9 +100,7 @@ class CommitStatus < ApplicationRecord ...@@ -100,9 +100,7 @@ class CommitStatus < ApplicationRecord
# will not be refreshed to pick the change # will not be refreshed to pick the change
self.processed_will_change! self.processed_will_change!
if !::Gitlab::Ci::Features.atomic_processing?(project) if latest?
self.processed = nil
elsif latest?
self.processed = false # force refresh of all dependent ones self.processed = false # force refresh of all dependent ones
elsif retried? elsif retried?
self.processed = true # retried are considered to be already processed self.processed = true # retried are considered to be already processed
...@@ -164,8 +162,7 @@ class CommitStatus < ApplicationRecord ...@@ -164,8 +162,7 @@ class CommitStatus < ApplicationRecord
next unless commit_status.project next unless commit_status.project
commit_status.run_after_commit do commit_status.run_after_commit do
schedule_stage_and_pipeline_update PipelineProcessWorker.perform_async(pipeline_id)
ExpireJobCacheWorker.perform_async(id) ExpireJobCacheWorker.perform_async(id)
end end
end end
...@@ -186,14 +183,6 @@ class CommitStatus < ApplicationRecord ...@@ -186,14 +183,6 @@ class CommitStatus < ApplicationRecord
select(:name) select(:name)
end end
def self.status_for_prior_stages(index, project:)
before_stage(index).latest.slow_composite_status(project: project) || 'success'
end
def self.status_for_names(names, project:)
where(name: names).latest.slow_composite_status(project: project) || 'success'
end
def self.update_as_processed! def self.update_as_processed!
# Marks items as processed # Marks items as processed
# we do not increase `lock_version`, as we are the one # we do not increase `lock_version`, as we are the one
...@@ -286,21 +275,6 @@ class CommitStatus < ApplicationRecord ...@@ -286,21 +275,6 @@ class CommitStatus < ApplicationRecord
def unrecoverable_failure? def unrecoverable_failure?
script_failure? || missing_dependency_failure? || archived_failure? || scheduler_failure? || data_integrity_failure? script_failure? || missing_dependency_failure? || archived_failure? || scheduler_failure? || data_integrity_failure?
end end
def schedule_stage_and_pipeline_update
if ::Gitlab::Ci::Features.atomic_processing?(project)
# Atomic Processing requires only single Worker
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
StageUpdateWorker.perform_async(stage_id)
end
end
end end
CommitStatus.prepend_if_ee('::EE::CommitStatus') CommitStatus.prepend_if_ee('::EE::CommitStatus')
...@@ -25,7 +25,7 @@ class MergeRequests::PipelineEntity < Grape::Entity ...@@ -25,7 +25,7 @@ class MergeRequests::PipelineEntity < Grape::Entity
pipeline.detailed_status(request.current_user) pipeline.detailed_status(request.current_user)
end end
expose :ordered_stages, as: :stages, using: StageEntity expose :stages, using: StageEntity
end end
# Coverage isn't always necessary (e.g. when displaying project pipelines in # Coverage isn't always necessary (e.g. when displaying project pipelines in
......
...@@ -36,7 +36,7 @@ class PipelineEntity < Grape::Entity ...@@ -36,7 +36,7 @@ class PipelineEntity < Grape::Entity
expose :details do expose :details do
expose :detailed_status, as: :status, with: DetailedStatusEntity expose :detailed_status, as: :status, with: DetailedStatusEntity
expose :ordered_stages, as: :stages, using: StageEntity expose :stages, using: StageEntity
expose :duration expose :duration
expose :finished_at expose :finished_at
expose :name expose :name
......
...@@ -24,8 +24,8 @@ class TriggeredPipelineEntity < Grape::Entity ...@@ -24,8 +24,8 @@ class TriggeredPipelineEntity < Grape::Entity
expose :details do expose :details do
expose :detailed_status, as: :status, with: DetailedStatusEntity expose :detailed_status, as: :status, with: DetailedStatusEntity
expose :ordered_stages, expose :stages,
as: :stages, using: StageEntity, using: StageEntity,
if: -> (_, opts) { can_read_details? && expand?(opts) } if: -> (_, opts) { can_read_details? && expand?(opts) }
end end
......
...@@ -32,7 +32,7 @@ module Ci ...@@ -32,7 +32,7 @@ module Ci
Ci::ProcessPipelineService Ci::ProcessPipelineService
.new(pipeline) .new(pipeline)
.execute(nil, initial_process: true) .execute
pipeline_created_counter.increment(source: :webide) pipeline_created_counter.increment(source: :webide)
end end
......
# frozen_string_literal: true
module Ci
module PipelineProcessing
class LegacyProcessingService
include Gitlab::Utils::StrongMemoize
attr_reader :pipeline
def initialize(pipeline)
@pipeline = pipeline
end
def execute(trigger_build_ids = nil, initial_process: false)
success = process_stages_for_stage_scheduling
# we evaluate dependent needs,
# only when the another job has finished
success = process_dag_builds_without_needs || success if initial_process
success = process_dag_builds_with_needs(trigger_build_ids) || success
@pipeline.update_legacy_status
success
end
private
def process_stages_for_stage_scheduling
stage_indexes_of_created_stage_scheduled_processables.flat_map do |index|
process_stage_for_stage_scheduling(index)
end.any?
end
def process_stage_for_stage_scheduling(index)
current_status = status_for_prior_stages(index)
return unless Ci::HasStatus::COMPLETED_STATUSES.include?(current_status)
created_stage_scheduled_processables_in_stage(index).find_each.select do |build|
process_build(build, current_status)
end.any?
end
def process_dag_builds_without_needs
created_processables.scheduling_type_dag.without_needs.each do |build|
process_build(build, 'success')
end
end
def process_dag_builds_with_needs(trigger_build_ids)
return false unless trigger_build_ids.present?
# we find processables that are dependent:
# 1. because of current dependency,
trigger_build_names = pipeline.processables.latest
.for_ids(trigger_build_ids).names
# 2. does not have builds that not yet complete
incomplete_build_names = pipeline.processables.latest
.incomplete.names
# Each found processable is guaranteed here to have completed status
created_processables
.scheduling_type_dag
.with_needs(trigger_build_names)
.without_needs(incomplete_build_names)
.find_each
.map(&method(:process_dag_build_with_needs))
.any?
end
def process_dag_build_with_needs(build)
current_status = status_for_build_needs(build.needs.map(&:name))
return unless Ci::HasStatus::COMPLETED_STATUSES.include?(current_status)
process_build(build, current_status)
end
def process_build(build, current_status)
Gitlab::OptimisticLocking.retry_lock(build) do |subject|
Ci::ProcessBuildService.new(project, subject.user)
.execute(subject, current_status)
end
end
def status_for_prior_stages(index)
pipeline.processables.status_for_prior_stages(index, project: pipeline.project)
end
def status_for_build_needs(needs)
pipeline.processables.status_for_names(needs, project: pipeline.project)
end
# rubocop: disable CodeReuse/ActiveRecord
def stage_indexes_of_created_stage_scheduled_processables
created_stage_scheduled_processables.order(:stage_idx)
.pluck(Arel.sql('DISTINCT stage_idx'))
end
# rubocop: enable CodeReuse/ActiveRecord
def created_stage_scheduled_processables_in_stage(index)
created_stage_scheduled_processables
.with_preloads
.for_stage(index)
end
def created_stage_scheduled_processables
created_processables.scheduling_type_stage
end
def created_processables
pipeline.processables.created
end
def project
pipeline.project
end
end
end
end
...@@ -8,20 +8,14 @@ module Ci ...@@ -8,20 +8,14 @@ module Ci
@pipeline = pipeline @pipeline = pipeline
end end
def execute(trigger_build_ids = nil, initial_process: false) def execute
increment_processing_counter increment_processing_counter
update_retried update_retried
if ::Gitlab::Ci::Features.atomic_processing?(pipeline.project)
Ci::PipelineProcessing::AtomicProcessingService Ci::PipelineProcessing::AtomicProcessingService
.new(pipeline) .new(pipeline)
.execute .execute
else
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids, initial_process: initial_process)
end
end end
def metrics def metrics
......
...@@ -22,12 +22,6 @@ module Ci ...@@ -22,12 +22,6 @@ module Ci
needs += build.needs.map(&:name) needs += build.needs.map(&:name)
end end
# In a DAG, the dependencies may have already completed. Figure out
# which builds have succeeded and use them to update the pipeline. If we don't
# do this, then builds will be stuck in the created state since their dependencies
# will never run.
completed_build_ids = pipeline.find_successful_build_ids_by_names(needs) if needs.any?
pipeline.builds.latest.skipped.find_each do |skipped| pipeline.builds.latest.skipped.find_each do |skipped|
retry_optimistic_lock(skipped) { |build| build.process } retry_optimistic_lock(skipped) { |build| build.process }
end end
...@@ -38,7 +32,7 @@ module Ci ...@@ -38,7 +32,7 @@ module Ci
Ci::ProcessPipelineService Ci::ProcessPipelineService
.new(pipeline) .new(pipeline)
.execute(completed_build_ids, initial_process: true) .execute
end end
end end
end end
...@@ -10,11 +10,13 @@ class PipelineProcessWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -10,11 +10,13 @@ class PipelineProcessWorker # rubocop:disable Scalability/IdempotentWorker
loggable_arguments 1 loggable_arguments 1
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id, build_ids = nil) # `_build_ids` is deprecated and will be removed in 14.0
# See: https://gitlab.com/gitlab-org/gitlab/-/issues/232806
def perform(pipeline_id, _build_ids = nil)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
Ci::ProcessPipelineService Ci::ProcessPipelineService
.new(pipeline) .new(pipeline)
.execute(build_ids) .execute
end end
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
......
# frozen_string_literal: true # frozen_string_literal: true
# This worker is deprecated and will be removed in 14.0
# See: https://gitlab.com/gitlab-org/gitlab/-/issues/232806
class PipelineUpdateWorker class PipelineUpdateWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
...@@ -9,7 +11,7 @@ class PipelineUpdateWorker ...@@ -9,7 +11,7 @@ class PipelineUpdateWorker
idempotent! idempotent!
def perform(pipeline_id) def perform(_pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id)&.update_legacy_status # no-op
end end
end end
---
title: Remove legacy pipeline processing service and FF ci_atomic_processing
merge_request: 37339
author:
type: other
...@@ -19,16 +19,7 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do ...@@ -19,16 +19,7 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do
end end
describe 'cross-project pipelines' do describe 'cross-project pipelines' do
using RSpec::Parameterized::TableSyntax
where(:ci_atomic_processing) do
[true, false]
end
with_them do
before do before do
stub_feature_flags(ci_atomic_processing: ci_atomic_processing)
create_processable(:build, name: 'test', stage: 'test') create_processable(:build, name: 'test', stage: 'test')
create_processable(:bridge, :variables, name: 'cross', create_processable(:bridge, :variables, name: 'cross',
stage: 'build', stage: 'build',
...@@ -56,7 +47,6 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do ...@@ -56,7 +47,6 @@ RSpec.describe Ci::ProcessPipelineService, '#execute' do
.to include(key: 'BRIDGE', value: 'cross', public: false, masked: false) .to include(key: 'BRIDGE', value: 'cross', public: false, masked: false)
end end
end end
end
def expect_statuses(*expected) def expect_statuses(*expected)
statuses = pipeline.statuses statuses = pipeline.statuses
......
...@@ -22,10 +22,6 @@ module Gitlab ...@@ -22,10 +22,6 @@ module Gitlab
::Feature.enabled?(:ci_composite_status, project, default_enabled: true) ::Feature.enabled?(:ci_composite_status, project, default_enabled: true)
end end
def self.atomic_processing?(project)
::Feature.enabled?(:ci_atomic_processing, project, default_enabled: true)
end
def self.pipeline_latest? def self.pipeline_latest?
::Feature.enabled?(:ci_pipeline_latest, default_enabled: true) ::Feature.enabled?(:ci_pipeline_latest, default_enabled: true)
end end
......
...@@ -10,7 +10,7 @@ module Gitlab ...@@ -10,7 +10,7 @@ module Gitlab
def perform! def perform!
::Ci::ProcessPipelineService ::Ci::ProcessPipelineService
.new(@pipeline) .new(@pipeline)
.execute(nil, initial_process: true) .execute
end end
def break? def break?
......
...@@ -1044,19 +1044,6 @@ RSpec.describe Ci::Pipeline, :mailer do ...@@ -1044,19 +1044,6 @@ RSpec.describe Ci::Pipeline, :mailer do
end end
describe '#stages' do describe '#stages' do
before do
create(:ci_stage_entity, project: project,
pipeline: pipeline,
name: 'build')
end
it 'returns persisted stages' do
expect(pipeline.stages).not_to be_empty
expect(pipeline.stages).to all(be_persisted)
end
end
describe '#ordered_stages' do
before do before do
create(:ci_stage_entity, project: project, create(:ci_stage_entity, project: project,
pipeline: pipeline, pipeline: pipeline,
...@@ -1086,14 +1073,7 @@ RSpec.describe Ci::Pipeline, :mailer do ...@@ -1086,14 +1073,7 @@ RSpec.describe Ci::Pipeline, :mailer do
name: 'cleanup') name: 'cleanup')
end end
subject { pipeline.ordered_stages } subject { pipeline.stages }
context 'when using atomic processing' do
before do
stub_feature_flags(
ci_atomic_processing: true
)
end
context 'when pipelines is not complete' do context 'when pipelines is not complete' do
it 'returns stages in valid order' do it 'returns stages in valid order' do
...@@ -1115,34 +1095,6 @@ RSpec.describe Ci::Pipeline, :mailer do ...@@ -1115,34 +1095,6 @@ RSpec.describe Ci::Pipeline, :mailer do
end end
end end
end end
context 'when using persisted stages' do
before do
stub_feature_flags(
ci_atomic_processing: false
)
end
context 'when pipelines is not complete' do
it 'still returns legacy stages' do
expect(subject).to all(be_a Ci::LegacyStage)
expect(subject.map(&:name)).to eq %w[build test]
end
end
context 'when pipeline is complete' do
before do
pipeline.succeed!
end
it 'returns stages in valid order' do
expect(subject).to all(be_a Ci::Stage)
expect(subject.map(&:name))
.to eq %w[sanity build test deploy cleanup]
end
end
end
end
end end
describe 'state machine' do describe 'state machine' do
......
...@@ -66,21 +66,6 @@ RSpec.describe CommitStatus do ...@@ -66,21 +66,6 @@ RSpec.describe CommitStatus do
describe '#processed' do describe '#processed' do
subject { commit_status.processed } subject { commit_status.processed }
context 'when ci_atomic_processing is disabled' do
before do
stub_feature_flags(ci_atomic_processing: false)
commit_status.save!
end
it { is_expected.to be_nil }
end
context 'when ci_atomic_processing is enabled' do
before do
stub_feature_flags(ci_atomic_processing: true)
end
context 'status is latest' do context 'status is latest' do
before do before do
commit_status.update!(retried: false, status: :pending) commit_status.update!(retried: false, status: :pending)
...@@ -112,7 +97,6 @@ RSpec.describe CommitStatus do ...@@ -112,7 +97,6 @@ RSpec.describe CommitStatus do
expect(CommitStatus.find(commit_status.id).processed).to eq(false) expect(CommitStatus.find(commit_status.id).processed).to eq(false)
end end
end end
end
describe '#started?' do describe '#started?' do
subject { commit_status.started? } subject { commit_status.started? }
......
...@@ -5,20 +5,12 @@ require_relative 'shared_processing_service.rb' ...@@ -5,20 +5,12 @@ require_relative 'shared_processing_service.rb'
require_relative 'shared_processing_service_tests_with_yaml.rb' require_relative 'shared_processing_service_tests_with_yaml.rb'
RSpec.describe Ci::PipelineProcessing::AtomicProcessingService do RSpec.describe Ci::PipelineProcessing::AtomicProcessingService do
before do
stub_feature_flags(ci_atomic_processing: true)
# This feature flag is implicit
# Atomic Processing does not process statuses differently
stub_feature_flags(ci_composite_status: true)
end
it_behaves_like 'Pipeline Processing Service' it_behaves_like 'Pipeline Processing Service'
it_behaves_like 'Pipeline Processing Service Tests With Yaml' it_behaves_like 'Pipeline Processing Service Tests With Yaml'
private private
def process_pipeline(initial_process: false) def process_pipeline
described_class.new(pipeline).execute described_class.new(pipeline).execute
end end
end end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'shared_processing_service.rb'
require_relative 'shared_processing_service_tests_with_yaml.rb'
RSpec.describe Ci::PipelineProcessing::LegacyProcessingService do
before do
stub_feature_flags(ci_atomic_processing: false)
end
context 'when ci_composite_status is enabled' do
before do
stub_feature_flags(ci_composite_status: true)
end
it_behaves_like 'Pipeline Processing Service'
it_behaves_like 'Pipeline Processing Service Tests With Yaml'
end
context 'when ci_composite_status is disabled' do
before do
stub_feature_flags(ci_composite_status: false)
end
it_behaves_like 'Pipeline Processing Service'
it_behaves_like 'Pipeline Processing Service Tests With Yaml'
end
private
def process_pipeline(initial_process: false)
described_class.new(pipeline).execute(initial_process: initial_process)
end
end
...@@ -788,8 +788,7 @@ RSpec.shared_examples 'Pipeline Processing Service' do ...@@ -788,8 +788,7 @@ RSpec.shared_examples 'Pipeline Processing Service' do
let!(:deploy_pages) { create_build('deploy_pages', stage: 'deploy', stage_idx: 2, scheduling_type: :dag) } let!(:deploy_pages) { create_build('deploy_pages', stage: 'deploy', stage_idx: 2, scheduling_type: :dag) }
it 'runs deploy_pages without waiting prior stages' do it 'runs deploy_pages without waiting prior stages' do
# Ci::PipelineProcessing::LegacyProcessingService requires :initial_process parameter expect(process_pipeline).to be_truthy
expect(process_pipeline(initial_process: true)).to be_truthy
expect(stages).to eq(%w(pending created pending)) expect(stages).to eq(%w(pending created pending))
expect(builds.pending).to contain_exactly(linux_build, mac_build, deploy_pages) expect(builds.pending).to contain_exactly(linux_build, mac_build, deploy_pages)
......
...@@ -42,7 +42,7 @@ RSpec.shared_context 'Pipeline Processing Service Tests With Yaml' do ...@@ -42,7 +42,7 @@ RSpec.shared_context 'Pipeline Processing Service Tests With Yaml' do
{ {
pipeline: pipeline.status, pipeline: pipeline.status,
stages: pipeline.ordered_stages.pluck(:name, :status).to_h, stages: pipeline.stages.pluck(:name, :status).to_h,
jobs: pipeline.statuses.latest.pluck(:name, :status).to_h jobs: pipeline.statuses.latest.pluck(:name, :status).to_h
} }
end end
......
...@@ -162,15 +162,6 @@ RSpec.describe Projects::UpdatePagesService do ...@@ -162,15 +162,6 @@ RSpec.describe Projects::UpdatePagesService do
end end
context 'with background jobs running', :sidekiq_inline do context 'with background jobs running', :sidekiq_inline do
where(:ci_atomic_processing) do
[true, false]
end
with_them do
before do
stub_feature_flags(ci_atomic_processing: ci_atomic_processing)
end
it 'succeeds' do it 'succeeds' do
expect(project.pages_deployed?).to be_falsey expect(project.pages_deployed?).to be_falsey
expect(execute).to eq(:success) expect(execute).to eq(:success)
...@@ -178,7 +169,6 @@ RSpec.describe Projects::UpdatePagesService do ...@@ -178,7 +169,6 @@ RSpec.describe Projects::UpdatePagesService do
end end
end end
end end
end
it 'fails to remove project pages when no pages is deployed' do it 'fails to remove project pages when no pages is deployed' do
expect(PagesWorker).not_to receive(:perform_in) expect(PagesWorker).not_to receive(:perform_in)
......
...@@ -12,17 +12,6 @@ RSpec.describe PipelineProcessWorker do ...@@ -12,17 +12,6 @@ RSpec.describe PipelineProcessWorker do
described_class.new.perform(pipeline.id) described_class.new.perform(pipeline.id)
end end
context 'when build_ids are passed' do
let(:build) { create(:ci_build, pipeline: pipeline, name: 'my-build') }
it 'processes pipeline with a list of builds' do
expect_any_instance_of(Ci::ProcessPipelineService).to receive(:execute)
.with([build.id])
described_class.new.perform(pipeline.id, [build.id])
end
end
end end
context 'when pipeline does not exist' do context 'when pipeline does not exist' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe PipelineUpdateWorker do
describe '#perform' do
context 'when pipeline exists' do
let(:pipeline) { create(:ci_pipeline) }
it 'updates pipeline status' do
expect_any_instance_of(Ci::Pipeline).to receive(:set_status).with('skipped')
described_class.new.perform(pipeline.id)
end
include_examples 'an idempotent worker' do
let(:job_args) { [pipeline.id] }
it 'sets pipeline status to skipped' do
expect { subject }.to change { pipeline.reload.status }.from('pending').to('skipped')
end
end
end
context 'when pipeline does not exist' do
it 'does not raise exception' do
expect { described_class.new.perform(123) }
.not_to raise_error
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment