Commit 4fbccfcb authored by Valery Sizov's avatar Valery Sizov Committed by Douglas Barbosa Alexandre

Geo: Job Artifacts replication using SSF

We currently replicate JobArtifacts using the legacy worker.
This MR ports the replication to use the new Geo SSF.

Changelog: changed
parent 2bd0b1a9
# frozen_string_literal: true
# Creates the `ci_job_artifact_states` table, which stores verification
# bookkeeping columns (state, checksum, timestamps, retry data, failure text)
# keyed by `job_artifact_id`.
class CreateJobArtifactStates < Gitlab::Database::Migration[1.0]
  VERIFICATION_STATE_INDEX_NAME = 'index_job_artifact_states_on_verification_state'
  PENDING_VERIFICATION_INDEX_NAME = 'index_job_artifact_states_pending_verification'
  FAILED_VERIFICATION_INDEX_NAME = 'index_job_artifact_states_failed_verification'
  NEEDS_VERIFICATION_INDEX_NAME = 'index_job_artifact_states_needs_verification'

  enable_lock_retries!

  # Builds the table together with its indexes.
  def up
    create_table :ci_job_artifact_states, id: false do |table|
      table.datetime_with_timezone :verification_started_at
      table.datetime_with_timezone :verification_retry_at
      table.datetime_with_timezone :verified_at

      # `job_artifact_id` is the primary key and also a foreign key to
      # `ci_job_artifacts` that cascades on delete.
      table.references :job_artifact,
                       primary_key: true,
                       null: false,
                       foreign_key: { on_delete: :cascade, to_table: :ci_job_artifacts }

      table.integer :verification_state, default: 0, limit: 2, null: false
      table.integer :verification_retry_count, limit: 2
      table.binary :verification_checksum, using: 'verification_checksum::bytea'
      table.text :verification_failure, limit: 255

      table.index :verification_state, name: VERIFICATION_STATE_INDEX_NAME

      # Partial indexes restricted to specific `verification_state` values.
      # NOTE(review): the index names suggest 0 = pending and 3 = failed;
      # confirm against the verification state definitions.
      table.index :verified_at,
                  where: "(verification_state = 0)",
                  order: { verified_at: 'ASC NULLS FIRST' },
                  name: PENDING_VERIFICATION_INDEX_NAME
      table.index :verification_retry_at,
                  where: "(verification_state = 3)",
                  order: { verification_retry_at: 'ASC NULLS FIRST' },
                  name: FAILED_VERIFICATION_INDEX_NAME
      table.index :verification_state,
                  where: "(verification_state = 0 OR verification_state = 3)",
                  name: NEEDS_VERIFICATION_INDEX_NAME
    end
  end

  # Reverts `up` by dropping the table.
  def down
    drop_table :ci_job_artifact_states
  end
end
d618c28360f7716807e9727566019e269963d85164cf2f306ec9692d3b037802
\ No newline at end of file
...@@ -11863,6 +11863,27 @@ CREATE SEQUENCE ci_instance_variables_id_seq ...@@ -11863,6 +11863,27 @@ CREATE SEQUENCE ci_instance_variables_id_seq
ALTER SEQUENCE ci_instance_variables_id_seq OWNED BY ci_instance_variables.id; ALTER SEQUENCE ci_instance_variables_id_seq OWNED BY ci_instance_variables.id;
CREATE TABLE ci_job_artifact_states (
verification_started_at timestamp with time zone,
verification_retry_at timestamp with time zone,
verified_at timestamp with time zone,
job_artifact_id bigint NOT NULL,
verification_state smallint DEFAULT 0 NOT NULL,
verification_retry_count smallint,
verification_checksum bytea,
verification_failure text,
CONSTRAINT check_df832b66ea CHECK ((char_length(verification_failure) <= 255))
);
CREATE SEQUENCE ci_job_artifact_states_job_artifact_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE ci_job_artifact_states_job_artifact_id_seq OWNED BY ci_job_artifact_states.job_artifact_id;
CREATE TABLE ci_job_artifacts ( CREATE TABLE ci_job_artifacts (
project_id integer NOT NULL, project_id integer NOT NULL,
file_type integer NOT NULL, file_type integer NOT NULL,
...@@ -21661,6 +21682,8 @@ ALTER TABLE ONLY ci_group_variables ALTER COLUMN id SET DEFAULT nextval('ci_grou ...@@ -21661,6 +21682,8 @@ ALTER TABLE ONLY ci_group_variables ALTER COLUMN id SET DEFAULT nextval('ci_grou
ALTER TABLE ONLY ci_instance_variables ALTER COLUMN id SET DEFAULT nextval('ci_instance_variables_id_seq'::regclass); ALTER TABLE ONLY ci_instance_variables ALTER COLUMN id SET DEFAULT nextval('ci_instance_variables_id_seq'::regclass);
ALTER TABLE ONLY ci_job_artifact_states ALTER COLUMN job_artifact_id SET DEFAULT nextval('ci_job_artifact_states_job_artifact_id_seq'::regclass);
ALTER TABLE ONLY ci_job_artifacts ALTER COLUMN id SET DEFAULT nextval('ci_job_artifacts_id_seq'::regclass); ALTER TABLE ONLY ci_job_artifacts ALTER COLUMN id SET DEFAULT nextval('ci_job_artifacts_id_seq'::regclass);
ALTER TABLE ONLY ci_job_token_project_scope_links ALTER COLUMN id SET DEFAULT nextval('ci_job_token_project_scope_links_id_seq'::regclass); ALTER TABLE ONLY ci_job_token_project_scope_links ALTER COLUMN id SET DEFAULT nextval('ci_job_token_project_scope_links_id_seq'::regclass);
...@@ -23138,6 +23161,9 @@ ALTER TABLE ONLY ci_group_variables ...@@ -23138,6 +23161,9 @@ ALTER TABLE ONLY ci_group_variables
ALTER TABLE ONLY ci_instance_variables ALTER TABLE ONLY ci_instance_variables
ADD CONSTRAINT ci_instance_variables_pkey PRIMARY KEY (id); ADD CONSTRAINT ci_instance_variables_pkey PRIMARY KEY (id);
ALTER TABLE ONLY ci_job_artifact_states
ADD CONSTRAINT ci_job_artifact_states_pkey PRIMARY KEY (job_artifact_id);
ALTER TABLE ONLY ci_job_artifacts ALTER TABLE ONLY ci_job_artifacts
ADD CONSTRAINT ci_job_artifacts_pkey PRIMARY KEY (id); ADD CONSTRAINT ci_job_artifacts_pkey PRIMARY KEY (id);
...@@ -25767,6 +25793,8 @@ CREATE UNIQUE INDEX index_ci_group_variables_on_group_id_and_key_and_environment ...@@ -25767,6 +25793,8 @@ CREATE UNIQUE INDEX index_ci_group_variables_on_group_id_and_key_and_environment
CREATE UNIQUE INDEX index_ci_instance_variables_on_key ON ci_instance_variables USING btree (key); CREATE UNIQUE INDEX index_ci_instance_variables_on_key ON ci_instance_variables USING btree (key);
CREATE INDEX index_ci_job_artifact_states_on_job_artifact_id ON ci_job_artifact_states USING btree (job_artifact_id);
CREATE INDEX index_ci_job_artifacts_for_terraform_reports ON ci_job_artifacts USING btree (project_id, id) WHERE (file_type = 18); CREATE INDEX index_ci_job_artifacts_for_terraform_reports ON ci_job_artifacts USING btree (project_id, id) WHERE (file_type = 18);
CREATE INDEX index_ci_job_artifacts_id_for_terraform_reports ON ci_job_artifacts USING btree (id) WHERE (file_type = 18); CREATE INDEX index_ci_job_artifacts_id_for_terraform_reports ON ci_job_artifacts USING btree (id) WHERE (file_type = 18);
...@@ -26755,6 +26783,14 @@ CREATE INDEX index_jira_imports_on_user_id ON jira_imports USING btree (user_id) ...@@ -26755,6 +26783,14 @@ CREATE INDEX index_jira_imports_on_user_id ON jira_imports USING btree (user_id)
CREATE INDEX index_jira_tracker_data_on_service_id ON jira_tracker_data USING btree (service_id); CREATE INDEX index_jira_tracker_data_on_service_id ON jira_tracker_data USING btree (service_id);
CREATE INDEX index_job_artifact_states_failed_verification ON ci_job_artifact_states USING btree (verification_retry_at NULLS FIRST) WHERE (verification_state = 3);
CREATE INDEX index_job_artifact_states_needs_verification ON ci_job_artifact_states USING btree (verification_state) WHERE ((verification_state = 0) OR (verification_state = 3));
CREATE INDEX index_job_artifact_states_on_verification_state ON ci_job_artifact_states USING btree (verification_state);
CREATE INDEX index_job_artifact_states_pending_verification ON ci_job_artifact_states USING btree (verified_at NULLS FIRST) WHERE (verification_state = 0);
CREATE INDEX index_keys_on_expires_at_and_id ON keys USING btree (date(timezone('UTC'::text, expires_at)), id) WHERE (expiry_notification_delivered_at IS NULL); CREATE INDEX index_keys_on_expires_at_and_id ON keys USING btree (date(timezone('UTC'::text, expires_at)), id) WHERE (expiry_notification_delivered_at IS NULL);
CREATE UNIQUE INDEX index_keys_on_fingerprint ON keys USING btree (fingerprint); CREATE UNIQUE INDEX index_keys_on_fingerprint ON keys USING btree (fingerprint);
...@@ -30968,6 +31004,9 @@ ALTER TABLE ONLY application_settings ...@@ -30968,6 +31004,9 @@ ALTER TABLE ONLY application_settings
ALTER TABLE ONLY clusters_kubernetes_namespaces ALTER TABLE ONLY clusters_kubernetes_namespaces
ADD CONSTRAINT fk_rails_7e7688ecaf FOREIGN KEY (cluster_id) REFERENCES clusters(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_7e7688ecaf FOREIGN KEY (cluster_id) REFERENCES clusters(id) ON DELETE CASCADE;
ALTER TABLE ONLY ci_job_artifact_states
ADD CONSTRAINT fk_rails_80a9cba3b2 FOREIGN KEY (job_artifact_id) REFERENCES ci_job_artifacts(id) ON DELETE CASCADE;
ALTER TABLE ONLY approval_merge_request_rules_users ALTER TABLE ONLY approval_merge_request_rules_users
ADD CONSTRAINT fk_rails_80e6801803 FOREIGN KEY (approval_merge_request_rule_id) REFERENCES approval_merge_request_rules(id) ON DELETE CASCADE; ADD CONSTRAINT fk_rails_80e6801803 FOREIGN KEY (approval_merge_request_rule_id) REFERENCES approval_merge_request_rules(id) ON DELETE CASCADE;
...@@ -39,7 +39,7 @@ verification methods: ...@@ -39,7 +39,7 @@ verification methods:
| Blobs | User uploads _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ | | Blobs | User uploads _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
| Blobs | LFS objects _(file system)_ | Geo with API | SHA256 checksum | | Blobs | LFS objects _(file system)_ | Geo with API | SHA256 checksum |
| Blobs | LFS objects _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ | | Blobs | LFS objects _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
| Blobs | CI job artifacts _(file system)_ | Geo with API | _Not implemented_ | | Blobs | CI job artifacts _(file system)_ | Geo with API | SHA256 checksum |
| Blobs | CI job artifacts _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ | | Blobs | CI job artifacts _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
| Blobs | Archived CI build traces _(file system)_ | Geo with API | _Not implemented_ | | Blobs | Archived CI build traces _(file system)_ | Geo with API | _Not implemented_ |
| Blobs | Archived CI build traces _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ | | Blobs | Archived CI build traces _(object storage)_ | Geo with API/Managed (*2*) | _Not implemented_ |
......
...@@ -270,6 +270,16 @@ configuration option in `gitlab.yml`. These metrics are served from the ...@@ -270,6 +270,16 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `geo_pages_deployments_verification_total` | Gauge | 14.6 | Number of pages deployments verifications tried on secondary | `url` | | `geo_pages_deployments_verification_total` | Gauge | 14.6 | Number of pages deployments verifications tried on secondary | `url` |
| `geo_pages_deployments_verified` | Gauge | 14.6 | Number of pages deployments verified on secondary | `url` | | `geo_pages_deployments_verified` | Gauge | 14.6 | Number of pages deployments verified on secondary | `url` |
| `geo_pages_deployments_verification_failed` | Gauge | 14.6 | Number of pages deployments verifications failed on secondary | `url` | | `geo_pages_deployments_verification_failed` | Gauge | 14.6 | Number of pages deployments verifications failed on secondary | `url` |
| `geo_job_artifacts` | Gauge | 14.8 | Number of job artifacts on primary | `url` |
| `geo_job_artifacts_checksum_total` | Gauge | 14.8 | Number of job artifacts tried to checksum on primary | `url` |
| `geo_job_artifacts_checksummed` | Gauge | 14.8 | Number of job artifacts successfully checksummed on primary | `url` |
| `geo_job_artifacts_checksum_failed` | Gauge | 14.8 | Number of job artifacts failed to calculate the checksum on primary | `url` |
| `geo_job_artifacts_synced` | Gauge | 14.8 | Number of syncable job artifacts synced on secondary | `url` |
| `geo_job_artifacts_failed` | Gauge | 14.8 | Number of syncable job artifacts failed to sync on secondary | `url` |
| `geo_job_artifacts_registry` | Gauge | 14.8 | Number of job artifacts in the registry | `url` |
| `geo_job_artifacts_verification_total` | Gauge | 14.8 | Number of job artifacts verifications tried on secondary | `url` |
| `geo_job_artifacts_verified` | Gauge | 14.8 | Number of job artifacts verified on secondary | `url` |
| `geo_job_artifacts_verification_failed` | Gauge | 14.8 | Number of job artifacts verifications failed on secondary | `url` |
| `limited_capacity_worker_running_jobs` | Gauge | 13.5 | Number of running jobs | `worker` | | `limited_capacity_worker_running_jobs` | Gauge | 13.5 | Number of running jobs | `worker` |
| `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` | | `limited_capacity_worker_max_running_jobs` | Gauge | 13.5 | Maximum number of running jobs | `worker` |
| `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` | | `limited_capacity_worker_remaining_work_count` | Gauge | 13.5 | Number of jobs waiting to be enqueued | `worker` |
......
...@@ -467,6 +467,19 @@ Example response: ...@@ -467,6 +467,19 @@ Example response:
"uploads_verified_count": null, "uploads_verified_count": null,
"uploads_verification_failed_count": null, "uploads_verification_failed_count": null,
"uploads_verified_in_percentage": "0.00%", "uploads_verified_in_percentage": "0.00%",
"job_artifacts_count": 5,
"job_artifacts_checksum_total_count": 5,
"job_artifacts_checksummed_count": 5,
"job_artifacts_checksum_failed_count": 0,
"job_artifacts_synced_count": 5,
"job_artifacts_failed_count": 0,
"job_artifacts_registry_count": 5,
"job_artifacts_verification_total_count": 5,
"job_artifacts_verified_count": 5,
"job_artifacts_verification_failed_count": 0,
"job_artifacts_synced_in_percentage": "100.00%",
"job_artifacts_verified_in_percentage": "100.00%",
"job_artifacts_synced_missing_on_primary_count": 0
}, },
{ {
"geo_node_id": 2, "geo_node_id": 2,
...@@ -623,6 +636,19 @@ Example response: ...@@ -623,6 +636,19 @@ Example response:
"uploads_verified_count": null, "uploads_verified_count": null,
"uploads_verification_failed_count": null, "uploads_verification_failed_count": null,
"uploads_verified_in_percentage": "0.00%", "uploads_verified_in_percentage": "0.00%",
"job_artifacts_count": 5,
"job_artifacts_checksum_total_count": 5,
"job_artifacts_checksummed_count": 5,
"job_artifacts_checksum_failed_count": 0,
"job_artifacts_synced_count": 5,
"job_artifacts_failed_count": 0,
"job_artifacts_registry_count": 5,
"job_artifacts_verification_total_count": 5,
"job_artifacts_verified_count": 5,
"job_artifacts_verification_failed_count": 0,
"job_artifacts_synced_in_percentage": "100.00%",
"job_artifacts_verified_in_percentage": "100.00%",
"job_artifacts_synced_missing_on_primary_count": 0
} }
] ]
``` ```
...@@ -776,6 +802,19 @@ Example response: ...@@ -776,6 +802,19 @@ Example response:
"uploads_verified_count": null, "uploads_verified_count": null,
"uploads_verification_failed_count": null, "uploads_verification_failed_count": null,
"uploads_verified_in_percentage": "0.00%", "uploads_verified_in_percentage": "0.00%",
"job_artifacts_count": 5,
"job_artifacts_checksum_total_count": 5,
"job_artifacts_checksummed_count": 5,
"job_artifacts_checksum_failed_count": 0,
"job_artifacts_synced_count": 5,
"job_artifacts_failed_count": 0,
"job_artifacts_registry_count": 5,
"job_artifacts_verification_total_count": 5,
"job_artifacts_verified_count": 5,
"job_artifacts_verification_failed_count": 0,
"job_artifacts_synced_in_percentage": "100.00%",
"job_artifacts_verified_in_percentage": "100.00%",
"job_artifacts_synced_missing_on_primary_count": 0
} }
``` ```
......
...@@ -6803,6 +6803,29 @@ The edge type for [`JiraProject`](#jiraproject). ...@@ -6803,6 +6803,29 @@ The edge type for [`JiraProject`](#jiraproject).
| <a id="jiraprojectedgecursor"></a>`cursor` | [`String!`](#string) | A cursor for use in pagination. | | <a id="jiraprojectedgecursor"></a>`cursor` | [`String!`](#string) | A cursor for use in pagination. |
| <a id="jiraprojectedgenode"></a>`node` | [`JiraProject`](#jiraproject) | The item at the end of the edge. | | <a id="jiraprojectedgenode"></a>`node` | [`JiraProject`](#jiraproject) | The item at the end of the edge. |
#### `JobArtifactRegistryConnection`
The connection type for [`JobArtifactRegistry`](#jobartifactregistry).
##### Fields
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="jobartifactregistryconnectionedges"></a>`edges` | [`[JobArtifactRegistryEdge]`](#jobartifactregistryedge) | A list of edges. |
| <a id="jobartifactregistryconnectionnodes"></a>`nodes` | [`[JobArtifactRegistry]`](#jobartifactregistry) | A list of nodes. |
| <a id="jobartifactregistryconnectionpageinfo"></a>`pageInfo` | [`PageInfo!`](#pageinfo) | Information to aid in pagination. |
#### `JobArtifactRegistryEdge`
The edge type for [`JobArtifactRegistry`](#jobartifactregistry).
##### Fields
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="jobartifactregistryedgecursor"></a>`cursor` | [`String!`](#string) | A cursor for use in pagination. |
| <a id="jobartifactregistryedgenode"></a>`node` | [`JobArtifactRegistry`](#jobartifactregistry) | The item at the end of the edge. |
#### `JobNeedUnionConnection` #### `JobNeedUnionConnection`
The connection type for [`JobNeedUnion`](#jobneedunion). The connection type for [`JobNeedUnion`](#jobneedunion).
...@@ -10780,6 +10803,22 @@ four standard [pagination arguments](#connection-pagination-arguments): ...@@ -10780,6 +10803,22 @@ four standard [pagination arguments](#connection-pagination-arguments):
| ---- | ---- | ----------- | | ---- | ---- | ----------- |
| <a id="geonodegroupwikirepositoryregistriesids"></a>`ids` | [`[ID!]`](#id) | Filters registries by their ID. | | <a id="geonodegroupwikirepositoryregistriesids"></a>`ids` | [`[ID!]`](#id) | Filters registries by their ID. |
##### `GeoNode.jobArtifactRegistries`
Find Job Artifact registries on this Geo node Available only when feature flag `geo_job_artifact_replication` is enabled. This flag is disabled by default, because the feature is experimental and is subject to change without notice.
Returns [`JobArtifactRegistryConnection`](#jobartifactregistryconnection).
This field returns a [connection](#connections). It accepts the
four standard [pagination arguments](#connection-pagination-arguments):
`before: String`, `after: String`, `first: Int`, `last: Int`.
###### Arguments
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="geonodejobartifactregistriesids"></a>`ids` | [`[ID!]`](#id) | Filters registries by their ID. |
##### `GeoNode.lfsObjectRegistries` ##### `GeoNode.lfsObjectRegistries`
Find LFS object registries on this Geo node. Find LFS object registries on this Geo node.
...@@ -11970,6 +12009,23 @@ four standard [pagination arguments](#connection-pagination-arguments): ...@@ -11970,6 +12009,23 @@ four standard [pagination arguments](#connection-pagination-arguments):
| <a id="jirauserjiradisplayname"></a>`jiraDisplayName` | [`String!`](#string) | Display name of the Jira user. | | <a id="jirauserjiradisplayname"></a>`jiraDisplayName` | [`String!`](#string) | Display name of the Jira user. |
| <a id="jirauserjiraemail"></a>`jiraEmail` | [`String`](#string) | Email of the Jira user, returned only for users with public emails. | | <a id="jirauserjiraemail"></a>`jiraEmail` | [`String`](#string) | Email of the Jira user, returned only for users with public emails. |
### `JobArtifactRegistry`
Represents the Geo replication and verification state of a job_artifact.
#### Fields
| Name | Type | Description |
| ---- | ---- | ----------- |
| <a id="jobartifactregistryartifactid"></a>`artifactId` | [`ID!`](#id) | ID of the Job Artifact. |
| <a id="jobartifactregistrycreatedat"></a>`createdAt` | [`Time`](#time) | Timestamp when the JobArtifactRegistry was created. |
| <a id="jobartifactregistryid"></a>`id` | [`ID!`](#id) | ID of the JobArtifactRegistry. |
| <a id="jobartifactregistrylastsyncfailure"></a>`lastSyncFailure` | [`String`](#string) | Error message during sync of the JobArtifactRegistry. |
| <a id="jobartifactregistrylastsyncedat"></a>`lastSyncedAt` | [`Time`](#time) | Timestamp of the most recent successful sync of the JobArtifactRegistry. |
| <a id="jobartifactregistryretryat"></a>`retryAt` | [`Time`](#time) | Timestamp after which the JobArtifactRegistry should be resynced. |
| <a id="jobartifactregistryretrycount"></a>`retryCount` | [`Int`](#int) | Number of consecutive failed sync attempts of the JobArtifactRegistry. |
| <a id="jobartifactregistrystate"></a>`state` | [`RegistryState`](#registrystate) | Sync state of the JobArtifactRegistry. |
### `JobPermissions` ### `JobPermissions`
#### Fields #### Fields
# frozen_string_literal: true
module Geo
  # FileRegistryFinder subclass that names its registry class explicitly
  # through `registry_class`.
  class JobArtifactLegacyRegistryFinder < FileRegistryFinder
    # @return [Class] always Geo::JobArtifactRegistry
    def registry_class
      Geo::JobArtifactRegistry
    end
  end
end
...@@ -2,8 +2,6 @@ ...@@ -2,8 +2,6 @@
module Geo module Geo
class JobArtifactRegistryFinder < FileRegistryFinder class JobArtifactRegistryFinder < FileRegistryFinder
def registry_class include FrameworkRegistryFinder
Geo::JobArtifactRegistry
end
end end
end end
# frozen_string_literal: true
module Resolvers
  module Geo
    # GraphQL resolver for job artifact registries. Its behaviour is mixed in
    # from RegistriesResolver.
    class JobArtifactRegistriesResolver < BaseResolver
      # Declare the connection of the registry type this resolver serves
      # (previously declared as a GeoNodeType connection), so the resolver
      # agrees with the `job_artifact_registries` field that uses it.
      type ::Types::Geo::JobArtifactRegistryType.connection_type, null: true

      include RegistriesResolver
    end
  end
end
...@@ -58,6 +58,11 @@ module Types ...@@ -58,6 +58,11 @@ module Types
null: true, null: true,
resolver: ::Resolvers::Geo::UploadRegistriesResolver, resolver: ::Resolvers::Geo::UploadRegistriesResolver,
description: 'Find Upload registries on this Geo node' description: 'Find Upload registries on this Geo node'
field :job_artifact_registries, ::Types::Geo::JobArtifactRegistryType.connection_type,
null: true,
resolver: ::Resolvers::Geo::JobArtifactRegistriesResolver,
description: 'Find Job Artifact registries on this Geo node',
feature_flag: :geo_job_artifact_replication
end end
end end
end end
# frozen_string_literal: true
module Types
  module Geo
    # rubocop:disable Graphql/AuthorizeTypes because it is included
    # GraphQL object type exposed as `JobArtifactRegistry`. It mixes in
    # ::Types::Geo::RegistryType (presumably the source of the shared registry
    # fields; confirm there) and adds the `artifact_id` field.
    class JobArtifactRegistryType < BaseObject
      include ::Types::Geo::RegistryType

      graphql_name 'JobArtifactRegistry'
      description 'Represents the Geo replication and verification state of a job_artifact.'

      field :artifact_id,
            GraphQL::Types::ID,
            null: false,
            description: 'ID of the Job Artifact.'
    end
  end
end
...@@ -197,14 +197,6 @@ module EE ...@@ -197,14 +197,6 @@ module EE
name: 'wiki', name: 'wiki',
name_plural: 'wikis' name_plural: 'wikis'
}, },
{
data_type: 'blob',
data_type_title: _('File'),
title: _('Job artifact'),
title_plural: _('Job artifacts'),
name: 'job_artifact',
name_plural: 'job_artifacts'
},
{ {
data_type: 'blob', data_type: 'blob',
data_type_title: _('File'), data_type_title: _('File'),
...@@ -224,6 +216,17 @@ module EE ...@@ -224,6 +216,17 @@ module EE
} }
] ]
if ::Geo::JobArtifactReplicator.disabled?
replicable_types.insert(2, {
data_type: 'blob',
data_type_title: _('File'),
title: _('Job artifact'),
title_plural: _('Job artifacts'),
name: 'job_artifact',
name_plural: 'job_artifacts'
})
end
# Adds all the SSF Data Types automatically # Adds all the SSF Data Types automatically
enabled_replicator_classes.each do |replicator_class| enabled_replicator_classes.each do |replicator_class|
replicable_types.push( replicable_types.push(
......
...@@ -12,6 +12,13 @@ module EE ...@@ -12,6 +12,13 @@ module EE
SECURITY_REPORT_FILE_TYPES = %w[sast secret_detection dependency_scanning container_scanning cluster_image_scanning dast coverage_fuzzing api_fuzzing].freeze SECURITY_REPORT_FILE_TYPES = %w[sast secret_detection dependency_scanning container_scanning cluster_image_scanning dast coverage_fuzzing api_fuzzing].freeze
prepended do prepended do
include ::Geo::ReplicableModel
include ::Geo::VerifiableModel
with_replicator ::Geo::JobArtifactReplicator
has_one :job_artifact_state, autosave: false, inverse_of: :job_artifact, class_name: '::Geo::JobArtifactState'
# After destroy callbacks are often skipped because of FastDestroyAll. # After destroy callbacks are often skipped because of FastDestroyAll.
# All destroy callbacks should be implemented in `Ci::JobArtifacts::DestroyBatchService` # All destroy callbacks should be implemented in `Ci::JobArtifacts::DestroyBatchService`
# See https://gitlab.com/gitlab-org/gitlab/-/issues/297472 # See https://gitlab.com/gitlab-org/gitlab/-/issues/297472
...@@ -66,7 +73,26 @@ module EE ...@@ -66,7 +73,26 @@ module EE
with_file_types(API_FUZZING_REPORT_TYPES) with_file_types(API_FUZZING_REPORT_TYPES)
end end
scope :with_files_stored_locally, -> { where(file_store: ::ObjectStorage::Store::LOCAL) }
scope :with_files_stored_remotely, -> { where(file_store: ::ObjectStorage::Store::REMOTE) }
scope :with_verification_state, ->(state) { joins(:job_artifact_state).where(verification_arel_table[:verification_state].eq(verification_state_value(state))) }
scope :checksummed, -> { joins(:job_artifact_state).where.not(verification_arel_table[:verification_checksum].eq(nil)) }
scope :not_checksummed, -> { joins(:job_artifact_state).where(verification_arel_table[:verification_checksum].eq(nil)) }
scope :available_verifiables, -> { joins(:job_artifact_state) }
delegate :validate_schema?, to: :job delegate :validate_schema?, to: :job
delegate :verification_retry_at, :verification_retry_at=,
:verified_at, :verified_at=,
:verification_checksum, :verification_checksum=,
:verification_failure, :verification_failure=,
:verification_retry_count, :verification_retry_count=,
:verification_state=, :verification_state,
:verification_started_at=, :verification_started_at,
to: :job_artifact_state
after_save :save_verification_details
end end
class_methods do class_methods do
...@@ -79,6 +105,19 @@ module EE ...@@ -79,6 +105,19 @@ module EE
super super
end end
# Returns the class used for this model's verification state rows:
# ::Geo::JobArtifactState.
# The `override` macro marks this as replacing a definition from elsewhere
# (presumably one of the included Geo concerns; confirm).
override :verification_state_table_class
def verification_state_table_class
::Geo::JobArtifactState
end
end
# Returns the state object produced by the inherited reader, or the result of
# `build_job_artifact_state` when the inherited reader yields nil/false.
def job_artifact_state
  existing_state = super
  return existing_state if existing_state

  build_job_artifact_state
end
def verification_state_object
job_artifact_state
end end
def log_geo_deleted_event def log_geo_deleted_event
......
...@@ -2,14 +2,18 @@ ...@@ -2,14 +2,18 @@
class Geo::JobArtifactRegistry < Geo::BaseRegistry class Geo::JobArtifactRegistry < Geo::BaseRegistry
include Geo::Syncable include Geo::Syncable
include ::Geo::ReplicableRegistry
include ::Geo::VerifiableRegistry
MODEL_CLASS = ::Ci::JobArtifact MODEL_CLASS = ::Ci::JobArtifact
MODEL_FOREIGN_KEY = :artifact_id MODEL_FOREIGN_KEY = :artifact_id
belongs_to :job_artifact, class_name: 'Ci::JobArtifact', foreign_key: :artifact_id
# When false, RegistryConsistencyService will frequently check the end of the # When false, RegistryConsistencyService will frequently check the end of the
# table to quickly handle new replicables. # table to quickly handle new replicables.
def self.has_create_events? def self.has_create_events?
false ::Geo::JobArtifactReplicator.enabled?
end end
# TODO: remove once `success` column has a default value set # TODO: remove once `success` column has a default value set
...@@ -31,4 +35,42 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry ...@@ -31,4 +35,42 @@ class Geo::JobArtifactRegistry < Geo::BaseRegistry
def self.delete_worker_class def self.delete_worker_class
::Geo::FileRegistryRemovalWorker ::Geo::FileRegistryRemovalWorker
end end
# TODO: Remove this override when `geo_job_artifact_replication` is enabled
# by default.
# Always returns true, regardless of the replication feature flag.
override :registry_consistency_worker_enabled?
def self.registry_consistency_worker_enabled?
true
end
# Scope of failed registries.
#
# When ::Geo::JobArtifactReplicator is enabled this is `with_state(:failed)`;
# otherwise it selects rows with `success: false` and a non-nil `retry_count`.
def self.failed
  return with_state(:failed) if ::Geo::JobArtifactReplicator.enabled?

  where(success: false).where.not(retry_count: nil)
end
# Scope of registries with no recorded sync attempt.
#
# When ::Geo::JobArtifactReplicator is enabled this is `pending` rows whose
# `last_synced_at` is nil; otherwise rows with `success: false` and a nil
# `retry_count`.
def self.never_attempted_sync
  return where(success: false, retry_count: nil) unless ::Geo::JobArtifactReplicator.enabled?

  pending.where(last_synced_at: nil)
end
# Scope of registries whose `retry_at` is NULL or earlier than the current
# time.
#
# When ::Geo::JobArtifactReplicator is enabled the predicate is built with
# Arel nodes from `arel_table`; otherwise it is a SQL fragment with a bound
# time value.
def self.retry_due
  unless ::Geo::JobArtifactReplicator.enabled?
    return where('retry_at is NULL OR retry_at < ?', Time.current)
  end

  never_scheduled = arel_table[:retry_at].eq(nil)
  already_due = arel_table[:retry_at].lt(Time.current)

  where(never_scheduled.or(already_due))
end
# Scope of synced registries.
#
# When ::Geo::JobArtifactReplicator is enabled this is `with_state(:synced)`
# OR-ed with rows where `success` is true; otherwise only the `success: true`
# rows.
def self.synced
  return where(success: true) unless ::Geo::JobArtifactReplicator.enabled?

  with_state(:synced).or(where(success: true))
end
end end
# frozen_string_literal: true
module Geo
  # Record keyed by `job_artifact_id` that belongs to a Ci::JobArtifact
  # (the inverse of that model's `job_artifact_state` association).
  class JobArtifactState < Ci::ApplicationRecord
    include EachBatch

    self.primary_key = :job_artifact_id

    belongs_to :job_artifact,
               inverse_of: :job_artifact_state,
               class_name: 'Ci::JobArtifact'
  end
end
...@@ -40,8 +40,8 @@ class GeoNodeStatus < ApplicationRecord ...@@ -40,8 +40,8 @@ class GeoNodeStatus < ApplicationRecord
"#{replicable_class.replicable_name_plural}_checksummed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} checksummed on the primary", "#{replicable_class.replicable_name_plural}_checksummed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} checksummed on the primary",
"#{replicable_class.replicable_name_plural}_checksum_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to checksum on primary", "#{replicable_class.replicable_name_plural}_checksum_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to checksum on primary",
"#{replicable_class.replicable_name_plural}_synced_count".to_sym => "Number of #{replicable_class.replicable_title_plural} in the registry", "#{replicable_class.replicable_name_plural}_synced_count".to_sym => "Number of #{replicable_class.replicable_title_plural} in the registry",
"#{replicable_class.replicable_name_plural}_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} synced on secondary", "#{replicable_class.replicable_name_plural}_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed on secondary",
"#{replicable_class.replicable_name_plural}_registry_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to sync on secondary", "#{replicable_class.replicable_name_plural}_registry_count".to_sym => "Number of #{replicable_class.replicable_title_plural} synced to sync on secondary",
"#{replicable_class.replicable_name_plural}_verification_total_count".to_sym => "Number of #{replicable_class.replicable_title_plural} available to verify on secondary", "#{replicable_class.replicable_name_plural}_verification_total_count".to_sym => "Number of #{replicable_class.replicable_title_plural} available to verify on secondary",
"#{replicable_class.replicable_name_plural}_verified_count".to_sym => "Number of #{replicable_class.replicable_title_plural} verified on the secondary", "#{replicable_class.replicable_name_plural}_verified_count".to_sym => "Number of #{replicable_class.replicable_title_plural} verified on the secondary",
"#{replicable_class.replicable_name_plural}_verification_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to verify on secondary" "#{replicable_class.replicable_name_plural}_verification_failed_count".to_sym => "Number of #{replicable_class.replicable_title_plural} failed to verify on secondary"
...@@ -67,9 +67,6 @@ class GeoNodeStatus < ApplicationRecord ...@@ -67,9 +67,6 @@ class GeoNodeStatus < ApplicationRecord
wikis_synced_count wikis_synced_count
wikis_failed_count wikis_failed_count
job_artifacts_replication_enabled job_artifacts_replication_enabled
job_artifacts_count
job_artifacts_synced_count
job_artifacts_failed_count
repositories_verified_count repositories_verified_count
repositories_verification_failed_count repositories_verification_failed_count
repositories_verification_total_count repositories_verification_total_count
...@@ -127,10 +124,6 @@ class GeoNodeStatus < ApplicationRecord ...@@ -127,10 +124,6 @@ class GeoNodeStatus < ApplicationRecord
wikis_verification_failed_count: 'Number of wikis failed to verify on secondary', wikis_verification_failed_count: 'Number of wikis failed to verify on secondary',
wikis_checksum_mismatch_count: 'Number of wikis that checksum mismatch on secondary', wikis_checksum_mismatch_count: 'Number of wikis that checksum mismatch on secondary',
job_artifacts_replication_enabled: 'Boolean denoting if replication is enabled for Job Artifacts', job_artifacts_replication_enabled: 'Boolean denoting if replication is enabled for Job Artifacts',
job_artifacts_count: 'Total number of syncable job artifacts available on primary',
job_artifacts_synced_count: 'Number of syncable job artifacts synced on secondary',
job_artifacts_failed_count: 'Number of syncable job artifacts failed to sync on secondary',
job_artifacts_registry_count: 'Number of job artifacts in the registry',
job_artifacts_synced_missing_on_primary_count: 'Number of job artifacts marked as synced due to the file missing on the primary', job_artifacts_synced_missing_on_primary_count: 'Number of job artifacts marked as synced due to the file missing on the primary',
replication_slots_count: 'Total number of replication slots on the primary', replication_slots_count: 'Total number of replication slots on the primary',
replication_slots_used_count: 'Number of replication slots in use on the primary', replication_slots_used_count: 'Number of replication slots in use on the primary',
...@@ -377,7 +370,6 @@ class GeoNodeStatus < ApplicationRecord ...@@ -377,7 +370,6 @@ class GeoNodeStatus < ApplicationRecord
attr_in_percentage :wikis_synced, :wikis_synced_count, :wikis_count attr_in_percentage :wikis_synced, :wikis_synced_count, :wikis_count
attr_in_percentage :wikis_checksummed, :wikis_checksummed_count, :wikis_count attr_in_percentage :wikis_checksummed, :wikis_checksummed_count, :wikis_count
attr_in_percentage :wikis_verified, :wikis_verified_count, :wikis_count attr_in_percentage :wikis_verified, :wikis_verified_count, :wikis_count
attr_in_percentage :job_artifacts_synced, :job_artifacts_synced_count, :job_artifacts_count
attr_in_percentage :replication_slots_used, :replication_slots_used_count, :replication_slots_count attr_in_percentage :replication_slots_used, :replication_slots_used_count, :replication_slots_count
attr_in_percentage :container_repositories_synced, :container_repositories_synced_count, :container_repositories_count attr_in_percentage :container_repositories_synced, :container_repositories_synced_count, :container_repositories_count
attr_in_percentage :design_repositories_synced, :design_repositories_synced_count, :design_repositories_count attr_in_percentage :design_repositories_synced, :design_repositories_synced_count, :design_repositories_count
...@@ -468,6 +460,7 @@ class GeoNodeStatus < ApplicationRecord ...@@ -468,6 +460,7 @@ class GeoNodeStatus < ApplicationRecord
def load_job_artifacts_data def load_job_artifacts_data
return unless job_artifacts_replication_enabled return unless job_artifacts_replication_enabled
return if ::Geo::JobArtifactReplicator.enabled?
self.job_artifacts_count = job_artifacts_finder.registry_count self.job_artifacts_count = job_artifacts_finder.registry_count
self.job_artifacts_synced_count = job_artifacts_finder.synced_count self.job_artifacts_synced_count = job_artifacts_finder.synced_count
...@@ -576,7 +569,7 @@ class GeoNodeStatus < ApplicationRecord ...@@ -576,7 +569,7 @@ class GeoNodeStatus < ApplicationRecord
end end
def job_artifacts_finder def job_artifacts_finder
@job_artifacts_finder ||= Geo::JobArtifactRegistryFinder.new @job_artifacts_finder ||= Geo::JobArtifactLegacyRegistryFinder.new
end end
def container_registry_finder def container_registry_finder
......
# frozen_string_literal: true
module Geo
# Geo replicator for CI job artifacts (`::Ci::JobArtifact`). It plugs the
# model into the Geo self-service framework by mixing in the blob
# replication strategy on top of `Gitlab::Geo::Replicator`.
class JobArtifactReplicator < Gitlab::Geo::Replicator
include ::Geo::BlobReplicatorStrategy
extend ::Gitlab::Utils::Override
# The model class whose records this replicator handles.
def self.model
::Ci::JobArtifact
end
# Returns the `file` attachment of the replicated record.
# NOTE(review): presumably a CarrierWave uploader, going by the method name.
def carrierwave_uploader
model_record.file
end
# The feature flag follows the format `geo_#{replicable_name}_replication`,
# so here it would be `geo_job_artifact_replication`
#
# Returning `false` keeps replication through this replicator off by
# default.
def self.replication_enabled_by_default?
false
end
override :verification_feature_flag_enabled?
def self.verification_feature_flag_enabled?
# We are adding verification at the same time as replication, so we
# don't need to toggle verification separately from replication. When
# the replication feature flag is off, then verification is also off
# (see `VerifiableReplicator.verification_enabled?`)
true
end
end
end
...@@ -51,7 +51,7 @@ module Geo ...@@ -51,7 +51,7 @@ module Geo
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def file_registry def file_registry
strong_memoize(:file_registry) do strong_memoize(:file_registry) do
if job_artifact? if job_artifact? && ::Geo::JobArtifactReplicator.disabled?
::Geo::JobArtifactRegistry.find_by(artifact_id: object_db_id) ::Geo::JobArtifactRegistry.find_by(artifact_id: object_db_id)
elsif replicator elsif replicator
replicator.registry replicator.registry
......
# frozen_string_literal: true # frozen_string_literal: true
module Geo module Geo
#
# This Worker is deprecated and it only handles the Job Artifacts now
#
class FileDownloadDispatchWorker < Geo::Scheduler::Secondary::SchedulerWorker # rubocop:disable Scalability/IdempotentWorker class FileDownloadDispatchWorker < Geo::Scheduler::Secondary::SchedulerWorker # rubocop:disable Scalability/IdempotentWorker
# rubocop:disable Scalability/CronWorkerContext # rubocop:disable Scalability/CronWorkerContext
# This worker does not perform work scoped to a context # This worker does not perform work scoped to a context
include CronjobQueue include CronjobQueue
# rubocop:enable Scalability/CronWorkerContext # rubocop:enable Scalability/CronWorkerContext
# Entry point of the legacy dispatch worker.
#
# Falls through to the parent scheduler unless job artifact replication has
# been handed over to `::Geo::JobArtifactReplicator`; in that case it only
# logs that fact and returns nil without scheduling anything.
def perform
  return super unless ::Geo::JobArtifactReplicator.enabled?

  log_info('JobArtifact replication is handled by Geo self service framework')
  nil
end
private private
# Cannot utilise backoff because there are no events currently being # Cannot utilise backoff because there are no events currently being
...@@ -26,6 +38,8 @@ module Geo ...@@ -26,6 +38,8 @@ module Geo
end end
def schedule_job(object_type, object_db_id) def schedule_job(object_type, object_db_id)
return if ::Geo::JobArtifactReplicator.enabled?
job_id = FileDownloadWorker.with_status.perform_async(object_type.to_s, object_db_id) job_id = FileDownloadWorker.with_status.perform_async(object_type.to_s, object_db_id)
{ id: object_db_id, type: object_type, job_id: job_id } if job_id { id: object_db_id, type: object_type, job_id: job_id } if job_id
......
...@@ -8,7 +8,7 @@ module Geo ...@@ -8,7 +8,7 @@ module Geo
FILE_SERVICE_OBJECT_TYPE = :job_artifact FILE_SERVICE_OBJECT_TYPE = :job_artifact
def registry_finder def registry_finder
@registry_finder ||= Geo::JobArtifactRegistryFinder.new @registry_finder ||= Geo::JobArtifactLegacyRegistryFinder.new
end end
end end
end end
......
---
name: geo_job_artifact_replication
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/issues/327400
rollout_issue_url:
milestone: '14.8'
type: development
group: group::geo
default_enabled: false
# frozen_string_literal: true
# Prepares the `job_artifact_registry` table for the Geo self-service
# framework (SSF) by adding the sync-state and verification columns next to
# the legacy ones.
class PrepareJobArtifactRegistryForSsf < Gitlab::Database::Migration[1.0]
# Written as a single `change` so the migration can be reversed:
# `change_column_default` is given both `from:` and `to:`, and every other
# statement is an `add_column`.
def change
# `retry_count` previously had no default; new rows now start at 0.
change_column_default :job_artifact_registry, :retry_count, from: nil, to: 0
# Sync and verification timestamps (all nullable).
add_column :job_artifact_registry, :last_synced_at, :datetime_with_timezone
add_column :job_artifact_registry, :verified_at, :datetime_with_timezone
add_column :job_artifact_registry, :verification_started_at, :datetime_with_timezone
add_column :job_artifact_registry, :verification_retry_at, :datetime_with_timezone
# State machine columns: smallint (`limit: 2`), NOT NULL, defaulting to 0.
add_column :job_artifact_registry, :state, :integer, null: false, limit: 2, default: 0
add_column :job_artifact_registry, :verification_state, :integer, default: 0, null: false, limit: 2
add_column :job_artifact_registry, :verification_retry_count, :integer, default: 0, limit: 2, null: false
# Checksum bookkeeping for verification.
add_column :job_artifact_registry, :verification_checksum, :binary
add_column :job_artifact_registry, :verification_checksum_mismatched, :binary
add_column :job_artifact_registry, :checksum_mismatch, :boolean, default: false, null: false
# Failure messages are capped at 255 characters; the string type is a
# deliberate exception to the cop (see the linked issue).
add_column :job_artifact_registry, :verification_failure, :string, limit: 255 # rubocop:disable Migration/PreventStrings see https://gitlab.com/gitlab-org/gitlab/-/issues/323806
add_column :job_artifact_registry, :last_sync_failure, :string, limit: 255 # rubocop:disable Migration/PreventStrings see https://gitlab.com/gitlab-org/gitlab/-/issues/323806
end
end
# frozen_string_literal: true
# Adds the three partial verification indexes on `job_artifact_registry`.
# Each index only covers rows with `state = 2`, further narrowed by
# `verification_state`.
class PrepareJobArtifactRegistryForSsfIndecies < Gitlab::Database::Migration[1.0]
  TABLE_NAME = :job_artifact_registry
  FAILED_VERIFICATION_INDEX_NAME = :job_artifact_registry_failed_verification
  NEEDS_VERIFICATION_INDEX_NAME = :job_artifact_registry_needs_verification
  PENDING_VERIFICATION_INDEX_NAME = :job_artifact_registry_pending_verification

  # Concurrent index helpers cannot run inside a DDL transaction.
  disable_ddl_transaction!

  # Creates the failed / needs / pending verification indexes, in that order.
  def up
    add_concurrent_index(
      TABLE_NAME,
      :verification_retry_at,
      name: FAILED_VERIFICATION_INDEX_NAME,
      order: "NULLS FIRST",
      where: "((state = 2) AND (verification_state = 3))"
    )

    add_concurrent_index(
      TABLE_NAME,
      :verification_state,
      name: NEEDS_VERIFICATION_INDEX_NAME,
      where: "((state = 2) AND (verification_state = ANY (ARRAY[0, 3])))"
    )

    add_concurrent_index(
      TABLE_NAME,
      :verified_at,
      name: PENDING_VERIFICATION_INDEX_NAME,
      order: "NULLS FIRST",
      where: "((state = 2) AND (verification_state = 0))"
    )
  end

  # Drops the same three indexes, in the same order they were created.
  def down
    remove_concurrent_index(TABLE_NAME, :verification_retry_at, name: FAILED_VERIFICATION_INDEX_NAME)
    remove_concurrent_index(TABLE_NAME, :verification_state, name: NEEDS_VERIFICATION_INDEX_NAME)
    remove_concurrent_index(TABLE_NAME, :verified_at, name: PENDING_VERIFICATION_INDEX_NAME)
  end
end
# frozen_string_literal: true
# Schedules the `MigrateJobArtifactRegistryToSsf` background migration over
# the rows of `job_artifact_registry`, one job per id range of `BATCH_SIZE`
# rows, spaced `DELAY_INTERVAL` apart.
class MigrateJobArtifactRegistry < Gitlab::Database::Migration[1.0]
  MIGRATION = 'MigrateJobArtifactRegistryToSsf'
  DELAY_INTERVAL = 2.minutes
  BATCH_SIZE = 5_000

  disable_ddl_transaction!

  # Migration-local model so the migration does not depend on the
  # application's registry class.
  class JobArtifactRegistry < Geo::TrackingBase
    include EachBatch

    self.table_name = 'job_artifact_registry'
  end

  # Enqueues the background migration jobs.
  def up
    queue_background_migration_jobs_by_range_at_intervals(
      JobArtifactRegistry,
      MIGRATION,
      DELAY_INTERVAL,
      batch_size: BATCH_SIZE
    )
  end

  # Nothing to undo here: this migration only schedules jobs.
  def down
    # no-op
  end
end
418ae4a74ff25c09629196916400eeebe5795d1c82918d09aaaa040d228ecae8
\ No newline at end of file
0eaee99b45bf4f98933b1a04cf92207ccadef29d89388049efe44a19a0c34c4a
\ No newline at end of file
ec84e9ecbed7fdd59541ebe7c374abba3d4e657bb6cd9cab1920421e8b0e349a
\ No newline at end of file
...@@ -123,10 +123,22 @@ CREATE TABLE job_artifact_registry ( ...@@ -123,10 +123,22 @@ CREATE TABLE job_artifact_registry (
retry_at timestamp with time zone, retry_at timestamp with time zone,
bytes bigint, bytes bigint,
artifact_id integer, artifact_id integer,
retry_count integer, retry_count integer DEFAULT 0,
success boolean, success boolean,
sha256 character varying, sha256 character varying,
missing_on_primary boolean DEFAULT false NOT NULL missing_on_primary boolean DEFAULT false NOT NULL,
state smallint DEFAULT 0 NOT NULL,
last_synced_at timestamp with time zone,
last_sync_failure character varying(255),
verified_at timestamp with time zone,
verification_started_at timestamp with time zone,
verification_retry_at timestamp with time zone,
verification_state smallint DEFAULT 0 NOT NULL,
verification_retry_count smallint DEFAULT 0 NOT NULL,
verification_checksum bytea,
verification_checksum_mismatched bytea,
checksum_mismatch boolean DEFAULT false NOT NULL,
verification_failure character varying(255)
); );
CREATE SEQUENCE job_artifact_registry_id_seq CREATE SEQUENCE job_artifact_registry_id_seq
...@@ -622,6 +634,12 @@ CREATE INDEX lfs_object_registry_needs_verification ON lfs_object_registry USING ...@@ -622,6 +634,12 @@ CREATE INDEX lfs_object_registry_needs_verification ON lfs_object_registry USING
CREATE INDEX lfs_object_registry_pending_verification ON lfs_object_registry USING btree (verified_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 0)); CREATE INDEX lfs_object_registry_pending_verification ON lfs_object_registry USING btree (verified_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 0));
CREATE INDEX job_artifact_registry_failed_verification ON job_artifact_registry USING btree (verification_retry_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 3));
CREATE INDEX job_artifact_registry_needs_verification ON job_artifact_registry USING btree (verification_state) WHERE ((state = 2) AND (verification_state = ANY (ARRAY[0, 3])));
CREATE INDEX job_artifact_registry_pending_verification ON job_artifact_registry USING btree (verified_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 0));
CREATE INDEX merge_request_diff_registry_failed_verification ON merge_request_diff_registry USING btree (verification_retry_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 3)); CREATE INDEX merge_request_diff_registry_failed_verification ON merge_request_diff_registry USING btree (verification_retry_at NULLS FIRST) WHERE ((state = 2) AND (verification_state = 3));
CREATE INDEX merge_request_diff_registry_needs_verification ON merge_request_diff_registry USING btree (verification_state) WHERE ((state = 2) AND (verification_state = ANY (ARRAY[0, 3]))); CREATE INDEX merge_request_diff_registry_needs_verification ON merge_request_diff_registry USING btree (verification_state) WHERE ((state = 2) AND (verification_state = ANY (ARRAY[0, 3])));
......
...@@ -35,7 +35,7 @@ module EE ...@@ -35,7 +35,7 @@ module EE
expose :db_replication_lag_seconds expose :db_replication_lag_seconds
expose :job_artifacts_replication_enabled expose :job_artifacts_replication_enabled, if: -> (*) { ::Geo::JobArtifactReplicator.disabled? }
expose :container_repositories_replication_enabled expose :container_repositories_replication_enabled
expose :design_repositories_replication_enabled expose :design_repositories_replication_enabled
expose :repositories_replication_enabled expose :repositories_replication_enabled
......
# frozen_string_literal: true
module EE
  module Gitlab
    module BackgroundMigration
      # Carries the legacy `success` flag of `job_artifact_registry` rows
      # over to the `state` column.
      module MigrateJobArtifactRegistryToSsf
        # Migration-local model bound to the registry table.
        class JobArtifactRegistry < Geo::BaseRegistry
          self.table_name = 'job_artifact_registry'
        end

        # Sets `state` to 2 for every row whose id lies in
        # `start_id..end_id` (inclusive) and whose legacy `success` flag is
        # true. Other rows are left as they are.
        #
        # NOTE(review): 2 is presumably the `synced` state value - confirm
        # against the registry's state definition.
        def perform(start_id, end_id)
          id_range = start_id..end_id

          JobArtifactRegistry
            .where(id: id_range, success: true)
            .update_all(state: 2)
        end
      end
    end
  end
end
...@@ -17,7 +17,7 @@ module EE ...@@ -17,7 +17,7 @@ module EE
end end
def delete_on_geo_secondaries(key) def delete_on_geo_secondaries(key)
Geo::CacheInvalidationEventStore.new(key).create! ::Geo::CacheInvalidationEventStore.new(key).create!
end end
end end
end end
......
...@@ -28,7 +28,8 @@ module Gitlab ...@@ -28,7 +28,8 @@ module Gitlab
::Geo::GroupWikiRepositoryReplicator, ::Geo::GroupWikiRepositoryReplicator,
::Geo::PipelineArtifactReplicator, ::Geo::PipelineArtifactReplicator,
::Geo::PagesDeploymentReplicator, ::Geo::PagesDeploymentReplicator,
::Geo::UploadReplicator ::Geo::UploadReplicator,
::Geo::JobArtifactReplicator
].freeze ].freeze
def self.current_node def self.current_node
......
...@@ -250,6 +250,8 @@ module Gitlab ...@@ -250,6 +250,8 @@ module Gitlab
end end
def print_ci_job_artifacts_status def print_ci_job_artifacts_status
return if ::Geo::JobArtifactReplicator.enabled?
print 'CI job artifacts: '.rjust(GEO_STATUS_COLUMN_WIDTH) print 'CI job artifacts: '.rjust(GEO_STATUS_COLUMN_WIDTH)
show_failed_value(current_node_status.job_artifacts_failed_count) show_failed_value(current_node_status.job_artifacts_failed_count)
print "#{current_node_status.job_artifacts_synced_count}/#{current_node_status.job_artifacts_count} " print "#{current_node_status.job_artifacts_synced_count}/#{current_node_status.job_artifacts_count} "
......
...@@ -12,6 +12,18 @@ FactoryBot.define do ...@@ -12,6 +12,18 @@ FactoryBot.define do
end end
end end
trait :verification_succeeded do
common_security_report # with file
verification_checksum { 'abc' }
verification_state { Ci::JobArtifact.verification_state_value(:verification_succeeded) }
end
trait :verification_failed do
common_security_report # with file
verification_failure { 'Could not calculate the checksum' }
verification_state { Ci::JobArtifact.verification_state_value(:verification_failed) }
end
trait :sast_with_vulnerability_flags do trait :sast_with_vulnerability_flags do
file_type { :sast } file_type { :sast }
file_format { :raw } file_format { :raw }
......
...@@ -2,6 +2,41 @@ ...@@ -2,6 +2,41 @@
FactoryBot.define do FactoryBot.define do
factory :geo_job_artifact_registry, class: 'Geo::JobArtifactRegistry' do factory :geo_job_artifact_registry, class: 'Geo::JobArtifactRegistry' do
association :job_artifact, factory: [:ci_job_artifact, :with_file]
state { Geo::JobArtifactRegistry.state_value(:pending) }
trait :synced do
state { Geo::JobArtifactRegistry.state_value(:synced) }
last_synced_at { 5.days.ago }
end
trait :failed do
state { Geo::JobArtifactRegistry.state_value(:failed) }
last_synced_at { 1.day.ago }
retry_count { 2 }
last_sync_failure { 'Random error' }
end
trait :started do
state { Geo::JobArtifactRegistry.state_value(:started) }
last_synced_at { 1.day.ago }
retry_count { 0 }
end
trait :verification_succeeded do
verification_checksum { 'e079a831cab27bcda7d81cd9b48296d0c3dd92ef' }
verification_state { Geo::JobArtifactRegistry.verification_state_value(:verification_succeeded) }
verified_at { 5.days.ago }
end
trait :orphan do
after(:create) do |registry, _|
Ci::JobArtifact.find(registry.artifact_id).delete
end
end
end
factory :geo_job_artifact_registry_legacy, class: 'Geo::JobArtifactRegistry' do
sequence(:artifact_id) sequence(:artifact_id)
success { true } success { true }
......
# frozen_string_literal: true
FactoryBot.define do
  # Verification state record for a CI job artifact.
  factory :geo_job_artifact_state, class: 'Geo::JobArtifactState' do
    # Name the factory explicitly: a bare `job_artifact` would look up a
    # factory called `:job_artifact`, whereas job artifacts are built with
    # `:ci_job_artifact` everywhere else.
    association :job_artifact, factory: :ci_job_artifact

    # State row with a checksum recorded.
    trait :checksummed do
      verification_checksum { 'abc' }
    end

    # State row with a checksum failure message recorded.
    trait :checksum_failure do
      verification_failure { 'Could not calculate the checksum' }
    end
  end
end
...@@ -7,9 +7,6 @@ FactoryBot.define do ...@@ -7,9 +7,6 @@ FactoryBot.define do
trait :healthy do trait :healthy do
status_message { nil } status_message { nil }
job_artifacts_count { 580 }
job_artifacts_failed_count { 3 }
job_artifacts_synced_count { 577 }
job_artifacts_synced_missing_on_primary_count { 91 } job_artifacts_synced_missing_on_primary_count { 91 }
container_repositories_count { 400 } container_repositories_count { 400 }
container_repositories_registry_count { 203 } container_repositories_registry_count { 203 }
...@@ -65,7 +62,6 @@ FactoryBot.define do ...@@ -65,7 +62,6 @@ FactoryBot.define do
end end
trait :replicated_and_verified do trait :replicated_and_verified do
job_artifacts_failed_count { 0 }
container_repositories_failed_count { 0 } container_repositories_failed_count { 0 }
design_repositories_failed_count { 0 } design_repositories_failed_count { 0 }
repositories_failed_count { 0 } repositories_failed_count { 0 }
...@@ -85,14 +81,12 @@ FactoryBot.define do ...@@ -85,14 +81,12 @@ FactoryBot.define do
wikis_checksum_total_count { 10 } wikis_checksum_total_count { 10 }
wikis_verified_count { 10 } wikis_verified_count { 10 }
wikis_verification_total_count { 10 } wikis_verification_total_count { 10 }
job_artifacts_synced_count { 10 }
replication_slots_used_count { 10 } replication_slots_used_count { 10 }
container_repositories_synced_count { 10 } container_repositories_synced_count { 10 }
design_repositories_synced_count { 10 } design_repositories_synced_count { 10 }
repositories_count { 10 } repositories_count { 10 }
wikis_count { 10 } wikis_count { 10 }
job_artifacts_count { 10 }
replication_slots_count { 10 } replication_slots_count { 10 }
container_repositories_count { 10 } container_repositories_count { 10 }
design_repositories_count { 10 } design_repositories_count { 10 }
......
...@@ -2,10 +2,12 @@ ...@@ -2,10 +2,12 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Geo::JobArtifactRegistryFinder, :geo do RSpec.describe Geo::JobArtifactLegacyRegistryFinder, :geo do
it_behaves_like 'a file registry finder' do it_behaves_like 'a file registry finder' do
before do before do
stub_artifacts_object_storage stub_artifacts_object_storage
stub_feature_flags(geo_job_artifact_replication: false)
end end
let_it_be(:project) { create(:project) } let_it_be(:project) { create(:project) }
...@@ -19,13 +21,13 @@ RSpec.describe Geo::JobArtifactRegistryFinder, :geo do ...@@ -19,13 +21,13 @@ RSpec.describe Geo::JobArtifactRegistryFinder, :geo do
let!(:replicable_7) { create(:ci_job_artifact, :remote_store, project: project) } let!(:replicable_7) { create(:ci_job_artifact, :remote_store, project: project) }
let!(:replicable_8) { create(:ci_job_artifact, :remote_store, project: project) } let!(:replicable_8) { create(:ci_job_artifact, :remote_store, project: project) }
let_it_be(:registry_1) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_1.id) } let_it_be(:registry_1) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_1.id) }
let_it_be(:registry_2) { create(:geo_job_artifact_registry, artifact_id: replicable_2.id, missing_on_primary: true) } let_it_be(:registry_2) { create(:geo_job_artifact_registry_legacy, artifact_id: replicable_2.id, missing_on_primary: true) }
let_it_be(:registry_3) { create(:geo_job_artifact_registry, :never_synced, artifact_id: replicable_3.id) } let_it_be(:registry_3) { create(:geo_job_artifact_registry_legacy, :never_synced, artifact_id: replicable_3.id) }
let_it_be(:registry_4) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_4.id) } let_it_be(:registry_4) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_4.id) }
let_it_be(:registry_5) { create(:geo_job_artifact_registry, artifact_id: replicable_5.id, missing_on_primary: true, retry_at: 1.day.ago) } let_it_be(:registry_5) { create(:geo_job_artifact_registry_legacy, artifact_id: replicable_5.id, missing_on_primary: true, retry_at: 1.day.ago) }
let!(:registry_6) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_6.id) } let!(:registry_6) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_6.id) }
let!(:registry_7) { create(:geo_job_artifact_registry, :failed, artifact_id: replicable_7.id, missing_on_primary: true) } let!(:registry_7) { create(:geo_job_artifact_registry_legacy, :failed, artifact_id: replicable_7.id, missing_on_primary: true) }
let!(:registry_8) { create(:geo_job_artifact_registry, :never_synced, artifact_id: replicable_8.id) } let!(:registry_8) { create(:geo_job_artifact_registry_legacy, :never_synced, artifact_id: replicable_8.id) }
end end
end end
...@@ -22,6 +22,14 @@ ...@@ -22,6 +22,14 @@
"job_artifacts_failed_count", "job_artifacts_failed_count",
"job_artifacts_synced_count", "job_artifacts_synced_count",
"job_artifacts_synced_missing_on_primary_count", "job_artifacts_synced_missing_on_primary_count",
"job_artifacts_checksum_total_count",
"job_artifacts_checksummed_count",
"job_artifacts_checksum_failed_count",
"job_artifacts_registry_count",
"job_artifacts_verification_total_count",
"job_artifacts_verified_count",
"job_artifacts_verification_failed_count",
"job_artifacts_verified_in_percentage",
"db_replication_lag_seconds", "db_replication_lag_seconds",
"container_repositories_replication_enabled", "container_repositories_replication_enabled",
"container_repositories_count", "container_repositories_count",
...@@ -205,6 +213,14 @@ ...@@ -205,6 +213,14 @@
"job_artifacts_synced_count": { "type": ["integer", "null"] }, "job_artifacts_synced_count": { "type": ["integer", "null"] },
"job_artifacts_synced_missing_on_primary_count": { "type": ["integer", "null"] }, "job_artifacts_synced_missing_on_primary_count": { "type": ["integer", "null"] },
"job_artifacts_synced_in_percentage": { "type": "string" }, "job_artifacts_synced_in_percentage": { "type": "string" },
"job_artifacts_checksum_total_count": { "type": ["integer", "null"] },
"job_artifacts_checksummed_count": { "type": ["integer", "null"] },
"job_artifacts_checksum_failed_count": { "type": ["integer", "null"] },
"job_artifacts_registry_count": { "type": ["integer", "null"] },
"job_artifacts_verification_total_count": { "type": ["integer", "null"] },
"job_artifacts_verified_count": { "type": ["integer", "null"] },
"job_artifacts_verification_failed_count": { "type": ["integer", "null"] },
"job_artifacts_verified_in_percentage": { "type": "string" },
"container_repositories_replication_enabled": { "type": ["boolean", "null"] }, "container_repositories_replication_enabled": { "type": ["boolean", "null"] },
"container_repositories_count": { "type": "integer" }, "container_repositories_count": { "type": "integer" },
"container_repositories_failed_count": { "type": ["integer", "null"] }, "container_repositories_failed_count": { "type": ["integer", "null"] },
......
# frozen_string_literal: true
require 'spec_helper'
# Runs the shared Geo registries resolver examples against the
# `:geo_job_artifact_registry` factory.
RSpec.describe Resolvers::Geo::JobArtifactRegistriesResolver do
it_behaves_like 'a Geo registries resolver', :geo_job_artifact_registry
end
...@@ -15,7 +15,7 @@ RSpec.describe GitlabSchema.types['GeoNode'] do ...@@ -15,7 +15,7 @@ RSpec.describe GitlabSchema.types['GeoNode'] do
package_file_registries snippet_repository_registries package_file_registries snippet_repository_registries
terraform_state_version_registries group_wiki_repository_registries terraform_state_version_registries group_wiki_repository_registries
pages_deployment_registries lfs_object_registries pipeline_artifact_registries pages_deployment_registries lfs_object_registries pipeline_artifact_registries
upload_registries upload_registries job_artifact_registries
] ]
expect(described_class).to have_graphql_fields(*expected_fields) expect(described_class).to have_graphql_fields(*expected_fields)
......
# frozen_string_literal: true
require 'spec_helper'
# Checks the `JobArtifactRegistry` GraphQL type: the shared registry type
# examples plus the field specific to this registry.
RSpec.describe GitlabSchema.types['JobArtifactRegistry'] do
it_behaves_like 'a Geo registry type'
it 'has the expected fields (other than those included in RegistryType)' do
expected_fields = %i[artifact_id]
# `.at_least` allows further fields beyond the ones listed here.
expect(described_class).to have_graphql_fields(*expected_fields).at_least
end
end
...@@ -57,7 +57,7 @@ RSpec.describe EE::API::Entities::GeoNodeStatus do ...@@ -57,7 +57,7 @@ RSpec.describe EE::API::Entities::GeoNodeStatus do
describe '#job_artifacts_synced_in_percentage' do describe '#job_artifacts_synced_in_percentage' do
it 'formats as percentage' do it 'formats as percentage' do
geo_node_status.assign_attributes(job_artifacts_count: 256, geo_node_status.assign_attributes(job_artifacts_registry_count: 256,
job_artifacts_failed_count: 12, job_artifacts_failed_count: 12,
job_artifacts_synced_count: 123) job_artifacts_synced_count: 123)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::BackgroundMigration::MigrateJobArtifactRegistryToSsf, :geo do
  let(:registry) { table(:job_artifact_registry) }

  # Successfully synced legacy rows inside the migrated id range.
  let!(:registry1) { registry.create!(artifact_id: 1, success: true, state: 0) }
  let!(:registry2) { registry.create!(artifact_id: 2, success: true, state: 0) }
  let!(:registry3) { registry.create!(artifact_id: 3, success: true, state: 0) }

  # Inside the range but not successfully synced: exercises the
  # `success: true` filter.
  let!(:failed_registry) { registry.create!(artifact_id: 4, success: false, state: 0) }

  # Successfully synced but created after the range end: exercises the id
  # bounds.
  let!(:out_of_range_registry) { registry.create!(artifact_id: 5, success: true, state: 0) }

  subject(:perform_migration) do
    described_class.new.perform(registry1.id, failed_registry.id)
  end

  it 'updates registries' do
    perform_migration

    expect(registry.where(state: 2).pluck(:id))
      .to contain_exactly(registry1.id, registry2.id, registry3.id)
  end

  it 'does not touch unsuccessful or out-of-range registries' do
    perform_migration

    expect(failed_registry.reload.state).to eq(0)
    expect(out_of_range_registry.reload.state).to eq(0)
  end
end
...@@ -14,8 +14,8 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFilesBatch do ...@@ -14,8 +14,8 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFilesBatch do
let(:max_artifact_id) { Ci::JobArtifact.maximum(:id).to_i } let(:max_artifact_id) { Ci::JobArtifact.maximum(:id).to_i }
let(:orphan_id_1) { max_artifact_id + 1 } let(:orphan_id_1) { max_artifact_id + 1 }
let(:orphan_id_2) { max_artifact_id + 2 } let(:orphan_id_2) { max_artifact_id + 2 }
let!(:orphan_registry_1) { create(:geo_job_artifact_registry, artifact_id: orphan_id_1) } let!(:orphan_registry_1) { create(:geo_job_artifact_registry_legacy, artifact_id: orphan_id_1) }
let!(:orphan_registry_2) { create(:geo_job_artifact_registry, artifact_id: orphan_id_2) } let!(:orphan_registry_2) { create(:geo_job_artifact_registry_legacy, artifact_id: orphan_id_2) }
before do before do
stub_secondary_node stub_secondary_node
...@@ -35,8 +35,8 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFilesBatch do ...@@ -35,8 +35,8 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFilesBatch do
context 'with dry run' do context 'with dry run' do
it 'does not remove registries' do it 'does not remove registries' do
create(:geo_job_artifact_registry, :with_artifact, artifact_type: :archive) create(:geo_job_artifact_registry_legacy, :with_artifact, artifact_type: :archive)
create(:geo_job_artifact_registry, :orphan, artifact_type: :archive) create(:geo_job_artifact_registry_legacy, :orphan, artifact_type: :archive)
expect { batch.clean! }.not_to change { Geo::JobArtifactRegistry.count } expect { batch.clean! }.not_to change { Geo::JobArtifactRegistry.count }
expect(batch.geo_registries_count).to eq(2) expect(batch.geo_registries_count).to eq(2)
......
...@@ -34,7 +34,7 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFiles do ...@@ -34,7 +34,7 @@ RSpec.describe Gitlab::Cleanup::OrphanJobArtifactFiles do
it 'accumulates the number of cleaned Geo registries' do it 'accumulates the number of cleaned Geo registries' do
stub_const("#{described_class.name}::BATCH_SIZE", 2) stub_const("#{described_class.name}::BATCH_SIZE", 2)
create_list(:geo_job_artifact_registry, 3, :orphan, artifact_type: :archive) create_list(:geo_job_artifact_registry, 3, :orphan)
create(:ci_job_artifact, :archive).delete create(:ci_job_artifact, :archive).delete
cleanup.run! cleanup.run!
......
...@@ -14,6 +14,8 @@ RSpec.describe Gitlab::Geo::GeoNodeStatusCheck do ...@@ -14,6 +14,8 @@ RSpec.describe Gitlab::Geo::GeoNodeStatusCheck do
describe '#replication_verification_complete?' do describe '#replication_verification_complete?' do
before do before do
allow(Gitlab.config.geo.registry_replication).to receive(:enabled).and_return(true) allow(Gitlab.config.geo.registry_replication).to receive(:enabled).and_return(true)
stub_feature_flags(geo_job_artifact_replication: false)
end end
context 'with legacy replication' do context 'with legacy replication' do
......
...@@ -11,6 +11,10 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :clean_g ...@@ -11,6 +11,10 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :clean_g
subject { described_class.new(job_artifact_deleted_event, Time.now, logger) } subject { described_class.new(job_artifact_deleted_event, Time.now, logger) }
before do
stub_feature_flags(geo_job_artifact_replication: false)
end
around do |example| around do |example|
Sidekiq::Testing.inline! { example.run } Sidekiq::Testing.inline! { example.run }
end end
...@@ -24,7 +28,7 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :clean_g ...@@ -24,7 +28,7 @@ RSpec.describe Gitlab::Geo::LogCursor::Events::JobArtifactDeletedEvent, :clean_g
describe '#process' do describe '#process' do
context 'with a tracking database entry' do context 'with a tracking database entry' do
before do before do
create(:geo_job_artifact_registry, artifact_id: job_artifact.id) create(:geo_job_artifact_registry_legacy, artifact_id: job_artifact.id)
end end
context 'with a file' do context 'with a file' do
......
# frozen_string_literal: true
require 'spec_helper'
require_migration!
# Verifies that the post-deployment migration splits the registry rows into
# id ranges of `BATCH_SIZE` and schedules one background job per range.
RSpec.describe MigrateJobArtifactRegistry do
  let(:registry) { table(:job_artifact_registry) }

  let!(:registry1) { registry.create!(artifact_id: 1, success: true, state: 0) }
  let!(:registry2) { registry.create!(artifact_id: 2, success: true, state: 0) }
  let!(:registry3) { registry.create!(artifact_id: 3, success: true, state: 0) }
  let!(:registry4) { registry.create!(artifact_id: 4, success: true, state: 0) }

  before do
    # Shrink the batch size so four rows produce two batches.
    stub_const("#{described_class.name}::BATCH_SIZE", 2)
  end

  it 'correctly schedules background migrations' do
    Sidekiq::Testing.fake! do
      freeze_time do
        migrate!

        expect(BackgroundMigrationWorker.jobs.size).to eq(2)
        expect(described_class::MIGRATION).to be_scheduled_migration_with_multiple_args(registry1.id, registry2.id)
        expect(described_class::MIGRATION).to be_scheduled_migration_with_multiple_args(registry3.id, registry4.id)
      end
    end
  end
end
...@@ -6,6 +6,15 @@ RSpec.describe Ci::JobArtifact do ...@@ -6,6 +6,15 @@ RSpec.describe Ci::JobArtifact do
using RSpec::Parameterized::TableSyntax using RSpec::Parameterized::TableSyntax
include EE::GeoHelpers include EE::GeoHelpers
include_examples 'a replicable model with a separate table for verification state' do
  before { stub_artifacts_object_storage }

  # Record that must be included in `available_verifiables`
  # (add extra params if needed to keep it included).
  let(:verifiable_model_record) { build(:ci_job_artifact) }

  # Record that must NOT be included in `available_verifiables`
  # (add extra params if needed to keep it excluded).
  let(:unverifiable_model_record) { build(:ci_job_artifact, :remote_store) }
end
it { is_expected.to delegate_method(:validate_schema?).to(:job) } it { is_expected.to delegate_method(:validate_schema?).to(:job) }
describe '#destroy' do describe '#destroy' do
...@@ -17,6 +26,8 @@ RSpec.describe Ci::JobArtifact do ...@@ -17,6 +26,8 @@ RSpec.describe Ci::JobArtifact do
end end
it 'creates a JobArtifactDeletedEvent' do it 'creates a JobArtifactDeletedEvent' do
stub_feature_flags(geo_job_artifact_replication: false)
job_artifact = create(:ee_ci_job_artifact, :archive) job_artifact = create(:ee_ci_job_artifact, :archive)
expect { job_artifact.destroy! }.to change { Geo::JobArtifactDeletedEvent.count }.by(1) expect { job_artifact.destroy! }.to change { Geo::JobArtifactDeletedEvent.count }.by(1)
......
...@@ -164,9 +164,9 @@ RSpec.describe GeoNodeStatus, :geo do ...@@ -164,9 +164,9 @@ RSpec.describe GeoNodeStatus, :geo do
it 'counts synced job artifacts' do it 'counts synced job artifacts' do
# These should be ignored # These should be ignored
create(:geo_upload_registry) create(:geo_upload_registry)
create(:geo_job_artifact_registry, :with_artifact, success: false) create(:geo_job_artifact_registry_legacy, :with_artifact, success: false)
create(:geo_job_artifact_registry, :with_artifact, success: true) create(:geo_job_artifact_registry_legacy, :with_artifact, success: true)
expect(subject.job_artifacts_synced_count).to eq(1) expect(subject.job_artifacts_synced_count).to eq(1)
end end
...@@ -174,11 +174,13 @@ RSpec.describe GeoNodeStatus, :geo do ...@@ -174,11 +174,13 @@ RSpec.describe GeoNodeStatus, :geo do
describe '#job_artifacts_synced_missing_on_primary_count' do describe '#job_artifacts_synced_missing_on_primary_count' do
it 'counts job artifacts marked as synced due to file missing on the primary' do it 'counts job artifacts marked as synced due to file missing on the primary' do
stub_feature_flags(geo_job_artifact_replication: false)
# These should be ignored # These should be ignored
create(:geo_upload_registry, missing_on_primary: true) create(:geo_upload_registry, missing_on_primary: true)
create(:geo_job_artifact_registry, :with_artifact, success: true) create(:geo_job_artifact_registry_legacy, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, success: true, missing_on_primary: true) create(:geo_job_artifact_registry_legacy, :with_artifact, success: true, missing_on_primary: true)
expect(subject.job_artifacts_synced_missing_on_primary_count).to eq(1) expect(subject.job_artifacts_synced_missing_on_primary_count).to eq(1)
end end
...@@ -186,10 +188,12 @@ RSpec.describe GeoNodeStatus, :geo do ...@@ -186,10 +188,12 @@ RSpec.describe GeoNodeStatus, :geo do
describe '#job_artifacts_failed_count' do describe '#job_artifacts_failed_count' do
it 'counts failed job artifacts' do it 'counts failed job artifacts' do
stub_feature_flags(geo_job_artifact_replication: false)
# These should be ignored # These should be ignored
create(:geo_upload_registry, :failed) create(:geo_upload_registry, :failed)
create(:geo_job_artifact_registry, :with_artifact, success: true) create(:geo_job_artifact_registry_legacy, :with_artifact, success: true)
create(:geo_job_artifact_registry, :with_artifact, :failed) create(:geo_job_artifact_registry_legacy, :with_artifact, :failed)
expect(subject.job_artifacts_failed_count).to eq(1) expect(subject.job_artifacts_failed_count).to eq(1)
end end
...@@ -202,7 +206,7 @@ RSpec.describe GeoNodeStatus, :geo do ...@@ -202,7 +206,7 @@ RSpec.describe GeoNodeStatus, :geo do
build = create(:ci_build, project: project) build = create(:ci_build, project: project)
job_artifact = create(:ci_job_artifact, job: build) job_artifact = create(:ci_job_artifact, job: build)
create(:geo_job_artifact_registry, success: index.even?, artifact_id: job_artifact.id) create(:geo_job_artifact_registry_legacy, success: index.even?, artifact_id: job_artifact.id)
end end
end end
...@@ -1080,6 +1084,7 @@ RSpec.describe GeoNodeStatus, :geo do ...@@ -1080,6 +1084,7 @@ RSpec.describe GeoNodeStatus, :geo do
Geo::GroupWikiRepositoryReplicator | :group_wiki_repository | :geo_group_wiki_repository_registry Geo::GroupWikiRepositoryReplicator | :group_wiki_repository | :geo_group_wiki_repository_registry
Geo::PagesDeploymentReplicator | :pages_deployment | :geo_pages_deployment_registry Geo::PagesDeploymentReplicator | :pages_deployment | :geo_pages_deployment_registry
Geo::UploadReplicator | :upload | :geo_upload_registry Geo::UploadReplicator | :upload | :geo_upload_registry
Geo::JobArtifactReplicator | :ci_job_artifact | :geo_job_artifact_registry
end end
with_them do with_them do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::JobArtifactReplicator do
  # Record handed to the shared replicator examples below: a job artifact
  # built with the factory's `:with_file` trait.
  let(:model_record) do
    create(:ci_job_artifact, :with_file)
  end

  include_examples 'a blob replicator'
  include_examples 'a verifiable replicator'
end
...@@ -58,4 +58,11 @@ RSpec.describe 'Gets registries' do ...@@ -58,4 +58,11 @@ RSpec.describe 'Gets registries' do
registry_factory: :geo_upload_registry, registry_factory: :geo_upload_registry,
registry_foreign_key_field_name: 'fileId' registry_foreign_key_field_name: 'fileId'
} }
# Runs the shared 'gets registries for' examples against the `jobArtifactRegistries`
# field, using the :geo_job_artifact_registry factory and `artifactId` as the
# registry's foreign-key field name.
it_behaves_like 'gets registries for', {
field_name: 'jobArtifactRegistries',
registry_class_name: 'JobArtifactRegistry',
registry_factory: :geo_job_artifact_registry,
registry_foreign_key_field_name: 'artifactId'
}
end end
...@@ -11,6 +11,7 @@ RSpec.describe Geo::FileDownloadService do ...@@ -11,6 +11,7 @@ RSpec.describe Geo::FileDownloadService do
before do before do
stub_current_geo_node(secondary) stub_current_geo_node(secondary)
stub_feature_flags(geo_job_artifact_replication: false)
end end
describe '#downloader' do describe '#downloader' do
...@@ -28,7 +29,7 @@ RSpec.describe Geo::FileDownloadService do ...@@ -28,7 +29,7 @@ RSpec.describe Geo::FileDownloadService do
context 'with job_artifacts' do context 'with job_artifacts' do
let!(:geo_job_artifact_registry) do let!(:geo_job_artifact_registry) do
create(:geo_job_artifact_registry, success: false, retry_count: 31, artifact_id: file.id) create(:geo_job_artifact_registry_legacy, success: false, retry_count: 31, artifact_id: file.id)
end end
let(:file) { create(:ci_job_artifact) } let(:file) { create(:ci_job_artifact) }
...@@ -181,7 +182,7 @@ RSpec.describe Geo::FileDownloadService do ...@@ -181,7 +182,7 @@ RSpec.describe Geo::FileDownloadService do
context 'for a registered file that failed to sync' do context 'for a registered file that failed to sync' do
let!(:geo_job_artifact_registry) do let!(:geo_job_artifact_registry) do
create(:geo_job_artifact_registry, success: false, artifact_id: file.id, retry_count: 3, retry_at: 1.hour.ago) create(:geo_job_artifact_registry_legacy, success: false, artifact_id: file.id, retry_count: 3, retry_at: 1.hour.ago)
end end
context 'when the file is successfully downloaded' do context 'when the file is successfully downloaded' do
......
...@@ -59,9 +59,13 @@ RSpec.describe Geo::FileRegistryRemovalService, :geo do ...@@ -59,9 +59,13 @@ RSpec.describe Geo::FileRegistryRemovalService, :geo do
end end
let!(:job_artifact) { create(:ci_job_artifact, :archive) } let!(:job_artifact) { create(:ci_job_artifact, :archive) }
let!(:registry) { create(:geo_job_artifact_registry, artifact_id: job_artifact.id) } let!(:registry) { create(:geo_job_artifact_registry_legacy, artifact_id: job_artifact.id) }
let!(:file_path) { job_artifact.file.path } let!(:file_path) { job_artifact.file.path }
# Keep the `geo_job_artifact_replication` feature flag off for the examples in the enclosing group.
before { stub_feature_flags(geo_job_artifact_replication: false) }
it_behaves_like 'removes artifact' it_behaves_like 'removes artifact'
context 'migrated to object storage' do context 'migrated to object storage' do
......
...@@ -20,7 +20,8 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st ...@@ -20,7 +20,8 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
Geo::DesignRegistry => :project_with_design, Geo::DesignRegistry => :project_with_design,
Geo::MergeRequestDiffRegistry => :external_merge_request_diff, Geo::MergeRequestDiffRegistry => :external_merge_request_diff,
Geo::PackageFileRegistry => :package_file, Geo::PackageFileRegistry => :package_file,
Geo::UploadRegistry => :upload Geo::UploadRegistry => :upload,
Geo::JobArtifactRegistry => :ci_job_artifact
}.fetch(registry_class, default_factory_name) }.fetch(registry_class, default_factory_name)
end end
......
...@@ -205,6 +205,8 @@ RSpec.describe 'geo rake tasks', :geo, :silence_stdout do ...@@ -205,6 +205,8 @@ RSpec.describe 'geo rake tasks', :geo, :silence_stdout do
context 'with SSF LFS replication eneabled' do context 'with SSF LFS replication eneabled' do
it 'prints messages for all the checks' do it 'prints messages for all the checks' do
stub_feature_flags(geo_job_artifact_replication: false)
checks.each do |text| checks.each do |text|
expect { run_rake_task('geo:status') }.to output(text).to_stdout expect { run_rake_task('geo:status') }.to output(text).to_stdout
end end
......
...@@ -30,6 +30,8 @@ RSpec.describe 'Every GitLab uploader' do ...@@ -30,6 +30,8 @@ RSpec.describe 'Every GitLab uploader' do
# When this test starts failing means that we have migrated Geo's handling of uploads to the # When this test starts failing means that we have migrated Geo's handling of uploads to the
# SSF, and we can remove the tests for the file retriever and downloader classes. # SSF, and we can remove the tests for the file retriever and downloader classes.
it 'has some uploads to be migrated' do it 'has some uploads to be migrated' do
stub_feature_flags(geo_job_artifact_replication: false)
expect(object_types - replicable_names).not_to be_empty expect(object_types - replicable_names).not_to be_empty
end end
end end
...@@ -84,8 +86,10 @@ RSpec.describe 'Every GitLab uploader' do ...@@ -84,8 +86,10 @@ RSpec.describe 'Every GitLab uploader' do
end end
def handled_by_ssf?(uploader) def handled_by_ssf?(uploader)
return true if uploads?(uploader)
replicable_name = replicable_name_for(uploader) replicable_name = replicable_name_for(uploader)
replicable_names.include?(replicable_name) || uploads?(uploader) replicable_names.include?(replicable_name) && ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name).enabled?
end end
def uploads?(uploader) def uploads?(uploader)
...@@ -108,7 +112,11 @@ RSpec.describe 'Every GitLab uploader' do ...@@ -108,7 +112,11 @@ RSpec.describe 'Every GitLab uploader' do
end end
def replicable_names def replicable_names
@replicable_names ||= replicators.map(&:replicable_name) @replicable_names ||= begin
replicators
.map(&:replicable_name)
.select {|replicable_name| ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name).enabled? }
end
end end
def replicable_name_for(uploader) def replicable_name_for(uploader)
......
...@@ -19,6 +19,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -19,6 +19,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {}) WebMock.stub_request(:get, /primary-geo-node/).to_return(status: 200, body: "", headers: {})
allow(Geo::FileDownloadWorker).to receive(:with_status).and_return(Geo::FileDownloadWorker) allow(Geo::FileDownloadWorker).to receive(:with_status).and_return(Geo::FileDownloadWorker)
stub_feature_flags(geo_job_artifact_replication: false)
end end
it 'does not schedule anything when tracking database is not configured' do it 'does not schedule anything when tracking database is not configured' do
...@@ -48,7 +49,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -48,7 +49,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
context 'with job artifacts' do context 'with job artifacts' do
it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do it 'performs Geo::FileDownloadWorker for unsynced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy) .with('job_artifact', registry.artifact_id).once.and_return(spy)
...@@ -57,7 +58,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -57,7 +58,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do it 'performs Geo::FileDownloadWorker for failed-sync job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :failed)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy) .with('job_artifact', registry.artifact_id).once.and_return(spy)
...@@ -66,7 +67,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -66,7 +67,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do it 'does not perform Geo::FileDownloadWorker for synced job artifacts' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 1234, success: true) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, bytes: 1234, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id) .with('job_artifact', registry.artifact_id)
...@@ -75,7 +76,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -75,7 +76,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do it 'does not perform Geo::FileDownloadWorker for synced job artifacts even with 0 bytes downloaded' do
registry = create(:geo_job_artifact_registry, :with_artifact, bytes: 0, success: true) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, bytes: 0, success: true)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id) .with('job_artifact', registry.artifact_id)
...@@ -84,7 +85,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -84,7 +85,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'does not retry failed artifacts when retry_at is tomorrow' do it 'does not retry failed artifacts when retry_at is tomorrow' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.tomorrow) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :failed, retry_at: Date.tomorrow)
expect(Geo::FileDownloadWorker).not_to receive(:perform_async) expect(Geo::FileDownloadWorker).not_to receive(:perform_async)
.with('job_artifact', registry.artifact_id) .with('job_artifact', registry.artifact_id)
...@@ -93,7 +94,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -93,7 +94,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'retries failed artifacts when retry_at is in the past' do it 'retries failed artifacts when retry_at is in the past' do
registry = create(:geo_job_artifact_registry, :with_artifact, :failed, retry_at: Date.yesterday) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :failed, retry_at: Date.yesterday)
expect(Geo::FileDownloadWorker).to receive(:perform_async) expect(Geo::FileDownloadWorker).to receive(:perform_async)
.with('job_artifact', registry.artifact_id).once.and_return(spy) .with('job_artifact', registry.artifact_id).once.and_return(spy)
...@@ -103,10 +104,10 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -103,10 +104,10 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
context 'with files missing on the primary that are marked as synced' do context 'with files missing on the primary that are marked as synced' do
let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) } let!(:artifact_file_missing_on_primary) { create(:ci_job_artifact) }
let!(:artifact_registry) { create(:geo_job_artifact_registry, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) } let!(:artifact_registry) { create(:geo_job_artifact_registry_legacy, artifact_id: artifact_file_missing_on_primary.id, bytes: 1234, success: true, missing_on_primary: true) }
it 'retries the files if there is spare capacity' do it 'retries the files if there is spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', artifact_file_missing_on_primary.id)
...@@ -131,7 +132,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -131,7 +132,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'does not retry those files if there is no spare capacity' do it 'does not retry those files if there is no spare capacity' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice expect(subject).to receive(:db_retrieve_batch_size).and_return(1).twice
expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id) expect(Geo::FileDownloadWorker).to receive(:perform_async).with('job_artifact', registry.artifact_id)
...@@ -140,7 +141,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -140,7 +141,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
end end
it 'does not retry those files if they are already scheduled' do it 'does not retry those files if they are already scheduled' do
registry = create(:geo_job_artifact_registry, :with_artifact, :never_synced) registry = create(:geo_job_artifact_registry_legacy, :with_artifact, :never_synced)
scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }] scheduled_jobs = [{ type: 'job_artifact', id: artifact_file_missing_on_primary.id, job_id: 'foo' }]
expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1) expect(subject).to receive(:scheduled_jobs).and_return(scheduled_jobs).at_least(1)
...@@ -171,7 +172,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t ...@@ -171,7 +172,7 @@ RSpec.describe Geo::FileDownloadDispatchWorker, :geo, :use_sql_query_cache_for_t
result_object = double(:result, success: true, bytes_downloaded: 100, primary_missing_file: false) result_object = double(:result, success: true, bytes_downloaded: 100, primary_missing_file: false)
allow_any_instance_of(::Gitlab::Geo::Replication::BaseTransfer).to receive(:download_from_primary).and_return(result_object) allow_any_instance_of(::Gitlab::Geo::Replication::BaseTransfer).to receive(:download_from_primary).and_return(result_object)
create_list(:geo_job_artifact_registry, 6, :with_artifact, :never_synced) create_list(:geo_job_artifact_registry_legacy, 6, :with_artifact, :never_synced)
expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original expect(Geo::FileDownloadWorker).to receive(:perform_async).exactly(6).times.and_call_original
# For 10 downloads, we expect four database reloads: # For 10 downloads, we expect four database reloads:
......
...@@ -100,6 +100,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do ...@@ -100,6 +100,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do
expect(Geo::TerraformStateVersionRegistry.where(terraform_state_version_id: terraform_state_version.id).count).to eq(0) expect(Geo::TerraformStateVersionRegistry.where(terraform_state_version_id: terraform_state_version.id).count).to eq(0)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0) expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(0)
expect(Geo::PagesDeploymentRegistry.where(pages_deployment: pages_deployment.id).count).to eq(0) expect(Geo::PagesDeploymentRegistry.where(pages_deployment: pages_deployment.id).count).to eq(0)
expect(Geo::JobArtifactRegistry.where(job_artifact: job_artifact.id).count).to eq(0)
subject.perform subject.perform
...@@ -114,6 +115,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do ...@@ -114,6 +115,7 @@ RSpec.describe Geo::Secondary::RegistryConsistencyWorker, :geo do
expect(Geo::TerraformStateVersionRegistry.where(terraform_state_version_id: terraform_state_version.id).count).to eq(1) expect(Geo::TerraformStateVersionRegistry.where(terraform_state_version_id: terraform_state_version.id).count).to eq(1)
expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1) expect(Geo::UploadRegistry.where(file_id: upload.id).count).to eq(1)
expect(Geo::PagesDeploymentRegistry.where(pages_deployment: pages_deployment.id).count).to eq(1) expect(Geo::PagesDeploymentRegistry.where(pages_deployment: pages_deployment.id).count).to eq(1)
expect(Geo::JobArtifactRegistry.where(job_artifact: job_artifact.id).count).to eq(1)
end end
context 'when the current Geo node is disabled or primary' do context 'when the current Geo node is disabled or primary' do
......
# frozen_string_literal: true
module Gitlab
  module BackgroundMigration
    # Background migration entry point for moving job artifact registry rows to SSF.
    #
    # The implementation in this file is intentionally a no-op. The file prepends
    # an extension module onto this class right after its definition (see the
    # `prepend_mod_with` call), which presumably supplies the real work.
    # rubocop: disable Style/Documentation
    class MigrateJobArtifactRegistryToSsf
      # Accepts any number of ids and does nothing with them.
      #
      # @param job_artifact_ids [Array] ids passed by the scheduler; ignored here
      # @return [nil]
      def perform(*job_artifact_ids)
        nil
      end
    end
  end
end
# Hooks an extension module, looked up by the given name, onto the class defined above.
# NOTE(review): `prepend_mod_with` is defined elsewhere in the project; presumably the
# prepended module provides the real `perform` implementation — confirm.
Gitlab::BackgroundMigration::MigrateJobArtifactRegistryToSsf.prepend_mod_with('Gitlab::BackgroundMigration::MigrateJobArtifactRegistryToSsf')
...@@ -85,6 +85,7 @@ ci_instance_variables: :gitlab_ci ...@@ -85,6 +85,7 @@ ci_instance_variables: :gitlab_ci
ci_job_artifacts: :gitlab_ci ci_job_artifacts: :gitlab_ci
ci_job_token_project_scope_links: :gitlab_ci ci_job_token_project_scope_links: :gitlab_ci
ci_job_variables: :gitlab_ci ci_job_variables: :gitlab_ci
ci_job_artifact_states: :gitlab_ci
ci_minutes_additional_packs: :gitlab_ci ci_minutes_additional_packs: :gitlab_ci
ci_namespace_monthly_usages: :gitlab_ci ci_namespace_monthly_usages: :gitlab_ci
ci_namespace_mirrors: :gitlab_ci ci_namespace_mirrors: :gitlab_ci
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment