Commit 8a403acb authored by Fabio Pitino's avatar Fabio Pitino Committed by Matthias Käppler

Drop user pipelines async when user is blocked

Forcefully updating pipelines and jobs statuses does
not execute state machine callbacks causing some
cleanups not to be performed.
parent 47d8b264
...@@ -354,7 +354,7 @@ class User < ApplicationRecord ...@@ -354,7 +354,7 @@ class User < ApplicationRecord
# this state transition object in order to do a rollback. # this state transition object in order to do a rollback.
# For this reason the tradeoff is to disable this cop. # For this reason the tradeoff is to disable this cop.
after_transition any => :blocked do |user| after_transition any => :blocked do |user|
Ci::AbortPipelinesService.new.execute(user.pipelines, :user_blocked) Ci::DropPipelineService.new.execute_async_for_all(user.pipelines, :user_blocked, user)
Ci::DisableUserPipelineSchedulesService.new.execute(user) Ci::DisableUserPipelineSchedulesService.new.execute(user)
end end
# rubocop: enable CodeReuse/ServiceClass # rubocop: enable CodeReuse/ServiceClass
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
module Ci module Ci
class AbortPipelinesService class AbortPipelinesService
# NOTE: This call fails pipelines in bulk without running callbacks. # NOTE: This call fails pipelines in bulk without running callbacks.
# Only for pipeline abandonment scenarios (examples: project delete, user block) # Only for pipeline abandonment scenarios (examples: project delete)
def execute(pipelines, failure_reason) def execute(pipelines, failure_reason)
pipelines.cancelable.each_batch(of: 100) do |pipeline_batch| pipelines.cancelable.each_batch(of: 100) do |pipeline_batch|
now = Time.current now = Time.current
......
# frozen_string_literal: true
module Ci
class DropPipelineService
# execute service asynchronously for each cancelable pipeline
def execute_async_for_all(pipelines, failure_reason, context_user)
pipelines.cancelable.select(:id).find_in_batches do |pipelines_batch|
Ci::DropPipelineWorker.bulk_perform_async_with_contexts(
pipelines_batch,
arguments_proc: -> (pipeline) { [pipeline.id, failure_reason] },
context_proc: -> (_) { { user: context_user } }
)
end
end
def execute(pipeline, failure_reason, retries: 3)
Gitlab::OptimisticLocking.retry_lock(pipeline.cancelable_statuses, retries, name: 'ci_pipeline_drop_running') do |cancelables|
cancelables.find_in_batches do |batch|
preload_associations_for_drop(batch)
batch.each do |job|
job.drop(failure_reason)
end
end
end
end
private
def preload_associations_for_drop(builds_batch)
ActiveRecord::Associations::Preloader.new.preload( # rubocop: disable CodeReuse/ActiveRecord
builds_batch,
[:project, :pipeline, :metadata, :deployment, :taggings]
)
end
end
end
...@@ -1251,6 +1251,14 @@ ...@@ -1251,6 +1251,14 @@
:weight: 3 :weight: 3
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: pipeline_default:ci_drop_pipeline
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 3
:idempotent: true
:tags: []
- :name: pipeline_default:ci_merge_requests_add_todo_when_build_fails - :name: pipeline_default:ci_merge_requests_add_todo_when_build_fails
:feature_category: :continuous_integration :feature_category: :continuous_integration
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Ci
class DropPipelineWorker
include ApplicationWorker
include PipelineQueue
idempotent!
def perform(pipeline_id, failure_reason)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
Ci::DropPipelineService.new.execute(pipeline, failure_reason.to_sym)
end
end
end
end
---
title: Drop user pipelines async when user is blocked
merge_request: 59129
author:
type: fixed
...@@ -1803,8 +1803,8 @@ RSpec.describe User do ...@@ -1803,8 +1803,8 @@ RSpec.describe User do
it 'aborts all running pipelines and related jobs' do it 'aborts all running pipelines and related jobs' do
expect(user).to receive(:pipelines).and_return(pipelines) expect(user).to receive(:pipelines).and_return(pipelines)
expect(Ci::AbortPipelinesService).to receive(:new).and_return(service) expect(Ci::DropPipelineService).to receive(:new).and_return(service)
expect(service).to receive(:execute).with(pipelines, :user_blocked) expect(service).to receive(:execute_async_for_all).with(pipelines, :user_blocked, user)
user.block user.block
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::DropPipelineService do
let_it_be(:user) { create(:user) }
let(:failure_reason) { :user_blocked }
let!(:cancelable_pipeline) { create(:ci_pipeline, :running, user: user) }
let!(:running_build) { create(:ci_build, :running, pipeline: cancelable_pipeline) }
let!(:success_pipeline) { create(:ci_pipeline, :success, user: user) }
let!(:success_build) { create(:ci_build, :success, pipeline: success_pipeline) }
describe '#execute_async_for_all' do
subject { described_class.new.execute_async_for_all(user.pipelines, failure_reason, user) }
it 'drops only cancelable pipelines asynchronously', :sidekiq_inline do
subject
expect(cancelable_pipeline.reload).to be_failed
expect(running_build.reload).to be_failed
expect(success_pipeline.reload).to be_success
expect(success_build.reload).to be_success
end
end
describe '#execute' do
subject { described_class.new.execute(cancelable_pipeline.id, failure_reason) }
def drop_pipeline!(pipeline)
described_class.new.execute(pipeline, failure_reason)
end
it 'drops each cancelable build in the pipeline', :aggregate_failures do
drop_pipeline!(cancelable_pipeline)
expect(running_build.reload).to be_failed
expect(running_build.failure_reason).to eq(failure_reason.to_s)
expect(success_build.reload).to be_success
end
it 'avoids N+1 queries when reading data' do
control_count = ActiveRecord::QueryRecorder.new do
drop_pipeline!(cancelable_pipeline)
end.count
writes_per_build = 2
expected_reads_count = control_count - writes_per_build
create_list(:ci_build, 5, :running, pipeline: cancelable_pipeline)
expect do
drop_pipeline!(cancelable_pipeline)
end.not_to exceed_query_limit(expected_reads_count + (5 * writes_per_build))
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::DropPipelineWorker do
include AfterNextHelpers
let(:pipeline) { create(:ci_pipeline, :running) }
let(:failure_reason) { :user_blocked }
describe '#perform' do
subject { described_class.new.perform(pipeline.id, failure_reason) }
it 'calls delegates to the service' do
expect_next(Ci::DropPipelineService).to receive(:execute).with(pipeline, failure_reason)
subject
end
it_behaves_like 'an idempotent worker' do
let!(:running_build) { create(:ci_build, :running, pipeline: pipeline) }
let!(:success_build) { create(:ci_build, :success, pipeline: pipeline) }
let(:job_args) { [pipeline.id, failure_reason] }
it 'executes the service', :aggregate_failures do
subject
expect(running_build.reload).to be_failed
expect(running_build.failure_reason).to eq(failure_reason.to_s)
expect(success_build.reload).to be_success
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment