Index: llvm/include/llvm/Support/TaskQueue.h
===================================================================
--- /dev/null
+++ llvm/include/llvm/Support/TaskQueue.h
@@ -0,0 +1,110 @@
+//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file defines a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_SUPPORT_TASK_QUEUE_H
+#define LLVM_SUPPORT_TASK_QUEUE_H
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/thread.h"
+
+#include <cassert>
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <utility>
+
+namespace llvm {
+/// TaskQueue executes work on exactly one thread in a first-in, first-out
+/// fashion. All work is guaranteed to execute in the order it is received.
+class TaskQueue {
+public:
+  using TaskFunctionTy = std::function<void()>;
+
+#if LLVM_ENABLE_THREADS
+  using TaskTy = std::packaged_task<void()>;
+#else
+  struct Task {
+    std::shared_future<void> ParentFuture;
+    std::shared_future<void> ThisFuture;
+    std::function<void()> ThisTask;
+  };
+  using TaskTy = Task;
+
+#endif
+
+  /// Construct a task queue with no work.
+  TaskQueue();
+
+  /// Blocking destructor: the queue will wait for all work to complete.
+  ~TaskQueue();
+
+  /// Asynchronous submission of a task to the queue. The returned future can
+  /// be used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Function, typename... Args>
+  inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
+    auto Task =
+        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
+    return asyncImpl(std::move(Task));
+  }
+
+  /// Asynchronous submission of a task to the queue. The returned future can
+  /// be used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Function>
+  inline std::shared_future<void> async(Function &&F) {
+    return asyncImpl(std::forward<Function>(F));
+  }
+
+  /// Blocking wait for all work to complete.
+  void wait();
+
+private:
+  void WorkThreadFunc();
+
+  /// Asynchronous submission of a task to the queue. The returned future can
+  /// be used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  std::shared_future<void> asyncImpl(TaskFunctionTy F);
+
+  /// Tasks waiting for execution in the queue.
+  std::queue<TaskTy> Tasks;
+
+#if LLVM_ENABLE_THREADS
+  /// Signal for the destruction of the queue, asking the work thread to exit.
+  /// This needs to come before WorkThread so that it is initialized first in
+  /// the constructor.
+  bool EnableFlag = true;
+
+  /// Flag indicating whether the queue is in the middle of processing a job.
+  bool Active = false;
+
+  /// Locking and signaling for accessing the Tasks queue.
+  std::mutex QueueLock;
+  std::condition_variable QueueCondition;
+
+  /// Locking and signaling for job completion.
+  std::mutex CompletionLock;
+  std::condition_variable CompletionCondition;
+
+  /// The main work thread.
+  llvm::thread WorkThread;
+#endif
+};
+} // namespace llvm
+
+#endif // LLVM_SUPPORT_TASK_QUEUE_H
Index: llvm/lib/Support/CMakeLists.txt
===================================================================
--- llvm/lib/Support/CMakeLists.txt
+++ llvm/lib/Support/CMakeLists.txt
@@ -116,6 +116,7 @@
   SystemUtils.cpp
   TarWriter.cpp
   TargetParser.cpp
+  TaskQueue.cpp
   ThreadPool.cpp
   Timer.cpp
   ToolOutputFile.cpp
Index: llvm/lib/Support/TaskQueue.cpp
===================================================================
--- /dev/null
+++ llvm/lib/Support/TaskQueue.cpp
@@ -0,0 +1,129 @@
+//==-- llvm/Support/TaskQueue.cpp - A TaskQueue implementation ---*- C++ -*-==//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file implements a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/Threading.h"
+#include "llvm/Support/raw_ostream.h"
+
+using namespace llvm;
+
+#if LLVM_ENABLE_THREADS
+
+TaskQueue::TaskQueue() : WorkThread([this] { WorkThreadFunc(); }) {}
+
+void TaskQueue::WorkThreadFunc() {
+  while (true) {
+    TaskTy TheTask;
+    {
+      std::unique_lock<std::mutex> LockGuard(QueueLock);
+      // Wait for tasks to be pushed into the queue.
+      QueueCondition.wait(LockGuard,
+                          [&] { return !EnableFlag || !Tasks.empty(); });
+      // Exit condition.
+      if (!EnableFlag && Tasks.empty())
+        return;
+
+      {
+        std::unique_lock<std::mutex> CompletionLockGuard(CompletionLock);
+        Active = true;
+      }
+      // We have a task; grab it and release the lock on the queue.
+      TheTask = std::move(Tasks.front());
+      Tasks.pop();
+    }
+
+    // Run the task we just grabbed.
+    TheTask();
+
+    {
+      std::unique_lock<std::mutex> CompletionLockGuard(CompletionLock);
+      Active = false;
+    }
+
+    // Notify task completion, in case someone waits on TaskQueue::wait().
+    CompletionCondition.notify_all();
+  }
+}
+
+void TaskQueue::wait() {
+  // Wait for the queue to be empty.
+  std::unique_lock<std::mutex> LockGuard(CompletionLock);
+
+  // The order of the checks for Active and Tasks.empty() matters because the
+  // work thread might be modifying the Tasks queue, and this would be a race.
+  CompletionCondition.wait(LockGuard,
+                           [&] { return !Active && Tasks.empty(); });
+}
+
+std::shared_future<void> TaskQueue::asyncImpl(TaskFunctionTy TheTask) {
+  // Wrap the task in a packaged_task to return a future object.
+  TaskTy PackagedTask(std::move(TheTask));
+  auto Future = PackagedTask.get_future();
+  {
+    // Lock the queue and push the new task.
+    std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+    // Don't allow enqueueing after disabling the queue.
+    assert(EnableFlag && "Queuing a task during TaskQueue destruction");
+
+    Tasks.push(std::move(PackagedTask));
+  }
+  QueueCondition.notify_one();
+  return Future.share();
+}
+
+// The destructor joins the work thread, waiting for completion.
+TaskQueue::~TaskQueue() {
+  {
+    std::unique_lock<std::mutex> LockGuard(QueueLock);
+    EnableFlag = false;
+  }
+  QueueCondition.notify_one();
+  WorkThread.join();
+}
+
+#else // LLVM_ENABLE_THREADS disabled
+
+TaskQueue::TaskQueue() {}
+
+void TaskQueue::wait() {
+  // Sequential implementation: run any tasks still in the queue.
+  while (!Tasks.empty()) {
+    auto Task = std::move(Tasks.front());
+    Tasks.pop();
+    Task.ThisTask();
+  }
+}
+
+std::shared_future<void> TaskQueue::asyncImpl(TaskFunctionTy TheTask) {
+  std::shared_future<void> ThisFuture;
+  std::shared_future<void> ParentFuture;
+  if (!Tasks.empty())
+    ParentFuture = Tasks.back().ThisFuture;
+
+  ThisFuture = std::async(std::launch::deferred, [this, ParentFuture] {
+    // Force the previous task in the chain to run first, then run the task
+    // now at the front of the queue.
+    if (ParentFuture.valid())
+      ParentFuture.wait();
+    auto Task = std::move(Tasks.front());
+    Tasks.pop();
+    Task.ThisTask();
+  });
+  Tasks.push({ParentFuture, ThisFuture, std::move(TheTask)});
+  return ThisFuture;
+}
+
+TaskQueue::~TaskQueue() { wait(); }
+
+#endif
Index: llvm/unittests/Support/CMakeLists.txt
===================================================================
--- llvm/unittests/Support/CMakeLists.txt
+++ llvm/unittests/Support/CMakeLists.txt
@@ -53,6 +53,7 @@
   SwapByteOrderTest.cpp
   TarWriterTest.cpp
   TargetParserTest.cpp
+  TaskQueueTest.cpp
   ThreadLocalTest.cpp
   ThreadPool.cpp
   Threading.cpp
Index: llvm/unittests/Support/TaskQueueTest.cpp
===================================================================
--- /dev/null
+++ llvm/unittests/Support/TaskQueueTest.cpp
@@ -0,0 +1,152 @@
+//========- unittests/Support/TaskQueueTest.cpp - TaskQueue.h tests ---========//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "gtest/gtest.h"
+
+using namespace llvm;
+
+class TaskQueueTest : public testing::Test {
+protected:
+  TaskQueueTest() {}
+
+  /// Make sure the work thread does not progress faster than the main thread.
+  void waitForMainThread() {
+    std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
+    WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
+  }
+
+  /// Set the readiness of the main thread.
+  void setMainThreadReady() {
+    {
+      std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
+      MainThreadReady = true;
+    }
+    WaitMainThread.notify_all();
+  }
+
+  void SetUp() override { MainThreadReady = false; }
+
+  std::condition_variable WaitMainThread;
+  std::mutex WaitMainThreadMutex;
+  bool MainThreadReady;
+};
+
+TEST_F(TaskQueueTest, OrderedFutures) {
+  TaskQueue TQ;
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+
+  std::shared_future<void> F1 = TQ.async([&] {
+    waitForMainThread();
+    X = 1;
+  });
+  std::shared_future<void> F2 = TQ.async([&] {
+    waitForMainThread();
+    Y = 1;
+  });
+  std::shared_future<void> F3 = TQ.async([&] {
+    waitForMainThread();
+    Z = 1;
+  });
+
+  setMainThreadReady();
+  F1.wait();
+  ASSERT_EQ(1, X);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Y);
+  ASSERT_EQ(0, Z);
+#endif
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  ASSERT_EQ(0, Z);
+#endif
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, UnOrderedFutures) {
+  TaskQueue TQ;
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+  std::shared_future<void> F1 = TQ.async([&] { X = 1; });
+  std::shared_future<void> F2 = TQ.async([&] { Y = 1; });
+  std::shared_future<void> F3 = TQ.async([&] { Z = 1; });
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Z);
+#endif
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, Wait) {
+  TaskQueue TQ;
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+  TQ.async([&] { X = 1; });
+  TQ.async([&] { Y = 1; });
+  TQ.async([&] { Z = 1; });
+
+  TQ.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, QueueWaitAfterFutureWait) {
+  TaskQueue TQ;
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+  std::atomic<int> A{0};
+  std::shared_future<void> F1 = TQ.async([&] { X = 1; });
+  std::shared_future<void> F2 = TQ.async([&] { Y = 1; });
+  std::shared_future<void> F3 = TQ.async([&] { Z = 1; });
+  std::shared_future<void> F4 = TQ.async([&] { A = 1; });
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Z);
+  ASSERT_EQ(0, A);
+#endif
+
+  TQ.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+  ASSERT_EQ(1, A);
+}
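
Usage note for reviewers (not part of the patch): a minimal sketch of the client-side pattern the new class targets, i.e. submitting work with async() and synchronizing via the returned std::shared_future<void> or via wait(). The main() function, the Sum variable, and the printed output are illustrative assumptions only, not code from this change.

#include "llvm/Support/TaskQueue.h"
#include "llvm/Support/raw_ostream.h"

#include <future>

int main() {
  llvm::TaskQueue TQ;
  int Sum = 0;

  // Tasks execute one at a time, in submission order, so a plain int is safe
  // here even when LLVM_ENABLE_THREADS is on.
  std::shared_future<void> F1 = TQ.async([&] { Sum += 1; });
  std::shared_future<void> F2 = TQ.async([&] { Sum += 2; });

  // Waiting on F2 also guarantees the first task has completed (FIFO order).
  F2.wait();
  llvm::outs() << "Sum = " << Sum << "\n"; // Prints "Sum = 3".

  // Drain any remaining work before the queue is destroyed.
  TQ.wait();
  return 0;
}

Waiting on the later future to cover earlier tasks is exactly the property the OrderedFutures and QueueWaitAfterFutureWait tests above are asserting.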