Commit 9185fbb9 authored by Levin Zimmermann's avatar Levin Zimmermann

component/opcua-server: notify erp5 if read

Notifying erp5 when a read occurs isn't as straightforward as sending a
request to erp5 when a data change occurs.

I think the reason for this is that OPC UA has the concept of subscriptions [1].
But as far as I understand it this concept only applies to important events
as data changes or OPCUA Events, but not to simple "read" or "browse"
requests. So to catch those requests and forward them to ERP5 we actually
need to adjust asyncua.

In this patch I picked a dirty-adhoc-monkey-patching approach, a cleaner
version may include defining a new custom ERP5Server, which inherits
from asyncua base Server class.

Because (AFAIK) it's not possible to track the read requests with the
subscription method, we may want to unify our forwarding system by dropping
the subscription approach and move everything to a patched InternalSession.

[1] https://documentation.unified-automation.com/uasdkhp/1.4.1/html/_l2_ua_subscription.html
parent 7c413498
......@@ -29,13 +29,14 @@ initialization =
ip, port, xml, erp5_ip, erp5_port = args.ip, args.port, args.xml, args.erp5_ip, args.erp5_port
# #################################
# Define ERP5Handler
import asyncio
import asyncua
from dataclasses import dataclass, field
import json
import requests
import urllib
from asyncua.common.subscription import SubHandler
@dataclass(frozen=True)
class ERP5Handler(SubHandler):
class ERP5Handler(asyncua.common.subscription.SubHandler):
ip: str
port: str
session: requests.Session = field(default_factory=requests.Session)
......@@ -49,26 +50,35 @@ initialization =
self.call(node=node, val=val, data=data)
def event_notification(self, event):
self.call(event=event)
erp5_handler = None
if erp5_ip is not None and erp5_port is not None:
erp5_handler = ERP5Handler(erp5_ip, erp5_port)
class InternalSession(asyncua.server.internal_session.InternalSession):
async def read(self, params):
erp5_handler.call(params=params)
return await super().read(params)
# #################################
# Start OPCUA Server
import asyncio
import logging
from asyncua import Server
from asyncua.common.ua_utils import get_nodes_of_namespace
async def main():
_logger = logging.getLogger(__name__)
# setup our server
server = Server()
server = asyncua.Server()
await server.init()
server.set_endpoint(f"opc.tcp://{ip}:{port}/freeopcua/server/")
if xml is not None:
await server.import_xml(xml)
if erp5_ip is not None and erp5_port is not None:
erp5_handler = ERP5Handler(erp5_ip, erp5_port)
if erp5_handler:
subscription = await server.create_subscription(1000, erp5_handler)
await subscription.subscribe_events()
nodes = await get_nodes_of_namespace(server)
nodes = await asyncua.common.ua_utils.get_nodes_of_namespace(server)
await subscription.subscribe_data_change(nodes)
def create_session(
name, user=asyncua.server.users.User(role=asyncua.server.users.UserRole.Anonymous), external=False
):
self = server.iserver
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
server.iserver.create_session = create_session
_logger.info("Added subscription for erp5 handler.")
_logger.info("Starting server!")
async with server:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment