Commit 052c1209 authored by Valery Sizov's avatar Valery Sizov Committed by Michael Kozono

Regularly reverify Packages on primary

Updates verification state records using batches

We need to regularly re-verify data on the primary
to protect against data corruption. In combination with
https://gitlab.com/gitlab-org/gitlab-ee/issues/13842,
this will ensure that packages are regularly re-verified
on all secondary nodes as well.
parent 57f342a2
......@@ -7,11 +7,19 @@ module Geo
include Delay
DEFAULT_VERIFICATION_BATCH_SIZE = 10
DEFAULT_REVERIFICATION_BATCH_SIZE = 1000
class_methods do
extend Gitlab::Utils::Override
delegate :verification_pending_batch, :verification_failed_batch, :needs_verification_count, :fail_verification_timeouts, to: :verification_query_class
delegate :verification_pending_batch,
:verification_failed_batch,
:needs_verification_count,
:needs_reverification_count,
:fail_verification_timeouts,
:reverifiable_batch,
:reverify_batch,
to: :verification_query_class
# If replication is disabled, then so is verification.
override :verification_enabled?
......@@ -31,6 +39,10 @@ module Geo
::Geo::VerificationBatchWorker.perform_with_capacity(replicable_name)
::Geo::VerificationTimeoutWorker.perform_async(replicable_name)
# Secondaries don't need to run this since they will receive an event for each
# rechecksummed resource: https://gitlab.com/gitlab-org/gitlab/-/issues/13842
::Geo::ReverificationBatchWorker.perform_async(replicable_name) if ::Gitlab::Geo.primary?
end
# Called by VerificationBatchWorker.
......@@ -54,6 +66,18 @@ module Geo
.ceil
end
# Called by ReverificationBatchWorker.
#
# - Asks the DB how many things still need to be reverified (with a limit)
# - Converts that to a number of batches
#
# @return [Integer] number of batches of reverification work remaining, up to the given maximum
def remaining_reverification_batch_count(max_batch_count:)
needs_reverification_count(limit: max_batch_count * reverification_batch_size)
.fdiv(reverification_batch_size)
.ceil
end
# @return [Array<Gitlab::Geo::Replicator>] batch of replicators which need to be verified
def replicator_batch_to_verify
model_record_id_batch_to_verify.map do |id|
......@@ -74,6 +98,11 @@ module Geo
ids
end
# @return [Integer] number of records set to be re-verified
def reverify_batch!
reverify_batch(batch_size: reverification_batch_size)
end
# If primary, query the model table.
# If secondary, query the registry table.
def verification_query_class
......@@ -85,6 +114,11 @@ module Geo
DEFAULT_VERIFICATION_BATCH_SIZE
end
# @return [Integer] number of records to reverify per batch job
def reverification_batch_size
DEFAULT_REVERIFICATION_BATCH_SIZE
end
def checksummed_count
# When verification is disabled, this returns nil.
# Bonus: This causes the progress bar to be hidden.
......
......@@ -515,6 +515,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: geo:geo_reverification_batch
:feature_category: :geo_replication
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: geo:geo_scheduler_primary_scheduler
:feature_category: :geo_replication
:has_external_dependencies:
......
# frozen_string_literal: true
module Geo
class ReverificationBatchWorker
include ApplicationWorker
include GeoQueue
include LimitedCapacity::Worker
include ::Gitlab::Geo::LogHelpers
# Single-file should be fast enough. If increasing this constant over 1, then be sure
# to add row locking.
# See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/53470/diffs#note_502847744
MAX_RUNNING_JOBS = 1
idempotent!
loggable_arguments 0
def perform_work(replicable_name)
replicator_class = replicator_class_for(replicable_name)
replicator_class.reverify_batch!
end
def remaining_work_count(replicable_name)
replicator_class = replicator_class_for(replicable_name)
@remaining_work_count ||= replicator_class
.remaining_reverification_batch_count(max_batch_count: remaining_capacity)
end
def max_running_jobs
MAX_RUNNING_JOBS
end
def replicator_class_for(replicable_name)
@replicator_class ||= ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
end
end
end
---
title: Regularly reverify Packages on primary
merge_request: 53470
author:
type: changed
......@@ -37,6 +37,7 @@ module Gitlab
scope :verification_timed_out, -> { verification_started.where("verification_started_at < ?", VERIFICATION_TIMEOUT.ago) }
scope :retry_due, -> { where(arel_table[:verification_retry_at].eq(nil).or(arel_table[:verification_retry_at].lt(Time.current))) }
scope :needs_verification, -> { with_verification_state(:verification_pending).or(with_verification_state(:verification_failed).retry_due) }
scope :needs_reverification, -> { verification_succeeded.where("verified_at < ?", ::Gitlab::Geo.current_node.minimum_reverification_interval.days.ago) }
# rubocop:enable CodeReuse/ActiveRecord
state_machine :verification_state, initial: :verification_pending do
......@@ -142,6 +143,11 @@ module Gitlab
needs_verification
end
# @return [Integer] number of records that need reverification
def needs_reverification_count(limit:)
needs_reverification.limit(limit).count # rubocop:disable CodeReuse/ActiveRecord
end
# Atomically marks the records as verification_started, with a
# verification_started_at time, and returns the primary key of each
# updated row. This allows VerificationBatchWorker to concurrently get
......@@ -213,6 +219,55 @@ module Gitlab
relation.update_all(attrs)
end
end
# Reverifies batch and returns the number of records.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
def reverify_batch(batch_size:)
relation = reverification_batch_relation(batch_size: batch_size)
mark_as_verification_pending(relation)
end
# Returns IDs of records that need re-verification.
#
# Atomically marks those records "verification_pending" in the same DB
# query.
#
# rubocop:disable CodeReuse/ActiveRecord
def reverification_batch_relation(batch_size:)
needs_reverification.order(:verified_at).limit(batch_size)
end
# rubocop:enable CodeReuse/ActiveRecord
# Atomically marks the records as verification_pending.
# Returns the number of records set to be referified.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [Integer] number of records
def mark_as_verification_pending(relation)
query = mark_as_verification_pending_query(relation)
self.connection.execute(query).cmd_tuples
end
# Returns a SQL statement which would update all the rows in the
# relation as verification_pending
# and returns the number of updated rows.
#
# @param [ActiveRecord::Relation] relation with appropriate where, order, and limit defined
# @return [String] SQL statement which would update all and return the number of rows
def mark_as_verification_pending_query(relation)
pending_enum_value = VERIFICATION_STATE_VALUES[:verification_pending]
<<~SQL.squish
UPDATE #{table_name}
SET "verification_state" = #{pending_enum_value}
WHERE #{self.verification_state_model_key} IN (#{relation.select(self.verification_state_model_key).to_sql})
SQL
end
end
# Overridden by ReplicableRegistry
......
......@@ -169,6 +169,68 @@ RSpec.describe Gitlab::Geo::VerificationState do
end
end
describe '.needs_reverification' do
before do
stub_current_geo_node(primary_node)
end
let(:pending_value) { DummyModel.verification_state_value(:verification_pending) }
let(:failed_value) { DummyModel.verification_state_value(:verification_failed) }
let(:succeeded_value) { DummyModel.verification_state_value(:verification_succeeded) }
it 'includes verification_succeeded with expired checksum' do
DummyModel.insert_all([
{ verification_state: succeeded_value, verified_at: 15.days.ago }
])
expect(subject.class.needs_reverification.count).to eq 1
end
it 'excludes non-success verification states and fresh checksums' do
DummyModel.insert_all([
{ verification_state: pending_value, verified_at: 7.days.ago },
{ verification_state: failed_value, verified_at: 6.days.ago },
{ verification_state: succeeded_value, verified_at: 3.days.ago }
])
expect(subject.class.needs_reverification.count).to eq 0
end
end
describe '.reverify_batch' do
let!(:other_verified_records) do
DummyModel.insert_all([
{ verification_state: succeeded_value, verified_at: 3.days.ago },
{ verification_state: succeeded_value, verified_at: 4.days.ago }
])
end
let(:succeeded_value) { DummyModel.verification_state_value(:verification_succeeded) }
before do
stub_current_geo_node(primary_node)
subject.verification_started
subject.verification_succeeded_with_checksum!('foo', Time.current)
subject.update!(verified_at: 15.days.ago)
end
it 'sets pending status to records with outdated verification' do
expect do
expect(subject.class.reverify_batch(batch_size: 100)).to eq 1
end.to change { subject.reload.verification_pending? }.to be_truthy
end
it 'limits the update with batch_size' do
DummyModel.update_all(verified_at: 15.days.ago)
expect(subject.class.reverify_batch(batch_size: 2)).to eq 2
expect(DummyModel.verification_pending.count).to eq 2
end
end
describe '.fail_verification_timeouts' do
before do
subject.verification_started!
......
......@@ -137,6 +137,26 @@ RSpec.shared_examples 'a verifiable replicator' do
described_class.trigger_background_verification
end
context 'for a Geo secondary' do
it 'does not enqueue ReverificationBatchWorker' do
stub_secondary_node
expect(::Geo::ReverificationBatchWorker).not_to receive(:perform_async)
described_class.trigger_background_verification
end
end
context 'for a Geo primary' do
it 'enqueues ReverificationBatchWorker' do
stub_primary_node
expect(::Geo::ReverificationBatchWorker).to receive(:perform_async).with(described_class.replicable_name)
described_class.trigger_background_verification
end
end
end
context 'when verification is disabled' do
......@@ -185,6 +205,23 @@ RSpec.shared_examples 'a verifiable replicator' do
end
end
describe '.remaining_reverification_batch_count' do
it 'converts needs_reverification_count to number of batches' do
expected_limit = 4000
expect(described_class).to receive(:needs_reverification_count).with(limit: expected_limit).and_return(1500)
expect(described_class.remaining_reverification_batch_count(max_batch_count: 4)).to eq(2)
end
end
describe '.reverify_batch!' do
it 'calls #reverify_batch' do
allow(described_class).to receive(:reverify_batch).with(batch_size: described_class::DEFAULT_REVERIFICATION_BATCH_SIZE)
described_class.reverify_batch!
end
end
describe '.replicator_batch_to_verify' do
it 'returns usable Replicator instances' do
model_record.save!
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::ReverificationBatchWorker, :geo do
include EE::GeoHelpers
let(:node) { create(:geo_node, :primary) }
subject(:job) { described_class.new }
before do
stub_current_geo_node(node)
end
it 'uses a Geo queue' do
expect(job.sidekiq_options_hash).to include(
'queue' => 'geo:geo_reverification_batch',
'queue_namespace' => :geo
)
end
describe '#perform' do
let(:replicable_name) { 'widget' }
let(:replicator_class) { double('widget_replicator_class') }
before do
allow(::Gitlab::Geo::Replicator)
.to receive(:for_replicable_name).with(replicable_name).and_return(replicator_class)
end
it 'calls reverify_batch!' do
allow(replicator_class).to receive(:remaining_reverification_batch_count).and_return(1)
expect(replicator_class).to receive(:reverify_batch!)
job.perform(replicable_name)
end
end
include_examples 'an idempotent worker' do
let(:job_args) { ['package_file'] }
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment