Commit 1b0b3d25 authored by Markus Koller's avatar Markus Koller

Use load balancing for Jira Connect workers

This sets the `data_consistency` on all Jira Connect workers so they
can use readonly database replicas.

This is not compatible with idempotent workers so we have to remove
that, although in practice it seems these jobs were never able to be
deduplicated anyway.

Changelog: changed
parent 095e2843
...@@ -1094,7 +1094,7 @@ ...@@ -1094,7 +1094,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: [] :tags: []
- :name: jira_connect:jira_connect_sync_builds - :name: jira_connect:jira_connect_sync_builds
:worker_name: JiraConnect::SyncBuildsWorker :worker_name: JiraConnect::SyncBuildsWorker
...@@ -1103,7 +1103,7 @@ ...@@ -1103,7 +1103,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: :tags:
- :exclude_from_kubernetes - :exclude_from_kubernetes
- :name: jira_connect:jira_connect_sync_deployments - :name: jira_connect:jira_connect_sync_deployments
...@@ -1113,7 +1113,7 @@ ...@@ -1113,7 +1113,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: :tags:
- :exclude_from_kubernetes - :exclude_from_kubernetes
- :name: jira_connect:jira_connect_sync_feature_flags - :name: jira_connect:jira_connect_sync_feature_flags
...@@ -1123,7 +1123,7 @@ ...@@ -1123,7 +1123,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: :tags:
- :exclude_from_kubernetes - :exclude_from_kubernetes
- :name: jira_connect:jira_connect_sync_merge_request - :name: jira_connect:jira_connect_sync_merge_request
...@@ -1133,7 +1133,7 @@ ...@@ -1133,7 +1133,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: [] :tags: []
- :name: jira_connect:jira_connect_sync_project - :name: jira_connect:jira_connect_sync_project
:worker_name: JiraConnect::SyncProjectWorker :worker_name: JiraConnect::SyncProjectWorker
...@@ -1142,7 +1142,7 @@ ...@@ -1142,7 +1142,7 @@
:urgency: :low :urgency: :low
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: true :idempotent:
:tags: :tags:
- :exclude_from_kubernetes - :exclude_from_kubernetes
- :name: jira_importer:jira_import_advance_stage - :name: jira_importer:jira_import_advance_stage
......
# frozen_string_literal: true # frozen_string_literal: true
module JiraConnect module JiraConnect
class SyncBranchWorker class SyncBranchWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
queue_namespace :jira_connect queue_namespace :jira_connect
feature_category :integrations feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
loggable_arguments 1, 2 loggable_arguments 1, 2
worker_has_external_dependencies! worker_has_external_dependencies!
idempotent!
def perform(project_id, branch_name, commit_shas, update_sequence_id) def perform(project_id, branch_name, commit_shas, update_sequence_id)
project = Project.find_by_id(project_id) project = Project.find_by_id(project_id)
......
# frozen_string_literal: true # frozen_string_literal: true
module JiraConnect module JiraConnect
class SyncBuildsWorker class SyncBuildsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
idempotent!
worker_has_external_dependencies!
queue_namespace :jira_connect queue_namespace :jira_connect
feature_category :integrations feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes tags :exclude_from_kubernetes
worker_has_external_dependencies!
def perform(pipeline_id, sequence_id) def perform(pipeline_id, sequence_id)
pipeline = Ci::Pipeline.find_by_id(pipeline_id) pipeline = Ci::Pipeline.find_by_id(pipeline_id)
......
# frozen_string_literal: true # frozen_string_literal: true
module JiraConnect module JiraConnect
class SyncDeploymentsWorker class SyncDeploymentsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
idempotent!
worker_has_external_dependencies!
queue_namespace :jira_connect queue_namespace :jira_connect
feature_category :integrations feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes tags :exclude_from_kubernetes
worker_has_external_dependencies!
def perform(deployment_id, sequence_id) def perform(deployment_id, sequence_id)
deployment = Deployment.find_by_id(deployment_id) deployment = Deployment.find_by_id(deployment_id)
......
# frozen_string_literal: true # frozen_string_literal: true
module JiraConnect module JiraConnect
class SyncFeatureFlagsWorker class SyncFeatureFlagsWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
idempotent!
worker_has_external_dependencies!
queue_namespace :jira_connect queue_namespace :jira_connect
feature_category :integrations feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes tags :exclude_from_kubernetes
worker_has_external_dependencies!
def perform(feature_flag_id, sequence_id) def perform(feature_flag_id, sequence_id)
feature_flag = ::Operations::FeatureFlag.find_by_id(feature_flag_id) feature_flag = ::Operations::FeatureFlag.find_by_id(feature_flag_id)
......
# frozen_string_literal: true # frozen_string_literal: true
module JiraConnect module JiraConnect
class SyncMergeRequestWorker class SyncMergeRequestWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
queue_namespace :jira_connect queue_namespace :jira_connect
feature_category :integrations feature_category :integrations
idempotent! data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
worker_has_external_dependencies! worker_has_external_dependencies!
......
# frozen_string_literal: true # frozen_string_literal: true
module JiraConnect module JiraConnect
class SyncProjectWorker class SyncProjectWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker include ApplicationWorker
sidekiq_options retry: 3 sidekiq_options retry: 3
queue_namespace :jira_connect queue_namespace :jira_connect
feature_category :integrations feature_category :integrations
data_consistency :delayed, feature_flag: :load_balancing_for_jira_connect_workers
tags :exclude_from_kubernetes tags :exclude_from_kubernetes
idempotent!
worker_has_external_dependencies! worker_has_external_dependencies!
MERGE_REQUEST_LIMIT = 400 MERGE_REQUEST_LIMIT = 400
......
---
name: load_balancing_for_jira_connect_workers
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64715
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/335420
milestone: '14.1'
type: development
group: group::ecosystem
default_enabled: false
...@@ -5,6 +5,11 @@ require 'spec_helper' ...@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe JiraConnect::SyncBranchWorker do RSpec.describe JiraConnect::SyncBranchWorker do
include AfterNextHelpers include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, :repository, group: group) } let_it_be(:project) { create(:project, :repository, group: group) }
...@@ -15,65 +20,59 @@ RSpec.describe JiraConnect::SyncBranchWorker do ...@@ -15,65 +20,59 @@ RSpec.describe JiraConnect::SyncBranchWorker do
let(:commit_shas) { %w(b83d6e3 5a62481) } let(:commit_shas) { %w(b83d6e3 5a62481) }
let(:update_sequence_id) { 1 } let(:update_sequence_id) { 1 }
def perform
described_class.new.perform(project_id, branch_name, commit_shas, update_sequence_id)
end
def expect_jira_sync_service_execute(args) def expect_jira_sync_service_execute(args)
expect_next_instances_of(JiraConnect::SyncService, IdempotentWorkerHelper::WORKER_EXEC_TIMES) do |instance| expect_next(JiraConnect::SyncService).to receive(:execute).with(args)
expect(instance).to receive(:execute).with(args)
end
end end
it_behaves_like 'an idempotent worker' do it 'calls JiraConnect::SyncService#execute' do
let(:job_args) { [project_id, branch_name, commit_shas, update_sequence_id] } expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)],
commits: project.commits_by(oids: commit_shas),
update_sequence_id: update_sequence_id
)
before do perform
stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {}) end
end
context 'without branch name' do
let(:branch_name) { nil }
it 'calls JiraConnect::SyncService#execute' do it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute( expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)], branches: nil,
commits: project.commits_by(oids: commit_shas), commits: project.commits_by(oids: commit_shas),
update_sequence_id: update_sequence_id update_sequence_id: update_sequence_id
) )
subject perform
end
context 'without branch name' do
let(:branch_name) { nil }
it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute(
branches: nil,
commits: project.commits_by(oids: commit_shas),
update_sequence_id: update_sequence_id
)
subject
end
end end
end
context 'without commits' do context 'without commits' do
let(:commit_shas) { nil } let(:commit_shas) { nil }
it 'calls JiraConnect::SyncService#execute' do it 'calls JiraConnect::SyncService#execute' do
expect_jira_sync_service_execute( expect_jira_sync_service_execute(
branches: [instance_of(Gitlab::Git::Branch)], branches: [instance_of(Gitlab::Git::Branch)],
commits: nil, commits: nil,
update_sequence_id: update_sequence_id update_sequence_id: update_sequence_id
) )
subject perform
end
end end
end
context 'when project no longer exists' do context 'when project no longer exists' do
let(:project_id) { non_existing_record_id } let(:project_id) { non_existing_record_id }
it 'does not call JiraConnect::SyncService' do it 'does not call JiraConnect::SyncService' do
expect(JiraConnect::SyncService).not_to receive(:new) expect(JiraConnect::SyncService).not_to receive(:new)
subject perform
end
end end
end end
end end
......
...@@ -5,6 +5,11 @@ require 'spec_helper' ...@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe ::JiraConnect::SyncBuildsWorker do RSpec.describe ::JiraConnect::SyncBuildsWorker do
include AfterNextHelpers include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
let_it_be(:pipeline) { create(:ci_pipeline) } let_it_be(:pipeline) { create(:ci_pipeline) }
......
...@@ -5,6 +5,11 @@ require 'spec_helper' ...@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe ::JiraConnect::SyncDeploymentsWorker do RSpec.describe ::JiraConnect::SyncDeploymentsWorker do
include AfterNextHelpers include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
let_it_be(:deployment) { create(:deployment) } let_it_be(:deployment) { create(:deployment) }
......
...@@ -5,6 +5,11 @@ require 'spec_helper' ...@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe ::JiraConnect::SyncFeatureFlagsWorker do RSpec.describe ::JiraConnect::SyncFeatureFlagsWorker do
include AfterNextHelpers include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
let_it_be(:feature_flag) { create(:operations_feature_flag) } let_it_be(:feature_flag) { create(:operations_feature_flag) }
......
...@@ -5,6 +5,11 @@ require 'spec_helper' ...@@ -5,6 +5,11 @@ require 'spec_helper'
RSpec.describe JiraConnect::SyncMergeRequestWorker do RSpec.describe JiraConnect::SyncMergeRequestWorker do
include AfterNextHelpers include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
let_it_be(:group) { create(:group) } let_it_be(:group) { create(:group) }
let_it_be(:project) { create(:project, :repository, group: group) } let_it_be(:project) { create(:project, :repository, group: group) }
...@@ -14,29 +19,24 @@ RSpec.describe JiraConnect::SyncMergeRequestWorker do ...@@ -14,29 +19,24 @@ RSpec.describe JiraConnect::SyncMergeRequestWorker do
let(:merge_request_id) { merge_request.id } let(:merge_request_id) { merge_request.id }
let(:update_sequence_id) { 1 } let(:update_sequence_id) { 1 }
it_behaves_like 'an idempotent worker' do def perform
let(:job_args) { [merge_request_id, update_sequence_id] } described_class.new.perform(merge_request_id, update_sequence_id)
end
before do
stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {})
end
it 'calls JiraConnect::SyncService#execute' do it 'calls JiraConnect::SyncService#execute' do
expect_next_instances_of(JiraConnect::SyncService, IdempotentWorkerHelper::WORKER_EXEC_TIMES) do |service| expect_next(JiraConnect::SyncService).to receive(:execute)
expect(service).to receive(:execute).with(merge_requests: [merge_request], update_sequence_id: update_sequence_id) .with(merge_requests: [merge_request], update_sequence_id: update_sequence_id)
end
subject perform
end end
context 'when MR no longer exists' do context 'when MR no longer exists' do
let(:merge_request_id) { non_existing_record_id } let(:merge_request_id) { non_existing_record_id }
it 'does not call JiraConnect::SyncService' do it 'does not call JiraConnect::SyncService' do
expect(JiraConnect::SyncService).not_to receive(:new) expect(JiraConnect::SyncService).not_to receive(:new)
subject perform
end
end end
end end
end end
......
...@@ -3,6 +3,13 @@ ...@@ -3,6 +3,13 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do
include AfterNextHelpers
it_behaves_like 'worker with data consistency',
described_class,
feature_flag: :load_balancing_for_jira_connect_workers,
data_consistency: :delayed
describe '#perform' do describe '#perform' do
let_it_be(:project) { create_default(:project).freeze } let_it_be(:project) { create_default(:project).freeze }
...@@ -14,6 +21,22 @@ RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do ...@@ -14,6 +21,22 @@ RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do
let(:jira_connect_sync_service) { JiraConnect::SyncService.new(project) } let(:jira_connect_sync_service) { JiraConnect::SyncService.new(project) }
let(:job_args) { [project.id, update_sequence_id] } let(:job_args) { [project.id, update_sequence_id] }
let(:update_sequence_id) { 1 } let(:update_sequence_id) { 1 }
let(:request_path) { '/rest/devinfo/0.10/bulk' }
let(:request_body) do
{
repositories: [
Atlassian::JiraConnect::Serializers::RepositoryEntity.represent(
project,
merge_requests: [mr_with_jira_description, mr_with_jira_title],
update_sequence_id: update_sequence_id
)
]
}
end
def perform(project_id, update_sequence_id)
described_class.new.perform(project_id, update_sequence_id)
end
before do before do
stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {}) stub_request(:post, 'https://sample.atlassian.net/rest/devinfo/0.10/bulk').to_return(status: 200, body: '', headers: {})
...@@ -24,54 +47,37 @@ RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do ...@@ -24,54 +47,37 @@ RSpec.describe JiraConnect::SyncProjectWorker, factory_default: :keep do
context 'when the project is not found' do context 'when the project is not found' do
it 'does not raise an error' do it 'does not raise an error' do
expect { described_class.new.perform('non_existing_record_id', update_sequence_id) }.not_to raise_error expect { perform('non_existing_record_id', update_sequence_id) }.not_to raise_error
end end
end end
it 'avoids N+1 database queries' do it 'avoids N+1 database queries' do
control_count = ActiveRecord::QueryRecorder.new { described_class.new.perform(project.id, update_sequence_id) }.count control_count = ActiveRecord::QueryRecorder.new { perform(project.id, update_sequence_id) }.count
create(:merge_request, :unique_branches, title: 'TEST-123') create(:merge_request, :unique_branches, title: 'TEST-123')
expect { described_class.new.perform(project.id, update_sequence_id) }.not_to exceed_query_limit(control_count) expect { perform(project.id, update_sequence_id) }.not_to exceed_query_limit(control_count)
end end
it_behaves_like 'an idempotent worker' do it 'sends the request with custom update_sequence_id' do
let(:request_path) { '/rest/devinfo/0.10/bulk' } allow_next(Atlassian::JiraConnect::Client).to receive(:post)
let(:request_body) do .with(request_path, request_body)
{
repositories: [
Atlassian::JiraConnect::Serializers::RepositoryEntity.represent(
project,
merge_requests: [mr_with_jira_description, mr_with_jira_title],
update_sequence_id: update_sequence_id
)
]
}
end
it 'sends the request with custom update_sequence_id' do
allow_next_instances_of(Atlassian::JiraConnect::Client, IdempotentWorkerHelper::WORKER_EXEC_TIMES) do |client|
expect(client).to receive(:post).with(request_path, request_body)
end
subject perform(project.id, update_sequence_id)
end end
context 'when the number of merge requests to sync is higher than the limit' do context 'when the number of merge requests to sync is higher than the limit' do
let!(:most_recent_merge_request) { create(:merge_request, :unique_branches, description: 'TEST-323', title: 'TEST-123') } let!(:most_recent_merge_request) { create(:merge_request, :unique_branches, description: 'TEST-323', title: 'TEST-123') }
before do before do
stub_const("#{described_class}::MERGE_REQUEST_LIMIT", 1) stub_const("#{described_class}::MERGE_REQUEST_LIMIT", 1)
end end
it 'syncs only the most recent merge requests within the limit' do it 'syncs only the most recent merge requests within the limit' do
expect(jira_connect_sync_service).to receive(:execute) expect(jira_connect_sync_service).to receive(:execute)
.exactly(IdempotentWorkerHelper::WORKER_EXEC_TIMES).times .with(merge_requests: [most_recent_merge_request], update_sequence_id: update_sequence_id)
.with(merge_requests: [most_recent_merge_request], update_sequence_id: update_sequence_id)
subject perform(project.id, update_sequence_id)
end
end end
end end
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment