Commit e96ddd8c authored by Fabio Pitino's avatar Fabio Pitino

Use CounterAttribute for build_artifacts_size

Make increment of build_artifacts_size use CounterAttribute
concern behind feature flag.
parent 26dec3e5
......@@ -20,6 +20,14 @@
# To increment the counter we can use the method:
# delayed_increment_counter(:commit_count, 3)
#
# It is possible to register callbacks to be executed after increments have
# been flushed to the database. Callbacks are not executed if there are no increments
# to flush.
#
# counter_attribute_after_flush do |statistic|
# Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id)
# end
#
module CounterAttribute
extend ActiveSupport::Concern
extend AfterCommitQueue
......@@ -48,6 +56,15 @@ module CounterAttribute
def counter_attributes
@counter_attributes ||= Set.new
end
def after_flush_callbacks
@after_flush_callbacks ||= []
end
# perform registered callbacks after increments have been flushed to the database
def counter_attribute_after_flush(&callback)
after_flush_callbacks << callback
end
end
# This method must only be called by FlushCounterIncrementsWorker
......@@ -75,6 +92,8 @@ module CounterAttribute
unsafe_update_counters(id, attribute => increment_value)
redis_state { |redis| redis.del(flushed_key) }
end
execute_after_flush_callbacks
end
end
......@@ -108,13 +127,13 @@ module CounterAttribute
counter_key(attribute) + ':lock'
end
private
def counter_attribute_enabled?(attribute)
Feature.enabled?(:efficient_counter_attribute, project) &&
self.class.counter_attributes.include?(attribute)
end
private
def steal_increments(increment_key, flushed_key)
redis_state do |redis|
redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key])
......@@ -129,6 +148,12 @@ module CounterAttribute
self.class.update_counters(id, increments)
end
def execute_after_flush_callbacks
self.class.after_flush_callbacks.each do |callback|
callback.call(self)
end
end
def redis_state(&block)
Gitlab::Redis::SharedState.with(&block)
end
......
......@@ -80,10 +80,7 @@ module UpdateProjectStatistics
run_after_commit do
ProjectStatistics.increment_statistic(
project_id, self.class.project_statistics_name, delta)
Namespaces::ScheduleAggregationWorker.perform_async(
project.namespace_id)
project, self.class.project_statistics_name, delta)
end
end
end
......
......@@ -2,6 +2,7 @@
class ProjectStatistics < ApplicationRecord
include AfterCommitQueue
include CounterAttribute
belongs_to :project
belongs_to :namespace
......@@ -9,6 +10,13 @@ class ProjectStatistics < ApplicationRecord
default_value_for :wiki_size, 0
default_value_for :snippets_size, 0
counter_attribute :build_artifacts_size
counter_attribute :storage_size
counter_attribute_after_flush do |project_statistic|
Namespaces::ScheduleAggregationWorker.perform_async(project_statistic.namespace_id)
end
before_save :update_storage_size
COLUMNS_TO_REFRESH = [:repository_size, :wiki_size, :lfs_objects_size, :commit_count, :snippets_size].freeze
......@@ -96,12 +104,27 @@ class ProjectStatistics < ApplicationRecord
# Additional columns are updated depending on key => [columns], which allows
# to update statistics which are and also those which aren't included in storage_size
# or any other additional summary column in the future.
def self.increment_statistic(project_id, key, amount)
def self.increment_statistic(project, key, amount)
raise ArgumentError, "Cannot increment attribute: #{key}" unless INCREMENTABLE_COLUMNS.key?(key)
return if amount == 0
where(project_id: project_id)
.columns_to_increment(key, amount)
project.statistics.try do |project_statistics|
if project_statistics.counter_attribute_enabled?(key)
statistics_to_increment = [key] + INCREMENTABLE_COLUMNS[key].to_a
statistics_to_increment.each do |statistic|
project_statistics.delayed_increment_counter(statistic, amount)
end
else
legacy_increment_statistic(project, key, amount)
end
end
end
def self.legacy_increment_statistic(project, key, amount)
where(project_id: project.id).columns_to_increment(key, amount)
Namespaces::ScheduleAggregationWorker.perform_async( # rubocop: disable CodeReuse/Worker
project.namespace_id)
end
def self.columns_to_increment(key, amount)
......
......@@ -19,7 +19,7 @@ RSpec.describe Ci::JobArtifact do
it_behaves_like 'having unique enum values'
it_behaves_like 'UpdateProjectStatistics' do
it_behaves_like 'UpdateProjectStatistics', :with_counter_attribute do
let_it_be(:job, reload: true) { create(:ci_build) }
subject { build(:ci_job_artifact, :archive, job: job, size: 107464) }
......
......@@ -12,6 +12,36 @@ RSpec.describe CounterAttribute, :counter_attribute, :clean_gitlab_redis_shared_
let(:model) { CounterAttributeModel.find(project_statistics.id) }
end
describe 'after_flush callbacks' do
let(:attribute) { model.class.counter_attributes.first}
subject { model.flush_increments_to_database!(attribute) }
it 'has registered callbacks' do # defined in :counter_attribute RSpec tag
expect(model.class.after_flush_callbacks.size).to eq(1)
end
context 'when there are increments to flush' do
before do
model.delayed_increment_counter(attribute, 10)
end
it 'executes the callbacks' do
subject
expect(model.flushed).to be_truthy
end
end
context 'when there are no increments to flush' do
it 'does not execute the callbacks' do
subject
expect(model.flushed).to be_nil
end
end
end
describe '.steal_increments' do
let(:increment_key) { 'counters:Model:123:attribute' }
let(:flushed_key) { 'counter:Model:123:attribute:flushed' }
......
......@@ -324,22 +324,51 @@ RSpec.describe ProjectStatistics do
describe '.increment_statistic' do
shared_examples 'a statistic that increases storage_size' do
it 'increases the statistic by that amount' do
expect { described_class.increment_statistic(project.id, stat, 13) }
expect { described_class.increment_statistic(project, stat, 13) }
.to change { statistics.reload.send(stat) || 0 }
.by(13)
end
it 'increases also storage size by that amount' do
expect { described_class.increment_statistic(project.id, stat, 20) }
expect { described_class.increment_statistic(project, stat, 20) }
.to change { statistics.reload.storage_size }
.by(20)
end
end
shared_examples 'a statistic that increases storage_size asynchronously' do
it 'stores the increment temporarily in Redis', :clean_gitlab_redis_shared_state do
described_class.increment_statistic(project, stat, 13)
Gitlab::Redis::SharedState.with do |redis|
increment = redis.get(statistics.counter_key(stat))
expect(increment.to_i).to eq(13)
end
end
it 'schedules a worker to update the statistic and storage_size async' do
expect(FlushCounterIncrementsWorker)
.to receive(:perform_in)
.with(CounterAttribute::WORKER_DELAY, described_class.name, statistics.id, stat)
expect(FlushCounterIncrementsWorker)
.to receive(:perform_in)
.with(CounterAttribute::WORKER_DELAY, described_class.name, statistics.id, :storage_size)
described_class.increment_statistic(project, stat, 20)
end
end
context 'when adjusting :build_artifacts_size' do
let(:stat) { :build_artifacts_size }
it_behaves_like 'a statistic that increases storage_size'
it_behaves_like 'a statistic that increases storage_size asynchronously'
it_behaves_like 'a statistic that increases storage_size' do
before do
stub_feature_flags(efficient_counter_attribute: false)
end
end
end
context 'when adjusting :pipeline_artifacts_size' do
......
......@@ -9,6 +9,12 @@ RSpec.configure do |config|
counter_attribute :build_artifacts_size
counter_attribute :commit_count
attr_accessor :flushed
counter_attribute_after_flush do |subject|
subject.flushed = true
end
end
end
end
# frozen_string_literal: true
RSpec.shared_examples 'UpdateProjectStatistics' do
RSpec.shared_examples 'UpdateProjectStatistics' do |with_counter_attribute|
let(:project) { subject.project }
let(:project_statistics_name) { described_class.project_statistics_name }
let(:statistic_attribute) { described_class.statistic_attribute }
......@@ -13,108 +13,230 @@ RSpec.shared_examples 'UpdateProjectStatistics' do
subject.read_attribute(statistic_attribute).to_i
end
it { is_expected.to be_new_record }
def read_pending_increment
Gitlab::Redis::SharedState.with do |redis|
key = project.statistics.counter_key(project_statistics_name)
redis.get(key).to_i
end
end
context 'when creating' do
it 'updates the project statistics' do
delta0 = reload_stat
it { is_expected.to be_new_record }
subject.save!
context 'when feature flag efficient_counter_attribute is disabled' do
before do
stub_feature_flags(efficient_counter_attribute: false)
end
delta1 = reload_stat
context 'when creating' do
it 'updates the project statistics' do
delta0 = reload_stat
expect(delta1).to eq(delta0 + read_attribute)
expect(delta1).to be > delta0
end
subject.save!
it 'schedules a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.to receive(:perform_async).once
delta1 = reload_stat
subject.save!
end
end
expect(delta1).to eq(delta0 + read_attribute)
expect(delta1).to be > delta0
end
context 'when updating' do
let(:delta) { 42 }
it 'schedules a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.to receive(:perform_async).once
before do
subject.save!
subject.save!
end
end
it 'updates project statistics' do
expect(ProjectStatistics)
.to receive(:increment_statistic)
.and_call_original
context 'when updating' do
let(:delta) { 42 }
subject.write_attribute(statistic_attribute, read_attribute + delta)
before do
subject.save!
end
expect { subject.save! }
.to change { reload_stat }
.by(delta)
end
it 'updates project statistics' do
expect(ProjectStatistics)
.to receive(:increment_statistic)
.and_call_original
it 'schedules a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.to receive(:perform_async).once
subject.write_attribute(statistic_attribute, read_attribute + delta)
subject.write_attribute(statistic_attribute, read_attribute + delta)
subject.save!
end
expect { subject.save! }
.to change { reload_stat }
.by(delta)
end
it 'avoids N + 1 queries' do
subject.write_attribute(statistic_attribute, read_attribute + delta)
it 'schedules a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.to receive(:perform_async).once
control_count = ActiveRecord::QueryRecorder.new do
subject.write_attribute(statistic_attribute, read_attribute + delta)
subject.save!
end
subject.write_attribute(statistic_attribute, read_attribute + delta)
it 'avoids N + 1 queries' do
subject.write_attribute(statistic_attribute, read_attribute + delta)
expect do
subject.save!
end.not_to exceed_query_limit(control_count)
end
end
control_count = ActiveRecord::QueryRecorder.new do
subject.save!
end
context 'when destroying' do
before do
subject.save!
subject.write_attribute(statistic_attribute, read_attribute + delta)
expect do
subject.save!
end.not_to exceed_query_limit(control_count)
end
end
it 'updates the project statistics' do
delta0 = reload_stat
context 'when destroying' do
before do
subject.save!
end
subject.destroy!
it 'updates the project statistics' do
delta0 = reload_stat
delta1 = reload_stat
subject.destroy!
expect(delta1).to eq(delta0 - read_attribute)
expect(delta1).to be < delta0
end
delta1 = reload_stat
expect(delta1).to eq(delta0 - read_attribute)
expect(delta1).to be < delta0
end
it 'schedules a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.to receive(:perform_async).once
it 'schedules a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.to receive(:perform_async).once
subject.destroy!
end
context 'when it is destroyed from the project level' do
it 'does not update the project statistics' do
expect(ProjectStatistics)
.not_to receive(:increment_statistic)
project.update!(pending_delete: true)
project.destroy!
end
it 'does not schedule a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.not_to receive(:perform_async)
subject.destroy!
project.update!(pending_delete: true)
project.destroy!
end
end
end
end
context 'when it is destroyed from the project level' do
it 'does not update the project statistics' do
expect(ProjectStatistics)
.not_to receive(:increment_statistic)
def expect_flush_counter_increments_worker_performed
expect(FlushCounterIncrementsWorker)
.to receive(:perform_in)
.with(CounterAttribute::WORKER_DELAY, project.statistics.class.name, project.statistics.id, project_statistics_name)
expect(FlushCounterIncrementsWorker)
.to receive(:perform_in)
.with(CounterAttribute::WORKER_DELAY, project.statistics.class.name, project.statistics.id, :storage_size)
project.update!(pending_delete: true)
project.destroy!
yield
# simulate worker running now
expect(Namespaces::ScheduleAggregationWorker).to receive(:perform_async)
FlushCounterIncrementsWorker.new.perform(project.statistics.class.name, project.statistics.id, project_statistics_name)
end
if with_counter_attribute
context 'when statistic is a counter attribute', :clean_gitlab_redis_shared_state do
context 'when creating' do
it 'stores pending increments for async update' do
initial_stat = reload_stat
expected_increment = read_attribute
expect_flush_counter_increments_worker_performed do
subject.save!
expect(read_pending_increment).to eq(expected_increment)
expect(expected_increment).to be > initial_stat
expect(expected_increment).to be_positive
end
end
end
it 'does not schedule a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.not_to receive(:perform_async)
context 'when updating' do
let(:delta) { 42 }
before do
subject.save!
redis_shared_state_cleanup!
end
it 'stores pending increments for async update' do
expect(ProjectStatistics)
.to receive(:increment_statistic)
.and_call_original
subject.write_attribute(statistic_attribute, read_attribute + delta)
expect_flush_counter_increments_worker_performed do
subject.save!
expect(read_pending_increment).to eq(delta)
end
end
it 'avoids N + 1 queries' do
subject.write_attribute(statistic_attribute, read_attribute + delta)
control_count = ActiveRecord::QueryRecorder.new do
subject.save!
end
subject.write_attribute(statistic_attribute, read_attribute + delta)
expect do
subject.save!
end.not_to exceed_query_limit(control_count)
end
end
project.update!(pending_delete: true)
project.destroy!
context 'when destroying' do
before do
subject.save!
redis_shared_state_cleanup!
end
it 'stores pending increment for async update' do
initial_stat = reload_stat
expected_increment = -read_attribute
expect_flush_counter_increments_worker_performed do
subject.destroy!
expect(read_pending_increment).to eq(expected_increment)
expect(expected_increment).to be < initial_stat
expect(expected_increment).to be_negative
end
end
context 'when it is destroyed from the project level' do
it 'does not update the project statistics' do
expect(ProjectStatistics)
.not_to receive(:increment_statistic)
project.update!(pending_delete: true)
project.destroy!
end
it 'does not schedule a namespace statistics worker' do
expect(Namespaces::ScheduleAggregationWorker)
.not_to receive(:perform_async)
project.update!(pending_delete: true)
project.destroy!
end
end
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment