Commit 79795b40 authored by Vitali Tatarintev's avatar Vitali Tatarintev

Merge branch 'qmnguyen0711/implement-job-tracker-for-gitlab-mirrors' into 'master'

Implement Gitlab mirror scheduling tracker to make UpdateAllMirrorsWorker independent from Sidekiq queue sizes

See merge request gitlab-org/gitlab!81249
parents 69a3cc7d b1194aa8
---
name: update_all_mirrors_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351420
milestone: '14.8'
name: mirror_scheduling_tracking
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/81249
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/353440
milestone: '14.9'
type: development
group: group::scalability
default_enabled: false
---
name: project_import_schedule_worker_job_tracker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/79097
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/351408
milestone: '14.8'
type: development
group: group::scalability
default_enabled: false
......@@ -4,10 +4,6 @@ class ProjectImportScheduleWorker
ImportStateNotFound = Class.new(StandardError)
include ApplicationWorker
# At the moment, this inclusion is to enable job tracking ability. In the
# future, the capacity management should be moved to this worker instead of
# UpdateAllMirrorsWorker
include LimitedCapacity::Worker
data_consistency :always
prepend WaitableWorker
......@@ -25,7 +21,7 @@ class ProjectImportScheduleWorker
tags :needs_own_queue
def perform(project_id)
job_tracker.register(jid, capacity) if job_tracking?
::Gitlab::Mirror.untrack_scheduling(project_id) if scheduling_tracking_enabled?
return if Gitlab::Database.read_only?
......@@ -35,17 +31,11 @@ class ProjectImportScheduleWorker
with_context(project: project) do
project.import_state.schedule
end
ensure
job_tracker.remove(jid) if job_tracking?
end
private
def capacity
Gitlab::Mirror.available_capacity
end
def job_tracking?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml)
# Reports whether the `mirror_scheduling_tracking` feature flag is enabled.
# The flag is queried with `default_enabled: :yaml`.
def scheduling_tracking_enabled?
  flag_name = :mirror_scheduling_tracking

  Feature.enabled?(flag_name, default_enabled: :yaml)
end
end
......@@ -9,7 +9,7 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
data_consistency :sticky
LEASE_TIMEOUT = 5.minutes
SCHEDULE_WAIT_TIMEOUT = 4.minutes
SCHEDULE_WAIT_TIMEOUT = 2.minutes
LEASE_KEY = 'update_all_mirrors'
RESCHEDULE_WAIT = 1.second
......@@ -18,14 +18,12 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
scheduled = 0
with_lease do
clean_project_import_jobs_tracking
scheduled = schedule_mirrors!
if scheduled > 0
# Wait for all ProjectImportScheduleWorker jobs to complete
# Wait for all ProjectImportScheduleWorker jobs to be picked up
deadline = Time.current + SCHEDULE_WAIT_TIMEOUT
sleep 1 while pending_project_import_jobs? && Time.current < deadline
sleep 1 while pending_project_import_scheduling? && Time.current < deadline
end
end
......@@ -44,6 +42,16 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop: disable CodeReuse/ActiveRecord
def schedule_mirrors!
# Clean up the mirror scheduling counter before scheduling mirrors. After this job is executed, there are a few possible cases:
# - There are no projects to be scheduled, the job exits early, and the counter is not used.
# - All projects transition to the scheduled state. The counter must be equal to 0.
# - The wait timeout (SCHEDULE_WAIT_TIMEOUT, 2 minutes) is exceeded. In this case, another job will be
# rescheduled, regardless of the value of the counter.
# Therefore, the counter should be reset before entering
# the scheduling phase. In addition, this clean-up task prevents a project
# id from being stuck in the list forever.
::Gitlab::Mirror.reset_scheduling if scheduling_tracking_enabled?
capacity = Gitlab::Mirror.available_capacity
# Ignore mirrors that become due for scheduling once work begins, so we
......@@ -133,6 +141,11 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop: enable CodeReuse/ActiveRecord
def schedule_projects_in_batch(projects)
return if projects.empty?
# projects were materialized at this stage
::Gitlab::Mirror.track_scheduling(projects.map(&:id)) if scheduling_tracking_enabled?
ProjectImportScheduleWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { project.id },
......@@ -154,25 +167,15 @@ class UpdateAllMirrorsWorker # rubocop:disable Scalability/IdempotentWorker
end
# rubocop: enable CodeReuse/ActiveRecord
def job_tracker
@job_tracker ||= LimitedCapacity::JobTracker.new(ProjectImportScheduleWorker.name)
end
def pending_project_import_jobs?
if job_tracker_enabled?
job_tracker.count > 0
# True while there is still outstanding scheduling work.
#
# With scheduling tracking enabled the figure comes from
# Gitlab::Mirror.current_scheduling; otherwise it falls back to the
# ProjectImportScheduleWorker queue size.
def pending_project_import_scheduling?
  outstanding =
    if scheduling_tracking_enabled?
      ::Gitlab::Mirror.current_scheduling
    else
      ProjectImportScheduleWorker.queue_size
    end

  outstanding > 0
end
def clean_project_import_jobs_tracking
# Clean-up completed jobs with stale status
job_tracker.clean_up if job_tracker_enabled?
end
def job_tracker_enabled?
Feature.enabled?(:project_import_schedule_worker_job_tracker, default_enabled: :yaml) &&
Feature.enabled?(:update_all_mirrors_job_tracker, default_enabled: :yaml)
# Tells whether the `mirror_scheduling_tracking` feature flag is on
# (looked up with `default_enabled: :yaml`).
def scheduling_tracking_enabled?
  lookup_options = { default_enabled: :yaml }

  Feature.enabled?(:mirror_scheduling_tracking, **lookup_options)
end
end
......@@ -5,6 +5,7 @@ module Gitlab
# Runs scheduler every minute
SCHEDULER_CRON = '* * * * *'
PULL_CAPACITY_KEY = 'MIRROR_PULL_CAPACITY'
SCHEDULING_TRACKING_KEY = 'MIRROR_SCHEDULING_TRACKING'
JITTER = 1.minute
# TODO: Dynamically determine mirror update interval based on total number
......@@ -77,6 +78,22 @@ module Gitlab
Gitlab::CurrentSettings.mirror_capacity_threshold
end
# Adds the given project ids to the Redis set stored under
# SCHEDULING_TRACKING_KEY.
#
# @param project_ids [Array<Integer>, Integer] ids to start tracking
# @return whatever `redis.sadd` returns, or 0 when there is nothing to add
def track_scheduling(project_ids)
  # SADD requires at least one member; issuing it with an empty list makes
  # Redis reject the command, so treat "nothing to track" as a no-op.
  return 0 if Array(project_ids).empty?

  Gitlab::Redis::SharedState.with { |redis| redis.sadd(SCHEDULING_TRACKING_KEY, project_ids) }
end
# Removes a single project id from the Redis set stored under
# SCHEDULING_TRACKING_KEY and returns the result of `srem`.
def untrack_scheduling(project_id)
  Gitlab::Redis::SharedState.with do |connection|
    connection.srem(SCHEDULING_TRACKING_KEY, project_id)
  end
end
# Deletes the SCHEDULING_TRACKING_KEY entry from Redis, dropping every
# tracked project id at once. Returns the result of `del`.
def reset_scheduling
  Gitlab::Redis::SharedState.with do |connection|
    connection.del(SCHEDULING_TRACKING_KEY)
  end
end
# Returns, as an Integer, the cardinality (`scard`) of the Redis set stored
# under SCHEDULING_TRACKING_KEY.
def current_scheduling
  tracked = Gitlab::Redis::SharedState.with do |connection|
    connection.scard(SCHEDULING_TRACKING_KEY)
  end

  tracked.to_i
