diff --git a/libc/cmake/modules/LLVMLibCTestRules.cmake b/libc/cmake/modules/LLVMLibCTestRules.cmake --- a/libc/cmake/modules/LLVMLibCTestRules.cmake +++ b/libc/cmake/modules/LLVMLibCTestRules.cmake @@ -421,7 +421,7 @@ "INTEGRATION_TEST" "" # No optional arguments "SUITE" # Single value arguments - "SRCS;HDRS;DEPENDS;ARGS;ENV;COMPILE_OPTIONS" # Multi-value arguments + "SRCS;HDRS;DEPENDS;ARGS;ENV;COMPILE_OPTIONS;LOADER_ARGS" # Multi-value arguments ${ARGN} ) @@ -533,6 +533,7 @@ ${fq_target_name} COMMAND ${INTEGRATION_TEST_ENV} $<$:${gpu_loader_exe}> + ${INTEGRATION_TEST_LOADER_ARGS} $ ${INTEGRATION_TEST_ARGS} COMMENT "Running integration test ${fq_target_name}" ) diff --git a/libc/src/__support/CPP/atomic.h b/libc/src/__support/CPP/atomic.h --- a/libc/src/__support/CPP/atomic.h +++ b/libc/src/__support/CPP/atomic.h @@ -90,6 +90,10 @@ return __atomic_fetch_or(&val, mask, int(mem_ord)); } + T fetch_and(T mask, MemoryOrder mem_ord = MemoryOrder::SEQ_CST) { + return __atomic_fetch_and(&val, mask, int(mem_ord)); + } + T fetch_sub(T decrement, MemoryOrder mem_ord = MemoryOrder::SEQ_CST) { return __atomic_fetch_sub(&val, decrement, int(mem_ord)); } diff --git a/libc/src/__support/OSUtil/gpu/io.h b/libc/src/__support/OSUtil/gpu/io.h --- a/libc/src/__support/OSUtil/gpu/io.h +++ b/libc/src/__support/OSUtil/gpu/io.h @@ -16,10 +16,6 @@ void write_to_stderr(cpp::string_view msg); -LIBC_INLINE void write_to_stderr(const char *msg) { - write_to_stderr(cpp::string_view(msg)); -} - } // namespace __llvm_libc #endif // LLVM_LIBC_SRC_SUPPORT_OSUTIL_LINUX_IO_H diff --git a/libc/src/__support/OSUtil/gpu/io.cpp b/libc/src/__support/OSUtil/gpu/io.cpp --- a/libc/src/__support/OSUtil/gpu/io.cpp +++ b/libc/src/__support/OSUtil/gpu/io.cpp @@ -14,34 +14,10 @@ namespace __llvm_libc { -namespace internal { - -static constexpr size_t BUFFER_SIZE = sizeof(rpc::Buffer) - sizeof(uint64_t); -static constexpr size_t MAX_STRING_SIZE = BUFFER_SIZE; - -LIBC_INLINE void send_null_terminated(cpp::string_view src) { - rpc::client.run( - [&](rpc::Buffer *buffer) { - buffer->data[0] = rpc::Opcode::PRINT_TO_STDERR; - char *data = reinterpret_cast(&buffer->data[1]); - inline_memcpy(data, src.data(), src.size()); - data[src.size()] = '\0'; - }, - [](rpc::Buffer *) { /* void */ }); -} - -} // namespace internal - void write_to_stderr(cpp::string_view msg) { - bool send_empty_string = true; - for (; !msg.empty();) { - const auto chunk = msg.substr(0, internal::MAX_STRING_SIZE); - internal::send_null_terminated(chunk); - msg.remove_prefix(chunk.size()); - send_empty_string = false; - } - if (send_empty_string) - internal::send_null_terminated(""); + rpc::Client::Port port = rpc::client.open(rpc::PRINT_TO_STDERR); + port.send_n(msg.data(), msg.size() + 1); + port.close(); } } // namespace __llvm_libc diff --git a/libc/src/__support/OSUtil/gpu/quick_exit.cpp b/libc/src/__support/OSUtil/gpu/quick_exit.cpp --- a/libc/src/__support/OSUtil/gpu/quick_exit.cpp +++ b/libc/src/__support/OSUtil/gpu/quick_exit.cpp @@ -17,14 +17,11 @@ namespace __llvm_libc { void quick_exit(int status) { - // TODO: Support asynchronous calls so we don't wait and exit from the GPU - // immediately. 
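Before the quick_exit hunk continues, a sketch of how the rewritten write_to_stderr above uses the new port interface. Only the rpc API introduced by this patch is assumed; log_message is a hypothetical caller.

// Hypothetical device-side caller, written as if inside namespace __llvm_libc
// like io.cpp above.
void log_message(cpp::string_view msg) {
  rpc::Client::Port port = rpc::client.open(rpc::PRINT_TO_STDERR);
  // size() + 1 sends the byte after the view so a NUL-terminated source
  // arrives on the host as a C string, matching write_to_stderr above.
  port.send_n(msg.data(), msg.size() + 1);
  port.close();
}
// The host pairs this with the recv_n/fputs handling in the PRINT_TO_STDERR
// case of handle_server() in Server.h further down in this patch.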
-  rpc::client.run(
-      [&](rpc::Buffer *buffer) {
-        buffer->data[0] = rpc::Opcode::EXIT;
-        buffer->data[1] = status;
-      },
-      [](rpc::Buffer *) { /* void */ });
+  rpc::Client::Port port = rpc::client.open(rpc::EXIT);
+  port.send([&](rpc::Buffer *buffer) {
+    reinterpret_cast<uint64_t *>(buffer->data)[0] = status;
+  });
+  port.close();
 
 #if defined(LIBC_TARGET_ARCH_IS_NVPTX)
   asm("exit;" ::: "memory");
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -20,148 +20,488 @@
 #include "rpc_util.h"
 #include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/optional.h"
+#include "src/string/memory_utils/memcpy_implementations.h"
 
 #include <stdint.h>
 
 namespace __llvm_libc {
 namespace rpc {
 
-/// A list of opcodes that we use to invoke certain actions on the server. We
-/// reserve the first 255 values for internal libc usage.
-enum Opcode : uint64_t {
+/// A list of opcodes that we use to invoke certain actions on the server.
+enum Opcode : uint16_t {
   NOOP = 0,
   PRINT_TO_STDERR = 1,
   EXIT = 2,
-  LIBC_LAST = (1UL << 8) - 1,
+  TEST_INCREMENT = 3,
 };
 
 /// A fixed size channel used to communicate between the RPC client and server.
-struct Buffer {
-  uint64_t data[8];
+struct alignas(64) Buffer {
+  uint8_t data[62];
+  uint16_t opcode;
+};
+static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");
+
+// The data structure Process is essentially four arrays of the same length
+// indexed by port_t. The operations on Process provide mutually exclusive
+// access to the Buffer element at index port_t::value. Ownership alternates
+// between the client and the server instance.
+// The template parameters I, O correspond to the runtime values of the state
+// machine implemented here from the perspective of the current process. They
+// are tracked in the type system to raise errors on attempts to make invalid
+// transitions or to use the protected buffer while the other process owns it.
+
+template <unsigned I, unsigned O> struct port_t {
+  static_assert(I == 0 || I == 1, "");
+  static_assert(O == 0 || O == 1, "");
+  uint32_t value;
+
+  port_t(uint32_t value) : value(value) {}
+
+  port_t<!I, O> invert_inbox() { return value; }
+  port_t<I, !O> invert_outbox() { return value; }
+};
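An illustration, not part of the patch, of what tracking I and O in the type system buys: a helper constrained to the freshly claimed state cannot be handed a port whose outbox was already flipped, so a protocol misuse is a compile error rather than a race. fill_owned_buffer and example are hypothetical names.

// Sketch assuming `using namespace __llvm_libc;` and the port_t above.
template <typename Op>
void fill_owned_buffer(rpc::port_t<0, 0> port, Op op) {
  // A real caller would go through Process::apply; this stub only exists to
  // show the compile-time constraint on the port state.
  (void)port;
  (void)op;
}

void example(rpc::port_t<0, 0> fresh) {
  fill_owned_buffer(fresh, [](rpc::Buffer *) {});
  // invert_outbox flips the state in the type only; the matching bitmap write
  // is done by Process::post.
  rpc::port_t<0, 1> posted = fresh.invert_outbox();
  // fill_owned_buffer(posted, ...); // would not compile: the peer owns it now
  (void)posted;
}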
+
+/// Bitmap deals with consistently picking the address that corresponds to a
+/// given port instance. 'Slot' is used to mean an index into the shared arrays
+/// which may not be currently bound to a port.
+
+template <unsigned scope, bool InvertedLoad = false>
+struct bitmap_t {
+private:
+  cpp::Atomic<uint32_t> *underlying;
+  using Word = uint32_t;
+
+  inline uint32_t index_to_element(uint32_t x) {
+    uint32_t wordBits = 8 * sizeof(Word);
+    return x / wordBits;
+  }
+
+  inline uint32_t index_to_subindex(uint32_t x) {
+    uint32_t wordBits = 8 * sizeof(Word);
+    return x % wordBits;
+  }
+
+  inline bool nthbitset(uint32_t x, uint32_t n) {
+    return x & (UINT32_C(1) << n);
+  }
+
+  inline bool nthbitset(uint64_t x, uint32_t n) {
+    return x & (UINT64_C(1) << n);
+  }
+
+  inline uint32_t setnthbit(uint32_t x, uint32_t n) {
+    return x | (UINT32_C(1) << n);
+  }
+
+  inline uint64_t setnthbit(uint64_t x, uint32_t n) {
+    return x | (UINT64_C(1) << n);
+  }
+
+  static constexpr bool system_scope() {
+    return scope == __OPENCL_MEMORY_SCOPE_ALL_SVM_DEVICES;
+  }
+  static constexpr bool device_scope() {
+    return scope == __OPENCL_MEMORY_SCOPE_DEVICE;
+  }
+
+  static_assert(system_scope() || device_scope(), "");
+  static_assert(system_scope() != device_scope(), "");
+
+  Word load_word(uint32_t w) const {
+    cpp::Atomic<uint32_t> &addr = underlying[w];
+    Word tmp = addr.load(cpp::MemoryOrder::RELAXED);
+    return InvertedLoad ? ~tmp : tmp;
+  }
+
+public:
+  bitmap_t() /*: underlying(nullptr)*/ {}
+  bitmap_t(cpp::Atomic<uint32_t> *d) : underlying(d) {
+    // Can't necessarily write to the pointer from this object. If the memory
+    // is on a GPU but this instance is being constructed on the CPU first,
+    // direct writes will fail. However, the data does need to be zeroed for
+    // the bitmap to work.
+  }
+
+  bool read_slot(uint32_t slot) {
+    uint32_t w = index_to_element(slot);
+    uint32_t subindex = index_to_subindex(slot);
+    return nthbitset(load_word(w), subindex);
+  }
+
+  // Does not change inbox as no process ever writes to its own inbox.
+  // Knows that the outbox is initially zero, which allows using fetch_add
+  // to set the bit over PCIe; otherwise we would need to use a CAS loop.
+  template <unsigned I> port_t<I, 1> claim_slot(port_t<I, 0> port) {
+    uint32_t slot = port.value;
+
+    uint32_t w = index_to_element(slot);
+    uint32_t subindex = index_to_subindex(slot);
+
+    cpp::Atomic<uint32_t> &addr = underlying[w];
+    Word before;
+    if (system_scope()) {
+      // System scope is used here to approximate 'might be over PCIe', where
+      // the available atomic operations are likely to be CAS, Add, Exchange.
+      // Set the bit using the knowledge that it is currently clear.
+      Word addend = (Word)1 << subindex;
+      before = addr.fetch_add(addend, cpp::MemoryOrder::ACQ_REL);
+    } else {
+      // Set the bit directly. TODO: device scope is missing from atomic.h
+      Word mask = setnthbit((Word)0, subindex);
+      before = addr.fetch_or(mask, cpp::MemoryOrder::ACQ_REL);
+    }
+
+    (void)before;
+    return port.invert_outbox();
+  }
+
+  // Release also does not change the inbox. Assumes the outbox is set.
+  template <unsigned I> port_t<I, 0> release_slot(port_t<I, 1> port) {
+    release_slot(port.value);
+    return port.invert_outbox();
+  }
+
+  // Not fully typed since it is also called to drop partially constructed
+  // ports and locks.
+  void release_slot(uint32_t i) {
+    uint32_t w = index_to_element(i);
+    uint32_t subindex = index_to_subindex(i);
+
+    cpp::Atomic<uint32_t> &addr = underlying[w];
+
+    if (system_scope()) {
+      // Clear the bit using the knowledge that it is currently set.
+      Word addend = 1 + ~((Word)1 << subindex);
+      addr.fetch_add(addend, cpp::MemoryOrder::ACQ_REL);
+    } else {
+      // Clear the bit directly.
+      Word mask = ~setnthbit((Word)0, subindex);
+      addr.fetch_and(mask, cpp::MemoryOrder::ACQ_REL);
+    }
+  }
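A worked example of the fetch_add trick used by claim_slot and release_slot at system scope (plain non-atomic arithmetic, for illustration only): adding 1 << n sets bit n when it is known clear, and adding the two's complement of 1 << n clears it when it is known set; in both cases no carry or borrow escapes past bit n, which is why fetch_add can stand in for fetch_or and fetch_and over PCIe. The device-local try_claim_slot path continues after this aside.

#include <cassert>
#include <cstdint>

int main() {
  uint32_t word = 0b1010u; // bits 1 and 3 set, bit 2 clear
  uint32_t n = 2;

  // Setting a known-clear bit: the addend has only bit n set and bit n of
  // word is zero, so the addition cannot carry into higher bits.
  word += UINT32_C(1) << n; // what claim_slot's fetch_add does atomically
  assert(word == 0b1110u);

  // Clearing a known-set bit: add the two's complement of (1 << n), i.e.
  // subtract it; again nothing propagates past bit n.
  word += 1 + ~(UINT32_C(1) << n); // what release_slot's fetch_add does
  assert(word == 0b1010u);
  return 0;
}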
+
+  // Only used on the bitmap used for device local mutual exclusion. Does not
+  // hit shared memory.
+  bool try_claim_slot(uint32_t slot) {
+    uint32_t w = index_to_element(slot);
+    uint32_t subindex = index_to_subindex(slot);
+
+    static_assert(device_scope(), "");
+
+    Word mask = setnthbit((Word)0, subindex);
+
+    // fetch_or implements test-and-set as a faster alternative to CAS.
+    cpp::Atomic<uint32_t> &addr = underlying[w];
+    uint32_t before = addr.fetch_or(mask, cpp::MemoryOrder::ACQ_REL);
+
+    return !nthbitset(before, subindex);
+  }
 };
 
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
 /// ownership of the shared buffer.
-struct Process {
-  LIBC_INLINE Process() = default;
-  LIBC_INLINE Process(const Process &) = default;
-  LIBC_INLINE Process &operator=(const Process &) = default;
-  LIBC_INLINE ~Process() = default;
+template <bool InvertInbox, typename BufferElement = Buffer> struct Process {
 
-  cpp::Atomic<uint32_t> *inbox;
-  cpp::Atomic<uint32_t> *outbox;
-  Buffer *buffer;
+  // The inverted read on inbox determines which of two linked processes
+  // initially owns the underlying buffer.
+  // The initial memory state is inbox == outbox == 0 which implies ownership,
+  // but one process has inbox read bitwise inverted. That process starts out
+  // believing the memory state is inbox == 1, outbox == 0, which implies the
+  // other process owns the buffer.
+  BufferElement *shared_buffer;
+  bitmap_t<__OPENCL_MEMORY_SCOPE_DEVICE> active;
+  bitmap_t<__OPENCL_MEMORY_SCOPE_ALL_SVM_DEVICES, InvertInbox> inbox;
+  bitmap_t<__OPENCL_MEMORY_SCOPE_ALL_SVM_DEVICES> outbox;
+
+  Process() = default;
+  ~Process() = default;
+
+  Process(cpp::Atomic<uint32_t> *locks, cpp::Atomic<uint32_t> *inbox,
+          cpp::Atomic<uint32_t> *outbox, BufferElement *shared_buffer)
+      : shared_buffer(shared_buffer), active(locks), inbox(inbox),
+        outbox(outbox) {}
 
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(void *inbox, void *outbox, void *buffer) {
-    *this = {
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
-        reinterpret_cast<Buffer *>(buffer),
-    };
+  LIBC_INLINE void reset(void *locks, void *inbox, void *outbox, void *buffer) {
+    this->active = reinterpret_cast<cpp::Atomic<uint32_t> *>(locks);
+    this->inbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox);
+    this->outbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox);
+    this->shared_buffer = reinterpret_cast<BufferElement *>(buffer);
+  }
+
+  template <typename T> struct maybe {
+    T value;
+    bool success;
+  };
+
+  /// Try to claim one of the buffer elements for this warp/wavefront/wave.
+  maybe<port_t<0, 0>> try_open() {
+
+    // Only one port available at present.
+    uint32_t p = 0;
+    {
+      bool claim = active.try_claim_slot(p);
+      if (!claim) {
+        return {UINT32_MAX, false};
+      }
+
+      atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+      bool in = inbox.read_slot(p);
+      bool out = outbox.read_slot(p);
+
+      if (in == 0 && out == 0) {
+        // Only return a port in the 0, 0 state.
+        return {p, true};
+      }
+
+      if (in == 1 && out == 1) {
+        // Garbage collect from an async call. This was missing from the
+        // previous implementation, which would leak on odd numbers of
+        // send/recv.
+        outbox.release_slot(p);
+      }
+
+      // Other values mean the buffer is not available to this process.
+      active.release_slot(p);
+    }
+
+    return {UINT32_MAX, false};
+  }
+
+  /// Release a port. Any inbox/outbox state is acceptable.
+  template <unsigned I, unsigned O> void close(port_t<I, O> port) {
+    active.release_slot(port.value);
+  }
+
+  /// Call a function Op on the owned buffer. Note I==O is required.
+  template <unsigned IandO, typename Op>
+  void apply(port_t<IandO, IandO>, Op op) {
+    op(shared_buffer);
+  }
+
+  /// Release ownership of the buffer to the other process.
+  /// Requires I==O to call, returns I!=O.
+ template + port_t post(port_t port) { + atomic_thread_fence(cpp::MemoryOrder::RELEASE); + if constexpr (IandO == 0) { + return outbox.claim_slot(port); + } else { + return outbox.release_slot(port); + } + } + + /// Wait for the buffer to be returned by the other process. + /// Equivalently, for the other process to close the port. + /// Requires I!=O to call, returns I==O + template port_t wait(port_t port) { + bool in = inbox.read_slot(port.value); + while (in == I) { + sleep_briefly(); + in = inbox.read_slot(port.value); + } + + atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + return port.invert_inbox(); } }; +/// The port provides the interface to communicate between the multiple +/// processes. A port is conceptually an index into the memory provided by the +/// underlying process that is guarded by a lock bit. +template struct PortT { + // TODO: This should be move-only. + LIBC_INLINE PortT(Process &process, uint64_t index, + uint32_t out) + : process(process), index(index), out(out) {} + LIBC_INLINE PortT(const PortT &) = default; + LIBC_INLINE PortT &operator=(const PortT &) = delete; + LIBC_INLINE ~PortT() = default; + + template LIBC_INLINE void recv(U use); + template LIBC_INLINE void send(F fill); + template + LIBC_INLINE void send_and_recv(F fill, U use); + template LIBC_INLINE void recv_and_send(W work); + LIBC_INLINE void send_n(const void *src, uint64_t size); + template LIBC_INLINE void recv_n(A alloc); + + LIBC_INLINE uint16_t get_opcode() const { + return process.shared_buffer[index].opcode; + } + + LIBC_INLINE void close() { + port_t<1, 0> tmp(index); + process.close(tmp); + } + +private: + Process &process; + uint32_t index; + uint32_t out; +}; + /// The RPC client used to make requests to the server. -struct Client : public Process { +/// The 'false' parameter to Process means this instance can open ports first +struct Client : public Process { LIBC_INLINE Client() = default; LIBC_INLINE Client(const Client &) = default; LIBC_INLINE Client &operator=(const Client &) = default; LIBC_INLINE ~Client() = default; - template LIBC_INLINE void run(F fill, U use); + using Port = PortT; + LIBC_INLINE cpp::optional try_open(uint16_t opcode); + LIBC_INLINE Port open(uint16_t opcode); }; /// The RPC server used to respond to the client. -struct Server : public Process { +/// The 'true' parameter to Process means all ports will be unavailable +/// initially, until Client has opened one and then called post on it. +struct Server : public Process { LIBC_INLINE Server() = default; LIBC_INLINE Server(const Server &) = default; LIBC_INLINE Server &operator=(const Server &) = default; LIBC_INLINE ~Server() = default; - template LIBC_INLINE bool handle(W work, C clean); + using Port = PortT; + LIBC_INLINE cpp::optional try_open(); + LIBC_INLINE Port open(); }; -/// Run the RPC client protocol to communicate with the server. We perform the -/// following high level actions to complete a communication: -/// - Apply \p fill to the shared buffer and write 1 to the outbox. -/// - Wait until the inbox is 1. -/// - Apply \p use to the shared buffer and write 0 to the outbox. -/// - Wait until the inbox is 0. -template LIBC_INLINE void Client::run(F fill, U use) { - bool in = inbox->load(cpp::MemoryOrder::RELAXED); - bool out = outbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - // Apply the \p fill to the buffer and signal the server. 
- if (!in & !out) { - fill(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(1, cpp::MemoryOrder::RELAXED); +/// Applies \p fill to the shared buffer and initiates a send operation. +template +template +LIBC_INLINE void PortT::send(F fill) { + // index in Port corresponds to .value in port_t + // Maintaining the invariant that a port is owned by the other side + // before and after Port::send or Port::recv + if (out == 0) { + port_t<1, 0> port0(index); + port_t<0, 0> port1 = process.wait(port0); + process.apply(port1, fill); + port_t<0, 1> port2 = process.post(port1); out = 1; - } - // Wait for the server to work on the buffer and respond. - if (!in & out) { - while (!in) { - sleep_briefly(); - in = inbox->load(cpp::MemoryOrder::RELAXED); - } - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - } - // Apply \p use to the buffer and signal the server. - if (in & out) { - use(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(0, cpp::MemoryOrder::RELAXED); + } else { + port_t<0, 1> port0(index); + port_t<1, 1> port1 = process.wait(port0); + process.apply(port1, fill); + port_t<1, 0> port2 = process.post(port1); out = 0; } - // Wait for the server to signal the end of the protocol. - if (in & !out) { - while (in) { - sleep_briefly(); - in = inbox->load(cpp::MemoryOrder::RELAXED); - } - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); +} + +/// Applies \p use to the shared buffer and acknowledges the send. +template +template +LIBC_INLINE void PortT::recv(U use) { + // it's the same, dispatch implicit in the boolean template parameter + PortT::send(use); +} + +/// Combines a send and receive into a single function. +template +template +LIBC_INLINE void PortT::send_and_recv(F fill, + U use) { + send(fill); + recv(use); +} + +/// Combines a receive and send operation into a single function. The \p work +/// function modifies the buffer in-place and the send is only used to initiate +/// the copy back. +template +template +LIBC_INLINE void PortT::recv_and_send(W work) { + recv(work); + send([](Buffer *) { /* no-op */ }); +} + +/// Sends an arbitrarily sized data buffer \p src across the shared channel in +/// multiples of the packet length. +template +LIBC_INLINE void PortT::send_n(const void *src, + uint64_t size) { + // TODO: We could send the first bytes in this call and potentially save an + // extra send operation. + send([=](Buffer *buffer) { buffer->data[0] = size; }); + const uint8_t *ptr = reinterpret_cast(src); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + send([=](Buffer *buffer) { + const uint64_t len = + size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; + inline_memcpy(buffer->data, ptr + idx, len); + }); } } -/// Run the RPC server protocol to communicate with the client. This is -/// non-blocking and only checks the server a single time. We perform the -/// following high level actions to complete a communication: -/// - Query if the inbox is 1 and exit if there is no work to do. -/// - Apply \p work to the shared buffer and write 1 to the outbox. -/// - Wait until the inbox is 0. -/// - Apply \p clean to the shared buffer and write 0 to the outbox. -template -LIBC_INLINE bool Server::handle(W work, C clean) { - bool in = inbox->load(cpp::MemoryOrder::RELAXED); - bool out = outbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - // There is no work to do, exit early. - if (!in & !out) - return false; - // Apply \p work to the buffer and signal the client. 
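A rough cost model for the send_n framing defined above, as a standalone sketch rather than code from the patch: the length travels in one packet, then the payload is copied in sizeof(Buffer::data) = 62-byte pieces. The removal of the old handle() protocol resumes below.

#include <cstdint>
#include <cstdio>

// Packet count for a send_n of `size` bytes with the 62-byte Buffer::data
// payload used by this patch: one packet for the size, then ceil(size / 62).
constexpr uint64_t kPayload = 62;

constexpr uint64_t packets_for(uint64_t size) {
  return 1 + (size + kPayload - 1) / kPayload;
}

int main() {
  // e.g. the string "hello" plus its NUL (6 bytes) costs 2 packets, while a
  // 150-byte message costs 1 + 3 = 4.
  printf("%llu %llu\n", (unsigned long long)packets_for(6),
         (unsigned long long)packets_for(150));
  return 0;
}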
- if (in & !out) { - work(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(1, cpp::MemoryOrder::RELAXED); - out = 1; +/// Receives an arbitrarily sized data buffer across the shared channel in +/// multiples of the packet length. The \p alloc function is called with the +/// size of the data so that we can initialize the size of the \p dst buffer. +template +template +LIBC_INLINE void PortT::recv_n(A alloc) { + uint64_t size = 0; + recv([&](Buffer *buffer) { size = buffer->data[0]; }); + uint8_t *dst = reinterpret_cast(alloc(size)); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer) { + uint64_t len = + size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; + inline_memcpy(dst + idx, buffer->data, len); + }); } - // Wait for the client to use the buffer and respond. - if (in & out) { - while (in) - in = inbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); +} + +/// Attempts to open a port to use as the client. The client can only open a +/// port if we find an index that is in a valid sending state. That is, there +/// are send operations pending that haven't been serviced on this port. Each +/// port instance uses an associated \p opcode to tell the server what to do. +LIBC_INLINE cpp::optional Client::try_open(uint16_t opcode) { + maybe> p = Process::try_open(); + if (!p.success) + return cpp::nullopt; + + shared_buffer->opcode = opcode; + return Port(*this, 0, 0); +} + +LIBC_INLINE Client::Port Client::open(uint16_t opcode) { + for (;;) { + if (cpp::optional p = try_open(opcode)) + return p.value(); + sleep_briefly(); } - // Clean up the buffer and signal the end of the protocol. - if (!in & out) { - clean(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(0, cpp::MemoryOrder::RELAXED); - out = 0; +} + +/// Attempts to open a port to use as the server. The server can only open a +/// port if it has a pending receive operation +LIBC_INLINE cpp::optional Server::try_open() { + uint32_t index = 0; + uint32_t in = inbox.read_slot(index); + uint32_t out = outbox.read_slot(index); + + // The server is passive, if there is no work pending don't bother + // opening a port. 
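Summarizing the client calling convention established above (a sketch that mirrors the TEST_INCREMENT round trip used by the new integration test later in this patch); Server::try_open continues below.

// Device-side sketch, assuming rpc_client.h and `using namespace __llvm_libc;`
// as in the test.
uint64_t increment_on_host(uint64_t value) {
  rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT); // blocks
  port.send_and_recv(
      [=](rpc::Buffer *buffer) {
        reinterpret_cast<uint64_t *>(buffer->data)[0] = value;
      },
      [&](rpc::Buffer *buffer) {
        value = reinterpret_cast<uint64_t *>(buffer->data)[0];
      });
  port.close();
  return value;
}
// rpc::client.try_open(opcode) is the non-blocking variant: it returns
// cpp::nullopt instead of spinning when the single port is already claimed.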
+ if (in != out) + return cpp::nullopt; + + maybe> p = Process::try_open(); + if (!p.success) { + return cpp::nullopt; } - return true; + return Port(*this, index, 0); +} + +LIBC_INLINE Server::Port Server::open() { + for (;;) { + if (cpp::optional p = try_open()) + return p.value(); + sleep_briefly(); + } } } // namespace rpc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -8,12 +8,14 @@ #include "src/__support/RPC/rpc_client.h" +static __llvm_libc::cpp::Atomic lock; + extern "C" int main(int argc, char **argv, char **envp); extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void _start(int argc, char **argv, char **envp, int *ret, void *in, void *out, void *buffer) { - __llvm_libc::rpc::client.reset(in, out, buffer); + __llvm_libc::rpc::client.reset(&lock, in, out, buffer); __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED); } diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -8,12 +8,14 @@ #include "src/__support/RPC/rpc_client.h" +static __llvm_libc::cpp::Atomic lock; + extern "C" int main(int argc, char **argv, char **envp); extern "C" [[gnu::visibility("protected")]] __attribute__((nvptx_kernel)) void _start(int argc, char **argv, char **envp, int *ret, void *in, void *out, void *buffer) { - __llvm_libc::rpc::client.reset(in, out, buffer); + __llvm_libc::rpc::client.reset(&lock, in, out, buffer); __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED); } diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -12,3 +12,15 @@ FRANCE=Paris GERMANY=Berlin ) + +add_integration_test( + startup_rpc_test + SUITE libc-startup-tests + SRCS + rpc_test.cpp + DEPENDS + libc.src.__support.RPC.rpc_client + LOADER_ARGS + --blocks 1 + --threads 1 +) diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp new file mode 100644 --- /dev/null +++ b/libc/test/integration/startup/gpu/rpc_test.cpp @@ -0,0 +1,47 @@ +//===-- Loader test to check the RPC interface with the loader ------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include "src/__support/RPC/rpc_client.h" +#include "test/IntegrationTest/test.h" + +using namespace __llvm_libc; + +// TODO: These should be put in a common utility header. 
+#if defined(LIBC_TARGET_ARCH_IS_NVPTX)
+uint32_t __nvvm_read_ptx_sreg_ctaid_x();
+uint32_t get_block_id() { return __nvvm_read_ptx_sreg_ctaid_x(); }
+#elif defined(LIBC_TARGET_ARCH_IS_AMDGPU)
+uint32_t __builtin_amdgcn_workgroup_id_x();
+uint32_t get_block_id() { return __builtin_amdgcn_workgroup_id_x(); }
+#else
+uint32_t get_block_id() { return 0; }
+#endif
+
+static void test_add_simple() {
+  // uint32_t num_additions = 1000 + 10 * get_block_id();
+  uint32_t num_additions = 1000000;
+  uint64_t cnt = 0;
+  for (uint32_t i = 0; i < num_additions; ++i) {
+    rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT);
+    port.send_and_recv(
+        [=](rpc::Buffer *buffer) {
+          reinterpret_cast<uint64_t *>(buffer->data)[0] = cnt;
+        },
+        [&](rpc::Buffer *buffer) {
+          cnt = reinterpret_cast<uint64_t *>(buffer->data)[0];
+        });
+    port.close();
+  }
+  ASSERT_TRUE(cnt == num_additions && "Incorrect sum");
+}
+
+TEST_MAIN(int argc, char **argv, char **envp) {
+  test_add_simple();
+
+  return 0;
+}
diff --git a/libc/utils/gpu/loader/CMakeLists.txt b/libc/utils/gpu/loader/CMakeLists.txt
--- a/libc/utils/gpu/loader/CMakeLists.txt
+++ b/libc/utils/gpu/loader/CMakeLists.txt
@@ -1,5 +1,8 @@
 add_library(gpu_loader OBJECT Main.cpp)
 
-target_include_directories(gpu_loader PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
+target_include_directories(gpu_loader PUBLIC
+  ${CMAKE_CURRENT_SOURCE_DIR}
+  ${LIBC_SOURCE_DIR}
+)
 
 find_package(hsa-runtime64 QUIET 1.2.0 HINTS ${CMAKE_INSTALL_PREFIX} PATHS /opt/rocm)
 if(hsa-runtime64_FOUND)
diff --git a/libc/utils/gpu/loader/Loader.h b/libc/utils/gpu/loader/Loader.h
--- a/libc/utils/gpu/loader/Loader.h
+++ b/libc/utils/gpu/loader/Loader.h
@@ -13,10 +13,21 @@
 #include 
 #include 
 
+/// Generic launch parameters for configuring the number of blocks / threads.
+struct LaunchParameters {
+  uint32_t num_threads_x;
+  uint32_t num_threads_y;
+  uint32_t num_threads_z;
+  uint32_t num_blocks_x;
+  uint32_t num_blocks_y;
+  uint32_t num_blocks_z;
+};
+
 /// Generic interface to load the \p image and launch execution of the _start
 /// kernel on the target device. Copies \p argc and \p argv to the device.
 /// Returns the final value of the `main` function on the device.
-int load(int argc, char **argv, char **evnp, void *image, size_t size);
+int load(int argc, char **argv, char **envp, void *image, size_t size,
+         const LaunchParameters &params);
 
 /// Copy the system's argument vector to GPU memory allocated using \p alloc.
 template 
diff --git a/libc/utils/gpu/loader/Main.cpp b/libc/utils/gpu/loader/Main.cpp
--- a/libc/utils/gpu/loader/Main.cpp
+++ b/libc/utils/gpu/loader/Main.cpp
@@ -15,21 +15,69 @@
 #include 
 #include 
+#include 
+#include 
 
 int main(int argc, char **argv, char **envp) {
   if (argc < 2) {
-    printf("USAGE: ./loader <executable> <args>, ...\n");
+    printf("USAGE: ./loader [--threads <n>, --blocks <n>] "
+           "<executable> <args>, ...\n");
     return EXIT_SUCCESS;
   }
 
-  // TODO: We should perform some validation on the file.
-  FILE *file = fopen(argv[1], "r");
+  int offset = 0;
+  FILE *file = nullptr;
+  char *ptr;
+  LaunchParameters params = {1, 1, 1, 1, 1, 1};
+  while (!file && ++offset < argc) {
+    if (argv[offset] == std::string("--threads") ||
+        argv[offset] == std::string("--threads-x")) {
+      params.num_threads_x =
+          offset + 1 < argc ? strtoul(argv[offset + 1], &ptr, 10) : 1;
+      offset++;
+      continue;
+    } else if (argv[offset] == std::string("--threads-y")) {
+      params.num_threads_y =
+          offset + 1 < argc ? strtoul(argv[offset + 1], &ptr, 10) : 1;
+      offset++;
+      continue;
+    } else if (argv[offset] == std::string("--threads-z")) {
+      params.num_threads_z =
+          offset + 1 < argc ? 
strtoul(argv[offset + 1], &ptr, 10) : 1; + offset++; + continue; + } else if (argv[offset] == std::string("--blocks") || + argv[offset] == std::string("--blocks-x")) { + params.num_blocks_x = + offset + 1 < argc ? strtoul(argv[offset + 1], &ptr, 10) : 1; + offset++; + continue; + } else if (argv[offset] == std::string("--blocks-y")) { + params.num_blocks_y = + offset + 1 < argc ? strtoul(argv[offset + 1], &ptr, 10) : 1; + offset++; + continue; + } else if (argv[offset] == std::string("--blocks-z")) { + params.num_blocks_z = + offset + 1 < argc ? strtoul(argv[offset + 1], &ptr, 10) : 1; + offset++; + continue; + } else { + file = fopen(argv[offset], "r"); + if (!file) { + fprintf(stderr, "Failed to open image file '%s'\n", argv[offset]); + return EXIT_FAILURE; + } + break; + } + } if (!file) { - fprintf(stderr, "Failed to open image file %s\n", argv[1]); + fprintf(stderr, "No image file provided\n"); return EXIT_FAILURE; } + // TODO: We should perform some validation on the file. fseek(file, 0, SEEK_END); const auto size = ftell(file); fseek(file, 0, SEEK_SET); @@ -39,7 +87,7 @@ fclose(file); // Drop the loader from the program arguments. - int ret = load(argc - 1, &argv[1], envp, image, size); + int ret = load(argc - offset, &argv[offset], envp, image, size, params); free(image); return ret; diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h new file mode 100644 --- /dev/null +++ b/libc/utils/gpu/loader/Server.h @@ -0,0 +1,59 @@ +//===-- Generic RPC server interface --------------------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_LIBC_UTILS_GPU_LOADER_RPC_H +#define LLVM_LIBC_UTILS_GPU_LOADER_RPC_H + +#include +#include +#include +#include +#include + +#include "src/__support/RPC/rpc.h" + +static __llvm_libc::rpc::Server server; + +static __llvm_libc::cpp::Atomic lock; + +/// Queries the RPC client at least once and performs server-side work if there +/// are any active requests. 
+void handle_server() { + auto port = server.try_open(); + if (!port) + return; + + switch (port->get_opcode()) { + case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { + void *str = nullptr; + port->recv_n([&](uint64_t size) { return str = malloc(size); }); + fputs(reinterpret_cast(str), stderr); + free(str); + break; + } + case __llvm_libc::rpc::Opcode::EXIT: { + port->recv([](__llvm_libc::rpc::Buffer *buffer) { + // arguably leak the port, but unimportant + exit(reinterpret_cast(buffer->data)[0]); + }); + break; + } + case __llvm_libc::rpc::Opcode::TEST_INCREMENT: { + port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) { + reinterpret_cast(buffer->data)[0] += 1; + }); + break; + } + default: + port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ }); + // definitely leak the port + return; + } + port->close(); +} +#endif diff --git a/libc/utils/gpu/loader/amdgpu/CMakeLists.txt b/libc/utils/gpu/loader/amdgpu/CMakeLists.txt --- a/libc/utils/gpu/loader/amdgpu/CMakeLists.txt +++ b/libc/utils/gpu/loader/amdgpu/CMakeLists.txt @@ -1,7 +1,6 @@ add_executable(amdhsa_loader Loader.cpp) add_dependencies(amdhsa_loader libc.src.__support.RPC.rpc) -target_include_directories(amdhsa_loader PRIVATE ${LIBC_SOURCE_DIR}) target_link_libraries(amdhsa_loader PRIVATE hsa-runtime64::hsa-runtime64 diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -14,8 +14,7 @@ //===----------------------------------------------------------------------===// #include "Loader.h" - -#include "src/__support/RPC/rpc.h" +#include "Server.h" #include #include @@ -39,30 +38,6 @@ void *buffer; }; -static __llvm_libc::rpc::Server server; - -/// Queries the RPC client at least once and performs server-side work if there -/// are any active requests. -void handle_server() { - while (server.handle( - [&](__llvm_libc::rpc::Buffer *buffer) { - switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) { - case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - fputs(reinterpret_cast(&buffer->data[1]), stderr); - break; - } - case __llvm_libc::rpc::Opcode::EXIT: { - exit(buffer->data[1]); - break; - } - default: - return; - }; - }, - [](__llvm_libc::rpc::Buffer *buffer) {})) - ; -} - /// Print the error code and exit if \p code indicates an error. static void handle_error(hsa_status_t code) { if (code == HSA_STATUS_SUCCESS || code == HSA_STATUS_INFO_BREAK) @@ -170,7 +145,8 @@ return iterate_agent_memory_pools(agent, cb); } -int load(int argc, char **argv, char **envp, void *image, size_t size) { +int load(int argc, char **argv, char **envp, void *image, size_t size, + const LaunchParameters ¶ms) { // Initialize the HSA runtime used to communicate with the device. if (hsa_status_t err = hsa_init()) handle_error(err); @@ -330,6 +306,9 @@ hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_outbox); hsa_amd_agents_allow_access(1, &dev_agent, nullptr, buffer); + memset(server_inbox, 0, 4); + memset(server_outbox, 0, 4); + // Initialie all the arguments (explicit and implicit) to zero, then set the // explicit arguments to the values created above. std::memset(args, 0, args_size); @@ -356,12 +335,12 @@ // masked off. 
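For orientation, the loaders drive handle_server() in a loop like the following while the kernel runs. kernel_finished() is a hypothetical stand-in for the HSA completion signal or CUDA stream query the real loaders poll, so this is a sketch rather than code from the patch; the dispatch-packet setup described by the surrounding hunk continues right after this aside.

#include "Server.h" // handle_server(), added by this patch

bool kernel_finished(); // hypothetical wrapper over the runtime's completion query

void serve_until_done() {
  while (!kernel_finished())
    handle_server(); // returns immediately when no port is pending
  // Drain a request that may have been issued just before completion.
  handle_server();
}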
std::memset(packet, 0, sizeof(hsa_kernel_dispatch_packet_t)); packet->setup = 1 << HSA_KERNEL_DISPATCH_PACKET_SETUP_DIMENSIONS; - packet->workgroup_size_x = 1; - packet->workgroup_size_y = 1; - packet->workgroup_size_z = 1; - packet->grid_size_x = 1; - packet->grid_size_y = 1; - packet->grid_size_z = 1; + packet->workgroup_size_x = params.num_threads_x; + packet->workgroup_size_y = params.num_threads_y; + packet->workgroup_size_z = params.num_threads_z; + packet->grid_size_x = params.num_blocks_x * params.num_threads_x; + packet->grid_size_y = params.num_blocks_y * params.num_threads_y; + packet->grid_size_z = params.num_blocks_z * params.num_threads_z; packet->private_segment_size = private_size; packet->group_segment_size = group_size; packet->kernel_object = kernel; @@ -373,7 +352,7 @@ handle_error(err); // Initialize the RPC server's buffer for host-device communication. - server.reset(server_inbox, server_outbox, buffer); + server.reset(&lock, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime. diff --git a/libc/utils/gpu/loader/nvptx/CMakeLists.txt b/libc/utils/gpu/loader/nvptx/CMakeLists.txt --- a/libc/utils/gpu/loader/nvptx/CMakeLists.txt +++ b/libc/utils/gpu/loader/nvptx/CMakeLists.txt @@ -1,7 +1,6 @@ add_executable(nvptx_loader Loader.cpp) add_dependencies(nvptx_loader libc.src.__support.RPC.rpc) -target_include_directories(nvptx_loader PRIVATE ${LIBC_SOURCE_DIR}) target_link_libraries(nvptx_loader PRIVATE gpu_loader diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp --- a/libc/utils/gpu/loader/nvptx/Loader.cpp +++ b/libc/utils/gpu/loader/nvptx/Loader.cpp @@ -14,8 +14,7 @@ //===----------------------------------------------------------------------===// #include "Loader.h" - -#include "src/__support/RPC/rpc.h" +#include "Server.h" #include "cuda.h" #include @@ -34,30 +33,6 @@ void *buffer; }; -static __llvm_libc::rpc::Server server; - -/// Queries the RPC client at least once and performs server-side work if there -/// are any active requests. -void handle_server() { - while (server.handle( - [&](__llvm_libc::rpc::Buffer *buffer) { - switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) { - case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - fputs(reinterpret_cast(&buffer->data[1]), stderr); - break; - } - case __llvm_libc::rpc::Opcode::EXIT: { - exit(buffer->data[1]); - break; - } - default: - return; - }; - }, - [](__llvm_libc::rpc::Buffer *buffer) {})) - ; -} - static void handle_error(CUresult err) { if (err == CUDA_SUCCESS) return; @@ -76,7 +51,8 @@ exit(EXIT_FAILURE); } -int load(int argc, char **argv, char **envp, void *image, size_t size) { +int load(int argc, char **argv, char **envp, void *image, size_t size, + const LaunchParameters ¶ms) { if (CUresult err = cuInit(0)) handle_error(err); @@ -154,13 +130,13 @@ CU_LAUNCH_PARAM_END}; // Initialize the RPC server's buffer for host-device communication. - server.reset(server_inbox, server_outbox, buffer); + server.reset(&lock, server_inbox, server_outbox, buffer); // Call the kernel with the given arguments. 
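One unit difference between the two launch paths is worth calling out before the cuLaunchKernel change below: the HSA dispatch packet above takes the total grid size in work-items, so the AMDGPU loader multiplies blocks by threads, while cuLaunchKernel takes the grid dimension in blocks. A small illustration with hypothetical values:

#include <cstdint>
#include "Loader.h" // LaunchParameters, added by this patch

int example() {
  // Hypothetical `--blocks 4 --threads 64` invocation.
  LaunchParameters p = {64, 1, 1, 4, 1, 1};
  uint32_t hsa_workgroup_size_x = p.num_threads_x;             // 64 threads
  uint32_t hsa_grid_size_x = p.num_blocks_x * p.num_threads_x; // 256 work-items
  uint32_t cuda_grid_dim_x = p.num_blocks_x;                   // 4 blocks
  uint32_t cuda_block_dim_x = p.num_threads_x;                 // 64 threads
  return hsa_workgroup_size_x + hsa_grid_size_x + cuda_grid_dim_x +
         cuda_block_dim_x; // keep the values used
}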
- if (CUresult err = - cuLaunchKernel(function, /*gridDimX=*/1, /*gridDimY=*/1, - /*gridDimZ=*/1, /*blockDimX=*/1, /*blockDimY=*/1, - /*bloackDimZ=*/1, 0, stream, nullptr, args_config)) + if (CUresult err = cuLaunchKernel( + function, params.num_blocks_x, params.num_blocks_y, + params.num_blocks_z, params.num_threads_x, params.num_threads_y, + params.num_threads_z, 0, stream, nullptr, args_config)) handle_error(err); // Wait until the kernel has completed execution on the device. Periodically