服务端开发
本节内容介绍 VSOA 服务端使用 QoS 的方法。
开发须知
VSOA 提供指定远程客户端优先级的接口,允许的优先级范围为 0 - 5 ,0 为最低优先级,5 为最高优先级。优先级控制 publish 的发送顺序和底层网络包的服务质量,请根据实际情况设置。
常用接口
bool vsoa_server_cli_priority(vsoa_server_t *server, vsoa_cli_id_t id, int new_prio);
client.priority(3);
boolean CliHandle.priority(int prio);
cli.priority(prio: int)
说明:
VSOA QoS 详细接口说明可参考以下手册:
- C 版本可参考 VSOA C 语言编程手册。
- JavaScript 版本可参考 VSOA JavaScript 编程手册。
- Java 版本可参考 VSOA Java 编程手册。
- Golang 版本暂不支持 QoS。
- Python 版本可参考 VSOA Python 编程手册。
开发示例
在本示例中,服务端执行以下流程:
- 服务端监听到客户端连接时,通过对应接口设置客户端优先级。
- 后续服务端 publish 时,将会根据优先级控制 publish 的发送顺序。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"
#define MY_SERVER_PASSWD "123456"
static vsoa_server_t *server;
/*
 * Publisher thread entry.
 *
 * Wakes up once per second and, when vsoa_server_is_subscribed() reports
 * that "/axis" is subscribed, publishes a JSON parameter holding the current
 * roll / pitch / yaw counters, then increments each counter.
 *
 * arg is unused. The loop never terminates, so the final return is only
 * there to satisfy the thread entry signature.
 */
static void *publish_axis_thread (void *arg)
{
    char           json[100];                       /* Backing store for the JSON parameter */
    int            roll = 1, pitch = 1, yaw = 1;
    vsoa_url_t     url;
    vsoa_payload_t payload;

    url.url     = "/axis";
    url.url_len = strlen(url.url);

    /* Only the parameter part of the payload is used; no binary data */
    payload.data     = NULL;
    payload.data_len = 0;
    payload.param    = json;

    for (;;) {
        sleep(1);

        if (vsoa_server_is_subscribed(server, &url)) {
            payload.param_len = sprintf(json,
                                        "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                        roll++, pitch++, yaw++);
            vsoa_server_publish(server, &url, &payload);
        }
    }

    return (NULL);
}
/*
 * Server on client connect or lost callback.
 *
 * arg     - user argument registered with vsoa_server_on_cli() (unused here)
 * server  - server instance that raised the event
 * id      - identifier of the client the event refers to
 * connect - true when the client has connected, false when it was lost
 */
static void vsoa_server_client_connect_callback(void *arg, vsoa_server_t *server, vsoa_cli_id_t id, bool connect)
{
    (void)arg;

    /*
     * Set priority (allowed range 0 - 5, 5 is the highest).
     * Only done for a newly connected client: on a disconnect event
     * there is no live connection left to prioritise.
     */
    if (connect) {
        vsoa_server_cli_priority(server, id, 3);
    }

    printf("client id = %d, status = %s\r\n", id, connect ? "connect" : "disconnect");
}
/*
 * Program entry: creates the "axis_server" VSOA server on port 3002,
 * registers the client connect/lost callback, starts the publish thread
 * and then services the server's file descriptors forever.
 *
 * Returns -1 if the server cannot be created or started, or if the
 * publish thread cannot be created. Otherwise it does not return.
 */
int main (int argc, char **argv)
{
    int cnt, max_fd;
    fd_set fds;
    pthread_t id;
    struct sockaddr_in addr;
    struct timespec timeout = { 1, 0 };     /* Wake up at least once per second */

    (void)argc;
    (void)argv;

    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(3002);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    /*
     * Initialize server
     */
    server = vsoa_server_create("{\"name\":\"axis_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return (-1);
    }

    /*
     * Set on client callback
     */
    vsoa_server_on_cli(server, vsoa_server_client_connect_callback, NULL);

    /*
     * If need password
     */
    vsoa_server_passwd(server, MY_SERVER_PASSWD);

    /*
     * Start server
     */
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return (-1);
    }

    /*
     * Create publish thread; without it nothing is ever published,
     * so treat a failure as fatal instead of silently continuing.
     */
    if (pthread_create(&id, NULL, publish_axis_thread, NULL) != 0) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not create publish thread!\n");
        return (-1);
    }

    /*
     * Event loop: collect the server's descriptors, wait for input
     * (or the timeout) and hand ready descriptors back to the server.
     */
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}
var vsoa = require('vsoa');
var socket = require('socket');
/*
 * Create the server with its info (name) and access password
 */
var server = new vsoa.Server({
    info: { name: 'axis_server' },
    passwd: '123456'
});

/*
 * Client event: called with link set when a client connects and
 * cleared when it goes away
 */
server.onclient = function(client, link) {
    console.log(`Client link ${link} address: ${client.address().addr}`);

    if (!link) {
        return;
    }

    client.setKeepAlive(5000);

    /*
     * Set priority
     */
    client.priority(3);
};

/*
 * Publish /axis once per second with incrementing counters
 */
let roll = 0;
let pitch = 0;
let yaw = 0;

setInterval(() => {
    const param = { roll: roll++, pitch: pitch++, yaw: yaw++ };

    server.publish('/axis', { param: param });
}, 1000);

/*
 * Server start
 */
server.start({
    domain: socket.AF_INET,
    addr: '0.0.0.0',
    port: 3002
});

/*
 * Event loop
 */
require('iosched').forever();
import java.util.Timer;
import java.util.TimerTask;
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
/**
 * Example VSOA server that sets a QoS priority on every connected client and
 * publishes an incrementing roll / pitch / yaw parameter on {@code /axis}
 * once per second.
 */
public class axis_server {
    private static final String SERVER_NAME = "axis_server";
    private static final String SERVER_INFO = "\"java language VSOA server\"";
    private static final String PASSWORD    = "123456";
    private static final String SERVER_ADDR = "0.0.0.0";
    private static final int    SERVER_PORT = 3002;

    static Server server;

    /* Counters published on /axis; only touched from the timer task. */
    static int roll  = 1;
    static int pitch = 1;
    static int yaw   = 1;

    /**
     * Creates and starts the server, then schedules the periodic publish.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String[] args) {
        /*
         * Initialize server
         */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);

            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    } else {
                        /*
                         * Set priority
                         */
                        client.priority(3);
                    }
                    System.out.println("Client link " + link + " address: " + client.address().toString());
                }
            };

            /*
             * Start server
             */
            if (!server.start(address, null)) {
                return;
            }
        } catch (Exception e1) {
            e1.printStackTrace();
            return;
        }

        TimerTask taskInterval = new TimerTask() {
            @Override
            public void run() {
                /*
                 * Publish /axis.
                 * The fields must be comma separated, otherwise the
                 * parameter is not valid JSON.
                 */
                String param = String.format("{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                             roll++, pitch++, yaw++);
                server.publish("/axis", new Payload(param, null));
            }
        };

        Timer interval = new Timer();
        interval.schedule(taskInterval, 0, 1000);

        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                /*
                 * Restore the interrupt flag and leave the idle loop rather
                 * than swallowing the interrupt; the Timer created above runs
                 * on its own non-daemon thread, so publishing carries on.
                 */
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
from vsoa.server import Server, Client
from vsoa.timer import Timer
# The publish task below uses vsoa.Payload, so the package itself must be bound
import vsoa

# Create server (name and password)
server = Server('axis_server', '123456')


# Client event: conn is True when the client connects, False when it is lost
def onclient(cli: Client, conn: bool):
    print('Client link {} address: {}'.format(conn, cli.address()))
    if conn:
        cli.keepalive(5)
        # Set priority
        cli.priority(3)


server.onclient = onclient

# Publish /axis
timer = Timer()


def static_vars(**kwargs):
    """Decorator that attaches each keyword argument to the function as an attribute."""
    def decorate(func):
        for k in kwargs:
            setattr(func, k, kwargs[k])
        return func
    return decorate


@static_vars(roll = 0, pitch = 0, yaw = 0)
def publish_axis_task():
    """Publish the current roll/pitch/yaw counters on /axis, then increment them."""
    server.publish('/axis', vsoa.Payload(param={'roll': publish_axis_task.roll,
                                                'pitch': publish_axis_task.pitch,
                                                'yaw': publish_axis_task.yaw}))
    publish_axis_task.roll += 1
    publish_axis_task.pitch += 1
    publish_axis_task.yaw += 1


timer.start(timeout=0., callback=publish_axis_task, interval=1.)

# Server start
# NOTE(review): this binds to the loopback address only - confirm whether
# remote clients are expected to reach this server.
server.run('127.0.0.1', 3002)
说明:
- 在 Node.js 环境中,不需要事件循环,即删除 `require('iosched').forever();`。
- 在 Node.js 环境中,`socket.AF_INET` 应为 `vsoa.AF_INET`,详情可见 https://www.npmjs.com/package/vsoa 。