Commit c0db0637 authored by Nikola Milojevic's avatar Nikola Milojevic

Merge branch '330685-use-load-balancing-in-jira-connect-workers' into 'master'

Use load balancing for Jira Connect workers

See merge request gitlab-org/gitlab!64715
parents 6cdfe00c 1b0b3d25
......@@ -1103,7 +1103,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:idempotent:
:tags: []
- :name: jira_connect:jira_connect_sync_builds
:worker_name: JiraConnect::SyncBuildsWorker
......@@ -1112,7 +1112,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:idempotent:
:tags:
- :exclude_from_kubernetes
- :name: jira_connect:jira_connect_sync_deployments
......@@ -1122,7 +1122,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:idempotent:
:tags:
- :exclude_from_kubernetes
- :name: jira_connect:jira_connect_sync_feature_flags
......@@ -1132,7 +1132,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:idempotent:
:tags:
- :exclude_from_kubernetes
- :name: jira_connect:jira_connect_sync_merge_request
......@@ -1142,7 +1142,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:idempotent:
:tags: []
- :name: jira_connect:jira_connect_sync_project
:worker_name: JiraConnect::SyncProjectWorker
......@@ -1151,7 +1151,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:idempotent:
:tags:
- :exclude_from_kubernetes
- :name: jira_importer:jira_import_advance_stage
......
# frozen_string_literal: true
module JiraConnect
class SyncBranchWorker
class SyncBranchWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
queue_namespace :jira_connect
feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
loggable_arguments 1, 2
worker_has_external_dependencies!
idempotent!
def perform(project_id, branch_name, commit_shas, update_sequence_id)
project = Project.find_by_id(project_id)
......
# frozen_string_literal: true
module JiraConnect
class SyncBuildsWorker
class SyncBuildsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
idempotent!
worker_has_external_dependencies!
queue_namespace :jira_connect
feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes
worker_has_external_dependencies!
def perform(pipeline_id, sequence_id)
pipeline = Ci::Pipeline.find_by_id(pipeline_id)
......
# frozen_string_literal: true
module JiraConnect
class SyncDeploymentsWorker
class SyncDeploymentsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
idempotent!
worker_has_external_dependencies!
queue_namespace :jira_connect
feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes
worker_has_external_dependencies!
def perform(deployment_id, sequence_id)
deployment = Deployment.find_by_id(deployment_id)
......
# frozen_string_literal: true
module JiraConnect
class SyncFeatureFlagsWorker
class SyncFeatureFlagsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
idempotent!
worker_has_external_dependencies!
queue_namespace :jira_connect
feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes
worker_has_external_dependencies!
def perform(feature_flag_id, sequence_id)
feature_flag = ::Operations::FeatureFlag.find_by_id(feature_flag_id)
......
# frozen_string_literal: true
module JiraConnect
class SyncMergeRequestWorker
class SyncMergeRequestWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
queue_namespace :jira_connect
feature_category :integrations
idempotent!
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
worker_has_external_dependencies!
......
# frozen_string_literal: true
module JiraConnect
class SyncProjectWorker
class SyncProjectWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
sidekiq_options retry: 3
queue_namespace :jira_connect
feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes
idempotent!
worker_has_external_dependencies!
MERGE_REQUEST_LIMIT = 400
......
---
name: load_balancing_for_jira_connect_workers
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64715
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/335420
milestone: '14.1'
type: development
group: group::ecosystem
default_enabled: false
......@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe JiraConnect::SyncBranchWorker do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, :repository, group: group) }
......@@ -15,65 +20,59 @@ RSpec.describe JiraConnect::SyncBranchWorker do
let(:commit_shas) { %w(b83d6e3 5a62481) }
let(:update_sequence_id) { 1 }
def perform
described_class.new.perform(project_id, branch_name, commit_shas, update_sequence_id)
end
def expect_jira_sync_service_execute(args)
expect_next_instances_of(JiraConnect::SyncService, IdempotentWorkerHelper::WORKER_EXEC_TIMES) do |instance|
expect(instance).to receive(:execute).with(args)
end
expect_next(JiraConnect::SyncService).to receive(:execute).with(args)
end
it_behaves_like 'an idempotent worker' do
let(:job_args) { [project_id, branch_name, commit_shas, update_sequence_id] }
it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)],
commits: project.commits_by(oids: commit_shas),
update_sequence_id: update_sequence_id
)
before do
stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {})
end
perform
end
context 'without branch name' do
let(:branch_name) { nil }
it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)],
branches: nil,
commits: project.commits_by(oids: commit_shas),
update_sequence_id: update_sequence_id
)
subject
end
context 'without branch name' do
let(:branch_name) { nil }
it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute(
branches: nil,
commits: project.commits_by(oids: commit_shas),
update_sequence_id: update_sequence_id
)
subject
end
perform
end
end
context 'without commits' do
let(:commit_shas) { nil }
context 'without commits' do
let(:commit_shas) { nil }
it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)],
commits: nil,
update_sequence_id: update_sequence_id
)
it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)],
commits: nil,
update_sequence_id: update_sequence_id
)
subject
end
perform
end
end
context 'when project no longer exists' do
let(:project_id) { non_existing_record_id }
context 'when project no longer exists' do
let(:project_id) { non_existing_record_id }
it 'does not call JiraConnect::SyncService' do
expect(JiraConnect::SyncService).not_to receive(:new)
it 'does not call JiraConnect::SyncService' do
expect(JiraConnect::SyncService).not_to receive(:new)
subject
end
perform
end
end
end
......
......@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe ::JiraConnect::SyncBuildsWorker do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do
let_it_be(:pipeline) { create(:ci_pipeline) }
......
......@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe ::JiraConnect::SyncDeploymentsWorker do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do
let_it_be(:deployment) { create(:deployment) }
......
......@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe ::JiraConnect::SyncFeatureFlagsWorker do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do
let_it_be(:feature_flag) { create(:operations_feature_flag) }
......
......@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe JiraConnect::SyncMergeRequestWorker do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do
let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, :repository, group: group) }
......@@ -14,29 +19,24 @@ RSpec.describe JiraConnect::SyncMergeRequestWorker do
let(:merge_request_id) { merge_request.id }
let(:update_sequence_id) { 1 }
it_behaves_like 'an idempotent worker' do
let(:job_args) { [merge_request_id, update_sequence_id] }
before do
stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {})
end
def perform
described_class.new.perform(merge_request_id, update_sequence_id)
end
it 'calls JiraConnect::SyncService#execute' do
expect_next_instances_of(JiraConnect::SyncService, IdempotentWorkerHelper::WORKER_EXEC_TIMES) do |service|
expect(service).to receive(:execute).with(merge_requests: [merge_request], update_sequence_id: update_sequence_id)
end
it 'calls JiraConnect::SyncService#execute' do
expect_next(JiraConnect::SyncService).to receive(:execute)
.with(merge_requests: [merge_request], update_sequence_id: update_sequence_id)
subject
end
perform
end
context 'when MR no longer exists' do
let(:merge_request_id) { non_existing_record_id }
context 'when MR no longer exists' do
let(:merge_request_id) { non_existing_record_id }
it 'does not call JiraConnect::SyncService' do
expect(JiraConnect::SyncService).not_to receive(:new)
it 'does not call JiraConnect::SyncService' do
expect(JiraConnect::SyncService).not_to receive(:new)
subject
end
perform
end
end
end
......
......@@ -3,6 +3,13 @@
require 'spec_helper'
RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do
let_it_be(:project) { create_default(:project).freeze }
......@@ -14,6 +21,22 @@ RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do
let(:jira_connect_sync_service) { JiraConnect::SyncService.new(project) }
let(:job_args) { [project.id, update_sequence_id] }
let(:update_sequence_id) { 1 }
let(:request_path) { '/rest/devinfo/0.10/bulk' }
let(:request_body) do
{
repositories: [
Atlassian::JiraConnect::Serializers::RepositoryEntity.represent(
project,
merge_requests: [mr_with_jira_description, mr_with_jira_title],
update_sequence_id: update_sequence_id
)
]
}
end
def perform(project_id, update_sequence_id)
described_class.new.perform(project_id, update_sequence_id)
end
before do
stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {})
......@@ -24,54 +47,37 @@ RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do
context 'when the project is not found' do
it 'does not raise an error' do
expect { described_class.new.perform('non_existing_record_id', update_sequence_id) }.not_to raise_error
expect { perform('non_existing_record_id', update_sequence_id) }.not_to raise_error
end
end
it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new { described_class.new.perform(project.id, update_sequence_id) }.count
control_count = ActiveRecord::QueryRecorder.new { perform(project.id, update_sequence_id) }.count
create(:merge_request, :unique_branches, title: 'TEST-123')
expect { described_class.new.perform(project.id, update_sequence_id) }.not_to exceed_query_limit(control_count)
expect { perform(project.id, update_sequence_id) }.not_to exceed_query_limit(control_count)
end
it_behaves_like 'an idempotent worker' do
let(:request_path) { '/rest/devinfo/0.10/bulk' }
let(:request_body) do
{
repositories: [
Atlassian::JiraConnect::Serializers::RepositoryEntity.represent(
project,
merge_requests: [mr_with_jira_description, mr_with_jira_title],
update_sequence_id: update_sequence_id
)
]
}
end
it 'sends the request with custom update_sequence_id' do
allow_next_instances_of(Atlassian::JiraConnect::Client, IdempotentWorkerHelper::WORKER_EXEC_TIMES) do |client|
expect(client).to receive(:post).with(request_path, request_body)
end
it 'sends the request with custom update_sequence_id' do
allow_next(Atlassian::JiraConnect::Client).to receive(:post)
.with(request_path, request_body)
subject
end
perform(project.id, update_sequence_id)
end
context 'when the number of merge requests to sync is higher than the limit' do
let!(:most_recent_merge_request) { create(:merge_request, :unique_branches, description: 'TEST-323', title: 'TEST-123') }
context 'when the number of merge requests to sync is higher than the limit' do
let!(:most_recent_merge_request) { create(:merge_request, :unique_branches, description: 'TEST-323', title: 'TEST-123') }
before do
stub_const("#{described_class}::MERGE_REQUEST_LIMIT", 1)
end
before do
stub_const("#{described_class}::MERGE_REQUEST_LIMIT", 1)
end
it 'syncs only the most recent merge requests within the limit' do
expect(jira_connect_sync_service).to receive(:execute)
.exactly(IdempotentWorkerHelper::WORKER_EXEC_TIMES).times
.with(merge_requests: [most_recent_merge_request], update_sequence_id: update_sequence_id)
it 'syncs only the most recent merge requests within the limit' do
expect(jira_connect_sync_service).to receive(:execute)
.with(merge_requests: [most_recent_merge_request], update_sequence_id: update_sequence_id)
subject
end
perform(project.id, update_sequence_id)
end
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment