异步并发

#include <future>,使用 std::future 的 .get() 异步获取返回结果

  1. std::async
    1
    2
    3
    std::future<int> fut = std::async(std::launch::async, function);
    // …干别的事…
    std::cout << fut.get();
  2. std::packaged_task 封装函数
    1
    2
    3
    4
    std::packaged_task<int(int)> task(function);
    std::future<int> fut = task.get_future();
    std::thread(std::move(task), arg).join();
    std::cout << fut.get();
  3. std::promise
    1
    2
    3
    4
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread(function, std::move(prom)).join();
    std::cout << fut.get();

Note

std::promise 与 std::packaged_task 一样,都要先通过 .get_future() 取得对应的 std::future。

线程池异步获取任务结果,若传入 void 函数,则 .get() 无返回值,仅等待执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>

/**
 * Fixed-size thread pool exposed as a lazily constructed singleton.
 *
 * Worker threads pull type-erased `void()` jobs from a FIFO queue. Callers
 * submit work through enqueue() and receive a std::future for the result.
 * The destructor sets the stop flag, wakes every worker and joins them; the
 * workers only exit once the queue is empty, so already-queued jobs still run.
 */
class ThreadPool {
private:
    std::mutex mtx;                              // guards `tasks`; `stop` is also written/checked under it
    std::condition_variable condition;           // signalled when a job is queued or shutdown starts
    std::queue<std::function<void()>> tasks;     // pending jobs, oldest first
    std::atomic<bool> stop;                      // set once by the destructor
    std::vector<std::thread> threads;            // worker threads, joined in the destructor

    // Singleton: copying is disabled and the constructor is private, so the
    // only way to obtain an instance is SetThreadPool().
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    explicit ThreadPool(int num_threads);

public:
    /**
     * Returns the process-wide pool, constructing it on the first call.
     *
     * @param num_threads Worker count requested for the pool. Only the value
     *                    passed to the first call is used, because the
     *                    function-local static is initialised exactly once.
     *                    Values <= 0 are treated as 1 by the constructor.
     */
    static ThreadPool& SetThreadPool(int num_threads) {
        static ThreadPool threadpool(num_threads);  // function-local static: created on first use
        return threadpool;
    }

    /// Requests shutdown, wakes all workers and waits for them to finish.
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            stop = true;
        }
        condition.notify_all();
        for (auto& t : threads) {
            t.join();
        }
    }

    /**
     * Queues `f(args...)` for execution on a worker thread.
     *
     * @param f    Callable to run.
     * @param args Arguments bound to `f` (stored via std::bind).
     * @return     A future for the callable's result; for a `void` callable,
     *             `.get()` only waits for completion.
     * @throws std::runtime_error if the pool has already been asked to stop.
     */
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(std::forward<Args>(args)...))> {
        using result_type = decltype(f(std::forward<Args>(args)...));

        // std::packaged_task is move-only while std::function needs a copyable
        // callable, so the task lives behind a shared_ptr captured by the job.
        auto task_ptr = std::make_shared<std::packaged_task<result_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<result_type> future = task_ptr->get_future();
        {
            std::lock_guard<std::mutex> lock(mtx);
            // Checked while holding the lock so a job cannot slip into the
            // queue after the destructor has set `stop`.
            if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task_ptr]() { (*task_ptr)(); });  // wrap as a plain void() job
        }
        condition.notify_one();
        return future;
    }
};

/**
 * Starts the worker threads.
 *
 * @param num_threads Requested number of workers. Values <= 0 would leave the
 *                    pool without any worker (queued jobs would never run and
 *                    their futures would block forever), so at least one
 *                    worker is always started.
 */
ThreadPool::ThreadPool(int num_threads) : stop{ false } {
    const int worker_count = num_threads > 0 ? num_threads : 1;
    threads.reserve(static_cast<std::size_t>(worker_count));
    for (int i = 0; i < worker_count; ++i) {
        threads.emplace_back([this]() {
            while (true) {
                std::unique_lock<std::mutex> lock(mtx);
                // Sleep until there is a job to run or shutdown was requested.
                condition.wait(lock, [this]() { return !tasks.empty() || stop; });
                // Exit only when stopping AND the queue has been drained.
                if (stop && tasks.empty()) return;
                std::function<void()> task(std::move(tasks.front()));
                tasks.pop();
                // Run the job without holding the lock so other workers can proceed.
                lock.unlock();
                task();
            }
        });
    }
}

/// Demo: submits ten squaring jobs to the pool, prints a greeting, waits two
/// seconds, then prints each result in submission order.
int main() {
    // hardware_concurrency() may return 0 when the value cannot be determined;
    // fall back to one worker so the queued jobs are guaranteed to run.
    const unsigned int hw_threads = std::thread::hardware_concurrency();
    ThreadPool& threadpool =
        ThreadPool::SetThreadPool(hw_threads > 0 ? static_cast<int>(hw_threads) : 1);

    std::vector<std::future<int>> futures;
    futures.reserve(10);
    for (int i = 0; i < 10; ++i) {
        futures.emplace_back(threadpool.enqueue([i] { return i * i; }));
    }

    // std::endl flushes, so the greeting is visible before the sleep below.
    std::cout << "你好" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));

    // get() blocks until the corresponding job has finished.
    for (auto& fut : futures) {
        std::cout << fut.get() << '\n';
    }

    return 0;
}

Tip

C++17 后可以使用 std::invoke_result_t<F, Args...> 代替 decltype(f(std::forward<Args>(args)...))。注意 std::invoke_result_t 本身不会 decay:如果 f 返回 int&,它与 decltype 一样推导为 int&,对应 std::packaged_task<int&()> 和 std::future<int&>。如果希望按值保存结果,可以再套一层 std::decay_t(移除引用、顶层 cv),此时推导为 int,对应 std::packaged_task<int()>。

1
2
3
4
5
6
7
8
9
10
11
// C++17 form of the pool's enqueue member template: the result type is spelled
// with std::invoke_result_t instead of decltype. It uses the members `mtx`,
// `stop`, `tasks` and `condition` of the enclosing class (not shown here).
//
// Queues `f(args...)` and returns a future for its result.
// Throws std::runtime_error if `stop` is already set.
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> {
    using typeName = std::invoke_result_t<F, Args...>;

    // std::packaged_task is move-only while std::function needs a copyable
    // callable, so the task is held through a shared_ptr captured by the job.
    auto task_ptr = std::make_shared<std::packaged_task<typeName()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<typeName> future = task_ptr->get_future();
    {
        std::lock_guard<std::mutex> lock(mtx);
        // Checked while holding the lock so a job cannot be queued after
        // `stop` has been set under the same mutex.
        if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace([task_ptr]() { (*task_ptr)(); });  // wrap as a plain void() job
    }
    condition.notify_one();
    return future;
}