Commit c0d33dc6 authored by Oswaldo Ferreira's avatar Oswaldo Ferreira

Add reactive caching worker for external services

Introduces a separated worker to handle reactive caching
that performs external requests.

This worker needs to stay in lower priority given it's
not possible to predict how external services will perform.
I.e. this work should not block other work with higher
priority.
parent d3c9c616
......@@ -8,6 +8,11 @@ module ReactiveCaching
InvalidateReactiveCache = Class.new(StandardError)
ExceededReactiveCacheLimit = Class.new(StandardError)
WORK_TYPE = {
default: ReactiveCachingWorker,
external_dependency: ExternalServiceReactiveCachingWorker
}.freeze
included do
extend ActiveModel::Naming
......@@ -16,6 +21,7 @@ module ReactiveCaching
class_attribute :reactive_cache_refresh_interval
class_attribute :reactive_cache_lifetime
class_attribute :reactive_cache_hard_limit
class_attribute :reactive_cache_work_type
class_attribute :reactive_cache_worker_finder
# defaults
......@@ -24,6 +30,7 @@ module ReactiveCaching
self.reactive_cache_refresh_interval = 1.minute
self.reactive_cache_lifetime = 10.minutes
self.reactive_cache_hard_limit = 1.megabyte
self.reactive_cache_work_type = :default
self.reactive_cache_worker_finder = ->(id, *_args) do
find_by(primary_key => id)
end
......@@ -112,7 +119,7 @@ module ReactiveCaching
def refresh_reactive_cache!(*args)
clear_reactive_cache!(*args)
keep_alive_reactive_cache!(*args)
ReactiveCachingWorker.perform_async(self.class, id, *args)
worker_class.perform_async(self.class, id, *args)
end
def keep_alive_reactive_cache!(*args)
......@@ -145,7 +152,11 @@ module ReactiveCaching
def enqueuing_update(*args)
yield
ReactiveCachingWorker.perform_in(self.class.reactive_cache_refresh_interval, self.class, id, *args)
worker_class.perform_in(self.class.reactive_cache_refresh_interval, self.class, id, *args)
end
def worker_class
WORK_TYPE.fetch(self.class.reactive_cache_work_type.to_sym)
end
def check_exceeded_reactive_cache_limit!(data)
......
......@@ -1060,6 +1060,13 @@
:resource_boundary: :cpu
:weight: 1
:idempotent:
- :name: external_service_reactive_caching
:feature_category: :not_owned
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent:
- :name: file_hook
:feature_category: :integrations
:has_external_dependencies:
......
# frozen_string_literal: true
module ReactiveCacheableWorker
extend ActiveSupport::Concern
included do
include ApplicationWorker
feature_category_not_owned!
def self.context_for_arguments(arguments)
class_name, *_other_args = arguments
Gitlab::ApplicationContext.new(related_class: class_name.to_s)
end
end
def perform(class_name, id, *args)
klass = begin
class_name.constantize
rescue NameError
nil
end
return unless klass
klass
.reactive_cache_worker_finder
.call(id, *args)
.try(:exclusively_update_reactive_cache!, *args)
rescue ReactiveCaching::ExceededReactiveCacheLimit => e
Gitlab::ErrorTracking.track_exception(e)
end
end
# frozen_string_literal: true
class ExternalServiceReactiveCachingWorker # rubocop:disable Scalability/IdempotentWorker
include ReactiveCacheableWorker
worker_has_external_dependencies!
end
# frozen_string_literal: true
class ReactiveCachingWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include ReactiveCacheableWorker
feature_category_not_owned!
# TODO: The reactive caching worker should be split into
# two different workers, one for high urgency jobs without external dependencies
# and another worker without high urgency, but with external dependencies
# https://gitlab.com/gitlab-com/gl-infra/scalability/issues/34
# This worker should also have `worker_has_external_dependencies!` enabled
urgency :high
worker_resource_boundary :cpu
def self.context_for_arguments(arguments)
class_name, *_other_args = arguments
Gitlab::ApplicationContext.new(related_class: class_name.to_s)
end
def perform(class_name, id, *args)
klass = begin
class_name.constantize
rescue NameError
nil
end
return unless klass
klass
.reactive_cache_worker_finder
.call(id, *args)
.try(:exclusively_update_reactive_cache!, *args)
rescue ReactiveCaching::ExceededReactiveCacheLimit => e
Gitlab::ErrorTracking.track_exception(e)
end
end
......@@ -100,6 +100,8 @@
- 1
- - export_csv
- 1
- - external_service_reactive_caching
- 1
- - file_hook
- 1
- - gcp_cluster
......
......@@ -85,6 +85,9 @@ The ReactiveCaching concern can be used in models as well as `project_services`
1. Implement the `calculate_reactive_cache` method in your model/service.
1. Call `with_reactive_cache` in your model/service where the cached value is needed.
1. If the `calculate_reactive_cache` method above submits requests to external services
(e.g. Prometheus, K8s), make sure to change the
[`reactive_cache_work_type` accordingly](#selfreactive_cache_work_type).
### In controllers
......@@ -244,6 +247,13 @@ and will silently raise `ReactiveCaching::ExceededReactiveCacheLimit` on Sentry.
self.reactive_cache_hard_limit = 5.megabytes
```
#### `self.reactive_cache_work_type`
- This is the type of work performed by the `calculate_reactive_cache` method. Based on this attribute,
it's able to pick the right worker to process the caching job. Make sure to
set it as `:external_dependency` if the work performs any external request
(e.g. Kubernetes, Sentry).
#### `self.reactive_cache_worker_finder`
- This is the method used by the background worker to find or generate the object on
......
......@@ -6,39 +6,47 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
include ExclusiveLeaseHelpers
include ReactiveCachingHelpers
class CacheTest
include ReactiveCaching
let(:cache_class_test) do
Class.new do
include ReactiveCaching
self.reactive_cache_key = ->(thing) { ["foo", thing.id] }
self.reactive_cache_key = ->(thing) { ["foo", thing.id] }
self.reactive_cache_lifetime = 5.minutes
self.reactive_cache_refresh_interval = 15.seconds
self.reactive_cache_lifetime = 5.minutes
self.reactive_cache_refresh_interval = 15.seconds
attr_reader :id
attr_reader :id
def self.primary_key
:id
end
def self.primary_key
:id
end
def initialize(id, &blk)
@id = id
@calculator = blk
end
def initialize(id, &blk)
@id = id
@calculator = blk
end
def calculate_reactive_cache
@calculator.call
end
def calculate_reactive_cache
@calculator.call
end
def result
with_reactive_cache do |data|
data
def result
with_reactive_cache do |data|
data
end
end
end
end
let(:external_dependency_cache_class_test) do
Class.new(cache_class_test) do
self.reactive_cache_work_type = :external_dependency
end
end
let(:calculation) { -> { 2 + 2 } }
let(:cache_key) { "foo:666" }
let(:instance) { CacheTest.new(666, &calculation) }
let(:instance) { cache_class_test.new(666, &calculation) }
describe '#with_reactive_cache' do
before do
......@@ -47,6 +55,18 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
subject(:go!) { instance.result }
shared_examples 'reactive worker call' do |worker_class|
let(:instance) do
test_class.new(666, &calculation)
end
it 'performs caching with correct worker' do
expect(worker_class).to receive(:perform_async).with(test_class, 666)
go!
end
end
shared_examples 'a cacheable value' do |cached_value|
before do
stub_reactive_cache(instance, cached_value)
......@@ -73,10 +93,12 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
it { is_expected.to be_nil }
it 'refreshes cache' do
expect(ReactiveCachingWorker).to receive(:perform_async).with(CacheTest, 666)
it_behaves_like 'reactive worker call', ReactiveCachingWorker do
let(:test_class) { cache_class_test }
end
instance.with_reactive_cache { raise described_class::InvalidateReactiveCache }
it_behaves_like 'reactive worker call', ExternalServiceReactiveCachingWorker do
let(:test_class) { external_dependency_cache_class_test }
end
end
end
......@@ -84,10 +106,12 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
context 'when cache is empty' do
it { is_expected.to be_nil }
it 'enqueues a background worker to bootstrap the cache' do
expect(ReactiveCachingWorker).to receive(:perform_async).with(CacheTest, 666)
it_behaves_like 'reactive worker call', ReactiveCachingWorker do
let(:test_class) { cache_class_test }
end
go!
it_behaves_like 'reactive worker call', ExternalServiceReactiveCachingWorker do
let(:test_class) { external_dependency_cache_class_test }
end
it 'updates the cache lifespan' do
......@@ -168,12 +192,14 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
context 'with custom reactive_cache_worker_finder' do
let(:args) { %w(arg1 arg2) }
let(:instance) { CustomFinderCacheTest.new(666, &calculation) }
let(:instance) { custom_finder_cache_test.new(666, &calculation) }
class CustomFinderCacheTest < CacheTest
self.reactive_cache_worker_finder = ->(_id, *args) { from_cache(*args) }
let(:custom_finder_cache_test) do
Class.new(cache_class_test) do
self.reactive_cache_worker_finder = ->(_id, *args) { from_cache(*args) }
def self.from_cache(*args); end
def self.from_cache(*args); end
end
end
before do
......@@ -234,6 +260,18 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
go!
end
context 'when :external_dependency cache' do
let(:instance) do
external_dependency_cache_class_test.new(666, &calculation)
end
it 'enqueues a repeat worker' do
expect_reactive_cache_update_queued(instance, worker_klass: ExternalServiceReactiveCachingWorker)
go!
end
end
it 'calls a reactive_cache_updated only once if content did not change on subsequent update' do
expect(instance).to receive(:calculate_reactive_cache).twice
expect(instance).to receive(:reactive_cache_updated).once
......@@ -262,7 +300,7 @@ describe ReactiveCaching, :use_clean_rails_memory_store_caching do
it_behaves_like 'ExceededReactiveCacheLimit'
context 'when reactive_cache_hard_limit is overridden' do
let(:test_class) { Class.new(CacheTest) { self.reactive_cache_hard_limit = 3.megabytes } }
let(:test_class) { Class.new(cache_class_test) { self.reactive_cache_hard_limit = 3.megabytes } }
let(:instance) { test_class.new(666, &calculation) }
it_behaves_like 'successful cache'
......
......@@ -10,8 +10,11 @@ module ReactiveCachingHelpers
end
def stub_reactive_cache(subject = nil, data = nil, *qualifiers)
allow(ReactiveCachingWorker).to receive(:perform_async)
allow(ReactiveCachingWorker).to receive(:perform_in)
ReactiveCaching::WORK_TYPE.values.each do |worker|
allow(worker).to receive(:perform_async)
allow(worker).to receive(:perform_in)
end
write_reactive_cache(subject, data, *qualifiers) unless subject.nil?
end
......@@ -42,8 +45,8 @@ module ReactiveCachingHelpers
Rails.cache.write(alive_reactive_cache_key(subject, *qualifiers), true)
end
def expect_reactive_cache_update_queued(subject)
expect(ReactiveCachingWorker)
def expect_reactive_cache_update_queued(subject, worker_klass: ReactiveCachingWorker)
expect(worker_klass)
.to receive(:perform_in)
.with(subject.class.reactive_cache_refresh_interval, subject.class, subject.id)
end
......
# frozen_string_literal: true
RSpec.shared_examples 'reactive cacheable worker' do
describe '#perform' do
context 'when reactive cache worker class is found' do
let!(:cluster) { create(:cluster, :project, :provided_by_gcp) }
let(:project) { cluster.project }
let!(:environment) { create(:environment, project: project) }
it 'calls #exclusively_update_reactive_cache!' do
expect_any_instance_of(Environment).to receive(:exclusively_update_reactive_cache!)
described_class.new.perform("Environment", environment.id)
end
context 'when ReactiveCaching::ExceededReactiveCacheLimit is raised' do
it 'avoids failing the job and tracks via Gitlab::ErrorTracking' do
allow_any_instance_of(Environment).to receive(:exclusively_update_reactive_cache!)
.and_raise(ReactiveCaching::ExceededReactiveCacheLimit)
expect(Gitlab::ErrorTracking).to receive(:track_exception)
.with(kind_of(ReactiveCaching::ExceededReactiveCacheLimit))
described_class.new.perform("Environment", environment.id)
end
end
end
context 'when reactive cache worker class is not found' do
it 'raises no error' do
expect { described_class.new.perform("Environment", -1) }.not_to raise_error
end
end
context 'when reactive cache worker class is invalid' do
it 'raises no error' do
expect { described_class.new.perform("FooBarKux", -1) }.not_to raise_error
end
end
end
describe 'worker context' do
it 'sets the related class on the job' do
described_class.perform_async('Environment', 1, 'other', 'argument')
scheduled_job = described_class.jobs.first
expect(scheduled_job).to include('meta.related_class' => 'Environment')
end
it 'sets the related class on the job when it was passed as a class' do
described_class.perform_async(Project, 1, 'other', 'argument')
scheduled_job = described_class.jobs.first
expect(scheduled_job).to include('meta.related_class' => 'Project')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe ExternalServiceReactiveCachingWorker do
it_behaves_like 'reactive cacheable worker'
end
......@@ -3,47 +3,5 @@
require 'spec_helper'
describe ReactiveCachingWorker do
describe '#perform' do
context 'when user configured kubernetes from CI/CD > Clusters' do
let!(:cluster) { create(:cluster, :project, :provided_by_gcp) }
let(:project) { cluster.project }
let!(:environment) { create(:environment, project: project) }
it 'calls #exclusively_update_reactive_cache!' do
expect_any_instance_of(Environment).to receive(:exclusively_update_reactive_cache!)
described_class.new.perform("Environment", environment.id)
end
context 'when ReactiveCaching::ExceededReactiveCacheLimit is raised' do
it 'avoids failing the job and tracks via Gitlab::ErrorTracking' do
allow_any_instance_of(Environment).to receive(:exclusively_update_reactive_cache!)
.and_raise(ReactiveCaching::ExceededReactiveCacheLimit)
expect(Gitlab::ErrorTracking).to receive(:track_exception)
.with(kind_of(ReactiveCaching::ExceededReactiveCacheLimit))
described_class.new.perform("Environment", environment.id)
end
end
end
end
describe 'worker context' do
it 'sets the related class on the job' do
described_class.perform_async('Environment', 1, 'other', 'argument')
scheduled_job = described_class.jobs.first
expect(scheduled_job).to include('meta.related_class' => 'Environment')
end
it 'sets the related class on the job when it was passed as a class' do
described_class.perform_async(Project, 1, 'other', 'argument')
scheduled_job = described_class.jobs.first
expect(scheduled_job).to include('meta.related_class' => 'Project')
end
end
it_behaves_like 'reactive cacheable worker'
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment