Commit e46cd8b5 authored by Nick Thomas

Merge branch 'tc-geo-migration-object-storage' into 'master'

Geo ensure files moved to object storage are cleaned up

Closes #4215

See merge request gitlab-org/gitlab-ee!4689
parents a36bcd36 65709d49
...@@ -11,6 +11,7 @@ module Ci ...@@ -11,6 +11,7 @@ module Ci
before_save :set_size, if: :file_changed? before_save :set_size, if: :file_changed?
scope :with_files_stored_locally, -> { where(file_store: [nil, ::JobArtifactUploader::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(file_store: [nil, ::JobArtifactUploader::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) }
mount_uploader :file, JobArtifactUploader mount_uploader :file, JobArtifactUploader
......
...@@ -7,6 +7,7 @@ class LfsObject < ActiveRecord::Base ...@@ -7,6 +7,7 @@ class LfsObject < ActiveRecord::Base
has_many :projects, through: :lfs_objects_projects has_many :projects, through: :lfs_objects_projects
scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(file_store: [nil, LfsObjectUploader::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(file_store: LfsObjectUploader::Store::REMOTE) }
validates :oid, presence: true, uniqueness: true validates :oid, presence: true, uniqueness: true
......
...@@ -12,6 +12,7 @@ class Upload < ActiveRecord::Base ...@@ -12,6 +12,7 @@ class Upload < ActiveRecord::Base
validates :uploader, presence: true validates :uploader, presence: true
scope :with_files_stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(store: ObjectStorage::Store::REMOTE) }
before_save :calculate_checksum!, if: :foreground_checksummable? before_save :calculate_checksum!, if: :foreground_checksummable?
after_commit :schedule_checksum, if: :checksummable? after_commit :schedule_checksum, if: :checksummable?
......
...@@ -122,6 +122,7 @@ ...@@ -122,6 +122,7 @@
- cronjob:geo_file_download_dispatch - cronjob:geo_file_download_dispatch
- cronjob:geo_metrics_update - cronjob:geo_metrics_update
- cronjob:geo_prune_event_log - cronjob:geo_prune_event_log
- cronjob:geo_migrated_local_files_clean_up
- cronjob:geo_repository_sync - cronjob:geo_repository_sync
- cronjob:geo_repository_verification_primary_batch - cronjob:geo_repository_verification_primary_batch
- cronjob:geo_repository_verification_secondary_scheduler - cronjob:geo_repository_verification_secondary_scheduler
...@@ -140,6 +141,7 @@ ...@@ -140,6 +141,7 @@
- geo:geo_scheduler_secondary_scheduler - geo:geo_scheduler_secondary_scheduler
- geo:geo_file_download - geo:geo_file_download
- geo:geo_file_removal - geo:geo_file_removal
- geo:geo_file_registry_removal
- geo:geo_hashed_storage_attachments_migration - geo:geo_hashed_storage_attachments_migration
- geo:geo_hashed_storage_migration - geo:geo_hashed_storage_migration
- geo:geo_project_sync - geo:geo_project_sync
......
...@@ -288,6 +288,11 @@ production: &base ...@@ -288,6 +288,11 @@ production: &base
geo_file_download_dispatch_worker: geo_file_download_dispatch_worker:
cron: "*/1 * * * *" cron: "*/1 * * * *"
# GitLab Geo migrated local files clean up worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_migrated_local_files_clean_up_worker:
cron: "15 */6 * * *"
registry: registry:
# enabled: true # enabled: true
# host: registry.example.com # host: registry.example.com
......
...@@ -482,6 +482,9 @@ Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['job_clas ...@@ -482,6 +482,9 @@ Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['job_clas
Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker']['job_class'] ||= 'Geo::RepositoryVerification::Secondary::SchedulerWorker' Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker']['job_class'] ||= 'Geo::RepositoryVerification::Secondary::SchedulerWorker'
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker']['cron'] ||= '15 */6 * * *'
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker']['job_class'] ||= 'Geo::MigratedLocalFilesCleanUpWorker'
Settings.cron_jobs['import_export_project_cleanup_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['import_export_project_cleanup_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['import_export_project_cleanup_worker']['cron'] ||= '0 * * * *' Settings.cron_jobs['import_export_project_cleanup_worker']['cron'] ||= '0 * * * *'
Settings.cron_jobs['import_export_project_cleanup_worker']['job_class'] = 'ImportExportProjectCleanupWorker' Settings.cron_jobs['import_export_project_cleanup_worker']['job_class'] = 'ImportExportProjectCleanupWorker'
......
...@@ -32,6 +32,10 @@ module Geo ...@@ -32,6 +32,10 @@ module Geo
end end
end end
# Counts the Geo::FileRegistry rows selected by the `attachments` scope.
# No other filter is applied by this method.
def count_registry_attachments
Geo::FileRegistry.attachments.count
end
def find_synced_attachments def find_synced_attachments
if use_legacy_queries? if use_legacy_queries?
legacy_find_synced_attachments legacy_find_synced_attachments
...@@ -69,6 +73,17 @@ module Geo ...@@ -69,6 +73,17 @@ module Geo
relation.limit(batch_size) relation.limit(batch_size)
end end
# Finds a limited amount of migrated attachments that still have a
# file registry entry.
#
# The query strategy is picked by `use_legacy_queries?`: the legacy
# finder when it is true, the FDW finder otherwise. Either way the
# resulting relation is capped at `batch_size` rows.
#
# You can pass a list with `except_file_ids:` so you can exclude items you
# already scheduled.
def find_migrated_local_attachments(batch_size:, except_file_ids: [])
  if use_legacy_queries?
    return legacy_find_migrated_local_attachments(except_file_ids: except_file_ids).limit(batch_size)
  end

  fdw_find_migrated_local_attachments(except_file_ids: except_file_ids).limit(batch_size)
end
private private
def group_uploads def group_uploads
...@@ -143,6 +158,13 @@ module Geo ...@@ -143,6 +158,13 @@ module Geo
Geo::Fdw::Upload.table_name Geo::Fdw::Upload.table_name
end end
# FDW variant: attachments stored remotely that are inner-joined to
# file_registry rows of an attachment file type, minus the ids listed in
# `except_file_ids`.
def fdw_find_migrated_local_attachments(except_file_ids:)
  remote_with_registry =
    fdw_attachments
      .joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_attachments_table}.id")
      .with_files_stored_remotely

  # Restrict the joined registry rows to attachment types, then drop the
  # ids the caller asked to skip.
  remote_with_registry
    .merge(Geo::FileRegistry.attachments)
    .where.not(id: except_file_ids)
end
# #
# Legacy accessors (non FDW) # Legacy accessors (non FDW)
# #
...@@ -172,5 +194,15 @@ module Geo ...@@ -172,5 +194,15 @@ module Geo
Upload Upload
) )
end end
# Legacy (non FDW) variant. Loads the file ids of every attachment
# registry row, removes `except_file_ids` from that list, and hands the
# remainder to `legacy_inner_join_registry_ids` together with the
# remotely-stored attachments relation.
def legacy_find_migrated_local_attachments(except_file_ids:)
  candidate_ids = Geo::FileRegistry.attachments.pluck(:file_id) - except_file_ids
  remote_attachments = attachments.with_files_stored_remotely

  legacy_inner_join_registry_ids(remote_attachments, candidate_ids, Upload)
end
end end
end end
module Geo module Geo
class JobArtifactRegistryFinder < FileRegistryFinder class JobArtifactRegistryFinder < FileRegistryFinder
def count_job_artifacts def count_local_job_artifacts
local_job_artifacts.count local_job_artifacts.count
end end
...@@ -20,6 +20,10 @@ module Geo ...@@ -20,6 +20,10 @@ module Geo
end end
end end
# Counts the Geo::FileRegistry rows selected by the `job_artifacts` scope.
# No other filter is applied by this method.
def count_registry_job_artifacts
Geo::FileRegistry.job_artifacts.count
end
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
# You can pass a list with `except_file_ids:` so you can exclude items you # You can pass a list with `except_file_ids:` so you can exclude items you
...@@ -41,6 +45,17 @@ module Geo ...@@ -41,6 +45,17 @@ module Geo
relation.limit(batch_size) relation.limit(batch_size)
end end
# Finds a limited amount of migrated job artifacts that still have a
# file registry entry.
#
# The query strategy is picked by `use_legacy_queries?`: the legacy
# finder when it is true, the FDW finder otherwise. Either way the
# resulting relation is capped at `batch_size` rows.
#
# You can pass a list with `except_file_ids:` so you can exclude items you
# already scheduled.
def find_migrated_local_job_artifacts(batch_size:, except_file_ids: [])
  if use_legacy_queries?
    return legacy_find_migrated_local_job_artifacts(except_file_ids: except_file_ids).limit(batch_size)
  end

  fdw_find_migrated_local_job_artifacts(except_file_ids: except_file_ids).limit(batch_size)
end
def job_artifacts def job_artifacts
if selective_sync? if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects }) Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
...@@ -90,6 +105,13 @@ module Geo ...@@ -90,6 +105,13 @@ module Geo
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
end end
# FDW variant: job artifacts stored remotely that are inner-joined to
# file_registry rows of the job artifact file type, minus the ids listed
# in `except_file_ids`.
def fdw_find_migrated_local_job_artifacts(except_file_ids:)
  remote_with_registry =
    fdw_job_artifacts
      .joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_job_artifacts_table}.id")
      .with_files_stored_remotely

  # Drop the ids the caller asked to skip, then restrict the joined
  # registry rows to job artifacts.
  remote_with_registry
    .where.not(id: except_file_ids)
    .merge(Geo::FileRegistry.job_artifacts)
end
def fdw_job_artifacts def fdw_job_artifacts
if selective_sync? if selective_sync?
Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects }) Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
...@@ -131,5 +153,15 @@ module Geo ...@@ -131,5 +153,15 @@ module Geo
Ci::JobArtifact Ci::JobArtifact
) )
end end
# Legacy (non FDW) variant. Loads the file ids of every job artifact
# registry row, removes `except_file_ids` from that list, and hands the
# remainder to `legacy_inner_join_registry_ids` together with the
# remotely-stored job artifacts relation.
def legacy_find_migrated_local_job_artifacts(except_file_ids:)
  candidate_ids = Geo::FileRegistry.job_artifacts.pluck(:file_id) - except_file_ids
  remote_job_artifacts = job_artifacts.with_files_stored_remotely

  legacy_inner_join_registry_ids(remote_job_artifacts, candidate_ids, Ci::JobArtifact)
end
end end
end end
module Geo module Geo
class LfsObjectRegistryFinder < FileRegistryFinder class LfsObjectRegistryFinder < FileRegistryFinder
def count_lfs_objects def count_local_lfs_objects
local_lfs_objects.count local_lfs_objects.count
end end
...@@ -20,6 +20,10 @@ module Geo ...@@ -20,6 +20,10 @@ module Geo
end end
end end
# Counts the Geo::FileRegistry rows selected by the `lfs_objects` scope.
# No other filter is applied by this method.
def count_registry_lfs_objects
Geo::FileRegistry.lfs_objects.count
end
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
# You can pass a list with `except_file_ids:` so you can exclude items you # You can pass a list with `except_file_ids:` so you can exclude items you
...@@ -41,6 +45,17 @@ module Geo ...@@ -41,6 +45,17 @@ module Geo
relation.limit(batch_size) relation.limit(batch_size)
end end
# Finds a limited amount of migrated LFS objects that still have a
# file registry entry.
#
# The query strategy is picked by `use_legacy_queries?`: the legacy
# finder when it is true, the FDW finder otherwise. Either way the
# resulting relation is capped at `batch_size` rows.
#
# You can pass a list with `except_file_ids:` so you can exclude items you
# already scheduled.
def find_migrated_local_lfs_objects(batch_size:, except_file_ids: [])
  if use_legacy_queries?
    return legacy_find_migrated_local_lfs_objects(except_file_ids: except_file_ids).limit(batch_size)
  end

  fdw_find_migrated_local_lfs_objects(except_file_ids: except_file_ids).limit(batch_size)
end
def lfs_objects def lfs_objects
if selective_sync? if selective_sync?
LfsObject.joins(:projects).where(projects: { id: current_node.projects }) LfsObject.joins(:projects).where(projects: { id: current_node.projects })
...@@ -90,6 +105,13 @@ module Geo ...@@ -90,6 +105,13 @@ module Geo
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
end end
# FDW variant: LFS objects stored remotely that are inner-joined to
# file_registry rows of the LFS file type, minus the ids listed in
# `except_file_ids`.
def fdw_find_migrated_local_lfs_objects(except_file_ids:)
  remote_with_registry =
    fdw_lfs_objects
      .joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_lfs_objects_table}.id")
      .with_files_stored_remotely

  # Drop the ids the caller asked to skip, then restrict the joined
  # registry rows to LFS objects.
  remote_with_registry
    .where.not(id: except_file_ids)
    .merge(Geo::FileRegistry.lfs_objects)
end
def fdw_lfs_objects def fdw_lfs_objects
if selective_sync? if selective_sync?
Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects }) Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects })
...@@ -131,5 +153,15 @@ module Geo ...@@ -131,5 +153,15 @@ module Geo
LfsObject LfsObject
) )
end end
# Legacy (non FDW) variant. Loads the file ids of every LFS registry row,
# removes `except_file_ids` from that list, and hands the remainder to
# `legacy_inner_join_registry_ids` together with the remotely-stored LFS
# objects relation.
def legacy_find_migrated_local_lfs_objects(except_file_ids:)
  candidate_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) - except_file_ids
  remote_lfs_objects = lfs_objects.with_files_stored_remotely

  legacy_inner_join_registry_ids(remote_lfs_objects, candidate_ids, LfsObject)
end
end end
end end
...@@ -5,6 +5,7 @@ module Geo ...@@ -5,6 +5,7 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.table('ci_job_artifacts') self.table_name = Gitlab::Geo::Fdw.table('ci_job_artifacts')
scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) } scope :with_files_stored_locally, -> { where(file_store: [nil, JobArtifactUploader::Store::LOCAL]) }
scope :with_files_stored_remotely, -> { where(file_store: JobArtifactUploader::Store::REMOTE) }
end end
end end
end end
......
...@@ -5,5 +5,4 @@ class Geo::FileRegistry < Geo::BaseRegistry ...@@ -5,5 +5,4 @@ class Geo::FileRegistry < Geo::BaseRegistry
scope :lfs_objects, -> { where(file_type: :lfs) } scope :lfs_objects, -> { where(file_type: :lfs) }
scope :job_artifacts, -> { where(file_type: :job_artifact) } scope :job_artifacts, -> { where(file_type: :job_artifact) }
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) } scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) }
scope :stored_locally, -> { where(store: [nil, ObjectStorage::Store::LOCAL]) }
end end
...@@ -13,6 +13,7 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -13,6 +13,7 @@ class GeoNodeStatus < ActiveRecord::Base
:repository_created_max_id, :repository_updated_max_id, :repository_created_max_id, :repository_updated_max_id,
:repository_deleted_max_id, :repository_renamed_max_id, :repositories_changed_max_id, :repository_deleted_max_id, :repository_renamed_max_id, :repositories_changed_max_id,
:lfs_object_deleted_max_id, :job_artifact_deleted_max_id, :lfs_object_deleted_max_id, :job_artifact_deleted_max_id,
:lfs_objects_registry_count, :job_artifacts_registry_count, :attachments_registry_count,
:hashed_storage_migrated_max_id, :hashed_storage_attachments_max_id :hashed_storage_migrated_max_id, :hashed_storage_attachments_max_id
# Be sure to keep this consistent with Prometheus naming conventions # Be sure to keep this consistent with Prometheus naming conventions
...@@ -31,12 +32,15 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -31,12 +32,15 @@ class GeoNodeStatus < ActiveRecord::Base
lfs_objects_count: 'Total number of local LFS objects available on primary', lfs_objects_count: 'Total number of local LFS objects available on primary',
lfs_objects_synced_count: 'Number of local LFS objects synced on secondary', lfs_objects_synced_count: 'Number of local LFS objects synced on secondary',
lfs_objects_failed_count: 'Number of local LFS objects failed to sync on secondary', lfs_objects_failed_count: 'Number of local LFS objects failed to sync on secondary',
lfs_objects_registry_count: 'Number of LFS objects in the registry',
job_artifacts_count: 'Total number of local job artifacts available on primary', job_artifacts_count: 'Total number of local job artifacts available on primary',
job_artifacts_synced_count: 'Number of local job artifacts synced on secondary', job_artifacts_synced_count: 'Number of local job artifacts synced on secondary',
job_artifacts_failed_count: 'Number of local job artifacts failed to sync on secondary', job_artifacts_failed_count: 'Number of local job artifacts failed to sync on secondary',
job_artifacts_registry_count: 'Number of job artifacts in the registry',
attachments_count: 'Total number of local file attachments available on primary', attachments_count: 'Total number of local file attachments available on primary',
attachments_synced_count: 'Number of local file attachments synced on secondary', attachments_synced_count: 'Number of local file attachments synced on secondary',
attachments_failed_count: 'Number of local file attachments failed to sync on secondary', attachments_failed_count: 'Number of local file attachments failed to sync on secondary',
attachments_registry_count: 'Number of attachments in the registry',
replication_slots_count: 'Total number of replication slots on the primary', replication_slots_count: 'Total number of replication slots on the primary',
replication_slots_used_count: 'Number of replication slots in use on the primary', replication_slots_used_count: 'Number of replication slots in use on the primary',
replication_slots_max_retained_wal_bytes: 'Maximum number of bytes retained in the WAL on the primary', replication_slots_max_retained_wal_bytes: 'Maximum number of bytes retained in the WAL on the primary',
...@@ -107,8 +111,8 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -107,8 +111,8 @@ class GeoNodeStatus < ActiveRecord::Base
self.last_event_date = latest_event&.created_at self.last_event_date = latest_event&.created_at
self.repositories_count = projects_finder.count_repositories self.repositories_count = projects_finder.count_repositories
self.wikis_count = projects_finder.count_wikis self.wikis_count = projects_finder.count_wikis
self.lfs_objects_count = lfs_objects_finder.count_lfs_objects self.lfs_objects_count = lfs_objects_finder.count_local_lfs_objects
self.job_artifacts_count = job_artifacts_finder.count_job_artifacts self.job_artifacts_count = job_artifacts_finder.count_local_job_artifacts
self.attachments_count = attachments_finder.count_local_attachments self.attachments_count = attachments_finder.count_local_attachments
self.last_successful_status_check_at = Time.now self.last_successful_status_check_at = Time.now
self.storage_shards = StorageShard.all self.storage_shards = StorageShard.all
...@@ -162,10 +166,13 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -162,10 +166,13 @@ class GeoNodeStatus < ActiveRecord::Base
self.wikis_verification_failed_count = projects_finder.count_verification_failed_wikis self.wikis_verification_failed_count = projects_finder.count_verification_failed_wikis
self.lfs_objects_synced_count = lfs_objects_finder.count_synced_lfs_objects self.lfs_objects_synced_count = lfs_objects_finder.count_synced_lfs_objects
self.lfs_objects_failed_count = lfs_objects_finder.count_failed_lfs_objects self.lfs_objects_failed_count = lfs_objects_finder.count_failed_lfs_objects
self.lfs_objects_registry_count = lfs_objects_finder.count_registry_lfs_objects
self.job_artifacts_synced_count = job_artifacts_finder.count_synced_job_artifacts self.job_artifacts_synced_count = job_artifacts_finder.count_synced_job_artifacts
self.job_artifacts_failed_count = job_artifacts_finder.count_failed_job_artifacts self.job_artifacts_failed_count = job_artifacts_finder.count_failed_job_artifacts
self.job_artifacts_registry_count = job_artifacts_finder.count_registry_job_artifacts
self.attachments_synced_count = attachments_finder.count_synced_attachments self.attachments_synced_count = attachments_finder.count_synced_attachments
self.attachments_failed_count = attachments_finder.count_failed_attachments self.attachments_failed_count = attachments_finder.count_failed_attachments
self.attachments_registry_count = attachments_finder.count_registry_attachments
end end
end end
......
...@@ -47,6 +47,6 @@ module ExclusiveLeaseGuard ...@@ -47,6 +47,6 @@ module ExclusiveLeaseGuard
end end
def log_error(message, extra_args = {}) def log_error(message, extra_args = {})
logger.error(messages) logger.error(message)
end end
end end
...@@ -3,9 +3,10 @@ module Geo ...@@ -3,9 +3,10 @@ module Geo
LEASE_TIMEOUT = 8.hours.freeze LEASE_TIMEOUT = 8.hours.freeze
include Delay include Delay
include ExclusiveLeaseGuard
def execute def execute
try_obtain_lease do |lease| try_obtain_lease do
start_time = Time.now start_time = Time.now
bytes_downloaded = downloader.execute bytes_downloaded = downloader.execute
success = (bytes_downloaded.present? && bytes_downloaded >= 0) success = (bytes_downloaded.present? && bytes_downloaded >= 0)
...@@ -27,18 +28,6 @@ module Geo ...@@ -27,18 +28,6 @@ module Geo
raise raise
end end
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain
return unless uuid.present?
begin
yield
ensure
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
end
def update_registry(bytes_downloaded, success:) def update_registry(bytes_downloaded, success:)
transfer = Geo::FileRegistry.find_or_initialize_by( transfer = Geo::FileRegistry.find_or_initialize_by(
file_type: object_type, file_type: object_type,
...@@ -60,5 +49,9 @@ module Geo ...@@ -60,5 +49,9 @@ module Geo
def lease_key def lease_key
"file_download_service:#{object_type}:#{object_db_id}" "file_download_service:#{object_type}:#{object_db_id}"
end end
# Exposes this service's LEASE_TIMEOUT constant.
# NOTE(review): presumably read by ExclusiveLeaseGuard when obtaining the
# lease — confirm against that module.
def lease_timeout
LEASE_TIMEOUT
end
end end
end end
module Geo
  # Removes the on-disk copy of a single file together with its
  # Geo::FileRegistry row. The work is wrapped in `try_obtain_lease`, keyed
  # per object type and id (see #lease_key).
  class FileRegistryRemovalService < FileService
    include ::Gitlab::Utils::StrongMemoize

    LEASE_TIMEOUT = 8.hours.freeze

    # Unlinks the local file (when it exists) and destroys the registry row.
    #
    # Does nothing beyond logging when no registry row is found for
    # (object_type, object_db_id). Any SystemCallError raised while
    # removing the file is logged and re-raised to the caller.
    def execute
      log_info('Executing')

      try_obtain_lease do
        log_info('Lease obtained')

        unless file_registry
          log_error('Could not find file_registry', type: object_type, id: object_db_id)
          return
        end

        if File.exist?(file_path)
          log_info('Unlinking file', file_path: file_path)
          File.unlink(file_path)
        end

        log_info('Removing file registry', file_registry_id: file_registry.id)
        file_registry.destroy

        log_info('Local file & registry removed')
      end
    rescue SystemCallError => e
      # The exception must be bound here: referencing an unbound `e` would
      # raise NameError inside the handler and hide the original failure.
      log_error('Could not remove file', e.message)
      raise
    end

    private

    # Registry row for this object, or nil when none exists (memoized).
    def file_registry
      strong_memoize(:file_registry) do
        ::Geo::FileRegistry.find_by(file_type: object_type, file_id: object_db_id)
      end
    end

    # Path of the file on local disk (memoized).
    def file_path
      strong_memoize(:file_path) do
        # When local storage is used, just rely on the existing methods
        next file_uploader.file.path if file_uploader.object_store == ObjectStorage::Store::LOCAL

        # For remote storage more juggling is needed to actually get the full path on disk
        if upload?
          upload = file_uploader.upload
          file_uploader.class.absolute_path(upload)
        else
          file_uploader.class.absolute_path(file_uploader.file)
        end
      end
    end

    # Uploader for the underlying record, chosen by object type (memoized).
    #
    # Raises NameError for an unrecognized type and
    # ActiveRecord::RecordNotFound when the record is missing; both are
    # logged before being re-raised.
    def file_uploader
      strong_memoize(:file_uploader) do
        case object_type.to_s
        when 'lfs'
          LfsObject.find_by!(id: object_db_id).file
        when 'job_artifact'
          Ci::JobArtifact.find_by!(id: object_db_id).file
        when *Geo::FileService::DEFAULT_OBJECT_TYPES
          Upload.find_by!(id: object_db_id).build_uploader
        else
          raise NameError, "Unrecognized type: #{object_type}"
        end
      end
    rescue NameError, ActiveRecord::RecordNotFound => err
      log_error('Could not build uploader', err.message)
      raise
    end

    # True when the object type is one of the upload (attachment) types.
    #
    # DEFAULT_OBJECT_TYPES holds Strings, so the type is normalized with
    # `to_s` exactly as #file_uploader does; a Symbol type would otherwise
    # never match.
    def upload?
      Geo::FileService::DEFAULT_OBJECT_TYPES.include?(object_type.to_s)
    end

    def lease_key
      "file_registry_removal_service:#{object_type}:#{object_db_id}"
    end

    def lease_timeout
      LEASE_TIMEOUT
    end
  end
end
module Geo module Geo
class FileService class FileService
include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers
attr_reader :object_type, :object_db_id attr_reader :object_type, :object_db_id
DEFAULT_OBJECT_TYPES = %w[attachment avatar file namespace_file personal_file].freeze DEFAULT_OBJECT_TYPES = %w[attachment avatar file namespace_file personal_file].freeze
...@@ -27,19 +30,7 @@ module Geo ...@@ -27,19 +30,7 @@ module Geo
klass_name.camelize klass_name.camelize
end end
def log_info(message, details = {}) def base_log_data(message)
data = log_base_data(message)
data.merge!(details) if details
Gitlab::Geo::Logger.info(data)
end
def log_error(message, error)
data = log_base_data(message)
data[:error] = error
Gitlab::Geo::Logger.error(data)
end
def log_base_data(message)
{ {
class: self.class.name, class: self.class.name,
object_type: object_type, object_type: object_type,
......
...@@ -8,7 +8,7 @@ module Geo ...@@ -8,7 +8,7 @@ module Geo
current_node.files_max_capacity current_node.files_max_capacity
end end
def schedule_job(object_db_id, object_type) def schedule_job(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type, object_db_id) job_id = FileDownloadWorker.perform_async(object_type, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id { id: object_db_id, type: object_type, job_id: job_id } if job_id
...@@ -55,24 +55,24 @@ module Geo ...@@ -55,24 +55,24 @@ module Geo
def find_unsynced_lfs_objects_ids(batch_size:) def find_unsynced_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs)) lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id) .pluck(:id)
.map { |id| [id, :lfs] } .map { |id| [:lfs, id] }
end end
def find_unsynced_attachments_ids(batch_size:) def find_unsynced_attachments_ids(batch_size:)
attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)) attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:id, :uploader) .pluck(:uploader, :id)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] } .map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end end
def find_unsynced_job_artifacts_ids(batch_size:) def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_file_ids: scheduled_file_ids(:job_artifact)) job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_file_ids: scheduled_file_ids(:job_artifact))
.pluck(:id) .pluck(:id)
.map { |id| [id, :job_artifact] } .map { |id| [:job_artifact, id] }
end end
def find_failed_upload_object_ids(batch_size:) def find_failed_upload_object_ids(batch_size:)
file_registry_finder.find_failed_file_registries(batch_size: batch_size) file_registry_finder.find_failed_file_registries(batch_size: batch_size)
.pluck(:file_id, :file_type) .pluck(:file_type, :file_id)
end end
def scheduled_file_ids(file_types) def scheduled_file_ids(file_types)
......
module Geo
  # Background worker that runs Geo::FileRegistryRemovalService for one
  # (object_type, object_db_id) pair.
  class FileRegistryRemovalWorker
    include ApplicationWorker
    include GeoQueue
    include ::Gitlab::Geo::LogHelpers

    # Logs the invocation, then builds the removal service for the given
    # object and executes it.
    def perform(object_type, object_db_id)
      log_info('Executing Geo::FileRegistryRemovalService', id: object_db_id, type: object_type)

      removal_service = ::Geo::FileRegistryRemovalService.new(object_type, object_db_id)
      removal_service.execute
    end
  end
end
module Geo
  # Scheduler that looks for LFS objects, attachments and job artifacts
  # which are stored remotely but still have a file registry entry, and
  # enqueues a Geo::FileRegistryRemovalWorker job for each of them.
  class MigratedLocalFilesCleanUpWorker < ::Geo::Scheduler::Secondary::SchedulerWorker
    include ::CronjobQueue

    # Value reported by #max_capacity.
    MAX_CAPACITY = 1000

    # Runs the parent scheduler only when at least one uploader type has
    # object storage enabled; otherwise returns nil without doing anything.
    def perform
      # No need to run when nothing is configured to be in Object Storage
      object_storage_in_use =
        attachments_object_store_enabled? ||
        lfs_objects_object_store_enabled? ||
        job_artifacts_object_store_enabled?

      super if object_storage_in_use
    end

    private

    def max_capacity
      MAX_CAPACITY
    end

    # Enqueues a removal job. Returns a hash describing the scheduled job,
    # or nil when no job id was returned by `perform_async`.
    def schedule_job(object_type, object_db_id)
      job_id = ::Geo::FileRegistryRemovalWorker.perform_async(object_type, object_db_id)
      return unless job_id

      { id: object_db_id, type: object_type, job_id: job_id }.tap do |scheduled|
        log_info('Scheduled Geo::FileRegistryRemovalWorker', scheduled)
      end
    end

    def load_pending_resources
      find_migrated_local_objects(batch_size: db_retrieve_batch_size)
    end

    # Collects [type, id] pairs for each file kind (LFS objects first, then
    # attachments, then job artifacts) and passes the three lists to
    # `take_batch`.
    def find_migrated_local_objects(batch_size:)
      take_batch(
        find_migrated_local_lfs_objects_ids(batch_size: batch_size),
        find_migrated_local_attachments_ids(batch_size: batch_size),
        find_migrated_local_job_artifacts_ids(batch_size: batch_size)
      )
    end

    # [:lfs, id] pairs, or [] when LFS object storage is disabled.
    def find_migrated_local_lfs_objects_ids(batch_size:)
      return [] unless lfs_objects_object_store_enabled?

      migrated = lfs_objects_finder.find_migrated_local_lfs_objects(
        batch_size: batch_size,
        except_file_ids: scheduled_file_ids(:lfs)
      )

      migrated.pluck(:id).map { |lfs_object_id| [:lfs, lfs_object_id] }
    end

    # [type, id] pairs where the type is derived from the uploader class
    # name (trailing "Uploader" removed, then underscored), or [] when
    # attachment object storage is disabled.
    def find_migrated_local_attachments_ids(batch_size:)
      return [] unless attachments_object_store_enabled?

      migrated = attachments_finder.find_migrated_local_attachments(
        batch_size: batch_size,
        except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)
      )

      migrated.pluck(:uploader, :id).map do |uploader_name, upload_id|
        [uploader_name.sub(/Uploader\z/, '').underscore, upload_id]
      end
    end

    # [:job_artifact, id] pairs, or [] when job artifact object storage is
    # disabled.
    def find_migrated_local_job_artifacts_ids(batch_size:)
      return [] unless job_artifacts_object_store_enabled?

      migrated = job_artifacts_finder.find_migrated_local_job_artifacts(
        batch_size: batch_size,
        except_file_ids: scheduled_file_ids(:job_artifact)
      )

      migrated.pluck(:id).map { |artifact_id| [:job_artifact, artifact_id] }
    end

    # Ids of the entries in `scheduled_jobs` whose :type is one of
    # `file_types` (a single type or a list of types).
    def scheduled_file_ids(file_types)
      wanted_types = Array(file_types)

      scheduled_jobs.each_with_object([]) do |job, ids|
        ids << job[:id] if wanted_types.include?(job[:type])
      end
    end

    def attachments_object_store_enabled?
      FileUploader.object_store_enabled?
    end

    def lfs_objects_object_store_enabled?
      LfsObjectUploader.object_store_enabled?
    end

    def job_artifacts_object_store_enabled?
      JobArtifactUploader.object_store_enabled?
    end

    def attachments_finder
      @attachments_finder ||= AttachmentRegistryFinder.new(current_node: current_node)
    end

    def lfs_objects_finder
      @lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
    end

    def job_artifacts_finder
      @job_artifacts_finder ||= JobArtifactRegistryFinder.new(current_node: current_node)
    end
  end
end
...@@ -3,8 +3,15 @@ module Geo ...@@ -3,8 +3,15 @@ module Geo
module Secondary module Secondary
class PerShardSchedulerWorker < Geo::Scheduler::PerShardSchedulerWorker class PerShardSchedulerWorker < Geo::Scheduler::PerShardSchedulerWorker
def perform def perform
return unless Gitlab::Geo.geo_database_configured? unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary? log_info('Geo database not configured')
return
end
unless Gitlab::Geo.secondary?
log_info('Current node not a secondary')
return
end
super super
end end
......
...@@ -3,8 +3,15 @@ module Geo ...@@ -3,8 +3,15 @@ module Geo
module Secondary module Secondary
class SchedulerWorker < Geo::Scheduler::SchedulerWorker class SchedulerWorker < Geo::Scheduler::SchedulerWorker
def perform def perform
return unless Gitlab::Geo.geo_database_configured? unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary? log_info('Geo database not configured')
return
end
unless Gitlab::Geo.secondary?
log_info('Current node not a secondary')
return
end
super super
end end
......
---
title: Geo ensure files moved to object storage are cleaned up
merge_request: 4689
author:
type: added
...@@ -12,6 +12,7 @@ module Gitlab ...@@ -12,6 +12,7 @@ module Gitlab
geo_repository_sync_worker geo_repository_sync_worker
geo_file_download_dispatch_worker geo_file_download_dispatch_worker
geo_repository_verification_secondary_scheduler_worker geo_repository_verification_secondary_scheduler_worker
geo_migrated_local_files_clean_up_worker
].freeze ].freeze
GEO_JOBS = (COMMON_JOBS + PRIMARY_JOBS + SECONDARY_JOBS).freeze GEO_JOBS = (COMMON_JOBS + PRIMARY_JOBS + SECONDARY_JOBS).freeze
......
...@@ -4,17 +4,13 @@ FactoryBot.define do ...@@ -4,17 +4,13 @@ FactoryBot.define do
file_type :file file_type :file
success true success true
trait :avatar do trait(:attachment) { file_type :attachment }
file_type :avatar trait(:avatar) { file_type :avatar }
end trait(:file) { file_type :file }
trait(:job_artifact) { file_type :job_artifact }
trait :lfs do trait(:lfs) { file_type :lfs }
file_type :lfs trait(:namespace_file) { file_type :namespace_file }
end trait(:personal_file) { file_type :personal_file }
trait :job_artifact do
file_type :job_artifact
end
trait :with_file do trait :with_file do
after(:build, :stub) do |registry, _| after(:build, :stub) do |registry, _|
......
...@@ -11,10 +11,10 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -11,10 +11,10 @@ describe Geo::AttachmentRegistryFinder, :geo do
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project, group: unsynced_group, repository_storage: 'broken') } let(:unsynced_project) { create(:project, group: unsynced_group, repository_storage: 'broken') }
let!(:upload_1) { create(:upload, model: synced_group) } let(:upload_1) { create(:upload, model: synced_group) }
let!(:upload_2) { create(:upload, model: unsynced_group) } let(:upload_2) { create(:upload, model: unsynced_group) }
let!(:upload_3) { create(:upload, :issuable_upload, model: synced_project) } let(:upload_3) { create(:upload, :issuable_upload, model: synced_project) }
let!(:upload_4) { create(:upload, model: unsynced_project) } let(:upload_4) { create(:upload, model: unsynced_project) }
let(:upload_5) { create(:upload, model: synced_project) } let(:upload_5) { create(:upload, model: synced_project) }
let(:upload_6) { create(:upload, :personal_snippet_upload) } let(:upload_6) { create(:upload, :personal_snippet_upload) }
let(:upload_7) { create(:upload, model: synced_subgroup) } let(:upload_7) { create(:upload, model: synced_subgroup) }
...@@ -188,6 +188,50 @@ describe Geo::AttachmentRegistryFinder, :geo do ...@@ -188,6 +188,50 @@ describe Geo::AttachmentRegistryFinder, :geo do
expect(uploads).to match_ids(upload_2, upload_3, upload_4) expect(uploads).to match_ids(upload_2, upload_3, upload_4)
end end
end end
# Examples for the finder that looks up uploads which are flagged as stored in
# object storage while a Geo file registry entry still exists for them.
describe '#find_migrated_local_attachments' do
  it 'delegates to the correct method' do
    # `method_prefix` comes from the enclosing example group (not visible here).
    delegate_name = :"#{method_prefix}_find_migrated_local_attachments"

    expect(subject).to receive(delegate_name).and_call_original

    subject.find_migrated_local_attachments(batch_size: 100)
  end

  it 'returns uploads stored remotely and successfully synced locally' do
    remote_upload = create(:upload, :object_storage, model: synced_group)
    create(:geo_file_registry, :avatar, file_id: remote_upload.id)

    found = subject.find_migrated_local_attachments(batch_size: 100)

    expect(found).to match_ids(remote_upload)
  end

  it 'excludes uploads stored remotely, but not synced yet' do
    # No registry entry is created for this upload.
    create(:upload, :object_storage, model: synced_group)

    found = subject.find_migrated_local_attachments(batch_size: 100)

    expect(found).to be_empty
  end

  it 'excludes synced uploads that are stored locally' do
    # upload_5 is built without the :object_storage trait.
    create(:geo_file_registry, :avatar, file_id: upload_5.id)

    found = subject.find_migrated_local_attachments(batch_size: 100)

    expect(found).to be_empty
  end

  it 'excludes except_file_ids' do
    skipped_upload = create(:upload, :object_storage, model: synced_group)
    expected_upload = create(:upload, :object_storage, model: unsynced_group)
    create(:geo_file_registry, :avatar, file_id: skipped_upload.id, success: true)
    create(:geo_file_registry, :avatar, file_id: expected_upload.id, success: true)

    found = subject.find_migrated_local_attachments(batch_size: 10, except_file_ids: [skipped_upload.id])

    expect(found).to match_ids(expected_upload)
  end
end
end end
# Disable transactions via :delete method because a foreign table # Disable transactions via :delete method because a foreign table
......
...@@ -8,15 +8,18 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -8,15 +8,18 @@ describe Geo::JobArtifactRegistryFinder, :geo do
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) } let(:unsynced_project) { create(:project) }
let!(:job_artifact_1) { create(:ci_job_artifact, id: 1, project: synced_project) } let(:job_artifact_1) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_2) { create(:ci_job_artifact, id: 2, project: unsynced_project) } let(:job_artifact_2) { create(:ci_job_artifact, project: unsynced_project) }
let!(:job_artifact_3) { create(:ci_job_artifact, id: 3, project: synced_project) } let(:job_artifact_3) { create(:ci_job_artifact, project: synced_project) }
let!(:job_artifact_4) { create(:ci_job_artifact, id: 4, project: unsynced_project) } let(:job_artifact_4) { create(:ci_job_artifact, project: unsynced_project) }
let(:job_artifact_remote_1) { create(:ci_job_artifact, :remote_store, project: synced_project) }
let(:job_artifact_remote_2) { create(:ci_job_artifact, :remote_store, project: unsynced_project) }
subject { described_class.new(current_node: secondary) } subject { described_class.new(current_node: secondary) }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
stub_artifacts_object_storage
end end
describe '#count_synced_job_artifacts' do describe '#count_synced_job_artifacts' do
...@@ -56,15 +59,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -56,15 +59,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
shared_examples 'counts all the things' do shared_examples 'counts all the things' do
describe '#count_job_artifacts' do describe '#count_local_job_artifacts' do
before do
job_artifact_1
job_artifact_2
job_artifact_3
job_artifact_4
end
it 'counts job artifacts' do it 'counts job artifacts' do
expect(subject.count_job_artifacts).to eq 4 expect(subject.count_local_job_artifacts).to eq 4
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE) job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_job_artifacts).to eq 3 expect(subject.count_local_job_artifacts).to eq 3
end end
context 'with selective sync' do context 'with selective sync' do
...@@ -73,13 +83,13 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -73,13 +83,13 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'counts job artifacts' do it 'counts job artifacts' do
expect(subject.count_job_artifacts).to eq 2 expect(subject.count_local_job_artifacts).to eq 2
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE) job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_job_artifacts).to eq 1 expect(subject.count_local_job_artifacts).to eq 1
end end
end end
end end
...@@ -94,10 +104,9 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -94,10 +104,9 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id) create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_synced_job_artifacts).to eq 2 expect(subject.count_synced_job_artifacts).to eq 2
end end
...@@ -122,10 +131,9 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -122,10 +131,9 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id) create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_synced_job_artifacts).to eq 1 expect(subject.count_synced_job_artifacts).to eq 1
end end
...@@ -142,10 +150,9 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -142,10 +150,9 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false) create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false) create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_failed_job_artifacts).to eq 2 expect(subject.count_failed_job_artifacts).to eq 2
end end
...@@ -212,6 +219,48 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -212,6 +219,48 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(job_artifacts).to match_ids(job_artifact_4) expect(job_artifacts).to match_ids(job_artifact_4)
end end
end end
# Examples for the finder that looks up job artifacts which are flagged as
# stored remotely while a Geo file registry entry still exists for them.
describe '#find_migrated_local_job_artifacts' do
  it 'delegates to the correct method' do
    # `method_prefix` comes from the enclosing example group (not visible here).
    delegate_name = :"#{method_prefix}_find_migrated_local_job_artifacts"

    expect(subject).to receive(delegate_name).and_call_original

    subject.find_migrated_local_job_artifacts(batch_size: 10)
  end

  it 'returns job artifacts remotely and successfully synced locally' do
    remote_artifact = create(:ci_job_artifact, :remote_store, project: synced_project)
    create(:geo_file_registry, :job_artifact, file_id: remote_artifact.id)

    found = subject.find_migrated_local_job_artifacts(batch_size: 10)

    expect(found).to match_ids(remote_artifact)
  end

  it 'excludes job artifacts stored remotely, but not synced yet' do
    # No registry entry is created for this artifact.
    create(:ci_job_artifact, :remote_store, project: synced_project)

    found = subject.find_migrated_local_job_artifacts(batch_size: 10)

    expect(found).to be_empty
  end

  it 'excludes synced job artifacts that are stored locally' do
    # job_artifact_1 is built without the :remote_store trait.
    create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id)

    found = subject.find_migrated_local_job_artifacts(batch_size: 10)

    expect(found).to be_empty
  end

  it 'excludes except_file_ids' do
    skipped_id = job_artifact_remote_1.id
    create(:geo_file_registry, :job_artifact, file_id: skipped_id)
    create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_2.id)

    found = subject.find_migrated_local_job_artifacts(batch_size: 10, except_file_ids: [skipped_id])

    expect(found).to match_ids(job_artifact_remote_2)
  end
end
end end
# Disable transactions via :delete method because a foreign table # Disable transactions via :delete method because a foreign table
......
...@@ -8,15 +8,18 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -8,15 +8,18 @@ describe Geo::LfsObjectRegistryFinder, :geo do
let(:synced_project) { create(:project, group: synced_group) } let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) } let(:unsynced_project) { create(:project) }
let!(:lfs_object_1) { create(:lfs_object) } let(:lfs_object_1) { create(:lfs_object) }
let!(:lfs_object_2) { create(:lfs_object) } let(:lfs_object_2) { create(:lfs_object) }
let!(:lfs_object_3) { create(:lfs_object) } let(:lfs_object_3) { create(:lfs_object) }
let!(:lfs_object_4) { create(:lfs_object) } let(:lfs_object_4) { create(:lfs_object) }
let(:lfs_object_remote_1) { create(:lfs_object, :object_storage) }
let(:lfs_object_remote_2) { create(:lfs_object, :object_storage) }
subject { described_class.new(current_node: secondary) } subject { described_class.new(current_node: secondary) }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
stub_lfs_object_storage
end end
describe '#count_synced_lfs_objects' do describe '#count_synced_lfs_objects' do
...@@ -35,10 +38,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -35,10 +38,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
it 'ignores remote LFS objects' do it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id) create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id) create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id)
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_synced_lfs_objects).to eq 2 expect(subject.count_synced_lfs_objects).to eq 2
end end
...@@ -69,10 +71,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -69,10 +71,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
it 'ignores remote LFS objects' do it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id) create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id) create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id)
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_synced_lfs_objects).to eq 1 expect(subject.count_synced_lfs_objects).to eq 1
end end
...@@ -95,10 +96,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -95,10 +96,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
it 'ignores remote LFS objects' do it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_failed_lfs_objects).to eq 2 expect(subject.count_failed_lfs_objects).to eq 2
end end
...@@ -129,10 +129,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -129,10 +129,9 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end end
it 'ignores remote LFS objects' do it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false) create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_failed_lfs_objects).to eq 1 expect(subject.count_failed_lfs_objects).to eq 1
end end
...@@ -165,6 +164,47 @@ describe Geo::LfsObjectRegistryFinder, :geo do ...@@ -165,6 +164,47 @@ describe Geo::LfsObjectRegistryFinder, :geo do
expect(lfs_objects).to match_ids(lfs_object_4) expect(lfs_objects).to match_ids(lfs_object_4)
end end
end end
# Examples for the finder that looks up LFS objects which are flagged as stored
# remotely while a Geo file registry entry still exists for them.
describe '#find_migrated_local_lfs_objects' do
  it 'delegates to the correct method' do
    # `method_prefix` comes from the enclosing example group (not visible here).
    expect(subject).to receive("#{method_prefix}_find_migrated_local_lfs_objects".to_sym).and_call_original

    subject.find_migrated_local_lfs_objects(batch_size: 10)
  end

  it 'returns LFS objects remotely and successfully synced locally' do
    create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id)

    lfs_objects = subject.find_migrated_local_lfs_objects(batch_size: 10)

    expect(lfs_objects).to match_ids(lfs_object_remote_1)
  end

  it 'excludes LFS objects stored remotely, but not synced yet' do
    # No registry entry is created for this object.
    create(:lfs_object, :object_storage)

    lfs_objects = subject.find_migrated_local_lfs_objects(batch_size: 10)

    expect(lfs_objects).to be_empty
  end

  it 'excludes synced LFS objects that are stored locally' do
    # The registry entry has to use the :lfs type, like the other examples in
    # this group. With a different type the example would presumably pass no
    # matter where the object is stored, so it would not guard the
    # "stored locally" exclusion it is named after.
    create(:geo_file_registry, :lfs, file_id: lfs_object_1.id)

    lfs_objects = subject.find_migrated_local_lfs_objects(batch_size: 10)

    expect(lfs_objects).to be_empty
  end

  it 'excludes except_file_ids' do
    create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id)
    create(:geo_file_registry, :lfs, file_id: lfs_object_remote_2.id)

    lfs_objects = subject.find_migrated_local_lfs_objects(batch_size: 10, except_file_ids: [lfs_object_remote_1.id])

    expect(lfs_objects).to match_ids(lfs_object_remote_2)
  end
end
end end
# Disable transactions via :delete method because a foreign table # Disable transactions via :delete method because a foreign table
......
...@@ -31,6 +31,7 @@ describe Gitlab::Geo::CronManager, :geo do ...@@ -31,6 +31,7 @@ describe Gitlab::Geo::CronManager, :geo do
geo_repository_verification_secondary_scheduler_worker geo_repository_verification_secondary_scheduler_worker
geo_metrics_update_worker geo_metrics_update_worker
geo_prune_event_log_worker geo_prune_event_log_worker
geo_migrated_local_files_clean_up_worker
].freeze ].freeze
before(:all) do before(:all) do
...@@ -49,7 +50,8 @@ describe Gitlab::Geo::CronManager, :geo do ...@@ -49,7 +50,8 @@ describe Gitlab::Geo::CronManager, :geo do
[ [
job('geo_file_download_dispatch_worker'), job('geo_file_download_dispatch_worker'),
job('geo_repository_sync_worker'), job('geo_repository_sync_worker'),
job('geo_repository_verification_secondary_scheduler_worker') job('geo_repository_verification_secondary_scheduler_worker'),
job('geo_migrated_local_files_clean_up_worker')
] ]
end end
......
require 'spec_helper'
# Spec for Geo::FileRegistryRemovalService#execute.
#
# For each supported file type the shared 'removes' examples assert that the
# file disappears from disk and that one Geo::FileRegistry row is deleted. Each
# type is checked twice: once as-is, and once after the record's store column
# has been switched to the REMOTE store.
describe Geo::FileRegistryRemovalService do
  include ::EE::GeoHelpers

  set(:secondary) { create(:geo_node) }

  before do
    stub_current_geo_node(secondary)

    # By default every attempt to obtain the exclusive lease succeeds.
    allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(true)
  end

  describe '#execute' do
    it 'delegates log_error to the Geo logger' do
      # Refuse the lease for this example only; the service is then expected
      # to report through Gitlab::Geo::Logger.
      allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(false)

      expect(Gitlab::Geo::Logger).to receive(:error)

      described_class.new(:lfs, 99).execute
    end

    # Including contexts must define `file_registry` and `file_path`.
    shared_examples 'removes' do
      subject(:service) { described_class.new(file_registry.file_type, file_registry.file_id) }

      it 'file from disk' do
        expect do
          service.execute
        end.to change { File.exist?(file_path) }.from(true).to(false)
      end

      it 'registry when file was deleted successfully' do
        expect do
          service.execute
        end.to change(Geo::FileRegistry, :count).by(-1)
      end
    end

    context 'with LFS object' do
      let!(:lfs_object) { create(:lfs_object, :with_file) }
      let!(:file_registry) { create(:geo_file_registry, :lfs, file_id: lfs_object.id) }
      let!(:file_path) { lfs_object.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_lfs_object_storage
          lfs_object.update_column(:file_store, LfsObjectUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end

    context 'with job artifact' do
      let!(:job_artifact) { create(:ci_job_artifact, :archive) }
      let!(:file_registry) { create(:geo_file_registry, :job_artifact, file_id: job_artifact.id) }
      let!(:file_path) { job_artifact.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_artifacts_object_storage
          job_artifact.update_column(:file_store, JobArtifactUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end

    context 'with avatar' do
      let!(:upload) { create(:user, :with_avatar).avatar.upload }
      let!(:file_registry) { create(:geo_file_registry, :avatar, file_id: upload.id) }
      let!(:file_path) { upload.build_uploader.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_uploads_object_storage(AvatarUploader)
          upload.update_column(:store, AvatarUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end

    context 'with attachment' do
      let!(:upload) { create(:note, :with_attachment).attachment.upload }
      let!(:file_registry) { create(:geo_file_registry, :attachment, file_id: upload.id) }
      let!(:file_path) { upload.build_uploader.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_uploads_object_storage(AttachmentUploader)
          upload.update_column(:store, AttachmentUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end

    # Previously a copy of the avatar context (marked TODO), so the :file type
    # was never exercised. The setup follows the namespace_file and
    # personal_file contexts below.
    # NOTE(review): assumes FileUploader.new(project).store!(file) records an
    # Upload row for the project, as the sibling uploaders do — confirm.
    context 'with file' do
      let(:project) { create(:project) }
      let(:file) { fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png') }

      let!(:upload) do
        FileUploader.new(project).store!(file)
        Upload.find_by(model: project, uploader: FileUploader)
      end

      let!(:file_registry) { create(:geo_file_registry, :file, file_id: upload.id) }
      let!(:file_path) { upload.build_uploader.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_uploads_object_storage(FileUploader)
          upload.update_column(:store, FileUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end

    context 'with namespace_file' do
      set(:group) { create(:group) }
      let(:file) { fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png') }

      let!(:upload) do
        NamespaceFileUploader.new(group).store!(file)
        Upload.find_by(model: group, uploader: NamespaceFileUploader)
      end

      let!(:file_registry) { create(:geo_file_registry, :namespace_file, file_id: upload.id) }
      let!(:file_path) { upload.build_uploader.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_uploads_object_storage(NamespaceFileUploader)
          upload.update_column(:store, NamespaceFileUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end

    context 'with personal_file' do
      let(:snippet) { create(:personal_snippet) }
      let(:file) { fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png') }

      let!(:upload) do
        PersonalFileUploader.new(snippet).store!(file)
        Upload.find_by(model: snippet, uploader: PersonalFileUploader)
      end

      let!(:file_registry) { create(:geo_file_registry, :personal_file, file_id: upload.id) }
      let!(:file_path) { upload.build_uploader.file.path }

      it_behaves_like 'removes'

      context 'migrated to object storage' do
        before do
          stub_uploads_object_storage(PersonalFileUploader)
          upload.update_column(:store, PersonalFileUploader::Store::REMOTE)
        end

        it_behaves_like 'removes'
      end
    end
  end
end
require 'spec_helper'
# Spec for Geo::MigratedLocalFilesCleanUpWorker#perform.
#
# The '#perform' shared examples are run twice: once for the FDW setup (skipped
# when the skip flag passed in is truthy) and once with Gitlab::Geo::Fdw.enabled?
# stubbed to false.
describe Geo::MigratedLocalFilesCleanUpWorker, :geo do
  include ::EE::GeoHelpers

  let(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
  let(:secondary) { create(:geo_node) }

  subject(:worker) { described_class.new }

  before do
    stub_current_geo_node(secondary)

    # Lease acquisition and renewal always succeed in these examples.
    allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(true)
    allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew).and_return(true)
  end

  shared_examples '#perform' do |fdw_unavailable|
    before do
      skip('FDW is not configured') if fdw_unavailable
    end

    it 'does not run when node is disabled' do
      secondary.enabled = false
      secondary.save

      expect(worker).not_to receive(:try_obtain_lease)

      worker.perform
    end

    context 'with LFS objects' do
      let(:local_lfs_object) { create(:lfs_object) }
      let(:remote_lfs_object) { create(:lfs_object, :object_storage) }

      before do
        stub_lfs_object_storage

        # Both objects get a registry entry; only the remote one should be
        # picked up by the worker.
        create(:geo_file_registry, :lfs, file_id: local_lfs_object.id)
        create(:geo_file_registry, :lfs, file_id: remote_lfs_object.id)
      end

      it 'schedules job for file stored remotely and synced locally' do
        expect(worker).to receive(:schedule_job).with(:lfs, remote_lfs_object.id)
        expect(worker).not_to receive(:schedule_job).with(anything, local_lfs_object.id)

        worker.perform
      end

      it 'schedules worker for file stored remotely and synced locally' do
        removal_worker = Geo::FileRegistryRemovalWorker

        expect(removal_worker).to receive(:perform_async).with(:lfs, remote_lfs_object.id)
        expect(removal_worker).not_to receive(:perform_async).with(anything, local_lfs_object.id)

        worker.perform
      end
    end

    context 'with attachments' do
      let(:avatar_upload) { create(:upload) }
      let(:personal_snippet_upload) { create(:upload, :personal_snippet_upload) }
      let(:issuable_upload) { create(:upload, :issuable_upload) }
      let(:namespace_upload) { create(:upload, :namespace_upload) }
      let(:attachment_upload) { create(:upload, :attachment_upload) }

      # Registry file type (as the worker is expected to pass it on) mapped to
      # its upload, in the order the registries are created below.
      let(:uploads_by_type) do
        {
          'avatar' => avatar_upload,
          'personal_file' => personal_snippet_upload,
          'file' => issuable_upload,
          'namespace_file' => namespace_upload,
          'attachment' => attachment_upload
        }
      end

      before do
        create(:geo_file_registry, :avatar, file_id: avatar_upload.id)
        create(:geo_file_registry, :personal_file, file_id: personal_snippet_upload.id)
        create(:geo_file_registry, :file, file_id: issuable_upload.id)
        create(:geo_file_registry, :namespace_file, file_id: namespace_upload.id)
        create(:geo_file_registry, :attachment, file_id: attachment_upload.id)
      end

      it 'schedules nothing for attachments stored locally' do
        uploads_by_type.each_value do |upload|
          expect(worker).not_to receive(:schedule_job).with(anything, upload.id)
        end

        worker.perform
      end

      context 'attachments stored remotely' do
        before do
          [
            AvatarUploader,
            PersonalFileUploader,
            FileUploader,
            NamespaceFileUploader,
            AttachmentUploader
          ].each { |uploader_class| stub_uploads_object_storage(uploader_class) }

          # Flip every upload to the remote store without running callbacks.
          uploads_by_type.each_value do |upload|
            upload.update_column(:store, FileUploader::Store::REMOTE)
          end
        end

        it 'schedules jobs for uploads stored remotely and synced locally' do
          uploads_by_type.each do |file_type, upload|
            expect(worker).to receive(:schedule_job).with(file_type, upload.id)
          end

          worker.perform
        end

        it 'schedules workers for uploads stored remotely and synced locally' do
          uploads_by_type.each do |file_type, upload|
            expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with(file_type, upload.id)
          end

          worker.perform
        end
      end
    end

    context 'with job artifacts' do
      let(:local_job_artifact) { create(:ci_job_artifact) }
      let(:remote_job_artifact) { create(:ci_job_artifact, :remote_store) }

      before do
        stub_artifacts_object_storage

        create(:geo_file_registry, :job_artifact, file_id: local_job_artifact.id)
        create(:geo_file_registry, :job_artifact, file_id: remote_job_artifact.id)
      end

      it 'schedules job for artifact stored remotely and synced locally' do
        expect(worker).to receive(:schedule_job).with(:job_artifact, remote_job_artifact.id)
        expect(worker).not_to receive(:schedule_job).with(anything, local_job_artifact.id)

        worker.perform
      end

      it 'schedules worker for artifact stored remotely and synced locally' do
        removal_worker = Geo::FileRegistryRemovalWorker

        expect(removal_worker).to receive(:perform_async).with(:job_artifact, remote_job_artifact.id)
        expect(removal_worker).not_to receive(:perform_async).with(anything, local_job_artifact.id)

        worker.perform
      end
    end
  end

  # Disable transactions via :delete method because a foreign table
  # can't see changes inside a transaction of a different connection.
  describe 'when PostgreSQL FDW is available', :geo, :delete do
    # Skip if FDW isn't activated on this database
    it_behaves_like '#perform', Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled?
  end

  describe 'when PostgreSQL FDW is not enabled', :geo do
    before do
      allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false)
    end

    it_behaves_like '#perform', false
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment