Refactor Geo::BaseSchedulerWorker#perform

parent 151cccb6
......@@ -8,6 +8,8 @@ module Geo
MAX_CAPACITY = 10
RUN_TIME = 60.minutes.to_i
attr_reader :pending_resources, :scheduled_jobs, :start_time
def initialize
@pending_resources = []
@scheduled_jobs = []
......@@ -35,7 +37,7 @@ module Geo
break unless node_enabled?
update_jobs_in_progress
load_pending_resources if reload_queue?
@pending_resources = load_pending_resources if reload_queue?
# If we are still under the limit after refreshing our DB, we can end
# after scheduling the remaining transfers.
......@@ -75,15 +77,15 @@ module Geo
end
def reload_queue?
@pending_resources.size < max_capacity
pending_resources.size < max_capacity
end
def resources_remain?
@pending_resources.size > 0
pending_resources.size > 0
end
def over_time?
Time.now - @start_time >= run_time
Time.now - start_time >= run_time
end
def interleave(first, second)
......@@ -104,7 +106,7 @@ module Geo
end
def scheduled_job_ids
@scheduled_jobs.map { |data| data[:job_id] }
scheduled_jobs.map { |data| data[:job_id] }
end
def try_obtain_lease
......
......@@ -8,12 +8,12 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
end
def schedule_jobs
num_to_schedule = [max_capacity - scheduled_job_ids.size, @pending_resources.size].min
num_to_schedule = [max_capacity - scheduled_job_ids.size, pending_resources.size].min
return unless resources_remain?
num_to_schedule.times do
object_db_id, object_type = @pending_resources.shift
object_db_id, object_type = pending_resources.shift
job_id = GeoFileDownloadWorker.perform_async(object_type, object_db_id)
if job_id
......@@ -26,7 +26,7 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
lfs_object_ids = find_lfs_object_ids(db_retrieve_batch_size)
objects_ids = find_object_ids(db_retrieve_batch_size)
@pending_resources = interleave(lfs_object_ids, objects_ids)
interleave(lfs_object_ids, objects_ids)
end
def find_object_ids(limit)
......@@ -55,6 +55,6 @@ class GeoFileDownloadDispatchWorker < Geo::BaseSchedulerWorker
end
def scheduled_file_ids(types)
@scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
scheduled_jobs.select { |data| types.include?(data[:type]) }.map { |data| data[:id] }
end
end
......@@ -14,15 +14,15 @@ class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker
end
def schedule_jobs
num_to_schedule = [max_capacity - scheduled_job_ids.size, @pending_resources.size].min
num_to_schedule = [max_capacity - scheduled_job_ids.size, pending_resources.size].min
return unless resources_remain?
num_to_schedule.times do
project_id = @pending_resources.shift
project_id = pending_resources.shift
job_id = Geo::ProjectSyncWorker.perform_in(BACKOFF_DELAY, project_id, Time.now)
@scheduled_jobs << { id: project_id, job_id: job_id } if job_id
scheduled_jobs << { id: project_id, job_id: job_id } if job_id
end
end
......@@ -30,7 +30,7 @@ class GeoRepositorySyncWorker < Geo::BaseSchedulerWorker
project_ids_not_synced = find_project_ids_not_synced
project_ids_updated_recently = find_project_ids_updated_recently
@pending_resources = interleave(project_ids_not_synced, project_ids_updated_recently)
interleave(project_ids_not_synced, project_ids_updated_recently)
end
def find_project_ids_not_synced
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment