Commit c16a5701 authored by Douwe Maan's avatar Douwe Maan Committed by Alejandro Rodríguez

Merge branch 'process-commit-worker-improvements' into 'master'

Pass commit data to ProcessCommitWorker

This changes `ProcessCommitWorker` so that it takes a Hash containing commit data instead of a commit SHA. This means the worker doesn't have to access Git just to process a commit message (and other data it may use). This in turn should solve the problem of ending up with 15 000-something jobs in the `process_commit` queue that take forever to process.

See merge request !7744
parent 0eecc941
...@@ -48,6 +48,10 @@ class Commit ...@@ -48,6 +48,10 @@ class Commit
max_lines: DIFF_HARD_LIMIT_LINES, max_lines: DIFF_HARD_LIMIT_LINES,
} }
end end
def from_hash(hash, project)
new(Gitlab::Git::Commit.new(hash), project)
end
end end
attr_accessor :raw attr_accessor :raw
......
...@@ -135,7 +135,7 @@ class GitPushService < BaseService ...@@ -135,7 +135,7 @@ class GitPushService < BaseService
@push_commits.each do |commit| @push_commits.each do |commit|
ProcessCommitWorker. ProcessCommitWorker.
perform_async(project.id, current_user.id, commit.id, default) perform_async(project.id, current_user.id, commit.to_hash, default)
end end
end end
......
...@@ -10,9 +10,10 @@ class ProcessCommitWorker ...@@ -10,9 +10,10 @@ class ProcessCommitWorker
# project_id - The ID of the project this commit belongs to. # project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit. # user_id - The ID of the user that pushed the commit.
# commit_sha - The SHA1 of the commit to process. # commit_hash - Hash containing commit details to use for constructing a
# Commit object without having to use the Git repository.
# default - The data was pushed to the default branch. # default - The data was pushed to the default branch.
def perform(project_id, user_id, commit_sha, default = false) def perform(project_id, user_id, commit_hash, default = false)
project = Project.find_by(id: project_id) project = Project.find_by(id: project_id)
return unless project return unless project
...@@ -21,10 +22,7 @@ class ProcessCommitWorker ...@@ -21,10 +22,7 @@ class ProcessCommitWorker
return unless user return unless user
commit = find_commit(project, commit_sha) commit = build_commit(project, commit_hash)
return unless commit
author = commit.author || user author = commit.author || user
process_commit_message(project, commit, user, author, default) process_commit_message(project, commit, user, author, default)
...@@ -59,9 +57,18 @@ class ProcessCommitWorker ...@@ -59,9 +57,18 @@ class ProcessCommitWorker
update_all(first_mentioned_in_commit_at: commit.committed_date) update_all(first_mentioned_in_commit_at: commit.committed_date)
end end
private def build_commit(project, hash)
date_suffix = '_date'
# When processing Sidekiq payloads various timestamps are stored as Strings.
# Commit in turn expects Time-like instances upon input, so we have to
# manually parse these values.
hash.each do |key, value|
if key.to_s.end_with?(date_suffix) && value.is_a?(String)
hash[key] = Time.parse(value)
end
end
def find_commit(project, sha) Commit.from_hash(hash, project)
project.commit(sha)
end end
end end
---
title: Pass commit data to ProcessCommitWorker to reduce Git overhead
merge_request: 7744
author:
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
class Project < ActiveRecord::Base
def self.find_including_path(id)
select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace").
joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id').
find_by(id: id)
end
def repository_storage_path
Gitlab.config.repositories.storages[repository_storage]
end
def repository_path
File.join(repository_storage_path, read_attribute(:path_with_namespace) + '.git')
end
def repository
@repository ||= Rugged::Repository.new(repository_path)
end
end
DOWNTIME = true
DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code'
disable_ddl_transaction!
def up
Sidekiq.redis do |redis|
new_jobs = []
while job = redis.lpop('queue:process_commit')
payload = JSON.load(job)
project = Project.find_including_path(payload['args'][0])
next unless project
begin
commit = project.repository.lookup(payload['args'][2])
rescue Rugged::OdbError
next
end
hash = {
id: commit.oid,
message: commit.message,
parent_ids: commit.parent_ids,
authored_date: commit.author[:time],
author_name: commit.author[:name],
author_email: commit.author[:email],
committed_date: commit.committer[:time],
committer_email: commit.committer[:email],
committer_name: commit.committer[:name]
}
payload['args'][2] = hash
new_jobs << JSON.dump(payload)
end
redis.multi do |multi|
new_jobs.each do |j|
multi.lpush('queue:process_commit', j)
end
end
end
end
def down
Sidekiq.redis do |redis|
new_jobs = []
while job = redis.lpop('queue:process_commit')
payload = JSON.load(job)
payload['args'][2] = payload['args'][2]['id']
new_jobs << JSON.dump(payload)
end
redis.multi do |multi|
new_jobs.each do |j|
multi.lpush('queue:process_commit', j)
end
end
end
end
end
...@@ -321,6 +321,6 @@ describe Gitlab::CycleAnalytics::Events do ...@@ -321,6 +321,6 @@ describe Gitlab::CycleAnalytics::Events do
context.update(milestone: milestone) context.update(milestone: milestone)
mr = create_merge_request_closing_issue(context) mr = create_merge_request_closing_issue(context)
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha) ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
end end
end end
require 'spec_helper'
require Rails.root.join('db', 'migrate', '20161124141322_migrate_process_commit_worker_jobs.rb')
describe MigrateProcessCommitWorkerJobs do
let(:project) { create(:project) }
let(:user) { create(:user) }
let(:commit) { project.commit.raw.raw_commit }
describe 'Project' do
describe 'find_including_path' do
it 'returns Project instances' do
expect(described_class::Project.find_including_path(project.id)).
to be_an_instance_of(described_class::Project)
end
it 'selects the full path for every Project' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(migration_project[:path_with_namespace]).
to eq(project.path_with_namespace)
end
end
describe '#repository_storage_path' do
it 'returns the storage path for the repository' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(File.directory?(migration_project.repository_storage_path)).
to eq(true)
end
end
describe '#repository_path' do
it 'returns the path to the repository' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(File.directory?(migration_project.repository_path)).to eq(true)
end
end
describe '#repository' do
it 'returns a Rugged::Repository' do
migration_project = described_class::Project.
find_including_path(project.id)
expect(migration_project.repository).
to be_an_instance_of(Rugged::Repository)
end
end
end
describe '#up', :redis do
let(:migration) { described_class.new }
def job_count
Sidekiq.redis { |r| r.llen('queue:process_commit') }
end
before do
Sidekiq.redis do |redis|
job = JSON.dump(args: [project.id, user.id, commit.oid])
redis.lpush('queue:process_commit', job)
end
end
it 'skips jobs using a project that no longer exists' do
allow(described_class::Project).to receive(:find_including_path).
with(project.id).
and_return(nil)
migration.up
expect(job_count).to eq(0)
end
it 'skips jobs using commits that no longer exist' do
allow_any_instance_of(Rugged::Repository).to receive(:lookup).
with(commit.oid).
and_raise(Rugged::OdbError)
migration.up
expect(job_count).to eq(0)
end
it 'inserts migrated jobs back into the queue' do
migration.up
expect(job_count).to eq(1)
end
context 'a migrated job' do
let(:job) do
migration.up
JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
end
let(:commit_hash) do
job['args'][2]
end
it 'includes the project ID' do
expect(job['args'][0]).to eq(project.id)
end
it 'includes the user ID' do
expect(job['args'][1]).to eq(user.id)
end
it 'includes the commit ID' do
expect(commit_hash['id']).to eq(commit.oid)
end
it 'includes the commit message' do
expect(commit_hash['message']).to eq(commit.message)
end
it 'includes the parent IDs' do
expect(commit_hash['parent_ids']).to eq(commit.parent_ids)
end
it 'includes the author date' do
expect(commit_hash['authored_date']).to eq(commit.author[:time].to_s)
end
it 'includes the author name' do
expect(commit_hash['author_name']).to eq(commit.author[:name])
end
it 'includes the author Email' do
expect(commit_hash['author_email']).to eq(commit.author[:email])
end
it 'includes the commit date' do
expect(commit_hash['committed_date']).to eq(commit.committer[:time].to_s)
end
it 'includes the committer name' do
expect(commit_hash['committer_name']).to eq(commit.committer[:name])
end
it 'includes the committer Email' do
expect(commit_hash['committer_email']).to eq(commit.committer[:email])
end
end
end
describe '#down', :redis do
let(:migration) { described_class.new }
def job_count
Sidekiq.redis { |r| r.llen('queue:process_commit') }
end
before do
Sidekiq.redis do |redis|
job = JSON.dump(args: [project.id, user.id, commit.oid])
redis.lpush('queue:process_commit', job)
migration.up
end
end
it 'pushes migrated jobs back into the queue' do
migration.down
expect(job_count).to eq(1)
end
context 'a migrated job' do
let(:job) do
migration.down
JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
end
it 'includes the project ID' do
expect(job['args'][0]).to eq(project.id)
end
it 'includes the user ID' do
expect(job['args'][1]).to eq(user.id)
end
it 'includes the commit SHA' do
expect(job['args'][2]).to eq(commit.oid)
end
end
end
end
...@@ -302,4 +302,21 @@ eos ...@@ -302,4 +302,21 @@ eos
expect(commit.uri_type('this/path/doesnt/exist')).to be_nil expect(commit.uri_type('this/path/doesnt/exist')).to be_nil
end end
end end
describe '.from_hash' do
let(:new_commit) { described_class.from_hash(commit.to_hash, project) }
it 'returns a Commit' do
expect(new_commit).to be_an_instance_of(described_class)
end
it 'wraps a Gitlab::Git::Commit' do
expect(new_commit.raw).to be_an_instance_of(Gitlab::Git::Commit)
end
it 'stores the correct commit fields' do
expect(new_commit.id).to eq(commit.id)
expect(new_commit.message).to eq(commit.message)
end
end
end end
...@@ -135,6 +135,6 @@ describe 'cycle analytics events' do ...@@ -135,6 +135,6 @@ describe 'cycle analytics events' do
merge_merge_requests_closing_issue(issue) merge_merge_requests_closing_issue(issue)
ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha) ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
end end
end end
...@@ -263,7 +263,7 @@ describe GitPushService, services: true do ...@@ -263,7 +263,7 @@ describe GitPushService, services: true do
author_email: commit_author.email author_email: commit_author.email
) )
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(commit) and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit]) allow(project.repository).to receive(:commits_between).and_return([commit])
...@@ -321,7 +321,7 @@ describe GitPushService, services: true do ...@@ -321,7 +321,7 @@ describe GitPushService, services: true do
committed_date: commit_time committed_date: commit_time
) )
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(commit) and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit]) allow(project.repository).to receive(:commits_between).and_return([commit])
...@@ -360,7 +360,7 @@ describe GitPushService, services: true do ...@@ -360,7 +360,7 @@ describe GitPushService, services: true do
allow(project.repository).to receive(:commits_between). allow(project.repository).to receive(:commits_between).
and_return([closing_commit]) and_return([closing_commit])
allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(closing_commit) and_return(closing_commit)
project.team << [commit_author, :master] project.team << [commit_author, :master]
......
...@@ -55,8 +55,12 @@ RSpec.configure do |config| ...@@ -55,8 +55,12 @@ RSpec.configure do |config|
config.around(:each, :redis) do |example| config.around(:each, :redis) do |example|
Gitlab::Redis.with(&:flushall) Gitlab::Redis.with(&:flushall)
Sidekiq.redis(&:flushall)
example.run example.run
Gitlab::Redis.with(&:flushall) Gitlab::Redis.with(&:flushall)
Sidekiq.redis(&:flushall)
end end
end end
......
...@@ -11,31 +11,25 @@ describe ProcessCommitWorker do ...@@ -11,31 +11,25 @@ describe ProcessCommitWorker do
it 'does not process the commit when the project does not exist' do it 'does not process the commit when the project does not exist' do
expect(worker).not_to receive(:close_issues) expect(worker).not_to receive(:close_issues)
worker.perform(-1, user.id, commit.id) worker.perform(-1, user.id, commit.to_hash)
end end
it 'does not process the commit when the user does not exist' do it 'does not process the commit when the user does not exist' do
expect(worker).not_to receive(:close_issues) expect(worker).not_to receive(:close_issues)
worker.perform(project.id, -1, commit.id) worker.perform(project.id, -1, commit.to_hash)
end
it 'does not process the commit when the commit no longer exists' do
expect(worker).not_to receive(:close_issues)
worker.perform(project.id, user.id, 'this-should-does-not-exist')
end end
it 'processes the commit message' do it 'processes the commit message' do
expect(worker).to receive(:process_commit_message).and_call_original expect(worker).to receive(:process_commit_message).and_call_original
worker.perform(project.id, user.id, commit.id) worker.perform(project.id, user.id, commit.to_hash)
end end
it 'updates the issue metrics' do it 'updates the issue metrics' do
expect(worker).to receive(:update_issue_metrics).and_call_original expect(worker).to receive(:update_issue_metrics).and_call_original
worker.perform(project.id, user.id, commit.id) worker.perform(project.id, user.id, commit.to_hash)
end end
end end
...@@ -106,4 +100,19 @@ describe ProcessCommitWorker do ...@@ -106,4 +100,19 @@ describe ProcessCommitWorker do
expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date) expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date)
end end
end end
describe '#build_commit' do
it 'returns a Commit' do
commit = worker.build_commit(project, id: '123')
expect(commit).to be_an_instance_of(Commit)
end
it 'parses date strings into Time instances' do
commit = worker.
build_commit(project, id: '123', authored_date: Time.now.to_s)
expect(commit.authored_date).to be_an_instance_of(Time)
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment