Index: include/experimental/execution
===================================================================
--- /dev/null
+++ include/experimental/execution
@@ -0,0 +1,95 @@
+// -*- C++ -*-
+//===-------------------------- execution ---------------------------------===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is dual licensed under the MIT and the University of Illinois Open
+// Source Licenses. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef _LIBCPP_EXPERIMENTAL_EXECUTION
+#define _LIBCPP_EXPERIMENTAL_EXECUTION
+
+/*
+    execution synopsis
+
+namespace std {
+
+template <class T>
+struct is_execution_policy;
+
+template <class T>
+constexpr bool is_execution_policy_v = is_execution_policy<T>::value;
+
+} // namespace std
+
+namespace std::execution {
+
+class sequenced_policy;
+class parallel_policy;
+class parallel_unsequenced_policy;
+
+constexpr sequenced_policy seq{unspecified};
+
+constexpr parallel_policy par{unspecified};
+
+constexpr parallel_unsequenced_policy par_unseq{unspecified};
+
+} // namespace std::execution
+
+*/
+
+#include <experimental/__config>
+#include <functional>
+#include <type_traits>
+
+_LIBCPP_BEGIN_NAMESPACE_EXPERIMENTAL
+
+namespace execution {
+class sequenced_policy {};
+class parallel_policy {};
+class parallel_unsequenced_policy {};
+
+constexpr sequenced_policy seq{};
+constexpr parallel_policy par{};
+constexpr parallel_unsequenced_policy par_unseq{};
+} // namespace execution
+
+// is_execution_policy
+
+template <class _Tp>
+struct is_execution_policy : false_type {};
+
+template <>
+struct is_execution_policy<execution::sequenced_policy> : true_type {};
+template <>
+struct is_execution_policy<execution::parallel_policy> : true_type {};
+template <>
+struct is_execution_policy<execution::parallel_unsequenced_policy> : true_type {};
+
+#if _LIBCPP_STD_VER >= 14
+template <class _Tp>
+constexpr bool is_execution_policy_v = is_execution_policy<_Tp>::value;
+#endif
+
+// API parallel algorithms will use.
+
+class __task;
+typedef function<void(__task)> __callable_task;
+
+class _LIBCPP_TYPE_VIS __task {
+  void* __impl_;
+
+public:
+  __task(void* __impl) : __impl_(__impl) {}
+
+  void __fork(__callable_task&& __work);
+  void __join();
+};
+
+void _LIBCPP_FUNC_VIS __evaluate_parallel_task(const __callable_task& __t);
+
+_LIBCPP_END_NAMESPACE_EXPERIMENTAL
+
+#endif
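For reviewers, here is a minimal sketch (not part of this patch) of how a parallel algorithm could be layered on the `__task` API above. The `sketch` namespace, the divide-and-conquer splitting, and the 1024-element grain size are all assumptions made up for illustration, not what a real libc++ algorithm would necessarily do:

    #include <experimental/execution>
    #include <iterator>

    namespace sketch {
    using namespace std::experimental;

    // Split [first, last) in half, fork the left half as a child task, and
    // recurse into the right half on the current worker.
    template <class It, class Fn>
    void for_each_impl(It first, It last, Fn f, __task t) {
      typename std::iterator_traits<It>::difference_type n =
          std::distance(first, last);
      if (n <= 1024) { // assumed grain size; below it, just run serially
        for (; first != last; ++first)
          f(*first);
        return;
      }
      It mid = std::next(first, n / 2);
      t.__fork([=](__task child) { for_each_impl(first, mid, f, child); });
      for_each_impl(mid, last, f, t);
      t.__join(); // wait until the forked half has run
    }

    template <class It, class Fn>
    void for_each(execution::parallel_policy, It first, It last, Fn f) {
      __evaluate_parallel_task(
          [&](__task t) { for_each_impl(first, last, f, t); });
    }
    } // namespace sketch

Capturing by reference in the top-level lambda is safe here because __evaluate_parallel_task blocks until the root task completes.
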
Index: src/experimental/execution.cpp
===================================================================
--- /dev/null
+++ src/experimental/execution.cpp
@@ -0,0 +1,243 @@
+// -*- C++ -*-
+//===------------------------ execution.cpp -------------------------------===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is dual licensed under the MIT and the University of Illinois Open
+// Source Licenses. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "experimental/__config"
+#ifndef _LIBCPP_HAS_NO_THREADS
+
+#include "experimental/execution"
+#include "atomic"
+#include "condition_variable"
+#include "deque"
+#include "exception"
+#include "iterator"
+#include "mutex"
+#include "thread"
+#include "vector"
+
+_LIBCPP_BEGIN_NAMESPACE_EXPERIMENTAL
+
+namespace {
+class task_group;
+class worker;
+class bound_task;
+
+// FIXME: replace this with a lock-free version.
+template <class T>
+class stealing_deque {
+  deque<T> queue_;
+  mutex lock_;
+
+public:
+  stealing_deque() = default;
+  stealing_deque(const stealing_deque& other) : queue_(other.queue_) {}
+
+  void emplace_back(T&& input) {
+    lock_guard<mutex> ul(lock_);
+    queue_.emplace_back(move(input));
+  }
+
+  // The owning worker takes the most recently pushed task from the back.
+  bool pop_back(T& output) {
+    lock_guard<mutex> ul(lock_);
+    if (queue_.empty())
+      return false;
+    output = queue_.back();
+    queue_.pop_back();
+    return true;
+  }
+
+  // Thieves take the oldest task from the front.
+  bool steal(T& output) {
+    lock_guard<mutex> ul(lock_);
+    if (queue_.empty())
+      return false;
+    output = queue_.front();
+    queue_.pop_front();
+    return true;
+  }
+};
+
+// Task that has been fork()'d, but is free to be stolen by another worker.
+struct unbound_task {
+  bound_task* parent_;
+  __callable_task task_;
+
+  unbound_task() = default;
+  unbound_task(const __callable_task& task) : parent_(nullptr), task_(task) {}
+  unbound_task(bound_task* parent, __callable_task&& task)
+      : parent_(parent), task_(move(task)) {}
+};
+
+class worker {
+  stealing_deque<unbound_task> queue_;
+  task_group& group_;
+  atomic<bool> dead_;
+
+  friend class task_group;
+
+  unsigned get_worker_id();
+
+public:
+  worker(task_group& group) : group_(group), dead_(true) {}
+  worker(const worker& other)
+      : queue_(other.queue_), group_(other.group_), dead_(true) {}
+
+  void defer(unbound_task&& task) { queue_.emplace_back(move(task)); }
+
+  template <class Fp>
+  void wait(const Fp& until);
+  void worker_main();
+};
+
+class task_group {
+  vector<worker> workers_;
+
+  mutex global_tasks_mutex_;
+  vector<__callable_task> global_tasks_;
+
+  // This task group could potentially be running many parallel algorithms.
+  // This keeps track of how many root tasks are currently being run. When it
+  // reaches 0, workers stop trying to steal work and return.
+  atomic<unsigned> parallel_users_;
+
+  friend class worker;
+
+  void activate_workers() {
+    for (worker& w : workers_) {
+      bool expected = true;
+      if (w.dead_.compare_exchange_strong(expected, false))
+        thread(&worker::worker_main, &w).detach();
+    }
+  }
+
+public:
+  task_group(unsigned threads = thread::hardware_concurrency())
+      : workers_(threads == 0 ? 1 : threads, worker(*this)),
+        parallel_users_(0) {}
+
+  // Start up a root task, and wait until it finishes.
+  void evaluate_top_level_task(const __callable_task& task) {
+    // If we're reviving (or starting) this task group, start the workers.
+    if (++parallel_users_ == 1)
+      activate_workers();
+
+    condition_variable cv;
+    mutex m;
+    bool done = false;
+    unique_lock<mutex> guard(m);
+
+    {
+      lock_guard<mutex> lg(global_tasks_mutex_);
+      global_tasks_.emplace_back([&](__task t) {
+        task(t);
+        // Set `done` and notify while holding the lock, so the caller is
+        // guaranteed to be waiting, spurious wakeups are handled, and the
+        // condition_variable cannot be destroyed before the notify.
+        lock_guard<mutex> lk(m);
+        done = true;
+        cv.notify_one();
+      });
+    }
+
+    cv.wait(guard, [&] { return done; });
+    --parallel_users_;
+  }
+
+  bool steal(unbound_task& task, unsigned worker_id) {
+    unsigned size = static_cast<unsigned>(workers_.size());
+    for (unsigned worker = (worker_id + 1) % size, count = 0; count < size;
+         ++count, worker = (worker + 1) % size)
+      if (workers_[worker].queue_.steal(task))
+        return true;
+    return false;
+  }
+
+  bool steal_global(unbound_task& task) {
+    lock_guard<mutex> lg(global_tasks_mutex_);
+    if (global_tasks_.empty())
+      return false;
+    task = unbound_task(global_tasks_.back());
+    global_tasks_.pop_back();
+    return true;
+  }
+
+  bool active() const { return parallel_users_.load(); }
+};
+
+// Task that is bound to a specific worker.
+class bound_task {
+  unbound_task bound_;
+  worker* worker_;
+  atomic<unsigned> ref_count_;
+
+  friend class worker;
+  void eval() {
+    bound_.task_(this);
+    if (bound_.parent_) // The root task has no parent.
+      --bound_.parent_->ref_count_;
+  }
+
+public:
+  bound_task(unbound_task&& binding, worker* worker)
+      : bound_(move(binding)), worker_(worker), ref_count_(0) {}
+
+  void fork(__callable_task&& child) {
+    ++ref_count_;
+    worker_->defer(unbound_task(this, move(child)));
+  }
+
+  void join() {
+    worker_->wait([this] { return ref_count_.load() == 0; });
+  }
+};
+} // namespace
+
+template <class Fp>
+inline void worker::wait(const Fp& until) {
+  while (!until()) {
+    unbound_task task;
+    if (!queue_.pop_back(task)) {
+      // queue_ can only be added to by this thread, so there is no point in
+      // checking it now that we know it's empty. Spin looking for tasks
+      // elsewhere.
+      while (true) {
+        if (group_.steal_global(task) || group_.steal(task, get_worker_id()))
+          break;
+        if (until())
+          return;
+        this_thread::yield();
+      }
+    }
+
+    // We've acquired a task by some means; eval it.
+    try {
+      bound_task bound(move(task), this);
+      bound.eval();
+      bound.join();
+    } catch (...) {
+      // All execution policies require terminate() to be called here.
+      terminate();
+    }
+  }
+}
+
+inline unsigned worker::get_worker_id() {
+  return static_cast<unsigned>(distance(&*group_.workers_.begin(), this));
+}
+
+inline void worker::worker_main() {
+  // Continue execution until all tasks have been completed.
+  wait([this] { return !group_.active(); });
+  dead_.store(true);
+}
+
+void __task::__fork(__callable_task&& work) {
+  static_cast<bound_task*>(__impl_)->fork(move(work));
+}
+
+void __task::__join() { static_cast<bound_task*>(__impl_)->join(); }
+
+void __evaluate_parallel_task(const __callable_task& work) {
+  static task_group parallel_task_group;
+  parallel_task_group.evaluate_top_level_task(work);
+}
+
+_LIBCPP_END_NAMESPACE_EXPERIMENTAL
+
+#endif // _LIBCPP_HAS_NO_THREADS
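Two design points worth calling out for review. First, bound_task::join() never blocks the OS thread: it re-enters worker::wait, which keeps popping and stealing tasks until the reference count drops to zero. That is what lets a fixed-size pool of detached workers survive arbitrarily nested fork/join without deadlock. Second, the owner pops its own deque from the back while thieves steal from the front, so (the usual work-stealing rationale) the owner reuses its hottest data and a thief tends to grab the oldest, coarsest-grained task. Since stealing_deque lives in an anonymous namespace in this TU, here is a standalone copy, for demonstration only, that shows that discipline:

    #include <cassert>
    #include <deque>
    #include <mutex>
    #include <utility>

    template <class T>
    class demo_stealing_deque { // illustration-only copy of stealing_deque
      std::deque<T> queue_;
      std::mutex lock_;

    public:
      void emplace_back(T&& v) {
        std::lock_guard<std::mutex> g(lock_);
        queue_.emplace_back(std::move(v));
      }

      bool pop_back(T& out) { // owner side: newest first (LIFO)
        std::lock_guard<std::mutex> g(lock_);
        if (queue_.empty())
          return false;
        out = queue_.back();
        queue_.pop_back();
        return true;
      }

      bool steal(T& out) { // thief side: oldest first (FIFO)
        std::lock_guard<std::mutex> g(lock_);
        if (queue_.empty())
          return false;
        out = queue_.front();
        queue_.pop_front();
        return true;
      }
    };

    int main() {
      demo_stealing_deque<int> dq;
      dq.emplace_back(1); // oldest, typically the coarsest unit of work
      dq.emplace_back(2);
      dq.emplace_back(3); // newest, typically the finest unit of work

      int v = 0;
      dq.pop_back(v);
      assert(v == 3); // the owner keeps working on what it forked last
      dq.steal(v);
      assert(v == 1); // a thief takes the oldest task
    }
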
Index: test/libcxx/experimental/execution/fork_join.pass.cpp
===================================================================
--- /dev/null
+++ test/libcxx/experimental/execution/fork_join.pass.cpp
@@ -0,0 +1,58 @@
+//===----------------------------------------------------------------------===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is dual licensed under the MIT and the University of Illinois Open
+// Source Licenses. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+// UNSUPPORTED: libcpp-has-no-threads
+// UNSUPPORTED: c++98, c++03
+
+#include <experimental/execution>
+#include <atomic>
+#include <cassert>
+#include <future>
+
+using namespace std;
+using namespace experimental;
+
+int parallel_fibb(int n, __task t) {
+  if (n < 2)
+    return 1;
+
+  int lhs;
+  t.__fork([&](__task t) { lhs = parallel_fibb(n - 2, t); });
+  int rhs = parallel_fibb(n - 1, t);
+  t.__join();
+  return rhs + lhs;
+}
+
+void fork_many(int n, __task t) {
+  atomic<int> count(0);
+  for (int i = 0; i < n; ++i)
+    t.__fork([&](__task) { ++count; });
+
+  t.__join();
+  assert(count.load() == n);
+}
+
+int main() {
+  { // fib(10) == 89 with the base case fib(0) == fib(1) == 1.
+    int res;
+    __evaluate_parallel_task([&](__task t) { res = parallel_fibb(10, t); });
+    assert(res == 89);
+  }
+
+  { // Fork many independent children from a single task.
+    __evaluate_parallel_task([&](__task t) { fork_many(100, t); });
+  }
+
+  { // Two top-level tasks evaluated concurrently from different threads.
+    auto f = async(launch::async, [] {
+      __evaluate_parallel_task([&](__task t) { fork_many(100, t); });
+    });
+    __evaluate_parallel_task([&](__task t) { fork_many(100, t); });
+    f.get();
+  }
+}
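Finally, a sketch of how user-facing algorithm overloads might be constrained with the is_execution_policy trait this patch adds. The `sketch::reduce` name is hypothetical and the serial std::accumulate body is a placeholder where a real implementation would dispatch to parallel code:

    #include <experimental/execution>
    #include <numeric>
    #include <type_traits>
    #include <vector>

    namespace sketch {

    // Accept only the experimental policy tags; anything else is rejected at
    // compile time by the enable_if on is_execution_policy.
    template <class Policy, class It, class T,
              class = typename std::enable_if<
                  std::experimental::is_execution_policy<
                      typename std::decay<Policy>::type>::value>::type>
    T reduce(Policy&&, It first, It last, T init) {
      return std::accumulate(first, last, init); // sequential placeholder
    }

    } // namespace sketch

    int main() {
      std::vector<int> v(100, 1);
      int sum = sketch::reduce(std::experimental::execution::par,
                               v.begin(), v.end(), 0);
      (void)sum; // sum == 100
      // sketch::reduce(42, v.begin(), v.end(), 0); // would not compile:
      //                                            // int is not a policy
    }
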