Commit 595d591d authored by Stan Hu's avatar Stan Hu

Use PostgreSQL FDW for Geo downloads

If the PostgreSQL Foreign Data Wrapper (postgres_fdw) extension is available,
we can optimize cross-database queries by using that. We automatically
detect if it is available and fallback to the non-FDW version.

Part of #3382
parent 0bf399f5
class LfsObject < ActiveRecord::Base
# EE specific modules
prepend EE::LfsObject
has_many :lfs_objects_projects, dependent: :destroy # rubocop:disable Cop/ActiveRecordDependent
has_many :projects, through: :lfs_objects_projects
......
class Upload < ActiveRecord::Base
# EE specific modules
prepend EE::Upload
# Upper limit for foreground checksum processing
CHECKSUM_THRESHOLD = 100.megabytes
......
......@@ -21,9 +21,9 @@ module Geo
def find_unsynced_objects
lfs_object_ids = find_lfs_object_ids
objects_ids = find_object_ids
upload_objects_ids = find_upload_object_ids
interleave(lfs_object_ids, objects_ids)
interleave(lfs_object_ids, upload_objects_ids)
end
def find_failed_objects
......@@ -33,8 +33,53 @@ module Geo
.pluck(:file_id, :file_type)
end
def find_object_ids
unsynced_downloads = filter_registry_ids(
def find_lfs_object_ids
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
if Gitlab::Geo.fdw? && !current_node.restricted_project_ids
fdw_find_lfs_object_ids
else
legacy_find_lfs_object_ids
end
end
def find_upload_object_ids
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
if Gitlab::Geo.fdw? && !current_node.restricted_project_ids
fdw_find_upload_object_ids
else
legacy_find_upload_object_ids
end
end
def fdw_find_lfs_object_ids
fdw_table = "#{Gitlab::Geo.fdw_schema}.lfs_objects"
# Filter out objects in object storage (this is done in GeoNode#lfs_objects)
LfsObject.fdw.joins("LEFT OUTER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id AND file_registry.file_type = 'lfs'")
.where("#{fdw_table}.file_store IS NULL OR #{fdw_table}.file_store = #{LfsObjectUploader::LOCAL_STORE}")
.where('file_registry.file_id IS NULL')
.order(created_at: :desc)
.limit(db_retrieve_batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
end
def fdw_find_upload_object_ids
fdw_table = "#{Gitlab::Geo.fdw_schema}.uploads"
obj_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Upload.fdw.joins("LEFT OUTER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id AND file_registry.file_type IN (#{obj_types})")
.where('file_registry.file_id IS NULL')
.order(created_at: :desc)
.limit(db_retrieve_batch_size)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
def legacy_find_upload_object_ids
unsynced_downloads = legacy_filter_registry_ids(
current_node.uploads,
Geo::FileService::DEFAULT_OBJECT_TYPES,
Upload.table_name
......@@ -46,8 +91,8 @@ module Geo
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
def find_lfs_object_ids
unsynced_downloads = filter_registry_ids(
def legacy_find_lfs_object_ids
unsynced_downloads = legacy_filter_registry_ids(
current_node.lfs_objects,
[:lfs],
LfsObject.table_name
......@@ -63,8 +108,8 @@ module Geo
# plucks a list of file IDs from one into the other. This will not scale
# well with the number of synchronized files--the query will increase
# linearly in size--so this should be replaced with postgres_fdw ASAP.
def filter_registry_ids(objects, file_types, table_name)
registry_ids = pluck_registry_ids(Geo::FileRegistry, file_types)
def legacy_filter_registry_ids(objects, file_types, table_name)
registry_ids = legacy_pluck_registry_ids(Geo::FileRegistry, file_types)
return objects if registry_ids.empty?
......@@ -78,7 +123,7 @@ module Geo
joined_relation.where(file_registry: { registry_present: [nil, false] })
end
def pluck_registry_ids(relation, file_types)
def legacy_pluck_registry_ids(relation, file_types)
ids = relation.where(file_type: file_types).pluck(:file_id)
(ids + scheduled_file_ids(file_types)).uniq
end
......
---
title: Use PostgreSQL FDW for Geo downloads
merge_request:
author:
type: added
module EE
module Geo
module ForeignDataWrapped
extend ActiveSupport::Concern
class_methods do
def fdw
instance = self.clone
instance.table_name = "#{::Gitlab::Geo.fdw_schema}.#{self.table_name}"
def instance.name
table_name
end
instance.establish_connection Rails.configuration.geo_database
instance
end
end
end
end
end
module EE
# LfsObject EE mixin
#
# This module is intended to encapsulate EE-specific model logic
# and be prepended in the `LfsObject` model
module LfsObject
extend ActiveSupport::Concern
prepended do
prepend ::EE::Geo::ForeignDataWrapped
end
end
end
......@@ -10,6 +10,7 @@ module EE
include Elastic::ProjectsSearch
prepend GeoAwareAvatar
prepend ImportStatusStateMachine
prepend ::EE::Geo::ForeignDataWrapped
before_validation :mark_remote_mirrors_for_removal
......
module EE
# Upload EE mixin
#
# This module is intended to encapsulate EE-specific model logic
# and be prepended in the `Upload` model
module Upload
extend ActiveSupport::Concern
prepended do
prepend ::EE::Geo::ForeignDataWrapped
end
end
end
......@@ -17,7 +17,7 @@ module EE
def process_wiki_repository_update
if ::Gitlab::Geo.primary?
# Create wiki repository updated event on Geo event log
::Geo::RepositoryUpdatedEventStore.new(project, source: Geo::RepositoryUpdatedEvent::WIKI).create
::Geo::RepositoryUpdatedEventStore.new(project, source: ::Geo::RepositoryUpdatedEvent::WIKI).create
end
end
end
......
......@@ -22,7 +22,7 @@ module EE
if ::Gitlab::Geo.enabled?
# Create wiki repository updated event on Geo event log
::Geo::RepositoryUpdatedEventStore.new(post_received.project, source: Geo::RepositoryUpdatedEvent::WIKI).create
::Geo::RepositoryUpdatedEventStore.new(post_received.project, source: ::Geo::RepositoryUpdatedEvent::WIKI).create
end
end
......
......@@ -25,8 +25,8 @@ describe WikiPages::CreateService do
end
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
service.execute
end
......
......@@ -18,8 +18,8 @@ describe WikiPages::DestroyService do
end
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: ::Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
service.execute(page)
end
......
......@@ -26,8 +26,8 @@ describe WikiPages::UpdateService do
end
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
service.execute(page)
end
......
......@@ -26,7 +26,7 @@ describe PostReceive do
end
it 'calls Geo::RepositoryUpdatedEventStore' do
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
described_class.new.perform(gl_repository, key_id, base64_changes)
end
......@@ -39,8 +39,8 @@ describe PostReceive do
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
allow(Gitlab::Geo).to receive(:enabled?) { true }
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: ::Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
described_class.new.perform(gl_repository, key_id, base64_changes)
end
......
require 'spec_helper'
describe Geo::FileDownloadDispatchWorker, :geo do
# Disable transactions via :truncate method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
include ::EE::GeoHelpers
set(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
set(:secondary) { create(:geo_node) }
let(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
let(:secondary) { create(:geo_node) }
before do
stub_current_geo_node(secondary)
......@@ -16,7 +18,11 @@ describe Geo::FileDownloadDispatchWorker, :geo do
subject { described_class.new }
describe '#perform' do
shared_examples '#perform' do |skip_tests|
before do
skip if skip_tests
end
it 'does not schedule anything when secondary role is disabled' do
create(:lfs_object, :with_file)
......@@ -25,6 +31,10 @@ describe Geo::FileDownloadDispatchWorker, :geo do
expect(GeoFileDownloadWorker).not_to receive(:perform_async)
subject.perform
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
it 'does not schedule anything when node is disabled' do
......@@ -144,4 +154,17 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
end
end
describe 'when PostgreSQL FDW is available', :geo do
# Skip if FDW isn't activated on this database
it_behaves_like '#perform', !Gitlab::Geo.fdw?
end
describe 'when PostgreSQL FDW is not enabled', :geo do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
it_behaves_like '#perform', false
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment