消息队列

更新时间:
2024-05-29
下载文档

消息队列

在说明什么是消息队列前,我们先看一下例子:有两个线程(线程 A、线程 B,线程 A 的优先级较线程 B 的高)和一个变量 V,线程 A 需要写变量 V,线程 B 需要读变量 V。我们假设只有在变量 V 的值改变时,线程 B 才需要读变量 V,在变量 V 的值不变时,线程 B 需要阻塞。

如果我们继续使用条件变量进行线程间通信,在线程 A 快速频繁地修改变量 V 的值时,可能会造成线程 B 丢失一部分对变量 V 值改变的响应——应该被读出的旧值已经被新值所覆盖。

消息队列是一个可以存放多则消息的 FIFO(先入先出)队列。如果我们改用消息队列作为线程 A、B 间的通信手段,线程 A 将变量 V 修改后的值作为一则消息存入消息队列,线程 B 只需要从消息队列读出消息(即变量 V 修改后的值),那么在消息队列满前就不会出现线程 B 丢失一部分对变量 V 值改变的响应。

消息队列使得我们的软件更容易按功能模块划分和实现,不同功能模块使用不同线程实现,功能模块之间使用消息队列进行通信解耦合而不是定义调用接口。

比如 ADC 线程读取 ADC 转换后的结果,将结果存入消息队列,UI 线程从消息队列取出结果将其显示到屏幕上;按键线程读取用户按下的按键,将键值存入消息队列,UI 线程取出键值切换显示页面……,这样只有 UI 线程能操作显示界面,避免了显示错误的发生。

SylixOS 消息队列还支持紧急消息的发送,紧急消息直接插入到消息队列的首部,紧急消息将最先被处理,保证了某些异常情况下的安全。

一个 SylixOS 消息队列必须要调用 Lw_MsgQueue_Create 函数创建之后才能使用,如果创建成功,Lw_MsgQueue_Create 函数将返回一个消息队列的句柄。

线程如果需要接收消息,可以调用 Lw_MsgQueue_Receive 函数,中断服务程序不能调用 Lw_MsgQueue_Receive 函数接收消息,因为 Lw_MsgQueue_Receive 函数在消息队列为空时会阻塞当前线程。

中断服务程序可以使用 Lw_MsgQueue_TryReceive 函数尝试接收消息,Lw_MsgQueue_TryReceive 函数在消息队列为空队列时将立即返回,不会阻塞当前线程。

发送消息可以调用 Lw_MsgQueue_Send 函数。

当一个消息队列使用完毕后(并确保以后也不再使用),应该调用 Lw_MsgQueue_Delete 函数将其删除,SylixOS 会回收该消息队列占用的内核资源。

消息队列

消息队列的创建和删除

#include <SylixOS.h>
LW_HANDLE Lw_MsgQueue_Create(CPCHAR             pcName,
                             ULONG              ulMaxMsgCounter,
                             size_t             stMaxMsgByteSize,
                             ULONG              ulOption,
                             LW_OBJECT_ID      *pulId); 

函数 Lw_MsgQueue_Create 原型分析:

  • 此函数成功返回消息队列的句柄,失败时返回 LW_HANDLE_INVALID 并设置错误号。
  • 参数 pcName 是消息队列的名字。
  • 参数 ulMaxMsgCounter 是消息队列可容纳的最大消息个数。
  • 参数 stMaxMsgByteSize 是消息队列的单则消息的最大长度。
  • 参数 ulOption 是消息队列的创建选项如下表所示。
  • 输出参数 pulId 用于接收消息队列的 ID。

需要注意的是,消息队列中最大消息队列的最小值为 sizeof(size_t),也就是说创建消息队列的最小容量为 sizeof(size_t)个字节大小。

宏名含义
LW_OPTION_WAIT_PRIORITY按优先级顺序等待
LW_OPTION_WAIT_FIFO按先入先出顺序等待
LW_OPTION_OBJECT_GLOBAL全局对象
LW_OPTION_OBJECT_LOCAL本地对象
#include <SylixOS.h>
ULONG    Lw_MsgQueue_Delete(LW_HANDLE  *pulId); 
  • 此函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 pulId 是消息队列的句柄。

接收消息

#include <SylixOS.h>
ULONG    Lw_MsgQueue_Receive(LW_HANDLE   ulId,
                             PVOID       pvMsgBuffer,
                             size_t      stMaxByteSize,
                             size_t     *pstMsgLen,
                             ULONG       ulTimeout);
ULONG    Lw_MsgQueue_ReceiveEx(LW_HANDLE      ulId,
                               PVOID          pvMsgBuffer,
                               size_t         stMaxByteSize,
                               size_t        *pstMsgLen,
                               ULONG          ulTimeout,
                               ULONG          ulOption);
ULONG    Lw_MsgQueue_TryReceive(LW_HANDLE     ulId,
                                PVOID         pvMsgBuffer,
                                size_t        stMaxByteSize,
                                size_t       *pstMsgLen);

以上三个函数原型分析:

  • 函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 参数 pvMsgBuffer 指向用于接收消息的消息缓冲区(一个 void 类型的指针,可以指向任意类型)。
  • 参数 stMaxByteSize 是消息缓冲区的长度。
  • 输出参数 pstMsgLen 用于接收消息的长度。
  • 参数 ulTimeout 是等待的超时时间,单位为时钟嘀嗒 Tick。
  • 参数 ulOption 是消息队列的接收选项,如下表所示。
宏名含义
LW_OPTION_NOERROR大于缓冲区的消息自动截断(默认为此选项)

调用 Lw_MsgQueue_Receive 函数将从 ulId 代表的消息队列中获得消息:

  • 当队列中存在消息时,该函数将把消息复制到参数 pvMsgBuffer 向的消息缓冲区,如果缓冲区长度大于消息长度,缓冲区剩余部分不做修改;如果缓冲区长度小于消息长度,消息将被截断并且没有任何错误返回。Lw_MsgQueue_ReceiveEx 函数提供了消息错误检查机制,当消息被截断时,该函数将返回错误号 E2BIG。
  • 当队列中不存在消息时,线程将被阻塞,如果设置了 ulTimeout 的超时值为 LW_OPTION_WAIT_INFINITE,线程将永远阻塞直到消息到来;如果 ulTimeout 的超时值不为 LW_OPTION_WAIT_INFINITE,线程将在指定的时间超时后自动唤醒线程。

发送消息

#include <SylixOS.h>
ULONG    Lw_MsgQueue_Send(LW_HANDLE       ulId,
                          const PVOID     pvMsgBuffer,
                          size_t          stMsgLen);
ULONG    Lw_MsgQueue_SendEx(LW_HANDLE     ulId,
                            const PVOID   pvMsgBuffer,
                            size_t        stMsgLen,
                            ULONG         ulOption);

以上两个函数原型分析:

  • 函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 参数 pvMsgBuffer 指向需要发送的消息缓冲区(一个 void 类型的指针,可以指向任意类型)。
  • 参数 stMsgLen 是需要发送的消息的长度。
  • 参数 ulOption 是消息的发送选项,如下表所示。
宏名含义
LW_OPTION_DEFAULT默认的选项
LW_OPTION_URGENT紧急消息发送
LW_OPTION_BROADCAST广播发送

如果使用 LW_OPTION_URGENT 选项,那么该消息将被插入到消息队列的首部。如果使用 LW_OPTION_BROADCAST 选项,那么该消息将被传递到每一个等待该消息队列的线程。

带延时的发送消息

#include <SylixOS.h>
ULONG    Lw_MsgQueue_Send2(LW_HANDLE       ulId,
                           const PVOID     pvMsgBuffer,
                           size_t          stMsgLen
                           ULONG           ulTimeout);
ULONG    Lw_MsgQueue_SendEx2(LW_HANDLE     ulId,
                             const PVOID   pvMsgBuffer,
                             size_t        stMsgLen,
                             ULONG         ulTimeout
                             ULONG         ulOption);

以上两个函数原型分析:

  • 函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 参数 pvMsgBuffer 指向需要发送的消息缓冲区(一个 void 类型的指针,可以指向任意类型)。
  • 参数 stMsgLen 是需要发送的消息的长度。
  • 参数 ulTimeout 是发送消息的延时等待时间。
  • 参数 ulOption 是消息的发送选项,如下表所示。

以上两个函数与 Lw_MsgQueue_Send 函数不同的参数传递中增加了 ulTimeout 参数,该参数表示发送消息带延时等待功能,这意味着,当发送的消息队列满时,发送消息将等待 ulTimeout 时间,如果超时时间到时消息队列仍然处于满状态消息将被丢弃,否则消息被成功发送。

宏名含义
LW_OPTION_DEFAULT默认的选项
LW_OPTION_URGENT紧急消息发送
LW_OPTION_BROADCAST广播发送

消息队列的清除

#include <SylixOS.h>
ULONG    Lw_MsgQueue_Clear(LW_HANDLE  ulId);

函数 Lw_MsgQueue_Clear 原型分析:

  • 此函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。

消息队列的清除意味着队列中的所有消息将被删除(消息丢弃,队列仍然有效),企图从中接收消息不会得到预期的结果。

释放等待消息队列的所有线程

#include <SylixOS.h>
ULONG    Lw_MsgQueue_Flush(LW_HANDLE  ulId, 
                                ULONG      *pulThreadUnblockNum);

函数 Lw_MsgQueue_Flush 原型分析:

  • 此函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 输出参数 pulThreadUnblockNum 返回被解除阻塞的线程数,可以为 NULL。

调用 Lw_MsgQueue_Flush 函数将使所有阻塞在指定消息队列上的线程(包括发送和接收线程)就绪,这样避免了线程长时间阻塞的状态。

#include <SylixOS.h>
ULONG    Lw_MsgQueue_FlushSend(LW_HANDLE  ulId, 
                               ULONG     *pulThreadUnblockNum);

函数 Lw_MsgQueue_FlushSend 原型分析:

  • 此函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 输出参数 pulThreadUnblockNum 返回被解除阻塞的线程数,可以为 NULL。

调用 Lw_MsgQueue_FlushSend 函数将使所有阻塞在指定消息队列上的发送线程就绪,这样避免了发送线程因为长时间发送不出去消息而长时间阻塞的状态。

#include <SylixOS.h>
ULONG    Lw_MsgQueue_FlushReceive(LW_HANDLE  ulId, 
                                              ULONG      *pulThreadUnblockNum);

函数 Lw_MsgQueue_FlushReceive 原型分析:

  • 此函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 输出参数 pulThreadUnblockNum 返回被解除阻塞的线程数,可以为 NULL。

调用 Lw_MsgQueue_FlushReceive 函数将使所有阻塞在指定消息队列上的接收线程就绪,这样避免了接收线程因为长时间接收不到消息而长时间阻塞的状态。

获得消息队列的状态

#include <SylixOS.h>
ULONG    Lw_MsgQueue_Status(LW_HANDLE      ulId,
                            ULONG         *pulMaxMsgNum,
                            ULONG         *pulCounter,
                            size_t        *pstMsgLen,
                            ULONG         *pulOption,
                            ULONG         *pulThreadBlockNum);
ULONG    Lw_MsgQueue_StatusEx(LW_HANDLE     ulId,
                              ULONG        *pulMaxMsgNum,
                              ULONG        *pulCounter,
                              size_t       *pstMsgLen,
                              ULONG        *pulOption,
                              ULONG        *pulThreadBlockNum,
                              size_t       *pstMaxMsgLen);

以上函数原型分析:

  • 函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 输出参数 pulMaxMsgNum 用于接收消息队列可容纳的最大消息个数。
  • 输出参数 pulCounter 用于接收消息队列当前消息的数目。
  • 输出参数 pstMsgLen 用于接收消息队列最近一则消息的长度。
  • 输出参数 pulOption 用于接收消息队列的创建选项。
  • 输出参数 pulThreadBlockNum 用于接收当前阻塞在该消息队列的线程数。
  • 输出参数 pstMaxMsgLen 用于接收消息队列的的单则消息的最大长度。

获得消息队列的名字

#include <SylixOS.h>
ULONG    Lw_MsgQueue_GetName(LW_HANDLE  ulId,
                             PCHAR      pcName);

函数 Lw_MsgQueue_GetName 原型分析:

  • 此函数成功返回 ERROR_NONE,失败返回错误号。
  • 参数 ulId 是消息队列的句柄。
  • 输出参数 pcName 是计数型信号量的名字,pcName 应该指向一个大小为 LW_CFG_OBJECT_NAME_SIZE 的字符数组。

下面程序展示了 SylixOS 消息队列的使用,程序创建两个线程和一个 SylixOS 消息队列;线程 tTestB 将字符串作为消息发送到消息队列中,线程 tTestA 从消息队列中读取消息并打印。

#include <SylixOS.h>
#include "string.h"

static LW_HANDLE    _G_hMsgQ;

static PVOID  tTestA (PVOID  pvArg)
{
    INT         iError;
    CHAR        acMsg[64];
    size_t      stLen;

    while (1) {
        iError = Lw_MsgQueue_Receive(_G_hMsgQ, acMsg, sizeof(acMsg), &stLen, LW_OPTION_WAIT_INFINITE);
        if (iError != ERROR_NONE) {
            break;
        }
        printf("tTestA(): get a msg \"%s\"\n", acMsg);
    }
    return  (LW_NULL);
}

static PVOID  tTestB (PVOID  pvArg)
{
    INT         iError;
    CHAR        acMsg[64];
    size_t      stLen;
    INT         iCount = 0;

    while (1) {
        sprintf(acMsg, "hello SylixOS %d", iCount);
        stLen = strlen(acMsg) + 1;
        iCount++;
        iError = Lw_MsgQueue_Send(_G_hMsgQ, acMsg, stLen);
        if (iError != ERROR_NONE) {
            break;
        }
        Lw_Time_SSleep(1);
    }
    return  (LW_NULL);
}
int main (int argc, char *argv[])
{
    LW_HANDLE               hThreadAId;
    LW_HANDLE               hThreadBId;

    _G_hMsgQ = Lw_MsgQueue_Create("msgq", 10, 64,
                                         LW_OPTION_WAIT_FIFO |
                                         LW_OPTION_OBJECT_LOCAL,
                                         LW_NULL);
    if (_G_hMsgQ == LW_OBJECT_HANDLE_INVALID) {
        printf("message queue create failed.\n");
        return  (-1);
    }
    hThreadAId = Lw_Thread_Create("t_testa", tTestA, LW_NULL, LW_NULL);
    if (hThreadAId == LW_OBJECT_HANDLE_INVALID) {
        printf("t_testa create failed.\n");
        return  (-1);
    }
    hThreadBId = Lw_Thread_Create("t_testb", tTestB, LW_NULL, LW_NULL);
    if (hThreadBId == LW_OBJECT_HANDLE_INVALID) {
        printf("t_testb create failed.\n");
        return  (-1);
    }
    Lw_Thread_Join(hThreadAId, LW_NULL);
    Lw_Thread_Join(hThreadBId, LW_NULL);
    Lw_MsgQueue_Delete(&_G_hMsgQ);
    return  (0);
}

在 SylixOS Shell 下运行程序:

# ./SylixOS_Message_Queuing
tTestA(): get a msg "hello SylixOS 0"
tTestA(): get a msg "hello SylixOS 1"
tTestA(): get a msg "hello SylixOS 2"
tTestA(): get a msg "hello SylixOS 3"
tTestA(): get a msg "hello SylixOS 5"
tTestA(): get a msg "hello SylixOS 6"
......
# ts
         NAME             TID    PID  PRI STAT LOCK SAFE    DELAY   PAGEFAILS FPU CPU
----------------------- ------- ----- --- ---- ---- ---- ---------- --------- --- ---
SylixOS_Message_Queuing 4010069    33 200 JOIN    0               0         1 USE   0
t_testa                 401006a    33 200 MSGQ    0               0         0 USE   0
t_testb                 401006b    33 200 SLP     0             506         0       0
文档内容是否对您有所帮助?
有帮助
没帮助