Commit f66f700f authored by Fabio Pitino's avatar Fabio Pitino

Merge branch 'bvl-limit-elastic-control-context' into 'master'

Limit the context for paused elasticsearch jobs

See merge request gitlab-org/gitlab!41297
parents 98e6883a 8e6bb77d
...@@ -6,6 +6,7 @@ module Elastic ...@@ -6,6 +6,7 @@ module Elastic
# This class should only be used with sidekiq workers which extend Elastic::IndexingControl module # This class should only be used with sidekiq workers which extend Elastic::IndexingControl module
class IndexingControlService class IndexingControlService
LIMIT = 1000 LIMIT = 1000
PROJECT_CONTEXT_KEY = "#{Labkit::Context::LOG_KEY}.project"
def initialize(klass) def initialize(klass)
raise ArgumentError, "passed class must extend Elastic::IndexingControl" unless klass.include?(Elastic::IndexingControl) raise ArgumentError, "passed class must extend Elastic::IndexingControl" unless klass.include?(Elastic::IndexingControl)
...@@ -82,7 +83,8 @@ module Elastic ...@@ -82,7 +83,8 @@ module Elastic
def serialize(args, context) def serialize(args, context)
{ {
args: args, args: args,
context: context # Only include part of the context that would not prevent deduplication
context: context.slice(PROJECT_CONTEXT_KEY)
}.to_json }.to_json
end end
......
---
title: Limit the context for paused elasticsearch jobs
merge_request: 41297
author:
type: performance
...@@ -14,8 +14,16 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state ...@@ -14,8 +14,16 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state
end end
end end
let(:worker_context) do
{ 'correlation_id' => 'context_correlation_id',
'meta.project' => 'gitlab-org/gitlab' }
end
let(:stored_context) do
{ "#{Labkit::Context::LOG_KEY}.project" => 'gitlab-org/gitlab' }
end
let(:worker_args) { [1, 2] } let(:worker_args) { [1, 2] }
let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } }
subject { described_class.new(worker_class) } subject { described_class.new(worker_class) }
...@@ -79,6 +87,17 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state ...@@ -79,6 +87,17 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state
2.times { subject.add_to_waiting_queue!(worker_args, worker_context) } 2.times { subject.add_to_waiting_queue!(worker_args, worker_context) }
end.to change { subject.queue_size }.from(0).to(1) end.to change { subject.queue_size }.from(0).to(1)
end end
it 'only stores `project` context information' do
subject.add_to_waiting_queue!(worker_args, worker_context)
subject.send(:with_redis) do |r|
set_key = subject.send(:redis_set_key)
stored_job = subject.send(:deserialize, r.zrange(set_key, 0, -1).first)
expect(stored_job['context']).to eq(stored_context)
end
end
end end
describe '#has_jobs_in_waiting_queue?' do describe '#has_jobs_in_waiting_queue?' do
...@@ -114,7 +133,7 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state ...@@ -114,7 +133,7 @@ RSpec.describe Elastic::IndexingControlService, :clean_gitlab_redis_shared_state
subject.add_to_waiting_queue!(j, worker_context) subject.add_to_waiting_queue!(j, worker_context)
end end
expect(Labkit::Context).to receive(:with_context).with(worker_context).exactly(jobs.count).times.and_call_original expect(Labkit::Context).to receive(:with_context).with(stored_context).exactly(jobs.count).times.and_call_original
expect(worker_class).to receive(:perform_async).exactly(jobs.count).times expect(worker_class).to receive(:perform_async).exactly(jobs.count).times
expect { subject.resume_processing! }.to change { subject.has_jobs_in_waiting_queue? }.from(true).to(false) expect { subject.resume_processing! }.to change { subject.has_jobs_in_waiting_queue? }.from(true).to(false)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment