Commit af1dacb3 authored by Bob Van Landuyt's avatar Bob Van Landuyt

Merge branch '241378_until_executed_deduplication_strategy' into 'master'

Add until_executed deduplication strategy

See merge request gitlab-org/gitlab!42223
parents fb97ca0d 4774e92c
......@@ -136,9 +136,10 @@ module Ci
# We are using optimistic locking combined with Redis locking to ensure
# that a chunk gets migrated properly.
#
# We are catching an exception related to an exclusive lock not being
# acquired because it is creating a lot of noise, and is a result of
# duplicated workers running in parallel for the same build trace chunk.
# We are using until_executed deduplication strategy for workers,
# which should prevent duplicated workers running in parallel for the same build trace,
# and causing an exception related to an exclusive lock not being
# acquired
#
def persist_data!
in_lock(*lock_params) do # exclusive Redis lock is acquired first
......@@ -150,6 +151,8 @@ module Ci
end
rescue FailedToObtainLockError
metrics.increment_trace_operation(operation: :stalled)
raise FailedToPersistDataError, 'Data migration failed due to a worker duplication'
rescue ActiveRecord::StaleObjectError
raise FailedToPersistDataError, <<~MSG
Data migration race condition detected
......
......@@ -5,6 +5,8 @@ module Ci
include ApplicationWorker
include PipelineBackgroundQueue
deduplicate :until_executed
idempotent!
# rubocop: disable CodeReuse/ActiveRecord
......
---
title: Add until_executed deduplication strategy
merge_request: 42223
author:
type: added
......@@ -165,6 +165,22 @@ job. The work is skipped because the same work would be
done by the job that was scheduled first; by the time the second
job executed, the first job would do nothing.
#### Strategies
GitLab supports two deduplication strategies:
- `until_executing`
- `until_executed`
More [deduplication strategies have been
suggested](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/195). If
you are implementing a worker that could benefit from a different
strategy, please comment in the issue.
##### Until Executing
This strategy takes a lock when a job is added to the queue, and removes that lock before the job starts.
For example, `AuthorizedProjectsWorker` takes a user ID. When the
worker runs, it recalculates a user's authorizations. GitLab schedules
this job each time an action potentially changes a user's
......@@ -173,10 +189,47 @@ same time, the second job can be skipped if the first job hasn't
begun, because when the first job runs, it creates the
authorizations for both projects.
```ruby
module AuthorizedProjectUpdate
class UserRefreshOverUserRangeWorker
include ApplicationWorker
deduplicate :until_executing
idempotent!
# ...
end
end
```
##### Until Executed
This strategy takes a lock when a job is added to the queue, and removes that lock after the job finishes.
It can be used to prevent jobs from running simultaneously multiple times.
```ruby
module Ci
class BuildTraceChunkFlushWorker
include ApplicationWorker
deduplicate :until_executed
idempotent!
# ...
end
end
```
#### Scheduling jobs in the future
GitLab doesn't skip jobs scheduled in the future, as we assume that
the state will have changed by the time the job is scheduled to
execute. If you do want to deduplicate jobs scheduled in the future
this can be specified on the worker as follows:
execute. Deduplication of jobs scheduled in the feature is possible
for both `until_executed` and `until_executing` strategies.
If you do want to deduplicate jobs scheduled in the future,
this can be specified on the worker by passing `including_scheduled: true` argument
when defining deduplication strategy:
```ruby
module AuthorizedProjectUpdate
......@@ -191,11 +244,7 @@ module AuthorizedProjectUpdate
end
```
This strategy is called `until_executing`. More [deduplication
strategies have been
suggested](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/195). If
you are implementing a worker that could benefit from a different
strategy, please comment in the issue.
#### Troubleshooting
If the automatic deduplication were to cause issues in certain
queues. This can be temporarily disabled by enabling a feature flag
......
......@@ -8,6 +8,7 @@ module Gitlab
STRATEGIES = {
until_executing: UntilExecuting,
until_executed: UntilExecuted,
none: None
}.freeze
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
class Base
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
def schedule(job)
raise NotImplementedError
end
def perform(_job)
raise NotImplementedError
end
private
attr_reader :duplicate_job
def strategy_name
self.class.name.to_s.demodulize.underscore.humanize.downcase
end
def check!
# The default expiry time is the DuplicateJob::DUPLICATE_KEY_TTL already
# Only the strategies de-duplicating when scheduling
duplicate_job.check!
end
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
module DeduplicatesWhenScheduling
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
def schedule(job)
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped #{strategy_name}", duplicate_job.options)
return false
end
end
yield
end
private
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
def check!
duplicate_job.check!(expiry)
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end
end
end
end
end
......@@ -5,10 +5,7 @@ module Gitlab
module DuplicateJobs
module Strategies
# This strategy will never deduplicate a job
class None
def initialize(_duplicate_job)
end
class None < Base
def schedule(_job)
yield
end
......
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module DuplicateJobs
module Strategies
# This strategy takes a lock before scheduling the job in a queue and
# removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing.
class UntilExecuted < Base
include DeduplicatesWhenScheduling
def perform(_job)
yield
duplicate_job.delete!
end
end
end
end
end
end
......@@ -7,50 +7,14 @@ module Gitlab
# This strategy takes a lock before scheduling the job in a queue and
# removes the lock before the job starts allowing a new job to be queued
# while a job is still executing.
class UntilExecuting
def initialize(duplicate_job)
@duplicate_job = duplicate_job
end
def schedule(job)
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(
job, "dropped until executing", duplicate_job.options)
return false
end
end
yield
end
class UntilExecuting < Base
include DeduplicatesWhenScheduling
def perform(_job)
duplicate_job.delete!
yield
end
private
attr_reader :duplicate_job
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
def check!
duplicate_job.check!(expiry)
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end
end
end
......
......@@ -3,79 +3,84 @@
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Client, :clean_gitlab_redis_queues do
let(:worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
shared_context 'deduplication worker class' do |strategy, including_scheduled|
let(:worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
deduplicate strategy, including_scheduled: including_scheduled
include ApplicationWorker
include ApplicationWorker
def perform(*args)
def perform(*args)
end
end
end
end
before do
stub_const('TestDeduplicationWorker', worker_class)
before do
stub_const('TestDeduplicationWorker', worker_class)
end
end
describe '#call' do
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.bulk_perform_async([['args1'], ['args2'], ['args1']])
shared_examples 'client duplicate job' do |strategy|
describe '#call' do
include_context 'deduplication worker class', strategy, false
job1, job2, job3 = TestDeduplicationWorker.jobs
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to be_nil
expect(job3['duplicate-of']).to eq(job1['jid'])
end
context 'without scheduled deduplication' do
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.bulk_perform_async([['args1'], ['args2'], ['args1']])
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
job1, job2, job3 = TestDeduplicationWorker.jobs
expect(duplicates).to all(be_nil)
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to be_nil
expect(job3['duplicate-of']).to eq(job1['jid'])
end
end
context 'with scheduled deduplication' do
let(:scheduled_worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
context 'without scheduled deduplication' do
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
deduplicate :until_executing, including_scheduled: true
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
def perform(*args)
end
expect(duplicates).to all(be_nil)
end
end
before do
stub_const('TestDeduplicationWorker', scheduled_worker_class)
end
context 'with scheduled deduplication' do
include_context 'deduplication worker class', strategy, true
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args2')
before do
stub_const('TestDeduplicationWorker', worker_class)
end
job1, job2, job3, job4 = TestDeduplicationWorker.jobs
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args2')
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to eq(job1['jid'])
expect(job3['duplicate-of']).to eq(job1['jid'])
expect(job4['duplicate-of']).to be_nil
job1, job2, job3, job4 = TestDeduplicationWorker.jobs
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to eq(job1['jid'])
expect(job3['duplicate-of']).to eq(job1['jid'])
expect(job4['duplicate-of']).to be_nil
end
end
end
end
context 'with until_executing strategy' do
it_behaves_like 'client duplicate job', :until_executing
end
context 'with until_executed strategy' do
it_behaves_like 'client duplicate job', :until_executed
end
end
......@@ -3,39 +3,71 @@
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Server, :clean_gitlab_redis_queues do
let(:worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
shared_context 'server duplicate job' do |strategy|
let(:worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
deduplicate strategy
def perform(*args)
self.class.work
end
def self.work
end
end
end
include ApplicationWorker
before do
stub_const('TestDeduplicationWorker', worker_class)
end
def perform(*args)
around do |example|
with_sidekiq_server_middleware do |chain|
chain.add described_class
Sidekiq::Testing.inline! { example.run }
end
end
end
before do
stub_const('TestDeduplicationWorker', worker_class)
end
context 'with until_executing strategy' do
include_context 'server duplicate job', :until_executing
around do |example|
with_sidekiq_server_middleware do |chain|
chain.add described_class
Sidekiq::Testing.inline! { example.run }
describe '#call' do
it 'removes the stored job from redis before execution' do
bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.and_return(job_definition).twice # once in client middleware
expect(job_definition).to receive(:delete!).ordered.and_call_original
expect(TestDeduplicationWorker).to receive(:work).ordered.and_call_original
TestDeduplicationWorker.perform_async('hello')
end
end
end
describe '#call' do
it 'removes the stored job from redis' do
context 'with until_executed strategy' do
include_context 'server duplicate job', :until_executed
it 'removes the stored job from redis after execution' do
bare_job = { 'class' => 'TestDeduplicationWorker', 'args' => ['hello'] }
job_definition = Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob.new(bare_job.dup, 'test_deduplication')
expect(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
.to receive(:new).with(a_hash_including(bare_job), 'test_deduplication')
.and_return(job_definition).twice # once in client middleware
expect(job_definition).to receive(:delete!).and_call_original
expect(TestDeduplicationWorker).to receive(:work).ordered.and_call_original
expect(job_definition).to receive(:delete!).ordered.and_call_original
TestDeduplicationWorker.perform_async('hello')
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuted do
it_behaves_like 'deduplicating jobs when scheduling', :until_executed do
describe '#perform' do
let(:proc) { -> {} }
it 'deletes the lock after executing' do
expect(proc).to receive(:call).ordered
expect(fake_duplicate_job).to receive(:delete!).ordered
strategy.perform({}) do
proc.call
end
end
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
require 'spec_helper'
RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
let(:fake_duplicate_job) do
instance_double(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
end
subject(:strategy) { described_class.new(fake_duplicate_job) }
describe '#schedule' do
before do
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:log)
end
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:scheduled?).twice.ordered.and_return(false)
expect(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.ordered
.and_return('a jid'))
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'checks worker options for scheduled jobs' do
expect(fake_duplicate_job).to receive(:scheduled?).ordered.and_return(true)
expect(fake_duplicate_job).to receive(:options).ordered.and_return({})
expect(fake_duplicate_job).not_to receive(:check!)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
context 'job marking' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {}
it_behaves_like 'deduplicating jobs when scheduling', :until_executing do
describe '#perform' do
let(:proc) { -> {} }
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:delete!).ordered
expect(proc).to receive(:call).ordered
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
context 'scheduled jobs' do
let(:time_diff) { 1.minute }
context 'scheduled in the past' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now - time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
strategy.perform({}) do
proc.call
end
context 'scheduled in the future' do
it 'adds the jid of the existing job to the job hash' do
freeze_time do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now + time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
end
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
end
it 'drops the job' do
schedule_result = nil
expect(fake_duplicate_job).to receive(:droppable?).and_return(true)
expect { |b| schedule_result = strategy.schedule({}, &b) }.not_to yield_control
expect(schedule_result).to be(false)
end
it 'logs that the job was dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), 'dropped until executing', {})
strategy.schedule({ 'jid' => 'new jid' }) {}
end
it 'logs the deduplication options of the worker' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
allow(fake_duplicate_job).to receive(:options).and_return({ foo: :bar })
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), 'dropped until executing', { foo: :bar })
strategy.schedule({ 'jid' => 'new jid' }) {}
end
end
end
describe '#perform' do
it 'deletes the lock before executing' do
expect(fake_duplicate_job).to receive(:delete!).ordered
expect { |b| strategy.perform({}, &b) }.to yield_control
end
end
end
......@@ -8,6 +8,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies do
expect(described_class.for(:until_executing)).to eq(described_class::UntilExecuting)
end
it 'returns the right class for `until_executed`' do
expect(described_class.for(:until_executed)).to eq(described_class::UntilExecuted)
end
it 'returns the right class for `none`' do
expect(described_class.for(:none)).to eq(described_class::None)
end
......
......@@ -594,23 +594,19 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
context 'when the chunk is being locked by a different worker' do
let(:metrics) { spy('metrics') }
it 'does not raise an exception' do
lock_chunk do
expect { build_trace_chunk.persist_data! }.not_to raise_error
end
end
it 'increments stalled chunk trace metric' do
allow(build_trace_chunk)
.to receive(:metrics)
.and_return(metrics)
lock_chunk { build_trace_chunk.persist_data! }
expect do
subject
expect(metrics)
.to have_received(:increment_trace_operation)
.with(operation: :stalled)
.once
expect(metrics)
.to have_received(:increment_trace_operation)
.with(operation: :stalled)
.once
end.to raise_error(described_class::FailedToPersistDataError)
end
def lock_chunk(&block)
......
# frozen_string_literal: true
RSpec.shared_examples 'deduplicating jobs when scheduling' do |strategy_name|
let(:fake_duplicate_job) do
instance_double(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob)
end
let(:expected_message) { "dropped #{strategy_name.to_s.humanize.downcase}" }
subject(:strategy) { Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies.for(strategy_name).new(fake_duplicate_job) }
describe '#schedule' do
before do
allow(Gitlab::SidekiqLogging::DeduplicationLogger.instance).to receive(:log)
end
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:scheduled?).twice.ordered.and_return(false)
expect(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.ordered
.and_return('a jid'))
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'checks worker options for scheduled jobs' do
expect(fake_duplicate_job).to receive(:scheduled?).ordered.and_return(true)
expect(fake_duplicate_job).to receive(:options).ordered.and_return({})
expect(fake_duplicate_job).not_to receive(:check!)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
context 'job marking' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
context 'scheduled jobs' do
let(:time_diff) { 1.minute }
context 'scheduled in the past' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now - time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
context 'scheduled in the future' do
it 'adds the jid of the existing job to the job hash' do
freeze_time do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now + time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
end
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:options).and_return({})
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
end
it 'drops the job' do
schedule_result = nil
expect(fake_duplicate_job).to receive(:droppable?).and_return(true)
expect { |b| schedule_result = strategy.schedule({}, &b) }.not_to yield_control
expect(schedule_result).to be(false)
end
it 'logs that the job was dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, {})
strategy.schedule({ 'jid' => 'new jid' }) {}
end
it 'logs the deduplication options of the worker' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
allow(fake_duplicate_job).to receive(:options).and_return({ foo: :bar })
expect(fake_logger).to receive(:log).with(a_hash_including({ 'jid' => 'new jid' }), expected_message, { foo: :bar })
strategy.schedule({ 'jid' => 'new jid' }) {}
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment