Commit f40aa0d3 authored by Imre Farkas's avatar Imre Farkas

Deduplicate scheduled Sidekiq jobs

Deduplication for scheduled jobs is disabled by default, and is
controlled via the 'deduplicate' worker attribute.
parent aa69348a
......@@ -119,6 +119,20 @@ module WorkerAttributes
Array(worker_attributes[:tags])
end
def deduplicate(strategy, options = {})
worker_attributes[:deduplication_strategy] = strategy
worker_attributes[:deduplication_options] = options
end
def get_deduplicate_strategy
worker_attributes[:deduplication_strategy] ||
Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DEFAULT_STRATEGY
end
def get_deduplication_options
worker_attributes[:deduplication_options] || {}
end
protected
# Returns a worker attribute declared on this class or its parent class.
......
......@@ -5,9 +5,6 @@ module Gitlab
module DuplicateJobs
class Client
def call(worker_class, job, queue, _redis_pool, &block)
# We don't try to deduplicate jobs that are scheduled in the future
return yield if job['at']
DuplicateJob.new(job, queue).schedule(&block)
end
end
......
......@@ -18,13 +18,13 @@ module Gitlab
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
DUPLICATE_KEY_TTL = 6.hours
DEFAULT_STRATEGY = :until_executing
attr_reader :existing_jid
def initialize(job, queue_name, strategy: :until_executing)
def initialize(job, queue_name)
@job = job
@queue_name = queue_name
@strategy = strategy
end
# This will continue the middleware chain if the job should be scheduled
......@@ -41,12 +41,12 @@ module Gitlab
end
# This method will return the jid that was set in redis
def check!
def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil
Sidekiq.redis do |redis|
redis.multi do |multi|
redis.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL, nx: true)
redis.set(idempotency_key, jid, ex: expiry, nx: true)
read_jid = redis.get(idempotency_key)
end
end
......@@ -60,6 +60,10 @@ module Gitlab
end
end
def scheduled?
scheduled_at.present?
end
def duplicate?
raise "Call `#check!` first to check for existing duplicates" unless existing_jid
......@@ -67,14 +71,36 @@ module Gitlab
end
def droppable?
idempotent? && duplicate? && ::Feature.disabled?("disable_#{queue_name}_deduplication")
idempotent? && ::Feature.disabled?("disable_#{queue_name}_deduplication")
end
def scheduled_at
job['at']
end
def options
return {} unless worker_klass
return {} unless worker_klass.respond_to?(:get_deduplication_options)
worker_klass.get_deduplication_options
end
private
attr_reader :queue_name, :strategy, :job
attr_reader :queue_name, :job
attr_writer :existing_jid
def worker_klass
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
def strategy
return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
worker_klass.get_deduplicate_strategy
end
def worker_class_name
job['class']
end
......@@ -104,11 +130,10 @@ module Gitlab
end
def idempotent?
worker_class = worker_class_name.to_s.safe_constantize
return false unless worker_class
return false unless worker_class.respond_to?(:idempotent?)
return false unless worker_klass
return false unless worker_klass.respond_to?(:idempotent?)
worker_class.idempotent?
worker_klass.idempotent?
end
end
end
......
......@@ -13,13 +13,13 @@ module Gitlab
end
def schedule(job)
if duplicate_job.check! && duplicate_job.duplicate?
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
end
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
return false
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
return false
end
end
yield
......@@ -34,6 +34,22 @@ module Gitlab
private
attr_reader :duplicate_job
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
def check!
duplicate_job.check!(expiry)
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end
end
end
......
......@@ -31,14 +31,51 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Client, :clean_gitlab_redis_q
expect(job3['duplicate-of']).to eq(job1['jid'])
end
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
context 'without scheduled deduplication' do
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
expect(duplicates).to all(be_nil)
expect(duplicates).to all(be_nil)
end
end
context 'with scheduled deduplication' do
let(:scheduled_worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
deduplicate :until_executing, including_scheduled: true
def perform(*args)
end
end
end
before do
stub_const('TestDeduplicationWorker', scheduled_worker_class)
end
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args2')
job1, job2, job3, job4 = TestDeduplicationWorker.jobs
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to eq(job1['jid'])
expect(job3['duplicate-of']).to eq(job1['jid'])
expect(job4['duplicate-of']).to be_nil
end
end
end
end
......@@ -93,6 +93,25 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end
end
describe '#scheduled?' do
it 'returns false for non-scheduled jobs' do
expect(duplicate_job.scheduled?).to be(false)
end
context 'scheduled jobs' do
let(:job) do
{ 'class' => 'AuthorizedProjectsWorker',
'args' => [1],
'jid' => '123',
'at' => 42 }
end
it 'returns true' do
expect(duplicate_job.scheduled?).to be(true)
end
end
end
describe '#duplicate?' do
it "raises an error if the check wasn't performed" do
expect { duplicate_job.duplicate? }.to raise_error /Call `#check!` first/
......@@ -112,28 +131,23 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end
end
describe 'droppable?' do
where(:idempotent, :duplicate, :prevent_deduplication) do
# [true, false].repeated_permutation(3)
[[true, true, true],
[true, true, false],
[true, false, true],
[true, false, false],
[false, true, true],
[false, true, false],
[false, false, true],
[false, false, false]]
describe '#droppable?' do
where(:idempotent, :prevent_deduplication) do
# [true, false].repeated_permutation(2)
[[true, true],
[true, false],
[false, true],
[false, false]]
end
with_them do
before do
allow(AuthorizedProjectsWorker).to receive(:idempotent?).and_return(idempotent)
allow(duplicate_job).to receive(:duplicate?).and_return(duplicate)
stub_feature_flags("disable_#{queue}_deduplication" => prevent_deduplication)
end
it 'is droppable when all conditions are met' do
if idempotent && duplicate && !prevent_deduplication
if idempotent && !prevent_deduplication
expect(duplicate_job).to be_droppable
else
expect(duplicate_job).not_to be_droppable
......@@ -142,6 +156,31 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end
end
describe '#scheduled_at' do
let(:scheduled_at) { 42 }
let(:job) do
{ 'class' => 'AuthorizedProjectsWorker',
'args' => [1],
'jid' => '123',
'at' => scheduled_at }
end
it 'returns when the job is scheduled at' do
expect(duplicate_job.scheduled_at).to eq(scheduled_at)
end
end
describe '#options' do
let(:worker_options) { { foo: true } }
it 'returns worker options' do
allow(AuthorizedProjectsWorker).to(
receive(:get_deduplication_options).and_return(worker_options))
expect(duplicate_job.options).to eq(worker_options)
end
end
def set_idempotency_key(key, value = '1')
Sidekiq.redis { |r| r.set(key, value) }
end
......
# frozen_string_literal: true
require 'fast_spec_helper'
require 'timecop'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
let(:fake_duplicate_job) do
......@@ -15,28 +16,90 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
end
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:check!).ordered.and_return('a jid')
expect(fake_duplicate_job).to receive(:scheduled?).twice.ordered.and_return(false)
expect(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.ordered
.and_return('a jid'))
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect(fake_duplicate_job).to receive(:droppable?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
it 'checks worker options for scheduled jobs' do
expect(fake_duplicate_job).to receive(:scheduled?).ordered.and_return(true)
expect(fake_duplicate_job).to receive(:options).ordered.and_return({})
expect(fake_duplicate_job).not_to receive(:check!)
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
context 'job marking' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
strategy.schedule(job_hash) {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
expect(job_hash).to include('duplicate-of' => 'the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
context 'scheduled jobs' do
let(:time_diff) { 1.minute }
context 'scheduled in the past' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now - time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
context 'scheduled in the future' do
it 'adds the jid of the existing job to the job hash' do
Timecop.freeze do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now + time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
end
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
......@@ -52,7 +115,7 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
expect(schedule_result).to be(false)
end
it 'logs that the job wass dropped' do
it 'logs that the job was dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment