Add a PoC for a basic tracking system for backfill repositories

[ci skip]
parent c6534317
...@@ -21,6 +21,7 @@ eslint-report.html ...@@ -21,6 +21,7 @@ eslint-report.html
/backups/* /backups/*
/config/aws.yml /config/aws.yml
/config/database.yml /config/database.yml
/config/database_geo.yml
/config/gitlab.yml /config/gitlab.yml
/config/gitlab_ci.yml /config/gitlab_ci.yml
/config/initializers/rack_attack.rb /config/initializers/rack_attack.rb
......
class Admin::GeoNodesController < Admin::ApplicationController class Admin::GeoNodesController < Admin::ApplicationController
before_action :check_license, except: [:index, :destroy] before_action :check_license, except: [:index, :destroy]
before_action :load_node, only: [:destroy, :repair, :backfill_repositories] before_action :load_node, only: [:destroy, :repair]
def index def index
@nodes = GeoNode.all @nodes = GeoNode.all
...@@ -40,16 +40,6 @@ class Admin::GeoNodesController < Admin::ApplicationController ...@@ -40,16 +40,6 @@ class Admin::GeoNodesController < Admin::ApplicationController
redirect_to admin_geo_nodes_path redirect_to admin_geo_nodes_path
end end
def backfill_repositories
if @node.primary?
redirect_to admin_geo_nodes_path, notice: 'This is the primary node. Please run this action with a secondary node.'
else
@node.backfill_repositories
redirect_to admin_geo_nodes_path, notice: 'Backfill scheduled successfully.'
end
end
private private
def geo_node_params def geo_node_params
......
class Geo::BaseRegistry < ActiveRecord::Base
self.abstract_class = true
establish_connection Rails.configuration.geo_database
end
class Geo::ProjectRegistry < Geo::BaseRegistry
validates :project_id, presence: true
end
...@@ -79,12 +79,6 @@ class GeoNode < ActiveRecord::Base ...@@ -79,12 +79,6 @@ class GeoNode < ActiveRecord::Base
self.primary? ? false : !oauth_application.present? self.primary? ? false : !oauth_application.present?
end end
def backfill_repositories
if Gitlab::Geo.enabled? && !primary?
GeoScheduleBackfillWorker.perform_async(id)
end
end
private private
def geo_api_url(suffix) def geo_api_url(suffix)
......
module Geo module Geo
class RepositoryBackfillService class RepositoryBackfillService
attr_reader :project, :geo_node attr_reader :project
def initialize(project, geo_node) def initialize(project)
@project = project @project = project
@geo_node = geo_node
end end
def execute def execute
geo_node.system_hook.execute(hook_data, 'system_hooks') try_obtain_lease do
project.create_repository unless project.repository_exists?
project.repository.after_create if project.empty_repo?
project.repository.fetch_geo_mirror(ssh_url_to_repo)
project.repository.expire_all_method_caches
project.repository.expire_branch_cache
project.repository.expire_content_cache
# TODO: Check if it was successful or not
timestamp = DateTime.now
registry = Geo::ProjectRegistry.find_or_create_by(project_id: project.id)
registry.last_repository_synced_at = timestamp
registry.last_repository_successful_sync_at = timestamp
registry.save
end
end end
private private
def hook_data def try_obtain_lease
{ uuid = Gitlab::ExclusiveLease.new(
event_name: 'repository_update', lease_key,
project_id: project.id, timeout: 24.hours
project: project.hook_attrs, ).try_obtain
remote_url: project.ssh_url_to_repo
} return unless uuid
yield
release_lease(uuid)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def lease_key
@key ||= "repository_backfill_service:#{project.id}"
end
def ssh_url_to_repo
"#{Gitlab::Geo.primary_ssh_path_prefix}#{project.path_with_namespace}.git"
end end
end end
end end
module Geo
class ScheduleBackfillService
attr_accessor :geo_node_id
def initialize(geo_node_id)
@geo_node_id = geo_node_id
end
def execute
return if geo_node_id.nil?
Project.find_each(batch_size: 100) do |project|
GeoRepositoryBackfillWorker.perform_async(geo_node_id, project.id) if project.valid_repo?
end
end
end
end
...@@ -30,10 +30,6 @@ ...@@ -30,10 +30,6 @@
= link_to repair_admin_geo_node_path(node), method: :post, title: 'OAuth application is missing', class: 'btn btn-default btn-sm prepend-left-10' do = link_to repair_admin_geo_node_path(node), method: :post, title: 'OAuth application is missing', class: 'btn btn-default btn-sm prepend-left-10' do
= icon('exclamation-triangle fw') = icon('exclamation-triangle fw')
Repair authentication Repair authentication
- unless node.primary?
= link_to backfill_repositories_admin_geo_node_path(node), method: :post, class: 'btn btn-primary btn-sm prepend-left-10' do
= icon 'map-signs'
Backfill all repositories
= link_to admin_geo_node_path(node), data: { confirm: 'Are you sure?' }, method: :delete, class: 'btn btn-remove btn-sm prepend-left-10' do = link_to admin_geo_node_path(node), data: { confirm: 'Are you sure?' }, method: :delete, class: 'btn btn-remove btn-sm prepend-left-10' do
= icon 'trash' = icon 'trash'
Remove Remove
class GeoBackfillWorker
include Sidekiq::Worker
include CronjobQueue
RUN_TIME = 5.minutes.to_i.freeze
def perform
start = Time.now
project_ids.each do |project_id|
break if Time.now - start >= RUN_TIME
project = Project.find(project_id)
next if project.repository_exists?
try_obtain_lease do
Geo::RepositoryBackfillService.new(project).execute
end
end
end
private
def project_ids
return [] if Project.count == Geo::ProjectRegistry.count
Project.where.not(id: Geo::ProjectRegistry.pluck(:id))
.limit(100)
.pluck(:id)
end
def try_obtain_lease
uuid = Gitlab::ExclusiveLease.new(
lease_key,
timeout: 24.hours
).try_obtain
return unless uuid
yield
release_lease(uuid)
end
def release_lease(uuid)
Gitlab::ExclusiveLease.cancel(lease_key, uuid)
end
def lease_key
'repository_backfill_service'
end
end
class GeoRepositoryBackfillWorker
include Sidekiq::Worker
include ::GeoDynamicBackoff
include GeoQueue
def perform(geo_node_id, project_id)
project = Project.find(project_id)
geo_node = GeoNode.find(geo_node_id)
Geo::RepositoryBackfillService.new(project, geo_node).execute
end
end
class GeoScheduleBackfillWorker
include Sidekiq::Worker
include ::GeoDynamicBackoff
include GeoQueue
def perform(geo_node_id)
Geo::ScheduleBackfillService.new(geo_node_id).execute
end
end
#
# PRODUCTION
#
production:
adapter: postgresql
encoding: unicode
database: gitlabhq_geo_production
pool: 10
# username: git
# password:
# host: localhost
# port: 5432
#
# Development specific
#
development:
adapter: postgresql
encoding: unicode
database: gitlabhq_geo_development
pool: 5
username: postgres
password:
#
# Staging specific
#
staging:
adapter: postgresql
encoding: unicode
database: gitlabhq_geo_staging
pool: 5
username: postgres
password:
# Warning: The database defined as "test" will be erased and
# re-generated from your development database when you run "rake".
# Do not set this db to the same as development or production.
test: &test
adapter: postgresql
encoding: unicode
database: gitlabhq_geo_test
pool: 5
username: postgres
password:
...@@ -213,6 +213,11 @@ production: &base ...@@ -213,6 +213,11 @@ production: &base
geo_bulk_notify_worker: geo_bulk_notify_worker:
cron: "*/10 * * * * *" cron: "*/10 * * * * *"
# Gitlab Geo backfill worker
# NOTE: This will only take effect if Geo is enabled
geo_backfill_worker:
cron: "*/5 * * * *"
registry: registry:
# enabled: true # enabled: true
# host: registry.example.com # host: registry.example.com
......
...@@ -384,6 +384,9 @@ Settings.cron_jobs['ldap_group_sync_worker']['job_class'] = 'LdapGroupSyncWorker ...@@ -384,6 +384,9 @@ Settings.cron_jobs['ldap_group_sync_worker']['job_class'] = 'LdapGroupSyncWorker
Settings.cron_jobs['geo_bulk_notify_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_bulk_notify_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_bulk_notify_worker']['cron'] ||= '*/10 * * * * *' Settings.cron_jobs['geo_bulk_notify_worker']['cron'] ||= '*/10 * * * * *'
Settings.cron_jobs['geo_bulk_notify_worker']['job_class'] ||= 'GeoBulkNotifyWorker' Settings.cron_jobs['geo_bulk_notify_worker']['job_class'] ||= 'GeoBulkNotifyWorker'
Settings.cron_jobs['geo_backfill_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_backfill_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_backfill_worker']['job_class'] ||= 'GeoBackfillWorker'
Settings.cron_jobs['gitlab_usage_ping_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['gitlab_usage_ping_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['gitlab_usage_ping_worker']['cron'] ||= Settings.send(:cron_random_weekly_time) Settings.cron_jobs['gitlab_usage_ping_worker']['cron'] ||= Settings.send(:cron_random_weekly_time)
Settings.cron_jobs['gitlab_usage_ping_worker']['job_class'] = 'GitlabUsagePingWorker' Settings.cron_jobs['gitlab_usage_ping_worker']['job_class'] = 'GitlabUsagePingWorker'
......
Rails.application.configure do
config.geo_database = config_for(:database_geo)
end
...@@ -39,6 +39,9 @@ Sidekiq.configure_server do |config| ...@@ -39,6 +39,9 @@ Sidekiq.configure_server do |config|
# Gitlab Geo: enable bulk notify job only on primary node # Gitlab Geo: enable bulk notify job only on primary node
Gitlab::Geo.bulk_notify_job.disable! unless Gitlab::Geo.primary? Gitlab::Geo.bulk_notify_job.disable! unless Gitlab::Geo.primary?
# GitLab Geo: enable backfill job only on secondary nodes
Gitlab::Geo.backfill_job.disable! unless Gitlab::Geo.secondary?
Gitlab::SidekiqThrottler.execute! Gitlab::SidekiqThrottler.execute!
config = ActiveRecord::Base.configurations[Rails.env] || config = ActiveRecord::Base.configurations[Rails.env] ||
......
...@@ -113,7 +113,6 @@ namespace :admin do ...@@ -113,7 +113,6 @@ namespace :admin do
resources :geo_nodes, only: [:index, :create, :destroy] do resources :geo_nodes, only: [:index, :create, :destroy] do
member do member do
post :repair post :repair
post :backfill_repositories
end end
end end
## EE-specific ## EE-specific
......
class CreateProjectRegistry < ActiveRecord::Migration
def change
create_table :project_registries do |t|
t.integer :project_id, null: false
t.datetime :last_repository_synced_at
t.datetime :last_repository_successful_sync_at
t.datetime :created_at, null: false
end
end
end
# encoding: UTF-8
# This file is auto-generated from the current state of the database. Instead
# of editing this file, please use the migrations feature of Active Record to
# incrementally modify your database, and then regenerate this schema definition.
#
# Note that this schema.rb definition is the authoritative source for your
# database schema. If you need to create the application database on another
# system, you should be using db:schema:load, not running all the migrations
# from scratch. The latter is a flawed and unsustainable approach (the more migrations
# you'll amass, the slower it'll run and the greater likelihood for issues).
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20170206203234) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
create_table "project_registries", force: :cascade do |t|
t.integer "project_id", null: false
t.datetime "last_repository_synced_at"
t.datetime "last_repository_successful_sync_at"
t.datetime "created_at", null: false
end
end
require 'rails/generators'
require 'rails/generators/active_record'
require 'rails/generators/active_record/migration/migration_generator'
class GeoMigrationGenerator < ActiveRecord::Generators::MigrationGenerator
source_root File.join(File.dirname(ActiveRecord::Generators::MigrationGenerator.instance_method(:create_migration_file).source_location.first), 'templates')
def create_migration_file
set_local_assigns!
validate_file_name!
migration_template @migration_template, "db_geo/migrate/#{file_name}.rb"
end
end
...@@ -34,6 +34,10 @@ module Gitlab ...@@ -34,6 +34,10 @@ module Gitlab
self.cache_value(:geo_node_secondary) { self.enabled? && self.current_node && !self.current_node.primary? } self.cache_value(:geo_node_secondary) { self.enabled? && self.current_node && !self.current_node.primary? }
end end
def self.primary_ssh_path_prefix
self.cache_value(:geo_primary_ssh_path_prefix) { self.enabled? && self.primary_node && "git@#{self.primary_node.host}:" }
end
def self.geo_node?(host:, port:) def self.geo_node?(host:, port:)
GeoNode.where(host: host, port: port).exists? GeoNode.where(host: host, port: port).exists?
end end
...@@ -46,6 +50,10 @@ module Gitlab ...@@ -46,6 +50,10 @@ module Gitlab
Sidekiq::Cron::Job.find('geo_bulk_notify_worker') Sidekiq::Cron::Job.find('geo_bulk_notify_worker')
end end
def self.backfill_job
Sidekiq::Cron::Job.find('geo_backfill_worker')
end
def self.oauth_authentication def self.oauth_authentication
return false unless Gitlab::Geo.secondary? return false unless Gitlab::Geo.secondary?
......
task spec: ['geo:db:test:prepare']
namespace :geo do
namespace :db do |ns|
%i(drop create setup migrate rollback seed version).each do |task_name|
task task_name do
Rake::Task["db:#{task_name}"].invoke
end
end
namespace :schema do
%i(load dump).each do |task_name|
task task_name do
Rake::Task["db:schema:#{task_name}"].invoke
end
end
end
namespace :test do
task :prepare do
Rake::Task['db:test:prepare'].invoke
end
end
# append and prepend proper tasks to all the tasks defined above
ns.tasks.each do |task|
task.enhance ['geo:config:set'] do
Rake::Task['geo:config:restore'].invoke
end
end
end
namespace :config do
task :set do
# save current configuration
@previous_config = {
config: Rails.application.config.dup,
schema: ENV['SCHEMA'],
skip_post_deployment_migrations: ENV['SKIP_POST_DEPLOYMENT_MIGRATIONS']
}
# set config variables for geo database
ENV['SCHEMA'] = 'db_geo/schema.rb'
ENV['SKIP_POST_DEPLOYMENT_MIGRATIONS'] = 'true'
Rails.application.config.paths['db'] = ['db_geo']
Rails.application.config.paths['db/migrate'] = ['db_geo/migrate']
Rails.application.config.paths['db/seeds.rb'] = ['db_geo/seeds.rb']
Rails.application.config.paths['config/database'] = ['config/database_geo.yml']
end
task :restore do
# restore config variables to previous values
ENV['SCHEMA'] = @previous_config[:schema]
ENV['SKIP_POST_DEPLOYMENT_MIGRATIONS'] = @previous_config[:skip_post_deployment_migrations]
Rails.application.config = @previous_config[:config]
end
end
end
...@@ -115,16 +115,4 @@ describe Admin::GeoNodesController do ...@@ -115,16 +115,4 @@ describe Admin::GeoNodesController do
it_behaves_like 'unlicensed geo action' it_behaves_like 'unlicensed geo action'
end end
describe '#backfill_repositories' do
let(:geo_node) { create(:geo_node) }
subject { post :backfill_repositories, id: geo_node }
before do
allow(Gitlab::Geo).to receive(:license_allows?) { false }
subject
end
it_behaves_like 'unlicensed geo action'
end
end end
...@@ -234,28 +234,4 @@ describe GeoNode, type: :model do ...@@ -234,28 +234,4 @@ describe GeoNode, type: :model do
expect(node).to be_missing_oauth_application expect(node).to be_missing_oauth_application
end end
end end
describe '#backfill_repositories' do
before do
Sidekiq::Worker.clear_all
end
it 'schedules the scheduler worker' do
Sidekiq::Testing.fake! do
expect { node.backfill_repositories }.to change(GeoScheduleBackfillWorker.jobs, :size).by(1)
end
end
it 'schedules the correct worker for the number of projects' do
Sidekiq::Testing.fake! do
2.times do
create(:project)
end
node.backfill_repositories
expect { GeoScheduleBackfillWorker.drain }.to change(GeoRepositoryBackfillWorker.jobs, :size).by(2)
end
end
end
end end
...@@ -75,14 +75,6 @@ describe API::Geo, api: true do ...@@ -75,14 +75,6 @@ describe API::Geo, api: true do
post api('/geo/receive_events'), push_payload, geo_token_header post api('/geo/receive_events'), push_payload, geo_token_header
expect(response.status).to eq 201 expect(response.status).to eq 201
end end
it 'can start a refresh process from the backfill service' do
project = create(:project)
backfill = Geo::RepositoryBackfillService.new(project, geo_node)
post api('/geo/receive_events'), backfill.send(:hook_data), geo_token_header
expect(response.status).to eq 201
end
end end
describe 'POST /geo/receive_events push_tag events' do describe 'POST /geo/receive_events push_tag events' do
......
require 'spec_helper' require 'spec_helper'
describe Geo::RepositoryBackfillService, services: true do describe Geo::RepositoryBackfillService, services: true do
SYSTEM_HOOKS_HEADER = { 'Content-Type' => 'application/json', 'X-Gitlab-Event' => 'System Hook' }.freeze
let(:project) { create(:project) } let(:project) { create(:project) }
let(:geo_node) { create(:geo_node) }
subject { Geo::RepositoryBackfillService.new(project, geo_node) } subject { Geo::RepositoryBackfillService.new(project) }
describe '#execute' do describe '#execute' do
it 'calls upon the system hook of the Geo Node' do pending { raise 'must be implemented' }
WebMock.stub_request(:post, geo_node.geo_events_url)
subject.execute
expect(WebMock).to have_requested(:post, geo_node.geo_events_url).with(
headers: SYSTEM_HOOKS_HEADER,
body: {
event_name: 'repository_update',
project_id: project.id,
project: project.hook_attrs,
remote_url: project.ssh_url_to_repo
}
).once
end
end end
end end
require 'spec_helper'
describe Geo::ScheduleBackfillService, services: true do
subject { Geo::ScheduleBackfillService.new(geo_node.id) }
let(:geo_node) { create(:geo_node) }
describe '#execute' do
it 'schedules the backfill service' do
Sidekiq::Worker.clear_all
Sidekiq::Testing.fake! do
2.times do
create(:project)
end
expect{ subject.execute }.to change(GeoRepositoryBackfillWorker.jobs, :size).by(2)
end
end
end
end
require 'spec_helper'
describe Geo::GeoBackfillWorker, services: true do
describe '#perform' do
pending { raise 'must be implemented' }
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment