Commit a4308d4d authored by Nick Thomas's avatar Nick Thomas

Merge branch 'feature/ha-geologcursor' into 'master'

Make GeoLogCursor Highly Available

Closes #2917

See merge request gitlab-org/gitlab-ee!3305
parents 74d5422a 3307e07a
...@@ -2,16 +2,11 @@ ...@@ -2,16 +2,11 @@
# vim: ft=ruby # vim: ft=ruby
require 'rubygems' require 'rubygems'
require 'bundler/setup' require 'bundler/setup'
# loads rails environment / initializers
require "#{File.dirname(__FILE__)}/../config/environment"
require 'optparse' require 'optparse'
class GeoLogCursorOptionParser class GeoLogCursorOptionParser
def self.parse(argv) def self.parse(argv)
options = { full_scan: false } options = { full_scan: false }
version = Gitlab::Geo::LogCursor::Daemon::VERSION
op = OptionParser.new op = OptionParser.new
op.banner = 'GitLab Geo: Log Cursor' op.banner = 'GitLab Geo: Log Cursor'
...@@ -19,6 +14,7 @@ class GeoLogCursorOptionParser ...@@ -19,6 +14,7 @@ class GeoLogCursorOptionParser
op.separator 'Usage: ./geo_log_cursor [options]' op.separator 'Usage: ./geo_log_cursor [options]'
op.separator '' op.separator ''
op.on('-f', '--full-scan', 'Performs full-scan to lookup for un-replicated data') { options[:full_scan] = true } op.on('-f', '--full-scan', 'Performs full-scan to lookup for un-replicated data') { options[:full_scan] = true }
op.on('-d', '--debug', 'Enable detailed logging with extra debug information') { options[:debug] = true }
op.separator 'Common options:' op.separator 'Common options:'
op.on('-h', '--help') do op.on('-h', '--help') do
...@@ -26,7 +22,10 @@ class GeoLogCursorOptionParser ...@@ -26,7 +22,10 @@ class GeoLogCursorOptionParser
exit exit
end end
op.on('-v', '--version') do op.on('-v', '--version') do
puts version # Load only necessary libraries for faster startup
require "#{File.dirname(__FILE__)}/../lib/gitlab/geo/log_cursor/daemon"
puts Gitlab::Geo::LogCursor::Daemon::VERSION
exit exit
end end
op.separator '' op.separator ''
...@@ -40,5 +39,8 @@ end ...@@ -40,5 +39,8 @@ end
if $0 == __FILE__ if $0 == __FILE__
options = GeoLogCursorOptionParser.parse(ARGV) options = GeoLogCursorOptionParser.parse(ARGV)
# We load rails environment / initializers only here to get faster command line startup when `--help` and `--version`
require "#{File.dirname(__FILE__)}/../config/environment"
Gitlab::Geo::LogCursor::Daemon.new(options).run! Gitlab::Geo::LogCursor::Daemon.new(options).run!
end end
---
title: Make GeoLogCursor Highly Available
merge_request: 3305
author:
type: added
...@@ -56,6 +56,18 @@ module Gitlab ...@@ -56,6 +56,18 @@ module Gitlab
end end
end end
# Try to obtain the lease. Returns the UUID and current TTL, which will be
# zero if it's not taken.
def try_obtain_with_ttl
Gitlab::Redis::SharedState.with do |redis|
output = redis.set(@redis_shared_state_key, @uuid, nx: true, ex: @timeout) && @uuid
ttl = output ? 0 : redis.ttl(@redis_shared_state_key)
{ ttl: [ttl, 0].max, uuid: output }
end
end
# Try to renew an existing lease. Return lease UUID on success, # Try to renew an existing lease. Return lease UUID on success,
# false if the lease is taken by a different UUID or inexistent. # false if the lease is taken by a different UUID or inexistent.
def renew def renew
...@@ -71,5 +83,12 @@ module Gitlab ...@@ -71,5 +83,12 @@ module Gitlab
redis.exists(@redis_shared_state_key) redis.exists(@redis_shared_state_key)
end end
end end
# Returns true if the UUID for the key hasn't changed.
def same_uuid?
Gitlab::Redis::SharedState.with do |redis|
redis.get(@redis_shared_state_key) == @uuid
end
end
end end
end end
...@@ -2,8 +2,7 @@ module Gitlab ...@@ -2,8 +2,7 @@ module Gitlab
module Geo module Geo
module LogCursor module LogCursor
class Daemon class Daemon
VERSION = '0.1.0'.freeze VERSION = '0.2.0'.freeze
POOL_WAIT = 5.seconds.freeze
BATCH_SIZE = 250 BATCH_SIZE = 250
attr_reader :options attr_reader :options
...@@ -11,6 +10,7 @@ module Gitlab ...@@ -11,6 +10,7 @@ module Gitlab
def initialize(options = {}) def initialize(options = {})
@options = options @options = options
@exit = false @exit = false
logger.geo_logger.build.level = options[:debug] ? :debug : Rails.logger.level
end end
def run! def run!
...@@ -19,12 +19,12 @@ module Gitlab ...@@ -19,12 +19,12 @@ module Gitlab
full_scan! if options[:full_scan] full_scan! if options[:full_scan]
until exit? until exit?
run_once! lease = Lease.try_obtain_with_ttl { run_once! }
return if exit? return if exit?
# When no new event is found sleep for a few moments # When no new event is found sleep for a few moments
sleep(POOL_WAIT) arbitrary_sleep(lease[:ttl])
end end
end end
...@@ -48,9 +48,8 @@ module Gitlab ...@@ -48,9 +48,8 @@ module Gitlab
existing = ::Geo::ProjectRegistry.where(project_id: project_ids).pluck(:project_id) existing = ::Geo::ProjectRegistry.where(project_id: project_ids).pluck(:project_id)
missing_projects = project_ids - existing missing_projects = project_ids - existing
Gitlab::Geo::Logger.info( logger.info(
class: self.class.name, "Missing projects",
message: "Missing projects",
projects: missing_projects, projects: missing_projects,
project_count: missing_projects.count) project_count: missing_projects.count)
...@@ -114,7 +113,7 @@ module Gitlab ...@@ -114,7 +113,7 @@ module Gitlab
event = event_log.repository_created_event event = event_log.repository_created_event
registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?) registry = find_or_initialize_registry(event.project_id, resync_repository: true, resync_wiki: event.wiki_path.present?)
log_event_info( logger.event_info(
event_log.created_at, event_log.created_at,
message: 'Repository created', message: 'Repository created',
project_id: event.project_id, project_id: event.project_id,
...@@ -132,7 +131,7 @@ module Gitlab ...@@ -132,7 +131,7 @@ module Gitlab
event = event_log.repository_updated_event event = event_log.repository_updated_event
registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true) registry = find_or_initialize_registry(event.project_id, "resync_#{event.source}" => true)
log_event_info( logger.event_info(
event_log.created_at, event_log.created_at,
message: 'Repository update', message: 'Repository update',
project_id: event.project_id, project_id: event.project_id,
...@@ -154,7 +153,7 @@ module Gitlab ...@@ -154,7 +153,7 @@ module Gitlab
.new(event.project_id, event.deleted_project_name, disk_path, event.repository_storage_name) .new(event.project_id, event.deleted_project_name, disk_path, event.repository_storage_name)
.async_execute .async_execute
log_event_info( logger.event_info(
event_log.created_at, event_log.created_at,
message: 'Deleted project', message: 'Deleted project',
project_id: event.project_id, project_id: event.project_id,
...@@ -171,9 +170,9 @@ module Gitlab ...@@ -171,9 +170,9 @@ module Gitlab
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id) job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, event.geo_node_id)
if job_id if job_id
log_info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id) logger.info('Scheduled repositories clean up for Geo node', geo_node_id: event.geo_node_id, job_id: job_id)
else else
log_error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id) logger.error('Could not schedule repositories clean up for Geo node', geo_node_id: event.geo_node_id)
end end
end end
...@@ -188,7 +187,7 @@ module Gitlab ...@@ -188,7 +187,7 @@ module Gitlab
.new(event.project_id, old_path, new_path) .new(event.project_id, old_path, new_path)
.async_execute .async_execute
log_event_info( logger.event_info(
event_log.created_at, event_log.created_at,
message: 'Renaming project', message: 'Renaming project',
project_id: event.project_id, project_id: event.project_id,
...@@ -208,7 +207,7 @@ module Gitlab ...@@ -208,7 +207,7 @@ module Gitlab
old_storage_version: event.old_storage_version old_storage_version: event.old_storage_version
).async_execute ).async_execute
log_event_info( logger.event_info(
event_log.created_at, event_log.created_at,
message: 'Migrating project to hashed storage', message: 'Migrating project to hashed storage',
project_id: event.project_id, project_id: event.project_id,
...@@ -225,21 +224,16 @@ module Gitlab ...@@ -225,21 +224,16 @@ module Gitlab
registry registry
end end
def cursor_delay(created_at) # Sleeps for the expired TTL that remains on the lease plus some random seconds.
(Time.now - created_at).to_f.round(3) #
# This allows multiple GeoLogCursors to randomly process a batch of events,
# without favouring the shortest path (or latency).
def arbitrary_sleep(delay)
sleep(delay + rand(1..20) * 0.1)
end end
def log_event_info(created_at, message, params = {}) def logger
params[:cursor_delay_s] = cursor_delay(created_at) Gitlab::Geo::LogCursor::Logger
log_info(message, params)
end
def log_info(message, params = {})
Gitlab::Geo::Logger.info({ class: self.class.name, message: message }.merge(params))
end
def log_error(message, params = {})
Gitlab::Geo::Logger.error({ class: self.class.name, message: message }.merge(params))
end end
end end
end end
......
...@@ -4,19 +4,14 @@ module Gitlab ...@@ -4,19 +4,14 @@ module Gitlab
# Manages events from primary database and store state in the DR database # Manages events from primary database and store state in the DR database
class Events class Events
BATCH_SIZE = 50 BATCH_SIZE = 50
NAMESPACE = 'geo:gitlab'.freeze
LEASE_TIMEOUT = 5.minutes.freeze
LEASE_KEY = 'geo_log_cursor_processed'.freeze
# fetches up to BATCH_SIZE next events and keep track of batches # fetches up to BATCH_SIZE next events and keep track of batches
def self.fetch_in_batches def self.fetch_in_batches
try_obtain_lease do ::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch|
::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch| yield batch
yield batch
save_processed(batch.last.id) save_processed(batch.last.id)
break unless renew_lease! break unless Lease.renew!
end
end end
end end
...@@ -39,33 +34,6 @@ module Gitlab ...@@ -39,33 +34,6 @@ module Gitlab
-1 -1
end end
end end
# private methods
def self.try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
$stdout.puts 'Cannot obtain an exclusive lease. There must be another process already in execution.'
return
end
begin
yield lease
ensure
Gitlab::ExclusiveLease.cancel(LEASE_KEY, lease)
end
end
def self.renew_lease!
exclusive_lease.renew
end
def self.exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
private_class_method :try_obtain_lease, :exclusive_lease
end end
end end
end end
......
module Gitlab
module Geo
module LogCursor
module Lease
NAMESPACE = 'geo:gitlab'.freeze
LEASE_TIMEOUT = 30.seconds.freeze
LEASE_KEY = 'geo_log_cursor_processed'.freeze
def self.exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
def self.renew!
lease = exclusive_lease.renew
logger.debug lease ? 'Lease renewed.' : 'Lease not renewed.'
{ uuid: lease, ttl: lease ? 0 : LEASE_TIMEOUT }
end
def self.try_obtain_with_ttl
lease = exclusive_lease.try_obtain_with_ttl
unless lease[:ttl].zero? || exclusive_lease.same_uuid?
$stdout.puts lease_taken_message
logger.info(lease_taken_message)
return lease
end
begin
logger.debug('Lease obtained. Fetching events.')
yield
logger.debug('Finished fetching events.')
renew!
rescue => e
logger.error("Lease canceled due to error: #{e.message}")
Gitlab::ExclusiveLease.cancel(LEASE_KEY, lease[:uuid])
{ uuid: false, ttl: LEASE_TIMEOUT }
end
end
def self.lease_taken_message
'Cannot obtain an exclusive lease. There must be another process already in execution.'
end
def self.logger
Gitlab::Geo::LogCursor::Logger
end
private_class_method :exclusive_lease, :lease_taken_message, :logger
end
end
end
end
module Gitlab
module Geo
module LogCursor
module Logger
PID = Process.pid.freeze
def self.event_info(created_at, message, params = {})
args = { pid: PID,
class: caller_name,
message: message,
cursor_delay_s: cursor_delay(created_at) }.merge(params)
geo_logger.info(args)
end
def self.info(message, params = {})
geo_logger.info({ pid: PID, class: caller_name, message: message }.merge(params))
end
def self.error(message, params = {})
geo_logger.error({ pid: PID, class: caller_name, message: message }.merge(params))
end
def self.debug(message, params = {})
geo_logger.debug({ pid: PID, class: caller_name, message: message }.merge(params))
end
def self.geo_logger
Gitlab::Geo::Logger
end
def self.caller_name
caller_locations[1].to_s.rpartition('/').last[/[a-z_]*/]&.classify
end
def self.cursor_delay(created_at)
(Time.now - created_at).to_f.round(3)
end
private_class_method :caller_name, :cursor_delay
end
end
end
end
...@@ -4,6 +4,10 @@ module Gitlab ...@@ -4,6 +4,10 @@ module Gitlab
file_name_noext + '.log' file_name_noext + '.log'
end end
def self.debug(message)
build.debug(message)
end
def self.error(message) def self.error(message)
build.error(message) build.error(message)
end end
......
...@@ -19,6 +19,34 @@ describe Gitlab::ExclusiveLease, :clean_gitlab_redis_shared_state do ...@@ -19,6 +19,34 @@ describe Gitlab::ExclusiveLease, :clean_gitlab_redis_shared_state do
end end
end end
describe '#try_obtain_with_ttl' do
it 'cannot obtain twice before the lease has expired' do
lease = described_class.new(unique_key, timeout: 3600)
ttl_lease = lease.try_obtain_with_ttl
expect(ttl_lease[:uuid]).to be_present
expect(ttl_lease[:ttl]).to eq(0)
second_ttl_lease = lease.try_obtain_with_ttl
expect(second_ttl_lease[:uuid]).to be false
expect(second_ttl_lease[:ttl]).to be > 0
end
it 'can obtain after the lease has expired' do
timeout = 1
lease = described_class.new(unique_key, timeout: 1)
sleep(2 * timeout) # lease should have expired now
ttl_lease = lease.try_obtain_with_ttl
expect(ttl_lease[:uuid]).to be_present
expect(ttl_lease[:ttl]).to eq(0)
end
end
describe '#renew' do describe '#renew' do
it 'returns true when we have the existing lease' do it 'returns true when we have the existing lease' do
lease = described_class.new(unique_key, timeout: 3600) lease = described_class.new(unique_key, timeout: 3600)
...@@ -47,6 +75,23 @@ describe Gitlab::ExclusiveLease, :clean_gitlab_redis_shared_state do ...@@ -47,6 +75,23 @@ describe Gitlab::ExclusiveLease, :clean_gitlab_redis_shared_state do
end end
end end
describe '#same_uuid?' do
it 'returns true for an existing lease' do
lease = described_class.new(unique_key, timeout: 3600)
lease.try_obtain
expect(lease.same_uuid?).to eq(true)
end
it 'returns false for a lease that does not exist' do
described_class.new(unique_key, timeout: 3600).try_obtain
lease = described_class.new(unique_key, timeout: 3600)
expect(lease.same_uuid?).to eq(false)
end
end
describe '.get_uuid' do describe '.get_uuid' do
it 'gets the uuid if lease with the key associated exists' do it 'gets the uuid if lease with the key associated exists' do
uuid = described_class.new(unique_key, timeout: 3600).try_obtain uuid = described_class.new(unique_key, timeout: 3600).try_obtain
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::LogCursor::Daemon, :postgresql do describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared_state do
include ::EE::GeoHelpers include ::EE::GeoHelpers
set(:primary) { create(:geo_node, :primary) } set(:primary) { create(:geo_node, :primary) }
...@@ -15,9 +15,9 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -15,9 +15,9 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
stub_env("::#{described_class}::POOL_WAIT", 0.1)
allow(daemon).to receive(:trap_signals) allow(daemon).to receive(:trap_signals)
allow(daemon).to receive(:arbitrary_sleep).and_return(0.1)
end end
describe '#run!' do describe '#run!' do
...@@ -52,6 +52,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -52,6 +52,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
daemon.run! daemon.run!
end end
it 'skips execution if cannot achieve a lease' do
is_expected.to receive(:exit?).and_return(false, true)
is_expected.not_to receive(:run_once!)
expect_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain_with_ttl).and_return({ ttl: 1, uuid: false })
daemon.run!
end
end end
describe '#run_once!' do describe '#run_once!' do
......
...@@ -24,12 +24,6 @@ describe Gitlab::Geo::LogCursor::Events do ...@@ -24,12 +24,6 @@ describe Gitlab::Geo::LogCursor::Events do
expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id) expect(Geo::EventLogState.last.event_id).to eq(event_log_2.id)
end end
end end
it 'skips execution if cannot achieve a lease' do
expect_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { }
expect { |b| described_class.fetch_in_batches(&b) }.not_to yield_control
end
end end
describe '.save_processed' do describe '.save_processed' do
......
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Lease, :clean_gitlab_redis_shared_state do
describe '.exclusive_lease' do
it 'returns an exclusive lease instance' do
expect(described_class.send(:exclusive_lease)).to be_an_instance_of(Gitlab::ExclusiveLease)
end
end
describe '.renew_lease!' do
it 'returns an exclusive lease instance' do
expect_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew)
described_class.renew!
end
end
describe '.try_obtain_lease' do
it 'returns zero when there is no lease' do
result = described_class.try_obtain_with_ttl {}
expect(result[:ttl]).to eq(0)
expect(result[:uuid]).to be_present
end
it 'returns > 0 if there is a lease' do
allow(described_class).to receive(:try_obtain_with_ttl).and_return({ ttl: 1, uuid: false })
result = described_class.try_obtain_with_ttl {}
expect(result[:ttl]).to be > 0
expect(result[:uuid]).to be false
end
it 'returns > 0 if there was an error' do
expect(Gitlab::ExclusiveLease).to receive(:cancel)
result = described_class.try_obtain_with_ttl { raise StandardError }
expect(result[:ttl]).to be > 0
expect(result[:uuid]).to be false
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Logger do
before do
stub_const("#{described_class.name}::PID", 111)
end
it 'logs an info event' do
expect(::Gitlab::Logger).to receive(:info).with(pid: 111,
class: "LoggerSpec",
message: 'Test')
described_class.info('Test')
end
it 'logs an error event' do
expect(::Gitlab::Logger).to receive(:error).with(pid: 111,
class: "LoggerSpec",
message: 'Test')
described_class.error('Test')
end
describe '.event_info' do
it 'logs an info event' do
expect(::Gitlab::Logger).to receive(:info).with(pid: 111,
class: "LoggerSpec",
message: 'Test',
cursor_delay_s: 0.0)
described_class.event_info(Time.now, 'Test')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment