Commit 58a42da3 authored by Kassio Borges's avatar Kassio Borges

Add BulkImports::Groups::EpicsPipeline

Add the ability to import Group epics with the Group Migration
(BulkImports) feature.
parent 72fa89b7
...@@ -59,6 +59,25 @@ class BulkImports::Entity < ApplicationRecord ...@@ -59,6 +59,25 @@ class BulkImports::Entity < ApplicationRecord
end end
end end
def update_tracker_for(relation:, has_next_page:, next_page: nil)
attributes = {
relation: relation,
has_next_page: has_next_page,
next_page: next_page,
bulk_import_entity_id: id
}
trackers.upsert(attributes, unique_by: %i[bulk_import_entity_id relation])
end
def has_next_page?(relation)
trackers.find_by(relation: relation)&.has_next_page
end
def next_page_for(relation)
trackers.find_by(relation: relation)&.next_page
end
private private
def validate_parent_is_a_group def validate_parent_is_a_group
......
# frozen_string_literal: true
module BulkImports
module EE
module Groups
module Graphql
module GetEpicsQuery
extend self
def to_s
<<-'GRAPHQL'
query($full_path: ID!, $cursor: String) {
group(fullPath: $full_path) {
epics(
includeDescendantGroups: false,
first: 100,
after: $cursor
) {
pageInfo {
endCursor
hasNextPage
}
nodes {
title
description
state
createdAt
closedAt
startDate
startDateFixed
startDateIsFixed
dueDateFixed
dueDateIsFixed
relativePosition
confidential
}
}
}
}
GRAPHQL
end
def variables(entity)
{
full_path: entity.source_full_path,
cursor: entity.next_page_for(:epics)
}
end
end
end
end
end
end
# frozen_string_literal: true
module BulkImports
module EE
module Groups
module Loaders
class EpicsLoader
def initialize(options = {})
@options = options
end
def load(context, data)
Array.wrap(data['nodes']).each do |args|
::Epics::CreateService.new(
context.entity.group,
context.current_user,
args
).execute
end
context.entity.update_tracker_for(
relation: :epics,
has_next_page: data.dig('page_info', 'has_next_page'),
next_page: data.dig('page_info', 'end_cursor')
)
end
end
end
end
end
end
# frozen_string_literal: true
module BulkImports
module EE
module Groups
module Pipelines
class EpicsPipeline
include ::BulkImports::Pipeline
extractor ::BulkImports::Common::Extractors::GraphqlExtractor,
query: BulkImports::EE::Groups::Graphql::GetEpicsQuery
transformer ::BulkImports::Common::Transformers::HashKeyDigger,
key_path: %w[data group epics]
transformer ::BulkImports::Common::Transformers::UnderscorifyKeysTransformer
loader BulkImports::EE::Groups::Loaders::EpicsLoader
after_run do |context|
if context.entity.has_next_page?(:epics)
self.new.run(context)
end
end
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::EE::Groups::Loaders::EpicsLoader do
describe '#load' do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:entity) { create(:bulk_import_entity, group: group) }
let(:context) do
BulkImports::Pipeline::Context.new(
entity: entity,
current_user: user
)
end
let(:data) do
{
'page_info' => {
'end_cursor' => 'endCursorValue',
'has_next_page' => true
},
'nodes' => [
{
'title' => 'epic1',
'state' => 'opened',
'confidential' => false
},
{
'title' => 'epic2',
'state' => 'closed',
'confidential' => true
}
]
}
end
subject { described_class.new }
it 'creates the epics and update the entity tracker' do
expect { subject.load(context, data) }.to change(::Epic, :count).by(2)
tracker = entity.trackers.last
expect(group.epics.count).to eq(2)
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq('endCursorValue')
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::EE::Groups::Pipelines::EpicsPipeline do
describe '#run' do
let(:user) { create(:user) }
let(:group) { create(:group) }
let(:entity) do
create(
:bulk_import_entity,
source_full_path: 'source/full/path',
destination_name: 'My Destination Group',
destination_namespace: group.full_path,
group: group
)
end
let(:context) do
BulkImports::Pipeline::Context.new(
current_user: user,
entity: entity
)
end
subject { described_class.new }
it 'imports group epics into destination group' do
page1 = extractor_data(has_next_page: true, cursor: 'nextPageCursor')
page2 = extractor_data(has_next_page: false)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
if entity.has_next_page?(:epics)
allow(extractor).to receive(:extract).and_return(page2)
else
allow(extractor).to receive(:extract).and_return(page1)
end
end
expect { subject.run(context) }.to change(::Epic, :count).by(2)
end
end
describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Attributes) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do
expect(described_class.extractors)
.to contain_exactly(
{
klass: BulkImports::Common::Extractors::GraphqlExtractor,
options: {
query: BulkImports::EE::Groups::Graphql::GetEpicsQuery
}
}
)
end
it 'has transformers' do
expect(described_class.transformers)
.to contain_exactly(
{ klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group epics] } },
{ klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil }
)
end
it 'has loaders' do
expect(described_class.loaders).to contain_exactly({
klass: BulkImports::EE::Groups::Loaders::EpicsLoader, options: nil
})
end
end
def extractor_data(has_next_page:, cursor: nil)
[
{
'data' => {
'group' => {
'epics' => {
'page_info' => {
'end_cursor' => cursor,
'has_next_page' => has_next_page
},
'nodes' => [
{
'title' => 'epic1',
'state' => 'closed',
'confidential' => true
}
]
}
}
}
}
]
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Importers::GroupImporter do
let(:user) { create(:user) }
let(:bulk_import) { create(:bulk_import) }
let(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let(:context) do
BulkImports::Pipeline::Context.new(
current_user: user,
entity: bulk_import_entity,
configuration: bulk_import_configuration
)
end
subject { described_class.new(bulk_import_entity) }
before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
end
describe '#execute' do
it "starts the entity and run its pipelines" do
expect(bulk_import_entity).to receive(:start).and_call_original
expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
expect_to_run_pipeline BulkImports::EE::Groups::Pipelines::EpicsPipeline, context: context
expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
subject.execute
expect(bulk_import_entity.reload).to be_finished
end
end
def expect_to_run_pipeline(klass, context:)
expect_next_instance_of(klass) do |pipeline|
expect(pipeline).to receive(:run).with(context)
end
end
end
...@@ -6,15 +6,16 @@ module BulkImports ...@@ -6,15 +6,16 @@ module BulkImports
class GraphqlExtractor class GraphqlExtractor
def initialize(query) def initialize(query)
@query = query[:query] @query = query[:query]
@query_string = @query.to_s
@variables = @query.variables
end end
def extract(context) def extract(context)
@context = context client = graphql_client(context)
Enumerator.new do |yielder| Enumerator.new do |yielder|
result = graphql_client.execute(parsed_query, query_variables(context.entity)) result = client.execute(
client.parse(query.to_s),
query.variables(context.entity)
)
yielder << result.original_hash.deep_dup yielder << result.original_hash.deep_dup
end end
...@@ -22,23 +23,17 @@ module BulkImports ...@@ -22,23 +23,17 @@ module BulkImports
private private
def graphql_client attr_reader :query
def graphql_client(context)
@graphql_client ||= BulkImports::Clients::Graphql.new( @graphql_client ||= BulkImports::Clients::Graphql.new(
url: @context.configuration.url, url: context.configuration.url,
token: @context.configuration.access_token token: context.configuration.access_token
) )
end end
def parsed_query def parsed_query
@parsed_query ||= graphql_client.parse(@query.to_s) @parsed_query ||= graphql_client.parse(query.to_s)
end
def query_variables(entity)
return unless @variables
@variables.transform_values do |entity_attribute|
entity.public_send(entity_attribute) # rubocop:disable GitlabSecurity/PublicSend
end
end end
end end
end end
......
# frozen_string_literal: true
# Cleanup GraphQL original response hash from unnecessary nesting
# 1. Remove ['data']['group'] or ['data']['project'] hash nesting
# 2. Remove ['edges'] & ['nodes'] array wrappings
# 3. Remove ['node'] hash wrapping
#
# @example
# data = {"data"=>{"group"=> {
# "name"=>"test",
# "fullName"=>"test",
# "description"=>"test",
# "labels"=>{"edges"=>[{"node"=>{"title"=>"label1"}}, {"node"=>{"title"=>"label2"}}, {"node"=>{"title"=>"label3"}}]}}}}
#
# BulkImports::Common::Transformers::GraphqlCleanerTransformer.new.transform(nil, data)
#
# {"name"=>"test", "fullName"=>"test", "description"=>"test", "labels"=>[{"title"=>"label1"}, {"title"=>"label2"}, {"title"=>"label3"}]}
module BulkImports
module Common
module Transformers
class GraphqlCleanerTransformer
EDGES = 'edges'
NODE = 'node'
def initialize(options = {})
@options = options
end
def transform(_, data)
return data unless data.is_a?(Hash)
data = data.dig('data', 'group') || data.dig('data', 'project') || data
clean_edges_and_nodes(data)
end
def clean_edges_and_nodes(data)
case data
when Array
data.map(&method(:clean_edges_and_nodes))
when Hash
if data.key?(NODE)
clean_edges_and_nodes(data[NODE])
else
data.transform_values { |value| clean_edges_and_nodes(value.try(:fetch, EDGES, value) || value) }
end
else
data
end
end
end
end
end
end
# frozen_string_literal: true
module BulkImports
module Common
module Transformers
class HashKeyDigger
def initialize(options = {})
@key_path = options[:key_path]
end
def transform(_, data)
raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash)
data.dig(*Array.wrap(key_path))
end
private
attr_reader :key_path
end
end
end
end
...@@ -29,8 +29,8 @@ module BulkImports ...@@ -29,8 +29,8 @@ module BulkImports
GRAPHQL GRAPHQL
end end
def variables def variables(entity)
{ full_path: :source_full_path } { full_path: entity.source_full_path }
end end
end end
end end
......
...@@ -8,7 +8,7 @@ module BulkImports ...@@ -8,7 +8,7 @@ module BulkImports
extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
transformer Common::Transformers::GraphqlCleanerTransformer transformer Common::Transformers::HashKeyDigger, key_path: %w[data group]
transformer Common::Transformers::UnderscorifyKeysTransformer transformer Common::Transformers::UnderscorifyKeysTransformer
transformer Groups::Transformers::GroupAttributesTransformer transformer Groups::Transformers::GroupAttributesTransformer
......
...@@ -19,6 +19,7 @@ module BulkImports ...@@ -19,6 +19,7 @@ module BulkImports
) )
BulkImports::Groups::Pipelines::GroupPipeline.new.run(context) BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)
'BulkImports::EE::Groups::Pipelines::EpicsPipeline'.constantize.new.run(context) if Gitlab.ee?
BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context) BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context)
entity.finish! entity.finish!
......
...@@ -19,6 +19,10 @@ module BulkImports ...@@ -19,6 +19,10 @@ module BulkImports
add_attribute(:loaders, klass, options) add_attribute(:loaders, klass, options)
end end
def after_run(&block)
class_attributes[:after_run] = block
end
def add_attribute(sym, klass, options) def add_attribute(sym, klass, options)
class_attributes[sym] ||= [] class_attributes[sym] ||= []
class_attributes[sym] << { klass: klass, options: options } class_attributes[sym] << { klass: klass, options: options }
...@@ -35,6 +39,10 @@ module BulkImports ...@@ -35,6 +39,10 @@ module BulkImports
def loaders def loaders
class_attributes[:loaders] class_attributes[:loaders]
end end
def after_run_callback
class_attributes[:after_run]
end
end end
end end
end end
......
...@@ -20,6 +20,10 @@ module BulkImports ...@@ -20,6 +20,10 @@ module BulkImports
@loaders ||= self.class.loaders.map(&method(:instantiate)) @loaders ||= self.class.loaders.map(&method(:instantiate))
end end
def after_run
@after_run ||= self.class.after_run_callback
end
def pipeline_name def pipeline_name
@pipeline ||= self.class.name @pipeline ||= self.class.name
end end
...@@ -47,6 +51,8 @@ module BulkImports ...@@ -47,6 +51,8 @@ module BulkImports
end end
end end
end end
after_run.call(context) if after_run.present?
end end
private # rubocop:disable Lint/UselessAccessModifier private # rubocop:disable Lint/UselessAccessModifier
......
...@@ -41,12 +41,11 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do ...@@ -41,12 +41,11 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do
end end
context 'when variables are present' do context 'when variables are present' do
let(:query) { { query: double(to_s: 'test', variables: { full_path: :source_full_path }) } } let(:variables) { { foo: :bar } }
let(:query) { { query: double(to_s: 'test', variables: variables) } }
it 'builds graphql query variables for import entity' do it 'builds graphql query variables for import entity' do
expected_variables = { full_path: import_entity.source_full_path } expect(graphql_client).to receive(:execute).with(anything, variables)
expect(graphql_client).to receive(:execute).with(anything, expected_variables)
subject.extract(context).first subject.extract(context).first
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Transformers::GraphqlCleanerTransformer do
describe '#transform' do
let_it_be(:expected_output) do
{
'name' => 'test',
'fullName' => 'test',
'description' => 'test',
'labels' => [
{ 'title' => 'label1' },
{ 'title' => 'label2' },
{ 'title' => 'label3' }
]
}
end
it 'deep cleans hash from GraphQL keys' do
data = {
'data' => {
'group' => {
'name' => 'test',
'fullName' => 'test',
'description' => 'test',
'labels' => {
'edges' => [
{ 'node' => { 'title' => 'label1' } },
{ 'node' => { 'title' => 'label2' } },
{ 'node' => { 'title' => 'label3' } }
]
}
}
}
}
transformed_data = described_class.new.transform(nil, data)
expect(transformed_data).to eq(expected_output)
end
context 'when data does not have data/group nesting' do
it 'deep cleans hash from GraphQL keys' do
data = {
'name' => 'test',
'fullName' => 'test',
'description' => 'test',
'labels' => {
'edges' => [
{ 'node' => { 'title' => 'label1' } },
{ 'node' => { 'title' => 'label2' } },
{ 'node' => { 'title' => 'label3' } }
]
}
}
transformed_data = described_class.new.transform(nil, data)
expect(transformed_data).to eq(expected_output)
end
end
context 'when data is not a hash' do
it 'does not perform transformation' do
data = 'test'
transformed_data = described_class.new.transform(nil, data)
expect(transformed_data).to eq(data)
end
end
context 'when nested data is not an array or hash' do
it 'only removes top level data/group keys' do
data = {
'data' => {
'group' => 'test'
}
}
transformed_data = described_class.new.transform(nil, data)
expect(transformed_data).to eq('test')
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe BulkImports::Common::Transformers::HashKeyDigger do
describe '#transform' do
it 'when the key_path is an array' do
data = { foo: { bar: :value } }
key_path = %i[foo bar]
transformed = described_class.new(key_path: key_path).transform(nil, data)
expect(transformed).to eq(:value)
end
it 'when the key_path is not an array' do
data = { foo: { bar: :value } }
key_path = :foo
transformed = described_class.new(key_path: key_path).transform(nil, data)
expect(transformed).to eq({ bar: :value })
end
it "when the data is not a hash" do
expect { described_class.new(key_path: nil).transform(nil, nil) }
.to raise_error(ArgumentError, "Given data must be a Hash")
end
end
end
...@@ -90,13 +90,16 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do ...@@ -90,13 +90,16 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
it 'has transformers' do it 'has transformers' do
expect(described_class.transformers) expect(described_class.transformers)
.to contain_exactly( .to contain_exactly(
{ klass: BulkImports::Common::Transformers::GraphqlCleanerTransformer, options: nil }, { klass: BulkImports::Common::Transformers::HashKeyDigger, options: { key_path: %w[data group] } },
{ klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil }, { klass: BulkImports::Common::Transformers::UnderscorifyKeysTransformer, options: nil },
{ klass: BulkImports::Groups::Transformers::GroupAttributesTransformer, options: nil }) { klass: BulkImports::Groups::Transformers::GroupAttributesTransformer, options: nil }
)
end end
it 'has loaders' do it 'has loaders' do
expect(described_class.loaders).to contain_exactly({ klass: BulkImports::Groups::Loaders::GroupLoader, options: nil }) expect(described_class.loaders).to contain_exactly({
klass: BulkImports::Groups::Loaders::GroupLoader, options: nil
})
end end
end end
end end
...@@ -18,8 +18,8 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -18,8 +18,8 @@ RSpec.describe BulkImports::Importers::GroupImporter do
subject { described_class.new(bulk_import_entity) } subject { described_class.new(bulk_import_entity) }
before do before do
allow(Gitlab).to receive(:ee?).and_return(false)
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context) allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
stub_http_requests
end end
describe '#execute' do describe '#execute' do
...@@ -39,18 +39,4 @@ RSpec.describe BulkImports::Importers::GroupImporter do ...@@ -39,18 +39,4 @@ RSpec.describe BulkImports::Importers::GroupImporter do
expect(pipeline).to receive(:run).with(context) expect(pipeline).to receive(:run).with(context)
end end
end end
def stub_http_requests
double_response = double(
code: 200,
success?: true,
parsed_response: {},
headers: {}
)
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:get).and_return(double_response)
allow(client).to receive(:post).and_return(double_response)
end
end
end end
...@@ -82,4 +82,68 @@ RSpec.describe BulkImports::Entity, type: :model do ...@@ -82,4 +82,68 @@ RSpec.describe BulkImports::Entity, type: :model do
end end
end end
end end
describe "#update_tracker_for" do
let(:entity) { create(:bulk_import_entity) }
it "inserts new tracker when it does not exist" do
expect do
entity.update_tracker_for(relation: :relation, has_next_page: false)
end.to change(BulkImports::Tracker, :count).by(1)
tracker = entity.trackers.last
expect(tracker.relation).to eq('relation')
expect(tracker.has_next_page).to eq(false)
expect(tracker.next_page).to eq(nil)
end
it "updates the tracker if it already exist" do
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
entity: entity
)
expect do
entity.update_tracker_for(relation: :relation, has_next_page: true, next_page: 'nextPage')
end.not_to change(BulkImports::Tracker, :count)
tracker = entity.trackers.last
expect(tracker.relation).to eq('relation')
expect(tracker.has_next_page).to eq(true)
expect(tracker.next_page).to eq('nextPage')
end
end
describe "#has_next_page?" do
it "queries for the given relation if it has more pages to be fetched" do
entity = create(:bulk_import_entity)
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
entity: entity
)
expect(entity.has_next_page?(:relation)).to eq(false)
end
end
describe "#next_page_for" do
it "queries for the next page of the given relation" do
entity = create(:bulk_import_entity)
create(
:bulk_import_tracker,
relation: :relation,
has_next_page: false,
next_page: 'nextPage',
entity: entity
)
expect(entity.next_page_for(:relation)).to eq('nextPage')
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment