Index: llvm/include/llvm/Support/TaskQueue.h =================================================================== --- /dev/null +++ llvm/include/llvm/Support/TaskQueue.h @@ -0,0 +1,156 @@ +//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// +// This file defines a crude C++11 based task queue. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_SUPPORT_TASK_QUEUE_H +#define LLVM_SUPPORT_TASK_QUEUE_H + +#include "llvm/Config/llvm-config.h" +#include "llvm/Support/ThreadPool.h" +#include "llvm/Support/thread.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace llvm { +/// TaskQueue executes serialized work on a user-defined Thread Pool. It +/// guarantees that if task B is enqueued after task A, task B begins after +/// task A completes and there is no overlap between the two. +class TaskQueue { +#if LLVM_ENABLE_THREADS + template friend class Task; + // Because we don't have init capture to use move-only local variables that + // are captured into a lambda, we create the promise inside an explicit + // callable struct. We want to do as much of the wrapping in the + // type-specialized domain (before type erasure) and then erase this into a + // std::function. + template struct Task { + explicit Task(Callable C, TaskQueue &Parent) + : C(std::move(C)), P(std::make_shared>()), + Parent(&Parent) {} + + void operator()() noexcept { + C(); + P->set_value(); + + // We just completed a task. If there are no more tasks in the queue, + // update IsTaskInFlight to false and stop doing work. Otherwise + // schedule the next task (while not holding the lock). + std::function Continuation; + { + std::lock_guard Lock(Parent->QueueLock); + if (Parent->Tasks.empty()) { + Parent->IsTaskInFlight = false; + return; + } + + Continuation = std::move(Parent->Tasks.front()); + Parent->Tasks.pop_front(); + } + Parent->Scheduler.async(std::move(Continuation)); + } + + Callable C; + std::shared_ptr> P; + TaskQueue *Parent; + }; +#endif +public: + /// Construct a task queue with no work. + TaskQueue(ThreadPool &Scheduler) +#if LLVM_ENABLE_THREADS + : Scheduler(Scheduler) +#endif + { + (void)Scheduler; + } + + /// Blocking destructor: the queue will wait for all work to complete. + ~TaskQueue() { +#if LLVM_ENABLE_THREADS + Scheduler.wait(); +#else + for (auto &F : Tasks) + F(); +#endif + assert(Tasks.empty()); + } + + /// Asynchronous submission of a task to the queue. The returned future can be + /// used to wait for the task (and all previous tasks that have not yet + /// completed) to finish. + template std::future async(Callable &&C) { +#if LLVM_ENABLE_THREADS + Task T{std::move(C), *this}; + + std::future F = T.P->get_future(); + { + std::lock_guard Lock(QueueLock); + // If there's already a task in flight, just queue this one up. If + // there is not a task in flight, bypass the queue and schedule this + // task immediately. + if (IsTaskInFlight) + Tasks.push_back(std::move(T)); + else { + Scheduler.async(std::move(T)); + IsTaskInFlight = true; + } + } + return std::move(F); +#else + + // There is only one thread in the entire application. We take advantage + // of deque pointer stability to get equality comparison for queued tasks. + // Specifically, since std::function is not equality comparable, we check + // for equality by using the address of the std::function inside of the + // deque, which is guaranteed to be stable across push_back and pop_front. + Tasks.push_back(std::move(C)); + auto *ElementPtr = &Tasks.back(); + + auto WrappedTask = [this, ElementPtr] { + auto *NextPtr = &Tasks.front(); + do { + NextPtr = &Tasks.front(); + Tasks.front()(); + Tasks.pop_front(); + } while (NextPtr != ElementPtr); + }; + return std::async(std::launch::deferred, std::move(WrappedTask)); +#endif + } + +private: +#if LLVM_ENABLE_THREADS + /// The thread pool on which to run the work. + ThreadPool &Scheduler; + + /// State which indicates whether the queue currently is currently processing + /// any work. + bool IsTaskInFlight = false; + + /// Mutex for synchronizing access to the Tasks array. + std::mutex QueueLock; +#endif + + /// Tasks waiting for execution in the queue. + std::deque> Tasks; +}; +} // namespace llvm + +#endif // LLVM_SUPPORT_TASK_QUEUE_H Index: llvm/unittests/Support/CMakeLists.txt =================================================================== --- llvm/unittests/Support/CMakeLists.txt +++ llvm/unittests/Support/CMakeLists.txt @@ -53,6 +53,7 @@ SwapByteOrderTest.cpp TarWriterTest.cpp TargetParserTest.cpp + TaskQueueTest.cpp ThreadLocalTest.cpp ThreadPool.cpp Threading.cpp Index: llvm/unittests/Support/TaskQueueTest.cpp =================================================================== --- /dev/null +++ llvm/unittests/Support/TaskQueueTest.cpp @@ -0,0 +1,185 @@ +//========- unittests/Support/TaskQueue.cpp - TaskQueue.h tests ------========// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/TaskQueue.h" + +#include "gtest/gtest.h" + +using namespace llvm; + +class TaskQueueTest : public testing::Test { +protected: + TaskQueueTest() {} + + /// Make sure this thread not progress faster than the main thread. + void waitForMainThread() { + std::unique_lock LockGuard(WaitMainThreadMutex); + WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; }); + } + + /// Set the readiness of the main thread. + void setMainThreadReady() { + { + std::unique_lock LockGuard(WaitMainThreadMutex); + MainThreadReady = true; + } + WaitMainThread.notify_all(); + } + + void SetUp() override { MainThreadReady = false; } + + std::condition_variable WaitMainThread; + std::mutex WaitMainThreadMutex; + bool MainThreadReady; +}; + +TEST_F(TaskQueueTest, OrderedFutures) { + ThreadPool TP(1); + TaskQueue TQ(TP); + std::atomic X = 0; + std::atomic Y = 0; + std::atomic Z = 0; + + std::future F1 = TQ.async([&] { + waitForMainThread(); + ++X; + }); + std::future F2 = TQ.async([&] { + waitForMainThread(); + ++Y; + }); + std::future F3 = TQ.async([&] { + waitForMainThread(); + ++Z; + }); + + setMainThreadReady(); + F1.wait(); + ASSERT_EQ(1, X); +#if !LLVM_ENABLE_THREADS + // We can't check this in the LLVM_ENABLE_THREADS case since the work thread + // proceeds independently of the main thread, so this may or may not still + // be true. + ASSERT_EQ(0, Y); + ASSERT_EQ(0, Z); +#endif + + F2.wait(); + ASSERT_EQ(1, X); + ASSERT_EQ(1, Y); +#if !LLVM_ENABLE_THREADS + ASSERT_EQ(0, Z); +#endif + + F3.wait(); + ASSERT_EQ(1, X); + ASSERT_EQ(1, Y); + ASSERT_EQ(1, Z); +} + +TEST_F(TaskQueueTest, UnOrderedFutures) { + ThreadPool TP(1); + TaskQueue TQ(TP); + std::atomic X = 0; + std::atomic Y = 0; + std::atomic Z = 0; + std::future F1 = TQ.async([&] { ++X; }); + std::future F2 = TQ.async([&] { ++Y; }); + std::future F3 = TQ.async([&] { ++Z; }); + + F2.wait(); + ASSERT_EQ(1, X); + ASSERT_EQ(1, Y); +#if !LLVM_ENABLE_THREADS + // We can't check this in the LLVM_ENABLE_THREADS case since the work thread + // proceeds independently of the main thread, so this may or may not still + // be true. + ASSERT_EQ(0, Z); +#endif + + F3.wait(); + ASSERT_EQ(1, X); + ASSERT_EQ(1, Y); + ASSERT_EQ(1, Z); +} +// +// TEST_F(TaskQueueTest, Wait) { +// // Test that calling wait() on a TaskQueue processes all outstanding work. +// TaskQueue TQ; +// std::atomic X = 0; +// std::atomic Y = 0; +// std::atomic Z = 0; +// TQ.async([&] { X = 1; }); +// TQ.async([&] { Y = 1; }); +// TQ.async([&] { Z = 1; }); +// +// TQ.wait(); +// ASSERT_EQ(1, X); +// ASSERT_EQ(1, Y); +// ASSERT_EQ(1, Z); +//} +// +// TEST_F(TaskQueueTest, WaitAgain) { +// // Test that calling wait() on a TaskQueue and queueing up more work +// processes +// // the new work as it should. +// TaskQueue TQ; +// std::atomic X = 0; +// std::atomic Y = 0; +// std::atomic Z = 0; +// TQ.async([&] { ++X; }); +// TQ.async([&] { ++Y; }); +// TQ.async([&] { ++Z; }); +// +// TQ.wait(); +// ASSERT_EQ(1, X); +// ASSERT_EQ(1, Y); +// ASSERT_EQ(1, Z); +// +// TQ.async([&] { ++X; }); +// TQ.async([&] { ++Y; }); +// TQ.async([&] { ++Z; }); +// +// TQ.wait(); +// ASSERT_EQ(2, X); +// ASSERT_EQ(2, Y); +// ASSERT_EQ(2, Z); +//} +// +// TEST_F(TaskQueueTest, QueueWaitAfterFutureWait) { +// // Test that waiting on the result of some work and then waiting on the +// // entire queue processes each job exactly once, and in the correct +// // order. +// TaskQueue TQ; +// std::atomic X = 0; +// std::atomic Y = 0; +// std::atomic Z = 0; +// std::atomic A = 0; +// std::shared_future F1 = TQ.async([&] { ++X; }); +// std::shared_future F2 = TQ.async([&] { ++Y; }); +// std::shared_future F3 = TQ.async([&] { ++Z; }); +// std::shared_future F4 = TQ.async([&] { ++A; }); +// +// F2.wait(); +// ASSERT_EQ(1, X); +// ASSERT_EQ(1, Y); +//#if !LLVM_ENABLE_THREADS +// // We can't check this in the LLVM_ENABLE_THREADS case since the work thread +// // proceeds independently of the main thread, so this may or may not still +// // be true. +// ASSERT_EQ(0, Z); +// ASSERT_EQ(0, A); +//#endif +// +// TQ.wait(); +// ASSERT_EQ(1, X); +// ASSERT_EQ(1, Y); +// ASSERT_EQ(1, Z); +// ASSERT_EQ(1, A); +//}