Commit 6aa71d9d authored by Michael Kozono

Make ProjectRegistry responsible for its own sync state

parent f6dfba53
class Geo::ProjectRegistry < Geo::BaseRegistry class Geo::ProjectRegistry < Geo::BaseRegistry
include ::Delay
include ::EachBatch include ::EachBatch
include ::IgnorableColumn include ::IgnorableColumn
include ::ShaAttribute include ::ShaAttribute
RETRIES_BEFORE_REDOWNLOAD = 5
ignore_column :last_repository_verification_at ignore_column :last_repository_verification_at
ignore_column :last_repository_verification_failed ignore_column :last_repository_verification_failed
ignore_column :last_wiki_verification_at ignore_column :last_wiki_verification_at
...@@ -52,6 +55,55 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -52,6 +55,55 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
) )
end end
# Records that a sync attempt for +type+ is starting.
#
# In a single `update!` this stamps the attempt time, stores the bumped
# retry counter, and stores the next retry time derived from that counter.
#
# type - value interpolated into the column names (e.g. "last_<type>_synced_at").
def start_sync!(type)
  attempt_number = retry_count(type) + 1
  started_at = Time.now
  retry_at = next_retry_time(attempt_number)

  update!(
    "last_#{type}_synced_at" => started_at,
    "#{type}_retry_count" => attempt_number,
    "#{type}_retry_at" => retry_at
  )
end
# Records that a sync for +type+ finished successfully.
#
# A single `update!` stamps the success time, clears the resync flag, the
# retry bookkeeping, the forced-redownload flag and the last sync failure,
# and resets the verification columns.
#
# type - value interpolated into the column names.
def finish_sync!(type)
  # Indicate that the sync succeeded
  sync_succeeded = {
    "last_#{type}_successful_sync_at" => Time.now,
    "resync_#{type}" => false,
    "#{type}_retry_count" => nil,
    "#{type}_retry_at" => nil,
    "force_to_redownload_#{type}" => false,
    "last_#{type}_sync_failure" => nil
  }

  # Indicate that repository verification needs to be done again
  verification_reset = {
    "#{type}_verification_checksum_sha" => nil,
    "#{type}_checksum_mismatch" => false,
    "last_#{type}_verification_failure" => nil
  }

  update!(sync_succeeded.merge(verification_reset))
end
# Records that a sync for +type+ failed.
#
# Flags the type for resync, stores a failure description built from
# +message+ and +error.message+, and bumps the retry counter, together with
# any extra attributes supplied by the caller, in a single `update!`.
#
# type    - value interpolated into the column names.
# message - human-readable prefix for the stored failure text.
# error   - object responding to `message` (typically the rescued exception).
# attrs   - optional extra attributes to persist in the same update. The hash
#           is not modified; the failure columns are layered on a copy, so a
#           frozen or shared hash can be passed safely.
def fail_sync!(type, message, error, attrs = {})
  # `merge` keeps the caller's keys first and appends (or overrides) the
  # failure columns, the same ordering the in-place assignment produced.
  update!(
    attrs.merge(
      "resync_#{type}" => true,
      "last_#{type}_sync_failure" => "#{message}: #{error.message}",
      "#{type}_retry_count" => retry_count(type) + 1
    )
  )
end
# Handles a repository-created event by flagging the repository for resync,
# and flagging the wiki for resync only when the event's `wiki_path` is
# present.
#
# repository_created_event - object responding to `wiki_path`.
def repository_created!(repository_created_event)
  wiki_needs_sync = repository_created_event.wiki_path.present?

  update!(resync_repository: true, resync_wiki: wiki_needs_sync)
end
# Handles a repository-updated event: flags the event's source for resync,
# resets that source's verification columns, and records when the resync
# was scheduled.
#
# repository_updated_event - object responding to `source`; the value is
#                            interpolated into the column names.
# scheduled_at             - value stored in "resync_<source>_was_scheduled_at".
def repository_updated!(repository_updated_event, scheduled_at)
  source = repository_updated_event.source

  changes = {
    "resync_#{source}" => true,
    "#{source}_verification_checksum_sha" => nil,
    "#{source}_checksum_mismatch" => false,
    "last_#{source}_verification_failure" => nil,
    "resync_#{source}_was_scheduled_at" => scheduled_at
  }

  update!(changes)
end
def repository_sync_due?(scheduled_time) def repository_sync_due?(scheduled_time)
never_synced_repository? || repository_sync_needed?(scheduled_time) never_synced_repository? || repository_sync_needed?(scheduled_time)
end end
...@@ -78,6 +130,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -78,6 +130,12 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
Gitlab::Redis::SharedState.with { |redis| redis.set(fetches_since_gc_redis_key, value) } Gitlab::Redis::SharedState.with { |redis| redis.set(fetches_since_gc_redis_key, value) }
end end
# Tells whether the next sync of +type+ should be attempted as a regular
# retry. Returns false when the "force_to_redownload_<type>" flag is set;
# otherwise true while the retry count does not exceed
# RETRIES_BEFORE_REDOWNLOAD.
def should_be_retried?(type)
  forced_redownload = public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend

  !forced_redownload && retry_count(type) <= RETRIES_BEFORE_REDOWNLOAD
end
private private
def fetches_since_gc_redis_key def fetches_since_gc_redis_key
...@@ -105,4 +163,17 @@ class Geo::ProjectRegistry < Geo::BaseRegistry ...@@ -105,4 +163,17 @@ class Geo::ProjectRegistry < Geo::BaseRegistry
last_wiki_synced_at && timestamp > last_wiki_synced_at last_wiki_synced_at && timestamp > last_wiki_synced_at
end end
# Computes when the next retry may happen: now plus the delay for
# +retry_count+. To keep invalid dates out of the database, the result is
# capped at one week from now plus the delay for a single retry.
def next_retry_time(retry_count)
  backoff_candidate = Time.now + delay(retry_count).seconds
  ceiling = Time.now + 7.days + delay(1).seconds

  ceiling < backoff_candidate ? ceiling : backoff_candidate
end
# Reads the "<type>_retry_count" attribute, falling back to -1 when it is
# nil or false.
def retry_count(type)
  stored_count = public_send("#{type}_retry_count") # rubocop:disable GitlabSecurity/PublicSend

  stored_count || -1
end
end end
...@@ -16,7 +16,6 @@ module Geo ...@@ -16,7 +16,6 @@ module Geo
GEO_REMOTE_NAME = 'geo'.freeze GEO_REMOTE_NAME = 'geo'.freeze
LEASE_TIMEOUT = 8.hours.freeze LEASE_TIMEOUT = 8.hours.freeze
LEASE_KEY_PREFIX = 'geo_sync_service'.freeze LEASE_KEY_PREFIX = 'geo_sync_service'.freeze
RETRIES_BEFORE_REDOWNLOAD = 5
def initialize(project) def initialize(project)
@project = project @project = project
...@@ -26,7 +25,7 @@ module Geo ...@@ -26,7 +25,7 @@ module Geo
try_obtain_lease do try_obtain_lease do
log_info("Started #{type} sync") log_info("Started #{type} sync")
if should_be_retried? if registry.should_be_retried?(type)
sync_repository sync_repository
else else
sync_repository(true) sync_repository(true)
...@@ -50,7 +49,9 @@ module Geo ...@@ -50,7 +49,9 @@ module Geo
log_info("Trying to fetch #{type}") log_info("Trying to fetch #{type}")
clean_up_temporary_repository clean_up_temporary_repository
update_registry!(started_at: DateTime.now) log_info("Marking #{type} sync as started")
registry.start_sync!(type)
if redownload if redownload
redownload_repository redownload_repository
...@@ -84,16 +85,6 @@ module Geo ...@@ -84,16 +85,6 @@ module Geo
fetch_geo_mirror(temp_repo) fetch_geo_mirror(temp_repo)
end end
# Reads the registry's "<type>_retry_count" attribute, falling back to -1
# when it is nil or false.
def retry_count
  stored_count = registry.public_send("#{type}_retry_count") # rubocop:disable GitlabSecurity/PublicSend

  stored_count || -1
end
# Tells whether the sync should be attempted as a regular retry. Returns
# false when the registry's "force_to_redownload_<type>" flag is set;
# otherwise true while the retry count does not exceed
# RETRIES_BEFORE_REDOWNLOAD.
def should_be_retried?
  forced_redownload = registry.public_send("force_to_redownload_#{type}") # rubocop:disable GitlabSecurity/PublicSend

  !forced_redownload && retry_count <= RETRIES_BEFORE_REDOWNLOAD
end
def current_node def current_node
::Gitlab::Geo.current_node ::Gitlab::Geo.current_node
end end
...@@ -132,40 +123,10 @@ module Geo ...@@ -132,40 +123,10 @@ module Geo
@registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id) @registry ||= Geo::ProjectRegistry.find_or_initialize_by(project_id: project.id)
end end
# Persists sync bookkeeping on the registry.
#
# Does nothing (and logs nothing) unless +started_at+ or +finished_at+ is
# given. When +started_at+ is given, stores the attempt time, the bumped
# retry count and the next retry time. When +finished_at+ is given, stores
# the success time, clears the resync flag, retry bookkeeping and
# forced-redownload flag, and resets the verification columns.
#
# attrs - extra attributes to persist; the bookkeeping columns are written
#         into this same hash before it is passed to `registry.update!`.
def update_registry!(started_at: nil, finished_at: nil, attrs: {})
  return unless started_at || finished_at

  log_info("Updating #{type} sync information")

  if started_at
    attempt_number = retry_count + 1

    attrs.update(
      "last_#{type}_synced_at" => started_at,
      "#{type}_retry_count" => attempt_number,
      "#{type}_retry_at" => next_retry_time(attempt_number)
    )
  end

  if finished_at
    attrs.update(
      "last_#{type}_successful_sync_at" => finished_at,
      "resync_#{type}" => false,
      "#{type}_retry_count" => nil,
      "#{type}_retry_at" => nil,
      "force_to_redownload_#{type}" => false,
      # Indicate that repository verification needs to be done again
      "#{type}_verification_checksum_sha" => nil,
      "#{type}_checksum_mismatch" => false,
      "last_#{type}_verification_failure" => nil
    )
  end

  registry.update!(attrs)
end
def fail_registry!(message, error, attrs = {}) def fail_registry!(message, error, attrs = {})
log_error(message, error) log_error(message, error)
attrs["resync_#{type}"] = true registry.fail_sync!(type, message, error, attrs)
attrs["last_#{type}_sync_failure"] = "#{message}: #{error.message}"
attrs["#{type}_retry_count"] = retry_count + 1
registry.update!(attrs)
repository.clean_stale_repository_files repository.clean_stale_repository_files
end end
...@@ -259,14 +220,5 @@ module Geo ...@@ -259,14 +220,5 @@ module Geo
File.dirname(disk_path) File.dirname(disk_path)
) )
end end
# Computes when the next retry may happen: now plus the delay for
# +retry_count+. To keep invalid dates out of the database, the result is
# capped at one week from now plus the delay for a single retry.
def next_retry_time(retry_count)
  backoff_candidate = Time.now + delay(retry_count).seconds
  ceiling = Time.now + 7.days + delay(1).seconds

  ceiling < backoff_candidate ? ceiling : backoff_candidate
end
end end
end end
...@@ -33,7 +33,9 @@ module Geo ...@@ -33,7 +33,9 @@ module Geo
end end
def mark_sync_as_successful def mark_sync_as_successful
update_registry!(finished_at: DateTime.now, attrs: { last_repository_sync_failure: nil }) log_info("Marking #{type} sync as successful")
registry.finish_sync!(type)
log_info('Finished repository sync', log_info('Finished repository sync',
update_delay_s: update_delay_in_seconds, update_delay_s: update_delay_in_seconds,
......
...@@ -45,7 +45,9 @@ module Geo ...@@ -45,7 +45,9 @@ module Geo
end end
def mark_sync_as_successful def mark_sync_as_successful
update_registry!(finished_at: DateTime.now, attrs: { last_wiki_sync_failure: nil }) log_info("Marking #{type} sync as successful")
registry.finish_sync!(type)
log_info('Finished wiki sync', log_info('Finished wiki sync',
update_delay_s: update_delay_in_seconds, update_delay_s: update_delay_in_seconds,
......
...@@ -16,7 +16,7 @@ module Gitlab ...@@ -16,7 +16,7 @@ module Gitlab
attr_reader :event, :created_at, :logger attr_reader :event, :created_at, :logger
def registry def registry
@registry ||= find_or_initialize_registry @registry ||= ::Geo::ProjectRegistry.find_or_initialize_by(project_id: event.project_id)
end end
def skippable? def skippable?
...@@ -32,12 +32,6 @@ module Gitlab ...@@ -32,12 +32,6 @@ module Gitlab
def enqueue_job_if_shard_healthy(event) def enqueue_job_if_shard_healthy(event)
yield if healthy_shard_for?(event) yield if healthy_shard_for?(event)
end end
# Finds the project registry for the event's project, or initializes a new
# one, and assigns +attrs+ to it without saving.
#
# attrs - optional hash of attributes to assign. When omitted (nil), no
#         assignment is attempted, so the nil default is never handed to
#         `assign_attributes`.
#         NOTE(review): ActiveModel's `assign_attributes` is expected to
#         reject non-hash arguments — confirm for the Rails version in use.
#
# Returns the found or initialized registry.
def find_or_initialize_registry(attrs = nil)
  ::Geo::ProjectRegistry.find_or_initialize_by(project_id: event.project_id).tap do |registry|
    registry.assign_attributes(attrs) if attrs
  end
end
end end
end end
end end
......
...@@ -7,7 +7,7 @@ module Gitlab ...@@ -7,7 +7,7 @@ module Gitlab
def process def process
log_event log_event
registry.save! registry.repository_created!(event)
enqueue_job_if_shard_healthy(event) do enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now) ::Geo::ProjectSyncWorker.perform_async(event.project_id, Time.now)
...@@ -16,12 +16,6 @@ module Gitlab ...@@ -16,12 +16,6 @@ module Gitlab
private private
# Memoized registry for the event's project, initialized with the
# repository flagged for resync and the wiki flagged for resync only when
# the event's `wiki_path` is present.
def registry
  @registry ||= begin
    wiki_needs_sync = event.wiki_path.present?

    find_or_initialize_registry(resync_repository: true, resync_wiki: wiki_needs_sync)
  end
end
def log_event def log_event
logger.event_info( logger.event_info(
created_at, created_at,
......
...@@ -6,7 +6,7 @@ module Gitlab ...@@ -6,7 +6,7 @@ module Gitlab
include BaseEvent include BaseEvent
def process def process
registry.save! registry.repository_updated!(event, scheduled_at)
job_id = enqueue_job_if_shard_healthy(event) do job_id = enqueue_job_if_shard_healthy(event) do
::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at) ::Geo::ProjectSyncWorker.perform_async(event.project_id, scheduled_at)
...@@ -17,16 +17,6 @@ module Gitlab ...@@ -17,16 +17,6 @@ module Gitlab
private private
# Memoized registry for the event's project, initialized with the event's
# source flagged for resync, that source's verification columns reset, and
# the scheduled time recorded.
def registry
  @registry ||= begin
    source = event.source

    find_or_initialize_registry(
      "resync_#{source}" => true,
      "#{source}_verification_checksum_sha" => nil,
      "#{source}_checksum_mismatch" => false,
      "last_#{source}_verification_failure" => nil,
      "resync_#{source}_was_scheduled_at" => scheduled_at
    )
  end
end
def log_event(job_id) def log_event(job_id)
logger.event_info( logger.event_info(
created_at, created_at,
......
This diff is collapsed.
...@@ -248,7 +248,7 @@ describe Geo::RepositorySyncService do ...@@ -248,7 +248,7 @@ describe Geo::RepositorySyncService do
context 'retries' do context 'retries' do
it 'tries to fetch repo' do it 'tries to fetch repo' do
create(:geo_project_registry, project: project, repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD - 1) create(:geo_project_registry, project: project, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD - 1)
expect(subject).to receive(:sync_repository).with(no_args) expect(subject).to receive(:sync_repository).with(no_args)
...@@ -256,7 +256,7 @@ describe Geo::RepositorySyncService do ...@@ -256,7 +256,7 @@ describe Geo::RepositorySyncService do
end end
it 'sets the redownload flag to false after success' do it 'sets the redownload flag to false after success' do
registry = create(:geo_project_registry, project: project, repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD + 1, force_to_redownload_repository: true) registry = create(:geo_project_registry, project: project, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD + 1, force_to_redownload_repository: true)
subject.execute subject.execute
...@@ -264,7 +264,7 @@ describe Geo::RepositorySyncService do ...@@ -264,7 +264,7 @@ describe Geo::RepositorySyncService do
end end
it 'tries to redownload repo' do it 'tries to redownload repo' do
create(:geo_project_registry, project: project, repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD + 1) create(:geo_project_registry, project: project, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD + 1)
expect(subject).to receive(:sync_repository).with(true).and_call_original expect(subject).to receive(:sync_repository).with(true).and_call_original
expect(subject.gitlab_shell).to receive(:mv_repository).exactly(2).times.and_call_original expect(subject.gitlab_shell).to receive(:mv_repository).exactly(2).times.and_call_original
...@@ -290,7 +290,7 @@ describe Geo::RepositorySyncService do ...@@ -290,7 +290,7 @@ describe Geo::RepositorySyncService do
create( create(
:geo_project_registry, :geo_project_registry,
project: project, project: project,
repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD - 1, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD - 1,
force_to_redownload_repository: true force_to_redownload_repository: true
) )
...@@ -303,7 +303,7 @@ describe Geo::RepositorySyncService do ...@@ -303,7 +303,7 @@ describe Geo::RepositorySyncService do
create( create(
:geo_project_registry, :geo_project_registry,
project: project, project: project,
repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD - 1, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD - 1,
force_to_redownload_repository: true force_to_redownload_repository: true
) )
...@@ -319,7 +319,7 @@ describe Geo::RepositorySyncService do ...@@ -319,7 +319,7 @@ describe Geo::RepositorySyncService do
registry = create( registry = create(
:geo_project_registry, :geo_project_registry,
project: project, project: project,
repository_retry_count: Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD + 2000, repository_retry_count: Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD + 2000,
repository_retry_at: timestamp, repository_retry_at: timestamp,
force_to_redownload_repository: true force_to_redownload_repository: true
) )
......
...@@ -62,8 +62,9 @@ shared_examples 'geo base sync fetch and repack' do ...@@ -62,8 +62,9 @@ shared_examples 'geo base sync fetch and repack' do
fetch_repository fetch_repository
end end
it 'updates registry' do it 'tells registry that sync will start now' do
is_expected.to receive(:update_registry!) registry = subject.send(:registry)
expect(registry).to receive(:start_sync!)
fetch_repository fetch_repository
end end
...@@ -95,7 +96,7 @@ shared_examples 'geo base sync fetch and repack' do ...@@ -95,7 +96,7 @@ shared_examples 'geo base sync fetch and repack' do
end end
shared_examples 'sync retries use the snapshot RPC' do shared_examples 'sync retries use the snapshot RPC' do
let(:retry_count) { Geo::BaseSyncService::RETRIES_BEFORE_REDOWNLOAD } let(:retry_count) { Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD }
context 'snapshot synchronization method' do context 'snapshot synchronization method' do
before do before do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment