diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h --- a/llvm/include/llvm/Support/ThreadPool.h +++ b/llvm/include/llvm/Support/ThreadPool.h @@ -111,6 +111,7 @@ /// corresponding future. auto R = createTaskAndFuture(Task); + int requestedThreads; { // Lock the queue and push the new task std::unique_lock LockGuard(QueueLock); @@ -118,9 +119,10 @@ // Don't allow enqueueing after disabling the pool assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); Tasks.push(std::move(R.first)); - grow(); + requestedThreads = ActiveThreads + Tasks.size(); } QueueCondition.notify_one(); + grow(requestedThreads); return R.second.share(); #else // LLVM_ENABLE_THREADS Disabled @@ -135,28 +137,21 @@ } #if LLVM_ENABLE_THREADS - // Maybe create a new thread and add it to Threads. - // - // Requirements: - // * this->QueueLock should be owned by the calling thread prior to - // calling this function. It will neither lock it nor unlock it. - // Calling this function without owning QueueLock would result in data - // races as this function reads Tasks and ActiveThreads. - // * this->Tasks should be populated with any pending tasks. This function - // uses Tasks.size() to determine whether it needs to create a new thread. - // * this->ActiveThreads should be up to date as it is also used to - // determine whether to create a new thread. - void grow(); + // Grow to ensure that we have at least `requested` Threads, but do not go + // over MaxThreadCount. + void grow(int requested); #endif /// Threads in flight std::vector Threads; + /// Lock protecting access to the Threads vector. + mutable std::mutex ThreadsLock; /// Tasks waiting for execution in the pool. std::queue> Tasks; /// Locking and signaling for accessing the Tasks queue. - mutable std::mutex QueueLock; + std::mutex QueueLock; std::condition_variable QueueCondition; /// Signaling for job completion diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp --- a/llvm/lib/Support/ThreadPool.cpp +++ b/llvm/lib/Support/ThreadPool.cpp @@ -23,49 +23,51 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S) : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} -void ThreadPool::grow() { +void ThreadPool::grow(int requested) { + std::unique_lock LockGuard(ThreadsLock); if (Threads.size() >= MaxThreadCount) return; // Already hit the max thread pool size. - if (ActiveThreads + Tasks.size() <= Threads.size()) - return; // We have enough threads for now. - int ThreadID = Threads.size(); - Threads.emplace_back([this, ThreadID] { - Strategy.apply_thread_strategy(ThreadID); - while (true) { - std::function Task; - { - std::unique_lock LockGuard(QueueLock); - // Wait for tasks to be pushed in the queue - QueueCondition.wait(LockGuard, - [&] { return !EnableFlag || !Tasks.empty(); }); - // Exit condition - if (!EnableFlag && Tasks.empty()) - return; - // Yeah, we have a task, grab it and release the lock on the queue - - // We first need to signal that we are active before popping the queue - // in order for wait() to properly detect that even if the queue is - // empty, there is still a task in flight. - ++ActiveThreads; - Task = std::move(Tasks.front()); - Tasks.pop(); + int newThreadCount = std::min(requested, MaxThreadCount); + while (static_cast(Threads.size()) < newThreadCount) { + int ThreadID = Threads.size(); + Threads.emplace_back([this, ThreadID] { + Strategy.apply_thread_strategy(ThreadID); + while (true) { + std::function Task; + { + std::unique_lock LockGuard(QueueLock); + // Wait for tasks to be pushed in the queue + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + // Exit condition + if (!EnableFlag && Tasks.empty()) + return; + // Yeah, we have a task, grab it and release the lock on the queue + + // We first need to signal that we are active before popping the queue + // in order for wait() to properly detect that even if the queue is + // empty, there is still a task in flight. + ++ActiveThreads; + Task = std::move(Tasks.front()); + Tasks.pop(); + } + // Run the task we just grabbed + Task(); + + bool Notify; + { + // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() + std::lock_guard LockGuard(QueueLock); + --ActiveThreads; + Notify = workCompletedUnlocked(); + } + // Notify task completion if this is the last active thread, in case + // someone waits on ThreadPool::wait(). + if (Notify) + CompletionCondition.notify_all(); } - // Run the task we just grabbed - Task(); - - bool Notify; - { - // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() - std::lock_guard LockGuard(QueueLock); - --ActiveThreads; - Notify = workCompletedUnlocked(); - } - // Notify task completion if this is the last active thread, in case - // someone waits on ThreadPool::wait(). - if (Notify) - CompletionCondition.notify_all(); - } - }); + }); + } } void ThreadPool::wait() { @@ -75,7 +77,7 @@ } bool ThreadPool::isWorkerThread() const { - std::unique_lock LockGuard(QueueLock); + std::unique_lock LockGuard(ThreadsLock); llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); for (const llvm::thread &Thread : Threads) if (CurrentThreadId == Thread.get_id()) @@ -90,6 +92,7 @@ EnableFlag = false; } QueueCondition.notify_all(); + std::unique_lock LockGuard(ThreadsLock); for (auto &Worker : Threads) Worker.join(); }