Commit 6fa0415a authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch '10586-geo-object-storage-replication' into 'master'

Geo: Refactor datasources to allow for replication of content in Object Storage

See merge request gitlab-org/gitlab-ee!13997
parents 428c5947 7d1f8ba4
......@@ -2,6 +2,8 @@
module Geo
class AttachmentRegistryFinder < FileRegistryFinder
# Counts all existing registries independent
# of any change on filters / selective sync
def count_registry
Geo::FileRegistry.attachments.count
end
......@@ -11,27 +13,25 @@ module Geo
end
def count_synced
registries_for_attachments.syncable.merge(Geo::FileRegistry.synced).count
registries_for_attachments.merge(Geo::FileRegistry.synced).count
end
def count_failed
registries_for_attachments.syncable.merge(Geo::FileRegistry.failed).count
registries_for_attachments.merge(Geo::FileRegistry.failed).count
end
def count_synced_missing_on_primary
registries_for_attachments
.syncable
.merge(Geo::FileRegistry.synced)
.merge(Geo::FileRegistry.missing_on_primary)
.count
end
def syncable
if selective_sync?
attachments.syncable
else
Upload.syncable
end
return attachments if selective_sync?
return Upload.with_files_stored_locally if local_storage_only?
Upload
end
# Find limited amount of non replicated attachments.
......@@ -48,7 +48,6 @@ module Geo
def find_unsynced(batch_size:, except_file_ids: [])
attachments
.missing_file_registry
.syncable
.id_not_in(except_file_ids)
.limit(batch_size)
end
......@@ -56,7 +55,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_file_ids: [])
attachments
all_attachments
.inner_join_file_registry
.with_files_stored_remotely
.merge(Geo::FileRegistry.attachments)
......@@ -91,6 +90,10 @@ module Geo
private
def attachments
local_storage_only? ? all_attachments.with_files_stored_locally : all_attachments
end
def all_attachments
current_node.attachments
end
......
......@@ -77,5 +77,9 @@ module Geo
def find_retryable_synced_missing_on_primary_registries
raise NotImplementedError
end
def local_storage_only?
!current_node&.sync_object_storage
end
end
end
......@@ -2,6 +2,8 @@
module Geo
class JobArtifactRegistryFinder < FileRegistryFinder
# Counts all existing registries independent
# of any change on filters / selective sync
def count_registry
Geo::JobArtifactRegistry.count
end
......@@ -23,11 +25,10 @@ module Geo
end
def syncable
if selective_sync?
job_artifacts.syncable
else
Ci::JobArtifact.syncable
end
return job_artifacts.not_expired if selective_sync?
return Ci::JobArtifact.not_expired.with_files_stored_locally if local_storage_only?
Ci::JobArtifact.not_expired
end
# Find limited amount of non replicated job artifacts.
......@@ -43,7 +44,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_artifact_ids: [])
job_artifacts
.syncable
.not_expired
.missing_job_artifact_registry
.id_not_in(except_artifact_ids)
.limit(batch_size)
......@@ -52,7 +53,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_artifact_ids: [])
job_artifacts
all_job_artifacts
.inner_join_job_artifact_registry
.with_files_stored_remotely
.id_not_in(except_artifact_ids)
......@@ -84,13 +85,17 @@ module Geo
private
def job_artifacts
local_storage_only? ? all_job_artifacts.with_files_stored_locally : all_job_artifacts
end
def all_job_artifacts
current_node.job_artifacts
end
def registries_for_job_artifacts
job_artifacts
.inner_join_job_artifact_registry
.syncable
.not_expired
end
end
end
......@@ -2,6 +2,8 @@
module Geo
class LfsObjectRegistryFinder < FileRegistryFinder
# Counts all existing registries independent
# of any change on filters / selective sync
def count_registry
Geo::FileRegistry.lfs_objects.count
end
......@@ -23,11 +25,10 @@ module Geo
end
def syncable
if selective_sync?
current_node.lfs_objects.syncable
else
LfsObject.syncable
end
return lfs_objects if selective_sync?
return LfsObject.with_files_stored_locally if local_storage_only?
LfsObject
end
# Find limited amount of non replicated lfs objects.
......@@ -39,9 +40,7 @@ module Geo
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query
# rubocop:disable CodeReuse/ActiveRecord
def find_unsynced(batch_size:, except_file_ids: [])
current_node
.lfs_objects
.syncable
lfs_objects
.missing_file_registry
.id_not_in(except_file_ids)
.limit(batch_size)
......@@ -50,7 +49,7 @@ module Geo
# rubocop:disable CodeReuse/ActiveRecord
def find_migrated_local(batch_size:, except_file_ids: [])
lfs_objects
all_lfs_objects
.inner_join_file_registry
.with_files_stored_remotely
.id_not_in(except_file_ids)
......@@ -82,6 +81,10 @@ module Geo
private
def lfs_objects
local_storage_only? ? all_lfs_objects.with_files_stored_locally : all_lfs_objects
end
def all_lfs_objects
current_node.lfs_objects
end
......
......@@ -128,13 +128,11 @@ module Geo::SelectiveSync
end
def attachments_for_model_type_with_id_in(model_type, model_ids)
# This query was intentionally converted to a raw one to get it work in Rails 5.0.
# In Rails 5.0 and 5.1 there's a bug: https://github.com/rails/arel/issues/531
# Please convert it back when on rails 5.2 as it works again as expected since 5.2.
column_name = "#{uploads_table.name}.#{uploads_table[:model_id].name}"
raw_sql = Arel::Nodes::SqlLiteral.new("#{column_name} IN (#{model_ids.to_sql})")
uploads_table[:model_type].eq(model_type).and(raw_sql)
uploads_table[:model_type]
.eq(model_type)
.and(
uploads_table[:model_id].in(model_ids.arel)
)
end
# This concern doesn't define a geo_node_namespace_links relation. That's
......@@ -165,8 +163,7 @@ module Geo::SelectiveSync
end
def projects_table
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
project_model.arel_table
end
def uploads_model
......@@ -175,7 +172,6 @@ module Geo::SelectiveSync
end
def uploads_table
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
uploads_model.arel_table
end
end
......@@ -20,7 +20,6 @@ module EE
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :project_id_in, ->(ids) { joins(:project).merge(::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally.not_expired }
scope :with_files_stored_remotely, -> { where(file_store: ::JobArtifactUploader::Store::REMOTE) }
scope :security_reports, -> do
......
......@@ -16,7 +16,6 @@ module EE
after_destroy :log_geo_deleted_event
scope :project_id_in, ->(ids) { joins(:projects).merge(::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally }
end
def log_geo_deleted_event
......
......@@ -14,7 +14,6 @@ module Geo
scope :not_expired, -> { where('expire_at IS NULL OR expire_at > ?', Time.current) }
scope :project_id_in, ->(ids) { joins(:project).merge(Geo::Fdw::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally.not_expired }
class << self
def inner_join_job_artifact_registry
......
......@@ -100,17 +100,9 @@ module Geo
Geo::Fdw::Project
end
def projects_table
Geo::Fdw::Project.arel_table
end
def uploads_model
Geo::Fdw::Upload
end
def uploads_table
Geo::Fdw::Upload.arel_table
end
end
end
end
......@@ -14,12 +14,10 @@ module Geo
has_many :projects, class_name: 'Geo::Fdw::Project', through: :lfs_objects_projects
scope :project_id_in, ->(ids) { joins(:projects).merge(Geo::Fdw::Project.id_in(ids)) }
scope :syncable, -> { with_files_stored_locally }
class << self
def failed
inner_join_file_registry
.syncable
.merge(Geo::FileRegistry.failed)
end
......@@ -45,7 +43,6 @@ module Geo
def synced
inner_join_file_registry
.syncable
.merge(Geo::FileRegistry.synced)
end
......
......@@ -11,7 +11,7 @@ module Geo
self.primary_key = :id
self.table_name = Gitlab::Geo::Fdw.foreign_table_name('uploads')
scope :syncable, -> { with_files_stored_locally }
has_one :registry, class_name: 'Geo::FileRegistry', foreign_key: :file_id
class << self
def for_model(model)
......@@ -21,12 +21,7 @@ module Geo
end
def inner_join_file_registry
join_statement =
arel_table
.join(file_registry_table, Arel::Nodes::InnerJoin)
.on(arel_table[:id].eq(file_registry_table[:file_id]))
joins(join_statement.join_sources)
joins(:registry)
end
def missing_file_registry
......
......@@ -381,15 +381,7 @@ class GeoNode < ApplicationRecord
Project
end
def projects_table
Project.arel_table
end
def uploads_model
Upload
end
def uploads_table
Upload.arel_table
end
end
......@@ -279,9 +279,9 @@ class GeoNodeStatus < ApplicationRecord
def load_primary_data
return unless Gitlab::Geo.primary?
self.lfs_objects_count = LfsObject.syncable.count
self.job_artifacts_count = Ci::JobArtifact.syncable.count
self.attachments_count = Upload.syncable.count
self.lfs_objects_count = LfsObject.count
self.job_artifacts_count = Ci::JobArtifact.not_expired.count
self.attachments_count = Upload.count
self.replication_slots_count = geo_node.replication_slots_count
self.replication_slots_used_count = geo_node.replication_slots_used_count
......
---
title: ' Geo: Refactor data-sources to allow for replication of content in Object Storage'
merge_request: 13997
author:
type: changed
......@@ -16,5 +16,9 @@ FactoryBot.define do
minimum_reverification_interval 7
sync_object_storage false
end
trait :local_storage_only do
sync_object_storage false
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::FileRegistryFinder, :geo, :geo_fdw do
include ::EE::GeoHelpers
context 'with abstract methods' do
%w[
syncable
count_syncable
count_synced
count_failed
count_synced_missing_on_primary
count_registry
find_unsynced
find_migrated_local
find_retryable_failed_registries
find_retryable_synced_missing_on_primary_registries
].each do |required_method|
it "requires subclasses to implement #{required_method}" do
expect { subject.send(required_method) }.to raise_error(NotImplementedError)
end
end
end
context '#local_storage_only?' do
subject { described_class.new(current_node_id: geo_node.id) }
context 'sync_object_storage is enabled' do
let(:geo_node) { create(:geo_node, sync_object_storage: true) }
it 'returns false' do
expect(subject.local_storage_only?).to be_falsey
end
end
context 'sync_object_storage is disabled' do
let(:geo_node) { create(:geo_node, sync_object_storage: false) }
it 'returns true' do
expect(subject.local_storage_only?).to be_truthy
end
end
end
end
......@@ -42,4 +42,10 @@ describe Gitlab::Geo::FileDownloader, :geo do
stub_request(:get, url).to_return(status: 200, body: upload.build_uploader.file.read, headers: {})
end
def stub_geo_file_transfer_object_storage
url = primary_node.geo_transfers_url(file_type, upload.id.to_s)
stub_request(:get, url).to_return(status: 307, body: upload.build_uploader.url, headers: {})
end
end
......@@ -147,18 +147,10 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
context 'with LFS objects' do
let!(:lfs_object_local_store) { create(:lfs_object, :with_file) }
let!(:lfs_object_remote_store) { create(:lfs_object, :with_file) }
let!(:lfs_object_remote_store) { create(:lfs_object, :with_file, :object_storage) }
before do
stub_lfs_object_storage
lfs_object_remote_store.file.migrate!(LfsObjectUploader::Store::REMOTE)
end
it 'filters S3-backed files' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
context 'with files missing on the primary that are marked as synced' do
......@@ -171,6 +163,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
it 'retries the files if there is spare capacity' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_file_missing_on_primary.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
......@@ -178,7 +171,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
it 'does not retry those files if there is no spare capacity' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).once
subject.perform
end
......@@ -186,7 +179,9 @@ describe Geo::FileDownloadDispatchWorker, :geo, :geo_fdw do
it 'does not retry those files if they are already scheduled' do
scheduled_jobs = [{ type: 'lfs', id: lfs_object_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', lfs_object_remote_store.id)
subject.perform
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment