Index: include/lld/Core/Parallel.h =================================================================== --- /dev/null +++ include/lld/Core/Parallel.h @@ -0,0 +1,238 @@ +//===- lld/Core/Parallel.h - Parallel utilities ---------------------------===// +// +// The LLVM Linker +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#ifndef LLD_CORE_PARALLEL_H +#define LLD_CORE_PARALLEL_H + +#include "lld/Core/Instrumentation.h" +#include "lld/Core/LLVM.h" +#include "lld/Core/range.h" + +#ifdef _MSC_VER +// Exceptions are disabled so this isn't defined, but concrt assumes it is. +namespace { +void *__uncaught_exception() { return nullptr; } +} +#endif + +#include +#include +#include +#include +#include +#include + +#ifdef _MSC_VER +#include +#include +#endif + +namespace lld { +class Latch { + std::atomic _count; + mutable std::mutex _condMut; + mutable std::condition_variable _cond; + +public: + Latch(uint32_t count = 0) : _count(count) {} + ~Latch() { sync(); } + void inc() { ++_count; } + + void dec() { + if (--_count == 0) + _cond.notify_all(); + } + + void sync() const { + std::unique_lock lock(_condMut); + _cond.wait(lock, [&] { + return _count == 0; + }); + } +}; + +class Executor { +public: + virtual ~Executor() {} + virtual void add(std::function func) = 0; +}; + +class ThreadPoolExecutor : public Executor { +public: + ThreadPoolExecutor(std::size_t threadCount = + std::thread::hardware_concurrency()) + : _stop(false), _done(threadCount) { + // Spawn all but one of the threads in another thread as spawning threads can + // take a while. + std::thread([&, threadCount] { + for (std::size_t i = 1; i < threadCount; ++i) { + std::thread([=] { + work(); + }).detach(); + } + work(); + }).detach(); + } + + ~ThreadPoolExecutor() { + _stop = true; + _cond.notify_all(); + // Wait for ~Latch. + } + + virtual void add(std::function f) { + std::unique_lock lock(_mutex); + _workQueue.push(f); + lock.unlock(); + _cond.notify_one(); + } + +private: + void work() { + while (true) { + std::unique_lock lock(_mutex); + _cond.wait(lock, [&] { + return _stop || !_workQueue.empty(); + }); + if (_stop) + break; + auto task = _workQueue.front(); + _workQueue.pop(); + lock.unlock(); + task(); + } + _done.dec(); + } + + std::atomic _stop; + std::queue> _workQueue; + std::mutex _mutex; + std::condition_variable _cond; + Latch _done; +}; + +#ifdef _MSC_VER +/// \brief An Executor that runs tasks via ConcRT. +class ConcRTExecutor : public Executor { + struct Taskish { + Taskish(std::function task) : _task(task) {} + + std::function _task; + + static void run(void *p) { + Taskish *self = static_cast(p); + self->_task(); + delete self; + } + }; + +public: + virtual void add(std::function func) { + Concurrency::CurrentScheduler::ScheduleTask(Taskish::run, + new Taskish(func)); + } +}; + +inline Executor *getDefaultExecutor() { + static ConcRTExecutor exec; + return &exec; +} +#else +inline Executor *getDefaultExecutor() { + static ThreadPoolExecutor exec; + return &exec; +} +#endif + +class TaskGroup { + Latch _latch; + +public: + void spawn(std::function f) { + _latch.inc(); + getDefaultExecutor()->add([&, f] { + f(); + _latch.dec(); + }); + } + + void sync() const { _latch.sync(); } +}; + +#ifdef _MSC_VER +// Use ppl parallel_sort on Windows. +template +void parallel_sort( + RandomAccessIterator start, RandomAccessIterator end, + const Comp &comp = std::less< + typename std::iterator_traits::value_type>()) { + concurrency::parallel_sort(start, end, comp); +} +#else +namespace detail { +const ptrdiff_t minParallelSize = 1024; +const size_t maxDepth = 1024; + +/// \brief Inclusive median. +template +RandomAccessIterator medianOf3(RandomAccessIterator start, + RandomAccessIterator end, const Comp &comp) { + RandomAccessIterator mid = start + (std::distance(start, end) / 2); + return comp(*start, *(end - 1)) + ? (comp(*mid, *(end - 1)) ? (comp(*start, *mid) ? mid : start) + : end - 1) + : (comp(*mid, *start) ? (comp(*(end - 1), *mid) ? mid : end - 1) + : start); +} + +template +void parallel_quick_sort(RandomAccessIterator start, RandomAccessIterator end, + const Comp &comp, TaskGroup &tg, size_t depth = 0) { + // Do a sequential sort for small inputs. + if (std::distance(start, end) < detail::minParallelSize || depth > maxDepth) { + std::sort(start, end, comp); + return; + } + + // Partition. + auto pivot = medianOf3(start, end, comp); + // Move pivot to end. + std::swap(*(end - 1), *pivot); + pivot = std::partition(start, end - 1, [end](decltype(*start) v) { + return v < *(end - 1); + }); + // Move pivot to middle of partition. + std::swap(*pivot, *(end - 1)); + + // Recurse. + tg.spawn([=, &tg] { + parallel_quick_sort(start, pivot, comp, tg, depth + 1); + }); + tg.spawn([=, &tg] { + parallel_quick_sort(pivot + 1, end, comp, tg, depth + 1); + }); +} +} + +template +void parallel_sort( + RandomAccessIterator start, RandomAccessIterator end, + const Comp &comp = std::less< + typename std::iterator_traits::value_type>()) { + TaskGroup tg; + detail::parallel_quick_sort(start, end, comp, tg); +} +#endif + +template void parallel_sort(T *start, T *end) { + parallel_sort(start, end, std::less()); +} +} // end namespace lld + +#endif Index: unittests/CoreTests/CMakeLists.txt =================================================================== --- unittests/CoreTests/CMakeLists.txt +++ unittests/CoreTests/CMakeLists.txt @@ -1,3 +1,4 @@ add_lld_unittest(CoreTests + ParallelTest.cpp RangeTest.cpp ) Index: unittests/CoreTests/ParallelTest.cpp =================================================================== --- /dev/null +++ unittests/CoreTests/ParallelTest.cpp @@ -0,0 +1,33 @@ +//===- lld/unittest/ParallelTest.cpp --------------------------------------===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +/// +/// \file +/// \brief Parallel.h unit tests. +/// +//===----------------------------------------------------------------------===// + +#include "gtest/gtest.h" + +#include "lld/Core/Parallel.h" + +#include +#include + +uint32_t array[1024 * 1024]; + +TEST(Parallel, sort) { + std::mt19937 randEngine; + std::uniform_int_distribution dist; + + for (auto &i : array) + i = dist(randEngine); + + lld::parallel_sort(std::begin(array), std::end(array)); + ASSERT_TRUE(std::is_sorted(std::begin(array), std::end(array))); +}