Move common constants to the Geo::BaseSchedulerWorker

parent 3e7632cf
...@@ -3,6 +3,11 @@ module Geo ...@@ -3,6 +3,11 @@ module Geo
include Sidekiq::Worker include Sidekiq::Worker
include CronjobQueue include CronjobQueue
DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 10.minutes
MAX_CAPACITY = 10
RUN_TIME = 60.minutes.to_i
def initialize def initialize
@pending_resources = [] @pending_resources = []
@scheduled_jobs = [] @scheduled_jobs = []
...@@ -53,6 +58,22 @@ module Geo ...@@ -53,6 +58,22 @@ module Geo
private private
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def lease_timeout
LEASE_TIMEOUT
end
def max_capacity
MAX_CAPACITY
end
def run_time
RUN_TIME
end
def reload_queue? def reload_queue?
@pending_resources.size < max_capacity @pending_resources.size < max_capacity
end end
......
class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
LEASE_KEY = 'geo_file_download_dispatch_worker'.freeze LEASE_KEY = 'geo_file_download_dispatch_worker'.freeze
LEASE_TIMEOUT = 10.minutes
DB_RETRIEVE_BATCH_SIZE = 1000
MAX_CAPACITY = 10
RUN_TIME = 60.minutes.to_i
private private
...@@ -11,24 +7,8 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker ...@@ -11,24 +7,8 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
LEASE_KEY LEASE_KEY
end end
def lease_timeout
LEASE_TIMEOUT
end
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def max_capacity
MAX_CAPACITY
end
def run_time
RUN_TIME
end
def schedule_jobs def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_resources.size].min num_to_schedule = [max_capacity - scheduled_job_ids.size, @pending_resources.size].min
return unless resources_remain? return unless resources_remain?
...@@ -43,8 +23,8 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker ...@@ -43,8 +23,8 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
end end
def load_pending_resources def load_pending_resources
lfs_object_ids = find_lfs_object_ids(DB_RETRIEVE_BATCH_SIZE) lfs_object_ids = find_lfs_object_ids(db_retrieve_batch_size)
objects_ids = find_object_ids(DB_RETRIEVE_BATCH_SIZE) objects_ids = find_object_ids(db_retrieve_batch_size)
@pending_resources = interleave(lfs_object_ids, objects_ids) @pending_resources = interleave(lfs_object_ids, objects_ids)
end end
......
class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker
LEASE_KEY = 'geo_repository_sync_worker'.freeze LEASE_KEY = 'geo_repository_sync_worker'.freeze
LEASE_TIMEOUT = 10.minutes
DB_RETRIEVE_BATCH_SIZE = 1000
MAX_CAPACITY = 25
RUN_TIME = 60.minutes.to_i
BACKOFF_DELAY = 5.minutes BACKOFF_DELAY = 5.minutes
MAX_CAPACITY = 25
private private
...@@ -13,24 +9,12 @@ class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker ...@@ -13,24 +9,12 @@ class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker
LEASE_KEY LEASE_KEY
end end
def lease_timeout
LEASE_TIMEOUT
end
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def max_capacity def max_capacity
MAX_CAPACITY MAX_CAPACITY
end end
def run_time
RUN_TIME
end
def schedule_jobs def schedule_jobs
num_to_schedule = [MAX_CAPACITY - scheduled_job_ids.size, @pending_resources.size].min num_to_schedule = [max_capacity - scheduled_job_ids.size, @pending_resources.size].min
return unless resources_remain? return unless resources_remain?
...@@ -52,14 +36,14 @@ class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker ...@@ -52,14 +36,14 @@ class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker
def find_project_ids_not_synced def find_project_ids_not_synced
Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id)) Project.where.not(id: Geo::ProjectRegistry.synced.pluck(:project_id))
.order(last_repository_updated_at: :desc) .order(last_repository_updated_at: :desc)
.limit(DB_RETRIEVE_BATCH_SIZE) .limit(db_retrieve_batch_size)
.pluck(:id) .pluck(:id)
end end
def find_project_ids_updated_recently def find_project_ids_updated_recently
Geo::ProjectRegistry.dirty Geo::ProjectRegistry.dirty
.order(Gitlab::Database.nulls_first_order(:last_repository_synced_at, :desc)) .order(Gitlab::Database.nulls_first_order(:last_repository_synced_at, :desc))
.limit(DB_RETRIEVE_BATCH_SIZE) .limit(db_retrieve_batch_size)
.pluck(:project_id) .pluck(:project_id)
end end
end end
...@@ -50,8 +50,8 @@ describe GeoFileDownloadDispatchWorker do ...@@ -50,8 +50,8 @@ describe GeoFileDownloadDispatchWorker do
# 1. A total of 8 files in the queue, and we can load a maximimum of 5 and send 2 at a time. # 1. A total of 8 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again. # 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do it 'attempts to load a new batch without pending downloads' do
stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH_SIZE', 5) stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
stub_const('GeoFileDownloadDispatchWorker::MAX_CAPACITY', 2) stub_const('Geo::BaseSchedulerWorker::MAX_CAPACITY', 2)
avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png')) avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png'))
create_list(:lfs_object, 2, :with_file) create_list(:lfs_object, 2, :with_file)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment