Commit 8814fda9 authored by Douwe Maan's avatar Douwe Maan

Merge branch 'move-notification-service-calls-to-sidekiq' into 'master'

Move NotificationService calls to Sidekiq

See merge request gitlab-org/gitlab-ee!5463
parents 15396b1a 846d9a39
......@@ -26,7 +26,7 @@ module Issues
issue.update(closed_by: current_user)
event_service.close_issue(issue, current_user)
create_note(issue, commit) if system_note
notification_service.close_issue(issue, current_user) if notifications
notification_service.async.close_issue(issue, current_user) if notifications
todo_service.close_issue(issue, current_user)
execute_hooks(issue, 'close')
invalidate_cache_counts(issue, users: issue.assignees)
......
......@@ -141,7 +141,7 @@ module Issues
end
def notify_participants
notification_service.issue_moved(@old_issue, @new_issue, @current_user)
notification_service.async.issue_moved(@old_issue, @new_issue, @current_user)
end
end
end
......@@ -6,7 +6,7 @@ module Issues
if issue.reopen
event_service.reopen_issue(issue, current_user)
create_note(issue, 'reopened')
notification_service.reopen_issue(issue, current_user)
notification_service.async.reopen_issue(issue, current_user)
execute_hooks(issue, 'reopen')
invalidate_cache_counts(issue, users: issue.assignees)
issue.update_project_counter_caches
......
......@@ -30,7 +30,7 @@ module Issues
if issue.assignees != old_assignees
create_assignee_note(issue, old_assignees)
notification_service.reassigned_issue(issue, current_user, old_assignees)
notification_service.async.reassigned_issue(issue, current_user, old_assignees)
todo_service.reassigned_issue(issue, current_user, old_assignees)
end
......@@ -41,13 +41,13 @@ module Issues
added_labels = issue.labels - old_labels
if added_labels.present?
notification_service.relabeled_issue(issue, added_labels, current_user)
notification_service.async.relabeled_issue(issue, added_labels, current_user)
end
added_mentions = issue.mentioned_users - old_mentioned_users
if added_mentions.present?
notification_service.new_mentions_in_issue(issue, added_mentions, current_user)
notification_service.async.new_mentions_in_issue(issue, added_mentions, current_user)
end
end
......
......@@ -10,7 +10,7 @@ module MergeRequests
if merge_request.close
create_event(merge_request)
create_note(merge_request)
notification_service.close_mr(merge_request, current_user)
notification_service.async.close_mr(merge_request, current_user)
todo_service.close_merge_request(merge_request, current_user)
execute_hooks(merge_request, 'close')
invalidate_cache_counts(merge_request, users: merge_request.assignees)
......
......@@ -6,7 +6,7 @@ module MergeRequests
if merge_request.reopen
create_event(merge_request)
create_note(merge_request, 'reopened')
notification_service.reopen_mr(merge_request, current_user)
notification_service.async.reopen_mr(merge_request, current_user)
execute_hooks(merge_request, 'reopen')
merge_request.reload_diff(current_user)
merge_request.mark_as_unchecked
......
......@@ -4,7 +4,7 @@ module MergeRequests
return unless merge_request.discussions_resolved?
SystemNoteService.resolve_all_discussions(merge_request, project, current_user)
notification_service.resolve_all_discussions(merge_request, current_user)
notification_service.async.resolve_all_discussions(merge_request, current_user)
end
end
end
......@@ -34,6 +34,7 @@ module MergeRequests
merge_request
end
# rubocop:disable Metrics/AbcSize
def handle_changes(merge_request, options)
old_associations = options.fetch(:old_associations, {})
old_labels = old_associations.fetch(:labels, [])
......@@ -57,8 +58,11 @@ module MergeRequests
end
if merge_request.previous_changes.include?('assignee_id')
old_assignee_id = merge_request.previous_changes['assignee_id'].first
old_assignee = User.find(old_assignee_id) if old_assignee_id
create_assignee_note(merge_request)
notification_service.reassigned_merge_request(merge_request, current_user)
notification_service.async.reassigned_merge_request(merge_request, current_user, old_assignee)
todo_service.reassigned_merge_request(merge_request, current_user)
end
......@@ -69,7 +73,7 @@ module MergeRequests
added_labels = merge_request.labels - old_labels
if added_labels.present?
notification_service.relabeled_merge_request(
notification_service.async.relabeled_merge_request(
merge_request,
added_labels,
current_user
......@@ -78,13 +82,14 @@ module MergeRequests
added_mentions = merge_request.mentioned_users - old_mentioned_users
if added_mentions.present?
notification_service.new_mentions_in_merge_request(
notification_service.async.new_mentions_in_merge_request(
merge_request,
added_mentions,
current_user
)
end
end
# rubocop:enable Metrics/AbcSize
def merge_from_quick_action(merge_request)
last_diff_sha = params.delete(:merge)
......
......@@ -7,9 +7,34 @@
# Ex.
# NotificationService.new.new_issue(issue, current_user)
#
# When calculating the recipients of a notification is expensive (for instance,
# in the new issue case), `#async` will make that calculation happen in Sidekiq
# instead:
#
# NotificationService.new.async.new_issue(issue, current_user)
#
class NotificationService
prepend EE::NotificationService
class Async
attr_reader :parent
delegate :respond_to_missing, to: :parent
def initialize(parent)
@parent = parent
end
def method_missing(meth, *args)
return super unless parent.respond_to?(meth)
MailScheduler::NotificationServiceWorker.perform_async(meth.to_s, *args)
end
end
def async
@async ||= Async.new(self)
end
# Always notify user about ssh key added
# only if ssh key is not deploy key
#
......@@ -145,8 +170,23 @@ class NotificationService
# * merge_request assignee if their notification level is not Disabled
# * users with custom level checked with "reassign merge request"
#
def reassigned_merge_request(merge_request, current_user)
reassign_resource_email(merge_request, current_user, :reassigned_merge_request_email)
def reassigned_merge_request(merge_request, current_user, previous_assignee)
recipients = NotificationRecipientService.build_recipients(
merge_request,
current_user,
action: "reassign",
previous_assignee: previous_assignee
)
recipients.each do |recipient|
mailer.reassigned_merge_request_email(
recipient.user.id,
merge_request.id,
previous_assignee&.id,
current_user.id,
recipient.reason
).deliver_later
end
end
# When we add labels to a merge request we should send an email to:
......@@ -444,29 +484,6 @@ class NotificationService
end
end
def reassign_resource_email(target, current_user, method)
previous_assignee_id = previous_record(target, 'assignee_id')
previous_assignee = User.find_by(id: previous_assignee_id) if previous_assignee_id
recipients = NotificationRecipientService.build_recipients(
target,
current_user,
action: "reassign",
previous_assignee: previous_assignee
)
recipients.each do |recipient|
mailer.send(
method,
recipient.user.id,
target.id,
previous_assignee_id,
current_user.id,
recipient.reason
).deliver_later
end
end
def relabeled_resource_email(target, labels, current_user, method)
recipients = labels.flat_map { |l| l.subscribers(target.project) }.uniq
recipients = notifiable_users(
......@@ -518,14 +535,6 @@ class NotificationService
Notify
end
def previous_record(object, attribute)
return unless object && attribute
if object.previous_changes.include?(attribute)
object.previous_changes[attribute].first
end
end
private
def recipients_for_pages_domain(domain)
......
......@@ -41,6 +41,7 @@
- github_importer:github_import_stage_import_repository
- mail_scheduler:mail_scheduler_issue_due
- mail_scheduler:mail_scheduler_notification_service
- object_storage_upload
- object_storage:object_storage_background_move
......
......@@ -4,4 +4,8 @@ module MailSchedulerQueue
included do
queue_namespace :mail_scheduler
end
def notification_service
@notification_service ||= NotificationService.new
end
end
......@@ -4,8 +4,6 @@ module MailScheduler
include MailSchedulerQueue
def perform(project_id)
notification_service = NotificationService.new
Issue.opened.due_tomorrow.in_projects(project_id).preload(:project).find_each do |issue|
notification_service.issue_due(issue)
end
......
require 'active_job/arguments'
module MailScheduler
class NotificationServiceWorker
include ApplicationWorker
include MailSchedulerQueue
def perform(meth, *args)
deserialized_args = ActiveJob::Arguments.deserialize(args)
notification_service.public_send(meth, *deserialized_args) # rubocop:disable GitlabSecurity/PublicSend
rescue ActiveJob::DeserializationError
end
def self.perform_async(*args)
super(*ActiveJob::Arguments.serialize(args))
end
end
end
---
title: Compute notification recipients in background jobs
merge_request:
author:
type: performance
......@@ -10,7 +10,7 @@ module MergeRequests
mark_pending_todos_as_done(merge_request)
if merge_request.approvals_left.zero?
notification_service.approve_mr(merge_request, current_user)
notification_service.async.approve_mr(merge_request, current_user)
execute_hooks(merge_request, 'approved')
end
end
......
......@@ -14,7 +14,7 @@ module MergeRequests
create_note(merge_request)
if currently_approved
notification_service.unapprove_mr(merge_request, current_user)
notification_service.async.unapprove_mr(merge_request, current_user)
execute_hooks(merge_request, 'unapproved')
end
end
......
......@@ -71,9 +71,8 @@ describe MergeRequests::ApprovalService do
before do
expect(merge_request).to receive(:approvals_left).and_return(0)
allow(service).to receive(:notification_service).and_return(notification_service)
allow(service).to receive(:execute_hooks)
allow(notification_service).to receive(:approve_mr)
allow(service).to receive(:notification_service).and_return(notification_service)
end
it 'fires a webhook' do
......@@ -83,7 +82,7 @@ describe MergeRequests::ApprovalService do
end
it 'sends an email' do
expect(notification_service).to receive(:approve_mr).with(merge_request, user)
expect(notification_service).to receive_message_chain(:async, :approve_mr).with(merge_request, user)
service.execute(merge_request)
end
......
......@@ -3,8 +3,8 @@ require 'rails_helper'
describe MergeRequests::RemoveApprovalService do
describe '#execute' do
let(:user) { create(:user) }
let(:merge_request) { create(:merge_request) }
let(:project) { merge_request.project }
let(:project) { create(:project, approvals_before_merge: 1) }
let(:merge_request) { create(:merge_request, source_project: project) }
subject(:service) { described_class.new(project, user) }
......@@ -14,6 +14,8 @@ describe MergeRequests::RemoveApprovalService do
context 'with a user who has approved' do
before do
project.add_developer(create(:user))
merge_request.update!(approvals_before_merge: 2)
merge_request.approvals.create(user: user)
end
......@@ -30,7 +32,7 @@ describe MergeRequests::RemoveApprovalService do
end
it 'does not send a notification' do
expect(Notify).not_to receive(:unapprove_mr)
expect(service).not_to receive(:notification_service)
execute!
end
......@@ -43,16 +45,16 @@ describe MergeRequests::RemoveApprovalService do
end
context 'with an approved merge request' do
let(:notify) { Object.new }
let(:notification_service) { NotificationService.new }
before do
merge_request.update_attribute :approvals_before_merge, 1
merge_request.approvals.create(user: user)
allow(service).to receive(:notification_service).and_return(notify)
allow(service).to receive(:notification_service).and_return(notification_service)
end
it 'sends a notification' do
expect(notify).to receive(:unapprove_mr)
expect(notification_service).to receive_message_chain(:async, :unapprove_mr).with(merge_request, user)
execute!
end
end
......
......@@ -96,6 +96,37 @@ describe NotificationService, :mailer do
it_should_behave_like 'participating by assignee notification'
end
describe '#async' do
let(:async) { notification.async }
set(:key) { create(:personal_key) }
it 'returns an Async object with the correct parent' do
expect(async).to be_a(described_class::Async)
expect(async.parent).to eq(notification)
end
context 'when receiving a public method' do
it 'schedules a MailScheduler::NotificationServiceWorker' do
expect(MailScheduler::NotificationServiceWorker)
.to receive(:perform_async).with('new_key', key)
async.new_key(key)
end
end
context 'when receiving a private method' do
it 'raises NoMethodError' do
expect { async.notifiable?(key) }.to raise_error(NoMethodError)
end
end
context 'when recieving a non-existent method' do
it 'raises NoMethodError' do
expect { async.foo(key) }.to raise_error(NoMethodError)
end
end
end
describe 'Keys' do
describe '#new_key' do
let(:key_options) { {} }
......@@ -982,6 +1013,8 @@ describe NotificationService, :mailer do
let(:merge_request) { create :merge_request, source_project: project, assignee: create(:user), description: 'cc @participant' }
before do
project.add_master(merge_request.author)
project.add_master(merge_request.assignee)
build_team(merge_request.target_project)
add_users_with_subscription(merge_request.target_project, merge_request)
update_custom_notification(:new_merge_request, @u_guest_custom, resource: project)
......@@ -1135,15 +1168,18 @@ describe NotificationService, :mailer do
end
describe '#reassigned_merge_request' do
let(:current_user) { create(:user) }
before do
update_custom_notification(:reassign_merge_request, @u_guest_custom, resource: project)
update_custom_notification(:reassign_merge_request, @u_custom_global)
end
it do
notification.reassigned_merge_request(merge_request, merge_request.author)
notification.reassigned_merge_request(merge_request, current_user, merge_request.author)
should_email(merge_request.assignee)
should_email(merge_request.author)
should_email(@u_watcher)
should_email(@u_participant_mentioned)
should_email(@subscriber)
......@@ -1158,7 +1194,7 @@ describe NotificationService, :mailer do
end
it 'adds "assigned" reason for new assignee' do
notification.reassigned_merge_request(merge_request, merge_request.author)
notification.reassigned_merge_request(merge_request, current_user, merge_request.author)
email = find_email_for(merge_request.assignee)
......@@ -1168,7 +1204,7 @@ describe NotificationService, :mailer do
it_behaves_like 'participating notifications' do
let(:participant) { create(:user, username: 'user-participant') }
let(:issuable) { merge_request }
let(:notification_trigger) { notification.reassigned_merge_request(merge_request, @u_disabled) }
let(:notification_trigger) { notification.reassigned_merge_request(merge_request, current_user, merge_request.author) }
end
end
......
......@@ -12,8 +12,8 @@ describe MailScheduler::IssueDueWorker do
create(:issue, :opened, project: project, due_date: 2.days.from_now) # due on another day
create(:issue, :opened, due_date: Date.tomorrow) # different project
expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue1)
expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue2)
expect(worker.notification_service).to receive(:issue_due).with(issue1)
expect(worker.notification_service).to receive(:issue_due).with(issue2)
worker.perform(project.id)
end
......
require 'spec_helper'
describe MailScheduler::NotificationServiceWorker do
let(:worker) { described_class.new }
let(:method) { 'new_key' }
set(:key) { create(:personal_key) }
def serialize(*args)
ActiveJob::Arguments.serialize(args)
end
describe '#perform' do
it 'deserializes arguments from global IDs' do
expect(worker.notification_service).to receive(method).with(key)
worker.perform(method, *serialize(key))
end
context 'when the arguments cannot be deserialized' do
it 'does nothing' do
expect(worker.notification_service).not_to receive(method)
worker.perform(method, key.to_global_id.to_s.succ)
end
end
context 'when the method is not a public method' do
it 'raises NoMethodError' do
expect { worker.perform('notifiable?', *serialize(key)) }.to raise_error(NoMethodError)
end
end
end
describe '.perform_async' do
it 'serializes arguments as global IDs when scheduling' do
Sidekiq::Testing.fake! do
described_class.perform_async(method, key)
expect(described_class.jobs.count).to eq(1)
expect(described_class.jobs.first).to include('args' => [method, *serialize(key)])
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment