服务端异步应答示例
本章节主要介绍使用中间件开发异步应答功能的方法。
开发须知
- 在 服务端开发 的基础上,Mware 提供了上下文引用计数增加和引用计数减少功能,可用于做异步 RPC 应答时的资源保持;
- 使用 Mware 做高并发场景下的 RPC 服务端时,需要使用链表等方式保存每一个调用的 resolve,代码实现可以参考 开发示例。
开发示例
在本示例中,服务端执行以下流程:
- 打开“/tmp/log_file”文件用于存储日志信息。
- 创建 VSOA Mware,绑定两个回调函数 `log_callback` 和 `msg_callback` 到 URL `/light` 上。
- 创建 VSOA 服务器 `light_server`。当创建成功时,服务端会打印 `Started VSOA stream server`。
- 创建 VSOA 客户端,连接服务端 `msg_server`。连接成功时,会打印 `On connect, connect...`。
- 监听 RPC URL `/light`,当 URL 被请求时,调用 `log_callback` 回调函数。
- 在 `log_callback` 中,记录 URL 到日志文件,再向下一个回调函数传递数据。数据键为 `log`,值为 payload 中的 param 字段。最后,返回 “true” 继续执行下一个回调。
- 在 `msg_callback` 中,客户端句柄使用 RPC 请求调用 `msg_server` 的 `/warningLampStatus` 接口,并将 resolve 加入到链表尾部,同时调用 `vsoa_mware_ref_resolve` 将 resolve 的引用计数增加。
- 当 `msg_server` 服务端产生应答进入回调时,从链表头部取出 resolve,并将 resolve 中 `log` 键对应的值取出来打印。调用 `vsoa_mware_reply_resolve` 函数对当前的 `/light` RPC 执行应答,同时调用 `vsoa_mware_unref_resolve` 将当前的 resolve 引用计数减少。
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_list.h"
#include "vsoa_platform.h"
#include "vsoa_server.h"
#include "vsoa_cliauto.h"
#include "vsoa_mware.h"
/* VSOA server instance; assigned in main() by vsoa_server_create() */
static vsoa_server_t *server = NULL;

/* Password handed to vsoa_client_auto_start() when connecting to the remote server */
#define MY_SERVER_PASSWD "123456"

/* Client handle obtained from the auto robot; used to issue the outgoing RPC */
static vsoa_client_t *client = NULL;

/* Client auto robot that owns the connection to the remote server */
static vsoa_client_auto_t *cliauto = NULL;
/*
 * One pending asynchronous RPC: a doubly linked node that carries the
 * middleware resolve of a request which has not been replied to yet.
 * The prev/next links are managed by the INSERT_TO_FIFO / DELETE_FROM_FIFO
 * macros used elsewhere in this file.
 */
struct mware_resolve_lst_s {
    struct mware_resolve_lst_s *prev;    /* previous node in the list */
    struct mware_resolve_lst_s *next;    /* next node in the list */
    vsoa_mware_resolve_t       *resolve; /* resolve kept for the deferred reply */
};

typedef struct mware_resolve_lst_s mware_resolve_lst_t;
/* Head of the pending-resolve FIFO (oldest request first) */
static mware_resolve_lst_t *resolve_list = NULL;

/* Tail of the pending-resolve FIFO (newest request last) */
static mware_resolve_lst_t *resolve_list_tail = NULL;

/* Guards resolve_list / resolve_list_tail; initialized in main() */
static pthread_mutex_t resolve_lock;
/*
 * Reply callback for the outgoing "/warningLampStatus" RPC.
 *
 * Takes the oldest pending node from the resolve FIFO, prints the value
 * stored under the "log" key of its resolve, answers the original request
 * through vsoa_mware_reply_resolve() and finally drops the reference and
 * frees the node.
 *
 * arg      - user argument of the RPC call (not used here).
 * client   - client the reply arrived on (not used here).
 * vsoa_hdr - reply header; NULL is treated as a reply timeout.
 * payload  - reply payload; only dereferenced when it is not NULL.
 */
static void on_lamp_status(void *arg, struct vsoa_client *client, vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    vsoa_payload_t send;
    mware_resolve_lst_t *node;
    char *msg;

    (void)arg;
    (void)client;

    if (!vsoa_hdr) {
        fprintf(stderr, "VSOA server /warningLampStatus reply timeout!\n");
    } else if (payload) {
        printf("On asynchronous RPC reply, payload: %.*s\n", (int)payload->param_len, payload->param);
    }

    /*
     * Forward the remote parameters as our own reply. On the timeout path
     * there may be no payload at all, so fall back to an empty one instead
     * of dereferencing a NULL pointer.
     */
    send.data = NULL;
    send.data_len = 0;
    if (payload) {
        send.param = payload->param;
        send.param_len = payload->param_len;
    } else {
        send.param = NULL;
        send.param_len = 0;
    }

    /* Pop the oldest pending request; the list is shared with msg_callback */
    pthread_mutex_lock(&resolve_lock);
    node = resolve_list;
    if (node == NULL) {
        pthread_mutex_unlock(&resolve_lock);
        fprintf(stderr, "on_lamp_status: unknown error, resolve_list is null\n");
        return;
    }
    DELETE_FROM_FIFO(node, resolve_list, resolve_list_tail);
    pthread_mutex_unlock(&resolve_lock);

    /* Get the message data stored by the previous middleware callback */
    msg = vsoa_mware_get_resolve_data(node->resolve, "log");
    if (msg) {
        printf("Log resolved data (key: \"log\") : %s\n", msg);
    }

    /*
     * NOTE(review): status 0 is sent even when the remote call timed out;
     * confirm whether an error status should be reported in that case.
     */
    vsoa_mware_reply_resolve(node->resolve, 0, 0, &send);

    /* Release the reference that kept the resolve alive for this reply */
    vsoa_mware_unref_resolve(node->resolve);
    free(node);
}
/*
 * Connection state callback of the client auto robot: prints whether the
 * robot is now connected or disconnected together with the info string.
 */
static void onconnect(void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
    const char *state = connect ? "connected" : "disconnected";

    printf("On connect, connect: %s, info: %s\n", state, info);
}
/*
 * First middleware callback for "/light".
 *
 * Writes the requested URL to the log file passed as `arg`, then stores a
 * NUL-terminated copy of the request parameters in the resolve under the
 * key "log" (released with free()) so that later callbacks can read it.
 *
 * Returns true to continue with the next callback, or false when the copy
 * cannot be allocated.
 */
static bool log_callback(void *arg, vsoa_server_t *server, vsoa_cli_id_t id, vsoa_header_t *vsoa_hdr,
                         vsoa_url_t *url, vsoa_payload_t *payload, vsoa_mware_resolve_t *resolve)
{
    FILE *log_file = (FILE *)arg;
    char *msg;

    if (url->url_len) {
        /* Log the URL into the file */
        fprintf(log_file, "URL: %.*s\n", (int)url->url_len, url->url);
        fflush(log_file);

        /* One extra zeroed byte keeps the copy NUL-terminated */
        msg = calloc(payload->param_len + 1, 1);
        if (msg == NULL) {
            fprintf(stderr, "log_callback: alloc msg failed, errno is %d\n", errno);
            return false;
        }

        /* memcpy() with a NULL source is undefined even for length 0 */
        if (payload->param && payload->param_len) {
            memcpy(msg, payload->param, payload->param_len);
        }

        /* Pass the message data to the next handler */
        vsoa_mware_add_resolve_data(resolve, "log", msg, (vsoa_mware_resolve_free_func_t)free);
    }

    /* Return true to continue processing the next callback */
    return true;
}
/*
 * Second middleware callback for "/light".
 *
 * Takes a reference on the resolve, queues it at the tail of the pending
 * FIFO and issues an asynchronous GET on "/warningLampStatus"; the reply
 * to the original request is sent later from on_lamp_status().
 *
 * Always returns false: either the reply is deferred, or the request could
 * not be forwarded and every resource taken here has been released again.
 */
static bool msg_callback(void *arg, vsoa_server_t *server, vsoa_cli_id_t id, vsoa_header_t *vsoa_hdr,
                         vsoa_url_t *url, vsoa_payload_t *payload, vsoa_mware_resolve_t *resolve)
{
    vsoa_url_t callurl;
    mware_resolve_lst_t *node;

    (void)arg;
    (void)server;
    (void)id;
    (void)vsoa_hdr;
    (void)url;
    (void)payload;

    callurl.url = "/warningLampStatus";
    callurl.url_len = strlen(callurl.url);

    node = malloc(sizeof(*node));
    if (node == NULL) {
        fprintf(stderr, "msg_callback: alloc mware_resolve_lst_t failed, errno is %d\n", errno);
        return false;
    }
    node->resolve = resolve;

    /*
     * Take the reference before the node becomes visible to the reply
     * callback, so the resolve cannot be released underneath it.
     */
    vsoa_mware_ref_resolve(resolve);

    pthread_mutex_lock(&resolve_lock);
    INSERT_TO_FIFO(node, resolve_list, resolve_list_tail);
    pthread_mutex_unlock(&resolve_lock);

    if (!vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET, &callurl, NULL, on_lamp_status, NULL, NULL)) {
        fprintf(stderr, "msg_callback: call /warningLampStatus failed\n");

        /* No reply callback will run for this request: undo the queuing */
        pthread_mutex_lock(&resolve_lock);
        DELETE_FROM_FIFO(node, resolve_list, resolve_list_tail);
        pthread_mutex_unlock(&resolve_lock);

        vsoa_mware_unref_resolve(resolve);
        free(node);
        return false;
    }

    /* Stop the callback chain; the reply is sent asynchronously */
    return false;
}
/*
 * Example entry point.
 *
 * Opens the log file, creates the "light_server" VSOA server and a client
 * auto robot connected to "vsoa://msg_server", binds log_callback and
 * msg_callback to "/light" through a middleware object, starts the server
 * on port 3001 and then runs the server event loop forever.
 *
 * Returns -1 when any setup step fails; resources acquired up to that
 * point are released before returning.
 */
int main(int argc, char *argv[])
{
    vsoa_mware_t *mware;
    vsoa_url_t url;
    uint16_t server_port = 3001;
    struct sockaddr_in addr;
    fd_set fds;
    int max_fd;
    int count;
    int saved_errno;
    struct timespec timeout;
    FILE *log_file;

    (void)argc;
    (void)argv;

    pthread_mutex_init(&resolve_lock, NULL);
    resolve_list = NULL;
    resolve_list_tail = NULL;

    log_file = fopen("/tmp/log_file", "a");
    if (!log_file) {
        fprintf(stderr, "Can not open log_file!\n");
        goto err_lock;
    }

    /* Set server listen address and port */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    /* Create VSOA server */
    server = vsoa_server_create("{\"name\":\"light_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        goto err_file;
    }

    /* Create VSOA client auto robot */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Can not create VSOA client auto robot!\n");
        goto err_server;
    }
    client = vsoa_client_auto_handle(cliauto);

    if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
        /* Capture errno before any cleanup call can overwrite it */
        saved_errno = errno;
        fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(saved_errno), saved_errno);
        goto err_cliauto;
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://msg_server", MY_SERVER_PASSWD,
                                NULL, 0, 1000, 1000, 1000)) {
        fprintf(stderr, "Cannot start VSOA client auto robot!\n");
        goto err_cliauto;
    }

    /* Create VSOA middleware */
    mware = vsoa_mware_create();
    if (!mware) {
        fprintf(stderr, "Can not create VSOA mware!\n");
        /* The robot is already running; as on the server-start failure path it is left to process exit */
        goto err_server;
    }

    /* Add /light listener */
    url.url = "/light";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, vsoa_mware_handler(mware), mware);

    /*
     * Add mware rpc hooks:
     * 'log_callback' will be called first, if 'log_callback' returns 'true'
     * then 'msg_callback' will be called
     */
    vsoa_mware_add_listener(mware, log_callback, log_file);
    vsoa_mware_add_listener(mware, msg_callback, NULL);

    /* Start VSOA server */
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        fprintf(stderr, "Cannot start VSOA stream server.\n");
        goto err_server;
    }

    printf("Started VSOA stream server.\n");

    /* Set timeout to 1s */
    timeout.tv_sec = 1;
    timeout.tv_nsec = 0;

    /* Server event loop */
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = vsoa_select(max_fd + 1, &fds, NULL, NULL, &timeout);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return 0;

err_cliauto:
    vsoa_client_auto_delete(cliauto);
    cliauto = NULL;
    client = NULL;
err_server:
    vsoa_server_close(server);
    server = NULL;
err_file:
    fclose(log_file);
err_lock:
    pthread_mutex_destroy(&resolve_lock);
    return -1;
}
说明:
示例中客户端机器人连接的服务端 `msg_server` 可参考 RPC 服务端范例开发。