Commit 979f8686 authored by Stan Hu's avatar Stan Hu

Merge branch 'tc-hotfix-geo-cursor-gaps' into 'master'

Track the Geo event log gaps in redis and handle them later

Closes #6956

See merge request gitlab-org/gitlab-ee!6640
parents cab3c51f e5170caa
......@@ -204,7 +204,7 @@ class GeoNodeStatus < ActiveRecord::Base
def load_secondary_data
if Gitlab::Geo.secondary?
self.db_replication_lag_seconds = Gitlab::Geo::HealthCheck.db_replication_lag_seconds
self.cursor_last_event_id = Geo::EventLogState.last_processed&.event_id
self.cursor_last_event_id = current_cursor_last_event_id
self.cursor_last_event_date = Geo::EventLog.find_by(id: self.cursor_last_event_id)&.created_at
self.repositories_synced_count = projects_finder.count_synced_repositories
self.repositories_failed_count = projects_finder.count_failed_repositories
......@@ -241,6 +241,15 @@ class GeoNodeStatus < ActiveRecord::Base
end
end
def current_cursor_last_event_id
return unless Gitlab::Geo.secondary?
min_gap_id = ::Gitlab::Geo::EventGapTracking.min_gap_id
last_processed_id = Geo::EventLogState.last_processed&.event_id
[min_gap_id, last_processed_id].compact.min
end
def healthy?
!outdated? && status_message_healthy?
end
......
---
title: Track the Geo event log gaps in redis and handle them later
merge_request: 6640
author:
type: changed
module Gitlab
module Geo
class EventGapTracking
include Utils::StrongMemoize
include ::Gitlab::Geo::LogHelpers
attr_accessor :previous_id
GEO_EVENT_LOG_GAPS = 'geo:event_log:gaps'.freeze
GAP_GRACE_PERIOD = 10.minutes
GAP_OUTDATED_PERIOD = 1.hour
class << self
def min_gap_id
with_redis do |redis|
redis.zrange(GEO_EVENT_LOG_GAPS, 0, -1).min&.to_i
end
end
def gap_count
with_redis do |redis|
redis.zcount(GEO_EVENT_LOG_GAPS, '-inf', '+inf')
end
end
def with_redis
::Gitlab::Redis::SharedState.with { |redis| yield redis }
end
end
delegate :with_redis, to: :class
def initialize(logger = ::Gitlab::Geo::Logger)
@logger = logger
@previous_id = 0
end
def check!(current_id)
return unless previous_id > 0
return unless gap?(current_id)
track_gaps(current_id)
ensure
self.previous_id = current_id
end
# accepts a block that should return whether the event was handled
def fill_gaps
with_redis do |redis|
redis.zremrangebyscore(GEO_EVENT_LOG_GAPS, '-inf', outdated_timestamp)
gap_ids = redis.zrangebyscore(GEO_EVENT_LOG_GAPS, '-inf', grace_timestamp, with_scores: true)
gap_ids.each do |event_id, score|
handled = yield event_id.to_i
redis.zrem(GEO_EVENT_LOG_GAPS, event_id) if handled
end
end
end
def track_gaps(current_id)
log_info("Event log gap detected", previous_event_id: previous_id, current_event_id: current_id)
with_redis do |redis|
expire_time = Time.now.to_i
((previous_id + 1)..(current_id - 1)).each do |gap_id|
redis.zadd(GEO_EVENT_LOG_GAPS, expire_time, gap_id)
end
end
end
def gap?(current_id)
return false if previous_id <= 0
current_id > (previous_id + 1)
end
private
def grace_timestamp
(Time.now - GAP_GRACE_PERIOD).to_i
end
def outdated_timestamp
(Time.now - GAP_OUTDATED_PERIOD).to_i
end
end
end
end
......@@ -11,7 +11,7 @@ module Gitlab
@filename = upload.absolute_path
@request_data = build_request_data(upload)
rescue ObjectStorage::RemoteStoreError
Rails.logger.warn "Cannot transfer a remote object."
::Gitlab::Geo::Logger.warn "Cannot transfer a remote object."
end
private
......
......@@ -34,6 +34,8 @@ module Gitlab
end
def run_once!
gap_tracking.fill_gaps { |event_id| handle_gap_event(event_id) }
# Wrap this with the connection to make it possible to reconnect if
# PGbouncer dies: https://github.com/rails/rails/issues/29189
ActiveRecord::Base.connection_pool.with_connection do
......@@ -41,44 +43,50 @@ module Gitlab
end
end
private
def handle_events(batch, last_id)
def handle_events(batch, previous_batch_last_id)
logger.info("Handling events", first_id: batch.first.id, last_id: batch.last.id)
last_event_id = last_id
gap_tracking.previous_id = previous_batch_last_id
batch.each do |event_log|
gap_tracking.check!(event_log.id)
batch.each_with_index do |event_log, index|
handle_single_event(event_log)
end
end
def handle_single_event(event_log)
event = event_log.event
# If a project is deleted, the event log and its associated event data
# could be purged from the log. We ignore this and move along.
unless event
logger.warn("Unknown event", event_log_id: event_log.id)
next
return
end
check_event_id(last_event_id, event_log.id) if last_event_id > 0
last_event_id = event_log.id
unless can_replay?(event_log)
logger.event_info(event_log.created_at, 'Skipped event', event_data(event_log))
next
return
end
begin
process_event(event, event_log)
end
def process_event(event, event_log)
event_klass_for(event).new(event, event_log.created_at, logger).process
rescue NoMethodError => e
logger.error(e.message)
raise e
end
end
end
def check_event_id(last_event_id, current_log_id)
if last_event_id + 1 != current_log_id
logger.info("Event log gap", previous_event_log_id: last_event_id, event_log_id: current_log_id)
end
def handle_gap_event(event_id)
event_log = ::Geo::EventLog.find_by(id: event_id)
return false unless event_log
handle_single_event(event_log)
true
end
def event_klass_for(event)
......@@ -120,6 +128,10 @@ module Gitlab
sleep(delay + rand(1..20) * 0.1)
end
def gap_tracking
@gap_tracking ||= ::Gitlab::Geo::EventGapTracking.new(logger)
end
def logger
strong_memoize(:logger) do
Gitlab::Geo::LogCursor::Logger.new(self.class, log_level)
......
require 'spec_helper'
describe Gitlab::Geo::EventGapTracking, :clean_gitlab_redis_cache do
let(:previous_event_id) { 7 }
let(:gap_id) { previous_event_id + 1 }
let(:event_id_with_gap) { previous_event_id + 2 }
subject(:gap_tracking) { described_class.new }
before do
gap_tracking.previous_id = previous_event_id
end
describe '.min_gap_id' do
it 'returns nil when there are no gaps' do
expect(described_class.min_gap_id).to eq(nil)
end
it 'returns the lowest gap id' do
Timecop.travel(50.minutes.ago) do
gap_tracking.previous_id = 18
gap_tracking.track_gaps(20)
end
Timecop.travel(40.minutes.ago) do
gap_tracking.previous_id = 12
gap_tracking.track_gaps(14)
end
expect(described_class.min_gap_id).to eq(13)
end
end
describe '.gap_count' do
it 'returns 0 when there are no gaps' do
expect(described_class.gap_count).to be_zero
end
it 'returns the number of gaps' do
gap_tracking.previous_id = 18
gap_tracking.track_gaps(20)
gap_tracking.previous_id = 12
gap_tracking.track_gaps(14)
expect(described_class.gap_count).to eq(2)
end
end
describe '#check!' do
it 'does nothing when previous id not valid' do
gap_tracking.previous_id = 0
expect(gap_tracking).not_to receive(:gap?)
gap_tracking.check!(event_id_with_gap)
expect(gap_tracking.previous_id).to eq(event_id_with_gap)
end
it 'does nothing when there is no gap' do
expect(gap_tracking).not_to receive(:track_gaps)
gap_tracking.check!(previous_event_id + 1)
expect(gap_tracking.previous_id).to eq(previous_event_id + 1)
end
it 'tracks the gap if there is one' do
expect(gap_tracking).to receive(:track_gaps)
gap_tracking.check!(event_id_with_gap)
expect(gap_tracking.previous_id).to eq(event_id_with_gap)
end
end
describe '#fill_gaps' do
it 'ignore gaps that are less than 10 minutes old' do
Timecop.freeze do
gap_tracking.check!(event_id_with_gap)
expect { |blk| gap_tracking.fill_gaps(&blk) }.not_to yield_with_args(anything)
end
end
it 'handles gaps that are more than 10 minutes old' do
gap_tracking.check!(event_id_with_gap)
Timecop.travel(12.minutes) do
expect { |blk| gap_tracking.fill_gaps(&blk) }.to yield_with_args(gap_id)
end
end
it 'drops gaps older than 1 hour' do
gap_tracking.check!(event_id_with_gap)
Timecop.travel(62.minutes) do
expect { |blk| gap_tracking.fill_gaps(&blk) }.not_to yield_with_args(anything)
end
expect(read_gaps).to be_empty
end
end
describe '#track_gaps' do
it 'logs a message' do
expect(gap_tracking).to receive(:log_info).with(/gap detected/, hash_including(previous_event_id: previous_event_id, current_event_id: event_id_with_gap))
gap_tracking.track_gaps(event_id_with_gap)
end
it 'saves the gap id in redis' do
Timecop.freeze do
gap_tracking.track_gaps(event_id_with_gap)
expect(read_gaps).to contain_exactly([gap_id.to_s, Time.now.to_i])
end
end
it 'saves a range of gaps id in redis' do
Timecop.freeze do
gap_tracking.track_gaps(event_id_with_gap + 3)
expected_gaps = ((previous_event_id + 1)..(event_id_with_gap + 2)).collect { |id| [id.to_s, Time.now.to_i] }
expect(read_gaps).to match_array(expected_gaps)
end
end
it 'saves the gaps in order' do
expected_gaps = []
Timecop.freeze do
gap_tracking.track_gaps(event_id_with_gap)
expected_gaps << [gap_id.to_s, Time.now.to_i]
end
Timecop.travel(2.minutes) do
gap_tracking.previous_id = 17
gap_tracking.track_gaps(19)
expected_gaps << [18.to_s, Time.now.to_i]
end
expect(read_gaps).to eq(expected_gaps)
end
end
describe '#gap?' do
it 'returns false when current_id is the previous +1' do
expect(gap_tracking.gap?(previous_event_id + 1)).to be_falsy
end
it 'returns true when current_id is the previous +2' do
expect(gap_tracking.gap?(previous_event_id + 2)).to be_truthy
end
it 'returns false when current_id is equal to the previous' do
expect(gap_tracking.gap?(previous_event_id)).to be_falsy
end
it 'returns false when current_id less than the previous' do
expect(gap_tracking.gap?(previous_event_id - 1)).to be_falsy
end
it 'returns false when previous id is 0' do
gap_tracking.previous_id = 0
expect(gap_tracking.gap?(100)).to be_falsy
end
end
def read_gaps
::Gitlab::Redis::SharedState.with do |redis|
redis.zrangebyscore(described_class::GEO_EVENT_LOG_GAPS, '-inf', '+inf', with_scores: true)
end
end
end
......@@ -84,6 +84,15 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
daemon.run_once!
end
it 'calls #handle_gap_event for each gap the gap tracking finds' do
allow(daemon.gap_tracking).to receive(:fill_gaps).and_yield(1).and_yield(5)
expect(daemon).to receive(:handle_gap_event).with(1)
expect(daemon).to receive(:handle_gap_event).with(5)
daemon.run_once!
end
end
context 'when node has namespace restrictions' do
......@@ -116,32 +125,28 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
daemon.run_once!
end
it "logs a message if an event was skipped" do
it 'detects when an event was skipped' do
updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, id: event_log.id + 2, repository_updated_event: updated_event)
expect(Gitlab::Geo::Logger).to receive(:info)
.with(hash_including(
class: 'Gitlab::Geo::LogCursor::Daemon',
message: 'Event log gap',
previous_event_log_id: event_log.id,
event_log_id: new_event.id))
daemon.run_once!
expect(read_gaps).to eq([event_log.id + 1])
expect(::Geo::EventLogState.last_processed.id).to eq(new_event.id)
end
it 'detects when an event was skipped between batches' do
updated_event = create(:geo_repository_updated_event, project: project)
new_event = create(:geo_event_log, repository_updated_event: updated_event)
# Test that the cursor picks up from the last stored ID
third_event = create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
daemon.run_once!
expect(Gitlab::Geo::Logger).to receive(:info)
.with(hash_including(
class: 'Gitlab::Geo::LogCursor::Daemon',
message: 'Event log gap',
previous_event_log_id: new_event.id,
event_log_id: third_event.id))
create(:geo_event_log, id: new_event.id + 3, repository_updated_event: updated_event)
daemon.run_once!
expect(read_gaps).to eq([new_event.id + 1, new_event.id + 2])
end
it "logs a message if an associated event can't be found" do
......@@ -183,4 +188,67 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end
end
end
describe '#handle_events' do
let(:batch) { create_list(:geo_event_log, 2) }
it 'passes the previous batch id on to gap tracking' do
expect(daemon.gap_tracking).to receive(:previous_id=).with(55).ordered
batch.each do |event_log|
expect(daemon.gap_tracking).to receive(:previous_id=).with(event_log.id).ordered
end
daemon.handle_events(batch, 55)
end
it 'checks for gaps for each id in batch' do
batch.each do |event_log|
expect(daemon.gap_tracking).to receive(:check!).with(event_log.id)
end
daemon.handle_events(batch, 55)
end
it 'handles every single event' do
batch.each do |event_log|
expect(daemon).to receive(:handle_single_event).with(event_log)
end
daemon.handle_events(batch, 55)
end
end
describe '#handle_single_event' do
set(:event_log) { create(:geo_event_log, :updated_event) }
it 'skips execution when no event data is found' do
event_log = build(:geo_event_log)
expect(daemon).not_to receive(:can_replay?)
daemon.handle_single_event(event_log)
end
it 'checks if it can replay the event' do
expect(daemon).to receive(:can_replay?)
daemon.handle_single_event(event_log)
end
it 'processes event when it is replayable' do
allow(daemon).to receive(:can_replay?).and_return(true)
expect(daemon).to receive(:process_event).with(event_log.event, event_log)
daemon.handle_single_event(event_log)
end
end
def read_gaps
gaps = []
Timecop.travel(12.minutes) do
daemon.gap_tracking.fill_gaps { |id| gaps << id }
end
gaps
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment