Commit e4007af2 authored by James Fargher's avatar James Fargher

Remove repository backup concurrency code

Now that we do not need to maintain the original rails based repository
backup strategy we can remove all the associated concurrency code.
Concurrency will now be handled by the `gitaly-backup` command.
parent 25e7fb5a
......@@ -7,11 +7,6 @@ module EE
private
override :repository_storage_klasses
def repository_storage_klasses
super << GroupWikiRepository
end
def group_relation
::Group.includes(:route, :owners, group_wiki_repository: :shard) # rubocop: disable CodeReuse/ActiveRecord
end
......@@ -22,15 +17,6 @@ module EE
end
end
override :enqueue_container
def enqueue_container(container)
if container.is_a?(Group)
enqueue_group(container)
else
super
end
end
def enqueue_group(group)
strategy.enqueue(group, ::Gitlab::GlRepository::WIKI)
end
......@@ -47,15 +33,6 @@ module EE
enqueue_group(group)
end
end
override :records_to_enqueue
def records_to_enqueue(storage)
super << groups_in_storage(storage)
end
def groups_in_storage(storage)
group_relation.id_in(GroupWikiRepository.for_repository_storage(storage).select(:group_id))
end
end
end
end
......@@ -9,6 +9,8 @@ RSpec.describe Backup::Repositories do
subject { described_class.new(progress, strategy: strategy) }
describe '#dump' do
let_it_be(:groups) { create_list(:group, 5, :wiki_repo) }
context 'hashed storage' do
let_it_be(:project) { create(:project, :repository) }
let_it_be(:group) { create(:group, :wiki_repo) }
......@@ -16,7 +18,7 @@ RSpec.describe Backup::Repositories do
it 'calls enqueue for each repository type', :aggregate_failures do
create(:wiki_page, container: group)
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
subject.dump
expect(strategy).to have_received(:start).with(:create)
expect(strategy).to have_received(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
......@@ -25,46 +27,30 @@ RSpec.describe Backup::Repositories do
end
end
context 'no concurrency' do
let_it_be(:groups) { create_list(:group, 5, :wiki_repo) }
it 'creates the expected number of threads' do
expect(Thread).not_to receive(:new)
context 'command failure' do
it 'enqueue_group raises an error' do
allow(strategy).to receive(:enqueue).with(anything, Gitlab::GlRepository::WIKI).and_raise(IOError)
expect(strategy).to receive(:start).with(:create)
groups.each do |group|
expect(strategy).to receive(:enqueue).with(group, Gitlab::GlRepository::WIKI)
end
expect(strategy).to receive(:wait)
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
expect { subject.dump }.to raise_error(IOError)
end
describe 'command failure' do
it 'enqueue_group raises an error' do
allow(strategy).to receive(:enqueue).with(anything, Gitlab::GlRepository::WIKI).and_raise(IOError)
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: 1) }.to raise_error(IOError)
end
it 'group query raises an error' do
allow(Group).to receive_message_chain(:includes, :find_each).and_raise(ActiveRecord::StatementTimeout)
it 'group query raises an error' do
allow(Group).to receive_message_chain(:includes, :find_each).and_raise(ActiveRecord::StatementTimeout)
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: 1) }.to raise_error(ActiveRecord::StatementTimeout)
end
expect { subject.dump }.to raise_error(ActiveRecord::StatementTimeout)
end
end
it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new do
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end.count
it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new do
subject.dump
end.count
create_list(:group, 2, :wiki_repo)
create_list(:group, 2, :wiki_repo)
expect do
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end.not_to exceed_query_limit(control_count)
end
expect do
subject.dump
end.not_to exceed_query_limit(control_count)
end
end
......
......@@ -57,10 +57,6 @@ module Backup
}.merge(Gitlab::GitalyClient.connection_data(repository.storage)).to_json)
end
def parallel_enqueue?
false
end
private
def started?
......
......@@ -9,36 +9,10 @@ module Backup
@strategy = strategy
end
def dump(max_concurrency:, max_storage_concurrency:)
def dump
strategy.start(:create)
enqueue_consecutive
# gitaly-backup is designed to handle concurrency on its own. So we want
# to avoid entering the buggy concurrency code here when gitaly-backup
# is enabled.
if (max_concurrency <= 1 && max_storage_concurrency <= 1) || !strategy.parallel_enqueue?
return enqueue_consecutive
end
check_valid_storages!
semaphore = Concurrent::Semaphore.new(max_concurrency)
errors = Queue.new
threads = Gitlab.config.repositories.storages.keys.map do |storage|
Thread.new do
Rails.application.executor.wrap do
enqueue_storage(storage, semaphore, max_storage_concurrency: max_storage_concurrency)
rescue StandardError => e
errors << e
end
end
end
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
threads.each(&:join)
end
raise errors.pop unless errors.empty?
ensure
strategy.wait
end
......@@ -58,18 +32,6 @@ module Backup
attr_reader :progress, :strategy
def check_valid_storages!
repository_storage_klasses.each do |klass|
if klass.excluding_repository_storage(Gitlab.config.repositories.storages.keys).exists?
raise Error, "repositories.storages in gitlab.yml does not include all storages used by #{klass}"
end
end
end
def repository_storage_klasses
[ProjectRepository, SnippetRepository]
end
def enqueue_consecutive
enqueue_consecutive_projects
enqueue_consecutive_snippets
......@@ -85,50 +47,6 @@ module Backup
Snippet.find_each(batch_size: 1000) { |snippet| enqueue_snippet(snippet) }
end
def enqueue_storage(storage, semaphore, max_storage_concurrency:)
errors = Queue.new
queue = InterlockSizedQueue.new(1)
threads = Array.new(max_storage_concurrency) do
Thread.new do
Rails.application.executor.wrap do
while container = queue.pop
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
semaphore.acquire
end
begin
enqueue_container(container)
rescue StandardError => e
errors << e
break
ensure
semaphore.release
end
end
end
end
end
enqueue_records_for_storage(storage, queue, errors)
raise errors.pop unless errors.empty?
ensure
queue.close
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
threads.each(&:join)
end
end
def enqueue_container(container)
case container
when Project
enqueue_project(container)
when Snippet
enqueue_snippet(container)
end
end
def enqueue_project(project)
strategy.enqueue(project, Gitlab::GlRepository::PROJECT)
strategy.enqueue(project, Gitlab::GlRepository::WIKI)
......@@ -139,32 +57,10 @@ module Backup
strategy.enqueue(snippet, Gitlab::GlRepository::SNIPPET)
end
def enqueue_records_for_storage(storage, queue, errors)
records_to_enqueue(storage).each do |relation|
relation.find_each(batch_size: 100) do |project|
break unless errors.empty?
queue.push(project)
end
end
end
def records_to_enqueue(storage)
[projects_in_storage(storage), snippets_in_storage(storage)]
end
def projects_in_storage(storage)
project_relation.id_in(ProjectRepository.for_repository_storage(storage).select(:project_id))
end
def project_relation
Project.includes(:route, :group, namespace: :owner)
end
def snippets_in_storage(storage)
Snippet.id_in(SnippetRepository.for_repository_storage(storage).select(:snippet_id))
end
def restore_object_pools
PoolRepository.includes(:source_project).find_each do |pool|
progress.puts " - Object pool #{pool.disk_path}..."
......@@ -199,24 +95,6 @@ module Backup
Snippet.id_in(invalid_snippets).delete_all
end
class InterlockSizedQueue < SizedQueue
extend ::Gitlab::Utils::Override
override :pop
def pop(*)
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
super
end
end
override :push
def push(*)
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
super
end
end
end
end
end
......
......@@ -102,19 +102,10 @@ namespace :gitlab do
task create: :gitlab_environment do
puts_time "Dumping repositories ...".color(:blue)
max_concurrency = ENV.fetch('GITLAB_BACKUP_MAX_CONCURRENCY', 1).to_i
max_storage_concurrency = ENV.fetch('GITLAB_BACKUP_MAX_STORAGE_CONCURRENCY', 1).to_i
if ENV["SKIP"] && ENV["SKIP"].include?("repositories")
puts_time "[SKIPPED]".color(:cyan)
elsif max_concurrency < 1 || max_storage_concurrency < 1
puts "GITLAB_BACKUP_MAX_CONCURRENCY and GITLAB_BACKUP_MAX_STORAGE_CONCURRENCY must have a value of at least 1".color(:red)
exit 1
else
Backup::Repositories.new(progress, strategy: repository_backup_strategy).dump(
max_concurrency: max_concurrency,
max_storage_concurrency: max_storage_concurrency
)
Backup::Repositories.new(progress, strategy: repository_backup_strategy).dump
puts_time "done".color(:green)
end
end
......
......@@ -4,8 +4,7 @@ require 'spec_helper'
RSpec.describe Backup::Repositories do
let(:progress) { spy(:stdout) }
let(:parallel_enqueue) { true }
let(:strategy) { spy(:strategy, parallel_enqueue?: parallel_enqueue) }
let(:strategy) { spy(:strategy) }
subject { described_class.new(progress, strategy: strategy) }
......@@ -17,7 +16,7 @@ RSpec.describe Backup::Repositories do
project_snippet = create(:project_snippet, :repository, project: project)
personal_snippet = create(:personal_snippet, :repository, author: project.owner)
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
subject.dump
expect(strategy).to have_received(:start).with(:create)
expect(strategy).to have_received(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
......@@ -41,132 +40,30 @@ RSpec.describe Backup::Repositories do
it_behaves_like 'creates repository bundles'
end
context 'no concurrency' do
it 'creates the expected number of threads' do
expect(Thread).not_to receive(:new)
context 'command failure' do
it 'enqueue_project raises an error' do
allow(strategy).to receive(:enqueue).with(anything, Gitlab::GlRepository::PROJECT).and_raise(IOError)
expect(strategy).to receive(:start).with(:create)
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end
describe 'command failure' do
it 'enqueue_project raises an error' do
allow(strategy).to receive(:enqueue).with(anything, Gitlab::GlRepository::PROJECT).and_raise(IOError)
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: 1) }.to raise_error(IOError)
end
it 'project query raises an error' do
allow(Project).to receive_message_chain(:includes, :find_each).and_raise(ActiveRecord::StatementTimeout)
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: 1) }.to raise_error(ActiveRecord::StatementTimeout)
end
end
it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new do
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end.count
create_list(:project, 2, :repository)
expect do
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end.not_to exceed_query_limit(control_count)
expect { subject.dump }.to raise_error(IOError)
end
end
context 'concurrency with a strategy without parallel enqueueing support' do
let(:parallel_enqueue) { false }
it 'project query raises an error' do
allow(Project).to receive_message_chain(:includes, :find_each).and_raise(ActiveRecord::StatementTimeout)
it 'enqueues all projects sequentially' do
expect(Thread).not_to receive(:new)
expect(strategy).to receive(:start).with(:create)
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
subject.dump(max_concurrency: 2, max_storage_concurrency: 2)
expect { subject.dump }.to raise_error(ActiveRecord::StatementTimeout)
end
end
[4, 10].each do |max_storage_concurrency|
context "max_storage_concurrency #{max_storage_concurrency}", quarantine: 'https://gitlab.com/gitlab-org/gitlab/-/issues/241701' do
let(:storage_keys) { %w[default test_second_storage] }
before do
allow(Gitlab.config.repositories.storages).to receive(:keys).and_return(storage_keys)
end
it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new do
subject.dump
end.count
it 'creates the expected number of threads' do
expect(Thread).to receive(:new)
.exactly(storage_keys.length * (max_storage_concurrency + 1)).times
.and_call_original
create_list(:project, 2, :repository)
expect(strategy).to receive(:start).with(:create)
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency)
end
it 'creates the expected number of threads with extra max concurrency' do
expect(Thread).to receive(:new)
.exactly(storage_keys.length * (max_storage_concurrency + 1)).times
.and_call_original
expect(strategy).to receive(:start).with(:create)
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
subject.dump(max_concurrency: 3, max_storage_concurrency: max_storage_concurrency)
end
describe 'command failure' do
it 'enqueue_project raises an error' do
allow(strategy).to receive(:enqueue).and_raise(IOError)
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency) }.to raise_error(IOError)
end
it 'project query raises an error' do
allow(Project).to receive_message_chain(:for_repository_storage, :includes, :find_each).and_raise(ActiveRecord::StatementTimeout)
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency) }.to raise_error(ActiveRecord::StatementTimeout)
end
context 'misconfigured storages' do
let(:storage_keys) { %w[test_second_storage] }
it 'raises an error' do
expect { subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency) }.to raise_error(Backup::Error, 'repositories.storages in gitlab.yml is misconfigured')
end
end
end
it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new do
subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency)
end.count
create_list(:project, 2, :repository)
expect do
subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency)
end.not_to exceed_query_limit(control_count)
end
end
expect do
subject.dump
end.not_to exceed_query_limit(control_count)
end
end
......
......@@ -383,30 +383,10 @@ RSpec.describe 'gitlab:app namespace rake task', :delete do
create(:project, :repository)
end
it 'has defaults' do
expect_next_instance_of(::Backup::Repositories) do |instance|
expect(instance).to receive(:dump)
.with(max_concurrency: 1, max_storage_concurrency: 1)
.and_call_original
end
expect { run_rake_task('gitlab:backup:create') }.to output.to_stdout_from_any_process
end
it 'passes through concurrency environment variables' do
# The way concurrency is handled will change with the `gitaly_backup`
# feature flag. For now we need to check that both ways continue to
# work. This will be cleaned up in the rollout issue.
# See https://gitlab.com/gitlab-org/gitlab/-/issues/333034
stub_env('GITLAB_BACKUP_MAX_CONCURRENCY', 5)
stub_env('GITLAB_BACKUP_MAX_STORAGE_CONCURRENCY', 2)
expect_next_instance_of(::Backup::Repositories) do |instance|
expect(instance).to receive(:dump)
.with(max_concurrency: 5, max_storage_concurrency: 2)
.and_call_original
end
expect(::Backup::GitalyBackup).to receive(:new).with(anything, parallel: 5, parallel_storage: 2).and_call_original
expect { run_rake_task('gitlab:backup:create') }.to output.to_stdout_from_any_process
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment