服务端开发

更新时间:
2024-12-24

服务端开发

本节内容介绍 VSOA 服务端使用 QoS 的方法。

开发须知

VSOA 提供指定远程客户端优先级的接口,允许的优先级范围为 0 - 5 ,0 为最低优先级,5 为最高优先级。优先级控制 publish 的发送顺序和底层网络包的服务质量,请根据实际情况设置。

常用接口

C:          bool vsoa_server_cli_priority(vsoa_server_t *server, vsoa_cli_id_t id, int new_prio);
JavaScript: client.priority(3);
Java:       boolean CliHandle.priority(int prio);
Python:     cli.priority(prio: int)

说明:

VSOA QoS 详细接口说明可参考各语言对应的 VSOA 编程手册。

开发示例

在本示例中,服务端执行以下流程:

  1. 服务端监听到客户端连接时,通过对应接口设置客户端优先级。
  2. 后续服务端 publish 时,将会根据优先级控制 publish 的发送顺序。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"

#define MY_SERVER_PASSWD "123456"

static vsoa_server_t *server;

/*
 * Publisher thread entry.
 *
 * Once per second, asks vsoa_server_is_subscribed() whether "/axis" is
 * subscribed; if so, formats the current roll/pitch/yaw counters as a JSON
 * parameter, publishes it, and advances each counter by one. The counters
 * are not advanced on ticks where nothing is published.
 *
 * arg is unused. The loop never terminates, so the trailing return is
 * never reached.
 */
static void *publish_axis_thread (void *arg)
{
    int            roll = 1, pitch = 1, yaw = 1;
    char           json[100];
    vsoa_url_t     topic;
    vsoa_payload_t msg;

    topic.url     = "/axis";
    topic.url_len = strlen(topic.url);

    /* Parameter-only payload: no data part is attached. */
    msg.param    = json;
    msg.data     = NULL;
    msg.data_len = 0;

    for (;;) {
        sleep(1);

        if (vsoa_server_is_subscribed(server, &topic)) {
            msg.param_len = sprintf(json,
                                    "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                    roll, pitch, yaw);
            roll++;
            pitch++;
            yaw++;

            vsoa_server_publish(server, &topic, &msg);
        }
    }

    return (NULL);
}

/*
 * Server on client connect or lost callback.
 *
 * arg     - user argument supplied when the callback was registered (unused)
 * server  - server instance passed in by the caller of this callback
 * id      - identifier of the client the event refers to
 * connect - true for a connect event, false for a disconnect event
 */
static  void vsoa_server_client_connect_callback(void *arg, vsoa_server_t *server, vsoa_cli_id_t id, bool connect)
{
    (void)arg;

    /*
     * Set priority only on a connect event; a disconnect event has no
     * client left to prioritise. Report a failed request instead of
     * silently ignoring the bool result.
     */
    if (connect && !vsoa_server_cli_priority(server, id, 3)) {
        fprintf(stderr, "Can not set priority for client id = %d\n", id);
    }

    printf("client id = %d, status = %s\r\n", id, (connect == true) ? "connect" : "disconnect");
}

/*
 * Program entry.
 *
 * Creates the VSOA server, registers the client connect/disconnect callback,
 * sets the password, starts listening on port 3002 (any local address),
 * spawns the publisher thread and then runs the server input loop forever.
 *
 * Returns -1 if the server cannot be created or started, or if the
 * publisher thread cannot be created. The input loop itself never exits.
 */
int main (int argc, char **argv)
{
    int cnt, max_fd;
    fd_set fds;
    pthread_t id;
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };

    (void)argc;
    (void)argv;

    /* memset() comes from <string.h>, which this file already includes. */
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(3002);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    /*
     * Initialize server
     */
    server = vsoa_server_create("{\"name\":\"axis_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }


    /*
     * Set on client callback
     */
    vsoa_server_on_cli(server, vsoa_server_client_connect_callback, NULL);

    /*
     * If need password
     */
    vsoa_server_passwd(server, MY_SERVER_PASSWD);

    /*
     * Start server
     */
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

    /*
     * Create publish thread. pthread_create() returns a non-zero error
     * number on failure; without the publisher the example is useless, so
     * shut the server down and bail out.
     */
    if (pthread_create(&id, NULL, publish_axis_thread, NULL) != 0) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not create publish thread!\n");
        return  (-1);
    }

    /*
     * Input loop: collect the server descriptors, wait up to one second
     * for activity, and hand ready descriptors back to the server.
     */
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}
var vsoa = require('vsoa');
var socket = require('socket');

/*
 * Create the server object: its info block carries the name 'axis_server'
 * and the passwd option is set to '123456'.
 */
var server = new vsoa.Server({
    info: { name: 'axis_server' },
    passwd: '123456'
});

/*
 * Client event handler.
 *
 * Logs every event together with the client's address. When `link` is
 * truthy, it also sets keep-alive to 5000 and the client priority to 3.
 */
server.onclient = function(client, link) {
    console.log(`Client link ${link} address: ${client.address().addr}`);

    if (!link) {
        return;
    }

    client.setKeepAlive(5000);

    /* Set priority */
    client.priority(3);
};

/*
 * Publish /axis every 1000 ms.
 *
 * Each tick sends the current roll/pitch/yaw counters as the `param`
 * object and then advances every counter by one.
 */
let roll = 0, pitch = 0, yaw = 0;

function publishAxis() {
    const param = { roll: roll++, pitch: pitch++, yaw: yaw++ };

    server.publish('/axis', { param: param });
}

setInterval(publishAxis, 1000);

/*
 * Server start: listen on address '0.0.0.0', port 3002, using the
 * AF_INET domain constant from the 'socket' module.
 */
var listenAddress = {
    domain: socket.AF_INET,
    addr: '0.0.0.0',
    port: 3002
};
server.start(listenAddress);

/*
 * Event loop: hand control to the 'iosched' scheduler.
 */
var iosched = require('iosched');
iosched.forever();
import java.util.Timer;
import java.util.TimerTask;
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;

/**
 * Example VSOA server that sets a priority on each connected client and
 * publishes an incrementing roll/pitch/yaw triple on {@code /axis} once per
 * second.
 */
public class axis_server {
    private static final String SERVER_NAME = "axis_server";
    private static final String SERVER_INFO = "\"java language VSOA server\"";
    private static final String PASSWORD    = "123456";
    private static final String SERVER_ADDR = "0.0.0.0";
    private static final int    SERVER_PORT = 3002;

    static Server server;

    /* Counters are only touched from the publishing timer task. */
    static int roll  = 1;
    static int pitch = 1;
    static int yaw   = 1;

    /**
     * Formats the three axis values as a JSON object string.
     *
     * <p>Members are separated by commas; without them the text is not
     * valid JSON.
     *
     * @param r roll value
     * @param p pitch value
     * @param y yaw value
     * @return JSON text such as {@code {"roll": 1, "pitch": 1, "yaw": 1}}
     */
    private static String axisParam(int r, int p, int y) {
        return "{\"roll\": " + r + ", \"pitch\": " + p + ", \"yaw\": " + y + "}";
    }

    /**
     * Creates and starts the server, schedules the periodic publish task and
     * then keeps the main thread alive.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        /*
         * Initialize server
         */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);
            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    } else {
                        /*
                         * Set priority; report a rejected request instead of
                         * silently dropping the boolean result.
                         */
                        if (!client.priority(3)) {
                            System.err.println("Can not set client priority");
                        }
                    }

                    System.out.println("Client link " + link + " address: " + client.address().toString());
                }
            };

            /*
             * Start server
             */
            if (!server.start(address, null)) {
                System.err.println("Can not start VSOA server!");
                return;
            }
        } catch (Exception e1) {
            e1.printStackTrace();
            return ;
        }

        TimerTask taskInterval = new TimerTask() {
            @Override
            public void run() {
                /*
                 * Publish /axis
                 */
                server.publish("/axis", new Payload(axisParam(roll++, pitch++, yaw++), null));
            }
        };

        /* Fire immediately, then every 1000 ms. */
        Timer interval = new Timer();
        interval.schedule(taskInterval, 0 ,1000);

        /* Keep the main thread parked; publishing runs on the timer thread. */
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                /* Same handling as before: log the interruption and keep waiting. */
                e.printStackTrace();
            }
        }
    }
}
import vsoa
from vsoa.server import Server, Client
from vsoa.timer import Timer

# Create the server object (arguments: 'axis_server' and '123456')
server = Server('axis_server', '123456')

# Client event
def onclient(cli: Client, conn: bool):
    """Log a client event and configure the client when ``conn`` is true.

    Every event is printed together with ``cli.address()``. When ``conn``
    is true, keepalive is set to 5 and the client priority to 3.
    """
    print('Client link {} address: {}'.format(conn, cli.address()))
    if not conn:
        return

    cli.keepalive(5)
    # Set priority
    cli.priority(3)

# Register the handler on the server
server.onclient = onclient

# Publish /axis
timer = Timer()

def static_vars(**kwargs):
    """Return a decorator that attaches each keyword argument to the
    decorated function as an attribute, then returns that same function."""
    def decorate(func):
        for attr_name, initial_value in kwargs.items():
            setattr(func, attr_name, initial_value)
        return func
    return decorate

@static_vars(roll = 0, pitch = 0, yaw = 0)
def publish_axis_task():
    """Publish the current roll/pitch/yaw counters on '/axis', then
    advance each counter by one.

    The counters are stored as attributes on this function (set up by the
    ``static_vars`` decorator). ``vsoa.Payload`` needs the module-level
    ``import vsoa``; without it this call raises NameError.
    """
    task = publish_axis_task
    server.publish('/axis', vsoa.Payload(param={'roll': task.roll,
                                                'pitch': task.pitch,
                                                'yaw': task.yaw}))
    task.roll += 1
    task.pitch += 1
    task.yaw += 1

# Start the periodic publisher: callback publish_axis_task, timeout 0., interval 1.
timer.start(timeout=0., callback=publish_axis_task, interval=1.)

# Server start: listen on '0.0.0.0' like the C, JavaScript and Java samples,
# so clients on other hosts can connect (the previous '127.0.0.1' accepted
# local clients only).
server.run('0.0.0.0', 3002)

说明:

  • 在 Node.js 环境中,事件循环由运行时自动提供,无需显式启动,请删除示例末尾的 require('iosched').forever(); 语句。
  • 在 Node.js 环境中,socket.AF_INET 应为 vsoa.AF_INET,详情可见 https://www.npmjs.com/package/vsoa 。
文档内容是否对您有所帮助?
有帮助
没帮助