Commit 5bd51666 authored by Douwe Maan

Schedule mirror updates in parallel

parent d60a6d2f
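The heart of this commit: instead of calling `project.import_schedule` serially inside the cron worker, each mirror gets its own `ProjectImportScheduleWorker` job, and the cron worker fans those jobs out and waits for them. Below is a minimal, dependency-free Ruby sketch of that fan-out-and-wait pattern; threads stand in for Sidekiq jobs, and `perform`/`bulk_perform_and_wait` here are illustrative stand-ins, not GitLab's `WaitableWorker` implementation.

# Sketch only: threads stand in for Sidekiq jobs. The real mechanism is
# WaitableWorker#bulk_perform_and_wait, prepended to ProjectImportScheduleWorker.

def perform(project_id)
  # The real worker calls Project#import_schedule here
  sleep(rand / 10)
  puts "scheduled mirror update for project #{project_id}"
end

def bulk_perform_and_wait(args_list, timeout:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  # Fan out: one unit of work per argument array, like one Sidekiq job per id
  threads = args_list.map do |(project_id)|
    Thread.new { perform(project_id) }
  end

  # Wait for completion, but never longer than the overall timeout;
  # stragglers are left running rather than blocking the cron worker
  threads.each do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0

    thread.join(remaining)
  end
end

bulk_perform_and_wait([[1], [2], [3]], timeout: 5)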
......@@ -137,6 +137,7 @@
 - elastic_indexer
 - export_csv
 - ldap_group_sync
+- project_import_schedule
 - project_update_repository_storage
 - rebase
 - repository_update_mirror
......
......@@ -75,6 +75,7 @@
   - [repository_remove_remote, 1]
   - [repository_update_mirror, 1]
   - [repository_update_remote_mirror, 1]
+  - [project_import_schedule, 1]
   - [project_update_repository_storage, 1]
   - [admin_emails, 1]
   - [elastic_batch_project_indexer, 1]
......
......@@ -490,7 +490,7 @@ module EE
     def load_licensed_feature_available(feature)
       globally_available = License.feature_available?(feature)

-      if namespace && ::Gitlab::CurrentSettings.should_check_namespace_plan?
+      if ::Gitlab::CurrentSettings.should_check_namespace_plan? && namespace
         globally_available &&
           (public? && namespace.public? || namespace.feature_available_in_plan?(feature))
       else
......
class ProjectImportScheduleWorker
  include ApplicationWorker
  prepend WaitableWorker

  def perform(project_id)
    project = Project.find_by(id: project_id)
    project&.import_schedule
  end
end
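The call site added to `UpdateAllMirrorsWorker` below invokes this worker through `bulk_perform_and_wait` (provided by the prepended `WaitableWorker`), passing one `[project_id]` argument array per job. A hypothetical invocation, with made-up ids:

# Hypothetical invocation; the ids and timeout are illustrative
ProjectImportScheduleWorker.bulk_perform_and_wait(
  [[1], [2], [3]],        # one Sidekiq job per project id
  timeout: 4.minutes.to_i # stop waiting (but leave jobs running) after this
)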
......@@ -3,6 +3,7 @@ class UpdateAllMirrorsWorker
   include CronjobQueue

   LEASE_TIMEOUT = 5.minutes
+  SCHEDULE_WAIT_TIMEOUT = 4.minutes
   LEASE_KEY = 'update_all_mirrors'.freeze

   def perform
......@@ -15,29 +16,31 @@ class UpdateAllMirrorsWorker
   end

   def schedule_mirrors!
-    capacity = batch_size = Gitlab::Mirror.available_capacity
+    capacity = Gitlab::Mirror.available_capacity

     # Ignore mirrors that become due for scheduling once work begins, so we
     # can't end up in an infinite loop
     now = Time.now
     last = nil
+    all_project_ids = []

+    # Normally, this will complete in 1-2 batches. One batch will be added per
+    # `batch_size` unlicensed projects in the database.
     while capacity > 0
-      projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last)
+      batch_size = [capacity * 2, 500].min
+      projects = pull_mirrors_batch(freeze_at: now, batch_size: batch_size, offset_at: last).to_a
       break if projects.empty?

-      projects.each do |project|
-        next unless project.mirror?
-
-        capacity -= 1
-        project.import_schedule
-
-        break unless capacity > 0
-      end
-
-      last = projects.last.mirror_data.next_execution_timestamp
+      last = projects.last.mirror_data.next_execution_timestamp
+      project_ids = projects.lazy.select(&:mirror?).take(capacity).map(&:id).force
+      capacity -= project_ids.length
+
+      all_project_ids.concat(project_ids)
+
+      # If fewer than `batch_size` projects were returned, we don't need to query again
+      break if projects.length < batch_size
     end
+
+    ProjectImportScheduleWorker.bulk_perform_and_wait(all_project_ids.map { |id| [id] }, timeout: SCHEDULE_WAIT_TIMEOUT.to_i)
   end

   private
......@@ -51,7 +54,11 @@ class UpdateAllMirrorsWorker
   end

   def pull_mirrors_batch(freeze_at:, batch_size:, offset_at: nil)
-    relation = Project.mirrors_to_sync(freeze_at).reorder('project_mirror_data.next_execution_timestamp').limit(batch_size)
+    relation = Project
+      .mirrors_to_sync(freeze_at)
+      .reorder('project_mirror_data.next_execution_timestamp')
+      .limit(batch_size)
+      .includes(:namespace) # Used by `project.mirror?`

     relation = relation.where('next_execution_timestamp > ?', offset_at) if offset_at
......
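A note on the batching heuristic above: each iteration requests `[capacity * 2, 500].min` rows, deliberately over-fetching because unlicensed projects in a batch are filtered out by `project.mirror?` without consuming capacity. A pure-Ruby walkthrough of the arithmetic (no GitLab dependencies; the mirror list mimics the spec setup further down, ordered by `next_execution_timestamp`):

# U = unlicensed, L = licensed, oldest next_execution_timestamp first
mirrors = %w[U U L U L U L]
capacity = 3
offset = 0
batch_sizes = []

while capacity > 0
  batch_size = [capacity * 2, 500].min
  batch = mirrors[offset, batch_size] || []
  break if batch.empty?

  batch_sizes << batch_size

  # Only licensed mirrors consume capacity, at most `capacity` per batch
  capacity -= batch.count('L').clamp(0, capacity)

  # A short batch means the table is exhausted; no need to query again
  break if batch.length < batch_size

  offset += batch.length
end

p batch_sizes # => [6, 2], the sizes asserted in the "capacity is exactly sufficient" spec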
---
title: Schedule mirror updates in parallel
merge_request:
author:
type: changed
......@@ -13,7 +13,7 @@ describe UpdateAllMirrorsWorker do
     allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(false)

-    expect(worker).not_to receive(:fail_stuck_mirrors!)
+    expect(worker).not_to receive(:schedule_mirrors!)

     worker.perform
   end
......@@ -29,7 +29,9 @@ describe UpdateAllMirrorsWorker do
   def schedule_mirrors!(capacity:)
     allow(Gitlab::Mirror).to receive_messages(available_capacity: capacity)

-    Sidekiq::Testing.fake! do
+    allow_any_instance_of(RepositoryImportWorker).to receive(:perform)
+
+    Sidekiq::Testing.inline! do
       worker.schedule_mirrors!
     end
   end
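The switch from Sidekiq's `fake!` mode to `inline!` is what lets these specs observe scheduling done inside `ProjectImportScheduleWorker` jobs: the jobs now execute synchronously at enqueue time, with `RepositoryImportWorker#perform` stubbed so no real imports run. For reference, the two testing modes side by side; `SomeWorker` is a hypothetical stand-in:

require 'sidekiq'
require 'sidekiq/testing'

class SomeWorker
  include Sidekiq::Worker

  def perform(id)
    puts "performing #{id}"
  end
end

Sidekiq::Testing.fake! do
  SomeWorker.perform_async(1) # queued in SomeWorker.jobs, never executed
  puts SomeWorker.jobs.size   # => 1
end

Sidekiq::Testing.inline! do
  SomeWorker.perform_async(2) # runs SomeWorker#perform synchronously
end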
......@@ -74,29 +76,82 @@ describe UpdateAllMirrorsWorker do
       allow(Gitlab).to receive_messages(com?: true)
     end

-    let!(:unlicensed_project) { scheduled_mirror(at: 4.weeks.ago, licensed: false) }
-    let!(:earliest_project)   { scheduled_mirror(at: 3.weeks.ago, licensed: true) }
-    let!(:latest_project)     { scheduled_mirror(at: 2.weeks.ago, licensed: true) }
+    let!(:unlicensed_project1) { scheduled_mirror(at: 8.weeks.ago, licensed: false) }
+    let!(:unlicensed_project2) { scheduled_mirror(at: 7.weeks.ago, licensed: false) }
+    let!(:licensed_project1)   { scheduled_mirror(at: 6.weeks.ago, licensed: true) }
+    let!(:unlicensed_project3) { scheduled_mirror(at: 5.weeks.ago, licensed: false) }
+    let!(:licensed_project2)   { scheduled_mirror(at: 4.weeks.ago, licensed: true) }
+    let!(:unlicensed_project4) { scheduled_mirror(at: 3.weeks.ago, licensed: false) }
+    let!(:licensed_project3)   { scheduled_mirror(at: 1.week.ago, licensed: true) }

-    it "schedules all available mirrors when capacity is in excess" do
-      schedule_mirrors!(capacity: 3)
+    let(:unlicensed_projects) { [unlicensed_project1, unlicensed_project2, unlicensed_project3, unlicensed_project4] }

-      expect_import_scheduled(earliest_project, latest_project)
-      expect_import_not_scheduled(unlicensed_project)
-    end
+    context 'when capacity is in excess' do
+      it "schedules all available mirrors" do
+        schedule_mirrors!(capacity: 4)

-    it "schedules all available mirrors when capacity is sufficient" do
-      schedule_mirrors!(capacity: 2)
+        expect_import_scheduled(licensed_project1, licensed_project2, licensed_project3)
+        expect_import_not_scheduled(*unlicensed_projects)
+      end

-      expect_import_scheduled(earliest_project, latest_project)
-      expect_import_not_scheduled(unlicensed_project)
-    end
+      it 'requests as many batches as necessary' do
+        # The first batch will only contain 3 licensed mirrors, but since we have
+        # fewer than 8 mirrors in total, there's no need to request another batch
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 8)).and_call_original

-    it 'schedules mirrors by next_execution_timestamp when capacity is insufficient' do
-      schedule_mirrors!(capacity: 1)
+        schedule_mirrors!(capacity: 4)
+      end
+    end

-      expect_import_scheduled(earliest_project)
-      expect_import_not_scheduled(unlicensed_project, latest_project)
-    end
+    context 'when capacity is exactly sufficient' do
+      it "schedules all available mirrors" do
+        schedule_mirrors!(capacity: 3)
+
+        expect_import_scheduled(licensed_project1, licensed_project2, licensed_project3)
+        expect_import_not_scheduled(*unlicensed_projects)
+      end
+
+      it 'requests as many batches as necessary' do
+        # The first batch will only contain 2 licensed mirrors, so we need to request another batch
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 6)).ordered.and_call_original
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
+
+        schedule_mirrors!(capacity: 3)
+      end
+    end
+
+    context 'when capacity is insufficient' do
+      it 'schedules mirrors by next_execution_timestamp' do
+        schedule_mirrors!(capacity: 2)
+
+        expect_import_scheduled(licensed_project1, licensed_project2)
+        expect_import_not_scheduled(*unlicensed_projects, licensed_project3)
+      end
+
+      it 'requests as many batches as necessary' do
+        # The first batch will only contain 1 licensed mirror, so we need to request another batch
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 4)).ordered.and_call_original
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
+
+        schedule_mirrors!(capacity: 2)
+      end
+    end
+
+    context 'when capacity is insufficient and the first batch is empty' do
+      it 'schedules mirrors by next_execution_timestamp' do
+        schedule_mirrors!(capacity: 1)
+
+        expect_import_scheduled(licensed_project1)
+        expect_import_not_scheduled(*unlicensed_projects, licensed_project2, licensed_project3)
+      end
+
+      it 'requests as many batches as necessary' do
+        # The first batch will not contain any licensed mirrors, so we need to request another batch
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
+        expect(subject).to receive(:pull_mirrors_batch).with(hash_including(batch_size: 2)).ordered.and_call_original
+
+        schedule_mirrors!(capacity: 1)
+      end
+    end
   end
 end
......