Commit c5bd57e8 authored by Douwe Maan's avatar Douwe Maan

Add ApplicationWorker and make every worker include it

parent 2d08a15f
class AdminEmailWorker class AdminEmailWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class AuthorizedProjectsWorker class AuthorizedProjectsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Schedules multiple jobs and waits for them to be completed. # Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list) def self.bulk_perform_and_wait(args_list)
......
class BackgroundMigrationWorker class BackgroundMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Enqueues a number of jobs in bulk. # Enqueues a number of jobs in bulk.
# #
......
class BuildCoverageWorker class BuildCoverageWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(build_id) def perform(build_id)
......
class BuildFinishedWorker class BuildFinishedWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildHooksWorker class BuildHooksWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks enqueue_in group: :hooks
......
class BuildQueueWorker class BuildQueueWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildSuccessWorker class BuildSuccessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildTraceSectionsWorker class BuildTraceSectionsWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(build_id) def perform(build_id)
......
class ClusterInstallAppWorker class ClusterInstallAppWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
......
class ClusterProvisionWorker class ClusterProvisionWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
def perform(cluster_id) def perform(cluster_id)
......
class ClusterWaitForAppInstallationWorker class ClusterWaitForAppInstallationWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
......
Sidekiq::Worker.extend ActiveSupport::Concern
module ApplicationWorker
extend ActiveSupport::Concern
include Sidekiq::Worker
included do
sidekiq_options queue: base_queue_name
end
module ClassMethods
def base_queue_name
name
.sub(/\AGitlab::/, '')
.sub(/Worker\z/, '')
.underscore
.tr('/', '_')
end
def queue
get_sidekiq_options['queue'].to_s
end
end
end
# Concern that sets the queue of a Sidekiq worker based on the worker's class
# name/namespace.
module DedicatedSidekiqQueue
extend ActiveSupport::Concern
included do
sidekiq_options queue: name.sub(/Worker\z/, '').underscore.tr('/', '_')
end
end
...@@ -8,7 +8,7 @@ module Gitlab ...@@ -8,7 +8,7 @@ module Gitlab
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include ReschedulingMethods include ReschedulingMethods
include NotifyUponDeath include NotifyUponDeath
......
class CreateGpgSignatureWorker class CreateGpgSignatureWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(commit_sha, project_id) def perform(commit_sha, project_id)
project = Project.find_by(id: project_id) project = Project.find_by(id: project_id)
......
class CreatePipelineWorker class CreatePipelineWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :creation enqueue_in group: :creation
......
class DeleteMergedBranchesWorker class DeleteMergedBranchesWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, user_id) def perform(project_id, user_id)
begin begin
......
class DeleteUserWorker class DeleteUserWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(current_user_id, delete_user_id, options = {}) def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id) delete_user = User.find(delete_user_id)
......
class EmailReceiverWorker class EmailReceiverWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(raw) def perform(raw)
return unless Gitlab::IncomingEmail.enabled? return unless Gitlab::IncomingEmail.enabled?
......
class EmailsOnPushWorker class EmailsOnPushWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
attr_reader :email, :skip_premailer attr_reader :email, :skip_premailer
......
class ExpireBuildArtifactsWorker class ExpireBuildArtifactsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ExpireBuildInstanceArtifactsWorker class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(build_id) def perform(build_id)
build = Ci::Build build = Ci::Build
......
class ExpireJobCacheWorker class ExpireJobCacheWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache enqueue_in group: :cache
......
class ExpirePipelineCacheWorker class ExpirePipelineCacheWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache enqueue_in group: :cache
......
module Geo module Geo
class BaseSchedulerWorker class BaseSchedulerWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
......
module Geo module Geo
class FileDownloadWorker class FileDownloadWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :geo_file_download, retry: 3, dead: false
sidekiq_options retry: 3, dead: false
def perform(object_type, object_id) def perform(object_type, object_id)
Geo::FileDownloadService.new(object_type.to_sym, object_id).execute Geo::FileDownloadService.new(object_type.to_sym, object_id).execute
......
module Geo module Geo
class FileRemovalWorker class FileRemovalWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::Geo::LogHelpers include Gitlab::Geo::LogHelpers
include GeoQueue
sidekiq_options queue: :geo
def perform(file_path) def perform(file_path)
remove_file!(file_path) remove_file!(file_path)
......
module Geo module Geo
class HashedStorageAttachmentsMigrationWorker class HashedStorageAttachmentsMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
def perform(project_id, old_attachments_path, new_attachments_path) def perform(project_id, old_attachments_path, new_attachments_path)
......
module Geo module Geo
class HashedStorageMigrationWorker class HashedStorageMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
def perform(project_id, old_disk_path, new_disk_path, old_storage_version) def perform(project_id, old_disk_path, new_disk_path, old_storage_version)
......
module Geo module Geo
class MetricsUpdateWorker class MetricsUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
include CronjobQueue include CronjobQueue
......
module Geo module Geo
class ProjectSyncWorker class ProjectSyncWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :geo_project_sync, retry: 3, dead: false sidekiq_options retry: 3, dead: false
sidekiq_retry_in { |count| 30 * count } sidekiq_retry_in { |count| 30 * count }
......
module Geo module Geo
class PruneEventLogWorker class PruneEventLogWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
......
module Geo module Geo
class RenameRepositoryWorker class RenameRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
def perform(project_id, old_disk_path, new_disk_path) def perform(project_id, old_disk_path, new_disk_path)
......
module Geo module Geo
class RepositoriesCleanUpWorker class RepositoriesCleanUpWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
......
class GeoRepositoryDestroyWorker class GeoRepositoryDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
......
class GitGarbageCollectWorker class GitGarbageCollectWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
sidekiq_options retry: false sidekiq_options retry: false
......
...@@ -7,7 +7,7 @@ module Gitlab ...@@ -7,7 +7,7 @@ module Gitlab
# been completed this worker will advance the import process to the next # been completed this worker will advance the import process to the next
# stage. # stage.
class AdvanceStageWorker class AdvanceStageWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false sidekiq_options queue: 'github_importer_advance_stage', dead: false
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module Gitlab module Gitlab
module GithubImport module GithubImport
class RefreshImportJidWorker class RefreshImportJidWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
# The interval to schedule new instances of this job at. # The interval to schedule new instances of this job at.
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class FinishImportWorker class FinishImportWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportBaseDataWorker class ImportBaseDataWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportIssuesAndDiffNotesWorker class ImportIssuesAndDiffNotesWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportNotesWorker class ImportNotesWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportPullRequestsWorker class ImportPullRequestsWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportRepositoryWorker class ImportRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
class GitlabShellWorker class GitlabShellWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
def perform(action, *arg) def perform(action, *arg)
gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
class GitlabUsagePingWorker class GitlabUsagePingWorker
LEASE_TIMEOUT = 86400 LEASE_TIMEOUT = 86400
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class GroupDestroyWorker class GroupDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def perform(group_id, user_id) def perform(group_id, user_id)
......
class ImportExportProjectCleanupWorker class ImportExportProjectCleanupWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class InvalidGpgSignatureUpdateWorker class InvalidGpgSignatureUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(gpg_key_id) def perform(gpg_key_id)
gpg_key = GpgKey.find_by(id: gpg_key_id) gpg_key = GpgKey.find_by(id: gpg_key_id)
......
...@@ -2,8 +2,7 @@ require 'json' ...@@ -2,8 +2,7 @@ require 'json'
require 'socket' require 'socket'
class IrkerWorker class IrkerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, chans, colors, push_data, settings) def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id) project = Project.find(project_id)
......
class MergeWorker class MergeWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(merge_request_id, current_user_id, params) def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access params = params.with_indifferent_access
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
# The worker will reject doing anything for projects that *do* have a # The worker will reject doing anything for projects that *do* have a
# namespace. For those use ProjectDestroyWorker instead. # namespace. For those use ProjectDestroyWorker instead.
class NamespacelessProjectDestroyWorker class NamespacelessProjectDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def self.bulk_perform_async(args_list) def self.bulk_perform_async(args_list)
......
class NewIssueWorker class NewIssueWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include NewIssuable include NewIssuable
def perform(issue_id, user_id) def perform(issue_id, user_id)
......
class NewMergeRequestWorker class NewMergeRequestWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include NewIssuable include NewIssuable
def perform(merge_request_id, user_id) def perform(merge_request_id, user_id)
......
class NewNoteWorker class NewNoteWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Keep extra parameter to preserve backwards compatibility with # Keep extra parameter to preserve backwards compatibility with
# old `NewNoteWorker` jobs (can remove later) # old `NewNoteWorker` jobs (can remove later)
......
class PagesWorker class PagesWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :pages, retry: false sidekiq_options queue: :pages, retry: false
......
class PipelineHooksWorker class PipelineHooksWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks enqueue_in group: :hooks
......
class PipelineMetricsWorker class PipelineMetricsWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(pipeline_id) def perform(pipeline_id)
......
class PipelineNotificationWorker class PipelineNotificationWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(pipeline_id, recipients = nil) def perform(pipeline_id, recipients = nil)
......
class PipelineProcessWorker class PipelineProcessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PipelineScheduleWorker class PipelineScheduleWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class PipelineSuccessWorker class PipelineSuccessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PipelineUpdateWorker class PipelineUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PostReceive class PostReceive
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
prepend EE::PostReceive prepend EE::PostReceive
def perform(gl_repository, identifier, changes) def perform(gl_repository, identifier, changes)
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
# Consider using an extra worker if you need to add any extra (and potentially # Consider using an extra worker if you need to add any extra (and potentially
# slow) processing of commits. # slow) processing of commits.
class ProcessCommitWorker class ProcessCommitWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# project_id - The ID of the project this commit belongs to. # project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit. # user_id - The ID of the user that pushed the commit.
......
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class ProjectCacheWorker class ProjectCacheWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
prepend EE::Workers::ProjectCacheWorker prepend EE::Workers::ProjectCacheWorker
LEASE_TIMEOUT = 15.minutes.to_i LEASE_TIMEOUT = 15.minutes.to_i
......
class ProjectDestroyWorker class ProjectDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def perform(project_id, user_id, params) def perform(project_id, user_id, params)
......
class ProjectExportWorker class ProjectExportWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
sidekiq_options retry: 3 sidekiq_options retry: 3
......
class ProjectMigrateHashedStorageWorker class ProjectMigrateHashedStorageWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 30.seconds.to_i LEASE_TIMEOUT = 30.seconds.to_i
......
class ProjectServiceWorker class ProjectServiceWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
sidekiq_options dead: false sidekiq_options dead: false
......
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class PropagateServiceTemplateWorker class PropagateServiceTemplateWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 4.hours.to_i LEASE_TIMEOUT = 4.hours.to_i
......
class PruneOldEventsWorker class PruneOldEventsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ReactiveCachingWorker class ReactiveCachingWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(class_name, id, *args) def perform(class_name, id, *args)
klass = begin klass = begin
......
class RemoveExpiredGroupLinksWorker class RemoveExpiredGroupLinksWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RemoveExpiredMembersWorker class RemoveExpiredMembersWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RemoveOldWebHookLogsWorker class RemoveOldWebHookLogsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
WEB_HOOK_LOG_LIFETIME = 2.days WEB_HOOK_LOG_LIFETIME = 2.days
......
class RemoveUnreferencedLfsObjectsWorker class RemoveUnreferencedLfsObjectsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RepositoryArchiveCacheWorker class RepositoryArchiveCacheWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
module RepositoryCheck module RepositoryCheck
class BatchWorker class BatchWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
RUN_TIME = 3600 RUN_TIME = 3600
......
module RepositoryCheck module RepositoryCheck
class ClearWorker class ClearWorker
include Sidekiq::Worker include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
def perform def perform
......
module RepositoryCheck module RepositoryCheck
class SingleRepositoryWorker class SingleRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
def perform(project_id) def perform(project_id)
......
class RepositoryForkWorker class RepositoryForkWorker
ForkError = Class.new(StandardError) ForkError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
include ProjectStartImport include ProjectStartImport
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
......
class RepositoryImportWorker class RepositoryImportWorker
ImportError = Class.new(StandardError) ImportError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
include ProjectStartImport include ProjectStartImport
......
class RequestsProfilesWorker class RequestsProfilesWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ScheduleUpdateUserActivityWorker class ScheduleUpdateUserActivityWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform(batch_size = 500) def perform(batch_size = 500)
......
class StageUpdateWorker class StageUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class StorageMigratorWorker class StorageMigratorWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
BATCH_SIZE = 100 BATCH_SIZE = 100
......
class StuckCiJobsWorker class StuckCiJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze
......
class StuckImportJobsWorker class StuckImportJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
IMPORT_JOBS_EXPIRATION = 15.hours.to_i IMPORT_JOBS_EXPIRATION = 15.hours.to_i
......
class StuckMergeJobsWorker class StuckMergeJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class SystemHookPushWorker class SystemHookPushWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(push_data, hook_id) def perform(push_data, hook_id)
SystemHooksService.new.execute_hooks(push_data, hook_id) SystemHooksService.new.execute_hooks(push_data, hook_id)
......
class TrendingProjectsWorker class TrendingProjectsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class UpdateMergeRequestsWorker class UpdateMergeRequestsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LOG_TIME_THRESHOLD = 90 # seconds LOG_TIME_THRESHOLD = 90 # seconds
......
class UpdateUserActivityWorker class UpdateUserActivityWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(pairs) def perform(pairs)
pairs = cast_data(pairs) pairs = cast_data(pairs)
......
class UploadChecksumWorker class UploadChecksumWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(upload_id) def perform(upload_id)
upload = Upload.find(upload_id) upload = Upload.find(upload_id)
......
class WaitForClusterCreationWorker class WaitForClusterCreationWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
def perform(cluster_id) def perform(cluster_id)
......
class WebHookWorker class WebHookWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
sidekiq_options retry: 4, dead: false sidekiq_options retry: 4, dead: false
......
...@@ -76,12 +76,14 @@ end ...@@ -76,12 +76,14 @@ end
# The Sidekiq client API always adds the queue to the Sidekiq queue # The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary # list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring. # for monitoring.
queues = Gitlab::SidekiqConfig.queues
begin begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.pipelined do conn.pipelined do
queues.each { |queue| conn.sadd('queues', queue) } queues.each do |queue|
conn.sadd('queues', queue)
end
end end
end end
rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
......
...@@ -18,8 +18,7 @@ include the `DedicatedSidekiqQueue` concern as follows: ...@@ -18,8 +18,7 @@ include the `DedicatedSidekiqQueue` concern as follows:
```ruby ```ruby
class ProcessSomethingWorker class ProcessSomethingWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
end end
``` ```
......
class AdminEmailsWorker class AdminEmailsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(recipient_id, subject, body) def perform(recipient_id, subject, body)
recipient_list(recipient_id).pluck(:id).uniq.each do |user_id| recipient_list(recipient_id).pluck(:id).uniq.each do |user_id|
......
class ClearSharedRunnersMinutesWorker class ClearSharedRunnersMinutesWorker
LEASE_TIMEOUT = 3600 LEASE_TIMEOUT = 3600
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ElasticBatchProjectIndexerWorker class ElasticBatchProjectIndexerWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
# Batch indexing is a generally a onetime option, so give finer control over # Batch indexing is a generally a onetime option, so give finer control over
# queuing and concurrency # queuing and concurrency
include DedicatedSidekiqQueue
# This worker is long-running, but idempotent, so retry many times if # This worker is long-running, but idempotent, so retry many times if
# necessary # necessary
......
class ElasticCommitIndexerWorker class ElasticCommitIndexerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
sidekiq_options retry: 2 sidekiq_options retry: 2
......
class ElasticIndexerWorker class ElasticIndexerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Elasticsearch::Model::Client::ClassMethods include Elasticsearch::Model::Client::ClassMethods
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
......
class ExportCsvWorker class ExportCsvWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(current_user_id, project_id, params) def perform(current_user_id, project_id, params)
@current_user = User.find(current_user_id) @current_user = User.find(current_user_id)
......
class HistoricalDataWorker class HistoricalDataWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class LdapAllGroupsSyncWorker class LdapAllGroupsSyncWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class LdapGroupSyncWorker class LdapGroupSyncWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(group_ids, provider = nil) def perform(group_ids, provider = nil)
return unless Gitlab::LDAP::Config.group_sync_enabled? return unless Gitlab::LDAP::Config.group_sync_enabled?
......
class LdapSyncWorker class LdapSyncWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ObjectStorageUploadWorker class ObjectStorageUploadWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(uploader_class_name, subject_class_name, file_field, subject_id) def perform(uploader_class_name, subject_class_name, file_field, subject_id)
uploader_class = uploader_class_name.constantize uploader_class = uploader_class_name.constantize
......
class ProjectUpdateRepositoryStorageWorker class ProjectUpdateRepositoryStorageWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, new_repository_storage_key) def perform(project_id, new_repository_storage_key)
project = Project.find(project_id) project = Project.find(project_id)
......
class RebaseWorker class RebaseWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :merge sidekiq_options queue: :merge
......
class RepositoryUpdateMirrorWorker class RepositoryUpdateMirrorWorker
UpdateError = Class.new(StandardError) UpdateError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
include ProjectStartImport include ProjectStartImport
LEASE_KEY = 'repository_update_mirror_worker_start_scheduler'.freeze LEASE_KEY = 'repository_update_mirror_worker_start_scheduler'.freeze
......
...@@ -2,9 +2,8 @@ class RepositoryUpdateRemoteMirrorWorker ...@@ -2,9 +2,8 @@ class RepositoryUpdateRemoteMirrorWorker
UpdateAlreadyInProgressError = Class.new(StandardError) UpdateAlreadyInProgressError = Class.new(StandardError)
UpdateError = Class.new(StandardError) UpdateError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options retry: 3, dead: false sidekiq_options retry: 3, dead: false
......
class UpdateAllMirrorsWorker class UpdateAllMirrorsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
LEASE_TIMEOUT = 5.minutes LEASE_TIMEOUT = 5.minutes
......
...@@ -30,18 +30,16 @@ module Gitlab ...@@ -30,18 +30,16 @@ module Gitlab
option_parser.parse!(argv) option_parser.parse!(argv)
parsed_queues = SidekiqCluster.parse_queues(argv) queue_groups = SidekiqCluster.parse_queues(argv)
queues =
if @negate_queues if @negate_queues
parsed_queues.map { |queues| SidekiqConfig.queues(@rails_path, except: queues) } all_queues = SidekiqConfig.config_queues(@rails_path)
else queue_groups.map! { |queues| all_queues - queues }
parsed_queues
end end
@logger.info("Starting cluster with #{queues.length} processes") @logger.info("Starting cluster with #{queue_groups.length} processes")
@processes = SidekiqCluster.start(queues, @environment, @rails_path) @processes = SidekiqCluster.start(queue_groups, @environment, @rails_path)
write_pid write_pid
trap_signals trap_signals
......
...@@ -2,13 +2,51 @@ require 'yaml' ...@@ -2,13 +2,51 @@ require 'yaml'
module Gitlab module Gitlab
module SidekiqConfig module SidekiqConfig
def self.queues(rails_path = Rails.root.to_s, except: []) def self.redis_queues
queues_file_path = File.join(rails_path, 'config', 'sidekiq_queues.yml') @redis_queues ||= Sidekiq::Queue.all.map(&:name)
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s)
@config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
def self.cron_workers
@cron_workers ||= Settings.cron_jobs.map { |job_name, options| options['job_class'].constantize }
end
def self.workers
@workers ||=
find_workers(Rails.root.join('app', 'workers')) +
find_workers(Rails.root.join('ee', 'app', 'workers'))
end
@queues_file = {} def self.default_queues
@queues_file[queues_file_path] ||= YAML.load_file(queues_file_path) [ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root)
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns) }
workers.map! do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
@queues_file[queues_file_path].fetch(:queues).map { |queue, _| queue } - except # Skip concerns
workers.select { |w| w < Sidekiq::Worker }
end end
end end
end end
...@@ -22,7 +22,7 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -22,7 +22,7 @@ describe Gitlab::SidekiqCluster::CLI do
context 'with --negate flag' do context 'with --negate flag' do
it 'starts Sidekiq workers for all queues on sidekiq_queues.yml except the ones on argv' do it 'starts Sidekiq workers for all queues on sidekiq_queues.yml except the ones on argv' do
expect(Gitlab::SidekiqConfig).to receive(:queues).and_return(['baz']) expect(Gitlab::SidekiqConfig).to receive(:config_queues).and_return(['baz'])
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['baz']], 'test', Dir.pwd) .with([['baz']], 'test', Dir.pwd)
.and_return([]) .and_return([])
......
require 'rails_helper' require 'rails_helper'
describe Gitlab::SidekiqConfig do describe Gitlab::SidekiqConfig do
describe '.queues' do describe '.workers' do
let(:queues_file_path) { Rails.root.join('config', 'sidekiq_queues.yml') } it 'includes all workers' do
workers = described_class.workers
context 'without except argument' do expect(workers).to include(PostReceive)
it 'returns all queues defined on config/sidekiq_queues.yml file' do expect(workers).to include(MergeWorker)
expected_queues = YAML.load_file(queues_file_path)[:queues].map { |queue, _| queue } end
it 'includes EE workers' do
workers = described_class.workers
expect(described_class.queues).to eq(expected_queues) expect(workers).to include(RepositoryUpdateMirrorWorker)
expect(workers).to include(LdapGroupSyncWorker)
end end
end end
context 'with except argument' do describe '.worker_queues' do
it 'returns queues on config/sidekiq_queues.yml filtering out excluded ones' do it 'includes all queues' do
expected_queues = queues = described_class.worker_queues
YAML.load_file(queues_file_path)[:queues].map { |queue, _| queue } - ['webhook']
expect(described_class.queues(except: ['webhook'])).to eq(expected_queues) expect(queues).to include('post_receive')
expect(queues).to include('merge')
expect(queues).to include('cronjob')
expect(queues).to include('mailers')
expect(queues).to include('default')
end end
it 'includes EE queues' do
queues = described_class.worker_queues
expect(queues).to include('repository_update_mirror')
expect(queues).to include('ldap_group_sync')
end end
end end
end end
require 'spec_helper' require 'spec_helper'
describe DedicatedSidekiqQueue do describe ApplicationWorker do
let(:worker) do let(:worker) do
Class.new do Class.new do
def self.name def self.name
'Foo::Bar::DummyWorker' 'Gitlab::Foo::Bar::DummyWorker'
end end
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
end end
end end
describe 'queue names' do describe 'Sidekiq options' do
it 'sets the queue name based on the class name' do it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy') expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end end
end end
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
expect(worker.queue).to eq('some_queue')
end
end
end end
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe ClusterQueue do describe ClusterQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include ClusterQueue include ClusterQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe CronjobQueue do describe CronjobQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include CronjobQueue include CronjobQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe GeoQueue do describe GeoQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include GeoQueue include GeoQueue
end end
end end
......
...@@ -3,6 +3,10 @@ require 'spec_helper' ...@@ -3,6 +3,10 @@ require 'spec_helper'
describe Gitlab::GithubImport::ObjectImporter do describe Gitlab::GithubImport::ObjectImporter do
let(:worker) do let(:worker) do
Class.new do Class.new do
def self.name
'DummyWorker'
end
include(Gitlab::GithubImport::ObjectImporter) include(Gitlab::GithubImport::ObjectImporter)
def counter_name def counter_name
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe Gitlab::GithubImport::Queue do describe Gitlab::GithubImport::Queue do
it 'sets the Sidekiq options for the worker' do it 'sets the Sidekiq options for the worker' do
worker = Class.new do worker = Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include Gitlab::GithubImport::Queue include Gitlab::GithubImport::Queue
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe PipelineQueue do describe PipelineQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include PipelineQueue include PipelineQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe RepositoryCheckQueue do describe RepositoryCheckQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
end end
end end
......
require 'spec_helper' require 'spec_helper'
describe 'Every Sidekiq worker' do describe 'Every Sidekiq worker' do
let(:workers) do it 'includes ApplicationWorker' do
root = Rails.root.join('app', 'workers') expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker))
concerns = root.join('concerns').to_s
ee_modules = root.join('ee').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns, ee_modules) }
workers.map do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
end end
it 'does not use the default queue' do it 'does not use the default queue' do
workers.each do |worker| expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default')
expect(worker.sidekiq_options['queue'].to_s).not_to eq('default')
end
end end
it 'uses the cronjob queue when the worker runs as a cronjob' do it 'uses the cronjob queue when the worker runs as a cronjob' do
cron_workers = Settings.cron_jobs expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
.map { |job_name, options| options['job_class'].constantize }
.to_set
workers.each do |worker|
next unless cron_workers.include?(worker)
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
end end
it 'defines the queue in the Sidekiq configuration file' do it 'defines the queue in the Sidekiq configuration file' do
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s) config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
queue_names = config[:queues].map { |(queue, _)| queue }.to_set
workers.each do |worker| expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
expect(queue_names).to include(worker.sidekiq_options['queue'].to_s)
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment