Commit 4d9e9bb0 authored by Marius Bobin, committed by Bob Van Landuyt

Move pipeline artifacts removal process into its own worker

Move pipeline artifacts removal process into its own worker
parent 9b3bc6b3
...@@ -75,6 +75,8 @@ module Ci ...@@ -75,6 +75,8 @@ module Ci
# TODO: Make sure this can also be parallelized # TODO: Make sure this can also be parallelized
# https://gitlab.com/gitlab-org/gitlab/-/issues/270973 # https://gitlab.com/gitlab-org/gitlab/-/issues/270973
def destroy_pipeline_artifacts_batch def destroy_pipeline_artifacts_batch
return false if ::Feature.enabled?(:ci_split_pipeline_artifacts_removal)
artifacts = Ci::PipelineArtifact.expired(BATCH_SIZE).to_a artifacts = Ci::PipelineArtifact.expired(BATCH_SIZE).to_a
return false if artifacts.empty? return false if artifacts.empty?
......
# frozen_string_literal: true
module Ci
  module PipelineArtifacts
    # Destroys expired pipeline artifacts in batches and reports how many
    # records were removed.
    #
    # Usage:
    #   removed = Ci::PipelineArtifacts::DestroyExpiredArtifactsService.new.execute
    class DestroyExpiredArtifactsService
      include ::Gitlab::LoopHelpers
      include ::Gitlab::Utils::StrongMemoize

      # Size argument handed to Ci::PipelineArtifact.expired for each batch.
      BATCH_SIZE = 100
      # Bounds passed to loop_until (provided by Gitlab::LoopHelpers, not
      # shown here) — presumably a wall-clock cap and an iteration cap.
      LOOP_TIMEOUT = 5.minutes
      LOOP_LIMIT = 1000

      def initialize
        @removed_artifacts_count = 0
      end

      # Runs the batched removal under the loop bounds above.
      #
      # @return [Integer] number of artifacts destroyed by this instance
      def execute
        loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) { destroy_artifacts_batch }

        @removed_artifacts_count
      end

      private

      # Destroys a single batch of expired artifacts.
      #
      # @return [Boolean] false when nothing was found, true after a batch
      #   has been destroyed and counted
      def destroy_artifacts_batch
        expired_batch = ::Ci::PipelineArtifact.expired(BATCH_SIZE).to_a

        if expired_batch.empty?
          false
        else
          # destroy! raises on failure, so a failed removal is never counted.
          expired_batch.each { |pipeline_artifact| pipeline_artifact.destroy! }
          increment_stats(expired_batch.size)
          true
        end
      end

      # Records +size+ removals in the metrics counter first, then in the
      # running total returned by #execute.
      def increment_stats(size)
        destroyed_artifacts_counter.increment({}, size)
        @removed_artifacts_count += size
      end

      # Memoized metrics counter for destroyed pipeline artifacts.
      def destroyed_artifacts_counter
        strong_memoize(:destroyed_artifacts_counter) do
          ::Gitlab::Metrics.counter(
            :destroyed_pipeline_artifacts_count_total,
            'Counter of destroyed expired pipeline artifacts'
          )
        end
      end
    end
  end
end
...@@ -147,6 +147,14 @@ ...@@ -147,6 +147,14 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: cronjob:ci_pipeline_artifacts_expire_artifacts
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:ci_platform_metrics_update_cron - :name: cronjob:ci_platform_metrics_update_cron
:feature_category: :continuous_integration :feature_category: :continuous_integration
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Ci
  module PipelineArtifacts
    # Cron worker that delegates the removal of expired pipeline artifacts to
    # Ci::PipelineArtifacts::DestroyExpiredArtifactsService and logs how many
    # artifacts were destroyed.
    class ExpireArtifactsWorker
      include ApplicationWorker
      # rubocop:disable Scalability/CronWorkerContext
      # This worker does not perform work scoped to a context
      include CronjobQueue
      # rubocop:enable Scalability/CronWorkerContext

      deduplicate :until_executed, including_scheduled: true
      idempotent!
      feature_category :continuous_integration

      # Does nothing unless the :ci_split_pipeline_artifacts_removal feature
      # flag is enabled.
      def perform
        return unless ::Feature.enabled?(:ci_split_pipeline_artifacts_removal)

        destroyed_count = ::Ci::PipelineArtifacts::DestroyExpiredArtifactsService.new.execute
        log_extra_metadata_on_done(:destroyed_pipeline_artifacts_count, destroyed_count)
      end
    end
  end
end
---
# Gates the dedicated pipeline artifacts removal worker:
# Ci::PipelineArtifacts::ExpireArtifactsWorker#perform returns early unless
# this flag is enabled.
name: ci_split_pipeline_artifacts_removal
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/50446
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/295300
milestone: '13.8'
type: development
group: group::continuous integration
default_enabled: false
...@@ -436,6 +436,9 @@ production: &base ...@@ -436,6 +436,9 @@ production: &base
# Remove expired build artifacts # Remove expired build artifacts
expire_build_artifacts_worker: expire_build_artifacts_worker:
cron: "*/7 * * * *" cron: "*/7 * * * *"
# Remove expired pipeline artifacts
ci_pipelines_expire_artifacts_worker:
cron: "*/23 * * * *"
# Remove files from object storage # Remove files from object storage
ci_schedule_delete_objects_worker: ci_schedule_delete_objects_worker:
cron: "*/16 * * * *" cron: "*/16 * * * *"
......
...@@ -420,6 +420,9 @@ Settings.cron_jobs['pipeline_schedule_worker']['job_class'] = 'PipelineScheduleW ...@@ -420,6 +420,9 @@ Settings.cron_jobs['pipeline_schedule_worker']['job_class'] = 'PipelineScheduleW
Settings.cron_jobs['expire_build_artifacts_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['expire_build_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '*/7 * * * *' Settings.cron_jobs['expire_build_artifacts_worker']['cron'] ||= '*/7 * * * *'
Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker' Settings.cron_jobs['expire_build_artifacts_worker']['job_class'] = 'ExpireBuildArtifactsWorker'
# Cron entry for Ci::PipelineArtifacts::ExpireArtifactsWorker; the schedule
# defaults to '*/23 * * * *' unless a cron value is already configured.
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker']['cron'] ||= '*/23 * * * *'
Settings.cron_jobs['ci_pipelines_expire_artifacts_worker']['job_class'] = 'Ci::PipelineArtifacts::ExpireArtifactsWorker'
Settings.cron_jobs['ci_schedule_delete_objects_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['ci_schedule_delete_objects_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['ci_schedule_delete_objects_worker']['cron'] ||= '*/16 * * * *' Settings.cron_jobs['ci_schedule_delete_objects_worker']['cron'] ||= '*/16 * * * *'
Settings.cron_jobs['ci_schedule_delete_objects_worker']['job_class'] = 'Ci::ScheduleDeleteObjectsCronWorker' Settings.cron_jobs['ci_schedule_delete_objects_worker']['job_class'] = 'Ci::ScheduleDeleteObjectsCronWorker'
......
...@@ -210,6 +210,7 @@ configuration option in `gitlab.yml`. These metrics are served from the ...@@ -210,6 +210,7 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` | | `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` |
| `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` | | `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` |
| `destroyed_job_artifacts_count_total` | Counter | 13.6 | Number of destroyed expired job artifacts | | | `destroyed_job_artifacts_count_total` | Counter | 13.6 | Number of destroyed expired job artifacts | |
| `destroyed_pipeline_artifacts_count_total` | Counter | 13.8 | Number of destroyed expired pipeline artifacts | |
## Database load balancing metrics **(PREMIUM ONLY)** ## Database load balancing metrics **(PREMIUM ONLY)**
......
...@@ -220,11 +220,23 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared ...@@ -220,11 +220,23 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
before do before do
[pipeline_artifact_1, pipeline_artifact_2].each { |pipeline_artifact| pipeline_artifact.pipeline.unlocked! } [pipeline_artifact_1, pipeline_artifact_2].each { |pipeline_artifact| pipeline_artifact.pipeline.unlocked! }
stub_feature_flags(ci_split_pipeline_artifacts_removal: false)
end end
it 'destroys pipeline artifacts' do it 'destroys pipeline artifacts' do
expect { subject }.to change { Ci::PipelineArtifact.count }.by(-2) expect { subject }.to change { Ci::PipelineArtifact.count }.by(-2)
end end
context 'with ci_split_pipeline_artifacts_removal enabled' do
before do
stub_feature_flags(ci_split_pipeline_artifacts_removal: true)
end
it 'does not destroy pipeline artifacts' do
expect { subject }.not_to change { Ci::PipelineArtifact.count }
end
end
end end
context 'when artifacts are not expired' do context 'when artifacts are not expired' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::PipelineArtifacts::DestroyExpiredArtifactsService do
  let(:service) { described_class.new }

  # `#` marks an instance method: execute is called on described_class.new.
  describe '#execute' do
    subject { service.execute }

    context 'when timeout happens' do
      before do
        stub_const('Ci::PipelineArtifacts::DestroyExpiredArtifactsService::LOOP_TIMEOUT', 0.1.seconds)
        # The stubbed batch always asks for another iteration and never
        # touches the removal count, so the reported total stays at 0.
        allow(service).to receive(:destroy_artifacts_batch) { true }
      end

      it 'returns 0 and does not continue destroying' do
        is_expected.to eq(0)
      end
    end

    context 'when there are no artifacts' do
      it 'does not raise error' do
        expect { subject }.not_to raise_error
      end
    end

    context 'when the loop limit is reached' do
      before do
        # One iteration of one record: only one of the two expired artifacts
        # can be removed.
        stub_const('::Ci::PipelineArtifacts::DestroyExpiredArtifactsService::LOOP_LIMIT', 1)
        stub_const('::Ci::PipelineArtifacts::DestroyExpiredArtifactsService::BATCH_SIZE', 1)

        create_list(:ci_pipeline_artifact, 2, expire_at: 1.week.ago)
      end

      it 'destroys one artifact' do
        expect { subject }.to change { Ci::PipelineArtifact.count }.by(-1)
      end

      it 'reports the number of destroyed artifacts' do
        is_expected.to eq(1)
      end
    end

    context 'when there are artifacts more than batch sizes' do
      before do
        # Batch size of 1 with two expired artifacts forces several batches.
        stub_const('Ci::PipelineArtifacts::DestroyExpiredArtifactsService::BATCH_SIZE', 1)

        create_list(:ci_pipeline_artifact, 2, expire_at: 1.week.ago)
      end

      it 'destroys all expired artifacts' do
        expect { subject }.to change { Ci::PipelineArtifact.count }.by(-2)
      end

      it 'reports the number of destroyed artifacts' do
        is_expected.to eq(2)
      end
    end

    context 'when artifacts are not expired' do
      before do
        create(:ci_pipeline_artifact, expire_at: 2.days.from_now)
      end

      it 'does not destroy pipeline artifacts' do
        expect { subject }.not_to change { Ci::PipelineArtifact.count }
      end

      it 'reports the number of destroyed artifacts' do
        is_expected.to eq(0)
      end
    end
  end

  # Private instance method, hence `#` and the call through `send`.
  describe '#destroy_artifacts_batch' do
    it 'returns a falsy value without artifacts' do
      expect(service.send(:destroy_artifacts_batch)).to be_falsy
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::PipelineArtifacts::ExpireArtifactsWorker do
  let(:worker) { described_class.new }

  describe '#perform' do
    # A single artifact that expired a week ago, shared by the examples below.
    let_it_be(:pipeline_artifact) { create(:ci_pipeline_artifact, expire_at: 1.week.ago) }

    it 'executes a service' do
      expect_next_instance_of(::Ci::PipelineArtifacts::DestroyExpiredArtifactsService) do |destroy_service|
        expect(destroy_service).to receive(:execute)
      end

      worker.perform
    end

    include_examples 'an idempotent worker' do
      subject { perform_multiple(worker: worker) }

      it 'removes the artifact only once' do
        # Across the repeated runs, a destroyed count of 1 and a destroyed
        # count of 0 are each expected to be logged.
        expect(worker).to receive(:log_extra_metadata_on_done).with(:destroyed_pipeline_artifacts_count, 1)
        expect(worker).to receive(:log_extra_metadata_on_done).with(:destroyed_pipeline_artifacts_count, 0)

        subject

        expect { pipeline_artifact.reload }.to raise_error(ActiveRecord::RecordNotFound)
      end
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment