Commit f60c32a6 authored by Mark Chao's avatar Mark Chao

Turn namespace rollout into bulk operation

Ensure bulk insertion ordering by emulating incremental ordering
parent 0c807b89
......@@ -12,35 +12,55 @@ class ElasticsearchIndexedNamespace < ApplicationRecord
scope :namespace_in, -> (namespaces) { where(namespace_id: namespaces) }
BATCH_OPERATION_SIZE = 1000
def self.target_attr_name
:namespace_id
end
# rubocop: disable Naming/UncommunicativeMethodParamName
def self.index_first_n_namespaces_of_plan(plan, n)
def self.index_first_n_namespaces_of_plan(plan, number_of_namespaces)
indexed_namespaces = self.select(:namespace_id)
now = Time.now
GitlabSubscription
ids = GitlabSubscription
.with_hosted_plan(plan)
.where.not(namespace_id: indexed_namespaces)
.order(namespace_id: :asc)
.limit(n)
.limit(number_of_namespaces)
.pluck(:namespace_id)
.each { |id| create!(namespace_id: id) }
ids.in_groups_of(BATCH_OPERATION_SIZE, false) do |batch_ids|
insert_rows = batch_ids.map do |id|
# Ensure ordering with incremental created_at,
# so rollback can start from the bigger namespace_id
now += 1.0e-05.seconds
{ created_at: now, updated_at: now, namespace_id: id }
end
Gitlab::Database.bulk_insert(table_name, insert_rows)
jobs = batch_ids.map { |id| [id, :index] }
ElasticNamespaceIndexerWorker.bulk_perform_async(jobs)
end
end
def self.unindex_last_n_namespaces_of_plan(plan, n)
def self.unindex_last_n_namespaces_of_plan(plan, number_of_namespaces)
namespaces_under_plan = GitlabSubscription.with_hosted_plan(plan).select(:namespace_id)
# rubocop: disable Cop/DestroyAll
# destroy_all is used in order to trigger `delete_from_index` callback
where(namespace: namespaces_under_plan)
ids = where(namespace: namespaces_under_plan)
.order(created_at: :desc)
.limit(n)
.destroy_all
# rubocop: enable Cop/DestroyAll
.limit(number_of_namespaces)
.pluck(:namespace_id)
ids.in_groups_of(BATCH_OPERATION_SIZE, false) do |batch_ids|
where(namespace_id: batch_ids).delete_all
jobs = batch_ids.map { |id| [id, :delete] }
ElasticNamespaceIndexerWorker.bulk_perform_async(jobs)
end
end
# rubocop: enable Naming/UncommunicativeMethodParamName
private
......
......@@ -47,24 +47,30 @@ describe ElasticsearchIndexedNamespace do
described_class.order(:created_at).pluck(:namespace_id)
end
def expect_queue_to_contain(*args)
expect(ElasticNamespaceIndexerWorker.jobs).to include(
hash_including("args" => args)
)
end
describe '.index_first_n_namespaces_of_plan' do
it 'creates records, scoped by plan and ordered by namespace id' do
ids = namespaces.map(&:id)
expect(ElasticNamespaceIndexerWorker).to receive(:perform_async).with(ids[0], :index)
described_class.index_first_n_namespaces_of_plan('gold', 1)
expect(get_indexed_namespaces).to eq([ids[0]])
expect(ElasticNamespaceIndexerWorker).to receive(:perform_async).with(ids[2], :index)
expect_queue_to_contain(ids[0], "index")
described_class.index_first_n_namespaces_of_plan('gold', 2)
expect(get_indexed_namespaces).to eq([ids[0], ids[2]])
expect(ElasticNamespaceIndexerWorker).to receive(:perform_async).with(ids[1], :index)
expect_queue_to_contain(ids[2], "index")
described_class.index_first_n_namespaces_of_plan('silver', 1)
expect(get_indexed_namespaces).to eq([ids[0], ids[2], ids[1]])
expect_queue_to_contain(ids[1], "index")
end
end
......@@ -77,22 +83,22 @@ describe ElasticsearchIndexedNamespace do
it 'creates records, scoped by plan and ordered by namespace id' do
ids = namespaces.map(&:id)
expect(get_indexed_namespaces).to eq([ids[0], ids[2], ids[1]])
expect(ElasticNamespaceIndexerWorker).to receive(:perform_async).with(ids[2], :delete)
expect(get_indexed_namespaces).to contain_exactly(ids[0], ids[2], ids[1])
described_class.unindex_last_n_namespaces_of_plan('gold', 1)
expect(get_indexed_namespaces).to eq([ids[0], ids[1]])
expect(ElasticNamespaceIndexerWorker).to receive(:perform_async).with(ids[1], :delete)
expect(get_indexed_namespaces).to contain_exactly(ids[0], ids[1])
expect_queue_to_contain(ids[2], "delete")
described_class.unindex_last_n_namespaces_of_plan('silver', 1)
expect(get_indexed_namespaces).to eq([ids[0]])
expect(ElasticNamespaceIndexerWorker).to receive(:perform_async).with(ids[0], :delete)
expect(get_indexed_namespaces).to contain_exactly(ids[0])
expect_queue_to_contain(ids[1], "delete")
described_class.unindex_last_n_namespaces_of_plan('gold', 1)
expect(get_indexed_namespaces).to be_empty
expect_queue_to_contain(ids[0], "delete")
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment