Commit cc088fff authored by Nick Thomas's avatar Nick Thomas

Merge branch...

Merge branch '11001-geo-implement-selective-sync-support-for-the-job-artifacts-fdw-queries' into 'master'

Geo - Add selective sync support for the job artifacts FDW queries

Closes #11001

See merge request gitlab-org/gitlab-ee!11892
parents e1c4d28e 79822cfa
......@@ -7,31 +7,29 @@ module Geo
end
def count_synced
if aggregate_pushdown_supported?
find_synced.count
else
legacy_find_synced.count
end
job_artifacts_synced.count
end
def count_failed
if aggregate_pushdown_supported?
find_failed.count
else
legacy_find_failed.count
end
job_artifacts_failed.count
end
def count_synced_missing_on_primary
if aggregate_pushdown_supported?
find_synced_missing_on_primary.count
else
legacy_find_synced_missing_on_primary.count
end
job_artifacts_synced_missing_on_primary.count
end
def count_registry
Geo::JobArtifactRegistry.count
registries_for_job_artifacts.count
end
def syncable
if use_legacy_queries_for_selective_sync?
legacy_finder.syncable
elsif selective_sync?
fdw_geo_node.job_artifacts.syncable
else
Ci::JobArtifact.syncable
end
end
# Find limited amount of non replicated job artifacts.
......@@ -47,10 +45,10 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced(except_artifact_ids: except_artifact_ids)
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_unsynced(except_artifact_ids: except_artifact_ids)
else
fdw_find_unsynced(except_artifact_ids: except_artifact_ids)
job_artifacts_unsynced(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
......@@ -60,186 +58,98 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local(except_artifact_ids: except_artifact_ids)
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_migrated_local(except_artifact_ids: except_artifact_ids)
else
fdw_find_migrated_local(except_artifact_ids: except_artifact_ids)
job_artifacts_migrated_local(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
def syncable
all.geo_syncable
end
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_artifact_ids: [])
find_failed_registries
Geo::JobArtifactRegistry
.failed
.retry_due
.where.not(artifact_id: except_artifact_ids)
.artifact_id_not_in(except_artifact_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_registries
Geo::JobArtifactRegistry
.synced
.missing_on_primary
.retry_due
.where.not(artifact_id: except_artifact_ids)
.artifact_id_not_in(except_artifact_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
private
# rubocop: disable CodeReuse/ActiveRecord
def all
if selective_sync?
Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
else
Ci::JobArtifact.all
end
# rubocop:disable CodeReuse/Finder
def legacy_finder
@legacy_finder ||= Geo::LegacyJobArtifactRegistryFinder.new(current_node: current_node)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/Finder
def find_synced
if use_legacy_queries?
legacy_find_synced
else
fdw_find.merge(find_synced_registries)
end
def fdw_geo_node
@fdw_geo_node ||= Geo::Fdw::GeoNode.find(current_node.id)
end
def find_synced_missing_on_primary
if use_legacy_queries?
legacy_find_synced_missing_on_primary
def registries_for_job_artifacts
if use_legacy_queries_for_selective_sync?
legacy_finder.registries_for_job_artifacts
else
fdw_find.merge(find_synced_missing_on_primary_registries)
fdw_geo_node
.job_artifacts
.inner_join_job_artifact_registry
.syncable
end
end
def find_failed
if use_legacy_queries?
legacy_find_failed
def job_artifacts_synced
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_synced
else
fdw_find.merge(find_failed_registries)
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.synced)
end
end
def find_synced_registries
Geo::JobArtifactRegistry.synced
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
end
def find_failed_registries
Geo::JobArtifactRegistry.failed
end
#
# FDW accessors
#
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find
fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_unsynced(except_artifact_ids:)
fdw_all.joins("LEFT OUTER JOIN job_artifact_registry
ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.geo_syncable
.where(job_artifact_registry: { artifact_id: nil })
.where.not(id: except_artifact_ids)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_migrated_local(except_artifact_ids:)
fdw_all.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_table}.id")
.with_files_stored_remotely
.where.not(id: except_artifact_ids)
.merge(Geo::JobArtifactRegistry.all)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_all
if selective_sync?
Geo::Fdw::Ci::JobArtifact.joins(:project).where(projects: { id: current_node.projects })
def job_artifacts_failed
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_failed
else
Geo::Fdw::Ci::JobArtifact.all
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.failed)
end
end
# rubocop: enable CodeReuse/ActiveRecord
def fdw_table
Geo::Fdw::Ci::JobArtifact.table_name
end
#
# Legacy accessors (non FDW)
#
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_synced
legacy_inner_join_registry_ids(
syncable,
find_synced_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_failed
legacy_inner_join_registry_ids(
syncable,
find_failed_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_unsynced(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) | except_artifact_ids
legacy_left_outer_join_registry_ids(
syncable,
registry_artifact_ids,
Ci::JobArtifact
)
def job_artifacts_synced_missing_on_primary
if use_legacy_queries_for_selective_sync?
legacy_finder.job_artifacts_synced_missing_on_primary
else
registries_for_job_artifacts.merge(Geo::JobArtifactRegistry.synced.missing_on_primary)
end
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_migrated_local(except_artifact_ids:)
registry_artifact_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
legacy_inner_join_registry_ids(
all.with_files_stored_remotely,
registry_artifact_ids,
Ci::JobArtifact
)
def job_artifacts_unsynced(except_artifact_ids:)
fdw_geo_node
.job_artifacts
.syncable
.missing_job_artifact_registry
.id_not_in(except_artifact_ids)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable,
find_synced_missing_on_primary_registries.pluck(:artifact_id),
Ci::JobArtifact
)
def job_artifacts_migrated_local(except_artifact_ids:)
fdw_geo_node
.job_artifacts
.inner_join_job_artifact_registry
.with_files_stored_remotely
.id_not_in(except_artifact_ids)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
# frozen_string_literal: true
module Geo
class LegacyJobArtifactRegistryFinder < RegistryFinder
def syncable
current_node.job_artifacts.syncable
end
def job_artifacts_synced
legacy_inner_join_registry_ids(
syncable,
Geo::JobArtifactRegistry.synced.pluck_artifact_key,
Ci::JobArtifact
)
end
def job_artifacts_failed
legacy_inner_join_registry_ids(
syncable,
Geo::JobArtifactRegistry.failed.pluck_artifact_key,
Ci::JobArtifact
)
end
def job_artifacts_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable,
Geo::JobArtifactRegistry.synced.missing_on_primary.pluck_artifact_key,
Ci::JobArtifact
)
end
def job_artifacts_unsynced(except_artifact_ids: [])
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key | except_artifact_ids
legacy_left_outer_join_registry_ids(
syncable,
registry_artifact_ids,
Ci::JobArtifact
)
end
def job_artifacts_migrated_local(except_artifact_ids: [])
registry_artifact_ids = Geo::JobArtifactRegistry.pluck_artifact_key - except_artifact_ids
legacy_inner_join_registry_ids(
current_node.job_artifacts.with_files_stored_remotely,
registry_artifact_ids,
Ci::JobArtifact
)
end
def registries_for_job_artifacts
return Geo::JobArtifactRegistry.all unless selective_sync?
legacy_inner_join_registry_ids(
Geo::JobArtifactRegistry.all,
current_node.job_artifacts.pluck_primary_key,
Geo::JobArtifactRegistry,
foreign_key: :artifact_id
)
end
end
end
......@@ -16,7 +16,8 @@ module EE
METRICS_REPORT_FILE_TYPES = %w[metrics].freeze
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :geo_syncable, -> { with_files_stored_locally.not_expired }
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally.not_expired }
scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) }
scope :security_reports, -> do
......
......@@ -10,8 +10,42 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.foreign_table_name('ci_job_artifacts')
belongs_to :project, class_name: 'Geo::Fdw::Project', inverse_of: :job_artifacts
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :geo_syncable, -> { with_files_stored_locally.not_expired }
scope :project_id_in, ->(ids) { joins(:project).merge(Geo::Fdw::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally.not_expired }
class << self
def inner_join_job_artifact_registry
join_statement =
arel_table
.join(job_artifact_registry_table, Arel::Nodes::InnerJoin)
.on(arel_table[:id].eq(job_artifact_registry_table[:artifact_id]))
joins(join_statement.join_sources)
end
def missing_job_artifact_registry
left_outer_join_job_artifact_registry
.where(job_artifact_registry_table[:id].eq(nil))
end
private
def job_artifact_registry_table
Geo::JobArtifactRegistry.arel_table
end
def left_outer_join_job_artifact_registry
join_statement =
arel_table
.join(job_artifact_registry_table, Arel::Nodes::OuterJoin)
.on(arel_table[:id].eq(job_artifact_registry_table[:artifact_id]))
joins(join_statement.join_sources)
end
end
end
end
end
......
......@@ -14,6 +14,12 @@ module Geo
has_many :geo_node_namespace_links, class_name: 'Geo::Fdw::GeoNodeNamespaceLink'
has_many :namespaces, class_name: 'Geo::Fdw::Namespace', through: :geo_node_namespace_links
def job_artifacts
Geo::Fdw::Ci::JobArtifact.all unless selective_sync?
Geo::Fdw::Ci::JobArtifact.project_id_in(projects)
end
def lfs_objects
return Geo::Fdw::LfsObject.all unless selective_sync?
......
......@@ -7,6 +7,7 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.foreign_table_name('projects')
has_many :job_artifacts, class_name: 'Geo::Fdw::Ci::JobArtifact'
has_many :lfs_objects_projects, class_name: 'Geo::Fdw::LfsObjectsProject'
has_many :lfs_objects, class_name: 'Geo::Fdw::LfsObject', through: :lfs_objects_projects
......
......@@ -2,4 +2,12 @@
class Geo::JobArtifactRegistry < Geo::BaseRegistry
include Geo::Syncable
def self.artifact_id_not_in(ids)
where.not(artifact_id: ids)
end
def self.pluck_artifact_key
where(nil).pluck(:artifact_id)
end
end
......@@ -198,6 +198,12 @@ class GeoNode < ApplicationRecord
end
end
def job_artifacts
Ci::JobArtifact.all unless selective_sync?
Ci::JobArtifact.project_id_in(projects)
end
def lfs_objects
return LfsObject.all unless selective_sync?
......
---
title: Geo - Add selective sync support for the job artifacts FDW queries
merge_request: 11892
author:
type: changed
......@@ -28,25 +28,6 @@ describe Geo::AttachmentRegistryFinder, :geo do
stub_current_geo_node(secondary)
end
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
syncable
count_syncable
count_synced
count_failed
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
shared_examples 'finds all the things' do
describe '#find_unsynced' do
let!(:upload_1) { create(:upload, model: synced_group) }
......@@ -475,32 +456,5 @@ describe Geo::AttachmentRegistryFinder, :geo do
end
end
context 'FDW', :geo_fdw do
context 'with use_fdw_queries_for_selective_sync disabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: false)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
context 'with use_fdw_queries_for_selective_sync enabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: true)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
end
context 'Legacy' do
before do
stub_fdw_disabled
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
it_behaves_like 'a file registry finder'
end
......@@ -31,25 +31,6 @@ describe Geo::LfsObjectRegistryFinder, :geo do
stub_lfs_object_storage
end
it 'responds to file registry finder methods' do
file_registry_finder_methods = %i{
syncable
count_syncable
count_synced
count_failed
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
}
file_registry_finder_methods.each do |method|
expect(subject).to respond_to(method)
end
end
shared_examples 'counts all the things' do
describe '#count_syncable' do
before do
......@@ -661,34 +642,5 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :geo_fdw, :delete do
context 'with use_fdw_queries_for_selective_sync disabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: false)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
context 'with use_fdw_queries_for_selective_sync enabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: true)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
end
context 'Legacy' do
before do
stub_fdw_disabled
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
it_behaves_like 'a file registry finder'
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::Fdw::Ci::JobArtifact, :geo, type: :model do
context 'relationships' do
it { is_expected.to belong_to(:project).class_name('Geo::Fdw::Project').inverse_of(:job_artifacts) }
end
end
......@@ -4,6 +4,7 @@ require 'spec_helper'
RSpec.describe Geo::Fdw::Project, :geo, type: :model do
context 'relationships' do
it { is_expected.to have_many(:job_artifacts).class_name('Geo::Fdw::Ci::JobArtifact') }
it { is_expected.to have_many(:lfs_objects_projects).class_name('Geo::Fdw::LfsObjectsProject') }
it { is_expected.to have_many(:lfs_objects).class_name('Geo::Fdw::LfsObject').through(:lfs_objects_projects) }
end
......
......@@ -18,23 +18,34 @@ shared_examples_for 'a file registry finder' do
end
end
context 'FDW', :geo_fdw do
include_examples 'counts all the things'
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'FDW', :geo_fdw, :delete do
context 'with use_fdw_queries_for_selective_sync disabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: false)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
include_examples 'finds all the things' do
let(:method_prefix) { 'fdw' }
context 'with use_fdw_queries_for_selective_sync enabled' do
before do
stub_feature_flags(use_fdw_queries_for_selective_sync: true)
end
include_examples 'counts all the things'
include_examples 'finds all the things'
end
end
context 'Legacy' do
before do
allow(Gitlab::Geo::Fdw).to receive(:enabled?).and_return(false)
stub_fdw_disabled
end
include_examples 'counts all the things'
include_examples 'finds all the things' do
let(:method_prefix) { 'legacy' }
end
include_examples 'finds all the things'
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment