Commit d0e4b89d authored by Grzegorz Bizon's avatar Grzegorz Bizon

Make cloud native build chunks more resilient

This commit extends live traces with an additional mechanism that
writes data checksum to a database once a trace chunk gets full.
parent 66fa2eff
......@@ -2,9 +2,10 @@
module Ci
class BuildTraceChunk < ApplicationRecord
include FastDestroyAll
extend ::Gitlab::Ci::Model
include ::FastDestroyAll
include ::Checksummable
include ::Gitlab::ExclusiveLeaseHelpers
extend Gitlab::Ci::Model
belongs_to :build, class_name: "Ci::Build", foreign_key: :build_id
......@@ -60,8 +61,6 @@ module Ci
end
end
##
# Data is memoized for optimizing #size and #end_offset
def data
@data ||= get_data.to_s
end
......@@ -80,11 +79,11 @@ module Ci
in_lock(*lock_params) { unsafe_append_data!(new_data, offset) }
schedule_to_persist if full?
schedule_to_persist! if full?
end
def size
@size ||= current_store.size(self) || data&.bytesize
@size ||= @data&.bytesize || current_store.size(self) || data&.bytesize
end
def start_offset
......@@ -100,35 +99,49 @@ module Ci
end
def persist_data!
in_lock(*lock_params) do # Write operation is atomic
unsafe_persist_to!(self.class.persistable_store)
in_lock(*lock_params) { unsafe_persist_data! }
end
def schedule_to_persist!
return if persisted?
Ci::BuildTraceChunkFlushWorker.perform_async(id)
end
private
def unsafe_persist_to!(new_store)
def get_data
# Redis / database return UTF-8 encoded string by default
current_store.data(self)&.force_encoding(Encoding::BINARY)
end
def unsafe_persist_data!(new_store = self.class.persistable_store)
return if data_store == new_store.to_s
current_data = get_data
current_data = data
old_store_class = current_store
unless current_data&.bytesize.to_i == CHUNK_SIZE
raise FailedToPersistDataError, 'Data is not fulfilled in a bucket'
end
old_store_class = current_store
self.raw_data = nil
self.data_store = new_store
self.checksum = crc32(current_data)
##
# We need to so persist data then save a new store identifier before we
# remove data from the previous store to make this operation
# trasnaction-safe. `unsafe_set_data! calls `save!` because of this
# reason.
#
# TODO consider using callbacks and state machine to remove old data
#
unsafe_set_data!(current_data)
old_store_class.delete_data(self)
end
def get_data
current_store.data(self)&.force_encoding(Encoding::BINARY) # Redis/Database return UTF-8 string as default
end
def unsafe_set_data!(value)
raise ArgumentError, 'New data size exceeds chunk size' if value.bytesize > CHUNK_SIZE
......@@ -157,14 +170,12 @@ module Ci
save! if changed?
end
def schedule_to_persist
return if data_persisted?
Ci::BuildTraceChunkFlushWorker.perform_async(id)
def persisted?
!redis?
end
def data_persisted?
!redis?
def live?
redis?
end
def full?
......
......@@ -29,7 +29,7 @@ module Ci
new_data = truncated_data + new_data
end
model.raw_data = new_data
set_data(model, new_data)
model.raw_data.to_s.bytesize
end
......
......@@ -41,9 +41,9 @@ module Ci
end
end
def set_data(model, data)
def set_data(model, new_data)
Gitlab::Redis::SharedState.with do |redis|
redis.set(key(model), data, ex: CHUNK_REDIS_TTL)
redis.set(key(model), new_data, ex: CHUNK_REDIS_TTL)
end
end
......
......@@ -3,9 +3,13 @@
module Checksummable
extend ActiveSupport::Concern
def crc32(data)
Zlib.crc32(data)
end
class_methods do
def hexdigest(path)
Digest::SHA256.file(path).hexdigest
::Digest::SHA256.file(path).hexdigest
end
end
end
......@@ -6,9 +6,9 @@ module Ci
include PipelineBackgroundQueue
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_trace_chunk_id)
::Ci::BuildTraceChunk.find_by(id: build_trace_chunk_id).try do |build_trace_chunk|
build_trace_chunk.persist_data!
def perform(chunk_id)
::Ci::BuildTraceChunk.find_by(id: chunk_id).try do |chunk|
chunk.persist_data!
end
end
# rubocop: enable CodeReuse/ActiveRecord
......
---
title: Make cloud native build logs more resilient
merge_request: 40506
author:
type: added
# frozen_string_literal: true
class AddChecksumToBuildChunk < ActiveRecord::Migration[6.0]
DOWNTIME = false
def change
add_column :ci_build_trace_chunks, :checksum, :binary
end
end
39d412a1680d55466c14450943e17802eb183f2f33f2f77078cba571262cd149
\ No newline at end of file
......@@ -9782,7 +9782,8 @@ CREATE TABLE public.ci_build_trace_chunks (
build_id integer NOT NULL,
chunk_index integer NOT NULL,
data_store integer NOT NULL,
raw_data bytea
raw_data bytea,
checksum bytea
);
CREATE SEQUENCE public.ci_build_trace_chunks_id_seq
......
......@@ -8,6 +8,11 @@ module Gitlab
# We assume 'value' must be mutable, given
# that frozen string is enabled.
##
# TODO We need to remove this because it is going to change checksum of
# a trace.
#
value.gsub!(token, 'x' * token.length)
value
end
......
......@@ -222,6 +222,8 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
subject
build_trace_chunk.reload
expect(build_trace_chunk.checksum).to be_present
expect(build_trace_chunk.fog?).to be_truthy
expect(build_trace_chunk.data).to eq(new_data)
end
......@@ -502,6 +504,12 @@ RSpec.describe Ci::BuildTraceChunk, :clean_gitlab_redis_shared_state do
expect(Ci::BuildTraceChunks::Fog.new.data(build_trace_chunk)).to eq(data)
end
it 'calculates CRC32 checksum' do
subject
expect(build_trace_chunk.reload.checksum).to eq '3398914352'
end
it_behaves_like 'Atomic operation'
end
......
......@@ -46,9 +46,7 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
end
describe '#set_data' do
subject { data_store.set_data(model, data) }
let(:data) { 'abc123' }
let(:new_data) { 'abc123' }
context 'when data exists' do
let(:model) { create(:ci_build_trace_chunk, :fog_with_data, initial_data: 'sample data in fog') }
......@@ -56,9 +54,9 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
it 'overwrites data' do
expect(data_store.data(model)).to eq('sample data in fog')
subject
data_store.set_data(model, new_data)
expect(data_store.data(model)).to eq('abc123')
expect(data_store.data(model)).to eq new_data
end
end
......@@ -68,9 +66,9 @@ RSpec.describe Ci::BuildTraceChunks::Fog do
it 'sets new data' do
expect(data_store.data(model)).to be_nil
subject
data_store.set_data(model, new_data)
expect(data_store.data(model)).to eq('abc123')
expect(data_store.data(model)).to eq new_data
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment