Commit c62a9f3a authored by Adam Hegyi

Introduce VSA incremental aggregation

This change implements periodic, incremental data aggregation for
value stream analytics.

Related issue: https://gitlab.com/gitlab-org/gitlab/-/issues/344803
parent f1279212
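
At a high level, the commit wires a new cron worker to a set of service objects. A condensed, illustrative sketch of the resulting flow (not literal code from this diff):

    # Every 10 minutes, IncrementalWorker loads a prioritized batch of
    # aggregation records and processes each one:
    batch = Analytics::CycleAnalytics::Aggregation.load_batch(Time.current)
    batch.each do |aggregation|
      # AggregatorService runs DataLoaderService for Issue and MergeRequest,
      # then persists the updated cursors and runtime statistics.
      Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation).execute
    end
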
# frozen_string_literal: true
class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
include FromUnion
belongs_to :group, optional: false
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :last_full_run_runtimes_in_seconds, :last_full_run_processed_records, presence: true, length: { maximum: 10 }
validates :incremental_runtimes_in_seconds, :incremental_processed_records, :last_full_run_runtimes_in_seconds, :last_full_run_processed_records, presence: true, length: { maximum: 10 }, allow_blank: true
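# Aggregations that never ran (NULL last_incremental_run_at) sort first.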
scope :priority_order, -> { order('last_incremental_run_at ASC NULLS FIRST') }
scope :enabled, -> { where('enabled IS TRUE') }
def self.safe_create_for_group(group)
top_level_group = group.root_ancestor
@@ -10,4 +16,22 @@ class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
insert({ group_id: top_level_group.id }, unique_by: :group_id)
end
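# Loads a batch of enabled aggregation records for processing: never-run
# records (last_incremental_run_at IS NULL) come first, followed by records
# whose last incremental run happened before the given timestamp.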
def self.load_batch(last_run_at, batch_size = 100)
last_run_at_not_set = Analytics::CycleAnalytics::Aggregation
.enabled
.where(last_incremental_run_at: nil)
.priority_order
.limit(batch_size)
last_run_at_before = Analytics::CycleAnalytics::Aggregation
.enabled
.where('last_incremental_run_at < ?', last_run_at)
.priority_order
.limit(batch_size)
Analytics::CycleAnalytics::Aggregation
.from_union([last_run_at_not_set, last_run_at_before], remove_order: false, remove_duplicates: false)
.limit(batch_size)
end
end
---
name: vsa_incremental_worker
introduced_by_url:
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/353453
milestone: '14.9'
type: development
group: group::optimize
default_enabled: false
@@ -625,6 +625,9 @@ Gitlab.ee do
Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker']['cron'] ||= '0 1 * * *'
Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker']['job_class'] = 'Analytics::DevopsAdoption::CreateAllSnapshotsWorker'
Settings.cron_jobs['analytics_cycle_analytics_incremental_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['analytics_cycle_analytics_incremental_worker']['cron'] ||= '*/10 * * * *'
Settings.cron_jobs['analytics_cycle_analytics_incremental_worker']['job_class'] = 'Analytics::CycleAnalytics::IncrementalWorker'
Settings.cron_jobs['active_user_count_threshold_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['active_user_count_threshold_worker']['cron'] ||= '0 12 * * *'
Settings.cron_jobs['active_user_count_threshold_worker']['job_class'] = 'ActiveUserCountThresholdWorker'
@@ -29,8 +29,6 @@
- 1
- - analytics_code_review_metrics
- 1
- - analytics_cycle_analytics_group_data_loader
- 1
- - analytics_devops_adoption_create_snapshot
- 1
- - analytics_usage_trends_counter_job
# frozen_string_literal: true
module Analytics
module CycleAnalytics
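# Carries the keyset-pagination cursor and the runtime statistics of a
# single DataLoaderService run between the service and its caller.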
class AggregationContext
attr_accessor :cursor, :processed_records
def initialize(cursor: {})
@processed_records = 0
@cursor = cursor.compact
end
def processing_start!
@start_time = Gitlab::Metrics::System.monotonic_time
end
def processing_finished!
@end_time = Gitlab::Metrics::System.monotonic_time
end
def runtime
@end_time - @start_time
end
end
end
end
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class AggregatorService
def initialize(aggregation:, mode: :incremental)
raise "Only :incremental mode is supported" if mode != :incremental
@aggregation = aggregation
@mode = mode
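# Start a fresh counter slot for this run and keep statistics for the
# last 10 runs only.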
@update_params = {
runtime_column => (aggregation[runtime_column] + [0]).last(10),
processed_records_column => (aggregation[processed_records_column] + [0]).last(10)
}
end
def execute
run_aggregation(Issue)
run_aggregation(MergeRequest)
update_params["last_#{mode}_run_at"] = Time.current
aggregation.update!(update_params)
end
private
attr_reader :aggregation, :mode, :update_params
def run_aggregation(model)
response = Analytics::CycleAnalytics::DataLoaderService.new(
group: aggregation.group,
model: model,
context: Analytics::CycleAnalytics::AggregationContext.new(cursor: cursor_for(model))
).execute
handle_response(model, response)
end
def handle_response(model, response)
if response.success?
update_params[updated_at_column(model)] = response.payload[:context].cursor[:updated_at]
update_params[id_column(model)] = response.payload[:context].cursor[:id]
update_params[runtime_column][-1] += response.payload[:context].runtime
update_params[processed_records_column][-1] += response.payload[:context].processed_records
else
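# The aggregation failed (for example, the license is missing); discard
# the pending updates and disable the record so future runs skip it.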
update_params.clear
update_params[:enabled] = false
end
end
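# The aggregations table stores one cursor per model, e.g.
# last_incremental_issues_updated_at / last_incremental_issues_id;
# the helpers below derive those column names from the mode and model.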
def cursor_for(model)
{
updated_at: aggregation[updated_at_column(model)],
id: aggregation[id_column(model)]
}.compact
end
def updated_at_column(model)
"last_#{mode}_#{model.table_name}_updated_at"
end
def id_column(model)
"last_#{mode}_#{model.table_name}_id"
end
def runtime_column
"#{mode}_runtimes_in_seconds"
end
def processed_records_column
"#{mode}_processed_records"
end
end
end
end
@@ -5,7 +5,7 @@ module Analytics
class DataLoaderService
include Validations
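# Upper bound on rows upserted within a single run; when it is reached,
# the run stops and the cursor stored on the context marks where to resume.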
MAX_UPSERT_COUNT = 10_000
MAX_UPSERT_COUNT = 50_000
UPSERT_LIMIT = 1000
BATCH_LIMIT = 500
EVENTS_LIMIT = 25
@@ -15,11 +15,10 @@ module Analytics
MergeRequest => { event_model: MergeRequestStageEvent, project_column: :target_project_id }.freeze
}.freeze
def initialize(group:, model:, cursor: nil, updated_at_before: Time.current)
def initialize(group:, model:, context: Analytics::CycleAnalytics::AggregationContext.new)
@group = group
@model = model
@cursor = cursor
@updated_at_before = updated_at_before
@context = context
@upsert_count = 0
load_stages # ensure stages are loaded/created
@@ -29,8 +28,9 @@ module Analytics
error_response = validate
return error_response if error_response
response = success(:model_processed, cursor: {})
response = success(:model_processed, context: context)
context.processing_start!
iterator.each_batch(of: BATCH_LIMIT) do |records|
loaded_records = records.to_a
@@ -38,22 +38,27 @@
load_timestamp_data_into_value_stream_analytics(loaded_records)
context.processed_records += loaded_records.size
context.cursor = cursor_for_node(loaded_records.last)
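# Stop early once the upsert budget is exhausted; the cursor on the
# context lets the next incremental run resume from this record.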
if upsert_count >= MAX_UPSERT_COUNT
response = success(:limit_reached, cursor: cursor_for_node(loaded_records.last))
response = success(:limit_reached, context: context)
break
end
end
context.processing_finished!
response
end
private
attr_reader :group, :model, :cursor, :updated_at_before, :upsert_count, :stages
attr_reader :group, :model, :context, :upsert_count, :stages
# rubocop: disable CodeReuse/ActiveRecord
def iterator_base_scope
model.updated_before(updated_at_before).order(:updated_at, :id)
model.order(:updated_at, :id)
end
# rubocop: enable CodeReuse/ActiveRecord
@@ -66,7 +71,7 @@
}
}
Gitlab::Pagination::Keyset::Iterator.new(scope: iterator_base_scope, cursor: cursor, **opts)
Gitlab::Pagination::Keyset::Iterator.new(scope: iterator_base_scope, cursor: context.cursor, **opts)
end
# rubocop: enable CodeReuse/ActiveRecord
@@ -39,6 +39,15 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:analytics_cycle_analytics_incremental
:worker_name: Analytics::CycleAnalytics::IncrementalWorker
:feature_category: :value_stream_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:analytics_devops_adoption_create_all_snapshots
:worker_name: Analytics::DevopsAdoption::CreateAllSnapshotsWorker
:feature_category: :devops_reports
@@ -858,15 +867,6 @@
:weight: 1
:idempotent: true
:tags: []
- :name: analytics_cycle_analytics_group_data_loader
:worker_name: Analytics::CycleAnalytics::GroupDataLoaderWorker
:feature_category: :value_stream_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: analytics_devops_adoption_create_snapshot
:worker_name: Analytics::DevopsAdoption::CreateSnapshotWorker
:feature_category: :devops_reports
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class GroupDataLoaderWorker
include ApplicationWorker
data_consistency :always
feature_category :value_stream_management
idempotent!
MODEL_KLASSES = %w[Issue MergeRequest].freeze
def perform(group_id, model_klass = MODEL_KLASSES.first, cursor = {}, updated_at_before = Time.current)
group = Group.find_by_id(group_id)
return unless group
model = model_klass.safe_constantize
return unless model
next_model = MODEL_KLASSES[MODEL_KLASSES.index(model_klass) + 1]
response = Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: model,
cursor: cursor,
updated_at_before: updated_at_before
).execute
if response.error?
log_extra_metadata_on_done(:error_reason, response[:reason])
elsif response.payload[:reason] == :limit_reached
self.class.perform_in(
2.minutes,
group_id,
model.to_s,
response.payload[:cursor],
updated_at_before
)
elsif response.payload[:reason] == :model_processed && next_model
self.class.perform_in(
2.minutes,
group_id,
next_model,
{},
updated_at_before
)
end
true
end
end
end
end
# frozen_string_literal: true
module Analytics
module CycleAnalytics
class IncrementalWorker
include ApplicationWorker
# rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
idempotent!
data_consistency :always
feature_category :value_stream_management
MAX_RUNTIME = 5.minutes
delegate :monotonic_time, to: :'Gitlab::Metrics::System'
def perform
return if Feature.disabled?(:vsa_incremental_worker, default_enabled: :yaml)
current_time = Time.current
start_time = monotonic_time
over_time = false
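# Keep processing batches until no work is left or the MAX_RUNTIME
# budget is exceeded.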
loop do
batch = Analytics::CycleAnalytics::Aggregation.load_batch(current_time)
break if batch.empty?
batch.each do |aggregation|
Analytics::CycleAnalytics::AggregatorService.new(aggregation: aggregation).execute
if monotonic_time - start_time >= MAX_RUNTIME
over_time = true
break
end
end
break if over_time
end
end
end
end
end
@@ -29,7 +29,7 @@ RSpec.describe Projects::Analytics::CycleAnalytics::SummaryController do
before do
stub_licensed_features(cycle_analytics_for_projects: true, cycle_analytics_for_groups: true)
Analytics::CycleAnalytics::GroupDataLoaderWorker.new.perform(group.id, 'Issue')
Analytics::CycleAnalytics::DataLoaderService.new(group: group, model: Issue).execute
project.add_reporter(user)
end
@@ -24,14 +24,12 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::DataCollector do
def aggregate_vsa_data(group)
Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: Issue,
updated_at_before: Time.now
model: Issue
).execute
Analytics::CycleAnalytics::DataLoaderService.new(
group: group,
model: MergeRequest,
updated_at_before: Time.now
model: MergeRequest
).execute
end
@@ -47,7 +47,7 @@ RSpec.describe Gitlab::Analytics::CycleAnalytics::Summary::StageTimeSummary do
options[:use_aggregated_data_collector] = true
stub_licensed_features(cycle_analytics_for_groups: true)
Analytics::CycleAnalytics::GroupDataLoaderWorker.new.perform(group.id, 'Issue')
Analytics::CycleAnalytics::DataLoaderService.new(group: group, model: Issue).execute
end
it 'loads the lead and cycle time' do
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::AggregationContext do
let(:cursor) { {} }
subject(:ctx) { described_class.new(cursor: cursor) }
it 'removes nil values from the cursor' do
cursor[:id] = nil
cursor[:updated_at] = 1
expect(ctx.cursor).to eq({ updated_at: 1 })
end
it 'calculates duration' do
expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(100, 500)
ctx.processing_start!
ctx.processing_finished!
expect(ctx.runtime).to eq(400)
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::AggregatorService do
let!(:group) { create(:group) }
let!(:aggregation) { create(:cycle_analytics_aggregation, :enabled, group: group) }
let(:mode) { :incremental }
def run_service
described_class.new(aggregation: aggregation, mode: mode).execute
end
context 'when invalid mode is given' do
let(:mode) { :other_mode }
it 'raises error' do
expect { run_service }.to raise_error /Only :incremental mode is supported/
end
end
context 'when the group is not licensed' do
it 'sets the aggregation record disabled' do
expect { run_service }.to change { aggregation.reload.enabled }.from(true).to(false)
end
end
context 'when a subgroup is given' do
let(:group) { create(:group, parent: create(:group)) }
it 'sets the aggregation record disabled' do
stub_licensed_features(cycle_analytics_for_groups: true)
expect { run_service }.to change { aggregation.reload.enabled }.from(true).to(false)
end
end
context 'when the aggregation succeeds' do
before do
stub_licensed_features(cycle_analytics_for_groups: true)
end
context 'when nothing to aggregate' do
it 'updates the aggregation record with metadata' do
freeze_time do
run_service
expect(aggregation.reload).to have_attributes(
incremental_runtimes_in_seconds: satisfy(&:one?),
incremental_processed_records: [0],
last_incremental_run_at: Time.current,
last_incremental_merge_requests_updated_at: nil,
last_incremental_merge_requests_id: nil,
last_incremental_issues_updated_at: nil,
last_incremental_issues_id: nil
)
end
end
context 'when the aggregation already contains metadata about the previous runs' do
before do
# we store data for the last 10 runs
aggregation.update!(
incremental_processed_records: [1000] * 10,
incremental_runtimes_in_seconds: [100] * 10
)
end
it 'updates the statistical columns' do
run_service
aggregation.reload
expect(aggregation.incremental_processed_records.length).to eq(10)
expect(aggregation.incremental_runtimes_in_seconds.length).to eq(10)
expect(aggregation.incremental_processed_records[-1]).to eq(0)
expect(aggregation.incremental_processed_records[-1]).not_to eq(100)
end
end
end
context 'when merge requests and issues are present for the configured VSA stages' do
let(:project) { create(:project, group: group) }
let!(:merge_request) { create(:merge_request, :with_merged_metrics, project: project) }
let!(:issue1) { create(:issue, project: project, closed_at: Time.current) }
let!(:issue2) { create(:issue, project: project, closed_at: Time.current) }
before do
create(:cycle_analytics_group_stage,
group: group,
start_event_identifier: :merge_request_created,
end_event_identifier: :merge_request_merged
)
create(:cycle_analytics_group_stage,
group: group,
start_event_identifier: :issue_created,
end_event_identifier: :issue_closed
)
end
it 'updates the aggregation record with record count and the last cursor' do
run_service
expect(aggregation.reload).to have_attributes(
incremental_processed_records: [3],
last_incremental_merge_requests_updated_at: be_within(5.seconds).of(merge_request.updated_at),
last_incremental_merge_requests_id: merge_request.id,
last_incremental_issues_updated_at: be_within(5.seconds).of(issue2.updated_at),
last_incremental_issues_id: issue2.id
)
end
end
end
end
end
@@ -80,6 +80,7 @@ RSpec.describe Analytics::CycleAnalytics::DataLoaderService do
expect(service_response).to be_success
expect(service_response.payload[:reason]).to eq(:model_processed)
expect(Analytics::CycleAnalytics::IssueStageEvent.count).to eq(0)
expect(service_response[:context].processed_records).to eq(0)
end
it 'loads nothing for MergeRequest model' do
@@ -88,6 +89,7 @@ RSpec.describe Analytics::CycleAnalytics::DataLoaderService do
expect(service_response).to be_success
expect(service_response.payload[:reason]).to eq(:model_processed)
expect(Analytics::CycleAnalytics::MergeRequestStageEvent.count).to eq(0)
expect(service_response[:context].processed_records).to eq(0)
end
context 'when MergeRequest data is present' do
@@ -125,14 +127,6 @@ RSpec.describe Analytics::CycleAnalytics::DataLoaderService do
expect(event_data.sort).to match_array(expected_data.sort)
end
it 'inserts records with record.updated_at < updated_at_before' do
described_class.new(group: top_level_group, model: MergeRequest, updated_at_before: 7.days.ago).execute
mr_ids = Analytics::CycleAnalytics::MergeRequestStageEvent.pluck(:merge_request_id)
expect(mr_ids).to match_array([mr3.id])
end
it 'inserts nothing for group outside of the hierarchy' do
mr = create(:merge_request, :unique_branches, :with_merged_metrics, source_project: project_outside)
@@ -169,13 +163,15 @@ RSpec.describe Analytics::CycleAnalytics::DataLoaderService do
stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)
service_response = described_class.new(group: top_level_group, model: MergeRequest).execute
cursor = service_response.payload[:cursor]
ctx = service_response.payload[:context]
expect(Analytics::CycleAnalytics::MergeRequestStageEvent.count).to eq(1)
described_class.new(group: top_level_group, model: MergeRequest, cursor: cursor).execute
described_class.new(group: top_level_group, model: MergeRequest, context: ctx).execute
expect(Analytics::CycleAnalytics::MergeRequestStageEvent.count).to eq(2)
expect(ctx.processed_records).to eq(2)
expect(ctx.runtime).to be > 0
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::GroupDataLoaderWorker do
let(:group_id) { nil }
let(:model_klass) { 'Issue' }
let(:updated_at_before) { Time.current }
let(:worker) { described_class.new }
def run_worker
worker.perform(group_id, model_klass, {}, updated_at_before)
end
context 'when non-existing group is given' do
let(:group_id) { non_existing_record_id }
it 'does nothing' do
expect(Analytics::CycleAnalytics::DataLoaderService).not_to receive(:new)
expect(run_worker).to eq(nil)
end
end
context 'when invalid model klass is given' do
let(:group_id) { create(:group).id }
let(:model_klass) { 'unknown' }
it 'does nothing' do
expect(Analytics::CycleAnalytics::DataLoaderService).not_to receive(:new)
expect(run_worker).to eq(nil)
end
end
context 'when the data loader returns error response' do
let(:group_id) { create(:group).id }
it 'logs the error reason' do
expect(worker).to receive(:log_extra_metadata_on_done).with(:error_reason, :missing_license)
run_worker
end
end
context 'when DataLoaderService is invoked successfully' do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, group: group) }
let_it_be(:group_id) { group.id }
let_it_be(:stage) { create(:cycle_analytics_group_stage, group: group, start_event_identifier: :issue_created, end_event_identifier: :issue_closed) }
let_it_be(:issue1) { create(:issue, project: project, updated_at: 5.days.ago) }
let_it_be(:issue2) { create(:issue, project: project, updated_at: 10.days.ago) }
before do
stub_licensed_features(cycle_analytics_for_groups: true)
end
context 'when limit_reached status is returned' do
before do
stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)
stub_const('Analytics::CycleAnalytics::DataLoaderService::MAX_UPSERT_COUNT', 1)
end
it 'schedules a new job with the returned cursor' do
expect(described_class).to receive(:perform_in).with(
2.minutes,
group_id,
'Issue',
hash_including('id' => issue2.id.to_s), # the cursor; the next job resumes processing after this record
updated_at_before
)
run_worker
end
end
context 'when model_processed status is returned' do
context 'when there is a next model to process' do
it 'schedules a new job with the MergeRequest model' do
expect(described_class).to receive(:perform_in).with(
2.minutes,
group_id,
'MergeRequest',
{},
updated_at_before
)
run_worker # Issue related records are processed
end
end
context 'when there is no next model to process' do
let(:model_klass) { 'MergeRequest' }
it 'stops the execution' do
expect(described_class).not_to receive(:perform_in)
run_worker # after this call, there is no more records to be processed
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Analytics::CycleAnalytics::IncrementalWorker do
def run_worker
described_class.new.perform
end
it_behaves_like 'an idempotent worker'
context 'when the vsa_incremental_worker FF is disabled' do
it 'does nothing' do
stub_feature_flags(vsa_incremental_worker: false)
expect(Analytics::CycleAnalytics::Aggregation).not_to receive(:load_batch)
run_worker
end
end
context 'when the loaded batch is empty' do
it 'does nothing' do
expect(Analytics::CycleAnalytics::AggregatorService).not_to receive(:new)
run_worker
end
end
it 'invokes the AggregatorService' do
aggregation = create(:cycle_analytics_aggregation)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).with(aggregation: aggregation).and_call_original
run_worker
end
it 'breaks at the second iteration due to overtime' do
create_list(:cycle_analytics_aggregation, 2)
first_monotonic_time = 100
second_monotonic_time = first_monotonic_time + Analytics::CycleAnalytics::IncrementalWorker::MAX_RUNTIME.to_i + 10
expect(Gitlab::Metrics::System).to receive(:monotonic_time).and_return(first_monotonic_time, second_monotonic_time)
expect(Analytics::CycleAnalytics::AggregatorService).to receive(:new).and_call_original.exactly(:once)
run_worker
end
end
# frozen_string_literal: true
FactoryBot.define do
factory :cycle_analytics_aggregation, class: 'Analytics::CycleAnalytics::Aggregation' do
group
enabled { true }
trait :disabled do
enabled { false }
end
trait :enabled do
enabled { true }
end
end
end
@@ -51,4 +51,28 @@ RSpec.describe Analytics::CycleAnalytics::Aggregation, type: :model do
end
end
end
describe '.load_batch' do
let!(:aggregation1) { create(:cycle_analytics_aggregation, last_incremental_run_at: nil) }
let!(:aggregation2) { create(:cycle_analytics_aggregation, last_incremental_run_at: 5.days.ago).reload }
let!(:aggregation3) { create(:cycle_analytics_aggregation, last_incremental_run_at: nil) }
let!(:aggregation5) { create(:cycle_analytics_aggregation, last_incremental_run_at: 10.days.ago).reload }
before do
create(:cycle_analytics_aggregation, :disabled) # disabled rows are skipped
create(:cycle_analytics_aggregation, last_incremental_run_at: 1.day.ago) # recently processed rows are filtered out
end
it 'loads records in priority order' do
batch = described_class.load_batch(2.days.ago).to_a
expect(batch.size).to eq(4)
first_two = batch.first(2)
last_two = batch.last(2)
# Using match_array because the order is non-deterministic for NULL values.
expect(first_two).to match_array([aggregation1, aggregation3])
expect(last_two).to eq([aggregation5, aggregation2])
end
end
end