Concurrency With C++

不同语言/操作系统/硬件的线程实现还不一样,因此学一个比较主流的多线程库还是挺重要的。
C++ STL的线程支持库就是一个好例子。在这之前,最好先复习一下基础知识。

基础知识

  • 不变量(invariants) 是指对数据结构的断言
    • 例:某种帐本,任意前缀和不为负数。
    • 例:链表某个节点的后继的前驱是自己。
    • 众所周知,不良的并发编程会破坏不变量!
    • 破坏不变量的根源是写操作。
  • 在并发中的竞争条件(race condition) 是指,任何结果依赖于在两个或多个线程上操作执行的相对顺序。
    • 有问题的竞争条件会破坏不变量,引发Bug。
    • Debug模式下可能无法发现有问题的竞争条件。
    • 下文的竞争条件默认是有问题的。
    • 在类内部使用互斥锁不能完全消除竞争条件。
      • 例:return &locked_data; 泄露指针或引用,使得数据离开内部锁的保护!
      • 例:template<typename F> ... F(&locked_data); 可能已经泄露了指针或引用!
      • 例:if (!s.empty()) s.pop(); 很可能引发一次不安全的pop!
      • 例:tmp = s.top(); s.pop(); return tmp; 在return时发生构造函数异常,数据丢失!
      • 解决办法:
        1. 传入引用或指针来接收返回值。缺点:一些类不支持赋值。
        2. 要求拷贝/移动构造函数不抛出异常。缺点:用户说我偏不。
        3. 返回指向原返回值的指针,最好配合shared_ptr。缺点:性能损耗。
        4. 1+2 或者 1+3。
  • 死锁
    • 四要素:互斥,持有并等待,非抢占,循环等待。
    • 解决办法:破坏四要素中任意一个
      1. 保证上锁的顺序是唯一的。
      2. 使用std::lock()
  • 锁的粒度是指单个锁保护的数据量和保护时间。
    • 挑选合适的数据量大小。
    • 在耗时的操作中尽可能地释放锁,例如I/O操作。
  • 懒汉模式特指单例模式中的延迟初始化。
    • 裸懒汉模式显然是线程不安全的。
    • 双重检查加锁也充满了陷阱,因为最外层的检查可能是错误的(赋好了指针,但未彻底完成初始化)!
    • 理想的解决方法是保证标志位是最后初始化的(要防止编译器乱序优化),而不要将指针看作标志位。
    • 使用std::call_once
    • 还可以利用C++的static局部变量实现懒汉模式。C++11标准保证这是线程安全的!
  • 读写锁是指共享锁。
    • 用于很少更新的数据结构。
    • C++11啥也没有。
    • C++14新增shared_timed_mutex
    • C++17新增shared_mutex,性能更好,好耶!
    • 推荐将shared_mutex声明为mutable
    • 当心读者太多,写者饿死!
  • 递归锁允许一个线程多次上锁,但必须进行相同次数的释放锁。
    • 例:A.a()需要锁,A.b()也需要锁,而且A.a()需要调用A.b()
    • 不建议这样用,因为A.b()可能要面对错误的数据。
    • 建议抽出A.a()A.b()的公共部分,成为一个新的私有函数,且不上锁(因为已经上了锁)。

C++ Library

Thread 线程

cppreference thread

#include <thread>

起一个线程!

  • thread
    • 默认构造不代表任何线程
    • 可移动,不可复制
    • 构造函数完成后立即就绪
    • 若子线程抛出异常,则调用std::terminate中止进程
    • 使用thread_local关键字,令变量的生命周期和所属线程相同
    • join() 调用者等待,直到线程执行完毕
    • detach() 分离线程和线程对象,此时可以安全地销毁线程对象。
    • joinable() 是否可以使用join()。正在运行中、未分离的线程都是 joinable.
    • thread::hardware_concurrency() 获取线程数参考值。
  • this_thread 命名空间
    • yield() 建议当前线程进入就绪状态
    • get_id() 获取线程id。结果可用<<序列化,虽然意义不大。
    • sleep_for(duration)/sleep_until(time_point) 当前线程阻塞一段时间

提到多线程,几乎避不开互斥、同步等概念。

Mutex 互斥锁

cppreference mutex

#include <mutex>

主要用于保护数据的操作原子性。

若多个线程请求同一个锁,而此锁尚未被释放,则当锁释放时,其中一个线程获得锁(退出阻塞),其余线程维持阻塞。

  • mutex
    • lock() 阻塞,直到获得锁
    • unlock()
    • try_lock() 若无法获得锁,则返回 false 但不阻塞
    • 小心mutex的生命周期。销毁一个被其他线程持有的锁是后果无定义的。
    • 销毁线程前应当释放锁,否则后果无定义。
    • 不可拷贝,不可移动
    • 同一线程不可对同一mutex多次上锁,这样做的后果是无定义的。
  • recursive_mutex
    • 同一线程可以多次上锁。上锁和解锁应一一对应。
    • 例如std::lock_guard<std::recursive_mutex> lk(m);
  • timed_mutex
    • try_lock_for(timeout_duration) 最多等待一段时间,无法获得锁则返回 false
    • try_lock_until(timeout_time) 最多等待至一个时间点,无法获得锁则返回 false
  • recursive_timed_mutex
    • 混合了两者的功能
  • void lock(Lockable1& lock1, Lockable2& lock2, LockableN&... lockn) 全局函数
    • 安全地“同时”请求多个互斥锁
    • 保证不会引发死锁
  • lock_guard
    • RAII 请求和释放一个锁
    • 必须拥有一个锁
    • lock_guard( mutex_type& m, std::adopt_lock_t t );假定已经获得了锁
  • scoped_lock
    • RAII 同时请求和释放多个锁
    • 保证不会引发死锁
  • unique_lock
    • RAII 独占锁
    • 可以主动解锁。常见于条件变量。暂时或提前释放锁通常可提高时间性能。
    • 可以不拥有锁,因此有空间和性能损耗。常见于需要延迟上锁或转移所有权。
    • adopt_lock假定已经获得了锁,defer_lock假定延迟获得锁。
    • 若延迟上锁,随后可用std::lock(unique_lock, ...)上锁。
  • shared_lock
    • RAII 共享锁
  • void call_once( std::once_flag& flag, Callable&& f, Args&&... args ); 函数
    • 全局仅执行函数f(args)一次
    • flag是复位,则允许执行f(args)
    • 支持多线程并发
    • f(args)正常结束,则flag置位,防止第二次调用
    • f(args)抛出未处理的异常,则flag保持复位,允许第二次调用
    • 一个flag可以对应多个f,不保证哪个函数先执行。

Condition Variable 条件变量

cppreference condition_variable

条件变量可以用来阻塞一个或多个线程,直到某个活动线程修改一个共享变量(即条件),并通知条件变量。

  1. 为了保护上述共享变量,需要上互斥锁。
  2. 对比互斥锁的优势是:避免定时,定向唤醒,占用资源低,事件驱动。
  3. 被唤醒的线程需要手动或自动地检查条件,若条件失败,则再次被阻塞,等待被唤醒。

#include <condition_variable>

  • condition_variable
    • 只有默认构造函数
    • void wait( std::unique_lock<std::mutex>& lock );
      • 释放锁,然后进入阻塞,直到被唤醒。
      • 被唤醒后,先获得锁。
    • void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
      • 释放锁,然后进入阻塞,直到被唤醒。
      • 被唤醒后,先获得锁
      • 只要pred不满足,就会再次释放锁然后阻塞。
      • 避免在pred里加副作用,因为无法确定它会执行多少次。
    • notify_one()/notify_all()
      • 唤醒关联的阻塞线程。
      • 当前线程应当事先释放锁(lock_guard应已析构)。
    • void notify_all_at_thread_exit( std::condition_variable& cond, std::unique_lock<std::mutex> lk );
      • 全局函数。
      • 线程退出时(所有thread_local被销毁后),释放锁,然后唤醒全部。

Future & Async 未来与异步

cppreference future

有时候,起一个线程仅仅是为了一次返回值。thread不能提供返回值,条件变量又太重量级了,你需要使用async

async可以轻量级地开启一个带返回值的线程,通常短期内你用不到返回值。一段时间后,你需要返回值了,才阻塞自己,直到返回值就绪。需要花费一段时间才能就绪的返回值就是future

async也允许你推迟执行,这意味着没有新线程,而是同步地执行。

  • async函数
    • 可接受std::launch policy为第一个参数,允许的值有:
      • std::launch::async 开启新线程,异步执行。
      • std::launch::deferred 没有新线程,懒惰同步执行。
      • launch::async|launch::deferred 默认值,由实现选择策略。
      • C++标准允许实现采用不同的默认策略,甚至允许新增策略。
    • async返回一个future
  • future<>
    • 表示一个值,这个值可能正在计算中,可能就绪,也可能是抛出的异常。
    • get()成员函数。 保证返回就绪的值或者抛出异常。(若未就绪,则当前线程阻塞)。此函数只能调用一次,然后将valid()设置为false。
    • valid()成员函数。 检查下一次get()的合法性,即有没有共享状态。
    • wait()成员函数。 保证值就绪或者抛出异常。(若未就绪,则阻塞)
    • wait_for, wait_until成员函数。有限等待。
    • share()成员函数。移动自己,构造一个shared_future<>
    • 可移动,不可复制。
  • shared_future<>
    • future<>类似,但可以get()多次。
    • 可移动也可以复制。
    • 正确的用法就是每个线程持有一份复制。
  • packaged_task<Callable>
    • 将future绑定到函数(或可调用对象),方便自定义调度。
    • 构造器仅接受函数(可以是lambda!),不绑定参数。只有在operator ()时才传入参数。
    • packaged_task本身也是一个可调用对象!可以用它起一个新线程。thread(move(task), args...)
    • 可移动,不可复制。
    • get_future() 获取绑定的future。
    • operator(args...) 立即在当前线程执行,然后future就绪,意味着唤醒等待此future的线程。
    • make_ready_at_thread_exit(args...) 立即在当前线程执行,但是future不就绪。当线程退出且所有thread_local对象销毁后,future才就绪。
    • reset() 重新构造自己。旧版的future作废。
  • promise<>
    • 比async提供更强的自由度,可以手动设置future!如此可以在设置future后继续清理工作。
    • 可移动,不可复制。
    • 一般先get_future()留下接口,然后movepromise到别的线程,在别的线程set_value
    • set_value(value) 设置关联的future的值,唤醒在等待的线程。
    • set_value_at_thread_exit(value) 类似set_value,但当前线程退出后,才唤醒在等待的线程。
    • void set_exception( std::exception_ptr p ) 令future的get()抛出异常。
      • 在try-catch块中,配合std::current_exception()使用
      • 不用try-catch,性能更高,配合make_exception_ptr(E e)用。
    • set_exception_at_thread_exit 同理。

内存模型和原子操作

设计无锁数据结构

  • 无锁数据结构应如何设计?
  • 无锁数据结构的内存管理有何难题?
  • 更多的建议?