diff --git a/libcxx/benchmarks/CMakeLists.txt b/libcxx/benchmarks/CMakeLists.txt
--- a/libcxx/benchmarks/CMakeLists.txt
+++ b/libcxx/benchmarks/CMakeLists.txt
@@ -189,6 +189,8 @@
     map.bench.cpp
     monotonic_buffer.bench.cpp
     ordered_set.bench.cpp
+    stop_token.async_shutdown.bench.cpp
+    stop_token.jthread.bench.cpp
     std_format_spec_string_unicode.bench.cpp
     string.bench.cpp
     stringstream.bench.cpp
diff --git a/libcxx/benchmarks/stop_token.async_shutdown.bench.cpp b/libcxx/benchmarks/stop_token.async_shutdown.bench.cpp
new file mode 100644
--- /dev/null
+++ b/libcxx/benchmarks/stop_token.async_shutdown.bench.cpp
@@ -0,0 +1,78 @@
+//===----------------------------------------------------------------------===//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <numeric>
+#include <optional>
+#include <stop_token>
+#include <thread>
+#include <vector>
+
+using namespace std::chrono_literals;
+
+constexpr size_t thread_count             = 20;
+constexpr size_t concurrent_request_count = 1000;
+std::atomic<bool> ready{false};
+
+struct dummy_stop_callback {
+  void operator()() const noexcept {}
+};
+
+void thread_func(std::uint64_t* count, std::stop_token st) {
+  std::vector<std::optional<std::stop_callback<dummy_stop_callback>>> cbs(concurrent_request_count);
+
+  ready.wait(false);
+
+  std::uint32_t index       = 0;
+  std::uint64_t local_count = 0;
+  while (!st.stop_requested()) {
+    cbs[index].emplace(st, dummy_stop_callback{});
+    index = (index + 1) % concurrent_request_count;
+    ++local_count;
+  }
+  *count = local_count;
+}
+
+template <class F>
+struct on_scope_exit {
+  on_scope_exit(F f) : f_(std::move(f)) {}
+  on_scope_exit(const on_scope_exit&) = delete;
+  on_scope_exit(on_scope_exit&&)      = delete;
+
+  F f_;
+  ~on_scope_exit() { f_(); }
+};
+
+int main() {
+  std::vector<std::uint64_t> counts(thread_count, 0);
+  std::stop_source ss;
+  {
+    std::vector<std::jthread> threads;
+
+    {
+      auto release_on_exit = on_scope_exit([&]() {
+        ready = true;
+        ready.notify_all();
+      });
+      for (size_t i = 0; i < thread_count; ++i) {
+        threads.emplace_back(thread_func, &counts[i], ss.get_token());
+      }
+    }
+
+    std::this_thread::sleep_for(std::chrono::seconds(10));
+
+    ss.request_stop();
+  }
+
+  std::uint64_t total_count = std::reduce(counts.begin(), counts.end());
+
+  std::cout << "Total iterations of " << thread_count << " threads for 10s was " << total_count << "\n";
+}
diff --git a/libcxx/benchmarks/stop_token.jthread.bench.cpp b/libcxx/benchmarks/stop_token.jthread.bench.cpp
new file mode 100644
--- /dev/null
+++ b/libcxx/benchmarks/stop_token.jthread.bench.cpp
@@ -0,0 +1,30 @@
+//===----------------------------------------------------------------------===//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include <chrono>
+#include <cstdint>
+#include <iostream>
+#include <stop_token>
+#include <thread>
+
+using namespace std::chrono_literals;
+
+int main() {
+  std::uint64_t count = 0;
+  {
+    std::jthread t([&](std::stop_token st) {
+      std::uint64_t local_count = 0;
+      while (!st.stop_requested()) {
+        std::stop_callback cb{st, [&]() noexcept {}};
+        ++local_count;
+      }
+      count = local_count;
+    });
+    std::this_thread::sleep_for(30s);
+  }
+  std::cout << "Thread did " << count << " callback registrations/deregistrations in 30s\n";
+}
diff --git a/libcxx/include/__stop_token/stop_state.h b/libcxx/include/__stop_token/stop_state.h
--- a/libcxx/include/__stop_token/stop_state.h
+++ b/libcxx/include/__stop_token/stop_state.h
@@ -12,7 +12,7 @@

 #include <__availability>
 #include <__config>
-#include <__stop_token/atomic_unique_lock.h>
+#include <__mutex/mutex.h>
 #include <__stop_token/intrusive_list_view.h>
 #include <__thread/this_thread.h>
 #include <__thread/thread.h>
@@ -38,10 +38,51 @@
   bool* __destroyed_ = nullptr;
 };

+// stop_token needs to lock without throwing, but std::mutex::lock can throw.
+// Wrap it in a retry loop that swallows any exception until the lock is acquired.
+class __nothrow_mutex_lock {
+  std::mutex& __mutex_;
+  bool __is_locked_;
+
+public:
+  _LIBCPP_HIDE_FROM_ABI explicit __nothrow_mutex_lock(std::mutex& __mutex) noexcept
+      : __mutex_(__mutex), __is_locked_(false) {
+    __lock();
+  }
+
+  __nothrow_mutex_lock(const __nothrow_mutex_lock&)            = delete;
+  __nothrow_mutex_lock(__nothrow_mutex_lock&&)                 = delete;
+  __nothrow_mutex_lock& operator=(const __nothrow_mutex_lock&) = delete;
+  __nothrow_mutex_lock& operator=(__nothrow_mutex_lock&&)      = delete;
+
+  _LIBCPP_HIDE_FROM_ABI ~__nothrow_mutex_lock() {
+    if (__is_locked_) {
+      __unlock();
+    }
+  }
+
+  _LIBCPP_HIDE_FROM_ABI bool __owns_lock() const noexcept { return __is_locked_; }
+
+  _LIBCPP_HIDE_FROM_ABI void __lock() noexcept {
+    while (true) {
+      try {
+        __mutex_.lock();
+        break;
+      } catch (...) {
+      }
+    }
+    __is_locked_ = true;
+  }
+
+  _LIBCPP_HIDE_FROM_ABI void __unlock() noexcept {
+    __mutex_.unlock(); // throws nothing
+    __is_locked_ = false;
+  }
+};
+
 class __stop_state {
   static constexpr uint32_t __stop_requested_bit        = 1;
-  static constexpr uint32_t __callback_list_locked_bit  = 1 << 1;
-  static constexpr uint32_t __stop_source_counter_shift = 2;
+  static constexpr uint32_t __stop_source_counter_shift = 1;

   // The "stop_source counter" is not used for lifetime reference counting.
   // When the number of stop_source reaches 0, the remaining stop_tokens's
@@ -50,17 +91,18 @@
   // The "callback list locked" bit implements the atomic_unique_lock to
   // guard the operations on the callback list
   //
-  //           31 - 2          |  1                   |    0           |
-  //   stop_source counter     | callback list locked | stop_requested |
+  //           31 - 1          |    0           |
+  //   stop_source counter     | stop_requested |
   atomic<uint32_t> __state_ = 0;

   // Reference count for stop_token + stop_callback + stop_source
   // When the counter reaches zero, the state is destroyed
   // It is used by __intrusive_shared_ptr, but it is stored here for better layout
   atomic<uint32_t> __ref_count_ = 0;
+  std::mutex __mutex_;

   using __state_t            = uint32_t;
-  using __callback_list_lock = __atomic_unique_lock<__state_t, __callback_list_locked_bit>;
+  using __callback_list_lock = __nothrow_mutex_lock;
   using __callback_list      = __intrusive_list_view<__stop_callback_base>;

   __callback_list __callback_list_;
@@ -102,10 +144,12 @@
   }

   _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __request_stop() noexcept {
-    auto __cb_list_lock = __try_lock_for_request_stop();
-    if (!__cb_list_lock.__owns_lock()) {
+    __callback_list_lock __cb_list_lock(__mutex_);
+    auto __old = __state_.fetch_or(__stop_requested_bit, std::memory_order_release);
+    if ((__old & __stop_requested_bit) == __stop_requested_bit) {
       return false;
     }
+
     __requesting_thread_ = this_thread::get_id();

     while (!__callback_list_.__empty()) {
@@ -139,20 +183,16 @@
   }

   _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __add_callback(__stop_callback_base* __cb) noexcept {
-    // If it is already stop_requested. Do not try to request it again.
-    const auto __give_up_trying_to_lock_condition = [__cb](__state_t __state) {
-      if ((__state & __stop_requested_bit) != 0) {
-        // already stop requested, synchronously run the callback and no need to lock the list again
-        __cb->__invoke();
-        return true;
-      }
-      // no stop source. no need to lock the list to add the callback as it can never be invoked
-      return (__state >> __stop_source_counter_shift) == 0;
-    };
-
-    __callback_list_lock __cb_list_lock(__state_, __give_up_trying_to_lock_condition);
+    __callback_list_lock __cb_list_lock(__mutex_);
+    auto __state = __state_.load(std::memory_order_acquire);
+    if ((__state & __stop_requested_bit) != 0) {
+      // stop has already been requested: synchronously run the callback instead of adding it to the list
+      __cb->__invoke();
+      return false;
+    }

-    if (!__cb_list_lock.__owns_lock()) {
+    // no stop_source left, so the callback can never be invoked: no need to add it to the list
+    if ((__state >> __stop_source_counter_shift) == 0) {
       return false;
     }

@@ -166,7 +206,7 @@

   // called by the destructor of stop_callback
   _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI void __remove_callback(__stop_callback_base* __cb) noexcept {
-    __callback_list_lock __cb_list_lock(__state_);
+    __callback_list_lock __cb_list_lock(__mutex_);

     // under below condition, the request_stop call just popped __cb from the list and could execute it now
     bool __potentially_executing_now = __cb->__prev_ == nullptr && !__callback_list_.__is_head(__cb);
@@ -193,29 +233,6 @@
   }

 private:
-  _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI __callback_list_lock __try_lock_for_request_stop() noexcept {
-    // If it is already stop_requested, do not try to request stop or lock the list again.
-    const auto __lock_fail_condition = [](__state_t __state) { return (__state & __stop_requested_bit) != 0; };
-
-    // set locked and requested bit at the same time
-    const auto __after_lock_state = [](__state_t __state) {
-      return __state | __callback_list_locked_bit | __stop_requested_bit;
-    };
-
-    // acq because [thread.stoptoken.intro] Registration of a callback synchronizes with the invocation of that
-    //     callback. We are going to invoke the callback after getting the lock, acquire so that we can see the
-    //     registration of a callback (and other writes that happens-before the add_callback)
-    //     Note: the rel (unlock) in the add_callback syncs with this acq
-    // rel because [thread.stoptoken.intro] A call to request_stop that returns true synchronizes with a call
-    //     to stop_requested on an associated stop_token or stop_source object that returns true.
-    //     We need to make sure that all writes (including user code) before request_stop will be made visible
-    //     to the threads that waiting for `stop_requested == true`
-    //     Note: this rel syncs with the acq in `stop_requested`
-    const auto __locked_ordering = std::memory_order_acq_rel;
-
-    return __callback_list_lock(__state_, __lock_fail_condition, __after_lock_state, __locked_ordering);
-  }
-
   template <class _Tp>
   friend struct __intrusive_shared_ptr_traits;
 };
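
Note on the locking pattern above: the retry-until-locked idea behind __nothrow_mutex_lock can be illustrated as a small standalone C++20 sketch, independent of libc++ internals. The names nothrow_lock_guard and locked_counter below are invented for this example. The sketch relies only on the standard guarantees that std::mutex::lock() may throw std::system_error (and does not hold the lock when it throws), while std::mutex::unlock() never throws; that is what makes swallowing the exception and retrying safe.

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Loop on mutex::lock() until it succeeds, swallowing any exception, so the
// guard can be constructed on noexcept code paths (mirroring __nothrow_mutex_lock).
class nothrow_lock_guard {
  std::mutex& mutex_;

public:
  explicit nothrow_lock_guard(std::mutex& m) noexcept : mutex_(m) {
    while (true) {
      try {
        mutex_.lock(); // may throw std::system_error; on throw the lock is not held
        break;
      } catch (...) {
        // retry
      }
    }
  }

  nothrow_lock_guard(const nothrow_lock_guard&)            = delete;
  nothrow_lock_guard& operator=(const nothrow_lock_guard&) = delete;

  ~nothrow_lock_guard() { mutex_.unlock(); } // unlock() throws nothing
};

// Hypothetical user of the guard: an increment that must not throw.
struct locked_counter {
  std::mutex m;
  long value = 0;

  void increment() noexcept {
    nothrow_lock_guard guard(m);
    ++value;
  }
};

int main() {
  locked_counter c;
  {
    std::vector<std::jthread> threads;
    for (int i = 0; i < 4; ++i)
      threads.emplace_back([&] {
        for (int j = 0; j < 100000; ++j)
          c.increment();
      });
  } // jthreads join here
  std::cout << c.value << '\n'; // expect 400000
}

Compared with __nothrow_mutex_lock, the sketch drops the __owns_lock()/__unlock() surface because nothing here unlocks early. The benchmarks added in this patch presumably exist to measure the other side of that trade: a plain std::mutex is simpler to reason about than the removed __atomic_unique_lock, but may be a heavier primitive on the stop_callback registration fast path.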