Commit 7d80c5de authored by Marius Bobin's avatar Marius Bobin

Merge branch 'new-worker-for-artifacts-backlog' into 'master'

Introduce Ci::UpdateLockedUnknownArtifactsWorker

See merge request gitlab-org/gitlab!76509
parents c371e823 24a412b2
......@@ -186,6 +186,7 @@ module Ci
scope :downloadable, -> { where(file_type: DOWNLOADABLE_TYPES) }
scope :unlocked, -> { joins(job: :pipeline).merge(::Ci::Pipeline.unlocked) }
scope :order_expired_asc, -> { order(expire_at: :asc) }
scope :order_expired_desc, -> { order(expire_at: :desc) }
scope :with_destroy_preloads, -> { includes(project: [:route, :statistics]) }
......@@ -273,6 +274,10 @@ module Ci
self.where(project: project).sum(:size)
end
def self.pluck_job_id
pluck(:job_id)
end
##
# FastDestroyAll concerns
# rubocop: disable CodeReuse/ServiceClass
......
# frozen_string_literal: true
module Ci
module JobArtifacts
class UpdateUnknownLockedStatusService
include ::Gitlab::ExclusiveLeaseHelpers
include ::Gitlab::LoopHelpers
BATCH_SIZE = 100
LOOP_TIMEOUT = 5.minutes
LOOP_LIMIT = 100
LARGE_LOOP_LIMIT = 500
EXCLUSIVE_LOCK_KEY = 'unknown_status_job_artifacts:update:lock'
LOCK_TIMEOUT = 6.minutes
def initialize
@removed_count = 0
@locked_count = 0
@start_at = Time.current
@loop_limit = Feature.enabled?(:ci_job_artifacts_backlog_large_loop_limit) ? LARGE_LOOP_LIMIT : LOOP_LIMIT
end
def execute
in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
update_locked_status_on_unknown_artifacts
end
{ removed: @removed_count, locked: @locked_count }
end
private
def update_locked_status_on_unknown_artifacts
loop_until(timeout: LOOP_TIMEOUT, limit: @loop_limit) do
unknown_status_build_ids = safely_ordered_ci_job_artifacts_locked_unknown_relation.pluck_job_id.uniq
locked_pipe_build_ids = ::Ci::Build
.with_pipeline_locked_artifacts
.id_in(unknown_status_build_ids)
.pluck_primary_key
@locked_count += update_unknown_artifacts(locked_pipe_build_ids, Ci::JobArtifact.lockeds[:artifacts_locked])
unlocked_pipe_build_ids = unknown_status_build_ids - locked_pipe_build_ids
service_response = batch_destroy_artifacts(unlocked_pipe_build_ids)
@removed_count += service_response[:destroyed_artifacts_count]
end
end
def update_unknown_artifacts(build_ids, locked_value)
return 0 unless build_ids.any?
expired_locked_unknown_artifacts.for_job_ids(build_ids).update_all(locked: locked_value)
end
def batch_destroy_artifacts(build_ids)
deleteable_artifacts_relation =
if build_ids.any?
expired_locked_unknown_artifacts.for_job_ids(build_ids)
else
Ci::JobArtifact.none
end
Ci::JobArtifacts::DestroyBatchService.new(deleteable_artifacts_relation).execute
end
def expired_locked_unknown_artifacts
# UPDATE queries perform better without the specific order and limit
# https://gitlab.com/gitlab-org/gitlab/-/merge_requests/76509#note_891260455
Ci::JobArtifact.expired_before(@start_at).artifact_unknown
end
def safely_ordered_ci_job_artifacts_locked_unknown_relation
# Adding the ORDER and LIMIT improves performance when we don't have build_id
expired_locked_unknown_artifacts.limit(BATCH_SIZE).order_expired_asc
end
end
end
end
......@@ -264,6 +264,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:ci_update_locked_unknown_artifacts
:worker_name: Ci::UpdateLockedUnknownArtifactsWorker
:feature_category: :build_artifacts
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:clusters_integrations_check_prometheus_health
:worker_name: Clusters::Integrations::CheckPrometheusHealthWorker
:feature_category: :incident_management
......
# frozen_string_literal: true
module Ci
class UpdateLockedUnknownArtifactsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
data_consistency :sticky
urgency :throttled
# rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
feature_category :build_artifacts
def perform
return unless ::Feature.enabled?(:ci_job_artifacts_backlog_work)
artifact_counts = Ci::JobArtifacts::UpdateUnknownLockedStatusService.new.execute
log_extra_metadata_on_done(:removed_count, artifact_counts[:removed])
log_extra_metadata_on_done(:locked_count, artifact_counts[:locked])
end
end
end
---
name: ci_job_artifacts_backlog_large_loop_limit
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/76509
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/347151
milestone: '14.10'
type: development
group: group::pipeline execution
default_enabled: false
---
name: ci_job_artifacts_backlog_work
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/76509
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/347144
milestone: '14.10'
type: development
group: group::pipeline execution
default_enabled: false
......@@ -446,6 +446,9 @@ Settings.cron_jobs['pipeline_schedule_worker']['job_class'] = 'PipelineScheduleW
Settings.cron_jobs['expire_build_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '*/7 * * * *'
Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
Settings.cron_jobs['update_locked_unknown_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['update_locked_unknown_artifacts_worker']['cron'] ||= '*/7 * * * *'
Settings.cron_jobs['update_locked_unknown_artifacts_worker']['job_class'] = 'Ci::UpdateLockedUnknownArtifactsWorker'
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker']['cron'] ||= '*/23 * * * *'
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker']['job_class'] = 'Ci::PipelineArtifacts::ExpireArtifactsWorker'
......
......@@ -279,6 +279,15 @@ RSpec.describe Ci::JobArtifact do
end
end
describe '.order_expired_asc' do
let_it_be(:first_artifact) { create(:ci_job_artifact, expire_at: 2.days.ago) }
let_it_be(:second_artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) }
it 'returns ordered artifacts' do
expect(described_class.order_expired_asc).to eq([first_artifact, second_artifact])
end
end
describe '.for_project' do
it 'returns artifacts only for given project(s)', :aggregate_failures do
artifact1 = create(:ci_job_artifact)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::JobArtifacts::UpdateUnknownLockedStatusService, :clean_gitlab_redis_shared_state do
include ExclusiveLeaseHelpers
let(:service) { described_class.new }
describe '.execute' do
subject { service.execute }
let_it_be(:locked_pipeline) { create(:ci_pipeline, :artifacts_locked) }
let_it_be(:pipeline) { create(:ci_pipeline, :unlocked) }
let_it_be(:locked_job) { create(:ci_build, :success, pipeline: locked_pipeline) }
let_it_be(:job) { create(:ci_build, :success, pipeline: pipeline) }
let!(:unknown_unlocked_artifact) do
create(:ci_job_artifact, :junit, expire_at: 1.hour.ago, job: job, locked: Ci::JobArtifact.lockeds[:unknown])
end
let!(:unknown_locked_artifact) do
create(:ci_job_artifact, :lsif,
expire_at: 1.day.ago,
job: locked_job,
locked: Ci::JobArtifact.lockeds[:unknown]
)
end
let!(:unlocked_artifact) do
create(:ci_job_artifact, :archive, expire_at: 1.hour.ago, job: job, locked: Ci::JobArtifact.lockeds[:unlocked])
end
let!(:locked_artifact) do
create(:ci_job_artifact, :sast, :raw,
expire_at: 1.day.ago,
job: locked_job,
locked: Ci::JobArtifact.lockeds[:artifacts_locked]
)
end
context 'when artifacts are expired' do
it 'sets artifact_locked when the pipeline is locked' do
expect { service.execute }
.to change { unknown_locked_artifact.reload.locked }.from('unknown').to('artifacts_locked')
.and not_change { Ci::JobArtifact.exists?(locked_artifact.id) }
end
it 'destroys the artifact when the pipeline is unlocked' do
expect { subject }.to change { Ci::JobArtifact.exists?(unknown_unlocked_artifact.id) }.from(true).to(false)
end
it 'does not update ci_job_artifact rows with known locked values' do
expect { service.execute }
.to not_change(locked_artifact, :attributes)
.and not_change { Ci::JobArtifact.exists?(locked_artifact.id) }
.and not_change(unlocked_artifact, :attributes)
.and not_change { Ci::JobArtifact.exists?(unlocked_artifact.id) }
end
it 'logs the counts of affected artifacts' do
expect(subject).to eq({ removed: 1, locked: 1 })
end
end
context 'in a single iteration' do
before do
stub_const("#{described_class}::BATCH_SIZE", 1)
end
context 'due to the LOOP_TIMEOUT' do
before do
stub_const("#{described_class}::LOOP_TIMEOUT", 0.seconds)
end
it 'affects the earliest expired artifact first' do
subject
expect(unknown_locked_artifact.reload.locked).to eq('artifacts_locked')
expect(unknown_unlocked_artifact.reload.locked).to eq('unknown')
end
it 'reports the number of destroyed artifacts' do
is_expected.to eq({ removed: 0, locked: 1 })
end
end
context 'due to @loop_limit' do
before do
stub_const("#{described_class}::LARGE_LOOP_LIMIT", 1)
end
it 'affects the most recently expired artifact first' do
subject
expect(unknown_locked_artifact.reload.locked).to eq('artifacts_locked')
expect(unknown_unlocked_artifact.reload.locked).to eq('unknown')
end
it 'reports the number of destroyed artifacts' do
is_expected.to eq({ removed: 0, locked: 1 })
end
end
end
context 'when artifact is not expired' do
let!(:unknown_unlocked_artifact) do
create(:ci_job_artifact, :junit,
expire_at: 1.year.from_now,
job: job,
locked: Ci::JobArtifact.lockeds[:unknown]
)
end
it 'does not change the locked status' do
expect { service.execute }.not_to change { unknown_unlocked_artifact.locked }
expect(Ci::JobArtifact.exists?(unknown_unlocked_artifact.id)).to eq(true)
end
end
context 'when exclusive lease has already been taken by the other instance' do
before do
stub_exclusive_lease_taken(described_class::EXCLUSIVE_LOCK_KEY, timeout: described_class::LOCK_TIMEOUT)
end
it 'raises an error and' do
expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
end
end
context 'when there are no unknown status artifacts' do
before do
Ci::JobArtifact.update_all(locked: :unlocked)
end
it 'does not raise error' do
expect { subject }.not_to raise_error
end
it 'reports the number of destroyed artifacts' do
is_expected.to eq({ removed: 0, locked: 0 })
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::UpdateLockedUnknownArtifactsWorker do
let(:worker) { described_class.new }
describe '#perform' do
it 'executes an instance of Ci::JobArtifacts::UpdateUnknownLockedStatusService' do
expect_next_instance_of(Ci::JobArtifacts::UpdateUnknownLockedStatusService) do |instance|
expect(instance).to receive(:execute).and_call_original
end
expect(worker).to receive(:log_extra_metadata_on_done).with(:removed_count, 0)
expect(worker).to receive(:log_extra_metadata_on_done).with(:locked_count, 0)
worker.perform
end
context 'with the ci_job_artifacts_backlog_work flag shut off' do
before do
stub_feature_flags(ci_job_artifacts_backlog_work: false)
end
it 'does not instantiate a new Ci::JobArtifacts::UpdateUnknownLockedStatusService' do
expect(Ci::JobArtifacts::UpdateUnknownLockedStatusService).not_to receive(:new)
worker.perform
end
it 'does not log any artifact counts' do
expect(worker).not_to receive(:log_extra_metadata_on_done)
worker.perform
end
it 'does not query the database' do
query_count = ActiveRecord::QueryRecorder.new { worker.perform }.count
expect(query_count).to eq(0)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment