Commit 1108df18 authored by Shinya Maeda

Add chunks_store database spec

parent f49aea75
......@@ -18,7 +18,7 @@ class JobArtifactUploader < GitlabUploader
if file_storage?
File.open(path, "rb") if path
else
::Gitlab::Ci::Trace::RemoteFile.new(model.job_id, url, size, "rb") if url
::Gitlab::Ci::Trace::HttpIO.new(url, size) if url
end
end
......
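With this change the uploader hands back HttpIO.new(url, size) in the remote-storage branch, so callers receive an IO-like object whether the artifact is local or remote. A minimal sketch of a caller of the method shown above, assuming only that the returned object responds to read (and optionally close) like the File returned in the local branch; this caller is illustrative and not part of the commit:

    # Illustrative caller: treats the return value of the uploader method above
    # uniformly -- a File for local storage, an HttpIO for remote storage.
    # Only #read is assumed; #close is called when available.
    def read_artifact(uploader)
      stream = uploader.open
      return unless stream

      stream.read
    ensure
      stream.close if stream.respond_to?(:close)
    end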
......@@ -61,7 +61,7 @@ module Gitlab
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
elsif LiveTraceFile.exists?(job.id)
elsif Feature.enabled?('ci_enable_live_trace') && LiveTraceFile.exists?(job.id)
LiveTraceFile.new(job.id, "rb")
elsif current_path
File.open(current_path, "rb")
......@@ -77,10 +77,14 @@ module Gitlab
def write
stream = Gitlab::Ci::Trace::Stream.new do
if current_path
current_path
if Feature.enabled?('ci_enable_live_trace')
if current_path
current_path
else
LiveTraceFile.new(job.id, "a+b")
end
else
LiveTraceFile.new(job.id, "a+b")
File.open(ensure_path, "a+b")
end
end
......@@ -105,7 +109,7 @@ module Gitlab
raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
if LiveTraceFile.exists?(job.id)
if Feature.enabled?('ci_enable_live_trace') && LiveTraceFile.exists?(job.id)
LiveTraceFile.open(job.id, "wb") do |stream|
archive_stream!(stream)
stream.truncate(0)
......@@ -153,6 +157,19 @@ module Gitlab
end
end
def ensure_path
return current_path if current_path
ensure_directory
default_path
end
def ensure_directory
unless Dir.exist?(default_directory)
FileUtils.mkdir_p(default_directory)
end
end
def current_path
@current_path ||= paths.find do |trace_path|
File.exist?(trace_path)
......
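The new ensure_path/ensure_directory helpers let write fall back to a fresh file under the default location when no trace exists yet. A standalone sketch of the same fallback, with default_directory and default_path passed in explicitly only for illustration, since in Gitlab::Ci::Trace they are derived from the job elsewhere in the class:

    # Standalone sketch of the ensure_path fallback used above.
    require 'fileutils'

    def ensure_path(current_path, default_directory, default_path)
      return current_path if current_path

      FileUtils.mkdir_p(default_directory) unless Dir.exist?(default_directory)
      default_path
    end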
......@@ -4,9 +4,6 @@ module Gitlab
module ChunkedFile
module ChunkStore
class Base
InitializeError = Class.new(StandardError)
NotSupportedError = Class.new(StandardError)
attr_reader :buffer_size
attr_reader :chunk_start
attr_reader :url
......@@ -17,6 +14,10 @@ module Gitlab
@url = params[:url]
end
def close
raise NotImplementedError
end
def get
raise NotImplementedError
end
......
......@@ -8,45 +8,55 @@ module Gitlab
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
job = Ci::JobTraceChunk.find_or_initialize_by(job_id: job_id, chunk_index: chunk_index)
job_trace_chunk = ::Ci::JobTraceChunk
.find_or_initialize_by(job_id: job_id, chunk_index: chunk_index)
store = self.new(job_trace_chunk, params)
yield self.class.new(job, params)
yield store
ensure
store&.close
end
def exist?(job_id, chunk_index)
Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index)
::Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index)
end
def chunks_count(job_id)
Ci::JobTraceChunk.where(job_id: job_id).count
::Ci::JobTraceChunk.where(job_id: job_id).count
end
def chunks_size(job_id)
Ci::JobTraceChunk.where(job_id: job_id).pluck('len(data)')
.inject(0){ |sum, data_length| sum + data_length }
::Ci::JobTraceChunk.where(job_id: job_id).pluck('data')
.inject(0) { |sum, data| sum + data.length }
end
end
attr_reader :job
attr_reader :job_trace_chunk
def initialize(job, **params)
def initialize(job_trace_chunk, **params)
super
@job = job
@job_trace_chunk = job_trace_chunk
end
def close
@job_trace_chunk = nil
end
def get
job.data
job_trace_chunk.data
end
def size
job.data&.length || 0
job_trace_chunk.data&.length || 0
end
def write!(data)
raise NotImplementedError, 'Only full size write is supported' unless buffer_size == data.length
raise NotImplementedError, 'Partial write is not supported' unless buffer_size == data&.length
raise NotImplementedError, 'UPDATE is not supported' if job_trace_chunk.data
job.create!(data: data)
job_trace_chunk.data = data
job_trace_chunk.save!
data.length
end
......@@ -56,7 +66,7 @@ module Gitlab
end
def delete!
job.destroy!
job_trace_chunk.destroy!
end
end
end
......
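All three chunk stores now follow the same open protocol: build the store, yield it, and close it in an ensure block even if the caller raises. A hedged usage sketch for the database store, mirroring what the new spec below exercises and assuming job is a persisted CI build and 128 is the configured buffer size:

    # Sketch of the calling convention: the block receives the store and the
    # ensure clause in .open guarantees #close runs afterwards.
    store_class = Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Database

    store_class.open(job.id, 0, buffer_size: 128) do |store|
      # write! only accepts a full-size, first-time write; anything else raises
      store.write!('A' * 128) unless store.get
    end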
......@@ -8,24 +8,42 @@ module Gitlab
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(params)
relative_path = relative_path(job_id, chunk_index)
store = self.new(relative_path, params)
yield store
ensure
store&.close
end
def exist?(job_id, chunk_index)
raise NotSupportedError
raise NotImplementedError
end
def chunks_count(job_id)
raise NotSupportedError
raise NotImplementedError
end
def relative_path(job_id, chunk_index)
"#{job_id}/#{chunk_index}.chunk"
end
end
FailedToGetChunkError = Class.new(StandardError)
def initialize(**params)
attr_reader :relative_path
def initialize(relative_path, **params)
super
@relative_path = relative_path
end
def close
@relative_path = nil
end
## TODO: Carrierwave::Fog integration
def get
response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
request = Net::HTTP::Get.new(uri)
......@@ -43,6 +61,7 @@ module Gitlab
end
def write!(data)
raise NotImplementedError, 'Partial write is not supported' unless buffer_size == data.length
raise NotImplementedError
end
......
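The object-storage store now derives its key from the job and chunk index via relative_path, giving a flat "<job_id>/<chunk_index>.chunk" namespace. Illustrative values:

    store_class = Gitlab::Ci::Trace::ChunkedFile::ChunkStore::ObjectStorage
    store_class.relative_path(42, 0) # => "42/0.chunk"
    store_class.relative_path(42, 7) # => "42/7.chunk"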
......@@ -8,7 +8,12 @@ module Gitlab
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.new(self.buffer_key(job_id, chunk_index), params)
buffer_key = self.buffer_key(job_id, chunk_index)
store = self.new(buffer_key, params)
yield store
ensure
store&.close
end
def exist?(job_id, chunk_index)
......@@ -46,6 +51,10 @@ module Gitlab
@buffer_key = buffer_key
end
def close
@buffer_key = nil
end
def get
Gitlab::Redis::Cache.with do |redis|
redis.get(buffer_key)
......
......@@ -22,7 +22,7 @@ module Gitlab
@job_id = job_id
if /(w|a)/ =~ mode
@write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 5.minutes.to_i).try_obtain
@write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain
raise WriteError, 'Already opened by another process' unless write_lock_uuid
end
end
......
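The write lock remains an exclusive lease keyed per job; only its timeout grows from 5 minutes to 1 hour, presumably so that long-running jobs do not lose the lock mid-write. A sketch of the surrounding pattern, where releasing the lease via Gitlab::ExclusiveLease.cancel is an assumption about cleanup and not code from this commit:

    # Sketch of the lease-guarded write pattern used by ChunkedIO.
    def with_write_lock(key)
      uuid = Gitlab::ExclusiveLease.new(key, timeout: 1.hour.to_i).try_obtain
      raise WriteError, 'Already opened by another process' unless uuid

      yield
    ensure
      # Releasing the lease on the way out is assumed behaviour, not shown above.
      Gitlab::ExclusiveLease.cancel(key, uuid) if uuid
    end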
......@@ -10,7 +10,7 @@ module Gitlab
stream = self.class.new(job_id, mode)
yield stream
ensure
stream.close
end
......
include ActionDispatch::TestProcess
FactoryBot.define do
factory :job_trace_chunk, class: Ci::JobTraceChunk do
job factory: :ci_build
end
end
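The new factory lets specs build trace chunks tied to a real build. A usage sketch under standard FactoryBot semantics; the attribute overrides are illustrative and rely on Ci::JobTraceChunk exposing chunk_index and data columns as in the store above:

    # Creates a Ci::JobTraceChunk whose job association is a ci_build record.
    chunk = create(:job_trace_chunk, chunk_index: 0, data: 'A' * 256)
    chunk.job         # => the associated Ci::Build
    chunk.chunk_index # => 0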
require 'spec_helper'
describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Database do
let(:job_id) { job.id }
let(:chunk_index) { 0 }
let(:buffer_size) { 256 }
let(:job_trace_chunk) { ::Ci::JobTraceChunk.new(job_id: job_id, chunk_index: chunk_index) }
let(:params) { { buffer_size: buffer_size } }
let(:trace) { 'A' * buffer_size }
let(:job) { create(:ci_build) }
describe '.open' do
subject { described_class.open(job_id, chunk_index, params) }
it 'opens' do
expect { |b| described_class.open(job_id, chunk_index, params, &b) }
.to yield_successive_args(described_class)
end
context 'when job_id is nil' do
let(:job_id) { nil }
it { expect { subject }.to raise_error(ArgumentError) }
end
context 'when chunk_index is nil' do
let(:chunk_index) { nil }
it { expect { subject }.to raise_error(ArgumentError) }
end
end
describe '.exist?' do
subject { described_class.exist?(job_id, chunk_index) }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it { is_expected.to be_truthy }
end
context 'when job_trace_chunk does not exist' do
it { is_expected.to be_falsy }
end
end
describe '.chunks_count' do
subject { described_class.chunks_count(job_id) }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it { is_expected.to eq(1) }
context 'when two chunks exist' do
let(:job_trace_chunk_2) { ::Ci::JobTraceChunk.new(job_id: job_id, chunk_index: chunk_index + 1) }
let(:trace_2) { 'B' * buffer_size }
before do
described_class.new(job_trace_chunk_2, params).write!(trace_2)
end
it { is_expected.to eq(2) }
end
end
context 'when job_trace_chunk does not exist' do
it { is_expected.to eq(0) }
end
end
describe '.chunks_size' do
subject { described_class.chunks_size(job_id) }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it { is_expected.to eq(trace.length) }
context 'when two chunks exist' do
let(:job_trace_chunk_2) { ::Ci::JobTraceChunk.new(job_id: job_id, chunk_index: chunk_index + 1) }
let(:trace_2) { 'B' * buffer_size }
let(:chunks_size) { trace.length + trace_2.length }
before do
described_class.new(job_trace_chunk_2, params).write!(trace_2)
end
it { is_expected.to eq(chunks_size) }
end
end
context 'when job_trace_chunk does not exist' do
it { is_expected.to eq(0) }
end
end
describe '#get' do
subject { described_class.new(job_trace_chunk, params).get }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it { is_expected.to eq(trace) }
end
context 'when job_trace_chunk does not exist' do
it { is_expected.to be_nil }
end
end
describe '#size' do
subject { described_class.new(job_trace_chunk, params).size }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it { is_expected.to eq(trace.length) }
end
context 'when job_trace_chunk does not exist' do
it { is_expected.to eq(0) }
end
end
describe '#write!' do
subject { described_class.new(job_trace_chunk, params).write!(trace) }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it { expect { subject }.to raise_error('UPDATE is not supported') }
end
context 'when job_trace_chunk does not exist' do
let(:expected_data) { ::Ci::JobTraceChunk.find_by(job_id: job_id, chunk_index: chunk_index).data }
it 'writes' do
is_expected.to eq(trace.length)
expect(expected_data).to eq(trace)
end
end
context 'when data is nil' do
let(:trace) { nil }
it { expect { subject }.to raise_error('Partial write is not supported') }
end
end
describe '#truncate!' do
subject { described_class.new(job_trace_chunk, params).truncate!(0) }
it { expect { subject }.to raise_error(NotImplementedError) }
end
describe '#delete!' do
subject { described_class.new(job_trace_chunk, params).delete! }
context 'when job_trace_chunk exists' do
before do
described_class.new(job_trace_chunk, params).write!(trace)
end
it 'deletes' do
expect(::Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index))
.to be_truthy
subject
expect(::Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index))
.to be_falsy
end
end
context 'when job_trace_chunk does not exist' do
it 'deletes' do
expect(::Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index))
.to be_falsy
subject
expect(::Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index))
.to be_falsy
end
end
end
end
......@@ -2,8 +2,8 @@ require 'spec_helper'
describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis, :clean_gitlab_redis_cache do
let(:job_id) { 1 }
let(:buffer_size) { 128.kilobytes }
let(:chunk_index) { 0 }
let(:buffer_size) { 128.kilobytes }
let(:buffer_key) { described_class.buffer_key(job_id, chunk_index) }
let(:params) { { buffer_size: buffer_size } }
let(:trace) { 'Here is the trace' }
......@@ -111,16 +111,14 @@ describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis, :clean_gitlab_redis_
context 'when buffer_key exists' do
before do
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, trace)
end
described_class.new(buffer_key, params).write!(trace)
end
it { is_expected.to eq(trace) }
end
context 'when buffer_key does not exist' do
it { is_expected.not_to eq(trace) }
it { is_expected.to be_nil }
end
end
......@@ -129,9 +127,7 @@ describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis, :clean_gitlab_redis_
context 'when buffer_key exists' do
before do
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, trace)
end
described_class.new(buffer_key, params).write!(trace)
end
it { is_expected.to eq(trace.length) }
......@@ -147,9 +143,7 @@ describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis, :clean_gitlab_redis_
context 'when buffer_key exists' do
before do
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, 'Already data in the chunk')
end
described_class.new(buffer_key, params).write!('Already data in the chunk')
end
it 'overwrites' do
......@@ -187,9 +181,7 @@ describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis, :clean_gitlab_redis_
context 'when buffer_key exists' do
before do
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, trace)
end
described_class.new(buffer_key, params).write!(trace)
end
it 'truncates' do
......@@ -241,9 +233,7 @@ describe Gitlab::Ci::Trace::ChunkedFile::ChunkStore::Redis, :clean_gitlab_redis_
context 'when buffer_key exists' do
before do
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, trace)
end
described_class.new(buffer_key, params).write!(trace)
end
it 'deletes' do
......