POSIX 命名消息队列
POSIX 命名消息队列的句柄的类型为 mqd_t。使用时需要定义一个 mqd_t 类型的变量:
mqd_t mqd;
一个 POSIX 命名消息队列必须要调用 mq_open 函数创建或打开之后才能使用,接收消息可以调用 mq_receive 函数,发送消息可以使用 mq_send 函数。
当一个 POSIX 命名消息队列使用完毕后,应该调用 mq_close 函数将其关闭;当一个 POSIX 命名消息队列不再有任何用途时,应该调用 mq_unlink 函数将其删除,SylixOS 会回收该消息队列占用的内核资源。
命名消息队列属性块
创建一个 POSIX 命名消息队列需要使用 POSIX 命名消息队列属性块。POSIX 命名消息队列属性块的类型为 struct mq_attr,定义如下:
typedef struct mq_attr {
long mq_flags; /* 消息队列文件标志 */
long mq_maxmsg; /* 消息队列可容纳的最大消息个数 */
long mq_msgsize; /* 消息队列的单则消息的最大长度 */
long mq_curmsgs; /* 当前消息队列的消息数目 */
} mq_attr_t;
使用时需要定义一个 struct mq_attr 结构的变量,如:
struct mq_attr mqattr;
由于 POSIX 并没有定义 POSIX 命名消息队列属性块的操作函数,所以使用时需要对 struct mq_attr 结构的成员进行赋值,示例代码如下:
struct mq_attr mqattr = {O_RDWR, 128, 64, 0};
命名消息队列
命名消息队列的创建与打开
#include <mqueue.h>
mqd_t mq_open(const char *name, int flag, ...);
函数 mq_open 原型分析:
- 此函数成功时返回一个命名消息队列的句柄,失败时返回 MQ_FAILED 并设置错误号。
- 参数 name 是命名消息队列的名字。
- 参数 flag 是命名消息队列的打开选项(O_CREAT,O_EXCL...)。
- 参数 … 是可变参数,该参数可指定消息队列的模式和属性块(mode 和 pmqattr)。
如果需要创建一个 POSIX 命名消息队列,打开选项应该加上 O_CREAT,并且可变参数应指定 mode 和 pmqattr,当 pmqattr 为 NULL 时,将使用默认的属性。创建命令消息队列的形式如下:
mqd_t mq;
mq = mq_open(“mq_test”, O_RDWR | O_CREAT, 0666, NULL);
如果打开一个已经存在的 POSIX 命名消息队列,打开选项不应该加上 O_CREAT。
默认的属性定义如下:
mq_attr_t mq_attr_default = {O_RDWR, 128, 64, 0};
即消息队列可容纳 128 条消息,单条消息的最大长度为 64 字节。
命名消息队列属性的获取与设置
#include <mqueue.h>
int mq_getattr(mqd_t mqd, struct mq_attr *pmqattr);
函数 mq_getattr 原型分析:
- 此函数成功返回 0,失败返回-1 并设置错误号。
- 参数 mqd 是 POSIX 命名消息队列的句柄。
- 输出参数 pmqattr 用于接收 POSIX 命名消息队列的属性。
#include <mqueue.h>
int mq_setattr(mqd_t mqd, const struct mq_attr *pmqattrNew, struct mq_attr *pmqattrOld);
函数 mq_setattr 原型分析:
- 此函数成功时返回 0,失败时返回-1 并设置错误号。
- 参数 mqd 是 POSIX 命名消息队列的句柄。
- 参数 pmqattrNew 指向一个 POSIX 命名消息队列属性块,是需要设置的新属性。
- 输出参数 pmqattrOld 用于接收 POSIX 命名消息队列的当前属性,可以为 NULL。
向命名消息队列发送消息
#include <mqueue.h>
int mq_send(mqd_t mqd, const char *msg, size_t msglen, unsigned msgprio);
int mq_timedsend(mqd_t mqd, const char *msg, size_t msglen,
unsigned msgprio, const struct timespec *abs_timeout);
int mq_reltimedsend_np(mqd_t mqd, const char *msg, size_t msglen,
unsigned msgprio, const struct timespec *rel_timeout);
以上三个函数原型分析:
- 以上函数成功时返回 0,失败时返回-1 并设置错误号。
- 参数 mqd 是 POSIX 命名消息队列的句柄。
- 参数 msg 指向需要发送的消息缓冲区(一个 const char 型的指针)。
- 参数 msglen 是需要发送的消息的长度。
- 参数 msgprio 是需要发送的消息的优先级。
- 参数 abs_timeout 是当消息队列满时,发送者线程需要等待的绝对超时时间。
- 参数 rel_timeout 是当消息队列满时,发送者线程需要等待的相对超时时间。
mq_timedsend 函数是 mq_send 函数的带等待超时时间的版本,abs_timeout 为等待的绝对超时时间(绝对时间是指到将来的某个时间点)。
mq_reltimedsend_np 函数是 mq_timedsend 函数的非 POSIX 标准版本,参数 rel_timeout 为等待的相对超时时间(相对时间是指以当前时间为起点的一个时间区间)。
接收命名消息队列的消息
#include <mqueue.h>
ssize_t mq_receive(mqd_t mqd, char *msg, size_t msglen, unsigned *pmsgprio);
ssize_t mq_timedreceive(mqd_t mqd, char *msg, size_t msglen, unsigned *pmsgprio,
const struct timespec *abs_timeout);
ssize_t mq_reltimedreceive_np(mqd_t mqd, char *msg, size_t msglen,
unsigned *pmsgprio, const struct timespec *rel_timeout);
以上三个函数原型分析:
- 以上函数成功时返回接收的消息的长度,失败时返回-1 并设置错误号。
- 参数 mqd 是 POSIX 命名消息队列的句柄。
- 参数 msg 指向用于接收消息的消息缓冲区(一个 char 型的指针)。
- 参数 msglen 是消息缓冲区的长度。
- 输出参数 pmsgprio 用于接收消息的优先级。
- 参数 abs_timeout 是当消息队列空时,接收者线程需要等待的绝对超时时间。
- 参数 rel_timeout 是当消息队列空时,接收者线程需要等待的相对超时时间。
mq_timedreceive 是 mq_receive 的带等待超时时间的版本,abs_timeout 为等待的绝对超时时间。
mq_reltimedreceive_np 是 mq_timedreceive 的非 POSIX 标准版本,参数 rel_timeout 为等待的相对超时时间。
注册命名消息队列可读时通知信号
#include <mqueue.h>
int mq_notify(mqd_t mqd, const struct sigevent *pnotify);
函数 mq_notify 原型分析:
- 此函数成功返回 0,失败返回-1 并设置错误号。
- 参数 mqd 是 POSIX 命名消息队列的句柄。
- 参数 pnotify 指向一个 struct sigevent 信号事件类型的变量。
命名消息队列的关闭
#include <mqueue.h>
int mq_close(mqd_t mqd);
函数 mq_close 原型分析:
- 此函数成功时返回 0,失败时返回-1 并设置错误号。
- 参数 mqd 是 POSIX 命名消息队列的句柄。
命名消息队列的删除
#include <mqueue.h>
int mq_unlink(const char *name);
函数 mq_close 原型分析:
- 此函数成功返回 0,失败返回-1 并设置错误号。
- 参数 name 是 POSIX 命名消息队列的名字。
下面是通过 POSIX 命名消息队列实现的生产者消费者实例,生产者程序每隔 1 秒向消息队列中发送一则消息,消费者每隔 2 秒从队列中获取一则消息,生产者生产完成后延时 1 分钟等待消费者正常退出,1 分钟之后生产者调用 mq_unlink 函数删除消息队列文件。
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#define MAXSIZE (10)
#define BUFFER (8192)
#define FILE_NAME "/posix"
struct msg_type {
int len;
char buf[MAXSIZE];
};
int main (int argc, char *argv[])
{
mqd_t msgq_id;
struct msg_type msg;
unsigned int prio = 1;
struct mq_attr msgq_attr;
int ret;
int i;
msgq_id = mq_open(FILE_NAME, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, NULL);
if(msgq_id == (mqd_t)-1) {
perror("mq_open");
return (-1);
}
ret = mq_getattr(msgq_id, &msgq_attr);
if(ret < 0) {
perror("mq_getattr");
return (-1);
}
ret = mq_setattr(msgq_id, &msgq_attr, NULL);
if(ret < 0) {
perror("mq_setattr");
return (-1);
}
for (i = 0; i < 10; ++i) {
memset(msg.buf, 0, MAXSIZE);
sprintf(msg.buf, "%c", i + 'a');
msg.len = 1;
fprintf(stdout, "msg.buf = %s\n", msg.buf);
ret = mq_send(msgq_id, (char*)&msg, sizeof(struct msg_type), prio);
if(ret < 0) {
perror("mq_send");
return (-1);
}
sleep(1);
}
sleep(60);
ret = mq_close(msgq_id);
if(ret < 0) {
perror("mq_close");
return (-1);
}
ret = mq_unlink(FILE_NAME);
if(ret < 0) {
perror("mq_unlink");
return (-1);
}
return (0);
}
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#define MAXSIZE (10)
#define BUFFER (8192)
#define FILE_NAME "/posix"
struct msg_type {
int len;
char buf[MAXSIZE];
};
int main (int argc, char *argv[])
{
mqd_t msgq_id;
unsigned int sender;
struct msg_type msg;
struct mq_attr msgq_attr;
long recv_size = BUFFER;
int ret;
int i;
msgq_id = mq_open(FILE_NAME, O_RDWR);
if(msgq_id == (mqd_t)-1) {
perror("mq_open");
return (-1);
}
ret = mq_getattr(msgq_id, &msgq_attr);
if(ret < 0) {
perror("mq_getattr");
return (-1);
}
if(recv_size < msgq_attr.mq_msgsize) {
recv_size = msgq_attr.mq_msgsize;
}
for (i = 0; i < 10; ++i) {
msg.len = -1;
memset(msg.buf, 0, MAXSIZE);
ret = mq_receive(msgq_id, (char*)&msg, recv_size, &sender);
if (ret < 0) {
perror("mq_receive");
return (-1);
}
fprintf(stdout, "msg.len = %d, msg.buf = %s\n", msg.len, msg.buf);
sleep(2);
}
ret = mq_close(msgq_id);
if(ret < 0) {
perror("mq_close");
return (-1);
}
return (0);
}