Commit f1e341a6 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre Committed by Nick Thomas

Resolve "Skip repository-changing events on Geo secondaries if the repository...

Resolve "Skip repository-changing events on Geo secondaries if the repository hasn't been backfilled yet"
parent c0af6dce
---
title: Skip repository-changing events on Geo secondaries if the repository hasn't
been backfilled yet
merge_request:
author:
type: fixed
......@@ -106,34 +106,43 @@ module Gitlab
registry = find_or_initialize_registry(event.project_id,
"resync_#{event.source}" => true, "#{event.source}_verification_checksum" => nil)
registry.save!
job_id = ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
logger.event_info(
created_at,
'Repository update',
project_id: event.project_id,
source: event.source,
resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki)
registry.save!
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
resync_wiki: registry.resync_wiki,
job_id: job_id)
end
def handle_repository_deleted_event(event, created_at)
job_id = ::Geo::RepositoryDestroyService
.new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name)
.async_execute
registry = find_or_initialize_registry(event.project_id)
skippable = registry.new_record?
logger.event_info(
created_at,
'Deleted project',
params = {
project_id: event.project_id,
repository_storage_name: event.repository_storage_name,
disk_path: event.deleted_path,
job_id: job_id)
skippable: skippable
}
unless skippable
params[:job_id] = ::Geo::RepositoryDestroyService.new(
event.project_id,
event.deleted_project_name,
event.deleted_path,
event.repository_storage_name
).async_execute
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
end
# No need to create a project entry if it doesn't exist
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
logger.event_info(created_at, 'Deleted project', params)
end
def handle_repositories_changed_event(event, created_at)
......@@ -151,41 +160,52 @@ module Gitlab
def handle_repository_renamed_event(event, created_at)
return unless event.project_id
old_path = event.old_path_with_namespace
new_path = event.new_path_with_namespace
job_id = ::Geo::RenameRepositoryService
.new(event.project_id, old_path, new_path)
.async_execute
registry = find_or_initialize_registry(event.project_id)
skippable = registry.new_record?
logger.event_info(
created_at,
'Renaming project',
params = {
project_id: event.project_id,
old_path: old_path,
new_path: new_path,
job_id: job_id)
old_path: event.old_path_with_namespace,
new_path: event.new_path_with_namespace,
skippable: skippable
}
unless skippable
params[:job_id] = ::Geo::RenameRepositoryService.new(
event.project_id,
event.old_path_with_namespace,
event.new_path_with_namespace
).async_execute
end
logger.event_info(created_at, 'Renaming project', params)
end
def handle_hashed_storage_migrated_event(event, created_at)
return unless event.project_id
job_id = ::Geo::HashedStorageMigrationService.new(
event.project_id,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
old_storage_version: event.old_storage_version
).async_execute
registry = find_or_initialize_registry(event.project_id)
skippable = registry.new_record?
logger.event_info(
created_at,
'Migrating project to hashed storage',
params = {
project_id: event.project_id,
old_storage_version: event.old_storage_version,
new_storage_version: event.new_storage_version,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
job_id: job_id)
skippable: skippable
}
unless skippable
params[:job_id] = ::Geo::HashedStorageMigrationService.new(
event.project_id,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
old_storage_version: event.old_storage_version
).async_execute
end
logger.event_info(created_at, 'Migrating project to hashed storage', params)
end
def handle_hashed_storage_attachments_event(event, created_at)
......@@ -254,7 +274,7 @@ module Gitlab
::Geo::FileRegistry.where(file_id: event.upload_id, file_type: event.upload_type).delete_all
end
def find_or_initialize_registry(project_id, attrs)
def find_or_initialize_registry(project_id, attrs = nil)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
registry.assign_attributes(attrs)
registry
......
......@@ -7,6 +7,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
set(:secondary) { create(:geo_node) }
let(:options) { {} }
subject(:daemon) { described_class.new(options) }
around do |example|
......@@ -164,26 +165,35 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_deleted_event) { event_log.repository_deleted_event }
let(:project) { repository_deleted_event.project }
let(:deleted_project_name) { repository_deleted_event.deleted_project_name }
let(:deleted_path) { repository_deleted_event.deleted_path }
it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'schedules a GeoRepositoryDestroyWorker' do
project_id = repository_deleted_event.project_id
project_name = repository_deleted_event.deleted_project_name
project_path = repository_deleted_event.deleted_path
context 'when a tracking entry does not exist' do
it 'does not schedule a GeoRepositoryDestroyWorker' do
expect(::GeoRepositoryDestroyWorker).not_to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
expect(::GeoRepositoryDestroyWorker).to receive(:perform_async)
.with(project_id, project_name, project_path, project.repository_storage)
daemon.run_once!
end
daemon.run_once!
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
end
it 'removes the tracking database entry if exist' do
create(:geo_project_registry, :synced, project: project)
context 'when a tracking entry exists' do
let!(:tracking_entry) { create(:geo_project_registry, project: project) }
it 'schedules a GeoRepositoryDestroyWorker' do
expect(::GeoRepositoryDestroyWorker).to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
daemon.run_once!
end
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(-1)
it 'removes the tracking entry' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(-1)
end
end
end
......@@ -214,27 +224,33 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
allow(Geo::ProjectSyncWorker).to receive(:perform_async)
end
let!(:registry) { create(:geo_project_registry, :synced, project: project) }
it 'replays events for projects that belong to selected namespaces to replicate' do
secondary.update!(namespaces: [group_1])
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(1)
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project.id, anything).once
daemon.run_once!
end
it 'does not replay events for projects that do not belong to selected namespaces to replicate' do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [group_2])
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
.with(project.id, anything)
daemon.run_once!
end
it 'does not replay events for projects that do not belong to selected shards to replicate' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
.with(project.id, anything)
daemon.run_once!
end
end
......@@ -242,20 +258,32 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:event_log) { create(:geo_event_log, :renamed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_renamed_event) { event_log.repository_renamed_event }
let(:project) {repository_renamed_event.project }
let(:old_path_with_namespace) { repository_renamed_event.old_path_with_namespace }
let(:new_path_with_namespace) { repository_renamed_event.new_path_with_namespace }
it 'does not create a new project registry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not schedule a Geo::RenameRepositoryWorker' do
expect(::Geo::RenameRepositoryWorker).not_to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once!
end
end
it 'schedules a Geo::RenameRepositoryWorker' do
project_id = repository_renamed_event.project_id
old_path_with_namespace = repository_renamed_event.old_path_with_namespace
new_path_with_namespace = repository_renamed_event.new_path_with_namespace
context 'when a tracking entry does exists' do
it 'schedules a Geo::RenameRepositoryWorker' do
create(:geo_project_registry, project: project)
expect(::Geo::RenameRepositoryWorker).to receive(:perform_async)
.with(project_id, old_path_with_namespace, new_path_with_namespace)
expect(::Geo::RenameRepositoryWorker).to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once!
daemon.run_once!
end
end
end
......@@ -263,21 +291,33 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:event_log) { create(:geo_event_log, :hashed_storage_migration_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_migrated_event) { event_log.hashed_storage_migrated_event }
let(:project) { hashed_storage_migrated_event.project }
let(:old_disk_path) { hashed_storage_migrated_event.old_disk_path }
let(:new_disk_path) { hashed_storage_migrated_event.new_disk_path }
let(:old_storage_version) { hashed_storage_migrated_event.old_storage_version }
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not create a new project registry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
it 'does not schedule a Geo::HashedStorageMigrationWorker' do
expect(::Geo::HashedStorageMigrationWorker).not_to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
daemon.run_once!
end
end
it 'schedules a Geo::HashedStorageMigrationWorker' do
project = hashed_storage_migrated_event.project
old_disk_path = hashed_storage_migrated_event.old_disk_path
new_disk_path = hashed_storage_migrated_event.new_disk_path
old_storage_version = hashed_storage_migrated_event.old_storage_version
context 'when a tracking entry exists' do
it 'schedules a Geo::HashedStorageMigrationWorker' do
create(:geo_project_registry, project: project)
expect(::Geo::HashedStorageMigrationWorker).to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
expect(::Geo::HashedStorageMigrationWorker).to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
daemon.run_once!
daemon.run_once!
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment