diff --git a/libc/src/__support/CPP/atomic.h b/libc/src/__support/CPP/atomic.h --- a/libc/src/__support/CPP/atomic.h +++ b/libc/src/__support/CPP/atomic.h @@ -90,6 +90,14 @@ return __atomic_fetch_sub(&val, decrement, int(mem_ord)); } + T fetch_or(T increment, MemoryOrder mem_ord = MemoryOrder::SEQ_CST) { + return __atomic_fetch_or(&val, increment, int(mem_ord)); + } + + T fetch_and(T increment, MemoryOrder mem_ord = MemoryOrder::SEQ_CST) { + return __atomic_fetch_and(&val, increment, int(mem_ord)); + } + // Set the value without using an atomic operation. This is useful // in initializing atomic values without a constructor. void set(T rhs) { val = rhs; } diff --git a/libc/src/__support/OSUtil/gpu/io.cpp b/libc/src/__support/OSUtil/gpu/io.cpp --- a/libc/src/__support/OSUtil/gpu/io.cpp +++ b/libc/src/__support/OSUtil/gpu/io.cpp @@ -21,13 +21,13 @@ LIBC_INLINE void send_null_terminated(cpp::string_view src) { rpc::client.run( - [&](rpc::Buffer *buffer) { + [&](rpc::ThreadBuffer *buffer) { buffer->data[0] = rpc::Opcode::PRINT_TO_STDERR; char *data = reinterpret_cast(&buffer->data[1]); inline_memcpy(data, src.data(), src.size()); data[src.size()] = '\0'; }, - [](rpc::Buffer *) { /* void */ }); + [](rpc::ThreadBuffer *) { /* void */ }); } } // namespace internal diff --git a/libc/src/__support/OSUtil/gpu/quick_exit.cpp b/libc/src/__support/OSUtil/gpu/quick_exit.cpp --- a/libc/src/__support/OSUtil/gpu/quick_exit.cpp +++ b/libc/src/__support/OSUtil/gpu/quick_exit.cpp @@ -20,11 +20,11 @@ // TODO: Support asynchronous calls so we don't wait and exit from the GPU // immediately. rpc::client.run( - [&](rpc::Buffer *buffer) { + [&](rpc::ThreadBuffer *buffer) { buffer->data[0] = rpc::Opcode::EXIT; buffer->data[1] = status; }, - [](rpc::Buffer *) { /* void */ }); + [](rpc::ThreadBuffer *) { /* void */ }); #if defined(LIBC_TARGET_ARCH_IS_NVPTX) asm("exit;" ::: "memory"); diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h --- a/libc/src/__support/RPC/rpc.h +++ b/libc/src/__support/RPC/rpc.h @@ -35,46 +35,300 @@ LIBC_LAST = (1UL << 8) - 1, }; +enum : uint32_t { + NumberPorts = 64, + NumberUInt32ForBitmaps = 2, +}; + /// A fixed size channel used to communicate between the RPC client and server. -struct Buffer { +struct ThreadBuffer { uint64_t data[8]; }; +struct Buffer { + ThreadBuffer data[32]; // TODO: handle wavesize==64 as well +}; + +// The data structure Process is essentially four arrays of the same length +// indexed by port_t. The operations on process provide mutally exclusive +// access to the Buffer element at index port_t::value. Ownership alternates +// between the client and the server instance. +// The template parameters I, O correspond to the runtime +// values of the state machine implemented here from the perspective of the +// current process. They are tracked in the type system to raise errors on +// attempts to make invalid transitions or to use the protected buffer +// while the other process owns it. + +template struct port_t { + static_assert(I == 0 || I == 1, ""); + static_assert(O == 0 || O == 1, ""); + uint32_t value; + + port_t(uint32_t value) : value(value) {} + + port_t invert_inbox() { return value; } + port_t invert_outbox() { return value; } +}; + +template struct bitmap_t { +private: + cpp::Atomic *underlying; + using Word = uint32_t; + + inline uint32_t index_to_element(uint32_t x) { + uint32_t wordBits = 8 * sizeof(Word); + return x / wordBits; + } + + inline uint32_t index_to_subindex(uint32_t x) { + uint32_t wordBits = 8 * sizeof(Word); + return x % wordBits; + } + + inline bool nthbitset(uint32_t x, uint32_t n) { + return x & (UINT32_C(1) << n); + } + + inline bool nthbitset(uint64_t x, uint32_t n) { + return x & (UINT64_C(1) << n); + } + + inline uint32_t setnthbit(uint32_t x, uint32_t n) { + return x | (UINT32_C(1) << n); + } + + inline uint64_t setnthbit(uint64_t x, uint32_t n) { + return x | (UINT64_C(1) << n); + } + + static constexpr bool system_scope() { + return scope == __OPENCL_MEMORY_SCOPE_ALL_SVM_DEVICES; + } + static constexpr bool device_scope() { + return scope == __OPENCL_MEMORY_SCOPE_DEVICE; + } + + static_assert(system_scope() || device_scope(), ""); + static_assert(system_scope() != device_scope(), ""); + + Word load_word(uint32_t w) const { + cpp::Atomic &addr = underlying[w]; + Word tmp = addr.load(cpp::MemoryOrder::RELAXED); + return InvertedLoad ? ~tmp : tmp; + } + +public: + bitmap_t() /*: underlying(nullptr)*/ {} + bitmap_t(cpp::Atomic *d) : underlying(d) { + // can't necessarily write to the pointer from this object. if the memory is + // on a gpu, but this instance is being constructed on a cpu first, then + // direct writes will fail. However, the data does need to be zeroed for the + // bitmap to work. + } + + bool read_slot(uint32_t slot) { + uint32_t w = index_to_element(slot); + uint32_t subindex = index_to_subindex(slot); + return nthbitset(load_word(w), subindex); + } + + template port_t claim_slot(port_t port) { + uint32_t slot = port.value; + + uint32_t w = index_to_element(slot); + uint32_t subindex = index_to_subindex(slot); + + cpp::Atomic &addr = underlying[w]; + Word before; + if (system_scope()) { + // System scope is used here to approximate 'might be over pcie', where + // the available atomic operations are likely to be CAS, Add, Exchange. + // Set the bit using the knowledge that it is currently clear. + Word addend = (Word)1 << subindex; + before = addr.fetch_add(addend, cpp::MemoryOrder::ACQ_REL); + } else { + // Set the bit more clearly. TODO: device scope is missing from atomic.h + Word mask = setnthbit((Word)0, subindex); + before = addr.fetch_or(mask, cpp::MemoryOrder::ACQ_REL); + } + + (void)before; + return port.invert_outbox(); + } + + void release_slot(uint32_t i) { + uint32_t w = index_to_element(i); + uint32_t subindex = index_to_subindex(i); + + cpp::Atomic &addr = underlying[w]; + + if (system_scope()) { + // Clear the bit using the knowledge that it is currently set. + Word addend = 1 + ~((Word)1 << subindex); + addr.fetch_add(addend, cpp::MemoryOrder::ACQ_REL); + } else { + // Clear the bit more clearly + Word mask = ~setnthbit((Word)0, subindex); + addr.fetch_and(mask, cpp::MemoryOrder::ACQ_REL); + } + } + + template port_t release_slot(port_t port) { + release_slot(port.value); + return port.invert_outbox(); + } + + bool try_claim_slot(uint32_t slot) { + uint32_t w = index_to_element(slot); + uint32_t subindex = index_to_subindex(slot); + + static_assert(device_scope(), ""); + + Word mask = setnthbit((Word)0, subindex); + + // Fetch or implementing test and set as a faster alternative to CAS + cpp::Atomic &addr = underlying[w]; + uint32_t before = addr.fetch_or(mask, cpp::MemoryOrder::ACQ_REL); + + return !nthbitset(before, subindex); + } +}; + +// TODO: Work out a reasonable way to abstract over this +template struct WaveSizeType; +template <> struct WaveSizeType<32> { using Type = uint32_t; }; +template <> struct WaveSizeType<64> { using Type = uint64_t; }; + /// A common process used to synchronize communication between a client and a /// server. The process contains an inbox and an outbox used for signaling /// ownership of the shared buffer. +template struct Process { - LIBC_INLINE Process() = default; - LIBC_INLINE Process(const Process &) = default; - LIBC_INLINE Process &operator=(const Process &) = default; - LIBC_INLINE ~Process() = default; + static_assert(WaveSize == 32 || WaveSize == 64, ""); + + static_assert(WaveSize == 32, "64 not yet implemented"); + + // The inverted read on inbox determines which of two linked processes + // initially owns the underlying buffer. + // The initial memory state is inbox == outbox == 0 which implies ownership, + // but one process has inbox reads intercepted and inverted so it starts out + // believing the memory state is inbox == 1, outbox == 0, which implies the + // other process owns the buffer. + BufferElement *shared_buffer; + bitmap_t active; + bitmap_t inbox; + bitmap_t outbox; + + using ThreadMask = typename WaveSizeType::Type; - cpp::Atomic *inbox; - cpp::Atomic *outbox; - Buffer *buffer; + Process() = default; + ~Process() = default; + + Process(cpp::Atomic *locks, cpp::Atomic *inbox, + cpp::Atomic *outbox, BufferElement *shared_buffer) + : shared_buffer(shared_buffer), active(locks), inbox(inbox), + outbox(outbox) {} /// Initialize the communication channels. - LIBC_INLINE void reset(void *inbox, void *outbox, void *buffer) { - *this = { - reinterpret_cast *>(inbox), - reinterpret_cast *>(outbox), - reinterpret_cast(buffer), - }; + LIBC_INLINE void reset(void *locks, void *inbox, void *outbox, void *buffer) { + this->active = reinterpret_cast *>(locks); + this->inbox = reinterpret_cast *>(inbox); + this->outbox = reinterpret_cast *>(outbox); + this->shared_buffer = reinterpret_cast(buffer); + } + + template struct maybe { + T value; + bool success; + }; + + port_t<0, 0> open(ThreadMask active_threads) { + for (;;) { + maybe> r = try_open(active_threads); + if (r.success) { + return r.value; + } + sleep_briefly(); + } + } + + maybe> try_open(ThreadMask active_threads) { + + for (uint32_t p = 0; p < NumberPorts; p++) { + bool claim = false; + if (is_first_lane(active_threads)) { + claim = active.try_claim_slot(p); + } + claim = broadcast_first_lane(active_threads, claim); + + if (!claim) { + continue; + } + + atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + bool in = inbox.read_slot(p); + bool out = outbox.read_slot(p); + + if (in == 0 && out == 0) { + return {p, true}; + } + + if (in == 1 && out == 1) { + // garbage collect from an async call + outbox.release_slot(p); + } + + active.release_slot(p); + } + + return {UINT32_MAX, false}; + } + + template void close(port_t port) { + active.release_slot(port.value); + } + + template + void use(port_t port, Op op) { + uint32_t raw = port.value; + op(&shared_buffer[raw].data[get_lane_id()]); + } + + template port_t wait(port_t port) { + bool in = inbox.read_slot(port.value); + while (in == I) { + sleep_briefly(); + in = inbox.read_slot(port.value); + } + + atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + return port.invert_inbox(); + } + + template + port_t send(port_t port) { + atomic_thread_fence(cpp::MemoryOrder::RELEASE); + if constexpr (IandO == 0) { + return outbox.claim_slot(port); + } else { + return outbox.release_slot(port); + } } }; /// The RPC client used to make requests to the server. -struct Client : public Process { +struct Client : public Process { LIBC_INLINE Client() = default; LIBC_INLINE Client(const Client &) = default; LIBC_INLINE Client &operator=(const Client &) = default; LIBC_INLINE ~Client() = default; template LIBC_INLINE void run(F fill, U use); + template LIBC_INLINE void run_async(F fill); }; /// The RPC server used to respond to the client. -struct Server : public Process { +struct Server : public Process { LIBC_INLINE Server() = default; LIBC_INLINE Server(const Server &) = default; LIBC_INLINE Server &operator=(const Server &) = default; @@ -90,77 +344,64 @@ /// - Apply \p use to the shared buffer and write 0 to the outbox. /// - Wait until the inbox is 0. template LIBC_INLINE void Client::run(F fill, U use) { - bool in = inbox->load(cpp::MemoryOrder::RELAXED); - bool out = outbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + uint32_t ThreadMask = 1; + + port_t<0, 0> port0 = open(ThreadMask); + // Apply the \p fill to the buffer and signal the server. - if (!in & !out) { - fill(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(1, cpp::MemoryOrder::RELAXED); - out = 1; - } + this->use(port0, fill); + port_t<0, 1> port1 = send(port0); + // Wait for the server to work on the buffer and respond. - if (!in & out) { - while (!in) { - sleep_briefly(); - in = inbox->load(cpp::MemoryOrder::RELAXED); - } - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - } + port_t<1, 1> port2 = wait(port1); + // Apply \p use to the buffer and signal the server. - if (in & out) { - use(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(0, cpp::MemoryOrder::RELAXED); - out = 0; - } + this->use(port2, use); + port_t<1, 0> port3 = send(port2); + // Wait for the server to signal the end of the protocol. - if (in & !out) { - while (in) { - sleep_briefly(); - in = inbox->load(cpp::MemoryOrder::RELAXED); - } - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - } + close(port3); +} + +template LIBC_INLINE void Client::run_async(F fill) { + uint32_t ThreadMask = 1; + port_t<0, 0> port0 = open(ThreadMask); + // Apply the \p fill to the buffer and signal the server. + this->use(port0, fill); + port_t<0, 1> port1 = send(port0); + close(port1); } /// Run the RPC server protocol to communicate with the client. This is /// non-blocking and only checks the server a single time. We perform the /// following high level actions to complete a communication: -/// - Query if the inbox is 1 and exit if there is no work to do. +/// - Open a port or exit if there is no work to do. /// - Apply \p work to the shared buffer and write 1 to the outbox. -/// - Wait until the inbox is 0. +/// - Wait until the inbox is 1. /// - Apply \p clean to the shared buffer and write 0 to the outbox. template LIBC_INLINE bool Server::handle(W work, C clean) { - bool in = inbox->load(cpp::MemoryOrder::RELAXED); - bool out = outbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); + uint32_t ThreadMask = 1; + + auto maybe_port = try_open(ThreadMask); // There is no work to do, exit early. - if (!in & !out) + if (!maybe_port.success) return false; + + port_t<0, 0> port0 = maybe_port.value; + // Apply \p work to the buffer and signal the client. - if (in & !out) { - work(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(1, cpp::MemoryOrder::RELAXED); - out = 1; - } + use(port0, work); + port_t<0, 1> port1 = send(port0); + // Wait for the client to use the buffer and respond. - if (in & out) { - while (in) - in = inbox->load(cpp::MemoryOrder::RELAXED); - atomic_thread_fence(cpp::MemoryOrder::ACQUIRE); - } + port_t<1, 1> port2 = wait(port1); + // Clean up the buffer and signal the end of the protocol. - if (!in & out) { - clean(buffer); - atomic_thread_fence(cpp::MemoryOrder::RELEASE); - outbox->store(0, cpp::MemoryOrder::RELAXED); - out = 0; - } + use(port2, clean); + port_t<1, 0> port3 = send(port2); + close(port3); return true; } diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h --- a/libc/src/__support/RPC/rpc_util.h +++ b/libc/src/__support/RPC/rpc_util.h @@ -12,6 +12,8 @@ #include "src/__support/macros/attributes.h" #include "src/__support/macros/properties/architectures.h" +#include + namespace __llvm_libc { namespace rpc { @@ -26,6 +28,57 @@ #endif } +LIBC_INLINE uint64_t get_lane_id() { +#if defined(LIBC_TARGET_ARCH_IS_NVPTX) + return __nvvm_read_ptx_sreg_tid_x() /*threadIdx.x*/ & (32 - 1); + +#elif defined(LIBC_TARGET_ARCH_IS_AMDGPU) +#if __AMDGCN_WAVEFRONT_SIZE == 64 + return __builtin_amdgcn_mbcnt_hi(~0u, __builtin_amdgcn_mbcnt_lo(~0u, 0u)); +#elif __AMDGCN_WAVEFRONT_SIZE == 32 + return __builtin_amdgcn_mbcnt_lo(~0u, 0u); +#else +#error "" +#endif +#else + return 0; +#endif +} + +template LIBC_INLINE uint64_t get_first_lane_id(T active_threads) { + return __builtin_ffsl(active_threads) - 1; +} + +template LIBC_INLINE bool is_first_lane(T active_threads) { + + return get_lane_id() == get_first_lane_id(active_threads); +} + +template +LIBC_INLINE uint32_t broadcast_first_lane(T active_threads, uint32_t x) { + (void)active_threads; + +#if defined(LIBC_TARGET_ARCH_IS_NVPTX) && __CUDA_ARCH__ >= 700 + +#if 0 +#error \ + "This doesn't compile, needs target feature ptx60...., despite the cuda arch guard" + uint32_t first_id = get_first_lane_id(active_threads); + return __nvvm_shfl_sync_idx_i32(active_threads, x, first_id, + 32 - 1); +#else + return x; +#endif + +#elif defined(LIBC_TARGET_ARCH_IS_AMDGPU) + // reads from lowest set bit in exec mask + // this is OK from definition of is_first_lane + return __builtin_amdgcn_readfirstlane(x); +#else + return x; +#endif +} + } // namespace rpc } // namespace __llvm_libc diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp --- a/libc/startup/gpu/amdgpu/start.cpp +++ b/libc/startup/gpu/amdgpu/start.cpp @@ -13,7 +13,9 @@ extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void _start(int argc, char **argv, char **envp, int *ret, void *in, void *out, void *buffer) { - __llvm_libc::rpc::client.reset(in, out, buffer); + static __llvm_libc::cpp::Atomic + locks[__llvm_libc::rpc::NumberUInt32ForBitmaps] = {0}; + __llvm_libc::rpc::client.reset(&locks, in, out, buffer); __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED); } diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp --- a/libc/startup/gpu/nvptx/start.cpp +++ b/libc/startup/gpu/nvptx/start.cpp @@ -13,7 +13,9 @@ extern "C" [[gnu::visibility("protected")]] __attribute__((nvptx_kernel)) void _start(int argc, char **argv, char **envp, int *ret, void *in, void *out, void *buffer) { - __llvm_libc::rpc::client.reset(in, out, buffer); + static __llvm_libc::cpp::Atomic + locks[__llvm_libc::rpc::NumberUInt32ForBitmaps] = {0}; + __llvm_libc::rpc::client.reset(&locks, in, out, buffer); __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED); } diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp --- a/libc/utils/gpu/loader/amdgpu/Loader.cpp +++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp @@ -45,7 +45,7 @@ /// are any active requests. void handle_server() { while (server.handle( - [&](__llvm_libc::rpc::Buffer *buffer) { + [&](__llvm_libc::rpc::ThreadBuffer *buffer) { switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) { case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: { fputs(reinterpret_cast(&buffer->data[1]), stderr); @@ -59,7 +59,7 @@ return; }; }, - [](__llvm_libc::rpc::Buffer *buffer) {})) + [](__llvm_libc::rpc::ThreadBuffer *buffer) {})) ; } @@ -314,16 +314,19 @@ void *server_inbox; void *server_outbox; void *buffer; - if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::cpp::Atomic), - /*flags=*/0, &server_inbox)) + + size_t bitmap_number_bytes = __llvm_libc::rpc::NumberUInt32ForBitmaps * 32; + if (hsa_status_t err = + hsa_amd_memory_pool_allocate(finegrained_pool, bitmap_number_bytes, + /*flags=*/0, &server_inbox)) handle_error(err); - if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::cpp::Atomic), - /*flags=*/0, &server_outbox)) + if (hsa_status_t err = + hsa_amd_memory_pool_allocate(finegrained_pool, bitmap_number_bytes, + /*flags=*/0, &server_outbox)) handle_error(err); if (hsa_status_t err = hsa_amd_memory_pool_allocate( - finegrained_pool, sizeof(__llvm_libc::rpc::Buffer), + finegrained_pool, + __llvm_libc::rpc::NumberPorts * sizeof(__llvm_libc::rpc::Buffer), /*flags=*/0, &buffer)) handle_error(err); hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox); @@ -373,7 +376,9 @@ handle_error(err); // Initialize the RPC server's buffer for host-device communication. - server.reset(server_inbox, server_outbox, buffer); + static __llvm_libc::cpp::Atomic + locks[__llvm_libc::rpc::NumberUInt32ForBitmaps] = {0}; + server.reset(&locks, server_inbox, server_outbox, buffer); // Initialize the packet header and set the doorbell signal to begin execution // by the HSA runtime.