Commit e64aeb1b authored by Kassio Borges's avatar Kassio Borges

BulkImports: Add `BulkImports::PipelineWorker`

The Pipelines are grouped in Stages, like the CI jobs. Stages run in
sequence, one after the other, while the pipelines within a stage run in
parallel.

Each pipeline runs on an individual `BulkImports::PipelineWorker` job,
which enables:

- smaller/shorter jobs: Background jobs can be interrupted during
  deploys or other unexpected infrastructure/ops events. To make jobs
  more resilient it's desirable to have smaller jobs wherever possible.
- faster imports: Some pipelines can run in parallel, which reduces the
  total time of an import.
- (follow-up) network/rate limits handling: When a pipeline gets rate
  limited, it can be scheduled to retry after the rate limit timeout.
  https://gitlab.com/gitlab-org/gitlab/-/issues/262024
parent fb551ba2
# frozen_string_literal: true

module BulkImports
  # Describes the pipelines that make up a bulk import, grouped by stage
  # number. Pipelines sharing a stage number may run in parallel; stages
  # themselves run sequentially, in ascending order.
  class Stage
    include Singleton

    CONFIG = {
      group: {
        pipeline: BulkImports::Groups::Pipelines::GroupPipeline,
        stage: 0
      },
      subgroups: {
        pipeline: BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline,
        stage: 1
      },
      members: {
        pipeline: BulkImports::Groups::Pipelines::MembersPipeline,
        stage: 1
      },
      labels: {
        pipeline: BulkImports::Groups::Pipelines::LabelsPipeline,
        stage: 1
      },
      milestones: {
        pipeline: BulkImports::Groups::Pipelines::MilestonesPipeline,
        stage: 1
      },
      badges: {
        pipeline: BulkImports::Groups::Pipelines::BadgesPipeline,
        stage: 1
      },
      finisher: {
        pipeline: BulkImports::Groups::Pipelines::EntityFinisher,
        stage: 2
      }
    }.freeze

    class << self
      # Ordered [stage, pipeline_class] pairs, delegated to the singleton.
      def pipelines
        instance.pipelines
      end

      # True when +name+ (a class or string) matches one of the configured
      # pipeline classes.
      def pipeline_exists?(name)
        target = name.to_s

        pipelines.any? { |_stage, pipeline| pipeline.to_s == target }
      end
    end

    # Memoized list of [stage, pipeline_class] pairs, sorted by stage.
    def pipelines
      @pipelines ||= config.values
        .sort_by { |entry| entry[:stage] }
        .map { |entry| [entry[:stage], entry[:pipeline]] }
    end

    private

    # Extracted as a method so EE can extend the configuration via `super`.
    def config
      @config ||= CONFIG
    end
  end
end

::BulkImports::Stage.prepend_if_ee('::EE::BulkImports::Stage')
......@@ -20,6 +20,27 @@ class BulkImports::Tracker < ApplicationRecord
DEFAULT_PAGE_SIZE = 500
scope :next_pipeline_trackers_for, -> (entity_id) {
entity_scope = where(bulk_import_entity_id: entity_id)
next_stage_scope = entity_scope.with_status(:created).select('MIN(stage)')
entity_scope.where(stage: next_stage_scope)
}
def self.stage_running?(entity_id, stage)
where(stage: stage, bulk_import_entity_id: entity_id)
.with_status(:created, :started)
.exists?
end
def pipeline_class
unless BulkImports::Stage.pipeline_exists?(pipeline_name)
raise NameError.new("'#{pipeline_name}' is not a valid BulkImport Pipeline")
end
pipeline_name.constantize
end
state_machine :status, initial: :created do
state :created, value: 0
state :started, value: 1
......@@ -32,10 +53,6 @@ class BulkImports::Tracker < ApplicationRecord
end
event :finish do
# When applying the concurrent model,
# remove the created => finished transaction
# https://gitlab.com/gitlab-org/gitlab/-/issues/323384
transition created: :finished
transition started: :finished
transition failed: :failed
transition skipped: :skipped
......
......@@ -1539,6 +1539,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: bulk_imports_pipeline
:feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
:tags: []
- :name: chat_notification
:feature_category: :chatops
:has_external_dependencies: true
......
......@@ -21,9 +21,11 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
@bulk_import.start! if @bulk_import.created?
created_entities.first(next_batch_size).each do |entity|
entity.start!
create_pipeline_tracker_for(entity)
BulkImports::EntityWorker.perform_async(entity.id)
entity.start!
end
re_enqueue
......@@ -65,4 +67,13 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
def re_enqueue
BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id)
end
def create_pipeline_tracker_for(entity)
BulkImports::Stage.pipelines.each do |stage, pipeline|
entity.trackers.create!(
stage: stage,
pipeline_name: pipeline
)
end
end
end
......@@ -10,24 +10,47 @@ module BulkImports
worker_has_external_dependencies!
def perform(entity_id)
entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id)
def perform(entity_id, current_stage = nil)
return if stage_running?(entity_id, current_stage)
logger.info(
worker: self.class.name,
entity_id: entity_id,
current_stage: current_stage
)
next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
BulkImports::PipelineWorker.perform_async(
pipeline_tracker.id,
pipeline_tracker.stage,
entity_id
)
end
rescue => e
logger.error(
worker: self.class.name,
entity_id: entity_id,
current_stage: current_stage,
error_message: e.message
)
Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
end
if entity
entity.update!(jid: jid)
private
BulkImports::Importers::GroupImporter.new(entity).execute
end
def stage_running?(entity_id, stage)
return unless stage
rescue => e
extra = {
bulk_import_id: entity&.bulk_import&.id,
entity_id: entity&.id
}
BulkImports::Tracker.stage_running?(entity_id, stage)
end
Gitlab::ErrorTracking.track_exception(e, extra)
def next_pipeline_trackers_for(entity_id)
BulkImports::Tracker.next_pipeline_trackers_for(entity_id)
end
entity&.fail_op
def logger
@logger ||= Gitlab::Import::Logger.build
end
end
end
# frozen_string_literal: true

module BulkImports
  # Runs a single import pipeline (tracked by a BulkImports::Tracker row)
  # in its own Sidekiq job. Whatever the outcome, the EntityWorker is
  # re-enqueued so the next stage can be scheduled once this one drains.
  class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
    include ApplicationWorker

    feature_category :importers

    sidekiq_options retry: false, dead: false

    worker_has_external_dependencies!

    # @param pipeline_tracker_id [Integer] id of the tracker row to run
    # @param stage [Integer] stage number, passed back to the EntityWorker
    # @param entity_id [Integer] id of the entity being imported
    def perform(pipeline_tracker_id, stage, entity_id)
      tracker = ::BulkImports::Tracker
        .with_status(:created)
        .find_by_id(pipeline_tracker_id)

      if tracker.nil?
        # Only trackers still in the :created state are runnable; anything
        # else (missing or already picked up) is logged and skipped.
        logger.error(
          worker: self.class.name,
          entity_id: entity_id,
          pipeline_tracker_id: pipeline_tracker_id,
          message: 'Unstarted pipeline not found'
        )
      else
        logger.info(
          worker: self.class.name,
          entity_id: tracker.entity.id,
          pipeline_name: tracker.pipeline_name
        )

        run(tracker)
      end
    ensure
      # Always give the EntityWorker a chance to enqueue the next stage.
      ::BulkImports::EntityWorker.perform_async(entity_id, stage)
    end

    private

    # Marks the tracker as started, runs its pipeline class, then finishes
    # it. Any raised error fails the tracker and is logged and tracked.
    def run(tracker)
      tracker.update!(status_event: 'start', jid: jid)

      context = ::BulkImports::Pipeline::Context.new(tracker)

      tracker.pipeline_class.new(context).run

      tracker.finish!
    rescue => e
      tracker.fail_op!

      logger.error(
        worker: self.class.name,
        entity_id: tracker.entity.id,
        pipeline_name: tracker.pipeline_name,
        message: e.message
      )

      Gitlab::ErrorTracking.track_exception(
        e,
        entity_id: tracker.entity.id,
        pipeline_name: tracker.pipeline_name
      )
    end

    def logger
      @logger ||= Gitlab::Import::Logger.build
    end
  end
end
---
title: 'BulkImports: Add `BulkImports::PipelineWorker` to process each BulkImport pipeline on its own background job'
merge_request: 57153
author:
type: added
......@@ -56,6 +56,8 @@
- 1
- - bulk_imports_entity
- 1
- - bulk_imports_pipeline
- 1
- - chaos
- 2
- - chat_notification
......
# frozen_string_literal: true

module EE
  module BulkImports
    # EE extension of BulkImports::Stage: adds the EE-only group pipelines
    # and moves the finisher so it runs after every EE stage.
    module Stage
      extend ::Gitlab::Utils::Override

      EE_CONFIG = {
        iterations: {
          pipeline: EE::BulkImports::Groups::Pipelines::IterationsPipeline,
          stage: 1
        },
        epics: {
          pipeline: EE::BulkImports::Groups::Pipelines::EpicsPipeline,
          stage: 2
        },
        epic_award_emojis: {
          pipeline: EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline,
          stage: 3
        },
        epic_events: {
          pipeline: EE::BulkImports::Groups::Pipelines::EpicEventsPipeline,
          stage: 3
        },
        # Pushes the CE EntityFinisher pipeline to the last EE stage.
        finisher: {
          stage: 4
        }
      }.freeze

      override :config
      def config
        @config ||= begin
          ce_config = super

          ce_config.deep_merge(EE_CONFIG)
        end
      end
    end
  end
end
# frozen_string_literal: true

module EE
  module BulkImports
    module Importers
      # EE extension of BulkImports::Importers::GroupImporter that appends
      # the EE-only group pipelines to the CE pipeline list.
      module GroupImporter
        extend ::Gitlab::Utils::Override

        private

        override :pipelines
        def pipelines
          ee_pipelines = [
            EE::BulkImports::Groups::Pipelines::EpicsPipeline,
            EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline,
            EE::BulkImports::Groups::Pipelines::EpicEventsPipeline,
            EE::BulkImports::Groups::Pipelines::IterationsPipeline
          ]

          super + ee_pipelines
        end
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::Importers::GroupImporter do
  let_it_be(:user) { create(:user) }
  let_it_be(:group) { create(:group) }
  let_it_be(:bulk_import) { create(:bulk_import, user: user) }
  let_it_be(:entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import, group: group) }
  let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity) }
  let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }

  subject { described_class.new(entity) }

  before do
    # Force the importer to use this spec's context so the pipeline
    # expectations below can match on it.
    allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
  end

  describe '#execute' do
    it "starts the entity and run its pipelines" do
      # CE pipelines followed by the EE-only pipelines.
      expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::MembersPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::LabelsPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context
      expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicsPipeline, context: context
      expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline, context: context
      expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::EpicEventsPipeline, context: context
      expect_to_run_pipeline EE::BulkImports::Groups::Pipelines::IterationsPipeline, context: context

      subject.execute

      expect(entity.reload).to be_finished
    end
  end

  # Expects +klass+ to be instantiated with +context+ and to receive #run.
  def expect_to_run_pipeline(klass, context:)
    expect_next_instance_of(klass, context) do |pipeline|
      expect(pipeline).to receive(:run)
    end
  end
end
# frozen_string_literal: true

require 'fast_spec_helper'

RSpec.describe BulkImports::Stage do
  # Expected [stage, pipeline] pairs, including the EE-only pipelines and
  # the finisher bumped to the last stage.
  let(:pipelines) do
    [
      [0, BulkImports::Groups::Pipelines::GroupPipeline],
      [1, BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline],
      [1, BulkImports::Groups::Pipelines::MembersPipeline],
      [1, BulkImports::Groups::Pipelines::LabelsPipeline],
      [1, BulkImports::Groups::Pipelines::MilestonesPipeline],
      [1, BulkImports::Groups::Pipelines::BadgesPipeline],
      [1, EE::BulkImports::Groups::Pipelines::IterationsPipeline],
      [2, EE::BulkImports::Groups::Pipelines::EpicsPipeline],
      [3, EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline],
      [3, EE::BulkImports::Groups::Pipelines::EpicEventsPipeline],
      [4, BulkImports::Groups::Pipelines::EntityFinisher]
    ]
  end

  # `.pipelines` is the method under test: the class has no `#each`, the
  # previous `describe '#each'` label was a misnomer.
  describe '.pipelines' do
    it 'lists all the pipelines with their stage number' do
      expect(described_class.pipelines).to match_array(pipelines)
    end
  end
end
# frozen_string_literal: true

module BulkImports
  module Groups
    module Pipelines
      # Terminal "pipeline" of a bulk import entity: instead of importing
      # data, it transitions the entity to the finished state and logs the
      # transition. A no-op when the entity is already finished.
      class EntityFinisher
        def initialize(context)
          @context = context
        end

        def run
          entity = @context.entity

          return if entity.finished?

          entity.finish!

          logger.info(
            bulk_import_id: @context.bulk_import.id,
            bulk_import_entity_id: entity.id,
            bulk_import_entity_type: entity.source_type,
            pipeline_class: self.class.name,
            message: 'Entity finished'
          )
        end

        private

        def logger
          @logger ||= Gitlab::Import::Logger.build
        end
      end
    end
  end
end
# frozen_string_literal: true

module BulkImports
  module Importers
    # Runs every group pipeline for a single entity, in sequence, creating
    # one tracker row per pipeline, then marks the entity as finished.
    class GroupImporter
      def initialize(entity)
        @entity = entity
      end

      def execute
        pipelines.each_with_index do |pipeline, stage|
          tracker = entity.trackers.create!(
            pipeline_name: pipeline,
            stage: stage
          )

          pipeline.new(BulkImports::Pipeline::Context.new(tracker)).run

          tracker.finish!
        end

        entity.finish!
      end

      private

      attr_reader :entity

      # CE pipelines in execution order; EE appends more via the module
      # prepended below.
      def pipelines
        [
          BulkImports::Groups::Pipelines::GroupPipeline,
          BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline,
          BulkImports::Groups::Pipelines::MembersPipeline,
          BulkImports::Groups::Pipelines::LabelsPipeline,
          BulkImports::Groups::Pipelines::MilestonesPipeline,
          BulkImports::Groups::Pipelines::BadgesPipeline
        ]
      end
    end
  end
end

BulkImports::Importers::GroupImporter.prepend_if_ee('EE::BulkImports::Importers::GroupImporter')
......@@ -5,7 +5,19 @@ FactoryBot.define do
association :entity, factory: :bulk_import_entity
stage { 0 }
relation { :relation }
has_next_page { false }
sequence(:pipeline_name) { |n| "pipeline_name_#{n}" }
trait :started do
status { 1 }
sequence(:jid) { |n| "bulk_import_entity_#{n}" }
end
trait :finished do
status { 2 }
sequence(:jid) { |n| "bulk_import_entity_#{n}" }
end
end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::Groups::Pipelines::EntityFinisher do
  it 'updates the entity status to finished' do
    entity = create(:bulk_import_entity, :started)
    pipeline_tracker = create(:bulk_import_tracker, entity: entity)
    context = BulkImports::Pipeline::Context.new(pipeline_tracker)
    subject = described_class.new(context)

    # The finisher logs a structured payload when it transitions the entity.
    expect_next_instance_of(Gitlab::Import::Logger) do |logger|
      expect(logger)
        .to receive(:info)
        .with(
          bulk_import_id: entity.bulk_import.id,
          bulk_import_entity_id: entity.id,
          bulk_import_entity_type: entity.source_type,
          pipeline_class: described_class.name,
          message: 'Entity finished'
        )
    end

    expect { subject.run }
      .to change(entity, :status_name).to(:finished)
  end

  it 'does nothing when the entity is already finished' do
    entity = create(:bulk_import_entity, :finished)
    pipeline_tracker = create(:bulk_import_tracker, entity: entity)
    context = BulkImports::Pipeline::Context.new(pipeline_tracker)
    subject = described_class.new(context)

    # #run returns early for finished entities: no logging, no transition.
    expect { subject.run }
      .not_to change(entity, :status_name)
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::Importers::GroupImporter do
  let_it_be(:user) { create(:user) }
  let_it_be(:group) { create(:group) }
  let_it_be(:bulk_import) { create(:bulk_import) }
  let_it_be(:entity) { create(:bulk_import_entity, :started, group: group) }
  let_it_be(:tracker) { create(:bulk_import_tracker, entity: entity, pipeline_name: described_class.name) }
  let_it_be(:context) { BulkImports::Pipeline::Context.new(tracker) }

  before do
    # Force the importer to use this spec's context so the pipeline
    # expectations below can match on it.
    allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
  end

  subject { described_class.new(entity) }

  describe '#execute' do
    it 'starts the entity and run its pipelines' do
      expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::MembersPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::LabelsPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::MilestonesPipeline, context: context
      expect_to_run_pipeline BulkImports::Groups::Pipelines::BadgesPipeline, context: context

      if Gitlab.ee?
        # EE pipeline classes are resolved lazily (via constantize) so this
        # spec file also loads under CE, where the constants do not exist.
        expect_to_run_pipeline('EE::BulkImports::Groups::Pipelines::EpicsPipeline'.constantize, context: context)
        expect_to_run_pipeline('EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline'.constantize, context: context)
        expect_to_run_pipeline('EE::BulkImports::Groups::Pipelines::EpicEventsPipeline'.constantize, context: context)
        expect_to_run_pipeline('EE::BulkImports::Groups::Pipelines::IterationsPipeline'.constantize, context: context)
      end

      subject.execute

      expect(entity).to be_finished
    end

    context 'when failed' do
      let(:entity) { create(:bulk_import_entity, :failed, bulk_import: bulk_import, group: group) }

      it 'does not transition entity to finished state' do
        allow(entity).to receive(:start!)

        subject.execute

        expect(entity.reload).to be_failed
      end
    end
  end

  # Expects +klass+ to be instantiated with +context+ and to receive #run.
  def expect_to_run_pipeline(klass, context:)
    expect_next_instance_of(klass, context) do |pipeline|
      expect(pipeline).to receive(:run)
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::Stage do
  # Expected [stage, pipeline] pairs. EE adds extra pipelines and moves
  # the finisher to a later stage, so the fixture differs per edition.
  # EE constants are resolved lazily (via constantize) so this file also
  # loads under CE.
  let(:pipelines) do
    if Gitlab.ee?
      [
        [0, BulkImports::Groups::Pipelines::GroupPipeline],
        [1, BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline],
        [1, BulkImports::Groups::Pipelines::MembersPipeline],
        [1, BulkImports::Groups::Pipelines::LabelsPipeline],
        [1, BulkImports::Groups::Pipelines::MilestonesPipeline],
        [1, BulkImports::Groups::Pipelines::BadgesPipeline],
        [1, 'EE::BulkImports::Groups::Pipelines::IterationsPipeline'.constantize],
        [2, 'EE::BulkImports::Groups::Pipelines::EpicsPipeline'.constantize],
        [3, 'EE::BulkImports::Groups::Pipelines::EpicAwardEmojiPipeline'.constantize],
        [3, 'EE::BulkImports::Groups::Pipelines::EpicEventsPipeline'.constantize],
        [4, BulkImports::Groups::Pipelines::EntityFinisher]
      ]
    else
      [
        [0, BulkImports::Groups::Pipelines::GroupPipeline],
        [1, BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline],
        [1, BulkImports::Groups::Pipelines::MembersPipeline],
        [1, BulkImports::Groups::Pipelines::LabelsPipeline],
        [1, BulkImports::Groups::Pipelines::MilestonesPipeline],
        [1, BulkImports::Groups::Pipelines::BadgesPipeline],
        [2, BulkImports::Groups::Pipelines::EntityFinisher]
      ]
    end
  end

  describe '.pipelines' do
    it 'lists all the pipelines with their stage number, ordered by stage' do
      expect(described_class.pipelines).to match_array(pipelines)
    end
  end

  describe '.pipeline_exists?' do
    it 'returns true when the given pipeline name exists in the pipelines list' do
      expect(described_class.pipeline_exists?(BulkImports::Groups::Pipelines::GroupPipeline)).to eq(true)
      expect(described_class.pipeline_exists?('BulkImports::Groups::Pipelines::GroupPipeline')).to eq(true)
    end

    # Fixed copy-pasted description: this example covers the *negative* case.
    it 'returns false when the given pipeline name does not exist in the pipelines list' do
      expect(described_class.pipeline_exists?('BulkImports::Groups::Pipelines::InexistentPipeline')).to eq(false)
    end
  end
end
......@@ -26,4 +26,60 @@ RSpec.describe BulkImports::Tracker, type: :model do
end
end
end
describe '.stage_running?' do
it 'returns true if there is any unfinished pipeline in the given stage' do
tracker = create(:bulk_import_tracker)
expect(described_class.stage_running?(tracker.entity.id, 0))
.to eq(true)
end
it 'returns false if there are no unfinished pipeline in the given stage' do
tracker = create(:bulk_import_tracker, :finished)
expect(described_class.stage_running?(tracker.entity.id, 0))
.to eq(false)
end
end
describe '.next_pipeline_trackers_for' do
let_it_be(:entity) { create(:bulk_import_entity) }
let_it_be(:stage_0_tracker) { create(:bulk_import_tracker, :finished, entity: entity) }
it 'returns empty when all the stages pipelines are finished' do
expect(described_class.next_pipeline_trackers_for(entity.id))
.to eq([])
end
it 'returns the not started pipeline trackers from the minimum stage number' do
stage_1_tracker = create(:bulk_import_tracker, entity: entity, stage: 1)
stage_2_tracker = create(:bulk_import_tracker, entity: entity, stage: 2)
expect(described_class.next_pipeline_trackers_for(entity.id))
.to include(stage_1_tracker)
expect(described_class.next_pipeline_trackers_for(entity.id))
.not_to include(stage_2_tracker)
end
end
describe '#pipeline_class' do
it 'returns the pipeline class' do
pipeline_class = BulkImports::Stage.pipelines.first[1]
tracker = create(:bulk_import_tracker, pipeline_name: pipeline_class)
expect(tracker.pipeline_class).to eq(pipeline_class)
end
it 'raises an error when the pipeline is not valid' do
tracker = create(:bulk_import_tracker, pipeline_name: 'InexistingPipeline')
expect { tracker.pipeline_class }
.to raise_error(
NameError,
"'InexistingPipeline' is not a valid BulkImport Pipeline"
)
end
end
end
......@@ -4,10 +4,6 @@ require 'spec_helper'
RSpec.describe BulkImportWorker do
describe '#perform' do
before do
stub_const("#{described_class}::DEFAULT_BATCH_SIZE", 1)
end
context 'when no bulk import is found' do
it 'does nothing' do
expect(described_class).not_to receive(:perform_in)
......@@ -59,10 +55,26 @@ RSpec.describe BulkImportWorker do
expect(bulk_import.reload.started?).to eq(true)
end
it 'creates all the required pipeline trackers' do
bulk_import = create(:bulk_import, :created)
entity_1 = create(:bulk_import_entity, :created, bulk_import: bulk_import)
entity_2 = create(:bulk_import_entity, :created, bulk_import: bulk_import)
expect { subject.perform(bulk_import.id) }
.to change(BulkImports::Tracker, :count)
.by(BulkImports::Stage.pipelines.size * 2)
expect(entity_1.trackers).not_to be_empty
expect(entity_2.trackers).not_to be_empty
end
context 'when there are created entities to process' do
it 'marks a batch of entities as started, enqueues BulkImports::EntityWorker and reenqueues' do
stub_const("#{described_class}::DEFAULT_BATCH_SIZE", 1)
bulk_import = create(:bulk_import, :created)
(described_class::DEFAULT_BATCH_SIZE + 1).times { |_| create(:bulk_import_entity, :created, bulk_import: bulk_import) }
create(:bulk_import_entity, :created, bulk_import: bulk_import)
create(:bulk_import_entity, :created, bulk_import: bulk_import)
expect(described_class).to receive(:perform_in).with(described_class::PERFORM_DELAY, bulk_import.id)
expect(BulkImports::EntityWorker).to receive(:perform_async)
......
......@@ -3,51 +3,107 @@
require 'spec_helper'
RSpec.describe BulkImports::EntityWorker do
describe '#execute' do
let(:bulk_import) { create(:bulk_import) }
context 'when started entity exists' do
let(:entity) { create(:bulk_import_entity, :started, bulk_import: bulk_import) }
it 'executes BulkImports::Importers::GroupImporter' do
expect(BulkImports::Importers::GroupImporter).to receive(:new).with(entity).and_call_original
let_it_be(:entity) { create(:bulk_import_entity) }
let_it_be(:pipeline_tracker) do
create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'Stage0::Pipeline',
stage: 0
)
end
subject.perform(entity.id)
end
it 'enqueues the first stage pipelines work' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
end
it 'sets jid' do
jid = 'jid'
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
pipeline_tracker.id,
pipeline_tracker.stage,
entity.id
)
allow(subject).to receive(:jid).and_return(jid)
subject.perform(entity.id)
end
subject.perform(entity.id)
it 'do not enqueue a new pipeline job if the current stage still running' do
expect(BulkImports::PipelineWorker)
.not_to receive(:perform_async)
expect(entity.reload.jid).to eq(jid)
end
subject.perform(entity.id, 0)
end
context 'when exception occurs' do
it 'tracks the exception & marks entity as failed' do
allow(BulkImports::Importers::GroupImporter).to receive(:new) { raise StandardError }
it 'enqueues the next stage pipelines when the current stage is finished' do
next_stage_pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'Stage1::Pipeline',
stage: 1
)
pipeline_tracker.fail_op!
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: 0
)
end
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(kind_of(StandardError), bulk_import_id: bulk_import.id, entity_id: entity.id)
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.with(
next_stage_pipeline_tracker.id,
next_stage_pipeline_tracker.stage,
entity.id
)
subject.perform(entity.id)
subject.perform(entity.id, 0)
end
expect(entity.reload.failed?).to eq(true)
end
end
it 'logs and tracks the raised exceptions' do
exception = StandardError.new('Error!')
expect(BulkImports::PipelineWorker)
.to receive(:perform_async)
.and_raise(exception)
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil
)
expect(logger)
.to receive(:error)
.with(
worker: described_class.name,
entity_id: entity.id,
current_stage: nil,
error_message: 'Error!'
)
end
context 'when started entity does not exist' do
it 'does not execute BulkImports::Importers::GroupImporter' do
entity = create(:bulk_import_entity, bulk_import: bulk_import)
expect(Gitlab::ErrorTracking)
.to receive(:track_exception)
.with(exception, entity_id: entity.id)
expect(BulkImports::Importers::GroupImporter).not_to receive(:new)
subject.perform(entity.id)
end
end
subject.perform(entity.id)
end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe BulkImports::PipelineWorker do
  # Minimal stand-in for a pipeline class: accepts a context and does
  # nothing when run.
  let(:pipeline_class) do
    Class.new do
      def initialize(_); end

      def run; end
    end
  end

  let_it_be(:entity) { create(:bulk_import_entity) }

  before do
    stub_const('FakePipeline', pipeline_class)
  end

  it 'runs the given pipeline successfully' do
    pipeline_tracker = create(
      :bulk_import_tracker,
      entity: entity,
      pipeline_name: 'FakePipeline'
    )

    # Tracker#pipeline_class validates the name against the stage config,
    # which does not know about FakePipeline — stub the lookup.
    expect(BulkImports::Stage)
      .to receive(:pipeline_exists?)
      .with('FakePipeline')
      .and_return(true)

    expect_next_instance_of(Gitlab::Import::Logger) do |logger|
      expect(logger)
        .to receive(:info)
        .with(
          worker: described_class.name,
          pipeline_name: 'FakePipeline',
          entity_id: entity.id
        )
    end

    # The worker always re-enqueues the EntityWorker so the next stage
    # can be scheduled.
    expect(BulkImports::EntityWorker)
      .to receive(:perform_async)
      .with(entity.id, pipeline_tracker.stage)

    expect(subject).to receive(:jid).and_return('jid')

    subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)

    pipeline_tracker.reload

    expect(pipeline_tracker.status_name).to eq(:finished)
    expect(pipeline_tracker.jid).to eq('jid')
  end

  context 'when the pipeline cannot be found' do
    it 'logs the error' do
      # Already-started trackers are excluded by the worker's
      # `with_status(:created)` lookup, so this one is "not found".
      pipeline_tracker = create(
        :bulk_import_tracker,
        :started,
        entity: entity,
        pipeline_name: 'FakePipeline'
      )

      expect_next_instance_of(Gitlab::Import::Logger) do |logger|
        expect(logger)
          .to receive(:error)
          .with(
            worker: described_class.name,
            pipeline_tracker_id: pipeline_tracker.id,
            entity_id: entity.id,
            message: 'Unstarted pipeline not found'
          )
      end

      # Re-enqueued even when nothing ran (ensure block in the worker).
      expect(BulkImports::EntityWorker)
        .to receive(:perform_async)
        .with(entity.id, pipeline_tracker.stage)

      subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
    end
  end

  context 'when the pipeline raises an exception' do
    it 'logs the error' do
      # An unknown pipeline name makes Tracker#pipeline_class raise NameError.
      pipeline_tracker = create(
        :bulk_import_tracker,
        entity: entity,
        pipeline_name: 'InexistentPipeline'
      )

      expect_next_instance_of(Gitlab::Import::Logger) do |logger|
        expect(logger)
          .to receive(:error)
          .with(
            worker: described_class.name,
            pipeline_name: 'InexistentPipeline',
            entity_id: entity.id,
            message: "'InexistentPipeline' is not a valid BulkImport Pipeline"
          )
      end

      expect(Gitlab::ErrorTracking)
        .to receive(:track_exception)
        .with(
          instance_of(NameError),
          entity_id: entity.id,
          pipeline_name: pipeline_tracker.pipeline_name
        )

      expect(BulkImports::EntityWorker)
        .to receive(:perform_async)
        .with(entity.id, pipeline_tracker.stage)

      expect(subject).to receive(:jid).and_return('jid')

      subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)

      pipeline_tracker.reload

      expect(pipeline_tracker.status_name).to eq(:failed)
      expect(pipeline_tracker.jid).to eq('jid')
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment