Index: include/lldb/Utility/TaskPool.h =================================================================== --- include/lldb/Utility/TaskPool.h +++ include/lldb/Utility/TaskPool.h @@ -10,82 +10,16 @@ #ifndef utility_TaskPool_h_ #define utility_TaskPool_h_ -#include // for bind, function -#include -#include -#include // for make_shared -#include // for mutex, unique_lock, condition_variable -#include // for forward, result_of, move +#include -// Global TaskPool class for running tasks in parallel on a set of worker thread -// created the first -// time the task pool is used. The TaskPool provide no guarantee about the order -// the task will be run -// and about what tasks will run in parallel. None of the task added to the task -// pool should block -// on something (mutex, future, condition variable) what will be set only by the -// completion of an -// other task on the task pool as they may run on the same thread sequentally. -class TaskPool { -public: - // Add a new task to the task pool and return a std::future belonging to the - // newly created task. - // The caller of this function has to wait on the future for this task to - // complete. - template - static std::future::type> - AddTask(F &&f, Args &&... args); +namespace TaskPool { - // Run all of the specified tasks on the task pool and wait until all of them - // are finished - // before returning. This method is intended to be used for small number tasks - // where listing - // them as function arguments is acceptable. For running large number of tasks - // you should use - // AddTask for each task and then call wait() on each returned future. - template static void RunTasks(T &&... tasks); - -private: - TaskPool() = delete; - - template struct RunTaskImpl; - - static void AddTaskImpl(std::function &&task_fn); -}; - -template -std::future::type> -TaskPool::AddTask(F &&f, Args &&... args) { - auto task_sp = std::make_shared< - std::packaged_task::type()>>( - std::bind(std::forward(f), std::forward(args)...)); - - AddTaskImpl([task_sp]() { (*task_sp)(); }); - - return task_sp->get_future(); -} - -template void TaskPool::RunTasks(T &&... tasks) { - RunTaskImpl::Run(std::forward(tasks)...); +template void RunTasks(T &&... tasks) { + std::function cbs[sizeof...(T)]{tasks...}; + llvm::parallel::for_each_n(llvm::parallel::par, static_cast(0), + sizeof...(T), [&cbs](size_t idx) { cbs[idx](); }); } -template -struct TaskPool::RunTaskImpl { - static void Run(Head &&h, Tail &&... t) { - auto f = AddTask(std::forward(h)); - RunTaskImpl::Run(std::forward(t)...); - f.wait(); - } -}; - -template <> struct TaskPool::RunTaskImpl<> { - static void Run() {} -}; - -// Run 'func' on every value from begin .. end-1. Each worker will grab -// 'batch_size' numbers at a time to work on, so for very fast functions, batch -// should be large enough to avoid too much cache line contention. -void TaskMapOverInt(size_t begin, size_t end, - std::function const &func); +} // namespace TaskPool #endif // #ifndef utility_TaskPool_h_ Index: source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp =================================================================== --- source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp +++ source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp @@ -11,6 +11,7 @@ // Other libraries and framework includes #include "llvm/Support/Casting.h" +#include "llvm/Support/Parallel.h" #include "llvm/Support/Threading.h" #include "lldb/Core/ArchSpec.h" @@ -1991,13 +1992,15 @@ // a DIE in one compile unit refers to another and the indexes accesses // those DIEs. //---------------------------------------------------------------------- - TaskMapOverInt(0, num_compile_units, extract_fn); + llvm::parallel::for_each_n(llvm::parallel::par, 0U, num_compile_units, + extract_fn); // Now create a task runner that can index each DWARF compile unit in a // separate // thread so we can index quickly. - TaskMapOverInt(0, num_compile_units, parser_fn); + llvm::parallel::for_each_n(llvm::parallel::par, 0U, num_compile_units, + parser_fn); auto finalize_fn = [](NameToDIE &index, std::vector &srcs) { for (auto &src : srcs) Index: source/Utility/CMakeLists.txt =================================================================== --- source/Utility/CMakeLists.txt +++ source/Utility/CMakeLists.txt @@ -26,7 +26,6 @@ StringExtractorGDBRemote.cpp StringLexer.cpp StringList.cpp - TaskPool.cpp TildeExpressionResolver.cpp UserID.cpp UriParser.cpp Index: source/Utility/TaskPool.cpp =================================================================== --- source/Utility/TaskPool.cpp +++ /dev/null @@ -1,98 +0,0 @@ -//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// - -#include "lldb/Utility/TaskPool.h" - -#include // for uint32_t -#include // for queue -#include // for thread - -namespace { -class TaskPoolImpl { -public: - static TaskPoolImpl &GetInstance(); - - void AddTask(std::function &&task_fn); - -private: - TaskPoolImpl(); - - static void Worker(TaskPoolImpl *pool); - - std::queue> m_tasks; - std::mutex m_tasks_mutex; - uint32_t m_thread_count; -}; - -} // end of anonymous namespace - -TaskPoolImpl &TaskPoolImpl::GetInstance() { - static TaskPoolImpl g_task_pool_impl; - return g_task_pool_impl; -} - -void TaskPool::AddTaskImpl(std::function &&task_fn) { - TaskPoolImpl::GetInstance().AddTask(std::move(task_fn)); -} - -TaskPoolImpl::TaskPoolImpl() : m_thread_count(0) {} - -void TaskPoolImpl::AddTask(std::function &&task_fn) { - static const uint32_t max_threads = std::thread::hardware_concurrency(); - - std::unique_lock lock(m_tasks_mutex); - m_tasks.emplace(std::move(task_fn)); - if (m_thread_count < max_threads) { - m_thread_count++; - // Note that this detach call needs to happen with the m_tasks_mutex held. - // This prevents the thread - // from exiting prematurely and triggering a linux libc bug - // (https://sourceware.org/bugzilla/show_bug.cgi?id=19951). - std::thread(Worker, this).detach(); - } -} - -void TaskPoolImpl::Worker(TaskPoolImpl *pool) { - while (true) { - std::unique_lock lock(pool->m_tasks_mutex); - if (pool->m_tasks.empty()) { - pool->m_thread_count--; - break; - } - - std::function f = pool->m_tasks.front(); - pool->m_tasks.pop(); - lock.unlock(); - - f(); - } -} - -void TaskMapOverInt(size_t begin, size_t end, - std::function const &func) { - std::atomic idx{begin}; - size_t num_workers = - std::min(end, std::thread::hardware_concurrency()); - - auto wrapper = [&idx, end, &func]() { - while (true) { - size_t i = idx.fetch_add(1); - if (i >= end) - break; - func(i); - } - }; - - std::vector> futures; - futures.reserve(num_workers); - for (size_t i = 0; i < num_workers; i++) - futures.push_back(TaskPool::AddTask(wrapper)); - for (size_t i = 0; i < num_workers; i++) - futures[i].wait(); -} Index: unittests/Utility/TaskPoolTest.cpp =================================================================== --- unittests/Utility/TaskPoolTest.cpp +++ unittests/Utility/TaskPoolTest.cpp @@ -2,20 +2,6 @@ #include "lldb/Utility/TaskPool.h" -TEST(TaskPoolTest, AddTask) { - auto fn = [](int x) { return x * x + 1; }; - - auto f1 = TaskPool::AddTask(fn, 1); - auto f2 = TaskPool::AddTask(fn, 2); - auto f3 = TaskPool::AddTask(fn, 3); - auto f4 = TaskPool::AddTask(fn, 4); - - ASSERT_EQ(10, f3.get()); - ASSERT_EQ(2, f1.get()); - ASSERT_EQ(17, f4.get()); - ASSERT_EQ(5, f2.get()); -} - TEST(TaskPoolTest, RunTasks) { std::vector r(4); @@ -29,15 +15,3 @@ ASSERT_EQ(10, r[2]); ASSERT_EQ(17, r[3]); } - -TEST(TaskPoolTest, TaskMap) { - int data[4]; - auto fn = [&data](int x) { data[x] = x * x; }; - - TaskMapOverInt(0, 4, fn); - - ASSERT_EQ(data[0], 0); - ASSERT_EQ(data[1], 1); - ASSERT_EQ(data[2], 4); - ASSERT_EQ(data[3], 9); -}