Commit e06e56ba authored by Andrew Newdigate's avatar Andrew Newdigate Committed by Sean McGivern

Allow config of sidekiq-cluster with query

parent e622d585
......@@ -3,6 +3,7 @@
require 'optparse'
require 'logger'
require 'time'
require 'concurrent'
module Gitlab
module SidekiqCluster
......@@ -45,8 +46,14 @@ module Gitlab
all_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)
queue_groups.map! do |queues|
SidekiqConfig::CliMethods.expand_queues(queues, all_queues)
# When using the experimental Queue query syntax,
# we treat each queue group as a worker attribute
# query, and resolve the queues for the queue group
# using this query.
if @queue_query_syntax
queue_groups.map! { |queues| SidekiqConfig.query_workers(queues).map(&:queue) }
else
queue_groups.map! { |queues| SidekiqConfig::CliMethods.expand_queues(queues, all_queues) }
end
if @negate_queues
......@@ -151,6 +158,10 @@ module Gitlab
@rails_path = path
end
opt.on('--[-no-]-experimental-queue-query-syntax', 'Run workers based on the provided selector') do |queue_query_syntax|
@queue_query_syntax = queue_query_syntax
end
opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do
@negate_queues = true
end
......
......@@ -104,5 +104,59 @@ module Gitlab
ns.camelize.constantize
end
end
def self.query_workers(query_string)
predicate = query_string_to_lambda(query_string)
workers.filter(&predicate)
end
def self.query_string_to_lambda(query_string)
or_clauses = query_string.split(%r{\s+}).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(',').map do |term|
match = term.match(%r{^(\w+)(!?=)([\w|]+)})
raise "invalid term #{term}" unless match
lhs = match[1]
op = match[2]
rhs = match[3]
predicate_for_op(op, predicate_factory(lhs, rhs.split('|')))
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def self.predicate_for_op(op, predicate)
case op
when "="
predicate
when "!="
lambda { |worker| !predicate.call(worker) }
else
raise "unknown op #{op}"
end
end
def self.predicate_factory(lhs, values)
case lhs
when "resource_boundary"
values_sym = values.map(&:to_sym)
lambda { |worker| values_sym.include? worker.get_worker_resource_boundary }
when "latency_sensitive"
values_bool = values.map { |v| v.casecmp("true").zero? }
lambda { |worker| values_bool.include? worker.latency_sensitive_worker? }
when "feature_category"
values_sym = values.map(&:to_sym)
lambda { |worker| values_sym.include? worker.get_feature_category }
else
raise "unknown predicate #{lhs}"
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment