Index: include/llvm/Support/ThreadPool.h
===================================================================
--- include/llvm/Support/ThreadPool.h
+++ include/llvm/Support/ThreadPool.h
@@ -105,17 +105,34 @@
   /// It is an error to try to add new tasks while blocking on this call.
   void wait();
 
+  /// Get the number of spawned threads.
+  unsigned getNumSpawnedThreads() const;
+
 private:
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   std::shared_future<VoidTy> asyncImpl(TaskTy F);
 
-  /// Threads in flight
-  std::vector<std::thread> Threads;
+  /// A 'wait' implementation which sequentially runs all queued tasks.
+  void sequentialWait();
+
+  /// An 'asyncImpl' implementation which queues tasks without performing any
+  /// locking.
+  std::shared_future<VoidTy> sequentialAsyncImpl(TaskTy Task);
 
   /// Tasks waiting for execution in the pool.
   std::queue<PackagedTaskTy> Tasks;
 
+#if LLVM_ENABLE_THREADS // avoids warning for unused variable
+  /// Keep track of the number of thread actually busy
+  std::atomic<unsigned> ActiveThreads;
+
+  /// Signal for the destruction of the pool, asking thread to exit.
+  bool EnableFlag;
+
+  /// Threads in flight
+  std::vector<std::thread> Threads;
+
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;
   std::condition_variable QueueCondition;
@@ -123,13 +140,6 @@
   /// Locking and signaling for job completion
   std::mutex CompletionLock;
   std::condition_variable CompletionCondition;
-
-  /// Keep track of the number of thread actually busy
-  std::atomic<unsigned> ActiveThreads;
-
-#if LLVM_ENABLE_THREADS // avoids warning for unused variable
-  /// Signal for the destruction of the pool, asking thread to exit.
-  bool EnableFlag;
 #endif
 };
 }
Index: lib/Support/ThreadPool.cpp
===================================================================
--- lib/Support/ThreadPool.cpp
+++ lib/Support/ThreadPool.cpp
@@ -25,6 +25,11 @@
 
 ThreadPool::ThreadPool(unsigned ThreadCount)
     : ActiveThreads(0), EnableFlag(true) {
+  // We don't achieve any extra parallelism by spawning just one thread, so
+  // don't do it.
+  if (ThreadCount == 1)
+    return;
+
   // Create ThreadCount threads that will loop forever, wait on QueueCondition
   // for tasks to be queued or the Pool to be destroyed.
   Threads.reserve(ThreadCount);
@@ -73,6 +78,12 @@
 }
 
 void ThreadPool::wait() {
+  // Use the sequential 'wait' implementation if no threads are spawned.
+  if (!getNumSpawnedThreads()) {
+    sequentialWait();
+    return;
+  }
+
   // Wait for all threads to complete and the queue to be empty
   std::unique_lock<std::mutex> LockGuard(CompletionLock);
   // The order of the checks for ActiveThreads and Tasks.empty() matters because
@@ -83,6 +94,10 @@
 }
 
 std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
+  // Use the sequential 'async' implementation if no threads are spawned.
+  if (!getNumSpawnedThreads())
+    return sequentialAsyncImpl(std::move(Task));
+
   /// Wrap the Task in a packaged_task to return a future object.
   PackagedTaskTy PackagedTask(std::move(Task));
   auto Future = PackagedTask.get_future();
@@ -99,8 +114,17 @@
   return Future.share();
 }
 
+unsigned ThreadPool::getNumSpawnedThreads() const { return Threads.size(); }
+
 // The destructor joins all threads, waiting for completion.
 ThreadPool::~ThreadPool() {
+  // Don't bother setting a condition if no threads are spawned -- just fall
+  // through to wait().
+  if (!getNumSpawnedThreads()) {
+    wait();
+    return;
+  }
+
   {
     std::unique_lock<std::mutex> LockGuard(QueueLock);
     EnableFlag = false;
@@ -115,15 +139,28 @@
 ThreadPool::ThreadPool() : ThreadPool(0) {}
 
 // No threads are launched, issue a warning if ThreadCount is not 0
-ThreadPool::ThreadPool(unsigned ThreadCount)
-    : ActiveThreads(0) {
+ThreadPool::ThreadPool(unsigned ThreadCount) {
   if (ThreadCount) {
     errs() << "Warning: request a ThreadPool with " << ThreadCount
            << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
   }
 }
 
-void ThreadPool::wait() {
+void ThreadPool::wait() { sequentialWait(); }
+
+std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
+  return sequentialAsyncImpl(std::move(Task));
+}
+
+unsigned ThreadPool::getNumSpawnedThreads() const { return 0; }
+
+ThreadPool::~ThreadPool() {
+  wait();
+}
+
+#endif // LLVM_ENABLE_THREADS
+
+void ThreadPool::sequentialWait() {
   // Sequential implementation running the tasks
   while (!Tasks.empty()) {
     auto Task = std::move(Tasks.front());
@@ -136,7 +173,8 @@
   }
 }
 
-std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
+std::shared_future<ThreadPool::VoidTy>
+ThreadPool::sequentialAsyncImpl(TaskTy Task) {
 #ifndef _MSC_VER
   // Get a Future with launch::deferred execution using std::async
   auto Future = std::async(std::launch::deferred, std::move(Task)).share();
@@ -144,15 +182,13 @@
   // returned future can be sync'ed on.
   PackagedTaskTy PackagedTask([Future]() { Future.get(); });
 #else
-  auto Future = std::async(std::launch::deferred, std::move(Task), false).share();
-  PackagedTaskTy PackagedTask([Future](bool) -> bool { Future.get(); return false; });
+  auto Future =
+      std::async(std::launch::deferred, std::move(Task), false).share();
+  PackagedTaskTy PackagedTask([Future](bool) -> bool {
+    Future.get();
+    return false;
+  });
 #endif
   Tasks.push(std::move(PackagedTask));
   return Future;
 }
-
-ThreadPool::~ThreadPool() {
-  wait();
-}
-
-#endif
Index: unittests/Support/ThreadPool.cpp
===================================================================
--- unittests/Support/ThreadPool.cpp
+++ unittests/Support/ThreadPool.cpp
@@ -164,3 +164,9 @@
   }
   ASSERT_EQ(5, checked_in);
 }
+
+TEST_F(ThreadPoolTest, NoSpawnedThreads) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool(1);
+  ASSERT_EQ(Pool.getNumSpawnedThreads(), 0U);
+}
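
Usage note: a minimal sketch (not part of the patch) of the behavior this change enables, assuming the patch above is applied and LLVM_ENABLE_THREADS is on. It uses only the existing public ThreadPool::async() enqueue API plus the new getNumSpawnedThreads(): a pool of size 1 spawns no worker threads, async() queues tasks through the sequential path, and wait() runs them on the calling thread.

// Usage sketch (not part of the patch). Assumes the patch above is applied
// and LLVM_ENABLE_THREADS is enabled.
#include "llvm/Support/ThreadPool.h"
#include <atomic>
#include <cassert>

int main() {
  std::atomic<int> Count(0);

  llvm::ThreadPool Pool(1); // With this patch, no worker thread is spawned.
  assert(Pool.getNumSpawnedThreads() == 0 && "expected the sequential path");

  // async() goes through sequentialAsyncImpl(): tasks are queued, not run yet.
  for (int I = 0; I < 4; ++I)
    Pool.async([&Count] { ++Count; });

  // wait() goes through sequentialWait(): the queued tasks run here, on the
  // calling thread, in FIFO order.
  Pool.wait();
  assert(Count == 4);
  return 0;
}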