数据同步

更新时间:
2024-12-19

数据同步

本节内容介绍 QVSOA 客户端重连后与服务端数据同步的开发方法。

开发须知

数据同步是指在数据发布与订阅场景中,客户端因故障断开并重新连接后,立即获取当前最新数据的机制。

以本章开发示例中的 axis_server 陀螺仪服务为例,若连接因故障断开后重新建立,客户端需要在上线后立即获取 /axis 的最新状态,以保证数据的一致性。此时,可使用 QVSOA 提供的 autoConsistent 相关接口添加感兴趣的 URL,客户端将在成功建立连接时自动查询相应数据。

开发示例

在本章客户端代码的基础上,使用 autoConsistent 方法添加 /axis URL,即可实现数据自动同步。

#include <QCoreApplication>
#include <QDebug>
#include <QVsoa>

constexpr char SERVER_PASSWORD[] = "123456";

/// Logs the outcome of a connection attempt to the server.
/// @param ok   true when the connection was established, false otherwise.
/// @param info text printed after "Connected with server:" on success.
void onConnected(bool ok, QString info)
{
    if (ok) {
        qDebug() << "Connected with server:" << info;
    } else {
        qDebug() << "Connected with server failed!";
    }
}

/// Logs that the connection to the server has been lost.
void onDisconnected()
{
    const char *notice = "Connection break";
    qDebug() << notice;
}

/// Logs a received event.
/// @param url     URL the event was received on.
/// @param payload event payload; only its param() part is printed.
void onMessage(QString url, QVsoaPayload payload)
{
    const auto &param = payload.param();
    qDebug() << "received event:" << url << "payload:" << param;
}

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    // Initilize client
    QVsoaClient client;
    if (client.isInvalid()) {
        qDebug() << "Can not create VSOA client!";
        return -1;
    }

    QObject::connect(&client, &QVsoaClient::connected, onConnected);
    QObject::connect(&client, &QVsoaClient::disconnected, onDisconnected);
    QObject::connect(&client, &QVsoaClient::message, onMessage);

    // Connect to server with password
    client.connect2server("vsoa://axis_server", SERVER_PASSWORD);
    // Enable automatic connections
    client.autoConnect(1000, 500);
    // Subscribe /axis
    client.subscribe("/axis");
    // Enable data consistency on /axis
    client.autoConsistent({"/axis"}, 1000);

    return a.exec();
}

同时,服务端应当实现 /axis 的 RPC 接口,用于客户端获取当前陀螺仪数据。

#include <QCoreApplication>
#include <QDebug>
#include <QJsonDocument>
#include <QProcessEnvironment>
#include <QTimer>
#include <QVsoa>

constexpr char SERVER_INFO[] = "{\"name\":\"axis_server\"}";
constexpr char SERVER_PASSWORD[] = "123456";
constexpr char SERVER_ADDR[] = "0.0.0.0";
constexpr int SERVER_PORT = 3002;

using namespace std::placeholders;

/// Logs the address (ip:port) of a client reported by the server as new.
/// @param client handle of the new client; dereferenced without a null check.
void onNewClient(QPointer<QVsoaCliHandle> client)
{
    const auto &peer = client->address();
    const QString line = QStringLiteral("New client, address: %1:%2").arg(peer.ip()).arg(peer.port());
    qDebug() << line;
}

/// Logs the address (ip:port) of a client reported by the server as
/// disconnected.
/// @param client handle of the client; dereferenced without a null check.
void onDisconnected(QPointer<QVsoaCliHandle> client)
{
    const auto &peer = client->address();
    const QString line = QStringLiteral("Client disconnect: %1:%2").arg(peer.ip()).arg(peer.port());
    qDebug() << line;
}

static int roll = 1;
static int pitch = 1;
static int yaw = 1;

void publishMessage(QVsoaServer *server)
{
    if (server->isSubscribed("/axis")) {
        QVariantMap param = {
            {"poll",  roll++ },
            {"pitch", pitch++},
            {"yaw",   yaw++  }
        };
        // Publish /axis
        server->publish("/axis", QVsoaPayload(QString::fromUtf8(QJsonDocument::fromVariant(param).toJson()), {}));
    }
}

void getAxisData(QPointer<QVsoaCliHandle> client,
                 const QVsoaHeader &header,
                 const QString &,
                 const QVsoaPayload &)
{
    if (header.flags() & FlagCode::FLAG_SET) {
        return;
    }
    QVariantMap param = {
        {"poll",  roll },
        {"pitch", pitch},
        {"yaw",   yaw  }
    };
    client->reply(StatusCode::SUCCESS,
                  header.seqno(),
                  QVsoaPayload(QString::fromUtf8(QJsonDocument::fromVariant(param).toJson()), {}));
}

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    // Initialize server
    QVsoaServer server(SERVER_INFO);
    if (server.isInvalid()) {
        qDebug() << "Can not create VSOA server!";
        return -1;
    }

    // If need password
    server.setPassword(SERVER_PASSWORD);

    QObject::connect(&server, &QVsoaServer::newClient, onNewClient);
    QObject::connect(&server, &QVsoaServer::clientDisconnect, onDisconnected);

    int port = SERVER_PORT;
    int auto_port = QProcessEnvironment::systemEnvironment().value("VSOA_AUTO_PORT", "-1").toInt();
    if (auto_port != -1) {
        port = auto_port;
    }
    // Start server
    if (!server.start(QVsoaSocketAddress(AF_INET, SERVER_ADDR, port))) {
        qDebug() << "Can not start VSOA server!";
        return -1;
    }
    qDebug() << "Started VSOA server.";

    QTimer timer;
    QObject::connect(&timer, &QTimer::timeout, std::bind(publishMessage, &server));
    timer.start(1000);

    // Add /axis listener
    QVsoaRPCServerListener listener("/axis", getAxisData);
    listener.listen(&server);

    return a.exec();
}
文档内容是否对您有所帮助?
有帮助
没帮助