通过实例化 std::mutex
创建互斥量实例,成员函数 lock()
可对互斥量上锁,unlock()
为解锁。不过,不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用 unlock()
(包括异常的情况)。C++ 标准库为互斥量提供了 RAII 模板类 std::lock_guard
,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。
#include <thread>
#include <mutex>
#include <iostream>
int g_i = 0;
std::mutex g_i_mutex; // protects g_i
void safe_increment()
{
const std::lock_guard<std::mutex> lock(g_i_mutex);
++g_i;
std::cout << "g_i: " << g_i << "; in thread #"
<< std::this_thread::get_id() << '\n';
// g_i_mutex is automatically released when lock
// goes out of scope
}
int main()
{
std::cout << "g_i: " << g_i << "; in main()\n";
std::thread t1(safe_increment);
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << "g_i: " << g_i << "; in main()\n";
}
Possible output:
g_i: 0; in main()
g_i: 1; in thread #140487981209344
g_i: 2; in thread #140487972816640
g_i: 2; in main()
C++17 中添加了一个新特性,称为模板类参数推导,类似 std::lock_guard
这样简单的模板类型,其模板参数
列表可以省略:
std::lock_guard lock(g_i_mutex);
使用互斥量来保护数据,并不是在每一个成员函数中加入一个 std::lock_guard
对象那么简单,一个指针或引
用,也会让这种保护形同虚设。不过,检查指针或引用很容易,只要没有成员函数通过返回值或者输出参数的形式,向其调用者返回指向受保护数据的指针或引用,数据就是安全的。确保成员函数不会传出指针或引用的同时,检查成员函数是否通过指针或引用的方式来调用也是很重要的(尤其是这个操作不在你的控制下时),函数可能没在互斥量保护的区域内存储指针或引用,这样就很危险。更危险的是:将保护数据作为一个运行时参数。
class some_data {
private:
int a;
std::string b;
public:
void do_something();
};
class data_wrapper{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func) {
std::lock_guard<std::mutex> l(m);
func(data); // 1 传递“保护”数据给用户函数
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data) {
unprotected = &protected_data;
}
data_wrapper x;
void foo() {
x.process_data(malicious_function); // 2 传递一个恶意函数
unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}
代码 1 处把需要受互斥量保护的变量 data 的引用作为参数传给了外部函数 malicious_function
,该函数保存了 data 的引用,最后代码 3 处访问 data 并没有受到互斥量的保护。
避免死锁的一般建议,就是让两个互斥量以相同的顺序上锁:总在互斥量 B 之前锁住互斥量 A,就永远不会死锁。
某些情况下是可以这样用,因为不同的互斥量用于不同的地方。不过,当有多个互斥量保护同一个类的独立实例时,一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免并发修改数据,并确保每个实例上的互斥量都能锁住自己要保护的区域。不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
很幸运,C++ 标准库有办法解决这个问题,std::lock
—— 可以一次性锁住多个(两个以上)的互斥量,并且没有死锁风险。
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X {
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs) {
if(&lhs == &rhs) return;
std::lock(lhs.m, rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // 3
swap(lhs.some_detail, rhs.some_detail);
}
};
C++ 17 提供了 std::scoped_lock
,上面的代码可以改成这样:
void swap(X& lhs, X& rhs) {
if(&lhs==&rhs) return;
std::scoped_lock guard(lhs.m, rhs.m); // 1
swap(lhs.some_detail, rhs.some_detail);
}
-
避免嵌套锁
-
避免在持有锁时调用外部代码
因为代码是外部提供的,所以没有办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)。
-
使用固定顺序获取锁
-
使用层次锁结构
这个建议需要对应用进行分层,并且识别在给定层上所有互斥量。当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。可以通过每个互斥量对应的层数,以及每个线程使用的互斥量,在运行时检查锁定操作是否可以 进行。
hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000); // 2
hierarchical_mutex other_mutex(6000); // 3
int do_low_level_stuff();
int low_level_func() {
std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 4
return do_low_level_stuff();
}
void high_level_stuff(int some_param);
void high_level_func() {
std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 6
high_level_stuff(low_level_func()); // 5
}
void thread_a() { // 7
high_level_func();
}
void do_other_stuff();
void other_stuff() {
high_level_func(); // 10
do_other_stuff();
}
void thread_b() { // 8
std::lock_guard<hierarchical_mutex> lk(other_mutex); // 9
other_stuff();
}
这段代码有三个 hierarchical_mutex
实例,其通过逐渐递减的层级进行构造。根据已经定义好的机制,如将一个 hierarchical_mutex
实例进行上锁,那么只能获取更低层级实例上的锁,这就会对代码进行一些限制。
- thread_a:
high_level_func()
调用low_level_func()
的同时,也持有high_level_mutex
上的锁,这没什么问题,因为high_level_mutex
(10000) 要比low_level_mutex
(5000) 更高级。 - thread_b:先锁住了
other_mutex
,这个互斥量的层级值只有 6000。这就意味着,中层级的数据已被保护。当other_stuff()
调用high_level_func()
时,就违反了层级结构,high_level_func()
试图获取high_level_mutex
,这个互斥量的层级值是 10000,要比当前层级值 6000 大很多。因此hierarchical_mutex
将会产生一个错误,可能会是抛出一个异常或直接终止程序。
层级互斥量不可能死锁,因为互斥量本身会严格遵循约定进行上锁,当多个互斥量在是在同一级上时,不能同时持有多个锁,所以“手递手”的方案需要每个互斥量在一条链上,并且每个互斥量都比前一个有更低的层级值,这在某些情况下无法实现。
一个简单的层级互斥量实现:
class hierarchical_mutex {
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value; // 1
void check_for_hierarchy_violation() {
if(this_thread_hierarchy_value <= hierarchy_value) { // 2
throw std::logic_error(“mutex hierarchy violated”);
}
}
void update_hierarchy_value() {
previous_hierarchy_value = this_thread_hierarchy_value; // 3
this_thread_hierarchy_value = hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value), previous_hierarchy_value(0) {}
void lock() {
check_for_hierarchy_violation();
internal_mutex.lock(); // 4
update_hierarchy_value(); // 5
}
void unlock() {
if(this_thread_hierarchy_value != hierarchy_value)
throw std::logic_error(“mutex hierarchy violated”); // 9
this_thread_hierarchy_value = previous_hierarchy_value; // 6
internal_mutex.unlock();
}
bool try_lock() {
check_for_hierarchy_violation();
if(!internal_mutex.try_lock()) // 7
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // 8
这里重点是使用了 thread_local 的值来代表当前线程的层级值(this_thread_hierarchy_value
),初始化为最大值,所以最初所有线程都能被锁住。因为声明中有 thread_local,所以每个线程都有其副本,这样线程中
变量状态完全独立,当从另一个线程进行读取时,变量的状态也完全独立。
std::unqiue_lock
使用起来更为自由,std::unique_lock
实例不会总与互斥量的数据类型相关,使用起来要比 std:lock_guard
更加灵活。首先,可将 std::adopt_lock
作为第二个参数传入构造函数,对互斥量进行管理。也可以将 std::defer_lock
作为第二个参数传递进去,表明互斥量应保持解锁状态。这样就可以让 std::unique_lock
对象(不是互斥量)的 lock()
所获取,或传递 std::unique_lock
对象到 std::lock()
中。
前面 std::lock 一节的代码可以改成这样:
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X {
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs) {
if(&lhs==&rhs) return;
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock); // 1
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock); // 1 std::defer_lock 留下未上锁的互斥量
std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
swap(lhs.some_detail,rhs.some_detail);
}
};
代码长度相同,几乎等价,唯一不同的就是 std::unique_lock
会占用比较多的空间,并且比 std::lock_guard
稍慢一些。保证灵活性要付出代价,这个代价就是允许 std::unique_lock
实例不带互斥量:信息已存储,且已更新。
互斥量是一种通用的机制,但其并非保护共享数据的唯一方式,有很多方式可以在特定情况下,对共享数据提供合适的保护。 一个特别极端的情况就是,共享数据在并发访问和初始化时(都需要保护),需要进行隐式同步。这可能是因为数据作为只读方式创建,所以没有同步问题,或者因为必要的保护作为对数据操作的一部分。任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程,并且会给性能带来不必要的影响。
出于以上的原因,C++ 标准提供了一种纯粹保护共享数据初始化过程的机制。
延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是 否被初始化,然后在其使用前决定,数据是否需要初始化:
std::shared_ptr<some_resource> resource_ptr;
void foo() {
if(!resource_ptr) {
resource_ptr.reset(new some_resource); // 1
}
resource_ptr->do_something();
}
转为多线程代码时,只有 1 处需要保护,这样共享数据对于并发访问就是安全的。但是下面天真的转换会使得线程资源产生不必要的序列化,为了确定数据源已经初始化,每个线程必须等待互斥量。
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo() {
std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化
if(!resource_ptr) {
resource_ptr.reset(new some_resource); // 只有初始化过程需要保护
}
lk.unlock();
resource_ptr->do_something();
}
很多人能想出更好的一些的办法来做这件事,包括声名狼藉的“双重检查锁模式“
void undefined_behaviour_with_double_checked_locking() {
if(!resource_ptr) { // 1
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr) { // 2
resource_ptr.reset(new some_resource); // 3
}
}
resource_ptr->do_something(); // 4
}
这个模式为什么声名狼藉呢?因为有潜在的条件竞争。未被锁保护的读取操作 1 没有与其他线程里被锁保护的写入操作 3 进行同步,因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的 some_resource 实例,然后 do_something() 后,得到不正确的结果。这个例子是在一种典型的条件竞争 —— 数据竞争,C++ 标准中指定为“未定义行为”。
所以 C++ 标准库提供了 std::once_flag
和 std::call_once
来处理这种情况。
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag; // 1
void init_resource() {
resource_ptr.reset(new some_resource);
}
void foo() {
std::call_once(resource_flag, init_resource); // 可以完整的进行一次初始化
resource_ptr->do_something();
}
使用 std::call_once
作为类成员的延迟初始化(线程安全):
class X {
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection() {
connection = connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_) {}
void send_data(data_packet const& data) { // 1
std::call_once(connection_init_flag, &X::open_connection,this); // 2
connection.send_data(data);
}
data_packet receive_data() { // 3
std::call_once(connection_init_flag, &X::open_connection,this); // 2
return connection.receive_data();
}
};
还有一种初始化过程中潜存着条件竞争:其中一个局部变量为 static 类型,这种变量的在声明后就已经完成初化。==对于多线程调用的函数,这就意味着这里有条件竞争——抢着去定义这个变量。很多在不支持 C++11 标准的编译器上,在实践过程中,这样的条件竞争是确实存在的。因为在多线程中,每个线程都认为他们是第一个初始化这个变量线程,或一个线程对变量进行初始化,而另外一个线程要使用这个变量时,初始化过程还没完成==。
==在 C++11 标准中,这些问题都被解决了:初始化及定义完全在一个线程中发生,并且没有其他线程可在初始化完成前对其进行处理,条件竞争终止于初始化阶段,这样比在之后再去处理好的多。==在只需要一个全局实例情况下,这里提供一个 std::call_once
的替代方案:
class my_class;
my_class& get_my_class_instance() {
static my_class instance; // 线程安全的初始化过程
return instance;
}
多线程可以安全的调用 get_my_class_instance()
函数,不用为数据竞争而担心。
C++17 标准库提供了两种非常好的互斥量 —— std::shared_mutex
和 std::shared_timed_mutex
。唯一的限制:当有线程拥有共享锁时,尝试获取独占锁的线程会被阻塞,直到所有其他线程放弃锁。当任一线程拥有一个独占锁时,其他线程就无法获得共享锁或独占锁,直到第一个线程放弃其拥有的锁。
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>
class dns_entry;
class dns_cache {
std::map<std::string, dns_entry> entries;
mutable std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const {
std::shared_lock<std::shared_mutex> lk(entry_mutex); // 1
std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
return (it == entries.end())?dns_entry():it->second;
}
void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) {
std::lock_guard<std::shared_mutex> lk(entry_mutex); // 2
entries[domain] = dns_details;
}
};