Commit b89457d9 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Nick Thomas

Add disabled? method to Gitlab::Geo::Fdw

Extract a helper method to avoid code duplication.
parent 920ae5a8
......@@ -2,9 +2,11 @@
module Geo
class ExpireUploadsFinder
UPLOAD_TYPE = 'file'
def find_project_uploads(project)
if Gitlab::Geo::Fdw.enabled?
Geo::Fdw::Upload.for_model_with_type(project, 'file')
Geo::Fdw::Upload.for_model_with_type(project, UPLOAD_TYPE)
else
legacy_find_project_uploads(project)
end
......@@ -12,9 +14,9 @@ module Geo
def find_file_registries_uploads(project)
if Gitlab::Geo::Fdw.enabled?
Gitlab::Geo::Fdw::FileRegistryQueryBuilder.new
Gitlab::Geo::Fdw::UploadRegistryQueryBuilder.new
.for_model(project)
.with_type('file')
.with_type(UPLOAD_TYPE)
else
legacy_find_file_registries_uploads(project)
end
......@@ -28,13 +30,12 @@ module Geo
return Geo::FileRegistry.none if upload_ids.empty?
values_sql = upload_ids.map { |id| "(#{id})" }.join(',')
upload_type = 'file'
Geo::FileRegistry.joins(<<~SQL)
JOIN (VALUES #{values_sql})
AS uploads (id)
ON uploads.id = file_registry.file_id
AND file_registry.file_type='#{upload_type}'
AND file_registry.file_type='#{UPLOAD_TYPE}'
SQL
end
# rubocop:enable CodeReuse/ActiveRecord
......
# frozen_string_literal: true
module Geo
class LegacyLfsObjectRegistryFinder < RegistryFinder
def syncable
current_node.lfs_objects.syncable
end
def lfs_objects_synced
legacy_inner_join_registry_ids(
syncable,
Geo::FileRegistry.lfs_objects.synced.pluck_file_key,
LfsObject
)
end
def lfs_objects_failed
legacy_inner_join_registry_ids(
syncable,
Geo::FileRegistry.lfs_objects.failed.pluck_file_key,
LfsObject
)
end
def lfs_objects_unsynced(except_file_ids:)
registry_file_ids = Geo::FileRegistry.lfs_objects.pluck_file_key | except_file_ids
legacy_left_outer_join_registry_ids(
syncable,
registry_file_ids,
LfsObject
)
end
def lfs_objects_migrated_local(except_file_ids:)
legacy_inner_join_registry_ids(
current_node.lfs_objects.with_files_stored_remotely,
Geo::FileRegistry.lfs_objects.file_id_not_in(except_file_ids).pluck_file_key,
LfsObject
)
end
def lfs_objects_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable,
Geo::FileRegistry.lfs_objects.synced.missing_on_primary.pluck_file_key,
LfsObject
)
end
def registries_for_lfs_objects
return Geo::FileRegistry.lfs_objects unless selective_sync?
legacy_inner_join_registry_ids(
Geo::FileRegistry.lfs_objects,
current_node.lfs_objects.pluck_primary_key,
Geo::FileRegistry,
foreign_key: :file_id
)
end
end
end
......@@ -7,31 +7,29 @@ module Geo
end
def count_synced
if aggregate_pushdown_supported?
find_synced.count
else
legacy_find_synced.count
end
lfs_objects_synced.count
end
def count_failed
if aggregate_pushdown_supported?
find_failed.count
else
legacy_find_failed.count
end
lfs_objects_failed.count
end
def count_synced_missing_on_primary
if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary.count
else
legacy_find_synced_missing_on_primary.count
end
lfs_objects_synced_missing_on_primary.count
end
def count_registry
Geo::FileRegistry.lfs_objects.count
registries_for_lfs_objects.count
end
def syncable
if use_legacy_queries_for_selective_sync?
legacy_finder.syncable
elsif selective_sync?
fdw_geo_node.lfs_objects.syncable
else
LfsObject.syncable
end
end
# Find limited amount of non replicated lfs objects.
......@@ -44,206 +42,111 @@ module Geo
#
# @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query
# rubocop: disable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_file_ids: [])
relation =
if use_legacy_queries?
legacy_find_unsynced(except_file_ids: except_file_ids)
if use_legacy_queries_for_selective_sync?
legacy_finder.lfs_objects_unsynced(except_file_ids: except_file_ids)
else
fdw_find_unsynced(except_file_ids: except_file_ids)
lfs_objects_unsynced(except_file_ids: except_file_ids)
end
relation.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_file_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local(except_file_ids: except_file_ids)
if use_legacy_queries_for_selective_sync?
legacy_finder.lfs_objects_migrated_local(except_file_ids: except_file_ids)
else
fdw_find_migrated_local(except_file_ids: except_file_ids)
lfs_objects_migrated_local(except_file_ids: except_file_ids)
end
relation.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
def syncable
all.geo_syncable
end
# rubocop: disable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_failed_registries(batch_size:, except_file_ids: [])
find_failed_registries
.retry_due
.where.not(file_id: except_file_ids)
registries_for_lfs_objects
.merge(Geo::FileRegistry.failed)
.merge(Geo::FileRegistry.retry_due)
.file_id_not_in(except_file_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def find_retryable_synced_missing_on_primary_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_registries
registries_for_lfs_objects
.synced
.missing_on_primary
.retry_due
.where.not(file_id: except_file_ids)
.file_id_not_in(except_file_ids)
.limit(batch_size)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop:enable CodeReuse/ActiveRecord
private
def all
if selective_sync?
LfsObject.project_id_in(current_node.projects)
else
LfsObject.all
end
# rubocop:disable CodeReuse/Finder
def legacy_finder
@legacy_finder ||= Geo::LegacyLfsObjectRegistryFinder.new(current_node: current_node)
end
# rubocop:enable CodeReuse/Finder
def find_synced
if use_legacy_queries?
legacy_find_synced
else
fdw_find_synced
end
def fdw_geo_node
@fdw_geo_node ||= Geo::Fdw::GeoNode.find(current_node.id)
end
def find_failed
if use_legacy_queries?
legacy_find_failed
def registries_for_lfs_objects
if use_legacy_queries_for_selective_sync?
legacy_finder.registries_for_lfs_objects
else
fdw_find_failed
fdw_geo_node.lfs_object_registries
end
end
def find_synced_registries
Geo::FileRegistry.lfs_objects.synced
end
def find_failed_registries
Geo::FileRegistry.lfs_objects.failed
end
def find_synced_missing_on_primary_registries
find_synced_registries.missing_on_primary
def lfs_objects_synced
if use_legacy_queries_for_selective_sync?
legacy_finder.lfs_objects_synced
else
fdw_geo_node.lfs_objects.synced
end
end
#
# FDW accessors
#
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find
fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
.geo_syncable
.merge(Geo::FileRegistry.lfs_objects)
def lfs_objects_failed
if use_legacy_queries_for_selective_sync?
legacy_finder.lfs_objects_failed
else
fdw_geo_node.lfs_objects.failed
end
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_unsynced(except_file_ids:)
fdw_all.joins("LEFT OUTER JOIN file_registry
ON file_registry.file_id = #{fdw_table}.id
AND file_registry.file_type = 'lfs'")
.geo_syncable
.where(file_registry: { id: nil })
.where.not(id: except_file_ids)
def lfs_objects_unsynced(except_file_ids:)
fdw_geo_node
.lfs_objects
.syncable
.missing_file_registry
.id_not_in(except_file_ids)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def fdw_find_migrated_local(except_file_ids:)
fdw_all.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id")
def lfs_objects_migrated_local(except_file_ids:)
fdw_geo_node
.lfs_objects
.inner_join_file_registry
.with_files_stored_remotely
.where.not(id: except_file_ids)
.merge(Geo::FileRegistry.lfs_objects)
end
# rubocop: enable CodeReuse/ActiveRecord
def fdw_find_synced
fdw_find.merge(Geo::FileRegistry.synced)
.id_not_in(except_file_ids)
end
def fdw_find_synced_missing_on_primary
fdw_find.merge(Geo::FileRegistry.synced.missing_on_primary)
end
def fdw_find_failed
fdw_find.merge(Geo::FileRegistry.failed)
end
# rubocop: disable CodeReuse/ActiveRecord
def fdw_all
if selective_sync?
Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects })
def lfs_objects_synced_missing_on_primary
if use_legacy_queries_for_selective_sync?
legacy_finder.lfs_objects_synced_missing_on_primary
else
Geo::Fdw::LfsObject.all
fdw_geo_node.lfs_objects.synced.missing_on_primary
end
end
# rubocop: enable CodeReuse/ActiveRecord
def fdw_table
Geo::Fdw::LfsObject.table_name
end
#
# Legacy accessors (non FDW)
#
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_synced
legacy_inner_join_registry_ids(
syncable,
find_synced_registries.pluck(:file_id),
LfsObject
)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_failed
legacy_inner_join_registry_ids(
syncable,
find_failed_registries.pluck(:file_id),
LfsObject
)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_unsynced(except_file_ids:)
registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) | except_file_ids
legacy_left_outer_join_registry_ids(
syncable,
registry_file_ids,
LfsObject
)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_migrated_local(except_file_ids:)
registry_file_ids = Geo::FileRegistry.lfs_objects.pluck(:file_id) - except_file_ids
legacy_inner_join_registry_ids(
all.with_files_stored_remotely,
registry_file_ids,
LfsObject
)
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def legacy_find_synced_missing_on_primary
legacy_inner_join_registry_ids(
syncable,
find_synced_missing_on_primary_registries.pluck(:file_id),
LfsObject
)
end
# rubocop: enable CodeReuse/ActiveRecord
end
end
......@@ -86,14 +86,6 @@ module Geo
private
def fdw_disabled?
!Gitlab::Geo::Fdw.enabled?
end
def use_legacy_queries_for_selective_sync?
fdw_disabled? || selective_sync? && !Gitlab::Geo::Fdw.enabled_for_selective_sync?
end
def finder_klass_for_unsynced_projects
if use_legacy_queries_for_selective_sync?
Geo::LegacyProjectUnsyncedFinder
......
......@@ -23,7 +23,11 @@ module Geo
def use_legacy_queries?
# Selective project replication adds a wrinkle to FDW
# queries, so we fallback to the legacy version for now.
!Gitlab::Geo::Fdw.enabled? || selective_sync?
Gitlab::Geo::Fdw.disabled? || selective_sync?
end
def use_legacy_queries_for_selective_sync?
use_legacy_queries? && !Gitlab::Geo::Fdw.enabled_for_selective_sync?
end
# rubocop: disable CodeReuse/ActiveRecord
......
......@@ -8,12 +8,15 @@ module EE
module LfsObject
extend ActiveSupport::Concern
STORE_COLUMN = :file_store
prepended do
include ObjectStorable
after_destroy :log_geo_deleted_event
scope :geo_syncable, -> { with_files_stored_locally }
scope :project_id_in, ->(ids) { joins(:projects).merge(::Project.id_in(ids)) }
scope :with_files_stored_remotely, -> { where(file_store: LfsObjectUploader::Store::REMOTE) }
scope :syncable, -> { with_files_stored_locally }
end
def log_geo_deleted_event
......
......@@ -14,6 +14,19 @@ module Geo
has_many :geo_node_namespace_links, class_name: 'Geo::Fdw::GeoNodeNamespaceLink'
has_many :namespaces, class_name: 'Geo::Fdw::Namespace', through: :geo_node_namespace_links
def lfs_objects
return Geo::Fdw::LfsObject.all unless selective_sync?
Geo::Fdw::LfsObject.project_id_in(projects)
end
def lfs_object_registries
return Geo::FileRegistry.lfs_objects unless selective_sync?
Gitlab::Geo::Fdw::LfsObjectRegistryQueryBuilder.new
.for_lfs_objects(lfs_objects)
end
def projects
return Geo::Fdw::Project.all unless selective_sync?
......
......@@ -9,7 +9,60 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.foreign_table_name('lfs_objects')
scope :geo_syncable, -> { with_files_stored_locally }
has_many :lfs_objects_projects, class_name: 'Geo::Fdw::LfsObjectsProject'
has_many :projects, class_name: 'Geo::Fdw::Project', through: :lfs_objects_projects
scope :project_id_in, ->(ids) { joins(:projects).merge(Geo::Fdw::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally }
class << self
def failed
inner_join_file_registry
.syncable
.merge(Geo::FileRegistry.failed)
end
def inner_join_file_registry
join_statement =
arel_table
.join(file_registry_table, Arel::Nodes::InnerJoin)
.on(arel_table[:id].eq(file_registry_table[:file_id]))
joins(join_statement.join_sources)
.merge(Geo::FileRegistry.lfs_objects)
end
def missing_file_registry
left_outer_join_file_registry
.where(file_registry_table[:id].eq(nil))
end
def missing_on_primary
inner_join_file_registry
.merge(Geo::FileRegistry.synced.missing_on_primary)
end
def synced
inner_join_file_registry
.syncable
.merge(Geo::FileRegistry.synced)
end
private
def file_registry_table
Geo::FileRegistry.arel_table
end
def left_outer_join_file_registry
join_statement =
arel_table
.join(file_registry_table, Arel::Nodes::OuterJoin)
.on(arel_table[:id].eq(file_registry_table[:file_id]))
joins(join_statement.join_sources)
end
end
end
end
end
# frozen_string_literal: true
module Geo
module Fdw
class LfsObjectsProject < ::Geo::BaseFdw
self.table_name = Gitlab::Geo::Fdw.foreign_table_name('lfs_objects_projects')
belongs_to :lfs_object, class_name: 'Geo::Fdw::LfsObject', inverse_of: :projects
belongs_to :project, class_name: 'Geo::Fdw::Project', inverse_of: :lfs_objects
end
end
end
......@@ -7,6 +7,9 @@ module Geo
self.table_name = Gitlab::Geo::Fdw.foreign_table_name('projects')
has_many :lfs_objects_projects, class_name: 'Geo::Fdw::LfsObjectsProject'
has_many :lfs_objects, class_name: 'Geo::Fdw::LfsObject', through: :lfs_objects_projects
class << self
def missing_project_registry
left_outer_join_project_registry
......
......@@ -3,11 +3,11 @@
class Geo::FileRegistry < Geo::BaseRegistry
include Geo::Syncable
scope :lfs_objects, -> { where(file_type: :lfs) }
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) }
scope :failed, -> { where(success: false).where.not(retry_count: nil) }
scope :never, -> { where(success: false, retry_count: nil) }
scope :fresh, -> { order(created_at: :desc) }
scope :lfs_objects, -> { where(file_type: :lfs) }
scope :never, -> { where(success: false, retry_count: nil) }
scope :with_file_type, ->(type) { where(file_type: type) }
self.inheritance_column = 'file_type'
......@@ -20,6 +20,18 @@ class Geo::FileRegistry < Geo::BaseRegistry
end
end
def self.file_id_in(ids)
where(file_id: ids)
end
def self.file_id_not_in(ids)
where.not(file_id: ids)
end
def self.pluck_file_key
where(nil).pluck(:file_id)
end
def self.with_status(status)
case status
when 'synced', 'never', 'failed'
......
......@@ -194,6 +194,12 @@ class GeoNode < ApplicationRecord
end
end
def lfs_objects
return LfsObject.all unless selective_sync?
LfsObject.project_id_in(projects)
end
def projects
return Project.all unless selective_sync?
......
---
title: Geo - Implement selective sync support for the LFS objects FDW queries
merge_request: 10757
author:
type: changed
......@@ -19,6 +19,10 @@ module Gitlab
value.nil? ? true : value
end
def disabled?
!enabled?
end
def enabled_for_selective_sync?
enabled? && Feature.enabled?(:use_fdw_queries_for_selective_sync)
end
......
# frozen_string_literal: true
# Builder class to create composable queries using FDW to
# retrieve file registries for LFS objects.
#
# Basic usage:
#
# Gitlab::Geo::Fdw::LfsObjectRegistryQueryBuilder
# .new
# .inner_join_lfs_objects
#
module Gitlab
module Geo
class Fdw
class LfsObjectRegistryQueryBuilder < BaseQueryBuilder
# rubocop:disable CodeReuse/ActiveRecord
def for_lfs_objects(file_ids)
query
.joins(fdw_inner_join_lfs_objects)
.file_id_in(file_ids)
end
# rubocop:enable CodeReuse/ActiveRecord
private
def base
::Geo::FileRegistry
.select(file_registry_table[Arel.star])
.lfs_objects
end
def file_registry_table
::Geo::FileRegistry.arel_table
end
def fdw_lfs_object_table
::Geo::Fdw::LfsObject.arel_table
end
def fdw_inner_join_lfs_objects
file_registry_table
.join(fdw_lfs_object_table, Arel::Nodes::InnerJoin)
.on(file_registry_table[:file_id].eq(fdw_lfs_object_table[:id]))
.join_sources
end
end
end
end
end
......@@ -5,14 +5,14 @@
#
# Basic usage:
#
# Gitlab::Geo::Fdw::FileRegistryQueryBuilder
# Gitlab::Geo::Fdw::UploadRegistryQueryBuilder
# .new
# .for_project_with_type(project, 'file')
#
module Gitlab
module Geo
class Fdw
class FileRegistryQueryBuilder < BaseQueryBuilder
class UploadRegistryQueryBuilder < BaseQueryBuilder
# rubocop:disable CodeReuse/ActiveRecord
def for_model(model)
reflect(
......
......@@ -4,7 +4,7 @@ require 'spec_helper'
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Gitlab::Geo::Fdw::FileRegistryQueryBuilder, :geo, :delete do
describe Gitlab::Geo::Fdw::UploadRegistryQueryBuilder, :geo, :delete do
let(:project) { create(:project) }
let(:upload_1) { create(:upload, :issuable_upload, model: project) }
let(:upload_2) { create(:upload, :issuable_upload, model: project) }
......
......@@ -44,6 +44,49 @@ describe Gitlab::Geo::Fdw, :geo do
end
end
describe '.enabled?' do
it 'returns true when foreign server does not exist' do
drop_foreign_server
expect(described_class.disabled?).to eq true
end
it 'returns true when foreign server exists but foreign schema does not exist' do
drop_foreign_schema
expect(described_class.disabled?).to eq true
end
it 'returns true when foreign server and schema exists but foreign tables are empty' do
drop_foreign_schema
create_foreign_schema
expect(described_class.disabled?).to eq true
end
it 'returns true when fdw is disabled in `config/database_geo.yml`' do
allow(Rails.configuration).to receive(:geo_database).and_return('fdw' => false)
expect(described_class.disabled?).to eq true
end
it 'returns false when fdw is set in `config/database_geo.yml`' do
allow(Rails.configuration).to receive(:geo_database).and_return('fdw' => true)
expect(described_class.disabled?).to eq false
end
it 'returns false when fdw is nil in `config/database_geo.yml`' do
allow(Rails.configuration).to receive(:geo_database).and_return('fdw' => nil)
expect(described_class.disabled?).to eq false
end
it 'returns false with a functional fdw environment' do
expect(described_class.disabled?).to eq false
end
end
describe '.enabled_for_selective_sync?' do
context 'when the feature flag is enabled' do
before do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::Fdw::LfsObject, :geo, type: :model do
context 'relationships' do
it { is_expected.to have_many(:lfs_objects_projects).class_name('Geo::Fdw::LfsObjectsProject') }
it { is_expected.to have_many(:projects).class_name('Geo::Fdw::Project').through(:lfs_objects_projects) }
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::Fdw::LfsObjectsProject, :geo, type: :model do
context 'relationships' do
it { is_expected.to belong_to(:lfs_object).class_name('Geo::Fdw::LfsObject').inverse_of(:projects) }
it { is_expected.to belong_to(:project).class_name('Geo::Fdw::Project').inverse_of(:lfs_objects) }
end
end
......@@ -3,6 +3,11 @@
require 'spec_helper'
RSpec.describe Geo::Fdw::Project, :geo, type: :model do
context 'relationships' do
it { is_expected.to have_many(:lfs_objects_projects).class_name('Geo::Fdw::LfsObjectsProject') }
it { is_expected.to have_many(:lfs_objects).class_name('Geo::Fdw::LfsObject').through(:lfs_objects_projects) }
end
describe '.search' do
set(:test_project) { create(:project, description: 'kitten mittens') }
set(:project) { described_class.find(test_project.id) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment