异步并发
include <future> 使用 std::future .get() 异步返回结果
std::async1 2 3
| std::future<int> fut = std::async(std::launch::async, function);
std::cout << fut.get();
|
std::packaged_task 封装函数1 2 3 4
| std::packaged_task<int(int)> task(function); std::future<int> fut = task.get_future(); std::thread(std::move(task), arg); std::cout << fut.get();
|
std::promise1 2 3 4
| std::promise<int> prom; std::future<int> fut = prom.get_future(); std::thread(function, std::move(prom)).join(); std::cout << fut.get();
|
Note
std::promise 和 std::packaged_task 一样都要用 std::future = .get_future()
线程池异步获取任务结果,若传入 void 函数,则 .get() 无返回值,仅等待执行完毕。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| #include <iostream> #include <thread> #include <condition_variable> #include <queue> #include <vector> #include <functional> #include <atomic> #include <mutex> #include <future>
class ThreadPool { private: std::mutex mtx; std::condition_variable condition; std::queue<std::function<void()>> tasks; std::atomic<bool> stop; std::vector<std::thread> threads; ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool(int num_threads);
public: static ThreadPool& SetThreadPool(int num_threads) { static ThreadPool threadpool(num_threads); return threadpool; }
~ThreadPool() { { std::lock_guard<std::mutex> lock(mtx); stop = true; } condition.notify_all(); for (auto& t : threads) { t.join(); } } template<typename F, typename... Args> auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(std::forward<Args>(args)...))> { auto task_ptr = std::make_shared<std::packaged_task<decltype(f(std::forward<Args>(args)...))()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<decltype(f(std::forward<Args>(args)...))> future = task_ptr->get_future(); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); { std::lock_guard<std::mutex> lock(mtx); tasks.emplace([task_ptr]() {(*task_ptr)(); }); } condition.notify_one(); return future; } };
ThreadPool::ThreadPool(int num_threads) : stop{ false } { for (int i = 0; i < num_threads; i++) { threads.emplace_back([this]() { while (true) { std::unique_lock<std::mutex> lock(mtx); condition.wait(lock, [this]() { return !tasks.empty() || stop; }); if (stop && tasks.empty()) return; std::function<void()> task(std::move(tasks.front())); tasks.pop(); lock.unlock(); task(); } }); } }
int main() { ThreadPool& threadpool = ThreadPool::SetThreadPool(std::thread::hardware_concurrency()); std::vector<std::future<int>> futures; for (int i = 0; i < 10; ++i) { futures.emplace_back(threadpool.enqueue([i] { return i * i; })); } std::cout << "你好" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(2000)); for (auto& fut : futures) { std::cout << fut.get() << '\n'; }
return 0; }
|
Tip
C++17 后使用 std::invoke_result_t<F, Args...> 代替 decltype(f(std::forward<Args>(args)...)) 。可以先 decay(移除引用、顶层 cv),如果 f 返回 int& 会推导为 int , std::packaged_task<int()> 可编译,而 decltype 推导为 int& ,std::packaged_task<int&()> 无法编译。
1 2 3 4 5 6 7 8 9 10 11
| template<typename F, typename... Args> auto enqueue(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> { using typeName = std::invoke_result_t<F, Args...>; auto task_ptr = std::make_shared<std::packaged_task<typeName()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<typeName> future = task_ptr->get_future(); if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); { std::lock_guard<std::mutex> lock(mtx); tasks.emplace([task_ptr]() {(*task_ptr)(); }); } condition.notify_one(); return future; }
|