Commit bc00803a authored by Kamil Trzciński's avatar Kamil Trzciński

Access metadata directly from Object Storage

Previously we would pull the file, now, we just stream-it as needed from Object Storage
parent 67157de1
...@@ -437,9 +437,9 @@ module Ci ...@@ -437,9 +437,9 @@ module Ci
end end
def artifacts_metadata_entry(path, **options) def artifacts_metadata_entry(path, **options)
artifacts_metadata.use_file do |metadata_path| artifacts_metadata.open do |metadata_stream|
metadata = Gitlab::Ci::Build::Artifacts::Metadata.new( metadata = Gitlab::Ci::Build::Artifacts::Metadata.new(
metadata_path, metadata_stream,
path, path,
**options) **options)
......
...@@ -71,6 +71,23 @@ class GitlabUploader < CarrierWave::Uploader::Base ...@@ -71,6 +71,23 @@ class GitlabUploader < CarrierWave::Uploader::Base
File.join('/', self.class.base_dir, dynamic_segment, filename) File.join('/', self.class.base_dir, dynamic_segment, filename)
end end
def open
stream = if file_storage?
File.open(path, "rb") if path
else
::Gitlab::HttpIO.new(url, cached_size) if url
end
return unless stream
return stream unless block_given?
begin
yield(stream)
ensure
stream.close
end
end
private private
# Designed to be overridden by child uploaders that have a dynamic path # Designed to be overridden by child uploaders that have a dynamic path
......
...@@ -18,14 +18,6 @@ class JobArtifactUploader < GitlabUploader ...@@ -18,14 +18,6 @@ class JobArtifactUploader < GitlabUploader
dynamic_segment dynamic_segment
end end
def open
if file_storage?
File.open(path, "rb") if path
else
::Gitlab::Ci::Trace::HttpIO.new(url, cached_size) if url
end
end
private private
def dynamic_segment def dynamic_segment
......
---
title: Access metadata directly from Object Storage
merge_request:
author:
type: performance
...@@ -7,14 +7,15 @@ module Gitlab ...@@ -7,14 +7,15 @@ module Gitlab
module Artifacts module Artifacts
class Metadata class Metadata
ParserError = Class.new(StandardError) ParserError = Class.new(StandardError)
InvalidStreamError = Class.new(StandardError)
VERSION_PATTERN = /^[\w\s]+(\d+\.\d+\.\d+)/ VERSION_PATTERN = /^[\w\s]+(\d+\.\d+\.\d+)/
INVALID_PATH_PATTERN = %r{(^\.?\.?/)|(/\.?\.?/)} INVALID_PATH_PATTERN = %r{(^\.?\.?/)|(/\.?\.?/)}
attr_reader :file, :path, :full_version attr_reader :stream, :path, :full_version
def initialize(file, path, **opts) def initialize(stream, path, **opts)
@file, @path, @opts = file, path, opts @stream, @path, @opts = stream, path, opts
@full_version = read_version @full_version = read_version
end end
...@@ -103,7 +104,17 @@ module Gitlab ...@@ -103,7 +104,17 @@ module Gitlab
end end
def gzip(&block) def gzip(&block)
Zlib::GzipReader.open(@file, &block) raise InvalidStreamError, "Invalid stream" unless @stream
# restart gzip reading
@stream.seek(0)
gz = Zlib::GzipReader.new(@stream)
yield(gz)
rescue Zlib::Error => e
raise InvalidStreamError, e.message
ensure
gz&.finish
end end
end end
end end
......
##
# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html)
# source: https://gitlab.com/snippets/1685610
module Gitlab
module Ci
class Trace
class HttpIO
BUFFER_SIZE = 128.kilobytes
InvalidURLError = Class.new(StandardError)
FailedToGetChunkError = Class.new(StandardError)
attr_reader :uri, :size
attr_reader :tell
attr_reader :chunk, :chunk_range
alias_method :pos, :tell
def initialize(url, size)
raise InvalidURLError unless ::Gitlab::UrlSanitizer.valid?(url)
@uri = URI(url)
@size = size
@tell = 0
end
def close
# no-op
end
def binmode
# no-op
end
def binmode?
true
end
def path
nil
end
def url
@uri.to_s
end
def seek(pos, where = IO::SEEK_SET)
new_pos =
case where
when IO::SEEK_END
size + pos
when IO::SEEK_SET
pos
when IO::SEEK_CUR
tell + pos
else
-1
end
raise 'new position is outside of file' if new_pos < 0 || new_pos > size
@tell = new_pos
end
def eof?
tell == size
end
def each_line
until eof?
line = readline
break if line.nil?
yield(line)
end
end
def read(length = nil, outbuf = "")
out = ""
length ||= size - tell
until length <= 0 || eof?
data = get_chunk
break if data.empty?
chunk_bytes = [BUFFER_SIZE - chunk_offset, length].min
chunk_data = data.byteslice(0, chunk_bytes)
out << chunk_data
@tell += chunk_data.bytesize
length -= chunk_data.bytesize
end
# If outbuf is passed, we put the output into the buffer. This supports IO.copy_stream functionality
if outbuf
outbuf.slice!(0, outbuf.bytesize)
outbuf << out
end
out
end
def readline
out = ""
until eof?
data = get_chunk
new_line = data.index("\n")
if !new_line.nil?
out << data[0..new_line]
@tell += new_line + 1
break
else
out << data
@tell += data.bytesize
end
end
out
end
def write(data)
raise NotImplementedError
end
def truncate(offset)
raise NotImplementedError
end
def flush
raise NotImplementedError
end
def present?
true
end
private
##
# The below methods are not implemented in IO class
#
def in_range?
@chunk_range&.include?(tell)
end
def get_chunk
unless in_range?
response = Net::HTTP.start(uri.hostname, uri.port, proxy_from_env: true, use_ssl: uri.scheme == 'https') do |http|
http.request(request)
end
raise FailedToGetChunkError unless response.code == '200' || response.code == '206'
@chunk = response.body.force_encoding(Encoding::BINARY)
@chunk_range = response.content_range
##
# Note: If provider does not return content_range, then we set it as we requested
# Provider: minio
# - When the file size is larger than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# - When the file size is smaller than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# Provider: AWS
# - When the file size is larger than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# - When the file size is smaller than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# Provider: GCS
# - When the file size is larger than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# - When the file size is smaller than requested Content-range, the Content-range is included in responces with Net::HTTPOK 200
@chunk_range ||= (chunk_start...(chunk_start + @chunk.bytesize))
end
@chunk[chunk_offset..BUFFER_SIZE]
end
def request
Net::HTTP::Get.new(uri).tap do |request|
request.set_range(chunk_start, BUFFER_SIZE)
end
end
def chunk_offset
tell % BUFFER_SIZE
end
def chunk_start
(tell / BUFFER_SIZE) * BUFFER_SIZE
end
def chunk_end
[chunk_start + BUFFER_SIZE, size].min
end
end
end
end
end
##
# This class is compatible with IO class (https://ruby-doc.org/core-2.3.1/IO.html)
# source: https://gitlab.com/snippets/1685610
module Gitlab
class HttpIO
BUFFER_SIZE = 128.kilobytes
InvalidURLError = Class.new(StandardError)
FailedToGetChunkError = Class.new(StandardError)
attr_reader :uri, :size
attr_reader :tell
attr_reader :chunk, :chunk_range
alias_method :pos, :tell
def initialize(url, size)
raise InvalidURLError unless ::Gitlab::UrlSanitizer.valid?(url)
@uri = URI(url)
@size = size
@tell = 0
end
def close
# no-op
end
def binmode
# no-op
end
def binmode?
true
end
def path
nil
end
def url
@uri.to_s
end
def seek(pos, where = IO::SEEK_SET)
new_pos =
case where
when IO::SEEK_END
size + pos
when IO::SEEK_SET
pos
when IO::SEEK_CUR
tell + pos
else
-1
end
raise 'new position is outside of file' if new_pos < 0 || new_pos > size
@tell = new_pos
end
def eof?
tell == size
end
def each_line
until eof?
line = readline
break if line.nil?
yield(line)
end
end
def read(length = nil, outbuf = "")
out = ""
length ||= size - tell
until length <= 0 || eof?
data = get_chunk
break if data.empty?
chunk_bytes = [BUFFER_SIZE - chunk_offset, length].min
chunk_data = data.byteslice(0, chunk_bytes)
out << chunk_data
@tell += chunk_data.bytesize
length -= chunk_data.bytesize
end
# If outbuf is passed, we put the output into the buffer. This supports IO.copy_stream functionality
if outbuf
outbuf.slice!(0, outbuf.bytesize)
outbuf << out
end
out
end
def readline
out = ""
until eof?
data = get_chunk
new_line = data.index("\n")
if !new_line.nil?
out << data[0..new_line]
@tell += new_line + 1
break
else
out << data
@tell += data.bytesize
end
end
out
end
def write(data)
raise NotImplementedError
end
def truncate(offset)
raise NotImplementedError
end
def flush
raise NotImplementedError
end
def present?
true
end
private
##
# The below methods are not implemented in IO class
#
def in_range?
@chunk_range&.include?(tell)
end
def get_chunk
unless in_range?
response = Net::HTTP.start(uri.hostname, uri.port, proxy_from_env: true, use_ssl: uri.scheme == 'https') do |http|
http.request(request)
end
raise FailedToGetChunkError unless response.code == '200' || response.code == '206'
@chunk = response.body.force_encoding(Encoding::BINARY)
@chunk_range = response.content_range
##
# Note: If provider does not return content_range, then we set it as we requested
# Provider: minio
# - When the file size is larger than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# - When the file size is smaller than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# Provider: AWS
# - When the file size is larger than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# - When the file size is smaller than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# Provider: GCS
# - When the file size is larger than requested Content-range, the Content-range is included in responces with Net::HTTPPartialContent 206
# - When the file size is smaller than requested Content-range, the Content-range is included in responces with Net::HTTPOK 200
@chunk_range ||= (chunk_start...(chunk_start + @chunk.bytesize))
end
@chunk[chunk_offset..BUFFER_SIZE]
end
def request
Net::HTTP::Get.new(uri).tap do |request|
request.set_range(chunk_start, BUFFER_SIZE)
end
end
def chunk_offset
tell % BUFFER_SIZE
end
def chunk_start
(tell / BUFFER_SIZE) * BUFFER_SIZE
end
def chunk_end
[chunk_start + BUFFER_SIZE, size].min
end
end
end
...@@ -245,7 +245,7 @@ describe Projects::JobsController, :clean_gitlab_redis_shared_state do ...@@ -245,7 +245,7 @@ describe Projects::JobsController, :clean_gitlab_redis_shared_state do
end end
it 'returns a trace' do it 'returns a trace' do
expect { get_trace }.to raise_error(Gitlab::Ci::Trace::HttpIO::FailedToGetChunkError) expect { get_trace }.to raise_error(Gitlab::HttpIO::FailedToGetChunkError)
end end
end end
end end
......
...@@ -2,13 +2,21 @@ require 'spec_helper' ...@@ -2,13 +2,21 @@ require 'spec_helper'
describe Gitlab::Ci::Build::Artifacts::Metadata do describe Gitlab::Ci::Build::Artifacts::Metadata do
def metadata(path = '', **opts) def metadata(path = '', **opts)
described_class.new(metadata_file_path, path, **opts) described_class.new(metadata_file_stream, path, **opts)
end end
let(:metadata_file_path) do let(:metadata_file_path) do
Rails.root + 'spec/fixtures/ci_build_artifacts_metadata.gz' Rails.root + 'spec/fixtures/ci_build_artifacts_metadata.gz'
end end
let(:metadata_file_stream) do
File.open(metadata_file_path) if metadata_file_path
end
after do
metadata_file_stream&.close
end
context 'metadata file exists' do context 'metadata file exists' do
describe '#find_entries! empty string' do describe '#find_entries! empty string' do
subject { metadata('').find_entries! } subject { metadata('').find_entries! }
...@@ -86,11 +94,21 @@ describe Gitlab::Ci::Build::Artifacts::Metadata do ...@@ -86,11 +94,21 @@ describe Gitlab::Ci::Build::Artifacts::Metadata do
end end
context 'metadata file does not exist' do context 'metadata file does not exist' do
let(:metadata_file_path) { '' } let(:metadata_file_path) { nil }
describe '#find_entries!' do
it 'raises error' do
expect { metadata.find_entries! }.to raise_error(described_class::InvalidStreamError, /Invalid stream/)
end
end
end
context 'metadata file is invalid' do
let(:metadata_file_path) { Rails.root + 'spec/fixtures/ci_build_artifacts.zip' }
describe '#find_entries!' do describe '#find_entries!' do
it 'raises error' do it 'raises error' do
expect { metadata.find_entries! }.to raise_error(Errno::ENOENT) expect { metadata.find_entries! }.to raise_error(described_class::InvalidStreamError, /not in gzip format/)
end end
end end
end end
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Ci::Trace::HttpIO do describe Gitlab::HttpIO do
include HttpIOHelpers include HttpIOHelpers
let(:http_io) { described_class.new(url, size) } let(:http_io) { described_class.new(url, size) }
......
...@@ -54,12 +54,12 @@ module HttpIOHelpers ...@@ -54,12 +54,12 @@ module HttpIOHelpers
def set_smaller_buffer_size_than(file_size) def set_smaller_buffer_size_than(file_size)
blocks = (file_size / 128) blocks = (file_size / 128)
new_size = (blocks / 2) * 128 new_size = (blocks / 2) * 128
stub_const("Gitlab::Ci::Trace::HttpIO::BUFFER_SIZE", new_size) stub_const("Gitlab::HttpIO::BUFFER_SIZE", new_size)
end end
def set_larger_buffer_size_than(file_size) def set_larger_buffer_size_than(file_size)
blocks = (file_size / 128) blocks = (file_size / 128)
new_size = (blocks * 2) * 128 new_size = (blocks * 2) * 128
stub_const("Gitlab::Ci::Trace::HttpIO::BUFFER_SIZE", new_size) stub_const("Gitlab::HttpIO::BUFFER_SIZE", new_size)
end end
end end
...@@ -55,7 +55,7 @@ describe JobArtifactUploader do ...@@ -55,7 +55,7 @@ describe JobArtifactUploader do
end end
it 'returns http io stream' do it 'returns http io stream' do
is_expected.to be_a(Gitlab::Ci::Trace::HttpIO) is_expected.to be_a(Gitlab::HttpIO)
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment