Commit 478d915f authored by Furkan Ayhan's avatar Furkan Ayhan Committed by Stan Hu

Fix status cache for upstream pipelines

When status of downstream pipeline is updated, the upstream cache
is not updated.

In this commit;
- Removed "cachable?" check
- Moved from EE to CE and refactored upstream/downstream pipeline
cache expiration
- Implement calling ExpirePipelineCacheWorker from ExpireJobCacheWorker
parent 46527a69
...@@ -228,7 +228,7 @@ module Ci ...@@ -228,7 +228,7 @@ module Ci
pipeline.run_after_commit do pipeline.run_after_commit do
PipelineHooksWorker.perform_async(pipeline.id) PipelineHooksWorker.perform_async(pipeline.id)
ExpirePipelineCacheWorker.perform_async(pipeline.id) if pipeline.cacheable? ExpirePipelineCacheWorker.perform_async(pipeline.id)
end end
end end
...@@ -938,6 +938,12 @@ module Ci ...@@ -938,6 +938,12 @@ module Ci
.first .first
end end
def self_with_ancestors_and_descendants(same_project: false)
::Gitlab::Ci::PipelineObjectHierarchy
.new(self.class.unscoped.where(id: id), options: { same_project: same_project })
.all_objects
end
def bridge_triggered? def bridge_triggered?
source_bridge.present? source_bridge.present?
end end
...@@ -1173,10 +1179,6 @@ module Ci ...@@ -1173,10 +1179,6 @@ module Ci
@persistent_ref ||= PersistentRef.new(pipeline: self) @persistent_ref ||= PersistentRef.new(pipeline: self)
end end
def cacheable?
!dangling?
end
def dangling? def dangling?
Enums::Ci::Pipeline.dangling_sources.key?(source.to_sym) Enums::Ci::Pipeline.dangling_sources.key?(source.to_sym)
end end
......
...@@ -62,15 +62,16 @@ module Ci ...@@ -62,15 +62,16 @@ module Ci
project = pipeline.project project = pipeline.project
store.touch(project_pipelines_path(project)) store.touch(project_pipelines_path(project))
store.touch(project_pipeline_path(project, pipeline))
store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil? store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil?
store.touch(new_merge_request_pipelines_path(project)) store.touch(new_merge_request_pipelines_path(project))
store.touch(graphql_pipeline_path(pipeline))
each_pipelines_merge_request_path(pipeline) do |path| each_pipelines_merge_request_path(pipeline) do |path|
store.touch(path) store.touch(path)
end end
pipeline.self_with_ancestors_and_descendants.each do |relative_pipeline|
store.touch(project_pipeline_path(relative_pipeline.project, relative_pipeline))
store.touch(graphql_pipeline_path(relative_pipeline))
end
end end
end end
end end
Ci::ExpirePipelineCacheService.prepend_if_ee('EE::Ci::ExpirePipelineCacheService')
...@@ -35,7 +35,7 @@ class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -35,7 +35,7 @@ class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker
# We execute these async as these are independent operations. # We execute these async as these are independent operations.
BuildHooksWorker.perform_async(build.id) BuildHooksWorker.perform_async(build.id)
ExpirePipelineCacheWorker.perform_async(build.pipeline_id) if build.pipeline.cacheable? ExpirePipelineCacheWorker.perform_async(build.pipeline_id)
ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat? ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat?
## ##
......
...@@ -16,19 +16,13 @@ class ExpireJobCacheWorker ...@@ -16,19 +16,13 @@ class ExpireJobCacheWorker
pipeline = job.pipeline pipeline = job.pipeline
project = job.project project = job.project
Gitlab::EtagCaching::Store.new.tap do |store| Gitlab::EtagCaching::Store.new.touch(project_job_path(project, job))
store.touch(project_pipeline_path(project, pipeline)) ExpirePipelineCacheWorker.perform_async(pipeline.id)
store.touch(project_job_path(project, job))
end
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
private private
def project_pipeline_path(project, pipeline)
Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
end
def project_job_path(project, job) def project_job_path(project, job)
Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json) Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json)
end end
......
...@@ -13,7 +13,7 @@ class ExpirePipelineCacheWorker ...@@ -13,7 +13,7 @@ class ExpirePipelineCacheWorker
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id) def perform(pipeline_id)
pipeline = Ci::Pipeline.find_by(id: pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id)
return unless pipeline&.cacheable? return unless pipeline
Ci::ExpirePipelineCacheService.new.execute(pipeline) Ci::ExpirePipelineCacheService.new.execute(pipeline)
end end
......
---
title: Fix status cache for upstream pipelines
merge_request: 54937
author:
type: fixed
# frozen_string_literal: true
module EE
module Ci
module ExpirePipelineCacheService
extend ::Gitlab::Utils::Override
override :update_etag_cache
def update_etag_cache(pipeline, store)
super
triggered_by = pipeline.triggered_by_pipeline
store.touch(project_pipeline_path(triggered_by.project, triggered_by)) if triggered_by
pipeline.triggered_pipelines.each do |triggered|
store.touch(project_pipeline_path(triggered.project, triggered))
end
end
end
end
end
...@@ -369,18 +369,6 @@ RSpec.describe Ci::Pipeline do ...@@ -369,18 +369,6 @@ RSpec.describe Ci::Pipeline do
end end
end end
context 'when pipeline is web terminal triggered' do
before do
pipeline.source = 'webide'
end
it 'does not schedule the pipeline cache worker' do
expect(ExpirePipelineCacheWorker).not_to receive(:perform_async)
pipeline.cancel!
end
end
context 'when pipeline project has downstream subscriptions' do context 'when pipeline project has downstream subscriptions' do
let(:pipeline) { create(:ci_empty_pipeline, project: create(:project, :public)) } let(:pipeline) { create(:ci_empty_pipeline, project: create(:project, :public)) }
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Ci::ExpirePipelineCacheService do
let(:pipeline) { create(:ci_empty_pipeline) }
subject { described_class.new }
describe '#perform' do
context 'when pipeline is triggered by other pipeline' do
let(:source) { create(:ci_sources_pipeline, pipeline: pipeline) }
it 'updates the cache of dependent pipeline' do
dependent_pipeline_path = "/#{source.source_project.full_path}/-/pipelines/#{source.source_pipeline.id}.json"
allow_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch)
expect_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch).with(dependent_pipeline_path)
subject.execute(pipeline)
end
end
context 'when pipeline triggered other pipeline' do
let(:build) { create(:ci_build, pipeline: pipeline) }
let(:source) { create(:ci_sources_pipeline, source_job: build) }
it 'updates the cache of dependent pipeline' do
dependent_pipeline_path = "/#{source.project.full_path}/-/pipelines/#{source.pipeline.id}.json"
allow_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch)
expect_any_instance_of(Gitlab::EtagCaching::Store).to receive(:touch).with(dependent_pipeline_path)
subject.execute(pipeline)
end
end
end
end
...@@ -1449,28 +1449,10 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do ...@@ -1449,28 +1449,10 @@ RSpec.describe Ci::Pipeline, :mailer, factory_default: :keep do
end end
describe 'pipeline caching' do describe 'pipeline caching' do
context 'if pipeline is cacheable' do it 'performs ExpirePipelinesCacheWorker' do
before do expect(ExpirePipelineCacheWorker).to receive(:perform_async).with(pipeline.id)
pipeline.source = 'push'
end
it 'performs ExpirePipelinesCacheWorker' do
expect(ExpirePipelineCacheWorker).to receive(:perform_async).with(pipeline.id)
pipeline.cancel
end
end
context 'if pipeline is not cacheable' do pipeline.cancel
before do
pipeline.source = 'webide'
end
it 'deos not perform ExpirePipelinesCacheWorker' do
expect(ExpirePipelineCacheWorker).not_to receive(:perform_async)
pipeline.cancel
end
end end
end end
......
...@@ -63,5 +63,36 @@ RSpec.describe Ci::ExpirePipelineCacheService do ...@@ -63,5 +63,36 @@ RSpec.describe Ci::ExpirePipelineCacheService do
expect(Project.find(project_with_repo.id).pipeline_status.has_status?).to be_falsey expect(Project.find(project_with_repo.id).pipeline_status.has_status?).to be_falsey
end end
end end
context 'when the pipeline is triggered by another pipeline' do
let(:source) { create(:ci_sources_pipeline, pipeline: pipeline) }
it 'updates the cache of dependent pipeline' do
dependent_pipeline_path = "/#{source.source_project.full_path}/-/pipelines/#{source.source_pipeline.id}.json"
expect_next_instance_of(Gitlab::EtagCaching::Store) do |store|
allow(store).to receive(:touch)
expect(store).to receive(:touch).with(dependent_pipeline_path)
end
subject.execute(pipeline)
end
end
context 'when the pipeline triggered another pipeline' do
let(:build) { create(:ci_build, pipeline: pipeline) }
let(:source) { create(:ci_sources_pipeline, source_job: build) }
it 'updates the cache of dependent pipeline' do
dependent_pipeline_path = "/#{source.project.full_path}/-/pipelines/#{source.pipeline.id}.json"
expect_next_instance_of(Gitlab::EtagCaching::Store) do |store|
allow(store).to receive(:touch)
expect(store).to receive(:touch).with(dependent_pipeline_path)
end
subject.execute(pipeline)
end
end
end end
end end
...@@ -13,7 +13,6 @@ RSpec.describe ExpireJobCacheWorker do ...@@ -13,7 +13,6 @@ RSpec.describe ExpireJobCacheWorker do
include_examples 'an idempotent worker' do include_examples 'an idempotent worker' do
it 'invalidates Etag caching for the job path' do it 'invalidates Etag caching for the job path' do
pipeline_path = "/#{project.full_path}/-/pipelines/#{pipeline.id}.json"
job_path = "/#{project.full_path}/builds/#{job.id}.json" job_path = "/#{project.full_path}/builds/#{job.id}.json"
spy_store = Gitlab::EtagCaching::Store.new spy_store = Gitlab::EtagCaching::Store.new
...@@ -22,13 +21,12 @@ RSpec.describe ExpireJobCacheWorker do ...@@ -22,13 +21,12 @@ RSpec.describe ExpireJobCacheWorker do
expect(spy_store).to receive(:touch) expect(spy_store).to receive(:touch)
.exactly(worker_exec_times).times .exactly(worker_exec_times).times
.with(pipeline_path) .with(job_path)
.and_call_original .and_call_original
expect(spy_store).to receive(:touch) expect(ExpirePipelineCacheWorker).to receive(:perform_async)
.with(pipeline.id)
.exactly(worker_exec_times).times .exactly(worker_exec_times).times
.with(job_path)
.and_call_original
subject subject
end end
......
...@@ -25,15 +25,6 @@ RSpec.describe ExpirePipelineCacheWorker do ...@@ -25,15 +25,6 @@ RSpec.describe ExpirePipelineCacheWorker do
subject.perform(617748) subject.perform(617748)
end end
it "doesn't do anything if the pipeline cannot be cached" do
allow_any_instance_of(Ci::Pipeline).to receive(:cacheable?).and_return(false)
expect_any_instance_of(Ci::ExpirePipelineCacheService).not_to receive(:execute)
expect_any_instance_of(Gitlab::EtagCaching::Store).not_to receive(:touch)
subject.perform(pipeline.id)
end
it_behaves_like 'an idempotent worker' do it_behaves_like 'an idempotent worker' do
let(:job_args) { [pipeline.id] } let(:job_args) { [pipeline.id] }
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment