Commit 31a1ce21 authored by Kamil Trzciński's avatar Kamil Trzciński

Lock writes to trace stream

parent 2e3dab38
...@@ -76,7 +76,7 @@ module Ci ...@@ -76,7 +76,7 @@ module Ci
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0 raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize) raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
in_lock(*lock_params) do # Write opetation is atomic in_lock(*lock_params) do # Write operation is atomic
unsafe_set_data!(data.byteslice(0, offset) + new_data) unsafe_set_data!(data.byteslice(0, offset) + new_data)
end end
...@@ -100,7 +100,7 @@ module Ci ...@@ -100,7 +100,7 @@ module Ci
end end
def persist_data! def persist_data!
in_lock(*lock_params) do # Write opetation is atomic in_lock(*lock_params) do # Write operation is atomic
unsafe_persist_to!(self.class.persistable_store) unsafe_persist_to!(self.class.persistable_store)
end end
end end
......
---
title: Lock writes to trace stream
merge_request:
author:
type: fixed
...@@ -50,6 +50,10 @@ module API ...@@ -50,6 +50,10 @@ module API
rack_response({ 'message' => '404 Not found' }.to_json, 404) rack_response({ 'message' => '404 Not found' }.to_json, 404)
end end
rescue_from ::Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError do
rack_response({ 'message' => '409 Conflict: Resource lock' }.to_json, 409)
end
rescue_from UploadedFile::InvalidPathError do |e| rescue_from UploadedFile::InvalidPathError do |e|
rack_response({ 'message' => e.message }.to_json, 400) rack_response({ 'message' => e.message }.to_json, 400)
end end
......
...@@ -3,9 +3,11 @@ ...@@ -3,9 +3,11 @@
module Gitlab module Gitlab
module Ci module Ci
class Trace class Trace
include ExclusiveLeaseGuard include ::Gitlab::ExclusiveLeaseHelpers
LEASE_TIMEOUT = 1.hour LOCK_TTL = 1.minute
LOCK_RETRIES = 2
LOCK_SLEEP = 0.001.seconds
ArchiveError = Class.new(StandardError) ArchiveError = Class.new(StandardError)
AlreadyArchivedError = Class.new(StandardError) AlreadyArchivedError = Class.new(StandardError)
...@@ -82,26 +84,12 @@ module Gitlab ...@@ -82,26 +84,12 @@ module Gitlab
stream&.close stream&.close
end end
def write(mode) def write(mode, &blk)
stream = Gitlab::Ci::Trace::Stream.new do in_write_lock do
if trace_artifact unsafe_write!(mode, &blk)
raise AlreadyArchivedError, 'Could not write to the archived trace'
elsif current_path
File.open(current_path, mode)
elsif Feature.enabled?('ci_enable_live_trace')
Gitlab::Ci::Trace::ChunkedIO.new(job)
else
File.open(ensure_path, mode)
end end
end end
yield(stream).tap do
job.touch if job.needs_touch?
end
ensure
stream&.close
end
def erase! def erase!
## ##
# Erase the archived trace # Erase the archived trace
...@@ -117,13 +105,33 @@ module Gitlab ...@@ -117,13 +105,33 @@ module Gitlab
end end
def archive! def archive!
try_obtain_lease do in_write_lock do
unsafe_archive! unsafe_archive!
end end
end end
private private
def unsafe_write!(mode, &blk)
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
raise AlreadyArchivedError, 'Could not write to the archived trace'
elsif current_path
File.open(current_path, mode)
elsif Feature.enabled?('ci_enable_live_trace')
Gitlab::Ci::Trace::ChunkedIO.new(job)
else
File.open(ensure_path, mode)
end
end
yield(stream).tap do
job.touch if job.needs_touch?
end
ensure
stream&.close
end
def unsafe_archive! def unsafe_archive!
raise AlreadyArchivedError, 'Could not archive again' if trace_artifact raise AlreadyArchivedError, 'Could not archive again' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete? raise ArchiveError, 'Job is not finished yet' unless job.complete?
...@@ -146,6 +154,11 @@ module Gitlab ...@@ -146,6 +154,11 @@ module Gitlab
end end
end end
def in_write_lock(&blk)
lock_key = "trace:write:lock:#{job.id}"
in_lock(lock_key, ttl: LOCK_TTL, retries: LOCK_RETRIES, sleep_sec: LOCK_SLEEP, &blk)
end
def archive_stream!(stream) def archive_stream!(stream)
clone_file!(stream, JobArtifactUploader.workhorse_upload_path) do |clone_path| clone_file!(stream, JobArtifactUploader.workhorse_upload_path) do |clone_path|
create_build_trace!(job, clone_path) create_build_trace!(job, clone_path)
...@@ -226,16 +239,6 @@ module Gitlab ...@@ -226,16 +239,6 @@ module Gitlab
def trace_artifact def trace_artifact
job.job_artifacts_trace job.job_artifacts_trace
end end
# For ExclusiveLeaseGuard concern
def lease_key
@lease_key ||= "trace:archive:#{job.id}"
end
# For ExclusiveLeaseGuard concern
def lease_timeout
LEASE_TIMEOUT
end
end end
end end
end end
...@@ -43,19 +43,14 @@ module Gitlab ...@@ -43,19 +43,14 @@ module Gitlab
def append(data, offset) def append(data, offset)
data = data.force_encoding(Encoding::BINARY) data = data.force_encoding(Encoding::BINARY)
stream.truncate(offset) stream.seek(offset, IO::SEEK_SET)
stream.seek(0, IO::SEEK_END)
stream.write(data) stream.write(data)
stream.truncate(offset + data.bytesize)
stream.flush() stream.flush()
end end
def set(data) def set(data)
data = data.force_encoding(Encoding::BINARY) append(data, 0)
stream.seek(0, IO::SEEK_SET)
stream.write(data)
stream.truncate(data.bytesize)
stream.flush()
end end
def raw(last_lines: nil) def raw(last_lines: nil)
......
...@@ -12,6 +12,8 @@ module Gitlab ...@@ -12,6 +12,8 @@ module Gitlab
# because it holds the connection until all `retries` is consumed. # because it holds the connection until all `retries` is consumed.
# This could potentially eat up all connection pools. # This could potentially eat up all connection pools.
def in_lock(key, ttl: 1.minute, retries: 10, sleep_sec: 0.01.seconds) def in_lock(key, ttl: 1.minute, retries: 10, sleep_sec: 0.01.seconds)
raise ArgumentError, 'Key needs to be specified' unless key
lease = Gitlab::ExclusiveLease.new(key, timeout: ttl) lease = Gitlab::ExclusiveLease.new(key, timeout: ttl)
until uuid = lease.try_obtain until uuid = lease.try_obtain
......
...@@ -257,7 +257,8 @@ describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do ...@@ -257,7 +257,8 @@ describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do
let!(:last_result) { stream.html_with_state } let!(:last_result) { stream.html_with_state }
before do before do
stream.append("5678", 4) data_stream.seek(4, IO::SEEK_SET)
data_stream.write("5678")
stream.seek(0) stream.seek(0)
end end
...@@ -271,23 +272,27 @@ describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do ...@@ -271,23 +272,27 @@ describe Gitlab::Ci::Trace::Stream, :clean_gitlab_redis_cache do
end end
context 'when stream is StringIO' do context 'when stream is StringIO' do
let(:stream) do let(:data_stream) do
described_class.new do
StringIO.new("1234") StringIO.new("1234")
end end
let(:stream) do
described_class.new { data_stream }
end end
it_behaves_like 'html_with_states' it_behaves_like 'html_with_states'
end end
context 'when stream is ChunkedIO' do context 'when stream is ChunkedIO' do
let(:stream) do let(:data_stream) do
described_class.new do
Gitlab::Ci::Trace::ChunkedIO.new(build).tap do |chunked_io| Gitlab::Ci::Trace::ChunkedIO.new(build).tap do |chunked_io|
chunked_io.write("1234") chunked_io.write("1234")
chunked_io.seek(0, IO::SEEK_SET) chunked_io.seek(0, IO::SEEK_SET)
end end
end end
let(:stream) do
described_class.new { data_stream }
end end
it_behaves_like 'html_with_states' it_behaves_like 'html_with_states'
......
...@@ -11,6 +11,14 @@ describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do ...@@ -11,6 +11,14 @@ describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do
let(:options) { {} } let(:options) { {} }
context 'when unique key is not set' do
let(:unique_key) { }
it 'raises an error' do
expect { subject }.to raise_error ArgumentError
end
end
context 'when the lease is not obtained yet' do context 'when the lease is not obtained yet' do
before do before do
stub_exclusive_lease(unique_key, 'uuid') stub_exclusive_lease(unique_key, 'uuid')
......
...@@ -830,6 +830,18 @@ describe API::Runner, :clean_gitlab_redis_shared_state do ...@@ -830,6 +830,18 @@ describe API::Runner, :clean_gitlab_redis_shared_state do
expect(job.trace.raw).to eq 'BUILD TRACE UPDATED' expect(job.trace.raw).to eq 'BUILD TRACE UPDATED'
expect(job.job_artifacts_trace.open.read).to eq 'BUILD TRACE UPDATED' expect(job.job_artifacts_trace.open.read).to eq 'BUILD TRACE UPDATED'
end end
context 'when concurrent update of trace is happening' do
before do
job.trace.write('wb') do
update_job(state: 'success', trace: 'BUILD TRACE UPDATED')
end
end
it 'returns that operation conflicts' do
expect(response.status).to eq(409)
end
end
end end
context 'when no trace is given' do context 'when no trace is given' do
...@@ -1022,6 +1034,18 @@ describe API::Runner, :clean_gitlab_redis_shared_state do ...@@ -1022,6 +1034,18 @@ describe API::Runner, :clean_gitlab_redis_shared_state do
end end
end end
context 'when concurrent update of trace is happening' do
before do
job.trace.write('wb') do
patch_the_trace
end
end
it 'returns that operation conflicts' do
expect(response.status).to eq(409)
end
end
context 'when the job is canceled' do context 'when the job is canceled' do
before do before do
job.cancel job.cancel
......
...@@ -272,16 +272,11 @@ shared_examples_for 'common trace features' do ...@@ -272,16 +272,11 @@ shared_examples_for 'common trace features' do
include ExclusiveLeaseHelpers include ExclusiveLeaseHelpers
before do before do
stub_exclusive_lease_taken("trace:archive:#{trace.job.id}", timeout: 1.hour) stub_exclusive_lease_taken("trace:write:lock:#{trace.job.id}", timeout: 1.minute)
end end
it 'blocks concurrent archiving' do it 'blocks concurrent archiving' do
expect(Rails.logger).to receive(:error).with('Cannot obtain an exclusive lease. There must be another instance already in execution.') expect { subject }.to raise_error(::Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
subject
build.reload
expect(build.job_artifacts_trace).to be_nil
end end
end end
end end
......
...@@ -5,7 +5,7 @@ describe StuckCiJobsWorker do ...@@ -5,7 +5,7 @@ describe StuckCiJobsWorker do
let!(:runner) { create :ci_runner } let!(:runner) { create :ci_runner }
let!(:job) { create :ci_build, runner: runner } let!(:job) { create :ci_build, runner: runner }
let(:trace_lease_key) { "trace:archive:#{job.id}" } let(:trace_lease_key) { "trace:write:lock:#{job.id}" }
let(:trace_lease_uuid) { SecureRandom.uuid } let(:trace_lease_uuid) { SecureRandom.uuid }
let(:worker_lease_key) { StuckCiJobsWorker::EXCLUSIVE_LEASE_KEY } let(:worker_lease_key) { StuckCiJobsWorker::EXCLUSIVE_LEASE_KEY }
let(:worker_lease_uuid) { SecureRandom.uuid } let(:worker_lease_uuid) { SecureRandom.uuid }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment