diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -37,6 +37,7 @@
   EXIT = 2,
   TEST_INCREMENT = 3,
   TEST_INTERFACE = 4,
+  TEST_STREAM = 5,
 };
 
 /// A fixed size channel used to communicate between the RPC client and server.
@@ -300,8 +301,10 @@
   template <typename F, typename U>
   LIBC_INLINE void send_and_recv(F fill, U use);
   template <typename W> LIBC_INLINE void recv_and_send(W work);
+  LIBC_INLINE void send_n(const void *const *src, uint64_t *size);
   LIBC_INLINE void send_n(const void *src, uint64_t size);
-  template <typename A> LIBC_INLINE void recv_n(A alloc);
+  template <typename A>
+  LIBC_INLINE void recv_n(void **dst, uint64_t *size, A &&alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
     return process.get_packet(index).header.opcode;
@@ -406,67 +409,73 @@
 /// Sends an arbitrarily sized data buffer \p src across the shared channel in
 /// multiples of the packet length.
+/// \p src and \p size are accessed through lane_value: on the CPU they are
+/// arrays indexed by lane id, on the GPU each lane passes pointers to its own
+/// thread private values.
 template <bool T>
-LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
+LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
   // TODO: We could send the first bytes in this call and potentially save an
   // extra send operation.
-  // TODO: We may need a way for the CPU to send different strings per thread.
-  send([=](Buffer *buffer) {
-    reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
+  // On the CPU one caller handles every lane, so keep sending until the
+  // longest lane's buffer is done. Each GPU lane only needs its own length.
+  uint64_t num_sends = 0;
+  send([&](Buffer *buffer, uint32_t id) {
+    reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
+    num_sends = is_process_gpu() ? lane_value(size, id)
+                                 : max(lane_value(size, id), num_sends);
   });
-  const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
-  for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
-    send([=](Buffer *buffer) {
-      const uint64_t len =
-          size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
-      inline_memcpy(buffer->data, ptr + idx, len);
+  for (uint64_t idx = 0; idx < num_sends; idx += sizeof(Buffer::data)) {
+    send([=](Buffer *buffer, uint32_t id) {
+      const uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
+                               ? sizeof(Buffer::data)
+                               : lane_value(size, id) - idx;
+      // Skip the copy for lanes whose buffer has already been fully sent.
+      if (idx < lane_value(size, id))
+        inline_memcpy(
+            buffer->data,
+            reinterpret_cast<const uint8_t *>(lane_value(src, id)) + idx, len);
     });
   }
   gpu::sync_lane(process.get_packet(index).header.mask);
 }
 
+/// Helper routine to simplify the interface when sending from the GPU using
+/// thread private pointers to the underlying value.
+template <bool T>
+LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
+  static_assert(is_process_gpu(), "Only valid when running on the GPU");
+  const void **src_ptr = &src;
+  uint64_t *size_ptr = &size;
+  send_n(src_ptr, size_ptr);
+}
+
 /// Receives an arbitrarily sized data buffer across the shared channel in
 /// multiples of the packet length. The \p alloc function is called with the
 /// size of the data so that we can initialize the size of the \p dst buffer.
+/// The received length and the pointer returned by \p alloc are written to
+/// \p size and \p dst through lane_value, so on the CPU both must be arrays
+/// indexed by lane id.
 template <bool T>
 template <typename A>
-LIBC_INLINE void Port<T>::recv_n(A alloc) {
-  // The GPU handles thread private variables and masking implicitly through its
-  // execution model. If this is the CPU we need to manually handle the
-  // possibility that the sent data is of different length.
-  if constexpr (is_process_gpu()) {
-    uint64_t size = 0;
-    recv([&](Buffer *buffer) {
-      size = reinterpret_cast<uint64_t *>(buffer->data)[0];
-    });
-    uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
-    for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
-      recv([=](Buffer *buffer) {
-        uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
-                                                         : size - idx;
-        inline_memcpy(dst + idx, buffer->data, len);
-      });
-    }
-    return;
-  } else {
-    uint64_t size[MAX_LANE_SIZE];
-    uint8_t *dst[MAX_LANE_SIZE];
-    uint64_t max = 0;
-    recv([&](Buffer *buffer, uint32_t id) {
-      size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
-      dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
-      max = size[id] > max ? size[id] : max;
+LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
+  uint64_t num_recvs = 0;
+  recv([&](Buffer *buffer, uint32_t id) {
+    lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
+    lane_value(dst, id) =
+        reinterpret_cast<uint8_t *>(alloc(lane_value(size, id)));
+    num_recvs = is_process_gpu() ? lane_value(size, id)
+                                 : max(lane_value(size, id), num_recvs);
+  });
+  for (uint64_t idx = 0; idx < num_recvs; idx += sizeof(Buffer::data)) {
+    recv([=](Buffer *buffer, uint32_t id) {
+      uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
+                         ? sizeof(Buffer::data)
+                         : lane_value(size, id) - idx;
+      // Skip the copy for lanes whose buffer has already been fully received.
+      if (idx < lane_value(size, id))
+        inline_memcpy(reinterpret_cast<uint8_t *>(lane_value(dst, id)) + idx,
+                      buffer->data, len);
     });
-    for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
-      recv([=](Buffer *buffer, uint32_t id) {
-        uint64_t len = size[id] - idx > sizeof(Buffer::data)
-                           ? sizeof(Buffer::data)
-                           : size[id] - idx;
-        if (idx < size[id])
-          inline_memcpy(dst[id] + idx, buffer->data, len);
-      });
-    }
-    return;
   }
 }
 
 /// Attempts to open a port to use as the client. The client can only open a
diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -54,6 +54,21 @@
   return ((val + V(align) - 1) / V(align)) * V(align);
 }
 
+/// Utility to provide a unified interface between the CPU and GPU's memory
+/// model. On the GPU stack variables are always private to a lane so we can
+/// simply use the variable passed in. On the CPU we need to allocate enough
+/// space for the whole lane and index into it.
+template <typename V> LIBC_INLINE V &lane_value(V *val, uint32_t id) {
+  if constexpr (is_process_gpu())
+    return *val;
+  return val[id];
+}
+
+/// Helper to get the maximum value.
+template <typename T> LIBC_INLINE const T &max(const T &x, const T &y) {
+  return x < y ? y : x;
+}
+
 } // namespace rpc
 } // namespace __llvm_libc
 
diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt
--- a/libc/test/integration/startup/gpu/CMakeLists.txt
+++ b/libc/test/integration/startup/gpu/CMakeLists.txt
@@ -43,3 +43,12 @@
   SRCS
     rpc_interface_test.cpp
 )
+
+add_integration_test(
+  startup_rpc_stream_test
+  SUITE libc-startup-tests
+  SRCS
+    rpc_stream_test.cpp
+  LOADER_ARGS
+    --threads-x 32
+)
diff --git a/libc/test/integration/startup/gpu/rpc_stream_test.cpp b/libc/test/integration/startup/gpu/rpc_stream_test.cpp
new file mode 100644
--- /dev/null
+++ b/libc/test/integration/startup/gpu/rpc_stream_test.cpp
@@ -0,0 +1,52 @@
+//===-- Loader test to check the RPC streaming interface with the loader --===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include "src/__support/GPU/utils.h"
+#include "src/__support/RPC/rpc_client.h"
+#include "src/__support/integer_to_string.h"
+#include "src/string/memory_utils/memcmp_implementations.h"
+#include "src/string/memory_utils/memcpy_implementations.h"
+#include "src/string/string_utils.h"
+#include "test/IntegrationTest/test.h"
+
+extern "C" void *malloc(uint64_t);
+extern "C" void free(void *);
+
+using namespace __llvm_libc;
+
+static void test_stream() {
+  const char str[] = "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy";
+  uint64_t send_size = sizeof(str);
+  void *send_ptr = malloc(send_size);
+  void *recv_ptr = nullptr;
+  uint64_t recv_size = 0;
+
+  inline_memcpy(send_ptr, str, send_size);
+  ASSERT_TRUE(inline_memcmp(send_ptr, str, send_size) == 0 && "Data mismatch");
+  rpc::Client::Port port = rpc::client.open<rpc::TEST_STREAM>();
+  port.send_n(send_ptr, send_size);
+  port.recv_n(&recv_ptr, &recv_size,
+              [](uint64_t size) { return malloc(size); });
+  port.close();
+  // Validate the size before comparing recv_size bytes against str.
+  ASSERT_TRUE(recv_size == send_size && "Data size mismatch");
+  ASSERT_TRUE(inline_memcmp(recv_ptr, str, recv_size) == 0 && "Data mismatch");
+
+  free(send_ptr);
+  free(recv_ptr);
+}
+
+TEST_MAIN(int argc, char **argv, char **envp) {
+  test_stream();
+
+  return 0;
+}
diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -32,17 +32,13 @@
 
     switch (port->get_opcode()) {
     case rpc::Opcode::PRINT_TO_STDERR: {
-      uint64_t str_size[rpc::MAX_LANE_SIZE] = {0};
-      char *strs[rpc::MAX_LANE_SIZE] = {nullptr};
-      port->recv_n([&](uint64_t size, uint32_t id) {
-        str_size[id] = size;
-        strs[id] = new char[size];
-        return strs[id];
-      });
+      uint64_t sizes[rpc::MAX_LANE_SIZE] = {0};
+      void *strs[rpc::MAX_LANE_SIZE] = {nullptr};
+      port->recv_n(strs, sizes, [](uint64_t size) { return new char[size]; });
       for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) {
         if (strs[i]) {
-          fwrite(strs[i], str_size[i], 1, stderr);
-          delete[] strs[i];
+          fwrite(strs[i], sizes[i], 1, stderr);
+          delete[] static_cast<char *>(strs[i]);
         }
       }
       break;
@@ -78,6 +74,17 @@
           [&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
       break;
     }
+    case rpc::Opcode::TEST_STREAM: {
+      uint64_t sizes[rpc::MAX_LANE_SIZE] = {0};
+      void *dst[rpc::MAX_LANE_SIZE] = {nullptr};
+      port->recv_n(dst, sizes, [](uint64_t size) { return new char[size]; });
+      port->send_n(dst, sizes);
+      for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) {
+        if (dst[i])
+          delete[] static_cast<char *>(dst[i]);
+      }
+      break;
+    }
     default:
       port->recv([](rpc::Buffer *buffer) {});
     }