Perform the repository verification per shard on a secondary node

parent b17e7d7c
......@@ -126,14 +126,18 @@
- cronjob:geo_repository_verification_primary_batch
- cronjob:geo_repository_verification_secondary_scheduler
- cronjob:geo_sidekiq_cron_config
- cronjob:geo_scheduler_per_shard_scheduler
- cronjob:geo_scheduler_primary_per_shard_scheduler
- cronjob:geo_scheduler_secondary_per_shard_scheduler
- cronjob:geo_repository_verification_secondary_shard
- cronjob:historical_data
- cronjob:ldap_all_groups_sync
- cronjob:ldap_sync
- cronjob:update_all_mirrors
- geo:geo_scheduler_base
- geo:geo_scheduler_primary
- geo:geo_scheduler_secondary
- geo:geo_scheduler_scheduler
- geo:geo_scheduler_primary_scheduler
- geo:geo_scheduler_secondary_scheduler
- geo:geo_file_download
- geo:geo_file_removal
- geo:geo_hashed_storage_attachments_migration
......
module Geo
class FileDownloadDispatchWorker < Geo::Scheduler::SecondaryWorker
class FileDownloadDispatchWorker < Geo::Scheduler::Secondary::SchedulerWorker
include CronjobQueue
private
......
module Geo
class RepositoryShardSyncWorker < Geo::Scheduler::SecondaryWorker
class RepositoryShardSyncWorker < Geo::Scheduler::Secondary::SchedulerWorker
sidekiq_options retry: false
attr_accessor :shard_name
......
module Geo
class RepositorySyncWorker
include ApplicationWorker
include CronjobQueue
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::FsShardsCheck,
Gitlab::HealthChecks::GitalyCheck
].freeze
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
shards = selective_sync_filter(healthy_shards)
Gitlab::Geo::ShardHealthCache.update(shards)
shards.each do |shard_name|
class RepositorySyncWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker
def schedule_job(shard_name)
RepositoryShardSyncWorker.perform_async(shard_name)
end
end
def healthy_shards
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
HEALTHY_SHARD_CHECKS.map(&:readiness)
.map { |check_result| check_result.select(&:success) }
.inject(:&)
.map { |check_result| check_result.labels[:shard] }
.compact
.uniq
end
def selective_sync_filter(shards)
return shards unless ::Gitlab::Geo.current_node&.selective_sync_by_shards?
shards & ::Gitlab::Geo.current_node.selective_sync_shards
end
end
end
module Geo
module RepositoryVerification
module Primary
class BatchWorker
include ApplicationWorker
include CronjobQueue
include ::Gitlab::Utils::StrongMemoize
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::FsShardsCheck,
Gitlab::HealthChecks::GitalyCheck
].freeze
class BatchWorker < Geo::Scheduler::Primary::PerShardSchedulerWorker
def perform
return unless Feature.enabled?('geo_repository_verification')
return unless Gitlab::Geo.primary?
Gitlab::Geo::ShardHealthCache.update(healthy_shards)
healthy_shards.each do |shard_name|
Geo::RepositoryVerification::Primary::ShardWorker.perform_async(shard_name)
end
super
end
def healthy_shards
strong_memoize(:healthy_shards) do
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
HEALTHY_SHARD_CHECKS.map(&:readiness)
.map { |check_result| check_result.select(&:success) }
.inject(:&)
.map { |check_result| check_result.labels[:shard] }
.compact
.uniq
end
def schedule_job(shard_name)
Geo::RepositoryVerification::Primary::ShardWorker.perform_async(shard_name)
end
end
end
......
module Geo
module RepositoryVerification
module Primary
class ShardWorker < Geo::Scheduler::PrimaryWorker
class ShardWorker < Geo::Scheduler::Primary::SchedulerWorker
sidekiq_options retry: false
MAX_CAPACITY = 100
......
module Geo
module RepositoryVerification
module Secondary
class SchedulerWorker < Geo::Scheduler::SecondaryWorker
include CronjobQueue
MAX_CAPACITY = 1000
class SchedulerWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker
def perform
return unless Feature.enabled?('geo_repository_verification')
super
end
private
def max_capacity
MAX_CAPACITY
end
def load_pending_resources
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size)
.pluck(:id)
end
def schedule_job(registry_id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
{ id: registry_id, job_id: job_id } if job_id
end
def finder
@finder ||= Geo::ProjectRegistryFinder.new
def schedule_job(shard_name)
Geo::RepositoryVerification::Secondary::ShardWorker.perform_async(shard_name)
end
end
end
......
module Geo
module RepositoryVerification
module Secondary
class ShardWorker < Geo::Scheduler::Secondary::SchedulerWorker
include CronjobQueue
MAX_CAPACITY = 1000
attr_accessor :shard_name
def perform(shard_name)
@shard_name = shard_name
return unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name)
super()
end
private
def worker_metadata
{ shard: shard_name }
end
def max_capacity
MAX_CAPACITY
end
def load_pending_resources
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size)
.pluck(:id)
end
def schedule_job(registry_id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
{ id: registry_id, job_id: job_id } if job_id
end
def finder
@finder ||= Geo::ProjectRegistryFinder.new
end
end
end
end
end
module Geo
module Scheduler
class PerShardSchedulerWorker
include ApplicationWorker
include CronjobQueue
include ::Gitlab::Utils::StrongMemoize
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::FsShardsCheck,
Gitlab::HealthChecks::GitalyCheck
].freeze
def perform
Gitlab::Geo::ShardHealthCache.update(eligible_shards)
eligible_shards.each do |shard_name|
schedule_job(shard_name)
end
end
def schedule_job(shard_name)
raise NotImplementedError
end
def eligible_shards
healthy_shards
end
def healthy_shards
strong_memoize(:healthy_shards) do
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
HEALTHY_SHARD_CHECKS.map(&:readiness)
.map { |check_result| check_result.select(&:success) }
.inject(:&)
.map { |check_result| check_result.labels[:shard] }
.compact
.uniq
end
end
end
end
end
module Geo
module Scheduler
module Primary
class PerShardSchedulerWorker < Geo::Scheduler::PerShardSchedulerWorker
def perform
return unless Gitlab::Geo.primary?
super
end
end
end
end
end
module Geo
module Scheduler
module Primary
class SchedulerWorker < Geo::Scheduler::SchedulerWorker
def perform
return unless Gitlab::Geo.primary?
super
end
end
end
end
end
module Geo
module Scheduler
class PrimaryWorker < Geo::Scheduler::BaseWorker
def perform
return unless Gitlab::Geo.primary?
super
end
end
end
end
module Geo
module Scheduler
class BaseWorker
class SchedulerWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
......
module Geo
module Scheduler
module Secondary
class PerShardSchedulerWorker < Geo::Scheduler::PerShardSchedulerWorker
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
super
end
def eligible_shards
selective_sync_filter(healthy_shards)
end
def selective_sync_filter(shards)
return shards unless ::Gitlab::Geo.current_node&.selective_sync_by_shards?
shards & ::Gitlab::Geo.current_node.selective_sync_shards
end
end
end
end
end
module Geo
module Scheduler
module Secondary
class SchedulerWorker < Geo::Scheduler::SchedulerWorker
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
super
end
end
end
end
end
module Geo
module Scheduler
class SecondaryWorker < Geo::Scheduler::BaseWorker
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
super
end
end
end
end
......@@ -110,7 +110,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
# 1. A total of 10 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do
stub_const('Geo::Scheduler::BaseWorker::DB_RETRIEVE_BATCH_SIZE', 5)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
secondary.update!(files_max_capacity: 2)
allow_any_instance_of(::Gitlab::Geo::Transfer).to receive(:download_from_primary).and_return(100)
......@@ -142,7 +142,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it 'does not stall backfill' do
unsynced = create(:lfs_object, :with_file)
stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with(:lfs, failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with(:lfs, unsynced.id)
......
......@@ -17,6 +17,10 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
stub_current_geo_node(secondary)
end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
describe '#perform' do
context 'additional shards' do
it 'skips backfill for repositories on other shards' do
......@@ -31,7 +35,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
Sidekiq::Testing.inline! { subject.perform }
subject.perform
end
it 'skips backfill for projects on shards excluded by selective sync' do
......@@ -46,7 +50,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with('default')
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
Sidekiq::Testing.inline! { subject.perform }
subject.perform
end
it 'skips backfill for projects on missing shards' do
......@@ -63,7 +67,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('unknown')
Sidekiq::Testing.inline! { subject.perform }
subject.perform
end
it 'skips backfill for projects with downed Gitaly server' do
......@@ -81,7 +85,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
Sidekiq::Testing.inline! { subject.perform }
subject.perform
end
end
end
......
require 'spec_helper'
describe Geo::RepositoryVerification::Secondary::SchedulerWorker, :postgresql, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers
set(:healthy_not_verified) { create(:project) }
let!(:secondary) { create(:geo_node) }
let(:healthy_shard) { healthy_not_verified.repository.storage }
before do
stub_current_geo_node(secondary)
end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
describe '#perform' do
context 'when geo_repository_verification is enabled' do
before do
stub_feature_flags(geo_repository_verification: true)
end
it 'skips verification for repositories on other shards' do
unhealthy_not_verified = create(:project, repository_storage: 'broken')
# Make the shard unhealthy
FileUtils.rm_rf(unhealthy_not_verified.repository_storage_path)
expect(Geo::RepositoryVerification::Secondary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Secondary::ShardWorker).not_to receive(:perform_async).with('broken')
subject.perform
end
it 'skips verification for projects on missing shards' do
missing_not_verified = create(:project)
missing_not_verified.update_column(:repository_storage, 'unknown')
# hide the 'broken' storage for this spec
stub_storage_settings({})
expect(Geo::RepositoryVerification::Secondary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Secondary::ShardWorker).not_to receive(:perform_async).with('unknown')
subject.perform
end
it 'skips verification for projects with downed Gitaly server' do
create(:project, repository_storage: 'broken')
# Report only one healthy shard
expect(Gitlab::HealthChecks::FsShardsCheck).to receive(:readiness)
.and_return([result(true, healthy_shard), result(true, 'broken')])
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard), result(false, 'broken')])
expect(Geo::RepositoryVerification::Secondary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Secondary::ShardWorker).not_to receive(:perform_async).with('broken')
subject.perform
end
it 'skips verification for projects on shards excluded by selective sync' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: [healthy_shard])
# Report both shards as healthy
expect(Gitlab::HealthChecks::FsShardsCheck).to receive(:readiness)
.and_return([result(true, healthy_shard), result(true, 'broken')])
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard), result(true, 'broken')])
expect(Geo::RepositoryVerification::Secondary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Secondary::ShardWorker).not_to receive(:perform_async).with('broken')
subject.perform
end
end
context 'when geo_repository_verification is disabled' do
before do
stub_feature_flags(geo_repository_verification: false)
end
it 'does not schedule jobs' do
expect(Geo::RepositoryVerification::Secondary::ShardWorker)
.not_to receive(:perform_async).with(healthy_shard)
subject.perform
end
end
end
def result(success, shard)
Gitlab::HealthChecks::Result.new(success, nil, { shard: shard })
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment