Commit 9f449d21 authored by Stan Hu's avatar Stan Hu

Add a GitLab Geo download scheduler for LFS files

The download scheduler works as follows:

1. Load a batch of IDs that we need to download from the primary (DB_RETRIEVE_BATCH) into a pending list.
2. Schedule them so that at most MAX_CONCURRENT_DOWNLOADS are running at once.
3. When a slot frees, schedule another download.
4. When we have drained the pending list, load another batch into memory, and schedule the remaining files, excluding ones in progress.
5. Quit when we have scheduled all downloads or exceeded an hour (a condensed sketch of this loop follows below).
parent 4f758caa
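The numbered steps above map onto the loop in GeoFileDownloadDispatchWorker further down in this diff. As a rough orientation, here is a condensed, standalone sketch of the same scheduling loop. It is not the worker code: the lambdas (next_batch, still_running, enqueue_download) are illustrative stand-ins for the real DB query, the Gitlab::SidekiqStatus check and the Sidekiq enqueue, and the numbers are made up.

# Condensed sketch only -- not the worker itself. Helpers below are fakes.
DB_RETRIEVE_BATCH        = 5
MAX_CONCURRENT_DOWNLOADS = 2
RUN_TIME                 = 60 * 60 # seconds

all_ids   = (1..6).to_a # pretend these are LfsObject IDs missing on the secondary
done_ids  = []
pending   = []          # step 1: IDs loaded from the DB, waiting to be scheduled
scheduled = []          # hashes of { job_id: ..., id: ... } currently in flight
started   = Time.now

next_batch       = ->(exclude) { (all_ids - done_ids - exclude).first(DB_RETRIEVE_BATCH) }
still_running    = ->(_job_id) { false }                    # pretend every job finishes immediately
enqueue_download = ->(id) { done_ids << id; "job-#{id}" }   # pretend Sidekiq job ID

loop do
  # Step 3: a slot frees when a job is reported as finished.
  scheduled.select! { |job| still_running.call(job[:job_id]) }

  # Steps 1 and 4: refill the pending list when it runs low, excluding in-flight IDs.
  pending = next_batch.call(scheduled.map { |j| j[:id] }) if pending.size < MAX_CONCURRENT_DOWNLOADS
  last_batch = pending.size < MAX_CONCURRENT_DOWNLOADS

  # Step 5: stop after an hour or once there is nothing left to schedule.
  break if Time.now - started >= RUN_TIME || pending.empty?

  # Step 2: never exceed MAX_CONCURRENT_DOWNLOADS jobs in flight.
  (MAX_CONCURRENT_DOWNLOADS - scheduled.size).times do
    id = pending.shift or break
    scheduled << { job_id: enqueue_download.call(id), id: id }
  end

  break if last_batch
end

done_ids # => [1, 2, 3, 4, 5, 6]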
......@@ -350,6 +350,7 @@ Style/MutableConstant:
Exclude:
- 'db/migrate/**/*'
- 'db/post_migrate/**/*'
- 'db/geo/migrate/**/*'
# Favor unless over if for negative conditions (or control flow or).
Style/NegatedIf:
......
......@@ -20,7 +20,7 @@ module EE
def toggle_node_button(node)
btn_class, title, data =
if node.enabled?
['warning', 'Disable node', { confirm: 'Disabling a node stops the repositories backfilling process. Are you sure?' }]
['warning', 'Disable node', { confirm: 'Disabling a node stops the sync process. Are you sure?' }]
else
['success', 'Enable node']
end
......
class Geo::FileRegistry < Geo::BaseRegistry
end
......@@ -28,9 +28,7 @@ class GeoNodeStatus
end
def repositories_synced_in_percentage
return 0 if repositories_count.zero?
(repositories_synced_count.to_f / repositories_count.to_f) * 100.0
sync_percentage(repositories_count, repositories_synced_count)
end
def repositories_failed_count
......@@ -40,4 +38,32 @@ class GeoNodeStatus
def repositories_failed_count=(value)
@repositories_failed_count = value.to_i
end
def lfs_objects_total
@lfs_objects_total ||= LfsObject.count
end
def lfs_objects_total=(value)
@lfs_objects_total = value.to_i
end
def lfs_objects_synced
@lfs_objects_synced ||= Geo::FileRegistry.where(file_type: :lfs).count
end
def lfs_objects_synced=(value)
@lfs_objects_synced = value.to_i
end
def lfs_objects_synced_in_percentage
sync_percentage(lfs_objects_total, lfs_objects_synced)
end
private
def sync_percentage(total, synced)
return 0 if total.zero?
(synced.to_f / total.to_f) * 100.0
end
end
......@@ -3,6 +3,7 @@ class GeoNodePresenter < Gitlab::View::Presenter::Delegated
delegate :healthy?, :health, :repositories_count, :repositories_synced_count,
:repositories_synced_in_percentage, :repositories_failed_count,
:lfs_objects_total, :lfs_objects_synced, :lfs_objects_synced_in_percentage,
to: :status
private
......
module Geo
class FileDownloadService
attr_reader :object_type, :object_db_id
LEASE_TIMEOUT = 8.hours.freeze
def initialize(object_type, object_db_id)
@object_type = object_type
@object_db_id = object_db_id
end
def execute
try_obtain_lease do |lease|
case object_type
when :lfs
download_lfs_object
else
log("unknown file type: #{object_type}")
end
end
end
private
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain
return unless uuid.present?
begin
yield
ensure
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
end
def download_lfs_object
lfs_object = LfsObject.find_by_id(object_db_id)
return unless lfs_object.present?
transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object)
bytes_downloaded = transfer.download_from_primary
success = bytes_downloaded && bytes_downloaded >= 0
update_registry(bytes_downloaded) if success
success
end
def log(message)
Rails.logger.info "#{self.class.name}: #{message}"
end
def update_registry(bytes_downloaded)
transfer = Geo::FileRegistry.find_or_initialize_by(
file_type: object_type,
file_id: object_db_id)
transfer.bytes = bytes_downloaded
transfer.save
end
def lease_key
"file_download_service:#{object_type}:#{object_db_id}"
end
end
end
......@@ -3,7 +3,7 @@ module Geo
include Gitlab::CurrentSettings
include HTTParty
KEYS = %w(health repositories_count repositories_synced_count repositories_failed_count).freeze
KEYS = %w(health repositories_count repositories_synced_count repositories_failed_count lfs_objects_total lfs_objects_synced).freeze
# HTTParty timeout
default_timeout current_application_settings.geo_status_timeout
......
......@@ -30,6 +30,9 @@
%p
%span.help-block
Repositories failed: #{node.repositories_failed_count}
%p
%span.help-block
LFS objects synced: #{node.lfs_objects_synced}/#{node.lfs_objects_total} (#{number_to_percentage(node.lfs_objects_synced_in_percentage, precision: 2)})
%p
%span.help-block= node.healthy? ? 'No Health Problems Detected' : node.health
......
......@@ -16,7 +16,7 @@ class GeoBackfillWorker
project_ids.each do |project_id|
begin
break if over_time?(start_time)
break unless node_enabled?
break unless Gitlab::Geo.current_node_enabled?
project = Project.find(project_id)
next if synced?(project)
......@@ -79,11 +79,4 @@ class GeoBackfillWorker
def lease_timeout
Geo::RepositoryBackfillService::LEASE_TIMEOUT
end
def node_enabled?
# No caching of the enabled! If we cache it and an admin disables
# this node, an active GeoBackfillWorker would keep going for up
# to max run time after the node was disabled.
Gitlab::Geo.current_node.reload.enabled?
end
end
class GeoFileDownloadDispatchWorker
include Sidekiq::Worker
include CronjobQueue
LEASE_KEY = 'geo_file_download_dispatch_worker'.freeze
LEASE_TIMEOUT = 8.hours.freeze
RUN_TIME = 60.minutes.to_i.freeze
DB_RETRIEVE_BATCH = 1000.freeze
MAX_CONCURRENT_DOWNLOADS = 10.freeze
def initialize
@pending_lfs_downloads = []
@scheduled_lfs_jobs = []
end
# The scheduling works as the following:
#
# 1. Load a batch of IDs that we need to download from the primary (DB_RETRIEVE_BATCH) into a pending list.
# 2. Schedule them so that at most MAX_CONCURRENT_DOWNLOADS are running at once.
# 3. When a slot frees, schedule another download.
# 4. When we have drained the pending list, load another batch into memory, and schedule the remaining
# files, excluding ones in progress.
# 5. Quit when we have scheduled all downloads or exceeded an hour.
def perform
return unless Gitlab::Geo.secondary?
@start_time = Time.now
# Prevent multiple Sidekiq workers from attempting to schedule downloads
try_obtain_lease do
loop do
break unless node_enabled?
update_jobs_in_progress
load_pending_downloads if reload_queue?
# If the refill from the DB still leaves us below MAX_CONCURRENT_DOWNLOADS,
# the DB has nothing more to give: we can end after scheduling the remaining transfers.
last_batch = reload_queue?
break if over_time?
break unless downloads_remain?
schedule_lfs_downloads
break if last_batch
sleep(1)
end
end
end
private
def reload_queue?
@pending_lfs_downloads.size < MAX_CONCURRENT_DOWNLOADS
end
def over_time?
Time.now - @start_time >= RUN_TIME
end
def load_pending_downloads
@pending_lfs_downloads = find_lfs_object_ids(DB_RETRIEVE_BATCH)
end
def downloads_remain?
@pending_lfs_downloads.any?
end
def schedule_lfs_downloads
num_to_schedule = [MAX_CONCURRENT_DOWNLOADS - job_ids.size, @pending_lfs_downloads.size].min
return unless downloads_remain?
num_to_schedule.times do
lfs_id = @pending_lfs_downloads.shift
job_id = GeoFileDownloadWorker.perform_async(:lfs, lfs_id)
if job_id
@scheduled_lfs_jobs << { job_id: job_id, id: lfs_id }
end
end
end
def find_lfs_object_ids(limit)
downloaded_ids = Geo::FileRegistry.where(file_type: 'lfs').pluck(:file_id)
downloaded_ids = (downloaded_ids + scheduled_lfs_ids).uniq
LfsObject.where.not(id: downloaded_ids).order(created_at: :desc).limit(limit).pluck(:id)
end
def update_jobs_in_progress
status = Gitlab::SidekiqStatus.job_status(job_ids)
# SidekiqStatus returns an array of booleans: true if the job is still running, false otherwise.
# For each entry, first use `zip` to make { job_id: 123, id: 10 } -> [ { job_id: 123, id: 10 }, bool ]
# Next, keep only the jobs that are still running and drop the completed ones.
@scheduled_lfs_jobs = @scheduled_lfs_jobs.zip(status).map { |(job, running)| job if running }.compact
end
def job_ids
@scheduled_lfs_jobs.map { |data| data[:job_id] }
end
def scheduled_lfs_ids
@scheduled_lfs_jobs.map { |data| data[:id] }
end
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT).try_obtain
return unless uuid
yield
release_lease(uuid)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(LEASE_KEY, uuid)
end
def node_enabled?
# Only check every minute to avoid polling the DB excessively
if @last_enabled_check.nil? || (Time.now - @last_enabled_check > 1.minute)
@last_enabled_check = Time.now
@current_node_enabled = nil
end
@current_node_enabled ||= Gitlab::Geo.current_node_enabled?
end
end
class GeoFileDownloadWorker
include Sidekiq::Worker
include GeoQueue
def perform(object_type, object_id)
Geo::FileDownloadService.new(object_type.to_sym, object_id).execute
end
end
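To tie the pieces together: the dispatch worker enqueues one GeoFileDownloadWorker job per missing LFS object, and each job hands off to Geo::FileDownloadService, which takes an exclusive lease, transfers the file via Gitlab::Geo::LfsTransfer and records the result in Geo::FileRegistry. A rough sketch of that hand-off on a secondary node; the ID 42 is made up, and in practice only the dispatch worker enqueues these jobs:

# Illustrative only -- how a single LFS object flows through the new classes.
GeoFileDownloadWorker.perform_async(:lfs, 42)   # enqueued by GeoFileDownloadDispatchWorker
# Inside the Sidekiq job this boils down to:
Geo::FileDownloadService.new(:lfs, 42).execute  # downloads via Gitlab::Geo::LfsTransfer
# On success a registry row is written, which feeds GeoNodeStatus#lfs_objects_synced:
Geo::FileRegistry.where(file_type: :lfs).count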
......@@ -218,6 +218,11 @@ production: &base
geo_backfill_worker:
cron: "*/5 * * * *"
# GitLab Geo file download worker
# NOTE: This will only take effect if Geo is enabled
geo_download_dispatch_worker:
cron: "*/10 * * * *"
registry:
# enabled: true
# host: registry.example.com
......
......@@ -392,6 +392,9 @@ Settings.cron_jobs['geo_bulk_notify_worker']['job_class'] ||= 'GeoBulkNotifyWork
Settings.cron_jobs['geo_backfill_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_backfill_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_backfill_worker']['job_class'] ||= 'GeoBackfillWorker'
Settings.cron_jobs['geo_download_dispatch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_download_dispatch_worker']['cron'] ||= '*/10 * * * *'
Settings.cron_jobs['geo_download_dispatch_worker']['job_class'] ||= 'GeoFileDownloadDispatchWorker'
Settings.cron_jobs['gitlab_usage_ping_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['gitlab_usage_ping_worker']['cron'] ||= Settings.send(:cron_random_weekly_time)
Settings.cron_jobs['gitlab_usage_ping_worker']['job_class'] = 'GitlabUsagePingWorker'
......
......@@ -10,6 +10,6 @@
# end
#
ActiveSupport::Inflector.inflections do |inflect|
inflect.uncountable %w(award_emoji project_statistics project_registry)
inflect.uncountable %w(award_emoji project_statistics project_registry file_registry)
inflect.acronym 'EE'
end
......@@ -42,6 +42,9 @@ Sidekiq.configure_server do |config|
# GitLab Geo: enable backfill job only on secondary nodes
Gitlab::Geo.backfill_job.disable! unless Gitlab::Geo.secondary?
# GitLab Geo: enable file download job only on secondary nodes
Gitlab::Geo.file_download_job.disable! unless Gitlab::Geo.secondary?
Gitlab::SidekiqThrottler.execute!
config = ActiveRecord::Base.configurations[Rails.env] ||
......
class CreateFileRegistry < ActiveRecord::Migration
def change
create_table :file_registry do |t|
t.string :file_type, null: false
t.integer :file_id, null: false
t.integer :bytes
t.string :sha256
t.datetime :created_at, null: false
end
add_index :file_registry, :file_type
add_index :file_registry, [:file_type, :file_id], { unique: true }
end
end
......@@ -12,10 +12,20 @@
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20170302005747) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
create_table "file_registry", force: :cascade do |t|
t.string "file_type", null: false
t.integer "file_id", null: false
t.integer "bytes"
t.string "sha256"
t.datetime "created_at", null: false
end
add_index "file_registry", ["file_type", "file_id"], name: "index_file_registry_on_file_type_and_file_id", unique: true, using: :btree
add_index "file_registry", ["file_type"], name: "index_file_registry_on_file_type", using: :btree
create_table "project_registry", force: :cascade do |t|
t.integer "project_id", null: false
t.datetime "last_repository_synced_at"
......@@ -24,5 +34,4 @@ ActiveRecord::Schema.define(version: 20170302005747) do
end
add_index "project_registry", ["project_id"], name: "index_project_registry_on_project_id", using: :btree
end
......@@ -774,6 +774,8 @@ module API
expose :repositories_count
expose :repositories_synced_count
expose :repositories_failed_count
expose :lfs_objects_total
expose :lfs_objects_synced
end
end
end
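For context, GeoNodeStatusFetcher (see the extended KEYS constant earlier in this diff) reads these same attributes from a secondary's status API. The payload it parses would look roughly like the hash below; the values are invented for illustration:

# Hypothetical status payload for a secondary node (values are made up):
status = {
  "health"                    => "",
  "repositories_count"        => 120,
  "repositories_synced_count" => 90,
  "repositories_failed_count" => 1,
  "lfs_objects_total"         => 40,
  "lfs_objects_synced"        => 10
}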
......@@ -22,6 +22,13 @@ module Gitlab
self.cache_value(:geo_node_enabled) { GeoNode.exists? }
end
def self.current_node_enabled?
# Do not cache this value! If we cached it and an admin disabled
# this node, an active worker (such as GeoBackfillWorker) would keep
# running for up to its max run time after the node was disabled.
Gitlab::Geo.current_node.reload.enabled?
end
def self.license_allows?
::License.current && ::License.current.add_on?('GitLab_Geo')
end
......@@ -54,6 +61,10 @@ module Gitlab
Sidekiq::Cron::Job.find('geo_backfill_worker')
end
def self.file_download_job
Sidekiq::Cron::Job.find('geo_download_dispatch_worker')
end
def self.oauth_authentication
return false unless Gitlab::Geo.secondary?
......
......@@ -47,7 +47,7 @@ module Gitlab
def hmac_secret(access_key)
@hmac_secret ||= begin
geo_node = GeoNode.find_by(access_key: access_key)
geo_node = GeoNode.find_by(access_key: access_key, enabled: true)
geo_node&.secret_access_key
end
end
......
......@@ -12,6 +12,13 @@ describe Gitlab::Geo::JwtRequestDecoder do
expect(subject.decode).to eq(data)
end
it 'fails to decode when node is disabled' do
primary_node.enabled = false
primary_node.save
expect(subject.decode).to be_nil
end
it 'fails to decode with wrong key' do
data = request.headers['Authorization']
......
require 'spec_helper'
describe GeoNodeStatus, models: true do
subject { GeoNodeStatus.new }
describe '#lfs_objects_synced_in_percentage' do
it 'returns 0 when no objects are available' do
subject.lfs_objects_total = 0
subject.lfs_objects_synced = 0
expect(subject.lfs_objects_synced_in_percentage).to eq(0)
end
it 'returns the right percentage' do
subject.lfs_objects_total = 4
subject.lfs_objects_synced = 1
expect(subject.lfs_objects_synced_in_percentage).to be_within(0.0001).of(25)
end
end
end
......@@ -1972,6 +1972,16 @@ describe Repository, models: true do
end
end
describe '#after_sync' do
it 'expires repository cache' do
expect(repository).to receive(:expire_all_method_caches)
expect(repository).to receive(:expire_branch_cache)
expect(repository).to receive(:expire_content_cache)
repository.after_sync
end
end
def create_remote_branch(remote_name, branch_name, target)
rugged = repository.rugged
rugged.references.create("refs/remotes/#{remote_name}/#{branch_name}", target.id)
......
require 'spec_helper'
describe Geo::FileDownloadService, services: true do
let(:lfs_object) { create(:lfs_object) }
let(:secondary) { create(:geo_node) }
subject { Geo::FileDownloadService.new(:lfs, lfs_object.id) }
before do
create(:geo_node, :primary)
allow(described_class).to receive(:current_node) { secondary }
end
describe '#execute' do
it 'downloads an LFS object' do
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(100)
expect{ subject.execute }.to change { Geo::FileRegistry.count }.by(1)
end
end
end
require 'spec_helper'
describe GeoFileDownloadDispatchWorker do
before do
@primary = create(:geo_node, :primary, host: 'primary-geo-node')
@secondary = create(:geo_node, :current)
allow(Gitlab::Geo).to receive(:secondary?).and_return(true)
allow_any_instance_of(Gitlab::ExclusiveLease)
.to receive(:try_obtain).and_return(true)
WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
end
subject { described_class.new }
describe '#perform' do
it 'does not schedule anything when node is disabled' do
expect(GeoFileDownloadWorker).not_to receive(:perform_async)
@secondary.enabled = false
@secondary.save
subject.perform
end
it 'executes GeoFileDownloadWorker for each LFS object' do
create_list(:lfs_object, 2, :with_file)
allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
expect(GeoFileDownloadWorker).to receive(:perform_async).twice.and_call_original
subject.perform
end
# Test the case where we have:
#
# 1. A total of 6 files in the queue, and we can load a maximum of 5 and send 2 at a time.
# 2. We send 2, wait for 1 to finish, and then send again.
it 'attempts to load a new batch without pending downloads' do
stub_const('GeoFileDownloadDispatchWorker::DB_RETRIEVE_BATCH', 5)
stub_const('GeoFileDownloadDispatchWorker::MAX_CONCURRENT_DOWNLOADS', 2)
create_list(:lfs_object, 6, :with_file)
allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
expect(GeoFileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original
# For 6 downloads, we expect three database reloads:
# 1. Load the first batch of 5.
# 2. 4 get sent out, 1 remains. This triggers another reload, which loads in the remaining 2.
# 3. Since the second reload filled the pipe with 2, we need to do a final reload to ensure
# zero are left.
expect(subject).to receive(:load_pending_downloads).exactly(3).times.and_call_original
Sidekiq::Testing.inline! do
subject.perform
end
end
end
end