Commit a5ddd4e6 authored by Sean McGivern's avatar Sean McGivern

Move NotificationService calls to Sidekiq

The NotificationService has to do quite a lot of work to calculate the
recipients for an email. Where possible, we should try to avoid doing this in an
HTTP request, because the mail are sent by Sidekiq anyway, so there's no need to
schedule those emails immediately.

This commit creates a generic Sidekiq worker that uses Global ID to serialise
and deserialise its arguments, then forwards them to the NotificationService.
The NotificationService gains an `#async` method, so you can replace:

    notification_service.new_issue(issue, current_user)

With:

    notification_service.async.new_issue(issue, current_user)

And have everything else work as normal, except that calculating the recipients
will be done by Sidekiq, which will then schedule further Sidekiq jobs to send
each email.
parent 4bd51416
...@@ -26,7 +26,7 @@ module Issues ...@@ -26,7 +26,7 @@ module Issues
issue.update(closed_by: current_user) issue.update(closed_by: current_user)
event_service.close_issue(issue, current_user) event_service.close_issue(issue, current_user)
create_note(issue, commit) if system_note create_note(issue, commit) if system_note
notification_service.close_issue(issue, current_user) if notifications notification_service.async.close_issue(issue, current_user) if notifications
todo_service.close_issue(issue, current_user) todo_service.close_issue(issue, current_user)
execute_hooks(issue, 'close') execute_hooks(issue, 'close')
invalidate_cache_counts(issue, users: issue.assignees) invalidate_cache_counts(issue, users: issue.assignees)
......
...@@ -141,7 +141,7 @@ module Issues ...@@ -141,7 +141,7 @@ module Issues
end end
def notify_participants def notify_participants
notification_service.issue_moved(@old_issue, @new_issue, @current_user) notification_service.async.issue_moved(@old_issue, @new_issue, @current_user)
end end
end end
end end
...@@ -6,7 +6,7 @@ module Issues ...@@ -6,7 +6,7 @@ module Issues
if issue.reopen if issue.reopen
event_service.reopen_issue(issue, current_user) event_service.reopen_issue(issue, current_user)
create_note(issue, 'reopened') create_note(issue, 'reopened')
notification_service.reopen_issue(issue, current_user) notification_service.async.reopen_issue(issue, current_user)
execute_hooks(issue, 'reopen') execute_hooks(issue, 'reopen')
invalidate_cache_counts(issue, users: issue.assignees) invalidate_cache_counts(issue, users: issue.assignees)
issue.update_project_counter_caches issue.update_project_counter_caches
......
...@@ -30,7 +30,7 @@ module Issues ...@@ -30,7 +30,7 @@ module Issues
if issue.assignees != old_assignees if issue.assignees != old_assignees
create_assignee_note(issue, old_assignees) create_assignee_note(issue, old_assignees)
notification_service.reassigned_issue(issue, current_user, old_assignees) notification_service.async.reassigned_issue(issue, current_user, old_assignees)
todo_service.reassigned_issue(issue, current_user, old_assignees) todo_service.reassigned_issue(issue, current_user, old_assignees)
end end
...@@ -41,13 +41,13 @@ module Issues ...@@ -41,13 +41,13 @@ module Issues
added_labels = issue.labels - old_labels added_labels = issue.labels - old_labels
if added_labels.present? if added_labels.present?
notification_service.relabeled_issue(issue, added_labels, current_user) notification_service.async.relabeled_issue(issue, added_labels, current_user)
end end
added_mentions = issue.mentioned_users - old_mentioned_users added_mentions = issue.mentioned_users - old_mentioned_users
if added_mentions.present? if added_mentions.present?
notification_service.new_mentions_in_issue(issue, added_mentions, current_user) notification_service.async.new_mentions_in_issue(issue, added_mentions, current_user)
end end
end end
......
...@@ -10,7 +10,7 @@ module MergeRequests ...@@ -10,7 +10,7 @@ module MergeRequests
if merge_request.close if merge_request.close
create_event(merge_request) create_event(merge_request)
create_note(merge_request) create_note(merge_request)
notification_service.close_mr(merge_request, current_user) notification_service.async.close_mr(merge_request, current_user)
todo_service.close_merge_request(merge_request, current_user) todo_service.close_merge_request(merge_request, current_user)
execute_hooks(merge_request, 'close') execute_hooks(merge_request, 'close')
invalidate_cache_counts(merge_request, users: merge_request.assignees) invalidate_cache_counts(merge_request, users: merge_request.assignees)
......
...@@ -6,7 +6,7 @@ module MergeRequests ...@@ -6,7 +6,7 @@ module MergeRequests
if merge_request.reopen if merge_request.reopen
create_event(merge_request) create_event(merge_request)
create_note(merge_request, 'reopened') create_note(merge_request, 'reopened')
notification_service.reopen_mr(merge_request, current_user) notification_service.async.reopen_mr(merge_request, current_user)
execute_hooks(merge_request, 'reopen') execute_hooks(merge_request, 'reopen')
merge_request.reload_diff(current_user) merge_request.reload_diff(current_user)
merge_request.mark_as_unchecked merge_request.mark_as_unchecked
......
...@@ -4,7 +4,7 @@ module MergeRequests ...@@ -4,7 +4,7 @@ module MergeRequests
return unless merge_request.discussions_resolved? return unless merge_request.discussions_resolved?
SystemNoteService.resolve_all_discussions(merge_request, project, current_user) SystemNoteService.resolve_all_discussions(merge_request, project, current_user)
notification_service.resolve_all_discussions(merge_request, current_user) notification_service.async.resolve_all_discussions(merge_request, current_user)
end end
end end
end end
...@@ -34,6 +34,7 @@ module MergeRequests ...@@ -34,6 +34,7 @@ module MergeRequests
merge_request merge_request
end end
# rubocop:disable Metrics/AbcSize
def handle_changes(merge_request, options) def handle_changes(merge_request, options)
old_associations = options.fetch(:old_associations, {}) old_associations = options.fetch(:old_associations, {})
old_labels = old_associations.fetch(:labels, []) old_labels = old_associations.fetch(:labels, [])
...@@ -57,8 +58,11 @@ module MergeRequests ...@@ -57,8 +58,11 @@ module MergeRequests
end end
if merge_request.previous_changes.include?('assignee_id') if merge_request.previous_changes.include?('assignee_id')
old_assignee_id = merge_request.previous_changes['assignee_id'].first
old_assignee = User.find(old_assignee_id) if old_assignee_id
create_assignee_note(merge_request) create_assignee_note(merge_request)
notification_service.reassigned_merge_request(merge_request, current_user) notification_service.async.reassigned_merge_request(merge_request, current_user, old_assignee)
todo_service.reassigned_merge_request(merge_request, current_user) todo_service.reassigned_merge_request(merge_request, current_user)
end end
...@@ -69,7 +73,7 @@ module MergeRequests ...@@ -69,7 +73,7 @@ module MergeRequests
added_labels = merge_request.labels - old_labels added_labels = merge_request.labels - old_labels
if added_labels.present? if added_labels.present?
notification_service.relabeled_merge_request( notification_service.async.relabeled_merge_request(
merge_request, merge_request,
added_labels, added_labels,
current_user current_user
...@@ -78,13 +82,14 @@ module MergeRequests ...@@ -78,13 +82,14 @@ module MergeRequests
added_mentions = merge_request.mentioned_users - old_mentioned_users added_mentions = merge_request.mentioned_users - old_mentioned_users
if added_mentions.present? if added_mentions.present?
notification_service.new_mentions_in_merge_request( notification_service.async.new_mentions_in_merge_request(
merge_request, merge_request,
added_mentions, added_mentions,
current_user current_user
) )
end end
end end
# rubocop:enable Metrics/AbcSize
def merge_from_quick_action(merge_request) def merge_from_quick_action(merge_request)
last_diff_sha = params.delete(:merge) last_diff_sha = params.delete(:merge)
......
...@@ -7,9 +7,34 @@ ...@@ -7,9 +7,34 @@
# Ex. # Ex.
# NotificationService.new.new_issue(issue, current_user) # NotificationService.new.new_issue(issue, current_user)
# #
# When calculating the recipients of a notification is expensive (for instance,
# in the new issue case), `#async` will make that calculation happen in Sidekiq
# instead:
#
# NotificationService.new.async.new_issue(issue, current_user)
#
class NotificationService class NotificationService
prepend EE::NotificationService prepend EE::NotificationService
class Async
attr_reader :parent
delegate :respond_to_missing, to: :parent
def initialize(parent)
@parent = parent
end
def method_missing(meth, *args)
return super unless parent.respond_to?(meth)
MailScheduler::NotificationServiceWorker.perform_async(meth.to_s, *args)
end
end
def async
@async ||= Async.new(self)
end
# Always notify user about ssh key added # Always notify user about ssh key added
# only if ssh key is not deploy key # only if ssh key is not deploy key
# #
...@@ -145,8 +170,23 @@ class NotificationService ...@@ -145,8 +170,23 @@ class NotificationService
# * merge_request assignee if their notification level is not Disabled # * merge_request assignee if their notification level is not Disabled
# * users with custom level checked with "reassign merge request" # * users with custom level checked with "reassign merge request"
# #
def reassigned_merge_request(merge_request, current_user) def reassigned_merge_request(merge_request, current_user, previous_assignee)
reassign_resource_email(merge_request, current_user, :reassigned_merge_request_email) recipients = NotificationRecipientService.build_recipients(
merge_request,
current_user,
action: "reassign",
previous_assignee: previous_assignee
)
recipients.each do |recipient|
mailer.reassigned_merge_request_email(
recipient.user.id,
merge_request.id,
previous_assignee&.id,
current_user.id,
recipient.reason
).deliver_later
end
end end
# When we add labels to a merge request we should send an email to: # When we add labels to a merge request we should send an email to:
...@@ -444,29 +484,6 @@ class NotificationService ...@@ -444,29 +484,6 @@ class NotificationService
end end
end end
def reassign_resource_email(target, current_user, method)
previous_assignee_id = previous_record(target, 'assignee_id')
previous_assignee = User.find_by(id: previous_assignee_id) if previous_assignee_id
recipients = NotificationRecipientService.build_recipients(
target,
current_user,
action: "reassign",
previous_assignee: previous_assignee
)
recipients.each do |recipient|
mailer.send(
method,
recipient.user.id,
target.id,
previous_assignee_id,
current_user.id,
recipient.reason
).deliver_later
end
end
def relabeled_resource_email(target, labels, current_user, method) def relabeled_resource_email(target, labels, current_user, method)
recipients = labels.flat_map { |l| l.subscribers(target.project) }.uniq recipients = labels.flat_map { |l| l.subscribers(target.project) }.uniq
recipients = notifiable_users( recipients = notifiable_users(
...@@ -518,14 +535,6 @@ class NotificationService ...@@ -518,14 +535,6 @@ class NotificationService
Notify Notify
end end
def previous_record(object, attribute)
return unless object && attribute
if object.previous_changes.include?(attribute)
object.previous_changes[attribute].first
end
end
private private
def recipients_for_pages_domain(domain) def recipients_for_pages_domain(domain)
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
- github_importer:github_import_stage_import_repository - github_importer:github_import_stage_import_repository
- mail_scheduler:mail_scheduler_issue_due - mail_scheduler:mail_scheduler_issue_due
- mail_scheduler:mail_scheduler_notification_service
- object_storage_upload - object_storage_upload
- object_storage:object_storage_background_move - object_storage:object_storage_background_move
......
...@@ -4,4 +4,8 @@ module MailSchedulerQueue ...@@ -4,4 +4,8 @@ module MailSchedulerQueue
included do included do
queue_namespace :mail_scheduler queue_namespace :mail_scheduler
end end
def notification_service
@notification_service ||= NotificationService.new
end
end end
...@@ -4,8 +4,6 @@ module MailScheduler ...@@ -4,8 +4,6 @@ module MailScheduler
include MailSchedulerQueue include MailSchedulerQueue
def perform(project_id) def perform(project_id)
notification_service = NotificationService.new
Issue.opened.due_tomorrow.in_projects(project_id).preload(:project).find_each do |issue| Issue.opened.due_tomorrow.in_projects(project_id).preload(:project).find_each do |issue|
notification_service.issue_due(issue) notification_service.issue_due(issue)
end end
......
require 'active_job/arguments'
module MailScheduler
class NotificationServiceWorker
include ApplicationWorker
include MailSchedulerQueue
def perform(meth, *args)
deserialized_args = ActiveJob::Arguments.deserialize(args)
notification_service.public_send(meth, *deserialized_args) # rubocop:disable GitlabSecurity/PublicSend
rescue ActiveJob::DeserializationError
end
def self.perform_async(*args)
super(*ActiveJob::Arguments.serialize(args))
end
end
end
---
title: Compute notification recipients in background jobs
merge_request:
author:
type: performance
...@@ -96,6 +96,37 @@ describe NotificationService, :mailer do ...@@ -96,6 +96,37 @@ describe NotificationService, :mailer do
it_should_behave_like 'participating by assignee notification' it_should_behave_like 'participating by assignee notification'
end end
describe '#async' do
let(:async) { notification.async }
set(:key) { create(:personal_key) }
it 'returns an Async object with the correct parent' do
expect(async).to be_a(described_class::Async)
expect(async.parent).to eq(notification)
end
context 'when receiving a public method' do
it 'schedules a MailScheduler::NotificationServiceWorker' do
expect(MailScheduler::NotificationServiceWorker)
.to receive(:perform_async).with('new_key', key)
async.new_key(key)
end
end
context 'when receiving a private method' do
it 'raises NoMethodError' do
expect { async.notifiable?(key) }.to raise_error(NoMethodError)
end
end
context 'when recieving a non-existent method' do
it 'raises NoMethodError' do
expect { async.foo(key) }.to raise_error(NoMethodError)
end
end
end
describe 'Keys' do describe 'Keys' do
describe '#new_key' do describe '#new_key' do
let(:key_options) { {} } let(:key_options) { {} }
...@@ -982,6 +1013,8 @@ describe NotificationService, :mailer do ...@@ -982,6 +1013,8 @@ describe NotificationService, :mailer do
let(:merge_request) { create :merge_request, source_project: project, assignee: create(:user), description: 'cc @participant' } let(:merge_request) { create :merge_request, source_project: project, assignee: create(:user), description: 'cc @participant' }
before do before do
project.add_master(merge_request.author)
project.add_master(merge_request.assignee)
build_team(merge_request.target_project) build_team(merge_request.target_project)
add_users_with_subscription(merge_request.target_project, merge_request) add_users_with_subscription(merge_request.target_project, merge_request)
update_custom_notification(:new_merge_request, @u_guest_custom, resource: project) update_custom_notification(:new_merge_request, @u_guest_custom, resource: project)
...@@ -1135,15 +1168,18 @@ describe NotificationService, :mailer do ...@@ -1135,15 +1168,18 @@ describe NotificationService, :mailer do
end end
describe '#reassigned_merge_request' do describe '#reassigned_merge_request' do
let(:current_user) { create(:user) }
before do before do
update_custom_notification(:reassign_merge_request, @u_guest_custom, resource: project) update_custom_notification(:reassign_merge_request, @u_guest_custom, resource: project)
update_custom_notification(:reassign_merge_request, @u_custom_global) update_custom_notification(:reassign_merge_request, @u_custom_global)
end end
it do it do
notification.reassigned_merge_request(merge_request, merge_request.author) notification.reassigned_merge_request(merge_request, current_user, merge_request.author)
should_email(merge_request.assignee) should_email(merge_request.assignee)
should_email(merge_request.author)
should_email(@u_watcher) should_email(@u_watcher)
should_email(@u_participant_mentioned) should_email(@u_participant_mentioned)
should_email(@subscriber) should_email(@subscriber)
...@@ -1158,7 +1194,7 @@ describe NotificationService, :mailer do ...@@ -1158,7 +1194,7 @@ describe NotificationService, :mailer do
end end
it 'adds "assigned" reason for new assignee' do it 'adds "assigned" reason for new assignee' do
notification.reassigned_merge_request(merge_request, merge_request.author) notification.reassigned_merge_request(merge_request, current_user, merge_request.author)
email = find_email_for(merge_request.assignee) email = find_email_for(merge_request.assignee)
...@@ -1168,7 +1204,7 @@ describe NotificationService, :mailer do ...@@ -1168,7 +1204,7 @@ describe NotificationService, :mailer do
it_behaves_like 'participating notifications' do it_behaves_like 'participating notifications' do
let(:participant) { create(:user, username: 'user-participant') } let(:participant) { create(:user, username: 'user-participant') }
let(:issuable) { merge_request } let(:issuable) { merge_request }
let(:notification_trigger) { notification.reassigned_merge_request(merge_request, @u_disabled) } let(:notification_trigger) { notification.reassigned_merge_request(merge_request, current_user, merge_request.author) }
end end
end end
......
...@@ -12,8 +12,8 @@ describe MailScheduler::IssueDueWorker do ...@@ -12,8 +12,8 @@ describe MailScheduler::IssueDueWorker do
create(:issue, :opened, project: project, due_date: 2.days.from_now) # due on another day create(:issue, :opened, project: project, due_date: 2.days.from_now) # due on another day
create(:issue, :opened, due_date: Date.tomorrow) # different project create(:issue, :opened, due_date: Date.tomorrow) # different project
expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue1) expect(worker.notification_service).to receive(:issue_due).with(issue1)
expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue2) expect(worker.notification_service).to receive(:issue_due).with(issue2)
worker.perform(project.id) worker.perform(project.id)
end end
......
require 'spec_helper'
describe MailScheduler::NotificationServiceWorker do
let(:worker) { described_class.new }
let(:method) { 'new_key' }
set(:key) { create(:personal_key) }
def serialize(*args)
ActiveJob::Arguments.serialize(args)
end
describe '#perform' do
it 'deserializes arguments from global IDs' do
expect(worker.notification_service).to receive(method).with(key)
worker.perform(method, *serialize(key))
end
context 'when the arguments cannot be deserialized' do
it 'does nothing' do
expect(worker.notification_service).not_to receive(method)
worker.perform(method, key.to_global_id.to_s.succ)
end
end
context 'when the method is not a public method' do
it 'raises NoMethodError' do
expect { worker.perform('notifiable?', *serialize(key)) }.to raise_error(NoMethodError)
end
end
end
describe '.perform_async' do
it 'serializes arguments as global IDs when scheduling' do
Sidekiq::Testing.fake! do
described_class.perform_async(method, key)
expect(described_class.jobs.count).to eq(1)
expect(described_class.jobs.first).to include('args' => [method, *serialize(key)])
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment