Commit 9e08072d authored by Ivan Tyagov's avatar Ivan Tyagov

WIP: Enable subscriber part of server.

parent a6953d1c
#include <open62541/client_subscriptions.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/pubsub_udp.h>
#include <open62541/server.h>
#include <open62541/server_config_default.h>
#include <open62541/types_generated.h>
#include "ua_pubsub.h"
#if defined (UA_ENABLE_PUBSUB_ETH_UADP)
#include <open62541/plugin/pubsub_ethernet.h>
#endif
#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
UA_NodeId connectionIdentifier;
UA_NodeId readerGroupIdentifier;
UA_NodeId readerIdentifier;
UA_DataSetReaderConfig readerConfig;
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData);
/* callback to handle change notifications */
static void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
void *monitoredItemContext, const UA_NodeId *nodeId,
void *nodeContext, UA_UInt32 attributeId,
const UA_DataValue *var) {
if(UA_Variant_hasScalarType(&var->value, &UA_TYPES[UA_TYPES_UINT32])) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Received Notification with value %d",
*(UA_UInt32*) var->value.data);
}
}
/* Add new connection to the server */
static UA_StatusCode addPubSubConnectionXXX(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl) {
if((server == NULL) || (transportProfile == NULL) ||
(networkAddressUrl == NULL)) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
/* Configuration creation for the connection */
UA_PubSubConnectionConfig connectionConfig;
memset (&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
connectionConfig.name = UA_STRING("UDPMC Connection 1");
connectionConfig.transportProfileUri = *transportProfile;
connectionConfig.enabled = UA_TRUE;
UA_Variant_setScalar(&connectionConfig.address, networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
connectionConfig.publisherId.numeric = UA_UInt32_random ();
retval |= UA_Server_addPubSubConnection (server, &connectionConfig, &connectionIdentifier);
if (retval != UA_STATUSCODE_GOOD) {
return retval;
}
return retval;
}
/**
* **ReaderGroup**
*
* ReaderGroup is used to group a list of DataSetReaders. All ReaderGroups are
* created within a PubSubConnection and automatically deleted if the connection
* is removed. All network message related filters are only available in the DataSetReader. */
/* Add ReaderGroup to the created connection */
static UA_StatusCode addReaderGroup(UA_Server *server) {
if(server == NULL) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_ReaderGroupConfig readerGroupConfig;
memset (&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
readerGroupConfig.name = UA_STRING("ReaderGroup1");
retval |= UA_Server_addReaderGroup(server, connectionIdentifier, &readerGroupConfig,
&readerGroupIdentifier);
UA_Server_setReaderGroupOperational(server, readerGroupIdentifier);
return retval;
}
/**
* **DataSetReader**
*
* DataSetReader can receive NetworkMessages with the DataSetMessage
* of interest sent by the Publisher. DataSetReader provides
* the configuration necessary to receive and process DataSetMessages
* on the Subscriber side. DataSetReader must be linked with a
* SubscribedDataSet and be contained within a ReaderGroup. */
/* Add DataSetReader to the ReaderGroup */
static UA_StatusCode addDataSetReader(UA_Server *server) {
if(server == NULL) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
memset (&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
readerConfig.name = UA_STRING("DataSet Reader 1");
/* Parameters to filter which DataSetMessage has to be processed
* by the DataSetReader */
/* The following parameters are used to show that the data published by
* tutorial_pubsub_publish.c is being subscribed and is being updated in
* the information model */
UA_UInt16 publisherIdentifier = 2234;
readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_UINT16];
readerConfig.publisherId.data = &publisherIdentifier;
readerConfig.writerGroupId = 100;
readerConfig.dataSetWriterId = 62541;
/* Setting up Meta data configuration in DataSetReader */
fillTestDataSetMetaData(&readerConfig.dataSetMetaData);
retval |= UA_Server_addDataSetReader(server, readerGroupIdentifier, &readerConfig,
&readerIdentifier);
return retval;
}
/**
* **SubscribedDataSet**
*
* Set SubscribedDataSet type to TargetVariables data type.
* Add subscribedvariables to the DataSetReader */
static UA_StatusCode addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId) {
if(server == NULL)
return UA_STATUSCODE_BADINTERNALERROR;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_NodeId folderId;
UA_String folderName = readerConfig.dataSetMetaData.name;
UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
UA_QualifiedName folderBrowseName;
if(folderName.length > 0) {
oAttr.displayName.locale = UA_STRING ("en-US");
oAttr.displayName.text = folderName;
folderBrowseName.namespaceIndex = 1;
folderBrowseName.name = folderName;
}
else {
oAttr.displayName = UA_LOCALIZEDTEXT ("en-US", "Subscribed Variables");
folderBrowseName = UA_QUALIFIEDNAME (1, "Subscribed Variables");
}
UA_Server_addObjectNode (server, UA_NODEID_NULL,
UA_NODEID_NUMERIC (0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC (0, UA_NS0ID_ORGANIZES),
folderBrowseName, UA_NODEID_NUMERIC (0,
UA_NS0ID_BASEOBJECTTYPE), oAttr, NULL, &folderId);
/**
* **TargetVariables**
*
* The SubscribedDataSet option TargetVariables defines a list of Variable mappings between
* received DataSet fields and target Variables in the Subscriber AddressSpace.
* The values subscribed from the Publisher are updated in the value field of these variables */
/* Create the TargetVariables with respect to DataSetMetaData fields */
UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable *)
UA_calloc(readerConfig.dataSetMetaData.fieldsSize, sizeof(UA_FieldTargetVariable));
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++) {
/* Variable to subscribe data */
UA_VariableAttributes vAttr = UA_VariableAttributes_default;
UA_LocalizedText_copy(&readerConfig.dataSetMetaData.fields[i].description,
&vAttr.description);
vAttr.displayName.locale = UA_STRING("en-US");
vAttr.displayName.text = readerConfig.dataSetMetaData.fields[i].name;
vAttr.dataType = readerConfig.dataSetMetaData.fields[i].dataType;
UA_NodeId newNode;
retval |= UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, (UA_UInt32)i + 50000),
folderId,
UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
vAttr, NULL, &newNode);
/*monitor variable*/
UA_MonitoredItemCreateRequest monRequest = UA_MonitoredItemCreateRequest_default(newNode);
UA_Server_createDataChangeMonitoredItem(server, UA_TIMESTAMPSTORETURN_SOURCE,
monRequest, NULL, dataChangeNotificationCallback);
/* For creating Targetvariables */
UA_FieldTargetDataType_init(&targetVars[i].targetVariable);
targetVars[i].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[i].targetVariable.targetNodeId = newNode;
}
retval = UA_Server_DataSetReader_createTargetVariables(server, dataSetReaderId,
readerConfig.dataSetMetaData.fieldsSize, targetVars);
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++)
UA_FieldTargetDataType_clear(&targetVars[i].targetVariable);
UA_free(targetVars);
UA_free(readerConfig.dataSetMetaData.fields);
return retval;
}
/**
* **DataSetMetaData**
*
* The DataSetMetaData describes the content of a DataSet. It provides the information necessary to decode
* DataSetMessages on the Subscriber side. DataSetMessages received from the Publisher are decoded into
* DataSet and each field is updated in the Subscriber based on datatype match of TargetVariable fields of Subscriber
* and PublishedDataSetFields of Publisher */
/* Define MetaData for TargetVariables */
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) {
if(pMetaData == NULL) {
return;
}
UA_DataSetMetaDataType_init (pMetaData);
pMetaData->name = UA_STRING ("DataSet 1 (subscribed)");
/* Static definition of number of fields size to 2 to create four different
* targetVariables of distinct datatype
*/
pMetaData->fieldsSize = 2;
pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize,
&UA_TYPES[UA_TYPES_FIELDMETADATA]);
/* heartbeat */
UA_FieldMetaData_init (&pMetaData->fields[0]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT32].typeId,
&pMetaData->fields[0].dataType);
pMetaData->fields[0].builtInType = UA_NS0ID_UINT32;
pMetaData->fields[0].name = UA_STRING ("Heartbeat (subscribed)");
pMetaData->fields[0].valueRank = -1; /* scalar */
// ID
UA_FieldMetaData_init (&pMetaData->fields[1]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT32].typeId,
&pMetaData->fields[1].dataType);
pMetaData->fields[1].builtInType = UA_NS0ID_UINT32;
pMetaData->fields[1].name = UA_STRING ("ID (subscribed)");
pMetaData->fields[1].valueRank = -1; /* scalar */
}
/**
* Followed by the main server code, making use of the above definitions */
/*
UA_Boolean running = true;
static void stopHandler(int sign) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
running = false;
}
*/
static int run(UA_String *transportProfile, UA_NetworkAddressUrlDataType *networkAddressUrl) {
//signal(SIGINT, stopHandler);
//signal(SIGTERM, stopHandler);
/* Return value initialized to Status Good */
//UA_StatusCode retval = UA_STATUSCODE_GOOD;
//UA_Server *server = UA_Server_new();
//UA_ServerConfig *config = UA_Server_getConfig(server);
//UA_ServerConfig_setMinimal(config, 4801, NULL);
/* Add the PubSub network layer implementation to the server config.
* The TransportLayer is acting as factory to create new connections
* on runtime. Details about the PubSubTransportLayer can be found inside the
* tutorial_pubsub_connection */
//UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerUDPMP());
/* API calls */
/* Add PubSubConnection */
//retval |= addPubSubConnection(server, transportProfile, networkAddressUrl);
//if (retval != UA_STATUSCODE_GOOD)
// return EXIT_FAILURE;
/* Add ReaderGroup to the created PubSubConnection */
//retval |= addReaderGroup(server);
//if (retval != UA_STATUSCODE_GOOD)
// return EXIT_FAILURE;
/* Add DataSetReader to the created ReaderGroup */
//retval |= addDataSetReader(server);
//if (retval != UA_STATUSCODE_GOOD)
// return EXIT_FAILURE;
/* Add SubscribedVariables to the created DataSetReader */
//retval |= addSubscribedVariables(server, readerIdentifier);
//if (retval != UA_STATUSCODE_GOOD)
// return EXIT_FAILURE;
//retval = UA_Server_run(server, &running);
//UA_Server_delete(server);
//return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}
/*
static void
usage(char *progname) {
printf("usage: %s <uri> [device]\n", progname);
}
int main(int argc, char **argv) {
UA_String transportProfile = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
if(argc > 1) {
if(strcmp(argv[1], "-h") == 0) {
usage(argv[0]);
return EXIT_SUCCESS;
} else if(strncmp(argv[1], "opc.udp://", 10) == 0) {
networkAddressUrl.url = UA_STRING(argv[1]);
} else if(strncmp(argv[1], "opc.eth://", 10) == 0) {
transportProfile =
UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp");
if(argc < 3) {
printf("Error: UADP/ETH needs an interface name\n");
return EXIT_FAILURE;
}
networkAddressUrl.networkInterface = UA_STRING(argv[2]);
networkAddressUrl.url = UA_STRING(argv[1]);
} else {
printf ("Error: unknown URI\n");
return EXIT_FAILURE;
}
}
return run(&transportProfile, &networkAddressUrl);
}
*/
...@@ -40,6 +40,7 @@ static int COUPLER_ID = 0; ...@@ -40,6 +40,7 @@ static int COUPLER_ID = 0;
UA_Server *server; UA_Server *server;
#include "keep_alive_publisher.h" #include "keep_alive_publisher.h"
#include "keep_alive_subscriber.h"
// The default port of OPC-UA server // The default port of OPC-UA server
const int DEFAULT_OPC_UA_PORT = 4840; const int DEFAULT_OPC_UA_PORT = 4840;
...@@ -261,6 +262,33 @@ int main(int argc, char **argv) ...@@ -261,6 +262,33 @@ int main(int argc, char **argv)
enablePublishHeartBeat(server, config); enablePublishHeartBeat(server, config);
} }
// XXX: subscribe part
/* Add PubSubConnection */
UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerUDPMP());
UA_StatusCode return_value = UA_STATUSCODE_GOOD;
UA_String transportProfile = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
return_value |= addPubSubConnectionXXX(server, &transportProfile, &networkAddressUrl);
if ( return_value!= UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
/* Add ReaderGroup to the created PubSubConnection */
return_value |= addReaderGroup(server);
if (return_value!= UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
/* Add DataSetReader to the created ReaderGroup */
return_value |= addDataSetReader(server);
if (return_value != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
/* Add SubscribedVariables to the created DataSetReader */
return_value |= addSubscribedVariables(server, readerIdentifier);
if (return_value != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
// EOF: subscribe
// run server // run server
UA_StatusCode retval = UA_Server_run(server, &running); UA_StatusCode retval = UA_Server_run(server, &running);
UA_Server_delete(server); UA_Server_delete(server);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment