Commit 626518f5 authored by Terri Chu's avatar Terri Chu Committed by Dylan Griffith

Store slice multiplier and max slices running for reindexing in database

parent 34099496
---
title: Store slice multiplier and max slices running for reindex in DB
merge_request: 60861
author:
type: changed
# frozen_string_literal: true
class AddSliceMultiplierAndMaxSlicesToElasticReindexingTask < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DEFAULT_MAX_TOTAL_SLICES_RUNNING = 60
DEFAULT_SLICE_MULTIPLIER = 2
def change
add_column :elastic_reindexing_tasks, :max_slices_running, :integer,
limit: 2,
default: DEFAULT_MAX_TOTAL_SLICES_RUNNING,
null: false
add_column :elastic_reindexing_tasks, :slice_multiplier, :integer,
limit: 2,
default: DEFAULT_SLICE_MULTIPLIER,
null: false
end
end
08f4cd1f8f7ddc336d0edee7581b0cb59e0cdc7b5f3cbeb1ccdcd7a1c52d366f
\ No newline at end of file
...@@ -12447,6 +12447,8 @@ CREATE TABLE elastic_reindexing_tasks ( ...@@ -12447,6 +12447,8 @@ CREATE TABLE elastic_reindexing_tasks (
error_message text, error_message text,
documents_count_target integer, documents_count_target integer,
delete_original_index_at timestamp with time zone, delete_original_index_at timestamp with time zone,
max_slices_running smallint DEFAULT 60 NOT NULL,
slice_multiplier smallint DEFAULT 2 NOT NULL,
CONSTRAINT check_04151aca42 CHECK ((char_length(index_name_from) <= 255)), CONSTRAINT check_04151aca42 CHECK ((char_length(index_name_from) <= 255)),
CONSTRAINT check_7f64acda8e CHECK ((char_length(error_message) <= 255)), CONSTRAINT check_7f64acda8e CHECK ((char_length(error_message) <= 255)),
CONSTRAINT check_85ebff7124 CHECK ((char_length(index_name_to) <= 255)), CONSTRAINT check_85ebff7124 CHECK ((char_length(index_name_to) <= 255)),
...@@ -5,6 +5,9 @@ class Elastic::ReindexingTask < ApplicationRecord ...@@ -5,6 +5,9 @@ class Elastic::ReindexingTask < ApplicationRecord
self.table_name = 'elastic_reindexing_tasks' self.table_name = 'elastic_reindexing_tasks'
validates :max_slices_running, presence: true
validates :slice_multiplier, presence: true
ignore_columns %i[documents_count index_name_from index_name_to elastic_task documents_count_target], remove_with: '14.0', remove_after: '2021-04-22' ignore_columns %i[documents_count index_name_from index_name_to elastic_task documents_count_target], remove_with: '14.0', remove_after: '2021-04-22'
has_many :subtasks, class_name: 'Elastic::ReindexingSubtask', foreign_key: :elastic_reindexing_task_id has_many :subtasks, class_name: 'Elastic::ReindexingSubtask', foreign_key: :elastic_reindexing_task_id
......
...@@ -13,8 +13,6 @@ module Elastic ...@@ -13,8 +13,6 @@ module Elastic
DELETE_ORIGINAL_INDEX_AFTER = 14.days DELETE_ORIGINAL_INDEX_AFTER = 14.days
REINDEX_MAX_RETRY_LIMIT = 10 REINDEX_MAX_RETRY_LIMIT = 10
REINDEX_MAX_TOTAL_SLICES_RUNNING = 60
REINDEX_SLICE_MULTIPLIER = 2
def execute def execute
case current_task.state.to_sym case current_task.state.to_sym
...@@ -90,7 +88,7 @@ module Elastic ...@@ -90,7 +88,7 @@ module Elastic
documents_count: documents_count documents_count: documents_count
) )
max_slice = elastic_helper.get_settings(index_name: old_index_name).dig('number_of_shards').to_i * REINDEX_SLICE_MULTIPLIER max_slice = elastic_helper.get_settings(index_name: old_index_name).dig('number_of_shards').to_i * current_task.slice_multiplier
0.upto(max_slice - 1).to_a.each do |slice| 0.upto(max_slice - 1).to_a.each do |slice|
subtask.slices.create!( subtask.slices.create!(
elastic_max_slice: max_slice, elastic_max_slice: max_slice,
...@@ -205,7 +203,7 @@ module Elastic ...@@ -205,7 +203,7 @@ module Elastic
def trigger_reindexing_slices(slices_in_progress = 0) def trigger_reindexing_slices(slices_in_progress = 0)
current_task.subtasks.each do |subtask| current_task.subtasks.each do |subtask|
slices_to_start = REINDEX_MAX_TOTAL_SLICES_RUNNING - slices_in_progress slices_to_start = current_task.max_slices_running - slices_in_progress
break if slices_to_start == 0 break if slices_to_start == 0
subtask.slices.not_started.limit(slices_to_start).each do |slice| subtask.slices.not_started.limit(slices_to_start).each do |slice|
......
...@@ -13,6 +13,11 @@ RSpec.describe Elastic::ReindexingTask, type: :model do ...@@ -13,6 +13,11 @@ RSpec.describe Elastic::ReindexingTask, type: :model do
it { is_expected.to have_many(:subtasks) } it { is_expected.to have_many(:subtasks) }
end end
describe 'validations' do
it { is_expected.to validate_presence_of(:max_slices_running) }
it { is_expected.to validate_presence_of(:slice_multiplier) }
end
it 'only allows one running task at a time' do it 'only allows one running task at a time' do
expect { create(:elastic_reindexing_task, state: :success) }.not_to raise_error expect { create(:elastic_reindexing_task, state: :success) }.not_to raise_error
expect { create(:elastic_reindexing_task) }.not_to raise_error expect { create(:elastic_reindexing_task) }.not_to raise_error
......
...@@ -78,7 +78,7 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do ...@@ -78,7 +78,7 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do
end end
context 'state: reindexing' do context 'state: reindexing' do
let!(:task) { create(:elastic_reindexing_task, state: :reindexing) } let!(:task) { create(:elastic_reindexing_task, state: :reindexing, max_slices_running: 1) }
let!(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count: 10) } let!(:subtask) { create(:elastic_reindexing_subtask, elastic_reindexing_task: task, documents_count: 10) }
let!(:slices) { [slice_1, slice_2, slice_3] } let!(:slices) { [slice_1, slice_2, slice_3] }
let(:slice_1) { create(:elastic_reindexing_slice, elastic_reindexing_subtask: subtask, elastic_max_slice: 3, elastic_slice: 0) } let(:slice_1) { create(:elastic_reindexing_slice, elastic_reindexing_subtask: subtask, elastic_max_slice: 3, elastic_slice: 0) }
...@@ -106,7 +106,10 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do ...@@ -106,7 +106,10 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do
end end
it 'errors if documents count is different' do it 'errors if documents count is different' do
cluster_reindexing_service.execute # run once to kick off reindexing for slices # kick off reindexing for each slice
slices.count.times do
cluster_reindexing_service.execute
end
expect { cluster_reindexing_service.execute }.to change { task.reload.state }.from('reindexing').to('failure') expect { cluster_reindexing_service.execute }.to change { task.reload.state }.from('reindexing').to('failure')
expect(task.reload.error_message).to match(/count is different/) expect(task.reload.error_message).to match(/count is different/)
...@@ -180,15 +183,13 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do ...@@ -180,15 +183,13 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do
context 'slice batching' do context 'slice batching' do
it 'kicks off the next set of slices if the current slice is finished', :aggregate_failures do it 'kicks off the next set of slices if the current slice is finished', :aggregate_failures do
stub_const("#{described_class.name}::REINDEX_MAX_TOTAL_SLICES_RUNNING", 1) expect { cluster_reindexing_service.execute }.to change { slice_1.reload.elastic_task }
expect { cluster_reindexing_service.execute }.to change { slices.first.reload.elastic_task }
expect(helper).to have_received(:reindex).with(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: 3, slice: 0) expect(helper).to have_received(:reindex).with(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: 3, slice: 0)
expect { cluster_reindexing_service.execute }.to change { slices.second.reload.elastic_task } expect { cluster_reindexing_service.execute }.to change { slice_2.reload.elastic_task }
expect(helper).to have_received(:reindex).with(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: 3, slice: 1) expect(helper).to have_received(:reindex).with(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: 3, slice: 1)
expect { cluster_reindexing_service.execute }.to change { slices.third.reload.elastic_task } expect { cluster_reindexing_service.execute }.to change { slice_3.reload.elastic_task }
expect(helper).to have_received(:reindex).with(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: 3, slice: 2) expect(helper).to have_received(:reindex).with(from: subtask.index_name_from, to: subtask.index_name_to, max_slice: 3, slice: 2)
end end
end end
...@@ -205,8 +206,6 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do ...@@ -205,8 +206,6 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do
before do before do
allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to).and_return(subtask.reload.documents_count) allow(helper).to receive(:documents_count).with(index_name: subtask.index_name_to).and_return(subtask.reload.documents_count)
allow(helper).to receive(:get_settings).with(index_name: subtask.index_name_from).and_return(current_settings.with_indifferent_access) allow(helper).to receive(:get_settings).with(index_name: subtask.index_name_from).and_return(current_settings.with_indifferent_access)
cluster_reindexing_service.execute # run once to kick off reindexing for slices
end end
it 'launches all state steps' do it 'launches all state steps' do
...@@ -214,6 +213,11 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do ...@@ -214,6 +213,11 @@ RSpec.describe Elastic::ClusterReindexingService, :elastic do
expect(helper).to receive(:switch_alias).with(to: subtask.index_name_to, from: subtask.index_name_from, alias_name: subtask.alias_name) expect(helper).to receive(:switch_alias).with(to: subtask.index_name_to, from: subtask.index_name_from, alias_name: subtask.alias_name)
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false) expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false)
# kick off reindexing for each slice
slices.count.times do
cluster_reindexing_service.execute
end
expect { cluster_reindexing_service.execute }.to change { task.reload.state }.from('reindexing').to('success') expect { cluster_reindexing_service.execute }.to change { task.reload.state }.from('reindexing').to('success')
expect(task.reload.delete_original_index_at).to be_within(1.minute).of(described_class::DELETE_ORIGINAL_INDEX_AFTER.from_now) expect(task.reload.delete_original_index_at).to be_within(1.minute).of(described_class::DELETE_ORIGINAL_INDEX_AFTER.from_now)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment