Commit b6e77ad5 authored by Sean McGivern's avatar Sean McGivern

Merge branch...

Merge branch 'qmnguyen0711/340630-make-repository-pull-mirroring-not-depend-on-sidekiq-queue-sizes' into 'master'

Make repository pull mirroring not depend on sidekiq queue sizes

See merge request gitlab-org/gitlab!79097
parents d03a58f3 ef61b0f0
---
name: project_import_schedule_worker_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351408
milestone: '14.8'
type: development
group: group::scalability
default_enabled: false
---
name: update_all_mirrors_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351420
milestone: '14.8'
type: development
group: group::scalability
default_enabled: false
...@@ -4,6 +4,10 @@ class ProjectImportScheduleWorker ...@@ -4,6 +4,10 @@ class ProjectImportScheduleWorker
ImportStateNotFound = Class.new(StandardError) ImportStateNotFound = Class.new(StandardError)
include ApplicationWorker include ApplicationWorker
# At the moment, this inclusion is to enable job tracking ability. In the
# future, the capacity management should be moved to this worker instead of
# UpdateAllMirrorsWorker
include LimitedCapacity::Worker
data_consistency :always data_consistency :always
prepend WaitableWorker prepend WaitableWorker
...@@ -21,6 +25,8 @@ class ProjectImportScheduleWorker ...@@ -21,6 +25,8 @@ class ProjectImportScheduleWorker
tags :needs_own_queue tags :needs_own_queue
def perform(project_id) def perform(project_id)
job_tracker.register(jid, capacity) if job_tracking?
return if Gitlab::Database.read_only? return if Gitlab::Database.read_only?
project = Project.with_route.with_import_state.with_namespace.find_by_id(project_id) project = Project.with_route.with_import_state.with_namespace.find_by_id(project_id)
...@@ -29,5 +35,17 @@ class ProjectImportScheduleWorker ...@@ -29,5 +35,17 @@ class ProjectImportScheduleWorker
with_context(project: project) do with_context(project: project) do
project.import_state.schedule project.import_state.schedule
end end
ensure
job_tracker.remove(jid) if job_tracking?
end
private
def capacity
Gitlab::Mirror.available_capacity
end
def job_tracking?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml)
end end
end end
...@@ -18,14 +18,22 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -18,14 +18,22 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
scheduled = 0 scheduled = 0
with_lease do with_lease do
clean_project_import_jobs_tracking
scheduled = schedule_mirrors! scheduled = schedule_mirrors!
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while pending_project_import_jobs? && Time.current < deadline
end
end end
# If we didn't get the lease, or no updates were scheduled, exit early # If we didn't get the lease, or no updates were scheduled, exit early
return unless scheduled > 0 return unless scheduled > 0
# Wait to give some jobs a chance to complete # Wait to give some jobs a chance to complete
Kernel.sleep(RESCHEDULE_WAIT) sleep(RESCHEDULE_WAIT)
# If there's capacity left now (some jobs completed), # If there's capacity left now (some jobs completed),
# reschedule this job to enqueue more work. # reschedule this job to enqueue more work.
...@@ -73,12 +81,6 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -73,12 +81,6 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
last = projects.last.import_state.next_execution_timestamp last = projects.last.import_state.next_execution_timestamp
end end
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while ProjectImportScheduleWorker.queue_size > 0 && Time.current < deadline
end
scheduled scheduled
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
...@@ -156,4 +158,26 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -156,4 +158,26 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
end.to_sql end.to_sql
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(ProjectImportScheduleWorker.name)
end
def pending_project_import_jobs?
if job_tracker_enabled?
job_tracker.count > 0
else
ProjectImportScheduleWorker.queue_size > 0
end
end
def clean_project_import_jobs_tracking
# Clean-up completed jobs with stale status
job_tracker.clean_up if job_tracker_enabled?
end
def job_tracker_enabled?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml) &&
Feature.enabled?(:update_all_mirrors_job_tracker, default_enabled: :yaml)
end
end end
...@@ -11,7 +11,14 @@ RSpec.describe ProjectImportScheduleWorker do ...@@ -11,7 +11,14 @@ RSpec.describe ProjectImportScheduleWorker do
let(:job_args) { [project.id] } let(:job_args) { [project.id] }
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do before do
allow(Gitlab::Mirror).to receive(:available_capacity).and_return(5)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(Project).to receive(:find_by_id).with(project.id).and_return(project) allow(Project).to receive(:find_by_id).with(project.id).and_return(project)
allow(project).to receive(:add_import_job) allow(project).to receive(:add_import_job)
end end
...@@ -31,6 +38,32 @@ RSpec.describe ProjectImportScheduleWorker do ...@@ -31,6 +38,32 @@ RSpec.describe ProjectImportScheduleWorker do
expect(import_state).to be_scheduled expect(import_state).to be_scheduled
end end
context 'project_import_schedule_worker_job_tracker flag is enabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: true)
end
it 'tracks the status of the worker' do
subject
expect(job_tracker_instance).to have_received(:register).with(any_args, 5).at_least(:once)
expect(job_tracker_instance).to have_received(:remove).with(any_args).at_least(:once)
end
end
context 'project_import_schedule_worker_job_tracker flag is disabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: false)
end
it 'does not track the status of the worker' do
subject
expect(job_tracker_instance).not_to have_received(:register)
expect(job_tracker_instance).not_to have_received(:remove)
end
end
end end
context 'project is not found' do context 'project is not found' do
......
...@@ -56,15 +56,68 @@ RSpec.describe UpdateAllMirrorsWorker do ...@@ -56,15 +56,68 @@ RSpec.describe UpdateAllMirrorsWorker do
end end
context 'when updates were scheduled' do context 'when updates were scheduled' do
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do before do
allow(worker).to receive(:schedule_mirrors!).and_return(1) allow(worker).to receive(:schedule_mirrors!).and_return(1)
end end
context 'job tracker flags are on' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: true
)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
count = 3
allow(job_tracker_instance).to receive(:clean_up)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(job_tracker_instance).to receive(:count) { |_| count -= 1 }
end
it 'waits until ProjectImportScheduleWorker job tracker returns 0' do
worker.perform
expect(job_tracker_instance).to have_received(:count).exactly(3).times
end
it 'cleans up finished ProjectImportScheduleWorker jobs' do
worker.perform
expect(job_tracker_instance).to have_received(:clean_up).once
end
it 'sleeps a bit after scheduling mirrors' do
expect(worker).to receive(:sleep).with(described_class::RESCHEDULE_WAIT).exactly(3).times
worker.perform
end
end
context 'any of job tracker flags is off' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: false
)
count = 3
allow(ProjectImportScheduleWorker).to receive(:queue_size) { |_| count -= 1 }
end
it 'waits until ProjectImportScheduleWorker jobs to complete' do
worker.perform
expect(ProjectImportScheduleWorker).to have_received(:queue_size).exactly(3).times
end
it 'sleeps a bit after scheduling mirrors' do it 'sleeps a bit after scheduling mirrors' do
expect(Kernel).to receive(:sleep).with(described_class::RESCHEDULE_WAIT) expect(worker).to receive(:sleep).with(described_class::RESCHEDULE_WAIT).exactly(3).times
worker.perform worker.perform
end end
end
context 'if capacity is available' do context 'if capacity is available' do
before do before do
...@@ -102,6 +155,20 @@ RSpec.describe UpdateAllMirrorsWorker do ...@@ -102,6 +155,20 @@ RSpec.describe UpdateAllMirrorsWorker do
worker.perform worker.perform
end end
it 'does not poll for ProjectImportScheduleWorker jobs to complete' do
expect_next_instance_of(LimitedCapacity::JobTracker) do |instance|
expect(instance).not_to receive(:count)
end
worker.perform
end
it 'does not wait' do
expect(worker).not_to receive(:sleep)
worker.perform
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment