c++11并发库之线程同步

  • 主要内容
    • 条件变量
    • future
    • async/packeged_task/promise
    • shared_future
条件变量
std::mutex _mutex; std::condition_variable _cv; std::deque _data; void thread_process_data() { while(1){ std::unique_lock lk(_mutex); _cv.wait(lk, [](){return !_data.empty(); }); std::string str = _data.front(); _data.pop(); lk.unlock(); do_something(str); } }void thread_produce_data() { while(1){ std::this_thread::sleep_for(std::chrono::milliseconds(1000)); { std::lock_guard lk(_mutex); _data.push("hello cv"); } _cv.notify_one(); } }

等待条件的线程处理流程:
  • 获取一个std::unique_lock用来保护共享变量
  • 执行wait(wait_forwait_until),这些函数的调用将以原子方式释放互斥锁并暂停线程执行
    • 在释放互斥锁之前,这里会第一次检查lambda返回值;
    • 如果条件真,则线程从wait返回并继续执行;
    • 如果条件假,则线程阻塞并同时释放互斥锁;
  • 当条件变量被通知、超时过期或出现虚假唤醒时线程被唤醒,互斥锁被原子地重新获取。然后,如果唤醒是假的,线程应该检查条件并继续等待。
    • 唤醒时重新获取互斥锁
    • 再次检查lambda返回值
    • 如果条件真,则线程继续执行;
    • 如果条件假,则线程阻塞并同时释放互斥锁;
使用std::unique_lock而不是std::lock_guard原因有二:
【c++11并发库之线程同步】1、等待的线程必须在等待时解锁互斥锁,并在等待之后再次锁定它,而std::lock_guard不提供这种灵活性.
2、wait()的第一个参数要求的就是std::unique_lock的引用。
唤醒等待条件的线程:
  • 获取std::lock_gurad
  • 执行修改
  • 执行notify_onenotify_all通知条件发生,等待条件的线程将被唤醒(通知不需要持有锁)
condition variable适用于某个事件会反复的发生的情景,在某些情景下线程只想等待一次事件为真,之后它将永远不会再等待这个事件发生。
future async与future async相比thread的不同:
  • std::thread是一个类模板,而std::async只是一个函数模板
  • std::async返回std::future对象,让异步操作创建者访问异步结果.
  • 调用std::thread总启动一个新线程,且在新线程中立即执行f
  • 调用std::async不一定会启动一个新线程,并且可以决定是否立即执行f、延迟执行、不执行
async的policy参数
  • 设置为launch::async, 将在新线程中立即执行f
  • 设置为launch::deferred,async的f将延期执行直到future对象被查询(get/wait),并且f将会在查询future对象的线程中执行,这个线程甚至不一定是调用async的线程;如果future对象从未被查询,f将永远不会执行。
  • 设置为launch::deferred |std::launch::async,与实现有关,可能立即异步执行,也可能延迟执行。
  • launch::deferred和std::launch::async都没设置,C++14中为未定义
packaged_task与future
  • 类模板std::packaged_task包装任何Callable(函数、lambda、bind表达式或其他函数对象)。
  • 与std::function不同的是,std::packaged_task提供了std::future.
  • 其威力在于只要能够访问std::future,无论std::packaged_task作为对象被传输到哪个线程中执行,都可以通过std::future获取其结果。
  • 实例化的std::packaged_task本身也是一个Callable。它可以包装在std::function对象中,也可以作为线程函数传递给std::thread,或者传递给另一个需要Callable的函数,甚至可以被直接调用。
  • std::packaged_task可以用作线程池或者任务队列的构建块,作为“消息”在线程之间传递。
包装lambda
void task_lambda() { std::packaged_task task([](int a, int b) { return std::pow(a, b); }); std::future result = task.get_future(); //在当前线程执行std::pow(2,9) task(2, 9); std::cout << "task_lambda:\t" << result.get() << '\n'; }

包装bind
int f(int x, int y) { return std::pow(x,y); } void task_bind() { std::packaged_task task(bind(f,1,2)); std::future result = task.get_future(); //在当前线程执行std::pow(2,9) task(2, 9); std::cout << "task_bind:\t" << result.get() << '\n'; }

作为线程函数
int f(int x, int y) { return std::pow(x,y); } void task_thread() { std::packaged_task task(f); std::future result = task.get_future(); //在新线程执行std::pow(2,9) std::thread t(std::move(task), 2, 9); std::cout << "task_thread:\t" << result.get() << '\n'; }

等同于:
void equal_to_task_thread() { std::future result = std::async(std::launch::async, f, 2, 9); std::cout << "equal_to_async:\t" << result.get() << '\n'; }

promise与future
std::promise提供了一种设置值(类型为T)的方法,稍后可以通过关联的std::future对象读取该值。配对使用futurepromise,等待结果的线程可能会阻塞在future::wait()或者future::get(),而提供数据的线程可以使用配对的promise来设置相关的值并使future就绪。
一般的范式是:
  • 想要获取结果类型为T的线程构造std::promise< T >,并通过promise::get_future获取future对象
  • 将promise作为参数传递给新的线程函数,在新的线程函数中通过promise::set_value设置结果
  • 等待的线程通过future::wait等待结果诞生,通过future::get获取结果。
    void do_work(arg, std::promise promise_arg) { do_something(); promise_arg.set_value(T); }int main() { //使用promise在线程之间传递结果 std::promise one_promise; //构造future对象 std::future one_future = one_promise.get_future(); //promise作为参数传递给新的线程函数 std::thread work_thread(do_work, some_arg, std::move(one_promise)); //等待结果 one_future.wait(); //获取结果 std::cout << "result=" << one_future.get() << '\n'; work_thread.join(); }

  • promise不可拷贝,不能直接将one_promise作为参数传递给新的线程函数。
异常与future
  • 当使用std::async、std::packeged_task时,如果线程函数函数、被包装的函数抛出异常,与future关联的数据中将 储存异常 而非结果。
  • 对于std::promise有点不一样,如果想存储异常而非正常的结果,使用set_exception()替代set_value。
    extern std::promise some_promise; try { some_promise.set_value(calculate_value()); } catch(...) { //1使用current_exception提取calculate_value()中抛出的异常 //2使用set_exception()储存异常 some_promise.set_exception(std::current_exception()); //或者 //some_promise.set_exception(std::copy_exception(std::logic_error("foo "))); }

  • 当异常被储存,future将就绪,通过调用future::get()将重新抛出存储的异常。
  • 待续
std::shared_future
  • std::future处理了线程之间传输数据所需的所有同步,但是对std::future特定实例的成员函数调用不是线程安全的——本身如果从多个线程访问单个std::future对象而不进行额外的同步,那么将面临数据竞争和未定义的行为。
    std::future 模型对异步结果有唯一所有权,只有一个线程可以提取到异步结果(通过get),在第一次调用get()之后,就已经没有要提取的值了。
  • std::shared_future可以让多个线程可以等待相同的事件
    std::future是可以moveable,对异步结果的唯一所有权可以在future实例之间转移(通过移动构造语义),但是每次只有一个实例引用特定的异步结果。但是std::shared_future实例是copyable,所以可以有多个shared_future对象引用相同的关联状态.
  • 单个shared_future对象上的成员函数仍然不同步,从多个线程访问单个对象时,为了避免数据冲突,必须使用锁来保护访问
    使用它的首选方法是获取对象的副本,并让每个线程访问自己的副本。如果每个线程通过自己的std::shared_future对象访问共享异步状态,那么从多个线程访问该状态是安全的。
  • 通过future实例构造shared_future实例,必须转移所有权的方式进行
    std::promise p; std::future f = p.get_future(); //构造shared_future方式一: std::shared_future sf(f); //error std::shared_future sf(std::move(f)); //move触发移动构造 //方式二: std::shared_future sf = f.share(); //方式三: std::shared_future sf(std::move(p.get_future())); assert(!f.valid()); assert(sf.valid());

    推荐阅读