Commit c12cc4e2 authored by Roque's avatar Roque

erp5_wendelin_data_lake_ingestion: rename and move scripts to data lake skin folder

parent 33636898
portal = context.getPortalObject()
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_length = portal.getIngestionReferenceDictionary()["reference_length"]
invalid_chars = portal.getIngestionReferenceDictionary()["invalid_chars"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
reference_length = portal.ERP5Site_getIngestionReferenceDictionary()["reference_length"]
invalid_chars = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_chars"]
record = reference.rsplit(reference_separator)
length = len(record)
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Folder" module="OFS.Folder"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_objects</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>erp5_ingestion_reference_utils</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
import json
portal = context.getPortalObject()
dict = {'invalid_suffix':portal.getIngestionReferenceDictionary()['invalid_suffix'],
'split_end_suffix':portal.getIngestionReferenceDictionary()['split_end_suffix'],
'single_end_suffix':portal.getIngestionReferenceDictionary()['single_end_suffix'],
'split_first_suffix':portal.getIngestionReferenceDictionary()['split_first_suffix'],
'none_extension':portal.getIngestionReferenceDictionary()['none_extension'],
'reference_separator':portal.getIngestionReferenceDictionary()['reference_separator'],
'complex_files_extensions':portal.getIngestionReferenceDictionary()['complex_files_extensions'],
'reference_length':portal.getIngestionReferenceDictionary()['reference_length'],
'invalid_chars':portal.getIngestionReferenceDictionary()['invalid_chars'],
}
return json.dumps(dict)
from Products.ERP5Type.Log import log
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
complex_files = portal.getIngestionReferenceDictionary()["complex_files_extensions"]
complex_files = portal.ERP5Site_getIngestionReferenceDictionary()["complex_files_extensions"]
for data_analysis in portal_catalog(portal_type = "Data Analysis",
simulation_state = "planned"):
......
......@@ -40,15 +40,15 @@ def isInterruptedAbandonedSplitIngestion(reference):
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
# stop single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_end_single):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
......@@ -67,7 +67,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_first_split):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
else:
......@@ -102,7 +102,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
if ingestion.getSimulationState() == "started":
ingestion.stop()
else:
portal.InvalidateReference(ingestion)
portal.ERP5Site_invalidateReference(ingestion)
ingestion.deliver()
except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
......
......@@ -10,9 +10,9 @@ TRUE = "TRUE"
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
# remove supplier and eof from reference
data_ingestion_reference = reference_separator.join(reference.split(reference_separator)[1:-3])
......@@ -20,7 +20,7 @@ EOF = reference.split(reference_separator)[-3]
size = reference.split(reference_separator)[-2]
if data_ingestion_reference is "":
context.logEntry("[ERROR] Data Ingestion reference parameter for ingestionReferenceExists script is not well formated")
context.logEntry("[ERROR] Data Ingestion reference parameter for ERP5Site_checkIngestionReferenceExists script is not well formated")
raise ValueError("Data Ingestion reference is not well formated")
# check if there are started ingestions for this reference
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ingestionReferenceExists</string> </value>
<value> <string>ERP5Site_checkIngestionReferenceExists</string> </value>
</item>
</dictionary>
</pickle>
......
portal = context.getPortalObject()
INVALID_SUFFIX = portal.getIngestionReferenceDictionary()["invalid_suffix"]
INVALID_SUFFIX = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"]
return document.getReference().endswith(INVALID_SUFFIX)
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>InvalidateReference</string> </value>
<value> <string>ERP5Site_checkReferenceInvalidated</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getDataStreamChunk</string> </value>
<value> <string>ERP5Site_getDataStreamChunk</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -11,11 +11,11 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
try:
data_set = portal.data_set_module.get(data_set_reference)
if data_set is None or portal.IsReferenceInvalidated(data_set):
if data_set is None or portal.ERP5Site_checkReferenceInvalidated(data_set):
return { "status_code": 0, "result": [] }
except Exception as e: # fails because unauthorized access
log("Unauthorized access to getDataStreamList.")
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getDataStreamList</string> </value>
<value> <string>ERP5Site_getDataStreamList</string> </value>
</item>
</dictionary>
</pickle>
......
import json
portal = context.getPortalObject()
dict = {'invalid_suffix':portal.ERP5Site_getIngestionReferenceDictionary()['invalid_suffix'],
'split_end_suffix':portal.ERP5Site_getIngestionReferenceDictionary()['split_end_suffix'],
'single_end_suffix':portal.ERP5Site_getIngestionReferenceDictionary()['single_end_suffix'],
'split_first_suffix':portal.ERP5Site_getIngestionReferenceDictionary()['split_first_suffix'],
'none_extension':portal.ERP5Site_getIngestionReferenceDictionary()['none_extension'],
'reference_separator':portal.ERP5Site_getIngestionReferenceDictionary()['reference_separator'],
'complex_files_extensions':portal.ERP5Site_getIngestionReferenceDictionary()['complex_files_extensions'],
'reference_length':portal.ERP5Site_getIngestionReferenceDictionary()['reference_length'],
'invalid_chars':portal.ERP5Site_getIngestionReferenceDictionary()['invalid_chars'],
}
return json.dumps(dict)
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getIngestionReferenceDictionary</string> </value>
<value> <string>ERP5Site_getIngestionConstantsJson</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getIngestionConstantsJson</string> </value>
<value> <string>ERP5Site_getIngestionReferenceDictionary</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -13,7 +13,7 @@ kw_dict = {"query": portal_type_query,
"reference": reference}
for document in portal_catalog(**kw_dict):
portal.InvalidateReference(document)
portal.ERP5Site_invalidateReference(document)
try:
document.invalidate()
except:
......
portal = context.getPortalObject()
INVALID_SUFFIX = portal.getIngestionReferenceDictionary()["invalid_suffix"]
INVALID_SUFFIX = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"]
try:
if not document.getReference().endswith(INVALID_SUFFIX):
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>IsReferenceInvalidated</string> </value>
<value> <string>ERP5Site_invalidateReference</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -14,34 +14,34 @@ try:
data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
id = data_stream.getId())
portal.InvalidateReference(data_stream)
portal.ERP5Site_invalidateReference(data_stream)
data_stream.invalidate()
if not portal.IsReferenceInvalidated(data_ingestion):
portal.InvalidateReference(data_ingestion)
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
portal.ERP5Site_invalidateReference(data_ingestion)
data_an = portal_catalog.getResultValue(
portal_type = 'Data Analysis',
id = data_stream.getId())
if data_an != None:
portal.InvalidateReference(data_an)
portal.ERP5Site_invalidateReference(data_an)
data_array = portal_catalog.getResultValue(
portal_type = 'Data Array',
id = data_stream.getId())
if data_array != None:
portal.InvalidateReference(data_array)
portal.ERP5Site_invalidateReference(data_array)
data_array.invalidate()
else: # split ingestion interrumped and restarted
# invalidate draft datastreams and old started data ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
reference = reference):
if not portal.IsReferenceInvalidated(data_ingestion):
portal.InvalidateReference(data_ingestion)
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
portal.ERP5Site_invalidateReference(data_ingestion)
data_ingestion.deliver()
for data_stream in portal_catalog(portal_type = "Data Stream",
validation_state = "draft",
reference = reference):
if not portal.IsReferenceInvalidated(data_stream):
portal.InvalidateReference(data_stream)
if not portal.ERP5Site_checkReferenceInvalidated(data_stream):
portal.ERP5Site_invalidateReference(data_stream)
except Exception as e:
context.logEntry("ERROR in ERP5Site_invalidateSplitIngestions: " + str(e))
pass
portal = context.getPortalObject()
INVALID_SUFFIX = portal.getIngestionReferenceDictionary()["invalid_suffix"]
INVALID_SUFFIX = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"]
try:
if document.getReference().endswith(INVALID_SUFFIX):
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>RevalidateReference</string> </value>
<value> <string>ERP5Site_revalidateReference</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -40,15 +40,15 @@ def isInterruptedAbandonedSplitIngestion(reference):
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
# stop single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_end_single):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
......@@ -67,7 +67,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_first_split):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
else:
......@@ -102,7 +102,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
if ingestion.getSimulationState() == "started":
ingestion.stop()
else:
portal.InvalidateReference(ingestion)
portal.ERP5Site_invalidateReference(ingestion)
ingestion.deliver()
except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getDescriptorHTMLContent</string> </value>
<value> <string>ERP5_getDescriptorHTMLContent</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -6,9 +6,9 @@ now_string = now.strftime('%Y%m%d-%H%M%S-%f')[:-3]
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
none_extension = portal.getIngestionReferenceDictionary()["none_extension"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
none_extension = portal.ERP5Site_getIngestionReferenceDictionary()["none_extension"]
# remove supplier, eof, size and hash from reference
reference = reference_separator.join(reference.split(reference_separator)[1:-3])
......@@ -111,8 +111,8 @@ if dataset_reference is not None:
data_set.validate()
except:
data_set = portal.data_set_module.get(dataset_reference)
if portal.IsReferenceInvalidated(data_set):
portal.RevalidateReference(data_set)
if portal.ERP5Site_checkReferenceInvalidated(data_set):
portal.ERP5Site_revalidateReference(data_set)
if data_set.getValidationState() == "invalidated":
data_set.validate()
input_line.setDefaultAggregateValue(data_set)
......
......@@ -27,10 +27,10 @@ class TestDataIngestion(SecurityTestCase):
return "DataIngestionTest"
def afterSetUp(self):
self.assertEqual(self.REFERENCE_SEPARATOR, self.portal.getIngestionReferenceDictionary()["reference_separator"])
self.assertEqual(self.INVALID, self.portal.getIngestionReferenceDictionary()["invalid_suffix"])
self.assertEqual(self.EOF, self.REFERENCE_SEPARATOR + self.portal.getIngestionReferenceDictionary()["split_end_suffix"])
self.assertEqual(self.PART_1, self.REFERENCE_SEPARATOR + self.portal.getIngestionReferenceDictionary()["split_first_suffix"])
self.assertEqual(self.REFERENCE_SEPARATOR, self.portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"])
self.assertEqual(self.INVALID, self.portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"])
self.assertEqual(self.EOF, self.REFERENCE_SEPARATOR + self.portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"])
self.assertEqual(self.PART_1, self.REFERENCE_SEPARATOR + self.portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"])
def getRandomReference(self):
random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
......
......@@ -46,9 +46,8 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W: 88, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W: 95, 34: Unused variable \'i\' (unused-variable)</string>
<string>W: 95, 76: Unused variable \'j\' (unused-variable)</string>
<string>W: 99, 34: Unused variable \'i\' (unused-variable)</string>
<string>W: 99, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
......
erp5_ingestion_reference_utils
erp5_wendelin_data_lake
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment