Commit 87fa73c5 authored by Andrew Newdigate's avatar Andrew Newdigate Committed by Kamil Trzciński

Attribute Sidekiq workers according to their workloads

These workload attributes include:

1. Are jobs for this worker latency sensitive?
1. Does the worker rely on external dependencies?
1. What resource boundary is the worker limited by? This can be either
   cpu or memory.

These attributes are non-mandatory, but in the future, SLOs for workers will
be determined by these attributes, and therefore, if a job has specific
latency requirements, they should be configured through these
attributes.
parent 629dda38
...@@ -3,6 +3,10 @@ ...@@ -3,6 +3,10 @@
module WorkerAttributes module WorkerAttributes
extend ActiveSupport::Concern extend ActiveSupport::Concern
# Resource boundaries that workers can declare through the
# `worker_resource_boundary` attribute
VALID_RESOURCE_BOUNDARIES = %i[memory cpu unknown].freeze
class_methods do class_methods do
def feature_category(value) def feature_category(value)
raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned raise "Invalid category. Use `feature_category_not_owned!` to mark a worker as not owned" if value == :not_owned
...@@ -24,6 +28,48 @@ module WorkerAttributes ...@@ -24,6 +28,48 @@ module WorkerAttributes
get_worker_attribute(:feature_category) == :not_owned get_worker_attribute(:feature_category) == :not_owned
end end
# This should be set for jobs that need to be run immediately, or, if
# they are delayed, risk creating inconsistencies in the application
# that could be perceived by the user as incorrect behavior
# (ie, a bug)
# See doc/development/sidekiq_style_guide.md#Latency-Sensitive-Jobs
# for details
def latency_sensitive_worker!
  # Flag the worker as latency sensitive in its worker attributes.
  attributes = worker_attributes
  attributes[:latency_sensitive] = true
end
# Returns a truthy value if the worker is latency sensitive.
# See doc/development/sidekiq_style_guide.md#Latency-Sensitive-Jobs
# for details
def latency_sensitive_worker?
  # Resolve through get_worker_attribute, the same lookup that
  # feature_category_not_owned? uses, so that the flag is also found when it
  # was declared on a parent worker class rather than on this class itself.
  # Returns a truthy value when the flag is set, a falsy one otherwise.
  get_worker_attribute(:latency_sensitive)
end
# Set this attribute on a job when it will call to services outside of the
# application, such as 3rd party applications, other k8s clusters, etc. See
# doc/development/sidekiq_style_guide.md#Jobs-with-External-Dependencies for
# details
def worker_has_external_dependencies!
  # Flag the worker as relying on external dependencies in its worker attributes.
  attributes = worker_attributes
  attributes[:external_dependencies] = true
end
# Returns a truthy value if the worker has external dependencies.
# See doc/development/sidekiq_style_guide.md#Jobs-with-External-Dependencies
# for details
def worker_has_external_dependencies?
  # Resolve through get_worker_attribute, the same lookup that
  # feature_category_not_owned? uses, so that the flag is also found when it
  # was declared on a parent worker class rather than on this class itself.
  # Returns a truthy value when the flag is set, a falsy one otherwise.
  get_worker_attribute(:external_dependencies)
end
# Declares the resource boundary for this worker.
#
# boundary - one of the values listed in VALID_RESOURCE_BOUNDARIES.
#
# Raises a RuntimeError ("Invalid boundary") for any other value, in which
# case nothing is stored. Returns the boundary that was stored.
def worker_resource_boundary(boundary)
  boundary_known = VALID_RESOURCE_BOUNDARIES.include?(boundary)
  raise "Invalid boundary" unless boundary_known

  worker_attributes[:resource_boundary] = boundary
end
# Returns the resource boundary declared through `worker_resource_boundary`,
# or :unknown when none has been declared.
def get_worker_resource_boundary
  # Resolve through get_worker_attribute, the same lookup that
  # feature_category_not_owned? uses, so that a boundary declared on a parent
  # worker class is also found instead of falling back to :unknown.
  get_worker_attribute(:resource_boundary) || :unknown
end
protected protected
# Returns a worker attribute declared on this class or its parent class. # Returns a worker attribute declared on this class or its parent class.
......
...@@ -5,6 +5,7 @@ class AuthorizedProjectsWorker ...@@ -5,6 +5,7 @@ class AuthorizedProjectsWorker
prepend WaitableWorker prepend WaitableWorker
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
latency_sensitive_worker!
# This is a workaround for a Ruby 2.3.7 bug. rspec-mocks cannot restore the # This is a workaround for a Ruby 2.3.7 bug. rspec-mocks cannot restore the
# visibility of prepended modules. See https://github.com/rspec/rspec-mocks/issues/1231 # visibility of prepended modules. See https://github.com/rspec/rspec-mocks/issues/1231
......
...@@ -5,6 +5,8 @@ class BuildFinishedWorker ...@@ -5,6 +5,8 @@ class BuildFinishedWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
latency_sensitive_worker!
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(build_id) def perform(build_id)
......
...@@ -6,6 +6,7 @@ class BuildHooksWorker ...@@ -6,6 +6,7 @@ class BuildHooksWorker
queue_namespace :pipeline_hooks queue_namespace :pipeline_hooks
feature_category :continuous_integration feature_category :continuous_integration
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(build_id) def perform(build_id)
......
...@@ -6,6 +6,8 @@ class BuildQueueWorker ...@@ -6,6 +6,8 @@ class BuildQueueWorker
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
feature_category :continuous_integration feature_category :continuous_integration
latency_sensitive_worker!
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(build_id) def perform(build_id)
......
...@@ -5,6 +5,7 @@ class BuildSuccessWorker ...@@ -5,6 +5,7 @@ class BuildSuccessWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(build_id) def perform(build_id)
......
...@@ -4,6 +4,11 @@ class ChatNotificationWorker ...@@ -4,6 +4,11 @@ class ChatNotificationWorker
include ApplicationWorker include ApplicationWorker
feature_category :chatops feature_category :chatops
latency_sensitive_worker!
# TODO: break this into multiple jobs
# as the `responder` uses external dependencies
# See https://gitlab.com/gitlab-com/gl-infra/scalability/issues/34
# worker_has_external_dependencies!
RESCHEDULE_INTERVAL = 2.seconds RESCHEDULE_INTERVAL = 2.seconds
......
...@@ -7,6 +7,7 @@ module Ci ...@@ -7,6 +7,7 @@ module Ci
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
feature_category :continuous_integration feature_category :continuous_integration
worker_resource_boundary :cpu
def perform(build_id) def perform(build_id)
::Ci::Build.find_by_id(build_id).try do |build| ::Ci::Build.find_by_id(build_id).try do |build|
......
...@@ -5,6 +5,8 @@ class ClusterInstallAppWorker ...@@ -5,6 +5,8 @@ class ClusterInstallAppWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
worker_has_external_dependencies!
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::InstallService.new(app).execute Clusters::Applications::InstallService.new(app).execute
......
...@@ -5,6 +5,8 @@ class ClusterPatchAppWorker ...@@ -5,6 +5,8 @@ class ClusterPatchAppWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
worker_has_external_dependencies!
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::PatchService.new(app).execute Clusters::Applications::PatchService.new(app).execute
......
...@@ -4,6 +4,8 @@ class ClusterProjectConfigureWorker ...@@ -4,6 +4,8 @@ class ClusterProjectConfigureWorker
include ApplicationWorker include ApplicationWorker
include ClusterQueue include ClusterQueue
worker_has_external_dependencies!
def perform(project_id) def perform(project_id)
# Scheduled for removal in https://gitlab.com/gitlab-org/gitlab-foss/issues/59319 # Scheduled for removal in https://gitlab.com/gitlab-org/gitlab-foss/issues/59319
end end
......
...@@ -4,6 +4,8 @@ class ClusterProvisionWorker ...@@ -4,6 +4,8 @@ class ClusterProvisionWorker
include ApplicationWorker include ApplicationWorker
include ClusterQueue include ClusterQueue
worker_has_external_dependencies!
def perform(cluster_id) def perform(cluster_id)
Clusters::Cluster.find_by_id(cluster_id).try do |cluster| Clusters::Cluster.find_by_id(cluster_id).try do |cluster|
cluster.provider.try do |provider| cluster.provider.try do |provider|
......
...@@ -5,6 +5,8 @@ class ClusterUpgradeAppWorker ...@@ -5,6 +5,8 @@ class ClusterUpgradeAppWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
worker_has_external_dependencies!
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::UpgradeService.new(app).execute Clusters::Applications::UpgradeService.new(app).execute
......
...@@ -8,6 +8,9 @@ class ClusterWaitForAppInstallationWorker ...@@ -8,6 +8,9 @@ class ClusterWaitForAppInstallationWorker
INTERVAL = 10.seconds INTERVAL = 10.seconds
TIMEOUT = 20.minutes TIMEOUT = 20.minutes
worker_has_external_dependencies!
worker_resource_boundary :cpu
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::CheckInstallationProgressService.new(app).execute Clusters::Applications::CheckInstallationProgressService.new(app).execute
......
...@@ -5,6 +5,8 @@ class ClusterWaitForIngressIpAddressWorker ...@@ -5,6 +5,8 @@ class ClusterWaitForIngressIpAddressWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
worker_has_external_dependencies!
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::CheckIngressIpAddressService.new(app).execute Clusters::Applications::CheckIngressIpAddressService.new(app).execute
......
...@@ -7,6 +7,8 @@ module Clusters ...@@ -7,6 +7,8 @@ module Clusters
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
worker_has_external_dependencies!
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::UninstallService.new(app).execute Clusters::Applications::UninstallService.new(app).execute
......
...@@ -10,6 +10,9 @@ module Clusters ...@@ -10,6 +10,9 @@ module Clusters
INTERVAL = 10.seconds INTERVAL = 10.seconds
TIMEOUT = 20.minutes TIMEOUT = 20.minutes
worker_has_external_dependencies!
worker_resource_boundary :cpu
def perform(app_name, app_id) def perform(app_name, app_id)
find_application(app_name, app_id) do |app| find_application(app_name, app_id) do |app|
Clusters::Applications::CheckUninstallProgressService.new(app).execute Clusters::Applications::CheckUninstallProgressService.new(app).execute
......
...@@ -14,6 +14,7 @@ module Gitlab ...@@ -14,6 +14,7 @@ module Gitlab
include NotifyUponDeath include NotifyUponDeath
feature_category :importers feature_category :importers
worker_has_external_dependencies!
end end
# project - An instance of `Project` to import the data into. # project - An instance of `Project` to import the data into.
......
...@@ -6,6 +6,8 @@ class CreatePipelineWorker ...@@ -6,6 +6,8 @@ class CreatePipelineWorker
queue_namespace :pipeline_creation queue_namespace :pipeline_creation
feature_category :continuous_integration feature_category :continuous_integration
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(project_id, user_id, ref, source, params = {}) def perform(project_id, user_id, ref, source, params = {})
project = Project.find(project_id) project = Project.find(project_id)
......
...@@ -6,6 +6,7 @@ module Deployments ...@@ -6,6 +6,7 @@ module Deployments
queue_namespace :deployment queue_namespace :deployment
feature_category :continuous_delivery feature_category :continuous_delivery
worker_resource_boundary :cpu
def perform(deployment_id) def perform(deployment_id)
Deployment.find_by_id(deployment_id).try(:execute_hooks) Deployment.find_by_id(deployment_id).try(:execute_hooks)
......
...@@ -6,6 +6,7 @@ module Deployments ...@@ -6,6 +6,7 @@ module Deployments
queue_namespace :deployment queue_namespace :deployment
feature_category :continuous_delivery feature_category :continuous_delivery
worker_resource_boundary :cpu
def perform(deployment_id) def perform(deployment_id)
Deployment.find_by_id(deployment_id).try do |deployment| Deployment.find_by_id(deployment_id).try do |deployment|
......
...@@ -4,6 +4,7 @@ class EmailReceiverWorker ...@@ -4,6 +4,7 @@ class EmailReceiverWorker
include ApplicationWorker include ApplicationWorker
feature_category :issue_tracking feature_category :issue_tracking
latency_sensitive_worker!
def perform(raw) def perform(raw)
return unless Gitlab::IncomingEmail.enabled? return unless Gitlab::IncomingEmail.enabled?
......
...@@ -6,6 +6,8 @@ class EmailsOnPushWorker ...@@ -6,6 +6,8 @@ class EmailsOnPushWorker
attr_reader :email, :skip_premailer attr_reader :email, :skip_premailer
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(project_id, recipients, push_data, options = {}) def perform(project_id, recipients, push_data, options = {})
options.symbolize_keys! options.symbolize_keys!
......
...@@ -5,6 +5,7 @@ class ExpireJobCacheWorker ...@@ -5,6 +5,7 @@ class ExpireJobCacheWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_cache queue_namespace :pipeline_cache
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(job_id) def perform(job_id)
......
...@@ -5,6 +5,8 @@ class ExpirePipelineCacheWorker ...@@ -5,6 +5,8 @@ class ExpirePipelineCacheWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_cache queue_namespace :pipeline_cache
latency_sensitive_worker!
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id) def perform(pipeline_id)
......
...@@ -5,6 +5,7 @@ class GitlabShellWorker ...@@ -5,6 +5,7 @@ class GitlabShellWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
def perform(action, *arg) def perform(action, *arg)
gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
...@@ -4,6 +4,7 @@ class ImportIssuesCsvWorker ...@@ -4,6 +4,7 @@ class ImportIssuesCsvWorker
include ApplicationWorker include ApplicationWorker
feature_category :issue_tracking feature_category :issue_tracking
worker_resource_boundary :cpu
sidekiq_retries_exhausted do |job| sidekiq_retries_exhausted do |job|
Upload.find(job['args'][2]).destroy Upload.find(job['args'][2]).destroy
......
...@@ -8,6 +8,7 @@ module MailScheduler ...@@ -8,6 +8,7 @@ module MailScheduler
include MailSchedulerQueue include MailSchedulerQueue
feature_category :issue_tracking feature_category :issue_tracking
worker_resource_boundary :cpu
def perform(meth, *args) def perform(meth, *args)
check_arguments!(args) check_arguments!(args)
......
...@@ -4,6 +4,7 @@ class MergeWorker ...@@ -4,6 +4,7 @@ class MergeWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
def perform(merge_request_id, current_user_id, params) def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access params = params.with_indifferent_access
......
...@@ -6,6 +6,7 @@ module Namespaces ...@@ -6,6 +6,7 @@ module Namespaces
include CronjobQueue include CronjobQueue
feature_category :source_code_management feature_category :source_code_management
worker_resource_boundary :cpu
# Worker to prune pending rows on Namespace::AggregationSchedule # Worker to prune pending rows on Namespace::AggregationSchedule
# It's scheduled to run once a day at 1:05am. # It's scheduled to run once a day at 1:05am.
......
...@@ -5,6 +5,8 @@ class NewIssueWorker ...@@ -5,6 +5,8 @@ class NewIssueWorker
include NewIssuable include NewIssuable
feature_category :issue_tracking feature_category :issue_tracking
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(issue_id, user_id) def perform(issue_id, user_id)
return unless objects_found?(issue_id, user_id) return unless objects_found?(issue_id, user_id)
......
...@@ -5,6 +5,8 @@ class NewMergeRequestWorker ...@@ -5,6 +5,8 @@ class NewMergeRequestWorker
include NewIssuable include NewIssuable
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(merge_request_id, user_id) def perform(merge_request_id, user_id)
return unless objects_found?(merge_request_id, user_id) return unless objects_found?(merge_request_id, user_id)
......
...@@ -4,6 +4,8 @@ class NewNoteWorker ...@@ -4,6 +4,8 @@ class NewNoteWorker
include ApplicationWorker include ApplicationWorker
feature_category :issue_tracking feature_category :issue_tracking
latency_sensitive_worker!
worker_resource_boundary :cpu
# Keep extra parameter to preserve backwards compatibility with # Keep extra parameter to preserve backwards compatibility with
# old `NewNoteWorker` jobs (can remove later) # old `NewNoteWorker` jobs (can remove later)
......
...@@ -5,6 +5,8 @@ module ObjectPool ...@@ -5,6 +5,8 @@ module ObjectPool
include ApplicationWorker include ApplicationWorker
include ObjectPoolQueue include ObjectPoolQueue
worker_resource_boundary :cpu
# The use of pool id is deprecated. Keeping the argument allows old jobs to # The use of pool id is deprecated. Keeping the argument allows old jobs to
# still be performed. # still be performed.
def perform(_pool_id, project_id) def perform(_pool_id, project_id)
......
...@@ -5,6 +5,7 @@ class PagesDomainRemovalCronWorker ...@@ -5,6 +5,7 @@ class PagesDomainRemovalCronWorker
include CronjobQueue include CronjobQueue
feature_category :pages feature_category :pages
worker_resource_boundary :cpu
def perform def perform
PagesDomain.for_removal.find_each do |domain| PagesDomain.for_removal.find_each do |domain|
......
...@@ -5,6 +5,8 @@ class PipelineHooksWorker ...@@ -5,6 +5,8 @@ class PipelineHooksWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_hooks queue_namespace :pipeline_hooks
latency_sensitive_worker!
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id) def perform(pipeline_id)
......
...@@ -4,6 +4,8 @@ class PipelineMetricsWorker ...@@ -4,6 +4,8 @@ class PipelineMetricsWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id) def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
......
...@@ -4,6 +4,9 @@ class PipelineNotificationWorker ...@@ -4,6 +4,9 @@ class PipelineNotificationWorker
include ApplicationWorker include ApplicationWorker
include PipelineQueue include PipelineQueue
latency_sensitive_worker!
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id, recipients = nil) def perform(pipeline_id, recipients = nil)
pipeline = Ci::Pipeline.find_by(id: pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id)
......
...@@ -6,6 +6,7 @@ class PipelineProcessWorker ...@@ -6,6 +6,7 @@ class PipelineProcessWorker
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
feature_category :continuous_integration feature_category :continuous_integration
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id, build_ids = nil) def perform(pipeline_id, build_ids = nil)
......
...@@ -5,6 +5,7 @@ class PipelineScheduleWorker ...@@ -5,6 +5,7 @@ class PipelineScheduleWorker
include CronjobQueue include CronjobQueue
feature_category :continuous_integration feature_category :continuous_integration
worker_resource_boundary :cpu
def perform def perform
Ci::PipelineSchedule.runnable_schedules.preloaded.find_in_batches do |schedules| Ci::PipelineSchedule.runnable_schedules.preloaded.find_in_batches do |schedules|
......
...@@ -5,6 +5,7 @@ class PipelineSuccessWorker ...@@ -5,6 +5,7 @@ class PipelineSuccessWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
latency_sensitive_worker!
def perform(pipeline_id) def perform(pipeline_id)
# no-op # no-op
......
...@@ -5,6 +5,7 @@ class PipelineUpdateWorker ...@@ -5,6 +5,7 @@ class PipelineUpdateWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id) def perform(pipeline_id)
......
...@@ -4,6 +4,8 @@ class PostReceive ...@@ -4,6 +4,8 @@ class PostReceive
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(gl_repository, identifier, changes, push_options = {}) def perform(gl_repository, identifier, changes, push_options = {})
project, repo_type = Gitlab::GlRepository.parse(gl_repository) project, repo_type = Gitlab::GlRepository.parse(gl_repository)
......
...@@ -11,6 +11,7 @@ class ProcessCommitWorker ...@@ -11,6 +11,7 @@ class ProcessCommitWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
# project_id - The ID of the project this commit belongs to. # project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit. # user_id - The ID of the user that pushed the commit.
......
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class ProjectCacheWorker class ProjectCacheWorker
include ApplicationWorker include ApplicationWorker
latency_sensitive_worker!
LEASE_TIMEOUT = 15.minutes.to_i LEASE_TIMEOUT = 15.minutes.to_i
feature_category :source_code_management feature_category :source_code_management
......
...@@ -6,6 +6,7 @@ class ProjectExportWorker ...@@ -6,6 +6,7 @@ class ProjectExportWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
feature_category :source_code_management feature_category :source_code_management
worker_resource_boundary :memory
def perform(current_user_id, project_id, after_export_strategy = {}, params = {}) def perform(current_user_id, project_id, after_export_strategy = {}, params = {})
current_user = User.find(current_user_id) current_user = User.find(current_user_id)
......
...@@ -5,6 +5,7 @@ class ProjectServiceWorker ...@@ -5,6 +5,7 @@ class ProjectServiceWorker
sidekiq_options dead: false sidekiq_options dead: false
feature_category :integrations feature_category :integrations
worker_has_external_dependencies!
def perform(hook_id, data) def perform(hook_id, data)
data = data.with_indifferent_access data = data.with_indifferent_access
......
...@@ -5,6 +5,14 @@ class ReactiveCachingWorker ...@@ -5,6 +5,14 @@ class ReactiveCachingWorker
feature_category_not_owned! feature_category_not_owned!
# TODO: The reactive caching worker should be split into
# two different workers, one for latency_sensitive jobs without external dependencies
# and another worker without latency_sensitivity, but with external dependencies
# https://gitlab.com/gitlab-com/gl-infra/scalability/issues/34
# This worker should also have `worker_has_external_dependencies!` enabled
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(class_name, id, *args) def perform(class_name, id, *args)
klass = begin klass = begin
class_name.constantize class_name.constantize
......
...@@ -5,6 +5,7 @@ class RemoveExpiredMembersWorker ...@@ -5,6 +5,7 @@ class RemoveExpiredMembersWorker
include CronjobQueue include CronjobQueue
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
worker_resource_boundary :cpu
def perform def perform
Member.expired.find_each do |member| Member.expired.find_each do |member|
......
...@@ -7,6 +7,7 @@ class RepositoryImportWorker ...@@ -7,6 +7,7 @@ class RepositoryImportWorker
include ProjectImportOptions include ProjectImportOptions
feature_category :importers feature_category :importers
worker_has_external_dependencies!
# technical debt: https://gitlab.com/gitlab-org/gitlab/issues/33991 # technical debt: https://gitlab.com/gitlab-org/gitlab/issues/33991
sidekiq_options memory_killer_memory_growth_kb: ENV.fetch('MEMORY_KILLER_REPOSITORY_IMPORT_WORKER_MEMORY_GROWTH_KB', 50).to_i sidekiq_options memory_killer_memory_growth_kb: ENV.fetch('MEMORY_KILLER_REPOSITORY_IMPORT_WORKER_MEMORY_GROWTH_KB', 50).to_i
......
...@@ -6,6 +6,8 @@ class RepositoryUpdateRemoteMirrorWorker ...@@ -6,6 +6,8 @@ class RepositoryUpdateRemoteMirrorWorker
include ApplicationWorker include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers include Gitlab::ExclusiveLeaseHelpers
worker_has_external_dependencies!
sidekiq_options retry: 3, dead: false sidekiq_options retry: 3, dead: false
feature_category :source_code_management feature_category :source_code_management
......
...@@ -5,6 +5,7 @@ class StageUpdateWorker ...@@ -5,6 +5,7 @@ class StageUpdateWorker
include PipelineQueue include PipelineQueue
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
latency_sensitive_worker!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(stage_id) def perform(stage_id)
......
...@@ -5,6 +5,7 @@ class StuckCiJobsWorker ...@@ -5,6 +5,7 @@ class StuckCiJobsWorker
include CronjobQueue include CronjobQueue
feature_category :continuous_integration feature_category :continuous_integration
worker_resource_boundary :cpu
EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease' EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'
......
...@@ -5,6 +5,7 @@ class StuckImportJobsWorker ...@@ -5,6 +5,7 @@ class StuckImportJobsWorker
include CronjobQueue include CronjobQueue
feature_category :importers feature_category :importers
worker_resource_boundary :cpu
IMPORT_JOBS_EXPIRATION = 15.hours.to_i IMPORT_JOBS_EXPIRATION = 15.hours.to_i
......
...@@ -6,6 +6,8 @@ class UpdateHeadPipelineForMergeRequestWorker ...@@ -6,6 +6,8 @@ class UpdateHeadPipelineForMergeRequestWorker
queue_namespace :pipeline_processing queue_namespace :pipeline_processing
feature_category :continuous_integration feature_category :continuous_integration
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(merge_request_id) def perform(merge_request_id)
MergeRequest.find_by_id(merge_request_id).try do |merge_request| MergeRequest.find_by_id(merge_request_id).try do |merge_request|
......
...@@ -4,6 +4,8 @@ class UpdateMergeRequestsWorker ...@@ -4,6 +4,8 @@ class UpdateMergeRequestsWorker
include ApplicationWorker include ApplicationWorker
feature_category :source_code_management feature_category :source_code_management
latency_sensitive_worker!
worker_resource_boundary :cpu
LOG_TIME_THRESHOLD = 90 # seconds LOG_TIME_THRESHOLD = 90 # seconds
......
...@@ -4,6 +4,8 @@ class WaitForClusterCreationWorker ...@@ -4,6 +4,8 @@ class WaitForClusterCreationWorker
include ApplicationWorker include ApplicationWorker
include ClusterQueue include ClusterQueue
worker_has_external_dependencies!
def perform(cluster_id) def perform(cluster_id)
Clusters::Cluster.find_by_id(cluster_id).try do |cluster| Clusters::Cluster.find_by_id(cluster_id).try do |cluster|
cluster.provider.try do |provider| cluster.provider.try do |provider|
......
...@@ -4,6 +4,8 @@ class WebHookWorker ...@@ -4,6 +4,8 @@ class WebHookWorker
include ApplicationWorker include ApplicationWorker
feature_category :integrations feature_category :integrations
worker_has_external_dependencies!
sidekiq_options retry: 4, dead: false sidekiq_options retry: 4, dead: false
def perform(hook_id, data, hook_name) def perform(hook_id, data, hook_name)
......
---
title: Attribute Sidekiq workers according to their workloads
merge_request: 18066
author:
type: other
...@@ -368,7 +368,7 @@ Enterprise Edition instance. This has some implications: ...@@ -368,7 +368,7 @@ Enterprise Edition instance. This has some implications:
- [Background migrations](background_migrations.md) run in Sidekiq, and - [Background migrations](background_migrations.md) run in Sidekiq, and
should only be done for migrations that would take an extreme amount of should only be done for migrations that would take an extreme amount of
time at GitLab.com scale. time at GitLab.com scale.
1. **Sidekiq workers** [cannot change in a backwards-incompatible way](sidekiq_style_guide.md#removing-or-renaming-queues): 1. **Sidekiq workers** [cannot change in a backwards-incompatible way](sidekiq_style_guide.md#sidekiq-compatibility-across-updates):
1. Sidekiq queues are not drained before a deploy happens, so there will be 1. Sidekiq queues are not drained before a deploy happens, so there will be
workers in the queue from the previous version of GitLab. workers in the queue from the previous version of GitLab.
1. If you need to change a method signature, try to do so across two releases, 1. If you need to change a method signature, try to do so across two releases,
......
This diff is collapsed.
...@@ -5,6 +5,8 @@ module Ci ...@@ -5,6 +5,8 @@ module Ci
include ::ApplicationWorker include ::ApplicationWorker
include ::PipelineQueue include ::PipelineQueue
worker_resource_boundary :cpu
def perform(bridge_id) def perform(bridge_id)
::Ci::Bridge.find_by_id(bridge_id).try do |bridge| ::Ci::Bridge.find_by_id(bridge_id).try do |bridge|
::Ci::CreateCrossProjectPipelineService ::Ci::CreateCrossProjectPipelineService
......
...@@ -5,6 +5,9 @@ module Ci ...@@ -5,6 +5,9 @@ module Ci
include ::ApplicationWorker include ::ApplicationWorker
include ::PipelineQueue include ::PipelineQueue
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(pipeline_id) def perform(pipeline_id)
::Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| ::Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
::Ci::PipelineBridgeStatusService ::Ci::PipelineBridgeStatusService
......
...@@ -5,6 +5,8 @@ class CreateGithubWebhookWorker ...@@ -5,6 +5,8 @@ class CreateGithubWebhookWorker
include GrapePathHelpers::NamedRouteMatcher include GrapePathHelpers::NamedRouteMatcher
feature_category :integrations feature_category :integrations
worker_resource_boundary :cpu
worker_has_external_dependencies!
attr_reader :project attr_reader :project
......
...@@ -4,6 +4,7 @@ class ExportCsvWorker ...@@ -4,6 +4,7 @@ class ExportCsvWorker
include ApplicationWorker include ApplicationWorker
feature_category :issue_tracking feature_category :issue_tracking
worker_resource_boundary :memory
def perform(current_user_id, project_id, params) def perform(current_user_id, project_id, params)
@current_user = User.find(current_user_id) @current_user = User.find(current_user_id)
......
...@@ -6,6 +6,7 @@ module IncidentManagement ...@@ -6,6 +6,7 @@ module IncidentManagement
queue_namespace :incident_management queue_namespace :incident_management
feature_category :incident_management feature_category :incident_management
worker_resource_boundary :cpu
def perform(project_id, alert_hash) def perform(project_id, alert_hash)
project = find_project(project_id) project = find_project(project_id)
......
...@@ -5,6 +5,7 @@ class LdapAllGroupsSyncWorker ...@@ -5,6 +5,7 @@ class LdapAllGroupsSyncWorker
include CronjobQueue include CronjobQueue
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
worker_has_external_dependencies!
def perform def perform
return unless Gitlab::Auth::LDAP::Config.group_sync_enabled? return unless Gitlab::Auth::LDAP::Config.group_sync_enabled?
......
...@@ -4,6 +4,7 @@ class LdapGroupSyncWorker ...@@ -4,6 +4,7 @@ class LdapGroupSyncWorker
include ApplicationWorker include ApplicationWorker
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
worker_has_external_dependencies!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(group_ids, provider = nil) def perform(group_ids, provider = nil)
......
...@@ -5,6 +5,7 @@ class LdapSyncWorker ...@@ -5,6 +5,7 @@ class LdapSyncWorker
include CronjobQueue include CronjobQueue
feature_category :authentication_and_authorization feature_category :authentication_and_authorization
worker_has_external_dependencies!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
# rubocop: disable Gitlab/RailsLogger # rubocop: disable Gitlab/RailsLogger
......
...@@ -5,6 +5,7 @@ class NewEpicWorker ...@@ -5,6 +5,7 @@ class NewEpicWorker
include NewIssuable include NewIssuable
feature_category :agile_portfolio_management feature_category :agile_portfolio_management
worker_resource_boundary :cpu
def perform(epic_id, user_id) def perform(epic_id, user_id)
return unless objects_found?(epic_id, user_id) return unless objects_found?(epic_id, user_id)
......
...@@ -7,6 +7,8 @@ class SyncSecurityReportsToReportApprovalRulesWorker ...@@ -7,6 +7,8 @@ class SyncSecurityReportsToReportApprovalRulesWorker
include PipelineQueue include PipelineQueue
feature_category :static_application_security_testing feature_category :static_application_security_testing
latency_sensitive_worker!
worker_resource_boundary :cpu
def perform(pipeline_id) def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by_id(pipeline_id) pipeline = Ci::Pipeline.find_by_id(pipeline_id)
......
...@@ -5,6 +5,7 @@ class UpdateMaxSeatsUsedForGitlabComSubscriptionsWorker ...@@ -5,6 +5,7 @@ class UpdateMaxSeatsUsedForGitlabComSubscriptionsWorker
include CronjobQueue include CronjobQueue
feature_category :license_compliance feature_category :license_compliance
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform def perform
......
...@@ -21,8 +21,8 @@ describe 'Every Sidekiq worker' do ...@@ -21,8 +21,8 @@ describe 'Every Sidekiq worker' do
missing_from_file = worker_queues - file_worker_queues missing_from_file = worker_queues - file_worker_queues
expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in Gitlab::SidekiqConfig::QUEUE_CONFIG_PATHS" expect(missing_from_file).to be_empty, "expected #{missing_from_file.to_a.inspect} to be in Gitlab::SidekiqConfig::QUEUE_CONFIG_PATHS"
unncessarily_in_file = file_worker_queues - worker_queues unnecessarily_in_file = file_worker_queues - worker_queues
expect(unncessarily_in_file).to be_empty, "expected #{unncessarily_in_file.to_a.inspect} not to be in Gitlab::SidekiqConfig::QUEUE_CONFIG_PATHS" expect(unnecessarily_in_file).to be_empty, "expected #{unnecessarily_in_file.to_a.inspect} not to be in Gitlab::SidekiqConfig::QUEUE_CONFIG_PATHS"
end end
it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do it 'has its queue or namespace in config/sidekiq_queues.yml', :aggregate_failures do
...@@ -42,7 +42,7 @@ describe 'Every Sidekiq worker' do ...@@ -42,7 +42,7 @@ describe 'Every Sidekiq worker' do
end end
# All Sidekiq worker classes should declare a valid `feature_category` # All Sidekiq worker classes should declare a valid `feature_category`
# or explicitely be excluded with the `feature_category_not_owned!` annotation. # or explicitly be excluded with the `feature_category_not_owned!` annotation.
# Please see doc/development/sidekiq_style_guide.md#Feature-Categorization for more details. # Please see doc/development/sidekiq_style_guide.md#Feature-Categorization for more details.
it 'has a feature_category or feature_category_not_owned! attribute', :aggregate_failures do it 'has a feature_category or feature_category_not_owned! attribute', :aggregate_failures do
Gitlab::SidekiqConfig.workers.each do |worker| Gitlab::SidekiqConfig.workers.each do |worker|
...@@ -62,5 +62,36 @@ describe 'Every Sidekiq worker' do ...@@ -62,5 +62,36 @@ describe 'Every Sidekiq worker' do
expect(feature_categories).to include(worker.get_feature_category), "expected #{worker.inspect} to declare a valid feature_category, but got #{worker.get_feature_category}" expect(feature_categories).to include(worker.get_feature_category), "expected #{worker.inspect} to declare a valid feature_category, but got #{worker.get_feature_category}"
end end
end end
# Memory-bound workers are very expensive to run, since they need to run on nodes with very low
# concurrency, so that each job can consume large amounts of memory. For this reason, on
# GitLab.com, when a large number of memory-bound jobs arrive at once, we let them queue up
# rather than scaling the hardware to meet the SLO. Consequently, memory-bound,
# latency-sensitive jobs are explicitly discouraged and disabled.
it 'is (exclusively) memory-bound or latency-sensitive, not both', :aggregate_failures do
  # Only workers flagged as latency sensitive are checked: none of them may
  # declare `:memory` as their resource boundary. `:aggregate_failures` lets
  # every offending worker be reported in a single run.
  latency_sensitive_workers = Gitlab::SidekiqConfig.workers
    .select(&:latency_sensitive_worker?)

  latency_sensitive_workers.each do |worker|
    expect(worker.get_worker_resource_boundary).not_to eq(:memory), "#{worker.inspect} cannot be both memory-bound and latency sensitive"
  end
end
# In high traffic installations, such as GitLab.com, `latency_sensitive` workers run in a
# dedicated fleet. In order to ensure short queue times, `latency_sensitive` jobs have strict
# SLOs in order to ensure throughput. However, when a worker depends on an external service,
# such as a user's k8s cluster or a third-party internet service, we cannot guarantee latency,
# and therefore throughput. An outage of a third-party service could therefore impact throughput
# on other latency_sensitive jobs, leading to degradation throughout the GitLab application.
# Please see doc/development/sidekiq_style_guide.md#Jobs-with-External-Dependencies for more
# details.
it 'has (exclusively) external dependencies or is latency-sensitive, not both', :aggregate_failures do
  # Only workers flagged as latency sensitive are checked: none of them may
  # also report external dependencies. `:aggregate_failures` lets every
  # offending worker be reported in a single run.
  latency_sensitive_workers = Gitlab::SidekiqConfig.workers
    .select(&:latency_sensitive_worker?)

  latency_sensitive_workers.each do |worker|
    expect(worker.worker_has_external_dependencies?).to be_falsey, "#{worker.inspect} cannot have both external dependencies and be latency sensitive"
  end
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment