Commit 0d613d98 authored by Gabriel Mazetto's avatar Gabriel Mazetto Committed by Luke Duncalfe

Refactor repository backup related classes

The new `gitlay-backup create` uses a pipe based process
communication. This refactor makes it more explicit what
is going on and what is the expected message format
parent 0d54fa17
......@@ -21,7 +21,7 @@ RSpec.describe Backup::Repositories do
expect(strategy).to have_received(:start).with(:create)
expect(strategy).to have_received(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
expect(strategy).to have_received(:enqueue).with(group, Gitlab::GlRepository::WIKI)
expect(strategy).to have_received(:wait)
expect(strategy).to have_received(:finish!)
end
end
......@@ -35,7 +35,7 @@ RSpec.describe Backup::Repositories do
groups.each do |group|
expect(strategy).to receive(:enqueue).with(group, Gitlab::GlRepository::WIKI)
end
expect(strategy).to receive(:wait)
expect(strategy).to receive(:finish!)
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end
......@@ -78,7 +78,7 @@ RSpec.describe Backup::Repositories do
expect(strategy).to have_received(:start).with(:restore)
expect(strategy).to have_received(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
expect(strategy).to have_received(:enqueue).with(group, Gitlab::GlRepository::WIKI)
expect(strategy).to have_received(:wait)
expect(strategy).to have_received(:finish!)
end
end
end
......@@ -2,11 +2,17 @@
module Backup
# Backup and restores repositories using gitaly-backup
#
# gitaly-backup can work in parallel and accepts a list of repositories
# through input pipe using a specific json format for both backup and restore
class GitalyBackup
def initialize(progress, parallel: nil, parallel_storage: nil)
# @param [StringIO] progress IO interface to output progress
# @param [Integer] max_parallelism max parallelism when running backups
# @param [Integer] storage_parallelism max parallelism per storage (is affected by max_parallelism)
def initialize(progress, max_parallelism: nil, storage_parallelism: nil)
@progress = progress
@parallel = parallel
@parallel_storage = parallel_storage
@max_parallelism = max_parallelism
@storage_parallelism = storage_parallelism
end
def start(type)
......@@ -22,20 +28,20 @@ module Backup
end
args = []
args += ['-parallel', @parallel.to_s] if @parallel
args += ['-parallel-storage', @parallel_storage.to_s] if @parallel_storage
args += ['-parallel', @max_parallelism.to_s] if @max_parallelism
args += ['-parallel-storage', @storage_parallelism.to_s] if @storage_parallelism
@stdin, stdout, @thread = Open3.popen2(build_env, bin_path, command, '-path', backup_repos_path, *args)
@input_stream, stdout, @thread = Open3.popen2(build_env, bin_path, command, '-path', backup_repos_path, *args)
@out_reader = Thread.new do
IO.copy_stream(stdout, @progress)
end
end
def wait
def finish!
return unless started?
@stdin.close
@input_stream.close
[@thread, @out_reader].each(&:join)
status = @thread.value
......@@ -49,12 +55,7 @@ module Backup
repository = repo_type.repository_for(container)
@stdin.puts({
storage_name: repository.storage,
relative_path: repository.relative_path,
gl_project_path: repository.gl_project_path,
always_create: repo_type.project?
}.merge(Gitlab::GitalyClient.connection_data(repository.storage)).to_json)
schedule_backup_job(repository, always_create: repo_type.project?)
end
def parallel_enqueue?
......@@ -63,6 +64,24 @@ module Backup
private
# Schedule a new backup job through a non-blocking JSON based pipe protocol
#
# @see https://gitlab.com/gitlab-org/gitaly/-/blob/master/doc/gitaly-backup.md
def schedule_backup_job(repository, always_create:)
connection_params = Gitlab::GitalyClient.connection_data(repository.storage)
json_job = {
address: connection_params['address'],
token: connection_params['token'],
storage_name: repository.storage,
relative_path: repository.relative_path,
gl_project_path: repository.gl_project_path,
always_create: always_create
}.to_json
@input_stream.puts(json_job)
end
def build_env
{
'SSL_CERT_FILE' => OpenSSL::X509::DEFAULT_CERT_FILE,
......
......@@ -23,7 +23,7 @@ module Backup
end
end
def wait
def finish!
@type = nil
end
......
......@@ -40,7 +40,7 @@ module Backup
raise errors.pop unless errors.empty?
ensure
strategy.wait
strategy.finish!
end
def restore
......@@ -48,7 +48,7 @@ module Backup
enqueue_consecutive
ensure
strategy.wait
strategy.finish!
cleanup_snippets_without_repositories
restore_object_pools
......
......@@ -351,7 +351,7 @@ namespace :gitlab do
if Feature.enabled?(:gitaly_backup, default_enabled: :yaml)
max_concurrency = ENV['GITLAB_BACKUP_MAX_CONCURRENCY'].presence
max_storage_concurrency = ENV['GITLAB_BACKUP_MAX_STORAGE_CONCURRENCY'].presence
Backup::GitalyBackup.new(progress, parallel: max_concurrency, parallel_storage: max_storage_concurrency)
Backup::GitalyBackup.new(progress, max_parallelism: max_concurrency, storage_parallelism: max_storage_concurrency)
else
Backup::GitalyRpcBackup.new(progress)
end
......
......@@ -3,8 +3,8 @@
require 'spec_helper'
RSpec.describe Backup::GitalyBackup do
let(:parallel) { nil }
let(:parallel_storage) { nil }
let(:max_parallelism) { nil }
let(:storage_parallelism) { nil }
let(:progress) do
Tempfile.new('progress').tap do |progress|
......@@ -23,7 +23,7 @@ RSpec.describe Backup::GitalyBackup do
progress.close
end
subject { described_class.new(progress, parallel: parallel, parallel_storage: parallel_storage) }
subject { described_class.new(progress, max_parallelism: max_parallelism, storage_parallelism: storage_parallelism) }
context 'unknown' do
it 'fails to start unknown' do
......@@ -48,7 +48,7 @@ RSpec.describe Backup::GitalyBackup do
subject.enqueue(project, Gitlab::GlRepository::DESIGN)
subject.enqueue(personal_snippet, Gitlab::GlRepository::SNIPPET)
subject.enqueue(project_snippet, Gitlab::GlRepository::SNIPPET)
subject.wait
subject.finish!
expect(File).to exist(File.join(Gitlab.config.backup.path, 'repositories', project.disk_path + '.bundle'))
expect(File).to exist(File.join(Gitlab.config.backup.path, 'repositories', project.disk_path + '.wiki.bundle'))
......@@ -58,24 +58,24 @@ RSpec.describe Backup::GitalyBackup do
end
context 'parallel option set' do
let(:parallel) { 3 }
let(:max_parallelism) { 3 }
it 'passes parallel option through' do
expect(Open3).to receive(:popen2).with(expected_env, anything, 'create', '-path', anything, '-parallel', '3').and_call_original
subject.start(:create)
subject.wait
subject.finish!
end
end
context 'parallel_storage option set' do
let(:parallel_storage) { 3 }
let(:storage_parallelism) { 3 }
it 'passes parallel option through' do
expect(Open3).to receive(:popen2).with(expected_env, anything, 'create', '-path', anything, '-parallel-storage', '3').and_call_original
subject.start(:create)
subject.wait
subject.finish!
end
end
......@@ -83,7 +83,7 @@ RSpec.describe Backup::GitalyBackup do
expect(subject).to receive(:bin_path).and_return(Gitlab::Utils.which('false'))
subject.start(:create)
expect { subject.wait }.to raise_error(::Backup::Error, 'gitaly-backup exit status 1')
expect { subject.finish! }.to raise_error(::Backup::Error, 'gitaly-backup exit status 1')
end
end
......@@ -115,7 +115,7 @@ RSpec.describe Backup::GitalyBackup do
expect(Open3).to receive(:popen2).with(ssl_env, anything, 'create', '-path', anything).and_call_original
subject.start(:create)
subject.wait
subject.finish!
end
end
end
......@@ -145,7 +145,7 @@ RSpec.describe Backup::GitalyBackup do
subject.enqueue(project, Gitlab::GlRepository::DESIGN)
subject.enqueue(personal_snippet, Gitlab::GlRepository::SNIPPET)
subject.enqueue(project_snippet, Gitlab::GlRepository::SNIPPET)
subject.wait
subject.finish!
collect_commit_shas = -> (repo) { repo.commits('master', limit: 10).map(&:sha) }
......@@ -157,24 +157,24 @@ RSpec.describe Backup::GitalyBackup do
end
context 'parallel option set' do
let(:parallel) { 3 }
let(:max_parallelism) { 3 }
it 'passes parallel option through' do
expect(Open3).to receive(:popen2).with(expected_env, anything, 'restore', '-path', anything, '-parallel', '3').and_call_original
subject.start(:restore)
subject.wait
subject.finish!
end
end
context 'parallel_storage option set' do
let(:parallel_storage) { 3 }
let(:storage_parallelism) { 3 }
it 'passes parallel option through' do
expect(Open3).to receive(:popen2).with(expected_env, anything, 'restore', '-path', anything, '-parallel-storage', '3').and_call_original
subject.start(:restore)
subject.wait
subject.finish!
end
end
......@@ -182,7 +182,7 @@ RSpec.describe Backup::GitalyBackup do
expect(subject).to receive(:bin_path).and_return(Gitlab::Utils.which('false'))
subject.start(:restore)
expect { subject.wait }.to raise_error(::Backup::Error, 'gitaly-backup exit status 1')
expect { subject.finish! }.to raise_error(::Backup::Error, 'gitaly-backup exit status 1')
end
end
end
......@@ -33,7 +33,7 @@ RSpec.describe Backup::GitalyRpcBackup do
subject.enqueue(project, Gitlab::GlRepository::DESIGN)
subject.enqueue(personal_snippet, Gitlab::GlRepository::SNIPPET)
subject.enqueue(project_snippet, Gitlab::GlRepository::SNIPPET)
subject.wait
subject.finish!
expect(File).to exist(File.join(Gitlab.config.backup.path, 'repositories', project.disk_path + '.bundle'))
expect(File).to exist(File.join(Gitlab.config.backup.path, 'repositories', project.disk_path + '.wiki.bundle'))
......@@ -52,7 +52,7 @@ RSpec.describe Backup::GitalyRpcBackup do
it 'logs an appropriate message', :aggregate_failures do
subject.start(:create)
subject.enqueue(project, Gitlab::GlRepository::PROJECT)
subject.wait
subject.finish!
expect(progress).to have_received(:puts).with("[Failed] backing up #{project.full_path} (#{project.disk_path})")
expect(progress).to have_received(:puts).with("Error Fail in tests")
......@@ -96,7 +96,7 @@ RSpec.describe Backup::GitalyRpcBackup do
subject.enqueue(project, Gitlab::GlRepository::DESIGN)
subject.enqueue(personal_snippet, Gitlab::GlRepository::SNIPPET)
subject.enqueue(project_snippet, Gitlab::GlRepository::SNIPPET)
subject.wait
subject.finish!
collect_commit_shas = -> (repo) { repo.commits('master', limit: 10).map(&:sha) }
......@@ -129,7 +129,7 @@ RSpec.describe Backup::GitalyRpcBackup do
subject.enqueue(project, Gitlab::GlRepository::DESIGN)
subject.enqueue(personal_snippet, Gitlab::GlRepository::SNIPPET)
subject.enqueue(project_snippet, Gitlab::GlRepository::SNIPPET)
subject.wait
subject.finish!
end
context 'failure' do
......@@ -143,7 +143,7 @@ RSpec.describe Backup::GitalyRpcBackup do
it 'logs an appropriate message', :aggregate_failures do
subject.start(:restore)
subject.enqueue(project, Gitlab::GlRepository::PROJECT)
subject.wait
subject.finish!
expect(progress).to have_received(:puts).with("[Failed] restoring #{project.full_path} (#{project.disk_path})")
expect(progress).to have_received(:puts).with("Error Fail in tests")
......
......@@ -25,7 +25,7 @@ RSpec.describe Backup::Repositories do
expect(strategy).to have_received(:enqueue).with(project, Gitlab::GlRepository::DESIGN)
expect(strategy).to have_received(:enqueue).with(project_snippet, Gitlab::GlRepository::SNIPPET)
expect(strategy).to have_received(:enqueue).with(personal_snippet, Gitlab::GlRepository::SNIPPET)
expect(strategy).to have_received(:wait)
expect(strategy).to have_received(:finish!)
end
end
......@@ -49,7 +49,7 @@ RSpec.describe Backup::Repositories do
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
expect(strategy).to receive(:finish!)
subject.dump(max_concurrency: 1, max_storage_concurrency: 1)
end
......@@ -91,7 +91,7 @@ RSpec.describe Backup::Repositories do
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
expect(strategy).to receive(:finish!)
subject.dump(max_concurrency: 2, max_storage_concurrency: 2)
end
......@@ -114,7 +114,7 @@ RSpec.describe Backup::Repositories do
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
expect(strategy).to receive(:finish!)
subject.dump(max_concurrency: 1, max_storage_concurrency: max_storage_concurrency)
end
......@@ -128,7 +128,7 @@ RSpec.describe Backup::Repositories do
projects.each do |project|
expect(strategy).to receive(:enqueue).with(project, Gitlab::GlRepository::PROJECT)
end
expect(strategy).to receive(:wait)
expect(strategy).to receive(:finish!)
subject.dump(max_concurrency: 3, max_storage_concurrency: max_storage_concurrency)
end
......@@ -184,7 +184,7 @@ RSpec.describe Backup::Repositories do
expect(strategy).to have_received(:enqueue).with(project, Gitlab::GlRepository::DESIGN)
expect(strategy).to have_received(:enqueue).with(project_snippet, Gitlab::GlRepository::SNIPPET)
expect(strategy).to have_received(:enqueue).with(personal_snippet, Gitlab::GlRepository::SNIPPET)
expect(strategy).to have_received(:wait)
expect(strategy).to have_received(:finish!)
end
context 'restoring object pools' do
......
......@@ -432,7 +432,7 @@ RSpec.describe 'gitlab:app namespace rake task', :delete do
.with(max_concurrency: 5, max_storage_concurrency: 2)
.and_call_original
end
expect(::Backup::GitalyBackup).to receive(:new).with(anything, parallel: 5, parallel_storage: 2).and_call_original
expect(::Backup::GitalyBackup).to receive(:new).with(anything, max_parallelism: 5, storage_parallelism: 2).and_call_original
expect { run_rake_task('gitlab:backup:create') }.to output.to_stdout_from_any_process
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment