diff --git a/libc/src/__support/OSUtil/gpu/quick_exit.cpp b/libc/src/__support/OSUtil/gpu/quick_exit.cpp --- a/libc/src/__support/OSUtil/gpu/quick_exit.cpp +++ b/libc/src/__support/OSUtil/gpu/quick_exit.cpp @@ -18,7 +18,7 @@ void quick_exit(int status) { rpc::Port port = rpc::client.open(rpc::EXIT); - port.send([&](rpc::Buffer *buffer) { + port.send([&](rpc::Buffer *buffer, uint32_t) { reinterpret_cast(buffer->data)[0] = status; }); port.close(); diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -39,11 +39,24 @@ /// A fixed size channel used to communicate between the RPC client and server. struct alignas(64) Buffer { - uint8_t data[62]; - uint16_t opcode; + uint8_t data[64]; }; static_assert(sizeof(Buffer) == 64, "Buffer size mismatch"); +/// A buffer used to swap data between the client and server across an entire +/// lane. The GPU executes threads in lock-step so we use the size of a 'warp' +/// or 'wavefront' as our minimum granularity. +struct LaneBuffer { + uint16_t opcode; + uint64_t mask; +#if defined(LIBC_TARGET_ARCH_IS_GPU) + Buffer slot[gpu::LANE_SIZE]; +#else + // Flexible array size allocated at runtime to the appropriate size. + Buffer slot[]; +#endif +}; + /// A common process used to synchronize communication between a client and a /// server. The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer between both sides. @@ -75,19 +88,20 @@ static constexpr uint32_t Data = 0b01; static constexpr uint32_t Ack = 0b10; + uint32_t lane_size; cpp::Atomic *lock; cpp::Atomic *inbox; cpp::Atomic *outbox; - Buffer *buffer; + LaneBuffer *buffer; /// Initialize the communication channels. 
- LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) { - *this = { - reinterpret_cast *>(lock), - reinterpret_cast *>(inbox), - reinterpret_cast *>(outbox), - reinterpret_cast(buffer), - }; + LIBC_INLINE void reset(uint32_t size, void *mtx, void *in, void *out, + void *data) { + lane_size = size; + lock = reinterpret_cast *>(mtx); + inbox = reinterpret_cast *>(in); + outbox = reinterpret_cast *>(out); + buffer = reinterpret_cast(data); } /// Determines if this process owns the buffer for a send. We can send data if @@ -169,7 +183,10 @@ } // Apply the \p fill function to initialize the buffer and release the memory. - fill(&process.buffer[index]); + for (uint32_t idx = 0; idx < process.lane_size; idx += gpu::get_lane_size()) + if (is_process_gpu() || process.buffer[index].mask & 1ul << idx) + fill(&process.buffer[index].slot[gpu::get_lane_id() + idx], + gpu::get_lane_id() + idx); out = out ^ Process::Data; atomic_thread_fence(cpp::MemoryOrder::RELEASE); process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); @@ -187,7 +204,10 @@ atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); // Apply the \p use function to read the memory out of the buffer. - use(&process.buffer[index]); + for (uint32_t idx = 0; idx < process.lane_size; idx += gpu::get_lane_size()) + if (is_process_gpu() || process.buffer[index].mask & 1ul << idx) + use(&process.buffer[index].slot[gpu::get_lane_id() + idx], + gpu::get_lane_id() + idx); out = out ^ Process::Ack; process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } @@ -204,7 +224,7 @@ /// the copy back. 
template LIBC_INLINE void Port::recv_and_send(W work) { recv(work); - send([](Buffer *) { /* no-op */ }); + send([](Buffer *, uint32_t) { /* no-op */ }); } /// Sends an arbitrarily sized data buffer \p src across the shared channel in @@ -212,10 +232,13 @@ LIBC_INLINE void Port::send_n(const void *src, uint64_t size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. - send([=](Buffer *buffer) { buffer->data[0] = size; }); + // TODO: We may need a way for the CPU to send different strings per thread. + send([=](Buffer *buffer, uint32_t) { + reinterpret_cast(buffer->data)[0] = size; + }); const uint8_t *ptr = reinterpret_cast(src); for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { - send([=](Buffer *buffer) { + send([=](Buffer *buffer, uint32_t) { const uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; inline_memcpy(buffer->data, ptr + idx, len); @@ -227,15 +250,42 @@ /// multiples of the packet length. The \p alloc function is called with the /// size of the data so that we can initialize the size of the \p dst buffer. template LIBC_INLINE void Port::recv_n(A alloc) { - uint64_t size = 0; - recv([&](Buffer *buffer) { size = buffer->data[0]; }); - uint8_t *dst = reinterpret_cast(alloc(size)); - for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { - recv([=](Buffer *buffer) { - uint64_t len = - size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; - inline_memcpy(dst + idx, buffer->data, len); + // The GPU handles thread private variables and masking implicitly through its + // execution model. If this is the CPU we need to manually handle the + // possibility that the sent data is of different length. 
+  if constexpr (is_process_gpu()) {
+    uint64_t size = 0;
+    recv([&](Buffer *buffer, uint32_t) {
+      size = reinterpret_cast<uint64_t *>(buffer->data)[0];
+    });
+    uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size, gpu::get_lane_id()));
+    for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+      recv([=](Buffer *buffer, uint32_t) {
+        uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
+                                                         : size - idx;
+        inline_memcpy(dst + idx, buffer->data, len);
+      });
+    }
+    return;
+  } else {
+    uint64_t size[MAX_LANE_SIZE];
+    uint8_t *dst[MAX_LANE_SIZE];
+    uint64_t max = 0;
+    recv([&](Buffer *buffer, uint32_t id) {
+      size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
+      dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
+      max = size[id] > max ? size[id] : max;
     });
+    for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
+      recv([=](Buffer *buffer, uint32_t id) {
+        uint64_t len = size[id] - idx > sizeof(Buffer::data)
+                           ? sizeof(Buffer::data)
+                           : size[id] - idx;
+        if (idx < size[id])
+          inline_memcpy(dst[id] + idx, buffer->data, len);
+      });
+    }
+    return;
   }
 }
 
@@ -243,9 +293,17 @@
 /// port if we find an index that is in a valid sending state. That is, there
 /// are send operations pending that haven't been serviced on this port. Each
 /// port instance uses an associated \p opcode to tell the server what to do.
-LIBC_INLINE cpp::optional<Client::Port> Client::try_open(uint16_t opcode) {
+/// Opening a port is only valid if the `opcode` is the same across every
+/// participating thread.
+LIBC_INLINE cpp::optional<Client::Port> Client::try_open(const uint16_t opcode) {
   // Attempt to acquire the lock on this index. 
- if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) + uint64_t lane_mask = gpu::get_lane_mask(); + uint32_t previous = 0; + if (is_first_lane(lane_mask)) + previous = lock->fetch_or(1, cpp::MemoryOrder::RELAXED); + previous = gpu::broadcast_value(previous); + + if (previous != 0) return cpp::nullopt; uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); @@ -258,7 +316,10 @@ return cpp::nullopt; } - buffer->opcode = opcode; + if (is_first_lane(lane_mask)) { + buffer[0].opcode = opcode; + buffer[0].mask = lane_mask; + } return Port(*this, 0, out); } @@ -282,7 +343,13 @@ return cpp::nullopt; // Attempt to acquire the lock on this index. - if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) + uint64_t lane_mask = gpu::get_lane_mask(); + uint32_t previous = 0; + if (is_first_lane(lane_mask)) + previous = lock->fetch_or(1, cpp::MemoryOrder::RELAXED); + previous = gpu::broadcast_value(previous); + + if (previous != 0) return cpp::nullopt; in = inbox->load(cpp::MemoryOrder::RELAXED); diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h --- a/libc/src/__support/RPC/rpc_util.h +++ b/libc/src/__support/RPC/rpc_util.h @@ -9,12 +9,16 @@ #ifndef LLVM_LIBC_SRC_SUPPORT_RPC_RPC_UTILS_H #define LLVM_LIBC_SRC_SUPPORT_RPC_RPC_UTILS_H +#include "src/__support/GPU/utils.h" #include "src/__support/macros/attributes.h" #include "src/__support/macros/properties/architectures.h" namespace __llvm_libc { namespace rpc { +/// Maximum amount of data a single lane can use. +constexpr uint64_t MAX_LANE_SIZE = 64; + /// Suspend the thread briefly to assist the thread scheduler during busy loops. LIBC_INLINE void sleep_briefly() { #if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700 @@ -26,6 +30,25 @@ #endif } +/// Get the first active thread inside the lane. +LIBC_INLINE uint64_t get_first_lane_id(uint64_t lane_mask) { + return __builtin_ffsl(lane_mask) - 1; +} + +/// Conditional that is only true for a single thread in a lane. 
+LIBC_INLINE bool is_first_lane(uint64_t lane_mask) { + return gpu::get_lane_id() == get_first_lane_id(lane_mask); +} + +/// Conditional to indicate if this process is running on the GPU. +LIBC_INLINE constexpr bool is_process_gpu() { +#if defined(LIBC_TARGET_ARCH_IS_GPU) + return true; +#else + return false; +#endif +} + } // namespace rpc } // namespace __llvm_libc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -20,7 +20,7 @@ void init_rpc(void *in, void *out, void *buffer) { // Only a single thread should update the RPC data. if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); init.store(1, cpp::MemoryOrder::RELAXED); } diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -20,7 +20,7 @@ void init_rpc(void *in, void *out, void *buffer) { // Only a single thread should update the RPC data. 
if (gpu::get_thread_id() == 0 && gpu::get_block_id() == 0) { - rpc::client.reset(&lock, in, out, buffer); + rpc::client.reset(gpu::get_lane_size(), &lock, in, out, buffer); init.store(1, cpp::MemoryOrder::RELAXED); } diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -22,6 +22,10 @@ libc.src.__support.RPC.rpc_client libc.src.__support.GPU.utils LOADER_ARGS - --blocks 16 - --threads 1 + --blocks-x 2 + --blocks-y 2 + --blocks-z 2 + --threads-x 4 + --threads-y 4 + --threads-z 4 ) diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp --- a/libc/test/integration/startup/gpu/rpc_test.cpp +++ b/libc/test/integration/startup/gpu/rpc_test.cpp @@ -13,15 +13,16 @@ using namespace __llvm_libc; static void test_add_simple() { - uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x(); + uint32_t num_additions = + 10 + 10 * gpu::get_thread_id() + 10 * gpu::get_block_id(); uint64_t cnt = 0; for (uint32_t i = 0; i < num_additions; ++i) { rpc::Port port = rpc::client.open(rpc::TEST_INCREMENT); port.send_and_recv( - [=](rpc::Buffer *buffer) { + [=](rpc::Buffer *buffer, uint32_t) { reinterpret_cast(buffer->data)[0] = cnt; }, - [&](rpc::Buffer *buffer) { + [&](rpc::Buffer *buffer, uint32_t) { cnt = reinterpret_cast(buffer->data)[0]; }); port.close(); diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h --- a/libc/utils/gpu/loader/Server.h +++ b/libc/utils/gpu/loader/Server.h @@ -30,27 +30,31 @@ switch (port->get_opcode()) { case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - void *str = nullptr; - port->recv_n([&](uint64_t size) { return str = malloc(size); }); - fputs(reinterpret_cast(str), stderr); - free(str); + void *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr}; + port->recv_n( + [&](uint64_t size, uint32_t id) { return 
strs[id] = malloc(size); }); + for (void *str : strs) { + if (str) { + fputs(reinterpret_cast(str), stderr); + free(str); + } + } break; } case __llvm_libc::rpc::Opcode::EXIT: { - port->recv([](__llvm_libc::rpc::Buffer *buffer) { + port->recv([](__llvm_libc::rpc::Buffer *buffer, uint32_t) { exit(reinterpret_cast(buffer->data)[0]); }); break; } case __llvm_libc::rpc::Opcode::TEST_INCREMENT: { - port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) { + port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer, uint32_t) { reinterpret_cast(buffer->data)[0] += 1; }); break; } default: - port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ }); - return; + port->recv([](__llvm_libc::rpc::Buffer *buffer, uint32_t) {}); } port->close(); } diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -287,6 +287,10 @@ hsa_amd_memory_fill(dev_ret, 0, sizeof(int)); // Allocate finegrained memory for the RPC server and client to share. + uint32_t wavefront_size = 0; + if (hsa_status_t err = hsa_agent_get_info( + dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size)) + handle_error(err); void *server_inbox; void *server_outbox; void *buffer; @@ -299,7 +303,9 @@ /*flags=*/0, &server_outbox)) handle_error(err); if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::rpc::Buffer), + finegrained_pool, + sizeof(__llvm_libc::rpc::LaneBuffer) + + (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)), /*flags=*/0, &buffer)) handle_error(err); hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox); @@ -351,7 +357,7 @@ handle_error(err); // Initialize the RPC server's buffer for host-device communication. 
- server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime. diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp --- a/libc/utils/gpu/loader/nvptx/Loader.cpp +++ b/libc/utils/gpu/loader/nvptx/Loader.cpp @@ -108,9 +108,11 @@ if (CUresult err = cuMemsetD32(dev_ret, 0, 1)) handle_error(err); + uint32_t warp_size = 32; void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic)); void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic)); - void *buffer = allocator(sizeof(__llvm_libc::rpc::Buffer)); + void *buffer = allocator(sizeof(__llvm_libc::rpc::LaneBuffer) + + (warp_size * sizeof(__llvm_libc::rpc::Buffer))); if (!server_inbox || !server_outbox || !buffer) handle_error("Failed to allocate memory the RPC client / server."); @@ -130,7 +132,7 @@ CU_LAUNCH_PARAM_END}; // Initialize the RPC server's buffer for host-device communication. - server.reset(&lock, server_inbox, server_outbox, buffer); + server.reset(warp_size, &lock, server_inbox, server_outbox, buffer); // Call the kernel with the given arguments. if (CUresult err = cuLaunchKernel(