Commit 85ae610c authored by Shinya Maeda's avatar Shinya Maeda

Introduce chunks store

parent 0142f9e0
......@@ -18,7 +18,7 @@ class JobArtifactUploader < GitlabUploader
if file_storage?
File.open(path, "rb") if path
else
::Gitlab::Ci::Trace::HttpIO.new(url, size) if url
::Gitlab::Ci::Trace::Remote.new(model.job_id, url, size, "rb") if url
end
end
......
......@@ -61,8 +61,8 @@ module Gitlab
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
elsif LiveIO.exists?(job.id)
LiveIO.new(job.id)
elsif LiveTrace.exists?(job.id)
LiveTrace.new(job.id, "rb")
elsif current_path
File.open(current_path, "rb")
elsif old_trace
......@@ -80,7 +80,7 @@ module Gitlab
if current_path
current_path
else
LiveIO.new(job.id)
LiveTrace.new(job.id, "a+b")
end
end
......@@ -105,7 +105,12 @@ module Gitlab
raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
if current_path
if LiveTrace.exists?(job.id)
LiveTrace.new(job.id, "rb") do |stream|
archive_stream!(stream)
job.erase_old_trace!
end
elsif current_path
File.open(current_path) do |stream|
archive_stream!(stream)
FileUtils.rm(current_path)
......
module Gitlab
module Ci
class Trace
module ChunkStores
class Base
InitializeError = Class.new(StandardError)
NotSupportedError = Class.new(StandardError)
attr_reader :chunk_start
attr_reader :chunk_index
attr_reader :buffer_size
attr_reader :url
def initialize(*identifiers, **params)
@buffer_size = params[:buffer_size]
@chunk_start = params[:chunk_start]
@url = params[:url]
end
def exist?
raise NotImplementedError
end
def get
raise NotImplementedError
end
def size
raise NotImplementedError
end
def write!(data)
raise NotImplementedError
end
def truncate!(offset)
raise NotImplementedError
end
def delete!
raise NotImplementedError
end
def filled?
size == buffer_size
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module ChunkStores
class Database < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
job = Ci::JobTraceChunk.find_or_initialize_by(job_id: job_id, chunk_index: chunk_index)
yield self.class.new(job, params)
end
def exist?(job_id, chunk_index)
Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index)
end
def chunks_count(job_id)
Ci::JobTraceChunk.where(job_id: job_id).count
end
def chunks_size(job_id)
Ci::JobTraceChunk.where(job_id: job_id).pluck('len(data)')
.inject(0){ |sum, data_length| sum + data_length }
end
def delete_all(job_id)
Ci::JobTraceChunk.destroy_all(job_id: job_id)
end
end
attr_reader :job
def initialize(job, **params)
super
@job = job
end
def get
job.data
end
def size
job.data&.length || 0
end
def write!(data)
raise NotSupportedError, 'Only full size is supported' unless buffer_size == data.length
job.create!(data: data)
data.length
end
def truncate!(offset)
raise NotSupportedError
end
def delete!
job.destroy!
end
# def change_chunk_index!(job_id, new_chunk_index)
# raise NotSupportedError
# end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module ChunkStores
class ObjectStorage < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(params)
end
def exist?(job_id, chunk_index)
raise NotSupportedError
end
def chunks_count(job_id)
raise NotSupportedError
end
end
InvalidURLError = Class.new(StandardError)
FailedToGetChunkError = Class.new(StandardError)
attr_reader :url
def initialize(**params)
raise InvalidURLError unless ::Gitlab::UrlSanitizer.valid?(url)
super
@uri = URI(url)
end
def get
response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
request = Net::HTTP::Get.new(uri)
request.set_range(chunk_start, buffer_size)
http.request(request)
end
raise FailedToGetChunkError unless response.code == '200' || response.code == '206'
response.body.force_encoding(Encoding::BINARY)
end
def size
raise NotImplementedError
end
def write!(data)
raise NotImplementedError
end
def truncate!(offset)
raise NotImplementedError
end
def delete
raise NotImplementedError
end
def change_chunk_index!(job_id, new_chunk_index)
raise NotImplementedError
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module ChunkStores
class Redis < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(self.buffer_key(job_id, chunk_index), params)
end
def exist?(job_id, chunk_index)
Gitlab::Redis::Cache.with do |redis|
redis.exists(self.buffer_key(job_id, chunk_index))
end
end
def chunks_count(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.keys(buffer_key(job_id, '*')).count
end
end
def chunks_size(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.keys(buffer_key(job_id, '*')).inject(0) do |sum, key|
sum + redis.strlen(key)
end
end
end
def buffer_key(job_id, chunk_index)
"live_trace_buffer:#{job_id}:#{chunk_index}"
end
end
attr_reader :buffer_key
def initialize(buffer_key, **params)
super
@buffer_key = buffer_key
end
def get
Gitlab::Redis::Cache.with do |redis|
redis.get(buffer_key)
end
end
def size
Gitlab::Redis::Cache.with do |redis|
redis.strlen(buffer_key)
end
end
def write!(data)
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, data)
end
end
def truncate!(offset)
Gitlab::Redis::Cache.with do |redis|
truncated_data = redis.getrange(buffer_key, 0, offset)
redis.set(buffer_key, truncated_data)
end
end
def delete!
Gitlab::Redis::Cache.with do |redis|
redis.del(buffer_key)
end
end
# def change_chunk_index!(job_id, new_chunk_index)
# Gitlab::Redis::Cache.with do |redis|
# new_buffer_key = self.class.buffer_key(job_id, new_chunk_index)
# redis.rename(buffer_key, new_buffer_key)
# end
# end
end
end
end
end
end
##
# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html)
# source: https://gitlab.com/snippets/1685610
module Gitlab
module Ci
class Trace
class ChunkedIO
WriteError = Class.new(StandardError)
attr_reader :size
attr_reader :tell
attr_reader :chunk, :chunk_range
attr_reader :write_lock_uuid
attr_reader :job_id
alias_method :pos, :tell
def initialize(job_id, size, mode)
@size = size
@tell = 0
@job_id = job_id
if /(w|a)/ =~ mode
@write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 5.minutes.to_i).try_obtain
raise WriteError, 'Already opened by another process' unless write_lock_uuid
end
end
def close
Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid) if write_lock_uuid
end
def binmode
# no-op
end
def binmode?
true
end
def path
nil
end
def seek(pos, where = IO::SEEK_SET)
new_pos =
case where
when IO::SEEK_END
size + pos
when IO::SEEK_SET
pos
when IO::SEEK_CUR
tell + pos
else
-1
end
raise 'new position is outside of file' if new_pos < 0 || new_pos > size
@tell = new_pos
end
def eof?
tell == size
end
def each_line
until eof?
line = readline
break if line.nil?
yield(line)
end
end
def read(length = nil)
out = ""
until eof? || (length && out.length >= length)
data = get_chunk
break if data.empty?
out << data
@tell += data.bytesize
end
out = out[0, length] if length && out.length > length
out
end
def readline
out = ""
until eof?
data = get_chunk
new_line = data.index("\n")
if !new_line.nil?
out << data[0..new_line]
@tell += new_line + 1
break
else
out << data
@tell += data.bytesize
end
end
out
end
def write(data, &block)
raise WriteError, 'Already opened by another process' unless write_lock_uuid
while data.present?
empty_space = BUFFER_SIZE - chunk_offset
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
data_to_write = ''
data_to_write += store.get if store.size > 0
data_to_write += data.slice!(empty_space)
written_size = store.write!(data_to_write)
raise WriteError, 'Written size mismatch' unless data_to_write.length == written_size
block.call(store, chunk_index) if block_given?
@tell += written_size
@size += written_size
end
end
end
def truncate(offset)
raise WriteError, 'Already opened by another process' unless write_lock_uuid
removal_chunk_index_start = (offset / BUFFER_SIZE)
removal_chunk_index_end = total_chunk_count - 1
removal_chunk_offset = offset % BUFFER_SIZE
if removal_chunk_offset > 0
chunk_store.open(job_id, removal_chunk_index_start, params_for_store) do |store|
store.truncate!(removal_chunk_offset)
end
removal_chunk_index_start += 1
end
(removal_chunk_index_start..removal_chunk_index_end).each do |removal_chunk_index|
chunk_store.open(job_id, removal_chunk_index, params_for_store) do |store|
store.delete!
end
end
@tell = @size = offset
end
def flush
# no-op
end
def present?
true
end
private
##
# The below methods are not implemented in IO class
#
def in_range?
@chunk_range&.include?(tell)
end
def get_chunk
unless in_range?
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
@chunk = store.get
@chunk_range = (chunk_start...(chunk_start + @chunk.length))
end
end
@chunk[chunk_offset..BUFFER_SIZE]
end
def params_for_store
{
buffer_size: BUFFER_SIZE,
chunk_start: chunk_start
}
end
def chunk_offset
tell % BUFFER_SIZE
end
def chunk_start
(tell / BUFFER_SIZE) * BUFFER_SIZE
end
def chunk_end
[chunk_start + BUFFER_SIZE, size].min
end
def chunk_index
(tell / BUFFER_SIZE)
end
def total_chunk_count
(size / BUFFER_SIZE) + 1
end
def last_chunk?
chunk_index == (total_chunk_count - 1)
end
def write_lock_key
"live_trace_write:#{job_id}"
end
end
end
end
end
module Gitlab
module Ci
class Trace
class LiveIO < ChunkedIO
BUFFER_SIZE = 32.kilobytes
class << self
def exists?(job_id)
exists_in_redis? || exists_in_database?
end
def exists_in_redis?(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.exists(buffer_key(job_id))
end
end
def exists_in_database?(job_id)
Ci::JobTraceChunk.exists?(job_id: job_id)
end
def buffer_key(job_id)
"ci:live_trace_buffer:#{job_id}"
end
end
attr_reader :job_id
def initialize(job_id)
@job_id = job_id
super
end
def write(data)
# TODO:
end
def truncate(offset)
# TODO:
end
def flush
# TODO:
end
private
##
# Override
def get_chunk
# TODO:
end
end
end
end
end
module Gitlab
module Ci
class Trace
class LiveTrace < ChunkedIO
BUFFER_SIZE = 128.kilobytes
class << self
def exist?(job_id)
ChunkStores::Redis.chunks_count(job_id) > 0 ||
ChunkStores::Database.chunks_count(job_id) > 0
end
end
def initialize(job_id, mode)
super(job_id, calculate_size, mode)
end
def write(data)
raise NotImplementedError, 'Overwrite is not supported' unless tell == size
super(data) do |store|
if store.filled?
# Rotate data from redis to database
ChunkStores::Database.open(job_id, chunk_index, params_for_store) do |to_store|
to_store.write!(store.get)
end
store.delete!
end
end
end
private
def calculate_size
ChunkStores::Redis.chunks_size(job_id) +
ChunkStores::Database.chunks_size(job_id)
end
def chunk_store
if last_chunk?
ChunkStores::Redis
else
ChunkStores::Database
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
class Remote < ChunkedIO
BUFFER_SIZE = 128.kilobytes
NoSupportError = Class.new(StandardError)
attr_reader :uri
def initialize(job_id, url, size, mode)
@uri = URI(url)
super(job_id, size, mode)
end
def write(data)
raise NoSupportError
end
def truncate(offset)
raise NoSupportError
end
def flush
raise NoSupportError
end
private
def chunk_store
ChunkStores::Http
end
def params_for_store
super.merge( { uri: uri } )
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment