Commit 961b73cd authored by Terri Chu's avatar Terri Chu Committed by Mark Chao

Delete merge requests from original ES index

parent 2f7ca7e0
......@@ -6,6 +6,8 @@ module Elastic
delegate :migrate, :skip_migration?, :completed?, :batched?, :throttle_delay, :pause_indexing?, :space_requirements?, :space_required_bytes, to: :migration
ELASTICSEARCH_SIZE = 25
def initialize(version:, name:, filename:)
@version = version
@name = name
......@@ -57,7 +59,7 @@ module Elastic
def self.load_versions(completed:)
helper = Gitlab::Elastic::Helper.default
helper.client
.search(index: helper.migrations_index_name, body: { query: { term: { completed: completed } } })
.search(index: helper.migrations_index_name, body: { query: { term: { completed: completed } }, size: ELASTICSEARCH_SIZE })
.dig('hits', 'hits')
.map { |v| v['_id'].to_i }
rescue Elasticsearch::Transport::Transport::Errors::NotFound
......
---
title: Delete Merge Requests from original ES index
merge_request: 61394
author:
type: changed
# frozen_string_literal: true
class DeleteMergeRequestsFromOriginalIndex < Elastic::Migration
batched!
throttle_delay 3.minutes
MAX_ATTEMPTS = 30
QUERY_BODY = {
query: {
term: {
type: 'merge_request'
}
}
}.freeze
def migrate
retry_attempt = migration_state[:retry_attempt].to_i
task_id = migration_state[:task_id]
if retry_attempt >= MAX_ATTEMPTS
fail_migration_halt_error!(retry_attempt: retry_attempt)
return
end
if task_id
log "Checking task status with task_id:#{task_id}"
check_task(task_id, retry_attempt)
return
end
if completed?
log "Skipping removing merge requests from the original index since it is already applied"
return
end
log "Launching delete by query"
response = client.delete_by_query(index: helper.target_name, body: QUERY_BODY, conflicts: 'proceed', wait_for_completion: false)
log_raise "Failed to delete merge requests with task_id:#{task_id} - #{response['failures']}" if response['failures'].present?
task_id = response['task']
log "Removing merge requests from the original index is started with task_id:#{task_id}"
set_migration_state(
retry_attempt: retry_attempt,
task_id: task_id
)
rescue StandardError => e
log "migrate failed, increasing migration_state retry_attempt: #{retry_attempt} error:#{e.class}:#{e.message}"
set_migration_state(
retry_attempt: retry_attempt + 1,
task_id: nil
)
raise e
end
def completed?
helper.refresh_index
results = client.count(index: helper.target_index_name, body: QUERY_BODY)
total_remaining = results.dig('count')
log "Checking to see if migration is completed based on index counts remaining:#{total_remaining}"
total_remaining == 0
end
private
def check_task(task_id, retry_attempt)
response = helper.task_status(task_id: task_id)
if response['completed']
log "Removing merge requests from the original index is completed for task_id:#{task_id}"
set_migration_state(
retry_attempt: retry_attempt,
task_id: nil
)
# since delete_by_query is using wait_for_completion = false, the task must be cleaned up
# in Elasticsearch system .tasks index
helper.client.delete(index: '.tasks', type: 'task', id: task_id)
else
log "Removing merge requests from the original index is still in progress for task_id:#{task_id}"
end
log_raise "Failed to delete merge requests: #{response['failures']}" if response['failures'].present?
end
end
# frozen_string_literal: true
require 'spec_helper'
require File.expand_path('ee/elastic/migrate/20210510113500_delete_merge_requests_from_original_index.rb')
RSpec.describe DeleteMergeRequestsFromOriginalIndex, :elastic, :sidekiq_inline do
let(:version) { 20210510113500 }
let(:migration) { described_class.new(version) }
let(:helper) { Gitlab::Elastic::Helper.new }
before do
stub_ee_application_setting(elasticsearch_search: true, elasticsearch_indexing: true)
allow(migration).to receive(:helper).and_return(helper)
end
describe 'migration_options' do
it 'has migration options set', :aggregate_failures do
expect(migration.batched?).to be_truthy
expect(migration.throttle_delay).to eq(3.minutes)
end
end
context 'merge requests are already deleted' do
it 'does not execute delete_by_query' do
expect(migration.completed?).to be_truthy
expect(helper.client).not_to receive(:delete_by_query)
migration.migrate
end
end
context 'merge requests are still present in the index' do
let!(:merge_requests) { create_list(:merge_request, 10, :unique_branches) }
before do
set_elasticsearch_migration_to :migrate_merge_requests_to_separate_index, including: false
ensure_elasticsearch_index!
end
it 'removes merge requests from the index' do
# initiate the task in Elasticsearch
migration.migrate
expect(migration.migration_state).to match(retry_attempt: 0, task_id: anything)
task_id = migration.migration_state[:task_id]
# the migration might not complete after the initial task is created
# so make sure it actually completes
50.times do |_| # Max 0.5s waiting
migration.migrate
break if migration.completed?
sleep 0.01
end
# verify clean up of the task from Elasticsearch
expect(migration.migration_state).to match(retry_attempt: 0, task_id: nil)
expect { helper.client.get(index: '.tasks', type: 'task', id: task_id) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
end
end
context 'migration fails' do
let(:client) { double('Elasticsearch::Transport::Client') }
before do
allow(migration).to receive(:client).and_return(client)
allow(migration).to receive(:completed?).and_return(false)
end
context 'exception is raised' do
before do
allow(client).to receive(:delete_by_query).and_raise(StandardError)
end
it 'increases retry_attempt' do
migration.set_migration_state(retry_attempt: 1)
expect { migration.migrate }.to raise_error(StandardError)
expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil)
end
it 'fails the migration after too many attempts' do
stub_const('DeleteMergeRequestsFromOriginalIndex::MAX_ATTEMPTS', 2)
# run migration up to the set MAX_ATTEMPTS set in the migration
DeleteMergeRequestsFromOriginalIndex::MAX_ATTEMPTS.times do
expect { migration.migrate }.to raise_error(StandardError)
end
migration.migrate
expect(migration.migration_state).to match(retry_attempt: 2, task_id: nil, halted: true, halted_indexing_unpaused: false)
end
end
context 'es responds with errors' do
before do
allow(client).to receive(:delete_by_query).and_return('task' => 'task_1')
end
context 'when a task throws an error' do
before do
allow(helper).to receive(:task_status).and_return('failures' => ['failed'])
migration.migrate
end
it 'raises an error and increases retry attempt' do
expect { migration.migrate }.to raise_error(/Failed to delete merge requests/)
expect(migration.migration_state).to match(retry_attempt: 1, task_id: nil)
end
end
context 'when delete_by_query throws an error' do
before do
allow(client).to receive(:delete_by_query).and_return('failures' => ['failed'])
end
it 'raises an error and increases retry attempt' do
expect { migration.migrate }.to raise_error(/Failed to delete merge requests/)
expect(migration.migration_state).to match(retry_attempt: 1, task_id: nil)
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment