Commit 7da23b18 authored by Valery Sizov's avatar Valery Sizov

Docker Registry replication for Geo

Replicate Docker images from primary to secondary node
parent b2fbc919
......@@ -70,10 +70,14 @@ class ContainerRepository < ApplicationRecord
digests = tags.map { |tag| tag.digest }.to_set
digests.all? do |digest|
client.delete_repository_tag(self.path, digest)
delete_tag_by_digest(digest)
end
end
# Deletes a single tag from the container registry by its digest.
# Extracted so callers (e.g. Geo registry replication) can remove one
# tag without iterating the whole tag list.
def delete_tag_by_digest(digest)
  client.delete_repository_tag(self.path, digest)
end
def self.build_from_path(path)
self.new(project: path.repository_project,
name: path.repository_name)
......@@ -92,3 +96,5 @@ class ContainerRepository < ApplicationRecord
name: path.repository_name)
end
end
ContainerRepository.prepend_if_ee('EE::ContainerRepository')
......@@ -17,6 +17,14 @@ module Auth
end
# Builds a registry JWT granting every action ('*') on the given
# repository names.
def self.full_access_token(*names)
  access_token(['*'], names)
end
# Builds a registry JWT restricted to pull access for the given
# repository names (used when mirroring images from another registry).
def self.pull_access_token(*names)
  access_token(%w(pull), names)
end
def self.access_token(actions, names)
names = names.flatten
registry = Gitlab.config.registry
token = JSONWebToken::RSAToken.new(registry.key)
......@@ -25,7 +33,7 @@ module Auth
token.expire_time = token_expire_at
token[:access] = names.map do |name|
{ type: 'repository', name: name, actions: %w(*) }
{ type: 'repository', name: name, actions: actions }
end
token.encoded
......
......@@ -427,6 +427,11 @@ production: &base
# If it is blank, it defaults to external_url.
node_name: ''
registry_replication:
# enabled: true
# primary_api_url: http://localhost:5000/ # internal address to the primary registry, will be used by GitLab to directly communicate with primary registry API
#
# 2. GitLab CI settings
# ==========================
......
......@@ -19,6 +19,7 @@ ActiveSupport::Inflector.inflections do |inflect|
project_registry
file_registry
job_artifact_registry
container_repository_registry
vulnerability_feedback
vulnerabilities_feedback
group_view
......
......@@ -296,6 +296,12 @@ Gitlab.ee do
Settings['geo'] ||= Settingslogic.new({})
# For backwards compatibility, default to gitlab_url and if so, ensure it ends with "/"
Settings.geo['node_name'] = Settings.geo['node_name'].presence || Settings.gitlab['url'].chomp('/').concat('/')
#
# Registry replication
#
Settings.geo['registry_replication'] ||= Settingslogic.new({})
Settings.geo.registry_replication['enabled'] ||= false
end
#
......@@ -473,6 +479,9 @@ Gitlab.ee do
Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_repository_verification_secondary_scheduler_worker']['job_class'] ||= 'Geo::RepositoryVerification::Secondary::SchedulerWorker'
Settings.cron_jobs['geo_container_repository_sync_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_container_repository_sync_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_container_repository_sync_worker']['job_class'] ||= 'Geo::ContainerRepositorySyncDispatchWorker'
Settings.cron_jobs['historical_data_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['historical_data_worker']['cron'] ||= '0 12 * * *'
Settings.cron_jobs['historical_data_worker']['job_class'] = 'HistoricalDataWorker'
......
# frozen_string_literal: true

# Adds a per-node cap on how many container repository syncs a Geo
# secondary may run concurrently. Defaults to 10, mirroring the other
# *_max_capacity columns on geo_nodes.
class AddGeoContainerSyncCapacity < ActiveRecord::Migration[5.1]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  def change
    change_table :geo_nodes do |t|
      t.column :container_repositories_max_capacity, :integer, default: 10, null: false
    end
  end
end
......@@ -1435,6 +1435,7 @@ ActiveRecord::Schema.define(version: 2019_07_29_090456) do
t.integer "minimum_reverification_interval", default: 7, null: false
t.string "internal_url"
t.string "name", null: false
t.integer "container_repositories_max_capacity", default: 10, null: false
t.index ["access_key"], name: "index_geo_nodes_on_access_key"
t.index ["name"], name: "index_geo_nodes_on_name", unique: true
t.index ["primary"], name: "index_geo_nodes_on_primary"
......
# frozen_string_literal: true

module Geo
  # Finder for container repositories that need replication to the
  # current secondary node. Repository rows come from the FDW-backed
  # models (primary's tables); registry rows from the tracking database.
  class ContainerRepositoryRegistryFinder < RegistryFinder
    # Total repositories in scope for this node.
    def count_syncable
      container_repositories.count
    end

    # Repositories whose tracking record is in the `synced` state.
    def count_synced
      registries_for_container_repositories
        .merge(Geo::ContainerRepositoryRegistry.synced).count
    end

    # Repositories whose tracking record is in the `failed` state.
    def count_failed
      registries_for_container_repositories
        .merge(Geo::ContainerRepositoryRegistry.failed).count
    end

    # All tracking records, regardless of state or node scope.
    def count_registry
      Geo::ContainerRepositoryRegistry.count
    end

    # Find limited amount of non replicated container repositories.
    #
    # You can pass a list with `except_repository_ids:` so you can exclude items you
    # already scheduled but haven't finished and aren't persisted to the database yet
    #
    # @param [Integer] batch_size used to limit the results returned
    # @param [Array<Integer>] except_repository_ids ids that will be ignored from the query
    # rubocop: disable CodeReuse/ActiveRecord
    def find_unsynced(batch_size:, except_repository_ids: [])
      container_repositories
        .missing_container_repository_registry
        .id_not_in(except_repository_ids)
        .limit(batch_size)
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Failed registries that are due for a retry (see the retry_due
    # scope); returns container_repository ids, not registry ids.
    # rubocop: disable CodeReuse/ActiveRecord
    def find_retryable_failed_ids(batch_size:, except_repository_ids: [])
      Geo::ContainerRepositoryRegistry
        .failed
        .retry_due
        .repository_id_not_in(except_repository_ids)
        .limit(batch_size)
        .pluck_container_repository_key
    end
    # rubocop: enable CodeReuse/ActiveRecord

    private

    # Repositories in scope for the current node (honors selective sync).
    def container_repositories
      current_node.container_repositories
    end

    def registries_for_container_repositories
      container_repositories
        .inner_join_container_repository_registry
    end
  end
end
# frozen_string_literal: true

module EE
  # EE extensions to ContainerRepository. Used by Geo registry
  # replication to write replicated images into the local registry.
  module ContainerRepository
    extend ActiveSupport::Concern

    prepended do
      scope :project_id_in, ->(ids) { joins(:project).merge(Project.id_in(ids)) }
    end

    # Uploads a blob (layer or config object) from a local file.
    def push_blob(digest, file_path)
      client.push_blob(path, digest, file_path)
    end

    # Uploads a manifest for the given tag.
    def push_manifest(tag, manifest, manifest_type)
      client.push_manifest(path, tag, manifest, manifest_type)
    end

    # True when the registry already stores a blob with this digest.
    def blob_exists?(digest)
      client.blob_exists?(path, digest)
    end
  end
end
# frozen_string_literal: true

# Geo tracking-database record describing the replication state of one
# container repository on a secondary node.
class Geo::ContainerRepositoryRegistry < Geo::BaseRegistry
  include ::Delay

  belongs_to :container_repository

  scope :repository_id_not_in, -> (ids) { where.not(container_repository_id: ids) }
  scope :failed, -> { with_state(:failed) }
  scope :synced, -> { with_state(:synced) }
  # A failed sync is retryable when retry_at has passed or was never set.
  scope :retry_due, -> { where(arel_table[:retry_at].eq(nil).or(arel_table[:retry_at].lt(Time.now))) }

  state_machine :state, initial: :pending do
    state :started
    state :synced
    state :failed
    state :pending

    before_transition any => :started do |registry, _|
      registry.last_synced_at = Time.now
    end

    before_transition any => :synced do |registry, _|
      registry.retry_count = 0
      registry.retry_at = nil
      registry.last_sync_failure = nil
    end

    before_transition any => :pending do |registry, _|
      # Fixed: was `registry.retry_at = 0`, which only cleared the column
      # because ActiveRecord casts an invalid datetime to nil. Assign nil
      # explicitly so the repository is immediately retryable (the
      # retry_due scope treats NULL as due).
      registry.retry_at = nil
      registry.retry_count = 0
    end

    event :start_sync! do
      transition [:synced, :failed, :pending] => :started
    end

    event :finish_sync! do
      transition started: :synced
    end

    event :repository_updated! do
      transition [:synced, :failed, :started] => :pending
    end
  end

  # Returns the container_repository_id of every matching registry row.
  def self.pluck_container_repository_key
    where(nil).pluck(:container_repository_id)
  end

  # Marks the registry as failed, recording the error message and
  # scheduling the next retry with a backoff based on retry_count
  # (next_retry_time comes from ::Delay).
  def fail_sync!(message, error)
    new_retry_count = retry_count + 1

    update!(
      state: :failed,
      last_sync_failure: "#{message}: #{error.message}",
      retry_count: new_retry_count,
      retry_at: next_retry_time(new_retry_count)
    )
  end
end
# frozen_string_literal: true

module Geo
  module Fdw
    # Read-only, FDW-backed view of the primary's container_repositories
    # table, queried from the Geo tracking database.
    class ContainerRepository < ::Geo::BaseFdw
      self.table_name = Gitlab::Geo::Fdw.foreign_table_name('container_repositories')
      self.primary_key = :id

      belongs_to :project, class_name: 'Geo::Fdw::Project', inverse_of: :container_repositories

      scope :project_id_in, ->(ids) { joins(:project).merge(Geo::Fdw::Project.id_in(ids)) }

      class << self
        # Repositories that already have a tracking-registry row.
        # Built with Arel because the registry table lives in the
        # tracking database, joined via FDW.
        def inner_join_container_repository_registry
          join_statement =
            arel_table
              .join(container_repository_registry_table, Arel::Nodes::InnerJoin)
              .on(arel_table[:id].eq(container_repository_registry_table[:container_repository_id]))

          joins(join_statement.join_sources)
        end

        # Repositories with no tracking-registry row yet (never synced).
        def missing_container_repository_registry
          left_outer_join_container_repository_registry
            .where(container_repository_registry_table[:id].eq(nil))
        end

        private

        def container_repository_registry_table
          Geo::ContainerRepositoryRegistry.arel_table
        end

        def left_outer_join_container_repository_registry
          join_statement =
            arel_table
              .join(container_repository_registry_table, Arel::Nodes::OuterJoin)
              .on(arel_table[:id].eq(container_repository_registry_table[:container_repository_id]))

          joins(join_statement.join_sources)
        end
      end
    end
  end
end
......@@ -69,6 +69,12 @@ module Geo
end
end
# Container repositories in scope for this node via the FDW models:
# all of them unless selective sync is configured, otherwise only those
# belonging to the selected projects.
def container_repositories
  return Geo::Fdw::ContainerRepository.all unless selective_sync?

  Geo::Fdw::ContainerRepository.project_id_in(projects)
end
private
def projects_for_selected_namespaces
......
......@@ -12,6 +12,8 @@ module Geo
has_many :job_artifacts, class_name: 'Geo::Fdw::Ci::JobArtifact'
has_many :lfs_objects_projects, class_name: 'Geo::Fdw::LfsObjectsProject'
has_many :lfs_objects, class_name: 'Geo::Fdw::LfsObject', through: :lfs_objects_projects
has_many :container_repositories, class_name: 'Geo::Fdw::ContainerRepository'
belongs_to :namespace, class_name: 'Geo::Fdw::Namespace'
scope :outside_shards, -> (shard_names) { where.not(repository_storage: Array(shard_names)) }
......
......@@ -223,6 +223,12 @@ class GeoNode < ApplicationRecord
Ci::JobArtifact.project_id_in(projects)
end
# Container repositories replicable by this node: everything unless
# selective sync restricts the set to the node's selected projects.
def container_repositories
  return ContainerRepository.all unless selective_sync?

  ContainerRepository.project_id_in(projects)
end
def lfs_objects
return LfsObject.all unless selective_sync?
......
# frozen_string_literal: true
require 'tempfile'
module Geo
  # Replicates a single container repository from the primary registry
  # to the secondary's local registry: pushes missing tags (blobs plus
  # manifest) and deletes tags that no longer exist on the primary.
  class ContainerRepositorySync
    include ExclusiveLeaseGuard

    LEASE_TIMEOUT = 1.hour.freeze

    attr_reader :name, :container_repository

    def initialize(container_repository)
      @container_repository = container_repository
      @name = container_repository.path
    end

    # Runs the sync under an exclusive lease; returns true on completion,
    # nil when the lease could not be obtained.
    def execute
      try_obtain_lease do
        # It makes sense to do this sequentially because in most cases images
        # share some layers so it can save IO ops.
        tags_to_sync.each do |tag|
          sync_tag(tag[:name])
        end

        tags_to_remove.each do |tag|
          container_repository.delete_tag_by_digest(tag[:digest])
        end

        true
      end
    end

    private

    # Pulls each blob of the tag's manifest that is missing locally,
    # pushes it to the secondary registry, then pushes the manifest.
    def sync_tag(tag)
      file = nil
      manifest = client.repository_manifest(name, tag)

      list_blobs(manifest).each do |digest|
        next if container_repository.blob_exists?(digest)

        file = client.pull_blob(name, digest)
        container_repository.push_blob(digest, file.path)
        file.unlink
      end

      container_repository.push_manifest(tag, manifest, manifest['mediaType'])
    ensure
      # Remove the last pulled blob if an error interrupted the loop;
      # Tempfile#unlink is harmless when already unlinked.
      file.try(:unlink)
    end

    # All blob digests referenced by a manifest: the layer digests plus
    # the config digest. `Array()` guards against manifests without a
    # 'layers' key; `compact` drops a missing config digest.
    def list_blobs(manifest)
      layers = Array(manifest['layers']).map do |layer|
        layer['digest']
      end

      layers.push(manifest.dig('config', 'digest')).compact
    end

    # Tags on the primary registry, resolved to { name:, digest: } pairs.
    def primary_tags
      @primary_tags ||= begin
        # The registry API returns "tags": null for a repository without
        # tags (and the client may return nil on error), so default to an
        # empty list instead of raising NoMethodError.
        tags = client.repository_tags(name)&.dig('tags') || []

        tags.map do |tag|
          { name: tag, digest: client.repository_tag_digest(name, tag) }
        end
      end
    end

    # Tags currently present on the secondary, as { name:, digest: }.
    def secondary_tags
      @secondary_tags ||= begin
        container_repository.tags.map do |tag|
          { name: tag.name, digest: tag.digest }
        end
      end
    end

    def tags_to_sync
      primary_tags - secondary_tags
    end

    def tags_to_remove
      secondary_tags - primary_tags
    end

    def lease_key
      @lease_key ||= "#{self.class.name}:#{name}"
    end

    def lease_timeout
      LEASE_TIMEOUT
    end

    # The client for primary registry
    def client
      ContainerRegistry::Client.new(
        Gitlab.config.geo.registry_replication.primary_api_url,
        token: ::Auth::ContainerRegistryAuthenticationService.pull_access_token(name)
      )
    end
  end
end
# frozen_string_literal: true

module Geo
  # Wraps Geo::ContainerRepositorySync with tracking-registry state
  # management: marks the registry started/synced, or failed (with retry
  # backoff) when the sync raises.
  class ContainerRepositorySyncService
    include ::Gitlab::Geo::ContainerRepositoryLogHelpers

    attr_reader :container_repository

    def initialize(container_repository)
      @container_repository = container_repository
    end

    def execute
      return unless Feature.enabled?(:geo_registry_replication)

      log_info('Marking sync as started')
      registry.start_sync!

      Geo::ContainerRepositorySync.new(container_repository).execute

      registry.finish_sync!
      log_info('Finished sync')
    rescue => e
      # Any StandardError marks the registry failed and schedules a retry.
      fail_registry_sync!("Container repository sync failed", e)
    end

    private

    def fail_registry_sync!(message, error)
      log_error(message, error)

      registry.fail_sync!(message, error)
    end

    # Finds or builds the tracking record for this repository.
    # rubocop: disable CodeReuse/ActiveRecord
    def registry
      @registry ||= begin
        Geo::ContainerRepositoryRegistry.find_or_initialize_by(
          container_repository_id: container_repository.id
        )
      end
    end
    # rubocop: enable CodeReuse/ActiveRecord
  end
end
......@@ -12,6 +12,7 @@
- cronjob:geo_scheduler_primary_per_shard_scheduler
- cronjob:geo_scheduler_secondary_per_shard_scheduler
- cronjob:geo_repository_verification_secondary_shard
- cronjob:geo_container_repository_sync_dispatch
- cronjob:historical_data
- cronjob:ldap_all_groups_sync
- cronjob:ldap_sync
......@@ -30,6 +31,7 @@
- geo:geo_hashed_storage_attachments_migration
- geo:geo_hashed_storage_migration
- geo:geo_project_sync
- geo:geo_container_repository_sync
- geo:geo_rename_repository
- geo:geo_repositories_clean_up
- geo:geo_repository_cleanup
......
# frozen_string_literal: true

module Geo
  # Cron-driven scheduler that fans out container repository sync jobs on
  # a Geo secondary, up to the node's configured capacity.
  class ContainerRepositorySyncDispatchWorker < Geo::Scheduler::Secondary::SchedulerWorker
    include CronjobQueue

    def perform
      return unless Feature.enabled?(:geo_registry_replication)

      unless Gitlab.config.geo.registry_replication.enabled
        log_info('Container Registry replication is not enabled')
        return
      end

      # The parent SchedulerWorker drives the scheduling loop using the
      # hooks below (max_capacity, schedule_job, load_pending_resources).
      super
    end

    private

    # Per-node concurrency cap (geo_nodes.container_repositories_max_capacity).
    def max_capacity
      current_node.container_repositories_max_capacity
    end

    def schedule_job(repository_id)
      job_id = Geo::ContainerRepositorySyncWorker.perform_async(repository_id)

      { id: repository_id, job_id: job_id } if job_id
    end

    # Pools for new resources to be transferred
    #
    # @return [Array] resources to be transferred
    def load_pending_resources
      resources = find_unsynced_repositories(batch_size: db_retrieve_batch_size)
      remaining_capacity = db_retrieve_batch_size - resources.size

      if remaining_capacity.zero?
        resources
      else
        # Top up the batch with failed syncs that are due for a retry.
        resources + find_retryable_failed_repositories(batch_size: remaining_capacity)
      end
    end

    def find_unsynced_repositories(batch_size:)
      Geo::ContainerRepositoryRegistryFinder
        .new(current_node_id: current_node.id)
        .find_unsynced(batch_size: batch_size, except_repository_ids: scheduled_repository_ids)
        .pluck_primary_key
    end

    def find_retryable_failed_repositories(batch_size:)
      Geo::ContainerRepositoryRegistryFinder
        .new(current_node_id: current_node.id)
        .find_retryable_failed_ids(batch_size: batch_size, except_repository_ids: scheduled_repository_ids)
    end

    # Repository ids already scheduled in this loop, excluded from queries.
    def scheduled_repository_ids
      scheduled_jobs.map { |data| data[:id] }
    end
  end
end
# frozen_string_literal: true

module Geo
  # Syncs one container repository on a Geo secondary. Scheduled by
  # ContainerRepositorySyncDispatchWorker and by the log cursor when a
  # repository-updated event arrives.
  class ContainerRepositorySyncWorker
    include ApplicationWorker
    include GeoQueue
    include Gitlab::Geo::LogHelpers

    sidekiq_options retry: 3, dead: false
    sidekiq_retry_in { |count| 30 * count } # linear backoff: 30s, 60s, 90s

    sidekiq_retries_exhausted do |msg, _|
      Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
    end

    # @param id [Integer] container_repositories primary key
    def perform(id)
      return unless Feature.enabled?(:geo_registry_replication)

      repository = ContainerRepository.find_by_id(id)

      # The repository may have been deleted between scheduling and
      # execution; log and skip rather than raise.
      if repository.nil?
        log_error("Couldn't find container repository, skipping syncing", container_repository_id: id)
        return
      end

      Geo::ContainerRepositorySyncService.new(repository).execute
    end
  end
end
# frozen_string_literal: true

# Creates the tracking-database table that records per-repository
# replication state on a Geo secondary (state machine plus retry
# bookkeeping). Note: only created_at is stored, no updated_at.
class AddContainerRepositoryRegistry < ActiveRecord::Migration[5.0]
  DOWNTIME = false

  def change
    create_table :container_repository_registry, id: :serial, force: :cascade do |t|
      t.integer :container_repository_id, null: false
      t.string :state
      t.integer :retry_count, default: 0
      t.string :last_sync_failure
      t.datetime :retry_at
      t.datetime :last_synced_at
      t.datetime :created_at, null: false

      t.index :container_repository_id, name: :index_container_repository_registry_on_repository_id, using: :btree
      t.index :retry_at, name: :index_container_repository_registry_on_retry_at, using: :btree
      t.index :state, name: :index_container_repository_registry_on_state, using: :btree
    end
  end
end
......@@ -10,12 +10,25 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20190314201959) do
ActiveRecord::Schema.define(version: 2019_06_12_211021) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
create_table "event_log_states", primary_key: "event_id", id: :bigint, force: :cascade do |t|
create_table "container_repository_registry", id: :serial, force: :cascade do |t|
t.integer "container_repository_id", null: false
t.string "state"
t.integer "retry_count", default: 0
t.string "last_sync_failure"
t.datetime "retry_at"
t.datetime "last_synced_at"
t.datetime "created_at", null: false
t.index ["container_repository_id"], name: "index_container_repository_registry_on_repository_id", using: :btree
t.index ["retry_at"], name: "index_container_repository_registry_on_retry_at", using: :btree
t.index ["state"], name: "index_container_repository_registry_on_state", using: :btree
end
create_table "event_log_states", primary_key: "event_id", force: :cascade do |t|
end
create_table "file_registry", id: :serial, force: :cascade do |t|
......
# frozen_string_literal: true

module EE
  module ContainerRegistry
    # EE extensions to ContainerRegistry::Client used by Geo registry
    # replication to push/pull blobs and manifests between registries.
    module Client
      Error = Class.new(StandardError)

      # In the future we may want to read a small chunks into memory and use chunked upload
      # it will save us some IO.
      def push_blob(name, digest, file_path)
        payload = Faraday::UploadIO.new(file_path, 'application/octet-stream')
        url = get_upload_url(name, digest)
        headers = { 'Content-Type' => 'application/octet-stream', 'Content-Length' => payload.size.to_s }

        response = faraday_upload.put(url, payload, headers)

        raise Error, "Push Blob error: #{response.body}" unless response.success?

        true
      end

      def push_manifest(name, tag, manifest, manifest_type)
        response = faraday.put("v2/#{name}/manifests/#{tag}", manifest, { 'Content-Type' => manifest_type })

        raise Error, "Push manifest error: #{response.body}" unless response.success?

        true
      end

      def blob_exists?(name, digest)
        faraday.head("/v2/#{name}/blobs/#{digest}").success?
      end

      # We currently use Faraday 0.12 which does not support streaming download yet
      # Given that we aim to migrate to HTTP.rb client and that updating Faraday is potentialy
      # dangerous, we use HTTP.rb here
      #
      # Returns a closed Tempfile containing the blob; callers unlink it.
      def pull_blob(name, digest)
        file = Tempfile.new("blob-#{digest}")
        file.binmode # binary blob data; set once instead of per chunk

        response = HTTP
          .headers({ "Authorization" => "Bearer #{@options[:token]}" }) # rubocop:disable Gitlab/ModuleWithInstanceVariables
          .follow
          .get("#{@base_uri}/v2/#{name}/blobs/#{digest}") # rubocop:disable Gitlab/ModuleWithInstanceVariables

        response.body.each do |chunk|
          file.write(chunk)
        end

        file
      ensure
        # Safe-navigate: `file` is nil if Tempfile.new itself raised.
        file&.close
      end

      private

      # Starts an upload session (POST /v2/<name>/blobs/uploads/) and
      # returns the returned Location URL with the blob digest appended,
      # as required to finalize a monolithic upload.
      def get_upload_url(name, digest)
        response = faraday.post("/v2/#{name}/blobs/uploads/")

        raise Error, "Get upload URL error: #{response.body}" unless response.success?

        upload_url = URI(response.headers['location'])
        upload_url.query = "#{upload_url.query}&#{URI.encode_www_form(digest: digest)}"
        upload_url
      end

      def faraday_upload
        @faraday_upload ||= Faraday.new(@base_uri) do |conn| # rubocop:disable Gitlab/ModuleWithInstanceVariables
          initialize_connection(conn, @options) # rubocop:disable Gitlab/ModuleWithInstanceVariables
          conn.request :multipart
          conn.request :url_encoded
          conn.adapter :net_http
        end
      end
    end
  end
end
# frozen_string_literal: true

module Gitlab
  module Geo
    # Adds container-repository context (project id/path, repository
    # name) to the structured log payload. Including classes must expose
    # a `container_repository` reader.
    module ContainerRepositoryLogHelpers
      include LogHelpers

      # @param message [String] human-readable log message
      # @return [Hash] compacted payload for structured logging
      def base_log_data(message)
        {
          class: self.class.name,
          project_id: container_repository.project.id,
          project_path: container_repository.project.full_path,
          container_repository_name: container_repository.name,
          message: message,
          job_id: get_sidekiq_job_id
        }.compact
      end
    end
  end
end
......@@ -17,6 +17,7 @@ module Gitlab
geo_file_download_dispatch_worker
geo_migrated_local_files_clean_up_worker
geo_repository_sync_worker
geo_container_repository_sync_worker
geo_repository_verification_secondary_scheduler_worker
].freeze
......
......@@ -8,6 +8,36 @@ module Gitlab
include BaseEvent
# Handles a container-repository-updated replication event: marks the
# tracking registry as pending and schedules an immediate sync.
def process
  return unless Feature.enabled?(:geo_registry_replication)

  registry.repository_updated!

  job_id = ::Geo::ContainerRepositorySyncWorker.perform_async(event.container_repository_id)

  log_event(job_id)
end
private
# NOTE(review): returns true when registry replication is *enabled*,
# which reads as the inverse of "skippable". It is only used in the log
# payload here — confirm the intended semantics before relying on it.
def skippable?
  !!Gitlab.config.geo.registry_replication.enabled
end
# Finds or builds the tracking record for the event's repository.
# rubocop: disable CodeReuse/ActiveRecord
def registry
  @registry ||= ::Geo::ContainerRepositoryRegistry.find_or_initialize_by(
    container_repository_id: event.container_repository_id
  )
end
# rubocop: enable CodeReuse/ActiveRecord
# Emits a structured log line for the processed event.
# NOTE(review): the `job_id` parameter is accepted but never included in
# the payload — confirm whether it should be logged.
def log_event(job_id)
  logger.event_info(
    created_at,
    'Docker Repository update',
    container_repository_id: registry.container_repository_id,
    skippable: skippable?,
    project: registry.container_repository.project_id)
end
end
end
......
# frozen_string_literal: true

FactoryBot.define do
  # Geo tracking record for container repository replication.
  # NOTE(review): uses static attribute syntax (e.g. `retry_count 2`),
  # which is deprecated in factory_bot >= 4.11 — confirm the gem version
  # pinned by this project.
  factory :container_repository_registry, class: Geo::ContainerRepositoryRegistry do
    container_repository
    last_sync_failure nil
    last_synced_at nil
    state :pending

    trait :started do
      state :started
    end

    trait :synced do
      state :synced
      last_synced_at { 5.days.ago }
    end

    trait :sync_failed do
      state :failed
      last_synced_at { 1.day.ago }
      retry_count 2
      last_sync_failure 'Random error'
    end

    trait :sync_started do
      state :started
      last_synced_at { 1.day.ago }
      retry_count 0
    end
  end
end
......@@ -43,6 +43,10 @@ FactoryBot.define do
trait :cache_invalidation_event do
cache_invalidation_event factory: :geo_cache_invalidation_event
end
trait :container_repository_updated_event do
container_repository_updated_event factory: :geo_container_repository_updated_event
end
end
factory :geo_repository_created_event, class: Geo::RepositoryCreatedEvent do
......
# frozen_string_literal: true
require 'spec_helper'
# Finder specs query the FDW-backed tables, hence the :geo_fdw tag.
describe Geo::ContainerRepositoryRegistryFinder, :geo, :geo_fdw do
  include ::EE::GeoHelpers

  let!(:secondary) { create(:geo_node) }
  let!(:container_repository) { create(:container_repository) }
  # Each registry factory implicitly creates its own container repository.
  let!(:failed_registry) { create(:container_repository_registry, :sync_failed) }
  let!(:synced_registry) { create(:container_repository_registry, :synced) }
  let(:synced_group) { create(:group) }
  let(:unsynced_group) { create(:group) }
  let(:synced_project) { create(:project, group: synced_group) }
  let(:unsynced_project) { create(:project, :broken_storage, group: unsynced_group) }

  subject { described_class.new(current_node_id: secondary.id) }

  before do
    stub_current_geo_node(secondary)
  end

  context 'count all the things' do
    describe '#count_syncable' do
      it 'returns number of container repositories' do
        result = subject.count_syncable

        # 1 standalone repository + 2 created via the registry factories.
        expect(result).to eq(3)
      end
    end

    describe '#count_synced' do
      it 'returns only synced registry' do
        result = subject.count_synced

        expect(result).to eq(1)
      end
    end

    describe '#count_failed' do
      it 'returns only failed registry' do
        result = subject.count_failed

        expect(result).to eq(1)
      end
    end

    describe '#count_registry' do
      it 'returns number of all registries' do
        result = subject.count_registry

        expect(result).to eq(2)
      end
    end
  end

  context 'find all the things' do
    describe '#find_unsynced' do
      it 'returns repositories without an entry in the tracking database' do
        repositories = subject.find_unsynced(batch_size: 10)

        expect(repositories).to match_ids(container_repository)
      end

      it 'returns repositories without an entry in the tracking database, excluding exception list' do
        except_repository = create(:container_repository)

        repositories = subject.find_unsynced(batch_size: 10, except_repository_ids: [except_repository.id])

        expect(repositories).to match_ids(container_repository)
      end

      context 'with selective sync by namespace' do
        let(:secondary) { create(:geo_node, selective_sync_type: 'namespaces', namespaces: [synced_group]) }

        it 'returns repositories without an entry in the tracking database, excluding exception list' do
          except_repository = create(:container_repository, project: synced_project)
          repository = create(:container_repository, project: synced_project, name: 'second')

          repositories = subject.find_unsynced(batch_size: 10, except_repository_ids: [except_repository.id])

          expect(repositories).to match_ids(repository)
        end
      end

      context 'with selective sync by shard' do
        # unsynced_project uses :broken_storage, which places it on the
        # 'broken' shard selected below.
        let(:secondary) { create(:geo_node, selective_sync_type: 'shards', selective_sync_shards: ['broken']) }

        it 'returns repositories without an entry in the tracking database' do
          unsynced_repository = create(:container_repository, project: unsynced_project)

          repositories = subject.find_unsynced(batch_size: 10)

          expect(repositories).to match_ids(unsynced_repository)
        end
      end
    end

    describe '#find_retryable_failed_ids' do
      it 'returns only registry that have to be retried' do
        result = subject.find_retryable_failed_ids(batch_size: 10)

        expect(result).to eq([failed_registry.container_repository_id])
      end

      it 'returns only registry that have to be retried, excluding exception list' do
        except_repository = create(:container_repository)
        create(:container_repository_registry, :sync_failed, container_repository: except_repository)

        result = subject.find_retryable_failed_ids(batch_size: 10, except_repository_ids: [except_repository.id])

        expect(result).to eq([failed_registry.container_repository_id])
      end
    end
  end
end
# coding: utf-8
# frozen_string_literal: true
require 'spec_helper'
# Specs for the EE push/pull extensions to the registry client
# (blob upload, manifest upload, blob existence check).
describe ContainerRegistry::Client do
  let(:token) { '12345' }
  let(:options) { { token: token } }
  let(:client) { described_class.new("http://registry", options) }

  describe '#push_blob' do
    # Fixture whose 3-byte content matches the stubbed Content-Length.
    let(:file) do
      file = Tempfile.new('test1')
      file.write('bla')
      file.close
      file
    end

    it 'PUT /v2/:name/blobs/uploads/url?digest=mytag' do
      stub_request(:put, "http://registry/v2/group/test/blobs/uploads/abcd?digest=mytag")
        .with(
          headers: {
            'Authorization' => 'bearer 12345',
            'Content-Length' => '3',
            'Content-Type' => 'application/octet-stream'
          })
        .to_return(status: 200, body: "", headers: {})

      # The upload-session POST returns the Location the blob is PUT to.
      stub_request(:post, "http://registry/v2/group/test/blobs/uploads/")
        .with(
          headers: {
            'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
            'Authorization' => 'bearer 12345'
          })
        .to_return(status: 200, body: "", headers: { 'Location' => 'http://registry/v2/group/test/blobs/uploads/abcd' })

      expect(client.push_blob('group/test', 'mytag', file.path)).to eq(true)
    end

    it 'raises error if response status is not 200' do
      stub_request(:put, "http://registry/v2/group/test/blobs/uploads/abcd?digest=mytag")
        .with(
          headers: {
            'Authorization' => 'bearer 12345',
            'Content-Length' => '3',
            'Content-Type' => 'application/octet-stream'
          })
        .to_return(status: 404, body: "", headers: {})

      stub_request(:post, "http://registry/v2/group/test/blobs/uploads/")
        .with(
          headers: {
            'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
            'Authorization' => 'bearer 12345'
          })
        .to_return(status: 200, body: "", headers: { 'Location' => 'http://registry/v2/group/test/blobs/uploads/abcd' })

      expect { client.push_blob('group/test', 'mytag', file.path) }
        .to raise_error(EE::ContainerRegistry::Client::Error)
    end
  end

  describe '#push_manifest' do
    let(:manifest) { 'manifest' }
    let(:manifest_type) { 'application/vnd.docker.distribution.manifest.v2+json' }

    it 'PUT v2/:name/manifests/:tag' do
      stub_request(:put, "http://registry/v2/group/test/manifests/my-tag")
        .with(
          body: "manifest",
          headers: {
            'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
            'Authorization' => 'bearer 12345',
            'Content-Type' => 'application/vnd.docker.distribution.manifest.v2+json'
          })
        .to_return(status: 200, body: "", headers: {})

      expect(client.push_manifest('group/test', 'my-tag', manifest, manifest_type)).to eq(true)
    end

    it 'raises error if response status is not 200' do
      stub_request(:put, "http://registry/v2/group/test/manifests/my-tag")
        .with(
          body: "manifest",
          headers: {
            'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
            'Authorization' => 'bearer 12345',
            'Content-Type' => 'application/vnd.docker.distribution.manifest.v2+json'
          })
        .to_return(status: 404, body: "", headers: {})

      expect { client.push_manifest('group/test', 'my-tag', manifest, manifest_type) }
        .to raise_error(EE::ContainerRegistry::Client::Error)
    end
  end

  describe '#blob_exists?' do
    let(:digest) { 'digest' }

    it 'returns true' do
      stub_request(:head, "http://registry/v2/group/test/blobs/digest")
        .with(
          headers: {
            'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
            'Authorization' => 'bearer 12345'
          })
        .to_return(status: 200, body: "", headers: {})

      expect(client.blob_exists?('group/test', digest)).to eq(true)
    end

    it 'returns false' do
      stub_request(:head, "http://registry/v2/group/test/blobs/digest")
        .with(
          headers: {
            'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
            'Authorization' => 'bearer 12345'
          })
        .to_return(status: 404, body: "", headers: {})

      expect(client.blob_exists?('group/test', digest)).to eq(false)
    end
  end
end
......@@ -33,6 +33,7 @@ describe Gitlab::Geo::CronManager, :geo do
geo_repository_verification_primary_batch_worker
geo_repository_sync_worker
geo_file_download_dispatch_worker
geo_container_repository_sync_worker
geo_repository_verification_secondary_scheduler_worker
geo_metrics_update_worker
geo_prune_event_log_worker
......@@ -55,6 +56,7 @@ describe Gitlab::Geo::CronManager, :geo do
[
job('geo_file_download_dispatch_worker'),
job('geo_repository_sync_worker'),
job('geo_container_repository_sync_worker'),
job('geo_repository_verification_secondary_scheduler_worker'),
job('geo_migrated_local_files_clean_up_worker')
]
......
# frozen_string_literal: true
require 'spec_helper'
# Log-cursor handling of container-repository-updated events on a
# Geo secondary.
describe Gitlab::Geo::LogCursor::Events::ContainerRepositoryUpdatedEvent, :postgresql, :clean_gitlab_redis_shared_state do
  let(:logger) { Gitlab::Geo::LogCursor::Logger.new(described_class, Logger::INFO) }
  let(:event_log) { create(:geo_event_log, :container_repository_updated_event) }
  let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
  let(:container_repository_updated_event) { event_log.container_repository_updated_event }
  # Fixed typo: this let was previously misspelled `container_repositoy`.
  let(:container_repository) { container_repository_updated_event.container_repository }

  subject { described_class.new(container_repository_updated_event, Time.now, logger) }

  around do |example|
    Sidekiq::Testing.fake! { example.run }
  end

  describe '#process' do
    it 'does not create a new project registry' do
      expect { subject.process }.not_to change(Geo::ProjectRegistry, :count)
    end

    it 'schedules a Geo::ContainerRepositorySyncWorker' do
      expect(::Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
        .with(container_repository.id)

      subject.process
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Model spec for the Geo container repository tracking record:
# associations, scopes, and state-machine events.
describe Geo::ContainerRepositoryRegistry, :geo do
  set(:container_repository_registry) { create(:container_repository_registry) }

  describe 'relationships' do
    it { is_expected.to belong_to(:container_repository) }
  end

  describe 'scopes' do
    describe '.repository_id_not_in' do
      it 'returns registries scoped by ids' do
        registry1 = create(:container_repository_registry)
        registry2 = create(:container_repository_registry)

        container_repository1_id = registry1.container_repository_id
        container_repository2_id = registry2.container_repository_id

        result = described_class.repository_id_not_in([container_repository1_id, container_repository2_id])

        expect(result).to match_ids([container_repository_registry])
      end
    end
  end

  describe '#start_sync!' do
    it 'updates last_synced_at' do
      expect { container_repository_registry.start_sync! }
        .to change { container_repository_registry.reload.last_synced_at }
    end
  end

  describe '#finish_sync!' do
    it 'finishes registry record' do
      container_repository_registry = create(:container_repository_registry, :sync_started)

      container_repository_registry.finish_sync!

      # Success clears all retry bookkeeping.
      expect(container_repository_registry.reload).to have_attributes(
        retry_count: 0,
        retry_at: nil,
        last_sync_failure: nil,
        state: 'synced'
      )
    end
  end

  describe '#fail_sync!' do
    it 'fails registry record' do
      error = StandardError.new('Something is wrong')

      container_repository_registry.fail_sync!('Failed', error)

      expect(container_repository_registry).to have_attributes(
        retry_count: 1,
        retry_at: be_present,
        last_sync_failure: 'Failed: Something is wrong',
        state: 'failed'
      )
    end
  end

  describe '#repository_updated!' do
    set(:container_repository_registry) { create(:container_repository_registry, :synced) }

    it 'resets the state of the sync' do
      container_repository_registry.repository_updated!

      expect(container_repository_registry.pending?).to be true
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

# FDW read-only model spec: the foreign-data-wrapper view of container
# repositories must expose the project association.
RSpec.describe Geo::Fdw::ContainerRepository, :geo, type: :model do
  describe 'relationships' do
    it { is_expected.to belong_to(:project).class_name('Geo::Fdw::Project') }
  end
end
......@@ -7,6 +7,7 @@ RSpec.describe Geo::Fdw::Project, :geo, type: :model do
it { is_expected.to have_many(:job_artifacts).class_name('Geo::Fdw::Ci::JobArtifact') }
it { is_expected.to have_many(:lfs_objects_projects).class_name('Geo::Fdw::LfsObjectsProject') }
it { is_expected.to have_many(:lfs_objects).class_name('Geo::Fdw::LfsObject').through(:lfs_objects_projects) }
it { is_expected.to have_many(:container_repositories).class_name('Geo::Fdw::ContainerRepository') }
end
describe '.search' do
......
......@@ -36,7 +36,7 @@ describe API::ContainerRegistryEvent do
end
it 'returns 401 error status when feature is disabled' do
allow(Feature).to receive(:enabled?).with(:geo_registry_replication).and_return(false)
stub_feature_flags(geo_registry_replication: false)
expect(::ContainerRegistry::EventHandler).not_to receive(:new)
......
# frozen_string_literal: true

require 'spec_helper'

# Service spec: Geo::ContainerRepositorySyncService wraps
# Geo::ContainerRepositorySync and records the outcome (synced/failed)
# on the corresponding Geo::ContainerRepositoryRegistry row.
describe Geo::ContainerRepositorySyncService, :geo do
  include ::EE::GeoHelpers
  include ExclusiveLeaseHelpers

  set(:secondary) { create(:geo_node) }

  before do
    stub_current_geo_node(secondary)
  end

  describe '#execute' do
    # NOTE(review): sibling specs use the `:sync_started` trait — confirm
    # `:started` is an equivalent trait on this factory.
    let(:container_repository_registry) { create(:container_repository_registry, :started) }

    it 'fails registry record if there was exception' do
      allow_any_instance_of(Geo::ContainerRepositorySync)
        .to receive(:execute).and_raise 'Sync Error'

      described_class.new(container_repository_registry.container_repository).execute

      expect(container_repository_registry.reload.failed?).to be_truthy
    end

    it 'finishes registry record if there was no exception' do
      expect_any_instance_of(Geo::ContainerRepositorySync)
        .to receive(:execute)

      described_class.new(container_repository_registry.container_repository).execute

      expect(container_repository_registry.reload.synced?).to be_truthy
    end

    # The service is expected to create the registry row on the fly when one
    # does not exist yet for the repository being synced.
    it 'finishes registry record if there was no exception and registry does not exist' do
      container_repository = create(:container_repository)

      expect_any_instance_of(Geo::ContainerRepositorySync)
        .to receive(:execute)

      described_class.new(container_repository).execute

      registry = Geo::ContainerRepositoryRegistry.find_by(container_repository_id: container_repository.id)

      expect(registry.synced?).to be_truthy
    end

    it 'does not do anything if feature is disabled' do
      stub_feature_flags(geo_registry_replication: false)

      expect_any_instance_of(Geo::ContainerRepositorySync)
        .not_to receive(:execute)

      result = described_class.new(container_repository_registry.container_repository).execute

      expect(result).to be_nil
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Verifies tag reconciliation between the Geo primary registry and the
# secondary's local registry: the primary holds 'tag-to-sync' (must be
# pulled) while the secondary holds 'obsolete' (must be deleted by digest).
describe Geo::ContainerRepositorySync, :geo do
let(:group) { create(:group, name: 'group') }
let(:project) { create(:project, path: 'test', group: group) }
let(:container_repository) do
create(:container_repository, name: 'my_image', project: project)
end
before do
# Local (secondary) registry configuration; requests to it use
# 'bearer token' below.
stub_container_registry_config(enabled: true,
api_url: 'http://registry.gitlab',
host_port: 'registry.gitlab')
# Primary registry this secondary replicates from; the helper also stubs
# the pull access token to 'pull-token'.
stub_registry_replication_config(enabled: true,
primary_api_url: 'http://primary.registry.gitlab')
# Secondary's tag list: only 'obsolete' exists locally.
stub_request(:get, "http://registry.gitlab/v2/group/test/my_image/tags/list")
.with(
headers: {
'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
'Authorization' => 'bearer token'
})
.to_return(
status: 200,
body: JSON.dump(tags: %w(obsolete)),
headers: { 'Content-Type' => 'application/json' })
# Primary's tag list: 'tag-to-sync' should be replicated down.
stub_request(:get, "http://primary.registry.gitlab/v2/group/test/my_image/tags/list")
.with(
headers: { 'Authorization' => 'bearer pull-token' })
.to_return(
status: 200,
body: JSON.dump(tags: %w(tag-to-sync)),
headers: { 'Content-Type' => 'application/json' })
# Manifest HEAD on the primary resolves 'tag-to-sync' to its digest.
stub_request(:head, "http://primary.registry.gitlab/v2/group/test/my_image/manifests/tag-to-sync")
.with(
headers: {
'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
'Authorization' => 'bearer pull-token'
})
.to_return(status: 200, body: "", headers: { 'docker-content-digest' => 'sha256:ccccc' })
# Manifest HEAD on the secondary resolves 'obsolete' to the digest the
# example below expects to be deleted.
stub_request(:head, "http://registry.gitlab/v2/group/test/my_image/manifests/obsolete")
.with(
headers: {
'Accept' => 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json',
'Authorization' => 'bearer token'
})
.to_return(status: 200, body: "", headers: { 'docker-content-digest' => 'sha256:aaaaa' })
end
describe 'execute' do
it 'determines list of tags to sync and to remove correctly' do
# 'obsolete' is only on the secondary -> removed via its digest;
# 'tag-to-sync' is only on the primary -> synced.
expect(container_repository).to receive(:delete_tag_by_digest).with('sha256:aaaaa')
expect_any_instance_of(described_class).to receive(:sync_tag)
described_class.new(container_repository).execute
end
end
end
......@@ -5,5 +5,11 @@ module EE
.to receive(:blob_data_at).with(sha, '.gitlab/.gitlab-webide.yml')
.and_return(content)
end
# Spec helper: overrides Geo registry replication settings for the duration
# of an example and stubs the pull access token used to authenticate against
# the primary registry.
#
# registry_settings - Hash of settings stubbed onto
#                     Gitlab.config.geo.registry_replication
#                     (e.g. enabled:, primary_api_url:)
def stub_registry_replication_config(registry_settings)
allow(::Gitlab.config.geo.registry_replication).to receive_messages(registry_settings)
# Fixed token so request stubs can match 'bearer pull-token'.
allow(Auth::ContainerRegistryAuthenticationService)
.to receive(:pull_access_token).and_return('pull-token')
end
end
end
# frozen_string_literal: true

require 'spec_helper'

# Dispatch-worker spec: the worker selects which container repositories to
# sync on a Geo secondary (unsynced, failed-and-due) and schedules one
# Geo::ContainerRepositorySyncWorker per repository.
describe Geo::ContainerRepositorySyncDispatchWorker, :geo, :geo_fdw do
  include ::EE::GeoHelpers
  include ExclusiveLeaseHelpers

  let(:primary) { create(:geo_node, :primary) }
  let(:secondary) { create(:geo_node) }

  before do
    stub_current_geo_node(secondary)
    stub_exclusive_lease(renew: true)
    allow_any_instance_of(described_class).to receive(:over_time?).and_return(false)
    stub_registry_replication_config(enabled: true)
  end

  it 'does not schedule anything when tracking database is not configured' do
    create(:container_repository)

    allow(Gitlab::Geo).to receive(:geo_database_configured?) { false }

    expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)

    subject.perform

    # We need to unstub here or the DatabaseCleaner will have issues since it
    # will appear as though the tracking DB were not available
    allow(Gitlab::Geo).to receive(:geo_database_configured?).and_call_original
  end

  it 'does not schedule anything when feature is disabled' do
    create(:container_repository)

    stub_feature_flags(geo_registry_replication: false)

    expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)

    subject.perform
  end

  it 'does not schedule anything when node is disabled' do
    create(:container_repository)

    secondary.enabled = false
    secondary.save! # save! so a validation failure surfaces instead of silently passing

    expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)

    subject.perform
  end

  context 'Sync condition' do
    let(:container_repository) { create(:container_repository) }

    it 'performs Geo::ContainerRepositorySyncWorker' do
      expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(container_repository.id)

      subject.perform
    end

    it 'performs Geo::ContainerRepositorySyncWorker for failed syncs' do
      container_repository_registry = create(:container_repository_registry, :sync_failed)

      expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
        .with(container_repository_registry.container_repository_id).once.and_return(spy)

      subject.perform
    end

    it 'does not perform Geo::ContainerRepositorySyncWorker for synced repositories' do
      create(:container_repository_registry, :synced)

      expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async)

      subject.perform
    end

    context 'with a failed sync' do
      let(:failed_registry) { create(:container_repository_registry, :sync_failed) }

      it 'does not stall backfill' do
        unsynced = create(:container_repository)

        # One registry per batch, so the failed entry cannot occupy the whole
        # batch and starve the unsynced repository.
        stub_const('Geo::Scheduler::SchedulerWorker::DB_RETRIEVE_BATCH_SIZE', 1)

        expect(Geo::ContainerRepositorySyncWorker).not_to receive(:perform_async).with(failed_registry.container_repository_id)
        expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async).with(unsynced.id)

        subject.perform
      end

      it 'does not retry failed files when retry_at is tomorrow' do
        failed_registry = create(:container_repository_registry, :sync_failed, retry_at: Date.tomorrow)

        # Layout fix: removed stray space inside the parentheses.
        expect(Geo::ContainerRepositorySyncWorker)
          .not_to receive(:perform_async).with(failed_registry.container_repository_id)

        subject.perform
      end

      it 'retries failed files when retry_at is in the past' do
        failed_registry = create(:container_repository_registry, :sync_failed, retry_at: Date.yesterday)

        expect(Geo::ContainerRepositorySyncWorker)
          .to receive(:perform_async).with(failed_registry.container_repository_id)

        subject.perform
      end
    end
  end

  context 'when node has namespace restrictions', :request_store do
    let(:synced_group) { create(:group) }
    let(:project_in_synced_group) { create(:project, group: synced_group) }
    let(:unsynced_project) { create(:project) }

    before do
      secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])

      allow(ProjectCacheWorker).to receive(:perform_async).and_return(true)
      allow(::Gitlab::Geo).to receive(:current_node).and_call_original
      Rails.cache.write(:current_node, secondary.to_json)
      allow(::GeoNode).to receive(:current_node).and_return(secondary)
    end

    it 'does not perform Geo::ContainerRepositorySyncWorker for repositories that do not belong to selected namespaces' do
      container_repository = create(:container_repository, project: project_in_synced_group)

      create(:container_repository, project: unsynced_project)

      expect(Geo::ContainerRepositorySyncWorker).to receive(:perform_async)
        .with(container_repository.id).once.and_return(spy)

      subject.perform
    end
  end
end
# frozen_string_literal: true

require 'spec_helper'

# Worker spec: the per-repository sync worker delegates all work to
# Geo::ContainerRepositorySyncService and logs missing repositories.
describe Geo::ContainerRepositorySyncWorker, :geo do
  describe '#perform' do
    it 'runs ContainerRepositorySyncService' do
      repository = create(:container_repository)
      sync_service = spy(:service)

      expect(Geo::ContainerRepositorySyncService)
        .to receive(:new).with(repository).and_return(sync_service)

      described_class.new.perform(repository.id)

      expect(sync_service).to have_received(:execute)
    end

    it 'does not run ContainerRepositorySyncService if feature disabled' do
      stub_feature_flags(geo_registry_replication: false)
      repository = create(:container_repository)

      expect(Geo::ContainerRepositorySyncService).not_to receive(:new).with(repository)

      described_class.new.perform(repository.id)
    end

    it 'logs error when repository does not exist' do
      worker = described_class.new

      expect(worker).to receive(:log_error)
        .with("Couldn't find container repository, skipping syncing", container_repository_id: 20)
      expect(Geo::ContainerRepositorySyncService).not_to receive(:new)

      worker.perform(20)
    end
  end
end
......@@ -110,3 +110,5 @@ module ContainerRegistry
end
end
end
ContainerRegistry::Client.prepend_if_ee('EE::ContainerRegistry::Client')
......@@ -2,7 +2,7 @@
FactoryBot.define do
factory :container_repository do
name 'test_image'
sequence(:name) { |n| "test_image_#{n}" }
project
transient do
......
......@@ -145,6 +145,19 @@ describe Auth::ContainerRegistryAuthenticationService do
it_behaves_like 'not a container repository factory'
end
# The pull-only counterpart of the full access token: the JWT returned by
# .pull_access_token should grant only the 'pull' action on the given path.
describe '#pull_access_token' do
let(:project) { create(:project) }
let(:token) { described_class.pull_access_token(project.full_path) }
subject { { token: token } }
# Shared example asserting the token grants exactly these actions.
it_behaves_like 'an accessible' do
let(:actions) { ['pull'] }
end
it_behaves_like 'not a container repository factory'
end
context 'user authorization' do
let(:current_user) { create(:user) }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment