Commit c94f3da8 authored by Gabriel Mazetto

Geo Attachments Migration to Hashed Storage

parent 1e8b7077
module Geo
  AttachmentMigrationError = Class.new(StandardError)

  class HashedStorageAttachmentsMigrationService
    include ::Gitlab::Geo::LogHelpers

    attr_reader :project_id, :old_attachments_path, :new_attachments_path

    def initialize(project_id, old_attachments_path:, new_attachments_path:)
      @project_id = project_id
      @old_attachments_path = old_attachments_path
      @new_attachments_path = new_attachments_path
    end

    def async_execute
      Geo::HashedStorageAttachmentsMigrationWorker.perform_async(
        project_id,
        old_attachments_path,
        new_attachments_path
      )
    end

    def execute
      origin = File.join(CarrierWave.root, FileUploader.base_dir, old_attachments_path)
      target = File.join(CarrierWave.root, FileUploader.base_dir, new_attachments_path)

      move_folder!(origin, target)
    end

    private

    def project
      @project ||= Project.find(project_id)
    end

    def move_folder!(old_path, new_path)
      unless File.directory?(old_path)
        log_info("Skipped attachments migration to Hashed Storage, source path doesn't exist or is not a directory", project_id: project.id, source: old_path, target: new_path)
        return
      end

      if File.exist?(new_path)
        log_error("Cannot migrate attachments to Hashed Storage, target path already exists", project_id: project.id, source: old_path, target: new_path)
        raise AttachmentMigrationError, "Target path '#{new_path}' already exists"
      end

      # Create the hashed storage base path folder before moving the old folder into place
      FileUtils.mkdir_p(File.dirname(new_path))
      FileUtils.mv(old_path, new_path)

      log_info("Migrated project attachments to Hashed Storage", project_id: project.id, source: old_path, target: new_path)

      true
    end
  end
end
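
For reference, a minimal usage sketch of the service above; the project ID and both disk paths are hypothetical placeholders, not values from this commit:

# Hypothetical example: migrate attachments for project 42 from a legacy
# path to a hashed path (both paths are made-up placeholders).
service = Geo::HashedStorageAttachmentsMigrationService.new(
  42,
  old_attachments_path: 'my-group/my-project',
  new_attachments_path: '@hashed/aa/bb/aabb...'
)

service.execute                 # moves the folder now; raises AttachmentMigrationError if the target already exists
job_id = service.async_execute  # or enqueue the same move on the Geo Sidekiq queue
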
module Geo
  class HashedStorageAttachmentsMigrationWorker
    include Sidekiq::Worker
    include GeoQueue

    def perform(project_id, old_attachments_path, new_attachments_path)
      Geo::HashedStorageAttachmentsMigrationService.new(
        project_id,
        old_attachments_path: old_attachments_path,
        new_attachments_path: new_attachments_path
      ).execute
    end
  end
end
...@@ -183,6 +183,23 @@ module Gitlab
        job_id: job_id)
    end

    def handle_hashed_storage_attachments_event(event, created_at)
      job_id = ::Geo::HashedStorageAttachmentsMigrationService.new(
        event.project_id,
        old_attachments_path: event.old_attachments_path,
        new_attachments_path: event.new_attachments_path
      ).async_execute

      logger.event_info(
        created_at,
        message: 'Migrating attachments to hashed storage',
        project_id: event.project_id,
        old_attachments_path: event.old_attachments_path,
        new_attachments_path: event.new_attachments_path,
        job_id: job_id
      )
    end

    def handle_lfs_object_deleted_event(event, created_at)
      file_path = File.join(LfsObjectUploader.local_store_path, event.file_path)
...
...@@ -20,6 +20,10 @@ FactoryGirl.define do
      hashed_storage_migrated_event factory: :geo_hashed_storage_migrated_event
    end

    trait :hashed_storage_attachments_event do
      hashed_storage_attachments_event factory: :geo_hashed_storage_attachments_event
    end

    trait :lfs_object_deleted_event do
      lfs_object_deleted_event factory: :geo_lfs_object_deleted_event
    end
...@@ -81,6 +85,13 @@ FactoryGirl.define do
    new_storage_version { Project::LATEST_STORAGE_VERSION }
  end

  factory :geo_hashed_storage_attachments_event, class: Geo::HashedStorageAttachmentsEvent do
    project { create(:project, :repository) }

    old_attachments_path { Storage::LegacyProject.new(project).disk_path }
    new_attachments_path { Storage::HashedProject.new(project).disk_path }
  end
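
The two Storage classes used in this factory produce the two disk layouts involved in the migration: Storage::LegacyProject#disk_path follows the project's full path, while Storage::HashedProject#disk_path is derived from a SHA-256 digest of the project ID. A hedged sketch of the hashed layout (illustrative only, not code from this commit):

require 'digest'

# Illustrative sketch: hashed storage paths are stable because they are
# derived from the project ID, not from the project's renameable full path.
def hashed_disk_path(project_id)
  hash = Digest::SHA2.hexdigest(project_id.to_s)
  "@hashed/#{hash[0..1]}/#{hash[2..3]}/#{hash}"
end

hashed_disk_path(1)
# => "@hashed/6b/86/6b86b273ff34fce19d6b804eff5a3f5747ada4eaa22f1d49c01e52ddb7875b4b"
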
  factory :geo_lfs_object_deleted_event, class: Geo::LfsObjectDeletedEvent do
    lfs_object { create(:lfs_object, :with_file) }
...
...@@ -253,6 +253,27 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
    end
  end

  context 'when processing an attachment migration event to hashed storage' do
    let(:event_log) { create(:geo_event_log, :hashed_storage_attachments_event) }
    let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
    let(:hashed_storage_attachments_event) { event_log.hashed_storage_attachments_event }

    it 'does not create a new project registry' do
      expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
    end

    it 'schedules a Geo::HashedStorageAttachmentsMigrationWorker' do
      project = hashed_storage_attachments_event.project
      old_attachments_path = hashed_storage_attachments_event.old_attachments_path
      new_attachments_path = hashed_storage_attachments_event.new_attachments_path

      expect(::Geo::HashedStorageAttachmentsMigrationWorker).to receive(:perform_async)
        .with(project.id, old_attachments_path, new_attachments_path)

      daemon.run_once!
    end
  end

  context 'when replaying an LFS object deleted event' do
    let(:event_log) { create(:geo_event_log, :lfs_object_deleted_event) }
    let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
...
...@@ -2,9 +2,10 @@ require 'spec_helper'
describe Geo::HashedStorageAttachmentsEventStore do
  let(:project) { create(:project, :hashed, path: 'bar') }
  let(:attachments_event) { build(:geo_hashed_storage_attachments_event, project: project) }
  set(:secondary_node) { create(:geo_node) }
  let(:old_attachments_path) { attachments_event.old_attachments_path }
  let(:new_attachments_path) { attachments_event.new_attachments_path }

  subject(:event_store) { described_class.new(project, old_storage_version: 1, new_storage_version: 2, old_attachments_path: old_attachments_path, new_attachments_path: new_attachments_path) }
...
require 'spec_helper'

describe Geo::HashedStorageAttachmentsMigrationService do
  let!(:project) { create(:project) }

  let(:legacy_storage) { Storage::LegacyProject.new(project) }
  let(:hashed_storage) { Storage::HashedProject.new(project) }

  let!(:upload) { Upload.find_by(path: file_uploader.relative_path) }
  let(:file_uploader) { build(:file_uploader, project: project) }
  let(:old_path) { File.join(base_path(legacy_storage), upload.path) }
  let(:new_path) { File.join(base_path(hashed_storage), upload.path) }

  subject(:service) { described_class.new(project.id, old_attachments_path: legacy_storage.disk_path, new_attachments_path: hashed_storage.disk_path) }

  describe '#execute' do
    context 'when it succeeds' do
      it 'moves attachments to hashed storage layout' do
        expect(File.file?(old_path)).to be_truthy
        expect(File.file?(new_path)).to be_falsey
        expect(File.exist?(base_path(legacy_storage))).to be_truthy
        expect(File.exist?(base_path(hashed_storage))).to be_falsey
        expect(FileUtils).to receive(:mv).with(base_path(legacy_storage), base_path(hashed_storage)).and_call_original

        service.execute

        expect(File.exist?(base_path(hashed_storage))).to be_truthy
        expect(File.exist?(base_path(legacy_storage))).to be_falsey
        expect(File.file?(old_path)).to be_falsey
        expect(File.file?(new_path)).to be_truthy
      end
    end

    context 'when original folder does not exist anymore' do
      before do
        FileUtils.rm_rf(base_path(legacy_storage))
      end

      it 'skips moving folders and goes to the next event' do
        expect(FileUtils).not_to receive(:mv).with(base_path(legacy_storage), base_path(hashed_storage))

        service.execute

        expect(File.exist?(base_path(hashed_storage))).to be_falsey
        expect(File.file?(new_path)).to be_falsey
      end
    end

    context 'when target folder already exists' do
      before do
        FileUtils.mkdir_p(base_path(hashed_storage))
      end

      it 'raises AttachmentMigrationError' do
        expect(FileUtils).not_to receive(:mv).with(base_path(legacy_storage), base_path(hashed_storage))

        expect { service.execute }.to raise_error(::Geo::AttachmentMigrationError)
      end
    end
  end

  describe '#async_execute' do
    it 'starts the worker' do
      expect(Geo::HashedStorageAttachmentsMigrationWorker).to receive(:perform_async)

      service.async_execute
    end

    it 'returns job id' do
      allow(Geo::HashedStorageAttachmentsMigrationWorker).to receive(:perform_async).and_return('foo')

      expect(service.async_execute).to eq('foo')
    end
  end

  def base_path(storage)
    File.join(CarrierWave.root, FileUploader.base_dir, storage.disk_path)
  end
end