Commit 0334ca5d authored by David Fernandez's avatar David Fernandez Committed by Bob Van Landuyt

Implement the loopless mode for the cleanup policy worker

parent 67665130
......@@ -7,6 +7,7 @@ class ContainerRepository < ApplicationRecord
include Sortable
WAITING_CLEANUP_STATUSES = %i[cleanup_scheduled cleanup_unfinished].freeze
REQUIRING_CLEANUP_STATUSES = %i[cleanup_unscheduled cleanup_scheduled].freeze
belongs_to :project
......@@ -31,6 +32,7 @@ class ContainerRepository < ApplicationRecord
scope :for_project_id, ->(project_id) { where(project_id: project_id) }
scope :search_by_name, ->(query) { fuzzy_search(query, [:name], use_minimum_char_limit: false) }
scope :waiting_for_cleanup, -> { where(expiration_policy_cleanup_status: WAITING_CLEANUP_STATUSES) }
scope :expiration_policy_started_at_nil_or_before, ->(timestamp) { where('expiration_policy_started_at < ? OR expiration_policy_started_at IS NULL', timestamp) }
def self.exists_by_path?(path)
where(
......@@ -39,6 +41,23 @@ class ContainerRepository < ApplicationRecord
).exists?
end
def self.with_enabled_policy
joins("INNER JOIN container_expiration_policies ON container_repositories.project_id = container_expiration_policies.project_id")
.where(container_expiration_policies: { enabled: true })
end
def self.requiring_cleanup
where(
container_repositories: { expiration_policy_cleanup_status: REQUIRING_CLEANUP_STATUSES },
project_id: ::ContainerExpirationPolicy.runnable_schedules
.select(:project_id)
)
end
def self.with_unfinished_cleanup
with_enabled_policy.cleanup_unfinished
end
# rubocop: disable CodeReuse/ServiceClass
def registry
@registry ||= begin
......
......@@ -13,7 +13,14 @@ module ContainerExpirationPolicies
def execute
return ServiceResponse.error(message: 'no repository') unless repository
unless policy.valid?
disable_policy!
return ServiceResponse.error(message: 'invalid policy')
end
repository.start_expiration_policy!
schedule_next_run_if_needed
begin
service_result = Projects::ContainerRepository::CleanupTagsService
......@@ -28,7 +35,6 @@ module ContainerExpirationPolicies
if service_result[:status] == :success
repository.update!(
expiration_policy_cleanup_status: :cleanup_unscheduled,
expiration_policy_started_at: nil,
expiration_policy_completed_at: Time.zone.now
)
......@@ -42,6 +48,27 @@ module ContainerExpirationPolicies
private
def schedule_next_run_if_needed
return unless Feature.enabled?(:container_registry_expiration_policies_loopless)
return if policy.next_run_at.future?
repos_before_next_run = ::ContainerRepository.for_project_id(policy.project_id)
.expiration_policy_started_at_nil_or_before(policy.next_run_at)
return if repos_before_next_run.exists?
policy.schedule_next_run!
end
def disable_policy!
policy.disable!
repository.cleanup_unscheduled!
Gitlab::ErrorTracking.log_exception(
::ContainerExpirationPolicyWorker::InvalidPolicyError.new,
container_expiration_policy_id: policy.id
)
end
def success(cleanup_status, service_result)
payload = {
cleanup_status: cleanup_status,
......
......@@ -24,16 +24,22 @@ module ContainerExpirationPolicies
cleanup_tags_service_deleted_size
].freeze
delegate :perform_work, :remaining_work_count, to: :inner_instance
def perform_work
return unless throttling_enabled?
return unless container_repository
def inner_instance
strong_memoize(:inner_instance) do
if loopless_enabled?
Loopless.new(self)
else
Looping.new(self)
end
log_extra_metadata_on_done(:container_repository_id, container_repository.id)
log_extra_metadata_on_done(:project_id, project.id)
unless allowed_to_run?
container_repository.cleanup_unscheduled!
log_extra_metadata_on_done(:cleanup_status, :skipped)
return
end
result = ContainerExpirationPolicies::CleanupService.new(container_repository)
.execute
log_on_done(result)
end
def max_running_jobs
......@@ -42,6 +48,97 @@ module ContainerExpirationPolicies
::Gitlab::CurrentSettings.container_registry_expiration_policies_worker_capacity
end
def remaining_work_count
total_count = cleanup_scheduled_count + cleanup_unfinished_count
log_info(
cleanup_scheduled_count: cleanup_scheduled_count,
cleanup_unfinished_count: cleanup_unfinished_count,
cleanup_total_count: total_count
)
total_count
end
private
def container_repository
strong_memoize(:container_repository) do
ContainerRepository.transaction do
# rubocop: disable CodeReuse/ActiveRecord
# We need a lock to prevent two workers from picking up the same row
container_repository = if loopless_enabled?
next_container_repository
else
ContainerRepository.waiting_for_cleanup
.order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
end
# rubocop: enable CodeReuse/ActiveRecord
container_repository&.tap(&:cleanup_ongoing!)
end
end
end
def next_container_repository
# rubocop: disable CodeReuse/ActiveRecord
next_one_requiring = ContainerRepository.requiring_cleanup
.order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
return next_one_requiring if next_one_requiring
ContainerRepository.with_unfinished_cleanup
.order(:expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
# rubocop: enable CodeReuse/ActiveRecord
end
def cleanup_scheduled_count
strong_memoize(:cleanup_scheduled_count) do
if loopless_enabled?
limit = max_running_jobs + 1
ContainerExpirationPolicy.with_container_repositories
.runnable_schedules
.limit(limit)
.count
else
ContainerRepository.cleanup_scheduled.count
end
end
end
def cleanup_unfinished_count
strong_memoize(:cleanup_unfinished_count) do
if loopless_enabled?
limit = max_running_jobs + 1
ContainerRepository.with_unfinished_cleanup
.limit(limit)
.count
else
ContainerRepository.cleanup_unfinished.count
end
end
end
def allowed_to_run?
return false unless policy&.enabled && policy&.next_run_at
now = Time.zone.now
if loopless_enabled?
policy.next_run_at < now || (now + max_cleanup_execution_time.seconds < policy.next_run_at)
else
now + max_cleanup_execution_time.seconds < policy.next_run_at
end
end
def throttling_enabled?
Feature.enabled?(:container_registry_expiration_policies_throttling)
end
......@@ -59,6 +156,11 @@ module ContainerExpirationPolicies
end
def log_on_done(result)
if result.error?
log_extra_metadata_on_done(:cleanup_status, :error)
log_extra_metadata_on_done(:cleanup_error_message, result.message)
end
LOG_ON_DONE_FIELDS.each do |field|
value = result.payload[field]
......@@ -76,75 +178,6 @@ module ContainerExpirationPolicies
log_extra_metadata_on_done(:running_jobs_count, running_jobs_count)
end
# rubocop: disable Scalability/IdempotentWorker
# TODO: move the logic from this class to the parent one when container_registry_expiration_policies_loopless is removed
# Tracking issue: https://gitlab.com/gitlab-org/gitlab/-/issues/325273
class Loopless
# TODO fill the logic here with the approach documented in
# https://gitlab.com/gitlab-org/gitlab/-/issues/267546#limited-worker
def initialize(parent)
@parent = parent
end
end
# rubocop: enable Scalability/IdempotentWorker
# rubocop: disable Scalability/IdempotentWorker
# TODO remove this class when `container_registry_expiration_policies_loopless` is removed
# Tracking issue: https://gitlab.com/gitlab-org/gitlab/-/issues/325273
class Looping
include Gitlab::Utils::StrongMemoize
delegate :throttling_enabled?,
:log_extra_metadata_on_done,
:log_info,
:log_on_done,
:max_cleanup_execution_time,
to: :@parent
def initialize(parent)
@parent = parent
end
def perform_work
return unless throttling_enabled?
return unless container_repository
log_extra_metadata_on_done(:container_repository_id, container_repository.id)
log_extra_metadata_on_done(:project_id, project.id)
unless allowed_to_run?(container_repository)
container_repository.cleanup_unscheduled!
log_extra_metadata_on_done(:cleanup_status, :skipped)
return
end
result = ContainerExpirationPolicies::CleanupService.new(container_repository)
.execute
log_on_done(result)
end
def remaining_work_count
cleanup_scheduled_count = ContainerRepository.cleanup_scheduled.count
cleanup_unfinished_count = ContainerRepository.cleanup_unfinished.count
total_count = cleanup_scheduled_count + cleanup_unfinished_count
log_info(
cleanup_scheduled_count: cleanup_scheduled_count,
cleanup_unfinished_count: cleanup_unfinished_count,
cleanup_total_count: total_count
)
total_count
end
private
def allowed_to_run?(container_repository)
return false unless policy&.enabled && policy&.next_run_at
Time.zone.now + max_cleanup_execution_time.seconds < policy.next_run_at
end
def policy
project.container_expiration_policy
end
......@@ -152,23 +185,5 @@ module ContainerExpirationPolicies
def project
container_repository.project
end
def container_repository
strong_memoize(:container_repository) do
ContainerRepository.transaction do
# rubocop: disable CodeReuse/ActiveRecord
# We need a lock to prevent two workers from picking up the same row
container_repository = ContainerRepository.waiting_for_cleanup
.order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
.limit(1)
.lock('FOR UPDATE SKIP LOCKED')
.first
# rubocop: enable CodeReuse/ActiveRecord
container_repository&.tap(&:cleanup_ongoing!)
end
end
end
end
# rubocop: enable Scalability/IdempotentWorker
end
end
---
title: Add indexes for cleanup policies on container_repositories and container_expiration_policies
merge_request: 58123
author:
type: added
# frozen_string_literal: true
class AddProjectIdNextRunAtIndexToContainerExpirationPolicies < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
disable_ddl_transaction!
INDEX_NAME = 'idx_container_exp_policies_on_project_id_next_run_at'
def up
add_concurrent_index :container_expiration_policies, [:project_id, :next_run_at], name: INDEX_NAME, where: 'enabled = true'
end
def down
remove_concurrent_index :container_expiration_policies, [:project_id, :next_run_at], name: INDEX_NAME
end
end
eaefc2a0f08ce312b1ae3fb100e4a818eb3013b95c38d940371a25b605b09ca1
\ No newline at end of file
......@@ -21991,6 +21991,8 @@ CREATE INDEX idx_award_emoji_on_user_emoji_name_awardable_type_awardable_id ON a
CREATE INDEX idx_ci_pipelines_artifacts_locked ON ci_pipelines USING btree (ci_ref_id, id) WHERE (locked = 1);
CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at ON container_expiration_policies USING btree (project_id, next_run_at) WHERE (enabled = true);
CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at_enabled ON container_expiration_policies USING btree (project_id, next_run_at, enabled);
CREATE INDEX idx_container_repositories_on_exp_cleanup_status_and_start_date ON container_repositories USING btree (expiration_policy_cleanup_status, expiration_policy_started_at);
......@@ -3,6 +3,8 @@
require 'spec_helper'
RSpec.describe ContainerRepository do
using RSpec::Parameterized::TableSyntax
let(:group) { create(:group, name: 'group') }
let(:project) { create(:project, path: 'test', group: group) }
......@@ -29,18 +31,6 @@ RSpec.describe ContainerRepository do
end
end
describe '.exists_by_path?' do
it 'returns true for known container repository paths' do
path = ContainerRegistry::Path.new("#{project.full_path}/#{repository.name}")
expect(described_class.exists_by_path?(path)).to be_truthy
end
it 'returns false for unknown container repository paths' do
path = ContainerRegistry::Path.new('you/dont/know/me')
expect(described_class.exists_by_path?(path)).to be_falsey
end
end
describe '#tag' do
it 'has a test tag' do
expect(repository.tag('test')).not_to be_nil
......@@ -359,6 +349,17 @@ RSpec.describe ContainerRepository do
it { is_expected.to contain_exactly(repository) }
end
describe '.expiration_policy_started_at_nil_or_before' do
let_it_be(:repository1) { create(:container_repository, expiration_policy_started_at: nil) }
let_it_be(:repository2) { create(:container_repository, expiration_policy_started_at: 1.day.ago) }
let_it_be(:repository3) { create(:container_repository, expiration_policy_started_at: 2.hours.ago) }
let_it_be(:repository4) { create(:container_repository, expiration_policy_started_at: 1.week.ago) }
subject { described_class.expiration_policy_started_at_nil_or_before(3.hours.ago) }
it { is_expected.to contain_exactly(repository1, repository2, repository4) }
end
describe '.waiting_for_cleanup' do
let_it_be(:repository_cleanup_scheduled) { create(:container_repository, :cleanup_scheduled) }
let_it_be(:repository_cleanup_unfinished) { create(:container_repository, :cleanup_unfinished) }
......@@ -368,4 +369,74 @@ RSpec.describe ContainerRepository do
it { is_expected.to contain_exactly(repository_cleanup_scheduled, repository_cleanup_unfinished) }
end
describe '.exists_by_path?' do
it 'returns true for known container repository paths' do
path = ContainerRegistry::Path.new("#{project.full_path}/#{repository.name}")
expect(described_class.exists_by_path?(path)).to be_truthy
end
it 'returns false for unknown container repository paths' do
path = ContainerRegistry::Path.new('you/dont/know/me')
expect(described_class.exists_by_path?(path)).to be_falsey
end
end
describe '.with_enabled_policy' do
let_it_be(:repository) { create(:container_repository) }
let_it_be(:repository2) { create(:container_repository) }
subject { described_class.with_enabled_policy }
before do
repository.project.container_expiration_policy.update!(enabled: true)
end
it { is_expected.to eq([repository]) }
end
context 'with repositories' do
let_it_be_with_reload(:repository) { create(:container_repository, :cleanup_unscheduled) }
let_it_be(:other_repository) { create(:container_repository, :cleanup_unscheduled) }
let(:policy) { repository.project.container_expiration_policy }
before do
ContainerExpirationPolicy.update_all(enabled: true)
end
describe '.requiring_cleanup' do
subject { described_class.requiring_cleanup }
context 'with next_run_at in the future' do
before do
policy.update_column(:next_run_at, 10.minutes.from_now)
end
it { is_expected.to eq([]) }
end
context 'with next_run_at in the past' do
before do
policy.update_column(:next_run_at, 10.minutes.ago)
end
it { is_expected.to eq([repository]) }
end
end
describe '.with_unfinished_cleanup' do
subject { described_class.with_unfinished_cleanup }
it { is_expected.to eq([]) }
context 'with an unfinished repository' do
before do
repository.cleanup_unfinished!
end
it { is_expected.to eq([repository]) }
end
end
end
end
......@@ -3,7 +3,7 @@
require 'spec_helper'
RSpec.describe ContainerExpirationPolicies::CleanupService do
let_it_be(:repository, reload: true) { create(:container_repository) }
let_it_be(:repository, reload: true) { create(:container_repository, expiration_policy_started_at: 30.minutes.ago) }
let_it_be(:project) { repository.project }
let(:service) { described_class.new(repository) }
......@@ -11,6 +11,7 @@ RSpec.describe ContainerExpirationPolicies::CleanupService do
describe '#execute' do
subject { service.execute }
shared_examples 'cleaning up a container repository' do
context 'with a successful cleanup tags service execution' do
let(:cleanup_tags_service_params) { project.container_expiration_policy.policy_params.merge('container_expiration_policy' => true) }
let(:cleanup_tags_service) { instance_double(Projects::ContainerRepository::CleanupTagsService) }
......@@ -27,8 +28,8 @@ RSpec.describe ContainerExpirationPolicies::CleanupService do
expect(response.payload).to include(cleanup_status: :finished, container_repository_id: repository.id)
expect(ContainerRepository.waiting_for_cleanup.count).to eq(0)
expect(repository.reload.cleanup_unscheduled?).to be_truthy
expect(repository.expiration_policy_started_at).to eq(nil)
expect(repository.expiration_policy_completed_at).not_to eq(nil)
expect(repository.expiration_policy_started_at).not_to eq(nil)
end
end
end
......@@ -94,9 +95,25 @@ RSpec.describe ContainerExpirationPolicies::CleanupService do
let(:service) { described_class.new(nil) }
it 'returns an error response' do
response = subject
expect(subject.success?).to eq(false)
expect(subject.message).to eq('no repository')
end
end
expect(response.success?).to eq(false)
context 'with an invalid policy' do
let(:policy) { repository.project.container_expiration_policy }
before do
policy.name_regex = nil
policy.enabled = true
repository.expiration_policy_cleanup_status = :cleanup_ongoing
end
it 'returns an error response' do
expect { subject }.to change { repository.expiration_policy_cleanup_status }.from('cleanup_ongoing').to('cleanup_unscheduled')
expect(subject.success?).to eq(false)
expect(subject.message).to eq('invalid policy')
expect(policy).not_to be_enabled
end
end
......@@ -116,4 +133,93 @@ RSpec.describe ContainerExpirationPolicies::CleanupService do
end
end
end
context 'with loopless enabled' do
let(:policy) { repository.project.container_expiration_policy }
before do
policy.update!(enabled: true)
policy.update_column(:next_run_at, 5.minutes.ago)
end
it_behaves_like 'cleaning up a container repository'
context 'next run scheduling' do
let_it_be_with_reload(:repository2) { create(:container_repository, project: project) }
let_it_be_with_reload(:repository3) { create(:container_repository, project: project) }
before do
cleanup_tags_service = instance_double(Projects::ContainerRepository::CleanupTagsService)
allow(Projects::ContainerRepository::CleanupTagsService)
.to receive(:new).and_return(cleanup_tags_service)
allow(cleanup_tags_service).to receive(:execute).and_return(status: :success)
end
shared_examples 'not scheduling the next run' do
it 'does not scheduled the next run' do
expect(policy).not_to receive(:schedule_next_run!)
expect { subject }.not_to change { policy.reload.next_run_at }
end
end
shared_examples 'scheduling the next run' do
it 'schedules the next run' do
expect(policy).to receive(:schedule_next_run!).and_call_original
expect { subject }.to change { policy.reload.next_run_at }
end
end
context 'with cleanups started_at before policy next_run_at' do
before do
ContainerRepository.update_all(expiration_policy_started_at: 10.minutes.ago)
end
it_behaves_like 'not scheduling the next run'
end
context 'with cleanups started_at around policy next_run_at' do
before do
repository3.update!(expiration_policy_started_at: policy.next_run_at + 10.minutes.ago)
end
it_behaves_like 'not scheduling the next run'
end
context 'with only the current repository started_at before the policy next_run_at' do
before do
repository2.update!(expiration_policy_started_at: policy.next_run_at + 10.minutes)
repository3.update!(expiration_policy_started_at: policy.next_run_at + 12.minutes)
end
it_behaves_like 'scheduling the next run'
end
context 'with cleanups started_at after policy next_run_at' do
before do
ContainerRepository.update_all(expiration_policy_started_at: policy.next_run_at + 10.minutes)
end
it_behaves_like 'scheduling the next run'
end
context 'with a future policy next_run_at' do
before do
policy.update_column(:next_run_at, 5.minutes.from_now)
end
it_behaves_like 'not scheduling the next run'
end
end
end
context 'with loopless disabled' do
before do
stub_feature_flags(container_registry_expiration_policies_loopless: false)
end
it_behaves_like 'cleaning up a container repository'
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment