diff --git a/libc/src/threads/linux/CMakeLists.txt b/libc/src/threads/linux/CMakeLists.txt --- a/libc/src/threads/linux/CMakeLists.txt +++ b/libc/src/threads/linux/CMakeLists.txt @@ -24,6 +24,7 @@ add_header_library( threads_utils HDRS + CndVar.h Futex.h Mutex.h Thread.h diff --git a/libc/src/threads/linux/CndVar.h b/libc/src/threads/linux/CndVar.h new file mode 100644 --- /dev/null +++ b/libc/src/threads/linux/CndVar.h @@ -0,0 +1,144 @@ +//===-- Utility condition variable class ------------------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_LIBC_SRC_THREADS_LINUX_CNDVAR_H +#define LLVM_LIBC_SRC_THREADS_LINUX_CNDVAR_H + +#include "Futex.h" +#include "Mutex.h" + +#include "config/linux/syscall.h" // For syscall functions. +#include "include/sys/syscall.h" // For syscall numbers. +#include "include/threads.h" // For values like thrd_success etc. + +#include // For futex operations. +#include // For atomic operations + +namespace __llvm_libc { + +struct CndVar { + enum CndWaiterStatus : uint32_t { + WS_Waiting = 0xE, + WS_Signalled = 0x5, + }; + + struct CndWaiter { + FutexWord futex_word = WS_Waiting; + CndWaiter *next = nullptr; + }; + + CndWaiter *waitq_front; + CndWaiter *waitq_back; + Mutex qmtx; + + static int init(CndVar *cv) { + cv->waitq_front = cv->waitq_back = nullptr; + return Mutex::init(&cv->qmtx, mtx_plain); + } + + static int destroy(CndVar *cv) { + cv->waitq_front = cv->waitq_back = nullptr; + return thrd_success; + } + + int wait(Mutex *m) { + // The goal is to perform "unlock |m| and wait" in an + // atomic operation. However, it is not possible to do it + // in the true sense so we do it in spirit. Before unlocking + // |m|, a new waiter object is added to the waiter queue with + // the waiter queue locked. Iff a signalling thread signals + // the waiter before the waiter actually starts waiting, the + // wait operation will not begin at all and the waiter immediately + // returns. + + CndWaiter waiter; + { + MutexLock ml(&qmtx); + CndWaiter *old_back = nullptr; + if (waitq_front == nullptr) { + waitq_front = waitq_back = &waiter; + } else { + old_back = waitq_back; + waitq_back->next = &waiter; + waitq_back = &waiter; + } + + if (m->unlock() != thrd_success) { + // If we do not remove the queued up waiter before returning, + // then another thread can potentially signal a non-existing + // waiter. Note also that we do this with |qmtx| locked. This + // ensures that another thread will not signal the withdrawing + // waiter. + waitq_back = old_back; + if (waitq_back == nullptr) + waitq_front = nullptr; + else + waitq_back->next = nullptr; + + return thrd_error; + } + } + + __llvm_libc::syscall(SYS_futex, &waiter.futex_word, FUTEX_WAIT, WS_Waiting, + 0, 0, 0); + + // At this point, if locking |m| fails, we can simply return as the + // queued up waiter would have been removed from the queue. + return m->lock(); + } + + int notify_one() { + // We don't use an RAII locker in this method as we want to unlock + // |qmtx| and signal the waiter using a single FUTEX_WAKE_OP signal. + qmtx.lock(); + if (waitq_front == nullptr) { + qmtx.unlock(); + return thrd_success; + } + + CndWaiter *first = waitq_front; + waitq_front = waitq_front->next; + if (waitq_front == nullptr) + waitq_back = nullptr; + + atomic_store(&qmtx.futex_word, Mutex::MS_Free); + + __llvm_libc::syscall( + SYS_futex, &qmtx.futex_word, FUTEX_WAKE_OP, 1, 1, &first->futex_word, + FUTEX_OP(FUTEX_OP_SET, WS_Signalled, FUTEX_OP_CMP_EQ, WS_Waiting)); + return thrd_success; + } + + int broadcast() { + MutexLock ml(&qmtx); + FutexWord dummy_futex_word; + CndWaiter *waiter = waitq_front; + waitq_front = waitq_back = nullptr; + while (waiter != nullptr) { + // FUTEX_WAKE_OP is used instead of just FUTEX_WAKE as it allows us to + // atomically update the waiter status to WS_Signalled before waking + // up the waiter. A dummy location is used for the other futex of + // FUTEX_WAKE_OP. + __llvm_libc::syscall( + SYS_futex, &dummy_futex_word, FUTEX_WAKE_OP, 1, 1, + &waiter->futex_word, + FUTEX_OP(FUTEX_OP_SET, WS_Signalled, FUTEX_OP_CMP_EQ, WS_Waiting)); + waiter = waiter->next; + } + return thrd_success; + } +}; + +static_assert(sizeof(CndVar) == sizeof(cnd_t), + "Mismatch in the size of the " + "internal representation of condition variable and the public " + "cnd_t type."); + +} // namespace __llvm_libc + +#endif // LLVM_LIBC_SRC_THREADS_LINUX_CNDVAR_H diff --git a/libc/src/threads/linux/cnd_broadcast.cpp b/libc/src/threads/linux/cnd_broadcast.cpp --- a/libc/src/threads/linux/cnd_broadcast.cpp +++ b/libc/src/threads/linux/cnd_broadcast.cpp @@ -6,11 +6,16 @@ // //===----------------------------------------------------------------------===// +#include "CndVar.h" + #include "src/threads/cnd_broadcast.h" #include "src/__support/common.h" namespace __llvm_libc { -LLVM_LIBC_FUNCTION(int, cnd_broadcast, (cnd_t * cond)) { return thrd_success; } +LLVM_LIBC_FUNCTION(int, cnd_broadcast, (cnd_t * cond)) { + CndVar *cndvar = reinterpret_cast(cond); + return cndvar->broadcast(); +} } // namespace __llvm_libc diff --git a/libc/src/threads/linux/cnd_destroy.cpp b/libc/src/threads/linux/cnd_destroy.cpp --- a/libc/src/threads/linux/cnd_destroy.cpp +++ b/libc/src/threads/linux/cnd_destroy.cpp @@ -6,11 +6,16 @@ // //===----------------------------------------------------------------------===// +#include "CndVar.h" + #include "src/threads/cnd_destroy.h" #include "src/__support/common.h" namespace __llvm_libc { -LLVM_LIBC_FUNCTION(int, cnd_destroy, (cnd_t * cond)) { return thrd_success; } +LLVM_LIBC_FUNCTION(int, cnd_destroy, (cnd_t * cond)) { + CndVar *cndvar = reinterpret_cast(cond); + return CndVar::destroy(cndvar); +} } // namespace __llvm_libc diff --git a/libc/src/threads/linux/cnd_init.cpp b/libc/src/threads/linux/cnd_init.cpp --- a/libc/src/threads/linux/cnd_init.cpp +++ b/libc/src/threads/linux/cnd_init.cpp @@ -6,11 +6,16 @@ // //===----------------------------------------------------------------------===// +#include "CndVar.h" + #include "src/threads/cnd_init.h" #include "src/__support/common.h" namespace __llvm_libc { -LLVM_LIBC_FUNCTION(int, cnd_init, (cnd_t * cond)) { return thrd_success; } +LLVM_LIBC_FUNCTION(int, cnd_init, (cnd_t * cond)) { + CndVar *cndvar = reinterpret_cast(cond); + return CndVar::init(cndvar); +} } // namespace __llvm_libc diff --git a/libc/src/threads/linux/cnd_signal.cpp b/libc/src/threads/linux/cnd_signal.cpp --- a/libc/src/threads/linux/cnd_signal.cpp +++ b/libc/src/threads/linux/cnd_signal.cpp @@ -6,11 +6,16 @@ // //===----------------------------------------------------------------------===// +#include "CndVar.h" + #include "src/threads/cnd_signal.h" #include "src/__support/common.h" namespace __llvm_libc { -LLVM_LIBC_FUNCTION(int, cnd_signal, (cnd_t * cond)) { return thrd_success; } +LLVM_LIBC_FUNCTION(int, cnd_signal, (cnd_t * cond)) { + CndVar *cndvar = reinterpret_cast(cond); + return cndvar->notify_one(); +} } // namespace __llvm_libc diff --git a/libc/src/threads/linux/cnd_wait.cpp b/libc/src/threads/linux/cnd_wait.cpp --- a/libc/src/threads/linux/cnd_wait.cpp +++ b/libc/src/threads/linux/cnd_wait.cpp @@ -6,13 +6,18 @@ // //===----------------------------------------------------------------------===// +#include "CndVar.h" +#include "Mutex.h" + #include "src/threads/cnd_wait.h" #include "src/__support/common.h" namespace __llvm_libc { -LLVM_LIBC_FUNCTION(int, cnd_wait, (cnd_t * cond, mtx_t *mutex)) { - return thrd_success; +LLVM_LIBC_FUNCTION(int, cnd_wait, (cnd_t * cond, mtx_t *mtx)) { + CndVar *cndvar = reinterpret_cast(cond); + Mutex *mutex = reinterpret_cast(mtx); + return cndvar->wait(mutex); } } // namespace __llvm_libc diff --git a/libc/test/src/threads/CMakeLists.txt b/libc/test/src/threads/CMakeLists.txt --- a/libc/test/src/threads/CMakeLists.txt +++ b/libc/test/src/threads/CMakeLists.txt @@ -47,3 +47,23 @@ libc.src.threads.thrd_create libc.src.threads.thrd_join ) + +add_libc_unittest( + cnd_test + SUITE + libc_threads_unittests + SRCS + cnd_test.cpp + DEPENDS + libc.include.threads + libc.src.threads.cnd_init + libc.src.threads.cnd_broadcast + libc.src.threads.cnd_signal + libc.src.threads.cnd_wait + libc.src.threads.mtx_init + libc.src.threads.mtx_lock + libc.src.threads.mtx_unlock + libc.src.threads.thrd_create + libc.src.threads.thrd_join + libc.src.threads.linux.threads_utils +) diff --git a/libc/test/src/threads/cnd_test.cpp b/libc/test/src/threads/cnd_test.cpp new file mode 100644 --- /dev/null +++ b/libc/test/src/threads/cnd_test.cpp @@ -0,0 +1,81 @@ +//===-- Unittests for condition variable broadcast fucntionality ----------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include + +#include "include/threads.h" +#include "src/threads/cnd_broadcast.h" +#include "src/threads/cnd_destroy.h" +#include "src/threads/cnd_init.h" +#include "src/threads/cnd_signal.h" +#include "src/threads/cnd_wait.h" +#include "src/threads/mtx_init.h" +#include "src/threads/mtx_lock.h" +#include "src/threads/mtx_unlock.h" +#include "src/threads/thrd_create.h" +#include "src/threads/thrd_join.h" +#include "utils/UnitTest/Test.h" + +// The single test in this file tests all condition variable operations. The +// main thread spawns THRD_COUNT threads, each of which wait on a condition +// variable |broadcast_cnd|. After spawing the threads, it waits on another +// condition variable |threads_ready_cnd| which will be signalled by the last +// thread before it starts waiting on |broadcast_cnd|. On signalled by the +// last thread, the main thread then wakes up to broadcast to all waiting +// threads to wake up. Each of the THRD_COUNT child threads increment +// |broadcast_count| by 1 before they start waiting on |broadcast_cnd|, and +// decrement it by 1 after getting signalled on |broadcast_cnd|. + +constexpr int THRD_COUNT = 10000; + +static atomic_uint broadcast_count = 0; +static cnd_t broadcast_cnd, threads_ready_cnd; +static mtx_t broadcast_mtx, threads_ready_mtx; + +int broadcast_thread_func(void *) { + __llvm_libc::mtx_lock(&broadcast_mtx); + int oldval = atomic_fetch_add(&broadcast_count, 1); + if (oldval == THRD_COUNT - 1) { + __llvm_libc::mtx_lock(&threads_ready_mtx); + __llvm_libc::cnd_signal(&threads_ready_cnd); + __llvm_libc::mtx_unlock(&threads_ready_mtx); + } + + __llvm_libc::cnd_wait(&broadcast_cnd, &broadcast_mtx); + __llvm_libc::mtx_unlock(&broadcast_mtx); + atomic_fetch_sub(&broadcast_count, 1); + return 0; +} + +TEST(LlvmLibcCndVarTest, WaitNotifyBroadcastTest) { + __llvm_libc::cnd_init(&broadcast_cnd); + __llvm_libc::cnd_init(&threads_ready_cnd); + __llvm_libc::mtx_init(&broadcast_mtx, mtx_plain); + __llvm_libc::mtx_init(&threads_ready_mtx, mtx_plain); + + __llvm_libc::mtx_lock(&threads_ready_mtx); + thrd_t threads[THRD_COUNT]; + for (unsigned int i = 0; i < THRD_COUNT; ++i) + __llvm_libc::thrd_create(&threads[i], broadcast_thread_func, nullptr); + + __llvm_libc::cnd_wait(&threads_ready_cnd, &threads_ready_mtx); + __llvm_libc::mtx_unlock(&threads_ready_mtx); + + __llvm_libc::mtx_lock(&broadcast_mtx); + ASSERT_EQ(int(broadcast_count), THRD_COUNT); + __llvm_libc::cnd_broadcast(&broadcast_cnd); + __llvm_libc::mtx_unlock(&broadcast_mtx); + + for (unsigned int i = 0; i < THRD_COUNT; ++i) { + int retval = 0xBAD; + __llvm_libc::thrd_join(&threads[i], &retval); + ASSERT_EQ(retval, 0); + } + + ASSERT_EQ(int(broadcast_count), 0); +}