客户端开发

更新时间:
2024-12-19

客户端开发

本节内容介绍 VSOA 客户端使用发布与订阅的方法。

开发须知

VSOA 客户端使用发布与订阅时,首先需要确定要订阅的 URL 主题;当所订阅的主题产生数据时,数据将被传递给客户端指定的数据接收接口(回调函数)。

常用接口

bool vsoa_client_subscribe(vsoa_client_t *client, const vsoa_url_t *url, vsoa_client_res_func_t callback, 
                           void *arg, const struct timespec *timeout);
bool vsoa_client_unsubscribe(vsoa_client_t *client, const vsoa_url_t *url, vsoa_client_res_func_t callback, 
                             void *arg, const struct timespec *timeout);
bool vsoa_client_multi_subscribe(vsoa_client_t *client, char *const urls[], int cnt, vsoa_client_res_func_t callback, 
                                 void *arg, const struct timespec *timeout);
bool vsoa_client_multi_unsubscribe(vsoa_client_t *client, char *const urls[], int cnt, vsoa_client_res_func_t callback, 
                                   void *arg, const struct timespec *timeout);
client.subscribe(url[, callback[, timeout]]);
client.unsubscribe([url[, callback[, timeout]]]);
boolean subscribe(String url, CBSubscribe cbSubscribe);
boolean subscribe(String url, CBSubscribe cbSubscribe, int timeout);
boolean subscribe(String[] urls, CBSubscribe cbSubscribe);
boolean subscribe(String[] urls, CBSubscribe cbSubscribe, int timeout);
func (client *Client)Subscribe(URL string, onPublish func(m *protocol.Message)) error
func (client *Client)UnSubscribe(URL string) error
client.subscribe(url: str | list[str], callback: callable = None, timeout: float = 60.0) -> bool
client.unsubscribe(url: str | list[str], callback: callable = None, timeout: float = 60.0) -> bool

说明:

发布与订阅客户端的详细接口说明可参考以下手册:

开发示例

假定存在一个 axis_server 服务,提供一个 URL 为 /axis 的主题供客户端订阅云原生验证平台中陀螺仪的信息。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include "vsoa_cliauto.h"

/* My server password */
#define MY_SERVER_PASSWD "123456"

/* My client */
static vsoa_client_t *client;

/* My client auto */
static vsoa_client_auto_t *cliauto;

/* My subscribe (string pointer array) */
static char *sub_urls[] = { "/axis" };

/*
 * Connection state callback: prints whether the client is now connected
 * or disconnected, together with the info string passed by the library.
 */
static void onconnect (void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
    const char *state = connect ? "connected" : "disconnected";

    printf("On connect, connect: %s, info: %s\n", state, info);
}

/*
 * Subscribed message callback: prints the URL and the parameter text of
 * every published message delivered to this client.
 */
static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    /* Both fields are printed with an explicit length (%.*s) */
    int url_len   = (int)url->url_len;
    int param_len = (int)payload->param_len;

    printf("On message, URL: %.*s payload: %.*s\n",
           url_len, url->url, param_len, payload->param);
}

/*
 * main function
 *
 * Creates the client auto robot, registers the connect callback, starts the
 * robot against "vsoa://axis_server" with the URLs in sub_urls subscribed,
 * and then idles forever while the callbacks do the work.
 * Returns -1 if the robot cannot be created, set up or started.
 */
int main (int argc, char **argv)
{
#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /*
     * Create client auto robot
     */
    cliauto = vsoa_client_auto_create(onmessage, NULL);
    if (!cliauto) {
        fprintf(stderr, "Cannot create client auto robot: %s (%d)\n", strerror(errno), errno);
        return -1;
    }

    client = vsoa_client_auto_handle(cliauto);

    if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
        return -1;
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     *
     * sub_urls is passed as-is: it is already an array of string pointers, so
     * no cast is needed (casting it to `char * const` yields the wrong pointer type).
     * The URL count is derived from the array so it cannot drift out of sync.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD,
                                sub_urls, (int)(sizeof(sub_urls) / sizeof(sub_urls[0])),
                                1000, 1000, 1000)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot start client auto robot: %s (%d)\n", strerror(errno), errno);
        return -1;
    }

    while (true) {
        sleep(1);
    }
}
/* VSOA module */
var vsoa = require('vsoa');

/* Server name to connect */
const SERVER_NAME = 'axis_server';

/* Client Option */
var option = { passwd: '123456' };

/* Client */
var client = new vsoa.Client(option);

/*
 * Listen Subscribed message: print the URL and payload of every
 * published message delivered to this client
 */
client.on('message', (url, payload, quick) => {
    console.log('received event:', url, 'payload:', JSON.stringify(payload))
})

/*
 * Listen connect event: subscribe '/axis' once the connection is established
 */
client.on('connect', function(info) {
    console.info('Connected with server:', JSON.stringify(info));
    
    client.subscribe('/axis', error => {
        /* Report the topic that was actually subscribed */
        console.log('Subscribe /axis', error ? 'error' : 'OK');
    })
});
    
/* Connect by server name; a connection error is rethrown */
client.connect(`vsoa://${SERVER_NAME}`, (error, info) => {
    if (error) 
        throw error;
});

/*
 * Event loop
 */
require('iosched').forever();
import java.net.InetSocketAddress;

import com.acoinfo.vsoa.CBCall;
import com.acoinfo.vsoa.Client;
import com.acoinfo.vsoa.CBSubscribe;
import com.acoinfo.vsoa.ClientOption;
import com.acoinfo.vsoa.Constant;
import com.acoinfo.vsoa.Error;
import com.acoinfo.vsoa.Payload;
import com.acoinfo.vsoa.Position;
import com.acoinfo.vsoa.VsoaSocketAddress;

/**
 * Sample VSOA client: connects to the server named by SERVER_NAME,
 * subscribes to "/axis" and prints every published message it receives.
 */
public class client_test {
    /* NOTE(review): POS_MANUALLY, POS_ADDRESS and POS_PORT are not used by this sample */
    private  static final boolean POS_MANUALLY  = true;
    private  static final String  SERVER_NAME   = "axis_server";
    private  static final String  PASSWORD      = "123456";
    private  static final String  POS_ADDRESS   = "127.0.0.1";
    private  static final int     POS_PORT      = 3000;

    public   static Client client;

    public static void main(String[] args) {

        /*
        * Initialize client
        */
        client = new Client(new ClientOption(PASSWORD, 6000, 4000, 3, false)) {

            @Override
            public void onError(Error error) {
                System.out.println("Client error:" + error.message);
            }

            /*
            * Message receiver: long payloads are summarized by their length
            */
            @Override
            public void onMessage(String url, Payload payload, boolean quick) {
                if (payload.param.length() > 100) {
                    System.out.println("[CLIENT] received event: " + url + 
                                    " payload len: " + payload.param.length());
                } else {
                    System.out.println("[CLIENT] received event: " + url + 
                                    " payload: " + payload.param);
                }
            }

            @Override
            public void onConnected(String info) {
                System.out.println("Connected with server:" + info);
            }
        }; 
        
        if (!client.connect("vsoa://" + SERVER_NAME, null, Constant.VSOA_DEF_CONN_TIMEOUT)) {
            System.out.println("Connected with server failed");
            return;
        }

        /*
        * Subscribe /axis
        */
        client.subscribe("/axis", new CBSubscribe() {
            @Override
            public void callback(Error error) {
                System.out.println("Subscribe /axis:" + (error != null ? "error" : "OK"));
            }
        });

        /*
        * Keep the main thread alive while messages arrive on the callbacks.
        * An interrupt is treated as a request to stop: the interrupt flag is
        * restored and the loop ends instead of swallowing the exception.
        */
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "gitee.com/sylixos/go-vsoa/client"
    "gitee.com/sylixos/go-vsoa/protocol"
)

// PublishTestParam is the structure a published message parameter is decoded
// into; each field is filled from the JSON key named in its struct tag.
type PublishTestParam struct {
    Roll  int `json:"roll"`
    Pitch int `json:"pitch"`
    Yaw   int `json:"yaw"`
}

// callback is an empty receiver type whose getPublishParam method is used as
// the subscription handler.
type callback struct{}

// VsoaGetPublishCall connects to the VSOA server at localhost:3001, subscribes
// to "/axis" with getPublishParam as the handler, and keeps the connection
// open for two seconds before closing it.
func VsoaGetPublishCall() {
    // Set client options
    clientOption := client.Option{
        Password: "123456",
    }

    // Create a new client instance
    c := client.NewClient(clientOption)

    // Connect to the VSOA server
    _, err := c.Connect("vsoa", "localhost:3001")
    if err != nil {
        fmt.Println(err)
        return
    }

    // Close the connection when the function exits
    defer c.Close()

    cb := new(callback)

    // Subscribe to the "/axis" topic
    err = c.Subscribe("/axis", cb.getPublishParam)
    if err != nil {
        // errors.New returns a distinct value on every call, so comparing
        // err to a freshly created error with == can never be true.
        // Compare the messages instead.
        // NOTE(review): assumes Subscribe reports a failed status as an error
        // whose message is the protocol status text - confirm against go-vsoa.
        errInvalidURL := errors.New(protocol.StatusText(protocol.StatusInvalidUrl))
        if err.Error() == errInvalidURL.Error() {
            fmt.Println("Pass: Invalid URL")
        } else {
            fmt.Println(err)
        }
        return
    }

    // Keep the subscription alive for 2 seconds so published data can arrive
    time.Sleep(2 * time.Second)
}

// main runs the subscription sample once and returns.
func main() {
    VsoaGetPublishCall()
}

// getPublishParam decodes the JSON parameter of a published message into a
// PublishTestParam and prints its Roll, Pitch and Yaw values. A parameter that
// cannot be decoded is reported instead of being printed as zeros.
func (c callback) getPublishParam(m *protocol.Message) {
    // Create a new PublishTestParam instance
    param := new(PublishTestParam)

    // Unmarshal the JSON parameter into the param variable
    if err := json.Unmarshal(m.Param, param); err != nil {
        fmt.Println("VSOA Get: invalid axis payload:", err)
        return
    }

    // Print the decoded axis values
    fmt.Println("VSOA Get: Axis fields, Roll:", param.Roll, " Pitch:", param.Pitch, " Yaw:", param.Yaw)
}

from vsoa.client import Client
import vsoa, sys

# Name of the server to connect to (used to build the vsoa:// URL passed to connect)
SERVER_NAME = 'axis_server'

# Password passed to client.connect()
PASSWD = '123456'

# Create the client object; the callbacks are attached to it before connecting
client = Client()

# Handler for subscribed messages
def onmessage(client: Client, url: str, payload: vsoa.Payload, quick: bool):
    """Print the URL and the parameter of a received published message."""
    param_text = str(payload.param)
    print('Received event: {} payload: {}'.format(url, param_text))

# Register the handler for subscribed messages on the client
client.onmessage = onmessage

# Listen connect event
def onconnect(client: Client, conn: bool, info: str | dict | list):
    """Subscribe to '/axis' once the client is connected.

    client: the client object the event belongs to.
    conn:   connection state flag.
    info:   information supplied with the event; printed as text.
    """
    # NOTE(review): assumes conn is False when the handler reports a
    # disconnect (as the C sample's connect callback does) - confirm.
    if not conn:
        print('Disconnected from server: {}'.format(str(info)))
        return

    print('Connected with server: {}'.format(str(info)))
    ret = client.subscribe('/axis')
    # Format the result into the message; passing it as a second print()
    # argument left the '{}' placeholder unformatted.
    print('Subscribe {}'.format('OK' if ret else 'error'))

# Register the connection handler before connecting
client.onconnect = onconnect
# A truthy result from connect() is treated as an error: report it and exit
if err := client.connect('vsoa://{}'.format(SERVER_NAME), PASSWD):
    print('Connect error: {}'.format(err))
    sys.exit(-1)

# Event loop
client.run()

注意:

  • 在 Node.js 的环境中,不需要显式启动事件循环,即可以省略 JavaScript 示例末尾的 require('iosched').forever(); 语句。
  • 在该范例中,因为使用了独立的位置服务或 ECSM 集成的位置服务,所以客户端可以通过 URL 定位到指定的服务,不需要通过 IP 和端口信息访问服务。
文档内容是否对您有所帮助?
有帮助
没帮助