Commit 0af73d70 authored by Klaus Wölfel

processing of analysis with transient items

parent 8bc4d3b0
portal = context.getPortalObject() portal = context.getPortalObject()
operation = None operation = None
use = None use_list = []
parameter_dict = {} parameter_dict = {}
transient_output_item = None
context.checkConsistency(fixit=True) context.checkConsistency(fixit=True)
initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue() initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue()
for analysis_line in context.objectValues(portal_type="Data Analysis Line"): analysis_line_list = [(a.getIntIndex(), a) for a in context.objectValues(portal_type="Data Analysis Line")]
for int_index, analysis_line in sorted(analysis_line_list):
resource = analysis_line.getResourceValue() resource = analysis_line.getResourceValue()
if resource == initial_product: if resource == initial_product:
use = analysis_line.getUse() use_list = analysis_line.getUseList()
if resource is not None: if resource is not None:
resource_portal_type = resource.getPortalType() resource_portal_type = resource.getPortalType()
else: else:
...@@ -17,16 +19,16 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"): ...@@ -17,16 +19,16 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
operation = analysis_line.getResourceValue() operation = analysis_line.getResourceValue()
else: else:
parameter = {} parameter = {}
for portal_type in ["Data Array", "Progress Indicator"] + \ for portal_type in ["Data Array", "Data Array View", "Progress Indicator"] + \
list(portal.getPortalDataSinkTypeList()) + \ list(portal.getPortalDataSinkTypeList()) + \
list(portal.getPortalDataDescriptorTypeList()): list(portal.getPortalDataDescriptorTypeList()):
value = analysis_line.getAggregateValue(portal_type=portal_type) value = analysis_line.getAggregateValue(portal_type=portal_type)
if value is not None: if value is not None:
parameter[portal_type] = value parameter[portal_type] = value
if analysis_line.getQuantity() < 0 and analysis_line.getUse() == "big_data/analysis/transient": if analysis_line.getQuantity() < 0 and "big_data/analysis/transient" in analysis_line.getUseList():
# at the moment we only support transient data arrays # at the moment we only support transient data arrays
parameter['Data Array'] = transient_input_item parameter['Data Array'] = transient_input_item
if analysis_line.getQuantity() > 0 and analysis_line.getUse() == "big_data/analysis/transient": if analysis_line.getQuantity() > 0 and "big_data/analysis/transient" in analysis_line.getUseList():
# at the moment we only support transient data arrays # at the moment we only support transient data arrays
transient_output_item = portal.data_array_module.newContent(portal_type='Data Array', transient_output_item = portal.data_array_module.newContent(portal_type='Data Array',
temp_object=True) temp_object=True)
...@@ -42,6 +44,10 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"): ...@@ -42,6 +44,10 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
parameter_dict[reference].append(parameter) parameter_dict[reference].append(parameter)
else: else:
parameter_dict[reference] = parameter parameter_dict[reference] = parameter
if transient_output_item is not None and not consuming_analysis_list:
return
script_id = operation.getScriptId() script_id = operation.getScriptId()
out = getattr(operation_analysis_line, script_id)(**parameter_dict) out = getattr(operation_analysis_line, script_id)(**parameter_dict)
...@@ -52,5 +58,5 @@ if out == 1: ...@@ -52,5 +58,5 @@ if out == 1:
context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation(consuming_analysis_list) context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation(consuming_analysis_list)
else: else:
# only stop batch ingestions # only stop batch ingestions
if use == "big_data/ingestion/batch": if "big_data/ingestion/batch" in use_list:
context.stop() context.stop()
...@@ -101,16 +101,22 @@ for movement in portal_catalog(query): ...@@ -101,16 +101,22 @@ for movement in portal_catalog(query):
resource_relative_url = resource.getRelativeUrl()) resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list: for related_movement in related_movement_list:
aggregate_set.update(related_movement.getAggregateSet()) aggregate_set.update(related_movement.getAggregateSet())
if related_movement.getUse() == "big_data/ingestion/batch": if "big_data/ingestion/batch" in related_movement.getUseList():
related_movement.getParentValue().deliver() related_movement.getParentValue().deliver()
# create new item based on item_type if it is not already aggregated # create new item based on item_type if it is not already aggregated
aggregate_type_set = set( aggregate_type_set = set(
[portal.restrictedTraverse(a).getPortalType() for a in aggregate_set]) [portal.restrictedTraverse(a).getPortalType() for a in aggregate_set])
for item_type in transformation_line.getAggregatedPortalTypeList(): for item_type in transformation_line.getAggregatedPortalTypeList():
# create item if it does not exist yet. # Create item if it does not exist yet.
# Except if it is a Data Array Line, then it is currently created by # Do not create item if it is a Data Array Line, then it is created by
# data operation itself (probably this exception is inconsistent) # data operation itself (probably this exception is inconsistent).
if item_type not in aggregate_type_set and item_type != "Data Array Line": # Do not create item if it is a transient Data Array.
# Do not create item if it is an input Data Array
if item_type not in aggregate_type_set \
and item_type != "Data Array Line" \
and not (item_type == "Data Array" \
and "big_data/analysis/transient" in transformation_line.getUseList() ) \
and not (quantity < 0 and item_type == "Data Array"):
item = portal.portal_catalog.getResultValue( item = portal.portal_catalog.getResultValue(
portal_type=item_type, portal_type=item_type,
validation_state="validated", validation_state="validated",
...@@ -141,7 +147,9 @@ for movement in portal_catalog(query): ...@@ -141,7 +147,9 @@ for movement in portal_catalog(query):
if line.getResourceValue().getPortalType() == "Data Operation": if line.getResourceValue().getPortalType() == "Data Operation":
aggregate_set.update(line.getAggregateList()) aggregate_set.update(line.getAggregateList())
data_analysis.newContent( tag = "%s-%s" %(data_analysis.getUid(), transformation_line.getUid())
data_analysis_line = data_analysis.newContent(
activate_kw={'tag': tag},
portal_type = "Data Analysis Line", portal_type = "Data Analysis Line",
title = transformation_line.getTitle(), title = transformation_line.getTitle(),
reference = transformation_line.getReference(), reference = transformation_line.getReference(),
...@@ -150,7 +158,15 @@ for movement in portal_catalog(query): ...@@ -150,7 +158,15 @@ for movement in portal_catalog(query):
variation_category_list = transformation_line.getVariationCategoryList(), variation_category_list = transformation_line.getVariationCategoryList(),
quantity = quantity, quantity = quantity,
quantity_unit = transformation_line.getQuantityUnit(), quantity_unit = transformation_line.getQuantityUnit(),
use = transformation_line.getUse(), use_list = transformation_line.getUseList(),
aggregate_set = aggregate_set) aggregate_set = aggregate_set)
# fix consistency of line and all affected items. Do it after reindexing
# activities of newly created Data Analysis Line finished, because check
# consistency script might need to find the newly created Data Analysis
# Line in catalog.
data_analysis_line.checkConsistency(fixit=True)
for item in data_analysis_line.getAggregateValueList():
item.activate(after_tag=tag).checkConsistency(fixit=True)
data_analysis.start() data_analysis.start()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment