Commit 4bd0aa64 authored by Roque

erp5_wendelin_data_lake_ingestion: update unit tests

parent edc668b5
@@ -70,6 +70,12 @@ class TestDataIngestion(SecurityTestCase):
       reference = reference)
     return data_stream
 
+  def getDataStreamChunkList(self, reference):
+    data_stream_list = self.portal.portal_catalog(
+      portal_type = 'Data Stream',
+      reference = reference)
+    return data_stream_list
+
   def ingestRequest(self, reference, eof, data_chunk, ingestion_policy):
     encoded_data_chunk = base64.b64encode(data_chunk)
     request = self.portal.REQUEST
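Unlike getDataStream just above it, which returns a single catalog result, the new getDataStreamChunkList helper returns every Data Stream matching a reference, which is what lets test_02 below count one stream per uploaded chunk. A hypothetical usage sketch, using only calls that appear in this file (data_chunk_1 and data_chunk_2 stand in for two arbitrary chunks of one split upload):

```python
# hypothetical sketch: upload two chunks under one reference, then expect
# two catalogued Data Streams for the shared sanitized ingestion reference
reference = self.getRandomReference()
self.ingest(data_chunk_1, reference, self.FIF, self.PART_1)
ingestion_reference = self.ingest(data_chunk_2, reference, self.FIF, self.EOF)
self.tic()
data_stream_list = self.getDataStreamChunkList(ingestion_reference)
self.assertEqual(2, len(data_stream_list))  # one Data Stream per chunk
```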
@@ -84,14 +90,11 @@ class TestDataIngestion(SecurityTestCase):
 
   def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False):
     ingestion_reference = self.getIngestionReference(reference, extension, randomize_ingestion_reference)
     # use default ebulk policy
-    ingestion_policy = self.portal.portal_ingestion_policies.wendelin_embulk
+    ingestion_policy = self.portal.portal_ingestion_policies.default_ebulk
     self.ingestRequest(ingestion_reference, eof, data_chunk, ingestion_policy)
     _, ingestion_reference = self.sanitizeReference(ingestion_reference)
-
     return ingestion_reference
-
-
 
   def stepIngest(self, extension, delimiter, randomize_ingestion_reference=False):
     file_name = "file_name.csv"
@@ -108,16 +111,15 @@ class TestDataIngestion(SecurityTestCase):
         chunk.append(line)
       else:
         break
-
     ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END, randomize_ingestion_reference=randomize_ingestion_reference)
 
     if os.path.exists(file_name):
       os.remove(file_name)
 
     # test properly ingested
     data_ingestion = self.getDataIngestion(ingestion_reference)
     self.assertNotEqual(None, data_ingestion)
 
     data_ingestion_line = [x for x in data_ingestion.objectValues() \
                            if x.getReference() == 'out_stream'][0]
     data_set = data_ingestion_line.getAggregateValue(portal_type='Data Set')
@@ -127,8 +129,9 @@ class TestDataIngestion(SecurityTestCase):
     data_stream_data = data_stream.getData()
     self.assertEqual(data_chunk, data_stream_data)
 
-    # check Data Stream and Data Set are validated
-    self.assertEqual('validated', data_stream.getValidationState())
+    # check Data Set is validated and Data Stream is published
+    self.assertEqual('validated', data_set.getValidationState())
+    self.assertEqual('published', data_stream.getValidationState())
 
     return data_set, [data_stream]
 
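The rewritten assertions pin down the new lifecycle: after a complete ingestion the Data Set stays 'validated' while the Data Stream ends up 'published'. If further tests need the same check, a hypothetical helper (not part of this commit) could centralize it:

```python
def assertIngestedStates(self, data_set, data_stream_list):
  # hypothetical helper: a fully ingested upload leaves the Data Set
  # 'validated' and every resulting Data Stream 'published'
  self.assertEqual('validated', data_set.getValidationState())
  for data_stream in data_stream_list:
    self.assertEqual('published', data_stream.getValidationState())
```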
@@ -137,10 +140,10 @@ class TestDataIngestion(SecurityTestCase):
     Test default ingestion with ebulk too.
     """
     self.stepIngest(self.CSV, ",")
 
   def test_02_DefaultSplitIngestion(self):
     """
-    Test multiple uploads from ebulk end up in same Data Stream concatenated
+    Test multiple uploads from ebulk end up in multiple Data Streams
     (in case of large file upload when ebulk by default splits file to 50MB
     chunks).
     """
@@ -152,42 +155,48 @@ class TestDataIngestion(SecurityTestCase):
                                                     for _ in xrange(250)])
     data_chunk_4 = ''.join([random.choice(string.ascii_letters + string.digits) \
                                                     for _ in xrange(250)])
-    data_chunk = data_chunk_1 + data_chunk_2 + data_chunk_3 + data_chunk_4
 
     reference = self.getRandomReference()
 
     ingestion_reference = self.ingest(data_chunk_1, reference, self.FIF, self.PART_1)
     time.sleep(1)
     self.tic()
     ingestion_reference = self.ingest(data_chunk_2, reference, self.FIF, self.PART_2)
     time.sleep(1)
     self.tic()
     ingestion_reference = self.ingest(data_chunk_3, reference, self.FIF, self.PART_3)
     time.sleep(1)
     self.tic()
     ingestion_reference = self.ingest(data_chunk_4, reference, self.FIF, self.EOF)
     time.sleep(1)
     self.tic()
 
-    # call explicitly alarm so all 4 Data Streams can be concatenated to one
-    self.portal.portal_alarms.wendelin_data_lake_handle_analysis.Alarm_dataLakeHandleAnalysis()
+    # explicitly call the alarm so all 4 Data Streams are validated and published
+    self.portal.portal_alarms.wendelin_handle_analysis.Alarm_handleAnalysis()
     self.tic()
 
-    # check resulting Data Stream
-    data_stream = self.getDataStream(ingestion_reference)
-    self.assertEqual(data_chunk, data_stream.getData())
+    # check resulting Data Streams
+    data_stream_list = self.getDataStreamChunkList(ingestion_reference)
+    # one data stream per chunk
+    self.assertEqual(len(data_stream_list), 4)
+    # last data stream (EOF) is published, the rest validated
+    for stream in data_stream_list:
+      if stream.getId().endswith(self.EOF.replace(self.REFERENCE_SEPARATOR, "")):
+        self.assertEqual('published', stream.getValidationState())
+      else:
+        self.assertEqual('validated', stream.getValidationState())
 
   def test_03_DefaultWendelinConfigurationExistency(self):
     """
     Test that nobody accidentally removes the default configurations needed by the HowTos.
     """
     # test default ebulk ingestion exists
     self.assertNotEqual(None,
-      getattr(self.portal.portal_ingestion_policies, "wendelin_embulk", None))
+      getattr(self.portal.portal_ingestion_policies, "default_ebulk", None))
     self.assertNotEqual(None,
       getattr(self.portal.data_supply_module, "embulk", None))
 
   def test_04_DefaultModelSecurityModel(self):
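The EOF check in test_02 works on naming alone: every chunk reference ends with a part marker, the last one with an EOF marker, so stripping the separator from self.EOF yields the suffix that only the final Data Stream id carries. A standalone sketch with assumed constant values (the real REFERENCE_SEPARATOR, EOF and PART_* constants are defined elsewhere in this test class):

```python
# assumed values for illustration; the real constants live on the test class
REFERENCE_SEPARATOR = "/"
EOF = REFERENCE_SEPARATOR + "EOF"

def is_eof_stream(stream_id):
  # mirrors the endswith() check in test_02: strip the separator from the
  # EOF marker and match it against the tail of the Data Stream id
  return stream_id.endswith(EOF.replace(REFERENCE_SEPARATOR, ""))

assert is_eof_stream("reference-abc-EOF")
assert not is_eof_stream("reference-abc-001")
```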
@@ -199,19 +208,17 @@ class TestDataIngestion(SecurityTestCase):
     # check the data relation between Data Set and Data Streams works
     self.assertSameSet(data_stream_list, data_set.DataSet_getDataStreamList())
 
-    # publish data set and have all Data Streams publsihed automatically
-    data_set.publish()
-    self.tic()
-    self.assertEqual('published', data_set.getValidationState())
-    self.assertSameSet(['published' for x in data_stream_list],
+    # check the Data Set and all Data Stream states
+    self.assertEqual('validated', data_set.getValidationState())
+    self.assertSameSet(['published' for x in data_stream_list],
                        [x.getValidationState() for x in data_stream_list])
 
     # invalidating the Data Set should invalidate related Data Streams
     data_set.invalidate()
     self.tic()
     self.assertEqual('invalidated', data_set.getValidationState())
     self.assertSameSet(['invalidated' for x in data_stream_list],
                        [x.getValidationState() for x in data_stream_list])
 
     # XXX: new test which simulates download / upload of Data Set and increase DS version
\ No newline at end of file
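Condensed, the invalidation cascade this test still asserts looks as follows, using only calls that appear above (self.tic() processes pending ERP5 activities so the interaction that propagates the state change can run):

```python
# condensed sketch of the cascade asserted above
data_set.invalidate()
self.tic()  # let activities propagate the state change
for data_stream in data_set.DataSet_getDataStreamList():
  self.assertEqual('invalidated', data_stream.getValidationState())
```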
@@ -46,8 +46,8 @@
         <key> <string>text_content_warning_message</string> </key>
         <value>
           <tuple>
-            <string>W: 99, 34: Unused variable \'i\' (unused-variable)</string>
-            <string>W: 99, 76: Unused variable \'j\' (unused-variable)</string>
+            <string>W:102, 34: Unused variable \'i\' (unused-variable)</string>
+            <string>W:102, 76: Unused variable \'j\' (unused-variable)</string>
           </tuple>
         </value>
       </item>
...