Commit 2c477c1c authored by Nick Thomas's avatar Nick Thomas

Merge branch 'mk/geo/fix-repo-updated-race-condition' into 'master'

Geo: Fix repository updated replication race condition

Closes #5489

See merge request gitlab-org/gitlab-ee!6161
parents 39851dfa 7e911edc
class Geo::ProjectRegistry < Geo::BaseRegistry
include ::Delay
include ::EachBatch
include ::IgnorableColumn
include ::ShaAttribute
RETRIES_BEFORE_REDOWNLOAD = 5
ignore_column :last_repository_verification_at
ignore_column :last_repository_verification_failed
ignore_column :last_wiki_verification_at
......@@ -52,6 +55,61 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
)
end
# Must be run before fetching the repository to avoid a race condition
def start_sync!(type)
new_count = retry_count(type) + 1
update!(
"last_#{type}_synced_at" => Time.now,
"#{type}_retry_count" => new_count,
"#{type}_retry_at" => next_retry_time(new_count))
end
def finish_sync!(type)
update!(
# Indicate that the sync succeeded (but separately mark as synced atomically)
"last_#{type}_successful_sync_at" => Time.now,
"#{type}_retry_count" => nil,
"#{type}_retry_at" => nil,
"force_to_redownload_#{type}" => false,
"last_#{type}_sync_failure" => nil,
# Indicate that repository verification needs to be done again
"#{type}_verification_checksum_sha" => nil,
"#{type}_checksum_mismatch" => false,
"last_#{type}_verification_failure" => nil)
mark_synced_atomically(type)
end
def fail_sync!(type, message, error, attrs = {})
attrs["resync_#{type}"] = true
attrs["last_#{type}_sync_failure"] = "#{message}: #{error.message}"
attrs["#{type}_retry_count"] = retry_count(type) + 1
update!(attrs)
end
def repository_created!(repository_created_event)
update!(resync_repository: true,
resync_wiki: repository_created_event.wiki_path.present?)
end
# Marks the project as dirty.
#
# resync_#{type}_was_scheduled_at tracks scheduled_at to avoid a race condition.
# See the method #mark_synced_atomically.
def repository_updated!(repository_updated_event, scheduled_at)
type = repository_updated_event.source
update!(
"resync_#{type}" => true,
"#{type}_verification_checksum_sha" => nil,
"#{type}_checksum_mismatch" => false,
"last_#{type}_verification_failure" => nil,
"resync_#{type}_was_scheduled_at" => scheduled_at)
end
def repository_sync_due?(scheduled_time)
never_synced_repository? || repository_sync_needed?(scheduled_time)
end
......@@ -78,6 +136,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
Gitlab::Redis::SharedState.with { |redis| redis.set(fetches_since_gc_redis_key, value) }
end
def should_be_redownloaded?(type)
return true if public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend
retry_count(type) > RETRIES_BEFORE_REDOWNLOAD
end
private
def fetches_since_gc_redis_key
......@@ -105,4 +169,47 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
last_wiki_synced_at && timestamp > last_wiki_synced_at
end
# To prevent the retry time from storing invalid dates in the database,
# cap the max time to a week plus some random jitter value.
def next_retry_time(retry_count)
proposed_time = Time.now + delay(retry_count).seconds
max_future_time = Time.now + 7.days + delay(1).seconds
[proposed_time, max_future_time].min
end
def retry_count(type)
public_send("#{type}_retry_count") || -1 # rubocop:disable GitlabSecurity/PublicSend
end
# @return [Boolean] whether the update was successful
def mark_synced_atomically(type)
# Indicates whether the project is dirty (needs to be synced).
#
# This is the field we intend to reset to false.
sync_column = "resync_#{type}"
# The latest time that this project was marked as dirty.
#
# This field may change at any time when processing
# `RepositoryUpdatedEvent`s.
sync_scheduled_column = "resync_#{type}_was_scheduled_at"
# The time recorded just before syncing.
#
# We know this field won't change between `start_sync!` and `finish_sync!`
# because it is only updated by `start_sync!`, which is only done in the
# exclusive lease block.
sync_started_column = "last_#{type}_synced_at"
# This conditional update must be atomic since RepositoryUpdatedEvent may
# update resync_*_was_scheduled_at at any time.
num_rows = self.class
.where(project: project)
.where("#{sync_scheduled_column} IS NULL OR #{sync_scheduled_column} < #{sync_started_column}")
.update_all(sync_column => false)
num_rows > 0
end
end
......@@ -16,7 +16,6 @@ module Geo
GEO_REMOTE_NAME = 'geo'.freeze
LEASE_TIMEOUT = 8.hours.freeze
LEASE_KEY_PREFIX = 'geo_sync_service'.freeze
RETRIES_BEFORE_REDOWNLOAD = 5
def initialize(project)
@project = project
......@@ -26,11 +25,7 @@ module Geo
try_obtain_lease do
log_info("Started #{type} sync")
if should_be_retried?
sync_repository
else
sync_repository(true)
end
sync_repository
log_info("Finished #{type} sync")
end
......@@ -46,15 +41,16 @@ module Geo
private
def fetch_repository(redownload)
def fetch_repository
log_info("Trying to fetch #{type}")
clean_up_temporary_repository
update_registry!(started_at: DateTime.now)
log_info("Marking #{type} sync as started")
registry.start_sync!(type)
if redownload
if redownload?
redownload_repository
set_temp_repository_as_main
schedule_repack
elsif repository.exists?
fetch_geo_mirror(repository)
......@@ -65,6 +61,10 @@ module Geo
end
end
def redownload?
registry.should_be_redownloaded?(type)
end
def schedule_repack
raise NotImplementedError
end
......@@ -82,16 +82,10 @@ module Geo
end
fetch_geo_mirror(temp_repo)
end
def retry_count
registry.public_send("#{type}_retry_count") || -1 # rubocop:disable GitlabSecurity/PublicSend
end
def should_be_retried?
return false if registry.public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend
retry_count <= RETRIES_BEFORE_REDOWNLOAD
set_temp_repository_as_main
ensure
clean_up_temporary_repository
end
def current_node
......@@ -132,40 +126,28 @@ module Geo
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id)
end
def update_registry!(started_at: nil, finished_at: nil, attrs: {})
return unless started_at || finished_at
def mark_sync_as_successful
log_info("Marking #{type} sync as successful")
log_info("Updating #{type} sync information")
persisted = registry.finish_sync!(type)
if started_at
attrs["last_#{type}_synced_at"] = started_at
attrs["#{type}_retry_count"] = retry_count + 1
attrs["#{type}_retry_at"] = next_retry_time(attrs["#{type}_retry_count"])
end
reschedule_sync unless persisted
if finished_at
attrs["last_#{type}_successful_sync_at"] = finished_at
attrs["resync_#{type}"] = false
attrs["#{type}_retry_count"] = nil
attrs["#{type}_retry_at"] = nil
attrs["force_to_redownload_#{type}"] = false
# Indicate that repository verification needs to be done again
attrs["#{type}_verification_checksum_sha"] = nil
attrs["#{type}_checksum_mismatch"] = false
attrs["last_#{type}_verification_failure"] = nil
end
log_info("Finished #{type} sync",
update_delay_s: update_delay_in_seconds,
download_time_s: download_time_in_seconds)
end
registry.update!(attrs)
def reschedule_sync
log_info("Reschedule #{type} sync because a RepositoryUpdateEvent was processed during the sync")
::Geo::ProjectSyncWorker.perform_async(project.id, Time.now)
end
def fail_registry!(message, error, attrs = {})
log_error(message, error)
attrs["resync_#{type}"] = true
attrs["last_#{type}_sync_failure"] = "#{message}: #{error.message}"
attrs["#{type}_retry_count"] = retry_count + 1
registry.update!(attrs)
registry.fail_sync!(type, message, error, attrs)
repository.clean_stale_repository_files
end
......@@ -259,14 +241,5 @@ module Geo
File.dirname(disk_path)
)
end
# To prevent the retry time from storing invalid dates in the database,
# cap the max time to a week plus some random jitter value.
def next_retry_time(retry_count)
proposed_time = Time.now + delay(retry_count).seconds
max_future_time = Time.now + 7.days + delay(1).seconds
[proposed_time, max_future_time].min
end
end
end
......@@ -4,8 +4,8 @@ module Geo
private
def sync_repository(redownload = false)
fetch_repository(redownload)
def sync_repository
fetch_repository
update_gitattributes
......@@ -27,19 +27,10 @@ module Geo
log_info('Expiring caches')
project.repository.after_create
ensure
clean_up_temporary_repository if redownload
expire_repository_caches
execute_housekeeping
end
def mark_sync_as_successful
update_registry!(finished_at: DateTime.now, attrs: { last_repository_sync_failure: nil })
log_info('Finished repository sync',
update_delay_s: update_delay_in_seconds,
download_time_s: download_time_in_seconds)
end
def expire_repository_caches
log_info('Expiring caches')
project.repository.after_sync
......
......@@ -4,8 +4,8 @@ module Geo
private
def sync_repository(redownload = false)
fetch_repository(redownload)
def sync_repository
fetch_repository
mark_sync_as_successful
rescue Gitlab::Git::RepositoryMirroring::RemoteError,
......@@ -23,7 +23,6 @@ module Geo
log_info('Setting force_to_redownload flag')
fail_registry!('Invalid wiki', e, force_to_redownload_wiki: true)
ensure
clean_up_temporary_repository if redownload
expire_repository_caches
end
......@@ -44,14 +43,6 @@ module Geo
repository.after_sync
end
def mark_sync_as_successful
update_registry!(finished_at: DateTime.now, attrs: { last_wiki_sync_failure: nil })
log_info('Finished wiki sync',
update_delay_s: update_delay_in_seconds,
download_time_s: download_time_in_seconds)
end
def schedule_repack
# No-op: we currently don't schedule wiki repository to repack
# TODO: https://gitlab.com/gitlab-org/gitlab-ce/issues/45523
......
---
title: 'Geo: Fix repository/wiki sync race condition with multiple updates, especially
in quick succession.'
merge_request: 6161
author:
type: fixed
class AddResyncWasScheduledAtToProjectRegistry < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
add_column :project_registry, :resync_repository_was_scheduled_at, :datetime_with_timezone
add_column :project_registry, :resync_wiki_was_scheduled_at, :datetime_with_timezone
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20180510223634) do
ActiveRecord::Schema.define(version: 20180613184349) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -76,6 +76,8 @@ ActiveRecord::Schema.define(version: 20180510223634) do
t.boolean "wiki_checksum_mismatch", default: false, null: false
t.boolean "last_repository_check_failed"
t.datetime_with_timezone "last_repository_check_at"
t.datetime_with_timezone "resync_repository_was_scheduled_at"
t.datetime_with_timezone "resync_wiki_was_scheduled_at"
end
add_index "project_registry", ["last_repository_successful_sync_at"], name: "index_project_registry_on_last_repository_successful_sync_at", using: :btree
......
......@@ -16,7 +16,7 @@ module Gitlab
attr_reader :event, :created_at, :logger
def registry
@registry ||= find_or_initialize_registry
@registry ||= ::Geo::ProjectRegistry.find_or_initialize_by(project_id: event.project_id)
end
def skippable?
......@@ -32,12 +32,6 @@ module Gitlab
def enqueue_job_if_shard_healthy(event)
yield if healthy_shard_for?(event)
end
def find_or_initialize_registry(attrs = nil)
::Geo::ProjectRegistry.find_or_initialize_by(project_id: event.project_id).tap do |registry|
registry.assign_attributes(attrs)
end
end
end
end
end
......
......@@ -7,7 +7,7 @@ module Gitlab
def process
log_event
registry.save!
registry.repository_created!(event)
enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
......@@ -16,12 +16,6 @@ module Gitlab
private
def registry
@registry ||= find_or_initialize_registry(
resync_repository: true,
resync_wiki: event.wiki_path.present?)
end
def log_event
logger.event_info(
created_at,
......
......@@ -6,10 +6,10 @@ module Gitlab
include BaseEvent
def process
registry.save!
registry.repository_updated!(event, scheduled_at)
job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at)
end
log_event(job_id)
......@@ -17,15 +17,6 @@ module Gitlab
private
def registry
@registry ||= find_or_initialize_registry(
"resync_#{event.source}" => true,
"#{event.source}_verification_checksum_sha" => nil,
"#{event.source}_checksum_mismatch" => false,
"last_#{event.source}_verification_failure" => nil
)
end
def log_event(job_id)
logger.event_info(
created_at,
......@@ -34,8 +25,13 @@ module Gitlab
source: event.source,
resync_repository: registry.resync_repository,
resync_wiki: registry.resync_wiki,
scheduled_at: scheduled_at,
job_id: job_id)
end
def scheduled_at
@scheduled_at ||= Time.now
end
end
end
end
......
......@@ -52,6 +52,15 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
last_repository_verification_failure: nil
)
end
it 'sets resync_repository_was_scheduled_at to the scheduled time' do
Timecop.freeze do
subject.process
reloaded_registry = registry.reload
expect(reloaded_registry.resync_repository_was_scheduled_at).to be_within(1.second).of(Time.now)
end
end
end
context 'when the event source is a wiki' do
......@@ -73,6 +82,15 @@ describe Gitlab::Geo::LogCursor::Events::RepositoryUpdatedEvent, :postgresql, :c
expect(reloaded_registry.wiki_checksum_mismatch).to be false
expect(reloaded_registry.last_wiki_verification_failure).to be_nil
end
it 'sets resync_wiki_was_scheduled_at to the scheduled time' do
Timecop.freeze do
subject.process
reloaded_registry = registry.reload
expect(reloaded_registry.resync_wiki_was_scheduled_at).to be_within(1.second).of(Time.now)
end
end
end
end
end
......
This diff is collapsed.
......@@ -20,6 +20,7 @@ describe Geo::RepositorySyncService do
it_behaves_like 'geo base sync execution'
it_behaves_like 'geo base sync fetch and repack'
it_behaves_like 'reschedules sync due to race condition instead of waiting for backfill'
describe '#execute' do
let(:url_to_repo) { "#{primary.url}#{project.full_path}.git" }
......@@ -130,7 +131,9 @@ describe Geo::RepositorySyncService do
end
it 'marks resync as true after a failure' do
subject.execute
described_class.new(project).execute
expect(Geo::ProjectRegistry.last.resync_repository).to be false
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
......@@ -248,15 +251,15 @@ describe Geo::RepositorySyncService do
context 'retries' do
it 'tries to fetch repo' do
create(:geo_project_registry, project: project, repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD - 1)
create(:geo_project_registry, project: project, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD - 1)
expect(subject).to receive(:sync_repository).with(no_args)
expect(subject).to receive(:sync_repository)
subject.execute
end
it 'sets the redownload flag to false after success' do
registry = create(:geo_project_registry, project: project, repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD + 1, force_to_redownload_repository: true)
registry = create(:geo_project_registry, project: project, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD + 1, force_to_redownload_repository: true)
subject.execute
......@@ -264,9 +267,9 @@ describe Geo::RepositorySyncService do
end
it 'tries to redownload repo' do
create(:geo_project_registry, project: project, repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD + 1)
create(:geo_project_registry, project: project, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD + 1)
expect(subject).to receive(:sync_repository).with(true).and_call_original
expect(subject).to receive(:sync_repository).and_call_original
expect(subject.gitlab_shell).to receive(:mv_repository).exactly(2).times.and_call_original
expect(subject.gitlab_shell).to receive(:add_namespace).with(
......@@ -294,11 +297,11 @@ describe Geo::RepositorySyncService do
create(
:geo_project_registry,
project: project,
repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD - 1,
repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD - 1,
force_to_redownload_repository: true
)
expect(subject).to receive(:sync_repository).with(true)
expect(subject).to receive(:sync_repository)
subject.execute
end
......@@ -307,7 +310,7 @@ describe Geo::RepositorySyncService do
create(
:geo_project_registry,
project: project,
repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD - 1,
repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD - 1,
force_to_redownload_repository: true
)
......@@ -323,7 +326,7 @@ describe Geo::RepositorySyncService do
registry = create(
:geo_project_registry,
project: project,
repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD + 2000,
repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD + 2000,
repository_retry_at: timestamp,
force_to_redownload_repository: true
)
......
......@@ -20,6 +20,7 @@ RSpec.describe Geo::WikiSyncService do
it_behaves_like 'geo base sync execution'
it_behaves_like 'geo base sync fetch and repack'
it_behaves_like 'reschedules sync due to race condition instead of waiting for backfill'
describe '#execute' do
let(:url_to_repo) { "#{primary.url}#{project.full_path}.wiki.git" }
......@@ -109,7 +110,7 @@ RSpec.describe Geo::WikiSyncService do
end
it 'marks resync as true after a failure' do
subject.execute
described_class.new(project).execute
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
......
......@@ -50,7 +50,7 @@ end
shared_examples 'geo base sync fetch and repack' do
describe '#fetch_repository' do
let(:fetch_repository) { subject.send(:fetch_repository, false) }
let(:fetch_repository) { subject.send(:fetch_repository) }
before do
allow(subject).to receive(:fetch_geo_mirror).and_return(true)
......@@ -62,8 +62,9 @@ shared_examples 'geo base sync fetch and repack' do
fetch_repository
end
it 'updates registry' do
is_expected.to receive(:update_registry!)
it 'tells registry that sync will start now' do
registry = subject.send(:registry)
expect(registry).to receive(:start_sync!)
fetch_repository
end
......@@ -95,7 +96,7 @@ shared_examples 'geo base sync fetch and repack' do
end
shared_examples 'sync retries use the snapshot RPC' do
let(:retry_count) { Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD }
let(:retry_count) { Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD }
context 'snapshot synchronization method' do
before do
......@@ -141,3 +142,19 @@ shared_examples 'sync retries use the snapshot RPC' do
end
end
end
shared_examples 'reschedules sync due to race condition instead of waiting for backfill' do
describe '#mark_sync_as_successful' do
let(:mark_sync_as_successful) { subject.send(:mark_sync_as_successful) }
let(:registry) { subject.send(:registry) }
context 'when RepositoryUpdatedEvent was processed during a sync' do
it 'reschedules the sync' do
expect(::Geo::ProjectSyncWorker).to receive(:perform_async)
expect(registry).to receive(:finish_sync!).and_return(false)
mark_sync_as_successful
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment