Commit 9aaef2c9 authored by Furkan Ayhan's avatar Furkan Ayhan Committed by Adam Hegyi

Add logging to process-sync-events service

With this, we can track;
- how many events we process
- queue depth, event backlog
parent cb2712bd
...@@ -13,4 +13,8 @@ class Namespaces::SyncEvent < ApplicationRecord ...@@ -13,4 +13,8 @@ class Namespaces::SyncEvent < ApplicationRecord
def self.enqueue_worker def self.enqueue_worker
::Namespaces::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker ::Namespaces::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker
end end
def self.upper_bound_count
select('COALESCE(MAX(id) - MIN(id) + 1, 0) AS upper_bound_count').to_a.first.upper_bound_count
end
end end
...@@ -13,4 +13,8 @@ class Projects::SyncEvent < ApplicationRecord ...@@ -13,4 +13,8 @@ class Projects::SyncEvent < ApplicationRecord
def self.enqueue_worker def self.enqueue_worker
::Projects::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker ::Projects::ProcessSyncEventsWorker.perform_async # rubocop:disable CodeReuse/Worker
end end
def self.upper_bound_count
select('COALESCE(MAX(id) - MIN(id) + 1, 0) AS upper_bound_count').to_a.first.upper_bound_count
end
end end
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
module Ci module Ci
class ProcessSyncEventsService class ProcessSyncEventsService
include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard include ExclusiveLeaseGuard
BATCH_SIZE = 1000 BATCH_SIZE = 1000
...@@ -10,6 +9,7 @@ module Ci ...@@ -10,6 +9,7 @@ module Ci
def initialize(sync_event_class, sync_class) def initialize(sync_event_class, sync_class)
@sync_event_class = sync_event_class @sync_event_class = sync_event_class
@sync_class = sync_class @sync_class = sync_class
@results = {}
end end
def execute def execute
...@@ -17,13 +17,19 @@ module Ci ...@@ -17,13 +17,19 @@ module Ci
try_obtain_lease { process_events } try_obtain_lease { process_events }
enqueue_worker_if_there_still_event enqueue_worker_if_there_still_event
@results
end end
private private
def process_events def process_events
add_result(estimated_total_events: @sync_event_class.upper_bound_count)
events = @sync_event_class.preload_synced_relation.first(BATCH_SIZE) events = @sync_event_class.preload_synced_relation.first(BATCH_SIZE)
add_result(consumable_events: events.size)
return if events.empty? return if events.empty?
processed_events = [] processed_events = []
...@@ -35,6 +41,7 @@ module Ci ...@@ -35,6 +41,7 @@ module Ci
processed_events << event processed_events << event
end end
ensure ensure
add_result(processed_events: processed_events.size)
@sync_event_class.id_in(processed_events).delete_all @sync_event_class.id_in(processed_events).delete_all
end end
end end
...@@ -50,5 +57,9 @@ module Ci ...@@ -50,5 +57,9 @@ module Ci
def lease_timeout def lease_timeout
1.minute 1.minute
end end
def add_result(result)
@results.merge!(result)
end
end end
end end
...@@ -16,7 +16,13 @@ module Namespaces ...@@ -16,7 +16,13 @@ module Namespaces
deduplicate :until_executing deduplicate :until_executing
def perform def perform
::Ci::ProcessSyncEventsService.new(::Namespaces::SyncEvent, ::Ci::NamespaceMirror).execute results = ::Ci::ProcessSyncEventsService.new(
::Namespaces::SyncEvent, ::Ci::NamespaceMirror
).execute
results.each do |key, value|
log_extra_metadata_on_done(key, value)
end
end end
end end
end end
...@@ -16,7 +16,13 @@ module Projects ...@@ -16,7 +16,13 @@ module Projects
deduplicate :until_executing deduplicate :until_executing
def perform def perform
::Ci::ProcessSyncEventsService.new(::Projects::SyncEvent, ::Ci::ProjectMirror).execute results = ::Ci::ProcessSyncEventsService.new(
::Projects::SyncEvent, ::Ci::ProjectMirror
).execute
results.each do |key, value|
log_extra_metadata_on_done(key, value)
end
end end
end end
end end
...@@ -25,6 +25,8 @@ RSpec.describe Ci::ProcessSyncEventsService do ...@@ -25,6 +25,8 @@ RSpec.describe Ci::ProcessSyncEventsService do
project2.update!(group: parent_group_2) project2.update!(group: parent_group_2)
end end
it { is_expected.to eq(service_results(2, 2, 2)) }
it 'consumes events' do it 'consumes events' do
expect { execute }.to change(Projects::SyncEvent, :count).from(2).to(0) expect { execute }.to change(Projects::SyncEvent, :count).from(2).to(0)
...@@ -36,20 +38,32 @@ RSpec.describe Ci::ProcessSyncEventsService do ...@@ -36,20 +38,32 @@ RSpec.describe Ci::ProcessSyncEventsService do
) )
end end
it 'enqueues Projects::ProcessSyncEventsWorker if any left' do context 'when any event left after processing' do
stub_const("#{described_class}::BATCH_SIZE", 1) before do
stub_const("#{described_class}::BATCH_SIZE", 1)
end
expect(Projects::ProcessSyncEventsWorker).to receive(:perform_async) it { is_expected.to eq(service_results(2, 1, 1)) }
execute it 'enqueues Projects::ProcessSyncEventsWorker' do
expect(Projects::ProcessSyncEventsWorker).to receive(:perform_async)
execute
end
end end
it 'does not enqueue Projects::ProcessSyncEventsWorker if no left' do context 'when no event left after processing' do
stub_const("#{described_class}::BATCH_SIZE", 2) before do
stub_const("#{described_class}::BATCH_SIZE", 2)
end
expect(Projects::ProcessSyncEventsWorker).not_to receive(:perform_async) it { is_expected.to eq(service_results(2, 2, 2)) }
execute it 'does not enqueue Projects::ProcessSyncEventsWorker' do
expect(Projects::ProcessSyncEventsWorker).not_to receive(:perform_async)
execute
end
end end
context 'when there is no event' do context 'when there is no event' do
...@@ -57,27 +71,45 @@ RSpec.describe Ci::ProcessSyncEventsService do ...@@ -57,27 +71,45 @@ RSpec.describe Ci::ProcessSyncEventsService do
Projects::SyncEvent.delete_all Projects::SyncEvent.delete_all
end end
it { is_expected.to eq(service_results(0, 0, nil)) }
it 'does nothing' do it 'does nothing' do
expect { execute }.not_to change(Projects::SyncEvent, :count) expect { execute }.not_to change(Projects::SyncEvent, :count)
end end
end end
it 'does not delete non-executed events' do context 'when there is non-executed events' do
new_project = create(:project) before do
sync_event_class.delete_all new_project = create(:project)
sync_event_class.delete_all
project1.update!(group: parent_group_2) project1.update!(group: parent_group_2)
new_project.update!(group: parent_group_1) new_project.update!(group: parent_group_1)
project2.update!(group: parent_group_1) project2.update!(group: parent_group_1)
new_project_sync_event = new_project.sync_events.last @new_project_sync_event = new_project.sync_events.last
allow(sync_event_class).to receive(:preload_synced_relation).and_return( allow(sync_event_class).to receive(:preload_synced_relation).and_return(
sync_event_class.where.not(id: new_project_sync_event) sync_event_class.where.not(id: @new_project_sync_event)
) )
end
it { is_expected.to eq(service_results(3, 2, 2)) }
it 'does not delete non-executed events' do
expect { execute }.to change(Projects::SyncEvent, :count).from(3).to(1)
expect(@new_project_sync_event.reload).to be_persisted
end
end
private
expect { execute }.to change(Projects::SyncEvent, :count).from(3).to(1) def service_results(total, consumable, processed)
expect(new_project_sync_event.reload).to be_persisted {
estimated_total_events: total,
consumable_events: consumable,
processed_events: processed
}.compact
end end
end end
......
...@@ -7,10 +7,12 @@ RSpec.describe Namespaces::ProcessSyncEventsWorker do ...@@ -7,10 +7,12 @@ RSpec.describe Namespaces::ProcessSyncEventsWorker do
let!(:group2) { create(:group) } let!(:group2) { create(:group) }
let!(:group3) { create(:group) } let!(:group3) { create(:group) }
subject(:worker) { described_class.new }
include_examples 'an idempotent worker' include_examples 'an idempotent worker'
describe '#perform' do describe '#perform' do
subject(:perform) { described_class.new.perform } subject(:perform) { worker.perform }
before do before do
group2.update!(parent: group1) group2.update!(parent: group1)
...@@ -28,5 +30,13 @@ RSpec.describe Namespaces::ProcessSyncEventsWorker do ...@@ -28,5 +30,13 @@ RSpec.describe Namespaces::ProcessSyncEventsWorker do
an_object_having_attributes(namespace_id: group3.id, traversal_ids: [group1.id, group2.id, group3.id]) an_object_having_attributes(namespace_id: group3.id, traversal_ids: [group1.id, group2.id, group3.id])
) )
end end
it 'logs the service result', :aggregate_failures do
expect(worker).to receive(:log_extra_metadata_on_done).with(:estimated_total_events, 5)
expect(worker).to receive(:log_extra_metadata_on_done).with(:consumable_events, 5)
expect(worker).to receive(:log_extra_metadata_on_done).with(:processed_events, 5)
perform
end
end end
end end
...@@ -6,10 +6,12 @@ RSpec.describe Projects::ProcessSyncEventsWorker do ...@@ -6,10 +6,12 @@ RSpec.describe Projects::ProcessSyncEventsWorker do
let!(:group) { create(:group) } let!(:group) { create(:group) }
let!(:project) { create(:project) } let!(:project) { create(:project) }
subject(:worker) { described_class.new }
include_examples 'an idempotent worker' include_examples 'an idempotent worker'
describe '#perform' do describe '#perform' do
subject(:perform) { described_class.new.perform } subject(:perform) { worker.perform }
before do before do
project.update!(namespace: group) project.update!(namespace: group)
...@@ -24,5 +26,13 @@ RSpec.describe Projects::ProcessSyncEventsWorker do ...@@ -24,5 +26,13 @@ RSpec.describe Projects::ProcessSyncEventsWorker do
an_object_having_attributes(namespace_id: group.id) an_object_having_attributes(namespace_id: group.id)
) )
end end
it 'logs the service result', :aggregate_failures do
expect(worker).to receive(:log_extra_metadata_on_done).with(:estimated_total_events, 2)
expect(worker).to receive(:log_extra_metadata_on_done).with(:consumable_events, 2)
expect(worker).to receive(:log_extra_metadata_on_done).with(:processed_events, 2)
perform
end
end end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment