Commit 754b1ee4 authored by Nick Thomas's avatar Nick Thomas

Merge branch '5299-elastic-recovery' into 'master'

ElasticSearch indexing on web

Closes #5299

See merge request gitlab-org/gitlab-ee!11408
parents e5a5fbd8 65dac44a
......@@ -103,6 +103,7 @@
- [admin_emails, 1]
- [elastic_batch_project_indexer, 1]
- [elastic_indexer, 1]
- [elastic_full_index, 1]
- [elastic_commit_indexer, 1]
- [elastic_namespace_indexer, 1]
- [export_csv, 1]
......
# frozen_string_literal: true
class Admin::ElasticsearchController < Admin::ApplicationController
before_action :check_elasticsearch_web_indexing_feature_flag!
def check_elasticsearch_web_indexing_feature_flag!
render_404 unless Feature.enabled?(:elasticsearch_web_indexing)
end
# POST
# Scheduling indexing jobs
def enqueue_index
::Elastic::IndexProjectsService.new.execute
notice = _('Elasticsearch indexing started')
queue_link = helpers.link_to(_('(check progress)'), sidekiq_path + '/queues/elastic_full_index')
flash[:notice] = "#{notice} #{queue_link}".html_safe
redirect_back_or_default
end
end
......@@ -12,6 +12,14 @@ module Elastic
repository_access_level
).freeze
INDEXED_ASSOCIATIONS = [
:issues,
:merge_requests,
:snippets,
:notes,
:milestones
].freeze
included do
include ApplicationSearch
......@@ -97,6 +105,26 @@ module Elastic
self.__elasticsearch__.search(query_hash)
end
def self.indexed_association_classes
INDEXED_ASSOCIATIONS.map do |association_name|
reflect_on_association(association_name).klass
end
end
def each_indexed_association
INDEXED_ASSOCIATIONS.each do |association_name|
association = self.association(association_name)
scope = association.scope
klass = association.klass
if klass == Note
scope = scope.searchable
end
yield klass, scope
end
end
end
end
end
......@@ -11,7 +11,7 @@ module EE
belongs_to :review, inverse_of: :notes
scope :searchable, -> { where(system: false) }
scope :searchable, -> { where(system: false).includes(:noteable) }
end
# Original method in Elastic::ApplicationSearch
......
# frozen_string_literal: true
module Elastic
class IndexProjectsByIdService
def execute(project_ids: [], namespace_ids: [])
queue_name = ElasticFullIndexWorker.queue
project_ids.each do |project_id|
ElasticIndexerWorker
.set(queue: queue_name)
.perform_async(:index, 'Project', project_id, nil)
end
namespace_ids.each do |namespace_id|
ElasticNamespaceIndexerWorker
.set(queue: queue_name)
.perform_async(namespace_id, :index)
end
end
end
end
# frozen_string_literal: true
module Elastic
class IndexProjectsByRangeService
DEFAULT_BATCH_SIZE = 1000
BULK_PERFORM_SIZE = 1000
def execute(start_id: nil, end_id: nil, batch_size: nil)
end_id ||= ::Project.maximum(:id)
return unless end_id
start_id ||= 1
batch_size ||= DEFAULT_BATCH_SIZE
args = (start_id..end_id).each_slice(batch_size).map do |range|
[range.first, range.last]
end
args.each_slice(BULK_PERFORM_SIZE) do |args|
ElasticFullIndexWorker.bulk_perform_async(args)
end
end
end
end
# frozen_string_literal: true
module Elastic
class IndexProjectsService
def execute
if Gitlab::CurrentSettings.elasticsearch_limit_indexing?
IndexProjectsByIdService.new.execute(
project_ids: ElasticsearchIndexedProject.target_ids,
namespace_ids: ElasticsearchIndexedNamespace.target_ids
)
else
IndexProjectsByRangeService.new.execute
end
end
end
end
# frozen_string_literal: true
module Elastic
class IndexRecordService
include Elasticsearch::Model::Client::ClassMethods
ISSUE_TRACKED_FIELDS = %w(assignee_ids author_id confidential).freeze
# @param indexing [Boolean] determines whether operation is "indexing" or "updating"
def execute(record, indexing, options = {})
record.__elasticsearch__.client = client
import(record, record.class.nested?, indexing)
initial_index_project(record) if record.class == Project && indexing
update_issue_notes(record, options["changed_fields"]) if record.class == Issue
rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound
# These errors can happen in several cases, including:
# - A record is updated, then removed before the update is handled
# - Indexing is enabled, but not every item has been indexed yet - updating
# and deleting the un-indexed records will raise exception
#
# We can ignore these.
true
end
private
def update_issue_notes(record, changed_fields)
if changed_fields && (changed_fields & ISSUE_TRACKED_FIELDS).any?
Note.es_import query: -> { where(noteable: record) }
end
end
def initial_index_project(project)
project.each_indexed_association do |klass, objects|
nested = klass.nested?
objects.find_each { |object| import(object, nested, true) }
end
# Finally, index blobs/commits/wikis
ElasticCommitIndexerWorker.perform_async(project.id)
end
def import(record, nested, indexing)
operation = indexing ? 'index_document' : 'update_document'
if nested
record.__elasticsearch__.__send__ operation, routing: record.es_parent # rubocop:disable GitlabSecurity/PublicSend
else
record.__elasticsearch__.__send__ operation # rubocop:disable GitlabSecurity/PublicSend
end
end
end
end
......@@ -82,3 +82,11 @@
AWS Secret Access Key. Only required if not using role instance credentials
= f.submit 'Save changes', class: "btn btn-success"
- if Feature.enabled?(:elasticsearch_web_indexing)
%br
%h4
= _('Indexing')
= form_with url: admin_elasticsearch_enqueue_index_path, html: { class: 'fieldset-form' }, data: { remote: false } do |f|
= f.submit _('Start Indexing'), class: "btn btn-success"
......@@ -54,6 +54,7 @@
- elastic_namespace_indexer
- elastic_commit_indexer
- elastic_indexer
- elastic_full_index
- export_csv
- ldap_group_sync
- new_epic
......
# frozen_string_literal: true
# For each project in range,
# indexing the repository, wiki and its nested models
# (e.g. )issues and notes etc.)
# Intended for full site indexing.
class ElasticFullIndexWorker
include ApplicationWorker
sidekiq_options retry: 2
def perform(start_id, end_id)
return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
Project.id_in(start_id..end_id).find_each do |project|
Elastic::IndexRecordService.new.execute(project, true)
end
end
end
......@@ -5,8 +5,6 @@ class ElasticIndexerWorker
sidekiq_options retry: 2
ISSUE_TRACKED_FIELDS = %w(assignee_ids author_id confidential).freeze
def perform(operation, class_name, record_id, es_id, options = {})
return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
......@@ -14,14 +12,11 @@ class ElasticIndexerWorker
case operation.to_s
when /index|update/
record = klass.find(record_id)
record.__elasticsearch__.client = client
import(operation, record, klass)
initial_index_project(record) if klass == Project && operation.to_s.match?(/index/)
update_issue_notes(record, options["changed_fields"]) if klass == Issue
Elastic::IndexRecordService.new.execute(
klass.find(record_id),
operation.to_s.match?(/index/),
options
)
when /delete/
if klass.nested?
client.delete(
......@@ -47,40 +42,11 @@ class ElasticIndexerWorker
private
def update_issue_notes(record, changed_fields)
if changed_fields && (changed_fields & ISSUE_TRACKED_FIELDS).any?
Note.es_import query: -> { where(noteable: record) }
end
end
def clear_project_data(record_id, es_id)
remove_children_documents('project', record_id, es_id)
IndexStatus.for_project(record_id).delete_all
end
def initial_index_project(project)
{
Issue => project.issues,
MergeRequest => project.merge_requests,
Snippet => project.snippets,
Note => project.notes.searchable,
Milestone => project.milestones
}.each do |klass, objects|
objects.find_each { |object| import(:index, object, klass) }
end
# Finally, index blobs/commits/wikis
ElasticCommitIndexerWorker.perform_async(project.id)
end
def import(operation, record, klass)
if klass.nested?
record.__elasticsearch__.__send__ "#{operation}_document", routing: record.es_parent # rubocop:disable GitlabSecurity/PublicSend
else
record.__elasticsearch__.__send__ "#{operation}_document" # rubocop:disable GitlabSecurity/PublicSend
end
end
def remove_documents_by_project_id(record_id)
client.delete_by_query({
index: Project.__elasticsearch__.index_name,
......
---
title: Allowing Elasticsearch indexing gap recovering
merge_request: 11408
author:
type: changed
......@@ -44,5 +44,9 @@ namespace :admin do
resources :uploads, only: [:index, :destroy]
end
namespace :elasticsearch do
post :enqueue_index
end
get '/dashboard/stats', to: 'dashboard#stats'
end
# frozen_string_literal: true
require 'spec_helper'
describe Admin::ElasticsearchController do
let(:admin) { create(:admin) }
describe 'POST #enqueue_index' do
before do
sign_in(admin)
end
it 'starts indexing' do
expect_next_instance_of(::Elastic::IndexProjectsService) do |service|
expect(service).to receive(:execute)
end
post :enqueue_index
expect(controller).to set_flash[:notice].to include('/admin/sidekiq/queues/elastic_full_index')
end
context 'when feature disabled' do
it 'does nothing and returns 404' do
stub_feature_flags(elasticsearch_web_indexing: false)
expect(::Elastic::IndexProjectsService).not_to receive(:new)
post :enqueue_index
expect(response).to have_gitlab_http_status(404)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::IndexProjectsByIdService do
describe '#execute' do
it 'schedules index workers' do
Sidekiq::Testing.fake! do
described_class.new.execute(project_ids: [1, 2], namespace_ids: [3, 4])
end
jobs = Sidekiq::Queues[ElasticFullIndexWorker.queue]
expect(jobs.size).to eq(4)
expect(jobs[0]['args']).to eq(['index', 'Project', 1, nil])
expect(jobs[1]['args']).to eq(['index', 'Project', 2, nil])
expect(jobs[2]['args']).to eq([3, 'index'])
expect(jobs[3]['args']).to eq([4, 'index'])
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::IndexProjectsByRangeService do
describe '#execute' do
context 'when without project' do
it 'does not err' do
expect(ElasticFullIndexWorker).not_to receive(:bulk_perform_async)
described_class.new.execute
end
end
context 'when range not specified' do
before do
allow(::Project).to receive(:maximum).with(:id).and_return(described_class::DEFAULT_BATCH_SIZE + 1)
end
it 'schedules for all projects' do
expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[1, 1000], [1001, 1001]])
described_class.new.execute
end
it 'respects batch_size setting' do
expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[1, 500], [501, 1000], [1001, 1001]])
described_class.new.execute(batch_size: 500)
end
end
context 'when range specified' do
it 'schedules for projects within range' do
expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[2, 5]])
described_class.new.execute(start_id: 2, end_id: 5)
end
it 'respects batch_size setting' do
expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[501, 1500], [1501, 1501]])
described_class.new.execute(start_id: 501, end_id: 1501, batch_size: 1000)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::IndexProjectsService do
describe '#execute' do
context 'when elasticsearch_limit_indexing? is true' do
before do
stub_ee_application_setting(elasticsearch_limit_indexing: true)
create(:elasticsearch_indexed_project)
create(:elasticsearch_indexed_namespace)
end
it 'schedules indexing for selected projects and namespaces' do
expect_next_instance_of(::Elastic::IndexProjectsByIdService) do |service|
expect(service).to receive(:execute).with(
project_ids: ElasticsearchIndexedProject.target_ids,
namespace_ids: ElasticsearchIndexedNamespace.target_ids
)
end
subject.execute
end
end
context 'when elasticsearch_limit_indexing? is false' do
before do
stub_ee_application_setting(elasticsearch_limit_indexing: false)
end
it 'schedules indexing for all projects' do
expect_next_instance_of(::Elastic::IndexProjectsByRangeService) do |service|
expect(service).to receive(:execute)
end
subject.execute
end
end
end
end
require 'spec_helper'
describe Elastic::IndexRecordService, :elastic do
subject { described_class.new }
before do
stub_ee_application_setting(elasticsearch_indexing: true)
Elasticsearch::Model.client =
Gitlab::Elastic::Client.build(Gitlab::CurrentSettings.elasticsearch_config)
end
describe 'Indexing, updating, and deleting records' do
using RSpec::Parameterized::TableSyntax
where(:type, :name, :attribute) do
:project | "Project" | :name
:issue | "Issue" | :title
:note | "Note" | :note
:milestone | "Milestone" | :title
:merge_request | "MergeRequest" | :title
end
with_them do
it 'indexes new records' do
object = nil
Sidekiq::Testing.disable! do
object = create(type)
end
expect do
subject.execute(object, true)
Gitlab::Elastic::Helper.refresh_index
end.to change { Elasticsearch::Model.search('*').records.size }.by(1)
end
it 'updates the index when object is changed' do
object = nil
Sidekiq::Testing.disable! do
object = create(type)
subject.execute(object, true)
object.update(attribute => "new")
end
expect do
subject.execute(object, false)
Gitlab::Elastic::Helper.refresh_index
end.to change { Elasticsearch::Model.search('new').records.size }.by(1)
end
end
end
it 'indexes all nested objects for a Project' do
# To be able to access it outside the following block
project = nil
Sidekiq::Testing.disable! do
project = create :project, :repository
create :issue, project: project
create :milestone, project: project
create :note, project: project
create :merge_request, target_project: project, source_project: project
create :project_snippet, project: project
end
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id).and_call_original
# Nothing should be in the index at this point
expect(Elasticsearch::Model.search('*').total_count).to be(0)
Sidekiq::Testing.inline! do
subject.execute(project, true)
end
Gitlab::Elastic::Helper.refresh_index
## All database objects + data from repository. The absolute value does not matter
expect(Elasticsearch::Model.search('*').total_count).to be > 40
end
it 'indexes changes during indexing gap' do
project = nil
note = nil
Sidekiq::Testing.inline! do
project = create :project, :repository
note = create :note, project: project, note: 'note_1'
Gitlab::Elastic::Helper.refresh_index
end
options = { project_ids: [project.id] }
Sidekiq::Testing.disable! do
note.update_columns(note: 'note_2')
create :note, project: project, note: 'note_3'
end
expect(Note.elastic_search('note_1', options: options).present?).to eq(true)
expect(Note.elastic_search('note_2', options: options).present?).to eq(false)
expect(Note.elastic_search('note_3', options: options).present?).to eq(false)
Sidekiq::Testing.inline! do
subject.execute(project, true)
Gitlab::Elastic::Helper.refresh_index
end
expect(Note.elastic_search('note_1', options: options).present?).to eq(false)
expect(Note.elastic_search('note_2', options: options).present?).to eq(true)
expect(Note.elastic_search('note_3', options: options).present?).to eq(true)
end
end
......@@ -30,31 +30,14 @@ describe ElasticIndexerWorker, :elastic do
end
with_them do
it 'indexes new records' do
object = nil
Sidekiq::Testing.disable! do
object = create(type)
end
expect do
subject.perform("index", name, object.id, object.es_id)
Gitlab::Elastic::Helper.refresh_index
end.to change { Elasticsearch::Model.search('*').records.size }.by(1)
end
it 'updates the index when object is changed' do
object = nil
it 'calls record indexing' do
object = create(type)
Sidekiq::Testing.disable! do
object = create(type)
subject.perform("index", name, object.id, object.es_id)
object.update(attribute => "new")
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(object, true, {})
end
expect do
subject.perform("update", name, object.id, object.es_id)
Gitlab::Elastic::Helper.refresh_index
end.to change { Elasticsearch::Model.search('new').records.size }.by(1)
subject.perform("index", name, object.id, object.es_id)
end
it 'deletes from index when an object is deleted' do
......@@ -106,31 +89,4 @@ describe ElasticIndexerWorker, :elastic do
expect(Elasticsearch::Model.search('*').total_count).to be(0)
end
it 'indexes all nested objects for a Project' do
# To be able to access it outside the following block
project = nil
Sidekiq::Testing.disable! do
project = create :project, :repository
create :issue, project: project
create :milestone, project: project
create :note, project: project
create :merge_request, target_project: project, source_project: project
create :project_snippet, project: project
end
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id).and_call_original
# Nothing should be in the index at this point
expect(Elasticsearch::Model.search('*').total_count).to be(0)
Sidekiq::Testing.inline! do
subject.perform("index", "Project", project.id, project.es_id)
end
Gitlab::Elastic::Helper.refresh_index
## All database objects + data from repository. The absolute value does not matter
expect(Elasticsearch::Model.search('*').total_count).to be > 40
end
end
......@@ -306,6 +306,9 @@ msgstr ""
msgid "(No changes)"
msgstr ""
msgid "(check progress)"
msgstr ""
msgid "(external source)"
msgstr ""
......@@ -4319,6 +4322,9 @@ msgstr ""
msgid "Elasticsearch"
msgstr ""
msgid "Elasticsearch indexing started"
msgstr ""
msgid "Elasticsearch integration. Elasticsearch AWS IAM."
msgstr ""
......@@ -6672,6 +6678,9 @@ msgstr ""
msgid "Incompatible options set!"
msgstr ""
msgid "Indexing"
msgstr ""
msgid "Indicates whether this runner can pick jobs without tags"
msgstr ""
......@@ -11633,6 +11642,9 @@ msgstr ""
msgid "Start GitLab Ultimate trial"
msgstr ""
msgid "Start Indexing"
msgstr ""
msgid "Start Web Terminal"
msgstr ""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment