Commit f3e1d2c6 authored by Stan Hu's avatar Stan Hu Committed by Douglas Barbosa Alexandre

Atomically select replicas that meet LSN requirement

During a merge, we attempt to find a matching merge request with a SHA
using a replica that should be up-to-date with the primary for a given
PostgreSQL log sequence number (LSN). However, there is a race condition
that can happen if service discovery alters the host list after this
check has taken place. This most likely happens when a Web worker
starts:

1. When Rails starts up for the first time, there is a 1-minute or
2-minute delay before service discovery finds replicas
(see https://gitlab.com/gitlab-org/gitlab/-/issues/271575).

2. During this time `LoadBalancer#all_caught_up?` will return
`true`. This will indicate to the Web worker that it can use replicas
and does not have to use the primary.

3. During a request, service discovery may load all the replicas and
change the host list. As a result, the next read may be directed to a
lagging replica.

However, this may cause a merge to fail if it cannot find a match.

When a user merges a merge request, Sidekiq logs the minimum LSN needed
to match a merge request for the API. If we have this LSN, we now:

1. Select from the available list of replicas that meet this LSN
requirement.
2. Store this subset for the given request.
3. Round-robin reads with this subset of replicas.

Relates to https://gitlab.com/gitlab-org/gitlab/-/issues/247857
parent 24e67427
---
name: load_balancing_atomic_replica
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/49294
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/291193
milestone: '13.11'
type: development
group:
default_enabled: false
---
title: Atomically select an available replica if all caught up
merge_request: 49294
author:
type: fixed
......@@ -9,6 +9,7 @@ module EE
TOTAL_METRIC = :gitlab_merge_request_match_total
STALE_METRIC = :gitlab_merge_request_match_stale_secondary
# rubocop:disable Gitlab/ModuleWithInstanceVariables
override :match?
def match?
return super unless ::Gitlab::Database::LoadBalancing.enable?
......@@ -27,11 +28,16 @@ module EE
# report no matching merge requests. To avoid this, we check
# the write location to ensure the replica can make this query.
track_session_metrics do
::Gitlab::Database::LoadBalancing::Sticking.unstick_or_continue_sticking(:project, @project.id) # rubocop:disable Gitlab/ModuleWithInstanceVariables
if ::Feature.enabled?(:load_balancing_atomic_replica, @project)
::Gitlab::Database::LoadBalancing::Sticking.select_valid_host(:project, @project.id)
else
::Gitlab::Database::LoadBalancing::Sticking.unstick_or_continue_sticking(:project, @project.id)
end
end
super
end
# rubocop:disable Gitlab/ModuleWithInstanceVariables
private
......
......@@ -18,7 +18,13 @@ module Gitlab
end
def hosts
@mutex.synchronize { @hosts }
@mutex.synchronize { @hosts.dup }
end
def shuffle
@mutex.synchronize do
unsafe_shuffle
end
end
def length
......@@ -35,9 +41,9 @@ module Gitlab
def hosts=(hosts)
@mutex.synchronize do
@hosts = hosts.shuffle
@hosts = hosts
unsafe_shuffle
update_pools
@index = 0
end
set_metrics!
......@@ -52,6 +58,11 @@ module Gitlab
private
def unsafe_shuffle
@hosts = @hosts.shuffle
@index = 0
end
# Returns the next available host.
#
# Returns a Gitlab::Database::LoadBalancing::Host instance, or nil if no
......
......@@ -12,6 +12,7 @@ module Gitlab
# always returns a connection to the primary.
class LoadBalancer
CACHE_KEY = :gitlab_load_balancer_host
VALID_HOSTS_CACHE_KEY = :gitlab_load_balancer_valid_hosts
attr_reader :host_list
......@@ -117,7 +118,7 @@ module Gitlab
# Hosts are scoped per thread so that multiple threads don't
# accidentally re-use the same host + connection.
def host
RequestStore[CACHE_KEY] ||= @host_list.next
RequestStore[CACHE_KEY] ||= current_host_list.next
end
# Releases the host and connection for the current thread.
......@@ -128,6 +129,7 @@ module Gitlab
end
RequestStore.delete(CACHE_KEY)
RequestStore.delete(VALID_HOSTS_CACHE_KEY)
end
def release_primary_connection
......@@ -151,6 +153,33 @@ module Gitlab
@host_list.hosts.all? { |host| host.caught_up?(location) }
end
# Returns true if there was at least one host that has caught up with the given transaction.
#
# In case of a retry, this method also stores the set of hosts that have caught up.
def select_caught_up_hosts(location)
all_hosts = @host_list.hosts
valid_hosts = all_hosts.select { |host| host.caught_up?(location) }
return false if valid_hosts.empty?
# Hosts can come online after the time when this scan was done,
# so we need to remember the ones that can be used. If the host went
# offline, we'll just rely on the retry mechanism to use the primary.
set_valid_hosts(HostList.new(valid_hosts))
# Since we will be using a subset from the original list, let's just
# pick a random host and mix up the original list to ensure we don't
# only end up using one replica.
RequestStore[CACHE_KEY] = valid_hosts.sample
@host_list.shuffle
true
end
def set_valid_hosts(hosts)
RequestStore[VALID_HOSTS_CACHE_KEY] = hosts
end
# Yields a block, retrying it upon error using an exponential backoff.
def retry_with_backoff(retries = 3, time = 2)
retried = 0
......@@ -222,6 +251,10 @@ module Gitlab
@connection_db_roles_count.delete(connection)
end
end
def current_host_list
RequestStore[VALID_HOSTS_CACHE_KEY] || @host_list
end
end
end
end
......
......@@ -27,7 +27,7 @@ module Gitlab
stick(namespace, id) if Session.current.performed_write?
end
# Checks if we were able to caught-up with all the work
# Checks if we are caught-up with all the work
def self.all_caught_up?(namespace, id)
location = last_write_location_for(namespace, id)
......@@ -38,12 +38,38 @@ module Gitlab
end
end
# Selects hosts that have caught up with the primary. This ensures
# atomic selection of the host to prevent the host list changing
# in another thread.
#
# Returns true if one host was selected.
def self.select_caught_up_replicas(namespace, id)
location = last_write_location_for(namespace, id)
# Unlike all_caught_up?, we return false if no write location exists.
# We want to be sure we talk to a replica that has caught up for a specific
# write location. If no such location exists, err on the side of caution.
return false unless location
load_balancer.select_caught_up_hosts(location).tap do |selected|
unstick(namespace, id) if selected
end
end
# Sticks to the primary if necessary, otherwise unsticks an object (if
# it was previously stuck to the primary).
def self.unstick_or_continue_sticking(namespace, id)
Session.current.use_primary! unless all_caught_up?(namespace, id)
end
# Select a replica that has caught up with the primary. If one has not been
# found, stick to the primary.
def self.select_valid_host(namespace, id)
replica_selected = select_caught_up_replicas(namespace, id)
Session.current.use_primary! unless replica_selected
end
# Starts sticking to the primary for the given namespace and id, using
# the latest WAL pointer from the primary.
def self.stick(namespace, id)
......
......@@ -25,6 +25,7 @@ RSpec.describe Gitlab::Checks::MatchingMergeRequest do
before do
expect(::Gitlab::Database::LoadBalancing).to receive(:enable?).at_least(:once).and_return(false)
expect(::Gitlab::Database::LoadBalancing::Sticking).not_to receive(:unstick_or_continue_sticking)
expect(::Gitlab::Database::LoadBalancing::Sticking).not_to receive(:select_valid_replicas)
end
it 'does not attempt to stick to primary' do
......@@ -44,11 +45,10 @@ RSpec.describe Gitlab::Checks::MatchingMergeRequest do
before do
expect(::Gitlab::Database::LoadBalancing).to receive(:enable?).at_least(:once).and_return(true)
expect(::Gitlab::Database::LoadBalancing::Sticking).to receive(:unstick_or_continue_sticking).and_call_original
allow(::Gitlab::Database::LoadBalancing::Sticking).to receive(:all_caught_up?).and_return(all_caught_up)
end
context 'on secondary that has caught up to primary' do
shared_examples 'secondary that has caught up to a primary' do
it 'continues to use the secondary' do
expect(session.use_primary?).to be false
expect(subject.match?).to be true
......@@ -61,9 +61,7 @@ RSpec.describe Gitlab::Checks::MatchingMergeRequest do
end
end
context 'on secondary behind primary' do
let(:all_caught_up) { false }
shared_examples 'secondary that is lagging primary' do
it 'sticks to the primary' do
expect(subject.match?).to be true
expect(session.use_primary?).to be true
......@@ -75,6 +73,41 @@ RSpec.describe Gitlab::Checks::MatchingMergeRequest do
.and change { stale_counter.get }.by(1)
end
end
context 'with load_balancing_atomic_replica feature flag enabled' do
before do
stub_feature_flags(load_balancing_atomic_replica: true)
expect(::Gitlab::Database::LoadBalancing::Sticking).to receive(:select_valid_host).with(:project, project.id).and_call_original
allow(::Gitlab::Database::LoadBalancing::Sticking).to receive(:select_caught_up_replicas).with(:project, project.id).and_return(all_caught_up)
end
it_behaves_like 'secondary that has caught up to a primary'
context 'on secondary behind primary' do
let(:all_caught_up) { false }
it_behaves_like 'secondary that is lagging primary'
end
end
context 'with load_balancing_atomic_replica feature flag disabled' do
before do
stub_feature_flags(load_balancing_atomic_replica: false)
expect(::Gitlab::Database::LoadBalancing::Sticking).not_to receive(:select_valid_host)
expect(::Gitlab::Database::LoadBalancing::Sticking).to receive(:unstick_or_continue_sticking).and_call_original
allow(::Gitlab::Database::LoadBalancing::Sticking).to receive(:all_caught_up?).and_return(all_caught_up)
end
it_behaves_like 'secondary that has caught up to a primary'
context 'on secondary behind primary' do
let(:all_caught_up) { false }
it_behaves_like 'secondary that is lagging primary'
end
end
end
end
end
......@@ -14,9 +14,10 @@ RSpec.describe Gitlab::Database::LoadBalancing::HostList do
end
let(:load_balancer) { double(:load_balancer) }
let(:host_count) { 2 }
let(:host_list) do
hosts = Array.new(2) do
hosts = Array.new(host_count) do
Gitlab::Database::LoadBalancing::Host.new('localhost', load_balancer, port: 5432)
end
......@@ -111,6 +112,15 @@ RSpec.describe Gitlab::Database::LoadBalancing::HostList do
end
end
describe '#hosts' do
it 'returns a copy of the host' do
first = host_list.hosts
expect(host_list.hosts).to eq(first)
expect(host_list.hosts.object_id).not_to eq(first.object_id)
end
end
describe '#hosts=' do
it 'updates the list of hosts to use' do
host_list.hosts = [
......@@ -160,4 +170,19 @@ RSpec.describe Gitlab::Database::LoadBalancing::HostList do
expect(described_class.new.next).to be_nil
end
end
describe '#shuffle' do
let(:host_count) { 3 }
it 'randomizes the list' do
2.times do
all_hosts = host_list.hosts
host_list.shuffle
expect(host_list.length).to eq(host_count)
expect(host_list.hosts).to contain_exactly(*all_hosts)
end
end
end
end
......@@ -262,6 +262,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::LoadBalancer, :request_store do
it 'stores the host in a thread-local variable' do
RequestStore.delete(described_class::CACHE_KEY)
RequestStore.delete(described_class::VALID_HOSTS_CACHE_KEY)
expect(lb.host_list).to receive(:next).once.and_call_original
......@@ -279,6 +280,7 @@ RSpec.describe Gitlab::Database::LoadBalancing::LoadBalancer, :request_store do
lb.release_host
expect(RequestStore[described_class::CACHE_KEY]).to be_nil
expect(RequestStore[described_class::VALID_HOSTS_CACHE_KEY]).to be_nil
end
end
......@@ -432,4 +434,58 @@ RSpec.describe Gitlab::Database::LoadBalancing::LoadBalancer, :request_store do
expect(lb.serialization_failure?(wrapped)).to eq(true)
end
end
describe '#select_caught_up_hosts' do
let(:location) { 'AB/12345'}
let(:hosts) { lb.host_list.hosts }
let(:valid_host_list) { RequestStore[described_class::VALID_HOSTS_CACHE_KEY] }
let(:valid_hosts) { valid_host_list.hosts }
subject { lb.select_caught_up_hosts(location) }
context 'when all replicas are caught up' do
before do
expect(hosts).to all(receive(:caught_up?).with(location).and_return(true))
end
it 'returns true and sets all hosts to valid' do
expect(subject).to be true
expect(valid_host_list).to be_a(Gitlab::Database::LoadBalancing::HostList)
expect(valid_hosts).to contain_exactly(*hosts)
end
end
context 'when none of the replicas are caught up' do
before do
expect(hosts).to all(receive(:caught_up?).with(location).and_return(false))
end
it 'returns true and has does not set the valid hosts' do
expect(subject).to be false
expect(valid_host_list).to be_nil
end
end
context 'when one of the replicas is caught up' do
before do
expect(hosts[0]).to receive(:caught_up?).with(location).and_return(false)
expect(hosts[1]).to receive(:caught_up?).with(location).and_return(true)
end
it 'returns true and sets one host to valid' do
expect(subject).to be true
expect(valid_host_list).to be_a(Gitlab::Database::LoadBalancing::HostList)
expect(valid_hosts).to contain_exactly(hosts[1])
end
it 'host always returns the caught-up replica' do
subject
3.times do
expect(lb.host).to eq(hosts[1])
RequestStore.delete(described_class::CACHE_KEY)
end
end
end
end
end
......@@ -271,4 +271,37 @@ RSpec.describe Gitlab::Database::LoadBalancing::Sticking, :redis do
described_class.load_balancer
end
end
describe '.select_caught_up_replicas' do
let(:lb) { double(:lb) }
before do
allow(described_class).to receive(:load_balancer).and_return(lb)
end
context 'with no write location' do
before do
allow(described_class).to receive(:last_write_location_for)
.with(:project, 42).and_return(nil)
end
it 'returns false and does not try to find caught up hosts' do
expect(described_class).not_to receive(:select_caught_up_hosts)
expect(described_class.select_caught_up_replicas(:project, 42)).to be false
end
end
context 'with write location' do
before do
allow(described_class).to receive(:last_write_location_for)
.with(:project, 42).and_return('foo')
end
it 'returns true, selects hosts, and unsticks if any secondary has caught up' do
expect(lb).to receive(:select_caught_up_hosts).and_return(true)
expect(described_class).to receive(:unstick).with(:project, 42)
expect(described_class.select_caught_up_replicas(:project, 42)).to be true
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment