Commit 3487b4db authored by Sean McGivern

Merge branch 'replication-slot-multiple-databases' into 'master'

Refactor Postgresql::ReplicationSlot to support multiple databases

See merge request gitlab-org/gitlab!66471
parents 894ce317 e51d78bf
...@@ -39,5 +39,55 @@ module Postgresql ...@@ -39,5 +39,55 @@ module Postgresql
false false
end end
end end
# Returns the number of rows in the `pg_replication_slots` view as an Integer.
def self.count
  rows = connection.execute("SELECT COUNT(*) FROM pg_replication_slots;")
  total = rows.first.fetch('count')

  total.to_i
end
# Returns, as an Integer, how many rows of `pg_replication_slots` have
# `active` set to false.
def self.unused_slots_count
  rows = connection.execute("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")
  inactive = rows.first.fetch('count')

  inactive.to_i
end
# Returns, as an Integer, how many rows of `pg_replication_slots` have
# `active` set to true.
def self.used_slots_count
  rows = connection.execute("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 't';")
  in_use = rows.first.fetch('count')

  in_use.to_i
end
# Lists every replication slot together with the WAL it is holding back.
#
# Each row carries `slot_name`, `database`, `active` and `retained_bytes`,
# where `retained_bytes` is the difference between the current WAL insert
# LSN and the slot's `restart_lsn`. The query result is converted with `to_a`.
#
# References:
# https://www.skillslogic.com/blog/databases/checking-postgres-replication-lag
# http://bdr-project.org/docs/stable/monitoring-peers.html
def self.slots_retained_bytes
  query = <<-SQL.squish
    SELECT slot_name, database,
    active, pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)
    AS retained_bytes
    FROM pg_replication_slots;
  SQL

  connection.execute(query).to_a
end
# Returns, as an Integer, the largest amount of WAL (in bytes) retained by any
# replication slot. The SQL COALESCE makes the query yield 0 when MAX is NULL.
def self.max_retained_wal
  query = <<-SQL.squish
    SELECT COALESCE(MAX(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)), 0)
    FROM pg_replication_slots;
  SQL
  row = connection.execute(query).first

  row.fetch('coalesce').to_i
end
# Reads the `max_replication_slots` entry from `pg_settings` and returns it as
# an Integer. When the query returns no row the result is 0 (nil.to_i).
def self.max_replication_slots
  query = <<-SQL.squish
    SELECT setting FROM pg_settings WHERE name = 'max_replication_slots';
  SQL
  setting_row = connection.execute(query).first

  setting_row&.fetch('setting').to_i
end
end end
end end
...@@ -299,19 +299,19 @@ class GeoNode < ApplicationRecord ...@@ -299,19 +299,19 @@ class GeoNode < ApplicationRecord
def replication_slots_count def replication_slots_count
return unless primary? return unless primary?
PgReplicationSlot.count Postgresql::ReplicationSlot.count
end end
def replication_slots_used_count def replication_slots_used_count
return unless primary? return unless primary?
PgReplicationSlot.used_slots_count Postgresql::ReplicationSlot.used_slots_count
end end
def replication_slots_max_retained_wal_bytes def replication_slots_max_retained_wal_bytes
return unless primary? return unless primary?
PgReplicationSlot.max_retained_wal Postgresql::ReplicationSlot.max_retained_wal
end end
def find_or_build_status def find_or_build_status
......
# frozen_string_literal: true
# Query helpers for `pg_replication_slots`, which is a PostgreSQL view, plus
# the `max_replication_slots` entry in `pg_settings`. Every method sends its
# SQL through `ApplicationRecord.connection`.
class PgReplicationSlot
  class << self
    # Number of rows in `pg_replication_slots`, as an Integer.
    def count
      fetch_count("SELECT COUNT(*) FROM pg_replication_slots;")
    end

    # Number of slots whose `active` column is false, as an Integer.
    def unused_slots_count
      fetch_count("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")
    end

    # Number of slots whose `active` column is true, as an Integer.
    def used_slots_count
      fetch_count("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 't';")
    end

    # Every slot with its `slot_name`, `database`, `active` flag and
    # `retained_bytes` (current WAL insert LSN minus the slot's `restart_lsn`),
    # converted with `to_a`.
    #
    # https://www.skillslogic.com/blog/databases/checking-postgres-replication-lag
    # http://bdr-project.org/docs/stable/monitoring-peers.html
    def slots_retained_bytes
      sql = <<-SQL.squish
        SELECT slot_name, database,
        active, pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)
        AS retained_bytes
        FROM pg_replication_slots;
      SQL

      run(sql).to_a
    end

    # Largest amount of WAL (in bytes) retained by any replication slot, as an
    # Integer. The SQL COALESCE makes the query yield 0 when MAX is NULL.
    def max_retained_wal
      sql = <<-SQL.squish
        SELECT COALESCE(MAX(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)), 0)
        FROM pg_replication_slots;
      SQL

      run(sql).first.fetch('coalesce').to_i
    end

    # Value of the `max_replication_slots` setting, as an Integer; 0 when the
    # query returns no row (nil.to_i).
    def max_replication_slots
      sql = <<-SQL.squish
        SELECT setting FROM pg_settings WHERE name = 'max_replication_slots';
      SQL

      run(sql).first&.fetch('setting').to_i
    end

    private

    # Executes +sql+ on the ApplicationRecord connection and returns the raw result.
    def run(sql)
      ApplicationRecord.connection.execute(sql)
    end

    # Executes a COUNT(*) query and returns its `count` column as an Integer.
    def fetch_count(sql)
      run(sql).first.fetch('count').to_i
    end
  end
