Commit 381468d0 authored by Nick Thomas's avatar Nick Thomas

Allow asynchronous rebase operations to be monitored

This MR introduces tracking of the `rebase_jid` for merge requests. As
with `merge_ongoing?`, `rebase_in_progress?` will now return true if a
rebase is proceeding in sidekiq.

After one release, we should remove the Gitaly-based lookup of rebases.
It is much better to track this kind of thing via the database.
parent 9ef0c855
...@@ -201,7 +201,7 @@ class Projects::MergeRequestsController < Projects::MergeRequests::ApplicationCo ...@@ -201,7 +201,7 @@ class Projects::MergeRequestsController < Projects::MergeRequests::ApplicationCo
end end
def rebase def rebase
RebaseWorker.perform_async(@merge_request.id, current_user.id) @merge_request.rebase_async(current_user.id)
head :ok head :ok
end end
......
...@@ -223,7 +223,13 @@ class MergeRequest < ApplicationRecord ...@@ -223,7 +223,13 @@ class MergeRequest < ApplicationRecord
end end
def rebase_in_progress? def rebase_in_progress?
strong_memoize(:rebase_in_progress) do (rebase_jid.present? && Gitlab::SidekiqStatus.running?(rebase_jid)) ||
gitaly_rebase_in_progress?
end
# TODO: remove the Gitaly lookup after v12.1, when rebase_jid will be reliable
def gitaly_rebase_in_progress?
strong_memoize(:gitaly_rebase_in_progress) do
# The source project can be deleted # The source project can be deleted
next false unless source_project next false unless source_project
...@@ -389,6 +395,26 @@ class MergeRequest < ApplicationRecord ...@@ -389,6 +395,26 @@ class MergeRequest < ApplicationRecord
update_column(:merge_jid, jid) update_column(:merge_jid, jid)
end end
# Set off a rebase asynchronously, atomically updating the `rebase_jid` of
# the MR so that the status of the operation can be tracked.
def rebase_async(user_id)
transaction do
lock!
raise ActiveRecord::StaleObjectError if !open? || rebase_in_progress?
# Although there is a race between setting rebase_jid here and clearing it
# in the RebaseWorker, it can't do any harm since we check both that the
# attribute is set *and* that the sidekiq job is still running. So a JID
# for a completed RebaseWorker is equivalent to a nil JID.
jid = Sidekiq::Worker.skipping_transaction_check do
RebaseWorker.perform_async(id, user_id)
end
update_column(:rebase_jid, jid)
end
end
def merge_participants def merge_participants
participants = [author] participants = [author]
......
...@@ -15,7 +15,7 @@ module MergeRequests ...@@ -15,7 +15,7 @@ module MergeRequests
end end
def rebase def rebase
if merge_request.rebase_in_progress? if merge_request.gitaly_rebase_in_progress?
log_error('Rebase task canceled: Another rebase is already in progress', save_message_on_model: true) log_error('Rebase task canceled: Another rebase is already in progress', save_message_on_model: true)
return false return false
end end
...@@ -27,6 +27,8 @@ module MergeRequests ...@@ -27,6 +27,8 @@ module MergeRequests
log_error(REBASE_ERROR, save_message_on_model: true) log_error(REBASE_ERROR, save_message_on_model: true)
log_error(e.message) log_error(e.message)
false false
ensure
merge_request.update_column(:rebase_jid, nil)
end end
end end
end end
# frozen_string_literal: true # frozen_string_literal: true
# The RebaseWorker must be wrapped in important concurrency code, so should only
# be scheduled via MergeRequest#rebase_async
class RebaseWorker class RebaseWorker
include ApplicationWorker include ApplicationWorker
......
---
title: Allow asynchronous rebase operations to be monitored
merge_request: 29940
author:
type: fixed
# frozen_string_literal: true
class AddMergeRequestRebaseJid < ActiveRecord::Migration[5.1]
DOWNTIME = false
def change
add_column :merge_requests, :rebase_jid, :string
end
end
...@@ -1989,6 +1989,7 @@ ActiveRecord::Schema.define(version: 20190628185004) do ...@@ -1989,6 +1989,7 @@ ActiveRecord::Schema.define(version: 20190628185004) do
t.boolean "allow_maintainer_to_push" t.boolean "allow_maintainer_to_push"
t.integer "state_id", limit: 2 t.integer "state_id", limit: 2
t.integer "approvals_before_merge" t.integer "approvals_before_merge"
t.string "rebase_jid"
t.index ["assignee_id"], name: "index_merge_requests_on_assignee_id", using: :btree t.index ["assignee_id"], name: "index_merge_requests_on_assignee_id", using: :btree
t.index ["author_id"], name: "index_merge_requests_on_author_id", using: :btree t.index ["author_id"], name: "index_merge_requests_on_author_id", using: :btree
t.index ["created_at"], name: "index_merge_requests_on_created_at", using: :btree t.index ["created_at"], name: "index_merge_requests_on_created_at", using: :btree
......
...@@ -1485,8 +1485,14 @@ PUT /projects/:id/merge_requests/:merge_request_iid/rebase ...@@ -1485,8 +1485,14 @@ PUT /projects/:id/merge_requests/:merge_request_iid/rebase
curl --header "PRIVATE-TOKEN: <your_access_token>" https://gitlab.example.com/api/v4/projects/76/merge_requests/1/rebase curl --header "PRIVATE-TOKEN: <your_access_token>" https://gitlab.example.com/api/v4/projects/76/merge_requests/1/rebase
``` ```
This is an asynchronous request. The API will return an empty `202 Accepted` This is an asynchronous request. The API will return a `202 Accepted` response
response if the request is enqueued successfully. if the request is enqueued successfully, with a response containing:
```json
{
"rebase_in_progress": true
}
```
You can poll the [Get single MR](#get-single-mr) endpoint with the You can poll the [Get single MR](#get-single-mr) endpoint with the
`include_rebase_in_progress` parameter to check the status of the `include_rebase_in_progress` parameter to check the status of the
......
...@@ -52,7 +52,10 @@ module API ...@@ -52,7 +52,10 @@ module API
rack_response({ 'message' => '404 Not found' }.to_json, 404) rack_response({ 'message' => '404 Not found' }.to_json, 404)
end end
rescue_from ::Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError do rescue_from(
::ActiveRecord::StaleObjectError,
::Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
) do
rack_response({ 'message' => '409 Conflict: Resource lock' }.to_json, 409) rack_response({ 'message' => '409 Conflict: Resource lock' }.to_json, 409)
end end
......
...@@ -429,9 +429,10 @@ module API ...@@ -429,9 +429,10 @@ module API
authorize_push_to_merge_request!(merge_request) authorize_push_to_merge_request!(merge_request)
RebaseWorker.perform_async(merge_request.id, current_user.id) merge_request.rebase_async(current_user.id)
status :accepted status :accepted
present rebase_in_progress: merge_request.rebase_in_progress?
end end
desc 'List issues that will be closed on merge' do desc 'List issues that will be closed on merge' do
......
...@@ -160,6 +160,7 @@ excluded_attributes: ...@@ -160,6 +160,7 @@ excluded_attributes:
- :milestone_id - :milestone_id
- :ref_fetched - :ref_fetched
- :merge_jid - :merge_jid
- :rebase_jid
- :latest_merge_request_diff_id - :latest_merge_request_diff_id
award_emoji: award_emoji:
- :awardable_id - :awardable_id
......
...@@ -7,6 +7,8 @@ describe MergeRequest do ...@@ -7,6 +7,8 @@ describe MergeRequest do
include ProjectForksHelper include ProjectForksHelper
include ReactiveCachingHelpers include ReactiveCachingHelpers
using RSpec::Parameterized::TableSyntax
subject { create(:merge_request) } subject { create(:merge_request) }
describe 'associations' do describe 'associations' do
...@@ -1996,6 +1998,47 @@ describe MergeRequest do ...@@ -1996,6 +1998,47 @@ describe MergeRequest do
end end
end end
describe '#rebase_async' do
let(:merge_request) { create(:merge_request) }
let(:user_id) { double(:user_id) }
let(:rebase_jid) { 'rebase-jid' }
subject(:execute) { merge_request.rebase_async(user_id) }
it 'atomically enqueues a RebaseWorker job and updates rebase_jid' do
expect(RebaseWorker)
.to receive(:perform_async)
.with(merge_request.id, user_id)
.and_return(rebase_jid)
expect(merge_request).to receive(:lock!).and_call_original
execute
expect(merge_request.rebase_jid).to eq(rebase_jid)
end
it 'refuses to enqueue a job if a rebase is in progress' do
merge_request.update_column(:rebase_jid, rebase_jid)
expect(RebaseWorker).not_to receive(:perform_async)
expect(Gitlab::SidekiqStatus)
.to receive(:running?)
.with(rebase_jid)
.and_return(true)
expect { execute }.to raise_error(ActiveRecord::StaleObjectError)
end
it 'refuses to enqueue a job if the MR is not open' do
merge_request.update_column(:state, 'foo')
expect(RebaseWorker).not_to receive(:perform_async)
expect { execute }.to raise_error(ActiveRecord::StaleObjectError)
end
end
describe '#mergeable?' do describe '#mergeable?' do
let(:project) { create(:project) } let(:project) { create(:project) }
...@@ -2946,7 +2989,32 @@ describe MergeRequest do ...@@ -2946,7 +2989,32 @@ describe MergeRequest do
end end
describe '#rebase_in_progress?' do describe '#rebase_in_progress?' do
shared_examples 'checking whether a rebase is in progress' do where(:rebase_jid, :jid_valid, :result) do
'foo' | true | true
'foo' | false | false
'' | true | false
nil | true | false
end
with_them do
let(:merge_request) { create(:merge_request) }
subject { merge_request.rebase_in_progress? }
it do
# Stub out the legacy gitaly implementation
allow(merge_request).to receive(:gitaly_rebase_in_progress?) { false }
allow(Gitlab::SidekiqStatus).to receive(:running?).with(rebase_jid) { jid_valid }
merge_request.rebase_jid = rebase_jid
is_expected.to eq(result)
end
end
end
describe '#gitaly_rebase_in_progress?' do
let(:repo_path) do let(:repo_path) do
Gitlab::GitalyClient::StorageSettings.allow_disk_access do Gitlab::GitalyClient::StorageSettings.allow_disk_access do
subject.source_project.repository.path subject.source_project.repository.path
...@@ -2981,7 +3049,6 @@ describe MergeRequest do ...@@ -2981,7 +3049,6 @@ describe MergeRequest do
expect(subject.rebase_in_progress?).to be_falsey expect(subject.rebase_in_progress?).to be_falsey
end end
end end
end
describe '#allow_collaboration' do describe '#allow_collaboration' do
let(:merge_request) do let(:merge_request) do
......
...@@ -2033,6 +2033,9 @@ describe API::MergeRequests do ...@@ -2033,6 +2033,9 @@ describe API::MergeRequests do
expect(response).to have_gitlab_http_status(202) expect(response).to have_gitlab_http_status(202)
expect(RebaseWorker.jobs.size).to eq(1) expect(RebaseWorker.jobs.size).to eq(1)
expect(merge_request.reload).to be_rebase_in_progress
expect(json_response['rebase_in_progress']).to be(true)
end end
it 'returns 403 if the user cannot push to the branch' do it 'returns 403 if the user cannot push to the branch' do
...@@ -2043,6 +2046,16 @@ describe API::MergeRequests do ...@@ -2043,6 +2046,16 @@ describe API::MergeRequests do
expect(response).to have_gitlab_http_status(403) expect(response).to have_gitlab_http_status(403)
end end
it 'returns 409 if a rebase is already in progress' do
Sidekiq::Testing.fake! do
merge_request.rebase_async(user.id)
put api("/projects/#{project.id}/merge_requests/#{merge_request.iid}/rebase", user)
end
expect(response).to have_gitlab_http_status(409)
end
end end
describe 'Time tracking' do describe 'Time tracking' do
......
...@@ -6,10 +6,12 @@ describe MergeRequests::RebaseService do ...@@ -6,10 +6,12 @@ describe MergeRequests::RebaseService do
include ProjectForksHelper include ProjectForksHelper
let(:user) { create(:user) } let(:user) { create(:user) }
let(:rebase_jid) { 'fake-rebase-jid' }
let(:merge_request) do let(:merge_request) do
create(:merge_request, create :merge_request,
source_branch: 'feature_conflict', source_branch: 'feature_conflict',
target_branch: 'master') target_branch: 'master',
rebase_jid: rebase_jid
end end
let(:project) { merge_request.project } let(:project) { merge_request.project }
let(:repository) { project.repository.raw } let(:repository) { project.repository.raw }
...@@ -23,11 +25,11 @@ describe MergeRequests::RebaseService do ...@@ -23,11 +25,11 @@ describe MergeRequests::RebaseService do
describe '#execute' do describe '#execute' do
context 'when another rebase is already in progress' do context 'when another rebase is already in progress' do
before do before do
allow(merge_request).to receive(:rebase_in_progress?).and_return(true) allow(merge_request).to receive(:gitaly_rebase_in_progress?).and_return(true)
end end
it 'saves the error message' do it 'saves the error message' do
subject.execute(merge_request) service.execute(merge_request)
expect(merge_request.reload.merge_error).to eq 'Rebase task canceled: Another rebase is already in progress' expect(merge_request.reload.merge_error).to eq 'Rebase task canceled: Another rebase is already in progress'
end end
...@@ -36,6 +38,13 @@ describe MergeRequests::RebaseService do ...@@ -36,6 +38,13 @@ describe MergeRequests::RebaseService do
expect(service.execute(merge_request)).to match(status: :error, expect(service.execute(merge_request)).to match(status: :error,
message: described_class::REBASE_ERROR) message: described_class::REBASE_ERROR)
end end
it 'clears rebase_jid' do
expect { service.execute(merge_request) }
.to change { merge_request.rebase_jid }
.from(rebase_jid)
.to(nil)
end
end end
shared_examples 'sequence of failure and success' do shared_examples 'sequence of failure and success' do
...@@ -43,14 +52,19 @@ describe MergeRequests::RebaseService do ...@@ -43,14 +52,19 @@ describe MergeRequests::RebaseService do
allow(repository).to receive(:gitaly_operation_client).and_raise('Something went wrong') allow(repository).to receive(:gitaly_operation_client).and_raise('Something went wrong')
service.execute(merge_request) service.execute(merge_request)
merge_request.reload
expect(merge_request.reload.merge_error).to eq described_class::REBASE_ERROR expect(merge_request.reload.merge_error).to eq(described_class::REBASE_ERROR)
expect(merge_request.rebase_jid).to eq(nil)
allow(repository).to receive(:gitaly_operation_client).and_call_original allow(repository).to receive(:gitaly_operation_client).and_call_original
merge_request.update!(rebase_jid: rebase_jid)
service.execute(merge_request) service.execute(merge_request)
merge_request.reload
expect(merge_request.reload.merge_error).to eq nil expect(merge_request.merge_error).to eq(nil)
expect(merge_request.rebase_jid).to eq(nil)
end end
end end
...@@ -72,7 +86,7 @@ describe MergeRequests::RebaseService do ...@@ -72,7 +86,7 @@ describe MergeRequests::RebaseService do
it 'saves a generic error message' do it 'saves a generic error message' do
subject.execute(merge_request) subject.execute(merge_request)
expect(merge_request.reload.merge_error).to eq described_class::REBASE_ERROR expect(merge_request.reload.merge_error).to eq(described_class::REBASE_ERROR)
end end
it 'returns an error' do it 'returns an error' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment