Commit ab5a4c2d authored by Mike Kozono's avatar Mike Kozono

Add Reenqueuer worker concern

It helps run exactly one instance of a worker, over and
over, until it returns false or raises.

If `perform` completes very quickly, it will sleep until
`minimum_duration` is reached (default 5 seconds).
parent c0b1e828
# frozen_string_literal: true
#
# A concern that helps run exactly one instance of a worker, over and over,
# until it returns false or raises.
#
# To ensure the worker is always up, you can schedule it every minute with
# sidekiq-cron. Excess jobs will immediately exit due to an exclusive lease.
#
# The worker must define:
#
# - `#perform`
# - `#lease_timeout`
#
# The worker spec should include `it_behaves_like 'reenqueuer'` and
# `it_behaves_like 'it is rate limited to 1 call per'`.
#
# Optionally override `#minimum_duration` to adjust the rate limit.
#
# When `#perform` returns false, the job will not be reenqueued. Instead, we
# will wait for the next one scheduled by sidekiq-cron.
#
# #lease_timeout should be longer than the longest possible `#perform`.
# The lease is normally released in an ensure block, but it is possible to
# orphan the lease by killing Sidekiq, so it should also be as short as
# possible. Consider that long-running jobs are generally not recommended.
# Ideally, every job finishes within 25 seconds because that is the default
# wait time for graceful termination.
#
# Timing: It runs as often as Sidekiq allows. We rate limit with sleep for
# now: https://gitlab.com/gitlab-org/gitlab/issues/121697
module Reenqueuer
extend ActiveSupport::Concern
prepended do
include ExclusiveLeaseGuard
include ReenqueuerSleeper
sidekiq_options retry: false
end
def perform(*args)
try_obtain_lease do
reenqueue(*args) do
ensure_minimum_duration(minimum_duration) do
super
end
end
end
end
private
def reenqueue(*args)
self.class.perform_async(*args) if yield
end
# Override as needed
def minimum_duration
5.seconds
end
# We intend to get rid of sleep:
# https://gitlab.com/gitlab-org/gitlab/issues/121697
module ReenqueuerSleeper
# The block will run, and then sleep until the minimum duration. Returns the
# block's return value.
#
# Usage:
#
# ensure_minimum_duration(5.seconds) do
# # do something
# end
#
def ensure_minimum_duration(minimum_duration)
start_time = Time.now
result = yield
sleep_if_time_left(minimum_duration, start_time)
result
end
private
def sleep_if_time_left(minimum_duration, start_time)
time_left = calculate_time_left(minimum_duration, start_time)
sleep(time_left) if time_left > 0
end
def calculate_time_left(minimum_duration, start_time)
minimum_duration - elapsed_time(start_time)
end
def elapsed_time(start_time)
Time.now - start_time
end
end
end
# frozen_string_literal: true
# Expects `worker_class` to be defined
shared_examples_for 'reenqueuer' do
subject(:job) { worker_class.new }
before do
allow(job).to receive(:sleep) # faster tests
end
it 'implements lease_timeout' do
expect(job.lease_timeout).to be_a(ActiveSupport::Duration)
end
describe '#perform' do
it 'tries to obtain a lease' do
expect_to_obtain_exclusive_lease(job.lease_key)
job.perform
end
end
end
# Example usage:
#
# it_behaves_like 'it is rate limited to 1 call per', 5.seconds do
# subject { described_class.new }
# let(:rate_limited_method) { subject.perform }
# end
#
shared_examples_for 'it is rate limited to 1 call per' do |minimum_duration|
before do
# Allow Timecop freeze and travel without the block form
Timecop.safe_mode = false
Timecop.freeze
time_travel_during_rate_limited_method(actual_duration)
end
after do
Timecop.return
Timecop.safe_mode = true
end
context 'when the work finishes in 0 seconds' do
let(:actual_duration) { 0 }
it 'sleeps exactly the minimum duration' do
expect(subject).to receive(:sleep).with(a_value_within(0.01).of(minimum_duration))
rate_limited_method
end
end
context 'when the work finishes in 10% of minimum duration' do
let(:actual_duration) { 0.1 * minimum_duration }
it 'sleeps 90% of minimum duration' do
expect(subject).to receive(:sleep).with(a_value_within(0.01).of(0.9 * minimum_duration))
rate_limited_method
end
end
context 'when the work finishes in 90% of minimum duration' do
let(:actual_duration) { 0.9 * minimum_duration }
it 'sleeps 10% of minimum duration' do
expect(subject).to receive(:sleep).with(a_value_within(0.01).of(0.1 * minimum_duration))
rate_limited_method
end
end
context 'when the work finishes exactly at minimum duration' do
let(:actual_duration) { minimum_duration }
it 'does not sleep' do
expect(subject).not_to receive(:sleep)
rate_limited_method
end
end
context 'when the work takes 10% longer than minimum duration' do
let(:actual_duration) { 1.1 * minimum_duration }
it 'does not sleep' do
expect(subject).not_to receive(:sleep)
rate_limited_method
end
end
context 'when the work takes twice as long as minimum duration' do
let(:actual_duration) { 2 * minimum_duration }
it 'does not sleep' do
expect(subject).not_to receive(:sleep)
rate_limited_method
end
end
def time_travel_during_rate_limited_method(actual_duration)
# Save the original implementation of ensure_minimum_duration
original_ensure_minimum_duration = subject.method(:ensure_minimum_duration)
allow(subject).to receive(:ensure_minimum_duration) do |minimum_duration, &block|
original_ensure_minimum_duration.call(minimum_duration) do
# Time travel inside the block inside ensure_minimum_duration
Timecop.travel(actual_duration) if actual_duration && actual_duration > 0
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Reenqueuer do
include ExclusiveLeaseHelpers
let_it_be(:worker_class) do
Class.new do
def self.name
'Gitlab::Foo::Bar::DummyWorker'
end
include ApplicationWorker
prepend Reenqueuer
attr_reader :performed_args
def perform(*args)
@performed_args = args
success? # for stubbing
end
def success?
false
end
def lease_timeout
30.seconds
end
end
end
subject(:job) { worker_class.new }
before do
allow(job).to receive(:sleep) # faster tests
end
it_behaves_like 'reenqueuer'
it_behaves_like 'it is rate limited to 1 call per', 5.seconds do
let(:rate_limited_method) { subject.perform }
end
it 'disables Sidekiq retries' do
expect(job.sidekiq_options_hash).to include('retry' => false)
end
describe '#perform', :clean_gitlab_redis_shared_state do
let(:arbitrary_args) { [:foo, 'bar', { a: 1 }] }
context 'when the lease is available' do
it 'does perform' do
job.perform(*arbitrary_args)
expect(job.performed_args).to eq(arbitrary_args)
end
end
context 'when the lease is taken' do
before do
stub_exclusive_lease_taken(job.lease_key)
end
it 'does not perform' do
job.perform(*arbitrary_args)
expect(job.performed_args).to be_nil
end
end
context 'when #perform returns truthy' do
before do
allow(job).to receive(:success?).and_return(true)
end
it 'reenqueues the worker' do
expect(worker_class).to receive(:perform_async)
job.perform
end
end
context 'when #perform returns falsey' do
it 'does not reenqueue the worker' do
expect(worker_class).not_to receive(:perform_async)
job.perform
end
end
end
end
describe Reenqueuer::ReenqueuerSleeper do
let_it_be(:dummy_class) do
Class.new do
include Reenqueuer::ReenqueuerSleeper
def rate_limited_method
ensure_minimum_duration(11.seconds) do
# do work
end
end
end
end
subject(:dummy) { dummy_class.new }
# Test that rate_limited_method is rate limited by ensure_minimum_duration
it_behaves_like 'it is rate limited to 1 call per', 11.seconds do
let(:rate_limited_method) { dummy.rate_limited_method }
end
# Test ensure_minimum_duration more directly
describe '#ensure_minimum_duration' do
around do |example|
# Allow Timecop.travel without the block form
Timecop.safe_mode = false
Timecop.freeze do
example.run
end
Timecop.safe_mode = true
end
let(:minimum_duration) { 4.seconds }
context 'when the block completes well before the minimum duration' do
let(:time_left) { 3.seconds }
it 'sleeps until the minimum duration' do
expect(dummy).to receive(:sleep).with(a_value_within(0.01).of(time_left))
dummy.ensure_minimum_duration(minimum_duration) do
Timecop.travel(minimum_duration - time_left)
end
end
end
context 'when the block completes just before the minimum duration' do
let(:time_left) { 0.1.seconds }
it 'sleeps until the minimum duration' do
expect(dummy).to receive(:sleep).with(a_value_within(0.01).of(time_left))
dummy.ensure_minimum_duration(minimum_duration) do
Timecop.travel(minimum_duration - time_left)
end
end
end
context 'when the block completes just after the minimum duration' do
let(:time_over) { 0.1.seconds }
it 'does not sleep' do
expect(dummy).not_to receive(:sleep)
dummy.ensure_minimum_duration(minimum_duration) do
Timecop.travel(minimum_duration + time_over)
end
end
end
context 'when the block completes well after the minimum duration' do
let(:time_over) { 10.seconds }
it 'does not sleep' do
expect(dummy).not_to receive(:sleep)
dummy.ensure_minimum_duration(minimum_duration) do
Timecop.travel(minimum_duration + time_over)
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment