Commit 1b31a5ef authored by Ivan Tyagov, committed by Laurent S

Use current way of ingesting added by JM.

Conflicts:
	bt5/erp5_wendelin/DocumentTemplateItem/portal_components/document.erp5.IngestionPolicy.py
	bt5/erp5_wendelin/TestTemplateItem/portal_components/test.erp5.testWendelin.py

(cherry picked from commit 1ca48658)
parent 8d4a8450
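For context on the diff below: fluentd-style ingestion POSTs a msgpack-encoded [timestamp, payload] record as the raw request body, while the ingestion policy reference travels in the query string. A minimal, hypothetical client-side sketch of that call (the host name and reference are placeholders; only the endpoint shape, the Content-Type and the msgpack framing are taken from the test changed below):

import msgpack
import urllib2

# Pack a (timestamp, payload) record; the test uses 0 as a dummy timestamp.
body = msgpack.packb([0, '1,2,3\n4,5,6\n'], use_bin_type=True)

# The reference is passed in the query string; the raw msgpack is the POST body.
url = ('https://erp5.example.com/portal_ingestion_policies'
       '/wendelin_1/ingest?reference=wendelin-default-ingestion')
request = urllib2.Request(url, data=body,
                          headers={'Content-Type': 'application/octet-stream'})
urllib2.urlopen(request)  # an empty body is returned on success (200/204, see the test)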
@@ -53,12 +53,23 @@ class IngestionPolicy(Folder):
return self.portal_ingestion_policies.unpack(data)
security.declarePublic('ingest')
def ingest(self, **kw):
def ingest(self, REQUEST, **kw):
"""
Ingest a chunk of raw data coming either from a Sensor or from any of the DAUs.
"""
if self.REQUEST.method != 'POST':
raise BadRequest('Only POST request is allowed.')
environ = REQUEST.environ
method = environ.pop('REQUEST_METHOD')
try:
if method != 'POST':
raise BadRequest('Only POST request is allowed.')
if REQUEST._file is not None:
assert not REQUEST.form, REQUEST.form # Are cgi and HTTPRequest fixed ?
# Query string was ignored so parse again, faking a GET request.
# Such POST is legit: https://stackoverflow.com/a/14710450
REQUEST.processInputs()
REQUEST.form['data_chunk'] = REQUEST._file.read()
finally:
environ['REQUEST_METHOD'] = method
tag_parsing_script_id = self.getScriptId()
@@ -105,4 +116,4 @@ class IngestionPolicy(Folder):
if ingestion_script is None:
raise NotFound('No such ingestion script found: %s' %ingestion_script_id)
ingestion_script(data_chunk=data_chunk, **parameter_dict)
ingestion_script(data_chunk=data_chunk, **parameter_dict)
\ No newline at end of file
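The try/finally block added to ingest() above works around Zope request parsing: for a POST whose body is an opaque blob, the query string is not parsed into REQUEST.form, so the request method is temporarily hidden while processInputs() re-parses the request as if it were a GET, and the raw body is then read from REQUEST._file. A condensed, hypothetical restatement of that pattern (the helper name is illustrative; the REQUEST attributes are the ones used in the diff):

def parse_raw_post(REQUEST):
    # Sketch of the pattern used by ingest() above, not part of the commit.
    environ = REQUEST.environ
    method = environ.pop('REQUEST_METHOD')  # hide POST so the query string gets parsed
    try:
        REQUEST.processInputs()             # now behaves as for a GET request
        data_chunk = REQUEST._file.read()   # the untouched POST body
    finally:
        environ['REQUEST_METHOD'] = method  # always restore the real method
    return REQUEST.form, data_chunk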
@@ -26,14 +26,14 @@
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from wendelin.bigarray.array_zodb import ZBigArray
from DateTime import DateTime
from zExceptions import NotFound
from cStringIO import StringIO
import msgpack
import numpy as np
import string
import random
import urllib
def getRandomString():
return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \
@@ -52,47 +52,17 @@ class Test(ERP5TypeTestCase):
def getTitle(self):
return "Wendelin Test"
def afterSetUp(self):
"""
This is run before anything else, used to set up the environment
"""
# here, you can create the categories and objects your test will depend on
pass
def stepSetupIngestion(self, reference):
"""
Generic step.
"""
ingestion_policy, data_supply, data_stream, data_array = \
self.portal.portal_ingestion_policies.IngestionPolicyTool_addIngestionPolicy( \
reference = reference, \
batch_mode = 1)
# to avoid random test failures due to test execution timing, we set the start date one day earlier
data_supply.setStartDate(DateTime() - 1)
self.tic()
return ingestion_policy, data_supply, data_stream, data_array
def test_0_import(self):
"""
Test that we can import certain libraries; failure to do so should be
a test step failure rather than a global test failure.
"""
import scipy as _
import sklearn as _
import pandas as _
import matplotlib as _
def test_01_IngestionFromFluentd(self):
def test_01_IngestionFromFluentd(self, old_fluentd=False):
"""
Test ingestion using a POST Request containing a msgpack encoded message
simulating input from fluentd.
"""
portal = self.portal
request = portal.REQUEST
reference = getRandomString()
ingestion_policy = portal.restrictedTraverse("portal_ingestion_policies/wendelin_1")
data_supply = portal.restrictedTraverse("data_supply_module/wendelin_1")
reference = 'wendelin-default-ingestion'
number_string_list = []
for my_list in list(chunks(range(0, 100001), 10)):
number_string_list.append(','.join([str(x) for x in my_list]))
@@ -100,120 +70,59 @@ class Test(ERP5TypeTestCase):
# make sure real_data tail is also a full line
real_data += '\n'
ingestion_policy, _, data_stream, data_array = \
self.stepSetupIngestion(reference)
# simulate fluentd
body = msgpack.packb([0, real_data], use_bin_type=True)
if old_fluentd:
env = {'CONTENT_TYPE': 'application/x-www-form-urlencoded'}
body = urllib.urlencode({'data_chunk': body})
else:
env = {'CONTENT_TYPE': 'application/octet-stream'}
path = ingestion_policy.getPath() + '/ingest?reference=' + reference
publish_kw = dict(basic='ERP5TypeTestCase:', env=env,
request_method='POST', stdin=StringIO(body))
response = self.publish(path, **publish_kw)
# Due to inconsistencies in the Zope framework,
# a normal instance returns 204. As explained at
# http://blog.ploeh.dk/2013/04/30/rest-lesson-learned-avoid-204-responses/
# turning 200 into 204 automatically when the body is empty is questionable.
self.assertEqual(200, response.getStatus())
# get related Data ingestion
data_ingestion = data_supply.Base_getRelatedObjectList(portal_type='Data Ingestion')[0]
self.assertNotEqual(None, data_ingestion)
data_ingestion_line = [x for x in data_ingestion.objectValues() if x.getReference() == 'out_data_stream'][0]
# simulate fluentd by setting proper values in REQUEST
request.method = 'POST'
data_chunk = msgpack.packb([0, real_data], use_bin_type=True)
request.set('reference', reference)
request.set('data_chunk', data_chunk)
ingestion_policy.ingest()
data_stream = data_ingestion_line.getAggregateValue()
self.assertEqual('Data Stream', data_stream.getPortalType())
data_stream_data = data_stream.getData()
self.assertEqual(real_data, data_stream_data)
# try sample transformation
data_array = portal.data_array_module.newContent(
portal_type = 'Data Array',
reference = reference)
data_array.validate()
self.tic()
data_stream.DataStream_transform(\
chunk_length = 10450, \
transform_script_id = 'DataStream_copyCSVToDataArray',
data_array_reference = reference)
self.tic()
# test that extracted array contains same values as input CSV
zarray = data_array.getArray()
self.assertEqual(np.average(zarray), np.average(np.arange(100001)))
self.assertTrue(np.array_equal(zarray, np.arange(100001)))
# test that ingesting with a bad reference raises NotFound
request.set('reference', reference + 'not_existing')
self.assertRaises(NotFound, ingestion_policy.ingest)
def test_01_1_IngestionTail(self):
"""
Test real-time conversion to a numpy array by appending data to a data stream.
"""
portal = self.portal
reference = getRandomString()
number_string_list = []
for my_list in list(chunks(range(0, 10001), 10)):
number_string_list.append(','.join([str(x) for x in my_list]))
real_data = '\n'.join(number_string_list)
# make sure real_data tail is also a full line
real_data += '\n'
# clean up
data_array.invalidate()
data_stream.setData('')
_, _, data_stream, data_array = self.stepSetupIngestion(reference)
data_stream.appendData(real_data)
self.tic()
self.assertEqual(None, data_array.getArray())
# override DataStream_transformTail to actually do the transformation on appendData
start = data_stream.getSize()
script_id = 'DataStream_transformTail'
script_content_list = ["start_offset, end_offset", """
# created by testWendelin.test_01_1_IngestionTail
start = %s
end = %s
context.activate().DataStream_readChunkListAndTransform( \
start, \
end, \
%s, \
transform_script_id = 'DataStream_copyCSVToDataArray', \
data_array_reference=context.getReference())""" %(start, start + 10450, 10450)]
createZODBPythonScript(
portal.portal_skins.custom,
script_id,
*script_content_list)
number_string_list = []
for my_list in list(chunks(range(10001, 200001), 10)):
number_string_list.append(','.join([str(x) for x in my_list]))
real_data = '\n'.join(number_string_list)
# make sure real_data tail is also a full line
real_data += '\n'
# append data to Data Stream and check the array, which should be fed now.
data_stream.appendData(real_data)
self.tic()
# test that extracted array contains same values as input CSV
zarray = data_array.getArray()
expected_numpy_array = np.arange(10001, 200001)
self.assertEqual(np.average(zarray), np.average(expected_numpy_array))
self.assertTrue(np.array_equal(zarray, expected_numpy_array))
# clean up script
portal.portal_skins.custom.manage_delObjects([script_id,])
self.tic()
# analyze numpy array using activities.
active_process = portal.portal_activities.newActiveProcess()
zarray = data_array.getArray()
max_elements = zarray.shape[0]
expected_result_list = []
jobs = 15
offset = max_elements / jobs
start = 0
end = start + offset
for _ in range(jobs):
# calculate the expected values directly
expected_result_list.append(np.average(expected_numpy_array[start:end]))
data_array.activate(
active_process = active_process.getPath(), \
activity='SQLQueue').DataArray_calculateArraySliceAverageAndStore(start, end)
data_array.log('%s %s' %(start, end))
start += offset
end += offset
self.tic()
result_list = [x.getResult() for x in active_process.getResultList()]
self.assertSameSet(result_list, expected_result_list)
# final reduce job to a number
sum(result_list)
def test_01_1_IngestionFromOldFluentd(self):
self.test_01_IngestionFromFluentd(True)
def test_01_02_ParallelTransformation(self):
"""
@@ -227,10 +136,15 @@ context.activate().DataStream_readChunkListAndTransform( \
number_string_list = [row]*20
real_data = '\n'.join(number_string_list)
portal.log( real_data)
_, _, data_stream, _ = self.stepSetupIngestion(reference)
data_stream = portal.data_stream_module.newContent(
portal_type = 'Data Stream',
reference = reference)
data_stream.appendData(real_data)
data_stream.validate()
data_array = portal.data_array_module.newContent(
portal_type = 'Data Array',
reference = reference)
data_array.validate()
self.tic()
data_stream.DataStream_transform(\
@@ -307,4 +221,37 @@ context.activate().DataStream_readChunkListAndTransform( \
self.assertTrue(
np.array_equal(data_array.getArraySlice(0,100), \
new_array[:100]))
\ No newline at end of file
def test_04_DataBucket(self):
"""
Test data bucket
"""
bucket_stream = self.portal.data_stream_module.newContent( \
portal_type = 'Data Bucket Stream')
self.tic()
self.assertEqual(0, len(bucket_stream))
# test set and get
bin_string = "1"*100000
key = len(bucket_stream) + 1
bucket_stream.insertBucket(key, bin_string )
self.assertEqual(bin_string, bucket_stream.getBucket(key))
# test sequence
self.assertEqual(1, len(bucket_stream))
# test pop
bucket_stream.popBucket(key)
self.assertEqual(0, len(bucket_stream))
# set many buckets
for i in range(100):
bucket_stream.insertBucket(i, i*10000)
self.assertEqual(100, len(bucket_stream))
self.assertEqual(range(100), bucket_stream.getKeyList())
# test as sequence
bucket = bucket_stream.getBucketItemSequence(start_key=10, count=1)[0]
self.assertEqual(100000, bucket[1].value)