Index: include/lldb/Utility/TaskPool.h =================================================================== --- /dev/null +++ include/lldb/Utility/TaskPool.h @@ -0,0 +1,210 @@ +//===--------------------- TaskPool.h ---------------------------*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include +#include +#include +#include +#include +#include +#include + +class TaskPool +{ +public: + TaskPool() = delete; + + // Add a new task to the thread pool and return a std::future belongs for the newly created task. + // The caller of this function have to wait on the future for this task to complete. + template + static std::future::type> + AddTask(F&& f, Args&&... args); + + // Run all of the specified tasks on the thread pool and wait until all of them are finished + // before returning + template + static void + RunTasks(T&&... t); + +private: + template + struct RunTaskImpl; + + class TaskPoolImpl + { + public: + TaskPoolImpl(uint32_t num_threads); + + ~TaskPoolImpl(); + + template + std::future::type> + AddTask(F&& f, Args&&... args); + + void + Stop(); + + private: + static void + Worker(TaskPoolImpl* pool); + + std::queue> m_tasks; + std::mutex m_tasks_mutex; + std::condition_variable m_tasks_cv; + bool m_stop; + std::vector m_threads; + }; + + static TaskPoolImpl& + GetImplementation(); +}; + +template +class TaskRunner +{ +public: + template + void + AddTask(F&& f, Args&&... args); + + std::future + WaitForNextCompletedTask(); + + void + WaitForAllTask(); + +private: + std::list> m_ready; + std::list> m_pending; + std::mutex m_mutex; + std::condition_variable m_cv; +}; + +template +std::future::type> +TaskPool::AddTask(F&& f, Args&&... args) +{ + return GetImplementation().AddTask(std::forward(f), std::forward(args)...); +} + +template +void +TaskPool::RunTasks(T&&... t) +{ + RunTaskImpl::Run(std::forward(t)...); +} + +template +struct TaskPool::RunTaskImpl +{ + static void + Run(H&& h, T&&... t) + { + auto f = AddTask(std::forward(h)); + RunTaskImpl::Run(std::forward(t)...); + f.wait(); + } +}; + +template<> +struct TaskPool::RunTaskImpl<> +{ + static void + Run() {} +}; + +template +std::future::type> +TaskPool::TaskPoolImpl::AddTask(F&& f, Args&&... args) +{ + auto task = std::make_shared::type()>>( + std::bind(std::forward(f), std::forward(args)...)); + + std::unique_lock lock(m_tasks_mutex); + assert(!m_stop && "Can't add task to TaskPool after it is stopped"); + m_tasks.emplace([task](){ (*task)(); }); + lock.unlock(); + m_tasks_cv.notify_one(); + + return task->get_future(); +} + +template +template +void +TaskRunner::AddTask(F&& f, Args&&... args) +{ + std::unique_lock lock(m_mutex); + auto it = m_pending.emplace(m_pending.end()); + *it = std::move(TaskPool::AddTask( + [this, it](F&& f, Args&&... args) + { + T&& r = f(args...); + + std::unique_lock lock(this->m_mutex); + this->m_ready.emplace_back(std::move(*it)); + this->m_pending.erase(it); + lock.unlock(); + + this->m_cv.notify_one(); + return r; + }, + std::forward(f), + std::forward(args)...)); +} + +template <> +template +void +TaskRunner::AddTask(F&& f, Args&&... args) +{ + std::unique_lock lock(m_mutex); + auto it = m_pending.emplace(m_pending.end()); + *it = std::move(TaskPool::AddTask( + [this, it](F&& f, Args&&... args) + { + f(args...); + + std::unique_lock lock(this->m_mutex); + this->m_ready.emplace_back(std::move(*it)); + this->m_pending.erase(it); + lock.unlock(); + + this->m_cv.notify_one(); + }, + std::forward(f), + std::forward(args)...)); +} + +template +std::future +TaskRunner::WaitForNextCompletedTask() +{ + std::unique_lock lock(m_mutex); + if (m_ready.empty() && m_pending.empty()) + return std::future(); // No more tasks + + if (m_ready.empty()) + m_cv.wait(lock, [this](){ return !this->m_ready.empty(); }); + + std::future res = std::move(m_ready.front()); + m_ready.pop_front(); + + lock.unlock(); + res.wait(); + + return std::move(res); +} + +template +void +TaskRunner::WaitForAllTask() +{ + while (WaitForNextCompletedTask().valid()); +} Index: source/Utility/CMakeLists.txt =================================================================== --- source/Utility/CMakeLists.txt +++ source/Utility/CMakeLists.txt @@ -14,6 +14,7 @@ StringExtractor.cpp StringExtractorGDBRemote.cpp StringLexer.cpp + TaskPool.cpp TimeSpecTimeout.cpp UriParser.cpp ) Index: source/Utility/TaskPool.cpp =================================================================== --- /dev/null +++ source/Utility/TaskPool.cpp @@ -0,0 +1,59 @@ +//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "lldb/Utility/TaskPool.h" + +TaskPool::TaskPoolImpl& +TaskPool::GetImplementation() +{ + static TaskPool::TaskPoolImpl g_task_pool_impl(std::thread::hardware_concurrency()); + return g_task_pool_impl; +} + +TaskPool::TaskPoolImpl::TaskPoolImpl(uint32_t num_threads) : + m_stop(false) +{ + for (uint32_t i = 0; i < num_threads; ++i) + m_threads.emplace_back(Worker, this); +} + +TaskPool::TaskPoolImpl::~TaskPoolImpl() +{ + Stop(); +} + +void +TaskPool::TaskPoolImpl::Stop() +{ + std::unique_lock lock(m_tasks_mutex); + m_stop = true; + lock.unlock(); + m_tasks_cv.notify_all(); + for (auto& t : m_threads) + t.join(); +} + +void +TaskPool::TaskPoolImpl::Worker(TaskPoolImpl* pool) +{ + while (true) + { + std::unique_lock lock(pool->m_tasks_mutex); + if (pool->m_tasks.empty()) + pool->m_tasks_cv.wait(lock, [pool](){ return !pool->m_tasks.empty() || pool->m_stop; }); + if (pool->m_tasks.empty()) + break; + + std::function f = pool->m_tasks.front(); + pool->m_tasks.pop(); + lock.unlock(); + + f(); + } +}