Commit c5faad8c authored by Kassio Borges's avatar Kassio Borges

Add logs about `BulkImports::Pipeline#run`

To make it easier to debug exceptions log what pipeline, and its
parts, is running and for what `BulkImports::Entity`.
parent 10332fe7
......@@ -6,34 +6,61 @@ module BulkImports
extend ActiveSupport::Concern
included do
attr_reader :extractors, :transformers, :loaders
private
def initialize
@extractors = self.class.extractors.map(&method(:instantiate))
@transformers = self.class.transformers.map(&method(:instantiate))
@loaders = self.class.loaders.map(&method(:instantiate))
def extractors
@extractors ||= self.class.extractors.map(&method(:instantiate))
end
super
def transformers
@transformers ||= self.class.transformers.map(&method(:instantiate))
end
def run(context)
extractors.each do |extractor|
extractor.extract(context).each do |entry|
transformers.each do |transformer|
entry = transformer.transform(context, entry)
end
def loaders
@loaders ||= self.class.loaders.map(&method(:instantiate))
end
loaders.each do |loader|
loader.load(context, entry)
end
end
end
def pipeline_name
@pipeline ||= self.class.name
end
def instantiate(class_config)
class_config[:klass].new(class_config[:options])
end
end
def run(context)
info(context, message: "Pipeline started", pipeline: pipeline_name)
extractors.each do |extractor|
extractor.extract(context).each do |entry|
info(context, extractor: extractor.class.name)
transformers.each do |transformer|
info(context, transformer: transformer.class.name)
entry = transformer.transform(context, entry)
end
loaders.each do |loader|
info(context, loader: loader.class.name)
loader.load(context, entry)
end
end
end
end
private # rubocop:disable Lint/UselessAccessModifier
def info(context, extra = {})
logger.info({
entity: context.entity.id,
entity_type: context.entity.source_type
}.merge(extra))
end
def logger
@logger ||= Gitlab::Import::Logger.build
end
end
end
end
......@@ -39,7 +39,10 @@ RSpec.describe BulkImports::Pipeline::Runner do
end
it 'runs pipeline extractor, transformer, loader' do
context = instance_double(BulkImports::Pipeline::Context)
context = instance_double(
BulkImports::Pipeline::Context,
entity: instance_double(BulkImports::Entity, id: 1, source_type: 'group')
)
entries = [{ foo: :bar }]
expect_next_instance_of(BulkImports::Extractor) do |extractor|
......@@ -54,6 +57,17 @@ RSpec.describe BulkImports::Pipeline::Runner do
expect(loader).to receive(:load).with(context, entries.first)
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
.with(message: "Pipeline started", pipeline: 'BulkImports::MyPipeline', entity: 1, entity_type: 'group')
expect(logger).to receive(:info)
.with(entity: 1, entity_type: 'group', extractor: 'BulkImports::Extractor')
expect(logger).to receive(:info)
.with(entity: 1, entity_type: 'group', transformer: 'BulkImports::Transformer')
expect(logger).to receive(:info)
.with(entity: 1, entity_type: 'group', loader: 'BulkImports::Loader')
end
BulkImports::MyPipeline.new.run(context)
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment