Commit 7c63e8dc authored by Nick Thomas's avatar Nick Thomas

Merge branch 'sh-rails-fdw-support' into 'master'

Use PostgreSQL FDW for Geo downloads

See merge request gitlab-org/gitlab-ee!3128
parents 6ccdf354 e0622d3f
class Geo::BaseFdw < Geo::TrackingBase
self.abstract_class = true
end
class Geo::BaseRegistry < ActiveRecord::Base
class Geo::BaseRegistry < Geo::TrackingBase
self.abstract_class = true
if Gitlab::Geo.geo_database_configured?
establish_connection Rails.configuration.geo_database
end
def self.connection
raise 'Geo secondary database is not configured' unless Gitlab::Geo.geo_database_configured?
super
end
end
module Geo
module Fdw
class LfsObject < ::Geo::BaseFdw
self.table_name = Gitlab::Geo.fdw_table('lfs_objects')
end
end
end
module Geo
module Fdw
class Upload < ::Geo::BaseFdw
self.table_name = Gitlab::Geo.fdw_table('uploads')
end
end
end
# This module is intended to centralize all database access to the secondary
# tracking database for Geo.
module Geo
class TrackingBase < ActiveRecord::Base
self.abstract_class = true
if ::Gitlab::Geo.geo_database_configured?
establish_connection Rails.configuration.geo_database
end
def self.connection
raise 'Geo secondary database is not configured' unless ::Gitlab::Geo.geo_database_configured?
super
end
end
end
......@@ -21,9 +21,9 @@ module Geo
def find_unsynced_objects
lfs_object_ids = find_lfs_object_ids
objects_ids = find_object_ids
upload_objects_ids = find_upload_object_ids
interleave(lfs_object_ids, objects_ids)
interleave(lfs_object_ids, upload_objects_ids)
end
def find_failed_objects
......@@ -33,38 +33,82 @@ module Geo
.pluck(:file_id, :file_type)
end
def find_object_ids
unsynced_downloads = filter_registry_ids(
current_node.uploads,
Geo::FileService::DEFAULT_OBJECT_TYPES,
Upload.table_name
)
def selective_sync
current_node.restricted_project_ids
end
def find_lfs_object_ids
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_lfs_object_ids
else
legacy_find_lfs_object_ids
end
relation
.limit(db_retrieve_batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
end
unsynced_downloads
def find_upload_object_ids
# Selective project replication adds a wrinkle to FDW queries, so
# we fallback to the legacy version for now.
relation =
if Gitlab::Geo.fdw? && !selective_sync
fdw_find_upload_object_ids
else
legacy_find_upload_object_ids
end
relation
.limit(db_retrieve_batch_size)
.pluck(:id, :uploader)
.map { |id, uploader| [id, uploader.sub(/Uploader\z/, '').underscore] }
end
def find_lfs_object_ids
unsynced_downloads = filter_registry_ids(
def fdw_find_lfs_object_ids
fdw_table = Geo::Fdw::LfsObject.table_name
# Filter out objects in object storage (this is done in GeoNode#lfs_objects)
Geo::Fdw::LfsObject.joins("LEFT OUTER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id AND file_registry.file_type = 'lfs'")
.where("#{fdw_table}.file_store IS NULL OR #{fdw_table}.file_store = #{LfsObjectUploader::LOCAL_STORE}")
.where('file_registry.file_id IS NULL')
end
def fdw_find_upload_object_ids
fdw_table = Geo::Fdw::Upload.table_name
obj_types = Geo::FileService::DEFAULT_OBJECT_TYPES.map { |val| "'#{val}'" }.join(',')
Geo::Fdw::Upload.joins("LEFT OUTER JOIN file_registry ON file_registry.file_id = #{fdw_table}.id AND file_registry.file_type IN (#{obj_types})")
.where('file_registry.file_id IS NULL')
.order(created_at: :desc)
end
def legacy_find_upload_object_ids
legacy_filter_registry_ids(
current_node.uploads,
Geo::FileService::DEFAULT_OBJECT_TYPES,
Upload.table_name
)
end
def legacy_find_lfs_object_ids
legacy_filter_registry_ids(
current_node.lfs_objects,
[:lfs],
LfsObject.table_name
)
unsynced_downloads
.limit(db_retrieve_batch_size)
.pluck(:id)
.map { |id| [id, :lfs] }
end
# This query requires data from two different databases, and unavoidably
# plucks a list of file IDs from one into the other. This will not scale
# well with the number of synchronized files--the query will increase
# linearly in size--so this should be replaced with postgres_fdw ASAP.
def filter_registry_ids(objects, file_types, table_name)
registry_ids = pluck_registry_ids(Geo::FileRegistry, file_types)
def legacy_filter_registry_ids(objects, file_types, table_name)
registry_ids = legacy_pluck_registry_ids(Geo::FileRegistry, file_types)
return objects if registry_ids.empty?
......@@ -78,7 +122,7 @@ module Geo
joined_relation.where(file_registry: { registry_present: [nil, false] })
end
def pluck_registry_ids(relation, file_types)
def legacy_pluck_registry_ids(relation, file_types)
ids = relation.where(file_type: file_types).pluck(:file_id)
(ids + scheduled_file_ids(file_types)).uniq
end
......
---
title: Use PostgreSQL FDW for Geo downloads
merge_request:
author:
type: added
......@@ -17,7 +17,7 @@ module EE
def process_wiki_repository_update
if ::Gitlab::Geo.primary?
# Create wiki repository updated event on Geo event log
::Geo::RepositoryUpdatedEventStore.new(project, source: Geo::RepositoryUpdatedEvent::WIKI).create
::Geo::RepositoryUpdatedEventStore.new(project, source: ::Geo::RepositoryUpdatedEvent::WIKI).create
end
end
end
......
......@@ -22,7 +22,7 @@ module EE
if ::Gitlab::Geo.enabled?
# Create wiki repository updated event on Geo event log
::Geo::RepositoryUpdatedEventStore.new(post_received.project, source: Geo::RepositoryUpdatedEvent::WIKI).create
::Geo::RepositoryUpdatedEventStore.new(post_received.project, source: ::Geo::RepositoryUpdatedEvent::WIKI).create
end
end
......
......@@ -14,6 +14,8 @@ module Gitlab
SECONDARY_JOBS = %i(repository_sync_job file_download_job).freeze
FDW_SCHEMA = 'gitlab_secondary'.freeze
def self.current_node
self.cache_value(:geo_node_current) do
GeoNode.find_by(host: Gitlab.config.gitlab.host,
......@@ -77,13 +79,13 @@ module Gitlab
def self.fdw?
self.cache_value(:geo_fdw?) do
::Geo::BaseRegistry.connection.execute(
"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '#{self.fdw_schema}' AND table_type = 'FOREIGN TABLE'"
"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '#{FDW_SCHEMA}' AND table_type = 'FOREIGN TABLE'"
).first.fetch('count').to_i.positive?
end
end
def self.fdw_schema
'gitlab_secondary'.freeze
def self.fdw_table(table_name)
FDW_SCHEMA + ".#{table_name}"
end
def self.repository_sync_job
......
......@@ -25,8 +25,8 @@ describe WikiPages::CreateService do
end
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
service.execute
end
......
......@@ -18,8 +18,8 @@ describe WikiPages::DestroyService do
end
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: ::Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
service.execute(page)
end
......
......@@ -26,8 +26,8 @@ describe WikiPages::UpdateService do
end
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
service.execute(page)
end
......
......@@ -26,7 +26,7 @@ describe PostReceive do
end
it 'calls Geo::RepositoryUpdatedEventStore' do
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
described_class.new.perform(gl_repository, key_id, base64_changes)
end
......@@ -39,8 +39,8 @@ describe PostReceive do
it 'triggers Geo::RepositoryUpdatedEventStore when Geo is enabled' do
allow(Gitlab::Geo).to receive(:enabled?) { true }
expect(Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(Geo::RepositoryUpdatedEventStore).to receive(:create)
expect(::Geo::RepositoryUpdatedEventStore).to receive(:new).with(instance_of(Project), source: ::Geo::RepositoryUpdatedEvent::WIKI).and_call_original
expect_any_instance_of(::Geo::RepositoryUpdatedEventStore).to receive(:create)
described_class.new.perform(gl_repository, key_id, base64_changes)
end
......
require 'spec_helper'
describe Geo::FileDownloadDispatchWorker, :geo do
# Disable transactions via :truncate method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
include ::EE::GeoHelpers
set(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
set(:secondary) { create(:geo_node) }
let(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
let(:secondary) { create(:geo_node) }
before do
stub_current_geo_node(secondary)
......@@ -16,7 +18,11 @@ describe Geo::FileDownloadDispatchWorker, :geo do
subject { described_class.new }
describe '#perform' do
shared_examples '#perform' do |skip_tests|
before do
skip if skip_tests
end
it 'does not schedule anything when secondary role is disabled' do
create(:lfs_object, :with_file)
......@@ -25,6 +31,10 @@ describe Geo::FileDownloadDispatchWorker, :geo do
expect(GeoFileDownloadWorker).not_to receive(:perform_async)
subject.perform
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
it 'does not schedule anything when node is disabled' do
......@@ -144,4 +154,17 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
end
end
describe 'when PostgreSQL FDW is available', :geo do
# Skip if FDW isn't activated on this database
it_behaves_like '#perform', Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
end
describe 'when PostgreSQL FDW is not enabled', :geo do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
it_behaves_like '#perform', false
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment