Note

condition_variable.wait(lock, predicate)

  1. 调用前 lock(std::unique_lock<std::mutex>)必须已经加锁。
  2. 检查 predicate 的返回值是否为 true。
  3. 是则往下执行;否则先解锁并等待,直到被 notify_one/all() 唤醒(或发生虚假唤醒)后重新加锁并再次检查 predicate,如此循环直至其返回 true。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Shared state for the producer/consumer example.
std::mutex e_mutex;             // guards e_queue
std::condition_variable e_cv;   // consumers wait on this for new items or completion
std::queue<int> e_queue;        // items handed from the producer to the consumers
std::atomic<bool> done{false};  // becomes true once the producer has finished; read in the wait predicate

/// Produces the integers 0..9, one every 30 ms, then signals completion.
///
/// Each value is pushed onto e_queue while holding e_mutex and announced with
/// e_cv.notify_one(). After the last value, `done` is set to true and all
/// waiting consumers are woken with e_cv.notify_all().
void Producer() {
    for (int i = 0; i < 10; i++) {
        {
            std::unique_lock<std::mutex> lock(e_mutex);
            e_queue.push(i);
            std::cout << "Producer: produced " << i << std::endl;
        }
        // Notify after the lock is released so the woken consumer can acquire it at once.
        e_cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(30));
    }
    {
        // `done` is part of the consumers' wait predicate, so it has to be
        // changed while holding e_mutex even though it is atomic. Without the
        // lock, a consumer that has just evaluated the predicate as false but
        // has not yet blocked would miss the notify_all() below and wait forever.
        std::unique_lock<std::mutex> lock(e_mutex);
        done.store(true);
        std::cout << "Works are done" << std::endl;
    }
    e_cv.notify_all();
}

/// Consumer loop: takes values from e_queue until the queue is empty and
/// `done` is set.
///
/// @param id  Number printed in this consumer's log lines.
void Consumer(int id) {
    for (;;) {
        std::unique_lock<std::mutex> guard(e_mutex);

        // Sleep until there is something to consume or the producer has finished.
        e_cv.wait(guard, [] { return done.load() || !e_queue.empty(); });

        if (e_queue.empty()) {
            // Nothing left to take: leave only once the producer has finished.
            if (done.load()) {
                break;
            }
            continue;
        }

        const int item = e_queue.front();
        e_queue.pop();
        std::cout << "Consumer" << id << ": consumed " << item << std::endl;
    }
}

int main() {
std::thread producer_thread(Producer);
std::thread consumer_thread1(Consumer, 1);
std::thread consumer_thread2(Consumer, 2);
producer_thread.join();
consumer_thread1.join();
consumer_thread2.join();
return 0;
}

Note

std::atomic<T> 用来在多线程环境中安全地读写共享变量,保证对该变量的每个操作都是原子的,不需要加锁。其中 .store() 与 = 语义一致:= 已重载为调用 store(),同样是原子操作,但显式使用 .store() / .load() 意图更清晰。注意:如果该变量出现在 condition_variable 的等待条件中,修改它时仍应持有对应的互斥锁,否则等待线程可能错过唤醒通知。

Note

原子性:一个操作要么全部完成,要么完全不做,中间不能被打断,其他线程也看不到不完整的中间状态。原子性是线程安全的必要不充分条件。

代码优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <vector>

// Shared state for the producer/consumer example.
std::mutex e_mutex;             // guards e_queue
std::condition_variable e_cv;   // consumers wait on this for new items or completion
std::queue<int> e_queue;        // items handed from the producer to the consumers
std::atomic<bool> done{false};  // becomes true once the producer has finished; read in the wait predicate

/// Produces the integers 0..count-1, one every 20 ms, then signals completion.
///
/// Each value is pushed onto e_queue while holding e_mutex and announced with
/// e_cv.notify_one(). After the last value, `done` is set to true and all
/// waiting consumers are woken with e_cv.notify_all().
///
/// @param count  Number of values to produce; nothing is produced if it is <= 0.
void Producer(int count) {
    for (int i = 0; i < count; i++) {
        {
            std::unique_lock<std::mutex> lock(e_mutex);
            e_queue.push(i);
            std::cout << "Producer: produced " << i << std::endl;
        }
        // Notify after the lock is released so the woken consumer can acquire it at once.
        e_cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    {
        // `done` is part of the consumers' wait predicate, so it has to be
        // changed while holding e_mutex even though it is atomic. Without the
        // lock, a consumer that has just evaluated the predicate as false but
        // has not yet blocked would miss the notify_all() below and wait forever.
        std::unique_lock<std::mutex> lock(e_mutex);
        done.store(true);
    }
    e_cv.notify_all();
}

/// Consumer loop: takes values from e_queue until the queue is empty and
/// `done` is set, then prints an exit message.
///
/// @param id  Number printed in this consumer's log lines.
void Consumer(int id) {
    for (;;) {
        std::unique_lock<std::mutex> guard(e_mutex);

        // Sleep until there is something to consume or the producer has finished.
        e_cv.wait(guard, [] { return done.load() || !e_queue.empty(); });

        if (e_queue.empty()) {
            // Nothing left to take: leave only once the producer has finished.
            if (done.load()) {
                break;
            }
            continue;
        }

        const int item = e_queue.front();
        e_queue.pop();
        std::cout << "Consumer " << id << ": consumed " << item << std::endl;
    }
    // The lock has already been released here: `guard` is scoped to the loop body.
    std::cout << "Consumer " << id << " exiting.\n";
}

int main() {
const int PRODUCE_COUNT = 20;
const int CONSUMER_COUNT = 3;

std::thread producer_thread(Producer, PRODUCE_COUNT);

std::vector<std::thread> consumers;
for (int i = 0; i < CONSUMER_COUNT; ++i) {
consumers.emplace_back(Consumer, i);
}

producer_thread.join();
for (auto& t : consumers) t.join();

return 0;
}