Refactor legacy finder to find registries pending verification

These changes use the new finder when FDW is enabled without selective
sync to avoid code duplication.
parent 0bec1dc9
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# Finder for retrieving project registries that need a repository or # Finder for retrieving project registries that need a repository or
# wiki verification where projects belong to the specific shard # wiki verification where projects belong to the specific shard
# using cross-database joins for selective sync. # using cross-database joins.
# #
# Basic usage: # Basic usage:
# #
...@@ -18,11 +18,18 @@ module Geo ...@@ -18,11 +18,18 @@ module Geo
end end
def execute def execute
if use_legacy_queries? registries = find_registries_pending_verification_on_secondary
registries_pending_verification_for_selective_sync return Geo::ProjectRegistry.none if registries.empty?
else
registries_pending_verification registries_to_verify = filter_registries_verified_in_primary(registries)
end return registries_to_verify unless selective_sync?
legacy_inner_join_registry_ids(
registries_to_verify,
current_node.projects.pluck_primary_key,
Geo::ProjectRegistry,
foreign_key: :project_id
)
end end
private private
...@@ -30,55 +37,58 @@ module Geo ...@@ -30,55 +37,58 @@ module Geo
attr_reader :batch_size, :shard_name attr_reader :batch_size, :shard_name
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def registries_pending_verification def find_registries_pending_verification_on_secondary
Geo::ProjectRegistry.all Geo::ProjectRegistry
.merge(Geo::Fdw::ProjectRegistry.registries_pending_verification) .where(Geo::ProjectRegistry.registries_pending_verification)
.merge(Geo::Fdw::ProjectRegistry.within_shards(shard_name)) .pluck(
.limit(batch_size) :project_id,
Geo::ProjectRegistry.repositories_pending_verification.to_sql,
Geo::ProjectRegistry.wikis_pending_verification.to_sql
)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord def filter_registries_verified_in_primary(registries)
def registries_pending_verification_for_selective_sync filtered_project_ids = filter_projects_verified_on_primary(registries)
registries = Geo::ProjectRegistry Geo::ProjectRegistry.project_id_in(filtered_project_ids)
.where(Geo::ProjectRegistry.registries_pending_verification) end
.pluck(:project_id, Geo::ProjectRegistry.repositories_pending_verification.to_sql, Geo::ProjectRegistry.wikis_pending_verification.to_sql)
return Geo::ProjectRegistry.none if registries.empty?
id_and_want_to_sync = registries.map do |project_id, want_to_sync_repo, want_to_sync_wiki|
"(#{project_id}, #{quote_value(want_to_sync_repo)}, #{quote_value(want_to_sync_wiki)})"
end
legacy_repository_state_table = ::ProjectRepositoryState.arel_table
project_registry_sync_table = Arel::Table.new(:project_registry_sync_table)
joined_relation =
ProjectRepositoryState.joins(<<~SQL_REPO)
INNER JOIN
(VALUES #{id_and_want_to_sync.join(',')})
project_registry_sync_table(project_id, want_to_sync_repo, want_to_sync_wiki)
ON #{legacy_repository_state_table.name}.project_id = project_registry_sync_table.project_id
SQL_REPO
project_ids = joined_relation # rubocop:disable CodeReuse/ActiveRecord
def filter_projects_verified_on_primary(registries)
inner_join_project_repository_state(registries)
.joins(:project) .joins(:project)
.where(projects: { repository_storage: shard_name }) .merge(Project.within_shards(shard_name))
.where( .where(
legacy_repository_state_table[:repository_verification_checksum].not_eq(nil) legacy_repository_state_table[:repository_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_repo].eq(true)) .and(project_registry_verify_table[:want_to_verify_repo].eq(true))
.or(legacy_repository_state_table[:wiki_verification_checksum].not_eq(nil) .or(legacy_repository_state_table[:wiki_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_wiki].eq(true)))) .and(project_registry_verify_table[:want_to_verify_wiki].eq(true))))
.limit(batch_size) .limit(batch_size)
.pluck_project_key .pluck_project_key
end
# rubocop:enable CodeReuse/ActiveRecord
legacy_inner_join_registry_ids( # rubocop:disable CodeReuse/ActiveRecord
Geo::ProjectRegistry.project_id_in(project_ids), def inner_join_project_repository_state(registries)
current_node.projects.pluck_primary_key, id_and_want_to_verify = registries.map do |project_id, want_to_verify_repo, want_to_verify_wiki|
Geo::ProjectRegistry, "(#{project_id}, #{quote_value(want_to_verify_repo)}, #{quote_value(want_to_verify_wiki)})"
foreign_key: :project_id end
)
ProjectRepositoryState.joins(<<~SQL_REPO)
INNER JOIN
(VALUES #{id_and_want_to_verify.join(',')})
#{project_registry_verify_table.name}(project_id, want_to_verify_repo, want_to_verify_wiki)
ON #{legacy_repository_state_table.name}.project_id = #{project_registry_verify_table.name}.project_id
SQL_REPO
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
def legacy_repository_state_table
@legacy_repository_state_table ||= ProjectRepositoryState.arel_table
end
def project_registry_verify_table
@project_registry_verify_table ||= Arel::Table.new(:project_registry_verify_table)
end
end end
end end
...@@ -67,7 +67,9 @@ module Geo ...@@ -67,7 +67,9 @@ module Geo
end end
def find_registries_to_verify(shard_name:, batch_size:) def find_registries_to_verify(shard_name:, batch_size:)
registries_pending_verification(shard_name, batch_size) finder_klass_for_registries_pending_verification
.new(current_node: current_node, shard_name: shard_name, batch_size: batch_size)
.execute
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
...@@ -161,6 +163,10 @@ module Geo ...@@ -161,6 +163,10 @@ module Geo
private private
def use_legacy_queries_for_selective_sync?
selective_sync? && !Gitlab::Geo::Fdw.enabled_for_selective_sync?
end
def finder_klass_for_synced_registries def finder_klass_for_synced_registries
if Gitlab::Geo::Fdw.enabled_for_selective_sync? if Gitlab::Geo::Fdw.enabled_for_selective_sync?
Geo::ProjectRegistrySyncedFinder Geo::ProjectRegistrySyncedFinder
...@@ -246,17 +252,11 @@ module Geo ...@@ -246,17 +252,11 @@ module Geo
end end
def finder_klass_for_registries_pending_verification def finder_klass_for_registries_pending_verification
if Gitlab::Geo::Fdw.enabled_for_selective_sync? if !Gitlab::Geo::Fdw.enabled? || use_legacy_queries_for_selective_sync?
Geo::ProjectRegistryPendingVerificationFinder
else
Geo::LegacyProjectRegistryPendingVerificationFinder Geo::LegacyProjectRegistryPendingVerificationFinder
else
Geo::ProjectRegistryPendingVerificationFinder
end end
end end
def registries_pending_verification(shard_name, batch_size)
finder_klass_for_registries_pending_verification
.new(current_node: current_node, shard_name: shard_name, batch_size: batch_size)
.execute
end
end end
end end
...@@ -87,7 +87,7 @@ module EE ...@@ -87,7 +87,7 @@ module EE
end end
scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) } scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) }
scope :within_shards, -> (shard_names) { where(repository_storage: Array(shard_names)) }
scope :verification_failed_repos, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_repos) } scope :verification_failed_repos, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_repos) }
scope :verification_failed_wikis, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_wikis) } scope :verification_failed_wikis, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_wikis) }
scope :for_plan_name, -> (name) { joins(namespace: :plan).where(plans: { name: name }) } scope :for_plan_name, -> (name) { joins(namespace: :plan).where(plans: { name: name }) }
......
require 'spec_helper' require 'spec_helper'
describe Geo::ProjectRegistryFinder, :geo do describe Geo::ProjectRegistryFinder, :geo do
using RSpec::Parameterized::TableSyntax
include ::EE::GeoHelpers include ::EE::GeoHelpers
# Using let() instead of set() because set() does not work properly # Using let() instead of set() because set() does not work properly
...@@ -466,16 +468,6 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -466,16 +468,6 @@ describe Geo::ProjectRegistryFinder, :geo do
include_examples 'counts all the things', 'fdw' include_examples 'counts all the things', 'fdw'
include_examples 'finds all the things', 'fdw' include_examples 'finds all the things', 'fdw'
describe '#find_registries_to_verify' do
it 'delegates to Geo::LegacyProjectRegistryPendingVerificationFinder' do
expect_next_instance_of(Geo::LegacyProjectRegistryPendingVerificationFinder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect(finder).to receive(:execute).once
end
subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)
end
end
end end
context 'with use_fdw_queries_for_selective_sync enabled' do context 'with use_fdw_queries_for_selective_sync enabled' do
...@@ -485,16 +477,6 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -485,16 +477,6 @@ describe Geo::ProjectRegistryFinder, :geo do
include_examples 'counts all the things', 'fdw' include_examples 'counts all the things', 'fdw'
include_examples 'finds all the things', 'fdw' include_examples 'finds all the things', 'fdw'
describe '#find_registries_to_verify' do
it 'delegates to Geo::ProjectRegistryPendingVerificationFinder' do
expect_next_instance_of(Geo::ProjectRegistryPendingVerificationFinder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect(finder).to receive(:execute).once
end
subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)
end
end
end end
end end
...@@ -505,10 +487,29 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -505,10 +487,29 @@ describe Geo::ProjectRegistryFinder, :geo do
include_examples 'counts all the things', 'legacy' include_examples 'counts all the things', 'legacy'
include_examples 'finds all the things', 'legacy' include_examples 'finds all the things', 'legacy'
end
describe '#find_registries_to_verify', :delete do
where(:selective_sync, :fdw_enabled, :use_fdw_queries_for_selective_sync, :finder) do
false | false | false | Geo::LegacyProjectRegistryPendingVerificationFinder
false | false | true | Geo::LegacyProjectRegistryPendingVerificationFinder
false | true | true | Geo::ProjectRegistryPendingVerificationFinder
false | true | false | Geo::ProjectRegistryPendingVerificationFinder
true | false | false | Geo::LegacyProjectRegistryPendingVerificationFinder
true | false | true | Geo::LegacyProjectRegistryPendingVerificationFinder
true | true | true | Geo::ProjectRegistryPendingVerificationFinder
true | true | false | Geo::LegacyProjectRegistryPendingVerificationFinder
end
with_them do
before do
stub_fdw(fdw_enabled)
stub_feature_flags(use_fdw_queries_for_selective_sync: use_fdw_queries_for_selective_sync)
stub_selective_sync(secondary, selective_sync)
end
describe '#find_registries_to_verify' do it 'delegates to Geo::ProjectRegistryPendingVerificationFinder' do
it 'delegates to Geo::LegacyProjectRegistryPendingVerificationFinder' do expect_next_instance_of(finder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect_next_instance_of(Geo::LegacyProjectRegistryPendingVerificationFinder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect(finder).to receive(:execute).once expect(finder).to receive(:execute).once
end end
......
...@@ -14,8 +14,16 @@ module EE ...@@ -14,8 +14,16 @@ module EE
allow(::Gitlab::Geo).to receive(:secondary?).and_return(true) allow(::Gitlab::Geo).to receive(:secondary?).and_return(true)
end end
def stub_fdw(value)
allow(::Gitlab::Geo::Fdw).to receive(:enabled?).and_return(value)
end
def stub_fdw_disabled def stub_fdw_disabled
allow(::Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false) stub_fdw(false)
end
def stub_selective_sync(node, value)
allow(node).to receive(:selective_sync?).and_return(value)
end end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment