Commit 7313a789 authored by Ivan Tyagov

Stop using the \n character as the ingestion delimiter; only use it for the .CSV format where...

Stop using the \n character as the ingestion delimiter; only use it for the .CSV format, where it is part of the structure of the file.
parent 9fa5088a
......@@ -76,7 +76,7 @@ if transform_script_id is not None:\n
# [warning] store current position offset in Data Stream, this can cause easily \n
# ConflictErrors and it spawns re-index activities on DataStream. Thus \n
# disable for now.\n
#data_stream.setOffset(end)\n
#data_stream.setIntOffsetIndex(end)\n
\n
# start another read in another activity\n
start += chunk_length\n
......
......@@ -58,8 +58,11 @@
In this implementation we find respective Data Stream and simply\n
append data there. We save raw JSON dictionary.\n
Ingestion Policy -> Data Supply -> Data Supply Line -> Sensor\n
-> Data Stream\n
-> Data Stream\n
"""\n
from DateTime import DateTime\n
\n
now = DateTime()\n
request = context.REQUEST\n
portal_catalog = context.portal_catalog\n
\n
......@@ -108,10 +111,11 @@ if data_chunk is not None and reference is not None:\n
for data_chunk in data_chunk_list:\n
pretty_data = str(data_chunk[1])\n
pretty_data_chunk_list.append(pretty_data)\n
data = \'\\n\'.join(pretty_data_chunk_list)\n
# each chunk of data by default should be added with a new line character\n
data = \'\\n%s\' %data\n
data = \'\'.join(pretty_data_chunk_list)\n
\n
# append data\n
data_stream.appendData(data)\n
\n
#context.log("Appended %s bytes to %s (%s, %s, %s)" \n
# %(len(data), reference, data_supply, sensor, data_stream))\n
# XXX: open question -> we do not store the act of ingestion.\n
......
......@@ -71,9 +71,11 @@ class Test(ERP5TypeTestCase):
# simulate fluentd by setting proper values in REQUEST
reference = getRandomString()
number_list = range(11)
request.method = 'POST'
real_data = ('%s\n' %','.join([str(x) for x in number_list]))*10000
number_string = ','.join([str(x) for x in range(11)])
number_string_list = [number_string]*10000
real_data = '\n'.join(number_string_list)
data_chunk = msgpack.packb([0, real_data], use_bin_type=True)
request.set('reference', reference)
request.set('data_chunk', data_chunk)
......@@ -118,7 +120,7 @@ class Test(ERP5TypeTestCase):
# ingestion handler script saves new data using new line so we
# need to remove it, it also stringifies thus we need to
data_stream_data = data_stream.getData()
self.assertEqual('\n%s' %real_data, data_stream_data) # XXX: get rid of new line in ingest script!
self.assertEqual(real_data, data_stream_data)
# try sample transformation
reference = 'test-data-array- %s' %getRandomString()
......
......@@ -50,7 +50,7 @@
<string>W: 59, 4: Unused variable \'scipy\' (unused-variable)</string>
<string>W: 61, 4: Unused variable \'pandas\' (unused-variable)</string>
<string>W: 60, 4: Unused variable \'sklearn\' (unused-variable)</string>
<string>W:110, 4: Unused variable \'data_supply\' (unused-variable)</string>
<string>W:112, 4: Unused variable \'data_supply\' (unused-variable)</string>
</tuple>
</value>
</item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment