Commit e10c719d authored by Michael Kozono's avatar Michael Kozono

Fix race condition

parent 6aa71d9d
......@@ -55,6 +55,7 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
)
end
# Must be run before fetching the repository to avoid a race condition
def start_sync!(type)
new_count = retry_count(type) + 1
......@@ -66,9 +67,8 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
def finish_sync!(type)
update!(
# Indicate that the sync succeeded
# Indicate that the sync succeeded (but separately mark as synced atomically)
"last_#{type}_successful_sync_at" => Time.now,
"resync_#{type}" => false,
"#{type}_retry_count" => nil,
"#{type}_retry_at" => nil,
"force_to_redownload_#{type}" => false,
......@@ -78,6 +78,8 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
"#{type}_verification_checksum_sha" => nil,
"#{type}_checksum_mismatch" => false,
"last_#{type}_verification_failure" => nil)
mark_synced_atomically(type)
end
def fail_sync!(type, message, error, attrs = {})
......@@ -93,6 +95,10 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
resync_wiki: repository_created_event.wiki_path.present?)
end
# Marks the project as dirty.
#
# resync_#{type}_was_scheduled_at tracks scheduled_at to avoid a race condition.
# See the method #mark_synced_atomically.
def repository_updated!(repository_updated_event, scheduled_at)
type = repository_updated_event.source
......@@ -176,4 +182,34 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
def retry_count(type)
public_send("#{type}_retry_count") || -1 # rubocop:disable GitlabSecurity/PublicSend
end
# @return [Boolean] whether the update was successful
def mark_synced_atomically(type)
# Indicates whether the project is dirty (needs to be synced).
#
# This is the field we intend to reset to false.
sync_column = "resync_#{type}"
# The latest time that this project was marked as dirty.
#
# This field may change at any time when processing
# `RepositoryUpdatedEvent`s.
sync_scheduled_column = "resync_#{type}_was_scheduled_at"
# The time recorded just before syncing.
#
# We know this field won't change between `start_sync!` and `finish_sync!`
# because it is only updated by `start_sync!`, which is only done in the
# exclusive lease block.
sync_started_column = "last_#{type}_synced_at"
# This conditional update must be atomic since RepositoryUpdatedEvent may
# update resync_*_was_scheduled_at at any time.
num_rows = self.class
.where(project: project)
.where("#{sync_scheduled_column} IS NULL OR #{sync_scheduled_column} < #{sync_started_column}")
.update_all(sync_column => false)
num_rows > 0
end
end
......@@ -123,6 +123,24 @@ module Geo
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id)
end
def mark_sync_as_successful
log_info("Marking #{type} sync as successful")
persisted = registry.finish_sync!(type)
reschedule_sync unless persisted
log_info("Finished #{type} sync",
update_delay_s: update_delay_in_seconds,
download_time_s: download_time_in_seconds)
end
def reschedule_sync
log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync")
::Geo::ProjectSyncWorker.perform_async(project.id, Time.now)
end
def fail_registry!(message, error, attrs = {})
log_error(message, error)
......
......@@ -32,16 +32,6 @@ module Geo
execute_housekeeping
end
def mark_sync_as_successful
log_info("Marking #{type} sync as successful")
registry.finish_sync!(type)
log_info('Finished repository sync',
update_delay_s: update_delay_in_seconds,
download_time_s: download_time_in_seconds)
end
def expire_repository_caches
log_info('Expiring caches')
project.repository.after_sync
......
......@@ -44,16 +44,6 @@ module Geo
repository.after_sync
end
def mark_sync_as_successful
log_info("Marking #{type} sync as successful")
registry.finish_sync!(type)
log_info('Finished wiki sync',
update_delay_s: update_delay_in_seconds,
download_time_s: download_time_in_seconds)
end
def schedule_repack
# No-op: we currently don't schedule wiki repository to repack
# TODO: https://gitlab.com/gitlab-org/gitlab-ce/issues/45523
......
---
title: 'Geo: Fix repository/wiki sync race condition with multiple updates, especially
in quick succession.'
merge_request: 6161
author:
type: fixed
......@@ -410,6 +410,30 @@ describe Geo::ProjectRegistry do
expect(subject.reload.repository_checksum_mismatch).to be false
expect(subject.reload.last_repository_verification_failure).to be_nil
end
context 'when a repository sync was scheduled after the last sync began' do
before do
subject.update!(resync_repository_was_scheduled_at: subject.last_repository_synced_at + 1.minute)
subject.finish_sync!(type)
end
it 'does not reset resync_repository' do
expect(subject.reload.resync_repository).to be true
end
it 'resets the other sync state fields' do
expect(subject.reload.repository_retry_count).to be_nil
expect(subject.reload.repository_retry_at).to be_nil
expect(subject.reload.force_to_redownload_repository).to be false
end
it 'resets the verification state' do
expect(subject.reload.repository_verification_checksum_sha).to be_nil
expect(subject.reload.repository_checksum_mismatch).to be false
expect(subject.reload.last_repository_verification_failure).to be_nil
end
end
end
context 'for a wiki' do
......@@ -450,6 +474,30 @@ describe Geo::ProjectRegistry do
expect(subject.reload.wiki_checksum_mismatch).to be false
expect(subject.reload.last_wiki_verification_failure).to be_nil
end
context 'when a wiki sync was scheduled after the last sync began' do
before do
subject.update!(resync_wiki_was_scheduled_at: subject.last_wiki_synced_at + 1.minute)
subject.finish_sync!(type)
end
it 'does not reset resync_wiki' do
expect(subject.reload.resync_wiki).to be true
end
it 'resets the other sync state fields' do
expect(subject.reload.wiki_retry_count).to be_nil
expect(subject.reload.wiki_retry_at).to be_nil
expect(subject.reload.force_to_redownload_wiki).to be false
end
it 'resets the verification state' do
expect(subject.reload.wiki_verification_checksum_sha).to be_nil
expect(subject.reload.wiki_checksum_mismatch).to be false
expect(subject.reload.last_wiki_verification_failure).to be_nil
end
end
end
end
......
......@@ -20,6 +20,7 @@ describe Geo::RepositorySyncService do
it_behaves_like 'geo base sync execution'
it_behaves_like 'geo base sync fetch and repack'
it_behaves_like 'reschedules sync due to race condition instead of waiting for backfill'
describe '#execute' do
let(:url_to_repo) { "#{primary.url}#{project.full_path}.git" }
......@@ -130,7 +131,9 @@ describe Geo::RepositorySyncService do
end
it 'marks resync as true after a failure' do
subject.execute
described_class.new(project).execute
expect(Geo::ProjectRegistry.last.resync_repository).to be false
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
......
......@@ -20,6 +20,7 @@ RSpec.describe Geo::WikiSyncService do
it_behaves_like 'geo base sync execution'
it_behaves_like 'geo base sync fetch and repack'
it_behaves_like 'reschedules sync due to race condition instead of waiting for backfill'
describe '#execute' do
let(:url_to_repo) { "#{primary.url}#{project.full_path}.wiki.git" }
......@@ -109,7 +110,7 @@ RSpec.describe Geo::WikiSyncService do
end
it 'marks resync as true after a failure' do
subject.execute
described_class.new(project).execute
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
......
......@@ -142,3 +142,19 @@ shared_examples 'sync retries use the snapshot RPC' do
end
end
end
shared_examples 'reschedules sync due to race condition instead of waiting for backfill' do
describe '#mark_sync_as_successful' do
let(:mark_sync_as_successful) { subject.send(:mark_sync_as_successful) }
let(:registry) { subject.send(:registry) }
context 'when RepositoryUpdatedEvent was processed during a sync' do
it 'reschedules the sync' do
expect(::Geo::ProjectSyncWorker).to receive(:perform_async)
expect(registry).to receive(:finish_sync!).and_return(false)
mark_sync_as_successful
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment