C++|生产者消费者问题 C++实现


  • 生产者消费者问题 C++实现
    • 知识准备
      • thread
        • 介绍
        • 成员类
        • 成员函数
      • sleep_for
        • 介绍
      • mutex
        • 介绍
        • 成员函数
      • unique_lock
        • 介绍
        • 成员函数
      • codition_variable
        • 介绍
        • 成员函数
    • 代码示例

生产者消费者问题 C++实现 知识准备 thread
介绍
  • 定位于头文件的class thread
  • 表示单个执行线程, 没有两个thread对象会表示同一个线程
    • 不可复制构造
    • 不可复制赋值
成员类
  • id
成员函数
  • get_id
    • 返回线程的id
  • hardware_concurrency
    • 返回实现支持的并发线程数
  • join
    • 等待线程完成其执行
sleep_for
介绍
  • 【C++|生产者消费者问题 C++实现】定义域头文件
  • 声明
    • template< class Rep, class Period > void sleep_for( const std::chrono::duration& sleep_duration );

    • sleep_duration : 要睡眠的时长
mutex
介绍
  • 定义于头文件
  • mutex 提供排他性非递归所有权语义:
    • 调用方线程从它成功调用 locktry_lock 开始,到它调用 unlock 为止占有 mutex
    • 线程占有 mutex 时,所有其他线程若试图要求 mutex 的所有权,则将阻塞(对于 lock 的调用)或收到 false 返回值(对于 try_lock ).
    • 调用方线程在调用 locktry_lock 前必须不占有 mutex
成员函数
  • lock
    • 锁定互斥, 若互斥不可用则堵塞
  • unlock
    • 解锁互斥
unique_lock
介绍
  • 定义于
  • 声明
    template< class Mutex > class unique_lock;

成员函数
  • lock
    • 锁定关联互斥
  • unlock
    • 解锁关联互斥
  • mutex
    • 返回指向关联互斥的指针
codition_variable
介绍
  • 定义于头文件
  • 声明class condition_variable;
  • 有意修改变量的线程必须:
      1. 获得std:: mutex(通过std::unique_lock)
      2. 在保有锁时进行修改
      3. 执行 notify_one 或 notify_all
  • 任何有意在 std::condition_variable 上等待的线程必须
      1. 获得std::unique_lock
      2. 执行 wait 、 wait_for 或 wait_until ,等待操作自动释放互斥,并悬挂线程的执行
      3. 线程被唤醒,且自动重获得互斥
  • **std::condition_variable只可与 std::unique_lock一同使用;
成员函数
  • notify_one
    • 通知一个等待线程
  • notify_all
    • 通知所有等待线程
  • wait
    • 阻塞当前进程, 直至被唤醒
  • wait_for
    • 阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后
    • wait_until
      • 阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点
代码示例
// operator_system.cpp: 定义控制台应用程序的入口点。 //#include "stdafx.h" #include #include #include #include #include using namespace std; static const int buffer_size = 10; // 缓存大小 static const int item_total = 100; //总共要生产 item_total个item// 缓存结构体, 使用循环队列当做缓存 struct Buffer { int buffer[buffer_size]; size_t read_position; // 当前读位置 size_t write_position; // 当前写位置 mutex mtx; // 读写互斥 //条件变量 condition_variable not_full; condition_variable not_empty; }buffer_res; typedef struct Buffer Buffer; void porduce_item(Buffer *b, int item) { unique_lock lock(b->mtx); //设置互斥锁while(((b->write_position + 1) % buffer_size) == b->read_position) { //当前缓存已经满了 cout << "buffer is full now, producer is wating....." << endl; (b->not_full).wait(lock); // 等待缓存非full } // 向缓存中添加item (b->buffer)[b->write_position] = item; (b->write_position)++; // 若到达最后一个, 写位置置位0 if (b->write_position == buffer_size) b->write_position = 0; (b->not_empty).notify_all(); lock.unlock(); }int consume_item(Buffer *b) { int data; unique_lock lock(b->mtx); while (b->write_position == b->read_position) {// 当前buffer 为空 cout << "buffer is empty , consumer is waiting....." << endl; (b->not_empty).wait(lock); }data = https://www.it610.com/article/(b->buffer)[b->read_position]; (b->read_position)++; if (b->read_position >= buffer_size) b->read_position = 0; (b->not_full).notify_all(); lock.unlock(); return data; }//生产者任务 void producer() { for (int i = 1; i<= item_total; i++) { cout << "prodece the " << i << "^th item ..." << endl; porduce_item(&buffer_res, i); } }//消费者任务 void consumer() { static int cnt = 0; while(1) { Sleep(1); int item = consume_item(&buffer_res); cout << "consume the " << item << "^th item" << endl; if (++cnt == item_total) break; } }//初始化 buffer void init_buffer(Buffer *b) { b->write_position = 0; b->read_position = 0; }int main() { init_buffer(&buffer_res); thread prodece(producer); thread consume(consumer); prodece.join(); consume.join(); getchar(); }

    推荐阅读