end
...@@ -17,7 +17,7 @@ module Geo ...@@ -17,7 +17,7 @@ module Geo
def perform def perform
return if Gitlab::Database.read_only? return if Gitlab::Database.read_only?
return unless Gitlab::Database.main.healthy? return if Postgresql::ReplicationSlot.lag_too_great?
unless ::GeoNode.secondary_nodes.any? unless ::GeoNode.secondary_nodes.any?
Geo::PruneEventLogService.new(:all).execute Geo::PruneEventLogService.new(:all).execute
......
...@@ -6,10 +6,6 @@ module EE ...@@ -6,10 +6,6 @@ module EE
module Connection module Connection
extend ActiveSupport::Concern extend ActiveSupport::Concern
def healthy?
!Postgresql::ReplicationSlot.lag_too_great?
end
def geo_uncached_queries(&block) def geo_uncached_queries(&block)
raise 'No block given' unless block_given? raise 'No block given' unless block_given?
......
...@@ -7,20 +7,6 @@ RSpec.describe Gitlab::Database::Connection do ...@@ -7,20 +7,6 @@ RSpec.describe Gitlab::Database::Connection do
let(:connection) { described_class.new } let(:connection) { described_class.new }
describe '#healthy?' do
it 'returns true when replication lag is not too great' do
allow(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(false)
expect(connection.healthy?).to be_truthy
end
it 'returns false when replication lag is too great' do
allow(Postgresql::ReplicationSlot).to receive(:lag_too_great?).and_return(true)
expect(connection.healthy?).to be_falsey
end
end
describe '#geo_uncached_queries' do describe '#geo_uncached_queries' do
context 'when no block is given' do context 'when no block is given' do
it 'raises error' do it 'raises error' do
......
# frozen_string_literal: true
require 'spec_helper'
# Exercises PgReplicationSlot against the database the specs are connected to.
# Most examples create a physical replication slot named 'test_slot' and are
# skipped when no slot can be added.
RSpec.describe PgReplicationSlot do
# Only checks that the value is non-negative, not a specific number.
it '#max_replication_slots' do
expect(described_class.max_replication_slots).to be >= 0
end
# Computed in the example-group body (outside any hook), so both queries run
# when this group is defined. True when the existing slot count has already
# reached the configured maximum.
skip_examples = described_class.max_replication_slots <= described_class.count
context 'with enough slots available' do
before(:all) do
skip('max_replication_slots too small') if skip_examples
# Record the counts before the test slot is created so the examples can
# assert on "baseline + 1".
@current_slot_count =
ActiveRecord::Base.connection.execute("SELECT COUNT(*) FROM pg_replication_slots;")
.first.fetch('count').to_i
@current_unused_count =
ActiveRecord::Base.connection.execute("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")
.first.fetch('count').to_i
ActiveRecord::Base.connection.execute("SELECT * FROM pg_create_physical_replication_slot('test_slot');")
end
after(:all) do
# The slot was only created when the group was not skipped, so only drop it then.
unless skip_examples
ActiveRecord::Base.connection.execute("SELECT pg_drop_replication_slot('test_slot');")
end
end
it '#slots_count' do
expect(described_class.count).to eq(@current_slot_count + 1)
end
# The new slot is expected to be counted as unused (active = 'f').
it '#unused_slots_count' do
expect(described_class.unused_slots_count).to eq(@current_unused_count + 1)
end
it '#max_retained_wal' do
expect(described_class.max_retained_wal).not_to be_nil
end
it '#slots_retained_bytes' do
slot = described_class.slots_retained_bytes.find {|x| x['slot_name'] == 'test_slot' }
expect(slot).not_to be_nil
# NOTE(review): presumably NULL because a freshly created, never-used slot
# has no restart_lsn yet — confirm against the PostgreSQL documentation.
expect(slot['retained_bytes']).to be_nil
end
end
end
...@@ -29,8 +29,11 @@ RSpec.describe Geo::PruneEventLogWorker, :geo do ...@@ -29,8 +29,11 @@ RSpec.describe Geo::PruneEventLogWorker, :geo do
end end
it 'does nothing when database is not feeling healthy' do it 'does nothing when database is not feeling healthy' do
allow(Gitlab::Database.main).to receive(:healthy?).and_return(false) allow(Postgresql::ReplicationSlot)
.to receive(:lag_too_great?)
.and_return(true)
expect(GeoNode).not_to receive(:secondary_nodes)
expect(Geo::PruneEventLogService).not_to receive(:new) expect(Geo::PruneEventLogService).not_to receive(:new)
worker.perform worker.perform
......
...@@ -60,4 +60,71 @@ RSpec.describe Postgresql::ReplicationSlot do ...@@ -60,4 +60,71 @@ RSpec.describe Postgresql::ReplicationSlot do
expect(described_class.lag_too_great?).to eq(false) expect(described_class.lag_too_great?).to eq(false)
end end
end end
# Only checks that the reported maximum is non-negative, not a specific number.
describe '#max_replication_slots' do
it 'returns the maximum number of replication slots' do
expect(described_class.max_replication_slots).to be >= 0
end
end
# These examples create a physical replication slot named 'test_slot' and are
# skipped when the existing slot count has already reached the configured maximum.
context 'with enough slots available' do
# Computed in the context body (outside any hook), so both queries run when
# this group is defined.
skip_examples = described_class.max_replication_slots <= described_class.count
before(:all) do
skip('max_replication_slots too small') if skip_examples
# Record the counts before the test slot is created so the examples can
# assert on "baseline + 1".
@current_slot_count = ApplicationRecord
.connection
.execute("SELECT COUNT(*) FROM pg_replication_slots;")
.first
.fetch('count')
.to_i
@current_unused_count = ApplicationRecord
.connection
.execute("SELECT COUNT(*) FROM pg_replication_slots WHERE active = 'f';")
.first
.fetch('count')
.to_i
ApplicationRecord
.connection
.execute("SELECT * FROM pg_create_physical_replication_slot('test_slot');")
end
after(:all) do
# The slot was only created when the group was not skipped, so only drop it then.
unless skip_examples
ApplicationRecord
.connection
.execute("SELECT pg_drop_replication_slot('test_slot');")
end
end
describe '#slots_count' do
it 'returns the number of replication slots' do
expect(described_class.count).to eq(@current_slot_count + 1)
end
end
# The new slot is expected to be counted as unused (active = 'f').
describe '#unused_slots_count' do
it 'returns the number of unused replication slots' do
expect(described_class.unused_slots_count).to eq(@current_unused_count + 1)
end
end
describe '#max_retained_wal' do
it 'returns the retained WAL size' do
expect(described_class.max_retained_wal).not_to be_nil
end
end
describe '#slots_retained_bytes' do
it 'returns the number of retained bytes' do
slot = described_class.slots_retained_bytes.find {|x| x['slot_name'] == 'test_slot' }
expect(slot).not_to be_nil
# NOTE(review): presumably NULL because a freshly created, never-used slot
# has no restart_lsn yet — confirm against the PostgreSQL documentation.
expect(slot['retained_bytes']).to be_nil
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment