Add selective sync support to FDW queries to find registries to verify

These changes introduces a new finder to make it easier to
remove the legacy queries in the future.
parent f70aea90
# frozen_string_literal: true
# Finder for retrieving project registries that need a repository or
# wiki verification where projects belong to the specific shard
# using cross-database joins for selective sync.
#
# Basic usage:
#
# Geo::LegacyProjectRegistryPendingVerificationFinder
# .new(current_node: Gitlab::Geo.current_node, shard_name: 'default', batch_size: 1000)
# .execute
module Geo
class LegacyProjectRegistryPendingVerificationFinder < RegistryFinder
def initialize(current_node: nil, shard_name:, batch_size:)
super(current_node: current_node)
@shard_name = shard_name
@batch_size = batch_size
end
def execute
if use_legacy_queries?
registries_pending_verification_for_selective_sync
else
registries_pending_verification
end
end
private
attr_reader :batch_size, :shard_name
def local_registry_table
Geo::ProjectRegistry.arel_table
end
def fdw_project_table
Geo::Fdw::Project.arel_table
end
def fdw_repository_state_table
Geo::Fdw::ProjectRepositoryState.arel_table
end
def fdw_inner_join_projects
local_registry_table
.join(fdw_project_table, Arel::Nodes::InnerJoin)
.on(local_registry_table[:project_id].eq(fdw_project_table[:id]))
.join_sources
end
def fdw_inner_join_repository_state
local_registry_table
.join(fdw_repository_state_table, Arel::Nodes::InnerJoin)
.on(local_registry_table[:project_id].eq(fdw_repository_state_table[:project_id]))
.join_sources
end
def local_repo_condition
local_registry_table[:repository_verification_checksum_sha].eq(nil)
.and(local_registry_table[:last_repository_verification_failure].eq(nil))
.and(local_registry_table[:resync_repository].eq(false))
.and(repository_missing_on_primary_is_not_true)
end
def repository_missing_on_primary_is_not_true
Arel::Nodes::SqlLiteral.new("project_registry.repository_missing_on_primary IS NOT TRUE")
end
def local_wiki_condition
local_registry_table[:wiki_verification_checksum_sha].eq(nil)
.and(local_registry_table[:last_wiki_verification_failure].eq(nil))
.and(local_registry_table[:resync_wiki].eq(false))
.and(wiki_missing_on_primary_is_not_true)
end
def wiki_missing_on_primary_is_not_true
Arel::Nodes::SqlLiteral.new("project_registry.wiki_missing_on_primary IS NOT TRUE")
end
# rubocop:disable CodeReuse/ActiveRecord
def registries_pending_verification
repo_condition =
local_repo_condition
.and(fdw_repository_state_table[:repository_verification_checksum].not_eq(nil))
wiki_condition =
local_wiki_condition
.and(fdw_repository_state_table[:wiki_verification_checksum].not_eq(nil))
Geo::ProjectRegistry
.joins(fdw_inner_join_projects)
.joins(fdw_inner_join_repository_state)
.where(repo_condition.or(wiki_condition))
.where(fdw_project_table[:repository_storage].eq(shard_name))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def registries_pending_verification_for_selective_sync
registries = Geo::ProjectRegistry
.where(local_repo_condition.or(local_wiki_condition))
.pluck(:project_id, local_repo_condition.to_sql, local_wiki_condition.to_sql)
return Geo::ProjectRegistry.none if registries.empty?
id_and_want_to_sync = registries.map do |project_id, want_to_sync_repo, want_to_sync_wiki|
"(#{project_id}, #{quote_value(want_to_sync_repo)}, #{quote_value(want_to_sync_wiki)})"
end
project_registry_sync_table = Arel::Table.new(:project_registry_sync_table)
joined_relation =
ProjectRepositoryState.joins(<<~SQL_REPO)
INNER JOIN
(VALUES #{id_and_want_to_sync.join(',')})
project_registry_sync_table(project_id, want_to_sync_repo, want_to_sync_wiki)
ON #{legacy_repository_state_table.name}.project_id = project_registry_sync_table.project_id
SQL_REPO
project_ids = joined_relation
.joins(:project)
.where(projects: { repository_storage: shard_name })
.where(
legacy_repository_state_table[:repository_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_repo].eq(true))
.or(legacy_repository_state_table[:wiki_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_wiki].eq(true))))
.limit(batch_size)
.pluck(:project_id)
legacy_inner_join_registry_ids(
Geo::ProjectRegistry.where(project_id: project_ids),
current_node.projects.pluck(:id),
Geo::ProjectRegistry,
foreign_key: :project_id
)
end
# rubocop: enable CodeReuse/ActiveRecord
def legacy_repository_state_table
::ProjectRepositoryState.arel_table
end
end
end
......@@ -66,13 +66,8 @@ module Geo
registries_retrying_verification(:wiki).count
end
# Find all registries that need a repository or wiki verification
def find_registries_to_verify(shard_name:, batch_size:)
if use_legacy_queries?
legacy_find_registries_to_verify(shard_name: shard_name, batch_size: batch_size)
else
fdw_find_registries_to_verify(shard_name: shard_name, batch_size: batch_size)
end
registries_pending_verification(shard_name, batch_size)
end
# rubocop: disable CodeReuse/ActiveRecord
......@@ -124,41 +119,6 @@ module Geo
end
# rubocop: enable CodeReuse/ActiveRecord
# Find all registries that repository or wiki need verification
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of registries that need verification
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_registries_to_verify(shard_name:, batch_size:)
repo_condition =
local_repo_condition
.and(fdw_repository_state_table[:repository_verification_checksum].not_eq(nil))
wiki_condition =
local_wiki_condition
.and(fdw_repository_state_table[:wiki_verification_checksum].not_eq(nil))
Geo::ProjectRegistry
.joins(fdw_inner_join_projects)
.joins(fdw_inner_join_repository_state)
.where(repo_condition.or(wiki_condition))
.where(fdw_project_table[:repository_storage].eq(shard_name))
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
def fdw_inner_join_projects
local_registry_table
.join(fdw_project_table, Arel::Nodes::InnerJoin)
.on(local_registry_table[:project_id].eq(fdw_project_table[:id]))
.join_sources
end
def fdw_inner_join_repository_state
local_registry_table
.join(fdw_repository_state_table, Arel::Nodes::InnerJoin)
.on(local_registry_table[:project_id].eq(fdw_repository_state_table[:project_id]))
.join_sources
end
#
# Legacy accessors (non FDW)
#
......@@ -195,86 +155,10 @@ module Geo
end
# rubocop: enable CodeReuse/ActiveRecord
def quote_value(value)
::Gitlab::SQL::Glob.q(value)
end
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of registries that need verification
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_registries_to_verify(shard_name:, batch_size:)
registries = Geo::ProjectRegistry
.where(local_repo_condition.or(local_wiki_condition))
.pluck(:project_id, local_repo_condition.to_sql, local_wiki_condition.to_sql)
return Geo::ProjectRegistry.none if registries.empty?
id_and_want_to_sync = registries.map do |project_id, want_to_sync_repo, want_to_sync_wiki|
"(#{project_id}, #{quote_value(want_to_sync_repo)}, #{quote_value(want_to_sync_wiki)})"
end
project_registry_sync_table = Arel::Table.new(:project_registry_sync_table)
joined_relation =
ProjectRepositoryState.joins(<<~SQL_REPO)
INNER JOIN
(VALUES #{id_and_want_to_sync.join(',')})
project_registry_sync_table(project_id, want_to_sync_repo, want_to_sync_wiki)
ON #{legacy_repository_state_table.name}.project_id = project_registry_sync_table.project_id
SQL_REPO
project_ids = joined_relation
.joins(:project)
.where(projects: { repository_storage: shard_name })
.where(
legacy_repository_state_table[:repository_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_repo].eq(true))
.or(legacy_repository_state_table[:wiki_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_wiki].eq(true))))
.limit(batch_size)
.pluck(:project_id)
Geo::ProjectRegistry.where(project_id: project_ids)
end
# rubocop: enable CodeReuse/ActiveRecord
def legacy_repository_state_table
::ProjectRepositoryState.arel_table
end
def fdw_project_table
Geo::Fdw::Project.arel_table
end
def fdw_repository_state_table
Geo::Fdw::ProjectRepositoryState.arel_table
end
def local_registry_table
Geo::ProjectRegistry.arel_table
end
def local_repo_condition
local_registry_table[:repository_verification_checksum_sha].eq(nil)
.and(local_registry_table[:last_repository_verification_failure].eq(nil))
.and(local_registry_table[:resync_repository].eq(false))
.and(repository_missing_on_primary_is_not_true)
end
def local_wiki_condition
local_registry_table[:wiki_verification_checksum_sha].eq(nil)
.and(local_registry_table[:last_wiki_verification_failure].eq(nil))
.and(local_registry_table[:resync_wiki].eq(false))
.and(wiki_missing_on_primary_is_not_true)
end
def repository_missing_on_primary_is_not_true
Arel::Nodes::SqlLiteral.new("project_registry.repository_missing_on_primary IS NOT TRUE")
end
def wiki_missing_on_primary_is_not_true
Arel::Nodes::SqlLiteral.new("project_registry.wiki_missing_on_primary IS NOT TRUE")
end
private
def finder_klass_for_synced_registries
......@@ -360,5 +244,19 @@ module Geo
.new(current_node: current_node, type: type)
.execute
end
def finder_klass_for_registries_pending_verification
if Gitlab::Geo::Fdw.enabled_for_selective_sync?
Geo::ProjectRegistryPendingVerificationFinder
else
Geo::LegacyProjectRegistryPendingVerificationFinder
end
end
def registries_pending_verification(shard_name, batch_size)
finder_klass_for_registries_pending_verification
.new(current_node: current_node, shard_name: shard_name, batch_size: batch_size)
.execute
end
end
end
# frozen_string_literal: true
# Finder for retrieving project registries that that need a repository or
# wiki verification where projects belong to the specific shard using
# FDW queries.
#
# Basic usage:
#
# Geo::ProjectRegistryPendingVerificationFinder
# .new(current_node: Gitlab::Geo.current_node, shard_name: 'default', batch_size: 1000)
# .execute.
module Geo
class ProjectRegistryPendingVerificationFinder
def initialize(current_node:, shard_name:, batch_size:)
@current_node = Geo::Fdw::GeoNode.find(current_node.id)
@shard_name = shard_name
@batch_size = batch_size
end
def execute
repo_condition =
local_repo_condition
.and(fdw_repository_state_table[:repository_verification_checksum].not_eq(nil))
wiki_condition =
local_wiki_condition
.and(fdw_repository_state_table[:wiki_verification_checksum].not_eq(nil))
current_node.project_registries
.joins(fdw_inner_join_projects)
.joins(fdw_inner_join_repository_state)
.where(repo_condition.or(wiki_condition))
.where(fdw_project_table[:repository_storage].eq(shard_name))
.limit(batch_size)
end
private
attr_reader :current_node, :shard_name,:batch_size
def local_registry_table
Geo::ProjectRegistry.arel_table
end
def fdw_project_table
Geo::Fdw::Project.arel_table
end
def fdw_repository_state_table
Geo::Fdw::ProjectRepositoryState.arel_table
end
def fdw_inner_join_projects
local_registry_table
.join(fdw_project_table, Arel::Nodes::InnerJoin)
.on(local_registry_table[:project_id].eq(fdw_project_table[:id]))
.join_sources
end
def fdw_inner_join_repository_state
local_registry_table
.join(fdw_repository_state_table, Arel::Nodes::InnerJoin)
.on(local_registry_table[:project_id].eq(fdw_repository_state_table[:project_id]))
.join_sources
end
def local_repo_condition
local_registry_table[:repository_verification_checksum_sha].eq(nil)
.and(local_registry_table[:last_repository_verification_failure].eq(nil))
.and(local_registry_table[:resync_repository].eq(false))
.and(repository_missing_on_primary_is_not_true)
end
def repository_missing_on_primary_is_not_true
Arel::Nodes::SqlLiteral.new("project_registry.repository_missing_on_primary IS NOT TRUE")
end
def local_wiki_condition
local_registry_table[:wiki_verification_checksum_sha].eq(nil)
.and(local_registry_table[:last_wiki_verification_failure].eq(nil))
.and(local_registry_table[:resync_wiki].eq(false))
.and(wiki_missing_on_primary_is_not_true)
end
def wiki_missing_on_primary_is_not_true
Arel::Nodes::SqlLiteral.new("project_registry.wiki_missing_on_primary IS NOT TRUE")
end
end
end
......@@ -55,5 +55,9 @@ module Geo
joined_relation.where(registry: { registry_present: [nil, false] })
end
# rubocop: enable CodeReuse/ActiveRecord
def quote_value(value)
::Gitlab::SQL::Glob.q(value)
end
end
end
......@@ -450,126 +450,6 @@ describe Geo::ProjectRegistryFinder, :geo do
end
end
end
describe '#find_registries_to_verify' do
it 'delegates to the correct method' do
expect(subject).to receive("#{method_prefix}_find_registries_to_verify".to_sym).and_call_original
subject.find_registries_to_verify(shard_name: 'default', batch_size: 10)
end
it 'does not return registries that are verified on primary and secondary' do
project_verified = create(:repository_state, :repository_verified, :wiki_verified).project
repository_verified = create(:repository_state, :repository_verified).project
wiki_verified = create(:repository_state, :wiki_verified).project
create(:geo_project_registry, :repository_verified, :wiki_verified, project: project_verified)
create(:geo_project_registry, :repository_verified, project: repository_verified)
create(:geo_project_registry, :wiki_verified, project: wiki_verified)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'does not return registries that were unverified/outdated on primary' do
project_unverified_primary = create(:project)
project_outdated_primary = create(:repository_state, :repository_outdated, :wiki_outdated).project
repository_outdated_primary = create(:repository_state, :repository_outdated, :wiki_verified).project
wiki_outdated_primary = create(:repository_state, :repository_verified, :wiki_outdated).project
create(:geo_project_registry, project: project_unverified_primary)
create(:geo_project_registry, :repository_verification_outdated, :wiki_verification_outdated, project: project_outdated_primary)
create(:geo_project_registry, :repository_verified, :wiki_verified, project: repository_outdated_primary)
create(:geo_project_registry, :repository_verified, :wiki_verified, project: wiki_outdated_primary)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'returns registries that were unverified/outdated on secondary' do
# Secondary unverified/outdated
project_unverified_secondary = create(:repository_state, :repository_verified, :wiki_verified).project
project_outdated_secondary = create(:repository_state, :repository_verified, :wiki_verified).project
repository_outdated_secondary = create(:repository_state, :repository_verified, :wiki_verified).project
wiki_outdated_secondary = create(:repository_state, :repository_verified, :wiki_verified).project
registry_unverified_secondary = create(:geo_project_registry, :synced, project: project_unverified_secondary)
registry_outdated_secondary = create(:geo_project_registry, :synced, :repository_verification_outdated, :wiki_verification_outdated, project: project_outdated_secondary)
registry_repository_outdated_secondary = create(:geo_project_registry, :synced, :repository_verification_outdated, :wiki_verified, project: repository_outdated_secondary)
registry_wiki_outdated_secondary = create(:geo_project_registry, :synced, :repository_verified, :wiki_verification_outdated, project: wiki_outdated_secondary)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100))
.to match_array([
registry_unverified_secondary,
registry_outdated_secondary,
registry_repository_outdated_secondary,
registry_wiki_outdated_secondary
])
end
it 'does not return registries that failed on primary' do
verification_failed_primary = create(:repository_state, :repository_failed, :wiki_failed).project
create(:geo_project_registry, project: verification_failed_primary)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'returns registries where one failed and one verified on the primary' do
verification_failed_primary = create(:repository_state, :repository_failed, :wiki_failed).project
repository_failed_primary = create(:repository_state, :repository_failed, :wiki_verified).project
wiki_failed_primary = create(:repository_state, :repository_verified, :wiki_failed).project
create(:geo_project_registry, :synced, project: verification_failed_primary)
registry_repository_failed_primary = create(:geo_project_registry, :synced, project: repository_failed_primary)
registry_wiki_failed_primary = create(:geo_project_registry, :synced, project: wiki_failed_primary)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100))
.to match_array([
registry_repository_failed_primary,
registry_wiki_failed_primary
])
end
it 'does not return registries where verification failed on secondary' do
# Verification failed on secondary
verification_failed_secondary = create(:repository_state, :repository_verified, :wiki_verified).project
repository_failed_secondary = create(:repository_state, :repository_verified).project
wiki_failed_secondary = create(:repository_state, :wiki_verified).project
create(:geo_project_registry, :repository_verification_failed, :wiki_verification_failed, project: verification_failed_secondary)
create(:geo_project_registry, :repository_verification_failed, project: repository_failed_secondary)
create(:geo_project_registry, :wiki_verification_failed, project: wiki_failed_secondary)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'does not return registries when the repo needs to be resynced' do
project_verified = create(:repository_state, :repository_verified).project
create(:geo_project_registry, :repository_sync_failed, project: project_verified)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'does not return registries when the wiki needs to be resynced' do
project_verified = create(:repository_state, :wiki_verified).project
create(:geo_project_registry, :wiki_sync_failed, project: project_verified)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'does not return registries when the repository is missing on primary' do
project_verified = create(:repository_state, :repository_verified).project
create(:geo_project_registry, :synced, project: project_verified, repository_missing_on_primary: true)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
it 'does not return registries when the wiki is missing on primary' do
project_verified = create(:repository_state, :wiki_verified).project
create(:geo_project_registry, :synced, project: project_verified, wiki_missing_on_primary: true)
expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end
end
end
# Disable transactions via :delete method because a foreign table
......@@ -586,6 +466,16 @@ describe Geo::ProjectRegistryFinder, :geo do
include_examples 'counts all the things', 'fdw'
include_examples 'finds all the things', 'fdw'
describe '#find_registries_to_verify' do
it 'delegates to Geo::LegacyProjectRegistryPendingVerificationFinder' do
expect_next_instance_of(Geo::LegacyProjectRegistryPendingVerificationFinder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect(finder).to receive(:execute).once
end
subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)
end
end
end
context 'with use_fdw_queries_for_selective_sync enabled' do
......@@ -595,6 +485,16 @@ describe Geo::ProjectRegistryFinder, :geo do
include_examples 'counts all the things', 'fdw'
include_examples 'finds all the things', 'fdw'
describe '#find_registries_to_verify' do
it 'delegates to Geo::ProjectRegistryPendingVerificationFinder' do
expect_next_instance_of(Geo::ProjectRegistryPendingVerificationFinder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect(finder).to receive(:execute).once
end
subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)
end
end
end
end
......@@ -605,5 +505,15 @@ describe Geo::ProjectRegistryFinder, :geo do
include_examples 'counts all the things', 'legacy'
include_examples 'finds all the things', 'legacy'
describe '#find_registries_to_verify' do
it 'delegates to Geo::LegacyProjectRegistryPendingVerificationFinder' do
expect_next_instance_of(Geo::LegacyProjectRegistryPendingVerificationFinder, current_node: secondary, shard_name: 'default', batch_size: 100) do |finder|
expect(finder).to receive(:execute).once
end
subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment