[Cpp基础][09]C++11多线程编程基础

Last Updated on 2024年2月17日

C++11为多线程开发准备了一套标准的基础设施,主要为<thread>,<mutex>,<condition_variable>, 这套组件基本是"pthread"的标准化.本文主要介绍C++11多线程开发相关的基础内容.如果需要更多的细节,可以直接google或者查手册进行了解.

可调用对象

可调用对象在C++ 11中是非常重要的概念, 它使得"函数"变得更加像对象. 线程库完整的支持可调用对象.

  • 支持()运算符的对象都是可调用对象,这些工具的设计比较独立,可以在使用时再查reference,常见的有
    • bind创建的对象
    • lambda
    • 重载了()的类
    • 函数/函数指针
    • function<T>.

std::thread

  • 标准库提供了std::thread来进行基础的线程管理,std::thread类实例在逻辑上是物理线程的unique容器,只能容纳一个物理线程.
    • 可以通过移动赋值运算符在不同的容器中转移线程.
    • 本文将称std::thread对象为线程容器,物理线程则直接称为线程.
  • std::thread的默认构造函数将初始化一个空容器.
    • std::thread thread_var(callable_obj,args):创建一个线程容器,载入新线程,且新线程自动开始执行.
    • 可以通过移动赋值/移动构造在std::thread之间转移线程的所有权,转移线程的管理权不会中断线程的运行.
    • 和pthread一样,我们不能直接获得线程中函数callable_obj(args)的返回值.
  • 由于实现原因,一个线程必须在适当的时机被detachjoin,否则就会造成线程资源的泄露("线程返回状态"也是一种线程资源,如果不用detach或join通知OS,这个资源不会被释放)
    • x.join(): 在当前线程等待线程x的返回状态,得到之后再释放线程的资源. 当join() return后,x恢复到原始的空容器状态.
    • x.detach(): 通知OS,可以直接在线程执行完后释放资源. detach() return之后,x恢复到原始的空容器状态.
    • std::thread的实现很严格, 对于没有join()detach()x,x析构时将触发std::terminate()
  • 在pthread中,可以在线程内部detach(my_self),这实在是一种很差的编程风格,std::thread不支持这一特性
  • std::thread::hardware_concurrency()可以获得当前设备的物理最大并行数.
    • 一个较好的编程模型是: 按照物理线程数创建一个线程池, 我们手动的为线程池内的线程分配task, 而不是创建一大堆线程,由OS进行调度 .
  • x.get_id()std::this_thread::get_id()方法可以获得线程ID,能从根本上鉴别不同的线程实体.

注意 std::thread 的构造函数

  • std::thread构造函数的参数都是T型的,其效果大致如下: std::thread thd(foo,args)中的fooargs都会用于初始化新线程中的同类型临时变量.在新线程内执行的实际是tmp_foo(tmp_args),所有的tmp_args都是右值类型.
  • thread的这种实现可能导致很多与预期不符的错误,尤其是在涉及引用传值和class时.
    • 例如,对于void foo(MyT& in),在std::thread(foo,data)时,由于拷贝后的临时对象是右值类型, 无法传入void foo(MyT&), 所以std::thread(foo,data)一般会触发编译错误
    • 例如,为了提高性能,你使用了void bar(const T& in), 其实std::thread thd(bar,var)仍然没有避免对var的拷贝.
    • 例如, 对于下面的代码, 实际会拷贝buf的指针到char * tmp_buf,在新线程内执行bar(tmp_buf)时, oops()可能已经return,此时tmp_buf就指向了悬空的内容.
void bar(std::string in){
}

void oops(){
    char buf[128]="xxxxx";
    std::thread local(bar,buf);
    local.detach();
}
  • 解决上述只有一个稳定而通用的方法: 只使用内置类型和指针作为形参

线程间通信核心工具

如果线程们各干各的事, 彼此之间没有任何交集,那么情况是最理想的. 在实践中,需要解决的主要有两个问题.线程同步和线程安全. 线程同步主要是指: 我们希望多个并发的线程按照可控的顺序执行. 线程安全主要是指: 我们希望多个线程共享同一个对象时, 大家彼此都能相安无事.

基于<mutex>的线程安全

并行安全问题的解决,主要的手段是"互斥", 意思是, 在某个线程访问对象时, 其他线程都不能访问它."互斥"是一种策略,实现互斥的工具主要有两类,分别称为"锁"和"原子操作". 在实践中,实现"锁"和"原子操作"的手段很多, 不同的实现一般有不同的依赖, 或者不同的特性. 但是逻辑上功能都是相似的

典型的"锁"是这样一种逻辑: 想要访问对象,就必须要拿到对象上的"锁", 如果拿不到锁,说明别人正在使用,我们不应该操作对象,如果拿到锁了, 就可以放心的操作对象, 锁的典型用法如下

lock_of_objx.lock(); //只有在拿到锁时,才会return
process(objx);
lock_of_objx.unlock(); // 我们不需要锁了,可以让给别人.

原子操作则是指: 不会被中断的操作. 在多核环境中,可以理解为"不会被干扰的操作",即在Op执行的过程中,计算单元和操作数都不会被外界干扰,从而保证计算结果一定是可预测的. 一般而言,当我们说原子操作时,都是在说硬件层面的原子操作(因为我们也可以用通过加锁来实现逻辑上的原子操作).

锁的实现归根到底要基于硬件提供的原子操作. 一般而言, 仅在进行lock-free编程时才使用原子操作, 因为原子操作不只需要保证原子性, 还要维护happens-before逻辑, 这就与硬件强相关了, 比使用锁的心智负担要大的多.

lock-free 的意思是: 即便某个线程因某些原因(代码逻辑错误或硬件调度问题)被阻塞, 也不会影响到其他线程. 显然,使用锁的话,是不可能达成这种目标的,如果一个线程拿到锁后被阻塞,导致锁无法被释放,那么其他线程就再也没有机会拿到这个锁了.

  • C++11标准库提供的工具核心为<mutex>,std::mutex的性能一般很好,可以放心使用.

    • 一般而言,总是应当使用std::mutex,读写锁,recursive-mutex等都应当避免使用.
    • mutex.unlock()释放锁后,如果已经有其他线程需要锁,那么会立刻切换到其他线程.
    • 如果有多个线程争用锁,那么将由线程调度器决定哪个线程被唤醒并持有mutex
  • 标准库提供的两个辅助管理类模板

    • std::lock_guard<>:实现自动lock()unlock()
    • std::scoped_lock<>: C++17以后应当用于替代lock_guard, 它的主要区别是可以同时lock多个mutex
    • std::unique_lock<>: 可以转移mutex的管理权,例如把一个已经lockmutex转交给另一个函数.
  • 标准库提供的辅助函数 std::callonce, 可以保证函数只被调用一次,在多线程,或者可重入的函数中很有用.

  • 死锁的避免: 本质上,死锁是一个环状依赖, 假如我们在持有锁A的同时依赖持有锁B,那么就引入了一个依赖边, 所有的依赖边都不应该成环,才能避免死锁。

    • 策略1:避免同时持有多个锁,每次需要持有新的锁时都释放已有的锁
    • 策略2: 层次锁,每一个线程都有一个therad_local 的 current_level, 每个mutex都有一个对应的level,每次上锁都会更新current_level,只允许current_level单调变化,这样就能保证所有线程的上锁顺序一致。当尝试上锁时发现 mutex.level不单调,可以直接抛出一个运行期错误。按照这种策略实现锁时,所有依赖的方向都是单调的,不可能成环
  • 线程安全的一大敌人:回调函数,因为回调函数的内容是库开发者无法控制的,当允许使用回调函数时,必须要避免权限泄露,死锁等问题。

  • std::shared_mutex,可以实现类似读写锁的功能,不过相似的逻辑最好使用另一篇博文内建议的方法

    • 可以按exclusive的形式获得锁,获得成功后,没有任何形式能再获得锁。
    • 可以按shared的形式获得锁,获得成功后,可以按shared形式再次获得锁(类似recursive-lock)。
  • std::recursive_mutex: 当一个线程内持有锁时,再lock会抛出异常,recursive_mutex可以避免这个问题,不过这一般意味着错误的设计,你不应当使用它。

    • 一个典型的使用场景是,MyT::API0需要调用MyT::API1,而两者都需要持有锁以保证安全,不过这个场景更建议抽象一个无锁的MyT::PrivateFn,然后让两个API都调用它。

基于<condition_variable>的线程同步

从代码的角度看, 这里实现的线程同步就是: 一个线程到达某一个特定点位后,就先wait()暂停下来, 直到被notify()唤醒后,才继续执行.

  • 标准库提供了类似于pthread的高效且安全的睡眠-唤醒机制.

    • the_cv.notify()只能通知已经调用the_cv.wait()的线程.如果发出通知时,没有线程已经调用the_cv.wait(...), 那么这次通知就会被miss掉.
  • 意外唤醒(伪唤醒): 在现代系统中cv.wait(...)以及sleep()等主动让出CPU时间的函数会因各种各样的原因被突然唤醒(最常见的原因就是被signal或异常唤醒),使线程继续执行后续指令.这种唤醒对于我们而言完全是意外的, 有可能导致一些不可预知的行为. 因此,在实践中,我们总是会在cv.wait(...) return后,再次检查是否应该继续睡眠. 也就是

    while(IsNeedWait){
    cv.wait(...)
    }
  • 从原理上说,实现condion_variable的功能是不需要MUTEX的,只要能sleep,wakeup就够了. condition variablemutex组合使用主要是为了避免意外的notify miss, 分析如下.

// 这是一个Naive的例子,只有两个线程, wait()最初的功能就纯粹是睡眠等待.

// 场景1:在某一次Consumer检查g_data.empty()时,发现为空,进入while()循环,在执行完g_data_mutex.unlock(),且还未执行g_cv.wait()时,OS把线程切换到Producer,并且Producer完整的执行了一遍,发出了一个notify(),显然,这一次notify就丢失了.
// 场景2:Consumer陷入wait()后, Producer触发了一次notify(), 此时立刻切换到Counsmer线程,使其wait() return,之后,OS又把线程切换到Producer,又触发了一次notify,那么这第二个notify就丢失了

MyData g_data;
std::mutex g_data_mutex;
MyCondition g_cv;

void Consumer(){
    while(1){
        g_data_mutex.lock();
        while(g_data.empty()){
            g_data_mutex.unlock();
            g_cv.wait()
            g_data_mutex.lock();
        }
        auto job=pop_one_job(&g_data);
        g_data_mutex.unlock();
        process(&job);
    }

}
void Producer(){
    while(1){
        g_data_mutex.lock();
        prepare_data(&g_data);
        g_data_mutex.unlock();
        g_cv.notify();
    }
}

// 这个问题需要同时使用以下两个策略来解决: `unlock(),wait(),lock()`这三个操作必须组合成具备原子性的操作, 这可以避免场景1和场景2

// 这里其实还有一个不重要的小问题, 假定Producer2执行unlock之后, Consumer2会立刻开始检测g_data.empty(),然后发现非空, 就开始pop job并处理. 之后再切换回Producer2之后, 再执行的notify实际将没有任何作用, 这也算一个notify miss, 虽然它并不影响代码的实用性, 我们一般也会把notify放到lock()保护的范围内, 以避免问题.
void Consumer2(){
    while(1){
        std::unique_lock<std::mutex> u_lock(g_data_mutex)
        while(g_data.empty()){
            std_g_cv.wait(u_lock);
        }
        auto job=pop_one_job(&g_data);
        u_lock.unlock()
        process(&job);
    }
}

void Producer2(){
    while(1){
        g_data_mutex.lock();
        prepare_data(&g_data);
        g_data_mutex.unlock();
        std_g_cv.notify();
    }
}

其他

有了mutexcondition variable,就已经能够实现多线程中需要的所有功能了, 在一些开发规范中, 甚至只推荐使用这两种技术. 在这两个技术的基础上, C++11 额外提供了一些高级工具, 如std::async(),std::promise,std::future,std::packaged_task<>。 这些工具更多属于语法糖或者工具

等待

  • 标准库额外提供了wait_forwait_until系列API,可以控制等待的超时时间. 为了支持这个特性, 标准库还给出了点时间/段时间的标准定义.
  • 标准库给出的时间有两类:点时间time_point和段时间time_duration,且提供了相应的算数支持,这很自然.
  • 在C++标准中,有steady clockunsteady clock的区别,它们的区别是"是否可能被动态调整"
    • unsteady clock: 它的值可以被动态调整. (例如系统时钟, 这是一个共享的值, 甚至也可以在其他进程动态修改)
    • steady clock: 它的值总是会随CPU时钟tick-tock而增加. (断电重启后这个值才会清零)
    • 大部分时候,我们的程序都只关心时间间隔/时间长度,而不关注现在究竟是哪个时间点, 此时,总是应该优先使用steady clock,因为它能保证单增,从而不会出现"时光倒流"的问题.
  • 标准库提供的时钟中,最常用的有三个.
    • std::chrono::high_resolution_clock: 当前系统最高精度的时钟
    • std::chrono::system_clock: 用于代表物理时间的时钟(unsteady)
    • std::chrono::steady_clock: 保证是一个稳定时钟