Index: llvm/include/llvm/Support/TaskQueue.h
===================================================================
--- /dev/null
+++ llvm/include/llvm/Support/TaskQueue.h
@@ -0,0 +1,104 @@
+//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file defines a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_SUPPORT_TASK_QUEUE_H
+#define LLVM_SUPPORT_TASK_QUEUE_H
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/ThreadPool.h"
+#include "llvm/Support/thread.h"
+
+#include <atomic>
+#include <cassert>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <tuple>
+#include <utility>
+
+namespace llvm {
+/// TaskQueue executes work on exactly one thread in a first-in, first-out
+/// fashion. All work is guaranteed to execute in the order it is received.
+class TaskQueue {
+public:
+  /// Construct a task queue with no work.
+  TaskQueue(ThreadPool &Pool);
+
+  /// Blocking destructor: the queue will wait for all work to complete.
+  ~TaskQueue();
+
+  /// Asynchronous submission of a task to the queue. The returned future can
+  /// be used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Callable> std::future<void> async(Callable &&C) {
+    std::future<void> Future;
+    std::function<void()> Func;
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      std::tie(Func, Future) = CreateTask(std::forward<Callable>(C));
+
+      // If there's already a task in flight, just queue this one up; it will
+      // be scheduled as a continuation when the running task completes.
+      if (IsTaskInFlight) {
+        Tasks.push(std::move(Func));
+        return Future;
+      }
+      IsTaskInFlight = true;
+    }
+    Scheduler.async(std::move(Func));
+    return Future;
+  }
+
+private:
+  template <typename Callable>
+  std::pair<std::function<void()>, std::future<void>>
+  CreateTask(Callable &&C) {
+    auto P = std::make_shared<std::promise<void>>();
+
+    // Because we don't have init captures to move move-only local variables
+    // into a lambda, we create the promise as a shared_ptr and capture it by
+    // copy. We want to do as much of the wrapping in the type-specialized
+    // domain (before type erasure) and then erase this into a std::function.
+    std::function<void()> WrappedTask = [this, C, P] {
+      C();
+      P->set_value();
+
+      std::function<void()> Continuation;
+      {
+        std::lock_guard<std::mutex> Lock(QueueLock);
+        if (Tasks.empty()) {
+          IsTaskInFlight = false;
+          return;
+        }
+
+        Continuation = std::move(Tasks.front());
+        Tasks.pop();
+      }
+      Scheduler.async(std::move(Continuation));
+    };
+
+    return std::make_pair(std::move(WrappedTask), P->get_future());
+  }
+
+  /// The thread pool on which to run the work.
+  ThreadPool &Scheduler;
+
+  /// Tasks waiting for execution in the queue.
+  std::queue<std::function<void()>> Tasks;
+
+  /// True while a task from this queue is running on the thread pool.
+  bool IsTaskInFlight = false;
+
+  /// Mutex for synchronizing access to the Tasks queue.
+  std::mutex QueueLock;
+};
+} // namespace llvm
+
+#endif // LLVM_SUPPORT_TASK_QUEUE_H
Index: llvm/lib/Support/CMakeLists.txt
===================================================================
--- llvm/lib/Support/CMakeLists.txt
+++ llvm/lib/Support/CMakeLists.txt
@@ -116,6 +116,7 @@
   SystemUtils.cpp
   TarWriter.cpp
   TargetParser.cpp
+  TaskQueue.cpp
   ThreadPool.cpp
   Timer.cpp
   ToolOutputFile.cpp
Index: llvm/lib/Support/TaskQueue.cpp
===================================================================
--- /dev/null
+++ llvm/lib/Support/TaskQueue.cpp
@@ -0,0 +1,54 @@
+//==-- llvm/Support/TaskQueue.cpp - A TaskQueue implementation ---*- C++ -*-==//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file implements a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/Threading.h"
+#include "llvm/Support/raw_ostream.h"
+
+using namespace llvm;
+
+#if LLVM_ENABLE_THREADS
+
+TaskQueue::TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) {}
+
+// The destructor blocks until all outstanding work has completed.
+TaskQueue::~TaskQueue() { Scheduler.wait(); }
+
+#else // LLVM_ENABLE_THREADS disabled
+
+TaskQueue::TaskQueue() {}
+
+std::shared_future<void> TaskQueue::asyncImpl(TaskFunctionTy Task) {
+  std::shared_future<void> ParentFuture;
+  if (!Tasks.empty())
+    ParentFuture = Tasks.back();
+
+  auto Future = std::async(std::launch::deferred,
+                           [this, ParentFuture, Task] {
+                             if (ParentFuture.valid())
+                               ParentFuture.wait();
+
+                             Task();
+                             Tasks.pop();
+                           })
+                    .share();
+
+  Tasks.push(Future);
+  return Future;
+}
+
+TaskQueue::~TaskQueue() { wait(); }
+
+#endif
Index: llvm/unittests/Support/CMakeLists.txt
===================================================================
--- llvm/unittests/Support/CMakeLists.txt
+++ llvm/unittests/Support/CMakeLists.txt
@@ -53,6 +53,7 @@
   SwapByteOrderTest.cpp
   TarWriterTest.cpp
   TargetParserTest.cpp
+  TaskQueueTest.cpp
   ThreadLocalTest.cpp
   ThreadPool.cpp
   Threading.cpp
Index: llvm/unittests/Support/TaskQueueTest.cpp
===================================================================
--- /dev/null
+++ llvm/unittests/Support/TaskQueueTest.cpp
@@ -0,0 +1,185 @@
+//========- unittests/Support/TaskQueueTest.cpp - TaskQueue tests ----========//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "gtest/gtest.h"
+
+using namespace llvm;
+
+class TaskQueueTest : public testing::Test {
+protected:
+  TaskQueueTest() {}
+
+  /// Make sure this thread does not progress faster than the main thread.
+  void waitForMainThread() {
+    std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
+    WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
+  }
+
+  /// Set the readiness of the main thread.
+  void setMainThreadReady() {
+    {
+      std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
+      MainThreadReady = true;
+    }
+    WaitMainThread.notify_all();
+  }
+
+  void SetUp() override { MainThreadReady = false; }
+
+  std::condition_variable WaitMainThread;
+  std::mutex WaitMainThreadMutex;
+  bool MainThreadReady;
+};
+
+TEST_F(TaskQueueTest, OrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+
+  std::future<void> F1 = TQ.async([&] {
+    waitForMainThread();
+    ++X;
+  });
+  std::future<void> F2 = TQ.async([&] {
+    waitForMainThread();
+    ++Y;
+  });
+  std::future<void> F3 = TQ.async([&] {
+    waitForMainThread();
+    ++Z;
+  });
+
+  setMainThreadReady();
+  F1.wait();
+  ASSERT_EQ(1, X);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Y);
+  ASSERT_EQ(0, Z);
+#endif
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  ASSERT_EQ(0, Z);
+#endif
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, UnOrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X{0};
+  std::atomic<int> Y{0};
+  std::atomic<int> Z{0};
+  std::future<void> F1 = TQ.async([&] { ++X; });
+  std::future<void> F2 = TQ.async([&] { ++Y; });
+  std::future<void> F3 = TQ.async([&] { ++Z; });
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+#if !LLVM_ENABLE_THREADS
+  // We can't check this in the LLVM_ENABLE_THREADS case since the work thread
+  // proceeds independently of the main thread, so this may or may not still
+  // be true.
+  ASSERT_EQ(0, Z);
+#endif
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+//
+// TEST_F(TaskQueueTest, Wait) {
+//   // Test that calling wait() on a TaskQueue processes all outstanding
+//   // work.
+//   TaskQueue TQ;
+//   std::atomic<int> X{0};
+//   std::atomic<int> Y{0};
+//   std::atomic<int> Z{0};
+//   TQ.async([&] { X = 1; });
+//   TQ.async([&] { Y = 1; });
+//   TQ.async([&] { Z = 1; });
+//
+//   TQ.wait();
+//   ASSERT_EQ(1, X);
+//   ASSERT_EQ(1, Y);
+//   ASSERT_EQ(1, Z);
+// }
+//
+// TEST_F(TaskQueueTest, WaitAgain) {
+//   // Test that calling wait() on a TaskQueue and then queueing up more work
+//   // processes the new work as it should.
+//   TaskQueue TQ;
+//   std::atomic<int> X{0};
+//   std::atomic<int> Y{0};
+//   std::atomic<int> Z{0};
+//   TQ.async([&] { ++X; });
+//   TQ.async([&] { ++Y; });
+//   TQ.async([&] { ++Z; });
+//
+//   TQ.wait();
+//   ASSERT_EQ(1, X);
+//   ASSERT_EQ(1, Y);
+//   ASSERT_EQ(1, Z);
+//
+//   TQ.async([&] { ++X; });
+//   TQ.async([&] { ++Y; });
+//   TQ.async([&] { ++Z; });
+//
+//   TQ.wait();
+//   ASSERT_EQ(2, X);
+//   ASSERT_EQ(2, Y);
+//   ASSERT_EQ(2, Z);
+// }
+//
+// TEST_F(TaskQueueTest, QueueWaitAfterFutureWait) {
+//   // Test that waiting on the result of some work and then waiting on the
+//   // entire queue processes each job exactly once, and in the correct
+//   // order.
+//   TaskQueue TQ;
+//   std::atomic<int> X{0};
+//   std::atomic<int> Y{0};
+//   std::atomic<int> Z{0};
+//   std::atomic<int> A{0};
+//   std::shared_future<void> F1 = TQ.async([&] { ++X; });
+//   std::shared_future<void> F2 = TQ.async([&] { ++Y; });
+//   std::shared_future<void> F3 = TQ.async([&] { ++Z; });
+//   std::shared_future<void> F4 = TQ.async([&] { ++A; });
+//
+//   F2.wait();
+//   ASSERT_EQ(1, X);
+//   ASSERT_EQ(1, Y);
+//#if !LLVM_ENABLE_THREADS
+//   // We can't check this in the LLVM_ENABLE_THREADS case since the work
+//   // thread proceeds independently of the main thread, so this may or may
+//   // not still be true.
+//   ASSERT_EQ(0, Z);
+//   ASSERT_EQ(0, A);
+//#endif
+//
+//   TQ.wait();
+//   ASSERT_EQ(1, X);
+//   ASSERT_EQ(1, Y);
+//   ASSERT_EQ(1, Z);
+//   ASSERT_EQ(1, A);
+// }
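
For reviewers: a minimal usage sketch of the proposed API (illustrative only,
not part of the patch). It assumes only what the header above declares: a
ThreadPool with a single worker and a TaskQueue bound to it, as in the tests.

  #include "llvm/Support/TaskQueue.h"
  #include "llvm/Support/ThreadPool.h"

  using namespace llvm;

  void example() {
    ThreadPool Pool(1);     // worker thread shared with the queue
    TaskQueue Queue(Pool);  // tasks run FIFO on the pool

    // Each returned future completes once its task, and every task submitted
    // before it, has finished executing.
    std::future<void> First = Queue.async([] { /* stage 1 */ });
    std::future<void> Second = Queue.async([] { /* stage 2, after stage 1 */ });

    Second.wait();          // implies First has completed as well
  }                         // ~TaskQueue waits for any remaining work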