Commit c042a9f1 authored by Stan Hu's avatar Stan Hu

Move Geo download jobs into separate queues for easier tracking and scheduling

Also moves the geo_file_download_worker.rb -> geo/file_download_worker.rb
parent ccddf64f
......@@ -7,7 +7,7 @@ module Geo
end
def schedule_job(object_db_id, object_type)
job_id = GeoFileDownloadWorker.perform_async(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
......
module Geo
class FileDownloadWorker
include Sidekiq::Worker
sidekiq_options queue: :geo_file_download, retry: 3, dead: false
def perform(object_type, object_id)
Geo::FileDownloadService.new(object_type.to_sym, object_id).execute
end
end
end
......@@ -2,7 +2,7 @@ module Geo
class ProjectSyncWorker
include Sidekiq::Worker
sidekiq_options queue: :geo, retry: 3, dead: false
sidekiq_options queue: :geo_project_sync, retry: 3, dead: false
sidekiq_retry_in { |count| 30 * count }
......
class GeoFileDownloadWorker
include Sidekiq::Worker
include GeoQueue
def perform(object_type, object_id)
Geo::FileDownloadService.new(object_type.to_sym, object_id).execute
end
end
......@@ -75,7 +75,8 @@
- [repository_update_remote_mirror, 1]
- [project_update_repository_storage, 1]
- [admin_emails, 1]
- [geo_repository_update, 1]
- [geo_project_sync, 1]
- [geo_file_download, 1]
- [elastic_batch_project_indexer, 1]
- [elastic_indexer, 1]
- [elastic_commit_indexer, 1]
......
......@@ -28,7 +28,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
expect(GeoFileDownloadWorker).not_to receive(:perform_async)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
......@@ -43,7 +43,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
secondary.enabled = false
secondary.save
expect(GeoFileDownloadWorker).not_to receive(:perform_async)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
subject.perform
end
......@@ -58,8 +58,8 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
end
it 'filters S3-backed files' do
expect(GeoFileDownloadWorker).to receive(:perform_async).with(:lfs, lfs_object_local_store.id)
expect(GeoFileDownloadWorker).not_to receive(:perform_async).with(:lfs, lfs_object_remote_store.id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with(:lfs, lfs_object_local_store.id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with(:lfs, lfs_object_remote_store.id)
subject.perform
end
......@@ -81,7 +81,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
create_list(:upload, 2, :personal_snippet)
create(:appearance, logo: avatar, header_logo: avatar)
expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(10).times.and_call_original
# For 10 downloads, we expect four database reloads:
# 1. Load the first batch of 5.
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the next 5.
......@@ -103,14 +103,14 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)
expect(GeoFileDownloadWorker).not_to receive(:perform_async).with(:lfs, failed_registry.file_id)
expect(GeoFileDownloadWorker).to receive(:perform_async).with(:lfs, unsynced.id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with(:lfs, failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with(:lfs, unsynced.id)
subject.perform
end
it 'retries failed files' do
expect(GeoFileDownloadWorker).to receive(:perform_async).with('lfs', failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', failed_registry.file_id)
subject.perform
end
......@@ -118,7 +118,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
it 'does not retries failed files when retry_at is tomorrow' do
failed_registry = create(:geo_file_registry, :lfs, file_id: 999, success: false, retry_at: Date.tomorrow)
expect(GeoFileDownloadWorker).not_to receive(:perform_async).with('lfs', failed_registry.file_id)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async).with('lfs', failed_registry.file_id)
subject.perform
end
......@@ -126,7 +126,7 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
it 'does not retries failed files when retry_at is in the past' do
failed_registry = create(:geo_file_registry, :lfs, file_id: 999, success: false, retry_at: Date.yesterday)
expect(GeoFileDownloadWorker).to receive(:perform_async).with('lfs', failed_registry.file_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('lfs', failed_registry.file_id)
subject.perform
end
......@@ -143,27 +143,27 @@ describe Geo::FileDownloadDispatchWorker, :geo, :truncate do
secondary.update_attribute(:namespaces, [synced_group])
end
it 'does not perform GeoFileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
it 'does not perform Geo::FileDownloadWorker for LFS object that does not belong to selected namespaces to replicate' do
lfs_objec_in_synced_group = create(:lfs_objects_project, project: project_in_synced_group)
create(:lfs_objects_project, project: unsynced_project)
expect(GeoFileDownloadWorker).to receive(:perform_async)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with(:lfs, lfs_objec_in_synced_group.lfs_object_id).once.and_return(spy)
subject.perform
end
it 'does not perform GeoFileDownloadWorker for upload objects that do not belong to selected namespaces to replicate' do
it 'does not perform Geo::FileDownloadWorker for upload objects that do not belong to selected namespaces to replicate' do
avatar = fixture_file_upload(Rails.root.join('spec/fixtures/dk.png'))
avatar_in_synced_group = create(:upload, model: synced_group, path: avatar)
create(:upload, model: create(:group), path: avatar)
avatar_in_project_in_synced_group = create(:upload, model: project_in_synced_group, path: avatar)
create(:upload, model: unsynced_project, path: avatar)
expect(GeoFileDownloadWorker).to receive(:perform_async)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('avatar', avatar_in_project_in_synced_group.id).once.and_return(spy)
expect(GeoFileDownloadWorker).to receive(:perform_async)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('avatar', avatar_in_synced_group.id).once.and_return(spy)
subject.perform
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment