Commit 9c720a98 authored by Grzegorz Bizon

Merge branch 'build-chunks-on-object-storage' into 'master'

Allow to store BuildTraceChunks on Object Storage

Closes #45712

See merge request gitlab-org/gitlab-ce!19515
parents bf9fd9c3 21399fbc
module Ci
class BuildTraceChunk < ActiveRecord::Base
include FastDestroyAll
include ::Gitlab::ExclusiveLeaseHelpers
extend Gitlab::Ci::Model
belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id
default_value_for :data_store, :redis
WriteError = Class.new(StandardError)
CHUNK_SIZE = 128.kilobytes
CHUNK_REDIS_TTL = 1.week
WRITE_LOCK_RETRY = 10
WRITE_LOCK_SLEEP = 0.01.seconds
WRITE_LOCK_TTL = 1.minute
# Note: The ordering of this enum is related to the precedence of the persistent store.
# The bottom item takes the highest precedence, and the top item takes the lowest precedence.
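# For example (illustrative, derived from `persistable_store` below): with
# object storage enabled it resolves to :fog; otherwise it falls back to :database.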
enum data_store: {
  redis: 1,
  database: 2,
  fog: 3
}
class << self
  def all_stores
    @all_stores ||= self.data_stores.keys
  end

  def persistable_store
    # get first available store from the back of the list
    all_stores.reverse.find { |store| get_store_class(store).available? }
  end

  def get_store_class(store)
    @stores ||= {}
    @stores[store] ||= "Ci::BuildTraceChunks::#{store.capitalize}".constantize.new
  end

  ##
  # FastDestroyAll concerns
  def begin_fast_destroy
    all_stores.each_with_object({}) do |store, result|
      relation = public_send(store) # rubocop:disable GitlabSecurity/PublicSend
      keys = get_store_class(store).keys(relation)

      result[store] = keys if keys.present?
    end
  end

  ##
  # FastDestroyAll concerns
  def finalize_fast_destroy(keys)
    keys.each do |store, value|
      get_store_class(store).delete_keys(value)
    end
  end
end
@@ -66,10 +70,15 @@ module Ci
end
def append(new_data, offset)
raise ArgumentError, 'New data is missing' unless new_data
raise ArgumentError, 'Offset is out of range' if offset > size || offset < 0
raise ArgumentError, 'Chunk size overflow' if CHUNK_SIZE < (offset + new_data.bytesize)
in_lock(*lock_params) do # Write operation is atomic
unsafe_set_data!(data.byteslice(0, offset) + new_data)
end
schedule_to_persist if full?
end
def size
@@ -88,93 +97,63 @@ module Ci
(start_offset...end_offset)
end
def persist_data!
  in_lock(*lock_params) do # Write operation is atomic
unsafe_persist_to!(self.class.persistable_store)
end
end
private
def unsafe_persist_to!(new_store)
  return if data_store == new_store.to_s
  raise ArgumentError, 'Can not persist empty data' unless size > 0

  old_store_class = self.class.get_store_class(data_store)

  get_data.tap do |the_data|
    self.raw_data = nil
    self.data_store = new_store
    unsafe_set_data!(the_data)
  end

  old_store_class.delete_data(self)
end

def get_data
  self.class.get_store_class(data_store).data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
rescue Excon::Error::NotFound
  # If the data store is :fog and the file does not exist in the object storage, this method returns nil.
end

def unsafe_set_data!(value)
  raise ArgumentError, 'New data size exceeds chunk size' if value.bytesize > CHUNK_SIZE

  self.class.get_store_class(data_store).set_data(self, value)
  @data = value

  save! if changed?
end

def schedule_to_persist
  return if data_persisted?

  Ci::BuildTraceChunkFlushWorker.perform_async(id)
end

def data_persisted?
  !redis?
end

def full?
  size == CHUNK_SIZE
end

def lock_params
  ["trace_write:#{build_id}:chunks:#{chunk_index}",
   { ttl: WRITE_LOCK_TTL,
     retries: WRITE_LOCK_RETRY,
     sleep_sec: WRITE_LOCK_SLEEP }]
end
end
end
module Ci
module BuildTraceChunks
class Database
def available?
true
end
def keys(relation)
[]
end
def delete_keys(keys)
# no-op
end
def data(model)
model.raw_data
end
def set_data(model, data)
model.raw_data = data
end
def delete_data(model)
model.update_columns(raw_data: nil) unless model.raw_data.nil?
end
end
end
end
module Ci
module BuildTraceChunks
class Fog
def available?
object_store.enabled
end
def data(model)
connection.get_object(bucket_name, key(model))[:body]
end
def set_data(model, data)
connection.put_object(bucket_name, key(model), data)
end
def delete_data(model)
delete_keys([[model.build_id, model.chunk_index]])
end
def keys(relation)
return [] unless available?
relation.pluck(:build_id, :chunk_index)
end
def delete_keys(keys)
keys.each do |key|
connection.delete_object(bucket_name, key_raw(*key))
end
end
private
def key(model)
key_raw(model.build_id, model.chunk_index)
end
def key_raw(build_id, chunk_index)
"tmp/builds/#{build_id.to_i}/chunks/#{chunk_index.to_i}.log"
end
def bucket_name
return unless available?
object_store.remote_directory
end
def connection
return unless available?
@connection ||= ::Fog::Storage.new(object_store.connection.to_hash.deep_symbolize_keys)
end
def object_store
Gitlab.config.artifacts.object_store
end
end
end
end
module Ci
module BuildTraceChunks
class Redis
CHUNK_REDIS_TTL = 1.week
def available?
true
end
def data(model)
Gitlab::Redis::SharedState.with do |redis|
redis.get(key(model))
end
end
def set_data(model, data)
Gitlab::Redis::SharedState.with do |redis|
redis.set(key(model), data, ex: CHUNK_REDIS_TTL)
end
end
def delete_data(model)
delete_keys([[model.build_id, model.chunk_index]])
end
def keys(relation)
relation.pluck(:build_id, :chunk_index)
end
def delete_keys(keys)
return if keys.empty?
keys = keys.map { |key| key_raw(*key) }
Gitlab::Redis::SharedState.with do |redis|
redis.del(keys)
end
end
private
def key(model)
key_raw(model.build_id, model.chunk_index)
end
def key_raw(build_id, chunk_index)
"gitlab:ci:trace:#{build_id.to_i}:chunks:#{chunk_index.to_i}"
end
end
end
end
@@ -7,7 +7,7 @@ module Ci
def perform(build_trace_chunk_id)
::Ci::BuildTraceChunk.find_by(id: build_trace_chunk_id).try do |build_trace_chunk|
build_trace_chunk.persist_data!
end
end
end
......
---
title: Use object storage as the first class persistable store for new live trace architecture
merge_request: 19515
author:
type: changed
@@ -77,10 +77,10 @@ cloud-native, for example on Kubernetes.
The data flow is the same as described in the [data flow section](#data-flow)
with one change: _the stored path of the first two phases is different_. This new live
trace architecture stores chunks of traces in Redis and a persistent store (object storage or the database) instead of
file storage. Redis is used as first-class storage, and it stores up to 128KB
of data. Once the full chunk is sent, it is flushed to a persistent store: either object storage (in a temporary directory) or the database.
After a while, the data in Redis and the persistent store will be archived to [object storage](#uploading-traces-to-object-storage).
The data are stored in the following Redis namespace: `Gitlab::Redis::SharedState`.
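Each chunk is stored under a key of the form `gitlab:ci:trace:<build_id>:chunks:<chunk_index>` (see `Ci::BuildTraceChunks::Redis#key_raw` above).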
@@ -89,11 +89,11 @@ Here is the detailed data flow:
1. GitLab Runner picks a job from GitLab
1. GitLab Runner sends a piece of trace to GitLab
1. GitLab appends the data to Redis
1. Once the data in Redis reach 128KB, the data is flushed to a persistent store (object storage or the database).
1. The above steps are repeated until the job is finished.
1. Once the job is finished, GitLab schedules a Sidekiq worker to archive the trace.
1. The Sidekiq worker archives the trace to object storage and cleans up the trace
   in Redis and a persistent store (object storage or the database), as sketched below.
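A minimal sketch of the append/flush steps in Ruby, using the model code from
this commit (simplified; error handling and controller plumbing are omitted):

```ruby
# Runner trace data is appended to the current chunk (stored in Redis first).
chunk.append(new_data, offset)

# When a chunk reaches CHUNK_SIZE (128KB), `append` enqueues
# Ci::BuildTraceChunkFlushWorker, which then calls:
chunk.persist_data! # moves the chunk to object storage or the database
```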
### Enabling live trace
......
module Gitlab
# This module provides helper methods which are integrated with GitLab::ExclusiveLease
module ExclusiveLeaseHelpers
FailedToObtainLockError = Class.new(StandardError)
##
# This helper method blocks a process/thread until the other process cancels the obtained lease key.
#
# Note: It's basically discouraged to use this method in a Unicorn thread,
# because it holds the connection until all `retries` are consumed.
# This could potentially eat up all connection pools.
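#
# Example usage (illustrative):
#
#   in_lock('unique:lock:key', ttl: 30.seconds, retries: 5) do
#     # this block runs only while the exclusive lease is held
#   end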
def in_lock(key, ttl: 1.minute, retries: 10, sleep_sec: 0.01.seconds)
lease = Gitlab::ExclusiveLease.new(key, timeout: ttl)
until uuid = lease.try_obtain
# Keep trying until we obtain the lease. To prevent hammering Redis too
# much we'll wait for a bit.
sleep(sleep_sec)
break if (retries -= 1) < 0
end
raise FailedToObtainLockError, 'Failed to obtain a lock' unless uuid
return yield
ensure
Gitlab::ExclusiveLease.cancel(key, uuid)
end
end
end
@@ -3,5 +3,53 @@ FactoryBot.define do
build factory: :ci_build
chunk_index 0
data_store :redis
trait :redis_with_data do
data_store :redis
transient do
initial_data 'test data'
end
after(:create) do |build_trace_chunk, evaluator|
Ci::BuildTraceChunks::Redis.new.set_data(build_trace_chunk, evaluator.initial_data)
end
end
trait :redis_without_data do
data_store :redis
end
trait :database_with_data do
data_store :database
transient do
initial_data 'test data'
end
after(:build) do |build_trace_chunk, evaluator|
Ci::BuildTraceChunks::Database.new.set_data(build_trace_chunk, evaluator.initial_data)
end
end
trait :database_without_data do
data_store :database
end
trait :fog_with_data do
data_store :fog
transient do
initial_data 'test data'
end
after(:create) do |build_trace_chunk, evaluator|
Ci::BuildTraceChunks::Fog.new.set_data(build_trace_chunk, evaluator.initial_data)
end
end
trait :fog_without_data do
data_store :fog
end
end
end
require 'spec_helper'
describe Gitlab::ExclusiveLeaseHelpers, :clean_gitlab_redis_shared_state do
include ::ExclusiveLeaseHelpers
let(:class_instance) { (Class.new { include ::Gitlab::ExclusiveLeaseHelpers }).new }
let(:unique_key) { SecureRandom.hex(10) }
describe '#in_lock' do
subject { class_instance.in_lock(unique_key, **options) { } }
let(:options) { {} }
context 'when the lease is not obtained yet' do
before do
stub_exclusive_lease(unique_key, 'uuid')
end
it 'calls the given block' do
expect { |b| class_instance.in_lock(unique_key, &b) }.to yield_control.once
end
it 'calls the given block continuously' do
expect { |b| class_instance.in_lock(unique_key, &b) }.to yield_control.once
expect { |b| class_instance.in_lock(unique_key, &b) }.to yield_control.once
expect { |b| class_instance.in_lock(unique_key, &b) }.to yield_control.once
end
it 'cancels the exclusive lease after the block' do
expect_to_cancel_exclusive_lease(unique_key, 'uuid')
subject
end
end
context 'when the lease is obtained already' do
let!(:lease) { stub_exclusive_lease_taken(unique_key) }
it 'retries to obtain a lease and raises an error' do
expect(lease).to receive(:try_obtain).exactly(11).times
expect { subject }.to raise_error('Failed to obtain a lock')
end
context 'when ttl is specified' do
let(:options) { { ttl: 10.minutes } }
it 'receives the specified argument' do
expect(Gitlab::ExclusiveLease).to receive(:new).with(unique_key, { timeout: 10.minutes })
expect { subject }.to raise_error('Failed to obtain a lock')
end
end
context 'when retry count is specified' do
let(:options) { { retries: 3 } }
it 'retries for the specified times' do
expect(lease).to receive(:try_obtain).exactly(4).times
expect { subject }.to raise_error('Failed to obtain a lock')
end
end
context 'when sleep second is specified' do
let(:options) { { retries: 0, sleep_sec: 0.05.seconds } }
it 'receives the specified argument' do
expect(class_instance).to receive(:sleep).with(0.05.seconds).once
expect { subject }.to raise_error('Failed to obtain a lock')
end
end
end
end
end
@@ -14,6 +14,7 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
before do
stub_feature_flags(ci_enable_live_trace: true)
stub_artifacts_object_storage
end
context 'FastDestroyAll' do
@@ -37,6 +38,22 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
end
end
describe '.all_stores' do
subject { described_class.all_stores }
it 'returns a correctly ordered array' do
is_expected.to eq(%w[redis database fog])
end
it 'returns redis store as the lowest precedence' do
expect(subject.first).to eq('redis')
end
it 'returns fog store as the highest precedence' do
expect(subject.last).to eq('fog')
end
end
describe '#data' do
subject { build_trace_chunk.data }
@@ -44,181 +61,269 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
let(:data_store) { :redis }
before do
build_trace_chunk.send(:unsafe_set_data!, 'Sample data in redis')
end
it { is_expected.to eq('Sample data in redis') }
end
context 'when data_store is database' do
let(:data_store) { :database }
let(:raw_data) { 'Sample data in database' }

it { is_expected.to eq('Sample data in database') }
end
context 'when data_store is fog' do
  let(:data_store) { :fog }

  before do
    build_trace_chunk.send(:unsafe_set_data!, 'Sample data in fog')
  end

  it { is_expected.to eq('Sample data in fog') }
end
end
describe '#append' do
  subject { build_trace_chunk.append(new_data, offset) }

  let(:new_data) { 'Sample new data' }
  let(:offset) { 0 }
  let(:merged_data) { data + new_data.to_s }

  shared_examples_for 'Appending correctly' do
    context 'when offset is negative' do
      let(:offset) { -1 }

      it { expect { subject }.to raise_error('Offset is out of range') }
    end

    context 'when offset is bigger than data size' do
      let(:offset) { data.bytesize + 1 }

      it { expect { subject }.to raise_error('Offset is out of range') }
    end

    context 'when new data overflows chunk size' do
      let(:new_data) { 'a' * (described_class::CHUNK_SIZE + 1) }

      it { expect { subject }.to raise_error('Chunk size overflow') }
    end

    context 'when offset is EOF' do
      let(:offset) { data.bytesize }

      it 'appends' do
        subject

        expect(build_trace_chunk.data).to eq(merged_data)
      end
    end

    context 'when the other process is appending' do
      let(:lease_key) { "trace_write:#{build_trace_chunk.build.id}:chunks:#{build_trace_chunk.chunk_index}" }

      before do
        stub_exclusive_lease_taken(lease_key)
      end

      it 'raises an error' do
        expect { subject }.to raise_error('Failed to obtain a lock')
      end
    end

    context 'when new_data is nil' do
      let(:new_data) { nil }

      it 'raises an error' do
        expect { subject }.to raise_error('New data is missing')
      end
    end

    context 'when new_data is empty' do
      let(:new_data) { '' }

      it 'does not append' do
        subject

        expect(build_trace_chunk.data).to eq(data)
      end

      it 'does not execute UPDATE' do
        ActiveRecord::QueryRecorder.new { subject }.log.map do |query|
          expect(query).not_to include('UPDATE')
        end
      end
    end

    context 'when offset is in the middle of the data' do
      let(:offset) { data.bytesize / 2 }

      it 'appends' do
        subject

        expect(build_trace_chunk.data).to eq(data.byteslice(0, offset) + new_data)
      end
    end
  end

  shared_examples_for 'Scheduling sidekiq worker to flush data to persist store' do
    context 'when new data fills up the chunk size' do
      let(:new_data) { 'a' * described_class::CHUNK_SIZE }

      it 'schedules trace chunk flush worker' do
        expect(Ci::BuildTraceChunkFlushWorker).to receive(:perform_async).once

        subject
      end

      it 'migrates data to object storage' do
        Sidekiq::Testing.inline! do
          subject

          build_trace_chunk.reload
          expect(build_trace_chunk.fog?).to be_truthy
          expect(build_trace_chunk.data).to eq(new_data)
        end
      end
    end
  end

  shared_examples_for 'Scheduling no sidekiq worker' do
    context 'when new data fills up the chunk size' do
      let(:new_data) { 'a' * described_class::CHUNK_SIZE }

      it 'does not schedule trace chunk flush worker' do
        expect(Ci::BuildTraceChunkFlushWorker).not_to receive(:perform_async)

        subject
      end

      it 'does not migrate data to object storage' do
        Sidekiq::Testing.inline! do
          data_store = build_trace_chunk.data_store

          subject

          build_trace_chunk.reload
          expect(build_trace_chunk.data_store).to eq(data_store)
        end
      end
    end
  end

  context 'when data_store is redis' do
    let(:data_store) { :redis }

    context 'when there are no data' do
      let(:data) { '' }

      it 'has no data' do
        expect(build_trace_chunk.data).to be_empty
      end

      it_behaves_like 'Appending correctly'
      it_behaves_like 'Scheduling sidekiq worker to flush data to persist store'
    end

    context 'when there are some data' do
      let(:data) { 'Sample data in redis' }

      before do
        build_trace_chunk.send(:unsafe_set_data!, data)
      end

      it 'has data' do
        expect(build_trace_chunk.data).to eq(data)
      end

      it_behaves_like 'Appending correctly'
      it_behaves_like 'Scheduling sidekiq worker to flush data to persist store'
    end
  end

  context 'when data_store is database' do
    let(:data_store) { :database }

    context 'when there are no data' do
      let(:data) { '' }

      it 'has no data' do
        expect(build_trace_chunk.data).to be_empty
      end

      it_behaves_like 'Appending correctly'
      it_behaves_like 'Scheduling no sidekiq worker'
    end

    context 'when there are some data' do
      let(:raw_data) { 'Sample data in database' }
      let(:data) { raw_data }

      it 'has data' do
        expect(build_trace_chunk.data).to eq(data)
      end

      it_behaves_like 'Appending correctly'
      it_behaves_like 'Scheduling no sidekiq worker'
    end
  end

  context 'when data_store is fog' do
    let(:data_store) { :fog }

    context 'when there are no data' do
      let(:data) { '' }

      it 'has no data' do
        expect(build_trace_chunk.data).to be_empty
      end

      it_behaves_like 'Appending correctly'
      it_behaves_like 'Scheduling no sidekiq worker'
    end

    context 'when there are some data' do
      let(:data) { 'Sample data in fog' }

      before do
        build_trace_chunk.send(:unsafe_set_data!, data)
      end

      it 'has data' do
        expect(build_trace_chunk.data).to eq(data)
      end

      it_behaves_like 'Appending correctly'
      it_behaves_like 'Scheduling no sidekiq worker'
    end
  end
end
describe '#truncate' do
subject { build_trace_chunk.truncate(offset) }
shared_examples_for 'truncates' do
context 'when offset is negative' do
let(:offset) { -1 }
it { expect { subject }.to raise_error('Offset is out of range') }
end
context 'when offset is bigger than data size' do
let(:offset) { data.bytesize + 1 }
it { expect { subject }.to raise_error('Offset is out of range') }
end
context 'when offset is 10' do
let(:offset) { 10 }
it 'truncates' do
  subject

  expect(build_trace_chunk.data).to eq(data.byteslice(0, offset))
end
end
end
@@ -228,18 +333,29 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
let(:data) { 'Sample data in redis' }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it_behaves_like 'truncates'
end
context 'when data_store is database' do
let(:data_store) { :database }
let(:raw_data) { 'Sample data in database' }
let(:data) { raw_data }
it_behaves_like 'truncates'
end
context 'when data_store is fog' do
let(:data_store) { :fog }
let(:data) { 'Sample data in fog' }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it_behaves_like 'truncates'
end
end
@@ -253,7 +369,7 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
let(:data) { 'Sample data in redis' }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it { is_expected.to eq(data.bytesize) }
@@ -265,10 +381,10 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
end
context 'when data_store is database' do
let(:data_store) { :database }
context 'when data exists' do
let(:raw_data) { 'Sample data in database' }
let(:data) { raw_data }
it { is_expected.to eq(data.bytesize) }
@@ -278,10 +394,43 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
it { is_expected.to eq(0) }
end
end
context 'when data_store is fog' do
let(:data_store) { :fog }
context 'when data exists' do
let(:data) { 'Sample data in fog' }
let(:key) { "tmp/builds/#{build.id}/chunks/#{chunk_index}.log" }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it { is_expected.to eq(data.bytesize) }
end
context 'when data does not exist' do
it { is_expected.to eq(0) }
end
end
end
describe '#persist_data!' do
subject { build_trace_chunk.persist_data! }
shared_examples_for 'Atomic operation' do
context 'when the other process is persisting' do
let(:lease_key) { "trace_write:#{build_trace_chunk.build.id}:chunks:#{build_trace_chunk.chunk_index}" }
before do
stub_exclusive_lease_taken(lease_key)
end
it 'raises an error' do
expect { subject }.to raise_error('Failed to obtain a lock')
end
end
end
context 'when data_store is redis' do
let(:data_store) { :redis }
@@ -290,46 +439,93 @@ describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
let(:data) { 'Sample data in redis' }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it 'persists the data' do
  expect(build_trace_chunk.redis?).to be_truthy
  expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to eq(data)
  expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
  expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound)

  subject

  expect(build_trace_chunk.fog?).to be_truthy
  expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
  expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
  expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to eq(data)
end
it_behaves_like 'Atomic operation'
end
context 'when data does not exist' do
it 'does not persist' do
expect { subject }.to raise_error('Can not persist empty data')
end
end
end
context 'when data_store is database' do
let(:data_store) { :database }
context 'when data exists' do
let(:data) { 'Sample data in database' }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it 'persists the data' do
expect(build_trace_chunk.database?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to eq(data)
expect { Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk) }.to raise_error(Excon::Error::NotFound)
subject
expect(build_trace_chunk.fog?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to eq(data)
end
it_behaves_like 'Atomic operation'
end
context 'when data does not exist' do
it 'does not persist' do
expect { subject }.to raise_error('Can not persist empty data')
end
end
end
context 'when data_store is fog' do
let(:data_store) { :fog }
context 'when data exists' do
let(:data) { 'Sample data in fog' }
before do
build_trace_chunk.send(:unsafe_set_data!, data)
end
it 'does not change data store' do
expect(build_trace_chunk.fog?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to eq(data)
subject
expect(build_trace_chunk.fog?).to be_truthy
expect(Ci::BuildTraceChunks::Redis.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Database.new.data(build_trace_chunk)).to be_nil
expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to eq(data)
end
it_behaves_like 'Atomic operation'
end
end
end
......
require 'spec_helper'
describe Ci::BuildTraceChunks::Database do
let(:data_store) { described_class.new }
describe '#available?' do
subject { data_store.available? }
it { is_expected.to be_truthy }
end
describe '#data' do
subject { data_store.data(model) }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :database_with_data, initial_data: 'sample data in database') }
it 'returns the data' do
is_expected.to eq('sample data in database')
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :database_without_data) }
it 'returns nil' do
is_expected.to be_nil
end
end
end
describe '#set_data' do
subject { data_store.set_data(model, data) }
let(:data) { 'abc123' }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :database_with_data, initial_data: 'sample data in database') }
it 'overwrites data' do
expect(data_store.data(model)).to eq('sample data in database')
subject
expect(data_store.data(model)).to eq('abc123')
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :database_without_data) }
it 'sets new data' do
expect(data_store.data(model)).to be_nil
subject
expect(data_store.data(model)).to eq('abc123')
end
end
end
describe '#delete_data' do
subject { data_store.delete_data(model) }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :database_with_data, initial_data: 'sample data in database') }
it 'deletes data' do
expect(data_store.data(model)).to eq('sample data in database')
subject
expect(data_store.data(model)).to be_nil
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :database_without_data) }
it 'does nothing' do
expect(data_store.data(model)).to be_nil
subject
expect(data_store.data(model)).to be_nil
end
end
end
describe '#keys' do
subject { data_store.keys(relation) }
let(:build) { create(:ci_build) }
let(:relation) { build.trace_chunks }
before do
create(:ci_build_trace_chunk, :database_with_data, chunk_index: 0, build: build)
create(:ci_build_trace_chunk, :database_with_data, chunk_index: 1, build: build)
end
it 'returns empty array' do
is_expected.to eq([])
end
end
end
require 'spec_helper'
describe Ci::BuildTraceChunks::Fog do
let(:data_store) { described_class.new }
before do
stub_artifacts_object_storage
end
describe '#available?' do
subject { data_store.available? }
context 'when object storage is enabled' do
it { is_expected.to be_truthy }
end
context 'when object storage is disabled' do
before do
stub_artifacts_object_storage(enabled: false)
end
it { is_expected.to be_falsy }
end
end
describe '#data' do
subject { data_store.data(model) }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :fog_with_data, initial_data: 'sample data in fog') }
it 'returns the data' do
is_expected.to eq('sample data in fog')
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'raises Excon::Error::NotFound' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound)
end
end
end
describe '#set_data' do
subject { data_store.set_data(model, data) }
let(:data) { 'abc123' }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :fog_with_data, initial_data: 'sample data in fog') }
it 'overwrites data' do
expect(data_store.data(model)).to eq('sample data in fog')
subject
expect(data_store.data(model)).to eq('abc123')
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'sets new data' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound)
subject
expect(data_store.data(model)).to eq('abc123')
end
end
end
describe '#delete_data' do
subject { data_store.delete_data(model) }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :fog_with_data, initial_data: 'sample data in fog') }
it 'deletes data' do
expect(data_store.data(model)).to eq('sample data in fog')
subject
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound)
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :fog_without_data) }
it 'does nothing' do
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound)
subject
expect { data_store.data(model) }.to raise_error(Excon::Error::NotFound)
end
end
end
describe '#keys' do
subject { data_store.keys(relation) }
let(:build) { create(:ci_build) }
let(:relation) { build.trace_chunks }
before do
create(:ci_build_trace_chunk, :fog_with_data, chunk_index: 0, build: build)
create(:ci_build_trace_chunk, :fog_with_data, chunk_index: 1, build: build)
end
it 'returns keys' do
is_expected.to eq([[build.id, 0], [build.id, 1]])
end
end
describe '#delete_keys' do
subject { data_store.delete_keys(keys) }
let(:build) { create(:ci_build) }
let(:relation) { build.trace_chunks }
let(:keys) { data_store.keys(relation) }
before do
create(:ci_build_trace_chunk, :fog_with_data, chunk_index: 0, build: build)
create(:ci_build_trace_chunk, :fog_with_data, chunk_index: 1, build: build)
end
it 'deletes multiple data' do
::Fog::Storage.new(JobArtifactUploader.object_store_credentials).tap do |connection|
expect(connection.get_object('artifacts', "tmp/builds/#{build.id}/chunks/0.log")[:body]).to be_present
expect(connection.get_object('artifacts', "tmp/builds/#{build.id}/chunks/1.log")[:body]).to be_present
end
subject
::Fog::Storage.new(JobArtifactUploader.object_store_credentials).tap do |connection|
expect { connection.get_object('artifacts', "tmp/builds/#{build.id}/chunks/0.log")[:body] }.to raise_error(Excon::Error::NotFound)
expect { connection.get_object('artifacts', "tmp/builds/#{build.id}/chunks/1.log")[:body] }.to raise_error(Excon::Error::NotFound)
end
end
end
end
require 'spec_helper'
describe Ci::BuildTraceChunks::Redis, :clean_gitlab_redis_shared_state do
let(:data_store) { described_class.new }
describe '#available?' do
subject { data_store.available? }
it { is_expected.to be_truthy }
end
describe '#data' do
subject { data_store.data(model) }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'sample data in redis') }
it 'returns the data' do
is_expected.to eq('sample data in redis')
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'returns nil' do
is_expected.to be_nil
end
end
end
describe '#set_data' do
subject { data_store.set_data(model, data) }
let(:data) { 'abc123' }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'sample data in redis') }
it 'overwrites data' do
expect(data_store.data(model)).to eq('sample data in redis')
subject
expect(data_store.data(model)).to eq('abc123')
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'sets new data' do
expect(data_store.data(model)).to be_nil
subject
expect(data_store.data(model)).to eq('abc123')
end
end
end
describe '#delete_data' do
subject { data_store.delete_data(model) }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :redis_with_data, initial_data: 'sample data in redis') }
it 'deletes data' do
expect(data_store.data(model)).to eq('sample data in redis')
subject
expect(data_store.data(model)).to be_nil
end
end
context 'when data does not exist' do
let(:model) { create(:ci_build_trace_chunk, :redis_without_data) }
it 'does nothing' do
expect(data_store.data(model)).to be_nil
subject
expect(data_store.data(model)).to be_nil
end
end
end
describe '#keys' do
subject { data_store.keys(relation) }
let(:build) { create(:ci_build) }
let(:relation) { build.trace_chunks }
before do
create(:ci_build_trace_chunk, :redis_with_data, chunk_index: 0, build: build)
create(:ci_build_trace_chunk, :redis_with_data, chunk_index: 1, build: build)
end
it 'returns keys' do
is_expected.to eq([[build.id, 0], [build.id, 1]])
end
end
describe '#delete_keys' do
subject { data_store.delete_keys(keys) }
let(:build) { create(:ci_build) }
let(:relation) { build.trace_chunks }
let(:keys) { data_store.keys(relation) }
before do
create(:ci_build_trace_chunk, :redis_with_data, chunk_index: 0, build: build)
create(:ci_build_trace_chunk, :redis_with_data, chunk_index: 1, build: build)
end
it 'deletes multiple data' do
Gitlab::Redis::SharedState.with do |redis|
expect(redis.exists("gitlab:ci:trace:#{build.id}:chunks:0")).to be_truthy
expect(redis.exists("gitlab:ci:trace:#{build.id}:chunks:1")).to be_truthy
end
subject
Gitlab::Redis::SharedState.with do |redis|
expect(redis.exists("gitlab:ci:trace:#{build.id}:chunks:0")).to be_falsy
expect(redis.exists("gitlab:ci:trace:#{build.id}:chunks:1")).to be_falsy
end
end
end
end
@@ -25,6 +25,11 @@ module StubObjectStorage
::Fog::Storage.new(connection_params).tap do |connection|
begin
connection.directories.create(key: remote_directory)
# Cleanup remaining files
connection.directories.each do |directory|
directory.files.map(&:destroy)
end
rescue Excon::Error::Conflict
end
end
......