Commit 7dd71a09 authored by Dylan Griffith's avatar Dylan Griffith

Merge branch '344803-periodic-vsa-consistency-check' into 'master'

Introduce VSA peridoical consistency check

See merge request gitlab-org/gitlab!82591
parents b2d6e6d6 cb08b3ab
......@@ -7,7 +7,7 @@ class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :last_full_run_runtimes_in_seconds, :last_full_run_processed_records, presence: true, length: { maximum: 10 }, allow_blank: true
scope :priority_order, -> { order('last_incremental_run_at ASC NULLS FIRST') }
scope :priority_order, -> (column_to_sort = :last_incremental_run_at) { order(arel_table[column_to_sort].asc.nulls_first) }
scope :enabled, -> { where('enabled IS TRUE') }
def estimated_next_run_at
......@@ -55,17 +55,17 @@ class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
connection.select_value("(#{max})")
end
def self.load_batch(last_run_at, batch_size = 100)
def self.load_batch(last_run_at, column_to_query = :last_incremental_run_at, batch_size = 100)
last_run_at_not_set = Analytics::CycleAnalytics::Aggregation
.enabled
.where(last_incremental_run_at: nil)
.priority_order
.where(column_to_query => nil)
.priority_order(column_to_query)
.limit(batch_size)
last_run_at_before = Analytics::CycleAnalytics::Aggregation
.enabled
.where('last_incremental_run_at < ?', last_run_at)
.priority_order
.where(arel_table[column_to_query].lt(last_run_at))
.priority_order(column_to_query)
.limit(batch_size)
Analytics::CycleAnalytics::Aggregation
......
---
name: vsa_consistency_worker
introduced_by_url:
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/355709
milestone: '14.9'
type: development
group: group::optimize
default_enabled: false
......@@ -628,6 +628,9 @@ Gitlab.ee do
Settings.cron_jobs['analytics_cycle_analytics_incremental_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_incremental_worker']['cron'] ||= '*/10 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_incremental_worker']['job_class'] = 'Analytics::CycleAnalytics::IncrementalWorker'
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker']['cron'] ||= '*/30 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker']['job_class'] = 'Analytics::CycleAnalytics::ConsistencyWorker'
Settings.cron_jobs['active_user_count_threshold_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['active_user_count_threshold_worker']['cron'] ||= '0 12 * * *'
Settings.cron_jobs['active_user_count_threshold_worker']['job_class'] = 'ActiveUserCountThresholdWorker'
......
......@@ -39,6 +39,15 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:analytics_cycle_analytics_consistency
:worker_name: Analytics::CycleAnalytics::ConsistencyWorker
:feature_category: :value_stream_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:analytics_cycle_analytics_incremental
:worker_name: Analytics::CycleAnalytics::IncrementalWorker
:feature_category: :value_stream_management
......
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class ConsistencyWorker
include ApplicationWorker
# rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
idempotent!
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 5.minutes
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
def perform
return if Feature.disabled?(:vsa_consistency_worker, default_enabled: :yaml)
current_time = Time.current
start_time = monotonic_time
over_time = false
loop do
batch = Analytics::CycleAnalytics::Aggregation.load_batch(current_time, :last_consistency_check_updated_at)
break if batch.empty?
batch.each do |aggregation|
run_consistency_check_services(aggregation.group)
aggregation.update!(last_consistency_check_updated_at: current_time)
if monotonic_time - start_time >= MAX_RUNTIME
over_time = true
break
end
end
break if over_time
end
end
private
def run_consistency_check_services(group)
Analytics::CycleAnalytics::ConsistencyCheckService.new(group: group, event_model: Analytics::CycleAnalytics::IssueStageEvent).execute
Analytics::CycleAnalytics::ConsistencyCheckService.new(group: group, event_model: Analytics::CycleAnalytics::MergeRequestStageEvent).execute
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::ConsistencyWorker do
def run_worker
described_class.new.perform
end
context 'when the vsa_incremental_worker feature flag is off' do
before do
stub_feature_flags(vsa_consistency_worker: false)
end
it 'does nothing' do
expect(Analytics::CycleAnalytics::Aggregation).not_to receive(:load_batch)
run_worker
end
end
context 'when the vsa_consistency_worker feature flag is on' do
before do
stub_feature_flags(vsa_consistency_worker: true)
end
context 'when no pending aggregation records present' do
before do
expect(Analytics::CycleAnalytics::Aggregation).to receive(:load_batch).once.and_call_original
end
it 'does nothing' do
freeze_time do
aggregation = create(:cycle_analytics_aggregation, last_consistency_check_updated_at: 5.minutes.from_now)
expect { run_worker }.not_to change { aggregation.reload }
end
end
end
context 'when pending aggregation records present' do
it 'invokes the consistency services' do
aggregation1 = create(:cycle_analytics_aggregation, last_consistency_check_updated_at: 5.minutes.ago)
aggregation2 = create(:cycle_analytics_aggregation, last_consistency_check_updated_at: 10.minutes.ago)
freeze_time do
run_worker
expect(aggregation1.reload.last_consistency_check_updated_at).to eq(Time.current)
expect(aggregation2.reload.last_consistency_check_updated_at).to eq(Time.current)
end
end
end
context 'when worker is over time' do
it 'breaks at the second iteration due to overtime' do
create_list(:cycle_analytics_aggregation, 2)
first_monotonic_time = 100
second_monotonic_time = first_monotonic_time + described_class::MAX_RUNTIME.to_i + 10
expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(first_monotonic_time, second_monotonic_time)
expect_next_instance_of(described_class) do |instance|
expect(instance).to receive(:run_consistency_check_services).once
end
run_worker
end
end
end
end
......@@ -51,14 +51,14 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
end
describe '#load_batch' do
let!(:aggregation1) { create(:cycle_analytics_aggregation, last_incremental_run_at: nil) }
let!(:aggregation1) { create(:cycle_analytics_aggregation, last_incremental_run_at: nil, last_consistency_check_updated_at: 3.days.ago).reload }
let!(:aggregation2) { create(:cycle_analytics_aggregation, last_incremental_run_at: 5.days.ago).reload }
let!(:aggregation3) { create(:cycle_analytics_aggregation, last_incremental_run_at: nil) }
let!(:aggregation5) { create(:cycle_analytics_aggregation, last_incremental_run_at: 10.days.ago).reload }
let!(:aggregation3) { create(:cycle_analytics_aggregation, last_incremental_run_at: nil, last_consistency_check_updated_at: 2.days.ago).reload }
let!(:aggregation4) { create(:cycle_analytics_aggregation, last_incremental_run_at: 10.days.ago).reload }
before do
create(:cycle_analytics_aggregation, :disabled) # disabled rows are skipped
create(:cycle_analytics_aggregation, last_incremental_run_at: 1.day.ago) # "early" rows are filtered out
create(:cycle_analytics_aggregation, last_incremental_run_at: 1.day.ago, last_consistency_check_updated_at: 1.hour.ago) # "early" rows are filtered out
end
it 'loads records in priority order' do
......@@ -70,7 +70,20 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
# Using match_array because the order can be undeterministic for nil values.
expect(first_two).to match_array([aggregation1, aggregation3])
expect(last_two).to eq([aggregation5, aggregation2])
expect(last_two).to eq([aggregation4, aggregation2])
end
context 'when loading batch for last_consistency_check_updated_at' do
it 'loads records in priority order' do
batch = described_class.load_batch(1.day.ago, :last_consistency_check_updated_at).to_a
expect(batch.size).to eq(4)
first_two = batch.first(2)
last_two = batch.last(2)
expect(first_two).to match_array([aggregation2, aggregation4])
expect(last_two).to eq([aggregation1, aggregation3])
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment