Commit fa597211 authored by Bob Van Landuyt's avatar Bob Van Landuyt

Merge branch 'georgekoltsov/mark-bulk-import-as-failed-if-all-failed' into 'master'

Update Bulk Import state more accurately when import fails

See merge request gitlab-org/gitlab!63883
parents f6140400 cea74ea4
...@@ -15,7 +15,8 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -15,7 +15,8 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
@bulk_import = BulkImport.find_by_id(bulk_import_id) @bulk_import = BulkImport.find_by_id(bulk_import_id)
return unless @bulk_import return unless @bulk_import
return if @bulk_import.finished? return if @bulk_import.finished? || @bulk_import.failed?
return @bulk_import.fail_op! if all_entities_failed?
return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
...@@ -55,6 +56,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -55,6 +56,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
entities.all? { |entity| entity.finished? || entity.failed? } entities.all? { |entity| entity.finished? || entity.failed? }
end end
def all_entities_failed?
entities.all? { |entity| entity.failed? }
end
def max_batch_size_exceeded? def max_batch_size_exceeded?
started_entities.count >= DEFAULT_BATCH_SIZE started_entities.count >= DEFAULT_BATCH_SIZE
end end
......
...@@ -10,29 +10,39 @@ module BulkImports ...@@ -10,29 +10,39 @@ module BulkImports
def initialize(context) def initialize(context)
@context = context @context = context
@entity = @context.entity
@trackers = @entity.trackers
end end
def run def run
return if context.entity.finished? return if entity.finished? || entity.failed?
context.entity.finish! if all_other_trackers_failed?
entity.fail_op!
else
entity.finish!
end
logger.info( logger.info(
bulk_import_id: context.bulk_import.id, bulk_import_id: context.bulk_import.id,
bulk_import_entity_id: context.entity.id, bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type, bulk_import_entity_type: context.entity.source_type,
pipeline_class: self.class.name, pipeline_class: self.class.name,
message: 'Entity finished' message: "Entity #{entity.status_name}"
) )
end end
private private
attr_reader :context attr_reader :context, :entity, :trackers
def logger def logger
@logger ||= Gitlab::Import::Logger.build @logger ||= Gitlab::Import::Logger.build
end end
def all_other_trackers_failed?
trackers.where.not(relation: self.class.name).all? { |tracker| tracker.failed? } # rubocop: disable CodeReuse/ActiveRecord
end
end end
end end
end end
......
...@@ -19,5 +19,11 @@ FactoryBot.define do ...@@ -19,5 +19,11 @@ FactoryBot.define do
sequence(:jid) { |n| "bulk_import_entity_#{n}" } sequence(:jid) { |n| "bulk_import_entity_#{n}" }
end end
trait :failed do
status { -1 }
sequence(:jid) { |n| "bulk_import_entity_#{n}" }
end
end end
end end
...@@ -25,8 +25,10 @@ RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do ...@@ -25,8 +25,10 @@ RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do
.to change(entity, :status_name).to(:finished) .to change(entity, :status_name).to(:finished)
end end
it 'does nothing when the entity is already finished' do context 'when entity is in a final finished or failed state' do
entity = create(:bulk_import_entity, :finished) shared_examples 'performs no state update' do |entity_state|
it 'does nothing' do
entity = create(:bulk_import_entity, entity_state)
pipeline_tracker = create(:bulk_import_tracker, entity: entity) pipeline_tracker = create(:bulk_import_tracker, entity: entity)
context = BulkImports::Pipeline::Context.new(pipeline_tracker) context = BulkImports::Pipeline::Context.new(pipeline_tracker)
subject = described_class.new(context) subject = described_class.new(context)
...@@ -34,4 +36,22 @@ RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do ...@@ -34,4 +36,22 @@ RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do
expect { subject.run } expect { subject.run }
.not_to change(entity, :status_name) .not_to change(entity, :status_name)
end end
end
include_examples 'performs no state update', :finished
include_examples 'performs no state update', :failed
end
context 'when all entity trackers failed' do
it 'marks entity as failed' do
entity = create(:bulk_import_entity, :started)
create(:bulk_import_tracker, :failed, entity: entity)
pipeline_tracker = create(:bulk_import_tracker, entity: entity, relation: described_class)
context = BulkImports::Pipeline::Context.new(pipeline_tracker)
described_class.new(context).run
expect(entity.reload.failed?).to eq(true)
end
end
end end
...@@ -22,6 +22,16 @@ RSpec.describe BulkImportWorker do ...@@ -22,6 +22,16 @@ RSpec.describe BulkImportWorker do
end end
end end
context 'when bulk import is failed' do
it 'does nothing' do
bulk_import = create(:bulk_import, :failed)
expect(described_class).not_to receive(:perform_in)
subject.perform(bulk_import.id)
end
end
context 'when all entities are processed' do context 'when all entities are processed' do
it 'marks bulk import as finished' do it 'marks bulk import as finished' do
bulk_import = create(:bulk_import, :started) bulk_import = create(:bulk_import, :started)
...@@ -34,6 +44,18 @@ RSpec.describe BulkImportWorker do ...@@ -34,6 +44,18 @@ RSpec.describe BulkImportWorker do
end end
end end
context 'when all entities are failed' do
it 'marks bulk import as failed' do
bulk_import = create(:bulk_import, :started)
create(:bulk_import_entity, :failed, bulk_import: bulk_import)
create(:bulk_import_entity, :failed, bulk_import: bulk_import)
subject.perform(bulk_import.id)
expect(bulk_import.reload.failed?).to eq(true)
end
end
context 'when maximum allowed number of import entities in progress' do context 'when maximum allowed number of import entities in progress' do
it 'reenqueues itself' do it 'reenqueues itself' do
bulk_import = create(:bulk_import, :started) bulk_import = create(:bulk_import, :started)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment