Geo - Calculate repositories checksum on primary node

parent fe1a3903
......@@ -112,13 +112,16 @@
- cronjob:geo_metrics_update
- cronjob:geo_prune_event_log
- cronjob:geo_repository_sync
- cronjob:geo_repository_verification_primary_batch
- cronjob:geo_sidekiq_cron_config
- cronjob:historical_data
- cronjob:ldap_all_groups_sync
- cronjob:ldap_sync
- cronjob:update_all_mirrors
- geo:geo_base_scheduler
- geo:geo_scheduler_base
- geo:geo_scheduler_primary
- geo:geo_scheduler_secondary
- geo:geo_file_download
- geo:geo_file_removal
- geo:geo_hashed_storage_attachments_migration
......@@ -128,6 +131,8 @@
- geo:geo_repositories_clean_up
- geo:geo_repository_destroy
- geo:geo_repository_shard_sync
- geo:geo_repository_verification_primary_shard
- geo:geo_repository_verification_primary_single
- object_storage_upload
- object_storage:object_storage_background_move
......
......@@ -56,7 +56,7 @@ class PostReceive
end
def process_wiki_changes(post_received)
# Nothing defined here yet.
post_received.project.touch(:last_activity_at, :last_repository_updated_at)
end
def log(message)
......
......@@ -472,6 +472,9 @@ Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::F
Settings.cron_jobs['geo_prune_event_log_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '0 */6 * * *'
Settings.cron_jobs['geo_prune_event_log_worker']['job_class'] ||= 'Geo::PruneEventLogWorker'
Settings.cron_jobs['geo_repository_verification_primary_batch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['job_class'] ||= 'Geo::RepositoryVerification::Primary::BatchWorker'
Settings.cron_jobs['import_export_project_cleanup_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['import_export_project_cleanup_worker']['cron'] ||= '0 * * * *'
Settings.cron_jobs['import_export_project_cleanup_worker']['job_class'] = 'ImportExportProjectCleanupWorker'
......
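Note that the new keys above are assigned with `||=`, so values an operator already set in `gitlab.yml` win over these defaults. A minimal sketch of that semantics, using a plain Hash in place of Settingslogic:

settings = { 'cron' => '*/5 * * * *' } # value already provided in gitlab.yml
settings['cron'] ||= '*/1 * * * *'     # default skipped: the key is present
settings['job_class'] ||= 'Geo::RepositoryVerification::Primary::BatchWorker' # default applied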
......@@ -1873,6 +1873,20 @@ ActiveRecord::Schema.define(version: 20180306074045) do
add_index "project_mirror_data", ["next_execution_timestamp", "retry_count"], name: "index_mirror_data_on_next_execution_and_retry_count", using: :btree
add_index "project_mirror_data", ["project_id"], name: "index_project_mirror_data_on_project_id", unique: true, using: :btree
create_table "project_repository_states", force: :cascade do |t|
t.integer "project_id", null: false
t.binary "repository_verification_checksum"
t.binary "wiki_verification_checksum"
t.boolean "last_repository_verification_failed", default: false, null: false
t.boolean "last_wiki_verification_failed", default: false, null: false
t.datetime_with_timezone "last_repository_verification_at"
t.datetime_with_timezone "last_wiki_verification_at"
t.string "last_repository_verification_failure"
t.string "last_wiki_verification_failure"
end
add_index "project_repository_states", ["project_id"], name: "index_project_repository_states_on_project_id", unique: true, using: :btree
create_table "project_statistics", force: :cascade do |t|
t.integer "project_id", null: false
t.integer "namespace_id", null: false
......@@ -2664,6 +2678,7 @@ ActiveRecord::Schema.define(version: 20180306074045) do
add_foreign_key "project_group_links", "projects", name: "fk_daa8cee94c", on_delete: :cascade
add_foreign_key "project_import_data", "projects", name: "fk_ffb9ee3a10", on_delete: :cascade
add_foreign_key "project_mirror_data", "projects", name: "fk_d1aad367d7", on_delete: :cascade
add_foreign_key "project_repository_states", "projects", on_delete: :cascade
add_foreign_key "project_statistics", "projects", on_delete: :cascade
add_foreign_key "protected_branch_merge_access_levels", "namespaces", column: "group_id", name: "fk_98f3d044fe", on_delete: :cascade
add_foreign_key "protected_branch_merge_access_levels", "protected_branches", name: "fk_8a3072ccb3", on_delete: :cascade
......
module Geo
class RepositoryVerificationFinder
def find_outdated_projects(batch_size:)
Project.select(:id)
.joins(:repository_state)
.where(repository_unverified
.or(repository_verification_outdated)
.or(wiki_unverified)
.or(wiki_verification_outdated))
.order(repository_state_table[:last_repository_verification_at].asc)
.limit(batch_size)
end
def find_unverified_projects(batch_size:)
Project.select(:id)
.joins(left_join_repository_state)
.where(repository_never_verified)
.order(projects_table[:last_repository_updated_at].asc)
.limit(batch_size)
end
protected
def projects_table
Project.arel_table
end
def repository_state_table
ProjectRepositoryState.arel_table
end
def left_join_repository_state
projects_table
.join(repository_state_table, Arel::Nodes::OuterJoin)
.on(projects_table[:id].eq(repository_state_table[:project_id]))
.join_sources
end
def repository_unverified
repository_state_table[:repository_verification_checksum].eq(nil)
end
def repository_verification_outdated
repository_state_table[:last_repository_verification_at]
.lt(projects_table[:last_repository_updated_at])
end
def wiki_unverified
repository_state_table[:wiki_verification_checksum].eq(nil)
end
def wiki_verification_outdated
repository_state_table[:last_wiki_verification_at]
.lt(projects_table[:last_repository_updated_at])
end
def repository_never_verified
repository_state_table[:id].eq(nil)
end
end
end
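A usage sketch of the finder (assuming a Rails console on the primary node):

finder = Geo::RepositoryVerificationFinder.new

# Projects with no project_repository_states row at all:
finder.find_unverified_projects(batch_size: 10).pluck(:id)

# Projects whose repository or wiki changed after their last verification:
finder.find_outdated_projects(batch_size: 10).pluck(:id)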
......@@ -12,6 +12,7 @@ module EE
include Elastic::ProjectsSearch
prepend ImportStatusStateMachine
include EE::DeploymentPlatform
include EachBatch
before_validation :mark_remote_mirrors_for_removal
......@@ -24,6 +25,7 @@ module EE
belongs_to :mirror_user, foreign_key: 'mirror_user_id', class_name: 'User'
has_one :repository_state, class_name: 'ProjectRepositoryState', inverse_of: :project
has_one :mirror_data, autosave: true, class_name: 'ProjectMirrorData'
has_one :push_rule, ->(project) { project&.feature_available?(:push_rules) ? all : none }
has_one :index_status
......
class ProjectRepositoryState < ActiveRecord::Base
include ShaAttribute
sha_attribute :repository_verification_checksum
sha_attribute :wiki_verification_checksum
belongs_to :project, inverse_of: :repository_state
validates :project, presence: true, uniqueness: true
def repository_checksum_outdated?(timestamp)
repository_verification_checksum.nil? || recalculate_repository_checksum?(timestamp)
end
def wiki_checksum_outdated?(timestamp)
return false unless project.wiki_enabled?
wiki_verification_checksum.nil? || recalculate_wiki_checksum?(timestamp)
end
private
def recalculate_repository_checksum?(timestamp)
last_repository_verification_at.nil? || timestamp > last_repository_verification_at
end
def recalculate_wiki_checksum?(timestamp)
last_wiki_verification_at.nil? || timestamp > last_wiki_verification_at
end
end
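A sketch of how the two predicates behave; the `timestamp` argument is the time the verification job was scheduled:

state = project.repository_state
state.repository_checksum_outdated?(Time.now) # true when no checksum exists yet, or the last verification predates the timestamp
state.wiki_checksum_outdated?(Time.now)       # same logic, but always false when the project's wiki is disabled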
module Geo
class BaseSchedulerWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
include ::Gitlab::Utils::StrongMemoize
DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 60.minutes
RUN_TIME = 60.minutes.to_i
attr_reader :pending_resources, :scheduled_jobs, :start_time, :loops
def initialize
@pending_resources = []
@scheduled_jobs = []
end
# The scheduling works as follows:
#
# 1. Load a batch of IDs that we need to schedule (DB_RETRIEVE_BATCH_SIZE) into a pending list.
# 2. Schedule them so that at most `max_capacity` are running at once.
# 3. When a slot frees, schedule another job.
# 4. When we have drained the pending list, load another batch into memory, and schedule the
# remaining jobs, excluding ones in progress.
# 5. Quit when we have scheduled all jobs or exceeded MAX_RUNTIME.
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
@start_time = Time.now.utc
@loops = 0
# Prevent multiple Sidekiq workers from attempting to schedule jobs
try_obtain_lease do
log_info('Started scheduler')
reason = :unknown
begin
reason = loop do
break :node_disabled unless node_enabled?
update_jobs_in_progress
update_pending_resources
break :over_time if over_time?
break :complete unless resources_remain?
# If we're still under the limit after refreshing from the DB, we
# can end after scheduling the remaining transfers.
last_batch = reload_queue?
schedule_jobs
@loops += 1
break :last_batch if last_batch
break :lease_lost unless renew_lease!
sleep(1)
end
rescue => err
reason = :error
log_error(err.message)
raise err
ensure
duration = Time.now.utc - start_time
log_info('Finished scheduler', total_loops: loops, duration: duration, reason: reason)
end
end
end
private
def worker_metadata
end
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def lease_timeout
LEASE_TIMEOUT
end
def max_capacity
raise NotImplementedError
end
def run_time
RUN_TIME
end
def reload_queue?
pending_resources.size < max_capacity
end
def resources_remain?
!pending_resources.empty?
end
def over_time?
(Time.now.utc - start_time) >= run_time
end
def take_batch(*arrays)
interleave(*arrays).uniq.compact.take(db_retrieve_batch_size)
end
# Combines the elements of multiple, arbitrary-length arrays into a single array.
#
# Each array is spread evenly over the resultant array.
# The order of the original arrays is preserved within the resultant array.
# In the case of ties between elements, the element from the first array goes first.
# From https://stackoverflow.com/questions/15628936/ruby-equally-distribute-elements-and-interleave-merge-multiple-arrays/15639147#15639147
#
# For examples, see the specs in file_download_dispatch_worker_spec.rb
def interleave(*arrays)
elements = []
coefficients = []
arrays.each_with_index do |e, index|
elements += e
coefficients += interleave_coefficients(e, index)
end
combined = elements.zip(coefficients)
combined.sort_by { |zipped| zipped[1] }.map { |zipped| zipped[0] }
end
# Assigns a position to each element in order to spread out arrays evenly.
#
# `array_index` is used to resolve ties between arrays of equal length.
#
# Examples:
#
# irb(main):006:0> interleave_coefficients(['a', 'b'], 0)
# => [0.2499998750000625, 0.7499996250001875]
# irb(main):027:0> interleave_coefficients(['a', 'b', 'c'], 0)
# => [0.16666661111112963, 0.4999998333333889, 0.8333330555556481]
# irb(main):007:0> interleave_coefficients(['a', 'b', 'c'], 1)
# => [0.16699994433335189, 0.5003331665556111, 0.8336663887778704]
def interleave_coefficients(array, array_index)
(1..array.size).map do |i|
(i - 0.5 + array_index / 1000.0) / (array.size + 1e-6)
end
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def update_pending_resources
@pending_resources = load_pending_resources if reload_queue?
end
def schedule_jobs
capacity = max_capacity
num_to_schedule = [capacity - scheduled_job_ids.size, pending_resources.size].min
num_to_schedule = 0 if num_to_schedule < 0
to_schedule = pending_resources.shift(num_to_schedule)
scheduled = to_schedule.map do |args|
job = schedule_job(*args)
job if job&.fetch(:job_id, nil).present?
end.compact
scheduled_jobs.concat(scheduled)
log_info("Loop #{loops}", enqueued: scheduled.length, pending: pending_resources.length, scheduled: scheduled_jobs.length, capacity: capacity)
end
def scheduled_job_ids
scheduled_jobs.map { |data| data[:job_id] }
end
def current_node
Gitlab::Geo.current_node
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
clear_memoization(:current_node_enabled)
end
strong_memoize(:current_node_enabled) do
Gitlab::Geo.current_node_enabled?
end
end
def log_info(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args)
args.merge!(worker_metadata) if worker_metadata
Gitlab::Geo::Logger.info(args)
end
def log_error(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args)
args.merge!(worker_metadata) if worker_metadata
Gitlab::Geo::Logger.error(args)
end
end
end
module Geo
class FileDownloadDispatchWorker < Geo::BaseSchedulerWorker
class FileDownloadDispatchWorker < Geo::Scheduler::SecondaryWorker
include CronjobQueue
private
......
module Geo
class RepositoryShardSyncWorker < Geo::BaseSchedulerWorker
class RepositoryShardSyncWorker < Geo::Scheduler::SecondaryWorker
sidekiq_options retry: false
attr_accessor :shard_name
......
module Geo
module RepositoryVerification
module Primary
class BatchWorker
include ApplicationWorker
include CronjobQueue
include ::Gitlab::Utils::StrongMemoize
HEALTHY_SHARD_CHECKS = [
Gitlab::HealthChecks::FsShardsCheck,
Gitlab::HealthChecks::GitalyCheck
].freeze
def perform
return unless Feature.enabled?('geo_repository_verification')
return unless Gitlab::Geo.primary?
Gitlab::Geo::ShardHealthCache.update(healthy_shards)
healthy_shards.each do |shard_name|
Geo::RepositoryVerification::Primary::ShardWorker.perform_async(shard_name)
end
end
def healthy_shards
strong_memoize(:healthy_shards) do
# For now, we need to perform both Gitaly and direct filesystem checks to ensure
# the shard is healthy. We take the intersection of the successful checks
# as the healthy shards.
HEALTHY_SHARD_CHECKS.map(&:readiness)
.map { |check_result| check_result.select(&:success) }
.inject(:&)
.map { |check_result| check_result.labels[:shard] }
.compact
.uniq
end
end
end
end
end
end
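A toy sketch of the `inject(:&)` intersection in `healthy_shards`, with check results stubbed as value structs; a shard counts as healthy only when every check succeeds for it:

Result = Struct.new(:success, :labels)

fs_results     = [Result.new(true, { shard: 'default' }), Result.new(true, { shard: 'broken' })]
gitaly_results = [Result.new(true, { shard: 'default' }), Result.new(false, { shard: 'broken' })]

[fs_results, gitaly_results]
  .map { |results| results.select(&:success) }
  .inject(:&)                             # intersection keeps results present in every list
  .map { |result| result.labels[:shard] }
# => ["default"]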
module Geo
module RepositoryVerification
module Primary
class ShardWorker < Geo::Scheduler::PrimaryWorker
sidekiq_options retry: false
MAX_CAPACITY = 100
attr_accessor :shard_name
def perform(shard_name)
@shard_name = shard_name
return unless Gitlab::Geo::ShardHealthCache.healthy_shard?(shard_name)
super()
end
private
def worker_metadata
{ shard: shard_name }
end
# We need a custom key here since we are running one worker per shard
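# e.g. "geo/repository_verification/primary/shard_worker:shard:default"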
def lease_key
@lease_key ||= "#{self.class.name.underscore}:shard:#{shard_name}"
end
def max_capacity
MAX_CAPACITY
end
def schedule_job(project_id)
job_id = Geo::RepositoryVerification::Primary::SingleWorker.perform_async(project_id, Time.now)
{ id: project_id, job_id: job_id } if job_id
end
def finder
@finder ||= Geo::RepositoryVerificationFinder.new
end
def load_pending_resources
resources = find_unverified_project_ids(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.size
if remaining_capacity.zero?
resources
else
resources + find_outdated_project_ids(batch_size: remaining_capacity)
end
end
def find_unverified_project_ids(batch_size:)
shard_restriction(finder.find_unverified_projects(batch_size: batch_size))
.pluck(:id)
end
def find_outdated_project_ids(batch_size:)
shard_restriction(finder.find_outdated_projects(batch_size: batch_size))
.pluck(:id)
end
def shard_restriction(relation)
relation.where(repository_storage: shard_name)
end
end
end
end
end
module Geo
module RepositoryVerification
module Primary
class SingleWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
include Gitlab::Geo::ProjectLogHelpers
LEASE_TIMEOUT = 1.hour.to_i
attr_reader :project
def perform(project_id, scheduled_time)
return unless Gitlab::Geo.primary?
@project = Project.find_by(id: project_id)
return if project.nil? || project.pending_delete?
try_obtain_lease do
calculate_repository_checksum if repository_state.repository_checksum_outdated?(scheduled_time)
calculate_wiki_checksum if repository_state.wiki_checksum_outdated?(scheduled_time)
end
end
private
def calculate_repository_checksum
calculate_checksum(:repository, project.disk_path)
end
def calculate_wiki_checksum
calculate_checksum(:wiki, project.wiki.disk_path)
end
def calculate_checksum(type, repository_relative_path)
checksum = Gitlab::Git::Checksum.new(project.repository_storage, repository_relative_path)
update_repository_state!(type, checksum: checksum.calculate)
rescue => e
log_error('Error calculating the repository checksum', e, type: type)
update_repository_state!(type, failed: true, failure: e.message)
end
def update_repository_state!(type, checksum: nil, failed: false, failure: nil)
repository_state.update!(
"#{type}_verification_checksum" => checksum,
"last_#{type}_verification_at" => DateTime.now,
"last_#{type}_verification_failed" => failed,
"last_#{type}_verification_failure" => failure
)
end
def repository_state
@repository_state ||= project.repository_state || project.build_repository_state
end
def lease_key
"geo:single_repository_verification_worker:#{project.id}"
end
def lease_timeout
LEASE_TIMEOUT
end
end
end
end
end
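For `type = :wiki`, the interpolated keys in `update_repository_state!` resolve to the wiki columns, i.e. roughly:

repository_state.update!(
  'wiki_verification_checksum'     => checksum,
  'last_wiki_verification_at'      => DateTime.now,
  'last_wiki_verification_failed'  => failed,
  'last_wiki_verification_failure' => failure
)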
module Geo
module Scheduler
class BaseWorker
include ApplicationWorker
include GeoQueue
include ExclusiveLeaseGuard
include ::Gitlab::Utils::StrongMemoize
DB_RETRIEVE_BATCH_SIZE = 1000
LEASE_TIMEOUT = 60.minutes
RUN_TIME = 60.minutes.to_i
attr_reader :pending_resources, :scheduled_jobs, :start_time, :loops
def initialize
@pending_resources = []
@scheduled_jobs = []
end
# The scheduling works as follows:
#
# 1. Load a batch of IDs that we need to schedule (DB_RETRIEVE_BATCH_SIZE) into a pending list.
# 2. Schedule them so that at most `max_capacity` are running at once.
# 3. When a slot frees, schedule another job.
# 4. When we have drained the pending list, load another batch into memory, and schedule the
# remaining jobs, excluding ones in progress.
# 5. Quit when we have scheduled all jobs or exceeded MAX_RUNTIME.
def perform
@start_time = Time.now.utc
@loops = 0
# Prevent multiple Sidekiq workers from attempting to schedule jobs
try_obtain_lease do
log_info('Started scheduler')
reason = :unknown
begin
reason = loop do
break :node_disabled unless node_enabled?
update_jobs_in_progress
update_pending_resources
break :over_time if over_time?
break :complete unless resources_remain?
# If we're still under the limit after refreshing from the DB, we
# can end after scheduling the remaining transfers.
last_batch = reload_queue?
schedule_jobs
@loops += 1
break :last_batch if last_batch
break :lease_lost unless renew_lease!
sleep(1)
end
rescue => err
reason = :error
log_error(err.message)
raise err
ensure
duration = Time.now.utc - start_time
log_info('Finished scheduler', total_loops: loops, duration: duration, reason: reason)
end
end
end
private
def worker_metadata
end
def db_retrieve_batch_size
DB_RETRIEVE_BATCH_SIZE
end
def lease_timeout
LEASE_TIMEOUT
end
def max_capacity
raise NotImplementedError
end
def run_time
RUN_TIME
end
def reload_queue?
pending_resources.size < max_capacity
end
def resources_remain?
!pending_resources.empty?
end
def over_time?
(Time.now.utc - start_time) >= run_time
end
def take_batch(*arrays)
interleave(*arrays).uniq.compact.take(db_retrieve_batch_size)
end
# Combines the elements of multiple, arbitrary-length arrays into a single array.
#
# Each array is spread evenly over the resultant array.
# The order of the original arrays is preserved within the resultant array.
# In the case of ties between elements, the element from the first array goes first.
# From https://stackoverflow.com/questions/15628936/ruby-equally-distribute-elements-and-interleave-merge-multiple-arrays/15639147#15639147
#
# For examples, see the specs in file_download_dispatch_worker_spec.rb
def interleave(*arrays)
elements = []
coefficients = []
arrays.each_with_index do |e, index|
elements += e
coefficients += interleave_coefficients(e, index)
end
combined = elements.zip(coefficients)
combined.sort_by { |zipped| zipped[1] }.map { |zipped| zipped[0] }
end
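# For instance, two equal-length arrays alternate, with the first array
# winning ties (a sketch):
#
#   interleave([1, 2, 3], [4, 5, 6])
#   # => [1, 4, 2, 5, 3, 6]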
# Assigns a position to each element in order to spread out arrays evenly.
#
# `array_index` is used to resolve ties between arrays of equal length.
#
# Examples:
#
# irb(main):006:0> interleave_coefficients(['a', 'b'], 0)
# => [0.2499998750000625, 0.7499996250001875]
# irb(main):027:0> interleave_coefficients(['a', 'b', 'c'], 0)
# => [0.16666661111112963, 0.4999998333333889, 0.8333330555556481]
# irb(main):007:0> interleave_coefficients(['a', 'b', 'c'], 1)
# => [0.16699994433335189, 0.5003331665556111, 0.8336663887778704]
def interleave_coefficients(array, array_index)
(1..array.size).map do |i|
(i - 0.5 + array_index / 1000.0) / (array.size + 1e-6)
end
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(scheduled_job_ids)
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, filter out the jobs that have completed.
@scheduled_jobs = @scheduled_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def update_pending_resources
@pending_resources = load_pending_resources if reload_queue?
end
def schedule_jobs
capacity = max_capacity
num_to_schedule = [capacity - scheduled_job_ids.size, pending_resources.size].min
num_to_schedule = 0 if num_to_schedule < 0
to_schedule = pending_resources.shift(num_to_schedule)
scheduled = to_schedule.map do |args|
job = schedule_job(*args)
job if job&.fetch(:job_id, nil).present?
end.compact
scheduled_jobs.concat(scheduled)
log_info("Loop #{loops}", enqueued: scheduled.length, pending: pending_resources.length, scheduled: scheduled_jobs.length, capacity: capacity)
end
def scheduled_job_ids
scheduled_jobs.map { |data| data[:job_id] }
end
def current_node
Gitlab::Geo.current_node
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
unless @last_enabled_check.present? && @last_enabled_check > 1.minute.ago
@last_enabled_check = Time.now
clear_memoization(:current_node_enabled)
end
strong_memoize(:current_node_enabled) do
Gitlab::Geo.current_node_enabled?
end
end
def log_info(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args)
args.merge!(worker_metadata) if worker_metadata
Gitlab::Geo::Logger.info(args)
end
def log_error(message, extra_args = {})
args = { class: self.class.name, message: message }.merge(extra_args)
args.merge!(worker_metadata) if worker_metadata
Gitlab::Geo::Logger.error(args)
end
end
end
end
module Geo
module Scheduler
class PrimaryWorker < Geo::Scheduler::BaseWorker
def perform
return unless Gitlab::Geo.primary?
super
end
end
end
end
module Geo
module Scheduler
class SecondaryWorker < Geo::Scheduler::BaseWorker
def perform
return unless Gitlab::Geo.geo_database_configured?
return unless Gitlab::Geo.secondary?
super
end
end
end
end
class CreateProjectRepositoryStates < ActiveRecord::Migration
DOWNTIME = false
def change
create_table :project_repository_states do |t|
t.references :project, null: false, index: { unique: true }, foreign_key: { on_delete: :cascade }
t.binary :repository_verification_checksum
t.binary :wiki_verification_checksum
t.boolean :last_repository_verification_failed, null: false, default: false
t.boolean :last_wiki_verification_failed, null: false, default: false
t.datetime_with_timezone :last_repository_verification_at
t.datetime_with_timezone :last_wiki_verification_at
t.string :last_repository_verification_failure
t.string :last_wiki_verification_failure
end
end
end
......@@ -2,11 +2,13 @@ module Gitlab
module Geo
class CronManager
COMMON_JOBS = %w[geo_metrics_update_worker].freeze
PRIMARY_JOBS = %w[geo_repository_verification_primary_batch_worker].freeze
SECONDARY_JOBS = %w[
geo_repository_sync_worker
geo_file_download_dispatch_worker
].freeze
GEO_JOBS = (COMMON_JOBS + SECONDARY_JOBS).freeze
GEO_JOBS = (COMMON_JOBS + PRIMARY_JOBS + SECONDARY_JOBS).freeze
CONFIG_WATCHER = 'geo_sidekiq_cron_config_worker'.freeze
CONFIG_WATCHER_CLASS = 'Geo::SidekiqCronConfigWorker'.freeze
......
module Gitlab
module Git
class Checksum
include Gitlab::Git::Popen
EMPTY_REPOSITORY_CHECKSUM = '0000000000000000000000000000000000000000'.freeze
Failure = Class.new(StandardError)
attr_reader :path, :relative_path, :storage, :storage_path
def initialize(storage, relative_path)
@storage = storage
@storage_path = Gitlab.config.repositories.storages[storage]['path']
@relative_path = "#{relative_path}.git"
@path = File.join(storage_path, @relative_path)
end
def calculate
unless repository_exists?
failure!(Gitlab::Git::Repository::NoRepository, 'No repository for such path')
end
calculate_checksum_by_shelling_out
end
private
def repository_exists?
raw_repository.exists?
end
def calculate_checksum_by_shelling_out
args = %W(--git-dir=#{path} show-ref --heads --tags)
output, status = run_git(args)
if status&.zero?
refs = output.split("\n")
refs.inject(nil) do |checksum, ref|
value = Digest::SHA1.hexdigest(ref)
if checksum.nil?
value
else
(checksum.hex ^ value.hex).to_s(16)
end
end
else
# Empty repositories return with a non-zero status and an empty output.
if output&.empty?
EMPTY_REPOSITORY_CHECKSUM
else
failure!(Gitlab::Git::Checksum::Failure, output)
end
end
end
def failure!(klass, message)
Gitlab::GitLogger.error("'git show-ref --heads --tags' in #{path}: #{message}")
raise klass.new("Could not calculate the checksum for #{path}: #{message}")
end
def circuit_breaker
@circuit_breaker ||= Gitlab::Git::Storage::CircuitBreaker.for_storage(storage)
end
def raw_repository
Gitlab::Git::Repository.new(storage, relative_path, nil)
end
def run_git(args)
circuit_breaker.perform do
popen([Gitlab.config.git.bin_path, *args], path)
end
end
end
end
end
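Since XOR is commutative and associative, the fold in `calculate_checksum_by_shelling_out` does not depend on the order in which `git show-ref` lists the refs. A self-contained sketch with hypothetical refs:

require 'digest'

refs = [
  '5f923c36a015d2c5d54b18d6f7ef0f6e2020a31c refs/heads/master',
  'e56497bb5f03a90a51293fc6d516788730953899 refs/tags/v1.0.0'
]

checksum = refs.inject(nil) do |acc, ref|
  value = Digest::SHA1.hexdigest(ref)
  acc.nil? ? value : (acc.hex ^ value.hex).to_s(16)
end
checksum # => a hex digest (note: leading zeros are not preserved by to_s(16))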
FactoryBot.define do
factory :repository_state, class: 'ProjectRepositoryState' do
project
trait :repository_outdated do
repository_verification_checksum 'f079a831cab27bcda7d81cd9b48296d0c3dd92ee'
last_repository_verification_at { 5.days.ago }
last_repository_verification_failed false
end
trait :repository_verified do
repository_verification_checksum 'f079a831cab27bcda7d81cd9b48296d0c3dd92ee'
last_repository_verification_failed false
last_repository_verification_at { Time.now }
end
trait :wiki_outdated do
wiki_verification_checksum 'e079a831cab27bcda7d81cd9b48296d0c3dd92ef'
last_wiki_verification_at { 5.days.ago }
last_wiki_verification_failed false
end
trait :wiki_verified do
wiki_verification_checksum 'e079a831cab27bcda7d81cd9b48296d0c3dd92ef'
last_wiki_verification_failed false
last_wiki_verification_at { Time.now }
end
end
end
require 'spec_helper'
describe Geo::RepositoryVerificationFinder, :postgresql do
set(:project) { create(:project) }
describe '#find_outdated_projects' do
it 'returns projects where repository verification is outdated' do
create(:repository_state, :repository_outdated, project: project)
expect(subject.find_outdated_projects(batch_size: 10))
.to match_array(project)
end
it 'returns projects where repository verification is pending' do
create(:repository_state, :wiki_verified, project: project)
expect(subject.find_outdated_projects(batch_size: 10))
.to match_array(project)
end
it 'returns projects where wiki verification is outdated' do
create(:repository_state, :wiki_outdated, project: project)
expect(subject.find_outdated_projects(batch_size: 10))
.to match_array(project)
end
it 'returns projects where wiki verification is pending' do
create(:repository_state, :repository_verified, project: project)
expect(subject.find_outdated_projects(batch_size: 10))
.to match_array(project)
end
it 'returns verified projects whose repositories have changed' do
repository_outdated = create(:repository_state, :repository_outdated).project
repository_outdated.update_column(:last_repository_updated_at, 6.hours.ago)
wiki_outdated = create(:repository_state, :wiki_outdated).project
wiki_outdated.update_column(:last_repository_updated_at, 48.hours.ago)
expect(subject.find_outdated_projects(batch_size: 10))
.to match_array([repository_outdated, wiki_outdated])
end
end
describe '#find_unverified_projects' do
it 'returns projects that never have been verified' do
expect(subject.find_unverified_projects(batch_size: 10))
.to match_array(project)
end
end
end
......@@ -23,7 +23,13 @@ describe Gitlab::Geo::CronManager, :geo do
job.enable!
end
JOBS = %w(ldap_test geo_repository_sync_worker geo_file_download_dispatch_worker geo_metrics_update_worker).freeze
JOBS = %w[
ldap_test
geo_repository_verification_primary_batch_worker
geo_repository_sync_worker
geo_file_download_dispatch_worker
geo_metrics_update_worker
].freeze
before(:all) do
JOBS.each { |name| init_cron_job(name, name.camelize) }
......@@ -35,6 +41,7 @@ describe Gitlab::Geo::CronManager, :geo do
let(:common_jobs) { [job('geo_metrics_update_worker')] }
let(:ldap_test_job) { job('ldap_test') }
let(:primary_jobs) { [job('geo_repository_verification_primary_batch_worker')] }
let(:secondary_jobs) do
[
......@@ -58,6 +65,10 @@ describe Gitlab::Geo::CronManager, :geo do
expect(common_jobs).to all(be_enabled)
end
it 'enables primary-only jobs' do
expect(primary_jobs).to all(be_enabled)
end
it 'enables non-geo jobs' do
expect(ldap_test_job).to be_enabled
end
......@@ -78,6 +89,10 @@ describe Gitlab::Geo::CronManager, :geo do
expect(common_jobs).to all(be_enabled)
end
it 'disables primary-only jobs' do
primary_jobs.each { |job| expect(job).not_to be_enabled }
end
it 'disables non-geo jobs' do
expect(ldap_test_job).not_to be_enabled
end
......@@ -90,6 +105,10 @@ describe Gitlab::Geo::CronManager, :geo do
manager.execute
end
it 'disables primary-only jobs' do
primary_jobs.each { |job| expect(job).not_to be_enabled }
end
it 'disables secondary-only jobs' do
secondary_jobs.each { |job| expect(job).not_to be_enabled }
end
......
require 'spec_helper'
describe Gitlab::Git::Checksum, seed_helper: true do
let(:storage) { 'default' }
it 'raises Gitlab::Git::Repository::NoRepository when there is no repo' do
checksum = described_class.new(storage, 'nonexistent-repo')
expect { checksum.calculate }.to raise_error Gitlab::Git::Repository::NoRepository
end
it 'pretends that checksum is 000000... when the repo is empty' do
FileUtils.rm_rf(File.join(SEED_STORAGE_PATH, 'empty-repo.git'))
system(git_env, *%W(#{Gitlab.config.git.bin_path} init --bare empty-repo.git),
chdir: SEED_STORAGE_PATH,
out: '/dev/null',
err: '/dev/null')
checksum = described_class.new(storage, 'empty-repo')
expect(checksum.calculate).to eq '0000000000000000000000000000000000000000'
end
it 'raises Gitlab::Git::Checksum::Failure when shelling out to git returns a non-zero status' do
checksum = described_class.new(storage, 'gitlab-git-test')
allow(checksum).to receive(:popen).and_return(['output', nil])
expect { checksum.calculate }.to raise_error Gitlab::Git::Checksum::Failure
end
it 'calculates the checksum when there is a repo' do
checksum = described_class.new(storage, 'gitlab-git-test')
expect(checksum.calculate).to eq '54f21be4c32c02f6788d72207fa03ad3bce725e4'
end
end
require 'rails_helper'
describe ProjectRepositoryState do
using RSpec::Parameterized::TableSyntax
set(:project) { create(:project) }
set(:repository_state) { create(:repository_state, project_id: project.id) }
subject { repository_state }
describe 'associations' do
it { is_expected.to belong_to(:project).inverse_of(:repository_state) }
end
describe 'validations' do
it { is_expected.to validate_presence_of(:project) }
it { is_expected.to validate_uniqueness_of(:project) }
end
describe '#repository_checksum_outdated?' do
where(:repository_verification_checksum, :last_repository_verification_at, :expected) do
now = Time.now
past = now - 1.year
future = now + 1.year
nil | nil | true
'123' | nil | true
'123' | past | true
'123' | now | true
'123' | future | false
end
with_them do
before do
repository_state.update!(repository_verification_checksum: repository_verification_checksum, last_repository_verification_at: last_repository_verification_at)
end
subject { repository_state.repository_checksum_outdated?(Time.now) }
it { is_expected.to eq(expected) }
end
end
describe '#wiki_checksum_outdated?' do
where(:wiki_verification_checksum, :last_wiki_verification_at, :expected) do
now = Time.now
past = now - 1.year
future = now + 1.year
nil | nil | true
'123' | nil | true
'123' | past | true
'123' | now | true
'123' | future | false
end
with_them do
before do
repository_state.update!(wiki_verification_checksum: wiki_verification_checksum, last_wiki_verification_at: last_wiki_verification_at)
end
subject { repository_state.wiki_checksum_outdated?(Time.now) }
context 'wiki enabled' do
it { is_expected.to eq(expected) }
end
context 'wiki disabled' do
before do
project.update!(wiki_enabled: false)
end
it { is_expected.to be_falsy }
end
end
end
end
......@@ -14,6 +14,8 @@ describe Project do
it { is_expected.to delegate_method(:shared_runners_minutes_used?).to(:shared_runners_limit_namespace) }
it { is_expected.to have_one(:mirror_data).class_name('ProjectMirrorData') }
it { is_expected.to have_one(:repository_state).class_name('ProjectRepositoryState').inverse_of(:project) }
it { is_expected.to have_many(:path_locks) }
it { is_expected.to have_many(:sourced_pipelines) }
it { is_expected.to have_many(:source_pipelines) }
......
......@@ -110,7 +110,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
# 1. A total of 10 files in the queue, and we can load a maximum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do
stub_const('Geo::BaseSchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 5)
stub_const('Geo::Scheduler::BaseWorker::DB_RETRIEVE_BATCH_SIZE', 5)
secondary.update!(files_max_capacity: 2)
allow_any_instance_of(::Gitlab::Geo::Transfer).to receive(:download_from_primary).and_return(100)
......
require 'spec_helper'
describe Geo::RepositoryVerification::Primary::BatchWorker, :postgresql, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers
set(:healthy_not_verified) { create(:project) }
let!(:primary) { create(:geo_node, :primary) }
let(:healthy_shard) { healthy_not_verified.repository.storage }
before do
stub_current_geo_node(primary)
end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
describe '#perform' do
context 'when geo_repository_verification is enabled' do
before do
stub_feature_flags(geo_repository_verification: true)
end
it 'skips backfill for repositories on other shards' do
unhealthy_not_verified = create(:project, repository_storage: 'broken')
unhealthy_outdated = create(:project, repository_storage: 'broken')
create(:repository_state, :repository_outdated, project: unhealthy_outdated)
# Make the shard unhealthy
FileUtils.rm_rf(unhealthy_not_verified.repository_storage_path)
expect(Geo::RepositoryVerification::Primary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Primary::ShardWorker).not_to receive(:perform_async).with('broken')
subject.perform
end
it 'skips backfill for projects on missing shards' do
missing_not_verified = create(:project)
missing_not_verified.update_column(:repository_storage, 'unknown')
missing_outdated = create(:project)
missing_outdated.update_column(:repository_storage, 'unknown')
create(:repository_state, :repository_outdated, project: missing_outdated)
# hide the 'broken' storage for this spec
stub_storage_settings({})
expect(Geo::RepositoryVerification::Primary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Primary::ShardWorker).not_to receive(:perform_async).with('unknown')
subject.perform
end
it 'skips backfill for projects with downed Gitaly server' do
create(:project, repository_storage: 'broken')
unhealthy_outdated = create(:project, repository_storage: 'broken')
create(:repository_state, :repository_outdated, project: unhealthy_outdated)
# Report only one healthy shard
expect(Gitlab::HealthChecks::FsShardsCheck).to receive(:readiness)
.and_return([result(true, healthy_shard), result(true, 'broken')])
expect(Gitlab::HealthChecks::GitalyCheck).to receive(:readiness)
.and_return([result(true, healthy_shard), result(false, 'broken')])
expect(Geo::RepositoryVerification::Primary::ShardWorker).to receive(:perform_async).with(healthy_shard)
expect(Geo::RepositoryVerification::Primary::ShardWorker).not_to receive(:perform_async).with('broken')
subject.perform
end
end
context 'when geo_repository_verification is disabled' do
before do
stub_feature_flags(geo_repository_verification: false)
end
it 'does not schedule jobs' do
expect(Geo::RepositoryVerification::Primary::ShardWorker)
.not_to receive(:perform_async).with(healthy_shard)
subject.perform
end
end
end
def result(success, shard)
Gitlab::HealthChecks::Result.new(success, nil, { shard: shard })
end
end
require 'spec_helper'
describe Geo::RepositoryVerification::Primary::ShardWorker, :postgresql, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers
let!(:primary) { create(:geo_node, :primary) }
let(:shard_name) { Gitlab.config.repositories.storages.keys.first }
before do
stub_current_geo_node(primary)
end
describe '#perform' do
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:renew) { true }
Gitlab::Geo::ShardHealthCache.update([shard_name])
end
it 'performs Geo::RepositoryVerification::Primary::SingleWorker for each project' do
create_list(:project, 2)
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.to receive(:perform_async).twice
subject.perform(shard_name)
end
it 'performs Geo::RepositoryVerification::Primary::SingleWorker for verified projects updated recently' do
verified_project = create(:project)
repository_outdated = create(:project)
wiki_outdated = create(:project)
create(:repository_state, :repository_verified, :wiki_verified, project: verified_project)
create(:repository_state, :repository_outdated, project: repository_outdated)
create(:repository_state, :wiki_outdated, project: wiki_outdated)
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.not_to receive(:perform_async).with(verified_project.id, instance_of(Time))
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.to receive(:perform_async).with(repository_outdated.id, instance_of(Time))
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.to receive(:perform_async).with(wiki_outdated.id, instance_of(Time))
subject.perform(shard_name)
end
it 'performs Geo::RepositoryVerification::Primary::SingleWorker for projects missing repository verification' do
missing_repository_verification = create(:project)
create(:repository_state, :wiki_verified, project: missing_repository_verification)
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.to receive(:perform_async).with(missing_repository_verification.id, instance_of(Time))
subject.perform(shard_name)
end
it 'performs Geo::RepositoryVerification::Primary::SingleWorker for projects missing wiki verification' do
missing_wiki_verification = create(:project)
create(:repository_state, :repository_verified, project: missing_wiki_verification)
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.to receive(:perform_async).with(missing_wiki_verification.id, instance_of(Time))
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when shard becomes unhealthy' do
create(:project)
Gitlab::Geo::ShardHealthCache.update([])
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker when not running on a primary' do
allow(Gitlab::Geo).to receive(:primary?) { false }
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.not_to receive(:perform_async)
subject.perform(shard_name)
end
it 'does not schedule jobs when number of scheduled jobs exceeds capacity' do
create(:project)
is_expected.to receive(:scheduled_job_ids).and_return(1..1000).at_least(:once)
is_expected.not_to receive(:schedule_job)
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
it 'does not perform Geo::RepositoryVerification::Primary::SingleWorker for projects on unhealthy shards' do
healthy_unverified = create(:project)
missing_not_verified = create(:project)
missing_not_verified.update_column(:repository_storage, 'unknown')
missing_outdated = create(:project)
missing_outdated.update_column(:repository_storage, 'unknown')
create(:repository_state, :repository_outdated, project: missing_outdated)
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.to receive(:perform_async).with(healthy_unverified.id, instance_of(Time))
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.not_to receive(:perform_async).with(missing_not_verified.id, instance_of(Time))
expect(Geo::RepositoryVerification::Primary::SingleWorker)
.not_to receive(:perform_async).with(missing_outdated.id, instance_of(Time))
Sidekiq::Testing.inline! { subject.perform(shard_name) }
end
end
end
require 'spec_helper'
describe Geo::RepositoryVerification::Primary::SingleWorker, :postgresql, :clean_gitlab_redis_cache do
include ::EE::GeoHelpers
set(:project_with_repositories) { create(:project, :repository, :wiki_repo) }
set(:project_without_repositories) { create(:project) }
let!(:primary) { create(:geo_node, :primary) }
before do
stub_current_geo_node(primary)
end
describe '#perform' do
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
end
it 'does not calculate the checksum when not running on a primary' do
allow(Gitlab::Geo).to receive(:primary?) { false }
expect_any_instance_of(Gitlab::Git::Checksum).not_to receive(:calculate)
subject.perform(project_without_repositories.id, Time.now)
end
it 'does not calculate the checksum when project is pending deletion' do
project_with_repositories.update!(pending_delete: true)
expect_any_instance_of(Gitlab::Git::Checksum).not_to receive(:calculate)
subject.perform(project_with_repositories.id, Time.now)
end
it 'does not raise an error when project could not be found' do
expect { subject.perform(-1, Time.now) }.not_to raise_error
end
it 'calculates the checksum for unverified projects' do
subject.perform(project_with_repositories.id, Time.now)
expect(project_with_repositories.repository_state).to have_attributes(
repository_verification_checksum: instance_of(String),
last_repository_verification_failure: nil,
last_repository_verification_at: instance_of(Time),
last_repository_verification_failed: false,
wiki_verification_checksum: instance_of(String),
last_wiki_verification_failure: nil,
last_wiki_verification_at: instance_of(Time),
last_wiki_verification_failed: false
)
end
it 'calculates the checksum for outdated projects' do
repository_state =
create(:repository_state, :repository_verified, :wiki_verified,
project: project_with_repositories,
repository_verification_checksum: 'f123',
last_repository_verification_at: Time.now - 1.hour,
wiki_verification_checksum: 'e123',
last_wiki_verification_at: Time.now - 1.hour)
subject.perform(project_with_repositories.id, Time.now)
repository_state.reload
expect(repository_state.repository_verification_checksum).not_to eq 'f123'
expect(repository_state.last_repository_verification_at).to be_within(10.seconds).of(Time.now)
expect(repository_state.wiki_verification_checksum).not_to eq 'e123'
expect(repository_state.last_wiki_verification_at).to be_within(10.seconds).of(Time.now)
end
it 'does not recalculate the checksum for projects up to date' do
last_verification_at = Time.now
repository_state =
create(:repository_state, :repository_verified, :wiki_verified,
project: project_with_repositories,
repository_verification_checksum: 'f123',
last_repository_verification_at: last_verification_at,
wiki_verification_checksum: 'e123',
last_wiki_verification_at: last_verification_at)
subject.perform(project_with_repositories.id, Time.now - 1.hour)
expect(repository_state.reload).to have_attributes(
repository_verification_checksum: 'f123',
last_repository_verification_at: be_within(1.second).of(last_verification_at),
wiki_verification_checksum: 'e123',
last_wiki_verification_at: be_within(1.second).of(last_verification_at)
)
end
it 'does not calculate the wiki checksum when wiki is not enabled for project' do
project_with_repositories.update!(wiki_enabled: false)
subject.perform(project_with_repositories.id, Time.now)
expect(project_with_repositories.repository_state).to have_attributes(
repository_verification_checksum: instance_of(String),
last_repository_verification_failure: nil,
last_repository_verification_at: instance_of(Time),
last_repository_verification_failed: false,
wiki_verification_checksum: nil,
last_wiki_verification_failure: nil,
last_wiki_verification_at: nil,
last_wiki_verification_failed: false
)
end
it 'keeps track of failures when calculating the repository checksum' do
subject.perform(project_without_repositories.id, Time.now)
expect(project_without_repositories.repository_state).to have_attributes(
repository_verification_checksum: nil,
last_repository_verification_failure: /No repository for such path/,
last_repository_verification_at: instance_of(Time),
last_repository_verification_failed: true,
wiki_verification_checksum: nil,
last_wiki_verification_failure: /No repository for such path/,
last_wiki_verification_at: instance_of(Time),
last_wiki_verification_failed: true
)
end
end
end
......@@ -298,6 +298,7 @@ project:
- container_repositories
- uploads
- mirror_data
- repository_state
- source_pipelines
- sourced_pipelines
- members_and_requesters
......
......@@ -114,6 +114,18 @@ describe PostReceive do
end
end
describe '#process_wiki_changes' do
let(:gl_repository) { "wiki-#{project.id}" }
it 'updates project activity' do
described_class.new.perform(gl_repository, key_id, base64_changes)
expect { project.reload }
.to change(project, :last_activity_at)
.and change(project, :last_repository_updated_at)
end
end
context "webhook" do
it "fetches the correct project" do
expect(Project).to receive(:find_by).with(id: project.id.to_s)
......