Commit 2fd3b49e authored by Dmitry Gruzd's avatar Dmitry Gruzd Committed by Dylan Griffith

Use bulk indexer for projects

Handle project create/update/delete actions with bulk-incremental
indexer.
parent f4ef30ae
......@@ -487,6 +487,11 @@ production: &base
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_bulk_cron_worker:
cron: "*/1 * * * *"
# Elasticsearch bulk updater for initial updates.
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_initial_bulk_cron_worker:
cron: "*/1 * * * *"
registry:
# enabled: true
......
......@@ -561,6 +561,9 @@ Gitlab.ee do
Settings.cron_jobs['elastic_index_bulk_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_index_bulk_cron_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['elastic_index_bulk_cron_worker']['job_class'] ||= 'ElasticIndexBulkCronWorker'
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['job_class'] ||= 'ElasticIndexInitialBulkCronWorker'
Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *"
Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker'
......
......@@ -82,6 +82,8 @@
- 1
- - elastic_commit_indexer
- 1
- - elastic_delete_project
- 1
- - elastic_full_index
- 1
- - elastic_indexer
......
......@@ -6,47 +6,17 @@ module Elastic
include ApplicationVersionedSearch
INDEXED_ASSOCIATIONS = [
:issues,
:merge_requests,
:snippets,
:notes,
:milestones
].freeze
included do
def use_elasticsearch?
::Gitlab::CurrentSettings.elasticsearch_indexes_project?(self)
end
# TODO: ElasticIndexerWorker does extra work for project hooks, so we
# can't use the incremental-bulk indexer for projects yet.
#
# https://gitlab.com/gitlab-org/gitlab/issues/207494
def maintain_elasticsearch_create
ElasticIndexerWorker.perform_async(:index, self.class.to_s, self.id, self.es_id)
end
def maintain_elasticsearch_update
ElasticIndexerWorker.perform_async(:update, self.class.to_s, self.id, self.es_id)
::Elastic::ProcessInitialBookkeepingService.track!(self)
end
def maintain_elasticsearch_destroy
ElasticIndexerWorker.perform_async(:delete, self.class.to_s, self.id, self.es_id, es_parent: self.es_parent)
end
def each_indexed_association
INDEXED_ASSOCIATIONS.each do |association_name|
association = self.association(association_name)
scope = association.scope
klass = association.klass
if klass == Note
scope = scope.searchable
end
yield klass, scope
end
ElasticDeleteProjectWorker.perform_async(self.id, self.es_id)
end
end
end
......
......@@ -18,19 +18,13 @@ class ElasticsearchIndexedProject < ApplicationRecord
def index
if Gitlab::CurrentSettings.elasticsearch_indexing? && project.searchable?
ElasticIndexerWorker.perform_async(:index, project.class.to_s, project.id, project.es_id)
::Elastic::ProcessInitialBookkeepingService.backfill_projects!(project) # rubocop: disable CodeReuse/ServiceClass
end
end
def delete_from_index
if Gitlab::CurrentSettings.elasticsearch_indexing? && project.searchable?
ElasticIndexerWorker.perform_async(
:delete,
project.class.to_s,
project.id,
project.es_id,
es_parent: project.es_parent
)
ElasticDeleteProjectWorker.perform_async(project.id, project.es_id)
end
end
end
......@@ -3,18 +3,11 @@
module Elastic
class IndexProjectsByIdService
def execute(project_ids: [], namespace_ids: [])
queue_name = ElasticFullIndexWorker.queue
project_ids.each do |project_id|
ElasticIndexerWorker
.set(queue: queue_name)
.perform_async(:index, 'Project', project_id, nil)
end
projects = Project.find(project_ids)
Elastic::ProcessInitialBookkeepingService.backfill_projects!(*projects)
namespace_ids.each do |namespace_id|
ElasticNamespaceIndexerWorker
.set(queue: queue_name)
.perform_async(namespace_id, :index)
ElasticNamespaceIndexerWorker.perform_async(namespace_id, :index)
end
end
end
......
# frozen_string_literal: true
module Elastic
class IndexRecordService
include Elasticsearch::Model::Client::ClassMethods
ImportError = Class.new(StandardError)
IMPORT_RETRY_COUNT = 3
# @param indexing [Boolean] determines whether operation is "indexing" or "updating"
def execute(record, indexing, options = {})
return true unless record.use_elasticsearch?
record.__elasticsearch__.client = client
import(record, indexing)
initial_index_project(record) if record.class == Project && indexing
true
rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound => e
# These errors can happen in several cases, including:
# - A record is updated, then removed before the update is handled
# - Indexing is enabled, but not every item has been indexed yet - updating
# and deleting the un-indexed records will raise exception
#
# We can ignore these.
logger.error(message: 'elastic_index_record_service_caught_exception', error_class: e.class.name, error_message: e.message)
true
end
private
def initial_index_project(project)
# Enqueue the repository indexing jobs immediately so they run in parallel
# One for the project repository, one for the wiki repository
ElasticCommitIndexerWorker.perform_async(project.id)
ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
project.each_indexed_association do |klass, association|
import_association(association)
end
end
def import_association(association, options = {})
options[:return] = 'errors'
errors = association.es_import(options)
return if errors.empty?
IMPORT_RETRY_COUNT.times do
errors = retry_import(errors, association, options)
return if errors.empty?
end
raise ImportError.new(errors.inspect)
end
def import(record, indexing)
operation = indexing ? 'index_document' : 'update_document'
response = nil
IMPORT_RETRY_COUNT.times do
response = if record.es_parent
record.__elasticsearch__.__send__ operation, routing: record.es_parent # rubocop:disable GitlabSecurity/PublicSend
else
record.__elasticsearch__.__send__ operation # rubocop:disable GitlabSecurity/PublicSend
end
return if response['_shards']['successful'] > 0
end
raise ImportError.new(response)
end
def retry_import(errors, association, options)
ids = errors.map { |error| error['index']['_id'][/_(\d+)$/, 1] }
association.id_in(ids).es_import(options)
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
end
end
......@@ -16,13 +16,17 @@ module Elastic
with_redis do |redis|
# Efficiently generate a guaranteed-unique score for each item
max = redis.incrby(REDIS_SCORE_KEY, items.size)
max = redis.incrby(self::REDIS_SCORE_KEY, items.size)
min = (max - items.size) + 1
(min..max).zip(items).each_slice(1000) do |group|
logger.debug(message: 'track_items', count: group.count, tracked_items_encoded: group.to_json)
logger.debug(class: self.name,
redis_set: self::REDIS_SET_KEY,
message: 'track_items',
count: group.count,
tracked_items_encoded: group.to_json)
redis.zadd(REDIS_SET_KEY, group)
redis.zadd(self::REDIS_SET_KEY, group)
end
end
......@@ -30,11 +34,11 @@ module Elastic
end
def queue_size
with_redis { |redis| redis.zcard(REDIS_SET_KEY) }
with_redis { |redis| redis.zcard(self::REDIS_SET_KEY) }
end
def clear_tracking!
with_redis { |redis| redis.del(REDIS_SET_KEY, REDIS_SCORE_KEY) }
with_redis { |redis| redis.del(self::REDIS_SET_KEY, self::REDIS_SCORE_KEY) }
end
def logger
......@@ -56,7 +60,7 @@ module Elastic
def execute_with_redis(redis)
start_time = Time.current
specs = redis.zrangebyscore(REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
specs = redis.zrangebyscore(self.class::REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
return 0 if specs.empty?
first_score = specs.first.last
......@@ -77,7 +81,7 @@ module Elastic
self.class.track!(*failures) if failures.present?
# Remove all the successes
redis.zremrangebyscore(REDIS_SET_KEY, first_score, last_score)
redis.zremrangebyscore(self.class::REDIS_SET_KEY, first_score, last_score)
records_count = specs.count
......
# frozen_string_literal: true
module Elastic
class ProcessInitialBookkeepingService < Elastic::ProcessBookkeepingService
REDIS_SET_KEY = 'elastic:bulk:initial:0:zset'
REDIS_SCORE_KEY = 'elastic:bulk:initial:0:score'
INDEXED_ASSOCIATIONS = [
:issues,
:merge_requests,
:snippets,
:notes,
:milestones
].freeze
class << self
def backfill_projects!(*projects)
track!(*projects)
projects.each do |project|
raise ArgumentError, 'This method only accepts Projects' unless project.is_a?(Project)
maintain_indexed_associations(project)
ElasticCommitIndexerWorker.perform_async(project.id)
ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
end
end
def each_indexed_association(project)
INDEXED_ASSOCIATIONS.each do |association_name|
association = project.association(association_name)
scope = association.scope
klass = association.klass
if klass == Note
scope = scope.searchable
end
yield klass, scope
end
end
private
def maintain_indexed_associations(project)
each_indexed_association(project) do |_, association|
association.find_in_batches do |group|
track!(*group)
end
end
end
end
end
end
......@@ -35,6 +35,14 @@
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:elastic_index_initial_bulk_cron
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:geo_container_repository_sync_dispatch
:feature_category: :geo_replication
:has_external_dependencies:
......@@ -563,6 +571,14 @@
:weight: 1
:idempotent: true
:tags: []
- :name: elastic_delete_project
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: elastic_full_index
:feature_category: :global_search
:has_external_dependencies:
......
# frozen_string_literal: true
module Elastic
module BulkCronWorker
extend ActiveSupport::Concern
included do
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
# There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
end
def perform
if Elastic::IndexingControl.non_cached_pause_indexing?
logger.info(message: "elasticsearch_pause_indexing setting is enabled. #{self.class} execution is skipped.")
return false
end
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
records_count = service.execute
log_extra_metadata_on_done(:records_count, records_count)
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
end
private
def logger
Elastic::IndexingControl.logger
end
end
end
......@@ -3,7 +3,7 @@
# Concern for pausing/unpausing elasticsearch indexing workers
module Elastic
module IndexingControl
WORKERS = [ElasticCommitIndexerWorker, ElasticIndexerWorker].freeze
WORKERS = [ElasticCommitIndexerWorker, ElasticDeleteProjectWorker].freeze
def perform(*args)
if Elastic::IndexingControl.non_cached_pause_indexing? && WORKERS.include?(self.class)
......
# frozen_string_literal: true
class ElasticDeleteProjectWorker
include ApplicationWorker
include Elasticsearch::Model::Client::ClassMethods
prepend Elastic::IndexingControl
sidekiq_options retry: 2
feature_category :global_search
urgency :throttled
idempotent!
def perform(project_id, es_id)
remove_project_and_children_documents(project_id, es_id)
end
private
def remove_project_and_children_documents(project_id, es_id)
client.delete_by_query({
index: Project.__elasticsearch__.index_name,
routing: es_id,
body: {
query: {
bool: {
should: [
{
has_parent: {
parent_type: 'project',
query: {
term: { id: project_id }
}
}
},
{
term: {
_id: es_id
}
}
]
}
}
}
})
end
end
......@@ -13,16 +13,8 @@ class ElasticFullIndexWorker # rubocop:disable Scalability/IdempotentWorker
def perform(start_id, end_id)
return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
failed_ids = []
Project.id_in(start_id..end_id).find_each do |project|
Elastic::IndexRecordService.new.execute(project, true)
rescue Elastic::IndexRecordService::ImportError
failed_ids << project.id
end
if failed_ids.present?
Elastic::IndexProjectsByIdService.new.execute(project_ids: failed_ids)
Elastic::ProcessInitialBookkeepingService.backfill_projects!(project)
end
end
end
# frozen_string_literal: true
class ElasticIndexBulkCronWorker
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
# There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Elastic::BulkCronWorker
feature_category :global_search
idempotent!
urgency :throttled
def perform
if Elastic::IndexingControl.non_cached_pause_indexing?
logger.info(message: 'elasticsearch_pause_indexing setting is enabled. ElasticBulkCronWorker execution is skipped.')
return false
end
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
records_count = Elastic::ProcessBookkeepingService.new.execute
log_extra_metadata_on_done(:records_count, records_count)
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
end
private
def logger
Elastic::IndexingControl.logger
def service
Elastic::ProcessBookkeepingService.new
end
end
# frozen_string_literal: true
class ElasticIndexInitialBulkCronWorker
include Elastic::BulkCronWorker
feature_category :global_search
idempotent!
urgency :throttled
private
def service
Elastic::ProcessInitialBookkeepingService.new
end
end
# frozen_string_literal: true
# Usage of this worker is deprecated, please remove it in the next major version
class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include Elasticsearch::Model::Client::ClassMethods
prepend Elastic::IndexingControl
sidekiq_options retry: 2
feature_category :global_search
......@@ -13,65 +12,15 @@ class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker
return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
klass = class_name.constantize
record = klass.find(record_id)
case operation.to_s
when /index|update/
Elastic::IndexRecordService.new.execute(
klass.find(record_id),
operation.to_s.match?(/index/),
options
)
when /index/
record.maintain_elasticsearch_create
when /update/
record.maintain_elasticsearch_update
when /delete/
if options['es_parent']
client.delete(
index: klass.index_name,
type: klass.document_type,
id: es_id,
routing: options['es_parent']
)
else
clear_project_data(record_id, es_id) if klass == Project
client.delete index: klass.index_name, type: klass.document_type, id: es_id
end
record.maintain_elasticsearch_destroy
end
rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound => e
# These errors can happen in several cases, including:
# - A record is updated, then removed before the update is handled
# - Indexing is enabled, but not every item has been indexed yet - updating
# and deleting the un-indexed records will raise exception
#
# We can ignore these.
logger.error(message: 'elastic_indexer_worker_caught_exception', error_class: e.class.name, error_message: e.message)
true
end
private
def clear_project_data(record_id, es_id)
remove_children_documents('project', record_id, es_id)
IndexStatus.for_project(record_id).delete_all
end
def remove_children_documents(parent_type, parent_record_id, parent_es_id)
client.delete_by_query({
index: Project.__elasticsearch__.index_name,
routing: parent_es_id,
body: {
query: {
has_parent: {
parent_type: parent_type,
query: {
term: { id: parent_record_id }
}
}
}
}
})
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
end
......@@ -23,18 +23,15 @@ class ElasticNamespaceIndexerWorker # rubocop:disable Scalability/IdempotentWork
private
def index_projects(namespace)
# The default of 1000 is good for us since Sidekiq documentation doesn't recommend more than 1000 per batch call
# https://www.rubydoc.info/github/mperham/sidekiq/Sidekiq%2FClient:push_bulk
namespace.all_projects.find_in_batches do |batch|
args = batch.map { |project| [:index, project.class.to_s, project.id, project.es_id] }
ElasticIndexerWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
::Elastic::ProcessInitialBookkeepingService.backfill_projects!(*batch)
end
end
def delete_from_index(namespace)
namespace.all_projects.find_in_batches do |batch|
args = batch.map { |project| [:delete, project.class.to_s, project.id, project.es_id] }
ElasticIndexerWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
args = batch.map { |project| [project.id, project.es_id] }
ElasticDeleteProjectWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
end
end
end
......@@ -6,8 +6,11 @@ module Elastic
return unless elasticsearch_enabled?
return unless prometheus_enabled?
gauge = Gitlab::Metrics.gauge(:global_search_bulk_cron_queue_size, 'Number of database records waiting to be synchronized to Elasticsearch', {}, :max)
gauge.set({}, Elastic::ProcessBookkeepingService.queue_size)
incremental_gauge = Gitlab::Metrics.gauge(:global_search_bulk_cron_queue_size, 'Number of incremental database updates waiting to be synchronized to Elasticsearch', {}, :max)
incremental_gauge.set({}, Elastic::ProcessBookkeepingService.queue_size)
initial_gauge = Gitlab::Metrics.gauge(:global_search_bulk_cron_initial_queue_size, 'Number of initial database updates waiting to be synchronized to Elasticsearch', {}, :max)
initial_gauge.set({}, Elastic::ProcessInitialBookkeepingService.queue_size)
end
private
......
......@@ -18,11 +18,7 @@ namespace :gitlab do
print "Enqueuing projects"
project_id_batches do |ids|
args = ids.collect do |id|
[:index, 'Project', id, nil] # es_id is unused for :index
end
ElasticIndexerWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
::Elastic::ProcessInitialBookkeepingService.backfill_projects!(*Project.find(ids))
print "."
end
......
......@@ -9,7 +9,6 @@ RSpec.describe 'Repository index', :elastic do
before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
ElasticIndexerWorker.new.perform("index", "Project", project.id, project.es_id)
end
it 'indexes initial push' do
......
......@@ -57,9 +57,7 @@ RSpec.describe Gitlab::Elastic::Indexer do
let(:to_sha) { project.repository.commit.sha }
before do
# enable the indexing and index the project
stub_ee_application_setting(elasticsearch_indexing: true)
Elastic::IndexRecordService.new.execute(project, true)
end
shared_examples 'index up to the specified commit' do
......
......@@ -47,9 +47,6 @@ RSpec.describe Project, :elastic do
it 'only indexes enabled projects' do
Sidekiq::Testing.inline! do
# We have to trigger indexing of the previously-created project because we don't have a way to
# enable ES for it before it's created, at which point it won't be indexed anymore
ElasticIndexerWorker.perform_async(:index, project.class.to_s, project.id, project.es_id)
create :project, path: 'test2', description: 'awesome project'
create :project
......@@ -78,18 +75,18 @@ RSpec.describe Project, :elastic do
it 'indexes only projects under the group' do
Sidekiq::Testing.inline! do
create :project, name: 'test1', group: create(:group, parent: group)
create :project, name: 'test2', description: 'awesome project'
create :project, name: 'test3', group: group
create :project, name: 'group_test1', group: create(:group, parent: group)
create :project, name: 'group_test2', description: 'awesome project'
create :project, name: 'group_test3', group: group
create :project, path: 'someone_elses_project', name: 'test4'
ensure_elasticsearch_index!
end
expect(described_class.elastic_search('test*', options: { project_ids: :any }).total_count).to eq(2)
expect(described_class.elastic_search('test3', options: { project_ids: :any }).total_count).to eq(1)
expect(described_class.elastic_search('test2', options: { project_ids: :any }).total_count).to eq(0)
expect(described_class.elastic_search('test4', options: { project_ids: :any }).total_count).to eq(0)
expect(described_class.elastic_search('group_test*', options: { project_ids: :any }).total_count).to eq(2)
expect(described_class.elastic_search('group_test3', options: { project_ids: :any }).total_count).to eq(1)
expect(described_class.elastic_search('group_test2', options: { project_ids: :any }).total_count).to eq(0)
expect(described_class.elastic_search('group_test4', options: { project_ids: :any }).total_count).to eq(0)
end
end
end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::ProjectsSearch do
subject do
Class.new do
include Elastic::ProjectsSearch
def id
1
end
def es_id
1
end
end.new
end
describe '#maintain_elasticsearch_create' do
it 'calls track!' do
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).and_return(true)
subject.maintain_elasticsearch_create
end
end
describe '#maintain_elasticsearch_destroy' do
it 'calls delete worker' do
expect(ElasticDeleteProjectWorker).to receive(:perform_async)
subject.maintain_elasticsearch_destroy
end
end
end
......@@ -11,10 +11,10 @@ RSpec.describe ElasticsearchIndexedProject do
let(:container) { :elasticsearch_indexed_project }
let(:attribute) { :project_id }
let(:index_action) do
expect(ElasticIndexerWorker).to receive(:perform_async).with(:index, 'Project', subject.project_id, any_args)
expect(Elastic::ProcessBookkeepingService).to receive(:track!).with(subject.project)
end
let(:delete_action) do
expect(ElasticIndexerWorker).to receive(:perform_async).with(:delete, 'Project', subject.project_id, any_args)
expect(ElasticDeleteProjectWorker).to receive(:perform_async).with(subject.project.id, subject.project.es_id)
end
end
end
......@@ -5,17 +5,20 @@ require 'spec_helper'
RSpec.describe Elastic::IndexProjectsByIdService do
describe '#execute' do
it 'schedules index workers' do
project1 = create(:project)
project2 = create(:project)
expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(project1, project2)
Sidekiq::Testing.fake! do
described_class.new.execute(project_ids: [1, 2], namespace_ids: [3, 4])
described_class.new.execute(project_ids: [project1.id, project2.id], namespace_ids: [3, 4])
end
jobs = Sidekiq::Queues[ElasticFullIndexWorker.queue]
jobs = Sidekiq::Queues[ElasticNamespaceIndexerWorker.queue]
expect(jobs.size).to eq(4)
expect(jobs[0]['args']).to eq(['index', 'Project', 1, nil])
expect(jobs[1]['args']).to eq(['index', 'Project', 2, nil])
expect(jobs[2]['args']).to eq([3, 'index'])
expect(jobs[3]['args']).to eq([4, 'index'])
expect(jobs.size).to eq(2)
expect(jobs[0]['args']).to eq([3, 'index'])
expect(jobs[1]['args']).to eq([4, 'index'])
end
end
end
This diff is collapsed.
......@@ -11,15 +11,22 @@ RSpec.describe Elastic::MetricsUpdateService, :prometheus do
end
describe '#execute' do
it 'sets a gauge for global_search_bulk_cron_queue_size' do
it 'sets gauges' do
expect(Elastic::ProcessBookkeepingService).to receive(:queue_size).and_return(4)
expect(Elastic::ProcessInitialBookkeepingService).to receive(:queue_size).and_return(6)
gauge_double = instance_double(Prometheus::Client::Gauge)
incremental_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge)
.with(:global_search_bulk_cron_queue_size, anything, {}, :max)
.and_return(gauge_double)
.and_return(incremental_gauge_double)
expect(gauge_double).to receive(:set).with({}, 4)
initial_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge)
.with(:global_search_bulk_cron_initial_queue_size, anything, {}, :max)
.and_return(initial_gauge_double)
expect(incremental_gauge_double).to receive(:set).with({}, 4)
expect(initial_gauge_double).to receive(:set).with({}, 6)
subject.execute
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::ProcessInitialBookkeepingService do
let(:project) { create(:project) }
let(:issue) { create(:issue) }
describe '.backfill_projects!' do
it 'calls initial project indexing' do
expect(described_class).to receive(:maintain_indexed_associations)
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, nil, nil, true)
described_class.backfill_projects!(project)
end
it 'raises an exception if non project is provided' do
expect { described_class.backfill_projects!(issue) }.to raise_error(ArgumentError)
end
it 'uses a separate queue' do
expect { described_class.backfill_projects!(project) }.not_to change { Elastic::ProcessBookkeepingService.queue_size }
end
end
end
......@@ -84,12 +84,9 @@ RSpec.describe Groups::TransferService, '#execute' do
project2 = create(:project, :repository, :public, namespace: group)
project3 = create(:project, :repository, :private, namespace: group)
expect(ElasticIndexerWorker).to receive(:perform_async)
.with(:update, "Project", project1.id, project1.es_id)
expect(ElasticIndexerWorker).to receive(:perform_async)
.with(:update, "Project", project2.id, project2.es_id)
expect(ElasticIndexerWorker).not_to receive(:perform_async)
.with(:update, "Project", project3.id, project3.es_id)
expect(Elastic::ProcessBookkeepingService).to receive(:track!).with(project1)
expect(Elastic::ProcessBookkeepingService).to receive(:track!).with(project2)
expect(Elastic::ProcessBookkeepingService).not_to receive(:track!).with(project3)
transfer_service.execute(new_group)
......
......@@ -4,6 +4,7 @@ module ElasticsearchHelpers
def ensure_elasticsearch_index!
# Ensure that any enqueued updates are processed
Elastic::ProcessBookkeepingService.new.execute
Elastic::ProcessInitialBookkeepingService.new.execute
# Make any documents added to the index visible
refresh_index!
......
......@@ -32,10 +32,9 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic do
end
it 'queues jobs for each project batch' do
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with([
[:index, 'Project', project1.id, nil],
[:index, 'Project', project2.id, nil]
])
expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(
project1, project2
)
run_rake_task 'gitlab:elastic:index_projects'
end
......@@ -55,10 +54,9 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic do
end
it 'does not queue jobs for projects that should not be indexed' do
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with([
[:index, 'Project', project1.id, nil],
[:index, 'Project', project3.id, nil]
])
expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(
project1, project3
)
run_rake_task 'gitlab:elastic:index_projects'
end
......
......@@ -24,74 +24,84 @@ RSpec.describe Elastic::IndexingControl do
let(:worker_args) { [project.id] }
let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } }
before do
stub_const("Elastic::IndexingControl::WORKERS", [worker.class])
end
describe '.non_cached_pause_indexing?' do
it 'calls current_without_cache' do
expect(ApplicationSetting).to receive(:where).with(elasticsearch_pause_indexing: true).and_return(ApplicationSetting.none)
expect(described_class.non_cached_pause_indexing?).to be_falsey
describe '::WORKERS' do
it 'only includes classes which inherit from this class' do
described_class::WORKERS.each do |klass|
expect(klass.ancestors.first).to eq(described_class)
end
end
end
describe '.resume_processing!' do
context 'with stub_const' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
stub_const("Elastic::IndexingControl::WORKERS", [worker.class])
end
it 'triggers job processing if there are jobs' do
expect(Elastic::IndexingControlService).to receive(:has_jobs_in_waiting_queue?).with(worker.class).and_return(true)
expect(Elastic::IndexingControlService).to receive(:resume_processing!).with(worker.class)
describe '.non_cached_pause_indexing?' do
it 'calls current_without_cache' do
expect(ApplicationSetting).to receive(:where).with(elasticsearch_pause_indexing: true).and_return(ApplicationSetting.none)
described_class.resume_processing!
expect(described_class.non_cached_pause_indexing?).to be_falsey
end
end
it 'does nothing if no jobs available' do
expect(Elastic::IndexingControlService).to receive(:has_jobs_in_waiting_queue?).with(worker.class).and_return(false)
expect(Elastic::IndexingControlService).not_to receive(:resume_processing!)
describe '.resume_processing!' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
described_class.resume_processing!
end
end
it 'triggers job processing if there are jobs' do
expect(Elastic::IndexingControlService).to receive(:has_jobs_in_waiting_queue?).with(worker.class).and_return(true)
expect(Elastic::IndexingControlService).to receive(:resume_processing!).with(worker.class)
context 'with elasticsearch indexing paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
described_class.resume_processing!
end
it 'adds jobs to the waiting queue' do
expect_any_instance_of(Gitlab::Elastic::Indexer).not_to receive(:run)
expect(Elastic::IndexingControlService).to receive(:add_to_waiting_queue!).with(worker.class, worker_args, worker_context)
it 'does nothing if no jobs available' do
expect(Elastic::IndexingControlService).to receive(:has_jobs_in_waiting_queue?).with(worker.class).and_return(false)
expect(Elastic::IndexingControlService).not_to receive(:resume_processing!)
Labkit::Context.with_context(worker_context) do
worker.perform(*worker_args)
described_class.resume_processing!
end
end
it 'ignores changes from a different worker' do
stub_const("Elastic::IndexingControl::WORKERS", [])
context 'with elasticsearch indexing paused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(true)
end
expect_any_instance_of(Gitlab::Elastic::Indexer).to receive(:run)
expect(Elastic::IndexingControlService).not_to receive(:add_to_waiting_queue!)
it 'adds jobs to the waiting queue' do
expect_any_instance_of(Gitlab::Elastic::Indexer).not_to receive(:run)
expect(Elastic::IndexingControlService).to receive(:add_to_waiting_queue!).with(worker.class, worker_args, worker_context)
worker.perform(*worker_args)
end
end
Labkit::Context.with_context(worker_context) do
worker.perform(*worker_args)
end
end
context 'with elasticsearch indexing unpaused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
it 'ignores changes from a different worker' do
stub_const("Elastic::IndexingControl::WORKERS", [])
expect_any_instance_of(Gitlab::Elastic::Indexer).to receive(:run)
expect(Elastic::IndexingControlService).not_to receive(:add_to_waiting_queue!)
worker.perform(*worker_args)
end
end
it 'performs the job' do
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run)
context 'with elasticsearch indexing unpaused' do
before do
expect(Elastic::IndexingControl).to receive(:non_cached_pause_indexing?).and_return(false)
end
expect(Elastic::IndexingControlService).not_to receive(:track!)
worker.perform(*worker_args)
it 'performs the job' do
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run)
end
expect(Elastic::IndexingControlService).not_to receive(:track!)
worker.perform(*worker_args)
end
end
end
end
......@@ -61,8 +61,6 @@ RSpec.describe ElasticBatchProjectIndexerWorker do
end
it 'indexes all projects it receives even if already indexed', :sidekiq_might_not_need_inline do
projects.first.index_status.update!(last_commit: 'foo')
expect_index(projects.first).and_call_original
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ElasticDeleteProjectWorker, :elastic do
subject { described_class.new }
# Create admin user and search globally to avoid dealing with permissions in
# these tests
let(:user) { create(:admin) }
let(:search_options) { { options: { current_user: user, project_ids: :any } } }
before do
stub_ee_application_setting(elasticsearch_indexing: true)
end
it 'deletes a project with all nested objects' do
project = create :project, :repository
issue = create :issue, project: project
milestone = create :milestone, project: project
note = create :note, project: project
merge_request = create :merge_request, target_project: project, source_project: project
ensure_elasticsearch_index!
## All database objects + data from repository. The absolute value does not matter
expect(Project.elastic_search('*', search_options).records).to include(project)
expect(Issue.elastic_search('*', search_options).records).to include(issue)
expect(Milestone.elastic_search('*', search_options).records).to include(milestone)
expect(Note.elastic_search('*', search_options).records).to include(note)
expect(MergeRequest.elastic_search('*', search_options).records).to include(merge_request)
subject.perform(project.id, project.es_id)
ensure_elasticsearch_index!
expect(Project.elastic_search('*', search_options).total_count).to be(0)
expect(Issue.elastic_search('*', search_options).total_count).to be(0)
expect(Milestone.elastic_search('*', search_options).total_count).to be(0)
expect(Note.elastic_search('*', search_options).total_count).to be(0)
expect(MergeRequest.elastic_search('*', search_options).total_count).to be(0)
end
end
......@@ -11,7 +11,7 @@ RSpec.describe ElasticFullIndexWorker do
it 'does nothing if ES disabled' do
stub_ee_application_setting(elasticsearch_indexing: false)
expect(Elastic::IndexRecordService).not_to receive(:new)
expect(Elastic::ProcessInitialBookkeepingService).not_to receive(:backfill_projects!)
subject.perform(1, 2)
end
......@@ -21,23 +21,7 @@ RSpec.describe ElasticFullIndexWorker do
it 'indexes projects in range' do
projects.each do |project|
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(project, true).and_return(true)
end
end
subject.perform(projects.first.id, projects.last.id)
end
it 'retries failed indexing' do
projects.each do |project|
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(project, true).and_raise(Elastic::IndexRecordService::ImportError)
end
end
expect_next_instance_of(Elastic::IndexProjectsByIdService) do |service|
expect(service).to receive(:execute).with(project_ids: projects.map(&:id))
expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(project)
end
subject.perform(projects.first.id, projects.last.id)
......
......@@ -37,6 +37,7 @@ RSpec.describe ElasticIndexBulkCronWorker do
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).and_return(15)
end
worker = described_class.new
worker.perform
......
......@@ -2,139 +2,45 @@
require 'spec_helper'
RSpec.describe ElasticIndexerWorker, :elastic do
RSpec.describe ElasticIndexerWorker do
subject { described_class.new }
# Create admin user and search globally to avoid dealing with permissions in
# these tests
let(:user) { create(:admin) }
let(:search_options) { { options: { current_user: user, project_ids: :any } } }
describe '#perform' do
context 'indexing is enabled' do
using RSpec::Parameterized::TableSyntax
before do
stub_ee_application_setting(elasticsearch_indexing: true)
let(:project) { instance_double(Project, id: 1, es_id: 1) }
Elasticsearch::Model.client =
Gitlab::Elastic::Client.build(Gitlab::CurrentSettings.elasticsearch_config)
end
it 'returns true if ES disabled' do
stub_ee_application_setting(elasticsearch_indexing: false)
expect_any_instance_of(Elasticsearch::Model).not_to receive(:__elasticsearch__)
expect(subject.perform("index", "Milestone", 1, 1)).to be_truthy
end
describe 'Indexing, updating, and deleting records' do
using RSpec::Parameterized::TableSyntax
where(:type, :name) do
:project | "Project"
:issue | "Issue"
:note | "Note"
:milestone | "Milestone"
:merge_request | "MergeRequest"
end
with_them do
it 'calls record indexing' do
object = create(type)
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(object, true, {}).and_return(true)
end
subject.perform("index", name, object.id, object.es_id)
before do
stub_ee_application_setting(elasticsearch_indexing: true)
expect(Project).to receive(:find).and_return(project)
end
it 'deletes from index when an object is deleted' do
object = nil
Sidekiq::Testing.disable! do
object = create(type)
where(:operation, :method) do
'index' | 'maintain_elasticsearch_create'
'update' | 'maintain_elasticsearch_update'
'delete' | 'maintain_elasticsearch_destroy'
end
if type != :project
# You cannot find anything in the index if it's parent project is
# not first indexed.
subject.perform("index", "Project", object.project.id, object.project.es_id)
end
with_them do
it 'calls respective methods' do
expect(project).to receive(method.to_sym)
subject.perform("index", name, object.id, object.es_id)
ensure_elasticsearch_index!
object.destroy
subject.perform(operation, 'Project', project.id, project.es_id)
end
expect do
subject.perform("delete", name, object.id, object.es_id, { 'es_parent' => object.es_parent })
ensure_elasticsearch_index!
end.to change { object.class.elastic_search('*', search_options).total_count }.by(-1)
end
end
end
it 'deletes a project with all nested objects' do
project, issue, milestone, note, merge_request = nil
Sidekiq::Testing.disable! do
project = create :project, :repository
subject.perform("index", "Project", project.id, project.es_id)
issue = create :issue, project: project
subject.perform("index", "Issue", issue.id, issue.es_id)
milestone = create :milestone, project: project
subject.perform("index", "Milestone", milestone.id, milestone.es_id)
note = create :note, project: project
subject.perform("index", "Note", note.id, note.es_id)
merge_request = create :merge_request, target_project: project, source_project: project
subject.perform("index", "MergeRequest", merge_request.id, merge_request.es_id)
end
ElasticCommitIndexerWorker.new.perform(project.id)
ensure_elasticsearch_index!
## All database objects + data from repository. The absolute value does not matter
expect(Project.elastic_search('*', search_options).records).to include(project)
expect(Issue.elastic_search('*', search_options).records).to include(issue)
expect(Milestone.elastic_search('*', search_options).records).to include(milestone)
expect(Note.elastic_search('*', search_options).records).to include(note)
expect(MergeRequest.elastic_search('*', search_options).records).to include(merge_request)
subject.perform("delete", "Project", project.id, project.es_id)
ensure_elasticsearch_index!
expect(Project.elastic_search('*', search_options).total_count).to be(0)
expect(Issue.elastic_search('*', search_options).total_count).to be(0)
expect(Milestone.elastic_search('*', search_options).total_count).to be(0)
expect(Note.elastic_search('*', search_options).total_count).to be(0)
expect(MergeRequest.elastic_search('*', search_options).total_count).to be(0)
end
it 'retries if index raises error' do
object = create(:project)
expect_next_instance_of(Elastic::IndexRecordService) do |service|
allow(service).to receive(:execute).and_raise(Elastic::IndexRecordService::ImportError)
end
expect do
subject.perform("index", 'Project', object.id, object.es_id)
end.to raise_error(Elastic::IndexRecordService::ImportError)
end
context 'indexing is disabled' do
before do
stub_ee_application_setting(elasticsearch_indexing: false)
end
it 'ignores Elasticsearch::Transport::Transport::Errors::NotFound error' do
object = create(:project)
it 'returns true if ES disabled' do
expect(Milestone).not_to receive(:find).with(1)
expect_next_instance_of(Elastic::IndexRecordService) do |service|
allow(service).to receive(:execute).and_raise(Elasticsearch::Transport::Transport::Errors::NotFound)
expect(subject.perform('index', 'Milestone', 1, 1)).to be_truthy
end
end
expect(subject.perform("index", 'Project', object.id, object.es_id)).to eq(true)
end
it 'ignores missing records' do
expect(subject.perform("index", 'Project', -1, 'project_-1')).to eq(true)
end
end
......@@ -13,7 +13,7 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do
it 'returns true if ES disabled' do
stub_ee_application_setting(elasticsearch_indexing: false)
expect(ElasticIndexerWorker).not_to receive(:perform_async)
expect(Elastic::ProcessInitialBookkeepingService).not_to receive(:backfill_projects!)
expect(subject.perform(1, "index")).to be_truthy
end
......@@ -21,7 +21,7 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do
it 'returns true if limited indexing is not enabled' do
stub_ee_application_setting(elasticsearch_limit_indexing: false)
expect(ElasticIndexerWorker).not_to receive(:perform_async)
expect(Elastic::ProcessInitialBookkeepingService).not_to receive(:backfill_projects!)
expect(subject.perform(1, "index")).to be_truthy
end
......@@ -31,15 +31,14 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do
let(:projects) { create_list :project, 3, namespace: namespace }
it 'indexes all projects belonging to the namespace' do
args = projects.map { |project| [:index, project.class.to_s, project.id, project.es_id] }
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with(args)
expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(*projects)
subject.perform(namespace.id, :index)
end
it 'deletes all projects belonging to the namespace' do
args = projects.map { |project| [:delete, project.class.to_s, project.id, project.es_id] }
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with(args)
args = projects.map { |project| [project.id, project.es_id] }
expect(ElasticDeleteProjectWorker).to receive(:bulk_perform_async).with(args)
subject.perform(namespace.id, :delete)
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment