diff --git a/ee/app/finders/geo/repository_verification_finder.rb b/ee/app/finders/geo/repository_verification_finder.rb index f3b718e98a861f9a9d03895ed99ddd605f719506..610768cd3cd84a72a1e6166ef1944306086d54f3 100644 --- a/ee/app/finders/geo/repository_verification_finder.rb +++ b/ee/app/finders/geo/repository_verification_finder.rb @@ -4,6 +4,15 @@ module Geo @shard_name = shard_name end + def find_failed_projects(batch_size:) + query = build_query_to_find_failed_projects(batch_size: batch_size) + cte = Gitlab::SQL::CTE.new(:failed_projects, query) + + Project.with(cte.to_arel) + .from(cte.alias_to(projects_table)) + .order(last_repository_updated_at_asc) + end + def find_outdated_projects(batch_size:) query = build_query_to_find_outdated_projects(batch_size: batch_size) cte = Gitlab::SQL::CTE.new(:outdated_projects, query) @@ -45,6 +54,18 @@ module Geo attr_reader :shard_name + def build_query_to_find_failed_projects(batch_size:) + query = + projects_table + .join(repository_state_table).on(project_id_matcher) + .project(projects_table[:id], projects_table[:last_repository_updated_at]) + .where(repository_failed.or(wiki_failed)) + .take(batch_size) + + query = apply_shard_restriction(query) if shard_name.present? 
+ query + end + def build_query_to_find_outdated_projects(batch_size:) query = projects_table @@ -76,11 +97,19 @@ module Geo .join_sources end + def repository_failed + repository_state_table[:last_repository_verification_failure].not_eq(nil) + end + def repository_outdated repository_state_table[:repository_verification_checksum].eq(nil) .and(repository_state_table[:last_repository_verification_failure].eq(nil)) end + def wiki_failed + repository_state_table[:last_wiki_verification_failure].not_eq(nil) + end + def wiki_outdated repository_state_table[:wiki_verification_checksum].eq(nil) .and(repository_state_table[:last_wiki_verification_failure].eq(nil)) diff --git a/ee/app/workers/geo/repository_verification/primary/shard_worker.rb b/ee/app/workers/geo/repository_verification/primary/shard_worker.rb index 446323998fe668e735600abe0d5d26faf961ceeb..9cd86fe49a88d93d7bce294247578bb9dbb66272 100644 --- a/ee/app/workers/geo/repository_verification/primary/shard_worker.rb +++ b/ee/app/workers/geo/repository_verification/primary/shard_worker.rb @@ -42,12 +42,13 @@ module Geo def load_pending_resources resources = find_unverified_project_ids(batch_size: db_retrieve_batch_size) remaining_capacity = db_retrieve_batch_size - resources.size + return resources if remaining_capacity.zero? - if remaining_capacity.zero? - resources - else - resources + find_outdated_project_ids(batch_size: remaining_capacity) - end + resources = resources + find_outdated_project_ids(batch_size: remaining_capacity) + remaining_capacity = db_retrieve_batch_size - resources.size + return resources if remaining_capacity.zero? 
+ + resources + find_failed_project_ids(batch_size: remaining_capacity) end def find_unverified_project_ids(batch_size:) @@ -57,6 +58,10 @@ module Geo def find_outdated_project_ids(batch_size:) finder.find_outdated_projects(batch_size: batch_size).pluck(:id) end + + def find_failed_project_ids(batch_size:) + finder.find_failed_projects(batch_size: batch_size).pluck(:id) + end end end end diff --git a/ee/spec/finders/geo/repository_verification_finder_spec.rb b/ee/spec/finders/geo/repository_verification_finder_spec.rb index 814508c3827db05f7290bd608a0b5b0d5e24c868..8b65cf2dfb66c5e3180766d136f2fbc3bb85eb00 100644 --- a/ee/spec/finders/geo/repository_verification_finder_spec.rb +++ b/ee/spec/finders/geo/repository_verification_finder_spec.rb @@ -3,6 +3,70 @@ require 'spec_helper' describe Geo::RepositoryVerificationFinder, :postgresql do set(:project) { create(:project) } + describe '#find_failed_projects' do + it 'returns projects where repository verification failed' do + create(:repository_state, :repository_failed, :wiki_verified, project: project) + + expect(subject.find_failed_projects(batch_size: 10)) + .to match_array(project) + end + + it 'does not return projects where repository verification is outdated' do + create(:repository_state, :repository_outdated, project: project) + + expect(subject.find_failed_projects(batch_size: 10)).to be_empty + end + + it 'does not return projects where repository verification is pending' do + create(:repository_state, :wiki_verified, project: project) + + expect(subject.find_failed_projects(batch_size: 10)).to be_empty + end + + it 'returns projects where wiki verification failed' do + create(:repository_state, :repository_verified, :wiki_failed, project: project) + + expect(subject.find_failed_projects(batch_size: 10)) + .to match_array(project) + end + + it 'does not return projects where wiki verification is outdated' do + create(:repository_state, :wiki_outdated, project: project) + + 
expect(subject.find_failed_projects(batch_size: 10)).to be_empty + end + + it 'does not return projects where wiki verification is pending' do + create(:repository_state, :repository_verified, project: project) + + expect(subject.find_failed_projects(batch_size: 10)).to be_empty + end + + it 'returns less active projects first' do + less_active_project = create(:project) + create(:repository_state, :repository_failed, project: project) + create(:repository_state, :repository_failed, project: less_active_project) + project.update_column(:last_repository_updated_at, 30.minutes.ago) + less_active_project.update_column(:last_repository_updated_at, 2.days.ago) + + expect(subject.find_failed_projects(batch_size: 10)).to eq [less_active_project, project] + end + + context 'with shard restriction' do + subject { described_class.new(shard_name: project.repository_storage) } + + it 'does not return projects on other shards' do + project_other_shard = create(:project) + project_other_shard.update_column(:repository_storage, 'other') + create(:repository_state, :repository_failed, project: project) + create(:repository_state, :repository_failed, project: project_other_shard) + + expect(subject.find_failed_projects(batch_size: 10)) + .to match_array(project) + end + end + end + describe '#find_outdated_projects' do it 'returns projects where repository verification is outdated' do create(:repository_state, :repository_outdated, project: project) diff --git a/ee/spec/workers/geo/repository_verification/primary/shard_worker_spec.rb b/ee/spec/workers/geo/repository_verification/primary/shard_worker_spec.rb index 2b42faaeb85d5eb02ebe805c035f0e7a4e6d227d..42c542a4eed7adee3a2d7945b717074db901a7fb 100644 --- a/ee/spec/workers/geo/repository_verification/primary/shard_worker_spec.rb +++ b/ee/spec/workers/geo/repository_verification/primary/shard_worker_spec.rb @@ -63,6 +63,26 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_ subject.perform(shard_name) 
end + it 'performs Geo::RepositoryVerification::Primary::SingleWorker for projects where repository verification failed' do + repository_verification_failed = create(:project) + + create(:repository_state, :repository_failed, :wiki_verified, project: repository_verification_failed) + + expect(primary_singleworker).to receive(:perform_async).with(repository_verification_failed.id) + + subject.perform(shard_name) + end + + it 'performs Geo::RepositoryVerification::Primary::SingleWorker for projects where wiki verification failed' do + wiki_verification_failed = create(:project) + + create(:repository_state, :repository_verified, :wiki_failed, project: wiki_verification_failed) + + expect(primary_singleworker).to receive(:perform_async).with(wiki_verification_failed.id) + + subject.perform(shard_name) + end + it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when shard becomes unhealthy' do create(:project) @@ -133,14 +153,14 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_ end end - it 'handles multiple batches of projects needing verification, skipping failed repos' do + it 'handles multiple batches of projects needing verification, including failed repos' do expect(primary_singleworker).to receive(:perform_async).with(project_repo_unverified.id).once.and_call_original expect(primary_singleworker).to receive(:perform_async).with(project_wiki_unverified.id).once.and_call_original expect(primary_singleworker).to receive(:perform_async).with(project_repo_verified.id).once.and_call_original expect(primary_singleworker).to receive(:perform_async).with(project_wiki_verified.id).once.and_call_original - expect(primary_singleworker).not_to receive(:perform_async).with(project_both_failed.id) - expect(primary_singleworker).not_to receive(:perform_async).with(project_repo_failed_wiki_verified.id) - expect(primary_singleworker).not_to receive(:perform_async).with(project_repo_verified_wiki_failed.id) + 
expect(primary_singleworker).to receive(:perform_async).with(project_both_failed.id).once.and_call_original + expect(primary_singleworker).to receive(:perform_async).with(project_repo_failed_wiki_verified.id).once.and_call_original + expect(primary_singleworker).to receive(:perform_async).with(project_repo_verified_wiki_failed.id).once.and_call_original 8.times do Sidekiq::Testing.inline! { subject.perform(shard_name) }