Commit c75c99e0 authored by James Lopez's avatar James Lopez

Merge branch...

Merge branch 'qmnguyen0711/1016-implement-sidekiq-queue-re-routing-in-the-application' into 'master'

Extract Sidekiq query selector matching logic to a new class

See merge request gitlab-org/gitlab!59550
parents ce86b2ff 1beb0c24
......@@ -5,6 +5,7 @@ require 'optparse'
require_relative '../lib/gitlab'
require_relative '../lib/gitlab/utils'
require_relative '../lib/gitlab/sidekiq_config/cli_methods'
require_relative '../lib/gitlab/sidekiq_config/worker_matcher'
require_relative '../lib/gitlab/sidekiq_cluster'
require_relative '../lib/gitlab/sidekiq_cluster/cli'
......
......@@ -53,11 +53,11 @@ module Gitlab
'You cannot specify --queue-selector and --experimental-queue-selector together'
end
all_queues = SidekiqConfig::CliMethods.all_queues(@rails_path)
queue_names = SidekiqConfig::CliMethods.worker_queues(@rails_path)
worker_metadatas = SidekiqConfig::CliMethods.worker_metadatas(@rails_path)
worker_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)
queue_groups = argv.map do |queues|
next queue_names if queues == '*'
queue_groups = argv.map do |queues_or_query_string|
next worker_queues if queues_or_query_string == SidekiqConfig::WorkerMatcher::WILDCARD_MATCH
# When using the queue query syntax, we treat each queue group
# as a worker attribute query, and resolve the queues for the
......@@ -65,14 +65,14 @@ module Gitlab
# Simplify with https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/646
if @queue_selector || @experimental_queue_selector
SidekiqConfig::CliMethods.query_workers(queues, all_queues)
SidekiqConfig::CliMethods.query_queues(queues_or_query_string, worker_metadatas)
else
SidekiqConfig::CliMethods.expand_queues(queues.split(','), queue_names)
SidekiqConfig::CliMethods.expand_queues(queues_or_query_string.split(','), worker_queues)
end
end
if @negate_queues
queue_groups.map! { |queues| queue_names - queues }
queue_groups.map! { |queues| worker_queues - queues }
end
if queue_groups.all?(&:empty?)
......
......@@ -12,35 +12,19 @@ module Gitlab
# rubocop:disable Gitlab/ModuleWithInstanceVariables
extend self
# The file names are misleading. Those files contain the metadata of the
# workers. They should be renamed to all_workers instead.
# https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1018
QUEUE_CONFIG_PATHS = begin
result = %w[app/workers/all_queues.yml]
result << 'ee/app/workers/all_queues.yml' if Gitlab.ee?
result
end.freeze
QUERY_OR_OPERATOR = '|'
QUERY_AND_OPERATOR = '&'
QUERY_CONCATENATE_OPERATOR = ','
QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze
def worker_metadatas(rails_path = Rails.root.to_s)
@worker_metadatas ||= {}
QUERY_PREDICATES = {
feature_category: :to_sym,
has_external_dependencies: lambda { |value| value == 'true' },
name: :to_s,
resource_boundary: :to_sym,
tags: :to_sym,
urgency: :to_sym
}.freeze
QueryError = Class.new(StandardError)
InvalidTerm = Class.new(QueryError)
UnknownOperator = Class.new(QueryError)
UnknownPredicate = Class.new(QueryError)
def all_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {}
@worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
@worker_metadatas[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
full_path = File.join(rails_path, path)
File.exist?(full_path) ? YAML.load_file(full_path) : []
......@@ -49,7 +33,7 @@ module Gitlab
# rubocop:enable Gitlab/ModuleWithInstanceVariables
def worker_queues(rails_path = Rails.root.to_s)
worker_names(all_queues(rails_path))
worker_names(worker_metadatas(rails_path))
end
def expand_queues(queues, all_queues = self.worker_queues)
......@@ -62,13 +46,18 @@ module Gitlab
end
end
def query_workers(query_string, queues)
worker_names(queues.select(&query_string_to_lambda(query_string)))
def query_queues(query_string, worker_metadatas)
matcher = SidekiqConfig::WorkerMatcher.new(query_string)
selected_metadatas = worker_metadatas.select do |worker_metadata|
matcher.match?(worker_metadata)
end
worker_names(selected_metadatas)
end
def clear_memoization!
if instance_variable_defined?('@worker_queues')
remove_instance_variable('@worker_queues')
if instance_variable_defined?('@worker_metadatas')
remove_instance_variable('@worker_metadatas')
end
end
......@@ -77,53 +66,6 @@ module Gitlab
def worker_names(workers)
workers.map { |queue| queue[:name] }
end
def query_string_to_lambda(query_string)
or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
predicate_for_term(term)
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def predicate_for_term(term)
match = term.match(QUERY_TERM_REGEX)
raise InvalidTerm.new("Invalid term: #{term}") unless match
_, lhs, op, rhs = *match
predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
end
def predicate_for_op(op, predicate)
case op
when '='
predicate
when '!='
lambda { |worker| !predicate.call(worker) }
else
# This is unreachable because InvalidTerm will be raised instead, but
# keeping it allows to guard against that changing in future.
raise UnknownOperator.new("Unknown operator: #{op}")
end
end
def predicate_factory(lhs, values)
values_block = QUERY_PREDICATES[lhs.to_sym]
raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block
lambda do |queue|
comparator = Array(queue[lhs.to_sym]).to_set
values.map(&values_block).to_set.intersect?(comparator)
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module SidekiqConfig
class WorkerMatcher
WILDCARD_MATCH = '*'
QUERY_OR_OPERATOR = '|'
QUERY_AND_OPERATOR = '&'
QUERY_CONCATENATE_OPERATOR = ','
QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w:#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze
QUERY_PREDICATES = {
feature_category: :to_sym,
has_external_dependencies: lambda { |value| value == 'true' },
name: :to_s,
resource_boundary: :to_sym,
tags: :to_sym,
urgency: :to_sym
}.freeze
QueryError = Class.new(StandardError)
InvalidTerm = Class.new(QueryError)
UnknownOperator = Class.new(QueryError)
UnknownPredicate = Class.new(QueryError)
def initialize(query_string)
@match_lambda = query_string_to_lambda(query_string)
end
def match?(worker_metadata)
@match_lambda.call(worker_metadata)
end
private
def query_string_to_lambda(query_string)
return lambda { |_worker| true } if query_string.strip == WILDCARD_MATCH
or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
predicate_for_term(term)
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def predicate_for_term(term)
match = term.match(QUERY_TERM_REGEX)
raise InvalidTerm.new("Invalid term: #{term}") unless match
_, lhs, op, rhs = *match
predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
end
def predicate_for_op(op, predicate)
case op
when '='
predicate
when '!='
lambda { |worker| !predicate.call(worker) }
else
# This is unreachable because InvalidTerm will be raised instead, but
# keeping it allows to guard against that changing in future.
raise UnknownOperator.new("Unknown operator: #{op}")
end
end
def predicate_factory(lhs, values)
values_block = QUERY_PREDICATES[lhs.to_sym]
raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block
lambda do |queue|
comparator = Array(queue[lhs.to_sym]).to_set
values.map(&values_block).to_set.intersect?(comparator)
end
end
end
end
end
......@@ -214,7 +214,7 @@ RSpec.describe Gitlab::SidekiqCluster::CLI do
expect(Gitlab::SidekiqCluster).not_to receive(:start)
expect { cli.run(%W(#{flag} unknown_field=chatops)) }
.to raise_error(Gitlab::SidekiqConfig::CliMethods::QueryError)
.to raise_error(Gitlab::SidekiqConfig::WorkerMatcher::QueryError)
end
end
end
......
# frozen_string_literal: true
require 'fast_spec_helper'
require 'rspec-parameterized'
RSpec.describe Gitlab::SidekiqConfig::CliMethods do
let(:dummy_root) { '/tmp/' }
......@@ -122,10 +121,8 @@ RSpec.describe Gitlab::SidekiqConfig::CliMethods do
end
end
describe '.query_workers' do
using RSpec::Parameterized::TableSyntax
let(:queues) do
describe '.query_queues' do
let(:worker_metadatas) do
[
{
name: 'a',
......@@ -162,79 +159,16 @@ RSpec.describe Gitlab::SidekiqConfig::CliMethods do
]
end
context 'with valid input' do
where(:query, :selected_queues) do
# feature_category
'feature_category=category_a' | %w(a a:2)
'feature_category=category_a,category_c' | %w(a a:2 c)
'feature_category=category_a|feature_category=category_c' | %w(a a:2 c)
'feature_category!=category_a' | %w(b c)
# has_external_dependencies
'has_external_dependencies=true' | %w(b)
'has_external_dependencies=false' | %w(a a:2 c)
'has_external_dependencies=true,false' | %w(a a:2 b c)
'has_external_dependencies=true|has_external_dependencies=false' | %w(a a:2 b c)
'has_external_dependencies!=true' | %w(a a:2 c)
# urgency
'urgency=high' | %w(a:2 b)
'urgency=low' | %w(a)
'urgency=high,low,throttled' | %w(a a:2 b c)
'urgency=low|urgency=throttled' | %w(a c)
'urgency!=high' | %w(a c)
# name
'name=a' | %w(a)
'name=a,b' | %w(a b)
'name=a,a:2|name=b' | %w(a a:2 b)
'name!=a,a:2' | %w(b c)
# resource_boundary
'resource_boundary=memory' | %w(b c)
'resource_boundary=memory,cpu' | %w(a b c)
'resource_boundary=memory|resource_boundary=cpu' | %w(a b c)
'resource_boundary!=memory,cpu' | %w(a:2)
# tags
'tags=no_disk_io' | %w(a b)
'tags=no_disk_io,git_access' | %w(a a:2 b)
'tags=no_disk_io|tags=git_access' | %w(a a:2 b)
'tags=no_disk_io&tags=git_access' | %w(a)
'tags!=no_disk_io' | %w(a:2 c)
'tags!=no_disk_io,git_access' | %w(c)
'tags=unknown_tag' | []
'tags!=no_disk_io' | %w(a:2 c)
'tags!=no_disk_io,git_access' | %w(c)
'tags!=unknown_tag' | %w(a a:2 b c)
# combinations
'feature_category=category_a&urgency=high' | %w(a:2)
'feature_category=category_a&urgency=high|feature_category=category_c' | %w(a:2 c)
end
let(:worker_matcher) { double(:WorkerMatcher) }
let(:query) { 'feature_category=category_a,category_c' }
with_them do
it do
expect(described_class.query_workers(query, queues))
.to match_array(selected_queues)
end
end
before do
allow(::Gitlab::SidekiqConfig::WorkerMatcher).to receive(:new).with(query).and_return(worker_matcher)
allow(worker_matcher).to receive(:match?).and_return(true, true, false, true)
end
context 'with invalid input' do
where(:query, :error) do
'feature_category="category_a"' | described_class::InvalidTerm
'feature_category=' | described_class::InvalidTerm
'feature_category~category_a' | described_class::InvalidTerm
'worker_name=a' | described_class::UnknownPredicate
end
with_them do
it do
expect { described_class.query_workers(query, queues) }
.to raise_error(error)
end
end
it 'returns the queue names of matched workers' do
expect(described_class.query_queues(query, worker_metadatas)).to match(%w(a a:2 c))
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
require 'rspec-parameterized'
RSpec.describe Gitlab::SidekiqConfig::WorkerMatcher do
describe '#match?' do
using RSpec::Parameterized::TableSyntax
let(:worker_metadatas) do
[
{
name: 'a',
feature_category: :category_a,
has_external_dependencies: false,
urgency: :low,
resource_boundary: :cpu,
tags: [:no_disk_io, :git_access]
},
{
name: 'a:2',
feature_category: :category_a,
has_external_dependencies: false,
urgency: :high,
resource_boundary: :none,
tags: [:git_access]
},
{
name: 'b',
feature_category: :category_b,
has_external_dependencies: true,
urgency: :high,
resource_boundary: :memory,
tags: [:no_disk_io]
},
{
name: 'c',
feature_category: :category_c,
has_external_dependencies: false,
urgency: :throttled,
resource_boundary: :memory,
tags: []
}
]
end
context 'with valid input' do
where(:query, :expected_metadatas) do
# feature_category
'feature_category=category_a' | %w(a a:2)
'feature_category=category_a,category_c' | %w(a a:2 c)
'feature_category=category_a|feature_category=category_c' | %w(a a:2 c)
'feature_category!=category_a' | %w(b c)
# has_external_dependencies
'has_external_dependencies=true' | %w(b)
'has_external_dependencies=false' | %w(a a:2 c)
'has_external_dependencies=true,false' | %w(a a:2 b c)
'has_external_dependencies=true|has_external_dependencies=false' | %w(a a:2 b c)
'has_external_dependencies!=true' | %w(a a:2 c)
# urgency
'urgency=high' | %w(a:2 b)
'urgency=low' | %w(a)
'urgency=high,low,throttled' | %w(a a:2 b c)
'urgency=low|urgency=throttled' | %w(a c)
'urgency!=high' | %w(a c)
# name
'name=a' | %w(a)
'name=a,b' | %w(a b)
'name=a,a:2|name=b' | %w(a a:2 b)
'name!=a,a:2' | %w(b c)
# resource_boundary
'resource_boundary=memory' | %w(b c)
'resource_boundary=memory,cpu' | %w(a b c)
'resource_boundary=memory|resource_boundary=cpu' | %w(a b c)
'resource_boundary!=memory,cpu' | %w(a:2)
# tags
'tags=no_disk_io' | %w(a b)
'tags=no_disk_io,git_access' | %w(a a:2 b)
'tags=no_disk_io|tags=git_access' | %w(a a:2 b)
'tags=no_disk_io&tags=git_access' | %w(a)
'tags!=no_disk_io' | %w(a:2 c)
'tags!=no_disk_io,git_access' | %w(c)
'tags=unknown_tag' | []
'tags!=no_disk_io' | %w(a:2 c)
'tags!=no_disk_io,git_access' | %w(c)
'tags!=unknown_tag' | %w(a a:2 b c)
# combinations
'feature_category=category_a&urgency=high' | %w(a:2)
'feature_category=category_a&urgency=high|feature_category=category_c' | %w(a:2 c)
# Match all
'*' | %w(a a:2 b c)
end
with_them do
it do
matched_metadatas = worker_metadatas.select do |metadata|
described_class.new(query).match?(metadata)
end
expect(matched_metadatas.map { |m| m[:name] }).to match_array(expected_metadatas)
end
end
end
context 'with invalid input' do
where(:query, :error) do
'feature_category="category_a"' | described_class::InvalidTerm
'feature_category=' | described_class::InvalidTerm
'feature_category~category_a' | described_class::InvalidTerm
'worker_name=a' | described_class::UnknownPredicate
end
with_them do
it do
worker_metadatas.each do |metadata|
expect { described_class.new(query).match?(metadata) }
.to raise_error(error)
end
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment