Commit d683b264 authored by Stan Hu's avatar Stan Hu Committed by Nick Thomas

Geo: Increase parallelism by scheduling project repositories by shard

parent 838c8682
...@@ -69,6 +69,9 @@ module Geo ...@@ -69,6 +69,9 @@ module Geo
private private
def worker_metadata
end
def db_retrieve_batch_size def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE DB_RETRIEVE_BATCH_SIZE
end end
...@@ -119,7 +122,8 @@ module Geo ...@@ -119,7 +122,8 @@ module Geo
end end
def schedule_jobs def schedule_jobs
num_to_schedule = [max_capacity - scheduled_job_ids.size, pending_resources.size].min capacity = max_capacity
num_to_schedule = [capacity - scheduled_job_ids.size, pending_resources.size].min
to_schedule = pending_resources.shift(num_to_schedule) to_schedule = pending_resources.shift(num_to_schedule)
scheduled = to_schedule.map do |args| scheduled = to_schedule.map do |args|
...@@ -129,7 +133,7 @@ module Geo ...@@ -129,7 +133,7 @@ module Geo
scheduled_jobs.concat(scheduled) scheduled_jobs.concat(scheduled)
log_info("Loop #{loops}", enqueued: scheduled.length, pending: pending_resources.length, scheduled: scheduled_jobs.length) log_info("Loop #{loops}", enqueued: scheduled.length, pending: pending_resources.length, scheduled: scheduled_jobs.length, capacity: capacity)
end end
def scheduled_job_ids def scheduled_job_ids
...@@ -152,11 +156,13 @@ module Geo ...@@ -152,11 +156,13 @@ module Geo
def log_info(message, extra_args = {}) def log_info(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args) args = { class: self.class.name, message: message }.merge(extra_args)
args.merge!(worker_metadata) if worker_metadata
Gitlab::Geo::Logger.info(args) Gitlab::Geo::Logger.info(args)
end end
def log_error(message, extra_args = {}) def log_error(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args) args = { class: self.class.name, message: message }.merge(extra_args)
args.merge!(worker_metadata) if worker_metadata
Gitlab::Geo::Logger.error(args) Gitlab::Geo::Logger.error(args)
end end
end end
......
module Geo
class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker
# We may have many long-running threads, so split them out
# into their own queue to make it possible for other jobs to run.
sidekiq_options queue: :geo_repository_shard_sync, retry: false
attr_accessor :shard_name
def perform(shard_name)
@shard_name = shard_name
return unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name)
super()
end
private
def worker_metadata
{ shard: shard_name }
end
# We need a custom key here since we are running one worker per shard
def lease_key
@lease_key ||= "#{self.class.name.underscore}:shard:#{shard_name}"
end
def max_capacity
healthy_count = Gitlab::Geo::ShardHealthCache.healthy_shard_count
# If we don't have a count, that means that for some reason
# RepositorySyncWorker stopped running/updating the cache. We might
# be trying to shut down Geo while this job may still be running.
return 0 unless healthy_count.to_i > 0
capacity_per_shard = current_node.repos_max_capacity / healthy_count
[1, capacity_per_shard.to_i].max
end
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, Time.now)
{ id: project_id, job_id: job_id } if job_id
end
def finder
@finder ||= ProjectRegistryFinder.new(current_node: current_node)
end
def load_pending_resources
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end
end
def find_project_ids_not_synced(batch_size:)
shard_restriction(finder.find_unsynced_projects(batch_size: batch_size))
.reorder(last_repository_updated_at: :desc)
.pluck(:id)
end
def find_project_ids_updated_recently(batch_size:)
shard_restriction(finder.find_projects_updated_recently(batch_size: batch_size))
.order(Gitlab::Database.nulls_first_order(:last_repository_updated_at, :desc))
.pluck(:id)
end
def shard_restriction(relation)
relation.where(repository_storage: shard_name)
end
end
end
module Geo module Geo
class RepositorySyncWorker < Geo::BaseSchedulerWorker class RepositorySyncWorker
private include ApplicationWorker
include CronjobQueue
def max_capacity def perform
current_node.repos_max_capacity return unless Gitlab::Geo.geo_database_configured?
end return unless Gitlab::Geo.secondary?
def schedule_job(project_id)
job_id = Geo::ProjectSyncWorker.perform_async(project_id, Time.now)
{ id: project_id, job_id: job_id } if job_id shards = healthy_shards
end
def finder Gitlab::Geo::ShardHealthCache.update(shards)
@finder ||= ProjectRegistryFinder.new(current_node: current_node)
end
def load_pending_resources shards.each do |shard_name|
resources = find_project_ids_not_synced(batch_size: db_retrieve_batch_size) RepositoryShardSyncWorker.perform_async(shard_name)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_project_ids_updated_recently(batch_size: remaining_capacity)
end end
end end
def find_project_ids_not_synced(batch_size:)
healthy_shards_restriction(finder.find_unsynced_projects(batch_size: batch_size))
.reorder(last_repository_updated_at: :desc)
.pluck(:id)
end
def find_project_ids_updated_recently(batch_size:)
healthy_shards_restriction(finder.find_projects_updated_recently(batch_size: batch_size))
.order(Gitlab::Database.nulls_first_order(:last_repository_updated_at, :desc))
.pluck(:id)
end
def healthy_shards_restriction(relation)
configured = Gitlab.config.repositories.storages.keys
referenced = Project.distinct(:repository_storage).pluck(:repository_storage)
healthy = healthy_shards
known = configured | referenced
return relation if (known - healthy).empty?
relation.where(repository_storage: healthy)
end
def healthy_shards def healthy_shards
Gitlab::HealthChecks::FsShardsCheck Gitlab::HealthChecks::FsShardsCheck
.readiness .readiness
......
---
title: 'Geo: Increase parallelism by scheduling project repositories by shard'
merge_request: 3606
author:
type: added
...@@ -77,6 +77,7 @@ ...@@ -77,6 +77,7 @@
- [admin_emails, 1] - [admin_emails, 1]
- [geo_project_sync, 1] - [geo_project_sync, 1]
- [geo_file_download, 1] - [geo_file_download, 1]
- [geo_repository_shard_sync, 1]
- [elastic_batch_project_indexer, 1] - [elastic_batch_project_indexer, 1]
- [elastic_indexer, 1] - [elastic_indexer, 1]
- [elastic_commit_indexer, 1] - [elastic_commit_indexer, 1]
......
module Gitlab
module Geo
class ShardHealthCache
HEALTHY_SHARDS_KEY = 'gitlab-geo-healthy-shards'.freeze
HEALTHY_SHARDS_TIMEOUT = 300
# Clears the Redis set storing the list of healthy shards
def self.clear
Gitlab::Redis::Cache.with { |redis| redis.del(HEALTHY_SHARDS_KEY) }
end
# Updates the list of healthy shards using a Redis set
#
# shards - An array of shard names to store
def self.update(shards)
Gitlab::Redis::Cache.with do |redis|
redis.multi do |m|
m.del(HEALTHY_SHARDS_KEY)
shards.each { |shard_name| m.sadd(HEALTHY_SHARDS_KEY, shard_name) }
m.expire(HEALTHY_SHARDS_KEY, HEALTHY_SHARDS_TIMEOUT)
end
end
end
# Returns an array of strings of healthy shards
def self.cached_healthy_shards
Gitlab::Redis::Cache.with { |redis| redis.smembers(HEALTHY_SHARDS_KEY) }
end
# Checks whether the given shard name is in the list of healthy shards.
#
# shard_name - The string to check
def self.healthy_shard?(shard_name)
Gitlab::Redis::Cache.with { |redis| redis.sismember(HEALTHY_SHARDS_KEY, shard_name) }
end
# Returns the number of healthy shards in the Redis set
def self.healthy_shard_count
Gitlab::Redis::Cache.with { |redis| redis.scard(HEALTHY_SHARDS_KEY) }
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::ShardHealthCache, :clean_gitlab_redis_cache do
let(:shards) { %w(foo bar) }
before do
described_class.update(shards)
end
describe '.clear' do
it 'leaves no shards around' do
described_class.clear
expect(described_class.healthy_shard_count).to eq(0)
end
end
describe '.update' do
it 'returns the healthy shards' do
expect(described_class.cached_healthy_shards).to match_array(shards)
end
it 'replaces the existing set' do
new_set = %w(test me more)
described_class.update(new_set)
expect(described_class.cached_healthy_shards).to match_array(new_set)
end
end
describe '.healthy_shard_count' do
it 'returns the healthy shard count' do
expect(described_class.healthy_shard_count).to eq(2)
end
it 'returns 0 if no shards are available' do
described_class.update([])
expect(described_class.healthy_shard_count).to eq(0)
end
end
describe '.healthy_shard?' do
it 'returns true for a healthy shard' do
expect(described_class.healthy_shard?('foo')).to be_truthy
end
it 'returns false for an unknown shard' do
expect(described_class.healthy_shard?('unknown')).to be_falsey
end
end
end
require 'spec_helper'
# Disable transactions via :truncate method because a foreign table
# can't see changes inside a transaction of a different connection.
describe Geo::RepositoryShardSyncWorker, :geo, :truncate, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers
let!(:primary) { create(:geo_node, :primary) }
let!(:secondary) { create(:geo_node) }
let!(:synced_group) { create(:group) }
let!(:project_in_synced_group) { create(:project, group: synced_group) }
let!(:unsynced_project) { create(:project) }
let(:shard_name) { Gitlab.config.repositories.storages.keys.first }
subject { described_class.new }
before do
stub_current_geo_node(secondary)
end
shared_examples '#perform' do |skip_tests|
before do
skip('FDW is not configured') if skip_tests
end
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
Gitlab::Geo::ShardHealthCache.update([shard_name])
end
it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: project_in_synced_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when shard becomes unhealthy' do
Gitlab::Geo::ShardHealthCache.update([])
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: project_in_synced_group)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when no geo database is configured' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
it 'does not perform Geo::ProjectSyncWorker when not running on a secondary' do
allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker when node is disabled' do
allow_any_instance_of(GeoNode).to receive(:enabled?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform(shard_name)
end
context 'multiple shards' do
it 'uses two loops to schedule jobs' do
expect(subject).to receive(:schedule_jobs).twice.and_call_original
Gitlab::Geo::ShardHealthCache.update([shard_name, 'shard2', 'shard3', 'shard4', 'shard5'])
secondary.update!(repos_max_capacity: 5)
subject.perform(shard_name)
end
end
context 'when node has namespace restrictions' do
before do
secondary.update_attribute(:namespaces, [synced_group])
end
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project_in_synced_group.id, within(1.minute).of(Time.now))
.once
.and_return(spy)
subject.perform(shard_name)
end
it 'does not perform Geo::ProjectSyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
create(:geo_project_registry, :synced, :repository_dirty, project: project_in_synced_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project_in_synced_group.id, within(1.minute).of(Time.now))
.once
.and_return(spy)
subject.perform(shard_name)
end
end
context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
before do
# Neither of these are needed for this spec
unsynced_project.destroy
project_in_synced_group.destroy
allow_any_instance_of(described_class).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_any_instance_of(Project).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
allow_any_instance_of(Geo::ProjectRegistry).to receive(:wiki_sync_due?).and_return(false)
allow_any_instance_of(Geo::RepositorySyncService).to receive(:expire_repository_caches)
end
it 'tries to sync every project' do
project_list.each do |project|
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.at_least(:once)
.and_call_original
end
3.times do
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end
context 'additional shards' do
it 'skips backfill for projects on unhealthy shards' do
missing_not_synced = create(:project, group: synced_group)
missing_not_synced.update_column(:repository_storage, 'unknown')
missing_dirty = create(:project, group: synced_group)
missing_dirty.update_column(:repository_storage, 'unknown')
create(:geo_project_registry, :synced, :repository_dirty, project: missing_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_in_synced_group.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end
describe 'when PostgreSQL FDW is available', :geo do
# Skip if FDW isn't activated on this database
it_behaves_like '#perform', Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
end
describe 'when PostgreSQL FDW is not enabled', :geo do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
it_behaves_like '#perform', false
end
end
require 'spec_helper' require 'spec_helper'
# Disable transactions via :truncate method because a foreign table describe Geo::RepositorySyncWorker, :geo, :clean_gitlab_redis_cache do
# can't see changes inside a transaction of a different connection.
describe Geo::RepositorySyncWorker, :geo, :truncate do
include ::EE::GeoHelpers include ::EE::GeoHelpers
let!(:primary) { create(:geo_node, :primary) } let!(:primary) { create(:geo_node, :primary) }
...@@ -17,128 +15,9 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do ...@@ -17,128 +15,9 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
end end
shared_examples '#perform' do |skip_tests| describe '#perform' do
before do context 'additional shards' do
skip('FDW is not configured') if skip_tests it 'skips backfill for repositories on other shards' do
end
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
end
it 'performs Geo::ProjectSyncWorker for each project' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform
end
it 'performs Geo::ProjectSyncWorker for projects where last attempt to sync failed' do
create(:geo_project_registry, :sync_failed, project: project_in_synced_group)
create(:geo_project_registry, :synced, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).once.and_return(spy)
subject.perform
end
it 'performs Geo::ProjectSyncWorker for synced projects updated recently' do
create(:geo_project_registry, :synced, :repository_dirty, project: project_in_synced_group)
create(:geo_project_registry, :synced, project: unsynced_project)
create(:geo_project_registry, :synced, :wiki_dirty)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).twice.and_return(spy)
subject.perform
end
it 'does not perform Geo::ProjectSyncWorker when no geo database is configured' do
allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform
# We need to unstub here or the DatabaseCleaner will have issues since it
# will appear as though the tracking DB were not available
allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
end
it 'does not perform Geo::ProjectSyncWorker when not running on a secondary' do
allow(Gitlab::Geo).to receive(:secondary?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform
end
it 'does not perform Geo::ProjectSyncWorker when node is disabled' do
allow_any_instance_of(GeoNode).to receive(:enabled?) { false }
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async)
subject.perform
end
context 'when node has namespace restrictions' do
before do
secondary.update_attribute(:namespaces, [synced_group])
end
it 'does not perform Geo::ProjectSyncWorker for projects that do not belong to selected namespaces to replicate' do
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project_in_synced_group.id, within(1.minute).of(Time.now))
.once
.and_return(spy)
subject.perform
end
it 'does not perform Geo::ProjectSyncWorker for synced projects updated recently that do not belong to selected namespaces to replicate' do
create(:geo_project_registry, :synced, :repository_dirty, project: project_in_synced_group)
create(:geo_project_registry, :synced, :repository_dirty, project: unsynced_project)
expect(Geo::ProjectSyncWorker).to receive(:perform_async)
.with(project_in_synced_group.id, within(1.minute).of(Time.now))
.once
.and_return(spy)
subject.perform
end
end
context 'all repositories fail' do
let!(:project_list) { create_list(:project, 4, :random_last_repository_updated_at) }
before do
# Neither of these are needed for this spec
unsynced_project.destroy
project_in_synced_group.destroy
allow_any_instance_of(described_class).to receive(:db_retrieve_batch_size).and_return(2) # Must be >1 because of the Geo::BaseSchedulerWorker#interleave
secondary.update!(repos_max_capacity: 3) # Must be more than db_retrieve_batch_size
allow_any_instance_of(Project).to receive(:ensure_repository).and_raise(Gitlab::Shell::Error.new('foo'))
allow_any_instance_of(Geo::ProjectRegistry).to receive(:wiki_sync_due?).and_return(false)
allow_any_instance_of(Geo::RepositorySyncService).to receive(:expire_repository_caches)
end
it 'tries to sync every project' do
project_list.each do |project|
expect(Geo::ProjectSyncWorker)
.to receive(:perform_async)
.with(project.id, anything)
.at_least(:once)
.and_call_original
end
3.times do
Sidekiq::Testing.inline! { subject.perform }
end
end
end
context 'unhealthy shards' do
it 'skips backfill for repositories on unhealthy shards' do
unhealthy_not_synced = create(:project, group: synced_group, repository_storage: 'broken') unhealthy_not_synced = create(:project, group: synced_group, repository_storage: 'broken')
unhealthy_dirty = create(:project, group: synced_group, repository_storage: 'broken') unhealthy_dirty = create(:project, group: synced_group, repository_storage: 'broken')
...@@ -147,9 +26,8 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do ...@@ -147,9 +26,8 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do
# Make the shard unhealthy # Make the shard unhealthy
FileUtils.rm_rf(unhealthy_not_synced.repository_storage_path) FileUtils.rm_rf(unhealthy_not_synced.repository_storage_path)
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_in_synced_group.id, anything) expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unhealthy_not_synced.id, anything) expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('broken')
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(unhealthy_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform } Sidekiq::Testing.inline! { subject.perform }
end end
...@@ -165,25 +43,11 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do ...@@ -165,25 +43,11 @@ describe Geo::RepositorySyncWorker, :geo, :truncate do
# hide the 'broken' storage for this spec # hide the 'broken' storage for this spec
stub_storage_settings({}) stub_storage_settings({})
expect(Geo::ProjectSyncWorker).to receive(:perform_async).with(project_in_synced_group.id, anything) expect(Geo::RepositoryShardSyncWorker).to receive(:perform_async).with(project_in_synced_group.repository.storage)
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_not_synced.id, anything) expect(Geo::RepositoryShardSyncWorker).not_to receive(:perform_async).with('unknown')
expect(Geo::ProjectSyncWorker).not_to receive(:perform_async).with(missing_dirty.id, anything)
Sidekiq::Testing.inline! { subject.perform } Sidekiq::Testing.inline! { subject.perform }
end end
end end
end end
describe 'when PostgreSQL FDW is available', :geo do
# Skip if FDW isn't activated on this database
it_behaves_like '#perform', Gitlab::Database.postgresql? && !Gitlab::Geo.fdw?
end
describe 'when PostgreSQL FDW is not enabled', :geo do
before do
allow(Gitlab::Geo).to receive(:fdw?).and_return(false)
end
it_behaves_like '#perform', false
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment