Commit fee9c313 authored by Roque

erp5_wendelin_data_lake_ingestion: refactor get data stream script

queries
- new script to get data set number of files
parent 5256a3bc
"""
This script is called from ebulk client to get count of Data Streams for a Data set.
"""
from erp5.component.module.Log import log
portal = context.getPortalObject()
try:
data_set = portal.data_set_module.get(data_set_reference)
if data_set is None or data_set.getReference().endswith("_invalid"):
return { "status_code": 0, "result": 0 }
except Exception as e:
log("Unauthorized access to getDataStreamList: " + str(e))
return { "status_code": 1, "error_message": "401 - Unauthorized access. Please check your user credentials and try again." }
data_set_uid = data_set.getUid()
data_stream_list = context.DataSet_getDataStreamList(data_set_uid)
return { "status_code": 0, "result": len(data_stream_list) }
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>data_set_reference</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getDataStreamCount</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
"""
This script is called from ebulk client to get list of Data Streams for a Data set.
"""
# NOTE(review): this listing appears to be a rendered diff in which removed and
# added lines are shown together without +/- markers and without indentation.
# Confirm against the repository which lines belong to the current version.
import json
from erp5.component.module.Log import log
# limit stays empty unless a non-zero batch_size is given; it is then passed
# as [offset, batch_size] to DataSet_getDataStreamList below.
limit=[]
if batch_size:
limit=[offset, batch_size]
portal = context.getPortalObject()
try:
data_set = portal.data_set_module.get(data_set_reference)
# Two consecutive guards for a missing/invalidated Data Set follow: the first
# returns a plain dict, the second returns a JSON string.
# NOTE(review): presumably the first pair is the pre-commit version -- confirm.
if data_set is None or portal.ERP5Site_checkReferenceInvalidated(data_set):
return { "status_code": 0, "result": [] }
if data_set is None or data_set.getReference().endswith("_invalid"):
return json.dumps({ "status_code": 0, "result": [] })
except Exception as e: # fails because unauthorized access
log("Unauthorized access to getDataStreamList: " + str(e))
# Two returns with the same payload follow: a plain dict, then a JSON string.
# NOTE(review): as written the second could never run; likely old/new diff lines.
return { "status_code": 1, "error_message": "401 - Unauthorized access. Please check your user credentials and try again." }
return json.dumps({ "status_code": 1, "error_message": "401 - Unauthorized access. Please check your user credentials and try again." })
data_set_uid = data_set.getUid()
data_stream_list = context.DataSet_getDataStreamList(data_set_uid, limit)
# Maps a stream reference to one aggregated entry with the keys
# 'data-stream-list', 'id', 'reference', 'large-hash' and 'full-size'.
data_stream_dict = {}
# First aggregation loop: iterates the objects returned by
# data_set.DataSet_getDataStreamList(), skipping falsy, invalidated and draft
# streams, and reads values through getId/getSize/getVersion/getReference.
# NOTE(review): presumably the pre-commit implementation -- confirm.
for stream in data_set.DataSet_getDataStreamList():
if stream and not portal.ERP5Site_checkReferenceInvalidated(stream) and stream.getValidationState() != "draft":
data_stream_info_dict = { 'id': 'data_stream_module/'+stream.getId(),
'size': stream.getSize(),
'hash': stream.getVersion() }
# A reference seen before: append this stream, concatenate its hash onto
# 'large-hash' and add its size to 'full-size'.
if stream.getReference() in data_stream_dict:
data_stream_dict[stream.getReference()]['data-stream-list'].append(data_stream_info_dict)
data_stream_dict[stream.getReference()]['large-hash'] = data_stream_dict[stream.getReference()]['large-hash'] + str(stream.getVersion())
data_stream_dict[stream.getReference()]['full-size'] = int(data_stream_dict[stream.getReference()]['full-size']) + int(stream.getSize())
else:
data_stream_dict[stream.getReference()] = { 'data-stream-list': [data_stream_info_dict],
'id': 'data_stream_module/'+stream.getId(),
'reference': stream.getReference(),
'large-hash': stream.getVersion(),
'full-size': stream.getSize() }
# Second aggregation loop: same grouping by reference, but reads the
# reference, version, size and relative_url attributes of each item in
# data_stream_list (the result of the limited query above).
for stream_brain in data_stream_list:
reference = stream_brain.reference
version = stream_brain.version
size = stream_brain.size
data_stream_id = stream_brain.relative_url
data_stream_info_dict = {'id': data_stream_id,
'size': size,
'hash': version}
if reference in data_stream_dict:
data_stream_dict[reference]['data-stream-list'].append(data_stream_info_dict)
data_stream_dict[reference]['large-hash'] = data_stream_dict[reference]['large-hash'] + str(version)
data_stream_dict[reference]['full-size'] = int(data_stream_dict[reference]['full-size']) + int(size)
else:
data_stream_dict[reference] = { 'data-stream-list': [data_stream_info_dict],
'id': data_stream_id,
'reference': reference,
'large-hash': version,
'full-size': size}
# The aggregated entries are returned as a JSON string with status_code 0.
result_dict = { 'status_code': 0, 'result': data_stream_dict.values()}
return json.dumps(result_dict)
......@@ -50,7 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>data_set_reference</string> </value>
<value> <string>data_set_reference, offset=0, batch_size=0, get_count=False</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment