Commit 3a2ba7b1 authored by Stan Hu's avatar Stan Hu Committed by Fabio Pitino

Fix missing pipeline e-mails when job logs moved to object storage

As discussed in https://gitlab.com/gitlab-org/gitlab/-/issues/195430,
it's possible that a race condition occurs when a build finishes and a
failed pipeline email goes out. This is what was happening before:

1. Build finishes and causes a pipeline failure. The pipeline failure
transition causes the pipeline to run `PipelineNotificationWorker`,
which schedules an `ActiveJob` to e-mail a user with a failed pipeline
ID.

2. `BuildFinishedWorker` schedules `ArchiveTraceWorker` to move the job
log, stored locally on a filesystem, to object storage.

3. The `ActiveJob` runs and loads the pipeline and failed build
logs. Some builds have been moved to object storage, but the `ActiveJob`
has a stale record and attempts to load the file from the filesystem.

4. The file doesn't exist, an `Errno::ENOENT` exception is raised.

To fix this, we now attempt a refresh the job from the database if we
encounter this exception.
parent 2e7091e2
---
title: Fix missing pipeline e-mails when job logs moved to object storage
merge_request: 40075
author:
type: fixed
......@@ -79,22 +79,15 @@ module Gitlab
job.trace_chunks.any? || current_path.present? || old_trace.present?
end
def read
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
elsif job.trace_chunks.any?
Gitlab::Ci::Trace::ChunkedIO.new(job)
elsif current_path
File.open(current_path, "rb")
elsif old_trace
StringIO.new(old_trace)
end
end
def read(&block)
should_retry = true if lock_taken?(lock_key)
yield stream
ensure
stream&.close
read_stream(&block)
rescue Errno::ENOENT
raise unless should_retry
job.reset
read_stream(&block)
end
def write(mode, &blk)
......@@ -141,6 +134,24 @@ module Gitlab
private
def read_stream
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
elsif job.trace_chunks.any?
Gitlab::Ci::Trace::ChunkedIO.new(job)
elsif current_path
File.open(current_path, "rb")
elsif old_trace
StringIO.new(old_trace)
end
end
yield stream
ensure
stream&.close
end
def unsafe_write!(mode, &blk)
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
......@@ -184,10 +195,13 @@ module Gitlab
end
def in_write_lock(&blk)
lock_key = "trace:write:lock:#{job.id}"
in_lock(lock_key, ttl: LOCK_TTL, retries: LOCK_RETRIES, sleep_sec: LOCK_SLEEP, &blk)
end
def lock_key
"trace:write:lock:#{job.id}"
end
def archive_stream!(stream)
clone_file!(stream, JobArtifactUploader.workhorse_upload_path) do |clone_path|
create_build_trace!(job, clone_path)
......
......@@ -39,5 +39,10 @@ module Gitlab
ensure
lease&.cancel
end
def lock_taken?(key)
lease = Gitlab::ExclusiveLease.new(key, timeout: 0)
lease.exists?
end
end
end
......@@ -11,6 +11,41 @@ RSpec.describe Gitlab::Ci::Trace, :clean_gitlab_redis_shared_state do
it { expect(trace).to delegate_method(:old_trace).to(:job) }
end
context 'when trace is migrated to object storage' do
let!(:job) { create(:ci_build, :trace_artifact) }
let!(:artifact1) { job.job_artifacts_trace }
let!(:artifact2) { job.reload.job_artifacts_trace }
let(:test_data) { "hello world" }
before do
stub_artifacts_object_storage
artifact1.file.migrate!(ObjectStorage::Store::REMOTE)
end
context 'when write lock is not present' do
it 'raises an exception' do
expect { artifact2.job.trace.raw }.to raise_error(Errno::ENOENT)
end
end
context 'when write lock is present', :clean_gitlab_redis_shared_state do
before do
Gitlab::ExclusiveLease.new("trace:write:lock:#{job.id}", timeout: 10.seconds).try_obtain
end
it 'reloads the trace after is it migrated' do
stub_const('Gitlab::HttpIO::BUFFER_SIZE', test_data.length)
expect_next_instance_of(Gitlab::HttpIO) do |http_io|
expect(http_io).to receive(:get_chunk).and_return(test_data, "")
end
expect(artifact2.job.trace.raw).to eq(test_data)
end
end
end
context 'when live trace feature is disabled' do
before do
stub_feature_flags(ci_enable_live_trace: false)
......
......@@ -111,4 +111,14 @@ RSpec.describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state d
end
end
end
describe '#lock_taken?' do
it 'returns true when lock has been taken' do
expect(class_instance.lock_taken?(unique_key)).to be false
class_instance.in_lock(unique_key) do
expect(class_instance.lock_taken?(unique_key)).to be true
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment