Commit dc3654f8 authored by Valery Sizov's avatar Valery Sizov

Geo: Add a backoff time to a few more workers

parent c145774a
# frozen_string_literal: true

# Concern that adds a cache-backed backoff delay to Geo-related workers.
#
# When a worker finds no pending work it calls +set_backoff_time!+, which
# drops a short-lived flag into the Rails cache. On subsequent runs the
# worker checks +should_be_skipped?+ and bails out early while the flag is
# still live, saving resources between scheduling cycles.
#
# Including classes may override +skip_cache_key+ to scope the flag more
# narrowly (e.g. per shard).
module GeoBackoffDelay
  extend ActiveSupport::Concern

  # How long a worker stays backed off after reporting no pending work.
  BACKOFF_TIME = 5.minutes

  # Records the backoff flag; it expires automatically after BACKOFF_TIME.
  def set_backoff_time!
    Rails.cache.write(skip_cache_key, true, expires_in: BACKOFF_TIME)
  end

  # Cache key under which the backoff flag is stored, namespaced by the
  # including worker's class name.
  def skip_cache_key
    [self.class.name.underscore, 'skip'].join(':')
  end

  # Truthy while the backoff flag is present in the cache.
  def should_be_skipped?
    Rails.cache.read(skip_cache_key)
  end
end
...@@ -4,32 +4,20 @@ module Geo ...@@ -4,32 +4,20 @@ module Geo
attr_accessor :shard_name attr_accessor :shard_name
BACKOFF_TIME = 5.minutes
def perform(shard_name) def perform(shard_name)
@shard_name = shard_name @shard_name = shard_name
return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name) return unless Gitlab::ShardHealthCache.healthy_shard?(shard_name)
return if should_be_skipped?
super() super()
end end
private private
def skip_shard_key def skip_cache_key
"#{self.class.name.underscore}:shard:#{shard_name}:skip" "#{self.class.name.underscore}:shard:#{shard_name}:skip"
end end
def should_be_skipped?
Rails.cache.read(skip_shard_key)
end
def set_backoff_time!
Rails.cache.write(skip_shard_key, true, expires_in: BACKOFF_TIME.minutes)
end
def worker_metadata def worker_metadata
{ shard: shard_name } { shard: shard_name }
end end
...@@ -70,15 +58,11 @@ module Geo ...@@ -70,15 +58,11 @@ module Geo
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size) resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size remaining_capacity = db_retrieve_batch_size - resources.size
pending_resources = if remaining_capacity.zero? if remaining_capacity.zero?
resources resources
else else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity) resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end end
set_backoff_time! if pending_resources.empty?
pending_resources
end end
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
......
...@@ -16,6 +16,10 @@ module Geo ...@@ -16,6 +16,10 @@ module Geo
private private
def skip_cache_key
"#{self.class.name.underscore}:shard:#{shard_name}:skip"
end
def worker_metadata def worker_metadata
{ shard: shard_name } { shard: shard_name }
end end
......
...@@ -3,7 +3,6 @@ module Geo ...@@ -3,7 +3,6 @@ module Geo
module Secondary module Secondary
class ShardWorker < Geo::Scheduler::Secondary::SchedulerWorker class ShardWorker < Geo::Scheduler::Secondary::SchedulerWorker
include CronjobQueue include CronjobQueue
attr_accessor :shard_name attr_accessor :shard_name
def perform(shard_name) def perform(shard_name)
...@@ -16,6 +15,10 @@ module Geo ...@@ -16,6 +15,10 @@ module Geo
private private
def skip_cache_key
"#{self.class.name.underscore}:shard:#{shard_name}:skip"
end
def worker_metadata def worker_metadata
{ shard: shard_name } { shard: shard_name }
end end
...@@ -26,8 +29,7 @@ module Geo ...@@ -26,8 +29,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def load_pending_resources def load_pending_resources
finder.find_registries_to_verify(batch_size: db_retrieve_batch_size) finder.find_registries_to_verify(batch_size: db_retrieve_batch_size).pluck(:id)
.pluck(:id)
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
......
...@@ -6,6 +6,7 @@ module Geo ...@@ -6,6 +6,7 @@ module Geo
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize include ::Gitlab::Utils::StrongMemoize
include GeoBackoffDelay
DB_RETRIEVE_BATCH_SIZE = 1000 DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 60.minutes LEASE_TIMEOUT = 60.minutes
...@@ -70,10 +71,6 @@ module Geo ...@@ -70,10 +71,6 @@ module Geo
private private
def should_be_skipped?
false
end
# Subclasses should override this method to provide additional metadata # Subclasses should override this method to provide additional metadata
# in log messages # in log messages
def worker_metadata def worker_metadata
...@@ -166,7 +163,10 @@ module Geo ...@@ -166,7 +163,10 @@ module Geo
end end
def update_pending_resources def update_pending_resources
@pending_resources = load_pending_resources if reload_queue? if reload_queue?
@pending_resources = load_pending_resources
set_backoff_time! if @pending_resources.empty?
end
end end
def schedule_jobs def schedule_jobs
......
---
title: 'Geo: Add a backoff time to a few Geo workers to save resources'
merge_request: 7470
author:
type: added
...@@ -47,17 +47,15 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -47,17 +47,15 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end end
context 'with attachments (Upload records)' do context 'with attachments (Upload records)' do
it 'performs Geo::FileDownloadWorker for unsynced attachments' do let(:upload) { create(:upload) }
upload = create(:upload)
it 'performs Geo::FileDownloadWorker for unsynced attachments' do
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', upload.id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('avatar', upload.id)
subject.perform subject.perform
end end
it 'performs Geo::FileDownloadWorker for failed-sync attachments' do it 'performs Geo::FileDownloadWorker for failed-sync attachments' do
upload = create(:upload)
create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 0, success: false) create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 0, success: false)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async)
...@@ -67,8 +65,6 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -67,8 +65,6 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end end
it 'does not perform Geo::FileDownloadWorker for synced attachments' do it 'does not perform Geo::FileDownloadWorker for synced attachments' do
upload = create(:upload)
create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 1234, success: true) create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
...@@ -77,8 +73,6 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -77,8 +73,6 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end end
it 'does not perform Geo::FileDownloadWorker for synced attachments even with 0 bytes downloaded' do it 'does not perform Geo::FileDownloadWorker for synced attachments even with 0 bytes downloaded' do
upload = create(:upload)
create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 0, success: true) create(:geo_file_registry, :avatar, file_id: upload.id, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
...@@ -311,6 +305,26 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -311,6 +305,26 @@ describe Geo::FileDownloadDispatchWorker, :geo do
end end
end end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" }
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform
end
it 'does not perform Geo::FileDownloadWorker when the backoff time is set' do
create(:lfs_object, :with_file)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
end
# Test the case where we have: # Test the case where we have:
# #
# 1. A total of 10 files in the queue, and we can load a maximimum of 5 and send 2 at a time. # 1. A total of 10 files in the queue, and we can load a maximimum of 5 and send 2 at a time.
......
...@@ -147,6 +147,30 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do ...@@ -147,6 +147,30 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do
worker.perform worker.perform
end end
end end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:skip" }
before do
stub_uploads_object_storage(AvatarUploader)
end
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform
end
it 'does not perform Geo::FileRegistryRemovalWorker when the backoff time is set' do
create(:geo_file_registry, :avatar)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async)
subject.perform
end
end
end end
# Disable transactions via :delete method because a foreign table # Disable transactions via :delete method because a foreign table
......
...@@ -10,7 +10,6 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -10,7 +10,6 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
let!(:secondary) { create(:geo_node) } let!(:secondary) { create(:geo_node) }
let(:shard_name) { Gitlab.config.repositories.storages.keys.first } let(:shard_name) { Gitlab.config.repositories.storages.keys.first }
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
...@@ -239,21 +238,25 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach ...@@ -239,21 +238,25 @@ describe Geo::RepositoryShardSyncWorker, :geo, :delete, :clean_gitlab_redis_cach
end end
end end
it 'sets the back off time when there no pending items' do context 'backoff time' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group) let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 18000).once it 'sets the back off time when there are no pending items' do
create(:geo_project_registry, :synced, project: unsynced_project_in_restricted_group)
create(:geo_project_registry, :synced, project: unsynced_project)
subject.perform(shard_name) expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
end
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do it 'does not perform Geo::ProjectSyncWorker when the backoff time is set' do
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true) expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name) subject.perform(shard_name)
end
end end
end end
......
...@@ -126,6 +126,27 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_ ...@@ -126,6 +126,27 @@ describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_
Sidekiq::Testing.inline! { subject.perform(shard_name) } Sidekiq::Testing.inline! { subject.perform(shard_name) }
end end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when the backoff time is set' do
repository_outdated = create(:project)
create(:repository_state, :repository_outdated, project: repository_outdated)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::RepositoryVerification::Primary::SingleWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
# test that jobs are always moving forward and we're not querying the same things # test that jobs are always moving forward and we're not querying the same things
# over and over # over and over
describe 'resource loading' do describe 'resource loading' do
......
...@@ -163,5 +163,26 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea ...@@ -163,5 +163,26 @@ describe Geo::RepositoryVerification::Secondary::ShardWorker, :postgresql, :clea
Sidekiq::Testing.inline! { subject.perform(shard_name) } Sidekiq::Testing.inline! { subject.perform(shard_name) }
end end
context 'backoff time' do
let(:cache_key) { "#{described_class.name.underscore}:shard:#{shard_name}:skip" }
it 'sets the back off time when there are no pending items' do
expect(Rails.cache).to receive(:write).with(cache_key, true, expires_in: 300.seconds).once
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Secondary::SingleWorker when the backoff time is set' do
create(:repository_state, :repository_verified, project: project)
create(:geo_project_registry, :synced, :repository_verification_outdated, project: project)
expect(Rails.cache).to receive(:read).with(cache_key).and_return(true)
expect(Geo::RepositoryVerification::Secondary::SingleWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment