学习open62541 --- [36] Client异步执行method


一 前言

上一篇文章讲了Server如何异步执行方法,本篇文章讲述Client如何异步执行Server提供的方法。


二 准备Server

先写个Server,提供2个功能一样的方法,名字叫Hello World1和Hello World2,下面是代码,

#include "open62541.h"

#include <signal.h>
#include <stdlib.h>

#ifndef WIN32
#include <pthread.h>
#define THREAD_HANDLE pthread_t
#define THREAD_CREATE(handle, callback) pthread_create(&handle, NULL, callback, NULL)
#define THREAD_JOIN(handle) pthread_join(handle, NULL)
#define THREAD_CALLBACK(name) static void * name(void *_)
#else
#include <windows.h>
#define THREAD_HANDLE HANDLE
#define THREAD_CREATE(handle, callback) { handle = CreateThread( NULL, 0, callback, NULL, 0, NULL); }
#define THREAD_JOIN(handle) WaitForSingleObject(handle, INFINITE)
#define THREAD_CALLBACK(name) static DWORD WINAPI name( LPVOID lpParam )
#endif

static UA_Server* globalServer;
static volatile UA_Boolean running = true;

static void stopHandler(int sign)
{
	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
	running = false;
}

static UA_StatusCode helloWorld1MethodCallback(UA_Server *server,
	const UA_NodeId *sessionId, void *sessionHandle,
	const UA_NodeId *methodId, void *methodContext,
	const UA_NodeId *objectId, void *objectContext,
	size_t inputSize, const UA_Variant *input,
	size_t outputSize, UA_Variant *output)
{
	UA_String *inputStr = (UA_String*)input->data;

	UA_String tmp = UA_STRING_ALLOC("Hello ");
	if (inputStr->length > 0)
	{
		tmp.data = (UA_Byte *)UA_realloc(tmp.data, tmp.length + inputStr->length);
		memcpy(&tmp.data[tmp.length], inputStr->data, inputStr->length);
		tmp.length += inputStr->length;
	}

	UA_Variant_setScalarCopy(output, &tmp, &UA_TYPES[UA_TYPES_STRING]);

	// add '\0' to string
	char* test = (char*)calloc(1, tmp.length + 1);
	memcpy(test, tmp.data, tmp.length);
	UA_String_clear(&tmp);

	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World1' was called");
	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "    Data: %s", test);
	free(test);
	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World1' has ended");

	return UA_STATUSCODE_GOOD;
}

static void addHellWorld1Method(UA_Server *server)
{
	UA_Argument inputArgument;
	UA_Argument_init(&inputArgument);
	inputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
	inputArgument.name = UA_STRING("MyInput");
	inputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
	inputArgument.valueRank = UA_VALUERANK_SCALAR;

	UA_Argument outputArgument;
	UA_Argument_init(&outputArgument);
	outputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
	outputArgument.name = UA_STRING("MyOutput");
	outputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
	outputArgument.valueRank = UA_VALUERANK_SCALAR;

	UA_MethodAttributes helloAttr = UA_MethodAttributes_default;
	helloAttr.description = UA_LOCALIZEDTEXT("en-US", "Say `Hello World1`");
	helloAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Hello World1");
	helloAttr.executable = true;
	helloAttr.userExecutable = true;
	UA_NodeId id = UA_NODEID_NUMERIC(1, 62541);
	UA_Server_addMethodNode(server, id,
		UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
		UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
		UA_QUALIFIEDNAME(1, "hello world1"),
		helloAttr, &helloWorld1MethodCallback,
		1, &inputArgument, 1, &outputArgument, NULL, NULL);

}


static UA_StatusCode helloWorld2MethodCallback(UA_Server *server,
	const UA_NodeId *sessionId, void *sessionHandle,
	const UA_NodeId *methodId, void *methodContext,
	const UA_NodeId *objectId, void *objectContext,
	size_t inputSize, const UA_Variant *input,
	size_t outputSize, UA_Variant *output)
{
	UA_String *inputStr = (UA_String*)input->data;

	UA_String tmp = UA_STRING_ALLOC("Hello ");
	if (inputStr->length > 0)
	{
		tmp.data = (UA_Byte *)UA_realloc(tmp.data, tmp.length + inputStr->length);
		memcpy(&tmp.data[tmp.length], inputStr->data, inputStr->length);
		tmp.length += inputStr->length;
	}

	UA_Variant_setScalarCopy(output, &tmp, &UA_TYPES[UA_TYPES_STRING]);

	// add '\0' to string
	char* test = (char*)calloc(1, tmp.length + 1);
	memcpy(test, tmp.data, tmp.length);
	UA_String_clear(&tmp);

	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World2' was called");
	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "    Data: %s", test);
	free(test);
	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "'Hello World2' has ended");

	return UA_STATUSCODE_GOOD;
}

static void addHellWorld2Method(UA_Server *server)
{
	UA_Argument inputArgument;
	UA_Argument_init(&inputArgument);
	inputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
	inputArgument.name = UA_STRING("MyInput");
	inputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
	inputArgument.valueRank = UA_VALUERANK_SCALAR;

	UA_Argument outputArgument;
	UA_Argument_init(&outputArgument);
	outputArgument.description = UA_LOCALIZEDTEXT("en-US", "A String");
	outputArgument.name = UA_STRING("MyOutput");
	outputArgument.dataType = UA_TYPES[UA_TYPES_STRING].typeId;
	outputArgument.valueRank = UA_VALUERANK_SCALAR;

	UA_MethodAttributes helloAttr = UA_MethodAttributes_default;
	helloAttr.description = UA_LOCALIZEDTEXT("en-US", "Say `Hello World2`");
	helloAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Hello World2");
	helloAttr.executable = true;
	helloAttr.userExecutable = true;
	UA_NodeId id = UA_NODEID_NUMERIC(1, 62542);
	UA_Server_addMethodNode(server, id,
		UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
		UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
		UA_QUALIFIEDNAME(1, "hello world2"),
		helloAttr, &helloWorld2MethodCallback,
		1, &inputArgument, 1, &outputArgument, NULL, NULL);

}


/* This callback will be called when a new entry is added to the Callrequest queue */
static void TestCallback(UA_Server *server)
{
	UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Dispatched an async method");
}

int main(void)
{
	signal(SIGINT, stopHandler);
	signal(SIGTERM, stopHandler);

	globalServer = UA_Server_new();
	UA_ServerConfig *config = UA_Server_getConfig(globalServer);
	UA_ServerConfig_setDefault(config);

	/* Set the NotifyCallback */
	config->asyncOperationNotifyCallback = TestCallback;


	/* Add methods */
	addHellWorld1Method(globalServer);
	addHellWorld2Method(globalServer);

	UA_StatusCode retval = UA_Server_run(globalServer, &running);


	UA_Server_delete(globalServer);
	return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}

代码比较简单,就是添加一个方法然后加一个回调,功能就是输入一个字符串"XXX",然后输出"Hello XXX"。

Hello World1方法的id是UA_NODEID_NUMERIC(1, 62541),Hello World2方法的id是UA_NODEID_NUMERIC(1, 62542)。可以先用UaExpert进行验证。


三 Client异步执行方法

open62541提供一个API用于异步执行,即__UA_Client_AsyncService(),其原型如下,

UA_StatusCode
__UA_Client_AsyncService(UA_Client *client, const void *request,
                         const UA_DataType *requestType,
                         UA_ClientAsyncServiceCallback callback,
                         const UA_DataType *responseType,
                         void *userdata, UA_UInt32 *requestId)

这是个低级接口,参数还是比较多的。

这里定义一个函数UA_Client_call_asyncMulti()来发起一个请求,该请求同时调用2个方法,如下,

static UA_StatusCode UA_Client_call_asyncMulti(UA_Client *client,
    const UA_NodeId objectId1, const UA_NodeId methodId1, size_t inputSize1, const UA_Variant *input1,
	const UA_NodeId objectId2, const UA_NodeId methodId2, size_t inputSize2, const UA_Variant *input2,
    UA_ClientAsyncServiceCallback callback, void *userdata, UA_UInt32 *reqId) 
{
    UA_CallRequest request;
    UA_CallRequest_init(&request);
    UA_CallMethodRequest item[2];

    UA_CallMethodRequest_init(&item[0]);
    item[0].methodId = methodId1;
    item[0].objectId = objectId1;
    item[0].inputArguments = (UA_Variant *)(void*)(uintptr_t)input1; // cast const...
    item[0].inputArgumentsSize = inputSize1;

	UA_CallMethodRequest_init(&item[1]);
	item[1].methodId = methodId2;
	item[1].objectId = objectId2;
	item[1].inputArguments = (UA_Variant *)(void*)(uintptr_t)input2; // cast const...
	item[1].inputArgumentsSize = inputSize2;


    request.methodsToCall = &item[0];
    request.methodsToCallSize = 2; // 请求执行2个方法

    return __UA_Client_AsyncService(client, &request,
        &UA_TYPES[UA_TYPES_CALLREQUEST], callback,
        &UA_TYPES[UA_TYPES_CALLRESPONSE], userdata, reqId);
}

该函数就是把方法的id和方法需要的参数放到请求里传给Server,注意,代码中的objectId 是方法的上一级node的id。

然后定义一个函数InitCallMulti()来调用UA_Client_call_asyncMulti(),如下,

static void InitCallMulti(UA_Client* client) 
{
    UA_UInt32 reqId = 0;
    
    // 给Hello World1方法初始化输入参数
    UA_Variant input1;
    UA_Variant_init(&input1);
    UA_String stringValue1 = UA_String_fromChars("boys (multi)");
    UA_Variant_setScalar(&input1, &stringValue1, &UA_TYPES[UA_TYPES_STRING]);

    // 给Hello World2方法初始化输入参数
	UA_Variant input2;
	UA_Variant_init(&input2);
	UA_String stringValue2 = UA_String_fromChars("girls (multi)");
	UA_Variant_setScalar(&input2, &stringValue2, &UA_TYPES[UA_TYPES_STRING]);

    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** Initiating CallRequest");

	UA_Client_call_asyncMulti(client,
		UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
		UA_NODEID_NUMERIC(1, 62541), 1, &input1,
        UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
        UA_NODEID_NUMERIC(1, 62542), 1, &input2,
        (UA_ClientAsyncServiceCallback)methodCalled, NULL, &reqId);

    UA_String_clear(&stringValue1);
	UA_String_clear(&stringValue2);
}

该函数会调用Server端的Hello World1方法和Hello World2方法,给Hello World1传递的参数是"boys (multi)",给Hello World2传递的参数是"girls (multi)"。

这里有个很重要的点要弄清楚,既然是异步执行,那么如何拿到方法执行的返回值呢?因为同步执行是在那死等返回值,那么异步执行怎么办呢?答案就是回调,代码中使用了一个回调函数,名叫methodCalled(),其定义如下,

static void methodCalled(UA_Client *client, void *userdata, UA_UInt32 requestId, UA_CallResponse *response) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
        "**** CallRequest Response - Req:%u with %u results",
        requestId, (UA_UInt32)response->resultsSize);

    UA_UInt32 i;
    UA_StatusCode retval = response->responseHeader.serviceResult;
    if (retval == UA_STATUSCODE_GOOD) 
    {
        for (i = 0; i < response->resultsSize; i++) 
        {
            if (response->resultsSize >= i)
                retval = response->results[i].statusCode;
            else
                retval = UA_STATUSCODE_BADUNEXPECTEDERROR;

            if (retval != UA_STATUSCODE_GOOD) 
            {
                UA_CallResponse_clear(response);
                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                    "**** CallRequest Response - Req: %u (%u) failed", requestId, i);
                
                if (i == response->resultsSize)
                    return;
                else
                    continue;
            }

            /* Move the output arguments */
            UA_Variant *output = response->results[i].outputArguments;
            size_t outputSize = response->results[i].outputArgumentsSize;
            response->results[i].outputArguments = NULL;
            response->results[i].outputArgumentsSize = 0;

            if (retval == UA_STATUSCODE_GOOD) 
            {
				UA_String *outputStr = (UA_String*)output->data;

				UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, 
					"Method call was successful, returned %lu values: %.*s.\n",
					(unsigned long)outputSize, outputStr->length, outputStr->data);

                UA_Array_delete(output, outputSize, &UA_TYPES[UA_TYPES_VARIANT]);
            }
            else 
            {
				UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, 
					"Method call was unsuccessful, returned %x values.\n", retval);
            }
        }
    }
    else
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** CallRequest Response - Req:%u FAILED", requestId);
    }

    UA_CallResponse_clear(response);

}

代码也比较简单,就是把返回值个数和返回值打印出来,如果有错就打印错误信息。


四 Client整体代码及运行

代码来自example目录下的client_method_async.c,但是原始代码写的不好而且冗长,本人做了简化和调整,让整体结构变得简单,

#include "open62541.h"

#include <stdlib.h>
#include <signal.h>

UA_Boolean running = true;


static void methodCalled(UA_Client *client, void *userdata, UA_UInt32 requestId, UA_CallResponse *response) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
        "**** CallRequest Response - Req:%u with %u results",
        requestId, (UA_UInt32)response->resultsSize);

    UA_UInt32 i;
    UA_StatusCode retval = response->responseHeader.serviceResult;
    if (retval == UA_STATUSCODE_GOOD) 
    {
        for (i = 0; i < response->resultsSize; i++) 
        {
            if (response->resultsSize >= i)
                retval = response->results[i].statusCode;
            else
                retval = UA_STATUSCODE_BADUNEXPECTEDERROR;

            if (retval != UA_STATUSCODE_GOOD) 
            {
                UA_CallResponse_clear(response);
                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                    "**** CallRequest Response - Req: %u (%u) failed", requestId, i);
                
                if (i == response->resultsSize)
                    return;
                else
                    continue;
            }

            /* Move the output arguments */
            UA_Variant *output = response->results[i].outputArguments;
            size_t outputSize = response->results[i].outputArgumentsSize;
            response->results[i].outputArguments = NULL;
            response->results[i].outputArgumentsSize = 0;

            if (retval == UA_STATUSCODE_GOOD) 
            {
				UA_String *outputStr = (UA_String*)output->data;

				UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, 
					"Method call was successful, returned %lu values: %.*s.\n",
					(unsigned long)outputSize, outputStr->length, outputStr->data);

                UA_Array_delete(output, outputSize, &UA_TYPES[UA_TYPES_VARIANT]);
            }
            else 
            {
				UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, 
					"Method call was unsuccessful, returned %x values.\n", retval);
            }
        }
    }
    else
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** CallRequest Response - Req:%u FAILED", requestId);
    }

    UA_CallResponse_clear(response);

}


static UA_StatusCode UA_Client_call_asyncMulti(UA_Client *client,
    const UA_NodeId objectId1, const UA_NodeId methodId1, size_t inputSize1, const UA_Variant *input1,
	const UA_NodeId objectId2, const UA_NodeId methodId2, size_t inputSize2, const UA_Variant *input2,
    UA_ClientAsyncServiceCallback callback, void *userdata, UA_UInt32 *reqId) 
{
    UA_CallRequest request;
    UA_CallRequest_init(&request);
    UA_CallMethodRequest item[2];

    UA_CallMethodRequest_init(&item[0]);
    item[0].methodId = methodId1;
    item[0].objectId = objectId1;
    item[0].inputArguments = (UA_Variant *)(void*)(uintptr_t)input1; // cast const...
    item[0].inputArgumentsSize = inputSize1;

	UA_CallMethodRequest_init(&item[1]);
	item[1].methodId = methodId2;
	item[1].objectId = objectId2;
	item[1].inputArguments = (UA_Variant *)(void*)(uintptr_t)input2; // cast const...
	item[1].inputArgumentsSize = inputSize2;


    request.methodsToCall = &item[0];
    request.methodsToCallSize = 2;

    return __UA_Client_AsyncService(client, &request,
        &UA_TYPES[UA_TYPES_CALLREQUEST], callback,
        &UA_TYPES[UA_TYPES_CALLRESPONSE], userdata, reqId);
}


static void InitCallMulti(UA_Client* client) 
{
    UA_UInt32 reqId = 0;

    UA_Variant input1;
    UA_Variant_init(&input1);
    UA_String stringValue1 = UA_String_fromChars("boys (multi)");
    UA_Variant_setScalar(&input1, &stringValue1, &UA_TYPES[UA_TYPES_STRING]);

	UA_Variant input2;
	UA_Variant_init(&input2);
	UA_String stringValue2 = UA_String_fromChars("girls (multi)");
	UA_Variant_setScalar(&input2, &stringValue2, &UA_TYPES[UA_TYPES_STRING]);

    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "**** Initiating CallRequest");

	UA_Client_call_asyncMulti(client,
		UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
		UA_NODEID_NUMERIC(1, 62541), 1, &input1,
        UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
        UA_NODEID_NUMERIC(1, 62542), 1, &input2,
        (UA_ClientAsyncServiceCallback)methodCalled, NULL, &reqId);

    UA_String_clear(&stringValue1);
	UA_String_clear(&stringValue2);
}


static void stopHandler(int sign) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
    running = 0;
}

static void deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
        "Subscription Id %u was deleted", subscriptionId);
}

static void subscriptionInactivityCallback(UA_Client *client, UA_UInt32 subId, void *subContext) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}



static void cycleCallback(UA_Client *client, void *data)
{
	InitCallMulti(client);
}


int main(int argc, char *argv[]) 
{
    signal(SIGINT, stopHandler); /* catches ctrl-c */

    UA_Client *client = UA_Client_new();
    UA_ClientConfig *cc = UA_Client_getConfig(client);
    UA_ClientConfig_setDefault(cc);

    /* we use a high timeout because there may be other client and
    * processing may take long if many method calls are waiting */
    cc->timeout = 60000;

    cc->subscriptionInactivityCallback = subscriptionInactivityCallback;

    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
    if (retval != UA_STATUSCODE_GOOD) 
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
            "Not connected. Retrying to connect in 1 second");

        UA_Client_delete(client);
        return EXIT_SUCCESS;
    }

	UA_UInt64 callbackId = 0;
	UA_Client_addRepeatedCallback(client, cycleCallback, NULL, 2000, &callbackId); // call every 2s


    /* Endless loop runAsync */
    while (running) 
    {
        UA_Client_run_iterate(client, 100);
    }

    /* Clean up */
    UA_Client_disconnect(client);
    UA_Client_delete(client);
    
    return EXIT_SUCCESS;
}

代码里使用UA_Client_addRepeatedCallback()每隔2秒调用一次InitCallMulti()

编译OK后执行,在Client端每隔2s可以看到如下的打印,
在这里插入图片描述
打印信息表示返回2个结果(因为一次请求中调用2个方法),并把返回结果打印出来,和期望的一样。
Server这边也有对应的打印,
在这里插入图片描述


五 总结

本文主要讲述Client端如何异步执行方法,其实原理也比较简单,就是把方法id和方法需要的参数放到请求里,然后发给Server,等Server有回复了就会执行回调函数,在回调函数里Client就可以拿到方法调用的返回值。


版权声明:本文为whahu1989原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。