Handle hashed storage migration events on Geo log cursor

parent 3236bff8
module Geo
class HashedStorageMigrationService
attr_reader :project_id, :old_disk_path, :new_disk_path
def initialize(project_id, old_disk_path, new_disk_path)
@project_id = project_id
@old_disk_path = old_disk_path
@new_disk_path = new_disk_path
end
def async_execute
GeoHashedStorageMigrationWorker.perform_async(project_id, old_disk_path, new_disk_path)
end
def execute
project = Project.find(project_id)
project.expire_caches_before_rename(old_disk_path)
Geo::MoveRepositoryService.new(project, old_disk_path, new_disk_path).execute
end
end
end
...@@ -2,35 +2,26 @@ module Geo ...@@ -2,35 +2,26 @@ module Geo
class MoveRepositoryService class MoveRepositoryService
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
attr_reader :id, :old_path_with_namespace, :new_path_with_namespace attr_reader :project, :old_disk_path, :new_disk_path
def initialize(id, old_path_with_namespace, new_path_with_namespace) def initialize(project, old_disk_path, new_disk_path)
@id = id @project = project
@old_path_with_namespace = old_path_with_namespace @old_disk_path = old_disk_path
@new_path_with_namespace = new_path_with_namespace @new_disk_path = new_disk_path
end
def async_execute
GeoRepositoryMoveWorker.perform_async(id, old_path_with_namespace, new_path_with_namespace)
end end
def execute def execute
project = Project.find(id)
project.expire_caches_before_rename(old_path_with_namespace)
return true if project.hashed_storage?(:repository)
# Make sure target directory exists (used when transfering repositories) # Make sure target directory exists (used when transfering repositories)
project.ensure_storage_path_exists project.ensure_storage_path_exists
if gitlab_shell.mv_repository(project.repository_storage_path, if gitlab_shell.mv_repository(project.repository_storage_path,
old_path_with_namespace, new_path_with_namespace) old_disk_path, new_disk_path)
# If repository moved successfully we need to send update instructions to users. # If repository moved successfully we need to send update instructions to users.
# However we cannot allow rollback since we moved repository # However we cannot allow rollback since we moved repository
# So we basically we mute exceptions in next actions # So we basically we mute exceptions in next actions
begin begin
gitlab_shell.mv_repository(project.repository_storage_path, gitlab_shell.mv_repository(project.repository_storage_path,
"#{old_path_with_namespace}.wiki", "#{new_path_with_namespace}.wiki") "#{old_disk_path}.wiki", "#{new_disk_path}.wiki")
rescue rescue
# Returning false does not rollback after_* transaction but gives # Returning false does not rollback after_* transaction but gives
# us information about failing some of tasks # us information about failing some of tasks
...@@ -39,7 +30,7 @@ module Geo ...@@ -39,7 +30,7 @@ module Geo
else else
# if we cannot move namespace directory we should rollback # if we cannot move namespace directory we should rollback
# db changes in order to prevent out of sync between db and fs # db changes in order to prevent out of sync between db and fs
raise Exception.new('repository cannot be renamed') raise StandardError.new('Repository cannot be renamed')
end end
true true
......
module Geo
class RenameRepositoryService
attr_reader :project_id, :old_disk_path, :new_disk_path
def initialize(project_id, old_disk_path, new_disk_path)
@project_id = project_id
@old_disk_path = old_disk_path
@new_disk_path = new_disk_path
end
def async_execute
GeoRenameRepositoryWorker.perform_async(project_id, old_disk_path, new_disk_path)
end
def execute
project = Project.find(project_id)
project.expire_caches_before_rename(old_disk_path)
return true if project.hashed_storage?(:repository)
Geo::MoveRepositoryService.new(project, old_disk_path, new_disk_path).execute
end
end
end
class GeoHashedStorageMigrationWorker
include Sidekiq::Worker
include GeoQueue
def perform(project_id, old_disk_path, new_disk_path)
Geo::HashedStorageMigrationService.new(project_id, old_disk_path, new_disk_path).execute
end
end
class GeoRenameRepositoryWorker
include Sidekiq::Worker
include GeoQueue
def perform(project_id, old_path_with_namespace, new_path_with_namespace)
Geo::RenameRepositoryService.new(project_id, old_path_with_namespace, new_path_with_namespace).execute
end
end
class GeoRepositoryMoveWorker
include Sidekiq::Worker
include GeoQueue
def perform(id, old_path_with_namespace, new_path_with_namespace)
Geo::MoveRepositoryService.new(id, old_path_with_namespace, new_path_with_namespace).execute
end
end
...@@ -76,6 +76,8 @@ module Gitlab ...@@ -76,6 +76,8 @@ module Gitlab
handle_repositories_changed(event_log.repositories_changed_event) handle_repositories_changed(event_log.repositories_changed_event)
elsif event_log.repository_renamed_event elsif event_log.repository_renamed_event
handle_repository_renamed(event_log) handle_repository_renamed(event_log)
elsif event_log.hashed_storage_migrated_event
handle_hashed_storage_migrated(event_log)
end end
end end
end end
...@@ -182,7 +184,7 @@ module Gitlab ...@@ -182,7 +184,7 @@ module Gitlab
old_path = event.old_path_with_namespace old_path = event.old_path_with_namespace
new_path = event.new_path_with_namespace new_path = event.new_path_with_namespace
job_id = ::Geo::MoveRepositoryService job_id = ::Geo::RenameRepositoryService
.new(event.project_id, old_path, new_path) .new(event.project_id, old_path, new_path)
.async_execute .async_execute
...@@ -195,6 +197,25 @@ module Gitlab ...@@ -195,6 +197,25 @@ module Gitlab
job_id: job_id) job_id: job_id)
end end
def handle_hashed_storage_migrated(event_log)
event = event_log.hashed_storage_migrated_event
return unless event.project_id
job_id = ::Geo::HashedStorageMigrationService
.new(event.project_id, event.old_disk_path, event.new_disk_path)
.async_execute
log_event_info(
event_log.created_at,
message: 'Migrating project to hashed storage',
project_id: event.project_id,
old_storage_version: event.old_storage_version,
new_storage_version: event.new_storage_version,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
job_id: job_id)
end
def find_or_initialize_registry(project_id, attrs) def find_or_initialize_registry(project_id, attrs)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id) registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
registry.assign_attributes(attrs) registry.assign_attributes(attrs)
......
...@@ -15,6 +15,10 @@ FactoryGirl.define do ...@@ -15,6 +15,10 @@ FactoryGirl.define do
trait :renamed_event do trait :renamed_event do
repository_renamed_event factory: :geo_repository_renamed_event repository_renamed_event factory: :geo_repository_renamed_event
end end
trait :hashed_storage_migration_event do
hashed_storage_migrated_event factory: :geo_hashed_storage_migrated_event
end
end end
factory :geo_repository_created_event, class: Geo::RepositoryCreatedEvent do factory :geo_repository_created_event, class: Geo::RepositoryCreatedEvent do
......
...@@ -197,25 +197,45 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -197,25 +197,45 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
context 'when processing a repository renamed event' do context 'when processing a repository renamed event' do
let(:event_log) { create(:geo_event_log, :renamed_event) } let(:event_log) { create(:geo_event_log, :renamed_event) }
let(:project) { event_log.repository_rename_event.project }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_rename_event) { event_log.repository_renamed_event } let(:repository_renamed_event) { event_log.repository_renamed_event }
it 'does not create a new project registry' do it 'does not create a new project registry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end end
it 'schedules a GeoRepositoryDestroyWorker' do it 'schedules a GeoRenameRepositoryWorker' do
project_id = repository_rename_event.project_id project_id = repository_renamed_event.project_id
old_path_with_namespace = repository_rename_event.old_path_with_namespace old_path_with_namespace = repository_renamed_event.old_path_with_namespace
new_path_with_namespace = repository_rename_event.new_path_with_namespace new_path_with_namespace = repository_renamed_event.new_path_with_namespace
expect(::GeoRepositoryMoveWorker).to receive(:perform_async) expect(::GeoRenameRepositoryWorker).to receive(:perform_async)
.with(project_id, old_path_with_namespace, new_path_with_namespace) .with(project_id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once! daemon.run_once!
end end
end end
context 'when processing a hashed storage migration event' do
let(:event_log) { create(:geo_event_log, :hashed_storage_migration_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_migrated_event) { event_log.hashed_storage_migrated_event }
it 'does not create a new project registry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'schedules a GeoHashedStorageMigrationWorker' do
project_id = hashed_storage_migrated_event.project_id
old_disk_path = hashed_storage_migrated_event.old_disk_path
new_disk_path = hashed_storage_migrated_event.new_disk_path
expect(::GeoHashedStorageMigrationWorker).to receive(:perform_async)
.with(project_id, old_disk_path, new_disk_path)
daemon.run_once!
end
end
end end
describe '#full_scan!' do describe '#full_scan!' do
......
require 'spec_helper'
describe Geo::HashedStorageMigrationService do
let(:project) { create(:project, :repository) }
let(:new_path) { "#{project.full_path}+renamed" }
describe '#execute' do
it 'moves project backed by legacy storage' do
service = described_class.new(project.id, project.full_path, new_path)
expect_any_instance_of(Geo::MoveRepositoryService).to receive(:execute).once
service.execute
end
it 'moves project backed by hashed storage' do
project_hashed_storage = create(:project, :hashed)
service = described_class.new(project_hashed_storage.id, project_hashed_storage.full_path, new_path)
expect_any_instance_of(Geo::MoveRepositoryService).to receive(:execute).once
service.execute
end
end
describe '#async_execute' do
subject(:service) { described_class.new(project.id, project.full_path, new_path) }
it 'starts the worker' do
expect(GeoHashedStorageMigrationWorker).to receive(:perform_async)
service.async_execute
end
it 'returns job id' do
allow(GeoHashedStorageMigrationWorker).to receive(:perform_async).and_return('foo')
expect(service.async_execute).to eq('foo')
end
end
end
...@@ -5,44 +5,16 @@ describe Geo::MoveRepositoryService do ...@@ -5,44 +5,16 @@ describe Geo::MoveRepositoryService do
let(:new_path) { "#{project.full_path}+renamed" } let(:new_path) { "#{project.full_path}+renamed" }
describe '#execute' do describe '#execute' do
it 'moves project backed by legacy storage' do it 'renames the path' do
old_path = project.repository.path_to_repo old_path = project.repository.path_to_repo
full_new_path = File.join(project.repository_storage_path, new_path) full_new_path = File.join(project.repository_storage_path, new_path)
service = described_class.new(project.id, project.full_path, new_path) service = described_class.new(project, project.full_path, new_path)
expect(File.directory?(old_path)).to be_truthy expect(File.directory?(old_path)).to be_truthy
expect(service.execute).to eq(true) expect(service.execute).to eq(true)
expect(File.directory?(old_path)).to be_falsey expect(File.directory?(old_path)).to be_falsey
expect(File.directory?("#{full_new_path}.git")).to be_truthy expect(File.directory?("#{full_new_path}.git")).to be_truthy
end end
it 'does not move project backed by hashed storage' do
project_hashed_storage = create(:project, :hashed)
gitlab_shell = Gitlab::Shell.new
service = described_class.new(project_hashed_storage.id, project_hashed_storage.full_path, new_path)
allow(service).to receive(:gitlab_shell).and_return(gitlab_shell)
expect(service.execute).to eq(true)
expect(gitlab_shell).not_to receive(:mv_repository)
end
end
describe '#async_execute' do
subject(:service) { described_class.new(project.id, project.full_path, new_path) }
it 'starts the worker' do
expect(GeoRepositoryMoveWorker).to receive(:perform_async)
service.async_execute
end
it 'returns job id' do
allow(GeoRepositoryMoveWorker).to receive(:perform_async).and_return('foo')
expect(service.async_execute).to eq('foo')
end
end end
end end
require 'spec_helper'
describe Geo::RenameRepositoryService do
let(:project) { create(:project, :repository) }
let(:new_path) { "#{project.full_path}+renamed" }
describe '#execute' do
it 'moves project backed by legacy storage' do
service = described_class.new(project.id, project.full_path, new_path)
expect_any_instance_of(Geo::MoveRepositoryService).to receive(:execute).once
service.execute
end
it 'does not move project backed by hashed storage' do
project_hashed_storage = create(:project, :hashed)
service = described_class.new(project_hashed_storage.id, project_hashed_storage.full_path, new_path)
expect_any_instance_of(Geo::MoveRepositoryService).not_to receive(:execute)
service.execute
end
end
describe '#async_execute' do
subject(:service) { described_class.new(project.id, project.full_path, new_path) }
it 'starts the worker' do
expect(GeoRenameRepositoryWorker).to receive(:perform_async)
service.async_execute
end
it 'returns job id' do
allow(GeoRenameRepositoryWorker).to receive(:perform_async).and_return('foo')
expect(service.async_execute).to eq('foo')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment