VSOA Broker
本节主要介绍 VSOA Broker 的实现方法和使用示例。
了解 VSOA Broker
VSOA Broker 是一种采用以数据为中心的发布-订阅模型的消息代理,负责在发布者和订阅者之间传输数据,并对数据进行缓存和路由,可以广泛应用于各种分布式系统中。
通信模型
主要特点
集中处理: VSOA Broker 能够集中处理所有的订阅请求,而无需关心发布者的具体地址。当发布者的地址发生变化时,订阅者不受到影响,实现通信双方的解耦。
数据传输与缓存: VSOA Broker 在订阅者和发布者之间起到桥梁的作用,负责数据的传输。同时,还可以对数据进行缓存,以便在需要时快速提供数据,提高系统的响应速度。
数据路由: VSOA Broker 能够根据数据的资源标识 (URL) 将数据路由到相应的订阅者,确保数据能够准确地被需要的订阅者接收。
程序遗言: VSOA Broker 支持程序遗言的功能,当发布者因为某种原因(如资源不足、严重错误、用户请求等)即将退出时,可以向 VSOA Broker 设置遗言。VSOA Broker 把程序遗言发布给订阅者,帮助订阅者了解发布程序退出原因。
VSOA Broker 的应用场景非常广泛,尤其适用于物联网、航空航天等分布式系统。可以用于连接智能设备和各种传感器,实现数据的实时采集和处理。
开发常用接口
vsoa_server_t *vsoa_server_create(const char *info_json);
void vsoa_server_on_cli(vsoa_server_t *server, vsoa_server_cli_func_t oncli, void *arg);
void vsoa_server_on_datagram(vsoa_server_t *server, vsoa_server_dat_func_t callback, void *arg);
bool vsoa_client_datagram(vsoa_client_t *client, const vsoa_url_t *url, const vsoa_payload_t *payload);
bool vsoa_server_add_listener(vsoa_server_t *server, const vsoa_url_t *url, vsoa_server_cmd_func_t callback, void *arg);
bool vsoa_client_call(vsoa_client_t *client, int method, const vsoa_url_t *url, const vsoa_payload_t *payload, vsoa_client_rpc_func_t callback, void *arg, const struct timespec *timeout);
bool vsoa_client_multi_subscribe(vsoa_client_t *client, char *const urls[], int cnt, vsoa_client_res_func_t callback, void *arg, const struct timespec *timeout);
bool vsoa_server_publish(vsoa_server_t *server, const vsoa_url_t *url, const vsoa_payload_t *payload);
说明:
详细接口说明可参考以下手册:
- VSOA C 语言编程手册。C 版本也可以使用 VSOA 客户端机器人,手册详情可见 VSOA C 扩展编程手册。
开发示例
- 数据发布者连接 broker,并通过 RPC Set 设置 `lastwords`。创建数据采集和发送线程,通过 `vsoa_client_datagram` 接口发送采集的数据。
- broker 保存 `lastwords` 数据,当数据发布者断开连接时,向 `lastwords` 的订阅者发布 `lastwords` 数据。broker 同时接收数据发布者发送的数据,并向订阅者发布。
- 数据订阅者连接 broker,并订阅 `/lastwords` 和 `/axis`。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <getopt.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "vsoa_list.h"
#include "vsoa_server.h"
/* Broker version */
#define BROKER_VERSION "0.0.1"
#define BROKER_DEFAULT_PORT 3006
/* Broker RPC define */
#define BROKER_RPC_IDX_LASTWORDS 0
#define BROKER_RPC_URL_LASTWORDS "/lastwords"
#define BROKER_RPC_IDX_MAX 1
/*
 * Point a URL descriptor at an existing buffer; no copy is made.
 *
 * _url      Pointer to the descriptor to fill (its `url` / `url_len` fields).
 * _url_str  Buffer address to store (may be NULL).
 * _url_len  Length to store.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement and
 * can be used in an unbraced if/else; every argument is parenthesized.
 */
#define VSOA_URL_SET(_url, _url_str, _url_len) \
    do { \
        (_url)->url     = (_url_str); \
        (_url)->url_len = (_url_len); \
    } while (0)
/*
 * Point a payload descriptor at existing parameter and data buffers; no copy
 * is made.
 *
 * p1          Pointer to the descriptor to fill.
 * _param      Parameter buffer address (may be NULL).
 * _param_len  Parameter length.
 * _data       Data buffer address (may be NULL).
 * _data_len   Data length.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement and
 * can be used in an unbraced if/else.
 */
#define VSOA_PAYLOAD_SET(p1, _param, _param_len, _data, _data_len) \
    do { \
        (p1)->param     = (_param); \
        (p1)->param_len = (_param_len); \
        (p1)->data      = (_data); \
        (p1)->data_len  = (_data_len); \
    } while (0)
/*
 * Client last words: the URL and payload the broker publishes on a client's
 * behalf when that client disconnects.
 */
typedef struct {
/* Publish URL; the buffer is malloc'ed by broker_lastwords_copy() and freed by broker_lastwords_clean(). */
vsoa_url_t url;
/* Published payload; its param and data buffers are heap copies managed the same way. */
vsoa_payload_t payload;
} broker_cli_lastword_t;
/*
 * One broker client that has registered last words; entries are kept in the
 * broker_clis list.
 */
typedef struct broker_cli {
/* List links; presumably what the vsoa_list.h INSERT_TO_HEADER / DELETE_FROM_LIST macros operate on -- confirm there. */
struct broker_cli *next;
struct broker_cli *prev;
/* Server-side id of the client; the key broker_cli_find() matches on. */
vsoa_cli_id_t cid;
/* Last words registered by this client. */
broker_cli_lastword_t lastwords;
} broker_cli_t;
/* All clients with a last words */
static broker_cli_t *broker_clis;
/* This broker server */
static vsoa_server_t *broker;
/* Broker RPC URLs */
static vsoa_url_t broker_rpc_urls[BROKER_RPC_IDX_MAX];
/*
 * Release every heap buffer held by a last-words record and reset the record
 * to the empty state.
 *
 * lw  Record to clear. Its URL, param and data buffers are freed and all
 *     pointers and lengths are zeroed, so the record may be cleaned again or
 *     refilled afterwards.
 */
static void broker_lastwords_clean (broker_cli_lastword_t *lw)
{
    /* free(NULL) is a no-op, so unset buffers need no guard. */
    free(lw->url.url);
    lw->url.url     = NULL;
    lw->url.url_len = 0;

    free(lw->payload.param);
    free(lw->payload.data);
    lw->payload.param     = NULL;
    lw->payload.param_len = 0;
    lw->payload.data      = NULL;
    lw->payload.data_len  = 0;
}
/*
 * Duplicate `len` bytes starting at `src` into a freshly malloc'ed buffer.
 * Returns the new buffer, or NULL when malloc fails.
 */
static void *broker_bytes_dup (const void *src, size_t len)
{
    void *copy = malloc(len);

    if (copy) {
        memcpy(copy, src, len);
    }
    return (copy);
}

/*
 * Deep-copy a URL and payload into a last-words record.
 *
 * lw       Destination record; whatever it held before is overwritten without
 *          being freed, so pass an empty or uninitialized record.
 * url      Source URL; skipped when NULL or of zero length.
 * payload  Source payload; skipped when NULL. Param and data are each copied
 *          only when their length is non-zero.
 *
 * Returns true on success. On allocation failure everything copied so far is
 * released, the record is left empty and false is returned.
 */
static bool broker_lastwords_copy (broker_cli_lastword_t *lw,
                                   const vsoa_url_t *url,
                                   const vsoa_payload_t *payload)
{
    /* Start from an empty record so the failure path can clean it safely. */
    lw->url.url           = NULL;
    lw->url.url_len       = 0;
    lw->payload.param     = NULL;
    lw->payload.param_len = 0;
    lw->payload.data      = NULL;
    lw->payload.data_len  = 0;

    if (url && url->url_len) {
        lw->url.url = broker_bytes_dup(url->url, url->url_len);
        if (!lw->url.url) {
            goto nomem;
        }
        lw->url.url_len = url->url_len;
    }

    if (payload && payload->param_len) {
        lw->payload.param = broker_bytes_dup(payload->param, payload->param_len);
        if (!lw->payload.param) {
            goto nomem;
        }
        lw->payload.param_len = payload->param_len;
    }

    if (payload && payload->data_len) {
        lw->payload.data = broker_bytes_dup(payload->data, payload->data_len);
        if (!lw->payload.data) {
            goto nomem;
        }
        lw->payload.data_len = payload->data_len;
    }

    return (true);

nomem:
    broker_lastwords_clean(lw);
    return (false);
}
/*
 * Look up the broker client record for a client id.
 *
 * cid  Client id to search for in the broker_clis list.
 *
 * Returns the matching record. Callers treat a NULL result as "not found";
 * this relies on FOREACH_FROM_LIST (vsoa_list.h) leaving its cursor NULL once
 * the list is exhausted -- presumably true, confirm against that header.
 */
static broker_cli_t *broker_cli_find (vsoa_cli_id_t cid)
{
    broker_cli_t *node;

    FOREACH_FROM_LIST (node, broker_clis) {
        if (node->cid == cid) {
            return (node);
        }
    }

    /* Hand back the cursor exactly as the list macro left it. */
    return (node);
}
/*
 * Allocate a zero-initialized client record for `cid` and insert it at the
 * head of the broker_clis list.
 *
 * cid  Client id stored in the new record.
 *
 * Returns the new record, or NULL when the allocation fails (the list is left
 * untouched in that case).
 */
static broker_cli_t *broker_cli_create (vsoa_cli_id_t cid)
{
    broker_cli_t *node = calloc(1, sizeof(broker_cli_t));

    if (!node) {
        return (NULL);
    }

    node->cid = cid;
    INSERT_TO_HEADER(node, broker_clis);

    return (node);
}
/*
 * Destroy a client record: free its last-words buffers, unlink it from the
 * broker_clis list and release the record itself.
 *
 * cli  Record to destroy; must not be used after this call.
 */
static void broker_cli_delete (broker_cli_t *cli)
{
    /* Releasing the last words does not touch the list links, so it can run first. */
    broker_lastwords_clean(&cli->lastwords);

    DELETE_FROM_LIST(cli, broker_clis);

    free(cli);
}
/*
 * RPC handler for the last-words URL: store a private copy of the request's
 * URL and payload as the calling client's last words.
 *
 * arg, server  Unused; replies go through the global `broker` server.
 * cid          Id of the calling client.
 * vsoa_hdr     Request header; its sequence number is echoed in the reply.
 * url          Request URL; copied and later used as the publish URL.
 * payload      Request payload; copied and later published.
 *
 * Replies with status 0 and no payload on success. When a copy or the client
 * record cannot be allocated, replies with VSOA_STATUS_NO_MEMORY and echoes
 * the request payload; previously stored last words are kept in that case.
 */
static void broker_rpc_lastwords (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                                  vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    broker_cli_lastword_t  fresh;
    broker_cli_t          *owner;

    /* Copy first, so an allocation failure never disturbs stored last words. */
    if (!broker_lastwords_copy(&fresh, url, payload)) {
        goto nomem;
    }

    owner = broker_cli_find(cid);
    if (!owner) {
        owner = broker_cli_create(cid);
        if (!owner) {
            broker_lastwords_clean(&fresh);
            goto nomem;
        }
    } else {
        /* Known client: drop the last words it registered earlier. */
        broker_lastwords_clean(&owner->lastwords);
    }

    /* The record takes over ownership of the freshly copied buffers. */
    owner->lastwords = fresh;

    vsoa_server_cli_reply(broker, cid, 0, vsoa_parser_get_seqno(vsoa_hdr), 0, NULL);
    return;

nomem:
    vsoa_server_cli_reply(broker, cid, VSOA_STATUS_NO_MEMORY,
                          vsoa_parser_get_seqno(vsoa_hdr), 0, payload);
}
/*
 * Client connect/disconnect callback.
 *
 * arg, server  Unused; publishing goes through the global `broker` server.
 * cid          Id of the client whose connection state changed.
 * connect      true on connect (ignored), false on disconnect.
 *
 * On disconnect, if the client registered last words they are published to
 * their stored URL, and the client's record is then destroyed.
 */
static void broker_on_cli (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid, bool connect)
{
    broker_cli_t *gone;

    if (connect) {
        return;
    }

    gone = broker_cli_find(cid);
    if (!gone) {
        return;                                     /* never set last words */
    }

    if (gone->lastwords.url.url) {
        vsoa_server_publish(broker, &gone->lastwords.url, &gone->lastwords.payload);
    }

    broker_cli_delete(gone);
}
/*
 * Datagram callback: re-publish whatever a client sent as a datagram, on the
 * same URL and with the same payload.
 *
 * arg, server, id  Unused; publishing goes through the global `broker` server.
 * url              URL the datagram was sent to; reused as the publish URL.
 * payload          Datagram payload; forwarded unchanged.
 * quick            Selects vsoa_server_quick_publish() when true, otherwise
 *                  vsoa_server_publish().
 */
static void broker_on_datagram (void *arg, vsoa_server_t *server, vsoa_cli_id_t id,
                                vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    if (!quick) {
        vsoa_server_publish(broker, url, payload);
        return;
    }

    vsoa_server_quick_publish(broker, url, payload);
}
/*
 * Print the broker's command-line help to standard output.
 */
static void broker_usage (void)
{
    static const char *const help_lines[] = {
        "USAGE: broker -p port Set port\n",
        " -P passwd Set password\n",
        " -v Show version\n",
        " -h Show this message\n",
    };
    size_t i;

    for (i = 0; i < sizeof(help_lines) / sizeof(help_lines[0]); i++) {
        fputs(help_lines[i], stdout);
    }
}
/*
 * Broker entry point.
 *
 * Options: -p port (default BROKER_DEFAULT_PORT), -P passwd, -v (print the
 * version and exit), -h (print usage and exit).
 *
 * Creates the VSOA server, optionally sets its password, registers the client
 * connect/disconnect callback, the datagram callback and the last-words RPC
 * listener, starts listening on INADDR_ANY:port and then services the
 * server's descriptors forever.
 *
 * Returns 0 after -h / -v, -1 on an invalid port or when the server cannot be
 * created, configured or started.
 */
int main (int argc, char **argv)
{
    int ch, cnt, max_fd = 0;
    long port = BROKER_DEFAULT_PORT;
    fd_set fds;
    char *passwd = NULL;
    char *end;
    char opt[] = "vhp:P:";
    struct sockaddr_in addr;

    while ((ch = getopt(argc, argv, opt)) != -1) {
        switch (ch) {
        case 'p':
            /* strtol instead of atoi so that "30x" or "" is rejected. */
            port = strtol(optarg, &end, 10);
            if (end == optarg || *end != '\0') {
                port = 0;                           /* fails the range check below */
            }
            break;
        case 'P':
            /*
             * argv outlives the server, so no copy is needed. The previous
             * unchecked strdup() could fail and silently start the broker
             * without a password (and leaked on a repeated -P).
             */
            passwd = optarg;
            break;
        case 'h':
            broker_usage();
            return (0);
        case 'v':
            printf("%s\n", BROKER_VERSION);
            return (0);
        default:
            /* getopt() returns '?' here; the offending option is in optopt. */
            fprintf(stderr, "Unknown argument: %c\n", (char)optopt);
            break;
        }
    }

    if (port < 1 || port > 65535) {
        fprintf(stderr, "`port` invalid!\n");
        return (-1);
    }

    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((unsigned short)port);
    addr.sin_addr.s_addr = INADDR_ANY;
#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    broker = vsoa_server_create("{\"name\":\"VSOA broker\"}");
    if (!broker) {
        fprintf(stderr, "Can not create VSOA broker!\n");
        return (-1);
    }

    if (passwd) {
        vsoa_server_passwd(broker, passwd);
    }

    vsoa_server_on_cli(broker, broker_on_cli, NULL);
    vsoa_server_on_datagram(broker, broker_on_datagram, NULL);

    broker_rpc_urls[BROKER_RPC_IDX_LASTWORDS].url = BROKER_RPC_URL_LASTWORDS;
    broker_rpc_urls[BROKER_RPC_IDX_LASTWORDS].url_len = strlen(BROKER_RPC_URL_LASTWORDS);

    /* Without this listener no client could ever register last words. */
    if (!vsoa_server_add_listener(broker, &broker_rpc_urls[BROKER_RPC_IDX_LASTWORDS],
                                  broker_rpc_lastwords, NULL)) {
        vsoa_server_close(broker);
        fprintf(stderr, "Can not add VSOA broker listener!\n");
        return (-1);
    }

    if (!vsoa_server_start(broker,
                           (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(broker);
        fprintf(stderr, "Can not start VSOA broker!\n");
        return (-1);
    }

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(broker, &fds);

        /* Block until at least one server descriptor becomes readable. */
        cnt = pselect(max_fd + 1, &fds, NULL, NULL, NULL, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(broker, &fds);
        }
    }

    return (0);
}
/*
* end
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "vsoa_client.h"
#include "vsoa_position.h"
#define MY_SERVER_PASSWD "123456"
#define AXIS_SER_BUF_LEN 100
#define BROKER_DEFAULT_PORT 3006
/*
 * Publisher thread: once per second, format the current roll/pitch/yaw
 * counters as a JSON parameter and send them as a datagram to "/axis".
 *
 * arg  The connected vsoa_client_t to send through.
 *
 * The loop never terminates; the trailing return only satisfies the thread
 * function signature.
 */
static void *datagram_send(void *arg)
{
    vsoa_client_t  *publisher = (vsoa_client_t *)arg;
    char            json[AXIS_SER_BUF_LEN + 1];
    int             roll = 1, pitch = 1, yaw = 1;
    vsoa_url_t      topic;
    vsoa_payload_t  msg;

    topic.url     = "/axis";
    topic.url_len = strlen(topic.url);

    /* Only the JSON parameter part is used; there is no binary data part. */
    msg.param    = json;
    msg.data     = NULL;
    msg.data_len = 0;

    for (;;) {
        sleep(1);

        msg.param_len = snprintf(json, AXIS_SER_BUF_LEN,
                                 "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                 roll, pitch, yaw);
        roll++;
        pitch++;
        yaw++;

        vsoa_client_datagram(publisher, &topic, &msg);
    }

    return NULL;
}
/*
 * Publisher entry point.
 *
 * Connects to the broker on the local host (BROKER_DEFAULT_PORT, password
 * MY_SERVER_PASSWD), registers its last words through an RPC SET on
 * "/lastwords", starts the datagram_send thread and then services the client
 * descriptors until the connection is lost.
 *
 * Returns -1 when the client cannot be created or connected, when the sender
 * thread cannot be started, or when the connection is lost.
 */
int main (int argc, char **argv)
{
    vsoa_url_t url;
    vsoa_payload_t payload;
    int max_fd, cnt;
    fd_set fds;
    char info[256];
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };
    pthread_t tid;
    unsigned short port = BROKER_DEFAULT_PORT;
    vsoa_client_t *client;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    client = vsoa_client_create(NULL, NULL);
    if (!client) {
        fprintf(stderr, "Can not create VSOA client!\n");
        return (-1);
    }

    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    /*
     * The broker runs on the same device. INADDR_ANY is a bind() wildcard and
     * not a portable connect() destination, so name the loopback address.
     */
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    /*
     * Connect to server with password
     */
    if (!vsoa_client_connect(client, (struct sockaddr *)&addr, sizeof(struct sockaddr_in),
                             &timeout, MY_SERVER_PASSWD, info, sizeof(info))) {
        vsoa_client_close(client);
        fprintf(stderr, "Can not connect to VSOA server!\n");
        return (-1);
    }

    url.url = "/lastwords";
    url.url_len = strlen(url.url);
    payload.data = NULL;
    payload.data_len = 0;
    payload.param = "{\"lastwords\":\"Bye Bye\"}";
    payload.param_len = strlen(payload.param);

    /* Publishing still works without last words, so only warn on failure. */
    if (!vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_SET,
                          &url, &payload, NULL, NULL, &timeout)) {
        fprintf(stderr, "Can not set last words!\n");
    }

    /* pthread_create() returns 0 on success and an error number otherwise. */
    if (pthread_create(&tid, NULL, datagram_send, (void *)client) != 0) {
        vsoa_client_close(client);
        fprintf(stderr, "Can not create datagram thread!\n");
        return (-1);
    }

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_client_fds(client, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            if (!vsoa_client_input_fds(client, &fds)) {
                vsoa_client_close(client);
                fprintf(stderr, "Connection lost!\n");
                return (-1);
            }
        }
    }

    return (0);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "vsoa_client.h"
#include "vsoa_position.h"
#define MY_SERVER_PASSWD "123456"
#define AXIS_SER_BUF_LEN 100
#define BROKER_DEFAULT_PORT 3006
/*
 * Subscription callback: print the URL and the parameter part of every
 * published message that arrives.
 *
 * arg, client, quick  Unused.
 * url                 URL of the message; printed using its stored length.
 * payload             Message payload; only `param` is printed, using its
 *                     stored length.
 */
static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    /* "%.*s" takes an int precision, so the lengths are narrowed explicitly. */
    int url_chars   = (int)url->url_len;
    int param_chars = (int)payload->param_len;

    printf("On message, URL: %.*s payload: %.*s\n",
           url_chars, url->url, param_chars, payload->param);
}
/*
 * Subscriber entry point.
 *
 * Connects to the broker on the local host (BROKER_DEFAULT_PORT, password
 * MY_SERVER_PASSWD), subscribes to "/lastwords" and "/axis" and then services
 * the client descriptors until the connection is lost; received messages are
 * delivered to onmessage().
 *
 * Returns -1 when the client cannot be created or connected, or when the
 * connection is lost.
 */
int main (int argc, char **argv)
{
    int max_fd, cnt;
    fd_set fds;
    char info[256];
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };
    unsigned short port = BROKER_DEFAULT_PORT;
    char *topics[] = {
        "/lastwords", "/axis"
    };
    vsoa_client_t *client;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    client = vsoa_client_create(onmessage, NULL);
    if (!client) {
        fprintf(stderr, "Can not create VSOA client!\n");
        return (-1);
    }

    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    /*
     * The broker runs on the same device. INADDR_ANY is a bind() wildcard and
     * not a portable connect() destination, so name the loopback address.
     */
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    /*
     * Connect to server with password
     */
    if (!vsoa_client_connect(client, (struct sockaddr *)&addr, sizeof(struct sockaddr_in),
                             &timeout, MY_SERVER_PASSWD, info, sizeof(info))) {
        vsoa_client_close(client);
        fprintf(stderr, "Can not connect to VSOA server!\n");
        return (-1);
    }

    /* A failed subscription would otherwise leave the program silently idle. */
    if (!vsoa_client_multi_subscribe(client, topics,
                                     (int)(sizeof(topics) / sizeof(topics[0])),
                                     NULL, NULL, NULL)) {
        fprintf(stderr, "Can not subscribe to topics!\n");
    }

    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_client_fds(client, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            if (!vsoa_client_input_fds(client, &fds)) {
                vsoa_client_close(client);
                fprintf(stderr, "Connection lost!\n");
                return (-1);
            }
        }
    }

    return (0);
}
运行结果
本示例三个程序运行在同一个设备上,先启动发布端程序,再关闭发布端程序,同时在订阅端查看订阅的结果。