客户端开发
本节介绍使用 VSOA Stream 进行客户端开发的方法。
使用前提
要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建流程如下:
- 服务端创建 VSOA Server Stream,并获得通道 ID (
tunid
)。 - 服务端将通道 ID 返回客户端。
- 客户端根据此通道 ID 创建 VSOA Client Stream。
VSOA Stream 创建完成后,便可进行数据通讯。在数据通讯完成时,可以关闭 VSOA Stream。
常用接口
int vsoa_client_stream_create(vsoa_client_t *client, uint16_t tunid,
const struct timespec *timeout, int keepalive);
void vsoa_client_stream_close(int stream);
client.createStream(tunid[, timeout]);
Stream client.createStream(int tunid, int timeout);
func (client *Client) NewClientStream(tunid uint16) (cs *ClientStream, err error)
func (cs *ClientStream) StopClientStream() (err error)
func (cs *ClientStream) Read(buf []byte) (int, error)
func (cs *ClientStream) Write(buf []byte) (int, error)
client.create_stream(tunid: int, onlink: callable, ondata: callable, timeout: float = CLIENT_DEF_CONN_TIMEOUT) -> Stream
说明:
VSOA Stream 客户端的详细接口说明可参考以下手册:
- C 版本可参考 VSOA C 语言编程手册。
- JavaScript 版本可参考 VSOA JavaScript 编程手册。
- Java 版本可参考 VSOA Java 编程手册。
- Golang 版本可参考 VSOA Golang 编程手册。
- Python 版本可参考 VSOA Pyhon 编程手册。
开发示例
在本示例中,客户端执行以下流程:
- 连接 VSOA 服务器
vsoa_stream_server
。 - 向服务端 URL
/read
发送 RPC Get 请求,并新建一个 VSOA Client Stream,读取服务器发送的数据。 - 打印收到的数据并关闭 VSOA Stream。
运行成功时,客户端会有如下输出:
- 在成功连接到 VSOA 服务器后,打印
Connected to VSOA stream server.
。 - 在成功接收服务器发送的数据后,打印
Received 10 bytes from VSOA stream:
,后面带有收到的数据。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"
#include "vsoa_position.h"
static void read_callback(void *arg, vsoa_client_t *client,
vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{
int tunid;
int status;
struct timespec timeout;
uint8_t buffer[32];
int stream;
int len;
int i;
// check if server response is valid
if (!vsoa_hdr) {
fprintf(stderr, "Server is not responding.\n");
return;
}
status = vsoa_parser_get_status(vsoa_hdr);
if (status != 0) {
fprintf(stderr, "Server reply status is %d.\n", status);
return;
}
tunid = vsoa_parser_get_tunid(vsoa_hdr);
// set stream creation timeout to 1s
timeout.tv_sec = 1;
timeout.tv_nsec = 0;
// create vsoa client stream
stream = vsoa_client_stream_create(client, tunid, &timeout, 0);
// receive data from server stream
len = recv(stream, buffer, sizeof(buffer), 0);
if (len > 0) {
printf("Received %d bytes from VSOA stream: ", len);
for (i = 0; i < len; i++) {
printf("0x%02x ", buffer[i]);
}
printf("\n");
}
// close vsoa stream
vsoa_client_stream_close(stream);
}
static void connect_callback(void *arg, vsoa_client_auto_t *cliauto,
bool connect, const char *info)
{
vsoa_client_t *client;
vsoa_url_t url;
if (!connect) {
fprintf(stderr, "Cannot connect to VSOA stream server.\n");
return;
}
printf("Connected to VSOA stream server.\n");
// get client handle
client = vsoa_client_auto_handle(cliauto);
// set rpc call url to '/read'
url.url = "/read";
url.url_len = strlen(url.url);
// make rpc call
if (!vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET,
&url, NULL, read_callback, NULL, NULL)) {
fprintf(stderr, "RPC call failed: %s (%d)\n", strerror(errno), errno);
return;
}
}
int main(int argc, char *argv[])
{
vsoa_client_auto_t *cliauto;
#ifdef SYLIXOS
vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif
/*
* Create client auto robot
*/
cliauto = vsoa_client_auto_create(NULL, NULL);
/*
* Add a connection callback
* The callback is called automatically when client is connected to the server.
*/
if (!vsoa_client_auto_setup(cliauto, connect_callback, NULL)) {
vsoa_client_auto_delete(cliauto);
fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
return -1;
}
/*
* Client auto robot start
* The robot will automatically connect to the specified server and maintain the connection.
* At this time, the developer only needs to focus on the business.
*/
if (!vsoa_client_auto_start(cliauto, "vsoa://vsoa_stream_server",
NULL, NULL, 0, 1000, 1000, 1000)) {
vsoa_client_auto_delete(cliauto);
fprintf(stderr, "Client auto start failed: %s (%d)\n", strerror(errno), errno);
return -1;
}
while (true) {
sleep(1);
}
return 0;
}
/* Server name to connect */
var vsoa = require('vsoa');
/* Server name to connect */
const SERVER_NAME = 'vsoa_stream_server';
/* Client */
var client = new vsoa.Client();
/*
* Listen connect event
*/
client.on('connect', function(info) {
console.info('Connected to VSOA stream server.');
client.call('/read', function (error, payload, tunid) {
if (tunid) {
var stream = client.createStream(tunid);
stream.on('data', (chunk) => {
console.log('Received ' + chunk.length + ' bytes from VSOA stream: ' +
chunk.toString('hex'));
});
stream.on('timeout', () => {
console.log('Stream connect error!');
});
}
}, 2000);
});
client.connect(`vsoa://${SERVER_NAME}`, error => {
if (error) {
console.warn('Connect error:', error.message);
}
});
/*
* Event loop
*/
require('iosched').forever();
package example;
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
public class streamClient {
private static final String SERVER_NAME = "vsoa_stream_server";
private static final String PASSWORD = null;
public static Client client;
public static void main(String[] args) {
/*
* Initialize client
*/
client = new Client(new ClientOption(PASSWORD, 6000, 4000, 3, false)) {
@Override
public void onError(com.acoinfo.vsoa.Error error) {
System.out.println("Client error:" + error.message);
}
@Override
public void onConnected(String info) {
System.out.println("Connected to VSOA stream server.");
}
};
if (!client.connect("vsoa://" + SERVER_NAME, null, Constant.VSOA_DEF_CONN_TIMEOUT)) {
System.out.println("Connected with server failed");
return;
}
/*
* Call /read
*/
client.call("/read", Request.VSOA_METHOD_GET, null, new CBCall() {
@Override
public void callback(com.acoinfo.vsoa.Error error, Payload payload, int tunid) {
if (error != null) {
System.out.println("RPC call error:" + error.message);
} else {
Stream stream = client.createStream(tunid, Stream.DEF_TIMEOUT);
if (stream != null) {
byte[] data;
while ((data = stream.read()) != null) {
String strData = "";
for (int i = 0; i < data.length; i++) {
strData += String.format("0x%02x ", data[i]);
}
System.out.println("Received " + data.length +
" bytes from VSOA stream: " + strData);
}
}
}
}
}, 5000);
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package main
import (
"bytes"
"errors"
"fmt"
"io"
"gitee.com/sylixos/go-vsoa/client"
"gitee.com/sylixos/go-vsoa/protocol"
)
func VsoaStreamCall() {
// Set client options
clientOption := client.Option{
Password: "123456",
}
var StreamTunID uint16
streamDone := make(chan int)
// Create a new client instance
c := client.NewClient(clientOption)
// Connect to the VSOA server
_, err := c.Connect("vsoa", "localhost:3001")
if err != nil {
fmt.Println(err)
}
// Close the connection when the function exits
defer c.Close()
req := protocol.NewMessage()
// Send a request to the server using the "/read" method
reply, err := c.Call("/read", protocol.TypeRPC, protocol.RpcMethodGet, req)
if err != nil {
// Check if the error is due to an invalid URL
if err == errors.New(protocol.StatusText(protocol.StatusInvalidUrl)) {
fmt.Println("Pass: Invalid URL")
} else {
fmt.Println(err)
}
return
} else {
StreamTunID = reply.TunID()
fmt.Println("Seq:", reply.SeqNo(), "Stream TunID:", StreamTunID)
}
receiveBuf := bytes.NewBufferString("")
// Create a new client stream
cs, err := c.NewClientStream(StreamTunID)
if err != nil {
fmt.Println(err)
return
} else {
go func() {
buf := make([]byte, 32*1024)
for {
n, err := cs.Read(buf)
if err != nil {
// EOF means stream closed
if err == io.EOF {
break
} else {
fmt.Println(err)
break
}
}
receiveBuf.Write(buf[:n])
fmt.Println("stream receiveBuf:", receiveBuf.Bytes())
// In this example, we just receive little data from server, so we just stop here
goto STOP
}
STOP:
cs.StopClientStream()
streamDone <- 1
}()
}
// Don't close the stream util the stream goroutine is done
<-streamDone
}
func main() {
VsoaStreamCall()
}
from vsoa.client import Client, Stream
import vsoa, sys
# Server name to connect
SERVER_NAME = 'vsoa_stream_server'
# Create client
client = Client()
# Listen connect event
def onconnect(client: Client, conn: bool, info: str | dict | list):
print('Connected to VSOA stream server.')
def onreply(client: Client, header: vsoa.Header, payload: vsoa.Payload):
def onlink(stream: Stream, conn: bool):
pass
def ondata(stream: Stream, data: bytes):
print('Received {} bytes from VSOA stream: {}'.format(len(data), data.hex))
if header and header.tunid > 0:
stream = client.create_stream(header.tunid, onlink, ondata)
client.call(url='/read', callback=onreply, timeout=2000)
client.onconnect = onconnect
if err := client.connect('vsoa://{}'.format(SERVER_NAME)):
print('Connect error: {}'.format(err))
sys.exit(-1)
# Event loop
client.run()
说明:
- 在 Node.js 环境中,不需要最后的事件循环操作,需要去除
require('iosched').forever();
。- 在该范例中,因为使用了独立的位置服务或 ECSM 集成的位置服务,所以客户端可以自动定位到指定的服务位置,不需要指定任何 IP 和端口信息。
功能示例
在开发示例章节中提供了使用 VSOA Stream 功能开发文件发送和接收的客户端与服务端示例,详细实现流程和功能代码示例见 开发示例。
代码调试
VSOA 提供的代码诊断工具 vcx 可用于调试 VSOA Stream 功能,下面以调试开发示例中的代码为例介绍 vcx 的使用方法:
接收文件
vcx -r ./read.dat vsoa://vsoa_stream_server/read
-r
使用 VSOA Stream 接收文件。./read.dat
通过 VSOA Stream 接收的数据存放在此文件中。
发送文件
vcx -w ./read.dat vsoa://vsoa_stream_server/read
-w
使用 VSOA Stream 发送文件。./read.dat
此文件的数据通过 VSOA Stream 发送至服务端。