Commit 536c324d authored by Adam Hegyi's avatar Adam Hegyi

Worker for populating the new VSA tables

This change adds a worker and a service class for populating the value
stream analytics stage event tables.
parent 6bb96f42
......@@ -3,9 +3,14 @@
module Analytics
  module CycleAnalytics
    # Denormalized, per-stage timestamp records for issues, used by value
    # stream analytics aggregations.
    class IssueStageEvent < ApplicationRecord
      include StageEventModel
      extend SuppressCompositePrimaryKeyWarning

      validates :stage_event_hash_id, :issue_id, :group_id, :project_id, :start_event_timestamp, presence: true

      # Name of the column referencing the issuable (used by StageEventModel).
      def self.issuable_id_column
        :issue_id
      end
    end
  end
end
......@@ -3,9 +3,14 @@
module Analytics
module CycleAnalytics
class MergeRequestStageEvent < ApplicationRecord
include StageEventModel
extend SuppressCompositePrimaryKeyWarning
validates(*%i[stage_event_hash_id merge_request_id group_id project_id start_event_timestamp], presence: true)
def self.issuable_id_column
:merge_request_id
end
end
end
end
......@@ -27,7 +27,8 @@ module Analytics
# `custom` is false for the built-in (default) stages.
alias_attribute :custom_stage?, :custom

scope :default_stages, -> { where(custom: false) }
# Stable ordering for the UI: manual relative position first, id as tie-breaker.
scope :ordered, -> { order(:relative_position, :id) }
scope :for_list, -> { includes(:start_event_label, :end_event_label).ordered }
# NOTE(review): :for_list appears twice in this span (old and new diff lines
# are both visible); at load time the later definition wins.
scope :with_preloaded_labels, -> { includes(:start_event_label, :end_event_label) }
scope :for_list, -> { with_preloaded_labels.ordered }
scope :by_value_stream, -> (value_stream) { where(value_stream_id: value_stream.id) }

before_save :ensure_stage_event_hash_id
......
# frozen_string_literal: true

module Analytics
  module CycleAnalytics
    # Shared behavior for the denormalized stage event models
    # (IssueStageEvent, MergeRequestStageEvent). Including classes must
    # implement `.issuable_id_column`.
    module StageEventModel
      extend ActiveSupport::Concern

      class_methods do
        # Bulk inserts stage event rows, updating existing rows on
        # (stage_event_hash_id, issuable_id) conflicts.
        #
        # data - Array of Hashes with the keys listed in `values_at` below.
        #
        # Returns the number of rows written (INSERT cmd_tuples).
        def upsert_data(data)
          upsert_values = data.map do |row|
            # FIX: this order must match the column list of the INSERT
            # statement below. Previously :author_id and :milestone_id were
            # swapped here, silently writing each value into the other column.
            row.values_at(
              :stage_event_hash_id,
              :issuable_id,
              :group_id,
              :project_id,
              :milestone_id,
              :author_id,
              :start_event_timestamp,
              :end_event_timestamp
            )
          end

          value_list = Arel::Nodes::ValuesList.new(upsert_values).to_sql

          query = <<~SQL
          INSERT INTO #{quoted_table_name}
          (
          stage_event_hash_id,
          #{connection.quote_column_name(issuable_id_column)},
          group_id,
          project_id,
          milestone_id,
          author_id,
          start_event_timestamp,
          end_event_timestamp
          )
          #{value_list}
          ON CONFLICT(stage_event_hash_id, #{issuable_id_column})
          DO UPDATE SET
          group_id = excluded.group_id,
          project_id = excluded.project_id,
          start_event_timestamp = excluded.start_event_timestamp,
          end_event_timestamp = excluded.end_event_timestamp,
          milestone_id = excluded.milestone_id,
          author_id = excluded.author_id
          SQL

          result = connection.execute(query)
          result.cmd_tuples
        end
      end
    end
  end
end
......@@ -29,6 +29,8 @@
- 1
- - analytics_code_review_metrics
- 1
- - analytics_cycle_analytics_group_data_loader
- 1
- - analytics_devops_adoption_create_snapshot
- 1
- - analytics_usage_trends_counter_job
......
......@@ -22,6 +22,12 @@ module Analytics
# Column used by the relative-positioning concern to scope ordering of
# stages (stages are positioned within their group).
def self.relative_positioning_parent_column
  :group_id
end
# Returns one stage per stage_event_hash_id within the given group and all
# of its descendants, with start/end event labels preloaded.
def self.distinct_stages_within_hierarchy(group)
  hierarchy_group_ids = group.self_and_descendants.select(:id)

  with_preloaded_labels
    .select("DISTINCT ON(stage_event_hash_id) #{quoted_table_name}.*")
    .where(group_id: hierarchy_group_ids)
end
end
end
end
# frozen_string_literal: true

module Analytics
  module CycleAnalytics
    # Loads historical stage event timestamps for a top-level group's
    # hierarchy into the denormalized VSA stage event tables, iterating the
    # issuables with keyset pagination and upserting in batches.
    class DataLoaderService
      # Stop and report :limit_reached once this many rows were upserted in
      # one run; the caller resumes with the returned cursor.
      MAX_UPSERT_COUNT = 10_000
      # Maximum number of rows per upsert statement.
      UPSERT_LIMIT = 1000
      # Number of issuables loaded per keyset-pagination batch.
      BATCH_LIMIT = 500
      # Number of stage events whose timestamp columns are added to a single
      # SELECT (keeps the query width bounded).
      EVENTS_LIMIT = 25

      # Maps each supported issuable model to its event table model and the
      # column linking the issuable to its project.
      CONFIG_MAPPING = {
        Issue => { event_model: IssueStageEvent, project_column: :project_id }.freeze,
        MergeRequest => { event_model: MergeRequestStageEvent, project_column: :target_project_id }.freeze
      }.freeze

      # group - top-level group whose hierarchy is processed
      # model - Issue or MergeRequest
      # cursor - keyset cursor to resume from (nil starts from the beginning)
      # updated_at_before - only records updated before this time are loaded
      def initialize(group:, model:, cursor: nil, updated_at_before: Time.current)
        @group = group
        @model = model
        @cursor = cursor
        @updated_at_before = updated_at_before
        @upsert_count = 0
      end

      # Returns a ServiceResponse whose payload[:reason] is :model_processed
      # (all records done) or :limit_reached (payload[:cursor] set), or an
      # error response for invalid input.
      def execute
        unless model == Issue || model == MergeRequest
          return error(:invalid_model)
        end

        unless group.licensed_feature_available?(:cycle_analytics_for_groups)
          return error(:missing_license)
        end

        unless group.root_ancestor == group
          return error(:requires_top_level_group)
        end

        response = success(:model_processed)

        iterator.each_batch(of: BATCH_LIMIT) do |records|
          loaded_records = records.to_a

          break if records.empty?

          load_timestamp_data_into_value_stream_analytics(loaded_records)

          if upsert_count >= MAX_UPSERT_COUNT
            # Stop early; the cursor lets a follow-up run resume after the
            # last processed record.
            response = success(:limit_reached, cursor: cursor_for_node(loaded_records.last))
            break
          end
        end

        response
      end

      private

      attr_reader :group, :model, :cursor, :updated_at_before, :upsert_count

      def error(error_reason)
        ServiceResponse.error(
          message: "DataLoader error for group: #{group.id} (#{error_reason})",
          payload: { reason: error_reason }
        )
      end

      def success(success_reason, cursor: {})
        ServiceResponse.success(payload: { reason: success_reason, cursor: cursor })
      end

      # Ordering must stay in sync with cursor_for_node below.
      # rubocop: disable CodeReuse/ActiveRecord
      def iterator_base_scope
        model.updated_before(updated_at_before).order(:updated_at, :id)
      end
      # rubocop: enable CodeReuse/ActiveRecord

      # rubocop: disable CodeReuse/ActiveRecord
      # Keyset iterator over the issuables of all projects in the hierarchy,
      # using the in-operator optimization to filter by project.
      def iterator
        opts = {
          in_operator_optimization_options: {
            array_scope: group.all_projects.select(:id),
            array_mapping_scope: -> (id_expression) { model.where(model.arel_table[project_column].eq(id_expression)) }
          }
        }

        Gitlab::Pagination::Keyset::Iterator.new(scope: iterator_base_scope, cursor: cursor, **opts)
      end
      # rubocop: enable CodeReuse/ActiveRecord

      # rubocop: disable CodeReuse/ActiveRecord
      # Collects the event timestamps for the given records (in slices of
      # EVENTS_LIMIT events per query) and upserts them.
      def load_timestamp_data_into_value_stream_analytics(loaded_records)
        records_by_id = {}

        events.each_slice(EVENTS_LIMIT) do |event_slice|
          scope = model.join_project.id_in(loaded_records.pluck(:id))

          current_select_columns = select_columns # default SELECT columns
          # Add the stage timestamp columns to the SELECT
          event_slice.each do |event|
            scope = event.include_in(scope)
            current_select_columns << event.timestamp_projection.as(event_column_name(event))
          end

          record_attributes = scope
            .reselect(*current_select_columns)
            .to_a
            .map(&:attributes)

          # Deep-merge so a record accumulates timestamp columns across
          # multiple event slices.
          records_by_id.deep_merge!(record_attributes.index_by { |attr| attr['id'] }.compact)
        end

        upsert_data(records_by_id)
      end
      # rubocop: enable CodeReuse/ActiveRecord

      # Builds one stage event row per (record, stage) pair that has a start
      # event timestamp, flushing to the DB in chunks of UPSERT_LIMIT.
      def upsert_data(records)
        data = []

        records.each_value do |record|
          stages.each do |stage|
            next if record[event_column_name(stage.start_event)].nil?

            data << {
              stage_event_hash_id: stage.stage_event_hash_id,
              issuable_id: record['id'],
              group_id: record['group_id'],
              project_id: record['project_id'],
              author_id: record['author_id'],
              milestone_id: record['milestone_id'],
              start_event_timestamp: record[event_column_name(stage.start_event)],
              end_event_timestamp: record[event_column_name(stage.end_event)]
            }

            if data.size == UPSERT_LIMIT
              @upsert_count += event_model.upsert_data(data)
              data.clear
            end
          end
        end

        @upsert_count += event_model.upsert_data(data) if data.any?
      end

      def select_columns
        [
          model.arel_table[:id],
          model.arel_table[project_column].as('project_id'),
          model.arel_table[:milestone_id],
          model.arel_table[:author_id],
          Project.arel_table[:parent_id].as('group_id')
        ]
      end

      # rubocop: disable CodeReuse/ActiveRecord
      # Builds a keyset cursor pointing at the given record so a later run
      # can resume right after it.
      def cursor_for_node(record)
        scope, _ = Gitlab::Pagination::Keyset::SimpleOrderBuilder.build(iterator_base_scope)
        order = Gitlab::Pagination::Keyset::Order.extract_keyset_order_object(scope)
        order.cursor_attributes_for_node(record)
      end
      # rubocop: enable CodeReuse/ActiveRecord

      def project_column
        CONFIG_MAPPING.fetch(model).fetch(:project_column)
      end

      def event_model
        CONFIG_MAPPING.fetch(model).fetch(:event_model)
      end

      # Stable per-event column alias used in the SELECT list and when
      # reading the attributes back.
      def event_column_name(event)
        "column_" + event.hash_code[0...10]
      end

      # Stages of the hierarchy (deduplicated by stage_event_hash_id) whose
      # start event belongs to the current model.
      def stages
        @stages ||= Analytics::CycleAnalytics::GroupStage
          .distinct_stages_within_hierarchy(group)
          .select { |stage| stage.start_event.object_type == model }
      end

      # Unique start/end events across the selected stages.
      def events
        @events ||= stages
          .flat_map { |stage| [stage.start_event, stage.end_event] }
          .uniq { |event| event.hash_code }
      end
    end
  end
end
......@@ -849,6 +849,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: analytics_cycle_analytics_group_data_loader
:worker_name: Analytics::CycleAnalytics::GroupDataLoaderWorker
:feature_category: :value_stream_management
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: analytics_devops_adoption_create_snapshot
:worker_name: Analytics::DevopsAdoption::CreateSnapshotWorker
:feature_category: :devops_reports
......
# frozen_string_literal: true

module Analytics
  module CycleAnalytics
    # Background job that incrementally loads VSA stage event data for a
    # group, processing one issuable model at a time and re-enqueuing itself
    # until all models are done.
    class GroupDataLoaderWorker
      include ApplicationWorker

      data_consistency :always
      feature_category :value_stream_management
      idempotent!

      # Models are processed in this order; finishing one schedules the next.
      MODEL_KLASSES = %w[Issue MergeRequest].freeze

      # rubocop: disable CodeReuse/ActiveRecord
      def perform(group_id, model_klass = MODEL_KLASSES.first, cursor = {}, updated_at_before = Time.current)
        group = Group.find_by(id: group_id)
        return unless group

        model = model_klass.safe_constantize
        return unless model

        response = Analytics::CycleAnalytics::DataLoaderService.new(
          group: group,
          model: model,
          cursor: cursor,
          updated_at_before: updated_at_before
        ).execute

        if response.error?
          log_extra_metadata_on_done(:error_reason, response[:reason])
        else
          case response.payload[:reason]
          when :limit_reached
            # Not finished yet: continue the same model from the cursor.
            self.class.perform_in(
              2.minutes,
              group_id,
              model.to_s,
              response.payload[:cursor],
              updated_at_before
            )
          when :model_processed
            # Current model finished: move on to the next one, if any.
            next_model = MODEL_KLASSES[MODEL_KLASSES.index(model_klass) + 1]

            if next_model
              self.class.perform_in(
                2.minutes,
                group_id,
                next_model,
                {},
                updated_at_before
              )
            end
          end
        end

        true
      end
      # rubocop: enable CodeReuse/ActiveRecord
    end
  end
end
......@@ -34,4 +34,33 @@ RSpec.describe Analytics::CycleAnalytics::GroupStage do
let(:default_params) { { group: parent } }
end
end
describe '.distinct_stages_within_hierarchy' do
  let_it_be(:group) { create(:group) }
  let_it_be(:sub_group) { create(:group, parent: group) }

  before do
    # event identifiers are the same
    create(:cycle_analytics_group_stage, name: 'Stage A1', group: group, start_event_identifier: :merge_request_created, end_event_identifier: :merge_request_merged)
    create(:cycle_analytics_group_stage, name: 'Stage A2', group: sub_group, start_event_identifier: :merge_request_created, end_event_identifier: :merge_request_merged)
    create(:cycle_analytics_group_stage, name: 'Stage A3', group: sub_group, start_event_identifier: :merge_request_created, end_event_identifier: :merge_request_merged)
    create(:cycle_analytics_group_stage, name: 'Stage B1', group: group, start_event_identifier: :merge_request_created, end_event_identifier: :merge_request_closed)
  end

  it 'returns distinct stages by the event identifiers' do
    stages = described_class.distinct_stages_within_hierarchy(group).to_a

    expected_event_pairs = [
      %w[merge_request_created merge_request_merged],
      %w[merge_request_created merge_request_closed]
    ]

    current_event_pairs = stages.map do |stage|
      [stage.start_event_identifier, stage.end_event_identifier]
    end

    # DISTINCT ON without ORDER BY does not guarantee row order, so compare
    # the pairs without relying on ordering.
    expect(current_event_pairs).to match_array(expected_event_pairs)
  end
end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Analytics::CycleAnalytics::DataLoaderService do
  let_it_be_with_refind(:top_level_group) { create(:group) }

  describe 'validations' do
    let(:group) { top_level_group }
    let(:model) { Issue }

    subject(:service_response) { described_class.new(group: group, model: model).execute }

    context 'when wrong model is passed' do
      let(:model) { Project }

      it 'returns service error response' do
        expect(service_response).to be_error
        expect(service_response.payload[:reason]).to eq(:invalid_model)
      end
    end

    context 'when license is missing' do
      it 'returns service error response' do
        expect(service_response).to be_error
        expect(service_response.payload[:reason]).to eq(:missing_license)
      end
    end

    context 'when sub-group is given' do
      let(:group) { create(:group, parent: top_level_group) }

      it 'returns service error response' do
        stub_licensed_features(cycle_analytics_for_groups: true)

        expect(service_response).to be_error
        expect(service_response.payload[:reason]).to eq(:requires_top_level_group)
      end
    end
  end

  describe 'data loading into stage tables' do
    # Hierarchy under test: top_level_group -> sub_group; other_group is
    # outside the hierarchy and must never contribute records.
    let_it_be(:sub_group) { create(:group, parent: top_level_group) }
    let_it_be(:other_group) { create(:group) }
    let_it_be(:project1) { create(:project, :repository, group: top_level_group) }
    let_it_be(:project2) { create(:project, :repository, group: sub_group) }
    let_it_be(:project_outside) { create(:project, group: other_group) }

    let_it_be(:stage1) do
      create(:cycle_analytics_group_stage, {
        group: sub_group,
        start_event_identifier: :merge_request_created,
        end_event_identifier: :merge_request_merged
      })
    end

    let_it_be(:stage2) do
      create(:cycle_analytics_group_stage, {
        group: top_level_group,
        start_event_identifier: :issue_created,
        end_event_identifier: :issue_closed
      })
    end

    let_it_be(:stage_in_other_group) do
      create(:cycle_analytics_group_stage, {
        group: other_group,
        start_event_identifier: :issue_created,
        end_event_identifier: :issue_closed
      })
    end

    before do
      stub_licensed_features(cycle_analytics_for_groups: true)
    end

    it 'loads nothing for Issue model' do
      service_response = described_class.new(group: top_level_group, model: Issue).execute

      expect(service_response).to be_success
      expect(service_response.payload[:reason]).to eq(:model_processed)
      expect(Analytics::CycleAnalytics::IssueStageEvent.count).to eq(0)
    end

    it 'loads nothing for MergeRequest model' do
      service_response = described_class.new(group: top_level_group, model: MergeRequest).execute

      expect(service_response).to be_success
      expect(service_response.payload[:reason]).to eq(:model_processed)
      expect(Analytics::CycleAnalytics::MergeRequestStageEvent.count).to eq(0)
    end

    context 'when MergeRequest data is present' do
      let_it_be(:mr1) { create(:merge_request, :unique_branches, :with_merged_metrics, updated_at: 2.days.ago, source_project: project1) }
      let_it_be(:mr2) { create(:merge_request, :unique_branches, :with_merged_metrics, updated_at: 5.days.ago, source_project: project1) }
      let_it_be(:mr3) { create(:merge_request, :unique_branches, :with_merged_metrics, updated_at: 10.days.ago, source_project: project2) }

      it 'inserts stage records' do
        expected_data = [mr1, mr2, mr3].map do |mr|
          mr.reload # reload timestamps from the DB
          [
            mr.id,
            mr.project.parent_id,
            mr.project_id,
            mr.created_at,
            mr.metrics.merged_at
          ]
        end

        described_class.new(group: top_level_group, model: MergeRequest).execute

        events = Analytics::CycleAnalytics::MergeRequestStageEvent.all
        event_data = events.map do |event|
          [
            event.merge_request_id,
            event.group_id,
            event.project_id,
            event.start_event_timestamp,
            event.end_event_timestamp
          ]
        end

        expect(event_data.sort).to match_array(expected_data.sort)
      end

      it 'inserts records with record.updated_at < updated_at_before' do
        described_class.new(group: top_level_group, model: MergeRequest, updated_at_before: 7.days.ago).execute

        # Only mr3 (updated 10 days ago) falls before the cut-off.
        mr_ids = Analytics::CycleAnalytics::MergeRequestStageEvent.pluck(:merge_request_id)

        expect(mr_ids).to match_array([mr3.id])
      end

      it 'inserts nothing for group outside of the hierarchy' do
        mr = create(:merge_request, :unique_branches, :with_merged_metrics, source_project: project_outside)

        described_class.new(group: top_level_group, model: MergeRequest).execute

        record_count = Analytics::CycleAnalytics::MergeRequestStageEvent.where(merge_request_id: mr.id).count

        expect(record_count).to eq(0)
      end

      context 'when all records are processed' do
        it 'finishes with model_processed reason' do
          service_response = described_class.new(group: top_level_group, model: MergeRequest).execute

          expect(service_response).to be_success
          expect(service_response.payload[:reason]).to eq(:model_processed)
        end
      end

      context 'when MAX_UPSERT_COUNT is reached' do
        it 'finishes with limit_reached reason' do
          # Force the early exit after the first single-record batch.
          stub_const('Analytics::CycleAnalytics::DataLoaderService::MAX_UPSERT_COUNT', 1)
          stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)

          service_response = described_class.new(group: top_level_group, model: MergeRequest).execute

          expect(service_response).to be_success
          expect(service_response.payload[:reason]).to eq(:limit_reached)
        end
      end

      context 'when cursor is given' do
        it 'continues processing the records from the cursor' do
          stub_const('Analytics::CycleAnalytics::DataLoaderService::MAX_UPSERT_COUNT', 1)
          stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)

          service_response = described_class.new(group: top_level_group, model: MergeRequest).execute
          cursor = service_response.payload[:cursor]

          expect(Analytics::CycleAnalytics::MergeRequestStageEvent.count).to eq(1)

          described_class.new(group: top_level_group, model: MergeRequest, cursor: cursor).execute

          expect(Analytics::CycleAnalytics::MergeRequestStageEvent.count).to eq(2)
        end
      end
    end

    context 'when Issue data is present' do
      let_it_be(:issue1) { create(:issue, project: project1, closed_at: Time.current) }
      let_it_be(:issue2) { create(:issue, project: project1, closed_at: Time.current) }
      let_it_be(:issue3) { create(:issue, project: project2, closed_at: Time.current) }

      it 'inserts stage records' do
        expected_data = [issue1, issue2, issue3].map do |issue|
          issue.reload
          [
            issue.id,
            issue.project.parent_id,
            issue.project_id,
            issue.created_at,
            issue.closed_at
          ]
        end

        described_class.new(group: top_level_group, model: Issue).execute

        events = Analytics::CycleAnalytics::IssueStageEvent.all
        event_data = events.map do |event|
          [
            event.issue_id,
            event.group_id,
            event.project_id,
            event.start_event_timestamp,
            event.end_event_timestamp
          ]
        end

        expect(event_data.sort).to match_array(expected_data.sort)
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Analytics::CycleAnalytics::GroupDataLoaderWorker do
  let(:group_id) { nil }
  let(:model_klass) { 'Issue' }
  let(:updated_at_before) { Time.current }
  let(:worker) { described_class.new }

  def run_worker
    worker.perform(group_id, model_klass, {}, updated_at_before)
  end

  context 'when non-existing group is given' do
    let(:group_id) { non_existing_record_id }

    it 'does nothing' do
      expect(Analytics::CycleAnalytics::DataLoaderService).not_to receive(:new)

      expect(run_worker).to eq(nil)
    end
  end

  context 'when invalid model klass is given' do
    let(:group_id) { create(:group).id }
    let(:model_klass) { 'unknown' }

    it 'does nothing' do
      expect(Analytics::CycleAnalytics::DataLoaderService).not_to receive(:new)

      expect(run_worker).to eq(nil)
    end
  end

  context 'when the data loader returns error response' do
    # Fix: pass the group's id (not the Group object) to `perform`, matching
    # the worker's interface and the other contexts in this spec.
    let(:group_id) { create(:group).id }

    it 'logs the error reason' do
      expect(worker).to receive(:log_extra_metadata_on_done).with(:error_reason, :missing_license)

      run_worker
    end
  end

  context 'when DataLoaderService is invoked successfully' do
    let_it_be(:group) { create(:group) }
    let_it_be(:project) { create(:project, group: group) }
    let_it_be(:group_id) { group.id }
    let_it_be(:stage) { create(:cycle_analytics_group_stage, group: group, start_event_identifier: :issue_created, end_event_identifier: :issue_closed) }
    let_it_be(:issue1) { create(:issue, project: project, updated_at: 5.days.ago) }
    let_it_be(:issue2) { create(:issue, project: project, updated_at: 10.days.ago) }

    before do
      stub_licensed_features(cycle_analytics_for_groups: true)
    end

    context 'when limit_reached status is returned' do
      before do
        stub_const('Analytics::CycleAnalytics::DataLoaderService::BATCH_LIMIT', 1)
        stub_const('Analytics::CycleAnalytics::DataLoaderService::MAX_UPSERT_COUNT', 1)
      end

      it 'schedules a new job with the returned cursor' do
        expect(described_class).to receive(:perform_in).with(
          2.minutes,
          group_id,
          'Issue',
          hash_including('id' => issue2.id.to_s), # cursor, the next job continues the processing after this record
          updated_at_before
        )

        run_worker
      end
    end

    context 'when model_processed status is returned' do
      context 'when there is a next model to process' do
        it 'schedules a new job with the MergeRequest model' do
          expect(described_class).to receive(:perform_in).with(
            2.minutes,
            group_id,
            'MergeRequest',
            {},
            updated_at_before
          )

          run_worker # Issue related records are processed
        end
      end

      context 'when there is no next model to process' do
        let(:model_klass) { 'MergeRequest' }

        it 'stops the execution' do
          expect(described_class).not_to receive(:perform_in)

          run_worker # after this call, there is no more records to be processed
        end
      end
    end
  end
end
......@@ -74,7 +74,6 @@ RSpec.describe Gitlab::Pagination::Keyset::Iterator do
expect(loaded_records).to eq(project.issues.order(custom_reorder).take(2))
# continuing the iteration
new_iterator = described_class.new(**iterator_params.merge(cursor: cursor))
new_iterator.each_batch(of: 2) do |relation|
loaded_records.concat(relation.to_a)
......
......@@ -8,4 +8,6 @@ RSpec.describe Analytics::CycleAnalytics::IssueStageEvent do
it { is_expected.to validate_presence_of(:group_id) }
it { is_expected.to validate_presence_of(:project_id) }
it { is_expected.to validate_presence_of(:start_event_timestamp) }
it_behaves_like 'StageEventModel'
end
......@@ -8,4 +8,6 @@ RSpec.describe Analytics::CycleAnalytics::MergeRequestStageEvent do
it { is_expected.to validate_presence_of(:group_id) }
it { is_expected.to validate_presence_of(:project_id) }
it { is_expected.to validate_presence_of(:start_event_timestamp) }
it_behaves_like 'StageEventModel'
end
# frozen_string_literal: true

RSpec.shared_examples 'StageEventModel' do
  describe '.upsert_data' do
    let(:time) { Time.parse(Time.current.to_s(:db)) } # truncating the timestamp so we can compare it with the timestamp loaded from the DB
    let(:input_data) do
      [
        {
          stage_event_hash_id: 1,
          issuable_id: 2,
          group_id: 3,
          project_id: 4,
          author_id: 5,
          milestone_id: 6,
          start_event_timestamp: time,
          end_event_timestamp: time
        },
        {
          stage_event_hash_id: 7,
          issuable_id: 8,
          group_id: 10,
          project_id: 11,
          author_id: 12,
          milestone_id: 13,
          start_event_timestamp: time,
          end_event_timestamp: time
        }
      ]
    end

    # Fix: this list must mirror the key order of the input hashes above,
    # because the last example compares `input_data.map(&:values)`
    # positionally. The previous milestone_id/author_id order only passed
    # when upsert_data wrote each of those values into the other column.
    let(:column_order) do
      [
        :stage_event_hash_id,
        described_class.issuable_id_column,
        :group_id,
        :project_id,
        :author_id,
        :milestone_id,
        :start_event_timestamp,
        :end_event_timestamp
      ]
    end

    subject(:upsert_data) { described_class.upsert_data(input_data) }

    it 'inserts the data' do
      upsert_data

      expect(described_class.count).to eq(input_data.count)
    end

    it 'does not produce duplicate rows' do
      2.times { upsert_data }

      expect(described_class.count).to eq(input_data.count)
    end

    it 'inserts the data correctly' do
      upsert_data

      output_data = described_class.all.map do |record|
        column_order.map { |column| record[column] }
      end.sort

      expect(input_data.map(&:values).sort).to eq(output_data)
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment