一 前言
上一篇文章讲了Server如何异步执行方法,本篇文章讲述Client如何异步执行Server提供的方法。
二 准备Server
先写个Server,提供2个功能一样的方法,名字叫Hello World1和Hello World2,下面是代码,
#include "open62541.h"
#include <signal.h>
#include <stdlib.h>
#ifndef WIN32
#include <pthread.h>
#define THREAD_HANDLE pthread_t
#define THREAD_CREATE(handle, callback) pthread_create(&handle, NULL, callback, NULL)
#define THREAD_JOIN(handle) pthread_join(handle, NULL)
#define THREAD_CALLBACK(name) static void * name(void *_)
#else
#include <windows.h>
#define THREAD_HANDLE HANDLE
#define THREAD_CREATE(handle, callback) { handle = CreateThread( NULL, 0, callback, NULL, 0, NULL); }
#define THREAD_JOIN(handle) WaitForSingleObject(handle, INFINITE)
#define THREAD_CALLBACK(name) static DWORD WINAPI name( LPVOID lpParam )
#endif
static UA_Server* globalServer;
static volatile UA_Boolean running = true;
static void stopHandler(int sign)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
running = false;
}
static UA_StatusCode helloWorld1MethodCallback(UA_Server *server,
const UA_NodeId *sessionId, void *sessionHandle,
const UA_NodeId *methodId, void *methodContext,
const UA_NodeId *objectId, void *objectContext,
size_t inputSize, const UA_Variant *input,
size_t outputSize, UA_Variant *output)
{
UA_String *inputStr = (UA_String*)input->data;
UA_String tmp = UA_STRING_ALLOC("Hello ");
if (inputStr->length > 0)
{
tmp.data = (UA_Byte *)UA_realloc(tmp.data, tmp.length + inputStr->length);
memcpy(&tmp.data[tmp.length], inputStr->data, inputStr->length);
tmp.length += inputStr->length;
}
UA_Variant_setScalarCopy(output, &tmp, &UA_TYPES[UA_TYPES_STRING]);
// add '\0' to string
char* test = (char*)calloc(1, tmp.length + 1);
memcpy(test, tmp.data, tmp.length);
UA_String_clear(&tmp);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World1' was called");
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, " Data: %s", test);
free(test);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World1' has ended");
return UA_STATUSCODE_GOOD;
}
static void addHellWorld1Method(UA_Server *server)
{
UA_Argument inputArgument;
UA_Argument_init(&inputArgument);
inputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
inputArgument.name = UA_STRING("MyInput");
inputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
inputArgument.valueRank = UA_VALUERANK_SCALAR;
UA_Argument outputArgument;
UA_Argument_init(&outputArgument);
outputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
outputArgument.name = UA_STRING("MyOutput");
outputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
outputArgument.valueRank = UA_VALUERANK_SCALAR;
UA_MethodAttributes helloAttr = UA_MethodAttributes_default;
helloAttr.description = UA_LOCALIZEDTEXT("en-US", "Say `Hello World1`");
helloAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Hello World1");
helloAttr.executable = true;
helloAttr.userExecutable = true;
UA_NodeId id = UA_NODEID_NUMERIC(1, 62541);
UA_Server_addMethodNode(server, id,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
UA_QUALIFIEDNAME(1, "hello world1"),
helloAttr, &helloWorld1MethodCallback,
1, &inputArgument, 1, &outputArgument, NULL, NULL);
}
static UA_StatusCode helloWorld2MethodCallback(UA_Server *server,
const UA_NodeId *sessionId, void *sessionHandle,
const UA_NodeId *methodId, void *methodContext,
const UA_NodeId *objectId, void *objectContext,
size_t inputSize, const UA_Variant *input,
size_t outputSize, UA_Variant *output)
{
UA_String *inputStr = (UA_String*)input->data;
UA_String tmp = UA_STRING_ALLOC("Hello ");
if (inputStr->length > 0)
{
tmp.data = (UA_Byte *)UA_realloc(tmp.data, tmp.length + inputStr->length);
memcpy(&tmp.data[tmp.length], inputStr->data, inputStr->length);
tmp.length += inputStr->length;
}
UA_Variant_setScalarCopy(output, &tmp, &UA_TYPES[UA_TYPES_STRING]);
// add '\0' to string
char* test = (char*)calloc(1, tmp.length + 1);
memcpy(test, tmp.data, tmp.length);
UA_String_clear(&tmp);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World2' was called");
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, " Data: %s", test);
free(test);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World2' has ended");
return UA_STATUSCODE_GOOD;
}
static void addHellWorld2Method(UA_Server *server)
{
UA_Argument inputArgument;
UA_Argument_init(&inputArgument);
inputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
inputArgument.name = UA_STRING("MyInput");
inputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
inputArgument.valueRank = UA_VALUERANK_SCALAR;
UA_Argument outputArgument;
UA_Argument_init(&outputArgument);
outputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
outputArgument.name = UA_STRING("MyOutput");
outputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
outputArgument.valueRank = UA_VALUERANK_SCALAR;
UA_MethodAttributes helloAttr = UA_MethodAttributes_default;
helloAttr.description = UA_LOCALIZEDTEXT("en-US", "Say `Hello World2`");
helloAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Hello World2");
helloAttr.executable = true;
helloAttr.userExecutable = true;
UA_NodeId id = UA_NODEID_NUMERIC(1, 62542);
UA_Server_addMethodNode(server, id,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
UA_QUALIFIEDNAME(1, "hello world2"),
helloAttr, &helloWorld2MethodCallback,
1, &inputArgument, 1, &outputArgument, NULL, NULL);
}
/* This callback will be called when a new entry is added to the Callrequest queue */
static void TestCallback(UA_Server *server)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Dispatched an async method");
}
int main(void)
{
signal(SIGINT, stopHandler);
signal(SIGTERM, stopHandler);
globalServer = UA_Server_new();
UA_ServerConfig *config = UA_Server_getConfig(globalServer);
UA_ServerConfig_setDefault(config);
/* Set the NotifyCallback */
config->asyncOperationNotifyCallback = TestCallback;
/* Add methods */
addHellWorld1Method(globalServer);
addHellWorld2Method(globalServer);
UA_StatusCode retval = UA_Server_run(globalServer, &running);
UA_Server_delete(globalServer);
return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}
代码比较简单,就是添加一个方法然后加一个回调,功能就是输入一个字符串"XXX",然后输出"Hello XXX"。
Hello World1方法的id是UA_NODEID_NUMERIC(1, 62541),Hello World2方法的id是UA_NODEID_NUMERIC(1, 62542)。可以先用UaExpert进行验证。
三 Client异步执行方法
open62541提供一个API用于异步执行,即__UA_Client_AsyncService(),其原型如下,
UA_StatusCode
__UA_Client_AsyncService(UA_Client *client, const void *request,
const UA_DataType *requestType,
UA_ClientAsyncServiceCallback callback,
const UA_DataType *responseType,
void *userdata, UA_UInt32 *requestId)
这是个低级接口,参数还是比较多的。
这里定义一个函数UA_Client_call_asyncMulti()来发起一个请求,该请求同时调用2个方法,如下,
static UA_StatusCode UA_Client_call_asyncMulti(UA_Client *client,
const UA_NodeId objectId1, const UA_NodeId methodId1, size_t inputSize1, const UA_Variant *input1,
const UA_NodeId objectId2, const UA_NodeId methodId2, size_t inputSize2, const UA_Variant *input2,
UA_ClientAsyncServiceCallback callback, void *userdata, UA_UInt32 *reqId)
{
UA_CallRequest request;
UA_CallRequest_init(&request);
UA_CallMethodRequest item[2];
UA_CallMethodRequest_init(&item[0]);
item[0].methodId = methodId1;
item[0].objectId = objectId1;
item[0].inputArguments = (UA_Variant *)(void*)(uintptr_t)input1; // cast const...
item[0].inputArgumentsSize = inputSize1;
UA_CallMethodRequest_init(&item[1]);
item[1].methodId = methodId2;
item[1].objectId = objectId2;
item[1].inputArguments = (UA_Variant *)(void*)(uintptr_t)input2; // cast const...
item[1].inputArgumentsSize = inputSize2;
request.methodsToCall = &item[0];
request.methodsToCallSize = 2; // 请求执行2个方法
return __UA_Client_AsyncService(client, &request,
&UA_TYPES[UA_TYPES_CALLREQUEST], callback,
&UA_TYPES[UA_TYPES_CALLRESPONSE], userdata, reqId);
}
该函数就是把方法的id和方法需要的参数放到请求里传给Server,注意,代码中的objectId 是方法的上一级node的id。
然后定义一个函数InitCallMulti()来调用UA_Client_call_asyncMulti(),如下,
static void InitCallMulti(UA_Client* client)
{
UA_UInt32 reqId = 0;
// 给Hello World1方法初始化输入参数
UA_Variant input1;
UA_Variant_init(&input1);
UA_String stringValue1 = UA_String_fromChars("boys (multi)");
UA_Variant_setScalar(&input1, &stringValue1, &UA_TYPES[UA_TYPES_STRING]);
// 给Hello World2方法初始化输入参数
UA_Variant input2;
UA_Variant_init(&input2);
UA_String stringValue2 = UA_String_fromChars("girls (multi)");
UA_Variant_setScalar(&input2, &stringValue2, &UA_TYPES[UA_TYPES_STRING]);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** Initiating CallRequest");
UA_Client_call_asyncMulti(client,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(1, 62541), 1, &input1,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(1, 62542), 1, &input2,
(UA_ClientAsyncServiceCallback)methodCalled, NULL, &reqId);
UA_String_clear(&stringValue1);
UA_String_clear(&stringValue2);
}
该函数会调用Server端的Hello World1方法和Hello World2方法,给Hello World1传递的参数是"boys (multi)",给Hello World2传递的参数是"girls (multi)"。
这里有个很重要的点要弄清楚,既然是异步执行,那么如何拿到方法执行的返回值呢?因为同步执行是在那死等返回值,那么异步执行怎么办呢?答案就是回调,代码中使用了一个回调函数,名叫methodCalled(),其定义如下,
static void methodCalled(UA_Client *client, void *userdata, UA_UInt32 requestId, UA_CallResponse *response)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"**** CallRequest Response - Req:%u with %u results",
requestId, (UA_UInt32)response->resultsSize);
UA_UInt32 i;
UA_StatusCode retval = response->responseHeader.serviceResult;
if (retval == UA_STATUSCODE_GOOD)
{
for (i = 0; i < response->resultsSize; i++)
{
if (response->resultsSize >= i)
retval = response->results[i].statusCode;
else
retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
if (retval != UA_STATUSCODE_GOOD)
{
UA_CallResponse_clear(response);
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"**** CallRequest Response - Req: %u (%u) failed", requestId, i);
if (i == response->resultsSize)
return;
else
continue;
}
/* Move the output arguments */
UA_Variant *output = response->results[i].outputArguments;
size_t outputSize = response->results[i].outputArgumentsSize;
response->results[i].outputArguments = NULL;
response->results[i].outputArgumentsSize = 0;
if (retval == UA_STATUSCODE_GOOD)
{
UA_String *outputStr = (UA_String*)output->data;
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Method call was successful, returned %lu values: %.*s.\n",
(unsigned long)outputSize, outputStr->length, outputStr->data);
UA_Array_delete(output, outputSize, &UA_TYPES[UA_TYPES_VARIANT]);
}
else
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Method call was unsuccessful, returned %x values.\n", retval);
}
}
}
else
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** CallRequest Response - Req:%u FAILED", requestId);
}
UA_CallResponse_clear(response);
}
代码也比较简单,就是把返回值个数和返回值打印出来,如果有错就打印错误信息。
四 Client整体代码及运行
代码来自example目录下的client_method_async.c,但是原始代码写的不好而且冗长,本人做了简化和调整,让整体结构变得简单,
#include "open62541.h"
#include <stdlib.h>
#include <signal.h>
UA_Boolean running = true;
static void methodCalled(UA_Client *client, void *userdata, UA_UInt32 requestId, UA_CallResponse *response)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"**** CallRequest Response - Req:%u with %u results",
requestId, (UA_UInt32)response->resultsSize);
UA_UInt32 i;
UA_StatusCode retval = response->responseHeader.serviceResult;
if (retval == UA_STATUSCODE_GOOD)
{
for (i = 0; i < response->resultsSize; i++)
{
if (response->resultsSize >= i)
retval = response->results[i].statusCode;
else
retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
if (retval != UA_STATUSCODE_GOOD)
{
UA_CallResponse_clear(response);
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"**** CallRequest Response - Req: %u (%u) failed", requestId, i);
if (i == response->resultsSize)
return;
else
continue;
}
/* Move the output arguments */
UA_Variant *output = response->results[i].outputArguments;
size_t outputSize = response->results[i].outputArgumentsSize;
response->results[i].outputArguments = NULL;
response->results[i].outputArgumentsSize = 0;
if (retval == UA_STATUSCODE_GOOD)
{
UA_String *outputStr = (UA_String*)output->data;
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Method call was successful, returned %lu values: %.*s.\n",
(unsigned long)outputSize, outputStr->length, outputStr->data);
UA_Array_delete(output, outputSize, &UA_TYPES[UA_TYPES_VARIANT]);
}
else
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Method call was unsuccessful, returned %x values.\n", retval);
}
}
}
else
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** CallRequest Response - Req:%u FAILED", requestId);
}
UA_CallResponse_clear(response);
}
static UA_StatusCode UA_Client_call_asyncMulti(UA_Client *client,
const UA_NodeId objectId1, const UA_NodeId methodId1, size_t inputSize1, const UA_Variant *input1,
const UA_NodeId objectId2, const UA_NodeId methodId2, size_t inputSize2, const UA_Variant *input2,
UA_ClientAsyncServiceCallback callback, void *userdata, UA_UInt32 *reqId)
{
UA_CallRequest request;
UA_CallRequest_init(&request);
UA_CallMethodRequest item[2];
UA_CallMethodRequest_init(&item[0]);
item[0].methodId = methodId1;
item[0].objectId = objectId1;
item[0].inputArguments = (UA_Variant *)(void*)(uintptr_t)input1; // cast const...
item[0].inputArgumentsSize = inputSize1;
UA_CallMethodRequest_init(&item[1]);
item[1].methodId = methodId2;
item[1].objectId = objectId2;
item[1].inputArguments = (UA_Variant *)(void*)(uintptr_t)input2; // cast const...
item[1].inputArgumentsSize = inputSize2;
request.methodsToCall = &item[0];
request.methodsToCallSize = 2;
return __UA_Client_AsyncService(client, &request,
&UA_TYPES[UA_TYPES_CALLREQUEST], callback,
&UA_TYPES[UA_TYPES_CALLRESPONSE], userdata, reqId);
}
static void InitCallMulti(UA_Client* client)
{
UA_UInt32 reqId = 0;
UA_Variant input1;
UA_Variant_init(&input1);
UA_String stringValue1 = UA_String_fromChars("boys (multi)");
UA_Variant_setScalar(&input1, &stringValue1, &UA_TYPES[UA_TYPES_STRING]);
UA_Variant input2;
UA_Variant_init(&input2);
UA_String stringValue2 = UA_String_fromChars("girls (multi)");
UA_Variant_setScalar(&input2, &stringValue2, &UA_TYPES[UA_TYPES_STRING]);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** Initiating CallRequest");
UA_Client_call_asyncMulti(client,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(1, 62541), 1, &input1,
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(1, 62542), 1, &input2,
(UA_ClientAsyncServiceCallback)methodCalled, NULL, &reqId);
UA_String_clear(&stringValue1);
UA_String_clear(&stringValue2);
}
static void stopHandler(int sign)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
running = 0;
}
static void deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Subscription Id %u was deleted", subscriptionId);
}
static void subscriptionInactivityCallback(UA_Client *client, UA_UInt32 subId, void *subContext)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}
static void cycleCallback(UA_Client *client, void *data)
{
InitCallMulti(client);
}
int main(int argc, char *argv[])
{
signal(SIGINT, stopHandler); /* catches ctrl-c */
UA_Client *client = UA_Client_new();
UA_ClientConfig *cc = UA_Client_getConfig(client);
UA_ClientConfig_setDefault(cc);
/* we use a high timeout because there may be other client and
* processing may take long if many method calls are waiting */
cc->timeout = 60000;
cc->subscriptionInactivityCallback = subscriptionInactivityCallback;
UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
if (retval != UA_STATUSCODE_GOOD)
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"Not connected. Retrying to connect in 1 second");
UA_Client_delete(client);
return EXIT_SUCCESS;
}
UA_UInt64 callbackId = 0;
UA_Client_addRepeatedCallback(client, cycleCallback, NULL, 2000, &callbackId); // call every 2s
/* Endless loop runAsync */
while (running)
{
UA_Client_run_iterate(client, 100);
}
/* Clean up */
UA_Client_disconnect(client);
UA_Client_delete(client);
return EXIT_SUCCESS;
}
代码里使用UA_Client_addRepeatedCallback()每隔2秒调用一次InitCallMulti()
编译OK后执行,在Client端每隔2s可以看到如下的打印,
打印信息表示返回2个结果(因为一次请求中调用2个方法),并把返回结果打印出来,和期望的一样。
Server这边也有对应的打印,
五 总结
本文主要讲述Client端如何异步执行方法,其实原理也比较简单,就是把方法id和方法需要的参数放到请求里,然后发给Server,等Server有回复了就会执行回调函数,在回调函数里Client就可以拿到方法调用的返回值。