?? 本篇博客开始要继续给大家介绍线程同步和互斥的实际应用——生产者消费者模型,还要给大家介绍一种同步的机制——POSIX信号量
目录
- 生产者消费者模型
- 基于BlockingQueue的生产者消费者模型
-
- 介绍
- 实现
-
- 概述
- 基本方法的封装
- 放数据和取数据
- 封装一个任务
- 单生产者和单消费者模型分析
- 多生产者和多消费者
- POSIX信号量
-
- 介绍
- 相关接口介绍
- 基于环形队列的生产消费模型
-
- 环形队列介绍
- 实现
-
- 概述
- 取数据和放数据
- 结果分析
- 总结
生产者消费者模型
概念: 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过一个来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。生产消费者模型优点:
- 解耦:生产者和消费者是通过一个共享数据区域来进行通信。而不是直接进行通信,这样两个角色之间的依耐性就降低了(代码层面实现解耦),变成了角色与共享数据区域之间的弱耦合,一个逻辑出错不影响两一个逻辑,二者变得更独立。
- 支持并发:生产者负责生产数据,消费者负责拿数据。生产者生产完数据可以继续生产,大部分时间内是不需要等待消费者消费数据才继续生产。也就是说,在任一时刻,二者都是在正常处理任务的,进度都得以推进。
- 支持忙闲下不均:生产者生产了数据是放进容器中,消费者不必立即消费,可以慢慢地从容器中取数据。容器快要空了,消费者的消费速度就可以降下来,让生产者继续生产。
- 3种关系: 生产者与生产者(互斥)、生产者与消费者(同步(主要)和互斥)和消费者与消费者(互斥)
- 两个角色: 生产者和消费者
- 一个交易场所: 容器、共享资源等
文章图片
基于BlockingQueue的生产者消费者模型 介绍
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。阻塞队列的特点:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
文章图片
实现 概述
因为交易场所是一个阻塞队列,所以,我封装了一个BlcokingQueue 的类,这里提供了放数据和取数据这样两个主要的方法。其中,有五个成员变量:
- 队列: 使用STL中的queue来实现
- 容量: 阻塞队列的容量,由用户给定,我们也可以提供一个默认的容量
- 互斥量: 为了实现生产者和消费者的同步,我们需要使用条件变量和互斥量来实现同步的操作
- 生产者唤醒和等待的条件变量: 当队列满了,生产者等待条件满足,应该挂起等待,等待消费者唤醒
- 消费者唤醒和等待的条件变量: 当队列为空,消费者等待条件满足,应该挂起等待,等待生产者唤醒
- 构造函数做初始化和资源分配的操作,分配锁资源和条件变量
- 析构函数做清理资源的操作,对锁和条件变量进行销毁
template
class BlockQueue
{
public:
BlockQueue(int capacity = 5)
:_capacity(capacity)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
private:
std::queue _q;
size_t_capacity;
// 队列最大容量
pthread_mutex_t _lock;
// 锁
pthread_cond_t_c_cond;
// 消费者被唤醒和挂起的条件变量
pthread_cond_t_p_cond;
// 生产者被唤醒和挂起的条件变量
};
基本方法的封装
我对阻塞队列的一些基本操作进行了封装,有以下几个处理动作(可以设置为私有方法):
- 判断队列为空或为满
- 唤醒生产者和唤醒消费者
- 生产者挂起等待和消费者挂起等待
- 加锁和解锁
private:
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.empty();
}
void ConsumerWait()
{
std::cout << "consumer wait...." << std::endl;
pthread_cond_wait(&_c_cond, &_lock);
}
void WakeUpConsumer()
{
std::cout << "wake up consumer...." << std::endl;
pthread_cond_broadcast(&_c_cond);
}
void ProductorWait()
{
std::cout << "productor wait...." << std::endl;
pthread_cond_wait(&_p_cond, &_lock);
}
void WakeUpProductor()
{
std::cout << "wake up productor...." << std::endl;
pthread_cond_broadcast(&_p_cond);
}
void LockQueue()
{
pthread_mutex_lock(&_lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_lock);
}
放数据和取数据
- 生产者进行相关操作前先上锁,队列如果为满就需要挂起等待。队列不为满就生成一个数据,然后需要把数据放入阻塞队列中,解开锁之后唤醒消费者,喊消费者进行消费。
- 消费者进行相关操作前先上锁,队列如果为空就需要挂起等待。队列不为空就需要从阻塞队列中取一个数据,然后解开锁之后唤醒消费者,喊生产者进行生产。
先看代码:
void ProductData(T data)
{
LockQueue();
while (IsFull()){
// 让productor挂起等待
ProductorWait();
}
// 放数据
_q.push(data);
UnLockQueue();
// 唤醒consumer
WakeUpConsumer();
}
void ConsumeData(T& data)
{
LockQueue();
while (IsEmpty()){
// 让consumer挂起等待
ConsumerWait();
}
// 取数据
data = https://www.it610.com/article/_q.front();
_q.pop();
UnLockQueue();
// 唤醒productor
WakeUpProductor();
}
注意: 在临界资源判断唤醒条件是否就绪应该使用while循环检测,被唤醒的线程并不着急立即往下执行,而是再进行一次检测,判断当前唤醒条件是否真的就绪了。因为唤醒线程的这个函数调用可能会发生失败,且线程可能是在条件不满足的时候被唤醒,发生误判被伪唤醒。
封装一个任务
我们可以实现一个任务类,生产者把这个任务放进阻塞队列中,消费者取出并进行处理。其中还有一个run的任务执行方法
实现如下:
class Task
{
public:
Task(int a = 0, int b = 0)
:_a(a)
,_b(b)
{}
int Run()
{
return _a + _b;
}
int GetA()
{
return _a;
}
int GetB()
{
return _b;
}
private:
int _a;
int _b;
};
单生产者和单消费者模型分析
为了更好地理解整个过程,我先用一个单生产者和单消费者模型来演示分析。
下面是生产者和消费者两个线程的执行代码:
BlockQueue* q;
// 阻塞队列void* Consumer(void* arg)
{
long id = (long)arg;
while (1){
// 消费(取)数据
Task t(0, 0);
q->ConsumeData(t);
std::cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << std::endl;
sleep(1);
// 后面可注释,调整速度
}
}void* Productor(void* arg)
{
long id = (long)arg;
while (1){
// 生产(放)数据
int x = rand()%10 + 1;
int y = rand()%10 + 1;
Task t(x, y);
std::cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << std::endl;
q->ProductData(t);
sleep(1);
// 后面可注释,调整速度
}
}
int main()
{
srand((size_t)time(nullptr));
// 创建一个交易场所
q =new BlockQueue;
pthread_t p, c;
pthread_create(&p, nullptr, Productor, (void*)(1));
pthread_create(&c, nullptr, Consumer, (void*)(1));
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete q;
return 0;
}
代码运行结果分三种情况进行分析:
- 消费者和生产者以相同的速度执行
文章图片
2. 生产者快,消费者慢
文章图片
生产者的速度慢,生产者生产一个任务立马唤醒消费者,消费者处理完一个数据,发现队列为空,然后挂起等待,接着生产者继续生产一个任务,然后唤醒消费者。可以看出的是,队列大部分时间是为空的,消费者大部分时间是处于挂起等待的,二者步调一直是一致的,且执行速度是随生产者的,也是并行的
3. 生产者慢,消费者快
文章图片
生产者生产速度快,导致一上来生产者就把队列塞满了任务,接着唤醒消费者来消费,然后挂起等待。紧接着消费者处理完一个任务就唤醒生产者来生产,生产者生产了一个任务就喊消费者来消费,然后继续挂起。可以看出的是,在这种情况下,队列长时间是满的,生产者大部分时间是挂起等待的。生产者和消费者开始一小部分时间内步调是不一致的,生产者生产完,消费者才消费是串行的,但是过了一会,二者步调就变得一致了,且速度是随消费者的
注意: 这里我们还可以设置一个高水位线和低水位线,队列中任务量达到高水位线时就喊消费者来消费,任务量达到低水位线时,就喊生产者赶紧来生产。
多生产者和多消费者
多生产者和多消费者不做详细分析,我会把主体内容介绍。
做到几点:
- 生产者之间需要互斥,也就是生产者和生产者之间需要组内竞争一把锁,消费者也是如此
- 生产者和消费者之间用互斥量和条件变量做到同步和互斥(上面就做到了)
#define P_COUNT 3
#define C_COUNT 3BlockQueue* q;
pthread_mutex_t c_lock;
// 消费者的锁
pthread_mutex_t p_lock;
// 生产者的锁void* Consumer(void* arg)
{
long id = (long)arg;
while (1){
pthread_mutex_lock(&c_lock);
// 消费(取)数据
Task t(0, 0);
q->ConsumeData(t);
std::cout << "consumer " << id << " consumes a task: " << t.GetA() << " + " << t.GetB() << " = " << t.Run() << std::endl;
pthread_mutex_unlock(&c_lock);
sleep(1);
}
}void* Productor(void* arg)
{
long id = (long)arg;
while (1){
pthread_mutex_lock(&p_lock);
// 生产(放)数据
int x = rand()%10 + 1;
int y = rand()%10 + 1;
Task t(x, y);
std::cout << "productor " << id << " produncs a task: " << x << " + " << y << " = ?" << std::endl;
q->ProductData(t);
pthread_mutex_unlock(&p_lock);
sleep(1);
}
}
int main()
{
srand((size_t)time(nullptr));
pthread_mutex_init(&c_lock, nullptr);
pthread_mutex_init(&p_lock, nullptr);
// 创建一个交易场所
q =new BlockQueue;
pthread_t p[P_COUNT];
pthread_t c[C_COUNT];
for (long i = 0;
i < P_COUNT;
++i)
{
pthread_create(p+i, nullptr, Productor, (void*)(i+1));
}
for (long i = 0;
i < C_COUNT;
++i)
{
pthread_create(c+i, nullptr, Consumer, (void*)(i+1));
}for (int i = 0;
i < C_COUNT;
++i)
{
pthread_join(c[i], nullptr);
}
for (int i = 0;
i < P_COUNT;
++i)
{
pthread_join(p[i], nullptr);
}pthread_mutex_destroy(&c_lock);
pthread_mutex_destroy(&p_lock);
delete q;
return 0;
}
注意:
- 生产者之间需要一个互斥量,消费者之间也需要一个互斥量
- 生产者和消费者的个数可以自己调整宏变量
- 主体代码并没有什么大的改变
POSIX信号量: 该信号量允许进程和线程同步对共享资源的访问。同时也可以用于实现线程间同步。总结几点:
- 是什么? 信号量本质是一个计数器,描述临界资源的有效个数。申请一个资源就对信号量减1(P操作),释放一个资源就对信号量加1(V操作)
- 为什么? 临界资源可以看成很多份,互相不冲突且高效
- 怎么用? 可以使用信号量的相关接口,来申请信号量和释放信号量(下面详细介绍)
semaphore.h
的头文件中。信号量是一个类型为sem_t
的变量初始化信号量:
函数原型:销毁信号量:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
返回值: 成功返回0,失败返回-1
- sem:信号量
- pshared:0表示线程间共享,非零表示进程间共享
- value:信号量初始值
函数原型:等待信号量:
int sem_destroy(sem_t *sem);
参数:
返回值: 成功返回0,失败返回-1
- sem:信号量
函数原型:发布信号量:
int sem_wait(sem_t *sem);
功能: 等待信号量,会将信号量的值减1
参数:
返回值: 成功返回0,失败返回-1
- sem:信号量
函数原型:信号量的简单使用:
int sem_post(sem_t *sem);
功能: 发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
参数:
返回值: 成功返回0,失败返回-1
- sem:信号量
#include
#include
#include #include
using namespace std;
sem_t sem;
void* run1(void* arg)
{
while (1){
sem_wait(&sem);
cout << "run1 is running..." << endl;
sem_post(&sem);
sleep(1);
}
}
void* run2(void* arg)
{
while (1){
sem_wait(&sem);
cout << "run2 is running..." << endl;
sem_post(&sem);
sleep(1);
}}
int main()
{
sem_init(&sem, 0, 1);
pthread_t t1, t2;
pthread_create(&t1, nullptr, run1, nullptr);
pthread_create(&t2, nullptr, run2, nullptr);
sem_destroy(&sem);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
代码运行结果如下: 通过信号量可以做到线程同步
文章图片
说明: 二元信号量(value=https://www.it610.com/article/1,一个资源)等价于互斥锁
基于环形队列的生产消费模型 环形队列介绍 环形队列: 环形队列和普通队列的区别就是,这种队列是一种环形的结构,有一个头指针和一个尾指针维护环中一小段队列。(如下图)
文章图片
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
由于环形队列采用数组模拟,用模运算来模拟环状特性,如下:
文章图片
因为信号量就是一个计数器,所以我们可以通过信号量来实现多线程间的同步过程。
实现 概述
一个交易场所: 循环队列
两个角色:
- 生产者:需要申请空间资源(P操作),然后释放数据资源(V操作)
- 消费者:需要申请数据资源(P操作),然后释放空间资源(V操作)
几个变量成员:
- 队列:数组模拟
- 容量:由用户给定
- 空间资源信号量:队列的容量大小
- 数据资源信号量:开始为0
- 生产者的下标位置:开始为0
- 消费者的下标位置:开始为0
template
class RingQueue
{
public:
RingQueue(int capacity = 5)
:_capacity(capacity)
,_rq(capacity)
,_c_index(0)
,_p_index(0)
{
sem_init(&_blank_sem, 0, _capacity);
sem_init(&_data_sem, 0, 0);
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
}
private:
std::vector _rq;
size_t_capacity;
sem_t_blank_sem;
sem_t_data_sem;
int_c_index;
int_p_index;
};
申请信号量的两个私有成员方法:
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
取数据和放数据
- 生产者生成数据前需要申请空间资源信号量(P(_blank_sem)),申请不成功就挂起等待,等待信号量来了继续获得信号量,然后释放数据资源信号量(V(_data_sem))
- 消费者消费数据前需要申请数据资源信号量(P(_data_sem)),申请不成功就挂起等待,等待信号量来了继续获得信号量,然后释放空间资源信号量(V(_blank_sem))
void GetData(T& data)
{
// consumer申请数据资源
P(_data_sem);
data = https://www.it610.com/article/_rq[_c_index];
_c_index = (_c_index + 1) % _capacity;
// consumer释放格子资源
V(_blank_sem);
}
void PutData(const T& data)
{
// productor申请格子资源
P(_blank_sem);
_rq[_p_index] = data;
_p_index = (_p_index + 1) % _capacity;
// productor释放数据资源
V(_data_sem);
}
说明: 这里不需要判断队列是否满了,因为有信号量作计数器,空间信号量资源为0,生产者如果继续申请就会挂起等待。所以,队列中满了这个状态我们不必关心了,有信号量在其中作用
结果分析
这里讨论单生产者和单消费者,创建两个线程的代码和阻塞队列模型是类似的,这里就不放重复代码了。同样地,我们分析三种情况:
- 生产者和消费者执行速度一致
文章图片
生产者生产完一个数据,然后消费者就消费了,二者步调一致,并发执行 - 生产者快,消费者慢
文章图片
生产者速度快,一下字就把队列塞满了数据(开始时二者步调不一致),接着生产者如果再去申请空间信号量,此时已经申请不到了,只能挂起等待,消费者消费数据是否空间信号量,这是生产者才可以继续生产,可以看出,在后面大部分时间,二者步调恢复一致了,且速度随消费者 - 生产者慢,消费者快
文章图片
生产生产者生产完一个数据,数据信号量加1,空间信号量减1,然后消费者里马消费了一个数据,数据信号量减1,空间信号量加1,此时数据信号量为0,消费者再去申请数据信号量,申请不到就挂起等待,只能等生产者在去生产释放空间信号量,然后消费者才可以申请到。可以看出的是,队列长时间是空的,二者步调一致,速度随生产者
文章图片
推荐阅读
- Linux|【Linux篇】第十三篇——多线程(一)(线程概念+线程控制)
- HTML|关于我踏进IT培训机构的四年
- github|如何在 Linux 桌面中启用 “激活 Linux” 水印通知 | Linux 中国
- 如何在Linux中获取和显示目录大小
- rabbitmq从入门到精通|RabbitMQ入门 -- 阿里云服务器安装RabbitMQ
- 如何在Linux和Windows环境中使用PHP执行Shell命令而无需等待结果(异步)
- Linux|ubuntu18.04.4更换内核版本
- 云原生与微服务|【docker基础操作命令】(一)启动命令和镜像命令
- 老猿Python|Python+Pycharm和 VisualStudio C++社区版使用PK及易混淆的语法问题