Apply shard restriction while loading new resources to verify

This change prevents a project from being scheduled more than once
parent 1ce962d8
...@@ -107,11 +107,11 @@ module Geo ...@@ -107,11 +107,11 @@ module Geo
end end
# Find all registries that need a repository or wiki verification # Find all registries that need a repository or wiki verification
def find_registries_to_verify(batch_size:) def find_registries_to_verify(shard_name:, batch_size:)
if use_legacy_queries? if use_legacy_queries?
legacy_find_registries_to_verify(batch_size: batch_size) legacy_find_registries_to_verify(shard_name: shard_name, batch_size: batch_size)
else else
fdw_find_registries_to_verify(batch_size: batch_size) fdw_find_registries_to_verify(shard_name: shard_name, batch_size: batch_size)
end end
end end
...@@ -191,7 +191,7 @@ module Geo ...@@ -191,7 +191,7 @@ module Geo
# @return [ActiveRecord::Relation<Geo::Fdw::Project>] # @return [ActiveRecord::Relation<Geo::Fdw::Project>]
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def fdw_find_unsynced_projects def fdw_find_unsynced_projects
Geo::Fdw::Project.joins("LEFT OUTER JOIN project_registry ON project_registry.project_id = #{fdw_project_table}.id") Geo::Fdw::Project.joins("LEFT OUTER JOIN project_registry ON project_registry.project_id = #{fdw_project_table.name}.id")
.where(project_registry: { project_id: nil }) .where(project_registry: { project_id: nil })
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -204,7 +204,7 @@ module Geo ...@@ -204,7 +204,7 @@ module Geo
# @return [ActiveRecord::Relation<Geo::Fdw::Project>] # @return [ActiveRecord::Relation<Geo::Fdw::Project>]
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def fdw_find_projects_updated_recently def fdw_find_projects_updated_recently
Geo::Fdw::Project.joins("INNER JOIN project_registry ON project_registry.project_id = #{fdw_project_table}.id") Geo::Fdw::Project.joins("INNER JOIN project_registry ON project_registry.project_id = #{fdw_project_table.name}.id")
.merge(Geo::ProjectRegistry.dirty) .merge(Geo::ProjectRegistry.dirty)
.merge(Geo::ProjectRegistry.retry_due) .merge(Geo::ProjectRegistry.retry_due)
end end
...@@ -213,17 +213,20 @@ module Geo ...@@ -213,17 +213,20 @@ module Geo
# Find all registries that repository or wiki need verification # Find all registries that repository or wiki need verification
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of registries that need verification # @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of registries that need verification
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def fdw_find_registries_to_verify(batch_size:) def fdw_find_registries_to_verify(shard_name:, batch_size:)
repo_condition = repo_condition =
local_repo_condition local_repo_condition
.and(fdw_repository_state_table[:repository_verification_checksum].not_eq(nil)) .and(fdw_repository_state_table[:repository_verification_checksum].not_eq(nil))
wiki_condition = wiki_condition =
local_wiki_condition local_wiki_condition
.and(fdw_repository_state_table[:wiki_verification_checksum].not_eq(nil)) .and(fdw_repository_state_table[:wiki_verification_checksum].not_eq(nil))
Geo::ProjectRegistry Geo::ProjectRegistry
.joins(fdw_inner_join_projects)
.joins(fdw_inner_join_repository_state) .joins(fdw_inner_join_repository_state)
.where(repo_condition.or(wiki_condition)) .where(repo_condition.or(wiki_condition))
.where(fdw_project_table[:repository_storage].eq(shard_name))
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -233,6 +236,13 @@ module Geo ...@@ -233,6 +236,13 @@ module Geo
Geo::ProjectRegistry.verified_wikis Geo::ProjectRegistry.verified_wikis
end end
# Builds the INNER JOIN between the local registry table and the FDW projects
# table, matching registry.project_id against projects.id.
#
# Returns the Arel join sources, suitable for passing to ActiveRecord's `joins`.
def fdw_inner_join_projects
  registry = local_registry_table
  projects = fdw_project_table
  same_project = registry[:project_id].eq(projects[:id])

  registry.join(projects, Arel::Nodes::InnerJoin).on(same_project).join_sources
end
def fdw_inner_join_repository_state def fdw_inner_join_repository_state
local_registry_table local_registry_table
.join(fdw_repository_state_table, Arel::Nodes::InnerJoin) .join(fdw_repository_state_table, Arel::Nodes::InnerJoin)
...@@ -361,7 +371,7 @@ module Geo ...@@ -361,7 +371,7 @@ module Geo
# @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of registries that need verification # @return [ActiveRecord::Relation<Geo::ProjectRegistry>] list of registries that need verification
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def legacy_find_registries_to_verify(batch_size:) def legacy_find_registries_to_verify(shard_name:, batch_size:)
registries = Geo::ProjectRegistry registries = Geo::ProjectRegistry
.where(local_repo_condition.or(local_wiki_condition)) .where(local_repo_condition.or(local_wiki_condition))
.pluck(:project_id, local_repo_condition.to_sql, local_wiki_condition.to_sql) .pluck(:project_id, local_repo_condition.to_sql, local_wiki_condition.to_sql)
...@@ -383,6 +393,8 @@ module Geo ...@@ -383,6 +393,8 @@ module Geo
SQL_REPO SQL_REPO
project_ids = joined_relation project_ids = joined_relation
.joins(:project)
.where(projects: { repository_storage: shard_name })
.where( .where(
legacy_repository_state_table[:repository_verification_checksum].not_eq(nil) legacy_repository_state_table[:repository_verification_checksum].not_eq(nil)
.and(project_registry_sync_table[:want_to_sync_repo].eq(true)) .and(project_registry_sync_table[:want_to_sync_repo].eq(true))
...@@ -400,7 +412,7 @@ module Geo ...@@ -400,7 +412,7 @@ module Geo
end end
def fdw_project_table def fdw_project_table
Geo::Fdw::Project.table_name Geo::Fdw::Project.arel_table
end end
def fdw_repository_state_table def fdw_repository_state_table
......
...@@ -35,7 +35,8 @@ module Geo ...@@ -35,7 +35,8 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def load_pending_resources def load_pending_resources
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size).pluck(:id) finder.find_registries_to_verify(shard_name: shard_name, batch_size: db_retrieve_batch_size)
.pluck(:id)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -46,7 +47,7 @@ module Geo ...@@ -46,7 +47,7 @@ module Geo
end end
def finder def finder
@finder ||= Geo::ProjectRegistryFinder.new @finder ||= Geo::ProjectRegistryFinder.new(current_node: current_node)
end end
end end
end end
......
---
title: Geo - Respect shard restriction while loading new resources to verify on the
Geo secondary node
merge_request: 9343
author:
type: fixed
...@@ -515,7 +515,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -515,7 +515,7 @@ describe Geo::ProjectRegistryFinder, :geo do
it 'delegates to the correct method' do it 'delegates to the correct method' do
expect(subject).to receive("#{method_prefix}_find_registries_to_verify".to_sym).and_call_original expect(subject).to receive("#{method_prefix}_find_registries_to_verify".to_sym).and_call_original
subject.find_registries_to_verify(batch_size: 10) subject.find_registries_to_verify(shard_name: 'default', batch_size: 10)
end end
it 'does not return registries that are verified on primary and secondary' do it 'does not return registries that are verified on primary and secondary' do
...@@ -527,7 +527,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -527,7 +527,7 @@ describe Geo::ProjectRegistryFinder, :geo do
create(:geo_project_registry, :repository_verified, project: repository_verified) create(:geo_project_registry, :repository_verified, project: repository_verified)
create(:geo_project_registry, :wiki_verified, project: wiki_verified) create(:geo_project_registry, :wiki_verified, project: wiki_verified)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'does not return registries that were unverified/outdated on primary' do it 'does not return registries that were unverified/outdated on primary' do
...@@ -541,7 +541,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -541,7 +541,7 @@ describe Geo::ProjectRegistryFinder, :geo do
create(:geo_project_registry, :repository_verified, :wiki_verified, project: repository_outdated_primary) create(:geo_project_registry, :repository_verified, :wiki_verified, project: repository_outdated_primary)
create(:geo_project_registry, :repository_verified, :wiki_verified, project: wiki_outdated_primary) create(:geo_project_registry, :repository_verified, :wiki_verified, project: wiki_outdated_primary)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'returns registries that were unverified/outdated on secondary' do it 'returns registries that were unverified/outdated on secondary' do
...@@ -556,7 +556,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -556,7 +556,7 @@ describe Geo::ProjectRegistryFinder, :geo do
registry_repository_outdated_secondary = create(:geo_project_registry, :synced, :repository_verification_outdated, :wiki_verified, project: repository_outdated_secondary) registry_repository_outdated_secondary = create(:geo_project_registry, :synced, :repository_verification_outdated, :wiki_verified, project: repository_outdated_secondary)
registry_wiki_outdated_secondary = create(:geo_project_registry, :synced, :repository_verified, :wiki_verification_outdated, project: wiki_outdated_secondary) registry_wiki_outdated_secondary = create(:geo_project_registry, :synced, :repository_verified, :wiki_verification_outdated, project: wiki_outdated_secondary)
expect(subject.find_registries_to_verify(batch_size: 100)) expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100))
.to match_array([ .to match_array([
registry_unverified_secondary, registry_unverified_secondary,
registry_outdated_secondary, registry_outdated_secondary,
...@@ -570,7 +570,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -570,7 +570,7 @@ describe Geo::ProjectRegistryFinder, :geo do
create(:geo_project_registry, project: verification_failed_primary) create(:geo_project_registry, project: verification_failed_primary)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'returns registries where one failed and one verified on the primary' do it 'returns registries where one failed and one verified on the primary' do
...@@ -582,7 +582,7 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -582,7 +582,7 @@ describe Geo::ProjectRegistryFinder, :geo do
registry_repository_failed_primary = create(:geo_project_registry, :synced, project: repository_failed_primary) registry_repository_failed_primary = create(:geo_project_registry, :synced, project: repository_failed_primary)
registry_wiki_failed_primary = create(:geo_project_registry, :synced, project: wiki_failed_primary) registry_wiki_failed_primary = create(:geo_project_registry, :synced, project: wiki_failed_primary)
expect(subject.find_registries_to_verify(batch_size: 100)) expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100))
.to match_array([ .to match_array([
registry_repository_failed_primary, registry_repository_failed_primary,
registry_wiki_failed_primary registry_wiki_failed_primary
...@@ -599,35 +599,35 @@ describe Geo::ProjectRegistryFinder, :geo do ...@@ -599,35 +599,35 @@ describe Geo::ProjectRegistryFinder, :geo do
create(:geo_project_registry, :repository_verification_failed, project: repository_failed_secondary) create(:geo_project_registry, :repository_verification_failed, project: repository_failed_secondary)
create(:geo_project_registry, :wiki_verification_failed, project: wiki_failed_secondary) create(:geo_project_registry, :wiki_verification_failed, project: wiki_failed_secondary)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'does not return registries when the repo needs to be resynced' do it 'does not return registries when the repo needs to be resynced' do
project_verified = create(:repository_state, :repository_verified).project project_verified = create(:repository_state, :repository_verified).project
create(:geo_project_registry, :repository_sync_failed, project: project_verified) create(:geo_project_registry, :repository_sync_failed, project: project_verified)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'does not return registries when the wiki needs to be resynced' do it 'does not return registries when the wiki needs to be resynced' do
project_verified = create(:repository_state, :wiki_verified).project project_verified = create(:repository_state, :wiki_verified).project
create(:geo_project_registry, :wiki_sync_failed, project: project_verified) create(:geo_project_registry, :wiki_sync_failed, project: project_verified)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'does not return registries when the repository is missing on primary' do it 'does not return registries when the repository is missing on primary' do
project_verified = create(:repository_state, :repository_verified).project project_verified = create(:repository_state, :repository_verified).project
create(:geo_project_registry, :synced, project: project_verified, repository_missing_on_primary: true) create(:geo_project_registry, :synced, project: project_verified, repository_missing_on_primary: true)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
it 'does not return registries when the wiki is missing on primary' do it 'does not return registries when the wiki is missing on primary' do
project_verified = create(:repository_state, :wiki_verified).project project_verified = create(:repository_state, :wiki_verified).project
create(:geo_project_registry, :synced, project: project_verified, wiki_missing_on_primary: true) create(:geo_project_registry, :synced, project: project_verified, wiki_missing_on_primary: true)
expect(subject.find_registries_to_verify(batch_size: 100)).to be_empty expect(subject.find_registries_to_verify(shard_name: 'default', batch_size: 100)).to be_empty
end end
end end
end end
......
...@@ -13,5 +13,9 @@ module EE ...@@ -13,5 +13,9 @@ module EE
allow(::Gitlab::Geo).to receive(:primary?).and_return(false) allow(::Gitlab::Geo).to receive(:primary?).and_return(false)
allow(::Gitlab::Geo).to receive(:secondary?).and_return(true) allow(::Gitlab::Geo).to receive(:secondary?).and_return(true)
end end
# Spec helper: makes ::Gitlab::Geo::Fdw.enabled? report false for the
# duration of the example.
def stub_fdw_disabled
  allow(::Gitlab::Geo::Fdw).to receive_messages(enabled?: false)
end
end end
end end
...@@ -12,7 +12,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea ...@@ -12,7 +12,7 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false) stub_fdw_disabled
end end
describe '#perform' do describe '#perform' do
...@@ -60,6 +60,17 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea ...@@ -60,6 +60,17 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea
subject.perform(shard_name) subject.perform(shard_name)
end end
# Guards the shard restriction: a registry whose project is stored on another
# shard must not be handed to the single-project worker by this run.
it 'does not schedule jobs for projects on other shards' do
project_other_shard = create(:project)
# Point the project at a storage named 'other' -- presumably different from
# `shard_name`, which is defined outside this view (confirm against the spec's let).
project_other_shard.update_column(:repository_storage, 'other')
# Primary-side state and secondary-side registry for that project.
create(:repository_state, :repository_verified, :wiki_verified, project: project_other_shard)
registry_other_shard = create(:geo_project_registry, :synced, :wiki_verified, project: project_other_shard)
# Only calls carrying this registry's id are forbidden; the expectation does
# not constrain calls made with other arguments.
expect(secondary_singleworker).not_to receive(:perform_async).with(registry_other_shard.id)
subject.perform(shard_name)
end
it 'does not schedule jobs for projects missing repositories on primary' do it 'does not schedule jobs for projects missing repositories on primary' do
other_project = create(:project) other_project = create(:project)
create(:repository_state, :repository_verified, project: project) create(:repository_state, :repository_verified, project: project)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment