diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -45,20 +45,15 @@
 /// The data payload for the associated packet. We provide enough space for each
 /// thread in the cooperating lane to have a buffer.
-struct Payload {
-#if defined(LIBC_TARGET_ARCH_IS_GPU)
-  Buffer slot[gpu::LANE_SIZE];
-#else
-  // Flexible array size allocated at runtime to the appropriate size.
-  Buffer slot[];
-#endif
+template <uint32_t lane_size> struct Payload {
+  Buffer slot[lane_size];
 };
 
 /// A packet used to share data between the client and server across an entire
 /// lane. We use a lane as the minimum granularity for execution.
-struct alignas(64) Packet {
+template <uint32_t lane_size> struct alignas(64) Packet {
   Header header;
-  Payload payload;
+  Payload<lane_size> payload;
 };
 
 // TODO: This should be configured by the server and passed in. The general rule
@@ -79,7 +74,7 @@
 /// - The client will always start with a 'send' operation.
 /// - The server will always start with a 'recv' operation.
 /// - Every 'send' or 'recv' call is mirrored by the other process.
-template <bool T> struct Process {
+template <bool T, uint32_t S> struct Process {
   LIBC_INLINE Process() = default;
   LIBC_INLINE Process(const Process &) = delete;
   LIBC_INLINE Process &operator=(const Process &) = delete;
@@ -87,29 +82,26 @@
   LIBC_INLINE Process &operator=(Process &&) = default;
   LIBC_INLINE ~Process() = default;
 
-  template <bool U> friend struct Port;
+  template <bool U, uint32_t V> friend struct Port;
 
 protected:
   uint64_t port_count;
-  uint32_t lane_size;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
-  Packet *packet;
+  Packet<S> *packet;
 
   cpp::Atomic<uint32_t> lock[DEFAULT_PORT_COUNT] = {0};
 
 public:
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size,
-                         void *buffer) {
+  LIBC_INLINE void reset(uint64_t port_count, void *buffer) {
     this->port_count = port_count;
-    this->lane_size = lane_size;
     this->inbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(
         advance(buffer, inbox_offset(port_count)));
     this->outbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(
         advance(buffer, outbox_offset(port_count)));
-    this->packet =
-        reinterpret_cast<Packet *>(advance(buffer, buffer_offset(port_count)));
+    this->packet = reinterpret_cast<Packet<S> *>(
+        advance(buffer, buffer_offset(port_count)));
   }
 
   /// Returns the beginning of the unified buffer. Intended for initializing the
@@ -124,20 +116,11 @@
   ///   Atomic<uint32_t> secondary[port_count];
   ///   Packet buffer[port_count];
   /// };
-  LIBC_INLINE static uint64_t allocation_size(uint64_t port_count,
-                                              uint32_t lane_size) {
-    return buffer_offset(port_count) + buffer_bytes(port_count, lane_size);
+  LIBC_INLINE static uint64_t allocation_size(uint64_t port_count) {
+    return buffer_offset(port_count) + buffer_bytes(port_count);
   }
 
 protected:
-  /// The length of the packet is flexible because the server needs to look up
-  /// the lane size at runtime. This helper indexes at the proper offset.
-  LIBC_INLINE Packet &get_packet(uint64_t index) {
-    return *reinterpret_cast<Packet *>(advance(
-        packet, index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
-                                 alignof(Packet))));
-  }
-
   /// Retrieve the inbox state from memory shared between processes.
   LIBC_INLINE uint32_t load_inbox(uint64_t index) {
     return inbox[index].load(cpp::MemoryOrder::RELAXED);
   }
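The hunks above make the lane count a template parameter of `Payload` and `Packet`, so the packet stride becomes a compile-time constant and the removed runtime `get_packet` offset arithmetic gives way to plain array indexing. A standalone sketch of that effect, with simplified stand-ins for `Header` and `Buffer` (the real layouts live in rpc.h):

```cpp
// Sketch, not the libc code: models how templating the lane size makes packet
// layout a compile-time property. Header/Buffer are simplified stand-ins.
#include <cstdint>
#include <cstdio>

struct Header { uint64_t mask; uint16_t opcode; };
struct Buffer { uint64_t data[8]; };  // illustrative size only

template <uint32_t lane_size> struct Payload { Buffer slot[lane_size]; };

template <uint32_t lane_size> struct alignas(64) Packet {
  Header header;
  Payload<lane_size> payload;
};

int main() {
  // The old flexible-array design had to compute this offset at runtime; now
  // ordinary indexing (packet[index]) is correct because the stride is fixed.
  static_assert(sizeof(Packet<1>) < sizeof(Packet<32>), "stride is per-lane");
  std::printf("Packet<1>=%zu Packet<32>=%zu Packet<64>=%zu\n",
              sizeof(Packet<1>), sizeof(Packet<32>), sizeof(Packet<64>));
}
```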
 
@@ -222,7 +205,7 @@
   /// Invokes a function across every active buffer across the total lane size.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
-                              Packet &packet) {
+                              Packet<S> &packet) {
     if constexpr (is_process_gpu()) {
       fn(&packet.payload.slot[gpu::get_lane_id()]);
     } else {
@@ -234,7 +217,7 @@
   /// Alternate version that also provides the index of the current lane.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
-                              Packet &packet) {
+                              Packet<S> &packet) {
     if constexpr (is_process_gpu()) {
       fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
     } else {
@@ -250,13 +233,8 @@
   }
 
   /// Number of bytes to allocate for the buffer containing the packets.
-  LIBC_INLINE static uint64_t buffer_bytes(uint64_t port_count,
-                                           uint32_t lane_size) {
-    return is_process_gpu()
-               ? port_count * sizeof(Packet)
-               : port_count *
-                     align_up(sizeof(Header) + (lane_size * sizeof(Buffer)),
-                              alignof(Packet));
+  LIBC_INLINE static uint64_t buffer_bytes(uint64_t port_count) {
+    return port_count * sizeof(Packet<S>);
   }
 
   /// Offset of the inbox in memory. This is the same as the outbox if inverted.
@@ -271,15 +249,15 @@
   /// Offset of the buffer containing the packets after the inbox and outbox.
   LIBC_INLINE static uint64_t buffer_offset(uint64_t port_count) {
-    return align_up(2 * mailbox_bytes(port_count), alignof(Packet));
+    return align_up(2 * mailbox_bytes(port_count), alignof(Packet<S>));
   }
 };
 
 /// The port provides the interface to communicate between the multiple
 /// processes. A port is conceptually an index into the memory provided by the
 /// underlying process that is guarded by a lock bit.
-template <bool T> struct Port {
-  LIBC_INLINE Port(Process<T> &process, uint64_t lane_mask, uint64_t index,
+template <bool T, uint32_t S> struct Port {
+  LIBC_INLINE Port(Process<T, S> &process, uint64_t lane_mask, uint64_t index,
                    uint32_t out)
       : process(process), lane_mask(lane_mask), index(index), out(out),
         receive(false), owns_buffer(true) {}
@@ -292,8 +270,8 @@
   LIBC_INLINE Port &operator=(Port &&) = default;
 
   friend struct Client;
-  friend struct Server;
-  friend class cpp::optional<Port<T>>;
+  template <uint32_t U> friend struct Server;
+  friend class cpp::optional<Port<T, S>>;
 
 public:
   template <typename U> LIBC_INLINE void recv(U use);
@@ -307,7 +285,7 @@
   LIBC_INLINE void recv_n(void **dst, uint64_t *size, A &&alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
-    return process.get_packet(index).header.opcode;
+    return process.packet[index].header.opcode;
   }
 
   LIBC_INLINE void close() {
@@ -319,7 +297,7 @@
   }
 
 private:
-  Process<T> &process;
+  Process<T, S> &process;
   uint64_t lane_mask;
   uint64_t index;
   uint32_t out;
@@ -328,41 +306,43 @@
 };
 
 /// The RPC client used to make requests to the server.
-struct Client : public Process<false> {
+struct Client : public Process<false, gpu::LANE_SIZE> {
   LIBC_INLINE Client() = default;
   LIBC_INLINE Client(const Client &) = delete;
   LIBC_INLINE Client &operator=(const Client &) = delete;
   LIBC_INLINE ~Client() = default;
 
-  using Port = rpc::Port<false>;
+  using Port = rpc::Port<false, gpu::LANE_SIZE>;
   template <uint16_t opcode> LIBC_INLINE cpp::optional<Port> try_open();
   template <uint16_t opcode> LIBC_INLINE Port open();
 };
 
 /// The RPC server used to respond to the client.
-struct Server : public Process<true> {
+template <uint32_t lane_size> struct Server : public Process<true, lane_size> {
   LIBC_INLINE Server() = default;
   LIBC_INLINE Server(const Server &) = delete;
   LIBC_INLINE Server &operator=(const Server &) = delete;
   LIBC_INLINE ~Server() = default;
 
-  using Port = rpc::Port<true>;
+  using Port = rpc::Port<true, lane_size>;
   LIBC_INLINE cpp::optional<Port> try_open();
   LIBC_INLINE Port open();
 };
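With the templates in place, `buffer_bytes`, `buffer_offset`, and `allocation_size` reduce to constant arithmetic over `sizeof(Packet<S>)`. The sketch below mirrors that layout math; `packet_size` and `packet_align` are hypothetical stand-ins for `sizeof(Packet<64>)` and `alignof(Packet<S>)`, and `mailbox_bytes` assumes one 32-bit atomic per port as in the patch:

```cpp
// Layout sketch under stated assumptions; mirrors inbox/outbox/packet offsets.
#include <cstdint>
#include <cstdio>

constexpr uint64_t align_up(uint64_t val, uint64_t align) {
  return (val + align - 1) & ~(align - 1);
}

constexpr uint64_t packet_size = 4160; // hypothetical sizeof(Packet<64>)
constexpr uint64_t packet_align = 64;  // alignas(64) on Packet

constexpr uint64_t mailbox_bytes(uint64_t port_count) {
  return port_count * sizeof(uint32_t); // one Atomic<uint32_t> per port
}
constexpr uint64_t buffer_offset(uint64_t port_count) {
  // The inbox and outbox arrays precede the packet buffer.
  return align_up(2 * mailbox_bytes(port_count), packet_align);
}
constexpr uint64_t allocation_size(uint64_t port_count) {
  return buffer_offset(port_count) + port_count * packet_size;
}

int main() {
  std::printf("64 ports -> %llu bytes of shared memory\n",
              (unsigned long long)allocation_size(64));
}
```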
 
 /// Applies \p fill to the shared buffer and initiates a send operation.
-template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
+template <bool T, uint32_t S>
+template <typename F>
+LIBC_INLINE void Port<T, S>::send(F fill) {
   uint32_t in = owns_buffer ? out ^ T : process.load_inbox(index);
 
   // We need to wait until we own the buffer before sending.
-  while (Process<T>::buffer_unavailable(in, out)) {
+  while (Process<T, S>::buffer_unavailable(in, out)) {
     sleep_briefly();
     in = process.load_inbox(index);
   }
 
   // Apply the \p fill function to initialize the buffer and release the memory.
-  process.invoke_rpc(fill, process.get_packet(index));
+  process.invoke_rpc(fill, process.packet[index]);
   atomic_thread_fence(cpp::MemoryOrder::RELEASE);
   out = process.invert_outbox(index, out);
   owns_buffer = false;
@@ -370,7 +350,9 @@
 }
 
 /// Applies \p use to the shared buffer and acknowledges the send.
-template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
+template <bool T, uint32_t S>
+template <typename U>
+LIBC_INLINE void Port<T, S>::recv(U use) {
   // We only exchange ownership of the buffer during a receive if we are waiting
   // for a previous receive to finish.
   if (receive) {
@@ -381,22 +363,22 @@
   uint32_t in = owns_buffer ? out ^ T : process.load_inbox(index);
 
   // We need to wait until we own the buffer before receiving.
-  while (Process<T>::buffer_unavailable(in, out)) {
+  while (Process<T, S>::buffer_unavailable(in, out)) {
     sleep_briefly();
     in = process.load_inbox(index);
   }
   atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
   // Apply the \p use function to read the memory out of the buffer.
-  process.invoke_rpc(use, process.get_packet(index));
+  process.invoke_rpc(use, process.packet[index]);
   receive = true;
   owns_buffer = true;
 }
 
 /// Combines a send and receive into a single function.
-template <bool T>
+template <bool T, uint32_t S>
 template <typename F, typename U>
-LIBC_INLINE void Port<T>::send_and_recv(F fill, U use) {
+LIBC_INLINE void Port<T, S>::send_and_recv(F fill, U use) {
   send(fill);
   recv(use);
 }
 
@@ -404,17 +386,17 @@
 /// Combines a receive and send operation into a single function. The \p work
 /// function modifies the buffer in-place and the send is only used to initiate
 /// the copy back.
-template <bool T>
+template <bool T, uint32_t S>
 template <typename W>
-LIBC_INLINE void Port<T>::recv_and_send(W work) {
+LIBC_INLINE void Port<T, S>::recv_and_send(W work) {
   recv(work);
   send([](Buffer *) { /* no-op */ });
 }
 
 /// Helper routine to simplify the interface when sending from the GPU using
 /// thread private pointers to the underlying value.
-template <bool T>
-LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
+template <bool T, uint32_t S>
+LIBC_INLINE void Port<T, S>::send_n(const void *src, uint64_t size) {
   static_assert(is_process_gpu(), "Only valid when running on the GPU");
   const void **src_ptr = &src;
   uint64_t *size_ptr = &size;
@@ -423,8 +405,8 @@
 
 /// Sends an arbitrarily sized data buffer \p src across the shared channel in
 /// multiples of the packet length.
-template <bool T>
-LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
+template <bool T, uint32_t S>
+LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
   uint64_t num_sends = 0;
   send([&](Buffer *buffer, uint32_t id) {
     reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
@@ -437,7 +419,7 @@
     inline_memcpy(&buffer->data[1], lane_value(src, id), len);
   });
   uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
-  uint64_t mask = process.get_packet(index).header.mask;
+  uint64_t mask = process.packet[index].header.mask;
   while (gpu::ballot(mask, idx < num_sends)) {
     send([=](Buffer *buffer, uint32_t id) {
       uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
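`send_n` streams an arbitrary-length buffer as a size-prefixed first packet followed by fixed-size chunks, and `recv_n` below reverses the process. A host-only sketch of the same framing; `kPacketData` is an assumed stand-in for `sizeof(Buffer::data)`:

```cpp
// Framing sketch: size word in the first packet, then full-width chunks.
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

constexpr uint64_t kPacketData = 64; // hypothetical sizeof(Buffer::data)
constexpr uint64_t kFirstChunk = kPacketData - sizeof(uint64_t);

std::vector<std::vector<uint8_t>> send_n(const uint8_t *src, uint64_t size) {
  std::vector<std::vector<uint8_t>> packets;
  // Opening packet: [total size][first kFirstChunk bytes].
  std::vector<uint8_t> first(kPacketData, 0);
  std::memcpy(first.data(), &size, sizeof(uint64_t));
  std::memcpy(first.data() + sizeof(uint64_t), src,
              size < kFirstChunk ? size : kFirstChunk);
  packets.push_back(first);
  // Follow-up packets mirror the `while (idx < num_sends)` loop above.
  for (uint64_t idx = kFirstChunk; idx < size; idx += kPacketData) {
    std::vector<uint8_t> chunk(kPacketData, 0);
    uint64_t len = size - idx > kPacketData ? kPacketData : size - idx;
    std::memcpy(chunk.data(), src + idx, len);
    packets.push_back(chunk);
  }
  return packets;
}

int main() {
  std::vector<uint8_t> msg(200, 0xab);
  std::printf("200 bytes -> %zu packets\n",
              send_n(msg.data(), msg.size()).size()); // 1 + ceil(144/64) = 4
}
```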
 
@@ -453,9 +435,9 @@
 /// Receives an arbitrarily sized data buffer across the shared channel in
 /// multiples of the packet length. The \p alloc function is called with the
 /// size of the data so that we can initialize the size of the \p dst buffer.
-template <bool T>
+template <bool T, uint32_t S>
 template <typename A>
-LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
+LIBC_INLINE void Port<T, S>::recv_n(void **dst, uint64_t *size, A &&alloc) {
   uint64_t num_recvs = 0;
   recv([&](Buffer *buffer, uint32_t id) {
     lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
@@ -470,7 +452,7 @@
     inline_memcpy(lane_value(dst, id), &buffer->data[1], len);
   });
   uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
-  uint64_t mask = process.get_packet(index).header.mask;
+  uint64_t mask = process.packet[index].header.mask;
   while (gpu::ballot(mask, idx < num_recvs)) {
     recv([=](Buffer *buffer, uint32_t id) {
       uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
@@ -491,28 +473,28 @@
 [[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
 Client::try_open() {
   // Perform a naive linear scan for a port that can be opened to send data.
-  for (uint64_t index = 0; index < port_count; ++index) {
+  for (uint64_t index = 0; index < this->port_count; ++index) {
     // Attempt to acquire the lock on this index.
     uint64_t lane_mask = gpu::get_lane_mask();
-    if (!try_lock(lane_mask, index))
+    if (!this->try_lock(lane_mask, index))
       continue;
 
     // The mailbox state must be read with the lock held.
     atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
-    uint32_t in = load_inbox(index);
-    uint32_t out = load_outbox(index);
+    uint32_t in = this->load_inbox(index);
+    uint32_t out = this->load_outbox(index);
 
     // Once we acquire the index we need to check if we are in a valid sending
     // state.
-    if (buffer_unavailable(in, out)) {
-      unlock(lane_mask, index);
+    if (this->buffer_unavailable(in, out)) {
+      this->unlock(lane_mask, index);
       continue;
     }
 
     if (is_first_lane(lane_mask)) {
-      get_packet(index).header.opcode = opcode;
-      get_packet(index).header.mask = lane_mask;
+      this->packet[index].header.opcode = opcode;
+      this->packet[index].header.mask = lane_mask;
     }
     gpu::sync_lane(lane_mask);
     return Port(*this, lane_mask, index, out);
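Every loop above leans on the same mailbox ownership test: each side compares its inbox against its outbox, with the comparison inverted on one side so that exactly one process owns a port's buffer at any time. Below is a two-thread toy model of that handshake for a single port; the predicate is a sketch of what `buffer_unavailable` computes, not a copy of it:

```cpp
// Toy single-port handshake. inbox is the server-to-client mailbox and outbox
// the client-to-server mailbox; seq_cst atomics stand in for the fences.
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>

std::atomic<uint32_t> inbox{0}, outbox{0};
uint32_t shared_value = 0; // models the packet buffer

template <bool T> bool buffer_unavailable(uint32_t in, uint32_t out) {
  return T ? in == out : in != out; // sketch of the inverted ownership test
}

int main() {
  std::thread server([] {
    // Server: in = the client's outbox, out = our own mailbox.
    while (buffer_unavailable<true>(outbox.load(), inbox.load()))
      ; // spin, as the sleep_briefly() loops do
    std::printf("server received %u\n", shared_value);
    inbox.store(1); // flip our mailbox: ownership returns to the client
  });
  shared_value = 42; // the client owns the buffer at startup
  outbox.store(1);   // publish: ownership passes to the server
  while (buffer_unavailable<false>(inbox.load(), outbox.load()))
    ; // wait for the acknowledgement
  server.join();
}
```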
 
@@ -530,32 +512,34 @@
 /// Attempts to open a port to use as the server. The server can only open a
 /// port if it has a pending receive operation.
-[[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
-Server::try_open() {
+template <uint32_t lane_size>
+[[clang::convergent]] LIBC_INLINE
+    cpp::optional<typename Server<lane_size>::Port>
+    Server<lane_size>::try_open() {
   // Perform a naive linear scan for a port that has a pending request.
-  for (uint64_t index = 0; index < port_count; ++index) {
-    uint32_t in = load_inbox(index);
-    uint32_t out = load_outbox(index);
+  for (uint64_t index = 0; index < this->port_count; ++index) {
+    uint32_t in = this->load_inbox(index);
+    uint32_t out = this->load_outbox(index);
 
     // The server is passive, if there is no work pending don't bother
     // opening a port.
-    if (buffer_unavailable(in, out))
+    if (this->buffer_unavailable(in, out))
       continue;
 
     // Attempt to acquire the lock on this index.
     uint64_t lane_mask = gpu::get_lane_mask();
     // Attempt to acquire the lock on this index.
-    if (!try_lock(lane_mask, index))
+    if (!this->try_lock(lane_mask, index))
       continue;
 
     // The mailbox state must be read with the lock held.
     atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
-    in = load_inbox(index);
-    out = load_outbox(index);
+    in = this->load_inbox(index);
+    out = this->load_outbox(index);
 
-    if (buffer_unavailable(in, out)) {
-      unlock(lane_mask, index);
+    if (this->buffer_unavailable(in, out)) {
+      this->unlock(lane_mask, index);
       continue;
     }
 
@@ -564,7 +548,8 @@
   return cpp::nullopt;
 }
 
-LIBC_INLINE Server::Port Server::open() {
+template <uint32_t lane_size>
+LIBC_INLINE typename Server<lane_size>::Port Server<lane_size>::open() {
   for (;;) {
     if (cpp::optional<Port> p = try_open())
       return cpp::move(p.value());
diff --git a/libc/src/__support/RPC/rpc_client.cpp b/libc/src/__support/RPC/rpc_client.cpp
--- a/libc/src/__support/RPC/rpc_client.cpp
+++ b/libc/src/__support/RPC/rpc_client.cpp
@@ -6,6 +6,7 @@
 //
 //===----------------------------------------------------------------------===//
 
+#include "rpc_client.h"
 #include "rpc.h"
 
 namespace __llvm_libc {
diff --git a/libc/src/gpu/rpc_reset.cpp b/libc/src/gpu/rpc_reset.cpp
--- a/libc/src/gpu/rpc_reset.cpp
+++ b/libc/src/gpu/rpc_reset.cpp
@@ -18,8 +18,7 @@
 // shared buffer.
 LLVM_LIBC_FUNCTION(void, rpc_reset,
                    (unsigned int num_ports, void *rpc_shared_buffer)) {
-  __llvm_libc::rpc::client.reset(num_ports, __llvm_libc::gpu::get_lane_size(),
-                                 rpc_shared_buffer);
+  __llvm_libc::rpc::client.reset(num_ports, rpc_shared_buffer);
 }
 
 } // namespace __llvm_libc
diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -42,7 +42,6 @@
   // We need to set up the RPC client first in case any of the constructors
   // require it.
   __llvm_libc::rpc::client.reset(__llvm_libc::rpc::DEFAULT_PORT_COUNT,
-                                 __llvm_libc::gpu::get_lane_size(),
                                  rpc_shared_buffer);
 
   // We want the fini array callbacks to be run after other atexit
diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -46,7 +46,6 @@
   // We need to set up the RPC client first in case any of the constructors
   // require it.
   __llvm_libc::rpc::client.reset(__llvm_libc::rpc::DEFAULT_PORT_COUNT,
-                                 __llvm_libc::gpu::get_lane_size(),
                                  rpc_shared_buffer);
 
   // We want the fini array callbacks to be run after other atexit
diff --git a/libc/utils/gpu/server/Server.h b/libc/utils/gpu/server/Server.h
--- a/libc/utils/gpu/server/Server.h
+++ b/libc/utils/gpu/server/Server.h
@@ -23,15 +23,18 @@
 /// status codes.
 typedef enum {
   RPC_STATUS_SUCCESS = 0x0,
+  RPC_STATUS_CONTINUE = 0x1,
   RPC_STATUS_ERROR = 0x1000,
   RPC_STATUS_OUT_OF_RANGE = 0x1001,
   RPC_STATUS_UNHANDLED_OPCODE = 0x1002,
+  RPC_STATUS_INVALID_LANE_SIZE = 0x1003,
 } rpc_status_t;
 
 /// A struct containing an opaque handle to an RPC port. This is what allows the
 /// server to communicate with the client.
 typedef struct rpc_port_s {
   uint64_t handle;
+  uint32_t lane_size;
 } rpc_port_t;
 
 /// A fixed-size buffer containing the payload sent from the client.
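The new `lane_size` member in `rpc_port_t` is a type tag: the C interface erases which `Port` instantiation sits behind the `uint64_t` handle, and the tag is what later lets the server cast the handle back to the right type. A self-contained sketch of the pattern with a hypothetical `Port` stand-in (assumes 64-bit pointers):

```cpp
// Tagged type-erasure sketch mirroring rpc_port_t's handle + lane_size pair.
#include <cstdint>
#include <cstdio>

template <uint32_t S> struct Port { uint32_t value = S; }; // stand-in type

struct rpc_port_t { // mirrors the struct added in Server.h
  uint64_t handle;
  uint32_t lane_size;
};

template <uint32_t S> rpc_port_t erase(Port<S> &port) {
  return rpc_port_t{reinterpret_cast<uint64_t>(&port), S};
}

void use(rpc_port_t ref) {
  // Dispatch on the tag, as rpc_recv_and_send does for lane sizes 1/32/64.
  if (ref.lane_size == 32)
    std::printf("Port<32> value: %u\n",
                reinterpret_cast<Port<32> *>(ref.handle)->value);
}

int main() {
  Port<32> port;
  use(erase(port));
}
```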
diff --git a/libc/utils/gpu/server/Server.cpp b/libc/utils/gpu/server/Server.cpp
--- a/libc/utils/gpu/server/Server.cpp
+++ b/libc/utils/gpu/server/Server.cpp
@@ -14,6 +14,8 @@
 #include <memory>
 #include <mutex>
 #include <unordered_map>
+#include <variant>
+#include <vector>
 
 using namespace __llvm_libc;
 
@@ -22,81 +24,51 @@
 static_assert(RPC_MAXIMUM_PORT_COUNT == rpc::DEFAULT_PORT_COUNT,
               "Incorrect maximum port count");
 
-struct Device {
-  rpc::Server server;
-  std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> callbacks;
-  std::unordered_map<rpc_opcode_t, void *> callback_data;
-};
-
-// A struct containing all the runtime state required to run the RPC server.
-struct State {
-  State(uint32_t num_devices)
-      : num_devices(num_devices),
-        devices(std::unique_ptr<Device[]>(new Device[num_devices])),
-        reference_count(0u) {}
-  uint32_t num_devices;
-  std::unique_ptr<Device[]> devices;
-  std::atomic_uint32_t reference_count;
-};
-
-static std::mutex startup_mutex;
-
-static State *state;
-
-rpc_status_t rpc_init(uint32_t num_devices) {
-  std::scoped_lock lock(startup_mutex);
-  if (!state)
-    state = new State(num_devices);
-
-  if (state->reference_count == std::numeric_limits<uint32_t>::max())
-    return RPC_STATUS_ERROR;
-
-  state->reference_count++;
-
-  return RPC_STATUS_SUCCESS;
-}
-
-rpc_status_t rpc_shutdown(void) {
-  if (state->reference_count-- == 1)
-    delete state;
-
-  return RPC_STATUS_SUCCESS;
-}
-
-rpc_status_t rpc_server_init(uint32_t device_id, uint64_t num_ports,
-                             uint32_t lane_size, rpc_alloc_ty alloc,
-                             void *data) {
-  if (device_id >= state->num_devices)
-    return RPC_STATUS_OUT_OF_RANGE;
-
-  uint64_t buffer_size =
-      __llvm_libc::rpc::Server::allocation_size(num_ports, lane_size);
-  void *buffer = alloc(buffer_size, data);
-
-  if (!buffer)
-    return RPC_STATUS_ERROR;
-  state->devices[device_id].server.reset(num_ports, lane_size, buffer);
+// The client needs to support different lane sizes for the SIMT model. Because
+// of this we need to select between the possible sizes that the client can use.
+struct Server {
+  template <uint32_t lane_size>
+  Server(std::unique_ptr<rpc::Server<lane_size>> &&server)
+      : server(std::move(server)) {}
 
-  return RPC_STATUS_SUCCESS;
-}
-
-rpc_status_t rpc_server_shutdown(uint32_t device_id, rpc_free_ty dealloc,
-                                 void *data) {
-  if (device_id >= state->num_devices)
-    return RPC_STATUS_OUT_OF_RANGE;
+  void reset(uint64_t port_count, void *buffer) {
+    std::visit([&](auto &server) { server->reset(port_count, buffer); },
+               server);
+  }
 
-  dealloc(rpc_get_buffer(device_id), data);
+  uint64_t allocation_size(uint64_t port_count) {
+    uint64_t ret = 0;
+    std::visit([&](auto &server) { ret = server->allocation_size(port_count); },
+               server);
+    return ret;
+  }
 
-  return RPC_STATUS_SUCCESS;
-}
+  void *get_buffer_start() const {
+    void *ret = nullptr;
+    std::visit([&](auto &server) { ret = server->get_buffer_start(); }, server);
+    return ret;
+  }
 
-rpc_status_t rpc_handle_server(uint32_t device_id) {
-  if (device_id >= state->num_devices)
-    return RPC_STATUS_OUT_OF_RANGE;
+  rpc_status_t handle_server(
+      std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
+      std::unordered_map<rpc_opcode_t, void *> &callback_data) {
+    rpc_status_t ret = RPC_STATUS_SUCCESS;
+    std::visit(
+        [&](auto &server) {
+          ret = handle_server(*server, callbacks, callback_data);
+        },
+        server);
+    return ret;
+  }
 
-  for (;;) {
-    auto port = state->devices[device_id].server.try_open();
+private:
+  template <uint32_t lane_size>
+  rpc_status_t handle_server(
+      rpc::Server<lane_size> &server,
+      std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
+      std::unordered_map<rpc_opcode_t, void *> &callback_data) {
+    auto port = server.try_open();
     if (!port)
       return RPC_STATUS_SUCCESS;
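The wrapper above erases the lane size behind a `std::variant` over the three supported instantiations and reopens it with `std::visit` in every member. A compilable miniature of that dispatch, with `ServerImpl` standing in for `rpc::Server<N>`:

```cpp
// Miniature of the variant-based dispatch used by the new Server wrapper.
#include <cstdint>
#include <cstdio>
#include <memory>
#include <variant>

template <uint32_t lane_size> struct ServerImpl { // stand-in for rpc::Server<N>
  uint64_t allocation_size(uint64_t ports) const { return ports * lane_size; }
};

struct Server {
  template <uint32_t lane_size>
  Server(std::unique_ptr<ServerImpl<lane_size>> &&s) : server(std::move(s)) {}

  uint64_t allocation_size(uint64_t ports) {
    uint64_t ret = 0;
    // std::visit instantiates the lambda once per alternative of the variant.
    std::visit([&](auto &s) { ret = s->allocation_size(ports); }, server);
    return ret;
  }

  std::variant<std::unique_ptr<ServerImpl<1>>,
               std::unique_ptr<ServerImpl<32>>,
               std::unique_ptr<ServerImpl<64>>>
      server;
};

int main() {
  Server s(std::make_unique<ServerImpl<64>>());
  std::printf("%llu\n", (unsigned long long)s.allocation_size(64)); // 4096
}
```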
 
@@ -175,21 +147,133 @@
       break;
     }
     default: {
-      auto handler = state->devices[device_id].callbacks.find(
-          static_cast<rpc_opcode_t>(port->get_opcode()));
+      auto handler =
+          callbacks.find(static_cast<rpc_opcode_t>(port->get_opcode()));
 
       // We error out on an unhandled opcode.
-      if (handler == state->devices[device_id].callbacks.end())
+      if (handler == callbacks.end())
         return RPC_STATUS_UNHANDLED_OPCODE;
 
       // Invoke the registered callback with a reference to the port.
-      void *data = state->devices[device_id].callback_data.at(
-          static_cast<rpc_opcode_t>(port->get_opcode()));
-      rpc_port_t port_ref{reinterpret_cast<uint64_t>(&*port)};
+      void *data =
+          callback_data.at(static_cast<rpc_opcode_t>(port->get_opcode()));
+      rpc_port_t port_ref{reinterpret_cast<uint64_t>(&*port), lane_size};
       (handler->second)(port_ref, data);
     }
     }
     port->close();
+    return RPC_STATUS_CONTINUE;
+  }
+
+  std::variant<std::unique_ptr<rpc::Server<1>>,
+               std::unique_ptr<rpc::Server<32>>,
+               std::unique_ptr<rpc::Server<64>>>
+      server;
+};
+
+struct Device {
+  template <uint32_t lane_size>
+  Device(std::unique_ptr<rpc::Server<lane_size>> &&server)
+      : server(std::move(server)) {}
+  Server server;
+  std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> callbacks;
+  std::unordered_map<rpc_opcode_t, void *> callback_data;
+};
+
+// A struct containing all the runtime state required to run the RPC server.
+struct State {
+  State(uint32_t num_devices)
+      : num_devices(num_devices), devices(num_devices), reference_count(0u) {}
+  uint32_t num_devices;
+  std::vector<std::unique_ptr<Device>> devices;
+  std::atomic_uint32_t reference_count;
+};
+
+static std::mutex startup_mutex;
+
+static State *state;
+
+rpc_status_t rpc_init(uint32_t num_devices) {
+  std::scoped_lock lock(startup_mutex);
+  if (!state)
+    state = new State(num_devices);
+
+  if (state->reference_count == std::numeric_limits<uint32_t>::max())
+    return RPC_STATUS_ERROR;
+
+  state->reference_count++;
+
+  return RPC_STATUS_SUCCESS;
+}
+
+rpc_status_t rpc_shutdown(void) {
+  if (state->reference_count-- == 1)
+    delete state;
+
+  return RPC_STATUS_SUCCESS;
+}
+
+rpc_status_t rpc_server_init(uint32_t device_id, uint64_t num_ports,
+                             uint32_t lane_size, rpc_alloc_ty alloc,
+                             void *data) {
+  if (device_id >= state->num_devices)
+    return RPC_STATUS_OUT_OF_RANGE;
+
+  if (!state->devices[device_id]) {
+    switch (lane_size) {
+    case 1:
+      state->devices[device_id] =
+          std::make_unique<Device>(std::make_unique<rpc::Server<1>>());
+      break;
+    case 32:
+      state->devices[device_id] =
+          std::make_unique<Device>(std::make_unique<rpc::Server<32>>());
+      break;
+    case 64:
+      state->devices[device_id] =
+          std::make_unique<Device>(std::make_unique<rpc::Server<64>>());
+      break;
+    default:
+      return RPC_STATUS_INVALID_LANE_SIZE;
+    }
+  }
+
+  uint64_t size = state->devices[device_id]->server.allocation_size(num_ports);
+  void *buffer = alloc(size, data);
+
+  if (!buffer)
+    return RPC_STATUS_ERROR;
+
+  state->devices[device_id]->server.reset(num_ports, buffer);
+
+  return RPC_STATUS_SUCCESS;
+}
+
+rpc_status_t rpc_server_shutdown(uint32_t device_id, rpc_free_ty dealloc,
+                                 void *data) {
+  if (device_id >= state->num_devices)
+    return RPC_STATUS_OUT_OF_RANGE;
+  if (!state->devices[device_id])
+    return RPC_STATUS_ERROR;
+
+  dealloc(rpc_get_buffer(device_id), data);
+  if (state->devices[device_id])
+    state->devices[device_id].reset();
+
+  return RPC_STATUS_SUCCESS;
+}
+
+rpc_status_t rpc_handle_server(uint32_t device_id) {
+  if (device_id >= state->num_devices)
+    return RPC_STATUS_OUT_OF_RANGE;
+  if (!state->devices[device_id])
+    return RPC_STATUS_ERROR;
+
+  for (;;) {
+    auto &device = *state->devices[device_id];
+    rpc_status_t status =
+        device.server.handle_server(device.callbacks, device.callback_data);
+    if (status != RPC_STATUS_CONTINUE)
+      return status;
   }
 }
 
@@ -198,22 +282,41 @@
                                    void *data) {
   if (device_id >= state->num_devices)
     return RPC_STATUS_OUT_OF_RANGE;
+  if (!state->devices[device_id])
+    return RPC_STATUS_ERROR;
 
-  state->devices[device_id].callbacks[opcode] = callback;
-  state->devices[device_id].callback_data[opcode] = data;
+  state->devices[device_id]->callbacks[opcode] = callback;
+  state->devices[device_id]->callback_data[opcode] = data;
   return RPC_STATUS_SUCCESS;
 }
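Taken together, the entry points are driven roughly as follows. This host-side sketch assumes malloc-backed callbacks matching the `alloc(size, data)` and `dealloc(ptr, data)` call shapes used above; the include path and single-device setup are illustrative, not part of the patch:

```cpp
// Hypothetical host driver for the revised C API in Server.h.
#include <cstdint>
#include <cstdlib>

#include "Server.h" // libc/utils/gpu/server/Server.h

static void *host_alloc(uint64_t size, void * /*data*/) { return malloc(size); }
static void host_free(void *ptr, void * /*data*/) { free(ptr); }

int main() {
  if (rpc_init(/*num_devices=*/1) != RPC_STATUS_SUCCESS)
    return 1;
  // lane_size must now be 1, 32, or 64; anything else fails up front with
  // RPC_STATUS_INVALID_LANE_SIZE instead of being deferred to runtime lookups.
  if (rpc_server_init(/*device_id=*/0, /*num_ports=*/64, /*lane_size=*/64,
                      host_alloc, nullptr) != RPC_STATUS_SUCCESS)
    return 1;

  // Drains pending ports; handle_server() reports RPC_STATUS_CONTINUE
  // internally until no work remains, then this returns RPC_STATUS_SUCCESS.
  rpc_handle_server(/*device_id=*/0);

  rpc_server_shutdown(/*device_id=*/0, host_free, nullptr);
  rpc_shutdown();
}
```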
 
 void *rpc_get_buffer(uint32_t device_id) {
   if (device_id >= state->num_devices)
     return nullptr;
-  return state->devices[device_id].server.get_buffer_start();
+  if (!state->devices[device_id])
+    return nullptr;
+  return state->devices[device_id]->server.get_buffer_start();
 }
 
 void rpc_recv_and_send(rpc_port_t ref, rpc_port_callback_ty callback,
                        void *data) {
-  rpc::Server::Port *port = reinterpret_cast<rpc::Server::Port *>(ref.handle);
-  port->recv_and_send([=](rpc::Buffer *buffer) {
-    callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
-  });
+  if (ref.lane_size == 1) {
+    rpc::Server<1>::Port *port =
+        reinterpret_cast<rpc::Server<1>::Port *>(ref.handle);
+    port->recv_and_send([=](rpc::Buffer *buffer) {
+      callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
+    });
+  } else if (ref.lane_size == 32) {
+    rpc::Server<32>::Port *port =
+        reinterpret_cast<rpc::Server<32>::Port *>(ref.handle);
+    port->recv_and_send([=](rpc::Buffer *buffer) {
+      callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
+    });
+  } else if (ref.lane_size == 64) {
+    rpc::Server<64>::Port *port =
+        reinterpret_cast<rpc::Server<64>::Port *>(ref.handle);
+    port->recv_and_send([=](rpc::Buffer *buffer) {
+      callback(reinterpret_cast<rpc_buffer_t *>(buffer), data);
+    });
+  }
 }
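A registered handler receives the tagged `rpc_port_t` and can pass it straight back to `rpc_recv_and_send`, which dispatches on `lane_size` as shown above. A sketch of such a handler; the opcode value is hypothetical, and the registration relies on the `rpc_register_callback` declaration in Server.h:

```cpp
// Hypothetical opcode handler wired up through the C interface.
#include <cstdio>

#include "Server.h" // illustrative include

static void echo(rpc_buffer_t *buffer, void *data) {
  // Mutate the buffer in place; recv_and_send copies it back to the client.
  std::printf("echoing buffer %p (handler data %p)\n", (void *)buffer, data);
}

static void handle_custom(rpc_port_t port, void *data) {
  // The lane_size tag added by this patch selects the matching
  // rpc::Server<1/32/64>::Port instantiation behind port.handle.
  rpc_recv_and_send(port, echo, data);
}

int main() {
  // Assumes rpc_init/rpc_server_init already ran (see the previous sketch).
  rpc_register_callback(/*device_id=*/0, /*opcode=*/(rpc_opcode_t)0x8000,
                        handle_custom, /*data=*/nullptr);
}
```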