Commit 10dab323 authored by Levin Zimmermann's avatar Levin Zimmermann

Add transient use for Data Analysis

See merge request !101
parents 1098afbf 38b821af
...@@ -31,6 +31,7 @@ from Products.ERP5Type import Permissions, PropertySheet ...@@ -31,6 +31,7 @@ from Products.ERP5Type import Permissions, PropertySheet
from erp5.component.document.BigFile import BigFile from erp5.component.document.BigFile import BigFile
from wendelin.bigarray.array_zodb import ZBigArray from wendelin.bigarray.array_zodb import ZBigArray
from erp5.component.document.File import _MARKER from erp5.component.document.File import _MARKER
from wendelin.bigarray.array_ram import RAMArray
from ZPublisher import HTTPRangeSupport from ZPublisher import HTTPRangeSupport
from webdav.common import rfc1123_date from webdav.common import rfc1123_date
from DateTime import DateTime from DateTime import DateTime
...@@ -60,7 +61,10 @@ class DataArray(BigFile): ...@@ -60,7 +61,10 @@ class DataArray(BigFile):
""" """
Initialise array. Initialise array.
""" """
array = ZBigArray(shape, dtype) if self.__module__ == "erp5.temp_portal_type":
array = RAMArray(shape, dtype)
else:
array = ZBigArray(shape, dtype)
self._setArray(array) self._setArray(array)
return array return array
......
portal = context.getPortalObject() portal = context.getPortalObject()
operation = None operation = None
use = None use_list = []
parameter_dict = {} parameter_dict = {}
transient_output_item = None
context.checkConsistency(fixit=True) context.checkConsistency(fixit=True)
initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue() initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue()
for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line"), for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line"),
key=lambda x: x.getIntIndex()): key=lambda x: x.getIntIndex()):
resource = analysis_line.getResourceValue() resource = analysis_line.getResourceValue()
if resource == initial_product: if resource == initial_product:
use = analysis_line.getUse() use_list = analysis_line.getUseList()
if resource is not None: if resource is not None:
resource_portal_type = resource.getPortalType() resource_portal_type = resource.getPortalType()
else: else:
...@@ -19,13 +20,20 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line ...@@ -19,13 +20,20 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line
operation = analysis_line.getResourceValue() operation = analysis_line.getResourceValue()
else: else:
parameter = {} parameter = {}
for portal_type in ["Data Array", "Progress Indicator"] + \ for portal_type in ["Data Array", "Data Array View", "Progress Indicator"] + \
list(portal.getPortalDataSinkTypeList()) + \ list(portal.getPortalDataSinkTypeList()) + \
list(portal.getPortalDataDescriptorTypeList()): list(portal.getPortalDataDescriptorTypeList()):
value = analysis_line.getAggregateValue(portal_type=portal_type) value = analysis_line.getAggregateValue(portal_type=portal_type)
if value is not None: if value is not None:
parameter[portal_type] = value parameter[portal_type] = value
if analysis_line.getQuantity() < 0 and "big_data/analysis/transient" in analysis_line.getUseList():
# at the moment we only support transient data arrays
parameter['Data Array'] = transient_input_item
if analysis_line.getQuantity() > 0 and "big_data/analysis/transient" in analysis_line.getUseList():
# at the moment we only support transient data arrays
transient_output_item = portal.data_array_module.newContent(portal_type='Data Array',
temp_object=True)
parameter['Data Array'] = transient_output_item
for base_category in analysis_line.getVariationRangeBaseCategoryList(): for base_category in analysis_line.getVariationRangeBaseCategoryList():
parameter[base_category] = analysis_line.getVariationCategoryItemList( parameter[base_category] = analysis_line.getVariationCategoryItemList(
base_category_list=(base_category,))[0][0] base_category_list=(base_category,))[0][0]
...@@ -43,16 +51,21 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line ...@@ -43,16 +51,21 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line
parameter_dict[reference].append(parameter) parameter_dict[reference].append(parameter)
else: else:
parameter_dict[reference] = parameter parameter_dict[reference] = parameter
if transient_output_item is not None and not consuming_analysis_list:
return
script_id = operation.getScriptId() script_id = operation.getScriptId()
out = getattr(operation_analysis_line, script_id)(**parameter_dict) out = getattr(operation_analysis_line, script_id)(**parameter_dict)
for consuming_analysis in consuming_analysis_list:
portal.restrictedTraverse(consuming_analysis).DataAnalysis_executeDataOperation(transient_input_item = transient_output_item)
if out == 1: if out == 1:
context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation() context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation(consuming_analysis_list)
else: else:
# only stop batch ingestions # only stop batch ingestions
if use == "big_data/ingestion/batch": if "big_data/ingestion/batch" in use_list:
context.stop() context.stop()
# stop refresh # stop refresh
if context.getRefreshState() == "refresh_started": if context.getRefreshState() == "refresh_started":
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string></string> </value> <value> <string>consuming_analysis_list=[], transient_input_item=None</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
...@@ -121,13 +121,13 @@ for movement in portal_catalog(query = query): ...@@ -121,13 +121,13 @@ for movement in portal_catalog(query = query):
resource_relative_url = resource.getRelativeUrl()) resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list: for related_movement in related_movement_list:
#aggregate_set.update(related_movement.getAggregateSet()) if "big_data/ingestion/batch" in related_movement.getUseList():
related_movement.getParentValue().deliver() related_movement.getParentValue().deliver()
# create new item based on item_type if it is not already aggregated # create new item based on item_type if it is not already aggregated
aggregate_type_set = set( aggregate_type_set = set(
[portal.restrictedTraverse(a).getPortalType() for a in aggregate_set]) [portal.restrictedTraverse(a).getPortalType() for a in aggregate_set])
for item_type in transformation_line.getAggregatedPortalTypeList(): for item_type in transformation_line.getAggregatedPortalTypeList():
# if item is not yet aggregated to this line, search it by related project # if item is not yet aggregated to this line, search it by related project
# and source If the item is a data configuration or a device configuration # and source If the item is a data configuration or a device configuration
...@@ -135,22 +135,25 @@ for movement in portal_catalog(query = query): ...@@ -135,22 +135,25 @@ for movement in portal_catalog(query = query):
# the variation nor the related sensor. Data Array Lines are created # the variation nor the related sensor. Data Array Lines are created
# by Data Operation. # by Data Operation.
if item_type not in aggregate_type_set: if all(
[
# Do not create item if it is a Data Array Line, then it is created by data operation itself.
item_type not in aggregate_type_set,
# Do not create item if it is a transient Data Array.
not (item_type == "Data Array" and "big_data/analysis/transient" in transformation_line.getUseList()),
]
):
item = None
if item_type in portal.getPortalDeviceConfigurationTypeList() + portal.getPortalDataConfigurationTypeList(): if item_type in portal.getPortalDeviceConfigurationTypeList() + portal.getPortalDataConfigurationTypeList():
if item_type != "Status Configuration":
if item_type == "Status Configuration":
item = None
else:
item = portal.portal_catalog.getResultValue( item = portal.portal_catalog.getResultValue(
portal_type=item_type, portal_type=item_type,
#validation_state="validated", #validation_state="validated",
item_project_relative_url=delivery.getDestinationProject(), item_project_relative_url=delivery.getDestinationProject(),
item_source_relative_url=delivery.getSource()) item_source_relative_url=delivery.getSource())
elif item_type != "Data Array Line": elif item_type != "Data Array Line":
item_query_dict = dict( item_query_dict = dict(
portal_type=item_type, portal_type=item_type,
validation_state="validated", validation_state="validated",
...@@ -159,8 +162,10 @@ for movement in portal_catalog(query = query): ...@@ -159,8 +162,10 @@ for movement in portal_catalog(query = query):
item_resource_uid=resource.getUid(), item_resource_uid=resource.getUid(),
item_source_relative_url=data_analysis.getSource()) item_source_relative_url=data_analysis.getSource())
if data_analysis.getDestinationProjectValue() is not None: if data_analysis.getDestinationProjectValue() is not None:
item_query_dict["item_project_relative_url"] = data_analysis.getDestinationProject() item_query_dict["item_project_relative_url"] = data_analysis.getDestinationProject()
item = portal.portal_catalog.getResultValue(**item_query_dict) item = portal.portal_catalog.getResultValue(**item_query_dict)
if item is None: if item is None:
...@@ -176,7 +181,9 @@ for movement in portal_catalog(query = query): ...@@ -176,7 +181,9 @@ for movement in portal_catalog(query = query):
pass pass
aggregate_set.add(item.getRelativeUrl()) aggregate_set.add(item.getRelativeUrl())
tag = "%s-%s" %(data_analysis.getUid(), transformation_line.getUid())
data_analysis_line = data_analysis.newContent( data_analysis_line = data_analysis.newContent(
activate_kw={'tag': tag},
portal_type = "Data Analysis Line", portal_type = "Data Analysis Line",
title = transformation_line.getTitle(), title = transformation_line.getTitle(),
reference = transformation_line.getReference(), reference = transformation_line.getReference(),
...@@ -185,7 +192,7 @@ for movement in portal_catalog(query = query): ...@@ -185,7 +192,7 @@ for movement in portal_catalog(query = query):
variation_category_list = transformation_line.getVariationCategoryList(), variation_category_list = transformation_line.getVariationCategoryList(),
quantity = quantity, quantity = quantity,
quantity_unit = transformation_line.getQuantityUnit(), quantity_unit = transformation_line.getQuantityUnit(),
use = transformation_line.getUse(), use_list = transformation_line.getUseList(),
aggregate_set = aggregate_set) aggregate_set = aggregate_set)
# for intput lines of first level analysis set causality and specialise # for intput lines of first level analysis set causality and specialise
if quantity < 0 and delivery.getPortalType() == "Data Ingestion": if quantity < 0 and delivery.getPortalType() == "Data Ingestion":
...@@ -193,7 +200,14 @@ for movement in portal_catalog(query = query): ...@@ -193,7 +200,14 @@ for movement in portal_catalog(query = query):
causality_value = delivery, causality_value = delivery,
specialise_value_list = data_supply_list) specialise_value_list = data_supply_list)
data_analysis.checkConsistency(fixit=True) # fix consistency of line and all affected items. Do it after reindexing
# activities of newly created Data Analysis Line finished, because check
# consistency script might need to find the newly created Data Analysis
# Line in catalog.
data_analysis_line.checkConsistency(fixit=True)
for item in data_analysis_line.getAggregateValueList():
item.activate(after_tag=tag).checkConsistency(fixit=True)
try: try:
data_analysis.start() data_analysis.start()
except UnsupportedWorkflowMethod: except UnsupportedWorkflowMethod:
......
portal = context.getPortalObject() portal = context.getPortalObject()
#search_kw = { consuming_analysis_list_dict = {}
# 'simulation_state': 'started',
# 'portal_type': 'Data Analysis',
#}
#method_kw = { def add_consuming_analysis(producing_analysis_relative_url, consuming_analysis_relative_url):
# 'active_process': this_portal_type_active_process, consuming_analysis_list = consuming_analysis_list_dict.setdefault(producing_analysis_relative_url, [])
#} consuming_analysis_list.append(consuming_analysis_relative_url)
#activate_kw = {
# 'tag': tag,
# 'priority': priority,
#}
#portal.portal_catalog.searchAndActivate(
# method_id='DataAnalysis_executeDataOperation',
# method_kw=method_kw,
# activate_kw=activate_kw,
# **search_kw)
# First we split all started Data Analysis documents into documents with transient
# inputs and without transient inputs. Documents without transient inputs
# are added to 'data_analysis_list'.
data_analysis_list = []
for data_analysis in portal.portal_catalog(portal_type = "Data Analysis", for data_analysis in portal.portal_catalog(portal_type = "Data Analysis",
simulation_state = "started"): simulation_state = "started"):
has_transient_input = False
for line in data_analysis.objectValues(portal_type="Data Analysis Line"):
if line.getUse() == "big_data/analysis/transient" and line.getQuantity() < 0:
has_transient_input = True
add_consuming_analysis(line.getParentValue().getCausality(), line.getParentRelativeUrl())
if not has_transient_input:
data_analysis_list.append(data_analysis)
# Now we will activate `executeDataOperation` on given Data Analysis documents
for data_analysis in data_analysis_list:
if not data_analysis.hasActivity(): if not data_analysis.hasActivity():
if data_analysis.getRefreshState() == "current": if data_analysis.getRefreshState() == "current":
consuming_analysis_list = consuming_analysis_list_dict.get(data_analysis.getRelativeUrl(), [])
data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\ data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
.DataAnalysis_executeDataOperation() .DataAnalysis_executeDataOperation(consuming_analysis_list)
# Finally we refresh specified Data Analysis documents
for data_analysis in portal.portal_catalog(portal_type = "Data Analysis", for data_analysis in portal.portal_catalog(portal_type = "Data Analysis",
refresh_state = "refresh_planned"): refresh_state = "refresh_planned"):
if data_analysis.getRefreshState() == "refresh_planned": if data_analysis.getRefreshState() == "refresh_planned":
......
...@@ -34,10 +34,11 @@ import string ...@@ -34,10 +34,11 @@ import string
import random import random
import urllib import urllib
def getRandomString(): def getRandomString():
return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \ return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \
for _ in xrange(32)]) for _ in xrange(32)])
def chunks(l, n): def chunks(l, n):
"""Yield successive n-sized chunks from l.""" """Yield successive n-sized chunks from l."""
for i in xrange(0, len(l), n): for i in xrange(0, len(l), n):
...@@ -510,3 +511,16 @@ class Test(ERP5TypeTestCase): ...@@ -510,3 +511,16 @@ class Test(ERP5TypeTestCase):
len(to_delete_data_analysis.objectValues())) len(to_delete_data_analysis.objectValues()))
self.assertEqual("started", to_delete_data_analysis.getSimulationState()) self.assertEqual("started", to_delete_data_analysis.getSimulationState())
def test_11_temporaryDataArray(self):
"""
Test if temporary Data Array is functional.
"""
portal = self.portal
ndarray = np.array([[0, 1], [2, 3]])
temporary_data_array = portal.data_array_module.newContent(
portal_type='Data Array',
temp_object=True
)
zbigarray = temporary_data_array.initArray(shape=ndarray.shape, dtype=ndarray.dtype)
zbigarray.append(ndarray)
self.assertTrue(np.array_equal(zbigarray[2:], ndarray))
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment