VSOA Broker

更新时间:
2024-12-19

VSOA Broker

本节主要介绍 VSOA Broker 的实现方法和使用示例。

了解 VSOA Broker

VSOA Broker 是一种采用数据中心的发布-订阅模型,负责在发布者和订阅者之间传输数据,并对数据进行缓存和路由,可以广泛应用于各种分布式系统中。

通信模型

主要特点

集中处理: VSOA Broker 能够集中处理所有的订阅请求,而无需关心发布者的具体地址。当发布者的地址发生变化时,订阅者不受到影响,实现通信双方的解耦。

数据传输与缓存: VSOA Broker 在订阅者和发布者之间起到桥梁的作用,负责数据的传输。同时,还可以对数据进行缓存,以便在需要时快速提供数据,提高系统的响应速度。

数据路由: VSOA Broker 能够根据数据的资源标识 (URL) 将数据路由到相应的订阅者,确保数据能够准确地被需要的订阅者接收。

程序遗言: VSOA Broker 支持程序遗言的功能,当发布者因为某种原因(如资源不足、严重错误、用户请求等)即将退出时,可以向 VSOA Broker 设置遗言。VSOA Broker 把程序遗言发布给订阅者,帮助订阅者了解发布程序退出原因。

VSOA Broker 的应用场景非常广泛,尤其适用于物联网、航空航天等分布式系统。可以用于连接智能设备和各种传感器,实现数据的实时采集和处理。

开发常用接口

vsoa_server_t *vsoa_server_create(const char *info_json);
void vsoa_server_on_cli(vsoa_server_t *server, vsoa_server_cli_func_t oncli, void *arg);
void vsoa_server_on_datagram(vsoa_server_t *server, vsoa_server_dat_func_t callback, void *arg);
bool vsoa_client_datagram(vsoa_client_t *client, const vsoa_url_t *url, const vsoa_payload_t *payload);
bool vsoa_server_add_listener(vsoa_server_t *server, const vsoa_url_t *url, vsoa_server_cmd_func_t callback, void *arg);
bool vsoa_client_call(vsoa_client_t *client, int method, const vsoa_url_t *url, const vsoa_payload_t *payload, vsoa_client_rpc_func_t callback, void *arg, const struct timespec *timeout);
bool vsoa_client_multi_subscribe(vsoa_client_t *client, char *const urls[], int cnt, vsoa_client_res_func_t callback, void *arg, const struct timespec *timeout);
bool vsoa_server_publish(vsoa_server_t *server, const vsoa_url_t *url, const vsoa_payload_t *payload);

说明:

详细接口说明可参考以下手册:

开发示例

  1. 数据发布者连接 broker,并通过 RPC Set 设置 lastwords 。创建数据采集和发送线程,通过 vsoa_client_datagram 接口发送采集的数据。
  2. broker 保存 lastwords 数据,当数据发布者断开连接时,向 lastwords 的订阅者发布 lastwords 数据。broker 同时接收数据发布者发送的数据,并向订阅者发布。
  3. 数据订阅者连接 broker,并订阅 /lastwords/axis

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <getopt.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "vsoa_list.h"
#include "vsoa_server.h"

/* Broker version */
#define BROKER_VERSION "0.0.1"
#define BROKER_DEFAULT_PORT    3006

/* Broker RPC define */
#define BROKER_RPC_IDX_LASTWORDS        0
#define BROKER_RPC_URL_LASTWORDS        "/lastwords"
#define BROKER_RPC_IDX_MAX              1
#define VSOA_URL_SET(_url, _url_str, _url_len) \
        {(_url)->url = _url_str; (_url)->url_len = _url_len;}

#define VSOA_PAYLOAD_SET(p1, _param, _param_len, _data, _data_len) \
        {(p1)->param = (_param); (p1)->param_len = (_param_len); \
         (p1)->data = (_data);   (p1)->data_len = (_data_len); }

/* Client last words */
typedef struct {
    vsoa_url_t url;
    vsoa_payload_t payload;
} broker_cli_lastword_t;

/* This broker client */
typedef struct broker_cli {
    struct broker_cli *next;
    struct broker_cli *prev;
    vsoa_cli_id_t cid;
    broker_cli_lastword_t lastwords;
} broker_cli_t;

/* All clients with a last words */
static broker_cli_t *broker_clis;

/* This broker server */
static vsoa_server_t *broker;

/* Broker RPC URLs */
static vsoa_url_t broker_rpc_urls[BROKER_RPC_IDX_MAX];

/*
 * Last words clean
 */
static void broker_lastwords_clean (broker_cli_lastword_t *lw)
{
    if (lw->url.url) {
        free(lw->url.url);
    }
    VSOA_URL_SET(&lw->url, NULL, 0);

    if (lw->payload.param) {
        free(lw->payload.param);
    }
    if (lw->payload.data) {
        free(lw->payload.data);
    }
    VSOA_PAYLOAD_SET(&lw->payload, NULL, 0, NULL, 0);
}

/*
 * Last words copy
 */
static bool broker_lastwords_copy (broker_cli_lastword_t *lw,
                                   const vsoa_url_t *url,
                                   const vsoa_payload_t *payload)
{
    VSOA_URL_SET(&lw->url, NULL, 0);
    VSOA_PAYLOAD_SET(&lw->payload, NULL, 0, NULL, 0);

    if (url && url->url_len) {
        lw->url.url = (char *)malloc(url->url_len);
        if (!lw->url.url) {
            goto    nomem;
        }

        memcpy(lw->url.url, url->url, url->url_len);
        lw->url.url_len = url->url_len;
    }

    if (payload) {
        if (payload->param_len) {
            lw->payload.param = (char *)malloc(payload->param_len);
            if (!lw->payload.param) {
                goto    nomem;
            }
            memcpy(lw->payload.param, payload->param, payload->param_len);
            lw->payload.param_len = payload->param_len;
        }

        if (payload->data_len) {
            lw->payload.data = (char *)malloc(payload->data_len);
            if (!lw->payload.data) {
                goto    nomem;
            }
            memcpy(lw->payload.data, payload->data, payload->data_len);
            lw->payload.data_len = payload->data_len;
        }
    }

    return  (true);

nomem:
    broker_lastwords_clean(lw);

    return  (false);
}

/*
 * Client find
 */
static broker_cli_t *broker_cli_find (vsoa_cli_id_t cid)
{
    broker_cli_t *cli;

    FOREACH_FROM_LIST (cli, broker_clis) {
        if (cli->cid == cid) {
            break;
        }
    }

    return  (cli);
}

/*
 * Create client
 */
static broker_cli_t *broker_cli_create (vsoa_cli_id_t cid)
{
    broker_cli_t *cli;

    cli = (broker_cli_t *)malloc(sizeof(broker_cli_t));
    if (cli) {
        bzero(cli, sizeof(broker_cli_t));
        cli->cid = cid;
        INSERT_TO_HEADER(cli, broker_clis);
    }

    return  (cli);
}

/*
 * Delete client
 */
static void broker_cli_delete (broker_cli_t *cli)
{
    DELETE_FROM_LIST(cli, broker_clis);
    broker_lastwords_clean(&cli->lastwords);
    free(cli);
}

/*
 * Client RPC set last words
 */
static void broker_rpc_lastwords (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                                  vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    broker_cli_t *cli;
    broker_cli_lastword_t lw;

    if (!broker_lastwords_copy(&lw, url, payload)) {
        vsoa_server_cli_reply(broker, cid, VSOA_STATUS_NO_MEMORY,
                              vsoa_parser_get_seqno(vsoa_hdr), 0, payload);
        return;
    }

    cli = broker_cli_find(cid);
    if (cli) {
        broker_lastwords_clean(&cli->lastwords);
    } else {
        cli = broker_cli_create(cid);
        if (!cli) {
            broker_lastwords_clean(&lw);
            vsoa_server_cli_reply(broker, cid, VSOA_STATUS_NO_MEMORY,
                                  vsoa_parser_get_seqno(vsoa_hdr), 0, payload);
            return;
        }
    }

    cli->lastwords = lw;

    vsoa_server_cli_reply(broker, cid, 0, vsoa_parser_get_seqno(vsoa_hdr), 0, NULL);
}

/*
 * Client connect and disconnect
 */
static void broker_on_cli (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid, bool connect)
{
    broker_cli_t *cli;
    broker_cli_lastword_t *lw;
    if (connect) {
        return;
    }

    cli = broker_cli_find(cid);
    if (cli) {
        lw = &cli->lastwords;
        if (lw->url.url) {
            vsoa_server_publish(broker, &lw->url, &lw->payload);
        }
        broker_cli_delete(cli);
    }
}

/*
 * Client datagram
 */
static void broker_on_datagram (void *arg, vsoa_server_t *server, vsoa_cli_id_t id,
                                vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    if (quick) {
        vsoa_server_quick_publish(broker, url, payload);
    } else {
        vsoa_server_publish(broker, url, payload);
    }
}

/*
 * broker usage.
 */
static void broker_usage (void)
{
    printf("USAGE: broker -p port   Set port\n"
           "              -P passwd Set password\n"
           "              -v        Show version\n"
           "              -h        Show this message\n");
}

/*
 * broker main
 */
int main (int argc, char **argv)
{
    int ch, cnt, max_fd = 0;
    int port = BROKER_DEFAULT_PORT;
    fd_set fds;
    char *passwd = NULL;
    char opt[] = "vhp:P:";
    struct sockaddr_in addr;

    while ((ch = getopt(argc, argv, opt)) != -1) {
        switch (ch) {

        case 'p':
            port = atoi(optarg);
            break;

        case 'P':
            passwd = strdup(optarg);
            break;

        case 'h':
            broker_usage();
            return  (0);

        case 'v':
            printf("%s\n", BROKER_VERSION);
            return  (0);

        default:
            fprintf(stderr, "Unknown argument: %c\n", (char)ch);
            break;
        }
    }

    if (port < 1 || port > 65535) {
        fprintf(stderr, "`port` invalid!\n");
        return  (-1);
    }

    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons((unsigned short)port);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    broker = vsoa_server_create("{\"name\":\"VSOA broker\"}");
    if (!broker) {
        fprintf(stderr, "Can not create VSOA broker!\n");
        return  (-1);
    }

    if (passwd) {
        vsoa_server_passwd(broker, passwd);
    }

    vsoa_server_on_cli(broker, broker_on_cli, NULL);
    vsoa_server_on_datagram(broker, broker_on_datagram, NULL);

    broker_rpc_urls[BROKER_RPC_IDX_LASTWORDS].url     = BROKER_RPC_URL_LASTWORDS;
    broker_rpc_urls[BROKER_RPC_IDX_LASTWORDS].url_len = strlen(BROKER_RPC_URL_LASTWORDS);
    vsoa_server_add_listener(broker, &broker_rpc_urls[BROKER_RPC_IDX_LASTWORDS],
                             broker_rpc_lastwords, NULL);

    if (!vsoa_server_start(broker,
                           (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(broker);
        fprintf(stderr, "Can not start VSOA broker!\n");
        return  (-1);
    }

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(broker, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, NULL, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(broker, &fds);
        }
    }

    return  (0);
}
/*
 * end
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "vsoa_client.h"
#include "vsoa_position.h"

#define MY_SERVER_PASSWD      "123456"
#define AXIS_SER_BUF_LEN       100
#define BROKER_DEFAULT_PORT    3006

static void *datagram_send(void *arg)
{
    vsoa_url_t url;
    int roll, pitch,yaw;
    vsoa_payload_t payload;
    char param[AXIS_SER_BUF_LEN + 1];
    vsoa_client_t *cli = (vsoa_client_t *)arg;

    url.url     = "/axis";
    url.url_len = strlen(url.url);

    payload.data = NULL;
    payload.data_len  = 0;
    payload.param = param;

    roll  = 1;
    pitch = 1;
    yaw   = 1;

    while(1) {
        sleep(1);
        payload.param_len = snprintf(param, AXIS_SER_BUF_LEN,
                                            "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                            roll++, pitch++, yaw++);
        vsoa_client_datagram(cli, &url, &payload);
    }
    return NULL;
}
int main (int argc, char **argv)
{
    vsoa_url_t  url;
    vsoa_payload_t payload;
    int max_fd, cnt;
    fd_set fds;
    char info[256];
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };
    pthread_t tid;
    unsigned short port = BROKER_DEFAULT_PORT;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    vsoa_client_t *client = vsoa_client_create(NULL, NULL);
    if (!client) {
        fprintf(stderr, "Can not create VSOA client!\n");
        return  (-1);
    }


    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;

    /*
    * Connect to server with password
    */
    if (!vsoa_client_connect(client, (struct sockaddr *)&addr, sizeof(struct sockaddr_in),
                             &timeout, MY_SERVER_PASSWD, info, sizeof(info))) {
        vsoa_client_close(client);
        fprintf(stderr, "Can not connect to VSOA server!\n");
        return  (-1);
    }

    url.url     = "/lastwords";
    url.url_len = strlen(url.url);
    payload.data = NULL;
    payload.data_len = 0;
    payload.param = "{\"lastwords\":\"Bye Bye\"}";
    payload.param_len = strlen(payload.param);

    vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_SET,
                    &url, &payload, NULL, NULL, &timeout);

   pthread_create(&tid, NULL, datagram_send, (void *)client);

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_client_fds(client, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            if (!vsoa_client_input_fds(client, &fds)) {
                vsoa_client_close(client);
                fprintf(stderr, "Connection lost!\n");
                return  (-1);
            }
        }
    }

    return  (0);
}

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "vsoa_client.h"
#include "vsoa_position.h"

#define MY_SERVER_PASSWD      "123456"
#define AXIS_SER_BUF_LEN       100
#define BROKER_DEFAULT_PORT    3006

static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    printf("On message, URL: %.*s payload: %.*s\n",
           (int)url->url_len, url->url, (int)payload->param_len, payload->param);
}

int main (int argc, char **argv)
{
    int max_fd, cnt;
    fd_set fds;
    char info[256];
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };
    unsigned short port = BROKER_DEFAULT_PORT;
    char *topics[] = {
        "/lastwords", "/axis"
    };


#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    vsoa_client_t *client = vsoa_client_create(onmessage, NULL);
    if (!client) {
        fprintf(stderr, "Can not create VSOA client!\n");
        return  (-1);
    }


    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;
    /*
    * Connect to server with password
    */
    if (!vsoa_client_connect(client, (struct sockaddr *)&addr, sizeof(struct sockaddr_in),
                             &timeout, MY_SERVER_PASSWD, info, sizeof(info))) {
        vsoa_client_close(client);
        fprintf(stderr, "Can not connect to VSOA server!\n");
        return  (-1);
    }


    vsoa_client_multi_subscribe(client, topics, sizeof(topics) / sizeof(char *), NULL, NULL, NULL);

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_client_fds(client, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            if (!vsoa_client_input_fds(client, &fds)) {
                vsoa_client_close(client);
                fprintf(stderr, "Connection lost!\n");
                return  (-1);
            }
        }
    }

    return  (0);
}

运行结果

本示例三个程序运行在同一个设备上,先启动发布端程序,再关闭发布端程序,同时在订阅端查看订阅的结果。

文档内容是否对您有所帮助?
有帮助
没帮助