Commit eb64ecb2 authored by Shinya Maeda's avatar Shinya Maeda

Clarify namespaces

parent 85ae610c
......@@ -18,7 +18,7 @@ class JobArtifactUploader < GitlabUploader
if file_storage?
File.open(path, "rb") if path
else
::Gitlab::Ci::Trace::Remote.new(model.job_id, url, size, "rb") if url
::Gitlab::Ci::Trace::RemoteFile.new(model.job_id, url, size, "rb") if url
end
end
......
......@@ -61,8 +61,8 @@ module Gitlab
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
elsif LiveTrace.exists?(job.id)
LiveTrace.new(job.id, "rb")
elsif LiveTraceFile.exists?(job.id)
LiveTraceFile.new(job.id, "rb")
elsif current_path
File.open(current_path, "rb")
elsif old_trace
......@@ -80,7 +80,7 @@ module Gitlab
if current_path
current_path
else
LiveTrace.new(job.id, "a+b")
LiveTraceFile.new(job.id, "a+b")
end
end
......@@ -105,10 +105,10 @@ module Gitlab
raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
if LiveTrace.exists?(job.id)
LiveTrace.new(job.id, "rb") do |stream|
if LiveTraceFile.exists?(job.id)
LiveTraceFile.open(job.id, "wb") do |stream|
archive_stream!(stream)
job.erase_old_trace!
stream.truncate(0)
end
elsif current_path
File.open(current_path) do |stream|
......
module Gitlab
module Ci
class Trace
module ChunkStores
class Base
InitializeError = Class.new(StandardError)
NotSupportedError = Class.new(StandardError)
attr_reader :chunk_start
attr_reader :chunk_index
attr_reader :buffer_size
attr_reader :url
def initialize(*identifiers, **params)
@buffer_size = params[:buffer_size]
@chunk_start = params[:chunk_start]
@url = params[:url]
end
def exist?
raise NotImplementedError
end
def get
raise NotImplementedError
end
def size
raise NotImplementedError
end
def write!(data)
raise NotImplementedError
end
def truncate!(offset)
raise NotImplementedError
end
def delete!
raise NotImplementedError
end
def filled?
size == buffer_size
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module ChunkStores
class Database < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
job = Ci::JobTraceChunk.find_or_initialize_by(job_id: job_id, chunk_index: chunk_index)
yield self.class.new(job, params)
end
def exist?(job_id, chunk_index)
Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index)
end
def chunks_count(job_id)
Ci::JobTraceChunk.where(job_id: job_id).count
end
def chunks_size(job_id)
Ci::JobTraceChunk.where(job_id: job_id).pluck('len(data)')
.inject(0){ |sum, data_length| sum + data_length }
end
def delete_all(job_id)
Ci::JobTraceChunk.destroy_all(job_id: job_id)
end
end
attr_reader :job
def initialize(job, **params)
super
@job = job
end
def get
job.data
end
def size
job.data&.length || 0
end
def write!(data)
raise NotSupportedError, 'Only full size is supported' unless buffer_size == data.length
job.create!(data: data)
data.length
end
def truncate!(offset)
raise NotSupportedError
end
def delete!
job.destroy!
end
# def change_chunk_index!(job_id, new_chunk_index)
# raise NotSupportedError
# end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module ChunkStores
class ObjectStorage < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(params)
end
def exist?(job_id, chunk_index)
raise NotSupportedError
end
def chunks_count(job_id)
raise NotSupportedError
end
end
InvalidURLError = Class.new(StandardError)
FailedToGetChunkError = Class.new(StandardError)
attr_reader :url
def initialize(**params)
raise InvalidURLError unless ::Gitlab::UrlSanitizer.valid?(url)
super
@uri = URI(url)
end
def get
response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
request = Net::HTTP::Get.new(uri)
request.set_range(chunk_start, buffer_size)
http.request(request)
end
raise FailedToGetChunkError unless response.code == '200' || response.code == '206'
response.body.force_encoding(Encoding::BINARY)
end
def size
raise NotImplementedError
end
def write!(data)
raise NotImplementedError
end
def truncate!(offset)
raise NotImplementedError
end
def delete
raise NotImplementedError
end
def change_chunk_index!(job_id, new_chunk_index)
raise NotImplementedError
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module ChunkStores
class Redis < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(self.buffer_key(job_id, chunk_index), params)
end
def exist?(job_id, chunk_index)
Gitlab::Redis::Cache.with do |redis|
redis.exists(self.buffer_key(job_id, chunk_index))
end
end
def chunks_count(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.keys(buffer_key(job_id, '*')).count
end
end
def chunks_size(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.keys(buffer_key(job_id, '*')).inject(0) do |sum, key|
sum + redis.strlen(key)
end
end
end
def buffer_key(job_id, chunk_index)
"live_trace_buffer:#{job_id}:#{chunk_index}"
end
end
attr_reader :buffer_key
def initialize(buffer_key, **params)
super
@buffer_key = buffer_key
end
def get
Gitlab::Redis::Cache.with do |redis|
redis.get(buffer_key)
end
end
def size
Gitlab::Redis::Cache.with do |redis|
redis.strlen(buffer_key)
end
end
def write!(data)
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, data)
end
end
def truncate!(offset)
Gitlab::Redis::Cache.with do |redis|
truncated_data = redis.getrange(buffer_key, 0, offset)
redis.set(buffer_key, truncated_data)
end
end
def delete!
Gitlab::Redis::Cache.with do |redis|
redis.del(buffer_key)
end
end
# def change_chunk_index!(job_id, new_chunk_index)
# Gitlab::Redis::Cache.with do |redis|
# new_buffer_key = self.class.buffer_key(job_id, new_chunk_index)
# redis.rename(buffer_key, new_buffer_key)
# end
# end
end
end
end
end
end
......@@ -136,7 +136,7 @@ module Gitlab
raise WriteError, 'Already opened by another process' unless write_lock_uuid
removal_chunk_index_start = (offset / BUFFER_SIZE)
removal_chunk_index_end = total_chunk_count - 1
removal_chunk_index_end = chunks_count - 1
removal_chunk_offset = offset % BUFFER_SIZE
if removal_chunk_offset > 0
......@@ -164,6 +164,10 @@ module Gitlab
true
end
def delete_chunks!
truncate(0)
end
private
##
......@@ -207,16 +211,16 @@ module Gitlab
(tell / BUFFER_SIZE)
end
def total_chunk_count
def chunks_count
(size / BUFFER_SIZE) + 1
end
def last_chunk?
chunk_index == (total_chunk_count - 1)
chunk_index == (chunks_count - 1)
end
def write_lock_key
"live_trace_write:#{job_id}"
"live_trace:operation:write:#{job_id}"
end
end
end
......
module Gitlab
module Ci
class Trace
module File
module ChunkStore
class Base
InitializeError = Class.new(StandardError)
NotSupportedError = Class.new(StandardError)
attr_reader :buffer_size
attr_reader :chunk_start
attr_reader :url
def initialize(*identifiers, **params)
@buffer_size = params[:buffer_size]
@chunk_start = params[:chunk_start]
@url = params[:url]
end
def get
raise NotImplementedError
end
def size
raise NotImplementedError
end
def write!(data)
raise NotImplementedError
end
def truncate!(offset)
raise NotImplementedError
end
def delete!
raise NotImplementedError
end
def filled?
size == buffer_size
end
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module File
module ChunkStore
class Database < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
job = Ci::JobTraceChunk.find_or_initialize_by(job_id: job_id, chunk_index: chunk_index)
yield self.class.new(job, params)
end
def exist?(job_id, chunk_index)
Ci::JobTraceChunk.exists?(job_id: job_id, chunk_index: chunk_index)
end
def chunks_count(job_id)
Ci::JobTraceChunk.where(job_id: job_id).count
end
def chunks_size(job_id)
Ci::JobTraceChunk.where(job_id: job_id).pluck('len(data)')
.inject(0){ |sum, data_length| sum + data_length }
end
end
attr_reader :job
def initialize(job, **params)
super
@job = job
end
def get
job.data
end
def size
job.data&.length || 0
end
def write!(data)
raise NotImplementedError, 'Only full size write is supported' unless buffer_size == data.length
job.create!(data: data)
data.length
end
def truncate!(offset)
raise NotImplementedError
end
def delete!
job.destroy!
end
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module File
module ChunkStore
class ObjectStorage < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(params)
end
def exist?(job_id, chunk_index)
raise NotSupportedError
end
def chunks_count(job_id)
raise NotSupportedError
end
end
FailedToGetChunkError = Class.new(StandardError)
def initialize(**params)
super
end
def get
response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
request = Net::HTTP::Get.new(uri)
request.set_range(chunk_start, buffer_size)
http.request(request)
end
raise FailedToGetChunkError unless response.code == '200' || response.code == '206'
response.body.force_encoding(Encoding::BINARY)
end
def size
raise NotImplementedError
end
def write!(data)
raise NotImplementedError
end
def truncate!(offset)
raise NotImplementedError
end
def delete!
raise NotImplementedError
end
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module File
module ChunkStore
class Redis < Base
class << self
def open(job_id, chunk_index, **params)
raise ArgumentError unless job_id && chunk_index
yield self.class.new(self.buffer_key(job_id, chunk_index), params)
end
def exist?(job_id, chunk_index)
Gitlab::Redis::Cache.with do |redis|
redis.exists(self.buffer_key(job_id, chunk_index))
end
end
def chunks_count(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.keys(buffer_key(job_id, '*')).count
end
end
def chunks_size(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.keys(buffer_key(job_id, '*')).inject(0) do |sum, key|
sum + redis.strlen(key)
end
end
end
def buffer_key(job_id, chunk_index)
"live_trace_buffer:#{job_id}:#{chunk_index}"
end
end
attr_reader :buffer_key
def initialize(buffer_key, **params)
super
@buffer_key = buffer_key
end
def get
Gitlab::Redis::Cache.with do |redis|
redis.get(buffer_key)
end
end
def size
Gitlab::Redis::Cache.with do |redis|
redis.strlen(buffer_key)
end
end
def write!(data)
Gitlab::Redis::Cache.with do |redis|
redis.set(buffer_key, data)
end
end
def truncate!(offset)
Gitlab::Redis::Cache.with do |redis|
truncated_data = redis.getrange(buffer_key, 0, offset)
redis.set(buffer_key, truncated_data)
end
end
def delete!
Gitlab::Redis::Cache.with do |redis|
redis.del(buffer_key)
end
end
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module File
class LiveTrace < ChunkedIO
BUFFER_SIZE = 128.kilobytes
class << self
def open(job_id, mode)
stream = self.class.new(job_id, mode)
yield stream
stream.close
end
def exist?(job_id)
ChunkStores::Redis.chunks_count(job_id) > 0 ||
ChunkStores::Database.chunks_count(job_id) > 0
end
end
def initialize(job_id, mode)
super(job_id, calculate_size, mode)
end
def write(data)
raise NotImplementedError, 'Overwrite is not supported' unless tell == size
super(data) do |store|
if store.filled?
# Rotate data from redis to database
ChunkStores::Database.open(job_id, chunk_index, params_for_store) do |to_store|
to_store.write!(store.get)
end
store.delete!
end
end
end
private
def calculate_size
ChunkStores::Redis.chunks_size(job_id) +
ChunkStores::Database.chunks_size(job_id)
end
def chunk_store
if last_chunk?
ChunkStores::Redis
else
ChunkStores::Database
end
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
module File
class Remote < ChunkedIO
BUFFER_SIZE = 128.kilobytes
class << self
def open(job_id, url, size, mode)
stream = self.class.new(job_id, mode)
yield stream
stream.close
end
end
InvalidURLError = Class.new(StandardError)
attr_reader :uri
def initialize(job_id, url, size, mode)
raise InvalidURLError unless ::Gitlab::UrlSanitizer.valid?(url)
@uri = URI(url)
super(job_id, size, mode)
end
def write(data)
raise NotImplementedError
end
def truncate(offset)
raise NotImplementedError
end
def flush
raise NotImplementedError
end
private
def chunk_store
ChunkStores::ObjectStorage
end
def params_for_store
super.merge( { uri: uri } )
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
class LiveTrace < ChunkedIO
BUFFER_SIZE = 128.kilobytes
class << self
def exist?(job_id)
ChunkStores::Redis.chunks_count(job_id) > 0 ||
ChunkStores::Database.chunks_count(job_id) > 0
end
end
def initialize(job_id, mode)
super(job_id, calculate_size, mode)
end
def write(data)
raise NotImplementedError, 'Overwrite is not supported' unless tell == size
super(data) do |store|
if store.filled?
# Rotate data from redis to database
ChunkStores::Database.open(job_id, chunk_index, params_for_store) do |to_store|
to_store.write!(store.get)
end
store.delete!
end
end
end
private
def calculate_size
ChunkStores::Redis.chunks_size(job_id) +
ChunkStores::Database.chunks_size(job_id)
end
def chunk_store
if last_chunk?
ChunkStores::Redis
else
ChunkStores::Database
end
end
end
end
end
end
module Gitlab
module Ci
class Trace
class Remote < ChunkedIO
BUFFER_SIZE = 128.kilobytes
NoSupportError = Class.new(StandardError)
attr_reader :uri
def initialize(job_id, url, size, mode)
@uri = URI(url)
super(job_id, size, mode)
end
def write(data)
raise NoSupportError
end
def truncate(offset)
raise NoSupportError
end
def flush
raise NoSupportError
end
private
def chunk_store
ChunkStores::Http
end
def params_for_store
super.merge( { uri: uri } )
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment