Commit eee332f8 authored by Eteri's avatar Eteri Committed by Eteri

erp5_wendelin: add test for checking msgpack ingestion and default data operation for ingestion

parent 12a03f27
...@@ -125,7 +125,7 @@ class Test(ERP5TypeTestCase): ...@@ -125,7 +125,7 @@ class Test(ERP5TypeTestCase):
# clean up # clean up
data_array.invalidate() data_array.invalidate()
data_stream.setData('') data_stream.setData(None)
self.tic() self.tic()
...@@ -342,3 +342,102 @@ class Test(ERP5TypeTestCase): ...@@ -342,3 +342,102 @@ class Test(ERP5TypeTestCase):
predicted = reg.predict(np.array([[4, 10]])) predicted = reg.predict(np.array([[4, 10]]))
self.assertEqual(predicted.all(),np.array([27.]).all()) self.assertEqual(predicted.all(),np.array([27.]).all())
def test_09_IngestionFromFluentdStoreMsgpack(self, old_fluentd=False):
"""
Test ingestion using a POST Request containing a msgpack encoded message
simulating input from fluentd.
"""
from DateTime import DateTime
portal = self.portal
now = DateTime()
reference="test_sensor.test_product"
title = reference
ingestion_policy = portal.portal_ingestion_policies['default']
# create related data supply and etc.
use_category = portal.restrictedTraverse("portal_categories/use/big_data/ingestion/stream")
data_operation = portal.restrictedTraverse("data_operation_module/wendelin_ingest_data")
# create Data Product
data_product = portal.data_product_module.newContent(
portal_type = "Data Product",
title = "Append to Data Stream",
reference = reference.split('.')[1])
data_product.setUseValue(use_category)
data_product.setAggregatedPortalTypeList(["Data Stream", "Progress Indicator"])
data_product.validate()
# create Data Supply
data_supply_kw = {'title': title,
'reference': reference.split('.')[0],
'version': '001',
'effective_date': now,
'expiration_date': now + 365*10}
data_supply = portal.data_supply_module.newContent( \
portal_type='Data Supply', **data_supply_kw)
data_supply.validate()
# add ingestion line
data_supply_line_kw = {'title': 'Ingest Data',
'reference': 'ingestion_operation',
'int_index': 1,
'quantity': 1.0}
data_supply_line = data_supply.newContent(portal_type='Data Supply Line', **data_supply_line_kw)
data_supply_line.setResourceValue(data_operation)
# add append to Data Stream line
data_supply_line_kw = {'title': 'Data Stream',
'reference': 'out_stream',
'int_index': 2,
'quantity': 1.0}
data_supply_line = data_supply.newContent(portal_type='Data Supply Line', \
**data_supply_line_kw)
data_supply_line.setResourceValue(data_product)
data_supply_line.setUseValue(use_category)
self.tic()
portal.log("data_supply = ", data_supply)
portal.log("ingestion_policy = ", ingestion_policy)
number_string_list = []
for my_list in list(chunks(range(0, 100001), 10)):
number_string_list.append(','.join([str(x) for x in my_list]))
real_data = '\n'.join(number_string_list)
# make sure real_data tail is also a full line
real_data += '\n'
# simulate fluentd
body = msgpack.packb([0, real_data], use_bin_type=True)
env = {'CONTENT_TYPE': 'application/octet-stream'}
path = ingestion_policy.getPath() + '/ingest?reference=' + reference
publish_kw = dict(user='ERP5TypeTestCase', env=env,
request_method='POST', stdin=StringIO(body))
response = self.publish(path, **publish_kw)
self.assertEqual(200, response.getStatus())
self.tic()
# get related Data ingestion
data_ingestion = data_supply.Base_getRelatedObjectList(portal_type='Data Ingestion')[0]
self.assertNotEqual(None, data_ingestion)
data_ingestion_line = [x for x in data_ingestion.objectValues() if x.getReference() == 'out_stream'][0]
data_stream = data_ingestion_line.getAggregateValue()
self.assertEqual('Data Stream', data_stream.getPortalType())
data_stream_data = data_stream.getData()
# body is msgpacked real data.
self.assertEqual(body, data_stream_data)
# clean up
data_stream.setData(None)
self.tic()
...@@ -3,8 +3,9 @@ web_page_module/wendelin_js ...@@ -3,8 +3,9 @@ web_page_module/wendelin_js
portal_gadgets/WendelinInformationGadget portal_gadgets/WendelinInformationGadget
portal_ingestion_policies/default_http_json portal_ingestion_policies/default_http_json
portal_ingestion_policies/default_http_json/** portal_ingestion_policies/default_http_json/**
portal_ingestion_policies/wendelin-ingestion portal_ingestion_policies/default
data_supply_module/default_http_json data_supply_module/default_http_json
data_supply_module/default_http_json/** data_supply_module/default_http_json/**
data_product_module/default_http_json data_product_module/default_http_json
data_product_module/default_http_json/** data_product_module/default_http_json/**
data_operation_module/wendelin_ingest_data
\ No newline at end of file
data_operation_module/wendelin_ingest_data
data_product_module/default_http_json data_product_module/default_http_json
data_product_module/default_http_json/** data_product_module/default_http_json/**
data_supply_module/default_http_json data_supply_module/default_http_json
...@@ -19,8 +20,8 @@ portal_categories/use ...@@ -19,8 +20,8 @@ portal_categories/use
portal_categories/use/** portal_categories/use/**
portal_gadgets/WendelinInformationGadget portal_gadgets/WendelinInformationGadget
portal_gadgets/WendelinInformationGadget/** portal_gadgets/WendelinInformationGadget/**
portal_ingestion_policies/default
portal_ingestion_policies/default_http_json portal_ingestion_policies/default_http_json
portal_ingestion_policies/default_http_json/** portal_ingestion_policies/default_http_json/**
portal_ingestion_policies/wendelin-ingestion
web_page_module/wendelin_information_gadget.html web_page_module/wendelin_information_gadget.html
web_page_module/wendelin_js web_page_module/wendelin_js
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment