Sever simulating/emulating hosting data.
Powerfuse at DyeMansion
asyncua = 1.1.5
requests = 2.31.0
import asyncio
import logging
from asyncua import Server, ua
from asyncua.common.methods import uamethod
from asyncua.common.manage_nodes import create_object_type
from asyncua.server.users import User, UserRole
import argparse
import random
import csv
import sys
import string
# command line handling
parser = argparse.ArgumentParser(description='Run OPCUA Server.')
a = parser.add_argument
a('--ipv4', help='The IPv4 address on which the OPCUA Server runs', default="")
a('--ipv6', help='The IPv6 address on which the OPCUA Server runs', default="::")
a('--ipv6-enabled', help='The IPv6 address check whether it is enabled or disabled', default="1")
a('--port', help='The port on which the OPCUA Server runs', default="4840")
a('--xml', help='Path of XML to configure Server. See asyncua doc for more details.', default=None)
a('--application_uri', help='Path of XML to configure Server. See asyncua doc for more details.', default=None)
args = parser.parse_args()
ipv4 = args.ipv4
ipv6 = args.ipv6
ipv6_enabled = args.ipv6_enabled
port = args.port
xml = args.xml
application_uri = args.application_uri
users_db = {
"Powerfuse": "password"
class UserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
if username in users_db and password == users_db[username].encode():
return User(role=UserRole.Admin)
return None
def stripAllValues(item_list):
Strip whitespace from all string values in each dictionary within a list.
:param item_list: List of dictionaries
:return: New list with all string values stripped
def stripDict(d):
return {k: v.strip() if isinstance(v, str) else v for k, v in d.items()}
return [stripDict(item) for item in item_list]
def readNodes(input_interface_description):
import csv
Reads the nodes defined in .csv style description as list of dictionaries with key values paris.
keys represent the header of the .csv file
input_interface_description: filename of the interface description
csv_data = []
with open(input_interface_description, 'r') as file:
reader = csv.DictReader(file, delimiter=',')
for row in reader:
csv_data = stripAllValues(csv_data)
return csv_data
# Class sub handler - with function that notes datachanges
class SubHandler(object):
def datachange_notification(self, node, val, data):
print(f"New data change event for node {node} with value {val}")
async def shuffleBasedOnCsvData(server, _logger, csv_data):
# The values of the nodes are again set randomly.
# Import the nodes from the CSV data
# server: opcua server
# csv_data: data in the original format of csv now in the format of list of dictionary - iterating over the rows
data_type_mapping = {
'BOOL': (ua.VariantType.Boolean, lambda: random.choice([True, False])),
'INT': (ua.VariantType.Int32, lambda: random.randint(-2**31, 2**31 - 1)),
'STRING': (ua.VariantType.String, lambda: ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(random.randint(5, 20)))),
'UDINT': (ua.VariantType.UInt32, lambda: random.randint(0, 2**32 - 1)),
'UINT': (ua.VariantType.UInt16, lambda: random.randint(0, 2**16 - 1)),
'REAL': (ua.VariantType.Float, lambda: random.uniform(-1e6, 1e6)),
for row in csv_data:
ns = 1
ns = int(row['ns'])
object_type_node_class = row['object_type_node_class']
s = row['s']
data_type = row['data_type']
description = row['description']
access_level = row['access_level']
cloud_transmission = row['cloud_transmission']
if data_type not in data_type_mapping:
_logger.error(f"Unsupported data type: {data_type} for node {s}")
root = server.nodes.root
# Get the object node
myvar = server.get_node(f"ns=1;s={s}")
# get the type and the value function of the node variable
variant_type, value_func = data_type_mapping[data_type]
# Call the value function to generate a random value
value = value_func()
# set the value explicitly with ua type and value.
value = value = ua.Variant(value, variant_type)
# Write the value to the node
await myvar.write_value(value)
async def importNodesFromCsvData(server, _logger, csv_data):
# Import the nodes from the CSV data
# server: opcua server
# csv_data: data in the original format of csv now in the format of list of dictionary - iterating over the rows
data_type_mapping = {
'BOOL': (ua.VariantType.Boolean, lambda: random.choice([True, False])),
'INT': (ua.VariantType.Int32, lambda: random.randint(-2**31, 2**31 - 1)),
'STRING': (ua.VariantType.String, lambda: ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(random.randint(5, 20)))),
'UDINT': (ua.VariantType.UInt32, lambda: random.randint(0, 2**32 - 1)),
'UINT': (ua.VariantType.UInt16, lambda: random.randint(0, 2**16 - 1)),
'REAL': (ua.VariantType.Float, lambda: random.uniform(-1e6, 1e6)),
# Get Objects node
objects = server.nodes.objects
ns = 1
object_node_name = "Powerfuse"
object_node = await objects.add_object(ua.NodeId("Powerfuse", ns), "Powerfuse")
for row in csv_data:
ns = int(row['ns'])
object_type_node_class = row['object_type_node_class']
# todo change from either s to node_id or the other way around.
s = row['s']
node_id = row['s'].strip()
data_type = row['data_type']
description = row['description']
access_level = row['access_level']
cloud_transmission = row['cloud_transmission']
# Check for valid data type
if data_type not in data_type_mapping:
_logger.error(f"Unsupported data type: {data_type} for node {s}")
# If data type is valid add it if its not already present
# Determine the data type
variant_type, value_func = data_type_mapping[data_type]
# Create initial value
initial_value = value_func()
# Create the variable
var = await object_node.add_variable(
ua.NodeId(node_id, ns),
# Set the description
# await var.set_description(ua.LocalizedText(description))
await var.set_writable(access_level == access_level)
# Set the variable to be writable
await var.set_writable()
print(f"Added node: ns={ns};s={node_id}")
# Create a subscription with a publishing interval of 500 milliseconds and assign it to the SubHandler class for handling subscription events
sub = await server.create_subscription(500, handler=SubHandler())
# Subscribe the variable node to the subscription for data change notifications
await sub.subscribe_data_change(var)
async def main():
_logger = logging.getLogger(__name__)
bool_vialogin = True
# setup our server
if bool_vialogin:
# server = Server()
user_manager = UserManager()
server = Server(user_manager = user_manager)
await server.init()
server = Server()
await server.init()
if bool(int(ipv6_enabled)):"Set endpoint to: opc.tcp://[{ipv6}]:{port}/{application_uri}")
_logger.debug(f"Setting endpoint to: opc.tcp://[{ipv4}]:{port}/{application_uri}")
if xml is not None:
await server.import_xml(xml)
# Read the interface description defining the nodes
nodes_data = readNodes(input_interface_description = "Powerfuse_Interface_definition_import.csv")
# Create nodes defined in the interface description
await importNodesFromCsvData(server = server, _logger = _logger, csv_data = nodes_data)
# Start the server"Starting server!")
# Trigger data change events
async with server:
while True:
await shuffleBasedOnCsvData(server = server, _logger = _logger, csv_data = nodes_data)
await asyncio.sleep(10)
if __name__ == "__main__":
logging.basicConfig(level=logging.ERROR), debug=False)
......@@ -34,6 +34,7 @@ cert = ${slap-connection:cert-file}
# Default parameters will be overwritten when called with
configuration.opcua-port = "4840"
configuration.ipv6-enabled = "1"
configuration.application_uri = ""
recipe =
......@@ -43,7 +44,7 @@ destination = ${directory:etc}/schema.xml
recipe = slapos.cookbook:wrapper
command-line = {{ interpreter_location }}/py {{ osie_repository_location }}/opcua-server-fhi/ --xml ${opcua-xml-url:destination} --port ${instance-parameter:configuration.opcua-port} --ipv6 ${instance-parameter:ipv6-random} --ipv6-enabled ${instance-parameter:configuration.ipv6-enabled}
command-line = {{ interpreter_location }}/py {{ osie_repository_location }}/opcua-server-fhi/ --xml ${opcua-xml-url:destination} --port ${instance-parameter:configuration.opcua-port} --ipv6 ${instance-parameter:ipv6-random} --ipv6-enabled ${instance-parameter:configuration.ipv6-enabled} --application_uri ${instance-parameter:configuration.application_uri}
wrapper-path = ${directory:service}/opcua-server-fhi-service
output = $${:wrapper-path}
