Commit 0f7a3275 authored by George Koltsov's avatar George Koltsov

Add Bulk Import file download & decompression services

  - Bulk Import using ndjson requires files download
  and decompression. For this reason, add 2 new services
  that are responsible for downloading a gzipped ndjson
  file from source GitLab instance and decompress it
  before importing
parent 47a16c30
# frozen_string_literal: true
module BulkImports
class FileDecompressionService
include Gitlab::ImportExport::CommandLineUtil
ServiceError = Class.new(StandardError)
def initialize(dir:, filename:)
@dir = dir
@filename = filename
@filepath = File.join(@dir, @filename)
@decompressed_filename = File.basename(@filename, '.gz')
@decompressed_filepath = File.join(@dir, @decompressed_filename)
end
def execute
validate_dir
validate_decompressed_file_size if Feature.enabled?(:validate_import_decompressed_archive_size)
validate_symlink(filepath)
decompress_file
validate_symlink(decompressed_filepath)
filepath
rescue StandardError => e
File.delete(filepath) if File.exist?(filepath)
File.delete(decompressed_filepath) if File.exist?(decompressed_filepath)
raise e
end
private
attr_reader :dir, :filename, :filepath, :decompressed_filename, :decompressed_filepath
def validate_dir
raise(ServiceError, 'Invalid target directory') unless dir.start_with?(Dir.tmpdir)
end
def validate_decompressed_file_size
raise(ServiceError, 'File decompression error') unless size_validator.valid?
end
def validate_symlink(filepath)
raise(ServiceError, 'Invalid file') if File.lstat(filepath).symlink?
end
def decompress_file
gunzip(dir: dir, filename: filename)
end
def size_validator
@size_validator ||= Gitlab::ImportExport::DecompressedArchiveSizeValidator.new(archive_path: filepath)
end
end
end
# frozen_string_literal: true
module BulkImports
class FileDownloadService
FILE_SIZE_LIMIT = 5.gigabytes
ALLOWED_CONTENT_TYPES = ['application/octet-stream'].freeze
ServiceError = Class.new(StandardError)
def initialize(configuration:, relative_url:, dir:, filename:)
@configuration = configuration
@relative_url = relative_url
@filename = filename
@dir = dir
@filepath = File.join(@dir, @filename)
end
def execute
validate_dir
validate_url
validate_content_type
validate_content_length
download_file
validate_symlink
filepath
end
private
attr_reader :configuration, :relative_url, :dir, :filename, :filepath
def download_file
File.open(filepath, 'wb') do |file|
bytes_downloaded = 0
http_client.stream(relative_url) do |chunk|
bytes_downloaded += chunk.size
raise(ServiceError, 'Invalid downloaded file') if bytes_downloaded > FILE_SIZE_LIMIT
raise(ServiceError, "File download error #{chunk.code}") unless chunk.code == 200
file.write(chunk)
end
end
rescue StandardError => e
File.delete(filepath) if File.exist?(filepath)
raise e
end
def http_client
@http_client ||= BulkImports::Clients::Http.new(
uri: configuration.url,
token: configuration.access_token
)
end
def allow_local_requests?
::Gitlab::CurrentSettings.allow_local_requests_from_web_hooks_and_services?
end
def headers
@headers ||= http_client.head(relative_url).headers
end
def validate_dir
raise(ServiceError, 'Invalid target directory') unless dir.start_with?(Dir.tmpdir)
end
def validate_symlink
if File.lstat(filepath).symlink?
File.delete(filepath)
raise(ServiceError, 'Invalid downloaded file')
end
end
def validate_url
::Gitlab::UrlBlocker.validate!(
http_client.resource_url(relative_url),
allow_localhost: allow_local_requests?,
allow_local_network: allow_local_requests?,
schemes: %w(http https)
)
end
def validate_content_length
content_size = headers['content-length']
raise(ServiceError, 'Invalid content length') if content_size.blank? || content_size.to_i > FILE_SIZE_LIMIT
end
def validate_content_type
content_type = headers['content-type']
raise(ServiceError, 'Invalid content type') if content_type.blank? || ALLOWED_CONTENT_TYPES.exclude?(content_type)
end
end
end
......@@ -18,25 +18,19 @@ module BulkImports
end
def get(resource, query = {})
with_error_handling do
Gitlab::HTTP.get(
resource_url(resource),
headers: request_headers,
follow_redirects: false,
query: query.reverse_merge(request_query)
)
end
request(:get, resource, query: query.reverse_merge(request_query))
end
def post(resource, body = {})
with_error_handling do
Gitlab::HTTP.post(
resource_url(resource),
headers: request_headers,
follow_redirects: false,
body: body
)
request(:post, resource, body: body)
end
def head(resource)
request(:head, resource)
end
def stream(resource, &block)
request(:get, resource, stream_body: true, &block)
end
def each_page(method, resource, query = {}, &block)
......@@ -55,8 +49,36 @@ module BulkImports
end
end
def resource_url(resource)
Gitlab::Utils.append_path(api_url, resource)
end
private
# rubocop:disable GitlabSecurity/PublicSend
def request(method, resource, options = {}, &block)
with_error_handling do
Gitlab::HTTP.public_send(
method,
resource_url(resource),
request_options(options),
&block
)
end
end
# rubocop:enable GitlabSecurity/PublicSend
def request_options(options)
default_options.merge(options)
end
def default_options
{
headers: request_headers,
follow_redirects: false
}
end
def request_query
{
page: @page,
......@@ -88,10 +110,6 @@ module BulkImports
def api_url
Gitlab::Utils.append_path(base_uri, "/api/#{@api_version}")
end
def resource_url(resource)
Gitlab::Utils.append_path(api_url, resource)
end
end
end
end
......@@ -15,8 +15,17 @@ module Gitlab
end
def gzip(dir:, filename:)
gzip_with_options(dir: dir, filename: filename)
end
def gunzip(dir:, filename:)
gzip_with_options(dir: dir, filename: filename, options: 'd')
end
def gzip_with_options(dir:, filename:, options: nil)
filepath = File.join(dir, filename)
cmd = %W(gzip #{filepath})
cmd << "-#{options}" if options
_, status = Gitlab::Popen.popen(cmd)
......
......@@ -15,7 +15,7 @@ module Gitlab
end
def self.file_compression_error
self.new('File compression failed')
self.new('File compression/decompression failed')
end
end
end
......
......@@ -48,6 +48,7 @@ RSpec.describe BulkImports::Clients::Http do
[
'http://gitlab.example:80/api/v4/resource',
hash_including(
follow_redirects: false,
query: {
page: described_class::DEFAULT_PAGE,
per_page: described_class::DEFAULT_PER_PAGE
......@@ -118,6 +119,26 @@ RSpec.describe BulkImports::Clients::Http do
'http://gitlab.example:80/api/v4/resource',
hash_including(
body: {},
follow_redirects: false,
headers: {
'Content-Type' => 'application/json',
'Authorization' => "Bearer #{token}"
}
)
]
end
end
end
describe '#head' do
let(:method) { :head }
include_examples 'performs network request' do
let(:expected_args) do
[
'http://gitlab.example:80/api/v4/resource',
hash_including(
follow_redirects: false,
headers: {
'Content-Type' => 'application/json',
'Authorization' => "Bearer #{token}"
......@@ -127,4 +148,23 @@ RSpec.describe BulkImports::Clients::Http do
end
end
end
describe '#stream' do
it 'performs network request with stream_body option' do
expected_args = [
'http://gitlab.example:80/api/v4/resource',
hash_including(
stream_body: true,
headers: {
'Content-Type' => 'application/json',
'Authorization' => "Bearer #{token}"
}
)
]
expect(Gitlab::HTTP).to receive(:get).with(*expected_args).and_return(response_double)
subject.stream(resource)
end
end
end
......@@ -42,6 +42,8 @@ RSpec.describe Gitlab::ImportExport::CommandLineUtil do
filename = File.basename(tempfile.path)
subject.gzip(dir: path, filename: filename)
expect(File.exist?("#{tempfile.path}.gz")).to eq(true)
end
context 'when exception occurs' do
......@@ -50,4 +52,25 @@ RSpec.describe Gitlab::ImportExport::CommandLineUtil do
end
end
end
describe '#gunzip' do
it 'decompresses specified file' do
tmpdir = Dir.mktmpdir
filename = 'labels.ndjson.gz'
gz_filepath = "spec/fixtures/bulk_imports/#{filename}"
FileUtils.copy_file(gz_filepath, File.join(tmpdir, filename))
subject.gunzip(dir: tmpdir, filename: filename)
expect(File.exist?(File.join(tmpdir, 'labels.ndjson'))).to eq(true)
FileUtils.remove_entry(tmpdir)
end
context 'when exception occurs' do
it 'raises an exception' do
expect { subject.gunzip(dir: path, filename: 'test') }.to raise_error(Gitlab::ImportExport::Error)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::FileDecompressionService do
let_it_be(:tmpdir) { Dir.mktmpdir }
let_it_be(:ndjson_filename) { 'labels.ndjson' }
let_it_be(:ndjson_filepath) { File.join(tmpdir, ndjson_filename) }
let_it_be(:gz_filename) { "#{ndjson_filename}.gz" }
let_it_be(:gz_filepath) { "spec/fixtures/bulk_imports/#{gz_filename}" }
before do
FileUtils.copy_file(gz_filepath, File.join(tmpdir, gz_filename))
FileUtils.remove_entry(ndjson_filepath) if File.exist?(ndjson_filepath)
end
after(:all) do
FileUtils.remove_entry(tmpdir)
end
subject { described_class.new(dir: tmpdir, filename: gz_filename) }
describe '#execute' do
it 'decompresses specified file' do
subject.execute
expect(File.exist?(File.join(tmpdir, ndjson_filename))).to eq(true)
expect(File.open(ndjson_filepath, &:readline)).to include('title', 'description')
end
context 'when validate_import_decompressed_archive_size feature flag is enabled' do
before do
stub_feature_flags(validate_import_decompressed_archive_size: true)
end
it 'performs decompressed file size validation' do
expect_next_instance_of(Gitlab::ImportExport::DecompressedArchiveSizeValidator) do |validator|
expect(validator).to receive(:valid?).and_return(true)
end
subject.execute
end
end
context 'when validate_import_decompressed_archive_size feature flag is disabled' do
before do
stub_feature_flags(validate_import_decompressed_archive_size: false)
end
it 'does not perform decompressed file size validation' do
expect(Gitlab::ImportExport::DecompressedArchiveSizeValidator).not_to receive(:new)
subject.execute
end
end
context 'when dir is not in tmpdir' do
subject { described_class.new(dir: '/etc', filename: 'filename') }
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid target directory')
end
end
context 'when compressed file is a symlink' do
let_it_be(:symlink) { File.join(tmpdir, 'symlink.gz') }
before do
FileUtils.ln_s(File.join(tmpdir, gz_filename), symlink)
end
subject { described_class.new(dir: tmpdir, filename: 'symlink.gz') }
it 'raises an error and removes the file' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid file')
expect(File.exist?(symlink)).to eq(false)
end
end
context 'when decompressed file is a symlink' do
let_it_be(:symlink) { File.join(tmpdir, 'symlink') }
before do
FileUtils.ln_s(File.join(tmpdir, ndjson_filename), symlink)
subject.instance_variable_set(:@decompressed_filepath, symlink)
end
subject { described_class.new(dir: tmpdir, filename: gz_filename) }
it 'raises an error and removes the file' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid file')
expect(File.exist?(symlink)).to eq(false)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::FileDownloadService do
describe '#execute' do
let_it_be(:config) { build(:bulk_import_configuration) }
let_it_be(:content_type) { 'application/octet-stream' }
let_it_be(:filename) { 'file_download_service_spec' }
let_it_be(:tmpdir) { Dir.tmpdir }
let_it_be(:filepath) { File.join(tmpdir, filename) }
let(:chunk_double) { double('chunk', size: 1000, code: 200) }
let(:response_double) do
double(
code: 200,
success?: true,
parsed_response: {},
headers: {
'content-length' => 100,
'content-type' => content_type
}
)
end
subject { described_class.new(configuration: config, relative_url: '/test', dir: tmpdir, filename: filename) }
before do
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:head).and_return(response_double)
allow(client).to receive(:stream).and_yield(chunk_double)
end
end
it 'downloads file' do
subject.execute
expect(File.exist?(filepath)).to eq(true)
expect(File.read(filepath)).to include('chunk')
end
context 'when url is not valid' do
it 'raises an error' do
stub_application_setting(allow_local_requests_from_web_hooks_and_services: false)
double = instance_double(BulkImports::Configuration, url: 'https://localhost', access_token: 'token')
service = described_class.new(configuration: double, relative_url: '/test', dir: tmpdir, filename: filename)
expect { service.execute }.to raise_error(Gitlab::UrlBlocker::BlockedUrlError)
end
end
context 'when content-type is not valid' do
let(:content_type) { 'invalid' }
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid content type')
end
end
context 'when content-length is not valid' do
context 'when content-length exceeds limit' do
before do
stub_const("#{described_class}::FILE_SIZE_LIMIT", 1)
end
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid content length')
end
end
context 'when content-length is missing' do
let(:response_double) { double(success?: true, headers: { 'content-type' => content_type }) }
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid content length')
end
end
end
context 'when partially downloaded file exceeds limit' do
before do
stub_const("#{described_class}::FILE_SIZE_LIMIT", 150)
end
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid downloaded file')
end
end
context 'when chunk code is not 200' do
let(:chunk_double) { double('chunk', size: 1000, code: 307) }
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'File download error 307')
end
end
context 'when file is a symlink' do
let_it_be(:symlink) { File.join(tmpdir, 'symlink') }
before do
FileUtils.ln_s(File.join(tmpdir, filename), symlink)
end
subject { described_class.new(configuration: config, relative_url: '/test', dir: tmpdir, filename: 'symlink') }
it 'raises an error and removes the file' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid downloaded file')
expect(File.exist?(symlink)).to eq(false)
end
end
context 'when dir is not in tmpdir' do
subject { described_class.new(configuration: config, relative_url: '/test', dir: '/etc', filename: filename) }
it 'raises an error' do
expect { subject.execute }.to raise_error(described_class::ServiceError, 'Invalid target directory')
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment