Commit 0b4849db authored by David Fernandez, committed by Bob Van Landuyt

Throttle the cleanup policies execution

This is done by using a limited capacity worker and a dedicated database
column on container repositories that marks each of them as:

* cleanup_scheduled: waiting for a worker pickup (happens after a
cleanup policy execution)
* cleanup_ongoing: worker cleaning up this repository
* cleanup_unfinished: worker cleaned part of the repository but had to
stop
* cleanup_unscheduled: nothing to do
parent 2fdb2d91
......@@ -293,6 +293,9 @@ class ApplicationSetting < ApplicationRecord
validates :container_registry_delete_tags_service_timeout,
numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validates :container_registry_expiration_policies_worker_capacity,
numericality: { only_integer: true, greater_than_or_equal_to: 0 }
SUPPORTED_KEY_TYPES.each do |type|
validates :"#{type}_key_restriction", presence: true, key_restriction: { type: type }
end
......
......@@ -167,7 +167,8 @@ module ApplicationSettingImplementation
user_default_internal_regex: nil,
user_show_add_ssh_key_message: true,
wiki_page_max_content_bytes: 50.megabytes,
container_registry_delete_tags_service_timeout: 100
container_registry_delete_tags_service_timeout: 250,
container_registry_expiration_policies_worker_capacity: 0
}
end
......
......@@ -5,6 +5,13 @@ class ContainerExpirationPolicy < ApplicationRecord
include UsageStatistics
include EachBatch
POLICY_PARAMS = %w[
older_than
keep_n
name_regex
name_regex_keep
].freeze
belongs_to :project, inverse_of: :container_expiration_policy
delegate :container_repositories, to: :project
......@@ -20,8 +27,8 @@ class ContainerExpirationPolicy < ApplicationRecord
scope :active, -> { where(enabled: true) }
scope :preloaded, -> { preload(project: [:route]) }
def self.executable
runnable_schedules.where(
def self.with_container_repositories
where(
'EXISTS (?)',
ContainerRepository.select(1)
.where(
......@@ -67,4 +74,8 @@ class ContainerExpirationPolicy < ApplicationRecord
# Turns this policy off by writing `enabled = false`.
# NOTE(review): `update_attribute` skips validations in Rails, which is
# presumably why it is used here (invalid policies must still be disableable)
# - confirm.
def disable!
update_attribute(:enabled, false)
end
# Returns a Hash holding only the attributes named in POLICY_PARAMS
# (string keys, as produced by `attributes`).
def policy_params
attributes.slice(*POLICY_PARAMS)
end
end
......@@ -3,6 +3,9 @@
class ContainerRepository < ApplicationRecord
include Gitlab::Utils::StrongMemoize
include Gitlab::SQL::Pattern
include EachBatch
# Cleanup statuses selected by the `waiting_for_cleanup` scope.
WAITING_CLEANUP_STATUSES = %i[cleanup_scheduled cleanup_unfinished].freeze
belongs_to :project
......@@ -10,6 +13,7 @@ class ContainerRepository < ApplicationRecord
validates :name, uniqueness: { scope: :project_id }
enum status: { delete_scheduled: 0, delete_failed: 1 }
# Progress of the expiration policy cleanup for this repository:
# * cleanup_unscheduled: nothing to do
# * cleanup_scheduled: waiting for a worker pickup (set after a cleanup policy execution)
# * cleanup_unfinished: a worker cleaned part of the repository but had to stop
# * cleanup_ongoing: a worker is currently cleaning up this repository
enum expiration_policy_cleanup_status: { cleanup_unscheduled: 0, cleanup_scheduled: 1, cleanup_unfinished: 2, cleanup_ongoing: 3 }
delegate :client, to: :registry
......@@ -24,7 +28,9 @@ class ContainerRepository < ApplicationRecord
ContainerRepository
.joins("INNER JOIN (#{project_scope.to_sql}) projects on projects.id=container_repositories.project_id")
end
scope :for_project_id, ->(project_id) { where(project_id: project_id) }
scope :search_by_name, ->(query) { fuzzy_search(query, [:name], use_minimum_char_limit: false) }
# Repositories whose cleanup status is one of WAITING_CLEANUP_STATUSES.
scope :waiting_for_cleanup, -> { where(expiration_policy_cleanup_status: WAITING_CLEANUP_STATUSES) }
def self.exists_by_path?(path)
where(
......
# frozen_string_literal: true
module ContainerExpirationPolicies
  # Runs the tags cleanup for a single container repository using the
  # parameters of its project's container expiration policy, then records the
  # outcome in the repository's `expiration_policy_cleanup_status`.
  class CleanupService
    attr_reader :repository

    # repository - the ContainerRepository to clean up. May be nil, in which
    #              case #execute returns an error response.
    def initialize(repository)
      @repository = repository
    end

    # Executes the cleanup.
    #
    # Returns a ServiceResponse:
    # * error   - when no repository was given
    # * success - payload `cleanup_status: :finished` when the tags cleanup
    #             reported `:success`; the repository is set back to
    #             `cleanup_unscheduled` and its start date is cleared
    # * success - payload `cleanup_status: :unfinished` for any other result;
    #             the repository is marked `cleanup_unfinished`
    #
    # Exceptions raised by the tags cleanup are re-raised, after the repository
    # has been marked `cleanup_unfinished`.
    def execute
      return ServiceResponse.error(message: 'no repository') unless repository

      repository.start_expiration_policy!

      result = cleanup_tags

      if result[:status] == :success
        repository.update!(
          expiration_policy_cleanup_status: :cleanup_unscheduled,
          expiration_policy_started_at: nil
        )

        success(:finished)
      else
        repository.cleanup_unfinished!

        success(:unfinished)
      end
    end

    private

    # Calls the tags cleanup service for the repository.
    #
    # When the service raises, the repository is flagged `cleanup_unfinished`
    # before the error propagates. Without this, a repository handed over in
    # the `cleanup_ongoing` state would stay there and never match the
    # `waiting_for_cleanup` scope again.
    def cleanup_tags
      Projects::ContainerRepository::CleanupTagsService
        .new(project, nil, policy_params.merge('container_expiration_policy' => true))
        .execute(repository)
    rescue StandardError
      repository.cleanup_unfinished!

      raise
    end

    # Builds the success response carrying the cleanup status and repository id.
    def success(cleanup_status)
      ServiceResponse.success(message: "cleanup #{cleanup_status}", payload: { cleanup_status: cleanup_status, container_repository_id: repository.id })
    end

    # Policy attributes forwarded to the tags cleanup; empty without a policy.
    def policy_params
      return {} unless policy

      policy.policy_params
    end

    def policy
      project.container_expiration_policy
    end

    def project
      repository&.project
    end
  end
end
......@@ -4,20 +4,14 @@ class ContainerExpirationPolicyService < BaseService
InvalidPolicyError = Class.new(StandardError)
def execute(container_expiration_policy)
unless container_expiration_policy.valid?
container_expiration_policy.disable!
raise InvalidPolicyError
end
container_expiration_policy.schedule_next_run!
container_expiration_policy.container_repositories.find_each do |container_repository|
CleanupContainerRepositoryWorker.perform_async(
nil,
container_repository.id,
container_expiration_policy.attributes
.except('created_at', 'updated_at')
.merge(container_expiration_policy: true)
container_expiration_policy.policy_params
.merge(container_expiration_policy: true)
)
end
end
......
......@@ -97,7 +97,15 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:idempotent: true
:tags: []
- :name: container_repository:container_expiration_policies_cleanup_container_repository
:feature_category: :container_registry
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: container_repository:delete_container_repository
:feature_category: :container_registry
......
# frozen_string_literal: true
class CleanupContainerRepositoryWorker # rubocop:disable Scalability/IdempotentWorker
class CleanupContainerRepositoryWorker
include ApplicationWorker
queue_namespace :container_repository
feature_category :container_registry
urgency :low
worker_resource_boundary :unknown
idempotent!
loggable_arguments 2
attr_reader :container_repository, :current_user
......
# frozen_string_literal: true
module ContainerExpirationPolicies
  # Limited capacity worker: each job picks one container repository that is
  # waiting for cleanup and hands it to ContainerExpirationPolicies::CleanupService.
  class CleanupContainerRepositoryWorker
    include ApplicationWorker
    include LimitedCapacity::Worker
    include Gitlab::Utils::StrongMemoize

    queue_namespace :container_repository
    feature_category :container_registry
    urgency :low
    worker_resource_boundary :unknown
    idempotent!

    # Cleans up the next waiting repository, if any.
    #
    # Does nothing when the throttling feature flag is off or when no
    # repository is waiting. A repository that is not allowed to run is put
    # back to `cleanup_unscheduled` and logged as skipped.
    def perform_work
      return unless throttling_enabled? && container_repository

      unless allowed_to_run?(container_repository)
        container_repository.cleanup_unscheduled!
        log_info(container_repository_id: container_repository.id, cleanup_status: :skipped)

        return
      end

      payload = ContainerExpirationPolicies::CleanupService.new(container_repository).execute.payload

      log_extra_metadata_on_done(:container_repository_id, payload[:container_repository_id])
      log_extra_metadata_on_done(:cleanup_status, payload[:cleanup_status])
    end

    # Returns the number of repositories in the `cleanup_scheduled` or
    # `cleanup_unfinished` state, and logs both counts along with their total.
    def remaining_work_count
      scheduled = ContainerRepository.cleanup_scheduled.count
      unfinished = ContainerRepository.cleanup_unfinished.count

      (scheduled + unfinished).tap do |total|
        log_info(
          cleanup_scheduled_count: scheduled,
          cleanup_unfinished_count: unfinished,
          cleanup_total_count: total
        )
      end
    end

    # Returns the configured worker capacity, or 0 when the throttling feature
    # flag is disabled.
    def max_running_jobs
      if throttling_enabled?
        ::Gitlab::CurrentSettings.current_application_settings.container_registry_expiration_policies_worker_capacity
      else
        0
      end
    end

    private

    # A cleanup may run only when the policy is enabled, has a next run date,
    # and that date is further away than the maximum cleanup execution time.
    def allowed_to_run?(container_repository)
      current_policy = policy

      return false unless current_policy&.enabled && current_policy.next_run_at

      current_policy.next_run_at > Time.zone.now + max_cleanup_execution_time.seconds
    end

    def throttling_enabled?
      Feature.enabled?(:container_registry_expiration_policies_throttling)
    end

    def max_cleanup_execution_time
      ::Gitlab::CurrentSettings.current_application_settings.container_registry_delete_tags_service_timeout
    end

    def policy
      project.container_expiration_policy
    end

    def project
      container_repository&.project
    end

    # Memoized: the repository this job works on, or nil when none is waiting.
    def container_repository
      strong_memoize(:container_repository) { lock_next_waiting_repository }
    end

    # Picks the first waiting repository (ordered by cleanup status, then by
    # start date) and flags it as `cleanup_ongoing` within one transaction.
    def lock_next_waiting_repository
      ContainerRepository.transaction do
        # rubocop: disable CodeReuse/ActiveRecord
        # We need a lock to prevent two workers from picking up the same row
        candidate = ContainerRepository.waiting_for_cleanup
                                       .order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
                                       .limit(1)
                                       .lock('FOR UPDATE SKIP LOCKED')
                                       .first
        # rubocop: enable CodeReuse/ActiveRecord

        candidate&.cleanup_ongoing!
        candidate
      end
    end

    def log_info(extra_structure)
      logger.info(structured_payload(extra_structure))
    end
  end
end
......@@ -3,20 +3,79 @@
class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include CronjobQueue
include ExclusiveLeaseGuard
feature_category :container_registry
InvalidPolicyError = Class.new(StandardError)
BATCH_SIZE = 1000.freeze
def perform
ContainerExpirationPolicy.executable.preloaded.each_batch do |relation|
relation.each do |container_expiration_policy|
with_context(project: container_expiration_policy.project,
user: container_expiration_policy.project.owner) do |project:, user:|
ContainerExpirationPolicyService.new(project, user)
.execute(container_expiration_policy)
rescue ContainerExpirationPolicyService::InvalidPolicyError => e
Gitlab::ErrorTracking.log_exception(e, container_expiration_policy_id: container_expiration_policy.id)
throttling_enabled? ? perform_throttled : perform_unthrottled
end
private
# Path used when the throttling feature flag is disabled: every runnable,
# valid policy is passed to ContainerExpirationPolicyService, within a context
# carrying the policy's project and that project's owner.
def perform_unthrottled
  with_runnable_policy(preloaded: true) do |policy|
    policy_project = policy.project

    with_context(project: policy_project, user: policy_project.owner) do |project:, user:|
      ContainerExpirationPolicyService.new(project, user).execute(policy)
    end
  end
end
# Path used when the throttling feature flag is enabled. Under an exclusive
# lease, every runnable, valid policy gets its next run scheduled and all the
# container repositories of its project are flagged `cleanup_scheduled`.
# The limited capacity cleanup worker is then started.
def perform_throttled
  try_obtain_lease do
    with_runnable_policy do |policy|
      policy.schedule_next_run!

      # Use the foreign key explicitly: `for_project_id` expects a project id,
      # and relying on the policy's own id only works if both values coincide.
      ContainerRepository.for_project_id(policy.project_id)
                         .each_batch do |relation|
        relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled)
      end
    end

    ContainerExpirationPolicies::CleanupContainerRepositoryWorker.perform_with_capacity
  end
end
# TODO : remove the preload option when cleaning FF container_registry_expiration_policies_throttling
#
# Walks the runnable policies in batches of BATCH_SIZE, keeping only those
# that have container repositories. Each valid policy is yielded inside a
# transaction; each invalid one is disabled and reported.
#
# preloaded - when true, applies the `preloaded` scope to each batch.
def with_runnable_policy(preloaded: false)
  ContainerExpirationPolicy.runnable_schedules.each_batch(of: BATCH_SIZE) do |batch|
    # rubocop: disable CodeReuse/ActiveRecord
    batch_cte = Gitlab::SQL::CTE.new(:batched_policies, batch.limit(BATCH_SIZE))
    # rubocop: enable CodeReuse/ActiveRecord

    candidates = batch_cte.apply_to(ContainerExpirationPolicy.all).with_container_repositories
    candidates = candidates.preloaded if preloaded

    candidates.each do |policy|
      unless policy.valid?
        disable_invalid_policy!(policy)
        next
      end

      ContainerExpirationPolicy.transaction { yield policy }
    end
  end
end
# Disables the given policy, then reports an InvalidPolicyError for it to
# error tracking together with the policy id.
def disable_invalid_policy!(policy)
  policy.disable!

  error = ::ContainerExpirationPolicyWorker::InvalidPolicyError.new
  Gitlab::ErrorTracking.log_exception(error, container_expiration_policy_id: policy.id)
end
# True when the :container_registry_expiration_policies_throttling feature
# flag is enabled; #perform uses it to choose between the throttled and
# unthrottled paths.
def throttling_enabled?
Feature.enabled?(:container_registry_expiration_policies_throttling)
end
# NOTE(review): presumably read by ExclusiveLeaseGuard as the duration of the
# lease taken in `try_obtain_lease` - confirm against that module.
def lease_timeout
5.hours
end
end
---
title: Throttle container cleanup policies execution by using a limited capacity worker
merge_request: 40740
author:
type: changed
# frozen_string_literal: true
# Adds the `container_registry_expiration_policies_worker_capacity` integer
# column (default 0, not null) to `application_settings`. Both directions are
# guarded by a column existence check.
class AddContainerExpirationPolicyWorkerSettingsToApplicationSettings < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  def up
    return if column_exists?(:application_settings, :container_registry_expiration_policies_worker_capacity)

    add_column(:application_settings, :container_registry_expiration_policies_worker_capacity, :integer, default: 0, null: false)
  end

  def down
    return unless column_exists?(:application_settings, :container_registry_expiration_policies_worker_capacity)

    remove_column(:application_settings, :container_registry_expiration_policies_worker_capacity)
  end
end
# frozen_string_literal: true
# Adds the `expiration_policy_cleanup_status` smallint column (default 0, not
# null) to `container_repositories`, plus a concurrent index on the cleanup
# status and the expiration policy start date.
class AddExpirationPolicyCleanupStatusToContainerRepositories < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false
  INDEX_NAME = 'idx_container_repositories_on_exp_cleanup_status_and_start_date'

  disable_ddl_transaction!

  def up
    status_column_present = column_exists?(:container_repositories, :expiration_policy_cleanup_status)

    add_column(:container_repositories, :expiration_policy_cleanup_status, :integer, limit: 2, default: 0, null: false) unless status_column_present

    add_concurrent_index(:container_repositories, [:expiration_policy_cleanup_status, :expiration_policy_started_at], name: INDEX_NAME)
  end

  def down
    # The index goes first, then the column it covers.
    remove_concurrent_index(:container_repositories, [:expiration_policy_cleanup_status, :expiration_policy_started_at], name: INDEX_NAME)

    return unless column_exists?(:container_repositories, :expiration_policy_cleanup_status)

    remove_column(:container_repositories, :expiration_policy_cleanup_status)
  end
end
# frozen_string_literal: true
# See https://docs.gitlab.com/ee/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
# Adds a check constraint on `application_settings` requiring
# `container_registry_expiration_policies_worker_capacity` to be >= 0.
class AddContainerRegistryExpirationPoliciesWorkerCapacityConstraint < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false
  CONSTRAINT_NAME = 'app_settings_registry_exp_policies_worker_capacity_positive'

  disable_ddl_transaction!

  def up
    add_check_constraint(
      :application_settings,
      'container_registry_expiration_policies_worker_capacity >= 0',
      CONSTRAINT_NAME
    )
  end

  def down
    remove_check_constraint(:application_settings, CONSTRAINT_NAME)
  end
end
# frozen_string_literal: true
# Creates (and on rollback removes) a concurrent index on
# `container_repositories (project_id, id)`.
class AddIndexProjectIdAndIdToContainerRepositories < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false
  INDEX_NAME = 'index_container_repositories_on_project_id_and_id'

  disable_ddl_transaction!

  def up
    add_concurrent_index(
      :container_repositories,
      [:project_id, :id],
      name: INDEX_NAME
    )
  end

  def down
    remove_concurrent_index(
      :container_repositories,
      [:project_id, :id],
      name: INDEX_NAME
    )
  end
end
80d2beb7a1c5f60a4bf3462054fa5bcd0488152b6754f8a7164046201fcb08ed
\ No newline at end of file
1e274e744ed9e225e2ee09afc15871a1af63857f95c5d787e8efd9943fce1bed
\ No newline at end of file
d43764a44f6578548d8b7838dc011b7693da0b7d65cbcc1fff96a212d655024e
\ No newline at end of file
de07bcc8166421d01382038d930cabb6a4749b314f05ca148e8d13cff947447c
\ No newline at end of file
......@@ -9296,6 +9296,8 @@ CREATE TABLE application_settings (
automatic_purchased_storage_allocation boolean DEFAULT false NOT NULL,
encrypted_ci_jwt_signing_key text,
encrypted_ci_jwt_signing_key_iv text,
container_registry_expiration_policies_worker_capacity integer DEFAULT 0 NOT NULL,
CONSTRAINT app_settings_registry_exp_policies_worker_capacity_positive CHECK ((container_registry_expiration_policies_worker_capacity >= 0)),
CONSTRAINT check_2dba05b802 CHECK ((char_length(gitpod_url) <= 255)),
CONSTRAINT check_51700b31b5 CHECK ((char_length(default_branch_name) <= 255)),
CONSTRAINT check_57123c9593 CHECK ((char_length(help_page_documentation_base_url) <= 255)),
......@@ -11216,7 +11218,8 @@ CREATE TABLE container_repositories (
created_at timestamp without time zone NOT NULL,
updated_at timestamp without time zone NOT NULL,
status smallint,
expiration_policy_started_at timestamp with time zone
expiration_policy_started_at timestamp with time zone,
expiration_policy_cleanup_status smallint DEFAULT 0 NOT NULL
);
CREATE SEQUENCE container_repositories_id_seq
......@@ -19726,6 +19729,8 @@ CREATE INDEX idx_ci_pipelines_artifacts_locked ON ci_pipelines USING btree (ci_r
CREATE INDEX idx_container_exp_policies_on_project_id_next_run_at_enabled ON container_expiration_policies USING btree (project_id, next_run_at, enabled);
CREATE INDEX idx_container_repositories_on_exp_cleanup_status_and_start_date ON container_repositories USING btree (expiration_policy_cleanup_status, expiration_policy_started_at);
CREATE INDEX idx_deployment_clusters_on_cluster_id_and_kubernetes_namespace ON deployment_clusters USING btree (cluster_id, kubernetes_namespace);
CREATE UNIQUE INDEX idx_environment_merge_requests_unique_index ON deployment_merge_requests USING btree (environment_id, merge_request_id);
......@@ -20310,6 +20315,8 @@ CREATE INDEX index_container_expiration_policies_on_next_run_at_and_enabled ON c
CREATE INDEX index_container_repositories_on_project_id ON container_repositories USING btree (project_id);
CREATE INDEX index_container_repositories_on_project_id_and_id ON container_repositories USING btree (project_id, id);
CREATE UNIQUE INDEX index_container_repositories_on_project_id_and_name ON container_repositories USING btree (project_id, name);
CREATE INDEX index_container_repository_on_name_trigram ON container_repositories USING gin (name gin_trgm_ops);
......
......@@ -21,6 +21,18 @@ FactoryBot.define do
status { :delete_failed }
end
trait :cleanup_scheduled do
expiration_policy_cleanup_status { :cleanup_scheduled }
end
trait :cleanup_unfinished do
expiration_policy_cleanup_status { :cleanup_unfinished }
end
trait :cleanup_ongoing do
expiration_policy_cleanup_status { :cleanup_ongoing }
end
after(:build) do |repository, evaluator|
next if evaluator.tags.to_a.none?
......
......@@ -72,6 +72,7 @@ RSpec.describe ApplicationSetting do
it { is_expected.not_to allow_value(nil).for(:push_event_activities_limit) }
it { is_expected.to validate_numericality_of(:container_registry_delete_tags_service_timeout).only_integer.is_greater_than_or_equal_to(0) }
it { is_expected.to validate_numericality_of(:container_registry_expiration_policies_worker_capacity).only_integer.is_greater_than_or_equal_to(0) }
it { is_expected.to validate_numericality_of(:snippet_size_limit).only_integer.is_greater_than(0) }
it { is_expected.to validate_numericality_of(:wiki_page_max_content_bytes).only_integer.is_greater_than_or_equal_to(1024) }
......
......@@ -38,6 +38,33 @@ RSpec.describe ContainerExpirationPolicy, type: :model do
it { is_expected.not_to allow_value('foo').for(:keep_n) }
end
describe '#disable!' do
let_it_be(:policy) { create(:container_expiration_policy) }
subject { policy.disable! }
it 'disables the container expiration policy' do
expect { subject }.to change { policy.reload.enabled }.from(true).to(false)
end
end
describe '#policy_params' do
let_it_be(:policy) { create(:container_expiration_policy) }
let(:expected) do
{
'older_than' => policy.older_than,
'keep_n' => policy.keep_n,
'name_regex' => policy.name_regex,
'name_regex_keep' => policy.name_regex_keep
}
end
subject { policy.policy_params }
it { is_expected.to eq(expected) }
end
context 'with a set of regexps' do
valid_regexps = %w[master .* v.+ v10.1.* (?:v.+|master|release)]
invalid_regexps = ['[', '(?:v.+|master|release']
......@@ -104,25 +131,15 @@ RSpec.describe ContainerExpirationPolicy, type: :model do
end
end
describe '.executable' do
subject { described_class.executable }
describe '.with_container_repositories' do
subject { described_class.with_container_repositories }
let_it_be(:policy1) { create(:container_expiration_policy, :runnable) }
let_it_be(:policy1) { create(:container_expiration_policy) }
let_it_be(:container_repository1) { create(:container_repository, project: policy1.project) }
let_it_be(:policy2) { create(:container_expiration_policy, :runnable) }
let_it_be(:policy2) { create(:container_expiration_policy) }
let_it_be(:container_repository2) { create(:container_repository, project: policy2.project) }
let_it_be(:policy3) { create(:container_expiration_policy, :runnable) }
let_it_be(:policy3) { create(:container_expiration_policy) }
it { is_expected.to contain_exactly(policy1, policy2) }
end
describe '#disable!' do
let_it_be(:container_expiration_policy) { create(:container_expiration_policy) }
subject { container_expiration_policy.disable! }
it 'disables the container expiration policy' do
expect { subject }.to change { container_expiration_policy.reload.enabled }.from(true).to(false)
end
end
end
......@@ -352,4 +352,20 @@ RSpec.describe ContainerRepository do
it { is_expected.to contain_exactly(repository) }
end
describe '.for_project_id' do
subject { described_class.for_project_id(project.id) }
it { is_expected.to contain_exactly(repository) }
end
describe '.waiting_for_cleanup' do
let_it_be(:repository_cleanup_scheduled) { create(:container_repository, :cleanup_scheduled) }
let_it_be(:repository_cleanup_unfinished) { create(:container_repository, :cleanup_unfinished) }
let_it_be(:repository_cleanup_ongoing) { create(:container_repository, :cleanup_ongoing) }
subject { described_class.waiting_for_cleanup }
it { is_expected.to contain_exactly(repository_cleanup_scheduled, repository_cleanup_unfinished) }
end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for ContainerExpirationPolicies::CleanupService#execute. The tags
# cleanup service is always replaced by a double, so only the status handling
# of the service under test is exercised.
RSpec.describe ContainerExpirationPolicies::CleanupService do
let_it_be(:repository, reload: true) { create(:container_repository) }
let_it_be(:project) { repository.project }
let(:service) { described_class.new(repository) }
describe '#execute' do
subject { service.execute }
context 'with a successful cleanup tags service execution' do
# Parameters the tags cleanup service is expected to be built with: the
# policy params plus the 'container_expiration_policy' marker.
let(:cleanup_tags_service_params) { project.container_expiration_policy.policy_params.merge('container_expiration_policy' => true) }
let(:cleanup_tags_service) { instance_double(Projects::ContainerRepository::CleanupTagsService) }
it 'completely clean up the repository' do
expect(Projects::ContainerRepository::CleanupTagsService)
.to receive(:new).with(project, nil, cleanup_tags_service_params).and_return(cleanup_tags_service)
expect(cleanup_tags_service).to receive(:execute).with(repository).and_return(status: :success)
response = subject
# A finished cleanup leaves nothing waiting and clears the start date.
aggregate_failures "checking the response and container repositories" do
expect(response.success?).to eq(true)
expect(response.payload).to include(cleanup_status: :finished, container_repository_id: repository.id)
expect(ContainerRepository.waiting_for_cleanup.count).to eq(0)
expect(repository.reload.cleanup_unscheduled?).to be_truthy
expect(repository.expiration_policy_started_at).to eq(nil)
end
end
end
context 'without a successful cleanup tags service execution' do
it 'partially clean up the repository' do
# Any non-success result (here an error with a 'timeout' message) is
# reported as an unfinished cleanup.
expect(Projects::ContainerRepository::CleanupTagsService)
.to receive(:new).and_return(double(execute: { status: :error, message: 'timeout' }))
response = subject
# The service response is still a success; the repository stays waiting
# (cleanup_unfinished) and keeps its start date.
aggregate_failures "checking the response and container repositories" do
expect(response.success?).to eq(true)
expect(response.payload).to include(cleanup_status: :unfinished, container_repository_id: repository.id)
expect(ContainerRepository.waiting_for_cleanup.count).to eq(1)
expect(repository.reload.cleanup_unfinished?).to be_truthy
expect(repository.expiration_policy_started_at).not_to eq(nil)
end
end
end
context 'with no repository' do
let(:service) { described_class.new(nil) }
it 'returns an error response' do
response = subject
expect(response.success?).to eq(false)
end
end
end
end
......@@ -27,20 +27,5 @@ RSpec.describe ContainerExpirationPolicyService do
expect(container_expiration_policy.next_run_at).to be > Time.zone.now
end
context 'with an invalid container expiration policy' do
before do
allow(container_expiration_policy).to receive(:valid?).and_return(false)
end
it 'disables it' do
expect(container_expiration_policy).not_to receive(:schedule_next_run!)
expect(CleanupContainerRepositoryWorker).not_to receive(:perform_async)
expect { subject }
.to change { container_expiration_policy.reload.enabled }.from(true).to(false)
.and raise_error(ContainerExpirationPolicyService::InvalidPolicyError)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ContainerExpirationPolicies::CleanupContainerRepositoryWorker do
let_it_be(:repository, reload: true) { create(:container_repository, :cleanup_scheduled) }
let_it_be(:project) { repository.project }
let_it_be(:policy) { project.container_expiration_policy }
let_it_be(:other_repository) { create(:container_repository) }
let(:worker) { described_class.new }
describe '#perform_work' do
subject { worker.perform_work }
RSpec.shared_examples 'handling all repository conditions' do
it 'sends the repository for cleaning' do
expect(ContainerExpirationPolicies::CleanupService)
.to receive(:new).with(repository).and_return(double(execute: cleanup_service_response(repository: repository)))
expect(worker).to receive(:log_extra_metadata_on_done).with(:cleanup_status, :finished)
expect(worker).to receive(:log_extra_metadata_on_done).with(:container_repository_id, repository.id)
subject
end
context 'with unfinished cleanup' do
it 'logs an unfinished cleanup' do
expect(ContainerExpirationPolicies::CleanupService)
.to receive(:new).with(repository).and_return(double(execute: cleanup_service_response(status: :unfinished, repository: repository)))
expect(worker).to receive(:log_extra_metadata_on_done).with(:cleanup_status, :unfinished)
expect(worker).to receive(:log_extra_metadata_on_done).with(:container_repository_id, repository.id)
subject
end
end
context 'with policy running shortly' do
before do
repository.project
.container_expiration_policy
.update_column(:next_run_at, 1.minute.from_now)
end
it 'skips the repository' do
expect(ContainerExpirationPolicies::CleanupService).not_to receive(:new)
expect { subject }.to change { ContainerRepository.waiting_for_cleanup.count }.from(1).to(0)
expect(repository.reload.cleanup_unscheduled?).to be_truthy
end
end
context 'with disabled policy' do
before do
repository.project
.container_expiration_policy
.disable!
end
it 'skips the repository' do
expect(ContainerExpirationPolicies::CleanupService).not_to receive(:new)
expect { subject }.to change { ContainerRepository.waiting_for_cleanup.count }.from(1).to(0)
expect(repository.reload.cleanup_unscheduled?).to be_truthy
end
end
end
context 'with repository in cleanup scheduled state' do
it_behaves_like 'handling all repository conditions'
end
context 'with repository in cleanup unfinished state' do
before do
repository.cleanup_unfinished!
end
it_behaves_like 'handling all repository conditions'
end
context 'with another repository in cleanup unfinished state' do
let_it_be(:another_repository) { create(:container_repository, :cleanup_unfinished) }
it 'process the cleanup scheduled repository first' do
expect(ContainerExpirationPolicies::CleanupService)
.to receive(:new).with(repository).and_return(double(execute: cleanup_service_response(repository: repository)))
expect(worker).to receive(:log_extra_metadata_on_done).with(:cleanup_status, :finished)
expect(worker).to receive(:log_extra_metadata_on_done).with(:container_repository_id, repository.id)
subject
end
end
context 'with multiple repositories in cleanup unfinished state' do
let_it_be(:repository2) { create(:container_repository, :cleanup_unfinished, expiration_policy_started_at: 20.minutes.ago) }
let_it_be(:repository3) { create(:container_repository, :cleanup_unfinished, expiration_policy_started_at: 10.minutes.ago) }
before do
repository.update!(expiration_policy_cleanup_status: :cleanup_unfinished, expiration_policy_started_at: 30.minutes.ago)
end
it 'process the repository with the oldest expiration_policy_started_at' do
expect(ContainerExpirationPolicies::CleanupService)
.to receive(:new).with(repository).and_return(double(execute: cleanup_service_response(repository: repository)))
expect(worker).to receive(:log_extra_metadata_on_done).with(:cleanup_status, :finished)
expect(worker).to receive(:log_extra_metadata_on_done).with(:container_repository_id, repository.id)
subject
end
end
# A repository already being cleaned up must not be picked up by the worker.
context 'with repository in cleanup ongoing state' do
  before do
    repository.cleanup_ongoing!
  end

  it 'does not process it' do
    expect(Projects::ContainerRepository::CleanupTagsService).not_to receive(:new)

    expect { subject }.not_to change { ContainerRepository.waiting_for_cleanup.count }
    # Reload so the assertion checks the persisted status rather than the
    # in-memory value set in the `before` block.
    expect(repository.reload.cleanup_ongoing?).to be_truthy
  end
end
# With nothing scheduled or unfinished, the worker has no repository to pick.
context 'with no repository in any cleanup state' do
  before do
    repository.cleanup_unscheduled!
  end

  it 'does not process it' do
    expect(Projects::ContainerRepository::CleanupTagsService).not_to receive(:new)

    expect { subject }.not_to change { ContainerRepository.waiting_for_cleanup.count }
    # Reload so the assertion checks the persisted status rather than the
    # in-memory value set in the `before` block.
    expect(repository.reload.cleanup_unscheduled?).to be_truthy
  end
end
context 'with no container repository waiting' do
before do
repository.destroy!
end
it 'does not execute the cleanup tags service' do
expect(Projects::ContainerRepository::CleanupTagsService).not_to receive(:new)
expect { subject }.not_to change { ContainerRepository.waiting_for_cleanup.count }
end
end
context 'with feature flag disabled' do
before do
stub_feature_flags(container_registry_expiration_policies_throttling: false)
end
it 'is a no-op' do
expect(Projects::ContainerRepository::CleanupTagsService).not_to receive(:new)
expect { subject }.not_to change { ContainerRepository.waiting_for_cleanup.count }
end
end
def cleanup_service_response(status: :finished, repository:)
ServiceResponse.success(message: "cleanup #{status}", payload: { cleanup_status: status, container_repository_id: repository.id })
end
end
describe '#remaining_work_count' do
subject { worker.remaining_work_count }
# One scheduled repository (created at the top of the spec) plus two
# unfinished ones created here: three repositories are waiting in total.
context 'with container repositories waiting for cleanup' do
  let_it_be(:unfinished_repositories) { create_list(:container_repository, 2, :cleanup_unfinished) }

  it { is_expected.to eq(3) }

  it 'logs the work count' do
    expect_log_info(
      cleanup_scheduled_count: 1,
      cleanup_unfinished_count: 2,
      cleanup_total_count: 3
    )

    subject
  end
end
context 'with no container repositories waiting for cleanup' do
before do
repository.cleanup_ongoing!
end
it { is_expected.to eq(0) }
it 'logs 0 work count' do
expect_log_info(
cleanup_scheduled_count: 0,
cleanup_unfinished_count: 0,
cleanup_total_count: 0
)
subject
end
end
end
describe '#max_running_jobs' do
let(:capacity) { 50 }
subject { worker.max_running_jobs }
before do
stub_application_setting(container_registry_expiration_policies_worker_capacity: capacity)
end
it { is_expected.to eq(capacity) }
context 'with feature flag disabled' do
before do
stub_feature_flags(container_registry_expiration_policies_throttling: false)
end
it { is_expected.to eq(0) }
end
end
# Sets a message expectation on the worker's logger: it must receive `info`
# with the payload produced by worker.structured_payload for +structure+.
#
# @param structure [Hash] fields handed to worker.structured_payload
def expect_log_info(structure)
  logger = worker.logger
  expected_payload = worker.structured_payload(structure)

  expect(logger).to receive(:info).with(expected_payload)
end
end
......@@ -5,71 +5,152 @@ require 'spec_helper'
RSpec.describe ContainerExpirationPolicyWorker do
include ExclusiveLeaseHelpers
subject { described_class.new.perform }
let(:worker) { described_class.new }
let(:started_at) { nil }
RSpec.shared_examples 'not executing any policy' do
it 'does not run any policy' do
expect(ContainerExpirationPolicyService).not_to receive(:new)
describe '#perform' do
subject { worker.perform }
subject
# Shared examples asserting that running the subject never instantiates
# ContainerExpirationPolicyService.
RSpec.shared_examples 'not executing any policy' do
  it 'does not run any policy' do
    expect(ContainerExpirationPolicyService).not_to receive(:new)

    subject
  end
end
end
context 'With no container expiration policies' do
it_behaves_like 'not executing any policy'
end
context 'With no container expiration policies' do
it 'does not execute any policies' do
expect(ContainerRepository).not_to receive(:for_project_id)
context 'With container expiration policies' do
let_it_be(:container_expiration_policy, reload: true) { create(:container_expiration_policy, :runnable) }
let_it_be(:container_repository) { create(:container_repository, project: container_expiration_policy.project) }
let_it_be(:user) { container_expiration_policy.project.owner }
expect { subject }.not_to change { ContainerRepository.cleanup_scheduled.count }
end
end
context 'a valid policy' do
it 'runs the policy' do
service = instance_double(ContainerExpirationPolicyService, execute: true)
context 'with container expiration policies' do
let_it_be(:container_expiration_policy) { create(:container_expiration_policy, :runnable) }
let_it_be(:container_repository) { create(:container_repository, project: container_expiration_policy.project) }
expect(ContainerExpirationPolicyService)
.to receive(:new).with(container_expiration_policy.project, user).and_return(service)
context 'with a valid container expiration policy' do
# Running the subject must change the policy's persisted next_run_at
# (the record is reloaded before each read).
it 'schedules the next run' do
  expect { subject }.to change { container_expiration_policy.reload.next_run_at }
end
subject
end
end
# The repository flips from not-scheduled to scheduled, and exactly one
# repository ends up in the cleanup_scheduled scope.
it 'marks the container repository as scheduled for cleanup' do
  expect { subject }.to change { container_repository.reload.cleanup_scheduled? }.from(false).to(true)

  expect(ContainerRepository.cleanup_scheduled.count).to eq(1)
end
context 'a disabled policy' do
before do
container_expiration_policy.disable!
# Running the subject must invoke perform_with_capacity on the cleanup
# container repository worker class.
it 'calls the limited capacity worker' do
  expect(ContainerExpirationPolicies::CleanupContainerRepositoryWorker).to receive(:perform_with_capacity)

  subject
end
end
it_behaves_like 'not executing any policy'
end
context 'with a disabled container expiration policy' do
before do
container_expiration_policy.disable!
end
context 'a policy that is not due for a run' do
before do
container_expiration_policy.update_column(:next_run_at, 2.minutes.from_now)
# No repository lookup by project id may happen, and the number of
# repositories scheduled for cleanup must not move.
it 'does not run the policy' do
  expect(ContainerRepository).not_to receive(:for_project_id)

  expect { subject }.not_to change { ContainerRepository.cleanup_scheduled.count }
end
end
it_behaves_like 'not executing any policy'
# A policy whose name_regex is forced to '*production' through update_column
# (which skips validations) is expected to be disabled and reported through
# Gitlab::ErrorTracking, with no repository scheduled for cleanup.
context 'with an invalid container expiration policy' do
  before do
    container_expiration_policy.update_column(:name_regex, '*production')
  end

  it 'disables the policy and tracks an error' do
    expect(ContainerRepository).not_to receive(:for_project_id)
    expect(Gitlab::ErrorTracking).to receive(:log_exception).with(
      instance_of(described_class::InvalidPolicyError),
      container_expiration_policy_id: container_expiration_policy.id
    )

    expect { subject }.to change { container_expiration_policy.reload.enabled }.from(true).to(false)
    expect(ContainerRepository.cleanup_scheduled).to be_empty
  end
end
end
context 'a policy linked to no container repository' do
context 'with exclusive lease taken' do
before do
container_expiration_policy.container_repositories.delete_all
stub_exclusive_lease_taken(worker.lease_key, timeout: 5.hours)
end
it_behaves_like 'not executing any policy'
# Neither the limited capacity worker nor the worker's own runnable_policies
# may be called, and the scheduled-for-cleanup count must stay unchanged.
it 'does not execute any policy' do
  expect(ContainerExpirationPolicies::CleanupContainerRepositoryWorker).not_to receive(:perform_with_capacity)
  expect(worker).not_to receive(:runnable_policies)

  expect { subject }.not_to change { ContainerRepository.cleanup_scheduled.count }
end
end
context 'an invalid policy' do
context 'with throttling disabled' do
before do
container_expiration_policy.update_column(:name_regex, '*production')
stub_feature_flags(container_registry_expiration_policies_throttling: false)
end
it 'runs the policy and tracks an error' do
expect(ContainerExpirationPolicyService)
.to receive(:new).with(container_expiration_policy.project, user).and_call_original
expect(Gitlab::ErrorTracking).to receive(:log_exception).with(instance_of(ContainerExpirationPolicyService::InvalidPolicyError), container_expiration_policy_id: container_expiration_policy.id)
# No policy records are created in this context; reuses the
# 'not executing any policy' shared examples.
context 'with no container expiration policies' do
  it_behaves_like 'not executing any policy'
end
# Scenarios for a single runnable policy whose project owns one container
# repository; `user` is the owner of that project.
context 'with container expiration policies' do
  let_it_be(:container_expiration_policy, reload: true) { create(:container_expiration_policy, :runnable) }
  let_it_be(:container_repository) { create(:container_repository, project: container_expiration_policy.project) }
  let_it_be(:user) { container_expiration_policy.project.owner }

  context 'a valid policy' do
    it 'runs the policy' do
      service = instance_double(ContainerExpirationPolicyService, execute: true)

      # The service is built from the policy's project and its owner.
      expect(ContainerExpirationPolicyService)
        .to receive(:new).with(container_expiration_policy.project, user).and_return(service)

      subject
    end
  end

  context 'a disabled policy' do
    before do
      container_expiration_policy.disable!
    end

    it_behaves_like 'not executing any policy'
  end

  context 'a policy that is not due for a run' do
    before do
      container_expiration_policy.update_column(:next_run_at, 2.minutes.from_now)
    end

    it_behaves_like 'not executing any policy'
  end

  context 'a policy linked to no container repository' do
    before do
      container_expiration_policy.container_repositories.delete_all
    end

    it_behaves_like 'not executing any policy'
  end

  context 'an invalid policy' do
    before do
      # update_column skips validations, so the invalid regex gets persisted.
      container_expiration_policy.update_column(:name_regex, '*production')
    end

    it 'disables the policy and tracks an error' do
      # Deliberately no `.with(...)`: the service is instantiated with
      # (project, user), so constraining the negative expectation to
      # (policy, user) would pass even if the service were built.
      expect(ContainerExpirationPolicyService).not_to receive(:new)
      expect(Gitlab::ErrorTracking).to receive(:log_exception).with(
        instance_of(described_class::InvalidPolicyError),
        container_expiration_policy_id: container_expiration_policy.id
      )

      # Asserted once only: `subject` is memoized, so a second identical
      # expectation would start from `enabled == false` and fail.
      expect { subject }.to change { container_expiration_policy.reload.enabled }.from(true).to(false)
    end
  end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment