Commit 48dcb2b8 authored by Bob Van Landuyt's avatar Bob Van Landuyt

Merge branch 'dm-queue-mirror-jobs-in-batches-drain' into 'master'

Increase rate at which UpdateAllMirrorsWorker schedules jobs and reschedules itself

Closes #11874

See merge request gitlab-org/gitlab-ee!14573
parents 4492355a 3fb5c234
# frozen_string_literal: true
require 'sidekiq/api'
Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
......@@ -44,6 +46,10 @@ module ApplicationWorker
get_sidekiq_options['queue'].to_s
end
def queue_size
Sidekiq::Queue.new(queue).size
end
def bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end
......
......@@ -7,17 +7,18 @@ class UpdateAllMirrorsWorker
LEASE_TIMEOUT = 5.minutes
SCHEDULE_WAIT_TIMEOUT = 4.minutes
LEASE_KEY = 'update_all_mirrors'.freeze
RESCHEDULE_WAIT = 10.seconds
RESCHEDULE_WAIT = 1.second
def perform
return if Gitlab::Database.read_only?
scheduling_ran = with_lease do
schedule_mirrors!
scheduled = 0
with_lease do
scheduled = schedule_mirrors!
end
# If we didn't get the lease, exit early
return unless scheduling_ran
# If we didn't get the lease, or no updates were scheduled, exit early
return unless scheduled > 0
# Wait to give some jobs a chance to complete
Kernel.sleep(RESCHEDULE_WAIT)
......@@ -26,7 +27,7 @@ class UpdateAllMirrorsWorker
# reschedule this job to enqueue more work.
#
# This is in addition to the regular (cron-like) scheduling of this job.
reschedule_if_capacity_left
UpdateAllMirrorsWorker.perform_async if Gitlab::Mirror.reschedule_immediately?
end
# rubocop: disable CodeReuse/ActiveRecord
......@@ -37,7 +38,7 @@ class UpdateAllMirrorsWorker
# can't end up in an infinite loop
now = Time.now
last = nil
all_project_ids = []
scheduled = 0
while capacity > 0
batch_size = [capacity * 2, 500].min
......@@ -47,7 +48,8 @@ class UpdateAllMirrorsWorker
project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force
capacity -= project_ids.length
all_project_ids.concat(project_ids)
ProjectImportScheduleWorker.bulk_perform_async(project_ids.map { |id| [id] })
scheduled += project_ids.length
# If fewer than `batch_size` projects were returned, we don't need to query again
break if projects.length < batch_size
......@@ -55,18 +57,18 @@ class UpdateAllMirrorsWorker
last = projects.last.import_state.next_execution_timestamp
end
ProjectImportScheduleWorker.bulk_perform_and_wait(all_project_ids.map { |id| [id] }, timeout: SCHEDULE_WAIT_TIMEOUT.to_i)
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.now + SCHEDULE_WAIT_TIMEOUT
sleep 1 while ProjectImportScheduleWorker.queue_size > 0 && Time.now < deadline
end
scheduled
end
# rubocop: enable CodeReuse/ActiveRecord
private
def reschedule_if_capacity_left
return unless Gitlab::Mirror.reschedule_immediately?
UpdateAllMirrorsWorker.perform_async
end
def with_lease
if lease_uuid = try_obtain_lease
yield
......
---
title: Increase rate at which UpdateAllMirrorsWorker schedules jobs and reschedules itself
merge_request: 14573
author:
type: other
......@@ -33,15 +33,7 @@ module Gitlab
end
def reschedule_immediately?
available_spots = available_capacity
return false if available_spots < capacity_threshold
# Only reschedule if we are able to completely fill up the available spots.
mirrors_ready_to_sync_count(available_spots) >= available_spots
end
def mirrors_ready_to_sync_count(up_to = nil)
Project.mirrors_to_sync(Time.now, limit: up_to).count
available_capacity >= capacity_threshold
end
def available_capacity
......
......@@ -66,60 +66,24 @@ describe Gitlab::Mirror do
describe '#reschedule_immediately?' do
let(:mirror_capacity_threshold) { Gitlab::CurrentSettings.mirror_capacity_threshold }
context 'with number of mirrors to sync equal to the available capacity' do
it 'returns true if available capacity surpassed defined threshold' do
available_capacity = mirror_capacity_threshold + 1
expect(described_class).to receive(:available_capacity).and_return(available_capacity)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(available_capacity)
expect(described_class.reschedule_immediately?).to eq(true)
end
it 'returns true if available capacity is equal to the defined threshold' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(mirror_capacity_threshold)
expect(described_class.reschedule_immediately?).to eq(true)
end
end
context 'with number of mirrors to sync surpassing the available capacity' do
it 'returns true if available capacity surpassed defined threshold' do
available_capacity = mirror_capacity_threshold + 1
expect(described_class).to receive(:available_capacity).and_return(available_capacity)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(available_capacity + 1)
expect(described_class.reschedule_immediately?).to eq(true)
context 'when available capacity exceeds the defined threshold' do
before do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold + 1)
end
it 'returns true if available capacity is equal to the defined threshold' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(mirror_capacity_threshold + 1)
expect(described_class.reschedule_immediately?).to eq(true)
it 'returns true' do
expect(described_class.reschedule_immediately?).to be_truthy
end
end
it 'returns false if mirrors ready to sync is below the available capacity' do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold + 1)
expect(described_class).to receive(:mirrors_ready_to_sync_count).and_return(mirror_capacity_threshold)
expect(described_class.reschedule_immediately?).to eq(false)
context 'when the availabile capacity is lower than the defined threshold' do
before do
expect(described_class).to receive(:available_capacity).and_return(mirror_capacity_threshold - 1)
end
it 'returns false if available capacity is below the defined threshold' do
available_capacity = mirror_capacity_threshold - 1
expect(described_class).to receive(:available_capacity).and_return(available_capacity)
expect(described_class).not_to receive(:mirrors_ready_to_sync_count)
expect(described_class.reschedule_immediately?).to eq(false)
it 'returns false' do
expect(described_class.reschedule_immediately?).to be_falsey
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
......
......@@ -26,33 +26,60 @@ describe UpdateAllMirrorsWorker do
end
it 'schedules mirrors' do
expect(worker).to receive(:schedule_mirrors!)
expect(worker).to receive(:schedule_mirrors!).and_call_original
worker.perform
end
context 'when updates were scheduled' do
before do
allow(worker).to receive(:schedule_mirrors!).and_return(1)
end
it 'sleeps a bit after scheduling mirrors' do
expect(Kernel).to receive(:sleep).with(described_class::RESCHEDULE_WAIT)
worker.perform
end
it 'reschedules the job if capacity is left' do
context 'if capacity is available' do
before do
allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(true)
end
it 'reschedules the job' do
expect(described_class).to receive(:perform_async)
worker.perform
end
end
context 'if no capacity is available' do
before do
allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(false)
end
it 'does not reschedule the job' do
expect(described_class).not_to receive(:perform_async)
worker.perform
end
end
end
it 'does not reschedule the job if no capacity left' do
context 'when no updates were scheduled' do
before do
allow(worker).to receive(:schedule_mirrors!).and_return(0)
allow(Gitlab::Mirror).to receive(:reschedule_immediately?).and_return(false)
end
it 'does not reschedule the job' do
expect(described_class).not_to receive(:perform_async)
worker.perform
end
end
end
describe '#schedule_mirrors!' do
def schedule_mirrors!(capacity:)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment