Commit 0d126f55 authored by Nick Thomas

Merge branch '5309-geo-repository-verification-per-shard-on-secondary-node' into 'master'

Resolve "Geo - Repository verification per shard on secondary node"

Closes #5309 and #5175

See merge request gitlab-org/gitlab-ee!5068
parents 39cc0753 e5909dab
...@@ -126,14 +126,18 @@ ...@@ -126,14 +126,18 @@
- cronjob:geo_repository_verification_primary_batch - cronjob:geo_repository_verification_primary_batch
- cronjob:geo_repository_verification_secondary_scheduler - cronjob:geo_repository_verification_secondary_scheduler
- cronjob:geo_sidekiq_cron_config - cronjob:geo_sidekiq_cron_config
- cronjob:geo_scheduler_per_shard_scheduler
- cronjob:geo_scheduler_primary_per_shard_scheduler
- cronjob:geo_scheduler_secondary_per_shard_scheduler
- cronjob:geo_repository_verification_secondary_shard
- cronjob:historical_data - cronjob:historical_data
- cronjob:ldap_all_groups_sync - cronjob:ldap_all_groups_sync
- cronjob:ldap_sync - cronjob:ldap_sync
- cronjob:update_all_mirrors - cronjob:update_all_mirrors
- geo:geo_scheduler_base - geo:geo_scheduler_scheduler
- geo:geo_scheduler_primary - geo:geo_scheduler_primary_scheduler
- geo:geo_scheduler_secondary - geo:geo_scheduler_secondary_scheduler
- geo:geo_file_download - geo:geo_file_download
- geo:geo_file_removal - geo:geo_file_removal
- geo:geo_hashed_storage_attachments_migration - geo:geo_hashed_storage_attachments_migration
......
module Geo module Geo
class FileDownloadDispatchWorker < Geo::Scheduler::SecondaryWorker class FileDownloadDispatchWorker < Geo::Scheduler::Secondary::SchedulerWorker
include CronjobQueue include CronjobQueue
private private
......
module Geo module Geo
class RepositoryShardSyncWorker < Geo::Scheduler::SecondaryWorker class RepositoryShardSyncWorker < Geo::Scheduler::Secondary::SchedulerWorker
sidekiq_options retry: false sidekiq_options retry: false
attr_accessor :shard_name attr_accessor :shard_name
......
module Geo module Geo
class RepositorySyncWorker class RepositorySyncWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker
include ApplicationWorker def schedule_job(shard_name)
include CronjobQueue RepositoryShardSyncWorker.perform_async(shard_name)
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::FsShardsCheck,
Gitlab::HealthChecks::GitalyCheck
].freeze
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
shards = selective_sync_filter(healthy_shards)
Gitlab::Geo::ShardHealthCache.update(shards)
shards.each do |shard_name|
RepositoryShardSyncWorker.perform_async(shard_name)
end
end
def healthy_shards
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
HEALTHY_SHARD_CHECKS.map(&:readiness)
.map { |check_result| check_result.select(&:success) }
.inject(:&)
.map { |check_result| check_result.labels[:shard] }
.compact
.uniq
end
def selective_sync_filter(shards)
return shards unless ::Gitlab::Geo.current_node&.selective_sync_by_shards?
shards & ::Gitlab::Geo.current_node.selective_sync_shards
end end
end end
end end
module Geo module Geo
module RepositoryVerification module RepositoryVerification
module Primary module Primary
class BatchWorker class BatchWorker < Geo::Scheduler::Primary::PerShardSchedulerWorker
include ApplicationWorker
include CronjobQueue
include ::Gitlab::Utils::StrongMemoize
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::FsShardsCheck,
Gitlab::HealthChecks::GitalyCheck
].freeze
def perform def perform
return unless Feature.enabled?('geo_repository_verification') return unless Feature.enabled?('geo_repository_verification')
return unless Gitlab::Geo.primary?
Gitlab::Geo::ShardHealthCache.update(healthy_shards)
healthy_shards.each do |shard_name| super
Geo::RepositoryVerification::Primary::ShardWorker.perform_async(shard_name)
end
end end
def healthy_shards def schedule_job(shard_name)
strong_memoize(:healthy_shards) do Geo::RepositoryVerification::Primary::ShardWorker.perform_async(shard_name)
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
HEALTHY_SHARD_CHECKS.map(&:readiness)
.map { |check_result| check_result.select(&:success) }
.inject(:&)
.map { |check_result| check_result.labels[:shard] }
.compact
.uniq
end
end end
end end
end end
......
module Geo module Geo
module RepositoryVerification module RepositoryVerification
module Primary module Primary
class ShardWorker < Geo::Scheduler::PrimaryWorker class ShardWorker < Geo::Scheduler::Primary::SchedulerWorker
sidekiq_options retry: false sidekiq_options retry: false
MAX_CAPACITY = 100 MAX_CAPACITY = 100
......
module Geo module Geo
module RepositoryVerification module RepositoryVerification
module Secondary module Secondary
class SchedulerWorker < Geo::Scheduler::SecondaryWorker class SchedulerWorker < Geo::Scheduler::Secondary::PerShardSchedulerWorker
include CronjobQueue
MAX_CAPACITY = 1000
def perform def perform
return unless Feature.enabled?('geo_repository_verification') return unless Feature.enabled?('geo_repository_verification')
super super
end end
private def schedule_job(shard_name)
Geo::RepositoryVerification::Secondary::ShardWorker.perform_async(shard_name)
def max_capacity
MAX_CAPACITY
end
def load_pending_resources
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size)
.pluck(:id)
end
def schedule_job(registry_id)
job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
{ id: registry_id, job_id: job_id } if job_id
end
def finder
@finder ||= Geo::ProjectRegistryFinder.new
end end
end end
end end
......
module Geo
  module RepositoryVerification
    module Secondary
      # Scheduler that enqueues one SingleWorker job per project registry
      # returned by the finder, on behalf of a single storage shard.
      #
      # NOTE(review): shard_name is only used for the health check and the
      # worker metadata; it is not passed to the finder call below. Confirm
      # whether the registries are meant to be scoped to this shard.
      class ShardWorker < Geo::Scheduler::Secondary::SchedulerWorker
        include CronjobQueue

        MAX_CAPACITY = 1000

        attr_accessor :shard_name

        # Remembers the shard and runs the parent scheduler only while the
        # shard is reported healthy by Gitlab::Geo::ShardHealthCache.
        #
        # @param shard_name [String] name of the shard this run is for
        def perform(shard_name)
          @shard_name = shard_name

          super() if Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name)
        end

        private

        # Metadata hash identifying the shard this worker instance handles.
        def worker_metadata
          { shard: shard_name }
        end

        def max_capacity
          MAX_CAPACITY
        end

        # IDs of the registries the finder reports as needing verification,
        # limited to db_retrieve_batch_size.
        def load_pending_resources
          registries = finder.find_registries_to_verify(batch_size: db_retrieve_batch_size)

          registries.pluck(:id)
        end

        # Enqueues the verification of one registry.
        #
        # @return [Hash, nil] the registry id and job id, or nil when
        #   perform_async did not return a job id
        def schedule_job(registry_id)
          job_id = Geo::RepositoryVerification::Secondary::SingleWorker.perform_async(registry_id)
          return unless job_id

          { id: registry_id, job_id: job_id }
        end

        def finder
          @finder ||= Geo::ProjectRegistryFinder.new
        end
      end
    end
  end
end
module Geo
  module Scheduler
    # Base cron worker that fans out one job per eligible storage shard.
    #
    # Subclasses must implement #schedule_job and may override
    # #eligible_shards to narrow the list of healthy shards.
    class PerShardSchedulerWorker
      include ApplicationWorker
      include CronjobQueue
      include ::Gitlab::Utils::StrongMemoize

      HEALTHY_SHARD_CHECKS = [
        Gitlab::HealthChecks::FsShardsCheck,
        Gitlab::HealthChecks::GitalyCheck
      ].freeze

      # Passes the eligible shards to Gitlab::Geo::ShardHealthCache.update and
      # calls #schedule_job once for each of them.
      def perform
        # Resolve the list once: subclasses may override #eligible_shards
        # without memoizing it, and the list written to the cache must be the
        # same list jobs are scheduled for.
        shards = eligible_shards

        Gitlab::Geo::ShardHealthCache.update(shards)

        shards.each do |shard_name|
          schedule_job(shard_name)
        end
      end

      # Hook for subclasses: enqueue the per-shard job.
      #
      # @param shard_name [String] name of the shard to schedule work for
      # @raise [NotImplementedError] always, unless overridden
      def schedule_job(shard_name)
        raise NotImplementedError
      end

      # Shards that jobs are scheduled for. Defaults to every healthy shard.
      def eligible_shards
        healthy_shards
      end

      # Shard names that pass every check in HEALTHY_SHARD_CHECKS.
      # The result is memoized for the lifetime of this worker instance.
      def healthy_shards
        strong_memoize(:healthy_shards) do
          # For now, we need to perform both Gitaly and direct filesystem checks to ensure
          # the shard is healthy. We take the intersection of the successful checks
          # as the healthy shards.
          HEALTHY_SHARD_CHECKS.map(&:readiness)
            .map { |check_result| check_result.select(&:success) }
            .inject(:&)
            .map { |check_result| check_result.labels[:shard] }
            .compact
            .uniq
        end
      end
    end
  end
end
module Geo
  module Scheduler
    module Primary
      # Per-shard scheduler that only does work when Gitlab::Geo.primary?
      # is truthy.
      class PerShardSchedulerWorker < Geo::Scheduler::PerShardSchedulerWorker
        # Runs the parent implementation on a primary; otherwise does
        # nothing and returns nil.
        def perform
          super if Gitlab::Geo.primary?
        end
      end
    end
  end
end
module Geo
  module Scheduler
    module Primary
      # Scheduler worker that only does work when Gitlab::Geo.primary?
      # is truthy.
      class SchedulerWorker < Geo::Scheduler::SchedulerWorker
        # Runs the parent implementation on a primary; otherwise does
        # nothing and returns nil.
        def perform
          super if Gitlab::Geo.primary?
        end
      end
    end
  end
end
module Geo
  module Scheduler
    # Base worker variant that only does work when Gitlab::Geo.primary?
    # is truthy.
    class PrimaryWorker < Geo::Scheduler::BaseWorker
      # Runs the parent implementation on a primary; otherwise does
      # nothing and returns nil.
      def perform
        super if Gitlab::Geo.primary?
      end
    end
  end
end
module Geo module Geo
module Scheduler module Scheduler
class BaseWorker class SchedulerWorker
include ApplicationWorker include ApplicationWorker
include GeoQueue include GeoQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
......
module Geo
  module Scheduler
    module Secondary
      # Per-shard scheduler for secondary nodes. It requires a configured Geo
      # database and narrows the healthy shards through the selective sync
      # filter.
      class PerShardSchedulerWorker < Geo::Scheduler::PerShardSchedulerWorker
        # Runs the parent implementation only when the Geo database is
        # configured and this node is a secondary; otherwise returns nil.
        def perform
          super if Gitlab::Geo.geo_database_configured? && Gitlab::Geo.secondary?
        end

        # Healthy shards after applying the selective sync filter.
        def eligible_shards
          selective_sync_filter(healthy_shards)
        end

        # Keeps only the shards listed in the current node's
        # selective_sync_shards when the node reports selective sync by
        # shards. With no current node, or without that setting, the list is
        # returned untouched.
        #
        # @param shards [Array] candidate shard names
        # @return [Array] the filtered shard names
        def selective_sync_filter(shards)
          if ::Gitlab::Geo.current_node&.selective_sync_by_shards?
            shards & ::Gitlab::Geo.current_node.selective_sync_shards
          else
            shards
          end
        end
      end
    end
  end
end
module Geo
  module Scheduler
    module Secondary
      # Scheduler worker for secondary nodes with a configured Geo database.
      class SchedulerWorker < Geo::Scheduler::SchedulerWorker
        # Runs the parent implementation only when the Geo database is
        # configured and this node is a secondary; otherwise returns nil.
        def perform
          super if Gitlab::Geo.geo_database_configured? && Gitlab::Geo.secondary?
        end
      end
    end
  end
end
module Geo
  module Scheduler
    # Base worker variant for secondary nodes with a configured Geo database.
    class SecondaryWorker < Geo::Scheduler::BaseWorker
      # Runs the parent implementation only when the Geo database is
      # configured and this node is a secondary; otherwise returns nil.
      def perform
        super if Gitlab::Geo.geo_database_configured? && Gitlab::Geo.secondary?
      end
    end
  end
end
---
title: Geo - Perform the repository verification per shard on a secondary node
merge_request: 5068
author:
type: changed
...@@ -110,7 +110,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -110,7 +110,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
# 1. A total of 10 files in the queue, and we can load a maximum of 5 and send 2 at a time. # 1. A total of 10 files in the queue, and we can load a maximum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again. # 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do it 'attempts to load a new batch without pending downloads' do
stub_const('Geo::Scheduler::BaseWorker::DB_RETRIEVE_BATCH_SIZE', 5) stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
secondary.update!(files_max_capacity: 2) secondary.update!(files_max_capacity: 2)
allow_any_instance_of(::Gitlab::Geo::Transfer).to receive(:download_from_primary).and_return(100) allow_any_instance_of(::Gitlab::Geo::Transfer).to receive(:download_from_primary).and_return(100)
...@@ -142,7 +142,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -142,7 +142,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it 'does not stall backfill' do it 'does not stall backfill' do
unsynced = create(:lfs_object, :with_file) unsynced = create(:lfs_object, :with_file)
stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1) stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with(:lfs, failed_registry.file_id) expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with(:lfs, failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with(:lfs, unsynced.id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with(:lfs, unsynced.id)
......
...@@ -17,6 +17,10 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -17,6 +17,10 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
describe '#perform' do describe '#perform' do
context 'additional shards' do context 'additional shards' do
it 'skips backfill for repositories on other shards' do it 'skips backfill for repositories on other shards' do
...@@ -31,7 +35,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -31,7 +35,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage) expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken') expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
Sidekiq::Testing.inline! { subject.perform } subject.perform
end end
it 'skips backfill for projects on shards excluded by selective sync' do it 'skips backfill for projects on shards excluded by selective sync' do
...@@ -46,7 +50,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -46,7 +50,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with('default') expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with('default')
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken') expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
Sidekiq::Testing.inline! { subject.perform } subject.perform
end end
it 'skips backfill for projects on missing shards' do it 'skips backfill for projects on missing shards' do
...@@ -63,7 +67,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -63,7 +67,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage) expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('unknown') expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('unknown')
Sidekiq::Testing.inline! { subject.perform } subject.perform
end end
it 'skips backfill for projects with downed Gitaly server' do it 'skips backfill for projects with downed Gitaly server' do
...@@ -81,7 +85,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -81,7 +85,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(healthy_shard) expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken') expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
Sidekiq::Testing.inline! { subject.perform } subject.perform
end end
end end
end end
......
require 'spec_helper'
describe Geo::RepositoryVerification::Secondary::SchedulerWorker, :postgresql, :clean_gitlab_redis_cache do
  include ::EE::GeoHelpers

  set(:healthy_not_verified) { create(:project) }

  let!(:secondary) { create(:geo_node) }
  let(:healthy_shard) { healthy_not_verified.repository.storage }

  before do
    stub_current_geo_node(secondary)
  end

  around do |example|
    Sidekiq::Testing.inline! { example.run }
  end

  describe '#perform' do
    context 'when geo_repository_verification is enabled' do
      before do
        stub_feature_flags(geo_repository_verification: true)
      end

      it 'skips verification for repositories on other shards' do
        unhealthy_not_verified = create(:project, repository_storage: 'broken')

        # Make the shard unhealthy
        FileUtils.rm_rf(unhealthy_not_verified.repository_storage_path)

        expect_scheduled_only_for_healthy_shard('broken')

        subject.perform
      end

      it 'skips verification for projects on missing shards' do
        missing_not_verified = create(:project)
        missing_not_verified.update_column(:repository_storage, 'unknown')

        # hide the 'broken' storage for this spec
        stub_storage_settings({})

        expect_scheduled_only_for_healthy_shard('unknown')

        subject.perform
      end

      it 'skips verification for projects with downed Gitaly server' do
        create(:project, repository_storage: 'broken')

        # Report only one healthy shard
        expect_readiness(Gitlab::HealthChecks::FsShardsCheck, broken_is_healthy: true)
        expect_readiness(Gitlab::HealthChecks::GitalyCheck, broken_is_healthy: false)

        expect_scheduled_only_for_healthy_shard('broken')

        subject.perform
      end

      it 'skips verification for projects on shards excluded by selective sync' do
        secondary.update!(selective_sync_type: 'shards', selective_sync_shards: [healthy_shard])

        # Report both shards as healthy
        expect_readiness(Gitlab::HealthChecks::FsShardsCheck, broken_is_healthy: true)
        expect_readiness(Gitlab::HealthChecks::GitalyCheck, broken_is_healthy: true)

        expect_scheduled_only_for_healthy_shard('broken')

        subject.perform
      end
    end

    context 'when geo_repository_verification is disabled' do
      before do
        stub_feature_flags(geo_repository_verification: false)
      end

      it 'does not schedule jobs' do
        expect(Geo::RepositoryVerification::Secondary::ShardWorker)
          .not_to receive(:perform_async).with(healthy_shard)

        subject.perform
      end
    end
  end

  # Expects a ShardWorker job for the healthy shard and none for +skipped_shard+.
  def expect_scheduled_only_for_healthy_shard(skipped_shard)
    expect(Geo::RepositoryVerification::Secondary::ShardWorker).to receive(:perform_async).with(healthy_shard)
    expect(Geo::RepositoryVerification::Secondary::ShardWorker).not_to receive(:perform_async).with(skipped_shard)
  end

  # Expects +check+ to be asked for readiness and makes it report the healthy
  # shard as successful and the 'broken' shard according to +broken_is_healthy+.
  def expect_readiness(check, broken_is_healthy:)
    expect(check).to receive(:readiness)
      .and_return([health_result(true, healthy_shard), health_result(broken_is_healthy, 'broken')])
  end

  # Builds a health check result labelled with the given shard.
  def health_result(success, shard)
    Gitlab::HealthChecks::Result.new(success, nil, { shard: shard })
  end
end
require 'spec_helper'
# Exercises the per-shard verification scheduler on a secondary node: which
# registries get a SingleWorker job, and the guards that prevent scheduling.
describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clean_gitlab_redis_cache do
  include ::EE::GeoHelpers

  let!(:secondary) { create(:geo_node) }
  let(:shard_name) { Gitlab.config.repositories.storages.keys.first }

  set(:project) { create(:project) }

  before do
    stub_current_geo_node(secondary)
  end

  describe '#perform' do
    before do
      allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
      allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }

      # Mark the shard as healthy so the worker does not bail out early
      Gitlab::Geo::ShardHealthCache.update([shard_name])
    end

    it 'schedule job for each project' do
      other_project = create(:project)
      create(:repository_state, :repository_verified, project: project)
      create(:repository_state, :repository_verified, project: other_project)
      create(:geo_project_registry, :repository_verification_outdated, project: project)
      create(:geo_project_registry, :repository_verification_outdated, project: other_project)

      expect(Geo::RepositoryVerification::Secondary::SingleWorker)
        .to receive(:perform_async).twice

      subject.perform(shard_name)
    end

    it 'schedule job for projects missing repository verification' do
      create(:repository_state, :wiki_verified, project: project)
      missing_repository_verification = create(:geo_project_registry, :wiki_verified, project: project)

      expect(Geo::RepositoryVerification::Secondary::SingleWorker)
        .to receive(:perform_async).with(missing_repository_verification.id)

      subject.perform(shard_name)
    end

    it 'schedule job for projects missing wiki verification' do
      create(:repository_state, :repository_verified, project: project)
      missing_wiki_verification = create(:geo_project_registry, :repository_verified, project: project)

      expect(Geo::RepositoryVerification::Secondary::SingleWorker)
        .to receive(:perform_async).with(missing_wiki_verification.id)

      subject.perform(shard_name)
    end

    it 'does not schedule jobs when shard becomes unhealthy' do
      create(:repository_state, project: project)

      Gitlab::Geo::ShardHealthCache.update([])

      expect(Geo::RepositoryVerification::Secondary::SingleWorker)
        .not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not schedule jobs when no geo database is configured' do
      allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }

      expect(Geo::RepositoryVerification::Secondary::SingleWorker)
        .not_to receive(:perform_async)

      subject.perform(shard_name)

      # We need to unstub here or the DatabaseCleaner will have issues since it
      # will appear as though the tracking DB were not available
      allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
    end

    it 'does not schedule jobs when not running on a secondary' do
      # Give the worker a registry it would otherwise schedule, so the
      # expectation below can only pass because of the secondary guard.
      create(:repository_state, :repository_verified, project: project)
      create(:geo_project_registry, :repository_verification_outdated, project: project)

      # The worker guards on Gitlab::Geo.secondary?, so that is the predicate
      # that has to be stubbed (stubbing primary? leaves the guard untouched).
      allow(Gitlab::Geo).to receive(:secondary?) { false }

      expect(Geo::RepositoryVerification::Secondary::SingleWorker)
        .not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not schedule jobs when number of scheduled jobs exceeds capacity' do
      create(:project)

      is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once)
      is_expected.not_to receive(:schedule_job)

      Sidekiq::Testing.inline! { subject.perform(shard_name) }
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment