diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -36,9 +36,6 @@
 /// for some work to become available.
 class ThreadPool {
 public:
-  using TaskTy = std::function<void()>;
-  using PackagedTaskTy = std::packaged_task<void()>;
-
   /// Construct a pool using the hardware strategy \p S for mapping hardware
   /// execution resources (threads, cores, CPUs)
   /// Defaults to using the maximum execution resources in the system, but
@@ -51,17 +48,17 @@
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
+  inline auto async(Function &&F, Args &&...ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
-    return asyncImpl(std::move(Task));
+    return async(std::move(Task));
   }
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  template <typename Function>
-  inline std::shared_future<void> async(Function &&F) {
-    return asyncImpl(std::forward<Function>(F));
+  template <typename Func>
+  auto async(Func &&F) -> std::shared_future<decltype(F())> {
+    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
   }
 
   /// Blocking wait for all the threads to complete and the queue to be empty.
@@ -74,17 +71,70 @@
   bool isWorkerThread() const;
 
 private:
+  /// Helpers to create a promise and a callable wrapper of \p Task that sets
+  /// the result of the promise. Returns the callable and a future to access the
+  /// result.
+  template <typename ResTy>
+  static std::pair<std::function<void()>, std::future<ResTy>>
+  createTaskAndFuture(std::function<ResTy()> Task) {
+    std::shared_ptr<std::promise<ResTy>> Promise =
+        std::make_shared<std::promise<ResTy>>();
+    auto F = Promise->get_future();
+    return {
+        [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
+        std::move(F)};
+  }
+  static std::pair<std::function<void()>, std::future<void>>
+  createTaskAndFuture(std::function<void()> Task) {
+    std::shared_ptr<std::promise<void>> Promise =
+        std::make_shared<std::promise<void>>();
+    auto F = Promise->get_future();
+    return {[Promise = std::move(Promise), Task]() {
+              Task();
+              Promise->set_value();
+            },
+            std::move(F)};
+  }
+
   bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  std::shared_future<void> asyncImpl(TaskTy F);
+  template <typename ResTy>
+  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
+
+#if LLVM_ENABLE_THREADS
+    /// Wrap the Task in a std::function<void()> that sets the result of the
+    /// corresponding future.
+    auto R = createTaskAndFuture(Task);
+
+    {
+      // Lock the queue and push the new task
+      std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+      // Don't allow enqueueing after disabling the pool
+      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+      Tasks.push(std::move(R.first));
+    }
+    QueueCondition.notify_one();
+    return R.second.share();
+
+#else // LLVM_ENABLE_THREADS Disabled
+
+    // Get a Future with launch::deferred execution using std::async
+    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
+    // Wrap the future so that both ThreadPool::wait() can operate and the
+    // returned future can be sync'ed on.
+    Tasks.push([Future]() { Future.get(); });
+    return Future;
+#endif
+  }
 
   /// Threads in flight
   std::vector<std::thread> Threads;
 
   /// Tasks waiting for execution in the pool.
-  std::queue<PackagedTaskTy> Tasks;
+  std::queue<std::function<void()>> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -29,7 +29,7 @@
     Threads.emplace_back([S, ThreadID, this] {
       S.apply_thread_strategy(ThreadID);
       while (true) {
-        PackagedTaskTy Task;
+        std::function<void()> Task;
         {
           std::unique_lock<std::mutex> LockGuard(QueueLock);
           // Wait for tasks to be pushed in the queue
@@ -80,23 +80,6 @@
   return false;
 }
 
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
-  /// Wrap the Task in a packaged_task to return a future object.
-  PackagedTaskTy PackagedTask(std::move(Task));
-  auto Future = PackagedTask.get_future();
-  {
-    // Lock the queue and push the new task
-    std::unique_lock<std::mutex> LockGuard(QueueLock);
-
-    // Don't allow enqueueing after disabling the pool
-    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-
-    Tasks.push(std::move(PackagedTask));
-  }
-  QueueCondition.notify_one();
-  return Future.share();
-}
-
 // The destructor joins all threads, waiting for completion.
 ThreadPool::~ThreadPool() {
   {
@@ -128,16 +111,6 @@
   }
 }
 
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
-  // Get a Future with launch::deferred execution using std::async
-  auto Future = std::async(std::launch::deferred, std::move(Task)).share();
-  // Wrap the future so that both ThreadPool::wait() can operate and the
-  // returned future can be sync'ed on.
-  PackagedTaskTy PackagedTask([Future]() { Future.get(); });
-  Tasks.push(std::move(PackagedTask));
-  return Future;
-}
-
 ThreadPool::~ThreadPool() { wait(); }
 
 #endif
diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -151,6 +151,31 @@
   ASSERT_EQ(2, i.load());
 }
 
+TEST_F(ThreadPoolTest, GetFutureWithResult) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool(hardware_concurrency(2));
+  auto F1 = Pool.async([] { return 1; });
+  auto F2 = Pool.async([] { return 2; });
+
+  setMainThreadReady();
+  Pool.wait();
+  ASSERT_EQ(1, F1.get());
+  ASSERT_EQ(2, F2.get());
+}
+
+TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool(hardware_concurrency(2));
+  auto Fn = [](int x) { return x; };
+  auto F1 = Pool.async(Fn, 1);
+  auto F2 = Pool.async(Fn, 2);
+
+  setMainThreadReady();
+  Pool.wait();
+  ASSERT_EQ(1, F1.get());
+  ASSERT_EQ(2, F2.get());
+}
+
 TEST_F(ThreadPoolTest, PoolDestruction) {
   CHECK_UNSUPPORTED();
   // Test that we are waiting on destruction
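
A minimal usage sketch (not part of the patch, mirroring the new unit tests above): with this change, ThreadPool::async deduces the callable's result type, so callers receive a typed std::shared_future rather than std::shared_future<void>. The free function name `example` below is purely illustrative.

    #include "llvm/Support/ThreadPool.h"
    #include "llvm/Support/Threading.h"
    #include <future>

    int example() {
      llvm::ThreadPool Pool(llvm::hardware_concurrency(2));
      // The result type is deduced from the lambda: std::shared_future<int>.
      std::shared_future<int> Sum = Pool.async([] { return 40 + 2; });
      // A void callable still yields std::shared_future<void>, as before.
      std::shared_future<void> Done = Pool.async([] {});
      Pool.wait();
      Done.get();
      return Sum.get(); // 42
    }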