Commit fcb73359 authored by Toon Claes

Merge branch '204797-geo-make-registry-consistency-worker-clean-up-unused-registries' into 'master'

Geo - Make Geo::RegistryConsistencyWorker clean up unused registries

Closes #204797

See merge request gitlab-org/gitlab!32695
parents bb396938 c3f0351c
......@@ -49,8 +49,16 @@ module Geo
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source_ids = job_artifacts(fdw: false).where(id: range).pluck(::Ci::JobArtifact.arel_table[:id]) # rubocop:disable CodeReuse/ActiveRecord
tracked_ids = Geo::JobArtifactRegistry.pluck_model_ids_in_range(range)
# rubocop:disable CodeReuse/ActiveRecord
source_ids =
job_artifacts(fdw: false)
.id_in(range)
.pluck(::Ci::JobArtifact.arel_table[:id])
# rubocop:enable CodeReuse/ActiveRecord
tracked_ids =
Geo::JobArtifactRegistry
.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
......
......@@ -49,8 +49,14 @@ module Geo
#
# @return [Array] the first element is an Array of untracked IDs, and the second element is an Array of tracked IDs that are unused
def find_registry_differences(range)
source_ids = lfs_objects(fdw: false).where(id: range).pluck_primary_key # rubocop:disable CodeReuse/ActiveRecord
tracked_ids = Geo::LfsObjectRegistry.pluck_model_ids_in_range(range)
source_ids =
lfs_objects(fdw: false)
.id_in(range)
.pluck_primary_key
tracked_ids =
Geo::LfsObjectRegistry
.pluck_model_ids_in_range(range)
untracked_ids = source_ids - tracked_ids
unused_tracked_ids = tracked_ids - source_ids
......
......@@ -30,4 +30,8 @@ class Geo::BaseRegistry < Geo::TrackingBase
bulk_insert!(records, returns: :ids)
end
# Abstract hook for removing the registry entries that track the given model
# IDs. The base implementation always raises; concrete registry classes are
# expected to provide their own version.
#
# @param ids [Array] model IDs whose registry entries should be removed
# @raise [NotImplementedError] always
def self.delete_for_model_ids(ids)
  # Inside a class method `self` is already the class. `self.class` would
  # always interpolate as "Class" and hide which registry lacks the method.
  raise NotImplementedError, "#{self} does not implement #{__method__}"
end
end
......@@ -25,6 +25,10 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
::Geo::JobArtifactRegistryFinder
end
# Worker class that .delete_for_model_ids enqueues (via `perform_async`) once
# per artifact ID.
#
# @return [Class] ::Geo::FileRegistryRemovalWorker
def self.delete_worker_class
::Geo::FileRegistryRemovalWorker
end
# When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
......@@ -33,14 +37,20 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
# TODO: remove once `success` column has a default value set
# https://gitlab.com/gitlab-org/gitlab/-/issues/214407
def self.insert_for_model_ids(ids)
records = ids.map do |id|
new(artifact_id: id, success: false, created_at: Time.zone.now)
# Builds one unsaved registry record per artifact ID, with `success` set to
# false explicitly and `created_at` taken at build time, then writes them all
# through a single `bulk_insert!` call.
#
# @param artifact_ids [Array] IDs of the job artifacts to start tracking
# @return the result of `bulk_insert!` invoked with `returns: :ids`
def self.insert_for_model_ids(artifact_ids)
  rows = artifact_ids.map { |id| new(artifact_id: id, success: false, created_at: Time.zone.now) }

  bulk_insert!(rows, returns: :ids)
end
# Enqueues one job per artifact ID on the class returned by
# .delete_worker_class, passing `:job_artifact` as the first argument.
#
# @param artifact_ids [Array] artifact IDs whose registry entries are unused
# @return [Array] the `perform_async` results, one per given ID
def self.delete_for_model_ids(artifact_ids)
  artifact_ids.map { |id| delete_worker_class.perform_async(:job_artifact, id) }
end
# Whether job artifacts should be replicated on the current node.
#
# Without object storage for job artifacts the answer is always true;
# otherwise it is the current Geo node's `sync_object_storage?` setting.
#
# @return [Boolean]
def self.replication_enabled?
  return true unless JobArtifactUploader.object_store_enabled?

  Gitlab::Geo.current_node.sync_object_storage?
end
......
......@@ -21,9 +21,19 @@ class Geo::LfsObjectRegistry < Geo::BaseRegistry
::Geo::LfsObjectRegistryFinder
end
# Worker class that .delete_for_model_ids enqueues (via `perform_async`) once
# per LFS object ID.
#
# @return [Class] ::Geo::FileRegistryRemovalWorker
def self.delete_worker_class
::Geo::FileRegistryRemovalWorker
end
# Whether create events are implemented for this replicable.
#
# Because this returns false, RegistryConsistencyService will frequently check
# the end of the table to quickly handle new replicables.
#
# @return [Boolean] always false
def self.has_create_events?
false
end
# Enqueues one job per LFS object ID on the class returned by
# .delete_worker_class, passing `:lfs` as the first argument.
#
# @param lfs_object_ids [Array] LFS object IDs whose registry entries are unused
# @return [Array] the `perform_async` results, one per given ID
def self.delete_for_model_ids(lfs_object_ids)
  lfs_object_ids.map { |id| delete_worker_class.perform_async(:lfs, id) }
end
end
......@@ -22,6 +22,10 @@ class Geo::UploadRegistry < Geo::BaseRegistry
::Geo::AttachmentRegistryFinder
end
# Worker class that .delete_for_model_ids enqueues (via `perform_async`) once
# per file ID / file type pair.
#
# @return [Class] ::Geo::FileRegistryRemovalWorker
def self.delete_worker_class
::Geo::FileRegistryRemovalWorker
end
# If false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
......@@ -36,6 +40,12 @@ class Geo::UploadRegistry < Geo::BaseRegistry
bulk_insert!(records, returns: :ids)
end
# Enqueues one job per entry on the class returned by .delete_worker_class.
# Each entry is a `[file_id, file_type]` pair; note that the worker receives
# them in the opposite order (type first, then ID).
#
# @param attrs [Array<Array>] `[file_id, file_type]` pairs
# @return [Array] the `perform_async` results, one per given pair
def self.delete_for_model_ids(attrs)
  attrs.map { |file_id, file_type| delete_worker_class.perform_async(file_type, file_id) }
end
def self.with_search(query)
return all if query.nil?
......
......@@ -14,15 +14,15 @@ module Geo
@batch_size = batch_size
end
# @return [Boolean] whether at least one registry has been created or deleted in range
def execute
range = next_range!
return unless range
created_in_range = create_missing_in_range(range)
created_above = create_missing_above(end_of_batch: range.last)
created_in_range, deleted_in_range = handle_differences_in_range(range)
created_above, deleted_above = create_missing_above(end_of_batch: range.last)
created_in_range.any? ||
created_above.any?
[created_in_range, deleted_in_range, created_above, deleted_above].flatten.compact.any?
rescue => e
log_error("Error while backfilling #{registry_class}", e)
......@@ -33,31 +33,45 @@ module Geo
# Asks Gitlab::Geo::RegistryBatcher for the next ID range to check for this
# registry class. Only this one batcher is consulted, so a single cursor is
# advanced per call.
#
# @return [Range, nil] the next range of a batch of records, or whatever the
#   batcher returns when it has no range to offer
def next_range!
  Gitlab::Geo::RegistryBatcher.new(registry_class, key: batcher_key, batch_size: batch_size).next_range!
end
# Key handed to the batcher to identify this registry's cursor.
#
# @return [String] "registry_consistency:" followed by the parameterized
#   registry class name
def batcher_key
  suffix = registry_class.name.parameterize

  "registry_consistency:#{suffix}"
end
# @return [Array] the list of IDs of created records
def create_missing_in_range(range)
untracked, _ = find_registry_differences(range)
return [] if untracked.empty?
# Delegates to the registry's finder to compare source records with registry
# entries inside the given ID range.
#
# @param range [Range] the ID range to inspect
# @return [Array] a pair whose first element is used as the untracked IDs and
#   whose second element is used as the tracked-but-unused IDs
def find_registry_differences(range)
finder.find_registry_differences(range)
end
# Finder for the registry class, built for the current Geo node and memoized
# for the lifetime of this service object.
#
# @return [Object] an instance of `registry_class.finder_class`
def finder
  return @finder if @finder

  finder_class = registry_class.finder_class
  @finder = finder_class.new(current_node_id: Gitlab::Geo.current_node.id)
end
created = registry_class.insert_for_model_ids(untracked)
# Reconciles the registry with the source table for one ID range: creates
# registry entries for untracked records, deletes registry entries that are
# no longer used, and logs both outcomes.
#
# @param range [Range] the ID range to reconcile
# @return [Array] a two-element Array: the result of creating the untracked
#   entries, then the result of deleting the unused ones
def handle_differences_in_range(range)
  untracked, unused = find_registry_differences(range)

  created_in_range = create_untracked_in_range(untracked)
  log_created(range, untracked, created_in_range)

  deleted_in_range = delete_unused_in_range(unused)
  log_deleted(range, unused, deleted_in_range)

  [created_in_range, deleted_in_range]
end
def find_registry_differences(range)
finder.find_registry_differences(range)
# Inserts registry entries for the given untracked model IDs; the registry
# class is not called at all when there is nothing to insert.
#
# @param untracked [Array] model IDs that have no registry entry yet
# @return [Array] the list of IDs of created records
def create_untracked_in_range(untracked)
  if untracked.empty?
    []
  else
    registry_class.insert_for_model_ids(untracked)
  end
end
def finder
@finder ||= registry_class.finder_class.new(current_node_id: Gitlab::Geo.current_node.id)
# Asks the registry class to delete the entries for the given unused model
# IDs; the registry class is not called at all when there is nothing to delete.
#
# @param unused [Array] tracked model IDs (in whatever shape the registry's
#   finder reports them) that are no longer used
# @return [Array] an empty Array when `unused` is empty, otherwise whatever
#   `registry_class.delete_for_model_ids` returns
def delete_unused_in_range(unused)
  return [] if unused.empty?

  registry_class.delete_for_model_ids(unused)
end
# This hack is used to sync new files soon after they are created.
......@@ -65,10 +79,11 @@ module Geo
# This is not needed for replicables that have already implemented
# create events.
#
# @param [Integer] the last ID of the batch processed in create_missing_in_range
# @param [Integer] the last ID of the batch processed in create_untracked_in_range
# @return [Array] the list of IDs of created records
def create_missing_above(end_of_batch:)
return [] if registry_class.has_create_events?
return [] unless model_class.any?
last_id = model_class.last.id
......@@ -81,7 +96,7 @@ module Geo
start = last_id - batch_size + 1
finish = last_id
create_missing_in_range(start..finish)
handle_differences_in_range(start..finish)
end
# Returns true when LoopingBatcher will soon return ranges near the end of
......@@ -104,5 +119,18 @@ module Geo
}
)
end
# Writes an info log entry summarising the deletions attempted in a range.
#
# @param range [Range] the ID range that was processed
# @param unused [Array] the unused entries that were submitted for deletion
# @param deleted [Array] the result returned by the deletion step
# @return the result of `log_info`
def log_deleted(range, unused, deleted)
  payload = {
    registry_class: registry_class.name,
    start: range.first,
    finish: range.last,
    deleted: deleted.length,
    # Anything submitted but missing from the result counts as a failure.
    failed_to_delete: unused.length - deleted.length
  }

  log_info("Deleted registry entries", payload)
end
end
end
---
title: Geo - Make Geo::RegistryConsistencyWorker clean up unused registries
merge_request: 32695
author:
type: added
# frozen_string_literal: true
module Gitlab
  module Geo
    # Hands out ID ranges so that a replicable table and its registry table can
    # be walked together, one batch at a time.
    #
    # A cursor (the first ID of the next batch) is stored in Rails.cache under a
    # key built from this class name, the registry class name and `key`. Once
    # neither table has rows at or after the cursor, `next_range!` returns `nil`
    # and the cursor goes back to 1, so the following call starts over from the
    # beginning.
    class RegistryBatcher
      # @param [Class] registry_class the registry class to iterate on; it must
      #   define the MODEL_CLASS and MODEL_FOREIGN_KEY constants
      # @param [String] key to identify the cursor. Note, cursor is already unique
      #   per table.
      # @param [Integer] batch_size to limit the number of records in a batch
      def initialize(registry_class, key:, batch_size: 1000)
        @registry_class = registry_class
        @model_class = registry_class::MODEL_CLASS
        @model_foreign_key = registry_class::MODEL_FOREIGN_KEY
        @key = key
        @batch_size = batch_size
      end

      # Computes the next batch and moves (or resets) the stored cursor.
      #
      # @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
      def next_range!
        first_id = cursor_id
        last_id = get_batch_last_id(first_id)

        (first_id..last_id) if last_id
      end

      private

      attr_reader :model_class, :model_foreign_key, :registry_class, :key, :batch_size

      # @private
      #
      # Get the last ID of the batch. Increment the cursor or reset it if at end.
      #
      # @param [Integer] batch_first_id the first ID of the batch
      # @return [Integer] batch_last_id the last ID of the batch (not the table)
      def get_batch_last_id(batch_first_id)
        model_last_id, more_records = get_model_batch_last_id(batch_first_id)
        registry_last_id, more_registries = get_registry_batch_last_id(batch_first_id)

        # The registry only decides the end of the batch once the replicable
        # table has nothing beyond this batch but registry rows are still found.
        follow_registry = more_registries && !more_records
        batch_last_id = follow_registry ? registry_last_id : model_last_id

        if more_records || more_registries
          increment_batch(batch_last_id)
        elsif batch_first_id > 1
          reset
        end

        batch_last_id
      end

      # @private
      #
      # Get the last ID of the batch (not the table) for the replicable
      # and check if there are more rows in the table.
      #
      # @param [Integer] batch_first_id the first ID of the batch
      # @return [Integer, Boolean] A tuple with the last ID of the batch (not the table),
      #   and whether or not there are more rows to check in the table
      def get_model_batch_last_id(batch_first_id)
        pk = model_class.primary_key
        table = model_class.table_name

        query = <<~SQL
          SELECT MAX(batch.#{pk}) AS batch_last_id,
          EXISTS (
          SELECT #{pk}
          FROM #{table}
          WHERE #{pk} > MAX(batch.#{pk})
          ) AS more_rows
          FROM (
          SELECT #{pk}
          FROM #{table}
          WHERE #{pk} >= #{batch_first_id}
          ORDER BY #{pk}
          LIMIT #{batch_size}) AS batch;
        SQL

        row = model_class.connection.exec_query(query).first

        row.values_at("batch_last_id", "more_rows")
      end

      # @private
      #
      # Get the last ID of the batch (not the table) for the registry
      # and check if there are more rows in the table.
      #
      # This query differs from the replicable query in two ways:
      #
      # - It works on the foreign key IDs, not the registry IDs;
      # - The more_rows part compares with "greater than or equal", so it is
      #   true as soon as the batch itself is non-empty. This lets the batcher
      #   switch to the registry table for the last ID of the batch when the
      #   replicable table has already been exhausted but there are orphaned
      #   registries whose foreign key IDs are higher than the last
      #   replicable ID.
      #
      # @param [Integer] batch_first_id the first ID of the batch
      # @return [Integer, Boolean] A tuple with the last ID of the batch (not the table),
      #   and whether or not there are more rows to check in the table
      def get_registry_batch_last_id(batch_first_id)
        fk = model_foreign_key
        table = registry_class.table_name

        query = <<~SQL
          SELECT MAX(batch.#{fk}) AS batch_last_id,
          EXISTS (
          SELECT #{fk}
          FROM #{table}
          WHERE #{fk} >= MAX(batch.#{fk})
          ) AS more_rows
          FROM (
          SELECT #{fk}
          FROM #{table}
          WHERE #{fk} >= #{batch_first_id}
          ORDER BY #{fk}
          LIMIT #{batch_size}) AS batch;
        SQL

        row = registry_class.connection.exec_query(query).first

        row.values_at("batch_last_id", "more_rows")
      end

      # Sends the cursor back to the first ID.
      def reset
        set_cursor_id(1)
      end

      # Moves the cursor just past the batch that was handed out.
      def increment_batch(batch_last_id)
        set_cursor_id(batch_last_id + 1)
      end

      # @private
      #
      # @return [Integer] the cursor ID, or 1 if it is not set
      def cursor_id
        Rails.cache.fetch(cursor_cache_key) || 1
      end

      def set_cursor_id(id)
        Rails.cache.write(cursor_cache_key, id)
      end

      # Full cache entry name under which the cursor is read and written.
      def cursor_cache_key
        "#{cache_key}:cursor_id"
      end

      def cache_key
        @cache_key ||= "#{self.class.name.parameterize}:#{registry_class.name.parameterize}:#{key}:cursor_id"
      end
    end
  end
end
......@@ -2,18 +2,77 @@
require 'spec_helper'
describe Gitlab::LoopingBatcher, :use_clean_rails_memory_store_caching do
describe Gitlab::Geo::RegistryBatcher, :geo, :use_clean_rails_memory_store_caching do
describe '#next_range!' do
let(:model_class) { LfsObject }
let(:model_foreign_key) { registry_class::MODEL_FOREIGN_KEY }
let(:registry_class) { Geo::LfsObjectRegistry }
let(:registry_class_factory) { registry_class.underscore.tr('/', '_').to_sym }
let(:key) { 'looping_batcher_spec' }
let(:batch_size) { 2 }
subject { described_class.new(model_class, key: key, batch_size: batch_size).next_range! }
subject { described_class.new(registry_class, key: key, batch_size: batch_size).next_range! }
context 'when there are no records' do
it { is_expected.to be_nil }
end
context 'when there are no records but there are orphaned registries' do
let!(:registries) { create_list(registry_class_factory, 3) }
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at a full batch' do
expect(subject.last).to eq(registries.second.public_send(model_foreign_key))
end
context 'when the batch size is greater than the number of registries' do
let(:batch_size) { 5 }
it 'ends at the last ID' do
expect(subject.last).to eq(registries.last.public_send(model_foreign_key))
end
end
end
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(registry_class, key: key, batch_size: registry_class.count).next_range!
end
it { is_expected.to be_nil }
end
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(registry_class, key: key, batch_size: registry_class.count - 1).next_range!
end
it 'starts after the previous batch' do
expect(subject).to eq(registries.last.public_send(model_foreign_key)..registries.last.public_send(model_foreign_key))
end
end
context 'if cache is cleared' do
before do
described_class.new(registry_class, key: key, batch_size: batch_size).next_range!
end
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..registries.second.public_send(model_foreign_key))
end
end
end
end
context 'when there are records' do
let!(:records) { create_list(model_class.underscore, 3) }
......@@ -40,7 +99,7 @@ describe Gitlab::LoopingBatcher, :use_clean_rails_memory_store_caching do
context 'when it was called before' do
context 'when the previous batch included the end of the table' do
before do
described_class.new(model_class, key: key, batch_size: model_class.count).next_range!
described_class.new(registry_class, key: key, batch_size: model_class.count).next_range!
end
it 'starts from the beginning' do
......@@ -50,7 +109,7 @@ describe Gitlab::LoopingBatcher, :use_clean_rails_memory_store_caching do
context 'when the previous batch did not include the end of the table' do
before do
described_class.new(model_class, key: key, batch_size: model_class.count - 1).next_range!
described_class.new(registry_class, key: key, batch_size: model_class.count - 1).next_range!
end
it 'starts after the previous batch' do
......@@ -59,6 +118,10 @@ describe Gitlab::LoopingBatcher, :use_clean_rails_memory_store_caching do
end
context 'if cache is cleared' do
before do
described_class.new(registry_class, key: key, batch_size: batch_size).next_range!
end
it 'starts from the beginning' do
Rails.cache.clear
......@@ -67,5 +130,43 @@ describe Gitlab::LoopingBatcher, :use_clean_rails_memory_store_caching do
end
end
end
context 'when there are records and orphaned registries with foreign key greater than last record id' do
let!(:records) { create_list(model_class.underscore, 3) }
let(:orphaned_registry_foreign_key_id) { records.last.id }
let!(:registry) { create(registry_class_factory, model_foreign_key => orphaned_registry_foreign_key_id) }
before do
model_class.where(id: orphaned_registry_foreign_key_id).delete_all
end
context 'when it has never been called before' do
it { is_expected.to be_a Range }
it 'starts from the beginning' do
expect(subject.first).to eq(1)
end
it 'ends at the last registry foreign key ID' do
expect(subject.last).to eq(orphaned_registry_foreign_key_id)
end
end
context 'when it was called before' do
before do
described_class.new(registry_class, key: key, batch_size: batch_size).next_range!
end
it { is_expected.to be_nil }
context 'if cache is cleared' do
it 'starts from the beginning' do
Rails.cache.clear
expect(subject).to eq(1..orphaned_registry_foreign_key_id)
end
end
end
end
end
end
......@@ -2,22 +2,25 @@
require 'spec_helper'
describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_store_caching do
describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_store_caching do
include EE::GeoHelpers
let(:secondary) { create(:geo_node) }
subject { described_class.new(registry_class, batch_size: batch_size) }
before do
stub_current_geo_node(secondary)
end
::Geo::Secondary::RegistryConsistencyWorker::REGISTRY_CLASSES.each do |klass|
let(:registry_class) { klass }
let(:registry_class_factory) { registry_class.underscore.tr('/', '_').to_sym }
let(:model_class) { registry_class::MODEL_CLASS }
let(:model_class_factory) { model_class.underscore.tr('/', '_').to_sym }
let(:model_foreign_key) { registry_class::MODEL_FOREIGN_KEY }
let(:batch_size) { 2 }
subject { described_class.new(registry_class, batch_size: batch_size) }
describe 'registry_class interface' do
it 'defines a MODEL_CLASS constant' do
expect(registry_class::MODEL_CLASS).not_to be_nil
......@@ -31,6 +34,10 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
expect(registry_class).to respond_to(:insert_for_model_ids)
end
it 'responds to .delete_for_model_ids' do
expect(registry_class).to respond_to(:delete_for_model_ids)
end
it 'responds to .finder_class' do
expect(registry_class).to respond_to(:finder_class)
end
......@@ -42,7 +49,7 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
describe '#execute' do
context 'when there are replicable records missing registries' do
let!(:expected_batch) { create_list(model_class.underscore.to_sym, batch_size) }
let!(:expected_batch) { create_list(model_class_factory, batch_size) }
it 'creates missing registries' do
expect do
......@@ -55,7 +62,7 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
end
it 'does not exceed batch size' do
not_expected = create(model_class.underscore.to_sym)
not_expected = create(model_class_factory)
subject.execute
......@@ -64,7 +71,7 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
# Temporarily, until we implement create events for these replicables
context 'when the number of records is greater than 6 batches' do
let!(:five_batches_worth) { create_list(model_class.underscore.to_sym, 5 * batch_size) }
let!(:five_batches_worth) { create_list(model_class_factory, 5 * batch_size) }
context 'when the previous batch is greater than 5 batches from the end of the table' do
context 'when create events are implemented for this replicable' do
......@@ -82,8 +89,8 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
expect(registry_class.model_id_in(expected).count).to eq(2)
end
it 'calls #create_missing_in_range only once' do
expect(subject).to receive(:create_missing_in_range).once.and_call_original
it 'calls #handle_differences_in_range only once' do
expect(subject).to receive(:handle_differences_in_range).once.and_call_original
subject.execute
end
......@@ -104,8 +111,8 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
expect(registry_class.model_id_in(expected).count).to eq(4)
end
it 'calls #create_missing_in_range twice' do
expect(subject).to receive(:create_missing_in_range).twice.and_call_original
it 'calls #handle_differences_in_range twice' do
expect(subject).to receive(:handle_differences_in_range).twice.and_call_original
subject.execute
end
......@@ -124,8 +131,8 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
end.to change { registry_class.count }.by(batch_size)
end
it 'calls #create_missing_in_range once' do
expect(subject).to receive(:create_missing_in_range).once.and_call_original
it 'calls #handle_differences_in_range once' do
expect(subject).to receive(:handle_differences_in_range).once.and_call_original
subject.execute
end
......@@ -133,17 +140,94 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
end
context 'when the number of records is less than 6 batches' do
it 'calls #create_missing_in_range once' do
expect(subject).to receive(:create_missing_in_range).once.and_call_original
it 'calls #handle_differences_in_range once' do
expect(subject).to receive(:handle_differences_in_range).once.and_call_original
subject.execute
end
end
end
context 'when there are unused registries' do
context 'with no replicable records' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_model_ids) { records.map(&:id) }
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
end
end
before do
model_class.where(id: unused_model_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
subject.execute
expect(registry_class.where(model_foreign_key => unused_model_ids)).to be_empty
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
end
context 'when the unused registry foreign key ids are lower than the first replicable model id' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_registry_ids) { [records.first].map(&:id) }
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
end
end
before do
model_class.where(id: unused_registry_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
subject.execute
expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
end
context 'when the unused registry foreign key ids are greater than the last replicable model id' do
let(:records) { create_list(model_class_factory, batch_size) }
let(:unused_registry_ids) { [records.last].map(&:id) }
let!(:registries) do
records.map do |record|
create(registry_class_factory, model_foreign_key => record.id)
end
end
before do
model_class.where(id: unused_registry_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
subject.execute
expect(registry_class.where(model_foreign_key => unused_registry_ids)).to be_empty
end
it 'returns truthy' do
expect(subject.execute).to be_truthy
end
end
end
context 'when all replicable records have registries' do
it 'does nothing' do
create_list(model_class.underscore.to_sym, batch_size)
create_list(model_class_factory, batch_size)
subject.execute # create the missing registries
......@@ -153,7 +237,7 @@ describe Geo::RegistryConsistencyService, :geo_fdw, :use_clean_rails_memory_stor
end
it 'returns falsey' do
create_list(model_class.underscore.to_sym, batch_size)
create_list(model_class_factory, batch_size)
subject.execute # create the missing registries
......
# frozen_string_literal: true
module Gitlab
  # Returns an ID range within a table so it can be iterated over. Repeats from
  # the beginning after it reaches the end.
  #
  # Used by Geo in particular to iterate over a replicable and its registry
  # table.
  #
  # Tracks a cursor for each table, by "key". If the table is smaller than
  # batch_size, then a range for the whole table is returned on every call.
  class LoopingBatcher
    # @param [Class] model_class the class of the table to iterate on
    # @param [String] key to identify the cursor. Note, cursor is already unique
    #   per table.
    # @param [Integer] batch_size to limit the number of records in a batch
    def initialize(model_class, key:, batch_size: 1000)
      @model_class = model_class
      @key = key
      @batch_size = batch_size
    end

    # @return [Range] a range of IDs. `nil` if 0 records at or after the cursor.
    def next_range!
      return unless @model_class.any?

      batch_first_id = cursor_id
      batch_last_id = get_batch_last_id(batch_first_id)
      return unless batch_last_id

      batch_first_id..batch_last_id
    end

    private

    # @private
    #
    # Get the last ID of the batch. Increment the cursor or reset it if at end.
    #
    # @param [Integer] batch_first_id the first ID of the batch
    # @return [Integer] batch_last_id the last ID of the batch (not the table)
    def get_batch_last_id(batch_first_id)
      batch_last_id, more_rows = run_query(@model_class.table_name, @model_class.primary_key, batch_first_id, @batch_size)

      if more_rows
        increment_batch(batch_last_id)
      elsif batch_first_id > 1
        reset
      end

      batch_last_id
    end

    # @private
    #
    # Finds the highest primary key within the next batch and whether any row
    # lies beyond it.
    #
    # @param [String] table the table to query
    # @param [String] primary_key the column used for ordering and comparison
    # @param [Integer] batch_first_id the first ID of the batch
    # @param [Integer] batch_size the maximum number of rows in the batch
    # @return [Integer, Boolean] A tuple with the last ID of the batch (not the
    #   table), and whether there are more rows after it
    def run_query(table, primary_key, batch_first_id, batch_size)
      # The derived table `batch` only exposes the primary key column, so the
      # outer query must refer to it by that name rather than a literal `id`.
      sql = <<~SQL
        SELECT MAX(batch.#{primary_key}) AS batch_last_id,
        EXISTS (
          SELECT #{primary_key}
          FROM #{table}
          WHERE #{primary_key} > MAX(batch.#{primary_key})
        ) AS more_rows
        FROM (
          SELECT #{primary_key}
          FROM #{table}
          WHERE #{primary_key} >= #{batch_first_id}
          ORDER BY #{primary_key}
          LIMIT #{batch_size}) AS batch;
      SQL

      # Query through the model's own connection so the statement runs against
      # the database that actually holds `table`.
      result = @model_class.connection.exec_query(sql).first

      [result["batch_last_id"], result["more_rows"]]
    end

    def reset
      set_cursor_id(1)
    end

    def increment_batch(batch_last_id)
      set_cursor_id(batch_last_id + 1)
    end

    # @private
    #
    # @return [Integer] the cursor ID, or 1 if it is not set
    def cursor_id
      Rails.cache.fetch("#{cache_key}:cursor_id") || 1
    end

    def set_cursor_id(id)
      Rails.cache.write("#{cache_key}:cursor_id", id)
    end

    def cache_key
      @cache_key ||= "#{self.class.name.parameterize}:#{@model_class.name.parameterize}:#{@key}:cursor_id"
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment