条件变量
我们继续使用 POSIX 读写锁章节的例子介绍条件变量。
有十个线程(线程 A、线程 1...线程 9)和一个变量 V,线程 A 需要写变量 V,线程 1...线程 9 需要读变量 V。我们假设只有在变量 V 的值改变时,读者线程才需要读变量 V,在变量 V 的值不变时,读者线程需要阻塞。
读者线程阻塞前可能需要一种“判断”操作,判断变量 V 当前的值是否与上一次读到的值不一致;“判断”操作前需要加锁,如果一致那么读者线程需要阻塞,真正进入阻塞状态前读者线程又需要释放该锁,释放锁与阻塞需要是一个不可打断的原子操作。
我们可以想象一下释放锁与阻塞不是一个原子操作的情形,如果读者线程在释放锁与阻塞之间被线程 A 抢占了,毫无疑问,线程 A 可以成功获锁,线程 A 写变量 V,变量 V 的值改变了,但读者线程却阻塞了,显然读者线程丢失了一次对变量 V 值改变的响应!
同时在线程 A 写变量 V 后以广播的方式“通知”多个读者线程去读该变量。
我们需要一种新的线程间通信手段“条件变量”来解决以上问题,释放锁与阻塞是一个原子操作和能以广播的方式“通知”多个读者线程。
条件变量是多线程间的一种同步机制,条件变量与互斥锁一起使用时,允许线程以无竞争的形式等待条件的发生。条件本身由互斥量保护,因此线程在改变条件之前必须首先锁住互斥量,其他线程在获得互斥量之前不会察觉到条件的改变。下面伪代码过程是使用条件变量的一种可能方法:
定义全局条件变量 (global_cond)
定义全局互斥锁 (global_lock)
全局变量 (global_value)
t1 ()
{
获得互斥锁 (加锁 global_lock)
等待条件 (Wait)
释放互斥锁 (解锁 global_lock)
}
t2 ()
{
获得互斥锁 (加锁 global_lock)
全局变量操作
通知条件满足 (Signal)
释放互斥锁 (解锁 global_lock)
}
main ()
{
初始化条件变量 (global_cond)
创建互斥锁 (global_lock)
创建线程 (t1 t2)
线程JOIN (join t1 t2)
销毁条件变量 (global_cond)
删除互斥锁 (global_lock)
}
SylixOS 条件变量的类型为 LW_THREAD_COND。
使用时需要定义一个 LW_THREAD_COND 型的变量,如:
LW_THREAD_COND tcd;
一个 SylixOS 条件变量必须要调用 Lw_Thread_Cond_Init 函数初始化之后才能使用。
如果需要等待一个条件变量,可以调用 Lw_Thread_Cond_Wait 函数,中断服务程序不能调用 Lw_Thread_Cond_Wait 函数等待一个条件变量,因为 Lw_Thread_Cond_Wait 函数会阻塞当前线程。
发送一个条件变量可以使用 Lw_Thread_Cond_Signal 或 Lw_Thread_Cond_Broadcast 函数,中断服务程序也可以发送一个条件变量。
当一个条件变量使用完毕后,应该调用 Lw_Thread_Cond_Destroy 函数删除,SylixOS 会回收该条件变量占用的内核资源。需要注意的是,尝试再次使用一个已被删除的条件变量将会产生未知错误。
创建一个 SylixOS 条件变量需要使用 SylixOS 条件变量属性块。SylixOS 条件变量属性块的类型为 ULONG。
使用时定义一个 ULONG 类型的变量,如:
ULONG ulCondAttr;
条件变量属性块
条件变量属性块的初始化和删除
#include <SylixOS.h>
ULONG Lw_Thread_Condattr_Init(ULONG *pulAttr);
函数 Lw_Thread_Condattr_Init 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 pulAttr 是 SylixOS 条件变量属性块的指针。
#include <SylixOS.h>
ULONG Lw_Thread_Condattr_Destroy(ULONG *pulAttr);
函数 Lw_Thread_Condattr_Destroy 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 pulAttr 是 SylixOS 条件变量属性块的指针。
设置和获取条件变量属性块的进程共享属性
#include <SylixOS.h>
ULONG Lw_Thread_Condattr_Setpshared(ULONG *pulAttr, INT iShared);
函数 Lw_Thread_Condattr_Setpshared 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 pulAttr 是 SylixOS 条件变量属性块的指针。
- 参数 iShared 标识了 SylixOS 条件变量属性块是否进程共享。
#include <SylixOS.h>
ULONG Lw_Thread_Condattr_Getpshared(const ULONG *pulAttr, INT *piShared);
函数 Lw_Thread_Condattr_Getpshared 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 pulAttr 是 SylixOS 条件变量属性块的指针。
- 输出参数 iShared 标识了 SylixOS 条件变量属性块是否进程共享。
条件变量
条件变量的初始化和删除
#include <SylixOS.h>
ULONG Lw_Thread_Cond_Init(PLW_THREAD_COND ptcd, ULONG ulAttr);
函数 Lw_Thread_Cond_Init 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 ptcd 是 SylixOS 条件变量的指针。
- 参数 ulAttr 是 SylixOS 条件变量属性块。
#include <SylixOS.h>
ULONG Lw_Thread_Cond_Destroy(PLW_THREAD_COND ptcd);
函数 Lw_Thread_Cond_Destroy 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 ptcd 是 SylixOS 条件变量的指针。
条件变量的等待
#include <SylixOS.h>
ULONG Lw_Thread_Cond_Wait(PLW_THREAD_COND ptcd,
LW_HANDLE ulMutex,
ULONG ulTimeout);
函数 Lw_Thread_Cond_Wait 原型分析:
- 此函数成功返回 0,失败返回错误号。
- 参数 ptcd 是 SylixOS 条件变量的指针。
- 参数 ulMutex 是 SylixOS 互斥信号量的句柄。
- 参数 ulTimeout 是等待的超时时间,单位为时钟嘀嗒 Tick。
条件变量的发送
#include <SylixOS.h>
ULONG Lw_Thread_Cond_Signal(PLW_THREAD_COND ptcd);
ULONG Lw_Thread_Cond_Broadcast(PLW_THREAD_COND ptcd);
以上两个函数原型分析:
- 函数成功返回 0,失败返回错误号。
- 参数 ptcd 是 SylixOS 条件变量的指针。
Lw_Thread_Cond_Broadcast 与 Lw_Thread_Cond_Signal 函数的区别在于,Lw_Thread_Cond_Broadcast 将以广播的方式唤醒阻塞在该条件变量的所有线程,而 Lw_Thread_Cond_Signal 只会唤醒一个线程。
下面程序展示了 SylixOS 条件变量的使用,程序创建两个线程和一个 SylixOS 互斥信号量及一个 SylixOS 条件变量;线程 tTestA 等待条件满足并打印变量 _G_iCount 的值,线程 tTestB 对变量 _G_iCount 进行自增操作并以广播的形式发送条件满足信号。使用 SylixOS 互斥信号量作为访问变量 _G_iCount 的互斥手段,使用 SylixOS 条件变量作为变量 _G_iCount 值改变的通知手段。
#include <SylixOS.h>
static INT _G_iCount = 0;
static LW_HANDLE _G_hLock;
static LW_THREAD_COND _G_threadCond;
static PVOID tTestA (PVOID pvArg)
{
INT iError;
while (1) {
iError = Lw_SemaphoreM_Wait(_G_hLock, LW_OPTION_WAIT_INFINITE);
if (iError != ERROR_NONE) {
break;
}
iError = Lw_Thread_Cond_Wait(&_G_threadCond,
_G_hLock,
LW_OPTION_WAIT_INFINITE);
if (iError != ERROR_NONE) {
Lw_SemaphoreM_Post(_G_hLock);
break;
}
printf("tTestA(): count = %d\n", _G_iCount);
Lw_SemaphoreM_Post(_G_hLock);
}
return (LW_NULL);
}
static PVOID tTestB (PVOID pvArg)
{
INT iError;
while (1) {
iError = Lw_SemaphoreM_Wait(_G_hLock, LW_OPTION_WAIT_INFINITE);
if (iError != ERROR_NONE) {
break;
}
_G_iCount++;
Lw_Thread_Cond_Broadcast(&_G_threadCond);
Lw_SemaphoreM_Post(_G_hLock);
Lw_Time_SSleep(1);
}
return (LW_NULL);
}
int main (int argc, char *argv[])
{
LW_HANDLE hThreadAId;
LW_HANDLE hThreadBId;
ULONG ulCondAttr;
INT iError;
Lw_Thread_Condattr_Init(&ulCondAttr);
Lw_Thread_Condattr_Setpshared(&ulCondAttr, LW_FALSE);
iError = Lw_Thread_Cond_Init(&_G_threadCond, ulCondAttr);
if (iError != ERROR_NONE) {
printf("cond create failed.\n");
return (-1);
}
Lw_Thread_Condattr_Destroy(&ulCondAttr);
_G_hLock = Lw_SemaphoreM_Create("count_lock",
LW_PRIO_HIGH,
LW_OPTION_WAIT_FIFO |
LW_OPTION_OBJECT_LOCAL|
LW_OPTION_INHERIT_PRIORITY |
LW_OPTION_ERRORCHECK,
LW_NULL);
if (_G_hLock == LW_OBJECT_HANDLE_INVALID) {
printf("mutex create failed.\n");
return (-1);
}
hThreadAId = Lw_Thread_Create("t_testa", tTestA, LW_NULL, LW_NULL);
if (hThreadAId == LW_OBJECT_HANDLE_INVALID) {
printf("t_testa create failed.\n");
return (-1);
}
hThreadBId = Lw_Thread_Create("t_testb", tTestB, LW_NULL, LW_NULL);
if (hThreadBId == LW_OBJECT_HANDLE_INVALID) {
printf("t_testb create failed.\n");
return (-1);
}
Lw_Thread_Join(hThreadAId, LW_NULL);
Lw_Thread_Join(hThreadBId, LW_NULL);
Lw_Thread_Cond_Destroy(&_G_threadCond);
Lw_SemaphoreM_Delete(&_G_hLock);
return (0);
}
在 SylixOS Shell 下运行程序:
# ./SylixOS_Conditional_Variable
tTestA(): count = 1
tTestA(): count = 2
tTestA(): count = 3
tTestA(): count = 4
tTestA(): count = 5
tTestA(): count = 6
......
# ts
NAME TID PID PRI STAT LOCK SAFE DELAY PAGEFAILS FPU CPU
---------------------------- ------- ----- --- ---- ---- ---- ---------- --------- --- ---
SylixOS_Conditional_Variable 401006f 35 200 JOIN 0 0 1 USE 0
t_testa 4010070 35 200 SEM 0 0 0 0
t_testb 4010071 35 200 SLP 0 920 0 0