Commit 5b4bd7a5 authored by Aakriti Gupta's avatar Aakriti Gupta Committed by Michael Kozono

Geo: Refactor verification related concerns

- Create VerifiableModel concern to break down
VerificationState concern such that
VerifiableModel holds common code to be used for
verification on both primary and secondary, which
VerificationState is specifically for checksumming on primary.
parent 356491bd
......@@ -175,8 +175,8 @@ That's all of the required database changes.
#### Step 1. Implement replication and verification
- [ ] Add the following lines to the `cool_widget` model to accomplish some important tasks:
- Include `Gitlab::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Gitlab::Geo::VerificationState` concern.
- Include `::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Geo::VerifiableModel` concern.
- Delegate verification related methods to the `cool_widget_state` model.
- For verification, override some scopes to use the `cool_widget_states` table instead of the model table.
- Implement the `verification_state_object` method to return the object that holds
......@@ -192,8 +192,8 @@ That's all of the required database changes.
class CoolWidget < ApplicationRecord
...
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::CoolWidgetReplicator
......
......@@ -179,8 +179,8 @@ That's all of the required database changes.
#### Step 1. Implement replication and verification
- [ ] Add the following lines to the `cool_widget` model to accomplish some important tasks:
- Include `Gitlab::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Gitlab::Geo::VerificationState` concern.
- Include `::Geo::ReplicableModel` in the `CoolWidget` class, and specify the Replicator class `with_replicator Geo::CoolWidgetReplicator`.
- Include the `::Geo::VerifiableModel` concern.
- Delegate verification related methods to the `cool_widget_state` model.
- For verification, override some scopes to use the `cool_widget_states` table instead of the model table.
- Implement the `verification_state_object` method to return the object that holds
......@@ -194,8 +194,8 @@ That's all of the required database changes.
class CoolWidget < ApplicationRecord
...
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::CoolWidgetReplicator
......
......@@ -117,7 +117,7 @@ the model code:
```ruby
class Packages::PackageFile < ApplicationRecord
include ::Gitlab::Geo::ReplicableModel
include ::Geo::ReplicableModel
with_replicator Geo::PackageFileReplicator
end
......
# frozen_string_literal: true
module Geo
module ReplicableModel
extend ActiveSupport::Concern
include Checksummable
included do
# If this hook turns out not to apply to all Models, perhaps we should extract a `ReplicableBlobModel`
after_create_commit -> { replicator.handle_after_create_commit if replicator.respond_to?(:handle_after_create_commit) }
after_destroy -> { replicator.handle_after_destroy if replicator.respond_to?(:handle_after_destroy) }
# Temporarily defining `verification_succeeded` and
# `verification_failed` for unverified models while verification is
# under development to avoid breaking GeoNodeStatusCheck code.
# TODO: Remove these after including `::Geo::VerificationState` on
# all models. https://gitlab.com/gitlab-org/gitlab/-/issues/280768
scope :verification_succeeded, -> { none }
scope :verification_failed, -> { none }
# These scopes are intended to be overridden as needed
scope :available_replicables, -> { all }
# On primary, `verifiables` are records that can be checksummed and/or are replicable.
# On secondary, `verifiables` are records that have already been replicated
# and (ideally) have been checksummed on the primary
scope :verifiables, -> { self.respond_to?(:with_files_stored_locally) ? available_replicables.with_files_stored_locally : available_replicables }
# When storing verification details in the same table as the model,
# the scope `available_verifiables` returns only those records
# that are eligible for verification, i.e. the same as the scope
# `verifiables`.
# When using a separate table to store verification details,
# the scope `available_verifiables` should return all records
# from the separate table because the separate table will
# always only have records corresponding to replicables that are verifiable.
# For this, override the scope in the replicable model, e.g. like so in
# `MergeRequestDiff`,
# `scope :available_verifiables, -> { joins(:merge_request_diff_detail) }`
scope :available_verifiables, -> { verifiables }
end
class_methods do
# Associate current model with specified replicator
#
# @param [Gitlab::Geo::Replicator] klass
def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
class_eval <<-RUBY, __FILE__, __LINE__ + 1
define_method :replicator do
@_replicator ||= klass.new(model_record: self)
end
define_singleton_method :replicator_class do
@_replicator_class ||= klass
end
RUBY
end
end
# Geo Replicator
#
# @abstract
# @return [Gitlab::Geo::Replicator]
def replicator
raise NotImplementedError, 'There is no Replicator defined for this model'
end
def in_replicables_for_current_secondary?
self.class.replicables_for_current_secondary(self).exists?
end
end
end
# frozen_string_literal: true
module Geo
# This concern is included on Model classes (as opposed to Registry classes)
# to manage their verification states. Note that this concern does not handle
# how verification is performed; see `VerifiableReplicator`.
#
# It handles both cases where verification state is stored in a separate
# table or when it is stored in the same table as the model.
module VerifiableModel
extend ActiveSupport::Concern
include ::Geo::VerificationState
included do
def save_verification_details
return unless self.class.separate_verification_state_table?
return unless self.class.verifiables.primary_key_in(self).exists?
# During a transaction, `verification_state_object` could be built before
# a value for `verification_state_model_key` exists. So we check for that
# before saving the `verification_state_object`
unless verification_state_object.persisted?
verification_state_object[self.class.verification_state_model_key] = self.id
end
verification_state_object.save!
end
# Implement this method in the class that includes this concern to specify
# a different ActiveRecord association name that stores the verification state
# See module EE::MergeRequestDiff for example
def verification_state_object
raise NotImplementedError if self.class.separate_verification_state_table?
self
end
end
class_methods do
include Delay
def pluck_verification_details_ids_in_range(range)
verification_state_table_class
.where(self.verification_state_model_key => range)
.pluck(self.verification_state_model_key)
end
def pluck_verifiable_ids_in_range(range)
self
.verifiables
.primary_key_in(range)
.pluck_primary_key
end
# @return whether primary checksum data is stored in a table separate
# from the model table
def separate_verification_state_table?
verification_state_table_name != table_name
end
end
end
end
......@@ -4,13 +4,13 @@ module Geo
module VerifiableRegistry
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
include ::Gitlab::Geo::VerificationState
include ::Geo::VerificationState
class_methods do
extend ::Gitlab::Utils::Override
# Overrides a method in `Gitlab::Geo::VerificationState`. This method is
# used by `Gitlab::Geo::VerificationState.start_verification_batch` to
# Overrides a method in `::Geo::VerificationState`. This method is
# used by `::Geo::VerificationState.start_verification_batch` to
# produce a query which must return values of the primary key of the
# *model*, not of the *registry*. We need this so we can instantiate
# Replicators.
......
# frozen_string_literal: true
module Geo
# This concern is included on VerifiableModel and on VerifiableRegistry to
# manage their verification fields.
module VerificationState
extend ActiveSupport::Concern
include ::ShaAttribute
include Delay
include EachBatch
include Gitlab::Geo::LogHelpers
VERIFICATION_STATE_VALUES = {
verification_pending: 0,
verification_started: 1,
verification_succeeded: 2,
verification_failed: 3
}.freeze
VERIFICATION_TIMEOUT = 8.hours
included do
sha_attribute :verification_checksum
scope :verification_pending, -> { available_verifiables.with_verification_state(:verification_pending) }
scope :verification_started, -> { available_verifiables.with_verification_state(:verification_started) }
scope :verification_succeeded, -> { available_verifiables.with_verification_state(:verification_succeeded) }
scope :verification_failed, -> { available_verifiables.with_verification_state(:verification_failed) }
scope :checksummed, -> { where.not(verification_checksum: nil) }
scope :not_checksummed, -> { where(verification_checksum: nil) }
scope :verification_timed_out, -> { available_verifiables.where(verification_arel_table[:verification_state].eq(1)).where(verification_arel_table[:verification_started_at].lt(VERIFICATION_TIMEOUT.ago)) }
scope :verification_retry_due, -> { where(verification_arel_table[:verification_retry_at].eq(nil).or(verification_arel_table[:verification_retry_at].lt(Time.current))) }
scope :needs_verification, -> { available_verifiables.merge(with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).verification_retry_due)) }
scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }
state_machine :verification_state, initial: :verification_pending do
state :verification_pending, value: VERIFICATION_STATE_VALUES[:verification_pending]
state :verification_started, value: VERIFICATION_STATE_VALUES[:verification_started]
state :verification_succeeded, value: VERIFICATION_STATE_VALUES[:verification_succeeded] do
validates :verification_checksum, presence: true
end
state :verification_failed, value: VERIFICATION_STATE_VALUES[:verification_failed] do
validates :verification_failure, presence: true
end
before_transition any => :verification_started do |instance, _|
instance.verification_started_at = Time.current
end
before_transition [:verification_pending, :verification_started, :verification_succeeded] => :verification_pending do |instance, _|
instance.clear_verification_failure_fields!
end
before_transition verification_failed: :verification_pending do |instance, _|
# If transitioning from verification_failed, then don't clear
# verification_retry_count and verification_retry_at to ensure
# progressive backoff of syncs-due-to-verification-failures
instance.verification_failure = nil
end
before_transition any => :verification_failed do |instance, _|
instance.before_verification_failed
end
before_transition any => :verification_succeeded do |instance, _|
instance.verified_at = Time.current
instance.clear_verification_failure_fields!
end
event :verification_started do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_started
end
event :verification_succeeded do
transition verification_started: :verification_succeeded
end
event :verification_failed do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_failed
end
event :verification_pending do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_pending
end
end
private_class_method :start_verification_batch
private_class_method :start_verification_batch_query
private_class_method :start_verification_batch_subselect
end
class_methods do
include Delay
def verification_state_value(state_string)
VERIFICATION_STATE_VALUES[state_string]
end
# Returns IDs of records that are pending verification.
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_pending_batch(batch_size:)
relation = verification_pending.order(Gitlab::Database.nulls_first_order(:verified_at)).limit(batch_size)
start_verification_batch(relation)
end
# Returns IDs of records that failed to verify (calculate and save checksum).
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_failed_batch(batch_size:)
relation = verification_failed.verification_retry_due.order(Gitlab::Database.nulls_first_order(:verification_retry_at)).limit(batch_size)
start_verification_batch(relation)
end
# @return [Integer] number of records that need verification
def needs_verification_count(limit:)
needs_verification.limit(limit).count
end
# @return [Integer] number of records that need reverification
def needs_reverification_count(limit:)
needs_reverification.limit(limit).count
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
# unique batches of primary keys to process.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Array<Integer>] primary key of each updated row
def start_verification_batch(relation)
query = start_verification_batch_query(relation)
# This query performs a write, so we need to wrap it in a transaction
# to stick to the primary database.
self.transaction do
self.connection.execute(query).to_a.map do |row|
row[self.verification_state_model_key.to_s]
end
end
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_started, with a verification_started_at time,
# and returns the primary key of each updated row.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return primary key of each row
def start_verification_batch_query(relation)
started_enum_value = VERIFICATION_STATE_VALUES[:verification_started]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{started_enum_value},
"verification_started_at" = NOW()
WHERE #{self.verification_state_model_key} IN (#{start_verification_batch_subselect(relation).to_sql})
RETURNING #{self.verification_state_model_key}
SQL
end
# This query locks the rows during the transaction, and skips locked
# rows so that this query can be run concurrently, safely and reasonably
# efficiently.
# See https://gitlab.com/gitlab-org/gitlab/-/issues/300051#note_496889565
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which selects the primary keys to update
def start_verification_batch_subselect(relation)
relation
.select(self.verification_state_model_key)
.lock('FOR UPDATE SKIP LOCKED')
end
# Override this method in the class that includes this concern to specify
# a different ActiveRecord class to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_class
self
end
# Overridden in ReplicableRegistry
def verification_state_model_key
verification_state_table_class.primary_key
end
def verification_state_table_name
verification_state_table_class.table_name
end
def verification_arel_table
verification_state_table_class.arel_table
end
def verification_timed_out_batch_query
return verification_timed_out unless separate_verification_state_table?
verification_state_table_class.where(self.verification_state_model_key => verification_timed_out)
end
# Fail verification for records which started verification a long time ago
def fail_verification_timeouts
attrs = {
verification_state: verification_state_value(:verification_failed),
verification_failure: "Verification timed out after #{VERIFICATION_TIMEOUT}",
verification_checksum: nil,
verification_retry_count: 1,
verification_retry_at: next_retry_time(1),
verified_at: Time.current
}
verification_timed_out_batch_query.each_batch do |relation|
relation.update_all(attrs)
end
end
# Reverifies batch and returns the number of records.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
def reverify_batch(batch_size:)
relation = reverification_batch_relation(batch_size: batch_size)
mark_as_verification_pending(relation)
end
# Returns IDs of records that need re-verification.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
def reverification_batch_relation(batch_size:)
needs_reverification.order(:verified_at).limit(batch_size)
end
# Atomically marks the records as verification_pending.
# Returns the number of records set to be referified.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Integer] number of records
def mark_as_verification_pending(relation)
query = mark_as_verification_pending_query(relation)
self.connection.execute(query).cmd_tuples
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_pending
# and returns the number of updated rows.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return the number of rows
def mark_as_verification_pending_query(relation)
pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{pending_enum_value}
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
end
# Overridden by Geo::VerifiableRegistry
def clear_verification_failure_fields!
self.verification_retry_count = 0
self.verification_retry_at = nil
self.verification_failure = nil
end
# Overridden by Geo::VerifiableRegistry
def before_verification_failed
self.verification_retry_count ||= 0
self.verification_retry_count += 1
self.verification_retry_at = self.next_retry_time(self.verification_retry_count)
self.verified_at = Time.current
end
# Provides a safe and easy way to manage the verification state for a
# synchronous checksum calculation.
#
# @yieldreturn [String] calculated checksum value
def track_checksum_attempt!(&block)
# This line only applies to Geo::VerificationWorker, not
# Geo::VerificationBatchWorker, since the latter sets the whole batch to
# "verification_started" in the same DB query that fetches the batch.
verification_started! unless verification_started?
calculation_started_at = Time.current
checksum = yield
track_checksum_result!(checksum, calculation_started_at)
rescue StandardError => e
# Reset any potential changes from track_checksum_result, i.e.
# verification_retry_count may have been cleared.
reset
verification_failed_with_message!('Error during verification', e)
end
# Convenience method to update checksum and transition to success state.
#
# @param [String] checksum value generated by the checksum routine
# @param [DateTime] calculation_started_at the moment just before the
# checksum routine was called
def verification_succeeded_with_checksum!(checksum, calculation_started_at)
self.verification_checksum = checksum
self.verification_succeeded!
if resource_updated_during_checksum?(calculation_started_at)
# just let backfill pick it up
self.verification_pending!
elsif Gitlab::Geo.primary?
self.replicator.handle_after_checksum_succeeded
end
end
# Convenience method to update failure message and transition to failed
# state.
#
# @param [String] message error information
# @param [StandardError] error exception
def verification_failed_with_message!(message, error = nil)
log_error(message, error)
self.verification_failure = message
self.verification_failure += ": #{error.message}" if error.respond_to?(:message)
self.verification_failure.truncate(255)
self.verification_checksum = nil
self.verification_failed!
end
private
# Records the calculated checksum result
#
# Overridden by ReplicableRegistry so it can also compare with primary
# checksum.
#
# @param [String] calculated checksum value
# @param [Time] when checksum calculation was started
def track_checksum_result!(checksum, calculation_started_at)
verification_succeeded_with_checksum!(checksum, calculation_started_at)
end
def resource_updated_during_checksum?(calculation_started_at)
self.reset.verification_started_at > calculation_started_at
end
end
end
......@@ -6,8 +6,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::PipelineArtifactReplicator
end
......
......@@ -11,8 +11,8 @@ module EE
STORE_COLUMN = :file_store
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::LfsObjectReplicator
......
......@@ -5,9 +5,9 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Geo::ReplicableModel
include ObjectStorable
include ::Gitlab::Geo::VerificationState
include ::Geo::VerifiableModel
STORE_COLUMN = :external_diff_store
......
......@@ -6,8 +6,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::PackageFileReplicator
end
......
......@@ -5,8 +5,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::PagesDeploymentReplicator
......
......@@ -5,8 +5,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
include FromUnion
with_replicator ::Geo::SnippetRepositoryReplicator
......
......@@ -6,8 +6,8 @@ module EE
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::TerraformStateVersionReplicator
......
......@@ -10,8 +10,8 @@ module EE
prepended do
include ::Gitlab::SQL::Pattern
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::UploadReplicator
......
# frozen_string_literal: true
class GroupWikiRepository < ApplicationRecord
include ::Gitlab::Geo::ReplicableModel
include ::Geo::ReplicableModel
include EachBatch
include Shardable
......
# frozen_string_literal: true
module Gitlab
module Geo
module ReplicableModel
extend ActiveSupport::Concern
include Checksummable
included do
# If this hook turns out not to apply to all Models, perhaps we should extract a `ReplicableBlobModel`
after_create_commit -> { replicator.handle_after_create_commit if replicator.respond_to?(:handle_after_create_commit) }
after_destroy -> { replicator.handle_after_destroy if replicator.respond_to?(:handle_after_destroy) }
# Temporarily defining `verification_succeeded` and
# `verification_failed` for unverified models while verification is
# under development to avoid breaking GeoNodeStatusCheck code.
# TODO: Remove these after including `Gitlab::Geo::VerificationState` on
# all models. https://gitlab.com/gitlab-org/gitlab/-/issues/280768
scope :verification_succeeded, -> { none }
scope :verification_failed, -> { none }
# These scopes are intended to be overridden as needed
scope :available_replicables, -> { all }
# On primary, `verifiables` are records that can be checksummed and/or are replicable.
# On secondary, `verifiables` are records that have already been replicated
# and (ideally) have been checksummed on the primary
scope :verifiables, -> { self.respond_to?(:with_files_stored_locally) ? available_replicables.with_files_stored_locally : available_replicables }
# When storing verification details in the same table as the model,
# the scope `available_verifiables` returns only those records
# that are eligible for verification, i.e. the same as the scope
# `verifiables`.
# When using a separate table to store verification details,
# the scope `available_verifiables` should return all records
# from the separate table because the separate table will
# always only have records corresponding to replicables that are verifiable.
# For this, override the scope in the replicable model, e.g. like so in
# `MergeRequestDiff`,
# `scope :available_verifiables, -> { joins(:merge_request_diff_detail) }`
scope :available_verifiables, -> { verifiables }
end
class_methods do
# Associate current model with specified replicator
#
# @param [Gitlab::Geo::Replicator] klass
def with_replicator(klass)
raise ArgumentError, 'Must be a class inheriting from Gitlab::Geo::Replicator' unless klass < ::Gitlab::Geo::Replicator
class_eval <<-RUBY, __FILE__, __LINE__ + 1
define_method :replicator do
@_replicator ||= klass.new(model_record: self)
end
define_singleton_method :replicator_class do
@_replicator_class ||= klass
end
RUBY
end
end
# Geo Replicator
#
# @abstract
# @return [Gitlab::Geo::Replicator]
def replicator
raise NotImplementedError, 'There is no Replicator defined for this model'
end
def in_replicables_for_current_secondary?
self.class.replicables_for_current_secondary(self).exists?
end
end
end
end
# frozen_string_literal: true
module Gitlab
module Geo
# This concern is included on ActiveRecord classes to manage their
# verification fields. This concern does not handle how verification is
# performed.
#
# This is a separate concern from Gitlab::Geo::ReplicableModel because e.g.
# MergeRequestDiff stores its verification state in a separate table with
# the association to MergeRequestDiffDetail.
module VerificationState
extend ActiveSupport::Concern
include ::ShaAttribute
include Delay
include EachBatch
include Gitlab::Geo::LogHelpers
VERIFICATION_STATE_VALUES = {
verification_pending: 0,
verification_started: 1,
verification_succeeded: 2,
verification_failed: 3
}.freeze
VERIFICATION_TIMEOUT = 8.hours
included do
sha_attribute :verification_checksum
# rubocop:disable CodeReuse/ActiveRecord
scope :verification_pending, -> { available_verifiables.with_verification_state(:verification_pending) }
scope :verification_started, -> { available_verifiables.with_verification_state(:verification_started) }
scope :verification_succeeded, -> { available_verifiables.with_verification_state(:verification_succeeded) }
scope :verification_failed, -> { available_verifiables.with_verification_state(:verification_failed) }
scope :checksummed, -> { where.not(verification_checksum: nil) }
scope :not_checksummed, -> { where(verification_checksum: nil) }
scope :verification_timed_out, -> { available_verifiables.where(verification_arel_table[:verification_state].eq(1)).where(verification_arel_table[:verification_started_at].lt(VERIFICATION_TIMEOUT.ago)) }
scope :verification_retry_due, -> { where(verification_arel_table[:verification_retry_at].eq(nil).or(verification_arel_table[:verification_retry_at].lt(Time.current))) }
scope :needs_verification, -> { available_verifiables.merge(with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).verification_retry_due)) }
scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }
# rubocop:enable CodeReuse/ActiveRecord
state_machine :verification_state, initial: :verification_pending do
state :verification_pending, value: VERIFICATION_STATE_VALUES[:verification_pending]
state :verification_started, value: VERIFICATION_STATE_VALUES[:verification_started]
state :verification_succeeded, value: VERIFICATION_STATE_VALUES[:verification_succeeded] do
validates :verification_checksum, presence: true
end
state :verification_failed, value: VERIFICATION_STATE_VALUES[:verification_failed] do
validates :verification_failure, presence: true
end
before_transition any => :verification_started do |instance, _|
instance.verification_started_at = Time.current
end
before_transition [:verification_pending, :verification_started, :verification_succeeded] => :verification_pending do |instance, _|
instance.clear_verification_failure_fields!
end
before_transition verification_failed: :verification_pending do |instance, _|
# If transitioning from verification_failed, then don't clear
# verification_retry_count and verification_retry_at to ensure
# progressive backoff of syncs-due-to-verification-failures
instance.verification_failure = nil
end
before_transition any => :verification_failed do |instance, _|
instance.before_verification_failed
end
before_transition any => :verification_succeeded do |instance, _|
instance.verified_at = Time.current
instance.clear_verification_failure_fields!
end
event :verification_started do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_started
end
event :verification_succeeded do
transition verification_started: :verification_succeeded
end
event :verification_failed do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_failed
end
event :verification_pending do
transition [:verification_pending, :verification_started, :verification_succeeded, :verification_failed] => :verification_pending
end
end
def save_verification_details
return unless self.class.separate_verification_state_table?
return unless self.class.verifiables.primary_key_in(self).exists?
# During a transaction, `verification_state_object` could be built before
# a value for `verification_state_model_key` exists. So we check for that
# before saving the `verification_state_object`
unless verification_state_object.persisted?
verification_state_object[self.class.verification_state_model_key] = self.id
end
verification_state_object.save!
end
# Implement this method in the class that includes this concern to specify
# a different ActiveRecord association name that stores the verification state
# See module EE::MergeRequestDiff for example
def verification_state_object
raise NotImplementedError if self.class.separate_verification_state_table?
self
end
private_class_method :start_verification_batch
private_class_method :start_verification_batch_query
private_class_method :start_verification_batch_subselect
end
class_methods do
include Delay
def verification_state_value(state_string)
VERIFICATION_STATE_VALUES[state_string]
end
# Returns IDs of records that are pending verification.
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_pending_batch(batch_size:)
relation = verification_pending.order(Gitlab::Database.nulls_first_order(:verified_at)).limit(batch_size) # rubocop:disable CodeReuse/ActiveRecord
start_verification_batch(relation)
end
# Returns IDs of records that failed to verify (calculate and save checksum).
#
# Atomically marks those records "verification_started" in the same DB
# query.
#
def verification_failed_batch(batch_size:)
relation = verification_failed.verification_retry_due.order(Gitlab::Database.nulls_first_order(:verification_retry_at)).limit(batch_size) # rubocop:disable CodeReuse/ActiveRecord
start_verification_batch(relation)
end
# @return [Integer] number of records that need verification
def needs_verification_count(limit:)
needs_verification.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# @return [Integer] number of records that need reverification
def needs_reverification_count(limit:)
needs_reverification.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
# unique batches of primary keys to process.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Array<Integer>] primary key of each updated row
def start_verification_batch(relation)
query = start_verification_batch_query(relation)
# This query performs a write, so we need to wrap it in a transaction
# to stick to the primary database.
self.transaction do
self.connection.execute(query).to_a.map do |row|
row[self.verification_state_model_key.to_s]
end
end
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_started, with a verification_started_at time,
# and returns the primary key of each updated row.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return primary key of each row
def start_verification_batch_query(relation)
started_enum_value = VERIFICATION_STATE_VALUES[:verification_started]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{started_enum_value},
"verification_started_at" = NOW()
WHERE #{self.verification_state_model_key} IN (#{start_verification_batch_subselect(relation).to_sql})
RETURNING #{self.verification_state_model_key}
SQL
end
# This query locks the rows during the transaction, and skips locked
# rows so that this query can be run concurrently, safely and reasonably
# efficiently.
# See https://gitlab.com/gitlab-org/gitlab/-/issues/300051#note_496889565
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which selects the primary keys to update
def start_verification_batch_subselect(relation)
relation
.select(self.verification_state_model_key)
.lock('FOR UPDATE SKIP LOCKED') # rubocop:disable CodeReuse/ActiveRecord
end
# Override this method in the class that includes this concern to specify
# a different ActiveRecord class to store verification state
# See module EE::MergeRequestDiff for example
def verification_state_table_class
self
end
# Overridden in ReplicableRegistry
def verification_state_model_key
verification_state_table_class.primary_key
end
def verification_state_table_name
verification_state_table_class.table_name
end
def verification_arel_table
verification_state_table_class.arel_table
end
# rubocop:disable CodeReuse/ActiveRecord
def verification_timed_out_batch_query
return verification_timed_out unless separate_verification_state_table?
verification_state_table_class.where(self.verification_state_model_key => verification_timed_out)
end
# rubocop:enable CodeReuse/ActiveRecord
# Fail verification for records which started verification a long time ago
def fail_verification_timeouts
attrs = {
verification_state: verification_state_value(:verification_failed),
verification_failure: "Verification timed out after #{VERIFICATION_TIMEOUT}",
verification_checksum: nil,
verification_retry_count: 1,
verification_retry_at: next_retry_time(1),
verified_at: Time.current
}
verification_timed_out_batch_query.each_batch do |relation|
relation.update_all(attrs)
end
end
# Reverifies batch and returns the number of records.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
def reverify_batch(batch_size:)
relation = reverification_batch_relation(batch_size: batch_size)
mark_as_verification_pending(relation)
end
# Returns IDs of records that need re-verification.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
# rubocop:disable CodeReuse/ActiveRecord
def reverification_batch_relation(batch_size:)
needs_reverification.order(:verified_at).limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Atomically marks the records as verification_pending.
# Returns the number of records set to be referified.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Integer] number of records
def mark_as_verification_pending(relation)
query = mark_as_verification_pending_query(relation)
self.connection.execute(query).cmd_tuples
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_pending
# and returns the number of updated rows.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return the number of rows
def mark_as_verification_pending_query(relation)
pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]
<<~SQL.squish
UPDATE #{verification_state_table_name}
SET "verification_state" = #{pending_enum_value}
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
# rubocop:disable CodeReuse/ActiveRecord
def pluck_verification_details_ids_in_range(range)
verification_state_table_class
.where(self.verification_state_model_key => range)
.pluck(self.verification_state_model_key)
end
# rubocop:enable CodeReuse/ActiveRecord
def pluck_verifiable_ids_in_range(range)
self
.verifiables
.primary_key_in(range)
.pluck_primary_key
end
# @return whether primary checksum data is stored in a table separate
# from the model table
def separate_verification_state_table?
verification_state_table_name != table_name
end
end
# Overridden by Geo::VerifiableRegistry
def clear_verification_failure_fields!
self.verification_retry_count = 0
self.verification_retry_at = nil
self.verification_failure = nil
end
# Overridden by Geo::VerifiableRegistry
def before_verification_failed
self.verification_retry_count ||= 0
self.verification_retry_count += 1
self.verification_retry_at = self.next_retry_time(self.verification_retry_count)
self.verified_at = Time.current
end
# Provides a safe and easy way to manage the verification state for a
# synchronous checksum calculation.
#
# @yieldreturn [String] calculated checksum value
def track_checksum_attempt!(&block)
# This line only applies to Geo::VerificationWorker, not
# Geo::VerificationBatchWorker, since the latter sets the whole batch to
# "verification_started" in the same DB query that fetches the batch.
verification_started! unless verification_started?
calculation_started_at = Time.current
checksum = yield
track_checksum_result!(checksum, calculation_started_at)
rescue StandardError => e
# Reset any potential changes from track_checksum_result, i.e.
# verification_retry_count may have been cleared.
reset
verification_failed_with_message!('Error during verification', e)
end
# Convenience method to update checksum and transition to success state.
#
# @param [String] checksum value generated by the checksum routine
# @param [DateTime] calculation_started_at the moment just before the
# checksum routine was called
def verification_succeeded_with_checksum!(checksum, calculation_started_at)
self.verification_checksum = checksum
self.verification_succeeded!
if resource_updated_during_checksum?(calculation_started_at)
# just let backfill pick it up
self.verification_pending!
elsif Gitlab::Geo.primary?
self.replicator.handle_after_checksum_succeeded
end
end
# Convenience method to update failure message and transition to failed
# state.
#
# @param [String] message error information
# @param [StandardError] error exception
def verification_failed_with_message!(message, error = nil)
log_error(message, error)
self.verification_failure = message
self.verification_failure += ": #{error.message}" if error.respond_to?(:message)
self.verification_failure.truncate(255)
self.verification_checksum = nil
self.verification_failed!
end
private
# Records the calculated checksum result
#
# Overridden by ReplicableRegistry so it can also compare with primary
# checksum.
#
# @param [String] calculated checksum value
# @param [Time] when checksum calculation was started
def track_checksum_result!(checksum, calculation_started_at)
verification_succeeded_with_checksum!(checksum, calculation_started_at)
end
def resource_updated_during_checksum?(calculation_started_at)
self.reset.verification_started_at > calculation_started_at
end
end
end
end
......@@ -8,7 +8,7 @@ require 'spec_helper'
# against a DummyModel.
# - Place tests in replicable_model_shared_examples.rb if you want them to be
# run against every real Model.
RSpec.describe Gitlab::Geo::ReplicableModel do
RSpec.describe Geo::ReplicableModel do
include ::EE::GeoHelpers
let_it_be(:primary_node) { create(:geo_node, :primary) }
......@@ -42,6 +42,17 @@ RSpec.describe Gitlab::Geo::ReplicableModel do
it 'instantiates a replicator into the model' do
expect(subject.replicator).to be_a(Geo::DummyReplicator)
end
context 'when replicator is not defined in inheriting class' do
before do
stub_const('DummyModel', Class.new(ApplicationRecord))
DummyModel.class_eval { include ::Geo::ReplicableModel }
end
it 'raises NotImplementedError' do
expect { DummyModel.new.replicator }.to raise_error(NotImplementedError)
end
end
end
describe '#in_replicables_for_current_secondary?' do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::VerifiableModel do
include ::EE::GeoHelpers
context 'when separate table is used for verification state' do
before(:all) do
create_dummy_model_with_separate_state_table
end
after(:all) do
drop_dummy_model_with_separate_state_table
end
before do
stub_dummy_replicator_class(model_class: 'TestDummyModelWithSeparateState')
stub_dummy_model_with_separate_state_class
end
subject { TestDummyModelWithSeparateState.new }
describe '.verification_state_model_key' do
it 'returns the primary key of the state model' do
expect(subject.class.verification_state_model_key).to eq(TestDummyModelState.primary_key)
end
end
end
context 'when separate table is not used for verification state' do
before(:all) do
create_dummy_model_table
end
after(:all) do
drop_dummy_model_table
end
before do
stub_dummy_replicator_class
stub_dummy_model_class
end
subject { DummyModel.new }
describe '.verification_state_object' do
it 'returns self' do
expect(subject.verification_state_object.id).to eq(subject.id)
end
end
describe '.verification_state_model_key' do
it 'returns the primary key of the model' do
expect(subject.class.verification_state_model_key).to eq(DummyModel.primary_key)
end
end
end
end
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe Gitlab::Geo::VerificationState do
RSpec.describe Geo::VerificationState do
include ::EE::GeoHelpers
let_it_be(:primary_node) { create(:geo_node, :primary) }
......@@ -434,7 +434,7 @@ RSpec.describe Gitlab::Geo::VerificationState do
end
before do
stub_dummy_replicator_class(model_class: 'DummyModelWithSeparateState')
stub_dummy_replicator_class(model_class: 'TestDummyModelWithSeparateState')
stub_dummy_model_with_separate_state_class
end
......
......@@ -79,8 +79,8 @@ module EE
stub_const('DummyModel', Class.new(ApplicationRecord))
DummyModel.class_eval do
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::DummyReplicator
......@@ -170,8 +170,8 @@ module EE
TestDummyModelWithSeparateState.class_eval do
self.table_name = '_test_dummy_model_with_separate_states'
include ::Gitlab::Geo::ReplicableModel
include ::Gitlab::Geo::VerificationState
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator Geo::DummyReplicator
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment