Commit 891b27c0 authored by Douwe Maan's avatar Douwe Maan

Merge branch 'feature/geo-ssh-keys-sync' into 'master'

GitLab Geo: SSH Keys Synchronization

Add support for SSH Keys synchronization to GitLab Geo (#76)
Fixes https://gitlab.com/gitlab-org/gitlab-ee/issues/371

# Checklist

- [x] When a key is created or removed we notify secondary nodes of that change.
- [x] A secondary node must receive a notification to change `.ssh/authorized_keys`

When a notification to modify `authorized_keys` is received:
- [x] It must generated async job to add or remove keys
- [x] Generated job should retry a reasonable amount of times for a short period of time (30 times, waiting from 5 to 10 seconds)
- [x] Generated jobs should also have an exponential backoff logic after the short period of time retries.

See merge request !282
parents bf417c06 da2b55fd
...@@ -60,6 +60,10 @@ class GeoNode < ActiveRecord::Base ...@@ -60,6 +60,10 @@ class GeoNode < ActiveRecord::Base
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_wikis").to_s URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_wikis").to_s
end end
def notify_key_url
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_key").to_s
end
def oauth_callback_url def oauth_callback_url
URI.join(uri, "#{uri.path}/", 'oauth/geo/callback').to_s URI.join(uri, "#{uri.path}/", 'oauth/geo/callback').to_s
end end
......
...@@ -57,6 +57,7 @@ class Key < ActiveRecord::Base ...@@ -57,6 +57,7 @@ class Key < ActiveRecord::Base
end end
def add_to_shell def add_to_shell
Gitlab::Geo.notify_key_change(id, key, :create) if Gitlab::Geo.primary?
GitlabShellWorker.perform_async( GitlabShellWorker.perform_async(
:add_key, :add_key,
shell_id, shell_id,
...@@ -73,6 +74,7 @@ class Key < ActiveRecord::Base ...@@ -73,6 +74,7 @@ class Key < ActiveRecord::Base
end end
def remove_from_shell def remove_from_shell
Gitlab::Geo.notify_key_change(id, key, :delete) if Gitlab::Geo.primary?
GitlabShellWorker.perform_async( GitlabShellWorker.perform_async(
:remove_key, :remove_key,
shell_id, shell_id,
......
module Geo
class BaseNotify
include HTTParty
# HTTParty timeout
default_timeout Gitlab.config.gitlab.webhook_timeout
def notify(notify_url, content)
response = self.class.post(notify_url,
body: content,
headers: {
'Content-Type' => 'application/json',
'PRIVATE-TOKEN' => private_token
})
[(response.code >= 200 && response.code < 300), ActionView::Base.full_sanitizer.sanitize(response.to_s)]
rescue HTTParty::Error, Errno::ECONNREFUSED => e
[false, ActionView::Base.full_sanitizer.sanitize(e.message)]
end
private
def private_token
# TODO: should we ask admin user to be defined as part of configuration?
@private_token ||= User.find_by(admin: true).authentication_token
end
end
end
module Geo
class NotifyKeyChangeService < BaseNotify
def initialize(key_id, key, action)
@id = key_id
@key = key
@action = action
end
def execute
key_change = { 'id' => @id, 'key' => @key, 'action' => @action }
content = { key_change: key_change }.to_json
::Gitlab::Geo.secondary_nodes.each do |node|
notify_url = node.notify_key_url
success, message = notify(notify_url, content)
unless success
error_message = "GitLab failed to notify #{node.url} to #{notify_url} : #{message}"
Rails.logger.error(error_message)
fail error_message # we must throw exception here to re-schedule job execution.
end
end
end
end
end
module Geo module Geo
class NotifyNodesService class NotifyNodesService < BaseNotify
include HTTParty
# HTTParty timeout
default_timeout Gitlab.config.gitlab.webhook_timeout
def initialize def initialize
@proj_queue = Gitlab::Geo::UpdateQueue.new('updated_projects') @proj_queue = Gitlab::Geo::UpdateQueue.new('updated_projects')
...@@ -19,34 +15,18 @@ module Geo ...@@ -19,34 +15,18 @@ module Geo
def process(queue, notify_url_method) def process(queue, notify_url_method)
return if queue.empty? return if queue.empty?
projects = queue.fetch_batched_data projects = queue.fetch_batched_data
content = { projects: projects }.to_json
::Gitlab::Geo.secondary_nodes.each do |node| ::Gitlab::Geo.secondary_nodes.each do |node|
notify_url = node.send(notify_url_method.to_sym) notify_url = node.send(notify_url_method.to_sym)
success, message = notify(notify_url, projects) success, message = notify(notify_url, content)
unless success unless success
Rails.logger.error("GitLab failed to notify #{node.url} to #{notify_url} : #{message}") Rails.logger.error("GitLab failed to notify #{node.url} to #{notify_url} : #{message}")
queue.store_batched_data(projects) queue.store_batched_data(projects)
end end
end end
end end
def notify(notify_url, projects)
response = self.class.post(notify_url,
body: { projects: projects }.to_json,
headers: {
'Content-Type' => 'application/json',
'PRIVATE-TOKEN' => private_token
})
[(response.code >= 200 && response.code < 300), ActionView::Base.full_sanitizer.sanitize(response.to_s)]
rescue HTTParty::Error, Errno::ECONNREFUSED => e
[false, ActionView::Base.full_sanitizer.sanitize(e.message)]
end
def private_token
# TODO: should we ask admin user to be defined as part of configuration?
@private_token ||= User.find_by(admin: true).authentication_token
end
end end
end end
module Geo
class ScheduleKeyChangeService
attr_reader :id, :key, :action
def initialize(key_change)
@id = key_change['id']
@key = key_change['key']
@action = key_change['action']
end
def execute
GeoKeyRefreshWorker.perform_async(@id, @key, @action)
end
end
end
require 'active_support/concern'
module GeoDynamicBackoff
extend ActiveSupport::Concern
included do
sidekiq_options retry: 55
sidekiq_retry_in do |count|
count <= 30 ? linear_backoff_strategy(count) : geometric_backoff_strategy(count)
end
end
private
def linear_backoff_strategy(count)
rand(1..20) + count
end
def geometric_backoff_strategy(count)
# This strategy is based on the original one from sidekiq
count = count-30 # we must start counting after 30
(count ** 4) + 15 + (rand(30)*(count+1))
end
end
class GeoKeyChangeNotifyWorker
include Sidekiq::Worker
include GeoDynamicBackoff
sidekiq_options queue: :default
def perform(key_id, key, action)
Geo::NotifyKeyChangeService.new(key_id, key, action).execute
end
end
class GeoKeyRefreshWorker
include Sidekiq::Worker
include GeoDynamicBackoff
sidekiq_options queue: :default
def perform(key_id, key, action)
action = action.to_sym
case action
when :create
# ActiveRecord::RecordNotFound when not found (so job will retry)
key = Key.find(key_id)
key.add_to_shell
when :delete
# we are physically removing the key after model is removed
# so we must reconstruct ids to schedule removal
key = Key.new(id: key_id, key: key)
key.remove_from_shell
else
fail "Invalid action: #{action}"
end
end
end
...@@ -19,7 +19,8 @@ module Gitlab ...@@ -19,7 +19,8 @@ module Gitlab
#{config.root}/app/models/hooks #{config.root}/app/models/hooks
#{config.root}/app/models/concerns #{config.root}/app/models/concerns
#{config.root}/app/models/project_services #{config.root}/app/models/project_services
#{config.root}/app/models/members)) #{config.root}/app/models/members
#{config.root}/app/workers/concerns))
# Only load the plugins named here, in the order given (default is alphabetical). # Only load the plugins named here, in the order given (default is alphabetical).
# :all can be used as a placeholder for all plugins not explicitly named. # :all can be used as a placeholder for all plugins not explicitly named.
......
...@@ -3,12 +3,11 @@ module API ...@@ -3,12 +3,11 @@ module API
before { authenticated_as_admin! } before { authenticated_as_admin! }
resource :geo do resource :geo do
# Enqueue a batch of IDs of modified projects to have their # Enqueue a batch of IDs of modified projects to have their
# repositories updated # repositories updated
# #
# Example request: # Example request:
# POST /refresh_projects # POST /geo/refresh_projects
post 'refresh_projects' do post 'refresh_projects' do
required_attributes! [:projects] required_attributes! [:projects]
::Geo::ScheduleRepoUpdateService.new(params[:projects]).execute ::Geo::ScheduleRepoUpdateService.new(params[:projects]).execute
...@@ -18,11 +17,20 @@ module API ...@@ -18,11 +17,20 @@ module API
# wiki repositories updated # wiki repositories updated
# #
# Example request: # Example request:
# POST /refresh_wikis # POST /geo/refresh_wikis
post 'refresh_wikis' do post 'refresh_wikis' do
required_attributes! [:projects] required_attributes! [:projects]
::Geo::ScheduleWikiRepoUpdateService.new(params[:projects]).execute ::Geo::ScheduleWikiRepoUpdateService.new(params[:projects]).execute
end end
# Enqueue a change operation for specific key ID
#
# Example request:
# POST /geo/refresh_key
post 'refresh_key' do
required_attributes! [:key_change]
::Geo::ScheduleKeyChangeService.new(params[:key_change]).execute
end
end end
end end
end end
...@@ -42,6 +42,10 @@ module Gitlab ...@@ -42,6 +42,10 @@ module Gitlab
::Geo::EnqueueWikiUpdateService.new(project).execute ::Geo::EnqueueWikiUpdateService.new(project).execute
end end
def self.notify_key_change(key_id, key, action)
GeoKeyChangeNotifyWorker.perform_async(key_id, key, action)
end
def self.bulk_notify_job def self.bulk_notify_job
Sidekiq::Cron::Job.find('geo_bulk_notify_worker') Sidekiq::Cron::Job.find('geo_bulk_notify_worker')
end end
......
...@@ -2,7 +2,7 @@ module Gitlab ...@@ -2,7 +2,7 @@ module Gitlab
module Middleware module Middleware
class ReadonlyGeo class ReadonlyGeo
DISALLOWED_METHODS = %w(POST PATCH PUT DELETE) DISALLOWED_METHODS = %w(POST PATCH PUT DELETE)
WHITELISTED = %w(api/v3/internal api/v3/geo/refresh_projects api/v3/geo/refresh_wikis) WHITELISTED = %w(api/v3/internal api/v3/geo/refresh_projects api/v3/geo/refresh_wikis api/v3/geo/refresh_key)
APPLICATION_JSON = 'application/json' APPLICATION_JSON = 'application/json'
def initialize(app) def initialize(app)
......
...@@ -79,4 +79,15 @@ describe Gitlab::Geo, lib: true do ...@@ -79,4 +79,15 @@ describe Gitlab::Geo, lib: true do
described_class.notify_project_update(project) described_class.notify_project_update(project)
end end
end end
describe 'notify_ssh_key_change' do
let(:key) { FactoryGirl.build(:key) }
it 'schedule async notification' do
expect(GeoKeyChangeNotifyWorker).to receive(:perform_async).and_call_original
expect_any_instance_of(GeoKeyChangeNotifyWorker).to receive(:perform)
described_class.notify_key_change(key.id, key, 'create')
end
end
end end
...@@ -163,6 +163,14 @@ describe GeoNode, type: :model do ...@@ -163,6 +163,14 @@ describe GeoNode, type: :model do
end end
end end
describe '#notify_key_url' do
let(:refresh_url) { 'https://localhost:3000/gitlab/api/v3/geo/refresh_key' }
it 'returns api url based on node uri' do
expect(new_node.notify_key_url).to eq(refresh_url)
end
end
describe '#oauth_callback_url' do describe '#oauth_callback_url' do
let(:oauth_callback_url) { 'https://localhost:3000/gitlab/oauth/geo/callback' } let(:oauth_callback_url) { 'https://localhost:3000/gitlab/oauth/geo/callback' }
......
...@@ -7,15 +7,34 @@ describe API::API, api: true do ...@@ -7,15 +7,34 @@ describe API::API, api: true do
describe 'POST /geo/refresh_projects' do describe 'POST /geo/refresh_projects' do
before(:each) { allow_any_instance_of(::Geo::ScheduleRepoUpdateService).to receive(:execute) } before(:each) { allow_any_instance_of(::Geo::ScheduleRepoUpdateService).to receive(:execute) }
it 'should retrieve the license information if admin is logged in' do it 'starts refresh process if admin and correct params' do
post api('/geo/refresh_projects', admin), projects: ['1', '2', '3'] post api('/geo/refresh_projects', admin), projects: ['1', '2', '3']
expect(response.status).to eq 201 expect(response.status).to eq 201
end end
it 'should deny access if not admin' do it 'denies access if not admin' do
post api('/geo/refresh_projects', user) post api('/geo/refresh_projects', user)
expect(response.status).to eq 403 expect(response.status).to eq 403
end end
end end
describe 'POST /geo/refresh_key' do
before(:each) { allow_any_instance_of(::Geo::ScheduleKeyChangeService).to receive(:execute) }
it 'enqueues on disk key creation if admin and correct params' do
post api('/geo/refresh_key', admin), key_change: { id: 1, action: 'create' }
expect(response.status).to eq 201
end
it 'enqueues on disk key removal if admin and correct params' do
post api('/geo/refresh_key', admin), key_change: { id: 1, action: 'delete' }
expect(response.status).to eq 201
end
it 'denies access if not admin' do
post api('/geo/refresh_key', user)
expect(response.status).to eq 403
end
end
end end
describe Geo::EnqueueProjectUpdateService, service: true do describe Geo::EnqueueProjectUpdateService, services: true do
subject { Geo::EnqueueProjectUpdateService.new(project) } subject { Geo::EnqueueProjectUpdateService.new(project) }
let(:project) { double(:project) } let(:project) { double(:project) }
let(:fake_url) { 'git@localhost:repo/path.git' } let(:fake_url) { 'git@localhost:repo/path.git' }
......
describe Geo::EnqueueWikiUpdateService, service: true do describe Geo::EnqueueWikiUpdateService, services: true do
subject { Geo::EnqueueWikiUpdateService.new(project) } subject { Geo::EnqueueWikiUpdateService.new(project) }
let(:project) { double(:project) } let(:project) { double(:project) }
let(:fake_url) { 'git@localhost:repo/path.git' } let(:fake_url) { 'git@localhost:repo/path.git' }
......
require 'spec_helper'
describe Geo::ScheduleKeyChangeService, services: true do
subject(:key_create) { Geo::ScheduleKeyChangeService.new('id' => 1, 'key' => key.key, 'action' => :create) }
subject(:key_delete) { Geo::ScheduleKeyChangeService.new('id' => 1, 'key' => key.key, 'action' => :delete) }
let(:key) { FactoryGirl.build(:key) }
before(:each) { allow_any_instance_of(GeoKeyRefreshWorker).to receive(:perform) }
context 'key creation' do
it 'executes action' do
expect(key_create.execute).to be_truthy
end
end
context 'key removal' do
it 'executes action' do
expect(key_delete.execute).to be_truthy
end
end
end
require 'spec_helper'
describe GeoKeyRefreshWorker do
subject(:key_create) { described_class.new.perform(key.id, key.key, 'create') }
subject(:key_delete) { described_class.new.perform(key.id, key.key, 'delete') }
let(:key) { FactoryGirl.create(:key) }
context 'key creation' do
it 'adds key to shell' do
expect(Key).to receive(:find).with(key.id) { key }
expect(key).to receive(:add_to_shell)
expect { key_create }.not_to raise_error
end
end
context 'key removal' do
it 'removes key from the shell' do
expect(Key).to receive(:new).with(id: key.id, key: key.key) { key }
expect(key).to receive(:remove_from_shell)
expect { key_delete }.not_to raise_error
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment