Commit 8f50a5b2 authored by George Koltsov's avatar George Koltsov

Limit Group Migration extractors and loaders to 1 per pipeline

- Restrict defining extractors/loaders to 1 each per pipeline in order
  to reduce `BulkImports::Pipeline::Runner#run` complexity
- If we are going to need to extract/load from/to
  multiple sources/destination we can revisit it and consider
  adding such functionality in the future,
  but there is no need in having it now.
parent 734994d4
---
title: Limit Group Migration extractors and loaders to 1 per pipeline
merge_request: 50951
author:
type: changed
...@@ -46,14 +46,12 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -46,14 +46,12 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do it 'has extractors' do
expect(described_class.extractors) expect(described_class.get_extractor)
.to contain_exactly( .to eq(
{
klass: BulkImports::Common::Extractors::GraphqlExtractor, klass: BulkImports::Common::Extractors::GraphqlExtractor,
options: { options: {
query: EE::BulkImports::Groups::Graphql::GetEpicsQuery query: EE::BulkImports::Groups::Graphql::GetEpicsQuery
} }
}
) )
end end
...@@ -67,9 +65,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do ...@@ -67,9 +65,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly({ expect(described_class.get_loader).to eq(klass: EE::BulkImports::Groups::Loaders::EpicsLoader, options: nil)
klass: EE::BulkImports::Groups::Loaders::EpicsLoader, options: nil
})
end end
end end
......
...@@ -10,16 +10,16 @@ module BulkImports ...@@ -10,16 +10,16 @@ module BulkImports
private private
def extractors def extractor
@extractors ||= self.class.extractors.map(&method(:instantiate)) @extractor ||= instantiate(self.class.get_extractor)
end end
def transformers def transformers
@transformers ||= self.class.transformers.map(&method(:instantiate)) @transformers ||= self.class.transformers.map(&method(:instantiate))
end end
def loaders def loader
@loaders ||= self.class.loaders.map(&method(:instantiate)) @loaders ||= instantiate(self.class.get_loader)
end end
def after_run def after_run
...@@ -41,7 +41,7 @@ module BulkImports ...@@ -41,7 +41,7 @@ module BulkImports
class_methods do class_methods do
def extractor(klass, options = nil) def extractor(klass, options = nil)
add_attribute(:extractors, klass, options) class_attributes[:extractor] = { klass: klass, options: options }
end end
def transformer(klass, options = nil) def transformer(klass, options = nil)
...@@ -49,23 +49,23 @@ module BulkImports ...@@ -49,23 +49,23 @@ module BulkImports
end end
def loader(klass, options = nil) def loader(klass, options = nil)
add_attribute(:loaders, klass, options) class_attributes[:loader] = { klass: klass, options: options }
end end
def after_run(&block) def after_run(&block)
class_attributes[:after_run] = block class_attributes[:after_run] = block
end end
def extractors def get_extractor
class_attributes[:extractors] class_attributes[:extractor]
end end
def transformers def transformers
class_attributes[:transformers] class_attributes[:transformers]
end end
def loaders def get_loader
class_attributes[:loaders] class_attributes[:loader]
end end
def after_run_callback def after_run_callback
......
...@@ -12,27 +12,17 @@ module BulkImports ...@@ -12,27 +12,17 @@ module BulkImports
info(context, message: 'Pipeline started', pipeline_class: pipeline) info(context, message: 'Pipeline started', pipeline_class: pipeline)
extractors.each do |extractor| Array.wrap(extracted_data_from(context)).each do |entry|
data = run_pipeline_step(:extractor, extractor.class.name, context) do
extractor.extract(context)
end
if data && data.respond_to?(:each)
data.each do |entry|
transformers.each do |transformer| transformers.each do |transformer|
entry = run_pipeline_step(:transformer, transformer.class.name, context) do entry = run_pipeline_step(:transformer, transformer.class.name, context) do
transformer.transform(context, entry) transformer.transform(context, entry)
end end
end end
loaders.each do |loader|
run_pipeline_step(:loader, loader.class.name, context) do run_pipeline_step(:loader, loader.class.name, context) do
loader.load(context, entry) loader.load(context, entry)
end end
end end
end
end
end
after_run.call(context) if after_run.present? after_run.call(context) if after_run.present?
rescue MarkedAsFailedError rescue MarkedAsFailedError
...@@ -55,6 +45,12 @@ module BulkImports ...@@ -55,6 +45,12 @@ module BulkImports
mark_as_failed(context) if abort_on_failure? mark_as_failed(context) if abort_on_failure?
end end
def extracted_data_from(context)
run_pipeline_step(:extractor, extractor.class.name, context) do
extractor.extract(context)
end
end
def mark_as_failed(context) def mark_as_failed(context)
warn(context, message: 'Pipeline failed', pipeline_class: pipeline) warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
......
...@@ -75,14 +75,12 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -75,14 +75,12 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do it 'has extractors' do
expect(described_class.extractors) expect(described_class.get_extractor)
.to contain_exactly( .to eq(
{
klass: BulkImports::Common::Extractors::GraphqlExtractor, klass: BulkImports::Common::Extractors::GraphqlExtractor,
options: { options: {
query: BulkImports::Groups::Graphql::GetGroupQuery query: BulkImports::Groups::Graphql::GetGroupQuery
} }
}
) )
end end
...@@ -97,9 +95,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -97,9 +95,7 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly({ expect(described_class.get_loader).to eq(klass: BulkImports::Groups::Loaders::GroupLoader, options: nil)
klass: BulkImports::Groups::Loaders::GroupLoader, options: nil
})
end end
end end
end end
...@@ -58,10 +58,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -58,10 +58,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) } it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do it 'has extractors' do
expect(described_class.extractors).to contain_exactly( expect(described_class.get_extractor).to eq(klass: BulkImports::Groups::Extractors::SubgroupsExtractor, options: nil)
klass: BulkImports::Groups::Extractors::SubgroupsExtractor,
options: nil
)
end end
it 'has transformers' do it 'has transformers' do
...@@ -72,10 +69,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do ...@@ -72,10 +69,7 @@ RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly( expect(described_class.get_loader).to eq(klass: BulkImports::Common::Loaders::EntityLoader, options: nil)
klass: BulkImports::Common::Loaders::EntityLoader,
options: nil
)
end end
end end
end end
...@@ -24,9 +24,9 @@ RSpec.describe BulkImports::Pipeline do ...@@ -24,9 +24,9 @@ RSpec.describe BulkImports::Pipeline do
describe 'getters' do describe 'getters' do
it 'retrieves class attributes' do it 'retrieves class attributes' do
expect(BulkImports::MyPipeline.extractors).to contain_exactly({ klass: BulkImports::Extractor, options: { foo: :bar } }) expect(BulkImports::MyPipeline.get_extractor).to eq({ klass: BulkImports::Extractor, options: { foo: :bar } })
expect(BulkImports::MyPipeline.transformers).to contain_exactly({ klass: BulkImports::Transformer, options: { foo: :bar } }) expect(BulkImports::MyPipeline.transformers).to contain_exactly({ klass: BulkImports::Transformer, options: { foo: :bar } })
expect(BulkImports::MyPipeline.loaders).to contain_exactly({ klass: BulkImports::Loader, options: { foo: :bar } }) expect(BulkImports::MyPipeline.get_loader).to eq({ klass: BulkImports::Loader, options: { foo: :bar } })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true) expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
end end
end end
...@@ -41,20 +41,14 @@ RSpec.describe BulkImports::Pipeline do ...@@ -41,20 +41,14 @@ RSpec.describe BulkImports::Pipeline do
BulkImports::MyPipeline.loader(klass, options) BulkImports::MyPipeline.loader(klass, options)
BulkImports::MyPipeline.abort_on_failure! BulkImports::MyPipeline.abort_on_failure!
expect(BulkImports::MyPipeline.extractors) expect(BulkImports::MyPipeline.get_extractor).to eq({ klass: klass, options: options })
.to contain_exactly(
{ klass: BulkImports::Extractor, options: { foo: :bar } },
{ klass: klass, options: options })
expect(BulkImports::MyPipeline.transformers) expect(BulkImports::MyPipeline.transformers)
.to contain_exactly( .to contain_exactly(
{ klass: BulkImports::Transformer, options: { foo: :bar } }, { klass: BulkImports::Transformer, options: { foo: :bar } },
{ klass: klass, options: options }) { klass: klass, options: options })
expect(BulkImports::MyPipeline.loaders) expect(BulkImports::MyPipeline.get_loader).to eq({ klass: klass, options: options })
.to contain_exactly(
{ klass: BulkImports::Loader, options: { foo: :bar } },
{ klass: klass, options: options })
expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true) expect(BulkImports::MyPipeline.abort_on_failure?).to eq(true)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment