Commit 8e6bb77d authored by Bob Van Landuyt's avatar Bob Van Landuyt

Limit the context for paused elasticsearch jobs

When we pause elasticsearch indexing, we store the jobs in a ZSET. The
advantage of that would be that the jobs are automatically
deduplicated. Thanks Redis!

However, since the application context contains unique
information (correlation_id, user, caller_id), we would lose this
benefit.

With this, we only include the shared information (the `meta.project`
key), which is shared across the deduplicatable jobs.

The project information should be enough context for logging purposes:
The namespace can be inferred from that, and the caller_id should be
populated again when the job is rescheduled.
parent d96d4e10
......@@ -6,6 +6,7 @@ module Elastic
# This class should only be used with sidekiq workers which extend Elastic::IndexingControl module
class IndexingControlService
LIMIT = 1000
PROJECT_CONTEXT_KEY = "#{Labkit::Context::LOG_KEY}.project"
def initialize(klass)
raise ArgumentError, "passed class must extend Elastic::IndexingControl" unless klass.include?(Elastic::IndexingControl)
......@@ -82,7 +83,8 @@ module Elastic
def serialize(args, context)
{
args: args,
context: context
# Only include part of the context that would not prevent deduplication
context: context.slice(PROJECT_CONTEXT_KEY)
}.to_json
end
......
---
title: Limit the context for paused elasticsearch jobs
merge_request: 41297
author:
type: performance
......@@ -14,8 +14,16 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state
end
end
let(:worker_context) do
{ 'correlation_id' => 'context_correlation_id',
'meta.project' => 'gitlab-org/gitlab' }
end
let(:stored_context) do
{ "#{Labkit::Context::LOG_KEY}.project" => 'gitlab-org/gitlab' }
end
let(:worker_args) { [1, 2] }
let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } }
subject { described_class.new(worker_class) }
......@@ -79,6 +87,17 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state
2.times { subject.add_to_waiting_queue!(worker_args, worker_context) }
end.to change { subject.queue_size }.from(0).to(1)
end
it 'only stores `project` context information' do
subject.add_to_waiting_queue!(worker_args, worker_context)
subject.send(:with_redis) do |r|
set_key = subject.send(:redis_set_key)
stored_job = subject.send(:deserialize, r.zrange(set_key, 0, -1).first)
expect(stored_job['context']).to eq(stored_context)
end
end
end
describe '#has_jobs_in_waiting_queue?' do
......@@ -114,7 +133,7 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state
subject.add_to_waiting_queue!(j, worker_context)
end
expect(Labkit::Context).to receive(:with_context).with(worker_context).exactly(jobs.count).times.and_call_original
expect(Labkit::Context).to receive(:with_context).with(stored_context).exactly(jobs.count).times.and_call_original
expect(worker_class).to receive(:perform_async).exactly(jobs.count).times
expect { subject.resume_processing! }.to change { subject.has_jobs_in_waiting_queue? }.from(true).to(false)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment