数据同步
本节内容介绍 VSOA 客户端重连后与服务端数据同步的开发方法。
开发须知
数据同步是指在数据发布与订阅场景中,当客户端因故障断开重连后,需要立即获取当前最新数据的需求。
如本章开发示例中的 axis_server 陀螺仪服务,若产生故障断开重连,则需要客户端上线后立即获取 /axis
的最新状态,以保证数据的一致性。通常情况下,客户端会在断开重连后,主动发起一次 RPC 请求以获取数据的最新状态,但如果需要获取的数据量较大,则会给代码编程带来更多的复杂性。此时,可以使用客户端机器人带有的自动数据同步接口进行处理。
常用接口
bool vsoa_client_auto_consistent(vsoa_client_auto_t *cliauto, char *const urls[], int url_cnt, unsigned int rpc_timeout);
说明:
发布与订阅客户端的详细接口说明可参考 VSOA C 语言编程手册。您也可以使用 VSOA 客户端机器人 接口完成主题订阅,关于机器人接口的更多内容可参见 VSOA C 扩展编程手册。
开发示例
如本章开发示例中的 axis_server 陀螺仪服务,若客户端产生故障断开重连,则需要调用自动数据同步接口 vsoa_client_auto_consistent
。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include "vsoa_cliauto.h"
/* My server password */
#define MY_SERVER_PASSWD "123456"
/* My client */
static vsoa_client_t *client;
/* My client auto */
static vsoa_client_auto_t *cliauto;
/* My subscribe (string pointer array) */
static char *sub_urls[] = { "/axis" };
static void onconnect (void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
printf("On connect, connect: %s, info: %s\n",
(connect == true) ? "connected" : "disconnected", info);
}
/*
* On subscribed messages received
*/
static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
printf("On message, URL: %.*s payload: %.*s\n",
(int)url->url_len, url->url, (int)payload->param_len, payload->param);
}
/*
* main function
*/
int main (int argc, char **argv)
{
#ifdef SYLIXOS
vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif
/*
* Create client auto robot
*/
cliauto = vsoa_client_auto_create(onmessage, NULL);
client = vsoa_client_auto_handle(cliauto);
if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
vsoa_client_auto_delete(cliauto);
fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
return -1;
}
vsoa_client_auto_consistent(cliauto, sub_urls, 1, 1000);
/*
* Client auto robot start
* The robot will automatically connect to the specified server and maintain the connection.
* At this time, the developer only needs to focus on the business.
*/
vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD,
(char * const)sub_urls, 1, 1000, 1000, 1000);
while (true) {
sleep(1);
}
}
同时,服务端应当实现 /axis
的 RPC 接口,用于客户端获取当前陀螺仪数据。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"
#define MY_SERVER_PASSWD "123456"
static vsoa_server_t *server;
static int roll = 1, pitch = 1, yaw = 1;
/*
* Publish axis thread
*/
static void *publish_axis_thread (void *arg)
{
vsoa_url_t url;
vsoa_payload_t payload;
char param[100];
url.url = "/axis";
url.url_len = strlen(url.url);
payload.data = NULL;
payload.data_len = 0;
payload.param = param;
while (1) {
sleep(1);
if (!vsoa_server_is_subscribed(server, &url)) {
continue;
}
payload.param_len = sprintf(param,
"{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
roll++, pitch++, yaw++);
vsoa_server_publish(server, &url, &payload);
}
return (NULL);
}
static void command_axis (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
vsoa_payload_t *payload)
{
vsoa_payload_t send;
char param[100];
uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
sprintf(param, "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
roll, pitch, yaw);
send.data = NULL;
send.data_len = 0;
send.param = param;
send.param_len = strlen(send.param);
vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}
int main (int argc, char **argv)
{
int cnt, max_fd;
fd_set fds;
pthread_t id;
struct sockaddr_in addr;
struct timespec timeout = { 1, 0 };
bzero(&addr, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(3002);
addr.sin_addr.s_addr = INADDR_ANY;
#ifdef VSOA_HAS_SIN_LEN
addr.sin_len = sizeof(struct sockaddr_in);
#endif
/*
* Initialize server
*/
server = vsoa_server_create("{\"name\":\"axis_server\"}");
if (!server) {
fprintf(stderr, "Can not create VSOA server!\n");
return (-1);
}
/*
* If need password
*/
vsoa_server_passwd(server, MY_SERVER_PASSWD);
/*
* Add /axis listener
*/
url.url = "/axis";
url.url_len = strlen(url.url);
vsoa_server_add_listener(server, &url, command_axis, NULL);
/*
* Start server
*/
if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
vsoa_server_close(server);
fprintf(stderr, "Can not start VSOA server!\n");
return (-1);
}
/*
* Create publish thread
*/
pthread_create(&id, NULL, publish_axis_thread, NULL);
while (1) {
FD_ZERO(&fds);
max_fd = vsoa_server_fds(server, &fds);
cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
if (cnt > 0) {
vsoa_server_input_fds(server, &fds);
}
}
return (0);
}