Commit 337e2845 authored by Kassio Borges's avatar Kassio Borges

BulkImports: Handle network errors

Add `BulkImports::NetworkError` to wrap network exceptions from
`BulkImports::Client::Graphql` and `BulkImports::Client::HTTP`. The
wrapper has a `#retriable?` method that returns `true` when the
exception is something worth retrying, like a timeout, and didn't happen
more than `BulkImports::NetworkError::MAX_RETRIABLE_COUNT`, currently 3.
The counter is scoped by:
- the exception type;
- `BulkImports::Tracker`;
- `BulkImports::Entity` and;
- `BulkImports::Stage`.

Changelog: added
MR: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68582
parent 5eaaf70a
...@@ -59,18 +59,35 @@ module BulkImports ...@@ -59,18 +59,35 @@ module BulkImports
pipeline_tracker.pipeline_class.new(context).run pipeline_tracker.pipeline_class.new(context).run
pipeline_tracker.finish! pipeline_tracker.finish!
rescue BulkImports::NetworkError => e
if e.retriable?(pipeline_tracker)
logger.error(
worker: self.class.name,
entity_id: pipeline_tracker.entity.id,
pipeline_name: pipeline_tracker.pipeline_name,
message: "Retrying error: #{e.message}"
)
reenqueue(pipeline_tracker, delay: e.retry_delay)
else
fail_tracker(pipeline_tracker, e)
end
rescue StandardError => e rescue StandardError => e
fail_tracker(pipeline_tracker, e)
end
def fail_tracker(pipeline_tracker, exception)
pipeline_tracker.update!(status_event: 'fail_op', jid: jid) pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error( logger.error(
worker: self.class.name, worker: self.class.name,
entity_id: pipeline_tracker.entity.id, entity_id: pipeline_tracker.entity.id,
pipeline_name: pipeline_tracker.pipeline_name, pipeline_name: pipeline_tracker.pipeline_name,
message: e.message message: exception.message
) )
Gitlab::ErrorTracking.track_exception( Gitlab::ErrorTracking.track_exception(
e, exception,
entity_id: pipeline_tracker.entity.id, entity_id: pipeline_tracker.entity.id,
pipeline_name: pipeline_tracker.pipeline_name pipeline_name: pipeline_tracker.pipeline_name
) )
...@@ -88,8 +105,13 @@ module BulkImports ...@@ -88,8 +105,13 @@ module BulkImports
(Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
end end
def reenqueue(pipeline_tracker) def reenqueue(pipeline_tracker, delay: NDJSON_PIPELINE_PERFORM_DELAY)
self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id) self.class.perform_in(
delay,
pipeline_tracker.id,
pipeline_tracker.stage,
pipeline_tracker.entity.id
)
end end
end end
end end
...@@ -17,6 +17,8 @@ module BulkImports ...@@ -17,6 +17,8 @@ module BulkImports
) )
::Gitlab::Json.parse(response.body) ::Gitlab::Json.parse(response.body)
rescue *Gitlab::HTTP::HTTP_ERRORS => e
raise ::BulkImports::NetworkError, e
end end
end end
private_constant :HTTP private_constant :HTTP
......
...@@ -113,11 +113,11 @@ module BulkImports ...@@ -113,11 +113,11 @@ module BulkImports
def with_error_handling def with_error_handling
response = yield response = yield
raise(::BulkImports::Error, "Error #{response.code}") unless response.success? raise ::BulkImports::NetworkError.new(response: response) unless response.success?
response response
rescue *Gitlab::HTTP::HTTP_ERRORS => e rescue *Gitlab::HTTP::HTTP_ERRORS => e
raise(::BulkImports::Error, e) raise ::BulkImports::NetworkError, e
end end
def api_url def api_url
......
# frozen_string_literal: true
module BulkImports
class NetworkError < Error
COUNTER_KEY = 'bulk_imports/%{entity_id}/%{stage}/%{tracker_id}/network_error/%{error}'
RETRIABLE_EXCEPTIONS = Gitlab::HTTP::HTTP_TIMEOUT_ERRORS
RETRIABLE_HTTP_CODES = [429].freeze
DEFAULT_RETRY_DELAY_SECONDS = 60
MAX_RETRIABLE_COUNT = 3
def initialize(message = nil, response: nil)
raise ArgumentError, 'message or response required' if message.blank? && response.blank?
super(message)
@response = response
end
def retriable?(tracker)
if retriable_exception? || retriable_http_code?
increment(tracker) <= MAX_RETRIABLE_COUNT
else
false
end
end
def retry_delay
if response&.code == 429
response.headers.fetch('Retry-After', DEFAULT_RETRY_DELAY_SECONDS).to_i
else
DEFAULT_RETRY_DELAY_SECONDS
end.seconds
end
private
attr_reader :response
def retriable_exception?
RETRIABLE_EXCEPTIONS.include?(cause&.class)
end
def retriable_http_code?
RETRIABLE_HTTP_CODES.include?(response&.code)
end
def increment(tracker)
key = COUNTER_KEY % {
stage: tracker.stage,
tracker_id: tracker.id,
entity_id: tracker.entity.id,
error: cause.class.name
}
Gitlab::Cache::Import::Caching.increment(key)
end
end
end
...@@ -84,8 +84,10 @@ module Gitlab ...@@ -84,8 +84,10 @@ module Gitlab
key = cache_key_for(raw_key) key = cache_key_for(raw_key)
Redis::Cache.with do |redis| Redis::Cache.with do |redis|
redis.incr(key) value = redis.incr(key)
redis.expire(key, timeout) redis.expire(key, timeout)
value
end end
end end
......
...@@ -32,7 +32,7 @@ RSpec.describe BulkImports::Clients::HTTP do ...@@ -32,7 +32,7 @@ RSpec.describe BulkImports::Clients::HTTP do
it 'raises BulkImports::Error' do it 'raises BulkImports::Error' do
allow(Gitlab::HTTP).to receive(method).and_raise(Errno::ECONNREFUSED) allow(Gitlab::HTTP).to receive(method).and_raise(Errno::ECONNREFUSED)
expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::Error) expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::NetworkError)
end end
end end
...@@ -42,7 +42,7 @@ RSpec.describe BulkImports::Clients::HTTP do ...@@ -42,7 +42,7 @@ RSpec.describe BulkImports::Clients::HTTP do
allow(Gitlab::HTTP).to receive(method).and_return(response_double) allow(Gitlab::HTTP).to receive(method).and_return(response_double)
expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::Error) expect { subject.public_send(method, resource) }.to raise_exception(BulkImports::NetworkError)
end end
end end
end end
...@@ -180,7 +180,11 @@ RSpec.describe BulkImports::Clients::HTTP do ...@@ -180,7 +180,11 @@ RSpec.describe BulkImports::Clients::HTTP do
let(:version) { '13.0.0' } let(:version) { '13.0.0' }
it 'raises an error' do it 'raises an error' do
expect { subject.get(resource) }.to raise_error(::BulkImports::Error, "Unsupported GitLab Version. Minimum Supported Gitlab Version #{BulkImport::MINIMUM_GITLAB_MAJOR_VERSION}.") expect { subject.get(resource) }
.to raise_error(
::BulkImports::Error,
"Unsupported GitLab Version. Minimum Supported Gitlab Version #{BulkImport::MINIMUM_GITLAB_MAJOR_VERSION}."
)
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::NetworkError, :clean_gitlab_redis_cache do
let(:tracker) { double(id: 1, stage: 2, entity: double(id: 3)) }
describe '.new' do
it 'requires either a message or a HTTP response' do
expect { described_class.new }
.to raise_error(ArgumentError, 'message or response required')
end
end
describe '#retriable?' do
it 'returns true for MAX_RETRIABLE_COUNT times when cause if one of RETRIABLE_EXCEPTIONS' do
raise described_class::RETRIABLE_EXCEPTIONS.sample
rescue StandardError => cause
begin
raise described_class, cause
rescue StandardError => exception
described_class::MAX_RETRIABLE_COUNT.times do
expect(exception.retriable?(tracker)).to eq(true)
end
expect(exception.retriable?(tracker)).to eq(false)
end
end
it 'returns true for MAX_RETRIABLE_COUNT times when response is one of RETRIABLE_CODES' do
exception = described_class.new(response: double(code: 429))
described_class::MAX_RETRIABLE_COUNT.times do
expect(exception.retriable?(tracker)).to eq(true)
end
expect(exception.retriable?(tracker)).to eq(false)
end
it 'returns false for other exceptions' do
raise StandardError
rescue StandardError => cause
begin
raise described_class, cause
rescue StandardError => exception
expect(exception.retriable?(tracker)).to eq(false)
end
end
end
describe '#retry_delay' do
it 'returns the default value when there is not a rate limit error' do
exception = described_class.new('foo')
expect(exception.retry_delay).to eq(described_class::DEFAULT_RETRY_DELAY_SECONDS.seconds)
end
context 'when the exception is a rate limit error' do
it 'returns the "Retry-After"' do
exception = described_class.new(response: double(code: 429, headers: { 'Retry-After' => 20 }))
expect(exception.retry_delay).to eq(20.seconds)
end
it 'returns the default value when there is no "Retry-After" header' do
exception = described_class.new(response: double(code: 429, headers: {}))
expect(exception.retry_delay).to eq(described_class::DEFAULT_RETRY_DELAY_SECONDS.seconds)
end
end
end
end
...@@ -58,6 +58,16 @@ RSpec.describe Gitlab::Cache::Import::Caching, :clean_gitlab_redis_cache do ...@@ -58,6 +58,16 @@ RSpec.describe Gitlab::Cache::Import::Caching, :clean_gitlab_redis_cache do
end end
end end
describe '.increment' do
it 'increment a key and returns the current value' do
expect(described_class.increment('foo')).to eq(1)
value = Gitlab::Redis::Cache.with { |r| r.get(described_class.cache_key_for('foo')) }
expect(value.to_i).to eq(1)
end
end
describe '.set_add' do describe '.set_add' do
it 'adds a value to a set' do it 'adds a value to a set' do
described_class.set_add('foo', 10) described_class.set_add('foo', 10)
......
...@@ -126,6 +126,39 @@ RSpec.describe BulkImports::PipelineWorker do ...@@ -126,6 +126,39 @@ RSpec.describe BulkImports::PipelineWorker do
expect(pipeline_tracker.status_name).to eq(:failed) expect(pipeline_tracker.status_name).to eq(:failed)
expect(pipeline_tracker.jid).to eq('jid') expect(pipeline_tracker.jid).to eq('jid')
end end
context 'when it is a network error' do
it 'reenqueue on retriable network errors' do
pipeline_tracker = create(
:bulk_import_tracker,
entity: entity,
pipeline_name: 'FakePipeline'
)
exception = BulkImports::NetworkError.new(
response: double(code: 429, headers: {})
)
expect_next_instance_of(pipeline_class) do |pipeline|
expect(pipeline)
.to receive(:run)
.and_raise(exception)
end
expect(subject).to receive(:jid).and_return('jid')
expect(described_class)
.to receive(:perform_in)
.with(
60.seconds,
pipeline_tracker.id,
pipeline_tracker.stage,
pipeline_tracker.entity.id
)
subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id)
end
end
end end
context 'when ndjson pipeline' do context 'when ndjson pipeline' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment