Commit 14870d61 authored by Valery Sizov's avatar Valery Sizov Committed by Douglas Barbosa Alexandre

Create Design Registry Sync Service

We create a separate registry for designs despite the fact that this
is basically the same as wiki or regular repository. We aim to make them
separate so this can be considered as a first step
parent 108fdade
...@@ -20,6 +20,7 @@ ActiveSupport::Inflector.inflections do |inflect| ...@@ -20,6 +20,7 @@ ActiveSupport::Inflector.inflections do |inflect|
file_registry file_registry
job_artifact_registry job_artifact_registry
container_repository_registry container_repository_registry
design_registry
vulnerability_feedback vulnerability_feedback
vulnerabilities_feedback vulnerabilities_feedback
group_view group_view
......
# frozen_string_literal: true
class Geo::DesignRegistry < Geo::BaseRegistry
include ::Delay
RETRIES_BEFORE_REDOWNLOAD = 5
belongs_to :project
scope :failed, -> { with_state(:failed) }
scope :synced, -> { with_state(:synced) }
scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.now))) }
state_machine :state, initial: :pending do
state :started
state :synced
state :failed
state :pending
before_transition any => :started do |registry, _|
registry.last_synced_at = Time.now
end
before_transition any => :pending do |registry, _|
registry.retry_at = 0
registry.retry_count = 0
end
event :start_sync! do
transition [:synced, :failed, :pending] => :started
end
event :repository_updated! do
transition [:synced, :failed, :started] => :pending
end
end
def fail_sync!(message, error, attrs = {})
new_retry_count = retry_count + 1
attrs[:state] = :failed
attrs[:last_sync_failure] = "#{message}: #{error.message}"
attrs[:retry_count] = new_retry_count
attrs[:retry_at] = next_retry_time(new_retry_count)
update!(attrs)
end
# TODO: This method has to use optimistic locking to update state
def finish_sync!(missing_on_primary = false)
update!(
state: :synced,
missing_on_primary: missing_on_primary,
retry_count: 0,
last_sync_failure: nil,
retry_at: nil,
force_to_redownload: false
)
end
def should_be_redownloaded?
return true if force_to_redownload
retry_count > RETRIES_BEFORE_REDOWNLOAD
end
end
# frozen_string_literal: true
module Geo
class DesignRepositorySyncService < RepositoryBaseSyncService
self.type = :design
private
def sync_repository
return if Feature.disabled?(:enable_geo_design_sync)
start_registry_sync!
fetch_repository
mark_sync_as_successful
rescue Gitlab::Shell::Error, Gitlab::Git::BaseError => e
# In some cases repository does not exist, the only way to know about this is to parse the error text.
# If it does not exist we should consider it as successfully downloaded.
if e.message.include? Gitlab::GitAccess::ERROR_MESSAGES[:no_repo]
log_info('Design repository is not found, marking it as successfully synced')
mark_sync_as_successful(missing_on_primary: true)
else
fail_registry_sync!('Error syncing design repository', e)
end
rescue Gitlab::Git::Repository::NoRepository => e
log_info('Marking the design repository for a forced re-download')
fail_registry_sync!('Invalid design repository', e, force_to_redownload: true)
ensure
expire_repository_caches
end
def repository
project.design_repository
end
def ensure_repository
repository.create_if_not_exists
end
def expire_repository_caches
log_info('Expiring caches for design repository')
repository.after_sync
end
def fail_registry_sync!(message, error, attrs = {})
log_error(message, error)
registry.fail_sync!(message, error, attrs)
repository.clean_stale_repository_files
end
def start_registry_sync!
log_info("Marking design sync as started")
registry.start_sync!
end
def mark_sync_as_successful(missing_on_primary: false)
log_info("Marking design sync as successful")
registry.finish_sync!(missing_on_primary)
log_info("Finished design sync", download_time_s: download_time_in_seconds)
end
# rubocop: disable CodeReuse/ActiveRecord
def registry
@registry ||= Geo::DesignRegistry.find_or_initialize_by(project_id: project.id)
end
# rubocop: enable CodeReuse/ActiveRecord
def download_time_in_seconds
(Time.now.to_f - registry.last_synced_at.to_f).round(3)
end
def redownload?
registry.should_be_redownloaded?
end
end
end
# frozen_string_literal: true
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class AddDesignRegistry < ActiveRecord::Migration[5.2]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
create_table :design_registry, id: :serial, force: :cascade do |t|
t.integer :project_id, null: false
t.string :state, limit: 20
t.integer :retry_count, default: 0
t.string :last_sync_failure # rubocop:disable Migration/AddLimitToStringColumns
t.boolean :force_to_redownload
t.boolean :missing_on_primary
t.datetime :retry_at
t.datetime :last_synced_at
t.datetime :created_at, null: false
t.index :project_id, name: :index_design_registry_on_project_id, using: :btree
t.index :retry_at, name: :index_design_registry_on_retry_at, using: :btree
t.index :state, name: :index_design_registry_on_state, using: :btree
end
end
end
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2019_08_02_200655) do ActiveRecord::Schema.define(version: 2019_09_23_111102) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
...@@ -28,6 +28,21 @@ ActiveRecord::Schema.define(version: 2019_08_02_200655) do ...@@ -28,6 +28,21 @@ ActiveRecord::Schema.define(version: 2019_08_02_200655) do
t.index ["state"], name: "index_container_repository_registry_on_state" t.index ["state"], name: "index_container_repository_registry_on_state"
end end
create_table "design_registry", id: :serial, force: :cascade do |t|
t.integer "project_id", null: false
t.string "state", limit: 20
t.integer "retry_count", default: 0
t.string "last_sync_failure"
t.boolean "force_to_redownload"
t.boolean "missing_on_primary"
t.datetime "retry_at"
t.datetime "last_synced_at"
t.datetime "created_at", null: false
t.index ["project_id"], name: "index_design_registry_on_project_id"
t.index ["retry_at"], name: "index_design_registry_on_retry_at"
t.index ["state"], name: "index_design_registry_on_state"
end
create_table "event_log_states", primary_key: "event_id", force: :cascade do |t| create_table "event_log_states", primary_key: "event_id", force: :cascade do |t|
t.datetime "created_at", null: false t.datetime "created_at", null: false
end end
......
...@@ -7,10 +7,6 @@ FactoryBot.define do ...@@ -7,10 +7,6 @@ FactoryBot.define do
last_synced_at { nil } last_synced_at { nil }
state { :pending } state { :pending }
trait :started do
state { :started }
end
trait :synced do trait :synced do
state { :synced } state { :synced }
last_synced_at { 5.days.ago } last_synced_at { 5.days.ago }
......
# frozen_string_literal: true
FactoryBot.define do
factory :geo_design_registry, class: Geo::DesignRegistry do
project
last_sync_failure { nil }
last_synced_at { nil }
state { :pending }
trait :synced do
state { :synced }
last_synced_at { 5.days.ago }
end
trait :sync_failed do
state { :failed }
last_synced_at { 1.day.ago }
retry_count { 2 }
last_sync_failure { 'Random error' }
end
trait :sync_started do
state { :started }
last_synced_at { 1.day.ago }
retry_count { 0 }
end
end
end
...@@ -25,11 +25,8 @@ describe Geo::ContainerRepositoryRegistry, :geo do ...@@ -25,11 +25,8 @@ describe Geo::ContainerRepositoryRegistry, :geo do
end end
end end
describe '#start_sync!' do it_behaves_like 'a Geo registry' do
it 'updates last_synced_at' do let(:registry) { create(:container_repository_registry) }
expect { container_repository_registry.start_sync! }
.to change { container_repository_registry.reload.last_synced_at }
end
end end
describe '#finish_sync!' do describe '#finish_sync!' do
...@@ -46,29 +43,4 @@ describe Geo::ContainerRepositoryRegistry, :geo do ...@@ -46,29 +43,4 @@ describe Geo::ContainerRepositoryRegistry, :geo do
) )
end end
end end
describe '#fail_sync!' do
it 'fails registry record' do
error = StandardError.new('Something is wrong')
container_repository_registry.fail_sync!('Failed', error)
expect(container_repository_registry).to have_attributes(
retry_count: 1,
retry_at: be_present,
last_sync_failure: 'Failed: Something is wrong',
state: 'failed'
)
end
end
describe '#repository_updated!' do
set(:container_repository_registry) { create(:container_repository_registry, :synced) }
it 'resets the state of the sync' do
container_repository_registry.repository_updated!
expect(container_repository_registry.pending?).to be true
end
end
end end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::DesignRegistry, :geo do
set(:design_registry) { create(:geo_design_registry) }
describe 'relationships' do
it { is_expected.to belong_to(:project) }
end
it_behaves_like 'a Geo registry' do
let(:registry) { create(:geo_design_registry) }
end
describe '#finish_sync!' do
it 'finishes registry record' do
design_registry = create(:geo_design_registry, :sync_started)
design_registry.finish_sync!
expect(design_registry.reload).to have_attributes(
retry_count: 0,
retry_at: nil,
last_sync_failure: nil,
state: 'synced',
missing_on_primary: false,
force_to_redownload: false
)
end
end
describe '#should_be_redownloaded?' do
context 'when force_to_redownload is false' do
it 'returns false' do
expect(design_registry.should_be_redownloaded?).to be false
end
it 'returns true when limit is exceeded' do
design_registry.retry_count = Geo::DesignRegistry::RETRIES_BEFORE_REDOWNLOAD + 1
expect(design_registry.should_be_redownloaded?).to be true
end
end
context 'when force_to_redownload is true' do
it 'resets the state of the sync' do
design_registry.force_to_redownload = true
expect(design_registry.should_be_redownloaded?).to be true
end
end
end
end
...@@ -13,7 +13,7 @@ describe Geo::ContainerRepositorySyncService, :geo do ...@@ -13,7 +13,7 @@ describe Geo::ContainerRepositorySyncService, :geo do
end end
describe '#execute' do describe '#execute' do
let(:container_repository_registry) { create(:container_repository_registry, :started) } let(:container_repository_registry) { create(:container_repository_registry, :sync_started) }
it 'fails registry record if there was exception' do it 'fails registry record if there was exception' do
allow_any_instance_of(Geo::ContainerRepositorySync) allow_any_instance_of(Geo::ContainerRepositorySync)
......
# frozen_string_literal: true
require 'spec_helper'
describe Geo::DesignRepositorySyncService do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
set(:project) { create(:project_empty_repo) }
let(:repository) { project.design_repository }
let(:lease_key) { "geo_sync_service:design:#{project.id}" }
let(:lease_uuid) { 'uuid'}
subject { described_class.new(project) }
before do
stub_current_geo_node(secondary)
end
it_behaves_like 'geo base sync execution'
it_behaves_like 'geo base sync fetch'
describe '#execute' do
let(:url_to_repo) { "#{primary.url}#{project.full_path}.design.git" }
before do
stub_exclusive_lease(lease_key, lease_uuid)
stub_exclusive_lease("geo_project_housekeeping:#{project.id}")
allow_any_instance_of(Repository).to receive(:fetch_as_mirror)
.and_return(true)
allow_any_instance_of(Repository)
.to receive(:find_remote_root_ref)
.with('geo')
.and_return('master')
allow_any_instance_of(Geo::ProjectHousekeepingService).to receive(:execute)
.and_return(nil)
end
include_context 'lease handling'
it 'fetches project repository with JWT credentials' do
expect(repository).to receive(:with_config)
.with("http.#{url_to_repo}.extraHeader" => anything)
.once
.and_call_original
expect(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.once
subject.execute
end
it 'expires repository caches' do
expect_any_instance_of(Repository).to receive(:expire_all_method_caches).once
expect_any_instance_of(Repository).to receive(:expire_branch_cache).once
expect_any_instance_of(Repository).to receive(:expire_content_cache).once
subject.execute
end
it 'voids the failure message when it succeeds after an error' do
registry = create(:geo_design_registry, project: project, last_sync_failure: 'error')
expect { subject.execute }.to change { registry.reload.last_sync_failure}.to(nil)
end
it 'rescues when Gitlab::Shell::Error is raised' do
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Shell::Error)
expect { subject.execute }.not_to raise_error
end
it 'rescues exception when Gitlab::Git::Repository::NoRepository is raised' do
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Git::Repository::NoRepository)
expect { subject.execute }.not_to raise_error
end
it 'increases retry count when Gitlab::Git::Repository::NoRepository is raised' do
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Git::Repository::NoRepository)
subject.execute
expect(Geo::DesignRegistry.last).to have_attributes(
retry_count: 1
)
end
it 'marks sync as successful if no repository found' do
registry = create(:geo_design_registry, project: project)
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Shell::Error.new(Gitlab::GitAccess::ERROR_MESSAGES[:no_repo]))
subject.execute
expect(registry.reload).to have_attributes(
state: 'synced',
missing_on_primary: true
)
end
it 'marks resync as true after a failure' do
described_class.new(project).execute
expect(Geo::DesignRegistry.last.state).to eq 'synced'
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Git::Repository::NoRepository)
subject.execute
expect(Geo::DesignRegistry.last.state).to eq 'failed'
end
it_behaves_like 'sync retries use the snapshot RPC' do
let(:repository) { project.design_repository }
let(:retry_count) { Geo::DesignRegistry::RETRIES_BEFORE_REDOWNLOAD }
def registry_with_retry_count(retries)
create(:geo_design_registry, project: project, retry_count: retries)
end
end
end
end
...@@ -43,6 +43,8 @@ describe Geo::RepositorySyncService do ...@@ -43,6 +43,8 @@ describe Geo::RepositorySyncService do
.and_return(nil) .and_return(nil)
end end
include_context 'lease handling'
it 'fetches project repository with JWT credentials' do it 'fetches project repository with JWT credentials' do
expect(repository).to receive(:with_config) expect(repository).to receive(:with_config)
.with("http.#{url_to_repo}.extraHeader" => anything) .with("http.#{url_to_repo}.extraHeader" => anything)
...@@ -64,36 +66,12 @@ describe Geo::RepositorySyncService do ...@@ -64,36 +66,12 @@ describe Geo::RepositorySyncService do
subject.execute subject.execute
end end
it 'returns the lease when succeed' do
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'voids the failure message when it succeeds after an error' do it 'voids the failure message when it succeeds after an error' do
registry = create(:geo_project_registry, project: project, last_repository_sync_failure: 'error') registry = create(:geo_project_registry, project: project, last_repository_sync_failure: 'error')
expect { subject.execute }.to change { registry.reload.last_repository_sync_failure}.to(nil) expect { subject.execute }.to change { registry.reload.last_repository_sync_failure}.to(nil)
end end
it 'returns the lease when sync fail' do
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Shell::Error)
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'does not fetch project repository if cannot obtain a lease' do
stub_exclusive_lease_taken(lease_key)
expect(repository).not_to receive(:fetch_as_mirror)
subject.execute
end
it 'rescues when Gitlab::Shell::Error is raised' do it 'rescues when Gitlab::Shell::Error is raised' do
allow(repository).to receive(:fetch_as_mirror) allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true) .with(url_to_repo, remote_name: 'geo', forced: true)
...@@ -425,6 +403,11 @@ describe Geo::RepositorySyncService do ...@@ -425,6 +403,11 @@ describe Geo::RepositorySyncService do
it_behaves_like 'sync retries use the snapshot RPC' do it_behaves_like 'sync retries use the snapshot RPC' do
let(:repository) { project.repository } let(:repository) { project.repository }
let(:retry_count) { Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD }
def registry_with_retry_count(retries)
create(:geo_project_registry, project: project, repository_retry_count: retries, wiki_retry_count: retries)
end
end end
end end
......
...@@ -34,6 +34,8 @@ RSpec.describe Geo::WikiSyncService do ...@@ -34,6 +34,8 @@ RSpec.describe Geo::WikiSyncService do
.and_return(true) .and_return(true)
end end
include_context 'lease handling'
it 'fetches wiki repository with JWT credentials' do it 'fetches wiki repository with JWT credentials' do
expect(repository).to receive(:with_config).with("http.#{url_to_repo}.extraHeader" => anything).and_call_original expect(repository).to receive(:with_config).with("http.#{url_to_repo}.extraHeader" => anything).and_call_original
expect(repository).to receive(:fetch_as_mirror) expect(repository).to receive(:fetch_as_mirror)
...@@ -43,26 +45,12 @@ RSpec.describe Geo::WikiSyncService do ...@@ -43,26 +45,12 @@ RSpec.describe Geo::WikiSyncService do
subject.execute subject.execute
end end
it 'releases lease' do
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'voids the failure message when it succeeds after an error' do it 'voids the failure message when it succeeds after an error' do
registry = create(:geo_project_registry, project: project, last_wiki_sync_failure: 'error') registry = create(:geo_project_registry, project: project, last_wiki_sync_failure: 'error')
expect { subject.execute }.to change { registry.reload.last_wiki_sync_failure }.to(nil) expect { subject.execute }.to change { registry.reload.last_wiki_sync_failure }.to(nil)
end end
it 'does not fetch wiki repository if cannot obtain a lease' do
stub_exclusive_lease_taken(lease_key)
expect(repository).not_to receive(:fetch_as_mirror)
subject.execute
end
it 'rescues exception when Gitlab::Shell::Error is raised' do it 'rescues exception when Gitlab::Shell::Error is raised' do
allow(repository).to receive(:fetch_as_mirror) allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true) .with(url_to_repo, remote_name: 'geo', forced: true)
...@@ -241,6 +229,11 @@ RSpec.describe Geo::WikiSyncService do ...@@ -241,6 +229,11 @@ RSpec.describe Geo::WikiSyncService do
it_behaves_like 'sync retries use the snapshot RPC' do it_behaves_like 'sync retries use the snapshot RPC' do
let(:repository) { project.wiki.repository } let(:repository) { project.wiki.repository }
let(:retry_count) { Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD }
def registry_with_retry_count(retries)
create(:geo_project_registry, project: project, repository_retry_count: retries, wiki_retry_count: retries)
end
end end
end end
end end
...@@ -94,8 +94,6 @@ shared_examples 'geo base sync fetch' do ...@@ -94,8 +94,6 @@ shared_examples 'geo base sync fetch' do
end end
shared_examples 'sync retries use the snapshot RPC' do shared_examples 'sync retries use the snapshot RPC' do
let(:retry_count) { Geo::ProjectRegistry::RETRIES_BEFORE_REDOWNLOAD }
context 'snapshot synchronization method' do context 'snapshot synchronization method' do
before do before do
allow(subject).to receive(:temp_repo) { repository } allow(subject).to receive(:temp_repo) { repository }
...@@ -113,7 +111,7 @@ shared_examples 'sync retries use the snapshot RPC' do ...@@ -113,7 +111,7 @@ shared_examples 'sync retries use the snapshot RPC' do
end end
it 'does not attempt to snapshot for ordinary retries' do it 'does not attempt to snapshot for ordinary retries' do
create(:geo_project_registry, project: project, repository_retry_count: retry_count - 1, wiki_retry_count: retry_count - 1) registry_with_retry_count(retry_count - 1)
expect(repository).not_to receive_create_from_snapshot expect(repository).not_to receive_create_from_snapshot
expect(subject).to receive(:fetch_geo_mirror).with(repository) expect(subject).to receive(:fetch_geo_mirror).with(repository)
...@@ -122,7 +120,7 @@ shared_examples 'sync retries use the snapshot RPC' do ...@@ -122,7 +120,7 @@ shared_examples 'sync retries use the snapshot RPC' do
end end
context 'registry is ready to be snapshotted' do context 'registry is ready to be snapshotted' do
let!(:registry) { create(:geo_project_registry, project: project, repository_retry_count: retry_count + 1, wiki_retry_count: retry_count + 1) } let!(:registry) { registry_with_retry_count(retry_count + 1) }
it 'attempts to snapshot' do it 'attempts to snapshot' do
expect(repository).to receive_create_from_snapshot expect(repository).to receive_create_from_snapshot
...@@ -156,3 +154,29 @@ shared_examples 'reschedules sync due to race condition instead of waiting for b ...@@ -156,3 +154,29 @@ shared_examples 'reschedules sync due to race condition instead of waiting for b
end end
end end
end end
shared_context 'lease handling' do
it 'returns the lease when succeed' do
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'returns the lease when sync fail' do
allow(repository).to receive(:fetch_as_mirror)
.with(url_to_repo, remote_name: 'geo', forced: true)
.and_raise(Gitlab::Shell::Error)
expect_to_cancel_exclusive_lease(lease_key, lease_uuid)
subject.execute
end
it 'does not fetch project repository if cannot obtain a lease' do
stub_exclusive_lease_taken(lease_key)
expect(repository).not_to receive(:fetch_as_mirror)
subject.execute
end
end
# frozen_string_literal: true
shared_examples_for 'a Geo registry' do
describe '#start_sync!' do
it 'updates last_synced_at' do
expect { registry.start_sync! }
.to change { registry.reload.last_synced_at }
end
end
describe '#fail_sync!' do
it 'fails registry record' do
error = StandardError.new('Something is wrong')
registry.fail_sync!('Failed', error)
expect(registry).to have_attributes(
retry_count: 1,
retry_at: be_present,
last_sync_failure: 'Failed: Something is wrong',
state: 'failed'
)
end
end
describe '#repository_updated!' do
it 'resets the state of the sync' do
registry.state = :synced
registry.repository_updated!
expect(registry.pending?).to be true
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment