Commit 731c40a0 authored by Allison Browne's avatar Allison Browne Committed by Fabio Pitino

Refactor: Extract JobArtifactsDestroyBatchService

Extract parallel batch destruction of artifacts for
reuse in project destruction
parent d61780e9
...@@ -17,9 +17,9 @@ module Ci ...@@ -17,9 +17,9 @@ module Ci
.lock('FOR UPDATE SKIP LOCKED') .lock('FOR UPDATE SKIP LOCKED')
end end
def self.bulk_import(artifacts) def self.bulk_import(artifacts, pick_up_at = nil)
attributes = artifacts.each.with_object([]) do |artifact, accumulator| attributes = artifacts.each.with_object([]) do |artifact, accumulator|
record = artifact.to_deleted_object_attrs record = artifact.to_deleted_object_attrs(pick_up_at)
accumulator << record if record[:store_dir] && record[:file] accumulator << record if record[:store_dir] && record[:file]
end end
......
...@@ -307,12 +307,12 @@ module Ci ...@@ -307,12 +307,12 @@ module Ci
max_size&.megabytes.to_i max_size&.megabytes.to_i
end end
def to_deleted_object_attrs def to_deleted_object_attrs(pick_up_at = nil)
{ {
file_store: file_store, file_store: file_store,
store_dir: file.store_dir.to_s, store_dir: file.store_dir.to_s,
file: file_identifier, file: file_identifier,
pick_up_at: expire_at || Time.current pick_up_at: pick_up_at || expire_at || Time.current
} }
end end
......
...@@ -4,7 +4,6 @@ module Ci ...@@ -4,7 +4,6 @@ module Ci
class DestroyExpiredJobArtifactsService class DestroyExpiredJobArtifactsService
include ::Gitlab::ExclusiveLeaseHelpers include ::Gitlab::ExclusiveLeaseHelpers
include ::Gitlab::LoopHelpers include ::Gitlab::LoopHelpers
include ::Gitlab::Utils::StrongMemoize
BATCH_SIZE = 100 BATCH_SIZE = 100
LOOP_TIMEOUT = 5.minutes LOOP_TIMEOUT = 5.minutes
...@@ -34,50 +33,20 @@ module Ci ...@@ -34,50 +33,20 @@ module Ci
def destroy_job_artifacts_with_slow_iteration(start_at) def destroy_job_artifacts_with_slow_iteration(start_at)
Ci::JobArtifact.expired_before(start_at).each_batch(of: BATCH_SIZE, column: :expire_at, order: :desc) do |relation, index| Ci::JobArtifact.expired_before(start_at).each_batch(of: BATCH_SIZE, column: :expire_at, order: :desc) do |relation, index|
artifacts = relation.unlocked.with_destroy_preloads.to_a # For performance reasons, join with ci_pipelines after the batch is queried.
# See: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/47496
artifacts = relation.unlocked
service_response = destroy_batch_async(artifacts)
@removed_artifacts_count += service_response[:destroyed_artifacts_count]
parallel_destroy_batch(artifacts) if artifacts.any?
break if loop_timeout?(start_at) break if loop_timeout?(start_at)
break if index >= LOOP_LIMIT break if index >= LOOP_LIMIT
end end
end end
def parallel_destroy_batch(job_artifacts) def destroy_batch_async(artifacts)
Ci::DeletedObject.transaction do Ci::JobArtifactsDestroyBatchService.new(artifacts).execute
Ci::DeletedObject.bulk_import(job_artifacts)
Ci::JobArtifact.id_in(job_artifacts.map(&:id)).delete_all
destroy_related_records_for(job_artifacts)
end
# This is executed outside of the transaction because it depends on Redis
update_project_statistics_for(job_artifacts)
increment_monitoring_statistics(job_artifacts.size)
end
# This method is implemented in EE and it must do only database work
def destroy_related_records_for(job_artifacts); end
def update_project_statistics_for(job_artifacts)
artifacts_by_project = job_artifacts.group_by(&:project)
artifacts_by_project.each do |project, artifacts|
delta = -artifacts.sum { |artifact| artifact.size.to_i }
ProjectStatistics.increment_statistic(
project, Ci::JobArtifact.project_statistics_name, delta)
end
end
def increment_monitoring_statistics(size)
destroyed_artifacts_counter.increment({}, size)
@removed_artifacts_count += size
end
def destroyed_artifacts_counter
strong_memoize(:destroyed_artifacts_counter) do
name = :destroyed_job_artifacts_count_total
comment = 'Counter of destroyed expired job artifacts'
::Gitlab::Metrics.counter(name, comment)
end
end end
def loop_timeout?(start_at) def loop_timeout?(start_at)
...@@ -85,5 +54,3 @@ module Ci ...@@ -85,5 +54,3 @@ module Ci
end end
end end
end end
Ci::DestroyExpiredJobArtifactsService.prepend_if_ee('EE::Ci::DestroyExpiredJobArtifactsService')
# frozen_string_literal: true
module Ci
  # Destroys a batch of job artifacts in a single transaction and schedules
  # their files for asynchronous removal from Object Storage.
  #
  # Danger: Private - Should only be called in Ci Services that pass a batch
  # of job artifacts. Not for use outside of the ci namespace.
  class JobArtifactsDestroyBatchService
    include BaseServiceUtility
    include ::Gitlab::Utils::StrongMemoize

    # Params:
    # +job_artifacts+:: A relation of job artifacts to destroy (fewer than MAX_JOB_ARTIFACT_BATCH_SIZE)
    # +pick_up_at+:: When the files become eligible for pick up for deletion
    def initialize(job_artifacts, pick_up_at: nil)
      # Materialize the relation up front so every later traversal
      # (deletion, statistics, counting) sees the same set of records.
      @job_artifacts = job_artifacts.with_destroy_preloads.to_a
      @pick_up_at = pick_up_at
    end

    # Adds the batch of job artifacts to the `ci_deleted_objects` table for
    # asynchronous destruction of the objects in Object Storage via the
    # `Ci::DeleteObjectsService`, then deletes the related `ci_job_artifacts`
    # records.
    #
    # Returns:
    # +Hash+:: A hash with status and destroyed_artifacts_count keys
    # rubocop: disable CodeReuse/ActiveRecord
    def execute
      return success(destroyed_artifacts_count: artifacts_count) if @job_artifacts.empty?

      Ci::DeletedObject.transaction do
        Ci::DeletedObject.bulk_import(@job_artifacts, @pick_up_at)
        Ci::JobArtifact.id_in(@job_artifacts.map(&:id)).delete_all
        destroy_related_records(@job_artifacts)
      end

      # Executed outside of the transaction because these depend on Redis.
      update_project_statistics
      increment_monitoring_statistics(artifacts_count)

      success(destroyed_artifacts_count: artifacts_count)
    end
    # rubocop: enable CodeReuse/ActiveRecord

    private

    # This method is implemented in EE and it must do only database work
    def destroy_related_records(artifacts); end

    # Applies the (negative) size delta of the destroyed artifacts to each
    # affected project's artifact-size statistic.
    def update_project_statistics
      @job_artifacts.group_by(&:project).each do |project, project_artifacts|
        size_delta = -project_artifacts.sum { |a| a.size.to_i }

        ProjectStatistics.increment_statistic(
          project, Ci::JobArtifact.project_statistics_name, size_delta)
      end
    end

    def increment_monitoring_statistics(count)
      metrics.increment_destroyed_artifacts(count)
    end

    def metrics
      @metrics ||= ::Gitlab::Ci::Artifacts::Metrics.new
    end

    def artifacts_count
      strong_memoize(:artifacts_count) { @job_artifacts.count }
    end
  end
end

Ci::JobArtifactsDestroyBatchService.prepend_if_ee('EE::Ci::JobArtifactsDestroyBatchService')
...@@ -2,17 +2,17 @@ ...@@ -2,17 +2,17 @@
module EE module EE
module Ci module Ci
module DestroyExpiredJobArtifactsService module JobArtifactsDestroyBatchService
extend ::Gitlab::Utils::Override extend ::Gitlab::Utils::Override
private private
override :destroy_related_records_for override :destroy_related_records
def destroy_related_records_for(artifacts) def destroy_related_records(artifacts)
destroy_security_findings_for(artifacts) destroy_security_findings(artifacts)
end end
def destroy_security_findings_for(artifacts) def destroy_security_findings(artifacts)
job_ids = artifacts.map(&:job_id) job_ids = artifacts.map(&:job_id)
::Security::Finding.by_build_ids(job_ids).delete_all ::Security::Finding.by_build_ids(job_ids).delete_all
......
# frozen_string_literal: true
require 'spec_helper'
# NOTE(review): presumably the EE spec for the extracted service — destroying
# a batch of artifacts should cascade-delete the associated Security::Finding
# records (see EE::Ci::JobArtifactsDestroyBatchService) — confirm file location.
RSpec.describe Ci::JobArtifactsDestroyBatchService do
  describe '.execute' do
    subject { service.execute }

    let(:service) { described_class.new(Ci::JobArtifact.all, pick_up_at: Time.current) }

    # One artifact whose build owns a security scan with a single finding.
    let_it_be(:artifact) { create(:ci_job_artifact) }
    let_it_be(:security_scan) { create(:security_scan, build: artifact.job) }
    let_it_be(:security_finding) { create(:security_finding, scan: security_scan) }

    it 'destroys all expired artifacts' do
      expect { subject }.to change { Ci::JobArtifact.count }.by(-1)
        .and change { Security::Finding.count }.from(1).to(0)
    end
  end
end
# frozen_string_literal: true
module Gitlab
  module Ci
    module Artifacts
      # Prometheus instrumentation for CI artifact lifecycle events.
      class Metrics
        include Gitlab::Utils::StrongMemoize

        # Increments the destroyed-artifacts counter by +size+ (coerced to
        # Integer via to_i).
        def increment_destroyed_artifacts(size)
          destroyed_artifacts_counter.increment({}, size.to_i)
        end

        private

        # Memoizes the counter so that every increment on this instance
        # reuses a single metric object.
        def destroyed_artifacts_counter
          strong_memoize(:destroyed_artifacts_counter) do
            ::Gitlab::Metrics.counter(
              :destroyed_job_artifacts_count_total,
              'Counter of destroyed expired job artifacts'
            )
          end
        end
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Spec for Gitlab::Ci::Artifacts::Metrics: repeated increments must
# accumulate into one Prometheus counter series.
RSpec.describe Gitlab::Ci::Artifacts::Metrics, :prometheus do
  let(:metrics) { described_class.new }

  describe '#increment_destroyed_artifacts' do
    context 'when incrementing by more than one' do
      let(:counter) { metrics.send(:destroyed_artifacts_counter) }

      it 'increments a single counter' do
        # Increment through the same instance `counter` was read from.
        # The original used the implicit `subject` — a *different*
        # described_class instance — which only worked because the
        # Prometheus registry is process-global.
        metrics.increment_destroyed_artifacts(10)
        metrics.increment_destroyed_artifacts(20)
        metrics.increment_destroyed_artifacts(30)

        expect(counter.get).to eq 60
        expect(counter.values.count).to eq 1
      end
    end
  end
end
...@@ -77,14 +77,6 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared ...@@ -77,14 +77,6 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
it 'does not remove the files' do it 'does not remove the files' do
expect { subject }.not_to change { artifact.file.exists? } expect { subject }.not_to change { artifact.file.exists? }
end end
it 'reports metrics for destroyed artifacts' do
counter = service.send(:destroyed_artifacts_counter)
expect(counter).to receive(:increment).with({}, 1).and_call_original
subject
end
end end
end end
...@@ -244,5 +236,17 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared ...@@ -244,5 +236,17 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
expect { subject }.to change { Ci::JobArtifact.count }.by(-1) expect { subject }.to change { Ci::JobArtifact.count }.by(-1)
end end
end end
context 'when all artifacts are locked' do
before do
pipeline = create(:ci_pipeline, locked: :artifacts_locked)
job = create(:ci_build, pipeline: pipeline)
artifact.update!(job: job)
end
it 'destroys no artifacts' do
expect { subject }.to change { Ci::JobArtifact.count }.by(0)
end
end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
# CE spec for Ci::JobArtifactsDestroyBatchService: covers deleted-object
# creation, project statistics, metrics, transactional rollback on failure,
# and the empty-batch fast path.
RSpec.describe Ci::JobArtifactsDestroyBatchService do
  include ExclusiveLeaseHelpers

  let(:artifacts) { Ci::JobArtifact.all }
  let(:service) { described_class.new(artifacts, pick_up_at: Time.current) }

  describe '.execute' do
    subject(:execute) { service.execute }

    # refind: true reloads the record per example so the `before` mutations
    # below do not leak between examples.
    let_it_be(:artifact, refind: true) do
      create(:ci_job_artifact)
    end

    context 'when the artifact has a file attached to it' do
      before do
        artifact.file = fixture_file_upload(Rails.root.join('spec/fixtures/ci_build_artifacts.zip'), 'application/zip')
        artifact.save!
      end

      it 'creates a deleted object' do
        expect { subject }.to change { Ci::DeletedObject.count }.by(1)
      end

      it 'resets project statistics' do
        expect(ProjectStatistics).to receive(:increment_statistic).once
          .with(artifact.project, :build_artifacts_size, -artifact.file.size)
          .and_call_original

        execute
      end

      # File removal is deferred to Ci::DeleteObjectsService, so the file
      # itself must still exist immediately after execute.
      it 'does not remove the files' do
        expect { execute }.not_to change { artifact.file.exists? }
      end

      it 'reports metrics for destroyed artifacts' do
        expect_next_instance_of(Gitlab::Ci::Artifacts::Metrics) do |metrics|
          expect(metrics).to receive(:increment_destroyed_artifacts).with(1).and_call_original
        end

        execute
      end
    end

    context 'when failed to destroy artifact' do
      context 'when the import fails' do
        before do
          expect(Ci::DeletedObject)
            .to receive(:bulk_import)
            .once
            .and_raise(ActiveRecord::RecordNotDestroyed)
        end

        # Deletion happens inside one transaction: a failed bulk_import must
        # roll back the ci_job_artifacts delete as well.
        it 'raises an exception and stop destroying' do
          expect { execute }.to raise_error(ActiveRecord::RecordNotDestroyed)
            .and not_change { Ci::JobArtifact.count }.from(1)
        end
      end
    end

    context 'when there are no artifacts' do
      let(:artifacts) { Ci::JobArtifact.none }

      before do
        artifact.destroy!
      end

      it 'does not raise error' do
        expect { execute }.not_to raise_error
      end

      it 'reports the number of destroyed artifacts' do
        is_expected.to eq(destroyed_artifacts_count: 0, status: :success)
      end
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment