0%

C++并发编程(三):同步

1 condition_variable:同步

上面的互斥锁只是在共享数据处执行保护操作,但是数据的同步,即线程对数据的操作的先后次序并不确定,当我们还想对线程同步时,必须采取一定的同步操作。条件变量是达到这个目的方法。

C++标准库对条件变量有两套实现:

  • std::condition_variablestd::condition_variable_any 。这两个实现都包含在<condition_variable>头文件的声明中。两者都需要与一个互斥量一起才能工作(互斥量是 为了同步)
    • 前者仅限于与std::mutex一起工作,
    • 而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any的后缀,因此从体积、性能,以及系统资源的使用方面产生额外的开销.
    • 所以 std::condition_variable 一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑 std::condition_variable_any

在上面的例子中,10 个线程被同时唤醒,因此打印的时候是乱序的。值得注意的是 while(!ready),实际上,正常情况下,cv.wait 只会被调用一次,然后等待唤醒,因为线程在调用 wait() 之后就被阻塞了。但是通过一个 while 循环来判断全局标志位是否正确,这样可以防止被误唤醒,这也是条件变量中的常见写法。

1.1 接口概览

1.1.1 构造函数

std::condition_variable 的拷贝构造函数被禁用,只提供了默认构造函数。

1.1.2 wait操作

std::condition_variable 提供了两种 wait() 函数,一个是不带条件的,一个是可传入条件,通常为lambda表达式:

1
2
3
4
5
//无条件等待
void wait (unique_lock<mutex>& lck);
//有条件等待
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);
  • 当线程调用wait (unique_lock<mutex>& lck)时,当前线程会阻塞并释放当前获得的锁lck,以提醒其他线程可以获得这个自由锁了。因此对于wait (unique_lock<mutex>& lck)只要一调用就会阻塞,那么在外部必须给它价格条件判断,判断线程是否执行wait
  • wait (unique_lock<mutex>& lck, Predicate pred)是当pred返回false时线程会阻塞,即其自带了条件判断,我们只需传入即可。

  • 另外,当阻塞在wait的线程被唤醒时,会再次获得相应的锁。

注意wait()函数一定要搭配unique_lock类模板使用,而不是lock_guard。这是因为lock_guard在线程调用wait阻塞时,不会自动释放当前线程所获的的锁,这样就会导致死锁的发生。unique_lock`是一个灵活性的锁机制

mutexlock类似,std::condition_variable 也提供了相应的两种(带 Predicate 和不带Predicatewait_for() 函数,与 std::condition_variable::wait() 类似,不过 wait_for 可以指定一个时间段,在当前线程收到通知或者指定的时间超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_for 返回,剩下的处理步骤和 wait() 类似。还有 wait_util(),用法也类似

1.1.3 notify 操作
  • std::condition_variable::notify_one()

    唤醒某个等待(wait)线程。如果当前没有等待线程,则该函数什么也不做,如果同时存在多个等待线程,则唤醒某个线程是不确定的(unspecified)。

  • std::condition_variable::notify_all() 唤醒所有的等待(wait)线程。如果当前没有等待线程,则该函数什么也不做。

1.2 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2
data_cond.notify_one(); // 3
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut); // 4
data_cond.wait(
lk,[]{return !data_queue.empty();}); // 5
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}

1.3 condition_variable_any 介绍

std::condition_variable 类似,只不过 std::condition_variable_anywait 函数可以接受任何 lockable 参数,而 std::condition_variable 只能接受 std::unique_lock<std::mutex> 类型的参数,除此以外,和 std::condition_variable 几乎完全一样。

1.4 生成者消费者模型

一般来说,生产者消费者模型可以通过 queuemutexcondition_variable 来实现。下面是一个简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <iostream>
#include <mutex>
#include <queue>
#include <condition_cariable>
#include <chrono>
#include <thread>
#include <atomic>

int main()
{
std::queue<int> production;
std::mutex mtx;
std::condition_variable cv;
bool ready = false; // 是否有产品可供消费
bool done = false; // 生产结束

std::thread producer(
[&] () -> void {
for (int i = 1; i < 10; ++i)
{
// 模拟实际生产过程
std::this_thread ::sleep_for(std::chrono::milliseconds(10));
std::cout << "producing " << i << std::endl;

std::unique_lock<std::mutex> lock(mtx);
production.push(i);

// 有产品可以消费了
ready = true;
cv.notify_one();
}
// 生产结束了
done = true;
}
);

std::thread consumer(
[&] () -> void {
std::unique_lock<std::mutex> lock(mtx);
// 如果生成没有结束或者队列中还有产品没有消费,则继续消费,否则结束消费
while(!done || !production.empty())
{
// 防止误唤醒
while(!ready)
{
cv.wait(lock);
}

while(!production.empty())
{
// 模拟消费过程
std::cout << "consuming " << production.front() << std::endl;
production.pop();
}

// 没有产品了
ready = false;
}
}
);

producer.join();
consumer.join();

return 0;
}