diff --git a/libc/src/__support/RPC/CMakeLists.txt b/libc/src/__support/RPC/CMakeLists.txt --- a/libc/src/__support/RPC/CMakeLists.txt +++ b/libc/src/__support/RPC/CMakeLists.txt @@ -10,6 +10,8 @@ DEPENDS libc.src.__support.common libc.src.__support.CPP.atomic + libc.src.__support.CPP.optional + libc.src.__support.CPP.functional libc.src.__support.GPU.utils ) diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -20,6 +20,7 @@ #include "rpc_util.h" #include "src/__support/CPP/atomic.h" +#include "src/__support/CPP/functional.h" #include "src/__support/CPP/optional.h" #include "src/__support/GPU/utils.h" #include "src/string/memory_utils/memcpy_implementations.h" @@ -38,12 +39,36 @@ }; /// A fixed size channel used to communicate between the RPC client and server. -struct alignas(64) Buffer { - uint8_t data[62]; - uint16_t opcode; +struct Buffer { + uint64_t data[8]; }; static_assert(sizeof(Buffer) == 64, "Buffer size mismatch"); +/// The information associated with a packet. This indicates which operations to +/// perform and which threads are active in the slots. +struct Header { + uint64_t mask; + uint16_t opcode; +}; + +/// The data payload for the associated packet. We provide enough space for each +/// thread in the cooperating lane to have a buffer. +struct Payload { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + Buffer slot[gpu::LANE_SIZE]; +#else + // Flexible array size allocated at runtime to the appropriate size. + Buffer slot[]; +#endif +}; + +/// A packet used to share data between the client and server across an entire +/// lane. We use a lane as the minimum granularity for execution. +struct alignas(64) Packet { + Header header; + Payload payload; +}; + /// A common process used to synchronize communication between a client and a /// server. The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer between both sides. @@ -71,18 +96,21 @@ LIBC_INLINE Process &operator=(const Process &) = default; LIBC_INLINE ~Process() = default; + uint32_t lane_size; cpp::Atomic *lock; cpp::Atomic *inbox; cpp::Atomic *outbox; - Buffer *buffer; + Packet *buffer; /// Initialize the communication channels. - LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) { + LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox, + void *outbox, void *buffer) { *this = { + lane_size, reinterpret_cast *>(lock), reinterpret_cast *>(inbox), reinterpret_cast *>(outbox), - reinterpret_cast(buffer), + reinterpret_cast(buffer), }; } @@ -144,7 +172,8 @@ return lane_mask != packed; } - // Unlock the lock at index. + /// Unlock the lock at index. We need a lane sync to keep this function + /// convergent, otherwise the compiler will sink the store and deadlock. [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask, uint64_t index) { // Wait for other threads in the warp to finish using the lock @@ -156,6 +185,31 @@ // warp dropping the lock again. uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0); lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED); + gpu::sync_lane(lane_mask); + } + + /// Invokes a function accross every active buffer across the total lane size. + LIBC_INLINE void invoke_rpc(cpp::function fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()]); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i]); + } + } + + /// Alternate version that also provides the index of the current lane. + LIBC_INLINE void invoke_rpc(cpp::function fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id()); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i], i); + } } }; @@ -180,7 +234,7 @@ template LIBC_INLINE void recv_n(A alloc); LIBC_INLINE uint16_t get_opcode() const { - return process.buffer[index].opcode; + return process.buffer[index].header.opcode; } LIBC_INLINE void close() { process.unlock(lane_mask, index); } @@ -227,7 +281,7 @@ } // Apply the \p fill function to initialize the buffer and release the memory. - fill(&process.buffer[index]); + process.invoke_rpc(fill, index); out = !out; atomic_thread_fence(cpp::MemoryOrder::RELEASE); process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); @@ -245,7 +299,7 @@ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); // Apply the \p use function to read the memory out of the buffer. - use(&process.buffer[index]); + process.invoke_rpc(use, index); out = !out; process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } @@ -274,7 +328,10 @@ LIBC_INLINE void Port::send_n(const void *src, uint64_t size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. - send([=](Buffer *buffer) { buffer->data[0] = size; }); + // TODO: We may need a way for the CPU to send different strings per thread. + send([=](Buffer *buffer) { + reinterpret_cast(buffer->data)[0] = size; + }); const uint8_t *ptr = reinterpret_cast(src); for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { send([=](Buffer *buffer) { @@ -283,6 +340,7 @@ inline_memcpy(buffer->data, ptr + idx, len); }); } + gpu::sync_lane(process.buffer[index].header.mask); } /// Receives an arbitrarily sized data buffer across the shared channel in @@ -291,15 +349,42 @@ template template LIBC_INLINE void Port::recv_n(A alloc) { - uint64_t size = 0; - recv([&](Buffer *buffer) { size = buffer->data[0]; }); - uint8_t *dst = reinterpret_cast(alloc(size)); - for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { - recv([=](Buffer *buffer) { - uint64_t len = - size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; - inline_memcpy(dst + idx, buffer->data, len); + // The GPU handles thread private variables and masking implicitly through its + // execution model. If this is the CPU we need to manually handle the + // possibility that the sent data is of different length. + if constexpr (is_process_gpu()) { + uint64_t size = 0; + recv([&](Buffer *buffer) { + size = reinterpret_cast(buffer->data)[0]; + }); + uint8_t *dst = reinterpret_cast(alloc(size), gpu::get_lane_id()); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer) { + uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) + : size - idx; + inline_memcpy(dst + idx, buffer->data, len); + }); + } + return; + } else { + uint64_t size[MAX_LANE_SIZE]; + uint8_t *dst[MAX_LANE_SIZE]; + uint64_t max = 0; + recv([&](Buffer *buffer, uint32_t id) { + size[id] = reinterpret_cast(buffer->data)[0]; + dst[id] = reinterpret_cast(alloc(size[id], id)); + max = size[id] > max ? size[id] : max; }); + for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer, uint32_t id) { + uint64_t len = size[id] - idx > sizeof(Buffer::data) + ? sizeof(Buffer::data) + : size[id] - idx; + if (idx < size[id]) + inline_memcpy(dst[id] + idx, buffer->data, len); + }); + } + return; } } @@ -307,7 +392,10 @@ /// port if we find an index that is in a valid sending state. That is, there /// are send operations pending that haven't been serviced on this port. Each /// port instance uses an associated \p opcode to tell the server what to do. -LIBC_INLINE cpp::optional Client::try_open(uint16_t opcode) { +/// Opening a port is only valid if the `opcode` is the sam accross every +/// participating thread. +[[clang::convergent]] LIBC_INLINE cpp::optional +Client::try_open(uint16_t opcode) { constexpr uint64_t index = 0; const uint64_t lane_mask = gpu::get_lane_mask(); @@ -323,13 +411,16 @@ // Once we acquire the index we need to check if we are in a valid sending // state. - if (buffer_unavailable(in, out)) { unlock(lane_mask, index); return cpp::nullopt; } - buffer->opcode = opcode; + if (is_first_lane(lane_mask)) { + buffer[index].header.opcode = opcode; + buffer[index].header.mask = lane_mask; + } + gpu::sync_lane(lane_mask); return Port(*this, lane_mask, index, out); } @@ -343,7 +434,8 @@ /// Attempts to open a port to use as the server. The server can only open a /// port if it has a pending receive operation -LIBC_INLINE cpp::optional Server::try_open() { +[[clang::convergent]] LIBC_INLINE cpp::optional +Server::try_open() { constexpr uint64_t index = 0; const uint64_t lane_mask = gpu::get_lane_mask(); diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h --- a/libc/src/__support/RPC/rpc_util.h +++ b/libc/src/__support/RPC/rpc_util.h @@ -16,6 +16,9 @@ namespace __llvm_libc { namespace rpc { +/// Maximum amount of data a single lane can use. +constexpr uint64_t MAX_LANE_SIZE = 64; + /// Suspend the thread briefly to assist the thread scheduler during busy loops. LIBC_INLINE void sleep_briefly() { #if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700 @@ -37,6 +40,15 @@ return gpu::get_lane_id() == get_first_lane_id(lane_mask); } +/// Conditional to indicate if this process is running on the GPU. +LIBC_INLINE constexpr bool is_process_gpu() { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + return true; +#else + return false; +#endif +} + } // namespace rpc } // namespace __llvm_libc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -52,7 +52,7 @@ if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { // We need to set up the RPC client first in case any of the constructors // require it. - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); // We want the fini array callbacks to be run after other atexit // callbacks are run. So, we register them before running the init diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -57,7 +57,7 @@ if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { // We need to set up the RPC client first in case any of the constructors // require it. - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); // We want the fini array callbacks to be run after other atexit // callbacks are run. So, we register them before running the init diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -22,8 +22,12 @@ libc.src.__support.RPC.rpc_client libc.src.__support.GPU.utils LOADER_ARGS - --blocks 16 - --threads 1 + --blocks-x 2 + --blocks-y 2 + --blocks-z 2 + --threads-x 4 + --threads-y 4 + --threads-z 4 ) add_integration_test( diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp --- a/libc/test/integration/startup/gpu/rpc_test.cpp +++ b/libc/test/integration/startup/gpu/rpc_test.cpp @@ -13,7 +13,8 @@ using namespace __llvm_libc; static void test_add_simple() { - uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x(); + uint32_t num_additions = + 10 + 10 * gpu::get_thread_id() + 10 * gpu::get_block_id(); uint64_t cnt = 0; for (uint32_t i = 0; i < num_additions; ++i) { rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT); @@ -29,8 +30,20 @@ ASSERT_TRUE(cnt == num_additions && "Incorrect sum"); } +// Test to ensure that the RPC mechanism doesn't hang on divergence. +static void test_noop(uint8_t data) { + rpc::Client::Port port = rpc::client.open(rpc::NOOP); + port.send([=](rpc::Buffer *buffer) { buffer->data[0] = data; }); + port.close(); +} + TEST_MAIN(int argc, char **argv, char **envp) { test_add_simple(); + if (gpu::get_thread_id() % 2) + test_noop(1); + else + test_noop(2); + return 0; } diff --git a/libc/utils/gpu/loader/Loader.h b/libc/utils/gpu/loader/Loader.h --- a/libc/utils/gpu/loader/Loader.h +++ b/libc/utils/gpu/loader/Loader.h @@ -29,6 +29,11 @@ int load(int argc, char **argv, char **evnp, void *image, size_t size, const LaunchParameters ¶ms); +/// Return \p V aligned "upwards" according to \p Align. +template inline V align_up(V val, A align) { + return ((val + V(align) - 1) / V(align)) * V(align); +} + /// Copy the system's argument vector to GPU memory allocated using \p alloc. template void *copy_argument_vector(int argc, char **argv, Allocator alloc) { diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h --- a/libc/utils/gpu/loader/Server.h +++ b/libc/utils/gpu/loader/Server.h @@ -30,15 +30,19 @@ switch (port->get_opcode()) { case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - uint64_t str_size; - char *str = nullptr; - port->recv_n([&](uint64_t size) { - str_size = size; - str = new char[size]; - return str; + uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0}; + char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr}; + port->recv_n([&](uint64_t size, uint32_t id) { + str_size[id] = size; + strs[id] = new char[size]; + return strs[id]; }); - fwrite(str, str_size, 1, stderr); - delete[] str; + for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) { + if (strs[i]) { + fwrite(strs[i], str_size[i], 1, stderr); + delete[] strs[i]; + } + } break; } case __llvm_libc::rpc::Opcode::EXIT: { @@ -54,8 +58,7 @@ break; } default: - port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ }); - return; + port->recv([](__llvm_libc::rpc::Buffer *buffer) {}); } port->close(); } diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -287,6 +287,10 @@ hsa_amd_memory_fill(dev_ret, 0, sizeof(int)); // Allocate finegrained memory for the RPC server and client to share. + uint32_t wavefront_size = 0; + if (hsa_status_t err = hsa_agent_get_info( + dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size)) + handle_error(err); void *server_inbox; void *server_outbox; void *buffer; @@ -299,7 +303,10 @@ /*flags=*/0, &server_outbox)) handle_error(err); if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::rpc::Buffer), + finegrained_pool, + align_up(sizeof(__llvm_libc::rpc::Header) + + (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)), + alignof(__llvm_libc::rpc::Packet)), /*flags=*/0, &buffer)) handle_error(err); hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox); @@ -351,7 +358,7 @@ handle_error(err); // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime. diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp --- a/libc/utils/gpu/loader/nvptx/Loader.cpp +++ b/libc/utils/gpu/loader/nvptx/Loader.cpp @@ -232,9 +232,13 @@ if (CUresult err = cuMemsetD32(dev_ret, 0, 1)) handle_error(err); + uint32_t warp_size = 32; void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic)); void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic)); - void *buffer = allocator(sizeof(__llvm_libc::rpc::Buffer)); + void *buffer = + allocator(align_up(sizeof(__llvm_libc::rpc::Header) + + (warp_size * sizeof(__llvm_libc::rpc::Buffer)), + alignof(__llvm_libc::rpc::Packet))); if (!server_inbox || !server_outbox || !buffer) handle_error("Failed to allocate memory the RPC client / server."); @@ -254,7 +258,7 @@ CU_LAUNCH_PARAM_END}; // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(warp_size, &lock, server_inbox, server_outbox, buffer); // Call the kernel with the given arguments. if (CUresult err = cuLaunchKernel(