Commit 50e3462a authored by Valery Sizov's avatar Valery Sizov

Refactor and improve Geo::LogCursor and tests

It also make it more efficient
parent a9dbdac0
...@@ -10,7 +10,7 @@ module Gitlab ...@@ -10,7 +10,7 @@ module Gitlab
# fetches up to BATCH_SIZE next events and keep track of batches # fetches up to BATCH_SIZE next events and keep track of batches
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def fetch_in_batches(batch_size: BATCH_SIZE) def fetch_in_batches(batch_size: BATCH_SIZE)
last_id = last_processed_id last_id = last_processed_id || last_event_log_id
::Geo::EventLog.where('id > ?', last_id).find_in_batches(batch_size: batch_size) do |batch| ::Geo::EventLog.where('id > ?', last_id).find_in_batches(batch_size: batch_size) do |batch|
yield(batch, last_id) yield(batch, last_id)
...@@ -31,18 +31,19 @@ module Gitlab ...@@ -31,18 +31,19 @@ module Gitlab
event_state.update!(event_id: event_id) event_state.update!(event_id: event_id)
end end
# @return [Integer] id of last replicated event # @return [Integer] id of last replicated event or nil if it does not exist
def last_processed_id def last_processed_id
last = ::Geo::EventLogState.last_processed&.id ::Geo::EventLogState.last_processed&.id
return last if last end
if ::Geo::EventLog.any? # @return [Integer] id of the event we need to start processing from
event_id = ::Geo::EventLog.last.id def last_event_log_id
save_processed(event_id) last_id = ::Geo::EventLog.last&.id
event_id
else return 0 unless last_id
-1
end save_processed(last_id)
last_id
end end
end end
end end
......
...@@ -15,14 +15,16 @@ describe Gitlab::Geo::LogCursor::EventLogs, :postgresql, :clean_gitlab_redis_sha ...@@ -15,14 +15,16 @@ describe Gitlab::Geo::LogCursor::EventLogs, :postgresql, :clean_gitlab_redis_sha
let!(:event_log_2) { create(:geo_event_log) } let!(:event_log_2) { create(:geo_event_log) }
context 'when there is no event_log_state' do context 'when there is no event_log_state' do
it 'does not yields a group of events' do it 'does not yield a group of events' do
expect { |b| subject.fetch_in_batches(&b) }.not_to yield_with_args([event_log_1, event_log_2]) expect { |b| subject.fetch_in_batches(&b) }.not_to yield_with_args([event_log_1, event_log_2])
end end
end end
context 'when there is already an event_log_state' do context 'when there is already an event_log_state' do
let(:last_event_id) { event_log_1.id - 1 }
before do before do
create(:geo_event_log_state, event_id: event_log_1.id - 1) create(:geo_event_log_state, event_id: last_event_id)
end end
it 'saves last event as last processed after yielding' do it 'saves last event as last processed after yielding' do
...@@ -30,6 +32,10 @@ describe Gitlab::Geo::LogCursor::EventLogs, :postgresql, :clean_gitlab_redis_sha ...@@ -30,6 +32,10 @@ describe Gitlab::Geo::LogCursor::EventLogs, :postgresql, :clean_gitlab_redis_sha
expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id) expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id)
end end
it 'yields a group of events' do
expect { |b| subject.fetch_in_batches(&b) }.to yield_with_args([event_log_1, event_log_2], last_event_id)
end
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment