Commit 876f74b7 authored by Michael Kozono's avatar Michael Kozono

Merge branch '10067-geo-migrate-lfs-objects-into-their-own-registry-table' into 'master'

Geo - Migrate LFS objects into their own registry table

See merge request gitlab-org/gitlab!18415
parents 8497ad15 7b21f05b
...@@ -12,18 +12,19 @@ ...@@ -12,18 +12,19 @@
ActiveSupport::Inflector.inflections do |inflect| ActiveSupport::Inflector.inflections do |inflect|
inflect.uncountable %w( inflect.uncountable %w(
award_emoji award_emoji
project_statistics container_repository_registry
system_note_metadata design_registry
event_log event_log
project_auto_devops
project_registry
file_registry file_registry
group_view
job_artifact_registry job_artifact_registry
container_repository_registry lfs_object_registry
design_registry project_auto_devops
vulnerability_feedback project_registry
project_statistics
system_note_metadata
vulnerabilities_feedback vulnerabilities_feedback
group_view vulnerability_feedback
) )
inflect.acronym 'EE' inflect.acronym 'EE'
end end
...@@ -5,7 +5,7 @@ module Geo ...@@ -5,7 +5,7 @@ module Geo
# Counts all existing registries independent # Counts all existing registries independent
# of any change on filters / selective sync # of any change on filters / selective sync
def count_registry def count_registry
Geo::FileRegistry.lfs_objects.count Geo::LfsObjectRegistry.count
end end
def count_syncable def count_syncable
...@@ -33,47 +33,47 @@ module Geo ...@@ -33,47 +33,47 @@ module Geo
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
# You can pass a list with `except_file_ids:` so you can exclude items you # You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query # @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_file_ids: []) def find_unsynced(batch_size:, except_ids: [])
lfs_objects lfs_objects
.missing_file_registry .missing_file_registry
.id_not_in(except_file_ids) .id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_file_ids: []) def find_migrated_local(batch_size:, except_ids: [])
all_lfs_objects all_lfs_objects
.inner_join_file_registry .inner_join_lfs_object_registry
.with_files_stored_remotely .with_files_stored_remotely
.id_not_in(except_file_ids) .id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_file_ids: []) def find_retryable_failed_registries(batch_size:, except_ids: [])
registries_for_lfs_objects registries_for_lfs_objects
.merge(Geo::FileRegistry.failed) .merge(Geo::LfsObjectRegistry.failed)
.merge(Geo::FileRegistry.retry_due) .merge(Geo::LfsObjectRegistry.retry_due)
.file_id_not_in(except_file_ids) .lfs_object_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
registries_for_lfs_objects registries_for_lfs_objects
.synced .synced
.missing_on_primary .missing_on_primary
.retry_due .retry_due
.file_id_not_in(except_file_ids) .lfs_object_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
......
...@@ -39,7 +39,7 @@ module Geo ...@@ -39,7 +39,7 @@ module Geo
end end
def lfs_object_registries def lfs_object_registries
return Geo::FileRegistry.lfs_objects unless selective_sync? return Geo::LfsObjectRegistry.all unless selective_sync?
Gitlab::Geo::Fdw::LfsObjectRegistryQueryBuilder.new Gitlab::Geo::Fdw::LfsObjectRegistryQueryBuilder.new
.for_lfs_objects(lfs_objects) .for_lfs_objects(lfs_objects)
......
...@@ -17,49 +17,45 @@ module Geo ...@@ -17,49 +17,45 @@ module Geo
class << self class << self
def failed def failed
inner_join_file_registry inner_join_lfs_object_registry
.merge(Geo::FileRegistry.failed) .merge(Geo::LfsObjectRegistry.failed)
end end
def inner_join_file_registry def inner_join_lfs_object_registry
join_statement = join_statement =
arel_table arel_table
.join(file_registry_table, Arel::Nodes::InnerJoin) .join(lfs_object_registry_table, Arel::Nodes::InnerJoin)
.on(arel_table[:id].eq(file_registry_table[:file_id])) .on(arel_table[:id].eq(lfs_object_registry_table[:lfs_object_id]))
joins(join_statement.join_sources) joins(join_statement.join_sources)
.merge(Geo::FileRegistry.lfs_objects)
end end
def missing_file_registry def missing_file_registry
left_outer_join_file_registry left_outer_join_lfs_object_registry
.where(file_registry_table[:id].eq(nil)) .where(lfs_object_registry_table[:id].eq(nil))
end end
def missing_on_primary def missing_on_primary
inner_join_file_registry inner_join_lfs_object_registry
.merge(Geo::FileRegistry.synced.missing_on_primary) .merge(Geo::LfsObjectRegistry.synced.missing_on_primary)
end end
def synced def synced
inner_join_file_registry inner_join_lfs_object_registry
.merge(Geo::FileRegistry.synced) .merge(Geo::LfsObjectRegistry.synced)
end end
private private
def file_registry_table def lfs_object_registry_table
Geo::FileRegistry.arel_table Geo::LfsObjectRegistry.arel_table
end end
def left_outer_join_file_registry def left_outer_join_lfs_object_registry
join_statement = join_statement =
arel_table arel_table
.join(file_registry_table, Arel::Nodes::OuterJoin) .join(lfs_object_registry_table, Arel::Nodes::OuterJoin)
.on( .on(arel_table[:id].eq(lfs_object_registry_table[:lfs_object_id]))
arel_table[:id].eq(file_registry_table[:file_id])
.and(file_registry_table[:file_type].eq(:lfs))
)
joins(join_statement.join_sources) joins(join_statement.join_sources)
end end
......
...@@ -6,16 +6,13 @@ class Geo::FileRegistry < Geo::BaseRegistry ...@@ -6,16 +6,13 @@ class Geo::FileRegistry < Geo::BaseRegistry
scope :attachments, -> { where(file_type: Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES) } scope :attachments, -> { where(file_type: Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES) }
scope :failed, -> { where(success: false).where.not(retry_count: nil) } scope :failed, -> { where(success: false).where.not(retry_count: nil) }
scope :fresh, -> { order(created_at: :desc) } scope :fresh, -> { order(created_at: :desc) }
scope :lfs_objects, -> { where(file_type: :lfs) }
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
scope :uploads, -> { where(file_type: Gitlab::Geo::Replication::UPLOAD_OBJECT_TYPE) } scope :uploads, -> { where(file_type: Gitlab::Geo::Replication::UPLOAD_OBJECT_TYPE) }
self.inheritance_column = 'file_type' self.inheritance_column = 'file_type'
def self.find_sti_class(file_type) def self.find_sti_class(file_type)
if file_type == 'lfs' if Gitlab::Geo::Replication.object_type_from_user_uploads?(file_type)
Geo::LfsObjectRegistry
elsif Gitlab::Geo::Replication.object_type_from_user_uploads?(file_type)
Geo::UploadRegistry Geo::UploadRegistry
end end
end end
......
# frozen_string_literal: true # frozen_string_literal: true
class Geo::LfsObjectRegistry < Geo::FileRegistry class Geo::LfsObjectRegistry < Geo::BaseRegistry
belongs_to :lfs_object, foreign_key: :file_id, class_name: 'LfsObject' include ::ShaAttribute
include ::Geo::Syncable
def self.sti_name sha_attribute :sha256
'lfs'
belongs_to :lfs_object, class_name: 'LfsObject'
def self.lfs_object_id_in(ids)
where(lfs_object_id: ids)
end
def self.lfs_object_id_not_in(ids)
where.not(lfs_object_id: ids)
end end
end end
...@@ -6,6 +6,8 @@ module Geo ...@@ -6,6 +6,8 @@ module Geo
# * Executing the Downloader # * Executing the Downloader
# * Marking the FileRegistry record as synced or needing retry # * Marking the FileRegistry record as synced or needing retry
class FileDownloadService < BaseFileService class FileDownloadService < BaseFileService
include Gitlab::Utils::StrongMemoize
LEASE_TIMEOUT = 8.hours.freeze LEASE_TIMEOUT = 8.hours.freeze
include Delay include Delay
...@@ -54,17 +56,20 @@ module Geo ...@@ -54,17 +56,20 @@ module Geo
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def update_registry(bytes_downloaded, mark_as_synced:, missing_on_primary: false) def registry
registry = strong_memoize(:registry) do
if job_artifact? if job_artifact?
Geo::JobArtifactRegistry.find_or_initialize_by(artifact_id: object_db_id) Geo::JobArtifactRegistry.find_or_initialize_by(artifact_id: object_db_id)
elsif lfs?
Geo::LfsObjectRegistry.find_or_initialize_by(lfs_object_id: object_db_id)
else else
Geo::FileRegistry.find_or_initialize_by( Geo::FileRegistry.find_or_initialize_by(file_type: object_type, file_id: object_db_id)
file_type: object_type, end
file_id: object_db_id end
)
end end
# rubocop: enable CodeReuse/ActiveRecord
def update_registry(bytes_downloaded, mark_as_synced:, missing_on_primary: false)
registry.bytes = bytes_downloaded registry.bytes = bytes_downloaded
registry.success = mark_as_synced registry.success = mark_as_synced
registry.missing_on_primary = missing_on_primary registry.missing_on_primary = missing_on_primary
...@@ -82,7 +87,6 @@ module Geo ...@@ -82,7 +87,6 @@ module Geo
registry.save registry.save
end end
# rubocop: enable CodeReuse/ActiveRecord
def lease_key def lease_key
"file_download_service:#{object_type}:#{object_db_id}" "file_download_service:#{object_type}:#{object_db_id}"
......
...@@ -48,6 +48,8 @@ module Geo ...@@ -48,6 +48,8 @@ module Geo
strong_memoize(:file_registry) do strong_memoize(:file_registry) do
if job_artifact? if job_artifact?
::Geo::JobArtifactRegistry.find_by(artifact_id: object_db_id) ::Geo::JobArtifactRegistry.find_by(artifact_id: object_db_id)
elsif lfs?
::Geo::LfsObjectRegistry.find_by(lfs_object_id: object_db_id)
else else
::Geo::FileRegistry.find_by(file_type: object_type, file_id: object_db_id) ::Geo::FileRegistry.find_by(file_type: object_type, file_id: object_db_id)
end end
......
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
module Geo module Geo
class FileDownloadDispatchWorker class FileDownloadDispatchWorker
class LfsObjectJobFinder < JobFinder class LfsObjectJobFinder < JobFinder
RESOURCE_ID_KEY = :file_id RESOURCE_ID_KEY = :lfs_object_id
EXCEPT_RESOURCE_IDS_KEY = :except_file_ids EXCEPT_RESOURCE_IDS_KEY = :except_ids
FILE_SERVICE_OBJECT_TYPE = :lfs FILE_SERVICE_OBJECT_TYPE = :lfs
def registry_finder def registry_finder
......
...@@ -48,7 +48,7 @@ module Geo ...@@ -48,7 +48,7 @@ module Geo
def find_migrated_local_lfs_objects_ids(batch_size:) def find_migrated_local_lfs_objects_ids(batch_size:)
return [] unless lfs_objects_object_store_enabled? return [] unless lfs_objects_object_store_enabled?
lfs_objects_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs)) lfs_objects_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(:lfs))
.pluck(:id) .pluck(:id)
.map { |id| ['lfs', id] } .map { |id| ['lfs', id] }
end end
......
# frozen_string_literal: true
class CreateLfsObjectRegistry < ActiveRecord::Migration[5.2]
DOWNTIME = false
def change
create_table :lfs_object_registry, force: :cascade do |t|
t.datetime_with_timezone :created_at
t.datetime_with_timezone :retry_at
t.integer :bytes, limit: 8
t.integer :lfs_object_id
t.integer :retry_count
t.boolean :missing_on_primary, default: false, null: false
t.boolean :success, default: false, null: false
t.binary :sha256
t.index :lfs_object_id, unique: true
t.index :retry_at
t.index :success
end
end
end
# frozen_string_literal: true
class MigrateLfsObjectsToSeparateRegistry < ActiveRecord::Migration[5.2]
DOWNTIME = false
def up
Geo::TrackingBase.transaction do
execute('LOCK TABLE file_registry IN EXCLUSIVE MODE')
execute <<~EOF
INSERT INTO lfs_object_registry (created_at, retry_at, lfs_object_id, bytes, retry_count, missing_on_primary, success, sha256)
SELECT created_at, retry_at, file_id, bytes, retry_count, missing_on_primary, success, sha256::bytea
FROM file_registry WHERE file_type = 'lfs'
EOF
execute <<~EOF
CREATE OR REPLACE FUNCTION replicate_lfs_object_registry()
RETURNS trigger AS
$BODY$
BEGIN
IF (TG_OP = 'UPDATE') THEN
UPDATE lfs_object_registry
SET (retry_at, bytes, retry_count, missing_on_primary, success, sha256) =
(NEW.retry_at, NEW.bytes, NEW.retry_count, NEW.missing_on_primary, NEW.success, NEW.sha256::bytea)
WHERE lfs_object_id = NEW.file_id;
ELSEIF (TG_OP = 'INSERT') THEN
INSERT INTO lfs_object_registry (created_at, retry_at, lfs_object_id, bytes, retry_count, missing_on_primary, success, sha256)
VALUES (NEW.created_at, NEW.retry_at, NEW.file_id, NEW.bytes, NEW.retry_count, NEW.missing_on_primary, NEW.success, NEW.sha256::bytea);
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE 'plpgsql'
VOLATILE;
EOF
execute <<~EOF
CREATE TRIGGER replicate_lfs_object_registry
AFTER INSERT OR UPDATE ON file_registry
FOR EACH ROW WHEN (NEW.file_type = 'lfs') EXECUTE PROCEDURE replicate_lfs_object_registry();
EOF
end
end
def down
execute("DELETE FROM lfs_object_registry WHERE ID IN (SELECT file_id FROM file_registry WHERE file_type = 'lfs')")
execute("DROP TRIGGER IF EXISTS replicate_lfs_object_registry ON file_registry")
execute("DROP FUNCTION IF EXISTS replicate_lfs_object_registry()")
end
end
# frozen_string_literal: true
class DeleteLfsObjectsFromFileRegistry < ActiveRecord::Migration[5.2]
DOWNTIME = false
def up
execute("DELETE FROM file_registry WHERE file_type = 'lfs'")
execute('DROP TRIGGER IF EXISTS replicate_lfs_object_registry ON file_registry')
execute('DROP FUNCTION IF EXISTS replicate_lfs_object_registry()')
end
def down
# no-op
end
end
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2019_10_07_122326) do ActiveRecord::Schema.define(version: 2019_10_25_194337) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
...@@ -77,6 +77,20 @@ ActiveRecord::Schema.define(version: 2019_10_07_122326) do ...@@ -77,6 +77,20 @@ ActiveRecord::Schema.define(version: 2019_10_07_122326) do
t.index ["success"], name: "index_job_artifact_registry_on_success" t.index ["success"], name: "index_job_artifact_registry_on_success"
end end
create_table "lfs_object_registry", force: :cascade do |t|
t.datetime_with_timezone "created_at"
t.datetime_with_timezone "retry_at"
t.bigint "bytes"
t.integer "lfs_object_id"
t.integer "retry_count"
t.boolean "missing_on_primary", default: false, null: false
t.boolean "success", default: false, null: false
t.binary "sha256"
t.index ["lfs_object_id"], name: "index_lfs_object_registry_on_lfs_object_id", unique: true
t.index ["retry_at"], name: "index_lfs_object_registry_on_retry_at"
t.index ["success"], name: "index_lfs_object_registry_on_success"
end
create_table "project_registry", id: :serial, force: :cascade do |t| create_table "project_registry", id: :serial, force: :cascade do |t|
t.integer "project_id", null: false t.integer "project_id", null: false
t.datetime "last_repository_synced_at" t.datetime "last_repository_synced_at"
......
...@@ -14,23 +14,22 @@ module Gitlab ...@@ -14,23 +14,22 @@ module Gitlab
class Fdw class Fdw
class LfsObjectRegistryQueryBuilder < BaseQueryBuilder class LfsObjectRegistryQueryBuilder < BaseQueryBuilder
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def for_lfs_objects(file_ids) def for_lfs_objects(lfs_object_ids)
query query
.joins(fdw_inner_join_lfs_objects) .joins(fdw_inner_join_lfs_objects)
.file_id_in(file_ids) .lfs_object_id_in(lfs_object_ids)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
private private
def base def base
::Geo::FileRegistry ::Geo::LfsObjectRegistry
.select(file_registry_table[Arel.star]) .select(lfs_registry_table[Arel.star])
.lfs_objects
end end
def file_registry_table def lfs_registry_table
::Geo::FileRegistry.arel_table ::Geo::LfsObjectRegistry.arel_table
end end
def fdw_lfs_object_table def fdw_lfs_object_table
...@@ -38,9 +37,9 @@ module Gitlab ...@@ -38,9 +37,9 @@ module Gitlab
end end
def fdw_inner_join_lfs_objects def fdw_inner_join_lfs_objects
file_registry_table lfs_registry_table
.join(fdw_lfs_object_table, Arel::Nodes::InnerJoin) .join(fdw_lfs_object_table, Arel::Nodes::InnerJoin)
.on(file_registry_table[:file_id].eq(fdw_lfs_object_table[:id])) .on(lfs_registry_table[:lfs_object_id].eq(fdw_lfs_object_table[:id]))
.join_sources .join_sources
end end
end end
......
...@@ -9,7 +9,6 @@ FactoryBot.define do ...@@ -9,7 +9,6 @@ FactoryBot.define do
trait(:attachment) { file_type { :attachment } } trait(:attachment) { file_type { :attachment } }
trait(:avatar) { file_type { :avatar } } trait(:avatar) { file_type { :avatar } }
trait(:file) { file_type { :file } } trait(:file) { file_type { :file } }
trait(:lfs) { file_type { :lfs } }
trait(:namespace_file) { file_type { :namespace_file } } trait(:namespace_file) { file_type { :namespace_file } }
trait(:personal_file) { file_type { :personal_file } } trait(:personal_file) { file_type { :personal_file } }
trait(:favicon) { file_type { :favicon } } trait(:favicon) { file_type { :favicon } }
...@@ -26,7 +25,7 @@ FactoryBot.define do ...@@ -26,7 +25,7 @@ FactoryBot.define do
after(:build, :stub) do |registry, _| after(:build, :stub) do |registry, _|
file = file =
if registry.file_type.to_sym == :lfs if registry.file_type.to_sym == :lfs
create(:lfs_object) raise NotImplementedError, 'Use create(:geo_lfs_object_registry, :with_lfs_object) instead'
elsif registry.file_type.to_sym == :job_artifact elsif registry.file_type.to_sym == :job_artifact
raise NotImplementedError, 'Use create(:geo_job_artifact_registry, :with_artifact) instead' raise NotImplementedError, 'Use create(:geo_job_artifact_registry, :with_artifact) instead'
else else
......
# frozen_string_literal: true
FactoryBot.define do
factory :geo_lfs_object_registry, class: Geo::LfsObjectRegistry do
sequence(:lfs_object_id)
success { true }
trait :failed do
success { false }
retry_count { 1 }
end
trait :with_lfs_object do
after(:build, :stub) do |registry, _|
lfs_object = create(:lfs_object)
registry.lfs_object_id = lfs_object.id
end
end
end
end
...@@ -17,13 +17,13 @@ describe Gitlab::Geo::LogCursor::Events::LfsObjectDeletedEvent, :clean_gitlab_re ...@@ -17,13 +17,13 @@ describe Gitlab::Geo::LogCursor::Events::LfsObjectDeletedEvent, :clean_gitlab_re
describe '#process' do describe '#process' do
it 'does not create a tracking database entry' do it 'does not create a tracking database entry' do
expect { subject.process }.not_to change(Geo::FileRegistry, :count) expect { subject.process }.not_to change(Geo::LfsObjectRegistry, :count)
end end
it 'removes the tracking database entry if exist' do it 'removes the tracking database entry if exist' do
create(:geo_file_registry, :lfs, file_id: lfs_object.id) create(:geo_lfs_object_registry, lfs_object_id: lfs_object.id)
expect { subject.process }.to change(Geo::FileRegistry.lfs_objects, :count).by(-1) expect { subject.process }.to change(Geo::LfsObjectRegistry, :count).by(-1)
end end
it 'schedules a Geo::FileRegistryRemovalWorker job' do it 'schedules a Geo::FileRegistryRemovalWorker job' do
......
# frozen_string_literal: true
require 'spec_helper'
require Rails.root.join('ee', 'db', 'geo', 'post_migrate', '20191010204941_migrate_lfs_objects_to_separate_registry.rb')
describe MigrateLfsObjectsToSeparateRegistry, :geo, :migration do
let(:file_registry) { table(:file_registry) }
let(:lfs_object_registry) { table(:lfs_object_registry) }
before do
file_registry.create!(file_id: 1, file_type: 'lfs', success: true, bytes: 1024, sha256: '0' * 64)
file_registry.create!(file_id: 2, file_type: 'lfs', success: false, bytes: 2048, sha256: '1' * 64)
file_registry.create!(file_id: 3, file_type: 'attachment', success: true)
file_registry.create!(file_id: 4, file_type: 'lfs', success: false, bytes: 4096, sha256: '2' * 64)
end
describe '#up' do
it 'migrates all file registries for LFS objects to its own data table' do
expect(file_registry.all.count).to eq(4)
migrate!
expect(file_registry.all.count).to eq(4)
expect(lfs_object_registry.all.count).to eq(3)
expect(lfs_object_registry.where(lfs_object_id: 1, success: true, bytes: 1024, sha256: '0' * 64).count).to eq(1)
expect(lfs_object_registry.where(lfs_object_id: 2, success: false, bytes: 2048, sha256: '1' * 64).count).to eq(1)
expect(lfs_object_registry.where(lfs_object_id: 4, success: false, bytes: 4096, sha256: '2' * 64).count).to eq(1)
expect(file_registry.where(file_id: 3, file_type: 'attachment', success: true).count).to eq(1)
end
it 'creates a new lfs object registry with the trigger' do
migrate!
expect(lfs_object_registry.all.count).to eq(3)
file_registry.create!(file_id: 5, file_type: 'lfs', success: true, bytes: 8192, sha256: '3' * 64)
expect(lfs_object_registry.all.count).to eq(4)
expect(lfs_object_registry.where(lfs_object_id: 5, success: true, bytes: 8192, sha256: '3' * 64).count).to eq(1)
end
it 'updates a new lfs object with the trigger' do
migrate!
expect(lfs_object_registry.all.count).to eq(3)
entry = file_registry.find_by(file_id: 1)
entry.update(success: false, bytes: 10240, sha256: '10' * 64)
expect(lfs_object_registry.where(lfs_object_id: 1, success: false, bytes: 10240, sha256: '10' * 64).count).to eq(1)
# Ensure that *only* the correct lfs object is updated
expect(lfs_object_registry.find_by(lfs_object_id: 2).bytes).to eq(2048)
end
it 'creates a new lfs object using the next ID' do
migrate!
max_id = lfs_object_registry.maximum(:id)
last_id = lfs_object_registry.create!(lfs_object_id: 5, success: true).id
expect(last_id - max_id).to eq(1)
end
end
describe '#down' do
it 'rolls back data properly' do
migrate!
expect(file_registry.all.count).to eq(4)
expect(lfs_object_registry.all.count).to eq(3)
schema_migrate_down!
expect(file_registry.all.count).to eq(4)
expect(file_registry.where(file_type: 'attachment').count).to eq(1)
expect(file_registry.where(file_type: 'lfs').count).to eq(3)
expect(file_registry.where(file_type: 'lfs', bytes: 1024, sha256: '0' * 64).count).to eq(1)
expect(file_registry.where(file_type: 'lfs', bytes: 2048, sha256: '1' * 64).count).to eq(1)
expect(file_registry.where(file_type: 'lfs', bytes: 4096, sha256: '2' * 64).count).to eq(1)
end
end
end
...@@ -3,14 +3,7 @@ ...@@ -3,14 +3,7 @@
require 'spec_helper' require 'spec_helper'
describe Geo::LfsObjectRegistry, :geo do describe Geo::LfsObjectRegistry, :geo do
set(:lfs_registry) { create(:geo_file_registry, :lfs, :with_file) } describe 'relationships' do
set(:attachment_registry) { create(:geo_file_registry, :attachment) } it { is_expected.to belong_to(:lfs_object).class_name('LfsObject') }
it 'only finds lfs registries' do
expect(described_class.all).to match_ids(lfs_registry)
end
it 'finds associated LfsObject record' do
expect(described_class.find(lfs_registry.id).lfs_object).to be_an_instance_of(LfsObject)
end end
end end
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
require 'spec_helper' require 'spec_helper'
describe Geo::UploadRegistry, :geo, :geo_fdw do describe Geo::UploadRegistry, :geo, :geo_fdw do
let!(:lfs_registry) { create(:geo_file_registry, :lfs) } let!(:lfs_registry) { create(:geo_lfs_object_registry) }
let!(:attachment_registry) { create(:geo_file_registry, :attachment, :with_file) } let!(:attachment_registry) { create(:geo_file_registry, :attachment, :with_file) }
let!(:avatar_registry) { create(:geo_file_registry, :avatar) } let!(:avatar_registry) { create(:geo_file_registry, :avatar) }
let!(:file_registry) { create(:geo_file_registry, :file) } let!(:file_registry) { create(:geo_file_registry, :file) }
......
...@@ -189,7 +189,7 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -189,7 +189,7 @@ describe GeoNodeStatus, :geo, :geo_fdw do
describe '#attachments_failed_count' do describe '#attachments_failed_count' do
it 'counts failed avatars, attachment, personal snippets and files' do it 'counts failed avatars, attachment, personal snippets and files' do
# These two should be ignored # These two should be ignored
create(:geo_file_registry, :lfs, :with_file, :failed) create(:geo_lfs_object_registry, :with_lfs_object, :failed)
create(:geo_file_registry, :with_file) create(:geo_file_registry, :with_file)
create(:geo_file_registry, :with_file, :failed, file_type: :personal_file) create(:geo_file_registry, :with_file, :failed, file_type: :personal_file)
...@@ -253,9 +253,9 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -253,9 +253,9 @@ describe GeoNodeStatus, :geo, :geo_fdw do
create(:geo_file_registry, :failed) create(:geo_file_registry, :failed)
create(:geo_file_registry, :avatar) create(:geo_file_registry, :avatar)
create(:geo_file_registry, file_type: :attachment) create(:geo_file_registry, file_type: :attachment)
create(:geo_file_registry, :lfs, :with_file, :failed) create(:geo_lfs_object_registry, :with_lfs_object, :failed)
create(:geo_file_registry, :lfs, :with_file) create(:geo_lfs_object_registry, :with_lfs_object)
expect(subject.lfs_objects_synced_count).to eq(1) expect(subject.lfs_objects_synced_count).to eq(1)
end end
...@@ -267,9 +267,9 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -267,9 +267,9 @@ describe GeoNodeStatus, :geo, :geo_fdw do
create(:geo_file_registry, :failed) create(:geo_file_registry, :failed)
create(:geo_file_registry, :avatar, missing_on_primary: true) create(:geo_file_registry, :avatar, missing_on_primary: true)
create(:geo_file_registry, file_type: :attachment, missing_on_primary: true) create(:geo_file_registry, file_type: :attachment, missing_on_primary: true)
create(:geo_file_registry, :lfs, :with_file, :failed) create(:geo_lfs_object_registry, :with_lfs_object, :failed)
create(:geo_file_registry, :lfs, :with_file, missing_on_primary: true) create(:geo_lfs_object_registry, :with_lfs_object, missing_on_primary: true)
expect(subject.lfs_objects_synced_missing_on_primary_count).to eq(1) expect(subject.lfs_objects_synced_missing_on_primary_count).to eq(1)
end end
...@@ -281,9 +281,9 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -281,9 +281,9 @@ describe GeoNodeStatus, :geo, :geo_fdw do
create(:geo_file_registry, :failed) create(:geo_file_registry, :failed)
create(:geo_file_registry, :avatar, :failed) create(:geo_file_registry, :avatar, :failed)
create(:geo_file_registry, :failed, file_type: :attachment) create(:geo_file_registry, :failed, file_type: :attachment)
create(:geo_file_registry, :lfs, :with_file) create(:geo_lfs_object_registry, :with_lfs_object)
create(:geo_file_registry, :lfs, :with_file, :failed) create(:geo_lfs_object_registry, :with_lfs_object, :failed)
expect(subject.lfs_objects_failed_count).to eq(1) expect(subject.lfs_objects_failed_count).to eq(1)
end end
...@@ -304,14 +304,14 @@ describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -304,14 +304,14 @@ describe GeoNodeStatus, :geo, :geo_fdw do
end end
it 'returns the right percentage with no group restrictions' do it 'returns the right percentage with no group restrictions' do
create(:geo_file_registry, :lfs, file_id: lfs_object_project.lfs_object_id) create(:geo_lfs_object_registry, lfs_object_id: lfs_object_project.lfs_object_id)
expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(25) expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(25)
end end
it 'returns the right percentage with group restrictions' do it 'returns the right percentage with group restrictions' do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [group]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group])
create(:geo_file_registry, :lfs, file_id: lfs_object_project.lfs_object_id) create(:geo_lfs_object_registry, lfs_object_id: lfs_object_project.lfs_object_id)
expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(50) expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(50)
end end
......
...@@ -98,7 +98,17 @@ describe Geo::FileDownloadService do ...@@ -98,7 +98,17 @@ describe Geo::FileDownloadService do
shared_examples_for 'a service that downloads the file and registers the sync result' do |file_type| shared_examples_for 'a service that downloads the file and registers the sync result' do |file_type|
let(:download_service) { described_class.new(file_type, file.id) } let(:download_service) { described_class.new(file_type, file.id) }
let(:registry) { file_type == 'job_artifact' ? Geo::JobArtifactRegistry : Geo::FileRegistry }
let(:registry) do
case file_type
when 'job_artifact'
Geo::JobArtifactRegistry
when 'lfs'
Geo::LfsObjectRegistry
else
Geo::FileRegistry
end
end
subject(:execute!) { download_service.execute } subject(:execute!) { download_service.execute }
...@@ -223,8 +233,11 @@ describe Geo::FileDownloadService do ...@@ -223,8 +233,11 @@ describe Geo::FileDownloadService do
context 'for a registered file that failed to sync' do context 'for a registered file that failed to sync' do
let!(:registry_entry) do let!(:registry_entry) do
if file_type == 'job_artifact' case file_type
when 'job_artifact'
create(:geo_job_artifact_registry, success: false, artifact_id: file.id, retry_count: 3, retry_at: 1.hour.ago) create(:geo_job_artifact_registry, success: false, artifact_id: file.id, retry_count: 3, retry_at: 1.hour.ago)
when 'lfs'
create(:geo_lfs_object_registry, success: false, lfs_object_id: file.id, retry_count: 3, retry_at: 1.hour.ago)
else else
create(:geo_file_registry, file_type.to_sym, success: false, file_id: file.id, retry_count: 3, retry_at: 1.hour.ago) create(:geo_file_registry, file_type.to_sym, success: false, file_id: file.id, retry_count: 3, retry_at: 1.hour.ago)
end end
......
...@@ -63,12 +63,33 @@ describe Geo::FileRegistryRemovalService do ...@@ -63,12 +63,33 @@ describe Geo::FileRegistryRemovalService do
end end
end end
shared_examples 'removes LFS object' do
subject(:service) { described_class.new('lfs', registry.lfs_object_id) }
before do
stub_exclusive_lease("file_registry_removal_service:lfs:#{registry.lfs_object_id}",
timeout: Geo::FileRegistryRemovalService::LEASE_TIMEOUT)
end
it 'file from disk' do
expect do
service.execute
end.to change { File.exist?(file_path) }.from(true).to(false)
end
it 'registry when file was deleted successfully' do
expect do
service.execute
end.to change(Geo::LfsObjectRegistry, :count).by(-1)
end
end
context 'with LFS object' do context 'with LFS object' do
let!(:lfs_object) { create(:lfs_object, :with_file) } let!(:lfs_object) { create(:lfs_object, :with_file) }
let!(:file_registry) { create(:geo_file_registry, :lfs, file_id: lfs_object.id) } let!(:registry) { create(:geo_lfs_object_registry, lfs_object_id: lfs_object.id) }
let!(:file_path) { lfs_object.file.path } let!(:file_path) { lfs_object.file.path }
it_behaves_like 'removes' it_behaves_like 'removes LFS object'
context 'migrated to object storage' do context 'migrated to object storage' do
before do before do
...@@ -76,7 +97,7 @@ describe Geo::FileRegistryRemovalService do ...@@ -76,7 +97,7 @@ describe Geo::FileRegistryRemovalService do
lfs_object.update_column(:file_store, LfsObjectUploader::Store::REMOTE) lfs_object.update_column(:file_store, LfsObjectUploader::Store::REMOTE)
end end
it_behaves_like 'removes' it_behaves_like 'removes LFS object'
end end
context 'no lfs_object record' do context 'no lfs_object record' do
...@@ -84,8 +105,8 @@ describe Geo::FileRegistryRemovalService do ...@@ -84,8 +105,8 @@ describe Geo::FileRegistryRemovalService do
lfs_object.delete lfs_object.delete
end end
it_behaves_like 'removes' do it_behaves_like 'removes LFS object' do
subject(:service) { described_class.new(file_registry.file_type, file_registry.file_id, file_path) } subject(:service) { described_class.new('lfs', registry.lfs_object_id, file_path) }
end end
end end
end end
......
...@@ -22,7 +22,7 @@ module EE ...@@ -22,7 +22,7 @@ module EE
override :migrations_paths override :migrations_paths
def migrations_paths def migrations_paths
if geo_migration? if geo_migration?
::Gitlab::Geo::DatabaseTasks.geo_migrate_path ::Gitlab::Geo::DatabaseTasks.geo_migrations_paths
else else
super super
end end
......
...@@ -29,8 +29,8 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo, :geo_fdw do ...@@ -29,8 +29,8 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo, :geo_fdw do
before do before do
stub_lfs_object_storage stub_lfs_object_storage
create(:geo_file_registry, :lfs, file_id: lfs_object_local.id) create(:geo_lfs_object_registry, lfs_object_id: lfs_object_local.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_remote.id) create(:geo_lfs_object_registry, lfs_object_id: lfs_object_remote.id)
end end
it 'schedules job for file stored remotely and synced locally' do it 'schedules job for file stored remotely and synced locally' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment