服务端开发
本节内容介绍 VSOA 服务端使用 RPC 的方法。
开发须知
微服务对外提供 RPC 服务时,需要指定该 RPC 的 URL 资源标识并将该 URL 加入监听,同时需要指定 RPC 的回调函数用于处理 RPC 被调用时的逻辑。
常用接口
bool vsoa_server_add_listener(vsoa_server_t *server, const vsoa_url_t *url, vsoa_server_cmd_func_t callback, void *arg);
void vsoa_server_remove_listener(vsoa_server_t *server, const vsoa_url_t *url);
bool vsoa_server_cli_reply(vsoa_server_t *server, vsoa_cli_id_t id, uint8_t status, uint32_t seqno, uint16_t tunid, const vsoa_payload_t *payload);
server.on
void on(String url, CBOnCall cb);
func (s *Server) On(servicePath string, serviceMethod protocol.RpcMessageType, handler func(*protocol.Message, *protocol.Message)) (err error)
server.command(url: str, wq: vsoa:WorkQueue = None) -> callable
说明:
RPC 服务端的详细接口说明可参考以下手册:
- C 版本可参考 VSOA C 语言编程手册。
- JavaScript 版本可参考 VSOA JavaScript 编程手册。
- Java 版本可参考 VSOA Java 编程手册。
- Golang 版本可参考 VSOA Golang 编程手册。
- Python 版本可参考 VSOA Pyhon 编程手册。
开发示例
例如:将“灯光状态”定义为一个 /light
的 URL 资源标识,供调用 RPC 的客户端获取灯光状态。
向所创建的 VSOA 服务中增加一个 RPC 实现,需要指定当客户端调用 /light
时,将被执行的 RPC 回调函数。
/*
* Application: light_server
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"
#define SERVER_PASSWD "123456"
#define VSOA_AUTO_PORT_ENV "VSOA_AUTO_PORT="
#define SERVER_DEFAULT_PORT 3001
static vsoa_server_t *server;
/*
* /light Callback
*/
static void command_light (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
vsoa_payload_t *payload)
{
vsoa_payload_t send;
uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
send.data = NULL;
send.data_len = 0;
send.param = "{\"light\": 1}";
send.param_len = strlen(send.param);
vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}
int main (int argc, char **argv)
{
struct sockaddr_in addr;
uint16_t server_port = SERVER_DEFAULT_PORT;
char *autoPort = getenv(VSOA_AUTO_PORT_ENV);
if (autoPort != NULL) {
fprintf(stdout, "ser port is %s .\n", autoPort);
server_port = atoi(autoPort);
if (server_port == 0) {
server_port = SERVER_DEFAULT_PORT;
}
}
bzero(&addr, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port);
addr.sin_addr.s_addr = INADDR_ANY;
#ifdef VSOA_HAS_SIN_LEN
addr.sin_len = sizeof(struct sockaddr_in);
#endif
/*
* Initialize server
*/
server = vsoa_server_create("{\"name\":\"light_server\"}");
if (!server) {
fprintf(stderr, "Can not create VSOA server!\n");
return (-1);
}
/*
* If need password
*/
vsoa_server_passwd(server, SERVER_PASSWD);
/*
* Add /light listener
*/
vsoa_url_t url;
url.url = "/light";
url.url_len = strlen(url.url);
vsoa_server_add_listener(server, &url, command_light, NULL);
/*
* Start server
*/
if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
vsoa_server_close(server);
fprintf(stderr, "Can not start VSOA server!\n");
return (-1);
}
int cnt, max_fd;
fd_set fds;
struct timespec timeout = { 1, 0 };
while (1) {
FD_ZERO(&fds);
max_fd = vsoa_server_fds(server, &fds);
cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
if (cnt > 0) {
vsoa_server_input_fds(server, &fds);
}
}
return (0);
}
/*
* light_server.js
*/
var vsoa = require('vsoa');
var socket = require('socket');
/*
* Create server
*/
var server = new vsoa.Server({
info: {
name: 'light_server'
}, passwd: '123456'
});
/*
* Client event
*/
server.onclient = function(client, link) {
console.log(`Client link ${link} address: ${client.address().addr}`);
if (link) {
client.setKeepAlive(5000);
}
};
/*
* /light Callback
*/
server.on('/light', function (cli, request, payload) {
const content = {}
content.param = {'light': 1}
cli.reply(0, request.seqno, content)
})
/*
* Server start
*/
server.start({
domain: socket.AF_INET, addr: '127.0.0.1', port: 3001
});
/*
* Event loop
*/
require('iosched').forever();
/*
* light_server.java
*/
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
import com.acoinfo.vsoa.Server.CliHandle;
public class light_server {
private static String SERVER_NAME = "light_server";
private static String SERVER_INFO = "\"java language VSOA server\"";
private static String PASSWORD = "123456";
private static String SERVER_ADDR = "0.0.0.0";
private static int SERVER_PORT = 3001;
static Server server;
public static void main(String[] args) {
/*
* Initialize server
*/
try {
ServerOption opt = new ServerOption(SERVER_INFO, PASSWORD, false);
InetSocketAddress address = new InetSocketAddress(SERVER_ADDR, SERVER_PORT);
server = new Server(opt) {
@Override
public void onClient(CliHandle client, boolean link) {
if (!client.isConnected()) {
System.out.println("disconnected");
}
System.out.println("Client link " + link + " address: " + client.address().toString());
}
};
/*
* Start server
*/
if (!server.start(address, null)) {
return;
}
/*
* /light Callback
*/
server.on("/light", new CBOnCall() {
@Override
public boolean Callback(String url, CliHandle client, Request req, Payload payload) {
payload.param = "{\"light\": 1}";
client.reply(Constant.SUCCESS, req.seqno, payload);
return true;
}
});
} catch (Exception e1) {
e1.printStackTrace();
return ;
}
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
package main
import (
"encoding/json"
"strconv"
"time"
"gitee.com/sylixos/go-vsoa/protocol"
"gitee.com/sylixos/go-vsoa/server"
)
type RpcLightParam struct {
LightStatus int `json:"light"`
}
var lightstatus = 1
func startServer() {
/*
* Initialize server
*/
serverOption := server.Option{
Password: "123456",
}
s := server.NewServer("golang VSOA RPC server", serverOption)
/*
* /light Callback on RPC GET
*/
handleLightGet := func(req, res *protocol.Message) {
res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
res.Data = req.Data
}
s.On("/light", protocol.RpcMethodGet, handleLightGet)
go func() {
_ = s.Serve("localhost:3001")
}()
}
func main() {
startServer()
for {
time.Sleep(1 * time.Second)
}
}
from vsoa.server import Server, Client
from vsoa.interface import Request, Payload
# Create server
server = Server('light_server', '123456')
# Client event
def onclient(cli: Client, conn: bool):
print('Client link {} address: {}'.format(conn, cli.address()))
if conn:
cli.keepalive(5)
server.onclient = onclient
# /light callback
@server.command('/light')
def light(cli: Client, request: Request, payload: Payload):
content = vsoa.Payload(param={'light': 1})
cli.reply(seqno=request.seqno, payload=content, status=0)
# Server start
server.run('127.0.0.1', 3001)
说明:
- 在 Node.js 的环境中,不需要事件循环,即删除
require('iosched').forever();
。- 在 Node.js 的环境中,
socket.AF_INET
应为vsoa.AF_INET
,详情可见 https://www.npmjs.com/package/vsoa 。
VSOA vsoa_payload_t 结构体
在使用 C 语言开发过程中,需要特别注意 vsoa_payload_t
结构体的使用方法。该结构体中有 param 和 data 两个字段提供给用户以携带数据,长度分别为 param_len 和 data_len。其中: param 参数推荐用于字符串类型参数,data 参数用于二进制数据或者 base64 类型数据。如果用户没有用到该字段,需要将对应的字段置为 NULL 并将其长度置为 0。
例如,如果只需要 param 参数不需要 data 参数,需要按照如下形式生成 payload:
vsoa_payload_t send;
send.data = NULL;
send.data_len = 0;
send.param = "{\"light\": 1}";
send.param_len = strlen(send.param);
关于 vsoa_payload_t
的详细说明可见 VSOA Base Data Struct 中的 Payload 说明。
RPC GET/SET
如同定义函数时习惯区分函数的功能为“设置/获取”或“输入/输出”的概念,在 VSOA 的 RPC 中也可以通过读取 VSOA 报文头标志对 RPC 请求区分 GET 或 SET 操作。针对如下示例约定,当需要开启灯光时,向灯光控制微服务发送数据 "1";相反,当需要关闭灯光时,向灯光控制微服务发送数据 "0"。
static void command_light (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
vsoa_payload_t *payload)
{
vsoa_payload_t send;
static int light_status = 0;
uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
uint8_t flags = vsoa_parser_get_flags(vsoa_hdr);
#define LIGHT_ON "{\"light\": 1}"
#define LIGHT_OFF "{\"light\": 0}"
if (flags & VSOA_FLAG_SET) {
if (!strncmp(payload->param, LIGHT_ON, strlen(LIGHT_ON))) {
light_status = 1;
} else if (!strncmp(payload->param, LIGHT_OFF, strlen(LIGHT_OFF))) {
light_status = 0;
}
}
send.param = light_status ? LIGHT_ON : LIGHT_OFF;
send.data = NULL;
send.data_len = 0;
send.param_len = strlen(send.param);
vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}
server.on('/light', function (cli, request, payload) {
const content = {}
if (request.method == vsoa.method.SET) {
content.param = {'light': 1}
} else if (request.method == vsoa.method.GET) {
content.param = {'light': 0}
}
cli.reply(0, request.seqno, content)
})
private static String LIGHT_ON = "{\"light\": 1}";
private static String LIGHT_OFF = "{\"light\": 0}";
private static int lightStatus = 0;
public static void main(String[] args) {
……
server.on("/light", new CBOnCall() {
@Override
public boolean Callback(String url, CliHandle client, Request req, Payload payload) {
if (req.method == Request.VSOA_METHOD_SET) {
if (payload.param == LIGHT_ON) {
lightStatus = 1;
payload.param = LIGHT_ON;
} else {
lightStatus = 0;
payload.param = LIGHT_OFF;
}
} else {
payload.param = lightStatus == 1 ? LIGHT_ON : LIGHT_OFF;
}
client.reply(Constant.SUCCESS, req.seqno, payload);
return true;
}
});
}
……
func startServer() {
……
/*
* /light Callback on RPC GET
*/
handleLightGet := func(req, res *protocol.Message) {
res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
res.Data = req.Data
}
s.On("/light", protocol.RpcMethodGet, handleLightGet)
/*
* /light Callback on RPC SET
*/
handleLightSet := func(req, res *protocol.Message) {
reqParam := new(RpcLightParam)
err := json.Unmarshal(req.Param, reqParam)
if err != nil {
res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
return
}
lightstatus = reqParam.LightStatus
res.Param, _ = json.RawMessage(`{"light":` + strconv.Itoa(lightstatus) + `}`).MarshalJSON()
res.Data = req.Data
}
s.On("/light", protocol.RpcMethodSet, handleLightSet)
……
}
……
# /light callback
@server.command('/light')
def light(cli: Client, request: Request, payload: Payload):
match request.method:
case vsoa.METHOD_SET: content = vsoa.Payload(param={'light': 1})
case vsoa.METHOD_GET: content = vsoa.Payload(param={'light': 0})
case _: content = {}
cli.reply(seqno=request.seqno, payload=content, status=0)
并行 RPC
以上 RPC 使用方式中的所有回调函数都是串行执行的,VSOA 为 C 语言开发版本提供了并行处理 RPC 请求的功能,开发者可以通过并发 RPC 相关接口创建线程池并行处理 RPC 请求。
如下是将以上开发示例修改为并行 RPC 的示例:
/*
* Application: light_server
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_server.h"
#include "vsoa_platform.h"
#include "vsoa_plistener.h"
#define SERVER_PASSWD "123456"
#define VSOA_AUTO_PORT_ENV "VSOA_AUTO_PORT="
#define SERVER_DEFAULT_PORT 3001
#define PLISTEN_THREAD_COUNT 2
static vsoa_server_t *server;
/*
* /light Callback
*/
static void command_light (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
vsoa_payload_t *payload)
{
vsoa_payload_t send;
uint32_t seqno = vsoa_parser_get_seqno(vsoa_hdr);
send.data = NULL;
send.data_len = 0;
send.param = "{\"light\": 1}";
send.param_len = strlen(send.param);
vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}
int main (int argc, char **argv)
{
struct sockaddr_in addr;
uint16_t server_port = SERVER_DEFAULT_PORT;
char *autoPort = getenv(VSOA_AUTO_PORT_ENV);
if (autoPort != NULL) {
fprintf(stdout, "ser port is %s .\n", autoPort);
server_port = atoi(autoPort);
if (server_port == 0) {
server_port = SERVER_DEFAULT_PORT;
}
}
bzero(&addr, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port);
addr.sin_addr.s_addr = INADDR_ANY;
#ifdef VSOA_HAS_SIN_LEN
addr.sin_len = sizeof(struct sockaddr_in);
#endif
/*
* Initialize server
*/
server = vsoa_server_create("{\"name\":\"light_server\"}");
if (!server) {
fprintf(stderr, "Can not create VSOA server!\n");
return (-1);
}
/*
* If need password
*/
vsoa_server_passwd(server, SERVER_PASSWD);
/*
* Create plistener
*/
vsoa_plistener_t *pl;
pl = vsoa_server_plistener_create(PLISTEN_THREAD_COUNT);
if (!pl) {
vsoa_server_close(server);
fprintf(stderr, "Can not create VSOA parallel server!\n");
return (-1);
}
/*
* Add /light listener
*/
vsoa_url_t url;
url.url = "/light";
url.url_len = strlen(url.url);
vsoa_server_add_listener(server, &url, command_light, NULL);
/*
* Add parallel listener
*/
vsoa_plistener_handler_t handler;
handler = vsoa_server_plistener_handler(pl, true, 0, command_light, NULL);
vsoa_server_add_listener(server, &url, handler.callback, handler.arg);
/*
* Start server
*/
if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
vsoa_server_close(server);
fprintf(stderr, "Can not start VSOA server!\n");
return (-1);
}
int cnt, max_fd;
fd_set fds;
struct timespec timeout = { 1, 0 };
while (1) {
FD_ZERO(&fds);
max_fd = vsoa_server_fds(server, &fds);
cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
if (cnt > 0) {
vsoa_server_input_fds(server, &fds);
}
}
return (0);
}
说明:
VSOA 并行 RPC 接口说明可参考 C 扩展编程手册。
URL 匹配规则
URL 标识 | RPC 匹配规则 |
---|---|
"/" | 默认监听所有事件 |
"/a/b/c" | 只处理 "/a/b/c" 标识的调用 |
"/a/b/c/" | 处理 "/a/b/c" 和 "/a/b/c/..." 标识的所有调用 |
注意:
如果 "/a/b/c" 和 "/a/b/c/" 的 RPC 监听处理同时存在,则客户端发起一个 "/a/b/c" 标识的 RPC 调用时, 系统将优先匹配 "/a/b/c",而非 "/a/b/c/"。