服务端开发
本节内容介绍 VSOA 服务端使用订阅与发布的方法。
开发须知
VSOA 服务端的数据发布分为“普通发布方式”和“快速发布方式”。
当服务端需要大量、高频地发布数据时,可以使用快速发布方式。快速发布在进行大量高频的数据发布时无法保证所有数据都能有效地到达,因此客户端在接收时可能会丢失部分数据;使用快速发布的前提是这些丢失的数据并不影响程序的实际效果。若数据的变化是离散的而非连续的,使用快速发布可能会存在弊端。
在 VSOA 的底层实现中,普通发布方式基于 TCP,快速发布方式基于 UDP,当有大批量快周期更新数据的传输需求时,应当使用快速发布方式,否则 TCP 的丢包会因恢复时间过长导致实时性下降。
常用接口
bool vsoa_server_publish(vsoa_server_t *server, const vsoa_url_t *url, const vsoa_payload_t *payload);
bool vsoa_server_quick_publish(vsoa_server_t *server, const vsoa_url_t *url, const vsoa_payload_t *payload);
bool vsoa_server_is_subscribed(vsoa_server_t *server, const vsoa_url_t *url);
server.publish(url[, payload][, quick]);
boolean publish(String url, Payload payload);
boolean publish(String url, Payload payload, boolean quick);
func (s *Server) Publish(servicePath string,
timeDriction time.Duration,
pubs func(*protocol.Message, *protocol.Message)) (err error)
func (s *Server) QuickPublish(servicePath string,
timeDriction time.Duration,
pubs func(*protocol.Message, *protocol.Message)) (err error)
server.publish(url: str, payload: vsoa.Payload | dict = None, quick: bool = False) -> bool
说明:
发布与订阅服务端的详细接口说明可参考以下手册:
- C 版本可参考 VSOA C 语言编程手册。
- JavaScript 版本可参考 VSOA JavaScript 编程手册。
- Java 版本可参考 VSOA Java 编程手册。
- Golang 版本可参考 VSOA Golang 编程手册。
- Python 版本可参考 VSOA Python 编程手册。
普通发布方式
如下在 axis_server 微服务中封装一个陀螺仪数据发布的接口,对外提供一个 /axis 资源对数据进行发布。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"
#define MY_SERVER_PASSWD    "123456"
#define AXIS_SER_BUF_LEN    100
#define AXIS_SER_PORT       3002
/*
 * Name of the environment variable that may override the listening port.
 * getenv() takes the bare variable name, so it must not contain '='.
 */
#define AXIS_SER_AUTO_PORT  "VSOA_AUTO_PORT"
#ifndef TRUE
#define TRUE 1
#endif
static vsoa_server_t *axisServer;
/*
 * Publisher thread entry.
 *
 * Wakes up once per second and, when the "/axis" URL has a subscriber,
 * publishes the current roll/pitch/yaw counters as a JSON parameter and
 * then advances each counter by one. The loop never exits.
 *
 * arg: unused.
 */
static void *publish_axis_thread (void *arg)
{
    char            json[AXIS_SER_BUF_LEN + 1];
    vsoa_url_t      url;
    vsoa_payload_t  payload;
    int             roll  = 1;
    int             pitch = 1;
    int             yaw   = 1;

    url.url     = "/axis";
    url.url_len = strlen(url.url);

    /* Only the JSON parameter is sent; there is no binary data part. */
    payload.param    = json;
    payload.data     = NULL;
    payload.data_len = 0;

    for (;;) {
        sleep(1);

        if (vsoa_server_is_subscribed(axisServer, &url)) {
            payload.param_len = snprintf(json, AXIS_SER_BUF_LEN,
                                         "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                         roll, pitch, yaw);
            roll++;
            pitch++;
            yaw++;

            vsoa_server_publish(axisServer, &url, &payload);
        }
    }

    return (NULL);
}
/*
 * Program entry: create the "axis_server" VSOA server, start it on the
 * configured port, spawn the publisher thread and run the server event loop.
 *
 * The listening port defaults to AXIS_SER_PORT and can be overridden by the
 * environment variable named by AXIS_SER_AUTO_PORT when it holds a value in
 * the range 1..65535.
 *
 * Returns -1 when the server or the publisher thread cannot be set up;
 * otherwise the event loop runs forever.
 */
int main (int argc, char **argv)
{
    uint16_t server_port = AXIS_SER_PORT;
    struct sockaddr_in addr;

    char *axis_serv_auto_port = getenv(AXIS_SER_AUTO_PORT);
    if (axis_serv_auto_port != NULL) {
        char *end  = NULL;
        long  port = strtol(axis_serv_auto_port, &end, 10);

        fprintf(stdout, "axis_ser port is %s .\n", axis_serv_auto_port);

        /*
         * Accept the override only when digits were parsed and the value
         * fits a TCP/UDP port; otherwise keep the default instead of
         * silently truncating into uint16_t.
         */
        if (end != axis_serv_auto_port && port > 0 && port <= 65535) {
            server_port = (uint16_t)port;
        }
    }

    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len         = sizeof(struct sockaddr_in);
#endif

    /*
     * Initialize server
     */
    axisServer = vsoa_server_create("{\"name\":\"axis_server\"}");
    if (!axisServer) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return (-1);
    }

    /*
     * If need password
     */
    vsoa_server_passwd(axisServer, MY_SERVER_PASSWD);

    /*
     * Start server
     */
    if (!vsoa_server_start(axisServer, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(axisServer);
        fprintf(stderr, "Can not start VSOA server!\n");
        return (-1);
    }
    fprintf(stderr, " start VSOA server success!\n");

    /*
     * Create publish thread
     */
    pthread_t pub_threadid;
    int threadRet = pthread_create(&pub_threadid, NULL, publish_axis_thread, NULL);
    if (threadRet) {
        /*
         * pthread_create() reports failure through its return value and
         * does not set errno, so print the returned code.
         */
        fprintf(stderr, " create publish thread fail errno is %d!\n", threadRet);
        vsoa_server_close(axisServer);
        return (-1);
    }

    /*
     * set 1s timeout
     */
    struct timespec timeout = {1, 0};
    int cnt, max_fd;
    fd_set fds;

    while (TRUE) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(axisServer, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(axisServer, &fds);
        }
    }

    return (0);
}
var vsoa = require('vsoa');
var socket = require('socket');
/*
 * Create server
 */
var serverOptions = {
    info: { name: 'axis_server' },
    passwd: '123456'
};
var server = new vsoa.Server(serverOptions);

/*
 * Client event: log every link change and enable keep-alive on connect
 */
function handleClient(client, link) {
    console.log(`Client link ${link} address: ${client.address().addr}`);
    if (!link) {
        return;
    }
    client.setKeepAlive(5000);
}
server.onclient = handleClient;

/*
 * Publish /axis every 1000 ms, advancing the counters after each publish
 */
let roll = 0, pitch = 0, yaw = 0;

function publishAxis() {
    server.publish('/axis', {
        param: {
            roll: roll++, pitch: pitch++, yaw: yaw++
        }
    });
}
setInterval(publishAxis, 1000);

/*
 * Server start
 */
server.start({
    domain: socket.AF_INET,
    addr: '0.0.0.0',
    port: 3002
});

/*
 * Event loop
 */
require('iosched').forever();
import java.util.Timer;
import java.util.TimerTask;
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
/**
 * Example VSOA server that publishes incrementing roll/pitch/yaw values on
 * the {@code /axis} URL once per second.
 */
public class axis_server {
    private static final String SERVER_NAME = "axis_server";
    private static final String SERVER_INFO = "\"java language VSOA server\"";
    private static final String PASSWORD    = "123456";
    private static final String SERVER_ADDR = "0.0.0.0";
    private static final int    SERVER_PORT = 3002;

    static Server server;

    /* Counters published on /axis; only the timer task touches them. */
    static int roll  = 1;
    static int pitch = 1;
    static int yaw   = 1;

    /**
     * Creates and starts the server, schedules the periodic publish task and
     * then idles the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        /*
         * Initialize server
         */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);

            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    }
                    System.out.println("Client link " + link + " address: " + client.address().toString());
                }
            };

            /*
             * Start server
             */
            if (!server.start(address, null)) {
                return;
            }
        } catch (Exception e1) {
            e1.printStackTrace();
            return;
        }

        TimerTask taskInterval = new TimerTask() {
            @Override
            public void run() {

                /*
                 * Publish /axis
                 *
                 * JSON members must be separated by commas, otherwise the
                 * parameter string is not valid JSON.
                 */
                server.publish("/axis", new Payload("{ \"roll\": " + roll++ + ", \"pitch\": " + pitch++ + ", \"yaw\": " + yaw++
                        + "}", null));
            }
        };

        /* First run immediately, then every 1000 ms. */
        Timer interval = new Timer();
        interval.schedule(taskInterval, 0, 1000);

        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                /*
                 * Restore the interrupt flag and stop idling; looping on
                 * would make every following sleep() throw immediately.
                 */
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
package main
import (
"encoding/json"
"strconv"
"time"
"gitee.com/sylixos/go-vsoa/protocol"
"gitee.com/sylixos/go-vsoa/server"
)
var roll, pitch, yaw int = 1, 1, 1
// startServer creates the VSOA server, registers a publisher on "/axis" that
// runs every second, and starts serving in a background goroutine.
func startServer() {
	// Create the server; the option carries the password.
	s := server.NewServer("golang VSOA publish server", server.Option{
		Password: "123456",
	})

	// publishFunc advances the package-level counters and stores them as a
	// JSON object in the outgoing message's Param.
	publishFunc := func(msg, _ *protocol.Message) {
		roll, pitch, yaw = roll+1, pitch+1, yaw+1

		axis := json.RawMessage(`{"roll":` + strconv.Itoa(roll) +
			`, "pitch":` + strconv.Itoa(pitch) +
			`, "yaw":` + strconv.Itoa(yaw) + `}`)
		msg.Param, _ = axis.MarshalJSON()
	}

	// Register the publisher on "/axis" with a 1-second interval.
	s.Publish("/axis", 1*time.Second, publishFunc)

	// Serve on "localhost:3001" in its own goroutine so startServer returns.
	go func() {
		_ = s.Serve("localhost:3001")
	}()
}
// main starts the publish server and then keeps the main goroutine alive by
// sleeping in one-second steps forever.
func main() {
	startServer()

	const idleInterval = 1 * time.Second
	for {
		time.Sleep(idleInterval)
	}
}
from vsoa.server import Server, Client
from vsoa.interface import Request, Payload
from vsoa.timer import Timer
# Create server
server = Server('axis_server', '123456')


def onclient(cli: Client, conn: bool):
    """Log a client link change and call keepalive(5) on the client when it connects."""
    print('Client link {} address: {}'.format(conn, cli.address()))
    if not conn:
        return
    cli.keepalive(5)


# Register the client event handler
server.onclient = onclient

# Timer used to publish /axis periodically
timer = Timer()
def static_vars(**kwargs):
    """Return a decorator that attaches each keyword argument to the decorated
    function as an attribute of the same name.

    The decorated function object itself is returned (it is not wrapped), so
    the attributes can be read and updated through the function's own name.
    """
    def decorate(func):
        for name, value in kwargs.items():
            setattr(func, name, value)
        return func
    return decorate
@static_vars(roll = 0, pitch = 0, yaw = 0)
def publish_axis_task():
    """Publish the current roll/pitch/yaw counters on '/axis', then advance each by one."""
    # Use the directly imported Payload class: the bare module name `vsoa`
    # is not bound by this script's imports, so `vsoa.Payload` would raise
    # NameError on the first timer tick.
    server.publish('/axis', Payload(param={'roll': publish_axis_task.roll,
                                           'pitch': publish_axis_task.pitch,
                                           'yaw': publish_axis_task.yaw}))
    publish_axis_task.roll += 1
    publish_axis_task.pitch += 1
    publish_axis_task.yaw += 1


timer.start(timeout=0., callback=publish_axis_task, interval=1.)

# Server start
server.run('127.0.0.1', 3002)
说明:
- 在 Node.js 的环境中,不需要事件循环,即删除 require('iosched').forever(); 。
- 在 Node.js 的环境中,socket.AF_INET 应为 vsoa.AF_INET,详情可见 https://www.npmjs.com/package/vsoa 。
快速发布方式
C 语言环境
对于 C 语言环境,将如下接口进行替换即可:
/*
* Publish /axis
*/
vsoa_server_publish(server, &url, &payload);
替换为:
/*
* Publish /axis
*/
vsoa_server_quick_publish(server, &url, &payload);
Java 语言环境
对于 Java 语言环境,将 publish 的最后一个参数设置为 true 即可:
/*
 * Publish /axis (JSON members separated by commas so the parameter is valid JSON)
 */
server.publish("/axis", new Payload("{ \"roll\": " + roll++ + ", \"pitch\": " + pitch++ + ", \"yaw\": " + yaw++
        + "}", null));
替换为:
/*
 * Publish /axis with quick set to true (JSON members separated by commas so the parameter is valid JSON)
 */
server.publish("/axis", new Payload("{ \"roll\": " + roll++ + ", \"pitch\": " + pitch++ + ", \"yaw\": " + yaw++
        + "}", null), true);
JavaScript 语言环境
对于 JavaScript 语言环境,将 publish 的最后一个参数设置为 true 即可:
/*
* Publish /axis
*/
server.publish('/axis', {
param: {
roll: roll++, pitch: pitch++, yaw: yaw++
}
});
替换为:
/*
* Publish /axis
*/
server.publish('/axis', {
param: {
roll: roll++, pitch: pitch++, yaw: yaw++
}
}, true);
Go 语言环境
对于 Go 语言环境,将如下接口进行替换即可:
s.Publish("/axis", 1*time.Second, publishFunc)
替换为:
s.QuickPublish("/axis", 1*time.Second, publishFunc)