Commit 3f010a30 authored by Valery Sizov's avatar Valery Sizov

Merge branch '213872-geo-implement-backfill-of-packagefiles-blobs-2' into 'master'

Geo: Implement backfill of PackageFiles/Blobs

See merge request gitlab-org/gitlab!31943
parents 6123dff4 e0fe59db
......@@ -469,6 +469,11 @@ production: &base
geo_file_download_dispatch_worker:
cron: "*/1 * * * *"
# GitLab Geo registry sync worker (for backfilling)
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_registry_sync_worker:
cron: "*/1 * * * *"
# GitLab Geo migrated local files clean up worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_migrated_local_files_clean_up_worker:
......
......@@ -513,6 +513,9 @@ Gitlab.ee do
Settings.cron_jobs['geo_file_download_dispatch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_file_download_dispatch_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::FileDownloadDispatchWorker'
Settings.cron_jobs['geo_registry_sync_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_registry_sync_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_registry_sync_worker']['job_class'] ||= 'Geo::RegistrySyncWorker'
Settings.cron_jobs['geo_metrics_update_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_metrics_update_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_metrics_update_worker']['job_class'] ||= 'Geo::MetricsUpdateWorker'
......
......@@ -34,4 +34,21 @@ class Geo::BaseRegistry < Geo::TrackingBase
# Deletes registry rows for the given model record IDs.
#
# @abstract Subclasses must implement this.
# @param ids [Array<Integer>] model record IDs whose registry rows to delete
# @raise [NotImplementedError] always, unless overridden by a subclass
def self.delete_for_model_ids(ids)
  # Inside a class method `self` is already the class; `self.class` is
  # `Class`, which produced the misleading message
  # "Class does not implement delete_for_model_ids".
  raise NotImplementedError, "#{self} does not implement #{__method__}"
end
# Fetches a batch of registries still awaiting sync (the `pending` state).
#
# @param batch_size [Integer] maximum number of rows to return
# @param except_ids [Array<Integer>] model record IDs to exclude
# @return [ActiveRecord::Relation] pending registries, capped at batch_size
def self.find_unsynced_registries(batch_size:, except_ids: [])
  scope = pending.model_id_not_in(except_ids)
  scope.limit(batch_size)
end
# Fetches a batch of failed registries that are due for a retry
# (retry_at is nil or in the past, per the `retry_due` scope).
#
# @param batch_size [Integer] maximum number of rows to return
# @param except_ids [Array<Integer>] model record IDs to exclude
# @return [ActiveRecord::Relation] retryable failed registries, capped at batch_size
def self.find_failed_registries(batch_size:, except_ids: [])
  scope = failed.retry_due
  scope.model_id_not_in(except_ids).limit(batch_size)
end
# ID of the replicable model row this registry row tracks, read from the
# foreign-key column named by the subclass's MODEL_FOREIGN_KEY constant
# (e.g. :package_file_id on Geo::PackageFileRegistry).
def model_record_id
  foreign_key = self.class::MODEL_FOREIGN_KEY
  read_attribute(foreign_key)
end
end
......@@ -4,6 +4,8 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
include ::Delay
include ShaAttribute
MODEL_FOREIGN_KEY = :package_file_id
# Policy class used by DeclarativePolicy when authorizing access to
# registry records; registries share a single Geo policy.
def self.declarative_policy_class
'Geo::RegistryPolicy'
end
......@@ -20,6 +22,7 @@ class Geo::PackageFileRegistry < Geo::BaseRegistry
scope :never, -> { where(last_synced_at: nil) }
scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) }
scope :pending, -> { with_state(:pending) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.current))) }
scope :ordered, -> { order(:id) }
......
......@@ -75,6 +75,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:geo_registry_sync
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:geo_repository_sync
:feature_category: :geo_replication
:has_external_dependencies:
......
# frozen_string_literal: true

module Geo
  # Cron worker (secondary Geo nodes only) that schedules replication jobs
  # for unsynced and retryable-failed registries across all self-service
  # framework replicators.
  class RegistrySyncWorker < Geo::Scheduler::Secondary::SchedulerWorker
    # This worker does not perform work scoped to a context
    include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

    idempotent!

    private

    # We use inexpensive queries now so we don't need a backoff time
    #
    # Overrides Geo::Scheduler::SchedulerWorker#should_apply_backoff?
    def should_apply_backoff?
      false
    end

    # Transition-period-solution: take one quarter of the node's
    # files_max_capacity (but at least 1), since the legacy file download
    # workers share the same setting.
    # Explained in https://gitlab.com/gitlab-org/gitlab/-/issues/213872#note_336828581
    def max_capacity
      [current_node.files_max_capacity / 4, 1].max
    end

    # Enqueues one event job and returns the metadata hash used to track it
    # in scheduled_jobs, or nil when enqueueing did not yield a job ID.
    def schedule_job(replicable_name, model_record_id)
      job_id = ::Geo::EventWorker.perform_async(replicable_name, :created, model_record_id: model_record_id)

      { model_record_id: model_record_id, replicable_name: replicable_name, job_id: job_id } if job_id
    end

    # Polls for new resources to be transferred
    #
    # @return [Array] resources to be transferred
    def load_pending_resources
      resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size)
      remaining_capacity = db_retrieve_batch_size - resources.count

      if remaining_capacity.zero?
        resources
      else
        resources + find_low_priority_jobs(batch_size: remaining_capacity)
      end
    end

    # Get a batch of unsynced resources, taking equal parts from each resource.
    #
    # @return [Array] job arguments of unsynced resources
    def find_unsynced_jobs(batch_size:)
      find_jobs_batch(batch_size: batch_size) do |replicator_class, except_ids|
        replicator_class.find_unsynced_registries(batch_size: batch_size, except_ids: except_ids)
      end
    end

    # Get a batch of failed and synced-but-missing-on-primary resources, taking
    # equal parts from each resource.
    #
    # @return [Array] job arguments of low priority resources
    def find_low_priority_jobs(batch_size:)
      find_jobs_batch(batch_size: batch_size) do |replicator_class, except_ids|
        replicator_class.find_failed_registries(batch_size: batch_size, except_ids: except_ids)
      end
    end

    # Shared scaffolding for the two finders above: for every replicator
    # class, yields (replicator_class, already-scheduled IDs) to fetch
    # registries, maps each registry to its job arguments, then interleaves
    # the per-replicator lists into one batch via take_batch.
    def find_jobs_batch(batch_size:)
      jobs = replicator_classes.map do |replicator_class|
        except_ids = scheduled_replicable_ids(replicator_class.replicable_name)

        yield(replicator_class, except_ids)
          .map { |registry| [replicator_class.replicable_name, registry.model_record_id] }
      end

      take_batch(*jobs, batch_size: batch_size)
    end

    # Model record IDs already in-flight for the given replicable type;
    # used to avoid scheduling duplicate jobs.
    def scheduled_replicable_ids(replicable_name)
      scheduled_jobs.select { |data| data[:replicable_name] == replicable_name }.map { |data| data[:model_record_id] }
    end

    def replicator_classes
      Gitlab::Geo::ReplicableModel.replicators
    end
  end
end
......@@ -15,6 +15,7 @@ module Gitlab
SECONDARY_JOBS = %w[
geo_file_download_dispatch_worker
geo_registry_sync_worker
geo_migrated_local_files_clean_up_worker
geo_repository_sync_worker
geo_container_repository_sync_worker
......
......@@ -24,6 +24,8 @@ module Gitlab
def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
Gitlab::Geo::ReplicableModel.add_replicator(klass)
class_eval <<-RUBY, __FILE__, __LINE__ + 1
define_method :replicator do
@_replicator ||= klass.new(model_record: self)
......@@ -32,6 +34,16 @@ module Gitlab
end
end
# Registers a replicator class so it can be enumerated later via .replicators.
def self.add_replicator(klass)
  (@_replicators ||= []) << klass
end
# Registered replicator classes whose constants are currently defined.
# NOTE(review): the const_defined? filter presumably guards against classes
# registered but no longer loaded in this context — confirm with callers.
def self.replicators
  @_replicators ||= []
  @_replicators.select { |replicator| const_defined?(replicator.to_s) }
end
# Geo Replicator
#
# @abstract
......
......@@ -17,8 +17,13 @@ module Gitlab
CLASS_SUFFIXES = %w(RegistryFinder RegistriesResolver).freeze
attr_reader :model_record_id
delegate :model, to: :class
# The registry finder entry points live on the registry model; expose them
# on the replicator class so schedulers can call them without knowing the
# registry class.
class << self
delegate :find_unsynced_registries, :find_failed_registries, to: :registry_class
end
# Declare supported event
#
# @example Declaring support for :update and :delete events
......
......@@ -11,6 +11,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
geo_repository_verification_primary_batch_worker
geo_repository_sync_worker
geo_file_download_dispatch_worker
geo_registry_sync_worker
geo_container_repository_sync_worker
geo_repository_verification_secondary_scheduler_worker
geo_metrics_update_worker
......@@ -35,6 +36,7 @@ RSpec.describe Gitlab::Geo::CronManager, :geo do
let(:secondary_jobs) do
[
job('geo_file_download_dispatch_worker'),
job('geo_registry_sync_worker'),
job('geo_repository_sync_worker'),
job('geo_container_repository_sync_worker'),
job('geo_repository_verification_secondary_scheduler_worker'),
......
......@@ -7,4 +7,6 @@ RSpec.describe Geo::PackageFileRegistry, :geo, type: :model do
let(:valid_items_for_bulk_insertion) { build_list(:package_file_registry, 10, created_at: Time.zone.now) }
let(:invalid_items_for_bulk_insertion) { [] } # class does not have any validations defined
end
include_examples 'a Geo framework registry'
end
# frozen_string_literal: true

# Shared examples for registry models built on the Geo self-service
# framework. Expects a factory named after the registry class
# (e.g. :package_file_registry for Geo::PackageFileRegistry) supporting a
# :failed trait.
shared_examples 'a Geo framework registry' do
  # "Geo::PackageFileRegistry" -> :package_file_registry
  # `described_class` is a Class; take `.name` before applying the String
  # inflector `underscore`.
  let(:registry_class_factory) { described_class.name.underscore.tr('/', '_').sub('geo_', '').to_sym }
  let!(:failed_item1) { create(registry_class_factory, :failed) }
  let!(:failed_item2) { create(registry_class_factory, :failed) }
  let!(:unsynced_item1) { create(registry_class_factory) }
  let!(:unsynced_item2) { create(registry_class_factory) }

  describe '.find_unsynced_registries' do
    it 'returns unsynced items' do
      result = described_class.find_unsynced_registries(batch_size: 10)

      expect(result).to include(unsynced_item1, unsynced_item2)
    end

    it 'returns unsynced items except some specific item ID' do
      except_id = unsynced_item1.model_record_id

      result = described_class.find_unsynced_registries(batch_size: 10, except_ids: [except_id])

      expect(result).to include(unsynced_item2)
      expect(result).not_to include(unsynced_item1)
    end
  end

  describe '.find_failed_registries' do
    it 'returns failed items' do
      result = described_class.find_failed_registries(batch_size: 10)

      expect(result).to include(failed_item1, failed_item2)
    end

    it 'returns failed items except some specific item ID' do
      except_id = failed_item1.model_record_id

      result = described_class.find_failed_registries(batch_size: 10, except_ids: [except_id])

      expect(result).to include(failed_item2)
      expect(result).not_to include(failed_item1)
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

describe Geo::RegistrySyncWorker, :geo, :use_sql_query_cache_for_tracking_db do
  include ::EE::GeoHelpers
  include ExclusiveLeaseHelpers

  let(:primary) { create(:geo_node, :primary) }
  let(:secondary) { create(:geo_node) }

  before do
    stub_current_geo_node(secondary)
    stub_exclusive_lease(renew: true)

    allow_next_instance_of(described_class) do |instance|
      allow(instance).to receive(:over_time?).and_return(false)
    end
  end

  it 'does not schedule anything when tracking database is not configured' do
    create(:package_file_registry)

    expect(::Geo::EventWorker).not_to receive(:perform_async)

    with_no_geo_database_configured do
      subject.perform
    end
  end

  it 'does not schedule anything when node is disabled' do
    create(:package_file_registry)

    secondary.enabled = false
    secondary.save!

    expect(::Geo::EventWorker).not_to receive(:perform_async)

    subject.perform
  end

  it 'does not schedule duplicated jobs' do
    first_registry = create(:package_file_registry)
    second_registry = create(:package_file_registry)

    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
    secondary.update!(files_max_capacity: 8)
    allow(Gitlab::SidekiqStatus).to receive(:job_status).with([]).and_return([]).twice
    allow(Gitlab::SidekiqStatus).to receive(:job_status).with(array_including('123', '456')).and_return([true, true], [true, true], [false, false])

    expect(::Geo::EventWorker)
      .to receive(:perform_async)
      .with('package_file', :created, { model_record_id: first_registry.package_file.id })
      .once
      .and_return('123')

    expect(::Geo::EventWorker)
      .to receive(:perform_async)
      .with('package_file', :created, { model_record_id: second_registry.package_file.id })
      .once
      .and_return('456')

    subject.perform
  end

  it 'does not schedule duplicated jobs because of query cache' do
    first_registry = create(:package_file_registry)
    second_registry = create(:package_file_registry)

    # We retrieve all the items in a single batch
    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 2)

    # 8 / 4 = 2 We use one quarter of common files_max_capacity in the Geo::RegistrySyncWorker
    secondary.update!(files_max_capacity: 8)

    expect(Geo::EventWorker).to receive(:perform_async).with('package_file', :created, { model_record_id: first_registry.package_file.id }).once do
      Thread.new do
        # Rails will invalidate the query cache if the update happens in the same thread
        Geo::PackageFileRegistry.update(state: Geo::PackageFileRegistry::STATE_VALUES[:synced])
      end
    end

    expect(Geo::EventWorker).to receive(:perform_async)
      .with('package_file', :created, { model_record_id: second_registry.package_file.id })
      .once

    subject.perform
  end

  # Test the case where we have:
  #
  # 1. A total of 10 files in the queue, and we can load a maximum of 5 and send 2 at a time.
  # 2. We send 2, wait for 1 to finish, and then send again.
  it 'attempts to load a new batch without pending downloads' do
    stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)

    # 8 / 4 = 2 We use one quarter of common files_max_capacity in the Geo::RegistrySyncWorker
    secondary.update!(files_max_capacity: 8)

    result_object = double(
      :result,
      success: true,
      bytes_downloaded: 100,
      primary_missing_file: false,
      reason: '',
      extra_details: {}
    )

    allow_any_instance_of(::Gitlab::Geo::Replication::BlobDownloader).to receive(:execute).and_return(result_object)

    create_list(:package_file_registry, 10)

    expect(::Geo::EventWorker).to receive(:perform_async).exactly(10).times.and_call_original

    # For 10 downloads, we expect four database reloads:
    # 1. Load the first batch of 5.
    # 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the next 5.
    # 3. Those 4 get sent out, and 1 remains.
    # 4. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
    #    zero are left.
    expect(subject).to receive(:load_pending_resources).exactly(4).times.and_call_original

    Sidekiq::Testing.inline! do
      subject.perform
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment