Commit 48a89ee1 authored by George Koltsov's avatar George Koltsov

Add structured payload to BulkImport workers

  - Update BulkImports::* workers logging messages
    to include structured payload context information
    in the logs
parent 3c16576a
...@@ -12,12 +12,24 @@ module BulkImports ...@@ -12,12 +12,24 @@ module BulkImports
worker_has_external_dependencies! worker_has_external_dependencies!
def perform(entity_id, current_stage = nil) def perform(entity_id, current_stage = nil)
return if stage_running?(entity_id, current_stage) if stage_running?(entity_id, current_stage)
logger.info(
structured_payload(
entity_id: entity_id,
current_stage: current_stage,
message: 'Stage running'
)
)
return
end
logger.info( logger.info(
worker: self.class.name, structured_payload(
entity_id: entity_id, entity_id: entity_id,
current_stage: current_stage current_stage: current_stage,
message: 'Stage starting'
)
) )
next_pipeline_trackers_for(entity_id).each do |pipeline_tracker| next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
...@@ -29,10 +41,11 @@ module BulkImports ...@@ -29,10 +41,11 @@ module BulkImports
end end
rescue StandardError => e rescue StandardError => e
logger.error( logger.error(
worker: self.class.name, structured_payload(
entity_id: entity_id, entity_id: entity_id,
current_stage: current_stage, current_stage: current_stage,
error_message: e.message message: e.message
)
) )
Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id) Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
......
...@@ -42,10 +42,12 @@ module BulkImports ...@@ -42,10 +42,12 @@ module BulkImports
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
} }
Gitlab::Import::Logger.warn( Gitlab::Import::Logger.error(
attributes.merge( structured_payload(
bulk_import_id: entity.bulk_import.id, attributes.merge(
bulk_import_entity_type: entity.source_type bulk_import_id: entity.bulk_import.id,
bulk_import_entity_type: entity.source_type
)
) )
) )
......
...@@ -18,18 +18,20 @@ module BulkImports ...@@ -18,18 +18,20 @@ module BulkImports
if pipeline_tracker.present? if pipeline_tracker.present?
logger.info( logger.info(
worker: self.class.name, structured_payload(
entity_id: pipeline_tracker.entity.id, entity_id: pipeline_tracker.entity.id,
pipeline_name: pipeline_tracker.pipeline_name pipeline_name: pipeline_tracker.pipeline_name
)
) )
run(pipeline_tracker) run(pipeline_tracker)
else else
logger.error( logger.error(
worker: self.class.name, structured_payload(
entity_id: entity_id, entity_id: entity_id,
pipeline_tracker_id: pipeline_tracker_id, pipeline_tracker_id: pipeline_tracker_id,
message: 'Unstarted pipeline not found' message: 'Unstarted pipeline not found'
)
) )
end end
...@@ -63,10 +65,11 @@ module BulkImports ...@@ -63,10 +65,11 @@ module BulkImports
rescue BulkImports::NetworkError => e rescue BulkImports::NetworkError => e
if e.retriable?(pipeline_tracker) if e.retriable?(pipeline_tracker)
logger.error( logger.error(
worker: self.class.name, structured_payload(
entity_id: pipeline_tracker.entity.id, entity_id: pipeline_tracker.entity.id,
pipeline_name: pipeline_tracker.pipeline_name, pipeline_name: pipeline_tracker.pipeline_name,
message: "Retrying error: #{e.message}" message: "Retrying error: #{e.message}"
)
) )
pipeline_tracker.update!(status_event: 'retry', jid: jid) pipeline_tracker.update!(status_event: 'retry', jid: jid)
...@@ -83,10 +86,11 @@ module BulkImports ...@@ -83,10 +86,11 @@ module BulkImports
pipeline_tracker.update!(status_event: 'fail_op', jid: jid) pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error( logger.error(
worker: self.class.name, structured_payload(
entity_id: pipeline_tracker.entity.id, entity_id: pipeline_tracker.entity.id,
pipeline_name: pipeline_tracker.pipeline_name, pipeline_name: pipeline_tracker.pipeline_name,
message: exception.message message: exception.message
)
) )
Gitlab::ErrorTracking.track_exception( Gitlab::ErrorTracking.track_exception(
......
...@@ -36,9 +36,11 @@ RSpec.describe BulkImports::EntityWorker do ...@@ -36,9 +36,11 @@ RSpec.describe BulkImports::EntityWorker do
expect(logger) expect(logger)
.to receive(:info).twice .to receive(:info).twice
.with( .with(
worker: described_class.name, hash_including(
entity_id: entity.id, 'entity_id' => entity.id,
current_stage: nil 'current_stage' => nil,
'message' => 'Stage starting'
)
) )
end end
...@@ -58,24 +60,26 @@ RSpec.describe BulkImports::EntityWorker do ...@@ -58,24 +60,26 @@ RSpec.describe BulkImports::EntityWorker do
expect(BulkImports::PipelineWorker) expect(BulkImports::PipelineWorker)
.to receive(:perform_async) .to receive(:perform_async)
.and_raise(exception) .and_raise(exception)
expect_next_instance_of(Gitlab::Import::Logger) do |logger| expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger) expect(logger)
.to receive(:info).twice .to receive(:info).twice
.with( .with(
worker: described_class.name, hash_including(
entity_id: entity.id, 'entity_id' => entity.id,
current_stage: nil 'current_stage' => nil
)
) )
expect(logger) expect(logger)
.to receive(:error) .to receive(:error)
.with( .with(
worker: described_class.name, hash_including(
entity_id: entity.id, 'entity_id' => entity.id,
current_stage: nil, 'current_stage' => nil,
error_message: 'Error!' 'message' => 'Error!'
)
) )
end end
...@@ -90,6 +94,18 @@ RSpec.describe BulkImports::EntityWorker do ...@@ -90,6 +94,18 @@ RSpec.describe BulkImports::EntityWorker do
let(:job_args) { [entity.id, 0] } let(:job_args) { [entity.id, 0] }
it 'do not enqueue a new pipeline job if the current stage still running' do it 'do not enqueue a new pipeline job if the current stage still running' do
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger)
.to receive(:info).twice
.with(
hash_including(
'entity_id' => entity.id,
'current_stage' => 0,
'message' => 'Stage running'
)
)
end
expect(BulkImports::PipelineWorker) expect(BulkImports::PipelineWorker)
.not_to receive(:perform_async) .not_to receive(:perform_async)
...@@ -110,9 +126,10 @@ RSpec.describe BulkImports::EntityWorker do ...@@ -110,9 +126,10 @@ RSpec.describe BulkImports::EntityWorker do
expect(logger) expect(logger)
.to receive(:info).twice .to receive(:info).twice
.with( .with(
worker: described_class.name, hash_including(
entity_id: entity.id, 'entity_id' => entity.id,
current_stage: 0 'current_stage' => 0
)
) )
end end
......
...@@ -35,14 +35,16 @@ RSpec.describe BulkImports::ExportRequestWorker do ...@@ -35,14 +35,16 @@ RSpec.describe BulkImports::ExportRequestWorker do
expect(client).to receive(:post).and_raise(BulkImports::NetworkError, 'Export error').twice expect(client).to receive(:post).and_raise(BulkImports::NetworkError, 'Export error').twice
end end
expect(Gitlab::Import::Logger).to receive(:warn).with( expect(Gitlab::Import::Logger).to receive(:error).with(
bulk_import_entity_id: entity.id, hash_including(
pipeline_class: 'ExportRequestWorker', 'bulk_import_entity_id' => entity.id,
exception_class: 'BulkImports::NetworkError', 'pipeline_class' => 'ExportRequestWorker',
exception_message: 'Export error', 'exception_class' => 'BulkImports::NetworkError',
correlation_id_value: anything, 'exception_message' => 'Export error',
bulk_import_id: bulk_import.id, 'correlation_id_value' => anything,
bulk_import_entity_type: entity.source_type 'bulk_import_id' => bulk_import.id,
'bulk_import_entity_type' => entity.source_type
)
).twice ).twice
perform_multiple(job_args) perform_multiple(job_args)
......
...@@ -34,9 +34,10 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -34,9 +34,10 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:info) .to receive(:info)
.with( .with(
worker: described_class.name, hash_including(
pipeline_name: 'FakePipeline', 'pipeline_name' => 'FakePipeline',
entity_id: entity.id 'entity_id' => entity.id
)
) )
end end
...@@ -44,7 +45,7 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -44,7 +45,7 @@ RSpec.describe BulkImports::PipelineWorker do
.to receive(:perform_async) .to receive(:perform_async)
.with(entity.id, pipeline_tracker.stage) .with(entity.id, pipeline_tracker.stage)
expect(subject).to receive(:jid).and_return('jid') allow(subject).to receive(:jid).and_return('jid')
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
...@@ -79,10 +80,11 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -79,10 +80,11 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:error) .to receive(:error)
.with( .with(
worker: described_class.name, hash_including(
pipeline_tracker_id: pipeline_tracker.id, 'pipeline_tracker_id' => pipeline_tracker.id,
entity_id: entity.id, 'entity_id' => entity.id,
message: 'Unstarted pipeline not found' 'message' => 'Unstarted pipeline not found'
)
) )
end end
...@@ -107,10 +109,11 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -107,10 +109,11 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:error) .to receive(:error)
.with( .with(
worker: described_class.name, hash_including(
pipeline_name: 'InexistentPipeline', 'pipeline_name' => 'InexistentPipeline',
entity_id: entity.id, 'entity_id' => entity.id,
message: "'InexistentPipeline' is not a valid BulkImport Pipeline" 'message' => "'InexistentPipeline' is not a valid BulkImport Pipeline"
)
) )
end end
...@@ -126,7 +129,7 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -126,7 +129,7 @@ RSpec.describe BulkImports::PipelineWorker do
.to receive(:perform_async) .to receive(:perform_async)
.with(entity.id, pipeline_tracker.stage) .with(entity.id, pipeline_tracker.stage)
expect(subject).to receive(:jid).and_return('jid') allow(subject).to receive(:jid).and_return('jid')
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
...@@ -151,10 +154,11 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -151,10 +154,11 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:error) .to receive(:error)
.with( .with(
worker: described_class.name, hash_including(
pipeline_name: 'Pipeline', 'pipeline_name' => 'Pipeline',
entity_id: entity.id, 'entity_id' => entity.id,
message: 'Failed entity status' 'message' => 'Failed entity status'
)
) )
end end
...@@ -183,7 +187,7 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -183,7 +187,7 @@ RSpec.describe BulkImports::PipelineWorker do
.and_raise(exception) .and_raise(exception)
end end
expect(subject).to receive(:jid).and_return('jid').twice allow(subject).to receive(:jid).and_return('jid')
expect_any_instance_of(BulkImports::Tracker) do |tracker| expect_any_instance_of(BulkImports::Tracker) do |tracker|
expect(tracker).to receive(:retry).and_call_original expect(tracker).to receive(:retry).and_call_original
...@@ -193,9 +197,10 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -193,9 +197,10 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:info) .to receive(:info)
.with( .with(
worker: described_class.name, hash_including(
pipeline_name: 'FakePipeline', 'pipeline_name' => 'FakePipeline',
entity_id: entity.id 'entity_id' => entity.id
)
) )
end end
...@@ -292,10 +297,11 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -292,10 +297,11 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:error) .to receive(:error)
.with( .with(
worker: described_class.name, hash_including(
pipeline_name: 'NdjsonPipeline', 'pipeline_name' => 'NdjsonPipeline',
entity_id: entity.id, 'entity_id' => entity.id,
message: 'Pipeline timeout' 'message' => 'Pipeline timeout'
)
) )
end end
...@@ -318,10 +324,11 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -318,10 +324,11 @@ RSpec.describe BulkImports::PipelineWorker do
expect(logger) expect(logger)
.to receive(:error) .to receive(:error)
.with( .with(
worker: described_class.name, hash_including(
pipeline_name: 'NdjsonPipeline', 'pipeline_name' => 'NdjsonPipeline',
entity_id: entity.id, 'entity_id' => entity.id,
message: 'Error!' 'message' => 'Error!'
)
) )
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment