Require a lease to perform repositories cleanup on secondaries nodes

parent 34437208
...@@ -4,16 +4,20 @@ module Geo ...@@ -4,16 +4,20 @@ module Geo
include GeoQueue include GeoQueue
BATCH_SIZE = 250 BATCH_SIZE = 250
LEASE_TIMEOUT = 60.minutes
def perform(geo_node_id) def perform(geo_node_id)
geo_node = GeoNode.find(geo_node_id) # Prevent multiple Sidekiq workers from performing repositories clean up
try_obtain_lease do
geo_node = GeoNode.find(geo_node_id)
restricted_project_ids = geo_node.project_ids restricted_project_ids = geo_node.project_ids
return unless restricted_project_ids return unless restricted_project_ids
Project.where.not(id: restricted_project_ids).find_in_batches(batch_size: BATCH_SIZE) do |batch| Project.where.not(id: restricted_project_ids).find_in_batches(batch_size: BATCH_SIZE) do |batch|
batch.each do |project| batch.each do |project|
clean_up_repositories(project) clean_up_repositories(project)
end
end end
end end
rescue ActiveRecord::RecordNotFound => e rescue ActiveRecord::RecordNotFound => e
...@@ -32,6 +36,33 @@ module Geo ...@@ -32,6 +36,33 @@ module Geo
end end
end end
def try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
log_error('Cannot obtain an exclusive lease. There must be another worker already in execution.')
return
end
begin
yield lease
ensure
release_lease(lease)
end
end
def lease_key
@lease_key ||= self.class.name.underscore
end
def exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def log_info(message, params = {}) def log_info(message, params = {})
Gitlab::Geo::Logger.info({ class: self.class.name, message: message }.merge(params)) Gitlab::Geo::Logger.info({ class: self.class.name, message: message }.merge(params))
end end
......
...@@ -7,6 +7,10 @@ describe Geo::RepositoriesCleanUpWorker do ...@@ -7,6 +7,10 @@ describe Geo::RepositoriesCleanUpWorker do
let!(:project_2) { create(:empty_project) } let!(:project_2) { create(:empty_project) }
describe '#perform' do describe '#perform' do
before do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { true }
end
context 'when node has namespace restrictions' do context 'when node has namespace restrictions' do
it 'performs GeoRepositoryDestroyWorker for each project that do not belong to selected namespaces to replicate' do it 'performs GeoRepositoryDestroyWorker for each project that do not belong to selected namespaces to replicate' do
geo_node.update_attribute(:namespaces, [group]) geo_node.update_attribute(:namespaces, [group])
...@@ -27,6 +31,16 @@ describe Geo::RepositoriesCleanUpWorker do ...@@ -27,6 +31,16 @@ describe Geo::RepositoriesCleanUpWorker do
end end
end end
context 'when cannnot obtain a lease' do
it 'does not perform GeoRepositoryDestroyWorker' do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { false }
expect(GeoRepositoryDestroyWorker).not_to receive(:perform_async)
subject.perform(geo_node.id)
end
end
context 'when Geo node could not be found' do context 'when Geo node could not be found' do
it 'does not raise an error' do it 'does not raise an error' do
expect { subject.perform(-1) }.not_to raise_error expect { subject.perform(-1) }.not_to raise_error
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment