Commit b65312da authored by Ophélie Gagnard's avatar Ophélie Gagnard

slapos_metadata_transform_agent: fix data conversion in Stream to Array so JSON conversion doesn't fail on special characters
parent 9ba3e103
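A minimal standalone sketch of the conversion problem behind this commit, assuming a Python 2 runtime and that special characters reach the script as code points above 255; the snippet below is illustrative only and does not use the ERP5 Data Stream API. It mirrors the switch in the hunk below from readMsgpackChunkList() plus chr()/ord() rebuilding to readChunkList() plus a plain string join.

# -*- coding: utf-8 -*-
import json

line = u'{"path": "/srv/donn\u00e9es/caf\u00e9 \u20ac.txt"}'

# Former approach: rebuild the text one code point at a time with chr().
# In Python 2, chr() only accepts 0..255, so a character like the euro sign
# makes the conversion blow up before json.loads() ever runs.
try:
    rebuilt = ''.join(map(chr, [ord(c) for c in line]))
except ValueError as e:
    print('conversion failed: %s' % e)  # chr() arg not in range(256)

# Current approach: keep the data as string chunks and join them unchanged,
# so special characters go straight through to the JSON parser.
chunks = [line[:12], line[12:]]
print(repr(json.loads(u''.join(chunks))['path']))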
......@@ -4,7 +4,7 @@
"""
context.log(reference)
if not reference:
raise ValueError('reference is None')
......
# TODO
# + review the code
# + put a slash ('/') at the end of directory names in MCA
import numpy as np
import re
import json
import os.path
def get_end_and_json_list(start, in_data_stream, max_scan_size = 1024 ** 3):
def get_end_and_json_list(start, in_data_stream, chunk_size = 4 * 1024 * 1024):
"""
Determine the end index of a scan (i.e. the complete scan of a filesystem).
Return the end index and a list of strings assumed to be valid json strings.
(This assumes the scanning of the file system produces correct strings.)
"""
chunk_size = 1024 # * 1024
while chunk_size < max_scan_size:
end = min(start+chunk_size, in_data_stream.getSize())
unpacked, end = in_data_stream.readMsgpackChunkList(start, end)
# assume max path size is 4096 and add 4096 for the rest
max_remaining_for_eol = 8192
end = min(start + chunk_size + max_remaining_for_eol, in_data_stream.getSize())
#unpacked, get_end = in_data_stream.readMsgpackChunkList(start, end) # LEGACY
unpacked = in_data_stream.readChunkList(start, end) # CURRENT
unpacked_string = "".join(unpacked) # CURRENT
# extending the current chunk until the next end of line,
# so json remains valid
if end < in_data_stream.getSize():
new_end_index = chunk_size
while unpacked_string[new_end_index] != '\n': # CURRENT
#while unpacked[new_end_index] != ord('\n'): # LEGACY
new_end_index += 1
end = start + new_end_index + 1
""" not useful anymore? # LEGACY / TOCLEAN
unpacked = unpacked[:end]
try: # efficient but does not prevent errors
raw_data_string = ''.join(map(chr, unpacked))
except: # not efficient but does prevent value errors and type errors
cleaned_unpacked = []
for i in unpacked:
if (type(i) == type(1) # only ints
and 0 <= i and i < 256): # i in range(256)
cleaned_unpacked.append(i)
raw_data_string = ''.join(map(chr, cleaned_unpacked))
#"""
raw_data_string = unpacked_string[:end] # CURRENT
end_scan_regexp = re.compile('.*?\[fluentbit_end\]\n', re.DOTALL)
complete_scan = end_scan_regexp.match(raw_data_string)
if not complete_scan:
chunk_size *= 2
continue
chunk_size = len(complete_scan.group()) + 1
line_list = complete_scan.group().splitlines()
timestamp_json_regexp = re.compile('.*?:(.*?)\[(.*)\]')
scan_end = end_scan_regexp.match(raw_data_string)
if not scan_end:
is_end_of_scan = False
else:
is_end_of_scan = True
end = start + len(scan_end.group()) + 1
raw_data_string = raw_data_string[:len(scan_end.group())]
context.log("DEBUG 020: len(raw_data_string) =", len(raw_data_string))
context.log("DEBUG 020: type(raw_data_string) =", type(raw_data_string))
context.log("DEBUG 020: type(raw_data_string[0]) =", type(raw_data_string[0]))
line_list = raw_data_string.splitlines()
context.log("DEBUG 020: len(line_list) =", len(line_list))
timestamp_json_regexp = re.compile(r'.*?:(.*?)\[(.*)\]')
json_string_list = [timestamp_json_regexp.match(line).group(2)
for line in line_list
if len(timestamp_json_regexp.match(line).groups()) == 2]
if (timestamp_json_regexp.match(line) and (len(timestamp_json_regexp.match(line).groups()) == 2))]
return start + chunk_size, json_string_list
return end, json_string_list, is_end_of_scan
raise NotImplementedError(
'Scan too big (searched up to '
+ str(chunk_size / 1024 ** 2)
+ ' MB) or no "fluentbit_end" in the Data Stream.')
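The new get_end_and_json_list() reads a fixed chunk plus an 8192-byte safety margin and then walks forward to the next newline, so the last JSON line is never cut in half. A minimal sketch of that boundary logic with a plain string standing in for the Data Stream (cut_on_line_boundary is a hypothetical helper, not part of the script above):

def cut_on_line_boundary(raw, start, chunk_size, margin=8192):
    """Return an end offset that never splits a line, like the loop above."""
    end = min(start + chunk_size + margin, len(raw))
    window = raw[start:end]
    if end < len(raw):
        new_end_index = chunk_size
        # walk forward to the next newline; like the original code, this
        # assumes a newline always shows up within the safety margin
        while window[new_end_index] != '\n':
            new_end_index += 1
        end = start + new_end_index + 1
    return end

stream = '{"a": 1}\n{"b": 2}\n{"c": 3}\n'
assert stream[:cut_on_line_boundary(stream, 0, chunk_size=5, margin=4)] == '{"a": 1}\n'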
def get_triplet_list(json_string_list):
def get_triplet_list(json_string_list, is_end_of_scan):
"""
Parse unpacked and return a triplet list: (path, slice1, slice2).
path is the path of the processed file, slice1 and slice2 are two parts
......@@ -50,10 +71,26 @@ def get_triplet_list(json_string_list):
NOTE: timestamps are parsed in case they are needed for future improvement
but they are not used at the moment.
"""
if is_end_of_scan:
# this line drops the trailing "fluentbit_end" marker at the end of a scan
# because it is not valid JSON
json_string_list = json_string_list[:-1]
tmp_data_list = [json.loads(json_string) for json_string in json_string_list]
data_list = []
tmp_data_list = []
fail_counter = 0
for json_string in json_string_list:
tmp_data_list.append(json.loads(json_string))
""" CURRENT
try:
tmp_data_list.append(json.loads(json_string))
except:
context.log('FAILING json_string:', json_string) # DEBUG
fail_counter += 1
pass
if fail_counter > 0:
context.log('FAILED json_string:', fail_counter) # DEBUG
#"""
data_list = []
for data in tmp_data_list:
in_list = False
if ('path' in data) and exclude_file_list:
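get_triplet_list() keeps a tolerant decoding variant behind the string-comment toggle above: decode what parses, count what does not, and keep going instead of aborting the whole scan. A standalone sketch of that pattern, with print standing in for context.log (load_json_lines is an illustrative name, not part of the script):

import json

def load_json_lines(json_string_list):
    decoded = []
    fail_counter = 0
    for json_string in json_string_list:
        try:
            decoded.append(json.loads(json_string))
        except ValueError:  # json.loads() raises ValueError on malformed input
            fail_counter += 1
    if fail_counter > 0:
        print('FAILED json_string count: %d' % fail_counter)
    return decoded

print(load_json_lines(['{"path": "/tmp/a"}', 'not json', '{"path": "/tmp/b"}']))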
......@@ -103,32 +140,33 @@ out_data_array.setPublicationSectionList(in_data_stream.getPublicationSectionLis
if 'file_system_image/reference_image' in in_data_stream.getPublicationSectionList():
if out_data_array.getValidationState() == 'draft':
out_data_array.validate()
if not out_data_array.getCausality():
ingestion_line = in_data_stream.getAggregateRelatedValue(portal_type='Data Ingestion Line')
resource = ingestion_line.getResource()
exclude_file_list = ingestion_line.getResourceValue().DataProduct_getExcludeFileList()
out_data_array.edit(causality=resource)
ressource = ingestion_line.getRessource()
exclude_file_list = ingestion_line.getRessourceValue().DataProduct_getExcludeFileList()
out_data_array.edit(causality=ressource)
# DEBUG advice: running the script on the whole Data Stream
# (i.e. restarting from the beginning):
#progress_indicator.setIntOffsetIndex(0)
start = progress_indicator.getIntOffsetIndex()
end = in_data_stream.getSize()
if start >= end:
return
end, json_string_list = get_end_and_json_list(start, in_data_stream)
triplet_list = get_triplet_list(json_string_list)
end, json_string_list, is_end_of_scan = get_end_and_json_list(start, in_data_stream)
context.log("DEBUG 020: len(json_string_list) =", len(json_string_list))
triplet_list = get_triplet_list(json_string_list, is_end_of_scan)
context.log("DEBUG 020: len(triplet_list) =", len(triplet_list))
uid_list = get_uid_list(triplet_list, in_data_stream)
context.log("DEBUG 020: len(uid_list) =", len(uid_list))
uid_ndarray = create_ndarray(uid_list)
context.log("len(uid_ndarray) =", len(uid_ndarray))
context.log("start =", start)
context.log("end =", end)
if start == 0:
zbigarray = None
else:
zbigarray = out_data_array.getArray()
# DEBUG advice: reset the Data Array:
# NOTE: currently, the Data Array is reset each time a scan is processed
zbigarray = None
if zbigarray is None:
zbigarray = out_data_array.initArray(shape=(0,), dtype='int64')
......
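The main script is driven by a persistent integer offset: each run only processes the bytes appended to the Data Stream since the previous run, then advances the offset. A minimal sketch of that pattern with in-memory stand-ins (FakeProgressIndicator and the hash()-based uids are illustrative assumptions, not the ERP5 Progress Indicator API nor the real get_uid_list()):

import numpy as np

class FakeProgressIndicator(object):
    def __init__(self):
        self._offset = 0
    def getIntOffsetIndex(self):
        return self._offset
    def setIntOffsetIndex(self, value):
        self._offset = value

def process_new_data(progress_indicator, stream_text, uid_array):
    start = progress_indicator.getIntOffsetIndex()
    end = len(stream_text)
    if start >= end:
        return uid_array  # nothing new since the last run
    new_uids = [hash(line) for line in stream_text[start:end].splitlines()]
    uid_array = np.append(uid_array, np.array(new_uids, dtype='int64'))
    progress_indicator.setIntOffsetIndex(end)  # remember how far we got
    return uid_array

indicator = FakeProgressIndicator()
array = np.zeros((0,), dtype='int64')
array = process_new_data(indicator, '{"path": "/tmp/a"}\n', array)
array = process_new_data(indicator, '{"path": "/tmp/a"}\n', array)  # no-op: already processed
print('uids stored: %d, offset: %d' % (array.shape[0], indicator.getIntOffsetIndex()))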
......@@ -149,7 +149,7 @@
</item>
<item>
<key> <string>serial</string> </key>
<value> <string>995.60665.3511.20872</string> </value>
<value> <string>997.41827.9954.54391</string> </value>
</item>
<item>
<key> <string>state</string> </key>
......@@ -167,7 +167,7 @@
</tuple>
<state>
<tuple>
<float>1636043117.96</float>
<float>1642690808.92</float>
<string>UTC</string>
</tuple>
</state>
......
......@@ -31,7 +31,7 @@ for movement in portal_catalog(query = query):
continue
if movement.DataIngestionLine_hasMissingRequiredItem():
raise ValueError("Transformation requires movement to have " +
"aggregated data ingestion batch")
"aggregated data ingestion batch %s" % movement.getRelativeUrl())
delivery = movement.getParentValue()
data_supply = delivery.getSpecialiseValue(portal_type="Data Supply")
data_supply_list = delivery.getSpecialiseValueList(portal_type="Data Supply")
......