Commit 2bb347b7 authored by Adam Hegyi's avatar Adam Hegyi

Add periodic re-aggregation worker for VSA

This change adds the reaggregation worker for the aggregated value stream
analytics feature. The worker ensures that outdated rows are detected
and corrected. The worker scans all issues and MRs in the group
hierarchy and once it reaches the end, a new cycle starts.

Changelog: added
parent cf25d929
# frozen_string_literal: true
class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
include IgnorableColumns
include FromUnion
belongs_to :group, optional: false
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :last_full_run_runtimes_in_seconds, :last_full_run_processed_records, presence: true, length: { maximum: 10 }, allow_blank: true
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :full_runtimes_in_seconds, :full_processed_records, presence: true, length: { maximum: 10 }, allow_blank: true
scope :priority_order, -> (column_to_sort = :last_incremental_run_at) { order(arel_table[column_to_sort].asc.nulls_first) }
scope :enabled, -> { where('enabled IS TRUE') }
# These columns were added with the wrong naming convention and were never used.
ignore_column :last_full_run_processed_records, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_runtimes_in_seconds, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_issues_updated_at, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_mrs_updated_at, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_issues_id, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_merge_requests_id, remove_with: '15.1', remove_after: '2022-05-22'
# Builds the aggregation cursor for the given mode (:incremental or :full)
# and model (Issue or MergeRequest) from the persisted cursor columns.
# Attributes that are still nil are dropped, so a fresh aggregation starts
# with an empty cursor ({}).
def cursor_for(mode, model)
  column_prefix = "last_#{mode}_#{model.table_name}"

  cursor = {
    updated_at: self["#{column_prefix}_updated_at"],
    id: self["#{column_prefix}_id"]
  }

  cursor.compact
end
# Stamps the `last_<mode>_run_at` column with the current time, marking
# when an aggregation run for the given mode last happened.
def refresh_last_run(mode)
  attribute_name = "last_#{mode}_run_at"
  self[attribute_name] = Time.current
end
# Clears all "full run" cursor attributes so the next full aggregation
# cycle starts scanning issues and merge requests from the beginning.
def reset_full_run_cursors
  %w[last_full_issues last_full_merge_requests].each do |column_prefix|
    public_send("#{column_prefix}_id=", nil)
    public_send("#{column_prefix}_updated_at=", nil)
  end
end
# Persists the cursor returned by the data loader into the mode/model
# specific cursor columns. A cursor is a hash with :id and :updated_at keys
# (either may be nil when aggregation finished for the model).
def set_cursor(mode, model, cursor)
  column_prefix = "last_#{mode}_#{model.table_name}"

  self["#{column_prefix}_id"] = cursor[:id]
  self["#{column_prefix}_updated_at"] = cursor[:updated_at]
end
# Appends one runtime/processed-record data point to the mode-specific
# stats arrays, trimming each to the 10 most recent entries. The cap
# mirrors the CARDINALITY(...) <= 10 check constraints on the table.
def set_stats(mode, runtime, processed_records)
  stats = {
    "#{mode}_runtimes_in_seconds" => runtime,
    "#{mode}_processed_records" => processed_records
  }

  stats.each do |column, value|
    self[column] = (self[column] + [value]).last(10)
  end
end
def estimated_next_run_at
return unless enabled
return if last_incremental_run_at.nil?
......
---
name: vsa_reaggregation_worker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/84171
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/357647
milestone: '14.10'
type: development
group: group::optimize
default_enabled: false
......@@ -640,6 +640,9 @@ Gitlab.ee do
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker']['cron'] ||= '*/30 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker']['job_class'] = 'Analytics::CycleAnalytics::ConsistencyWorker'
Settings.cron_jobs['analytics_cycle_analytics_reaggregation_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_reaggregation_worker']['cron'] ||= '44 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_reaggregation_worker']['job_class'] = 'Analytics::CycleAnalytics::ReaggregationWorker'
Settings.cron_jobs['active_user_count_threshold_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['active_user_count_threshold_worker']['cron'] ||= '0 12 * * *'
Settings.cron_jobs['active_user_count_threshold_worker']['job_class'] = 'ActiveUserCountThresholdWorker'
......
# frozen_string_literal: true
# Adds the runtime-statistics and "full run" cursor columns used by the
# aggregated VSA full (re-aggregation) mode.
class AddRuntimeDataColumnsToVsaAggregations < Gitlab::Database::Migration[1.0]
  def up
    # bulk: true combines the column additions into a single ALTER TABLE.
    change_table(:analytics_cycle_analytics_aggregations, bulk: true) do |t|
      t.integer :full_runtimes_in_seconds, array: true, default: [], null: false
      t.integer :full_processed_records, array: true, default: [], null: false
      t.column :last_full_merge_requests_updated_at, :datetime_with_timezone
      t.column :last_full_issues_updated_at, :datetime_with_timezone
      t.integer :last_full_issues_id
      t.integer :last_full_merge_requests_id
    end
  end

  def down
    %i[
      full_runtimes_in_seconds
      full_processed_records
      last_full_merge_requests_updated_at
      last_full_issues_updated_at
      last_full_issues_id
      last_full_merge_requests_id
    ].each { |column| remove_column :analytics_cycle_analytics_aggregations, column }
  end
end
# frozen_string_literal: true
# Enforces at the database level that the "full run" stats arrays hold at
# most 10 entries — the model only stores the last 10 data points.
class AddCheckConstraintToVsaAggregationRuntimeDataColumns < Gitlab::Database::Migration[1.0]
  FULL_RUNTIMES_IN_SECONDS_CONSTRAINT = 'full_runtimes_in_seconds_size'
  FULL_PROCESSED_RECORDS_CONSTRAINT = 'full_processed_records_size'

  # Constraint name => SQL expression it enforces.
  CONSTRAINT_DEFINITIONS = {
    FULL_RUNTIMES_IN_SECONDS_CONSTRAINT => 'CARDINALITY(full_runtimes_in_seconds) <= 10',
    FULL_PROCESSED_RECORDS_CONSTRAINT => 'CARDINALITY(full_processed_records) <= 10'
  }.freeze

  # add_check_constraint validates existing rows, which must run outside a
  # DDL transaction.
  disable_ddl_transaction!

  def up
    CONSTRAINT_DEFINITIONS.each do |name, expression|
      add_check_constraint(:analytics_cycle_analytics_aggregations, expression, name)
    end
  end

  def down
    CONSTRAINT_DEFINITIONS.each_key do |name|
      remove_check_constraint(:analytics_cycle_analytics_aggregations, name)
    end
  end
end
f5c934c691b50bff8c4029a975e37e86177cdb24b10bb65be2edd5bda50938b0
\ No newline at end of file
4ffb630e2949769c0ad64d43c2f8b6ad432358c44b00da99ec8ce538bb245e1a
\ No newline at end of file
......@@ -10639,10 +10639,18 @@ CREATE TABLE analytics_cycle_analytics_aggregations (
last_full_run_mrs_updated_at timestamp with time zone,
last_consistency_check_updated_at timestamp with time zone,
enabled boolean DEFAULT true NOT NULL,
full_runtimes_in_seconds integer[] DEFAULT '{}'::integer[] NOT NULL,
full_processed_records integer[] DEFAULT '{}'::integer[] NOT NULL,
last_full_merge_requests_updated_at timestamp with time zone,
last_full_issues_updated_at timestamp with time zone,
last_full_issues_id integer,
last_full_merge_requests_id integer,
CONSTRAINT chk_rails_1ef688e577 CHECK ((cardinality(incremental_runtimes_in_seconds) <= 10)),
CONSTRAINT chk_rails_7810292ec9 CHECK ((cardinality(last_full_run_processed_records) <= 10)),
CONSTRAINT chk_rails_8b9e89687c CHECK ((cardinality(last_full_run_runtimes_in_seconds) <= 10)),
CONSTRAINT chk_rails_e16bf3913a CHECK ((cardinality(incremental_processed_records) <= 10))
CONSTRAINT chk_rails_e16bf3913a CHECK ((cardinality(incremental_processed_records) <= 10)),
CONSTRAINT full_processed_records_size CHECK ((cardinality(full_processed_records) <= 10)),
CONSTRAINT full_runtimes_in_seconds_size CHECK ((cardinality(full_runtimes_in_seconds) <= 10))
);
CREATE TABLE analytics_cycle_analytics_group_stages (
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class AggregatorService
SUPPORTED_MODES = %I[incremental full].to_set
def initialize(aggregation:, mode: :incremental)
raise "Only :incremental mode is supported" if mode != :incremental
raise "Only :incremental and :full modes are supported" unless SUPPORTED_MODES.include?(mode)
@aggregation = aggregation
@mode = mode
@update_params = {
runtime_column => (aggregation[runtime_column] + [0]).last(10),
processed_records_column => (aggregation[processed_records_column] + [0]).last(10)
}
@runtime = 0
@processed_records = 0
@aggregation_finished = true
end
def execute
run_aggregation(Issue)
return unless aggregation.enabled?
run_aggregation(MergeRequest)
return unless aggregation.enabled?
update_params["last_#{mode}_run_at"] = Time.current
aggregation.refresh_last_run(mode)
aggregation.update!(update_params)
update_aggregation
end
private
attr_reader :aggregation, :mode, :update_params
def update_aggregation
aggregation.set_stats(mode, runtime, processed_records)
if full_run? && fully_aggregated?
aggregation.reset_full_run_cursors
end
aggregation.save!
end
attr_reader :aggregation, :mode, :update_params, :runtime, :processed_records
def run_aggregation(model)
response = Analytics::CycleAnalytics::DataLoaderService.new(
group: aggregation.group,
model: model,
context: Analytics::CycleAnalytics::AggregationContext.new(cursor: cursor_for(model))
context: Analytics::CycleAnalytics::AggregationContext.new(cursor: aggregation.cursor_for(mode, model))
).execute
handle_response(model, response)
......@@ -39,37 +52,25 @@ module Analytics
def handle_response(model, response)
if response.success?
update_params[updated_at_column(model)] = response.payload[:context].cursor[:updated_at]
update_params[id_column(model)] = response.payload[:context].cursor[:id]
update_params[runtime_column][-1] += response.payload[:context].runtime
update_params[processed_records_column][-1] += response.payload[:context].processed_records
else
update_params.clear
update_params[:enabled] = false
end
end
aggregation.set_cursor(mode, model, response.payload[:context].cursor)
def cursor_for(model)
{
updated_at: aggregation[updated_at_column(model)],
id: aggregation[id_column(model)]
}.compact
end
@runtime += response.payload[:context].runtime
@processed_records += response.payload[:context].processed_records
def updated_at_column(model)
"last_#{mode}_#{model.table_name}_updated_at"
end
@aggregation_finished = false if response.payload[:reason] != :model_processed
def id_column(model)
"last_#{mode}_#{model.table_name}_id"
else
aggregation.reset
aggregation.update!(enabled: false)
end
end
def runtime_column
"#{mode}_runtimes_in_seconds"
def full_run?
mode == :full
end
def processed_records_column
"#{mode}_processed_records"
def fully_aggregated?
@aggregation_finished
end
end
end
......
......@@ -57,6 +57,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:analytics_cycle_analytics_reaggregation
:worker_name: Analytics::CycleAnalytics::ReaggregationWorker
:feature_category: :value_stream_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:analytics_devops_adoption_create_all_snapshots
:worker_name: Analytics::DevopsAdoption::CreateAllSnapshotsWorker
:feature_category: :devops_reports
......
......@@ -15,7 +15,7 @@ module Analytics
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 5.minutes
MAX_RUNTIME = 250.seconds
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
......
......@@ -15,7 +15,7 @@ module Analytics
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 5.minutes
MAX_RUNTIME = 250.seconds
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
......@@ -31,7 +31,7 @@ module Analytics
break if batch.empty?
batch.each do |aggregation|
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation).execute
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation, mode: :incremental).execute
if monotonic_time - start_time >= MAX_RUNTIME
over_time = true
......
# frozen_string_literal: true
module Analytics
  module CycleAnalytics
    # Cron worker that periodically re-runs the "full" VSA aggregation for
    # every enabled aggregation record so stale rows are detected and
    # corrected. Batches are processed until none remain or MAX_RUNTIME is
    # exceeded.
    class ReaggregationWorker
      include ApplicationWorker

      # rubocop:disable Scalability/CronWorkerContext
      # This worker does not perform work scoped to a context
      include CronjobQueue
      # rubocop:enable Scalability/CronWorkerContext

      idempotent!
      data_consistency :always
      feature_category :value_stream_management

      # Stop picking up new aggregations once this much wall-clock time has
      # elapsed, so the job finishes well within its scheduling interval.
      MAX_RUNTIME = 250.seconds

      delegate :monotonic_time, to: :'Gitlab::Metrics::System'

      def perform
        return if Feature.disabled?(:vsa_reaggregation_worker, default_enabled: :yaml)

        batch_loaded_at = Time.current
        deadline = monotonic_time + MAX_RUNTIME

        loop do
          aggregations = Analytics::CycleAnalytics::Aggregation.load_batch(batch_loaded_at, :last_full_run_at)
          break if aggregations.empty?

          # `any?` stops iterating as soon as the deadline is hit, mirroring
          # an early break out of the batch.
          timed_out = aggregations.any? do |aggregation|
            Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation, mode: :full).execute

            monotonic_time >= deadline
          end

          break if timed_out
        end
      end
    end
  end
end
......@@ -15,7 +15,7 @@ RSpec.describe Analytics::CycleAnalytics::AggregatorService do
let(:mode) { :other_mode }
it 'raises error' do
expect { run_service }.to raise_error /Only :incremental mode is supported/
expect { run_service }.to raise_error /Only :incremental and :full modes are supported/
end
end
......@@ -23,6 +23,12 @@ RSpec.describe Analytics::CycleAnalytics::AggregatorService do
it 'sets the aggregation record disabled' do
expect { run_service }.to change { aggregation.reload.enabled }.from(true).to(false)
end
it 'calls the DataLoaderService only once' do
expect(Analytics::CycleAnalytics::DataLoaderService).to receive(:new).once.and_call_original
run_service
end
end
context 'when a subgroup is given' do
......@@ -111,5 +117,53 @@ RSpec.describe Analytics::CycleAnalytics::AggregatorService do
)
end
end
context 'when running a full aggregation' do
let(:mode) { :full }
let(:project) { create(:project, group: group) }
let!(:merge_request_1) { create(:merge_request, :with_merged_metrics, :unique_branches, project: project) }
let!(:merge_request_2) { create(:merge_request, :with_merged_metrics, :unique_branches, project: project) }
before do
create(:cycle_analytics_group_stage,
group: group,
start_event_identifier: :merge_request_created,
end_event_identifier: :merge_request_merged
)
stub_const('Analytics::CycleAnalytics::DataLoaderService::MAX_UPSERT_COUNT', 1)
stub_const('Analytics::CycleAnalytics::DataLoaderService::UPSERT_LIMIT', 1)
stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)
end
context 'when aggregation is not finished' do
it 'persists the cursor attributes' do
run_service
expect(aggregation.reload).to have_attributes(
full_processed_records: [1],
last_full_merge_requests_updated_at: be_within(5.seconds).of(merge_request_1.updated_at),
last_full_merge_requests_id: merge_request_1.id,
last_full_issues_updated_at: nil,
last_full_issues_id: nil
)
end
end
context 'when aggregation is finished during the second run' do
it 'resets the cursor attributes so the aggregation starts from the beginning' do
3.times { run_service }
expect(aggregation.reload).to have_attributes(
full_processed_records: [1, 1, 0],
last_full_merge_requests_updated_at: nil,
last_full_merge_requests_id: nil,
last_full_issues_updated_at: nil,
last_full_issues_id: nil
)
end
end
end
end
end
# frozen_string_literal: true
# Shared examples for the VSA aggregation cron workers
# (IncrementalWorker and ReaggregationWorker).
#
# The including context must define:
#   - feature_flag:  the feature-flag symbol guarding the worker
#   - expected_mode: the aggregation mode (:incremental or :full) the worker
#     passes to AggregatorService
RSpec.shared_examples 'aggregator worker examples' do
  def run_worker
    described_class.new.perform
  end

  it_behaves_like 'an idempotent worker'

  context 'when the FF is disabled' do
    it 'does nothing' do
      stub_feature_flags(feature_flag => false)

      expect(Analytics::CycleAnalytics::Aggregation).not_to receive(:load_batch)

      run_worker
    end
  end

  context 'when the loaded batch is empty' do
    it 'does nothing' do
      expect(Analytics::CycleAnalytics::AggregatorService).not_to receive(:new)

      run_worker
    end
  end

  it 'invokes the AggregatorService' do
    aggregation = create(:cycle_analytics_aggregation)

    expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new)
      .with(aggregation: aggregation, mode: expected_mode)
      .and_call_original

    run_worker
  end

  it 'breaks at the second iteration due to overtime' do
    create_list(:cycle_analytics_aggregation, 2)

    # First monotonic_time call samples the start time; the second call
    # trips the MAX_RUNTIME check, so only one aggregation is processed.
    first_monotonic_time = 100
    second_monotonic_time = first_monotonic_time + described_class::MAX_RUNTIME.to_i + 10

    expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(first_monotonic_time, second_monotonic_time)
    expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).and_call_original.exactly(:once)

    run_worker
  end
end
......@@ -3,47 +3,8 @@
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::IncrementalWorker do
def run_worker
described_class.new.perform
end
it_behaves_like 'an idempotent worker'
context 'when the vsa_incremental_worker FF is disabled' do
it 'does nothing' do
stub_feature_flags(vsa_incremental_worker: false)
expect(Analytics::CycleAnalytics::Aggregation).not_to receive(:load_batch)
run_worker
end
end
context 'when the loaded batch is empty' do
it 'does nothing' do
expect(Analytics::CycleAnalytics::AggregatorService).not_to receive(:new)
run_worker
end
end
it 'invokes the AggregatorService' do
aggregation = create(:cycle_analytics_aggregation)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).with(aggregation: aggregation).and_call_original
run_worker
end
it 'breaks at the second iteration due to overtime' do
create_list(:cycle_analytics_aggregation, 2)
first_monotonic_time = 100
second_monotonic_time = first_monotonic_time + Analytics::CycleAnalytics::IncrementalWorker::MAX_RUNTIME.to_i + 10
expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(first_monotonic_time, second_monotonic_time)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).and_call_original.exactly(:once)
run_worker
it_behaves_like 'aggregator worker examples' do
let(:expected_mode) { :incremental }
let(:feature_flag) { :vsa_incremental_worker }
end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Analytics::CycleAnalytics::ReaggregationWorker do
  # Reuses the common aggregation worker examples; the re-aggregation worker
  # runs in :full mode and is guarded by the :vsa_reaggregation_worker flag.
  it_behaves_like 'aggregator worker examples' do
    let(:expected_mode) { :full }
    let(:feature_flag) { :vsa_reaggregation_worker }
  end
end
......@@ -22,7 +22,7 @@ RSpec.describe 'Database schema' do
approvals: %w[user_id],
approver_groups: %w[target_id],
approvers: %w[target_id user_id],
analytics_cycle_analytics_aggregations: %w[last_full_run_issues_id last_full_run_merge_requests_id last_incremental_issues_id last_incremental_merge_requests_id],
analytics_cycle_analytics_aggregations: %w[last_full_issues_id last_full_merge_requests_id last_incremental_issues_id last_full_run_issues_id last_full_run_merge_requests_id last_incremental_merge_requests_id],
analytics_cycle_analytics_merge_request_stage_events: %w[author_id group_id merge_request_id milestone_id project_id stage_event_hash_id state_id],
analytics_cycle_analytics_issue_stage_events: %w[author_id group_id issue_id milestone_id project_id stage_event_hash_id state_id],
audit_events: %w[author_id entity_id target_id],
......
......@@ -10,7 +10,7 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
it { is_expected.not_to validate_presence_of(:group) }
it { is_expected.not_to validate_presence_of(:enabled) }
%i[incremental_runtimes_in_seconds incremental_processed_records last_full_run_runtimes_in_seconds last_full_run_processed_records].each do |column|
%i[incremental_runtimes_in_seconds incremental_processed_records full_runtimes_in_seconds full_processed_records].each do |column|
it "validates the array length of #{column}" do
record = described_class.new(column => [1] * 11)
......@@ -20,6 +20,81 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
end
end
describe 'attribute updater methods' do
subject(:aggregation) { build(:cycle_analytics_aggregation) }
describe '#cursor_for' do
it 'returns empty cursors' do
aggregation.last_full_issues_id = nil
aggregation.last_full_issues_updated_at = nil
expect(aggregation.cursor_for(:full, Issue)).to eq({})
end
context 'when cursor is not empty' do
it 'returns the cursor values' do
current_time = Time.current
aggregation.last_full_issues_id = 1111
aggregation.last_full_issues_updated_at = current_time
expect(aggregation.cursor_for(:full, Issue)).to eq({ id: 1111, updated_at: current_time })
end
end
end
describe '#refresh_last_run' do
it 'updates the run_at column' do
freeze_time do
aggregation.refresh_last_run(:incremental)
expect(aggregation.last_incremental_run_at).to eq(Time.current)
end
end
end
describe '#reset_full_run_cursors' do
it 'resets all full run cursors to nil' do
aggregation.last_full_issues_id = 111
aggregation.last_full_issues_updated_at = Time.current
aggregation.last_full_merge_requests_id = 111
aggregation.last_full_merge_requests_updated_at = Time.current
aggregation.reset_full_run_cursors
expect(aggregation).to have_attributes(
last_full_issues_id: nil,
last_full_issues_updated_at: nil,
last_full_merge_requests_id: nil,
last_full_merge_requests_updated_at: nil
)
end
end
describe '#set_cursor' do
it 'sets the cursor values for the given mode' do
aggregation.set_cursor(:full, Issue, { id: 2222, updated_at: nil })
expect(aggregation).to have_attributes(
last_full_issues_id: 2222,
last_full_issues_updated_at: nil
)
end
end
describe '#set_stats' do
it 'appends stats to the runtime and processed_records attributes' do
aggregation.set_stats(:full, 10, 20)
aggregation.set_stats(:full, 20, 30)
expect(aggregation).to have_attributes(
full_runtimes_in_seconds: [10, 20],
full_processed_records: [20, 30]
)
end
end
end
describe '#safe_create_for_group' do
let_it_be(:group) { create(:group) }
let_it_be(:subgroup) { create(:group, parent: group) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment