Commit 5eab5bbd authored by Stan Hu's avatar Stan Hu

Merge branch 'mk/geo/mark-missing-primary-files-as-synced-and-add-metrics' into 'master'

Geo: Mark missing primary files as synced, and add logging and metrics for sysadmins to investigate missing files

Closes #5144 and #5312

See merge request gitlab-org/gitlab-ee!5050
parents f89c3101 fb736aa6
......@@ -20,4 +20,6 @@ lib/gitlab/workhorse.rb
ee/db/**/*
ee/app/serializers/ee/merge_request_widget_entity.rb
ee/lib/ee/gitlab/ldap/sync/admin_users.rb
ee/app/workers/geo/file_download_dispatch_worker/job_artifact_job_finder.rb
ee/app/workers/geo/file_download_dispatch_worker/lfs_object_job_finder.rb
ee/spec/**/*
......@@ -47,7 +47,18 @@ class Upload < ActiveRecord::Base
end
def exist?
File.exist?(absolute_path)
exist = File.exist?(absolute_path)
# Help sysadmins find missing upload files
if persisted? && !exist
if Gitlab::Sentry.enabled?
Raven.capture_message("Upload file does not exist", extra: self.attributes)
end
Gitlab::Metrics.counter(:upload_file_does_not_exist_total, 'The number of times an upload record could not find its file').increment
end
exist
end
def uploader_context
......
......@@ -1077,6 +1077,9 @@ ActiveRecord::Schema.define(version: 20180405101928) do
t.integer "repositories_verification_failed_count"
t.integer "wikis_verified_count"
t.integer "wikis_verification_failed_count"
t.integer "lfs_objects_synced_missing_on_primary_count"
t.integer "job_artifacts_synced_missing_on_primary_count"
t.integer "attachments_synced_missing_on_primary_count"
end
add_index "geo_node_statuses", ["geo_node_id"], name: "index_geo_node_statuses_on_geo_node_id", unique: true, using: :btree
......
......@@ -26,52 +26,56 @@ server, because the embedded server configuration is overwritten once every
In this experimental phase, only a few metrics are available:
| Metric | Type | Since | Description |
|:--------------------------------- |:--------- |:----- |:----------- |
| db_ping_timeout | Gauge | 9.4 | Whether or not the last database ping timed out |
| db_ping_success | Gauge | 9.4 | Whether or not the last database ping succeeded |
| db_ping_latency_seconds | Gauge | 9.4 | Round trip time of the database ping |
| filesystem_access_latency_seconds | Gauge | 9.4 | Latency in accessing a specific filesystem |
| filesystem_accessible | Gauge | 9.4 | Whether or not a specific filesystem is accessible |
| filesystem_write_latency_seconds | Gauge | 9.4 | Write latency of a specific filesystem |
| filesystem_writable | Gauge | 9.4 | Whether or not the filesystem is writable |
| filesystem_read_latency_seconds | Gauge | 9.4 | Read latency of a specific filesystem |
| filesystem_readable | Gauge | 9.4 | Whether or not the filesystem is readable |
| http_requests_total | Counter | 9.4 | Rack request count |
| http_request_duration_seconds | Histogram | 9.4 | HTTP response time from rack middleware |
| pipelines_created_total | Counter | 9.4 | Counter of pipelines created |
| rack_uncaught_errors_total | Counter | 9.4 | Rack connections handling uncaught errors count |
| redis_ping_timeout | Gauge | 9.4 | Whether or not the last redis ping timed out |
| redis_ping_success | Gauge | 9.4 | Whether or not the last redis ping succeeded |
| redis_ping_latency_seconds | Gauge | 9.4 | Round trip time of the redis ping |
| user_session_logins_total | Counter | 9.4 | Counter of how many users have logged in |
| filesystem_circuitbreaker_latency_seconds | Gauge | 9.5 | Time spent validating if a storage is accessible |
| filesystem_circuitbreaker | Gauge | 9.5 | Wether or not the circuit for a certain shard is broken or not |
| Metric | Type | Since | Description |
|:--------------------------------------------- |:--------- |:----- |:----------- |
| db_ping_timeout | Gauge | 9.4 | Whether or not the last database ping timed out |
| db_ping_success | Gauge | 9.4 | Whether or not the last database ping succeeded |
| db_ping_latency_seconds | Gauge | 9.4 | Round trip time of the database ping |
| filesystem_access_latency_seconds | Gauge | 9.4 | Latency in accessing a specific filesystem |
| filesystem_accessible | Gauge | 9.4 | Whether or not a specific filesystem is accessible |
| filesystem_write_latency_seconds | Gauge | 9.4 | Write latency of a specific filesystem |
| filesystem_writable | Gauge | 9.4 | Whether or not the filesystem is writable |
| filesystem_read_latency_seconds | Gauge | 9.4 | Read latency of a specific filesystem |
| filesystem_readable | Gauge | 9.4 | Whether or not the filesystem is readable |
| http_requests_total | Counter | 9.4 | Rack request count |
| http_request_duration_seconds | Histogram | 9.4 | HTTP response time from rack middleware |
| pipelines_created_total | Counter | 9.4 | Counter of pipelines created |
| rack_uncaught_errors_total | Counter | 9.4 | Rack connections handling uncaught errors count |
| redis_ping_timeout | Gauge | 9.4 | Whether or not the last redis ping timed out |
| redis_ping_success | Gauge | 9.4 | Whether or not the last redis ping succeeded |
| redis_ping_latency_seconds | Gauge | 9.4 | Round trip time of the redis ping |
| user_session_logins_total | Counter | 9.4 | Counter of how many users have logged in |
| filesystem_circuitbreaker_latency_seconds | Gauge | 9.5 | Time spent validating if a storage is accessible |
| filesystem_circuitbreaker | Gauge | 9.5 | Whether or not the circuit for a certain shard is broken or not |
| circuitbreaker_storage_check_duration_seconds | Histogram | 10.3 | Time a single storage probe took |
| upload_file_does_not_exist | Counter | 10.7 | Number of times an upload record could not find its file |
## Sidekiq Metrics available
Sidekiq jobs may also gather metrics, and these metrics can be accessed if the Sidekiq exporter is enabled (e.g. via
the `monitoring.sidekiq_exporter` configuration option in `gitlab.yml`.
| Metric | Type | Since | Description | Labels |
|:--------------------------------- |:--------- |:----- |:----------- |:------ |
|geo_db_replication_lag_seconds | Gauge | 10.2 | Database replication lag (seconds) | url
|geo_repositories | Gauge | 10.2 | Total number of repositories available on primary | url
|geo_repositories_synced | Gauge | 10.2 | Number of repositories synced on secondary | url
|geo_repositories_failed | Gauge | 10.2 | Number of repositories failed to sync on secondary | url
|geo_lfs_objects | Gauge | 10.2 | Total number of LFS objects available on primary | url
|geo_lfs_objects_synced | Gauge | 10.2 | Number of LFS objects synced on secondary | url
|geo_lfs_objects_failed | Gauge | 10.2 | Number of LFS objects failed to sync on secondary | url
|geo_attachments | Gauge | 10.2 | Total number of file attachments available on primary | url
|geo_attachments_synced | Gauge | 10.2 | Number of attachments synced on secondary | url
|geo_attachments_failed | Gauge | 10.2 | Number of attachments failed to sync on secondary | url
|geo_last_event_id | Gauge | 10.2 | Database ID of the latest event log entry on the primary | url
|geo_last_event_timestamp | Gauge | 10.2 | UNIX timestamp of the latest event log entry on the primary | url
|geo_cursor_last_event_id | Gauge | 10.2 | Last database ID of the event log processed by the secondary | url
|geo_cursor_last_event_timestamp | Gauge | 10.2 | Last UNIX timestamp of the event log processed by the secondary | url
|geo_status_failed_total | Counter | 10.2 | Number of times retrieving the status from the Geo Node failed | url
|geo_last_successful_status_check_timestamp | Gauge | Last timestamp when the status was successfully updated | url
| Metric | Type | Since | Description | Labels |
|:------------------------------------------- |:------- |:----- |:----------- |:------ |
| geo_db_replication_lag_seconds | Gauge | 10.2 | Database replication lag (seconds) | url
| geo_repositories | Gauge | 10.2 | Total number of repositories available on primary | url
| geo_repositories_synced | Gauge | 10.2 | Number of repositories synced on secondary | url
| geo_repositories_failed | Gauge | 10.2 | Number of repositories failed to sync on secondary | url
| geo_lfs_objects | Gauge | 10.2 | Total number of LFS objects available on primary | url
| geo_lfs_objects_synced | Gauge | 10.2 | Number of LFS objects synced on secondary | url
| geo_lfs_objects_failed | Gauge | 10.2 | Number of LFS objects failed to sync on secondary | url
| geo_attachments | Gauge | 10.2 | Total number of file attachments available on primary | url
| geo_attachments_synced | Gauge | 10.2 | Number of attachments synced on secondary | url
| geo_attachments_failed | Gauge | 10.2 | Number of attachments failed to sync on secondary | url
| geo_last_event_id | Gauge | 10.2 | Database ID of the latest event log entry on the primary | url
| geo_last_event_timestamp | Gauge | 10.2 | UNIX timestamp of the latest event log entry on the primary | url
| geo_cursor_last_event_id | Gauge | 10.2 | Last database ID of the event log processed by the secondary | url
| geo_cursor_last_event_timestamp | Gauge | 10.2 | Last UNIX timestamp of the event log processed by the secondary | url
| geo_status_failed_total | Counter | 10.2 | Number of times retrieving the status from the Geo Node failed | url
| geo_last_successful_status_check_timestamp | Gauge | 10.2 | Last timestamp when the status was successfully updated | url
| geo_lfs_objects_synced_missing_on_primary | Gauge | 10.7 | Number of LFS objects marked as synced due to the file missing on the primary | url
| geo_job_artifacts_synced_missing_on_primary | Gauge | 10.7 | Number of job artifacts marked as synced due to the file missing on the primary | url
| geo_attachments_synced_missing_on_primary | Gauge | 10.7 | Number of attachments marked as synced due to the file missing on the primary | url
## Metrics shared directory
......
......@@ -154,15 +154,18 @@ Example response:
"attachments_count": 1,
"attachments_synced_count": 1,
"attachments_failed_count": 0,
"attachments_synced_missing_on_primary_count": 0,
"attachments_synced_in_percentage": "100.00%",
"db_replication_lag_seconds": 0,
"lfs_objects_count": 0,
"lfs_objects_synced_count": 0,
"lfs_objects_failed_count": 0,
"lfs_objects_synced_missing_on_primary_count": 0,
"lfs_objects_synced_in_percentage": "0.00%",
"job_artifacts_count": 2,
"job_artifacts_synced_count": 1,
"job_artifacts_failed_count": 1,
"job_artifacts_synced_missing_on_primary_count": 0,
"job_artifacts_synced_in_percentage": "50.00%",
"repositories_count": 41,
"repositories_failed_count": 1,
......@@ -209,15 +212,18 @@ Example response:
"attachments_count": 1,
"attachments_synced_count": 1,
"attachments_failed_count": 0,
"attachments_synced_missing_on_primary_count": 0,
"attachments_synced_in_percentage": "100.00%",
"db_replication_lag_seconds": 0,
"lfs_objects_count": 0,
"lfs_objects_synced_count": 0,
"lfs_objects_failed_count": 0,
"lfs_objects_synced_missing_on_primary_count": 0,
"lfs_objects_synced_in_percentage": "0.00%",
"job_artifacts_count": 2,
"job_artifacts_synced_count": 1,
"job_artifacts_failed_count": 1,
"job_artifacts_synced_missing_on_primary_count": 0,
"job_artifacts_synced_in_percentage": "50.00%",
"repositories_count": 41,
"repositories_failed_count": 1,
......
......@@ -32,6 +32,14 @@ module Geo
end
end
def count_synced_missing_on_primary_attachments
if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary_attachments.count
else
legacy_find_synced_missing_on_primary_attachments.count
end
end
def count_registry_attachments
Geo::FileRegistry.attachments.count
end
......@@ -84,6 +92,28 @@ module Geo
relation.limit(batch_size)
end
def find_retryable_failed_attachments_registries(batch_size:, except_file_ids: [])
find_failed_attachments_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_attachments_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_attachments_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_failed_attachments_registries
Geo::FileRegistry.attachments.failed
end
def find_synced_missing_on_primary_attachments_registries
Geo::FileRegistry.attachments.synced.missing_on_primary
end
private
def group_uploads
......@@ -146,6 +176,10 @@ module Geo
.where.not(id: except_file_ids)
end
def fdw_find_synced_missing_on_primary_attachments
fdw_find_synced_attachments.merge(Geo::FileRegistry.missing_on_primary)
end
def fdw_attachments
if selective_sync?
Geo::Fdw::Upload.where(group_uploads.or(project_uploads).or(other_uploads))
......@@ -180,7 +214,7 @@ module Geo
def legacy_find_failed_attachments
legacy_inner_join_registry_ids(
local_attachments,
Geo::FileRegistry.attachments.failed.pluck(:file_id),
find_failed_attachments_registries.pluck(:file_id),
Upload
)
end
......@@ -204,5 +238,13 @@ module Geo
Upload
)
end
def legacy_find_synced_missing_on_primary_attachments
legacy_inner_join_registry_ids(
local_attachments,
Geo::FileRegistry.attachments.synced.missing_on_primary.pluck(:file_id),
Upload
)
end
end
end
module Geo
class FileRegistryFinder < RegistryFinder
def find_failed_file_registries(batch_size:)
Geo::FileRegistry.failed.retry_due.limit(batch_size)
end
protected
def legacy_pluck_registry_file_ids(file_types:)
......
......@@ -20,11 +20,19 @@ module Geo
end
end
def count_synced_missing_on_primary_job_artifacts
if aggregate_pushdown_supported?
find_synced_missing_on_primary_job_artifacts.count
else
legacy_find_synced_missing_on_primary_job_artifacts.count
end
end
def count_registry_job_artifacts
Geo::JobArtifactRegistry.count
end
# Find limited amount of non replicated lfs objects.
# Find limited amount of non replicated job artifacts.
#
# You can pass a list with `except_artifact_ids:` so you can exclude items you
# already scheduled but haven't finished and aren't persisted to the database yet
......@@ -45,12 +53,12 @@ module Geo
relation.limit(batch_size)
end
def find_migrated_local_job_artifacts(batch_size:, except_file_ids: [])
def find_migrated_local_job_artifacts(batch_size:, except_artifact_ids: [])
relation =
if use_legacy_queries?
legacy_find_migrated_local_job_artifacts(except_file_ids: except_file_ids)
legacy_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids)
else
fdw_find_migrated_local_job_artifacts(except_file_ids: except_file_ids)
fdw_find_migrated_local_job_artifacts(except_artifact_ids: except_artifact_ids)
end
relation.limit(batch_size)
......@@ -68,10 +76,28 @@ module Geo
job_artifacts.with_files_stored_locally
end
def find_retryable_failed_job_artifacts_registries(batch_size:, except_artifact_ids: [])
find_failed_job_artifacts_registries
.retry_due
.where.not(artifact_id: except_artifact_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_job_artifacts_registries(batch_size:, except_artifact_ids: [])
find_synced_missing_on_primary_job_artifacts_registries
.retry_due
.where.not(artifact_id: except_artifact_ids)
.limit(batch_size)
end
def find_synced_job_artifacts_registries
Geo::JobArtifactRegistry.synced
end
def find_synced_missing_on_primary_job_artifacts_registries
Geo::JobArtifactRegistry.synced.missing_on_primary
end
def find_failed_job_artifacts_registries
Geo::JobArtifactRegistry.failed
end
......@@ -86,6 +112,14 @@ module Geo
end
end
def find_synced_missing_on_primary_job_artifacts
if use_legacy_queries?
legacy_find_synced_missing_on_primary_job_artifacts
else
fdw_find_job_artifacts.merge(find_synced_missing_on_primary_job_artifacts_registries)
end
end
def find_failed_job_artifacts
if use_legacy_queries?
legacy_find_failed_job_artifacts
......@@ -111,10 +145,10 @@ module Geo
.where.not(id: except_artifact_ids)
end
def fdw_find_migrated_local_job_artifacts(except_file_ids:)
def fdw_find_migrated_local_job_artifacts(except_artifact_ids:)
fdw_job_artifacts.joins("INNER JOIN job_artifact_registry ON job_artifact_registry.artifact_id = #{fdw_job_artifacts_table}.id")
.with_files_stored_remotely
.where.not(id: except_file_ids)
.where.not(id: except_artifact_ids)
.merge(Geo::JobArtifactRegistry.all)
end
......@@ -165,8 +199,8 @@ module Geo
(ids + include_registry_ids).uniq
end
def legacy_find_migrated_local_job_artifacts(except_file_ids:)
registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_file_ids
def legacy_find_migrated_local_job_artifacts(except_artifact_ids:)
registry_file_ids = Geo::JobArtifactRegistry.pluck(:artifact_id) - except_artifact_ids
legacy_inner_join_registry_ids(
job_artifacts.with_files_stored_remotely,
......@@ -174,5 +208,13 @@ module Geo
Ci::JobArtifact
)
end
def legacy_find_synced_missing_on_primary_job_artifacts
legacy_inner_join_registry_ids(
local_job_artifacts,
find_synced_missing_on_primary_job_artifacts_registries.pluck(:artifact_id),
Ci::JobArtifact
)
end
end
end
......@@ -20,6 +20,14 @@ module Geo
end
end
def count_synced_missing_on_primary_lfs_objects
if aggregate_pushdown_supported? && !use_legacy_queries?
fdw_find_synced_missing_on_primary_lfs_objects.count
else
legacy_find_synced_missing_on_primary_lfs_objects.count
end
end
def count_registry_lfs_objects
Geo::FileRegistry.lfs_objects.count
end
......@@ -68,13 +76,35 @@ module Geo
lfs_objects.with_files_stored_locally
end
def find_retryable_failed_lfs_objects_registries(batch_size:, except_file_ids: [])
find_failed_lfs_objects_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_retryable_synced_missing_on_primary_lfs_objects_registries(batch_size:, except_file_ids: [])
find_synced_missing_on_primary_lfs_objects_registries
.retry_due
.where.not(file_id: except_file_ids)
.limit(batch_size)
end
def find_failed_lfs_objects_registries
Geo::FileRegistry.lfs_objects.failed
end
def find_synced_missing_on_primary_lfs_objects_registries
Geo::FileRegistry.lfs_objects.synced.missing_on_primary
end
private
def find_synced_lfs_objects
if use_legacy_queries?
legacy_find_synced_lfs_objects
else
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced)
fdw_find_synced_lfs_objects
end
end
......@@ -82,7 +112,7 @@ module Geo
if use_legacy_queries?
legacy_find_failed_lfs_objects
else
fdw_find_lfs_objects.merge(Geo::FileRegistry.failed)
fdw_find_failed_lfs_objects
end
end
......@@ -112,6 +142,18 @@ module Geo
.merge(Geo::FileRegistry.lfs_objects)
end
def fdw_find_synced_lfs_objects
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced)
end
def fdw_find_synced_missing_on_primary_lfs_objects
fdw_find_lfs_objects.merge(Geo::FileRegistry.synced.missing_on_primary)
end
def fdw_find_failed_lfs_objects
fdw_find_lfs_objects.merge(Geo::FileRegistry.failed)
end
def fdw_lfs_objects
if selective_sync?
Geo::Fdw::LfsObject.joins(:project).where(projects: { id: current_node.projects })
......@@ -139,7 +181,7 @@ module Geo
def legacy_find_failed_lfs_objects
legacy_inner_join_registry_ids(
local_lfs_objects,
Geo::FileRegistry.lfs_objects.failed.pluck(:file_id),
find_failed_lfs_objects_registries.pluck(:file_id),
LfsObject
)
end
......@@ -163,5 +205,13 @@ module Geo
LfsObject
)
end
def legacy_find_synced_missing_on_primary_lfs_objects
legacy_inner_join_registry_ids(
local_lfs_objects,
Geo::FileRegistry.lfs_objects.synced.missing_on_primary.pluck(:file_id),
LfsObject
)
end
end
end
......@@ -5,5 +5,6 @@ module Geo::Syncable
scope :failed, -> { where(success: false) }
scope :synced, -> { where(success: true) }
scope :retry_due, -> { where('retry_at is NULL OR retry_at < ?', Time.now) }
scope :missing_on_primary, -> { where(missing_on_primary: true) }
end
end
......@@ -37,14 +37,17 @@ class GeoNodeStatus < ActiveRecord::Base
lfs_objects_synced_count: 'Number of local LFS objects synced on secondary',
lfs_objects_failed_count: 'Number of local LFS objects failed to sync on secondary',
lfs_objects_registry_count: 'Number of LFS objects in the registry',
lfs_objects_synced_missing_on_primary_count: 'Number of LFS objects marked as synced due to the file missing on the primary',
job_artifacts_count: 'Total number of local job artifacts available on primary',
job_artifacts_synced_count: 'Number of local job artifacts synced on secondary',
job_artifacts_failed_count: 'Number of local job artifacts failed to sync on secondary',
job_artifacts_registry_count: 'Number of job artifacts in the registry',
job_artifacts_synced_missing_on_primary_count: 'Number of job artifacts marked as synced due to the file missing on the primary',
attachments_count: 'Total number of local file attachments available on primary',
attachments_synced_count: 'Number of local file attachments synced on secondary',
attachments_failed_count: 'Number of local file attachments failed to sync on secondary',
attachments_registry_count: 'Number of attachments in the registry',
attachments_synced_missing_on_primary_count: 'Number of attachments marked as synced due to the file missing on the primary',
replication_slots_count: 'Total number of replication slots on the primary',
replication_slots_used_count: 'Number of replication slots in use on the primary',
replication_slots_max_retained_wal_bytes: 'Maximum number of bytes retained in the WAL on the primary',
......@@ -168,12 +171,15 @@ class GeoNodeStatus < ActiveRecord::Base
self.lfs_objects_synced_count = lfs_objects_finder.count_synced_lfs_objects
self.lfs_objects_failed_count = lfs_objects_finder.count_failed_lfs_objects
self.lfs_objects_registry_count = lfs_objects_finder.count_registry_lfs_objects
self.lfs_objects_synced_missing_on_primary_count = lfs_objects_finder.count_synced_missing_on_primary_lfs_objects
self.job_artifacts_synced_count = job_artifacts_finder.count_synced_job_artifacts
self.job_artifacts_failed_count = job_artifacts_finder.count_failed_job_artifacts
self.job_artifacts_registry_count = job_artifacts_finder.count_registry_job_artifacts
self.job_artifacts_synced_missing_on_primary_count = job_artifacts_finder.count_synced_missing_on_primary_job_artifacts
self.attachments_synced_count = attachments_finder.count_synced_attachments
self.attachments_failed_count = attachments_finder.count_failed_attachments
self.attachments_registry_count = attachments_finder.count_registry_attachments
self.attachments_synced_missing_on_primary_count = attachments_finder.count_synced_missing_on_primary_attachments
end
end
......
module Geo
# This class is responsible for:
# * Finding the appropriate Downloader class for a FileRegistry record
# * Executing the Downloader
# * Marking the FileRegistry record as synced or needing retry
class FileDownloadService < FileService
LEASE_TIMEOUT = 8.hours.freeze
......@@ -8,13 +12,15 @@ module Geo
def execute
try_obtain_lease do
start_time = Time.now
bytes_downloaded = downloader.execute
success = (bytes_downloaded.present? && bytes_downloaded >= 0)
log_info("File download",
success: success,
bytes_downloaded: bytes_downloaded,
download_time_s: (Time.now - start_time).to_f.round(3))
update_registry(bytes_downloaded, success: success)
download_result = downloader.execute
mark_as_synced = download_result.success || download_result.primary_missing_file
log_file_download(mark_as_synced, download_result, start_time)
update_registry(download_result.bytes_downloaded,
mark_as_synced: mark_as_synced,
missing_on_primary: download_result.primary_missing_file)
end
end
......@@ -28,8 +34,21 @@ module Geo
raise
end
def update_registry(bytes_downloaded, success:)
transfer =
def log_file_download(mark_as_synced, download_result, start_time)
metadata = {
mark_as_synced: mark_as_synced,
download_success: download_result.success,
bytes_downloaded: download_result.bytes_downloaded,
failed_before_transfer: download_result.failed_before_transfer,
primary_missing_file: download_result.primary_missing_file,
download_time_s: (Time.now - start_time).to_f.round(3)
}
log_info("File download", metadata)
end
def update_registry(bytes_downloaded, mark_as_synced:, missing_on_primary: false)
registry =
if object_type.to_sym == :job_artifact
Geo::JobArtifactRegistry.find_or_initialize_by(artifact_id: object_db_id)
else
......@@ -39,16 +58,22 @@ module Geo
)
end
transfer.bytes = bytes_downloaded
transfer.success = success
registry.bytes = bytes_downloaded
registry.success = mark_as_synced
registry.missing_on_primary = missing_on_primary
retry_later = !registry.success || registry.missing_on_primary
unless success
if retry_later
# We don't limit the amount of retries
transfer.retry_count = (transfer.retry_count || 0) + 1
transfer.retry_at = Time.now + delay(transfer.retry_count).seconds
registry.retry_count = (registry.retry_count || 0) + 1
registry.retry_at = Time.now + delay(registry.retry_count).seconds
else
registry.retry_count = 0
registry.retry_at = nil
end
transfer.save
registry.save
end
def lease_key
......
......@@ -76,7 +76,7 @@ module Geo
end
def upload?
Geo::FileService::DEFAULT_OBJECT_TYPES.include?(object_type)
Geo::FileService::DEFAULT_OBJECT_TYPES.include?(object_type.to_s)
end
def lease_key
......
......@@ -9,7 +9,7 @@ module Geo
DEFAULT_SERVICE_TYPE = 'file'.freeze
def initialize(object_type, object_db_id)
@object_type = object_type.to_s
@object_type = object_type.to_sym
@object_db_id = object_db_id
end
......@@ -21,19 +21,19 @@ module Geo
def service_klass_name
klass_name =
if DEFAULT_OBJECT_TYPES.include?(object_type)
if DEFAULT_OBJECT_TYPES.include?(object_type.to_s)
DEFAULT_SERVICE_TYPE
else
object_type
end
klass_name.camelize
klass_name.to_s.camelize
end
def base_log_data(message)
{
class: self.class.name,
object_type: object_type,
object_type: object_type.to_s,
object_db_id: object_db_id,
message: message
}
......
module Geo
# This class is responsible for:
# * Handling file requests from the secondary over the API
# * Returning the necessary response data to send the file back
class FileUploadService < FileService
attr_reader :auth_header
......
......@@ -9,84 +9,61 @@ module Geo
end
def schedule_job(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type, object_db_id)
job_id = FileDownloadWorker.perform_async(object_type.to_s, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
end
def attachments_finder
@attachments_finder ||= AttachmentRegistryFinder.new(current_node: current_node)
end
def file_registry_finder
@file_registry_finder ||= FileRegistryFinder.new(current_node: current_node)
end
def lfs_objects_finder
@lfs_objects_finder ||= LfsObjectRegistryFinder.new(current_node: current_node)
end
def job_artifacts_finder
@job_artifacts_finder ||= JobArtifactRegistryFinder.new(current_node: current_node)
end
# Pools for new resources to be transferred
#
# @return [Array] resources to be transferred
def load_pending_resources
resources = find_unsynced_objects(batch_size: db_retrieve_batch_size)
resources = find_unsynced_jobs(batch_size: db_retrieve_batch_size)
remaining_capacity = db_retrieve_batch_size - resources.count
if remaining_capacity.zero?
resources
else
resources + find_failed_upload_object_ids(batch_size: remaining_capacity)
resources + find_low_priority_jobs(batch_size: remaining_capacity)
end
end
def find_unsynced_objects(batch_size:)
lfs_object_ids = find_unsynced_lfs_objects_ids(batch_size: batch_size)
attachment_ids = find_unsynced_attachments_ids(batch_size: batch_size)
job_artifact_ids = find_unsynced_job_artifacts_ids(batch_size: batch_size)
take_batch(lfs_object_ids, attachment_ids, job_artifact_ids)
end
def find_unsynced_lfs_objects_ids(batch_size:)
lfs_objects_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id)
.map { |id| [:lfs, id] }
# @return [Array] job arguments of unsynced resources
def find_unsynced_jobs(batch_size:)
find_jobs(sync_statuses: [:unsynced], batch_size: batch_size)
end
def find_unsynced_attachments_ids(batch_size:)
attachments_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES))
.pluck(:uploader, :id)
.map { |uploader, id| [uploader.sub(/Uploader\z/, '').underscore, id] }
# @return [Array] job arguments of low priority resources
def find_low_priority_jobs(batch_size:)
find_jobs(sync_statuses: [:failed, :synced_missing_on_primary], batch_size: batch_size)
end
def find_unsynced_job_artifacts_ids(batch_size:)
job_artifacts_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id)
.map { |id| [:job_artifact, id] }
end
def find_failed_upload_object_ids(batch_size:)
file_ids = file_registry_finder.find_failed_file_registries(batch_size: batch_size)
.pluck(:file_type, :file_id)
artifact_ids = find_failed_artifact_ids(batch_size: batch_size)
# Get a batch of resources taking equal parts from each resource.
#
# @return [Array] job arguments of a batch of resources
def find_jobs(sync_statuses:, batch_size:)
jobs = job_finders.reduce([]) do |jobs, job_finder|
sync_statuses.reduce(jobs) do |jobs, sync_status|
jobs << job_finder.find_jobs(sync_status: sync_status, batch_size: batch_size)
end
end
take_batch(file_ids, artifact_ids)
take_batch(*jobs, batch_size: batch_size)
end
def find_failed_artifact_ids(batch_size:)
job_artifacts_finder.find_failed_job_artifacts_registries.retry_due.limit(batch_size)
.pluck(:artifact_id).map { |id| [:job_artifact, id] }
def job_finders
@job_finders ||= [
Geo::FileDownloadDispatchWorker::AttachmentJobFinder.new(scheduled_file_ids(Geo::FileService::DEFAULT_OBJECT_TYPES)),
Geo::FileDownloadDispatchWorker::LfsObjectJobFinder.new(scheduled_file_ids(:lfs)),
Geo::FileDownloadDispatchWorker::JobArtifactJobFinder.new(scheduled_file_ids(:job_artifact))
]
end
def scheduled_file_ids(file_types)
file_types = Array(file_types)
file_types = file_types.map(&:to_s)
scheduled_jobs.select { |data| file_types.include?(data[:type]) }.map { |data| data[:id] }
scheduled_jobs.select { |data| file_types.include?(data[:type].to_s) }.map { |data| data[:id] }
end
end
end
module Geo
class FileDownloadDispatchWorker
class AttachmentJobFinder < JobFinder
def resource_type
:attachment
end
def except_resource_ids_key
:except_file_ids
end
# Why do we need a different `file_type` for each Uploader? Why not just use 'upload'?
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_attachments(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id, :uploader)
.map { |id, uploader| [uploader.sub(/Uploader\z/, '').underscore, id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_type, :file_id)
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_type, :file_id)
end
end
end
end
module Geo
class FileDownloadDispatchWorker
class JobArtifactJobFinder < JobFinder
def resource_type
:job_artifact
end
def resource_id_prefix
:artifact
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['job_artifact', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:artifact_id).map { |id| ['job_artifact', id] }
end
end
end
end
module Geo
class FileDownloadDispatchWorker
class JobFinder
include Gitlab::Utils::StrongMemoize
attr_reader :registry_finder, :scheduled_file_ids
def initialize(scheduled_file_ids)
current_node = Gitlab::Geo.current_node
@registry_finder = registry_finder_class.new(current_node: current_node)
@scheduled_file_ids = scheduled_file_ids
end
def registry_finder_class
"Geo::#{resource_type.to_s.classify}RegistryFinder".constantize
end
def except_resource_ids_key
:"except_#{resource_id_prefix}_ids"
end
def find_jobs(sync_status:, batch_size:)
self.public_send(:"find_#{sync_status}_jobs", batch_size: batch_size) # rubocop:disable GitlabSecurity/PublicSend
end
def find_failed_registries(batch_size:)
registry_finder.public_send(:"find_retryable_failed_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
end
def find_synced_missing_on_primary_registries(batch_size:)
registry_finder.public_send(:"find_retryable_synced_missing_on_primary_#{resource_type}s_registries", batch_size: batch_size, except_resource_ids_key => scheduled_file_ids) # rubocop:disable GitlabSecurity/PublicSend
end
end
end
end
module Geo
class FileDownloadDispatchWorker
class LfsObjectJobFinder < JobFinder
def resource_type
:lfs_object
end
def except_resource_ids_key
:except_file_ids
end
def find_unsynced_jobs(batch_size:)
registry_finder.find_unsynced_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids)
.pluck(:id)
.map { |id| ['lfs', id] }
end
def find_failed_jobs(batch_size:)
find_failed_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
end
def find_synced_missing_on_primary_jobs(batch_size:)
find_synced_missing_on_primary_registries(batch_size: batch_size).pluck(:file_id).map { |id| ['lfs', id] }
end
end
end
end
......@@ -20,7 +20,7 @@ module Geo
end
def schedule_job(object_type, object_db_id)
job_id = ::Geo::FileRegistryRemovalWorker.perform_async(object_type, object_db_id)
job_id = ::Geo::FileRegistryRemovalWorker.perform_async(object_type.to_s, object_db_id)
if job_id
retval = { id: object_db_id, type: object_type, job_id: job_id }
......@@ -47,7 +47,7 @@ module Geo
lfs_objects_finder.find_migrated_local_lfs_objects(batch_size: batch_size, except_file_ids: scheduled_file_ids(:lfs))
.pluck(:id)
.map { |id| [:lfs, id] }
.map { |id| ['lfs', id] }
end
def find_migrated_local_attachments_ids(batch_size:)
......@@ -61,15 +61,16 @@ module Geo
def find_migrated_local_job_artifacts_ids(batch_size:)
return [] unless job_artifacts_object_store_enabled?
job_artifacts_finder.find_migrated_local_job_artifacts(batch_size: batch_size, except_file_ids: scheduled_file_ids(:job_artifact))
job_artifacts_finder.find_migrated_local_job_artifacts(batch_size: batch_size, except_artifact_ids: scheduled_file_ids(:job_artifact))
.pluck(:id)
.map { |id| [:job_artifact, id] }
.map { |id| ['job_artifact', id] }
end
def scheduled_file_ids(file_types)
file_types = Array(file_types)
file_types = file_types.map(&:to_s)
scheduled_jobs.select { |data| file_types.include?(data[:type]) }.map { |data| data[:id] }
scheduled_jobs.select { |data| file_types.include?(data[:type].to_s) }.map { |data| data[:id] }
end
def attachments_object_store_enabled?
......
......@@ -107,8 +107,8 @@ module Geo
(Time.now.utc - start_time) >= run_time
end
def take_batch(*arrays)
interleave(*arrays).uniq.compact.take(db_retrieve_batch_size)
def take_batch(*arrays, batch_size: db_retrieve_batch_size)
interleave(*arrays).uniq.compact.take(batch_size)
end
# Combines the elements of multiple, arbitrary-length arrays into a single array.
......
---
title: Mark files missing on primary as synced, but retry them
merge_request: 5050
author:
type: added
class AddMissingOnPrimaryToFileRegistry < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_column_with_default :file_registry, :missing_on_primary, :boolean, default: false, allow_null: false
end
def down
remove_column :file_registry, :missing_on_primary
end
end
class AddMissingOnPrimaryToJobArtifactRegistry < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_column_with_default :job_artifact_registry, :missing_on_primary, :boolean, default: false, allow_null: false
end
def down
remove_column :job_artifact_registry, :missing_on_primary
end
end
......@@ -28,6 +28,7 @@ ActiveRecord::Schema.define(version: 20180405074130) do
t.boolean "success", default: false, null: false
t.integer "retry_count"
t.datetime "retry_at"
t.boolean "missing_on_primary", default: false, null: false
end
add_index "file_registry", ["file_type", "file_id"], name: "index_file_registry_on_file_type_and_file_id", unique: true, using: :btree
......@@ -43,6 +44,7 @@ ActiveRecord::Schema.define(version: 20180405074130) do
t.integer "retry_count"
t.boolean "success"
t.string "sha256"
t.boolean "missing_on_primary", default: false, null: false
end
add_index "job_artifact_registry", ["retry_at"], name: "index_job_artifact_registry_on_retry_at", using: :btree
......
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class AddMissingOnPrimaryCountsToGeoNodeStatuses < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
def change
add_column :geo_node_statuses, :lfs_objects_synced_missing_on_primary_count, :integer
add_column :geo_node_statuses, :job_artifacts_synced_missing_on_primary_count, :integer
add_column :geo_node_statuses, :attachments_synced_missing_on_primary_count, :integer
end
end
......@@ -23,8 +23,7 @@ module API
file = response[:file]
present_disk_file!(file.path, file.filename)
else
status response[:code]
response
error! response, response.delete(:code)
end
end
......
......@@ -260,6 +260,7 @@ module EE
expose :attachments_count
expose :attachments_synced_count
expose :attachments_failed_count
expose :attachments_synced_missing_on_primary_count
expose :attachments_synced_in_percentage do |node|
number_to_percentage(node.attachments_synced_in_percentage, precision: 2)
end
......@@ -269,6 +270,7 @@ module EE
expose :lfs_objects_count
expose :lfs_objects_synced_count
expose :lfs_objects_failed_count
expose :lfs_objects_synced_missing_on_primary_count
expose :lfs_objects_synced_in_percentage do |node|
number_to_percentage(node.lfs_objects_synced_in_percentage, precision: 2)
end
......@@ -276,6 +278,7 @@ module EE
expose :job_artifacts_count
expose :job_artifacts_synced_count
expose :job_artifacts_failed_count
expose :job_artifacts_synced_missing_on_primary_count
expose :job_artifacts_synced_in_percentage do |node|
number_to_percentage(node.job_artifacts_synced_in_percentage, precision: 2)
end
......
module Gitlab
module Geo
# This class is responsible for:
# * Finding an Upload record
# * Requesting and downloading the Upload's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class not inherited by JobArtifactDownloader and LfsDownloader
# Maybe rename it so it doesn't seem generic. It only works with Upload records.
class FileDownloader
attr_reader :object_type, :object_db_id
......@@ -14,10 +21,33 @@ module Gitlab
# or nil or -1 if a failure occurred.
def execute
upload = Upload.find_by(id: object_db_id)
return unless upload.present?
return fail_before_transfer unless upload.present?
transfer = ::Gitlab::Geo::FileTransfer.new(object_type.to_sym, upload)
transfer.download_from_primary
Result.from_transfer_result(transfer.download_from_primary)
end
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file, :failed_before_transfer
def self.from_transfer_result(transfer_result)
Result.new(success: transfer_result.success,
primary_missing_file: transfer_result.primary_missing_file,
bytes_downloaded: transfer_result.bytes_downloaded)
end
def initialize(success:, bytes_downloaded:, primary_missing_file: false, failed_before_transfer: false)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
@failed_before_transfer = failed_before_transfer
end
end
private
def fail_before_transfer
Result.new(success: false, bytes_downloaded: 0, failed_before_transfer: true)
end
end
end
......
module Gitlab
module Geo
# This class is responsible for:
# * Requesting an Upload file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class FileTransfer < Transfer
def initialize(file_type, upload)
@file_type = file_type
......
module Gitlab
module Geo
# This class is responsible for:
# * Finding an Upload record
# * Returning the necessary response data to send the file back
#
# TODO: Rearrange things so this class not inherited by JobArtifactUploader and LfsUploader
# Maybe rename it so it doesn't seem generic. It only works with Upload records.
class FileUploader
include LogHelpers
FILE_NOT_FOUND_GEO_CODE = 'FILE_NOT_FOUND'.freeze
attr_reader :object_db_id, :message
def initialize(object_db_id, message)
......@@ -11,8 +21,9 @@ module Gitlab
def execute
recorded_file = Upload.find_by(id: object_db_id)
return error unless recorded_file&.exist?
return error unless valid?(recorded_file)
return error('Upload not found') unless recorded_file
return file_not_found(recorded_file) unless recorded_file.exist?
return error('Upload not found') unless valid?(recorded_file)
success(CarrierWave::SanitizedFile.new(recorded_file.absolute_path))
end
......@@ -37,9 +48,20 @@ module Gitlab
{ code: :ok, message: 'Success', file: file }
end
def error(message = 'File not found')
def error(message)
{ code: :not_found, message: message }
end
# A 404 implies the client made a mistake requesting that resource.
# In this case, we know that the resource should exist, so it is a 500 server error.
# We send a special "geo_code" so the secondary can mark the file as synced.
def file_not_found(resource)
{
code: :not_found,
geo_code: FILE_NOT_FOUND_GEO_CODE,
message: "#{resource.class.name} ##{resource.id} file not found"
}
end
end
end
end
module Gitlab
module Geo
# This class is responsible for:
# * Finding a ::Ci::JobArtifact record
# * Requesting and downloading the JobArtifact's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class does not inherit FileDownloader
class JobArtifactDownloader < FileDownloader
def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
return unless job_artifact.present?
return fail_before_transfer unless job_artifact.present?
transfer = ::Gitlab::Geo::JobArtifactTransfer.new(job_artifact)
transfer.download_from_primary
Result.from_transfer_result(transfer.download_from_primary)
end
end
end
......
module Gitlab
module Geo
# This class is responsible for:
# * Requesting an ::Ci::JobArtifact file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class JobArtifactTransfer < Transfer
def initialize(job_artifact)
@file_type = :job_artifact
......
module Gitlab
module Geo
# This class is responsible for:
# * Finding an ::Ci::JobArtifact record
# * Returning the necessary response data to send the file back
#
# TODO: Rearrange things so this class does not inherit from FileUploader
class JobArtifactUploader < ::Gitlab::Geo::FileUploader
def execute
job_artifact = ::Ci::JobArtifact.find_by(id: object_db_id)
......@@ -9,7 +14,9 @@ module Gitlab
end
unless job_artifact.file.present? && job_artifact.file.exists?
return error('Job artifact does not have a file')
log_error("Could not upload job artifact because it does not have a file", id: job_artifact.id)
return file_not_found(job_artifact)
end
success(job_artifact.file)
......
module Gitlab
module Geo
# This class is responsible for:
# * Finding a LfsObject record
# * Requesting and downloading the LfsObject's file from the primary
# * Returning a detailed Result
#
# TODO: Rearrange things so this class does not inherit FileDownloader
class LfsDownloader < FileDownloader
def execute
lfs_object = LfsObject.find_by(id: object_db_id)
return unless lfs_object.present?
return fail_before_transfer unless lfs_object.present?
transfer = ::Gitlab::Geo::LfsTransfer.new(lfs_object)
transfer.download_from_primary
Result.from_transfer_result(transfer.download_from_primary)
end
end
end
......
module Gitlab
module Geo
# This class is responsible for:
# * Requesting an LfsObject file from the primary
# * Saving it in the right place on successful download
# * Returning a detailed Result object
class LfsTransfer < Transfer
def initialize(lfs_object)
@file_type = :lfs
......
module Gitlab
module Geo
# This class is responsible for:
# * Finding an LfsObject record
# * Returning the necessary response data to send the file back
#
# TODO: Rearrange things so this class does not inherit from FileUploader
class LfsUploader < FileUploader
def execute
lfs_object = LfsObject.find_by(id: object_db_id)
return error unless lfs_object.present?
return error if message[:checksum] != lfs_object.oid
return error('LFS object not found') unless lfs_object
return error('LFS object not found') if message[:checksum] != lfs_object.oid
unless lfs_object.file.present? && lfs_object.file.exists?
return error('LFS object does not have a file')
log_error("Could not upload LFS object because it does not have a file", id: lfs_object.id)
return file_not_found(lfs_object)
end
success(lfs_object.file)
......
module Gitlab
module Geo
class NamespaceFileDownloader < FileDownloader
end
end
end
module Gitlab
module Geo
class NamespaceFileUploader < FileUploader
end
end
end
......@@ -14,25 +14,39 @@ module Gitlab
@request_data = request_data
end
# Returns number of bytes downloaded or -1 if unsuccessful.
# Returns Result object with success boolean and number of bytes downloaded.
def download_from_primary
return unless Gitlab::Geo.secondary?
return if File.directory?(filename)
return failure unless Gitlab::Geo.secondary?
return failure if File.directory?(filename)
primary = Gitlab::Geo.primary_node
return unless primary
return failure unless primary
url = primary.geo_transfers_url(file_type, file_id.to_s)
req_headers = TransferRequest.new(request_data).headers
return unless ensure_path_exists
return failure unless ensure_path_exists
download_file(url, req_headers)
end
class Result
attr_reader :success, :bytes_downloaded, :primary_missing_file
def initialize(success:, bytes_downloaded:, primary_missing_file: false)
@success = success
@bytes_downloaded = bytes_downloaded
@primary_missing_file = primary_missing_file
end
end
private
def failure(primary_missing_file: false)
Result.new(success: false, bytes_downloaded: 0, primary_missing_file: primary_missing_file)
end
def ensure_path_exists
path = Pathname.new(filename)
dir = path.dirname
......@@ -55,7 +69,7 @@ module Gitlab
file_size = -1
temp_file = open_temp_file(filename)
return unless temp_file
return failure unless temp_file
begin
response = Gitlab::HTTP.get(url, allow_local_requests: true, headers: req_headers, stream_body: true) do |fragment|
......@@ -65,13 +79,13 @@ module Gitlab
temp_file.flush
unless response.success?
log_error("Unsuccessful download", filename: filename, response_code: response.code, response_msg: response.msg, url: url)
return file_size
log_error("Unsuccessful download", filename: filename, response_code: response.code, response_msg: response.try(:msg), url: url)
return failure(primary_missing_file: primary_missing_file?(response, temp_file))
end
if File.directory?(filename)
log_error("Destination file is a directory", filename: filename)
return file_size
return failure
end
FileUtils.mv(temp_file.path, filename)
......@@ -85,7 +99,18 @@ module Gitlab
temp_file.unlink
end
file_size
Result.new(success: file_size > -1, bytes_downloaded: [file_size, 0].max)
end
def primary_missing_file?(response, temp_file)
body = File.read(temp_file.path) if File.exist?(temp_file.path)
if response.code == 404 && body.present?
json_response = JSON.parse(body)
json_response['geo_code'] == Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE
end
rescue JSON::ParserError
false
end
def default_permissions
......
......@@ -8,12 +8,15 @@ FactoryBot.define do
attachments_count 329
attachments_failed_count 13
attachments_synced_count 141
attachments_synced_missing_on_primary_count 89
lfs_objects_count 256
lfs_objects_failed_count 12
lfs_objects_synced_count 123
lfs_objects_synced_missing_on_primary_count 90
job_artifacts_count 580
job_artifacts_failed_count 3
job_artifacts_synced_count 577
job_artifacts_synced_missing_on_primary_count 91
repositories_count 10
repositories_synced_count 5
repositories_failed_count 0
......
require 'spec_helper'
describe Geo::FileRegistryFinder, :geo do
include ::EE::GeoHelpers
let(:secondary) { create(:geo_node) }
subject { described_class.new(current_node: secondary) }
before do
stub_current_geo_node(secondary)
end
describe '#find_failed_file_registries' do
it 'returs uploads that sync has failed' do
failed_lfs_registry = create(:geo_file_registry, :lfs, :with_file, success: false)
failed_file_upload = create(:geo_file_registry, :with_file, success: false)
failed_issuable_upload = create(:geo_file_registry, :with_file, success: false)
create(:geo_file_registry, :lfs, :with_file, success: true)
create(:geo_file_registry, :with_file, success: true)
uploads = subject.find_failed_file_registries(batch_size: 10)
expect(uploads).to match_array([failed_lfs_registry, failed_file_upload, failed_issuable_upload])
end
end
end
......@@ -22,42 +22,6 @@ describe Geo::JobArtifactRegistryFinder, :geo do
stub_artifacts_object_storage
end
describe '#count_synced_job_artifacts' do
it 'delegates to #legacy_find_synced_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_synced_job_artifacts).and_call_original
subject.count_synced_job_artifacts
end
it 'delegates to #find_synced_job_artifacts for PostgreSQL 10' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
expect(subject).to receive(:find_synced_job_artifacts).and_call_original
subject.count_synced_job_artifacts
end
end
describe '#count_failed_job_artifacts' do
it 'delegates to #legacy_find_failed_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_failed_job_artifacts).and_call_original
subject.count_failed_job_artifacts
end
it 'delegates to #find_failed_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
expect(subject).to receive(:find_failed_job_artifacts).and_call_original
subject.count_failed_job_artifacts
end
end
shared_examples 'counts all the things' do
describe '#count_local_job_artifacts' do
before do
......@@ -95,6 +59,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end
describe '#count_synced_job_artifacts' do
it 'delegates to #legacy_find_synced_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_synced_job_artifacts).and_call_original
subject.count_synced_job_artifacts
end
it 'delegates to #find_synced_job_artifacts for PostgreSQL 10' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
expect(subject).to receive(:find_synced_job_artifacts).and_call_original
subject.count_synced_job_artifacts
end
it 'counts job artifacts that have been synced' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
......@@ -141,6 +121,22 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end
describe '#count_failed_job_artifacts' do
it 'delegates to #legacy_find_failed_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_failed_job_artifacts).and_call_original
subject.count_failed_job_artifacts
end
it 'delegates to #find_failed_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
expect(subject).to receive(:find_failed_job_artifacts).and_call_original
subject.count_failed_job_artifacts
end
it 'counts job artifacts that sync has failed' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id)
......@@ -191,6 +187,74 @@ describe Geo::JobArtifactRegistryFinder, :geo do
end
end
end
describe '#count_synced_missing_on_primary_job_artifacts' do
it 'delegates to #legacy_find_synced_missing_on_primary_job_artifacts' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_synced_missing_on_primary_job_artifacts).and_call_original
subject.count_synced_missing_on_primary_job_artifacts
end
it 'delegates to #find_synced_missing_on_primary_job_artifacts for PostgreSQL 10' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
expect(subject).to receive(:find_synced_missing_on_primary_job_artifacts).and_call_original
subject.count_synced_missing_on_primary_job_artifacts
end
it 'counts job artifacts that have been synced and are missing on the primary' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_job_artifacts).to eq 1
end
it 'excludes job artifacts that are not missing on the primary' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id)
expect(subject.count_synced_missing_on_primary_job_artifacts).to eq 0
end
it 'excludes job artifacts that are not synced' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, success: false, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_job_artifacts).to eq 0
end
it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_job_artifacts).to eq 0
end
context 'with selective sync' do
before do
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'delegates to #legacy_find_synced_missing_on_primary_job_artifacts' do
expect(subject).to receive(:legacy_find_synced_missing_on_primary_job_artifacts).and_call_original
subject.count_synced_missing_on_primary_job_artifacts
end
it 'counts job artifacts that has been synced' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_1.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_2.id, missing_on_primary: true)
create(:geo_job_artifact_registry, artifact_id: job_artifact_3.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_job_artifacts).to eq 2
end
it 'ignores remote job artifacts' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_job_artifacts).to eq 0
end
end
end
end
shared_examples 'finds all the things' do
......@@ -252,11 +316,11 @@ describe Geo::JobArtifactRegistryFinder, :geo do
expect(job_artifacts).to be_empty
end
it 'excludes except_file_ids' do
it 'excludes except_artifact_ids' do
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_1.id)
create(:geo_job_artifact_registry, artifact_id: job_artifact_remote_2.id)
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10, except_file_ids: [job_artifact_remote_1.id])
job_artifacts = subject.find_migrated_local_job_artifacts(batch_size: 10, except_artifact_ids: [job_artifact_remote_1.id])
expect(job_artifacts).to match_ids(job_artifact_remote_2)
end
......
......@@ -22,30 +22,66 @@ describe Geo::LfsObjectRegistryFinder, :geo do
stub_lfs_object_storage
end
context 'aggregate pushdown not supported' do
before do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
shared_examples 'counts all the things' do
describe '#count_local_lfs_objects' do
before do
lfs_object_1
lfs_object_2
lfs_object_3
lfs_object_4
end
it 'counts LFS objects' do
expect(subject.count_local_lfs_objects).to eq 4
end
it 'ignores remote LFS objects' do
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_local_lfs_objects).to eq 3
end
context 'with selective sync' do
before do
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_2)
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_3)
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'counts LFS objects' do
expect(subject.count_local_lfs_objects).to eq 2
end
it 'ignores remote LFS objects' do
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_local_lfs_objects).to eq 1
end
end
end
describe '#count_synced_lfs_objects' do
it 'delegates to #legacy_find_synced_lfs_objects' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_synced_lfs_objects).and_call_original
subject.count_synced_lfs_objects
end
end
describe '#count_failed_lfs_objects' do
it 'delegates to #legacy_find_failed_lfs_objects' do
expect(subject).to receive(:legacy_find_failed_lfs_objects).and_call_original
it 'delegates to #fdw_find_synced_lfs_objects for PostgreSQL 10' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
allow(subject).to receive(:use_legacy_queries?).and_return(false)
subject.count_failed_lfs_objects
expect(subject).to receive(:fdw_find_synced_lfs_objects).and_return(double(count: 1))
subject.count_synced_lfs_objects
end
end
end
shared_examples 'counts all the things' do
describe '#count_synced_lfs_objects' do
it 'counts LFS objects that has been synced' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
......@@ -88,9 +124,10 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id)
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_synced_lfs_objects).to eq 1
end
......@@ -98,6 +135,22 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end
describe '#count_failed_lfs_objects' do
it 'delegates to #legacy_find_failed_lfs_objects' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_failed_lfs_objects).and_call_original
subject.count_failed_lfs_objects
end
it 'delegates to #find_failed_lfs_objects' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
expect(subject).to receive(:find_failed_lfs_objects).and_call_original
subject.count_failed_lfs_objects
end
it 'counts LFS objects that sync has failed' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id)
......@@ -140,14 +193,90 @@ describe Geo::LfsObjectRegistryFinder, :geo do
end
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, success: false)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, success: false)
lfs_object_1.update_column(:file_store, ObjectStorage::Store::REMOTE)
expect(subject.count_failed_lfs_objects).to eq 1
end
end
end
describe '#count_synced_missing_on_primary_lfs_objects' do
it 'delegates to #legacy_find_synced_missing_on_primary_lfs_objects' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(false)
expect(subject).to receive(:legacy_find_synced_missing_on_primary_lfs_objects).and_call_original
subject.count_synced_missing_on_primary_lfs_objects
end
it 'delegates to #fdw_find_synced_missing_on_primary_lfs_objects for PostgreSQL 10' do
allow(subject).to receive(:aggregate_pushdown_supported?).and_return(true)
allow(subject).to receive(:use_legacy_queries?).and_return(false)
expect(subject).to receive(:fdw_find_synced_missing_on_primary_lfs_objects).and_return(double(count: 1))
subject.count_synced_missing_on_primary_lfs_objects
end
it 'counts LFS objects that have been synced and are missing on the primary' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_lfs_objects).to eq 1
end
it 'excludes LFS objects that are not missing on the primary' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id)
expect(subject.count_synced_missing_on_primary_lfs_objects).to eq 0
end
it 'excludes LFS objects that are not synced' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, success: false, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_lfs_objects).to eq 0
end
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_lfs_objects).to eq 0
end
context 'with selective sync' do
before do
allow_any_instance_of(LfsObjectsProject).to receive(:update_project_statistics).and_return(nil)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_1)
create(:lfs_objects_project, project: synced_project, lfs_object: lfs_object_2)
create(:lfs_objects_project, project: unsynced_project, lfs_object: lfs_object_3)
secondary.update!(selective_sync_type: 'namespaces', namespaces: [synced_group])
end
it 'delegates to #legacy_find_synced_missing_on_primary_lfs_objects' do
expect(subject).to receive(:legacy_find_synced_missing_on_primary_lfs_objects).and_call_original
subject.count_synced_missing_on_primary_lfs_objects
end
it 'counts LFS objects that has been synced' do
create(:geo_file_registry, :lfs, file_id: lfs_object_1.id, missing_on_primary: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_2.id, missing_on_primary: true)
create(:geo_file_registry, :lfs, file_id: lfs_object_3.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_lfs_objects).to eq 2
end
it 'ignores remote LFS objects' do
create(:geo_file_registry, :lfs, file_id: lfs_object_remote_1.id, missing_on_primary: true)
expect(subject.count_synced_missing_on_primary_lfs_objects).to eq 0
end
end
end
end
shared_examples 'finds all the things' do
......
......@@ -9,12 +9,15 @@
"attachments_count",
"attachments_failed_count",
"attachments_synced_count",
"attachments_synced_missing_on_primary_count",
"lfs_objects_count",
"lfs_objects_failed_count",
"lfs_objects_synced_count",
"lfs_objects_synced_missing_on_primary_count",
"job_artifacts_count",
"job_artifacts_failed_count",
"job_artifacts_synced_count",
"job_artifacts_synced_missing_on_primary_count",
"db_replication_lag_seconds",
"repositories_count",
"repositories_failed_count",
......@@ -53,15 +56,18 @@
"attachments_count": { "type": "integer" },
"attachments_failed_count": { "type": ["integer", "null"] },
"attachments_synced_count": { "type": ["integer", "null"] },
"attachments_synced_missing_on_primary_count": { "type": ["integer", "null"] },
"attachments_synced_in_percentage": { "type": "string" },
"db_replication_lag_seconds": { "type": ["integer", "null"] },
"lfs_objects_count": { "type": "integer" },
"lfs_objects_failed_count": { "type": ["integer", "null"] },
"lfs_objects_synced_count": { "type": ["integer", "null"] },
"lfs_objects_synced_missing_on_primary_count": { "type": ["integer", "null"] },
"lfs_objects_synced_in_percentage": { "type": "string" },
"job_artifacts_count": { "type": "integer" },
"job_artifacts_failed_count": { "type": ["integer", "null"] },
"job_artifacts_synced_count": { "type": ["integer", "null"] },
"job_artifacts_synced_missing_on_primary_count": { "type": ["integer", "null"] },
"job_artifacts_synced_in_percentage": { "type": "string" },
"repositories_count": { "type": "integer" },
"repositories_failed_count": { "type": ["integer", "null"] },
......
require 'spec_helper'
describe Gitlab::Geo::FileUploader, :geo do
shared_examples_for 'returns necessary params for sending a file from an API endpoint' do
subject { @subject ||= uploader.execute }
context 'when the upload exists' do
let(:uploader) { described_class.new(upload.id, message) }
before do
expect(Upload).to receive(:find_by).with(id: upload.id).and_return(upload)
end
context 'when the upload has a file' do
before do
FileUtils.mkdir_p(File.dirname(upload.absolute_path))
FileUtils.touch(upload.absolute_path) unless File.exist?(upload.absolute_path)
end
context 'when the message parameters match the upload' do
let(:message) { { id: upload.model_id, type: upload.model_type, checksum: 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' } }
it 'returns the file in a success hash' do
expect(subject).to include(code: :ok, message: 'Success')
expect(subject[:file].file).to eq(upload.absolute_path)
end
end
context 'when the message id does not match the upload model_id' do
let(:message) { { id: 10000, type: upload.model_type, checksum: 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' } }
it 'returns an error hash' do
expect(subject).to include(code: :not_found, message: "Upload not found")
end
end
context 'when the message type does not match the upload model_type' do
let(:message) { { id: upload.model_id, type: 'bad_type', checksum: 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' } }
it 'returns an error hash' do
expect(subject).to include(code: :not_found, message: "Upload not found")
end
end
context 'when the message checksum does not match the upload checksum' do
let(:message) { { id: upload.model_id, type: upload.model_type, checksum: 'doesnotmatch' } }
it 'returns an error hash' do
expect(subject).to include(code: :not_found, message: "Upload not found")
end
end
end
context 'when the upload does not have a file' do
let(:message) { { id: upload.model_id, type: upload.model_type, checksum: 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' } }
it 'returns an error hash' do
expect(subject).to include(code: :not_found, geo_code: 'FILE_NOT_FOUND', message: match(/Upload #\d+ file not found/))
end
end
end
context 'when the upload does not exist' do
it 'returns an error hash' do
result = described_class.new(10000, {}).execute
expect(result).to eq(code: :not_found, message: "Upload not found")
end
end
end
describe '#execute' do
context 'user avatar' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, model: build(:user)) }
end
end
context 'group avatar' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, model: build(:group)) }
end
end
context 'project avatar' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, model: build(:project)) }
end
end
context 'with an attachment' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, :attachment_upload) }
end
end
context 'with a snippet' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, :personal_snippet_upload) }
end
end
context 'with file upload' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, :issuable_upload) }
end
end
context 'with namespace file upload' do
it_behaves_like "returns necessary params for sending a file from an API endpoint" do
let(:upload) { create(:upload, :namespace_upload) }
end
end
end
end
......@@ -3,22 +3,29 @@ require 'spec_helper'
describe Gitlab::Geo::JobArtifactDownloader, :geo do
let(:job_artifact) { create(:ci_job_artifact) }
subject do
described_class.new(:job_artifact, job_artifact.id)
end
context '#execute' do
context 'with job artifact' do
it 'returns a FileDownloader::Result object' do
downloader = described_class.new(:job_artifact, job_artifact.id)
result = Gitlab::Geo::Transfer::Result.new(success: true, bytes_downloaded: 1)
context '#download_from_primary' do
it 'with a job artifact' do
allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer)
.to receive(:download_from_primary).and_return(100)
allow_any_instance_of(Gitlab::Geo::JobArtifactTransfer)
.to receive(:download_from_primary).and_return(result)
expect(subject.execute).to eq(100)
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
end
it 'with an unknown job artifact' do
expect(described_class.new(:job_artifact, 10000)).not_to receive(:download_from_primary)
context 'with unknown job artifact' do
let(:downloader) { described_class.new(:job_artifact, 10000) }
it 'returns a FileDownloader::Result object' do
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
expect(subject.execute).to be_nil
it 'returns a result indicating a failure before a transfer was attempted' do
expect(downloader.execute.failed_before_transfer).to be_truthy
end
end
end
end
......@@ -2,7 +2,8 @@ require 'spec_helper'
describe Gitlab::Geo::JobArtifactUploader, :geo do
context '#execute' do
subject { described_class.new(job_artifact.id, {}).execute }
let(:uploader) { described_class.new(job_artifact.id, {}) }
subject { uploader.execute }
context 'when the job artifact exists' do
before do
......@@ -29,7 +30,13 @@ describe Gitlab::Geo::JobArtifactUploader, :geo do
let(:job_artifact) { create(:ci_job_artifact) }
it 'returns an error hash' do
expect(subject).to eq(code: :not_found, message: "Job artifact does not have a file")
expect(subject).to include(code: :not_found, geo_code: 'FILE_NOT_FOUND', message: match(/JobArtifact #\d+ file not found/))
end
it 'logs the missing file' do
expect(uploader).to receive(:log_error).with("Could not upload job artifact because it does not have a file", id: job_artifact.id)
subject
end
end
end
......
require 'spec_helper'
describe Gitlab::Geo::LfsDownloader do
describe Gitlab::Geo::LfsDownloader, :geo do
let(:lfs_object) { create(:lfs_object) }
subject do
described_class.new(:lfs, lfs_object.id)
end
context '#execute' do
context 'with LFS object' do
it 'returns a FileDownloader::Result object' do
downloader = described_class.new(:lfs, lfs_object.id)
result = Gitlab::Geo::Transfer::Result.new(success: true, bytes_downloaded: 1)
context '#download_from_primary' do
it 'with LFS object' do
allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(100)
allow_any_instance_of(Gitlab::Geo::LfsTransfer)
.to receive(:download_from_primary).and_return(result)
expect(subject.execute).to eq(100)
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
end
it 'with unknown LFS object' do
expect(described_class.new(:lfs, 10000)).not_to receive(:download_from_primary)
context 'with unknown job artifact' do
let(:downloader) { described_class.new(:lfs, 10000) }
it 'returns a FileDownloader::Result object' do
expect(downloader.execute).to be_a(Gitlab::Geo::FileDownloader::Result)
end
expect(subject.execute).to be_nil
it 'returns a result indicating a failure before a transfer was attempted' do
expect(downloader.execute.failed_before_transfer).to be_truthy
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LfsUploader, :geo do
context '#execute' do
subject { uploader.execute }
context 'when the LFS object exists' do
let(:uploader) { described_class.new(lfs_object.id, message) }
before do
expect(LfsObject).to receive(:find_by).with(id: lfs_object.id).and_return(lfs_object)
end
context 'when the LFS object has a file' do
let(:lfs_object) { create(:lfs_object, :with_file) }
let(:message) { { checksum: lfs_object.oid } }
context 'when the message checksum matches the LFS object oid' do
it 'returns the file in a success hash' do
expect(subject).to eq(code: :ok, message: 'Success', file: lfs_object.file)
end
end
context 'when the message checksum does not match the LFS object oid' do
let(:message) { { checksum: 'foo' } }
it 'returns an error hash' do
expect(subject).to include(code: :not_found, message: "LFS object not found")
end
end
end
context 'when the LFS object does not have a file' do
let(:lfs_object) { create(:lfs_object) }
let(:message) { { checksum: lfs_object.oid } }
it 'returns an error hash' do
expect(subject).to include(code: :not_found, geo_code: 'FILE_NOT_FOUND', message: match(/LfsObject #\d+ file not found/))
end
it 'logs the missing file' do
expect(uploader).to receive(:log_error).with("Could not upload LFS object because it does not have a file", id: lfs_object.id)
subject
end
end
end
context 'when the LFS object does not exist' do
let(:uploader) { described_class.new(10000, {}) }
it 'returns an error hash' do
expect(subject).to eq(code: :not_found, message: 'LFS object not found')
end
end
end
end
......@@ -23,36 +23,68 @@ describe Gitlab::Geo::Transfer do
stub_current_geo_node(secondary_node)
end
it 'when the destination filename is a directory' do
transfer = described_class.new(:lfs, lfs_object.id, '/tmp', { sha256: lfs_object.id })
context 'when the destination filename is a directory' do
it 'returns a failed result' do
transfer = described_class.new(:lfs, lfs_object.id, '/tmp', { sha256: lfs_object.id })
expect(transfer.download_from_primary).to eq(nil)
result = transfer.download_from_primary
expect_result(result, success: false, bytes_downloaded: 0, primary_missing_file: false)
end
end
it 'when the HTTP response is successful' do
expect(FileUtils).to receive(:mv).with(anything, lfs_object.file.path).and_call_original
response = double(success?: true)
expect(Gitlab::HTTP).to receive(:get).and_yield(content.to_s).and_return(response)
context 'when the HTTP response is successful' do
it 'returns a successful result' do
expect(FileUtils).to receive(:mv).with(anything, lfs_object.file.path).and_call_original
response = double(:response, success?: true)
expect(Gitlab::HTTP).to receive(:get).and_yield(content.to_s).and_return(response)
expect(subject.download_from_primary).to eq(size)
stat = File.stat(lfs_object.file.path)
expect(stat.size).to eq(size)
expect(stat.mode & 0777).to eq(0666 - File.umask)
expect(File.binread(lfs_object.file.path)).to eq(content)
result = subject.download_from_primary
expect_result(result, success: true, bytes_downloaded: size, primary_missing_file: false)
stat = File.stat(lfs_object.file.path)
expect(stat.size).to eq(size)
expect(stat.mode & 0777).to eq(0666 - File.umask)
expect(File.binread(lfs_object.file.path)).to eq(content)
end
end
it 'when the HTTP response is unsuccessful' do
expect(FileUtils).not_to receive(:mv).with(anything, lfs_object.file.path).and_call_original
response = double(success?: false, code: 404, msg: 'No such file')
expect(Gitlab::HTTP).to receive(:get).and_return(response)
context 'when the HTTP response is unsuccessful' do
context 'when the HTTP response indicates a missing file on the primary' do
it 'returns a failed result indicating primary_missing_file' do
expect(FileUtils).not_to receive(:mv).with(anything, lfs_object.file.path).and_call_original
response = double(:response, success?: false, code: 404, msg: "No such file")
expect(File).to receive(:read).and_return("{\"geo_code\":\"#{Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE}\"}")
expect(Gitlab::HTTP).to receive(:get).and_return(response)
result = subject.download_from_primary
expect_result(result, success: false, bytes_downloaded: 0, primary_missing_file: true)
end
end
context 'when the HTTP response does not indicate a missing file on the primary' do
it 'returns a failed result' do
expect(FileUtils).not_to receive(:mv).with(anything, lfs_object.file.path).and_call_original
response = double(:response, success?: false, code: 404, msg: 'No such file')
expect(Gitlab::HTTP).to receive(:get).and_return(response)
expect(subject.download_from_primary).to eq(-1)
result = subject.download_from_primary
expect_result(result, success: false, bytes_downloaded: 0)
end
end
end
it 'when Tempfile fails' do
expect(Tempfile).to receive(:new).and_raise(Errno::ENAMETOOLONG)
context 'when Tempfile fails' do
it 'returns a failed result' do
expect(Tempfile).to receive(:new).and_raise(Errno::ENAMETOOLONG)
expect(subject.download_from_primary).to eq(nil)
result = subject.download_from_primary
expect(result.success).to eq(false)
expect(result.bytes_downloaded).to eq(0)
end
end
context "invalid path" do
......@@ -62,8 +94,17 @@ describe Gitlab::Geo::Transfer do
allow(FileUtils).to receive(:mkdir_p) { raise Errno::EEXIST }
expect(subject).to receive(:log_error).with("unable to create directory /foo: File exists")
expect(subject.download_from_primary).to be_nil
result = subject.download_from_primary
expect(result.success).to eq(false)
expect(result.bytes_downloaded).to eq(0)
end
end
end
def expect_result(result, success:, bytes_downloaded:, primary_missing_file: nil)
expect(result.success).to eq(success)
expect(result.bytes_downloaded).to eq(bytes_downloaded)
expect(result.primary_missing_file).to eq(primary_missing_file)
end
end
......@@ -95,6 +95,21 @@ describe GeoNodeStatus, :geo do
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#attachments_synced_missing_on_primary_count', :delete do
it 'only counts successful syncs' do
create_list(:user, 3, avatar: fixture_file_upload(Rails.root + 'spec/fixtures/dk.png', 'image/png'))
uploads = Upload.all.pluck(:id)
create(:geo_file_registry, :avatar, file_id: uploads[0], missing_on_primary: true)
create(:geo_file_registry, :avatar, file_id: uploads[1])
create(:geo_file_registry, :avatar, file_id: uploads[2], success: false)
expect(subject.attachments_synced_missing_on_primary_count).to eq(1)
end
end
describe '#attachments_failed_count', :delete do
it 'counts failed avatars, attachment, personal snippets and files' do
# These two should be ignored
......@@ -158,7 +173,39 @@ describe GeoNodeStatus, :geo do
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#lfs_objects_failed', :delete do
describe '#lfs_objects_synced_count', :delete do
it 'counts synced LFS objects' do
# These four should be ignored
create(:geo_file_registry, success: false)
create(:geo_file_registry, :avatar)
create(:geo_file_registry, file_type: :attachment)
create(:geo_file_registry, :lfs, :with_file, success: false)
create(:geo_file_registry, :lfs, :with_file, success: true)
expect(subject.lfs_objects_synced_count).to eq(1)
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#lfs_objects_synced_missing_on_primary_count', :delete do
it 'counts LFS objects marked as synced due to file missing on the primary' do
# These four should be ignored
create(:geo_file_registry, success: false)
create(:geo_file_registry, :avatar, missing_on_primary: true)
create(:geo_file_registry, file_type: :attachment, missing_on_primary: true)
create(:geo_file_registry, :lfs, :with_file, success: false)
create(:geo_file_registry, :lfs, :with_file, success: true, missing_on_primary: true)
expect(subject.lfs_objects_synced_missing_on_primary_count).to eq(1)
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#lfs_objects_failed_count', :delete do
it 'counts failed LFS objects' do
# These four should be ignored
create(:geo_file_registry, success: false)
......@@ -207,17 +254,29 @@ describe GeoNodeStatus, :geo do
describe '#job_artifacts_synced_count', :delete do
it 'counts synced job artifacts' do
# These should be ignored
create(:geo_file_registry, success: false)
create(:geo_file_registry, :avatar, success: false)
create(:geo_file_registry, file_type: :attachment, success: true)
create(:geo_file_registry, :lfs, :with_file, success: true)
create(:geo_file_registry, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: false)
create(:geo_job_artifact_registry, :with_artifact, success: true)
expect(subject.job_artifacts_synced_count).to eq(1)
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#job_artifacts_synced_missing_on_primary_count', :delete do
it 'counts job artifacts marked as synced due to file missing on the primary' do
# These should be ignored
create(:geo_file_registry, success: true, missing_on_primary: true)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: true, missing_on_primary: true)
expect(subject.job_artifacts_synced_missing_on_primary_count).to eq(1)
end
end
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#job_artifacts_failed_count', :delete do
......@@ -227,16 +286,17 @@ describe GeoNodeStatus, :geo do
create(:geo_file_registry, :avatar, success: false)
create(:geo_file_registry, file_type: :attachment, success: false)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: false)
expect(subject.job_artifacts_failed_count).to eq(1)
end
end
describe '#job_artifacts_synced_in_percentage' do
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
context 'when artifacts are available', :delete do
# Disable transactions via :delete method because a foreign table
# can't see changes inside a transaction of a different connection.
describe '#job_artifacts_synced_in_percentage', :delete do
context 'when artifacts are available' do
before do
[project_1, project_2, project_3, project_4].each_with_index do |project, index|
build = create(:ci_build, project: project)
......
......@@ -103,17 +103,30 @@ describe API::Geo do
expect(response).to have_gitlab_http_status(401)
end
context 'file file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/file/#{upload.id}"), nil, req_header
context 'when the Upload record exists' do
context 'when the file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/file/#{upload.id}"), nil, req_header
expect(response).to have_gitlab_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to end_with('dk.png')
end
end
expect(response).to have_gitlab_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to end_with('dk.png')
context 'file does not exist' do
it 'responds with 404 and a specific geo code' do
File.unlink(upload.absolute_path)
get api("/geo/transfers/file/#{upload.id}"), nil, req_header
expect(response).to have_gitlab_http_status(404)
expect(json_response['geo_code']).to eq(Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE)
end
end
end
context 'file does not exist' do
context 'when the Upload record does not exist' do
it 'responds with 404' do
get api("/geo/transfers/file/100000"), nil, req_header
......@@ -139,13 +152,26 @@ describe API::Geo do
expect(response).to have_gitlab_http_status(401)
end
context 'LFS file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/lfs/#{lfs_object.id}"), nil, req_header
context 'LFS object exists' do
context 'file exists' do
it 'responds with 200 with X-Sendfile' do
get api("/geo/transfers/lfs/#{lfs_object.id}"), nil, req_header
expect(response).to have_gitlab_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to eq(lfs_object.file.path)
expect(response).to have_gitlab_http_status(200)
expect(response.headers['Content-Type']).to eq('application/octet-stream')
expect(response.headers['X-Sendfile']).to eq(lfs_object.file.path)
end
end
context 'file does not exist' do
it 'responds with 404 and a specific geo code' do
File.unlink(lfs_object.file.path)
get api("/geo/transfers/lfs/#{lfs_object.id}"), nil, req_header
expect(response).to have_gitlab_http_status(404)
expect(json_response['geo_code']).to eq(Gitlab::Geo::FileUploader::FILE_NOT_FOUND_GEO_CODE)
end
end
end
......
......@@ -25,12 +25,15 @@ describe Geo::MetricsUpdateService, :geo do
lfs_objects_count: 100,
lfs_objects_synced_count: 50,
lfs_objects_failed_count: 12,
lfs_objects_synced_missing_on_primary_count: 4,
job_artifacts_count: 100,
job_artifacts_synced_count: 50,
job_artifacts_failed_count: 12,
job_artifacts_synced_missing_on_primary_count: 5,
attachments_count: 30,
attachments_synced_count: 30,
attachments_failed_count: 25,
attachments_synced_missing_on_primary_count: 6,
last_event_id: 2,
last_event_date: event_date,
cursor_last_event_id: 1,
......@@ -143,12 +146,15 @@ describe Geo::MetricsUpdateService, :geo do
expect(metric_value(:geo_lfs_objects)).to eq(100)
expect(metric_value(:geo_lfs_objects_synced)).to eq(50)
expect(metric_value(:geo_lfs_objects_failed)).to eq(12)
expect(metric_value(:geo_lfs_objects_synced_missing_on_primary)).to eq(4)
expect(metric_value(:geo_job_artifacts)).to eq(100)
expect(metric_value(:geo_job_artifacts_synced)).to eq(50)
expect(metric_value(:geo_job_artifacts_failed)).to eq(12)
expect(metric_value(:geo_job_artifacts_synced_missing_on_primary)).to eq(5)
expect(metric_value(:geo_attachments)).to eq(30)
expect(metric_value(:geo_attachments_synced)).to eq(30)
expect(metric_value(:geo_attachments_failed)).to eq(25)
expect(metric_value(:geo_attachments_synced_missing_on_primary)).to eq(6)
expect(metric_value(:geo_last_event_id)).to eq(2)
expect(metric_value(:geo_last_event_timestamp)).to eq(event_date.to_i)
expect(metric_value(:geo_cursor_last_event_id)).to eq(1)
......
......@@ -2,7 +2,7 @@ require 'spec_helper'
describe Geo::FileDownloadWorker, :geo do
describe '#perform' do
it 'instantiates and executes FileDownloadService, and converts object_type to a symbol' do
it 'instantiates and executes FileDownloadService' do
service = double(:service)
expect(service).to receive(:execute)
expect(Geo::FileDownloadService).to receive(:new).with(:job_artifact, 1).and_return(service)
......
......@@ -39,14 +39,14 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do
end
it 'schedules job for file stored remotely and synced locally' do
expect(worker).to receive(:schedule_job).with(:lfs, lfs_object_remote.id)
expect(worker).to receive(:schedule_job).with('lfs', lfs_object_remote.id)
expect(worker).not_to receive(:schedule_job).with(anything, lfs_object_local.id)
worker.perform
end
it 'schedules worker for file stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with(:lfs, lfs_object_remote.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('lfs', lfs_object_remote.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, lfs_object_local.id)
worker.perform
......@@ -127,14 +127,14 @@ describe Geo::MigratedLocalFilesCleanUpWorker, :geo do
end
it 'schedules job for artifact stored remotely and synced locally' do
expect(worker).to receive(:schedule_job).with(:job_artifact, job_artifact_remote.id)
expect(worker).to receive(:schedule_job).with('job_artifact', job_artifact_remote.id)
expect(worker).not_to receive(:schedule_job).with(anything, job_artifact_local.id)
worker.perform
end
it 'schedules worker for artifact stored remotely and synced locally' do
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with(:job_artifact, job_artifact_remote.id)
expect(Geo::FileRegistryRemovalWorker).to receive(:perform_async).with('job_artifact', job_artifact_remote.id)
expect(Geo::FileRegistryRemovalWorker).not_to receive(:perform_async).with(anything, job_artifact_local.id)
worker.perform
......
require 'spec_helper'
describe Geo::Scheduler::SchedulerWorker do
describe Geo::Scheduler::SchedulerWorker, :geo do
subject { described_class.new }
it 'includes ::Gitlab::Geo::LogHelpers' do
expect(described_class).to include_module(::Gitlab::Geo::LogHelpers)
end
it 'needs many other specs'
describe '#take_batch' do
let(:a) { [[2, :lfs], [3, :lfs]] }
let(:b) { [] }
let(:c) { [[3, :job_artifact], [8, :job_artifact], [9, :job_artifact]] }
context 'without batch_size' do
it 'returns a batch of jobs' do
expect(subject).to receive(:db_retrieve_batch_size).and_return(4)
expect(subject.send(:take_batch, a, b, c)).to eq([
[3, :job_artifact],
[2, :lfs],
[8, :job_artifact],
[3, :lfs]
])
end
end
context 'with batch_size' do
it 'returns a batch of jobs' do
expect(subject.send(:take_batch, a, b, c, batch_size: 2)).to eq([
[3, :job_artifact],
[2, :lfs]
])
end
end
end
describe '#interleave' do
# Notice ties are resolved by taking the "first" tied element
it 'interleaves 2 arrays' do
a = %w{1 2 3}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3 C})
end
# Notice there are no ties in this call
it 'interleaves 2 arrays with a longer second array' do
a = %w{1 2}
b = %w{A B C}
expect(subject.send(:interleave, a, b)).to eq(%w{A 1 B 2 C})
end
it 'interleaves 2 arrays with a longer first array' do
a = %w{1 2 3}
b = %w{A B}
expect(subject.send(:interleave, a, b)).to eq(%w{1 A 2 B 3})
end
it 'interleaves 3 arrays' do
a = %w{1 2 3}
b = %w{A B C}
c = %w{i ii iii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{1 A i 2 B ii 3 C iii})
end
it 'interleaves 3 arrays of unequal length' do
a = %w{1 2}
b = %w{A}
c = %w{i ii iii iiii}
expect(subject.send(:interleave, a, b, c)).to eq(%w{i 1 ii A iii 2 iiii})
end
end
end
......@@ -109,10 +109,51 @@ describe Upload do
expect(upload).to exist
end
it 'returns false when the file does not exist' do
upload = described_class.new(path: "#{__FILE__}-nope")
context 'when the file does not exist' do
it 'returns false' do
upload = described_class.new(path: "#{__FILE__}-nope")
expect(upload).not_to exist
expect(upload).not_to exist
end
context 'when the record is persisted' do
it 'sends a message to Sentry' do
upload = create(:upload, :issuable_upload)
expect(Gitlab::Sentry).to receive(:enabled?).and_return(true)
expect(Raven).to receive(:capture_message).with("Upload file does not exist", extra: upload.attributes)
upload.exist?
end
it 'increments a metric counter to signal a problem' do
upload = create(:upload, :issuable_upload)
counter = double(:counter)
expect(counter).to receive(:increment)
expect(Gitlab::Metrics).to receive(:counter).with(:upload_file_does_not_exist_total, 'The number of times an upload record could not find its file').and_return(counter)
upload.exist?
end
end
context 'when the record is not persisted' do
it 'does not send a message to Sentry' do
upload = described_class.new(path: "#{__FILE__}-nope")
expect(Raven).not_to receive(:capture_message)
upload.exist?
end
it 'does not increment a metric counter' do
upload = described_class.new(path: "#{__FILE__}-nope")
expect(Gitlab::Metrics).not_to receive(:counter)
upload.exist?
end
end
end
end
......
......@@ -10,6 +10,13 @@ RSpec::Matchers.define :match_ids do |*expected|
'matches elements by ids'
end
failure_message do
actual_ids = map_ids(actual)
expected_ids = map_ids(expected)
"expected IDs #{actual_ids} in:\n\n #{actual.inspect}\n\nto match IDs #{expected_ids} in:\n\n #{expected.inspect}"
end
def map_ids(elements)
elements = elements.flatten if elements.respond_to?(:flatten)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment