Regulator 简介
VSOA Regulator 为微服务开发提供了变速器功能,本节将指导您如何使用 VSOA Regulator。
开发须知
在许多场景中,我们需要控制发布频率。例如,一个 UI 界面不能接收过多的数据更新频率,但是此时发布者的发布频率可能非常快。在这种场景下,我们需要改变发布的频率。
使用 VSOA Regulator 可以解决这个问题,它既可以使用在服务端,也可以使用在客户端。
本文中使用的客户端对应 发布与订阅 章节的服务端例程。
常用接口
vsoa_regulator_t *vsoa_regulator_create(const struct timespec *period);
bool vsoa_regulator_slot(vsoa_regulator_t *regulator, const vsoa_url_t *url, vsoa_regulator_func_t ondelay, void *arg, size_t buf_size);
void vsoa_regulator_delete(vsoa_regulator_t *regulator);
bool vsoa_regulator_period(vsoa_regulator_t *regulator, const struct timespec *period);
bool vsoa_regulator_unslot(vsoa_regulator_t *regulator, const vsoa_url_t *url);
bool vsoa_regulator_has_slot(vsoa_regulator_t *regulator, const vsoa_url_t *url, size_t *buf_size);
bool vsoa_regulator_update(vsoa_regulator_t *regulator, const vsoa_url_t *url, const vsoa_payload_t *payload);
vsoa.Regulator(client, period);
vsoa.Regulator.slot(client, period, url);
Regulator(Client client, int period) ;
boolean slot(String url);
void onMessage(String url, Payload payload, boolean quick);
StartRegulator(interval time.Duration) error
StopRegulator() error
Slot(URL string, onPublish func(m *protocol.Message)) error
UnSlot(URL string) error
说明:
变速器功能的详细接口说明可参考以下手册:
- C 版本可参考 C 扩展编程手册。
- JavaScript 版本可参考 VSOA JavaScript 编程手册。
- Java 版本可参考 VSOA Java 编程手册。
- Golang 版本可参考 VSOA Golang 编程手册。
开发步骤
- 调用
vsoa_regulator_create
接口,创建一个变速器实例。入参为变速器输出的时间间隔。返回值为变速器的句柄。 - 调用
vsoa_regulator_slot
接口,将变速器与侦听的 URL 绑定,并注册变频回调函数,用于接收变频后的数据。 - 按照 客户端开发 流程介绍,创建对应的客户端。
- 在客户端订阅的回调函数中,调用
vsoa_regulator_update
将payload
数据推送到创建的变速器实例中。 - 在
vsoa_regulator_slot
接口注册的变频回调函数中接收变频后的payload
数据。 - 当不需要该变速器时,调用
vsoa_regulator_unslot
释放该 URL 的注册。 - 调用
vsoa_regulator_delete
释放该变速器实例。
开发示例
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include "vsoa_cliauto.h"
#include "vsoa_regulator.h"
/* My server password */
#define MY_SERVER_PASSWD "123456"
/* My client */
static vsoa_client_t *client;
/* My client auto */
static vsoa_client_auto_t *cliauto;
/* My subscribe (string pointer array) */
static char *sub_urls[] = { "/axis" };
/* My regulator */
static vsoa_regulator_t* regulator;
static void onconnect (void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
printf("On connect, connect: %s, info: %s\n",
(connect == true) ? "connected" : "disconnected", info);
}
/*
* Slow on message
*/
static void slow_onmessage (void *arg, vsoa_url_t *url, vsoa_payload_t *payload)
{
printf("On -S.L.O.W- message, URL: %.*s payload: %.*s\n",
(int)url->url_len, url->url, (int)payload->param_len, payload->param);
}
/*
* Fast on message
*/
static void fast_onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
printf("On -F.A.S.T- message, URL: %.*s payload: %.*s\n",
(int)url->url_len, url->url, (int)payload->param_len, payload->param);
vsoa_regulator_update(regulator, url, payload);
}
/*
* main function
*/
int main (int argc, char **argv)
{
/*
* Notify data every 3 seconds.
*/
struct timespec period = { 3, 0 };
vsoa_url_t url;
#ifdef SYLIXOS
vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif
regulator = vsoa_regulator_create(&period);
if (regulator == NULL) {
fprintf(stderr, "Cannot create regulator\n");
goto error;
}
url.url = "/axis";
url.url_len = strlen(url.url);
/*
* Create slot with 1024 bytes buffer.
*/
if (!vsoa_regulator_slot(regulator, &url, slow_onmessage, NULL, 1024)) {
fprintf(stderr, "Cannot register regulator slot\n");
goto error;
}
/*
* Create client auto robot
*/
cliauto = vsoa_client_auto_create(fast_onmessage, NULL);
client = vsoa_client_auto_handle(cliauto);
if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
vsoa_client_auto_delete(cliauto);
fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
goto error;
}
/*
* Client auto robot start
* The robot will automatically connect to the specified server and maintain the connection.
* At this time, the developer only needs to focus on the business.
*/
vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD,
(char * const*)sub_urls, 1, 1000, 1000, 1000);
while (true) {
sleep(1);
}
error:
if (regulator) {
if (vsoa_regulator_has_slot(regulator, &url, NULL)) {
/*
* Release the regulator slot.
*/
vsoa_regulator_unslot(regulator, &url);
}
/*
* Release the regulator.
*/
vsoa_regulator_delete(regulator);
}
return (-1);
}
/* Server name to connect */
var vsoa = require('vsoa');
/* Server name to connect */
const SERVER_NAME = 'axis_server';
/* Client Option */
var option = { passwd: '123456' };
/* Client */
var client = new vsoa.Client(option);
/*
* Listen Subscribed message
*/
client.on('message', (url, payload, quick) {
console.log('received event:', url, 'payload:', JSON.stringify(payload))
})
/*
* Listen connect event
*/
client.on('connect', function(info) {
console.info('Connected with server:', JSON.stringify(info));
client.subscribe('/axis', error => {
console.log('Subscribe /', error ? 'error' : 'OK');
})
});
var regulator = new vsoa.Regulator(client, 3000);
regulator.slot('/axis').on('message', (url, payload, quick) {
console.log('regulator received event:', url, 'payload:', JSON.stringify(payload))
});
client.connect(`vsoa://${SERVER_NAME}`, (error, info) => {
if (error)
throw error;
});
/*
* Event loop
*/
require('iosched').forever();
import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;
public class client_test {
private static String SERVER_NAME = "axis_server";
private static String PASSWORD = "123456";
public static Client client;
public static void main(String[] args) {
/*
* Initialize client
*/
client = new Client(new ClientOption(PASSWORD, 6000, 4000, 3, false)) {
@Override
public void onError(Error error) {
System.out.println("Client error:" + error.message);
}
/*
* Message receiver
*/
@Override
public void onMessage(String url, Payload payload, boolean quick) {
if (payload.param.length() > 100) {
System.out.println("[CLIENT] received event: " + url +
" payload len: " + payload.param.length());
} else {
System.out.println("[CLIENT] received event: " + url +
" payload: " + payload.param);
}
}
@Override
public void onConnected(String info) {
System.out.println("Connected with server:" + info);
}
};
Regulator regulator = new Regulator(client, 3000) {
@Override
public void onMessage( String url, Payload payload, boolean quick) {
System.out.println("[REGULATOR] received event: " + url + " payload: " + payload.param);
}
};
regulator.slot("/axis");
if (!client.connect("vsoa://" + SERVER_NAME, null, Constant.VSOA_DEF_CONN_TIMEOUT)) {
System.out.println("Connected with server failed");
return;
}
/*
* Subscribe /axis
*/
client.subscribe("/axis", new CBSubscribe() {
@Override
public void callback(Error error) {
System.out.println("Subscribe /axis:" + (error != null ? "error" : "OK"));
}
});
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
package main
import (
"encoding/json"
"errors"
"fmt"
"time"
"gitee.com/sylixos/go-vsoa/client"
"gitee.com/sylixos/go-vsoa/protocol"
)
type PublishTestParam struct {
Roll int `json:"roll"`
Pitch int `json:"pitch"`
Yaw int `json:"yaw"`
}
type callback struct{}
func VsoaGetPublishCall() {
// Set client options
clientOption := client.Option{
Password: "123456",
}
// Create a new client instance
c := client.NewClient(clientOption)
// Start the client publish regulator
c.StartRegulator(3 * time.Second)
// Connect to the VSOA server
_, err := c.Connect("vsoa", "localhost:3001")
if err != nil {
fmt.Println(err)
return
}
// Close the connection when the function exits
defer c.Close()
cb := new(callback)
// Subscribe to the "/axis" channel. but act nothing.
err = c.Subscribe("/axis", c.NoopPublish)
if err != nil {
// Handle the error
if err == errors.New(protocol.StatusText(protocol.StatusInvalidUrl)) {
fmt.Println("Pass: Invalid URL")
} else {
fmt.Println(err)
}
return
}
// When publishes on URL "/axis", act callback in speed of client's regulator.
c.Slot("/axis", cb.getPublishParam)
// Wait for 10 seconds
time.Sleep(10 * time.Second)
}
func main() {
VsoaGetPublishCall()
}
// getPublishParam parses the JSON parameter from the message and prints the "Axis" field
func (c callback) getPublishParam(m *protocol.Message) {
// Create a new PublishTestParam instance
param := new(PublishTestParam)
// Unmarshal the JSON parameter into the param variable
json.Unmarshal(m.Param, param)
// Print the value of the "Publish" field
fmt.Println("VSOA Get: Axis fields, Roll:", param.Roll, " Pitch:", param.Pitch, " Yaw:", param.Yaw)
}
注意事项
C/C++ 服务端编译时需链接如下表所示的 VSOA 动态库,在 RealEvo-IDE 中配置时请参考 C/C++ 环境验证,Linux 下开发请参考 搭建 Linux 运行环境 提供的 C 语言范例进行配置。
库名称 | 功能 |
---|---|
libvsoa-json.so | 提供 JSON 功能 |
libvsoa-server.so | 提供服务端功能 |
libvsoa-parser.so | 提供参数解析功能 |