Commit 33fe6425 authored by Ivan Tyagov's avatar Ivan Tyagov

Test multiple couplers setup.

See merge request !65
parents 553966a0 99b86efe
......@@ -37,7 +37,7 @@ class BeremizTest(e2e.EndToEndTestCase):
print("Sleep for 3h.")
time.sleep(3*3600)
print("Request beremiz-runtime and OSIE coupler instances.")
print("Test beremiz-runtime and OSIE coupler instances.")
# supply / request coupler in anonymous mode (no user / pass)
instance_name = time.strftime('e2e-test-coupler-anonymous-%Y-%B-%d-%H:%M:%S')
parameter_dict = {"mode":1}
......@@ -73,7 +73,6 @@ class BeremizTest(e2e.EndToEndTestCase):
partition_parameter_kw=parameter_dict)
cls.waitUntilGreen(instance_name, 180)
connection_dict = cls.getInstanceInfos(instance_name).connection_dict
print(connection_dict)
# request a coupler in a publisher mode (ID=1)
print("Pub / Sub coupler instances.")
......@@ -87,7 +86,6 @@ class BeremizTest(e2e.EndToEndTestCase):
partition_parameter_kw=parameter_dict)
cls.waitUntilGreen(instance_name, 180)
connection_dict = cls.getInstanceInfos(instance_name).connection_dict
print(connection_dict)
cls.coupler_publisher_url_ipv6 = connection_dict.get("url-ipv6")
# request a coupler in a subscriber mode (ID=2)
......@@ -101,38 +99,52 @@ class BeremizTest(e2e.EndToEndTestCase):
partition_parameter_kw=parameter_dict)
cls.waitUntilGreen(instance_name, 180)
connection_dict = cls.getInstanceInfos(instance_name).connection_dict
print(connection_dict)
cls.coupler_subscriber_url_ipv6 = connection_dict.get("url-ipv6")
# request a coupler in a publisher mode (ID=3) with a NEW mulsticast group
print("[New multicast groupd] Pub / Sub coupler instances.")
# request a coupler in a publisher mode (ID=3) with a NEW mulsticast group and
# a coupler which is BOTH publisher and subscriber
network_group = "opc.udp://224.0.0.22:4841/"
print("[New multicast group] Pub / Sub coupler instances.")
instance_name = time.strftime('e2e-test-coupler-anonymous-publisher-new-%Y-%B-%d-%H:%M:%S')
parameter_dict = {"mode":1,
"opc_ua_port": 6842,
"heart_beat": 1,
"network_address_url_data_type": "opc.udp://224.0.0.22:4841/",
"network_address_url_data_type": network_group,
"id":3}
cls.request(cls.coupler_release,
instance_name,
partition_parameter_kw=parameter_dict)
cls.waitUntilGreen(instance_name, 180)
connection_dict = cls.getInstanceInfos(instance_name).connection_dict
print(connection_dict)
cls.coupler_publisher_new_url_ipv6 = connection_dict.get("url-ipv6")
# request a coupler in a subscriber mode (ID=2)
instance_name = time.strftime('e2e-test-coupler-anonymous-subscriber-new-%Y-%B-%d-%H:%M:%S')
# request a coupler in a both publisher and subscriber mode
instance_name = time.strftime('e2e-test-coupler-anonymous-publisher-subscriber-new-%Y-%B-%d-%H:%M:%S')
parameter_dict = {"mode":1,
"opc_ua_port": 6843,
"heart_beat": 1,
"heart_beat_id_list": "3",
"network_address_url_data_type": "opc.udp://224.0.0.22:4841/",
"network_address_url_data_type": network_group,
"id":4}
cls.request(cls.coupler_release,
instance_name,
partition_parameter_kw=parameter_dict)
cls.waitUntilGreen(instance_name, 180)
connection_dict = cls.getInstanceInfos(instance_name).connection_dict
print(connection_dict)
cls.coupler_publisher_subscriber_new_url_ipv6 = connection_dict.get("url-ipv6")
# request a coupler in a subscriber mode
instance_name = time.strftime('e2e-test-coupler-anonymous-subscriber-new-%Y-%B-%d-%H:%M:%S')
parameter_dict = {"mode":1,
"opc_ua_port": 6844,
"heart_beat_id_list": "4",
"network_address_url_data_type": network_group,
"id":5}
cls.request(cls.coupler_release,
instance_name,
partition_parameter_kw=parameter_dict)
cls.waitUntilGreen(instance_name, 180)
connection_dict = cls.getInstanceInfos(instance_name).connection_dict
cls.coupler_subscriber_new_url_ipv6 = connection_dict.get("url-ipv6")
def test_01_plc_increment_run(self):
......@@ -153,7 +165,6 @@ class BeremizTest(e2e.EndToEndTestCase):
try:
client.connect()
root = client.get_root_node()
children_list = root.get_children()
var = client.get_node(OPC_UA_IDENTIFIER)
for i in range (0, NUMBER_OF_CHECKS):
i2c0_relay0_before = var.get_value()
......@@ -182,7 +193,6 @@ class BeremizTest(e2e.EndToEndTestCase):
client.set_password(self.password)
client.connect()
root = client.get_root_node()
children_list = root.get_children()
var = client.get_node(OPC_UA_IDENTIFIER)
# by default it is a 0
self.assertEqual(var.get_value(), 0)
......@@ -201,15 +211,12 @@ class BeremizTest(e2e.EndToEndTestCase):
# give it some time for services(runtime & coupler) to warm up, connect and run
time.sleep(60)
# connect to a session at OPC-UA server
print("Test with default network mulsticast group.")
client = Client(self.coupler_subscriber_url_ipv6)
# for now this is the only test thus we start it without a wrapper
test_failures = 0
try:
client.connect()
root = client.get_root_node()
children_list = root.get_children()
var = client.get_node(HEART_BEAT_IDENTIFIER)
for i in range (0, NUMBER_OF_CHECKS):
heart_beat_before = var.get_value()
......@@ -229,13 +236,22 @@ class BeremizTest(e2e.EndToEndTestCase):
finally:
client.disconnect()
# connect to a session at OPC-UA server
client = Client(self.coupler_subscriber_new_url_ipv6)
# no failures
self.assertEqual(test_failures, 0)
def test_04_keep_alive_network_multiple_groups(self):
"""
Test keep alive network with multiple multicast networks.
"""
NUMBER_OF_CHECKS = 10
TIMEOUT = 3
print("Test with dedicated network mulsticast group.")
client = Client(self.coupler_publisher_subscriber_new_url_ipv6)
test_failures = 0
try:
client.connect()
root = client.get_root_node()
children_list = root.get_children()
var = client.get_node(HEART_BEAT_IDENTIFIER)
for i in range (0, NUMBER_OF_CHECKS):
heart_beat_before = var.get_value()
......@@ -257,3 +273,39 @@ class BeremizTest(e2e.EndToEndTestCase):
# no failures
self.assertEqual(test_failures, 0)
def test_05_keep_alive_network_both_publisher_and_subscriber(self):
"""
Test keep alive network with multiple multicast networks when
coupler is both publisher and subscriber.
"""
NUMBER_OF_CHECKS = 10
TIMEOUT = 3
print("Test with both publisher and subscriber.")
client = Client(self.coupler_subscriber_new_url_ipv6)
test_failures = 0
try:
client.connect()
root = client.get_root_node()
var = client.get_node(HEART_BEAT_IDENTIFIER)
for i in range (0, NUMBER_OF_CHECKS):
heart_beat_before = var.get_value()
print("\nheart_beat (before) = ", heart_beat_before)
print("Sleep for %s seconds ..." %TIMEOUT)
time.sleep(TIMEOUT)
heart_beat_after = var.get_value()
print("heart_beat (after) = ", heart_beat_after)
# for the wait timeout runtime should have increased the value
if (heart_beat_after <= heart_beat_before):
# counter should have been increased, mark failure
test_failures += 1
# heart_beat should be a float in this format <id>.<timestamp> to be
# sure that multiple couplers can use different groups check <id> is consistent
if int(heart_beat_before)!= 4 or int(heart_beat_after)!= 4:
test_failures += 1
finally:
client.disconnect()
# no failures
self.assertEqual(test_failures, 0)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment