Index: llvm/include/llvm/Support/ThreadPool.h
===================================================================
--- llvm/include/llvm/Support/ThreadPool.h
+++ llvm/include/llvm/Support/ThreadPool.h
@@ -64,6 +64,11 @@
   /// It is an error to try to add new tasks while blocking on this call.
   void wait();
 
+  /// Blocking wait until at most \p Size tasks remain in the queue.
+  /// Only tasks still waiting in the queue are counted; a task that a worker
+  /// has already dequeued and is running does not count towards \p Size.
+  void waitQueueSize(size_t Size = 0);
+
   // TODO: misleading legacy name warning!
   // Returns the maximum number of worker threads in the pool, not the current
   // number of threads!
@@ -153,6 +158,10 @@
   std::mutex QueueLock;
   std::condition_variable QueueCondition;
 
+  /// Signaling for queue size decreases; notified whenever a worker pops a
+  /// task, and waited on by waitQueueSize().
+  std::condition_variable QueueSizeDecreaseCondition;
+
   /// Signaling for job completion
   std::condition_variable CompletionCondition;
 
Index: llvm/lib/Support/ThreadPool.cpp
===================================================================
--- llvm/lib/Support/ThreadPool.cpp
+++ llvm/lib/Support/ThreadPool.cpp
@@ -54,6 +54,12 @@
           ++ActiveThreads;
           Task = std::move(Tasks.front());
           Tasks.pop();
+
+          // The queue just shrank: wake every thread blocked in
+          // waitQueueSize(). notify_all() is required because several
+          // callers may be waiting at once and each re-checks its own
+          // threshold; notify_one() could leave a satisfied waiter asleep.
+          QueueSizeDecreaseCondition.notify_all();
         }
         // Run the task we just grabbed
         Task();
@@ -80,6 +86,15 @@
   CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
 }
 
+void ThreadPool::waitQueueSize(size_t Size) {
+  // Block until the number of queued (not yet dequeued) tasks is at most
+  // Size. The predicate is evaluated with QueueLock held and re-checked
+  // after every wakeup.
+  std::unique_lock LockGuard(QueueLock);
+  QueueSizeDecreaseCondition.wait(LockGuard,
+                                  [&] { return Tasks.size() <= Size; });
+}
+
 bool ThreadPool::isWorkerThread() const {
   std::unique_lock LockGuard(ThreadsLock);
   llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();