Commit 58432c3f authored by Fabio Pitino's avatar Fabio Pitino

Merge branch 'remove-tech-debts-on-refresh-mr-services' into 'master'

Refactoring merge train refresh service

See merge request gitlab-org/gitlab!51433
parents 959c8eff c75c8dc5
# frozen_string_literal: true
# This model represents the merge train metadata of a single merge request.
# Please note that, in product perspective, a merge train represents a group of merge requests,
# however, in ActiveRecord model perspective, it exists for _each_ merge request on the train.
class MergeTrain < ApplicationRecord
include Gitlab::Utils::StrongMemoize
include AfterCommitQueue
......@@ -81,25 +84,22 @@ class MergeTrain < ApplicationRecord
class << self
def all_active_mrs_in_train(target_project_id, target_branch)
MergeRequest.joins(:merge_train).merge(
MergeTrain.active.for_target(target_project_id, target_branch).by_id
all_cars(target_project_id, target_branch)
)
end
def first_in_train(target_project_id, target_branch)
all_active_mrs_in_train(target_project_id, target_branch).first
def all_cars(target_project_id, target_branch, limit: nil)
active.for_target(target_project_id, target_branch).by_id.limit(limit)
end
def first_in_trains(project)
MergeRequest.preload(:target_project).where(id: first_merge_request_ids(project))
def first_car(target_project_id, target_branch)
all_cars(target_project_id, target_branch).first
end
def first_in_train_from(merge_request_ids)
merge_request = MergeRequest.find(merge_request_ids.first)
all_active_mrs_in_train(merge_request.target_project_id, merge_request.target_branch).find_by(id: merge_request_ids)
end
def last_complete_mr_in_train(target_project_id, target_branch)
MergeRequest.find_by(id: last_complete_merge_train(target_project_id, target_branch))
def first_cars_in_trains(project)
active.where(target_project: project)
.select('DISTINCT ON (target_branch) *')
.order(:target_branch, :id)
end
def sha_exists_in_history?(target_project_id, target_branch, newrev, limit: 20)
......@@ -114,17 +114,6 @@ class MergeTrain < ApplicationRecord
private
def first_merge_request_ids(project)
MergeTrain.where(target_project: project)
.active
.select('DISTINCT ON (target_branch) merge_request_id')
.order(:target_branch, :id)
end
def last_complete_merge_train(target_project_id, target_branch)
complete_merge_trains(target_project_id, target_branch, limit: 1)
end
def complete_merge_trains(target_project_id, target_branch, limit:)
MergeTrain.for_target(target_project_id, target_branch)
.complete.order(id: :desc).select(:merge_request_id).limit(limit)
......@@ -132,11 +121,11 @@ class MergeTrain < ApplicationRecord
end
def all_next
self.class.all_active_mrs_in_train(target_project_id, target_branch).where('merge_trains.id > ?', id)
self.class.all_cars(target_project_id, target_branch).where('merge_trains.id > ?', id)
end
def all_prev
self.class.all_active_mrs_in_train(target_project_id, target_branch).where('merge_trains.id < ?', id)
self.class.all_cars(target_project_id, target_branch).where('merge_trains.id < ?', id)
end
def next
......@@ -152,7 +141,7 @@ class MergeTrain < ApplicationRecord
end
def previous_ref
prev&.train_ref_path || merge_request.target_branch_ref
prev&.merge_request&.train_ref_path || merge_request.target_branch_ref
end
def previous_ref_sha
......@@ -176,7 +165,11 @@ class MergeTrain < ApplicationRecord
end
def mergeable?
has_pipeline? && pipeline&.success? && first_in_train?
has_pipeline? && pipeline&.success? && first_car?
end
def first_car?
self.class.first_car(target_project_id, target_branch) == self
end
def cleanup_ref
......@@ -188,7 +181,7 @@ class MergeTrain < ApplicationRecord
end
def refresh_async
AutoMergeProcessWorker.perform_async(merge_request_id)
MergeTrains::RefreshWorker.perform_async(target_project_id, target_branch)
end
private
......@@ -196,8 +189,4 @@ class MergeTrain < ApplicationRecord
def has_pipeline?
pipeline_id.present? && pipeline
end
def first_in_train?
all_prev.none?
end
end
......@@ -15,18 +15,19 @@ module AutoMerge
def process(merge_request)
return unless merge_request.on_train?
::MergeTrains::RefreshMergeRequestsService.new(project, nil).execute(merge_request)
::MergeTrains::RefreshWorker
.perform_async(merge_request.target_project_id, merge_request.target_branch)
end
def cancel(merge_request)
# Before dropping a merge request from a merge train, get the next
# merge request in order to refresh it later.
next_merge_request = merge_request.merge_train&.next
next_car = merge_request.merge_train&.next
super do
if merge_request.merge_train&.destroy
SystemNoteService.cancel_merge_train(merge_request, project, current_user)
next_merge_request.merge_train.outdate_pipeline if next_merge_request
next_car.outdate_pipeline if next_car
end
end
end
......@@ -34,12 +35,12 @@ module AutoMerge
def abort(merge_request, reason, process_next: true)
# Before dropping a merge request from a merge train, get the next
# merge request in order to refresh it later.
next_merge_request = merge_request.merge_train&.next
next_car = merge_request.merge_train&.next
super(merge_request, reason) do
if merge_request.merge_train&.destroy
SystemNoteService.abort_merge_train(merge_request, project, current_user, reason)
next_merge_request.merge_train.outdate_pipeline if next_merge_request && process_next
next_car.outdate_pipeline if next_car && process_next
end
end
end
......
......@@ -90,8 +90,8 @@ module EE
def refresh_merge_trains(project)
return unless project.merge_pipelines_were_disabled?
MergeTrain.first_in_trains(project).each do |merge_request|
AutoMergeProcessWorker.perform_async(merge_request.id)
MergeTrain.first_cars_in_trains(project).each do |car|
MergeTrains::RefreshWorker.perform_async(car.target_project_id, car.target_branch)
end
end
end
......
......@@ -8,8 +8,8 @@ module MergeTrains
# If the new revision doesn't exist in the merge train history,
# that means there was an unexpected commit came from out of merge train cycle.
unless MergeTrain.sha_exists_in_history?(target_project.id, target_branch, newrev)
merge_request = MergeTrain.first_in_train(target_project.id, target_branch)
merge_request.merge_train.outdate_pipeline if merge_request
car = MergeTrain.first_car(target_project.id, target_branch)
car.outdate_pipeline if car
end
end
end
......
# frozen_string_literal: true
module MergeTrains
class RefreshMergeRequestsService < BaseService
include ::Gitlab::ExclusiveLeaseHelpers
include ::Gitlab::Utils::StrongMemoize
DEFAULT_CONCURRENCY = 20
##
# merge_request ... A merge request pointer in a merge train.
# All the merge requests following the specified merge request will be refreshed.
def execute(merge_request)
@merge_request = merge_request
return unless merge_request.on_train?
queue = Gitlab::BatchPopQueueing.new('merge_trains', queue_id)
result = queue.safe_execute([merge_request.id], lock_timeout: 15.minutes) do |items|
logging("Successfuly obtained the exclusive lock. Found merge requests to be refreshed", merge_request_ids: items.map(&:to_i))
first_merge_request = get_first_in_train(items)
unsafe_refresh(first_merge_request)
end
if result[:status] == :enqueued
logging("This merge request was enqueued because the exclusive lock is obtained by the other process.")
end
if result[:status] == :finished && result[:new_items].present?
logging("Found more merge requests to be refreshed", merge_request_ids: result[:new_items].map(&:to_i))
get_first_in_train(result[:new_items]).try do |first_merge_request|
logging("Rescheduled to refresh the merge train from", merge_request_ids: [first_merge_request.id])
AutoMergeProcessWorker.perform_async(first_merge_request.id)
end
end
end
private
attr_reader :merge_request
# TODO:
# As we changed the process flow to refresh merge requests from the begnning always,
# we don't use the `items` argument anymore.
# We should refactor the current logic to make this class more readable.
# See https://gitlab.com/gitlab-org/gitlab/-/issues/281065
def get_first_in_train(items)
MergeTrain.first_in_train(merge_request.target_project, merge_request.target_branch)
end
def unsafe_refresh(first_merge_request)
require_next_recreate = false
following_merge_requests_from(first_merge_request).each do |following_mr|
logging("Started refreshing", merge_request_ids: [following_mr.id])
break if following_mr.merge_train.index >= max_concurrency
result = MergeTrains::RefreshMergeRequestService
.new(following_mr.project, following_mr.merge_user,
require_recreate: require_next_recreate)
.execute(following_mr)
require_next_recreate = (result[:status] == :error || result[:pipeline_created])
end
end
def following_merge_requests_from(first_merge_request)
first_merge_request.merge_train.all_next.to_a.unshift(first_merge_request)
end
def queue_id
"#{merge_request.target_project_id}:#{merge_request.target_branch}"
end
def max_concurrency
strong_memoize(:max_concurrency) do
DEFAULT_CONCURRENCY
end
end
def logging(message, extra = {})
return unless Feature.enabled?(:ci_merge_train_logging, merge_request.project)
Sidekiq.logger.info(
{ class: self.class.to_s, args: [merge_request.id.to_s], message: message }.merge(extra)
)
end
end
end
# frozen_string_literal: true
module MergeTrains
# This class is to refresh all merge requests on the merge train that
# the given merge request belongs to.
#
# It performs a sequential update on all merge requests on the train.
# In order to prevent multiple sidekiq jobs from updating concurrently,
# the process attempts to obtain an exclusive lock at first.
# If the process successfully obtains the lock, the sequential refresh will be executed in this sidekiq job.
# If the process failed to obtain the lock, the refresh request is pushed to the queue in Redis.
# The queued refresh requests will be poped at once when the current process has finished.
class RefreshService < BaseService
include ::Gitlab::ExclusiveLeaseHelpers
include ::Gitlab::Utils::StrongMemoize
DEFAULT_CONCURRENCY = 20.freeze
TRAIN_PROCESSING_LOCK_TIMEOUT = 15.minutes.freeze
SIGNAL_FOR_REFRESH_REQUEST = 1.freeze
def execute(target_project_id, target_branch)
@target_project_id = target_project_id
@target_branch = target_branch
queue = Gitlab::BatchPopQueueing.new('merge_trains', queue_id)
result = queue.safe_execute([SIGNAL_FOR_REFRESH_REQUEST], lock_timeout: TRAIN_PROCESSING_LOCK_TIMEOUT) do |items|
unsafe_refresh
end
if result[:status] == :finished && result[:new_items].present?
MergeTrains::RefreshWorker.perform_async(target_project_id, target_branch)
end
end
private
attr_reader :target_project_id, :target_branch
def unsafe_refresh
require_next_recreate = false
MergeTrain.all_cars(target_project_id, target_branch, limit: DEFAULT_CONCURRENCY).each do |car|
result = MergeTrains::RefreshMergeRequestService
.new(car.target_project, car.user, require_recreate: require_next_recreate)
.execute(car.merge_request)
require_next_recreate = (result[:status] == :error || result[:pipeline_created])
end
end
def queue_id
"#{target_project_id}:#{target_branch}"
end
end
end
......@@ -3,6 +3,14 @@
#
# Do not edit it manually!
---
- :name: auto_merge:merge_trains_refresh
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :cpu
:weight: 3
:idempotent: true
:tags: []
- :name: cronjob:active_user_count_threshold
:feature_category: :provision
:has_external_dependencies:
......
# frozen_string_literal: true
module MergeTrains
class RefreshWorker
include ApplicationWorker
queue_namespace :auto_merge
feature_category :continuous_integration
worker_resource_boundary :cpu
deduplicate :until_executing
idempotent!
def perform(target_project_id, target_branch)
::MergeTrains::RefreshService
.new(nil, nil)
.execute(target_project_id, target_branch)
end
end
end
---
name: ci_merge_train_logging
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/40105
rollout_issue_url:
milestone: '13.4'
type: development
group: group::continuous integration
default_enabled: false
......@@ -113,22 +113,22 @@ RSpec.describe MergeTrain do
end
end
describe '.first_in_train' do
subject { described_class.first_in_train(target_project_id, target_branch) }
describe '.first_car' do
subject { described_class.first_car(target_project_id, target_branch) }
let(:target_project_id) { merge_request.target_project_id }
let(:target_branch) { merge_request.target_branch }
let!(:merge_request) { create_merge_request_on_train }
it 'returns the merge request' do
is_expected.to eq(merge_request)
is_expected.to eq(merge_request.merge_train)
end
context 'when the other merge request is on the merge train' do
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the merge request' do
is_expected.to eq(merge_request)
is_expected.to eq(merge_request.merge_train)
end
end
......@@ -147,101 +147,24 @@ RSpec.describe MergeTrain do
end
end
describe '.first_in_trains' do
describe '.first_cars_in_trains' do
let!(:first_on_master) { create_merge_request_on_train(target_branch: 'master', source_branch: 'feature-1') }
let!(:second_on_master) { create_merge_request_on_train(target_branch: 'master', source_branch: 'feature-2') }
let!(:first_on_stable) { create_merge_request_on_train(target_branch: 'stable', source_branch: 'feature-1-backport') }
let!(:second_on_stable) { create_merge_request_on_train(target_branch: 'stable', source_branch: 'feature-2-backport') }
subject { described_class.first_in_trains(project) }
subject { described_class.first_cars_in_trains(project) }
it 'returns only first merge requests per merge train' do
is_expected.to contain_exactly(first_on_master, first_on_stable)
is_expected.to contain_exactly(first_on_master.merge_train, first_on_stable.merge_train)
end
context 'when first_on_master has already been merged' do
let!(:first_on_master) { create_merge_request_on_train(target_branch: 'master', source_branch: 'feature-1', status: :merged) }
it 'returns second on master as active MR' do
is_expected.to contain_exactly(second_on_master, first_on_stable)
end
end
end
describe '.first_in_train_from' do
subject { described_class.first_in_train_from(merge_request_ids) }
context 'when arguments is null' do
let(:merge_request_ids) { nil }
it 'raises an error' do
expect { subject }.to raise_error(NoMethodError)
end
end
context 'when there are two merge requests on the same merge train' do
let(:merge_request_ids) { [merge_request_1.id, merge_request_2.id] }
let!(:merge_request_1) { create_merge_request_on_train }
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the first merge request on the merge train from the given ids' do
is_expected.to eq(merge_request_1)
end
context 'when the first merge request has already been merged' do
let!(:merge_request_1) { create_merge_request_on_train(status: :merged) }
it 'returns the first active merge request on the merge train from the given ids' do
is_expected.to eq(merge_request_2)
end
end
context "when specifies merge request 2's id only" do
let(:merge_request_ids) { [merge_request_2.id] }
it 'returns the first merge request on the merge train from the given ids' do
is_expected.to eq(merge_request_2)
end
end
end
end
describe '.last_complete_mr_in_train' do
subject { described_class.last_complete_mr_in_train(target_project_id, target_branch) }
let(:target_project_id) { project.id }
let(:target_branch) { 'master' }
context 'when there is a merge request on train' do
let!(:merge_request_1) { create_merge_request_on_train }
context 'when the merge request has already been merged' do
let!(:merge_request_1) { create_merge_request_on_train(status: :merged) }
it 'returns the merge request' do
is_expected.to eq(merge_request_1)
end
context 'when there is another merge request on train and it has been merged' do
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome', status: :merged) }
it 'returns the last merge request' do
is_expected.to eq(merge_request_2)
end
end
end
context 'when the merge request has not been merged yet' do
it 'returns nothing' do
is_expected.to be_nil
end
end
end
context 'when there are no merge requests on train' do
it 'returns nothing' do
is_expected.to be_nil
is_expected.to contain_exactly(second_on_master.merge_train, first_on_stable.merge_train)
end
end
end
......@@ -356,7 +279,7 @@ RSpec.describe MergeTrain do
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the next merge requests' do
is_expected.to eq([merge_request_2])
is_expected.to eq([merge_request_2.merge_train])
end
end
end
......@@ -378,7 +301,7 @@ RSpec.describe MergeTrain do
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the previous merge requests' do
is_expected.to eq([merge_request])
is_expected.to eq([merge_request.merge_train])
end
context 'when the previous merge request has already been merged' do
......@@ -407,7 +330,7 @@ RSpec.describe MergeTrain do
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the next merge request' do
is_expected.to eq(merge_request_2)
is_expected.to eq(merge_request_2.merge_train)
end
end
end
......@@ -429,7 +352,7 @@ RSpec.describe MergeTrain do
let!(:merge_request_2) { create_merge_request_on_train(source_branch: 'improve/awesome') }
it 'returns the next merge request' do
is_expected.to eq(merge_request)
is_expected.to eq(merge_request.merge_train)
end
end
end
......@@ -637,8 +560,8 @@ RSpec.describe MergeTrain do
context 'and transits to stale' do
it 'refreshes asynchronously' do
expect(AutoMergeProcessWorker)
.to receive(:perform_async).with(merge_train.merge_request_id).once
expect(MergeTrains::RefreshWorker)
.to receive(:perform_async).with(merge_train.target_project_id, merge_train.target_branch).once
merge_train.outdate_pipeline!
end
......
......@@ -102,10 +102,11 @@ RSpec.describe AutoMerge::MergeTrainService do
target_project: project, target_branch: 'master')
end
it 'calls RefreshMergeRequestsService' do
expect_next_instance_of(MergeTrains::RefreshMergeRequestsService) do |service|
expect(service).to receive(:execute).with(merge_request)
end
it 'calls RefreshWorker' do
expect(MergeTrains::RefreshWorker)
.to receive(:perform_async)
.with(merge_request.target_project_id, merge_request.target_branch)
.once
subject
end
......@@ -113,8 +114,8 @@ RSpec.describe AutoMerge::MergeTrainService do
context 'when merge request is not on a merge train' do
let(:merge_request) { create(:merge_request) }
it 'does not call RefreshMergeRequestsService' do
expect(MergeTrains::RefreshMergeRequestsService).not_to receive(:new)
it 'does not call RefreshWorker' do
expect(MergeTrains::RefreshWorker).not_to receive(:perform_async)
subject
end
......@@ -193,8 +194,8 @@ RSpec.describe AutoMerge::MergeTrainService do
let(:status) { MergeTrain.state_machines[:status].states[:fresh].value }
it 'processes the next merge request on the train by default' do
expect(AutoMergeProcessWorker).to receive(:perform_async).with(merge_request_2.id)
it 'processes the train by default' do
expect(MergeTrains::RefreshWorker).to receive(:perform_async).with(merge_request_2.target_project_id, merge_request_2.target_branch)
subject
......@@ -205,7 +206,7 @@ RSpec.describe AutoMerge::MergeTrainService do
let(:status) { MergeTrain.state_machines[:status].states[:stale].value }
it 'does not do anything' do
expect(AutoMergeProcessWorker).not_to receive(:perform_async).with(merge_request_2.id)
expect(MergeTrains::RefreshWorker).not_to receive(:perform_async)
expect { subject }.not_to raise_error
......@@ -281,8 +282,8 @@ RSpec.describe AutoMerge::MergeTrainService do
status: MergeTrain.state_machines[:status].states[:fresh].value)
end
it 'processes the next merge request on the train' do
expect(AutoMergeProcessWorker).to receive(:perform_async).with(merge_request_2.id)
it 'processes the train' do
expect(MergeTrains::RefreshWorker).to receive(:perform_async).with(merge_request_2.target_project_id, merge_request_2.target_branch)
subject
......@@ -293,7 +294,7 @@ RSpec.describe AutoMerge::MergeTrainService do
let(:args) { { process_next: false } }
it 'does not process the next merge request on the train' do
expect(AutoMergeProcessWorker).not_to receive(:perform_async)
expect(MergeTrains::RefreshWorker).not_to receive(:perform_async)
subject
end
......
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe MergeTrains::RefreshMergeRequestsService do
RSpec.describe MergeTrains::RefreshService do
include ExclusiveLeaseHelpers
let(:project) { create(:project) }
......@@ -16,7 +16,7 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
end
describe '#execute', :clean_gitlab_redis_queues do
subject { service.execute(merge_request) }
subject { service.execute(merge_request.target_project_id, merge_request.target_branch) }
let!(:merge_request_1) do
create(:merge_request, :on_train, train_creator: maintainer_1,
......@@ -45,28 +45,6 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
allow(refresh_service_2).to receive(:execute) { refresh_service_2_result }
end
shared_examples 'logging results' do |count|
context 'when ci_merge_train_logging is enabled' do
it 'logs results' do
expect(Sidekiq.logger).to receive(:info).exactly(count).times
subject
end
end
context 'when ci_merge_train_logging is disabled' do
before do
stub_feature_flags(ci_merge_train_logging: false)
end
it 'does not log results' do
expect(Sidekiq.logger).not_to receive(:info)
subject
end
end
end
context 'when merge request 1 is passed' do
let(:merge_request) { merge_request_1 }
......@@ -77,45 +55,37 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
subject
end
it_behaves_like 'logging results', 3
context 'when refresh service 1 returns error status' do
let(:refresh_service_1_result) { { status: :error, message: 'Failed to create ref' } }
it 'specifies require_recreate to refresh service 2' do
allow(MergeTrains::RefreshMergeRequestService)
expect(MergeTrains::RefreshMergeRequestService)
.to receive(:new).with(project, maintainer_2, require_recreate: true) { refresh_service_2 }
subject
end
it_behaves_like 'logging results', 3
end
context 'when refresh service 1 returns success status and did not create a pipeline' do
let(:refresh_service_1_result) { { status: :success, pipeline_created: false } }
it 'does not specify require_recreate to refresh service 2' do
allow(MergeTrains::RefreshMergeRequestService)
expect(MergeTrains::RefreshMergeRequestService)
.to receive(:new).with(project, maintainer_2, require_recreate: false) { refresh_service_2 }
subject
end
it_behaves_like 'logging results', 3
end
context 'when refresh service 1 returns success status and created a pipeline' do
let(:refresh_service_1_result) { { status: :success, pipeline_created: true } }
it 'specifies require_recreate to refresh service 2' do
allow(MergeTrains::RefreshMergeRequestService)
expect(MergeTrains::RefreshMergeRequestService)
.to receive(:new).with(project, maintainer_2, require_recreate: true) { refresh_service_2 }
subject
end
it_behaves_like 'logging results', 3
end
context 'when merge request 1 is not on a merge train' do
......@@ -127,8 +97,6 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
subject
end
it_behaves_like 'logging results', 0
end
context 'when merge request 1 was on a merge train' do
......@@ -142,8 +110,6 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
subject
end
it_behaves_like 'logging results', 0
end
context 'when the other thread has already been processing the merge train' do
......@@ -161,13 +127,11 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
it 'enqueues the merge request id to BatchPopQueueing' do
expect_next_instance_of(Gitlab::BatchPopQueueing) do |queuing|
expect(queuing).to receive(:enqueue).with([merge_request_1.id], anything).and_call_original
expect(queuing).to receive(:enqueue).with([described_class::SIGNAL_FOR_REFRESH_REQUEST], anything).and_call_original
end
subject
end
it_behaves_like 'logging results', 1
end
end
......@@ -180,8 +144,6 @@ RSpec.describe MergeTrains::RefreshMergeRequestsService do
subject
end
it_behaves_like 'logging results', 3
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment