Regulator 简介

更新时间:
2024-12-19

Regulator 简介

VSOA Regulator 为微服务开发提供了变速器功能,本节将指导您如何使用 VSOA Regulator。

开发须知

在许多场景中,我们需要控制发布频率。例如,一个 UI 界面不能接收过多的数据更新频率,但是此时发布者的发布频率可能非常快。在这种场景下,我们需要改变发布的频率。

使用 VSOA Regulator 可以解决这个问题,它既可以使用在服务端,也可以使用在客户端。

本文中使用的客户端对应 发布与订阅 章节的服务端例程

常用接口

vsoa_regulator_t *vsoa_regulator_create(const struct timespec *period);
bool vsoa_regulator_slot(vsoa_regulator_t *regulator, const vsoa_url_t *url, vsoa_regulator_func_t ondelay, void *arg, size_t buf_size);
void vsoa_regulator_delete(vsoa_regulator_t *regulator);
bool vsoa_regulator_period(vsoa_regulator_t *regulator, const struct timespec *period);
bool vsoa_regulator_unslot(vsoa_regulator_t *regulator, const vsoa_url_t *url);
bool vsoa_regulator_has_slot(vsoa_regulator_t *regulator, const vsoa_url_t *url, size_t *buf_size);
bool vsoa_regulator_update(vsoa_regulator_t *regulator, const vsoa_url_t *url, const vsoa_payload_t *payload);
vsoa.Regulator(client, period);
vsoa.Regulator.slot(client, period, url);
Regulator(Client client, int period) ;
boolean slot(String url);
void onMessage(String url, Payload payload, boolean quick);
StartRegulator(interval time.Duration) error
StopRegulator() error
Slot(URL string, onPublish func(m *protocol.Message)) error
UnSlot(URL string) error

说明:

变速器功能的详细接口说明可参考以下手册:

开发步骤

  1. 调用 vsoa_regulator_create 接口,创建一个变速器实例。入参为变速器输出的时间间隔。返回值为变速器的句柄。
  2. 调用 vsoa_regulator_slot 接口,将变速器与侦听的 URL 绑定,并注册变频回调函数,用于接收变频后的数据。
  3. 按照 客户端开发 流程介绍,创建对应的客户端。
  4. 在客户端订阅的回调函数中,调用 vsoa_regulator_updatepayload 数据推送到创建的变速器实例中。
  5. vsoa_regulator_slot 接口注册的变频回调函数中接收变频后的 payload 数据。
  6. 当不需要该变速器时,调用 vsoa_regulator_unslot 释放该 URL 的注册。
  7. 调用 vsoa_regulator_delete 释放该变速器实例。

开发示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#ifdef SYLIXOS
#include <sys/vproc.h>
#endif
#include "vsoa_cliauto.h"
#include "vsoa_regulator.h"

/* My server password */
#define MY_SERVER_PASSWD "123456"

/* My client */
static vsoa_client_t *client;

/* My client auto */
static vsoa_client_auto_t *cliauto;

/* My subscribe (string pointer array) */
static char *sub_urls[] = { "/axis" };

/* My regulator */
static vsoa_regulator_t* regulator;

static void onconnect (void *arg, vsoa_client_auto_t *cliauto, bool connect, const char *info)
{
    printf("On connect, connect: %s, info: %s\n", 
            (connect == true) ? "connected" : "disconnected", info);
}

/* 
 * Slow on message
 */
static void slow_onmessage (void *arg, vsoa_url_t *url, vsoa_payload_t *payload)
{
    printf("On -S.L.O.W- message, URL: %.*s payload: %.*s\n",
           (int)url->url_len, url->url, (int)payload->param_len, payload->param);
}

/*
 * Fast on message
 */
static void fast_onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    printf("On -F.A.S.T- message, URL: %.*s payload: %.*s\n",
           (int)url->url_len, url->url, (int)payload->param_len, payload->param);

    vsoa_regulator_update(regulator, url, payload);
}

/*
 * main function
 */
int main (int argc, char **argv)
{
    /* 
     * Notify data every 3 seconds.
     */
    struct timespec period = { 3, 0 }; 
    vsoa_url_t      url;

#ifdef SYLIXOS
    vprocExitModeSet(getpid(), LW_VPROC_EXIT_FORCE);
#endif

    regulator = vsoa_regulator_create(&period);
    if (regulator == NULL) {
        fprintf(stderr, "Cannot create regulator\n");
        goto    error;
    }

    url.url = "/axis";
    url.url_len = strlen(url.url);

    /*
     * Create slot with 1024 bytes buffer.
     */
    if (!vsoa_regulator_slot(regulator, &url, slow_onmessage, NULL, 1024)) {
        fprintf(stderr, "Cannot register regulator slot\n");
        goto    error;
    }

    /*
     * Create client auto robot
     */
    cliauto = vsoa_client_auto_create(fast_onmessage, NULL);
    client  = vsoa_client_auto_handle(cliauto);

    if (!vsoa_client_auto_setup(cliauto, onconnect, NULL)) {
        vsoa_client_auto_delete(cliauto);
        fprintf(stderr, "Cannot register connect callback: %s (%d)\n", strerror(errno), errno);
        goto    error;
    }

    /*
     * Client auto robot start
     * The robot will automatically connect to the specified server and maintain the connection.
     * At this time, the developer only needs to focus on the business.
     */
    vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD,
                           (char * const*)sub_urls, 1, 1000, 1000, 1000);

    while (true) {
        sleep(1);
    }

error:
    if (regulator) {
        if (vsoa_regulator_has_slot(regulator, &url, NULL)) {

            /*
             * Release the regulator slot.
             */
            vsoa_regulator_unslot(regulator, &url);
        }

        /*
         * Release the regulator.
         */
        vsoa_regulator_delete(regulator);
    }

    return  (-1);
}

/* Server name to connect */
var vsoa = require('vsoa');

/* Server name to connect */
const SERVER_NAME = 'axis_server';

/* Client Option */
var option = { passwd: '123456' };

/* Client */
var client = new vsoa.Client(option);

/*
 * Listen Subscribed message
 */
client.on('message', (url, payload, quick) {
    console.log('received event:', url, 'payload:', JSON.stringify(payload))
})

/*
 * Listen connect event
 */
client.on('connect', function(info) {
    console.info('Connected with server:', JSON.stringify(info));
    
    client.subscribe('/axis', error => {
        console.log('Subscribe /', error ? 'error' : 'OK');
    })
});

var regulator = new vsoa.Regulator(client, 3000);

regulator.slot('/axis').on('message', (url, payload, quick) {
    console.log('regulator received event:', url, 'payload:', JSON.stringify(payload))
});
    
client.connect(`vsoa://${SERVER_NAME}`, (error, info) => {
    if (error) 
        throw error;
});

/*
 * Event loop
 */
require('iosched').forever();

import java.net.InetSocketAddress;
import com.acoinfo.vsoa.*;

public class client_test {
    private  static String  SERVER_NAME   = "axis_server";
    private  static String  PASSWORD      = "123456";

    public   static Client client;

    public static void main(String[] args) {

        /*
        * Initialize client
        */
        client = new Client(new ClientOption(PASSWORD, 6000, 4000, 3, false)) {

            @Override
            public void onError(Error error) {
                System.out.println("Client error:" + error.message);
            }

            /*
            * Message receiver
            */
            @Override
            public void onMessage(String url, Payload payload, boolean quick) {
                if (payload.param.length() > 100) {
                    System.out.println("[CLIENT] received event: " + url + 
                                    " payload len: " + payload.param.length());
                } else {
                    System.out.println("[CLIENT] received event: " + url + 
                                    " payload: " + payload.param);
                }
            }

            @Override
            public void onConnected(String info) {
                System.out.println("Connected with server:" + info);
            }
        }; 

        Regulator regulator = new Regulator(client, 3000) {
            
            @Override
            public void onMessage( String url, Payload payload, boolean quick) {
                System.out.println("[REGULATOR] received event: " + url + " payload: " + payload.param);
            }
        };

        regulator.slot("/axis");
        
        if (!client.connect("vsoa://" + SERVER_NAME, null, Constant.VSOA_DEF_CONN_TIMEOUT)) {
            System.out.println("Connected with server failed");
            return;
        }

        /*
        * Subscribe /axis
        */
        client.subscribe("/axis", new CBSubscribe() {
            @Override
            public void callback(Error error) {
                System.out.println("Subscribe /axis:" + (error != null ? "error" : "OK"));
            }
        });

        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "gitee.com/sylixos/go-vsoa/client"
    "gitee.com/sylixos/go-vsoa/protocol"
)

type PublishTestParam struct {
    Roll  int `json:"roll"`
    Pitch int `json:"pitch"`
    Yaw   int `json:"yaw"`
}

type callback struct{}

func VsoaGetPublishCall() {
    // Set client options
    clientOption := client.Option{
        Password: "123456",
    }

    // Create a new client instance
    c := client.NewClient(clientOption)
    // Start the client publish regulator
    c.StartRegulator(3 * time.Second)

    // Connect to the VSOA server
    _, err := c.Connect("vsoa", "localhost:3001")
    if err != nil {
        fmt.Println(err)
        return
    }

    // Close the connection when the function exits
    defer c.Close()

    cb := new(callback)

    // Subscribe to the "/axis" channel. but act nothing.
    err = c.Subscribe("/axis", c.NoopPublish)
    if err != nil {
        // Handle the error
        if err == errors.New(protocol.StatusText(protocol.StatusInvalidUrl)) {
            fmt.Println("Pass: Invalid URL")
        } else {
            fmt.Println(err)
        }
        return
    }

    // When publishes on URL "/axis", act callback in speed of client's regulator.
    c.Slot("/axis", cb.getPublishParam)

    // Wait for 10 seconds
    time.Sleep(10 * time.Second)
}

func main() {
    VsoaGetPublishCall()
}

// getPublishParam parses the JSON parameter from the message and prints the "Axis" field
func (c callback) getPublishParam(m *protocol.Message) {
    // Create a new PublishTestParam instance
    param := new(PublishTestParam)

    // Unmarshal the JSON parameter into the param variable
    json.Unmarshal(m.Param, param)

    // Print the value of the "Publish" field
    fmt.Println("VSOA Get: Axis fields, Roll:", param.Roll, " Pitch:", param.Pitch, " Yaw:", param.Yaw)
}

文档内容是否对您有所帮助?
有帮助
没帮助