Commit 2d612009 authored by Adam Hegyi

Merge branch 'add-vsa-periodic-reaggregation-worker' into 'master'

Add periodic re-aggregation for VSA

See merge request gitlab-org/gitlab!84171
parents 75230eb6 2bb347b7
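In brief: the existing incremental VSA aggregation is complemented by a periodic full re-aggregation, driven by a new cron worker and gated behind the vsa_reaggregation_worker feature flag. A minimal sketch of the intended flow, assuming the classes introduced in this diff (the snippet itself is illustrative and not part of the change):
# Illustrative sketch: roughly what ReaggregationWorker#perform does further below.
# Load aggregation records ordered by their last full run, then re-run them in :full mode.
batch = Analytics::CycleAnalytics::Aggregation.load_batch(Time.current, :last_full_run_at)
batch.each do |aggregation|
  Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation, mode: :full).execute
end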
# frozen_string_literal: true
class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
include IgnorableColumns
include FromUnion
belongs_to :group, optional: false
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :last_full_run_runtimes_in_seconds, :last_full_run_processed_records, presence: true, length: { maximum: 10 }, allow_blank: true
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :full_runtimes_in_seconds, :full_processed_records, presence: true, length: { maximum: 10 }, allow_blank: true
scope :priority_order, -> (column_to_sort = :last_incremental_run_at) { order(arel_table[column_to_sort].asc.nulls_first) }
scope :enabled, -> { where('enabled IS TRUE') }
# These columns were added with the wrong naming convention and were never used.
ignore_column :last_full_run_processed_records, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_runtimes_in_seconds, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_issues_updated_at, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_mrs_updated_at, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_issues_id, remove_with: '15.1', remove_after: '2022-05-22'
ignore_column :last_full_run_merge_requests_id, remove_with: '15.1', remove_after: '2022-05-22'
def cursor_for(mode, model)
{
updated_at: self["last_#{mode}_#{model.table_name}_updated_at"],
id: self["last_#{mode}_#{model.table_name}_id"]
}.compact
end
def refresh_last_run(mode)
self["last_#{mode}_run_at"] = Time.current
end
def reset_full_run_cursors
self.last_full_issues_id = nil
self.last_full_issues_updated_at = nil
self.last_full_merge_requests_id = nil
self.last_full_merge_requests_updated_at = nil
end
def set_cursor(mode, model, cursor)
self["last_#{mode}_#{model.table_name}_id"] = cursor[:id]
self["last_#{mode}_#{model.table_name}_updated_at"] = cursor[:updated_at]
end
def set_stats(mode, runtime, processed_records)
# We only store the last 10 data points
self["#{mode}_runtimes_in_seconds"] = (self["#{mode}_runtimes_in_seconds"] + [runtime]).last(10)
self["#{mode}_processed_records"] = (self["#{mode}_processed_records"] + [processed_records]).last(10)
end
def estimated_next_run_at
return unless enabled
return if last_incremental_run_at.nil?
......
---
name: vsa_reaggregation_worker
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/84171
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/357647
milestone: '14.10'
type: development
group: group::optimize
default_enabled: false
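With default_enabled set to false, the new worker returns early until the flag is turned on. A hedged example of enabling it from a Rails console, using the standard Feature API (the call below is not part of this diff):
# Turn on the rollout flag so ReaggregationWorker#perform stops returning early.
Feature.enable(:vsa_reaggregation_worker)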
@@ -640,6 +640,9 @@ Gitlab.ee do
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker']['cron'] ||= '*/30 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_consistency_worker']['job_class'] = 'Analytics::CycleAnalytics::ConsistencyWorker'
Settings.cron_jobs['analytics_cycle_analytics_reaggregation_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_reaggregation_worker']['cron'] ||= '44 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_reaggregation_worker']['job_class'] = 'Analytics::CycleAnalytics::ReaggregationWorker'
Settings.cron_jobs['active_user_count_threshold_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['active_user_count_threshold_worker']['cron'] ||= '0 12 * * *'
Settings.cron_jobs['active_user_count_threshold_worker']['job_class'] = 'ActiveUserCountThresholdWorker'
......
# frozen_string_literal: true
class AddRuntimeDataColumnsToVsaAggregations < Gitlab::Database::Migration[1.0]
def up
change_table(:analytics_cycle_analytics_aggregations, bulk: true) do |t|
t.integer :full_runtimes_in_seconds, array: true, default: [], null: false
t.integer :full_processed_records, array: true, default: [], null: false
t.column :last_full_merge_requests_updated_at, :datetime_with_timezone
t.column :last_full_issues_updated_at, :datetime_with_timezone
t.integer :last_full_issues_id
t.integer :last_full_merge_requests_id
end
end
def down
remove_column :analytics_cycle_analytics_aggregations, :full_runtimes_in_seconds
remove_column :analytics_cycle_analytics_aggregations, :full_processed_records
remove_column :analytics_cycle_analytics_aggregations, :last_full_merge_requests_updated_at
remove_column :analytics_cycle_analytics_aggregations, :last_full_issues_updated_at
remove_column :analytics_cycle_analytics_aggregations, :last_full_issues_id
remove_column :analytics_cycle_analytics_aggregations, :last_full_merge_requests_id
end
end
# frozen_string_literal: true
class AddCheckConstraintToVsaAggregationRuntimeDataColumns < Gitlab::Database::Migration[1.0]
FULL_RUNTIMES_IN_SECONDS_CONSTRAINT = 'full_runtimes_in_seconds_size'
FULL_PROCESSED_RECORDS_CONSTRAINT = 'full_processed_records_size'
disable_ddl_transaction!
def up
add_check_constraint(:analytics_cycle_analytics_aggregations,
'CARDINALITY(full_runtimes_in_seconds) <= 10',
FULL_RUNTIMES_IN_SECONDS_CONSTRAINT)
add_check_constraint(:analytics_cycle_analytics_aggregations,
'CARDINALITY(full_processed_records) <= 10',
FULL_PROCESSED_RECORDS_CONSTRAINT)
end
def down
remove_check_constraint :analytics_cycle_analytics_aggregations, FULL_RUNTIMES_IN_SECONDS_CONSTRAINT
remove_check_constraint :analytics_cycle_analytics_aggregations, FULL_PROCESSED_RECORDS_CONSTRAINT
end
end
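These CHECK constraints mirror the model-side trimming in Aggregation#set_stats above, which keeps only the last 10 data points per mode. An illustrative sketch with made-up values (not part of the diff):
# Appending an 11th value drops the oldest one, so CARDINALITY(...) <= 10 cannot be violated by set_stats.
aggregation = Analytics::CycleAnalytics::Aggregation.new(full_runtimes_in_seconds: (1..10).to_a, full_processed_records: [])
aggregation.set_stats(:full, 11, 5)
aggregation.full_runtimes_in_seconds # => [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
aggregation.full_processed_records   # => [5]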
f5c934c691b50bff8c4029a975e37e86177cdb24b10bb65be2edd5bda50938b0
\ No newline at end of file
4ffb630e2949769c0ad64d43c2f8b6ad432358c44b00da99ec8ce538bb245e1a
\ No newline at end of file
@@ -10639,10 +10639,18 @@ CREATE TABLE analytics_cycle_analytics_aggregations (
last_full_run_mrs_updated_at timestamp with time zone,
last_consistency_check_updated_at timestamp with time zone,
enabled boolean DEFAULT true NOT NULL,
full_runtimes_in_seconds integer[] DEFAULT '{}'::integer[] NOT NULL,
full_processed_records integer[] DEFAULT '{}'::integer[] NOT NULL,
last_full_merge_requests_updated_at timestamp with time zone,
last_full_issues_updated_at timestamp with time zone,
last_full_issues_id integer,
last_full_merge_requests_id integer,
CONSTRAINT chk_rails_1ef688e577 CHECK ((cardinality(incremental_runtimes_in_seconds) <= 10)),
CONSTRAINT chk_rails_7810292ec9 CHECK ((cardinality(last_full_run_processed_records) <= 10)),
CONSTRAINT chk_rails_8b9e89687c CHECK ((cardinality(last_full_run_runtimes_in_seconds) <= 10)),
CONSTRAINT chk_rails_e16bf3913a CHECK ((cardinality(incremental_processed_records) <= 10))
CONSTRAINT chk_rails_e16bf3913a CHECK ((cardinality(incremental_processed_records) <= 10)),
CONSTRAINT full_processed_records_size CHECK ((cardinality(full_processed_records) <= 10)),
CONSTRAINT full_runtimes_in_seconds_size CHECK ((cardinality(full_runtimes_in_seconds) <= 10))
);
CREATE TABLE analytics_cycle_analytics_group_stages (
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class AggregatorService
SUPPORTED_MODES = %I[incremental full].to_set
def initialize(aggregation:, mode: :incremental)
raise "Only :incremental mode is supported" if mode != :incremental
raise "Only :incremental and :full modes are supported" unless SUPPORTED_MODES.include?(mode)
@aggregation = aggregation
@mode = mode
@update_params = {
runtime_column => (aggregation[runtime_column] + [0]).last(10),
processed_records_column => (aggregation[processed_records_column] + [0]).last(10)
}
@runtime = 0
@processed_records = 0
@aggregation_finished = true
end
def execute
run_aggregation(Issue)
return unless aggregation.enabled?
run_aggregation(MergeRequest)
return unless aggregation.enabled?
update_params["last_#{mode}_run_at"] = Time.current
aggregation.refresh_last_run(mode)
aggregation.update!(update_params)
update_aggregation
end
private
attr_reader :aggregation, :mode, :update_params
def update_aggregation
aggregation.set_stats(mode, runtime, processed_records)
if full_run? && fully_aggregated?
aggregation.reset_full_run_cursors
end
aggregation.save!
end
attr_reader :aggregation, :mode, :update_params, :runtime, :processed_records
def run_aggregation(model)
response = Analytics::CycleAnalytics::DataLoaderService.new(
group: aggregation.group,
model: model,
context: Analytics::CycleAnalytics::AggregationContext.new(cursor: cursor_for(model))
context: Analytics::CycleAnalytics::AggregationContext.new(cursor: aggregation.cursor_for(mode, model))
).execute
handle_response(model, response)
@@ -39,37 +52,25 @@ module Analytics
def handle_response(model, response)
if response.success?
update_params[updated_at_column(model)] = response.payload[:context].cursor[:updated_at]
update_params[id_column(model)] = response.payload[:context].cursor[:id]
update_params[runtime_column][-1] += response.payload[:context].runtime
update_params[processed_records_column][-1] += response.payload[:context].processed_records
else
update_params.clear
update_params[:enabled] = false
end
end
aggregation.set_cursor(mode, model, response.payload[:context].cursor)
def cursor_for(model)
{
updated_at: aggregation[updated_at_column(model)],
id: aggregation[id_column(model)]
}.compact
end
@runtime += response.payload[:context].runtime
@processed_records += response.payload[:context].processed_records
def updated_at_column(model)
"last_#{mode}_#{model.table_name}_updated_at"
end
@aggregation_finished = false if response.payload[:reason] != :model_processed
def id_column(model)
"last_#{mode}_#{model.table_name}_id"
else
aggregation.reset
aggregation.update!(enabled: false)
end
end
def runtime_column
"#{mode}_runtimes_in_seconds"
def full_run?
mode == :full
end
def processed_records_column
"#{mode}_processed_records"
def fully_aggregated?
@aggregation_finished
end
end
end
......
@@ -57,6 +57,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:analytics_cycle_analytics_reaggregation
:worker_name: Analytics::CycleAnalytics::ReaggregationWorker
:feature_category: :value_stream_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:analytics_devops_adoption_create_all_snapshots
:worker_name: Analytics::DevopsAdoption::CreateAllSnapshotsWorker
:feature_category: :devops_reports
......
@@ -15,7 +15,7 @@ module Analytics
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 5.minutes
MAX_RUNTIME = 250.seconds
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
......
@@ -15,7 +15,7 @@ module Analytics
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 5.minutes
MAX_RUNTIME = 250.seconds
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
@@ -31,7 +31,7 @@ module Analytics
break if batch.empty?
batch.each do |aggregation|
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation).execute
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation, mode: :incremental).execute
if monotonic_time - start_time >= MAX_RUNTIME
over_time = true
......
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class ReaggregationWorker
include ApplicationWorker
# rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
idempotent!
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 250.seconds
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
def perform
return if Feature.disabled?(:vsa_reaggregation_worker, default_enabled: :yaml)
current_time = Time.current
start_time = monotonic_time
over_time = false
loop do
batch = Analytics::CycleAnalytics::Aggregation.load_batch(current_time, :last_full_run_at)
break if batch.empty?
batch.each do |aggregation|
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation, mode: :full).execute
if monotonic_time - start_time >= MAX_RUNTIME
over_time = true
break
end
end
break if over_time
end
end
end
end
end
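For a single group, the same full pass can be exercised directly; this is roughly what the worker loop above does per record. A hypothetical console snippet, assuming group is an existing Group that already has an aggregation record (not part of the diff):
# Hypothetical usage: run one full aggregation pass for a single group's record.
aggregation = Analytics::CycleAnalytics::Aggregation.find_by!(group_id: group.id)
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation, mode: :full).execute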
@@ -15,7 +15,7 @@ RSpec.describe Analytics::CycleAnalytics::AggregatorService do
let(:mode) { :other_mode }
it 'raises error' do
expect { run_service }.to raise_error /Only :incremental mode is supported/
expect { run_service }.to raise_error /Only :incremental and :full modes are supported/
end
end
@@ -23,6 +23,12 @@ RSpec.describe Analytics::CycleAnalytics::AggregatorService do
it 'sets the aggregation record disabled' do
expect { run_service }.to change { aggregation.reload.enabled }.from(true).to(false)
end
it 'calls the DataLoaderService only once' do
expect(Analytics::CycleAnalytics::DataLoaderService).to receive(:new).once.and_call_original
run_service
end
end
context 'when a subgroup is given' do
@@ -111,5 +117,53 @@ RSpec.describe Analytics::CycleAnalytics::AggregatorService do
)
end
end
context 'when running a full aggregation' do
let(:mode) { :full }
let(:project) { create(:project, group: group) }
let!(:merge_request_1) { create(:merge_request, :with_merged_metrics, :unique_branches, project: project) }
let!(:merge_request_2) { create(:merge_request, :with_merged_metrics, :unique_branches, project: project) }
before do
create(:cycle_analytics_group_stage,
group: group,
start_event_identifier: :merge_request_created,
end_event_identifier: :merge_request_merged
)
stub_const('Analytics::CycleAnalytics::DataLoaderService::MAX_UPSERT_COUNT', 1)
stub_const('Analytics::CycleAnalytics::DataLoaderService::UPSERT_LIMIT', 1)
stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)
end
context 'when aggregation is not finished' do
it 'persists the cursor attributes' do
run_service
expect(aggregation.reload).to have_attributes(
full_processed_records: [1],
last_full_merge_requests_updated_at: be_within(5.seconds).of(merge_request_1.updated_at),
last_full_merge_requests_id: merge_request_1.id,
last_full_issues_updated_at: nil,
last_full_issues_id: nil
)
end
end
context 'when aggregation is finished during the second run' do
it 'resets the cursor attributes so the aggregation starts from the beginning' do
3.times { run_service }
expect(aggregation.reload).to have_attributes(
full_processed_records: [1, 1, 0],
last_full_merge_requests_updated_at: nil,
last_full_merge_requests_id: nil,
last_full_issues_updated_at: nil,
last_full_issues_id: nil
)
end
end
end
end
end
# frozen_string_literal: true
RSpec.shared_examples 'aggregator worker examples' do
def run_worker
described_class.new.perform
end
it_behaves_like 'an idempotent worker'
context 'when the FF is disabled' do
it 'does nothing' do
stub_feature_flags(feature_flag => false)
expect(Analytics::CycleAnalytics::Aggregation).not_to receive(:load_batch)
run_worker
end
end
context 'when the loaded batch is empty' do
it 'does nothing' do
expect(Analytics::CycleAnalytics::AggregatorService).not_to receive(:new)
run_worker
end
end
it 'invokes the AggregatorService' do
aggregation = create(:cycle_analytics_aggregation)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new)
.with(aggregation: aggregation, mode: expected_mode)
.and_call_original
run_worker
end
it 'breaks at the second iteration due to overtime' do
create_list(:cycle_analytics_aggregation, 2)
first_monotonic_time = 100
second_monotonic_time = first_monotonic_time + described_class::MAX_RUNTIME.to_i + 10
expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(first_monotonic_time, second_monotonic_time)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).and_call_original.exactly(:once)
run_worker
end
end
@@ -3,47 +3,8 @@
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::IncrementalWorker do
def run_worker
described_class.new.perform
end
it_behaves_like 'an idempotent worker'
context 'when the vsa_incremental_worker FF is disabled' do
it 'does nothing' do
stub_feature_flags(vsa_incremental_worker: false)
expect(Analytics::CycleAnalytics::Aggregation).not_to receive(:load_batch)
run_worker
end
end
context 'when the loaded batch is empty' do
it 'does nothing' do
expect(Analytics::CycleAnalytics::AggregatorService).not_to receive(:new)
run_worker
end
end
it 'invokes the AggregatorService' do
aggregation = create(:cycle_analytics_aggregation)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).with(aggregation: aggregation).and_call_original
run_worker
end
it 'breaks at the second iteration due to overtime' do
create_list(:cycle_analytics_aggregation, 2)
first_monotonic_time = 100
second_monotonic_time = first_monotonic_time + Analytics::CycleAnalytics::IncrementalWorker::MAX_RUNTIME.to_i + 10
expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(first_monotonic_time, second_monotonic_time)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).and_call_original.exactly(:once)
run_worker
it_behaves_like 'aggregator worker examples' do
let(:expected_mode) { :incremental }
let(:feature_flag) { :vsa_incremental_worker }
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::ReaggregationWorker do
it_behaves_like 'aggregator worker examples' do
let(:expected_mode) { :full }
let(:feature_flag) { :vsa_reaggregation_worker }
end
end
@@ -22,7 +22,7 @@ RSpec.describe 'Database schema' do
approvals: %w[user_id],
approver_groups: %w[target_id],
approvers: %w[target_id user_id],
analytics_cycle_analytics_aggregations: %w[last_full_run_issues_id last_full_run_merge_requests_id last_incremental_issues_id last_incremental_merge_requests_id],
analytics_cycle_analytics_aggregations: %w[last_full_issues_id last_full_merge_requests_id last_incremental_issues_id last_full_run_issues_id last_full_run_merge_requests_id last_incremental_merge_requests_id],
analytics_cycle_analytics_merge_request_stage_events: %w[author_id group_id merge_request_id milestone_id project_id stage_event_hash_id state_id],
analytics_cycle_analytics_issue_stage_events: %w[author_id group_id issue_id milestone_id project_id stage_event_hash_id state_id],
audit_events: %w[author_id entity_id target_id],
......
@@ -10,7 +10,7 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
it { is_expected.not_to validate_presence_of(:group) }
it { is_expected.not_to validate_presence_of(:enabled) }
%i[incremental_runtimes_in_seconds incremental_processed_records last_full_run_runtimes_in_seconds last_full_run_processed_records].each do |column|
%i[incremental_runtimes_in_seconds incremental_processed_records full_runtimes_in_seconds full_processed_records].each do |column|
it "validates the array length of #{column}" do
record = described_class.new(column => [1] * 11)
@@ -20,6 +20,81 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
end
end
describe 'attribute updater methods' do
subject(:aggregation) { build(:cycle_analytics_aggregation) }
describe '#cursor_for' do
it 'returns empty cursors' do
aggregation.last_full_issues_id = nil
aggregation.last_full_issues_updated_at = nil
expect(aggregation.cursor_for(:full, Issue)).to eq({})
end
context 'when cursor is not empty' do
it 'returns the cursor values' do
current_time = Time.current
aggregation.last_full_issues_id = 1111
aggregation.last_full_issues_updated_at = current_time
expect(aggregation.cursor_for(:full, Issue)).to eq({ id: 1111, updated_at: current_time })
end
end
end
describe '#refresh_last_run' do
it 'updates the run_at column' do
freeze_time do
aggregation.refresh_last_run(:incremental)
expect(aggregation.last_incremental_run_at).to eq(Time.current)
end
end
end
describe '#reset_full_run_cursors' do
it 'resets all full run cursors to nil' do
aggregation.last_full_issues_id = 111
aggregation.last_full_issues_updated_at = Time.current
aggregation.last_full_merge_requests_id = 111
aggregation.last_full_merge_requests_updated_at = Time.current
aggregation.reset_full_run_cursors
expect(aggregation).to have_attributes(
last_full_issues_id: nil,
last_full_issues_updated_at: nil,
last_full_merge_requests_id: nil,
last_full_merge_requests_updated_at: nil
)
end
end
describe '#set_cursor' do
it 'sets the cursor values for the given mode' do
aggregation.set_cursor(:full, Issue, { id: 2222, updated_at: nil })
expect(aggregation).to have_attributes(
last_full_issues_id: 2222,
last_full_issues_updated_at: nil
)
end
end
describe '#set_stats' do
it 'appends stats to the runtime and processed_records attributes' do
aggregation.set_stats(:full, 10, 20)
aggregation.set_stats(:full, 20, 30)
expect(aggregation).to have_attributes(
full_runtimes_in_seconds: [10, 20],
full_processed_records: [20, 30]
)
end
end
end
describe '#safe_create_for_group' do
let_it_be(:group) { create(:group) }
let_it_be(:subgroup) { create(:group, parent: group) }
......