Commit 2d921f7e authored by Matt Kasa's avatar Matt Kasa

Propagate locked from pipeline to job artifacts

Relates to https://gitlab.com/gitlab-org/gitlab/-/issues/327281
parent c547531d
......@@ -19,6 +19,7 @@ module Ci
def initialize(job)
@job = job
@project = job.project
@pipeline = job.pipeline if ::Feature.enabled?(:ci_update_unlocked_job_artifacts, @project)
end
def authorize(artifact_type:, filesize: nil)
......@@ -53,7 +54,7 @@ module Ci
private
attr_reader :job, :project
attr_reader :job, :project, :pipeline
def validate_requirements(artifact_type:, filesize:)
return too_large_error if too_large?(artifact_type, filesize)
......@@ -85,24 +86,32 @@ module Ci
expire_in = params['expire_in'] ||
Gitlab::CurrentSettings.current_application_settings.default_artifacts_expire_in
artifact = Ci::JobArtifact.new(
artifact_attributes = {
job_id: job.id,
project: project,
file: artifacts_file,
file_type: params[:artifact_type],
file_format: params[:artifact_format],
file_sha256: artifacts_file.sha256,
expire_in: expire_in)
expire_in: expire_in
}
artifact_attributes[:locked] = pipeline.locked if ::Feature.enabled?(:ci_update_unlocked_job_artifacts, project)
artifact = Ci::JobArtifact.new(
artifact_attributes.merge(
file: artifacts_file,
file_type: params[:artifact_type],
file_format: params[:artifact_format],
file_sha256: artifacts_file.sha256
)
)
artifact_metadata = if metadata_file
Ci::JobArtifact.new(
job_id: job.id,
project: project,
file: metadata_file,
file_type: :metadata,
file_format: :gzip,
file_sha256: metadata_file.sha256,
expire_in: expire_in)
artifact_attributes.merge(
file: metadata_file,
file_type: :metadata,
file_format: :gzip,
file_sha256: metadata_file.sha256
)
)
end
[artifact, artifact_metadata]
......
......@@ -5,22 +5,84 @@ module Ci
BATCH_SIZE = 100
def execute(ci_ref, before_pipeline = nil)
query = <<~SQL.squish
UPDATE "ci_pipelines"
SET "locked" = #{::Ci::Pipeline.lockeds[:unlocked]}
WHERE "ci_pipelines"."id" in (
#{collect_pipelines(ci_ref, before_pipeline).select(:id).to_sql}
LIMIT #{BATCH_SIZE}
FOR UPDATE SKIP LOCKED
)
RETURNING "ci_pipelines"."id";
SQL
loop do
break if Ci::Pipeline.connection.exec_query(query).empty?
results = {
unlocked_pipelines: 0,
unlocked_job_artifacts: 0
}
if ::Feature.enabled?(:ci_update_unlocked_job_artifacts, ci_ref.project)
loop do
unlocked_pipelines = []
unlocked_job_artifacts = []
::Ci::Pipeline.transaction do
unlocked_pipelines = unlock_pipelines(ci_ref, before_pipeline)
unlocked_job_artifacts = unlock_job_artifacts(unlocked_pipelines)
end
break if unlocked_pipelines.empty?
results[:unlocked_pipelines] += unlocked_pipelines.length
results[:unlocked_job_artifacts] += unlocked_job_artifacts.length
end
else
query = <<~SQL.squish
UPDATE "ci_pipelines"
SET "locked" = #{::Ci::Pipeline.lockeds[:unlocked]}
WHERE "ci_pipelines"."id" in (
#{collect_pipelines(ci_ref, before_pipeline).select(:id).to_sql}
LIMIT #{BATCH_SIZE}
FOR UPDATE SKIP LOCKED
)
RETURNING "ci_pipelines"."id";
SQL
loop do
unlocked_pipelines = Ci::Pipeline.connection.exec_query(query)
break if unlocked_pipelines.empty?
results[:unlocked_pipelines] += unlocked_pipelines.length
end
end
results
end
# rubocop:disable CodeReuse/ActiveRecord
def unlock_job_artifacts_query(pipeline_ids)
ci_job_artifacts = ::Ci::JobArtifact.arel_table
build_ids = ::Ci::Build.select(:id).where(commit_id: pipeline_ids)
returning = Arel::Nodes::Grouping.new(ci_job_artifacts[:id])
Arel::UpdateManager.new
.table(ci_job_artifacts)
.where(ci_job_artifacts[:job_id].in(Arel.sql(build_ids.to_sql)))
.set([[ci_job_artifacts[:locked], ::Ci::JobArtifact.lockeds[:unlocked]]])
.to_sql + " RETURNING #{returning.to_sql}"
end
# rubocop:enable CodeReuse/ActiveRecord
# rubocop:disable CodeReuse/ActiveRecord
def unlock_pipelines_query(ci_ref, before_pipeline)
ci_pipelines = ::Ci::Pipeline.arel_table
pipelines_scope = ci_ref.pipelines.artifacts_locked
pipelines_scope = pipelines_scope.before_pipeline(before_pipeline) if before_pipeline
pipelines_scope = pipelines_scope.select(:id).limit(BATCH_SIZE).lock('FOR UPDATE SKIP LOCKED')
returning = Arel::Nodes::Grouping.new(ci_pipelines[:id])
Arel::UpdateManager.new
.table(ci_pipelines)
.where(ci_pipelines[:id].in(Arel.sql(pipelines_scope.to_sql)))
.set([[ci_pipelines[:locked], ::Ci::Pipeline.lockeds[:unlocked]]])
.to_sql + " RETURNING #{returning.to_sql}"
end
# rubocop:enable CodeReuse/ActiveRecord
private
def collect_pipelines(ci_ref, before_pipeline)
......@@ -29,5 +91,17 @@ module Ci
pipeline_scope.artifacts_locked
end
def unlock_job_artifacts(pipelines)
return if pipelines.empty?
::Ci::JobArtifact.connection.exec_query(
unlock_job_artifacts_query(pipelines.rows.flatten)
)
end
def unlock_pipelines(ci_ref, before_pipeline)
::Ci::Pipeline.connection.exec_query(unlock_pipelines_query(ci_ref, before_pipeline))
end
end
end
......@@ -15,9 +15,12 @@ module Ci
::Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
break unless pipeline.has_archive_artifacts?
::Ci::UnlockArtifactsService
results = ::Ci::UnlockArtifactsService
.new(pipeline.project, pipeline.user)
.execute(pipeline.ci_ref, pipeline)
log_extra_metadata_on_done(:unlocked_pipelines, results[:unlocked_pipelines])
log_extra_metadata_on_done(:unlocked_job_artifacts, results[:unlocked_job_artifacts])
end
end
end
......
......@@ -15,9 +15,12 @@ module Ci
::Project.find_by_id(project_id).try do |project|
::User.find_by_id(user_id).try do |user|
project.ci_refs.find_by_ref_path(ref_path).try do |ci_ref|
::Ci::UnlockArtifactsService
results = ::Ci::UnlockArtifactsService
.new(project, user)
.execute(ci_ref)
log_extra_metadata_on_done(:unlocked_pipelines, results[:unlocked_pipelines])
log_extra_metadata_on_done(:unlocked_job_artifacts, results[:unlocked_job_artifacts])
end
end
end
......
---
name: ci_update_unlocked_job_artifacts
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/70235
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/343465
milestone: '14.5'
type: development
group: group::testing
default_enabled: false
......@@ -213,6 +213,14 @@ FactoryBot.define do
end
end
trait :with_persisted_artifacts do
status { :success }
after(:create) do |pipeline, evaluator|
pipeline.builds << create(:ci_build, :artifacts, pipeline: pipeline, project: pipeline.project)
end
end
trait :with_job do
after(:build) do |pipeline, evaluator|
pipeline.builds << build(:ci_build, pipeline: pipeline, project: pipeline.project)
......
......@@ -49,6 +49,7 @@ RSpec.describe Ci::JobArtifacts::CreateService do
expect(new_artifact.file_type).to eq(params['artifact_type'])
expect(new_artifact.file_format).to eq(params['artifact_format'])
expect(new_artifact.file_sha256).to eq(artifacts_sha256)
expect(new_artifact.locked).to eq(job.pipeline.locked)
end
it 'does not track the job user_id' do
......@@ -75,6 +76,7 @@ RSpec.describe Ci::JobArtifacts::CreateService do
expect(new_artifact.file_type).to eq('metadata')
expect(new_artifact.file_format).to eq('gzip')
expect(new_artifact.file_sha256).to eq(artifacts_sha256)
expect(new_artifact.locked).to eq(job.pipeline.locked)
end
it 'sets expiration date according to application settings' do
......
......@@ -4,7 +4,9 @@ require 'spec_helper'
RSpec.describe Ci::RefDeleteUnlockArtifactsWorker do
describe '#perform' do
subject(:perform) { described_class.new.perform(project_id, user_id, ref) }
subject(:perform) { worker.perform(project_id, user_id, ref) }
let(:worker) { described_class.new }
let(:ref) { 'refs/heads/master' }
......@@ -40,6 +42,36 @@ RSpec.describe Ci::RefDeleteUnlockArtifactsWorker do
expect(service).to have_received(:execute).with(ci_ref)
end
context 'when a locked pipeline with persisted artifacts exists' do
let!(:pipeline) { create(:ci_pipeline, :with_persisted_artifacts, ref: 'master', project: project, locked: :artifacts_locked) }
context 'with ci_update_unlocked_job_artifacts disabled' do
before do
stub_feature_flags(ci_update_unlocked_job_artifacts: false)
end
it 'logs the correct extra metadata' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_pipelines, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_job_artifacts, 0)
perform
end
end
context 'with ci_update_unlocked_job_artifacts enabled' do
before do
stub_feature_flags(ci_update_unlocked_job_artifacts: true)
end
it 'logs the correct extra metadata' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_pipelines, 1)
expect(worker).to receive(:log_extra_metadata_on_done).with(:unlocked_job_artifacts, 2)
perform
end
end
end
end
context 'when ci ref does not exist for the given project' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment