diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h --- a/llvm/include/llvm/Support/ThreadPool.h +++ b/llvm/include/llvm/Support/ThreadPool.h @@ -36,7 +36,6 @@ /// for some work to become available. class ThreadPool { public: - using TaskTy = std::function; using PackagedTaskTy = std::packaged_task; /// Construct a pool using the hardware strategy \p S for mapping hardware @@ -51,17 +50,17 @@ /// Asynchronous submission of a task to the pool. The returned future can be /// used to wait for the task to finish and is *non-blocking* on destruction. template - inline std::shared_future async(Function &&F, Args &&... ArgList) { + inline auto async(Function &&F, Args &&...ArgList) { auto Task = std::bind(std::forward(F), std::forward(ArgList)...); - return asyncImpl(std::move(Task)); + return async(std::move(Task)); } /// Asynchronous submission of a task to the pool. The returned future can be /// used to wait for the task to finish and is *non-blocking* on destruction. - template - inline std::shared_future async(Function &&F) { - return asyncImpl(std::forward(F)); + template + auto async(Func &&F) -> std::shared_future { + return asyncImpl(std::function(std::forward(F))); } /// Blocking wait for all the threads to complete and the queue to be empty. @@ -78,7 +77,35 @@ /// Asynchronous submission of a task to the pool. The returned future can be /// used to wait for the task to finish and is *non-blocking* on destruction. - std::shared_future asyncImpl(TaskTy F); + template + std::shared_future asyncImpl(std::function Task) { + +#if LLVM_ENABLE_THREADS + /// Wrap the Task in a packaged_task to return a future object. + std::packaged_task PackagedTask(std::move(Task)); + auto Future = PackagedTask.get_future(); + { + // Lock the queue and push the new task + std::unique_lock LockGuard(QueueLock); + + // Don't allow enqueueing after disabling the pool + assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); + + Tasks.push(PackagedTaskTy(std::move(PackagedTask))); + } + QueueCondition.notify_one(); + +#else // LLVM_ENABLE_THREADS Disabled + + // Get a Future with launch::deferred execution using std::async + auto Future = std::async(std::launch::deferred, std::move(Task)).share(); + // Wrap the future so that both ThreadPool::wait() can operate and the + // returned future can be sync'ed on. + PackagedTaskTy PackagedTask([Future]() { Future.get(); }); + Tasks.push(PackagedTaskTy(std::move(PackagedTask))); +#endif + return Future; + } /// Threads in flight std::vector Threads; diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp --- a/llvm/lib/Support/ThreadPool.cpp +++ b/llvm/lib/Support/ThreadPool.cpp @@ -80,23 +80,6 @@ return false; } -std::shared_future ThreadPool::asyncImpl(TaskTy Task) { - /// Wrap the Task in a packaged_task to return a future object. - PackagedTaskTy PackagedTask(std::move(Task)); - auto Future = PackagedTask.get_future(); - { - // Lock the queue and push the new task - std::unique_lock LockGuard(QueueLock); - - // Don't allow enqueueing after disabling the pool - assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); - - Tasks.push(std::move(PackagedTask)); - } - QueueCondition.notify_one(); - return Future.share(); -} - // The destructor joins all threads, waiting for completion. ThreadPool::~ThreadPool() { { @@ -128,15 +111,7 @@ } } -std::shared_future ThreadPool::asyncImpl(TaskTy Task) { - // Get a Future with launch::deferred execution using std::async - auto Future = std::async(std::launch::deferred, std::move(Task)).share(); - // Wrap the future so that both ThreadPool::wait() can operate and the - // returned future can be sync'ed on. - PackagedTaskTy PackagedTask([Future]() { Future.get(); }); - Tasks.push(std::move(PackagedTask)); - return Future; -} +std::shared_future ThreadPool::asyncImpl(TaskTy Task) {} ThreadPool::~ThreadPool() { wait(); } diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp --- a/llvm/unittests/Support/ThreadPool.cpp +++ b/llvm/unittests/Support/ThreadPool.cpp @@ -151,6 +151,31 @@ ASSERT_EQ(2, i.load()); } +TEST_F(ThreadPoolTest, GetFutureWithResult) { + CHECK_UNSUPPORTED(); + ThreadPool Pool(hardware_concurrency(2)); + auto F1 = Pool.async([] { return 1; }); + auto F2 = Pool.async([] { return 2; }); + + setMainThreadReady(); + Pool.wait(); + ASSERT_EQ(1, F1.get()); + ASSERT_EQ(2, F2.get()); +} + +TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) { + CHECK_UNSUPPORTED(); + ThreadPool Pool(hardware_concurrency(2)); + auto Fn = [](int x) { return x; }; + auto F1 = Pool.async(Fn, 1); + auto F2 = Pool.async(Fn, 2); + + setMainThreadReady(); + Pool.wait(); + ASSERT_EQ(1, F1.get()); + ASSERT_EQ(2, F2.get()); +} + TEST_F(ThreadPoolTest, PoolDestruction) { CHECK_UNSUPPORTED(); // Test that we are waiting on destruction