数据同步
本节内容介绍 QVSOA 客户端重连后与服务端数据同步的开发方法。
开发须知
数据同步是指在数据发布与订阅场景中,客户端因故障断开并重新连接后,能够立即获取当前最新数据。
如本章开发示例中的 axis_server 陀螺仪服务,若因故障发生断开重连,则需要客户端上线后立即获取 /axis 的最新状态,以保证数据的一致性。此时,可使用 QVSOA 提供的 autoConsistent 相关接口添加感兴趣的 URL,当成功建立连接时将自动查询相应数据。
开发示例
在本章客户端代码的基础上,使用 autoConsistent 方法添加 /axis URL,即可实现数据自动同步。
#include <QCoreApplication>
#include <QDebug>
#include <QVsoa>
// Password handed to connect2server() in main(); the server example sets the
// same literal through setPassword().
constexpr char SERVER_PASSWORD[] = "123456";
// Slot for QVsoaClient::connected: logs the outcome of a connection attempt.
//   ok   - false when the attempt failed; only a failure message is logged.
//   info - text logged after the "Connected with server:" prefix on success.
void onConnected(bool ok, QString info)
{
    if (ok) {
        qDebug() << "Connected with server:" << info;
    } else {
        qDebug() << "Connected with server failed!";
    }
}
// Slot for QVsoaClient::disconnected: logs that the connection was lost.
void onDisconnected()
{
qDebug() << "Connection break";
}
// Slot for QVsoaClient::message: logs the URL of a received message together
// with the parameter part of its payload.
void onMessage(QString url, QVsoaPayload payload)
{
    // One QDebug object is reused so both parts end up on the same output line.
    auto log = qDebug();
    log << "received event:" << url;
    log << "payload:" << payload.param();
}
// Client entry point: creates a QVsoaClient, wires its signals to the logging
// slots above, connects to "vsoa://axis_server", subscribes to /axis and
// registers /axis for automatic consistency queries, then runs the Qt event loop.
// Returns -1 if the client object is invalid, otherwise the event loop's result.
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
// Initialize client
QVsoaClient client;
if (client.isInvalid()) {
qDebug() << "Can not create VSOA client!";
return -1;
}
// Route connection state changes and incoming messages to the logging slots
QObject::connect(&client, &QVsoaClient::connected, onConnected);
QObject::connect(&client, &QVsoaClient::disconnected, onDisconnected);
QObject::connect(&client, &QVsoaClient::message, onMessage);
// Connect to server with password
client.connect2server("vsoa://axis_server", SERVER_PASSWORD);
// Enable automatic connections
// NOTE(review): the meaning and units of (1000, 500) are not shown here — confirm against the QVsoaClient API
client.autoConnect(1000, 500);
// Subscribe /axis
client.subscribe("/axis");
// Enable data consistency on /axis: per the accompanying text, registered URLs
// are queried automatically once a connection is established.
// NOTE(review): the second argument (1000) is presumably a timing value — confirm
client.autoConsistent({"/axis"}, 1000);
return a.exec();
}
同时,服务端应当实现 /axis 的 RPC 接口,用于客户端获取当前陀螺仪数据。
#include <QCoreApplication>
#include <QDebug>
#include <QJsonDocument>
#include <QProcessEnvironment>
#include <QTimer>
#include <QVsoa>
// JSON description string passed to the QVsoaServer constructor in main().
constexpr char SERVER_INFO[] = "{\"name\":\"axis_server\"}";
// Password installed with server.setPassword() in main().
constexpr char SERVER_PASSWORD[] = "123456";
// Address the server is started on.
constexpr char SERVER_ADDR[] = "0.0.0.0";
// Default port; main() replaces it with VSOA_AUTO_PORT when that variable is set.
constexpr int SERVER_PORT = 3002;
// NOTE(review): no placeholder (_1, _2, ...) is used in the visible code — confirm whether this directive is still needed
using namespace std::placeholders;
// Slot for QVsoaServer::newClient: logs the IP and port of the client handle
// delivered with the signal.
void onNewClient(QPointer<QVsoaCliHandle> client)
{
    auto &&peer = client->address();
    const QString text = QStringLiteral("New client, address: %1:%2").arg(peer.ip()).arg(peer.port());
    qDebug() << text;
}
// Slot for QVsoaServer::clientDisconnect: logs the IP and port of the client
// handle delivered with the signal.
void onDisconnected(QPointer<QVsoaCliHandle> client)
{
    auto &&peer = client->address();
    const QString text = QStringLiteral("Client disconnect: %1:%2").arg(peer.ip()).arg(peer.port());
    qDebug() << text;
}
static int roll = 1;
static int pitch = 1;
static int yaw = 1;
void publishMessage(QVsoaServer *server)
{
if (server->isSubscribed("/axis")) {
QVariantMap param = {
{"poll", roll++ },
{"pitch", pitch++},
{"yaw", yaw++ }
};
// Publish /axis
server->publish("/axis", QVsoaPayload(QString::fromUtf8(QJsonDocument::fromVariant(param).toJson()), {}));
}
}
void getAxisData(QPointer<QVsoaCliHandle> client,
const QVsoaHeader &header,
const QString &,
const QVsoaPayload &)
{
if (header.flags() & FlagCode::FLAG_SET) {
return;
}
QVariantMap param = {
{"poll", roll },
{"pitch", pitch},
{"yaw", yaw }
};
client->reply(StatusCode::SUCCESS,
header.seqno(),
QVsoaPayload(QString::fromUtf8(QJsonDocument::fromVariant(param).toJson()), {}));
}
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
// Initialize server
QVsoaServer server(SERVER_INFO);
if (server.isInvalid()) {
qDebug() << "Can not create VSOA server!";
return -1;
}
// If need password
server.setPassword(SERVER_PASSWORD);
QObject::connect(&server, &QVsoaServer::newClient, onNewClient);
QObject::connect(&server, &QVsoaServer::clientDisconnect, onDisconnected);
int port = SERVER_PORT;
int auto_port = QProcessEnvironment::systemEnvironment().value("VSOA_AUTO_PORT", "-1").toInt();
if (auto_port != -1) {
port = auto_port;
}
// Start server
if (!server.start(QVsoaSocketAddress(AF_INET, SERVER_ADDR, port))) {
qDebug() << "Can not start VSOA server!";
return -1;
}
qDebug() << "Started VSOA server.";
QTimer timer;
QObject::connect(&timer, &QTimer::timeout, std::bind(publishMessage, &server));
timer.start(1000);
// Add /axis listener
QVsoaRPCServerListener listener("/axis", getAxisData);
listener.listen(&server);
return a.exec();
}