diff --git a/libc/src/__support/RPC/CMakeLists.txt b/libc/src/__support/RPC/CMakeLists.txt --- a/libc/src/__support/RPC/CMakeLists.txt +++ b/libc/src/__support/RPC/CMakeLists.txt @@ -10,6 +10,8 @@ DEPENDS libc.src.__support.common libc.src.__support.CPP.atomic + libc.src.__support.CPP.optional + libc.src.__support.CPP.functional libc.src.__support.GPU.utils ) diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -20,6 +20,7 @@ #include "rpc_util.h" #include "src/__support/CPP/atomic.h" +#include "src/__support/CPP/functional.h" #include "src/__support/CPP/optional.h" #include "src/__support/GPU/utils.h" #include "src/string/memory_utils/memcpy_implementations.h" @@ -38,12 +39,36 @@ }; /// A fixed size channel used to communicate between the RPC client and server. -struct alignas(64) Buffer { - uint8_t data[62]; - uint16_t opcode; +struct Buffer { + uint64_t data[8]; }; static_assert(sizeof(Buffer) == 64, "Buffer size mismatch"); +/// The information associated with a packet. This indicates which operations to +/// perform and which threads are active in the slots. +struct Header { + uint64_t mask; + uint16_t opcode; +}; + +/// The data payload for the associated packet. We provide enough space for each +/// thread in the cooperating lane to have a buffer. +struct Payload { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + Buffer slot[gpu::LANE_SIZE]; +#else + // Flexible array size allocated at runtime to the appropriate size. + Buffer slot[]; +#endif +}; + +/// A packet used to share data between the client and server across an entire +/// lane. We use a lane as the minimum granularity for execution. +struct alignas(64) Packet { + Header header; + Payload payload; +}; + /// A common process used to synchronize communication between a client and a /// server. The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer between both sides. @@ -75,19 +100,20 @@ static constexpr uint32_t Data = 0b01; static constexpr uint32_t Ack = 0b10; + uint32_t lane_size; cpp::Atomic *lock; cpp::Atomic *inbox; cpp::Atomic *outbox; - Buffer *buffer; + Packet *buffer; /// Initialize the communication channels. - LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) { - *this = { - reinterpret_cast *>(lock), - reinterpret_cast *>(inbox), - reinterpret_cast *>(outbox), - reinterpret_cast(buffer), - }; + LIBC_INLINE void reset(uint32_t size, void *mtx, void *in, void *out, + void *data) { + lane_size = size; + lock = reinterpret_cast *>(mtx); + inbox = reinterpret_cast *>(in); + outbox = reinterpret_cast *>(out); + buffer = reinterpret_cast(data); } /// Determines if this process owns the buffer for a send. We can send data if @@ -101,6 +127,30 @@ LIBC_INLINE static bool can_recv_data(uint32_t in, uint32_t out) { return bool(in & Process::Data) != bool(out & Process::Ack); } + + /// Invokes a function accross every active buffer across the total lane size. + LIBC_INLINE void invoke_rpc(cpp::function fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()]); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i]); + } + } + + /// Alternate version that also provides the index of the current lane. + LIBC_INLINE void invoke_rpc(cpp::function fn, + uint32_t index) { + if constexpr (is_process_gpu()) { + fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id()); + } else { + for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size()) + if (buffer[index].header.mask & 1ul << i) + fn(&buffer[index].payload.slot[i], i); + } + } }; /// The port provides the interface to communicate between the multiple @@ -123,11 +173,15 @@ template LIBC_INLINE void recv_n(A alloc); LIBC_INLINE uint16_t get_opcode() const { - return process.buffer[index].opcode; + return process.buffer[index].header.opcode; } - LIBC_INLINE void close() { + [[clang::convergent]] LIBC_INLINE void close() { + uint64_t mask = process.buffer[index].header.mask; process.lock[index].store(0, cpp::MemoryOrder::RELAXED); + cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE); + // This lane sync forces explicit ordering of the open and close calls. + gpu::sync_lane(mask); } private: @@ -169,7 +223,7 @@ } // Apply the \p fill function to initialize the buffer and release the memory. - fill(&process.buffer[index]); + process.invoke_rpc(fill, index); out = out ^ Process::Data; atomic_thread_fence(cpp::MemoryOrder::RELEASE); process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); @@ -187,7 +241,7 @@ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); // Apply the \p use function to read the memory out of the buffer. - use(&process.buffer[index]); + process.invoke_rpc(use, index); out = out ^ Process::Ack; process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } @@ -212,7 +266,10 @@ LIBC_INLINE void Port::send_n(const void *src, uint64_t size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. - send([=](Buffer *buffer) { buffer->data[0] = size; }); + // TODO: We may need a way for the CPU to send different strings per thread. + send([=](Buffer *buffer) { + reinterpret_cast(buffer->data)[0] = size; + }); const uint8_t *ptr = reinterpret_cast(src); for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { send([=](Buffer *buffer) { @@ -221,21 +278,49 @@ inline_memcpy(buffer->data, ptr + idx, len); }); } + gpu::sync_lane(process.buffer[index].header.mask); } /// Receives an arbitrarily sized data buffer across the shared channel in /// multiples of the packet length. The \p alloc function is called with the /// size of the data so that we can initialize the size of the \p dst buffer. template LIBC_INLINE void Port::recv_n(A alloc) { - uint64_t size = 0; - recv([&](Buffer *buffer) { size = buffer->data[0]; }); - uint8_t *dst = reinterpret_cast(alloc(size)); - for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { - recv([=](Buffer *buffer) { - uint64_t len = - size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; - inline_memcpy(dst + idx, buffer->data, len); + // The GPU handles thread private variables and masking implicitly through its + // execution model. If this is the CPU we need to manually handle the + // possibility that the sent data is of different length. + if constexpr (is_process_gpu()) { + uint64_t size = 0; + recv([&](Buffer *buffer) { + size = reinterpret_cast(buffer->data)[0]; }); + uint8_t *dst = reinterpret_cast(alloc(size), gpu::get_lane_id()); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer) { + uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) + : size - idx; + inline_memcpy(dst + idx, buffer->data, len); + }); + } + return; + } else { + uint64_t size[MAX_LANE_SIZE]; + uint8_t *dst[MAX_LANE_SIZE]; + uint64_t max = 0; + recv([&](Buffer *buffer, uint32_t id) { + size[id] = reinterpret_cast(buffer->data)[0]; + dst[id] = reinterpret_cast(alloc(size[id], id)); + max = size[id] > max ? size[id] : max; + }); + for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer, uint32_t id) { + uint64_t len = size[id] - idx > sizeof(Buffer::data) + ? sizeof(Buffer::data) + : size[id] - idx; + if (idx < size[id]) + inline_memcpy(dst[id] + idx, buffer->data, len); + }); + } + return; } } @@ -243,9 +328,18 @@ /// port if we find an index that is in a valid sending state. That is, there /// are send operations pending that haven't been serviced on this port. Each /// port instance uses an associated \p opcode to tell the server what to do. -LIBC_INLINE cpp::optional Client::try_open(uint16_t opcode) { +/// Opening a port is only valid if the `opcode` is the sam accross every +/// participating thread. +[[clang::convergent]] LIBC_INLINE cpp::optional +Client::try_open(uint16_t opcode) { // Attempt to acquire the lock on this index. - if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) + uint64_t lane_mask = gpu::get_lane_mask(); + uint32_t previous = 0; + if (is_first_lane(lane_mask)) + previous = lock->fetch_or(1, cpp::MemoryOrder::ACQUIRE); + previous = gpu::broadcast_value(previous); + + if (previous != 0) return cpp::nullopt; uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); @@ -255,10 +349,16 @@ // state. if (!can_send_data(in, out)) { lock->store(0, cpp::MemoryOrder::RELAXED); + cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE); return cpp::nullopt; } - buffer->opcode = opcode; + // TODO: The index here will be non-zero when we support multiple slots. + if (is_first_lane(lane_mask)) { + buffer[0].header.opcode = opcode; + buffer[0].header.mask = lane_mask; + } + gpu::sync_lane(lane_mask); return Port(*this, 0, out); } @@ -272,7 +372,7 @@ /// Attempts to open a port to use as the server. The server can only open a /// port if it has a pending receive operation -LIBC_INLINE cpp::optional Server::try_open() { +[[clang::convergent]] LIBC_INLINE cpp::optional Server::try_open() { uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED); @@ -282,7 +382,13 @@ return cpp::nullopt; // Attempt to acquire the lock on this index. - if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) + uint64_t lane_mask = gpu::get_lane_mask(); + uint32_t previous = 0; + if (is_first_lane(lane_mask)) + previous = lock->fetch_or(1, cpp::MemoryOrder::ACQUIRE); + previous = gpu::broadcast_value(previous); + + if (previous != 0) return cpp::nullopt; in = inbox->load(cpp::MemoryOrder::RELAXED); @@ -290,6 +396,7 @@ if (!can_recv_data(in, out)) { lock->store(0, cpp::MemoryOrder::RELAXED); + cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE); return cpp::nullopt; } diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h --- a/libc/src/__support/RPC/rpc_util.h +++ b/libc/src/__support/RPC/rpc_util.h @@ -9,12 +9,16 @@ #ifndef LLVM_LIBC_SRC_SUPPORT_RPC_RPC_UTILS_H #define LLVM_LIBC_SRC_SUPPORT_RPC_RPC_UTILS_H +#include "src/__support/GPU/utils.h" #include "src/__support/macros/attributes.h" #include "src/__support/macros/properties/architectures.h" namespace __llvm_libc { namespace rpc { +/// Maximum amount of data a single lane can use. +constexpr uint64_t MAX_LANE_SIZE = 64; + /// Suspend the thread briefly to assist the thread scheduler during busy loops. LIBC_INLINE void sleep_briefly() { #if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700 @@ -26,6 +30,25 @@ #endif } +/// Get the first active thread inside the lane. +LIBC_INLINE uint64_t get_first_lane_id(uint64_t lane_mask) { + return __builtin_ffsl(lane_mask) - 1; +} + +/// Conditional that is only true for a single thread in a lane. +LIBC_INLINE bool is_first_lane(uint64_t lane_mask) { + return gpu::get_lane_id() == get_first_lane_id(lane_mask); +} + +/// Conditional to indicate if this process is running on the GPU. +LIBC_INLINE constexpr bool is_process_gpu() { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + return true; +#else + return false; +#endif +} + } // namespace rpc } // namespace __llvm_libc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -20,7 +20,7 @@ void init_rpc(void *in, void *out, void *buffer) { // Only a single thread should update the RPC data. if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); init.store(1, cpp::MemoryOrder::RELAXED); } diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -20,7 +20,7 @@ void init_rpc(void *in, void *out, void *buffer) { // Only a single thread should update the RPC data. if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); init.store(1, cpp::MemoryOrder::RELAXED); } diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -22,6 +22,10 @@ libc.src.__support.RPC.rpc_client libc.src.__support.GPU.utils LOADER_ARGS - --blocks 16 - --threads 1 + --blocks-x 2 + --blocks-y 2 + --blocks-z 2 + --threads-x 4 + --threads-y 4 + --threads-z 4 ) diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp --- a/libc/test/integration/startup/gpu/rpc_test.cpp +++ b/libc/test/integration/startup/gpu/rpc_test.cpp @@ -13,7 +13,8 @@ using namespace __llvm_libc; static void test_add_simple() { - uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x(); + uint32_t num_additions = + 10 + 10 * gpu::get_thread_id() + 10 * gpu::get_block_id(); uint64_t cnt = 0; for (uint32_t i = 0; i < num_additions; ++i) { rpc::Port port = rpc::client.open(rpc::TEST_INCREMENT); @@ -29,8 +30,20 @@ ASSERT_TRUE(cnt == num_additions && "Incorrect sum"); } +// Test to ensure that the RPC mechanism doesn't hang on divergence. +static void test_noop(uint8_t data) { + rpc::Port port = rpc::client.open(rpc::NOOP); + port.send([=](rpc::Buffer *buffer) { buffer->data[0] = data; }); + port.close(); +} + TEST_MAIN(int argc, char **argv, char **envp) { test_add_simple(); + if (gpu::get_thread_id() % 2) + test_noop(1); + else + test_noop(2); + return 0; } diff --git a/libc/utils/gpu/loader/Loader.h b/libc/utils/gpu/loader/Loader.h --- a/libc/utils/gpu/loader/Loader.h +++ b/libc/utils/gpu/loader/Loader.h @@ -29,6 +29,11 @@ int load(int argc, char **argv, char **evnp, void *image, size_t size, const LaunchParameters ¶ms); +/// Return \p V aligned "upwards" according to \p Align. +template inline V align_up(V val, A align) { + return ((val + V(align) - 1) / V(align)) * V(align); +} + /// Copy the system's argument vector to GPU memory allocated using \p alloc. template void *copy_argument_vector(int argc, char **argv, Allocator alloc) { diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h --- a/libc/utils/gpu/loader/Server.h +++ b/libc/utils/gpu/loader/Server.h @@ -30,10 +30,15 @@ switch (port->get_opcode()) { case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - void *str = nullptr; - port->recv_n([&](uint64_t size) { return str = malloc(size); }); - fputs(reinterpret_cast(str), stderr); - free(str); + void *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr}; + port->recv_n( + [&](uint64_t size, uint32_t id) { return strs[id] = malloc(size); }); + for (void *str : strs) { + if (str) { + fputs(reinterpret_cast(str), stderr); + free(str); + } + } break; } case __llvm_libc::rpc::Opcode::EXIT: { @@ -49,8 +54,7 @@ break; } default: - port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ }); - return; + port->recv([](__llvm_libc::rpc::Buffer *buffer) {}); } port->close(); } diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -287,6 +287,10 @@ hsa_amd_memory_fill(dev_ret, 0, sizeof(int)); // Allocate finegrained memory for the RPC server and client to share. + uint32_t wavefront_size = 0; + if (hsa_status_t err = hsa_agent_get_info( + dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size)) + handle_error(err); void *server_inbox; void *server_outbox; void *buffer; @@ -299,7 +303,10 @@ /*flags=*/0, &server_outbox)) handle_error(err); if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::rpc::Buffer), + finegrained_pool, + align_up(sizeof(__llvm_libc::rpc::Header) + + (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)), + alignof(__llvm_libc::rpc::Packet)), /*flags=*/0, &buffer)) handle_error(err); hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox); @@ -351,7 +358,7 @@ handle_error(err); // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime. diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp --- a/libc/utils/gpu/loader/nvptx/Loader.cpp +++ b/libc/utils/gpu/loader/nvptx/Loader.cpp @@ -108,9 +108,13 @@ if (CUresult err = cuMemsetD32(dev_ret, 0, 1)) handle_error(err); + uint32_t warp_size = 32; void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic)); void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic)); - void *buffer = allocator(sizeof(__llvm_libc::rpc::Buffer)); + void *buffer = + allocator(align_up(sizeof(__llvm_libc::rpc::Header) + + (warp_size * sizeof(__llvm_libc::rpc::Buffer)), + alignof(__llvm_libc::rpc::Packet))); if (!server_inbox || !server_outbox || !buffer) handle_error("Failed to allocate memory the RPC client / server."); @@ -130,7 +134,7 @@ CU_LAUNCH_PARAM_END}; // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(warp_size, &lock, server_inbox, server_outbox, buffer); // Call the kernel with the given arguments. if (CUresult err = cuLaunchKernel(