Commit f48af85b authored by Valery Sizov

Scheduling the design repositories sync

This MR implements backfilling of designs to the secondary node.
parent c820e576
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module Geo module Geo
class DesignRegistryFinder < RegistryFinder class DesignRegistryFinder < RegistryFinder
def count_syncable def count_syncable
GeoNode.find(current_node_id).projects.count_designs GeoNode.find(current_node_id).projects.with_designs.count
end end
def count_synced def count_synced
...@@ -25,7 +25,7 @@ module Geo ...@@ -25,7 +25,7 @@ module Geo
def registries def registries
current_node current_node
.projects .projects
.inner_join_design_management .with_designs
.inner_join_design_registry .inner_join_design_registry
end end
end end
......
# frozen_string_literal: true
# Finder for retrieving projects with designs that have not been synced yet
# (they have no design registry entry) and that belong to a specific shard,
# using FDW queries.
#
# Basic usage:
#
#   Geo::DesignUnsyncedFinder
#     .new(current_node: Gitlab::Geo.current_node, shard_name: 'default', batch_size: 1000)
#     .execute
module Geo
  class DesignUnsyncedFinder
    # current_node - object responding to #id; its FDW counterpart is loaded
    #                through Geo::Fdw::GeoNode.find
    # shard_name   - name of the shard the result is restricted to
    # batch_size   - maximum number of rows to return; nil means no limit
    def initialize(current_node:, shard_name:, batch_size: nil)
      @batch_size = batch_size
      @shard_name = shard_name
      @current_node = Geo::Fdw::GeoNode.find(current_node.id)
    end

    # Builds the relation of projects that have designs, lack a design
    # registry entry and live in the requested shard.
    #
    # Returns Geo::Fdw::Project.none when the node syncs selectively by shard
    # and the requested shard is not one of the selected shards.
    #
    # rubocop:disable CodeReuse/ActiveRecord
    def execute
      return Geo::Fdw::Project.none unless valid_shard?

      unsynced =
        projects
          .with_designs
          .missing_design_registry
          .within_shards(shard_name)

      # Only an explicit nil disables the limit.
      batch_size.nil? ? unsynced : unsynced.limit(batch_size)
    end
    # rubocop:enable CodeReuse/ActiveRecord

    private

    attr_reader :current_node, :shard_name, :batch_size

    # Starting scope: every project when syncing selectively by shard (the
    # shard filter is applied in #execute), otherwise the node's own projects.
    def projects
      current_node.selective_sync_by_shards? ? Geo::Fdw::Project.all : current_node.projects
    end

    # Any shard is acceptable unless the node syncs selectively by shard, in
    # which case the shard must be one of the selected ones.
    def valid_shard?
      !current_node.selective_sync_by_shards? ||
        current_node.selective_sync_shards.include?(shard_name)
    end
  end
end
# frozen_string_literal: true
# Finder for retrieving projects whose designs were updated recently (their
# design registry entry is pending, or failed and due for a retry) and that
# belong to a specific shard, using FDW queries.
#
# Basic usage:
#
#   Geo::DesignUpdatedRecentlyFinder
#     .new(current_node: Gitlab::Geo.current_node, shard_name: 'default', batch_size: 1000)
#     .execute
module Geo
  class DesignUpdatedRecentlyFinder
    # current_node - object responding to #id; its FDW counterpart is loaded
    #                through Geo::Fdw::GeoNode.find
    # shard_name   - name of the shard the result is restricted to
    # batch_size   - maximum number of rows to return; nil means no limit
    def initialize(current_node:, shard_name:, batch_size: nil)
      @batch_size = batch_size
      @shard_name = shard_name
      @current_node = Geo::Fdw::GeoNode.find(current_node.id)
    end

    # Builds the relation of projects that have designs, match the
    # `recently_updated_designs` scope and live in the requested shard.
    #
    # Returns Geo::Fdw::Project.none when the node syncs selectively by shard
    # and the requested shard is not one of the selected shards.
    #
    # rubocop:disable CodeReuse/ActiveRecord
    def execute
      return Geo::Fdw::Project.none unless valid_shard?

      updated =
        projects
          .with_designs
          .recently_updated_designs
          .within_shards(shard_name)

      # Only an explicit nil disables the limit.
      batch_size.nil? ? updated : updated.limit(batch_size)
    end
    # rubocop:enable CodeReuse/ActiveRecord

    private

    attr_reader :current_node, :shard_name, :batch_size

    # Starting scope: every project when syncing selectively by shard (the
    # shard filter is applied in #execute), otherwise the node's own projects.
    def projects
      current_node.selective_sync_by_shards? ? Geo::Fdw::Project.all : current_node.projects
    end

    # Any shard is acceptable unless the node syncs selectively by shard, in
    # which case the shard must be one of the selected ones.
    def valid_shard?
      !current_node.selective_sync_by_shards? ||
        current_node.selective_sync_shards.include?(shard_name)
    end
  end
end
...@@ -144,6 +144,7 @@ module EE ...@@ -144,6 +144,7 @@ module EE
scope :aimed_for_deletion, -> (date) { where('marked_for_deletion_at <= ?', date).without_deleted } scope :aimed_for_deletion, -> (date) { where('marked_for_deletion_at <= ?', date).without_deleted }
scope :with_repos_templates, -> { where(namespace_id: ::Gitlab::CurrentSettings.current_application_settings.custom_project_templates_group_id) } scope :with_repos_templates, -> { where(namespace_id: ::Gitlab::CurrentSettings.current_application_settings.custom_project_templates_group_id) }
scope :with_groups_level_repos_templates, -> { joins("INNER JOIN namespaces ON projects.namespace_id = namespaces.custom_project_templates_group_id") } scope :with_groups_level_repos_templates, -> { joins("INNER JOIN namespaces ON projects.namespace_id = namespaces.custom_project_templates_group_id") }
scope :with_designs, -> { where(id: DesignManagement::Design.select(:project_id)) }
delegate :shared_runners_minutes, :shared_runners_seconds, :shared_runners_seconds_last_reset, delegate :shared_runners_minutes, :shared_runners_seconds, :shared_runners_seconds_last_reset,
to: :statistics, allow_nil: true to: :statistics, allow_nil: true
...@@ -195,19 +196,6 @@ module EE ...@@ -195,19 +196,6 @@ module EE
joins('LEFT JOIN services ON services.project_id = projects.id AND services.type = \'GitlabSlackApplicationService\' AND services.active IS true') joins('LEFT JOIN services ON services.project_id = projects.id AND services.type = \'GitlabSlackApplicationService\' AND services.active IS true')
.where('services.id IS NULL') .where('services.id IS NULL')
end end
def inner_join_design_management
join_statement =
arel_table
.join(DesignManagement::Design.arel_table, Arel::Nodes::InnerJoin)
.on(arel_table[:id].eq(DesignManagement::Design.arel_table[:project_id]))
joins(join_statement.join_sources)
end
def count_designs
inner_join_design_management.distinct.count
end
end end
def can_store_security_reports? def can_store_security_reports?
......
...@@ -7,6 +7,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry ...@@ -7,6 +7,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry
belongs_to :project belongs_to :project
scope :pending, -> { with_state(:pending) }
scope :failed, -> { with_state(:failed) } scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) } scope :synced, -> { with_state(:synced) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.now))) } scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.now))) }
...@@ -22,7 +23,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry ...@@ -22,7 +23,7 @@ class Geo::DesignRegistry < Geo::BaseRegistry
end end
before_transition any => :pending do |registry, _| before_transition any => :pending do |registry, _|
registry.retry_at = 0 registry.retry_at = nil
registry.retry_count = 0 registry.retry_count = 0
end end
...@@ -50,6 +51,10 @@ class Geo::DesignRegistry < Geo::BaseRegistry ...@@ -50,6 +51,10 @@ class Geo::DesignRegistry < Geo::BaseRegistry
designs_repositories designs_repositories
end end
def self.updated_recently
pending.or(failed.retry_due)
end
def fail_sync!(message, error, attrs = {}) def fail_sync!(message, error, attrs = {})
new_retry_count = retry_count + 1 new_retry_count = retry_count + 1
......
...@@ -90,11 +90,24 @@ module Geo ...@@ -90,11 +90,24 @@ module Geo
joins(join_statement.join_sources) joins(join_statement.join_sources)
end end
def inner_join_design_management def missing_design_registry
left_outer_join_design_registry
.where(Geo::DesignRegistry.arel_table[:project_id].eq(nil))
end
def recently_updated_designs
inner_join_design_registry
.merge(Geo::DesignRegistry.updated_recently)
end
def with_designs
design_table = Geo::Fdw::DesignManagementDesign.arel_table
design_subquery = design_table.project(design_table[:project_id]).distinct.as('sub_design_table')
join_statement = join_statement =
arel_table arel_table
.join(Geo::Fdw::DesignManagementDesign.arel_table, Arel::Nodes::InnerJoin) .join(design_subquery, Arel::Nodes::InnerJoin)
.on(arel_table[:id].eq(Geo::Fdw::DesignManagementDesign.arel_table[:project_id])) .on(arel_table[:id].eq(design_subquery[:project_id]))
joins(join_statement.join_sources) joins(join_statement.join_sources)
end end
...@@ -109,6 +122,15 @@ module Geo ...@@ -109,6 +122,15 @@ module Geo
joins(join_statement.join_sources) joins(join_statement.join_sources)
end end
def left_outer_join_design_registry
join_statement =
arel_table
.join(Geo::DesignRegistry.arel_table, Arel::Nodes::OuterJoin)
.on(arel_table[:id].eq(Geo::DesignRegistry.arel_table[:project_id]))
joins(join_statement.join_sources)
end
end end
end end
end end
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
- geo:geo_repository_cleanup - geo:geo_repository_cleanup
- geo:geo_repository_destroy - geo:geo_repository_destroy
- geo:geo_repository_shard_sync - geo:geo_repository_shard_sync
- geo:geo_design_repository_shard_sync
- geo:geo_repository_verification_primary_shard - geo:geo_repository_verification_primary_shard
- geo:geo_repository_verification_primary_single - geo:geo_repository_verification_primary_single
- geo:geo_repository_verification_secondary_single - geo:geo_repository_verification_secondary_single
......
# frozen_string_literal: true
module Geo
  # Design-repository flavour of RepositoryShardSyncWorker: it enqueues
  # Geo::DesignRepositorySyncWorker jobs and looks projects up through the
  # design finders.
  #
  # NOTE(review): the private methods below presumably override hooks invoked
  # by the parent worker, which is not visible here -- confirm.
  class DesignRepositoryShardSyncWorker < RepositoryShardSyncWorker
    private

    # Enqueues a design sync for the project.
    #
    # Returns a hash with the project id and the job id, or nil when
    # perform_async returned no job id.
    def schedule_job(project_id)
      job_id = Geo::DesignRepositorySyncWorker.perform_async(project_id)
      return unless job_id

      { project_id: project_id, job_id: job_id }
    end

    # Ids of projects with unsynced designs that are not scheduled already,
    # most recently updated repositories first.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def find_project_ids_not_synced(batch_size:)
      candidates = find_unsynced_projects(batch_size: batch_size)
      unscheduled = candidates.id_not_in(scheduled_project_ids)

      unscheduled.reorder(last_repository_updated_at: :desc).pluck_primary_key
    end
    # rubocop: enable CodeReuse/ActiveRecord

    def find_unsynced_projects(batch_size:)
      finder = Geo::DesignUnsyncedFinder.new(
        current_node: current_node,
        shard_name: shard_name,
        batch_size: batch_size
      )

      finder.execute
    end

    # Ids of projects with recently updated designs that are not scheduled
    # already; never-synced registries (NULL last_synced_at) come first, then
    # the least recently synced ones.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def find_project_ids_updated_recently(batch_size:)
      candidates = find_projects_updated_recently(batch_size: batch_size)
      unscheduled = candidates.id_not_in(scheduled_project_ids)

      unscheduled.order('design_registry.last_synced_at ASC NULLS FIRST').pluck_primary_key
    end
    # rubocop: enable CodeReuse/ActiveRecord

    def find_projects_updated_recently(batch_size:)
      finder = Geo::DesignUpdatedRecentlyFinder.new(
        current_node: current_node,
        shard_name: shard_name,
        batch_size: batch_size
      )

      finder.execute
    end
  end
end
...@@ -8,6 +8,10 @@ module Geo ...@@ -8,6 +8,10 @@ module Geo
else else
Geo::RepositoryShardSyncWorker.perform_async(shard_name) Geo::RepositoryShardSyncWorker.perform_async(shard_name)
end end
if Feature.enabled?(:enable_geo_design_sync)
Geo::DesignRepositoryShardSyncWorker.perform_async(shard_name)
end
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::DesignUnsyncedFinder, :geo, :geo_fdw do
  describe '#execute' do
    let(:node) { create(:geo_node) }
    let(:group_1) { create(:group) }
    let(:group_2) { create(:group) }
    let(:nested_group_1) { create(:group, parent: group_1) }

    let!(:project_1) { create(:project, group: group_1) }
    let!(:project_2) { create(:project, group: nested_group_1) }
    let!(:project_3) { create(:project, group: group_2) }
    let!(:project_4) { create(:project, group: group_1) }

    before do
      # project_4 is the only fixture project stored on the 'foo' storage.
      project_4.update_column(:repository_storage, 'foo')

      # Every fixture project gets a design.
      [project_1, project_2, project_3, project_4].each do |project|
        create(:design, project: project)
      end
    end

    subject { described_class.new(current_node: node, shard_name: 'default', batch_size: 100) }

    context 'without selective sync' do
      it 'returns designs without an entry on the tracking database' do
        create(:geo_design_registry, :synced, project: project_2)

        expect(subject.execute).to match_ids(project_1, project_3)
      end
    end

    context 'with selective sync by namespace' do
      it 'returns designs that belong to the namespaces without an entry on the tracking database' do
        create(:geo_design_registry, :synced, project: project_4)

        node.update!(selective_sync_type: 'namespaces', namespaces: [group_1, nested_group_1])

        expect(subject.execute).to match_ids(project_1, project_2)
      end
    end

    context 'with selective sync by shard' do
      before do
        node.update!(selective_sync_type: 'shards', selective_sync_shards: ['foo'])
      end

      it 'does not return designs out of synced shards' do
        default_shard_finder = described_class.new(current_node: node, shard_name: 'default', batch_size: 100)

        expect(default_shard_finder.execute).to be_empty
      end

      it 'returns designs that belong to the shards without an entry on the tracking database' do
        # A second project on 'foo', this one without a registry entry.
        unsynced_on_foo = create(:project, group: group_1)
        unsynced_on_foo.update_column(:repository_storage, 'foo')
        create(:design, project: unsynced_on_foo)
        create(:geo_design_registry, :synced, project: project_4)

        foo_shard_finder = described_class.new(current_node: node, shard_name: 'foo', batch_size: 100)

        expect(foo_shard_finder.execute).to match_ids(unsynced_on_foo)
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::DesignUpdatedRecentlyFinder, :geo, :geo_fdw do
  describe '#execute' do
    let(:node) { create(:geo_node) }
    let(:group_1) { create(:group) }
    let(:group_2) { create(:group) }
    let(:nested_group_1) { create(:group, parent: group_1) }

    let!(:project_1) { create(:project, group: group_1) }
    let!(:project_2) { create(:project, group: nested_group_1) }
    let!(:project_3) { create(:project, group: group_2) }
    let!(:project_4) { create(:project, group: group_1) }

    before do
      # project_4 is the only fixture project stored on the 'foo' storage.
      project_4.update_column(:repository_storage, 'foo')

      # project_3 is the only fixture project whose registry is created with
      # the :synced trait; the others use the factory defaults.
      create(:geo_design_registry, project: project_1)
      create(:geo_design_registry, project: project_2)
      create(:geo_design_registry, :synced, project: project_3)
      create(:geo_design_registry, project: project_4)

      create(:design, project: project_1)
      create(:design, project: project_2)
      create(:design, project: project_3)
      create(:design, project: project_4)
    end

    subject { described_class.new(current_node: node, shard_name: 'default', batch_size: 100) }

    context 'without selective sync' do
      # Description typo fixed: "desings" -> "designs".
      it 'returns designs with a dirty entry on the tracking database' do
        expect(subject.execute).to match_ids(project_1, project_2)
      end
    end

    context 'with selective sync by namespace' do
      it 'returns designs that belong to the namespaces with a dirty entry on the tracking database' do
        node.update!(selective_sync_type: 'namespaces', namespaces: [group_1])

        expect(subject.execute).to match_ids(project_1, project_2)
      end
    end

    context 'with selective sync by shard' do
      before do
        node.update!(selective_sync_type: 'shards', selective_sync_shards: ['foo'])
      end

      it 'does not return designs out of selected shard' do
        subject = described_class.new(current_node: node, shard_name: 'default', batch_size: 100)

        expect(subject.execute).to be_empty
      end

      it 'returns designs that belong to the shards with a dirty entry on the tracking database' do
        # A second project on 'foo' whose registry is already synced, so only
        # project_4 is expected back.
        project_5 = create(:project, group: group_1)
        project_5.update_column(:repository_storage, 'foo')
        create(:design, project: project_5)
        create(:geo_design_registry, :synced, project: project_5)

        subject = described_class.new(current_node: node, shard_name: 'foo', batch_size: 100)

        expect(subject.execute).to match_ids(project_4)
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::DesignRepositoryShardSyncWorker, :geo, :geo_fdw, :clean_gitlab_redis_cache do
  include ::EE::GeoHelpers
  include ExclusiveLeaseHelpers

  let!(:primary) { create(:geo_node, :primary) }
  let!(:secondary) { create(:geo_node) }

  let(:shard_name) { Gitlab.config.repositories.storages.keys.first }

  before do
    stub_current_geo_node(secondary)
  end

  describe '#perform' do
    let(:restricted_group) { create(:group) }

    let(:unsynced_project_in_restricted_group) { create(:project, group: restricted_group) }
    let(:unsynced_project) { create(:project) }

    before do
      stub_exclusive_lease(renew: true)

      # Report the shard under test as healthy by default.
      Gitlab::ShardHealthCache.update([shard_name])

      create(:design, project: unsynced_project_in_restricted_group)
      create(:design, project: unsynced_project)
    end

    it 'performs Geo::DesignRepositorySyncWorker for each project' do
      expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)

      subject.perform(shard_name)
    end

    it 'performs Geo::DesignRepositorySyncWorker for designs where last attempt to sync failed' do
      create(:geo_design_registry, :sync_failed, project: unsynced_project_in_restricted_group)
      create(:geo_design_registry, :synced, project: unsynced_project)

      expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).once.and_return(spy)

      subject.perform(shard_name)
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when shard becomes unhealthy' do
      Gitlab::ShardHealthCache.update([])

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'performs Geo::DesignRepositorySyncWorker for designs updated recently' do
      create(:geo_design_registry, project: unsynced_project_in_restricted_group)
      create(:geo_design_registry, :synced, project: unsynced_project)
      create(:geo_design_registry)

      expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async).twice.and_return(spy)

      subject.perform(shard_name)
    end

    it 'does not schedule a job twice for the same project' do
      scheduled_jobs = [
        { job_id: 1, project_id: unsynced_project.id },
        { job_id: 2, project_id: unsynced_project_in_restricted_group.id }
      ]

      is_expected.to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(:once)
      is_expected.not_to receive(:schedule_job)

      Sidekiq::Testing.inline! { subject.perform(shard_name) }
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when no geo database is configured' do
      allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)

      # We need to unstub here or the DatabaseCleaner will have issues since it
      # will appear as though the tracking DB were not available
      allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
    end

    # Description fixed: it used to name Geo::ProjectSyncWorker although the
    # expectation is set on Geo::DesignRepositorySyncWorker.
    it 'does not perform Geo::DesignRepositorySyncWorker when not running on a secondary' do
      allow(Gitlab::Geo).to receive(:secondary?) { false }

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    it 'does not perform Geo::DesignRepositorySyncWorker when node is disabled' do
      allow_any_instance_of(GeoNode).to receive(:enabled?) { false }

      expect(Geo::DesignRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform(shard_name)
    end

    context 'multiple shards' do
      it 'uses two loops to schedule jobs', :sidekiq_might_not_need_inline do
        expect(subject).to receive(:schedule_jobs).twice.and_call_original

        Gitlab::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
        secondary.update!(repos_max_capacity: 5)

        subject.perform(shard_name)
      end
    end

    context 'when node has namespace restrictions', :request_store do
      before do
        secondary.update!(selective_sync_type: 'namespaces', namespaces: [restricted_group])

        # Make current-node lookups return the updated secondary.
        allow(::Gitlab::Geo).to receive(:current_node).and_call_original
        Rails.cache.write(:current_node, secondary.to_json)
        allow(::GeoNode).to receive(:current_node).and_return(secondary)
      end

      it 'does not perform Geo::DesignRepositorySyncWorker for projects that do not belong to selected namespaces to replicate' do
        expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
          .with(unsynced_project_in_restricted_group.id)
          .once
          .and_return(spy)

        subject.perform(shard_name)
      end

      it 'does not perform Geo::DesignRepositorySyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
        create(:geo_design_registry, project: unsynced_project_in_restricted_group)
        create(:geo_design_registry, project: unsynced_project)

        expect(Geo::DesignRepositorySyncWorker).to receive(:perform_async)
          .with(unsynced_project_in_restricted_group.id)
          .once
          .and_return(spy)

        subject.perform(shard_name)
      end
    end
  end
end
...@@ -11,6 +11,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -11,6 +11,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
let!(:project_in_synced_group) { create(:project, group: synced_group) } let!(:project_in_synced_group) { create(:project, group: synced_group) }
let!(:unsynced_project) { create(:project) } let!(:unsynced_project) { create(:project) }
let(:healthy_shard_name) { project_in_synced_group.repository.storage } let(:healthy_shard_name) { project_in_synced_group.repository.storage }
let(:design_worker) { Geo::DesignRepositoryShardSyncWorker }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
...@@ -32,6 +33,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -32,6 +33,7 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
end end
expect(worker).not_to receive(:perform_async).with('broken') expect(worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
subject.perform subject.perform
end end
...@@ -44,7 +46,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -44,7 +46,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
.and_return([result(true, healthy_shard_name), result(true, 'broken')]) .and_return([result(true, healthy_shard_name), result(true, 'broken')])
expect(worker).to receive(:perform_async).with('default') expect(worker).to receive(:perform_async).with('default')
expect(design_worker).to receive(:perform_async).with('default')
expect(worker).not_to receive(:perform_async).with('broken') expect(worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
subject.perform subject.perform
end end
...@@ -61,7 +65,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -61,7 +65,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
stub_storage_settings({}) stub_storage_settings({})
expect(worker).to receive(:perform_async).with(project_in_synced_group.repository.storage) expect(worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(design_worker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(worker).not_to receive(:perform_async).with('unknown') expect(worker).not_to receive(:perform_async).with('unknown')
expect(design_worker).not_to receive(:perform_async).with('unknown')
subject.perform subject.perform
end end
...@@ -77,7 +83,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -77,7 +83,9 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
.and_return([result(true, healthy_shard_name), result(false, 'broken')]) .and_return([result(true, healthy_shard_name), result(false, 'broken')])
expect(worker).to receive(:perform_async).with(healthy_shard_name) expect(worker).to receive(:perform_async).with(healthy_shard_name)
expect(design_worker).to receive(:perform_async).with(healthy_shard_name)
expect(worker).not_to receive(:perform_async).with('broken') expect(worker).not_to receive(:perform_async).with('broken')
expect(design_worker).not_to receive(:perform_async).with('broken')
subject.perform subject.perform
end end
...@@ -100,6 +108,23 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do ...@@ -100,6 +108,23 @@ describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
include_examples '#perform', Geo::RepositoryShardSyncWorker include_examples '#perform', Geo::RepositoryShardSyncWorker
end end
context 'when enable_geo_design_sync flag is disabled' do
before do
stub_feature_flags(enable_geo_design_sync: false)
end
it 'skips calling Geo::DesignRepositoryShardSyncWorker' do
# Report that default shard is healthy
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard_name)])
expect(Geo::Secondary::RepositoryBackfillWorker).to receive(:perform_async).with('default')
expect(design_worker).not_to receive(:perform_async).with('default')
subject.perform
end
end
def result(success, shard) def result(success, shard)
Gitlab::HealthChecks::Result.new('gitaly_check', success, nil, { shard: shard }) Gitlab::HealthChecks::Result.new('gitaly_check', success, nil, { shard: shard })
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment