Commit de0aa461 authored by Vitali Tatarintev's avatar Vitali Tatarintev

Merge branch...

Merge branch '327281-stop-the-increase-of-unlocked-expired-artifacts-waiting-for-removal' into 'master'

Use locked value from Ci::Pipeline for Ci::JobArtifact

See merge request gitlab-org/gitlab!70235
parents d257c85b 2d921f7e
......@@ -19,6 +19,7 @@ module Ci
def initialize(job)
@job = job
@project = job.project
@pipeline = job.pipeline if ::Feature.enabled?(:ci_update_unlocked_job_artifacts, @project)
end
def authorize(artifact_type:, filesize: nil)
......@@ -53,7 +54,7 @@ module Ci
private
attr_reader :job, :project
attr_reader :job, :project, :pipeline
def validate_requirements(artifact_type:, filesize:)
return too_large_error if too_large?(artifact_type, filesize)
......@@ -85,24 +86,32 @@ module Ci
expire_in = params['expire_in'] ||
Gitlab::CurrentSettings.current_application_settings.default_artifacts_expire_in
artifact = Ci::JobArtifact.new(
artifact_attributes = {
job_id: job.id,
project: project,
expire_in: expire_in
}
artifact_attributes[:locked] = pipeline.locked if ::Feature.enabled?(:ci_update_unlocked_job_artifacts, project)
artifact = Ci::JobArtifact.new(
artifact_attributes.merge(
file: artifacts_file,
file_type: params[:artifact_type],
file_format: params[:artifact_format],
file_sha256: artifacts_file.sha256,
expire_in: expire_in)
file_sha256: artifacts_file.sha256
)
)
artifact_metadata = if metadata_file
Ci::JobArtifact.new(
job_id: job.id,
project: project,
artifact_attributes.merge(
file: metadata_file,
file_type: :metadata,
file_format: :gzip,
file_sha256: metadata_file.sha256,
expire_in: expire_in)
file_sha256: metadata_file.sha256
)
)
end
[artifact, artifact_metadata]
......
......@@ -5,6 +5,27 @@ module Ci
BATCH_SIZE = 100
def execute(ci_ref, before_pipeline = nil)
results = {
unlocked_pipelines: 0,
unlocked_job_artifacts: 0
}
if ::Feature.enabled?(:ci_update_unlocked_job_artifacts, ci_ref.project)
loop do
unlocked_pipelines = []
unlocked_job_artifacts = []
::Ci::Pipeline.transaction do
unlocked_pipelines = unlock_pipelines(ci_ref, before_pipeline)
unlocked_job_artifacts = unlock_job_artifacts(unlocked_pipelines)
end
break if unlocked_pipelines.empty?
results[:unlocked_pipelines] += unlocked_pipelines.length
results[:unlocked_job_artifacts] += unlocked_job_artifacts.length
end
else
query = <<~SQL.squish
UPDATE "ci_pipelines"
SET "locked" = #{::Ci::Pipeline.lockeds[:unlocked]}
......@@ -17,10 +38,51 @@ module Ci
SQL
loop do
break if Ci::Pipeline.connection.exec_query(query).empty?
unlocked_pipelines = Ci::Pipeline.connection.exec_query(query)
break if unlocked_pipelines.empty?
results[:unlocked_pipelines] += unlocked_pipelines.length
end
end
results
end
# rubocop:disable CodeReuse/ActiveRecord
def unlock_job_artifacts_query(pipeline_ids)
ci_job_artifacts = ::Ci::JobArtifact.arel_table
build_ids = ::Ci::Build.select(:id).where(commit_id: pipeline_ids)
returning = Arel::Nodes::Grouping.new(ci_job_artifacts[:id])
Arel::UpdateManager.new
.table(ci_job_artifacts)
.where(ci_job_artifacts[:job_id].in(Arel.sql(build_ids.to_sql)))
.set([[ci_job_artifacts[:locked], ::Ci::JobArtifact.lockeds[:unlocked]]])
.to_sql + " RETURNING #{returning.to_sql}"
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def unlock_pipelines_query(ci_ref, before_pipeline)
ci_pipelines = ::Ci::Pipeline.arel_table
pipelines_scope = ci_ref.pipelines.artifacts_locked
pipelines_scope = pipelines_scope.before_pipeline(before_pipeline) if before_pipeline
pipelines_scope = pipelines_scope.select(:id).limit(BATCH_SIZE).lock('FOR UPDATE SKIP LOCKED')
returning = Arel::Nodes::Grouping.new(ci_pipelines[:id])
Arel::UpdateManager.new
.table(ci_pipelines)
.where(ci_pipelines[:id].in(Arel.sql(pipelines_scope.to_sql)))
.set([[ci_pipelines[:locked], ::Ci::Pipeline.lockeds[:unlocked]]])
.to_sql + " RETURNING #{returning.to_sql}"
end
# rubocop:enable CodeReuse/ActiveRecord
private
def collect_pipelines(ci_ref, before_pipeline)
......@@ -29,5 +91,17 @@ module Ci
pipeline_scope.artifacts_locked
end
def unlock_job_artifacts(pipelines)
return if pipelines.empty?
::Ci::JobArtifact.connection.exec_query(
unlock_job_artifacts_query(pipelines.rows.flatten)
)
end
def unlock_pipelines(ci_ref, before_pipeline)
::Ci::Pipeline.connection.exec_query(unlock_pipelines_query(ci_ref, before_pipeline))
end
end
end
......@@ -15,9 +15,12 @@ module Ci
::Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
break unless pipeline.has_archive_artifacts?
::Ci::UnlockArtifactsService
results = ::Ci::UnlockArtifactsService
.new(pipeline.project, pipeline.user)
.execute(pipeline.ci_ref, pipeline)
log_extra_metadata_on_done(:unlocked_pipelines, results[:unlocked_pipelines])
log_extra_metadata_on_done(:unlocked_job_artifacts, results[:unlocked_job_artifacts])
end
end
end
......
......@@ -15,9 +15,12 @@ module Ci
::Project.find_by_id(project_id).try do |project|
::User.find_by_id(user_id).try do |user|
project.ci_refs.find_by_ref_path(ref_path).try do |ci_ref|
::Ci::UnlockArtifactsService
results = ::Ci::UnlockArtifactsService
.new(project, user)
.execute(ci_ref)
log_extra_metadata_on_done(:unlocked_pipelines, results[:unlocked_pipelines])
log_extra_metadata_on_done(:unlocked_job_artifacts, results[:unlocked_job_artifacts])
end
end
end
......
---
name: ci_update_unlocked_job_artifacts
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/70235
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/343465
milestone: '14.5'
type: development
group: group::testing
default_enabled: false
......@@ -213,6 +213,14 @@ FactoryBot.define do
end
end
trait :with_persisted_artifacts do
status { :success }
after(:create) do |pipeline, evaluator|
pipeline.builds << create(:ci_build, :artifacts, pipeline: pipeline, project: pipeline.project)
end
end
trait :with_job do
after(:build) do |pipeline, evaluator|
pipeline.builds << build(:ci_build, pipeline: pipeline, project: pipeline.project)
......
......@@ -49,6 +49,7 @@ RSpec.describe Ci::JobArtifacts::CreateService do
expect(new_artifact.file_type).to eq(params['artifact_type'])
expect(new_artifact.file_format).to eq(params['artifact_format'])
expect(new_artifact.file_sha256).to eq(artifacts_sha256)
expect(new_artifact.locked).to eq(job.pipeline.locked)
end
it 'does not track the job user_id' do
......@@ -75,6 +76,7 @@ RSpec.describe Ci::JobArtifacts::CreateService do
expect(new_artifact.file_type).to eq('metadata')
expect(new_artifact.file_format).to eq('gzip')
expect(new_artifact.file_sha256).to eq(artifacts_sha256)
expect(new_artifact.locked).to eq(job.pipeline.locked)
end
it 'sets expiration date according to application settings' do
......
......@@ -3,35 +3,39 @@
require 'spec_helper'
RSpec.describe Ci::UnlockArtifactsService do
describe '#execute' do
subject(:execute) { described_class.new(pipeline.project, pipeline.user).execute(ci_ref, before_pipeline) }
using RSpec::Parameterized::TableSyntax
before do
stub_const("#{described_class}::BATCH_SIZE", 1)
where(:tag, :ci_update_unlocked_job_artifacts) do
false | false
false | true
true | false
true | true
end
[true, false].each do |tag|
context "when tag is #{tag}" do
with_them do
let(:ref) { 'master' }
let(:ref_path) { tag ? "#{::Gitlab::Git::TAG_REF_PREFIX}#{ref}" : "#{::Gitlab::Git::BRANCH_REF_PREFIX}#{ref}" }
let(:ci_ref) { create(:ci_ref, ref_path: ref_path) }
let!(:old_unlocked_pipeline) { create(:ci_pipeline, ref: ref, tag: tag, project: ci_ref.project, locked: :unlocked) }
let!(:older_pipeline) { create(:ci_pipeline, ref: ref, tag: tag, project: ci_ref.project, locked: :artifacts_locked) }
let!(:older_ambiguous_pipeline) { create(:ci_pipeline, ref: ref, tag: !tag, project: ci_ref.project, locked: :artifacts_locked) }
let!(:pipeline) { create(:ci_pipeline, ref: ref, tag: tag, project: ci_ref.project, locked: :artifacts_locked) }
let!(:child_pipeline) { create(:ci_pipeline, ref: ref, tag: tag, project: ci_ref.project, locked: :artifacts_locked) }
let!(:newer_pipeline) { create(:ci_pipeline, ref: ref, tag: tag, project: ci_ref.project, locked: :artifacts_locked) }
let!(:other_ref_pipeline) { create(:ci_pipeline, ref: 'other_ref', tag: tag, project: ci_ref.project, locked: :artifacts_locked) }
let(:project) { ci_ref.project }
let(:source_job) { create(:ci_build, pipeline: pipeline) }
let!(:old_unlocked_pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: ref, tag: tag, project: project, locked: :unlocked) }
let!(:older_pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: ref, tag: tag, project: project, locked: :artifacts_locked) }
let!(:older_ambiguous_pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: ref, tag: !tag, project: project, locked: :artifacts_locked) }
let!(:pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: ref, tag: tag, project: project, locked: :artifacts_locked) }
let!(:child_pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: ref, tag: tag, project: project, locked: :artifacts_locked) }
let!(:newer_pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: ref, tag: tag, project: project, locked: :artifacts_locked) }
let!(:other_ref_pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: 'other_ref', tag: tag, project: project, locked: :artifacts_locked) }
let!(:sources_pipeline) { create(:ci_sources_pipeline, source_job: source_job, source_project: project, pipeline: child_pipeline, project: project) }
before do
create(:ci_sources_pipeline,
source_job: create(:ci_build, pipeline: pipeline),
source_project: ci_ref.project,
pipeline: child_pipeline,
project: ci_ref.project)
stub_const("#{described_class}::BATCH_SIZE", 1)
stub_feature_flags(ci_update_unlocked_job_artifacts: ci_update_unlocked_job_artifacts)
end
describe '#execute' do
subject(:execute) { described_class.new(pipeline.project, pipeline.user).execute(ci_ref, before_pipeline) }
context 'when running on a ref before a pipeline' do
let(:before_pipeline) { pipeline }
......@@ -62,6 +66,12 @@ RSpec.describe Ci::UnlockArtifactsService do
it 'does not unlock artifacts for child pipeline' do
expect { execute }.not_to change { child_pipeline.reload.locked }.from('artifacts_locked')
end
it 'unlocks job artifact records' do
pending unless ci_update_unlocked_job_artifacts
expect { execute }.to change { ::Ci::JobArtifact.artifact_unlocked.count }.from(0).to(2)
end
end
context 'when running on just the ref' do
......@@ -90,6 +100,150 @@ RSpec.describe Ci::UnlockArtifactsService do
it 'does not unlock artifacts for other refs' do
expect { execute }.not_to change { other_ref_pipeline.reload.locked }.from('artifacts_locked')
end
it 'unlocks job artifact records' do
pending unless ci_update_unlocked_job_artifacts
expect { execute }.to change { ::Ci::JobArtifact.artifact_unlocked.count }.from(0).to(8)
end
end
end
describe '#unlock_pipelines_query' do
subject { described_class.new(pipeline.project, pipeline.user).unlock_pipelines_query(ci_ref, before_pipeline) }
context 'when running on a ref before a pipeline' do
let(:before_pipeline) { pipeline }
it 'produces the expected SQL string' do
expect(subject.squish).to eq <<~SQL.squish
UPDATE
"ci_pipelines"
SET
"locked" = 0
WHERE
"ci_pipelines"."id" IN
(SELECT
"ci_pipelines"."id"
FROM
"ci_pipelines"
WHERE
"ci_pipelines"."ci_ref_id" = #{ci_ref.id}
AND "ci_pipelines"."locked" = 1
AND (ci_pipelines.id < #{before_pipeline.id})
AND "ci_pipelines"."id" NOT IN
(WITH RECURSIVE
"base_and_descendants"
AS
((SELECT
"ci_pipelines".*
FROM
"ci_pipelines"
WHERE
"ci_pipelines"."id" = #{before_pipeline.id})
UNION
(SELECT
"ci_pipelines".*
FROM
"ci_pipelines",
"base_and_descendants",
"ci_sources_pipelines"
WHERE
"ci_sources_pipelines"."pipeline_id" = "ci_pipelines"."id"
AND "ci_sources_pipelines"."source_pipeline_id" = "base_and_descendants"."id"
AND "ci_sources_pipelines"."source_project_id" = "ci_sources_pipelines"."project_id"))
SELECT
"id"
FROM
"base_and_descendants"
AS
"ci_pipelines")
LIMIT 1
FOR UPDATE
SKIP LOCKED)
RETURNING ("ci_pipelines"."id")
SQL
end
end
context 'when running on just the ref' do
let(:before_pipeline) { nil }
it 'produces the expected SQL string' do
expect(subject.squish).to eq <<~SQL.squish
UPDATE
"ci_pipelines"
SET
"locked" = 0
WHERE
"ci_pipelines"."id" IN
(SELECT
"ci_pipelines"."id"
FROM
"ci_pipelines"
WHERE
"ci_pipelines"."ci_ref_id" = #{ci_ref.id}
AND "ci_pipelines"."locked" = 1
LIMIT 1
FOR UPDATE
SKIP LOCKED)
RETURNING
("ci_pipelines"."id")
SQL
end
end
end
describe '#unlock_job_artifacts_query' do
subject { described_class.new(pipeline.project, pipeline.user).unlock_job_artifacts_query(pipeline_ids) }
context 'when running on a ref before a pipeline' do
let(:before_pipeline) { pipeline }
let(:pipeline_ids) { [older_pipeline.id] }
it 'produces the expected SQL string' do
expect(subject.squish).to eq <<~SQL.squish
UPDATE
"ci_job_artifacts"
SET
"locked" = 0
WHERE
"ci_job_artifacts"."job_id" IN
(SELECT
"ci_builds"."id"
FROM
"ci_builds"
WHERE
"ci_builds"."type" = 'Ci::Build'
AND "ci_builds"."commit_id" = #{older_pipeline.id})
RETURNING
("ci_job_artifacts"."id")
SQL
end
end
context 'when running on just the ref' do
let(:before_pipeline) { nil }
let(:pipeline_ids) { [older_pipeline.id, newer_pipeline.id, pipeline.id] }
it 'produces the expected SQL string' do
expect(subject.squish).to eq <<~SQL.squish
UPDATE
"ci_job_artifacts"
SET
"locked" = 0
WHERE
"ci_job_artifacts"."job_id" IN
(SELECT
"ci_builds"."id"
FROM
"ci_builds"
WHERE
"ci_builds"."type" = 'Ci::Build'
AND "ci_builds"."commit_id" IN (#{pipeline_ids.join(', ')}))
RETURNING
("ci_job_artifacts"."id")
SQL
end
end
end
......
......@@ -4,7 +4,9 @@ require 'spec_helper'
RSpec.describe Ci::RefDeleteUnlockArtifactsWorker do
describe '#perform' do
subject(:perform) { described_class.new.perform(project_id, user_id, ref) }
subject(:perform) { worker.perform(project_id, user_id, ref) }
let(:worker) { described_class.new }
let(:ref) { 'refs/heads/master' }
......@@ -40,6 +42,36 @@ RSpec.describe Ci::RefDeleteUnlockArtifactsWorker do
expect(service).to have_received(:execute).with(ci_ref)
end
context 'when a locked pipeline with persisted artifacts exists' do
let!(:pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: 'master', project: project, locked: :artifacts_locked) }
context 'with ci_update_unlocked_job_artifacts disabled' do
before do
stub_feature_flags(ci_update_unlocked_job_artifacts: false)
end
it 'logs the correct extra metadata' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_pipelines, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_job_artifacts, 0)
perform
end
end
context 'with ci_update_unlocked_job_artifacts enabled' do
before do
stub_feature_flags(ci_update_unlocked_job_artifacts: true)
end
it 'logs the correct extra metadata' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_pipelines, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_job_artifacts, 2)
perform
end
end
end
end
context 'when ci ref does not exist for the given project' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment