Commit 059fb1d4 authored by Nick Thomas


Merge branch '4231-geo-selective-sync-cleanup-worker-uses-inefficient-not-in-sql-query' into 'master'

Resolve "Geo selective sync cleanup worker uses inefficient `NOT IN` SQL query"

Closes #4231

See merge request gitlab-org/gitlab-ee!11998
parents 6e72ac37 9f85c446
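The inefficiency named in the issue comes from the old worker's `Project.where.not(id: geo_node.projects)`, which Rails renders as a `NOT IN (subquery)`. PostgreSQL can rarely turn `NOT IN (subquery)` into a hash anti-join because of its NULL semantics, so it often executes the subquery as a repeated per-row filter. A minimal sketch of the two query shapes being swapped (illustrative names, not the MR's exact code):

```ruby
# Illustrative contrast of the two anti-join shapes; not the MR's exact code.

# Before: NOT IN over a subquery. The planner may fall back to evaluating
# the subquery as a filter for every candidate row, which degrades badly
# as the projects table grows.
Project.where.not(id: geo_node.projects)
# SELECT "projects".* FROM "projects"
# WHERE "projects"."id" NOT IN (SELECT "projects"."id" FROM "projects" WHERE ...)

# After: a LEFT OUTER JOIN plus an IS NULL filter expresses the same set,
# and the planner can satisfy it with a single hash anti-join pass.
selected = Namespace.where(id: selected_ids).select(:id) # selected_ids is illustrative
Project
  .joins("LEFT OUTER JOIN (#{selected.to_sql}) selected " \
         "ON selected.id = projects.namespace_id")
  .where('selected.id IS NULL')
```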
@@ -11,6 +11,30 @@ module Geo::SelectiveSync
     end
   end
 
+  def projects_outside_selected_namespaces
+    return project_model.none unless selective_sync_by_namespaces?
+
+    cte_query = selected_namespaces_and_descendants_cte
+    cte_table = cte_query.table
+
+    join_statement =
+      projects_table
+        .join(cte_table, Arel::Nodes::OuterJoin)
+        .on(projects_table[:namespace_id].eq(cte_table[:id]))
+
+    project_model
+      .joins(join_statement.join_sources)
+      .where(cte_table[:id].eq(nil))
+      .with
+      .recursive(cte_query.to_arel)
+  end
+
+  def projects_outside_selected_shards
+    return project_model.none unless selective_sync_by_shards?
+
+    project_model.outside_shards(selective_sync_shards)
+  end
+
   def selective_sync?
     selective_sync_type.present?
   end
@@ -135,6 +159,16 @@ module Geo::SelectiveSync
     namespaces.arel_table
   end
 
+  def project_model
+    raise NotImplementedError,
+      "#{self.class} does not implement #{__method__}"
+  end
+
+  def projects_table
+    raise NotImplementedError,
+      "#{self.class} does not implement #{__method__}"
+  end
+
   def uploads_model
     raise NotImplementedError,
       "#{self.class} does not implement #{__method__}"
...
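The `selected_namespaces_and_descendants_cte` helper is defined elsewhere in this concern and is not part of this diff. Judging from the outer join above, the generated query has roughly the following shape; this is a hedged reconstruction, and the CTE name and column list are assumptions:

```ruby
# Hedged reconstruction of the SQL produced by projects_outside_selected_namespaces;
# the CTE name and exact projection are assumptions, not taken from this diff.
sql = <<~SQL
  WITH RECURSIVE "base_and_descendants" AS (
    SELECT "namespaces".* FROM "namespaces"
    WHERE "namespaces"."id" IN (/* selected namespace ids */)
    UNION
    SELECT "namespaces".* FROM "namespaces"
    INNER JOIN "base_and_descendants" ON "namespaces"."parent_id" = "base_and_descendants"."id"
  )
  SELECT "projects".* FROM "projects"
  LEFT OUTER JOIN "base_and_descendants"
    ON "projects"."namespace_id" = "base_and_descendants"."id"
  WHERE "base_and_descendants"."id" IS NULL
SQL
```

The recursion walks `parent_id` downward so subgroups of a synced group count as synced, and the `WHERE ... IS NULL` filter on the outer-joined CTE is what turns the join into an anti-join: only projects with no matching selected namespace survive.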
@@ -89,6 +89,7 @@ module EE
       scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) }
       scope :within_shards, -> (shard_names) { where(repository_storage: Array(shard_names)) }
+      scope :outside_shards, -> (shard_names) { where.not(repository_storage: Array(shard_names)) }
       scope :verification_failed_repos, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_repos) }
       scope :verification_failed_wikis, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_wikis) }
       scope :for_plan_name, -> (name) { joins(namespace: :plan).where(plans: { name: name }) }
...
@@ -78,6 +78,14 @@ module Geo
         .within_shards(selective_sync_shards)
     end
 
+    def project_model
+      Geo::Fdw::Project
+    end
+
+    def projects_table
+      Geo::Fdw::Project.arel_table
+    end
+
     def uploads_model
       Geo::Fdw::Upload
     end
...
@@ -336,6 +336,14 @@ class GeoNode < ApplicationRecord
     Project.within_shards(selective_sync_shards)
   end
 
+  def project_model
+    Project
+  end
+
+  def projects_table
+    Project.arel_table
+  end
+
   def uploads_model
     Upload
   end
...
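`project_model` and `projects_table` follow the same template-method pattern as the existing `uploads_model`: the concern declares them as `NotImplementedError` stubs, and each including class supplies its own model, so the query-building code above runs unchanged against either the main database (`Project`) or the FDW-mapped tables (`Geo::Fdw::Project`). A condensed sketch of the wiring, with illustrative names:

```ruby
# Condensed sketch of the template-method wiring used above;
# module and class names here are illustrative, not GitLab's actual ones.
module SelectiveSyncSketch
  # The concern writes its queries only in terms of project_model...
  def projects_outside_selected_shards
    project_model.outside_shards(selective_sync_shards)
  end

  # ...and declares it abstract, exactly like the existing uploads_model.
  def project_model
    raise NotImplementedError, "#{self.class} does not implement #{__method__}"
  end
end

class RegularNode
  include SelectiveSyncSketch

  def project_model
    Project # runs against the main database
  end
end

class FdwNode
  include SelectiveSyncSketch

  def project_model
    Geo::Fdw::Project # same query shape, executed over the FDW-mapped tables
  end
end
```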
@@ -10,7 +10,6 @@ module Geo
       @old_shards = geo_node.selective_sync_shards
 
       @params = params.dup
-      @params[:namespace_ids] = @params[:namespace_ids].to_s.split(',')
     end
...
@@ -19,7 +19,11 @@ module Geo
     def execute
       destroy_project
-      delete_project_registry_entries
+      destroy_registry_entries
+    rescue => e
+      log_error('Could not destroy repository', e, project_id: id, shard: repository_storage, disk_path: disk_path)
+      destroy_registry_entries
+      raise
     end
 
     private
@@ -29,7 +33,7 @@ module Geo
     end
 
     # rubocop: disable CodeReuse/ActiveRecord
-    def delete_project_registry_entries
+    def destroy_registry_entries
       ::Geo::ProjectRegistry.where(project_id: id).delete_all
 
       log_info("Project registry entry removed", project_id: id)
...
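The reordered `execute` guarantees the tracking entry is removed even when repository removal raises (the new spec below exercises this with a project on a broken storage), and the re-raise keeps the failure visible. A minimal sketch of the pattern, with illustrative names:

```ruby
# Minimal sketch of "clean up tracking state even on failure";
# names are illustrative, not the service's exact code.
def execute
  destroy_project
  destroy_registry_entries
rescue => e
  log_error('Could not destroy repository', e)
  destroy_registry_entries # never leave an orphaned registry row behind
  raise                    # re-raise so Sidekiq still records the job as failed
end
```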
@@ -11,14 +11,13 @@ module Geo
     BATCH_SIZE = 250
     LEASE_TIMEOUT = 60.minutes
 
-    # rubocop: disable CodeReuse/ActiveRecord
+    # rubocop:disable CodeReuse/ActiveRecord
     def perform(geo_node_id)
-      # Prevent multiple Sidekiq workers from performing repositories clean up
       try_obtain_lease do
-        geo_node = GeoNode.find(geo_node_id)
-        break unless geo_node.selective_sync?
+        node = GeoNode.find(geo_node_id)
+        break unless node.selective_sync?
 
-        Project.where.not(id: geo_node.projects).find_in_batches(batch_size: BATCH_SIZE) do |batch|
+        projects_to_clean_up(node).find_in_batches(batch_size: BATCH_SIZE) do |batch|
           batch.each do |project|
             clean_up_repositories(project)
           end
@@ -27,19 +26,33 @@ module Geo
     rescue ActiveRecord::RecordNotFound => error
       log_error('Could not find Geo node, skipping repositories clean up', error, geo_node_id: geo_node_id)
     end
-    # rubocop: enable CodeReuse/ActiveRecord
+    # rubocop:enable CodeReuse/ActiveRecord
 
     private
 
+    def projects_to_clean_up(node)
+      if node.selective_sync_by_namespaces?
+        node.projects_outside_selected_namespaces
+      elsif node.selective_sync_by_shards?
+        node.projects_outside_selected_shards
+      else
+        Project.none
+      end
+    end
+
+    # rubocop:disable CodeReuse/ActiveRecord
     def clean_up_repositories(project)
+      return unless Geo::ProjectRegistry.exists?(project_id: project.id)
+
       job_id = ::Geo::RepositoryCleanupWorker.perform_async(project.id, project.name, project.disk_path, project.repository.storage)
 
       if job_id
         log_info('Repository clean up scheduled', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path, job_id: job_id)
       else
-        log_error('Could not clean up repository', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path)
+        log_error('Could not schedule a repository clean up', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path)
       end
     end
+    # rubocop:enable CodeReuse/ActiveRecord
 
     def lease_timeout
       LEASE_TIMEOUT
...
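Two details of the reworked `perform` are worth spelling out: `projects_to_clean_up` returns an `ActiveRecord::Relation` rather than a loaded array, so `find_in_batches` can page through it lazily, and the new `Geo::ProjectRegistry.exists?` guard skips projects this secondary never tracked. An illustrative sketch of how the batching composes (assuming the models above; `node` is a `GeoNode`):

```ruby
# Illustrative: the relation is built lazily, so batching composes with it.
relation = node.projects_outside_selected_namespaces # ActiveRecord::Relation, no SQL yet

relation.find_in_batches(batch_size: 250) do |batch|
  # find_in_batches issues repeated "WHERE id > last_id ORDER BY id LIMIT 250"
  # queries, keeping memory bounded however many projects match.
  batch.each { |project| clean_up_repositories(project) }
end
```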
---
title: Geo - Improve performance of the selective sync cleanup worker
merge_request: 11998
author:
type: performance
@@ -4,9 +4,6 @@ describe Geo::RepositoryDestroyService do
   include ::EE::GeoHelpers
 
   set(:secondary) { create(:geo_node) }
-  let(:project) { create(:project_empty_repo) }
-
-  subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
 
   before do
     stub_current_geo_node(secondary)
@@ -14,6 +11,9 @@ describe Geo::RepositoryDestroyService do
   describe '#async_execute' do
     it 'starts the worker' do
+      project = create(:project)
+      subject = described_class.new(project.id, project.name, project.disk_path, project.repository_storage)
+
       expect(GeoRepositoryDestroyWorker).to receive(:perform_async)
 
       subject.async_execute
@@ -21,21 +21,17 @@ describe Geo::RepositoryDestroyService do
   end
 
   describe '#execute' do
+    context 'with a project on a legacy storage' do
+      let(:project) { create(:project_empty_repo, :legacy_storage) }
+
+      subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
+
       it 'delegates project removal to Projects::DestroyService' do
         expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
 
         service.execute
       end
 
-    it 'removes the tracking entry' do
-      create(:geo_project_registry, project: project)
-
-      expect { service.execute }.to change(Geo::ProjectRegistry, :count).by(-1)
-    end
-
-    context 'legacy storage project' do
-      let(:project) { create(:project_empty_repo, :legacy_storage) }
-
       it 'removes the repository from disk' do
         project.delete
@@ -55,9 +51,25 @@ describe Geo::RepositoryDestroyService do
         service.execute
       end
+
+      it 'removes the tracking entry' do
+        create(:geo_project_registry, project: project)
+
+        expect { service.execute }.to change(Geo::ProjectRegistry, :count).by(-1)
+      end
+    end
+
+    context 'with a project on a hashed storage' do
+      let(:project) { create(:project_empty_repo) }
+
+      subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
+
+      it 'delegates project removal to Projects::DestroyService' do
+        expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
+
+        service.execute
+      end
 
-    context 'hashed storage project' do
       it 'removes the repository from disk' do
         project.delete
@@ -77,6 +89,32 @@ describe Geo::RepositoryDestroyService do
         service.execute
       end
+
+      it 'removes the tracking entry' do
+        create(:geo_project_registry, project: project)
+
+        expect { service.execute }.to change(Geo::ProjectRegistry, :count).by(-1)
+      end
+    end
+
+    context 'with a project on a broken storage' do
+      let(:project) { create(:project, :broken_storage) }
+
+      subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
+
+      it 'delegates project removal to Projects::DestroyService' do
+        expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
+
+        service.execute
+      end
+
+      it 'removes the tracking entry' do
+        create(:geo_project_registry, project: project)
+
+        expect { service.execute }.to raise_error RuntimeError, 'storage not found: "broken"'
+        expect(Geo::ProjectRegistry.where(project: project)).to be_empty
+      end
+    end
   end
 end
 require 'spec_helper'
 
-describe Geo::RepositoriesCleanUpWorker, :geo do
+describe Geo::RepositoriesCleanUpWorker, :geo, :geo_fdw do
   include ::EE::GeoHelpers
   include ExclusiveLeaseHelpers
 
   describe '#perform' do
-    set(:secondary) { create(:geo_node) }
+    let(:secondary) { create(:geo_node) }
+    let(:synced_group) { create(:group) }
+    let(:synced_subgroup) { create(:group, parent: synced_group) }
+    let(:unsynced_group) { create(:group) }
+    let(:project_1) { create(:project, group: synced_group) }
+    let(:project_2) { create(:project, group: synced_group) }
+    let!(:project_3) { create(:project, :repository, group: unsynced_group) }
+    let(:project_4) { create(:project, :repository, group: unsynced_group) }
+    let(:project_5) { create(:project, group: synced_subgroup) }
+    let(:project_6) { create(:project, group: synced_subgroup) }
+    let(:project_7) { create(:project) }
+    let(:project_8) { create(:project) }
 
     before do
       stub_current_geo_node(secondary)
       stub_exclusive_lease
-    end
 
-    context 'when node has selective sync enabled' do
-      let(:synced_group) { create(:group) }
-      let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
-
-      context 'legacy storage' do
-        it 'performs Geo::RepositoryCleanupWorker for each project that does not belong to selected namespaces to replicate' do
-          project_in_synced_group = create(:project, :legacy_storage, group: synced_group)
-          unsynced_project = create(:project, :repository, :legacy_storage)
-
-          disk_path = "#{unsynced_project.namespace.full_path}/#{unsynced_project.path}"
-
-          expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
-            .with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage)
-            .once.and_return(1)
-
-          expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
-            .with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage)
-
-          subject.perform(secondary.id)
-        end
-      end
-
-      context 'hashed storage' do
-        before do
-          stub_application_setting(hashed_storage_enabled: true)
-        end
-
-        it 'performs Geo::RepositoryCleanupWorker for each project that does not belong to selected namespaces to replicate' do
-          project_in_synced_group = create(:project, group: synced_group)
-          unsynced_project = create(:project, :repository)
-
-          hash = Digest::SHA2.hexdigest(unsynced_project.id.to_s)
-          disk_path = "@hashed/#{hash[0..1]}/#{hash[2..3]}/#{hash}"
-
-          expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
-            .with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage)
-            .once.and_return(1)
-
-          expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
-            .with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage)
-
-          subject.perform(secondary.id)
-        end
-      end
-
-      context 'when the project repository does not exist on disk' do
-        let(:project) { create(:project) }
-
-        it 'performs Geo::RepositoryCleanupWorker' do
-          expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
-            .with(project.id, anything, anything, anything)
-            .once
-            .and_return(1)
-
-          subject.perform(secondary.id)
-        end
-
-        it 'does not leave orphaned entries in the project_registry table' do
-          create(:geo_project_registry, :sync_failed, project: project)
-
-          Sidekiq::Testing.inline! do
-            subject.perform(secondary.id)
-          end
-
-          expect(Geo::ProjectRegistry.where(project_id: project)).to be_empty
-        end
-      end
+      create(:geo_project_registry, project: project_1)
+      create(:geo_project_registry, project: project_2)
+      create(:geo_project_registry, project: project_4)
+      create(:geo_project_registry, project: project_5)
+      create(:geo_project_registry, project: project_6)
+      create(:geo_project_registry, project: project_7)
+      create(:geo_project_registry, project: project_8)
     end
 
-    it 'does not perform Geo::RepositoryCleanupWorker when node does not have namespace restrictions' do
-      expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
-
-      subject.perform(secondary.id)
-    end
-
-    it 'does not perform Geo::RepositoryCleanupWorker when it cannot obtain a lease' do
+    it 'does not perform Geo::RepositoryCleanupWorker when it cannot obtain a lease' do
       stub_exclusive_lease_taken
 
       expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
 
       subject.perform(secondary.id)
     end
 
     it 'does not raise an error when node could not be found' do
       expect { subject.perform(-1) }.not_to raise_error
     end
+
+    context 'without selective sync' do
+      it 'does not perform Geo::RepositoryCleanupWorker' do
+        expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
+
+        subject.perform(secondary.id)
+      end
+    end
+
+    context 'with selective sync by namespace' do
+      before do
+        secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
+      end
+
+      it 'performs the clean up worker for projects that do not belong to the selected namespaces' do
+        [project_4, project_7, project_8].each do |project|
+          expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
+            .with(project.id, project.name, project.disk_path, project.repository.storage)
+            .once
+            .and_return(1)
+        end
+
+        [project_1, project_2, project_3, project_5, project_6].each do |project|
+          expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
+            .with(project.id, project.name, project.disk_path, project.repository.storage)
+        end
+
+        subject.perform(secondary.id)
+      end
+
+      it 'does not leave orphaned entries in the project_registry table' do
+        Sidekiq::Testing.inline! do
+          subject.perform(secondary.id)
+        end
+
+        expect(Geo::ProjectRegistry.where(project_id: [project_3, project_4, project_7, project_8])).to be_empty
+      end
+    end
+
+    context 'with selective sync by shard' do
+      before do
+        project_7.update_column(:repository_storage, 'broken')
+        project_8.update_column(:repository_storage, 'broken')
+
+        secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
+      end
+
+      it 'performs the clean up worker for synced projects that do not belong to the selected shards' do
+        [project_1, project_2, project_4, project_5, project_6].each do |project|
+          expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
+            .with(project.id, project.name, project.disk_path, project.repository.storage)
+            .once
+            .and_return(1)
+        end
+
+        [project_3, project_7, project_8].each do |project|
+          expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
+            .with(project.id, project.name, project.disk_path, project.repository.storage)
+        end
+
+        subject.perform(secondary.id)
+      end
+
+      it 'does not leave orphaned entries in the project_registry table' do
+        Sidekiq::Testing.inline! do
+          subject.perform(secondary.id)
+        end
+
+        expect(Geo::ProjectRegistry.where(project_id: [project_1, project_2, project_3, project_4, project_5, project_6])).to be_empty
+      end
+    end
   end
 end