Commit e417781f authored by Douwe Maan's avatar Douwe Maan

Queue mirror update jobs in batches and wait for queue to drain

parent 815ebc83
# frozen_string_literal: true
require 'sidekiq/api'
Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
......@@ -44,6 +46,10 @@ module ApplicationWorker
get_sidekiq_options['queue'].to_s
end
def queue_size
Sidekiq::Queue.new(queue).size
end
def bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
......
......@@ -7,7 +7,7 @@ class UpdateAllMirrorsWorker
LEASE_TIMEOUT = 5.minutes
SCHEDULE_WAIT_TIMEOUT = 4.minutes
LEASE_KEY = 'update_all_mirrors'.freeze
RESCHEDULE_WAIT = 10.seconds
RESCHEDULE_WAIT = 1.second
def perform
return if Gitlab::Database.read_only?
......@@ -37,7 +37,6 @@ class UpdateAllMirrorsWorker
# can't end up in an infinite loop
now = Time.now
last = nil
all_project_ids = []
while capacity > 0
batch_size = [capacity * 2, 500].min
......@@ -47,7 +46,7 @@ class UpdateAllMirrorsWorker
project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force
capacity -= project_ids.length
all_project_ids.concat(project_ids)
ProjectImportScheduleWorker.bulk_perform_async(project_ids.map { |id| [id] })
# If fewer than `batch_size` projects were returned, we don't need to query again
break if projects.length < batch_size
......@@ -55,7 +54,9 @@ class UpdateAllMirrorsWorker
last = projects.last.import_state.next_execution_timestamp
end
ProjectImportScheduleWorker.bulk_perform_and_wait(all_project_ids.map { |id| [id] }, timeout: SCHEDULE_WAIT_TIMEOUT.to_i)
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.now + SCHEDULE_WAIT_TIMEOUT
sleep 1 while ProjectImportScheduleWorker.queue_size > 0 && Time.now < deadline
end
# rubocop: enable CodeReuse/ActiveRecord
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment