Commit 8160cfe7 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch '7593-geo-decrease-frequency-of-schedulers-when-few-items-to-schedule' into 'master'

Geo: Add a backoff time to a few more workers

Closes #7593

See merge request gitlab-org/gitlab-ee!7470
parents 9ad19c53 dc3654f8
# frozen_string_literal: true

# Shared backoff behavior for Geo-related workers: when a worker run finds
# nothing to schedule, it records a short-lived flag in the Rails cache so
# subsequent runs can bail out early instead of re-querying for work.
module GeoBackoffDelay
  extend ActiveSupport::Concern

  # How long a worker remains paused after signalling a backoff.
  BACKOFF_TIME = 5.minutes

  included do
    # Cache key the backoff flag is stored under; unique per worker class.
    def skip_cache_key
      "#{self.class.name.underscore}:skip"
    end

    # Flags this worker class to be skipped for BACKOFF_TIME.
    def set_backoff_time!
      Rails.cache.write(skip_cache_key, true, expires_in: BACKOFF_TIME)
    end

    # Truthy while the backoff flag is present; workers check this to
    # exit early (returns nil once the cache entry expires).
    def should_be_skipped?
      Rails.cache.read(skip_cache_key)
    end
  end
end
......@@ -4,32 +4,20 @@ module Geo
attr_accessor :shard_name
BACKOFF_TIME = 5.minutes
def perform(shard_name)
@shard_name = shard_name
return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
return if should_be_skipped?
super()
end
private
def skip_shard_key
def skip_cache_key
"#{self.class.name.underscore}:shard:#{shard_name}:skip"
end
def should_be_skipped?
Rails.cache.read(skip_shard_key)
end
def set_backoff_time!
Rails.cache.write(skip_shard_key, true, expires_in: BACKOFF_TIME.minutes)
end
# Returns the shard name as metadata — presumably merged into the scheduler's
# log messages by the base worker; confirm against Geo::Scheduler::SchedulerWorker.
def worker_metadata
{ shard: shard_name }
end
......@@ -70,15 +58,11 @@ module Geo
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
pending_resources = if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end
set_backoff_time! if pending_resources.empty?
pending_resources
if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end
end
# rubocop: disable CodeReuse/ActiveRecord
......
......@@ -21,6 +21,10 @@ module Geo
private
# Per-shard cache key for the backoff flag, so each shard backs off
# independently (the GeoBackoffDelay default keys on worker class only).
def skip_cache_key
"#{self.class.name.underscore}:shard:#{shard_name}:skip"
end
# Returns the shard name as metadata — presumably merged into the scheduler's
# log messages by the base worker; confirm against Geo::Scheduler::SchedulerWorker.
def worker_metadata
{ shard: shard_name }
end
......
......@@ -3,7 +3,6 @@ module Geo
module Secondary
class ShardWorker < Geo::Scheduler::Secondary::SchedulerWorker
include CronjobQueue
attr_accessor :shard_name
def perform(shard_name)
......@@ -20,6 +19,10 @@ module Geo
private
# Per-shard cache key for the backoff flag, so each shard backs off
# independently (the GeoBackoffDelay default keys on worker class only).
def skip_cache_key
"#{self.class.name.underscore}:shard:#{shard_name}:skip"
end
# Returns the shard name as metadata — presumably merged into the scheduler's
# log messages by the base worker; confirm against Geo::Scheduler::SchedulerWorker.
def worker_metadata
{ shard: shard_name }
end
......@@ -30,8 +33,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def load_pending_resources
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size)
.pluck(:id)
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size).pluck(:id)
end
# rubocop: enable CodeReuse/ActiveRecord
......
......@@ -6,6 +6,7 @@ module Geo
include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize
include GeoBackoffDelay
DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 60.minutes
......@@ -70,10 +71,6 @@ module Geo
private
def should_be_skipped?
false
end
# Subclasses should override this method to provide additional metadata
# in log messages
def worker_metadata
......@@ -166,7 +163,10 @@ module Geo
end
def update_pending_resources
@pending_resources = load_pending_resources if reload_queue?
if reload_queue?
@pending_resources = load_pending_resources
set_backoff_time! if @pending_resources.empty?
end
end
def schedule_jobs
......
---
title: 'Geo: Add a backoff time to a few Geo workers to save resources'
merge_request: 7470
author:
type: added
......@@ -47,17 +47,15 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
context 'with attachments (Upload records)' do
it 'performs Geo::FileDownloadWorker for unsynced attachments' do
upload = create(:upload)
let(:upload) { create(:upload) }
it 'performs Geo::FileDownloadWorker for unsynced attachments' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', upload.id)
subject.perform
end
it 'performs Geo::FileDownloadWorker for failed-sync attachments' do
upload = create(:upload)
create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 0, success: false)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
......@@ -67,8 +65,6 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
it 'does not perform Geo::FileDownloadWorker for synced attachments' do
upload = create(:upload)
create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
......@@ -77,8 +73,6 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
it 'does not perform Geo::FileDownloadWorker for synced attachments even with 0 bytes downloaded' do
upload = create(:upload)
create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
......@@ -311,6 +305,26 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" }
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform
end
it 'does not perform Geo::FileDownloadWorker when the backoff time is set' do
create(:lfs_object, :with_file)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
end
# Test the case where we have:
#
# 1. A total of 10 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
......
......@@ -147,6 +147,30 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do
worker.perform
end
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" }
before do
stub_uploads_object_storage(AvatarUploader)
end
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform
end
it 'does not perform Geo::FileRegistryRemovalWorker when the backoff time is set' do
create(:geo_file_registry, :avatar)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async)
subject.perform
end
end
end
# Disable transactions via :delete method because a foreign table
......
......@@ -10,7 +10,6 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
let!(:secondary) { create(:geo_node) }
let(:shard_name) { Gitlab.config.repositories.storages.keys.first }
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
before do
stub_current_geo_node(secondary)
......@@ -239,21 +238,25 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
end
end
it 'sets the back off time when there no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 18000).once
it 'sets the back off time when there are no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
subject.perform(shard_name)
end
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
subject.perform(shard_name)
end
end
end
......
......@@ -134,6 +134,27 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when the backoff time is set' do
repository_outdated = create(:project)
create(:repository_state, :repository_outdated, project: repository_outdated)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::RepositoryVerification::Primary::SingleWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
# test that jobs are always moving forward and we're not querying the same things
# over and over
describe 'resource loading' do
......
......@@ -171,5 +171,26 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Secondary::SingleWorker when the backoff time is set' do
create(:repository_state, :repository_verified, project: project)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::RepositoryVerification::Secondary::SingleWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment