Commit e7631cb4 authored by Dylan Griffith's avatar Dylan Griffith

Merge branch '207494-extend-bulk-indexer-scope' into 'master'

Handle projects actions with bulk-incremental indexer

Closes #207494

See merge request gitlab-org/gitlab!33384
parents b68e9d36 2fd3b49e
...@@ -488,6 +488,11 @@ production: &base ...@@ -488,6 +488,11 @@ production: &base
elastic_index_bulk_cron_worker: elastic_index_bulk_cron_worker:
cron: "*/1 * * * *" cron: "*/1 * * * *"
# Elasticsearch bulk updater for initial updates.
# NOTE: This will only take effect if elasticsearch is enabled.
elastic_index_initial_bulk_cron_worker:
cron: "*/1 * * * *"
registry: registry:
# enabled: true # enabled: true
# host: registry.example.com # host: registry.example.com
......
...@@ -561,6 +561,9 @@ Gitlab.ee do ...@@ -561,6 +561,9 @@ Gitlab.ee do
Settings.cron_jobs['elastic_index_bulk_cron_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['elastic_index_bulk_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_index_bulk_cron_worker']['cron'] ||= '*/1 * * * *' Settings.cron_jobs['elastic_index_bulk_cron_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['elastic_index_bulk_cron_worker']['job_class'] ||= 'ElasticIndexBulkCronWorker' Settings.cron_jobs['elastic_index_bulk_cron_worker']['job_class'] ||= 'ElasticIndexBulkCronWorker'
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['elastic_index_initial_bulk_cron_worker']['job_class'] ||= 'ElasticIndexInitialBulkCronWorker'
Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['sync_seat_link_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *" Settings.cron_jobs['sync_seat_link_worker']['cron'] ||= "#{rand(60)} 0 * * *"
Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker' Settings.cron_jobs['sync_seat_link_worker']['job_class'] = 'SyncSeatLinkWorker'
......
...@@ -82,6 +82,8 @@ ...@@ -82,6 +82,8 @@
- 1 - 1
- - elastic_commit_indexer - - elastic_commit_indexer
- 1 - 1
- - elastic_delete_project
- 1
- - elastic_full_index - - elastic_full_index
- 1 - 1
- - elastic_indexer - - elastic_indexer
......
...@@ -6,47 +6,17 @@ module Elastic ...@@ -6,47 +6,17 @@ module Elastic
include ApplicationVersionedSearch include ApplicationVersionedSearch
INDEXED_ASSOCIATIONS = [
:issues,
:merge_requests,
:snippets,
:notes,
:milestones
].freeze
included do included do
def use_elasticsearch? def use_elasticsearch?
::Gitlab::CurrentSettings.elasticsearch_indexes_project?(self) ::Gitlab::CurrentSettings.elasticsearch_indexes_project?(self)
end end
# TODO: ElasticIndexerWorker does extra work for project hooks, so we
# can't use the incremental-bulk indexer for projects yet.
#
# https://gitlab.com/gitlab-org/gitlab/issues/207494
def maintain_elasticsearch_create def maintain_elasticsearch_create
ElasticIndexerWorker.perform_async(:index, self.class.to_s, self.id, self.es_id) ::Elastic::ProcessInitialBookkeepingService.track!(self)
end
def maintain_elasticsearch_update
ElasticIndexerWorker.perform_async(:update, self.class.to_s, self.id, self.es_id)
end end
def maintain_elasticsearch_destroy def maintain_elasticsearch_destroy
ElasticIndexerWorker.perform_async(:delete, self.class.to_s, self.id, self.es_id, es_parent: self.es_parent) ElasticDeleteProjectWorker.perform_async(self.id, self.es_id)
end
def each_indexed_association
INDEXED_ASSOCIATIONS.each do |association_name|
association = self.association(association_name)
scope = association.scope
klass = association.klass
if klass == Note
scope = scope.searchable
end
yield klass, scope
end
end end
end end
end end
......
...@@ -18,19 +18,13 @@ class ElasticsearchIndexedProject < ApplicationRecord ...@@ -18,19 +18,13 @@ class ElasticsearchIndexedProject < ApplicationRecord
def index def index
if Gitlab::CurrentSettings.elasticsearch_indexing? && project.searchable? if Gitlab::CurrentSettings.elasticsearch_indexing? && project.searchable?
ElasticIndexerWorker.perform_async(:index, project.class.to_s, project.id, project.es_id) ::Elastic::ProcessInitialBookkeepingService.backfill_projects!(project) # rubocop: disable CodeReuse/ServiceClass
end end
end end
def delete_from_index def delete_from_index
if Gitlab::CurrentSettings.elasticsearch_indexing? && project.searchable? if Gitlab::CurrentSettings.elasticsearch_indexing? && project.searchable?
ElasticIndexerWorker.perform_async( ElasticDeleteProjectWorker.perform_async(project.id, project.es_id)
:delete,
project.class.to_s,
project.id,
project.es_id,
es_parent: project.es_parent
)
end end
end end
end end
...@@ -3,18 +3,11 @@ ...@@ -3,18 +3,11 @@
module Elastic module Elastic
class IndexProjectsByIdService class IndexProjectsByIdService
def execute(project_ids: [], namespace_ids: []) def execute(project_ids: [], namespace_ids: [])
queue_name = ElasticFullIndexWorker.queue projects = Project.find(project_ids)
Elastic::ProcessInitialBookkeepingService.backfill_projects!(*projects)
project_ids.each do |project_id|
ElasticIndexerWorker
.set(queue: queue_name)
.perform_async(:index, 'Project', project_id, nil)
end
namespace_ids.each do |namespace_id| namespace_ids.each do |namespace_id|
ElasticNamespaceIndexerWorker ElasticNamespaceIndexerWorker.perform_async(namespace_id, :index)
.set(queue: queue_name)
.perform_async(namespace_id, :index)
end end
end end
end end
......
# frozen_string_literal: true
module Elastic
class IndexRecordService
include Elasticsearch::Model::Client::ClassMethods
ImportError = Class.new(StandardError)
IMPORT_RETRY_COUNT = 3
# @param indexing [Boolean] determines whether operation is "indexing" or "updating"
def execute(record, indexing, options = {})
return true unless record.use_elasticsearch?
record.__elasticsearch__.client = client
import(record, indexing)
initial_index_project(record) if record.class == Project && indexing
true
rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound => e
# These errors can happen in several cases, including:
# - A record is updated, then removed before the update is handled
# - Indexing is enabled, but not every item has been indexed yet - updating
# and deleting the un-indexed records will raise exception
#
# We can ignore these.
logger.error(message: 'elastic_index_record_service_caught_exception', error_class: e.class.name, error_message: e.message)
true
end
private
def initial_index_project(project)
# Enqueue the repository indexing jobs immediately so they run in parallel
# One for the project repository, one for the wiki repository
ElasticCommitIndexerWorker.perform_async(project.id)
ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
project.each_indexed_association do |klass, association|
import_association(association)
end
end
def import_association(association, options = {})
options[:return] = 'errors'
errors = association.es_import(options)
return if errors.empty?
IMPORT_RETRY_COUNT.times do
errors = retry_import(errors, association, options)
return if errors.empty?
end
raise ImportError.new(errors.inspect)
end
def import(record, indexing)
operation = indexing ? 'index_document' : 'update_document'
response = nil
IMPORT_RETRY_COUNT.times do
response = if record.es_parent
record.__elasticsearch__.__send__ operation, routing: record.es_parent # rubocop:disable GitlabSecurity/PublicSend
else
record.__elasticsearch__.__send__ operation # rubocop:disable GitlabSecurity/PublicSend
end
return if response['_shards']['successful'] > 0
end
raise ImportError.new(response)
end
def retry_import(errors, association, options)
ids = errors.map { |error| error['index']['_id'][/_(\d+)$/, 1] }
association.id_in(ids).es_import(options)
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
end
end
...@@ -16,13 +16,17 @@ module Elastic ...@@ -16,13 +16,17 @@ module Elastic
with_redis do |redis| with_redis do |redis|
# Efficiently generate a guaranteed-unique score for each item # Efficiently generate a guaranteed-unique score for each item
max = redis.incrby(REDIS_SCORE_KEY, items.size) max = redis.incrby(self::REDIS_SCORE_KEY, items.size)
min = (max - items.size) + 1 min = (max - items.size) + 1
(min..max).zip(items).each_slice(1000) do |group| (min..max).zip(items).each_slice(1000) do |group|
logger.debug(message: 'track_items', count: group.count, tracked_items_encoded: group.to_json) logger.debug(class: self.name,
redis_set: self::REDIS_SET_KEY,
message: 'track_items',
count: group.count,
tracked_items_encoded: group.to_json)
redis.zadd(REDIS_SET_KEY, group) redis.zadd(self::REDIS_SET_KEY, group)
end end
end end
...@@ -30,11 +34,11 @@ module Elastic ...@@ -30,11 +34,11 @@ module Elastic
end end
def queue_size def queue_size
with_redis { |redis| redis.zcard(REDIS_SET_KEY) } with_redis { |redis| redis.zcard(self::REDIS_SET_KEY) }
end end
def clear_tracking! def clear_tracking!
with_redis { |redis| redis.del(REDIS_SET_KEY, REDIS_SCORE_KEY) } with_redis { |redis| redis.del(self::REDIS_SET_KEY, self::REDIS_SCORE_KEY) }
end end
def logger def logger
...@@ -56,7 +60,7 @@ module Elastic ...@@ -56,7 +60,7 @@ module Elastic
def execute_with_redis(redis) def execute_with_redis(redis)
start_time = Time.current start_time = Time.current
specs = redis.zrangebyscore(REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true) specs = redis.zrangebyscore(self.class::REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
return 0 if specs.empty? return 0 if specs.empty?
first_score = specs.first.last first_score = specs.first.last
...@@ -77,7 +81,7 @@ module Elastic ...@@ -77,7 +81,7 @@ module Elastic
self.class.track!(*failures) if failures.present? self.class.track!(*failures) if failures.present?
# Remove all the successes # Remove all the successes
redis.zremrangebyscore(REDIS_SET_KEY, first_score, last_score) redis.zremrangebyscore(self.class::REDIS_SET_KEY, first_score, last_score)
records_count = specs.count records_count = specs.count
......
# frozen_string_literal: true
module Elastic
class ProcessInitialBookkeepingService < Elastic::ProcessBookkeepingService
REDIS_SET_KEY = 'elastic:bulk:initial:0:zset'
REDIS_SCORE_KEY = 'elastic:bulk:initial:0:score'
INDEXED_ASSOCIATIONS = [
:issues,
:merge_requests,
:snippets,
:notes,
:milestones
].freeze
class << self
def backfill_projects!(*projects)
track!(*projects)
projects.each do |project|
raise ArgumentError, 'This method only accepts Projects' unless project.is_a?(Project)
maintain_indexed_associations(project)
ElasticCommitIndexerWorker.perform_async(project.id)
ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
end
end
def each_indexed_association(project)
INDEXED_ASSOCIATIONS.each do |association_name|
association = project.association(association_name)
scope = association.scope
klass = association.klass
if klass == Note
scope = scope.searchable
end
yield klass, scope
end
end
private
def maintain_indexed_associations(project)
each_indexed_association(project) do |_, association|
association.find_in_batches do |group|
track!(*group)
end
end
end
end
end
end
...@@ -35,6 +35,14 @@ ...@@ -35,6 +35,14 @@
:weight: 1 :weight: 1
:idempotent: true :idempotent: true
:tags: [] :tags: []
- :name: cronjob:elastic_index_initial_bulk_cron
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: cronjob:geo_container_repository_sync_dispatch - :name: cronjob:geo_container_repository_sync_dispatch
:feature_category: :geo_replication :feature_category: :geo_replication
:has_external_dependencies: :has_external_dependencies:
...@@ -563,6 +571,14 @@ ...@@ -563,6 +571,14 @@
:weight: 1 :weight: 1
:idempotent: true :idempotent: true
:tags: [] :tags: []
- :name: elastic_delete_project
:feature_category: :global_search
:has_external_dependencies:
:urgency: :throttled
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: elastic_full_index - :name: elastic_full_index
:feature_category: :global_search :feature_category: :global_search
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true
module Elastic
module BulkCronWorker
extend ActiveSupport::Concern
included do
include ApplicationWorker
include Gitlab::ExclusiveLeaseHelpers
# There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
end
def perform
if Elastic::IndexingControl.non_cached_pause_indexing?
logger.info(message: "elasticsearch_pause_indexing setting is enabled. #{self.class} execution is skipped.")
return false
end
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
records_count = service.execute
log_extra_metadata_on_done(:records_count, records_count)
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
end
private
def logger
Elastic::IndexingControl.logger
end
end
end
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# Concern for pausing/unpausing elasticsearch indexing workers # Concern for pausing/unpausing elasticsearch indexing workers
module Elastic module Elastic
module IndexingControl module IndexingControl
WORKERS = [ElasticCommitIndexerWorker, ElasticIndexerWorker].freeze WORKERS = [ElasticCommitIndexerWorker, ElasticDeleteProjectWorker].freeze
def perform(*args) def perform(*args)
if Elastic::IndexingControl.non_cached_pause_indexing? && WORKERS.include?(self.class) if Elastic::IndexingControl.non_cached_pause_indexing? && WORKERS.include?(self.class)
......
# frozen_string_literal: true
class ElasticDeleteProjectWorker
include ApplicationWorker
include Elasticsearch::Model::Client::ClassMethods
prepend Elastic::IndexingControl
sidekiq_options retry: 2
feature_category :global_search
urgency :throttled
idempotent!
def perform(project_id, es_id)
remove_project_and_children_documents(project_id, es_id)
end
private
def remove_project_and_children_documents(project_id, es_id)
client.delete_by_query({
index: Project.__elasticsearch__.index_name,
routing: es_id,
body: {
query: {
bool: {
should: [
{
has_parent: {
parent_type: 'project',
query: {
term: { id: project_id }
}
}
},
{
term: {
_id: es_id
}
}
]
}
}
}
})
end
end
...@@ -13,16 +13,8 @@ class ElasticFullIndexWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -13,16 +13,8 @@ class ElasticFullIndexWorker # rubocop:disable Scalability/IdempotentWorker
def perform(start_id, end_id) def perform(start_id, end_id)
return true unless Gitlab::CurrentSettings.elasticsearch_indexing? return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
failed_ids = []
Project.id_in(start_id..end_id).find_each do |project| Project.id_in(start_id..end_id).find_each do |project|
Elastic::IndexRecordService.new.execute(project, true) Elastic::ProcessInitialBookkeepingService.backfill_projects!(project)
rescue Elastic::IndexRecordService::ImportError
failed_ids << project.id
end
if failed_ids.present?
Elastic::IndexProjectsByIdService.new.execute(project_ids: failed_ids)
end end
end end
end end
# frozen_string_literal: true # frozen_string_literal: true
class ElasticIndexBulkCronWorker class ElasticIndexBulkCronWorker
include ApplicationWorker include Elastic::BulkCronWorker
include Gitlab::ExclusiveLeaseHelpers
# There is no onward scheduling and this cron handles work from across the
# application, so there's no useful context to add.
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
feature_category :global_search feature_category :global_search
idempotent! idempotent!
urgency :throttled urgency :throttled
def perform
if Elastic::IndexingControl.non_cached_pause_indexing?
logger.info(message: 'elasticsearch_pause_indexing setting is enabled. ElasticBulkCronWorker execution is skipped.')
return false
end
in_lock(self.class.name.underscore, ttl: 10.minutes, retries: 10, sleep_sec: 1) do
records_count = Elastic::ProcessBookkeepingService.new.execute
log_extra_metadata_on_done(:records_count, records_count)
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
# We're scheduled on a cronjob, so nothing to do here
end
private private
def logger def service
Elastic::IndexingControl.logger Elastic::ProcessBookkeepingService.new
end end
end end
# frozen_string_literal: true
class ElasticIndexInitialBulkCronWorker
include Elastic::BulkCronWorker
feature_category :global_search
idempotent!
urgency :throttled
private
def service
Elastic::ProcessInitialBookkeepingService.new
end
end
# frozen_string_literal: true # frozen_string_literal: true
# Usage of this worker is deprecated, please remove it in the next major version
class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
include Elasticsearch::Model::Client::ClassMethods
prepend Elastic::IndexingControl
sidekiq_options retry: 2 sidekiq_options retry: 2
feature_category :global_search feature_category :global_search
...@@ -13,65 +12,15 @@ class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker ...@@ -13,65 +12,15 @@ class ElasticIndexerWorker # rubocop:disable Scalability/IdempotentWorker
return true unless Gitlab::CurrentSettings.elasticsearch_indexing? return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
klass = class_name.constantize klass = class_name.constantize
record = klass.find(record_id)
case operation.to_s case operation.to_s
when /index|update/ when /index/
Elastic::IndexRecordService.new.execute( record.maintain_elasticsearch_create
klass.find(record_id), when /update/
operation.to_s.match?(/index/), record.maintain_elasticsearch_update
options
)
when /delete/ when /delete/
if options['es_parent'] record.maintain_elasticsearch_destroy
client.delete(
index: klass.index_name,
type: klass.document_type,
id: es_id,
routing: options['es_parent']
)
else
clear_project_data(record_id, es_id) if klass == Project
client.delete index: klass.index_name, type: klass.document_type, id: es_id
end end
end end
rescue Elasticsearch::Transport::Transport::Errors::NotFound, ActiveRecord::RecordNotFound => e
# These errors can happen in several cases, including:
# - A record is updated, then removed before the update is handled
# - Indexing is enabled, but not every item has been indexed yet - updating
# and deleting the un-indexed records will raise exception
#
# We can ignore these.
logger.error(message: 'elastic_indexer_worker_caught_exception', error_class: e.class.name, error_message: e.message)
true
end
private
def clear_project_data(record_id, es_id)
remove_children_documents('project', record_id, es_id)
IndexStatus.for_project(record_id).delete_all
end
def remove_children_documents(parent_type, parent_record_id, parent_es_id)
client.delete_by_query({
index: Project.__elasticsearch__.index_name,
routing: parent_es_id,
body: {
query: {
has_parent: {
parent_type: parent_type,
query: {
term: { id: parent_record_id }
}
}
}
}
})
end
def logger
@logger ||= ::Gitlab::Elasticsearch::Logger.build
end
end end
...@@ -23,18 +23,15 @@ class ElasticNamespaceIndexerWorker # rubocop:disable Scalability/IdempotentWork ...@@ -23,18 +23,15 @@ class ElasticNamespaceIndexerWorker # rubocop:disable Scalability/IdempotentWork
private private
def index_projects(namespace) def index_projects(namespace)
# The default of 1000 is good for us since Sidekiq documentation doesn't recommend more than 1000 per batch call
# https://www.rubydoc.info/github/mperham/sidekiq/Sidekiq%2FClient:push_bulk
namespace.all_projects.find_in_batches do |batch| namespace.all_projects.find_in_batches do |batch|
args = batch.map { |project| [:index, project.class.to_s, project.id, project.es_id] } ::Elastic::ProcessInitialBookkeepingService.backfill_projects!(*batch)
ElasticIndexerWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
end end
end end
def delete_from_index(namespace) def delete_from_index(namespace)
namespace.all_projects.find_in_batches do |batch| namespace.all_projects.find_in_batches do |batch|
args = batch.map { |project| [:delete, project.class.to_s, project.id, project.es_id] } args = batch.map { |project| [project.id, project.es_id] }
ElasticIndexerWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext ElasticDeleteProjectWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
end end
end end
end end
...@@ -6,8 +6,11 @@ module Elastic ...@@ -6,8 +6,11 @@ module Elastic
return unless elasticsearch_enabled? return unless elasticsearch_enabled?
return unless prometheus_enabled? return unless prometheus_enabled?
gauge = Gitlab::Metrics.gauge(:global_search_bulk_cron_queue_size, 'Number of database records waiting to be synchronized to Elasticsearch', {}, :max) incremental_gauge = Gitlab::Metrics.gauge(:global_search_bulk_cron_queue_size, 'Number of incremental database updates waiting to be synchronized to Elasticsearch', {}, :max)
gauge.set({}, Elastic::ProcessBookkeepingService.queue_size) incremental_gauge.set({}, Elastic::ProcessBookkeepingService.queue_size)
initial_gauge = Gitlab::Metrics.gauge(:global_search_bulk_cron_initial_queue_size, 'Number of initial database updates waiting to be synchronized to Elasticsearch', {}, :max)
initial_gauge.set({}, Elastic::ProcessInitialBookkeepingService.queue_size)
end end
private private
......
...@@ -18,11 +18,7 @@ namespace :gitlab do ...@@ -18,11 +18,7 @@ namespace :gitlab do
print "Enqueuing projects" print "Enqueuing projects"
project_id_batches do |ids| project_id_batches do |ids|
args = ids.collect do |id| ::Elastic::ProcessInitialBookkeepingService.backfill_projects!(*Project.find(ids))
[:index, 'Project', id, nil] # es_id is unused for :index
end
ElasticIndexerWorker.bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
print "." print "."
end end
......
...@@ -9,7 +9,6 @@ RSpec.describe 'Repository index', :elastic do ...@@ -9,7 +9,6 @@ RSpec.describe 'Repository index', :elastic do
before do before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true) stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
ElasticIndexerWorker.new.perform("index", "Project", project.id, project.es_id)
end end
it 'indexes initial push' do it 'indexes initial push' do
......
...@@ -57,9 +57,7 @@ RSpec.describe Gitlab::Elastic::Indexer do ...@@ -57,9 +57,7 @@ RSpec.describe Gitlab::Elastic::Indexer do
let(:to_sha) { project.repository.commit.sha } let(:to_sha) { project.repository.commit.sha }
before do before do
# enable the indexing and index the project
stub_ee_application_setting(elasticsearch_indexing: true) stub_ee_application_setting(elasticsearch_indexing: true)
Elastic::IndexRecordService.new.execute(project, true)
end end
shared_examples 'index up to the specified commit' do shared_examples 'index up to the specified commit' do
......
...@@ -47,9 +47,6 @@ RSpec.describe Project, :elastic do ...@@ -47,9 +47,6 @@ RSpec.describe Project, :elastic do
it 'only indexes enabled projects' do it 'only indexes enabled projects' do
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
# We have to trigger indexing of the previously-created project because we don't have a way to
# enable ES for it before it's created, at which point it won't be indexed anymore
ElasticIndexerWorker.perform_async(:index, project.class.to_s, project.id, project.es_id)
create :project, path: 'test2', description: 'awesome project' create :project, path: 'test2', description: 'awesome project'
create :project create :project
...@@ -78,18 +75,18 @@ RSpec.describe Project, :elastic do ...@@ -78,18 +75,18 @@ RSpec.describe Project, :elastic do
it 'indexes only projects under the group' do it 'indexes only projects under the group' do
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
create :project, name: 'test1', group: create(:group, parent: group) create :project, name: 'group_test1', group: create(:group, parent: group)
create :project, name: 'test2', description: 'awesome project' create :project, name: 'group_test2', description: 'awesome project'
create :project, name: 'test3', group: group create :project, name: 'group_test3', group: group
create :project, path: 'someone_elses_project', name: 'test4' create :project, path: 'someone_elses_project', name: 'test4'
ensure_elasticsearch_index! ensure_elasticsearch_index!
end end
expect(described_class.elastic_search('test*', options: { project_ids: :any }).total_count).to eq(2) expect(described_class.elastic_search('group_test*', options: { project_ids: :any }).total_count).to eq(2)
expect(described_class.elastic_search('test3', options: { project_ids: :any }).total_count).to eq(1) expect(described_class.elastic_search('group_test3', options: { project_ids: :any }).total_count).to eq(1)
expect(described_class.elastic_search('test2', options: { project_ids: :any }).total_count).to eq(0) expect(described_class.elastic_search('group_test2', options: { project_ids: :any }).total_count).to eq(0)
expect(described_class.elastic_search('test4', options: { project_ids: :any }).total_count).to eq(0) expect(described_class.elastic_search('group_test4', options: { project_ids: :any }).total_count).to eq(0)
end end
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Elastic::ProjectsSearch do
subject do
Class.new do
include Elastic::ProjectsSearch
def id
1
end
def es_id
1
end
end.new
end
describe '#maintain_elasticsearch_create' do
it 'calls track!' do
expect(::Elastic::ProcessInitialBookkeepingService).to receive(:track!).and_return(true)
subject.maintain_elasticsearch_create
end
end
describe '#maintain_elasticsearch_destroy' do
it 'calls delete worker' do
expect(ElasticDeleteProjectWorker).to receive(:perform_async)
subject.maintain_elasticsearch_destroy
end
end
end
...@@ -11,10 +11,10 @@ RSpec.describe ElasticsearchIndexedProject do ...@@ -11,10 +11,10 @@ RSpec.describe ElasticsearchIndexedProject do
let(:container) { :elasticsearch_indexed_project } let(:container) { :elasticsearch_indexed_project }
let(:attribute) { :project_id } let(:attribute) { :project_id }
let(:index_action) do let(:index_action) do
expect(ElasticIndexerWorker).to receive(:perform_async).with(:index, 'Project', subject.project_id, any_args) expect(Elastic::ProcessBookkeepingService).to receive(:track!).with(subject.project)
end end
let(:delete_action) do let(:delete_action) do
expect(ElasticIndexerWorker).to receive(:perform_async).with(:delete, 'Project', subject.project_id, any_args) expect(ElasticDeleteProjectWorker).to receive(:perform_async).with(subject.project.id, subject.project.es_id)
end end
end end
end end
...@@ -5,17 +5,20 @@ require 'spec_helper' ...@@ -5,17 +5,20 @@ require 'spec_helper'
RSpec.describe Elastic::IndexProjectsByIdService do RSpec.describe Elastic::IndexProjectsByIdService do
describe '#execute' do describe '#execute' do
it 'schedules index workers' do it 'schedules index workers' do
project1 = create(:project)
project2 = create(:project)
expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(project1, project2)
Sidekiq::Testing.fake! do Sidekiq::Testing.fake! do
described_class.new.execute(project_ids: [1, 2], namespace_ids: [3, 4]) described_class.new.execute(project_ids: [project1.id, project2.id], namespace_ids: [3, 4])
end end
jobs = Sidekiq::Queues[ElasticFullIndexWorker.queue] jobs = Sidekiq::Queues[ElasticNamespaceIndexerWorker.queue]
expect(jobs.size).to eq(4) expect(jobs.size).to eq(2)
expect(jobs[0]['args']).to eq(['index', 'Project', 1, nil]) expect(jobs[0]['args']).to eq([3, 'index'])
expect(jobs[1]['args']).to eq(['index', 'Project', 2, nil]) expect(jobs[1]['args']).to eq([4, 'index'])
expect(jobs[2]['args']).to eq([3, 'index'])
expect(jobs[3]['args']).to eq([4, 'index'])
end end
end end
end end
This diff is collapsed.
...@@ -11,15 +11,22 @@ RSpec.describe Elastic::MetricsUpdateService, :prometheus do ...@@ -11,15 +11,22 @@ RSpec.describe Elastic::MetricsUpdateService, :prometheus do
end end
describe '#execute' do describe '#execute' do
it 'sets a gauge for global_search_bulk_cron_queue_size' do it 'sets gauges' do
expect(Elastic::ProcessBookkeepingService).to receive(:queue_size).and_return(4) expect(Elastic::ProcessBookkeepingService).to receive(:queue_size).and_return(4)
expect(Elastic::ProcessInitialBookkeepingService).to receive(:queue_size).and_return(6)
gauge_double = instance_double(Prometheus::Client::Gauge) incremental_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge) expect(Gitlab::Metrics).to receive(:gauge)
.with(:global_search_bulk_cron_queue_size, anything, {}, :max) .with(:global_search_bulk_cron_queue_size, anything, {}, :max)
.and_return(gauge_double) .and_return(incremental_gauge_double)
expect(gauge_double).to receive(:set).with({}, 4) initial_gauge_double = instance_double(Prometheus::Client::Gauge)
expect(Gitlab::Metrics).to receive(:gauge)
.with(:global_search_bulk_cron_initial_queue_size, anything, {}, :max)
.and_return(initial_gauge_double)
expect(incremental_gauge_double).to receive(:set).with({}, 4)
expect(initial_gauge_double).to receive(:set).with({}, 6)
subject.execute subject.execute
end end
......
# frozen_string_literal: true
require 'spec_helper'
describe Elastic::ProcessInitialBookkeepingService do
let(:project) { create(:project) }
let(:issue) { create(:issue) }
describe '.backfill_projects!' do
it 'calls initial project indexing' do
expect(described_class).to receive(:maintain_indexed_associations)
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, nil, nil, true)
described_class.backfill_projects!(project)
end
it 'raises an exception if non project is provided' do
expect { described_class.backfill_projects!(issue) }.to raise_error(ArgumentError)
end
it 'uses a separate queue' do
expect { described_class.backfill_projects!(project) }.not_to change { Elastic::ProcessBookkeepingService.queue_size }
end
end
end
...@@ -84,12 +84,9 @@ RSpec.describe Groups::TransferService, '#execute' do ...@@ -84,12 +84,9 @@ RSpec.describe Groups::TransferService, '#execute' do
project2 = create(:project, :repository, :public, namespace: group) project2 = create(:project, :repository, :public, namespace: group)
project3 = create(:project, :repository, :private, namespace: group) project3 = create(:project, :repository, :private, namespace: group)
expect(ElasticIndexerWorker).to receive(:perform_async) expect(Elastic::ProcessBookkeepingService).to receive(:track!).with(project1)
.with(:update, "Project", project1.id, project1.es_id) expect(Elastic::ProcessBookkeepingService).to receive(:track!).with(project2)
expect(ElasticIndexerWorker).to receive(:perform_async) expect(Elastic::ProcessBookkeepingService).not_to receive(:track!).with(project3)
.with(:update, "Project", project2.id, project2.es_id)
expect(ElasticIndexerWorker).not_to receive(:perform_async)
.with(:update, "Project", project3.id, project3.es_id)
transfer_service.execute(new_group) transfer_service.execute(new_group)
......
...@@ -4,6 +4,7 @@ module ElasticsearchHelpers ...@@ -4,6 +4,7 @@ module ElasticsearchHelpers
def ensure_elasticsearch_index! def ensure_elasticsearch_index!
# Ensure that any enqueued updates are processed # Ensure that any enqueued updates are processed
Elastic::ProcessBookkeepingService.new.execute Elastic::ProcessBookkeepingService.new.execute
Elastic::ProcessInitialBookkeepingService.new.execute
# Make any documents added to the index visible # Make any documents added to the index visible
refresh_index! refresh_index!
......
...@@ -32,10 +32,9 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic do ...@@ -32,10 +32,9 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic do
end end
it 'queues jobs for each project batch' do it 'queues jobs for each project batch' do
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with([ expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(
[:index, 'Project', project1.id, nil], project1, project2
[:index, 'Project', project2.id, nil] )
])
run_rake_task 'gitlab:elastic:index_projects' run_rake_task 'gitlab:elastic:index_projects'
end end
...@@ -55,10 +54,9 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic do ...@@ -55,10 +54,9 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic do
end end
it 'does not queue jobs for projects that should not be indexed' do it 'does not queue jobs for projects that should not be indexed' do
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with([ expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(
[:index, 'Project', project1.id, nil], project1, project3
[:index, 'Project', project3.id, nil] )
])
run_rake_task 'gitlab:elastic:index_projects' run_rake_task 'gitlab:elastic:index_projects'
end end
......
...@@ -24,6 +24,15 @@ RSpec.describe Elastic::IndexingControl do ...@@ -24,6 +24,15 @@ RSpec.describe Elastic::IndexingControl do
let(:worker_args) { [project.id] } let(:worker_args) { [project.id] }
let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } } let(:worker_context) { { 'correlation_id' => 'context_correlation_id' } }
describe '::WORKERS' do
it 'only includes classes which inherit from this class' do
described_class::WORKERS.each do |klass|
expect(klass.ancestors.first).to eq(described_class)
end
end
end
context 'with stub_const' do
before do before do
stub_const("Elastic::IndexingControl::WORKERS", [worker.class]) stub_const("Elastic::IndexingControl::WORKERS", [worker.class])
end end
...@@ -94,4 +103,5 @@ RSpec.describe Elastic::IndexingControl do ...@@ -94,4 +103,5 @@ RSpec.describe Elastic::IndexingControl do
worker.perform(*worker_args) worker.perform(*worker_args)
end end
end end
end
end end
...@@ -61,8 +61,6 @@ RSpec.describe ElasticBatchProjectIndexerWorker do ...@@ -61,8 +61,6 @@ RSpec.describe ElasticBatchProjectIndexerWorker do
end end
it 'indexes all projects it receives even if already indexed', :sidekiq_might_not_need_inline do it 'indexes all projects it receives even if already indexed', :sidekiq_might_not_need_inline do
projects.first.index_status.update!(last_commit: 'foo')
expect_index(projects.first).and_call_original expect_index(projects.first).and_call_original
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer| expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run) expect(indexer).to receive(:run)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe ElasticDeleteProjectWorker, :elastic do
subject { described_class.new }
# Create admin user and search globally to avoid dealing with permissions in
# these tests
let(:user) { create(:admin) }
let(:search_options) { { options: { current_user: user, project_ids: :any } } }
before do
stub_ee_application_setting(elasticsearch_indexing: true)
end
it 'deletes a project with all nested objects' do
project = create :project, :repository
issue = create :issue, project: project
milestone = create :milestone, project: project
note = create :note, project: project
merge_request = create :merge_request, target_project: project, source_project: project
ensure_elasticsearch_index!
## All database objects + data from repository. The absolute value does not matter
expect(Project.elastic_search('*', search_options).records).to include(project)
expect(Issue.elastic_search('*', search_options).records).to include(issue)
expect(Milestone.elastic_search('*', search_options).records).to include(milestone)
expect(Note.elastic_search('*', search_options).records).to include(note)
expect(MergeRequest.elastic_search('*', search_options).records).to include(merge_request)
subject.perform(project.id, project.es_id)
ensure_elasticsearch_index!
expect(Project.elastic_search('*', search_options).total_count).to be(0)
expect(Issue.elastic_search('*', search_options).total_count).to be(0)
expect(Milestone.elastic_search('*', search_options).total_count).to be(0)
expect(Note.elastic_search('*', search_options).total_count).to be(0)
expect(MergeRequest.elastic_search('*', search_options).total_count).to be(0)
end
end
...@@ -11,7 +11,7 @@ RSpec.describe ElasticFullIndexWorker do ...@@ -11,7 +11,7 @@ RSpec.describe ElasticFullIndexWorker do
it 'does nothing if ES disabled' do it 'does nothing if ES disabled' do
stub_ee_application_setting(elasticsearch_indexing: false) stub_ee_application_setting(elasticsearch_indexing: false)
expect(Elastic::IndexRecordService).not_to receive(:new) expect(Elastic::ProcessInitialBookkeepingService).not_to receive(:backfill_projects!)
subject.perform(1, 2) subject.perform(1, 2)
end end
...@@ -21,23 +21,7 @@ RSpec.describe ElasticFullIndexWorker do ...@@ -21,23 +21,7 @@ RSpec.describe ElasticFullIndexWorker do
it 'indexes projects in range' do it 'indexes projects in range' do
projects.each do |project| projects.each do |project|
expect_next_instance_of(Elastic::IndexRecordService) do |service| expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(project)
expect(service).to receive(:execute).with(project, true).and_return(true)
end
end
subject.perform(projects.first.id, projects.last.id)
end
it 'retries failed indexing' do
projects.each do |project|
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(project, true).and_raise(Elastic::IndexRecordService::ImportError)
end
end
expect_next_instance_of(Elastic::IndexProjectsByIdService) do |service|
expect(service).to receive(:execute).with(project_ids: projects.map(&:id))
end end
subject.perform(projects.first.id, projects.last.id) subject.perform(projects.first.id, projects.last.id)
......
...@@ -37,6 +37,7 @@ RSpec.describe ElasticIndexBulkCronWorker do ...@@ -37,6 +37,7 @@ RSpec.describe ElasticIndexBulkCronWorker do
expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service| expect_next_instance_of(::Elastic::ProcessBookkeepingService) do |service|
expect(service).to receive(:execute).and_return(15) expect(service).to receive(:execute).and_return(15)
end end
worker = described_class.new worker = described_class.new
worker.perform worker.perform
......
...@@ -2,139 +2,45 @@ ...@@ -2,139 +2,45 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe ElasticIndexerWorker, :elastic do RSpec.describe ElasticIndexerWorker do
subject { described_class.new } subject { described_class.new }
# Create admin user and search globally to avoid dealing with permissions in describe '#perform' do
# these tests context 'indexing is enabled' do
let(:user) { create(:admin) } using RSpec::Parameterized::TableSyntax
let(:search_options) { { options: { current_user: user, project_ids: :any } } }
let(:project) { instance_double(Project, id: 1, es_id: 1) }
before do before do
stub_ee_application_setting(elasticsearch_indexing: true) stub_ee_application_setting(elasticsearch_indexing: true)
expect(Project).to receive(:find).and_return(project)
Elasticsearch::Model.client =
Gitlab::Elastic::Client.build(Gitlab::CurrentSettings.elasticsearch_config)
end end
it 'returns true if ES disabled' do where(:operation, :method) do
stub_ee_application_setting(elasticsearch_indexing: false) 'index' | 'maintain_elasticsearch_create'
'update' | 'maintain_elasticsearch_update'
expect_any_instance_of(Elasticsearch::Model).not_to receive(:__elasticsearch__) 'delete' | 'maintain_elasticsearch_destroy'
expect(subject.perform("index", "Milestone", 1, 1)).to be_truthy
end
describe 'Indexing, updating, and deleting records' do
using RSpec::Parameterized::TableSyntax
where(:type, :name) do
:project | "Project"
:issue | "Issue"
:note | "Note"
:milestone | "Milestone"
:merge_request | "MergeRequest"
end end
with_them do with_them do
it 'calls record indexing' do it 'calls respective methods' do
object = create(type) expect(project).to receive(method.to_sym)
expect_next_instance_of(Elastic::IndexRecordService) do |service|
expect(service).to receive(:execute).with(object, true, {}).and_return(true)
end
subject.perform("index", name, object.id, object.es_id)
end
it 'deletes from index when an object is deleted' do
object = nil
Sidekiq::Testing.disable! do
object = create(type)
if type != :project
# You cannot find anything in the index if it's parent project is
# not first indexed.
subject.perform("index", "Project", object.project.id, object.project.es_id)
end
subject.perform("index", name, object.id, object.es_id)
ensure_elasticsearch_index!
object.destroy
end
expect do subject.perform(operation, 'Project', project.id, project.es_id)
subject.perform("delete", name, object.id, object.es_id, { 'es_parent' => object.es_parent })
ensure_elasticsearch_index!
end.to change { object.class.elastic_search('*', search_options).total_count }.by(-1)
end end
end end
end end
it 'deletes a project with all nested objects' do context 'indexing is disabled' do
project, issue, milestone, note, merge_request = nil before do
stub_ee_application_setting(elasticsearch_indexing: false)
Sidekiq::Testing.disable! do
project = create :project, :repository
subject.perform("index", "Project", project.id, project.es_id)
issue = create :issue, project: project
subject.perform("index", "Issue", issue.id, issue.es_id)
milestone = create :milestone, project: project
subject.perform("index", "Milestone", milestone.id, milestone.es_id)
note = create :note, project: project
subject.perform("index", "Note", note.id, note.es_id)
merge_request = create :merge_request, target_project: project, source_project: project
subject.perform("index", "MergeRequest", merge_request.id, merge_request.es_id)
end
ElasticCommitIndexerWorker.new.perform(project.id)
ensure_elasticsearch_index!
## All database objects + data from repository. The absolute value does not matter
expect(Project.elastic_search('*', search_options).records).to include(project)
expect(Issue.elastic_search('*', search_options).records).to include(issue)
expect(Milestone.elastic_search('*', search_options).records).to include(milestone)
expect(Note.elastic_search('*', search_options).records).to include(note)
expect(MergeRequest.elastic_search('*', search_options).records).to include(merge_request)
subject.perform("delete", "Project", project.id, project.es_id)
ensure_elasticsearch_index!
expect(Project.elastic_search('*', search_options).total_count).to be(0)
expect(Issue.elastic_search('*', search_options).total_count).to be(0)
expect(Milestone.elastic_search('*', search_options).total_count).to be(0)
expect(Note.elastic_search('*', search_options).total_count).to be(0)
expect(MergeRequest.elastic_search('*', search_options).total_count).to be(0)
end
it 'retries if index raises error' do
object = create(:project)
expect_next_instance_of(Elastic::IndexRecordService) do |service|
allow(service).to receive(:execute).and_raise(Elastic::IndexRecordService::ImportError)
end
expect do
subject.perform("index", 'Project', object.id, object.es_id)
end.to raise_error(Elastic::IndexRecordService::ImportError)
end end
it 'ignores Elasticsearch::Transport::Transport::Errors::NotFound error' do it 'returns true if ES disabled' do
object = create(:project) expect(Milestone).not_to receive(:find).with(1)
expect_next_instance_of(Elastic::IndexRecordService) do |service| expect(subject.perform('index', 'Milestone', 1, 1)).to be_truthy
allow(service).to receive(:execute).and_raise(Elasticsearch::Transport::Transport::Errors::NotFound)
end end
expect(subject.perform("index", 'Project', object.id, object.es_id)).to eq(true)
end end
it 'ignores missing records' do
expect(subject.perform("index", 'Project', -1, 'project_-1')).to eq(true)
end end
end end
...@@ -13,7 +13,7 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do ...@@ -13,7 +13,7 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do
it 'returns true if ES disabled' do it 'returns true if ES disabled' do
stub_ee_application_setting(elasticsearch_indexing: false) stub_ee_application_setting(elasticsearch_indexing: false)
expect(ElasticIndexerWorker).not_to receive(:perform_async) expect(Elastic::ProcessInitialBookkeepingService).not_to receive(:backfill_projects!)
expect(subject.perform(1, "index")).to be_truthy expect(subject.perform(1, "index")).to be_truthy
end end
...@@ -21,7 +21,7 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do ...@@ -21,7 +21,7 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do
it 'returns true if limited indexing is not enabled' do it 'returns true if limited indexing is not enabled' do
stub_ee_application_setting(elasticsearch_limit_indexing: false) stub_ee_application_setting(elasticsearch_limit_indexing: false)
expect(ElasticIndexerWorker).not_to receive(:perform_async) expect(Elastic::ProcessInitialBookkeepingService).not_to receive(:backfill_projects!)
expect(subject.perform(1, "index")).to be_truthy expect(subject.perform(1, "index")).to be_truthy
end end
...@@ -31,15 +31,14 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do ...@@ -31,15 +31,14 @@ RSpec.describe ElasticNamespaceIndexerWorker, :elastic do
let(:projects) { create_list :project, 3, namespace: namespace } let(:projects) { create_list :project, 3, namespace: namespace }
it 'indexes all projects belonging to the namespace' do it 'indexes all projects belonging to the namespace' do
args = projects.map { |project| [:index, project.class.to_s, project.id, project.es_id] } expect(Elastic::ProcessInitialBookkeepingService).to receive(:backfill_projects!).with(*projects)
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with(args)
subject.perform(namespace.id, :index) subject.perform(namespace.id, :index)
end end
it 'deletes all projects belonging to the namespace' do it 'deletes all projects belonging to the namespace' do
args = projects.map { |project| [:delete, project.class.to_s, project.id, project.es_id] } args = projects.map { |project| [project.id, project.es_id] }
expect(ElasticIndexerWorker).to receive(:bulk_perform_async).with(args) expect(ElasticDeleteProjectWorker).to receive(:bulk_perform_async).with(args)
subject.perform(namespace.id, :delete) subject.perform(namespace.id, :delete)
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment