服务端开发

更新时间:
2024-12-19

服务端开发

本节介绍使用 VSOA Stream 进行服务端开发的方法。

使用前提

要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建的流程如下:

  1. 服务端创建 VSOA Server Stream,并获得通道 ID (tunid)。
  2. 服务端将通道 ID 返回客户端。
  3. 客户端根据此通道 ID 创建 VSOA Client Stream。

VSOA Stream 创建后,便可进行数据通讯。在数据通讯完成时,可以关闭 VSOA Stream。

常用接口

vsoa_server_stream_t *vsoa_server_stream_create(vsoa_server_t *server);

int vsoa_server_stream_accept(vsoa_server_stream_t *stream, struct sockaddr *addr,
                              socklen_t *namelen, int keepalive);

void vsoa_server_stream_close(vsoa_server_stream_t *stream);
server.createStream([timeout]);
Stream server.createStream(int timeout);

int stream.getTunid();
func (s *Server) NewServerStream(res *protocol.Message) (ss *ServerStream, err error)
func (ss *ServerStream) ServeListener(pushBuf, receiveBuf *bytes.Buffer)
server.create_stream(onlink: callable, ondata: callable, timeout: float = 5.0) -> ServerStream

说明:

VSOA Stream 服务端的详细接口说明可参考以下手册:

开发示例

在本示例中,服务端执行了以下流程:

  1. 创建 VSOA 服务器 vsoa_stream_server。
  2. 监听 RPC URL /read,当 URL 被请求时,新建一个 VSOA Server Stream。
  3. 将 VSOA Server Stream 的通道 ID,作为 RPC 请求的响应,返回给客户端。
  4. 当客户端使用通道 ID 创建 VSOA Client Stream,并成功连接时,服务端向客户端发送 10 字节的数据。

运行成功时,服务端有如下输出:

  1. 在创建 VSOA 服务器成功后,打印 Started VSOA stream server.
  2. 在创建 VSOA Server Stream 成功后,打印 Replied client with tunnel ID:,后面带有返回给客户端的通道 ID。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_parser.h"
#include "vsoa_server.h"

// Fixed 10-byte payload sent to a client once it connects to the stream.
// Declared const because it is only ever read.
static const uint8_t data[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

// Stream writer thread.
// Waits up to 5 seconds for activity on the stream's listenfd, accepts the
// client, sends it the whole `data` payload and finally closes the stream.
// `arg` is the vsoa_server_stream_t pointer handed to pthread_create().
// Always returns NULL; the stream is closed on every path.
static void *thread_write_stream(void *arg)
{
    vsoa_server_stream_t   *srv_stream = arg;
    struct timeval          wait = { .tv_sec = 5, .tv_usec = 0 };
    fd_set                  readable;
    int                     peer;

    FD_ZERO(&readable);
    FD_SET(srv_stream->listenfd, &readable);

    if (select(srv_stream->listenfd + 1, &readable, NULL, NULL, &wait) > 0) {
        peer = vsoa_server_stream_accept(srv_stream, NULL, NULL, 0);

        // a failed accept is skipped silently; a short or failed send is reported
        if (peer >= 0 && send(peer, data, sizeof(data), 0) != sizeof(data)) {
            fprintf(stderr, "Cannot write stream.\n");
        }
    } else {
        // select timed out or failed
        fprintf(stderr, "Stream has no client connected.\n");
    }

    vsoa_server_stream_close(srv_stream);

    return NULL;
}

// RPC handler for the '/read' URL.
// Creates a VSOA server stream, hands it to a dedicated detached writer
// thread, and replies to the caller with the stream's tunnel ID.
// If the stream or the writer thread cannot be created, the caller receives a
// VSOA_STATUS_NO_MEMORY reply with tunnel ID 0 and nothing is left allocated.
// `arg`, `url` and `payload` are not used.
static void command_read(void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                         vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{
    uint32_t                seqno = vsoa_parser_get_seqno(vsoa_hdr);
    pthread_t               thread;
    vsoa_server_stream_t   *stream;
    int                     tunid;

    (void)arg;
    (void)url;
    (void)payload;

    // create vsoa server stream
    stream = vsoa_server_stream_create(server);
    if (!stream) {
        fprintf(stderr, "Cannot create server stream.\n");
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    // Copy the tunnel ID now: once the writer thread is running it owns the
    // stream and may close it, so the stream must not be touched afterwards.
    tunid = stream->tunid;

    // create dedicated thread to write stream
    if (pthread_create(&thread, NULL, thread_write_stream, stream) != 0) {
        fprintf(stderr, "Cannot create stream thread.\n");
        vsoa_server_stream_close(stream);
        vsoa_server_cli_reply(server, cid, VSOA_STATUS_NO_MEMORY, seqno, 0, NULL);
        return;
    }

    // nobody joins the writer thread, so detach it to release its resources on exit
    pthread_detach(thread);

    // make a client reply
    vsoa_server_cli_reply(server, cid, 0, seqno, tunid, NULL);

    // print server reply
    printf("Replied client with tunnel ID: %d\n", tunid);
}

// Entry point: starts the 'vsoa_stream_server' VSOA server and runs its
// event loop.
// The listen port defaults to 3001. It is overridden by the 'VSOA_AUTO_PORT'
// environment variable when that holds a decimal port number in 1-65535;
// any other value is reported and ignored.
// Returns -1 if the server cannot be created or started; otherwise the event
// loop never exits.
int main(int argc, char *argv[])
{
    vsoa_server_t      *server;
    vsoa_url_t          url;
    uint16_t            server_port = 3001;
    struct sockaddr_in  addr;

    fd_set              fds;
    int                 max_fd;
    int                 count;
    struct timespec     timeout;

    const char         *auto_port;

    (void)argc;
    (void)argv;

    // get server port from 'VSOA_AUTO_PORT' environment variable
    // (getenv() leaves the environment block untouched, unlike tokenizing
    // the raw "NAME=value" entry in place)
    auto_port = getenv("VSOA_AUTO_PORT");
    if (auto_port && *auto_port) {
        char *end;
        long  port = strtol(auto_port, &end, 10);

        if (end != auto_port && *end == '\0' && port > 0 && port <= 65535) {
            server_port = (uint16_t)port;
        } else {
            fprintf(stderr, "Ignoring invalid VSOA_AUTO_PORT value: %s\n", auto_port);
        }
    }

    // set server listen address and port
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // create vsoa server
    server = vsoa_server_create("{\"name\":\"vsoa_stream_server\"}");
    if (!server) {
        printf("Cannot create VSOA stream server.\n");
        return -1;
    }

    // set a rpc url '/read' for stream service
    url.url = "/read";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_read, NULL);

    // start vsoa server
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(addr))) {
        printf("Cannot start VSOA stream server.\n");
        vsoa_server_close(server);
        return -1;
    }

    printf("Started VSOA stream server.\n");

    // set timeout to 1s
    timeout.tv_sec  = 1;
    timeout.tv_nsec = 0;

    // server event loop: wait up to 1s for activity, then let the server
    // process whichever descriptors became readable
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        count = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (count > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return 0;
}
var vsoa    = require('vsoa');
var socket  = require('socket');
var process = require('process');

/*
 * Server instance, announced under the name 'vsoa_stream_server'
 */
var server = new vsoa.Server({ info: { name: 'vsoa_stream_server' } });

/*
 * Client event: when `link` is truthy, call setKeepAlive(5000) on the client
 * (presumably a keep-alive interval in milliseconds -- confirm against the API)
 */
function handleClient(client, link) {
    if (!link) {
        return;
    }
    client.setKeepAlive(5000);
}

server.onclient = handleClient;

/*
 * Payload written to every stream client
 */
var buffer = new Buffer([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

/*
 * '/read' callback: create a stream, reply with its tunnel ID, then write the
 * payload and close the stream as soon as the client connects to it
 */
function handleRead(cli, request, payload) {
    var stream = server.createStream();

    cli.reply(0, request.seqno, stream.tunid);
    console.log('Replied client stream tunnel ID: ' + stream.tunid);

    stream.on('connect', () => {
        stream.write(buffer);
        stream.close();
    });

    stream.on('timeout', () => {
        console.log('No client connect stream!');
    });
}

server.on('/read', handleRead);

/*
 * Get server port from 'VSOA_AUTO_PORT' environment variable if it exists,
 * otherwise fall back to 3001
 */
const auto_port = process.getenv('VSOA_AUTO_PORT');
var server_port = auto_port ? Number(auto_port) : 3001;

/*
 * Server start
 */
server.start({
    domain: socket.AF_INET,
    addr: '127.0.0.1',
    port: server_port
});

/*
 * Event loop
 */
require('iosched').forever();
package example;

import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
import com.acoinfo.vsoa.Server.CliHandle;

/**
 * Example VSOA stream server.
 *
 * <p>Starts a VSOA server on {@code 0.0.0.0:3001} (or on the port given by the
 * {@code VSOA_AUTO_PORT} environment variable) and registers a {@code /read}
 * RPC handler. Each call creates a stream, replies with its tunnel ID and,
 * once the client has connected to the stream, writes 10 bytes and closes it.
 */
public class streamServer {
    private static final String SERVER_NAME   = "vsoa_stream_server";
    private static final String SERVER_INFO   = "\"VSOA stream server\"";
    private static final String PASSWORD      = null;
    private static final String SERVER_ADDR   = "0.0.0.0";
    private static int          SERVER_PORT   = 3001;

    static Server server;

    /**
     * Program entry point. Returns early if the server cannot be initialized
     * or started; otherwise keeps the main thread alive forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        /*
         * Get server port from 'VSOA_AUTO_PORT' environment variable
         * if it exists; a value that is not an integer is reported and the
         * default port is kept instead of aborting with an exception
         */
        String auto_port = System.getenv("VSOA_AUTO_PORT");
        if (auto_port != null) {
            try {
                SERVER_PORT = Integer.parseInt(auto_port.trim());
            } catch (NumberFormatException e) {
                System.err.println("Ignoring invalid VSOA_AUTO_PORT value: " + auto_port);
            }
        }

        /*
         * Initialize server
         */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);
            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    }
                }
            };

            /*
             * Start server
             */
            if (!server.start(address, null)) {
                return;
            }
            System.out.println("Started VSOA stream server.");

            /*
             * /read Callback
             */
            server.on("/read", new CBOnCall() {
                @Override
                public boolean Callback(String url, CliHandle client,
                                        Request req, Payload payload) {
                    byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
                    Stream stream = server.createStream(Stream.DEF_TIMEOUT);
                    client.reply(Constant.SUCCESS, req.seqno, null, stream.getTunid());
                    System.out.println("Replied client with tunnel ID: " + stream.getTunid());

                    /*
                     * No client connected in time: there is nobody to write
                     * to, so just release the stream. The RPC reply has
                     * already been sent, hence the call still counts as
                     * handled.
                     */
                    if (!stream.waitForConnect()) {
                        System.out.println("Stream timeout");
                        stream.close();
                        return true;
                    }

                    stream.write(data);
                    stream.close();

                    return true;
                }
            });


        } catch (Exception e1) {
            e1.printStackTrace();
            return ;
        }

        /*
         * Keep the main thread alive
         */
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package main

import (
    "bytes"
    "time"

    "gitee.com/sylixos/go-vsoa/protocol"
    "gitee.com/sylixos/go-vsoa/server"
)

// startServer initializes and starts the golang server.
// startServer creates the golang VSOA stream server, registers the "/read"
// RPC handler and starts serving on 127.0.0.1:3001 in a background goroutine.
// It returns immediately; the caller must keep the process alive.
func startServer() {
    // Initialize server options
    serverOption := server.Option{
        Password: "123456",
    }

    // Create a new server instance
    s := server.NewServer("golang VSOA stream server", serverOption)

    // Register URL and handler function
    h := func(req, res *protocol.Message) {
        // Create a new server stream for this request
        ss, err := s.NewServerStream(res)
        if err != nil {
            // Without a stream there is nothing to serve; stop here rather
            // than calling methods on a stream that was never created.
            return
        }

        // Prepare push and receive buffers: pushBuf holds the 10 bytes to
        // send, receiveBuf starts empty
        data := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
        pushBuf := bytes.NewBuffer(data)
        receiveBuf := bytes.NewBufferString("")

        // Serve the stream in its own goroutine so the handler returns
        // without waiting for the stream to finish
        go ss.ServeListener(pushBuf, receiveBuf)
    }
    s.On("/read", protocol.RpcMethodGet, h)

    // Start the server in a goroutine; the Serve error is deliberately ignored
    go func() {
        _ = s.Serve("127.0.0.1:3001")
    }()
}

// main launches the stream server and then keeps the process alive by
// sleeping on the main goroutine in one-second slices.
func main() {
    startServer()

    const idleInterval = time.Second
    for {
        time.Sleep(idleInterval)
    }
}
from vsoa.server import Server, Client, Stream
from vsoa.interface import Request, Payload
import os

# Server instance named 'vsoa_stream_server'
server = Server('vsoa_stream_server')


def onclient(cli: Client, conn: bool):
    """Client event hook: when ``conn`` is true, call ``cli.keepalive(5)``."""
    if not conn:
        return
    cli.keepalive(5)


# Register the hook on the server
server.onclient = onclient

# '/read' RPC handler
@server.command('/read')
def read(cli: Client, request: Request, payload: Payload):
    """Create a stream for the caller and reply with its tunnel ID.

    When the stream reports a connection, the 10 bytes 0..9 are sent on it;
    the length of any data received on the stream is printed.
    """
    def handle_link(stream: Stream, conn: bool):
        if not conn:
            return
        stream.send(bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))

    def handle_data(stream: Stream, data: bytes):
        print('Received: ', len(data))

    new_stream = server.create_stream(handle_link, handle_data)
    cli.reply(request.seqno, tunid=new_stream.tunid)

# Get server port from 'VSOA_AUTO_PORT' environment variable if it exists,
# otherwise use 3001
auto_port = os.environ.get('VSOA_AUTO_PORT')
server_port = int(auto_port) if auto_port else 3001

# Start the server on 127.0.0.1 using the port selected above
server.run('127.0.0.1', server_port)

说明:

  • 运行示例前,需要在 VSOA Position 服务中添加 vsoa_stream_server 的解析实现。
  • 在 Node.js 环境中,需要替换为 process.env.VSOA_AUTO_PORT 来获取 VSOA_AUTO_PORT 环境变量。
  • 在 Node.js 环境中,不需要最后的事件循环操作,需要去除 require('iosched').forever()
  • 在 Node.js 环境中,socket.AF_INET 应为 vsoa.AF_INET,详情可见 https://www.npmjs.com/package/vsoa 。
  • 在 Node.js 环境中,因 Buffer() 的接口已经弃用,所以需要将初始化换为 Buffer.from()

功能示例

在开发示例章节中提供了使用 VSOA Stream 功能开发文件发送和接收的客户端与服务端示例,详细实现流程和功能代码示例见 开发示例

文档内容是否对您有所帮助?
有帮助
没帮助