Commit 5f2cf8aa authored by George Koltsov's avatar George Koltsov

BulkImport - cache epics source id in redis for later use

  - Store epic ids from source GitLab instance during
    Bulk Import migration in order to use them in other
    pipelines. Perform cleanup once entity is finished
    or failed.
parent c2389dcf
......@@ -66,6 +66,16 @@ class BulkImports::Entity < ApplicationRecord
event :fail_op do
transition any => :failed
end
after_transition any => [:finished, :failed] do |entity|
Gitlab::Redis::Cache.with do |redis|
pattern = "bulk_import:#{entity.bulk_import.id}:entity:#{entity.id}:*"
redis.scan_each(match: pattern).each do |key|
redis.del(key)
end
end
end
end
def update_tracker_for(relation:, has_next_page:, next_page: nil)
......
......@@ -21,6 +21,7 @@ module EE
has_next_page: hasNextPage
}
nodes {
id
iid
title
description
......
......@@ -13,6 +13,10 @@ module EE
transformer ::BulkImports::Common::Transformers::ProhibitedAttributesTransformer
transformer EE::BulkImports::Groups::Transformers::EpicAttributesTransformer
def transform(_, data)
cache_epic_source_params(data)
end
def load(context, data)
raise ::BulkImports::Pipeline::NotAllowedError unless authorized?
......@@ -36,6 +40,22 @@ module EE
def authorized?
context.current_user.can?(:admin_epic, context.group)
end
def cache_epic_source_params(data)
source_id = GlobalID.parse(data['id'])&.model_id
source_iid = data['iid']
if source_id
cache_key = "bulk_import:#{context.bulk_import.id}:entity:#{context.entity.id}:epic:#{source_iid}"
source_params = { source_id: source_id }
::Gitlab::Redis::Cache.with do |redis|
redis.set(cache_key, source_params.to_json, ex: ::BulkImports::Pipeline::CACHE_KEY_EXPIRATION)
end
end
data
end
end
end
end
......
......@@ -2,7 +2,7 @@
require 'spec_helper'
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline, :clean_gitlab_redis_cache do
let_it_be(:cursor) { 'cursor' }
let_it_be(:user) { create(:user) }
let_it_be(:group) { create(:group) }
......@@ -30,7 +30,7 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
describe '#run' do
it 'imports group epics into destination group' do
first_page = extractor_data(has_next_page: true, cursor: cursor)
last_page = extractor_data(has_next_page: false)
last_page = extractor_data(has_next_page: false, page: 2)
allow_next_instance_of(BulkImports::Common::Extractors::GraphqlExtractor) do |extractor|
allow(extractor)
......@@ -93,6 +93,20 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end
end
describe '#transform' do
it 'caches epic source id in redis' do
data = { 'id' => 'gid://gitlab/Epic/1', 'iid' => 1 }
cache_key = "bulk_import:#{bulk_import.id}:entity:#{entity.id}:epic:#{data['iid']}"
source_params = { source_id: '1' }.to_json
::Gitlab::Redis::Cache.with do |redis|
expect(redis).to receive(:set).with(cache_key, source_params, ex: ::BulkImports::Pipeline::CACHE_KEY_EXPIRATION)
end
subject.transform(context, data)
end
end
describe '#after_run' do
context 'when extracted data has next page' do
it 'updates tracker information and runs pipeline again' do
......@@ -148,9 +162,11 @@ RSpec.describe EE::BulkImports::Groups::Pipelines::EpicsPipeline do
end
end
def extractor_data(has_next_page:, cursor: nil)
def extractor_data(has_next_page:, cursor: nil, page: 1)
data = [
{
'id' => "gid://gitlab/Epic/#{page}",
'iid' => page,
'title' => 'epic1',
'state' => 'closed',
'confidential' => true,
......
......@@ -9,6 +9,8 @@ module BulkImports
NotAllowedError = Class.new(StandardError)
CACHE_KEY_EXPIRATION = 2.hours
def initialize(context)
@context = context
end
......
......@@ -189,4 +189,20 @@ RSpec.describe BulkImports::Entity, type: :model do
expect(entity.next_page_for(:relation)).to eq('nextPage')
end
end
describe 'caching', :clean_gitlab_redis_cache do
let(:entity) { create(:bulk_import_entity, :started) }
it 'removes entity cache keys' do
cache_key = "bulk_import:#{entity.bulk_import.id}:entity:#{entity.id}:relation:1"
Gitlab::Redis::Cache.with do |redis|
redis.set(cache_key, 1)
expect(redis).to receive(:del).with(cache_key)
end
entity.finish!
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment