Commit c93ba76a authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Mayra Cabrera

Treat registry as SSOT for Uploads

FileDownloadDispatchWorker looks only at the registry table
for things that have never been synced.

In order for the counts to be correct, we have to distinguish
between "never synced" and "failed" with "retry_count" being
"nil" for the former and "not nil" for the latter.

Feature flag is "geo_file_registry_ssot_sync".
parent 3f66ce3b
...@@ -34,64 +34,119 @@ module Geo ...@@ -34,64 +34,119 @@ module Geo
Upload Upload
end end
# Returns untracked IDs as well as tracked IDs that are unused.
#
# Untracked IDs are model IDs that are supposed to be synced but don't yet
# have a registry entry.
#
# Unused tracked IDs are model IDs that are not supposed to be synced but
# already have a registry entry. For example:
#
# - orphaned registries
# - records that became excluded from selective sync
# - records that are in object storage, and `sync_object_storage` became
# disabled
#
# We compute both sets in this method to reduce the number of DB queries
# performed.
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source = attachments(fdw: false).where(id: range).pluck(::Upload.arel_table[:id], ::Upload.arel_table[:uploader]) # rubocop:disable CodeReuse/ActiveRecord
tracked = Geo::UploadRegistry.where(file_id: range).pluck(:file_id, :file_type) # rubocop:disable CodeReuse/ActiveRecord
untracked = source - tracked
unused_tracked = tracked - source
[untracked, unused_tracked]
end
# Returns Geo::UploadRegistry records that have never been synced.
#
# Does not care about selective sync, because it considers the Registry
# table to be the single source of truth. The contract is that other
# processes need to ensure that the table only contains records that should
# be synced.
#
# Any registries that have ever been synced that currently need to be
# resynced will be handled by other find methods (like
# #find_retryable_failed_registries)
#
# You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_never_synced_registries(batch_size:, except_ids: [])
Geo::UploadRegistry
.never
.model_id_not_in(except_ids)
.limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Deprecated in favor of the process using
# #find_registry_differences and #find_never_synced_registries
#
# Find limited amount of non replicated attachments. # Find limited amount of non replicated attachments.
# #
# You can pass a list with `except_file_ids:` so you can exclude items you # You can pass a list with `except_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# TODO: Alternative here is to use some sort of window function with a cursor instead # TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want # of simply limiting the query and passing a list of items we don't want
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query # @param [Array<Integer>] except_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_file_ids: []) def find_unsynced(batch_size:, except_ids: [])
attachments attachments
.missing_registry .missing_registry
.id_not_in(except_file_ids) .id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_file_ids: []) def find_migrated_local(batch_size:, except_ids: [])
all_attachments all_attachments
.inner_join_registry .inner_join_registry
.with_files_stored_remotely .with_files_stored_remotely
.id_not_in(except_file_ids) .id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_file_ids: []) def find_retryable_failed_registries(batch_size:, except_ids: [])
Geo::UploadRegistry Geo::UploadRegistry
.failed .failed
.retry_due .retry_due
.file_id_not_in(except_file_ids) .model_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: []) def find_retryable_synced_missing_on_primary_registries(batch_size:, except_ids: [])
Geo::UploadRegistry Geo::UploadRegistry
.synced .synced
.missing_on_primary .missing_on_primary
.retry_due .retry_due
.file_id_not_in(except_file_ids) .model_id_not_in(except_ids)
.limit(batch_size) .limit(batch_size)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
private private
def attachments def attachments(fdw: true)
local_storage_only? ? all_attachments.with_files_stored_locally : all_attachments local_storage_only?(fdw: fdw) ? all_attachments(fdw: fdw).with_files_stored_locally : all_attachments(fdw: fdw)
end end
def all_attachments def all_attachments(fdw: true)
current_node.attachments current_node(fdw: fdw).attachments
end end
def registries_for_attachments def registries_for_attachments
......
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
class Geo::UploadRegistry < Geo::BaseRegistry class Geo::UploadRegistry < Geo::BaseRegistry
include Geo::Syncable include Geo::Syncable
MODEL_CLASS = ::Upload
MODEL_FOREIGN_KEY = :file_id
self.table_name = 'file_registry' self.table_name = 'file_registry'
belongs_to :upload, foreign_key: :file_id belongs_to :upload, foreign_key: :file_id
...@@ -11,16 +14,27 @@ class Geo::UploadRegistry < Geo::BaseRegistry ...@@ -11,16 +14,27 @@ class Geo::UploadRegistry < Geo::BaseRegistry
scope :fresh, -> { order(created_at: :desc) } scope :fresh, -> { order(created_at: :desc) }
scope :never, -> { where(success: false, retry_count: nil) } scope :never, -> { where(success: false, retry_count: nil) }
def self.file_id_in(ids) def self.registry_consistency_worker_enabled?
where(file_id: ids) Feature.enabled?(:geo_file_registry_ssot_sync)
end
def self.finder_class
::Geo::AttachmentRegistryFinder
end end
def self.file_id_not_in(ids) # If false, RegistryConsistencyService will frequently check the end of the
where.not(file_id: ids) # table to quickly handle new replicables.
def self.has_create_events?
false
end end
def self.pluck_file_key # TODO: Investigate replacing this with bulk insert (there was an obstacle).
where(nil).pluck(:file_id) # https://gitlab.com/gitlab-org/gitlab/issues/197310
def self.insert_for_model_ids(attrs)
attrs.map do |file_id, file_type|
registry = create(file_id: file_id, file_type: file_type)
registry.id
end.compact
end end
def self.with_search(query) def self.with_search(query)
......
...@@ -3,12 +3,22 @@ ...@@ -3,12 +3,22 @@
module Geo module Geo
class FileDownloadDispatchWorker # rubocop:disable Scalability/IdempotentWorker class FileDownloadDispatchWorker # rubocop:disable Scalability/IdempotentWorker
class AttachmentJobFinder < JobFinder # rubocop:disable Scalability/IdempotentWorker class AttachmentJobFinder < JobFinder # rubocop:disable Scalability/IdempotentWorker
EXCEPT_RESOURCE_IDS_KEY = :except_file_ids EXCEPT_RESOURCE_IDS_KEY = :except_ids
def registry_finder def registry_finder
@registry_finder ||= Geo::AttachmentRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id) @registry_finder ||= Geo::AttachmentRegistryFinder.new(current_node_id: Gitlab::Geo.current_node.id)
end end
def find_unsynced_jobs(batch_size:)
if Geo::UploadRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size))
)
else
super
end
end
private private
# Why do we need a different `file_type` for each Uploader? Why not just use 'upload'? # Why do we need a different `file_type` for each Uploader? Why not just use 'upload'?
......
...@@ -12,7 +12,7 @@ module Geo ...@@ -12,7 +12,7 @@ module Geo
end end
def find_unsynced_jobs(batch_size:) def find_unsynced_jobs(batch_size:)
if Feature.enabled?(:geo_lfs_registry_ssot_sync) if Geo::LfsObjectRegistry.registry_consistency_worker_enabled?
convert_registry_relation_to_job_args( convert_registry_relation_to_job_args(
registry_finder.find_never_synced_registries(find_batch_params(batch_size)) registry_finder.find_never_synced_registries(find_batch_params(batch_size))
) )
......
...@@ -58,7 +58,7 @@ module Geo ...@@ -58,7 +58,7 @@ module Geo
def find_migrated_local_attachments_ids(batch_size:) def find_migrated_local_attachments_ids(batch_size:)
return [] unless attachments_object_store_enabled? return [] unless attachments_object_store_enabled?
attachments_finder.find_migrated_local(batch_size: batch_size, except_file_ids: scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES)) attachments_finder.find_migrated_local(batch_size: batch_size, except_ids: scheduled_file_ids(Gitlab::Geo::Replication::USER_UPLOADS_OBJECT_TYPES))
.pluck(:uploader, :id) .pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] } .map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
end end
......
...@@ -16,7 +16,7 @@ module Geo ...@@ -16,7 +16,7 @@ module Geo
feature_category :geo_replication feature_category :geo_replication
# This is probably not the best place to "register" replicables for this functionality # This is probably not the best place to "register" replicables for this functionality
REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry].freeze REGISTRY_CLASSES = [Geo::JobArtifactRegistry, Geo::LfsObjectRegistry, Geo::UploadRegistry].freeze
BATCH_SIZE = 1000 BATCH_SIZE = 1000
# @return [Boolean] true if at least 1 registry was created, else false # @return [Boolean] true if at least 1 registry was created, else false
......
...@@ -19,6 +19,11 @@ FactoryBot.define do ...@@ -19,6 +19,11 @@ FactoryBot.define do
retry_count { 1 } retry_count { 1 }
end end
trait :never_synced do
success { false }
retry_count { nil }
end
trait :with_file do trait :with_file do
after(:build, :stub) do |registry, _| after(:build, :stub) do |registry, _|
file = file =
......
...@@ -53,7 +53,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do ...@@ -53,7 +53,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
end end
it 'returns attachments without an entry on the tracking database, excluding from exception list' do it 'returns attachments without an entry on the tracking database, excluding from exception list' do
attachments = subject.find_unsynced(batch_size: 10, except_file_ids: [upload_issuable_synced_nested_project.id]) attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_issuable_synced_nested_project.id])
expect(attachments).to match_ids(upload_unsynced_project, upload_synced_project, upload_personal_snippet, expect(attachments).to match_ids(upload_unsynced_project, upload_synced_project, upload_personal_snippet,
upload_remote_unsynced_project, upload_remote_synced_group) upload_remote_unsynced_project, upload_remote_synced_group)
...@@ -64,7 +64,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do ...@@ -64,7 +64,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
let(:secondary) { create(:geo_node, :local_storage_only) } let(:secondary) { create(:geo_node, :local_storage_only) }
it 'returns local attachments only' do it 'returns local attachments only' do
attachments = subject.find_unsynced(batch_size: 10, except_file_ids: [upload_synced_project.id]) attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_synced_project.id])
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_unsynced_project, expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_unsynced_project,
upload_personal_snippet) upload_personal_snippet)
...@@ -75,7 +75,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do ...@@ -75,7 +75,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) } let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }
it 'returns attachments without an entry on the tracking database, excluding from exception list' do it 'returns attachments without an entry on the tracking database, excluding from exception list' do
attachments = subject.find_unsynced(batch_size: 10, except_file_ids: [upload_synced_project.id]) attachments = subject.find_unsynced(batch_size: 10, except_ids: [upload_synced_project.id])
expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_personal_snippet, expect(attachments).to match_ids(upload_issuable_synced_nested_project, upload_personal_snippet,
upload_remote_synced_group) upload_remote_synced_group)
...@@ -101,7 +101,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do ...@@ -101,7 +101,7 @@ describe Geo::AttachmentRegistryFinder, :geo, :geo_fdw do
end end
it 'returns attachments stored remotely and successfully synced locally' do it 'returns attachments stored remotely and successfully synced locally' do
attachments = subject.find_migrated_local(batch_size: 100, except_file_ids: [upload_remote_unsynced_project.id]) attachments = subject.find_migrated_local(batch_size: 100, except_ids: [upload_remote_unsynced_project.id])
expect(attachments).to match_ids(upload_remote_synced_project) expect(attachments).to match_ids(upload_remote_synced_project)
end end
......
...@@ -25,8 +25,6 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -25,8 +25,6 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end end
describe '#perform' do describe '#perform' do
subject { described_class.new }
before do before do
allow(subject).to receive(:sleep) # faster tests allow(subject).to receive(:sleep) # faster tests
end end
...@@ -80,14 +78,17 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -80,14 +78,17 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'creates missing registries for each registry class' do it 'creates missing registries for each registry class' do
lfs_object = create(:lfs_object) lfs_object = create(:lfs_object)
job_artifact = create(:ci_job_artifact) job_artifact = create(:ci_job_artifact)
upload = create(:upload)
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0) expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0) expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
subject.perform subject.perform
expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1) expect(Geo::LfsObjectRegistry.where(lfs_object_id: lfs_object.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1) expect(Geo::JobArtifactRegistry.where(artifact_id: job_artifact.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
end end
context 'when geo_lfs_registry_ssot_sync is disabled' do context 'when geo_lfs_registry_ssot_sync is disabled' do
...@@ -103,6 +104,7 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -103,6 +104,7 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for LFS objects' do it 'does not execute RegistryConsistencyService for LFS objects' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000) expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000)
...@@ -123,6 +125,7 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -123,6 +125,7 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
it 'does not execute RegistryConsistencyService for Job Artifacts' do it 'does not execute RegistryConsistencyService for Job Artifacts' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::UploadRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000) expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000)
...@@ -130,6 +133,27 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do ...@@ -130,6 +133,27 @@ describe Geo::Secondary::RegistryConsistencyWorker, :geo, :geo_fdw do
end end
end end
context 'when geo_file_registry_ssot_sync is disabled' do
let_it_be(:upload) { create(:upload) }
before do
stub_feature_flags(geo_file_registry_ssot_sync: false)
end
it 'returns false' do
expect(subject.perform).to be_falsey
end
it 'does not execute RegistryConsistencyService for Uploads' do
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::JobArtifactRegistry, batch_size: 1000).and_call_original
allow(Geo::RegistryConsistencyService).to receive(:new).with(Geo::LfsObjectRegistry, batch_size: 1000).and_call_original
expect(Geo::RegistryConsistencyService).not_to receive(:new).with(Geo::UploadRegistry, batch_size: 1000)
subject.perform
end
end
context 'when the current Geo node is disabled or primary' do context 'when the current Geo node is disabled or primary' do
before do before do
stub_primary_node stub_primary_node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment