Commit 553e0207 authored by Douglas Barbosa Alexandre's avatar Douglas Barbosa Alexandre

Merge branch...

Merge branch '327400-geo-make-job-artifact-replication-use-self-service-framework-behind-feature-flag' into 'master'

Geo: Make Job Artifacts replication use Self-Service Framework

See merge request gitlab-org/gitlab!75264
parents 929f051b 4fbccfcb
# frozen_string_literal: true
# Creates `ci_job_artifact_states`, a side table keyed by `job_artifact_id`
# (at most one row per `ci_job_artifacts` record) that stores the
# verification columns and the indexes used to query them.
#
# NOTE(review): `references` adds its own index on `job_artifact_id` by
# default, which looks redundant next to the primary key index on the same
# column — confirm and consider `index: false` in a follow-up migration.
class CreateJobArtifactStates < Gitlab::Database::Migration[1.0]
  VERIFICATION_STATE_INDEX_NAME = "index_job_artifact_states_on_verification_state"
  PENDING_VERIFICATION_INDEX_NAME = "index_job_artifact_states_pending_verification"
  FAILED_VERIFICATION_INDEX_NAME = "index_job_artifact_states_failed_verification"
  NEEDS_VERIFICATION_INDEX_NAME = "index_job_artifact_states_needs_verification"

  enable_lock_retries!

  def up
    # `id: false` because the foreign key column doubles as the primary key.
    create_table :ci_job_artifact_states, id: false do |table|
      table.datetime_with_timezone :verification_started_at
      table.datetime_with_timezone :verification_retry_at
      table.datetime_with_timezone :verified_at
      table.references :job_artifact,
                       primary_key: true,
                       null: false,
                       foreign_key: { on_delete: :cascade, to_table: :ci_job_artifacts }
      table.integer :verification_state, default: 0, limit: 2, null: false
      table.integer :verification_retry_count, limit: 2
      table.binary :verification_checksum, using: 'verification_checksum::bytea'
      table.text :verification_failure, limit: 255

      # One full index on the state plus three partial indexes restricted to
      # state 0 and/or state 3.
      table.index :verification_state, name: VERIFICATION_STATE_INDEX_NAME
      table.index :verified_at,
                  where: "(verification_state = 0)",
                  order: { verified_at: 'ASC NULLS FIRST' },
                  name: PENDING_VERIFICATION_INDEX_NAME
      table.index :verification_retry_at,
                  where: "(verification_state = 3)",
                  order: { verification_retry_at: 'ASC NULLS FIRST' },
                  name: FAILED_VERIFICATION_INDEX_NAME
      table.index :verification_state,
                  where: "(verification_state = 0 OR verification_state = 3)",
                  name: NEEDS_VERIFICATION_INDEX_NAME
    end
  end

  def down
    drop_table :ci_job_artifact_states
  end
end
d618c28360f7716807e9727566019e269963d85164cf2f306ec9692d3b037802
\ No newline at end of file
......@@ -11863,6 +11863,27 @@ CREATE SEQUENCE ci_instance_variables_id_seq
ALTER SEQUENCE ci_instance_variables_id_seq OWNED BY ci_instance_variables.id;
CREATE TABLE ci_job_artifact_states (
verification_started_at timestamp with time zone,
verification_retry_at timestamp with time zone,
verified_at timestamp with time zone,
job_artifact_id bigint NOT NULL,
verification_state smallint DEFAULT 0 NOT NULL,
verification_retry_count smallint,
verification_checksum bytea,
verification_failure text,
CONSTRAINT check_df832b66ea CHECK ((char_length(verification_failure) <= 255))
);
CREATE SEQUENCE ci_job_artifact_states_job_artifact_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE ci_job_artifact_states_job_artifact_id_seq OWNED BY ci_job_artifact_states.job_artifact_id;
CREATE TABLE ci_job_artifacts (
project_id integer NOT NULL,
file_type integer NOT NULL,
......@@ -21661,6 +21682,8 @@ ALTER TABLE ONLY ci_group_variables ALTER COLUMN id SET DEFAULT nextval('ci_grou
ALTER TABLE ONLY ci_instance_variables ALTER COLUMN id SET DEFAULT nextval('ci_instance_variables_id_seq'::regclass);
ALTER TABLE ONLY ci_job_artifact_states ALTER COLUMN job_artifact_id SET DEFAULT nextval('ci_job_artifact_states_job_artifact_id_seq'::regclass);
ALTER TABLE ONLY ci_job_artifacts ALTER COLUMN id SET DEFAULT nextval('ci_job_artifacts_id_seq'::regclass);
ALTER TABLE ONLY ci_job_token_project_scope_links ALTER COLUMN id SET DEFAULT nextval('ci_job_token_project_scope_links_id_seq'::regclass);
......@@ -23138,6 +23161,9 @@ ALTER TABLE ONLY ci_group_variables
ALTER TABLE ONLY ci_instance_variables
ADD CONSTRAINT ci_instance_variables_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_job_artifact_states
ADD CONSTRAINT ci_job_artifact_states_pkey PRIMARY KEY (job_artifact_id);
ALTER TABLE ONLY ci_job_artifacts
ADD CONSTRAINT ci_job_artifacts_pkey PRIMARY KEY (id);
......@@ -25767,6 +25793,8 @@ CREATE UNIQUE INDEX index_ci_group_variables_on_group_id_and_key_and_environment
CREATE UNIQUE INDEX index_ci_instance_variables_on_key ON ci_instance_variables USING btree (key);
CREATE INDEX index_ci_job_artifact_states_on_job_artifact_id ON ci_job_artifact_states USING btree (job_artifact_id);
CREATE INDEX index_ci_job_artifacts_for_terraform_reports ON ci_job_artifacts USING btree (project_id, id) WHERE (file_type = 18);
CREATE INDEX index_ci_job_artifacts_id_for_terraform_reports ON ci_job_artifacts USING btree (id) WHERE (file_type = 18);
......@@ -26755,6 +26783,14 @@ CREATE INDEX index_jira_imports_on_user_id ON jira_imports USING btree (user_id)
CREATE INDEX index_jira_tracker_data_on_service_id ON jira_tracker_data USING btree (service_id);
CREATE INDEX index_job_artifact_states_failed_verification ON ci_job_artifact_states USING btree (verification_retry_at NULLS FIRST) WHERE (verification_state = 3);
CREATE INDEX index_job_artifact_states_needs_verification ON ci_job_artifact_states USING btree (verification_state) WHERE ((verification_state = 0) OR (verification_state = 3));
CREATE INDEX index_job_artifact_states_on_verification_state ON ci_job_artifact_states USING btree (verification_state);
CREATE INDEX index_job_artifact_states_pending_verification ON ci_job_artifact_states USING btree (verified_at NULLS FIRST) WHERE (verification_state = 0);
CREATE INDEX index_keys_on_expires_at_and_id ON keys USING btree (date(timezone('UTC'::text, expires_at)), id) WHERE (expiry_notification_delivered_at IS NULL);
CREATE UNIQUE INDEX index_keys_on_fingerprint ON keys USING btree (fingerprint);
......@@ -30968,6 +31004,9 @@ ALTER TABLE ONLY application_settings
ALTER TABLE ONLY clusters_kubernetes_namespaces
ADD CONSTRAINT fk_rails_7e7688ecaf FOREIGN KEY (cluster_id) REFERENCES clusters(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_job_artifact_states
ADD CONSTRAINT fk_rails_80a9cba3b2 FOREIGN KEY (job_artifact_id) REFERENCES ci_job_artifacts(id) ON DELETE CASCADE;
ALTER TABLE ONLY approval_merge_request_rules_users
ADD CONSTRAINT fk_rails_80e6801803 FOREIGN KEY (approval_merge_request_rule_id) REFERENCES approval_merge_request_rules(id) ON DELETE CASCADE;
......@@ -39,7 +39,7 @@ verification methods:
| Blobs | User uploads _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
| Blobs | LFS objects _(file system)_ | Geo with API | SHA256 checksum |
| Blobs | LFS objects _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
| Blobs | CI job artifacts _(file system)_ | Geo with API | _Not implemented_ |
| Blobs | CI job artifacts _(file system)_ | Geo with API | SHA256 checksum |
| Blobs | CI job artifacts _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
| Blobs | Archived CI build traces _(file system)_ | Geo with API | _Not implemented_ |
| Blobs | Archived CI build traces _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
......
......@@ -270,6 +270,16 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `geo_pages_deployments_verification_total` | Gauge | 14.6 | Number of pages deployments verifications tried on secondary | `url` |
| `geo_pages_deployments_verified` | Gauge | 14.6 | Number of pages deployments verified on secondary | `url` |
| `geo_pages_deployments_verification_failed` | Gauge | 14.6 | Number of pages deployments verifications failed on secondary | `url` |
| `geo_job_artifacts` | Gauge | 14.8 | Number of job artifacts on primary | `url` |
| `geo_job_artifacts_checksum_total` | Gauge | 14.8 | Number of job artifacts tried to checksum on primary | `url` |
| `geo_job_artifacts_checksummed` | Gauge | 14.8 | Number of job artifacts successfully checksummed on primary | `url` |
| `geo_job_artifacts_checksum_failed` | Gauge | 14.8 | Number of job artifacts failed to calculate the checksum on primary | `url` |
| `geo_job_artifacts_synced` | Gauge | 14.8 | Number of syncable job artifacts synced on secondary | `url` |
| `geo_job_artifacts_failed` | Gauge | 14.8 | Number of syncable job artifacts failed to sync on secondary | `url` |
| `geo_job_artifacts_registry` | Gauge | 14.8 | Number of job artifacts in the registry | `url` |
| `geo_job_artifacts_verification_total` | Gauge | 14.8 | Number of job artifacts verifications tried on secondary | `url` |
| `geo_job_artifacts_verified` | Gauge | 14.8 | Number of job artifacts verified on secondary | `url` |
| `geo_job_artifacts_verification_failed` | Gauge | 14.8 | Number of job artifacts verifications failed on secondary | `url` |
| `limited_capacity_worker_running_jobs` | Gauge | 13.5 | Number of running jobs | `worker` |
| `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` |
| `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` |
......
......@@ -467,6 +467,19 @@ Example response:
"uploads_verified_count": null,
"uploads_verification_failed_count": null,
"uploads_verified_in_percentage": "0.00%",
"job_artifacts_count": 5,
"job_artifacts_checksum_total_count": 5,
"job_artifacts_checksummed_count": 5,
"job_artifacts_checksum_failed_count": 0,
"job_artifacts_synced_count": 5,
"job_artifacts_failed_count": 0,
"job_artifacts_registry_count": 5,
"job_artifacts_verification_total_count": 5,
"job_artifacts_verified_count": 5,
"job_artifacts_verification_failed_count": 0,
"job_artifacts_synced_in_percentage": "100.00%",
"job_artifacts_verified_in_percentage": "100.00%",
"job_artifacts_synced_missing_on_primary_count": 0
},
{
"geo_node_id": 2,
......@@ -623,6 +636,19 @@ Example response:
"uploads_verified_count": null,
"uploads_verification_failed_count": null,
"uploads_verified_in_percentage": "0.00%",
"job_artifacts_count": 5,
"job_artifacts_checksum_total_count": 5,
"job_artifacts_checksummed_count": 5,
"job_artifacts_checksum_failed_count": 0,
"job_artifacts_synced_count": 5,
"job_artifacts_failed_count": 0,
"job_artifacts_registry_count": 5,
"job_artifacts_verification_total_count": 5,
"job_artifacts_verified_count": 5,
"job_artifacts_verification_failed_count": 0,
"job_artifacts_synced_in_percentage": "100.00%",
"job_artifacts_verified_in_percentage": "100.00%",
"job_artifacts_synced_missing_on_primary_count": 0
}
]
```
......@@ -776,6 +802,19 @@ Example response:
"uploads_verified_count": null,
"uploads_verification_failed_count": null,
"uploads_verified_in_percentage": "0.00%",
"job_artifacts_count": 5,
"job_artifacts_checksum_total_count": 5,
"job_artifacts_checksummed_count": 5,
"job_artifacts_checksum_failed_count": 0,
"job_artifacts_synced_count": 5,
"job_artifacts_failed_count": 0,
"job_artifacts_registry_count": 5,
"job_artifacts_verification_total_count": 5,
"job_artifacts_verified_count": 5,
"job_artifacts_verification_failed_count": 0,
"job_artifacts_synced_in_percentage": "100.00%",
"job_artifacts_verified_in_percentage": "100.00%",
"job_artifacts_synced_missing_on_primary_count": 0
}
```
......
......@@ -6803,6 +6803,29 @@ The edge type for [`JiraProject`](#jiraproject).
| <a id="jiraprojectedgecursor"></a>`cursor` | [`String!`](#string) | A cursor for use in pagination. |
| <a id="jiraprojectedgenode"></a>`node` | [`JiraProject`](#jiraproject) | The item at the end of the edge. |
#### `JobArtifactRegistryConnection`
The connection type for [`JobArtifactRegistry`](#jobartifactregistry).
##### Fields
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="jobartifactregistryconnectionedges"></a>`edges` | [`[JobArtifactRegistryEdge]`](#jobartifactregistryedge) | A list of edges. |
| <a id="jobartifactregistryconnectionnodes"></a>`nodes` | [`[JobArtifactRegistry]`](#jobartifactregistry) | A list of nodes. |
| <a id="jobartifactregistryconnectionpageinfo"></a>`pageInfo` | [`PageInfo!`](#pageinfo) | Information to aid in pagination. |
#### `JobArtifactRegistryEdge`
The edge type for [`JobArtifactRegistry`](#jobartifactregistry).
##### Fields
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="jobartifactregistryedgecursor"></a>`cursor` | [`String!`](#string) | A cursor for use in pagination. |
| <a id="jobartifactregistryedgenode"></a>`node` | [`JobArtifactRegistry`](#jobartifactregistry) | The item at the end of the edge. |
#### `JobNeedUnionConnection`
The connection type for [`JobNeedUnion`](#jobneedunion).
......@@ -10780,6 +10803,22 @@ four standard [pagination arguments](#connection-pagination-arguments):
| ---- | ---- | ----------- |
| <a id="geonodegroupwikirepositoryregistriesids"></a>`ids` | [`[ID!]`](#id) | Filters registries by their ID. |
##### `GeoNode.jobArtifactRegistries`
Find Job Artifact registries on this Geo node. Available only when feature flag `geo_job_artifact_replication` is enabled. This flag is disabled by default, because the feature is experimental and is subject to change without notice.
Returns [`JobArtifactRegistryConnection`](#jobartifactregistryconnection).
This field returns a [connection](#connections). It accepts the
four standard [pagination arguments](#connection-pagination-arguments):
`before: String`, `after: String`, `first: Int`, `last: Int`.
###### Arguments
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="geonodejobartifactregistriesids"></a>`ids` | [`[ID!]`](#id) | Filters registries by their ID. |
##### `GeoNode.lfsObjectRegistries`
Find LFS object registries on this Geo node.
......@@ -11970,6 +12009,23 @@ four standard [pagination arguments](#connection-pagination-arguments):
| <a id="jirauserjiradisplayname"></a>`jiraDisplayName` | [`String!`](#string) | Display name of the Jira user. |
| <a id="jirauserjiraemail"></a>`jiraEmail` | [`String`](#string) | Email of the Jira user, returned only for users with public emails. |
### `JobArtifactRegistry`
Represents the Geo replication and verification state of a job_artifact.
#### Fields
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="jobartifactregistryartifactid"></a>`artifactId` | [`ID!`](#id) | ID of the Job Artifact. |
| <a id="jobartifactregistrycreatedat"></a>`createdAt` | [`Time`](#time) | Timestamp when the JobArtifactRegistry was created. |
| <a id="jobartifactregistryid"></a>`id` | [`ID!`](#id) | ID of the JobArtifactRegistry. |
| <a id="jobartifactregistrylastsyncfailure"></a>`lastSyncFailure` | [`String`](#string) | Error message during sync of the JobArtifactRegistry. |
| <a id="jobartifactregistrylastsyncedat"></a>`lastSyncedAt` | [`Time`](#time) | Timestamp of the most recent successful sync of the JobArtifactRegistry. |
| <a id="jobartifactregistryretryat"></a>`retryAt` | [`Time`](#time) | Timestamp after which the JobArtifactRegistry should be resynced. |
| <a id="jobartifactregistryretrycount"></a>`retryCount` | [`Int`](#int) | Number of consecutive failed sync attempts of the JobArtifactRegistry. |
| <a id="jobartifactregistrystate"></a>`state` | [`RegistryState`](#registrystate) | Sync state of the JobArtifactRegistry. |
### `JobPermissions`
#### Fields
# frozen_string_literal: true
module Geo
  # File registry finder bound to the job artifact registry model.
  # All lookup behaviour is inherited from `FileRegistryFinder`; this class
  # only supplies the registry class.
  class JobArtifactLegacyRegistryFinder < FileRegistryFinder
    # @return [Class] the registry model this finder queries
    def registry_class
      Geo::JobArtifactRegistry
    end
  end
end
......@@ -2,8 +2,6 @@
module Geo
class JobArtifactRegistryFinder < FileRegistryFinder
def registry_class
Geo::JobArtifactRegistry
end
include FrameworkRegistryFinder
end
end
# frozen_string_literal: true
module Resolvers
  module Geo
    # GraphQL resolver for job artifact registries; the resolving logic comes
    # from the `RegistriesResolver` concern.
    #
    # NOTE(review): the declared type is the `GeoNodeType` connection rather
    # than a job artifact registry connection. The field that uses this
    # resolver presumably declares its own type — confirm that is intended.
    class JobArtifactRegistriesResolver < BaseResolver
      type ::Types::Geo::GeoNodeType.connection_type, null: true

      include RegistriesResolver
    end
  end
end
......@@ -58,6 +58,11 @@ module Types
null: true,
resolver: ::Resolvers::Geo::UploadRegistriesResolver,
description: 'Find Upload registries on this Geo node'
field :job_artifact_registries, ::Types::Geo::JobArtifactRegistryType.connection_type,
null: true,
resolver: ::Resolvers::Geo::JobArtifactRegistriesResolver,
description: 'Find Job Artifact registries on this Geo node',
feature_flag: :geo_job_artifact_replication
end
end
end
# frozen_string_literal: true
module Types
  module Geo
    # GraphQL object type `JobArtifactRegistry`. Shared registry fields come
    # from `::Types::Geo::RegistryType`; this type adds `artifact_id`.
    # rubocop:disable Graphql/AuthorizeTypes because it is included
    class JobArtifactRegistryType < BaseObject
      include ::Types::Geo::RegistryType

      graphql_name 'JobArtifactRegistry'
      description 'Represents the Geo replication and verification state of a job_artifact.'

      field :artifact_id,
            GraphQL::Types::ID,
            null: false,
            description: 'ID of the Job Artifact.'
    end
  end
end
......@@ -197,14 +197,6 @@ module EE
name: 'wiki',
name_plural: 'wikis'
},
{
data_type: 'blob',
data_type_title: _('File'),
title: _('Job artifact'),
title_plural: _('Job artifacts'),
name: 'job_artifact',
name_plural: 'job_artifacts'
},
{
data_type: 'blob',
data_type_title: _('File'),
......@@ -224,6 +216,17 @@ module EE
}
]
if ::Geo::JobArtifactReplicator.disabled?
replicable_types.insert(2, {
data_type: 'blob',
data_type_title: _('File'),
title: _('Job artifact'),
title_plural: _('Job artifacts'),
name: 'job_artifact',
name_plural: 'job_artifacts'
})
end
# Adds all the SSF Data Types automatically
enabled_replicator_classes.each do |replicator_class|
replicable_types.push(
......
......@@ -12,6 +12,13 @@ module EE
SECURITY_REPORT_FILE_TYPES = %w[sast secret_detection dependency_scanning container_scanning cluster_image_scanning dast coverage_fuzzing api_fuzzing].freeze
prepended do
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::JobArtifactReplicator
has_one :job_artifact_state, autosave: false, inverse_of: :job_artifact, class_name: '::Geo::JobArtifactState'
# After destroy callbacks are often skipped because of FastDestroyAll.
# All destroy callbacks should be implemented in `Ci::JobArtifacts::DestroyBatchService`
# See https://gitlab.com/gitlab-org/gitlab/-/issues/297472
......@@ -66,7 +73,26 @@ module EE
with_file_types(API_FUZZING_REPORT_TYPES)
end
scope :with_files_stored_locally, -> { where(file_store: ::ObjectStorage::Store::LOCAL) }
scope :with_files_stored_remotely, -> { where(file_store: ::ObjectStorage::Store::REMOTE) }
scope :with_verification_state, ->(state) { joins(:job_artifact_state).where(verification_arel_table[:verification_state].eq(verification_state_value(state))) }
scope :checksummed, -> { joins(:job_artifact_state).where.not(verification_arel_table[:verification_checksum].eq(nil)) }
scope :not_checksummed, -> { joins(:job_artifact_state).where(verification_arel_table[:verification_checksum].eq(nil)) }
scope :available_verifiables, -> { joins(:job_artifact_state) }
delegate :validate_schema?, to: :job
delegate :verification_retry_at, :verification_retry_at=,
:verified_at, :verified_at=,
:verification_checksum, :verification_checksum=,
:verification_failure, :verification_failure=,
:verification_retry_count, :verification_retry_count=,
:verification_state=, :verification_state,
:verification_started_at=, :verification_started_at,
to: :job_artifact_state
after_save :save_verification_details
end
class_methods do
......@@ -79,6 +105,19 @@ module EE
super
end
# Returns the model class whose table stores the verification state for
# this model: `::Geo::JobArtifactState`.
override :verification_state_table_class
def verification_state_table_class
::Geo::JobArtifactState
end
end
# Returns the associated state record, falling back to
# `build_job_artifact_state` when the association reader (`super`) returns
# nothing, so callers always receive an object.
def job_artifact_state
super || build_job_artifact_state
end
# Returns the object that holds this record's verification state, which is
# the `job_artifact_state` record.
def verification_state_object
job_artifact_state
end
def log_geo_deleted_event
......
......@@ -2,14 +2,18 @@
class Geo::JobArtifactRegistry < Geo::BaseRegistry
include Geo::Syncable
include ::Geo::ReplicableRegistry
include ::Geo::VerifiableRegistry
MODEL_CLASS = ::Ci::JobArtifact
MODEL_FOREIGN_KEY = :artifact_id
belongs_to :job_artifact, class_name: 'Ci::JobArtifact', foreign_key: :artifact_id
# When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables.
def self.has_create_events?
false
::Geo::JobArtifactReplicator.enabled?
end
# TODO: remove once `success` column has a default value set
......@@ -31,4 +35,42 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
def self.delete_worker_class
::Geo::FileRegistryRemovalWorker
end
# TODO Remove this when enabling geo_job_artifact_replication by default
#
# Always reports the registry consistency worker as enabled for this
# registry. NOTE(review): presumably the overridden default depends on the
# replication feature flag, which is why this override is temporary — confirm.
override :registry_consistency_worker_enabled?
def self.registry_consistency_worker_enabled?
true
end
# Scope of registries considered failed.
#
# When `::Geo::JobArtifactReplicator` is enabled this is the `:failed` state;
# otherwise it is every row with `success: false` and a non-NULL
# `retry_count`.
def self.failed
  return with_state(:failed) if ::Geo::JobArtifactReplicator.enabled?

  where(success: false).where.not(retry_count: nil)
end
# Scope of registries for which no sync has been attempted yet.
#
# When `::Geo::JobArtifactReplicator` is enabled: `pending` rows without a
# `last_synced_at`. Otherwise: rows with `success: false` and a NULL
# `retry_count`.
#
# NOTE(review): the `retry_count` column default is being changed from NULL
# to 0 elsewhere in this change set; confirm rows created on the legacy path
# can still carry a NULL `retry_count`.
def self.never_attempted_sync
  return pending.where(last_synced_at: nil) if ::Geo::JobArtifactReplicator.enabled?

  where(success: false, retry_count: nil)
end
# Scope of registries that are due for a retry: `retry_at` is NULL or lies
# before the current time.
#
# Both code paths used to build this same predicate (once with Arel, once
# with a raw SQL fragment), so the feature-flag check was redundant. A single
# Arel expression is used for both; it also keeps the column reference
# table-qualified, unlike the raw `retry_at` fragment.
#
# @return [ActiveRecord::Relation]
def self.retry_due
  retry_at = arel_table[:retry_at]

  where(retry_at.eq(nil).or(retry_at.lt(Time.current)))
end
# Scope of registries considered synced.
#
# Rows with `success: true` always count. When `::Geo::JobArtifactReplicator`
# is enabled, rows in the `:synced` state are included as well.
def self.synced
  return where(success: true) unless ::Geo::JobArtifactReplicator.enabled?

  with_state(:synced).or(where(success: true))
end
end
# frozen_string_literal: true
module Geo
  # Record holding the state row for a single `Ci::JobArtifact`.
  # The table has no separate `id`; `job_artifact_id` is the primary key.
  class JobArtifactState < Ci::ApplicationRecord
    include EachBatch

    self.primary_key = :job_artifact_id

    belongs_to :job_artifact,
               inverse_of: :job_artifact_state,
               class_name: 'Ci::JobArtifact'
  end
end
......@@ -40,8 +40,8 @@ class GeoNodeStatus < ApplicationRecord
"#{replicable_class.replicable_name_plural}_checksummed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} checksummed on the primary",
"#{replicable_class.replicable_name_plural}_checksum_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to checksum on primary",
"#{replicable_class.replicable_name_plural}_synced_count".to_sym => "Number of #{replicable_class.replicable_title_plural} in the registry",
"#{replicable_class.replicable_name_plural}_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} synced on secondary",
"#{replicable_class.replicable_name_plural}_registry_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to sync on secondary",
"#{replicable_class.replicable_name_plural}_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed on secondary",
"#{replicable_class.replicable_name_plural}_registry_count".to_sym => "Number of #{replicable_class.replicable_title_plural} synced to sync on secondary",
"#{replicable_class.replicable_name_plural}_verification_total_count".to_sym => "Number of #{replicable_class.replicable_title_plural} available to verify on secondary",
"#{replicable_class.replicable_name_plural}_verified_count".to_sym => "Number of #{replicable_class.replicable_title_plural} verified on the secondary",
"#{replicable_class.replicable_name_plural}_verification_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to verify on secondary"
......@@ -67,9 +67,6 @@ class GeoNodeStatus < ApplicationRecord
wikis_synced_count
wikis_failed_count
job_artifacts_replication_enabled
job_artifacts_count
job_artifacts_synced_count
job_artifacts_failed_count
repositories_verified_count
repositories_verification_failed_count
repositories_verification_total_count
......@@ -127,10 +124,6 @@ class GeoNodeStatus < ApplicationRecord
wikis_verification_failed_count: 'Number of wikis failed to verify on secondary',
wikis_checksum_mismatch_count: 'Number of wikis that checksum mismatch on secondary',
job_artifacts_replication_enabled: 'Boolean denoting if replication is enabled for Job Artifacts',
job_artifacts_count: 'Total number of syncable job artifacts available on primary',
job_artifacts_synced_count: 'Number of syncable job artifacts synced on secondary',
job_artifacts_failed_count: 'Number of syncable job artifacts failed to sync on secondary',
job_artifacts_registry_count: 'Number of job artifacts in the registry',
job_artifacts_synced_missing_on_primary_count: 'Number of job artifacts marked as synced due to the file missing on the primary',
replication_slots_count: 'Total number of replication slots on the primary',
replication_slots_used_count: 'Number of replication slots in use on the primary',
......@@ -377,7 +370,6 @@ class GeoNodeStatus < ApplicationRecord
attr_in_percentage :wikis_synced, :wikis_synced_count, :wikis_count
attr_in_percentage :wikis_checksummed, :wikis_checksummed_count, :wikis_count
attr_in_percentage :wikis_verified, :wikis_verified_count, :wikis_count
attr_in_percentage :job_artifacts_synced, :job_artifacts_synced_count, :job_artifacts_count
attr_in_percentage :replication_slots_used, :replication_slots_used_count, :replication_slots_count
attr_in_percentage :container_repositories_synced, :container_repositories_synced_count, :container_repositories_count
attr_in_percentage :design_repositories_synced, :design_repositories_synced_count, :design_repositories_count
......@@ -468,6 +460,7 @@ class GeoNodeStatus < ApplicationRecord
def load_job_artifacts_data
return unless job_artifacts_replication_enabled
return if ::Geo::JobArtifactReplicator.enabled?
self.job_artifacts_count = job_artifacts_finder.registry_count
self.job_artifacts_synced_count = job_artifacts_finder.synced_count
......@@ -576,7 +569,7 @@ class GeoNodeStatus < ApplicationRecord
end
def job_artifacts_finder
@job_artifacts_finder ||= Geo::JobArtifactRegistryFinder.new
@job_artifacts_finder ||= Geo::JobArtifactLegacyRegistryFinder.new
end
def container_registry_finder
......
# frozen_string_literal: true
module Geo
  # Replicator for `::Ci::JobArtifact` records, using the blob replication
  # strategy.
  class JobArtifactReplicator < Gitlab::Geo::Replicator
    include ::Geo::BlobReplicatorStrategy
    extend ::Gitlab::Utils::Override

    # @return [Class] the model replicated by this replicator
    def self.model
      ::Ci::JobArtifact
    end

    # Replication is off unless its feature flag is turned on.
    #
    # The feature flag follows the format `geo_#{replicable_name}_replication`,
    # so here it would be `geo_job_artifact_replication`
    def self.replication_enabled_by_default?
      false
    end

    override :verification_feature_flag_enabled?
    def self.verification_feature_flag_enabled?
      # Verification ships together with replication, so it has no toggle of
      # its own: with the replication feature flag off, verification is off
      # too (see `VerifiableReplicator.verification_enabled?`).
      true
    end

    # @return the uploader attached to the replicated record's `file`
    def carrierwave_uploader
      model_record.file
    end
  end
end
......@@ -51,7 +51,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord
def file_registry
strong_memoize(:file_registry) do
if job_artifact?
if job_artifact? && ::Geo::JobArtifactReplicator.disabled?
::Geo::JobArtifactRegistry.find_by(artifact_id: object_db_id)
elsif replicator
replicator.registry
......
# frozen_string_literal: true
module Geo
#
# This Worker is deprecated and it only handles the Job Artifacts now
#
class FileDownloadDispatchWorker < Geo::Scheduler::Secondary::SchedulerWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context
include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext
# Runs the inherited scheduler only while `::Geo::JobArtifactReplicator` is
# not enabled. When it is enabled, logs that job artifacts are handled by the
# self service framework and returns nil without scheduling anything.
def perform
  return super unless ::Geo::JobArtifactReplicator.enabled?

  log_info('JobArtifact replication is handled by Geo self service framework')
  nil
end
private
# Cannot utilise backoff because there are no events currently being
......@@ -26,6 +38,8 @@ module Geo
end
def schedule_job(object_type, object_db_id)
return if ::Geo::JobArtifactReplicator.enabled?
job_id = FileDownloadWorker.with_status.perform_async(object_type.to_s, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id
......
......@@ -8,7 +8,7 @@ module Geo
FILE_SERVICE_OBJECT_TYPE = :job_artifact
def registry_finder
@registry_finder ||= Geo::JobArtifactRegistryFinder.new
@registry_finder ||= Geo::JobArtifactLegacyRegistryFinder.new
end
end
end
......
---
name: geo_job_artifact_replication
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/issues/327400
rollout_issue_url:
milestone: '14.8'
type: development
group: group::geo
default_enabled: false
# frozen_string_literal: true
# Extends `job_artifact_registry` with sync-state and verification columns
# and gives `retry_count` a default of 0.
class PrepareJobArtifactRegistryForSsf < Gitlab::Database::Migration[1.0]
  def change
    change_column_default :job_artifact_registry, :retry_count, from: nil, to: 0

    # Nullable timestamp columns, added in this order.
    %i[last_synced_at verified_at verification_started_at verification_retry_at].each do |timestamp_column|
      add_column :job_artifact_registry, timestamp_column, :datetime_with_timezone
    end

    # Small-integer state and counter columns, all NOT NULL with default 0.
    add_column :job_artifact_registry, :state, :integer, null: false, limit: 2, default: 0
    add_column :job_artifact_registry, :verification_state, :integer, default: 0, null: false, limit: 2
    add_column :job_artifact_registry, :verification_retry_count, :integer, default: 0, limit: 2, null: false

    # Checksum bookkeeping.
    add_column :job_artifact_registry, :verification_checksum, :binary
    add_column :job_artifact_registry, :verification_checksum_mismatched, :binary
    add_column :job_artifact_registry, :checksum_mismatch, :boolean, default: false, null: false

    # Failure messages, capped at 255 characters.
    add_column :job_artifact_registry, :verification_failure, :string, limit: 255 # rubocop:disable Migration/PreventStrings see https://gitlab.com/gitlab-org/gitlab/-/issues/323806
    add_column :job_artifact_registry, :last_sync_failure, :string, limit: 255 # rubocop:disable Migration/PreventStrings see https://gitlab.com/gitlab-org/gitlab/-/issues/323806
  end
end
# frozen_string_literal: true
# Adds three partial indexes on `job_artifact_registry`, each restricted to
# rows with `state = 2` and a particular `verification_state`. The indexes
# are built concurrently, hence no DDL transaction.
#
# NOTE(review): "Indecies" in the class name looks like a typo for "Indices";
# it is left as is because the class name has to match the migration file name.
class PrepareJobArtifactRegistryForSsfIndecies < Gitlab::Database::Migration[1.0]
  disable_ddl_transaction!

  def up
    add_concurrent_index(
      :job_artifact_registry,
      :verification_retry_at,
      name: :job_artifact_registry_failed_verification,
      order: "NULLS FIRST",
      where: "((state = 2) AND (verification_state = 3))"
    )
    add_concurrent_index(
      :job_artifact_registry,
      :verification_state,
      name: :job_artifact_registry_needs_verification,
      where: "((state = 2) AND (verification_state = ANY (ARRAY[0, 3])))"
    )
    add_concurrent_index(
      :job_artifact_registry,
      :verified_at,
      name: :job_artifact_registry_pending_verification,
      order: "NULLS FIRST",
      where: "((state = 2) AND (verification_state = 0))"
    )
  end

  def down
    remove_concurrent_index(
      :job_artifact_registry,
      :verification_retry_at,
      name: :job_artifact_registry_failed_verification
    )
    remove_concurrent_index(
      :job_artifact_registry,
      :verification_state,
      name: :job_artifact_registry_needs_verification
    )
    remove_concurrent_index(
      :job_artifact_registry,
      :verified_at,
      name: :job_artifact_registry_pending_verification
    )
  end
end
# frozen_string_literal: true
# Queues the `MigrateJobArtifactRegistryToSsf` background migration over the
# `job_artifact_registry` table, in ranges of BATCH_SIZE rows with
# DELAY_INTERVAL between jobs.
class MigrateJobArtifactRegistry < Gitlab::Database::Migration[1.0]
  MIGRATION = 'MigrateJobArtifactRegistryToSsf'
  DELAY_INTERVAL = 2.minutes
  BATCH_SIZE = 5_000

  disable_ddl_transaction!

  # Migration-local model used only to iterate the table in batches.
  class JobArtifactRegistry < Geo::TrackingBase
    include EachBatch

    self.table_name = 'job_artifact_registry'
  end

  def up
    queue_background_migration_jobs_by_range_at_intervals(
      JobArtifactRegistry,
      MIGRATION,
      DELAY_INTERVAL,
      batch_size: BATCH_SIZE
    )
  end

  def down
    # no-op
  end
end
418ae4a74ff25c09629196916400eeebe5795d1c82918d09aaaa040d228ecae8
\ No newline at end of file
0eaee99b45bf4f98933b1a04cf92207ccadef29d89388049efe44a19a0c34c4a
\ No newline at end of file
ec84e9ecbed7fdd59541ebe7c374abba3d4e657bb6cd9cab1920421e8b0e349a
\ No newline at end of file
......@@ -123,10 +123,22 @@ CREATE TABLE job_artifact_registry (
retry_at timestamp with time zone,
bytes bigint,
artifact_id integer,
retry_count integer,
retry_count integer DEFAULT 0,
success boolean,
sha256 character varying,
missing_on_primary boolean DEFAULT false NOT NULL
missing_on_primary boolean DEFAULT false NOT NULL,
state smallint DEFAULT 0 NOT NULL,
last_synced_at timestamp with time zone,
last_sync_failure character varying(255),
verified_at timestamp with time zone,
verification_started_at timestamp with time zone,
verification_retry_at timestamp with time zone,
verification_state smallint DEFAULT 0 NOT NULL,
verification_retry_count smallint DEFAULT 0 NOT NULL,
verification_checksum bytea,
verification_checksum_mismatched bytea,
checksum_mismatch boolean DEFAULT false NOT NULL,
verification_failure character varying(255)
);
CREATE SEQUENCE job_artifact_registry_id_seq
......@@ -622,6 +634,12 @@ CREATE INDEX lfs_object_registry_needs_verification ON lfs_object_registry USING
CREATE INDEX lfs_object_registry_pending_verification ON lfs_object_registry USING btree (verified_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 0));
CREATE INDEX job_artifact_registry_failed_verification ON job_artifact_registry USING btree (verification_retry_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 3));
CREATE INDEX job_artifact_registry_needs_verification ON job_artifact_registry USING btree (verification_state) WHERE ((state = 2) AND (verification_state = ANY (ARRAY[0, 3])));
CREATE INDEX job_artifact_registry_pending_verification ON job_artifact_registry USING btree (verified_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 0));
CREATE INDEX merge_request_diff_registry_failed_verification ON merge_request_diff_registry USING btree (verification_retry_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 3));
CREATE INDEX merge_request_diff_registry_needs_verification ON merge_request_diff_registry USING btree (verification_state) WHERE ((state = 2) AND (verification_state = ANY (ARRAY[0, 3])));
......
......@@ -35,7 +35,7 @@ module EE
expose :db_replication_lag_seconds
expose :job_artifacts_replication_enabled
expose :job_artifacts_replication_enabled, if: -> (*) { ::Geo::JobArtifactReplicator.disabled? }
expose :container_repositories_replication_enabled
expose :design_repositories_replication_enabled
expose :repositories_replication_enabled
......
# frozen_string_literal: true
module EE
  module Gitlab
    module BackgroundMigration
      # EE implementation of the MigrateJobArtifactRegistryToSsf background
      # migration: rewrites the `state` column of `job_artifact_registry` rows
      # that have `success: true`, one id range at a time.
      module MigrateJobArtifactRegistryToSsf
        # Lightweight model bound directly to the `job_artifact_registry` table.
        class JobArtifactRegistry < Geo::BaseRegistry
          self.table_name = 'job_artifact_registry'
        end

        # Sets `state` to 2 on every row of the batch whose `success` is true.
        #
        # @param start_id [Integer] first registry id of the batch (inclusive)
        # @param end_id [Integer] last registry id of the batch (inclusive)
        # @return [Integer] whatever `update_all` reports for the batch
        def perform(start_id, end_id)
          successful_batch = JobArtifactRegistry.where(id: start_id..end_id, success: true)

          # NOTE(review): 2 is presumably the numeric value of the `synced`
          # state — confirm against Geo::JobArtifactRegistry.state_value(:synced).
          successful_batch.update_all(state: 2)
        end
      end
    end
  end
end
......@@ -17,7 +17,7 @@ module EE
end
def delete_on_geo_secondaries(key)
Geo::CacheInvalidationEventStore.new(key).create!
::Geo::CacheInvalidationEventStore.new(key).create!
end
end
end
......
......@@ -28,7 +28,8 @@ module Gitlab
::Geo::GroupWikiRepositoryReplicator,
::Geo::PipelineArtifactReplicator,
::Geo::PagesDeploymentReplicator,
::Geo::UploadReplicator
::Geo::UploadReplicator,
::Geo::JobArtifactReplicator
].freeze
def self.current_node
......
......@@ -250,6 +250,8 @@ module Gitlab
end
def print_ci_job_artifacts_status
return if ::Geo::JobArtifactReplicator.enabled?
print 'CI job artifacts: '.rjust(GEO_STATUS_COLUMN_WIDTH)
show_failed_value(current_node_status.job_artifacts_failed_count)
print "#{current_node_status.job_artifacts_synced_count}/#{current_node_status.job_artifacts_count} "
......
......@@ -12,6 +12,18 @@ FactoryBot.define do
end
end
trait :verification_succeeded do
common_security_report # with file
verification_checksum { 'abc' }
verification_state { Ci::JobArtifact.verification_state_value(:verification_succeeded) }
end
trait :verification_failed do
common_security_report # with file
verification_failure { 'Could not calculate the checksum' }
verification_state { Ci::JobArtifact.verification_state_value(:verification_failed) }
end
trait :sast_with_vulnerability_flags do
file_type { :sast }
file_format { :raw }
......
......@@ -2,6 +2,41 @@
FactoryBot.define do
factory :geo_job_artifact_registry, class: 'Geo::JobArtifactRegistry' do
association :job_artifact, factory: [:ci_job_artifact, :with_file]
state { Geo::JobArtifactRegistry.state_value(:pending) }
trait :synced do
state { Geo::JobArtifactRegistry.state_value(:synced) }
last_synced_at { 5.days.ago }
end
trait :failed do
state { Geo::JobArtifactRegistry.state_value(:failed) }
last_synced_at { 1.day.ago }
retry_count { 2 }
last_sync_failure { 'Random error' }
end
trait :started do
state { Geo::JobArtifactRegistry.state_value(:started) }
last_synced_at { 1.day.ago }
retry_count { 0 }
end
trait :verification_succeeded do
verification_checksum { 'e079a831cab27bcda7d81cd9b48296d0c3dd92ef' }
verification_state { Geo::JobArtifactRegistry.verification_state_value(:verification_succeeded) }
verified_at { 5.days.ago }
end
trait :orphan do
after(:create) do |registry, _|
Ci::JobArtifact.find(registry.artifact_id).delete
end
end
end
factory :geo_job_artifact_registry_legacy, class: 'Geo::JobArtifactRegistry' do
sequence(:artifact_id)
success { true }
......
# frozen_string_literal: true
FactoryBot.define do
  # Builds a Geo::JobArtifactState together with its job artifact.
  factory :geo_job_artifact_state, class: 'Geo::JobArtifactState' do
    job_artifact

    # Record that carries a verification checksum.
    trait :checksummed do
      verification_checksum { 'abc' }
    end

    # Record that carries a verification failure message.
    trait :checksum_failure do
      verification_failure { 'Could not calculate the checksum' }
    end
  end
end
......@@ -7,9 +7,6 @@ FactoryBot.define do
trait :healthy do
status_message { nil }
job_artifacts_count { 580 }
job_artifacts_failed_count { 3 }
job_artifacts_synced_count { 577 }
job_artifacts_synced_missing_on_primary_count { 91 }
container_repositories_count { 400 }
container_repositories_registry_count { 203 }
......@@ -65,7 +62,6 @@ FactoryBot.define do
end
trait :replicated_and_verified do
job_artifacts_failed_count { 0 }
container_repositories_failed_count { 0 }
design_repositories_failed_count { 0 }
repositories_failed_count { 0 }
......@@ -85,14 +81,12 @@ FactoryBot.define do
wikis_checksum_total_count { 10 }
wikis_verified_count { 10 }
wikis_verification_total_count { 10 }
job_artifacts_synced_count { 10 }
replication_slots_used_count { 10 }
container_repositories_synced_count { 10 }
design_repositories_synced_count { 10 }
repositories_count { 10 }
wikis_count { 10 }
job_artifacts_count { 10 }
replication_slots_count { 10 }
container_repositories_count { 10 }
design_repositories_count { 10 }
......
......@@ -2,10 +2,12 @@
require 'spec_helper'
RSpec.describe Geo::JobArtifactRegistryFinder, :geo do
RSpec.describe Geo::JobArtifactLegacyRegistryFinder, :geo do
it_behaves_like 'a file registry finder' do
before do
stub_artifacts_object_storage
stub_feature_flags(geo_job_artifact_replication: false)
end
let_it_be(:project) { create(:project) }
......@@ -19,13 +21,13 @@ RSpec.describe Geo::JobArtifactRegistryFinder, :geo do
let!(:replicable_7) { create(:ci_job_artifact, :remote_store, project: project) }
let!(:replicable_8) { create(:ci_job_artifact, :remote_store, project: project) }
let_it_be(:registry_1) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_1.id) }
let_it_be(:registry_2) { create(:geo_job_artifact_registry, artifact_id: replicable_2.id, missing_on_primary: true) }
let_it_be(:registry_3) { create(:geo_job_artifact_registry, :never_synced, artifact_id: replicable_3.id) }
let_it_be(:registry_4) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_4.id) }
let_it_be(:registry_5) { create(:geo_job_artifact_registry, artifact_id: replicable_5.id, missing_on_primary: true, retry_at: 1.day.ago) }
let!(:registry_6) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_6.id) }
let!(:registry_7) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_7.id, missing_on_primary: true) }
let!(:registry_8) { create(:geo_job_artifact_registry, :never_synced, artifact_id: replicable_8.id) }
let_it_be(:registry_1) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_1.id) }
let_it_be(:registry_2) { create(:geo_job_artifact_registry_legacy, artifact_id: replicable_2.id, missing_on_primary: true) }
let_it_be(:registry_3) { create(:geo_job_artifact_registry_legacy, :never_synced, artifact_id: replicable_3.id) }
let_it_be(:registry_4) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_4.id) }
let_it_be(:registry_5) { create(:geo_job_artifact_registry_legacy, artifact_id: replicable_5.id, missing_on_primary: true, retry_at: 1.day.ago) }
let!(:registry_6) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_6.id) }
let!(:registry_7) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_7.id, missing_on_primary: true) }
let!(:registry_8) { create(:geo_job_artifact_registry_legacy, :never_synced, artifact_id: replicable_8.id) }
end
end
......@@ -22,6 +22,14 @@
"job_artifacts_failed_count",
"job_artifacts_synced_count",
"job_artifacts_synced_missing_on_primary_count",
"job_artifacts_checksum_total_count",
"job_artifacts_checksummed_count",
"job_artifacts_checksum_failed_count",
"job_artifacts_registry_count",
"job_artifacts_verification_total_count",
"job_artifacts_verified_count",
"job_artifacts_verification_failed_count",
"job_artifacts_verified_in_percentage",
"db_replication_lag_seconds",
"container_repositories_replication_enabled",
"container_repositories_count",
......@@ -205,6 +213,14 @@
"job_artifacts_synced_count": { "type": ["integer", "null"] },
"job_artifacts_synced_missing_on_primary_count": { "type": ["integer", "null"] },
"job_artifacts_synced_in_percentage": { "type": "string" },
"job_artifacts_checksum_total_count": { "type": ["integer", "null"] },
"job_artifacts_checksummed_count": { "type": ["integer", "null"] },
"job_artifacts_checksum_failed_count": { "type": ["integer", "null"] },
"job_artifacts_registry_count": { "type": ["integer", "null"] },
"job_artifacts_verification_total_count": { "type": ["integer", "null"] },
"job_artifacts_verified_count": { "type": ["integer", "null"] },
"job_artifacts_verification_failed_count": { "type": ["integer", "null"] },
"job_artifacts_verified_in_percentage": { "type": "string" },
"container_repositories_replication_enabled": { "type": ["boolean", "null"] },
"container_repositories_count": { "type": "integer" },
"container_repositories_failed_count": { "type": ["integer", "null"] },
......
# frozen_string_literal: true
require 'spec_helper'
# Runs the shared 'a Geo registries resolver' examples for the job artifact
# registries resolver. The symbol argument is presumably the registry factory
# name consumed by the shared examples — confirm in the shared example file.
RSpec.describe Resolvers::Geo::JobArtifactRegistriesResolver do
it_behaves_like 'a Geo registries resolver', :geo_job_artifact_registry
end
......@@ -15,7 +15,7 @@ RSpec.describe GitlabSchema.types['GeoNode'] do
package_file_registries snippet_repository_registries
terraform_state_version_registries group_wiki_repository_registries
pages_deployment_registries lfs_object_registries pipeline_artifact_registries
upload_registries
upload_registries job_artifact_registries
]
expect(described_class).to have_graphql_fields(*expected_fields)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe GitlabSchema.types['JobArtifactRegistry'] do
  it_behaves_like 'a Geo registry type'

  # Only the field specific to this type is listed; the `.at_least` modifier
  # is what lets the fields coming from RegistryType go unlisted here.
  it 'has the expected fields (other than those included in RegistryType)' do
    expect(described_class).to have_graphql_fields(:artifact_id).at_least
  end
end
......@@ -57,7 +57,7 @@ RSpec.describe EE::API::Entities::GeoNodeStatus do
describe '#job_artifacts_synced_in_percentage' do
it 'formats as percentage' do
geo_node_status.assign_attributes(job_artifacts_count: 256,
geo_node_status.assign_attributes(job_artifacts_registry_count: 256,
job_artifacts_failed_count: 12,
job_artifacts_synced_count: 123)
......
# frozen_string_literal: true
require 'spec_helper'
# Covers the background migration that rewrites `state` on legacy
# `job_artifact_registry` rows. Besides the happy path it pins the two filters
# of the migration query: the `success: true` condition and the id range.
RSpec.describe Gitlab::BackgroundMigration::MigrateJobArtifactRegistryToSsf, :geo do
  let(:registry) { table(:job_artifact_registry) }

  # Successful rows inside the migrated id range: these must be updated.
  let!(:registry1) { registry.create!(artifact_id: 1, success: true, state: 0) }
  let!(:registry2) { registry.create!(artifact_id: 2, success: true, state: 0) }
  let!(:registry3) { registry.create!(artifact_id: 3, success: true, state: 0) }

  # Inside the id range but not successful: must be left alone.
  let!(:unsynced_registry) { registry.create!(artifact_id: 4, success: false, state: 0) }

  # Successful but created after the range end: must be left alone.
  let!(:out_of_range_registry) { registry.create!(artifact_id: 5, success: true, state: 0) }

  subject(:perform_migration) do
    described_class.new.perform(registry1.id, unsynced_registry.id)
  end

  it 'updates registries' do
    perform_migration

    expect(registry.where(state: 2).count).to eq 3
  end

  it 'does not update unsynced or out-of-range registries' do
    perform_migration

    expect(unsynced_registry.reload.state).to eq 0
    expect(out_of_range_registry.reload.state).to eq 0
  end
end
......@@ -14,8 +14,8 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFilesBatch do
let(:max_artifact_id) { Ci::JobArtifact.maximum(:id).to_i }
let(:orphan_id_1) { max_artifact_id + 1 }
let(:orphan_id_2) { max_artifact_id + 2 }
let!(:orphan_registry_1) { create(:geo_job_artifact_registry, artifact_id: orphan_id_1) }
let!(:orphan_registry_2) { create(:geo_job_artifact_registry, artifact_id: orphan_id_2) }
let!(:orphan_registry_1) { create(:geo_job_artifact_registry_legacy, artifact_id: orphan_id_1) }
let!(:orphan_registry_2) { create(:geo_job_artifact_registry_legacy, artifact_id: orphan_id_2) }
before do
stub_secondary_node
......@@ -35,8 +35,8 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFilesBatch do
context 'with dry run' do
it 'does not remove registries' do
create(:geo_job_artifact_registry, :with_artifact, artifact_type: :archive)
create(:geo_job_artifact_registry, :orphan, artifact_type: :archive)
create(:geo_job_artifact_registry_legacy, :with_artifact, artifact_type: :archive)
create(:geo_job_artifact_registry_legacy, :orphan, artifact_type: :archive)
expect { batch.clean! }.not_to change { Geo::JobArtifactRegistry.count }
expect(batch.geo_registries_count).to eq(2)
......
......@@ -34,7 +34,7 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFiles do
it 'accumulates the number of cleaned Geo registries' do
stub_const("#{described_class.name}::BATCH_SIZE", 2)
create_list(:geo_job_artifact_registry, 3, :orphan, artifact_type: :archive)
create_list(:geo_job_artifact_registry, 3, :orphan)
create(:ci_job_artifact, :archive).delete
cleanup.run!
......
......@@ -14,6 +14,8 @@ RSpec.describe Gitlab::Geo::GeoNodeStatusCheck do
describe '#replication_verification_complete?' do
before do
allow(Gitlab.config.geo.registry_replication).to receive(:enabled).and_return(true)
stub_feature_flags(geo_job_artifact_replication: false)
end
context 'with legacy replication' do
......
......@@ -11,6 +11,10 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :clean_g
subject { described_class.new(job_artifact_deleted_event, Time.now, logger) }
before do
stub_feature_flags(geo_job_artifact_replication: false)
end
around do |example|
Sidekiq::Testing.inline! { example.run }
end
......@@ -24,7 +28,7 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :clean_g
describe '#process' do
context 'with a tracking database entry' do
before do
create(:geo_job_artifact_registry, artifact_id: job_artifact.id)
create(:geo_job_artifact_registry_legacy, artifact_id: job_artifact.id)
end
context 'with a file' do
......
# frozen_string_literal: true
require 'spec_helper'
require_migration!
# Verifies that the post-deployment migration enqueues one background
# migration job per batch of registry rows.
RSpec.describe MigrateJobArtifactRegistry do
  # NOTE(review): `migration_name` is not referenced by the example in this
  # file — confirm no shared helper relies on it, then consider removing it.
  let(:migration_name) { 'MigrateJobArtifactRegistryToSsf' }
  let(:registry) { table(:job_artifact_registry) }

  let!(:registry1) { registry.create!(artifact_id: 1, success: true, state: 0) }
  let!(:registry2) { registry.create!(artifact_id: 2, success: true, state: 0) }
  let!(:registry3) { registry.create!(artifact_id: 3, success: true, state: 0) }
  let!(:registry4) { registry.create!(artifact_id: 4, success: true, state: 0) }

  before do
    # With a batch size of 2 the four rows above are expected to yield two jobs.
    stub_const("#{described_class.name}::BATCH_SIZE", 2)
  end

  it 'correctly schedules background migrations' do
    # First and last row of each expected batch.
    expected_batches = [[registry1, registry2], [registry3, registry4]]

    Sidekiq::Testing.fake! do
      freeze_time do
        migrate!

        expect(BackgroundMigrationWorker.jobs.size).to eq(2)

        expected_batches.each do |first_row, last_row|
          expect(described_class::MIGRATION)
            .to be_scheduled_migration_with_multiple_args(first_row.id, last_row.id)
        end
      end
    end
  end
end
......@@ -6,6 +6,15 @@ RSpec.describe Ci::JobArtifact do
using RSpec::Parameterized::TableSyntax
include EE::GeoHelpers
include_examples 'a replicable model with a separate table for verification state' do
before do
stub_artifacts_object_storage
end
let(:verifiable_model_record) { build(:ci_job_artifact) } # add extra params if needed to make sure the record is included in `available_verifiables`
let(:unverifiable_model_record) { build(:ci_job_artifact, :remote_store) } # add extra params if needed to make sure the record is NOT included in `available_verifiables`
end
it { is_expected.to delegate_method(:validate_schema?).to(:job) }
describe '#destroy' do
......@@ -17,6 +26,8 @@ RSpec.describe Ci::JobArtifact do
end
it 'creates a JobArtifactDeletedEvent' do
stub_feature_flags(geo_job_artifact_replication: false)
job_artifact = create(:ee_ci_job_artifact, :archive)
expect { job_artifact.destroy! }.to change { Geo::JobArtifactDeletedEvent.count }.by(1)
......
......@@ -164,9 +164,9 @@ RSpec.describe GeoNodeStatus, :geo do
it 'counts synced job artifacts' do
# These should be ignored
create(:geo_upload_registry)
create(:geo_job_artifact_registry, :with_artifact, success: false)
create(:geo_job_artifact_registry_legacy, :with_artifact, success: false)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry_legacy, :with_artifact, success: true)
expect(subject.job_artifacts_synced_count).to eq(1)
end
......@@ -174,11 +174,13 @@ RSpec.describe GeoNodeStatus, :geo do
describe '#job_artifacts_synced_missing_on_primary_count' do
it 'counts job artifacts marked as synced due to file missing on the primary' do
stub_feature_flags(geo_job_artifact_replication: false)
# These should be ignored
create(:geo_upload_registry, missing_on_primary: true)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry_legacy, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: true, missing_on_primary: true)
create(:geo_job_artifact_registry_legacy, :with_artifact, success: true, missing_on_primary: true)
expect(subject.job_artifacts_synced_missing_on_primary_count).to eq(1)
end
......@@ -186,10 +188,12 @@ RSpec.describe GeoNodeStatus, :geo do
describe '#job_artifacts_failed_count' do
it 'counts failed job artifacts' do
stub_feature_flags(geo_job_artifact_replication: false)
# These should be ignored
create(:geo_upload_registry, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, :failed)
create(:geo_job_artifact_registry_legacy, :with_artifact, success: true)
create(:geo_job_artifact_registry_legacy, :with_artifact, :failed)
expect(subject.job_artifacts_failed_count).to eq(1)
end
......@@ -202,7 +206,7 @@ RSpec.describe GeoNodeStatus, :geo do
build = create(:ci_build, project: project)
job_artifact = create(:ci_job_artifact, job: build)
create(:geo_job_artifact_registry, success: index.even?, artifact_id: job_artifact.id)
create(:geo_job_artifact_registry_legacy, success: index.even?, artifact_id: job_artifact.id)
end
end
......@@ -1080,6 +1084,7 @@ RSpec.describe GeoNodeStatus, :geo do
Geo::GroupWikiRepositoryReplicator | :group_wiki_repository | :geo_group_wiki_repository_registry
Geo::PagesDeploymentReplicator | :pages_deployment | :geo_pages_deployment_registry
Geo::UploadReplicator | :upload | :geo_upload_registry
Geo::JobArtifactReplicator | :ci_job_artifact | :geo_job_artifact_registry
end
with_them do
......
# frozen_string_literal: true
require 'spec_helper'
# Runs the shared 'a blob replicator' and 'a verifiable replicator' examples
# against Geo::JobArtifactReplicator.
RSpec.describe Geo::JobArtifactReplicator do
# Record handed to the shared examples: a CI job artifact created with the :with_file trait.
let(:model_record) { create(:ci_job_artifact, :with_file) }
include_examples 'a blob replicator'
include_examples 'a verifiable replicator'
end
......@@ -58,4 +58,11 @@ RSpec.describe 'Gets registries' do
registry_factory: :geo_upload_registry,
registry_foreign_key_field_name: 'fileId'
}
it_behaves_like 'gets registries for', {
field_name: 'jobArtifactRegistries',
registry_class_name: 'JobArtifactRegistry',
registry_factory: :geo_job_artifact_registry,
registry_foreign_key_field_name: 'artifactId'
}
end
......@@ -11,6 +11,7 @@ RSpec.describe Geo::FileDownloadService do
before do
stub_current_geo_node(secondary)
stub_feature_flags(geo_job_artifact_replication: false)
end
describe '#downloader' do
......@@ -28,7 +29,7 @@ RSpec.describe Geo::FileDownloadService do
context 'with job_artifacts' do
let!(:geo_job_artifact_registry) do
create(:geo_job_artifact_registry, success: false, retry_count: 31, artifact_id: file.id)
create(:geo_job_artifact_registry_legacy, success: false, retry_count: 31, artifact_id: file.id)
end
let(:file) { create(:ci_job_artifact) }
......@@ -181,7 +182,7 @@ RSpec.describe Geo::FileDownloadService do
context 'for a registered file that failed to sync' do
let!(:geo_job_artifact_registry) do
create(:geo_job_artifact_registry, success: false, artifact_id: file.id, retry_count: 3, retry_at: 1.hour.ago)
create(:geo_job_artifact_registry_legacy, success: false, artifact_id: file.id, retry_count: 3, retry_at: 1.hour.ago)
end
context 'when the file is successfully downloaded' do
......
......@@ -59,9 +59,13 @@ RSpec.describe Geo::FileRegistryRemovalService, :geo do
end
let!(:job_artifact) { create(:ci_job_artifact, :archive) }
let!(:registry) { create(:geo_job_artifact_registry, artifact_id: job_artifact.id) }
let!(:registry) { create(:geo_job_artifact_registry_legacy, artifact_id: job_artifact.id) }
let!(:file_path) { job_artifact.file.path }
before do
stub_feature_flags(geo_job_artifact_replication: false)
end
it_behaves_like 'removes artifact'
context 'migrated to object storage' do
......
......@@ -20,7 +20,8 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
Geo::DesignRegistry => :project_with_design,
Geo::MergeRequestDiffRegistry => :external_merge_request_diff,
Geo::PackageFileRegistry => :package_file,
Geo::UploadRegistry => :upload
Geo::UploadRegistry => :upload,
Geo::JobArtifactRegistry => :ci_job_artifact
}.fetch(registry_class, default_factory_name)
end
......
......@@ -205,6 +205,8 @@ RSpec.describe 'geo rake tasks', :geo, :silence_stdout do
context 'with SSF LFS replication eneabled' do
it 'prints messages for all the checks' do
stub_feature_flags(geo_job_artifact_replication: false)
checks.each do |text|
expect { run_rake_task('geo:status') }.to output(text).to_stdout
end
......
......@@ -30,6 +30,8 @@ RSpec.describe 'Every GitLab uploader' do
# When this test starts failing, it means that we have migrated Geo's handling of uploads to the
# SSF, and we can remove the tests for the file retriever and downloader classes.
it 'has some uploads to be migrated' do
stub_feature_flags(geo_job_artifact_replication: false)
expect(object_types - replicable_names).not_to be_empty
end
end
......@@ -84,8 +86,10 @@ RSpec.describe 'Every GitLab uploader' do
end
def handled_by_ssf?(uploader)
return true if uploads?(uploader)
replicable_name = replicable_name_for(uploader)
replicable_names.include?(replicable_name) || uploads?(uploader)
replicable_names.include?(replicable_name) && ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name).enabled?
end
def uploads?(uploader)
......@@ -108,7 +112,11 @@ RSpec.describe 'Every GitLab uploader' do
end
def replicable_names
@replicable_names ||= replicators.map(&:replicable_name)
@replicable_names ||= begin
replicators
.map(&:replicable_name)
.select {|replicable_name| ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name).enabled? }
end
end
def replicable_name_for(uploader)
......
......@@ -19,6 +19,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
allow(Geo::FileDownloadWorker).to receive(:with_status).and_return(Geo::FileDownloadWorker)
stub_feature_flags(geo_job_artifact_replication: false)
end
it 'does not schedule anything when tracking database is not configured' do
......@@ -48,7 +49,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
context 'with job artifacts' do
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
......@@ -57,7 +58,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :failed)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
......@@ -66,7 +67,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 1234, success: true)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
......@@ -75,7 +76,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: true)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
......@@ -84,7 +85,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'does not retry failed artifacts when retry_at is tomorrow' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.tomorrow)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :failed, retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id)
......@@ -93,7 +94,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'retries failed artifacts when retry_at is in the past' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.yesterday)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :failed, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy)
......@@ -103,10 +104,10 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
context 'with files missing on the primary that are marked as synced' do
let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) }
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
let!(:artifact_registry) { create(:geo_job_artifact_registry_legacy, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
it 'retries the files if there is spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
......@@ -131,7 +132,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'does not retry those files if there is no spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
......@@ -140,7 +141,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end
it 'does not retry those files if they are already scheduled' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced)
registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
......@@ -171,7 +172,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
result_object = double(:result, success: true, bytes_downloaded: 100, primary_missing_file: false)
allow_any_instance_of(::Gitlab::Geo::Replication::BaseTransfer).to receive(:download_from_primary).and_return(result_object)
create_list(:geo_job_artifact_registry, 6, :with_artifact, :never_synced)
create_list(:geo_job_artifact_registry_legacy, 6, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original
# For 10 downloads, we expect four database reloads:
......
......@@ -100,6 +100,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do
expect(Geo::TerraformStateVersionRegistry.where(terraform_state_version_id: terraform_state_version.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PagesDeploymentRegistry.where(pages_deployment: pages_deployment.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(job_artifact: job_artifact.id).count).to eq(0)
subject.perform
......@@ -114,6 +115,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do
expect(Geo::TerraformStateVersionRegistry.where(terraform_state_version_id: terraform_state_version.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PagesDeploymentRegistry.where(pages_deployment: pages_deployment.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(job_artifact: job_artifact.id).count).to eq(1)
end
context 'when the current Geo node is disabled or primary' do
......
# frozen_string_literal: true
module Gitlab
  module BackgroundMigration
    # Placeholder for the job artifact registry migration. On its own the class
    # does nothing; any real behaviour has to come from the module prepended by
    # the `prepend_mod_with` call that follows this definition.
    # rubocop: disable Style/Documentation
    class MigrateJobArtifactRegistryToSsf
      # Accepts any number of positional arguments and ignores them.
      #
      # @return [nil]
      def perform(*job_artifact_ids); end
    end
  end
end
Gitlab::BackgroundMigration::MigrateJobArtifactRegistryToSsf.prepend_mod_with('Gitlab::BackgroundMigration::MigrateJobArtifactRegistryToSsf')
......@@ -85,6 +85,7 @@ ci_instance_variables: :gitlab_ci
ci_job_artifacts: :gitlab_ci
ci_job_token_project_scope_links: :gitlab_ci
ci_job_variables: :gitlab_ci
ci_job_artifact_states: :gitlab_ci
ci_minutes_additional_packs: :gitlab_ci
ci_namespace_monthly_usages: :gitlab_ci
ci_namespace_mirrors: :gitlab_ci
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment