Commit f33aba93 authored by Nick Thomas

Merge branch...

Merge branch '4870-skip-repository-changing-events-on-geo-secondaries-if-the-repository-hasn-t-been-backfilled-yet' into 'master'

Resolve "Skip repository-changing events on Geo secondaries if the repository hasn't been backfilled yet"

Closes #4870

See merge request gitlab-org/gitlab-ee!5042
parents c0af6dce f1e341a6
---
title: Skip repository-changing events on Geo secondaries if the repository hasn't
been backfilled yet
merge_request:
author:
type: fixed
...@@ -106,34 +106,43 @@ module Gitlab ...@@ -106,34 +106,43 @@ module Gitlab
registry = find_or_initialize_registry(event.project_id, registry = find_or_initialize_registry(event.project_id,
"resync_#{event.source}" => true, "#{event.source}_verification_checksum" => nil) "resync_#{event.source}" => true, "#{event.source}_verification_checksum" => nil)
registry.save!
job_id = ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
logger.event_info( logger.event_info(
created_at, created_at,
'Repository update', 'Repository update',
project_id: event.project_id, project_id: event.project_id,
source: event.source, source: event.source,
resync_repository: registry.resync_repository, resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki) resync_wiki: registry.resync_wiki,
job_id: job_id)
registry.save!
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
end end
def handle_repository_deleted_event(event, created_at) def handle_repository_deleted_event(event, created_at)
job_id = ::Geo::RepositoryDestroyService registry = find_or_initialize_registry(event.project_id)
.new(event.project_id, event.deleted_project_name, event.deleted_path, event.repository_storage_name) skippable = registry.new_record?
.async_execute
logger.event_info( params = {
created_at,
'Deleted project',
project_id: event.project_id, project_id: event.project_id,
repository_storage_name: event.repository_storage_name, repository_storage_name: event.repository_storage_name,
disk_path: event.deleted_path, disk_path: event.deleted_path,
job_id: job_id) skippable: skippable
}
unless skippable
params[:job_id] = ::Geo::RepositoryDestroyService.new(
event.project_id,
event.deleted_project_name,
event.deleted_path,
event.repository_storage_name
).async_execute
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
end
# No need to create a project entry if it doesn't exist logger.event_info(created_at, 'Deleted project', params)
::Geo::ProjectRegistry.where(project_id: event.project_id).delete_all
end end
def handle_repositories_changed_event(event, created_at) def handle_repositories_changed_event(event, created_at)
...@@ -151,41 +160,52 @@ module Gitlab ...@@ -151,41 +160,52 @@ module Gitlab
def handle_repository_renamed_event(event, created_at) def handle_repository_renamed_event(event, created_at)
return unless event.project_id return unless event.project_id
old_path = event.old_path_with_namespace registry = find_or_initialize_registry(event.project_id)
new_path = event.new_path_with_namespace skippable = registry.new_record?
job_id = ::Geo::RenameRepositoryService
.new(event.project_id, old_path, new_path)
.async_execute
logger.event_info( params = {
created_at,
'Renaming project',
project_id: event.project_id, project_id: event.project_id,
old_path: old_path, old_path: event.old_path_with_namespace,
new_path: new_path, new_path: event.new_path_with_namespace,
job_id: job_id) skippable: skippable
}
unless skippable
params[:job_id] = ::Geo::RenameRepositoryService.new(
event.project_id,
event.old_path_with_namespace,
event.new_path_with_namespace
).async_execute
end
logger.event_info(created_at, 'Renaming project', params)
end end
def handle_hashed_storage_migrated_event(event, created_at) def handle_hashed_storage_migrated_event(event, created_at)
return unless event.project_id return unless event.project_id
job_id = ::Geo::HashedStorageMigrationService.new( registry = find_or_initialize_registry(event.project_id)
event.project_id, skippable = registry.new_record?
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
old_storage_version: event.old_storage_version
).async_execute
logger.event_info( params = {
created_at,
'Migrating project to hashed storage',
project_id: event.project_id, project_id: event.project_id,
old_storage_version: event.old_storage_version, old_storage_version: event.old_storage_version,
new_storage_version: event.new_storage_version, new_storage_version: event.new_storage_version,
old_disk_path: event.old_disk_path, old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path, new_disk_path: event.new_disk_path,
job_id: job_id) skippable: skippable
}
unless skippable
params[:job_id] = ::Geo::HashedStorageMigrationService.new(
event.project_id,
old_disk_path: event.old_disk_path,
new_disk_path: event.new_disk_path,
old_storage_version: event.old_storage_version
).async_execute
end
logger.event_info(created_at, 'Migrating project to hashed storage', params)
end end
def handle_hashed_storage_attachments_event(event, created_at) def handle_hashed_storage_attachments_event(event, created_at)
...@@ -254,7 +274,7 @@ module Gitlab ...@@ -254,7 +274,7 @@ module Gitlab
::Geo::FileRegistry.where(file_id: event.upload_id, file_type: event.upload_type).delete_all ::Geo::FileRegistry.where(file_id: event.upload_id, file_type: event.upload_type).delete_all
end end
def find_or_initialize_registry(project_id, attrs) def find_or_initialize_registry(project_id, attrs = nil)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id) registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
registry.assign_attributes(attrs) registry.assign_attributes(attrs)
registry registry
......
...@@ -7,6 +7,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -7,6 +7,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
set(:secondary) { create(:geo_node) } set(:secondary) { create(:geo_node) }
let(:options) { {} } let(:options) { {} }
subject(:daemon) { described_class.new(options) } subject(:daemon) { described_class.new(options) }
around do |example| around do |example|
...@@ -164,26 +165,35 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -164,26 +165,35 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_deleted_event) { event_log.repository_deleted_event } let(:repository_deleted_event) { event_log.repository_deleted_event }
let(:project) { repository_deleted_event.project } let(:project) { repository_deleted_event.project }
let(:deleted_project_name) { repository_deleted_event.deleted_project_name }
let(:deleted_path) { repository_deleted_event.deleted_path }
it 'does not create a tracking database entry' do context 'when a tracking entry does not exist' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) it 'does not schedule a GeoRepositoryDestroyWorker' do
end expect(::GeoRepositoryDestroyWorker).not_to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
it 'schedules a GeoRepositoryDestroyWorker' do
project_id = repository_deleted_event.project_id
project_name = repository_deleted_event.deleted_project_name
project_path = repository_deleted_event.deleted_path
expect(::GeoRepositoryDestroyWorker).to receive(:perform_async) daemon.run_once!
.with(project_id, project_name, project_path, project.repository_storage) end
daemon.run_once! it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
end end
it 'removes the tracking database entry if exist' do context 'when a tracking entry exists' do
create(:geo_project_registry, :synced, project: project) let!(:tracking_entry) { create(:geo_project_registry, project: project) }
it 'schedules a GeoRepositoryDestroyWorker' do
expect(::GeoRepositoryDestroyWorker).to receive(:perform_async)
.with(project.id, deleted_project_name, deleted_path, project.repository_storage)
daemon.run_once!
end
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(-1) it 'removes the tracking entry' do
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(-1)
end
end end
end end
...@@ -214,27 +224,33 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -214,27 +224,33 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) } let(:repository_updated_event) { create(:geo_repository_updated_event, project: project) }
let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) } let(:event_log) { create(:geo_event_log, repository_updated_event: repository_updated_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let!(:registry) { create(:geo_project_registry, :synced, project: project) }
before do
allow(Geo::ProjectSyncWorker).to receive(:perform_async)
end
it 'replays events for projects that belong to selected namespaces to replicate' do it 'replays events for projects that belong to selected namespaces to replicate' do
secondary.update!(namespaces: [group_1]) secondary.update!(namespaces: [group_1])
expect { daemon.run_once! }.to change(Geo::ProjectRegistry, :count).by(1) expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project.id, anything).once
daemon.run_once!
end end
it 'does not replay events for projects that do not belong to selected namespaces to replicate' do it 'does not replay events for projects that do not belong to selected namespaces to replicate' do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [group_2]) secondary.update!(selective_sync_type: 'namespaces', namespaces: [group_2])
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
.with(project.id, anything)
daemon.run_once!
end end
it 'does not replay events for projects that do not belong to selected shards to replicate' do it 'does not replay events for projects that do not belong to selected shards to replicate' do
secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken']) secondary.update!(selective_sync_type: 'shards', selective_sync_shards: ['broken'])
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
.with(project.id, anything)
daemon.run_once!
end end
end end
...@@ -242,20 +258,32 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -242,20 +258,32 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:event_log) { create(:geo_event_log, :renamed_event) } let(:event_log) { create(:geo_event_log, :renamed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:repository_renamed_event) { event_log.repository_renamed_event } let(:repository_renamed_event) { event_log.repository_renamed_event }
let(:project) {repository_renamed_event.project }
let(:old_path_with_namespace) { repository_renamed_event.old_path_with_namespace }
let(:new_path_with_namespace) { repository_renamed_event.new_path_with_namespace }
it 'does not create a new project registry' do context 'when a tracking entry does not exist' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not schedule a Geo::RenameRepositoryWorker' do
expect(::Geo::RenameRepositoryWorker).not_to receive(:perform_async)
.with(project.id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once!
end
end end
it 'schedules a Geo::RenameRepositoryWorker' do context 'when a tracking entry does exists' do
project_id = repository_renamed_event.project_id it 'schedules a Geo::RenameRepositoryWorker' do
old_path_with_namespace = repository_renamed_event.old_path_with_namespace create(:geo_project_registry, project: project)
new_path_with_namespace = repository_renamed_event.new_path_with_namespace
expect(::Geo::RenameRepositoryWorker).to receive(:perform_async) expect(::Geo::RenameRepositoryWorker).to receive(:perform_async)
.with(project_id, old_path_with_namespace, new_path_with_namespace) .with(project.id, old_path_with_namespace, new_path_with_namespace)
daemon.run_once! daemon.run_once!
end
end end
end end
...@@ -263,21 +291,33 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -263,21 +291,33 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
let(:event_log) { create(:geo_event_log, :hashed_storage_migration_event) } let(:event_log) { create(:geo_event_log, :hashed_storage_migration_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) } let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
let(:hashed_storage_migrated_event) { event_log.hashed_storage_migrated_event } let(:hashed_storage_migrated_event) { event_log.hashed_storage_migrated_event }
let(:project) { hashed_storage_migrated_event.project }
let(:old_disk_path) { hashed_storage_migrated_event.old_disk_path }
let(:new_disk_path) { hashed_storage_migrated_event.new_disk_path }
let(:old_storage_version) { hashed_storage_migrated_event.old_storage_version }
context 'when a tracking entry does not exist' do
it 'does not create a tracking entry' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count)
end
it 'does not create a new project registry' do it 'does not schedule a Geo::HashedStorageMigrationWorker' do
expect { daemon.run_once! }.not_to change(Geo::ProjectRegistry, :count) expect(::Geo::HashedStorageMigrationWorker).not_to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version)
daemon.run_once!
end
end end
it 'schedules a Geo::HashedStorageMigrationWorker' do context 'when a tracking entry exists' do
project = hashed_storage_migrated_event.project it 'schedules a Geo::HashedStorageMigrationWorker' do
old_disk_path = hashed_storage_migrated_event.old_disk_path create(:geo_project_registry, project: project)
new_disk_path = hashed_storage_migrated_event.new_disk_path
old_storage_version = hashed_storage_migrated_event.old_storage_version
expect(::Geo::HashedStorageMigrationWorker).to receive(:perform_async) expect(::Geo::HashedStorageMigrationWorker).to receive(:perform_async)
.with(project.id, old_disk_path, new_disk_path, old_storage_version) .with(project.id, old_disk_path, new_disk_path, old_storage_version)
daemon.run_once! daemon.run_once!
end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment