Commit 0f9f1b99 authored by Terri Chu's avatar Terri Chu Committed by Dmitry Gruzd

Fix Elastic::MigrationWorker current_migration (2nd attempt)

parent dec0df95
...@@ -65,8 +65,6 @@ module Elastic ...@@ -65,8 +65,6 @@ module Elastic
.search(index: helper.migrations_index_name, body: { query: { term: { completed: completed } }, size: ELASTICSEARCH_SIZE }) .search(index: helper.migrations_index_name, body: { query: { term: { completed: completed } }, size: ELASTICSEARCH_SIZE })
.dig('hits', 'hits') .dig('hits', 'hits')
.map { |v| v['_id'].to_i } .map { |v| v['_id'].to_i }
rescue StandardError
[]
end end
def running? def running?
......
...@@ -21,6 +21,13 @@ module Elastic ...@@ -21,6 +21,13 @@ module Elastic
return false unless helper.alias_exists? return false unless helper.alias_exists?
in_lock(self.class.name.underscore, ttl: 1.day, retries: 10, sleep_sec: 1) do in_lock(self.class.name.underscore, ttl: 1.day, retries: 10, sleep_sec: 1) do
# migration index should be checked before pulling the current_migration because if no migrations_index exists,
# current_migration will return nil
unless helper.migrations_index_exists?
logger.info 'MigrationWorker: creating migrations index'
helper.create_migrations_index
end
migration = current_migration migration = current_migration
unless migration unless migration
...@@ -28,11 +35,6 @@ module Elastic ...@@ -28,11 +35,6 @@ module Elastic
break false break false
end end
unless helper.migrations_index_exists?
logger.info 'MigrationWorker: creating migrations index'
helper.create_migrations_index
end
if migration.halted? if migration.halted?
logger.info "MigrationWorker: migration[#{migration.name}] has been halted. All future migrations will be halted because of that. Exiting" logger.info "MigrationWorker: migration[#{migration.name}] has been halted. All future migrations will be halted because of that. Exiting"
unpause_indexing!(migration) unpause_indexing!(migration)
...@@ -89,7 +91,12 @@ module Elastic ...@@ -89,7 +91,12 @@ module Elastic
def current_migration def current_migration
completed_migrations = Elastic::MigrationRecord.load_versions(completed: true) completed_migrations = Elastic::MigrationRecord.load_versions(completed: true)
# use a negative condition to support new migrations which do not exist in the index yet
Elastic::DataMigrationService.migrations.find { |migration| !completed_migrations.include?(migration.version) } Elastic::DataMigrationService.migrations.find { |migration| !completed_migrations.include?(migration.version) }
rescue StandardError => e
# do not return a migration if there is an issue communicating with the Elasticsearch instance
logger.error("MigrationWorker: #{e.class}: #{e.message}")
nil
end end
def pause_indexing!(migration) def pause_indexing!(migration)
......
...@@ -42,13 +42,13 @@ RSpec.describe Elastic::MigrationRecord, :elastic do ...@@ -42,13 +42,13 @@ RSpec.describe Elastic::MigrationRecord, :elastic do
end end
describe '#load_from_index' do describe '#load_from_index' do
it 'does not raise an exeption when connection refused' do it 'does not raise an exception when connection refused' do
allow(Gitlab::Elastic::Helper.default).to receive(:get).and_raise(Faraday::ConnectionFailed) allow(Gitlab::Elastic::Helper.default).to receive(:get).and_raise(Faraday::ConnectionFailed)
expect(record.load_from_index).to be_nil expect(record.load_from_index).to be_nil
end end
it 'does not raise an exeption when record does not exist' do it 'does not raise an exception when record does not exist' do
allow(Gitlab::Elastic::Helper.default).to receive(:get).and_raise(Elasticsearch::Transport::Transport::Errors::NotFound) allow(Gitlab::Elastic::Helper.default).to receive(:get).and_raise(Elasticsearch::Transport::Transport::Errors::NotFound)
expect(record.load_from_index).to be_nil expect(record.load_from_index).to be_nil
...@@ -95,18 +95,18 @@ RSpec.describe Elastic::MigrationRecord, :elastic do ...@@ -95,18 +95,18 @@ RSpec.describe Elastic::MigrationRecord, :elastic do
expect(described_class.load_versions(completed: false)).to contain_exactly(in_progress_migration.version) expect(described_class.load_versions(completed: false)).to contain_exactly(in_progress_migration.version)
end end
it 'returns empty array if no index present' do it 'raises an exception if no index present' do
es_helper.delete_migrations_index es_helper.delete_migrations_index
expect(described_class.load_versions(completed: true)).to eq([]) expect { described_class.load_versions(completed: true) }.to raise_exception(Elasticsearch::Transport::Transport::Errors::NotFound)
expect(described_class.load_versions(completed: false)).to eq([]) expect { described_class.load_versions(completed: false) }.to raise_exception(Elasticsearch::Transport::Transport::Errors::NotFound)
end end
it 'returns empty array when exception is raised' do it 'raises an exception when exception is raised' do
allow(Gitlab::Elastic::Helper.default.client).to receive(:search).and_raise(Faraday::ConnectionFailed) allow(Gitlab::Elastic::Helper.default.client).to receive(:search).and_raise(Faraday::ConnectionFailed)
expect(described_class.load_versions(completed: true)).to eq([]) expect { described_class.load_versions(completed: true) }.to raise_exception(StandardError)
expect(described_class.load_versions(completed: false)).to eq([]) expect { described_class.load_versions(completed: false) }.to raise_exception(StandardError)
end end
end end
......
...@@ -69,7 +69,6 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic, :silence_stdout ...@@ -69,7 +69,6 @@ RSpec.describe 'gitlab:elastic namespace rake tasks', :elastic, :silence_stdout
it 'marks all migrations as completed' do it 'marks all migrations as completed' do
expect(Elastic::DataMigrationService).to receive(:mark_all_as_completed!).and_call_original expect(Elastic::DataMigrationService).to receive(:mark_all_as_completed!).and_call_original
expect(Elastic::MigrationRecord.load_versions(completed: true)).to eq([])
subject subject
refresh_index! refresh_index!
......
...@@ -22,168 +22,185 @@ RSpec.describe Elastic::MigrationWorker, :elastic do ...@@ -22,168 +22,185 @@ RSpec.describe Elastic::MigrationWorker, :elastic do
before do before do
stub_ee_application_setting(elasticsearch_indexing: true) stub_ee_application_setting(elasticsearch_indexing: true)
allow(subject).to receive(:current_migration).and_return(migration)
end end
it 'creates an index if it does not exist' do context 'an unexecuted migration present' do
Gitlab::Elastic::Helper.default.delete_migrations_index
expect { subject.perform }.to change { Gitlab::Elastic::Helper.default.migrations_index_exists? }.from(false).to(true)
end
context 'no unexecuted migrations' do
before do before do
allow(subject).to receive(:current_migration).and_return(nil) allow(subject).to receive(:current_migration).and_return(migration)
end end
it 'skips execution' do it 'creates an index if it does not exist' do
expect(subject).not_to receive(:execute_migration) Gitlab::Elastic::Helper.default.delete_migrations_index
expect(subject.perform).to be_falsey expect { subject.perform }.to change { Gitlab::Elastic::Helper.default.migrations_index_exists? }.from(false).to(true)
end end
end
context 'migration is halted' do context 'migration is halted' do
using RSpec::Parameterized::TableSyntax using RSpec::Parameterized::TableSyntax
where(:pause_indexing, :halted_indexing_unpaused, :unpause) do where(:pause_indexing, :halted_indexing_unpaused, :unpause) do
false | false | false false | false | false
false | true | false false | true | false
true | false | true true | false | true
true | true | false true | true | false
end
with_them do
before do
allow(Gitlab::CurrentSettings).to receive(:elasticsearch_pause_indexing?).and_return(true)
allow(migration).to receive(:pause_indexing?).and_return(true)
migration.save_state!(halted: true, pause_indexing: pause_indexing, halted_indexing_unpaused: halted_indexing_unpaused)
end end
it 'unpauses indexing' do with_them do
if unpause before do
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false) allow(Gitlab::CurrentSettings).to receive(:elasticsearch_pause_indexing?).and_return(true)
else allow(migration).to receive(:pause_indexing?).and_return(true)
expect(Gitlab::CurrentSettings).not_to receive(:update!) migration.save_state!(halted: true, pause_indexing: pause_indexing, halted_indexing_unpaused: halted_indexing_unpaused)
end end
expect(migration).not_to receive(:migrate) it 'unpauses indexing' do
if unpause
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false)
else
expect(Gitlab::CurrentSettings).not_to receive(:update!)
end
subject.perform expect(migration).not_to receive(:migrate)
subject.perform
end
end end
end end
end
context 'migration process' do context 'migration process' do
before do before do
allow(migration).to receive(:started?).and_return(started) allow(migration).to receive(:started?).and_return(started)
allow(migration).to receive(:completed?).and_return(completed) allow(migration).to receive(:completed?).and_return(completed)
allow(migration).to receive(:batched?).and_return(batched) allow(migration).to receive(:batched?).and_return(batched)
end end
using RSpec::Parameterized::TableSyntax using RSpec::Parameterized::TableSyntax
# completed is evaluated after migrate method is executed # completed is evaluated after migrate method is executed
where(:started, :completed, :execute_migration, :batched) do where(:started, :completed, :execute_migration, :batched) do
false | false | true | false false | false | true | false
false | true | true | false false | true | true | false
false | false | true | true false | false | true | true
false | true | true | true false | true | true | true
true | false | false | false true | false | false | false
true | true | false | false true | true | false | false
true | false | true | true true | false | true | true
true | true | true | true true | true | true | true
end end
with_them do with_them do
it 'calls migration only when needed', :aggregate_failures do it 'calls migration only when needed', :aggregate_failures do
if execute_migration if execute_migration
expect(migration).to receive(:migrate).once expect(migration).to receive(:migrate).once
else else
expect(migration).not_to receive(:migrate) expect(migration).not_to receive(:migrate)
end
expect(migration).to receive(:save!).with(completed: completed)
expect(Elastic::DataMigrationService).to receive(:drop_migration_has_finished_cache!).with(migration)
subject.perform
end end
expect(migration).to receive(:save!).with(completed: completed) it 'handles batched migrations' do
expect(Elastic::DataMigrationService).to receive(:drop_migration_has_finished_cache!).with(migration) if batched && !completed
# default throttle_delay is 5.minutes
expect( Elastic::MigrationWorker).to receive(:perform_in)
.with(5.minutes)
else
expect( Elastic::MigrationWorker).not_to receive(:perform_in)
end
subject.perform subject.perform
end
end end
it 'handles batched migrations' do context 'indexing pause' do
if batched && !completed before do
# default throttle_delay is 5.minutes allow(migration).to receive(:pause_indexing?).and_return(true)
expect( Elastic::MigrationWorker).to receive(:perform_in)
.with(5.minutes)
else
expect( Elastic::MigrationWorker).not_to receive(:perform_in)
end end
subject.perform let(:batched) { true }
end
end
context 'indexing pause' do where(:started, :completed, :expected) do
before do false | false | false
allow(migration).to receive(:pause_indexing?).and_return(true) true | false | false
end true | true | true
end
let(:batched) { true } with_them do
it 'pauses and unpauses indexing' do
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: true)
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false) if expected
where(:started, :completed, :expected) do subject.perform
false | false | false end
true | false | false end
true | true | true
end end
with_them do context 'checks space required' do
it 'pauses and unpauses indexing' do let(:helper) { Gitlab::Elastic::Helper.new }
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: true) let(:started) { false }
expect(Gitlab::CurrentSettings).to receive(:update!).with(elasticsearch_pause_indexing: false) if expected let(:completed) { false }
let(:batched) { false }
before do
allow(Gitlab::Elastic::Helper).to receive(:default).and_return(helper)
allow(migration).to receive(:space_requirements?).and_return(true)
allow(migration).to receive(:space_required_bytes).and_return(10)
end
it 'halts the migration if there is not enough space' do
allow(helper).to receive(:cluster_free_size_bytes).and_return(5)
expect(migration).to receive(:halt!)
expect(migration).not_to receive(:migrate)
subject.perform subject.perform
end end
end
end
context 'checks space required' do it 'runs the migration if there is enough space' do
let(:helper) { Gitlab::Elastic::Helper.new } allow(helper).to receive(:cluster_free_size_bytes).and_return(20)
let(:started) { false } expect(migration).not_to receive(:halt!)
let(:completed) { false } expect(migration).to receive(:migrate).once
let(:batched) { false }
before do subject.perform
allow(Gitlab::Elastic::Helper).to receive(:default).and_return(helper) end
allow(migration).to receive(:space_requirements?).and_return(true)
allow(migration).to receive(:space_required_bytes).and_return(10)
end
it 'halts the migration if there is not enough space' do context 'when migration is already started' do
allow(helper).to receive(:cluster_free_size_bytes).and_return(5) let(:started) { true }
expect(migration).to receive(:halt!)
expect(migration).not_to receive(:migrate)
subject.perform it 'does not check space requirements' do
expect(helper).not_to receive(:cluster_free_size_bytes)
expect(migration).not_to receive(:space_required_bytes)
subject.perform
end
end
end end
end
end
it 'runs the migration if there is enough space' do context 'no unexecuted migrations' do
allow(helper).to receive(:cluster_free_size_bytes).and_return(20) before do
expect(migration).not_to receive(:halt!) allow(subject).to receive(:current_migration).and_return(nil)
expect(migration).to receive(:migrate).once end
subject.perform it 'skips execution' do
end expect(subject).not_to receive(:execute_migration)
context 'when migration is already started' do expect(subject.perform).to be_falsey
let(:started) { true } end
end
it 'does not check space requirements' do context 'no executed migrations' do
expect(helper).not_to receive(:cluster_free_size_bytes) before do
expect(migration).not_to receive(:space_required_bytes) allow(Elastic::MigrationRecord).to receive(:load_versions).and_return([])
allow(Elastic::DataMigrationService).to receive(:migrations).and_return([migration])
end
subject.perform it 'executes the first migration' do
end expect(subject).to receive(:execute_migration).with(migration)
end
subject.perform
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment