Commit d4e54efc authored by Valery Sizov's avatar Valery Sizov Committed by Michael Kozono

Fix Geo: Secondaries may be orphaning Upload files

When group destroy is happening we don't currently
create delete events because we rely on destroy hook.
This MR works around that by using our FastDestroy tool.

Changelog: fixed
EE: true
parent b574ddd8
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
class Upload < ApplicationRecord class Upload < ApplicationRecord
include Checksummable include Checksummable
# Upper limit for foreground checksum processing # Upper limit for foreground checksum processing
CHECKSUM_THRESHOLD = 100.megabytes CHECKSUM_THRESHOLD = 100.megabytes
...@@ -51,9 +52,9 @@ class Upload < ApplicationRecord ...@@ -51,9 +52,9 @@ class Upload < ApplicationRecord
## ##
# FastDestroyAll concerns # FastDestroyAll concerns
def finalize_fast_destroy(keys) def finalize_fast_destroy(items_to_remove)
keys.each do |store_class, paths| items_to_remove.each do |store_class, keys|
store_class.new.delete_keys_async(paths) store_class.new.delete_keys_async(keys)
end end
end end
end end
...@@ -65,6 +66,10 @@ class Upload < ApplicationRecord ...@@ -65,6 +66,10 @@ class Upload < ApplicationRecord
uploader_class.absolute_path(self) uploader_class.absolute_path(self)
end end
def relative_path
uploader_class.relative_path(self)
end
def calculate_checksum! def calculate_checksum!
self.checksum = nil self.checksum = nil
return unless needs_checksum? return unless needs_checksum?
......
...@@ -55,3 +55,5 @@ module Uploads ...@@ -55,3 +55,5 @@ module Uploads
end end
end end
end end
Uploads::Local.prepend_mod
...@@ -32,7 +32,14 @@ class FileUploader < GitlabUploader ...@@ -32,7 +32,14 @@ class FileUploader < GitlabUploader
def self.absolute_path(upload) def self.absolute_path(upload)
File.join( File.join(
absolute_base_dir(upload.model), root,
relative_path(upload)
)
end
def self.relative_path(upload)
File.join(
base_dir(upload.model),
upload.path # already contain the dynamic_segment, see #upload_path upload.path # already contain the dynamic_segment, see #upload_path
) )
end end
......
...@@ -98,16 +98,20 @@ module Geo ...@@ -98,16 +98,20 @@ module Geo
carrierwave_uploader.file.exists? carrierwave_uploader.file.exists?
end end
def deleted_params
{
model_record_id: model_record.id,
uploader_class: carrierwave_uploader.class.to_s,
blob_path: carrierwave_uploader.relative_path
}
end
private private
def download def download
::Geo::BlobDownloadService.new(replicator: self).execute ::Geo::BlobDownloadService.new(replicator: self).execute
end end
def deleted_params
{ model_record_id: model_record.id, uploader_class: carrierwave_uploader.class.to_s, blob_path: carrierwave_uploader.relative_path }
end
# Return whether it's capable of generating a checksum of itself # Return whether it's capable of generating a checksum of itself
# #
# @return [Boolean] whether it can generate a checksum # @return [Boolean] whether it can generate a checksum
......
# frozen_string_literal: true
module EE
module Uploads
module Local
extend ::Gitlab::Utils::Override
override :keys
def keys(relation)
return super unless ::Geo::EventStore.can_create_event?
relation.includes(:model).find_each.map do |record|
record.replicator.deleted_params.merge(absolute_path: record.absolute_path)
end
end
override :delete_keys_async
def delete_keys_async(keys_to_delete)
return super unless ::Geo::EventStore.can_create_event?
keys_to_delete.each_slice(::Uploads::Base::BATCH_SIZE) do |batch|
::DeleteStoredFilesWorker.perform_async(self.class, batch.pluck(:absolute_path))
::Geo::UploadReplicator.bulk_create_delete_events_async(batch)
end
end
end
end
end
...@@ -71,6 +71,7 @@ module Geo ...@@ -71,6 +71,7 @@ module Geo
class_name: 'Geo::Event', class_name: 'Geo::Event',
foreign_key: :geo_event_id, foreign_key: :geo_event_id,
inverse_of: :geo_event_log inverse_of: :geo_event_log
def self.latest_event def self.latest_event
order(id: :desc).first order(id: :desc).first
end end
......
...@@ -9,6 +9,31 @@ module Geo ...@@ -9,6 +9,31 @@ module Geo
::Upload ::Upload
end end
def self.bulk_create_delete_events_async(deleted_uploads)
return if deleted_uploads.empty?
deleted_upload_details = []
events = deleted_uploads.map do |upload|
deleted_upload_details << [upload[:model_record_id], upload[:blob_path]]
{
replicable_name: 'upload',
event_name: 'deleted',
payload: {
model_record_id: upload[:model_record_id],
blob_path: upload[:blob_path],
uploader_class: upload[:uploader_class]
},
created_at: Time.current
}
end
log_info('Delete bulk of uploads: ', uploads: deleted_upload_details)
::Geo::BatchEventCreateWorker.perform_async(events)
end
def carrierwave_uploader def carrierwave_uploader
model_record.retrieve_uploader model_record.retrieve_uploader
end end
......
...@@ -462,6 +462,15 @@ ...@@ -462,6 +462,15 @@
:weight: 2 :weight: 2
:idempotent: :idempotent:
:tags: [] :tags: []
- :name: geo:geo_batch_event_create
:worker_name: Geo::BatchEventCreateWorker
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_batch_project_registry - :name: geo:geo_batch_project_registry
:worker_name: Geo::Batch::ProjectRegistryWorker :worker_name: Geo::Batch::ProjectRegistryWorker
:feature_category: :geo_replication :feature_category: :geo_replication
......
# frozen_string_literal: true
module Geo
class BatchEventCreateWorker
include ApplicationWorker
data_consistency :always
include GeoQueue
include ::Gitlab::Geo::LogHelpers
idempotent!
def perform(events)
log_info('Executing Geo::BatchEventCreateWorker', events_count: events.size)
::Gitlab::Geo::Replicator.bulk_create_events(events)
end
end
end
...@@ -193,6 +193,20 @@ module Gitlab ...@@ -193,6 +193,20 @@ module Gitlab
false false
end end
def self.bulk_create_events(events)
::Geo::EventLog.transaction do
results = ::Geo::Event.insert_all!(events)
break if results.rows.empty?
ids = results.map { |result| { geo_event_id: result['id'], created_at: Time.current } }
::Geo::EventLog.insert_all!(ids)
end
rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error('Geo::EventLog could not be created in bulk', e)
end
# @param [ActiveRecord::Base] model_record # @param [ActiveRecord::Base] model_record
# @param [Integer] model_record_id # @param [Integer] model_record_id
def initialize(model_record: nil, model_record_id: nil) def initialize(model_record: nil, model_record_id: nil)
...@@ -222,7 +236,6 @@ module Gitlab ...@@ -222,7 +236,6 @@ module Gitlab
raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name) raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
create_event_with( create_event_with(
class_name: ::Geo::Event,
replicable_name: self.class.replicable_name, replicable_name: self.class.replicable_name,
event_name: event_name, event_name: event_name,
payload: event_data payload: event_data
...@@ -310,16 +323,15 @@ module Gitlab ...@@ -310,16 +323,15 @@ module Gitlab
# Store an event on the database # Store an event on the database
# #
# @example Create an event # @example Create an event
# create_event_with(class_name: Geo::CacheInvalidationEvent, key: key) # create_event_with(key: key)
# #
# @param [Class] class_name a ActiveRecord class that's used to store an event for Geo
# @param [Hash] **params context information that will be stored in the event table # @param [Hash] **params context information that will be stored in the event table
# @return [ApplicationRecord] event instance that was just persisted # @return [ApplicationRecord] event instance that was just persisted
def create_event_with(class_name:, **params) def create_event_with(**params)
return unless Gitlab::Geo.primary? return unless Gitlab::Geo.primary?
return unless Gitlab::Geo.secondary_nodes.any? return unless Gitlab::Geo.secondary_nodes.any?
event = class_name.create!(**params) event = ::Geo::Event.create!(**params)
# Only works with the new geo_events at the moment because we need to # Only works with the new geo_events at the moment because we need to
# know which foreign key to use # know which foreign key to use
...@@ -327,7 +339,7 @@ module Gitlab ...@@ -327,7 +339,7 @@ module Gitlab
event event
rescue ActiveRecord::RecordInvalid, NoMethodError => e rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error("#{class_name} could not be created", e, params) log_error("::Geo::Event could not be created", e, params)
end end
def current_node def current_node
......
...@@ -133,6 +133,27 @@ RSpec.describe Gitlab::Geo::Replicator do ...@@ -133,6 +133,27 @@ RSpec.describe Gitlab::Geo::Replicator do
end end
end end
describe '.bulk_create_events' do
let(:event) do
{
replicable_name: 'upload',
event_name: 'created',
payload: {
data: "some payload"
},
created_at: Time.current
}
end
let(:events) { [event] }
it 'creates events' do
expect { Gitlab::Geo::Replicator.bulk_create_events(events) }.to change { ::Geo::EventLog.count }.from(0).to(1)
expect(::Geo::EventLog.last.event).to be_present
end
end
describe '#initialize' do describe '#initialize' do
subject(:replicator) { Geo::DummyReplicator.new(**args) } subject(:replicator) { Geo::DummyReplicator.new(**args) }
......
...@@ -87,7 +87,7 @@ RSpec.describe Upload do ...@@ -87,7 +87,7 @@ RSpec.describe Upload do
end end
describe '#destroy' do describe '#destroy' do
subject { create(:upload, checksum: '8710d2c16809c79fee211a9693b64038a8aae99561bc86ce98a9b46b45677fe4') } subject { create(:upload, :namespace_upload, checksum: '8710d2c16809c79fee211a9693b64038a8aae99561bc86ce98a9b46b45677fe4') }
context 'when running in a Geo primary node' do context 'when running in a Geo primary node' do
let_it_be(:primary) { create(:geo_node, :primary) } let_it_be(:primary) { create(:geo_node, :primary) }
...@@ -98,6 +98,18 @@ RSpec.describe Upload do ...@@ -98,6 +98,18 @@ RSpec.describe Upload do
expect { subject.destroy }.to change(Geo::UploadDeletedEvent, :count).by(1) expect { subject.destroy }.to change(Geo::UploadDeletedEvent, :count).by(1)
end end
it 'logs an event to the Geo event log when bulk removal is used', :sidekiq_inline do
stub_current_geo_node(primary)
expect { subject.model.destroy }.to change(Geo::Event.where(replicable_name: :upload, event_name: :deleted), :count).by(1)
payload = Geo::Event.where(replicable_name: :upload, event_name: :deleted).last.payload
expect(payload['model_record_id']).to eq(subject.id)
expect(payload['blob_path']).to eq(subject.relative_path)
expect(payload['uploader_class']).to eq('NamespaceFileUploader')
end
end end
end end
end end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Uploads::Local, :geo do
include ::EE::GeoHelpers
let(:data_store) { described_class.new }
before do
stub_uploads_object_storage(FileUploader)
end
context 'on a primary when secondary nodes exist' do
let(:project) { create(:project) }
let(:relation) { project.uploads }
before do
allow(::Geo::EventStore).to receive(:can_create_event?).and_return(true)
end
describe '#keys' do
let(:upload) { create(:upload, uploader: FileUploader, model: project) }
let!(:uploads) { [upload] }
it 'returns keys' do
keys = data_store.keys(relation)
expected_hash = {
absolute_path: upload.absolute_path,
blob_path: upload.retrieve_uploader.relative_path,
model_record_id: upload.id,
uploader_class: "FileUploader"
}
expect(keys.size).to eq 1
expect(keys.first).to include(expected_hash)
end
end
describe '#delete_keys_async' do
it 'performs calls to DeleteStoredFilesWorker and Geo::UploadReplicator.bulk_create_delete_events_async' do
keys_to_delete = [{
absolute_path: 'absolute_path',
blob_path: 'relative_path',
model_record_id: 1,
uploader_class: "FileUploader"
}]
expect(::DeleteStoredFilesWorker).to receive(:perform_async).with(Uploads::Local, ['absolute_path'])
expect(::Geo::UploadReplicator).to receive(:bulk_create_delete_events_async).with(keys_to_delete)
data_store.delete_keys_async(keys_to_delete)
end
end
end
end
...@@ -6,4 +6,30 @@ RSpec.describe Geo::UploadReplicator do ...@@ -6,4 +6,30 @@ RSpec.describe Geo::UploadReplicator do
let(:model_record) { create(:upload, :with_file) } let(:model_record) { create(:upload, :with_file) }
include_examples 'a blob replicator' include_examples 'a blob replicator'
describe '.bulk_create_delete_events_async' do
let(:deleted_upload) do
{
model_record_id: 1,
blob_path: 'path',
uploader_class: 'UploaderClass'
}
end
let(:deleted_uploads) { [deleted_upload] }
it 'calls Geo::BatchEventCreateWorker and passes events array', :sidekiq_inline do
expect { described_class.bulk_create_delete_events_async(deleted_uploads) }.to change { ::Geo::Event.count }.from(0).to(1)
created_event = ::Geo::Event.last
expect(created_event.replicable_name).to eq 'upload'
expect(created_event.event_name).to eq 'deleted'
expect(created_event.created_at).to be_present
expect(created_event.payload).to eq(deleted_upload.stringify_keys)
end
it 'returns nil when empty array is passed' do
expect(described_class.bulk_create_delete_events_async([])).to be_nil
end
end
end end
...@@ -51,7 +51,7 @@ module EE ...@@ -51,7 +51,7 @@ module EE
def stub_dummy_replicator_class def stub_dummy_replicator_class
stub_const('Geo::DummyReplicator', Class.new(::Gitlab::Geo::Replicator)) stub_const('Geo::DummyReplicator', Class.new(::Gitlab::Geo::Replicator))
Geo::DummyReplicator.class_eval do ::Geo::DummyReplicator.class_eval do
event :test event :test
event :another_test event :another_test
......
# frozen_string_literal: true
require "spec_helper"
RSpec.describe Geo::BatchEventCreateWorker, :geo do
describe "#perform" do
it "calls Gitlab::Geo::Replicator.bulk_create_events" do
events = []
expect(::Gitlab::Geo::Replicator).to receive(:bulk_create_events).with(events)
described_class.new.perform(events)
end
end
end
...@@ -82,6 +82,18 @@ RSpec.describe Upload do ...@@ -82,6 +82,18 @@ RSpec.describe Upload do
end end
end end
describe '#relative_path' do
it "delegates to the uploader's relative_path method" do
uploader = spy('FakeUploader')
upload = described_class.new(path: '/tmp/secret/file.jpg', store: ObjectStorage::Store::LOCAL)
expect(upload).to receive(:uploader_class).and_return(uploader)
upload.relative_path
expect(uploader).to have_received(:relative_path).with(upload)
end
end
describe '#calculate_checksum!' do describe '#calculate_checksum!' do
let(:upload) do let(:upload) do
described_class.new(path: __FILE__, described_class.new(path: __FILE__,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment