Index: llvm/trunk/include/llvm/Support/TaskQueue.h
===================================================================
--- llvm/trunk/include/llvm/Support/TaskQueue.h
+++ llvm/trunk/include/llvm/Support/TaskQueue.h
@@ -0,0 +1,137 @@
+//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file defines a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_SUPPORT_TASK_QUEUE_H
+#define LLVM_SUPPORT_TASK_QUEUE_H
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/ThreadPool.h"
+#include "llvm/Support/thread.h"
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <utility>
+
+namespace llvm {
+/// TaskQueue executes serialized work on a user-defined Thread Pool.  It
+/// guarantees that if task B is enqueued after task A, task B begins after
+/// task A completes and there is no overlap between the two.
+class TaskQueue {
+  // Because we don't have init capture to use move-only local variables that
+  // are captured into a lambda, we create the promise inside an explicit
+  // callable struct. We want to do as much of the wrapping in the
+  // type-specialized domain (before type erasure) and then erase this into a
+  // std::function.
+  template <typename Callable> struct Task {
+    using ResultTy = typename std::result_of<Callable()>::type;
+    explicit Task(Callable C, TaskQueue &Parent)
+        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
+          Parent(&Parent) {}
+
+    // Tag dispatch on the result type: an explicit specialization inside the
+    // class would be ill-formed, so overload on a (possibly void) pointer.
+    template <typename T> void invokeCallbackAndSetPromise(T *) {
+      P->set_value(C());
+    }
+
+    void invokeCallbackAndSetPromise(void *) {
+      C();
+      P->set_value();
+    }
+
+    void operator()() noexcept {
+      ResultTy *Dummy = nullptr;
+      invokeCallbackAndSetPromise(Dummy);
+      Parent->completeTask();
+    }
+
+    Callable C;
+    std::shared_ptr<std::promise<ResultTy>> P;
+    TaskQueue *Parent;
+  };
+
+public:
+  /// Construct a task queue with no work.
+  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
+
+  /// Blocking destructor: the queue will wait for all work to complete.
+  ~TaskQueue() {
+    Scheduler.wait();
+    assert(Tasks.empty());
+  }
+
+  /// Asynchronous submission of a task to the queue. The returned future can
+  /// be used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Callable>
+  std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
+#if !LLVM_ENABLE_THREADS
+    static_assert(false,
+                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
+#endif
+    Task<Callable> T{std::move(C), *this};
+    using ResultTy = typename std::result_of<Callable()>::type;
+    std::future<ResultTy> F = T.P->get_future();
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      // If there's already a task in flight, just queue this one up.  If
+      // there is not a task in flight, bypass the queue and schedule this
+      // task immediately.
+      if (IsTaskInFlight)
+        Tasks.push_back(std::move(T));
+      else {
+        Scheduler.async(std::move(T));
+        IsTaskInFlight = true;
+      }
+    }
+    return std::move(F);
+  }
+
+private:
+  void completeTask() {
+    // We just completed a task.  If there are no more tasks in the queue,
+    // update IsTaskInFlight to false and stop doing work.  Otherwise
+    // schedule the next task (while not holding the lock).
+    std::function<void()> Continuation;
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      if (Tasks.empty()) {
+        IsTaskInFlight = false;
+        return;
+      }
+
+      Continuation = std::move(Tasks.front());
+      Tasks.pop_front();
+    }
+    Scheduler.async(std::move(Continuation));
+  }
+
+  /// The thread pool on which to run the work.
+  ThreadPool &Scheduler;
+
+  /// State which indicates whether the queue is currently processing any
+  /// work.
+  bool IsTaskInFlight = false;
+
+  /// Mutex for synchronizing access to the Tasks array.
+  std::mutex QueueLock;
+
+  /// Tasks waiting for execution in the queue.
+  std::deque<std::function<void()>> Tasks;
+};
+} // namespace llvm
+
+#endif // LLVM_SUPPORT_TASK_QUEUE_H
Index: llvm/trunk/unittests/Support/CMakeLists.txt
===================================================================
--- llvm/trunk/unittests/Support/CMakeLists.txt
+++ llvm/trunk/unittests/Support/CMakeLists.txt
@@ -53,6 +53,7 @@
   SwapByteOrderTest.cpp
   TarWriterTest.cpp
   TargetParserTest.cpp
+  TaskQueueTest.cpp
   ThreadLocalTest.cpp
   ThreadPool.cpp
   Threading.cpp
Index: llvm/trunk/unittests/Support/TaskQueueTest.cpp
===================================================================
--- llvm/trunk/unittests/Support/TaskQueueTest.cpp
+++ llvm/trunk/unittests/Support/TaskQueueTest.cpp
@@ -0,0 +1,105 @@
+//========- unittests/Support/TaskQueue.cpp - TaskQueue.h tests ------========//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "gtest/gtest.h"
+
+using namespace llvm;
+
+#if LLVM_ENABLE_THREADS
+class TaskQueueTest : public testing::Test {
+protected:
+  TaskQueueTest() {}
+};
+
+TEST_F(TaskQueueTest, OrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+
+  std::mutex M1, M2, M3;
+  std::unique_lock<std::mutex> L1(M1);
+  std::unique_lock<std::mutex> L2(M2);
+  std::unique_lock<std::mutex> L3(M3);
+
+  std::future<void> F1 = TQ.async([&] {
+    std::unique_lock<std::mutex> Lock(M1);
+    ++X;
+  });
+  std::future<void> F2 = TQ.async([&] {
+    std::unique_lock<std::mutex> Lock(M2);
+    ++Y;
+  });
+  std::future<void> F3 = TQ.async([&] {
+    std::unique_lock<std::mutex> Lock(M3);
+    ++Z;
+  });
+
+  L1.unlock();
+  F1.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(0, Y);
+  ASSERT_EQ(0, Z);
+
+  L2.unlock();
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(0, Z);
+
+  L3.unlock();
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, UnOrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+  std::mutex M;
+
+  std::unique_lock<std::mutex> Lock(M);
+
+  std::future<void> F1 = TQ.async([&] { ++X; });
+  std::future<void> F2 = TQ.async([&] { ++Y; });
+  std::future<void> F3 = TQ.async([&M, &Z] {
+    std::unique_lock<std::mutex> Lock(M);
+    ++Z;
+  });
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(0, Z);
+
+  Lock.unlock();
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, FutureWithReturnValue) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::future<std::string> F1 = TQ.async([&] { return std::string("Hello"); });
+  std::future<int> F2 = TQ.async([&] { return 42; });
+
+  ASSERT_EQ(42, F2.get());
+  ASSERT_EQ("Hello", F1.get());
+}
+#endif
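
For reference, a minimal usage sketch of the API this patch adds (not part of the patch itself). It assumes LLVM_ENABLE_THREADS and the ThreadPool(unsigned) constructor used in the tests above; the file name, queue names, and values are illustrative only. Two TaskQueues share one ThreadPool: tasks within a queue start strictly in submission order, while tasks from different queues may interleave on the pool's workers.

// TaskQueueExample.cpp - hypothetical standalone example, not part of this patch.
#include "llvm/Support/TaskQueue.h"
#include "llvm/Support/ThreadPool.h"

#include <cstdio>
#include <future>

using namespace llvm;

int main() {
  ThreadPool Pool(2);      // shared scheduler with two worker threads
  TaskQueue Logs(Pool);    // tasks in this queue run in submission order
  TaskQueue Uploads(Pool); // independent queue; may interleave with Logs

  // Each async() call returns a std::future for that task's result.
  std::future<void> Wrote = Logs.async([] { std::puts("write log entry"); });
  std::future<int> Status = Uploads.async([] { return 200; });
  // This task starts only after "write log entry" has completed.
  Logs.async([] { std::puts("flush log"); });

  Wrote.wait();                                      // wait for a specific task
  std::printf("upload status: %d\n", Status.get());  // block on a result
  return 0; // each ~TaskQueue blocks until its queue has drained
}

Because TaskQueue only keeps one task in flight at a time, a single ThreadPool can back many queues without any queue monopolizing the workers; the futures give callers per-task completion and results without exposing the serialization machinery.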