[Cpp基础][09]C++11多线程编程基础

Last Updated on 2020年4月15日

C++11为多线程开发准备了一套标准的基础设施,主要为<thread>,<mutex>,<condition_variable>, 这套组件基本是”pthread”的标准化.本文主要介绍C++11多线程开发相关的基础内容.如果需要更多的细节,可以直接google或者查手册进行了解.

可调用对象

可调用对象在C++ 11中是非常重要的概念, 它使得”函数”变得更加像对象. 线程库完整的支持可调用对象.

  • 支持()运算符的对象都是可调用对象,这些工具的设计比较独立,可以在使用时再查reference,常见的有
    • bind创建的对象
    • lambda
    • 重载了()的类
    • 函数/函数指针
    • function<T>.

std::thread

  • 标准库提供了std::thread来进行基础的线程管理,std::thread类实例在逻辑上是物理线程的unique容器,只能容纳一个物理线程.
    • 可以通过移动赋值运算符在不同的容器中转移线程.
    • 本文将称std::thread对象为线程容器,物理线程则直接称为线程.
  • std::thread的默认构造函数将初始化一个空容器.
    • std::thread thread_var(callable_obj,args):创建一个线程容器,载入新线程,且新线程自动开始执行.
    • 可以通过移动赋值/移动构造在std::thread之间转移线程的所有权,转移线程的管理权不会中断线程的运行.
    • 和pthread一样,我们不能直接获得线程中函数callable_obj(args)的返回值.
  • 由于实现原因,一个线程必须在适当的时机被detachjoin,否则就会造成线程资源的泄露(“线程返回状态”也是一种线程资源,如果不用detach或join通知OS,这个资源不会被释放)
    • x.join(): 在当前线程等待线程x的返回状态,得到之后再释放线程的资源. 当join() return后,x恢复到原始的空容器状态.
    • x.detach(): 通知OS,可以直接在线程执行完后释放资源. detach() return之后,x恢复到原始的空容器状态.
    • std::thread的实现很严格, 对于没有join()detach()x,x析构时将触发std::terminate()
  • 在pthread中,可以在线程内部detach(my_self),这实在是一种很差的编程风格,std::thread不支持这一特性
  • std::thread::hardware_concurrency()可以获得当前设备的物理最大并行数.
    • 一个较好的编程模型是: 按照物理线程数创建一个线程池, 我们手动的为线程池内的线程分配task, 而不是创建一大堆线程,由OS进行调度 .
  • x.get_id()std::this_thread::get_id()方法可以获得线程ID,能从根本上鉴别不同的线程实体.

注意 std::thread 的构造函数

  • std::thread构造函数的参数都是T型的,其效果大致如下: std::thread thd(foo,args)中的fooargs都会用于初始化新线程中的同类型临时变量.在新线程内执行的实际是tmp_foo(tmp_args),所有的tmp_args都是右值类型.
  • thread的这种实现可能导致很多与预期不符的错误,尤其是在涉及引用传值和class时.
    • 例如,对于void foo(MyT& in),在std::thread(foo,data)时,由于拷贝后的临时对象是右值类型, 无法传入void foo(MyT&), 所以std::thread(foo,data)一般会触发编译错误
    • 例如,为了提高性能,你使用了void bar(const T& in), 其实std::thread thd(bar,var)仍然没有避免对var的拷贝.
    • 例如, 对于下面的代码, 实际会拷贝buf的指针到char * tmp_buf,在新线程内执行bar(tmp_buf)时, oops()可能已经return,此时tmp_buf就指向了悬空的内容.
void bar(std::string in){
}

void oops(){
    char buf[128]="xxxxx";
    std::thread local(bar,buf);
    local.detach();
}
  • 解决上述只有一个稳定而通用的方法: 只使用内置类型和指针作为形参,否则可能需要std::ref()std::move()

线程间通信核心工具

如果线程们各干各的事, 彼此之间没有任何交集,那么情况是最理想的. 在实践中,需要解决的主要有两个问题.线程同步和线程安全. 线程同步主要是指: 我们希望多个并发的线程按照可控的顺序执行. 线程安全主要是指: 我们希望多个线程共享同一个对象时, 大家彼此都能相安无事.

基于<mutex>的线程安全

并行安全问题的解决,主要的手段是”互斥”, 意思是, 在某个线程访问对象时, 其他线程都不能访问它.”互斥”是一种策略,实现互斥的工具主要有两类,分别称为”锁”和”原子操作”. 在实践中,实现”锁”和”原子操作”的手段很多, 不同的实现一般有不同的依赖, 或者不同的特性. 但是逻辑上功能都是相似的

典型的”锁”是这样一种逻辑: 想要访问对象,就必须要拿到对象上的”锁”, 如果拿不到锁,说明别人正在使用,我们不应该操作对象,如果拿到锁了, 就可以放心的操作对象, 锁的典型用法如下

lock_of_objx.lock(); //只有在拿到锁时,才会return
process(objx);
lock_of_objx.unlock(); // 我们不需要锁了,可以让给别人.

原子操作则是指: 不会被中断的操作. 在多核环境中,可以理解为”不会被干扰的操作”,即在Op执行的过程中,计算单元和操作数都不会被外界干扰,从而保证计算结果一定是可预测的. 一般而言,当我们说原子操作时,都是在说硬件层面的原子操作(因为我们也可以用通过加锁来实现逻辑上的原子操作).

锁的实现归根到底要基于硬件提供的原子操作. 一般而言,越接近硬件底层的锁,功能越简单,对CPU资源的浪费也越严重,在实践中, 常常会基于底层的锁,逐层实现更高级的锁,以满足我们的需求.

  • C++11标准库提供的工具核心为<mutex>,std::mutex的性能一般很好,可以放心使用.
    • 一般而言,总是应当使用std::mutex,使用其他XX mutex能解决的问题都可以用标准的mutex解决,然而引入的额外复杂性往往是得不偿失的,如shared_mutex(即read_write_mutex),recursive_mutex
    • mutex.unlock()释放锁后,如果已经有其他线程需要锁,那么会立刻切换到其他线程.
    • 因此,不会出现单个线程重复lock/unlock的情况,mutex一定会被让渡出去.
    • 如果有多个线程争用锁,那么将由线程调度器决定哪个线程被唤醒并持有mutex
  • 标准库提供的两个辅助管理类模板
    • std::lock_guard<>:实现自动lock()unlock()
    • std::unique_lock<>: 可以转移mutex的管理权,例如把一个已经lockmutex转交给另一个函数.
  • 标准库提供的辅助函数 std::callonce, 可以保证函数只被调用一次,在多线程,或者可重入的函数中很有用.

基于<condition_variable>的线程同步

从代码的角度看, 这里实现的线程同步就是: 一个线程到达某一个特定点位后,就先wait()暂停下来, 直到被notify()唤醒后,才继续执行.

  • 标准库提供了类似于pthread的高效且安全的睡眠-唤醒机制.
    • the_cv.notify()只能通知已经调用the_cv.wait()的线程.如果发出通知时,没有线程已经调用the_cv.wait(...), 那么这次通知就会被miss掉.
  • 意外唤醒(伪唤醒): 在现代系统中cv.wait(...)以及sleep()等主动让出CPU时间的函数会因各种各样的原因被突然唤醒(最常见的原因就是被signal或异常唤醒),使线程继续执行后续指令.这种唤醒对于我们而言完全是意外的, 有可能导致一些不可预知的行为. 因此,在实践中,我们总是会在cv.wait(...) return后,再次检查是否应该继续睡眠. 也就是
while(IsNeedWait){
    cv.wait(...)
}
  • 从原理上说,实现condion_variable的功能是不需要锁的,只要能sleep,wakeup就够了. condition variablemutex组合使用主要是为了避免意外的notify miss, 分析如下.
// 这是一个Naive的例子,只有两个线程, wait()最初的功能就纯粹是睡眠等待.

// 场景1:在某一次Consumer检查g_data.empty()时,发现为空,进入while()循环,在执行完g_data_mutex.unlock(),且还未执行g_cv.wait()时,OS把线程切换到Producer,并且Producer完整的执行了一遍,发出了一个notify(),显然,这一次notify就丢失了.
// 场景2:Consumer陷入wait()后, Producer触发了一次notify(), 此时立刻切换到Counsmer线程,使其wait() return,之后,OS又把线程切换到Producer,又触发了一次notify,那么这第二个notify就丢失了

MyData g_data;
std::mutex g_data_mutex;
MyCondition g_cv;

void Consumer(){
    while(1){
        g_data_mutex.lock();
        while(g_data.empty()){
            g_data_mutex.unlock();
            g_cv.wait()
            g_data_mutex.lock();
        }
        auto job=pop_one_job(&g_data);
        g_data_mutex.unlock();
        process(&job);
    }

}
void Producer(){
    while(1){
        g_data_mutex.lock();
        prepare_data(&g_data);
        g_cv.notify();
        g_data_mutex.unlock();
    }
}

// 这个问题可以使用原子的`std_g_cv.wait()`来解决,这个版本的wait将会自动在合适的时机调用unlock和lock,避免前面的问题
void Consumer2(){
    while(1){
        g_data_mutex.lock();
        while(g_data.empty()){
            std_g_cv.wait(g_data_mutex);
        }
        auto job=pop_one_job(&g_data);
        g_data_mutex.unlock();
        process(&job);
    }
}

void Producer2(){
    while(1){
        g_data_mutex.lock();
        prepare_data(&g_data);
        std_g_cv.notify(); // 有的场景中, 少量`notify miss` 可能并不重要. 此时可以把notify移出临界区.
        g_data_mutex.unlock();  
    }
}

  • void wait(std::uniqe_lock<std::mutex> &)
    • 首先, 必须是std::uniqe_lock<std::mutex>类型的(标准库还提供了std::condition_variable_any,它可以传入任何类型的锁,但是相对的性能一般会更差一些)
    • 传入的uniqe_lock必须已经上锁.
  • notify_one(),notify_all(): 字面意思
  • 能访问同一cv对象的线程都可以调用cv.notify()cv.wait(),这就能让我们实现类似于ping-pong执行的逻辑,一个线程notify另一个线程后,自己立刻陷入wait.等待另一个线程notify回来.

线程间通信额外工具

有了mutexcondition variable,就已经能够实现多线程中需要的所有功能了, 在一些开发规范中, 甚至只推荐使用这两种技术. 在这两个技术的基础上, C++11 额外提供了一些高级工具, 这些工具基本都是伴随着<future>这个头文件而来的, 可以更快捷的实现一些常见功能.

一次性数据管道

有这样一种场景十分常见, 一个线程需要读取某个值, 如果得不到,就直接阻塞在原地,直到这个值被另一个线程设置. 标准库为这种场景提供了组件std::futurestd::promise, 这两个组件极有可能成为历史长河的渣滓. 所以在此仅简单介绍.

注意: 如果你需要频繁的传递数据, 那么就应该基于mutexcondition varaible自己设计一个结构,例如BlockingQueuefuture-promise是设计为一次性使用的,高频的使用future-promise往往会带来更大的开销.

  • 一次性管道自身称为SharedState, 这个SharedState的实现及管理由标准库实现方负责.(因为这个一次性管道能同时被多个线程访问,所以称为SharedState是比较合理的)
  • 标准库规定, 用户只能通过std::promisestd::future来和SharedState交互. 编译器规定了promisefuture的行为,但是并没有规定SharedState实现方法
  • 从逻辑上看, std::promise负责创建并持有std::shared_ptr<SharedState>,而std::future则仅仅持有一个std::shared_ptr<SharedSate>以引用这个对象
  • std::futurestd::promise自身都是unique型的,使用移动语义赋值/构造.
// 这是一个非常粗糙的例子,没有考虑安全检查,性能等问题,仅仅是为了说明具体的行为规则.
cpp struct  SharedState
{
void  set(); // 设置值
void  get(); // 读取值:如果有错误,就报异常; 如果有值,就返回值; 如果没有值,就阻塞到有值/错误为止;
void  set_error(); // 设置错误
};

struct  Future
{
T get()
    {
    T ret = p_state->get();
    p_state.reset(); // 得到值后,就自动解除绑定
    return ret;
    }
private:
    friend struct Promise;
    shared_ptr<SharedState> p_state;
};

struct  Promise
{
Promise()
    {
    p_state = std::make_shared<SharedState>(); //创建SharedState
    future.p_state  = p_state;// 为内部的future设置绑定
    }

~Promise()
    {
    if (p_state)
        {
        p_state->set_error(); // 如果析构时仍然没有调用`set_value`,那么就再也没有机会了, 我们需要通过set_error通知用户.
        }

    }

void  set_value(T v)
    {
    p_state->set(v);
    p_state.reset();// 设置完值后,自动解除绑定
    }

Future get_future()
    {
    return  std::move(future); // 把内部的future移动出去.
    }

private:
    shared_ptr<SharedState> p_state;
    Future future;
}
  • 其他信息:
    • std::future 的状态称为valid, 它用于表明是否持有一个有效的shared_ptr.
    • 各种文章中, SharedState的状态称为ready.
      • 存在一种std::shared_future,它被设计成使用拷贝语义传递,且调用get()并不会自动reset().
    • std::futurestd::shared_future的API都没有被设计为线程安全的.
      • 例如,对于一个g_shared_future,你不能在多个线程直接并行的调用g_shared_future.get()
      • C++11仅保证:绑定到同一个shared state的不同shared_future对象可以安全的并行get()

一次性任务.

  • std::async()函数: 快速创建异步执行的函数, 可以通过返回的future来读取返回值.( 这个函数的功能其实和std::thread差不多,但是多了一些行为控制)
  • std::packaged_task<>模板:把函数调用包装为可调用对象,该对象重载了operator(), 且有get_future()成员,使得函数的调用和返回值的获取可以分割在不同的线程中.

等待

  • 标准库额外提供了wait_forwait_until系列API,可以控制等待的超时时间. 为了支持这个特性, 标准库还给出了点时间/段时间的标准定义.
  • 标准库给出的时间有两类:点时间time_point和段时间time_duration,且提供了相应的算数支持,这很自然.
  • 在C++标准中,有steady clockunsteady clock的区别,它们的区别是”是否可能被动态调整”
    • unsteady clock: 它的值可以被调整,以消除累积误差. (它的值一般会通过电池存储在主板上)
    • steady clock: 它的值总是会随CPU时钟tick-tock而增加. (断电重启后这个值才会清零)
    • 大部分时候,我们的程序都只关心时间间隔/时间长度,而不关注现在究竟是哪个时间点, 此时,总是应该优先使用steady clock,因为它能保证单增,从而不会出现”时光倒流”的问题.
  • 标准库提供的时钟中,最常用的有三个.
    • std::chrono::high_resolution_clock: 当前系统最高精度的时钟
    • std::chrono::system_clock: 用于代表物理时间的时钟(unsteady)
    • std::chrono::steady_clock: 保证是一个稳定时钟

原子操作和内存模型

  • 在不加锁时, 并发访问内存地址可能引入冲突, 解决这种冲突的唯一方法就是序列化.
    • 策略1: 显式的使用锁, 使访问序列化
    • 策略2: 使用std::atomic<>,强制使多线程的并发访问序列化.
  • 标准库的<atomic>提供了一个std::atomic<>模板,且提供了很多特化,<atomic>中已经完整给出了常用内置类型对应的原子类型, 如std::atomic<int>. 在原子类型上进行的操作都是原子的.
    • 可以通过is_lock_free()成员来判断这些类型原子性的实现方法,如果是true, 原子性就是由硬件直接保证的. 如果是false, 那么类型内部实际会使用某种锁来实现原子性.
    • 标准库强制要求std::atomic_flag必须是lock_free的, 因为只有基于std::atomic_flag才能实现最简单的自旋锁,进而实现其他的原子操作.
  • C++的内存模型基本和OS提供的抽象是相同的.
    • 进程使用一个连续的虚拟空间
    • 每个线程有自己独立的函数栈, 以及”thread_local” 存储. 函数栈和thread_local存储使用的地址段在线程创建时由标准库动态分配,并初始化.
    • 每个线程都可以不受任何限制的访问进程虚拟空间内的任意地址.
    • 单个对象的内存布局和单线程状况一致
  • 内存序memory_order问题.
    • 在单线程时代, 只有编译器优化或CPU的乱序执行可能影响指令的执行顺序. 例如, 两个没有数据依赖的a[0]=1;a[1]=2;的执行顺序可能是不确定的.
    • 在多线程时代, 又引入了新的执行顺序不确定性. 例如,下面的两个线程中,我们并不能直接判断b的最终取值是哪个.
std::atomic<int> g_a=1;
thread1(){
    int b=g_a.load(); // atomic load
}
thread2(){
    g_a.store(2); // atomic store
}
  • 我们可以使用各种编程方法使得对原子对象的操作按预期进行. 同时,标准库也提供了std::memory_order, 他可以同时解决两个方面的问题: 单个线程内多个彼此没有依赖的原子操作的执行顺序, 多个线程间原子操作的执行顺序.
  • 对于标准库提供的原子类型,其原子操作默认使用的内存序为”memory_order_relaxed”, 这样的内存序只能保证”原子性”, 但是并不能保证线程内/线程间执行指令的顺序. 标准库还提供了其他内存序, 使用标准库的内存序机制, 在并发进行原子操作时, 可以得到特定的内存IO顺序, 例如, 在上面的例子中, 应用内存序机制修改代码后, 我们可以保证b肯定读到1(或者肯定读到2).