diff --git a/libc/src/__support/OSUtil/gpu/io.h b/libc/src/__support/OSUtil/gpu/io.h --- a/libc/src/__support/OSUtil/gpu/io.h +++ b/libc/src/__support/OSUtil/gpu/io.h @@ -16,10 +16,6 @@ void write_to_stderr(cpp::string_view msg); -LIBC_INLINE void write_to_stderr(const char *msg) { - write_to_stderr(cpp::string_view(msg)); -} - } // namespace __llvm_libc #endif // LLVM_LIBC_SRC_SUPPORT_OSUTIL_LINUX_IO_H diff --git a/libc/src/__support/OSUtil/gpu/io.cpp b/libc/src/__support/OSUtil/gpu/io.cpp --- a/libc/src/__support/OSUtil/gpu/io.cpp +++ b/libc/src/__support/OSUtil/gpu/io.cpp @@ -14,34 +14,10 @@ namespace __llvm_libc { -namespace internal { - -static constexpr size_t BUFFER_SIZE = sizeof(rpc::Buffer) - sizeof(uint64_t); -static constexpr size_t MAX_STRING_SIZE = BUFFER_SIZE; - -LIBC_INLINE void send_null_terminated(cpp::string_view src) { - rpc::client.run( - [&](rpc::Buffer *buffer) { - buffer->data[0] = rpc::Opcode::PRINT_TO_STDERR; - char *data = reinterpret_cast(&buffer->data[1]); - inline_memcpy(data, src.data(), src.size()); - data[src.size()] = '\0'; - }, - [](rpc::Buffer *) { /* void */ }); -} - -} // namespace internal - void write_to_stderr(cpp::string_view msg) { - bool send_empty_string = true; - for (; !msg.empty();) { - const auto chunk = msg.substr(0, internal::MAX_STRING_SIZE); - internal::send_null_terminated(chunk); - msg.remove_prefix(chunk.size()); - send_empty_string = false; - } - if (send_empty_string) - internal::send_null_terminated(""); + rpc::Port port = rpc::client.open(rpc::PRINT_TO_STDERR); + port.send_n(msg.data(), msg.size() + 1); + port.close(); } } // namespace __llvm_libc diff --git a/libc/src/__support/OSUtil/gpu/quick_exit.cpp b/libc/src/__support/OSUtil/gpu/quick_exit.cpp --- a/libc/src/__support/OSUtil/gpu/quick_exit.cpp +++ b/libc/src/__support/OSUtil/gpu/quick_exit.cpp @@ -17,14 +17,9 @@ namespace __llvm_libc { void quick_exit(int status) { - // TODO: Support asynchronous calls so we don't wait and exit from the GPU - // 
immediately. - rpc::client.run( - [&](rpc::Buffer *buffer) { - buffer->data[0] = rpc::Opcode::EXIT; - buffer->data[1] = status; - }, - [](rpc::Buffer *) { /* void */ }); + rpc::Port port = rpc::client.open(rpc::EXIT); + port.send([&](rpc::Buffer *buffer) { buffer->data[0] = status; }); + port.close(); #if defined(LIBC_TARGET_ARCH_IS_NVPTX) asm("exit;" ::: "memory"); diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -20,42 +20,70 @@ #include "rpc_util.h" #include "src/__support/CPP/atomic.h" +#include "src/__support/CPP/optional.h" +#include "src/string/memory_utils/memcpy_implementations.h" #include namespace __llvm_libc { namespace rpc { -/// A list of opcodes that we use to invoke certain actions on the server. We -/// reserve the first 255 values for internal libc usage. -enum Opcode : uint64_t { +/// A list of opcodes that we use to invoke certain actions on the server. +enum Opcode : uint16_t { NOOP = 0, PRINT_TO_STDERR = 1, EXIT = 2, - LIBC_LAST = (1UL << 8) - 1, }; /// A fixed size channel used to communicate between the RPC client and server. struct Buffer { uint64_t data[8]; + // TODO: This should be put in a separate buffer to avoid excess padding. + uint16_t opcode; }; /// A common process used to synchronize communication between a client and a /// server. The process contains an inbox and an outbox used for signaling -/// ownership of the shared buffer. +/// ownership of the shared buffer between both sides. +/// +/// This process is designed to support mostly arbitrary combinations of 'send' +/// and 'recv' operations on the shared buffer as long as these operations are +/// mirrored by the other process. These operations exchange ownership of the +/// fixed-size buffer between the users of the protocol. 
The assumptions when +/// using this process are as follows: +/// - The client will always start with a 'send' operation +/// - The server will always start with a 'recv' operation +/// - For every 'send' / 'recv' call on one side of the process there is a +/// mirrored 'recv' / 'send' call. +/// +/// The communication protocol is organized as a pair of two-state state +/// machines. One state machine tracks outgoing sends and the other tracks +/// incoming receives. For example, a 'send' operation uses its input 'Ack' bit +/// and its output 'Data' bit. If these bits are equal the sender owns the +/// buffer, otherwise the receiver owns the buffer and we wait. Similarly, a +/// 'recv' operation uses its output 'Ack' bit and input 'Data' bit. If these +/// bits are not equal the receiver owns the buffer, otherwise the sender owns +/// the buffer. struct Process { LIBC_INLINE Process() = default; LIBC_INLINE Process(const Process &) = default; LIBC_INLINE Process &operator=(const Process &) = default; LIBC_INLINE ~Process() = default; + enum Flags : uint32_t { + Data = 0b01, // bit 0. + Ack = 0b10, // bit 1. + }; + + cpp::Atomic *lock; cpp::Atomic *inbox; cpp::Atomic *outbox; Buffer *buffer; /// Initialize the communication channels. - LIBC_INLINE void reset(void *inbox, void *outbox, void *buffer) { + LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) { *this = { + reinterpret_cast *>(lock), reinterpret_cast *>(inbox), reinterpret_cast *>(outbox), reinterpret_cast(buffer), @@ -63,6 +91,39 @@ } }; +/// The port provides the interface to communicate between the multiple +/// processes. A port is conceptually an index into the memory provided by the +/// underlying process that is guarded by a lock bit. +struct Port { + // TODO: This should be move-only. 
+ LIBC_INLINE Port(Process &process, uint64_t index, uint32_t out) + : process(process), index(index), out(out) {} + LIBC_INLINE Port(const Port &) = default; + LIBC_INLINE Port &operator=(const Port &) = delete; + LIBC_INLINE ~Port() = default; + + template LIBC_INLINE void recv(U use); + template LIBC_INLINE void send(F fill); + template + LIBC_INLINE void send_and_recv(F fill, U use); + template LIBC_INLINE void recv_and_send(W work); + LIBC_INLINE void send_n(const void *src, uint64_t size); + template LIBC_INLINE void recv_n(A alloc); + + LIBC_INLINE uint16_t get_opcode() const { + return process.buffer[index].opcode; + } + + LIBC_INLINE void close() { + process.lock[index].store(0, cpp::MemoryOrder::RELAXED); + } + +private: + Process &process; + uint64_t index; + uint32_t out; +}; + /// The RPC client used to make requests to the server. struct Client : public Process { LIBC_INLINE Client() = default; @@ -70,7 +131,8 @@ LIBC_INLINE Client &operator=(const Client &) = default; LIBC_INLINE ~Client() = default; - template LIBC_INLINE void run(F fill, U use); + LIBC_INLINE cpp::optional try_open(uint16_t opcode); + LIBC_INLINE Port open(uint16_t opcode); }; /// The RPC server used to respond to the client. @@ -80,88 +142,156 @@ LIBC_INLINE Server &operator=(const Server &) = default; LIBC_INLINE ~Server() = default; - template LIBC_INLINE bool handle(W work, C clean); + LIBC_INLINE cpp::optional try_open(); + LIBC_INLINE Port open(); }; -/// Run the RPC client protocol to communicate with the server. We perform the -/// following high level actions to complete a communication: -/// - Apply \p fill to the shared buffer and write 1 to the outbox. -/// - Wait until the inbox is 1. -/// - Apply \p use to the shared buffer and write 0 to the outbox. -/// - Wait until the inbox is 0. 
-template LIBC_INLINE void Client::run(F fill, U use) { - bool in = inbox->load(cpp::MemoryOrder::RELAXED); - bool out = outbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - // Apply the \p fill to the buffer and signal the server. - if (!in & !out) { - fill(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(1, cpp::MemoryOrder::RELAXED); - out = 1; +/// Applies \p fill to the shared buffer and initiates a send operation. +template LIBC_INLINE void Port::send(F fill) { + uint32_t in = process.inbox[index].load(cpp::MemoryOrder::ACQUIRE); + + // We wait until we own the buffer before sending. We own it when the input + // Ack bit equals the output Data bit; acquire pairs with recv's release. + while (bool(in & Process::Ack) != bool(out & Process::Data)) { + sleep_briefly(); + in = process.inbox[index].load(cpp::MemoryOrder::ACQUIRE); } - // Wait for the server to work on the buffer and respond. - if (!in & out) { - while (!in) { - sleep_briefly(); - in = inbox->load(cpp::MemoryOrder::RELAXED); - } - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + + // Apply the \p fill function to initialize the buffer and release the memory. + fill(&process.buffer[index]); + atomic_thread_fence(cpp::MemoryOrder::RELEASE); + process.outbox[index].store(out ^ Process::Data, cpp::MemoryOrder::RELAXED); + out = out ^ Process::Data; +} + +/// Applies \p use to the shared buffer and acknowledges the send. +template LIBC_INLINE void Port::recv(U use) { + uint32_t in = process.inbox[index].load(cpp::MemoryOrder::RELAXED); + + // We wait until we own the buffer before reading its contents. We own the + // buffer if the input Data bit is not equal to the output Ack bit. + while (bool(in & Process::Data) == bool(out & Process::Ack)) { + sleep_briefly(); + in = process.inbox[index].load(cpp::MemoryOrder::RELAXED); } - // Apply \p use to the buffer and signal the server. 
- if (in & out) { - use(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(0, cpp::MemoryOrder::RELAXED); - out = 0; + atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + + // Apply \p use, then release the Ack so the reads finish before the handoff. + use(&process.buffer[index]); + process.outbox[index].store(out ^ Process::Ack, cpp::MemoryOrder::RELEASE); + out = out ^ Process::Ack; +} + +/// Combines a send and receive into a single function. +template +LIBC_INLINE void Port::send_and_recv(F fill, U use) { + send(fill); + recv(use); +} + +/// Combines a receive and send operation into a single function. The \p work +/// function modifies the buffer in-place and the send is only used to initiate +/// the copy back. +template LIBC_INLINE void Port::recv_and_send(W work) { + recv(work); + send([](Buffer *) { /* no-op */ }); +} + +/// Sends an arbitrarily sized data buffer \p src across the shared channel in +/// multiples of the packet length. +LIBC_INLINE void Port::send_n(const void *src, uint64_t size) { + // TODO: We could send the first bytes in this call and potentially save an + // extra send operation. + send([=](Buffer *buffer) { buffer->data[0] = size; }); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + send([=](Buffer *buffer) { + const uint64_t len = + size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; + inline_memcpy(buffer->data, reinterpret_cast(src) + idx, + len); + }); } - // Wait for the server to signal the end of the protocol. - if (in & !out) { - while (in) { - sleep_briefly(); - in = inbox->load(cpp::MemoryOrder::RELAXED); - } - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); +} + +/// Receives an arbitrarily sized data buffer across the shared channel in +/// multiples of the packet length. The \p alloc function is called with the +/// size of the data so that we can initialize the size of the \p dst buffer. 
+template LIBC_INLINE void Port::recv_n(A alloc) { + uint64_t size = 0; + recv([&](Buffer *buffer) { size = buffer->data[0]; }); + void *dst = alloc(size); + for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) { + recv([=](Buffer *buffer) { + uint64_t len = + size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx; + inline_memcpy(reinterpret_cast(dst) + idx, buffer->data, len); + }); } } -/// Run the RPC server protocol to communicate with the client. This is -/// non-blocking and only checks the server a single time. We perform the -/// following high level actions to complete a communication: -/// - Query if the inbox is 1 and exit if there is no work to do. -/// - Apply \p work to the shared buffer and write 1 to the outbox. -/// - Wait until the inbox is 0. -/// - Apply \p clean to the shared buffer and write 0 to the outbox. -template -LIBC_INLINE bool Server::handle(W work, C clean) { - bool in = inbox->load(cpp::MemoryOrder::RELAXED); - bool out = outbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - // There is no work to do, exit early. - if (!in & !out) - return false; - // Apply \p work to the buffer and signal the client. - if (in & !out) { - work(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(1, cpp::MemoryOrder::RELAXED); - out = 1; +/// Attempts to open a port to use as the client. The client can only open a +/// port if we find an index that is in a valid sending state. That is, there +/// are send operations pending that haven't been serviced on this port. Each +/// port instance uses an associated \p opcode to tell the server what to do. +LIBC_INLINE cpp::optional Client::try_open(uint16_t opcode) { + // Attempt to acquire the lock on this index. 
+ if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) + return cpp::nullopt; + + uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); + uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED); + + // Once we acquire the index we need to check if we are in a valid sending + // state. + if (bool(in & Process::Ack) != bool(out & Process::Data)) { + lock->store(0, cpp::MemoryOrder::RELAXED); + return cpp::nullopt; } - // Wait for the client to use the buffer and respond. - if (in & out) { - while (in) - in = inbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + + buffer->opcode = opcode; + return Port(*this, 0, out); +} + +LIBC_INLINE Port Client::open(uint16_t opcode) { + for (;;) { + if (cpp::optional p = try_open(opcode)) + return p.value(); + sleep_briefly(); } - // Clean up the buffer and signal the end of the protocol. - if (!in & out) { - clean(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(0, cpp::MemoryOrder::RELAXED); - out = 0; +} + +/// Attempts to open a port to use as the server. The server can only open a +/// port if it has a pending receive operation +LIBC_INLINE cpp::optional Server::try_open() { + uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); + uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED); + + // The server is passive, if there is no work pending don't bother + // opening a port. + if (bool(in & Data) == bool(out & Process::Ack)) + return cpp::nullopt; + + // Attempt to acquire the lock on this index. 
+ if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) + return cpp::nullopt; + + in = inbox->load(cpp::MemoryOrder::RELAXED); + out = outbox->load(cpp::MemoryOrder::RELAXED); + + if (bool(in & Data) == bool(out & Process::Ack)) { + lock->store(0, cpp::MemoryOrder::RELAXED); + return cpp::nullopt; } - return true; + return Port(*this, 0, out); +} + +LIBC_INLINE Port Server::open() { + for (;;) { + if (cpp::optional p = try_open()) + return p.value(); + sleep_briefly(); + } } } // namespace rpc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -8,12 +8,14 @@ #include "src/__support/RPC/rpc_client.h" +static __llvm_libc::cpp::Atomic lock; + extern "C" int main(int argc, char **argv, char **envp); extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void _start(int argc, char **argv, char **envp, int *ret, void *in, void *out, void *buffer) { - __llvm_libc::rpc::client.reset(in, out, buffer); + __llvm_libc::rpc::client.reset(&lock, in, out, buffer); __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED); } diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -8,12 +8,14 @@ #include "src/__support/RPC/rpc_client.h" +static __llvm_libc::cpp::Atomic lock; + extern "C" int main(int argc, char **argv, char **envp); extern "C" [[gnu::visibility("protected")]] __attribute__((nvptx_kernel)) void _start(int argc, char **argv, char **envp, int *ret, void *in, void *out, void *buffer) { - __llvm_libc::rpc::client.reset(in, out, buffer); + __llvm_libc::rpc::client.reset(&lock, in, out, buffer); __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED); } diff --git a/libc/utils/gpu/loader/CMakeLists.txt b/libc/utils/gpu/loader/CMakeLists.txt --- a/libc/utils/gpu/loader/CMakeLists.txt +++ b/libc/utils/gpu/loader/CMakeLists.txt @@ 
-1,5 +1,8 @@ add_library(gpu_loader OBJECT Main.cpp) -target_include_directories(gpu_loader PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_include_directories(gpu_loader PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR} + ${LIBC_SOURCE_DIR} +) find_package(hsa-runtime64 QUIET 1.2.0 HINTS ${CMAKE_INSTALL_PREFIX} PATHS /opt/rocm) if(hsa-runtime64_FOUND) diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h new file mode 100644 --- /dev/null +++ b/libc/utils/gpu/loader/Server.h @@ -0,0 +1,51 @@ +//===-- Generic RPC server interface --------------------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_LIBC_UTILS_GPU_LOADER_RPC_H +#define LLVM_LIBC_UTILS_GPU_LOADER_RPC_H + +#include +#include +#include +#include +#include + +#include "src/__support/RPC/rpc.h" + +static __llvm_libc::rpc::Server server; + +static __llvm_libc::cpp::Atomic lock; + +/// Queries the RPC client at least once and performs server-side work if there +/// are any active requests. 
+void handle_server() { + auto port = server.try_open(); + if (!port) + return; + + switch (port->get_opcode()) { + case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { + void *str = nullptr; + port->recv_n([&](uint64_t size) { return str = malloc(size); }); + fputs(reinterpret_cast(str), stderr); + free(str); + break; + } + case __llvm_libc::rpc::Opcode::EXIT: { + port->recv([](__llvm_libc::rpc::Buffer *buffer) { + exit(static_cast(buffer->data[0])); + }); + break; + } + default: + port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ }); + break; + } + port->close(); +} +#endif diff --git a/libc/utils/gpu/loader/amdgpu/CMakeLists.txt b/libc/utils/gpu/loader/amdgpu/CMakeLists.txt --- a/libc/utils/gpu/loader/amdgpu/CMakeLists.txt +++ b/libc/utils/gpu/loader/amdgpu/CMakeLists.txt @@ -1,7 +1,6 @@ add_executable(amdhsa_loader Loader.cpp) add_dependencies(amdhsa_loader libc.src.__support.RPC.rpc) -target_include_directories(amdhsa_loader PRIVATE ${LIBC_SOURCE_DIR}) target_link_libraries(amdhsa_loader PRIVATE hsa-runtime64::hsa-runtime64 diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -14,8 +14,7 @@ //===----------------------------------------------------------------------===// #include "Loader.h" - -#include "src/__support/RPC/rpc.h" +#include "Server.h" #include #include @@ -39,30 +38,6 @@ void *buffer; }; -static __llvm_libc::rpc::Server server; - -/// Queries the RPC client at least once and performs server-side work if there -/// are any active requests. 
-void handle_server() { - while (server.handle( - [&](__llvm_libc::rpc::Buffer *buffer) { - switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) { - case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - fputs(reinterpret_cast(&buffer->data[1]), stderr); - break; - } - case __llvm_libc::rpc::Opcode::EXIT: { - exit(buffer->data[1]); - break; - } - default: - return; - }; - }, - [](__llvm_libc::rpc::Buffer *buffer) {})) - ; -} - /// Print the error code and exit if \p code indicates an error. static void handle_error(hsa_status_t code) { if (code == HSA_STATUS_SUCCESS || code == HSA_STATUS_INFO_BREAK) @@ -373,7 +348,7 @@ handle_error(err); // Initialize the RPC server's buffer for host-device communication. - server.reset(server_inbox, server_outbox, buffer); + server.reset(&lock, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime. diff --git a/libc/utils/gpu/loader/nvptx/CMakeLists.txt b/libc/utils/gpu/loader/nvptx/CMakeLists.txt --- a/libc/utils/gpu/loader/nvptx/CMakeLists.txt +++ b/libc/utils/gpu/loader/nvptx/CMakeLists.txt @@ -1,7 +1,6 @@ add_executable(nvptx_loader Loader.cpp) add_dependencies(nvptx_loader libc.src.__support.RPC.rpc) -target_include_directories(nvptx_loader PRIVATE ${LIBC_SOURCE_DIR}) target_link_libraries(nvptx_loader PRIVATE gpu_loader diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp --- a/libc/utils/gpu/loader/nvptx/Loader.cpp +++ b/libc/utils/gpu/loader/nvptx/Loader.cpp @@ -14,8 +14,7 @@ //===----------------------------------------------------------------------===// #include "Loader.h" - -#include "src/__support/RPC/rpc.h" +#include "Server.h" #include "cuda.h" #include @@ -34,30 +33,6 @@ void *buffer; }; -static __llvm_libc::rpc::Server server; - -/// Queries the RPC client at least once and performs server-side work if there -/// are any active requests. 
-void handle_server() { - while (server.handle( - [&](__llvm_libc::rpc::Buffer *buffer) { - switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) { - case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { - fputs(reinterpret_cast(&buffer->data[1]), stderr); - break; - } - case __llvm_libc::rpc::Opcode::EXIT: { - exit(buffer->data[1]); - break; - } - default: - return; - }; - }, - [](__llvm_libc::rpc::Buffer *buffer) {})) - ; -} - static void handle_error(CUresult err) { if (err == CUDA_SUCCESS) return; @@ -154,7 +129,7 @@ CU_LAUNCH_PARAM_END}; // Initialize the RPC server's buffer for host-device communication. - server.reset(server_inbox, server_outbox, buffer); + server.reset(&lock, server_inbox, server_outbox, buffer); // Call the kernel with the given arguments. if (CUresult err =