Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
W
wendelin
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Xiaowu Zhang
wendelin
Commits
8cb33a12
Commit
8cb33a12
authored
Jul 12, 2019
by
Ivan Tyagov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add back wrongly removed scripts.
parent
53710404
Changes
16
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
904 additions
and
1 deletion
+904
-1
bt5/erp5_wendelin_data_lake_ingestion/PathTemplateItem/portal_alarms/wendelin_data_lake_handle_analysis.xml
...Item/portal_alarms/wendelin_data_lake_handle_analysis.xml
+199
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleAnalysis.py
...s/erp5_wendelin_data_lake/Alarm_dataLakeHandleAnalysis.py
+5
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleAnalysis.xml
.../erp5_wendelin_data_lake/Alarm_dataLakeHandleAnalysis.xml
+62
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleInvalids.py
...s/erp5_wendelin_data_lake/Alarm_dataLakeHandleInvalids.py
+42
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleInvalids.xml
.../erp5_wendelin_data_lake/Alarm_dataLakeHandleInvalids.xml
+62
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_createDataAnalysisList.py
...rp5_wendelin_data_lake/DataLake_createDataAnalysisList.py
+97
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_createDataAnalysisList.xml
...p5_wendelin_data_lake/DataLake_createDataAnalysisList.xml
+62
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataAnalysisList.py
...p5_wendelin_data_lake/DataLake_executeDataAnalysisList.py
+25
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataAnalysisList.xml
...5_wendelin_data_lake/DataLake_executeDataAnalysisList.xml
+62
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataOperation.py
.../erp5_wendelin_data_lake/DataLake_executeDataOperation.py
+22
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataOperation.xml
...erp5_wendelin_data_lake/DataLake_executeDataOperation.xml
+62
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_stopIngestionList.py
...ins/erp5_wendelin_data_lake/DataLake_stopIngestionList.py
+109
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_stopIngestionList.xml
...ns/erp5_wendelin_data_lake/DataLake_stopIngestionList.xml
+62
-0
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/logEntry.xml
...ateItem/portal_skins/erp5_wendelin_data_lake/logEntry.xml
+28
-0
bt5/erp5_wendelin_data_lake_ingestion/bt/template_keep_last_workflow_history_only_path_list
...ion/bt/template_keep_last_workflow_history_only_path_list
+3
-1
bt5/erp5_wendelin_data_lake_ingestion/bt/template_path_list
bt5/erp5_wendelin_data_lake_ingestion/bt/template_path_list
+2
-0
No files found.
bt5/erp5_wendelin_data_lake_ingestion/PathTemplateItem/portal_alarms/wendelin_data_lake_handle_analysis.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"Alarm"
module=
"erp5.portal_type"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
active_sense_method_id
</string>
</key>
<value>
<string>
Alarm_dataLakeHandleAnalysis
</string>
</value>
</item>
<item>
<key>
<string>
automatic_solve
</string>
</key>
<value>
<int>
0
</int>
</value>
</item>
<item>
<key>
<string>
description
</string>
</key>
<value>
<none/>
</value>
</item>
<item>
<key>
<string>
enabled
</string>
</key>
<value>
<int>
1
</int>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
wendelin_data_lake_handle_analysis
</string>
</value>
</item>
<item>
<key>
<string>
periodicity_hour
</string>
</key>
<value>
<tuple/>
</value>
</item>
<item>
<key>
<string>
periodicity_minute
</string>
</key>
<value>
<tuple/>
</value>
</item>
<item>
<key>
<string>
periodicity_minute_frequency
</string>
</key>
<value>
<int>
1
</int>
</value>
</item>
<item>
<key>
<string>
periodicity_month
</string>
</key>
<value>
<tuple/>
</value>
</item>
<item>
<key>
<string>
periodicity_month_day
</string>
</key>
<value>
<tuple/>
</value>
</item>
<item>
<key>
<string>
periodicity_start_date
</string>
</key>
<value>
<object>
<klass>
<global
id=
"1.1"
name=
"DateTime"
module=
"DateTime.DateTime"
/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>
1420502400.0
</float>
<string>
GMT
</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
periodicity_stop_date
</string>
</key>
<value>
<object>
<klass>
<reference
id=
"1.1"
/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>
32503680000.0
</float>
<string>
GMT
</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
periodicity_week
</string>
</key>
<value>
<tuple/>
</value>
</item>
<item>
<key>
<string>
portal_type
</string>
</key>
<value>
<string>
Alarm
</string>
</value>
</item>
<item>
<key>
<string>
title
</string>
</key>
<value>
<string>
Handle Analysis
</string>
</value>
</item>
<item>
<key>
<string>
workflow_history
</string>
</key>
<value>
<persistent>
<string
encoding=
"base64"
>
AAAAAAAAAAI=
</string>
</persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record
id=
"2"
aka=
"AAAAAAAAAAI="
>
<pickle>
<global
name=
"PersistentMapping"
module=
"Persistence.mapping"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
data
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
edit_workflow
</string>
</key>
<value>
<persistent>
<string
encoding=
"base64"
>
AAAAAAAAAAM=
</string>
</persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record
id=
"3"
aka=
"AAAAAAAAAAM="
>
<pickle>
<global
name=
"WorkflowHistoryList"
module=
"Products.ERP5Type.patches.WorkflowTool"
/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key>
<string>
action
</string>
</key>
<value>
<string>
edit
</string>
</value>
</item>
<item>
<key>
<string>
actor
</string>
</key>
<value>
<string>
zope
</string>
</value>
</item>
<item>
<key>
<string>
comment
</string>
</key>
<value>
<none/>
</value>
</item>
<item>
<key>
<string>
error_message
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
serial
</string>
</key>
<value>
<string>
977.326.43867.47035
</string>
</value>
</item>
<item>
<key>
<string>
state
</string>
</key>
<value>
<string>
current
</string>
</value>
</item>
<item>
<key>
<string>
time
</string>
</key>
<value>
<object>
<klass>
<global
name=
"DateTime"
module=
"DateTime.DateTime"
/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>
1562853538.26
</float>
<string>
UTC
</string>
</tuple>
</state>
</object>
</value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleAnalysis.py
0 → 100644
View file @
8cb33a12
context
.
logEntry
(
"Alarm_dataLakeHandleAnalysis FIRED!"
)
portal
=
context
.
getPortalObject
()
portal
.
DataLake_stopIngestionList
()
portal
.
DataLake_createDataAnalysisList
()
portal
.
DataLake_executeDataAnalysisList
()
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleAnalysis.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"PythonScript"
module=
"Products.PythonScripts.PythonScript"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
Script_magic
</string>
</key>
<value>
<int>
3
</int>
</value>
</item>
<item>
<key>
<string>
_bind_names
</string>
</key>
<value>
<object>
<klass>
<global
name=
"NameAssignments"
module=
"Shared.DC.Scripts.Bindings"
/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key>
<string>
_asgns
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
name_container
</string>
</key>
<value>
<string>
container
</string>
</value>
</item>
<item>
<key>
<string>
name_context
</string>
</key>
<value>
<string>
context
</string>
</value>
</item>
<item>
<key>
<string>
name_m_self
</string>
</key>
<value>
<string>
script
</string>
</value>
</item>
<item>
<key>
<string>
name_subpath
</string>
</key>
<value>
<string>
traverse_subpath
</string>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
_params
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
Alarm_dataLakeHandleAnalysis
</string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleInvalids.py
0 → 100644
View file @
8cb33a12
from
Products.ZSQLCatalog.SQLCatalog
import
Query
,
SimpleQuery
return
# This alarm was deprecated - kept for test
portal
=
context
.
getPortalObject
()
portal_catalog
=
portal
.
portal_catalog
# invalidate old (more than 5 hours) pending ingestions (e.g. split ingestions that were canceled/interrumped and no resumed)
from
DateTime
import
DateTime
now
=
DateTime
()
now_minus_max
=
now
-
1.0
/
24
/
60
*
9999
now_minus_5
=
now
-
1.0
/
24
/
60
*
60
*
5
catalog_kw
=
{
'creation_date'
:
{
'query'
:
(
now_minus_max
,
now_minus_5
),
'range'
:
'minmax'
},
'simulation_state'
:
'started'
,
'portal_type'
:
'Data Ingestion'
}
for
data_ingestion
in
portal_catalog
(
**
catalog_kw
):
# search related data ingestions that are not old yet (less than 5 hours)
catalog_kw
=
{
'creation_date'
:
{
'query'
:
(
now_minus_5
,
DateTime
()),
'range'
:
'minmax'
},
'simulation_state'
:
'started'
,
'portal_type'
:
'Data Ingestion'
,
'reference'
:
data_ingestion
.
getReference
()}
invalidate
=
True
if
len
(
portal_catalog
(
**
catalog_kw
))
>
0
:
invalidate
=
False
if
invalidate
:
# invalidate related Data Stream
kw_dict
=
{
"portal_type"
:
"Data Stream"
,
"id"
:
data_ingestion
.
getId
()}
for
data_stream
in
portal_catalog
(
**
kw_dict
):
if
not
data_stream
.
getReference
().
endswith
(
"_invalid"
):
data_stream
.
setReference
(
data_stream
.
getReference
()
+
"_invalid"
)
try
:
data_stream
.
invalidate
()
except
:
context
.
logEntry
(
"[WARNING] Could not invalidate data stream '%s', it was already invalidated or draft"
%
data_stream
.
getId
())
try
:
if
not
data_ingestion
.
getReference
().
endswith
(
"_invalid"
):
data_ingestion
.
setReference
(
data_ingestion
.
getReference
()
+
"_invalid"
)
data_ingestion
.
deliver
()
except
:
context
.
logEntry
(
"[WARNING] Could not invalidate/deliver data ingestion '%s', it was already invalidated/deliver"
%
data_ingestion
.
getId
())
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/Alarm_dataLakeHandleInvalids.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"PythonScript"
module=
"Products.PythonScripts.PythonScript"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
Script_magic
</string>
</key>
<value>
<int>
3
</int>
</value>
</item>
<item>
<key>
<string>
_bind_names
</string>
</key>
<value>
<object>
<klass>
<global
name=
"NameAssignments"
module=
"Shared.DC.Scripts.Bindings"
/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key>
<string>
_asgns
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
name_container
</string>
</key>
<value>
<string>
container
</string>
</value>
</item>
<item>
<key>
<string>
name_context
</string>
</key>
<value>
<string>
context
</string>
</value>
</item>
<item>
<key>
<string>
name_m_self
</string>
</key>
<value>
<string>
script
</string>
</value>
</item>
<item>
<key>
<string>
name_subpath
</string>
</key>
<value>
<string>
traverse_subpath
</string>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
_params
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
Alarm_dataLakeHandleInvalids
</string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_createDataAnalysisList.py
0 → 100644
View file @
8cb33a12
from
DateTime
import
DateTime
portal
=
context
.
getPortalObject
()
portal_catalog
=
portal
.
portal_catalog
from
Products.ERP5Type.Log
import
log
now
=
DateTime
()
query_dict
=
{
"portal_type"
:
"Data Ingestion Line"
,
"resource_portal_type"
:
"Data Product"
,
"simulation_state"
:
"stopped"
}
for
line_data_ingestion
in
portal_catalog
(
**
query_dict
):
data_ingestion
=
line_data_ingestion
.
getParentValue
()
# Get applicable transformation
for
transformation
in
portal_catalog
(
portal_type
=
"Data Transformation"
,
validation_state
=
"validated"
,
resource_relative_url
=
line_data_ingestion
.
getResource
()):
# Create Analysis
try
:
try
:
data_analysis
=
portal
.
data_analysis_module
.
newContent
(
portal_type
=
"Data Analysis"
,
id
=
data_ingestion
.
getId
(),
title
=
"%s - %s"
%
(
transformation
.
getTitle
(),
data_ingestion
.
getTitle
()),
reference
=
data_ingestion
.
getReference
(),
start_date
=
now
,
specialise_value
=
transformation
,
causality_value
=
data_ingestion
,
source
=
data_ingestion
.
getSource
(),
source_section
=
data_ingestion
.
getSourceSection
(),
source_project
=
data_ingestion
.
getSourceProject
(),
destination
=
data_ingestion
.
getDestination
(),
destination_section
=
data_ingestion
.
getDestinationSection
(),
destination_project
=
data_ingestion
.
getDestinationProject
())
except
Exception
as
e
:
#log(''.join(["[WARNING] Data Analysis already created: ", str(e)]))
data_analysis
=
None
if
data_analysis
is
not
None
:
# create input and output lines
for
transformation_line
in
transformation
.
objectValues
(
portal_type
=
[
"Data Transformation Resource Line"
,
"Data Transformation Operation Line"
]):
resource
=
transformation_line
.
getResourceValue
()
quantity
=
transformation_line
.
getQuantity
()
if
isinstance
(
quantity
,
tuple
):
quantity
=
quantity
[
0
]
aggregate_set
=
set
()
# manually add device and device configuration to every line
if
line_data_ingestion
.
getAggregateDevice
()
is
not
None
:
aggregate_set
.
add
(
line_data_ingestion
.
getAggregateDevice
())
if
line_data_ingestion
.
getAggregateDeviceConfiguration
()
is
not
None
:
aggregate_set
.
add
(
line_data_ingestion
.
getAggregateDeviceConfiguration
())
if
transformation_line
.
getPortalType
()
==
"Data Transformation Resource Line"
:
# at the moment, we only check for positive or negative quantity
if
quantity
<
0
:
# it is an input line. If it is an input resource line, then we search for an
# ingestion line with the same resource. If it is an operation line
# then we search for an ingestion line with resource portal type Data Product
related_lines_list
=
portal_catalog
(
portal_type
=
"Data Ingestion Line"
,
simulation_state
=
"stopped"
,
resource_relative_url
=
resource
.
getRelativeUrl
())
for
related_line
in
related_lines_list
:
if
(
related_line
.
getParentValue
().
getReference
()
==
data_ingestion
.
getReference
()
and
related_line
.
getParentValue
().
getSimulationState
()
==
"stopped"
):
aggregate_set
.
update
(
related_line
.
getAggregateSet
())
related_line
.
getParentValue
().
deliver
()
else
:
# it is an output line
# create new item based on item_type: data array, stream, descriptor, etc.
item_type
=
resource
.
getAggregatedPortalType
()
module
=
portal
.
getDefaultModule
(
item_type
)
item
=
module
.
newContent
(
portal_type
=
item_type
,
title
=
data_ingestion
.
getTitle
(),
id
=
data_ingestion
.
getId
(),
reference
=
data_ingestion
.
getReference
(),
version
=
'001'
)
if
"Data Descriptor"
not
in
item_type
:
item
.
validate
()
aggregate_set
=
set
()
aggregate_set
.
add
(
item
)
data_analysis
.
newContent
(
portal_type
=
"Data Analysis Line"
,
title
=
transformation_line
.
getTitle
(),
reference
=
transformation_line
.
getReference
(),
int_index
=
transformation_line
.
getIntIndex
(),
resource_value
=
resource
,
quantity
=
quantity
,
quantity_unit
=
transformation_line
.
getQuantityUnit
(),
aggregate_value_set
=
aggregate_set
)
data_analysis
.
plan
()
except
Exception
as
e
:
context
.
logEntry
(
"[ERROR] Error creating Data Analysis for Data Ingestion '%s' (ID: %s): %s"
%
(
data_ingestion
.
getReference
(),
data_ingestion
.
getId
(),
str
(
e
)))
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_createDataAnalysisList.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"PythonScript"
module=
"Products.PythonScripts.PythonScript"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
Script_magic
</string>
</key>
<value>
<int>
3
</int>
</value>
</item>
<item>
<key>
<string>
_bind_names
</string>
</key>
<value>
<object>
<klass>
<global
name=
"NameAssignments"
module=
"Shared.DC.Scripts.Bindings"
/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key>
<string>
_asgns
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
name_container
</string>
</key>
<value>
<string>
container
</string>
</value>
</item>
<item>
<key>
<string>
name_context
</string>
</key>
<value>
<string>
context
</string>
</value>
</item>
<item>
<key>
<string>
name_m_self
</string>
</key>
<value>
<string>
script
</string>
</value>
</item>
<item>
<key>
<string>
name_subpath
</string>
</key>
<value>
<string>
traverse_subpath
</string>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
_params
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
DataLake_createDataAnalysisList
</string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataAnalysisList.py
0 → 100644
View file @
8cb33a12
from
Products.ERP5Type.Log
import
log
portal
=
context
.
getPortalObject
()
portal_catalog
=
portal
.
portal_catalog
complex_files
=
portal
.
getIngestionReferenceDictionary
()[
"complex_files_extensions"
]
for
data_analysis
in
portal_catalog
(
portal_type
=
"Data Analysis"
,
simulation_state
=
"planned"
):
try
:
if
data_analysis
.
getSimulationState
()
==
"planned"
:
process
=
True
complex_file
=
False
for
ext
in
complex_files
:
if
data_analysis
.
getReference
().
endswith
(
ext
):
complex_file
=
True
if
complex_file
:
# if server is bussy and file to process is complex, leave for next alarm
if
portal
.
portal_activities
.
countMessage
()
>
50
:
log
(
"There are more than 50 activities running, so leaving data processing of file '%s' for next alarm"
%
data_analysis
.
getReference
())
process
=
False
if
process
:
data_analysis
.
start
()
data_analysis
.
activate
(
serialization_tag
=
str
(
data_analysis
.
getUid
()))
\
.
DataLake_executeDataOperation
()
except
Exception
as
e
:
context
.
logEntry
(
"[ERROR] Error executing Data Analysis for '%s': %s"
%
(
data_analysis
.
getId
(),
str
(
e
)))
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataAnalysisList.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"PythonScript"
module=
"Products.PythonScripts.PythonScript"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
Script_magic
</string>
</key>
<value>
<int>
3
</int>
</value>
</item>
<item>
<key>
<string>
_bind_names
</string>
</key>
<value>
<object>
<klass>
<global
name=
"NameAssignments"
module=
"Shared.DC.Scripts.Bindings"
/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key>
<string>
_asgns
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
name_container
</string>
</key>
<value>
<string>
container
</string>
</value>
</item>
<item>
<key>
<string>
name_context
</string>
</key>
<value>
<string>
context
</string>
</value>
</item>
<item>
<key>
<string>
name_m_self
</string>
</key>
<value>
<string>
script
</string>
</value>
</item>
<item>
<key>
<string>
name_subpath
</string>
</key>
<value>
<string>
traverse_subpath
</string>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
_params
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
DataLake_executeDataAnalysisList
</string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataOperation.py
0 → 100644
View file @
8cb33a12
operation
=
None
parameter_dict
=
{}
for
analysis_line
in
context
.
objectValues
(
portal_type
=
"Data Analysis Line"
):
resource
=
analysis_line
.
getResourceValue
()
if
resource
is
not
None
:
resource_portal_type
=
resource
.
getPortalType
()
else
:
resource_portal_type
=
''
if
resource_portal_type
==
'Data Operation'
:
operation_analysis_line
=
analysis_line
operation
=
analysis_line
.
getResourceValue
()
else
:
reference
=
analysis_line
.
getReference
()
aggregate
=
analysis_line
.
getAggregateDataStreamValue
()
or
\
analysis_line
.
getAggregateDataArrayValue
()
or
\
analysis_line
.
getAggregateDataDescriptorValue
()
parameter_dict
[
reference
]
=
aggregate
script_id
=
operation
.
getScriptId
()
getattr
(
operation_analysis_line
,
script_id
)(
**
parameter_dict
)
context
.
stop
()
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_executeDataOperation.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"PythonScript"
module=
"Products.PythonScripts.PythonScript"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
Script_magic
</string>
</key>
<value>
<int>
3
</int>
</value>
</item>
<item>
<key>
<string>
_bind_names
</string>
</key>
<value>
<object>
<klass>
<global
name=
"NameAssignments"
module=
"Shared.DC.Scripts.Bindings"
/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key>
<string>
_asgns
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
name_container
</string>
</key>
<value>
<string>
container
</string>
</value>
</item>
<item>
<key>
<string>
name_context
</string>
</key>
<value>
<string>
context
</string>
</value>
</item>
<item>
<key>
<string>
name_m_self
</string>
</key>
<value>
<string>
script
</string>
</value>
</item>
<item>
<key>
<string>
name_subpath
</string>
</key>
<value>
<string>
traverse_subpath
</string>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
_params
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
DataLake_executeDataOperation
</string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_stopIngestionList.py
0 → 100644
View file @
8cb33a12
from
Products.ERP5Type.Log
import
log
from
Products.ZSQLCatalog.SQLCatalog
import
Query
,
SimpleQuery
import
hashlib
CHUNK_SIZE
=
200000
def
getHash
(
data_stream
):
hash_md5
=
hashlib
.
md5
()
data_stream_chunk
=
None
n_chunk
=
0
chunk_size
=
CHUNK_SIZE
while
True
:
start_offset
=
n_chunk
*
chunk_size
end_offset
=
n_chunk
*
chunk_size
+
chunk_size
try
:
data_stream_chunk
=
''
.
join
(
data_stream
.
readChunkList
(
start_offset
,
end_offset
))
except
:
# data stream is empty
data_stream_chunk
=
""
hash_md5
.
update
(
data_stream_chunk
)
if
data_stream_chunk
==
""
:
break
n_chunk
+=
1
return
hash_md5
.
hexdigest
()
def
isInterruptedAbandonedSplitIngestion
(
reference
):
from
DateTime
import
DateTime
now
=
DateTime
()
day_hours
=
1.0
/
24
/
60
*
60
*
24
# started split data ingestions for reference
catalog_kw
=
{
'portal_type'
:
'Data Ingestion'
,
'simulation_state'
:
'started'
,
'reference'
:
reference
}
invalidate
=
True
for
data_ingestion
in
portal_catalog
(
**
catalog_kw
):
# check that all related ingestions are old (more than 24 hours)
if
(
DateTime
()
-
data_ingestion
.
getCreationDate
())
<
day_hours
:
invalidate
=
False
return
invalidate
portal
=
context
.
getPortalObject
()
portal_catalog
=
portal
.
portal_catalog
reference_end_single
=
portal
.
getIngestionReferenceDictionary
()[
"single_end_suffix"
]
reference_first_split
=
portal
.
getIngestionReferenceDictionary
()[
"split_first_suffix"
]
reference_end_split
=
portal
.
getIngestionReferenceDictionary
()[
"split_end_suffix"
]
# stop single started ingestion (not split files)
for
data_ingestion
in
portal_catalog
(
portal_type
=
"Data Ingestion"
,
simulation_state
=
"started"
,
id
=
"%"
+
reference_end_single
):
if
not
portal
.
IsReferenceInvalidated
(
data_ingestion
):
related_split_ingestions
=
portal_catalog
(
portal_type
=
"Data Ingestion"
,
reference
=
data_ingestion
.
getReference
())
if
len
(
related_split_ingestions
)
==
1
:
data_stream
=
portal_catalog
.
getResultValue
(
portal_type
=
'Data Stream'
,
reference
=
data_ingestion
.
getReference
())
if
data_stream
is
not
None
:
hash_value
=
getHash
(
data_stream
)
data_stream
.
setVersion
(
hash_value
)
if
data_stream
.
getValidationState
()
!=
"validated"
:
data_stream
.
validate
()
if
data_ingestion
.
getSimulationState
()
==
"started"
:
data_ingestion
.
stop
()
# append split ingestions
for
data_ingestion
in
portal_catalog
(
portal_type
=
"Data Ingestion"
,
simulation_state
=
"started"
,
id
=
"%"
+
reference_first_split
):
if
not
portal
.
IsReferenceInvalidated
(
data_ingestion
):
if
isInterruptedAbandonedSplitIngestion
(
data_ingestion
.
getReference
()):
portal
.
ERP5Site_invalidateSplitIngestions
(
data_ingestion
.
getReference
(),
success
=
False
)
else
:
try
:
last_data_stream_id
=
""
query
=
Query
(
portal_type
=
"Data Stream"
,
reference
=
data_ingestion
.
getReference
(),
validation_state
=
"draft"
)
result_list
=
portal_catalog
(
query
=
query
,
sort_on
=
((
'creation_date'
,
'ascending'
),))
full_data_stream
=
None
for
data_stream
in
result_list
:
log
(
''
.
join
([
"Data stream for split ingestion: "
,
data_stream
.
getId
()]))
if
data_stream
.
getId
()
==
data_ingestion
.
getId
():
log
(
"It is base data stream"
)
full_data_stream
=
data_stream
else
:
log
(
"It is not base data stream, it is a part"
)
if
full_data_stream
!=
None
:
log
(
"appending content to base data stream..."
)
full_data_stream
.
appendData
(
data_stream
.
getData
())
last_data_stream_id
=
data_stream
.
getId
()
portal
.
data_stream_module
.
deleteContent
(
data_stream
.
getId
())
if
last_data_stream_id
.
endswith
(
reference_end_split
):
portal
.
ERP5Site_invalidateSplitIngestions
(
data_ingestion
.
getReference
(),
success
=
True
)
hash
=
getHash
(
full_data_stream
)
full_data_stream
.
setVersion
(
hash
)
if
full_data_stream
.
getValidationState
()
!=
"validated"
:
full_data_stream
.
validate
()
related_split_ingestions
=
portal_catalog
(
portal_type
=
"Data Ingestion"
,
simulation_state
=
"started"
,
reference
=
data_ingestion
.
getReference
())
for
ingestion
in
related_split_ingestions
:
if
ingestion
.
getId
()
==
full_data_stream
.
getId
():
if
ingestion
.
getSimulationState
()
==
"started"
:
ingestion
.
stop
()
else
:
portal
.
InvalidateReference
(
ingestion
)
ingestion
.
deliver
()
except
Exception
as
e
:
context
.
logEntry
(
"ERROR appending split data streams for ingestion: %s - reference: %s."
%
(
data_ingestion
.
getId
(),
data_ingestion
.
getReference
()))
context
.
logEntry
(
e
)
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/DataLake_stopIngestionList.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"PythonScript"
module=
"Products.PythonScripts.PythonScript"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
Script_magic
</string>
</key>
<value>
<int>
3
</int>
</value>
</item>
<item>
<key>
<string>
_bind_names
</string>
</key>
<value>
<object>
<klass>
<global
name=
"NameAssignments"
module=
"Shared.DC.Scripts.Bindings"
/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key>
<string>
_asgns
</string>
</key>
<value>
<dictionary>
<item>
<key>
<string>
name_container
</string>
</key>
<value>
<string>
container
</string>
</value>
</item>
<item>
<key>
<string>
name_context
</string>
</key>
<value>
<string>
context
</string>
</value>
</item>
<item>
<key>
<string>
name_m_self
</string>
</key>
<value>
<string>
script
</string>
</value>
</item>
<item>
<key>
<string>
name_subpath
</string>
</key>
<value>
<string>
traverse_subpath
</string>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key>
<string>
_params
</string>
</key>
<value>
<string></string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
DataLake_stopIngestionList
</string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/logEntry.xml
0 → 100644
View file @
8cb33a12
<?xml version="1.0"?>
<ZopeData>
<record
id=
"1"
aka=
"AAAAAAAAAAE="
>
<pickle>
<global
name=
"ExternalMethod"
module=
"Products.ExternalMethod.ExternalMethod"
/>
</pickle>
<pickle>
<dictionary>
<item>
<key>
<string>
_function
</string>
</key>
<value>
<string>
logEntry
</string>
</value>
</item>
<item>
<key>
<string>
_module
</string>
</key>
<value>
<string>
ingestion_log
</string>
</value>
</item>
<item>
<key>
<string>
id
</string>
</key>
<value>
<string>
logEntry
</string>
</value>
</item>
<item>
<key>
<string>
title
</string>
</key>
<value>
<string></string>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
bt5/erp5_wendelin_data_lake_ingestion/bt/template_keep_last_workflow_history_only_path_list
View file @
8cb33a12
...
...
@@ -9,4 +9,6 @@ data_transformation_module/transform_raw_fif_data
data_transformation_module/transform_raw_fif_data/**
portal_ingestion_policies/wendelin_embulk
portal_categories/function/**
portal_categories/use/**
\ No newline at end of file
portal_categories/use/**
portal_alarms/wendelin_data_lake_handle_analysis
portal_alarms/wendelin_data_lake_handle_analysis/**
\ No newline at end of file
bt5/erp5_wendelin_data_lake_ingestion/bt/template_path_list
View file @
8cb33a12
...
...
@@ -7,6 +7,8 @@ data_supply_module/embulk
data_supply_module/embulk/**
data_transformation_module/transform_raw_fif_data
data_transformation_module/transform_raw_fif_data/**
portal_alarms/wendelin_data_lake_handle_analysis
portal_alarms/wendelin_data_lake_handle_analysis/**
portal_callables/DataIngestionLine_writeEmbulkIngestionToDataStream
portal_callables/DataTransformation_transformFIFDataToDataArray
portal_callables/HandleFifEmbulkIngestion
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment