diff --git a/libc/src/__support/OSUtil/gpu/io.cpp b/libc/src/__support/OSUtil/gpu/io.cpp --- a/libc/src/__support/OSUtil/gpu/io.cpp +++ b/libc/src/__support/OSUtil/gpu/io.cpp @@ -15,7 +15,7 @@ namespace __llvm_libc { void write_to_stderr(cpp::string_view msg) { - rpc::Port port = rpc::client.open(rpc::PRINT_TO_STDERR); + rpc::Client::Port port = rpc::client.open(rpc::PRINT_TO_STDERR); port.send_n(msg.data(), msg.size()); port.close(); } diff --git a/libc/src/__support/OSUtil/gpu/quick_exit.cpp b/libc/src/__support/OSUtil/gpu/quick_exit.cpp --- a/libc/src/__support/OSUtil/gpu/quick_exit.cpp +++ b/libc/src/__support/OSUtil/gpu/quick_exit.cpp @@ -17,7 +17,7 @@ namespace __llvm_libc { void quick_exit(int status) { - rpc::Port port = rpc::client.open(rpc::EXIT); + rpc::Client::Port port = rpc::client.open(rpc::EXIT); port.send([&](rpc::Buffer *buffer) { reinterpret_cast(buffer->data)[0] = status; }); diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -48,6 +48,13 @@ /// server. The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer between both sides. /// +/// No process writes to its inbox. Each toggles the bit in the outbox to pass +/// ownership to the other process. +/// When inbox == outbox, the current state machine owns the buffer. +/// Initially the client is able to open any port as it will load 0 from both. +/// The server inbox read is inverted, so it loads inbox==1, outbox==0 until +/// the client has written to its outbox. +/// /// This process is designed to support mostly arbitrary combinations of 'send' /// and 'recv' operations on the shared buffer as long as these operations are /// mirrored by the other process. These operations exchange ownership of the @@ -58,23 +65,12 @@ /// - For every 'send' / 'recv' call on one side of the process there is a /// mirrored 'recv' / 'send' call. 
/// -/// The communication protocol is organized as a pair of two-state state -/// machines. One state machine tracks outgoing sends and the other tracks -/// incoming receives. For example, a 'send' operation uses its input 'Ack' bit -/// and its output 'Data' bit. If these bits are equal the sender owns the -/// buffer, otherwise the receiver owns the buffer and we wait. Similarly, a -/// 'recv' operation uses its output 'Ack' bit and input 'Data' bit. If these -/// bits are not equal the receiver owns the buffer, otherwise the sender owns -/// the buffer. -struct Process { +template struct Process { LIBC_INLINE Process() = default; LIBC_INLINE Process(const Process &) = default; LIBC_INLINE Process &operator=(const Process &) = default; LIBC_INLINE ~Process() = default; - static constexpr uint32_t Data = 0b01; - static constexpr uint32_t Ack = 0b10; - cpp::Atomic *lock; cpp::Atomic *inbox; cpp::Atomic *outbox; @@ -90,25 +86,38 @@ }; } - /// Determines if this process owns the buffer for a send. We can send data if - /// the output data bit matches the input acknowledge bit. + /// Inverting the bits loaded from the inbox in exactly one of the pair of + /// processes means that each can use the same state transitions. + /// Whichever process has InvertInbox==false is the initial owner. + /// Inbox equals Outbox => current process owns the buffer + /// Inbox differs from Outbox => current process does not own the buffer + /// At startup, memory is zero initialised and raw loads of either mailbox + /// would return zero. Thus both would succeed in opening a port and data + /// races result. If either inbox or outbox is inverted for one process, that + /// process interprets memory as Inbox!=Outbox and thus waits for the other. + /// It is simpler to invert reads from the inbox than writes to the outbox. + LIBC_INLINE uint32_t load_inbox(uint64_t index) { + uint32_t i = inbox[index].load(cpp::MemoryOrder::RELAXED); + return InvertInbox ?
!i : i; + } + + /// Determines if this process owns the buffer for a send. LIBC_INLINE static bool can_send_data(uint32_t in, uint32_t out) { - return bool(in & Process::Ack) == bool(out & Process::Data); + return in == out; } - /// Determines if this process owns the buffer for a receive. We can send data - /// if the output acknowledge bit does not match the input data bit. + /// Determines if this process owns the buffer for a receive. LIBC_INLINE static bool can_recv_data(uint32_t in, uint32_t out) { - return bool(in & Process::Data) != bool(out & Process::Ack); + return in == out; } }; /// The port provides the interface to communicate between the multiple /// processes. A port is conceptually an index into the memory provided by the /// underlying process that is guarded by a lock bit. -struct Port { +template struct Port { // TODO: This should be move-only. - LIBC_INLINE Port(Process &process, uint64_t index, uint32_t out) + LIBC_INLINE Port(Process &process, uint64_t index, uint32_t out) : process(process), index(index), out(out) {} LIBC_INLINE Port(const Port &) = default; LIBC_INLINE Port &operator=(const Port &) = delete; @@ -131,70 +140,73 @@ } private: - Process &process; + Process &process; uint64_t index; uint32_t out; }; /// The RPC client used to make requests to the server. -struct Client : public Process { +struct Client : public Process { LIBC_INLINE Client() = default; LIBC_INLINE Client(const Client &) = default; LIBC_INLINE Client &operator=(const Client &) = default; LIBC_INLINE ~Client() = default; + using Port = rpc::Port; LIBC_INLINE cpp::optional try_open(uint16_t opcode); LIBC_INLINE Port open(uint16_t opcode); }; /// The RPC server used to respond to the client. 
-struct Server : public Process { +struct Server : public Process { LIBC_INLINE Server() = default; LIBC_INLINE Server(const Server &) = default; LIBC_INLINE Server &operator=(const Server &) = default; LIBC_INLINE ~Server() = default; + using Port = rpc::Port; LIBC_INLINE cpp::optional try_open(); LIBC_INLINE Port open(); }; /// Applies \p fill to the shared buffer and initiates a send operation. -template LIBC_INLINE void Port::send(F fill) { - uint32_t in = process.inbox[index].load(cpp::MemoryOrder::RELAXED); +template template LIBC_INLINE void Port::send(F fill) { + uint32_t in = process.load_inbox(index); // We need to wait until we own the buffer before sending. - while (!Process::can_send_data(in, out)) { + while (!Process::can_send_data(in, out)) { sleep_briefly(); - in = process.inbox[index].load(cpp::MemoryOrder::RELAXED); + in = process.load_inbox(index); } // Apply the \p fill function to initialize the buffer and release the memory. fill(&process.buffer[index]); - out = out ^ Process::Data; + out = !out; atomic_thread_fence(cpp::MemoryOrder::RELEASE); process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } /// Applies \p use to the shared buffer and acknowledges the send. -template LIBC_INLINE void Port::recv(U use) { - uint32_t in = process.inbox[index].load(cpp::MemoryOrder::RELAXED); +template template LIBC_INLINE void Port::recv(U use) { + uint32_t in = process.load_inbox(index); // We need to wait until we own the buffer before receiving. - while (!Process::can_recv_data(in, out)) { + while (!Process::can_recv_data(in, out)) { sleep_briefly(); - in = process.inbox[index].load(cpp::MemoryOrder::RELAXED); + in = process.load_inbox(index); } atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); // Apply the \p use function to read the memory out of the buffer. use(&process.buffer[index]); - out = out ^ Process::Ack; + out = !out; process.outbox[index].store(out, cpp::MemoryOrder::RELAXED); } /// Combines a send and receive into a single function. 
+template template -LIBC_INLINE void Port::send_and_recv(F fill, U use) { +LIBC_INLINE void Port::send_and_recv(F fill, U use) { send(fill); recv(use); } @@ -202,14 +214,17 @@ /// Combines a receive and send operation into a single function. The \p work /// function modifies the buffer in-place and the send is only used to initiate /// the copy back. -template LIBC_INLINE void Port::recv_and_send(W work) { +template +template +LIBC_INLINE void Port::recv_and_send(W work) { recv(work); send([](Buffer *) { /* no-op */ }); } /// Sends an arbitrarily sized data buffer \p src across the shared channel in /// multiples of the packet length. -LIBC_INLINE void Port::send_n(const void *src, uint64_t size) { +template +LIBC_INLINE void Port::send_n(const void *src, uint64_t size) { // TODO: We could send the first bytes in this call and potentially save an // extra send operation. send([=](Buffer *buffer) { buffer->data[0] = size; }); @@ -226,7 +241,9 @@ /// Receives an arbitrarily sized data buffer across the shared channel in /// multiples of the packet length. The \p alloc function is called with the /// size of the data so that we can initialize the size of the \p dst buffer. -template LIBC_INLINE void Port::recv_n(A alloc) { +template +template +LIBC_INLINE void Port::recv_n(A alloc) { uint64_t size = 0; recv([&](Buffer *buffer) { size = buffer->data[0]; }); uint8_t *dst = reinterpret_cast(alloc(size)); @@ -243,7 +260,8 @@ /// port if we find an index that is in a valid sending state. That is, there /// are send operations pending that haven't been serviced on this port. Each /// port instance uses an associated \p opcode to tell the server what to do. -LIBC_INLINE cpp::optional Client::try_open(uint16_t opcode) { +LIBC_INLINE cpp::optional Client::try_open(uint16_t opcode) { + constexpr uint64_t index = 0; // Attempt to acquire the lock on this index. 
if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED)) return cpp::nullopt; @@ -251,7 +269,7 @@ // The mailbox state must be read with the lock held. atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); + uint32_t in = load_inbox(index); uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED); // Once we acquire the index we need to check if we are in a valid sending @@ -262,12 +280,12 @@ } buffer->opcode = opcode; - return Port(*this, 0, out); + return Port(*this, index, out); } -LIBC_INLINE Port Client::open(uint16_t opcode) { +LIBC_INLINE Client::Port Client::open(uint16_t opcode) { for (;;) { - if (cpp::optional p = try_open(opcode)) + if (cpp::optional p = try_open(opcode)) return p.value(); sleep_briefly(); } @@ -275,8 +293,9 @@ /// Attempts to open a port to use as the server. The server can only open a /// port if it has a pending receive operation -LIBC_INLINE cpp::optional Server::try_open() { - uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED); +LIBC_INLINE cpp::optional Server::try_open() { + constexpr uint64_t index = 0; + uint32_t in = load_inbox(index); uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED); // The server is passive, if there is no work pending don't bother @@ -291,7 +310,7 @@ // The mailbox state must be read with the lock held. 
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - in = inbox->load(cpp::MemoryOrder::RELAXED); + in = load_inbox(index); out = outbox->load(cpp::MemoryOrder::RELAXED); if (!can_recv_data(in, out)) { @@ -299,12 +318,12 @@ return cpp::nullopt; } - return Port(*this, 0, out); + return Port(*this, index, out); } -LIBC_INLINE Port Server::open() { +LIBC_INLINE Server::Port Server::open() { for (;;) { - if (cpp::optional p = try_open()) + if (cpp::optional p = try_open()) return p.value(); sleep_briefly(); } diff --git a/libc/test/integration/startup/gpu/rpc_test.cpp b/libc/test/integration/startup/gpu/rpc_test.cpp --- a/libc/test/integration/startup/gpu/rpc_test.cpp +++ b/libc/test/integration/startup/gpu/rpc_test.cpp @@ -16,7 +16,7 @@ uint32_t num_additions = 1000 + 10 * gpu::get_block_id_x(); uint64_t cnt = 0; for (uint32_t i = 0; i < num_additions; ++i) { - rpc::Port port = rpc::client.open(rpc::TEST_INCREMENT); + rpc::Client::Port port = rpc::client.open(rpc::TEST_INCREMENT); port.send_and_recv( [=](rpc::Buffer *buffer) { reinterpret_cast(buffer->data)[0] = cnt;