Commit 9f85c446 authored by Douglas Barbosa Alexandre, committed by Nick Thomas

Use LEFT OUTER JOIN with CTE instead of NOT IN

This change improves the performance of the query
to retrieve the projects that need to be cleaned up.
parent 6e72ac37
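In short: the previous cleanup query, Project.where.not(id: geo_node.projects), produces a NOT IN (SELECT ...) sub-select, which PostgreSQL struggles to plan efficiently on a large projects table. The new scope instead builds a recursive CTE of the selected namespaces and all of their descendants, LEFT OUTER JOINs projects against it, and keeps only the unmatched rows (cte.id IS NULL). The sketch below reproduces that query shape with plain Arel outside the GitLab codebase; the table names, the namespace ids, and the in-memory SQLite connection are illustrative assumptions, not code from this merge request.

  require 'active_record' # provides Arel; assumes the activerecord and sqlite3 gems are available

  # Arel only needs a connection to render SQL; an in-memory database is enough for the sketch.
  ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: ':memory:')

  projects   = Arel::Table.new(:projects)
  namespaces = Arel::Table.new(:namespaces)
  cte_table  = Arel::Table.new(:base_and_descendants)

  # Recursive CTE: the selected namespaces (ids 1 and 2 here, purely illustrative)
  # plus every namespace whose parent is already in the CTE.
  anchor = namespaces.project(namespaces[:id]).where(namespaces[:id].in([1, 2]))
  recursive_step = namespaces
    .project(namespaces[:id])
    .join(cte_table).on(namespaces[:parent_id].eq(cte_table[:id]))
  cte = Arel::Nodes::As.new(cte_table, anchor.union(:all, recursive_step))

  # LEFT OUTER JOIN projects against the CTE and keep only the unmatched rows,
  # i.e. projects whose namespace is outside the selected namespaces.
  outside_selected = projects
    .project(projects[:id])
    .join(cte_table, Arel::Nodes::OuterJoin)
    .on(projects[:namespace_id].eq(cte_table[:id]))
    .where(cte_table[:id].eq(nil))
    .with(:recursive, cte)

  puts outside_selected.to_sql
  # Roughly (reformatted for readability; Arel emits it on one line):
  #   WITH RECURSIVE "base_and_descendants" AS (
  #     SELECT "namespaces"."id" FROM "namespaces" WHERE "namespaces"."id" IN (1, 2)
  #     UNION ALL
  #     SELECT "namespaces"."id" FROM "namespaces"
  #     INNER JOIN "base_and_descendants" ON "namespaces"."parent_id" = "base_and_descendants"."id"
  #   )
  #   SELECT "projects"."id" FROM "projects"
  #   LEFT OUTER JOIN "base_and_descendants" ON "projects"."namespace_id" = "base_and_descendants"."id"
  #   WHERE "base_and_descendants"."id" IS NULL

An anti-join of this shape lets the planner walk the CTE once and discard matching projects as it goes, instead of re-checking a potentially huge NOT IN list for every row. The diff below applies the same idea inside Geo::SelectiveSync using the existing selected_namespaces_and_descendants_cte helper.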
......@@ -11,6 +11,30 @@ module Geo::SelectiveSync
end
end
def projects_outside_selected_namespaces
return project_model.none unless selective_sync_by_namespaces?
cte_query = selected_namespaces_and_descendants_cte
cte_table = cte_query.table
join_statement =
projects_table
.join(cte_table, Arel::Nodes::OuterJoin)
.on(projects_table[:namespace_id].eq(cte_table[:id]))
project_model
.joins(join_statement.join_sources)
.where(cte_table[:id].eq(nil))
.with
.recursive(cte_query.to_arel)
end
def projects_outside_selected_shards
return project_model.none unless selective_sync_by_shards?
project_model.outside_shards(selective_sync_shards)
end
def selective_sync?
selective_sync_type.present?
end
......@@ -135,6 +159,16 @@ module Geo::SelectiveSync
namespaces.arel_table
end
def project_model
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
end
def projects_table
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
end
def uploads_model
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
......
......@@ -89,6 +89,7 @@ module EE
scope :with_wiki_enabled, -> { with_feature_enabled(:wiki) }
scope :within_shards, -> (shard_names) { where(repository_storage: Array(shard_names)) }
scope :outside_shards, -> (shard_names) { where.not(repository_storage: Array(shard_names)) }
scope :verification_failed_repos, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_repos) }
scope :verification_failed_wikis, -> { joins(:repository_state).merge(ProjectRepositoryState.verification_failed_wikis) }
scope :for_plan_name, -> (name) { joins(namespace: :plan).where(plans: { name: name }) }
......
......@@ -78,6 +78,14 @@ module Geo
.within_shards(selective_sync_shards)
end
def project_model
Geo::Fdw::Project
end
def projects_table
Geo::Fdw::Project.arel_table
end
def uploads_model
Geo::Fdw::Upload
end
......
......@@ -336,6 +336,14 @@ class GeoNode < ApplicationRecord
Project.within_shards(selective_sync_shards)
end
def project_model
Project
end
def projects_table
Project.arel_table
end
def uploads_model
Upload
end
......
......@@ -5,12 +5,11 @@ module Geo
attr_reader :geo_node, :old_namespace_ids, :old_shards, :params
def initialize(geo_node, params)
@geo_node = geo_node
@old_namespace_ids = geo_node.namespace_ids
@old_shards = geo_node.selective_sync_shards
@params = params.dup
@params[:namespace_ids] = @params[:namespace_ids].to_s.split(',')
end
......
......@@ -19,7 +19,11 @@ module Geo
def execute
destroy_project
delete_project_registry_entries
destroy_registry_entries
rescue => e
log_error('Could not destroy repository', e, project_id: id, shard: repository_storage, disk_path: disk_path)
destroy_registry_entries
raise
end
private
......@@ -29,7 +33,7 @@ module Geo
end
# rubocop: disable CodeReuse/ActiveRecord
def delete_project_registry_entries
def destroy_registry_entries
::Geo::ProjectRegistry.where(project_id: id).delete_all
log_info("Project registry entry removed", project_id: id)
......
......@@ -11,14 +11,13 @@ module Geo
BATCH_SIZE = 250
LEASE_TIMEOUT = 60.minutes
# rubocop: disable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def perform(geo_node_id)
# Prevent multiple Sidekiq workers from performing repositories clean up
try_obtain_lease do
geo_node = GeoNode.find(geo_node_id)
break unless geo_node.selective_sync?
node = GeoNode.find(geo_node_id)
break unless node.selective_sync?
Project.where.not(id: geo_node.projects).find_in_batches(batch_size: BATCH_SIZE) do |batch|
projects_to_clean_up(node).find_in_batches(batch_size: BATCH_SIZE) do |batch|
batch.each do |project|
clean_up_repositories(project)
end
......@@ -27,19 +26,33 @@ module Geo
rescue ActiveRecord::RecordNotFound => error
log_error('Could not find Geo node, skipping repositories clean up', error, geo_node_id: geo_node_id)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
private
def projects_to_clean_up(node)
if node.selective_sync_by_namespaces?
node.projects_outside_selected_namespaces
elsif node.selective_sync_by_shards?
node.projects_outside_selected_shards
else
Project.none
end
end
# rubocop:disable CodeReuse/ActiveRecord
def clean_up_repositories(project)
return unless Geo::ProjectRegistry.exists?(project_id: project.id)
job_id = ::Geo::RepositoryCleanupWorker.perform_async(project.id, project.name, project.disk_path, project.repository.storage)
if job_id
log_info('Repository clean up scheduled', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path, job_id: job_id)
else
log_error('Could not clean up repository', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path)
log_error('Could not schedule a repository clean up', project_id: project.id, shard: project.repository.storage, disk_path: project.disk_path)
end
end
# rubocop:enable CodeReuse/ActiveRecord
def lease_timeout
LEASE_TIMEOUT
......
---
title: Geo - Improve performance of the selective sync cleanup worker
merge_request: 11998
author:
type: performance
......@@ -4,9 +4,6 @@ describe Geo::RepositoryDestroyService do
include ::EE::GeoHelpers
set(:secondary) { create(:geo_node) }
let(:project) { create(:project_empty_repo) }
subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
before do
stub_current_geo_node(secondary)
......@@ -14,6 +11,9 @@ describe Geo::RepositoryDestroyService do
describe '#async_execute' do
it 'starts the worker' do
project = create(:project)
subject = described_class.new(project.id, project.name, project.disk_path, project.repository_storage)
expect(GeoRepositoryDestroyWorker).to receive(:perform_async)
subject.async_execute
......@@ -21,20 +21,16 @@ describe Geo::RepositoryDestroyService do
end
describe '#execute' do
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
service.execute
end
context 'with a project on a legacy storage' do
let(:project) { create(:project_empty_repo, :legacy_storage) }
it 'removes the tracking entry' do
create(:geo_project_registry, project: project)
subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
expect { service.execute }.to change(Geo::ProjectRegistry, :count).by(-1)
end
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
context 'legacy storage project' do
let(:project) { create(:project_empty_repo, :legacy_storage) }
service.execute
end
it 'removes the repository from disk' do
project.delete
......@@ -55,9 +51,25 @@ describe Geo::RepositoryDestroyService do
service.execute
end
it 'removes the tracking entry' do
create(:geo_project_registry, project: project)
expect { service.execute }.to change(Geo::ProjectRegistry, :count).by(-1)
end
end
context 'hashed storage project' do
context 'with a project on a hashed storage' do
let(:project) { create(:project_empty_repo) }
subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
service.execute
end
it 'removes the repository from disk' do
project.delete
......@@ -77,6 +89,32 @@ describe Geo::RepositoryDestroyService do
service.execute
end
it 'removes the tracking entry' do
create(:geo_project_registry, project: project)
expect { service.execute }.to change(Geo::ProjectRegistry, :count).by(-1)
end
end
context 'with a project on a broken storage' do
let(:project) { create(:project, :broken_storage) }
subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
service.execute
end
it 'removes the tracking entry' do
create(:geo_project_registry, project: project)
expect { service.execute }.to raise_error RuntimeError, 'storage not found: "broken"'
expect(Geo::ProjectRegistry.where(project: project)).to be_empty
end
end
end
end
require 'spec_helper'
describe Geo::RepositoriesCleanUpWorker, :geo do
describe Geo::RepositoriesCleanUpWorker, :geo, :geo_fdw do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
describe '#perform' do
set(:secondary) { create(:geo_node) }
let(:secondary) { create(:geo_node) }
let(:synced_group) { create(:group) }
let(:synced_subgroup) { create(:group, parent: synced_group) }
let(:unsynced_group) { create(:group) }
let(:project_1) { create(:project, group: synced_group) }
let(:project_2) { create(:project, group: synced_group) }
let!(:project_3) { create(:project, :repository, group: unsynced_group) }
let(:project_4) { create(:project, :repository, group: unsynced_group) }
let(:project_5) { create(:project, group: synced_subgroup) }
let(:project_6) { create(:project, group: synced_subgroup) }
let(:project_7) { create(:project) }
let(:project_8) { create(:project) }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease
end
context 'when node has selective sync enabled' do
let(:synced_group) { create(:group) }
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
create(:geo_project_registry, project: project_1)
create(:geo_project_registry, project: project_2)
create(:geo_project_registry, project: project_4)
create(:geo_project_registry, project: project_5)
create(:geo_project_registry, project: project_6)
create(:geo_project_registry, project: project_7)
create(:geo_project_registry, project: project_8)
end
context 'legacy storage' do
it 'performs Geo::RepositoryCleanupWorker for each project that does not belong to selected namespaces to replicate' do
project_in_synced_group = create(:project, :legacy_storage, group: synced_group)
unsynced_project = create(:project, :repository, :legacy_storage)
disk_path = "#{unsynced_project.namespace.full_path}/#{unsynced_project.path}"
it 'does not perform Geo::RepositoryCleanupWorker when it cannot obtain a lease' do
stub_exclusive_lease_taken
expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
.with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage)
.once.and_return(1)
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
.with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage)
subject.perform(secondary.id)
end
subject.perform(secondary.id)
end
end
it 'does not raise an error when node could not be found' do
expect { subject.perform(-1) }.not_to raise_error
end
context 'hashed storage' do
before do
stub_application_setting(hashed_storage_enabled: true)
end
context 'without selective sync' do
it 'does not perform Geo::RepositoryCleanupWorker' do
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
it 'performs Geo::RepositoryCleanupWorker for each project that does not belong to selected namespaces to replicate' do
project_in_synced_group = create(:project, group: synced_group)
unsynced_project = create(:project, :repository)
subject.perform(secondary.id)
end
end
hash = Digest::SHA2.hexdigest(unsynced_project.id.to_s)
disk_path = "@hashed/#{hash[0..1]}/#{hash[2..3]}/#{hash}"
context 'with selective sync by namespace' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'performs the clean up worker for projects that do not belong to the selected namespaces' do
[project_4, project_7, project_8].each do |project|
expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
.with(unsynced_project.id, unsynced_project.name, disk_path, unsynced_project.repository.storage)
.once.and_return(1)
.with(project.id, project.name, project.disk_path, project.repository.storage)
.once
.and_return(1)
end
[project_1, project_2, project_3, project_5, project_6].each do |project|
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
.with(project_in_synced_group.id, project_in_synced_group.name, project_in_synced_group.disk_path, project_in_synced_group.repository.storage)
.with(project.id, project.name, project.disk_path, project.repository.storage)
end
subject.perform(secondary.id)
end
it 'does not leave orphaned entries in the project_registry table' do
Sidekiq::Testing.inline! do
subject.perform(secondary.id)
end
expect(Geo::ProjectRegistry.where(project_id: [project_3, project_4, project_7, project_8])).to be_empty
end
end
context 'with selective sync by shard' do
before do
project_7.update_column(:repository_storage, 'broken')
project_8.update_column(:repository_storage, 'broken')
context 'when the project repository does not exist on disk' do
let(:project) { create(:project) }
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
end
it 'performs Geo::RepositoryCleanupWorker' do
it 'performs the clean up worker for synced projects that do not belong to the selected shards' do
[project_1, project_2, project_4, project_5, project_6].each do |project|
expect(Geo::RepositoryCleanupWorker).to receive(:perform_async)
.with(project.id, anything, anything, anything)
.with(project.id, project.name, project.disk_path, project.repository.storage)
.once
.and_return(1)
subject.perform(secondary.id)
end
it 'does not leave orphaned entries in the project_registry table' do
create(:geo_project_registry, :sync_failed, project: project)
Sidekiq::Testing.inline! do
subject.perform(secondary.id)
end
expect(Geo::ProjectRegistry.where(project_id: project)).to be_empty
[project_3, project_7, project_8].each do |project|
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
.with(project.id, project.name, project.disk_path, project.repository.storage)
end
end
end
it 'does not perform Geo::RepositoryCleanupWorker when the node does not have namespace restrictions' do
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
subject.perform(secondary.id)
end
it 'does not perform Geo::RepositoryCleanupWorker when it cannot obtain a lease' do
stub_exclusive_lease_taken
expect(Geo::RepositoryCleanupWorker).not_to receive(:perform_async)
subject.perform(secondary.id)
end
subject.perform(secondary.id)
end
it 'does not leave orphaned entries in the project_registry table' do
Sidekiq::Testing.inline! do
subject.perform(secondary.id)
end
it 'does not raise an error when node could not be found' do
expect { subject.perform(-1) }.not_to raise_error
expect(Geo::ProjectRegistry.where(project_id: [project_1, project_2, project_3, project_4, project_5, project_6])).to be_empty
end
end
end
end