Commit f68899a8 authored by Klaus Wölfel's avatar Klaus Wölfel

allow to get ingestion parameters and operation without creating data ingestion

parent e08b1192
from DateTime import DateTime
if data_ingestion_id is None:
now = DateTime()
today_string = now.strftime('%Y%m%d')
data_ingestion_id = "%s-%s" %(today_string, data_ingestion_reference)
portal = context.getPortalObject()
data_ingestion = portal.data_ingestion_module.newContent(
id = data_ingestion_id,
portal_type = "Data Ingestion",
reference = data_ingestion_reference,
specialise_list = specialise_list)
composed = data_ingestion.asComposedDocument()
property_list = ["title",
"source",
"source_section",
"source_project",
"destination",
"destination_section",
"destination_project",
"specialise"]
property_dict = {p: composed.getProperty(p) for p in property_list}
property_dict["start_date"] = composed.getEffectiveDate()
property_dict["stop_date"] = composed.getExpirationDate()
data_ingestion.edit(**property_dict)
# create ingestion lines from specialise lines
for supply_line in composed.objectValues(
portal_type = 'Data Supply Line'):
current_line = data_ingestion.newContent(
portal_type = "Data Ingestion Line",
title = supply_line.getTitle(),
aggregate = supply_line.getAggregateList(),
int_index = supply_line.getIntIndex(),
quantity = supply_line.getQuantity(),
reference = supply_line.getReference(),
resource = supply_line.getResource(),
use = supply_line.getUse()
)
if current_line.getResourceValue().getPortalType() == "Data Product":
# we set quantity=0 for the data product lines
current_line.setQuantity(0)
data_ingestion.start()
return data_ingestion
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>specialise_list, data_ingestion_reference, data_ingestion_id=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_createDataIngestion</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
...@@ -9,18 +9,18 @@ data_ingestion_reference = movement_dict.get('reference', reference) ...@@ -9,18 +9,18 @@ data_ingestion_reference = movement_dict.get('reference', reference)
data_ingestion_id = "%s-%s" %(today_string, data_ingestion_reference) data_ingestion_id = "%s-%s" %(today_string, data_ingestion_reference)
resource_reference = movement_dict.get('resource_reference', None) resource_reference = movement_dict.get('resource_reference', None)
specialise_reference = movement_dict.get('specialise_reference', None) specialise_reference = movement_dict.get('specialise_reference', None)
# first search for applicable data ingestion # first search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue( data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion', portal_type = 'Data Ingestion',
simulation_state = ['started', 'stopped'], simulation_state = ['started', 'stopped'],
reference =data_ingestion_reference) reference = data_ingestion_reference)
def init_input_line(input_line, operation_line): def init_input_line(input_line, operation_line):
# copy device and configuration from operation line to input line # copy device and configuration from operation line to input line
input_line.setAggregateSet( input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList()) input_line.getAggregateList() + operation_line.getAggregateList())
# Check if we have a batch referece # Check if we have a batch referece
data_ingestion_batch_reference = movement_dict.get( data_ingestion_batch_reference = movement_dict.get(
'aggregate_data_ingestion_batch_reference', 'aggregate_data_ingestion_batch_reference',
...@@ -35,7 +35,7 @@ def init_input_line(input_line, operation_line): ...@@ -35,7 +35,7 @@ def init_input_line(input_line, operation_line):
data_ingestion_batch = portal_catalog.getResultValue( data_ingestion_batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch", portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference) reference = data_ingestion_batch_reference)
if data_ingestion_batch is None: if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.get( data_ingestion_batch = portal.data_ingestion_batch_module.get(
data_ingestion_batch_id) data_ingestion_batch_id)
...@@ -44,13 +44,13 @@ def init_input_line(input_line, operation_line): ...@@ -44,13 +44,13 @@ def init_input_line(input_line, operation_line):
id = data_ingestion_batch_id, id = data_ingestion_batch_id,
portal_type = "Data Ingestion Batch", portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference) reference = data_ingestion_batch_reference)
else: else:
previous_data_ingestion_line = portal_catalog.getResultValue( previous_data_ingestion_line = portal_catalog.getResultValue(
portal_type = "Data Ingestion Line", portal_type = "Data Ingestion Line",
resource_reference = resource_reference, resource_reference = resource_reference,
aggregate_uid = data_ingestion_batch.getUid()) aggregate_uid = data_ingestion_batch.getUid())
if previous_data_ingestion_line is not None: if previous_data_ingestion_line is not None:
data_product = previous_data_ingestion_line.getResourceValue() data_product = previous_data_ingestion_line.getResourceValue()
data_sink_type_list = data_product.getAggregatedPortalTypeList() data_sink_type_list = data_product.getAggregatedPortalTypeList()
...@@ -58,7 +58,7 @@ def init_input_line(input_line, operation_line): ...@@ -58,7 +58,7 @@ def init_input_line(input_line, operation_line):
.getAggregateValueList(portal_type=data_sink_type_list) .getAggregateValueList(portal_type=data_sink_type_list)
input_line.setDefaultAggregateValue(data_ingestion_batch) input_line.setDefaultAggregateValue(data_ingestion_batch)
if not data_sink_list: if not data_sink_list:
if not data_sink_type_list: if not data_sink_type_list:
if data_product is None: if data_product is None:
...@@ -66,7 +66,7 @@ def init_input_line(input_line, operation_line): ...@@ -66,7 +66,7 @@ def init_input_line(input_line, operation_line):
portal_type = "Data Product", portal_type = "Data Product",
reference = resource_reference) reference = resource_reference)
data_sink_type_list = data_product.getAggregatedPortalTypeList() data_sink_type_list = data_product.getAggregatedPortalTypeList()
for data_sink_type in data_sink_type_list: for data_sink_type in data_sink_type_list:
# This should be more generic # This should be more generic
if data_sink_type not in ("Progress Indicator", "Data Ingestion Batch"): if data_sink_type not in ("Progress Indicator", "Data Ingestion Batch"):
...@@ -78,97 +78,50 @@ def init_input_line(input_line, operation_line): ...@@ -78,97 +78,50 @@ def init_input_line(input_line, operation_line):
item_device_relative_url=operation_line.getAggregateDevice(), item_device_relative_url=operation_line.getAggregateDevice(),
item_project_relative_url=input_line.getDestinationProject(), item_project_relative_url=input_line.getDestinationProject(),
item_resource_uid=input_line.getResourceUid()) item_resource_uid=input_line.getResourceUid())
if data_sink is None: if data_sink is None:
data_sink = portal.getDefaultModule(data_sink_type).newContent( data_sink = portal.getDefaultModule(data_sink_type).newContent(
portal_type = data_sink_type, portal_type = data_sink_type,
reference = "%s-%s" %(data_ingestion_reference, resource_reference)) reference = "%s-%s" %(data_ingestion_reference, resource_reference))
data_sink.validate() data_sink.validate()
data_sink_list.append(data_sink) data_sink_list.append(data_sink)
input_line.setAggregateValueList( input_line.setAggregateValueList(
input_line.getAggregateValueList() + data_sink_list) input_line.getAggregateValueList() + data_sink_list)
input_line.setQuantity(1) input_line.setQuantity(1)
if data_ingestion is None: if data_ingestion is None:
document = portal.data_ingestion_module.get(data_ingestion_id) document = portal.data_ingestion_module.get(data_ingestion_id)
if (document is not None) and document.getSimulationState() == 'started': if (document is not None) and document.getSimulationState() == 'started':
data_ingestion = document data_ingestion = document
if data_ingestion is None: if modify and data_ingestion is None:
specialise_value_list = [x.getObject() for x in portal_catalog.searchResults( specialise_list = [x.getRelativeUrl() for x in portal_catalog.searchResults(
portal_type = 'Data Supply', portal_type = 'Data Supply',
reference = specialise_reference, reference = specialise_reference,
validation_state = 'validated')] validation_state = 'validated')]
# if we do not find a validated data supply, we look for a default data supply # if we do not find a validated data supply, we look for a default data supply
if not specialise_value_list: if not specialise_list:
specialise_value_list = [x.getObject() for x in portal_catalog.searchResults( specialise_list = [x.getRelativeUrl() for x in portal_catalog.searchResults(
portal_type = 'Data Supply', portal_type = 'Data Supply',
reference = specialise_reference, reference = specialise_reference,
validation_state = 'default')] validation_state = 'default')]
# create a new data ingestion # create a new data ingestion
data_ingestion = portal.data_ingestion_module.newContent( data_ingestion = portal.ERP5Site_createDataIngestion(specialise_list,
id = data_ingestion_id, data_ingestion_reference,
portal_type = "Data Ingestion", data_ingestion_id)
reference = data_ingestion_reference,
specialise_value_list = specialise_value_list) # find ingestion line for current resource
for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
composed = data_ingestion.asComposedDocument() if line.getResourceReference() == resource_reference:
input_line = line
property_list = ["title", elif line.getResourceValue().getPortalType() == "Data Operation":
"source", operation_line = line
"source_section",
"source_project",
"destination",
"destination_section",
"destination_project",
"specialise"]
property_dict = {p: composed.getProperty(p) for p in property_list}
property_dict["start_date"] = composed.getEffectiveDate()
property_dict["stop_date"] = composed.getExpirationDate()
data_ingestion.edit(**property_dict)
# create ingestion lines from specialise lines and assign input line
# and operation line
input_line = None
operation_line = None
for supply_line in composed.objectValues(
portal_type = 'Data Supply Line'):
current_line = data_ingestion.newContent(
portal_type = "Data Ingestion Line",
title = supply_line.getTitle(),
aggregate = supply_line.getAggregateList(),
int_index = supply_line.getIntIndex(),
quantity = supply_line.getQuantity(),
reference = supply_line.getReference(),
resource = supply_line.getResource(),
use = supply_line.getUse()
)
if current_line.getResourceReference() == resource_reference:
input_line = current_line
elif current_line.getResourceValue().getPortalType() == "Data Operation":
operation_line = current_line
else:
# we set quantity=0 for the empty line
current_line.setQuantity(0)
if modify and input_line.getQuantity() == 0:
init_input_line(input_line, operation_line) init_input_line(input_line, operation_line)
data_ingestion.start()
else:
# find ingestion line for current resource
for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
if line.getResourceReference() == resource_reference:
input_line = line
elif line.getResourceValue().getPortalType() == "Data Operation":
operation_line = line
if input_line.getQuantity() == 0:
init_input_line(input_line, operation_line)
data_operation = operation_line.getResourceValue() data_operation = operation_line.getResourceValue()
parameter_dict = { parameter_dict = {
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string>movement_dict, reference</string> </value> <value> <string>movement_dict, reference, modify=True</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment