diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -69,6 +69,12 @@
   Payload payload;
 };
 
+// TODO: This should be configured by the server and passed in. The general
+// rule of thumb is that you should have at least as many ports as there are
+// possible concurrent work items on the GPU to mitigate the lack of forward
+// progress guarantees on the GPU.
+constexpr uint64_t default_port_size = 64;
+
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
 /// ownership of the shared buffer between both sides.
@@ -96,20 +102,31 @@
   LIBC_INLINE Process &operator=(const Process &) = default;
   LIBC_INLINE ~Process() = default;
 
+  uint64_t port_size;
   uint32_t lane_size;
   cpp::Atomic<uint32_t> *lock;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
-  Packet *buffer;
+  Packet *packet;
 
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(uint32_t size, void *mtx, void *in, void *out,
-                         void *data) {
-    lane_size = size;
-    lock = reinterpret_cast<cpp::Atomic<uint32_t> *>(mtx);
-    inbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(in);
-    outbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(out);
-    buffer = reinterpret_cast<Packet *>(data);
+  LIBC_INLINE void reset(uint64_t port_size, uint32_t lane_size, void *lock,
+                         void *inbox, void *outbox, void *packet) {
+    *this = {port_size,
+             lane_size,
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+             reinterpret_cast<Packet *>(packet)};
+  }
+
+  /// The length of the packet is flexible because the server needs to look up
+  /// the lane size at runtime. This helper indexes at the proper offset.
+  LIBC_INLINE Packet &get_packet(uint64_t index) {
+    return *reinterpret_cast<Packet *>(
+        reinterpret_cast<uint8_t *>(packet) +
+        index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+                         alignof(Packet)));
+  }
 
   /// Inverting the bits loaded from the inbox in exactly one of the pair of
@@ -139,25 +156,25 @@
 
   /// Invokes a function across every active buffer across the total lane size.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+      fn(&packet.payload.slot[gpu::get_lane_id()]);
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i]);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i]);
     }
   }
 
   /// Alternate version that also provides the index of the current lane.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+      fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i], i);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i], i);
     }
   }
 };
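To make the flexible packet indexing concrete, here is a minimal standalone sketch of the stride arithmetic `get_packet` performs. The `Header` and `Buffer` structs and the 8-byte alignment below are invented stand-ins for the real rpc.h types; only the layout math mirrors the patch:

```cpp
#include <cstdint>
#include <cstdio>

// Invented stand-ins for rpc::Header and rpc::Buffer, for illustration only.
struct Header {
  uint64_t mask;
  uint16_t opcode;
};
struct Buffer {
  uint64_t data[8];
};

// Same rounding as rpc::align_up in this patch.
template <typename V, typename A> V align_up(V val, A align) {
  return ((val + V(align) - 1) / V(align)) * V(align);
}

int main() {
  // The lane size is only known at runtime on the server, so the packet
  // stride cannot be a compile-time constant.
  uint32_t lane_size = 32; // e.g. an NVPTX warp
  uint64_t stride = align_up(sizeof(Header) + lane_size * sizeof(Buffer),
                             alignof(uint64_t)); // placeholder alignment
  for (uint64_t index = 0; index < 4; ++index)
    printf("packet %u begins at byte offset %llu\n", (unsigned)index,
           (unsigned long long)(index * stride));
}
```

Rounding each packet up to the alignment guarantees that every packet in the array stays suitably aligned regardless of the runtime lane size.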
@@ -182,11 +199,11 @@
   template <typename A> LIBC_INLINE void recv_n(A alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
-    return process.buffer[index].header.opcode;
+    return process.get_packet(index).header.opcode;
   }
 
   [[clang::convergent]] LIBC_INLINE void close() {
-    uint64_t mask = process.buffer[index].header.mask;
+    uint64_t mask = process.get_packet(index).header.mask;
     process.lock[index].store(0, cpp::MemoryOrder::RELAXED);
     cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE);
     // This lane sync forces explicit ordering of the open and close calls.
@@ -234,7 +251,7 @@
   }
 
   // Apply the \p fill function to initialize the buffer and release the memory.
-  process.invoke_rpc(fill, index);
+  process.invoke_rpc(fill, process.get_packet(index));
   out = !out;
   atomic_thread_fence(cpp::MemoryOrder::RELEASE);
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -252,7 +269,7 @@
   atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
   // Apply the \p use function to read the memory out of the buffer.
-  process.invoke_rpc(use, index);
+  process.invoke_rpc(use, process.get_packet(index));
   out = !out;
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
 }
@@ -293,7 +310,7 @@
       inline_memcpy(buffer->data, ptr + idx, len);
     });
   }
-  gpu::sync_lane(process.buffer[index].header.mask);
+  gpu::sync_lane(process.get_packet(index).header.mask);
 }
 
 /// Receives an arbitrarily sized data buffer across the shared channel in
@@ -349,38 +366,40 @@
 /// participating thread.
 [[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
 Client::try_open(uint16_t opcode) {
-  constexpr uint64_t index = 0;
-  // Attempt to acquire the lock on this index.
-  uint64_t lane_mask = gpu::get_lane_mask();
-  uint32_t previous = 0;
-  if (is_first_lane(lane_mask))
-    previous = lock->fetch_or(1, cpp::MemoryOrder::ACQ_REL);
-  previous = gpu::broadcast_value(previous);
-
-  if (previous != 0)
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED);
-
-  // Once we acquire the index we need to check if we are in a valid sending
-  // state.
-  if (!can_send_data(in, out)) {
-    lock->store(0, cpp::MemoryOrder::RELAXED);
-    cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE);
-    return cpp::nullopt;
-  }
+  // Perform a naive linear scan for a port that can be opened to send data.
+  for (uint64_t index = 0; index < port_size; ++index) {
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    uint32_t previous = 0;
+    if (is_first_lane(lane_mask))
+      previous = lock[index].fetch_or(1, cpp::MemoryOrder::ACQ_REL);
+    previous = gpu::broadcast_value(previous);
+
+    if (previous != 0)
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // Once we acquire the index we need to check if we are in a valid sending
+    // state.
+    if (!can_send_data(in, out)) {
+      lock[index].store(0, cpp::MemoryOrder::RELAXED);
+      cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE);
+      continue;
+    }
 
-  // TODO: The index here will be non-zero when we support multiple slots.
-  if (is_first_lane(lane_mask)) {
-    buffer[index].header.opcode = opcode;
-    buffer[index].header.mask = lane_mask;
+    if (is_first_lane(lane_mask)) {
+      get_packet(index).header.opcode = opcode;
+      get_packet(index).header.mask = lane_mask;
+    }
+    gpu::sync_lane(lane_mask);
+    return Port(*this, index, out);
   }
-  gpu::sync_lane(lane_mask);
-  return Port(*this, 0, out);
+  return cpp::nullopt;
 }
 
 LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
@@ -395,38 +414,41 @@
 /// port if it has a pending receive operation
 [[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
 Server::try_open() {
-  constexpr uint64_t index = 0;
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED);
-
-  // The server is passive, if there is no work pending don't bother
-  // opening a port.
-  if (!can_recv_data(in, out))
-    return cpp::nullopt;
-
-  // Attempt to acquire the lock on this index.
-  uint64_t lane_mask = gpu::get_lane_mask();
-  uint32_t previous = 0;
-  if (is_first_lane(lane_mask))
-    previous = lock->fetch_or(1, cpp::MemoryOrder::ACQ_REL);
-  previous = gpu::broadcast_value(previous);
-
-  if (previous != 0)
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  in = load_inbox(index);
-  out = outbox->load(cpp::MemoryOrder::RELAXED);
+  // Perform a naive linear scan for a port that has a pending request.
+  for (uint64_t index = 0; index < port_size; ++index) {
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // The server is passive, if there is no work pending don't bother
+    // opening a port.
+    if (!can_recv_data(in, out))
+      continue;
+
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    uint32_t previous = 0;
+    if (is_first_lane(lane_mask))
+      previous = lock[index].fetch_or(1, cpp::MemoryOrder::ACQ_REL);
+    previous = gpu::broadcast_value(previous);
+
+    if (previous != 0)
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    in = load_inbox(index);
+    out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    if (!can_recv_data(in, out)) {
+      lock[index].store(0, cpp::MemoryOrder::RELAXED);
+      cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE);
+      continue;
+    }
 
-  if (!can_recv_data(in, out)) {
-    lock->store(0, cpp::MemoryOrder::RELAXED);
-    cpp::atomic_thread_fence(cpp::MemoryOrder::RELEASE);
-    return cpp::nullopt;
+    return Port(*this, index, out);
   }
-
-  return Port(*this, index, out);
+  return cpp::nullopt;
 }
 
 LIBC_INLINE Server::Port Server::open() {
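To show how the reworked interface is meant to be used from the device side, here is a hypothetical wrapper around the test opcode the loader services below. The `rpc_increment` name and the wrapper itself are illustrative only, though `rpc::client`, `Port::send`, `Port::recv`, and `Opcode::TEST_INCREMENT` all appear in this patch:

```cpp
// Hypothetical device-side helper, not part of this patch. Client::open spins
// until try_open's linear scan finds a free port, so concurrent warps end up
// holding distinct port indices instead of contending for index 0.
uint64_t rpc_increment(uint64_t value) {
  rpc::Client::Port port = rpc::client.open(rpc::Opcode::TEST_INCREMENT);
  port.send([&](rpc::Buffer *buffer) {
    reinterpret_cast<uint64_t *>(buffer->data)[0] = value;
  });
  uint64_t result = 0;
  port.recv([&](rpc::Buffer *buffer) {
    result = reinterpret_cast<uint64_t *>(buffer->data)[0];
  });
  port.close();
  return result;
}
```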
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -49,6 +49,11 @@
 #endif
 }
 
+/// Return \p val aligned "upwards" according to \p align.
+template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
+  return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
 } // namespace rpc
 } // namespace __llvm_libc
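As a quick sanity check of the rounding, the checks below use a standalone constexpr copy of the helper (the in-tree version is `LIBC_INLINE`, so it cannot be used in a `static_assert` directly):

```cpp
template <typename V, typename A> constexpr V align_up(V val, A align) {
  return ((val + V(align) - 1) / V(align)) * V(align);
}

static_assert(align_up(72u, 64u) == 128u, "rounds up to the next multiple");
static_assert(align_up(64u, 64u) == 64u, "aligned values are unchanged");
static_assert(align_up(1u, 8u) == 8u, "small values round up to one unit");
```

Because it uses divide-and-multiply rather than bit masking, the helper works for any positive alignment, not just powers of two.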
diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -15,7 +15,7 @@
 
 namespace __llvm_libc {
 
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_size] = {0};
 
 extern "C" uintptr_t __init_array_start[];
 extern "C" uintptr_t __init_array_end[];
@@ -43,7 +43,8 @@
 _begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
   // We need to set up the RPC client first in case any of the constructors
   // require it.
-  __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+  __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_size,
+                                 __llvm_libc::gpu::get_lane_size(),
                                  &__llvm_libc::lock, in, out, buffer);
 
   // We want the fini array callbacks to be run after other atexit
diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -15,7 +15,7 @@
 
 namespace __llvm_libc {
 
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_size] = {0};
 
 extern "C" {
 // Nvidia's 'nvlink' linker does not provide these symbols. We instead need
@@ -47,7 +47,8 @@
 _begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
   // We need to set up the RPC client first in case any of the constructors
   // require it.
-  __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+  __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_size,
+                                 __llvm_libc::gpu::get_lane_size(),
                                  &__llvm_libc::lock, in, out, buffer);
 
   // We want the fini array callbacks to be run after other atexit
diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -19,47 +19,51 @@
 
 static __llvm_libc::rpc::Server server;
 
-static __llvm_libc::cpp::Atomic<uint32_t> lock;
+static __llvm_libc::cpp::Atomic<uint32_t>
+    lock[__llvm_libc::rpc::default_port_size] = {0};
 
 /// Queries the RPC client at least once and performs server-side work if there
 /// are any active requests.
 void handle_server() {
-  auto port = server.try_open();
-  if (!port)
-    return;
+  // Continue servicing the client until there is no work left and we return.
+  for (;;) {
+    auto port = server.try_open();
+    if (!port)
+      return;
 
-  switch (port->get_opcode()) {
-  case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
-    uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
-    char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
-    port->recv_n([&](uint64_t size, uint32_t id) {
-      str_size[id] = size;
-      strs[id] = new char[size];
-      return strs[id];
-    });
-    for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
-      if (strs[i]) {
-        fwrite(strs[i], str_size[i], 1, stderr);
-        delete[] strs[i];
+    switch (port->get_opcode()) {
+    case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
+      uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
+      char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
+      port->recv_n([&](uint64_t size, uint32_t id) {
+        str_size[id] = size;
+        strs[id] = new char[size];
+        return strs[id];
+      });
+      for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
+        if (strs[i]) {
+          fwrite(strs[i], str_size[i], 1, stderr);
+          delete[] strs[i];
+        }
       }
+      break;
     }
-    break;
-  }
-  case __llvm_libc::rpc::Opcode::EXIT: {
-    port->recv([](__llvm_libc::rpc::Buffer *buffer) {
-      exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
-    });
-    break;
-  }
-  case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
-    port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
-      reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
-    });
-    break;
-  }
-  default:
-    port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+    case __llvm_libc::rpc::Opcode::EXIT: {
+      port->recv([](__llvm_libc::rpc::Buffer *buffer) {
+        exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
+      });
+      break;
+    }
+    case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
+      port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
+        reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
+      });
+      break;
+    }
+    default:
+      port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+    }
+    port->close();
   }
-  port->close();
 }
 
 #endif
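Because `handle_server` now loops until `try_open` fails, each call drains every pending port rather than servicing at most one. A host-side driver might poll it like this sketch, where `kernel_is_running()` is a hypothetical stand-in for the loader's actual completion check (an HSA signal wait or a CUDA stream query):

```cpp
// Hypothetical polling loop, not part of this patch.
void service_rpc_until_done() {
  while (kernel_is_running())
    handle_server(); // Drains all pending ports before returning.
  handle_server();   // Final sweep for requests that raced with kernel exit.
}
```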
diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp
--- a/libc/utils/gpu/loader/amdgpu/Loader.cpp
+++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp
@@ -353,6 +353,7 @@
   hsa_amd_memory_fill(dev_ret, 0, sizeof(int));
 
   // Allocate finegrained memory for the RPC server and client to share.
+  uint64_t port_size = __llvm_libc::rpc::default_port_size;
   uint32_t wavefront_size = 0;
   if (hsa_status_t err = hsa_agent_get_info(
           dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size))
@@ -361,18 +362,19 @@
   void *server_outbox;
   void *buffer;
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
-          finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<uint32_t>),
+          finegrained_pool,
+          port_size * sizeof(__llvm_libc::cpp::Atomic<uint32_t>),
           /*flags=*/0, &server_inbox))
     handle_error(err);
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
-          finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<uint32_t>),
+          finegrained_pool,
+          port_size * sizeof(__llvm_libc::cpp::Atomic<uint32_t>),
           /*flags=*/0, &server_outbox))
     handle_error(err);
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
           finegrained_pool,
-          align_up(sizeof(__llvm_libc::rpc::Header) +
-                       (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
-                   alignof(__llvm_libc::rpc::Packet)),
+          port_size *
+              align_up(sizeof(__llvm_libc::rpc::Header) +
+                           (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
+                       alignof(__llvm_libc::rpc::Packet)),
          /*flags=*/0, &buffer))
     handle_error(err);
   hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox);
@@ -380,7 +382,8 @@
   hsa_amd_agents_allow_access(1, &dev_agent, nullptr, buffer);
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer);
+  server.reset(port_size, wavefront_size, &lock, server_inbox, server_outbox,
+               buffer);
 
   // Obtain a queue with the minimum (power of two) size, used to send commands
   // to the HSA runtime and launch execution on the device.
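As a rough check on the new sizes (using hypothetical values of 16 bytes for `rpc::Header`, 64 bytes for `rpc::Buffer`, and 64-byte packet alignment; the real sizes depend on the target): with the default 64 ports and a 64-wide wavefront, each mailbox costs 64 * 4 = 256 bytes, and the packet buffer costs 64 * align_up(16 + 64 * 64, 64) = 64 * 4160 = 266,240 bytes, roughly 260 KiB of fine-grained memory.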
diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp
--- a/libc/utils/gpu/loader/nvptx/Loader.cpp
+++ b/libc/utils/gpu/loader/nvptx/Loader.cpp
@@ -269,18 +269,22 @@
   if (CUresult err = cuMemsetD32(dev_ret, 0, 1))
     handle_error(err);
 
+  uint64_t port_size = __llvm_libc::rpc::default_port_size;
   uint32_t warp_size = 32;
-  void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<uint32_t>));
-  void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<uint32_t>));
-  void *buffer =
-      allocator(align_up(sizeof(__llvm_libc::rpc::Header) +
-                             (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
-                         alignof(__llvm_libc::rpc::Packet)));
+  void *server_inbox =
+      allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<uint32_t>));
+  void *server_outbox =
+      allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<uint32_t>));
+  void *buffer = allocator(
+      port_size * align_up(sizeof(__llvm_libc::rpc::Header) +
                                (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
+                           alignof(__llvm_libc::rpc::Packet)));
   if (!server_inbox || !server_outbox || !buffer)
     handle_error("Failed to allocate memory the RPC client / server.");
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(warp_size, &lock, server_inbox, server_outbox, buffer);
+  server.reset(port_size, warp_size, &lock, server_inbox, server_outbox,
+               buffer);
 
   LaunchParameters single_threaded_params = {1, 1, 1, 1, 1, 1};
 
   // Call the kernel to