Commit 242ef90d authored by Oswaldo Ferreira's avatar Oswaldo Ferreira

Merge branch 'if-218380-deduplicate_scheduled_jobs' into 'master'

Deduplicate scheduled Sidekiq jobs

See merge request gitlab-org/gitlab!33972
parents bf4c17b9 fa41f376
......@@ -5,6 +5,7 @@ module AuthorizedProjectUpdate
feature_category :authentication_and_authorization
urgency :low
queue_namespace :authorized_project_update
deduplicate :until_executing, including_scheduled: true
idempotent!
end
......
......@@ -119,6 +119,20 @@ module WorkerAttributes
Array(worker_attributes[:tags])
end
def deduplicate(strategy, options = {})
worker_attributes[:deduplication_strategy] = strategy
worker_attributes[:deduplication_options] = options
end
def get_deduplicate_strategy
worker_attributes[:deduplication_strategy] ||
Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DEFAULT_STRATEGY
end
def get_deduplication_options
worker_attributes[:deduplication_options] || {}
end
protected
# Returns a worker attribute declared on this class or its parent class.
......
......@@ -5,9 +5,6 @@ module Gitlab
module DuplicateJobs
class Client
def call(worker_class, job, queue, _redis_pool, &block)
# We don't try to deduplicate jobs that are scheduled in the future
return yield if job['at']
DuplicateJob.new(job, queue).schedule(&block)
end
end
......
......@@ -18,13 +18,13 @@ module Gitlab
# When new jobs can be scheduled again, the strategy calls `#delete`.
class DuplicateJob
DUPLICATE_KEY_TTL = 6.hours
DEFAULT_STRATEGY = :until_executing
attr_reader :existing_jid
def initialize(job, queue_name, strategy: :until_executing)
def initialize(job, queue_name)
@job = job
@queue_name = queue_name
@strategy = strategy
end
# This will continue the middleware chain if the job should be scheduled
......@@ -41,12 +41,12 @@ module Gitlab
end
# This method will return the jid that was set in redis
def check!
def check!(expiry = DUPLICATE_KEY_TTL)
read_jid = nil
Sidekiq.redis do |redis|
redis.multi do |multi|
redis.set(idempotency_key, jid, ex: DUPLICATE_KEY_TTL, nx: true)
redis.set(idempotency_key, jid, ex: expiry, nx: true)
read_jid = redis.get(idempotency_key)
end
end
......@@ -60,6 +60,10 @@ module Gitlab
end
end
def scheduled?
scheduled_at.present?
end
def duplicate?
raise "Call `#check!` first to check for existing duplicates" unless existing_jid
......@@ -67,14 +71,36 @@ module Gitlab
end
def droppable?
idempotent? && duplicate? && ::Feature.disabled?("disable_#{queue_name}_deduplication")
idempotent? && ::Feature.disabled?("disable_#{queue_name}_deduplication")
end
def scheduled_at
job['at']
end
def options
return {} unless worker_klass
return {} unless worker_klass.respond_to?(:get_deduplication_options)
worker_klass.get_deduplication_options
end
private
attr_reader :queue_name, :strategy, :job
attr_reader :queue_name, :job
attr_writer :existing_jid
def worker_klass
@worker_klass ||= worker_class_name.to_s.safe_constantize
end
def strategy
return DEFAULT_STRATEGY unless worker_klass
return DEFAULT_STRATEGY unless worker_klass.respond_to?(:idempotent?)
worker_klass.get_deduplicate_strategy
end
def worker_class_name
job['class']
end
......@@ -104,11 +130,10 @@ module Gitlab
end
def idempotent?
worker_class = worker_class_name.to_s.safe_constantize
return false unless worker_class
return false unless worker_class.respond_to?(:idempotent?)
return false unless worker_klass
return false unless worker_klass.respond_to?(:idempotent?)
worker_class.idempotent?
worker_klass.idempotent?
end
end
end
......
......@@ -13,13 +13,13 @@ module Gitlab
end
def schedule(job)
if duplicate_job.check! && duplicate_job.duplicate?
if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid
end
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
return false
if duplicate_job.droppable?
Gitlab::SidekiqLogging::DeduplicationLogger.instance.log(job, "dropped until executing")
return false
end
end
yield
......@@ -34,6 +34,22 @@ module Gitlab
private
attr_reader :duplicate_job
def deduplicatable_job?
!duplicate_job.scheduled? || duplicate_job.options[:including_scheduled]
end
def check!
duplicate_job.check!(expiry)
end
def expiry
return DuplicateJob::DUPLICATE_KEY_TTL unless duplicate_job.scheduled?
time_diff = duplicate_job.scheduled_at.to_i - Time.now.to_i
time_diff > 0 ? time_diff : DuplicateJob::DUPLICATE_KEY_TTL
end
end
end
end
......
......@@ -31,14 +31,51 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Client, :clean_gitlab_redis_q
expect(job3['duplicate-of']).to eq(job1['jid'])
end
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
context 'without scheduled deduplication' do
it "does not mark a job that's scheduled in the future as a duplicate" do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
duplicates = TestDeduplicationWorker.jobs.map { |job| job['duplicate-of'] }
expect(duplicates).to all(be_nil)
expect(duplicates).to all(be_nil)
end
end
context 'with scheduled deduplication' do
let(:scheduled_worker_class) do
Class.new do
def self.name
'TestDeduplicationWorker'
end
include ApplicationWorker
deduplicate :until_executing, including_scheduled: true
def perform(*args)
end
end
end
before do
stub_const('TestDeduplicationWorker', scheduled_worker_class)
end
it 'adds a correct duplicate tag to the jobs', :aggregate_failures do
TestDeduplicationWorker.perform_async('args1')
TestDeduplicationWorker.perform_at(1.day.from_now, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args1')
TestDeduplicationWorker.perform_in(3.hours, 'args2')
job1, job2, job3, job4 = TestDeduplicationWorker.jobs
expect(job1['duplicate-of']).to be_nil
expect(job2['duplicate-of']).to eq(job1['jid'])
expect(job3['duplicate-of']).to eq(job1['jid'])
expect(job4['duplicate-of']).to be_nil
end
end
end
end
......@@ -93,6 +93,25 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end
end
describe '#scheduled?' do
it 'returns false for non-scheduled jobs' do
expect(duplicate_job.scheduled?).to be(false)
end
context 'scheduled jobs' do
let(:job) do
{ 'class' => 'AuthorizedProjectsWorker',
'args' => [1],
'jid' => '123',
'at' => 42 }
end
it 'returns true' do
expect(duplicate_job.scheduled?).to be(true)
end
end
end
describe '#duplicate?' do
it "raises an error if the check wasn't performed" do
expect { duplicate_job.duplicate? }.to raise_error /Call `#check!` first/
......@@ -112,28 +131,23 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end
end
describe 'droppable?' do
where(:idempotent, :duplicate, :prevent_deduplication) do
# [true, false].repeated_permutation(3)
[[true, true, true],
[true, true, false],
[true, false, true],
[true, false, false],
[false, true, true],
[false, true, false],
[false, false, true],
[false, false, false]]
describe '#droppable?' do
where(:idempotent, :prevent_deduplication) do
# [true, false].repeated_permutation(2)
[[true, true],
[true, false],
[false, true],
[false, false]]
end
with_them do
before do
allow(AuthorizedProjectsWorker).to receive(:idempotent?).and_return(idempotent)
allow(duplicate_job).to receive(:duplicate?).and_return(duplicate)
stub_feature_flags("disable_#{queue}_deduplication" => prevent_deduplication)
end
it 'is droppable when all conditions are met' do
if idempotent && duplicate && !prevent_deduplication
if idempotent && !prevent_deduplication
expect(duplicate_job).to be_droppable
else
expect(duplicate_job).not_to be_droppable
......@@ -142,6 +156,31 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gitlab_r
end
end
describe '#scheduled_at' do
let(:scheduled_at) { 42 }
let(:job) do
{ 'class' => 'AuthorizedProjectsWorker',
'args' => [1],
'jid' => '123',
'at' => scheduled_at }
end
it 'returns when the job is scheduled at' do
expect(duplicate_job.scheduled_at).to eq(scheduled_at)
end
end
describe '#options' do
let(:worker_options) { { foo: true } }
it 'returns worker options' do
allow(AuthorizedProjectsWorker).to(
receive(:get_deduplication_options).and_return(worker_options))
expect(duplicate_job.options).to eq(worker_options)
end
end
def set_idempotency_key(key, value = '1')
Sidekiq.redis { |r| r.set(key, value) }
end
......
# frozen_string_literal: true
require 'fast_spec_helper'
require 'timecop'
describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
let(:fake_duplicate_job) do
......@@ -15,28 +16,90 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
end
it 'checks for duplicates before yielding' do
expect(fake_duplicate_job).to receive(:check!).ordered.and_return('a jid')
expect(fake_duplicate_job).to receive(:scheduled?).twice.ordered.and_return(false)
expect(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.ordered
.and_return('a jid'))
expect(fake_duplicate_job).to receive(:duplicate?).ordered.and_return(false)
expect(fake_duplicate_job).to receive(:droppable?).ordered.and_return(false)
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
it 'checks worker options for scheduled jobs' do
expect(fake_duplicate_job).to receive(:scheduled?).ordered.and_return(true)
expect(fake_duplicate_job).to receive(:options).ordered.and_return({})
expect(fake_duplicate_job).not_to receive(:check!)
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
expect { |b| strategy.schedule({}, &b) }.to yield_control
end
context 'job marking' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
strategy.schedule(job_hash) {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
expect(job_hash).to include('duplicate-of' => 'the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
context 'scheduled jobs' do
let(:time_diff) { 1.minute }
context 'scheduled in the past' do
it 'adds the jid of the existing job to the job hash' do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now - time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!)
.with(Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DUPLICATE_KEY_TTL)
.and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
context 'scheduled in the future' do
it 'adds the jid of the existing job to the job hash' do
Timecop.freeze do
allow(fake_duplicate_job).to receive(:scheduled?).twice.and_return(true)
allow(fake_duplicate_job).to receive(:scheduled_at).and_return(Time.now + time_diff)
allow(fake_duplicate_job).to receive(:options).and_return({ including_scheduled: true })
allow(fake_duplicate_job).to(
receive(:check!).with(time_diff.to_i).and_return('the jid'))
allow(fake_duplicate_job).to receive(:droppable?).and_return(true)
job_hash = {}
expect(fake_duplicate_job).to receive(:duplicate?).and_return(true)
expect(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
strategy.schedule(job_hash) {}
expect(job_hash).to include('duplicate-of' => 'the jid')
end
end
end
end
end
context "when the job is droppable" do
before do
allow(fake_duplicate_job).to receive(:scheduled?).and_return(false)
allow(fake_duplicate_job).to receive(:check!).and_return('the jid')
allow(fake_duplicate_job).to receive(:duplicate?).and_return(true)
allow(fake_duplicate_job).to receive(:existing_jid).and_return('the jid')
......@@ -52,7 +115,7 @@ describe Gitlab::SidekiqMiddleware::DuplicateJobs::Strategies::UntilExecuting do
expect(schedule_result).to be(false)
end
it 'logs that the job wass dropped' do
it 'logs that the job was dropped' do
fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment