Commit 8b7bc6dd authored by Mike Kozono's avatar Mike Kozono

Add a Geo::PackageFileReplicator to PackageFile

The Geo::PackageFileReplicator uses the BlobReplicatorStrategy.

A "created" event is published after creating a PackageFile.
parent 4405fd1b
......@@ -55,6 +55,8 @@ module Gitlab
memo << ee_path.to_s
end
ee_paths << "ee/app/replicators"
# Eager load should load CE first
config.eager_load_paths.push(*ee_paths)
config.helpers_paths.push "#{config.root}/ee/app/helpers"
......
# frozen_string_literal: true
module Geo
module BlobReplicatorStrategy
extend ActiveSupport::Concern
included do
event :created
end
class_methods do
end
# Called by Packages::PackageFile on create
def publish_created_event
publish(:created, **created_params)
end
# Called by Gitlab::Geo::Replicator#consume
def consume_created_event
download
end
def carrierwave_uploader
raise NotImplementedError
end
private
# Example:
#
# package_file.replicator.download
def download
::Geo::BlobDownloadService.new(replicator: self).execute
end
def created_params
{ model_record_id: model_record.id }
end
end
end
# frozen_string_literal: true
class Packages::PackageFile < ApplicationRecord
include UpdateProjectStatistics
include ::Gitlab::Geo::ReplicableModel
delegate :project, :project_id, to: :package
delegate :conan_file_type, to: :conan_file_metadatum
......@@ -28,7 +29,10 @@ class Packages::PackageFile < ApplicationRecord
mount_uploader :file, Packages::PackageFileUploader
with_replicator Geo::PackageFileReplicator
after_save :update_file_store, if: :saved_change_to_file?
after_create_commit -> { replicator.publish_created_event }
def update_file_store
# The file.object_store is set during `uploader.store!`
......
# frozen_string_literal: true
module Geo
class PackageFileReplicator < Gitlab::Geo::Replicator
include ::Geo::BlobReplicatorStrategy
def carrierwave_uploader
model_record.file
end
private
def model
::Packages::PackageFile
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module ReplicableModel
def self.included(klass)
klass.extend(ClassMethods)
end
module ClassMethods
def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
class_eval <<-RUBY, __FILE__, __LINE__ + 1
define_method :replicator do
@_replicator ||= klass.new(model_record: self)
end
RUBY
end
end
# Geo Replicator
#
# @return [Gitlab::Geo::Replicator]
def replicator
raise NotImplementedError, 'There is no Replicator defined for this model'
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
class Replicator
include ::Gitlab::Geo::LogHelpers
# Declare supported event
#
# @example Declaring support for :update and :delete events
# class MyReplicator < Gitlab::Geo::Replicator
# event :update
# event :delete
# end
#
# @param [Symbol] event_name
def self.event(event_name)
@events ||= []
@events << event_name.to_sym
end
private_class_method :event
# List supported events
#
# @return [Array<Symbol>] with list of events
def self.supported_events
@events.dup
end
# Check if the replicator supports a specific event
#
# @param [Boolean] event_name
def self.event_supported?(event_name)
@events.include?(event_name.to_sym)
end
# Return the name of the replicator
#
# @return [String] name
def self.replicable_name
self.name.demodulize.sub('Replicator', '').underscore
end
def self.registry_class
const_get("::Geo::#{replicable_name.camelize}Registry", false)
end
def self.for_replicable_name(replicable_name)
replicator_class_name = "::Geo::#{replicable_name.camelize}Replicator"
const_get(replicator_class_name, false)
end
def initialize(model_record: nil, model_record_id: nil)
@model_record = model_record
@model_record_id = model_record_id
end
def model_record
if defined?(@model_record) && @model_record
return @model_record
end
if model_record_id
@model_record = model.find(model_record_id)
end
end
def publish(event_name, **event_data)
return unless Feature.enabled?(:geo_self_service_framework)
raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
create_event_with(
class_name: ::Geo::Event,
replicable_name: self.class.replicable_name,
event_name: event_name,
payload: event_data
)
end
# Consume an event, using the published contextual data
#
# This method is called by the GeoLogCursor when reading the event from the queue
#
# @param [Symbol] event_name
# @param [Hash] params contextual data published with the event
def consume(event_name, **params)
raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
consume_method = "consume_#{event_name}".to_sym
raise NotImplementedError, "Consume method not implemented: '#{consume_method}'" unless instance_method_defined?(consume_method)
# Inject model_record based on included class
if model_record
params[:model_record] = model_record
end
send(consume_method, **params) # rubocop:disable GitlabSecurity/PublicSend
end
def replicable_name
self.class.replicable_name
end
def registry_class
self.class.registry_class
end
def registry
registry_class.for_model_record_id(model_record.id)
end
def primary_checksum
nil
end
protected
# Store an event on the database
#
# @example Create an event
# create_event_with(class_name: Geo::CacheInvalidationEvent, key: key)
#
# @param [Class] class_name a ActiveRecord class that's used to store an event for Geo
# @param [Hash] **params context information that will be stored in the event table
# @return [ApplicationRecord] event instance that was just persisted
def create_event_with(class_name:, **params)
return unless Gitlab::Geo.primary?
return unless Gitlab::Geo.secondary_nodes.any?
event = class_name.create!(**params)
# Only works with the new geo_events at the moment because we need to
# know which foreign key to use
::Geo::EventLog.create!(geo_event_id: event)
event
rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error("#{class_name} could not be created", e, params)
end
private
# Checks if method is implemented by current class (ignoring inherited methods)
#
# @param [Symbol] method_name
# @return [Boolean] whether method is implemented
def instance_method_defined?(method_name)
self.class.instance_methods(false).include?(method_name)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::Geo::Replicator do
context 'with defined events' do
class DummyReplicator < Gitlab::Geo::Replicator
event :test
event :another_test
protected
def publish_test(other:)
true
end
end
context 'event DSL' do
subject { DummyReplicator }
describe '.supported_events' do
it 'expects :test event to be supported' do
expect(subject.supported_events).to match_array([:test, :another_test])
end
end
describe '.event_supported?' do
it 'expects a supported event to return true' do
expect(subject.event_supported?(:test)).to be_truthy
end
it 'expect an unsupported event to return false' do
expect(subject.event_supported?(:something_else)).to be_falsey
end
end
end
context 'model DSL' do
class DummyModel
include ActiveModel::Model
include Gitlab::Geo::ReplicableModel
with_replicator DummyReplicator
end
subject { DummyModel.new }
it 'adds replicator method to the model' do
expect(subject).respond_to? :replicator
end
it 'instantiates a replicator into the model' do
expect(subject.replicator).to be_a(DummyReplicator)
end
end
describe '#publish' do
subject { DummyReplicator.new }
context 'when geo_self_service_framework feature is disabled' do
before do
stub_feature_flags(geo_self_service_framework: false)
end
it 'returns nil' do
expect(subject.publish(:test, other: true)).to be_nil
end
it 'does not call create_event' do
expect(subject).not_to receive(:create_event_with)
subject.publish(:test, other: true)
end
end
context 'when publishing a supported events with required params' do
it 'does not raise errors' do
expect { subject.publish(:test, other: true) }.not_to raise_error
end
end
context 'when publishing unsupported event' do
it 'raises an argument error' do
expect { subject.publish(:unsupported) }.to raise_error(ArgumentError)
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::PackageFileReplicator do
include EE::GeoHelpers
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let_it_be(:model_record) { create(:package_file, :npm) }
subject { described_class.new(model_record: model_record) }
before do
stub_current_geo_node(primary)
end
describe '#publish_created_event' do
it "creates a Geo::Event" do
expect do
subject.publish_created_event
end.to change { ::Geo::Event.count }.by(1)
expect(::Geo::Event.last.attributes).to include("replicable_name" => "package_file", "event_name" => "created", "payload" => { "model_record_id" => model_record.id })
end
end
describe '#consume_created_event' do
it 'invokes Geo::BlobDownloadService' do
service = double(:service)
expect(service).to receive(:execute)
expect(::Geo::BlobDownloadService).to receive(:new).with(replicator: subject).and_return(service)
subject.consume_created_event
end
end
end
......@@ -27,6 +27,7 @@ module Quality
policies
presenters
rack_servers
replicators
routing
rubocop
serializers
......
......@@ -21,7 +21,7 @@ RSpec.describe Quality::TestLevel do
context 'when level is unit' do
it 'returns a pattern' do
expect(subject.pattern(:unit))
.to eq("spec/{bin,config,db,dependencies,factories,finders,frontend,graphql,haml_lint,helpers,initializers,javascripts,lib,models,policies,presenters,rack_servers,routing,rubocop,serializers,services,sidekiq,tasks,uploaders,validators,views,workers,elastic_integration}{,/**/}*_spec.rb")
.to eq("spec/{bin,config,db,dependencies,factories,finders,frontend,graphql,haml_lint,helpers,initializers,javascripts,lib,models,policies,presenters,rack_servers,replicators,routing,rubocop,serializers,services,sidekiq,tasks,uploaders,validators,views,workers,elastic_integration}{,/**/}*_spec.rb")
end
end
......@@ -82,7 +82,7 @@ RSpec.describe Quality::TestLevel do
context 'when level is unit' do
it 'returns a regexp' do
expect(subject.regexp(:unit))
.to eq(%r{spec/(bin|config|db|dependencies|factories|finders|frontend|graphql|haml_lint|helpers|initializers|javascripts|lib|models|policies|presenters|rack_servers|routing|rubocop|serializers|services|sidekiq|tasks|uploaders|validators|views|workers|elastic_integration)})
.to eq(%r{spec/(bin|config|db|dependencies|factories|finders|frontend|graphql|haml_lint|helpers|initializers|javascripts|lib|models|policies|presenters|rack_servers|replicators|routing|rubocop|serializers|services|sidekiq|tasks|uploaders|validators|views|workers|elastic_integration)})
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment