diff --git a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp --- a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp +++ b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp @@ -599,6 +599,15 @@ return Plugin::check(Status, "Error in hsa_queue_destroy: %s"); } + /// Returns if this queue is considered busy + bool isBusy() const { return NumUsers.load() > 0; } + + /// Decrement user count of the queue object + void removeUser() { --NumUsers; } + + /// Increase user count of the queue object + void addUser() { ++NumUsers; } + /// Push a kernel launch to the queue. The kernel launch requires an output /// signal and can define an optional input signal (nullptr if none). Error pushKernelLaunch(const AMDGPUKernelTy &Kernel, void *KernelArgs, @@ -779,6 +788,9 @@ /// TODO: There are other more advanced approaches to avoid this mutex using /// atomic operations. We can further investigate it if this is a bottleneck. std::mutex Mutex; + + /// Indicates that the queue is busy when > 0 + std::atomic NumUsers{0}; }; /// Struct that implements a stream of asynchronous operations for AMDGPU @@ -888,7 +900,7 @@ hsa_agent_t Agent; /// The queue that the stream uses to launch kernels. - AMDGPUQueueTy &Queue; + AMDGPUQueueTy *Queue; /// The manager of signals to reuse signals. AMDGPUSignalManagerTy &SignalManager; @@ -971,6 +983,9 @@ /// signal of the current stream, and 2) the last signal of the other stream. /// Use a barrier packet with two input signals. Error waitOnStreamOperation(AMDGPUStreamTy &OtherStream, uint32_t Slot) { + if (Queue == nullptr) + return Plugin::error("Target queue was nullptr"); + /// The signal that we must wait from the other stream. AMDGPUSignalTy *OtherSignal = OtherStream.Slots[Slot].Signal; @@ -990,9 +1005,14 @@ return Err; // Push a barrier into the queue with both input signals. - return Queue.pushBarrier(OutputSignal, InputSignal, OtherSignal); + return Queue->pushBarrier(OutputSignal, InputSignal, OtherSignal); } + /// Assign another queue to this stream. This should only be called by a + /// ResourceManager when this stream is provided from a resource pool. By + /// using another queue, we may avoid putting work on an already busy queue. + void assignQueue(AMDGPUQueueTy *NewQueue) { Queue = NewQueue; } + /// Callback for running a specific asynchronous operation. This callback is /// used for hsa_amd_signal_async_handler. The argument is the operation that /// should be executed. Notice we use the post action mechanism to codify the @@ -1072,6 +1092,9 @@ uint32_t NumThreads, uint64_t NumBlocks, uint32_t GroupSize, AMDGPUMemoryManagerTy &MemoryManager) { + if (Queue == nullptr) + return Plugin::error("Target queue was nullptr"); + // Retrieve an available signal for the operation's output. AMDGPUSignalTy *OutputSignal = SignalManager.getResource(); OutputSignal->reset(); @@ -1087,8 +1110,8 @@ return Err; // Push the kernel with the output signal and an input signal (optional) - return Queue.pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks, - GroupSize, OutputSignal, InputSignal); + return Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks, + GroupSize, OutputSignal, InputSignal); } /// Push an asynchronous memory copy between pinned memory buffers. @@ -1311,6 +1334,8 @@ /// Make the stream wait on an event. Error waitEvent(const AMDGPUEventTy &Event); + + friend struct AMDGPUStreamManagerTy; }; /// Class representing an event on AMDGPU. The event basically stores some @@ -1408,6 +1433,128 @@ return waitOnStreamOperation(RecordedStream, Event.RecordedSlot); } +struct AMDGPUStreamManagerTy final + : GenericDeviceResourceManagerTy> { + using ResourceRef = AMDGPUResourceRef; + using ResourcePoolTy = GenericDeviceResourceManagerTy; + + AMDGPUStreamManagerTy(GenericDeviceTy &Device, hsa_agent_t HsaAgent) + : GenericDeviceResourceManagerTy(Device), Agent(HsaAgent) {} + + Error init(uint32_t InitialSize, int NumHsaQueues, int HsaQueueSize) { + Queues = std::vector(NumHsaQueues); + QueueSize = HsaQueueSize; + MaxNumQueues = NumHsaQueues; + // Initialize one queue eagerly + if (auto Err = Queues.front().init(Agent, QueueSize)) { + return Err; + } + + return GenericDeviceResourceManagerTy::init(InitialSize); + } + + /// Deinitialize the resource pool and delete all resources. This function + /// must be called before the destructor. + Error deinit() override { + // De-init all queues + for (AMDGPUQueueTy &Queue : Queues) { + if (auto Err = Queue.deinit()) + return Err; + } + + return GenericDeviceResourceManagerTy::deinit(); + } + + /// Get resource from the pool or create new resources. + ResourceRef getResource() override { + const std::lock_guard Lock(Mutex); + assert(NextAvailable <= ResourcePool.size() && + "Resource pool is corrupted"); + + if (NextAvailable == ResourcePool.size()) { + // By default we double the resource pool every time. + if (auto Err = ResourcePoolTy::resizeResourcePool(NextAvailable * 2)) { + REPORT("Failure to resize the resource pool: %s", + toString(std::move(Err)).data()); + // Return an empty reference. + return ResourceRef(); + } + } + + // Find an ideally idle queue, for the stream + AMDGPUQueueTy *Queue = getNextIdleQueue(); + + // Select the next available resource reference and increment counter + auto &Resource = ResourcePool[NextAvailable++]; + + // Mark queue as busy, assign to the stream and return it + Queue->addUser(); + (*Resource).assignQueue(Queue); + return Resource; + } + + /// Return resource to the pool. + void returnResource(ResourceRef Resource) override { + const std::lock_guard Lock(Mutex); + assert(NextAvailable > 0 && "Resource pool is corrupted"); + (*Resource).Queue->removeUser(); + ResourcePool[--NextAvailable] = Resource; + } + +private: + /// Search and return an prefereably idle queue. If there is no queue + /// without current users, resort to round robin selection. + inline AMDGPUQueueTy *getNextIdleQueue() { + uint32_t StartIndex = + NextQueue.fetch_add(1, std::memory_order_relaxed) % MaxNumQueues; + AMDGPUQueueTy *Q = nullptr; + + // Start the search from the starting index + for (int i = StartIndex; i < MaxNumQueues; ++i) { + Q = &Queues[i]; + if (!Q->isBusy()) { + if (auto Err = Q->initLazy(Agent, QueueSize)) { + REPORT("Failure during queue init: %s\n", + toString(std::move(Err)).data()); + return &Queues[0]; + } + return Q; + } + } + + // Wrap around and continue search from the beginning + for (int i = 0; i < StartIndex; ++i) { + Q = &Queues[i]; + if (!Q->isBusy()) { + if (auto Err = Q->initLazy(Agent, QueueSize)) { + REPORT("Failure during queue init: %s\n", + toString(std::move(Err)).data()); + return &Queues[0]; + } + return Q; + } + } + + // All queues busy: Round robin + return &Queues[StartIndex]; + } + + /// The next queue index to use for round robin selection. + std::atomic NextQueue{0}; + + /// The queues which are assigned to requested streams. + std::vector Queues; + + /// The corresponding device as HSA agent. + hsa_agent_t Agent; + + /// The maximum number of queues. + int MaxNumQueues; + + /// The size of created queues. + int QueueSize; +}; + /// Abstract class that holds the common members of the actual kernel devices /// and the host device. Both types should inherit from this class. struct AMDGenericDeviceTy { @@ -1587,9 +1734,8 @@ OMPX_InitialNumSignals("LIBOMPTARGET_AMDGPU_NUM_INITIAL_HSA_SIGNALS", 64), OMPX_StreamBusyWait("LIBOMPTARGET_AMDGPU_STREAM_BUSYWAIT", 2000000), - AMDGPUStreamManager(*this), AMDGPUEventManager(*this), - AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice), - Queues() {} + AMDGPUStreamManager(*this, Agent), AMDGPUEventManager(*this), + AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice) {} ~AMDGPUDeviceTy() {} @@ -1659,14 +1805,9 @@ OMPX_NumQueues = std::max(1U, std::min(OMPX_NumQueues.get(), MaxQueues)); OMPX_QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize); - // Construct and initialize each device queue. - Queues = std::vector(OMPX_NumQueues); - // Initialize one queue eagerly. - if (auto Err = Queues.front().init(Agent, OMPX_QueueSize)) - return Err; - // Initialize stream pool. - if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams)) + if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams, + OMPX_NumQueues, OMPX_QueueSize)) return Err; // Initialize event pool. @@ -1705,11 +1846,6 @@ } } - for (AMDGPUQueueTy &Queue : Queues) { - if (auto Err = Queue.deinit()) - return Err; - } - // Invalidate agent reference. Agent = {0}; @@ -2376,29 +2512,8 @@ }); } - /// Get the next queue in a round-robin fashion, includes lazy initialization. - AMDGPUQueueTy &getNextQueue() { - uint32_t Current = NextQueue.fetch_add(1, std::memory_order_relaxed); - uint32_t Idx = Current % Queues.size(); - auto &Queue = Queues[Idx]; - // Only queue 0 has been initialized eagerly. Others might need lazy/late - // initialization. - if (Idx == 0) - return Queue; - - if (auto Err = Queue.initLazy(Agent, OMPX_QueueSize)) { - // Gracefully handle late initialization errors, but report them anyway. - REPORT("%s\n", toString(std::move(Err)).data()); - return Queues[0]; - } - return Queue; - } - private: - using AMDGPUStreamRef = AMDGPUResourceRef; using AMDGPUEventRef = AMDGPUResourceRef; - - using AMDGPUStreamManagerTy = GenericDeviceResourceManagerTy; using AMDGPUEventManagerTy = GenericDeviceResourceManagerTy; /// Envar for controlling the number of HSA queues per device. High number of @@ -2454,12 +2569,6 @@ /// Reference to the host device. AMDHostDeviceTy &HostDevice; - - /// List of device packet queues. - std::vector Queues; - - // The next queue to be used for a new stream. - std::atomic NextQueue = {0}; }; Error AMDGPUDeviceImageTy::loadExecutable(const AMDGPUDeviceTy &Device) { @@ -2531,7 +2640,7 @@ } AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device) - : Agent(Device.getAgent()), Queue(Device.getNextQueue()), + : Agent(Device.getAgent()), Queue(nullptr), SignalManager(Device.getSignalManager()), // Initialize the std::deque with some empty positions. Slots(32), NextSlot(0), SyncCycle(0), diff --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h --- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h +++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h @@ -1108,7 +1108,7 @@ /// Deinitialize the resource pool and delete all resources. This function /// must be called before the destructor. - Error deinit() { + virtual Error deinit() { if (NextAvailable) DP("Missing %d resources to be returned\n", NextAvailable); @@ -1123,7 +1123,7 @@ } /// Get resource from the pool or create new resources. - ResourceRef getResource() { + virtual ResourceRef getResource() { const std::lock_guard Lock(Mutex); assert(NextAvailable <= ResourcePool.size() && @@ -1142,14 +1142,14 @@ } /// Return resource to the pool. - void returnResource(ResourceRef Resource) { + virtual void returnResource(ResourceRef Resource) { const std::lock_guard Lock(Mutex); assert(NextAvailable > 0 && "Resource pool is corrupted"); ResourcePool[--NextAvailable] = Resource; } -private: +protected: /// The resources between \p OldSize and \p NewSize need to be created or /// destroyed. The mutex is locked when this function is called. Error resizeResourcePoolImpl(uint32_t OldSize, uint32_t NewSize) {