Commit 36401b43 authored by Fabio Pitino's avatar Fabio Pitino

Efficiently increment Active Record counters using Redis

* Add increments to a key in Redis
* Flush increments asynchronously to the database
* Add unit tests for Lua script
parent 7a89ded4
# frozen_string_literal: true
# Add capabilities to increment a numeric model attribute efficiently by
# using Redis and flushing the increments asynchronously to the database
# after a period of time (10 minutes).
# When an attribute is incremented by a value, the increment is added
# to a Redis key. Then, FlushCounterIncrementsWorker will execute
# `flush_increments_to_database!` which removes increments from Redis for a
# given model attribute and updates the values in the database.
#
# @example:
#
# class ProjectStatistics
# include CounterAttribute
#
# counter_attribute :commit_count
# counter_attribute :storage_size
# end
#
# To increment the counter we can use the method:
# delayed_increment_counter(:commit_count, 3)
#
module CounterAttribute
extend ActiveSupport::Concern
extend AfterCommitQueue
include Gitlab::ExclusiveLeaseHelpers
LUA_STEAL_INCREMENT_SCRIPT = <<~EOS.freeze
local increment_key, flushed_key = KEYS[1], KEYS[2]
local increment_value = redis.call("get", increment_key) or 0
local flushed_value = redis.call("incrby", flushed_key, increment_value)
if flushed_value == 0 then
redis.call("del", increment_key, flushed_key)
else
redis.call("del", increment_key)
end
return flushed_value
EOS
WORKER_DELAY = 10.minutes
WORKER_LOCK_TTL = 10.minutes
class_methods do
def counter_attribute(attribute)
counter_attributes << attribute
end
def counter_attributes
@counter_attributes ||= Set.new
end
end
# This method must only be called by FlushCounterIncrementsWorker
# because it should run asynchronously and with exclusive lease.
# This will
# 1. temporarily move the pending increment for a given attribute
# to a relative "flushed" Redis key, delete the increment key and return
# the value. If new increments are performed at this point, the increment
# key is recreated as part of `delayed_increment_counter`.
# The "flushed" key is used to ensure that we can keep incrementing
# counters in Redis while flushing existing values.
# 2. then the value is used to update the counter in the database.
# 3. finally the "flushed" key is deleted.
def flush_increments_to_database!(attribute)
lock_key = counter_lock_key(attribute)
with_exclusive_lease(lock_key) do
increment_key = counter_key(attribute)
flushed_key = counter_flushed_key(attribute)
increment_value = steal_increments(increment_key, flushed_key)
next if increment_value == 0
transaction do
unsafe_update_counters(id, attribute => increment_value)
redis_state { |redis| redis.del(flushed_key) }
end
end
end
def delayed_increment_counter(attribute, increment)
return if increment == 0
run_after_commit_or_now do
if counter_attribute_enabled?(attribute)
redis_state do |redis|
redis.incrby(counter_key(attribute), increment)
end
FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, self.class.name, self.id, attribute)
else
legacy_increment!(attribute, increment)
end
end
true
end
def counter_key(attribute)
"project:{#{project_id}}:counters:#{self.class}:#{id}:#{attribute}"
end
def counter_flushed_key(attribute)
counter_key(attribute) + ':flushed'
end
def counter_lock_key(attribute)
counter_key(attribute) + ':lock'
end
private
def counter_attribute_enabled?(attribute)
Feature.enabled?(:efficient_counter_attribute, project) &&
self.class.counter_attributes.include?(attribute)
end
def steal_increments(increment_key, flushed_key)
redis_state do |redis|
redis.eval(LUA_STEAL_INCREMENT_SCRIPT, keys: [increment_key, flushed_key])
end
end
def legacy_increment!(attribute, increment)
increment!(attribute, increment)
end
def unsafe_update_counters(id, increments)
self.class.update_counters(id, increments)
end
def redis_state(&block)
Gitlab::Redis::SharedState.with(&block)
end
def with_exclusive_lease(lock_key)
in_lock(lock_key, ttl: WORKER_LOCK_TTL) do
yield
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# a worker is already updating the counters
end
end
......@@ -1340,6 +1340,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: flush_counter_increments
:feature_category: :not_owned
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: git_garbage_collect
:feature_category: :gitaly
:has_external_dependencies:
......
# frozen_string_literal: true
# Invoked by CounterAttribute concern when incrementing counter
# attributes. The method `flush_increments_to_database!` that
# this worker uses is itself idempotent as it runs with exclusive
# lease to ensure that only one instance at the time can flush
# increments from Redis to the database.
class FlushCounterIncrementsWorker
include ApplicationWorker
feature_category_not_owned!
urgency :low
deduplicate :until_executing, including_scheduled: true
idempotent!
def perform(model_name, model_id, attribute)
return unless self.class.const_defined?(model_name)
model_class = model_name.constantize
model = model_class.find_by_id(model_id)
return unless model
model.flush_increments_to_database!(attribute)
end
end
---
title: Add mechanism that efficiently increments ActiveRecord counters using Redis
merge_request: 35878
author:
type: performance
......@@ -108,6 +108,8 @@
- 1
- - file_hook
- 1
- - flush_counter_increments
- 1
- - gcp_cluster
- 1
- - geo
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe CounterAttribute, :counter_attribute, :clean_gitlab_redis_shared_state do
using RSpec::Parameterized::TableSyntax
let(:project_statistics) { create(:project_statistics) }
let(:model) { CounterAttributeModel.find(project_statistics.id) }
it_behaves_like CounterAttribute, [:build_artifacts_size, :commit_count] do
let(:model) { CounterAttributeModel.find(project_statistics.id) }
end
describe '.steal_increments' do
let(:increment_key) { 'counters:Model:123:attribute' }
let(:flushed_key) { 'counter:Model:123:attribute:flushed' }
subject { model.send(:steal_increments, increment_key, flushed_key) }
where(:increment, :flushed, :result, :flushed_key_present) do
nil | nil | 0 | false
nil | 0 | 0 | false
0 | 0 | 0 | false
1 | 0 | 1 | true
1 | nil | 1 | true
1 | 1 | 2 | true
1 | -2 | -1 | true
-1 | 1 | 0 | false
end
with_them do
before do
Gitlab::Redis::SharedState.with do |redis|
redis.set(increment_key, increment) if increment
redis.set(flushed_key, flushed) if flushed
end
end
it { is_expected.to eq(result) }
it 'drops the increment key and creates the flushed key if it does not exist' do
subject
Gitlab::Redis::SharedState.with do |redis|
expect(redis.exists(increment_key)).to be_falsey
expect(redis.exists(flushed_key)).to eq(flushed_key_present)
end
end
end
end
end
......@@ -328,8 +328,8 @@ RSpec.describe ProjectStatistics do
it 'increases also storage size by that amount' do
expect { described_class.increment_statistic(project.id, stat, 20) }
.to change { statistics.reload.storage_size }
.by(20)
.to change { statistics.reload.storage_size }
.by(20)
end
end
......
# frozen_string_literal: true
RSpec.configure do |config|
config.before(:each, :counter_attribute) do
stub_const('CounterAttributeModel', Class.new(ProjectStatistics))
CounterAttributeModel.class_eval do
include CounterAttribute
counter_attribute :build_artifacts_size
counter_attribute :commit_count
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.shared_examples_for CounterAttribute do |counter_attributes|
it 'defines a Redis counter_key' do
expect(model.counter_key(:counter_name))
.to eq("project:{#{model.project_id}}:counters:CounterAttributeModel:#{model.id}:counter_name")
end
it 'defines a method to store counters' do
expect(model.class.counter_attributes.to_a).to eq(counter_attributes)
end
counter_attributes.each do |attribute|
describe attribute do
describe '#delayed_increment_counter', :redis do
let(:increment) { 10 }
subject { model.delayed_increment_counter(attribute, increment) }
context 'when attribute is a counter attribute' do
where(:increment) { [10, -3] }
with_them do
it 'increments the counter in Redis' do
subject
Gitlab::Redis::SharedState.with do |redis|
counter = redis.get(model.counter_key(attribute))
expect(counter).to eq(increment.to_s)
end
end
it 'does not increment the counter for the record' do
expect { subject }.not_to change { model.reset.read_attribute(attribute) }
end
it 'schedules a worker to flush counter increments asynchronously' do
expect(FlushCounterIncrementsWorker).to receive(:perform_in)
.with(CounterAttribute::WORKER_DELAY, model.class.name, model.id, attribute)
.and_call_original
subject
end
end
context 'when increment is 0' do
let(:increment) { 0 }
it 'does nothing' do
expect(FlushCounterIncrementsWorker).not_to receive(:perform_in)
expect(model).not_to receive(:update!)
subject
end
end
end
context 'when attribute is not a counter attribute' do
it 'delegates to ActiveRecord update!' do
expect { model.delayed_increment_counter(:unknown_attribute, 10) }
.to raise_error(ActiveModel::MissingAttributeError)
end
end
context 'when feature flag is disabled' do
before do
stub_feature_flags(efficient_counter_attribute: false)
end
it 'delegates to ActiveRecord update!' do
expect { subject }
.to change { model.reset.read_attribute(attribute) }.by(increment)
end
it 'does not increment the counter in Redis' do
subject
Gitlab::Redis::SharedState.with do |redis|
counter = redis.get(model.counter_key(attribute))
expect(counter).to be_nil
end
end
end
end
end
end
describe '.flush_increments_to_database!', :redis do
let(:incremented_attribute) { counter_attributes.first }
subject { model.flush_increments_to_database!(incremented_attribute) }
it 'obtains an exclusive lease during processing' do
expect(model)
.to receive(:in_lock)
.with(model.counter_lock_key(incremented_attribute), ttl: described_class::WORKER_LOCK_TTL)
.and_call_original
subject
end
context 'when there is a counter to flush' do
before do
model.delayed_increment_counter(incremented_attribute, 10)
model.delayed_increment_counter(incremented_attribute, -3)
end
it 'updates the record' do
expect { subject }.to change { model.reset.read_attribute(incremented_attribute) }.by(7)
end
it 'removes the increment entry from Redis' do
Gitlab::Redis::SharedState.with do |redis|
key_exists = redis.exists(model.counter_key(incremented_attribute))
expect(key_exists).to be_truthy
end
subject
Gitlab::Redis::SharedState.with do |redis|
key_exists = redis.exists(model.counter_key(incremented_attribute))
expect(key_exists).to be_falsey
end
end
end
context 'when there are no counters to flush' do
context 'when there are no counters in the relative :flushed key' do
it 'does not change the record' do
expect { subject }.not_to change { model.reset.attributes }
end
end
# This can be the case where updating counters in the database fails with error
# and retrying the worker will retry flushing the counters but the main key has
# disappeared and the increment has been moved to the "<...>:flushed" key.
context 'when there are counters in the relative :flushed key' do
before do
Gitlab::Redis::SharedState.with do |redis|
redis.incrby(model.counter_flushed_key(incremented_attribute), 10)
end
end
it 'updates the record' do
expect { subject }.to change { model.reset.read_attribute(incremented_attribute) }.by(10)
end
it 'deletes the relative :flushed key' do
subject
Gitlab::Redis::SharedState.with do |redis|
key_exists = redis.exists(model.counter_flushed_key(incremented_attribute))
expect(key_exists).to be_falsey
end
end
end
end
context 'when deleting :flushed key fails' do
before do
Gitlab::Redis::SharedState.with do |redis|
redis.incrby(model.counter_flushed_key(incremented_attribute), 10)
expect(redis).to receive(:del).and_raise('could not delete key')
end
end
it 'does a rollback of the counter update' do
expect { subject }.to raise_error('could not delete key')
expect(model.reset.read_attribute(incremented_attribute)).to eq(0)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe FlushCounterIncrementsWorker, :counter_attribute do
let(:project_statistics) { create(:project_statistics) }
let(:model) { CounterAttributeModel.find(project_statistics.id) }
describe '#perform', :redis do
let(:attribute) { model.class.counter_attributes.first }
let(:worker) { described_class.new }
subject { worker.perform(model.class.name, model.id, attribute) }
it 'flushes increments to database' do
expect(model.class).to receive(:find_by_id).and_return(model)
expect(model)
.to receive(:flush_increments_to_database!)
.with(attribute)
.and_call_original
subject
end
context 'when model class does not exist' do
subject { worker.perform('non-existend-model') }
it 'does nothing' do
expect(worker).not_to receive(:in_lock)
end
end
context 'when record does not exist' do
subject { worker.perform(model.class.name, model.id + 100, attribute) }
it 'does nothing' do
expect(worker).not_to receive(:in_lock)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment