Commit 8413cff4 authored by Grzegorz Bizon

Merge branch 'improve-pipeline-processing' into 'master'

Implement AtomicProcessing service

See merge request gitlab-org/gitlab!20229
parents 8d416b7b 38e549b6
......@@ -447,10 +447,6 @@ module Ci
options_retry_when.include?('always')
end
def latest?
!retried?
end
def any_unmet_prerequisites?
prerequisites.present?
end
......
......@@ -515,7 +515,9 @@ module Ci
# rubocop: enable CodeReuse/ServiceClass
def mark_as_processable_after_stage(stage_idx)
builds.skipped.after_stage(stage_idx).find_each(&:process)
builds.skipped.after_stage(stage_idx).find_each do |build|
Gitlab::OptimisticLocking.retry_lock(build, &:process)
end
end
def latest?
......@@ -554,6 +556,13 @@ module Ci
end
end
def needs_processing?
statuses
.where(processed: [false, nil])
.latest
.exists?
end
# TODO: this logic is duplicated in Pipeline::Chain::Config::Content
# we should persist this in `ci_pipelines.config_path`
def config_path
......@@ -583,9 +592,8 @@ module Ci
project.notes.for_commit_id(sha)
end
def update_status
def set_status(new_status)
retry_optimistic_lock(self) do
new_status = latest_builds_status.to_s
case new_status
when 'created' then nil
when 'waiting_for_resource' then request_resource
......@@ -605,6 +613,10 @@ module Ci
end
end
def update_legacy_status
set_status(latest_builds_status.to_s)
end
def protected_ref?
strong_memoize(:protected_ref) { project.protected_for?(git_ref) }
end
......
......@@ -8,8 +8,26 @@ module Ci
scope :preload_needs, -> { preload(:needs) }
def self.select_with_aggregated_needs(project)
return all unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
aggregated_needs_names = Ci::BuildNeed
.scoped_build
.select("ARRAY_AGG(name)")
.to_sql
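# Embedding this subquery into the outer SELECT gives each processable row
# an aggregated array of its needs' names (assumption: Ci::BuildNeed.scoped_build
# correlates the subquery with the outer build's id)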
all.select(
'*',
"(#{aggregated_needs_names}) as aggregated_needs_names"
)
end
validates :type, presence: true
def aggregated_needs_names
read_attribute(:aggregated_needs_names)
end
def schedulable?
raise NotImplementedError
end
......
......@@ -13,9 +13,12 @@ module Ci
belongs_to :pipeline
has_many :statuses, class_name: 'CommitStatus', foreign_key: :stage_id
has_many :processables, class_name: 'Ci::Processable', foreign_key: :stage_id
has_many :builds, foreign_key: :stage_id
has_many :bridges, foreign_key: :stage_id
scope :ordered, -> { order(position: :asc) }
with_options unless: :importing? do
validates :project, presence: true
validates :pipeline, presence: true
......@@ -80,9 +83,8 @@ module Ci
end
end
def update_status
def set_status(new_status)
retry_optimistic_lock(self) do
new_status = latest_stage_status.to_s
case new_status
when 'created' then nil
when 'waiting_for_resource' then request_resource
......@@ -102,6 +104,10 @@ module Ci
end
end
def update_legacy_status
set_status(latest_stage_status.to_s)
end
def groups
@groups ||= Ci::Group.fabricate(self)
end
......
......@@ -40,6 +40,7 @@ class CommitStatus < ApplicationRecord
scope :latest, -> { where(retried: [false, nil]) }
scope :retried, -> { where(retried: true) }
scope :ordered, -> { order(:name) }
scope :ordered_by_stage, -> { order(stage_idx: :asc) }
scope :latest_ordered, -> { latest.ordered.includes(project: :namespace) }
scope :retried_ordered, -> { retried.ordered.includes(project: :namespace) }
scope :before_stage, -> (index) { where('stage_idx < ?', index) }
......@@ -57,6 +58,10 @@ class CommitStatus < ApplicationRecord
preload(:project, :user)
end
scope :with_project_preload, -> do
preload(project: :namespace)
end
scope :with_needs, -> (names = nil) do
needs = Ci::BuildNeed.scoped_build.select(1)
needs = needs.where(name: names) if names
......@@ -69,6 +74,15 @@ class CommitStatus < ApplicationRecord
where('NOT EXISTS (?)', needs)
end
scope :match_id_and_lock_version, -> (slice) do
# It expects the given slice to be an array of attribute hashes to match;
# each hash needs to have `id` and `lock_version`
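# Example of the expected input shape (hypothetical values):
#   match_id_and_lock_version([{ id: 1, lock_version: 2 }, { id: 3, lock_version: 0 }])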
slice.inject(self) do |relation, item|
match = CommitStatus.where(item.slice(:id, :lock_version))
relation.or(match)
end
end
# We use `CommitStatusEnums.failure_reasons` here so that EE can more easily
# extend this `Hash` with new values.
enum_with_nil failure_reason: ::CommitStatusEnums.failure_reasons
......@@ -86,6 +100,16 @@ class CommitStatus < ApplicationRecord
# rubocop: enable CodeReuse/ServiceClass
end
before_save if: :status_changed?, unless: :importing? do
if Feature.disabled?(:ci_atomic_processing, project)
self.processed = nil
elsif latest?
self.processed = false # force refresh of all dependent ones
elsif retried?
self.processed = true # retried builds are considered to be already processed
end
end
state_machine :status do
event :process do
transition [:skipped, :manual] => :created
......@@ -136,19 +160,13 @@ class CommitStatus < ApplicationRecord
end
after_transition do |commit_status, transition|
next unless commit_status.project
next if transition.loopback?
next if commit_status.processed?
next unless commit_status.project
commit_status.run_after_commit do
if pipeline_id
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
end
StageUpdateWorker.perform_async(stage_id)
schedule_stage_and_pipeline_update
ExpireJobCacheWorker.perform_async(id)
end
end
......@@ -177,6 +195,11 @@ class CommitStatus < ApplicationRecord
where(name: names).latest.slow_composite_status || 'success'
end
def self.update_as_processed!
# Marks items as processed and increments `lock_version` (optimistic locking)
update_all('processed=TRUE, lock_version=COALESCE(lock_version,0)+1')
end
def locking_enabled?
will_save_change_to_status?
end
......@@ -193,6 +216,10 @@ class CommitStatus < ApplicationRecord
calculate_duration
end
def latest?
!retried?
end
def playable?
false
end
......@@ -244,4 +271,21 @@ class CommitStatus < ApplicationRecord
v =~ /\d+/ ? v.to_i : v
end
end
private
def schedule_stage_and_pipeline_update
if Feature.enabled?(:ci_atomic_processing, project)
# Atomic processing requires only a single worker
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
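# Legacy processing fans out to separate pipeline and stage workers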
if complete? || manual?
PipelineProcessWorker.perform_async(pipeline_id, [id])
else
PipelineUpdateWorker.perform_async(pipeline_id)
end
StageUpdateWorker.perform_async(stage_id)
end
end
end
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard
attr_reader :pipeline
DEFAULT_LEASE_TIMEOUT = 1.minute
BATCH_SIZE = 20
def initialize(pipeline)
@pipeline = pipeline
@collection = AtomicProcessingService::StatusCollection.new(pipeline)
end
def execute
return unless pipeline.needs_processing?
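# try_obtain_lease comes from ExclusiveLeaseGuard and yields the block only
# when an exclusive lease (scoped per pipeline, see #lease_key) can be taken,
# so two workers never process the same pipeline concurrently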
success = try_obtain_lease { process! }
# re-schedule if we need further processing
if success && pipeline.needs_processing?
PipelineProcessWorker.perform_async(pipeline.id)
end
success
end
private
def process!
update_stages!
update_pipeline!
update_statuses_processed!
true
end
def update_stages!
pipeline.stages.ordered.each(&method(:update_stage!))
end
def update_stage!(stage)
# Update processables for a given stage in bulk/slices
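# in_groups_of(BATCH_SIZE, false) yields slices of at most BATCH_SIZE ids
# without padding the last slice with nils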
ids = @collection.created_processable_ids_for_stage_position(stage.position)
ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!))
status = @collection.status_for_stage_position(stage.position)
stage.set_status(status)
end
def update_processables!(ids)
created_processables = pipeline.processables.for_ids(ids)
.with_project_preload
.created
.latest
.ordered_by_stage
.select_with_aggregated_needs(project)
created_processables.each(&method(:update_processable!))
end
def update_pipeline!
pipeline.set_status(@collection.status_of_all)
end
def update_statuses_processed!
processing = @collection.processing_processables
processing.each_slice(BATCH_SIZE) do |slice|
pipeline.statuses.match_id_and_lock_version(slice)
.update_as_processed!
end
end
def update_processable!(processable)
status = processable_status(processable)
return unless HasStatus::COMPLETED_STATUSES.include?(status)
# transition status if possible
Gitlab::OptimisticLocking.retry_lock(processable) do |subject|
Ci::ProcessBuildService.new(project, subject.user)
.execute(subject, status)
# update the internal representation of the status
# so that the status change of the processable
# is taken into account during further processing
@collection.set_processable_status(
processable.id, processable.status, processable.lock_version)
end
end
def processable_status(processable)
if needs_names = processable.aggregated_needs_names
# The processable uses a DAG; get the status of all of its needs
@collection.status_for_names(needs_names)
else
# The processable uses stages; get the status of all prior stages
@collection.status_for_prior_stage_position(processable.stage_idx.to_i)
end
end
def project
pipeline.project
end
def lease_key
"#{super}::pipeline_id:#{pipeline.id}"
end
def lease_timeout
DEFAULT_LEASE_TIMEOUT
end
end
end
end
# frozen_string_literal: true
module Ci
module PipelineProcessing
class AtomicProcessingService
class StatusCollection
include Gitlab::Utils::StrongMemoize
attr_reader :pipeline
# We use these columns to perform an efficient
# calculation of a status
STATUSES_COLUMNS = [
:id, :name, :status, :allow_failure,
:stage_idx, :processed, :lock_version
].freeze
def initialize(pipeline)
@pipeline = pipeline
@stage_statuses = {}
@prior_stage_statuses = {}
end
# This method updates the internal status for a given ID
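# Keeping the in-memory status and lock_version in sync means later composite
# status calculations and update_statuses_processed! see the new values
# without re-querying the database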
def set_processable_status(id, status, lock_version)
processable = all_statuses_by_id[id]
return unless processable
processable[:status] = status
processable[:lock_version] = lock_version
end
# This method gets the composite status of all processables
def status_of_all
status_for_array(all_statuses)
end
# This method gets the composite status of processables with the given names
def status_for_names(names)
name_statuses = all_statuses_by_name.slice(*names)
status_for_array(name_statuses.values)
end
# This method gets the composite status of processables before a given stage
def status_for_prior_stage_position(position)
strong_memoize("status_for_prior_stage_position_#{position}") do
stage_statuses = all_statuses_grouped_by_stage_position
.select { |stage_position, _| stage_position < position }
status_for_array(stage_statuses.values.flatten)
end
end
# This method gets the IDs of created processables for a given stage
def created_processable_ids_for_stage_position(current_position)
all_statuses_grouped_by_stage_position[current_position]
.to_a
.select { |processable| processable[:status] == 'created' }
.map { |processable| processable[:id] }
end
# This method gets the composite status of processables at a given stage
def status_for_stage_position(current_position)
strong_memoize("status_for_stage_position_#{current_position}") do
stage_statuses = all_statuses_grouped_by_stage_position[current_position].to_a
status_for_array(stage_statuses.flatten)
end
end
# This method returns a list of all processables that still need to be processed
def processing_processables
all_statuses.lazy.reject { |status| status[:processed] }
end
private
def status_for_array(statuses)
result = Gitlab::Ci::Status::Composite
.new(statuses)
.status
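# An empty set of statuses composites to nil, so fall back to 'success',
# mirroring the previous `slow_composite_status || 'success'` behaviour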
result || 'success'
end
def all_statuses_grouped_by_stage_position
strong_memoize(:all_statuses_by_order) do
all_statuses.group_by { |status| status[:stage_idx].to_i }
end
end
def all_statuses_by_id
strong_memoize(:all_statuses_by_id) do
all_statuses.map do |row|
[row[:id], row]
end.to_h
end
end
def all_statuses_by_name
strong_memoize(:statuses_by_name) do
all_statuses.map do |row|
[row[:name], row]
end.to_h
end
end
# rubocop: disable CodeReuse/ActiveRecord
def all_statuses
# We fetch all relevant data in one go.
#
# This is more efficient than relying
# on PostgreSQL to calculate the composite
# status for us.
#
# Since we need to reprocess everything,
# we can fetch all of the statuses and do
# the processing ourselves.
strong_memoize(:all_statuses) do
raw_statuses = pipeline
.statuses
.latest
.ordered_by_stage
.pluck(*STATUSES_COLUMNS)
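# pluck returns plain arrays, so zip the column names back in to build
# one Hash per status row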
raw_statuses.map do |row|
STATUSES_COLUMNS.zip(row).to_h
end
end
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
end
end
......@@ -18,7 +18,7 @@ module Ci
# only when another job has finished
success = process_builds_with_needs(trigger_build_ids) || success
@pipeline.update_status
@pipeline.update_legacy_status
success
end
......
......@@ -11,9 +11,15 @@ module Ci
def execute(trigger_build_ids = nil)
update_retried
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
if Feature.enabled?(:ci_atomic_processing, pipeline.project)
Ci::PipelineProcessing::AtomicProcessingService
.new(pipeline)
.execute
else
Ci::PipelineProcessing::LegacyProcessingService
.new(pipeline)
.execute(trigger_build_ids)
end
end
private
......
......@@ -11,7 +11,7 @@ module Ci
reprocess!(build).tap do |new_build|
build.pipeline.mark_as_processable_after_stage(build.stage_idx)
new_build.enqueue!
Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue)
MergeRequests::AddTodoWhenBuildFailsService
.new(project, current_user)
......@@ -31,15 +31,17 @@ module Ci
attributes.push([:user, current_user])
build.retried = true
Ci::Build.transaction do
# mark all other builds of that name as retried
build.pipeline.builds.latest
.where(name: build.name)
.update_all(retried: true)
.update_all(retried: true, processed: true)
create_build!(attributes)
create_build!(attributes).tap do
# mark existing object as retried/processed without a reload
build.retried = true
build.processed = true
end
end
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -49,6 +51,7 @@ module Ci
def create_build!(attributes)
build = project.builds.new(Hash[attributes])
build.deployment = ::Gitlab::Ci::Pipeline::Seed::Deployment.new(build).to_resource
build.retried = false
build.save!
build
end
......
......@@ -7,10 +7,7 @@ class PipelineUpdateWorker
queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id)
.try(:update_status)
Ci::Pipeline.find_by_id(pipeline_id)&.update_legacy_status
end
# rubocop: enable CodeReuse/ActiveRecord
end
......@@ -7,11 +7,7 @@ class StageUpdateWorker
queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord
def perform(stage_id)
Ci::Stage.find_by(id: stage_id).try do |stage|
stage.update_status
end
Ci::Stage.find_by_id(stage_id)&.update_legacy_status
end
# rubocop: enable CodeReuse/ActiveRecord
end
---
title: Implement Atomic Processing that updates status of builds, stages and pipelines in one go
merge_request: 20229
author:
type: performance
......@@ -57,7 +57,7 @@ class Gitlab::Seeder::Pipelines
BUILDS.each { |opts| build_create!(pipeline, opts) }
EXTERNAL_JOBS.each { |opts| commit_status_create!(pipeline, opts) }
pipeline.update_duration
pipeline.update_status
pipeline.update_legacy_status
end
end
......
......@@ -187,7 +187,7 @@ class Gitlab::Seeder::CycleAnalytics
pipeline.builds.each(&:enqueue) # make sure all pipelines in pending state
pipeline.builds.each(&:run!)
pipeline.update_status
pipeline.update_legacy_status
end
end
......@@ -208,7 +208,7 @@ class Gitlab::Seeder::CycleAnalytics
job = merge_request.head_pipeline.builds.where.not(environment: nil).last
job.success!
job.pipeline.update_status
job.pipeline.update_legacy_status
end
end
end
......
# frozen_string_literal: true
class AddProcessedToCiBuilds < ActiveRecord::Migration[5.2]
DOWNTIME = false
def change
add_column :ci_builds, :processed, :boolean
end
end
......@@ -697,6 +697,7 @@ ActiveRecord::Schema.define(version: 2020_01_14_204949) do
t.integer "upstream_pipeline_id"
t.bigint "resource_group_id"
t.datetime_with_timezone "waiting_for_resource_at"
t.boolean "processed"
t.index ["artifacts_expire_at"], name: "index_ci_builds_on_artifacts_expire_at", where: "(artifacts_file <> ''::text)"
t.index ["auto_canceled_by_id"], name: "index_ci_builds_on_auto_canceled_by_id"
t.index ["commit_id", "artifacts_expire_at", "id"], name: "index_ci_builds_on_commit_id_and_artifacts_expireatandidpartial", where: "(((type)::text = 'Ci::Build'::text) AND ((retried = false) OR (retried IS NULL)) AND ((name)::text = ANY (ARRAY[('sast'::character varying)::text, ('dependency_scanning'::character varying)::text, ('sast:container'::character varying)::text, ('container_scanning'::character varying)::text, ('dast'::character varying)::text])))"
......
......@@ -19,41 +19,53 @@ describe Ci::ProcessPipelineService, '#execute' do
end
describe 'cross-project pipelines' do
before do
create_processable(:build, name: 'test', stage: 'test')
create_processable(:bridge, :variables, name: 'cross',
stage: 'build',
downstream: downstream)
create_processable(:build, name: 'deploy', stage: 'deploy')
stub_ci_pipeline_to_return_yaml_file
using RSpec::Parameterized::TableSyntax
where(:ci_atomic_processing) do
[true, false]
end
it 'creates a downstream cross-project pipeline', :sidekiq_might_not_need_inline do
service.execute
with_them do
before do
stub_feature_flags(ci_atomic_processing: ci_atomic_processing)
expect_statuses(%w[test pending], %w[cross created], %w[deploy created])
create_processable(:build, name: 'test', stage: 'test')
create_processable(:bridge, :variables, name: 'cross',
stage: 'build',
downstream: downstream)
create_processable(:build, name: 'deploy', stage: 'deploy')
update_build_status(:test, :success)
stub_ci_pipeline_to_return_yaml_file
end
expect_statuses(%w[test success], %w[cross success], %w[deploy pending])
it 'creates a downstream cross-project pipeline', :sidekiq do
service.execute
Sidekiq::Worker.drain_all
expect(downstream.ci_pipelines).to be_one
expect(downstream.ci_pipelines.first).to be_pending
expect(downstream.builds).not_to be_empty
expect(downstream.builds.first.variables)
.to include(key: 'BRIDGE', value: 'cross', public: false, masked: false)
end
end
expect_statuses(%w[test pending], %w[cross created], %w[deploy created])
update_build_status(:test, :success)
Sidekiq::Worker.drain_all
def expect_statuses(*statuses)
statuses.each do |name, status|
pipeline.statuses.find_by(name: name).yield_self do |build|
expect(build.status).to eq status
expect_statuses(%w[test success], %w[cross success], %w[deploy pending])
expect(downstream.ci_pipelines).to be_one
expect(downstream.ci_pipelines.first).to be_pending
expect(downstream.builds).not_to be_empty
expect(downstream.builds.first.variables)
.to include(key: 'BRIDGE', value: 'cross', public: false, masked: false)
end
end
end
def expect_statuses(*expected)
statuses = pipeline.statuses
.where(name: expected.map(&:first))
.pluck(:name, :status)
expect(statuses).to contain_exactly(*expected)
end
def update_build_status(name, status)
pipeline.builds.find_by(name: name).public_send(status)
end
......
......@@ -240,6 +240,7 @@ excluded_attributes:
- :upstream_pipeline_id
- :resource_group_id
- :waiting_for_resource_at
- :processed
sentry_issue:
- :issue_id
push_event_payload:
......
......@@ -63,7 +63,7 @@ describe 'test coverage badge' do
create(:ci_pipeline, opts).tap do |pipeline|
yield pipeline
pipeline.update_status
pipeline.update_legacy_status
end
end
......
......@@ -102,7 +102,7 @@ describe Gitlab::Badge::Coverage::Report do
create(:ci_pipeline, opts).tap do |pipeline|
yield pipeline
pipeline.update_status
pipeline.update_legacy_status
end
end
end
......@@ -216,6 +216,7 @@ stages:
- project
- pipeline
- statuses
- processables
- builds
- bridges
statuses:
......
......@@ -333,6 +333,7 @@ CommitStatus:
- scheduled_at
- upstream_pipeline_id
- interruptible
- processed
Ci::Variable:
- id
- project_id
......
......@@ -604,7 +604,7 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
context 'when traces are archived' do
let(:subject) do
project.builds.each do |build|
build.success!
build.reset.success!
end
end
......
......@@ -1008,22 +1008,22 @@ describe Ci::Pipeline, :mailer do
end
end
describe '#duration', :sidekiq_might_not_need_inline do
describe '#duration', :sidekiq_inline do
context 'when multiple builds are finished' do
before do
travel_to(current + 30) do
build.run!
build.success!
build.reload.success!
build_b.run!
build_c.run!
end
travel_to(current + 40) do
build_b.drop!
build_b.reload.drop!
end
travel_to(current + 70) do
build_c.success!
build_c.reload.success!
end
end
......@@ -1044,7 +1044,7 @@ describe Ci::Pipeline, :mailer do
end
travel_to(current + 5.minutes) do
build.success!
build.reload.success!
end
end
......@@ -1585,6 +1585,30 @@ describe Ci::Pipeline, :mailer do
end
end
describe '#needs_processing?' do
using RSpec::Parameterized::TableSyntax
subject { pipeline.needs_processing? }
where(:processed, :result) do
nil | true
false | true
true | false
end
with_them do
let(:build) do
create(:ci_build, :success, pipeline: pipeline, name: 'rubocop')
end
before do
build.update_column(:processed, processed)
end
it { is_expected.to eq(result) }
end
end
shared_context 'with some outdated pipelines' do
before do
create_pipeline(:canceled, 'ref', 'A', project)
......@@ -1785,7 +1809,7 @@ describe Ci::Pipeline, :mailer do
it { is_expected.not_to include('created', 'waiting_for_resource', 'preparing', 'pending') }
end
describe '#status', :sidekiq_might_not_need_inline do
describe '#status', :sidekiq_inline do
let(:build) do
create(:ci_build, :created, pipeline: pipeline, name: 'test')
end
......@@ -1826,7 +1850,7 @@ describe Ci::Pipeline, :mailer do
context 'on run' do
before do
build.enqueue
build.run
build.reload.run
end
it { is_expected.to eq('running') }
......@@ -1885,7 +1909,7 @@ describe Ci::Pipeline, :mailer do
it 'updates pipeline status to skipped' do
expect(pipeline.statuses.latest.slow_composite_status).to be_nil
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to change { pipeline.reload.status }
.from('created')
.to('skipped')
......@@ -1898,7 +1922,7 @@ describe Ci::Pipeline, :mailer do
end
it 'updates pipeline status to running' do
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to change { pipeline.reload.status }
.from('created')
.to('running')
......@@ -1911,7 +1935,7 @@ describe Ci::Pipeline, :mailer do
end
it 'updates pipeline status to scheduled' do
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to change { pipeline.reload.status }
.from('created')
.to('scheduled')
......@@ -1926,7 +1950,7 @@ describe Ci::Pipeline, :mailer do
end
it 'raises an exception' do
expect { pipeline.update_status }
expect { pipeline.update_legacy_status }
.to raise_error(HasStatus::UnknownStatusError)
end
end
......@@ -2214,11 +2238,11 @@ describe Ci::Pipeline, :mailer do
stub_full_request(hook.url, method: :post)
end
context 'with multiple builds', :sidekiq_might_not_need_inline do
context 'with multiple builds', :sidekiq_inline do
context 'when build is queued' do
before do
build_a.enqueue
build_b.enqueue
build_a.reload.enqueue
build_b.reload.enqueue
end
it 'receives a pending event once' do
......@@ -2228,10 +2252,10 @@ describe Ci::Pipeline, :mailer do
context 'when build is run' do
before do
build_a.enqueue
build_a.run
build_b.enqueue
build_b.run
build_a.reload.enqueue
build_a.reload.run!
build_b.reload.enqueue
build_b.reload.run!
end
it 'receives a running event once' do
......@@ -2292,6 +2316,7 @@ describe Ci::Pipeline, :mailer do
:created,
pipeline: pipeline,
name: name,
stage: "stage:#{stage_idx}",
stage_idx: stage_idx)
end
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::Processable do
set(:project) { create(:project) }
set(:pipeline) { create(:ci_pipeline, project: project) }
describe '#aggregated_needs_names' do
let(:with_aggregated_needs) { pipeline.processables.select_with_aggregated_needs(project) }
context 'with created status' do
let!(:processable) { create(:ci_build, :created, project: project, pipeline: pipeline) }
context 'with needs' do
before do
create(:ci_build_need, build: processable, name: 'test1')
create(:ci_build_need, build: processable, name: 'test2')
end
it 'returns all processables' do
expect(with_aggregated_needs).to contain_exactly(processable)
end
it 'returns all needs' do
expect(with_aggregated_needs.first.aggregated_needs_names).to contain_exactly('test1', 'test2')
end
context 'with ci_dag_support disabled' do
before do
stub_feature_flags(ci_dag_support: false)
end
it 'returns all processables' do
expect(with_aggregated_needs).to contain_exactly(processable)
end
it 'returns empty needs' do
expect(with_aggregated_needs.first.aggregated_needs_names).to be_nil
end
end
end
context 'without needs' do
it 'returns all processables' do
expect(with_aggregated_needs).to contain_exactly(processable)
end
it 'returns empty needs' do
expect(with_aggregated_needs.first.aggregated_needs_names).to be_nil
end
end
end
end
end
......@@ -63,7 +63,7 @@ describe Ci::Stage, :models do
end
it 'updates stage status correctly' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to eq 'running'
end
......@@ -87,7 +87,7 @@ describe Ci::Stage, :models do
end
it 'updates status to skipped' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to eq 'skipped'
end
......@@ -99,7 +99,7 @@ describe Ci::Stage, :models do
end
it 'updates status to scheduled' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to 'scheduled'
end
......@@ -111,7 +111,7 @@ describe Ci::Stage, :models do
end
it 'updates status to waiting for resource' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to 'waiting_for_resource'
end
......@@ -119,7 +119,7 @@ describe Ci::Stage, :models do
context 'when stage is skipped because is empty' do
it 'updates status to skipped' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to change { stage.reload.status }
.to eq('skipped')
end
......@@ -133,7 +133,7 @@ describe Ci::Stage, :models do
it 'retries a lock to update a stage status' do
stage.lock_version = 100
stage.update_status
stage.update_legacy_status
expect(stage.reload).to be_failed
end
......@@ -147,7 +147,7 @@ describe Ci::Stage, :models do
end
it 'raises an exception' do
expect { stage.update_status }
expect { stage.update_legacy_status }
.to raise_error(HasStatus::UnknownStatusError)
end
end
......@@ -179,7 +179,7 @@ describe Ci::Stage, :models do
stage_id: stage.id,
status: status)
stage.update_status
stage.update_legacy_status
end
end
......@@ -196,7 +196,7 @@ describe Ci::Stage, :models do
status: :failed,
allow_failure: true)
stage.update_status
stage.update_legacy_status
end
it 'is passed with warnings' do
......@@ -243,7 +243,7 @@ describe Ci::Stage, :models do
it 'recalculates index before updating status' do
expect(stage.reload.position).to be_nil
stage.update_status
stage.update_legacy_status
expect(stage.reload.position).to eq 10
end
......@@ -253,7 +253,7 @@ describe Ci::Stage, :models do
it 'falls back to zero' do
expect(stage.reload.position).to be_nil
stage.update_status
stage.update_legacy_status
expect(stage.reload.position).to eq 0
end
......
......@@ -63,6 +63,42 @@ describe CommitStatus do
end
end
describe '#processed' do
subject { commit_status.processed }
context 'when ci_atomic_processing is disabled' do
before do
stub_feature_flags(ci_atomic_processing: false)
commit_status.save!
end
it { is_expected.to be_nil }
end
context 'when ci_atomic_processing is enabled' do
before do
stub_feature_flags(ci_atomic_processing: true)
end
context 'status is latest' do
before do
commit_status.update!(retried: false, status: :pending)
end
it { is_expected.to be_falsey }
end
context 'status is retried' do
before do
commit_status.update!(retried: true, status: :pending)
end
it { is_expected.to be_truthy }
end
end
end
describe '#started?' do
subject { commit_status.started? }
......
......@@ -362,11 +362,11 @@ describe Ci::CreatePipelineService do
context 'when build that is not marked as interruptible is running' do
it 'cancels running outdated pipelines', :sidekiq_might_not_need_inline do
pipeline_on_previous_commit
.builds
.find_by_name('build_2_1')
.tap(&:enqueue!)
.run!
build_2_1 = pipeline_on_previous_commit
.builds.find_by_name('build_2_1')
build_2_1.enqueue!
build_2_1.reset.run!
pipeline
......@@ -377,12 +377,12 @@ describe Ci::CreatePipelineService do
end
context 'when an uninterruptible build is running' do
it 'does not cancel running outdated pipelines', :sidekiq_might_not_need_inline do
pipeline_on_previous_commit
.builds
.find_by_name('build_3_1')
.tap(&:enqueue!)
.run!
it 'does not cancel running outdated pipelines', :sidekiq_inline do
build_3_1 = pipeline_on_previous_commit
.builds.find_by_name('build_3_1')
build_3_1.enqueue!
build_3_1.reset.run!
pipeline
......
# frozen_string_literal: true
require 'spec_helper'
describe Ci::PipelineProcessing::AtomicProcessingService::StatusCollection do
using RSpec::Parameterized::TableSyntax
set(:pipeline) { create(:ci_pipeline) }
set(:build_a) { create(:ci_build, :success, name: 'build-a', stage: 'build', stage_idx: 0, pipeline: pipeline) }
set(:build_b) { create(:ci_build, :failed, name: 'build-b', stage: 'build', stage_idx: 0, pipeline: pipeline) }
set(:test_a) { create(:ci_build, :running, name: 'test-a', stage: 'test', stage_idx: 1, pipeline: pipeline) }
set(:test_b) { create(:ci_build, :pending, name: 'test-b', stage: 'test', stage_idx: 1, pipeline: pipeline) }
set(:deploy) { create(:ci_build, :created, name: 'deploy', stage: 'deploy', stage_idx: 2, pipeline: pipeline) }
let(:collection) { described_class.new(pipeline) }
describe '#set_processable_status' do
it 'does update existing status of processable' do
collection.set_processable_status(test_a.id, 'success', 100)
expect(collection.status_for_names(['test-a'])).to eq('success')
end
it 'ignores a missing processable' do
collection.set_processable_status(-1, 'failed', 100)
end
end
describe '#status_of_all' do
it 'returns composite status of the collection' do
expect(collection.status_of_all).to eq('running')
end
end
describe '#status_for_names' do
where(:names, :status) do
%w[build-a] | 'success'
%w[build-a build-b] | 'failed'
%w[build-a test-a] | 'running'
end
with_them do
it 'returns composite status of given names' do
expect(collection.status_for_names(names)).to eq(status)
end
end
end
describe '#status_for_prior_stage_position' do
where(:stage, :status) do
0 | 'success'
1 | 'failed'
2 | 'running'
end
with_them do
it 'returns composite status for processables in prior stages' do
expect(collection.status_for_prior_stage_position(stage)).to eq(status)
end
end
end
describe '#status_for_stage_position' do
where(:stage, :status) do
0 | 'failed'
1 | 'running'
2 | 'created'
end
with_them do
it 'returns composite status for processables at a given stage' do
expect(collection.status_for_stage_position(stage)).to eq(status)
end
end
end
describe '#created_processable_ids_for_stage_position' do
it 'returns IDs of processables at a given stage position' do
expect(collection.created_processable_ids_for_stage_position(0)).to be_empty
expect(collection.created_processable_ids_for_stage_position(1)).to be_empty
expect(collection.created_processable_ids_for_stage_position(2)).to contain_exactly(deploy.id)
end
end
describe '#processing_processables' do
it 'returns processables marked as processing' do
expect(collection.processing_processables.map { |processable| processable[:id] })
.to contain_exactly(build_a.id, build_b.id, test_a.id, test_b.id, deploy.id)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'shared_processing_service.rb'
describe Ci::PipelineProcessing::AtomicProcessingService do
before do
stub_feature_flags(ci_atomic_processing: true)
end
it_behaves_like 'Pipeline Processing Service'
end
......@@ -4,5 +4,9 @@ require 'spec_helper'
require_relative 'shared_processing_service.rb'
describe Ci::PipelineProcessing::LegacyProcessingService do
before do
stub_feature_flags(ci_atomic_processing: false)
end
it_behaves_like 'Pipeline Processing Service'
end
......@@ -879,19 +879,27 @@ shared_examples 'Pipeline Processing Service' do
end
def succeed_pending
builds.pending.map(&:success)
builds.pending.each do |build|
build.reset.success
end
end
def succeed_running_or_pending
pipeline.builds.running_or_pending.each(&:success)
pipeline.builds.running_or_pending.each do |build|
build.reset.success
end
end
def fail_running_or_pending
pipeline.builds.running_or_pending.each(&:drop)
pipeline.builds.running_or_pending.each do |build|
build.reset.drop
end
end
def cancel_running_or_pending
pipeline.builds.running_or_pending.each(&:cancel)
pipeline.builds.running_or_pending.each do |build|
build.reset.cancel
end
end
def play_manual_action(name)
......@@ -911,11 +919,15 @@ shared_examples 'Pipeline Processing Service' do
end
def create_build(name, **opts)
create(:ci_build, :created, pipeline: pipeline, name: name, **opts)
create(:ci_build, :created, pipeline: pipeline, name: name, **with_stage_opts(opts))
end
def successful_build(name, **opts)
create(:ci_build, :success, pipeline: pipeline, name: name, **opts)
create(:ci_build, :success, pipeline: pipeline, name: name, **with_stage_opts(opts))
end
def with_stage_opts(opts)
{ stage: "stage-#{opts[:stage_idx].to_i}" }.merge(opts)
end
def delayed_options
......
......@@ -45,7 +45,8 @@ describe Ci::RetryBuildService do
user_id auto_canceled_by_id retried failure_reason
sourced_pipelines artifacts_file_store artifacts_metadata_store
metadata runner_session trace_chunks upstream_pipeline_id
artifacts_file artifacts_metadata artifacts_size commands resource resource_group_id].freeze
artifacts_file artifacts_metadata artifacts_size commands
resource resource_group_id processed].freeze
shared_examples 'build duplication' do
let(:another_pipeline) { create(:ci_empty_pipeline, project: project) }
......@@ -202,12 +203,13 @@ describe Ci::RetryBuildService do
it 'does not enqueue the new build' do
expect(new_build).to be_created
expect(new_build).not_to be_processed
end
it 'does mark old build as retried in the database and on the instance' do
it 'does mark old build as retried' do
expect(new_build).to be_latest
expect(build).to be_retried
expect(build.reload).to be_retried
expect(build).to be_processed
end
context 'when build with deployment is retried' do
......
......@@ -330,7 +330,7 @@ describe Ci::RetryPipelineService, '#execute' do
stage: "stage_#{stage_num}",
stage_idx: stage_num,
pipeline: pipeline, **opts) do |build|
pipeline.update_status
pipeline.update_legacy_status
end
end
end
......@@ -8,7 +8,7 @@ describe PipelineUpdateWorker do
let(:pipeline) { create(:ci_pipeline) }
it 'updates pipeline status' do
expect_any_instance_of(Ci::Pipeline).to receive(:update_status)
expect_any_instance_of(Ci::Pipeline).to receive(:set_status).with('skipped')
described_class.new.perform(pipeline.id)
end
......
......@@ -8,7 +8,7 @@ describe StageUpdateWorker do
let(:stage) { create(:ci_stage_entity) }
it 'updates stage status' do
expect_any_instance_of(Ci::Stage).to receive(:update_status)
expect_any_instance_of(Ci::Stage).to receive(:set_status).with('skipped')
described_class.new.perform(stage.id)
end
......