Commit ae2ffcfc authored by Quang-Minh Nguyen's avatar Quang-Minh Nguyen Committed by Mark Chao

Implement Sidekiq queue re-routing in the application

parent d6a09e6d
......@@ -16,6 +16,7 @@ module ApplicationWorker
included do
set_queue
after_set_class_attribute { set_queue }
def structured_payload(payload = {})
context = Gitlab::ApplicationContext.current.merge(
......@@ -47,22 +48,14 @@ module ApplicationWorker
class_methods do
def inherited(subclass)
subclass.set_queue
subclass.after_set_class_attribute { subclass.set_queue }
end
def set_queue
queue_name = [queue_namespace, base_queue_name].compact.join(':')
queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
end
def base_queue_name
name
.sub(/\AGitlab::/, '')
.sub(/Worker\z/, '')
.underscore
.tr('/', '_')
end
def queue_namespace(new_namespace = nil)
if new_namespace
sidekiq_options queue_namespace: new_namespace
......
......@@ -36,13 +36,13 @@ module WorkerAttributes
def feature_category(value, *extras)
raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned
class_attributes[:feature_category] = value
set_class_attribute(:feature_category, value)
end
# Special case: mark this work as not associated with a feature category
# this should be used for cross-cutting concerns, such as mailer workers.
def feature_category_not_owned!
class_attributes[:feature_category] = :not_owned
set_class_attribute(:feature_category, :not_owned)
end
def get_feature_category
......@@ -64,7 +64,7 @@ module WorkerAttributes
def urgency(urgency)
raise "Invalid urgency: #{urgency}" unless VALID_URGENCIES.include?(urgency)
class_attributes[:urgency] = urgency
set_class_attribute(:urgency, urgency)
end
def get_urgency
......@@ -75,8 +75,8 @@ module WorkerAttributes
raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag
class_attributes[:data_consistency] = data_consistency
set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag
set_class_attribute(:data_consistency, data_consistency)
validate_worker_attributes!
end
......@@ -105,7 +105,7 @@ module WorkerAttributes
# doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for
# details
def worker_has_external_dependencies!
class_attributes[:external_dependencies] = true
set_class_attribute(:external_dependencies, true)
end
# Returns a truthy value if the worker has external dependencies.
......@@ -118,7 +118,7 @@ module WorkerAttributes
def worker_resource_boundary(boundary)
raise "Invalid boundary" unless VALID_RESOURCE_BOUNDARIES.include? boundary
class_attributes[:resource_boundary] = boundary
set_class_attribute(:resource_boundary, boundary)
end
def get_worker_resource_boundary
......@@ -126,7 +126,7 @@ module WorkerAttributes
end
def idempotent!
class_attributes[:idempotent] = true
set_class_attribute(:idempotent, true)
validate_worker_attributes!
end
......@@ -136,7 +136,7 @@ module WorkerAttributes
end
def weight(value)
class_attributes[:weight] = value
set_class_attribute(:weight, value)
end
def get_weight
......@@ -146,7 +146,7 @@ module WorkerAttributes
end
def tags(*values)
class_attributes[:tags] = values
set_class_attribute(:tags, values)
end
def get_tags
......@@ -154,8 +154,8 @@ module WorkerAttributes
end
def deduplicate(strategy, options = {})
class_attributes[:deduplication_strategy] = strategy
class_attributes[:deduplication_options] = options
set_class_attribute(:deduplication_strategy, strategy)
set_class_attribute(:deduplication_options, options)
end
def get_deduplicate_strategy
......@@ -168,7 +168,7 @@ module WorkerAttributes
end
def big_payload!
class_attributes[:big_payload] = true
set_class_attribute(:big_payload, true)
end
def big_payload?
......
---
title: Implement Sidekiq queue re-routing in the application
merge_request: 59604
author:
type: added
......@@ -438,6 +438,12 @@ production: &base
## Sidekiq
sidekiq:
log_format: json # (default is the original format)
# An array of tuples indicating the rules for re-routing a worker to a
# desirable queue before scheduling. For example:
# routing_rules:
# - ["resource_boundary=cpu", "cpu_boundary"]
# - ["feature_category=pages", null]
# - ["*", "default"]
## Auxiliary jobs
# Periodically executed jobs, to self-heal GitLab, do external synchronizations, etc.
......
......@@ -698,6 +698,7 @@ end
#
Settings['sidekiq'] ||= Settingslogic.new({})
Settings['sidekiq']['log_format'] ||= 'default'
Settings['sidekiq']['routing_rules'] ||= []
#
# GitLab Shell
......
......@@ -14,6 +14,18 @@ module Gitlab
class_attributes[name] || superclass_attributes(name)
end
def set_class_attribute(name, value)
class_attributes[name] = value
after_hooks.each(&:call)
value
end
def after_set_class_attribute(&block)
after_hooks << block
end
private
def class_attributes
......@@ -25,6 +37,10 @@ module Gitlab
superclass.get_class_attribute(name)
end
def after_hooks
@after_hooks ||= []
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqConfig
class WorkerRouter
InvalidRoutingRuleError = Class.new(StandardError)
RuleEvaluator = Struct.new(:matcher, :queue_name)
def self.queue_name_from_worker_name(worker_klass)
base_queue_name =
worker_klass.name
.delete_prefix('Gitlab::')
.delete_suffix('Worker')
.underscore
.tr('/', '_')
[worker_klass.queue_namespace, base_queue_name].compact.join(':')
end
def self.global
@global_worker_router ||= new(::Gitlab.config.sidekiq.routing_rules)
rescue InvalidRoutingRuleError, ::Gitlab::SidekiqConfig::WorkerMatcher::UnknownPredicate => e
::Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e)
@global_worker_router = new([])
end
# call-seq:
# router = WorkerRouter.new([
# ["resource_boundary=cpu", 'cpu_boundary'],
# ["feature_category=pages", nil],
# ["feature_category=source_code_management", ''],
# ["*", "default"]
# ])
# router.route(ACpuBoundaryWorker) # Return "cpu_boundary"
# router.route(JustAPagesWorker) # Return "just_a_pages_worker"
# router.route(PostReceive) # Return "post_receive"
# router.route(RandomWorker) # Return "default"
#
# This class is responsible for routing a Sidekiq worker to a certain
# queue defined in the input routing rules. The input routing rules, as
# described above, is an order-matter array of tuples [query, queue_name].
#
# - The query syntax is the same as the "queue selector" detailedly
# denoted in doc/administration/operations/extra_sidekiq_processes.md.
#
# - The queue_name must be a valid Sidekiq queue name. If the queue name
# is nil, or an empty string, the worker is routed to the queue generated
# by the name of the worker instead.
#
# Rules are evaluated from first to last, and as soon as we find a match
# for a given worker we stop processing for that worker (first match
# wins). If the worker doesn't match any rule, it falls back the queue
# name generated from the worker name
#
# For further information, please visit:
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1016
#
def initialize(routing_rules)
@rule_evaluators = parse_routing_rules(routing_rules)
end
def route(worker_klass)
# A medium representation to ensure the backward-compatibility of
# WorkerMatcher
worker_metadata = generate_worker_metadata(worker_klass)
@rule_evaluators.each do |evaluator|
if evaluator.matcher.match?(worker_metadata)
return evaluator.queue_name.presence || queue_name_from_worker_name(worker_klass)
end
end
queue_name_from_worker_name(worker_klass)
end
private
def parse_routing_rules(routing_rules)
raise InvalidRoutingRuleError, 'The set of routing rule must be an array' unless routing_rules.is_a?(Array)
routing_rules.map do |rule_tuple|
raise InvalidRoutingRuleError, "Routing rule `#{rule_tuple.inspect}` is invalid" unless valid_routing_rule?(rule_tuple)
selector, destination_queue = rule_tuple
RuleEvaluator.new(
::Gitlab::SidekiqConfig::WorkerMatcher.new(selector),
destination_queue
)
end
end
def valid_routing_rule?(rule_tuple)
rule_tuple.is_a?(Array) && rule_tuple.length == 2
end
def generate_worker_metadata(worker_klass)
# The ee indicator here is insignificant and irrelevant to the matcher.
# Plus, it's not easy to determine whether a worker is **only**
# available in EE.
::Gitlab::SidekiqConfig::Worker.new(worker_klass, ee: false).to_yaml
end
def queue_name_from_worker_name(worker_klass)
self.class.queue_name_from_worker_name(worker_klass)
end
end
end
end
......@@ -6,36 +6,62 @@ RSpec.describe Gitlab::ClassAttributes do
Class.new do
include Gitlab::ClassAttributes
def self.get_attribute(name)
get_class_attribute(name)
class << self
attr_reader :counter_1, :counter_2
# get_class_attribute and set_class_attribute are protected,
# hence those methods are for testing purpose
def get_attribute(name)
get_class_attribute(name)
end
def set_attribute(name, value)
set_class_attribute(name, value)
end
end
after_set_class_attribute do
@counter_1 ||= 0
@counter_1 += 1
end
def self.set_attribute(name, value)
class_attributes[name] = value
after_set_class_attribute do
@counter_2 ||= 0
@counter_2 += 2
end
end
end
let(:subclass) { Class.new(klass) }
describe ".get_class_attribute" do
it "returns values set on the class" do
klass.set_attribute(:foo, :bar)
it "returns values set on the class" do
klass.set_attribute(:foo, :bar)
expect(klass.get_attribute(:foo)).to eq(:bar)
end
expect(klass.get_attribute(:foo)).to eq(:bar)
end
it "returns values set on a superclass" do
klass.set_attribute(:foo, :bar)
it "returns values set on a superclass" do
klass.set_attribute(:foo, :bar)
expect(subclass.get_attribute(:foo)).to eq(:bar)
end
expect(subclass.get_attribute(:foo)).to eq(:bar)
end
it "returns values from the subclass over attributes from a superclass" do
klass.set_attribute(:foo, :baz)
subclass.set_attribute(:foo, :bar)
it "returns values from the subclass over attributes from a superclass" do
klass.set_attribute(:foo, :baz)
subclass.set_attribute(:foo, :bar)
expect(subclass.get_attribute(:foo)).to eq(:bar)
end
expect(klass.get_attribute(:foo)).to eq(:baz)
expect(subclass.get_attribute(:foo)).to eq(:bar)
end
it "triggers after hooks after set class values" do
expect(klass.counter_1).to be(nil)
expect(klass.counter_2).to be(nil)
klass.set_attribute(:foo, :bar)
klass.set_attribute(:foo, :bar)
expect(klass.counter_1).to eq(2)
expect(klass.counter_2).to eq(4)
end
end
# frozen_string_literal: true
require 'spec_helper'
require 'rspec-parameterized'
RSpec.describe Gitlab::SidekiqConfig::WorkerRouter do
describe '.queue_name_from_worker_name' do
using RSpec::Parameterized::TableSyntax
def create_worker(name, namespace = nil)
Class.new.tap do |worker|
worker.define_singleton_method(:name) { name }
worker.define_singleton_method(:queue_namespace) { namespace }
end
end
where(:worker, :expected_name) do
create_worker('PagesWorker') | 'pages'
create_worker('PipelineNotificationWorker') | 'pipeline_notification'
create_worker('PostReceive') | 'post_receive'
create_worker('PostReceive', :git) | 'git:post_receive'
create_worker('PipelineHooksWorker', :pipeline_hooks) | 'pipeline_hooks:pipeline_hooks'
create_worker('Gitlab::JiraImport::AdvanceStageWorker') | 'jira_import_advance_stage'
create_worker('Gitlab::PhabricatorImport::ImportTasksWorker', :importer) | 'importer:phabricator_import_import_tasks'
end
with_them do
it 'generates a valid queue name from worker name' do
expect(described_class.queue_name_from_worker_name(worker)).to eql(expected_name)
end
end
end
shared_context 'router examples setup' do
using RSpec::Parameterized::TableSyntax
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::BarWorker'
end
include ApplicationWorker
feature_category :feature_a
urgency :low
worker_resource_boundary :cpu
tags :expensive
end
end
where(:routing_rules, :expected_queue) do
# Default, no configuration
[] | 'foo_bar'
# Does not match, fallback to the named queue
[
['feature_category=feature_b|urgency=high', 'queue_a'],
['resource_boundary=memory', 'queue_b'],
['tags=cheap', 'queue_c']
] | 'foo_bar'
# Match a nil queue, fallback to named queue
[
['feature_category=feature_b|urgency=high', 'queue_a'],
['resource_boundary=cpu', nil],
['tags=cheap', 'queue_c']
] | 'foo_bar'
# Match an empty string, fallback to named queue
[
['feature_category=feature_b|urgency=high', 'queue_a'],
['resource_boundary=cpu', ''],
['tags=cheap', 'queue_c']
] | 'foo_bar'
# Match the first rule
[
['feature_category=feature_a|urgency=high', 'queue_a'],
['resource_boundary=cpu', 'queue_b'],
['tags=cheap', 'queue_c']
] | 'queue_a'
# Match the first rule 2
[
['feature_category=feature_b|urgency=low', 'queue_a'],
['resource_boundary=cpu', 'queue_b'],
['tags=cheap', 'queue_c']
] | 'queue_a'
# Match the third rule
[
['feature_category=feature_b|urgency=high', 'queue_a'],
['resource_boundary=memory', 'queue_b'],
['tags=expensive', 'queue_c']
] | 'queue_c'
# Match all, first match wins
[
['feature_category=feature_a|urgency=low', 'queue_a'],
['resource_boundary=cpu', 'queue_b'],
['tags=expensive', 'queue_c']
] | 'queue_a'
# Match the same rule multiple times, the first match wins
[
['feature_category=feature_a', 'queue_a'],
['feature_category=feature_a', 'queue_b'],
['feature_category=feature_a', 'queue_c']
] | 'queue_a'
# Match wildcard
[
['feature_category=feature_b|urgency=high', 'queue_a'],
['resource_boundary=memory', 'queue_b'],
['tags=cheap', 'queue_c'],
['*', 'default']
] | 'default'
# Match wildcard at the top of the chain. It makes the following rules useless
[
['*', 'queue_foo'],
['feature_category=feature_a|urgency=low', 'queue_a'],
['resource_boundary=cpu', 'queue_b'],
['tags=expensive', 'queue_c']
] | 'queue_foo'
end
end
describe '.global' do
before do
described_class.remove_instance_variable(:@global_worker_router) if described_class.instance_variable_defined?(:@global_worker_router)
end
after do
described_class.remove_instance_variable(:@global_worker_router)
end
context 'valid routing rules' do
include_context 'router examples setup'
with_them do
before do
stub_config(sidekiq: { routing_rules: routing_rules })
end
it 'routes the worker to the correct queue' do
expect(described_class.global.route(worker)).to eql(expected_queue)
end
end
end
context 'invalid routing rules' do
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::BarWorker'
end
include ApplicationWorker
end
end
before do
stub_config(sidekiq: { routing_rules: routing_rules })
end
context 'invalid routing rules format' do
let(:routing_rules) { ['feature_category=a'] }
it 'captures the error and falls back to an empty route' do
expect(Gitlab::ErrorTracking).to receive(:track_and_raise_for_dev_exception).with(be_a(described_class::InvalidRoutingRuleError))
expect(described_class.global.route(worker)).to eql('foo_bar')
end
end
context 'invalid predicate' do
let(:routing_rules) { [['invalid_term=a', 'queue_a']] }
it 'captures the error and falls back to an empty route' do
expect(Gitlab::ErrorTracking).to receive(:track_and_raise_for_dev_exception).with(
be_a(Gitlab::SidekiqConfig::WorkerMatcher::UnknownPredicate)
)
expect(described_class.global.route(worker)).to eql('foo_bar')
end
end
end
end
describe '#route' do
context 'valid routing rules' do
include_context 'router examples setup'
with_them do
it 'routes the worker to the correct queue' do
router = described_class.new(routing_rules)
expect(router.route(worker)).to eql(expected_queue)
end
end
end
context 'invalid routing rules' do
it 'raises an exception' do
expect { described_class.new(nil) }.to raise_error(described_class::InvalidRoutingRuleError)
expect { described_class.new(['feature_category=a']) }.to raise_error(described_class::InvalidRoutingRuleError)
expect { described_class.new([['feature_category=a', 'queue_a', 'queue_b']]) }.to raise_error(described_class::InvalidRoutingRuleError)
expect do
described_class.new(
[
['feature_category=a', 'queue_b'],
['feature_category=b']
]
)
end.to raise_error(described_class::InvalidRoutingRuleError)
expect { described_class.new([['invalid_term=a', 'queue_a']]) }.to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::UnknownPredicate)
end
end
end
end
......@@ -3,7 +3,14 @@
require 'spec_helper'
RSpec.describe ApplicationWorker do
let_it_be(:worker) do
# We depend on the lazy-load characteristic of rspec. If the worker is loaded
# before setting up, it's likely to go wrong. Consider this catcha:
# before do
# allow(router).to receive(:route).with(worker).and_return('queue_1')
# end
# As worker is triggered, it includes ApplicationWorker, and the router is
# called before it is stubbed. That makes the stubbing useless.
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::Bar::DummyWorker'
......@@ -14,10 +21,77 @@ RSpec.describe ApplicationWorker do
end
let(:instance) { worker.new }
let(:router) { double(:router) }
describe 'Sidekiq options' do
it 'sets the queue name based on the class name' do
before do
allow(::Gitlab::SidekiqConfig::WorkerRouter).to receive(:global).and_return(router)
allow(router).to receive(:route).and_return('foo_bar_dummy')
end
describe 'Sidekiq attributes' do
it 'sets the queue name based on the output of the router' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
expect(router).to have_received(:route).with(worker).at_least(:once)
end
context 'when a worker attribute is updated' do
before do
counter = 0
allow(router).to receive(:route) do
counter += 1
"queue_#{counter}"
end
end
it 'updates the queue name afterward' do
expect(worker.sidekiq_options['queue']).to eq('queue_1')
worker.feature_category :pages
expect(worker.sidekiq_options['queue']).to eq('queue_2')
worker.feature_category_not_owned!
expect(worker.sidekiq_options['queue']).to eq('queue_3')
worker.urgency :high
expect(worker.sidekiq_options['queue']).to eq('queue_4')
worker.worker_has_external_dependencies!
expect(worker.sidekiq_options['queue']).to eq('queue_5')
worker.worker_resource_boundary :cpu
expect(worker.sidekiq_options['queue']).to eq('queue_6')
worker.idempotent!
expect(worker.sidekiq_options['queue']).to eq('queue_7')
worker.weight 3
expect(worker.sidekiq_options['queue']).to eq('queue_8')
worker.tags :hello
expect(worker.sidekiq_options['queue']).to eq('queue_9')
worker.big_payload!
expect(worker.sidekiq_options['queue']).to eq('queue_10')
expect(router).to have_received(:route).with(worker).at_least(10).times
end
end
context 'when the worker is inherited' do
let(:sub_worker) { Class.new(worker) }
before do
allow(router).to receive(:route).and_return('queue_1')
worker # Force loading worker 1 to update its queue
allow(router).to receive(:route).and_return('queue_2')
end
it 'sets the queue name for the inherited worker' do
expect(sub_worker.sidekiq_options['queue']).to eq('queue_2')
expect(router).to have_received(:route).with(sub_worker).at_least(:once)
end
end
end
......@@ -74,11 +148,24 @@ RSpec.describe ApplicationWorker do
end
describe '.queue_namespace' do
it 'sets the queue name based on the class name' do
before do
allow(router).to receive(:route).and_return('foo_bar_dummy', 'some_namespace:foo_bar_dummy')
end
it 'updates the queue name from the router again' do
expect(worker.queue).to eq('foo_bar_dummy')
worker.queue_namespace :some_namespace
expect(worker.queue).to eq('some_namespace:foo_bar_dummy')
end
it 'updates the queue_namespace options of the worker' do
worker.queue_namespace :some_namespace
expect(worker.queue_namespace).to eql('some_namespace')
expect(worker.sidekiq_options['queue_namespace']).to be(:some_namespace)
end
end
describe '.queue' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment