Commit 0971f301 authored by Shinya Maeda's avatar Shinya Maeda

Add new concerns

parent d1632da8
......@@ -61,7 +61,7 @@ module Gitlab
stream = Gitlab::Ci::Trace::Stream.new do
if trace_artifact
trace_artifact.open
elsif Feature.enabled?('ci_enable_live_trace') && ChunkedFile::LiveTrace.exists?(job.id)
elsif ChunkedFile::LiveTrace.exist?(job.id)
ChunkedFile::LiveTrace.new(job.id, "rb")
elsif current_path
File.open(current_path, "rb")
......@@ -109,10 +109,10 @@ module Gitlab
raise ArchiveError, 'Already archived' if trace_artifact
raise ArchiveError, 'Job is not finished yet' unless job.complete?
if Feature.enabled?('ci_enable_live_trace') && ChunkedFile::LiveTrace.exists?(job.id)
if ChunkedFile::LiveTrace.exist?(job.id)
ChunkedFile::LiveTrace.open(job.id, "wb") do |stream|
archive_stream!(stream)
stream.truncate(0)
stream.delete
end
elsif current_path
File.open(current_path) do |stream|
......
......@@ -29,6 +29,10 @@ module Gitlab
::Ci::JobTraceChunk.where(job_id: job_id).pluck('data')
.inject(0) { |sum, data| sum + data.length }
end
def delete_all(job_id)
::Ci::JobTraceChunk.destroy_all(job_id: job_id)
end
end
attr_reader :job_trace_chunk
......@@ -67,9 +71,7 @@ module Gitlab
end
def truncate!(offset)
raise NotImplementedError, 'Partial truncate is not supported' unless offset == 0
delete!
raise NotImplementedError
end
def delete!
......
......@@ -38,6 +38,14 @@ module Gitlab
end
end
def delete_all(job_id)
Gitlab::Redis::Cache.with do |redis|
redis.scan_each(:match => buffer_key(job_id, '?')) do |key|
redis.del(key)
end
end
end
def buffer_key(job_id, chunk_index)
"live_trace_buffer:#{job_id}:#{chunk_index}"
end
......@@ -87,7 +95,6 @@ module Gitlab
puts "#{self.class.name} - #{__callee__}: offset: #{offset.inspect} params[:chunk_index]: #{params[:chunk_index]}"
Gitlab::Redis::Cache.with do |redis|
return 0 unless redis.exists(buffer_key)
return delete! if offset == 0
truncated_data = redis.getrange(buffer_key, 0, offset)
redis.set(buffer_key, truncated_data)
......
......@@ -8,43 +8,30 @@ module Gitlab
class Trace
module ChunkedFile
class ChunkedIO
class << self
def open(*args)
stream = self.new(*args)
yield stream
ensure
stream&.close
end
end
WriteError = Class.new(StandardError)
FailedToGetChunkError = Class.new(StandardError)
extend ChunkedFile::Concerns::Opener
include ChunkedFile::Concerns::Errors
include ChunkedFile::Concerns::Hooks
include ChunkedFile::Concerns::Callbacks
prepend ChunkedFile::Concerns::Permissions
attr_reader :size
attr_reader :tell
attr_reader :chunk, :chunk_range
attr_reader :write_lock_uuid
attr_reader :job_id
attr_reader :mode
alias_method :pos, :tell
def initialize(job_id, size, mode)
def initialize(job_id, size, mode = 'rb')
@size = size
@tell = 0
@job_id = job_id
@mode = mode
if /(w|a)/ =~ mode
@write_lock_uuid = Gitlab::ExclusiveLease.new(write_lock_key, timeout: 1.hour.to_i).try_obtain
raise WriteError, 'Already opened by another process' unless write_lock_uuid
seek(0, IO::SEEK_END) if /a/ =~ mode
end
raise NotImplementedError, "Mode 'w' is not supported" if mode.include?('w')
end
def close
Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid) if write_lock_uuid
end
def binmode
......@@ -55,20 +42,20 @@ module Gitlab
true
end
def seek(pos, where = IO::SEEK_SET)
def seek(amount, where = IO::SEEK_SET)
new_pos =
case where
when IO::SEEK_END
size + pos
size + amount
when IO::SEEK_SET
pos
amount
when IO::SEEK_CUR
tell + pos
tell + amount
else
-1
end
raise 'new position is outside of file' if new_pos < 0 || new_pos > size
raise ArgumentError, 'new position is outside of file' if new_pos < 0 || new_pos > size
@tell = new_pos
end
......@@ -122,42 +109,18 @@ module Gitlab
out
end
def write(data, &block)
raise WriteError, 'Could not write without lock' unless write_lock_uuid
raise WriteError, 'Could not write empty data' unless data.present?
_data = data.dup
prev_tell = tell
def write(data)
raise ArgumentError, 'Could not write empty data' unless data.present?
until _data.empty?
writable_space = buffer_size - chunk_offset
writing_size = [writable_space, _data.length].min
written_size = write_chunk!(_data.slice!(0...writing_size), &block)
@tell += written_size
@size = [tell, size].max
if mode.include?('w')
write_as_overwrite(data)
elsif mode.include?('a')
write_as_append(data)
end
tell - prev_tell
end
def truncate(offset, &block)
raise WriteError, 'Could not write without lock' unless write_lock_uuid
raise WriteError, 'Offset is out of bound' if offset > size || offset < 0
@tell = size - 1
until size == offset
truncatable_space = size - chunk_start
_chunk_offset = (offset <= chunk_start) ? 0 : offset % buffer_size
removed_size = truncate_chunk!(_chunk_offset, &block)
@tell -= removed_size
@size -= removed_size
end
@tell = [tell, 0].max
@size = [size, 0].max
def truncate(offset)
raise NotImplementedError
end
def flush
......@@ -178,9 +141,6 @@ module Gitlab
unless in_range?
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
@chunk = store.get
raise FailedToGetChunkError unless chunk && chunk.length > 0
@chunk_range = (chunk_start...(chunk_start + chunk.length))
end
end
......@@ -188,8 +148,33 @@ module Gitlab
@chunk[chunk_offset..buffer_size]
end
def write_chunk!(data, &block)
def write_as_overwrite(data)
raise NotImplementedError, "Overwrite is not supported"
end
def write_as_append(data)
@tell = size
data_size = data.size
new_tell = tell + data_size
data_offset = 0
until tell == new_tell
writable_size = buffer_size - chunk_offset
writable_data = data[data_offset...(data_offset + writable_size)]
written_size = write_chunk(writable_data)
data_offset += written_size
@tell += written_size
@size = [tell, size].max
end
data_size
end
def write_chunk(data)
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
with_callbacks(:write_chunk, store) do
written_size = if buffer_size == data.length
store.write!(data)
else
......@@ -198,22 +183,21 @@ module Gitlab
raise WriteError, 'Written size mismatch' unless data.length == written_size
block.call(store) if block_given?
written_size
end
end
end
def truncate_chunk!(offset, &block)
def truncate_chunk(offset)
chunk_store.open(job_id, chunk_index, params_for_store) do |store|
with_callbacks(:truncate_chunk, store) do
removed_size = store.size - offset
store.truncate!(offset)
block.call(store) if block_given?
removed_size
end
end
end
def params_for_store(c_index = chunk_index)
{
......@@ -240,19 +224,15 @@ module Gitlab
end
def chunks_count
(size / buffer_size) + (has_extra? ? 1 : 0)
(size / buffer_size)
end
def has_extra?
(size % buffer_size) > 0
def first_chunk?
chunk_index == 0
end
def last_chunk?
chunks_count == 0 || chunk_index == (chunks_count - 1) || chunk_index == chunks_count
end
def write_lock_key
"live_trace:operation:write:#{job_id}"
chunks_count == 0 || chunk_index == (chunks_count - 1)
end
def chunk_store
......
module Gitlab
  module Ci
    class Trace
      module ChunkedFile
        module Concerns
          # Registers named before/after callbacks (e.g. :write_chunk,
          # :truncate_chunk) that are invoked around a unit of work via
          # #with_callbacks.
          module Callbacks
            extend ActiveSupport::Concern

            included do
              class_attribute :_before_callbacks, :_after_callbacks,
                :instance_writer => false

              self._before_callbacks = Hash.new []
              self._after_callbacks = Hash.new []
            end

            # Runs every before-callback registered for +kind+, then the
            # block, then every after-callback.
            #
            # Returns the block's result. (Previously this returned the
            # result of iterating the after-callbacks, but callers such as
            # write_chunk/truncate_chunk rely on the yielded value — e.g. a
            # written/removed byte count.)
            def with_callbacks(kind, *args)
              self.class._before_callbacks[kind].each { |c| send c, *args }
              result = yield
              self.class._after_callbacks[kind].each { |c| send c, *args }
              result
            end

            class_methods do
              # Appends +callback+ (a method name) to run before +kind+.
              # Reassigns via merge so subclasses do not mutate the parent's
              # registry in place.
              def before_callback(kind, callback)
                self._before_callbacks = self._before_callbacks.
                  merge kind => _before_callbacks[kind] + [callback]
              end

              # Appends +callback+ (a method name) to run after +kind+.
              def after_callback(kind, callback)
                self._after_callbacks = self._after_callbacks.
                  merge kind => _after_callbacks[kind] + [callback]
              end
            end
          end
        end
      end
    end
  end
end
module Gitlab
  module Ci
    class Trace
      module ChunkedFile
        module Concerns
          # Shared error classes for chunked-file implementations. Including
          # classes resolve these constants through ancestor lookup
          # (e.g. `raise WriteError` inside ChunkedIO).
          #
          # The constants are defined directly in the module body: assigning
          # them inside `included do` creates them lexically on this module
          # anyway, and re-runs the assignment for every includer, emitting
          # "already initialized constant" warnings from the second include
          # onwards.
          module Errors
            extend ActiveSupport::Concern

            # Raised when a write cannot be performed (e.g. size mismatch).
            WriteError = Class.new(StandardError)

            # Raised when a chunk cannot be fetched from its store.
            FailedToGetChunkError = Class.new(StandardError)
          end
        end
      end
    end
  end
end
# Wraps methods of the including class so that registered hook methods run
# before and after each call. Hooks are registered per method name via
# `before_method`/`after_method`; `method_added` is meant to intercept each
# newly defined method and re-define it with the hooks applied.
module Gitlab
module Ci
class Trace
module ChunkedFile
module Concerns
module Hooks
extend ActiveSupport::Concern
included do
# Per-method registries: method name => array of hook method names.
# `Hash.new []` gives a shared default, but registration always
# reassigns via `merge`/`+`, so the default array is never mutated.
class_attribute :_before_methods, :_after_methods,
:instance_writer => false
self._before_methods = Hash.new []
self._after_methods = Hash.new []
end
class_methods do
# Registers +callback+ (a method name) to run before +kind+.
def before_method(kind, callback)
self._before_methods = self._before_methods.
merge kind => _before_methods[kind] + [callback]
end
# Registers +callback+ (a method name) to run after +kind+.
def after_method(kind, callback)
self._after_methods = self._after_methods.
merge kind => _after_methods[kind] + [callback]
end
end
# NOTE(review): Ruby invokes the `method_added` hook on the *class* when an
# instance method is defined, but this definition adds it as an instance
# method of the includer (it sits outside `class_methods do`), and its body
# reads `self.class._before_methods` as if `self` were an instance. As
# written the hook will not fire on method definition — TODO confirm whether
# this should live in `class_methods`.
def method_added(method_name)
# NOTE(review): `_before_methods.values` is an array of *arrays* of hook
# names, so `include?(method_name)` compares each array against a symbol
# and is presumably always false; `values.flatten.include?` looks intended
# — verify before relying on hook methods being excluded from re-wrapping.
return if self.class._before_methods.values.include?(method_name)
return if self.class._after_methods.values.include?(method_name)
# Guard against re-wrapping a method we already redefined below.
return if hooked_methods.include?(method_name)
add_hooks_to(method_name)
end
private
# Memoized list of method names that have already been wrapped.
def hooked_methods
@hooked_methods ||= []
end
# Re-defines +method_name+ so registered before-hooks run first, then the
# original implementation, then after-hooks; the original return value is
# preserved via `tap`.
# NOTE(review): `instance_method`/`define_method` are class-level calls,
# consistent with this being intended as class-side code (see note on
# `method_added` above).
def add_hooks_to(method_name)
hooked_methods << method_name
original_method = instance_method(method_name)
# re-define the method, but notice how we reference the original
# method definition
define_method(method_name) do |*args, &block|
self.class._before_methods[method_name].each { |hook| method(hook).call }
# now invoke the original method
original_method.bind(self).call(*args, &block).tap do
self.class._after_methods[method_name].each { |hook| method(hook).call }
end
end
end
end
end
end
end
end
end
module Gitlab
  module Ci
    class Trace
      module ChunkedFile
        module Concerns
          # Adds a block-style factory (Class.open) that guarantees the
          # created instance is closed, mirroring File.open semantics.
          module Opener
            extend ActiveSupport::Concern

            class_methods do
              # Builds an instance from +args+, yields it to the block, and
              # always closes it afterwards — the method-level ensure also
              # covers the case where the block raises. When construction
              # itself raises, +instance+ is nil and the safe-navigation
              # close is a no-op.
              def open(*args)
                instance = new(*args)
                yield instance
              ensure
                instance&.close
              end
            end
          end
        end
      end
    end
  end
end
module Gitlab
  module Ci
    class Trace
      module ChunkedFile
        module Concerns
          # Enforces file-mode semantics ('r'/'w'/'a'/'+') and an exclusive
          # write lock (via Gitlab::ExclusiveLease) on chunked files. Designed
          # to be prepended so #initialize/#close wrap the host class's own.
          module Permissions
            extend ActiveSupport::Concern

            # Defined in the module body (not inside `included do`) so the
            # constant is created exactly once; includers still resolve it
            # through ancestor lookup.
            PermissionError = Class.new(StandardError)

            included do
              attr_reader :write_lock_uuid

              # mode checks
              before_method :read, :can_read!
              before_method :readline, :can_read!
              before_method :each_line, :can_read!
              before_method :write, :can_write!
              before_method :truncate, :can_write!

              # write_lock
              before_method :write, :check_lock!
              before_method :truncate, :check_lock!
            end

            # Obtains an exclusive write lock before the underlying
            # initializer runs when the file is opened for writing/appending.
            #
            # Raises PermissionError if another process already holds it.
            #
            # NOTE(review): write_lock_key reads the `job_id` reader, which is
            # only assigned once `super` runs — so the lock key is presumably
            # built with a nil job_id here; consider using the local
            # parameter. TODO confirm against the host class.
            def initialize(job_id, size, mode = 'rb')
              if /(w|a)/ =~ mode
                @write_lock_uuid = Gitlab::ExclusiveLease
                  .new(write_lock_key, timeout: 1.hour.to_i).try_obtain

                raise PermissionError, 'Already opened by another process' unless write_lock_uuid
              end

              super
            end

            # Releases the write lock (if held), then delegates to the host
            # class's #close.
            def close
              if write_lock_uuid
                Gitlab::ExclusiveLease.cancel(write_lock_key, write_lock_uuid)
              end

              super
            end

            def check_lock!
              raise PermissionError, 'Could not write without lock' unless write_lock_uuid
            end

            # 'r' and '+' modes allow reading. The '+' must be escaped:
            # the unescaped /(r|+)/ is a RegexpError ("target of repeat
            # operator is not specified").
            def can_read!
              raise IOError, 'not opened for reading' unless /(r|\+)/ =~ mode
            end

            def can_write!
              raise IOError, 'not opened for writing' unless /(w|a)/ =~ mode
            end

            def write_lock_key
              "live_trace:operation:write:#{job_id}"
            end
          end
        end
      end
    end
  end
end
......@@ -10,40 +10,36 @@ module Gitlab
end
end
after_callback :write_chunk, :stash_to_database
def initialize(job_id, mode)
super(job_id, calculate_size(job_id), mode)
end
def write(data)
raise NotImplementedError, 'Overwrite is not supported' unless tell == size
super(data) do |store|
if store.filled?
def stash_to_database(store)
# Once data is filled into redis, move the data to database
if store.filled? &&
ChunkStore::Database.open(job_id, chunk_index, params_for_store) do |to_store|
to_store.write!(store.get)
store.delete!
end
end
end
end
# Efficient process than iterating each
def truncate(offset)
super(offset) do |store|
next if chunk_index == 0
prev_chunk_index = chunk_index - 1
if ChunkStore::Database.exist?(job_id, prev_chunk_index)
# Swap data from Database to Redis to truncate any size than buffer_size
ChunkStore::Database.open(job_id, prev_chunk_index, params_for_store(prev_chunk_index)) do |from_store|
ChunkStore::Redis.open(job_id, prev_chunk_index, params_for_store(prev_chunk_index)) do |to_store|
to_store.write!(from_store.get)
from_store.delete!
end
end
if truncate == 0
self.delete_all(job_id)
elsif offset == size
# no-op
else
raise NotImplementedError, 'Unexpected operation'
end
end
def delete
ChunkStores::Redis.delete_all(job_id)
ChunkStores::Database.delete_all(job_id)
end
private
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment