服务端异步应答示例

更新时间:
2024-12-19

服务端异步应答示例

本章节主要介绍使用中间件开发异步应答功能的方法。

开发须知

  • 在“服务端开发”的基础上,Mware 提供了上下文引用计数增加和引用计数减少功能,可用于在异步 RPC 应答期间保持资源不被释放;
  • 使用 Mware 开发高并发场景下的 RPC 服务端时,需要使用链表等方式保存每一次调用的 resolve,代码实现可以参考下文的“开发示例”。

开发示例

在本示例中,服务端执行以下流程:

  1. 打开“/tmp/log_file”文件用于存储日志信息。
  2. 创建 VSOA Mware,绑定两个回调函数 log_callback 和 msg_callback 到 URL /light 上。
  3. 创建并启动 VSOA 服务器 light_server。启动成功时,服务端会打印“Started VSOA stream server.”。
  4. 创建 VSOA 客户端,连接服务端 msg_server。连接成功时,会打印“On connect, connect: connected, info: ...”。
  5. 监听 RPC URL /light,当 URL 被请求时,调用 log_callback 回调函数。
  6. 在 log_callback 中,记录 URL 到日志文件,再向下一个回调函数传递数据。数据键为 log,值为 payload 中的 param 字段。最后,返回 true,继续执行下一个回调。
  7. 在 msg_callback 中,通过客户端句柄以 RPC 请求调用 msg_server 的 /warningLampStatus 接口,并将 resolve 加入到链表尾部,同时调用 vsoa_mware_ref_resolve 将 resolve 的引用计数增加。
  8. 当 msg_server 服务端的应答进入回调时,从链表头部取出 resolve,并将 resolve 中 log 键对应的值取出来打印。然后调用 vsoa_mware_reply_resolve 函数对当前的 /light RPC 执行应答,同时调用 vsoa_mware_unref_resolve 将当前 resolve 的引用计数减少。
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "vsoa_list.h"
#include "vsoa_platform.h"
#include "vsoa_server.h"
#include "vsoa_cliauto.h"
#include "vsoa_mware.h"

/* VSOA server instance; created in main() and serviced by its event loop. */
static vsoa_server_t *server;
/* Password handed to vsoa_client_auto_start() when connecting to msg_server. */
#define MY_SERVER_PASSWD "123456"
/* Client handle obtained from cliauto; msg_callback() issues its RPC through it. */
static vsoa_client_t *client;

/* Client auto robot created and started in main(). */
static vsoa_client_auto_t *cliauto;

/*
 * One pending asynchronous RPC: a list node that keeps the mware resolve
 * alive until the client RPC reply arrives in on_lamp_status().
 */
typedef struct mware_resolve_lst_s {
    struct mware_resolve_lst_s *prev;       /* previous node in the list */
    struct mware_resolve_lst_s *next;       /* next node in the list */
    vsoa_mware_resolve_t       *resolve;    /* resolve saved by msg_callback() */
} mware_resolve_lst_t;

/*
 * Pending resolve FIFO: msg_callback() inserts with INSERT_TO_FIFO and
 * on_lamp_status() removes the node pointed to by resolve_list.
 */
static mware_resolve_lst_t     *resolve_list; 
static mware_resolve_lst_t     *resolve_list_tail;

/* Mutex guarding resolve_list / resolve_list_tail; initialized in main(). */
static pthread_mutex_t         resolve_lock;

/*
 * Reply callback of the asynchronous /warningLampStatus RPC issued by msg_callback().
 *
 * Removes the node at the head of the resolve FIFO, prints the data stored
 * under the "log" key, replies through the saved resolve, then drops one
 * reference on the resolve and frees the node.
 *
 * arg      : user argument of vsoa_client_call() (unused here).
 * client   : client the reply belongs to (unused here).
 * vsoa_hdr : reply header; NULL is reported as a reply timeout.
 * payload  : reply payload; its param is forwarded in the reply when present.
 */
static void on_lamp_status(void *arg, struct vsoa_client *client, vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    vsoa_payload_t       send;
    mware_resolve_lst_t *node;
    char                *msg;

    (void)arg;
    (void)client;

    if (!vsoa_hdr) {
        fprintf(stderr, "VSOA server /warningLampStatus reply timeout!\n");
    } else if (payload) {
        printf("On asynchronous RPC reply, payload: %.*s\n", (int)payload->param_len, payload->param);
    }

    /* Start from an empty reply so a missing payload is never dereferenced. */
    send.data = NULL;
    send.data_len = 0;
    send.param = NULL;
    send.param_len = 0;
    if (payload) {
        send.param = payload->param;
        send.param_len = payload->param_len;
    }

    /* Take the oldest pending call; the list is shared with msg_callback(). */
    pthread_mutex_lock(&resolve_lock);
    node = resolve_list;
    if (node == NULL) {
        fprintf(stderr, "on_lamp_status: unknown error, resolve_list is null\n");
        pthread_mutex_unlock(&resolve_lock);
        return;
    }
    DELETE_FROM_FIFO(node, resolve_list, resolve_list_tail);
    pthread_mutex_unlock(&resolve_lock);

    // get message data from previous callback
    msg = vsoa_mware_get_resolve_data(node->resolve, "log");
    if (msg) {
        printf("Log resolved data (key: \"log\") : %s\n", msg);
    }

    /*
     * NOTE(review): the two numeric arguments are kept at 0 exactly as before,
     * also on timeout; consider replying with an error status in that case.
     */
    vsoa_mware_reply_resolve(node->resolve, 0, 0, &send);

    /* Balance the vsoa_mware_ref_resolve() done in msg_callback(). */
    vsoa_mware_unref_resolve(node->resolve);

    free(node);
}

/*
 * Connection state callback registered with vsoa_client_auto_setup().
 *
 * Prints whether the client auto robot is connected, together with the info
 * string supplied by the caller.
 *
 * arg     : user argument given at registration (unused here).
 * cliauto : client auto robot reporting the event (unused here).
 * connect : true when connected, false when disconnected.
 * info    : text supplied with the event; may be NULL.
 */
static void onconnect(void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
    (void)arg;
    (void)cliauto;

    /* Passing NULL to "%s" is undefined behaviour, so substitute a placeholder. */
    printf("On connect, connect: %s, info: %s\n",
        (connect == true) ? "connected" : "disconnected", info ? info : "(null)");
}

/*
 * First mware listener for /light: logs the request URL and hands the request
 * parameters to the next listener.
 *
 * When the URL is not empty, the URL is appended to the log file passed in
 * arg, and a NUL-terminated copy of payload->param is attached to the resolve
 * under the key "log" with free() as its release function.
 *
 * arg     : FILE * of the log file (registered in main()).
 * url     : request URL.
 * payload : request payload; a missing payload or param yields an empty string.
 * resolve : mware resolve shared by the listeners of this request.
 *
 * Returns true to continue with the next listener, false when the copy of
 * the parameters cannot be allocated.
 */
static bool log_callback(void *arg, vsoa_server_t *server, vsoa_cli_id_t id, vsoa_header_t *vsoa_hdr,
    vsoa_url_t *url, vsoa_payload_t *payload, vsoa_mware_resolve_t *resolve)
{
    FILE   *log_file = (FILE *)arg;
    char   *msg;
    size_t  param_len;

    if (url->url_len) {
        // log url into file
        fprintf(log_file, "URL: %.*s\n", (int)url->url_len, url->url);
        fflush(log_file);

        /* A request without parameters must not reach memcpy() with a NULL source. */
        param_len = (payload && payload->param) ? payload->param_len : 0;

        // pass message data 'new resolved data' to next handler.
        msg = calloc(param_len + 1, 1);
        if (msg == NULL) {
            fprintf(stderr, "log_callback: alloc msg failed, errno is %d\n", errno);
            return false;
        }
        if (param_len) {
            memcpy(msg, payload->param, param_len);
        }
        vsoa_mware_add_resolve_data(resolve, "log", msg, (vsoa_mware_resolve_free_func_t)free);
    }

    // return true to continue processing next callback
    return true;
}

/*
 * Second mware listener for /light: starts the asynchronous /warningLampStatus
 * RPC and parks the resolve until on_lamp_status() replies.
 *
 * The resolve is referenced BEFORE it is published in the FIFO and before the
 * RPC is issued, because on_lamp_status() may run and unreference it as soon
 * as the call has been sent.
 *
 * resolve : mware resolve of the current /light request.
 *
 * Always returns false: no further listener is run, and the reply is produced
 * later by on_lamp_status().
 */
static bool msg_callback(void *arg, vsoa_server_t *server, vsoa_cli_id_t id, vsoa_header_t *vsoa_hdr,
    vsoa_url_t *url, vsoa_payload_t *payload, vsoa_mware_resolve_t *resolve)
{
    vsoa_url_t           callurl;
    mware_resolve_lst_t *node;

    callurl.url = "/warningLampStatus";
    callurl.url_len = strlen(callurl.url);

    node = malloc(sizeof(*node));
    if (node == NULL) {
        fprintf(stderr, "msg_callback: alloc mware_resolve_lst_t failed, errno is %d\n", errno);
        return false;
    }

    /* Hold the resolve for the pending reply before anyone else can see the node. */
    node->resolve = resolve;
    vsoa_mware_ref_resolve(resolve);

    pthread_mutex_lock(&resolve_lock);
    INSERT_TO_FIFO(node, resolve_list, resolve_list_tail);
    pthread_mutex_unlock(&resolve_lock);

    if (!vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET, &callurl, NULL, on_lamp_status, NULL, NULL)) {
        /*
         * The call was not sent, so on_lamp_status() is not expected to run
         * for it; undo the bookkeeping, otherwise the stale node would be
         * paired with the reply of a later call.
         * NOTE(review): the /light caller receives no reply on this path.
         */
        fprintf(stderr, "msg_callback: RPC call /warningLampStatus failed\n");

        pthread_mutex_lock(&resolve_lock);
        DELETE_FROM_FIFO(node, resolve_list, resolve_list_tail);
        pthread_mutex_unlock(&resolve_lock);

        vsoa_mware_unref_resolve(resolve);
        free(node);
    }

    return false;
}

/*
 * Program entry: sets up the log file, the VSOA server "light_server", the
 * client auto robot connected to "vsoa://msg_server" and the mware listeners
 * for /light, then runs the server event loop forever.
 *
 * Returns -1 when any setup step fails; the log file and the server that were
 * already created are released before returning.
 */
int main(int argc, char *argv[])
{
    vsoa_mware_t       *mware;
    vsoa_url_t          url;
    uint16_t            server_port = 3001;
    struct sockaddr_in  addr;

    fd_set              fds;
    int                 max_fd;
    int                 count;
    struct timespec     timeout;

    FILE               *log_file;

    (void)argc;
    (void)argv;

    pthread_mutex_init(&resolve_lock, NULL);
    resolve_list_tail = resolve_list;

    log_file = fopen("/tmp/log_file", "a");
    if (!log_file) {
        fprintf(stderr, "Can not open log_file!\n");
        return  (-1);
    }

    // set server listen address and port
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    // create vsoa server
    server = vsoa_server_create("{\"name\":\"light_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        goto err_close_log;
    }

    // create vsoa client
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Can not create VSOA client auto robot!\n");
        goto err_close_server;
    }
    client = vsoa_client_auto_handle(cliauto);

    if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
        /* Report errno before any further call has a chance to overwrite it. */
        fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
        vsoa_client_auto_delete(cliauto);
        goto err_close_server;
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    vsoa_client_auto_start(cliauto, "vsoa://msg_server", MY_SERVER_PASSWD,
        NULL, 0, 1000, 1000, 1000);

    // create vsoa mware
    mware = vsoa_mware_create();
    if (!mware) {
        fprintf(stderr, "Can not create VSOA mware!\n");
        goto err_close_server;
    }

    // add /light listener
    url.url = "/light";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, vsoa_mware_handler(mware), mware);

    // add mware rpc hook
    // 'log_callback' will be called first, if 'log_callback' returns 'true'
    // then 'msg_callback' will be called
    vsoa_mware_add_listener(mware, log_callback, log_file);
    vsoa_mware_add_listener(mware, msg_callback, NULL);

    // start vsoa server
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        printf("Cannot start VSOA stream server.\n");
        goto err_close_server;
    }

    printf("Started VSOA stream server.\n");

    // set timeout to 1s
    timeout.tv_sec = 1;
    timeout.tv_nsec = 0;

    // server event loop
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = vsoa_select(max_fd + 1, &fds, NULL, NULL, &timeout);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return 0;

    /*
     * Error exits. A client auto robot that was already started is left to
     * process teardown, as in the failure path of the original example.
     */
err_close_server:
    vsoa_server_close(server);
err_close_log:
    fclose(log_file);
    return -1;
}

说明
示例中客户端机器人连接的服务端 msg_server 可参考“RPC 服务端范例”进行开发。

文档内容是否对您有所帮助?
有帮助
没帮助