Commit 808ede0a authored by Terri Chu's avatar Terri Chu

Merge branch '335825-reindex-namespace-ancestry' into 'master'

Advanced Search: Add namespace_ancestry_ids to issues index

See merge request gitlab-org/gitlab!70042
parents c27332d3 1fae3ce0
...@@ -233,6 +233,11 @@ Any data or index cleanup needed to support migration retries should be handled ...@@ -233,6 +233,11 @@ Any data or index cleanup needed to support migration retries should be handled
will re-enqueue itself with a delay which is set using the `throttle_delay` option described below. The batching will re-enqueue itself with a delay which is set using the `throttle_delay` option described below. The batching
must be handled within the `migrate` method, this setting controls the re-enqueuing only. must be handled within the `migrate` method, this setting controls the re-enqueuing only.
- `batch_size` - Sets the number of documents modified during a `batched!` migration run. This size should be set to a value which allows the updates
enough time to finish. This can be tuned in combination with the `throttle_delay` option described below. The batching
must be handled within a custom `migrate` method or by using the [`Elastic::MigrationBackfillHelper`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/concerns/elastic/migration_backfill_helper.rb)
`migrate` method which uses this setting. Default value is 1000 documents.
- `throttle_delay` - Sets the wait time in between batch runs. This time should be set high enough to allow each migration batch - `throttle_delay` - Sets the wait time in between batch runs. This time should be set high enough to allow each migration batch
enough time to finish. Additionally, the time should be less than 30 minutes since that is how often the enough time to finish. Additionally, the time should be less than 30 minutes since that is how often the
[`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/elastic/migration_worker.rb) [`Elastic::MigrationWorker`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/ee/app/workers/elastic/migration_worker.rb)
......
...@@ -5,7 +5,7 @@ module Elastic ...@@ -5,7 +5,7 @@ module Elastic
attr_reader :version, :name, :filename attr_reader :version, :name, :filename
delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?, delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?,
:space_requirements?, :space_required_bytes, :obsolete?, :space_requirements?, :space_required_bytes, :obsolete?, :batch_size,
to: :migration to: :migration
ELASTICSEARCH_SIZE = 25 ELASTICSEARCH_SIZE = 25
......
...@@ -2,13 +2,15 @@ ...@@ -2,13 +2,15 @@
module Elastic module Elastic
module MigrationBackfillHelper module MigrationBackfillHelper
UPDATE_BATCH_SIZE = 100
def migrate def migrate
if completed? if completed?
log "Skipping adding #{field_name} field to #{index_name} documents migration since it is already applied" log "Skipping adding #{field_name} field to #{index_name} documents migration since it is already applied"
return return
end end
log "Adding #{field_name} field to #{index_name} documents for batch of #{self.class::QUERY_BATCH_SIZE} documents" log "Adding #{field_name} field to #{index_name} documents for batch of #{query_batch_size} documents"
document_references = process_batch! document_references = process_batch!
...@@ -61,7 +63,7 @@ module Elastic ...@@ -61,7 +63,7 @@ module Elastic
def process_batch! def process_batch!
query = { query = {
size: self.class::QUERY_BATCH_SIZE, size: query_batch_size,
query: { query: {
bool: { bool: {
filter: missing_field_filter filter: missing_field_filter
...@@ -80,11 +82,24 @@ module Elastic ...@@ -80,11 +82,24 @@ module Elastic
Gitlab::Elastic::DocumentReference.new(self.class::DOCUMENT_TYPE, id, es_id, es_parent) Gitlab::Elastic::DocumentReference.new(self.class::DOCUMENT_TYPE, id, es_id, es_parent)
end end
document_references.each_slice(self.class::UPDATE_BATCH_SIZE) do |refs| document_references.each_slice(update_batch_size) do |refs|
Elastic::ProcessInitialBookkeepingService.track!(*refs) Elastic::ProcessInitialBookkeepingService.track!(*refs)
end end
document_references document_references
end end
def query_batch_size
return self.class::QUERY_BATCH_SIZE if self.class.const_defined?(:QUERY_BATCH_SIZE)
return batch_size if respond_to?(:batch_size)
raise NotImplemented
end
def update_batch_size
return self.class::UPDATE_BATCH_SIZE if self.class.const_defined?(:UPDATE_BATCH_SIZE)
UPDATE_BATCH_SIZE
end
end end
end end
...@@ -6,11 +6,16 @@ module Elastic ...@@ -6,11 +6,16 @@ module Elastic
include Gitlab::ClassAttributes include Gitlab::ClassAttributes
DEFAULT_THROTTLE_DELAY = 5.minutes DEFAULT_THROTTLE_DELAY = 5.minutes
DEFAULT_BATCH_SIZE = 1000
def batched? def batched?
self.class.get_batched self.class.get_batched
end end
def batch_size
self.class.get_batch_size
end
def throttle_delay def throttle_delay
self.class.get_throttle_delay self.class.get_throttle_delay
end end
...@@ -55,6 +60,14 @@ module Elastic ...@@ -55,6 +60,14 @@ module Elastic
def get_throttle_delay def get_throttle_delay
class_attributes[:throttle_delay] || DEFAULT_THROTTLE_DELAY class_attributes[:throttle_delay] || DEFAULT_THROTTLE_DELAY
end end
def batch_size(value)
class_attributes[:batch_size] = value
end
def get_batch_size
class_attributes[:batch_size] || DEFAULT_BATCH_SIZE
end
end end
end end
end end
...@@ -5,26 +5,20 @@ class AddNamespaceAncestryToIssuesMapping < Elastic::Migration ...@@ -5,26 +5,20 @@ class AddNamespaceAncestryToIssuesMapping < Elastic::Migration
DOCUMENT_KLASS = Issue DOCUMENT_KLASS = Issue
def migrate
if completed?
log 'Skipping adding namespace_ancestry to issues mapping migration since it is already applied'
return
end
log 'Adding namespace_ancestry to issues mapping'
update_mapping!(index_name, { properties: { namespace_ancestry: { type: 'text', index_prefixes: { min_chars: 1, max_chars: 19 } } } })
end
def completed?
helper.refresh_index(index_name: index_name)
mappings = helper.get_mapping(index_name: index_name)
mappings.dig('namespace_ancestry').present?
end
private private
def index_name def index_name
DOCUMENT_KLASS.__elasticsearch__.index_name DOCUMENT_KLASS.__elasticsearch__.index_name
end end
def new_mappings
{
namespace_ancestry: {
type: 'text',
index_prefixes: {
min_chars: 1, max_chars: 19
}
}
}
end
end end
# frozen_string_literal: true
class AddNamespaceAncestryIdsToIssuesMapping < Elastic::Migration
include Elastic::MigrationUpdateMappingsHelper
DOCUMENT_TYPE = Issue
private
def index_name
DOCUMENT_TYPE.__elasticsearch__.index_name
end
def new_mappings
{
namespace_ancestry_ids: {
type: 'keyword'
}
}
end
end
# frozen_string_literal: true
class RedoBackfillNamespaceAncestryIdsForIssues < Elastic::Migration
include Elastic::MigrationBackfillHelper
batched!
batch_size 5000
throttle_delay 3.minutes
DOCUMENT_TYPE = Issue
UPDATE_BATCH_SIZE = 100
private
def index_name
DOCUMENT_TYPE.__elasticsearch__.index_name
end
def field_name
:namespace_ancestry_ids
end
end
...@@ -5,6 +5,8 @@ module Elastic ...@@ -5,6 +5,8 @@ module Elastic
class ApplicationInstanceProxy < Elasticsearch::Model::Proxy::InstanceMethodsProxy class ApplicationInstanceProxy < Elasticsearch::Model::Proxy::InstanceMethodsProxy
include InstanceProxyUtil include InstanceProxyUtil
NAMESPACE_ANCESTRY_SEPARATOR = '-'
def es_parent def es_parent
"project_#{target.project_id}" unless target.is_a?(Project) || target&.project_id.nil? "project_#{target.project_id}" unless target.is_a?(Project) || target&.project_id.nil?
end end
...@@ -20,7 +22,7 @@ module Elastic ...@@ -20,7 +22,7 @@ module Elastic
def namespace_ancestry def namespace_ancestry
project = target.is_a?(Project) ? target : target.project project = target.is_a?(Project) ? target : target.project
namespace = project.namespace namespace = project.namespace
namespace.self_and_ancestor_ids(hierarchy_order: :desc).join('-') namespace.self_and_ancestor_ids(hierarchy_order: :desc).join(NAMESPACE_ANCESTRY_SEPARATOR) + NAMESPACE_ANCESTRY_SEPARATOR
end end
private private
......
...@@ -36,7 +36,8 @@ module Elastic ...@@ -36,7 +36,8 @@ module Elastic
indexes :visibility_level, type: :integer indexes :visibility_level, type: :integer
indexes :issues_access_level, type: :integer indexes :issues_access_level, type: :integer
indexes :upvotes, type: :integer indexes :upvotes, type: :integer
indexes :namespace_ancestry, type: :text, index_prefixes: { min_chars: 1, max_chars: 19 } indexes :namespace_ancestry, type: :text, index_prefixes: { min_chars: 1, max_chars: 19 } # deprecated
indexes :namespace_ancestry_ids, type: :keyword
end end
end end
end end
......
...@@ -21,7 +21,7 @@ module Elastic ...@@ -21,7 +21,7 @@ module Elastic
data['issues_access_level'] = safely_read_project_feature_for_elasticsearch(:issues) data['issues_access_level'] = safely_read_project_feature_for_elasticsearch(:issues)
data['upvotes'] = target.upvotes_count data['upvotes'] = target.upvotes_count
data['namespace_ancestry'] = target.namespace_ancestry if Elastic::DataMigrationService.migration_has_finished?(:add_namespace_ancestry_to_issues_mapping) data['namespace_ancestry_ids'] = target.namespace_ancestry if Elastic::DataMigrationService.migration_has_finished?(:add_namespace_ancestry_ids_to_issues_mapping)
data.merge(generic_attributes) data.merge(generic_attributes)
end end
......
# frozen_string_literal: true # frozen_string_literal: true
require 'spec_helper' require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210722112500_add_upvotes_mappings_to_merge_requests.rb') require File.expand_path('ee/elastic/migrate/20210722112500_add_upvotes_mappings_to_merge_requests.rb')
RSpec.describe AddUpvotesMappingsToMergeRequests, :elastic, :sidekiq_inline do RSpec.describe AddUpvotesMappingsToMergeRequests, :elastic, :sidekiq_inline do
let(:version) { 20210722112500 } let(:version) { 20210722112500 }
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
allow(migration).to receive(:helper).and_return(helper)
end
describe '.migrate' do
subject { migration.migrate }
context 'when migration is already completed' do
it 'does not modify data' do
expect(helper).not_to receive(:update_mapping)
subject
end
end
context 'migration process' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
it 'updates the issues index mappings' do
expect(helper).to receive(:update_mapping)
subject
end
end
end
describe '.completed?' do
context 'mapping has been updated' do
specify { expect(migration).to be_completed }
end
context 'mapping has not been updated' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
specify { expect(migration).not_to be_completed } include_examples 'migration adds mapping'
end
end
end end
# frozen_string_literal: true # frozen_string_literal: true
require 'spec_helper' require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210813134600_add_namespace_ancestry_to_issues_mapping.rb') require File.expand_path('ee/elastic/migrate/20210813134600_add_namespace_ancestry_to_issues_mapping.rb')
RSpec.describe AddNamespaceAncestryToIssuesMapping, :elastic, :sidekiq_inline do RSpec.describe AddNamespaceAncestryToIssuesMapping, :elastic, :sidekiq_inline do
let(:version) { 20210813134600 } let(:version) { 20210813134600 }
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
allow(migration).to receive(:helper).and_return(helper)
end
describe '.migrate' do
subject { migration.migrate }
context 'when migration is already completed' do
it 'does not modify data' do
expect(helper).not_to receive(:update_mapping)
subject
end
end
context 'migration process' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
it 'updates the issues index mappings' do
expect(helper).to receive(:update_mapping)
subject
end
end
end
describe '.completed?' do
context 'mapping has been updated' do
specify { expect(migration).to be_completed }
end
context 'mapping has not been updated' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
specify { expect(migration).not_to be_completed } include_examples 'migration adds mapping'
end
end
end end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210910094600_add_namespace_ancestry_ids_to_issues_mapping.rb')
RSpec.describe AddNamespaceAncestryIdsToIssuesMapping, :elastic, :sidekiq_inline do
let(:version) { 20210910094600 }
include_examples 'migration adds mapping'
end
# frozen_string_literal: true
require 'spec_helper'
require_relative 'migration_shared_examples'
require File.expand_path('ee/elastic/migrate/20210910100000_redo_backfill_namespace_ancestry_ids_for_issues.rb')
RSpec.describe RedoBackfillNamespaceAncestryIdsForIssues, :elastic, :sidekiq_inline do
let(:version) { 20210910100000 }
include_examples 'migration backfills a field' do
let(:objects) { create_list(:issue, 3) }
let(:field_name) { :namespace_ancestry_ids }
let(:field_value) { "1-2-3-" }
let(:expected_throttle_delay) { 3.minutes }
let(:expected_batch_size) { 5000 }
end
end
# frozen_string_literal: true # frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20210825110300_backfill_namespace_ancestry_for_issues.rb')
RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do RSpec.shared_examples 'migration backfills a field' do
let(:version) { 20210825110300 }
let(:migration) { described_class.new(version) } let(:migration) { described_class.new(version) }
let(:issues) { create_list(:issue, 3) } let(:klass) { objects.first.class }
before do before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true) stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
set_elasticsearch_migration_to(version, including: false)
# ensure issues are indexed # ensure objects are indexed
issues objects
ensure_elasticsearch_index! ensure_elasticsearch_index!
end end
...@@ -19,7 +17,8 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do ...@@ -19,7 +17,8 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
describe 'migration_options' do describe 'migration_options' do
it 'has migration options set', :aggregate_failures do it 'has migration options set', :aggregate_failures do
expect(migration).to be_batched expect(migration).to be_batched
expect(migration.throttle_delay).to eq(3.minutes) expect(migration.throttle_delay).to eq(expected_throttle_delay)
expect(migration.batch_size).to eq(expected_batch_size)
end end
end end
...@@ -36,47 +35,39 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do ...@@ -36,47 +35,39 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
context 'migration process' do context 'migration process' do
before do before do
remove_namespace_ancestry_from_issues(issues) remove_field_from_objects(objects)
end end
context 'when migration fails' do it 'updates all documents' do
let(:logger) { instance_double('::Gitlab::Elasticsearch::Logger') }
before do
allow(migration).to receive(:logger).and_return(logger)
allow(logger).to receive(:info)
allow(migration).to receive(:process_batch!).and_raise('failed to process')
end
it 'logs and reraises an error' do
expect(logger).to receive(:error).with(/migrate failed with error/)
expect { subject }.to raise_error(RuntimeError)
end
end
it 'updates all issue documents' do
# track calls are batched in groups of 100 # track calls are batched in groups of 100
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).once do |*tracked_refs| expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).once.and_call_original do |*tracked_refs|
expect(tracked_refs.count).to eq(3) expect(tracked_refs.count).to eq(3)
end end
subject subject
ensure_elasticsearch_index!
expect(migration.completed?).to be_truthy
end end
it 'only updates issue documents missing namespace_ancestry', :aggregate_failures do it 'only updates documents missing a field', :aggregate_failures do
issue = issues.first object = objects.first
add_namespace_ancestry_for_issues(issues[1..-1]) add_field_for_objects(objects[1..-1])
expected = [Gitlab::Elastic::DocumentReference.new(Issue, issue.id, issue.es_id, issue.es_parent)] expected = [Gitlab::Elastic::DocumentReference.new(klass, object.id, object.es_id, object.es_parent)]
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).with(*expected).once expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).with(*expected).once.and_call_original
subject subject
ensure_elasticsearch_index!
expect(migration.completed?).to be_truthy
end end
it 'processes in batches', :aggregate_failures do it 'processes in batches', :aggregate_failures do
stub_const("#{described_class}::QUERY_BATCH_SIZE", 2) allow(migration).to receive(:batch_size).and_return(2)
stub_const("#{described_class}::UPDATE_BATCH_SIZE", 1) allow(migration).to receive(:update_batch_size).and_return(1)
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).exactly(3).times.and_call_original expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).exactly(3).times.and_call_original
...@@ -86,89 +77,119 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do ...@@ -86,89 +77,119 @@ RSpec.describe BackfillNamespaceAncestryForIssues, :elastic, :sidekiq_inline do
ensure_elasticsearch_index! ensure_elasticsearch_index!
migration.migrate migration.migrate
ensure_elasticsearch_index!
expect(migration.completed?).to be_truthy
end end
end end
end end
describe '.completed?' do describe '.completed?' do
context 'when documents are missing namespace_ancestry' do context 'when documents are missing field' do
before do before do
remove_namespace_ancestry_from_issues(issues) remove_field_from_objects(objects)
end end
specify { expect(migration).not_to be_completed } specify { expect(migration).not_to be_completed }
context 'when there are no documents in index' do
before do
delete_issues_from_index!
end
specify { expect(migration).to be_completed }
end
end end
context 'when no documents are missing namespace_ancestry' do context 'when no documents are missing field' do
specify { expect(migration).to be_completed } specify { expect(migration).to be_completed }
end end
end end
private private
def add_namespace_ancestry_for_issues(issues) def add_field_for_objects(objects)
script = { script = {
source: "ctx._source['namespace_ancestry'] = params.namespace_ancestry;", source: "ctx._source['#{field_name}'] = params.#{field_name};",
lang: "painless", lang: "painless",
params: { params: {
namespace_ancestry: "1-2-3" field_name => field_value
} }
} }
update_by_query(issues, script) update_by_query(objects, script)
end end
def remove_namespace_ancestry_from_issues(issues) def remove_field_from_objects(objects)
script = { script = {
source: "ctx._source.remove('namespace_ancestry')" source: "ctx._source.remove('#{field_name}')"
} }
update_by_query(issues, script) update_by_query(objects, script)
end end
def update_by_query(issues, script) def update_by_query(objects, script)
issue_ids = issues.map { |i| i.id } object_ids = objects.map(&:id)
client = Issue.__elasticsearch__.client client = klass.__elasticsearch__.client
client.update_by_query({ client.update_by_query({
index: Issue.__elasticsearch__.index_name, index: klass.__elasticsearch__.index_name,
wait_for_completion: true, # run synchronously wait_for_completion: true, # run synchronously
refresh: true, # make operation visible to search refresh: true, # make operation visible to search
body: { body: {
script: script, script: script,
query: { query: {
bool: { bool: {
must: [ must: [
{ {
terms: { terms: {
id: issue_ids id: object_ids
} }
} }
] ]
} }
} }
} }
}) })
end
end
RSpec.shared_examples 'migration adds mapping' do
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
allow(migration).to receive(:helper).and_return(helper)
end end
def delete_issues_from_index! describe '.migrate' do
client = Issue.__elasticsearch__.client subject { migration.migrate }
client.delete_by_query({
index: Issue.__elasticsearch__.index_name, context 'when migration is already completed' do
wait_for_completion: true, # run synchronously it 'does not modify data' do
body: { expect(helper).not_to receive(:update_mapping)
query: {
match_all: {} subject
} end
} end
})
context 'migration process' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
it 'updates the issues index mappings' do
expect(helper).to receive(:update_mapping)
subject
end
end
end
describe '.completed?' do
context 'mapping has been updated' do
specify { expect(migration).to be_completed }
end
context 'mapping has not been updated' do
before do
allow(helper).to receive(:get_mapping).and_return({})
end
specify { expect(migration).not_to be_completed }
end
end end
end end
...@@ -113,13 +113,13 @@ RSpec.describe Issue, :elastic do ...@@ -113,13 +113,13 @@ RSpec.describe Issue, :elastic do
let_it_be(:issue) { create(:issue, project: project, assignees: [assignee]) } let_it_be(:issue) { create(:issue, project: project, assignees: [assignee]) }
let_it_be(:award_emoji) { create(:award_emoji, :upvote, awardable: issue) } let_it_be(:award_emoji) { create(:award_emoji, :upvote, awardable: issue) }
context 'when add_namespace_ancestry_to_issues_mapping migration is not done' do context 'when add_namespace_ancestry_ids_to_issues_mapping migration is not done' do
before do before do
set_elasticsearch_migration_to :add_namespace_ancestry_to_issues_mapping, including: false set_elasticsearch_migration_to :add_namespace_ancestry_ids_to_issues_mapping, including: false
end end
it "returns json without namespace_ancestry" do it "returns json without namespace_ancestry" do
expect(issue.__elasticsearch__.as_indexed_json.keys).not_to include('namespace_ancestry') expect(issue.__elasticsearch__.as_indexed_json.keys).not_to include('namespace_ancestry_ids')
end end
end end
...@@ -138,7 +138,7 @@ RSpec.describe Issue, :elastic do ...@@ -138,7 +138,7 @@ RSpec.describe Issue, :elastic do
'type' => issue.es_type, 'type' => issue.es_type,
'state' => issue.state, 'state' => issue.state,
'upvotes' => 1, 'upvotes' => 1,
'namespace_ancestry' => "#{group.id}-#{subgroup.id}" 'namespace_ancestry_ids' => "#{group.id}-#{subgroup.id}-"
}) })
expected_hash['assignee_id'] = [assignee.id] expected_hash['assignee_id'] = [assignee.id]
......
...@@ -48,4 +48,18 @@ RSpec.describe Elastic::MigrationOptions do ...@@ -48,4 +48,18 @@ RSpec.describe Elastic::MigrationOptions do
expect(subject).to eq(30.seconds) expect(subject).to eq(30.seconds)
end end
end end
describe '#batch_size' do
subject { migration_class.new.batch_size }
it 'has a default' do
expect(subject).to eq(described_class::DEFAULT_BATCH_SIZE)
end
it 'respects when batch_size is set for the class' do
migration_class.batch_size 10000
expect(subject).to eq(10000)
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment