Index: llvm/include/llvm/Support/TaskQueue.h
===================================================================
--- /dev/null
+++ llvm/include/llvm/Support/TaskQueue.h
@@ -0,0 +1,178 @@
+//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file defines a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_SUPPORT_TASK_QUEUE_H
+#define LLVM_SUPPORT_TASK_QUEUE_H
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/ThreadPool.h"
+#include "llvm/Support/thread.h"
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <type_traits>
+#include <utility>
+
+namespace llvm {
+/// TaskQueue executes serialized work on a user-defined Thread Pool. It
+/// guarantees that if task B is enqueued after task A, task B begins after
+/// task A completes and there is no overlap between the two.
+#if LLVM_ENABLE_THREADS
+class TaskQueue {
+  // Because we don't have init capture to use move-only local variables that
+  // are captured into a lambda, we create the promise inside an explicit
+  // callable struct. We want to do as much of the wrapping in the
+  // type-specialized domain (before type erasure) and then erase this into a
+  // std::function.
+  template <typename Callable> struct Task {
+    explicit Task(Callable C, TaskQueue &Parent)
+        : C(std::move(C)), P(std::make_shared<std::promise<void>>()),
+          Parent(&Parent) {}
+
+    /// Run the callable, fulfil the promise, then let the parent queue
+    /// schedule whatever is waiting next.
+    void operator()() noexcept {
+      C();
+      P->set_value();
+
+      Parent->completeTask();
+    }
+
+    Callable C;
+    std::shared_ptr<std::promise<void>> P;
+    TaskQueue *Parent;
+  };
+
+public:
+  /// Construct a task queue with no work.
+  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
+
+  /// Blocking destructor: the queue will wait for all work to complete.
+  ~TaskQueue() {
+    Scheduler.wait();
+    assert(Tasks.empty());
+  }
+
+  /// Asynchronous submission of a task to the queue. The returned future can be
+  /// used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Callable> std::future<void> async(Callable &&C) {
+    // Store a decayed copy so that lvalue callables are copied rather than
+    // moved from (or rejected), while rvalues are still moved.
+    Task<typename std::decay<Callable>::type> T{std::forward<Callable>(C),
+                                                *this};
+
+    std::future<void> F = T.P->get_future();
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      // If there's already a task in flight, just queue this one up. If
+      // there is not a task in flight, bypass the queue and schedule this
+      // task immediately.
+      if (IsTaskInFlight)
+        Tasks.push_back(std::move(T));
+      else {
+        Scheduler.async(std::move(T));
+        IsTaskInFlight = true;
+      }
+    }
+    return F;
+  }
+
+private:
+  void completeTask() {
+    // We just completed a task. If there are no more tasks in the queue,
+    // update IsTaskInFlight to false and stop doing work. Otherwise
+    // schedule the next task (while not holding the lock).
+    std::function<void()> Continuation;
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      if (Tasks.empty()) {
+        IsTaskInFlight = false;
+        return;
+      }
+
+      Continuation = std::move(Tasks.front());
+      Tasks.pop_front();
+    }
+    Scheduler.async(std::move(Continuation));
+  }
+
+  /// The thread pool on which to run the work.
+  ThreadPool &Scheduler;
+
+  /// State which indicates whether the queue is currently processing any
+  /// work.
+  bool IsTaskInFlight = false;
+
+  /// Mutex for synchronizing access to the Tasks array.
+  std::mutex QueueLock;
+
+  /// Tasks waiting for execution in the queue.
+  std::deque<std::function<void()>> Tasks;
+};
+#else
+class TaskQueue {
+public:
+  /// Construct a task queue with no work.
+  TaskQueue(ThreadPool &Scheduler) { (void)Scheduler; }
+
+  /// Blocking destructor: the queue will wait for all work to complete.
+  ~TaskQueue() {
+    for (auto &F : Tasks) {
+      F();
+    }
+  }
+
+  /// Asynchronous submission of a task to the queue. The returned future can be
+  /// used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Callable> std::future<void> async(Callable &&C) {
+    Tasks.push_back(std::forward<Callable>(C));
+
+    uint64_t Id = NextTaskId++;
+
+    // Waiting on the returned future drains the queue up to and including
+    // this task. If a later future was waited on first, this task has already
+    // run and there is nothing left to do.
+    auto WrappedTask = [this, Id] {
+      while (NextRunId <= Id) {
+        assert(!Tasks.empty());
+        Tasks.front()();
+        Tasks.pop_front();
+        ++NextRunId;
+      }
+    };
+    return std::async(std::launch::deferred, std::move(WrappedTask));
+  }
+
+private:
+  /// The id of the next task that we'll add to the queue.
+  uint64_t NextTaskId = 0;
+
+  /// The id of the next task that needs to be run.
+  uint64_t NextRunId = 0;
+
+  /// Tasks waiting for execution in the queue.
+  std::deque<std::function<void()>> Tasks;
+};
+#endif
+} // namespace llvm
+
+#endif // LLVM_SUPPORT_TASK_QUEUE_H
Index: llvm/unittests/Support/CMakeLists.txt
===================================================================
--- llvm/unittests/Support/CMakeLists.txt
+++ llvm/unittests/Support/CMakeLists.txt
@@ -53,6 +53,7 @@
   SwapByteOrderTest.cpp
   TarWriterTest.cpp
   TargetParserTest.cpp
+  TaskQueueTest.cpp
   ThreadLocalTest.cpp
   ThreadPool.cpp
   Threading.cpp
Index: llvm/unittests/Support/TaskQueueTest.cpp
===================================================================
--- /dev/null
+++ llvm/unittests/Support/TaskQueueTest.cpp
@@ -0,0 +1,185 @@
+//===- unittests/Support/TaskQueueTest.cpp - TaskQueue.h tests ------------===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "gtest/gtest.h"
+
+using namespace llvm;
+
+class TaskQueueTest : public testing::Test {
+protected:
+  TaskQueueTest() {}
+
+  /// Make sure this thread does not progress faster than the main thread.
+  void waitForMainThread() {
+    std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
+    WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
+  }
+
+  /// Set the readiness of the main thread.
+  void setMainThreadReady() {
+    {
+      std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
+      MainThreadReady = true;
+    }
+    WaitMainThread.notify_all();
+  }
+
+  void SetUp() override { MainThreadReady = false; }
+
+  std::condition_variable WaitMainThread;
+  std::mutex WaitMainThreadMutex;
+  bool MainThreadReady;
+};
+
+TEST_F(TaskQueueTest, OrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+
+  std::future<void> F1 = TQ.async([&] {
+    waitForMainThread();
+    ++X;
+  });
+  std::future<void> F2 = TQ.async([&] {
+    waitForMainThread();
+    ++Y;
+  });
+  std::future<void> F3 = TQ.async([&] {
+    waitForMainThread();
+    ++Z;
+  });
+
+  setMainThreadReady();
+  F1.wait();
+  ASSERT_EQ(1, X);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Y);
+  ASSERT_EQ(0, Z);
+#endif
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  ASSERT_EQ(0, Z);
+#endif
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, UnOrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+  std::future<void> F1 = TQ.async([&] { ++X; });
+  std::future<void> F2 = TQ.async([&] { ++Y; });
+  std::future<void> F3 = TQ.async([&] { ++Z; });
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Z);
+#endif
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+//
+// TEST_F(TaskQueueTest, Wait) {
+//  // Test that calling wait() on a TaskQueue processes all outstanding work.
+//  TaskQueue TQ;
+//  std::atomic<int> X = 0;
+//  std::atomic<int> Y = 0;
+//  std::atomic<int> Z = 0;
+//  TQ.async([&] { X = 1; });
+//  TQ.async([&] { Y = 1; });
+//  TQ.async([&] { Z = 1; });
+//
+//  TQ.wait();
+//  ASSERT_EQ(1, X);
+//  ASSERT_EQ(1, Y);
+//  ASSERT_EQ(1, Z);
+//}
+//
+// TEST_F(TaskQueueTest, WaitAgain) {
+//  // Test that calling wait() on a TaskQueue and queueing up more work
+//  processes
+//  // the new work as it should.
+//  TaskQueue TQ;
+//  std::atomic<int> X = 0;
+//  std::atomic<int> Y = 0;
+//  std::atomic<int> Z = 0;
+//  TQ.async([&] { ++X; });
+//  TQ.async([&] { ++Y; });
+//  TQ.async([&] { ++Z; });
+//
+//  TQ.wait();
+//  ASSERT_EQ(1, X);
+//  ASSERT_EQ(1, Y);
+//  ASSERT_EQ(1, Z);
+//
+//  TQ.async([&] { ++X; });
+//  TQ.async([&] { ++Y; });
+//  TQ.async([&] { ++Z; });
+//
+//  TQ.wait();
+//  ASSERT_EQ(2, X);
+//  ASSERT_EQ(2, Y);
+//  ASSERT_EQ(2, Z);
+//}
+//
+// TEST_F(TaskQueueTest, QueueWaitAfterFutureWait) {
+//  // Test that waiting on the result of some work and then waiting on the
+//  // entire queue processes each job exactly once, and in the correct
+//  // order.
+//  TaskQueue TQ;
+//  std::atomic<int> X = 0;
+//  std::atomic<int> Y = 0;
+//  std::atomic<int> Z = 0;
+//  std::atomic<int> A = 0;
+//  std::shared_future<void> F1 = TQ.async([&] { ++X; });
+//  std::shared_future<void> F2 = TQ.async([&] { ++Y; });
+//  std::shared_future<void> F3 = TQ.async([&] { ++Z; });
+//  std::shared_future<void> F4 = TQ.async([&] { ++A; });
+//
+//  F2.wait();
+//  ASSERT_EQ(1, X);
+//  ASSERT_EQ(1, Y);
+//#if !LLVM_ENABLE_THREADS
+//  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+//  // proceeds independently of the main thread, so this may or may not still
+//  // be true.
+//  ASSERT_EQ(0, Z);
+//  ASSERT_EQ(0, A);
+//#endif
+//
+//  TQ.wait();
+//  ASSERT_EQ(1, X);
+//  ASSERT_EQ(1, Y);
+//  ASSERT_EQ(1, Z);
+//  ASSERT_EQ(1, A);
+//}