Index: lldb/trunk/include/lldb/Utility/TaskPool.h
===================================================================
--- lldb/trunk/include/lldb/Utility/TaskPool.h
+++ lldb/trunk/include/lldb/Utility/TaskPool.h
@@ -0,0 +1,215 @@
+//===--------------------- TaskPool.h ---------------------------*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef utility_TaskPool_h_
+#define utility_TaskPool_h_
+
+#include <condition_variable>
+#include <cstdint>
+#include <functional>
+#include <future>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+// Global TaskPool class for running tasks in parallel on a set of worker threads created the first
+// time the task pool is used. The TaskPool provides no guarantee about the order the tasks will be
+// run in or about which tasks will run in parallel. None of the tasks added to the task pool should
+// block on something (mutex, future, condition variable) that will be set only by the completion of
+// another task on the task pool, as they may run on the same thread sequentially.
+class TaskPool
+{
+public:
+    // Add a new task to the task pool and return a std::future belonging to the newly created task.
+    // The caller of this function has to wait on the future for this task to complete.
+    template<typename F, typename... Args>
+    static std::future<typename std::result_of<F(Args...)>::type>
+    AddTask(F&& f, Args&&... args);
+
+    // Run all of the specified tasks on the task pool and wait until all of them are finished
+    // before returning. This method is intended to be used for a small number of tasks where listing
+    // them as function arguments is acceptable. For running a large number of tasks you should use
+    // AddTask for each task and then call wait() on each returned future.
+    template<typename... T>
+    static void
+    RunTasks(T&&... tasks);
+
+private:
+    TaskPool() = delete;
+
+    template<typename... T>
+    struct RunTaskImpl;
+
+    static void
+    AddTaskImpl(std::function<void()>&& task_fn);
+};
+
+// Wrapper class around the global TaskPool implementation to make it possible to create a set of
+// tasks and then wait for the tasks to be completed by the WaitForNextCompletedTask call. This
+// class should be used when WaitForNextCompletedTask is needed because this class adds no other
+// extra functionality to the TaskPool class and it has a very minor performance overhead.
+template <typename T> // The return type of the tasks that will be added to this task runner
+class TaskRunner
+{
+public:
+    // Add a task to the task runner, which will also add the task to the global TaskPool. The
+    // function doesn't return the std::future for the task because it will be supplied by
+    // WaitForNextCompletedTask after the task is completed.
+    template<typename F, typename... Args>
+    void
+    AddTask(F&& f, Args&&... args);
+
+    // Wait for the next task in this task runner to finish and then return the std::future that
+    // belongs to the finished task. If there is no task in this task runner (neither pending nor
+    // completed) then this function will return an invalid future. Usually this function should be
+    // called in a loop processing the results of the tasks until it returns an invalid std::future,
+    // which means that all tasks in this task runner are completed.
+    std::future<T>
+    WaitForNextCompletedTask();
+
+    // Convenience method to wait for all tasks in this TaskRunner to finish. Do NOT use this class
+    // just because of this method. Use TaskPool instead and wait for each std::future returned by
+    // AddTask in a loop.
+    void
+    WaitForAllTasks();
+
+private:
+    std::list<std::future<T>> m_ready;
+    std::list<std::future<T>> m_pending;
+    std::mutex m_mutex;
+    std::condition_variable m_cv;
+};
+
+template<typename F, typename... Args>
+std::future<typename std::result_of<F(Args...)>::type>
+TaskPool::AddTask(F&& f, Args&&... args)
+{
+    // The packaged_task is held through a shared_ptr so that the lambda handed to AddTaskImpl is
+    // copyable, which std::function requires.
+    auto task_sp = std::make_shared<std::packaged_task<typename std::result_of<F(Args...)>::type()>>(
+        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+
+    AddTaskImpl([task_sp]() { (*task_sp)(); });
+
+    return task_sp->get_future();
+}
+
+template<typename... T>
+void
+TaskPool::RunTasks(T&&... tasks)
+{
+    RunTaskImpl<T...>::Run(std::forward<T>(tasks)...);
+}
+
+template<typename Head, typename... Tail>
+struct TaskPool::RunTaskImpl<Head, Tail...>
+{
+    static void
+    Run(Head&& h, Tail&&... t)
+    {
+        // Queue the first task, recurse to queue the remaining ones, then wait for the first.
+        auto f = AddTask(std::forward<Head>(h));
+        RunTaskImpl<Tail...>::Run(std::forward<Tail>(t)...);
+        f.wait();
+    }
+};
+
+template<>
+struct TaskPool::RunTaskImpl<>
+{
+    static void
+    Run() {}
+};
+
+template <typename T>
+template<typename F, typename... Args>
+void
+TaskRunner<T>::AddTask(F&& f, Args&&... args)
+{
+    // m_mutex is held until the future has been stored in *it. The completion code in the task
+    // takes the same mutex before it moves *it to m_ready, so it cannot run ahead of the assignment.
+    std::unique_lock<std::mutex> lock(m_mutex);
+    auto it = m_pending.emplace(m_pending.end());
+    *it = TaskPool::AddTask(
+        [this, it](F f, Args... args)
+        {
+            // Hold the result by value so it is moved, not copied, into the return value.
+            T r = f(std::forward<Args>(args)...);
+
+            std::unique_lock<std::mutex> lock(this->m_mutex);
+            this->m_ready.emplace_back(std::move(*it));
+            this->m_pending.erase(it);
+            lock.unlock();
+
+            this->m_cv.notify_one();
+            return r;
+        },
+        std::forward<F>(f),
+        std::forward<Args>(args)...);
+}
+
+template <>
+template<typename F, typename... Args>
+void
+TaskRunner<void>::AddTask(F&& f, Args&&... args)
+{
+    // Same as the generic version, except that there is no result to carry out of the task.
+    std::unique_lock<std::mutex> lock(m_mutex);
+    auto it = m_pending.emplace(m_pending.end());
+    *it = TaskPool::AddTask(
+        [this, it](F f, Args... args)
+        {
+            f(std::forward<Args>(args)...);
+
+            std::unique_lock<std::mutex> lock(this->m_mutex);
+            this->m_ready.emplace_back(std::move(*it));
+            this->m_pending.erase(it);
+            lock.unlock();
+
+            this->m_cv.notify_one();
+        },
+        std::forward<F>(f),
+        std::forward<Args>(args)...);
+}
+
+template <typename T>
+std::future<T>
+TaskRunner<T>::WaitForNextCompletedTask()
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    if (m_ready.empty() && m_pending.empty())
+        return std::future<T>(); // No more tasks
+
+    if (m_ready.empty())
+        m_cv.wait(lock, [this](){ return !this->m_ready.empty(); });
+
+    std::future<T> res = std::move(m_ready.front());
+    m_ready.pop_front();
+
+    // The task moves its future to m_ready before it returns, so the result may not be stored
+    // yet. Wait outside the lock until the future is actually ready.
+    lock.unlock();
+    res.wait();
+
+    return res;
+}
+
+template <typename T>
+void
+TaskRunner<T>::WaitForAllTasks()
+{
+    while (WaitForNextCompletedTask().valid());
+}
+
+#endif // #ifndef utility_TaskPool_h_
Index: lldb/trunk/lldb.xcodeproj/project.pbxproj =================================================================== --- lldb/trunk/lldb.xcodeproj/project.pbxproj +++ lldb/trunk/lldb.xcodeproj/project.pbxproj @@ -719,6 +719,7 @@ 6D95DC021B9DC057000E318A /* SymbolFileDWARFDwo.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 6D95DBFF1B9DC057000E318A /* SymbolFileDWARFDwo.cpp */; }; 6D99A3631BBC2F3200979793 /* ArmUnwindInfo.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 6D99A3621BBC2F3200979793 /* ArmUnwindInfo.cpp */; }; 6D9AB3DD1BB2B74E003F2289 /* TypeMap.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 6D9AB3DC1BB2B74E003F2289 /* TypeMap.cpp */; }; + 6DEC6F391BD66D750091ABA6 /* TaskPool.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 6DEC6F381BD66D750091ABA6 /* TaskPool.cpp */; }; 8C2D6A53197A1EAF006989C9 /* MemoryHistory.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 8C2D6A52197A1EAF006989C9 /* MemoryHistory.cpp */; }; 8C2D6A5E197A250F006989C9 /* MemoryHistoryASan.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 8C2D6A5A197A1FDC006989C9 /* MemoryHistoryASan.cpp */; }; 8CCB017E19BA28A80009FD44 /* ThreadCollection.cpp in Sources */ = {isa = PBXBuildFile; 
fileRef = 8CCB017A19BA283D0009FD44 /* ThreadCollection.cpp */; }; @@ -2421,6 +2422,8 @@ 6D99A3621BBC2F3200979793 /* ArmUnwindInfo.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = ArmUnwindInfo.cpp; path = source/Symbol/ArmUnwindInfo.cpp; sourceTree = ""; }; 6D9AB3DC1BB2B74E003F2289 /* TypeMap.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = TypeMap.cpp; path = source/Symbol/TypeMap.cpp; sourceTree = ""; }; 6D9AB3DE1BB2B76B003F2289 /* TypeMap.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = TypeMap.h; path = include/lldb/Symbol/TypeMap.h; sourceTree = ""; }; + 6DEC6F381BD66D750091ABA6 /* TaskPool.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = TaskPool.cpp; path = source/Utility/TaskPool.cpp; sourceTree = ""; }; + 6DEC6F3A1BD66D950091ABA6 /* TaskPool.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = TaskPool.h; path = include/lldb/Utility/TaskPool.h; sourceTree = ""; }; 8C2D6A52197A1EAF006989C9 /* MemoryHistory.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = MemoryHistory.cpp; path = source/Target/MemoryHistory.cpp; sourceTree = ""; }; 8C2D6A54197A1EBE006989C9 /* MemoryHistory.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = MemoryHistory.h; path = include/lldb/Target/MemoryHistory.h; sourceTree = ""; }; 8C2D6A5A197A1FDC006989C9 /* MemoryHistoryASan.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = MemoryHistoryASan.cpp; sourceTree = ""; }; @@ -3625,6 +3628,8 @@ 2682F168115ED9C800CCFF99 /* Utility */ = { isa = PBXGroup; children = ( + 6DEC6F3A1BD66D950091ABA6 /* TaskPool.h */, + 6DEC6F381BD66D750091ABA6 /* TaskPool.cpp */, 257E47151AA56C2000A62F81 /* ModuleCache.cpp */, 257E47161AA56C2000A62F81 /* ModuleCache.h */, 
33064C9B1A5C7A490033D415 /* UriParser.h */, @@ -6687,6 +6692,7 @@ 265205A813D3E3F700132FE2 /* RegisterContextKDP_arm.cpp in Sources */, 265205AA13D3E3F700132FE2 /* RegisterContextKDP_i386.cpp in Sources */, AF77E0A31A033D360096C0EA /* RegisterContextMacOSXFrameBackchain.cpp in Sources */, + 6DEC6F391BD66D750091ABA6 /* TaskPool.cpp in Sources */, 265205AC13D3E3F700132FE2 /* RegisterContextKDP_x86_64.cpp in Sources */, 2628A4D513D4977900F5487A /* ThreadKDP.cpp in Sources */, 26D7E45D13D5E30A007FD12B /* SocketAddress.cpp in Sources */, Index: lldb/trunk/source/Utility/CMakeLists.txt =================================================================== --- lldb/trunk/source/Utility/CMakeLists.txt +++ lldb/trunk/source/Utility/CMakeLists.txt @@ -14,6 +14,7 @@ StringExtractor.cpp StringExtractorGDBRemote.cpp StringLexer.cpp + TaskPool.cpp TimeSpecTimeout.cpp UriParser.cpp )
Index: lldb/trunk/source/Utility/TaskPool.cpp
===================================================================
--- lldb/trunk/source/Utility/TaskPool.cpp
+++ lldb/trunk/source/Utility/TaskPool.cpp
@@ -0,0 +1,95 @@
+//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "lldb/Utility/TaskPool.h"
+
+namespace
+{
+    // Process-wide task queue behind TaskPool. Worker threads are started on demand by AddTask and
+    // exit as soon as they find the queue empty.
+    class TaskPoolImpl
+    {
+    public:
+        static TaskPoolImpl&
+        GetInstance();
+
+        void
+        AddTask(std::function<void()>&& task_fn);
+
+    private:
+        TaskPoolImpl();
+
+        static void
+        Worker(TaskPoolImpl* pool);
+
+        std::queue<std::function<void()>> m_tasks;
+        std::mutex m_tasks_mutex; // Guards m_tasks and m_thread_count
+        uint32_t m_thread_count;  // Number of worker threads currently running
+    };
+
+} // end of anonymous namespace
+
+TaskPoolImpl&
+TaskPoolImpl::GetInstance()
+{
+    static TaskPoolImpl g_task_pool_impl;
+    return g_task_pool_impl;
+}
+
+void
+TaskPool::AddTaskImpl(std::function<void()>&& task_fn)
+{
+    TaskPoolImpl::GetInstance().AddTask(std::move(task_fn));
+}
+
+TaskPoolImpl::TaskPoolImpl() :
+    m_thread_count(0)
+{
+}
+
+void
+TaskPoolImpl::AddTask(std::function<void()>&& task_fn)
+{
+    // std::thread::hardware_concurrency() may return 0 when the value is not computable. Always
+    // allow at least one worker, otherwise no thread would ever be started to run the tasks.
+    static const uint32_t hardware_threads = std::thread::hardware_concurrency();
+    static const uint32_t max_threads = hardware_threads ? hardware_threads : 1;
+
+    std::unique_lock<std::mutex> lock(m_tasks_mutex);
+    m_tasks.emplace(std::move(task_fn));
+    if (m_thread_count < max_threads)
+    {
+        m_thread_count++;
+        lock.unlock();
+
+        std::thread (Worker, this).detach();
+    }
+}
+
+void
+TaskPoolImpl::Worker(TaskPoolImpl* pool)
+{
+    while (true)
+    {
+        std::unique_lock<std::mutex> lock(pool->m_tasks_mutex);
+        if (pool->m_tasks.empty())
+        {
+            // Nothing left to do: give the thread slot back and exit.
+            pool->m_thread_count--;
+            break;
+        }
+
+        // Move the task out of the queue instead of copying it, then run it without the lock held.
+        std::function<void()> f = std::move(pool->m_tasks.front());
+        pool->m_tasks.pop();
+        lock.unlock();
+
+        f();
+    }
+}
Index: lldb/trunk/unittests/Utility/CMakeLists.txt
===================================================================
--- lldb/trunk/unittests/Utility/CMakeLists.txt
+++ lldb/trunk/unittests/Utility/CMakeLists.txt
@@ -1,4 +1,5 @@
 add_lldb_unittest(UtilityTests
   StringExtractorTest.cpp
+  TaskPoolTest.cpp
   UriParserTest.cpp
   )
Index: lldb/trunk/unittests/Utility/TaskPoolTest.cpp
===================================================================
--- lldb/trunk/unittests/Utility/TaskPoolTest.cpp
+++ lldb/trunk/unittests/Utility/TaskPoolTest.cpp
@@ -0,0 +1,65 @@
+#include "gtest/gtest.h"
+
+#include "lldb/Utility/TaskPool.h"
+
+// Futures returned by AddTask can be read in any order.
+TEST (TaskPoolTest, AddTask)
+{
+    auto fn = [](int x) { return x * x + 1; };
+
+    auto f1 = TaskPool::AddTask(fn, 1);
+    auto f2 = TaskPool::AddTask(fn, 2);
+    auto f3 = TaskPool::AddTask(fn, 3);
+    auto f4 = TaskPool::AddTask(fn, 4);
+
+    ASSERT_EQ (10, f3.get());
+    ASSERT_EQ ( 2, f1.get());
+    ASSERT_EQ (17, f4.get());
+    ASSERT_EQ ( 5, f2.get());
+}
+
+// RunTasks returns only after every task has stored its result.
+TEST (TaskPoolTest, RunTasks)
+{
+    std::vector<int> r(4);
+
+    auto fn = [](int x, int& y) { y = x * x + 1; };
+
+    TaskPool::RunTasks(
+        [fn, &r]() { fn(1, r[0]); },
+        [fn, &r]() { fn(2, r[1]); },
+        [fn, &r]() { fn(3, r[2]); },
+        [fn, &r]() { fn(4, r[3]); }
+    );
+
+    ASSERT_EQ ( 2, r[0]);
+    ASSERT_EQ ( 5, r[1]);
+    ASSERT_EQ (10, r[2]);
+    ASSERT_EQ (17, r[3]);
+}
+
+// WaitForNextCompletedTask hands back one valid future per added task, then an invalid one.
+TEST (TaskPoolTest, TaskRunner)
+{
+    auto fn = [](int x) { return std::make_pair(x, x * x); };
+
+    TaskRunner<std::pair<int, int>> tr;
+    tr.AddTask(fn, 1);
+    tr.AddTask(fn, 2);
+    tr.AddTask(fn, 3);
+    tr.AddTask(fn, 4);
+
+    int count = 0;
+    while (true)
+    {
+        auto f = tr.WaitForNextCompletedTask();
+        if (!f.valid())
+            break;
+
+        ++count;
+        std::pair<int, int> v = f.get();
+        ASSERT_EQ (v.first * v.first, v.second);
+    }
+
+    ASSERT_EQ(4, count);
+}