Commit 1e1edff7 authored by Heinrich Lee Yu

Merge branch 'georgekoltsov/dont-throttle-bulk-import-entity-worker' into 'master'

Do not throttle enqueueing of BulkImports::EntityWorker

See merge request gitlab-org/gitlab!84208
parents e0394402 7a4c3dd0
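
In short, this change removes the batching throttle from BulkImportWorker: instead of starting at most DEFAULT_BATCH_SIZE entities per run and re-enqueueing itself once the limit is hit, the worker now enqueues work for every created entity in a single pass via find_each. A minimal runnable sketch of the throttling math being removed, using the constant and formula from the old worker code shown in the diff below:

    # Sketch of the old throttle (constant and formula taken from the removed code below).
    DEFAULT_BATCH_SIZE = 5

    # How many more entities the old worker would start in a single run.
    def next_batch_size(started_count)
      [DEFAULT_BATCH_SIZE - started_count, 0].max
    end

    puts next_batch_size(2) # => 3: only three more entities would be started this run
    puts next_batch_size(5) # => 0: the old worker would only re-enqueue itself after PERFORM_DELAY
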
@@ -3,15 +3,12 @@
 class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
   include ApplicationWorker

-  data_consistency :always
+  PERFORM_DELAY = 5.seconds

+  data_consistency :always
   feature_category :importers
-
   sidekiq_options retry: false, dead: false

-  PERFORM_DELAY = 5.seconds
-  DEFAULT_BATCH_SIZE = 5
-
   def perform(bulk_import_id)
     @bulk_import = BulkImport.find_by_id(bulk_import_id)

@@ -19,11 +16,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
     return if @bulk_import.finished? || @bulk_import.failed?
     return @bulk_import.fail_op! if all_entities_failed?
     return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
-    return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running

     @bulk_import.start! if @bulk_import.created?

-    created_entities.first(next_batch_size).each do |entity|
+    created_entities.find_each do |entity|
       entity.create_pipeline_trackers!

       BulkImports::ExportRequestWorker.perform_async(entity.id)
@@ -45,10 +41,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
     @entities ||= @bulk_import.entities
   end

-  def started_entities
-    entities.with_status(:started)
-  end
-
   def created_entities
     entities.with_status(:created)
   end
@@ -61,14 +53,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
     entities.all? { |entity| entity.failed? }
   end

-  def max_batch_size_exceeded?
-    started_entities.count >= DEFAULT_BATCH_SIZE
-  end
-
-  def next_batch_size
-    [DEFAULT_BATCH_SIZE - started_entities.count, 0].max
-  end
-
   # A new BulkImportWorker job is enqueued to either
   # - Process the new BulkImports::Entity created during import (e.g. for the subgroups)
   # - Or to mark the `bulk_import` as finished
......
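
The re_enqueue helper mentioned in the comment at the end of the worker diff above is not touched by this MR. Judging from the spec expectation receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id), it presumably looks roughly like the sketch below; this is an inference from the spec, not code shown in this diff:

    # Assumed shape of re_enqueue (inferred from the spec's perform_in expectation).
    def re_enqueue
      BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id)
    end
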
@@ -56,17 +56,6 @@ RSpec.describe BulkImportWorker do
       end
     end

-    context 'when maximum allowed number of import entities in progress' do
-      it 'reenqueues itself' do
-        bulk_import = create(:bulk_import, :started)
-        (described_class::DEFAULT_BATCH_SIZE + 1).times { |_| create(:bulk_import_entity, :started, bulk_import: bulk_import) }
-
-        expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id)
-
-        subject.perform(bulk_import.id)
-      end
-    end
-
     context 'when bulk import is created' do
       it 'marks bulk import as started' do
         bulk_import = create(:bulk_import, :created)
@@ -93,21 +82,17 @@ RSpec.describe BulkImportWorker do
     context 'when there are created entities to process' do
       let_it_be(:bulk_import) { create(:bulk_import, :created) }

-      before do
-        stub_const("#{described_class}::DEFAULT_BATCH_SIZE", 1)
-      end
-
-      it 'marks a batch of entities as started, enqueues EntityWorker, ExportRequestWorker and reenqueues' do
+      it 'marks all entities as started, enqueues EntityWorker, ExportRequestWorker and reenqueues' do
         create(:bulk_import_entity, :created, bulk_import: bulk_import)
         create(:bulk_import_entity, :created, bulk_import: bulk_import)

         expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id)
-        expect(BulkImports::EntityWorker).to receive(:perform_async)
-        expect(BulkImports::ExportRequestWorker).to receive(:perform_async)
+        expect(BulkImports::EntityWorker).to receive(:perform_async).twice
+        expect(BulkImports::ExportRequestWorker).to receive(:perform_async).twice

         subject.perform(bulk_import.id)

-        expect(bulk_import.entities.map(&:status_name)).to contain_exactly(:created, :started)
+        expect(bulk_import.entities.map(&:status_name)).to contain_exactly(:started, :started)
       end

       context 'when there are project entities to process' do
......