Commit b26712c0 authored by Michael Kozono's avatar Michael Kozono

Retry if synced but missing on primary

…at a low priority, similar to failed syncs.
parent 7a543084
......@@ -84,6 +84,14 @@ module Geo
relation.limit(batch_size)
end
def find_failed_attachments_registries
Geo::FileRegistry.attachments.failed
end
def find_synced_missing_on_primary_attachments_registries
Geo::FileRegistry.attachments.synced.missing_on_primary
end
private
def group_uploads
......@@ -180,7 +188,7 @@ module Geo
def legacy_find_failed_attachments
legacy_inner_join_registry_ids(
local_attachments,
Geo::FileRegistry.attachments.failed.pluck(:file_id),
find_failed_attachments_registries.pluck(:file_id),
Upload
)
end
......
module Geo
class FileRegistryFinder < RegistryFinder
def find_failed_file_registries(batch_size:)
Geo::FileRegistry.failed.retry_due.limit(batch_size)
end
protected
def legacy_pluck_registry_file_ids(file_types:)
......
......@@ -24,7 +24,7 @@ module Geo
Geo::JobArtifactRegistry.count
end
# Find limited amount of non replicated lfs objects.
# Find limited amount of non replicated job artifacts.
#
# You can pass a list with `except_artifact_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
......@@ -45,12 +45,12 @@ module Geo
relation.limit(batch_size)
end
def find_migrated_local_job_artifacts(batch_size:, except_file_ids: [])
def find_migrated_local_job_artifacts(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local_job_artifacts(except_file_ids: except_file_ids)
legacy_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids)
else
fdw_find_migrated_local_job_artifacts(except_file_ids: except_file_ids)
fdw_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
......@@ -72,6 +72,10 @@ module Geo
Geo::JobArtifactRegistry.synced
end
def find_synced_missing_on_primary_job_artifacts_registries
Geo::JobArtifactRegistry.synced.missing_on_primary
end
def find_failed_job_artifacts_registries
Geo::JobArtifactRegistry.failed
end
......@@ -86,6 +90,14 @@ module Geo
end
end
def find_synced_missing_on_primary_job_artifacts
if use_legacy_queries?
legacy_find_synced_missing_on_primary_job_artifacts
else
fdw_find_job_artifacts.merge(find_synced_missing_on_primary_job_artifacts_registries)
end
end
def find_failed_job_artifacts
if use_legacy_queries?
legacy_find_failed_job_artifacts
......@@ -111,10 +123,10 @@ module Geo
.where.not(id: except_artifact_ids)
end
def fdw_find_migrated_local_job_artifacts(except_file_ids:)
def fdw_find_migrated_local_job_artifacts(except_artifact_ids:)
fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
.with_files_stored_remotely
.where.not(id: except_file_ids)
.where.not(id: except_artifact_ids)
.merge(Geo::JobArtifactRegistry.all)
end
......@@ -165,8 +177,8 @@ module Geo
(ids + include_registry_ids).uniq
end
def legacy_find_migrated_local_job_artifacts(except_file_ids:)
registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_file_ids
def legacy_find_migrated_local_job_artifacts(except_artifact_ids:)
registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
legacy_inner_join_registry_ids(
job_artifacts.with_files_stored_remotely,
......@@ -174,5 +186,13 @@ module Geo
Ci::JobArtifact
)
end
def legacy_find_synced_missing_on_primary_job_artifacts
legacy_inner_join_registry_ids(
local_job_artifacts,
find_synced_missing_on_primary_job_artifacts_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
end
end
......@@ -68,6 +68,14 @@ module Geo
lfs_objects.with_files_stored_locally
end
def find_failed_lfs_objects_registries
Geo::FileRegistry.lfs_objects.failed
end
def find_synced_missing_on_primary_lfs_objects_registries
Geo::FileRegistry.lfs_objects.synced.missing_on_primary
end
private
def find_synced_lfs_objects
......@@ -139,7 +147,7 @@ module Geo
def legacy_find_failed_lfs_objects
legacy_inner_join_registry_ids(
local_lfs_objects,
Geo::FileRegistry.lfs_objects.failed.pluck(:file_id),
find_failed_lfs_objects_registries.pluck(:file_id),
LfsObject
)
end
......
......@@ -5,5 +5,6 @@ module Geo::Syncable
scope :failed, -> { where(success: false) }
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.now) }
scope :missing_on_primary, -> { where(missing_on_primary: true) }
end
end
......@@ -9,7 +9,7 @@ module Geo
end
def schedule_job(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type.to_s, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
......@@ -18,10 +18,6 @@ module Geo
@attachments_finder ||= AttachmentRegistryFinder.new(current_node: current_node)
end
def file_registry_finder
@file_registry_finder ||= FileRegistryFinder.new(current_node: current_node)
end
def lfs_objects_finder
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end
......@@ -40,47 +36,97 @@ module Geo
if remaining_capacity.zero?
resources
else
resources + find_failed_upload_object_ids(batch_size: remaining_capacity)
resources + find_low_priority_objects(batch_size: remaining_capacity)
end
end
def find_unsynced_objects(batch_size:)
lfs_object_ids = find_unsynced_lfs_objects_ids(batch_size: batch_size)
attachment_ids = find_unsynced_attachments_ids(batch_size: batch_size)
job_artifact_ids = find_unsynced_job_artifacts_ids(batch_size: batch_size)
take_batch(find_unsynced_lfs_objects_ids(batch_size: batch_size),
find_unsynced_attachments_ids(batch_size: batch_size),
find_unsynced_job_artifacts_ids(batch_size: batch_size),
batch_size: batch_size)
end
take_batch(lfs_object_ids, attachment_ids, job_artifact_ids)
def find_low_priority_objects(batch_size:)
take_batch(find_failed_attachments_ids(batch_size: batch_size),
find_failed_lfs_objects_ids(batch_size: batch_size),
find_failed_artifacts_ids(batch_size: batch_size),
find_synced_missing_on_primary_attachments_ids(batch_size: batch_size),
find_synced_missing_on_primary_lfs_objects_ids(batch_size: batch_size),
find_synced_missing_on_primary_job_artifacts_ids(batch_size: batch_size),
batch_size: batch_size)
end
def find_unsynced_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
lfs_objects_finder.find_unsynced_lfs_objects(
batch_size: batch_size,
except_file_ids: scheduled_file_ids('lfs'))
.pluck(:id)
.map { |id| [:lfs, id] }
.map { |id| ['lfs', id] }
end
def find_unsynced_attachments_ids(batch_size:)
attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
attachments_finder.find_unsynced_attachments(
batch_size: batch_size,
except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
job_artifacts_finder.find_unsynced_job_artifacts(
batch_size: batch_size,
except_artifact_ids: scheduled_file_ids('job_artifact'))
.pluck(:id)
.map { |id| [:job_artifact, id] }
.map { |id| ['job_artifact', id] }
end
def find_failed_upload_object_ids(batch_size:)
file_ids = file_registry_finder.find_failed_file_registries(batch_size: batch_size)
def find_failed_attachments_ids(batch_size:)
attachments_finder.find_failed_attachments_registries
.retry_due
.where.not(file_id: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.limit(batch_size)
.pluck(:file_type, :file_id)
artifact_ids = find_failed_artifact_ids(batch_size: batch_size)
end
def find_failed_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_failed_lfs_objects_registries
.retry_due
.where.not(file_id: scheduled_file_ids('lfs'))
.limit(batch_size)
.pluck(:file_id).map { |id| ['lfs', id] }
end
def find_failed_artifacts_ids(batch_size:)
job_artifacts_finder.find_failed_job_artifacts_registries
.retry_due
.where.not(artifact_id: scheduled_file_ids('job_artifact'))
.limit(batch_size)
.pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
def find_synced_missing_on_primary_attachments_ids(batch_size:)
attachments_finder.find_synced_missing_on_primary_attachments_registries
.retry_due
.where.not(file_id: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.limit(batch_size)
.pluck(:file_type, :file_id)
end
take_batch(file_ids, artifact_ids)
def find_synced_missing_on_primary_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_synced_missing_on_primary_lfs_objects_registries
.retry_due
.where.not(file_id: scheduled_file_ids('lfs'))
.limit(batch_size)
.pluck(:file_id).map { |id| ['lfs', id] }
end
def find_failed_artifact_ids(batch_size:)
job_artifacts_finder.find_failed_job_artifacts_registries.retry_due.limit(batch_size)
.pluck(:artifact_id).map { |id| [:job_artifact, id] }
def find_synced_missing_on_primary_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_synced_missing_on_primary_job_artifacts_registries
.retry_due
.where.not(artifact_id: scheduled_file_ids('job_artifact'))
.limit(batch_size)
.pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
def scheduled_file_ids(file_types)
......
......@@ -6,7 +6,7 @@ module Geo
sidekiq_options retry: 3, dead: false
def perform(object_type, object_id)
Geo::FileDownloadService.new(object_type.to_sym, object_id).execute
Geo::FileDownloadService.new(object_type, object_id).execute
end
end
end
......@@ -61,7 +61,7 @@ module Geo
def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local_job_artifacts(batch_size: batch_size, except_file_ids: scheduled_file_ids(:job_artifact))
job_artifacts_finder.find_migrated_local_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id)
.map { |id| [:job_artifact, id] }
end
......
......@@ -107,8 +107,8 @@ module Geo
(Time.now.utc - start_time) >= run_time
end
def take_batch(*arrays)
interleave(*arrays).uniq.compact.take(db_retrieve_batch_size)
def take_batch(*arrays, batch_size: db_retrieve_batch_size)
interleave(*arrays).uniq.compact.take(batch_size)
end
# Combines the elements of multiple, arbitrary-length arrays into a single array.
......
......@@ -172,16 +172,14 @@ describe Geo::AttachmentRegistryFinder, :geo do
expect(uploads).to match_ids(upload_2, upload_3, upload_4)
end
it 'excludes uploads without an entry on the tracking database' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: true)
it 'excludes uploads in the except_file_ids option' do
uploads = subject.find_unsynced_attachments(batch_size: 10, except_file_ids: [upload_2.id])
expect(uploads).to match_ids(upload_3, upload_4)
expect(uploads).to match_ids(upload_1, upload_3, upload_4)
end
it 'excludes remote uploads without an entry on the tracking database' do
create(:geo_file_registry, :avatar, file_id: upload_1.id, success: true)
it 'excludes remote uploads' do
upload_1.update!(store: ObjectStorage::Store::REMOTE)
uploads = subject.find_unsynced_attachments(batch_size: 10)
......
require 'spec_helper'
describe Geo::FileRegistryFinder, :geo do
include ::EE::GeoHelpers
let(:secondary) { create(:geo_node) }
subject { described_class.new(current_node: secondary) }
before do
stub_current_geo_node(secondary)
end
describe '#find_failed_file_registries' do
it 'returs uploads that sync has failed' do
failed_lfs_registry = create(:geo_file_registry, :lfs, :with_file, success: false)
failed_file_upload = create(:geo_file_registry, :with_file, success: false)
failed_issuable_upload = create(:geo_file_registry, :with_file, success: false)
create(:geo_file_registry, :lfs, :with_file, success: true)
create(:geo_file_registry, :with_file, success: true)
uploads = subject.find_failed_file_registries(batch_size: 10)
expect(uploads).to match_array([failed_lfs_registry, failed_file_upload, failed_issuable_upload])
end
end
end
......@@ -252,11 +252,11 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(job_artifacts).to be_empty
end
it 'excludes except_file_ids' do
it 'excludes except_artifact_ids' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_2.id)
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10, except_file_ids: [job_artifact_remote_1.id])
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10, except_artifact_ids: [job_artifact_remote_1.id])
expect(job_artifacts).to match_ids(job_artifact_remote_2)
end
......
......@@ -2,10 +2,10 @@ require 'spec_helper'
describe Geo::FileDownloadWorker, :geo do
describe '#perform' do
it 'instantiates and executes FileDownloadService, and converts object_type to a symbol' do
it 'instantiates and executes FileDownloadService' do
service = double(:service)
expect(service).to receive(:execute)
expect(Geo::FileDownloadService).to receive(:new).with(:job_artifact, 1).and_return(service)
expect(Geo::FileDownloadService).to receive(:new).with('job_artifact', 1).and_return(service)
described_class.new.perform('job_artifact', 1)
end
end
......
require 'spec_helper'
describe Geo::Scheduler::SchedulerWorker do
describe Geo::Scheduler::SchedulerWorker, :geo do
subject { described_class.new }
it 'includes ::Gitlab::Geo::LogHelpers' do
expect(described_class).to include_module(::Gitlab::Geo::LogHelpers)
end
it 'needs many other specs'
describe '#take_batch' do
let(:a) { [[2, :lfs], [3, :lfs]] }
let(:b) { [] }
let(:c) { [[3, :job_artifact], [8, :job_artifact], [9, :job_artifact]] }
context 'without batch_size' do
it 'returns a batch of jobs' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(4)
expect(subject.send(:take_batch, a, b, c)).to eq([
[3, :job_artifact],
[2, :lfs],
[8, :job_artifact],
[3, :lfs]
])
end
end
context 'with batch_size' do
it 'returns a batch of jobs' do
expect(subject.send(:take_batch, a, b, c, batch_size: 2)).to eq([
[3, :job_artifact],
[2, :lfs]
])
end
end
end
describe '#interleave' do
# Notice ties are resolved by taking the "first" tied element
it 'interleaves 2 arrays' do
a = %w{1 2 3}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3 C})
end
# Notice there are no ties in this call
it 'interleaves 2 arrays with a longer second array' do
a = %w{1 2}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{A 1 B 2 C})
end
it 'interleaves 2 arrays with a longer first array' do
a = %w{1 2 3}
b = %w{A B}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3})
end
it 'interleaves 3 arrays' do
a = %w{1 2 3}
b = %w{A B C}
c = %w{i ii iii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{1 A i 2 B ii 3 C iii})
end
it 'interleaves 3 arrays of unequal length' do
a = %w{1 2}
b = %w{A}
c = %w{i ii iii iiii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{i 1 ii A iii 2 iiii})
end
end
end
......@@ -10,6 +10,13 @@ RSpec::Matchers.define :match_ids do |*expected|
'matches elements by ids'
end
failure_message do
actual_ids = map_ids(actual)
expected_ids = map_ids(expected)
"expected IDs #{actual_ids} in:\n\n #{actual.inspect}\n\nto match IDs #{expected_ids} in:\n\n #{expected.inspect}"
end
def map_ids(elements)
elements = elements.flatten if elements.respond_to?(:flatten)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment