POSIX 命名消息队列

更新时间:
2024-12-26

POSIX 命名消息队列

POSIX 命名消息队列的句柄的类型为 mqd_t。使用时需要定义一个 mqd_t 类型的变量:

mqd_t mqd;

一个 POSIX 命名消息队列必须要调用 mq_open 函数创建或打开之后才能使用,接收消息可以调用 mq_receive 函数,发送消息可以使用 mq_send 函数。

当一个 POSIX 命名消息队列使用完毕后,应该调用 mq_close 函数将其关闭;当一个 POSIX 命名消息队列不再有任何用途时,应该调用 mq_unlink 函数将其删除,SylixOS 会回收该消息队列占用的内核资源。

命名消息队列属性块

创建一个 POSIX 命名消息队列需要使用 POSIX 命名消息队列属性块。POSIX 命名消息队列属性块的类型为 struct mq_attr,定义如下:

typedef struct mq_attr {
    long              mq_flags;                /*  消息队列文件标志                 */
    long              mq_maxmsg;               /*  消息队列可容纳的最大消息个数     */
    long              mq_msgsize;              /*  消息队列的单则消息的最大长度     */
    long              mq_curmsgs;              /*  当前消息队列的消息数目           */
} mq_attr_t;

使用时需要定义一个 struct mq_attr 结构的变量,如:

struct mq_attr     mqattr;

由于 POSIX 并没有定义 POSIX 命名消息队列属性块的操作函数,所以使用时需要对 struct mq_attr 结构的成员进行赋值,示例代码如下:

struct mq_attr mqattr = {O_RDWR, 128, 64, 0};

命名消息队列

命名消息队列的创建与打开

#include <mqueue.h>
mqd_t   mq_open(const char  *name, int  flag, ...);

函数 mq_open 原型分析:

  • 此函数成功时返回一个命名消息队列的句柄,失败时返回 MQ_FAILED 并设置错误号。
  • 参数 name 是命名消息队列的名字。
  • 参数 flag 是命名消息队列的打开选项(O_CREAT,O_EXCL...)。
  • 参数 … 是可变参数,该参数可指定消息队列的模式和属性块(mode 和 pmqattr)。

如果需要创建一个 POSIX 命名消息队列,打开选项应该加上 O_CREAT,并且可变参数应指定 mode 和 pmqattr,当 pmqattr 为 NULL 时,将使用默认的属性。创建命令消息队列的形式如下:

mqd_t  mq;
mq = mq_open(“mq_test”, O_RDWR | O_CREAT, 0666, NULL);

如果打开一个已经存在的 POSIX 命名消息队列,打开选项不应该加上 O_CREAT。

默认的属性定义如下:

mq_attr_t  mq_attr_default = {O_RDWR, 128, 64, 0};

即消息队列可容纳 128 条消息,单条消息的最大长度为 64 字节。

命名消息队列属性的获取与设置

#include <mqueue.h>
int    mq_getattr(mqd_t  mqd, struct mq_attr *pmqattr);

函数 mq_getattr 原型分析:

  • 此函数成功返回 0,失败返回-1 并设置错误号。
  • 参数 mqd 是 POSIX 命名消息队列的句柄。
  • 输出参数 pmqattr 用于接收 POSIX 命名消息队列的属性。
#include <mqueue.h>
int   mq_setattr(mqd_t  mqd, const struct mq_attr *pmqattrNew, struct mq_attr *pmqattrOld);

函数 mq_setattr 原型分析:

  • 此函数成功时返回 0,失败时返回-1 并设置错误号。
  • 参数 mqd 是 POSIX 命名消息队列的句柄。
  • 参数 pmqattrNew 指向一个 POSIX 命名消息队列属性块,是需要设置的新属性。
  • 输出参数 pmqattrOld 用于接收 POSIX 命名消息队列的当前属性,可以为 NULL。

向命名消息队列发送消息

#include <mqueue.h>
int   mq_send(mqd_t  mqd, const char  *msg, size_t  msglen, unsigned msgprio);
int   mq_timedsend(mqd_t  mqd, const char  *msg, size_t  msglen, 
                   unsigned msgprio, const struct timespec *abs_timeout);
int  mq_reltimedsend_np(mqd_t  mqd, const char  *msg, size_t  msglen, 
                        unsigned msgprio, const struct timespec *rel_timeout);

以上三个函数原型分析:

  • 以上函数成功时返回 0,失败时返回-1 并设置错误号。
  • 参数 mqd 是 POSIX 命名消息队列的句柄。
  • 参数 msg 指向需要发送的消息缓冲区(一个 const char 型的指针)。
  • 参数 msglen 是需要发送的消息的长度。
  • 参数 msgprio 是需要发送的消息的优先级。
  • 参数 abs_timeout 是当消息队列满时,发送者线程需要等待的绝对超时时间。
  • 参数 rel_timeout 是当消息队列满时,发送者线程需要等待的相对超时时间。

mq_timedsend 函数是 mq_send 函数的带等待超时时间的版本,abs_timeout 为等待的绝对超时时间(绝对时间是指到将来的某个时间点)。

mq_reltimedsend_np 函数是 mq_timedsend 函数的非 POSIX 标准版本,参数 rel_timeout 为等待的相对超时时间(相对时间是指以当前时间为起点的一个时间区间)。

接收命名消息队列的消息

#include <mqueue.h>
ssize_t      mq_receive(mqd_t  mqd, char  *msg, size_t  msglen, unsigned *pmsgprio);
ssize_t      mq_timedreceive(mqd_t  mqd, char  *msg, size_t  msglen, unsigned *pmsgprio, 
                             const struct timespec *abs_timeout);
ssize_t      mq_reltimedreceive_np(mqd_t  mqd, char  *msg, size_t  msglen, 
                                   unsigned *pmsgprio, const struct timespec *rel_timeout);

以上三个函数原型分析:

  • 以上函数成功时返回接收的消息的长度,失败时返回-1 并设置错误号。
  • 参数 mqd 是 POSIX 命名消息队列的句柄。
  • 参数 msg 指向用于接收消息的消息缓冲区(一个 char 型的指针)。
  • 参数 msglen 是消息缓冲区的长度。
  • 输出参数 pmsgprio 用于接收消息的优先级。
  • 参数 abs_timeout 是当消息队列空时,接收者线程需要等待的绝对超时时间。
  • 参数 rel_timeout 是当消息队列空时,接收者线程需要等待的相对超时时间。

mq_timedreceive 是 mq_receive 的带等待超时时间的版本,abs_timeout 为等待的绝对超时时间。

mq_reltimedreceive_np 是 mq_timedreceive 的非 POSIX 标准版本,参数 rel_timeout 为等待的相对超时时间。

注册命名消息队列可读时通知信号

#include <mqueue.h>
int    mq_notify(mqd_t  mqd, const struct sigevent  *pnotify);

函数 mq_notify 原型分析:

  • 此函数成功返回 0,失败返回-1 并设置错误号。
  • 参数 mqd 是 POSIX 命名消息队列的句柄。
  • 参数 pnotify 指向一个 struct sigevent 信号事件类型的变量。

命名消息队列的关闭

#include <mqueue.h>
int    mq_close(mqd_t  mqd);

函数 mq_close 原型分析:

  • 此函数成功时返回 0,失败时返回-1 并设置错误号。
  • 参数 mqd 是 POSIX 命名消息队列的句柄。

命名消息队列的删除

#include <mqueue.h>
int    mq_unlink(const char  *name);

函数 mq_close 原型分析:

  • 此函数成功返回 0,失败返回-1 并设置错误号。
  • 参数 name 是 POSIX 命名消息队列的名字。

下面是通过 POSIX 命名消息队列实现的生产者消费者实例,生产者程序每隔 1 秒向消息队列中发送一则消息,消费者每隔 2 秒从队列中获取一则消息,生产者生产完成后延时 1 分钟等待消费者正常退出,1 分钟之后生产者调用 mq_unlink 函数删除消息队列文件。

#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>

#define MAXSIZE         (10)
#define BUFFER          (8192)
#define FILE_NAME       "/posix"

struct msg_type {
    int     len;
    char    buf[MAXSIZE];
};

int main (int argc, char *argv[])
{
    mqd_t                   msgq_id;
    struct msg_type         msg;
    unsigned int            prio = 1;
    struct mq_attr          msgq_attr;
    int                     ret;
    int                     i;

    msgq_id = mq_open(FILE_NAME, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, NULL);
    if(msgq_id == (mqd_t)-1) {
        perror("mq_open");
        return  (-1);
    }
    ret = mq_getattr(msgq_id, &msgq_attr);
    if(ret < 0) {
        perror("mq_getattr");
        return  (-1);
    }
    ret = mq_setattr(msgq_id, &msgq_attr, NULL);
    if(ret < 0) {
        perror("mq_setattr");
        return  (-1);
    }
    for (i = 0; i < 10; ++i) {
        memset(msg.buf, 0, MAXSIZE);
        sprintf(msg.buf, "%c", i + 'a');
        msg.len = 1;
        fprintf(stdout, "msg.buf = %s\n", msg.buf);
        ret = mq_send(msgq_id, (char*)&msg, sizeof(struct msg_type), prio);
        if(ret < 0) {
            perror("mq_send");
            return  (-1);
        }
        sleep(1);
    }
    sleep(60);
    ret = mq_close(msgq_id);
    if(ret < 0) {
        perror("mq_close");
        return  (-1);
    }
    ret = mq_unlink(FILE_NAME);
    if(ret < 0) {
        perror("mq_unlink");
        return (-1);
    }
    return  (0);
}
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>

#define MAXSIZE         (10)
#define BUFFER          (8192)
#define FILE_NAME       "/posix"

struct msg_type {
    int     len;
    char    buf[MAXSIZE];
};

int main (int argc, char *argv[])
{
    mqd_t                   msgq_id;
    unsigned int            sender;
    struct msg_type         msg;
    struct mq_attr          msgq_attr;
    long                    recv_size = BUFFER;
    int                     ret;
    int                     i;

    msgq_id = mq_open(FILE_NAME, O_RDWR);
    if(msgq_id == (mqd_t)-1) {
        perror("mq_open");
        return  (-1);
    }
    ret = mq_getattr(msgq_id, &msgq_attr);
    if(ret < 0) {
        perror("mq_getattr");
        return  (-1);
    }
    if(recv_size < msgq_attr.mq_msgsize) {
        recv_size = msgq_attr.mq_msgsize;
    }
    for (i = 0; i < 10; ++i) {
        msg.len    = -1;
        memset(msg.buf, 0, MAXSIZE);
        ret = mq_receive(msgq_id, (char*)&msg, recv_size, &sender);
        if (ret < 0) {
            perror("mq_receive");
            return  (-1);
        }
        fprintf(stdout, "msg.len = %d, msg.buf = %s\n", msg.len, msg.buf);
        sleep(2);
    }
    ret = mq_close(msgq_id);
    if(ret < 0) {
        perror("mq_close");
        return  (-1);
    }
    return  (0);
}
文档内容是否对您有所帮助?
有帮助
没帮助