服务端开发
本节介绍使用 VSOA Stream 进行服务端开发的方法。
使用前提
要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建的流程如下:
- 服务端创建 VSOA Server Stream,并获得通道 ID(`tunid`)。
- 服务端将通道 ID 返回客户端。
- 客户端根据此通道 ID 创建 VSOA Client Stream。
VSOA Stream 创建后,便可进行数据通讯。在数据通讯完成时,可以关闭 VSOA Stream。
常用接口
vsoa_server_stream_t *vsoa_server_stream_create(vsoa_server_t *server);
int vsoa_server_stream_accept(vsoa_server_stream_t *stream, struct sockaddr *addr,
socklen_t *namelen, int keepalive);
void vsoa_server_stream_close(vsoa_server_stream_t *stream);
server.createStream([timeout]);
Stream server.createStream(int timeout);
int stream.getTunid();
func (s *Server) NewServerStream(res *protocol.Message) (ss *ServerStream, err error)
func (ss *ServerStream) ServeListener(pushBuf, receiveBuf *bytes.Buffer)
server.create_stream(onlink: callable, ondata: callable, timeout: float = 5.0) -> ServerStream
说明:
VSOA Stream 服务端的详细接口说明可参考以下手册:
- C 版本可参考 VSOA C 语言编程手册。
- JavaScript 版本可参考 VSOA JavaScript 编程手册。
- Java 版本可参考 VSOA Java 编程手册。
- Golang 版本可参考 VSOA Golang 编程手册。
- Python 版本可参考 VSOA Python 编程手册。
开发示例
在本示例中,服务端执行了以下流程:
- 创建 VSOA 服务器 `vsoa_stream_server`。
- 监听 RPC URL `/read`,当 URL 被请求时,新建一个 VSOA Server Stream。
- 将 VSOA Server Stream 的通道 ID,作为 RPC 请求的响应,返回给客户端。
- 当客户端使用通道 ID 创建 VSOA Client Stream,并成功连接时,服务端向客户端发送 10 字节的数据。
运行成功时,服务端有如下输出:
- 在创建 VSOA 服务器成功后,打印 `Started VSOA stream server.`。
- 在创建 VSOA Server Stream 成功后,打印 `Replied client with tunnel ID:`,后面带有返回给客户端的通道 ID。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_parser.h"
#include "vsoa_server.h"
/* Fixed 10-byte payload pushed to every client that connects to a stream. */
static uint8_t data[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

/*
 * Thread entry point that serves a single server stream.
 *
 * arg is the vsoa_server_stream_t pointer handed over by the creator.
 * The thread waits up to 5 seconds for the stream's listen socket to become
 * readable, accepts the client, sends the `data` payload once and then closes
 * the stream. The stream is closed on every path, including timeout and
 * accept failure. Always returns NULL.
 */
static void *thread_write_stream(void *arg)
{
    vsoa_server_stream_t *stream = (vsoa_server_stream_t *)arg;
    struct timeval wait_limit;
    fd_set readable;
    int ready;
    int cli_fd;
    ssize_t nbytes;

    FD_ZERO(&readable);
    FD_SET(stream->listenfd, &readable);

    /* give the client 5s to connect to the stream */
    wait_limit.tv_sec  = 5;
    wait_limit.tv_usec = 0;

    ready = select(stream->listenfd + 1, &readable, NULL, NULL, &wait_limit);
    if (ready > 0) {
        cli_fd = vsoa_server_stream_accept(stream, NULL, NULL, 0);
        if (cli_fd >= 0) {
            nbytes = send(cli_fd, data, sizeof(data), 0);
            if (nbytes != sizeof(data)) {
                fprintf(stderr, "Cannot write stream.\n");
            }
        }
    } else {
        /* select() timed out or failed: nobody connected */
        fprintf(stderr, "Stream has no client connected.\n");
    }

    vsoa_server_stream_close(stream);
    return NULL;
}
/*
 * RPC handler for the '/read' URL.
 *
 * Creates a server stream, starts a detached worker thread
 * (thread_write_stream) that serves it, and replies to the client with the
 * stream's tunnel ID. If the stream or the worker thread cannot be created,
 * the client receives a VSOA_STATUS_NO_MEMORY reply with tunnel ID 0.
 *
 * arg, url and payload are not used by this handler.
 */
static void command_read(void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                         vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
    pthread_t thread;
    vsoa_server_stream_t *stream;
    int tunid;

    // create vsoa server stream
    stream = vsoa_server_stream_create(server);
    if (!stream) {
        fprintf(stderr, "Cannot create server stream.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    // Read the tunnel ID BEFORE the worker thread starts: the worker calls
    // vsoa_server_stream_close(stream) when it is done, so `stream` must not
    // be touched from this thread afterwards.
    // Kept as int to match the %d format below; the reply call converts it to
    // its own parameter type exactly as the original stream->tunid argument.
    tunid = stream->tunid;

    // create dedicated thread to write stream
    if (pthread_create(&thread, NULL, thread_write_stream, stream) != 0) {
        fprintf(stderr, "Cannot create stream writer thread.\n");
        // no worker owns the stream, so close it here
        vsoa_server_stream_close(stream);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    // nobody joins the worker; detach it so its resources are reclaimed on exit
    pthread_detach(thread);

    // make a client reply
    vsoa_server_cli_reply(server, cid, 0, seqno, tunid, NULL);

    // print server reply
    printf("Replied client with tunnel ID: %d\n", tunid);
}
/*
 * Resolve the TCP port the server listens on.
 *
 * Reads the 'VSOA_AUTO_PORT' environment variable through getenv(), so the
 * environment strings are never modified. Returns `fallback` when the
 * variable is unset, empty, not a plain decimal number, or outside 0..65535.
 */
static uint16_t server_port_from_env(uint16_t fallback)
{
    const char *value = getenv("VSOA_AUTO_PORT");
    char *end;
    long port;

    if (value == NULL || *value == '\0') {
        return fallback;
    }

    port = strtol(value, &end, 10);
    /* strtol() saturates on overflow, so the range check also rejects that */
    if (end == value || *end != '\0' || port < 0 || port > 65535) {
        fprintf(stderr, "Ignoring invalid VSOA_AUTO_PORT value '%s'.\n", value);
        return fallback;
    }

    return (uint16_t)port;
}

/*
 * Program entry: create the 'vsoa_stream_server' VSOA server, register the
 * '/read' RPC handler, start listening on INADDR_ANY at the configured port
 * (default 3001, overridable through 'VSOA_AUTO_PORT') and run the event loop.
 *
 * Returns -1 if the server cannot be created or started; otherwise the event
 * loop never exits.
 */
int main(int argc, char *argv[])
{
    vsoa_server_t *server;
    vsoa_url_t url;
    uint16_t server_port;
    struct sockaddr_in addr;
    fd_set fds;
    int max_fd;
    int count;
    struct timespec timeout;

    (void)argc;
    (void)argv;

    // get server port from 'VSOA_AUTO_PORT' environment variable if it exists
    server_port = server_port_from_env(3001);

    // set server listen address and port
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    // create vsoa server
    server = vsoa_server_create("{\"name\":\"vsoa_stream_server\"}");
    if (!server) {
        printf("Cannot create VSOA stream server.\n");
        return -1;
    }

    // set a rpc url '/read' for stream service
    url.url = "/read";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_read, NULL);

    // start vsoa server
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        printf("Cannot start VSOA stream server.\n");
        vsoa_server_close(server);
        return -1;
    }
    printf("Started VSOA stream server.\n");

    // set timeout to 1s
    timeout.tv_sec = 1;
    timeout.tv_nsec = 0;

    // server event loop: collect the server's descriptors, wait up to 1s for
    // activity and hand ready descriptors back to the server
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);
        count = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return 0; // not reached
}
var vsoa = require('vsoa');
var socket = require('socket');
var process = require('process');
/*
 * Create the VSOA server object; its info.name is 'vsoa_stream_server'.
 */
var server = new vsoa.Server({
info: {
name: 'vsoa_stream_server'
}
});
/*
 * Client event handler: when `link` is truthy, call setKeepAlive(5000) on
 * the client.
 * NOTE(review): `link` presumably distinguishes connect from disconnect and
 * 5000 is presumably a keep-alive interval in milliseconds - confirm against
 * the VSOA JavaScript manual.
 */
server.onclient = function(client, link) {
if (link) {
client.setKeepAlive(5000);
}
};
/*
 * '/read' RPC callback.
 * For every request: create a server stream, reply to the client with the
 * stream's tunnel ID, then send the 10-byte `buffer` and close the stream once
 * the stream emits 'connect'. A 'timeout' event on the stream is only logged.
 */
// Payload written to every connected stream: the bytes 0..9.
var buffer = new Buffer([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
server.on('/read', function (cli, request, payload) {
// createStream() is called without a timeout argument here.
var stream = server.createStream();
// Reply with status 0 and the tunnel ID the client needs to open its stream.
cli.reply(0, request.seqno, stream.tunid);
console.log('Replied client stream tunnel ID: ' + stream.tunid);
stream.on('connect', () => {
// One-shot transfer: write the payload, then close the stream.
stream.write(buffer);
stream.close();
});
stream.on('timeout', () => {
console.log('No client connect stream!');
});
});
/*
 * Get the server port from the 'VSOA_AUTO_PORT' environment variable if it
 * exists; otherwise keep the default port 3001.
 */
var server_port = 3001;
const auto_port = process.getenv('VSOA_AUTO_PORT');
if (auto_port) {
server_port = Number(auto_port);
}
/*
 * Start the server on 127.0.0.1 at the selected port (AF_INET).
 */
server.start({
domain: socket.AF_INET, addr: '127.0.0.1', port:server_port
});
/*
 * Run the event loop via the 'iosched' module.
 */
require('iosched').forever();
package example;
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
import com.acoinfo.vsoa.Server.CliHandle;
/**
 * Example VSOA stream server.
 *
 * <p>Starts a VSOA server on {@code 0.0.0.0} (port 3001 by default, or the
 * value of the {@code VSOA_AUTO_PORT} environment variable) and registers a
 * {@code /read} RPC. Each {@code /read} request creates a server stream,
 * replies with its tunnel ID and, once the client has connected to the stream,
 * sends 10 bytes of data before closing the stream.
 */
public class streamServer {
    private static final String SERVER_NAME = "vsoa_stream_server";
    private static final String SERVER_INFO = "\"VSOA stream server\"";
    private static final String PASSWORD    = null;
    private static final String SERVER_ADDR = "0.0.0.0";
    private static int          SERVER_PORT = 3001;

    static Server server;

    /**
     * Program entry point: configures, starts and keeps the server alive.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        /*
         * Get server port from 'VSOA_AUTO_PORT' environment variable
         * if it exists. A value that is not a valid integer is ignored so
         * the default port stays in effect instead of aborting the program.
         */
        String auto_port = System.getenv("VSOA_AUTO_PORT");
        if (auto_port != null) {
            try {
                SERVER_PORT = Integer.parseInt(auto_port.trim());
            } catch (NumberFormatException e) {
                System.err.println("Ignoring invalid VSOA_AUTO_PORT value: " + auto_port);
            }
        }

        /*
         * Initialize server
         */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);

            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    }
                }
            };

            /*
             * Start server
             */
            if (!server.start(address, null)) {
                System.err.println("Cannot start VSOA stream server.");
                return;
            }
            System.out.println("Started VSOA stream server.");

            /*
             * /read Callback
             */
            server.on("/read", new CBOnCall() {
                @Override
                public boolean Callback(String url, CliHandle client,
                                        Request req, Payload payload) {
                    byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

                    Stream stream = server.createStream(Stream.DEF_TIMEOUT);
                    client.reply(Constant.SUCCESS, req.seqno, null, stream.getTunid());
                    System.out.println("Replied client with tunnel ID: " + stream.getTunid());

                    /*
                     * Only write when a client actually connected; after a
                     * timeout there is no peer to receive the data.
                     */
                    if (stream.waitForConnect()) {
                        stream.write(data);
                    } else {
                        System.out.println("Stream timeout");
                    }

                    /* The stream is closed on both paths. */
                    stream.close();
                    return true;
                }
            });
        } catch (Exception e1) {
            e1.printStackTrace();
            return;
        }

        /*
         * Keep the main thread alive while the server runs.
         */
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package main
import (
"bytes"
"time"
"gitee.com/sylixos/go-vsoa/protocol"
"gitee.com/sylixos/go-vsoa/server"
)
// startServer creates the golang VSOA stream server, registers the "/read"
// RPC handler and starts serving on 127.0.0.1:3001 in a background goroutine.
//
// Every "/read" request creates a server stream bound to the response message
// and serves it in its own goroutine, pushing the bytes 0..9 to the client.
func startServer() {
	// Initialize server options
	serverOption := server.Option{
		Password: "123456",
	}

	// Create a new server instance
	s := server.NewServer("golang VSOA stream server", serverOption)

	// Register URL and handler function
	h := func(req, res *protocol.Message) {
		// Create a new server stream
		ss, err := s.NewServerStream(res)
		if err != nil {
			// Without a stream there is nothing to serve; bail out instead of
			// calling a method on a stream that was never created.
			return
		}

		// Prepare push and receive buffers
		data := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
		pushBuf := bytes.NewBuffer(data)
		receiveBuf := bytes.NewBufferString("")

		// Serve the stream in its own goroutine so the handler returns at once
		go ss.ServeListener(pushBuf, receiveBuf)
	}
	s.On("/read", protocol.RpcMethodGet, h)

	// Start the server in a goroutine
	go func() {
		_ = s.Serve("127.0.0.1:3001")
	}()
}
// main starts the stream server and then keeps the process alive, waking up
// once per second, because the server itself runs in background goroutines.
func main() {
	const idleInterval = time.Second

	startServer()

	for {
		time.Sleep(idleInterval)
	}
}
from vsoa.server import Server, Client, Stream
from vsoa.interface import Request, Payload
import os
# Instantiate the VSOA server named 'vsoa_stream_server'
server = Server('vsoa_stream_server')


def onclient(cli: Client, conn: bool):
    """Client event hook: call ``cli.keepalive(5)`` when ``conn`` is truthy.

    Nothing is done when ``conn`` is falsy.
    """
    if not conn:
        return
    cli.keepalive(5)


# Install the hook on the server
server.onclient = onclient
# Handler for the '/read' RPC
@server.command('/read')
def read(cli: Client, request: Request, payload: Payload):
    """Create a server stream and reply to the caller with its tunnel ID.

    The stream is created with two callbacks: ``onlink`` sends the bytes 0..9
    when it is invoked with a truthy ``conn``; ``ondata`` prints the length of
    each received chunk.
    """
    def ondata(stream: Stream, data: bytes):
        print('Received: ', len(data))

    def onlink(stream: Stream, conn: bool):
        if not conn:
            return
        stream.send(bytes(range(10)))

    new_stream = server.create_stream(onlink, ondata)
    cli.reply(request.seqno, tunid=new_stream.tunid)
# Get the server port from the 'VSOA_AUTO_PORT' environment variable if it
# exists (and is non-empty); otherwise use the default port 3001
auto_port = os.environ.get('VSOA_AUTO_PORT')
server_port = int(auto_port) if auto_port else 3001

# Run the server on 127.0.0.1 at the selected port
server.run('127.0.0.1', server_port)
说明:
- 运行示例前,需要在 VSOA Position 服务中添加 `vsoa_stream_server` 的解析实现。
- 在 Node.js 环境中,需要替换为 `process.env.VSOA_AUTO_PORT` 来获取 `VSOA_AUTO_PORT` 环境变量。
- 在 Node.js 环境中,不需要最后的事件循环操作,需要去除 `require('iosched').forever()`。
- 在 Node.js 环境中,`socket.AF_INET` 应为 `vsoa.AF_INET`,详情可见 https://www.npmjs.com/package/vsoa 。
- 在 Node.js 环境中,因 `Buffer()` 的接口已经弃用,所以需要将初始化换为 `Buffer.from()`。
功能示例
在开发示例章节中提供了使用 VSOA Stream 功能开发文件发送和接收的客户端与服务端示例,详细实现流程和功能代码示例见 开发示例。