Commit d7d65763 authored by Michael Kozono's avatar Michael Kozono

Merge branch '223104-geo-refactor-finders-part-II' into 'master'

Geo - Refactor finders (Part II)

See merge request gitlab-org/gitlab!40542
parents 394217b4 f9ae015f
...@@ -2,50 +2,6 @@ ...@@ -2,50 +2,6 @@
module Geo module Geo
class AttachmentRegistryFinder < FileRegistryFinder class AttachmentRegistryFinder < FileRegistryFinder
# Returns untracked uploads as well as tracked uploads that are unused.
#
# Untracked uploads is an array where each item is a tuple of [id, file_type]
# that is supposed supposed to be synced but don't yet have a registry entry.
#
# Unused uploads is an array where each item is a tuple of [id, file_type]
# that is not supposed to be synced but already have a registry entry. For
# example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked uploads, and the
# second element is an Array of tracked uploads that are unused.
# For example: [[[1, 'avatar'], [5, 'file']], [[3, 'attachment']]]
def find_registry_differences(range)
# rubocop:disable CodeReuse/ActiveRecord
source =
replicables
.id_in(range)
.pluck(::Upload.arel_table[:id], ::Upload.arel_table[:uploader])
.map! { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
tracked =
registry_class
.model_id_in(range)
.pluck(:file_id, :file_type)
# rubocop:enable CodeReuse/ActiveRecord
untracked = source - tracked
unused_tracked = tracked - source
[untracked, unused_tracked]
end
def replicables
::Upload.replicables_for_geo_node
end
def registry_class def registry_class
Geo::UploadRegistry Geo::UploadRegistry
end end
......
...@@ -2,48 +2,6 @@ ...@@ -2,48 +2,6 @@
module Geo module Geo
class ContainerRepositoryRegistryFinder < RegistryFinder class ContainerRepositoryRegistryFinder < RegistryFinder
# Returns Geo::ContainerRepositoryRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
registry_class
.never_synced
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: [])
registry_class
.failed
.retry_due
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_synced_at))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
private
def replicables
current_node.container_repositories
end
def registry_class def registry_class
Geo::ContainerRepositoryRegistry Geo::ContainerRepositoryRegistry
end end
......
...@@ -2,47 +2,6 @@ ...@@ -2,47 +2,6 @@
module Geo module Geo
class DesignRegistryFinder < RegistryFinder class DesignRegistryFinder < RegistryFinder
# Returns Geo::DesignRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_dirty_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
registry_class
.never_synced
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: [])
registry_class
.updated_recently
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_synced_at))
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
private
def replicables
current_node.designs
end
def registry_class def registry_class
Geo::DesignRegistry Geo::DesignRegistry
end end
......
...@@ -2,59 +2,13 @@ ...@@ -2,59 +2,13 @@
module Geo module Geo
class FileRegistryFinder < RegistryFinder class FileRegistryFinder < RegistryFinder
# @!method count_synced_missing_on_primary # @!method synced_missing_on_primary_count
# Return a count of the registry records for the tracked file_type(s) # Return a count of the registry records for the tracked file_type(s)
# that are synced and missing on the primary # that are synced and missing on the primary
def count_synced_missing_on_primary def synced_missing_on_primary_count
registry_class.synced.missing_on_primary.count registry_class.synced.missing_on_primary.count
end end
# @!method find_never_synced_registries
# Return an ActiveRecord::Relation of the registry records for the
# tracked file_type(s) that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_failed_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
#
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
registry_class
.never
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# @!method find_retryable_failed_registries
# Return an ActiveRecord::Relation of registry records marked as failed,
# which are ready to be retried, excluding specified IDs, limited to
# batch_size
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
#
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_ids: [])
registry_class
.failed
.retry_due
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# @!method find_retryable_synced_missing_on_primary_registries # @!method find_retryable_synced_missing_on_primary_registries
# Return an ActiveRecord::Relation of registry records marked as synced # Return an ActiveRecord::Relation of registry records marked as synced
# and missing on the primary, which are ready to be retried, excluding # and missing on the primary, which are ready to be retried, excluding
...@@ -73,9 +27,5 @@ module Geo ...@@ -73,9 +27,5 @@ module Geo
.limit(batch_size) .limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
def local_storage_only?
!current_node&.sync_object_storage
end
end end
end end
...@@ -2,10 +2,6 @@ ...@@ -2,10 +2,6 @@
module Geo module Geo
class JobArtifactRegistryFinder < FileRegistryFinder class JobArtifactRegistryFinder < FileRegistryFinder
def replicables
::Ci::JobArtifact.replicables_for_geo_node
end
def registry_class def registry_class
Geo::JobArtifactRegistry Geo::JobArtifactRegistry
end end
......
...@@ -2,18 +2,8 @@ ...@@ -2,18 +2,8 @@
module Geo module Geo
class LfsObjectRegistryFinder < FileRegistryFinder class LfsObjectRegistryFinder < FileRegistryFinder
def replicables
local_storage_only? ? lfs_objects.with_files_stored_locally : lfs_objects
end
def registry_class def registry_class
Geo::LfsObjectRegistry Geo::LfsObjectRegistry
end end
private
def lfs_objects
current_node.lfs_objects
end
end end
end end
...@@ -2,16 +2,16 @@ ...@@ -2,16 +2,16 @@
module Geo module Geo
class ProjectRegistryFinder class ProjectRegistryFinder
# Returns ProjectRegistry records that have never been synced. # Returns ProjectRegistry records where sync has never been attempted.
# #
# Does not care about selective sync, because it considers the Registry # Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other # table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should # processes need to ensure that the table only contains records that should
# be synced. # be synced.
# #
# Any registries that have ever been synced that currently need to be # Any registries that this secondary has ever attempted to sync that currently need to be
# resynced will be handled by other find methods (like # resynced will be handled by other find methods (like
# #find_retryable_dirty_registries) # #find_registries_needs_sync_again)
# #
# You can pass a list with `except_ids:` so you can exclude items you # You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
...@@ -19,28 +19,22 @@ module Geo ...@@ -19,28 +19,22 @@ module Geo
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query # @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: []) def find_registries_never_attempted_sync(batch_size:, except_ids: [])
Geo::ProjectRegistry registry_class
.never_synced .find_registries_never_attempted_sync(batch_size: batch_size, except_ids: except_ids)
.model_id_not_in(except_ids)
.limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_retryable_dirty_registries(batch_size:, except_ids: []) def find_registries_needs_sync_again(batch_size:, except_ids: [])
Geo::ProjectRegistry registry_class
.dirty .find_registries_needs_sync_again(batch_size: batch_size, except_ids: except_ids)
.retry_due
.model_id_not_in(except_ids)
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at))
.limit(batch_size)
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def find_project_ids_pending_verification(batch_size:, except_ids: []) def find_project_ids_pending_verification(batch_size:, except_ids: [])
Geo::ProjectRegistry registry_class
.from_union([ .from_union([
repositories_checksummed_pending_verification, repositories_checksummed_pending_verification,
wikis_checksummed_pending_verification wikis_checksummed_pending_verification
...@@ -53,19 +47,23 @@ module Geo ...@@ -53,19 +47,23 @@ module Geo
private private
def registry_class
Geo::ProjectRegistry
end
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def repositories_checksummed_pending_verification def repositories_checksummed_pending_verification
Geo::ProjectRegistry registry_class
.repositories_checksummed_pending_verification .repositories_checksummed_pending_verification
.select(Geo::ProjectRegistry.arel_table[:project_id]) .select(registry_class.arel_table[:project_id])
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord # rubocop:disable CodeReuse/ActiveRecord
def wikis_checksummed_pending_verification def wikis_checksummed_pending_verification
Geo::ProjectRegistry registry_class
.wikis_checksummed_pending_verification .wikis_checksummed_pending_verification
.select(Geo::ProjectRegistry.arel_table[:project_id]) .select(registry_class.arel_table[:project_id])
end end
# rubocop:enable CodeReuse/ActiveRecord # rubocop:enable CodeReuse/ActiveRecord
end end
......
...@@ -2,56 +2,46 @@ ...@@ -2,56 +2,46 @@
module Geo module Geo
class RegistryFinder class RegistryFinder
include ::Gitlab::Utils::StrongMemoize # @!method find_registries_never_attempted_sync
# Return an ActiveRecord::Relation of the registry records for the
attr_reader :current_node_id # tracked type that this secondary has never attempted to sync.
def initialize(current_node_id: nil)
@current_node_id = current_node_id
end
# @!method find_registry_differences
# Returns untracked IDs as well as tracked IDs that are unused.
# #
# Untracked IDs are model IDs that are supposed to be synced but don't yet # Does not care about selective sync, because it considers the Registry
# have a registry entry. # table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
# #
# Unused tracked IDs are model IDs that are not supposed to be synced but # Any registries that this secondary has ever attempted to sync that currently need to be
# already have a registry entry. For example: # resynced will be handled by other find methods (like
# #find_registries_needs_sync_again)
# #
# - orphaned registries # You can pass a list with `except_ids:` so you can exclude items you
# - records that became excluded from selective sync # already scheduled but haven't finished and aren't persisted to the database yet
# - records that are in object storage, and `sync_object_storage` became
# disabled
# #
# We compute both sets in this method to reduce the number of DB queries # @param [Integer] batch_size used to limit the results returned
# performed. # @param [Array<Integer>] except_ids ids that will be ignored from the query
# #
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused # rubocop:disable CodeReuse/ActiveRecord
def find_registry_differences(range) def find_registries_never_attempted_sync(batch_size:, except_ids: [])
source_ids = replicables.id_in(range).pluck(replicable_primary_key) # rubocop:disable CodeReuse/ActiveRecord registry_class
tracked_ids = registry_class.pluck_model_ids_in_range(range) .find_registries_never_attempted_sync(batch_size: batch_size, except_ids: except_ids)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end
# @!method registry_class
# Return an ActiveRecord::Base class for the tracked type
def registry_class
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
end end
# rubocop:enable CodeReuse/ActiveRecord
# @!method replicables # @!method find_registries_needs_sync_again
# Return an ActiveRecord::Relation of the replicable records for the # Return an ActiveRecord::Relation of registry records marked as failed,
# tracked file_type(s) # which are ready to be retried, excluding specified IDs, limited to
def replicables # batch_size
raise NotImplementedError, #
"#{self.class} does not implement #{__method__}" # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
#
# rubocop:disable CodeReuse/ActiveRecord
def find_registries_needs_sync_again(batch_size:, except_ids: [])
registry_class
.find_registries_needs_sync_again(batch_size: batch_size, except_ids: except_ids)
end end
# rubocop:enable CodeReuse/ActiveRecord
# @!method registry_count # @!method registry_count
# Return a count of the registry records for the tracked type(s) # Return a count of the registry records for the tracked type(s)
...@@ -73,19 +63,11 @@ module Geo ...@@ -73,19 +63,11 @@ module Geo
registry_class.failed.count registry_class.failed.count
end end
private
def current_node
strong_memoize(:current_node) do
GeoNode.find(current_node_id) if current_node_id
end
end
# @!method registry_class # @!method registry_class
# Return the fully qualified name of the replicable primary key for the # Return an ActiveRecord::Base class for the tracked type
# tracked file_type(s) def registry_class
def replicable_primary_key raise NotImplementedError,
registry_class::MODEL_CLASS.arel_table[:id] "#{self.class} does not implement #{__method__}"
end end
end end
end end
...@@ -35,12 +35,13 @@ module Geo::ReplicableRegistry ...@@ -35,12 +35,13 @@ module Geo::ReplicableRegistry
included do included do
include ::Delay include ::Delay
scope :never, -> { where(last_synced_at: nil) }
scope :failed, -> { with_state(:failed) } scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) } scope :needs_sync_again, -> { failed.retry_due }
scope :never_attempted_sync, -> { pending.where(last_synced_at: nil) }
scope :ordered, -> { order(:id) }
scope :pending, -> { with_state(:pending) } scope :pending, -> { with_state(:pending) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) } scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
scope :ordered, -> { order(:id) } scope :synced, -> { with_state(:synced) }
state_machine :state, initial: :pending do state_machine :state, initial: :pending do
state :pending, value: STATE_VALUES[:pending] state :pending, value: STATE_VALUES[:pending]
......
...@@ -4,9 +4,11 @@ module Geo::Syncable ...@@ -4,9 +4,11 @@ module Geo::Syncable
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
scope :failed, -> { where(success: false) } scope :failed, -> { where(success: false).where.not(retry_count: nil) }
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.current) }
scope :missing_on_primary, -> { where(missing_on_primary: true) } scope :missing_on_primary, -> { where(missing_on_primary: true) }
scope :needs_sync_again, -> { failed.retry_due }
scope :never_attempted_sync, -> { where(success: false, retry_count: nil) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.current) }
scope :synced, -> { where(success: true) }
end end
end end
...@@ -8,6 +8,12 @@ module EE ...@@ -8,6 +8,12 @@ module EE
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) } scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
end end
class_methods do
def replicables_for_geo_node(node = ::Gitlab::Geo.current_node)
node.container_repositories
end
end
def push_blob(digest, file_path) def push_blob(digest, file_path)
client.push_blob(path, digest, file_path) client.push_blob(path, digest, file_path)
end end
......
...@@ -16,6 +16,13 @@ module EE ...@@ -16,6 +16,13 @@ module EE
scope :project_id_in, ->(ids) { joins(:projects).merge(::Project.id_in(ids)) } scope :project_id_in, ->(ids) { joins(:projects).merge(::Project.id_in(ids)) }
end end
class_methods do
def replicables_for_geo_node(node = ::Gitlab::Geo.current_node)
local_storage_only = !node&.sync_object_storage
local_storage_only ? node.lfs_objects.with_files_stored_locally : node.lfs_objects
end
end
def log_geo_deleted_event def log_geo_deleted_event
::Geo::LfsObjectDeletedEventStore.new(self).create! ::Geo::LfsObjectDeletedEventStore.new(self).create!
end end
......
...@@ -192,6 +192,10 @@ module EE ...@@ -192,6 +192,10 @@ module EE
class_methods do class_methods do
extend ::Gitlab::Utils::Override extend ::Gitlab::Utils::Override
def replicables_for_geo_node(node = ::Gitlab::Geo.current_node)
node.projects
end
def search_by_visibility(level) def search_by_visibility(level)
where(visibility_level: ::Gitlab::VisibilityLevel.string_options[level]) where(visibility_level: ::Gitlab::VisibilityLevel.string_options[level])
end end
......
...@@ -41,31 +41,14 @@ class Geo::BaseRegistry < Geo::TrackingBase ...@@ -41,31 +41,14 @@ class Geo::BaseRegistry < Geo::TrackingBase
end end
end end
def self.replicator_class
self::MODEL_CLASS.replicator_class
end
def self.find_unsynced_registries(batch_size:, except_ids: [])
pending
.model_id_not_in(except_ids)
.limit(batch_size)
end
def self.find_failed_registries(batch_size:, except_ids: [])
failed
.retry_due
.model_id_not_in(except_ids)
.limit(batch_size)
end
def self.has_create_events?
true
end
def self.delete_worker_class def self.delete_worker_class
::Geo::FileRegistryRemovalWorker ::Geo::FileRegistryRemovalWorker
end end
def self.replicator_class
self::MODEL_CLASS.replicator_class
end
def self.find_registry_differences(range) def self.find_registry_differences(range)
source_ids = self::MODEL_CLASS source_ids = self::MODEL_CLASS
.replicables_for_geo_node .replicables_for_geo_node
...@@ -80,6 +63,22 @@ class Geo::BaseRegistry < Geo::TrackingBase ...@@ -80,6 +63,22 @@ class Geo::BaseRegistry < Geo::TrackingBase
[untracked_ids, unused_tracked_ids] [untracked_ids, unused_tracked_ids]
end end
def self.find_registries_never_attempted_sync(batch_size:, except_ids: [])
never_attempted_sync
.model_id_not_in(except_ids)
.limit(batch_size)
end
def self.find_registries_needs_sync_again(batch_size:, except_ids: [])
needs_sync_again
.model_id_not_in(except_ids)
.limit(batch_size)
end
def self.has_create_events?
true
end
def model_record_id def model_record_id
read_attribute(self.class::MODEL_FOREIGN_KEY) read_attribute(self.class::MODEL_FOREIGN_KEY)
end end
......
...@@ -8,10 +8,11 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry ...@@ -8,10 +8,11 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
belongs_to :container_repository belongs_to :container_repository
scope :never_synced, -> { with_state(:pending).where(last_synced_at: nil) }
scope :failed, -> { with_state(:failed) } scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) } scope :needs_sync_again, -> { failed.retry_due }
scope :never_attempted_sync, -> { with_state(:pending).where(last_synced_at: nil) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) } scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
scope :synced, -> { with_state(:synced) }
state_machine :state, initial: :pending do state_machine :state, initial: :pending do
state :started state :started
...@@ -37,12 +38,8 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry ...@@ -37,12 +38,8 @@ class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
end end
end end
def self.finder_class def self.find_registries_needs_sync_again(batch_size:, except_ids: [])
::Geo::ContainerRepositoryRegistryFinder super.order(Gitlab::Database.nulls_first_order(:last_synced_at))
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end end
def self.delete_for_model_ids(container_repository_ids) def self.delete_for_model_ids(container_repository_ids)
......
...@@ -10,11 +10,12 @@ class Geo::DesignRegistry < Geo::BaseRegistry ...@@ -10,11 +10,12 @@ class Geo::DesignRegistry < Geo::BaseRegistry
belongs_to :project belongs_to :project
scope :never_synced, -> { with_state(:pending).where(last_synced_at: nil) } scope :dirty, -> { with_state(:pending).where.not(last_synced_at: nil) }
scope :pending, -> { with_state(:pending) }
scope :failed, -> { with_state(:failed) } scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) } scope :needs_sync_again, -> { dirty.or(failed.retry_due) }
scope :never_attempted_sync, -> { with_state(:pending).where(last_synced_at: nil) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) } scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
scope :synced, -> { with_state(:synced) }
state_machine :state, initial: :pending do state_machine :state, initial: :pending do
state :started state :started
...@@ -50,12 +51,18 @@ class Geo::DesignRegistry < Geo::BaseRegistry ...@@ -50,12 +51,18 @@ class Geo::DesignRegistry < Geo::BaseRegistry
project_ids project_ids
end end
def self.finder_class def self.find_registry_differences(range)
::Geo::DesignRegistryFinder source_ids = Gitlab::Geo.current_node.designs.id_in(range).pluck_primary_key
tracked_ids = self.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end end
def self.find_registry_differences(range) def self.find_registries_needs_sync_again(batch_size:, except_ids: [])
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range) super.order(Gitlab::Database.nulls_first_order(:last_synced_at))
end end
# Search for a list of projects associated with registries, # Search for a list of projects associated with registries,
...@@ -75,10 +82,6 @@ class Geo::DesignRegistry < Geo::BaseRegistry ...@@ -75,10 +82,6 @@ class Geo::DesignRegistry < Geo::BaseRegistry
designs_repositories designs_repositories
end end
def self.updated_recently
pending.or(failed.retry_due)
end
def fail_sync!(message, error, attrs = {}) def fail_sync!(message, error, attrs = {})
new_retry_count = retry_count + 1 new_retry_count = retry_count + 1
......
...@@ -6,20 +6,6 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry ...@@ -6,20 +6,6 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
MODEL_CLASS = ::Ci::JobArtifact MODEL_CLASS = ::Ci::JobArtifact
MODEL_FOREIGN_KEY = :artifact_id MODEL_FOREIGN_KEY = :artifact_id
scope :never, -> { where(success: false, retry_count: nil) }
def self.failed
where(success: false).where.not(retry_count: nil)
end
def self.finder_class
::Geo::JobArtifactRegistryFinder
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
# When false, RegistryConsistencyService will frequently check the end of the # When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables. # table to quickly handle new replicables.
def self.has_create_events? def self.has_create_events?
......
...@@ -11,20 +11,6 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry ...@@ -11,20 +11,6 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry
belongs_to :lfs_object, class_name: 'LfsObject' belongs_to :lfs_object, class_name: 'LfsObject'
scope :never, -> { where(success: false, retry_count: nil) }
def self.failed
where(success: false).where.not(retry_count: nil)
end
def self.finder_class
::Geo::LfsObjectRegistryFinder
end
def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range)
end
# If false, RegistryConsistencyService will frequently check the end of the # If false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables. # table to quickly handle new replicables.
def self.has_create_events? def self.has_create_events?
......
...@@ -21,8 +21,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -21,8 +21,9 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
validates :project, presence: true, uniqueness: true validates :project, presence: true, uniqueness: true
scope :never_synced, -> { where(last_repository_synced_at: nil) }
scope :dirty, -> { where(arel_table[:resync_repository].eq(true).or(arel_table[:resync_wiki].eq(true))) } scope :dirty, -> { where(arel_table[:resync_repository].eq(true).or(arel_table[:resync_wiki].eq(true))) }
scope :needs_sync_again, -> { dirty.retry_due }
scope :never_attempted_sync, -> { where(last_repository_synced_at: nil) }
scope :synced_repos, -> { where(resync_repository: false) } scope :synced_repos, -> { where(resync_repository: false) }
scope :synced_wikis, -> { where(resync_wiki: false) } scope :synced_wikis, -> { where(resync_wiki: false) }
scope :failed_repos, -> { where(arel_table[:repository_retry_count].gt(0)) } scope :failed_repos, -> { where(arel_table[:repository_retry_count].gt(0)) }
...@@ -43,14 +44,8 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -43,14 +44,8 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
where(nil).pluck(:project_id) where(nil).pluck(:project_id)
end end
def self.find_registry_differences(range) def self.find_registries_needs_sync_again(batch_size:, except_ids: [])
source_ids = Gitlab::Geo.current_node.projects.id_in(range).pluck_primary_key super.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at))
tracked_ids = self.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
[untracked_ids, unused_tracked_ids]
end end
def self.delete_worker_class def self.delete_worker_class
......
...@@ -10,16 +10,43 @@ class Geo::UploadRegistry < Geo::BaseRegistry ...@@ -10,16 +10,43 @@ class Geo::UploadRegistry < Geo::BaseRegistry
belongs_to :upload, foreign_key: :file_id belongs_to :upload, foreign_key: :file_id
scope :failed, -> { where(success: false).where.not(retry_count: nil) }
scope :fresh, -> { order(created_at: :desc) } scope :fresh, -> { order(created_at: :desc) }
scope :never, -> { where(success: false, retry_count: nil) }
def self.finder_class
::Geo::AttachmentRegistryFinder
end
# Returns untracked uploads as well as tracked uploads that are unused.
#
# Untracked uploads is an array where each item is a tuple of [id, file_type]
# that is supposed to be synced but don't yet have a registry entry.
#
# Unused uploads is an array where each item is a tuple of [id, file_type]
# that is not supposed to be synced but already have a registry entry. For
# example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked uploads, and the
# second element is an Array of tracked uploads that are unused.
# For example: [[[1, 'avatar'], [5, 'file']], [[3, 'attachment']]]
def self.find_registry_differences(range) def self.find_registry_differences(range)
finder_class.new(current_node_id: Gitlab::Geo.current_node.id).find_registry_differences(range) source =
self::MODEL_CLASS.replicables_for_geo_node
.id_in(range)
.pluck(self::MODEL_CLASS.arel_table[:id], self::MODEL_CLASS.arel_table[:uploader])
.map! { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
tracked =
self.model_id_in(range)
.pluck(:file_id, :file_type)
untracked = source - tracked
unused_tracked = tracked - source
[untracked, unused_tracked]
end end
# If false, RegistryConsistencyService will frequently check the end of the # If false, RegistryConsistencyService will frequently check the end of the
...@@ -52,9 +79,8 @@ class Geo::UploadRegistry < Geo::BaseRegistry ...@@ -52,9 +79,8 @@ class Geo::UploadRegistry < Geo::BaseRegistry
case status case status
when 'synced', 'failed' when 'synced', 'failed'
self.public_send(status) # rubocop: disable GitlabSecurity/PublicSend self.public_send(status) # rubocop: disable GitlabSecurity/PublicSend
# Explained via: https://gitlab.com/gitlab-org/gitlab/-/issues/216049
when 'pending' when 'pending'
self.never never_attempted_sync
else else
all all
end end
......
...@@ -483,7 +483,7 @@ class GeoNodeStatus < ApplicationRecord ...@@ -483,7 +483,7 @@ class GeoNodeStatus < ApplicationRecord
self.lfs_objects_synced_count = lfs_objects_finder.synced_count self.lfs_objects_synced_count = lfs_objects_finder.synced_count
self.lfs_objects_failed_count = lfs_objects_finder.failed_count self.lfs_objects_failed_count = lfs_objects_finder.failed_count
self.lfs_objects_registry_count = lfs_objects_finder.registry_count self.lfs_objects_registry_count = lfs_objects_finder.registry_count
self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.count_synced_missing_on_primary self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.synced_missing_on_primary_count
end end
def load_job_artifacts_data def load_job_artifacts_data
...@@ -493,7 +493,7 @@ class GeoNodeStatus < ApplicationRecord ...@@ -493,7 +493,7 @@ class GeoNodeStatus < ApplicationRecord
self.job_artifacts_synced_count = job_artifacts_finder.synced_count self.job_artifacts_synced_count = job_artifacts_finder.synced_count
self.job_artifacts_failed_count = job_artifacts_finder.failed_count self.job_artifacts_failed_count = job_artifacts_finder.failed_count
self.job_artifacts_registry_count = job_artifacts_finder.registry_count self.job_artifacts_registry_count = job_artifacts_finder.registry_count
self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.count_synced_missing_on_primary self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.synced_missing_on_primary_count
end end
def load_attachments_data def load_attachments_data
...@@ -503,7 +503,7 @@ class GeoNodeStatus < ApplicationRecord ...@@ -503,7 +503,7 @@ class GeoNodeStatus < ApplicationRecord
self.attachments_synced_count = attachments_finder.synced_count self.attachments_synced_count = attachments_finder.synced_count
self.attachments_failed_count = attachments_finder.failed_count self.attachments_failed_count = attachments_finder.failed_count
self.attachments_registry_count = attachments_finder.registry_count self.attachments_registry_count = attachments_finder.registry_count
self.attachments_synced_missing_on_primary_count = attachments_finder.count_synced_missing_on_primary self.attachments_synced_missing_on_primary_count = attachments_finder.synced_missing_on_primary_count
end end
def load_container_registry_data def load_container_registry_data
...@@ -581,23 +581,23 @@ class GeoNodeStatus < ApplicationRecord ...@@ -581,23 +581,23 @@ class GeoNodeStatus < ApplicationRecord
end end
def attachments_finder def attachments_finder
@attachments_finder ||= Geo::AttachmentRegistryFinder.new(current_node_id: geo_node.id) @attachments_finder ||= Geo::AttachmentRegistryFinder.new
end end
def lfs_objects_finder def lfs_objects_finder
@lfs_objects_finder ||= Geo::LfsObjectRegistryFinder.new(current_node_id: geo_node.id) @lfs_objects_finder ||= Geo::LfsObjectRegistryFinder.new
end end
def job_artifacts_finder def job_artifacts_finder
@job_artifacts_finder ||= Geo::JobArtifactRegistryFinder.new(current_node_id: geo_node.id) @job_artifacts_finder ||= Geo::JobArtifactRegistryFinder.new
end end
def container_registry_finder def container_registry_finder
@container_registry_finder ||= Geo::ContainerRepositoryRegistryFinder.new(current_node_id: geo_node.id) @container_registry_finder ||= Geo::ContainerRepositoryRegistryFinder.new
end end
def design_registry_finder def design_registry_finder
@design_registry_finder ||= Geo::DesignRegistryFinder.new(current_node_id: geo_node.id) @design_registry_finder ||= Geo::DesignRegistryFinder.new
end end
def repository_verification_finder def repository_verification_finder
......
...@@ -36,30 +36,30 @@ module Geo ...@@ -36,30 +36,30 @@ module Geo
# #
# @return [Array] resources to be transferred # @return [Array] resources to be transferred
def load_pending_resources def load_pending_resources
resources = find_container_repository_ids_not_synced(batch_size: db_retrieve_batch_size) resources = find_jobs_never_attempted_sync(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity == 0 if remaining_capacity == 0
resources resources
else else
resources + find_retryable_container_registry_ids(batch_size: remaining_capacity) resources + find_jobs_needs_sync_again(batch_size: remaining_capacity)
end end
end end
def find_container_repository_ids_not_synced(batch_size:) def find_jobs_never_attempted_sync(batch_size:)
registry_finder registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: scheduled_repository_ids) .find_registries_never_attempted_sync(batch_size: batch_size, except_ids: scheduled_repository_ids)
.pluck_model_foreign_key .pluck_model_foreign_key
end end
def find_retryable_container_registry_ids(batch_size:) def find_jobs_needs_sync_again(batch_size:)
registry_finder registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: scheduled_repository_ids) .find_registries_needs_sync_again(batch_size: batch_size, except_ids: scheduled_repository_ids)
.pluck_model_foreign_key .pluck_model_foreign_key
end end
def registry_finder def registry_finder
@registry_finder ||= Geo::ContainerRepositoryRegistryFinder.new(current_node_id: current_node.id) @registry_finder ||= Geo::ContainerRepositoryRegistryFinder.new
end end
end end
end end
...@@ -11,10 +11,10 @@ module Geo ...@@ -11,10 +11,10 @@ module Geo
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_not_synced(except_ids:, batch_size:) def find_jobs_never_attempted_sync(except_ids:, batch_size:)
project_ids = project_ids =
registry_finder registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: except_ids) .find_registries_never_attempted_sync(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key .pluck_model_foreign_key
find_project_ids_within_shard(project_ids, direction: :desc) find_project_ids_within_shard(project_ids, direction: :desc)
...@@ -22,10 +22,10 @@ module Geo ...@@ -22,10 +22,10 @@ module Geo
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_updated_recently(except_ids:, batch_size:) def find_jobs_needs_sync_again(except_ids:, batch_size:)
project_ids = project_ids =
registry_finder registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: except_ids) .find_registries_needs_sync_again(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key .pluck_model_foreign_key
find_project_ids_within_shard(project_ids, direction: :asc) find_project_ids_within_shard(project_ids, direction: :asc)
......
...@@ -33,7 +33,7 @@ module Geo ...@@ -33,7 +33,7 @@ module Geo
# #
# @return [Array] resources to be transferred # @return [Array] resources to be transferred
def load_pending_resources def load_pending_resources
resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size) resources = find_jobs_never_attempted_sync(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.count remaining_capacity = db_retrieve_batch_size - resources.count
if remaining_capacity == 0 if remaining_capacity == 0
...@@ -43,12 +43,13 @@ module Geo ...@@ -43,12 +43,13 @@ module Geo
end end
end end
# Get a batch of unsynced resources, taking equal parts from each resource. # Get a batch of resources that never have an attempt to sync, taking
# equal parts from each resource.
# #
# @return [Array] job arguments of unsynced resources # @return [Array] job arguments of resources that never have an attempt to sync
def find_unsynced_jobs(batch_size:) def find_jobs_never_attempted_sync(batch_size:)
jobs = job_finders.reduce([]) do |jobs, job_finder| jobs = job_finders.reduce([]) do |jobs, job_finder|
jobs << job_finder.find_unsynced_jobs(batch_size: batch_size) jobs << job_finder.find_jobs_never_attempted_sync(batch_size: batch_size)
end end
take_batch(*jobs, batch_size: batch_size) take_batch(*jobs, batch_size: batch_size)
...@@ -60,8 +61,8 @@ module Geo ...@@ -60,8 +61,8 @@ module Geo
# @return [Array] job arguments of low priority resources # @return [Array] job arguments of low priority resources
def find_low_priority_jobs(batch_size:) def find_low_priority_jobs(batch_size:)
jobs = job_finders.reduce([]) do |jobs, job_finder| jobs = job_finders.reduce([]) do |jobs, job_finder|
jobs << job_finder.find_failed_jobs(batch_size: batch_size) jobs << job_finder.find_jobs_needs_sync_again(batch_size: batch_size)
jobs << job_finder.find_synced_missing_on_primary_jobs(batch_size: batch_size) jobs << job_finder.find_jobs_synced_missing_on_primary(batch_size: batch_size)
end end
take_batch(*jobs, batch_size: batch_size) take_batch(*jobs, batch_size: batch_size)
......
...@@ -6,7 +6,7 @@ module Geo ...@@ -6,7 +6,7 @@ module Geo
EXCEPT_RESOURCE_IDS_KEY = :except_ids EXCEPT_RESOURCE_IDS_KEY = :except_ids
def registry_finder def registry_finder
@registry_finder ||= Geo::AttachmentRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id) @registry_finder ||= Geo::AttachmentRegistryFinder.new
end end
private private
......
...@@ -8,7 +8,7 @@ module Geo ...@@ -8,7 +8,7 @@ module Geo
FILE_SERVICE_OBJECT_TYPE = :job_artifact FILE_SERVICE_OBJECT_TYPE = :job_artifact
def registry_finder def registry_finder
@registry_finder ||= Geo::JobArtifactRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id) @registry_finder ||= Geo::JobArtifactRegistryFinder.new
end end
end end
end end
......
...@@ -21,19 +21,19 @@ module Geo ...@@ -21,19 +21,19 @@ module Geo
@scheduled_file_ids = scheduled_file_ids @scheduled_file_ids = scheduled_file_ids
end end
def find_unsynced_jobs(batch_size:) def find_jobs_never_attempted_sync(batch_size:)
convert_registry_relation_to_job_args( convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) registry_finder.find_registries_never_attempted_sync(find_batch_params(batch_size))
) )
end end
def find_failed_jobs(batch_size:) def find_jobs_needs_sync_again(batch_size:)
convert_registry_relation_to_job_args( convert_registry_relation_to_job_args(
registry_finder.find_retryable_failed_registries(find_batch_params(batch_size)) registry_finder.find_registries_needs_sync_again(find_batch_params(batch_size))
) )
end end
def find_synced_missing_on_primary_jobs(batch_size:) def find_jobs_synced_missing_on_primary(batch_size:)
convert_registry_relation_to_job_args( convert_registry_relation_to_job_args(
registry_finder.find_retryable_synced_missing_on_primary_registries(find_batch_params(batch_size)) registry_finder.find_retryable_synced_missing_on_primary_registries(find_batch_params(batch_size))
) )
......
...@@ -8,7 +8,7 @@ module Geo ...@@ -8,7 +8,7 @@ module Geo
FILE_SERVICE_OBJECT_TYPE = :lfs FILE_SERVICE_OBJECT_TYPE = :lfs
def registry_finder def registry_finder
@registry_finder ||= Geo::LfsObjectRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id) @registry_finder ||= Geo::LfsObjectRegistryFinder.new
end end
end end
end end
......
...@@ -32,25 +32,26 @@ module Geo ...@@ -32,25 +32,26 @@ module Geo
# #
# @return [Array] resources to be transferred # @return [Array] resources to be transferred
def load_pending_resources def load_pending_resources
resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size) resources = find_jobs_never_attempted_sync(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.count remaining_capacity = db_retrieve_batch_size - resources.count
if remaining_capacity == 0 if remaining_capacity == 0
resources resources
else else
resources + find_low_priority_jobs(batch_size: remaining_capacity) resources + find_jobs_needs_sync_again(batch_size: remaining_capacity)
end end
end end
# Get a batch of unsynced resources, taking equal parts from each resource. # Get a batch of resources that never have an attempt to sync, taking
# equal parts from each resource.
# #
# @return [Array] job arguments of unsynced resources # @return [Array] job arguments of resources that never have an attempt to sync
def find_unsynced_jobs(batch_size:) def find_jobs_never_attempted_sync(batch_size:)
jobs = replicator_classes.reduce([]) do |jobs, replicator_class| jobs = replicator_classes.reduce([]) do |jobs, replicator_class|
except_ids = scheduled_replicable_ids(replicator_class.replicable_name) except_ids = scheduled_replicable_ids(replicator_class.replicable_name)
jobs << replicator_class jobs << replicator_class
.find_unsynced_registries(batch_size: batch_size, except_ids: except_ids) .find_registries_never_attempted_sync(batch_size: batch_size, except_ids: except_ids)
.map { |registry| [replicator_class.replicable_name, registry.model_record_id] } .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
end end
...@@ -61,12 +62,12 @@ module Geo ...@@ -61,12 +62,12 @@ module Geo
# equal parts from each resource. # equal parts from each resource.
# #
# @return [Array] job arguments of low priority resources # @return [Array] job arguments of low priority resources
def find_low_priority_jobs(batch_size:) def find_jobs_needs_sync_again(batch_size:)
jobs = replicator_classes.reduce([]) do |jobs, replicator_class| jobs = replicator_classes.reduce([]) do |jobs, replicator_class|
except_ids = scheduled_replicable_ids(replicator_class.replicable_name) except_ids = scheduled_replicable_ids(replicator_class.replicable_name)
jobs << replicator_class jobs << replicator_class
.find_failed_registries(batch_size: batch_size, except_ids: except_ids) .find_registries_needs_sync_again(batch_size: batch_size, except_ids: except_ids)
.map { |registry| [replicator_class.replicable_name, registry.model_record_id] } .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
end end
......
...@@ -64,21 +64,21 @@ module Geo ...@@ -64,21 +64,21 @@ module Geo
def load_pending_resources def load_pending_resources
return [] unless valid_shard? return [] unless valid_shard?
resources = find_project_ids_not_synced(except_ids: scheduled_project_ids, batch_size: db_retrieve_batch_size) resources = find_jobs_never_attempted_sync(except_ids: scheduled_project_ids, batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity == 0 if remaining_capacity == 0
resources resources
else else
resources + find_project_ids_updated_recently(except_ids: scheduled_project_ids + resources, batch_size: remaining_capacity) resources + find_jobs_needs_sync_again(except_ids: scheduled_project_ids + resources, batch_size: remaining_capacity)
end end
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_not_synced(except_ids:, batch_size:) def find_jobs_never_attempted_sync(except_ids:, batch_size:)
project_ids = project_ids =
registry_finder registry_finder
.find_never_synced_registries(batch_size: batch_size, except_ids: except_ids) .find_registries_never_attempted_sync(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key .pluck_model_foreign_key
find_project_ids_within_shard(project_ids, direction: :desc) find_project_ids_within_shard(project_ids, direction: :desc)
...@@ -86,10 +86,10 @@ module Geo ...@@ -86,10 +86,10 @@ module Geo
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_project_ids_updated_recently(except_ids:, batch_size:) def find_jobs_needs_sync_again(except_ids:, batch_size:)
project_ids = project_ids =
registry_finder registry_finder
.find_retryable_dirty_registries(batch_size: batch_size, except_ids: except_ids) .find_registries_needs_sync_again(batch_size: batch_size, except_ids: except_ids)
.pluck_model_foreign_key .pluck_model_foreign_key
find_project_ids_within_shard(project_ids, direction: :asc) find_project_ids_within_shard(project_ids, direction: :asc)
......
...@@ -22,7 +22,7 @@ module Gitlab ...@@ -22,7 +22,7 @@ module Gitlab
delegate :in_replicables_for_geo_node?, to: :model_record delegate :in_replicables_for_geo_node?, to: :model_record
class << self class << self
delegate :find_unsynced_registries, :find_failed_registries, to: :registry_class delegate :find_registries_never_attempted_sync, :find_registries_needs_sync_again, to: :registry_class
end end
# Declare supported event # Declare supported event
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::FileRegistryFinder, :geo do
context 'with abstract methods' do
%w[
replicables
registry_class
].each do |required_method|
it "requires subclasses to implement #{required_method}" do
expect { subject.send(required_method) }.to raise_error(NotImplementedError)
end
end
end
describe '#local_storage_only?' do
subject { described_class.new(current_node_id: geo_node.id) }
context 'sync_object_storage is enabled' do
let(:geo_node) { create(:geo_node, sync_object_storage: true) }
it 'returns false' do
expect(subject.local_storage_only?).to be_falsey
end
end
context 'sync_object_storage is disabled' do
let(:geo_node) { create(:geo_node, sync_object_storage: false) }
it 'returns true' do
expect(subject.local_storage_only?).to be_truthy
end
end
end
end
...@@ -17,29 +17,29 @@ RSpec.describe Geo::ProjectRegistryFinder, :geo do ...@@ -17,29 +17,29 @@ RSpec.describe Geo::ProjectRegistryFinder, :geo do
let_it_be(:registry_project_5) { create(:geo_project_registry, :wiki_dirty, project_id: project_5.id, last_repository_synced_at: 5.days.ago) } let_it_be(:registry_project_5) { create(:geo_project_registry, :wiki_dirty, project_id: project_5.id, last_repository_synced_at: 5.days.ago) }
let_it_be(:registry_project_6) { create(:geo_project_registry, project_id: project_6.id) } let_it_be(:registry_project_6) { create(:geo_project_registry, project_id: project_6.id) }
describe '#find_never_synced_registries' do describe '#find_registries_never_attempted_sync' do
it 'returns registries for projects that have never been synced' do it 'returns registries for projects that have never have an attempt to sync' do
registries = subject.find_never_synced_registries(batch_size: 10) registries = subject.find_registries_never_attempted_sync(batch_size: 10)
expect(registries).to match_ids(registry_project_3, registry_project_6) expect(registries).to match_ids(registry_project_3, registry_project_6)
end end
it 'excludes except_ids' do it 'excludes except_ids' do
registries = subject.find_never_synced_registries(batch_size: 10, except_ids: [project_3.id]) registries = subject.find_registries_never_attempted_sync(batch_size: 10, except_ids: [project_3.id])
expect(registries).to match_ids(registry_project_6) expect(registries).to match_ids(registry_project_6)
end end
end end
describe '#find_retryable_dirty_registries' do describe '#find_registries_needs_sync_again' do
it 'returns registries for projects that have been recently updated or that have never been synced' do it 'returns registries for dirty projects or that have failed to sync' do
registries = subject.find_retryable_dirty_registries(batch_size: 10) registries = subject.find_registries_needs_sync_again(batch_size: 10)
expect(registries).to match_ids(registry_project_2, registry_project_3, registry_project_4, registry_project_5, registry_project_6) expect(registries).to match_ids(registry_project_2, registry_project_3, registry_project_4, registry_project_5, registry_project_6)
end end
it 'excludes except_ids' do it 'excludes except_ids' do
registries = subject.find_retryable_dirty_registries(batch_size: 10, except_ids: [project_4.id, project_5.id, project_6.id]) registries = subject.find_registries_needs_sync_again(batch_size: 10, except_ids: [project_4.id, project_5.id, project_6.id])
expect(registries).to match_ids(registry_project_2, registry_project_3) expect(registries).to match_ids(registry_project_2, registry_project_3)
end end
......
...@@ -3,23 +3,23 @@ ...@@ -3,23 +3,23 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::ContainerRepositoryRegistry, :geo do RSpec.describe Geo::ContainerRepositoryRegistry, :geo do
include ::EE::GeoHelpers
it_behaves_like 'a BulkInsertSafe model', Geo::ContainerRepositoryRegistry do it_behaves_like 'a BulkInsertSafe model', Geo::ContainerRepositoryRegistry do
let(:valid_items_for_bulk_insertion) { build_list(:container_repository_registry, 10, created_at: Time.zone.now) } let(:valid_items_for_bulk_insertion) { build_list(:container_repository_registry, 10, created_at: Time.zone.now) }
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
end end
let_it_be(:registry) { create(:container_repository_registry) } it_behaves_like 'a Geo registry' do
let(:registry) { create(:container_repository_registry) }
end
describe 'relationships' do describe 'relationships' do
it { is_expected.to belong_to(:container_repository) } it { is_expected.to belong_to(:container_repository) }
end end
it_behaves_like 'a Geo registry' do
let(:registry) { create(:container_repository_registry) }
end
describe '#finish_sync!' do describe '#finish_sync!' do
let(:registry) { create(:container_repository_registry, :sync_started) } let_it_be(:registry) { create(:container_repository_registry, :sync_started) }
it 'finishes registry record' do it 'finishes registry record' do
registry.finish_sync! registry.finish_sync!
...@@ -58,6 +58,158 @@ RSpec.describe Geo::ContainerRepositoryRegistry, :geo do ...@@ -58,6 +58,158 @@ RSpec.describe Geo::ContainerRepositoryRegistry, :geo do
end end
end end
describe '.find_registry_differences' do
let_it_be(:secondary) { create(:geo_node) }
let_it_be(:synced_group) { create(:group) }
let_it_be(:nested_group) { create(:group, parent: synced_group) }
let_it_be(:project_synced_group) { create(:project, group: synced_group) }
let_it_be(:project_nested_group) { create(:project, group: nested_group) }
let_it_be(:project_broken_storage) { create(:project, :broken_storage) }
let_it_be(:container_repository_1) { create(:container_repository, project: project_synced_group) }
let_it_be(:container_repository_2) { create(:container_repository, project: project_nested_group) }
let_it_be(:container_repository_3) { create(:container_repository) }
let_it_be(:container_repository_4) { create(:container_repository) }
let_it_be(:container_repository_5) { create(:container_repository, project: project_broken_storage) }
let_it_be(:container_repository_6) { create(:container_repository, project: project_broken_storage) }
before do
stub_current_geo_node(secondary)
stub_registry_replication_config(enabled: true)
end
context 'untracked IDs' do
before do
create(:container_repository_registry, container_repository_id: container_repository_1.id)
create(:container_repository_registry, :sync_failed, container_repository_id: container_repository_3.id)
create(:container_repository_registry, container_repository_id: container_repository_5.id)
end
it 'includes container registries IDs without an entry on the tracking database' do
range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([container_repository_2.id, container_repository_4.id, container_repository_6.id])
end
it 'excludes container registries outside the ID range' do
untracked_ids, _ = described_class.find_registry_differences(container_repository_4.id..container_repository_6.id)
expect(untracked_ids).to match_array([container_repository_4.id, container_repository_6.id])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes container_registry IDs that projects are not in the selected namespaces' do
range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([container_repository_2.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes container_registry IDs that projects are not in the selected shards' do
range = ContainerRepository.minimum(:id)..ContainerRepository.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([container_repository_6.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:container_repository_registry, container_repository_id: container_repository_1.id) }
before do
container_repository_1.delete
end
it 'includes tracked IDs that do not exist in the model table' do
range = container_repository_1.id..container_repository_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([container_repository_1.id])
end
it 'excludes IDs outside the ID range' do
range = (container_repository_1.id + 1)..ContainerRepository.maximum(:id)
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked container_registry' do
context 'excluded from selective sync' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_3.id) }
it 'includes tracked container_registry IDs that exist but are not in a selectively synced project' do
range = container_repository_3.id..container_repository_3.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([container_repository_3.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_1.id) }
it 'excludes tracked container_registry IDs that are in selectively synced projects' do
range = container_repository_1.id..container_repository_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked container_registry' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_1.id) }
context 'excluded from selective sync' do
it 'includes tracked container_registry IDs that exist but are not in a selectively synced project' do
range = container_repository_1.id..container_repository_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([container_repository_1.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:container_repository_registry, container_repository_id: container_repository_5.id) }
it 'excludes tracked container_registry IDs that are in selectively synced projects' do
range = container_repository_5.id..container_repository_5.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
describe '.replication_enabled?' do describe '.replication_enabled?' do
it 'returns true when registry replication is enabled' do it 'returns true when registry replication is enabled' do
stub_geo_setting(registry_replication: { enabled: true }) stub_geo_setting(registry_replication: { enabled: true })
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::DesignRegistry, :geo do RSpec.describe Geo::DesignRegistry, :geo do
include ::EE::GeoHelpers
it_behaves_like 'a BulkInsertSafe model', Geo::DesignRegistry do it_behaves_like 'a BulkInsertSafe model', Geo::DesignRegistry do
let(:valid_items_for_bulk_insertion) { build_list(:geo_design_registry, 10, created_at: Time.zone.now) } let(:valid_items_for_bulk_insertion) { build_list(:geo_design_registry, 10, created_at: Time.zone.now) }
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
...@@ -16,6 +18,172 @@ RSpec.describe Geo::DesignRegistry, :geo do ...@@ -16,6 +18,172 @@ RSpec.describe Geo::DesignRegistry, :geo do
let(:registry) { create(:geo_design_registry) } let(:registry) { create(:geo_design_registry) }
end end
describe '.find_registry_differences' do
let_it_be(:secondary) { create(:geo_node) }
let_it_be(:synced_group) { create(:group) }
let_it_be(:nested_group) { create(:group, parent: synced_group) }
let_it_be(:project_1) { create(:project, group: synced_group) }
let_it_be(:project_2) { create(:project, group: nested_group) }
let_it_be(:project_3) { create(:project) }
let_it_be(:project_4) { create(:project) }
let_it_be(:project_5) { create(:project, :broken_storage) }
let_it_be(:project_6) { create(:project, :broken_storage) }
let_it_be(:project_7) { create(:project) }
before do
stub_current_geo_node(secondary)
end
before_all do
create(:design, project: project_1)
create(:design, project: project_2)
create(:design, project: project_3)
create(:design, project: project_4)
create(:design, project: project_5)
create(:design, project: project_6)
end
context 'untracked IDs' do
before do
create(:geo_design_registry, project_id: project_1.id)
create(:geo_design_registry, :sync_failed, project_id: project_3.id)
create(:geo_design_registry, project_id: project_5.id)
end
it 'includes project IDs without an entry on the tracking database' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_2.id, project_4.id, project_6.id])
end
it 'excludes projects outside the ID range' do
untracked_ids, _ = described_class.find_registry_differences(project_4.id..project_6.id)
expect(untracked_ids).to match_array([project_4.id, project_6.id])
end
it 'excludes projects without designs' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).not_to include([project_7])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes project IDs that are not in selectively synced projects' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_2.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes project IDs that are not in selectively synced projects' do
range = Project.minimum(:id)..Project.maximum(:id)
untracked_ids, _ = described_class.find_registry_differences(range)
expect(untracked_ids).to match_array([project_6.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:geo_design_registry, project_id: project_1.id) }
before do
project_1.delete
end
it 'includes tracked IDs that do not exist in the model table' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_1.id])
end
it 'excludes IDs outside the ID range' do
range = (project_1.id + 1)..Project.maximum(:id)
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked project' do
context 'excluded from selective sync' do
let!(:registry_entry) { create(:geo_design_registry, project_id: project_3.id) }
it 'includes tracked project IDs that exist but are not in a selectively synced project' do
range = project_3.id..project_3.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_3.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:geo_design_registry, project_id: project_1.id) }
it 'excludes tracked project IDs that are in selectively synced projects' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked project' do
let!(:registry_entry) { create(:geo_design_registry, project_id: project_1.id) }
context 'excluded from selective sync' do
it 'includes tracked project IDs that exist but are not in a selectively synced project' do
range = project_1.id..project_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([project_1.id])
end
end
context 'included in selective sync' do
let!(:registry_entry) { create(:geo_design_registry, project_id: project_5.id) }
it 'excludes tracked project IDs that are in selectively synced projects' do
range = project_5.id..project_5.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
describe '#search' do describe '#search' do
let!(:design_registry) { create(:geo_design_registry) } let!(:design_registry) { create(:geo_design_registry) }
let!(:failed_registry) { create(:geo_design_registry, :sync_failed) } let!(:failed_registry) { create(:geo_design_registry, :sync_failed) }
......
...@@ -3,12 +3,203 @@ ...@@ -3,12 +3,203 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::LfsObjectRegistry, :geo do RSpec.describe Geo::LfsObjectRegistry, :geo do
describe 'relationships' do include EE::GeoHelpers
it { is_expected.to belong_to(:lfs_object).class_name('LfsObject') }
end
it_behaves_like 'a BulkInsertSafe model', Geo::LfsObjectRegistry do it_behaves_like 'a BulkInsertSafe model', Geo::LfsObjectRegistry do
let(:valid_items_for_bulk_insertion) { build_list(:geo_lfs_object_registry, 10) } let(:valid_items_for_bulk_insertion) { build_list(:geo_lfs_object_registry, 10) }
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
end end
describe 'relationships' do
it { is_expected.to belong_to(:lfs_object).class_name('LfsObject') }
end
describe '.find_registry_differences' do
let_it_be(:secondary) { create(:geo_node) }
let_it_be(:synced_group) { create(:group) }
let_it_be(:nested_group_1) { create(:group, parent: synced_group) }
let_it_be(:synced_project) { create(:project, group: synced_group) }
let_it_be(:synced_project_in_nested_group) { create(:project, group: nested_group_1) }
let_it_be(:unsynced_project) { create(:project) }
let_it_be(:project_broken_storage) { create(:project, :broken_storage) }
before do
stub_current_geo_node(secondary)
stub_lfs_object_storage
end
let_it_be(:lfs_object_1) { create(:lfs_object) }
let_it_be(:lfs_object_2) { create(:lfs_object) }
let_it_be(:lfs_object_3) { create(:lfs_object) }
let_it_be(:lfs_object_4) { create(:lfs_object) }
let_it_be(:lfs_object_5) { create(:lfs_object) }
let!(:lfs_object_remote_1) { create(:lfs_object, :object_storage) }
let!(:lfs_object_remote_2) { create(:lfs_object, :object_storage) }
let!(:lfs_object_remote_3) { create(:lfs_object, :object_storage) }
context 'untracked IDs' do
before do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id)
create(:geo_lfs_object_registry, :failed, lfs_object_id: lfs_object_3.id)
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_4.id)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1)
create(:lfs_objects_project, project: synced_project_in_nested_group, lfs_object: lfs_object_2)
create(:lfs_objects_project, project: synced_project_in_nested_group, lfs_object: lfs_object_3)
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_4)
create(:lfs_objects_project, project: project_broken_storage, lfs_object: lfs_object_5)
end
it 'includes LFS object IDs without an entry on the tracking database' do
untracked_ids, _ = described_class.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array(
[lfs_object_2.id, lfs_object_5.id, lfs_object_remote_1.id,
lfs_object_remote_2.id, lfs_object_remote_3.id])
end
it 'excludes LFS objects outside the ID range' do
untracked_ids, _ = described_class.find_registry_differences(lfs_object_3.id..lfs_object_remote_2.id)
expect(untracked_ids).to match_array(
[lfs_object_5.id, lfs_object_remote_1.id,
lfs_object_remote_2.id])
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'excludes LFS object IDs that are not in selectively synced projects' do
untracked_ids, _ = described_class.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array([lfs_object_2.id])
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
it 'excludes LFS object IDs that are not in selectively synced projects' do
untracked_ids, _ = described_class.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array([lfs_object_5.id])
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
it 'excludes LFS objects in object storage' do
untracked_ids, _ = described_class.find_registry_differences(LfsObject.first.id..LfsObject.last.id)
expect(untracked_ids).to match_array([lfs_object_2.id, lfs_object_5.id])
end
end
end
context 'unused tracked IDs' do
context 'with an orphaned registry' do
let!(:orphaned) { create(:geo_lfs_object_registry, lfs_object_id: non_existing_record_id) }
it 'includes tracked IDs that do not exist in the model table' do
range = non_existing_record_id..non_existing_record_id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([non_existing_record_id])
end
it 'excludes IDs outside the ID range' do
range = 1..1000
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
context 'with selective sync by namespace' do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
context 'with a tracked LFS object' do
let!(:registry_entry) { create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id) }
let(:range) { lfs_object_1.id..lfs_object_1.id }
context 'excluded from selective sync' do
it 'includes tracked LFS object IDs that exist but are not in a selectively synced project' do
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([lfs_object_1.id])
end
end
context 'included in selective sync' do
let!(:join_record) { create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1) }
it 'excludes tracked LFS object IDs that are in selectively synced projects' do
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with selective sync by shard' do
let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }
context 'with a tracked LFS object' do
let!(:registry_entry) { create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id) }
let(:range) { lfs_object_1.id..lfs_object_1.id }
context 'excluded from selective sync' do
it 'includes tracked LFS object IDs that exist but are not in a selectively synced project' do
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([lfs_object_1.id])
end
end
context 'included in selective sync' do
let!(:join_record) { create(:lfs_objects_project, project: project_broken_storage, lfs_object: lfs_object_1) }
it 'excludes tracked LFS object IDs that are in selectively synced projects' do
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
context 'with object storage sync disabled' do
let(:secondary) { create(:geo_node, :local_storage_only) }
context 'with a tracked LFS object' do
context 'in object storage' do
it 'includes tracked LFS object IDs that are in object storage' do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_remote_1.id)
range = lfs_object_remote_1.id..lfs_object_remote_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to match_array([lfs_object_remote_1.id])
end
end
context 'not in object storage' do
it 'excludes tracked LFS object IDs that are not in object storage' do
create(:geo_lfs_object_registry, lfs_object_id: lfs_object_1.id)
range = lfs_object_1.id..lfs_object_1.id
_, unused_tracked_ids = described_class.find_registry_differences(range)
expect(unused_tracked_ids).to be_empty
end
end
end
end
end
end
end end
...@@ -5,9 +5,6 @@ require 'spec_helper' ...@@ -5,9 +5,6 @@ require 'spec_helper'
RSpec.describe Geo::UploadRegistry, :geo do RSpec.describe Geo::UploadRegistry, :geo do
include EE::GeoHelpers include EE::GeoHelpers
let!(:failed) { create(:geo_upload_registry, :failed) }
let!(:synced) { create(:geo_upload_registry) }
it_behaves_like 'a BulkInsertSafe model', Geo::UploadRegistry do it_behaves_like 'a BulkInsertSafe model', Geo::UploadRegistry do
let(:valid_items_for_bulk_insertion) { build_list(:geo_upload_registry, 10, created_at: Time.zone.now) } let(:valid_items_for_bulk_insertion) { build_list(:geo_upload_registry, 10, created_at: Time.zone.now) }
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
...@@ -19,20 +16,75 @@ RSpec.describe Geo::UploadRegistry, :geo do ...@@ -19,20 +16,75 @@ RSpec.describe Geo::UploadRegistry, :geo do
expect(described_class.find(registry.id).upload).to be_an_instance_of(Upload) expect(described_class.find(registry.id).upload).to be_an_instance_of(Upload)
end end
describe '.find_registry_differences' do
let_it_be(:secondary) { create(:geo_node) }
let_it_be(:project) { create(:project) }
let_it_be(:upload_1) { create(:upload, model: project) }
let_it_be(:upload_2) { create(:upload, model: project) }
let_it_be(:upload_3) { create(:upload, :issuable_upload, model: project) }
let_it_be(:upload_4) { create(:upload, model: project) }
let_it_be(:upload_5) { create(:upload, model: project) }
let_it_be(:upload_6) { create(:upload, :personal_snippet_upload) }
let_it_be(:upload_7) { create(:upload, :object_storage, model: project) }
let_it_be(:upload_8) { create(:upload, :object_storage, model: project) }
let_it_be(:upload_9) { create(:upload, :object_storage, model: project) }
before do
stub_current_geo_node(secondary)
end
it 'returns untracked IDs as well as tracked IDs that are unused', :aggregate_failures do
max_id = Upload.maximum(:id)
create(:geo_upload_registry, :avatar, file_id: upload_1.id)
create(:geo_upload_registry, :file, file_id: upload_3.id)
create(:geo_upload_registry, :avatar, file_id: upload_5.id)
create(:geo_upload_registry, :personal_file, file_id: upload_6.id)
create(:geo_upload_registry, :avatar, file_id: upload_7.id)
unused_registry_1 = create(:geo_upload_registry, :attachment, file_id: max_id + 1)
unused_registry_2 = create(:geo_upload_registry, :personal_file, file_id: max_id + 2)
range = 1..(max_id + 2)
untracked, unused = described_class.find_registry_differences(range)
expected_untracked = [
[upload_2.id, 'avatar'],
[upload_4.id, 'avatar'],
[upload_8.id, 'avatar'],
[upload_9.id, 'avatar']
]
expected_unused = [
[unused_registry_1.file_id, 'attachment'],
[unused_registry_2.file_id, 'personal_file']
]
expect(untracked).to match_array(expected_untracked)
expect(unused).to match_array(expected_unused)
end
end
describe '.failed' do describe '.failed' do
it 'returns registries in the failed state' do it 'returns registries in the failed state' do
failed = create(:geo_upload_registry, :failed)
create(:geo_upload_registry)
expect(described_class.failed).to match_ids(failed) expect(described_class.failed).to match_ids(failed)
end end
end end
describe '.synced' do describe '.synced' do
it 'returns registries in the synced state' do it 'returns registries in the synced state' do
create(:geo_upload_registry, :failed)
synced = create(:geo_upload_registry)
expect(described_class.synced).to match_ids(synced) expect(described_class.synced).to match_ids(synced)
end end
end end
describe '.retry_due' do describe '.retry_due' do
it 'returns registries in the synced state' do it 'returns registries in the synced state' do
failed = create(:geo_upload_registry, :failed)
synced = create(:geo_upload_registry)
retry_yesterday = create(:geo_upload_registry, retry_at: Date.yesterday) retry_yesterday = create(:geo_upload_registry, retry_at: Date.yesterday)
create(:geo_upload_registry, retry_at: Date.tomorrow) create(:geo_upload_registry, retry_at: Date.tomorrow)
...@@ -40,11 +92,13 @@ RSpec.describe Geo::UploadRegistry, :geo do ...@@ -40,11 +92,13 @@ RSpec.describe Geo::UploadRegistry, :geo do
end end
end end
describe '.never' do describe '.never_attempted_sync' do
it 'returns registries that are never synced' do it 'returns registries that are never synced' do
never = create(:geo_upload_registry, retry_count: nil, success: false) create(:geo_upload_registry, :failed)
create(:geo_upload_registry)
pending = create(:geo_upload_registry, retry_count: nil, success: false)
expect(described_class.never).to match_ids([never]) expect(described_class.never_attempted_sync).to match_ids([pending])
end end
end end
...@@ -55,12 +109,12 @@ RSpec.describe Geo::UploadRegistry, :geo do ...@@ -55,12 +109,12 @@ RSpec.describe Geo::UploadRegistry, :geo do
described_class.with_status('synced') described_class.with_status('synced')
end end
# Explained via: https://gitlab.com/gitlab-org/gitlab/-/issues/216049 it 'finds the registries with status "never_attempted_sync" when filter is set to "pending"' do
it 'finds the registries with status "never" when filter is set to "pending"' do expect(described_class).to receive(:never_attempted_sync)
expect(described_class).to receive(:never)
described_class.with_status('pending') described_class.with_status('pending')
end end
it 'finds the registries with status "failed"' do it 'finds the registries with status "failed"' do
expect(described_class).to receive(:failed) expect(described_class).to receive(:failed)
...@@ -93,6 +147,9 @@ RSpec.describe Geo::UploadRegistry, :geo do ...@@ -93,6 +147,9 @@ RSpec.describe Geo::UploadRegistry, :geo do
end end
describe '#synchronization_state' do describe '#synchronization_state' do
let_it_be(:failed) { create(:geo_upload_registry, :failed) }
let_it_be(:synced) { create(:geo_upload_registry) }
it 'returns :synced for a successful synced registry' do it 'returns :synced for a successful synced registry' do
expect(synced.synchronization_state).to eq(:synced) expect(synced.synchronization_state).to eq(:synced)
end end
......
# frozen_string_literal: true
RSpec.shared_examples 'a file registry finder' do
include_examples 'a registry finder'
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
synced_missing_on_primary_count
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
describe '#synced_missing_on_primary_count' do
it 'counts registries that have been synced and are missing on the primary, excluding not synced ones' do
expect(subject.synced_missing_on_primary_count).to eq 2
end
end
describe '#find_retryable_synced_missing_on_primary_registries' do
it 'returns registries that have been synced and are missing on the primary' do
registries = subject.find_retryable_synced_missing_on_primary_registries(batch_size: 10)
expect(registries).to match_ids(registry_2, registry_5)
end
it 'excludes except_ids' do
registries = subject.find_retryable_synced_missing_on_primary_registries(batch_size: 10, except_ids: [replicable_5.id])
expect(registries).to match_ids(registry_2)
end
end
end
# frozen_string_literal: true
RSpec.shared_examples 'a file registry finder' do
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
registry_class
registry_count
synced_count
failed_count
count_synced_missing_on_primary
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
end
# frozen_string_literal: true
RSpec.shared_examples 'a registry finder' do
it 'responds to registry finder methods' do
registry_finder_methods = %i{
failed_count
find_registries_never_attempted_sync
find_registries_needs_sync_again
registry_class
registry_count
synced_count
}
registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
describe '#registry_count' do
it 'counts registries' do
expect(subject.registry_count).to eq 8
end
end
describe '#synced_count' do
it 'counts registries that has been synced' do
expect(subject.synced_count).to eq 2
end
end
describe '#failed_count' do
it 'counts registries that sync has failed' do
expect(subject.failed_count).to eq 4
end
end
describe '#find_registries_never_attempted_sync' do
it 'returns registries that have never been synced' do
registries = subject.find_registries_never_attempted_sync(batch_size: 10)
expect(registries).to match_ids(registry_3, registry_8)
end
it 'excludes except_ids' do
registries = subject.find_registries_never_attempted_sync(batch_size: 10, except_ids: [replicable_3.id])
expect(registries).to match_ids(registry_8)
end
end
describe '#find_registries_needs_sync_again' do
it 'returns registries for that have failed to sync' do
registries = subject.find_registries_needs_sync_again(batch_size: 10)
expect(registries).to match_ids(registry_1, registry_4, registry_6, registry_7)
end
it 'excludes except_ids' do
registries = subject.find_registries_needs_sync_again(batch_size: 10, except_ids: [replicable_4.id, replicable_7.id])
expect(registries).to match_ids(registry_1, registry_6)
end
end
end
...@@ -9,26 +9,26 @@ RSpec.shared_examples 'a Geo framework registry' do ...@@ -9,26 +9,26 @@ RSpec.shared_examples 'a Geo framework registry' do
let!(:unsynced_item1) { create(registry_class_factory) } let!(:unsynced_item1) { create(registry_class_factory) }
let!(:unsynced_item2) { create(registry_class_factory) } let!(:unsynced_item2) { create(registry_class_factory) }
describe '.find_unsynced_registries' do describe '.find_registries_never_attempted_sync' do
it 'returns unsynced items' do it 'returns unsynced items' do
result = described_class.find_unsynced_registries(batch_size: 10) result = described_class.find_registries_never_attempted_sync(batch_size: 10)
expect(result).to include(unsynced_item1, unsynced_item2) expect(result).to include(unsynced_item1, unsynced_item2)
end end
it 'returns unsynced items except some specific item ID' do it 'returns items that never have an attempt to sync except some specific item ID' do
except_id = unsynced_item1.model_record_id except_id = unsynced_item1.model_record_id
result = described_class.find_unsynced_registries(batch_size: 10, except_ids: [except_id]) result = described_class.find_registries_never_attempted_sync(batch_size: 10, except_ids: [except_id])
expect(result).to include(unsynced_item2) expect(result).to include(unsynced_item2)
expect(result).not_to include(unsynced_item1) expect(result).not_to include(unsynced_item1)
end end
end end
describe '.find_failed_registries' do describe '.find_registries_needs_sync_again' do
it 'returns failed items' do it 'returns failed items' do
result = described_class.find_failed_registries(batch_size: 10) result = described_class.find_registries_needs_sync_again(batch_size: 10)
expect(result).to include(failed_item1, failed_item2) expect(result).to include(failed_item1, failed_item2)
end end
...@@ -36,7 +36,7 @@ RSpec.shared_examples 'a Geo framework registry' do ...@@ -36,7 +36,7 @@ RSpec.shared_examples 'a Geo framework registry' do
it 'returns failed items except some specific item ID' do it 'returns failed items except some specific item ID' do
except_id = failed_item1.model_record_id except_id = failed_item1.model_record_id
result = described_class.find_failed_registries(batch_size: 10, except_ids: [except_id]) result = described_class.find_registries_needs_sync_again(batch_size: 10, except_ids: [except_id])
expect(result).to include(failed_item2) expect(result).to include(failed_item2)
expect(result).not_to include(failed_item1) expect(result).not_to include(failed_item1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment