Commit ce1c4016 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

Merge publishing and subscribing servers

parent 1b893278
......@@ -4,8 +4,8 @@ LIBS=-lstdc++ -lmavsdk -lmavsdk_action -lmavsdk_mavlink_passthrough -lmavsdk_tel
LIB_NAME := libqjswrapper.so
SRCS := mavsdk_wrapper.cpp pubsub_publish.c pubsub_subscribe.c qjs_wrapper.c
OBJS := mavsdk_wrapper.o pubsub_publish.o pubsub_subscribe.o qjs_wrapper.o
SRCS := mavsdk_wrapper.cpp pubsub.c qjs_wrapper.c
OBJS := mavsdk_wrapper.o pubsub.o qjs_wrapper.o
all: $(LIB_NAME)
......
#ifndef __PUBSUB_COMMON_H__
#define __PUBSUB_COMMON_H__
#ifndef __PUBSUB_H__
#define __PUBSUB_H__
#include <open62541/server.h>
#include <open62541/plugin/log_stdout.h>
#define countof(x) (sizeof(x) / sizeof((x)[0]))
#define DATA_SET_WRITER_ID 1
#define WRITER_GROUP_ID 1
typedef struct {
int id;
UA_Double latitude;
......@@ -24,4 +27,16 @@ typedef struct {
UA_Byte builtInType;
} VariableData;
#endif /* __PUBSUB_COMMON_H__ */
\ No newline at end of file
UA_StatusCode writeVariable(char *name, void * UA_RESTRICT pvalue,
UA_DataType type);
int runPubsub(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData const *variableArray, int nbVariable,
int id, int nbReader,
void (*init_node_id)(UA_UInt32 id, int nb, int magic),
int (*get_reader_id)(int nb),
void (*update)(UA_UInt32 id, const UA_DataValue*),
UA_Boolean *running);
#endif /* __PUBSUB_H__ */
\ No newline at end of file
#ifndef __PUBSUB_PUBLISH_H__
#define __PUBSUB_PUBLISH_H__
#include "pubsub_common.h"
UA_StatusCode writeVariable(char *name, void * UA_RESTRICT pvalue,
UA_DataType type);
int publish(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData const *variableArray, int nbVariable,
int id, UA_Boolean *running);
#endif /* __PUBSUB_PUBLISH_H__ */
#ifndef __PUBSUB_SUBSCRIBE_H__
#define __PUBSUB_SUBSCRIBE_H__
#include "pubsub_common.h"
int subscribe(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData const *variableArray, int nbVariable, int nbReader,
void (*init_node_id)(UA_UInt32 id, int nb, int magic),
int (*get_reader_id)(int nb),
void (*update)(UA_UInt32 id, const UA_DataValue*),
UA_Boolean *running);
#endif /* __PUBSUB_SUBSCRIBE_H__ */
This diff is collapsed.
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/pubsub_udp.h>
#include <open62541/server_config_default.h>
#include "pubsub_publish.h"
static UA_Server *server;
UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
static UA_StatusCode
addPubSubConnection(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
int id){
char name[20];
UA_snprintf(name, sizeof(name - 1), "UADP Connection %d", id);
UA_PubSubConnectionConfig connectionConfig;
memset(&connectionConfig, 0, sizeof(connectionConfig));
connectionConfig.name = UA_STRING(name);
connectionConfig.transportProfileUri = *transportProfile;
connectionConfig.enabled = UA_TRUE;
UA_Variant_setScalar(&connectionConfig.address, networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
connectionConfig.publisherId.numeric = id;
return UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}
/**
* **PublishedDataSet handling**
*
* The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and
* can exist alone. The PDS contains the collection of the published fields. All
* other PubSub elements are directly or indirectly linked with the PDS or
* connection. */
static void
addPublishedDataSet(UA_Server *server, int id) {
/* The PublishedDataSetConfig contains all necessary public
* information for the creation of a new PublishedDataSet */
char name[8];
UA_snprintf(name, sizeof(name - 1), "PDS %d", id);
UA_PublishedDataSetConfig publishedDataSetConfig;
memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
publishedDataSetConfig.name = UA_STRING(name);
/* Create new PublishedDataSet based on the PublishedDataSetConfig. */
UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}
static UA_StatusCode
addVariable(UA_Server *server, VariableData varDetails) {
UA_VariableAttributes attr = UA_VariableAttributes_default;
UA_Variant_setScalar(&attr.value, varDetails.pdefaultValue, &UA_TYPES[varDetails.type]);
attr.description = UA_LOCALIZEDTEXT("en-US", varDetails.description);
attr.displayName = UA_LOCALIZEDTEXT("en-US", varDetails.description);
attr.dataType = UA_TYPES[varDetails.type].typeId;
attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
return UA_Server_addVariableNode(server, UA_NODEID_STRING(1, varDetails.name),
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
UA_QUALIFIEDNAME(1, varDetails.description),
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
attr, NULL, NULL);
}
UA_StatusCode writeVariable(char *name, void * UA_RESTRICT pvalue, UA_DataType type)
{
UA_NodeId integerNodeId = UA_NODEID_STRING(1, name);
UA_Variant var;
UA_Variant_init(&var);
UA_Variant_setScalar(&var, pvalue, &type);
return UA_Server_writeValue(server, integerNodeId, var);
}
static void
addDataSetField(UA_Server *server, VariableData varDetails) {
UA_NodeId dataSetFieldIdent;
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING(varDetails.description);
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
UA_NODEID_STRING(1, varDetails.name);
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, publishedDataSetIdent,
&dataSetFieldConfig, &dataSetFieldIdent);
}
/**
* **WriterGroup handling**
*
* The WriterGroup (WG) is part of the connection and contains the primary
* configuration parameters for the message creation. */
static void
addWriterGroup(UA_Server *server) {
/* Now we create a new WriterGroupConfig and add the group to the existing
* PubSubConnection. */
UA_WriterGroupConfig writerGroupConfig;
memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
writerGroupConfig.name = UA_STRING("Demo WriterGroup");
writerGroupConfig.publishingInterval = 100;
writerGroupConfig.enabled = UA_FALSE;
writerGroupConfig.writerGroupId = 100;
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
/* The configuration flags for the messages are encapsulated inside the
* message- and transport settings extension objects. These extension
* objects are defined by the standard. e.g.
* UadpWriterGroupMessageDataType */
UA_UadpWriterGroupMessageDataType *writerGroupMessage = UA_UadpWriterGroupMessageDataType_new();
/* Change message settings of writerGroup to send PublisherId,
* WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
* of NetworkMessage */
writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
UA_Server_setWriterGroupOperational(server, writerGroupIdent);
UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
}
/**
* **DataSetWriter handling**
*
* A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is
* linked to exactly one PDS and contains additional information for the
* message generation. */
static UA_StatusCode
addDataSetWriter(UA_Server *server) {
/* We need now a DataSetWriter within the WriterGroup. This means we must
* create a new DataSetWriterConfig and add call the addWriterGroup function. */
UA_NodeId dataSetWriterIdent;
UA_DataSetWriterConfig dataSetWriterConfig;
memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
dataSetWriterConfig.dataSetWriterId = 62541;
dataSetWriterConfig.keyFrameCount = 10;
return UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
&dataSetWriterConfig, &dataSetWriterIdent);
}
int publish(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData const *variableArray, int nbVariable,
int id, UA_Boolean *running) {
int i;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
server = UA_Server_new();
UA_ServerConfig *config = UA_Server_getConfig(server);
UA_ServerConfig_setDefault(config);
UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerUDPMP());
retval |= addPubSubConnection(server, transportProfile, networkAddressUrl, id);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
addPublishedDataSet(server, id);
for(i = 0; i < nbVariable; i++) {
retval |= addVariable(server, variableArray[i]);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
addDataSetField(server, variableArray[i]);
}
addWriterGroup(server);
retval |= addDataSetWriter(server);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
retval |= UA_Server_run(server, running);
UA_Server_delete(server);
return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}
#include <quickjs/quickjs.h>
#include "mavsdk_wrapper.h"
#include "pubsub_publish.h"
#include "pubsub_subscribe.h"
#include "pubsub.h"
static JSClassID js_drone_class_id;
......@@ -33,8 +32,7 @@ const VariableData droneVariableArray[] = {
},
};
static UA_Boolean publishing = true;
static UA_Boolean subscribing = true;
static UA_Boolean pubsub_running = true;
int nbDrone;
static JSValueConst *drone_object_id_list;
......@@ -133,44 +131,6 @@ void pubsub_publish_coordinates(double latitude, double longitude, float altitud
"Writing variable returned value %x", res);
}
static JSValue js_pubsub_publish(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
const char *ipv6;
const char *port;
char urlBuffer[44];
int id;
int res;
ipv6 = JS_ToCString(ctx, argv[0]);
port = JS_ToCString(ctx, argv[1]);
UA_snprintf(urlBuffer, sizeof(urlBuffer), "opc.udp://[%s]:%s/", ipv6, port);
UA_String transportProfile =
UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
UA_NetworkAddressUrlDataType networkAddressUrl =
{UA_STRING_NULL , UA_STRING(urlBuffer)};
if (JS_ToInt32(ctx, &id, argv[2]))
return JS_EXCEPTION;
res = publish(&transportProfile, &networkAddressUrl, droneVariableArray,
countof(droneVariableArray), id, &publishing);
JS_FreeCString(ctx, ipv6);
JS_FreeCString(ctx, port);
return JS_NewInt32(ctx, res);
}
static JSValue js_init_subscription(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
if (JS_ToInt32(ctx, &nbDrone, argv[0]))
return JS_EXCEPTION;
drone_object_id_list = malloc(nbDrone * sizeof(JSValueConst));
return JS_NewInt32(ctx, 0);
}
int get_drone_id(int nb) {
JSDroneData *s = JS_GetOpaque(drone_object_id_list[nb], js_drone_class_id);
return s->id;
......@@ -212,30 +172,13 @@ void pubsub_update_coordinates(UA_UInt32 id, const UA_DataValue *var)
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "NodeId not found");
}
static JSValue js_pubsub_write(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
double latitude;
double longitude;
double altitude;
if (JS_ToFloat64(ctx, &latitude, argv[0]))
return JS_EXCEPTION;
if (JS_ToFloat64(ctx, &longitude, argv[1]))
return JS_EXCEPTION;
if (JS_ToFloat64(ctx, &altitude, argv[2]))
return JS_EXCEPTION;
pubsub_publish_coordinates(latitude, longitude, altitude);
return JS_NewInt32(ctx, 0);
}
static JSValue js_pubsub_subscribe(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
static JSValue js_run_pubsub(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
const char *ipv6;
const char *port;
char urlBuffer[44];
int id;
int res;
ipv6 = JS_ToCString(ctx, argv[0]);
......@@ -247,25 +190,50 @@ static JSValue js_pubsub_subscribe(JSContext *ctx, JSValueConst this_val,
UA_NetworkAddressUrlDataType networkAddressUrl =
{UA_STRING_NULL , UA_STRING(urlBuffer)};
res = subscribe(&transportProfile, &networkAddressUrl, droneVariableArray,
countof(droneVariableArray), nbDrone, init_node_id,
get_drone_id, pubsub_update_coordinates, &subscribing);
if (JS_ToInt32(ctx, &id, argv[2]))
return JS_EXCEPTION;
res = runPubsub(&transportProfile, &networkAddressUrl, droneVariableArray,
countof(droneVariableArray), id, nbDrone, init_node_id,
get_drone_id, pubsub_update_coordinates, &pubsub_running);
JS_FreeCString(ctx, ipv6);
JS_FreeCString(ctx, port);
return JS_NewInt32(ctx, res);
}
static JSValue js_stop_publishing(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
static JSValue js_init_pubsub(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
publishing = false;
if (JS_ToInt32(ctx, &nbDrone, argv[0]))
return JS_EXCEPTION;
drone_object_id_list = malloc(nbDrone * sizeof(JSValueConst));
return JS_NewInt32(ctx, 0);
}
static JSValue js_stop_subscribing(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
static JSValue js_pubsub_write(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
double latitude;
double longitude;
double altitude;
if (JS_ToFloat64(ctx, &latitude, argv[0]))
return JS_EXCEPTION;
if (JS_ToFloat64(ctx, &longitude, argv[1]))
return JS_EXCEPTION;
if (JS_ToFloat64(ctx, &altitude, argv[2]))
return JS_EXCEPTION;
pubsub_publish_coordinates(latitude, longitude, altitude);
return JS_NewInt32(ctx, 0);
}
static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst this_val,
int argc, JSValueConst *argv)
{
subscribing = false;
pubsub_running = false;
free(drone_object_id_list);
return JS_NewInt32(ctx, 0);
}
......@@ -551,11 +519,9 @@ static const JSCFunctionListEntry js_mavsdk_funcs[] = {
JS_CFUNC_DEF("takeOff", 0, js_mavsdk_takeOff ),
JS_CFUNC_DEF("takeOffAndWait", 0, js_mavsdk_takeOffAndWait ),
JS_CFUNC_DEF("land", 0, js_mavsdk_land ),
JS_CFUNC_DEF("publish", 2, js_pubsub_publish ),
JS_CFUNC_DEF("subscribe", 2, js_pubsub_subscribe ),
JS_CFUNC_DEF("stopPublishing", 0, js_stop_publishing ),
JS_CFUNC_DEF("stopSubscribing", 0, js_stop_subscribing ),
JS_CFUNC_DEF("initSubscription", 1, js_init_subscription ),
JS_CFUNC_DEF("initPubsub", 1, js_init_pubsub ),
JS_CFUNC_DEF("runPubsub", 2, js_run_pubsub ),
JS_CFUNC_DEF("stopPubsub", 0, js_stop_pubsub ),
JS_CFUNC_DEF("pubsubWrite", 3, js_pubsub_write ),
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment