Commit 509910b8 authored by Yorick Peterse's avatar Yorick Peterse

Process commits in a separate worker

This moves the code used for processing commits from GitPushService to
its own Sidekiq worker: ProcessCommitWorker.

Using a Sidekiq worker allows us to process multiple commits in
parallel. This in turn will lead to issues being closed faster and cross
references being created faster. Furthermore by isolating this code into
a separate class it's easier to test and maintain the code.

The new worker also ensures it can efficiently check which issues can be
closed, without having to run numerous SQL queries for every issue.
parent f694f94c
...@@ -32,6 +32,8 @@ class IssueCollection ...@@ -32,6 +32,8 @@ class IssueCollection
end end
end end
alias_method :visible_to, :updatable_by_user
private private
def project_ids def project_ids
......
...@@ -18,6 +18,6 @@ class IssuePolicy < IssuablePolicy ...@@ -18,6 +18,6 @@ class IssuePolicy < IssuablePolicy
def can_read_confidential? def can_read_confidential?
return false unless @user return false unless @user
IssueCollection.new([@subject]).updatable_by_user(@user).any? IssueCollection.new([@subject]).visible_to(@user).any?
end end
end end
...@@ -105,35 +105,11 @@ class GitPushService < BaseService ...@@ -105,35 +105,11 @@ class GitPushService < BaseService
# Extract any GFM references from the pushed commit messages. If the configured issue-closing regex is matched, # Extract any GFM references from the pushed commit messages. If the configured issue-closing regex is matched,
# close the referenced Issue. Create cross-reference Notes corresponding to any other referenced Mentionables. # close the referenced Issue. Create cross-reference Notes corresponding to any other referenced Mentionables.
def process_commit_messages def process_commit_messages
is_default_branch = is_default_branch? default = is_default_branch?
authors = Hash.new do |hash, commit|
email = commit.author_email
next hash[email] if hash.has_key?(email)
hash[email] = commit_user(commit)
end
@push_commits.each do |commit| @push_commits.each do |commit|
# Keep track of the issues that will be actually closed because they are on a default branch. ProcessCommitWorker.
# Hence, when creating cross-reference notes, the not-closed issues (on non-default branches) perform_async(project.id, current_user.id, commit.id, default)
# will also have cross-reference.
closed_issues = []
if is_default_branch
# Close issues if these commits were pushed to the project's default branch and the commit message matches the
# closing regex. Exclude any mentioned Issues from cross-referencing even if the commits are being pushed to
# a different branch.
closed_issues = commit.closes_issues(current_user)
closed_issues.each do |issue|
if can?(current_user, :update_issue, issue)
Issues::CloseService.new(project, authors[commit], {}).execute(issue, commit: commit)
end
end
end
commit.create_cross_references!(authors[commit], closed_issues)
update_issue_metrics(commit, authors)
end end
end end
...@@ -176,11 +152,4 @@ class GitPushService < BaseService ...@@ -176,11 +152,4 @@ class GitPushService < BaseService
def branch_name def branch_name
@branch_name ||= Gitlab::Git.ref_name(params[:ref]) @branch_name ||= Gitlab::Git.ref_name(params[:ref])
end end
def update_issue_metrics(commit, authors)
mentioned_issues = commit.all_references(authors[commit]).issues
Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil).
update_all(first_mentioned_in_commit_at: commit.committed_date)
end
end end
module Issues module Issues
class CloseService < Issues::BaseService class CloseService < Issues::BaseService
# Closes the supplied issue if the current user is able to do so.
def execute(issue, commit: nil, notifications: true, system_note: true) def execute(issue, commit: nil, notifications: true, system_note: true)
return issue unless can?(current_user, :update_issue, issue) return issue unless can?(current_user, :update_issue, issue)
close_issue(issue,
commit: commit,
notifications: notifications,
system_note: system_note)
end
# Closes the supplied issue without checking if the user is authorized to
# do so.
#
# The code calling this method is responsible for ensuring that a user is
# allowed to close the given issue.
def close_issue(issue, commit: nil, notifications: true, system_note: true)
if project.jira_tracker? && project.jira_service.active if project.jira_tracker? && project.jira_service.active
project.jira_service.execute(commit, issue) project.jira_service.execute(commit, issue)
todo_service.close_issue(issue, current_user) todo_service.close_issue(issue, current_user)
......
# Worker for processing individiual commit messages pushed to a repository.
#
# Jobs for this worker are scheduled for every commit that is being pushed. As a
# result of this the workload of this worker should be kept to a bare minimum.
# Consider using an extra worker if you need to add any extra (and potentially
# slow) processing of commits.
class ProcessCommitWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
# project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit.
# commit_sha - The SHA1 of the commit to process.
# default - The data was pushed to the default branch.
def perform(project_id, user_id, commit_sha, default = false)
project = Project.find_by(id: project_id)
return unless project
user = User.find_by(id: user_id)
return unless user
commit = find_commit(project, commit_sha)
return unless commit
author = commit.author || user
process_commit_message(project, commit, user, author, default)
update_issue_metrics(commit, author)
end
def process_commit_message(project, commit, user, author, default = false)
closed_issues = default ? commit.closes_issues(user) : []
unless closed_issues.empty?
close_issues(project, user, author, commit, closed_issues)
end
commit.create_cross_references!(author, closed_issues)
end
def close_issues(project, user, author, commit, issues)
# We don't want to run permission related queries for every single issue,
# therefor we use IssueCollection here and skip the authorization check in
# Issues::CloseService#execute.
IssueCollection.new(issues).updatable_by_user(user).each do |issue|
Issues::CloseService.new(project, author).
close_issue(issue, commit: commit)
end
end
def update_issue_metrics(commit, author)
mentioned_issues = commit.all_references(author).issues
Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil).
update_all(first_mentioned_in_commit_at: commit.committed_date)
end
private
def find_commit(project, sha)
project.commit(sha)
end
end
---
title: Process commits using a dedicated Sidekiq worker
merge_request: 6802
author:
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
- [post_receive, 5] - [post_receive, 5]
- [merge, 5] - [merge, 5]
- [update_merge_requests, 3] - [update_merge_requests, 3]
- [process_commit, 2]
- [new_note, 2] - [new_note, 2]
- [build, 2] - [build, 2]
- [pipeline, 2] - [pipeline, 2]
......
...@@ -55,4 +55,13 @@ describe IssueCollection do ...@@ -55,4 +55,13 @@ describe IssueCollection do
end end
end end
end end
describe '#visible_to' do
it 'is an alias for updatable_by_user' do
updatable_by_user = described_class.instance_method(:updatable_by_user)
visible_to = described_class.instance_method(:visible_to)
expect(visible_to).to eq(updatable_by_user)
end
end
end end
...@@ -302,6 +302,9 @@ describe GitPushService, services: true do ...@@ -302,6 +302,9 @@ describe GitPushService, services: true do
author_email: commit_author.email author_email: commit_author.email
) )
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit]) allow(project.repository).to receive(:commits_between).and_return([commit])
end end
...@@ -357,6 +360,9 @@ describe GitPushService, services: true do ...@@ -357,6 +360,9 @@ describe GitPushService, services: true do
committed_date: commit_time committed_date: commit_time
) )
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit]) allow(project.repository).to receive(:commits_between).and_return([commit])
end end
...@@ -393,6 +399,9 @@ describe GitPushService, services: true do ...@@ -393,6 +399,9 @@ describe GitPushService, services: true do
allow(project.repository).to receive(:commits_between). allow(project.repository).to receive(:commits_between).
and_return([closing_commit]) and_return([closing_commit])
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
and_return(closing_commit)
project.team << [commit_author, :master] project.team << [commit_author, :master]
end end
......
...@@ -15,10 +15,39 @@ describe Issues::CloseService, services: true do ...@@ -15,10 +15,39 @@ describe Issues::CloseService, services: true do
end end
describe '#execute' do describe '#execute' do
let(:service) { described_class.new(project, user) }
it 'checks if the user is authorized to update the issue' do
expect(service).to receive(:can?).with(user, :update_issue, issue).
and_call_original
service.execute(issue)
end
it 'does not close the issue when the user is not authorized to do so' do
allow(service).to receive(:can?).with(user, :update_issue, issue).
and_return(false)
expect(service).not_to receive(:close_issue)
expect(service.execute(issue)).to eq(issue)
end
it 'closes the issue when the user is authorized to do so' do
allow(service).to receive(:can?).with(user, :update_issue, issue).
and_return(true)
expect(service).to receive(:close_issue).
with(issue, commit: nil, notifications: true, system_note: true)
service.execute(issue)
end
end
describe '#close_issue' do
context "valid params" do context "valid params" do
before do before do
perform_enqueued_jobs do perform_enqueued_jobs do
described_class.new(project, user).execute(issue) described_class.new(project, user).close_issue(issue)
end end
end end
...@@ -41,24 +70,12 @@ describe Issues::CloseService, services: true do ...@@ -41,24 +70,12 @@ describe Issues::CloseService, services: true do
end end
end end
context 'current user is not authorized to close issue' do
before do
perform_enqueued_jobs do
described_class.new(project, guest).execute(issue)
end
end
it 'does not close the issue' do
expect(issue).to be_open
end
end
context 'when issue is not confidential' do context 'when issue is not confidential' do
it 'executes issue hooks' do it 'executes issue hooks' do
expect(project).to receive(:execute_hooks).with(an_instance_of(Hash), :issue_hooks) expect(project).to receive(:execute_hooks).with(an_instance_of(Hash), :issue_hooks)
expect(project).to receive(:execute_services).with(an_instance_of(Hash), :issue_hooks) expect(project).to receive(:execute_services).with(an_instance_of(Hash), :issue_hooks)
described_class.new(project, user).execute(issue) described_class.new(project, user).close_issue(issue)
end end
end end
...@@ -69,14 +86,14 @@ describe Issues::CloseService, services: true do ...@@ -69,14 +86,14 @@ describe Issues::CloseService, services: true do
expect(project).to receive(:execute_hooks).with(an_instance_of(Hash), :confidential_issue_hooks) expect(project).to receive(:execute_hooks).with(an_instance_of(Hash), :confidential_issue_hooks)
expect(project).to receive(:execute_services).with(an_instance_of(Hash), :confidential_issue_hooks) expect(project).to receive(:execute_services).with(an_instance_of(Hash), :confidential_issue_hooks)
described_class.new(project, user).execute(issue) described_class.new(project, user).close_issue(issue)
end end
end end
context 'external issue tracker' do context 'external issue tracker' do
before do before do
allow(project).to receive(:default_issues_tracker?).and_return(false) allow(project).to receive(:default_issues_tracker?).and_return(false)
described_class.new(project, user).execute(issue) described_class.new(project, user).close_issue(issue)
end end
it { expect(issue).to be_valid } it { expect(issue).to be_valid }
......
require 'spec_helper'
describe ProcessCommitWorker do
let(:worker) { described_class.new }
let(:user) { create(:user) }
let(:project) { create(:project, :public) }
let(:issue) { create(:issue, project: project, author: user) }
let(:commit) { project.commit }
describe '#perform' do
it 'does not process the commit when the project does not exist' do
expect(worker).not_to receive(:close_issues)
worker.perform(-1, user.id, commit.id)
end
it 'does not process the commit when the user does not exist' do
expect(worker).not_to receive(:close_issues)
worker.perform(project.id, -1, commit.id)
end
it 'does not process the commit when the commit no longer exists' do
expect(worker).not_to receive(:close_issues)
worker.perform(project.id, user.id, 'this-should-does-not-exist')
end
it 'processes the commit message' do
expect(worker).to receive(:process_commit_message).and_call_original
worker.perform(project.id, user.id, commit.id)
end
it 'updates the issue metrics' do
expect(worker).to receive(:update_issue_metrics).and_call_original
worker.perform(project.id, user.id, commit.id)
end
end
describe '#process_commit_message' do
context 'when pushing to the default branch' do
it 'closes issues that should be closed per the commit message' do
allow(commit).to receive(:safe_message).
and_return("Closes #{issue.to_reference}")
expect(worker).to receive(:close_issues).
with(project, user, user, commit, [issue])
worker.process_commit_message(project, commit, user, user, true)
end
end
context 'when pushing to a non-default branch' do
it 'does not close any issues' do
allow(commit).to receive(:safe_message).
and_return("Closes #{issue.to_reference}")
expect(worker).not_to receive(:close_issues)
worker.process_commit_message(project, commit, user, user, false)
end
end
it 'creates cross references' do
expect(commit).to receive(:create_cross_references!)
worker.process_commit_message(project, commit, user, user)
end
end
describe '#close_issues' do
context 'when the user can update the issues' do
it 'closes the issues' do
worker.close_issues(project, user, user, commit, [issue])
issue.reload
expect(issue.closed?).to eq(true)
end
end
context 'when the user can not update the issues' do
it 'does not close the issues' do
other_user = create(:user)
worker.close_issues(project, other_user, other_user, commit, [issue])
issue.reload
expect(issue.closed?).to eq(false)
end
end
end
describe '#update_issue_metrics' do
it 'updates any existing issue metrics' do
allow(commit).to receive(:safe_message).
and_return("Closes #{issue.to_reference}")
worker.update_issue_metrics(commit, user)
metric = Issue::Metrics.first
expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment