Commit dc669d91 authored by Marius Bobin

Use slow iteration to remove expired artifacts

Use batching of slow iterations on expire_at column
parent c38c150c
...@@ -9,6 +9,7 @@ module Ci ...@@ -9,6 +9,7 @@ module Ci
include Sortable include Sortable
include Artifactable include Artifactable
include FileStoreMounter include FileStoreMounter
include EachBatch
extend Gitlab::Ci::Model extend Gitlab::Ci::Model
TEST_REPORT_FILE_TYPES = %w[junit].freeze TEST_REPORT_FILE_TYPES = %w[junit].freeze
...@@ -176,7 +177,8 @@ module Ci ...@@ -176,7 +177,8 @@ module Ci
end end
scope :downloadable, -> { where(file_type: DOWNLOADABLE_TYPES) } scope :downloadable, -> { where(file_type: DOWNLOADABLE_TYPES) }
scope :unlocked, -> { joins(job: :pipeline).merge(::Ci::Pipeline.unlocked).order(expire_at: :desc) } scope :unlocked, -> { joins(job: :pipeline).merge(::Ci::Pipeline.unlocked) }
scope :order_expired_desc, -> { order(expire_at: :desc) }
scope :with_destroy_preloads, -> { includes(project: [:route, :statistics]) } scope :with_destroy_preloads, -> { includes(project: [:route, :statistics]) }
scope :scoped_project, -> { where('ci_job_artifacts.project_id = projects.id') } scope :scoped_project, -> { where('ci_job_artifacts.project_id = projects.id') }
......
...@@ -18,7 +18,8 @@ module Ci ...@@ -18,7 +18,8 @@ module Ci
gzip: 3 gzip: 3
}, _suffix: true }, _suffix: true
scope :expired, -> (limit) { where('expire_at < ?', Time.current).limit(limit) } scope :expired_before, -> (timestamp) { where(arel_table[:expire_at].lt(timestamp)) }
scope :expired, -> (limit) { expired_before(Time.current).limit(limit) }
end end
def each_blob(&blk) def each_blob(&blk)
......
...@@ -47,7 +47,7 @@ module EachBatch ...@@ -47,7 +47,7 @@ module EachBatch
# order_hint does not affect the search results. For example, # order_hint does not affect the search results. For example,
# `ORDER BY id ASC, updated_at ASC` means the same thing as `ORDER # `ORDER BY id ASC, updated_at ASC` means the same thing as `ORDER
# BY id ASC`. # BY id ASC`.
def each_batch(of: 1000, column: primary_key, order_hint: nil) def each_batch(of: 1000, column: primary_key, order: :asc, order_hint: nil)
unless column unless column
raise ArgumentError, raise ArgumentError,
'the column: argument must be set to a column name to use for ordering rows' 'the column: argument must be set to a column name to use for ordering rows'
...@@ -55,7 +55,7 @@ module EachBatch ...@@ -55,7 +55,7 @@ module EachBatch
start = except(:select) start = except(:select)
.select(column) .select(column)
.reorder(column => :asc) .reorder(column => order)
start = start.order(order_hint) if order_hint start = start.order(order_hint) if order_hint
start = start.take start = start.take
...@@ -66,10 +66,12 @@ module EachBatch ...@@ -66,10 +66,12 @@ module EachBatch
arel_table = self.arel_table arel_table = self.arel_table
1.step do |index| 1.step do |index|
start_cond = arel_table[column].gteq(start_id)
start_cond = arel_table[column].lteq(start_id) if order == :desc
stop = except(:select) stop = except(:select)
.select(column) .select(column)
.where(arel_table[column].gteq(start_id)) .where(start_cond)
.reorder(column => :asc) .reorder(column => order)
stop = stop.order(order_hint) if order_hint stop = stop.order(order_hint) if order_hint
stop = stop stop = stop
...@@ -77,12 +79,14 @@ module EachBatch ...@@ -77,12 +79,14 @@ module EachBatch
.limit(1) .limit(1)
.take .take
relation = where(arel_table[column].gteq(start_id)) relation = where(start_cond)
if stop if stop
stop_id = stop[column] stop_id = stop[column]
start_id = stop_id start_id = stop_id
relation = relation.where(arel_table[column].lt(stop_id)) stop_cond = arel_table[column].lt(stop_id)
stop_cond = arel_table[column].gt(stop_id) if order == :desc
relation = relation.where(stop_cond)
end end
# Any ORDER BYs are useless for this relation and can lead to less # Any ORDER BYs are useless for this relation and can lead to less
......
...@@ -20,14 +20,30 @@ module Ci ...@@ -20,14 +20,30 @@ module Ci
# which is scheduled every 7 minutes. # which is scheduled every 7 minutes.
def execute def execute
in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) do if ::Feature.enabled?(:ci_slow_artifacts_removal)
destroy_artifacts_batch destroy_job_and_pipeline_artifacts
else
loop_until(timeout: LOOP_TIMEOUT, limit: LOOP_LIMIT) do
destroy_artifacts_batch
end
end end
end end
end end
private private
# Removes expired job artifacts through the slow iteration first, then spends
# whatever is left of LOOP_TIMEOUT destroying pipeline artifacts in batches.
#
# Returns false when the job-artifact pass already used up the whole time
# budget; otherwise returns the result of `loop_until`.
def destroy_job_and_pipeline_artifacts
  started_at = Time.current
  destroy_job_artifacts_with_slow_iteration(started_at)

  # Time budget still available for the pipeline artifacts pass.
  remaining = LOOP_TIMEOUT - (Time.current - started_at)
  return false if remaining < 0

  loop_until(timeout: remaining, limit: LOOP_LIMIT) { destroy_pipeline_artifacts_batch }
end
def destroy_artifacts_batch def destroy_artifacts_batch
destroy_job_artifacts_batch || destroy_pipeline_artifacts_batch destroy_job_artifacts_batch || destroy_pipeline_artifacts_batch
end end
...@@ -36,6 +52,7 @@ module Ci ...@@ -36,6 +52,7 @@ module Ci
artifacts = Ci::JobArtifact artifacts = Ci::JobArtifact
.expired(BATCH_SIZE) .expired(BATCH_SIZE)
.unlocked .unlocked
.order_expired_desc
.with_destroy_preloads .with_destroy_preloads
.to_a .to_a
...@@ -45,6 +62,16 @@ module Ci ...@@ -45,6 +62,16 @@ module Ci
true true
end end
# Walks job artifacts that expired before `start_at` in batches of BATCH_SIZE,
# iterating on the `expire_at` column in descending order, and destroys the
# unlocked artifacts found in each batch.
#
# Iteration stops early once `loop_timeout?(start_at)` is true or the batch
# index reaches LOOP_LIMIT.
def destroy_job_artifacts_with_slow_iteration(start_at)
  expired_scope = Ci::JobArtifact.expired_before(start_at)

  expired_scope.each_batch(of: BATCH_SIZE, column: :expire_at, order: :desc) do |batch, batch_index|
    # The batch is only narrowed to unlocked artifacts here, so a batch may
    # legitimately yield nothing to destroy.
    records = batch.unlocked.with_destroy_preloads.to_a
    parallel_destroy_batch(records) if records.any?

    break if loop_timeout?(start_at) || batch_index >= LOOP_LIMIT
  end
end
# TODO: Make sure this can also be parallelized # TODO: Make sure this can also be parallelized
# https://gitlab.com/gitlab-org/gitlab/-/issues/270973 # https://gitlab.com/gitlab-org/gitlab/-/issues/270973
def destroy_pipeline_artifacts_batch def destroy_pipeline_artifacts_batch
...@@ -88,6 +115,10 @@ module Ci ...@@ -88,6 +115,10 @@ module Ci
::Gitlab::Metrics.counter(name, comment) ::Gitlab::Metrics.counter(name, comment)
end end
end end
# True once the current time is past `start_at` plus LOOP_TIMEOUT.
def loop_timeout?(start_at)
  deadline = start_at + LOOP_TIMEOUT
  Time.current > deadline
end
end end
end end
......
---
title: Fix database timeout errors when removing expired job artifacts
merge_request: 47496
author:
type: fixed
---
name: ci_slow_artifacts_removal
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/47496
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/281688
milestone: '13.8'
type: development
group: 'group::continuous integration'
default_enabled: false
...@@ -219,6 +219,39 @@ RSpec.describe Ci::JobArtifact do ...@@ -219,6 +219,39 @@ RSpec.describe Ci::JobArtifact do
end end
end end
# Exercises the `.unlocked` scope against a single artifact whose pipeline is
# switched between the locked and unlocked states.
describe '.unlocked' do
let_it_be(:job_artifact) { create(:ci_job_artifact) }
context 'with locked pipelines' do
before do
job_artifact.job.pipeline.artifacts_locked!
end
# The only artifact belongs to a locked pipeline, so the scope is empty.
it 'returns an empty array' do
expect(described_class.unlocked).to be_empty
end
end
context 'with unlocked pipelines' do
before do
job_artifact.job.pipeline.unlocked!
end
it 'returns the artifact' do
expect(described_class.unlocked).to eq([job_artifact])
end
end
end
# Checks that `.order_expired_desc` sorts by `expire_at`, latest first.
describe '.order_expired_desc' do
# Created first but expires earlier (2 days ago), so it must come last.
let_it_be(:first_artifact) { create(:ci_job_artifact, expire_at: 2.days.ago) }
let_it_be(:second_artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) }
it 'returns ordered artifacts' do
expect(described_class.order_expired_desc).to eq([second_artifact, first_artifact])
end
end
describe 'callbacks' do describe 'callbacks' do
describe '#schedule_background_upload' do describe '#schedule_background_upload' do
subject { create(:ci_job_artifact, :archive) } subject { create(:ci_job_artifact, :archive) }
......
...@@ -54,4 +54,23 @@ RSpec.describe Ci::Artifactable do ...@@ -54,4 +54,23 @@ RSpec.describe Ci::Artifactable do
end end
end end
end end
# Covers the `expired_before` and `expired` scopes using Ci::JobArtifact
# records with past and future `expire_at` values.
context 'ActiveRecord scopes' do
let_it_be(:recently_expired_artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) }
# NOTE(review): despite its name, this artifact expired EARLIER (2 days ago)
# than `recently_expired_artifact`; "later" here does not mean a later expire_at.
let_it_be(:later_expired_artifact) { create(:ci_job_artifact, expire_at: 2.days.ago) }
let_it_be(:not_expired_artifact) { create(:ci_job_artifact, expire_at: 1.day.from_now) }
describe '.expired_before' do
# Both artifacts with a past `expire_at` qualify; the future one does not.
it 'returns expired artifacts' do
expect(Ci::JobArtifact.expired_before(1.hour.ago))
.to match_array([recently_expired_artifact, later_expired_artifact])
end
end
describe '.expired' do
# NOTE(review): the expectation presumably relies on `order_id_asc` placing
# the first-created record first under the limit of 1 - confirm.
it 'returns a limited number of expired artifacts' do
expect(Ci::JobArtifact.expired(1).order_id_asc).to eq([recently_expired_artifact])
end
end
end
end end
...@@ -56,5 +56,21 @@ RSpec.describe EachBatch do ...@@ -56,5 +56,21 @@ RSpec.describe EachBatch do
it_behaves_like 'each_batch handling', {} it_behaves_like 'each_batch handling', {}
it_behaves_like 'each_batch handling', { order_hint: :updated_at } it_behaves_like 'each_batch handling', { order_hint: :updated_at }
# Batches of one record are collected in the order they are yielded, so the
# collected ids reflect the iteration order of `each_batch`.
it 'orders ascending by default' do
ids = []
model.each_batch(of: 1) { |rel| ids.concat(rel.ids) }
expect(ids).to eq(ids.sort)
end
# Same collection strategy, but iterating with `order: :desc`.
it 'accepts descending order' do
ids = []
model.each_batch(of: 1, order: :desc) { |rel| ids.concat(rel.ids) }
expect(ids).to eq(ids.sort.reverse)
end
end end
end end
...@@ -30,14 +30,16 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared ...@@ -30,14 +30,16 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
it 'performs the smallest number of queries for job_artifacts' do it 'performs the smallest number of queries for job_artifacts' do
log = ActiveRecord::QueryRecorder.new { subject } log = ActiveRecord::QueryRecorder.new { subject }
# SELECT expired ci_job_artifacts # SELECT expired ci_job_artifacts - 3 queries from each_batch
# PRELOAD projects, routes, project_statistics # PRELOAD projects, routes, project_statistics
# BEGIN # BEGIN
# INSERT into ci_deleted_objects # INSERT into ci_deleted_objects
# DELETE loaded ci_job_artifacts # DELETE loaded ci_job_artifacts
# DELETE security_findings -- for EE # DELETE security_findings -- for EE
# COMMIT # COMMIT
expect(log.count).to be_within(1).of(8) # SELECT next expired ci_job_artifacts
expect(log.count).to be_within(1).of(11)
end end
end end
...@@ -164,7 +166,7 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared ...@@ -164,7 +166,7 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
context 'when timeout happens' do context 'when timeout happens' do
before do before do
stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_TIMEOUT', 1.second) stub_const('Ci::DestroyExpiredJobArtifactsService::LOOP_TIMEOUT', 1.second)
allow_any_instance_of(described_class).to receive(:destroy_artifacts_batch) { true } allow_any_instance_of(described_class).to receive(:destroy_pipeline_artifacts_batch) { true }
end end
it 'returns false and does not continue destroying' do it 'returns false and does not continue destroying' do
...@@ -182,10 +184,6 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared ...@@ -182,10 +184,6 @@ RSpec.describe Ci::DestroyExpiredJobArtifactsService, :clean_gitlab_redis_shared
let!(:second_artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) } let!(:second_artifact) { create(:ci_job_artifact, expire_at: 1.day.ago) }
it 'raises an error and does not continue destroying' do
is_expected.to be_falsy
end
it 'destroys one artifact' do it 'destroys one artifact' do
expect { subject }.to change { Ci::JobArtifact.count }.by(-1) expect { subject }.to change { Ci::JobArtifact.count }.by(-1)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment