Commit 7d9f4319 authored by Grzegorz Bizon's avatar Grzegorz Bizon

Use optimistic locking to safely migrate a build trace chunk

This commit adds support for using optimistic locking on the database
level when migrating a build trace chunk to a persistent store. It
allows us to safely migrate data to the object storage or database while
avoiding certain race conditions related to chunks migration.
parent 69833f36
...@@ -7,6 +7,7 @@ module Ci ...@@ -7,6 +7,7 @@ module Ci
include ::FastDestroyAll include ::FastDestroyAll
include ::Checksummable include ::Checksummable
include ::Gitlab::ExclusiveLeaseHelpers include ::Gitlab::ExclusiveLeaseHelpers
include ::Gitlab::OptimisticLocking
belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id
...@@ -116,12 +117,8 @@ module Ci ...@@ -116,12 +117,8 @@ module Ci
(start_offset...end_offset) (start_offset...end_offset)
end end
def persist_data!
in_lock(*lock_params) { unsafe_persist_data! }
end
def schedule_to_persist! def schedule_to_persist!
return if persisted? return if flushed?
Ci::BuildTraceChunkFlushWorker.perform_async(id) Ci::BuildTraceChunkFlushWorker.perform_async(id)
end end
...@@ -131,13 +128,28 @@ module Ci ...@@ -131,13 +128,28 @@ module Ci
# happen that a chunk gets migrated after being loaded by another worker # happen that a chunk gets migrated after being loaded by another worker
# but before the worker acquires a lock to perform the migration. # but before the worker acquires a lock to perform the migration.
# #
# We want to reset a chunk in that case and retry migration. If it fails # We are using Redis locking to ensure that we perform this operation
# again, we want to re-raise the exception. # inside an exclusive lock, but this does not prevent us from running into
# race conditions related to updating a model representation in the
# database. Optimistic locking is another mechanism that help here.
# #
def flush! # We are using optimistic locking combined with Redis locking to ensure
persist_data! # that a chunk gets migrated properly.
rescue FailedToPersistDataError #
self.reset.persist_data! def persist_data!
in_lock(*lock_params) do # exclusive Redis lock is acquired first
self.reset.then do |chunk| # we ensure having latest lock_version
chunk.unsafe_persist_data! # we migrate the data and update data store
end
end
rescue ActiveRecord::StaleObjectError
raise FailedToPersistDataError, <<~MSG
data migration race condition detected
store: #{data_store}
build: #{build.id}
index: #{chunk_index}
MSG
end end
## ##
...@@ -149,10 +161,14 @@ module Ci ...@@ -149,10 +161,14 @@ module Ci
build.pending_state.present? && chunks_max_index == chunk_index build.pending_state.present? && chunks_max_index == chunk_index
end end
def persisted? def flushed?
!redis? !redis?
end end
def migrated?
flushed?
end
def live? def live?
redis? redis?
end end
...@@ -163,7 +179,7 @@ module Ci ...@@ -163,7 +179,7 @@ module Ci
self.chunk_index <=> other.chunk_index self.chunk_index <=> other.chunk_index
end end
private protected
def get_data def get_data
# Redis / database return UTF-8 encoded string by default # Redis / database return UTF-8 encoded string by default
...@@ -182,7 +198,7 @@ module Ci ...@@ -182,7 +198,7 @@ module Ci
data is not fulfilled in a bucket data is not fulfilled in a bucket
size: #{current_size} size: #{current_size}
state: #{build.pending_state.present?} state: #{pending_state?}
max: #{chunks_max_index} max: #{chunks_max_index}
index: #{chunk_index} index: #{chunk_index}
MSG MSG
...@@ -239,6 +255,12 @@ module Ci ...@@ -239,6 +255,12 @@ module Ci
size == CHUNK_SIZE size == CHUNK_SIZE
end end
private
def pending_state?
build.pending_state.present?
end
def current_store def current_store
self.class.get_store_class(data_store) self.class.get_store_class(data_store)
end end
......
...@@ -8,8 +8,10 @@ module Ci ...@@ -8,8 +8,10 @@ module Ci
idempotent! idempotent!
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def perform(chunk_id) def perform(id)
::Ci::BuildTraceChunk.find_by(id: chunk_id).try(&:flush!) ::Ci::BuildTraceChunk.find_by(id: id).try do |chunk|
chunk.persist_data!
end
end end
# rubocop: enable CodeReuse/ActiveRecord # rubocop: enable CodeReuse/ActiveRecord
end end
......
---
title: Use optimistic locking to safely migrate a build trace chunk
merge_request: 44588
author:
type: fixed
# frozen_string_literal: true
class AddLockVersionToCiBuildTraceChunk < ActiveRecord::Migration[6.0]
DOWNTIME = false
def change
add_column :ci_build_trace_chunks, :lock_version, :integer, default: 0, null: false
end
end
761cad9a584d98e3086e716f7a5c1d9b4aba87b084efcfcee7272cfdf1179372
\ No newline at end of file
...@@ -9916,7 +9916,8 @@ CREATE TABLE ci_build_trace_chunks ( ...@@ -9916,7 +9916,8 @@ CREATE TABLE ci_build_trace_chunks (
chunk_index integer NOT NULL, chunk_index integer NOT NULL,
data_store integer NOT NULL, data_store integer NOT NULL,
raw_data bytea, raw_data bytea,
checksum bytea checksum bytea,
lock_version integer DEFAULT 0 NOT NULL
); );
CREATE SEQUENCE ci_build_trace_chunks_id_seq CREATE SEQUENCE ci_build_trace_chunks_id_seq
......
...@@ -502,6 +502,10 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -502,6 +502,10 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
describe '#persist_data!' do describe '#persist_data!' do
let(:build) { create(:ci_build, :running) } let(:build) { create(:ci_build, :running) }
before do
build_trace_chunk.save!
end
subject { build_trace_chunk.persist_data! } subject { build_trace_chunk.persist_data! }
shared_examples_for 'Atomic operation' do shared_examples_for 'Atomic operation' do
...@@ -575,6 +579,25 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -575,6 +579,25 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(build_trace_chunk.fog?).to be_truthy expect(build_trace_chunk.fog?).to be_truthy
end end
end end
context 'when the chunk has been modifed by a different worker' do
it 'reloads the chunk before migration' do
described_class
.find(build_trace_chunk.id)
.update!(data_store: :fog)
build_trace_chunk.persist_data!
end
it 'verifies the operation using optimistic locking' do
allow(build_trace_chunk)
.to receive(:save!)
.and_raise(ActiveRecord::StaleObjectError)
expect { build_trace_chunk.persist_data! }
.to raise_error(described_class::FailedToPersistDataError)
end
end
end end
end end
...@@ -780,51 +803,6 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do ...@@ -780,51 +803,6 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
end end
end end
describe '#flush!' do
context 'when chunk can be flushed without problems' do
before do
allow(build_trace_chunk).to receive(:persist_data!)
end
it 'completes migration successfully' do
expect { build_trace_chunk.flush! }.not_to raise_error
end
end
context 'when the flush operation fails at first' do
it 'retries reloads the chunk' do
expect(build_trace_chunk)
.to receive(:persist_data!)
.and_raise(described_class::FailedToPersistDataError)
.ordered
expect(build_trace_chunk).to receive(:reset)
.and_return(build_trace_chunk)
.ordered
expect(build_trace_chunk)
.to receive(:persist_data!)
.ordered
build_trace_chunk.flush!
end
end
context 'when the flush constatly fails' do
before do
allow(build_trace_chunk)
.to receive(:persist_data!)
.and_raise(described_class::FailedToPersistDataError)
end
it 'attems to reset the chunk but eventually fails too' do
expect(build_trace_chunk).to receive(:reset)
.and_return(build_trace_chunk)
expect { build_trace_chunk.flush! }
.to raise_error(described_class::FailedToPersistDataError)
end
end
end
describe 'comparable build trace chunks' do describe 'comparable build trace chunks' do
describe '#<=>' do describe '#<=>' do
context 'when chunks are associated with different builds' do context 'when chunks are associated with different builds' do
......
...@@ -14,7 +14,7 @@ RSpec.describe Ci::BuildTraceChunkFlushWorker do ...@@ -14,7 +14,7 @@ RSpec.describe Ci::BuildTraceChunkFlushWorker do
described_class.new.perform(chunk.id) described_class.new.perform(chunk.id)
expect(chunk.reload).to be_persisted expect(chunk.reload).to be_migrated
end end
describe '#perform' do describe '#perform' do
...@@ -24,7 +24,7 @@ RSpec.describe Ci::BuildTraceChunkFlushWorker do ...@@ -24,7 +24,7 @@ RSpec.describe Ci::BuildTraceChunkFlushWorker do
it 'migrates build trace chunk to a safe store' do it 'migrates build trace chunk to a safe store' do
subject subject
expect(chunk.reload).to be_persisted expect(chunk.reload).to be_migrated
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment