数据同步

更新时间:
2024-12-19

数据同步

本节内容介绍 VSOA 客户端重连后与服务端数据同步的开发方法。

开发须知

数据同步是指在数据发布与订阅场景中,当客户端因故障断开重连后,需要立即获取当前最新数据的需求。

如本章开发示例中的 axis_server 陀螺仪服务,若产生故障断开重连,则需要客户端上线后立即获取 /axis 的最新状态,以保证数据的一致性。通常情况下,客户端会在断开重连后,主动发起一次 RPC 请求以获取数据的最新状态,但如果需要获取的数据量较大,则会给代码编程带来更多的复杂性。此时,可以使用客户端机器人带有的自动数据同步接口进行处理。

常用接口

bool vsoa_client_auto_consistent(vsoa_client_auto_t *cliauto, char *const urls[], int url_cnt, unsigned int rpc_timeout);

说明:

发布与订阅客户端的详细接口说明可参考 VSOA C 语言编程手册。您也可以使用 VSOA 客户端机器人 接口完成主题订阅,关于机器人接口的更多内容可参见 VSOA C 扩展编程手册

开发示例

如本章开发示例中的 axis_server 陀螺仪服务,若客户端产生故障断开重连,则需要调用自动数据同步接口 vsoa_client_auto_consistent

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include "vsoa_cliauto.h"

/* My server password */
#define MY_SERVER_PASSWD "123456"

/* My client */
static vsoa_client_t *client;

/* My client auto */
static vsoa_client_auto_t *cliauto;

/* My subscribe (string pointer array) */
static char *sub_urls[] = { "/axis" };

static void onconnect (void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
    printf("On connect, connect: %s, info: %s\n", 
            (connect == true) ? "connected" : "disconnected", info);
}

/*
 * On subscribed messages received
 */
static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    printf("On message, URL: %.*s payload: %.*s\n",
           (int)url->url_len, url->url, (int)payload->param_len, payload->param);
}

/*
 * main function
 */
int main (int argc, char **argv)
{
#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /*
     * Create client auto robot
     */
    cliauto = vsoa_client_auto_create(onmessage, NULL);
    client  = vsoa_client_auto_handle(cliauto);

    if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
        return -1;
    }

    vsoa_client_auto_consistent(cliauto, sub_urls, 1, 1000);

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD,
                           (char * const)sub_urls, 1, 1000, 1000, 1000);

    while (true) {
        sleep(1);
    }
}

同时,服务端应当实现 /axis 的 RPC 接口,用于客户端获取当前陀螺仪数据。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"

#define MY_SERVER_PASSWD "123456"

static vsoa_server_t *server;

static int roll = 1, pitch = 1, yaw = 1;

/*
 * Publish axis thread
 */
static void *publish_axis_thread (void *arg)
{
    vsoa_url_t url;
    vsoa_payload_t payload;
    char param[100];

    url.url     = "/axis";
    url.url_len = strlen(url.url);

    payload.data = NULL;
    payload.data_len  = 0;
    payload.param = param;

    while (1) {
        sleep(1);

        if (!vsoa_server_is_subscribed(server, &url)) {
            continue;
        }

        payload.param_len = sprintf(param,
                                    "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                    roll++, pitch++, yaw++);

        vsoa_server_publish(server, &url, &payload);
    }

    return (NULL);
}

static void command_axis (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                          vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
                          vsoa_payload_t *payload)
{
    vsoa_payload_t send;
    char param[100];
    uint32_t  seqno = vsoa_parser_get_seqno(vsoa_hdr);

    sprintf(param, "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
            roll, pitch, yaw);

    send.data = NULL;
    send.data_len = 0;
    send.param = param;
    send.param_len = strlen(send.param);

    vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}

int main (int argc, char **argv)
{
    int cnt, max_fd;
    fd_set fds;
    pthread_t id;
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };

    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(3002);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    /*
    * Initialize server
    */
    server = vsoa_server_create("{\"name\":\"axis_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }

    /*
    * If need password
    */
    vsoa_server_passwd(server, MY_SERVER_PASSWD);

    /*
    * Add /axis listener
    */
    url.url     = "/axis";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_axis, NULL);

    /*
    * Start server
    */
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

    /*
    * Create publish thread
    */
    pthread_create(&id, NULL, publish_axis_thread, NULL);

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}

文档内容是否对您有所帮助?
有帮助
没帮助