Commit 9bba9b25 authored by nmilojevic1's avatar nmilojevic1

Address followup items

- Refactor deduplication strategies
- Make .databases to be hash with indifferent access
- Make deduplicate_job more clear
parent dab58c88
...@@ -61,6 +61,7 @@ module Gitlab ...@@ -61,6 +61,7 @@ module Gitlab
def self.databases def self.databases
@databases ||= database_base_models @databases ||= database_base_models
.transform_values { |connection_class| Connection.new(connection_class) } .transform_values { |connection_class| Connection.new(connection_class) }
.with_indifferent_access
.freeze .freeze
end end
......
...@@ -64,9 +64,9 @@ module Gitlab ...@@ -64,9 +64,9 @@ module Gitlab
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do |multi| redis.multi do |multi|
redis.set(idempotency_key, jid, ex: expiry, nx: true) multi.set(idempotency_key, jid, ex: expiry, nx: true)
read_wal_locations = check_existing_wal_locations!(redis, expiry) read_wal_locations = check_existing_wal_locations!(multi, expiry)
read_jid = redis.get(idempotency_key) read_jid = multi.get(idempotency_key)
end end
end end
...@@ -81,9 +81,9 @@ module Gitlab ...@@ -81,9 +81,9 @@ module Gitlab
return unless job_wal_locations.present? return unless job_wal_locations.present?
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do redis.multi do |multi|
job_wal_locations.each do |connection_name, location| job_wal_locations.each do |connection_name, location|
redis.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]) multi.eval(LUA_SET_WAL_SCRIPT, keys: [wal_location_key(connection_name)], argv: [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL])
end end
end end
end end
...@@ -96,9 +96,9 @@ module Gitlab ...@@ -96,9 +96,9 @@ module Gitlab
read_wal_locations = {} read_wal_locations = {}
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do redis.multi do |multi|
job_wal_locations.keys.each do |connection_name| job_wal_locations.keys.each do |connection_name|
read_wal_locations[connection_name] = redis.lindex(wal_location_key(connection_name), 0) read_wal_locations[connection_name] = multi.lindex(wal_location_key(connection_name), 0)
end end
end end
end end
...@@ -110,8 +110,8 @@ module Gitlab ...@@ -110,8 +110,8 @@ module Gitlab
def delete! def delete!
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.multi do |multi| redis.multi do |multi|
redis.del(idempotency_key) multi.del(idempotency_key)
delete_wal_locations!(redis) delete_wal_locations!(multi)
end end
end end
end end
...@@ -147,7 +147,7 @@ module Gitlab ...@@ -147,7 +147,7 @@ module Gitlab
private private
attr_accessor :existing_wal_locations attr_writer :existing_wal_locations
attr_reader :queue_name, :job attr_reader :queue_name, :job
attr_writer :existing_jid attr_writer :existing_jid
...@@ -155,6 +155,31 @@ module Gitlab ...@@ -155,6 +155,31 @@ module Gitlab
@worker_klass ||= worker_class_name.to_s.safe_constantize @worker_klass ||= worker_class_name.to_s.safe_constantize
end end
def delete_wal_locations!(redis)
job_wal_locations.keys.each do |connection_name|
redis.del(wal_location_key(connection_name))
redis.del(existing_wal_location_key(connection_name))
end
end
def check_existing_wal_locations!(redis, expiry)
read_wal_locations = {}
job_wal_locations.each do |connection_name, location|
key = existing_wal_location_key(connection_name)
redis.set(key, location, ex: expiry, nx: true)
read_wal_locations[connection_name] = redis.get(key)
end
read_wal_locations
end
def job_wal_locations
return {} unless preserve_wal_location?
job['wal_locations'] || {}
end
def pg_wal_lsn_diff(connection_name) def pg_wal_lsn_diff(connection_name)
Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name]) Gitlab::Database.databases[connection_name].pg_wal_lsn_diff(job_wal_locations[connection_name], existing_wal_locations[connection_name])
end end
...@@ -179,12 +204,6 @@ module Gitlab ...@@ -179,12 +204,6 @@ module Gitlab
job['jid'] job['jid']
end end
def job_wal_locations
return {} unless preserve_wal_location?
job['wal_locations'] || {}
end
def existing_wal_location_key(connection_name) def existing_wal_location_key(connection_name)
"#{idempotency_key}:#{connection_name}:existing_wal_location" "#{idempotency_key}:#{connection_name}:existing_wal_location"
end end
...@@ -209,23 +228,8 @@ module Gitlab ...@@ -209,23 +228,8 @@ module Gitlab
"#{worker_class_name}:#{Sidekiq.dump_json(arguments)}" "#{worker_class_name}:#{Sidekiq.dump_json(arguments)}"
end end
def delete_wal_locations!(redis) def existing_wal_locations
job_wal_locations.keys.each do |connection_name| @existing_wal_locations ||= {}
redis.del(wal_location_key(connection_name))
redis.del(existing_wal_location_key(connection_name))
end
end
def check_existing_wal_locations!(redis, expiry)
read_wal_locations = {}
job_wal_locations.each do |connection_name, location|
key = existing_wal_location_key(connection_name)
redis.set(key, location, ex: expiry, nx: true)
read_wal_locations[connection_name] = redis.get(key)
end
read_wal_locations
end end
def preserve_wal_location? def preserve_wal_location?
......
...@@ -4,11 +4,15 @@ module Gitlab ...@@ -4,11 +4,15 @@ module Gitlab
module SidekiqMiddleware module SidekiqMiddleware
module DuplicateJobs module DuplicateJobs
module Strategies module Strategies
module DeduplicatesWhenScheduling class DeduplicatesWhenScheduling < Base
extend ::Gitlab::Utils::Override
override :initialize
def initialize(duplicate_job) def initialize(duplicate_job)
@duplicate_job = duplicate_job @duplicate_job = duplicate_job
end end
override :schedule
def schedule(job) def schedule(job)
if deduplicatable_job? && check! && duplicate_job.duplicate? if deduplicatable_job? && check! && duplicate_job.duplicate?
job['duplicate-of'] = duplicate_job.existing_jid job['duplicate-of'] = duplicate_job.existing_jid
...@@ -25,6 +29,7 @@ module Gitlab ...@@ -25,6 +29,7 @@ module Gitlab
yield yield
end end
override :perform
def perform(job) def perform(job)
update_job_wal_location!(job) update_job_wal_location!(job)
end end
......
...@@ -7,11 +7,7 @@ module Gitlab ...@@ -7,11 +7,7 @@ module Gitlab
# This strategy takes a lock before scheduling the job in a queue and # This strategy takes a lock before scheduling the job in a queue and
# removes the lock after the job has executed preventing a new job to be queued # removes the lock after the job has executed preventing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuted < Base class UntilExecuted < DeduplicatesWhenScheduling
extend ::Gitlab::Utils::Override
include DeduplicatesWhenScheduling
override :perform override :perform
def perform(job) def perform(job)
super super
......
...@@ -7,11 +7,7 @@ module Gitlab ...@@ -7,11 +7,7 @@ module Gitlab
# This strategy takes a lock before scheduling the job in a queue and # This strategy takes a lock before scheduling the job in a queue and
# removes the lock before the job starts allowing a new job to be queued # removes the lock before the job starts allowing a new job to be queued
# while a job is still executing. # while a job is still executing.
class UntilExecuting < Base class UntilExecuting < DeduplicatesWhenScheduling
extend ::Gitlab::Utils::Override
include DeduplicatesWhenScheduling
override :perform override :perform
def perform(job) def perform(job)
super super
......
...@@ -15,6 +15,13 @@ RSpec.describe Gitlab::Database do ...@@ -15,6 +15,13 @@ RSpec.describe Gitlab::Database do
end end
end end
describe '.databases' do
it 'stores connections as a HashWithIndifferentAccess' do
expect(described_class.databases.has_key?('main')).to be true
expect(described_class.databases.has_key?(:main)).to be true
end
end
describe '.default_pool_size' do describe '.default_pool_size' do
before do before do
allow(Gitlab::Runtime).to receive(:max_threads).and_return(7) allow(Gitlab::Runtime).to receive(:max_threads).and_return(7)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment