Commit 0de175ef authored by Kassio Borges's avatar Kassio Borges

BulkImport: Import a group subgroups

After importing a Group with the BulkImport feature, create a
BulkImports::Entity for each of its subgroups and enqueue a new job to
process the new entities. This way a group tree will be imported
level-by-level, with one background job per level.

The subgroups are fetched by HTTP since the GraphQL API doesn't provide
this information yet.
parent e87df54d
......@@ -42,7 +42,7 @@ class Import::BulkImportsController < ApplicationController
end
def importable_data
client.get('groups', top_level_only: true)
client.get('groups', top_level_only: true).parsed_response
end
def client
......@@ -63,7 +63,6 @@ class Import::BulkImportsController < ApplicationController
def bulk_import_params
%i[
source_type
source_name
source_full_path
destination_name
destination_namespace
......
# frozen_string_literal: true
# Entry point of the BulkImport feature.
# This service receives the connection params of a GitLab instance
# and a list of groups to be imported.
#
# Process topography:
#
# sync | async
# |
# User +--> P1 +----> Pn +---+
# | ^ | Enqueue new job
# | +-----+
#
# P1 (sync)
#
# - Create a BulkImport record
# - Create a BulkImport::Entity for each group to be imported
# - Enqueue a BulkImportWorker job (Pn) to import the given groups (entities)
#
# Pn (async)
#
# - For each group to be imported (BulkImport::Entity.with_status(:created))
# - Import the group data
# - Create entities for each subgroup of the imported group
# - Enqueue a BulkImportWorker job (Pn) to import the new entities (subgroups)
#
class BulkImportService
attr_reader :current_user, :params, :credentials
......@@ -11,7 +36,6 @@ class BulkImportService
def execute
bulk_import = create_bulk_import
bulk_import.start!
BulkImportWorker.perform_async(bulk_import.id)
end
......
......@@ -10,18 +10,6 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
worker_has_external_dependencies!
def perform(bulk_import_id)
bulk_import = BulkImport.find_by_id(bulk_import_id)
return unless bulk_import
bulk_import.entities.each do |entity|
entity.start!
BulkImports::Importers::GroupImporter.new(entity.id).execute
entity.finish!
end
bulk_import.finish!
BulkImports::Importers::GroupsImporter.new(bulk_import_id).execute
end
end
......@@ -18,7 +18,7 @@ module BulkImports
end
def get(resource, query = {})
response = with_error_handling do
with_error_handling do
Gitlab::HTTP.get(
resource_url(resource),
headers: request_headers,
......@@ -26,8 +26,22 @@ module BulkImports
query: query.merge(request_query)
)
end
end
def each_page(method, resource, query = {}, &block)
return to_enum(__method__, method, resource, query) unless block_given?
next_page = @page
response.parsed_response
while next_page
@page = next_page.to_i
response = self.public_send(method, resource, query) # rubocop: disable GitlabSecurity/PublicSend
collection = response.parsed_response
next_page = response.headers['x-next-page'].presence
yield collection
end
end
private
......
......@@ -14,11 +14,9 @@ module BulkImports
@context = context
Enumerator.new do |yielder|
context.entities.each do |entity|
result = graphql_client.execute(parsed_query, query_variables(entity))
result = graphql_client.execute(parsed_query, query_variables(context.entity))
yielder << result.original_hash.deep_dup
end
yielder << result.original_hash.deep_dup
end
end
......
# frozen_string_literal: true
module BulkImports
  module Common
    module Loaders
      # Pipeline loader that creates new entities on the bulk import owning
      # the entity currently being processed.
      class EntitiesLoader
        # Accepts (and ignores) any arguments.
        def initialize(*_args); end

        # Creates one record per attributes hash through
        # `context.entity.bulk_import.entities.create!`.
        #
        # @param context [#entity] pipeline context exposing the current entity
        # @param entities [Enumerable<Hash>] attributes of the entities to create
        # @return [Enumerable<Hash>] the `entities` collection that was given
        def load(context, entities)
          owner = context.entity.bulk_import

          entities.each { |attributes| owner.entities.create!(attributes) }
        end
      end
    end
  end
end
# frozen_string_literal: true
module BulkImports
  module Groups
    module Extractors
      # Pipeline extractor that lists the subgroups of the current entity's
      # source group through the HTTP client, one page at a time.
      class SubgroupsExtractor
        # Accepts (and ignores) any arguments.
        def initialize(*_args); end

        # Requests `groups/<url-encoded source_full_path>/subgroups` page by
        # page and collects every page yielded by the client.
        #
        # @param context [#entity] pipeline context exposing the current entity
        # @return [Array] one element per retrieved page, in retrieval order
        def extract(context)
          parent_path = ERB::Util.url_encode(context.entity.source_full_path)
          client = http_client(context.entity.bulk_import.configuration)

          [].tap do |pages|
            client.each_page(:get, "groups/#{parent_path}/subgroups") { |page| pages << page }
          end
        end

        private

        # Builds the HTTP client on first use and reuses it afterwards;
        # later calls ignore the `configuration` argument.
        def http_client(configuration)
          @http_client ||= BulkImports::Clients::Http.new(
            uri: configuration.url,
            token: configuration.access_token,
            per_page: 100
          )
        end
      end
    end
  end
end
......@@ -11,7 +11,11 @@ module BulkImports
def load(context, data)
return unless user_can_create_group?(context.current_user, data)
::Groups::CreateService.new(context.current_user, data).execute
group = ::Groups::CreateService.new(context.current_user, data).execute
context.entity.update!(group: group)
group
end
private
......
# frozen_string_literal: true
module BulkImports
module Groups
module Pipelines
# Pipeline that turns the subgroups of an imported group into new import
# entities: the extractor lists the subgroups, the transformer maps them to
# entity attributes and the loader creates the entities.
class SubgroupEntitiesPipeline
# NOTE(review): the `extractor`, `transformer` and `loader` macros below are
# presumably provided by this module - confirm.
include Pipeline
# Fetches the subgroups of the current entity's source group.
extractor BulkImports::Groups::Extractors::SubgroupsExtractor
# Maps subgroup data to entity attributes.
transformer BulkImports::Groups::Transformers::SubgroupsToEntitiesTransformer
# Creates one entity per attributes hash on the current bulk import.
loader BulkImports::Common::Loaders::EntitiesLoader
end
end
end
end
......@@ -9,7 +9,7 @@ module BulkImports
end
def transform(context, data)
import_entity = find_by_full_path(data['full_path'], context.entities)
import_entity = context.entity
data
.then { |data| transform_name(import_entity, data) }
......@@ -75,10 +75,6 @@ module BulkImports
data['subgroup_creation_level'] = Gitlab::Access.subgroup_creation_string_options[subgroup_creation_level]
data
end
def find_by_full_path(full_path, entities)
entities.find { |entity| entity.source_full_path == full_path }
end
end
end
end
......
# frozen_string_literal: true
module BulkImports
  module Groups
    module Transformers
      # Pipeline transformer that maps subgroup data to the attributes of the
      # import entities to be created for those subgroups.
      class SubgroupsToEntitiesTransformer
        # Accepts (and ignores) any arguments.
        def initialize(*_args); end

        # @param context [#entity] pipeline context; the current entity is the
        #   parent of the entities being described
        # @param data [Enumerable<Hash>] subgroup entries with 'full_path' and
        #   'name' keys
        # @return [Array<Hash>] one attributes hash per subgroup entry
        def transform(context, data)
          data.map { |subgroup| entity_attributes(context, subgroup) }
        end

        private

        # Attributes for a single subgroup: it is imported under the group of
        # the current entity and linked to that entity as its parent.
        def entity_attributes(context, subgroup)
          {
            source_type: :group_entity,
            source_full_path: subgroup['full_path'],
            destination_name: subgroup['name'],
            destination_namespace: context.entity.group.full_path,
            parent_id: context.entity.id
          }
        end
      end
    end
  end
end
# frozen_string_literal: true
# Imports a top level group into a destination
# Optionally imports into parent group
# Entity must be of type: 'group' & have parent_id: nil
# Subgroups not handled yet
module BulkImports
module Importers
class GroupImporter
def initialize(entity_id)
@entity_id = entity_id
def initialize(entity)
@entity = entity
end
def execute
return if entity.parent
entity.start!
bulk_import = entity.bulk_import
configuration = bulk_import.configuration
context = BulkImports::Pipeline::Context.new(
current_user: bulk_import.user,
entities: [entity],
entity: entity,
configuration: configuration
)
BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)
end
BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context)
def entity
@entity ||= BulkImports::Entity.find(@entity_id)
entity.finish!
end
private
attr_reader :entity
end
end
end
# frozen_string_literal: true
module BulkImports
  module Importers
    # Imports one "level" of a bulk import: every entity of the bulk import
    # that is still in the `created` status.
    class GroupsImporter
      # @param bulk_import_id [Integer] id passed to `BulkImport.find`
      def initialize(bulk_import_id)
        @bulk_import = BulkImport.find(bulk_import_id)
      end

      # Starts the bulk import when it is not started yet, then either
      # finishes it (nothing left to import) or imports the pending entities
      # and enqueues another BulkImportWorker job.
      #
      # @return the result of `finish!` or of `perform_async`
      def execute
        bulk_import.start! unless bulk_import.started?

        return bulk_import.finish! if pending_entities.empty?

        pending_entities.each do |entity|
          BulkImports::Importers::GroupImporter.new(entity).execute
        end

        # The follow-up job either processes the BulkImports::Entity records
        # created for the subgroups, or marks the `bulk_import` as finished
        # when no new entities were created.
        BulkImportWorker.perform_async(bulk_import.id)
      end

      private

      attr_reader :bulk_import

      # Entities still waiting to be imported, memoized for this run.
      def pending_entities
        @pending_entities ||= bulk_import.entities.with_status(:created)
      end
    end
  end
end
......@@ -9,7 +9,7 @@ module BulkImports
PIPELINE_ATTRIBUTES = [
Attribute.new(:current_user, User),
Attribute.new(:entities, Array),
Attribute.new(:entity, ::BulkImports::Entity),
Attribute.new(:configuration, ::BulkImports::Configuration)
].freeze
......
......@@ -55,10 +55,12 @@ RSpec.describe Import::BulkImportsController do
describe 'serialized group data' do
let(:client_response) do
[
{ 'id' => 1, 'full_name' => 'group1', 'full_path' => 'full/path/group1' },
{ 'id' => 2, 'full_name' => 'group2', 'full_path' => 'full/path/group2' }
]
double(
parsed_response: [
{ 'id' => 1, 'full_name' => 'group1', 'full_path' => 'full/path/group1' },
{ 'id' => 2, 'full_name' => 'group2', 'full_path' => 'full/path/group2' }
]
)
end
before do
......@@ -69,7 +71,7 @@ RSpec.describe Import::BulkImportsController do
it 'returns serialized group data' do
get :status, format: :json
expect(response.parsed_body).to eq({ importable_data: client_response }.as_json)
expect(json_response).to eq({ importable_data: client_response.parsed_response }.as_json)
end
end
......
......@@ -22,16 +22,6 @@ RSpec.describe BulkImports::Clients::Http do
end
end
describe 'parsed response' do
it 'returns parsed response' do
response_double = double(code: 200, success?: true, parsed_response: [{ id: 1 }, { id: 2 }])
allow(Gitlab::HTTP).to receive(:get).and_return(response_double)
expect(subject.get(resource)).to eq(response_double.parsed_response)
end
end
describe 'request query' do
include_examples 'performs network request' do
let(:expected_args) do
......@@ -91,5 +81,52 @@ RSpec.describe BulkImports::Clients::Http do
end
end
end
describe '#each_page' do
let(:objects1) { [{ object: 1 }, { object: 2 }] }
let(:objects2) { [{ object: 3 }, { object: 4 }] }
let(:response1) { double(success?: true, headers: { 'x-next-page' => 2 }, parsed_response: objects1) }
let(:response2) { double(success?: true, headers: {}, parsed_response: objects2) }
before do
stub_http_get('groups', { page: 1, per_page: 30 }, response1)
stub_http_get('groups', { page: 2, per_page: 30 }, response2)
end
context 'with a block' do
it 'yields every retrieved page to the supplied block' do
pages = []
subject.each_page(:get, 'groups') { |page| pages << page }
expect(pages[0]).to be_an_instance_of(Array)
expect(pages[1]).to be_an_instance_of(Array)
expect(pages[0]).to eq(objects1)
expect(pages[1]).to eq(objects2)
end
end
context 'without a block' do
it 'returns an Enumerator' do
expect(subject.each_page(:get, :foo)).to be_an_instance_of(Enumerator)
end
end
private
def stub_http_get(path, query, response)
uri = "http://gitlab.example:80/api/v4/#{path}"
params = {
follow_redirects: false,
headers: {
"Authorization" => "Bearer token",
"Content-Type" => "application/json"
}
}.merge(query: query)
allow(Gitlab::HTTP).to receive(:get).with(uri, params).and_return(response)
end
end
end
end
......@@ -10,7 +10,7 @@ RSpec.describe BulkImports::Common::Extractors::GraphqlExtractor do
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
entities: [import_entity]
entity: import_entity
)
end
......
# frozen_string_literal: true
require 'spec_helper'
# Checks that EntitiesLoader#load creates one BulkImports::Entity per
# attributes hash it receives.
RSpec.describe BulkImports::Common::Loaders::EntitiesLoader do
describe '#load' do
it "creates entities for the given data" do
# Parent entity that already points to its group; the loader reaches the
# bulk import through `context.entity`.
group = create(:group, path: "imported-group")
parent_entity = create(:bulk_import_entity, group: group, bulk_import: create(:bulk_import))
context = instance_double(BulkImports::Pipeline::Context, entity: parent_entity)
# Attributes for a single subgroup entity linked to the parent entity.
data = [{
source_type: :group_entity,
source_full_path: "parent/subgroup",
destination_name: "subgroup",
destination_namespace: parent_entity.group.full_path,
parent_id: parent_entity.id
}]
expect { subject.load(context, data) }.to change(BulkImports::Entity, :count).by(1)
# The most recently created entity must carry the given attributes.
subgroup_entity = BulkImports::Entity.last
expect(subgroup_entity.source_full_path).to eq 'parent/subgroup'
expect(subgroup_entity.destination_namespace).to eq 'imported-group'
expect(subgroup_entity.destination_name).to eq 'subgroup'
expect(subgroup_entity.parent_id).to eq parent_entity.id
end
end
end
......@@ -7,9 +7,11 @@ RSpec.describe BulkImports::Groups::Loaders::GroupLoader do
let(:user) { create(:user) }
let(:data) { { foo: :bar } }
let(:service_double) { instance_double(::Groups::CreateService) }
let(:entity) { create(:bulk_import_entity) }
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
entity: entity,
current_user: user
)
end
......@@ -21,6 +23,7 @@ RSpec.describe BulkImports::Groups::Loaders::GroupLoader do
it 'calls Group Create Service to create a new group' do
expect(::Groups::CreateService).to receive(:new).with(context.current_user, data).and_return(service_double)
expect(service_double).to receive(:execute)
expect(entity).to receive(:update!)
subject.load(context, data)
end
......
......@@ -7,20 +7,18 @@ RSpec.describe BulkImports::Groups::Pipelines::GroupPipeline do
let(:user) { create(:user) }
let(:parent) { create(:group) }
let(:entity) do
instance_double(
BulkImports::Entity,
create(
:bulk_import_entity,
source_full_path: 'source/full/path',
destination_name: 'My Destination Group',
destination_namespace: parent.full_path
)
end
let(:entities) { [entity] }
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
BulkImports::Pipeline::Context.new(
current_user: user,
entities: entities
entity: entity
)
end
......
# frozen_string_literal: true
require 'spec_helper'
# Checks that SubgroupEntitiesPipeline creates an entity per extracted
# subgroup and that it is wired to the expected extractor, transformer and
# loader.
RSpec.describe BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline do
describe '#run' do
let_it_be(:user) { create(:user) }
# Destination group of the parent entity.
let(:parent) { create(:group, name: 'imported-group', path: 'imported-group') }
# Created eagerly (let!) so it exists before the entity count is compared.
let!(:parent_entity) do
create(
:bulk_import_entity,
destination_namespace: parent.full_path,
group: parent
)
end
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
current_user: user,
entity: parent_entity
)
end
# A single page of subgroup data.
let(:subgroup_data) do
[
{
"name" => "subgroup",
"full_path" => "parent/subgroup"
}
]
end
subject { described_class.new }
before do
# Stub the extractor to return one page (an array of pages) instead of
# fetching the subgroups through the HTTP client.
allow_next_instance_of(BulkImports::Groups::Extractors::SubgroupsExtractor) do |extractor|
allow(extractor).to receive(:extract).and_return([subgroup_data])
end
parent.add_owner(user)
end
it 'creates entities for the subgroups' do
expect { subject.run(context) }.to change(BulkImports::Entity, :count).by(1)
# The new entity targets the parent's group and references the parent entity.
subgroup_entity = BulkImports::Entity.last
expect(subgroup_entity.source_full_path).to eq 'parent/subgroup'
expect(subgroup_entity.destination_namespace).to eq 'imported-group'
expect(subgroup_entity.destination_name).to eq 'subgroup'
expect(subgroup_entity.parent_id).to eq parent_entity.id
end
end
# Verifies the pipeline declaration itself rather than its behaviour.
describe 'pipeline parts' do
it { expect(described_class).to include_module(BulkImports::Pipeline) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Attributes) }
it { expect(described_class).to include_module(BulkImports::Pipeline::Runner) }
it 'has extractors' do
expect(described_class.extractors).to contain_exactly(
klass: BulkImports::Groups::Extractors::SubgroupsExtractor,
options: nil
)
end
it 'has transformers' do
expect(described_class.transformers).to contain_exactly(
klass: BulkImports::Groups::Transformers::SubgroupsToEntitiesTransformer,
options: nil
)
end
it 'has loaders' do
expect(described_class.loaders).to contain_exactly(
klass: BulkImports::Common::Loaders::EntitiesLoader,
options: nil
)
end
end
end
......@@ -16,12 +16,11 @@ RSpec.describe BulkImports::Groups::Transformers::GroupAttributesTransformer do
)
end
let(:entities) { [entity] }
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
current_user: user,
entities: entities
entity: entity
)
end
......
# frozen_string_literal: true
require 'spec_helper'
# Checks that SubgroupsToEntitiesTransformer maps subgroup data to entity
# attributes that point at the parent entity and its group.
RSpec.describe BulkImports::Groups::Transformers::SubgroupsToEntitiesTransformer do
describe "#transform" do
it "transforms subgroups data in entity params" do
# Only the parent's group and id are read by the transformer, so a double
# is enough for the entity.
parent = create(:group)
parent_entity = instance_double(BulkImports::Entity, group: parent, id: 1)
context = instance_double(BulkImports::Pipeline::Context, entity: parent_entity)
subgroup_data = [
{
"name" => "subgroup",
"full_path" => "parent/subgroup"
}
]
# One attributes hash is expected per subgroup entry.
expect(subject.transform(context, subgroup_data)).to contain_exactly(
source_type: :group_entity,
source_full_path: "parent/subgroup",
destination_name: "subgroup",
destination_namespace: parent.full_path,
parent_id: 1
)
end
end
end
......@@ -8,40 +8,49 @@ RSpec.describe BulkImports::Importers::GroupImporter do
let(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:bulk_import_configuration) { create(:bulk_import_configuration, bulk_import: bulk_import) }
let(:context) do
instance_double(
BulkImports::Pipeline::Context,
BulkImports::Pipeline::Context.new(
current_user: user,
entities: [bulk_import_entity],
entity: bulk_import_entity,
configuration: bulk_import_configuration
)
end
subject { described_class.new(bulk_import_entity.id) }
subject { described_class.new(bulk_import_entity) }
before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
stub_http_requests
end
describe '#execute' do
before do
allow(BulkImports::Pipeline::Context).to receive(:new).and_return(context)
end
it "starts the entity and run its pipelines" do
expect(bulk_import_entity).to receive(:start).and_call_original
expect_to_run_pipeline BulkImports::Groups::Pipelines::GroupPipeline, context: context
expect_to_run_pipeline BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, context: context
context 'when import entity does not have parent' do
it 'executes GroupPipeline' do
expect_next_instance_of(BulkImports::Groups::Pipelines::GroupPipeline) do |pipeline|
expect(pipeline).to receive(:run).with(context)
end
subject.execute
subject.execute
end
expect(bulk_import_entity.reload).to be_finished
end
end
context 'when import entity has parent' do
let(:bulk_import_entity_parent) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import, parent: bulk_import_entity_parent) }
def expect_to_run_pipeline(klass, context:)
expect_next_instance_of(klass) do |pipeline|
expect(pipeline).to receive(:run).with(context)
end
end
it 'does not execute GroupPipeline' do
expect(BulkImports::Groups::Pipelines::GroupPipeline).not_to receive(:new)
def stub_http_requests
double_response = double(
code: 200,
success?: true,
parsed_response: {},
headers: {}
)
subject.execute
end
allow_next_instance_of(BulkImports::Clients::Http) do |client|
allow(client).to receive(:get).and_return(double_response)
allow(client).to receive(:post).and_return(double_response)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Checks how GroupsImporter#execute handles a bulk import with and without
# entities waiting to be imported.
RSpec.describe BulkImports::Importers::GroupsImporter do
  let_it_be(:bulk_import) { create(:bulk_import) }

  subject { described_class.new(bulk_import.id) }

  describe '#execute' do
    context "when there are entities to be imported" do
      # Created eagerly so the entity exists when the importer looks for
      # entities to import.
      let!(:bulk_import_entity) { create(:bulk_import_entity, bulk_import: bulk_import) }

      it "starts the bulk_import and imports its entities" do
        expect(BulkImports::Importers::GroupImporter).to receive(:new)
          .with(bulk_import_entity).and_return(double(execute: true))
        # A follow-up job must be enqueued to process the next level.
        expect(BulkImportWorker).to receive(:perform_async).with(bulk_import.id)

        subject.execute

        expect(bulk_import.reload).to be_started
      end
    end

    context "when there are no entities to be imported" do
      # The previous description was copied from the example above and
      # contradicted what this example asserts.
      it "finishes the bulk_import without importing entities or enqueuing a new job" do
        expect(BulkImports::Importers::GroupImporter).not_to receive(:new)
        expect(BulkImportWorker).not_to receive(:perform_async)

        subject.execute

        expect(bulk_import.reload).to be_finished
      end
    end
  end
end
......@@ -7,7 +7,7 @@ RSpec.describe BulkImports::Pipeline::Context do
it 'initializes with permitted attributes' do
args = {
current_user: create(:user),
entities: [],
entity: create(:bulk_import_entity),
configuration: create(:bulk_import_configuration)
}
......
......@@ -43,14 +43,6 @@ RSpec.describe BulkImportService do
expect { subject.execute }.to change { BulkImports::Configuration.count }.by(1)
end
it 'updates bulk import state' do
expect_next_instance_of(BulkImport) do |bulk_import|
expect(bulk_import).to receive(:start!)
end
subject.execute
end
it 'enqueues BulkImportWorker' do
expect(BulkImportWorker).to receive(:perform_async)
......
......@@ -3,37 +3,14 @@
require 'spec_helper'
RSpec.describe BulkImportWorker do
let!(:bulk_import) { create(:bulk_import, :started) }
let!(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) }
let(:importer) { double(execute: nil) }
subject { described_class.new }
describe '#perform' do
before do
allow(BulkImports::Importers::GroupImporter).to receive(:new).and_return(importer)
end
it 'executes Group Importer' do
expect(importer).to receive(:execute)
subject.perform(bulk_import.id)
end
it 'updates bulk import and entity state' do
subject.perform(bulk_import.id)
expect(bulk_import.reload.human_status_name).to eq('finished')
expect(entity.reload.human_status_name).to eq('finished')
end
bulk_import_id = 1
context 'when bulk import could not be found' do
it 'does nothing' do
expect(bulk_import).not_to receive(:top_level_groups)
expect(bulk_import).not_to receive(:finish!)
expect(BulkImports::Importers::GroupsImporter)
.to receive(:new).with(bulk_import_id).and_return(double(execute: true))
subject.perform(non_existing_record_id)
end
described_class.new.perform(bulk_import_id)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment