Commit 19d30f82 authored by Michael Kozono

Merge branch '222494-remove-feature-flag-to-make-registry-table-ssot-for-uploads' into 'master'

Remove feature flag to make registry table SSOT for Uploads

See merge request gitlab-org/gitlab!35921
parents df8b61be 474448ff
...@@ -501,11 +501,6 @@ production: &base ...@@ -501,11 +501,6 @@ production: &base
geo_registry_sync_worker: geo_registry_sync_worker:
cron: "*/1 * * * *" cron: "*/1 * * * *"
# GitLab Geo migrated local files clean up worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_migrated_local_files_clean_up_worker:
cron: "15 */6 * * *"
# Export pseudonymized data in CSV format for analysis # Export pseudonymized data in CSV format for analysis
pseudonymizer_worker: pseudonymizer_worker:
cron: "0 * * * *" cron: "0 * * * *"
...@@ -514,7 +509,7 @@ production: &base ...@@ -514,7 +509,7 @@ production: &base
# NOTE: This will only take effect if elasticsearch is enabled. # NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_bulk_cron_worker: elastic_index_bulk_cron_worker:
cron: "*/1 * * * *" cron: "*/1 * * * *"
# Elasticsearch bulk updater for initial updates. # Elasticsearch bulk updater for initial updates.
# NOTE: This will only take effect if elasticsearch is enabled. # NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_initial_bulk_cron_worker: elastic_index_initial_bulk_cron_worker:
......
...@@ -522,9 +522,6 @@ Gitlab.ee do ...@@ -522,9 +522,6 @@ Gitlab.ee do
Settings.cron_jobs['geo_metrics_update_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_metrics_update_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_metrics_update_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['geo_metrics_update_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_metrics_update_worker']['job_class'] ||= 'Geo::MetricsUpdateWorker' Settings.cron_jobs['geo_metrics_update_worker']['job_class'] ||= 'Geo::MetricsUpdateWorker'
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker']['cron'] ||= '15 */6 * * *'
Settings.cron_jobs['geo_migrated_local_files_clean_up_worker']['job_class'] ||= 'Geo::MigratedLocalFilesCleanUpWorker'
Settings.cron_jobs['geo_prune_event_log_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_prune_event_log_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '*/5 * * * *' Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_prune_event_log_worker']['job_class'] ||= 'Geo::PruneEventLogWorker' Settings.cron_jobs['geo_prune_event_log_worker']['job_class'] ||= 'Geo::PruneEventLogWorker'
......
...@@ -2,10 +2,8 @@ ...@@ -2,10 +2,8 @@
module Geo module Geo
class AttachmentRegistryFinder < FileRegistryFinder class AttachmentRegistryFinder < FileRegistryFinder
# Counts all existing registries independent
# of any change on filters / selective sync
def count_registry def count_registry
Geo::UploadRegistry.count syncable.count
end end
def count_syncable def count_syncable
...@@ -13,25 +11,19 @@ module Geo ...@@ -13,25 +11,19 @@ module Geo
end end
def count_synced def count_synced
registries_for_attachments.merge(Geo::UploadRegistry.synced).count syncable.synced.count
end end
def count_failed def count_failed
registries_for_attachments.merge(Geo::UploadRegistry.failed).count syncable.failed.count
end end
def count_synced_missing_on_primary def count_synced_missing_on_primary
registries_for_attachments syncable.synced.missing_on_primary.count
.merge(Geo::UploadRegistry.synced)
.merge(Geo::UploadRegistry.missing_on_primary)
.count
end end
def syncable def syncable
return attachments if selective_sync? Geo::UploadRegistry
return Upload.with_files_stored_locally if local_storage_only?
Upload
end end
# Returns untracked uploads as well as tracked uploads that are unused. # Returns untracked uploads as well as tracked uploads that are unused.
...@@ -57,13 +49,13 @@ module Geo ...@@ -57,13 +49,13 @@ module Geo
def find_registry_differences(range) def find_registry_differences(range)
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
source = source =
attachments(fdw: false) attachments
.id_in(range) .id_in(range)
.pluck(::Upload.arel_table[:id], ::Upload.arel_table[:uploader]) .pluck(::Upload.arel_table[:id], ::Upload.arel_table[:uploader])
.map! { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] } .map! { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
tracked = tracked =
Geo::UploadRegistry syncable
.model_id_in(range) .model_id_in(range)
.pluck(:file_id, :file_type) .pluck(:file_id, :file_type)
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
...@@ -92,48 +84,17 @@ module Geo ...@@ -92,48 +84,17 @@ module Geo
# @param [Array<Integer>] except_ids ids that will be ignored from the query # @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: []) def find_never_synced_registries(batch_size:, except_ids: [])
Geo::UploadRegistry syncable
.never .never
.model_id_not_in(except_ids) .model_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
alias_method :find_unsynced, :find_never_synced_registries
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_registry_differences and #find_never_synced_registries
#
# Find a limited number of non-replicated attachments.
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_ids: [])
  # Start from the attachments matching the `missing_registry` scope, then
  # drop the caller-supplied IDs and cap the number of rows returned.
  without_registry = attachments.missing_registry
  without_registry.id_not_in(except_ids).limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_ids: [])
  # Attachments joined to their registry rows and narrowed by the
  # `with_files_stored_remotely` scope.
  tracked_remote = all_attachments.inner_join_registry.with_files_stored_remotely

  # Skip the caller-supplied IDs and cap the result size.
  tracked_remote.id_not_in(except_ids).limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_ids: []) def find_retryable_failed_registries(batch_size:, except_ids: [])
Geo::UploadRegistry syncable
.failed .failed
.retry_due .retry_due
.model_id_not_in(except_ids) .model_id_not_in(except_ids)
...@@ -143,7 +104,7 @@ module Geo ...@@ -143,7 +104,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
Geo::UploadRegistry syncable
.synced .synced
.missing_on_primary .missing_on_primary
.retry_due .retry_due
...@@ -154,16 +115,12 @@ module Geo ...@@ -154,16 +115,12 @@ module Geo
private private
def attachments(fdw: true) def attachments
local_storage_only?(fdw: fdw) ? all_attachments(fdw: fdw).with_files_stored_locally : all_attachments(fdw: fdw) local_storage_only?(fdw: false) ? all_attachments.with_files_stored_locally : all_attachments
end
def all_attachments(fdw: true)
current_node(fdw: fdw).attachments
end end
def registries_for_attachments def all_attachments
attachments.inner_join_registry current_node(fdw: false).attachments
end end
end end
end end
...@@ -54,14 +54,6 @@ module Geo ...@@ -54,14 +54,6 @@ module Geo
raise NotImplementedError raise NotImplementedError
end end
# @!method find_migrated_local
# Return an ActiveRecord::Relation of tracked resource records, filtered
# by selective sync, with files stored remotely, excluding
# specified IDs, limited to batch_size.
#
# This implementation always raises NotImplementedError.
# NOTE(review): presumably concrete finders are expected to override it —
# confirm against the subclasses.
#
# @raise [NotImplementedError] always
def find_migrated_local
raise NotImplementedError
end
# @!method find_retryable_failed_registries # @!method find_retryable_failed_registries
# Return an ActiveRecord::Relation of registry records marked as failed, # Return an ActiveRecord::Relation of registry records marked as failed,
# which are ready to be retried, excluding specified IDs, limited to # which are ready to be retried, excluding specified IDs, limited to
......
...@@ -14,10 +14,6 @@ class Geo::UploadRegistry < Geo::BaseRegistry ...@@ -14,10 +14,6 @@ class Geo::UploadRegistry < Geo::BaseRegistry
scope :fresh, -> { order(created_at: :desc) } scope :fresh, -> { order(created_at: :desc) }
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
# Reports the state of the :geo_file_registry_ssot_sync feature flag,
# queried with `default_enabled: true`.
#
# @return whatever Feature.enabled? returns for that flag
def self.registry_consistency_worker_enabled?
Feature.enabled?(:geo_file_registry_ssot_sync, default_enabled: true)
end
def self.finder_class def self.finder_class
::Geo::AttachmentRegistryFinder ::Geo::AttachmentRegistryFinder
end end
......
...@@ -75,14 +75,6 @@ ...@@ -75,14 +75,6 @@
:weight: 1 :weight: 1
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: cronjob:geo_migrated_local_files_clean_up
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:geo_prune_event_log - :name: cronjob:geo_prune_event_log
:feature_category: :geo_replication :feature_category: :geo_replication
:has_external_dependencies: :has_external_dependencies:
......
...@@ -10,13 +10,9 @@ module Geo ...@@ -10,13 +10,9 @@ module Geo
end end
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
if Geo::UploadRegistry.registry_consistency_worker_enabled? convert_registry_relation_to_job_args(
convert_registry_relation_to_job_args( registry_finder.find_never_synced_registries(find_batch_params(batch_size))
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) )
)
else
super
end
end end
private private
......
# frozen_string_literal: true
module Geo
  # Cron-queue worker that looks up attachments via
  # AttachmentRegistryFinder#find_migrated_local and enqueues a
  # Geo::FileRegistryRemovalWorker job for each of them.
  #
  # NOTE(review): `load_pending_resources`, `schedule_job` and `max_capacity`
  # are presumably hooks invoked by the scheduler superclass — confirm there.
  class MigratedLocalFilesCleanUpWorker < ::Geo::Scheduler::Secondary::SchedulerWorker # rubocop:disable Scalability/IdempotentWorker
    include ::CronjobQueue

    MAX_CAPACITY = 1000

    # Delegates to the superclass only when the current node does not sync
    # object storage and attachments are configured to use object storage.
    def perform
      # No need to run when objects stored in Object Storage should be synced
      # too, nor when nothing is configured to be in Object Storage.
      return if sync_object_storage_enabled? || !object_store_enabled?

      super
    end

    private

    def max_capacity
      MAX_CAPACITY
    end

    # Enqueues a removal job for one object.
    #
    # @return [Hash, nil] the id/type/job_id of the scheduled job, or nil when
    #   `perform_async` returned a falsy job id
    def schedule_job(object_type, object_db_id)
      job_id = ::Geo::FileRegistryRemovalWorker.perform_async(object_type.to_s, object_db_id)
      return unless job_id

      { id: object_db_id, type: object_type, job_id: job_id }.tap do |scheduled|
        log_info('Scheduled Geo::FileRegistryRemovalWorker', scheduled)
      end
    end

    def load_pending_resources
      find_migrated_local_objects(batch_size: db_retrieve_batch_size)
    end

    def find_migrated_local_objects(batch_size:)
      find_migrated_local_attachments_ids(batch_size: batch_size)
    end

    # Builds `[object_type, id]` pairs for migrated attachments, where the
    # object type is the uploader class name with the `Uploader` suffix
    # removed and then underscored.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def find_migrated_local_attachments_ids(batch_size:)
      return [] unless attachments_object_store_enabled?

      # IDs of user-upload jobs that are already scheduled are excluded.
      already_scheduled = scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES)
      uploads = Geo::Fdw::Upload.arel_table

      attachments_finder
        .find_migrated_local(batch_size: batch_size, except_ids: already_scheduled)
        .pluck(uploads[:uploader], uploads[:id])
        .map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # IDs of scheduled jobs whose type (compared as a string) is one of
    # `file_types`; accepts a single type or a list.
    def scheduled_file_ids(file_types)
      wanted_types = Array(file_types).map(&:to_s)

      scheduled_jobs
        .select { |job| wanted_types.include?(job[:type].to_s) }
        .map { |job| job[:id] }
    end

    def attachments_object_store_enabled?
      FileUploader.object_store_enabled?
    end

    def object_store_enabled?
      attachments_object_store_enabled?
    end

    def sync_object_storage_enabled?
      current_node.sync_object_storage
    end

    # Memoized finder bound to the current node's id.
    def attachments_finder
      @attachments_finder ||= AttachmentRegistryFinder.new(current_node_id: current_node.id)
    end
  end
end
---
title: 'Geo: Make registry table SSOT for uploads'
merge_request: 35921
author:
type: performance
...@@ -16,7 +16,6 @@ module Gitlab ...@@ -16,7 +16,6 @@ module Gitlab
SECONDARY_JOBS = %w[ SECONDARY_JOBS = %w[
geo_file_download_dispatch_worker geo_file_download_dispatch_worker
geo_registry_sync_worker geo_registry_sync_worker
geo_migrated_local_files_clean_up_worker
geo_repository_sync_worker geo_repository_sync_worker
geo_container_repository_sync_worker geo_container_repository_sync_worker
geo_repository_verification_secondary_scheduler_worker geo_repository_verification_secondary_scheduler_worker
......
...@@ -2,373 +2,267 @@ ...@@ -2,373 +2,267 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do RSpec.describe Geo::AttachmentRegistryFinder, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
# Using let() instead of set() because set() does not work properly let_it_be(:secondary) { create(:geo_node) }
# when using the :delete DatabaseCleaner strategy, which is required
# for FDW tests because a foreign table can't see changes inside a let_it_be(:synced_group) { create(:group) }
# transaction of a different connection. let_it_be(:synced_subgroup) { create(:group, parent: synced_group) }
let(:secondary) { create(:geo_node) } let_it_be(:unsynced_group) { create(:group) }
let_it_be(:synced_project) { create(:project, group: synced_group) }
let(:synced_group) { create(:group) } let_it_be(:synced_project_in_nested_group) { create(:project, group: synced_subgroup) }
let(:synced_subgroup) { create(:group, parent: synced_group) } let_it_be(:unsynced_project) { create(:project, :broken_storage, group: unsynced_group) }
let(:unsynced_group) { create(:group) }
let_it_be(:upload_1) { create(:upload, model: synced_group) }
let(:synced_project) { create(:project, group: synced_group) } let_it_be(:upload_2) { create(:upload, model: unsynced_group) }
let(:synced_project_in_nested_group) { create(:project, group: synced_subgroup) } let_it_be(:upload_3) { create(:upload, :issuable_upload, model: synced_project_in_nested_group) }
let(:unsynced_project) { create(:project, :broken_storage, group: unsynced_group) } let_it_be(:upload_4) { create(:upload, model: unsynced_project) }
let_it_be(:upload_5) { create(:upload, model: synced_project) }
subject { described_class.new(current_node_id: secondary.id) } let_it_be(:upload_6) { create(:upload, :personal_snippet_upload) }
let_it_be(:upload_7) { create(:upload, :object_storage, model: synced_project) }
let_it_be(:upload_8) { create(:upload, :object_storage, model: unsynced_project) }
let_it_be(:upload_9) { create(:upload, :object_storage, model: synced_group) }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
let!(:upload_synced_group) { create(:upload, model: synced_group) } subject { described_class.new(current_node_id: secondary.id) }
let!(:upload_unsynced_group) { create(:upload, model: unsynced_group) }
let!(:upload_issuable_synced_nested_project) { create(:upload, :issuable_upload, model: synced_project_in_nested_group) } describe '#count_syncable' do
let!(:upload_unsynced_project) { create(:upload, model: unsynced_project) } it 'counts registries for uploads' do
let!(:upload_synced_project) { create(:upload, model: synced_project) } create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
let!(:upload_personal_snippet) { create(:upload, :personal_snippet_upload) } create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
let!(:upload_remote_synced_project) { create(:upload, :object_storage, model: synced_project) } create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
let!(:upload_remote_unsynced_project) { create(:upload, :object_storage, model: unsynced_project) } create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
let!(:upload_remote_synced_group) { create(:upload, :object_storage, model: synced_group) } create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
context 'finds all the things' do create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
describe '#find_unsynced' do create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
before do
create(:geo_upload_registry, :avatar, file_id: upload_synced_group.id) expect(subject.count_syncable).to eq 8
create(:geo_upload_registry, :avatar, file_id: upload_unsynced_group.id)
create(:geo_upload_registry, :avatar, file_id: upload_remote_synced_project.id)
end
context 'with object storage sync enabled' do
it 'returns attachments without an entry on the tracking database' do
attachments = subject.find_unsynced(batch_size: 10)
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_unsynced_project,
upload_synced_project, upload_personal_snippet, upload_remote_unsynced_project,
upload_remote_synced_group)
end
it 'returns attachments without an entry on the tracking database, excluding from exception list' do
attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_issuable_synced_nested_project.id])
expect(attachments).to match_ids(upload_unsynced_project, upload_synced_project, upload_personal_snippet,
upload_remote_unsynced_project, upload_remote_synced_group)
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'returns local attachments only' do
attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_synced_project.id])
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_unsynced_project,
upload_personal_snippet)
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'returns attachments without an entry on the tracking database, excluding from exception list' do
attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_synced_project.id])
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_personal_snippet,
upload_remote_synced_group)
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'returns attachments without an entry on the tracking database' do
attachments = subject.find_unsynced(batch_size: 10)
expect(attachments).to match_ids(upload_unsynced_project, upload_personal_snippet,
upload_remote_unsynced_project)
end
end
end end
end
describe '#find_migrated_local' do describe '#count_registry' do
before do it 'counts registries for uploads' do
create(:geo_upload_registry, :avatar, file_id: upload_remote_synced_project.id) create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :avatar, file_id: upload_remote_unsynced_project.id) create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
end create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
expect(subject.count_registry).to eq 8
end
end
it 'returns attachments stored remotely and successfully synced locally' do describe '#count_synced' do
attachments = subject.find_migrated_local(batch_size: 100, except_ids: [upload_remote_unsynced_project.id]) it 'counts registries that has been synced' do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
expect(subject.count_synced).to eq 3
end
end
expect(attachments).to match_ids(upload_remote_synced_project) describe '#count_failed' do
end it 'counts registries that sync has failed' do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
expect(subject.count_failed).to eq 3
end
end
it 'excludes attachments stored remotely, but not synced yet' do describe '#count_synced_missing_on_primary' do
attachments = subject.find_migrated_local(batch_size: 100) it 'counts registries that have been synced and are missing on the primary, excluding not synced ones' do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
expect(subject.count_synced_missing_on_primary).to eq 3
end
end
expect(attachments).to match_ids(upload_remote_synced_project, upload_remote_unsynced_project) describe '#find_registry_differences' do
end it 'returns untracked IDs as well as tracked IDs that are unused', :aggregate_failures do
max_id = Upload.maximum(:id)
create(:geo_upload_registry, :avatar, file_id: upload_1.id)
create(:geo_upload_registry, :file, file_id: upload_3.id)
create(:geo_upload_registry, :avatar, file_id: upload_5.id)
create(:geo_upload_registry, :personal_file, file_id: upload_6.id)
create(:geo_upload_registry, :avatar, file_id: upload_7.id)
unused_registry_1 = create(:geo_upload_registry, :attachment, file_id: max_id + 1)
unused_registry_2 = create(:geo_upload_registry, :personal_file, file_id: max_id + 2)
range = 1..(max_id + 2)
untracked, unused = subject.find_registry_differences(range)
expected_untracked = [
[upload_2.id, 'avatar'],
[upload_4.id, 'avatar'],
[upload_8.id, 'avatar'],
[upload_9.id, 'avatar']
]
expected_unused = [
[unused_registry_1.file_id, 'attachment'],
[unused_registry_2.file_id, 'personal_file']
]
expect(untracked).to match_array(expected_untracked)
expect(unused).to match_array(expected_unused)
end
end
context 'with selective sync by namespace' do describe '#find_never_synced_registries' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) } it 'returns registries for uploads that have never been synced' do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
registry_upload_3 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
registry_upload_8 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
it 'returns attachments stored remotely and successfully synced locally' do registries = subject.find_never_synced_registries(batch_size: 10)
attachments = subject.find_migrated_local(batch_size: 10)
expect(attachments).to match_ids(upload_remote_synced_project) expect(registries).to match_ids(registry_upload_3, registry_upload_8)
end end
end
context 'with selective sync by shard' do it 'excludes except_ids' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) } create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
registry_upload_8 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
it 'returns attachments stored remotely and successfully synced locally' do registries = subject.find_never_synced_registries(batch_size: 10, except_ids: [upload_3.id])
attachments = subject.find_migrated_local(batch_size: 10)
expect(attachments).to match_ids(upload_remote_unsynced_project) expect(registries).to match_ids(registry_upload_8)
end
end
end end
end end
context 'counts all the things' do describe '#find_unsynced' do
describe '#count_synced' do it 'returns registries for uploads that have never been synced' do
before do create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_synced_group.id) create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_unsynced_group.id) registry_upload_3 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, file_id: upload_issuable_synced_nested_project.id) create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_unsynced_project.id) create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, file_id: upload_synced_project.id) create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_personal_snippet.id) create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_remote_synced_project.id) registry_upload_8 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
create(:geo_upload_registry, :attachment, file_id: upload_remote_unsynced_project.id)
end
context 'with object storage sync enabled' do
it 'counts attachments that have been synced' do
expect(subject.count_synced).to eq 7
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'counts only local attachments that have been synced' do
expect(subject.count_synced).to eq 5
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'counts attachments that has been synced' do
expect(subject.count_synced).to eq 4
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'counts attachments that has been synced' do
expect(subject.count_synced).to eq 4
end
end
end
describe '#count_failed' do registries = subject.find_unsynced(batch_size: 10)
before do
create(:geo_upload_registry, :attachment, file_id: upload_synced_group.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_unsynced_group.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_issuable_synced_nested_project.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_unsynced_project.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_synced_project.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_personal_snippet.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_remote_synced_project.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_remote_unsynced_project.id)
end
context 'with object storage sync enabled' do
it 'counts attachments that sync has failed' do
expect(subject.count_failed).to eq 7
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'counts only local attachments that have failed' do
expect(subject.count_failed).to eq 5
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'counts attachments that sync has failed' do
expect(subject.count_failed).to eq 4
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'counts attachments that sync has failed' do
expect(subject.count_failed).to eq 4
end
end
end
describe '#count_synced_missing_on_primary' do expect(registries).to match_ids(registry_upload_3, registry_upload_8)
before do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_synced_group.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_unsynced_group.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_issuable_synced_nested_project.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_unsynced_project.id, missing_on_primary: false)
create(:geo_upload_registry, :attachment, file_id: upload_synced_project.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_personal_snippet.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_remote_synced_project.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_remote_unsynced_project.id, missing_on_primary: true)
end
context 'with object storage sync enabled' do
it 'counts attachments that have been synced and are missing on the primary' do
expect(subject.count_synced_missing_on_primary).to eq 6
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'counts only local attachments that have been synced and are missing on the primary' do
expect(subject.count_synced_missing_on_primary).to eq 4
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'counts attachments that have been synced and are missing on the primary' do
expect(subject.count_synced_missing_on_primary).to eq 4
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'counts attachments that have been synced, are missing on the primary' do
expect(subject.count_synced_missing_on_primary).to eq 3
end
end
end end
describe '#count_syncable' do it 'excludes except_ids' do
context 'with object storage sync enabled' do create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
it 'counts attachments' do create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
expect(subject.count_syncable).to eq 9 registry_upload_3 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
end create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
end create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
registry_upload_8 = create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
context 'with object storage sync disabled' do registries = subject.find_unsynced(batch_size: 10, except_ids: [registry_upload_3.file_id])
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'counts only local attachments' do expect(registries).to match_ids(registry_upload_8)
expect(subject.count_syncable).to eq 6 end
end end
end
context 'with selective sync by namespace' do describe '#find_retryable_failed_registries' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) } it 'returns registries for job artifacts that have failed to sync' do
registry_upload_1 = create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
registry_upload_4 = create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
registry_upload_6 = create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
it 'counts attachments' do registries = subject.find_retryable_failed_registries(batch_size: 10)
expect(subject.count_syncable).to eq 6
end expect(registries).to match_ids(registry_upload_1, registry_upload_4, registry_upload_6)
end end
context 'with selective sync by shard' do it 'excludes except_ids' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) } registry_upload_1 = create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
registry_upload_6 = create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
it 'counts attachments' do registries = subject.find_retryable_failed_registries(batch_size: 10, except_ids: [upload_4.id])
expect(subject.count_syncable).to eq 4
end expect(registries).to match_ids(registry_upload_1, registry_upload_6)
end
end end
end
describe '#find_retryable_synced_missing_on_primary_registries' do
it 'returns registries for job artifacts that have been synced and are missing on the primary' do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
registry_upload_2 = create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
registry_upload_5 = create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :attachment, :failed, file_id: upload_7.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
registries = subject.find_retryable_synced_missing_on_primary_registries(batch_size: 10)
describe '#count_registry' do expect(registries).to match_ids(registry_upload_2, registry_upload_5)
before do
create(:geo_upload_registry, :attachment, :failed, file_id: upload_synced_group.id)
create(:geo_upload_registry, :attachment, file_id: upload_unsynced_group.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_issuable_synced_nested_project.id)
create(:geo_upload_registry, :attachment, file_id: upload_unsynced_project.id, missing_on_primary: false)
create(:geo_upload_registry, :attachment, file_id: upload_synced_project.id)
create(:geo_upload_registry, :attachment, file_id: upload_personal_snippet.id)
create(:geo_upload_registry, :attachment, file_id: upload_remote_synced_project.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_remote_unsynced_project.id, missing_on_primary: true)
create(:geo_upload_registry, :attachment, file_id: upload_remote_synced_group.id, missing_on_primary: true)
end
context 'with object storage sync enabled' do
it 'counts file registries for attachments' do
expect(subject.count_registry).to eq 9
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'does not apply local attachments only restriction' do
expect(subject.count_registry).to eq 9
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'does not apply the selective sync restriction' do
expect(subject.count_registry).to eq 9
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'does not apply the selective sync restriction' do
expect(subject.count_registry).to eq 9
end
end
end end
describe '#find_registry_differences' do it 'excludes except_ids' do
it 'returns untracked IDs as well as tracked IDs that are unused', :aggregate_failures do create(:geo_upload_registry, :attachment, :failed, file_id: upload_1.id)
max_id = Upload.maximum(:id) registry_upload_2 = create(:geo_upload_registry, :attachment, file_id: upload_2.id, missing_on_primary: true)
create(:geo_upload_registry, :avatar, file_id: upload_synced_group.id) create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_3.id)
create(:geo_upload_registry, :file, file_id: upload_issuable_synced_nested_project.id) create(:geo_upload_registry, :attachment, :failed, file_id: upload_4.id)
create(:geo_upload_registry, :avatar, file_id: upload_synced_project.id) create(:geo_upload_registry, :attachment, file_id: upload_5.id, missing_on_primary: true, retry_at: 1.day.ago)
create(:geo_upload_registry, :personal_file, file_id: upload_personal_snippet.id) create(:geo_upload_registry, :attachment, :failed, file_id: upload_6.id)
create(:geo_upload_registry, :avatar, file_id: upload_remote_synced_project.id) create(:geo_upload_registry, :attachment, :failed, file_id: upload_7.id, missing_on_primary: true)
unused_registry_1 = create(:geo_upload_registry, :attachment, file_id: max_id + 1) create(:geo_upload_registry, :attachment, :never_synced, file_id: upload_8.id)
unused_registry_2 = create(:geo_upload_registry, :personal_file, file_id: max_id + 2)
range = 1..(max_id + 2) registries = subject.find_retryable_synced_missing_on_primary_registries(batch_size: 10, except_ids: [upload_5.id])
untracked, unused = subject.find_registry_differences(range) expect(registries).to match_ids(registry_upload_2)
expected_untracked = [
[upload_unsynced_group.id, 'avatar'],
[upload_unsynced_project.id, 'avatar'],
[upload_remote_unsynced_project.id, 'avatar'],
[upload_remote_synced_group.id, 'avatar']
]
expected_unused = [
[unused_registry_1.file_id, 'attachment'],
[unused_registry_2.file_id, 'personal_file']
]
expect(untracked).to match_array(expected_untracked)
expect(unused).to match_array(expected_unused)
end
end end
end end
it_behaves_like 'a file registry finder'
end end
...@@ -14,7 +14,6 @@ RSpec.describe Geo::FileRegistryFinder, :geo, :geo_fdw do ...@@ -14,7 +14,6 @@ RSpec.describe Geo::FileRegistryFinder, :geo, :geo_fdw do
count_synced_missing_on_primary count_synced_missing_on_primary
count_registry count_registry
find_unsynced find_unsynced
find_migrated_local
find_retryable_failed_registries find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries find_retryable_synced_missing_on_primary_registries
].each do |required_method| ].each do |required_method|
......
...@@ -16,7 +16,6 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do ...@@ -16,7 +16,6 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
geo_repository_verification_secondary_scheduler_worker geo_repository_verification_secondary_scheduler_worker
geo_metrics_update_worker geo_metrics_update_worker
geo_prune_event_log_worker geo_prune_event_log_worker
geo_migrated_local_files_clean_up_worker
].freeze ].freeze
def job(name) def job(name)
...@@ -39,8 +38,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do ...@@ -39,8 +38,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
job('geo_registry_sync_worker'), job('geo_registry_sync_worker'),
job('geo_repository_sync_worker'), job('geo_repository_sync_worker'),
job('geo_container_repository_sync_worker'), job('geo_container_repository_sync_worker'),
job('geo_repository_verification_secondary_scheduler_worker'), job('geo_repository_verification_secondary_scheduler_worker')
job('geo_migrated_local_files_clean_up_worker')
] ]
end end
......
...@@ -125,7 +125,7 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -125,7 +125,7 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do
describe '#attachments_synced_count' do describe '#attachments_synced_count' do
it 'only counts successful syncs' do it 'only counts successful syncs' do
create_list(:user, 3, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) create_list(:user, 3, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png'))
uploads = Upload.all.pluck(:id) uploads = Upload.pluck(:id)
create(:geo_upload_registry, :avatar, file_id: uploads[0]) create(:geo_upload_registry, :avatar, file_id: uploads[0])
create(:geo_upload_registry, :avatar, file_id: uploads[1]) create(:geo_upload_registry, :avatar, file_id: uploads[1])
...@@ -133,42 +133,12 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -133,42 +133,12 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do
expect(subject.attachments_synced_count).to eq(2) expect(subject.attachments_synced_count).to eq(2)
end end
it 'does not count synced files that were replaced' do
user = create(:user, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png'))
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(0)
upload = Upload.find_by(model: user, uploader: 'AvatarUploader')
create(:geo_upload_registry, :avatar, file_id: upload.id)
subject = described_class.current_node_status
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(1)
user.update(avatar: fixture_file_upload('spec/fixtures/rails_sample.jpg', 'image/jpeg'))
subject = described_class.current_node_status
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(0)
upload = Upload.find_by(model: user, uploader: 'AvatarUploader')
create(:geo_upload_registry, :avatar, file_id: upload.id)
subject = described_class.current_node_status
expect(subject.attachments_count).to eq(1)
expect(subject.attachments_synced_count).to eq(1)
end
end end
describe '#attachments_synced_missing_on_primary_count' do describe '#attachments_synced_missing_on_primary_count' do
it 'only counts successful syncs' do it 'only counts successful syncs' do
create_list(:user, 3, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png')) create_list(:user, 3, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png'))
uploads = Upload.all.pluck(:id) uploads = Upload.pluck(:id)
create(:geo_upload_registry, :avatar, file_id: uploads[0], missing_on_primary: true) create(:geo_upload_registry, :avatar, file_id: uploads[0], missing_on_primary: true)
create(:geo_upload_registry, :avatar, file_id: uploads[1]) create(:geo_upload_registry, :avatar, file_id: uploads[1])
...@@ -194,32 +164,20 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do ...@@ -194,32 +164,20 @@ RSpec.describe GeoNodeStatus, :geo, :geo_fdw do
end end
describe '#attachments_synced_in_percentage' do describe '#attachments_synced_in_percentage' do
let(:avatar) { fixture_file_upload('spec/fixtures/dk.png') } it 'returns 0 when no registries are available' do
let(:upload_1) { create(:upload, model: group, path: avatar) }
let(:upload_2) { create(:upload, model: project_1, path: avatar) }
before do
create(:upload, model: create(:group), path: avatar)
create(:upload, model: project_3, path: avatar)
end
it 'returns 0 when no objects are available' do
expect(subject.attachments_synced_in_percentage).to eq(0) expect(subject.attachments_synced_in_percentage).to eq(0)
end end
it 'returns the right percentage with no group restrictions' do it 'returns the right percentage' do
create(:geo_upload_registry, :avatar, file_id: upload_1.id) create_list(:user, 4, avatar: fixture_file_upload('spec/fixtures/dk.png', 'image/png'))
create(:geo_upload_registry, :avatar, file_id: upload_2.id) uploads = Upload.pluck(:id)
expect(subject.attachments_synced_in_percentage).to be_within(0.0001).of(50)
end
it 'returns the right percentage with group restrictions' do create(:geo_upload_registry, :avatar, file_id: uploads[0])
secondary.update!(selective_sync_type: 'namespaces', namespaces: [group]) create(:geo_upload_registry, :avatar, file_id: uploads[1])
create(:geo_upload_registry, :avatar, file_id: upload_1.id) create(:geo_upload_registry, :avatar, :failed, file_id: uploads[2])
create(:geo_upload_registry, :avatar, file_id: upload_2.id) create(:geo_upload_registry, :avatar, :never_synced, file_id: uploads[3])
expect(subject.attachments_synced_in_percentage).to be_within(0.0001).of(100) expect(subject.attachments_synced_in_percentage).to be_within(0.0001).of(50)
end end
end end
......
...@@ -10,7 +10,6 @@ RSpec.shared_examples 'a file registry finder' do ...@@ -10,7 +10,6 @@ RSpec.shared_examples 'a file registry finder' do
count_synced_missing_on_primary count_synced_missing_on_primary
count_registry count_registry
find_unsynced find_unsynced
find_migrated_local
find_retryable_failed_registries find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries find_retryable_synced_missing_on_primary_registries
} }
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_cache_for_tracking_db do RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_tracking_db do
include ::EE::GeoHelpers include ::EE::GeoHelpers
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
...@@ -88,219 +88,106 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c ...@@ -88,219 +88,106 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c
context 'with attachments (Upload records)' do context 'with attachments (Upload records)' do
let(:upload) { create(:upload) } let(:upload) { create(:upload) }
context 'with geo_file_registry_ssot_sync feature enabled' do it 'performs Geo::FileDownloadWorker for unsynced attachments' do
before do create(:geo_upload_registry, :avatar, :never_synced, file_id: upload.id)
stub_feature_flags(geo_file_registry_ssot_sync: true)
end
it 'performs Geo::FileDownloadWorker for unsynced attachments' do
create(:geo_upload_registry, :avatar, :never_synced, file_id: upload.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', upload.id)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync attachments' do
create(:geo_upload_registry, :avatar, :failed, file_id: upload.id, bytes: 0)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('avatar', upload.id).once.and_return(spy)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced attachments' do
create(:geo_upload_registry, :avatar, file_id: upload.id, bytes: 1234)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
it 'does not perform Geo::FileDownloadWorker for synced attachments even with 0 bytes downloaded' do expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', upload.id)
create(:geo_upload_registry, :avatar, file_id: upload.id, bytes: 0)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) subject.perform
end
subject.perform it 'performs Geo::FileDownloadWorker for failed-sync attachments' do
end create(:geo_upload_registry, :avatar, :failed, file_id: upload.id, bytes: 0)
context 'with a failed file' do expect(Geo::FileDownloadWorker).to receive(:perform_async)
let(:failed_registry) { create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id) } .with('avatar', upload.id).once.and_return(spy)
it 'does not stall backfill' do subject.perform
unsynced_registry = create(:geo_upload_registry, :avatar, :with_file, :never_synced) end
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1) it 'does not perform Geo::FileDownloadWorker for synced attachments' do
create(:geo_upload_registry, :avatar, file_id: upload.id, bytes: 1234)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('avatar', failed_registry.file_id) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_registry.file_id)
subject.perform subject.perform
end end
it 'retries failed files' do it 'does not perform Geo::FileDownloadWorker for synced attachments even with 0 bytes downloaded' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', failed_registry.file_id) create(:geo_upload_registry, :avatar, file_id: upload.id, bytes: 0)
subject.perform expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
end
it 'does not retry failed files when retry_at is tomorrow' do subject.perform
failed_registry = create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id, retry_at: Date.tomorrow) end
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('avatar', failed_registry.file_id) context 'with a failed file' do
let(:failed_registry) { create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id) }
subject.perform it 'does not stall backfill' do
end unsynced_registry = create(:geo_upload_registry, :avatar, :with_file, :never_synced)
it 'retries failed files when retry_at is in the past' do stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
failed_registry = create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', failed_registry.file_id) expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('avatar', failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_registry.file_id)
subject.perform subject.perform
end
end end
context 'with Upload files missing on the primary that are marked as synced' do it 'retries failed files' do
let(:synced_upload_with_file_missing_on_primary) { create(:upload) } expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', failed_registry.file_id)
before do
Geo::UploadRegistry.create!(file_type: :avatar, file_id: synced_upload_with_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'retries the files if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', synced_upload_with_file_missing_on_primary.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
unsynced_registry = create(:geo_upload_registry, :avatar, :with_file, :never_synced)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_registry.file_id)
subject.perform
end
it 'does not retry those files if they are already scheduled' do
unsynced_registry = create(:geo_upload_registry, :avatar, :with_file, :never_synced)
scheduled_jobs = [{ type: 'avatar', id: synced_upload_with_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_registry.file_id)
subject.perform subject.perform
end
end end
end
context 'with geo_file_registry_ssot_sync feature disabled' do it 'does not retry failed files when retry_at is tomorrow' do
before do failed_registry = create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id, retry_at: Date.tomorrow)
stub_feature_flags(geo_file_registry_ssot_sync: false)
end
it 'performs Geo::FileDownloadWorker for unsynced attachments' do expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('avatar', failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', upload.id)
subject.perform subject.perform
end end
it 'performs Geo::FileDownloadWorker for failed-sync attachments' do it 'retries failed files when retry_at is in the past' do
create(:geo_upload_registry, :avatar, :failed, file_id: upload.id, bytes: 0) failed_registry = create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', failed_registry.file_id)
.with('avatar', upload.id).once.and_return(spy)
subject.perform subject.perform
end end
end
it 'does not perform Geo::FileDownloadWorker for synced attachments' do context 'with Upload files missing on the primary that are marked as synced' do
create(:geo_upload_registry, :avatar, file_id: upload.id, bytes: 1234) let(:synced_upload_with_file_missing_on_primary) { create(:upload) }
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform before do
Geo::UploadRegistry.create!(file_type: :avatar, file_id: synced_upload_with_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end end
it 'does not perform Geo::FileDownloadWorker for synced attachments even with 0 bytes downloaded' do it 'retries the files if there is spare capacity' do
create(:geo_upload_registry, :avatar, file_id: upload.id, bytes: 0) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', synced_upload_with_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform subject.perform
end end
context 'with a failed file' do it 'does not retry those files if there is no spare capacity' do
let(:failed_registry) { create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id) } unsynced_registry = create(:geo_upload_registry, :avatar, :with_file, :never_synced)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
it 'does not stall backfill' do
unsynced = create(:upload)
stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('avatar', failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced.id)
subject.perform
end
it 'retries failed files' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', failed_registry.file_id)
subject.perform
end
it 'does not retry failed files when retry_at is tomorrow' do
failed_registry = create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id, retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('avatar', failed_registry.file_id)
subject.perform
end
it 'retries failed files when retry_at is in the past' do
failed_registry = create(:geo_upload_registry, :avatar, :failed, file_id: non_existing_record_id, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', failed_registry.file_id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_registry.file_id)
subject.perform subject.perform
end
end end
context 'with Upload files missing on the primary that are marked as synced' do it 'does not retry those files if they are already scheduled' do
let(:synced_upload_with_file_missing_on_primary) { create(:upload) } unsynced_registry = create(:geo_upload_registry, :avatar, :with_file, :never_synced)
before do
Geo::UploadRegistry.create!(file_type: :avatar, file_id: synced_upload_with_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true)
end
it 'retries the files if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', synced_upload_with_file_missing_on_primary.id)
subject.perform
end
it 'does not retry those files if there is no spare capacity' do
unsynced_upload = create(:upload)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_upload.id)
subject.perform
end
it 'does not retry those files if they are already scheduled' do
unsynced_upload = create(:upload)
scheduled_jobs = [{ type: 'avatar', id: synced_upload_with_file_missing_on_primary.id, job_id: 'foo' }] scheduled_jobs = [{ type: 'avatar', id: synced_upload_with_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1) expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_upload.id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', unsynced_registry.file_id)
subject.perform subject.perform
end
end end
end end
end end
...@@ -506,41 +393,4 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c ...@@ -506,41 +393,4 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw, :use_sql_query_c
subject.perform subject.perform
end end
end end
context 'when node has namespace restrictions', :request_store do
let(:synced_group) { create(:group) }
let(:project_in_synced_group) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
allow(::Gitlab::Geo).to receive(:current_node).and_call_original
Rails.cache.write(:current_node, secondary.to_json)
allow(::GeoNode).to receive(:current_node).and_return(secondary)
end
context 'with geo_file_registry_ssot_sync feature disabled' do
before do
stub_feature_flags(geo_file_registry_ssot_sync: false)
end
it 'does not perform Geo::FileDownloadWorker for upload objects that do not belong to selected namespaces to replicate' do
avatar = fixture_file_upload('spec/fixtures/dk.png')
avatar_in_synced_group = create(:upload, model: synced_group, path: avatar)
create(:upload, model: create(:group), path: avatar)
avatar_in_project_in_synced_group = create(:upload, model: project_in_synced_group, path: avatar)
create(:upload, model: unsynced_project, path: avatar)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('avatar', avatar_in_project_in_synced_group.id).once.and_return(spy)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('avatar', avatar_in_synced_group.id).once.and_return(spy)
subject.perform
end
end
end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::MigratedLocalFilesCleanUpWorker, :geo, :geo_fdw, :use_sql_query_cache_for_tracking_db do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
let(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
let(:secondary) { create(:geo_node, :local_storage_only) }
let(:synced_group) { create(:group) }
let(:synced_project) { create(:project, group: synced_group) }
let(:unsynced_project) { create(:project) }
let(:project_broken_storage) { create(:project, :broken_storage) }
before do
stub_current_geo_node(secondary)
stub_exclusive_lease(renew: true)
end
it 'does not run when node is disabled' do
secondary.update_column(:enabled, false)
expect(subject).not_to receive(:try_obtain_lease)
subject.perform
end
it 'does not run when sync_object_storage is enabled' do
secondary.update_column(:sync_object_storage, true)
expect(subject).not_to receive(:try_obtain_lease)
subject.perform
end
# Exercises the worker against one upload of each kind, each paired with an
# upload registry row, under local storage, remote storage and selective sync.
context 'with attachments' do
  let(:avatar_upload) { create(:upload) }
  let(:personal_snippet_upload) { create(:upload, :personal_snippet_upload) }
  let(:issuable_upload) { create(:upload, :issuable_upload) }
  let(:namespace_upload) { create(:upload, :namespace_upload) }
  let(:attachment_upload) { create(:upload, :attachment_upload) }
  let(:favicon_upload) { create(:upload, :favicon_upload) }

  # Every upload above, in declaration order. Only referenced after the
  # `before` hook below has already created each of them.
  let(:all_uploads) do
    [
      avatar_upload,
      personal_snippet_upload,
      issuable_upload,
      namespace_upload,
      attachment_upload,
      favicon_upload
    ]
  end

  # Creates the upload registry row (with the given factory trait) that
  # points at +upload+.
  def track_upload_in_registry(registry_trait, upload)
    create(:geo_upload_registry, registry_trait, file_id: upload.id)
  end

  before do
    track_upload_in_registry(:avatar, avatar_upload)
    track_upload_in_registry(:personal_file, personal_snippet_upload)
    track_upload_in_registry(:file, issuable_upload)
    track_upload_in_registry(:namespace_file, namespace_upload)
    track_upload_in_registry(:attachment, attachment_upload)
    track_upload_in_registry(:favicon, favicon_upload)
  end

  it 'schedules nothing for attachments stored locally' do
    all_uploads.each do |upload|
      expect(subject).not_to receive(:schedule_job).with(anything, upload.id)
    end

    subject.perform
  end

  context 'attachments stored remotely' do
    before do
      [
        AvatarUploader,
        PersonalFileUploader,
        FileUploader,
        NamespaceFileUploader,
        AttachmentUploader,
        FaviconUploader
      ].each { |uploader_class| stub_uploads_object_storage(uploader_class) }

      # Flag every upload as stored remotely.
      all_uploads.each { |upload| upload.update_column(:store, FileUploader::Store::REMOTE) }
    end

    it 'schedules workers for uploads stored remotely and synced locally' do
      {
        'avatar' => avatar_upload,
        'personal_file' => personal_snippet_upload,
        'file' => issuable_upload,
        'namespace_file' => namespace_upload,
        'attachment' => attachment_upload,
        'favicon' => favicon_upload
      }.each do |object_type, upload|
        expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with(object_type, upload.id)
      end

      subject.perform
    end

    context 'with selective sync by namespace' do
      let(:issuable_upload_synced_group) { create(:upload, :issuable_upload, model: synced_project) }
      let(:secondary) do
        create(:geo_node, :local_storage_only, selective_sync_type: 'namespaces', namespaces: [synced_group])
      end

      before do
        track_upload_in_registry(:file, issuable_upload_synced_group)
        issuable_upload_synced_group.update_column(:store, FileUploader::Store::REMOTE)
      end

      it 'schedules workers for uploads stored remotely and synced locally' do
        scheduled = {
          'file' => issuable_upload_synced_group,
          'favicon' => favicon_upload,
          'personal_file' => personal_snippet_upload,
          'attachment' => attachment_upload
        }
        skipped = {
          'avatar' => avatar_upload,
          'file' => issuable_upload,
          'namespace_file' => namespace_upload
        }

        scheduled.each do |object_type, upload|
          expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with(object_type, upload.id)
        end
        skipped.each do |object_type, upload|
          expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(object_type, upload.id)
        end

        subject.perform
      end
    end

    context 'with selective sync by shard' do
      let(:issuable_upload_synced_group) { create(:upload, :issuable_upload, model: project_broken_storage) }
      let(:secondary) do
        create(:geo_node, :local_storage_only, selective_sync_type: 'shards', selective_sync_shards: ['broken'])
      end

      before do
        track_upload_in_registry(:file, issuable_upload_synced_group)
        issuable_upload_synced_group.update_column(:store, FileUploader::Store::REMOTE)
      end

      it 'schedules workers for uploads stored remotely and synced locally' do
        scheduled = {
          'file' => issuable_upload_synced_group,
          'favicon' => favicon_upload,
          'personal_file' => personal_snippet_upload,
          'attachment' => attachment_upload
        }
        skipped = {
          'avatar' => avatar_upload,
          'file' => issuable_upload,
          'namespace_file' => namespace_upload
        }

        scheduled.each do |object_type, upload|
          expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with(object_type, upload.id)
        end
        skipped.each do |object_type, upload|
          expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(object_type, upload.id)
        end

        subject.perform
      end
    end
  end
end
# Covers the cache-backed "skip" flag: it is written when a run finds nothing
# pending, and no removal jobs are enqueued while it reads as set.
context 'backoff time' do
  let(:cache_key) { "#{described_class.name.underscore}:skip" }

  before do
    stub_uploads_object_storage(AvatarUploader)

    # Let cache reads and writes fall through to the real implementation
    # unless an example sets a stricter expectation.
    %i[read write].each do |cache_operation|
      allow(Rails.cache).to receive(cache_operation).and_call_original
    end
  end

  it 'sets the back off time when there are no pending items' do
    expect(Rails.cache)
      .to receive(:write)
      .with(cache_key, true, expires_in: 300.seconds)
      .once

    subject.perform
  end

  it 'does not perform Geo::FileRegistryRemovalWorker when the backoff time is set' do
    create(:geo_upload_registry, :avatar)

    expect(Rails.cache)
      .to receive(:read)
      .with(cache_key)
      .and_return(true)
    expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async)

    subject.perform
  end
end
end
...@@ -97,27 +97,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -97,27 +97,6 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1) expect(Geo::PackageFileRegistry.where(package_file_id: package_file.id).count).to eq(1)
end end
# With the geo_file_registry_ssot_sync flag off, the worker should report a
# falsey result and never build a consistency service for upload registries.
context 'when geo_file_registry_ssot_sync is disabled' do
  before { stub_feature_flags(geo_file_registry_ssot_sync: false) }

  it 'returns false' do
    expect(subject.perform).to be_falsey
  end

  it 'does not execute RegistryConsistencyService for Uploads' do
    # The other registry classes may still be processed as usual.
    [
      Geo::JobArtifactRegistry,
      Geo::LfsObjectRegistry,
      Geo::PackageFileRegistry,
      Geo::ProjectRegistry
    ].each do |registry_class|
      allow(Geo::RegistryConsistencyService)
        .to receive(:new)
        .with(registry_class, batch_size: 1000)
        .and_call_original
    end

    expect(Geo::RegistryConsistencyService)
      .not_to receive(:new)
      .with(Geo::UploadRegistry, batch_size: 1000)

    subject.perform
  end
end
context 'when geo_project_registry_ssot_sync is disabled' do context 'when geo_project_registry_ssot_sync is disabled' do
before do before do
stub_feature_flags(geo_project_registry_ssot_sync: false) stub_feature_flags(geo_project_registry_ssot_sync: false)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment