Commit 21384d80 authored by Sean McGivern

Merge branch 'dm-application-worker-ee' into 'master'

[EE] Add ApplicationWorker and make every worker include it

See merge request gitlab-org/gitlab-ee!3577
parents 97f24dbd f803eb79
...@@ -5,6 +5,7 @@ require 'carrierwave/orm/activerecord' ...@@ -5,6 +5,7 @@ require 'carrierwave/orm/activerecord'
class Group < Namespace class Group < Namespace
include EE::Group include EE::Group
include Gitlab::ConfigHelper include Gitlab::ConfigHelper
include AfterCommitQueue
include AccessRequestable include AccessRequestable
include Avatarable include Avatarable
include Referable include Referable
......
...@@ -2,6 +2,7 @@ require 'digest/md5' ...@@ -2,6 +2,7 @@ require 'digest/md5'
class Key < ActiveRecord::Base class Key < ActiveRecord::Base
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
include AfterCommitQueue
include Sortable include Sortable
belongs_to :user belongs_to :user
......
class Member < ActiveRecord::Base class Member < ActiveRecord::Base
include AfterCommitQueue
include Sortable include Sortable
include Importable include Importable
include Expirable include Expirable
......
...@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base ...@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base
def async_execute(data) def async_execute(data)
return unless supported_events.include?(data[:object_kind]) return unless supported_events.include?(data[:object_kind])
Sidekiq::Client.enqueue(ProjectServiceWorker, id, data) ProjectServiceWorker.perform_async(id, data)
end end
def issue_tracker? def issue_tracker?
......
...@@ -7,6 +7,7 @@ class User < ActiveRecord::Base ...@@ -7,6 +7,7 @@ class User < ActiveRecord::Base
include Gitlab::ConfigHelper include Gitlab::ConfigHelper
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
include Gitlab::SQL::Pattern include Gitlab::SQL::Pattern
include AfterCommitQueue
include Avatarable include Avatarable
include Referable include Referable
include Sortable include Sortable
...@@ -922,6 +923,7 @@ class User < ActiveRecord::Base ...@@ -922,6 +923,7 @@ class User < ActiveRecord::Base
def post_destroy_hook def post_destroy_hook
log_info("User \"#{name}\" (#{email}) was removed") log_info("User \"#{name}\" (#{email}) was removed")
system_hook_service.execute_hooks_for(self, :destroy) system_hook_service.execute_hooks_for(self, :destroy)
end end
......
...@@ -52,7 +52,7 @@ module Geo ...@@ -52,7 +52,7 @@ module Geo
to_remove to_remove
end end
Sidekiq::Client.push_bulk('class' => Geo::FileRemovalWorker, 'args' => paths_to_remove) Geo::FileRemovalWorker.bulk_perform_async(paths_to_remove)
end end
def mark_for_resync! def mark_for_resync!
......
...@@ -2,7 +2,11 @@ class SystemHooksService ...@@ -2,7 +2,11 @@ class SystemHooksService
prepend EE::SystemHooksService prepend EE::SystemHooksService
def execute_hooks_for(model, event) def execute_hooks_for(model, event)
execute_hooks(build_event_data(model, event)) data = build_event_data(model, event)
model.run_after_commit_or_now do
SystemHooksService.new.execute_hooks(data)
end
end end
def execute_hooks(data, hooks_scope = :all) def execute_hooks(data, hooks_scope = :all)
......
...@@ -63,7 +63,7 @@ class WebHookService ...@@ -63,7 +63,7 @@ class WebHookService
end end
def async_execute def async_execute
Sidekiq::Client.enqueue(WebHookWorker, hook.id, data, hook_name) WebHookWorker.perform_async(hook.id, data, hook_name)
end end
private private
......
class AdminEmailWorker class AdminEmailWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class AuthorizedProjectsWorker class AuthorizedProjectsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Schedules multiple jobs and waits for them to be completed. # Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list) def self.bulk_perform_and_wait(args_list)
...@@ -17,11 +16,6 @@ class AuthorizedProjectsWorker ...@@ -17,11 +16,6 @@ class AuthorizedProjectsWorker
waiter.wait waiter.wait
end end
# Schedules multiple jobs to run in sidekiq without waiting for completion
def self.bulk_perform_async(args_list)
  job_payload = {
    'class' => self,
    'queue' => sidekiq_options['queue'],
    'args' => args_list
  }
  Sidekiq::Client.push_bulk(job_payload)
end
# Performs multiple jobs directly. Failed jobs will be put into sidekiq so # Performs multiple jobs directly. Failed jobs will be put into sidekiq so
# they can benefit from retries # they can benefit from retries
def self.bulk_perform_inline(args_list) def self.bulk_perform_inline(args_list)
......
class BackgroundMigrationWorker class BackgroundMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Enqueues a number of jobs in bulk.
#
# The `jobs` argument should be an Array of Arrays, each sub-array must be in
# the form:
#
# [migration-class, [arg1, arg2, ...]]
def self.perform_bulk(jobs)
  payload = {
    'class' => self,
    'queue' => sidekiq_options['queue'],
    'args' => jobs
  }
  Sidekiq::Client.push_bulk(payload)
end
# Schedules multiple jobs in bulk, with a delay.
#
def self.perform_bulk_in(delay, jobs)
  current_time = Time.now.to_i
  scheduled_at = current_time + delay.to_i

  # A zero or negative delay would schedule in the past; refuse it up front.
  raise ArgumentError, 'The schedule time must be in the future!' if scheduled_at <= current_time

  Sidekiq::Client.push_bulk('class' => self,
                            'queue' => sidekiq_options['queue'],
                            'args' => jobs,
                            'at' => scheduled_at)
end
# Performs the background migration. # Performs the background migration.
# #
......
class BuildCoverageWorker class BuildCoverageWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(build_id) def perform(build_id)
......
class BuildFinishedWorker class BuildFinishedWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildHooksWorker class BuildHooksWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks enqueue_in group: :hooks
......
class BuildQueueWorker class BuildQueueWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildSuccessWorker class BuildSuccessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class BuildTraceSectionsWorker class BuildTraceSectionsWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(build_id) def perform(build_id)
......
class ClusterInstallAppWorker class ClusterInstallAppWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
......
class ClusterProvisionWorker class ClusterProvisionWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
def perform(cluster_id) def perform(cluster_id)
......
class ClusterWaitForAppInstallationWorker class ClusterWaitForAppInstallationWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
include ClusterApplications include ClusterApplications
......
Sidekiq::Worker.extend ActiveSupport::Concern

# Base concern for application Sidekiq workers. Mixes in Sidekiq::Worker and
# sets each worker's queue to a name derived from the worker's class name.
module ApplicationWorker
  extend ActiveSupport::Concern

  include Sidekiq::Worker

  included do
    sidekiq_options queue: base_queue_name
  end

  module ClassMethods
    # Default queue name for the worker: the class name with any leading
    # `Gitlab::` namespace and trailing `Worker` suffix removed, snake-cased,
    # with `/` (from nested namespaces) turned into `_`.
    def base_queue_name
      stripped = name.sub(/\AGitlab::/, '').sub(/Worker\z/, '')
      stripped.underscore.tr('/', '_')
    end

    # The queue this worker is currently configured to use, as a String.
    def queue
      get_sidekiq_options['queue'].to_s
    end

    # Schedules multiple jobs at once without waiting for completion.
    def bulk_perform_async(args_list)
      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
    end

    # Schedules multiple jobs at once to run after `delay` seconds.
    def bulk_perform_in(delay, args_list)
      current_time = Time.now.to_i
      scheduled_at = current_time + delay.to_i

      raise ArgumentError, 'The schedule time must be in the future!' if scheduled_at <= current_time

      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => scheduled_at)
    end
  end
end
# Concern that sets the queue of a Sidekiq worker based on the worker's class
# name/namespace.
module DedicatedSidekiqQueue
  extend ActiveSupport::Concern

  included do
    # e.g. Foo::BarWorker ends up on the "foo_bar" queue.
    queue_name = name.sub(/Worker\z/, '').underscore.tr('/', '_')
    sidekiq_options queue: queue_name
  end
end
...@@ -8,7 +8,7 @@ module Gitlab ...@@ -8,7 +8,7 @@ module Gitlab
extend ActiveSupport::Concern extend ActiveSupport::Concern
included do included do
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include ReschedulingMethods include ReschedulingMethods
include NotifyUponDeath include NotifyUponDeath
......
class CreateGpgSignatureWorker class CreateGpgSignatureWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(commit_sha, project_id) def perform(commit_sha, project_id)
project = Project.find_by(id: project_id) project = Project.find_by(id: project_id)
......
class CreatePipelineWorker class CreatePipelineWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :creation enqueue_in group: :creation
......
class DeleteMergedBranchesWorker class DeleteMergedBranchesWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, user_id) def perform(project_id, user_id)
begin begin
......
class DeleteUserWorker class DeleteUserWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(current_user_id, delete_user_id, options = {}) def perform(current_user_id, delete_user_id, options = {})
delete_user = User.find(delete_user_id) delete_user = User.find(delete_user_id)
......
class EmailReceiverWorker class EmailReceiverWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(raw) def perform(raw)
return unless Gitlab::IncomingEmail.enabled? return unless Gitlab::IncomingEmail.enabled?
......
class EmailsOnPushWorker class EmailsOnPushWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
attr_reader :email, :skip_premailer attr_reader :email, :skip_premailer
......
class ExpireBuildArtifactsWorker class ExpireBuildArtifactsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
...@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker ...@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker
build_ids = Ci::Build.with_expired_artifacts.pluck(:id) build_ids = Ci::Build.with_expired_artifacts.pluck(:id)
build_ids = build_ids.map { |build_id| [build_id] } build_ids = build_ids.map { |build_id| [build_id] }
Sidekiq::Client.push_bulk('class' => ExpireBuildInstanceArtifactsWorker, 'args' => build_ids ) ExpireBuildInstanceArtifactsWorker.bulk_perform_async(build_ids)
end end
end end
class ExpireBuildInstanceArtifactsWorker class ExpireBuildInstanceArtifactsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(build_id) def perform(build_id)
build = Ci::Build build = Ci::Build
......
class ExpireJobCacheWorker class ExpireJobCacheWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache enqueue_in group: :cache
......
class ExpirePipelineCacheWorker class ExpirePipelineCacheWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :cache enqueue_in group: :cache
......
module Geo module Geo
class BaseSchedulerWorker class BaseSchedulerWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
......
module Geo module Geo
class FileDownloadWorker class FileDownloadWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :geo_file_download, retry: 3, dead: false
sidekiq_options retry: 3, dead: false
def perform(object_type, object_id) def perform(object_type, object_id)
Geo::FileDownloadService.new(object_type.to_sym, object_id).execute Geo::FileDownloadService.new(object_type.to_sym, object_id).execute
......
module Geo module Geo
class FileRemovalWorker class FileRemovalWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::Geo::LogHelpers include Gitlab::Geo::LogHelpers
include GeoQueue
sidekiq_options queue: :geo
def perform(file_path) def perform(file_path)
remove_file!(file_path) remove_file!(file_path)
......
module Geo module Geo
class HashedStorageAttachmentsMigrationWorker class HashedStorageAttachmentsMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
def perform(project_id, old_attachments_path, new_attachments_path) def perform(project_id, old_attachments_path, new_attachments_path)
......
module Geo module Geo
class HashedStorageMigrationWorker class HashedStorageMigrationWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
def perform(project_id, old_disk_path, new_disk_path, old_storage_version) def perform(project_id, old_disk_path, new_disk_path, old_storage_version)
......
module Geo module Geo
class MetricsUpdateWorker class MetricsUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
include CronjobQueue include CronjobQueue
......
module Geo module Geo
class ProjectSyncWorker class ProjectSyncWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :geo_project_sync, retry: 3, dead: false sidekiq_options retry: 3, dead: false
sidekiq_retry_in { |count| 30 * count } sidekiq_retry_in { |count| 30 * count }
......
module Geo module Geo
class PruneEventLogWorker class PruneEventLogWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers include ::Gitlab::Geo::LogHelpers
......
module Geo module Geo
class RenameRepositoryWorker class RenameRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
def perform(project_id, old_disk_path, new_disk_path) def perform(project_id, old_disk_path, new_disk_path)
......
module Geo module Geo
class RepositoriesCleanUpWorker class RepositoriesCleanUpWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
......
class GeoRepositoryDestroyWorker class GeoRepositoryDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include GeoQueue include GeoQueue
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
......
class GitGarbageCollectWorker class GitGarbageCollectWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
sidekiq_options retry: false sidekiq_options retry: false
......
...@@ -7,7 +7,7 @@ module Gitlab ...@@ -7,7 +7,7 @@ module Gitlab
# been completed this worker will advance the import process to the next # been completed this worker will advance the import process to the next
# stage. # stage.
class AdvanceStageWorker class AdvanceStageWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: 'github_importer_advance_stage', dead: false sidekiq_options queue: 'github_importer_advance_stage', dead: false
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module Gitlab module Gitlab
module GithubImport module GithubImport
class RefreshImportJidWorker class RefreshImportJidWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
# The interval to schedule new instances of this job at. # The interval to schedule new instances of this job at.
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class FinishImportWorker class FinishImportWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportBaseDataWorker class ImportBaseDataWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportIssuesAndDiffNotesWorker class ImportIssuesAndDiffNotesWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportNotesWorker class ImportNotesWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportPullRequestsWorker class ImportPullRequestsWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
...@@ -4,7 +4,7 @@ module Gitlab ...@@ -4,7 +4,7 @@ module Gitlab
module GithubImport module GithubImport
module Stage module Stage
class ImportRepositoryWorker class ImportRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include GithubImport::Queue include GithubImport::Queue
include StageMethods include StageMethods
......
class GitlabShellWorker class GitlabShellWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
def perform(action, *arg) def perform(action, *arg)
gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend
......
class GitlabUsagePingWorker class GitlabUsagePingWorker
LEASE_TIMEOUT = 86400 LEASE_TIMEOUT = 86400
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class GroupDestroyWorker class GroupDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def perform(group_id, user_id) def perform(group_id, user_id)
......
class ImportExportProjectCleanupWorker class ImportExportProjectCleanupWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class InvalidGpgSignatureUpdateWorker class InvalidGpgSignatureUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(gpg_key_id) def perform(gpg_key_id)
gpg_key = GpgKey.find_by(id: gpg_key_id) gpg_key = GpgKey.find_by(id: gpg_key_id)
......
...@@ -2,8 +2,7 @@ require 'json' ...@@ -2,8 +2,7 @@ require 'json'
require 'socket' require 'socket'
class IrkerWorker class IrkerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, chans, colors, push_data, settings) def perform(project_id, chans, colors, push_data, settings)
project = Project.find(project_id) project = Project.find(project_id)
......
class MergeWorker class MergeWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(merge_request_id, current_user_id, params) def perform(merge_request_id, current_user_id, params)
params = params.with_indifferent_access params = params.with_indifferent_access
......
...@@ -5,14 +5,9 @@ ...@@ -5,14 +5,9 @@
# The worker will reject doing anything for projects that *do* have a # The worker will reject doing anything for projects that *do* have a
# namespace. For those use ProjectDestroyWorker instead. # namespace. For those use ProjectDestroyWorker instead.
class NamespacelessProjectDestroyWorker class NamespacelessProjectDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
# Schedules multiple destroy jobs at once without waiting for completion.
def self.bulk_perform_async(args_list)
  payload = {
    'class' => self,
    'queue' => sidekiq_options['queue'],
    'args' => args_list
  }
  Sidekiq::Client.push_bulk(payload)
end
def perform(project_id) def perform(project_id)
begin begin
project = Project.unscoped.find(project_id) project = Project.unscoped.find(project_id)
......
class NewIssueWorker class NewIssueWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include NewIssuable include NewIssuable
def perform(issue_id, user_id) def perform(issue_id, user_id)
......
class NewMergeRequestWorker class NewMergeRequestWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include NewIssuable include NewIssuable
def perform(merge_request_id, user_id) def perform(merge_request_id, user_id)
......
class NewNoteWorker class NewNoteWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# Keep extra parameter to preserve backwards compatibility with # Keep extra parameter to preserve backwards compatibility with
# old `NewNoteWorker` jobs (can remove later) # old `NewNoteWorker` jobs (can remove later)
......
class PagesWorker class PagesWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :pages, retry: false sidekiq_options queue: :pages, retry: false
......
class PipelineHooksWorker class PipelineHooksWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :hooks enqueue_in group: :hooks
......
class PipelineMetricsWorker class PipelineMetricsWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(pipeline_id) def perform(pipeline_id)
......
class PipelineNotificationWorker class PipelineNotificationWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
def perform(pipeline_id, recipients = nil) def perform(pipeline_id, recipients = nil)
......
class PipelineProcessWorker class PipelineProcessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PipelineScheduleWorker class PipelineScheduleWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class PipelineSuccessWorker class PipelineSuccessWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PipelineUpdateWorker class PipelineUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class PostReceive class PostReceive
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
prepend EE::PostReceive prepend EE::PostReceive
def perform(gl_repository, identifier, changes) def perform(gl_repository, identifier, changes)
......
...@@ -5,8 +5,7 @@ ...@@ -5,8 +5,7 @@
# Consider using an extra worker if you need to add any extra (and potentially # Consider using an extra worker if you need to add any extra (and potentially
# slow) processing of commits. # slow) processing of commits.
class ProcessCommitWorker class ProcessCommitWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
# project_id - The ID of the project this commit belongs to. # project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit. # user_id - The ID of the user that pushed the commit.
......
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class ProjectCacheWorker class ProjectCacheWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
prepend EE::Workers::ProjectCacheWorker prepend EE::Workers::ProjectCacheWorker
LEASE_TIMEOUT = 15.minutes.to_i LEASE_TIMEOUT = 15.minutes.to_i
......
class ProjectDestroyWorker class ProjectDestroyWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
def perform(project_id, user_id, params) def perform(project_id, user_id, params)
......
class ProjectExportWorker class ProjectExportWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
sidekiq_options retry: 3 sidekiq_options retry: 3
......
class ProjectMigrateHashedStorageWorker class ProjectMigrateHashedStorageWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 30.seconds.to_i LEASE_TIMEOUT = 30.seconds.to_i
......
class ProjectServiceWorker class ProjectServiceWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
sidekiq_options dead: false sidekiq_options dead: false
......
# Worker for updating any project specific caches. # Worker for updating any project specific caches.
class PropagateServiceTemplateWorker class PropagateServiceTemplateWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LEASE_TIMEOUT = 4.hours.to_i LEASE_TIMEOUT = 4.hours.to_i
......
class PruneOldEventsWorker class PruneOldEventsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ReactiveCachingWorker class ReactiveCachingWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(class_name, id, *args) def perform(class_name, id, *args)
klass = begin klass = begin
......
class RemoveExpiredGroupLinksWorker class RemoveExpiredGroupLinksWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RemoveExpiredMembersWorker class RemoveExpiredMembersWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RemoveOldWebHookLogsWorker class RemoveOldWebHookLogsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
WEB_HOOK_LOG_LIFETIME = 2.days WEB_HOOK_LOG_LIFETIME = 2.days
......
class RemoveUnreferencedLfsObjectsWorker class RemoveUnreferencedLfsObjectsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class RepositoryArchiveCacheWorker class RepositoryArchiveCacheWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
module RepositoryCheck module RepositoryCheck
class BatchWorker class BatchWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
RUN_TIME = 3600 RUN_TIME = 3600
......
module RepositoryCheck module RepositoryCheck
class ClearWorker class ClearWorker
include Sidekiq::Worker include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
def perform def perform
......
module RepositoryCheck module RepositoryCheck
class SingleRepositoryWorker class SingleRepositoryWorker
include Sidekiq::Worker include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
def perform(project_id) def perform(project_id)
......
class RepositoryForkWorker class RepositoryForkWorker
ForkError = Class.new(StandardError) ForkError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
include ProjectStartImport include ProjectStartImport
sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
......
class RepositoryImportWorker class RepositoryImportWorker
ImportError = Class.new(StandardError) ImportError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include ExceptionBacktrace include ExceptionBacktrace
include ProjectStartImport include ProjectStartImport
......
class RequestsProfilesWorker class RequestsProfilesWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ScheduleUpdateUserActivityWorker class ScheduleUpdateUserActivityWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform(batch_size = 500) def perform(batch_size = 500)
......
class StageUpdateWorker class StageUpdateWorker
include Sidekiq::Worker include ApplicationWorker
include PipelineQueue include PipelineQueue
enqueue_in group: :processing enqueue_in group: :processing
......
class StorageMigratorWorker class StorageMigratorWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
BATCH_SIZE = 100 BATCH_SIZE = 100
......
class StuckCiJobsWorker class StuckCiJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze
......
class StuckImportJobsWorker class StuckImportJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
IMPORT_JOBS_EXPIRATION = 15.hours.to_i IMPORT_JOBS_EXPIRATION = 15.hours.to_i
......
class StuckMergeJobsWorker class StuckMergeJobsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class SystemHookPushWorker class SystemHookPushWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(push_data, hook_id) def perform(push_data, hook_id)
SystemHooksService.new.execute_hooks(push_data, hook_id) SystemHooksService.new.execute_hooks(push_data, hook_id)
......
class TrendingProjectsWorker class TrendingProjectsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class UpdateMergeRequestsWorker class UpdateMergeRequestsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
LOG_TIME_THRESHOLD = 90 # seconds LOG_TIME_THRESHOLD = 90 # seconds
......
class UpdateUserActivityWorker class UpdateUserActivityWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(pairs) def perform(pairs)
pairs = cast_data(pairs) pairs = cast_data(pairs)
......
class UploadChecksumWorker class UploadChecksumWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(upload_id) def perform(upload_id)
upload = Upload.find(upload_id) upload = Upload.find(upload_id)
......
class WaitForClusterCreationWorker class WaitForClusterCreationWorker
include Sidekiq::Worker include ApplicationWorker
include ClusterQueue include ClusterQueue
def perform(cluster_id) def perform(cluster_id)
......
class WebHookWorker class WebHookWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
sidekiq_options retry: 4, dead: false sidekiq_options retry: 4, dead: false
......
...@@ -13,20 +13,19 @@ module Sidekiq ...@@ -13,20 +13,19 @@ module Sidekiq
module ClassMethods module ClassMethods
module NoSchedulingFromTransactions module NoSchedulingFromTransactions
NESTING = ::Rails.env.test? ? 1 : 0
%i(perform_async perform_at perform_in).each do |name| %i(perform_async perform_at perform_in).each do |name|
define_method(name) do |*args| define_method(name) do |*args|
return super(*args) if Sidekiq::Worker.skip_transaction_check if !Sidekiq::Worker.skip_transaction_check && AfterCommitQueue.inside_transaction?
return super(*args) unless ActiveRecord::Base.connection.open_transactions > NESTING raise <<-MSG.strip_heredoc
`#{self}.#{name}` cannot be called inside a transaction as this can lead to
race conditions when the worker runs before the transaction is committed and
tries to access a model that has not been saved yet.
raise <<-MSG.strip_heredoc Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
`#{self}.#{name}` cannot be called inside a transaction as this can lead to MSG
race conditions when the worker runs before the transaction is committed and end
tries to access a model that has not been saved yet.
Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead. super(*args)
MSG
end end
end end
end end
......
...@@ -76,12 +76,14 @@ end ...@@ -76,12 +76,14 @@ end
# The Sidekiq client API always adds the queue to the Sidekiq queue # The Sidekiq client API always adds the queue to the Sidekiq queue
# list, but mail_room and gitlab-shell do not. This is only necessary # list, but mail_room and gitlab-shell do not. This is only necessary
# for monitoring. # for monitoring.
queues = Gitlab::SidekiqConfig.queues
begin begin
queues = Gitlab::SidekiqConfig.worker_queues
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.pipelined do conn.pipelined do
queues.each { |queue| conn.sadd('queues', queue) } queues.each do |queue|
conn.sadd('queues', queue)
end
end end
end end
rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED rescue Redis::BaseError, SocketError, Errno::ENOENT, Errno::EADDRNOTAVAIL, Errno::EAFNOSUPPORT, Errno::ECONNRESET, Errno::ECONNREFUSED
......
...@@ -102,6 +102,6 @@ class UpdateAuthorizedKeysFile < ActiveRecord::Migration ...@@ -102,6 +102,6 @@ class UpdateAuthorizedKeysFile < ActiveRecord::Migration
def update_authorized_keys_file_since(cutoff_datetime) def update_authorized_keys_file_since(cutoff_datetime)
job = [['UpdateAuthorizedKeysFileSince', [cutoff_datetime]]] job = [['UpdateAuthorizedKeysFileSince', [cutoff_datetime]]]
BackgroundMigrationWorker.perform_bulk(job) BackgroundMigrationWorker.bulk_perform_async(job)
end end
end end
...@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration ...@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration
# We push multiple jobs at a time to reduce the time spent in # We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we # Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range. # don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear jobs.clear
end end
jobs << ['MigrateEventsToPushEventPayloads', [min, max]] jobs << ['MigrateEventsToPushEventPayloads', [min, max]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty? BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end end
def down def down
......
...@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration ...@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration
[MIGRATION, [id]] [MIGRATION, [id]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
end end
end end
......
...@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a ...@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a
``` ```
Usually it's better to enqueue jobs in bulk, for this you can use Usually it's better to enqueue jobs in bulk, for this you can use
`BackgroundMigrationWorker.perform_bulk`: `BackgroundMigrationWorker.bulk_perform_async`:
```ruby ```ruby
BackgroundMigrationWorker.perform_bulk( BackgroundMigrationWorker.bulk_perform_async(
[['BackgroundMigrationClassName', [1]], [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]] ['BackgroundMigrationClassName', [2]]]
) )
...@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with ...@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with
cascading deletes. cascading deletes.
If you would like to schedule jobs in bulk with a delay, you can use If you would like to schedule jobs in bulk with a delay, you can use
`BackgroundMigrationWorker.perform_bulk_in`: `BackgroundMigrationWorker.bulk_perform_in`:
```ruby ```ruby
jobs = [['BackgroundMigrationClassName', [1]], jobs = [['BackgroundMigrationClassName', [1]],
['BackgroundMigrationClassName', [2]]] ['BackgroundMigrationClassName', [2]]]
BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs) BackgroundMigrationWorker.bulk_perform_in(5.minutes, jobs)
``` ```
## Cleaning Up ## Cleaning Up
...@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration ...@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration
['ExtractServicesUrl', [id]] ['ExtractServicesUrl', [id]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
end end
end end
......
...@@ -3,6 +3,12 @@ ...@@ -3,6 +3,12 @@
This document outlines various guidelines that should be followed when adding or This document outlines various guidelines that should be followed when adding or
modifying Sidekiq workers. modifying Sidekiq workers.
## ApplicationWorker
All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`,
which adds some convenience methods and automatically sets the queue based on
the worker's name.
## Default Queue ## Default Queue
Use of the "default" queue is not allowed. Every worker should use a queue that Use of the "default" queue is not allowed. Every worker should use a queue that
...@@ -13,19 +19,10 @@ A list of all available queues can be found in `config/sidekiq_queues.yml`. ...@@ -13,19 +19,10 @@ A list of all available queues can be found in `config/sidekiq_queues.yml`.
## Dedicated Queues ## Dedicated Queues
Most workers should use their own queue. To ease this process a worker can Most workers should use their own queue, which is automatically set based on the
include the `DedicatedSidekiqQueue` concern as follows: worker class name. For a worker named `ProcessSomethingWorker`, the queue name
would be `process_something`. If you're not sure what a worker's queue name is,
```ruby you can find it using `SomeWorker.queue`.
class ProcessSomethingWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
```
This will set the queue name based on the class' name, minus the `Worker`
suffix. In the above example this would lead to the queue being
`process_something`.
In some cases multiple workers do use the same queue. For example, the various In some cases multiple workers do use the same queue. For example, the various
workers for updating CI pipelines all use the `pipeline` queue. Adding workers workers for updating CI pipelines all use the `pipeline` queue. Adding workers
...@@ -39,7 +36,7 @@ tests should be placed in `spec/workers`. ...@@ -39,7 +36,7 @@ tests should be placed in `spec/workers`.
## Removing or renaming queues ## Removing or renaming queues
Try to avoid renaming or removing queues in minor and patch releases. Try to avoid renaming or removing queues in minor and patch releases.
During online update instance can have pending jobs and removing the queue can During online update instance can have pending jobs and removing the queue can
lead to those jobs being stuck forever. If you can't write migration for those lead to those jobs being stuck forever. If you can't write migration for those
Sidekiq jobs, please consider doing rename or remove queue in major release only. Sidekiq jobs, please consider doing rename or remove queue in major release only.
\ No newline at end of file
class AdminEmailsWorker class AdminEmailsWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(recipient_id, subject, body) def perform(recipient_id, subject, body)
recipient_list(recipient_id).pluck(:id).uniq.each do |user_id| recipient_list(recipient_id).pluck(:id).uniq.each do |user_id|
......
class ClearSharedRunnersMinutesWorker class ClearSharedRunnersMinutesWorker
LEASE_TIMEOUT = 3600 LEASE_TIMEOUT = 3600
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ElasticBatchProjectIndexerWorker class ElasticBatchProjectIndexerWorker
include Sidekiq::Worker include ApplicationWorker
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
# Batch indexing is a generally a onetime option, so give finer control over # Batch indexing is a generally a onetime option, so give finer control over
# queuing and concurrency # queuing and concurrency
include DedicatedSidekiqQueue
# This worker is long-running, but idempotent, so retry many times if # This worker is long-running, but idempotent, so retry many times if
# necessary # necessary
......
class ElasticCommitIndexerWorker class ElasticCommitIndexerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
sidekiq_options retry: 2 sidekiq_options retry: 2
......
class ElasticIndexerWorker class ElasticIndexerWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
include Elasticsearch::Model::Client::ClassMethods include Elasticsearch::Model::Client::ClassMethods
include Gitlab::CurrentSettings include Gitlab::CurrentSettings
......
class ExportCsvWorker class ExportCsvWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(current_user_id, project_id, params) def perform(current_user_id, project_id, params)
@current_user = User.find(current_user_id) @current_user = User.find(current_user_id)
......
class HistoricalDataWorker class HistoricalDataWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class LdapAllGroupsSyncWorker class LdapAllGroupsSyncWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class LdapGroupSyncWorker class LdapGroupSyncWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(group_ids, provider = nil) def perform(group_ids, provider = nil)
return unless Gitlab::LDAP::Config.group_sync_enabled? return unless Gitlab::LDAP::Config.group_sync_enabled?
......
class LdapSyncWorker class LdapSyncWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
def perform def perform
......
class ObjectStorageUploadWorker class ObjectStorageUploadWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(uploader_class_name, subject_class_name, file_field, subject_id) def perform(uploader_class_name, subject_class_name, file_field, subject_id)
uploader_class = uploader_class_name.constantize uploader_class = uploader_class_name.constantize
......
class ProjectUpdateRepositoryStorageWorker class ProjectUpdateRepositoryStorageWorker
include Sidekiq::Worker include ApplicationWorker
include DedicatedSidekiqQueue
def perform(project_id, new_repository_storage_key) def perform(project_id, new_repository_storage_key)
project = Project.find(project_id) project = Project.find(project_id)
......
class RebaseWorker class RebaseWorker
include Sidekiq::Worker include ApplicationWorker
sidekiq_options queue: :merge sidekiq_options queue: :merge
......
class RepositoryUpdateMirrorWorker class RepositoryUpdateMirrorWorker
UpdateError = Class.new(StandardError) UpdateError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
include ProjectStartImport include ProjectStartImport
LEASE_KEY = 'repository_update_mirror_worker_start_scheduler'.freeze LEASE_KEY = 'repository_update_mirror_worker_start_scheduler'.freeze
......
...@@ -2,9 +2,8 @@ class RepositoryUpdateRemoteMirrorWorker ...@@ -2,9 +2,8 @@ class RepositoryUpdateRemoteMirrorWorker
UpdateAlreadyInProgressError = Class.new(StandardError) UpdateAlreadyInProgressError = Class.new(StandardError)
UpdateError = Class.new(StandardError) UpdateError = Class.new(StandardError)
include Sidekiq::Worker include ApplicationWorker
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
sidekiq_options retry: 3, dead: false sidekiq_options retry: 3, dead: false
......
class UpdateAllMirrorsWorker class UpdateAllMirrorsWorker
include Sidekiq::Worker include ApplicationWorker
include CronjobQueue include CronjobQueue
LEASE_TIMEOUT = 5.minutes LEASE_TIMEOUT = 5.minutes
......
...@@ -6,12 +6,34 @@ module AfterCommitQueue ...@@ -6,12 +6,34 @@ module AfterCommitQueue
after_rollback :_clear_after_commit_queue after_rollback :_clear_after_commit_queue
end end
def run_after_commit(method = nil, &block) def run_after_commit(&block)
_after_commit_queue << proc { self.send(method) } if method # rubocop:disable GitlabSecurity/PublicSend
_after_commit_queue << block if block _after_commit_queue << block if block
true
end
def run_after_commit_or_now(&block)
if AfterCommitQueue.inside_transaction?
run_after_commit(&block)
else
instance_eval(&block)
end
true true
end end
def self.open_transactions_baseline
if ::Rails.env.test?
return DatabaseCleaner.connections.count { |conn| conn.strategy.is_a?(DatabaseCleaner::ActiveRecord::Transaction) }
end
0
end
def self.inside_transaction?
ActiveRecord::Base.connection.open_transactions > open_transactions_baseline
end
protected protected
def _run_after_commit_queue def _run_after_commit_queue
......
...@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created). ...@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created).
# We push multiple jobs at a time to reduce the time spent in # We push multiple jobs at a time to reduce the time spent in
# Sidekiq/Redis operations. We're using this buffer based approach so we # Sidekiq/Redis operations. We're using this buffer based approach so we
# don't need to run additional queries for every range. # don't need to run additional queries for every range.
BackgroundMigrationWorker.perform_bulk(jobs) BackgroundMigrationWorker.bulk_perform_async(jobs)
jobs.clear jobs.clear
end end
jobs << [job_class_name, [start_id, end_id]] jobs << [job_class_name, [start_id, end_id]]
end end
BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty? BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
end end
# Queues background migration jobs for an entire table, batched by ID range. # Queues background migration jobs for an entire table, batched by ID range.
......
...@@ -62,26 +62,32 @@ module Gitlab ...@@ -62,26 +62,32 @@ module Gitlab
# directory - The directory of the Rails application. # directory - The directory of the Rails application.
# #
# Returns an Array containing the PIDs of the started processes. # Returns an Array containing the PIDs of the started processes.
def self.start(queues, env, directory = Dir.pwd) def self.start(queues, env, directory = Dir.pwd, dryrun: false)
queues.map { |pair| start_sidekiq(pair, env, directory) } queues.map { |pair| start_sidekiq(pair, env, directory, dryrun: dryrun) }
end end
# Starts a Sidekiq process that processes _only_ the given queues. # Starts a Sidekiq process that processes _only_ the given queues.
# #
# Returns the PID of the started process. # Returns the PID of the started process.
def self.start_sidekiq(queues, env, directory = Dir.pwd) def self.start_sidekiq(queues, env, directory = Dir.pwd, dryrun: false)
switches = queues.map { |q| "-q#{q},1" } cmd = %w[bundle exec sidekiq]
cmd << "-c #{queues.length + 1}"
cmd << "-e#{env}"
cmd << "-gqueues: #{queues.join(', ')}"
cmd << "-r#{directory}"
queues.each do |q|
cmd << "-q#{q},1"
end
if dryrun
puts "Sidekiq command: #{cmd}" # rubocop:disable Rails/Output
return
end
pid = Process.spawn( pid = Process.spawn(
{ 'ENABLE_SIDEKIQ_CLUSTER' => '1' }, { 'ENABLE_SIDEKIQ_CLUSTER' => '1' },
'bundle', *cmd,
'exec',
'sidekiq',
"-c #{queues.length + 1}",
"-e#{env}",
"-gqueues: #{queues.join(', ')}",
"-r#{directory}",
*switches,
err: $stderr, err: $stderr,
out: $stdout out: $stdout
) )
......
...@@ -15,6 +15,7 @@ module Gitlab ...@@ -15,6 +15,7 @@ module Gitlab
@processes = [] @processes = []
@logger = Logger.new(log_output) @logger = Logger.new(log_output)
@rails_path = Dir.pwd @rails_path = Dir.pwd
@dryrun = false
# Use a log format similar to Sidekiq to make parsing/grepping easier. # Use a log format similar to Sidekiq to make parsing/grepping easier.
@logger.formatter = proc do |level, date, program, message| @logger.formatter = proc do |level, date, program, message|
...@@ -30,18 +31,18 @@ module Gitlab ...@@ -30,18 +31,18 @@ module Gitlab
option_parser.parse!(argv) option_parser.parse!(argv)
parsed_queues = SidekiqCluster.parse_queues(argv) queue_groups = SidekiqCluster.parse_queues(argv)
queues = if @negate_queues
if @negate_queues all_queues = SidekiqConfig.config_queues(@rails_path)
parsed_queues.map { |queues| SidekiqConfig.queues(@rails_path, except: queues) } queue_groups.map! { |queues| all_queues - queues }
else end
parsed_queues
end @logger.info("Starting cluster with #{queue_groups.length} processes")
@logger.info("Starting cluster with #{queues.length} processes") @processes = SidekiqCluster.start(queue_groups, @environment, @rails_path, dryrun: @dryrun)
@processes = SidekiqCluster.start(queues, @environment, @rails_path) return if @dryrun
write_pid write_pid
trap_signals trap_signals
...@@ -107,6 +108,10 @@ module Gitlab ...@@ -107,6 +108,10 @@ module Gitlab
opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int| opt.on('-i', '--interval INT', 'The number of seconds to wait between worker checks') do |int|
@interval = int.to_i @interval = int.to_i
end end
opt.on('-d', '--dryrun', 'Print commands that would be run without this flag, and quit') do |int|
@dryrun = true
end
end end
end end
end end
......
...@@ -2,13 +2,51 @@ require 'yaml' ...@@ -2,13 +2,51 @@ require 'yaml'
module Gitlab module Gitlab
module SidekiqConfig module SidekiqConfig
def self.queues(rails_path = Rails.root.to_s, except: []) def self.redis_queues
queues_file_path = File.join(rails_path, 'config', 'sidekiq_queues.yml') @redis_queues ||= Sidekiq::Queue.all.map(&:name)
end
# This method is called by `bin/sidekiq-cluster` in EE, which runs outside
# of bundler/Rails context, so we cannot use any gem or Rails methods.
def self.config_queues(rails_path = Rails.root.to_s)
@config_queues ||= begin
config = YAML.load_file(File.join(rails_path, 'config', 'sidekiq_queues.yml'))
config[:queues].map(&:first)
end
end
def self.cron_workers
@cron_workers ||= Settings.cron_jobs.map { |job_name, options| options['job_class'].constantize }
end
def self.workers
@workers ||=
find_workers(Rails.root.join('app', 'workers')) +
find_workers(Rails.root.join('ee', 'app', 'workers'))
end
def self.default_queues
[ActionMailer::DeliveryJob.queue_name, 'default']
end
def self.worker_queues
@worker_queues ||= (workers.map(&:queue) + default_queues).uniq
end
def self.find_workers(root)
concerns = root.join('concerns').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns) }
workers.map! do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
@queues_file = {} ns.camelize.constantize
@queues_file[queues_file_path] ||= YAML.load_file(queues_file_path) end
@queues_file[queues_file_path].fetch(:queues).map { |queue, _| queue } - except # Skip concerns
workers.select { |w| w < Sidekiq::Worker }
end end
end end
end end
require 'spec_helper'
describe 'bin/sidekiq-cluster' do
it 'runs successfully', :aggregate_failures do
cmd = %w[bin/sidekiq-cluster --dryrun --negate cronjob]
output, status = Gitlab::Popen.popen(cmd, Rails.root.to_s)
expect(status).to be(0)
expect(output).to include('"bundle", "exec", "sidekiq"')
expect(output).not_to include('-qcronjob,1')
expect(output).to include('-qdefault,1')
end
end
...@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do
end end
it 'queues jobs in groups of buffer size 1' do it 'queues jobs in groups of buffer size 1' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]]) expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]]) expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end end
...@@ -960,8 +960,8 @@ describe Gitlab::Database::MigrationHelpers do ...@@ -960,8 +960,8 @@ describe Gitlab::Database::MigrationHelpers do
end end
it 'queues jobs in bulk all at once (big buffer size)' do it 'queues jobs in bulk all at once (big buffer size)' do
expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]], expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
['FooJob', [id3, id3]]]) ['FooJob', [id3, id3]]])
model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2) model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
end end
......
...@@ -22,9 +22,9 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -22,9 +22,9 @@ describe Gitlab::SidekiqCluster::CLI do
context 'with --negate flag' do context 'with --negate flag' do
it 'starts Sidekiq workers for all queues on sidekiq_queues.yml except the ones on argv' do it 'starts Sidekiq workers for all queues on sidekiq_queues.yml except the ones on argv' do
expect(Gitlab::SidekiqConfig).to receive(:queues).and_return(['baz']) expect(Gitlab::SidekiqConfig).to receive(:config_queues).and_return(['baz'])
expect(Gitlab::SidekiqCluster).to receive(:start) expect(Gitlab::SidekiqCluster).to receive(:start)
.with([['baz']], 'test', Dir.pwd) .with([['baz']], 'test', Dir.pwd, dryrun: false)
.and_return([]) .and_return([])
expect(cli).to receive(:write_pid) expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals) expect(cli).to receive(:trap_signals)
......
...@@ -58,10 +58,10 @@ describe Gitlab::SidekiqCluster do ...@@ -58,10 +58,10 @@ describe Gitlab::SidekiqCluster do
describe '.start' do describe '.start' do
it 'starts Sidekiq with the given queues and environment' do it 'starts Sidekiq with the given queues and environment' do
expect(described_class).to receive(:start_sidekiq) expect(described_class).to receive(:start_sidekiq)
.ordered.with(%w(foo), :production, 'foo/bar') .ordered.with(%w(foo), :production, 'foo/bar', dryrun: false)
expect(described_class).to receive(:start_sidekiq) expect(described_class).to receive(:start_sidekiq)
.ordered.with(%w(bar baz), :production, 'foo/bar') .ordered.with(%w(bar baz), :production, 'foo/bar', dryrun: false)
described_class.start([%w(foo), %w(bar baz)], :production, 'foo/bar') described_class.start([%w(foo), %w(bar baz)], :production, 'foo/bar')
end end
......
require 'rails_helper' require 'rails_helper'
describe Gitlab::SidekiqConfig do describe Gitlab::SidekiqConfig do
describe '.queues' do describe '.workers' do
let(:queues_file_path) { Rails.root.join('config', 'sidekiq_queues.yml') } it 'includes all workers' do
workers = described_class.workers
context 'without except argument' do expect(workers).to include(PostReceive)
it 'returns all queues defined on config/sidekiq_queues.yml file' do expect(workers).to include(MergeWorker)
expected_queues = YAML.load_file(queues_file_path)[:queues].map { |queue, _| queue } end
it 'includes EE workers' do
workers = described_class.workers
expect(workers).to include(RepositoryUpdateMirrorWorker)
expect(workers).to include(LdapGroupSyncWorker)
end
end
describe '.worker_queues' do
it 'includes all queues' do
queues = described_class.worker_queues
expect(described_class.queues).to eq(expected_queues) expect(queues).to include('post_receive')
end expect(queues).to include('merge')
expect(queues).to include('cronjob')
expect(queues).to include('mailers')
expect(queues).to include('default')
end end
context 'with except argument' do it 'includes EE queues' do
it 'returns queues on config/sidekiq_queues.yml filtering out excluded ones' do queues = described_class.worker_queues
expected_queues =
YAML.load_file(queues_file_path)[:queues].map { |queue, _| queue } - ['webhook']
expect(described_class.queues(except: ['webhook'])).to eq(expected_queues) expect(queues).to include('repository_update_mirror')
end expect(queues).to include('ldap_group_sync')
end end
end end
end end
...@@ -146,7 +146,7 @@ describe WebHookService do ...@@ -146,7 +146,7 @@ describe WebHookService do
let(:system_hook) { create(:system_hook) } let(:system_hook) { create(:system_hook) }
it 'enqueue WebHookWorker' do it 'enqueue WebHookWorker' do
expect(Sidekiq::Client).to receive(:enqueue).with(WebHookWorker, project_hook.id, data, 'push_hooks') expect(WebHookWorker).to receive(:perform_async).with(project_hook.id, data, 'push_hooks')
described_class.new(project_hook, data, 'push_hooks').async_execute described_class.new(project_hook, data, 'push_hooks').async_execute
end end
......
...@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do ...@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do
args_list = build_args_list(project.owner.id) args_list = build_args_list(project.owner.id)
push_bulk_args = { push_bulk_args = {
'class' => described_class, 'class' => described_class,
'queue' => described_class.sidekiq_options['queue'],
'args' => args_list 'args' => args_list
} }
......
...@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do ...@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do
described_class.new.perform('Foo', [10, 20]) described_class.new.perform('Foo', [10, 20])
end end
end end
describe '.perform_bulk' do
it 'enqueues background migrations in bulk' do
Sidekiq::Testing.fake! do
described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('enqueued_at'))
end
end
end
describe '.perform_bulk_in' do
context 'when delay is valid' do
it 'correctly schedules background migrations' do
Sidekiq::Testing.fake! do
described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(described_class.jobs.count).to eq 2
expect(described_class.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { described_class.perform_bulk_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end end
require 'spec_helper'
describe ApplicationWorker do
let(:worker) do
Class.new do
def self.name
'Gitlab::Foo::Bar::DummyWorker'
end
include ApplicationWorker
end
end
describe 'Sidekiq options' do
it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end
end
describe '.queue' do
it 'returns the queue name' do
worker.sidekiq_options queue: :some_queue
expect(worker.queue).to eq('some_queue')
end
end
describe '.bulk_perform_async' do
it 'enqueues jobs in bulk' do
Sidekiq::Testing.fake! do
worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('enqueued_at'))
end
end
end
describe '.bulk_perform_in' do
context 'when delay is valid' do
it 'correctly schedules jobs' do
Sidekiq::Testing.fake! do
worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
expect(worker.jobs.count).to eq 2
expect(worker.jobs).to all(include('at'))
end
end
end
context 'when delay is invalid' do
it 'raises an ArgumentError exception' do
expect { worker.bulk_perform_in(-60, [['Foo']]) }
.to raise_error(ArgumentError)
end
end
end
end
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe ClusterQueue do describe ClusterQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include ClusterQueue include ClusterQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe CronjobQueue do describe CronjobQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include CronjobQueue include CronjobQueue
end end
end end
......
require 'spec_helper'
describe DedicatedSidekiqQueue do
let(:worker) do
Class.new do
def self.name
'Foo::Bar::DummyWorker'
end
include Sidekiq::Worker
include DedicatedSidekiqQueue
end
end
describe 'queue names' do
it 'sets the queue name based on the class name' do
expect(worker.sidekiq_options['queue']).to eq('foo_bar_dummy')
end
end
end
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe GeoQueue do describe GeoQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include GeoQueue include GeoQueue
end end
end end
......
...@@ -3,6 +3,10 @@ require 'spec_helper' ...@@ -3,6 +3,10 @@ require 'spec_helper'
describe Gitlab::GithubImport::ObjectImporter do describe Gitlab::GithubImport::ObjectImporter do
let(:worker) do let(:worker) do
Class.new do Class.new do
def self.name
'DummyWorker'
end
include(Gitlab::GithubImport::ObjectImporter) include(Gitlab::GithubImport::ObjectImporter)
def counter_name def counter_name
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe Gitlab::GithubImport::Queue do describe Gitlab::GithubImport::Queue do
it 'sets the Sidekiq options for the worker' do it 'sets the Sidekiq options for the worker' do
worker = Class.new do worker = Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include Gitlab::GithubImport::Queue include Gitlab::GithubImport::Queue
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe PipelineQueue do describe PipelineQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include PipelineQueue include PipelineQueue
end end
end end
......
...@@ -3,7 +3,11 @@ require 'spec_helper' ...@@ -3,7 +3,11 @@ require 'spec_helper'
describe RepositoryCheckQueue do describe RepositoryCheckQueue do
let(:worker) do let(:worker) do
Class.new do Class.new do
include Sidekiq::Worker def self.name
'DummyWorker'
end
include ApplicationWorker
include RepositoryCheckQueue include RepositoryCheckQueue
end end
end end
......
require 'spec_helper' require 'spec_helper'
describe 'Every Sidekiq worker' do describe 'Every Sidekiq worker' do
let(:workers) do it 'includes ApplicationWorker' do
root = Rails.root.join('app', 'workers') expect(Gitlab::SidekiqConfig.workers).to all(include(ApplicationWorker))
concerns = root.join('concerns').to_s
ee_modules = root.join('ee').to_s
workers = Dir[root.join('**', '*.rb')]
.reject { |path| path.start_with?(concerns, ee_modules) }
workers.map do |path|
ns = Pathname.new(path).relative_path_from(root).to_s.gsub('.rb', '')
ns.camelize.constantize
end
end end
it 'does not use the default queue' do it 'does not use the default queue' do
workers.each do |worker| expect(Gitlab::SidekiqConfig.workers.map(&:queue)).not_to include('default')
expect(worker.sidekiq_options['queue'].to_s).not_to eq('default')
end
end end
it 'uses the cronjob queue when the worker runs as a cronjob' do it 'uses the cronjob queue when the worker runs as a cronjob' do
cron_workers = Settings.cron_jobs expect(Gitlab::SidekiqConfig.cron_workers.map(&:queue)).to all(eq('cronjob'))
.map { |job_name, options| options['job_class'].constantize }
.to_set
workers.each do |worker|
next unless cron_workers.include?(worker)
expect(worker.sidekiq_options['queue'].to_s).to eq('cronjob')
end
end end
it 'defines the queue in the Sidekiq configuration file' do it 'defines the queue in the Sidekiq configuration file' do
config = YAML.load_file(Rails.root.join('config', 'sidekiq_queues.yml').to_s) config_queue_names = Gitlab::SidekiqConfig.config_queues.to_set
queue_names = config[:queues].map { |(queue, _)| queue }.to_set
workers.each do |worker| expect(Gitlab::SidekiqConfig.worker_queues).to all(be_in(config_queue_names))
expect(queue_names).to include(worker.sidekiq_options['queue'].to_s)
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment