diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -82,9 +82,6 @@ LIBC_INLINE Process &operator=(Process &&) = default; LIBC_INLINE ~Process() = default; - template friend struct Port; - -protected: uint64_t port_count; cpp::Atomic *inbox; cpp::Atomic *outbox; @@ -92,7 +89,6 @@ cpp::Atomic lock[DEFAULT_PORT_COUNT] = {0}; -public: /// Initialize the communication channels. LIBC_INLINE void reset(uint64_t port_count, void *buffer) { this->port_count = port_count; @@ -328,7 +324,7 @@ }; /// The RPC client used to make requests to the server. -struct Client : public Process { +struct Client { LIBC_INLINE Client() = default; LIBC_INLINE Client(const Client &) = delete; LIBC_INLINE Client &operator=(const Client &) = delete; @@ -337,10 +333,17 @@ using Port = rpc::Port; template LIBC_INLINE cpp::optional try_open(); template LIBC_INLINE Port open(); + + LIBC_INLINE void reset(uint64_t port_count, void *buffer) { + process.reset(port_count, buffer); + } + +private: + Process process; }; /// The RPC server used to respond to the client. -template struct Server : public Process { +template struct Server { LIBC_INLINE Server() = default; LIBC_INLINE Server(const Server &) = delete; LIBC_INLINE Server &operator=(const Server &) = delete; @@ -349,6 +352,21 @@ using Port = rpc::Port; LIBC_INLINE cpp::optional try_open(); LIBC_INLINE Port open(); + + LIBC_INLINE void reset(uint64_t port_count, void *buffer) { + process.reset(port_count, buffer); + } + + LIBC_INLINE void *get_buffer_start() const { + return process.get_buffer_start(); + } + + LIBC_INLINE static uint64_t allocation_size(uint64_t port_count) { + return Process::allocation_size(port_count); + } + +private: + Process process; }; /// Applies \p fill to the shared buffer and initiates a send operation. @@ -487,28 +505,28 @@ [[clang::convergent]] LIBC_INLINE cpp::optional Client::try_open() { // Perform a naive linear scan for a port that can be opened to send data. - for (uint64_t index = 0; index < this->port_count; ++index) { + for (uint64_t index = 0; index < process.port_count; ++index) { // Attempt to acquire the lock on this index. uint64_t lane_mask = gpu::get_lane_mask(); - if (!this->try_lock(lane_mask, index)) + if (!process.try_lock(lane_mask, index)) continue; - uint32_t in = this->load_inbox(index); - uint32_t out = this->load_outbox(index); + uint32_t in = process.load_inbox(index); + uint32_t out = process.load_outbox(index); // Once we acquire the index we need to check if we are in a valid sending // state. - if (this->buffer_unavailable(in, out)) { - this->unlock(lane_mask, index); + if (process.buffer_unavailable(in, out)) { + process.unlock(lane_mask, index); continue; } if (is_first_lane(lane_mask)) { - this->packet[index].header.opcode = opcode; - this->packet[index].header.mask = lane_mask; + process.packet[index].header.opcode = opcode; + process.packet[index].header.mask = lane_mask; } gpu::sync_lane(lane_mask); - return Port(*this, lane_mask, index, out); + return Port(process, lane_mask, index, out); } return cpp::nullopt; } @@ -528,29 +546,29 @@ cpp::optional::Port> Server::try_open() { // Perform a naive linear scan for a port that has a pending request. - for (uint64_t index = 0; index < this->port_count; ++index) { - uint32_t in = this->load_inbox(index); - uint32_t out = this->load_outbox(index); + for (uint64_t index = 0; index < process.port_count; ++index) { + uint32_t in = process.load_inbox(index); + uint32_t out = process.load_outbox(index); // The server is passive, if there is no work pending don't bother // opening a port. - if (this->buffer_unavailable(in, out)) + if (process.buffer_unavailable(in, out)) continue; // Attempt to acquire the lock on this index. uint64_t lane_mask = gpu::get_lane_mask(); - if (!this->try_lock(lane_mask, index)) + if (!process.try_lock(lane_mask, index)) continue; - in = this->load_inbox(index); - out = this->load_outbox(index); + in = process.load_inbox(index); + out = process.load_outbox(index); - if (this->buffer_unavailable(in, out)) { - this->unlock(lane_mask, index); + if (process.buffer_unavailable(in, out)) { + process.unlock(lane_mask, index); continue; } - return Port(*this, lane_mask, index, out); + return Port(process, lane_mask, index, out); } return cpp::nullopt; }