end
private
def update_all_mirrors_cron_job
......
......@@ -52,7 +52,7 @@ RSpec.describe Gitlab::Mirror do
end
end
describe '#max_mirror_capacity_reached?' do
describe '#max_mirror_capacity_reached?', :clean_gitlab_redis_shared_state do
it 'returns true if available capacity is 0' do
expect(described_class).to receive(:available_capacity).and_return(0)
......@@ -64,10 +64,6 @@ RSpec.describe Gitlab::Mirror do
expect(described_class.max_mirror_capacity_reached?).to eq(false)
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
describe '#reschedule_immediately?' do
......@@ -94,7 +90,7 @@ RSpec.describe Gitlab::Mirror do
end
end
describe '#available_capacity' do
describe '#available_capacity', :clean_gitlab_redis_shared_state do
context 'when redis key does not exist' do
it 'returns mirror_max_capacity' do
expect(described_class.available_capacity).to eq(Gitlab::CurrentSettings.mirror_max_capacity)
......@@ -114,25 +110,17 @@ RSpec.describe Gitlab::Mirror do
expect(described_class.available_capacity).to eq(Gitlab::CurrentSettings.mirror_max_capacity - current_capacity)
end
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
describe '#increment_capacity' do
describe '#increment_capacity', :clean_gitlab_redis_shared_state do
it 'increments capacity' do
max_capacity = Gitlab::CurrentSettings.mirror_max_capacity
expect { described_class.increment_capacity(1) }.to change { described_class.available_capacity }.from(max_capacity).to(max_capacity - 1)
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
end
end
describe '#decrement_capacity' do
describe '#decrement_capacity', :clean_gitlab_redis_shared_state do
let!(:id) { 1 }
context 'with capacity above 0' do
......@@ -150,9 +138,57 @@ RSpec.describe Gitlab::Mirror do
expect { described_class.decrement_capacity(id) }.not_to change { described_class.available_capacity }
end
end
end
# Covers Gitlab::Mirror.track_scheduling: the counter reported by
# current_scheduling grows by the number of ids that were not tracked yet.
describe '#track_scheduling', :clean_gitlab_redis_shared_state do
it 'increments current scheduling counter' do
expect { described_class.track_scheduling([1, 2, 3, 4]) }.to change { described_class.current_scheduling }.from(0).to(4)
end
it 'excludes existing ids from existing counter' do
# Ids 1-3 are tracked up front, so re-submitting them together with 4
# should only raise the counter by one.
described_class.track_scheduling([1, 2, 3])
expect { described_class.track_scheduling([1, 2, 3, 4]) }.to change { described_class.current_scheduling }.from(3).to(4)
end
end
after do
Gitlab::Redis::SharedState.with { |redis| redis.del(Gitlab::Mirror::PULL_CAPACITY_KEY) }
# Covers Gitlab::Mirror.untrack_scheduling: removing a tracked id lowers the
# counter by one, while removing an unknown id leaves it untouched.
describe '#untrack_scheduling', :clean_gitlab_redis_shared_state do
  context 'with scheduling counter above 0' do
    before do
      described_class.track_scheduling([1, 2, 3])
    end

    it 'decrements scheduling counter' do
      expect { described_class.untrack_scheduling(1) }.to change { described_class.current_scheduling }.from(3).to(2)
    end

    it 'does not decrement scheduling counter for non-existent id' do
      expect { described_class.untrack_scheduling(5) }.not_to change { described_class.current_scheduling }
    end
  end

  context 'with scheduling counter equal to 0' do
    it 'does not decrement scheduling counter' do
      expect { described_class.untrack_scheduling(1) }.not_to change { described_class.current_scheduling }
    end
  end
end
# Covers Gitlab::Mirror.reset_scheduling: the counter always ends at 0,
# whether or not any ids were tracked beforehand.
describe '#reset_scheduling', :clean_gitlab_redis_shared_state do
  context 'with scheduling counter above 0' do
    before do
      described_class.track_scheduling([1, 2, 3])
    end

    it 'decrements scheduling counter to 0' do
      expect { described_class.reset_scheduling }.to change { described_class.current_scheduling }.from(3).to(0)
    end
  end

  context 'with scheduling counter equal to 0' do
    # Nothing is tracked here, so the reset must be a no-op rather than a
    # decrement; the description says so explicitly.
    it 'keeps scheduling counter at 0' do
      expect { described_class.reset_scheduling }.not_to change { described_class.current_scheduling }
      expect(described_class.current_scheduling).to eq(0)
    end
  end
end
......
......@@ -11,13 +11,9 @@ RSpec.describe ProjectImportScheduleWorker do
let(:job_args) { [project.id] }
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do
allow(Gitlab::Mirror).to receive(:available_capacity).and_return(5)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(Gitlab::Mirror).to receive(:untrack_scheduling).and_call_original
allow(Project).to receive(:find_by_id).with(project.id).and_return(project)
allow(project).to receive(:add_import_job)
......@@ -39,29 +35,27 @@ RSpec.describe ProjectImportScheduleWorker do
expect(import_state).to be_scheduled
end
context 'project_import_schedule_worker_job_tracker flag is enabled' do
context 'mirror_scheduling_tracking flag is enabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: true)
stub_feature_flags(mirror_scheduling_tracking: true)
end
it 'tracks the status of the worker' do
subject
expect(job_tracker_instance).to have_received(:register).with(any_args, 5).at_least(:once)
expect(job_tracker_instance).to have_received(:remove).with(any_args).at_least(:once)
expect(Gitlab::Mirror).to have_received(:untrack_scheduling).with(project.id).at_least(:once)
end
end
context 'project_import_schedule_worker_job_tracker flag is disabled' do
context 'mirror_scheduling_tracking flag is disabled' do
before do
stub_feature_flags(project_import_schedule_worker_job_tracker: false)
stub_feature_flags(mirror_scheduling_tracking: false)
end
it 'does not track the status of the worker' do
subject
expect(job_tracker_instance).not_to have_received(:register)
expect(job_tracker_instance).not_to have_received(:remove)
expect(Gitlab::Mirror).not_to have_received(:untrack_scheduling)
end
end
end
......
......@@ -56,37 +56,21 @@ RSpec.describe UpdateAllMirrorsWorker do
end
context 'when updates were scheduled' do
let(:job_tracker_instance) { double(LimitedCapacity::JobTracker) }
before do
allow(worker).to receive(:schedule_mirrors!).and_return(1)
count = 3
allow(Gitlab::Mirror).to receive(:current_scheduling) { |_| count -= 1 }
end
context 'job tracker flags are on' do
context 'mirror_scheduling_tracking flags is on' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: true
)
allow(LimitedCapacity::JobTracker).to receive(:new).with('ProjectImportScheduleWorker').and_return(job_tracker_instance)
count = 3
allow(job_tracker_instance).to receive(:clean_up)
allow(job_tracker_instance).to receive(:register)
allow(job_tracker_instance).to receive(:remove)
allow(job_tracker_instance).to receive(:count) { |_| count -= 1 }
stub_feature_flags(mirror_scheduling_tracking: true)
end
it 'waits until ProjectImportScheduleWorker job tracker returns 0' do
worker.perform
expect(job_tracker_instance).to have_received(:count).exactly(3).times
end
it 'cleans up finished ProjectImportScheduleWorker jobs' do
worker.perform
expect(job_tracker_instance).to have_received(:clean_up).once
expect(Gitlab::Mirror).to have_received(:current_scheduling).exactly(3).times
end
it 'sleeps a bit after scheduling mirrors' do
......@@ -96,12 +80,9 @@ RSpec.describe UpdateAllMirrorsWorker do
end
end
context 'any of job tracker flags is off' do
context 'mirror_scheduling_tracking flags is off' do
before do
stub_feature_flags(
project_import_schedule_worker_job_tracker: true,
update_all_mirrors_job_tracker: false
)
stub_feature_flags(mirror_scheduling_tracking: false)
count = 3
allow(ProjectImportScheduleWorker).to receive(:queue_size) { |_| count -= 1 }
end
......@@ -156,14 +137,6 @@ RSpec.describe UpdateAllMirrorsWorker do
worker.perform
end
it 'does not poll for ProjectImportScheduleWorker jobs to complete' do
expect_next_instance_of(LimitedCapacity::JobTracker) do |instance|
expect(instance).not_to receive(:count)
end
worker.perform
end
it 'does not wait' do
expect(worker).not_to receive(:sleep)
......@@ -172,7 +145,18 @@ RSpec.describe UpdateAllMirrorsWorker do
end
end
describe '#schedule_mirrors!' do
describe '#schedule_mirrors!', :clean_gitlab_redis_shared_state do
before do
# This tests the ability of this worker to clean the state before
# scheduling mirrors
Gitlab::Redis::SharedState.with do |redis|
redis.sadd(Gitlab::Mirror::SCHEDULING_TRACKING_KEY, [1, 2, 3])
end
allow(Gitlab::Mirror).to receive(:track_scheduling).and_call_original
allow(Gitlab::Mirror).to receive(:untrack_scheduling).and_call_original
end
def schedule_mirrors!(capacity:)
allow(Gitlab::Mirror).to receive_messages(available_capacity: capacity)
......@@ -195,6 +179,28 @@ RSpec.describe UpdateAllMirrorsWorker do
projects.each { |project| expect_import_status(project, 'none') }
end
# Spec helper asserting the scheduling-tracking side effects of a run.
#
# Each argument is one expected batch (an array of projects). The helper
# checks that Gitlab::Mirror.track_scheduling was called once per batch, in
# the given order, with that batch's project ids; that every project was
# untracked at least once; and that the scheduling counter ends at 0.
def expect_mirror_scheduling_tracked(*project_batches)
# Expect that Gitlab::Mirror tracks the project IDs
project_batches.each do |project_batch|
expect(Gitlab::Mirror).to have_received(:track_scheduling).ordered.with(
match(project_batch.map(&:id))
)
end
# rubocop:disable Style/CombinableLoops
# And then each project is untracked individually when its status switches
# to scheduled. We need to loop over these batches twice to ensure the
# ordered `track_scheduling` expectations don't mingle with the
# `untrack_scheduling` expectations.
project_batches.each do |project_batch|
project_batch.each do |project|
expect(Gitlab::Mirror).to have_received(:untrack_scheduling).with(project.id).at_least(:once)
end
end
# rubocop:enable Style/CombinableLoops
# Every tracked id must have been untracked by now.
expect(::Gitlab::Mirror.current_scheduling).to eq(0)
end
context 'when the instance is unlicensed' do
it 'does not schedule when project does not have repository mirrors available' do
project = create(:project, :mirror)
......@@ -222,6 +228,8 @@ RSpec.describe UpdateAllMirrorsWorker do
schedule_mirrors!(capacity: 3)
expect_import_scheduled(project1, project2)
expect_mirror_scheduling_tracked([project1, project2])
end
end
end
......@@ -262,8 +270,10 @@ RSpec.describe UpdateAllMirrorsWorker do
it 'schedules all available mirrors' do
schedule_mirrors!(capacity: 4)
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_import_not_scheduled(*unlicensed_projects)
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_mirror_scheduling_tracked([licensed_project1, licensed_project2, public_project])
end
end
......@@ -290,6 +300,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project1, licensed_project2, public_project])
end
it 'requests as many batches as necessary' do
......@@ -308,6 +320,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project2, public_project)
expect_import_not_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project2, public_project])
end
it "does not schedule a mirror of an pending_delete project" do
......@@ -318,6 +332,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project2, public_project)
expect_import_not_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project2, public_project])
end
end
......@@ -327,6 +343,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1, licensed_project2, public_project)
expect_import_not_scheduled(*unlicensed_projects)
expect_mirror_scheduling_tracked([licensed_project1, licensed_project2], [public_project])
end
it 'requests as many batches as necessary' do
......@@ -344,6 +362,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1, licensed_project2)
expect_import_not_scheduled(*unlicensed_projects, public_project)
expect_mirror_scheduling_tracked([licensed_project1], [licensed_project2])
end
it 'requests as many batches as necessary' do
......@@ -361,6 +381,8 @@ RSpec.describe UpdateAllMirrorsWorker do
expect_import_scheduled(licensed_project1)
expect_import_not_scheduled(*unlicensed_projects, licensed_project2, public_project)
expect_mirror_scheduling_tracked([licensed_project1])
end
it 'requests as many batches as necessary' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment