Commit 97766234 authored by Mike Kozono's avatar Mike Kozono

Consume the Geo::Event

As a log cursor event, for now. It enqueues
Geo::EventWorker to do the work.
parent c11e6700
# frozen_string_literal: true
module Geo
# Called by Geo::EventWorker to consume the event
class EventService
include ::Gitlab::Geo::LogHelpers
include ::Gitlab::Utils::StrongMemoize
attr_reader :replicable_name, :event_name, :payload
def initialize(replicable_name, event_name, payload)
@replicable_name = replicable_name
@event_name = event_name
@payload = payload
end
def execute
replicator.consume(event_name, payload)
end
private
def replicator
strong_memoize(:replicator) do
model_record_id = payload['model_record_id']
replicator_class = ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
replicator_class.new(model_record_id: model_record_id)
end
end
def extra_log_data
{
replicable_name: replicable_name,
event_name: event_name,
payload: payload
}
end
end
end
......@@ -189,6 +189,12 @@
:latency_sensitive:
:resource_boundary: :unknown
:weight: 1
- :name: geo:geo_event
:feature_category: :geo_replication
:has_external_dependencies:
:latency_sensitive:
:resource_boundary: :unknown
:weight: 1
- :name: geo:geo_file_download
:feature_category: :geo_replication
:has_external_dependencies:
......
# frozen_string_literal: true
module Geo
class EventWorker
include ApplicationWorker
include GeoQueue
sidekiq_options retry: 3, dead: false
def perform(replicable_name, event_name, payload)
Geo::EventService.new(replicable_name, event_name, payload).execute
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
module LogCursor
module Events
class Event
include BaseEvent
def process
::Geo::EventWorker.perform_async(event.replicable_name, event.event_name, event.payload)
end
end
end
end
end
end
# frozen_string_literal: true
FactoryBot.define do
factory :geo_event, class: 'Geo::Event' do
replicable_name { 'package_file' }
event_name { 'created' }
trait :package_file do
payload do
{ model_record_id: create(:package_file, :pom).id }
end
end
end
end
......@@ -53,6 +53,10 @@ FactoryBot.define do
trait :design_repository_updated_event do
repository_updated_event factory: :geo_design_repository_updated_event
end
trait :event do
event factory: :geo_event
end
end
factory :geo_repository_created_event, class: 'Geo::RepositoryCreatedEvent' do
......
# frozen_string_literal: true
require "spec_helper"
describe Gitlab::Geo::LogCursor::Events::Event, :clean_gitlab_redis_shared_state do
let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
let(:event) { create(:geo_event, :package_file, event_name: "created" ) }
let(:event_log) { create(:geo_event_log, event: event) }
let(:replicable) { Packages::PackageFile.find(event.payload["model_record_id"]) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
subject { described_class.new(event, Time.now, logger) }
describe "#process" do
it "enqueues Geo::EventWorker" do
expect(::Geo::EventWorker).to receive(:perform_async).with(
"package_file",
"created",
{ "model_record_id" => replicable.id }
)
subject.process
end
it "eventually calls Replicator#consume", :sidekiq_inline do
expect_next_instance_of(::Geo::PackageFileReplicator) do |replicator|
expect(replicator).to receive(:consume).with(
"created",
{ "model_record_id" => replicable.id }
)
end
subject.process
end
end
end
# frozen_string_literal: true
require "spec_helper"
describe Geo::EventWorker, :geo do
describe "#perform" do
it "calls Geo::EventService" do
args = ["package_file", "created", { "model_record_id" => 1 }]
service = double(:service)
expect(service).to receive(:execute)
expect(::Geo::EventService).to receive(:new).with(*args).and_return(service)
described_class.new.perform(*args)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment