Commit ffce8121 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'mk/geo/refactor-job-finders' into 'master'

Geo: Refactor FileDownloadDispatchWorker JobFinders and FileRegistryFinders

Closes #5601

See merge request gitlab-org/gitlab-ee!5331
parents 79f13b9d b7444374
module Geo module Geo
class AttachmentRegistryFinder < FileRegistryFinder class AttachmentRegistryFinder < FileRegistryFinder
def attachments def syncable
if selective_sync? all.geo_syncable
Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
Upload.all
end
end end
def syncable_attachments def count_syncable
attachments.geo_syncable syncable.count
end end
def count_syncable_attachments def count_synced
syncable_attachments.count
end
def count_synced_attachments
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_synced_attachments.count find_synced.count
else else
legacy_find_synced_attachments.count legacy_find_synced.count
end end
end end
def count_failed_attachments def count_failed
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_failed_attachments.count find_failed.count
else else
legacy_find_failed_attachments.count legacy_find_failed.count
end end
end end
def count_synced_missing_on_primary_attachments def count_synced_missing_on_primary
if aggregate_pushdown_supported? && !use_legacy_queries? if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary_attachments.count fdw_find_synced_missing_on_primary.count
else else
legacy_find_synced_missing_on_primary_attachments.count legacy_find_synced_missing_on_primary.count
end end
end end
def count_registry_attachments def count_registry
Geo::FileRegistry.attachments.count Geo::FileRegistry.attachments.count
end end
def find_synced_attachments
if use_legacy_queries?
legacy_find_synced_attachments
else
fdw_find_synced_attachments
end
end
def find_failed_attachments
if use_legacy_queries?
legacy_find_failed_attachments
else
fdw_find_failed_attachments
end
end
# Find limited amount of non replicated attachments. # Find limited amount of non replicated attachments.
# #
# You can pass a list with `except_file_ids:` so you can exclude items you # You can pass a list with `except_file_ids:` so you can exclude items you
...@@ -70,52 +46,52 @@ module Geo ...@@ -70,52 +46,52 @@ module Geo
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query # @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_attachments(batch_size:, except_file_ids: []) def find_unsynced(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_attachments(except_file_ids: except_file_ids) legacy_find_unsynced(except_file_ids: except_file_ids)
else else
fdw_find_unsynced_attachments(except_file_ids: except_file_ids) fdw_find_unsynced(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def find_migrated_local_attachments(batch_size:, except_file_ids: []) def find_migrated_local(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_migrated_local_attachments(except_file_ids: except_file_ids) legacy_find_migrated_local(except_file_ids: except_file_ids)
else else
fdw_find_migrated_local_attachments(except_file_ids: except_file_ids) fdw_find_migrated_local(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def find_retryable_failed_attachments_registries(batch_size:, except_file_ids: []) def find_retryable_failed_registries(batch_size:, except_file_ids: [])
find_failed_attachments_registries find_failed_registries
.retry_due .retry_due
.where.not(file_id: except_file_ids) .where.not(file_id: except_file_ids)
.limit(batch_size) .limit(batch_size)
end end
def find_retryable_synced_missing_on_primary_attachments_registries(batch_size:, except_file_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_attachments_registries find_synced_missing_on_primary_registries
.retry_due .retry_due
.where.not(file_id: except_file_ids) .where.not(file_id: except_file_ids)
.limit(batch_size) .limit(batch_size)
end end
def find_failed_attachments_registries private
Geo::FileRegistry.attachments.failed
end
def find_synced_missing_on_primary_attachments_registries def all
Geo::FileRegistry.attachments.synced.missing_on_primary if selective_sync?
Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else
Upload.all
end
end end
private
def group_uploads def group_uploads
namespace_ids = namespace_ids =
if current_node.selective_sync_by_namespaces? if current_node.selective_sync_by_namespaces?
...@@ -147,40 +123,68 @@ module Geo ...@@ -147,40 +123,68 @@ module Geo
Upload.arel_table Upload.arel_table
end end
def find_synced
if use_legacy_queries?
legacy_find_synced
else
fdw_find_synced
end
end
def find_failed
if use_legacy_queries?
legacy_find_failed
else
fdw_find_failed
end
end
def find_synced_registries
Geo::FileRegistry.attachments.synced
end
def find_failed_registries
Geo::FileRegistry.attachments.failed
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
# #
# FDW accessors # FDW accessors
# #
def fdw_find_synced_attachments def fdw_find_synced
fdw_find_syncable_attachments.merge(Geo::FileRegistry.synced) fdw_find_syncable.merge(Geo::FileRegistry.synced)
end end
def fdw_find_failed_attachments def fdw_find_failed
fdw_find_syncable_attachments.merge(Geo::FileRegistry.failed) fdw_find_syncable.merge(Geo::FileRegistry.failed)
end end
def fdw_find_syncable_attachments def fdw_find_syncable
fdw_attachments.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_attachments_table}.id") fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.geo_syncable .geo_syncable
.merge(Geo::FileRegistry.attachments) .merge(Geo::FileRegistry.attachments)
end end
def fdw_find_unsynced_attachments(except_file_ids:) def fdw_find_unsynced(except_file_ids:)
upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',') upload_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
fdw_attachments.joins("LEFT OUTER JOIN file_registry fdw_all.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_attachments_table}.id ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type IN (#{upload_types})") AND file_registry.file_type IN (#{upload_types})")
.geo_syncable .geo_syncable
.where(file_registry: { id: nil }) .where(file_registry: { id: nil })
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
end end
def fdw_find_synced_missing_on_primary_attachments def fdw_find_synced_missing_on_primary
fdw_find_synced_attachments.merge(Geo::FileRegistry.missing_on_primary) fdw_find_synced.merge(Geo::FileRegistry.missing_on_primary)
end end
def fdw_attachments def fdw_all
if selective_sync? if selective_sync?
Geo::Fdw::Upload.where(group_uploads.or(project_uploads).or(other_uploads)) Geo::Fdw::Upload.where(group_uploads.or(project_uploads).or(other_uploads))
else else
...@@ -188,12 +192,12 @@ module Geo ...@@ -188,12 +192,12 @@ module Geo
end end
end end
def fdw_attachments_table def fdw_table
Geo::Fdw::Upload.table_name Geo::Fdw::Upload.table_name
end end
def fdw_find_migrated_local_attachments(except_file_ids:) def fdw_find_migrated_local(except_file_ids:)
fdw_attachments.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_attachments_table}.id") fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.with_files_stored_remotely .with_files_stored_remotely
.merge(Geo::FileRegistry.attachments) .merge(Geo::FileRegistry.attachments)
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
...@@ -203,46 +207,46 @@ module Geo ...@@ -203,46 +207,46 @@ module Geo
# Legacy accessors (non FDW) # Legacy accessors (non FDW)
# #
def legacy_find_synced_attachments def legacy_find_synced
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_attachments, syncable,
Geo::FileRegistry.attachments.synced.pluck(:file_id), find_synced_registries.pluck(:file_id),
Upload Upload
) )
end end
def legacy_find_failed_attachments def legacy_find_failed
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_attachments, syncable,
find_failed_attachments_registries.pluck(:file_id), find_failed_registries.pluck(:file_id),
Upload Upload
) )
end end
def legacy_find_unsynced_attachments(except_file_ids:) def legacy_find_unsynced(except_file_ids:)
registry_file_ids = legacy_pluck_registry_file_ids(file_types: Geo::FileService::DEFAULT_OBJECT_TYPES) | except_file_ids registry_file_ids = Geo::FileRegistry.attachments.pluck(:file_id) | except_file_ids
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
syncable_attachments, syncable,
registry_file_ids, registry_file_ids,
Upload Upload
) )
end end
def legacy_find_migrated_local_attachments(except_file_ids:) def legacy_find_migrated_local(except_file_ids:)
registry_file_ids = Geo::FileRegistry.attachments.pluck(:file_id) - except_file_ids registry_file_ids = Geo::FileRegistry.attachments.pluck(:file_id) - except_file_ids
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
attachments.with_files_stored_remotely, all.with_files_stored_remotely,
registry_file_ids, registry_file_ids,
Upload Upload
) )
end end
def legacy_find_synced_missing_on_primary_attachments def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_attachments, syncable,
Geo::FileRegistry.attachments.synced.missing_on_primary.pluck(:file_id), find_synced_missing_on_primary_registries.pluck(:file_id),
Upload Upload
) )
end end
......
module Geo module Geo
class FileRegistryFinder < RegistryFinder class FileRegistryFinder < RegistryFinder
protected # @abstract Subclass is expected to implement the declared methods
def legacy_pluck_registry_file_ids(file_types:) # @!method syncable
Geo::FileRegistry.where(file_type: file_types).pluck(:file_id) # Return an ActiveRecord::Relation of tracked resource records, filtered
# by selective sync, with files stored locally
def syncable
raise NotImplementedError
end
# @!method count_syncable
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally
def count_syncable
raise NotImplementedError
end
# @!method count_synced
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally, and are synced
def count_synced
raise NotImplementedError
end
# @!method count_failed
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally, and are failed
def count_failed
raise NotImplementedError
end
# @!method count_synced_missing_on_primary
# Return a count of tracked resource records, filtered by selective
# sync, with files stored locally, and are synced and missing on the
# primary
def count_synced_missing_on_primary
raise NotImplementedError
end
# @!method count_registry
# Return a count of the registry records for the tracked file_type(s)
def count_registry
raise NotImplementedError
end
# @!method find_unsynced
# Return an ActiveRecord::Relation of not-yet-tracked resource records,
# filtered by selective sync, with files stored locally, excluding
# specified IDs, limited to batch_size
def find_unsynced
raise NotImplementedError
end
# @!method find_migrated_local
# Return an ActiveRecord::Relation of tracked resource records, filtered
# by selective sync, with files stored remotely, excluding
# specified IDs, limited to batch_size
def find_migrated_local
raise NotImplementedError
end
# @!method find_retryable_failed_registries
# Return an ActiveRecord::Relation of registry records marked as failed,
# which are ready to be retried, excluding specified IDs, limited to
# batch_size
def find_retryable_failed_registries
raise NotImplementedError
end
# @!method find_retryable_synced_missing_on_primary_registries
# Return an ActiveRecord::Relation of registry records marked as synced
# and missing on the primary, which are ready to be retried, excluding
# specified IDs, limited to batch_size
def find_retryable_synced_missing_on_primary_registries
raise NotImplementedError
end end
end end
end end
module Geo module Geo
class JobArtifactRegistryFinder < RegistryFinder class JobArtifactRegistryFinder < RegistryFinder
def count_syncable_job_artifacts def count_syncable
syncable_job_artifacts.count syncable.count
end end
def count_synced_job_artifacts def count_synced
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_synced_job_artifacts.count find_synced.count
else else
legacy_find_synced_job_artifacts.count legacy_find_synced.count
end end
end end
def count_failed_job_artifacts def count_failed
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_failed_job_artifacts.count find_failed.count
else else
legacy_find_failed_job_artifacts.count legacy_find_failed.count
end end
end end
def count_synced_missing_on_primary_job_artifacts def count_synced_missing_on_primary
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_synced_missing_on_primary_job_artifacts.count find_synced_missing_on_primary.count
else else
legacy_find_synced_missing_on_primary_job_artifacts.count legacy_find_synced_missing_on_primary.count
end end
end end
def count_registry_job_artifacts def count_registry
Geo::JobArtifactRegistry.count Geo::JobArtifactRegistry.count
end end
...@@ -42,117 +42,117 @@ module Geo ...@@ -42,117 +42,117 @@ module Geo
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_artifact_ids ids that will be ignored from the query # @param [Array<Integer>] except_artifact_ids ids that will be ignored from the query
def find_unsynced_job_artifacts(batch_size:, except_artifact_ids: []) def find_unsynced(batch_size:, except_artifact_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_job_artifacts(except_artifact_ids: except_artifact_ids) legacy_find_unsynced(except_artifact_ids: except_artifact_ids)
else else
fdw_find_unsynced_job_artifacts(except_artifact_ids: except_artifact_ids) fdw_find_unsynced(except_artifact_ids: except_artifact_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def find_migrated_local_job_artifacts(batch_size:, except_artifact_ids: []) def find_migrated_local(batch_size:, except_artifact_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids) legacy_find_migrated_local(except_artifact_ids: except_artifact_ids)
else else
fdw_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids) fdw_find_migrated_local(except_artifact_ids: except_artifact_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def job_artifacts def syncable
if selective_sync? all.geo_syncable
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Ci::JobArtifact.all
end
end
def syncable_job_artifacts
job_artifacts.geo_syncable
end end
def find_retryable_failed_job_artifacts_registries(batch_size:, except_artifact_ids: []) def find_retryable_failed_registries(batch_size:, except_artifact_ids: [])
find_failed_job_artifacts_registries find_failed_registries
.retry_due .retry_due
.where.not(artifact_id: except_artifact_ids) .where.not(artifact_id: except_artifact_ids)
.limit(batch_size) .limit(batch_size)
end end
def find_retryable_synced_missing_on_primary_job_artifacts_registries(batch_size:, except_artifact_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_job_artifacts_registries find_synced_missing_on_primary_registries
.retry_due .retry_due
.where.not(artifact_id: except_artifact_ids) .where.not(artifact_id: except_artifact_ids)
.limit(batch_size) .limit(batch_size)
end end
def find_synced_job_artifacts_registries private
Geo::JobArtifactRegistry.synced
end
def find_synced_missing_on_primary_job_artifacts_registries
Geo::JobArtifactRegistry.synced.missing_on_primary
end
def find_failed_job_artifacts_registries def all
Geo::JobArtifactRegistry.failed if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Ci::JobArtifact.all
end
end end
private def find_synced
def find_synced_job_artifacts
if use_legacy_queries? if use_legacy_queries?
legacy_find_synced_job_artifacts legacy_find_synced
else else
fdw_find_job_artifacts.merge(find_synced_job_artifacts_registries) fdw_find.merge(find_synced_registries)
end end
end end
def find_synced_missing_on_primary_job_artifacts def find_synced_missing_on_primary
if use_legacy_queries? if use_legacy_queries?
legacy_find_synced_missing_on_primary_job_artifacts legacy_find_synced_missing_on_primary
else else
fdw_find_job_artifacts.merge(find_synced_missing_on_primary_job_artifacts_registries) fdw_find.merge(find_synced_missing_on_primary_registries)
end end
end end
def find_failed_job_artifacts def find_failed
if use_legacy_queries? if use_legacy_queries?
legacy_find_failed_job_artifacts legacy_find_failed
else else
fdw_find_job_artifacts.merge(find_failed_job_artifacts_registries) fdw_find.merge(find_failed_registries)
end end
end end
def find_synced_registries
Geo::JobArtifactRegistry.synced
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
def find_failed_registries
Geo::JobArtifactRegistry.failed
end
# #
# FDW accessors # FDW accessors
# #
def fdw_find_job_artifacts def fdw_find
fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id") fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable .geo_syncable
end end
def fdw_find_unsynced_job_artifacts(except_artifact_ids:) def fdw_find_unsynced(except_artifact_ids:)
fdw_job_artifacts.joins("LEFT OUTER JOIN job_artifact_registry fdw_all.joins("LEFT OUTER JOIN job_artifact_registry
ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id") ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable .geo_syncable
.where(job_artifact_registry: { artifact_id: nil }) .where(job_artifact_registry: { artifact_id: nil })
.where.not(id: except_artifact_ids) .where.not(id: except_artifact_ids)
end end
def fdw_find_migrated_local_job_artifacts(except_artifact_ids:) def fdw_find_migrated_local(except_artifact_ids:)
fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id") fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.with_files_stored_remotely .with_files_stored_remotely
.where.not(id: except_artifact_ids) .where.not(id: except_artifact_ids)
.merge(Geo::JobArtifactRegistry.all) .merge(Geo::JobArtifactRegistry.all)
end end
def fdw_job_artifacts def fdw_all
if selective_sync? if selective_sync?
Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects }) Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else else
...@@ -160,7 +160,7 @@ module Geo ...@@ -160,7 +160,7 @@ module Geo
end end
end end
def fdw_job_artifacts_table def fdw_table
Geo::Fdw::Ci::JobArtifact.table_name Geo::Fdw::Ci::JobArtifact.table_name
end end
...@@ -168,51 +168,46 @@ module Geo ...@@ -168,51 +168,46 @@ module Geo
# Legacy accessors (non FDW) # Legacy accessors (non FDW)
# #
def legacy_find_synced_job_artifacts def legacy_find_synced
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_job_artifacts, syncable,
find_synced_job_artifacts_registries.pluck(:artifact_id), find_synced_registries.pluck(:artifact_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_failed_job_artifacts def legacy_find_failed
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_job_artifacts, syncable,
find_failed_job_artifacts_registries.pluck(:artifact_id), find_failed_registries.pluck(:artifact_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_unsynced_job_artifacts(except_artifact_ids:) def legacy_find_unsynced(except_artifact_ids:)
registry_artifact_ids = legacy_pluck_artifact_ids(include_registry_ids: except_artifact_ids) registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) | except_artifact_ids
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
syncable_job_artifacts, syncable,
registry_artifact_ids, registry_artifact_ids,
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_pluck_artifact_ids(include_registry_ids:) def legacy_find_migrated_local(except_artifact_ids:)
ids = Geo::JobArtifactRegistry.pluck(:artifact_id) registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
(ids + include_registry_ids).uniq
end
def legacy_find_migrated_local_job_artifacts(except_artifact_ids:)
registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
job_artifacts.with_files_stored_remotely, all.with_files_stored_remotely,
registry_file_ids, registry_artifact_ids,
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_synced_missing_on_primary_job_artifacts def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_job_artifacts, syncable,
find_synced_missing_on_primary_job_artifacts_registries.pluck(:artifact_id), find_synced_missing_on_primary_registries.pluck(:artifact_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
......
module Geo module Geo
class LfsObjectRegistryFinder < FileRegistryFinder class LfsObjectRegistryFinder < FileRegistryFinder
def count_syncable_lfs_objects def count_syncable
syncable_lfs_objects.count syncable.count
end end
def count_synced_lfs_objects def count_synced
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_synced_lfs_objects.count find_synced.count
else else
legacy_find_synced_lfs_objects.count legacy_find_synced.count
end end
end end
def count_failed_lfs_objects def count_failed
if aggregate_pushdown_supported? if aggregate_pushdown_supported?
find_failed_lfs_objects.count find_failed.count
else else
legacy_find_failed_lfs_objects.count legacy_find_failed.count
end end
end end
def count_synced_missing_on_primary_lfs_objects def count_synced_missing_on_primary
if aggregate_pushdown_supported? && !use_legacy_queries? if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary_lfs_objects.count fdw_find_synced_missing_on_primary.count
else else
legacy_find_synced_missing_on_primary_lfs_objects.count legacy_find_synced_missing_on_primary.count
end end
end end
def count_registry_lfs_objects def count_registry
Geo::FileRegistry.lfs_objects.count Geo::FileRegistry.lfs_objects.count
end end
...@@ -42,119 +42,123 @@ module Geo ...@@ -42,119 +42,123 @@ module Geo
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query # @param [Array<Integer>] except_file_ids ids that will be ignored from the query
def find_unsynced_lfs_objects(batch_size:, except_file_ids: []) def find_unsynced(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_lfs_objects(except_file_ids: except_file_ids) legacy_find_unsynced(except_file_ids: except_file_ids)
else else
fdw_find_unsynced_lfs_objects(except_file_ids: except_file_ids) fdw_find_unsynced(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def find_migrated_local_lfs_objects(batch_size:, except_file_ids: []) def find_migrated_local(batch_size:, except_file_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_migrated_local_lfs_objects(except_file_ids: except_file_ids) legacy_find_migrated_local(except_file_ids: except_file_ids)
else else
fdw_find_migrated_local_lfs_objects(except_file_ids: except_file_ids) fdw_find_migrated_local(except_file_ids: except_file_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
end end
def lfs_objects def syncable
if selective_sync? all.geo_syncable
LfsObject.joins(:projects).where(projects: { id: current_node.projects })
else
LfsObject.all
end
end
def syncable_lfs_objects
lfs_objects.geo_syncable
end end
def find_retryable_failed_lfs_objects_registries(batch_size:, except_file_ids: []) def find_retryable_failed_registries(batch_size:, except_file_ids: [])
find_failed_lfs_objects_registries find_failed_registries
.retry_due .retry_due
.where.not(file_id: except_file_ids) .where.not(file_id: except_file_ids)
.limit(batch_size) .limit(batch_size)
end end
def find_retryable_synced_missing_on_primary_lfs_objects_registries(batch_size:, except_file_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_lfs_objects_registries find_synced_missing_on_primary_registries
.retry_due .retry_due
.where.not(file_id: except_file_ids) .where.not(file_id: except_file_ids)
.limit(batch_size) .limit(batch_size)
end end
def find_failed_lfs_objects_registries private
Geo::FileRegistry.lfs_objects.failed
end
def find_synced_missing_on_primary_lfs_objects_registries def all
Geo::FileRegistry.lfs_objects.synced.missing_on_primary if selective_sync?
LfsObject.joins(:projects).where(projects: { id: current_node.projects })
else
LfsObject.all
end
end end
private def find_synced
def find_synced_lfs_objects
if use_legacy_queries? if use_legacy_queries?
legacy_find_synced_lfs_objects legacy_find_synced
else else
fdw_find_synced_lfs_objects fdw_find_synced
end end
end end
def find_failed_lfs_objects def find_failed
if use_legacy_queries? if use_legacy_queries?
legacy_find_failed_lfs_objects legacy_find_failed
else else
fdw_find_failed_lfs_objects fdw_find_failed
end end
end end
def find_synced_registries
Geo::FileRegistry.lfs_objects.synced
end
def find_failed_registries
Geo::FileRegistry.lfs_objects.failed
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
# #
# FDW accessors # FDW accessors
# #
def fdw_find_lfs_objects def fdw_find
fdw_lfs_objects.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_lfs_objects_table}.id") fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.geo_syncable .geo_syncable
.merge(Geo::FileRegistry.lfs_objects) .merge(Geo::FileRegistry.lfs_objects)
end end
def fdw_find_unsynced_lfs_objects(except_file_ids:) def fdw_find_unsynced(except_file_ids:)
fdw_lfs_objects.joins("LEFT OUTER JOIN file_registry fdw_all.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_lfs_objects_table}.id ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'lfs'") AND file_registry.file_type = 'lfs'")
.geo_syncable .geo_syncable
.where(file_registry: { id: nil }) .where(file_registry: { id: nil })
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
end end
def fdw_find_migrated_local_lfs_objects(except_file_ids:) def fdw_find_migrated_local(except_file_ids:)
fdw_lfs_objects.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_lfs_objects_table}.id") fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.with_files_stored_remotely .with_files_stored_remotely
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
.merge(Geo::FileRegistry.lfs_objects) .merge(Geo::FileRegistry.lfs_objects)
end end
def fdw_find_synced_lfs_objects def fdw_find_synced
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced) fdw_find.merge(Geo::FileRegistry.synced)
end end
def fdw_find_synced_missing_on_primary_lfs_objects def fdw_find_synced_missing_on_primary
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced.missing_on_primary) fdw_find.merge(Geo::FileRegistry.synced.missing_on_primary)
end end
def fdw_find_failed_lfs_objects def fdw_find_failed
fdw_find_lfs_objects.merge(Geo::FileRegistry.failed) fdw_find.merge(Geo::FileRegistry.failed)
end end
def fdw_lfs_objects def fdw_all
if selective_sync? if selective_sync?
Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects }) Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects })
else else
...@@ -162,7 +166,7 @@ module Geo ...@@ -162,7 +166,7 @@ module Geo
end end
end end
def fdw_lfs_objects_table def fdw_table
Geo::Fdw::LfsObject.table_name Geo::Fdw::LfsObject.table_name
end end
...@@ -170,46 +174,46 @@ module Geo ...@@ -170,46 +174,46 @@ module Geo
# Legacy accessors (non FDW) # Legacy accessors (non FDW)
# #
def legacy_find_synced_lfs_objects def legacy_find_synced
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_lfs_objects, syncable,
Geo::FileRegistry.lfs_objects.synced.pluck(:file_id), find_synced_registries.pluck(:file_id),
LfsObject LfsObject
) )
end end
def legacy_find_failed_lfs_objects def legacy_find_failed
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_lfs_objects, syncable,
find_failed_lfs_objects_registries.pluck(:file_id), find_failed_registries.pluck(:file_id),
LfsObject LfsObject
) )
end end
def legacy_find_unsynced_lfs_objects(except_file_ids:) def legacy_find_unsynced(except_file_ids:)
registry_file_ids = legacy_pluck_registry_file_ids(file_types: :lfs) | except_file_ids registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) | except_file_ids
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
syncable_lfs_objects, syncable,
registry_file_ids, registry_file_ids,
LfsObject LfsObject
) )
end end
def legacy_find_migrated_local_lfs_objects(except_file_ids:) def legacy_find_migrated_local(except_file_ids:)
registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) - except_file_ids registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) - except_file_ids
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
lfs_objects.with_files_stored_remotely, all.with_files_stored_remotely,
registry_file_ids, registry_file_ids,
LfsObject LfsObject
) )
end end
def legacy_find_synced_missing_on_primary_lfs_objects def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
syncable_lfs_objects, syncable,
Geo::FileRegistry.lfs_objects.synced.missing_on_primary.pluck(:file_id), find_synced_missing_on_primary_registries.pluck(:file_id),
LfsObject LfsObject
) )
end end
......
...@@ -150,9 +150,9 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -150,9 +150,9 @@ class GeoNodeStatus < ActiveRecord::Base
self.last_event_date = latest_event&.created_at self.last_event_date = latest_event&.created_at
self.repositories_count = projects_finder.count_repositories self.repositories_count = projects_finder.count_repositories
self.wikis_count = projects_finder.count_wikis self.wikis_count = projects_finder.count_wikis
self.lfs_objects_count = lfs_objects_finder.count_syncable_lfs_objects self.lfs_objects_count = lfs_objects_finder.count_syncable
self.job_artifacts_count = job_artifacts_finder.count_syncable_job_artifacts self.job_artifacts_count = job_artifacts_finder.count_syncable
self.attachments_count = attachments_finder.count_syncable_attachments self.attachments_count = attachments_finder.count_syncable
self.last_successful_status_check_at = Time.now self.last_successful_status_check_at = Time.now
self.storage_shards = StorageShard.all self.storage_shards = StorageShard.all
...@@ -202,18 +202,18 @@ class GeoNodeStatus < ActiveRecord::Base ...@@ -202,18 +202,18 @@ class GeoNodeStatus < ActiveRecord::Base
self.repositories_failed_count = projects_finder.count_failed_repositories self.repositories_failed_count = projects_finder.count_failed_repositories
self.wikis_synced_count = projects_finder.count_synced_wikis self.wikis_synced_count = projects_finder.count_synced_wikis
self.wikis_failed_count = projects_finder.count_failed_wikis self.wikis_failed_count = projects_finder.count_failed_wikis
self.lfs_objects_synced_count = lfs_objects_finder.count_synced_lfs_objects self.lfs_objects_synced_count = lfs_objects_finder.count_synced
self.lfs_objects_failed_count = lfs_objects_finder.count_failed_lfs_objects self.lfs_objects_failed_count = lfs_objects_finder.count_failed
self.lfs_objects_registry_count = lfs_objects_finder.count_registry_lfs_objects self.lfs_objects_registry_count = lfs_objects_finder.count_registry
self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.count_synced_missing_on_primary_lfs_objects self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.count_synced_missing_on_primary
self.job_artifacts_synced_count = job_artifacts_finder.count_synced_job_artifacts self.job_artifacts_synced_count = job_artifacts_finder.count_synced
self.job_artifacts_failed_count = job_artifacts_finder.count_failed_job_artifacts self.job_artifacts_failed_count = job_artifacts_finder.count_failed
self.job_artifacts_registry_count = job_artifacts_finder.count_registry_job_artifacts self.job_artifacts_registry_count = job_artifacts_finder.count_registry
self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.count_synced_missing_on_primary_job_artifacts self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.count_synced_missing_on_primary
self.attachments_synced_count = attachments_finder.count_synced_attachments self.attachments_synced_count = attachments_finder.count_synced
self.attachments_failed_count = attachments_finder.count_failed_attachments self.attachments_failed_count = attachments_finder.count_failed
self.attachments_registry_count = attachments_finder.count_registry_attachments self.attachments_registry_count = attachments_finder.count_registry
self.attachments_synced_missing_on_primary_count = attachments_finder.count_synced_missing_on_primary_attachments self.attachments_synced_missing_on_primary_count = attachments_finder.count_synced_missing_on_primary
load_verification_data load_verification_data
end end
......
...@@ -28,24 +28,25 @@ module Geo ...@@ -28,24 +28,25 @@ module Geo
end end
end end
# Get a batch of unsynced resources, taking equal parts from each resource.
#
# @return [Array] job arguments of unsynced resources # @return [Array] job arguments of unsynced resources
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
find_jobs(sync_statuses: [:unsynced], batch_size: batch_size) jobs = job_finders.reduce([]) do |jobs, job_finder|
end jobs << job_finder.find_unsynced_jobs(batch_size: batch_size)
end
# @return [Array] job arguments of low priority resources take_batch(*jobs, batch_size: batch_size)
def find_low_priority_jobs(batch_size:)
find_jobs(sync_statuses: [:failed, :synced_missing_on_primary], batch_size: batch_size)
end end
# Get a batch of resources taking equal parts from each resource. # Get a batch of failed and synced-but-missing-on-primary resources, taking
# equal parts from each resource.
# #
# @return [Array] job arguments of a batch of resources # @return [Array] job arguments of low priority resources
def find_jobs(sync_statuses:, batch_size:) def find_low_priority_jobs(batch_size:)
jobs = job_finders.reduce([]) do |jobs, job_finder| jobs = job_finders.reduce([]) do |jobs, job_finder|
sync_statuses.reduce(jobs) do |jobs, sync_status| jobs << job_finder.find_failed_jobs(batch_size: batch_size)
jobs << job_finder.find_jobs(sync_status: sync_status, batch_size: batch_size) jobs << job_finder.find_synced_missing_on_primary_jobs(batch_size: batch_size)
end
end end
take_batch(*jobs, batch_size: batch_size) take_batch(*jobs, batch_size: batch_size)
......
module Geo module Geo
class FileDownloadDispatchWorker class FileDownloadDispatchWorker
class AttachmentJobFinder < JobFinder class AttachmentJobFinder < JobFinder
def resource_type EXCEPT_RESOURCE_IDS_KEY = :except_file_ids
:attachment
end
def except_resource_ids_key def registry_finder
:except_file_ids @registry_finder ||= Geo::AttachmentRegistryFinder.new(current_node: Gitlab::Geo.current_node)
end end
# Why do we need a different `file_type` for each Uploader? Why not just use 'upload'? private
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
def find_failed_jobs(batch_size:) # Why do we need a different `file_type` for each Uploader? Why not just use 'upload'?
find_failed_registries(batch_size: batch_size).pluck(:file_type, :file_id) def convert_resource_relation_to_job_args(relation)
relation.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end end
def find_synced_missing_on_primary_jobs(batch_size:) def convert_registry_relation_to_job_args(relation)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_type, :file_id) relation.pluck(:file_type, :file_id)
end end
end end
end end
......
module Geo module Geo
class FileDownloadDispatchWorker class FileDownloadDispatchWorker
class JobArtifactJobFinder < JobFinder class JobArtifactJobFinder < JobFinder
def resource_type RESOURCE_ID_KEY = :artifact_id
:job_artifact EXCEPT_RESOURCE_IDS_KEY = :except_artifact_ids
end FILE_SERVICE_OBJECT_TYPE = :job_artifact
def resource_id_prefix
:artifact
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['job_artifact', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
def find_synced_missing_on_primary_jobs(batch_size:) def registry_finder
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] } @registry_finder ||= Geo::JobArtifactRegistryFinder.new(current_node: Gitlab::Geo.current_node)
end end
end end
end end
......
module Geo module Geo
class FileDownloadDispatchWorker class FileDownloadDispatchWorker
# This class is meant to be inherited, and is responsible for generating
# batches of job arguments for FileDownloadWorker.
#
# The subclass should define
#
# * registry_finder
# * EXCEPT_RESOURCE_IDS_KEY
# * RESOURCE_ID_KEY
# * FILE_SERVICE_OBJECT_TYPE
#
class JobFinder class JobFinder
include Gitlab::Utils::StrongMemoize include Gitlab::Utils::StrongMemoize
attr_reader :registry_finder, :scheduled_file_ids attr_reader :scheduled_file_ids
def initialize(scheduled_file_ids) def initialize(scheduled_file_ids)
current_node = Gitlab::Geo.current_node
@registry_finder = registry_finder_class.new(current_node: current_node)
@scheduled_file_ids = scheduled_file_ids @scheduled_file_ids = scheduled_file_ids
end end
def registry_finder_class def find_unsynced_jobs(batch_size:)
"Geo::#{resource_type.to_s.classify}RegistryFinder".constantize convert_resource_relation_to_job_args(
registry_finder.find_unsynced(find_batch_params(batch_size))
)
end end
def except_resource_ids_key def find_failed_jobs(batch_size:)
:"except_#{resource_id_prefix}_ids" convert_registry_relation_to_job_args(
registry_finder.find_retryable_failed_registries(find_batch_params(batch_size))
)
end end
def find_jobs(sync_status:, batch_size:) def find_synced_missing_on_primary_jobs(batch_size:)
self.public_send(:"find_#{sync_status}_jobs", batch_size: batch_size) # rubocop:disable GitlabSecurity/PublicSend convert_registry_relation_to_job_args(
registry_finder.find_retryable_synced_missing_on_primary_registries(find_batch_params(batch_size))
)
end end
def find_failed_registries(batch_size:) private
registry_finder.public_send(:"find_retryable_failed_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
def find_batch_params(batch_size)
{
:batch_size => batch_size,
self.class::EXCEPT_RESOURCE_IDS_KEY => scheduled_file_ids
}
end
def convert_resource_relation_to_job_args(relation)
relation.pluck(:id).map { |id| [self.class::FILE_SERVICE_OBJECT_TYPE.to_s, id] }
end end
def find_synced_missing_on_primary_registries(batch_size:) def convert_registry_relation_to_job_args(relation)
registry_finder.public_send(:"find_retryable_synced_missing_on_primary_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend relation.pluck(self.class::RESOURCE_ID_KEY).map { |id| [self.class::FILE_SERVICE_OBJECT_TYPE.to_s, id] }
end end
end end
end end
......
module Geo module Geo
class FileDownloadDispatchWorker class FileDownloadDispatchWorker
class LfsObjectJobFinder < JobFinder class LfsObjectJobFinder < JobFinder
def resource_type RESOURCE_ID_KEY = :file_id
:lfs_object EXCEPT_RESOURCE_IDS_KEY = :except_file_ids
end FILE_SERVICE_OBJECT_TYPE = :lfs
def except_resource_ids_key
:except_file_ids
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['lfs', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
end
def find_synced_missing_on_primary_jobs(batch_size:) def registry_finder
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] } @registry_finder ||= Geo::LfsObjectRegistryFinder.new(current_node: Gitlab::Geo.current_node)
end end
end end
end end
......
...@@ -45,7 +45,7 @@ module Geo ...@@ -45,7 +45,7 @@ module Geo
def find_migrated_local_lfs_objects_ids(batch_size:) def find_migrated_local_lfs_objects_ids(batch_size:)
return [] unless lfs_objects_object_store_enabled? return [] unless lfs_objects_object_store_enabled?
lfs_objects_finder.find_migrated_local_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs)) lfs_objects_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id) .pluck(:id)
.map { |id| ['lfs', id] } .map { |id| ['lfs', id] }
end end
...@@ -53,7 +53,7 @@ module Geo ...@@ -53,7 +53,7 @@ module Geo
def find_migrated_local_attachments_ids(batch_size:) def find_migrated_local_attachments_ids(batch_size:)
return [] unless attachments_object_store_enabled? return [] unless attachments_object_store_enabled?
attachments_finder.find_migrated_local_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)) attachments_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:uploader, :id) .pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] } .map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end end
...@@ -61,7 +61,7 @@ module Geo ...@@ -61,7 +61,7 @@ module Geo
def find_migrated_local_job_artifacts_ids(batch_size:) def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled? return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact)) job_artifacts_finder.find_migrated_local(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id) .pluck(:id)
.map { |id| ['job_artifact', id] } .map { |id| ['job_artifact', id] }
end end
......
...@@ -3,6 +3,10 @@ require 'spec_helper' ...@@ -3,6 +3,10 @@ require 'spec_helper'
describe Geo::ProjectRegistryFinder, :geo do describe Geo::ProjectRegistryFinder, :geo do
include ::EE::GeoHelpers include ::EE::GeoHelpers
# Using let() instead of set() because set() does not work properly
# when using the :delete DatabaseCleaner strategy, which is required for FDW
# tests because a foreign table can't see changes inside a transaction of a
# different connection.
let(:secondary) { create(:geo_node) } let(:secondary) { create(:geo_node) }
let(:synced_group) { create(:group) } let(:synced_group) { create(:group) }
let!(:project_not_synced) { create(:project) } let!(:project_not_synced) { create(:project) }
......
shared_examples_for 'a file registry finder' do
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
syncable
count_syncable
count_synced
count_failed
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :delete do
before do
skip('FDW is not configured') if Gitlab::Database.postgresql? && !Gitlab::Geo::Fdw.enabled?
end
include_examples 'counts all the things'
include_examples 'finds all the things' do
let(:method_prefix) { 'fdw' }
end
end
context 'Legacy' do
before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false)
end
include_examples 'counts all the things'
include_examples 'finds all the things' do
let(:method_prefix) { 'legacy' }
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment