c++11并发库之线程同步
- 主要内容
- 条件变量
- future
- async/packeged_task/promise
- shared_future
std::mutex _mutex;
std::condition_variable _cv;
std::deque _data;
void thread_process_data()
{
while(1){
std::unique_lock lk(_mutex);
_cv.wait(lk, [](){return !_data.empty();
});
std::string str = _data.front();
_data.pop();
lk.unlock();
do_something(str);
}
}void thread_produce_data()
{
while(1){
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
{
std::lock_guard lk(_mutex);
_data.push("hello cv");
}
_cv.notify_one();
}
}
等待条件的线程处理流程:
- 获取一个
std::unique_lock
用来保护共享变量 - 执行
wait
(wait_for
或wait_until
),这些函数的调用将以原子方式释放互斥锁并暂停线程执行- 在释放互斥锁之前,这里会第一次检查lambda返回值;
- 如果条件真,则线程从wait返回并继续执行;
- 如果条件假,则线程阻塞并同时释放互斥锁;
- 当条件变量被通知、超时过期或出现虚假唤醒时线程被唤醒,互斥锁被原子地重新获取。然后,如果唤醒是假的,线程应该检查条件并继续等待。
- 唤醒时重新获取互斥锁
- 再次检查lambda返回值
- 如果条件真,则线程继续执行;
- 如果条件假,则线程阻塞并同时释放互斥锁;
使用std::unique_lock而不是std::lock_guard原因有二:唤醒等待条件的线程:
【c++11并发库之线程同步】1、等待的线程必须在等待时解锁互斥锁,并在等待之后再次锁定它,而std::lock_guard不提供这种灵活性.
2、wait()的第一个参数要求的就是std::unique_lock的引用。
- 获取
std::lock_gurad
- 执行修改
- 执行
notify_one
或notify_all
通知条件发生,等待条件的线程将被唤醒(通知不需要持有锁)
condition variable适用于某个事件会反复的发生的情景,在某些情景下线程只想等待一次事件为真,之后它将永远不会再等待这个事件发生。future async与future async相比thread的不同:
- std::thread是一个类模板,而std::async只是一个函数模板
- std::async返回std::future对象,让异步操作创建者访问异步结果.
- 调用std::thread总启动一个新线程,且在新线程中立即执行f
- 调用std::async不一定会启动一个新线程,并且可以决定是否立即执行f、延迟执行、不执行
- 设置为launch::async, 将在新线程中立即执行f
- 设置为launch::deferred,async的f将延期执行直到future对象被查询(get/wait),并且f将会在查询future对象的线程中执行,这个线程甚至不一定是调用async的线程;如果future对象从未被查询,f将永远不会执行。
- 设置为launch::deferred |std::launch::async,与实现有关,可能立即异步执行,也可能延迟执行。
- launch::deferred和std::launch::async都没设置,C++14中为未定义
- 类模板std::packaged_task包装任何Callable(函数、lambda、bind表达式或其他函数对象)。
- 与std::function不同的是,std::packaged_task提供了std::future.
- 其威力在于只要能够访问std::future,无论std::packaged_task作为对象被传输到哪个线程中执行,都可以通过std::future获取其结果。
- 实例化的std::packaged_task本身也是一个Callable。它可以包装在std::function对象中,也可以作为线程函数传递给std::thread,或者传递给另一个需要Callable的函数,甚至可以被直接调用。
- std::packaged_task可以用作线程池或者任务队列的构建块,作为“消息”在线程之间传递。
void task_lambda()
{
std::packaged_task task([](int a, int b) {
return std::pow(a, b);
});
std::future result = task.get_future();
//在当前线程执行std::pow(2,9)
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
包装bind
int f(int x, int y) { return std::pow(x,y);
}
void task_bind()
{
std::packaged_task task(bind(f,1,2));
std::future result = task.get_future();
//在当前线程执行std::pow(2,9)
task(2, 9);
std::cout << "task_bind:\t" << result.get() << '\n';
}
作为线程函数
int f(int x, int y) { return std::pow(x,y);
}
void task_thread()
{
std::packaged_task task(f);
std::future result = task.get_future();
//在新线程执行std::pow(2,9)
std::thread t(std::move(task), 2, 9);
std::cout << "task_thread:\t" << result.get() << '\n';
}
等同于:
void equal_to_task_thread()
{
std::future result = std::async(std::launch::async, f, 2, 9);
std::cout << "equal_to_async:\t" << result.get() << '\n';
}
promise与future
一般的范式是:std::promise
提供了一种设置值(类型为T)的方法,稍后可以通过关联的std::future
对象读取该值。配对使用future
和promise
,等待结果的线程可能会阻塞在future::wait()
或者future::get()
,而提供数据的线程可以使用配对的promise
来设置相关的值并使future
就绪。
- 想要获取结果类型为T的线程构造std::promise< T >,并通过promise::get_future获取future对象
- 将promise作为参数传递给新的线程函数,在新的线程函数中通过promise::set_value设置结果
- 等待的线程通过future::wait等待结果诞生,通过future::get获取结果。
void do_work(arg, std::promise
promise_arg) { do_something(); promise_arg.set_value(T); }int main() { //使用promise 在线程之间传递结果 std::promise one_promise; //构造future对象 std::future one_future = one_promise.get_future(); //promise作为参数传递给新的线程函数 std::thread work_thread(do_work, some_arg, std::move(one_promise)); //等待结果 one_future.wait(); //获取结果 std::cout << "result=" << one_future.get() << '\n'; work_thread.join(); }
- promise不可拷贝,不能直接将one_promise作为参数传递给新的线程函数。
- 当使用std::async、std::packeged_task时,如果线程函数函数、被包装的函数抛出异常,与future关联的数据中将 储存异常 而非结果。
- 对于std::promise有点不一样,如果想存储异常而非正常的结果,使用set_exception()替代set_value。
extern std::promise
some_promise; try { some_promise.set_value(calculate_value()); } catch(...) { //1使用current_exception提取calculate_value()中抛出的异常 //2使用set_exception()储存异常 some_promise.set_exception(std::current_exception()); //或者 //some_promise.set_exception(std::copy_exception(std::logic_error("foo "))); }
- 当异常被储存,future将就绪,通过调用future::get()将重新抛出存储的异常。
- 待续
- std::future处理了线程之间传输数据所需的所有同步,但是对std::future特定实例的成员函数调用不是线程安全的——本身如果从多个线程访问单个std::future对象而不进行额外的同步,那么将面临数据竞争和未定义的行为。
std::future 模型对异步结果有唯一所有权,只有一个线程可以提取到异步结果(通过get),在第一次调用get()之后,就已经没有要提取的值了。
- std::shared_future可以让多个线程可以等待相同的事件
std::future是可以moveable,对异步结果的唯一所有权可以在future实例之间转移(通过移动构造语义),但是每次只有一个实例引用特定的异步结果。但是std::shared_future实例是copyable,所以可以有多个shared_future对象引用相同的关联状态.
- 单个shared_future对象上的成员函数仍然不同步,从多个线程访问单个对象时,为了避免数据冲突,必须使用锁来保护访问
使用它的首选方法是获取对象的副本,并让每个线程访问自己的副本。如果每个线程通过自己的std::shared_future对象访问共享异步状态,那么从多个线程访问该状态是安全的。
- 通过future实例构造shared_future实例,必须转移所有权的方式进行
std::promise
p; std::future f = p.get_future(); //构造shared_future方式一: std::shared_future sf(f); //error std::shared_future sf(std::move(f)); //move触发移动构造 //方式二: std::shared_future sf = f.share(); //方式三: std::shared_future sf(std::move(p.get_future())); assert(!f.valid()); assert(sf.valid());
推荐阅读
- CountDownLatch-线程并发的发令枪
- Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)
- 啥是负载均衡、高并发、分布式、集群()
- java多线程-锁
- 并发,并行,阻塞,非阻塞,异步,同步
- iOS数据库之FMDB、Realm、WCDB
- 并发与高并发课程学习笔记(9)
- java并发编程实战读书笔记(第一章|java并发编程实战读书笔记:第一章 简介)
- Java并发基础之内存模型
- 高性能分布式锁