Commit 41bc8fdc authored by Paul Graydon's avatar Paul Graydon

wendelin_telecom_test: Update tests

parent de82baa9
...@@ -112,7 +112,14 @@ class WendelinTelecomTest(SecurityTestCase): ...@@ -112,7 +112,14 @@ class WendelinTelecomTest(SecurityTestCase):
tag_comp_id_seed = generateRandomString(length=4, only_digits=True) tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
if tag_enb_id_seed is None: if tag_enb_id_seed is None:
tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True) tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
ors_tag = 'ors%s_COMP-%s_e0x%sTest' % (tag_hostname_seed, tag_comp_id_seed, tag_enb_id_seed)
ors_tag = 'ors%s' % tag_hostname_seed
# Only include a subsequent section if a non-empty string is explicitly provided for it
if tag_comp_id_seed != '':
ors_tag += '_COMP-%s' % tag_comp_id_seed
if tag_enb_id_seed != '':
ors_tag += '_e0x%s' % tag_enb_id_seed
ors_tag += 'Test'
response = self.portal.ERP5Site_registerOrs(ors_tag) response = self.portal.ERP5Site_registerOrs(ors_tag)
self.tic() self.tic()
...@@ -476,12 +483,13 @@ class WendelinTelecomTest(SecurityTestCase): ...@@ -476,12 +483,13 @@ class WendelinTelecomTest(SecurityTestCase):
self.assertTrue(new_project_item_dict['project'] is None) self.assertTrue(new_project_item_dict['project'] is None)
self.assertTrue(new_project_item_dict['client_user'] == project_item_dict['client_user']) self.assertTrue(new_project_item_dict['client_user'] == project_item_dict['client_user'])
def test_03_registerOrs(self): def test_03_1_registerValidOrs(self):
''' '''
Test the script called during slave instantiation in SlapOS by an ORS to automatically register itself. Test the script called during slave instantiation in SlapOS by an ORS to automatically register itself.
Check all detected cases. Check all valid cases.
''' '''
# Call the script with an initial seed setup
tag_hostname_seed = generateRandomString(length=3, only_digits=True) tag_hostname_seed = generateRandomString(length=3, only_digits=True)
tag_comp_id_seed = generateRandomString(length=4, only_digits=True) tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True) tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
...@@ -512,25 +520,6 @@ class WendelinTelecomTest(SecurityTestCase): ...@@ -512,25 +520,6 @@ class WendelinTelecomTest(SecurityTestCase):
self.assertTrue('error_msg' in response_dict) self.assertTrue('error_msg' in response_dict)
self.assertTrue(response_dict['error_msg'] == "ORS with tag %s already exists." % ors_item_dict['data_acquisition_unit'].getReference()) self.assertTrue(response_dict['error_msg'] == "ORS with tag %s already exists." % ors_item_dict['data_acquisition_unit'].getReference())
# Generate a new seed that will cause the tag to be invalid
invalid_tag_hostname_seed = 'invalid_hostname'
# Call the script a third time with the new seed
# This should error out as the tag is invalid
invalid_ors_item_dict = self.registerOrs(
tag_hostname_seed=invalid_tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=tag_enb_id_seed
)
# Parse the JSON response and check the error message
response_dict = json.loads(invalid_ors_item_dict['response'])
self.assertTrue('error_msg' in response_dict)
self.assertTrue(response_dict['error_msg'] == "Invalid ORS tag ors%s_COMP-%s_e0x%sTest found" % (invalid_tag_hostname_seed, tag_comp_id_seed, tag_enb_id_seed))
# Check that the Data Acquisition Unit and Data Supply have NOT been created
self.assertTrue(invalid_ors_item_dict['data_acquisition_unit'] is None)
self.assertTrue(invalid_ors_item_dict['data_supply'] is None)
# Now, link the original Data Supply to a client project # Now, link the original Data Supply to a client project
project_a_item_dict = self.registerOrsClientProject() project_a_item_dict = self.registerOrsClientProject()
project_a_url = project_a_item_dict['project'].getRelativeUrl() project_a_url = project_a_item_dict['project'].getRelativeUrl()
...@@ -546,6 +535,7 @@ class WendelinTelecomTest(SecurityTestCase): ...@@ -546,6 +535,7 @@ class WendelinTelecomTest(SecurityTestCase):
) )
# Check that the Data Acquisition Unit and Data Supply have been created # Check that the Data Acquisition Unit and Data Supply have been created
# And that the new Data Supply has automatically been linked to the project
self.assertTrue(new_enb_id_ors_item_dict['data_acquisition_unit'] is not None) self.assertTrue(new_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(new_enb_id_ors_item_dict['data_supply'] is not None) self.assertTrue(new_enb_id_ors_item_dict['data_supply'] is not None)
self.assertTrue(new_enb_id_ors_item_dict['data_supply'].getDestinationProject() == project_a_url) self.assertTrue(new_enb_id_ors_item_dict['data_supply'].getDestinationProject() == project_a_url)
...@@ -572,6 +562,71 @@ class WendelinTelecomTest(SecurityTestCase): ...@@ -572,6 +562,71 @@ class WendelinTelecomTest(SecurityTestCase):
# it cannot be automatically decided to which project this version should be assigned to # it cannot be automatically decided to which project this version should be assigned to
self.assertTrue(another_enb_id_ors_item_dict['data_supply'].getDestinationProject() is None) self.assertTrue(another_enb_id_ors_item_dict['data_supply'].getDestinationProject() is None)
# Generate a new set of seeds
new_tag_hostname_seed = generateRandomString(length=3, only_digits=True)
new_tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
# Call the script with a tag that does not contain an eNB identifier
no_enb_id_ors_item_dict = self.registerOrs(
tag_hostname_seed=new_tag_hostname_seed,
tag_comp_id_seed=new_tag_comp_id_seed,
tag_enb_id_seed=''
)
# Parse the JSON response and check that it is empty, indicating a success
response_dict = json.loads(no_enb_id_ors_item_dict['response'])
self.assertTrue(response_dict == {})
# Check that the Data Acquisition Unit and Data Supply have been created
self.assertTrue(no_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(no_enb_id_ors_item_dict['data_supply'] is not None)
def test_03_2_registerInvalidOrs(self):
'''
Test the script called during slave instantiation in SlapOS by an ORS to automatically register itself.
Check all invalid cases.
'''
tag_hostname_seed = generateRandomString(length=3, only_digits=True)
tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
# Generate a hostname seed that will cause the tag to have too many underscores
too_long_tag_hostname_seed = 'invalid_hostname'
# Call the script with this seed setup
# This should error out as the tag has too many underscore-separated sections
too_long_tag_ors_item_dict = self.registerOrs(
tag_hostname_seed=too_long_tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=tag_enb_id_seed
)
# Parse the JSON response and check the error message
response_dict = json.loads(too_long_tag_ors_item_dict['response'])
self.assertTrue('error_msg' in response_dict)
self.assertTrue(response_dict['error_msg'] == "Invalid ORS tag ors%s_COMP-%s_e0x%sTest found" \
% (too_long_tag_hostname_seed, tag_comp_id_seed, tag_enb_id_seed))
# Check that the Data Acquisition Unit and Data Supply have NOT been created
self.assertTrue(too_long_tag_ors_item_dict['data_acquisition_unit'] is None)
self.assertTrue(too_long_tag_ors_item_dict['data_supply'] is None)
# Now, call the script with only the first section in the tag: 'orsXXX'
# This should error out as the tag has too few underscore-separated sections
too_long_tag_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed='',
tag_enb_id_seed=''
)
# Parse the JSON response and check the error message
response_dict = json.loads(too_long_tag_ors_item_dict['response'])
self.assertTrue('error_msg' in response_dict)
self.assertTrue(response_dict['error_msg'] == "Invalid ORS tag ors%sTest found" % tag_hostname_seed)
# Check that the Data Acquisition Unit and Data Supply have NOT been created
self.assertTrue(too_long_tag_ors_item_dict['data_acquisition_unit'] is None)
self.assertTrue(too_long_tag_ors_item_dict['data_supply'] is None)
def test_04_1_ingestValidOrsLogDataFromFluentd(self, data_key="valid"): def test_04_1_ingestValidOrsLogDataFromFluentd(self, data_key="valid"):
''' '''
Test a simple valid ORS log ingestion: simulate a fluentd gateway forwarding valid ORS logs to the platform, Test a simple valid ORS log ingestion: simulate a fluentd gateway forwarding valid ORS logs to the platform,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment