Index: include/llvm/Support/ThreadPool.h
===================================================================
--- include/llvm/Support/ThreadPool.h
+++ include/llvm/Support/ThreadPool.h
@@ -67,7 +67,7 @@
   ThreadPool();
 
   /// Construct a pool of \p ThreadCount threads
-  ThreadPool(unsigned ThreadCount);
+  explicit ThreadPool(unsigned ThreadCount);
 
   /// Blocking destructor: the pool will wait for all the threads to complete.
   ~ThreadPool();
@@ -113,23 +113,21 @@
   /// Threads in flight
   std::vector<llvm::thread> Threads;
 
+  /// Locking and signaling for accessing the Tasks queue, the ActiveThreads
+  /// counter, and the InDestructorFlag.
+  std::mutex Mutex;
+  std::condition_variable SubmissionCondition;
+  std::condition_variable CompletionCondition;
+
   /// Tasks waiting for execution in the pool.
   std::queue<PackagedTaskTy> Tasks;
 
-  /// Locking and signaling for accessing the Tasks queue.
-  std::mutex QueueLock;
-  std::condition_variable QueueCondition;
-
-  /// Locking and signaling for job completion
-  std::mutex CompletionLock;
-  std::condition_variable CompletionCondition;
-
   /// Keep track of the number of thread actually busy
-  std::atomic<unsigned> ActiveThreads;
+  unsigned ActiveThreads;
 
 #if LLVM_ENABLE_THREADS // avoids warning for unused variable
   /// Signal for the destruction of the pool, asking thread to exit.
-  bool EnableFlag;
+  bool InDestructorFlag;
 #endif
 };
 }
Index: lib/Support/ThreadPool.cpp
===================================================================
--- lib/Support/ThreadPool.cpp
+++ lib/Support/ThreadPool.cpp
@@ -24,31 +24,24 @@
 ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency()) {}
 
 ThreadPool::ThreadPool(unsigned ThreadCount)
-    : ActiveThreads(0), EnableFlag(true) {
-  // Create ThreadCount threads that will loop forever, wait on QueueCondition
-  // for tasks to be queued or the Pool to be destroyed.
+    : ActiveThreads(0), InDestructorFlag(false) {
+  // Create ThreadCount threads that will loop forever, wait on
+  // SubmissionCondition for tasks to be queued or the Pool to be destroyed.
   Threads.reserve(ThreadCount);
   for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
     Threads.emplace_back([&] {
       while (true) {
         PackagedTaskTy Task;
         {
-          std::unique_lock<std::mutex> LockGuard(QueueLock);
+          std::unique_lock<std::mutex> LockGuard(Mutex);
           // Wait for tasks to be pushed in the queue
-          QueueCondition.wait(LockGuard,
-                              [&] { return !EnableFlag || !Tasks.empty(); });
+          SubmissionCondition.wait(
+              LockGuard, [&] { return InDestructorFlag || !Tasks.empty(); });
           // Exit condition
-          if (!EnableFlag && Tasks.empty())
+          if (InDestructorFlag && Tasks.empty())
             return;
           // Yeah, we have a task, grab it and release the lock on the queue
-
-          // We first need to signal that we are active before popping the queue
-          // in order for wait() to properly detect that even if the queue is
-          // empty, there is still a task in flight.
-          {
-            ++ActiveThreads;
-            std::unique_lock<std::mutex> LockGuard(CompletionLock);
-          }
+          ++ActiveThreads;
           Task = std::move(Tasks.front());
           Tasks.pop();
         }
@@ -60,8 +53,7 @@
 #endif
 
         {
-          // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
-          std::unique_lock<std::mutex> LockGuard(CompletionLock);
+          std::unique_lock<std::mutex> LockGuard(Mutex);
           --ActiveThreads;
         }
 
@@ -74,7 +66,7 @@
 
 void ThreadPool::wait() {
   // Wait for all threads to complete and the queue to be empty
-  std::unique_lock<std::mutex> LockGuard(CompletionLock);
+  std::unique_lock<std::mutex> LockGuard(Mutex);
   CompletionCondition.wait(LockGuard,
                            [&] { return Tasks.empty() && !ActiveThreads; });
 }
@@ -85,26 +77,35 @@
   auto Future = PackagedTask.get_future();
   {
     // Lock the queue and push the new task
-    std::unique_lock<std::mutex> LockGuard(QueueLock);
+    std::unique_lock<std::mutex> LockGuard(Mutex);
 
     // Don't allow enqueueing after disabling the pool
-    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+    assert(!InDestructorFlag &&
+           "Queuing a thread during ThreadPool destruction");
 
     Tasks.push(std::move(PackagedTask));
   }
-  QueueCondition.notify_one();
+  SubmissionCondition.notify_one();
   return Future.share();
 }
 
 // The destructor joins all threads, waiting for completion.
 ThreadPool::~ThreadPool() {
   {
-    std::unique_lock<std::mutex> LockGuard(QueueLock);
-    EnableFlag = false;
+    std::unique_lock<std::mutex> LockGuard(Mutex);
+    InDestructorFlag = true;
   }
-  QueueCondition.notify_all();
+  SubmissionCondition.notify_all();
   for (auto &Worker : Threads)
     Worker.join();
+
+#ifndef NDEBUG
+  {
+    std::unique_lock<std::mutex> LockGuard(Mutex);
+    assert(ActiveThreads == 0 &&
+           "Threads still running after all were joined.");
+  }
+#endif
 }
 
 #else // LLVM_ENABLE_THREADS Disabled