服务端开发

更新时间:
2024-12-24

服务端开发

本节内容介绍 VSOA 服务端使用 RPC 的方法。

开发须知

微服务对外提供 RPC 服务时,需要指定该 RPC 的 URL 资源标识并将该 URL 加入监听,同时需要指定 RPC 的回调函数用于处理 RPC 被调用时的逻辑。

常用接口

bool vsoa_server_add_listener(vsoa_server_t *server, const vsoa_url_t *url, vsoa_server_cmd_func_t callback, void *arg);
void vsoa_server_remove_listener(vsoa_server_t *server, const vsoa_url_t *url);
bool vsoa_server_cli_reply(vsoa_server_t *server, vsoa_cli_id_t id, uint8_t status, uint32_t seqno, uint16_t tunid, const vsoa_payload_t *payload);
server.on
void on(String url, CBOnCall cb);
func (s *Server) On(servicePath string, serviceMethod protocol.RpcMessageType, handler func(*protocol.Message, *protocol.Message)) (err error)
server.command(url: str, wq: vsoa:WorkQueue = None) -> callable

说明:

RPC 服务端的详细接口说明可参考以下手册:

开发示例

例如:将“灯光状态”定义为一个 /light 的 URL 资源标识,供调用 RPC 的客户端获取灯光状态。

向所创建的 VSOA 服务中增加一个 RPC 实现,需要指定当客户端调用 /light 时,将被执行的 RPC 回调函数。

/*
 * Application: light_server
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"

#define SERVER_PASSWD       "123456"
#define VSOA_AUTO_PORT_ENV  "VSOA_AUTO_PORT="
#define SERVER_DEFAULT_PORT 3001

static vsoa_server_t *server;

/*
* /light Callback
*/
static void command_light (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                           vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
                           vsoa_payload_t *payload)
{
    vsoa_payload_t send;
    uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);

    send.data      = NULL;
    send.data_len  = 0;
    send.param     = "{\"light\": 1}";
    send.param_len = strlen(send.param);

    vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}

int main (int argc, char **argv)
{
    struct sockaddr_in addr;
    uint16_t server_port = SERVER_DEFAULT_PORT;

    char *autoPort = getenv(VSOA_AUTO_PORT_ENV);
    if (autoPort != NULL) {
        fprintf(stdout, "ser port is %s .\n", autoPort);

        server_port = atoi(autoPort);
        if (server_port == 0) {
            server_port = SERVER_DEFAULT_PORT;
        }
    }

    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(server_port);
    addr.sin_addr.s_addr = INADDR_ANY;

#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    /*
    * Initialize server
    */
    server = vsoa_server_create("{\"name\":\"light_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }

    /*
    * If need password
    */
    vsoa_server_passwd(server, SERVER_PASSWD);

    /*
    * Add /light listener
    */
    vsoa_url_t url;
    url.url     = "/light";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_light, NULL);

    /*
    * Start server
    */
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

    int cnt, max_fd;
    fd_set fds;
    struct timespec timeout = { 1, 0 };
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}
/*
 * light_server.js
 */
var vsoa = require('vsoa');
var socket = require('socket');

/*
 * Create server
 */
var server = new vsoa.Server({
    info: {
        name: 'light_server'
    }, passwd: '123456'
});

/*
 * Client event
 */
server.onclient = function(client, link) {
    console.log(`Client link ${link} address: ${client.address().addr}`);
    if (link) {
        client.setKeepAlive(5000);
    }
};

/*
* /light Callback
*/
server.on('/light', function (cli, request, payload) {
    const content = {}
    content.param = {'light': 1}
    cli.reply(0, request.seqno, content)
})

/*
 * Server start
 */
server.start({
    domain: socket.AF_INET, addr: '127.0.0.1', port: 3001
});

/*
 * Event loop
 */
require('iosched').forever();
/*
* light_server.java
*/
import java.net.InetSocketAddress;

import com.acoinfo.vsoa.*;
import com.acoinfo.vsoa.Server.CliHandle;

public class light_server {
    private  static String  SERVER_NAME   = "light_server";
    private  static String  SERVER_INFO   = "\"java language VSOA server\"";
    private  static String  PASSWORD      = "123456";
    private  static String  SERVER_ADDR   = "0.0.0.0";
    private  static int     SERVER_PORT   = 3001;

    static Server server;

    public static void main(String[] args) {

        /*
        * Initialize server
        */
        try {
            ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
            InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);
            server = new Server(opt) {
                @Override
                public void onClient(CliHandle client, boolean link) {
                    if (!client.isConnected()) {
                        System.out.println("disconnected");
                    }

                    System.out.println("Client link " + link + " address: " + client.address().toString());
                }
            };

            /*
            * Start server
            */
            if (!server.start(address, null)) {
                return;
            }

            /*
            * /light Callback
            */
            server.on("/light", new CBOnCall() {
                @Override
                public boolean Callback(String url, CliHandle client, Request req, Payload payload) {
                    payload.param = "{\"light\": 1}";
                    client.reply(Constant.SUCCESS, req.seqno, payload);
                    return true;
                }
            });

        } catch (Exception e1) {
            e1.printStackTrace();
            return ;
        }

        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
package main

import (
    "encoding/json"
    "strconv"
    "time"

    "gitee.com/sylixos/go-vsoa/protocol"
    "gitee.com/sylixos/go-vsoa/server"
)

type RpcLightParam struct {
    LightStatus int `json:"light"`
}

var lightstatus = 1

func startServer() {
    /*
    * Initialize server
    */
    serverOption := server.Option{
        Password: "123456",
    }
    s := server.NewServer("golang VSOA RPC server", serverOption)

    /*
    * /light Callback on RPC GET
    */
    handleLightGet := func(req, res *protocol.Message) {
        res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
        res.Data = req.Data
    }
    s.On("/light", protocol.RpcMethodGet, handleLightGet)

    go func() {
        _ = s.Serve("localhost:3001")
    }()
}

func main() {
    startServer()

    for {
        time.Sleep(1 * time.Second)
    }
}

from vsoa.server import Server, Client
from vsoa.interface import Request, Payload

# Create server
server = Server('light_server', '123456')

# Client event
def onclient(cli: Client, conn: bool):
    print('Client link {} address: {}'.format(conn, cli.address()))
    if conn:
        cli.keepalive(5)

server.onclient = onclient

# /light callback
@server.command('/light')
def light(cli: Client, request: Request, payload: Payload):
    content = vsoa.Payload(param={'light': 1})
    cli.reply(seqno=request.seqno, payload=content, status=0)

# Server start
server.run('127.0.0.1', 3001)

说明:

  • 在 Node.js 的环境中,不需要事件循环,即删除 require('iosched').forever();
  • 在 Node.js 的环境中,socket.AF_INET 应为 vsoa.AF_INET,详情可见 https://www.npmjs.com/package/vsoa 。

VSOA vsoa_payload_t 结构体

在使用 C 语言开发过程中,需要特别注意 vsoa_payload_t 结构体的使用方法。该结构体中有 param 和 data 两个字段提供给用户以携带数据,长度分别为 param_len 和 data_len。其中: param 参数推荐用于字符串类型参数,data 参数用于二进制数据或者 base64 类型数据。如果用户没有用到该字段,需要将对应的字段置为 NULL 并将其长度置为 0

例如,如果只需要 param 参数不需要 data 参数,需要按照如下形式生成 payload:

vsoa_payload_t send;

send.data      = NULL;
send.data_len  = 0;
send.param     = "{\"light\": 1}";
send.param_len = strlen(send.param);

关于 vsoa_payload_t 的详细说明可见 VSOA Base Data Struct 中的 Payload 说明。

RPC GET/SET

如同定义函数时习惯区分函数的功能为“设置/获取”或“输入/输出”的概念,在 VSOA 的 RPC 中也可以通过读取 VSOA 报文头标志对 RPC 请求区分 GET 或 SET 操作。针对如下示例约定,当需要开启灯光时,向灯光控制微服务发送数据 "1";相反,当需要关闭灯光时,向灯光控制微服务发送数据 "0"。

static void command_light (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                           vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
                           vsoa_payload_t *payload)
{
    vsoa_payload_t send;
    static int light_status = 0;
    uint32_t  seqno = vsoa_parser_get_seqno(vsoa_hdr);
    uint8_t  flags = vsoa_parser_get_flags(vsoa_hdr);

#define LIGHT_ON      "{\"light\": 1}"
#define LIGHT_OFF     "{\"light\": 0}"

    if (flags & VSOA_FLAG_SET) {
        if (!strncmp(payload->param, LIGHT_ON, strlen(LIGHT_ON))) {
            light_status = 1;

        } else if (!strncmp(payload->param, LIGHT_OFF, strlen(LIGHT_OFF))) {
            light_status = 0;
        }
    }

    send.param = light_status ? LIGHT_ON : LIGHT_OFF;
    send.data = NULL;
    send.data_len = 0;
    send.param_len = strlen(send.param);

    vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}
server.on('/light', function (cli, request, payload) {
    const content = {}
    if (request.method == vsoa.method.SET) {
        content.param = {'light': 1}
    } else if (request.method == vsoa.method.GET) {
        content.param = {'light': 0}
    }
    cli.reply(0, request.seqno, content)
})
private static String LIGHT_ON    = "{\"light\": 1}";
private static String LIGHT_OFF   = "{\"light\": 0}";
private static int    lightStatus = 0;

public static void main(String[] args) {
    ……

    server.on("/light", new CBOnCall() {
        @Override
        public boolean Callback(String url, CliHandle client, Request req, Payload payload) {
            if (req.method == Request.VSOA_METHOD_SET) {
                if (payload.param == LIGHT_ON) {
                    lightStatus = 1;
                    payload.param = LIGHT_ON;

                } else {
                    lightStatus = 0;
                    payload.param = LIGHT_OFF;
                }
            } else {
                payload.param = lightStatus == 1 ? LIGHT_ON : LIGHT_OFF;
            }

            client.reply(Constant.SUCCESS, req.seqno, payload);

            return true;
        }
    });
}
    ……

func startServer() {
    ……

    /*
    * /light Callback on RPC GET
    */
    handleLightGet := func(req, res *protocol.Message) {
        res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
        res.Data = req.Data
    }
    s.On("/light", protocol.RpcMethodGet, handleLightGet)

    /*
    * /light Callback on RPC SET
    */
    handleLightSet := func(req, res *protocol.Message) {
        reqParam := new(RpcLightParam)
        err := json.Unmarshal(req.Param, reqParam)

        if err != nil {
            res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
            return
        }

        lightstatus = reqParam.LightStatus
        res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
        res.Data = req.Data
    }
    s.On("/light", protocol.RpcMethodSet, handleLightSet)

    ……
}

    ……
# /light callback
@server.command('/light')
def light(cli: Client, request: Request, payload: Payload):
    match request.method:
        case vsoa.METHOD_SET: content = vsoa.Payload(param={'light': 1})
        case vsoa.METHOD_GET: content = vsoa.Payload(param={'light': 0})
        case _: content = {}
    cli.reply(seqno=request.seqno, payload=content, status=0)

并行 RPC

以上 RPC 使用方式中的所有回调函数都是串行执行的,VSOA 为 C 语言开发版本提供了并行处理 RPC 请求的功能,开发者可以通过并发 RPC 相关接口创建线程池并行处理 RPC 请求。

如下是将以上开发示例修改为并行 RPC 的示例:

/*
 * Application: light_server
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"
#include "vsoa_plistener.h"

#define SERVER_PASSWD       "123456"
#define VSOA_AUTO_PORT_ENV  "VSOA_AUTO_PORT="
#define SERVER_DEFAULT_PORT 3001
#define PLISTEN_THREAD_COUNT 2

static vsoa_server_t *server;

/*
* /light Callback
*/
static void command_light (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                           vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
                           vsoa_payload_t *payload)
{
    vsoa_payload_t send;
    uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);

    send.data      = NULL;
    send.data_len  = 0;
    send.param     = "{\"light\": 1}";
    send.param_len = strlen(send.param);

    vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}

int main (int argc, char **argv)
{
    struct sockaddr_in addr;
     uint16_t server_port = SERVER_DEFAULT_PORT;

     char *autoPort = getenv(VSOA_AUTO_PORT_ENV);
     if (autoPort != NULL) {
         fprintf(stdout, "ser port is %s .\n", autoPort);

         server_port = atoi(autoPort);
         if (server_port == 0) {
             server_port = SERVER_DEFAULT_PORT;
         }
     }

     bzero(&addr, sizeof(struct sockaddr_in));
     addr.sin_family      = AF_INET;
     addr.sin_port        = htons(server_port);
     addr.sin_addr.s_addr = INADDR_ANY;
#ifdef VSOA_HAS_SIN_LEN
    addr.sin_len = sizeof(struct sockaddr_in);
#endif

    /*
    * Initialize server
    */
    server = vsoa_server_create("{\"name\":\"light_server\"}");
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }

    /*
    * If need password
    */
    vsoa_server_passwd(server, SERVER_PASSWD);

    /*
    * Create plistener
    */
    vsoa_plistener_t *pl;
    pl = vsoa_server_plistener_create(PLISTEN_THREAD_COUNT);
    if (!pl) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not create VSOA parallel server!\n");
        return  (-1);
    }

    /*
    * Add /light listener
    */
    vsoa_url_t url;
    url.url     = "/light";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_light, NULL);

    /*
    * Add parallel listener
    */
    vsoa_plistener_handler_t handler;
    handler = vsoa_server_plistener_handler(pl, true, 0, command_light, NULL);
    vsoa_server_add_listener(server, &url, handler.callback, handler.arg);

    /*
    * Start server
    */
    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

    int cnt, max_fd;
    fd_set fds;
    struct timespec timeout = { 1, 0 };
    while (1) {
        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}

说明:

VSOA 并行 RPC 接口说明可参考 C 扩展编程手册

URL 匹配规则

URL 标识RPC 匹配规则
"/"默认监听所有事件
"/a/b/c"只处理 "/a/b/c" 标识的调用
"/a/b/c/"处理 "/a/b/c" 和 "/a/b/c/..." 标识的所有调用

注意:

如果 "/a/b/c" 和 "/a/b/c/" 的 RPC 监听处理同时存在,则客户端发起一个 "/a/b/c" 标识的 RPC 调用时, 系统将优先匹配 "/a/b/c",而非 "/a/b/c/"。

文档内容是否对您有所帮助?
有帮助
没帮助