Commit 0e089a04 authored by Paul Graydon's avatar Paul Graydon

wendelin_telecom_test: Test ORS registration non-standard cases

parent ed1dc79c
......@@ -695,11 +695,12 @@ class WendelinTelecomTest(SecurityTestCase):
self.assertTrue(new_project_item_dict['client_user'] \
== project_item_dict['client_user'])
def test_03_1_registerValidOrs(self):
def test_03_1_registerOrsValid(self):
'''
Test the script called during slave instantiation in SlapOS
by an ORS to automatically register itself.
Check all valid cases.
Check the straightforward successful case where an ORS
changes its radio ID several times and re-registers.
'''
# Call the script with an initial seed setup
......@@ -725,7 +726,7 @@ class WendelinTelecomTest(SecurityTestCase):
self.assertTrue(ors_item_dict['data_supply'] is not None)
# Call the script a second time with the same seeds
# This should not do anything as the items already exist
# This should not do anything as the Data Acquisition Unit already exists
repeated_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
......@@ -747,15 +748,23 @@ class WendelinTelecomTest(SecurityTestCase):
# Generate a new valid enb_id seed
new_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
# Call the script to simulate an ORS re-registering with another eNB identifier
# Call the script to simulate the ORS re-registering with another eNB identifier
new_enb_id_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=new_tag_enb_id_seed
)
# Parse the JSON response and check the status and message
response_dict = json.loads(new_enb_id_ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s successfully registered." \
% new_enb_id_ors_item_dict['data_acquisition_unit'].getReference()
)
# Check that the Data Acquisition Unit and Data Supply have been created
# And that the new Data Supply has automatically been linked to the project
# and that the new Data Supply has automatically been linked to the project
self.assertTrue(new_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(new_enb_id_ors_item_dict['data_supply'] is not None)
self.assertTrue(new_enb_id_ors_item_dict['data_supply'].getDestinationProject() == project_a_url)
......@@ -777,6 +786,14 @@ class WendelinTelecomTest(SecurityTestCase):
tag_enb_id_seed=another_tag_enb_id_seed
)
# Parse the JSON response and check the status and message
response_dict = json.loads(another_enb_id_ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s successfully registered." \
% another_enb_id_ors_item_dict['data_acquisition_unit'].getReference()
)
# Check that the Data Acquisition Unit and Data Supply have been created
self.assertTrue(another_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(another_enb_id_ors_item_dict['data_supply'] is not None)
......@@ -806,11 +823,11 @@ class WendelinTelecomTest(SecurityTestCase):
self.assertTrue(no_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(no_enb_id_ors_item_dict['data_supply'] is not None)
def test_03_2_registerInvalidOrs(self):
def test_03_2_registerOrsInvalid(self):
'''
Test the script called during slave instantiation in SlapOS
by an ORS to automatically register itself.
Check all invalid cases.
Check all error-returning cases.
'''
tag_hostname_seed = generateRandomString(length=3, only_digits=True)
......@@ -856,6 +873,145 @@ class WendelinTelecomTest(SecurityTestCase):
self.assertTrue(too_long_tag_ors_item_dict['data_acquisition_unit'] is None)
self.assertTrue(too_long_tag_ors_item_dict['data_supply'] is None)
def test_03_3_registerOrsNonStandard(self):
'''
Test the script called during slave instantiation in SlapOS
by an ORS to automatically register itself.
Check the cases where an ORS re-registers itself
after the Data Acquisition Unit or the Data Supply is deleted.
'''
# Call the script with an initial seed setup
tag_hostname_seed = generateRandomString(length=3, only_digits=True)
tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=tag_enb_id_seed
)
# Parse the JSON response and check that it indicates a success
response_dict = json.loads(ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s successfully registered." \
% ors_item_dict['data_acquisition_unit'].getReference()
)
# Check that the Data Acquisition Unit and Data Supply have been created
self.assertTrue(ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(ors_item_dict['data_supply'] is not None)
# Now, link the original Data Supply to a client project...
project_a_item_dict = self.registerOrsClientProject()
project_a_url = project_a_item_dict['project'].getRelativeUrl()
ors_item_dict['data_supply'].setDestinationProject(project_a_url)
# ...and delete the Data Acquisition Unit
ors_item_dict['data_acquisition_unit'].invalidate()
ors_item_dict['data_acquisition_unit'].delete()
self.tic()
# Call the script a second time with the same seeds
# This should re-create the Data Acquisition Unit and the Data Supply
repeated_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=tag_enb_id_seed
)
# Parse the JSON response and check the status and message
response_dict = json.loads(repeated_ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s successfully registered." \
% ors_item_dict['data_acquisition_unit'].getReference()
)
# Now, link the above Data Supply to the same project...
repeated_ors_item_dict['data_supply'] \
.setDestinationProject(project_a_item_dict['project'].getRelativeUrl())
# ...and delete the Data Supply
repeated_ors_item_dict['data_supply'].invalidate()
repeated_ors_item_dict['data_supply'].delete()
self.tic()
# Call the script a third time, still with the same seeds
# This should not do anything as the Data Acquisition Unit already exists
re_repeated_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=tag_enb_id_seed
)
# Parse the JSON response and check the status and message
response_dict = json.loads(re_repeated_ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s already exists." \
% ors_item_dict['data_acquisition_unit'].getReference()
)
# Generate a new valid enb_id seed
new_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
# Call the script to simulate the ORS re-registering with another eNB identifier
# Even with the previously deleted Data Acquisition Unit and Data Supply,
# this should not error out
new_enb_id_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=new_tag_enb_id_seed
)
# Parse the JSON response and check the status and message
response_dict = json.loads(new_enb_id_ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s successfully registered." \
% new_enb_id_ors_item_dict['data_acquisition_unit'].getReference()
)
# Check that the Data Acquisition Unit and Data Supply have been created
self.assertTrue(new_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(new_enb_id_ors_item_dict['data_supply'] is not None)
# As the two first registrations are not in valid states anymore,
# they should not be considered when deciding project attribution
# This Data Supply should therefore not be linked to the project
self.assertTrue(new_enb_id_ors_item_dict['data_supply'].getDestinationProject() is None)
# Now, link the above Data Supply to the project
# This should allow the next registration to be linked to it automatically
new_enb_id_ors_item_dict['data_supply'] \
.setDestinationProject(project_a_item_dict['project'].getRelativeUrl())
# Generate another valid enb_id seed
another_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
while another_tag_enb_id_seed == new_tag_enb_id_seed:
another_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
# Call the script to simulate the same ORS
# registering a third time with another eNB identifier
another_enb_id_ors_item_dict = self.registerOrs(
tag_hostname_seed=tag_hostname_seed,
tag_comp_id_seed=tag_comp_id_seed,
tag_enb_id_seed=another_tag_enb_id_seed
)
# Parse the JSON response and check the status and message
response_dict = json.loads(another_enb_id_ors_item_dict['response'])
self.assertTrue(response_dict['status'] == "ok")
self.assertTrue(
response_dict['message'] == "ORS with tag %s successfully registered." \
% another_enb_id_ors_item_dict['data_acquisition_unit'].getReference()
)
# Check that the Data Acquisition Unit and Data Supply have been created
# and that the new Data Supply has automatically been linked to the project
self.assertTrue(another_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
self.assertTrue(another_enb_id_ors_item_dict['data_supply'] is not None)
self.assertTrue(another_enb_id_ors_item_dict['data_supply'].getDestinationProject() == project_a_url)
def test_04_1_ingestValidOrsLogDataFromFluentd(self, data_key="valid"):
'''
Test a simple valid ORS log ingestion: simulate a fluentd gateway forwarding valid ORS logs to the platform,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment