Commit 8f44590a authored by Stan Hu's avatar Stan Hu Committed by Nick Thomas

Geo: Migrate CI artifacts into their own registry table

parent 6b8d2328
...@@ -18,6 +18,7 @@ ActiveSupport::Inflector.inflections do |inflect| ...@@ -18,6 +18,7 @@ ActiveSupport::Inflector.inflections do |inflect|
project_auto_devops project_auto_devops
project_registry project_registry
file_registry file_registry
job_artifact_registry
) )
inflect.acronym 'EE' inflect.acronym 'EE'
end end
module Geo module Geo
class JobArtifactRegistryFinder < FileRegistryFinder class JobArtifactRegistryFinder < RegistryFinder
def count_local_job_artifacts def count_local_job_artifacts
local_job_artifacts.count local_job_artifacts.count
end end
...@@ -21,25 +21,25 @@ module Geo ...@@ -21,25 +21,25 @@ module Geo
end end
def count_registry_job_artifacts def count_registry_job_artifacts
Geo::FileRegistry.job_artifacts.count Geo::JobArtifactRegistry.count
end end
# Find limited amount of non replicated lfs objects. # Find limited amount of non replicated lfs objects.
# #
# You can pass a list with `except_file_ids:` so you can exclude items you # You can pass a list with `except_artifact_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet # already scheduled but haven't finished and aren't persisted to the database yet
# #
# TODO: Alternative here is to use some sort of window function with a cursor instead # TODO: Alternative here is to use some sort of window function with a cursor instead
# of simply limiting the query and passing a list of items we don't want # of simply limiting the query and passing a list of items we don't want
# #
# @param [Integer] batch_size used to limit the results returned # @param [Integer] batch_size used to limit the results returned
# @param [Array<Integer>] except_file_ids ids that will be ignored from the query # @param [Array<Integer>] except_artifact_ids ids that will be ignored from the query
def find_unsynced_job_artifacts(batch_size:, except_file_ids: []) def find_unsynced_job_artifacts(batch_size:, except_artifact_ids: [])
relation = relation =
if use_legacy_queries? if use_legacy_queries?
legacy_find_unsynced_job_artifacts(except_file_ids: except_file_ids) legacy_find_unsynced_job_artifacts(except_artifact_ids: except_artifact_ids)
else else
fdw_find_unsynced_job_artifacts(except_file_ids: except_file_ids) fdw_find_unsynced_job_artifacts(except_artifact_ids: except_artifact_ids)
end end
relation.limit(batch_size) relation.limit(batch_size)
...@@ -68,13 +68,21 @@ module Geo ...@@ -68,13 +68,21 @@ module Geo
job_artifacts.with_files_stored_locally job_artifacts.with_files_stored_locally
end end
def find_synced_job_artifacts_registries
Geo::JobArtifactRegistry.synced
end
def find_failed_job_artifacts_registries
Geo::JobArtifactRegistry.failed
end
private private
def find_synced_job_artifacts def find_synced_job_artifacts
if use_legacy_queries? if use_legacy_queries?
legacy_find_synced_job_artifacts legacy_find_synced_job_artifacts
else else
fdw_find_job_artifacts.merge(Geo::FileRegistry.synced) fdw_find_job_artifacts.merge(find_synced_job_artifacts_registries)
end end
end end
...@@ -82,7 +90,7 @@ module Geo ...@@ -82,7 +90,7 @@ module Geo
if use_legacy_queries? if use_legacy_queries?
legacy_find_failed_job_artifacts legacy_find_failed_job_artifacts
else else
fdw_find_job_artifacts.merge(Geo::FileRegistry.failed) fdw_find_job_artifacts.merge(find_synced_job_artifacts_registries)
end end
end end
...@@ -91,25 +99,23 @@ module Geo ...@@ -91,25 +99,23 @@ module Geo
# #
def fdw_find_job_artifacts def fdw_find_job_artifacts
fdw_job_artifacts.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_job_artifacts_table}.id") fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
.with_files_stored_locally .with_files_stored_locally
.merge(Geo::FileRegistry.job_artifacts)
end end
def fdw_find_unsynced_job_artifacts(except_file_ids:) def fdw_find_unsynced_job_artifacts(except_artifact_ids:)
fdw_job_artifacts.joins("LEFT OUTER JOIN file_registry fdw_job_artifacts.joins("LEFT OUTER JOIN job_artifact_registry
ON file_registry.file_id = #{fdw_job_artifacts_table}.id ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
AND file_registry.file_type = 'job_artifact'")
.with_files_stored_locally .with_files_stored_locally
.where(file_registry: { id: nil }) .where(job_artifact_registry: { id: nil })
.where.not(id: except_file_ids) .where.not(id: except_artifact_ids)
end end
def fdw_find_migrated_local_job_artifacts(except_file_ids:) def fdw_find_migrated_local_job_artifacts(except_file_ids:)
fdw_job_artifacts.joins("INNER JOIN file_registry ON file_registry.file_id = #{fdw_job_artifacts_table}.id") fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
.with_files_stored_remotely .with_files_stored_remotely
.where.not(id: except_file_ids) .where.not(id: except_file_ids)
.merge(Geo::FileRegistry.job_artifacts) .merge(Geo::JobArtifactRegistry.all)
end end
def fdw_job_artifacts def fdw_job_artifacts
...@@ -131,7 +137,7 @@ module Geo ...@@ -131,7 +137,7 @@ module Geo
def legacy_find_synced_job_artifacts def legacy_find_synced_job_artifacts
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
local_job_artifacts, local_job_artifacts,
Geo::FileRegistry.job_artifacts.synced.pluck(:file_id), find_synced_job_artifacts_registries.pluck(:artifact_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
...@@ -139,23 +145,28 @@ module Geo ...@@ -139,23 +145,28 @@ module Geo
def legacy_find_failed_job_artifacts def legacy_find_failed_job_artifacts
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
local_job_artifacts, local_job_artifacts,
Geo::FileRegistry.job_artifacts.failed.pluck(:file_id), find_failed_job_artifacts_registries.pluck(:artifact_id),
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_find_unsynced_job_artifacts(except_file_ids:) def legacy_find_unsynced_job_artifacts(except_artifact_ids:)
registry_file_ids = legacy_pluck_registry_file_ids(file_types: :job_artifact) | except_file_ids registry_artifact_ids = legacy_pluck_artifact_ids(include_registry_ids: except_artifact_ids)
legacy_left_outer_join_registry_ids( legacy_left_outer_join_registry_ids(
local_job_artifacts, local_job_artifacts,
registry_file_ids, registry_artifact_ids,
Ci::JobArtifact Ci::JobArtifact
) )
end end
def legacy_pluck_artifact_ids(include_registry_ids:)
ids = Geo::JobArtifactRegistry.pluck(:artifact_id)
(ids + include_registry_ids).uniq
end
def legacy_find_migrated_local_job_artifacts(except_file_ids:) def legacy_find_migrated_local_job_artifacts(except_file_ids:)
registry_file_ids = Geo::FileRegistry.job_artifacts.pluck(:file_id) - except_file_ids registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_file_ids
legacy_inner_join_registry_ids( legacy_inner_join_registry_ids(
job_artifacts.with_files_stored_remotely, job_artifacts.with_files_stored_remotely,
......
module Geo::Syncable
extend ActiveSupport::Concern
included do
scope :failed, -> { where(success: false) }
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.now) }
end
end
class Geo::FileRegistry < Geo::BaseRegistry class Geo::FileRegistry < Geo::BaseRegistry
scope :failed, -> { where(success: false) } include Geo::Syncable
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.now) }
scope :lfs_objects, -> { where(file_type: :lfs) } scope :lfs_objects, -> { where(file_type: :lfs) }
scope :job_artifacts, -> { where(file_type: :job_artifact) }
scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) } scope :attachments, -> { where(file_type: Geo::FileService::DEFAULT_OBJECT_TYPES) }
end end
class Geo::JobArtifactRegistry < Geo::BaseRegistry
include Geo::Syncable
end
...@@ -29,10 +29,15 @@ module Geo ...@@ -29,10 +29,15 @@ module Geo
end end
def update_registry(bytes_downloaded, success:) def update_registry(bytes_downloaded, success:)
transfer = Geo::FileRegistry.find_or_initialize_by( transfer =
file_type: object_type, if object_type.to_sym == :job_artifact
file_id: object_db_id Geo::JobArtifactRegistry.find_or_initialize_by(artifact_id: object_db_id)
) else
Geo::FileRegistry.find_or_initialize_by(
file_type: object_type,
file_id: object_db_id
)
end
transfer.bytes = bytes_downloaded transfer.bytes = bytes_downloaded
transfer.success = success transfer.success = success
......
...@@ -11,7 +11,7 @@ module Geo ...@@ -11,7 +11,7 @@ module Geo
log_info('Lease obtained') log_info('Lease obtained')
unless file_registry unless file_registry
log_error('Could not find file_registry', type: object_type, id: object_db_id) log_error('Could not find file_registry')
return return
end end
...@@ -34,7 +34,11 @@ module Geo ...@@ -34,7 +34,11 @@ module Geo
def file_registry def file_registry
strong_memoize(:file_registry) do strong_memoize(:file_registry) do
::Geo::FileRegistry.find_by(file_type: object_type, file_id: object_db_id) if object_type.to_sym == :job_artifact
::Geo::JobArtifactRegistry.find_by(artifact_id: object_db_id)
else
::Geo::FileRegistry.find_by(file_type: object_type, file_id: object_db_id)
end
end end
end end
......
...@@ -65,14 +65,22 @@ module Geo ...@@ -65,14 +65,22 @@ module Geo
end end
def find_unsynced_job_artifacts_ids(batch_size:) def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_file_ids: scheduled_file_ids(:job_artifact)) job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id) .pluck(:id)
.map { |id| [:job_artifact, id] } .map { |id| [:job_artifact, id] }
end end
def find_failed_upload_object_ids(batch_size:) def find_failed_upload_object_ids(batch_size:)
file_registry_finder.find_failed_file_registries(batch_size: batch_size) file_ids = file_registry_finder.find_failed_file_registries(batch_size: batch_size)
.pluck(:file_type, :file_id) .pluck(:file_type, :file_id)
artifact_ids = find_failed_artifact_ids(batch_size: batch_size)
take_batch(file_ids, artifact_ids)
end
def find_failed_artifact_ids(batch_size:)
job_artifacts_finder.find_failed_job_artifacts_registries.limit(batch_size)
.pluck(:artifact_id).map { |id| [:job_artifact, id] }
end end
def scheduled_file_ids(file_types) def scheduled_file_ids(file_types)
......
---
title: 'Geo: Migrate CI job artifacts into their own registry table'
merge_request:
author:
type: performance
class MigrateCiJobArtifactsToSeparateRegistry < ActiveRecord::Migration
def up
tracking_db.create_table :job_artifact_registry, force: :cascade do |t|
t.datetime_with_timezone "created_at"
t.datetime_with_timezone "retry_at"
t.integer "bytes", limit: 8
t.integer "artifact_id", unique: true
t.integer "retry_count"
t.boolean "success"
t.string "sha256"
end
Geo::TrackingBase.transaction do
execute('LOCK TABLE file_registry IN EXCLUSIVE MODE')
execute <<~EOF
INSERT INTO job_artifact_registry (created_at, retry_at, artifact_id, bytes, retry_count, success, sha256)
SELECT created_at, retry_at, file_id, bytes, retry_count, success, sha256
FROM file_registry WHERE file_type = 'job_artifact'
EOF
execute <<~EOF
CREATE OR REPLACE FUNCTION replicate_job_artifact_registry()
RETURNS trigger AS
$BODY$
BEGIN
IF (TG_OP = 'UPDATE') THEN
UPDATE job_artifact_registry SET (retry_at, bytes, retry_count, success, sha256) = (NEW.retry_at, NEW.bytes, NEW.retry_count, NEW.success, NEW.sha256);
ELSEIF (TG_OP = 'INSERT') THEN
INSERT INTO job_artifact_registry (created_at, retry_at, artifact_id, bytes, retry_count, success, sha256)
VALUES (NEW.created_at, NEW.retry_at, NEW.file_id, NEW.bytes, NEW.retry_count, NEW.success, NEW.sha256);
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE 'plpgsql'
VOLATILE;
EOF
execute <<~EOF
CREATE TRIGGER replicate_job_artifact_registry
AFTER INSERT OR UPDATE ON file_registry
FOR EACH ROW WHEN (NEW.file_type = 'job_artifact') EXECUTE PROCEDURE replicate_job_artifact_registry();
EOF
end
tracking_db.add_index :job_artifact_registry, :retry_at
tracking_db.add_index :job_artifact_registry, :success
end
def down
tracking_db.drop_table :job_artifact_registry
execute('DROP TRIGGER IF EXISTS replicate_job_artifact_registry ON file_registry')
execute('DROP FUNCTION IF EXISTS replicate_job_artifact_registry()')
end
def execute(statement)
tracking_db.execute(statement)
end
def tracking_db
Geo::TrackingBase.connection
end
end
class DeleteJobArtifactsFromFileRegistry < ActiveRecord::Migration
def up
execute("DELETE FROM file_registry WHERE file_type = 'job_artifact'")
execute('DROP TRIGGER IF EXISTS replicate_job_artifact_registry ON file_registry')
execute('DROP FUNCTION IF EXISTS replicate_job_artifact_registry()')
end
end
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
# #
# It's strongly recommended that you check this file into your version control system. # It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20180320013929) do ActiveRecord::Schema.define(version: 20180331055706) do
# These are extensions that must be enabled in order to support this database # These are extensions that must be enabled in order to support this database
enable_extension "plpgsql" enable_extension "plpgsql"
...@@ -35,6 +35,19 @@ ActiveRecord::Schema.define(version: 20180320013929) do ...@@ -35,6 +35,19 @@ ActiveRecord::Schema.define(version: 20180320013929) do
add_index "file_registry", ["retry_at"], name: "index_file_registry_on_retry_at", using: :btree add_index "file_registry", ["retry_at"], name: "index_file_registry_on_retry_at", using: :btree
add_index "file_registry", ["success"], name: "index_file_registry_on_success", using: :btree add_index "file_registry", ["success"], name: "index_file_registry_on_success", using: :btree
create_table "job_artifact_registry", force: :cascade do |t|
t.datetime_with_timezone "created_at"
t.datetime_with_timezone "retry_at"
t.integer "bytes", limit: 8
t.integer "artifact_id"
t.integer "retry_count"
t.boolean "success"
t.string "sha256"
end
add_index "job_artifact_registry", ["retry_at"], name: "index_job_artifact_registry_on_retry_at", using: :btree
add_index "job_artifact_registry", ["success"], name: "index_job_artifact_registry_on_success", using: :btree
create_table "project_registry", force: :cascade do |t| create_table "project_registry", force: :cascade do |t|
t.integer "project_id", null: false t.integer "project_id", null: false
t.datetime "last_repository_synced_at" t.datetime "last_repository_synced_at"
......
...@@ -242,7 +242,7 @@ module Gitlab ...@@ -242,7 +242,7 @@ module Gitlab
end end
def handle_job_artifact_deleted_event(event, created_at) def handle_job_artifact_deleted_event(event, created_at)
file_registry_job_artifacts = ::Geo::FileRegistry.job_artifacts.where(file_id: event.job_artifact_id) file_registry_job_artifacts = ::Geo::JobArtifactRegistry.where(artifact_id: event.job_artifact_id)
return unless file_registry_job_artifacts.any? # avoid race condition return unless file_registry_job_artifacts.any? # avoid race condition
file_path = File.join(::JobArtifactUploader.root, event.file_path) file_path = File.join(::JobArtifactUploader.root, event.file_path)
......
...@@ -7,7 +7,6 @@ FactoryBot.define do ...@@ -7,7 +7,6 @@ FactoryBot.define do
trait(:attachment) { file_type :attachment } trait(:attachment) { file_type :attachment }
trait(:avatar) { file_type :avatar } trait(:avatar) { file_type :avatar }
trait(:file) { file_type :file } trait(:file) { file_type :file }
trait(:job_artifact) { file_type :job_artifact }
trait(:lfs) { file_type :lfs } trait(:lfs) { file_type :lfs }
trait(:namespace_file) { file_type :namespace_file } trait(:namespace_file) { file_type :namespace_file }
trait(:personal_file) { file_type :personal_file } trait(:personal_file) { file_type :personal_file }
...@@ -18,7 +17,7 @@ FactoryBot.define do ...@@ -18,7 +17,7 @@ FactoryBot.define do
if registry.file_type.to_sym == :lfs if registry.file_type.to_sym == :lfs
create(:lfs_object) create(:lfs_object)
elsif registry.file_type.to_sym == :job_artifact elsif registry.file_type.to_sym == :job_artifact
create(:ci_job_artifact) raise NotImplementedError, 'Use create(:geo_job_artifact_registry, :with_artifact) instead'
else else
create(:upload) create(:upload)
end end
......
FactoryBot.define do
factory :geo_job_artifact_registry, class: Geo::JobArtifactRegistry do
sequence(:artifact_id)
success true
trait :with_artifact do
after(:build, :stub) do |registry, _|
file = create(:ci_job_artifact)
registry.artifact_id = file.id
end
end
end
end
...@@ -96,17 +96,17 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -96,17 +96,17 @@ describe Geo::JobArtifactRegistryFinder, :geo do
describe '#count_synced_job_artifacts' do describe '#count_synced_job_artifacts' do
it 'counts job artifacts that have been synced' do it 'counts job artifacts that have been synced' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced_job_artifacts).to eq 2 expect(subject.count_synced_job_artifacts).to eq 2
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced_job_artifacts).to eq 2 expect(subject.count_synced_job_artifacts).to eq 2
end end
...@@ -123,17 +123,17 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -123,17 +123,17 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'counts job artifacts that has been synced' do it 'counts job artifacts that has been synced' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced_job_artifacts).to eq 1 expect(subject.count_synced_job_artifacts).to eq 1
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_synced_job_artifacts).to eq 1 expect(subject.count_synced_job_artifacts).to eq 1
end end
...@@ -142,17 +142,17 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -142,17 +142,17 @@ describe Geo::JobArtifactRegistryFinder, :geo do
describe '#count_failed_job_artifacts' do describe '#count_failed_job_artifacts' do
it 'counts job artifacts that sync has failed' do it 'counts job artifacts that sync has failed' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
expect(subject.count_failed_job_artifacts).to eq 2 expect(subject.count_failed_job_artifacts).to eq 2
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
expect(subject.count_failed_job_artifacts).to eq 2 expect(subject.count_failed_job_artifacts).to eq 2
end end
...@@ -169,22 +169,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -169,22 +169,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'counts job artifacts that sync has failed' do it 'counts job artifacts that sync has failed' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id)
expect(subject.count_failed_job_artifacts).to eq 1 expect(subject.count_failed_job_artifacts).to eq 1
end end
it 'does not count job artifacts of unsynced projects' do it 'does not count job artifacts of unsynced projects' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
expect(subject.count_failed_job_artifacts).to eq 0 expect(subject.count_failed_job_artifacts).to eq 0
end end
it 'ignores remote job artifacts' do it 'ignores remote job artifacts' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_2.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, success: false)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE) job_artifact_1.update!(file_store: ObjectStorage::Store::REMOTE)
expect(subject.count_failed_job_artifacts).to eq 1 expect(subject.count_failed_job_artifacts).to eq 1
...@@ -202,8 +202,8 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -202,8 +202,8 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'returns job artifacts without an entry on the tracking database' do it 'returns job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10) job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10)
...@@ -211,10 +211,10 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -211,10 +211,10 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'excludes job artifacts without an entry on the tracking database' do it 'excludes job artifacts without an entry on the tracking database' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id, success: true) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: true)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_3.id, success: false) create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, success: false)
job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_file_ids: [job_artifact_2.id]) job_artifacts = subject.find_unsynced_job_artifacts(batch_size: 10, except_artifact_ids: [job_artifact_2.id])
expect(job_artifacts).to match_ids(job_artifact_4) expect(job_artifacts).to match_ids(job_artifact_4)
end end
...@@ -229,7 +229,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -229,7 +229,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do
it 'returns job artifacts remotely and successfully synced locally' do it 'returns job artifacts remotely and successfully synced locally' do
job_artifact = create(:ci_job_artifact, :remote_store, project: synced_project) job_artifact = create(:ci_job_artifact, :remote_store, project: synced_project)
create(:geo_file_registry, :job_artifact, file_id: job_artifact.id) create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10) job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10)
...@@ -245,7 +245,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -245,7 +245,7 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'excludes synced job artifacts that are stored locally' do it 'excludes synced job artifacts that are stored locally' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_1.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id)
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10) job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10)
...@@ -253,8 +253,8 @@ describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -253,8 +253,8 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end end
it 'excludes except_file_ids' do it 'excludes except_file_ids' do
create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_1.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote_2.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_2.id)
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10, except_file_ids: [job_artifact_remote_1.id]) job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10, except_file_ids: [job_artifact_remote_1.id])
......
...@@ -395,13 +395,13 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -395,13 +395,13 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
context 'with a tracking database entry' do context 'with a tracking database entry' do
before do before do
create(:geo_file_registry, :job_artifact, file_id: job_artifact.id) create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
end end
context 'with a file' do context 'with a file' do
context 'when the delete succeeds' do context 'when the delete succeeds' do
it 'removes the tracking database entry' do it 'removes the tracking database entry' do
expect { daemon.run_once! }.to change(Geo::FileRegistry.job_artifacts, :count).by(-1) expect { daemon.run_once! }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end end
it 'deletes the file' do it 'deletes the file' do
...@@ -415,7 +415,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -415,7 +415,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end end
it 'does not remove the tracking database entry' do it 'does not remove the tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry.job_artifacts, :count) expect { daemon.run_once! }.not_to change(Geo::JobArtifactRegistry, :count)
end end
end end
end end
...@@ -426,14 +426,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared ...@@ -426,14 +426,14 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql, :clean_gitlab_redis_shared
end end
it 'removes the tracking database entry' do it 'removes the tracking database entry' do
expect { daemon.run_once! }.to change(Geo::FileRegistry.job_artifacts, :count).by(-1) expect { daemon.run_once! }.to change(Geo::JobArtifactRegistry, :count).by(-1)
end end
end end
end end
context 'without a tracking database entry' do context 'without a tracking database entry' do
it 'does not create a tracking database entry' do it 'does not create a tracking database entry' do
expect { daemon.run_once! }.not_to change(Geo::FileRegistry, :count) expect { daemon.run_once! }.not_to change(Geo::JobArtifactRegistry, :count)
end end
it 'does not delete the file (yet, due to possible race condition)' do it 'does not delete the file (yet, due to possible race condition)' do
......
require 'spec_helper'
require Rails.root.join('ee', 'db', 'geo', 'migrate', '20180322062741_migrate_ci_job_artifacts_to_separate_registry.rb')
describe MigrateCiJobArtifactsToSeparateRegistry, :geo, :migration do
let(:file_registry) { table(:file_registry) }
let(:job_artifact_registry) { table(:job_artifact_registry) }
before do
file_registry.create!(file_id: 1, file_type: 'job_artifact', success: true, bytes: 1024, sha256: '0' * 64)
file_registry.create!(file_id: 2, file_type: 'job_artifact', success: false, bytes: 2048, sha256: '1' * 64)
file_registry.create!(file_id: 3, file_type: 'attachment', success: true)
file_registry.create!(file_id: 4, file_type: 'job_artifact', success: false, bytes: 4096, sha256: '2' * 64)
end
describe '#up' do
it 'migrates all job artifacts to its own data table' do
expect(file_registry.all.count).to eq(4)
migrate!
expect(file_registry.all.count).to eq(4)
expect(job_artifact_registry.all.count).to eq(3)
expect(job_artifact_registry.where(artifact_id: 1, success: true, bytes: 1024, sha256: '0' * 64).count).to eq(1)
expect(job_artifact_registry.where(artifact_id: 2, success: false, bytes: 2048, sha256: '1' * 64).count).to eq(1)
expect(job_artifact_registry.where(artifact_id: 4, success: false, bytes: 4096, sha256: '2' * 64).count).to eq(1)
expect(file_registry.where(file_id: 3, file_type: 'attachment', success: true).count).to eq(1)
end
it 'creates a new artifact with the trigger' do
migrate!
expect(job_artifact_registry.all.count).to eq(3)
file_registry.create!(file_id: 5, file_type: 'job_artifact', success: true, bytes: 8192, sha256: '3' * 64)
expect(job_artifact_registry.all.count).to eq(4)
expect(job_artifact_registry.where(artifact_id: 5, success: true, bytes: 8192, sha256: '3' * 64).count).to eq(1)
end
it 'updates a new artifact with the trigger' do
migrate!
expect(job_artifact_registry.all.count).to eq(3)
entry = file_registry.find_by(file_id: 1)
entry.update_attributes(success: false, bytes: 10240, sha256: '10' * 64)
expect(job_artifact_registry.where(artifact_id: 1, success: false, bytes: 10240, sha256: '10' * 64).count).to eq(1)
end
it 'creates a new artifact using the next ID' do
migrate!
max_id = job_artifact_registry.maximum(:id)
last_id = job_artifact_registry.create!(artifact_id: 5, success: true).id
expect(last_id - max_id).to eq(1)
end
end
describe '#down' do
it 'rolls back data properly' do
migrate!
expect(file_registry.all.count).to eq(4)
expect(job_artifact_registry.all.count).to eq(3)
schema_migrate_down!
expect(file_registry.all.count).to eq(4)
expect(file_registry.where(file_type: 'attachment').count).to eq(1)
expect(file_registry.where(file_type: 'job_artifact').count).to eq(3)
expect(file_registry.where(file_type: 'job_artifact', bytes: 1024, sha256: '0' * 64).count).to eq(1)
expect(file_registry.where(file_type: 'job_artifact', bytes: 2048, sha256: '1' * 64).count).to eq(1)
expect(file_registry.where(file_type: 'job_artifact', bytes: 4096, sha256: '2' * 64).count).to eq(1)
end
end
end
...@@ -205,9 +205,8 @@ describe GeoNodeStatus, :geo do ...@@ -205,9 +205,8 @@ describe GeoNodeStatus, :geo do
create(:geo_file_registry, :avatar, success: false) create(:geo_file_registry, :avatar, success: false)
create(:geo_file_registry, file_type: :attachment, success: true) create(:geo_file_registry, file_type: :attachment, success: true)
create(:geo_file_registry, :lfs, :with_file, success: true) create(:geo_file_registry, :lfs, :with_file, success: true)
create(:geo_file_registry, :job_artifact, :with_file, success: false) create(:geo_job_artifact_registry, :with_artifact, success: false)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_file_registry, :job_artifact, :with_file, success: true)
expect(subject.job_artifacts_synced_count).to eq(1) expect(subject.job_artifacts_synced_count).to eq(1)
end end
...@@ -219,9 +218,8 @@ describe GeoNodeStatus, :geo do ...@@ -219,9 +218,8 @@ describe GeoNodeStatus, :geo do
create(:geo_file_registry, success: false) create(:geo_file_registry, success: false)
create(:geo_file_registry, :avatar, success: false) create(:geo_file_registry, :avatar, success: false)
create(:geo_file_registry, file_type: :attachment, success: false) create(:geo_file_registry, file_type: :attachment, success: false)
create(:geo_file_registry, :job_artifact, :with_file, success: true) create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: false)
create(:geo_file_registry, :job_artifact, :with_file, success: false)
expect(subject.job_artifacts_failed_count).to eq(1) expect(subject.job_artifacts_failed_count).to eq(1)
end end
...@@ -233,7 +231,8 @@ describe GeoNodeStatus, :geo do ...@@ -233,7 +231,8 @@ describe GeoNodeStatus, :geo do
[project_1, project_2, project_3, project_4].each_with_index do |project, index| [project_1, project_2, project_3, project_4].each_with_index do |project, index|
build = create(:ci_build, project: project) build = create(:ci_build, project: project)
job_artifact = create(:ci_job_artifact, job: build) job_artifact = create(:ci_job_artifact, job: build)
create(:geo_file_registry, :job_artifact, success: index.even?, file_id: job_artifact.id)
create(:geo_job_artifact_registry, success: index.even?, artifact_id: job_artifact.id)
end end
end end
......
...@@ -195,13 +195,13 @@ describe Geo::FileDownloadService do ...@@ -195,13 +195,13 @@ describe Geo::FileDownloadService do
it 'downloads a job artifact' do it 'downloads a job artifact' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, 100) stub_transfer(Gitlab::Geo::JobArtifactTransfer, 100)
expect { subject.execute }.to change { Geo::FileRegistry.synced.count }.by(1) expect { subject.execute }.to change { Geo::JobArtifactRegistry.synced.count }.by(1)
end end
it 'registers when the download fails' do it 'registers when the download fails' do
stub_transfer(Gitlab::Geo::JobArtifactTransfer, -1) stub_transfer(Gitlab::Geo::JobArtifactTransfer, -1)
expect { subject.execute }.to change { Geo::FileRegistry.failed.count }.by(1) expect { subject.execute }.to change { Geo::JobArtifactRegistry.failed.count }.by(1)
end end
it 'logs a message' do it 'logs a message' do
......
...@@ -35,6 +35,22 @@ describe Geo::FileRegistryRemovalService do ...@@ -35,6 +35,22 @@ describe Geo::FileRegistryRemovalService do
end end
end end
shared_examples 'removes artifact' do
subject(:service) { described_class.new('job_artifact', registry.artifact_id) }
it 'file from disk' do
expect do
service.execute
end.to change { File.exist?(file_path) }.from(true).to(false)
end
it 'registry when file was deleted successfully' do
expect do
service.execute
end.to change(Geo::JobArtifactRegistry, :count).by(-1)
end
end
context 'with LFS object' do context 'with LFS object' do
let!(:lfs_object) { create(:lfs_object, :with_file) } let!(:lfs_object) { create(:lfs_object, :with_file) }
let!(:file_registry) { create(:geo_file_registry, :lfs, file_id: lfs_object.id) } let!(:file_registry) { create(:geo_file_registry, :lfs, file_id: lfs_object.id) }
...@@ -54,10 +70,10 @@ describe Geo::FileRegistryRemovalService do ...@@ -54,10 +70,10 @@ describe Geo::FileRegistryRemovalService do
context 'with job artifact' do context 'with job artifact' do
let!(:job_artifact) { create(:ci_job_artifact, :archive) } let!(:job_artifact) { create(:ci_job_artifact, :archive) }
let!(:file_registry) { create(:geo_file_registry, :job_artifact, file_id: job_artifact.id) } let!(:registry) { create(:geo_job_artifact_registry, artifact_id: job_artifact.id) }
let!(:file_path) { job_artifact.file.path } let!(:file_path) { job_artifact.file.path }
it_behaves_like 'removes' it_behaves_like 'removes artifact'
context 'migrated to object storage' do context 'migrated to object storage' do
before do before do
...@@ -65,7 +81,7 @@ describe Geo::FileRegistryRemovalService do ...@@ -65,7 +81,7 @@ describe Geo::FileRegistryRemovalService do
job_artifact.update_column(:file_store, JobArtifactUploader::Store::REMOTE) job_artifact.update_column(:file_store, JobArtifactUploader::Store::REMOTE)
end end
it_behaves_like 'removes' it_behaves_like 'removes artifact'
end end
end end
......
...@@ -7,5 +7,50 @@ module EE ...@@ -7,5 +7,50 @@ module EE
super super
rescue Geo::TrackingBase::SecondaryNotConfigured rescue Geo::TrackingBase::SecondaryNotConfigured
end end
override :active_record_base
def active_record_base
if geo_migration?
Geo::TrackingBase
else
super
end
end
override :migrations_paths
def migrations_paths
if geo_migration?
::Gitlab::Geo::DatabaseTasks.geo_migrate_path
else
super
end
end
override :schema_migrate_down!
def schema_migrate_down!
with_db_config { super }
end
override :schema_migrate_up!
def schema_migrate_up!
with_db_config { super }
end
override :migrate!
def migrate!
with_db_config { super }
end
def with_db_config(&block)
if geo_migration?
::Gitlab::Geo::DatabaseTasks.with_geo_db { yield }
else
yield
end
end
def geo_migration?
self.class.metadata[:geo]
end
end end
end end
...@@ -76,10 +76,10 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -76,10 +76,10 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
artifact = create(:ci_job_artifact) artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 0, success: false) create(:geo_job_artifact_registry, artifact_id: artifact.id, bytes: 0, success: false)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', artifact.id).once.and_return(spy) .with(:job_artifact, artifact.id).once.and_return(spy)
subject.perform subject.perform
end end
...@@ -87,7 +87,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -87,7 +87,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
artifact = create(:ci_job_artifact) artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 1234, success: true) create(:geo_job_artifact_registry, artifact_id: artifact.id, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
...@@ -97,7 +97,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do ...@@ -97,7 +97,7 @@ describe Geo::FileDownloadDispatchWorker, :geo do
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
artifact = create(:ci_job_artifact) artifact = create(:ci_job_artifact)
Geo::FileRegistry.create!(file_type: :job_artifact, file_id: artifact.id, bytes: 0, success: true) create(:geo_job_artifact_registry, artifact_id: artifact.id, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
......
...@@ -122,8 +122,8 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do ...@@ -122,8 +122,8 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do
before do before do
stub_artifacts_object_storage stub_artifacts_object_storage
create(:geo_file_registry, :job_artifact, file_id: job_artifact_local.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_local.id)
create(:geo_file_registry, :job_artifact, file_id: job_artifact_remote.id) create(:geo_job_artifact_registry, artifact_id: job_artifact_remote.id)
end end
it 'schedules job for artifact stored remotely and synced locally' do it 'schedules job for artifact stored remotely and synced locally' do
......
module MigrationsHelpers module MigrationsHelpers
prepend EE::MigrationsHelpers prepend EE::MigrationsHelpers
def active_record_base
ActiveRecord::Base
end
def table(name) def table(name)
Class.new(ActiveRecord::Base) do Class.new(active_record_base) do
self.table_name = name self.table_name = name
self.inheritance_column = :_type_disabled self.inheritance_column = :_type_disabled
end end
...@@ -13,7 +17,7 @@ module MigrationsHelpers ...@@ -13,7 +17,7 @@ module MigrationsHelpers
end end
def table_exists?(name) def table_exists?(name)
ActiveRecord::Base.connection.table_exists?(name) active_record_base.connection.table_exists?(name)
end end
def migrations def migrations
...@@ -21,7 +25,7 @@ module MigrationsHelpers ...@@ -21,7 +25,7 @@ module MigrationsHelpers
end end
def clear_schema_cache! def clear_schema_cache!
ActiveRecord::Base.connection_pool.connections.each do |conn| active_record_base.connection_pool.connections.each do |conn|
conn.schema_cache.clear! conn.schema_cache.clear!
end end
end end
...@@ -32,7 +36,7 @@ module MigrationsHelpers ...@@ -32,7 +36,7 @@ module MigrationsHelpers
# Reset column information for the most offending classes **after** we # Reset column information for the most offending classes **after** we
# migrated the schema up, otherwise, column information could be # migrated the schema up, otherwise, column information could be
# outdated. We have a separate method for this so we can override it in EE. # outdated. We have a separate method for this so we can override it in EE.
ActiveRecord::Base.descendants.each(&method(:reset_column_information)) active_record_base.descendants.each(&method(:reset_column_information))
# Without that, we get errors because of missing attributes, e.g. # Without that, we get errors because of missing attributes, e.g.
# super: no superclass method `elasticsearch_indexing' for #<ApplicationSetting:0x00007f85628508d8> # super: no superclass method `elasticsearch_indexing' for #<ApplicationSetting:0x00007f85628508d8>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment