Geo - Extract a base scheduler worker

parent 4a5bc019
module Geo
class BaseSchedulerWorker
include Sidekiq::Worker
include CronjobQueue
def initialize
@pending_resources = []
@scheduled_jobs = []
end
# The scheduling works as the following:
#
# 1. Load a batch of IDs that we need to schedule (DB_RETRIEVE_BATCH_SIZE) into a pending list.
# 2. Schedule them so that at most MAX_CAPACITY are running at once.
# 3. When a slot frees, schedule another job.
# 4. When we have drained the pending list, load another batch into memory, and schedule the
# remaining jobs, excluding ones in progress.
# 5. Quit when we have scheduled all jobs or exceeded MAX_RUNTIME.
def perform
return unless Gitlab::Geo.secondary_role_enabled?
return unless Gitlab::Geo.secondary?
logger.info "Started #{self.class.name}"
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule projects synchronization
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
load_pending_resources if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless resources_remain?
schedule_jobs
break if last_batch
break unless renew_lease!
sleep(1)
end
logger.info "Finished #{self.class.name}"
end
end
private
def reload_queue?
@pending_resources.size < max_capacity
end
def resources_remain?
@pending_resources.size > 0
end
def over_time?
Time.now - @start_time >= run_time
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).uniq.compact.take(db_retrieve_batch_size)
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end
def scheduled_job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
logger.info " #{self.class.name}: Cannot obtain an exclusive lease. There must be another worker already in execution."
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(lease_key, timeout: lease_timeout)
end
def renew_lease!
exclusive_lease.renew
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
end
end
class GeoFileDownloadDispatchWorker
include Sidekiq::Worker
include CronjobQueue
class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
LEASE_KEY = 'geo_file_download_dispatch_worker'.freeze
LEASE_TIMEOUT = 8.hours.freeze
RUN_TIME = 60.minutes.to_i.freeze
DB_RETRIEVE_BATCH = 1000.freeze
MAX_CONCURRENT_DOWNLOADS = 10.freeze
def initialize
@pending_downloads = []
@scheduled_jobs = []
end
# The scheduling works as the following:
#
# 1. Load a batch of IDs that we need to download from the primary (DB_RETRIEVE_BATCH) into a pending list.
# 2. Schedule them so that at most MAX_CONCURRENT_DOWNLOADS are running at once.
# 3. When a slot frees, schedule another download.
# 4. When we have drained the pending list, load another batch into memory, and schedule the remaining
# files, excluding ones in progress.
# 5. Quit when we have scheduled all downloads or exceeded an hour.
def perform
return unless Gitlab::Geo.secondary_role_enabled?
return unless Gitlab::Geo.secondary?
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule downloads
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
load_pending_downloads if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless downloads_remain?
schedule_downloads
break if last_batch
sleep(1)
end
end
end
LEASE_TIMEOUT = 10.minutes
DB_RETRIEVE_BATCH_SIZE = 1000
MAX_CAPACITY = 10
RUN_TIME = 60.minutes.to_i
private
def reload_queue?
@pending_downloads.size < MAX_CONCURRENT_DOWNLOADS
def lease_key
LEASE_KEY
end
def over_time?
Time.now - @start_time >= RUN_TIME
def lease_timeout
LEASE_TIMEOUT
end
def load_pending_downloads
lfs_object_ids = find_lfs_object_ids(DB_RETRIEVE_BATCH)
objects_ids = find_object_ids(DB_RETRIEVE_BATCH)
@pending_downloads = interleave(lfs_object_ids, objects_ids)
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).compact.take(DB_RETRIEVE_BATCH)
def max_capacity
MAX_CAPACITY
end
def downloads_remain?
@pending_downloads.size
def run_time
RUN_TIME
end
def schedule_downloads
num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_downloads.size].min
def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_resources.size].min
return unless downloads_remain?
return unless resources_remain?
num_to_schedule.times do
object_db_id, object_type = @pending_downloads.shift
object_db_id, object_type = @pending_resources.shift
job_id = GeoFileDownloadWorker.perform_async(object_type, object_db_id)
if job_id
......@@ -95,6 +42,13 @@ class GeoFileDownloadDispatchWorker
end
end
def load_pending_resources
lfs_object_ids = find_lfs_object_ids(DB_RETRIEVE_BATCH_SIZE)
objects_ids = find_object_ids(DB_RETRIEVE_BATCH_SIZE)
@pending_resources = interleave(lfs_object_ids, objects_ids)
end
def find_object_ids(limit)
downloaded_ids = find_downloaded_ids([:attachment, :avatar, :file])
......@@ -117,47 +71,10 @@ class GeoFileDownloadDispatchWorker
def find_downloaded_ids(file_types)
downloaded_ids = Geo::FileRegistry.where(file_type: file_types).pluck(:file_id)
(downloaded_ids + scheduled_ids(file_types)).uniq
(downloaded_ids + scheduled_file_ids(file_types)).uniq
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(job_ids)
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end
def job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def scheduled_ids(types)
def scheduled_file_ids(types)
@scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
end
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain
return unless uuid
yield
release_lease(uuid)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
end
class GeoRepositorySyncWorker
include Sidekiq::Worker
include CronjobQueue
class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker
LEASE_KEY = 'geo_repository_sync_worker'.freeze
LEASE_TIMEOUT = 10.minutes
BATCH_SIZE = 1000
BACKOFF_DELAY = 5.minutes
DB_RETRIEVE_BATCH_SIZE = 1000
MAX_CAPACITY = 25
RUN_TIME = 60.minutes.to_i
def initialize
@pending_projects = []
@scheduled_jobs = []
end
def perform
return unless Gitlab::Geo.secondary_role_enabled?
return unless Gitlab::Geo.primary_node.present?
logger.info "Started Geo repository sync scheduler"
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule projects synchronization
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
load_pending_projects if reload_queue?
BACKOFF_DELAY = 5.minutes
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
last_batch = reload_queue?
private
break if over_time?
break unless projects_remain?
def lease_key
LEASE_KEY
end
schedule_jobs
def lease_timeout
LEASE_TIMEOUT
end
break if last_batch
break unless renew_lease!
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
sleep(1)
end
def max_capacity
MAX_CAPACITY
end
logger.info "Finished Geo repository sync scheduler"
end
def run_time
RUN_TIME
end
private
def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_resources.size].min
def reload_queue?
@pending_projects.size < MAX_CAPACITY
end
return unless resources_remain?
def projects_remain?
@pending_projects.size > 0
end
num_to_schedule.times do
project_id = @pending_resources.shift
job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
def over_time?
Time.now - @start_time >= RUN_TIME
@scheduled_jobs << { id: project_id, job_id: job_id } if job_id
end
end
def load_pending_projects
def load_pending_resources
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
@pending_projects = interleave(project_ids_not_synced, project_ids_updated_recently)
@pending_resources = interleave(project_ids_not_synced, project_ids_updated_recently)
end
def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc)
.limit(BATCH_SIZE)
.limit(DB_RETRIEVE_BATCH_SIZE)
.pluck(:id)
end
def find_project_ids_updated_recently
Geo::ProjectRegistry.dirty
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at, :desc))
.limit(BATCH_SIZE)
.limit(DB_RETRIEVE_BATCH_SIZE)
.pluck(:project_id)
end
def interleave(first, second)
if first.length >= second.length
first.zip(second)
else
second.zip(first).map(&:reverse)
end.flatten(1).uniq.compact.take(BATCH_SIZE)
end
def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_projects.size].min
return unless projects_remain?
num_to_schedule.times do
project_id = @pending_projects.shift
job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
@scheduled_jobs << { id: project_id, job_id: job_id } if job_id
end
end
def scheduled_job_ids
@scheduled_jobs.map { |data| data[:job_id] }
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job has completed, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, completed)| job if completed }.compact
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
logger.info "Cannot obtain an exclusive lease. There must be another worker already in execution."
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
def renew_lease!
exclusive_lease.renew
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
end
......@@ -7,6 +7,8 @@ describe GeoFileDownloadDispatchWorker do
allow(Gitlab::Geo).to receive(:secondary?).and_return(true)
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:renew).and_return(true)
WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
end
......@@ -48,8 +50,8 @@ describe GeoFileDownloadDispatchWorker do
# 1. A total of 8 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do
stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5)
stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2)
stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH_SIZE', 5)
stub_const('GeoFileDownloadDispatchWorker::MAX_CAPACITY', 2)
avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png'))
create_list(:lfs_object, 2, :with_file)
......@@ -65,7 +67,7 @@ describe GeoFileDownloadDispatchWorker do
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 4.
# 3. Since the second reload filled the pipe with 4, we need to do a final reload to ensure
# zero are left.
expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original
expect(subject).to receive(:load_pending_resources).exactly(3).times.and_call_original
Sidekiq::Testing.inline! do
subject.perform
......
......@@ -11,6 +11,7 @@ describe GeoRepositorySyncWorker do
describe '#perform' do
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
end
it 'performs Geo::ProjectSyncWorker for each project' do
......@@ -46,8 +47,8 @@ describe GeoRepositorySyncWorker do
subject.perform
end
it 'does not perform Geo::ProjectSyncWorker when primary node does not exists' do
allow(Gitlab::Geo).to receive(:primary_node) { nil }
it 'does not perform Geo::ProjectSyncWorker when not running on a secondary' do
allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_in)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment