Commit f5bfe560 authored by Jarka Košanová

Merge branch 'refactor-stuck-imports-jobs-worker' into 'master'

Process stuck jira import jobs

See merge request gitlab-org/gitlab!32643
parents 793900ed 6b43af8d
......@@ -5,6 +5,9 @@ module ImportState
extend ActiveSupport::Concern
included do
scope :with_jid, -> { where.not(jid: nil) }
scope :without_jid, -> { where(jid: nil) }
# Refreshes the expiration time of the associated import job ID.
#
# This method can be used by asynchronous importers to refresh the status,
......
......@@ -7,6 +7,7 @@ class JiraImportState < ApplicationRecord
self.table_name = 'jira_imports'
ERROR_MESSAGE_SIZE = 1000 # 1000 characters limit
STATUSES = { initial: 0, scheduled: 1, started: 2, failed: 3, finished: 4 }.freeze
belongs_to :project
......@@ -14,6 +15,7 @@ class JiraImportState < ApplicationRecord
belongs_to :label
scope :by_jira_project_key, -> (jira_project_key) { where(jira_project_key: jira_project_key) }
scope :with_status, ->(statuses) { where(status: statuses) }
validates :project, presence: true
validates :jira_project_key, presence: true
......@@ -25,6 +27,8 @@ class JiraImportState < ApplicationRecord
message: _('Cannot have multiple Jira imports running at the same time')
}
before_save :ensure_error_message_size
alias_method :scheduled_by, :user
state_machine :status, initial: :initial do
......@@ -65,6 +69,13 @@ class JiraImportState < ApplicationRecord
end
end
# Stores the failure reason passed to the fail event, e.g.
# `do_fail(error_message: '...')`. Nothing is written when no message is given.
after_transition any => :failed do |state, transition|
  error_message = transition.args.first&.dig(:error_message)
  next if error_message.blank?

  # update_column writes directly to the database and skips callbacks, so the
  # before_save size guard never sees this value. Apply the limit here so an
  # over-long message cannot violate the column's length constraint.
  state.update_column(:error_message, error_message.truncate(JiraImportState::ERROR_MESSAGE_SIZE))
end
# Suppress warning:
# both JiraImportState and its :status machine have defined a different default for "status".
# although both have same value but represented in 2 ways: integer(0) and symbol(:initial)
......@@ -102,4 +113,18 @@ class JiraImportState < ApplicationRecord
# Adds up `imported_issues_count` over the collection returned by `finished`
# (presumably the imports in the finished state - confirm against the state machine).
def self.finished_imports_count
  finished_imports = finished
  finished_imports.sum(:imported_issues_count)
end
# Transitions the import to the failed state and records the reason.
#
# The message is first passed through Gitlab::UrlSanitizer.sanitize
# (presumably masking credentials embedded in URLs - confirm) and that
# sanitized text is what gets handed to the state machine, so the stored
# error_message and the log line below never differ.
#
# ActiveRecord errors raised while failing the import are logged, not re-raised.
def mark_as_failed(error_message)
  sanitized_message = Gitlab::UrlSanitizer.sanitize(error_message)

  do_fail(error_message: sanitized_message)
rescue ActiveRecord::ActiveRecordError => e
  Gitlab::AppLogger.error("Error setting import status to failed: #{e.message}. Original error: #{sanitized_message}")
end
private
# before_save hook: caps error_message at ERROR_MESSAGE_SIZE characters.
# A nil message is left as nil.
def ensure_error_message_size
  shortened_message = error_message&.truncate(ERROR_MESSAGE_SIZE)
  self.error_message = shortened_message
end
end
......@@ -28,8 +28,8 @@ module JiraImport
rescue => ex
# in case project.save! raises an error
Gitlab::ErrorTracking.track_exception(ex, project_id: project.id)
jira_import&.do_fail!(error_message: ex.message)
build_error_response(ex.message)
jira_import.do_fail!
end
def build_jira_import
......
......@@ -155,6 +155,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:jira_import_stuck_jira_import_jobs
:feature_category: :importers
:has_external_dependencies:
:urgency: :low
:resource_boundary: :cpu
:weight: 1
:idempotent:
:tags: []
- :name: cronjob:namespaces_prune_aggregation_schedules
:feature_category: :source_code_management
:has_external_dependencies:
......
# frozen_string_literal: true
module Gitlab
  module Import
    # Shared behaviour for cron workers that mark stuck imports as failed.
    #
    # Including classes must provide two private hooks:
    #   * enqueued_import_states - the import states to inspect; the result must
    #     respond to `with_jid` and `without_jid`
    #   * track_metrics(with_jid_count, without_jid_count) - reports how many
    #     imports were processed in each group
    module StuckImportJob
      extend ActiveSupport::Concern

      # 15 hours, expressed in seconds.
      IMPORT_JOBS_EXPIRATION = 15.hours.to_i

      included do
        include ApplicationWorker
        # rubocop:disable Scalability/CronWorkerContext
        # This worker updates several import states inline and does not schedule
        # other jobs. So no context needed
        include CronjobQueue
        # rubocop:enable Scalability/CronWorkerContext

        feature_category :importers
        worker_resource_boundary :cpu
      end

      # Fails imports that have no job ID, then imports whose job ID Sidekiq
      # reports as completed, and hands both totals to track_metrics.
      def perform
        without_jid_total = mark_imports_without_jid_as_failed!
        with_jid_total = mark_imports_with_jid_as_failed!

        track_metrics(with_jid_total, without_jid_total)
      end

      private

      # Hook: must be overridden by the including worker.
      def track_metrics(with_jid_count, without_jid_count)
        raise NotImplementedError
      end

      # Marks every enqueued import without a job ID as failed and returns how
      # many were visited.
      def mark_imports_without_jid_as_failed!
        enqueued_import_states_without_jid.each { |stuck_import| stuck_import.mark_as_failed(error_message) }.size
      end

      # Marks enqueued imports whose Sidekiq job is reported as completed as
      # failed. Returns how many were visited (0 when no job ID qualifies).
      def mark_imports_with_jid_as_failed!
        ids_by_jid = enqueued_import_states_with_jid.pluck(:jid, :id).to_h # rubocop: disable CodeReuse/ActiveRecord

        # Job IDs that aren't currently running or that exceeded the threshold.
        finished_jids = Gitlab::SidekiqStatus.completed_jids(ids_by_jid.keys)
        return 0 if finished_jids.none?

        stuck_ids = ids_by_jid.values_at(*finished_jids)

        # Query the import states again: they may have moved from
        # scheduled/started to finished/failed during the Sidekiq lookup.
        stuck_imports = enqueued_import_states_with_jid.id_in(stuck_ids)
        stuck_jids = stuck_imports.map(&:jid).join(', ')

        Gitlab::Import::Logger.info(
          message: 'Marked stuck import jobs as failed',
          job_ids: stuck_jids
        )

        stuck_imports.each { |stuck_import| stuck_import.mark_as_failed(error_message) }.size
      end

      # Hook: must be overridden by the including worker.
      def enqueued_import_states
        raise NotImplementedError
      end

      def enqueued_import_states_with_jid
        enqueued_import_states.with_jid
      end

      def enqueued_import_states_without_jid
        enqueued_import_states.without_jid
      end

      # Translated message stored on each import that gets failed here.
      def error_message
        format(
          _("Import timed out. Import took longer than %{import_jobs_expiration} seconds"),
          import_jobs_expiration: IMPORT_JOBS_EXPIRATION
        )
      end
    end
  end
end
# frozen_string_literal: true
module Gitlab
module JiraImport
class StuckJiraImportJobsWorker # rubocop:disable Scalability/IdempotentWorker
include Gitlab::Import::StuckImportJob
private
# Records a :stuck_jira_import_jobs metrics event.
#
# with_jid_count    - number of stuck imports that had a Sidekiq job ID
# without_jid_count - number of stuck imports that had no job ID
#
# Each count is reported under the tag that matches its name.
def track_metrics(with_jid_count, without_jid_count)
  Gitlab::Metrics.add_event(
    :stuck_jira_import_jobs,
    jira_imports_without_jid_count: without_jid_count,
    jira_imports_with_jid_count: with_jid_count
  )
end
# Jira imports whose status is scheduled or started.
def enqueued_import_states
  pending_statuses = %i[scheduled started]
  JiraImportState.with_status(pending_statuses)
end
end
end
end
# frozen_string_literal: true
# Cron worker that fails project imports stuck in the scheduled/started state.
#
# The detection logic, worker includes, feature category and resource boundary
# all come from Gitlab::Import::StuckImportJob; this class only supplies the
# two hooks that concern requires.
class StuckImportJobsWorker # rubocop:disable Scalability/IdempotentWorker
  include Gitlab::Import::StuckImportJob

  # Kept on the worker so existing references to
  # StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION keep resolving.
  IMPORT_JOBS_EXPIRATION = Gitlab::Import::StuckImportJob::IMPORT_JOBS_EXPIRATION

  private

  # Records a :stuck_import_jobs metrics event with the number of imports
  # handled with and without a Sidekiq job ID.
  def track_metrics(with_jid_count, without_jid_count)
    Gitlab::Metrics.add_event(
      :stuck_import_jobs,
      projects_without_jid_count: without_jid_count,
      projects_with_jid_count: with_jid_count
    )
  end

  # Project import states whose status is scheduled or started.
  def enqueued_import_states
    ProjectImportState.with_status([:scheduled, :started])
  end
end
---
title: Process stuck jira import jobs
merge_request: 32643
author:
type: added
......@@ -454,6 +454,9 @@ Settings.cron_jobs['remove_unreferenced_lfs_objects_worker']['job_class'] = 'Rem
Settings.cron_jobs['stuck_import_jobs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['stuck_import_jobs_worker']['cron'] ||= '15 * * * *'
Settings.cron_jobs['stuck_import_jobs_worker']['job_class'] = 'StuckImportJobsWorker'
# Cron registration for Gitlab::JiraImport::StuckJiraImportJobsWorker.
# NOTE(review): in '* 0/15 * * *' the minute field is `*` and the hour field is
# `0/15`, which cron parsers typically read as "every minute during hours 0 and
# 15". The neighbouring workers run once an hour ('15 * * * *', '30 * * * *');
# confirm whether an hourly or every-15-minutes schedule was intended here.
Settings.cron_jobs['jira_import_stuck_jira_import_jobs'] ||= Settingslogic.new({})
Settings.cron_jobs['jira_import_stuck_jira_import_jobs']['cron'] ||= '* 0/15 * * *'
Settings.cron_jobs['jira_import_stuck_jira_import_jobs']['job_class'] = 'Gitlab::JiraImport::StuckJiraImportJobsWorker'
Settings.cron_jobs['stuck_export_jobs_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['stuck_export_jobs_worker']['cron'] ||= '30 * * * *'
Settings.cron_jobs['stuck_export_jobs_worker']['job_class'] = 'StuckExportJobsWorker'
......
# frozen_string_literal: true
# Adds an `error_message` text column, limited to 1000 characters, to `jira_imports`.
class AddErrorMessageColumnToJiraImports < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false

  disable_ddl_transaction!

  # Creates the column only when it is missing, then applies the text limit.
  def up
    add_column(:jira_imports, :error_message, :text) unless column_exists?(:jira_imports, :error_message)

    add_text_limit(:jira_imports, :error_message, 1000)
  end

  # Drops the column; does nothing when it is already gone.
  def down
    remove_column(:jira_imports, :error_message) if column_exists?(:jira_imports, :error_message)
  end
end
......@@ -3516,7 +3516,9 @@ CREATE TABLE public.jira_imports (
jid character varying(255),
jira_project_key character varying(255) NOT NULL,
jira_project_name character varying(255) NOT NULL,
scheduled_at timestamp with time zone
scheduled_at timestamp with time zone,
error_message text,
CONSTRAINT check_9ed451c5b1 CHECK ((char_length(error_message) <= 1000))
);
CREATE SEQUENCE public.jira_imports_id_seq
......@@ -13955,6 +13957,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200515155620
20200518091745
20200518133123
20200519101002
20200519115908
20200519171058
20200519194042
......
......@@ -54,8 +54,8 @@ class StuckImportJobsWorker
IMPORT_JOBS_EXPIRATION = 15.hours.to_i
def perform
import_state_without_jid_count = mark_import_states_without_jid_as_failed!
import_state_with_jid_count = mark_import_states_with_jid_as_failed!
imports_without_jid_count = mark_imports_without_jid_as_failed!
imports_with_jid_count = mark_imports_with_jid_as_failed!
...
```
......
......@@ -26,6 +26,7 @@ describe Gitlab::SidekiqConfig do
queues = described_class.expand_queues(%w[cronjob])
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:jira_import_stuck_jira_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
end
......
......@@ -76,7 +76,12 @@ describe Gitlab::SidekiqConfig::CliMethods do
describe '.expand_queues' do
let(:worker_queues) do
['cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs', 'post_receive']
[
'cronjob:stuck_import_jobs',
'cronjob:jira_import_stuck_jira_import_jobs',
'cronjob:stuck_merge_jobs',
'post_receive'
]
end
it 'defaults the value of the second argument to .worker_queues' do
......@@ -88,12 +93,12 @@ describe Gitlab::SidekiqConfig::CliMethods do
allow(described_class).to receive(:worker_queues).and_return(worker_queues)
expect(described_class.expand_queues(['cronjob']))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:jira_import_stuck_jira_import_jobs', 'cronjob:stuck_merge_jobs')
end
it 'expands queue namespaces to concrete queue names' do
expect(described_class.expand_queues(['cronjob'], worker_queues))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:jira_import_stuck_jira_import_jobs', 'cronjob:stuck_merge_jobs')
end
it 'lets concrete queue names pass through' do
......
......@@ -19,6 +19,7 @@ describe Gitlab::SidekiqConfig do
expect(queues).to include('post_receive')
expect(queues).to include('merge')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:jira_import_stuck_jira_import_jobs')
expect(queues).to include('mailers')
expect(queues).to include('default')
end
......
......@@ -17,6 +17,7 @@ describe Gitlab::SidekiqVersioning::Manager do
expect(queues).to include('repository_fork')
expect(queues).to include('cronjob')
expect(queues).to include('cronjob:stuck_import_jobs')
expect(queues).to include('cronjob:jira_import_stuck_jira_import_jobs')
expect(queues).to include('cronjob:stuck_merge_jobs')
expect(queues).to include('unknown')
end
......
......@@ -163,4 +163,39 @@ describe JiraImportState do
end
end
end
# The before_save callback runs on every save; these examples pin what it
# leaves in error_message for each kind of input.
context 'ensure error_message size on save' do
  let_it_be(:project) { create(:project) }

  before do
    stub_const('JiraImportState::ERROR_MESSAGE_SIZE', 10)
  end

  context 'when jira import has no error_message' do
    let(:jira_import) { build(:jira_import_state, project: project) }

    it 'leaves error_message empty', :aggregate_failures do
      expect { jira_import.save! }.to change { JiraImportState.count }.by(1)
      expect(jira_import.reload.error_message).to be_nil
    end
  end

  context 'when jira import error_message does not exceed the limit' do
    let(:jira_import) { build(:jira_import_state, project: project, error_message: 'error') }

    it 'keeps error_message unchanged', :aggregate_failures do
      expect { jira_import.save! }.to change { JiraImportState.count }.by(1)
      expect(jira_import.reload.error_message).to eq('error')
    end
  end

  context 'when error_message exceeds limit' do
    let(:jira_import) { build(:jira_import_state, project: project, error_message: 'error message longer than the limit') }

    it 'truncates error_message to the limit', :aggregate_failures do
      expect { jira_import.save! }.to change { JiraImportState.count }.by(1)
      expect(jira_import.reload.error_message.size).to eq 10
    end
  end
end
end
......@@ -45,6 +45,22 @@ describe JiraImport::StartImportService do
it_behaves_like 'responds with error', 'Jira import is already running.'
end
# Forces the scheduling step to blow up and checks that the failure is both
# returned to the caller and stored on the import record.
context 'when an error is raised while scheduling import' do
  before do
    expect_next_instance_of(JiraImportState) do |new_jira_import|
      expect(new_jira_import)
        .to receive(:schedule!)
        .and_raise(Projects::ImportService::Error, 'Unexpected failure.')
    end
  end

  it_behaves_like 'responds with error', 'Unexpected failure.'

  it 'saves the error message' do
    subject

    expect(JiraImportState.last.error_message).to eq('Unexpected failure.')
  end
end
context 'when everything is ok' do
it 'returns success response' do
expect(subject).to be_a(ServiceResponse)
......@@ -57,7 +73,7 @@ describe JiraImport::StartImportService do
expect(project.latest_jira_import).to be_scheduled
end
it 'creates Jira import data' do
it 'creates Jira import data', :aggregate_failures do
jira_import = subject.payload[:import_data]
expect(jira_import.jira_project_xid).to eq(0)
......@@ -72,8 +88,8 @@ describe JiraImport::StartImportService do
it 'creates Jira label title with correct number' do
jira_import = subject.payload[:import_data]
label_title = "jira-import::#{jira_import.jira_project_key}-1"
expect(jira_import.label.title).to eq(label_title)
end
end
......@@ -83,8 +99,8 @@ describe JiraImport::StartImportService do
it 'creates Jira label title with correct number' do
jira_import = subject.payload[:import_data]
label_title = "jira-import::#{jira_import.jira_project_key}-4"
expect(jira_import.label.title).to eq(label_title)
end
end
......
# frozen_string_literal: true
# Shared examples for stuck-import cron workers.
# The including spec must define `worker` and `import_state`.
shared_examples 'stuck import job detection' do
  context 'when the job has completed' do
    context 'when the import status was already updated' do
      before do
        # The import finishes while the worker is looking up Sidekiq statuses.
        allow(Gitlab::SidekiqStatus).to receive(:completed_jids) do
          import_state.start
          import_state.finish

          [import_state.jid]
        end
      end

      it 'does not mark the import as failed' do
        worker.perform

        expect(import_state.reload).to have_attributes(status: 'finished')
      end
    end

    context 'when the import status was not updated' do
      before do
        completed = [import_state.jid]
        allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(completed)
      end

      it 'marks the import as failed' do
        worker.perform

        expect(import_state.reload).to have_attributes(status: 'failed')
      end
    end
  end

  context 'when the job is still in Sidekiq' do
    before do
      allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
    end

    it 'does not mark the import as failed' do
      expect { worker.perform }.not_to change { import_state.reload.status }
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Exercises the Jira stuck-import cron worker against scheduled, started and
# already-failed imports.
describe ::Gitlab::JiraImport::StuckJiraImportJobsWorker do
  let_it_be(:current_user) { create(:user) }
  let_it_be(:project) { create(:project) }
  let(:worker) { described_class.new }

  describe 'with scheduled Jira import' do
    it_behaves_like 'stuck import job detection' do
      let(:import_state) { create(:jira_import_state, :scheduled, project: project) }

      before do
        import_state.update(jid: '123')
      end
    end
  end

  describe 'with started jira import' do
    it_behaves_like 'stuck import job detection' do
      let(:import_state) { create(:jira_import_state, :started, project: project) }

      before do
        import_state.update(jid: '123')
      end
    end
  end

  describe 'with failed jira import' do
    # let! (eager) so the failed import really exists when the worker runs;
    # with a lazy let nothing references it and the example would pass against
    # an empty table.
    let!(:import_state) { create(:jira_import_state, :failed, project: project) }

    it 'detects no stuck jobs' do
      expect(worker).to receive(:track_metrics).with(0, 0)

      worker.perform
    end
  end
end
......@@ -5,51 +5,8 @@ require 'spec_helper'
describe StuckImportJobsWorker do
let(:worker) { described_class.new }
shared_examples 'project import job detection' do
context 'when the job has completed' do
context 'when the import status was already updated' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids) do
import_state.start
import_state.finish
[import_state.jid]
end
end
it 'does not mark the project as failed' do
worker.perform
expect(import_state.reload.status).to eq('finished')
end
end
context 'when the import status was not updated' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([import_state.jid])
end
it 'marks the project as failed' do
worker.perform
expect(import_state.reload.status).to eq('failed')
end
end
end
context 'when the job is still in Sidekiq' do
before do
allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
end
it 'does not mark the project as failed' do
expect { worker.perform }.not_to change { import_state.reload.status }
end
end
end
describe 'with scheduled import_status' do
it_behaves_like 'project import job detection' do
it_behaves_like 'stuck import job detection' do
let(:import_state) { create(:project, :import_scheduled).import_state }
before do
......@@ -59,7 +16,7 @@ describe StuckImportJobsWorker do
end
describe 'with started import_status' do
it_behaves_like 'project import job detection' do
it_behaves_like 'stuck import job detection' do
let(:import_state) { create(:project, :import_started).import_state }
before do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment