Commit 5603e23a authored by Mark Chao's avatar Mark Chao

Allow indexing one project as a whole

A project’s db and repo are indexed as one unit.

Add the elastic_full_index queue as the central queue for
all full-index-related jobs.
parent ca2b8839
...@@ -103,6 +103,7 @@ ...@@ -103,6 +103,7 @@
- [admin_emails, 1] - [admin_emails, 1]
- [elastic_batch_project_indexer, 1] - [elastic_batch_project_indexer, 1]
- [elastic_indexer, 1] - [elastic_indexer, 1]
- [elastic_full_index, 1]
- [elastic_commit_indexer, 1] - [elastic_commit_indexer, 1]
- [elastic_namespace_indexer, 1] - [elastic_namespace_indexer, 1]
- [export_csv, 1] - [export_csv, 1]
......
# frozen_string_literal: true

module Elastic
  # Enqueues indexing jobs for an explicit set of projects and namespaces.
  # Every job is routed onto the full-index queue so full-site indexing
  # work can be tracked and drained separately from regular indexing.
  class IndexProjectsByIdService
    # @param project_ids [Array<Integer>] ids of projects to index
    # @param namespace_ids [Array<Integer>] ids of namespaces to index
    def execute(project_ids: [], namespace_ids: [])
      queue = ElasticFullIndexWorker.queue

      project_ids.each do |id|
        ElasticIndexerWorker.set(queue: queue).perform_async(:index, 'Project', id, nil)
      end

      namespace_ids.each do |id|
        ElasticNamespaceIndexerWorker.set(queue: queue).perform_async(id, :index)
      end
    end
  end
end
# frozen_string_literal: true

module Elastic
  # Schedules full-index jobs covering a contiguous range of project ids.
  # The id range is cut into fixed-size slices, and the resulting
  # [first_id, last_id] pairs are handed to ElasticFullIndexWorker in bulk.
  class IndexProjectsByRangeService
    # Number of project ids covered by a single worker job.
    DEFAULT_BATCH_SIZE = 1000
    # Maximum number of jobs pushed per bulk_perform_async call, keeping
    # each Sidekiq bulk push bounded in size.
    BULK_PERFORM_SIZE = 1000

    # @param start_id [Integer, nil] first project id (defaults to 1)
    # @param end_id [Integer, nil] last project id (defaults to the maximum project id)
    # @param batch_size [Integer, nil] ids per job (defaults to DEFAULT_BATCH_SIZE)
    def execute(start_id: nil, end_id: nil, batch_size: nil)
      end_id ||= ::Project.maximum(:id)
      return unless end_id # no projects exist: nothing to schedule

      start_id ||= 1
      batch_size ||= DEFAULT_BATCH_SIZE

      # Each slice becomes one worker job argument pair: [first_id, last_id].
      ranges = (start_id..end_id).each_slice(batch_size).map do |slice|
        [slice.first, slice.last]
      end

      # NOTE: the block parameter is named `batch` (not `args`) to avoid
      # shadowing the outer local, which the original code did.
      ranges.each_slice(BULK_PERFORM_SIZE) do |batch|
        ElasticFullIndexWorker.bulk_perform_async(batch)
      end
    end
  end
end
# frozen_string_literal: true

module Elastic
  # Entry point for scheduling a full project (re)index.
  #
  # When indexing is limited to an explicit allow-list, delegates to the
  # id-based service with the selected project/namespace ids; otherwise
  # falls back to range-based indexing over every project.
  class IndexProjectsService
    def execute
      unless Gitlab::CurrentSettings.elasticsearch_limit_indexing?
        return IndexProjectsByRangeService.new.execute
      end

      IndexProjectsByIdService.new.execute(
        project_ids: ElasticsearchIndexedProject.target_ids,
        namespace_ids: ElasticsearchIndexedNamespace.target_ids
      )
    end
  end
end
...@@ -53,6 +53,7 @@ ...@@ -53,6 +53,7 @@
- elastic_namespace_indexer - elastic_namespace_indexer
- elastic_commit_indexer - elastic_commit_indexer
- elastic_indexer - elastic_indexer
- elastic_full_index
- export_csv - export_csv
- ldap_group_sync - ldap_group_sync
- new_epic - new_epic
......
# frozen_string_literal: true

# For each project in the given id range, indexes the repository,
# the wiki and its nested models (e.g. issues and notes).
# Intended for full-site indexing, where a project's database records
# and repository are indexed as one unit.
class ElasticFullIndexWorker
  include ApplicationWorker

  # Limited retries: a failed range can be rescheduled by the caller.
  sidekiq_options retry: 2

  # @param start_id [Integer] first project id of the range (inclusive)
  # @param end_id [Integer] last project id of the range (inclusive)
  def perform(start_id, end_id)
    # No-op when Elasticsearch indexing is disabled site-wide.
    return true unless Gitlab::CurrentSettings.elasticsearch_indexing?

    # find_each loads projects in batches to keep memory bounded.
    Project.id_in(start_id..end_id).find_each do |project|
      # Runs the indexer synchronously (``new.index``) inside this job
      # rather than fanning out one job per project.
      ElasticIndexerWorker.new.index(:index, project)
    end
  end
end
...@@ -14,14 +14,7 @@ class ElasticIndexerWorker ...@@ -14,14 +14,7 @@ class ElasticIndexerWorker
case operation.to_s case operation.to_s
when /index|update/ when /index|update/
record = klass.find(record_id) index(operation, klass.find(record_id), options)
record.__elasticsearch__.client = client
import(operation, record, klass)
initial_index_project(record) if klass == Project && operation.to_s.match?(/index/)
update_issue_notes(record, options["changed_fields"]) if klass == Issue
when /delete/ when /delete/
if klass.nested? if klass.nested?
client.delete( client.delete(
...@@ -45,6 +38,16 @@ class ElasticIndexerWorker ...@@ -45,6 +38,16 @@ class ElasticIndexerWorker
true true
end end
def index(operation, record, options = {})
record.__elasticsearch__.client = client
import(operation, record, record.class)
initial_index_project(record) if record.class == Project && operation.to_s.match?(/index/)
update_issue_notes(record, options["changed_fields"]) if record.class == Issue
end
private private
def update_issue_notes(record, changed_fields) def update_issue_notes(record, changed_fields)
......
# frozen_string_literal: true

require 'spec_helper'

describe Elastic::IndexProjectsByIdService do
  describe '#execute' do
    it 'schedules index workers' do
      # Capture jobs instead of executing them.
      Sidekiq::Testing.fake! do
        described_class.new.execute(project_ids: [1, 2], namespace_ids: [3, 4])
      end

      jobs = Sidekiq::Queues[ElasticFullIndexWorker.queue]

      # All four jobs land on the full-index queue, in scheduling order:
      # one per project, then one per namespace.
      expected_args = [
        ['index', 'Project', 1, nil],
        ['index', 'Project', 2, nil],
        [3, 'index'],
        [4, 'index']
      ]

      expect(jobs.map { |job| job['args'] }).to eq(expected_args)
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

describe Elastic::IndexProjectsByRangeService do
  describe '#execute' do
    # Small helper so each example reads as a single expectation + call.
    def run_service(**args)
      described_class.new.execute(**args)
    end

    context 'when without project' do
      it 'does not err' do
        expect(ElasticFullIndexWorker).not_to receive(:bulk_perform_async)

        run_service
      end
    end

    context 'when range not specified' do
      before do
        # One project id past a full batch, to force a second (partial) batch.
        allow(::Project).to receive(:maximum).with(:id).and_return(described_class::DEFAULT_BATCH_SIZE + 1)
      end

      it 'schedules for all projects' do
        expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[1, 1000], [1001, 1001]])

        run_service
      end

      it 'respects batch_size setting' do
        expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[1, 500], [501, 1000], [1001, 1001]])

        run_service(batch_size: 500)
      end
    end

    context 'when range specified' do
      it 'schedules for projects within range' do
        expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[2, 5]])

        run_service(start_id: 2, end_id: 5)
      end

      it 'respects batch_size setting' do
        expect(ElasticFullIndexWorker).to receive(:bulk_perform_async).with([[501, 1500], [1501, 1501]])

        run_service(start_id: 501, end_id: 1501, batch_size: 1000)
      end
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

describe Elastic::IndexProjectsService do
  describe '#execute' do
    context 'when elasticsearch_limit_indexing? is true' do
      before do
        stub_ee_application_setting(elasticsearch_limit_indexing: true)

        # Seed the allow-list with one project and one namespace.
        create(:elasticsearch_indexed_project)
        create(:elasticsearch_indexed_namespace)
      end

      it 'schedules indexing for selected projects and namespaces' do
        expect_next_instance_of(::Elastic::IndexProjectsByIdService) do |id_service|
          expect(id_service).to receive(:execute).with(
            project_ids: ElasticsearchIndexedProject.target_ids,
            namespace_ids: ElasticsearchIndexedNamespace.target_ids
          )
        end

        subject.execute
      end
    end

    context 'when elasticsearch_limit_indexing? is false' do
      before do
        stub_ee_application_setting(elasticsearch_limit_indexing: false)
      end

      it 'schedules indexing for all projects' do
        expect_next_instance_of(::Elastic::IndexProjectsByRangeService) do |range_service|
          expect(range_service).to receive(:execute)
        end

        subject.execute
      end
    end
  end
end
...@@ -133,4 +133,35 @@ describe ElasticIndexerWorker, :elastic do ...@@ -133,4 +133,35 @@ describe ElasticIndexerWorker, :elastic do
## All database objects + data from repository. The absolute value does not matter ## All database objects + data from repository. The absolute value does not matter
expect(Elasticsearch::Model.search('*').total_count).to be > 40 expect(Elasticsearch::Model.search('*').total_count).to be > 40
end end
it 'indexes changes during indexing gap' do
project = nil
note = nil
Sidekiq::Testing.inline! do
project = create :project, :repository
note = create :note, project: project, note: 'note_1'
Gitlab::Elastic::Helper.refresh_index
end
options = { project_ids: [project.id] }
Sidekiq::Testing.disable! do
note.update_columns(note: 'note_2')
create :note, project: project, note: 'note_3'
end
expect(Note.elastic_search('note_1', options: options).present?).to eq(true)
expect(Note.elastic_search('note_2', options: options).present?).to eq(false)
expect(Note.elastic_search('note_3', options: options).present?).to eq(false)
Sidekiq::Testing.inline! do
subject.perform("index", "Project", project.id, project.es_id)
Gitlab::Elastic::Helper.refresh_index
end
expect(Note.elastic_search('note_1', options: options).present?).to eq(false)
expect(Note.elastic_search('note_2', options: options).present?).to eq(true)
expect(Note.elastic_search('note_3', options: options).present?).to eq(true)
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment