diff --git a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp --- a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp +++ b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp @@ -571,7 +571,7 @@ /// Class holding an HSA queue to submit kernel and barrier packets. struct AMDGPUQueueTy { /// Create an empty queue. - AMDGPUQueueTy() : Queue(nullptr), Mutex() {} + AMDGPUQueueTy() : Queue(nullptr), Mutex(), NumUsers(0) {} /// Initialize a new queue belonging to a specific agent. Error init(hsa_agent_t Agent, int32_t QueueSize) { @@ -583,8 +583,6 @@ /// If the queue is not initialized, do it now. Error initLazy(hsa_agent_t Agent, int32_t QueueSize) { - // Lock the queue during the lazy init - std::lock_guard Lock(Mutex); if (Queue) return Plugin::success(); return init(Agent, QueueSize); @@ -599,6 +597,15 @@ return Plugin::check(Status, "Error in hsa_queue_destroy: %s"); } + /// Returns if this queue is considered busy + bool isBusy() const { return NumUsers > 0; } + + /// Decrement user count of the queue object + void removeUser() { --NumUsers; } + + /// Increase user count of the queue object + void addUser() { ++NumUsers; } + /// Push a kernel launch to the queue. The kernel launch requires an output /// signal and can define an optional input signal (nullptr if none). Error pushKernelLaunch(const AMDGPUKernelTy &Kernel, void *KernelArgs, @@ -779,6 +786,9 @@ /// TODO: There are other more advanced approaches to avoid this mutex using /// atomic operations. We can further investigate it if this is a bottleneck. std::mutex Mutex; + + /// Indicates that the queue is busy when > 0 + int NumUsers; }; /// Struct that implements a stream of asynchronous operations for AMDGPU @@ -888,7 +898,7 @@ hsa_agent_t Agent; /// The queue that the stream uses to launch kernels. - AMDGPUQueueTy &Queue; + AMDGPUQueueTy *Queue; /// The manager of signals to reuse signals. 
AMDGPUSignalManagerTy &SignalManager; @@ -971,6 +981,9 @@ /// signal of the current stream, and 2) the last signal of the other stream. /// Use a barrier packet with two input signals. Error waitOnStreamOperation(AMDGPUStreamTy &OtherStream, uint32_t Slot) { + if (Queue == nullptr) + return Plugin::error("Target queue was nullptr"); + /// The signal that we must wait from the other stream. AMDGPUSignalTy *OtherSignal = OtherStream.Slots[Slot].Signal; @@ -990,7 +1003,15 @@ return Err; // Push a barrier into the queue with both input signals. - return Queue.pushBarrier(OutputSignal, InputSignal, OtherSignal); + return Queue->pushBarrier(OutputSignal, InputSignal, OtherSignal); + } + + /// Assign another queue to this stream. This should only be called by a + /// ResourceManager when this stream is provided from a resource pool. By + /// using another queue, we may avoid putting work on an already busy queue. + void assignQueue(AMDGPUQueueTy *NewQueue) { + NewQueue->addUser(); + Queue = NewQueue; } /// Callback for running a specific asynchronous operation. This callback is @@ -1072,6 +1093,9 @@ uint32_t NumThreads, uint64_t NumBlocks, uint32_t GroupSize, AMDGPUMemoryManagerTy &MemoryManager) { + if (Queue == nullptr) + return Plugin::error("Target queue was nullptr"); + // Retrieve an available signal for the operation's output. AMDGPUSignalTy *OutputSignal = SignalManager.getResource(); OutputSignal->reset(); @@ -1087,8 +1111,8 @@ return Err; // Push the kernel with the output signal and an input signal (optional) - return Queue.pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks, - GroupSize, OutputSignal, InputSignal); + return Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks, + GroupSize, OutputSignal, InputSignal); } /// Push an asynchronous memory copy between pinned memory buffers. @@ -1311,6 +1335,8 @@ /// Make the stream wait on an event. 
Error waitEvent(const AMDGPUEventTy &Event); + + friend struct AMDGPUStreamManagerTy; }; /// Class representing an event on AMDGPU. The event basically stores some @@ -1408,6 +1434,102 @@ return waitOnStreamOperation(RecordedStream, Event.RecordedSlot); } +struct AMDGPUStreamManagerTy final + : GenericDeviceResourceManagerTy> { + using ResourceRef = AMDGPUResourceRef; + using ResourcePoolTy = GenericDeviceResourceManagerTy; + + AMDGPUStreamManagerTy(GenericDeviceTy &Device, hsa_agent_t HsaAgent) + : GenericDeviceResourceManagerTy(Device), NextQueue(0), Agent(HsaAgent) {} + + Error init(uint32_t InitialSize, int NumHsaQueues, int HsaQueueSize) { + Queues = std::vector(NumHsaQueues); + QueueSize = HsaQueueSize; + MaxNumQueues = NumHsaQueues; + // Initialize one queue eagerly + if (auto Err = Queues.front().init(Agent, QueueSize)) + return Err; + + return GenericDeviceResourceManagerTy::init(InitialSize); + } + + /// Deinitialize the resource pool and delete all resources. This function + /// must be called before the destructor. + Error deinit() override { + // De-init all queues + for (AMDGPUQueueTy &Queue : Queues) { + if (auto Err = Queue.deinit()) + return Err; + } + + return GenericDeviceResourceManagerTy::deinit(); + } + + /// Get resource from the pool or create new resources, then assign a queue. + ResourceRef getResource() override { + ResourceRef Resource = GenericDeviceResourceManagerTy::getResource(); + // Assign an ideally idle queue to the stream + assignNextQueue(Resource); + return Resource; + } + + /// Release the stream's queue user count and return resource to the pool. + void returnResource(ResourceRef Resource) override { + std::unique_lock Lock(Mutex); + (*Resource).Queue->removeUser(); + Lock.unlock(); + GenericDeviceResourceManagerTy::returnResource(Resource); + } + +private: + /// Search for and assign a preferably idle queue to the given Stream. If + /// there is no queue without current users, resort to round robin selection. 
+ inline void assignNextQueue(ResourceRef Resource) { + const std::lock_guard Lock(Mutex); + uint32_t StartIndex = NextQueue % MaxNumQueues; + AMDGPUQueueTy *Q = nullptr; + + for (int i = 0; i < MaxNumQueues; ++i) { + Q = &Queues[StartIndex++]; + if (StartIndex == MaxNumQueues) + StartIndex = 0; + if (Q->isBusy()) + continue; + else { + if (auto Err = Q->initLazy(Agent, QueueSize)) { + REPORT("Failure during queue init: %s\n", + toString(std::move(Err)).data()); + Q = &Queues[0]; + } + + Q->addUser(); + (*Resource).Queue = Q; + return; + } + } + + // All queues busy: Round robin (StartIndex has the original value again) + Queues[StartIndex].addUser(); + (*Resource).Queue = &Queues[StartIndex]; + ++NextQueue; + } + + /// The next queue index to use for round robin selection. + uint32_t NextQueue; + + /// Queues handed to streams; only the first is initialized eagerly. + std::vector Queues; + + /// The corresponding device as HSA agent. + hsa_agent_t Agent; + + /// The maximum number of queues. + int MaxNumQueues; + + /// The size of created queues. + int QueueSize; +}; + /// Abstract class that holds the common members of the actual kernel devices /// and the host device. Both types should inherit from this class. struct AMDGenericDeviceTy { @@ -1587,9 +1709,8 @@ OMPX_InitialNumSignals("LIBOMPTARGET_AMDGPU_NUM_INITIAL_HSA_SIGNALS", 64), OMPX_StreamBusyWait("LIBOMPTARGET_AMDGPU_STREAM_BUSYWAIT", 2000000), - AMDGPUStreamManager(*this), AMDGPUEventManager(*this), - AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice), - Queues() {} + AMDGPUStreamManager(*this, Agent), AMDGPUEventManager(*this), + AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice) {} ~AMDGPUDeviceTy() {} @@ -1659,14 +1780,9 @@ OMPX_NumQueues = std::max(1U, std::min(OMPX_NumQueues.get(), MaxQueues)); OMPX_QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize); - // Construct and initialize each device queue. - Queues = std::vector(OMPX_NumQueues); - // Initialize one queue eagerly. 
- if (auto Err = Queues.front().init(Agent, OMPX_QueueSize)) - return Err; - // Initialize stream pool. - if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams)) + if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams, + OMPX_NumQueues, OMPX_QueueSize)) return Err; // Initialize event pool. @@ -1705,11 +1821,6 @@ } } - for (AMDGPUQueueTy &Queue : Queues) { - if (auto Err = Queue.deinit()) - return Err; - } - // Invalidate agent reference. Agent = {0}; @@ -2376,29 +2487,8 @@ }); } - /// Get the next queue in a round-robin fashion, includes lazy initialization. - AMDGPUQueueTy &getNextQueue() { - uint32_t Current = NextQueue.fetch_add(1, std::memory_order_relaxed); - uint32_t Idx = Current % Queues.size(); - auto &Queue = Queues[Idx]; - // Only queue 0 has been initialized eagerly. Others might need lazy/late - // initialization. - if (Idx == 0) - return Queue; - - if (auto Err = Queue.initLazy(Agent, OMPX_QueueSize)) { - // Gracefully handle late initialization errors, but report them anyway. - REPORT("%s\n", toString(std::move(Err)).data()); - return Queues[0]; - } - return Queue; - } - private: - using AMDGPUStreamRef = AMDGPUResourceRef; using AMDGPUEventRef = AMDGPUResourceRef; - - using AMDGPUStreamManagerTy = GenericDeviceResourceManagerTy; using AMDGPUEventManagerTy = GenericDeviceResourceManagerTy; /// Envar for controlling the number of HSA queues per device. High number of @@ -2454,12 +2544,6 @@ /// Reference to the host device. AMDHostDeviceTy &HostDevice; - - /// List of device packet queues. - std::vector Queues; - - // The next queue to be used for a new stream. 
- std::atomic NextQueue = {0}; }; Error AMDGPUDeviceImageTy::loadExecutable(const AMDGPUDeviceTy &Device) { @@ -2531,7 +2615,7 @@ } AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device) - : Agent(Device.getAgent()), Queue(Device.getNextQueue()), + : Agent(Device.getAgent()), Queue(nullptr), SignalManager(Device.getSignalManager()), // Initialize the std::deque with some empty positions. Slots(32), NextSlot(0), SyncCycle(0), diff --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h --- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h +++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h @@ -1108,7 +1108,7 @@ /// Deinitialize the resource pool and delete all resources. This function /// must be called before the destructor. - Error deinit() { + virtual Error deinit() { if (NextAvailable) DP("Missing %d resources to be returned\n", NextAvailable); @@ -1123,7 +1123,7 @@ } /// Get resource from the pool or create new resources. - ResourceRef getResource() { + virtual ResourceRef getResource() { const std::lock_guard Lock(Mutex); assert(NextAvailable <= ResourcePool.size() && @@ -1142,14 +1142,14 @@ } /// Return resource to the pool. - void returnResource(ResourceRef Resource) { + virtual void returnResource(ResourceRef Resource) { const std::lock_guard Lock(Mutex); assert(NextAvailable > 0 && "Resource pool is corrupted"); ResourcePool[--NextAvailable] = Resource; } -private: +protected: /// The resources between \p OldSize and \p NewSize need to be created or /// destroyed. The mutex is locked when this function is called. Error resizeResourcePoolImpl(uint32_t OldSize, uint32_t NewSize) {