Commit 558e96dc authored by Ivan Tyagov

Data lake changes

See merge request nexedi/wendelin!45
parents 28a9e89e 4bd0aa64
import hashlib
import base64
from Products.ZSQLCatalog.SQLCatalog import Query

CHUNK_SIZE = 200000

def getHash(data_stream):
  hash_md5 = hashlib.md5()
  data_stream_chunk = None
  n_chunk = 0
  chunk_size = CHUNK_SIZE
  while True:
    start_offset = n_chunk*chunk_size
    end_offset = n_chunk*chunk_size+chunk_size
    try:
      data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
    except Exception:
      # data stream is empty
      data_stream_chunk = ""
    hash_md5.update(data_stream_chunk)
    if data_stream_chunk == "":
      break
    n_chunk += 1
  return hash_md5.hexdigest()

decoded = base64.b64decode(data_chunk)
data_stream.appendData(decoded)
data_stream.setVersion(getHash(data_stream))

portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]

# if last chunk of split ingestion -> validate all related data streams and publish the current one:
if data_stream.getId().endswith(reference_end_split):
  query = Query(portal_type="Data Stream", reference=data_stream.getReference(), validation_state="draft")
  split_ingestion_data_stream_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
  #full_file_size = 0
  for chunk_data_stream in split_ingestion_data_stream_list:
    #full_file_size += chunk_data_stream.getSize()
    if chunk_data_stream.getValidationState() != "validated":
      chunk_data_stream.validate()
  if data_stream.getValidationState() != "validated":
    data_stream.validate()
  data_stream.publish()
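For context: getHash above reads the Data Stream in CHUNK_SIZE slices so that files too large for a single read can still be hashed. A minimal standalone sketch of the same pattern over an ordinary file (get_file_hash, file_path and open() are illustrative stand-ins, not part of the Data Stream API):

import hashlib

CHUNK_SIZE = 200000

def get_file_hash(file_path, chunk_size=CHUNK_SIZE):
  # feed fixed-size slices into the running digest until EOF
  hash_md5 = hashlib.md5()
  with open(file_path, 'rb') as f:
    chunk = f.read(chunk_size)
    while chunk:
      hash_md5.update(chunk)
      chunk = f.read(chunk_size)
  return hash_md5.hexdigest()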
portal = context.getPortalObject()
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_length = portal.getIngestionReferenceDictionary()["reference_length"]
invalid_chars = portal.getIngestionReferenceDictionary()["invalid_chars"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
reference_length = portal.ERP5Site_getIngestionReferenceDictionary()["reference_length"]
invalid_chars = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_chars"]
record = reference.rsplit(reference_separator)
length = len(record)
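# Illustration of the reference layout assumed here (hypothetical values;
# the field order matches the parsing used by the other scripts in this MR):
#   reference = "supplier/my_dataset/sales/csv/EOF/4096/0f343b09"
#   record = reference.rsplit("/")
#   record[1:-3] -> data ingestion reference, record[-3] -> eof marker,
#   record[-2] -> size, record[-1] -> hash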
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Ingestion Policy" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_Access_contents_information_Permission</string> </key>
<value>
<tuple>
<string>Anonymous</string>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Auditor</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>_Add_portal_content_Permission</string> </key>
<value>
<tuple>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>_Modify_portal_content_Permission</string> </key>
<value>
<tuple>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>_View_Permission</string> </key>
<value>
<tuple>
<string>Anonymous</string>
<string>Assignee</string>
<string>Assignor</string>
<string>Associate</string>
<string>Auditor</string>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>data_operation_script_id</string> </key>
<value> <string>IngestionPolicy_getIngestionOperationAndParameterDictEbulk</string> </value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>default_ebulk</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>Handles ingestion of raw file bytes sent to us from ebulk.</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>default_ebulk</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Ingestion Policy</string> </value>
</item>
<item>
<key> <string>script_id</string> </key>
<value> <string>IngestionPolicy_parseEbulkIngestionTag</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Default Ebulk Ingestion Policy</string> </value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>001</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>edit_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>edit</string> </value>
</item>
<item>
<key> <string>actor</string> </key>
<value> <string>zope</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>error_message</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>984.990.49194.19609</string> </value>
</item>
<item>
<key> <string>state</string> </key>
<value> <string>current</string> </value>
</item>
<item>
<key> <string>time</string> </key>
<value>
<object>
<klass>
<global name="DateTime" module="DateTime.DateTime"/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1589986625.99</float>
<string>UTC</string>
</tuple>
</state>
</object>
</value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>actor</string> </key>
<value> <string>zope</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>error_message</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>time</string> </key>
<value>
<object>
<klass>
<global name="DateTime" module="DateTime.DateTime"/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1589898672.1</float>
<string>UTC</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -58,11 +58,11 @@
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>wendelin_embulk</string> </value>
<value> <string>wendelin_ebulk</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>Handles ingestion of raw file bytes sent to us from embulk.</string> </value>
<value> <string>[OBSOLETE - Kept for old ebulk clients] Handles ingestion of raw file bytes sent to us from ebulk.</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......@@ -78,7 +78,7 @@
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Wendelin Embulk Ingestion Policy</string> </value>
<value> <string>[OBSOLETE] Wendelin Ebulk Ingestion Policy</string> </value>
</item>
<item>
<key> <string>version</string> </key>
......@@ -152,7 +152,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>983.12768.14725.3720</string> </value>
<value> <string>983.63603.62266.8260</string> </value>
</item>
<item>
<key> <string>state</string> </key>
......@@ -170,7 +170,7 @@
</tuple>
<state>
<tuple>
<float>1586946559.16</float>
<float>1589986571.48</float>
<string>UTC</string>
</tuple>
</state>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Folder" module="OFS.Folder"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_objects</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>erp5_ingestion_reference_utils</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
import json

portal = context.getPortalObject()
reference_dict = {'invalid_suffix': portal.getIngestionReferenceDictionary()['invalid_suffix'],
                  'split_end_suffix': portal.getIngestionReferenceDictionary()['split_end_suffix'],
                  'single_end_suffix': portal.getIngestionReferenceDictionary()['single_end_suffix'],
                  'split_first_suffix': portal.getIngestionReferenceDictionary()['split_first_suffix'],
                  'none_extension': portal.getIngestionReferenceDictionary()['none_extension'],
                  'reference_separator': portal.getIngestionReferenceDictionary()['reference_separator'],
                  'complex_files_extensions': portal.getIngestionReferenceDictionary()['complex_files_extensions'],
                  'reference_length': portal.getIngestionReferenceDictionary()['reference_length'],
                  'invalid_chars': portal.getIngestionReferenceDictionary()['invalid_chars'],
                 }
return json.dumps(reference_dict)
from Products.ERP5Type.Log import log
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
complex_files = portal.getIngestionReferenceDictionary()["complex_files_extensions"]
complex_files = portal.ERP5Site_getIngestionReferenceDictionary()["complex_files_extensions"]
for data_analysis in portal_catalog(portal_type = "Data Analysis",
simulation_state = "planned"):
......
......@@ -40,15 +40,15 @@ def isInterruptedAbandonedSplitIngestion(reference):
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
# stop single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_end_single):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
......@@ -67,7 +67,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_first_split):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
else:
......@@ -102,7 +102,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
if ingestion.getSimulationState() == "started":
ingestion.stop()
else:
portal.InvalidateReference(ingestion)
portal.ERP5Site_invalidateReference(ingestion)
ingestion.deliver()
except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
......
......@@ -10,9 +10,9 @@ TRUE = "TRUE"
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
# remove supplier and eof from reference
data_ingestion_reference = reference_separator.join(reference.split(reference_separator)[1:-3])
......@@ -20,7 +20,7 @@ EOF = reference.split(reference_separator)[-3]
size = reference.split(reference_separator)[-2]
if data_ingestion_reference == "":
  context.logEntry("[ERROR] Data Ingestion reference parameter for ingestionReferenceExists script is not well formatted")
  context.logEntry("[ERROR] Data Ingestion reference parameter for ERP5Site_checkIngestionReferenceExists script is not well formatted")
  raise ValueError("Data Ingestion reference is not well formatted")
# check if there are started ingestions for this reference
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ingestionReferenceExists</string> </value>
<value> <string>ERP5Site_checkIngestionReferenceExists</string> </value>
</item>
</dictionary>
</pickle>
......
portal = context.getPortalObject()
INVALID_SUFFIX = portal.getIngestionReferenceDictionary()["invalid_suffix"]
INVALID_SUFFIX = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"]
return document.getReference().endswith(INVALID_SUFFIX)
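ERP5Site_checkReferenceInvalidated is used as a guard throughout this merge request; the typical call pattern looks like this (a sketch only, with data_ingestion standing for any document carrying an ingestion reference):

if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
  # the reference does not end with the invalid suffix, keep processing
  data_ingestion.deliver()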
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>InvalidateReference</string> </value>
<value> <string>ERP5Site_checkReferenceInvalidated</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getDataStreamChunk</string> </value>
<value> <string>ERP5Site_getDataStreamChunk</string> </value>
</item>
</dictionary>
</pickle>
......
"""
This script is called from ebulk client to get list of Data Streams for a Data set.
"""
import json
from Products.ERP5Type.Log import log
portal = context.getPortalObject()
try:
data_set = portal.data_set_module.get(data_set_reference)
if data_set is None or portal.ERP5Site_checkReferenceInvalidated(data_set):
return { "status_code": 0, "result": [] }
except Exception as e: # fails because unauthorized access
log("Unauthorized access to getDataStreamList: " + str(e))
return { "status_code": 1, "error_message": "401 - Unauthorized access. Please check your user credentials and try again." }
data_stream_dict = {}
for stream in data_set.DataSet_getDataStreamList():
if not portal.ERP5Site_checkReferenceInvalidated(stream) and stream.getValidationState() != "draft":
data_stream_info_dict = { 'id': 'data_stream_module/'+stream.getId(),
'size': stream.getSize(),
'hash': stream.getVersion() }
if stream.getReference() in data_stream_dict:
data_stream_dict[stream.getReference()]['data-stream-list'].append(data_stream_info_dict)
data_stream_dict[stream.getReference()]['large-hash'] = data_stream_dict[stream.getReference()]['large-hash'] + str(stream.getVersion())
data_stream_dict[stream.getReference()]['full-size'] = int(data_stream_dict[stream.getReference()]['full-size']) + int(stream.getSize())
else:
data_stream_dict[stream.getReference()] = { 'data-stream-list': [data_stream_info_dict],
'id': 'data_stream_module/'+stream.getId(),
'reference': stream.getReference(),
'large-hash': stream.getVersion(),
'full-size': stream.getSize() }
result_dict = { 'status_code': 0, 'result': data_stream_dict.values()}
return json.dumps(result_dict)
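For illustration, a successful reply for a data set holding one file ingested in two chunks might serialize as follows (every id, hash and size below is invented):

{"status_code": 0,
 "result": [{"id": "data_stream_module/supplier_dataset_20200520_001",
             "reference": "my_dataset/sales/csv",
             "data-stream-list": [{"id": "data_stream_module/supplier_dataset_20200520_001", "size": 50000000, "hash": "9a0364b9"},
                                  {"id": "data_stream_module/supplier_dataset_20200520_EOF", "size": 1234567, "hash": "e4ca7a6a"}],
             "large-hash": "9a0364b9e4ca7a6a",
             "full-size": 51234567}]}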
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getDataStreamList</string> </value>
<value> <string>ERP5Site_getDataStreamList</string> </value>
</item>
</dictionary>
</pickle>
......
import json

portal = context.getPortalObject()
reference_dict = {'invalid_suffix': portal.ERP5Site_getIngestionReferenceDictionary()['invalid_suffix'],
                  'split_end_suffix': portal.ERP5Site_getIngestionReferenceDictionary()['split_end_suffix'],
                  'single_end_suffix': portal.ERP5Site_getIngestionReferenceDictionary()['single_end_suffix'],
                  'split_first_suffix': portal.ERP5Site_getIngestionReferenceDictionary()['split_first_suffix'],
                  'none_extension': portal.ERP5Site_getIngestionReferenceDictionary()['none_extension'],
                  'reference_separator': portal.ERP5Site_getIngestionReferenceDictionary()['reference_separator'],
                  'complex_files_extensions': portal.ERP5Site_getIngestionReferenceDictionary()['complex_files_extensions'],
                  'reference_length': portal.ERP5Site_getIngestionReferenceDictionary()['reference_length'],
                  'invalid_chars': portal.ERP5Site_getIngestionReferenceDictionary()['invalid_chars'],
                 }
return json.dumps(reference_dict)
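An ebulk client consuming this endpoint would receive a JSON object of the following shape (values illustrative, not the actual site configuration; "_invalid" at least matches the NOT reference:"%_invalid" listbox query elsewhere in this MR):

{"invalid_suffix": "_invalid", "split_end_suffix": "EOF",
 "single_end_suffix": "END", "split_first_suffix": "001",
 "none_extension": "none", "reference_separator": "/",
 "complex_files_extensions": ["tar.gz", "zip"],
 "reference_length": 7, "invalid_chars": ["&", ";", "#", "%"]}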
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getIngestionReferenceDictionary</string> </value>
<value> <string>ERP5Site_getIngestionConstantsJson</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getIngestionConstantsJson</string> </value>
<value> <string>ERP5Site_getIngestionReferenceDictionary</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -13,7 +13,7 @@ kw_dict = {"query": portal_type_query,
"reference": reference}
for document in portal_catalog(**kw_dict):
portal.InvalidateReference(document)
portal.ERP5Site_invalidateReference(document)
try:
document.invalidate()
except:
......
portal = context.getPortalObject()
INVALID_SUFFIX = portal.getIngestionReferenceDictionary()["invalid_suffix"]
INVALID_SUFFIX = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"]
try:
  if not document.getReference().endswith(INVALID_SUFFIX):
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>IsReferenceInvalidated</string> </value>
<value> <string>ERP5Site_invalidateReference</string> </value>
</item>
</dictionary>
</pickle>
......
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
......@@ -14,34 +12,32 @@ try:
data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
id = data_stream.getId())
portal.InvalidateReference(data_stream)
portal.ERP5Site_invalidateReference(data_stream)
data_stream.invalidate()
if not portal.IsReferenceInvalidated(data_ingestion):
portal.InvalidateReference(data_ingestion)
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
portal.ERP5Site_invalidateReference(data_ingestion)
data_an = portal_catalog.getResultValue(
portal_type = 'Data Analysis',
id = data_stream.getId())
if data_an != None:
portal.InvalidateReference(data_an)
portal.ERP5Site_invalidateReference(data_an)
data_array = portal_catalog.getResultValue(
portal_type = 'Data Array',
id = data_stream.getId())
if data_array != None:
portal.InvalidateReference(data_array)
portal.ERP5Site_invalidateReference(data_array)
data_array.invalidate()
else: # split ingestion interrupted and restarted
# invalidate draft datastreams and old started data ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
reference = reference):
if not portal.IsReferenceInvalidated(data_ingestion):
portal.InvalidateReference(data_ingestion)
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
portal.ERP5Site_invalidateReference(data_ingestion)
data_ingestion.deliver()
for data_stream in portal_catalog(portal_type = "Data Stream",
validation_state = "draft",
reference = reference):
if not portal.IsReferenceInvalidated(data_stream):
portal.InvalidateReference(data_stream)
reference = reference):
if not portal.ERP5Site_checkReferenceInvalidated(data_stream):
portal.ERP5Site_invalidateReference(data_stream)
except Exception as e:
context.logEntry("ERROR in ERP5Site_invalidateSplitIngestions: " + str(e))
pass
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog

reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
none_extension = portal.ERP5Site_getIngestionReferenceDictionary()["none_extension"]

# check new reference
data_ingestions = portal_catalog(portal_type = "Data Ingestion", reference = new_reference)
if len(data_ingestions) > 0:
  raise ValueError("Error renaming: new reference '%s' already exists." % new_reference)

# rename data ingestions
data_ingestions = portal_catalog(portal_type = "Data Ingestion", reference = reference)
if len(data_ingestions) == 0:
  raise ValueError("Error renaming: could not find any data ingestion with reference '%s'." % reference)
data_ingestion_title = reference_separator.join(new_reference.split(reference_separator)[1:-1])
for data_ingestion in data_ingestions:
  data_ingestion.setReference(new_reference)
  data_ingestion.setTitle(data_ingestion_title)

extension = new_reference.split(reference_separator)[-1]
data_stream_title = "%s%s" % (data_ingestion_title, "."+extension if extension != none_extension else "")

# rename data streams
data_streams = portal_catalog(portal_type = "Data Stream", reference = reference)
for data_stream in data_streams:
  data_stream.setReference(new_reference)
  data_stream.setTitle(data_stream_title)

# rename data analysis
data_analysises = portal_catalog(portal_type = "Data Analysis", reference = reference)
for data_analysis in data_analysises:
  data_analysis.setReference(new_reference)

# rename data arrays
data_arrays = portal_catalog(portal_type = "Data Array", reference = reference)
for data_array in data_arrays:
  data_array.setReference(new_reference)
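A hypothetical invocation (the parameter names match the _params declaration in the XML record below; the reference values are invented and follow the supplier-first, extension-last layout assumed above):

context.ERP5Site_renameIngestion(reference='supplier/old_dataset/sales/csv',
                                 new_reference='supplier/new_dataset/sales/csv')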
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>reference, new_reference</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_renameIngestion</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
portal = context.getPortalObject()
INVALID_SUFFIX = portal.getIngestionReferenceDictionary()["invalid_suffix"]
INVALID_SUFFIX = portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"]
try:
  if document.getReference().endswith(INVALID_SUFFIX):
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>RevalidateReference</string> </value>
<value> <string>ERP5Site_revalidateReference</string> </value>
</item>
</dictionary>
</pickle>
......
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
import hashlib
CHUNK_SIZE = 200000
......@@ -14,7 +12,7 @@ def getHash(data_stream):
end_offset = n_chunk*chunk_size+chunk_size
try:
data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
except:
except Exception:
# data stream is empty
data_stream_chunk = ""
hash_md5.update(data_stream_chunk)
......@@ -22,9 +20,18 @@ def getHash(data_stream):
n_chunk += 1
return hash_md5.hexdigest()
def isFinishedSplitIngestion(reference):
  # check if all chunks of a split file were ingested,
  # that is, if the EOF chunk was ingested
  reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
  eof_ingestion = portal_catalog(portal_type = "Data Ingestion",
                                 simulation_state = "started",
                                 reference = reference,
                                 id = "%"+reference_end_split)
  return len(eof_ingestion) == 1
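# Note (illustration): chunk ingestions share one reference and get ids ending
# with the split suffixes (e.g. ..._001 through ..._EOF, spellings
# illustrative); the upload counts as finished once the EOF chunk's Data
# Ingestion exists and is still "started".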
def isInterruptedAbandonedSplitIngestion(reference):
  from DateTime import DateTime
  now = DateTime()
  day_hours = 1.0/24/60*60*24
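  # (1.0/24/60*60*24 evaluates to 1.0, i.e. one day, since DateTime
  # differences are measured in days)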
  # started split data ingestions for reference
  catalog_kw = {'portal_type': 'Data Ingestion',
......@@ -40,70 +47,57 @@ def isInterruptedAbandonedSplitIngestion(reference):
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
# stop single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_end_single):
if not portal.IsReferenceInvalidated(data_ingestion):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
data_stream = portal_catalog.getResultValue(
portal_type = 'Data Stream',
reference = data_ingestion.getReference())
if data_stream is not None:
hash_value = getHash(data_stream)
data_stream.setVersion(hash_value)
if data_stream.getValidationState() != "validated":
data_stream.validate()
if data_ingestion.getSimulationState() == "started":
data_ingestion.stop()
try:
data_stream = portal_catalog.getResultValue(
portal_type = 'Data Stream',
reference = data_ingestion.getReference())
if data_stream is not None:
if data_stream.getVersion() is None:
hash_value = getHash(data_stream)
data_stream.setVersion(hash_value)
if data_stream.getValidationState() != "validated" and data_stream.getValidationState() != "published":
data_stream.validate()
if data_stream.getValidationState() != "published":
data_stream.publish()
if data_ingestion.getSimulationState() == "started":
data_ingestion.stop()
except Exception as e:
context.log("ERROR stoping single ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
context.log(e)
else:
data_ingestion.deliver()
# append split ingestions
# handle split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%"+reference_first_split):
if not portal.IsReferenceInvalidated(data_ingestion):
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
else:
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
if isFinishedSplitIngestion(data_ingestion.getReference()):
try:
last_data_stream_id = ""
query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
full_data_stream = None
for data_stream in result_list:
log(''.join(["Data stream for split ingestion: ", data_stream.getId()]))
if data_stream.getId() == data_ingestion.getId():
log("It is base data stream")
full_data_stream = data_stream
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
reference = data_ingestion.getReference())
for ingestion in related_split_ingestions:
if ingestion.getId().endswith(reference_end_split):
if ingestion.getSimulationState() == "started":
ingestion.stop()
else:
log("It is not base data stream, it is a part")
if full_data_stream != None:
log("appending content to base data stream...")
full_data_stream.appendData(data_stream.getData())
last_data_stream_id = data_stream.getId()
portal.data_stream_module.deleteContent(data_stream.getId())
if last_data_stream_id.endswith(reference_end_split):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
hash = getHash(full_data_stream)
full_data_stream.setVersion(hash)
if full_data_stream.getValidationState() != "validated":
full_data_stream.validate()
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
reference = data_ingestion.getReference())
for ingestion in related_split_ingestions:
if ingestion.getId() == full_data_stream.getId():
if ingestion.getSimulationState() == "started":
ingestion.stop()
else:
portal.InvalidateReference(ingestion)
ingestion.deliver()
ingestion.deliver()
except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
context.logEntry(e)
context.log("ERROR handling split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
context.log(e)
else:
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>getDescriptorHTMLContent</string> </value>
<value> <string>ERP5_getDescriptorHTMLContent</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -6,9 +6,9 @@ now_string = now.strftime('%Y%m%d-%H%M%S-%f')[:-3]
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
none_extension = portal.getIngestionReferenceDictionary()["none_extension"]
reference_separator = portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
none_extension = portal.ERP5Site_getIngestionReferenceDictionary()["none_extension"]
# remove supplier, eof, size and hash from reference
reference = reference_separator.join(reference.split(reference_separator)[1:-3])
......@@ -20,8 +20,6 @@ supplier = movement_dict.get('supplier', None)
extension = movement_dict.get('extension', None)
dataset_reference = movement_dict.get('dataset_reference', None)
data_ingestion_id = '%s_%s_%s_%s' %(supplier, dataset_reference, now_string, eof)
size = movement_dict.get('size', None) if movement_dict.get('size', None) != "" else None
hash_value = movement_dict.get('hash', None) if movement_dict.get('hash', None) != "" else None
# search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue(
......@@ -85,12 +83,9 @@ for supply_line in composed.objectValues(portal_type = 'Data Supply Line'):
input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList())
if hash_value is None or eof != reference_end_single: # do not set hash if split; calculate it when appending
hash_value = ""
data_stream = portal.data_stream_module.newContent(
portal_type = "Data Stream",
id = data_ingestion_id,
version = hash_value,
title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != none_extension else ""),
reference = data_ingestion_reference)
......@@ -109,10 +104,10 @@ if dataset_reference is not None:
# when a data set is uploaded from ebulk this means that "validation" is done at client side
# thus set the state accordingly
data_set.validate()
except:
except Exception:
data_set = portal.data_set_module.get(dataset_reference)
if portal.IsReferenceInvalidated(data_set):
portal.RevalidateReference(data_set)
if portal.ERP5Site_checkReferenceInvalidated(data_set):
portal.ERP5Site_revalidateReference(data_set)
if data_set.getValidationState() == "invalidated":
data_set.validate()
input_line.setDefaultAggregateValue(data_set)
......@@ -122,7 +117,9 @@ data_ingestion.start()
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()
# if not split (one single ingestion) validate and publish the data stream
if eof == reference_end_single:
data_stream.validate()
data_stream.publish()
return data_operation, {'data_stream': data_stream}
"""
This script is called from ebulk client to get list of Data Streams for a
Data set.
"""
import re
import json
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
try:
data_set = portal.data_set_module.get(data_set_reference)
if data_set is None or portal.IsReferenceInvalidated(data_set):
return { "status_code": 0, "result": [] }
except Exception as e: # fails because unauthorized access
log("Unauthorized access to getDataStreamList.")
return { "status_code": 1, "error_message": "401 - Unauthorized access. Please check your user credentials and try again." }
data_set = portal.data_set_module.get(data_set_reference)
if data_set is None:
return []
data_stream_list = []
for stream in data_set.DataSet_getDataStreamList():
if stream.getVersion() == "":
return { "status_code": 2, "result": [] }
data_stream_list.append({ 'id': 'data_stream_module/'+stream.getId(),
'reference': stream.getReference(),
'size': stream.getSize(),
'hash': stream.getVersion() })
dict = { 'status_code': 0, 'result': data_stream_list }
return json.dumps(dict)
......@@ -27,10 +27,10 @@ class TestDataIngestion(SecurityTestCase):
return "DataIngestionTest"
def afterSetUp(self):
self.assertEqual(self.REFERENCE_SEPARATOR, self.portal.getIngestionReferenceDictionary()["reference_separator"])
self.assertEqual(self.INVALID, self.portal.getIngestionReferenceDictionary()["invalid_suffix"])
self.assertEqual(self.EOF, self.REFERENCE_SEPARATOR + self.portal.getIngestionReferenceDictionary()["split_end_suffix"])
self.assertEqual(self.PART_1, self.REFERENCE_SEPARATOR + self.portal.getIngestionReferenceDictionary()["split_first_suffix"])
self.assertEqual(self.REFERENCE_SEPARATOR, self.portal.ERP5Site_getIngestionReferenceDictionary()["reference_separator"])
self.assertEqual(self.INVALID, self.portal.ERP5Site_getIngestionReferenceDictionary()["invalid_suffix"])
self.assertEqual(self.EOF, self.REFERENCE_SEPARATOR + self.portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"])
self.assertEqual(self.PART_1, self.REFERENCE_SEPARATOR + self.portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"])
def getRandomReference(self):
random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
......@@ -70,6 +70,12 @@ class TestDataIngestion(SecurityTestCase):
reference = reference)
return data_stream
def getDataStreamChunkList(self, reference):
data_stream_list = self.portal.portal_catalog(
portal_type = 'Data Stream',
reference = reference)
return data_stream_list
def ingestRequest(self, reference, eof, data_chunk, ingestion_policy):
encoded_data_chunk = base64.b64encode(data_chunk)
request = self.portal.REQUEST
......@@ -84,14 +90,11 @@ class TestDataIngestion(SecurityTestCase):
def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False):
ingestion_reference = self.getIngestionReference(reference, extension, randomize_ingestion_reference)
# use default ebulk policy
ingestion_policy = self.portal.portal_ingestion_policies.wendelin_embulk
ingestion_policy = self.portal.portal_ingestion_policies.default_ebulk
self.ingestRequest(ingestion_reference, eof, data_chunk, ingestion_policy)
_, ingestion_reference = self.sanitizeReference(ingestion_reference)
return ingestion_reference
return ingestion_reference
def stepIngest(self, extension, delimiter, randomize_ingestion_reference=False):
file_name = "file_name.csv"
......@@ -108,16 +111,15 @@ class TestDataIngestion(SecurityTestCase):
chunk.append(line)
else:
break
ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END, randomize_ingestion_reference=randomize_ingestion_reference)
if os.path.exists(file_name):
os.remove(file_name)
# test properly ingested
data_ingestion = self.getDataIngestion(ingestion_reference)
self.assertNotEqual(None, data_ingestion)
data_ingestion_line = [x for x in data_ingestion.objectValues() \
if x.getReference() == 'out_stream'][0]
data_set = data_ingestion_line.getAggregateValue(portal_type='Data Set')
......@@ -127,8 +129,9 @@ class TestDataIngestion(SecurityTestCase):
data_stream_data = data_stream.getData()
self.assertEqual(data_chunk, data_stream_data)
# check Data Stream and Data Set are validated
self.assertEqual('validated', data_stream.getValidationState())
# check Data Set is validated and Data Stream is published
self.assertEqual('validated', data_set.getValidationState())
self.assertEqual('published', data_stream.getValidationState())
return data_set, [data_stream]
......@@ -137,10 +140,10 @@ class TestDataIngestion(SecurityTestCase):
Test default ingestion with ebulk too.
"""
self.stepIngest(self.CSV, ",")
def test_02_DefaultSplitIngestion(self):
"""
Test multiple uploads from ebulk end up in same Data Stream concatenated
Test multiple uploads from ebulk end up in multiple Data Streams
(in case of a large file upload, when ebulk by default splits the file into 50MB
chunks).
"""
......@@ -152,42 +155,48 @@ class TestDataIngestion(SecurityTestCase):
for _ in xrange(250)])
data_chunk_4 = ''.join([random.choice(string.ascii_letters + string.digits) \
for _ in xrange(250)])
data_chunk = data_chunk_1 + data_chunk_2 + data_chunk_3 + data_chunk_4
reference = self.getRandomReference()
ingestion_reference = self.ingest(data_chunk_1, reference, self.FIF, self.PART_1)
time.sleep(1)
self.tic()
ingestion_reference = self.ingest(data_chunk_2, reference, self.FIF, self.PART_2)
time.sleep(1)
time.sleep(1)
self.tic()
ingestion_reference = self.ingest(data_chunk_3, reference, self.FIF, self.PART_3)
time.sleep(1)
time.sleep(1)
self.tic()
ingestion_reference = self.ingest(data_chunk_4, reference, self.FIF, self.EOF)
time.sleep(1)
self.tic()
# call the alarm explicitly so all 4 Data Streams can be concatenated into one
self.portal.portal_alarms.wendelin_data_lake_handle_analysis.Alarm_dataLakeHandleAnalysis()
# call the alarm explicitly so all 4 Data Streams are validated and published
self.portal.portal_alarms.wendelin_handle_analysis.Alarm_handleAnalysis()
self.tic()
# check resulting Data Stream
data_stream = self.getDataStream(ingestion_reference)
self.assertEqual(data_chunk, data_stream.getData())
# check resulting Data Streams
data_stream_list = self.getDataStreamChunkList(ingestion_reference)
#one data stream per chunk
self.assertEqual(len(data_stream_list), 4)
#last datastream (EOF) published, the rest validated
for stream in data_stream_list:
if stream.getId().endswith(self.EOF.replace(self.REFERENCE_SEPARATOR, "")):
self.assertEqual('published', stream.getValidationState())
else:
self.assertEqual('validated', stream.getValidationState())
def test_03_DefaultWendelinConfigurationExistency(self):
"""
Test that nobody accidentally removes the default configurations needed by the HowTos.
"""
"""
# test default ebulk ingestion exists
self.assertNotEqual(None,
getattr(self.portal.portal_ingestion_policies, "wendelin_embulk", None))
self.assertNotEqual(None,
self.assertNotEqual(None,
getattr(self.portal.portal_ingestion_policies, "default_ebulk", None))
self.assertNotEqual(None,
getattr(self.portal.data_supply_module, "embulk", None))
def test_04_DefaultModelSecurityModel(self):
......@@ -199,19 +208,17 @@ class TestDataIngestion(SecurityTestCase):
# check data relation between Data Set and Data Streams work
self.assertSameSet(data_stream_list, data_set.DataSet_getDataStreamList())
# publish data set and have all Data Streams published automatically
data_set.publish()
self.tic()
self.assertEqual('published', data_set.getValidationState())
self.assertSameSet(['published' for x in data_stream_list],
# check data set and all Data Streams states
self.assertEqual('validated', data_set.getValidationState())
self.assertSameSet(['published' for x in data_stream_list],
[x.getValidationState() for x in data_stream_list])
# invalidate Data Set should invalidate related Data Streams
data_set.invalidate()
self.tic()
self.assertEqual('invalidated', data_set.getValidationState())
self.assertSameSet(['invalidated' for x in data_stream_list],
self.assertSameSet(['invalidated' for x in data_stream_list],
[x.getValidationState() for x in data_stream_list])
# XXX: new test which simulates download / upload of Data Set and increases DS version
\ No newline at end of file
......@@ -46,9 +46,8 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W: 88, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W: 95, 34: Unused variable \'i\' (unused-variable)</string>
<string>W: 95, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:102, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:102, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
......
......@@ -6,6 +6,7 @@ data_product_module/fif_descriptor
data_supply_module/embulk
data_supply_module/embulk/**
portal_ingestion_policies/wendelin_embulk
portal_ingestion_policies/default_ebulk
portal_categories/function/**
portal_categories/use/**
portal_alarms/wendelin_data_lake_handle_analysis
......
......@@ -6,4 +6,5 @@ data_product_module/fif_descriptor
data_supply_module/embulk
data_supply_module/embulk/**
portal_ingestion_policies/wendelin_embulk
portal_ingestion_policies/default_ebulk
portal_categories/use/**
\ No newline at end of file
......@@ -10,4 +10,5 @@ portal_alarms/wendelin_data_lake_handle_analysis/**
portal_callables/DataIngestionLine_writeEbulkIngestionToDataStream
portal_callables/IngestionPolicy_parseEbulkIngestionTag
portal_categories/use/**
portal_ingestion_policies/default_ebulk
portal_ingestion_policies/wendelin_embulk
\ No newline at end of file
erp5_ingestion_reference_utils
erp5_wendelin_data_lake
\ No newline at end of file
......@@ -112,7 +112,7 @@
});
})
.declareMethod("getDescriptorContent", function (descriptorReference) {
var url = "/erp5/getDescriptorHTMLContent?reference=" + descriptorReference,
var url = "/ERP5_getDescriptorHTMLContent?reference=" + descriptorReference,
xmlHttp = new XMLHttpRequest();
try {
xmlHttp.open("GET", url, false);
......
......@@ -238,7 +238,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>969.21927.24503.19865</string> </value>
<value> <string>984.982.33861.30037</string> </value>
</item>
<item>
<key> <string>state</string> </key>
......@@ -256,7 +256,7 @@
</tuple>
<state>
<tuple>
<float>1533297268.38</float>
<float>1589986197.54</float>
<string>UTC</string>
</tuple>
</state>
......
......@@ -60,7 +60,7 @@
"key": "field_listbox",
"lines": 15,
"list_method": "portal_catalog",
"query": "urn:jio:allDocs?query=portal_type%3A%22Data+Set%22+AND+validation_state%3A%22published%22+AND+NOT+reference%3A%22%25_invalid%22",
"query": "urn:jio:allDocs?query=portal_type%3A%22Data+Set%22+AND+validation_state%3A%22validated%22+AND+NOT+reference%3A%22%25_invalid%22",
"portal_type": [],
"search_column_list": column_list,
"sort_column_list": column_list,
......
......@@ -236,7 +236,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>982.48449.25246.62327</string> </value>
<value> <string>983.63603.62266.8260</string> </value>
</item>
<item>
<key> <string>state</string> </key>
......@@ -254,7 +254,7 @@
</tuple>
<state>
<tuple>
<float>1587730634.22</float>
<float>1589984604.57</float>
<string>UTC</string>
</tuple>
</state>
......
......@@ -236,7 +236,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>982.48449.25246.62327</string> </value>
<value> <string>984.971.49023.47684</string> </value>
</item>
<item>
<key> <string>state</string> </key>
......@@ -254,7 +254,7 @@
</tuple>
<state>
<tuple>
<float>1587730667.12</float>
<float>1591034166.31</float>
<string>UTC</string>
</tuple>
</state>
......