Commit 8ae4857e authored by Terri Chu, committed by Douglas Barbosa Alexandre

Resolve BulkIndexer flush returning nil values

parent f86786ed
---
title: Fix BulkIndexer flush returning nil values after failure.
merge_request: 36716
author:
type: fixed
@@ -17,19 +17,24 @@ module Gitlab
       attr_reader :logger, :failures

+      # body - array of json formatted index operation requests awaiting submission to elasticsearch in bulk
+      # body_size_bytes - total size in bytes of each json element in body array
+      # failures - array of records that had a failure during submission to elasticsearch
+      # logger - set the logger used by instance
+      # ref_buffer - records awaiting submission to elasticsearch
+      #              cleared if `try_send_bulk` is successful
+      #              flushed into `failures` if `try_send_bulk` fails
       def initialize(logger:)
         @body = []
         @body_size_bytes = 0
         @failures = []
         @logger = logger
-        @ref_cache = []
+        @ref_buffer = []
       end

       # Adds or removes a document in elasticsearch, depending on whether the
       # database record it refers to can be found
       def process(ref)
-        ref_cache << ref
-
         if ref.database_record
           index(ref)
         else
@@ -48,23 +53,23 @@ module Gitlab
       def reset!
         @body = []
         @body_size_bytes = 0
-        @ref_cache = []
+        @ref_buffer = []
       end

-      attr_reader :body, :body_size_bytes, :ref_cache
+      attr_reader :body, :body_size_bytes, :ref_buffer

       def index(ref)
         proxy = ref.database_record.__elasticsearch__
         op = build_op(ref, proxy)

-        submit({ index: op }, proxy.as_indexed_json)
+        submit(ref, { index: op }, proxy.as_indexed_json)
       end

       def delete(ref)
         proxy = ref.klass.__elasticsearch__
         op = build_op(ref, proxy)

-        submit(delete: op)
+        submit(ref, delete: op)
       end

       def build_op(ref, proxy)
@@ -83,12 +88,17 @@ module Gitlab
         Gitlab::CurrentSettings.elasticsearch_max_bulk_size_mb.megabytes
       end

-      def submit(*hashes)
+      def submit(ref, *hashes)
         jsons = hashes.map(&:to_json)
         bytesize = calculate_bytesize(jsons)

+        # if new ref will exceed the bulk limit, send existing buffer of records
+        # when successful, clears `body`, `ref_buffer`, and `body_size_bytes`
+        # continue to buffer refs until bulk limit is reached or flush is called
+        # any errors encountered are added to `failures`
         send_bulk if will_exceed_bulk_limit?(bytesize)

+        ref_buffer << ref
         body.concat(jsons)
         @body_size_bytes += bytesize
       end
@@ -111,7 +121,7 @@ module Gitlab
         logger.info(
           message: 'bulk_submitted',
           body_size_bytes: body_size_bytes,
-          bulk_count: ref_cache.count,
+          bulk_count: ref_buffer.count,
           errors_count: failed_refs.count
         )
@@ -128,7 +138,7 @@ module Gitlab
         # If an exception is raised, treat the entire bulk as failed
         logger.error(message: 'bulk_exception', error_class: err.class.to_s, error_message: err.message)

-        ref_cache
+        ref_buffer
       end

       def process_errors(result)
@@ -145,7 +155,7 @@ module Gitlab
           if op.nil? || op['error']
             logger.warn(message: 'bulk_error', item: item)
-            out << ref_cache[i]
+            out << ref_buffer[i]
           end
         end
...
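In short, the change keeps the list of buffered references (`ref_buffer`) in lockstep with the serialized payload (`body`): the ref is recorded inside `submit`, after any pre-emptive `send_bulk`, instead of at the top of `process`. The standalone sketch below illustrates that contract under simplifying assumptions; it is not the GitLab class itself, and the `BufferedIndexer`, `client:`, and `MAX_BYTES` names are hypothetical stand-ins.

require 'json'

# Minimal sketch of the buffer/flush contract, assuming a bulk client whose
# #bulk call raises on failure. Per-item error handling from the real class
# is omitted here for brevity.
class BufferedIndexer
  MAX_BYTES = 10_000_000 # hypothetical bulk size limit

  attr_reader :failures

  def initialize(client:)
    @client = client
    @failures = []
    reset!
  end

  def process(ref)
    json = ref.to_json

    # If this ref would push the payload over the limit, flush what is
    # already buffered first.
    send_bulk if @body_size_bytes + json.bytesize > MAX_BYTES

    # Record the ref *after* the pre-emptive send so @ref_buffer always lines
    # up with @body. Recording it earlier (as the old ref_cache did in
    # #process) left stale entries behind once a mid-batch send reset the
    # payload, which is how nil values ended up in the flush result.
    @ref_buffer << ref
    @body << json
    @body_size_bytes += json.bytesize
    self
  end

  # Returns the refs that failed to index: an empty array on success,
  # never nil entries.
  def flush
    send_bulk
    failures
  end

  private

  def send_bulk
    return if @body.empty?

    @client.bulk(body: @body)
  rescue StandardError
    # Treat the whole in-flight batch as failed.
    @failures.concat(@ref_buffer)
  ensure
    reset!
  end

  def reset!
    @body = []
    @body_size_bytes = 0
    @ref_buffer = []
  end
end

The spec changes below exercise exactly this path: a document that triggers a mid-batch send followed by a failing bulk call must appear in both the `flush` return value and `failures`.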
@@ -56,6 +56,7 @@ RSpec.describe Gitlab::Elastic::BulkIndexer, :elastic do
        body_bytesize = args[:body].map(&:bytesize).reduce(:+)
        expect(body_bytesize).to be <= bulk_limit_bytes
      end
+
      expect(indexer.failures).to be_empty
    end
  end
@@ -99,6 +100,25 @@ RSpec.describe Gitlab::Elastic::BulkIndexer, :elastic do
       expect(indexer.failures).to contain_exactly(issue_as_ref, other_issue_as_ref)
     end

+    it 'fails a document correctly on exception after adding an item that exceeded the bulk limit' do
+      bulk_limit_bytes = (issue_as_json_with_times.to_json.bytesize * 1.5).to_i
+      set_bulk_limit(indexer, bulk_limit_bytes)
+      indexer.process(issue_as_ref)
+      allow(es_client).to receive(:bulk).and_return({})
+
+      indexer.process(issue_as_ref)
+
+      expect(es_client).to have_received(:bulk) do |args|
+        body_bytesize = args[:body].map(&:bytesize).reduce(:+)
+        expect(body_bytesize).to be <= bulk_limit_bytes
+      end
+
+      expect(es_client).to receive(:bulk) { raise 'An exception' }
+
+      expect(indexer.flush).to contain_exactly(issue_as_ref)
+      expect(indexer.failures).to contain_exactly(issue_as_ref)
+    end
+
     context 'indexing an issue' do
       it 'adds the issue to the index' do
         expect(indexer.process(issue_as_ref).flush).to be_empty
...
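For callers, the practical effect of the fix is that `flush` now returns the failed refs themselves (as the new spec asserts with `contain_exactly(issue_as_ref)`), so a retry loop can safely re-enqueue them. A hedged caller-side sketch, where `index_batch` and `requeue` are illustrative names not taken from this change:

# Hypothetical caller loop built on the #process / #flush contract above.
# `indexer` is any object responding to process/flush; `requeue` is an
# illustrative callback, not part of this merge request.
def index_batch(refs, indexer:, requeue:)
  refs.each { |ref| indexer.process(ref) }

  failed = indexer.flush
  # Before this fix, a bulk failure after a mid-batch send could leave nil
  # entries in `failed`, breaking any attempt to retry by reference.
  failed.each { |ref| requeue.call(ref) }
  failed
end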