服务端开发

更新时间:
2024-12-24

服务端开发

本节内容介绍 VSOA 服务端使用订阅与发布的方法。

开发须知

VSOA 服务端的数据发布分为“普通发布方式”和“快速发布方式”。

当服务端需要大量、高频地发布数据时,可以使用快速发布方式。快速发布方式在进行大量高频的数据发布时,无法保证所有数据都能有效地到达,客户端在接收时可能会丢失部分数据,因此使用快速发布的前提是这些丢失的数据并不影响程序的实际效果。若数据的变化是离散的而非连续的,使用快速发布可能会存在弊端。

在 VSOA 的底层实现中,普通发布方式基于 TCP,快速发布方式基于 UDP。当有大批量、短周期更新数据的传输需求时,应当使用快速发布方式,否则 TCP 在丢包后会因重传恢复时间过长而导致实时性下降。

常用接口

bool vsoa_server_publish(vsoa_server_t *server, const vsoa_url_t *url, const vsoa_payload_t *payload);
bool vsoa_server_quick_publish(vsoa_server_t *server, const vsoa_url_t *url, const vsoa_payload_t *payload);
bool vsoa_server_is_subscribed(vsoa_server_t *server, const vsoa_url_t *url);
server.publish(url[, payload][, quick]);
boolean publish(String url, Payload payload);
boolean publish(String url, Payload payload, boolean quick);
func (s *Server) Publish(servicePath string,
        timeDriction time.Duration,
        pubs func(*protocol.Message, *protocol.Message)) (err error)
func (s *Server) QuickPublish(servicePath string,
        timeDriction time.Duration,
        pubs func(*protocol.Message, *protocol.Message)) (err error)
server.publish(url: str, payload: vsoa.Payload | dict = None, quick: bool = False) -> bool

说明:
发布与订阅服务端的详细接口说明可参考以下手册:

普通发布方式

如下在 axis_server 微服务中封装一个陀螺仪数据发布的接口,对外提供一个 /axis 资源对数据进行发布。

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"

/* Password passed to vsoa_server_passwd() */
#define MY_SERVER_PASSWD  "123456"
/* Capacity of the buffer that holds the published JSON parameter text */
#define AXIS_SER_BUF_LEN  100
/* Port used when the environment does not provide a valid one */
#define AXIS_SER_PORT     3002
/*
 * Name of the environment variable that may override the listening port.
 * getenv() takes the bare variable name: the '=' separator is not part of
 * the name, and a name containing '=' is not found by common C libraries.
 */
#define AXIS_SER_AUTO_PORT "VSOA_AUTO_PORT"
#ifndef TRUE
#define TRUE               1
#endif

static vsoa_server_t *axisServer;

/*
 * Publisher thread body.
 *
 * Once per second, checks whether "/axis" is subscribed on the global
 * axisServer and, if so, publishes a JSON parameter holding the current
 * roll/pitch/yaw counters, then advances each counter by one. The counters
 * only advance on rounds that actually publish.
 *
 * arg is unused. The loop never terminates, so the final return is only
 * there to satisfy the thread function signature.
 */
static void *publish_axis_thread (void *arg)
{
    char            param[AXIS_SER_BUF_LEN + 1];
    vsoa_payload_t  payload;
    vsoa_url_t      url;
    int             roll  = 1;
    int             pitch = 1;
    int             yaw   = 1;

    url.url     = "/axis";
    url.url_len = strlen(url.url);

    /* Only the parameter part is used; the data part stays empty. */
    payload.param    = param;
    payload.data     = NULL;
    payload.data_len = 0;

    for (;;) {
        sleep(1);

        if (vsoa_server_is_subscribed(axisServer, &url)) {
            payload.param_len = snprintf(param, AXIS_SER_BUF_LEN,
                                        "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                        roll++, pitch++, yaw++);
            vsoa_server_publish(axisServer, &url, &payload);
        }
    }

    return (NULL);
}

/*
 * Entry point of the axis_server example.
 *
 * Creates the VSOA server, sets its password, starts it on the selected
 * port, spawns the publisher thread and then services server I/O forever.
 *
 * The port is AXIS_SER_PORT unless the environment variable named by
 * AXIS_SER_AUTO_PORT holds a number in the range 1..65535.
 *
 * Returns -1 if the server cannot be created or started, or if the
 * publisher thread cannot be created; otherwise the I/O loop never exits.
 */
int main (int argc, char **argv)
{
    uint16_t server_port = AXIS_SER_PORT;
    struct sockaddr_in addr;

    (void)argc;
    (void)argv;

    char * axis_serv_auto_port = getenv(AXIS_SER_AUTO_PORT);
    if (axis_serv_auto_port != NULL) {
        char *end  = NULL;
        long  port;

        fprintf(stdout, "axis_ser port is %s .\n", axis_serv_auto_port);

        /*
         * strtol() instead of atoi(): a value outside 1..65535 would be
         * silently truncated when stored into the 16-bit port, so such
         * values (and non-numeric text) fall back to the default port.
         */
        port = strtol(axis_serv_auto_port, &end, 10);
        if ((end != axis_serv_auto_port) && (port > 0) && (port <= 65535)) {
            server_port = (uint16_t)port;
        }
    }

    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    /*
    * Initialize server
    */
    axisServer = vsoa_server_create("{\"name\":\"axis_server\"}");
    if (!axisServer) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }

    /*
    * If need password
    */
    vsoa_server_passwd(axisServer, MY_SERVER_PASSWD);

    /*
    * Start server
    */
    if (!vsoa_server_start(axisServer, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(axisServer);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

    fprintf(stderr, " start VSOA server success!\n");

    /*
    * Create publish thread
    */
    pthread_t pub_threadid;
    int threadRet = pthread_create(&pub_threadid, NULL, publish_axis_thread, NULL);
    if (threadRet) {
        /*
         * pthread_create() reports failure through its return value and
         * does not set errno, so print the returned error number.
         */
        fprintf(stderr, " create publish thread fail errno is %d!\n", threadRet);
        vsoa_server_close(axisServer);                                  /* do not leak the server */
        return (-1);
    }

    /*
     * set 1s timeout
    */
    struct timespec timeout = {1, 0};

    int cnt, max_fd;
    fd_set fds;

    /*
     * Event loop: collect the server's descriptors, wait up to one second
     * for input, and hand ready descriptors back to the server.
     */
    while (TRUE) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(axisServer, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(axisServer, &fds);
        }
    }

    return (0);
}

var vsoa = require('vsoa');
var socket = require('socket');

/*
 * Create server
 */
var server = new vsoa.Server({
    info: { name: 'axis_server' },
    passwd: '123456'
});

/*
 * Client event: log every link change and, when `link` is truthy,
 * call setKeepAlive(5000) on that client.
 */
server.onclient = function(client, link) {
    const peer = client.address().addr;
    console.log(`Client link ${link} address: ${peer}`);

    if (!link) {
        return;
    }
    client.setKeepAlive(5000);
};

/*
 * Publish /axis: every 1000 ms send the current roll/pitch/yaw counters
 * as the payload param, then advance each counter by one.
 */
let roll = 0;
let pitch = 0;
let yaw = 0;

function publishAxis() {
    server.publish('/axis', {
        param: {
            roll: roll++, pitch: pitch++, yaw: yaw++
        }
    });
}

setInterval(publishAxis, 1000);

/*
 * Server start: AF_INET socket on 0.0.0.0, port 3002
 */
server.start({
    domain: socket.AF_INET,
    addr: '0.0.0.0',
    port: 3002
});

/*
 * Event loop: hand control to iosched
 */
var iosched = require('iosched');
iosched.forever();
import java.util.Timer;
import java.util.TimerTask;
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;

/**
 * Example VSOA server that publishes an incrementing roll/pitch/yaw sample
 * on the "/axis" URL once per second.
 */
public class axis_server {
    private  static final String  SERVER_NAME   = "axis_server";
    private  static final String  SERVER_INFO   = "\"java language VSOA server\"";
    private  static final String  PASSWORD      = "123456";
    private  static final String  SERVER_ADDR   = "0.0.0.0";
    private  static final int     SERVER_PORT   = 3002;

    static Server server;

    /* Counters published on /axis; only the timer task modifies them. */
    static int roll  = 1;
    static int pitch = 1;
    static int yaw   = 1;

    /**
     * Creates and starts the server, schedules the periodic publish task,
     * then keeps the main thread parked.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        /*
        * Initialize server
        */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);
            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    }

                    System.out.println("Client link " + link + " address: " + client.address().toString());
                }
            };

            /*
            * Start server
            */
            if (!server.start(address, null)) {
                return;
            }
        } catch (Exception e1) {
            e1.printStackTrace();
            return ;
        }

        TimerTask taskInterval = new TimerTask() {
            @Override
            public void run() {
                /*
                * Publish /axis
                *
                * The members must be separated by commas, otherwise the
                * parameter text is not valid JSON.
                */
                server.publish("/axis", new Payload("{ \"roll\": " + roll++ + ", \"pitch\": " + pitch++ + ", \"yaw\": " + yaw++
                                                    + "}", null));
            }
        };

        /* Run the publish task immediately, then every 1000 ms. */
        Timer interval = new Timer();
        interval.schedule(taskInterval, 0 ,1000);

        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package main

import (
    "encoding/json"
    "strconv"
    "time"

    "gitee.com/sylixos/go-vsoa/protocol"
    "gitee.com/sylixos/go-vsoa/server"
)

var roll, pitch, yaw int = 1, 1, 1

// startServer creates the password-protected VSOA server, registers the
// /axis publisher with a one-second period and starts serving on
// "localhost:3001" in a background goroutine. It returns immediately.
func startServer() {
    opts := server.Option{Password: "123456"}
    s := server.NewServer("golang VSOA publish server", opts)

    // publishFunc advances the three counters and stores them as the JSON
    // parameter of the outgoing message. The second message is unused.
    publishFunc := func(msg, _ *protocol.Message) {
        roll, pitch, yaw = roll+1, pitch+1, yaw+1

        text := `{"roll":` + strconv.Itoa(roll) +
            `, "pitch":` + strconv.Itoa(pitch) +
            `, "yaw":` + strconv.Itoa(yaw) + `}`
        msg.Param, _ = json.RawMessage(text).MarshalJSON()
    }

    // Publishing is done every 1 second.
    s.Publish("/axis", 1*time.Second, publishFunc)

    // Serve in a separate goroutine so the caller is not blocked; the
    // result of Serve is deliberately discarded.
    go func() {
        _ = s.Serve("localhost:3001")
    }()
}

// main starts the server and then keeps the main goroutine alive by
// sleeping in one-second steps forever.
func main() {
    startServer()

    const idle = 1 * time.Second
    for {
        time.Sleep(idle)
    }
}
from vsoa.server import Server, Client
from vsoa.interface import Request, Payload
from vsoa.timer import Timer

# Create server
server = Server('axis_server', '123456')

# Client event
def onclient(cli: Client, conn: bool):
    """Log a client link change; when conn is true, call cli.keepalive(5)."""
    address = cli.address()
    print('Client link {} address: {}'.format(conn, address))
    if not conn:
        return
    cli.keepalive(5)

server.onclient = onclient

# Publish /axis
timer = Timer()

def static_vars(**kwargs):
    """Return a decorator that sets each keyword argument as an attribute
    on the decorated function and returns that same function."""
    def decorate(func):
        for name, value in kwargs.items():
            setattr(func, name, value)
        return func
    return decorate

@static_vars(roll = 0, pitch = 0, yaw = 0)
def publish_axis_task():
    """Publish the current roll/pitch/yaw counters on '/axis', then advance
    each counter by one.

    The counters are stored as attributes on this function (set by the
    static_vars decorator), so they persist between calls.
    """
    # Use the Payload class imported from vsoa.interface; the bare name
    # `vsoa` is never imported in this file and would raise NameError.
    server.publish('/axis', Payload(param={'roll': publish_axis_task.roll,
                                           'pitch': publish_axis_task.pitch,
                                           'yaw': publish_axis_task.yaw}))
    publish_axis_task.roll += 1
    publish_axis_task.pitch += 1
    publish_axis_task.yaw += 1

# Schedule publish_axis_task on the timer (timeout=0., interval=1.)
timer.start(
    timeout=0.,
    callback=publish_axis_task,
    interval=1.,
)

# Server start
server.run('127.0.0.1', 3002)

说明:

  • 在 Node.js 环境中,事件循环由运行时自动驱动,无需显式启动,删除 require('iosched').forever(); 即可。
  • 在 Node.js 的环境中,socket.AF_INET 应为 vsoa.AF_INET,详情可见 https://www.npmjs.com/package/vsoa 。

快速发布方式

C 语言环境

对于 C 语言环境,将如下接口进行替换即可:

/*
* Publish /axis (normal publish), using the axisServer handle of the
* example above
*/
vsoa_server_publish(axisServer, &url, &payload);

替换为:

/*
* Publish /axis (quick publish), using the axisServer handle of the
* example above
*/
vsoa_server_quick_publish(axisServer, &url, &payload);

Java 语言环境

对于 Java 语言环境,将 publish 的最后一个参数设置为 true 即可:

/*
* Publish /axis (normal publish); the JSON members are comma-separated
*/
server.publish("/axis", new Payload("{ \"roll\": " + roll++ + ", \"pitch\": " + pitch++ + ", \"yaw\": " + yaw++
                                    + "}", null));

替换为:

/*
* Publish /axis (quick publish: last argument is true); the JSON members
* are comma-separated
*/
server.publish("/axis", new Payload("{ \"roll\": " + roll++ + ", \"pitch\": " + pitch++ + ", \"yaw\": " + yaw++
                                    + "}", null), true);

JavaScript 语言环境

对于 JavaScript 语言环境,将 publish 的最后一个参数设置为 true 即可:

/*
* Publish /axis (normal publish: the optional `quick` argument is omitted)
*/
server.publish('/axis', {
    param: {
        roll: roll++, pitch: pitch++, yaw: yaw++
    }
});

替换为:

/*
* Publish /axis (quick publish: the optional `quick` argument is true)
*/
server.publish('/axis', {
    param: {
        roll: roll++, pitch: pitch++, yaw: yaw++
    }
}, true);

Go 语言环境

对于 Go 语言环境,将如下接口进行替换即可:

// Normal publish: register publishFunc for /axis with a 1 second period.
s.Publish("/axis", 1*time.Second, publishFunc)

替换为:

// Quick publish: same arguments, registered through QuickPublish instead.
s.QuickPublish("/axis", 1*time.Second, publishFunc)
文档内容是否对您有所帮助?
有帮助
没帮助