Commit 3c28a75b authored by Vitali Tatarintev's avatar Vitali Tatarintev Committed by Bob Van Landuyt

Add copies of ArchiveTraceWorker and BuildFinishedWorker

parent deee2572
......@@ -326,7 +326,11 @@ module Ci
build.run_after_commit do
build.run_status_commit_hooks!
BuildFinishedWorker.perform_async(id)
if Feature.enabled?(:ci_build_finished_worker_namespace_changed, build.project, default_enabled: :yaml)
Ci::BuildFinishedWorker.perform_async(id)
else
::BuildFinishedWorker.perform_async(id)
end
end
end
......
......@@ -224,7 +224,7 @@ module Ci
end
after_transition [:created, :waiting_for_resource, :preparing, :pending, :running] => :success do |pipeline|
# We wait a little bit to ensure that all BuildFinishedWorkers finish first
# We wait a little bit to ensure that all Ci::BuildFinishedWorkers finish first
# because this is where some metrics like code coverage is parsed and stored
# in CI build records which the daily build metrics worker relies on.
pipeline.run_after_commit { Ci::DailyBuildGroupReportResultsWorker.perform_in(10.minutes, pipeline.id) }
......
......@@ -1374,6 +1374,15 @@
:weight: 1
:idempotent:
:tags: []
- :name: pipeline_background:ci_archive_trace
:worker_name: Ci::ArchiveTraceWorker
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: pipeline_background:ci_build_trace_chunk_flush
:worker_name: Ci::BuildTraceChunkFlushWorker
:feature_category: :continuous_integration
......@@ -1594,6 +1603,15 @@
:weight: 5
:idempotent:
:tags: []
- :name: pipeline_processing:ci_build_finished
:worker_name: Ci::BuildFinishedWorker
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :high
:resource_boundary: :cpu
:weight: 5
:idempotent:
:tags: []
- :name: pipeline_processing:ci_build_prepare
:worker_name: Ci::BuildPrepareWorker
:feature_category: :continuous_integration
......
# frozen_string_literal: true
class ArchiveTraceWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
include PipelineBackgroundQueue
# rubocop: disable CodeReuse/ActiveRecord
def perform(job_id)
Ci::Build.without_archived_trace.find_by(id: job_id).try do |job|
Ci::ArchiveTraceService.new.execute(job, worker_name: self.class.name)
end
end
# rubocop: enable CodeReuse/ActiveRecord
class ArchiveTraceWorker < ::Ci::ArchiveTraceWorker # rubocop:disable Scalability/IdempotentWorker
# DEPRECATED: Not triggered since https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64934/
end
# frozen_string_literal: true
class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
class BuildFinishedWorker < ::Ci::BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker
# DEPRECATED: Not triggered since https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64934/
sidekiq_options retry: 3
include PipelineQueue
queue_namespace :pipeline_processing
# We need to explicitly specify these settings. They aren't inheriting from the parent class.
urgency :high
worker_resource_boundary :cpu
ARCHIVE_TRACES_IN = 2.minutes.freeze
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
process_build(build)
end
end
# rubocop: enable CodeReuse/ActiveRecord
private
# Processes a single CI build that has finished.
#
# This logic resides in a separate method so that EE can extend it more
# easily.
#
# @param [Ci::Build] build The build to process.
def process_build(build)
# We execute these in sync to reduce IO.
build.parse_trace_sections!
build.update_coverage
Ci::BuildReportResultService.new.execute(build)
# We execute these async as these are independent operations.
BuildHooksWorker.perform_async(build.id)
ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat?
if build.failed?
::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id)
end
##
# We want to delay sending a build trace to object storage operation to
# validate that this fixes a race condition between this and flushing live
# trace chunks and chunks being removed after consolidation and putting
# them into object storage archive.
#
# TODO This is temporary fix we should improve later, after we validate
# that this is indeed the culprit.
#
# See https://gitlab.com/gitlab-org/gitlab/-/issues/267112 for more
# details.
#
ArchiveTraceWorker.perform_in(ARCHIVE_TRACES_IN, build.id)
end
end
BuildFinishedWorker.prepend_mod_with('BuildFinishedWorker')
# frozen_string_literal: true
module Ci
class ArchiveTraceWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
include PipelineBackgroundQueue
# rubocop: disable CodeReuse/ActiveRecord
def perform(job_id)
Ci::Build.without_archived_trace.find_by(id: job_id).try do |job|
Ci::ArchiveTraceService.new.execute(job, worker_name: self.class.name)
end
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
......@@ -12,7 +12,7 @@ module Ci
# rubocop: disable CodeReuse/ActiveRecord
def perform
# Archive stale live traces which still resides in redis or database
# This could happen when ArchiveTraceWorker sidekiq jobs were lost by receiving SIGKILL
# This could happen when Ci::ArchiveTraceWorker sidekiq jobs were lost by receiving SIGKILL
# More details in https://gitlab.com/gitlab-org/gitlab-foss/issues/36791
Ci::Build.with_stale_live_trace.find_each(batch_size: 100) do |build|
Ci::ArchiveTraceService.new.execute(build, worker_name: self.class.name)
......
# frozen_string_literal: true
module Ci
class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
include PipelineQueue
queue_namespace :pipeline_processing
urgency :high
worker_resource_boundary :cpu
ARCHIVE_TRACES_IN = 2.minutes.freeze
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
process_build(build)
end
end
# rubocop: enable CodeReuse/ActiveRecord
private
# Processes a single CI build that has finished.
#
# This logic resides in a separate method so that EE can extend it more
# easily.
#
# @param [Ci::Build] build The build to process.
def process_build(build)
# We execute these in sync to reduce IO.
build.parse_trace_sections!
build.update_coverage
Ci::BuildReportResultService.new.execute(build)
# We execute these async as these are independent operations.
BuildHooksWorker.perform_async(build.id)
ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat?
if build.failed?
::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id)
end
##
# We want to delay sending a build trace to object storage operation to
# validate that this fixes a race condition between this and flushing live
# trace chunks and chunks being removed after consolidation and putting
# them into object storage archive.
#
# TODO This is temporary fix we should improve later, after we validate
# that this is indeed the culprit.
#
# See https://gitlab.com/gitlab-org/gitlab/-/issues/267112 for more
# details.
#
archive_trace_worker_class(build).perform_in(ARCHIVE_TRACES_IN, build.id)
end
def archive_trace_worker_class(build)
if Feature.enabled?(:ci_build_finished_worker_namespace_changed, build.project, default_enabled: :yaml)
Ci::ArchiveTraceWorker
else
::ArchiveTraceWorker
end
end
end
end
Ci::BuildFinishedWorker.prepend_mod_with('Ci::BuildFinishedWorker')
---
name: ci_build_finished_worker_namespace_changed
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64934
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/335499
milestone: '14.1'
type: development
group: group::pipeline execution
default_enabled: false
......@@ -275,7 +275,7 @@ Post-processing is currently limited to a project's default branch, see the abov
sequenceDiagram
autonumber
Rails->>+Sidekiq: gl-secret-detection-report.json
Sidekiq-->+Sidekiq: BuildFinishedWorker
Sidekiq-->+Sidekiq: Ci::BuildFinishedWorker
Sidekiq-->+RevocationAPI: GET revocable keys types
RevocationAPI-->>-Sidekiq: OK
Sidekiq->>+RevocationAPI: POST revoke revocable keys
......
# frozen_string_literal: true
module EE
module BuildFinishedWorker
def process_build(build)
unless ::Feature.enabled?(:cancel_pipelines_prior_to_destroy, default_enabled: :yaml)
::Ci::Minutes::UpdateBuildMinutesService.new(build.project, nil).execute(build)
end
unless build.project.requirements.empty?
RequirementsManagement::ProcessRequirementsReportsWorker.perform_async(build.id)
end
super
end
end
end
# frozen_string_literal: true
module EE
module Ci
module BuildFinishedWorker
def process_build(build)
::Ci::Minutes::UpdateBuildMinutesService.new(build.project, nil).execute(build)
# We need to use `reset` on `project` because their AR associations have been cached
# and `Namespace#namespace_statistics` will return stale data.
::Ci::Minutes::EmailNotificationService.new(build.project.reset).execute if ::Gitlab.com?
unless build.project.requirements.empty?
RequirementsManagement::ProcessRequirementsReportsWorker.perform_async(build.id)
end
super
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::BuildFinishedWorker do
let(:ci_runner) { create(:ci_runner) }
let(:build) { create(:ee_ci_build, :success, runner: ci_runner) }
let(:project) { build.project }
let(:namespace) { project.shared_runners_limit_namespace }
subject do
described_class.new.perform(build.id)
end
def namespace_stats
namespace.namespace_statistics || namespace.create_namespace_statistics
end
def project_stats
project.statistics || project.create_statistics(namespace: project.namespace)
end
describe '#perform' do
context 'when on .com' do
before do
allow(Gitlab).to receive(:com?).and_return(true)
allow_any_instance_of(EE::Project).to receive(:shared_runners_minutes_limit_enabled?).and_return(true) # rubocop:disable RSpec/AnyInstanceOf
end
it 'updates the project stats' do
expect { subject }.to change { project_stats.reload.shared_runners_seconds }
end
it 'updates the namespace stats' do
expect { subject }.to change { namespace_stats.reload.shared_runners_seconds }
end
it 'notifies the owners of Groups' do
namespace.update_attribute(:shared_runners_minutes_limit, 2000)
namespace_stats.update_attribute(:shared_runners_seconds, 2100 * 60)
expect(CiMinutesUsageMailer).to receive(:notify).once.with(namespace, [namespace.owner.email]).and_return(spy)
subject
end
end
context 'when not on .com' do
before do
allow(Gitlab).to receive(:com?).and_return(false)
end
it 'does not notify the owners of Groups' do
expect(::Ci::Minutes::EmailNotificationService).not_to receive(:new)
subject
end
end
it 'does not schedule processing of requirement reports by default' do
expect(RequirementsManagement::ProcessRequirementsReportsWorker).not_to receive(:perform_async)
subject
end
it 'schedules processing of requirement reports if project has requirements' do
create(:requirement, project: project)
expect(RequirementsManagement::ProcessRequirementsReportsWorker).to receive(:perform_async)
subject
end
context 'when token revocation is disabled' do
before do
allow_next_instance_of(described_class) do |build_finished_worker|
allow(build_finished_worker).to receive(:revoke_secret_detection_token?) { false }
end
end
it 'does not scan security reports for token revocation' do
expect(ScanSecurityReportSecretsWorker).not_to receive(:perform_async)
subject
end
end
end
end
......@@ -134,7 +134,7 @@ RSpec.describe Gitlab::SidekiqConfig do
.to receive(:global).and_return(::Gitlab::SidekiqConfig::WorkerRouter.new(test_routes))
expect(described_class.worker_queue_mappings).to include('MergeWorker' => 'default',
'BuildFinishedWorker' => 'default',
'Ci::BuildFinishedWorker' => 'default',
'BackgroundMigrationWorker' => 'background_migration',
'AdminEmailWorker' => 'cronjob:admin_email')
end
......@@ -155,7 +155,7 @@ RSpec.describe Gitlab::SidekiqConfig do
mappings = described_class.current_worker_queue_mappings
expect(mappings).to include('MergeWorker' => 'default',
'BuildFinishedWorker' => 'default',
'Ci::BuildFinishedWorker' => 'default',
'BackgroundMigrationWorker' => 'background_migration')
expect(mappings).not_to include('AdminEmailWorker' => 'cronjob:admin_email')
......
......@@ -18,7 +18,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
it 'initializes sidekiq_jobs_completion_seconds for the workers in the current Sidekiq process' do
allow(Gitlab::SidekiqConfig)
.to receive(:current_worker_queue_mappings)
.and_return('MergeWorker' => 'merge', 'BuildFinishedWorker' => 'default')
.and_return('MergeWorker' => 'merge', 'Ci::BuildFinishedWorker' => 'default')
expect(completion_seconds_metric)
.to receive(:get).with(queue: 'merge',
......@@ -40,7 +40,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
expect(completion_seconds_metric)
.to receive(:get).with(queue: 'default',
worker: 'BuildFinishedWorker',
worker: 'Ci::BuildFinishedWorker',
urgency: 'high',
external_dependencies: 'no',
feature_category: 'continuous_integration',
......@@ -49,7 +49,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
expect(completion_seconds_metric)
.to receive(:get).with(queue: 'default',
worker: 'BuildFinishedWorker',
worker: 'Ci::BuildFinishedWorker',
urgency: 'high',
external_dependencies: 'no',
feature_category: 'continuous_integration',
......@@ -73,7 +73,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
it 'does not initialize sidekiq_jobs_completion_seconds' do
allow(Gitlab::SidekiqConfig)
.to receive(:current_worker_queue_mappings)
.and_return('MergeWorker' => 'merge', 'BuildFinishedWorker' => 'default')
.and_return('MergeWorker' => 'merge', 'Ci::BuildFinishedWorker' => 'default')
expect(completion_seconds_metric).not_to receive(:get)
......@@ -118,7 +118,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::ServerMetrics do
allow(Gitlab::SidekiqConfig)
.to receive(:current_worker_queue_mappings)
.and_return('MergeWorker' => 'merge', 'BuildFinishedWorker' => 'default')
.and_return('MergeWorker' => 'merge', 'Ci::BuildFinishedWorker' => 'default')
allow(completion_seconds_metric).to receive(:get) do |labels|
expect { label_validator.validate(labels) }.not_to raise_error
......
......@@ -39,6 +39,34 @@ RSpec.describe Ci::Build do
it { is_expected.to delegate_method(:merge_request_ref?).to(:pipeline) }
it { is_expected.to delegate_method(:legacy_detached_merge_request_pipeline?).to(:pipeline) }
shared_examples 'calling proper BuildFinishedWorker' do
context 'when ci_build_finished_worker_namespace_changed feature flag enabled' do
before do
stub_feature_flags(ci_build_finished_worker_namespace_changed: build.project)
end
it 'calls Ci::BuildFinishedWorker' do
expect(Ci::BuildFinishedWorker).to receive(:perform_async)
expect(::BuildFinishedWorker).not_to receive(:perform_async)
subject
end
end
context 'when ci_build_finished_worker_namespace_changed feature flag disabled' do
before do
stub_feature_flags(ci_build_finished_worker_namespace_changed: false)
end
it 'calls ::BuildFinishedWorker' do
expect(::BuildFinishedWorker).to receive(:perform_async)
expect(Ci::BuildFinishedWorker).not_to receive(:perform_async)
subject
end
end
end
describe 'associations' do
it 'has a bidirectional relationship with projects' do
expect(described_class.reflect_on_association(:project).has_inverse?).to eq(:builds)
......@@ -1323,6 +1351,7 @@ RSpec.describe Ci::Build do
end
it_behaves_like 'avoid deadlock'
it_behaves_like 'calling proper BuildFinishedWorker'
it 'transits deployment status to success' do
subject
......@@ -1335,6 +1364,7 @@ RSpec.describe Ci::Build do
let(:event) { :drop! }
it_behaves_like 'avoid deadlock'
it_behaves_like 'calling proper BuildFinishedWorker'
it 'transits deployment status to failed' do
subject
......@@ -1359,6 +1389,7 @@ RSpec.describe Ci::Build do
let(:event) { :cancel! }
it_behaves_like 'avoid deadlock'
it_behaves_like 'calling proper BuildFinishedWorker'
it 'transits deployment status to canceled' do
subject
......
......@@ -3,7 +3,7 @@
require 'spec_helper'
RSpec.describe Ci::ArchiveTraceService, '#execute' do
subject { described_class.new.execute(job, worker_name: ArchiveTraceWorker.name) }
subject { described_class.new.execute(job, worker_name: Ci::ArchiveTraceWorker.name) }
context 'when job is finished' do
let(:job) { create(:ci_build, :success, :trace_live) }
......@@ -51,7 +51,7 @@ RSpec.describe Ci::ArchiveTraceService, '#execute' do
it 'leaves a warning message in sidekiq log' do
expect(Sidekiq.logger).to receive(:warn).with(
class: ArchiveTraceWorker.name,
class: Ci::ArchiveTraceWorker.name,
message: 'The job does not have live trace but going to be archived.',
job_id: job.id)
......@@ -68,7 +68,7 @@ RSpec.describe Ci::ArchiveTraceService, '#execute' do
it 'leaves a warning message in sidekiq log' do
expect(Sidekiq.logger).to receive(:warn).with(
class: ArchiveTraceWorker.name,
class: Ci::ArchiveTraceWorker.name,
message: 'The job does not have archived trace after archiving.',
job_id: job.id)
......@@ -88,7 +88,7 @@ RSpec.describe Ci::ArchiveTraceService, '#execute' do
job_id: job.id).once
expect(Sidekiq.logger).to receive(:warn).with(
class: ArchiveTraceWorker.name,
class: Ci::ArchiveTraceWorker.name,
message: "Failed to archive trace. message: Job is not finished yet.",
job_id: job.id).and_call_original
......
......@@ -10,6 +10,7 @@ RSpec.describe BuildFinishedWorker do
let_it_be(:build) { create(:ci_build, :success, pipeline: create(:ci_pipeline)) }
before do
stub_feature_flags(ci_build_finished_worker_namespace_changed: build.project)
expect(Ci::Build).to receive(:find_by).with(id: build.id).and_return(build)
end
......@@ -23,11 +24,23 @@ RSpec.describe BuildFinishedWorker do
expect(BuildHooksWorker).to receive(:perform_async)
expect(ChatNotificationWorker).not_to receive(:perform_async)
expect(ArchiveTraceWorker).to receive(:perform_in)
expect(Ci::ArchiveTraceWorker).to receive(:perform_in)
subject
end
context 'with ci_build_finished_worker_namespace_changed feature flag disabled' do
before do
stub_feature_flags(ci_build_finished_worker_namespace_changed: false)
end
it 'calls deprecated worker' do
expect(ArchiveTraceWorker).to receive(:perform_in)
subject
end
end
context 'when build is failed' do
before do
build.update!(status: :failed)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::ArchiveTraceWorker do
describe '#perform' do
subject { described_class.new.perform(job&.id) }
context 'when job is found' do
let(:job) { create(:ci_build, :trace_live) }
it 'executes service' do
allow_next_instance_of(Ci::ArchiveTraceService) do |instance|
allow(instance).to receive(:execute).with(job, anything)
end
subject
end
end
context 'when job is not found' do
let(:job) { nil }
it 'does not execute service' do
allow_next_instance_of(Ci::ArchiveTraceService) do |instance|
allow(instance).not_to receive(:execute)
end
subject
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::BuildFinishedWorker do
subject { described_class.new.perform(build.id) }
describe '#perform' do
context 'when build exists' do
let_it_be(:build) { create(:ci_build, :success, pipeline: create(:ci_pipeline)) }
before do
stub_feature_flags(ci_build_finished_worker_namespace_changed: build.project)
expect(Ci::Build).to receive(:find_by).with(id: build.id).and_return(build)
end
it 'calculates coverage and calls hooks', :aggregate_failures do
expect(build).to receive(:parse_trace_sections!).ordered
expect(build).to receive(:update_coverage).ordered
expect_next_instance_of(Ci::BuildReportResultService) do |build_report_result_service|
expect(build_report_result_service).to receive(:execute).with(build)
end
expect(BuildHooksWorker).to receive(:perform_async)
expect(ChatNotificationWorker).not_to receive(:perform_async)
expect(Ci::ArchiveTraceWorker).to receive(:perform_in)
subject
end
context 'with ci_build_finished_worker_namespace_changed feature flag disabled' do
before do
stub_feature_flags(ci_build_finished_worker_namespace_changed: false)
end
it 'calls deprecated worker' do
expect(ArchiveTraceWorker).to receive(:perform_in)
subject
end
end
context 'when build is failed' do
before do
build.update!(status: :failed)
end
it 'adds a todo' do
expect(::Ci::MergeRequests::AddTodoWhenBuildFailsWorker).to receive(:perform_async)
subject
end
end
context 'when build has a chat' do
before do
build.pipeline.update!(source: :chat)
end
it 'schedules a ChatNotification job' do
expect(ChatNotificationWorker).to receive(:perform_async).with(build.id)
subject
end
end
end
context 'when build does not exist' do
it 'does not raise exception' do
expect { described_class.new.perform(non_existing_record_id) }
.not_to raise_error
end
end
end
end
......@@ -148,7 +148,9 @@ RSpec.describe 'Every Sidekiq worker' do
'Chaos::LeakMemWorker' => 3,
'Chaos::SleepWorker' => 3,
'ChatNotificationWorker' => false,
'Ci::ArchiveTraceWorker' => 3,
'Ci::BatchResetMinutesWorker' => 10,
'Ci::BuildFinishedWorker' => 3,
'Ci::BuildPrepareWorker' => 3,
'Ci::BuildScheduleWorker' => 3,
'Ci::BuildTraceChunkFlushWorker' => 3,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment