Commit c11e6700 authored by Mike Kozono

Add Geo::BlobDownloadService

It creates the registry if needed, downloads the blob, and updates the
sync state with the result.
parent cc492f2e
# frozen_string_literal: true
module Geo
class BlobDownloadService
include Gitlab::Geo::LogHelpers
# Imagine a multi-gigabyte LFS object file and an instance on the other side
# of the earth
LEASE_TIMEOUT = 8.hours.freeze
include ExclusiveLeaseGuard
# Initialize a new blob download service
#
# @param replicator [Gitlab::Geo::Replicator] the replicator for the blob to sync
def initialize(replicator:)
@replicator = replicator
end
# Downloads a blob from the primary, places it where it belongs, and
# records the sync status in the registry.
#
# Exits early if another instance is running for the same replicable model.
#
# @return [Boolean] true if synced, false if not
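#
# @example Illustrative invocation (assumes `replicator` was built for the record to sync)
#   Geo::BlobDownloadService.new(replicator: replicator).execute # => true or false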
def execute
try_obtain_lease do
start_time = Time.now
registry.start!
download_result = ::Gitlab::Geo::Replication::BlobDownloader.new(replicator: @replicator).execute
mark_as_synced = download_result.success || download_result.primary_missing_file
if mark_as_synced
registry.synced!
else
message = download_result.reason
error = download_result.extra_details&.fetch(:error, nil)
registry.failed!(message, error)
end
log_download(mark_as_synced, download_result, start_time)
!!mark_as_synced
end
end
private
def registry
@registry ||= @replicator.registry
end
def log_download(mark_as_synced, download_result, start_time)
metadata = {
mark_as_synced: mark_as_synced,
download_success: download_result.success,
bytes_downloaded: download_result.bytes_downloaded,
primary_missing_file: download_result.primary_missing_file,
download_time_s: (Time.now - start_time).to_f.round(3),
reason: download_result.reason
}
metadata.merge!(download_result.extra_details) if download_result.extra_details
log_info("Blob download", metadata)
end
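# The lease key is scoped to the replicable name and record ID, so downloads of
# different records can run concurrently while duplicate downloads of the same
# record are prevented.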
def lease_key
@lease_key ||= "#{self.class.name.underscore}:#{@replicator.replicable_name}:#{@replicator.model_record.id}"
end
def lease_timeout
LEASE_TIMEOUT
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module Replication
class BlobDownloader
TEMP_PREFIX = 'tmp_'.freeze
attr_reader :replicator
delegate :replicable_name, :model_record, :primary_checksum, :carrierwave_uploader, to: :replicator
delegate :file_storage?, to: :carrierwave_uploader
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file, :reason, :extra_details
def initialize(success:, bytes_downloaded:, primary_missing_file: false, reason: nil, extra_details: nil)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
@reason = reason
@extra_details = extra_details
end
end
def initialize(replicator:)
@replicator = replicator
end
# Download the file to a tempfile, then put it where it belongs.
#
# @return [Result] a result object containing all relevant information
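#
# @example Inspecting the result (illustrative only; assumes a configured `replicator`)
#   result = Gitlab::Geo::Replication::BlobDownloader.new(replicator: replicator).execute
#   result.success              # => true or false
#   result.bytes_downloaded     # => number of bytes written to the destination
#   result.primary_missing_file # => true when the primary reported the file as missing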
def execute
check_result = check_preconditions
return check_result if check_result
temp_file = open_temp_file
return temp_file if temp_file.is_a?(Result)
begin
result = download_file(resource_url, request_headers, temp_file)
ensure
temp_file.close
temp_file.unlink
end
result
end
# @return [String] URL to download the resource from
def resource_url
# TODO change to Generalized API endpoint after that is implemented
Gitlab::Geo.primary_node.geo_transfers_url(replicable_name, model_record.id.to_s)
end
private
# Encodes data about the requested resource in the authorization header.
# The primary will decode it and compare the decoded data to the
# requested resource. If decoding works and the data makes sense, then
# this proves to the primary that the secondary knows its GeoNode's
# secret_access_key.
#
# @return [Hash] HTTP request headers
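#
# The returned hash is passed verbatim as the request headers when fetching
# the blob in #download_file.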
def request_headers
request_data = {
replicable_name: replicable_name,
model_record_id: model_record.id
}
TransferRequest.new(request_data).headers
end
# Returns nil if all preconditions pass, otherwise returns a failure Result
#
# @return [Result, nil] a failure result with the reason the transfer was skipped, or nil
def check_preconditions
unless Gitlab::Geo.secondary?
return failure_result(reason: 'Skipping transfer as this is not a Secondary node')
end
unless Gitlab::Geo.primary_node
return failure_result(reason: 'Skipping transfer as there is no Primary node to download from')
end
if file_storage?
if File.directory?(absolute_path)
return failure_result(reason: 'Skipping transfer as the destination exists and is a directory')
end
unless ensure_destination_path_exists
return failure_result(reason: 'Skipping transfer as we cannot create the destination directory')
end
end
nil
end
def absolute_path
carrierwave_uploader.path
end
def failure_result(bytes_downloaded: 0, primary_missing_file: false, reason: nil, extra_details: nil)
Result.new(success: false, bytes_downloaded: bytes_downloaded, primary_missing_file: primary_missing_file, reason: reason, extra_details: extra_details)
end
# Ensure the entire destination path exists, creating it when missing
#
# @return [Boolean] whether the destination path exists or could be created
def ensure_destination_path_exists
path = Pathname.new(absolute_path)
dir = path.dirname
return true if File.directory?(dir)
begin
FileUtils.mkdir_p(dir)
rescue => e
log_error("Unable to create directory #{dir}: #{e}")
return false
end
true
end
# Download the file from the given URL using HTTP.rb
#
# @return [Result] a result object with the transfer status and details
def download_file(url, req_headers, temp_file)
file_size = -1
# Make the request
response = ::HTTP.follow.get(url, headers: req_headers)
# Check for failures
unless response.status.success?
return failure_result(primary_missing_file: primary_missing_file?(response), reason: "Non-success HTTP response status code #{response.status.code}", extra_details: { status_code: response.status.code, reason: response.status.reason, url: url })
end
# Stream to temporary file on disk
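# (HTTP.rb yields the body in chunks, so the whole blob is never held in memory)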
response.body.each do |chunk|
temp_file.write(chunk)
end
file_size = temp_file.size
# Check for checksum mismatch
if checksum_mismatch?(temp_file.path)
return failure_result(bytes_downloaded: file_size, reason: "Downloaded file checksum mismatch", extra_details: { primary_checksum: primary_checksum, actual_checksum: @actual_checksum })
end
carrierwave_uploader.replace_file_without_saving!(CarrierWave::SanitizedFile.new(temp_file))
Result.new(success: true, bytes_downloaded: [file_size, 0].max)
rescue StandardError => e
failure_result(bytes_downloaded: file_size, reason: "Error downloading file", extra_details: { error: e, url: url })
end
def primary_missing_file?(response)
return false unless response.status.not_found?
return false unless response.content_type.mime_type == 'application/json'
json_response = response.parse
code_file_not_found?(json_response['geo_code'])
rescue JSON::ParserError
false
end
def code_file_not_found?(geo_code)
geo_code == Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE
end
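# Tempfile creates files with mode 0600, so give the final file the same
# permissions a regularly-created file would get under the current umask.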
def default_permissions
0666 - File.umask
end
def open_temp_file
if file_storage?
# Create the tempfile in the destination directory to prevent a cross-filesystem move later
pathname = Pathname.new(absolute_path)
temp = Tempfile.new(TEMP_PREFIX, pathname.dirname.to_s)
else
temp = Tempfile.new("#{TEMP_PREFIX}-#{replicable_name}-#{model_record.id}")
end
temp.chmod(default_permissions)
temp.binmode
temp
rescue => e
details = { error: e }
details.merge!(absolute_path: absolute_path) if absolute_path
failure_result(reason: "Error creating temporary file", extra_details: details)
end
# @param [String] file_path disk location to compare checksum mismatch
def checksum_mismatch?(file_path)
# Skip checksum check if primary didn't generate one because, for
# example, large attachments are checksummed asynchronously, and most
# types of artifacts are not checksummed at all at the moment.
return false if primary_checksum.blank?
return false unless Feature.enabled?(:geo_file_transfer_validation, default_enabled: true)
primary_checksum != actual_checksum(file_path)
end
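# Stored in @actual_checksum so the checksum-mismatch failure above can
# include it in extra_details.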
def actual_checksum(file_path)
@actual_checksum = Digest::SHA256.file(file_path).hexdigest
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Geo::Replication::BlobDownloader do
include ::EE::GeoHelpers
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let(:model_record) { create(:package_file, :npm) }
let(:replicator) { model_record.replicator }
subject { described_class.new(replicator: replicator) }
describe '#execute' do
before do
stub_current_geo_node(secondary)
end
context 'precondition failures' do
context 'not a Geo secondary' do
it 'returns failure' do
stub_current_geo_node(primary)
result = subject.execute
expect(result.success).to be_falsey
end
end
context 'no Geo primary exists' do
it 'returns failure' do
primary.update!(primary: false)
result = subject.execute
expect(result.success).to be_falsey
end
end
context 'when the file is locally stored' do
context 'when the file destination is already taken by a directory' do
it 'returns failure' do
path = replicator.carrierwave_uploader.path
expect(File).to receive(:directory?).with(path).and_return(true)
result = subject.execute
expect(result.success).to be_falsey
end
end
xit 'ensures the file destination directory exists' # Not worth testing here as-is. Extract the functionality first.
end
end
context 'when an error occurs while getting a Tempfile' do
it 'returns failure' do
subject
expect(Tempfile).to receive(:new).and_raise('boom')
result = subject.execute
expect(result.success).to be_falsey
expect(result.extra_details).to have_key(:error)
end
end
context 'when the HTTP response is unsuccessful' do
context 'when the HTTP response indicates a missing file on the primary' do
it 'returns a failed result indicating primary_missing_file' do
stub_request(:get, subject.resource_url)
.to_return(status: 404,
headers: { content_type: 'application/json' },
body: { geo_code: Gitlab::Geo::Replication::FILE_NOT_FOUND_GEO_CODE }.to_json)
result = subject.execute
expect_blob_downloader_result(result, success: false, bytes_downloaded: 0, primary_missing_file: true)
end
end
context 'when the HTTP response does not indicate a missing file on the primary' do
it 'returns a failed result' do
stub_request(:get, subject.resource_url)
.to_return(status: 404,
headers: { content_type: 'application/json' },
body: 'Not found')
result = subject.execute
expect_blob_downloader_result(result, success: false, bytes_downloaded: 0, primary_missing_file: false)
end
end
end
context 'when the HTTP response is successful' do
it 'returns success' do
path = replicator.carrierwave_uploader.path
content = replicator.carrierwave_uploader.file.read
size = content.bytesize
stub_request(:get, subject.resource_url).to_return(status: 200, body: content)
result = subject.execute
stat = File.stat(path)
expect_blob_downloader_result(result, success: true, bytes_downloaded: size, primary_missing_file: false)
expect(stat.size).to eq(size)
expect(stat.mode & 0777).to eq(0666 - File.umask)
expect(File.binread(path)).to eq(content)
end
context 'when the checksum of the downloaded file does not match' do
it 'returns a failed result' do
allow(replicator).to receive(:primary_checksum).and_return('something')
bad_content = 'corrupted!!!'
stub_request(:get, subject.resource_url)
.to_return(status: 200, body: bad_content)
result = subject.execute
expect_blob_downloader_result(result, success: false, bytes_downloaded: bad_content.bytesize, primary_missing_file: false)
end
end
context 'when the primary has not stored a checksum for the file' do
it 'returns a successful result' do
expect(replicator).to receive(:primary_checksum).and_return(nil)
content = 'foo'
stub_request(:get, subject.resource_url)
.to_return(status: 200, body: content)
result = subject.execute
expect_blob_downloader_result(result, success: true, bytes_downloaded: content.bytesize, primary_missing_file: false)
end
end
end
end
def expect_blob_downloader_result(result, success:, bytes_downloaded:, primary_missing_file:, extra_details: nil)
expect(result.success).to eq(success)
expect(result.bytes_downloaded).to eq(bytes_downloaded)
expect(result.primary_missing_file).to eq(primary_missing_file)
# Sanity check to help ensure a valid test
expect(success).not_to be_nil
expect(primary_missing_file).not_to be_nil
end
end
# frozen_string_literal: true
require "spec_helper"
describe Geo::BlobDownloadService do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let(:model_record) { create(:package_file, :npm) }
let(:replicator) { model_record.replicator }
let(:registry_class) { replicator.registry_class }
subject { described_class.new(replicator: replicator) }
before do
stub_current_geo_node(secondary)
end
describe "#execute" do
let(:downloader) { double(:downloader) }
before do
expect(downloader).to receive(:execute).and_return(result)
expect(::Gitlab::Geo::Replication::BlobDownloader).to receive(:new).and_return(downloader)
end
context "when it can obtain the exclusive lease" do
context "when the registry record does not exist" do
context "when the downloader returns success" do
let(:result) { double(:result, success: true, primary_missing_file: false, bytes_downloaded: 123, reason: nil, extra_details: nil) }
it "creates the registry" do
expect do
subject.execute
end.to change { registry_class.count }.by(1)
end
it "sets sync state to synced" do
subject.execute
expect(registry_class.last).to be_synced
end
end
context "when the downloader returns failure" do
let(:result) { double(:result, success: false, primary_missing_file: false, bytes_downloaded: 123, reason: "foo", extra_details: nil) }
it "creates the registry" do
expect do
subject.execute
end.to change { registry_class.count }.by(1)
end
it "sets sync state to failed" do
subject.execute
expect(registry_class.last).to be_failed
end
end
end
end
end
end