// Patch summary: touches llvm/include/llvm/Support/TaskQueue.h,
// llvm/lib/Support/TaskQueue.cpp and llvm/unittests/Support/TaskQueueTest.cpp.
// It removes the TaskTy alias in favour of a TaskBase interface, a TaskImpl
// wrapper and a QueueItemTy alias (execute() when LLVM_ENABLE_THREADS is set,
// wait() otherwise), removes asyncImpl() from the header and the .cpp file,
// puts the enqueueing logic directly into the templated async(Function &&F),
// which now names its result type through std::result_of (ResultTy), and adds
// the TasksWithReturnValues unit test.
// NOTE(review): this copy is flattened onto four lines and the text that stood
// between angle brackets (#include targets, template argument lists) is
// missing; restore it from the original review before trying to apply this.
// NOTE(review): the variadic async(Function &&F, Args &&... ArgList) overload
// still ends in `return asyncImpl(std::move(Task));` on an unchanged context
// line, while this patch deletes the asyncImpl declaration and definitions.
// Unless a hunk not shown here updates it, that overload no longer compiles
// once instantiated -- confirm, and forward to async(std::move(Task)) instead.
// NOTE(review): the threaded branch of async(Function &&F) constructs the
// packaged_task from std::move(F), whereas the variadic overload uses
// std::forward. Presumably Function is a deduced forwarding reference (the
// template header is truncated in this copy); if so an lvalue callable would
// be moved from. Consider std::forward -- confirm against the full source.
// NOTE(review): the two TaskImpl constructors are inconsistent: the threaded
// one is `explicit` and moves its argument, the non-threaded one is neither.
Index: llvm/include/llvm/Support/TaskQueue.h =================================================================== --- llvm/include/llvm/Support/TaskQueue.h +++ llvm/include/llvm/Support/TaskQueue.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -35,9 +36,31 @@ using TaskFunctionTy = std::function; #if LLVM_ENABLE_THREADS - using TaskTy = std::packaged_task; + struct TaskBase { + virtual ~TaskBase() {} + virtual void execute() = 0; + }; + template struct TaskImpl : public TaskBase { + explicit TaskImpl(std::packaged_task Task) + : PackagedTask(std::move(Task)) {} + + void execute() override { PackagedTask(); } + std::packaged_task PackagedTask; + }; + using QueueItemTy = std::unique_ptr; #else - using TaskTy = std::shared_future; + struct TaskBase { + virtual ~TaskBase() {} + virtual void wait() = 0; + }; + template struct TaskImpl : public TaskBase { + TaskImpl(std::shared_future Future) : Future(Future) {} + + void wait() override { Future.wait(); } + + std::shared_future Future; + }; + using QueueItemTy = std::shared_ptr; #endif /// Construct a task queue with no work. @@ -50,7 +73,8 @@ /// used to wait for the task (and all previous tasks that have not yet /// completed) to finish. template - inline std::shared_future async(Function &&F, Args &&... ArgList) { + inline std::shared_future::type> + async(Function &&F, Args &&... ArgList) { auto Task = std::bind(std::forward(F), std::forward(ArgList)...); return asyncImpl(std::move(Task)); @@ -60,8 +84,57 @@ /// used to wait for the task (and all previous tasks that have not yet /// completed) to finish. template - inline std::shared_future async(Function &&F) { - return asyncImpl(std::forward(F)); + inline std::shared_future::type> + async(Function &&F) { + using ResultTy = typename std::result_of::type; + +#if LLVM_ENABLE_THREADS + /// Wrap the Task in a packaged_task to return a future object. 
+ std::packaged_task PackagedTask(std::move(F)); + auto Future = PackagedTask.get_future(); + { + // Lock the queue and push the new task + std::unique_lock LockGuard(QueueLock); + + // Don't allow enqueueing after disabling the queue + assert(EnableFlag && "Queuing a thread during TaskQueue destruction"); + + Tasks.push( + llvm::make_unique>(std::move(PackagedTask))); + } + QueueCondition.notify_one(); + return Future.share(); +#else + std::shared_ptr ParentTask; + if (!Tasks.empty()) + ParentTask = Tasks.back(); + + // Create a wrapper for this task which first waits on the parent task (if + // there is one) and then executes this task. + auto WrappedTask = [this, ParentTask, F] { + if (ParentTask) + ParentTask->wait(); + + // If we're executing "this" task, then we must have necessarily already + // executed all tasks that come before. That means this task is the first + // task in the list, and we can pop it to effectively "drain" the queue. + // That way someone could queue up F1, F2, F3, wait out of order on F2, + // then call TaskQueue::wait() and the front of the queue would correctly + // contain the task for F3. + Tasks.pop(); + return F(); + }; + + // The only way to have a future which invokes the function when you call + // F.wait() is by calling std::async with launch::deferred. So we need this + // to support the case where the user waits on the returned future. + auto Future = + std::async(std::launch::deferred, std::move(WrappedTask)).share(); + + auto ThisTask = std::make_shared>(Future); + Tasks.push(ThisTask); + return Future; +#endif } /// Blocking wait for all work to complete. @@ -70,13 +143,8 @@ private: void WorkThreadFunc(); - /// Asynchronous submission of a task to the queue. The returned future can be - /// used to wait for the task (and all previous tasks that have not yet - /// completed) to finish. - std::shared_future asyncImpl(TaskFunctionTy F); - /// Tasks waiting for execution in the queue. 
- std::queue Tasks; + std::queue Tasks; #if LLVM_ENABLE_THREADS /// Signal for the destruction of the pool, asking thread to exit. Index: llvm/lib/Support/TaskQueue.cpp =================================================================== --- llvm/lib/Support/TaskQueue.cpp +++ llvm/lib/Support/TaskQueue.cpp @@ -25,7 +25,7 @@ void TaskQueue::WorkThreadFunc() { while (true) { - TaskTy TheTask; + std::unique_ptr TheTask; { std::unique_lock LockGuard(QueueLock); // Wait for tasks to be pushed in the queue @@ -44,8 +44,9 @@ Tasks.pop(); } - // Run the task we just grabbed and update the ids. - TheTask(); + // Run the task we just grabbed which should make all waiting futures become + // ready. + TheTask->execute(); { std::unique_lock LockGuard(CompletionLock); @@ -67,23 +68,6 @@ CompletionCondition.wait(LockGuard, [&] { return !Active && Tasks.empty(); }); } -std::shared_future TaskQueue::asyncImpl(TaskFunctionTy TheTask) { - /// Wrap the Task in a packaged_task to return a future object. - TaskTy PackagedTask(std::move(TheTask)); - auto Future = PackagedTask.get_future(); - { - // Lock the queue and push the new task - std::unique_lock LockGuard(QueueLock); - - // Don't allow enqueueing after disabling the pool - assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); - - Tasks.push(std::move(PackagedTask)); - } - QueueCondition.notify_one(); - return Future.share(); -} - // The destructor joins the work thread, waiting for completion. 
TaskQueue::~TaskQueue() { { @@ -101,26 +85,11 @@ void TaskQueue::wait() { // Sequential implementation running the tasks while (!Tasks.empty()) { - auto Task = Tasks.front(); - Task.wait(); - } -} - -std::shared_future TaskQueue::asyncImpl(TaskFunctionTy Task) { - std::shared_future ParentFuture; - if (!Tasks.empty()) - ParentFuture = Tasks.back(); + auto Task = std::move(Tasks.front()); - auto Future = std::async(std::launch::deferred, [this, ParentFuture, Task] { - if (ParentFuture.valid()) - ParentFuture.wait(); - - Task(); - Tasks.pop(); - }).share(); - - Tasks.push(Future); - return Future; + // This will pop from the queue as part of the internal wrapped task. + Task->wait(); + } } TaskQueue::~TaskQueue() { wait(); } Index: llvm/unittests/Support/TaskQueueTest.cpp =================================================================== --- llvm/unittests/Support/TaskQueueTest.cpp +++ llvm/unittests/Support/TaskQueueTest.cpp @@ -9,6 +9,8 @@ #include "llvm/Support/TaskQueue.h" +#include "llvm/Support/Error.h" + #include "gtest/gtest.h" using namespace llvm; @@ -180,3 +182,26 @@ ASSERT_EQ(1, Z); ASSERT_EQ(1, A); } + +TEST_F(TaskQueueTest, TasksWithReturnValues) { + TaskQueue TQ; + int I = 0; + std::string S; + char C = 0; + std::shared_future F1 = TQ.async([&]() { return I = 42; }); + std::shared_future F2 = TQ.async([&]() { return S = "Test"; }); + std::shared_future F3 = TQ.async([&]() { return C = '\n'; }); + + F3.wait(); + + // First check all the local variables, to make sure that the out-of-order call + // to wait() on the last future also ran all earlier tasks. + EXPECT_EQ('\n', C); + EXPECT_EQ("Test", S); + EXPECT_EQ(42, I); + + // Then check all the returned values. + EXPECT_EQ('\n', F3.get()); + EXPECT_EQ("Test", F2.get()); + EXPECT_EQ(42, F1.get()); +}