Commit 11b643d5 authored by Douwe Maan's avatar Douwe Maan

Merge branch 'tc-geo-prune-event-log' into 'master'

Prune Geo event log

Closes #3577

See merge request gitlab-org/gitlab-ee!3172
parents 0abf6c49 5c3ca746
......@@ -17,6 +17,8 @@ module Geo
# The `build_event` method is supposed to return an instance of the event
# that will be logged.
class EventStore
include ::Gitlab::Geo::ProjectLogHelpers
class << self
attr_accessor :event_type
end
......@@ -30,6 +32,7 @@ module Geo
def create
return unless Gitlab::Geo.primary?
return unless Gitlab::Geo.secondary_nodes.any? # no need to create an event if no one is listening
Geo::EventLog.create!("#{self.class.event_type}" => build_event)
rescue ActiveRecord::RecordInvalid, NoMethodError => e
......@@ -42,20 +45,5 @@ module Geo
raise NotImplementedError,
"#{self.class} does not implement #{__method__}"
end
def log_error(message, error)
Gitlab::Geo::Logger.error({
class: self.class.name,
message: message,
error: error
}.merge(log_params))
end
def log_params
{
project_id: project.id,
project_path: project.full_path
}
end
end
end
#
# Concern that helps with getting an exclusive lease for running a worker
#
# `#try_obtain_lease` takes a block which will be run if it was able to obtain the lease.
# Implement `#lease_timeout` to configure the timeout for the exclusive lease.
# Optionally override `#lease_key` to set the lease key, it defaults to the class name with underscores.
#
module ExclusiveLeaseGuard
extend ActiveSupport::Concern
# override in subclass
def lease_timeout
raise NotImplementedError
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def log_error(message, extra_args = {})
logger.error(messages)
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
......@@ -27,4 +43,8 @@ module ExclusiveLeaseGuard
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def renew_lease!
exclusive_lease.renew
end
end
......@@ -2,6 +2,7 @@ module Geo
class BaseSchedulerWorker
include Sidekiq::Worker
include CronjobQueue
include ExclusiveLeaseGuard
DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 60.minutes
......@@ -72,10 +73,6 @@ module Geo
DB_RETRIEVE_BATCH_SIZE
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def lease_timeout
LEASE_TIMEOUT
end
......@@ -139,33 +136,6 @@ module Geo
scheduled_jobs.map { |data| data[:job_id] }
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
log_error('Cannot obtain an exclusive lease. There must be another worker already in execution.')
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(lease_key, timeout: lease_timeout)
end
def renew_lease!
exclusive_lease.renew
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def current_node
Gitlab::Geo.current_node
end
......
module Geo
class PruneEventLogWorker
include Sidekiq::Worker
include CronjobQueue
include ExclusiveLeaseGuard
include ::Gitlab::Geo::LogHelpers
LEASE_TIMEOUT = 60.minutes
def lease_timeout
LEASE_TIMEOUT
end
def perform
return unless Gitlab::Geo.primary?
try_obtain_lease do
if Gitlab::Geo.secondary_nodes.empty?
log_info('No secondary nodes, delete all Geo Event Log entries')
Geo::EventLog.delete_all
return
end
cursor_last_event_ids = Gitlab::Geo.secondary_nodes.map do |node|
Geo::NodeStatusService.new.call(node).cursor_last_event_id
end
if cursor_last_event_ids.include?(nil)
log_info('Could not get status of all nodes, not deleting any entries from Geo Event Log', unhealthy_node_count: cursor_last_event_ids.count(nil))
return
end
log_info('Delete Geo Event Log entries up to id', geo_event_log_id: cursor_last_event_ids.min)
Geo::EventLog.where('id < ?', cursor_last_event_ids.min).delete_all
end
end
end
end
......@@ -3,6 +3,7 @@ module Geo
include Sidekiq::Worker
include GeoQueue
include Gitlab::ShellAdapter
include ExclusiveLeaseGuard
BATCH_SIZE = 250
LEASE_TIMEOUT = 60.minutes
......@@ -40,31 +41,8 @@ module Geo
end
end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
log_error('Cannot obtain an exclusive lease. There must be another worker already in execution.')
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
def lease_timeout
LEASE_TIMEOUT
end
def log_info(message, params = {})
......
---
title: Add worker to prune the Geo Event Log
merge_request: 3172
author:
type: added
......@@ -246,6 +246,11 @@ production: &base
geo_metrics_update_worker:
cron: "*/1 * * * *"
# GitLab Geo prune event log worker
# NOTE: This will only take effect if Geo is enabled (primary node only)
geo_prune_event_log_worker:
cron: "0 */6 * * *"
# GitLab Geo repository sync worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
geo_repository_sync_worker:
......
......@@ -452,6 +452,9 @@ Settings.cron_jobs['geo_repository_sync_worker']['job_class'] ||= 'Geo::Reposito
Settings.cron_jobs['geo_file_download_dispatch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_file_download_dispatch_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::FileDownloadDispatchWorker'
Settings.cron_jobs['geo_prune_event_log_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '0 */6 * * *'
Settings.cron_jobs['geo_prune_event_log_worker']['job_class'] ||= 'Geo::PruneEventLogWorker'
Settings.cron_jobs['import_export_project_cleanup_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['import_export_project_cleanup_worker']['cron'] ||= '0 * * * *'
Settings.cron_jobs['import_export_project_cleanup_worker']['job_class'] = 'ImportExportProjectCleanupWorker'
......
......@@ -7,9 +7,10 @@ module Gitlab
Gitlab::Geo::Logger.info(data)
end
def log_error(message, error)
def log_error(message, error = nil, details = {})
data = base_log_data(message)
data[:error] = error.to_s
data[:error] = error.to_s if error
data.merge!(details) if details
Gitlab::Geo::Logger.error(data)
end
......
......@@ -279,25 +279,7 @@ describe Admin::GeoNodesController, :postgresql do
end
context 'with add-on license' do
let(:geo_node_status) do
GeoNodeStatus.new(
id: 1,
health: nil,
attachments_count: 329,
attachments_failed_count: 13,
attachments_synced_count: 141,
lfs_objects_count: 256,
lfs_objects_failed_count: 12,
lfs_objects_synced_count: 123,
repositories_count: 10,
repositories_synced_count: 5,
repositories_failed_count: 0,
last_event_id: 2,
last_event_timestamp: Time.now.to_i,
cursor_last_event_id: 1,
cursor_last_event_timestamp: Time.now.to_i
)
end
let(:geo_node_status) { build(:geo_node_status, :healthy) }
before do
allow(Gitlab::Geo).to receive(:license_allows?).and_return(true)
......
......@@ -67,7 +67,8 @@ describe Namespace do
describe '#move_dir' do
context 'when running on a primary node' do
let!(:geo_node) { create(:geo_node, :primary) }
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
let(:gitlab_shell) { Gitlab::Shell.new }
it 'logs the Geo::RepositoryRenamedEvent for each project inside namespace' do
......@@ -84,7 +85,6 @@ describe Namespace do
allow(parent).to receive(:full_path).and_return(new_path)
allow(gitlab_shell).to receive(:mv_namespace)
.ordered
.with(project_1.repository_storage_path, full_path_was, new_path)
.and_return(true)
......
......@@ -759,7 +759,8 @@ describe Project do
describe '#rename_repo' do
context 'when running on a primary node' do
let!(:geo_node) { create(:geo_node, :primary) }
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
let(:project) { create(:project, :repository) }
let(:gitlab_shell) { Gitlab::Shell.new }
......
......@@ -161,7 +161,8 @@ describe Projects::CreateService, '#execute' do
end
context 'when running on a primary node' do
let!(:geo_node) { create(:geo_node, :primary) }
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'logs an event to the Geo event log' do
expect { create_project(user, opts) }.to change(Geo::RepositoryCreatedEvent, :count).by(1)
......
......@@ -31,7 +31,8 @@ describe Projects::DestroyService do
end
context 'when running on a primary node' do
let!(:geo_node) { create(:geo_node, :primary) }
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'logs an event to the Geo event log' do
# Run Sidekiq immediately to check that renamed repository will be removed
......
......@@ -7,9 +7,10 @@ describe Projects::HashedStorageMigrationService do
let(:hashed_storage) { Storage::HashedProject.new(project) }
describe '#execute' do
it 'creates a Geo::RepositoryRenamedEvent on success' do
allow(Gitlab::Geo).to receive(:primary?).and_return(true)
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'creates a Geo::RepositoryRenamedEvent on success' do
expect { service.execute }.to change { Geo::EventLog.count }.by(1)
event = Geo::EventLog.first.event
......@@ -32,7 +33,6 @@ describe Projects::HashedStorageMigrationService do
allow(service).to receive(:move_repository).and_call_original
allow(service).to receive(:move_repository).with(from_name, to_name).once { false } # will disable first move only
allow(Gitlab::Geo).to receive(:primary?).and_return(true)
expect { service.execute }.not_to change { Geo::EventLog.count }
end
end
......
......@@ -12,7 +12,8 @@ describe Projects::TransferService do
end
context 'when running on a primary node' do
let!(:geo_node) { create(:geo_node, :primary) }
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
it 'logs an event to the Geo event log' do
expect { subject.execute(group) }.to change(Geo::RepositoryRenamedEvent, :count).by(1)
......
FactoryGirl.define do
factory :geo_node_status do
skip_create
sequence(:id)
trait :healthy do
health nil
attachments_count 329
attachments_failed_count 13
attachments_synced_count 141
lfs_objects_count 256
lfs_objects_failed_count 12
lfs_objects_synced_count 123
repositories_count 10
repositories_synced_count 5
repositories_failed_count 0
last_event_id 2
last_event_timestamp Time.now.to_i
cursor_last_event_id 1
cursor_last_event_timestamp Time.now.to_i
end
trait :unhealthy do
health "Could not connect to Geo node - HTTP Status Code: 401 Unauthorized\nTest"
end
end
end
......@@ -2,6 +2,7 @@ require 'spec_helper'
describe Geo::RepositoryCreatedEventStore do
set(:project) { create(:project) }
set(:secondary_node) { create(:geo_node) }
subject(:create!) { described_class.new(project).create }
......@@ -17,6 +18,12 @@ describe Geo::RepositoryCreatedEventStore do
allow(Gitlab::Geo).to receive(:primary?) { true }
end
it 'does not create an event when there are no secondary nodes' do
allow(Gitlab::Geo).to receive(:secondary_nodes) { [] }
expect { create! }.not_to change(Geo::RepositoryCreatedEvent, :count)
end
it 'creates a created event' do
expect { create! }.to change(Geo::RepositoryCreatedEvent, :count).by(1)
end
......
require 'spec_helper'
describe Geo::RepositoryDeletedEventStore do
let(:project) { create(:project, path: 'bar') }
let!(:project_id) { project.id }
let!(:project_name) { project.name }
let!(:repo_path) { project.full_path }
let!(:wiki_path) { "#{project.full_path}.wiki" }
let!(:storage_name) { project.repository_storage }
let!(:storage_path) { project.repository_storage_path }
set(:project) { create(:project, path: 'bar') }
set(:secondary_node) { create(:geo_node) }
let(:project_id) { project.id }
let(:project_name) { project.name }
let(:repo_path) { project.full_path }
let(:wiki_path) { "#{project.full_path}.wiki" }
let(:storage_name) { project.repository_storage }
let(:storage_path) { project.repository_storage_path }
subject { described_class.new(project, repo_path: repo_path, wiki_path: wiki_path) }
......@@ -23,6 +24,12 @@ describe Geo::RepositoryDeletedEventStore do
allow(Gitlab::Geo).to receive(:primary?) { true }
end
it 'does not create an event when there are no secondary nodes' do
allow(Gitlab::Geo).to receive(:secondary_nodes) { [] }
expect { subject.create }.not_to change(Geo::RepositoryDeletedEvent, :count)
end
it 'creates a deleted event' do
expect { subject.create }.to change(Geo::RepositoryDeletedEvent, :count).by(1)
end
......
require 'spec_helper'
describe Geo::RepositoryRenamedEventStore do
let(:project) { create(:project, path: 'bar') }
set(:project) { create(:project, path: 'bar') }
set(:secondary_node) { create(:geo_node) }
let(:old_path) { 'foo' }
let(:old_path_with_namespace) { "#{project.namespace.full_path}/foo" }
......@@ -19,6 +20,12 @@ describe Geo::RepositoryRenamedEventStore do
allow(Gitlab::Geo).to receive(:primary?) { true }
end
it 'does not create an event when there are no secondary nodes' do
allow(Gitlab::Geo).to receive(:secondary_nodes) { [] }
expect { subject.create }.not_to change(Geo::RepositoryRenamedEvent, :count)
end
it 'creates a renamed event' do
expect { subject.create }.to change(Geo::RepositoryRenamedEvent, :count).by(1)
end
......
require 'spec_helper'
describe Geo::RepositoryUpdatedEventStore do
let(:project) { create(:project, :repository) }
set(:project) { create(:project, :repository) }
set(:secondary_node) { create(:geo_node) }
let(:blankrev) { Gitlab::Git::BLANK_SHA }
let(:refs) { ['refs/heads/tést', 'refs/tags/tag'] }
......@@ -26,6 +27,14 @@ describe Geo::RepositoryUpdatedEventStore do
allow(Gitlab::Geo).to receive(:primary?) { true }
end
it 'does not create an event when there are no secondary nodes' do
allow(Gitlab::Geo).to receive(:secondary_nodes) { [] }
subject = described_class.new(project, refs: refs, changes: changes)
expect { subject.create }.not_to change(Geo::RepositoryUpdatedEvent, :count)
end
it 'creates a push event' do
subject = described_class.new(project, refs: refs, changes: changes)
......
require 'spec_helper'
describe Geo::PruneEventLogWorker, :geo do
include ::EE::GeoHelpers
subject(:worker) { described_class.new }
set(:primary) { create(:geo_node, :primary, host: 'primary-geo-node') }
set(:secondary) { create(:geo_node) }
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(true)
end
describe '#perform' do
context 'current node secondary' do
before do
stub_current_geo_node(secondary)
end
it 'does nothing' do
expect(worker).not_to receive(:try_obtain_lease)
worker.perform
end
end
context 'current node primary' do
before do
stub_current_geo_node(primary)
end
it 'logs error when it cannot obtain lease' do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { nil }
expect(worker).to receive(:log_error).with('Cannot obtain an exclusive lease. There must be another worker already in execution.')
worker.perform
end
context 'no secondary nodes' do
before do
secondary.destroy
end
it 'deletes everything from the Geo event log' do
create_list(:geo_event_log, 2)
expect(worker).to receive(:log_info).with('No secondary nodes, delete all Geo Event Log entries')
expect { worker.perform }.to change { Geo::EventLog.count }.by(-2)
end
end
context 'multiple secondary nodes' do
set(:secondary2) { create(:geo_node) }
let(:healthy_status) { build(:geo_node_status, :healthy) }
let(:unhealthy_status) { build(:geo_node_status, :unhealthy) }
let(:node_status_service) do
service = double
allow(Geo::NodeStatusService).to receive(:new).and_return(service)
service
end
it 'contacts all secondary nodes for their status' do
expect(node_status_service).to receive(:call).twice { healthy_status }
expect(worker).to receive(:log_info).with('Delete Geo Event Log entries up to id', anything)
worker.perform
end
it 'aborts when there are unhealthy nodes' do
create_list(:geo_event_log, 2)
expect(node_status_service).to receive(:call).twice.and_return(healthy_status, unhealthy_status)
expect(worker).to receive(:log_info).with('Could not get status of all nodes, not deleting any entries from Geo Event Log', unhealthy_node_count: 1)
expect { worker.perform }.not_to change { Geo::EventLog.count }
end
it 'takes the integer-minimum value of all cursor_last_event_ids' do
events = create_list(:geo_event_log, 12)
allow(node_status_service).to receive(:call).twice.and_return(
build(:geo_node_status, :healthy, cursor_last_event_id: events[3]),
build(:geo_node_status, :healthy, cursor_last_event_id: events.last)
)
expect(worker).to receive(:log_info).with('Delete Geo Event Log entries up to id', geo_event_log_id: events[3])
expect { worker.perform }.to change { Geo::EventLog.count }.by(-3)
end
end
end
end
describe '#log_error' do
it 'calls the Geo logger' do
expect(Gitlab::Geo::Logger).to receive(:error)
worker.log_error('Something is wrong')
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment