diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -13,26 +13,42 @@
 #ifndef LLVM_SUPPORT_THREADPOOL_H
 #define LLVM_SUPPORT_THREADPOOL_H
 
+#include "llvm/ADT/DenseMap.h"
 #include "llvm/Config/llvm-config.h"
+#include "llvm/Support/RWMutex.h"
 #include "llvm/Support/Threading.h"
 #include "llvm/Support/thread.h"
 
 #include <future>
 
 #include <condition_variable>
+#include <deque>
 #include <functional>
 #include <memory>
 #include <mutex>
-#include <queue>
 #include <utility>
 
 namespace llvm {
 
+class ThreadPoolTaskGroup;
+
 /// A ThreadPool for asynchronous parallel execution on a defined number of
 /// threads.
 ///
 /// The pool keeps a vector of threads alive, waiting on a condition variable
 /// for some work to become available.
+///
+/// It is possible to reuse one thread pool for different groups of tasks
+/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
+/// the same queue, but it is possible to wait only for a specific group of
+/// tasks to finish.
+///
+/// It is also possible for worker threads to submit new tasks and wait for
+/// them. Note that this may result in a deadlock in cases such as when a task
+/// (directly or indirectly) tries to wait for its own completion, or when all
+/// available threads are used up by tasks waiting for a task that has no thread
+/// left to run on (this includes waiting on the returned future). It should be
+/// generally safe to wait() for a group as long as groups do not form a cycle.
 class ThreadPool {
 public:
   /// Construct a pool using the hardware strategy \p S for mapping hardware
@@ -47,23 +63,47 @@
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline auto async(Function &&F, Args &&...ArgList) {
+  auto async(Function &&F, Args &&...ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
     return async(std::move(Task));
   }
 
+  /// Overload, task will be in the given task group.
+  template <typename Function, typename... Args>
+  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
+    auto Task =
+        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
+    return async(Group, std::move(Task));
+  }
+
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Func>
   auto async(Func &&F) -> std::shared_future<decltype(F())> {
-    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
+    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
+                     nullptr);
+  }
+
+  template <typename Func>
+  auto async(ThreadPoolTaskGroup &Group, Func &&F)
+      -> std::shared_future<decltype(F())> {
+    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
+                     &Group);
   }
 
   /// Blocking wait for all the threads to complete and the queue to be empty.
   /// It is an error to try to add new tasks while blocking on this call.
+  /// Calling wait() from a task would deadlock waiting for itself.
   void wait();
 
+  /// Blocking wait for only all the threads in the given group to complete.
+  /// It is possible to wait even inside a task, but waiting (directly or
+  /// indirectly) on itself will deadlock. If called from a task running on a
+  /// worker thread, the call may process pending tasks while waiting in order
+  /// not to waste the thread.
+  void wait(ThreadPoolTaskGroup &Group);
+
   // TODO: misleading legacy name warning!
   // Returns the maximum number of worker threads in the pool, not the current
   // number of threads!
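(Editorial aside, not part of the patch: a minimal sketch of how the API added in the hunk above is meant to be used, based only on the declarations above and the unit tests later in this diff. The pool and group names are made up for illustration.)

#include "llvm/Support/ThreadPool.h"

void example() {
  llvm::ThreadPool Pool; // one shared pool, default hardware_concurrency()
  llvm::ThreadPoolTaskGroup Parsing(Pool);
  llvm::ThreadPoolTaskGroup Codegen(Pool);

  // Tasks from both groups share the same queue and worker threads.
  for (int I = 0; I < 8; ++I)
    Parsing.async([I] { /* parse input I */ });
  Codegen.async([] { /* independent work */ });

  // Wait only for the parsing tasks; Codegen tasks may still be running.
  Parsing.wait();

  // Waiting for a group from inside a task of another group is allowed as
  // long as the groups do not form a cycle (see the deadlock notes above).
  Codegen.wait();
}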
@@ -98,12 +138,15 @@
             std::move(F)};
   }
 
-  bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
+  /// Returns true if all tasks in the given group have finished (nullptr means
+  /// all tasks regardless of their group). QueueLock must be locked.
+  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename ResTy>
-  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
+  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
+                                      ThreadPoolTaskGroup *Group) {
 #if LLVM_ENABLE_THREADS
     /// Wrap the Task in a std::function<void()> that sets the result of the
     /// corresponding future.
@@ -117,7 +160,7 @@
       // Don't allow enqueueing after disabling the pool
       assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-      Tasks.push(std::move(R.first));
+      Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
       requestedThreads = ActiveThreads + Tasks.size();
     }
     QueueCondition.notify_one();
@@ -130,7 +173,7 @@
     auto Future = std::async(std::launch::deferred, std::move(Task)).share();
     // Wrap the future so that both ThreadPool::wait() can operate and the
     // returned future can be sync'ed on.
-    Tasks.push([Future]() { Future.get(); });
+    Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
     return Future;
 #endif
   }
@@ -139,25 +182,29 @@
   // Grow to ensure that we have at least `requested` Threads, but do not go
   // over MaxThreadCount.
   void grow(int requested);
+
+  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
 #endif
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
   /// Lock protecting access to the Threads vector.
-  mutable std::mutex ThreadsLock;
+  mutable llvm::sys::RWMutex ThreadsLock;
 
   /// Tasks waiting for execution in the pool.
-  std::queue<std::function<void()>> Tasks;
+  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;
   std::condition_variable QueueCondition;
 
-  /// Signaling for job completion
+  /// Signaling for job completion (all tasks or all tasks in a group).
   std::condition_variable CompletionCondition;
 
   /// Keep track of the number of thread actually busy
   unsigned ActiveThreads = 0;
+  /// Number of active threads per group (only non-zero counts are stored).
+  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
 
 #if LLVM_ENABLE_THREADS // avoids warning for unused variable
   /// Signal for the destruction of the pool, asking thread to exit.
@@ -169,6 +216,34 @@
   /// Maximum number of threads to potentially grow this pool to.
   const unsigned MaxThreadCount;
 };
-}
+
+/// A group of tasks to be run on a thread pool. Thread pool tasks in different
+/// groups can run on the same thread pool but can be waited for separately.
+/// It is even possible for tasks of one group to submit and wait for tasks
+/// of another group, as long as this does not form a loop.
+class ThreadPoolTaskGroup {
+public:
+  /// The ThreadPool argument is the thread pool to forward calls to.
+  ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
+
+  /// Blocking destructor: will wait for all the tasks in the group to complete
+  /// by calling ThreadPool::wait().
+  ~ThreadPoolTaskGroup() { wait(); }
+
+  /// Calls ThreadPool::async() for this group.
+  template <typename Function, typename... Args>
+  inline auto async(Function &&F, Args &&...ArgList) {
+    return Pool.async(*this, std::forward<Function>(F),
+                      std::forward<Args>(ArgList)...);
+  }
+
+  /// Calls ThreadPool::wait() for this group.
+  void wait() { Pool.wait(*this); }
+
+private:
+  ThreadPool &Pool;
+};
+
+} // namespace llvm
 
 #endif // LLVM_SUPPORT_THREADPOOL_H
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -24,11 +24,19 @@
 
 #if LLVM_ENABLE_THREADS
 
+// A note on thread groups: Tasks are by default in no group (represented
+// by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality
+// here normally works on all tasks regardless of their group (functions
+// in that case receive nullptr ThreadPoolTaskGroup pointer as argument).
+// A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks
+// queue, and functions called to work only on tasks from one group take that
+// pointer.
+
 ThreadPool::ThreadPool(ThreadPoolStrategy S)
     : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
 
 void ThreadPool::grow(int requested) {
-  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
+  llvm::sys::ScopedWriter LockGuard(ThreadsLock);
   if (Threads.size() >= MaxThreadCount)
     return; // Already hit the max thread pool size.
   int newThreadCount = std::min<int>(requested, MaxThreadCount);
@@ -36,52 +44,125 @@
     int ThreadID = Threads.size();
     Threads.emplace_back([this, ThreadID] {
       Strategy.apply_thread_strategy(ThreadID);
-      while (true) {
-        std::function<void()> Task;
-        {
-          std::unique_lock<std::mutex> LockGuard(QueueLock);
-          // Wait for tasks to be pushed in the queue
-          QueueCondition.wait(LockGuard,
-                              [&] { return !EnableFlag || !Tasks.empty(); });
-          // Exit condition
-          if (!EnableFlag && Tasks.empty())
-            return;
-          // Yeah, we have a task, grab it and release the lock on the queue
-
-          // We first need to signal that we are active before popping the queue
-          // in order for wait() to properly detect that even if the queue is
-          // empty, there is still a task in flight.
-          ++ActiveThreads;
-          Task = std::move(Tasks.front());
-          Tasks.pop();
-        }
-        // Run the task we just grabbed
-        Task();
-
-        bool Notify;
-        {
-          // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
-          std::lock_guard<std::mutex> LockGuard(QueueLock);
-          --ActiveThreads;
-          Notify = workCompletedUnlocked();
-        }
-        // Notify task completion if this is the last active thread, in case
-        // someone waits on ThreadPool::wait().
-        if (Notify)
-          CompletionCondition.notify_all();
-      }
+      processTasks(nullptr);
     });
   }
 }
 
+#ifndef NDEBUG
+// The group of the tasks run by the current thread.
+static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *>
+    *CurrentThreadTaskGroups = nullptr;
+#endif
+
+// WaitingForGroup == nullptr means all tasks regardless of their group.
+void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) {
+  while (true) {
+    std::function<void()> Task;
+    ThreadPoolTaskGroup *GroupOfTask;
+    {
+      std::unique_lock<std::mutex> LockGuard(QueueLock);
+      bool workCompletedForGroup = false; // Result of workCompletedUnlocked()
+      // Wait for tasks to be pushed in the queue
+      QueueCondition.wait(LockGuard, [&] {
+        return !EnableFlag || !Tasks.empty() ||
+               (WaitingForGroup != nullptr &&
+                (workCompletedForGroup =
+                     workCompletedUnlocked(WaitingForGroup)));
+      });
+      // Exit condition
+      if (!EnableFlag && Tasks.empty())
+        return;
+      if (WaitingForGroup != nullptr && workCompletedForGroup)
+        return;
+      // Yeah, we have a task, grab it and release the lock on the queue
+
+      // We first need to signal that we are active before popping the queue
+      // in order for wait() to properly detect that even if the queue is
+      // empty, there is still a task in flight.
+      ++ActiveThreads;
+      Task = std::move(Tasks.front().first);
+      GroupOfTask = Tasks.front().second;
+      // Need to count active threads in each group separately, ActiveThreads
+      // would never be 0 if waiting for another group inside a wait.
+      if (GroupOfTask != nullptr)
+        ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item
+      Tasks.pop_front();
+    }
+#ifndef NDEBUG
+    if (CurrentThreadTaskGroups == nullptr)
+      CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>;
+    CurrentThreadTaskGroups->push_back(GroupOfTask);
+#endif
+
+    // Run the task we just grabbed
+    Task();
+
+#ifndef NDEBUG
+    CurrentThreadTaskGroups->pop_back();
+#endif
+
+    bool Notify;
+    bool NotifyGroup;
+    {
+      // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
+      std::lock_guard<std::mutex> LockGuard(QueueLock);
+      --ActiveThreads;
+      if (GroupOfTask != nullptr) {
+        auto A = ActiveGroups.find(GroupOfTask);
+        if (--(A->second) == 0)
+          ActiveGroups.erase(A);
+      }
+      Notify = workCompletedUnlocked(GroupOfTask);
+      NotifyGroup = GroupOfTask != nullptr && Notify;
+    }
+    // Notify task completion if this is the last active thread, in case
+    // someone waits on ThreadPool::wait().
+    if (Notify)
+      CompletionCondition.notify_all();
+    // If this was a task in a group, also notify threads waiting for tasks
+    // in this function on QueueCondition, to make a recursive wait() return
+    // after the group it has been waiting for has finished.
+    if (NotifyGroup)
+      QueueCondition.notify_all();
+  }
+}
+
+bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const {
+  if (Group == nullptr)
+    return !ActiveThreads && Tasks.empty();
+  return ActiveGroups.count(Group) == 0 &&
+         !llvm::any_of(Tasks,
+                       [Group](const auto &T) { return T.second == Group; });
+}
+
 void ThreadPool::wait() {
+  assert(!isWorkerThread()); // Would deadlock waiting for itself.
   // Wait for all threads to complete and the queue to be empty
   std::unique_lock<std::mutex> LockGuard(QueueLock);
-  CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
+  CompletionCondition.wait(LockGuard,
+                           [&] { return workCompletedUnlocked(nullptr); });
+}
+
+void ThreadPool::wait(ThreadPoolTaskGroup &Group) {
+  // Wait for all threads in the group to complete.
+  if (!isWorkerThread()) {
+    std::unique_lock<std::mutex> LockGuard(QueueLock);
+    CompletionCondition.wait(LockGuard,
+                             [&] { return workCompletedUnlocked(&Group); });
+    return;
+  }
+  // Make sure to not deadlock waiting for oneself.
+  assert(CurrentThreadTaskGroups == nullptr ||
+         !llvm::is_contained(*CurrentThreadTaskGroups, &Group));
+  // Handle the case of a recursive call from a task in a different group:
+  // process tasks while waiting, to keep the thread busy and avoid a possible
+  // deadlock.
+  processTasks(&Group);
 }
 
 bool ThreadPool::isWorkerThread() const {
-  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
+  llvm::sys::ScopedReader LockGuard(ThreadsLock);
   llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
   for (const llvm::thread &Thread : Threads)
     if (CurrentThreadId == Thread.get_id())
@@ -96,7 +177,7 @@
     EnableFlag = false;
   }
   QueueCondition.notify_all();
-  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
+  llvm::sys::ScopedReader LockGuard(ThreadsLock);
   for (auto &Worker : Threads)
     Worker.join();
 }
@@ -115,12 +196,18 @@
 void ThreadPool::wait() {
   // Sequential implementation running the tasks
   while (!Tasks.empty()) {
-    auto Task = std::move(Tasks.front());
-    Tasks.pop();
+    auto Task = std::move(Tasks.front().first);
+    Tasks.pop_front();
     Task();
   }
 }
 
+void ThreadPool::wait(ThreadPoolTaskGroup &) {
+  // Simply wait for all, this works even if recursive (the running task
+  // is already removed from the queue).
+  wait();
+}
+
 bool ThreadPool::isWorkerThread() const {
   report_fatal_error("LLVM compiled without multithreading");
 }
diff --git a/llvm/tools/llvm-profdata/llvm-profdata.cpp b/llvm/tools/llvm-profdata/llvm-profdata.cpp
--- a/llvm/tools/llvm-profdata/llvm-profdata.cpp
+++ b/llvm/tools/llvm-profdata/llvm-profdata.cpp
@@ -38,6 +38,7 @@
 #include "llvm/Support/WithColor.h"
 #include "llvm/Support/raw_ostream.h"
 #include <algorithm>
+#include <queue>
 
 using namespace llvm;
diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -18,6 +18,9 @@
 #include "llvm/Support/TargetSelect.h"
 #include "llvm/Support/Threading.h"
 
+#include <chrono>
+#include <thread>
+
 #include "gtest/gtest.h"
 
 using namespace llvm;
@@ -29,6 +32,7 @@
   SmallVector<Triple::ArchType> UnsupportedArchs;
   SmallVector<Triple::OSType> UnsupportedOSs;
   SmallVector<Triple::EnvironmentType> UnsupportedEnvironments;
+
 protected:
   // This is intended for platform as a temporary "XFAIL"
   bool isUnsupportedOSOrEnvironment() {
@@ -57,27 +61,45 @@
   }
 
   /// Make sure this thread not progress faster than the main thread.
-  void waitForMainThread() {
-    std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
-    WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
-  }
+  void waitForMainThread() { waitForPhase(1); }
 
   /// Set the readiness of the main thread.
-  void setMainThreadReady() {
+  void setMainThreadReady() { setPhase(1); }
+
+  /// Wait until given phase is set using setPhase(); first "main" phase is 1.
+  /// See also PhaseResetHelper below.
+  void waitForPhase(int Phase) {
+    std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
+    CurrentPhaseCondition.wait(
+        LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
+  }
+  /// If a thread waits on another phase, the test could bail out on a failed
+  /// assertion and the ThreadPool destructor would then wait() on all threads,
+  /// which would deadlock on the still-waiting task. Create this helper to
+  /// automatically reset the phase and unblock such threads.
+  struct PhaseResetHelper {
+    PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
+    ~PhaseResetHelper() { test->setPhase(-1); }
+    ThreadPoolTest *test;
+  };
+
+  /// Advance to the given phase.
+  void setPhase(int Phase) {
     {
-      std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
-      MainThreadReady = true;
+      std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
+      assert(Phase == CurrentPhase + 1 || Phase < 0);
+      CurrentPhase = Phase;
     }
-    WaitMainThread.notify_all();
+    CurrentPhaseCondition.notify_all();
   }
 
-  void SetUp() override { MainThreadReady = false; }
+  void SetUp() override { CurrentPhase = 0; }
 
   std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
 
-  std::condition_variable WaitMainThread;
-  std::mutex WaitMainThreadMutex;
-  bool MainThreadReady = false;
+  std::condition_variable CurrentPhaseCondition;
+  std::mutex CurrentPhaseMutex;
+  int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
 };
 
 #define CHECK_UNSUPPORTED() \
@@ -194,6 +216,125 @@
   ASSERT_EQ(5, checked_in);
 }
 
+// Check running tasks in different groups.
+TEST_F(ThreadPoolTest, Groups) {
+  CHECK_UNSUPPORTED();
+  // Need at least two threads, as the task in group2
+  // might block a thread until all tasks in group1 finish.
+  ThreadPoolStrategy S = hardware_concurrency(2);
+  if (S.compute_thread_count() < 2)
+    return;
+  ThreadPool Pool(S);
+  PhaseResetHelper Helper(this);
+  ThreadPoolTaskGroup Group1(Pool);
+  ThreadPoolTaskGroup Group2(Pool);
+
+  // Check that waiting for an empty group is a no-op.
+  Group1.wait();
+
+  std::atomic_int checked_in1{0};
+  std::atomic_int checked_in2{0};
+
+  for (size_t i = 0; i < 5; ++i) {
+    Group1.async([this, &checked_in1] {
+      waitForMainThread();
+      ++checked_in1;
+    });
+  }
+  Group2.async([this, &checked_in2] {
+    waitForPhase(2);
+    ++checked_in2;
+  });
+  ASSERT_EQ(0, checked_in1);
+  ASSERT_EQ(0, checked_in2);
+  // Start first group and wait for it.
+  setMainThreadReady();
+  Group1.wait();
+  ASSERT_EQ(5, checked_in1);
+  // Second group has not yet finished, start it and wait for it.
+  ASSERT_EQ(0, checked_in2);
+  setPhase(2);
+  Group2.wait();
+  ASSERT_EQ(5, checked_in1);
+  ASSERT_EQ(1, checked_in2);
+}
+
+// Check recursive tasks.
+TEST_F(ThreadPoolTest, RecursiveGroups) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool;
+  ThreadPoolTaskGroup Group(Pool);
+
+  std::atomic_int checked_in1{0};
+
+  for (size_t i = 0; i < 5; ++i) {
+    Group.async([this, &Pool, &checked_in1] {
+      waitForMainThread();
+
+      ThreadPoolTaskGroup LocalGroup(Pool);
+
+      // Check that waiting for an empty group is a no-op.
+      LocalGroup.wait();
+
+      std::atomic_int checked_in2{0};
+      for (size_t i = 0; i < 5; ++i) {
+        LocalGroup.async([&checked_in2] { ++checked_in2; });
+      }
+      LocalGroup.wait();
+      ASSERT_EQ(5, checked_in2);
+
+      ++checked_in1;
+    });
+  }
+  ASSERT_EQ(0, checked_in1);
+  setMainThreadReady();
+  Group.wait();
+  ASSERT_EQ(5, checked_in1);
+}
+
+TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
+  CHECK_UNSUPPORTED();
+  ThreadPoolStrategy S = hardware_concurrency(2);
+  if (S.compute_thread_count() < 2)
+    return;
+  ThreadPool Pool(S);
+  PhaseResetHelper Helper(this);
+  ThreadPoolTaskGroup Group(Pool);
+
+  // Check that a thread that calls wait() for a group, and then helps to
+  // process pending tasks while waiting, returns once the group's last task
+  // finishes on a different thread while this thread is idle waiting for more
+  // tasks.
+
+  // Task A runs in the first thread. It finishes and leaves
+  // the background thread waiting for more tasks.
+  Group.async([this] {
+    waitForMainThread();
+    setPhase(2);
+  });
+  // Task B is run in a second thread, it launches yet another
+  // task C in a different group, which will be handled by the waiting
+  // thread started above.
+  Group.async([this, &Pool] {
+    waitForPhase(2);
+    ThreadPoolTaskGroup LocalGroup(Pool);
+    LocalGroup.async([this] {
+      waitForPhase(3);
+      // Give the other thread enough time to check that there's no task
+      // to process and suspend waiting for a notification. This is indeed
+      // racy, but probably the best that can be done.
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    });
+    // Only now does task B wait for the tasks in its local group (i.e. task C)
+    // to finish. This test checks that it does not deadlock: if the
+    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
+    // task B would be stuck waiting for tasks to arrive.
+    setPhase(3);
+    LocalGroup.wait();
+  });
+  setMainThreadReady();
+  Group.wait();
+}
+
 #if LLVM_ENABLE_THREADS == 1
 
 // FIXME: Skip some tests below on non-Windows because multi-socket systems