客户端开发

更新时间:
2024-12-23

客户端开发

本节介绍使用 VSOA Stream 进行客户端开发的方法。

使用前提

要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建流程如下:

  1. 服务端创建 VSOA Server Stream,并获得通道 ID (tunid)。
  2. 服务端将通道 ID 返回客户端。
  3. 客户端根据此通道 ID 创建 VSOA Client Stream。

VSOA Stream 创建完成后,便可进行数据通讯。在数据通讯完成时,可以关闭 VSOA Stream。

常用接口

int vsoa_client_stream_create(vsoa_client_t *client, uint16_t tunid,
                              const struct timespec *timeout, int keepalive);

void vsoa_client_stream_close(int stream);
client.createStream(tunid[, timeout]);
Stream client.createStream(int tunid, int timeout);
func (client *Client) NewClientStream(tunid uint16) (cs *ClientStream, err error) 
func (cs *ClientStream) StopClientStream() (err error)
func (cs *ClientStream) Read(buf []byte) (int, error)
func (cs *ClientStream) Write(buf []byte) (int, error) 
client.create_stream(tunid: int, onlink: callable, ondata: callable, timeout: float = CLIENT_DEF_CONN_TIMEOUT) -> Stream

说明:

VSOA Stream 客户端的详细接口说明可参考以下手册:

开发示例

在本示例中,客户端执行以下流程:

  1. 连接 VSOA 服务器 vsoa_stream_server。
  2. 向服务端 URL /read 发送 RPC Get 请求,并新建一个 VSOA Client Stream,读取服务器发送的数据。
  3. 打印收到的数据并关闭 VSOA Stream。

运行成功时,客户端会有如下输出:

  1. 在成功连接到 VSOA 服务器后,打印 Connected to VSOA stream server.
  2. 在成功接收服务器发送的数据后,打印 Received 10 bytes from VSOA stream:,后面带有收到的数据。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#include "vsoa_position.h"

/*
 * RPC reply callback for the '/read' request.
 *
 * Validates the reply, creates a VSOA client stream on the tunnel ID carried
 * by the reply header, receives one block of data from the stream, prints it
 * as hex bytes and closes the stream.
 *
 * arg      user argument given to vsoa_client_call() (unused)
 * client   client handle used to create the stream
 * vsoa_hdr reply header; NULL is reported as "server is not responding"
 * payload  reply payload (unused)
 */
static void read_callback(void *arg, vsoa_client_t *client,
                          vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
    int             tunid;
    int             status;
    struct timespec timeout;
    uint8_t         buffer[32];
    int             stream;
    int             len;
    int             i;

    (void)arg;
    (void)payload;

    // check if server response is valid
    if (!vsoa_hdr) {
        fprintf(stderr, "Server is not responding.\n");
        return;
    }

    status = vsoa_parser_get_status(vsoa_hdr);
    if (status != 0) {
        fprintf(stderr, "Server reply status is %d.\n", status);
        return;
    }

    // without a tunnel ID there is no server stream to attach to
    tunid = vsoa_parser_get_tunid(vsoa_hdr);
    if (tunid <= 0) {
        fprintf(stderr, "Server reply carries no stream tunnel ID.\n");
        return;
    }

    // set stream creation timeout to 1s
    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    // create vsoa client stream; never use the descriptor if creation failed
    stream = vsoa_client_stream_create(client, tunid, &timeout, 0);
    if (stream < 0) {
        fprintf(stderr, "Cannot create VSOA stream: %s (%d)\n", strerror(errno), errno);
        return;
    }

    // receive data from server stream
    len = recv(stream, buffer, sizeof(buffer), 0);
    if (len > 0) {
        printf("Received %d bytes from VSOA stream: ", len);
        for (i = 0; i < len; i++) {
            printf("0x%02x ", buffer[i]);
        }
        printf("\n");
    } else if (len < 0) {
        fprintf(stderr, "Stream receive failed: %s (%d)\n", strerror(errno), errno);
    }

    // close vsoa stream
    vsoa_client_stream_close(stream);
}

/*
 * Connection callback registered with the client auto robot.
 *
 * When 'connect' is true, prints the connected message and issues an RPC GET
 * on '/read' whose reply is handled by read_callback(); otherwise reports
 * that the server cannot be reached.
 */
static void connect_callback(void *arg, vsoa_client_auto_t *cliauto,
                             bool connect, const char *info)
{
    vsoa_url_t     read_url;
    vsoa_client_t *handle;

    if (connect) {
        printf("Connected to VSOA stream server.\n");
    } else {
        fprintf(stderr, "Cannot connect to VSOA stream server.\n");
        return;
    }

    // the RPC call needs the plain client handle owned by the robot
    handle = vsoa_client_auto_handle(cliauto);

    // target url of the rpc call
    read_url.url     = "/read";
    read_url.url_len = strlen(read_url.url);

    // issue the rpc call; nothing more to do here when it was accepted
    if (vsoa_client_call(handle, VSOA_CLIENT_RPC_METHOD_GET,
                         &read_url, NULL, read_callback, NULL, NULL)) {
        return;
    }

    fprintf(stderr, "RPC call failed: %s (%d)\n", strerror(errno), errno);
}

/*
 * Program entry: creates the client auto robot, registers connect_callback()
 * and starts the robot against "vsoa://vsoa_stream_server", then idles forever
 * while the callbacks do the work.
 *
 * Returns -1 when the robot cannot be created, set up or started.
 */
int main(int argc, char *argv[])
{
    vsoa_client_auto_t  *cliauto;

    (void)argc;
    (void)argv;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    /*
     * Create client auto robot
     */
    cliauto = vsoa_client_auto_create(NULL, NULL);
    if (!cliauto) {
        fprintf(stderr, "Cannot create client auto robot: %s (%d)\n", strerror(errno), errno);
        return -1;
    }

    /*
     * Add a connection callback
     * The callback is called automatically when client is connected to the server.
     */
    if (!vsoa_client_auto_setup(cliauto, connect_callback, NULL)) {
        /* report first: the delete call below may overwrite errno */
        fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
        vsoa_client_auto_delete(cliauto);
        return -1;
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    if (!vsoa_client_auto_start(cliauto, "vsoa://vsoa_stream_server",
                                NULL, NULL, 0, 1000, 1000, 1000)) {
        /* report first: the delete call below may overwrite errno */
        fprintf(stderr, "Client auto start failed: %s (%d)\n", strerror(errno), errno);
        vsoa_client_auto_delete(cliauto);
        return -1;
    }

    /* keep the process alive; all work happens in the callbacks */
    while (true) {
        sleep(1);
    }

    return 0;
}
/* VSOA module */
const vsoa = require('vsoa');

/* Server name to connect */
const SERVER_NAME = 'vsoa_stream_server';

/* Client */
const client = new vsoa.Client();

/*
 * Listen connect event:
 * once connected, call '/read' and attach a client stream to the tunnel ID
 * returned with the reply.
 */
client.on('connect', () => {
    console.info('Connected to VSOA stream server.');

    client.call('/read', (error, payload, tunid) => {
        /* Report a failed RPC instead of silently doing nothing */
        if (error) {
            console.warn('RPC call error:', error.message);
            return;
        }

        /* Without a tunnel ID there is no server stream to attach to */
        if (!tunid) {
            console.warn('Server reply carries no stream tunnel ID.');
            return;
        }

        const stream = client.createStream(tunid);

        stream.on('data', (chunk) => {
            console.log(`Received ${chunk.length} bytes from VSOA stream: ${chunk.toString('hex')}`);
        });

        stream.on('timeout', () => {
            console.log('Stream connect error!');
        });
    }, 2000);
});

client.connect(`vsoa://${SERVER_NAME}`, (error) => {
    if (error) {
        console.warn('Connect error:', error.message);
    }
});

/*
 * Event loop
 */
require('iosched').forever();
package example;

import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;

/**
 * VSOA stream client example: connects to the server, calls "/read" and prints
 * every block of data read from the client stream created on the returned
 * tunnel ID.
 */
public class streamClient {
    private static final String SERVER_NAME   = "vsoa_stream_server";
    private static final String PASSWORD      = null;

    public static Client client;

    /**
     * Renders each byte as "0x%02x " (including the trailing space).
     *
     * @param bytes data to render
     * @return the concatenated hex text, empty for an empty array
     */
    private static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("0x%02x ", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) {

        /*
         * Initialize client
         */
        client = new Client(new ClientOption(PASSWORD, 6000, 4000, 3, false)) {

            @Override
            public void onError(com.acoinfo.vsoa.Error error) {
                System.out.println("Client error:" + error.message);
            }

            @Override
            public void onConnected(String info) {
                System.out.println("Connected to VSOA stream server.");
            }
        };

        boolean connected = client.connect("vsoa://" + SERVER_NAME, null, Constant.VSOA_DEF_CONN_TIMEOUT);
        if (!connected) {
            System.out.println("Connected with server failed");
            return;
        }

        /*
         * Call /read
         */
        client.call("/read", Request.VSOA_METHOD_GET, null, new CBCall() {
            @Override
            public void callback(com.acoinfo.vsoa.Error error, Payload payload, int tunid) {
                if (error != null) {
                    System.out.println("RPC call error:" + error.message);
                    return;
                }

                Stream stream = client.createStream(tunid, Stream.DEF_TIMEOUT);
                if (stream == null) {
                    return;
                }

                /* Keep reading until read() yields null */
                for (byte[] data = stream.read(); data != null; data = stream.read()) {
                    System.out.println("Received " + data.length +
                                       " bytes from VSOA stream: " + toHex(data));
                }
            }
        }, 5000);

        /* Keep the process alive while the callbacks run */
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package main

import (
    "bytes"
    "errors"
    "fmt"
    "io"

    "gitee.com/sylixos/go-vsoa/client"
    "gitee.com/sylixos/go-vsoa/protocol"
)

// VsoaStreamCall connects to the VSOA server at localhost:3001, sends an RPC
// GET to "/read" to obtain a stream tunnel ID, opens a client stream on that
// tunnel, reads one block of data from it, prints it and stops the stream.
// Every failure is printed and ends the function early.
func VsoaStreamCall() {
    // Set client options
    clientOption := client.Option{
        Password: "123456",
    }

    // Create a new client instance
    c := client.NewClient(clientOption)

    // Connect to the VSOA server; nothing below can work without a connection
    if _, err := c.Connect("vsoa", "localhost:3001"); err != nil {
        fmt.Println(err)
        return
    }

    // Close the connection when the function exits
    defer c.Close()

    req := protocol.NewMessage()

    // Send a request to the server using the "/read" method
    reply, err := c.Call("/read", protocol.TypeRPC, protocol.RpcMethodGet, req)
    if err != nil {
        // Compare the message text: a value freshly built with errors.New is
        // never equal (==) to another error, so that comparison is always false.
        if err.Error() == protocol.StatusText(protocol.StatusInvalidUrl) {
            fmt.Println("Pass: Invalid URL")
        } else {
            fmt.Println(err)
        }
        return
    }

    streamTunID := reply.TunID()
    fmt.Println("Seq:", reply.SeqNo(), "Stream TunID:", streamTunID)

    // Create a new client stream
    cs, err := c.NewClientStream(streamTunID)
    if err != nil {
        fmt.Println(err)
        return
    }

    receiveBuf := bytes.NewBufferString("")
    streamDone := make(chan int)

    go func() {
        // Always stop the stream and wake the waiting caller, whatever the outcome
        defer func() {
            cs.StopClientStream()
            streamDone <- 1
        }()

        // In this example the server sends only a little data, so one read is enough
        buf := make([]byte, 32*1024)
        n, err := cs.Read(buf)
        if err != nil {
            // EOF means stream closed; anything else is reported
            if !errors.Is(err, io.EOF) {
                fmt.Println(err)
            }
            return
        }
        receiveBuf.Write(buf[:n])
        fmt.Println("stream receiveBuf:", receiveBuf.Bytes())
    }()

    // Don't return (and close the client) until the stream goroutine is done
    <-streamDone
}

// main runs the VSOA stream client example once.
func main() {
    VsoaStreamCall()
}
from vsoa.client import Client, Stream
import vsoa, sys

# Name of the VSOA server to connect to; used to build the vsoa:// URL
SERVER_NAME = 'vsoa_stream_server'

# Create the client instance the rest of the script connects and runs
client = Client()

# Listen connect event
def onconnect(client: Client, conn: bool, info: str | dict | list):
    """Connection-state handler assigned to ``client.onconnect``.

    When ``conn`` is true, calls ``/read`` and, if the reply carries a tunnel
    ID, creates a client stream whose received data is printed as hex.

    Args:
        client: The client the event belongs to.
        conn: Connection flag passed by the client; the RPC is only issued
            when it is true.
        info: Information passed along with the event (unused).
    """
    # Only start the RPC on a real connection, not when conn is false
    if not conn:
        print('Disconnected from VSOA stream server.')
        return

    print('Connected to VSOA stream server.')

    def onreply(client: Client, header: vsoa.Header, payload: vsoa.Payload):
        def onlink(stream: Stream, conn: bool):
            pass

        def ondata(stream: Stream, data: bytes):
            # hex() must be called; the bare attribute prints a method repr
            print('Received {} bytes from VSOA stream: {}'.format(len(data), data.hex()))

        # A reply without a positive tunnel ID has no stream to attach to
        if header and header.tunid > 0:
            client.create_stream(header.tunid, onlink, ondata)

    client.call(url='/read', callback=onreply, timeout=2000)

# Register the connect handler before connecting
client.onconnect = onconnect

# A truthy result from connect() is treated as the error to report
err = client.connect('vsoa://{}'.format(SERVER_NAME))
if err:
    print('Connect error: {}'.format(err))
    sys.exit(-1)

# Event loop
client.run()

说明:

  • 在 Node.js 环境中,不需要最后的事件循环操作,需要去除 require('iosched').forever();
  • 在该范例中,因为使用了独立的位置服务或 ECSM 集成的位置服务,所以客户端可以自动定位到指定的服务位置,不需要指定任何 IP 和端口信息。

功能示例

在开发示例章节中提供了使用 VSOA Stream 功能开发文件发送和接收的客户端与服务端示例,详细实现流程和功能代码示例见 开发示例。

代码调试

VSOA 提供的代码诊断工具 vcx 可用于调试 VSOA Stream 功能,下面以调试开发示例中的代码为例介绍 vcx 的使用方法:

接收文件

vcx -r ./read.dat vsoa://vsoa_stream_server/read
  • -r 使用 VSOA Stream 接收文件。
  • ./read.dat 通过 VSOA Stream 接收的数据存放在此文件中。

发送文件

vcx -w ./read.dat vsoa://vsoa_stream_server/read
  • -w 使用 VSOA Stream 发送文件。
  • ./read.dat 此文件的数据通过 VSOA Stream 发送至服务端。
文档内容是否对您有所帮助?
有帮助
没帮助