diff --git a/openmp/docs/design/Runtimes.rst b/openmp/docs/design/Runtimes.rst --- a/openmp/docs/design/Runtimes.rst +++ b/openmp/docs/design/Runtimes.rst @@ -1193,7 +1193,7 @@ operations (e.g., kernel launches and memory copies) that are executed sequentially. Parallelism is achieved by featuring multiple streams. The ``libomptarget`` leverages streams to exploit parallelism between plugin -operations. The default value is ``32``. +operations. The default value is ``1``, more streams are created as needed. LIBOMPTARGET_NUM_INITIAL_EVENTS """"""""""""""""""""""""""""""" @@ -1201,7 +1201,8 @@ This environment variable sets the number of pre-created events in the plugin (if supported) at initialization. More events will be created dynamically throughout the execution if needed. An event is used to synchronize -a stream with another efficiently. The default value is ``32``. +a stream with another efficiently. The default value is ``1``, more events are +created as needed. LIBOMPTARGET_LOCK_MAPPED_HOST_BUFFERS """"""""""""""""""""""""""""""""""""" diff --git a/openmp/libomptarget/include/Utilities.h b/openmp/libomptarget/include/Utilities.h --- a/openmp/libomptarget/include/Utilities.h +++ b/openmp/libomptarget/include/Utilities.h @@ -83,6 +83,12 @@ } } + Envar &operator=(const Ty &V) { + Data = V; + Initialized = true; + return *this; + } + /// Get the definitive value. const Ty &get() const { // Throw a runtime error in case this envar is not initialized. diff --git a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp --- a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp +++ b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp @@ -583,10 +583,12 @@ /// Class holding an HSA queue to submit kernel and barrier packets. struct AMDGPUQueueTy { /// Create an empty queue. - AMDGPUQueueTy() : Queue(nullptr), Mutex() {} + AMDGPUQueueTy() : Queue(nullptr), Mutex(), NumUsers(0) {} - /// Initialize a new queue belonging to a specific agent. + /// Lazily initialize a new queue belonging to a specific agent. Error init(hsa_agent_t Agent, int32_t QueueSize) { + if (Queue) + return Plugin::success(); hsa_status_t Status = hsa_queue_create(Agent, QueueSize, HSA_QUEUE_TYPE_MULTI, callbackError, nullptr, UINT32_MAX, UINT32_MAX, &Queue); @@ -595,10 +597,22 @@ /// Deinitialize the queue and destroy its resources. Error deinit() { + std::lock_guard Lock(Mutex); + if (!Queue) + return Plugin::success(); hsa_status_t Status = hsa_queue_destroy(Queue); return Plugin::check(Status, "Error in hsa_queue_destroy: %s"); } + /// Returns if this queue is considered busy + bool isBusy() const { return NumUsers > 0; } + + /// Decrement user count of the queue object + void removeUser() { --NumUsers; } + + /// Increase user count of the queue object + void addUser() { ++NumUsers; } + /// Push a kernel launch to the queue. The kernel launch requires an output /// signal and can define an optional input signal (nullptr if none). Error pushKernelLaunch(const AMDGPUKernelTy &Kernel, void *KernelArgs, @@ -611,6 +625,7 @@ // the addition of other packets to the queue. The following piece of code // should be lightweight; do not block the thread, allocate memory, etc. std::lock_guard Lock(Mutex); + assert(Queue && "Interacted with a non-initialized queue!"); // Avoid defining the input dependency if already satisfied. if (InputSignal && !InputSignal->load()) @@ -659,6 +674,7 @@ const AMDGPUSignalTy *InputSignal2) { // Lock the queue during the packet publishing process. std::lock_guard Lock(Mutex); + assert(Queue && "Interacted with a non-initialized queue!"); // Push the barrier with the lock acquired. return pushBarrierImpl(OutputSignal, InputSignal1, InputSignal2); @@ -777,6 +793,9 @@ /// TODO: There are other more advanced approaches to avoid this mutex using /// atomic operations. We can further investigate it if this is a bottleneck. std::mutex Mutex; + + /// Indicates that the queue is busy when > 0 + int NumUsers; }; /// Struct that implements a stream of asynchronous operations for AMDGPU @@ -886,7 +905,7 @@ hsa_agent_t Agent; /// The queue that the stream uses to launch kernels. - AMDGPUQueueTy &Queue; + AMDGPUQueueTy *Queue; /// The manager of signals to reuse signals. AMDGPUSignalManagerTy &SignalManager; @@ -978,6 +997,9 @@ /// signal of the current stream, and 2) the last signal of the other stream. /// Use a barrier packet with two input signals. Error waitOnStreamOperation(AMDGPUStreamTy &OtherStream, uint32_t Slot) { + if (Queue == nullptr) + return Plugin::error("Target queue was nullptr"); + /// The signal that we must wait from the other stream. AMDGPUSignalTy *OtherSignal = OtherStream.Slots[Slot].Signal; @@ -999,7 +1021,7 @@ return Err; // Push a barrier into the queue with both input signals. - return Queue.pushBarrier(OutputSignal, InputSignal, OtherSignal); + return Queue->pushBarrier(OutputSignal, InputSignal, OtherSignal); } /// Callback for running a specific asynchronous operation. This callback is @@ -1085,6 +1107,9 @@ uint32_t NumThreads, uint64_t NumBlocks, uint32_t GroupSize, AMDGPUMemoryManagerTy &MemoryManager) { + if (Queue == nullptr) + return Plugin::error("Target queue was nullptr"); + // Retrieve an available signal for the operation's output. AMDGPUSignalTy *OutputSignal = nullptr; if (auto Err = SignalManager.getResource(OutputSignal)) @@ -1102,8 +1127,8 @@ return Err; // Push the kernel with the output signal and an input signal (optional) - return Queue.pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks, - GroupSize, OutputSignal, InputSignal); + return Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks, + GroupSize, OutputSignal, InputSignal); } /// Push an asynchronous memory copy between pinned memory buffers. @@ -1331,6 +1356,8 @@ /// Make the stream wait on an event. Error waitEvent(const AMDGPUEventTy &Event); + + friend struct AMDGPUStreamManagerTy; }; /// Class representing an event on AMDGPU. The event basically stores some @@ -1428,6 +1455,99 @@ return waitOnStreamOperation(RecordedStream, Event.RecordedSlot); } +struct AMDGPUStreamManagerTy final + : GenericDeviceResourceManagerTy> { + using ResourceRef = AMDGPUResourceRef; + using ResourcePoolTy = GenericDeviceResourceManagerTy; + + AMDGPUStreamManagerTy(GenericDeviceTy &Device, hsa_agent_t HSAAgent) + : GenericDeviceResourceManagerTy(Device), NextQueue(0), Agent(HSAAgent) {} + + Error init(uint32_t InitialSize, int NumHSAQueues, int HSAQueueSize) { + Queues = std::vector(NumHSAQueues); + QueueSize = HSAQueueSize; + MaxNumQueues = NumHSAQueues; + // Initialize one queue eagerly + if (auto Err = Queues.front().init(Agent, QueueSize)) + return Err; + + return GenericDeviceResourceManagerTy::init(InitialSize); + } + + /// Deinitialize the resource pool and delete all resources. This function + /// must be called before the destructor. + Error deinit() override { + // De-init all queues + for (AMDGPUQueueTy &Queue : Queues) { + if (auto Err = Queue.deinit()) + return Err; + } + + return GenericDeviceResourceManagerTy::deinit(); + } + + /// Get a single stream from the pool or create new resources. + virtual Error getResource(AMDGPUStreamTy *&StreamHandle) override { + return getResourcesImpl(1, &StreamHandle, [this](AMDGPUStreamTy *&Handle) { + return assignNextQueue(Handle); + }); + } + + /// Return stream to the pool. + virtual Error returnResource(AMDGPUStreamTy *StreamHandle) override { + return returnResourceImpl(StreamHandle, [](AMDGPUStreamTy *Handle) { + Handle->Queue->removeUser(); + return Plugin::success(); + }); + } + +private: + /// Search for and assign an prefereably idle queue to the given Stream. If + /// there is no queue without current users, resort to round robin selection. + inline Error assignNextQueue(AMDGPUStreamTy *Stream) { + uint32_t StartIndex = NextQueue % MaxNumQueues; + AMDGPUQueueTy *Q = nullptr; + + for (int i = 0; i < MaxNumQueues; ++i) { + Q = &Queues[StartIndex++]; + if (StartIndex == MaxNumQueues) + StartIndex = 0; + + if (Q->isBusy()) + continue; + else { + if (auto Err = Q->init(Agent, QueueSize)) + return Err; + + Q->addUser(); + Stream->Queue = Q; + return Plugin::success(); + } + } + + // All queues busy: Round robin (StartIndex has the initial value again) + Queues[StartIndex].addUser(); + Stream->Queue = &Queues[StartIndex]; + ++NextQueue; + return Plugin::success(); + } + + /// The next queue index to use for round robin selection. + uint32_t NextQueue; + + /// The queues which are assigned to requested streams. + std::vector Queues; + + /// The corresponding device as HSA agent. + hsa_agent_t Agent; + + /// The maximum number of queues. + int MaxNumQueues; + + /// The size of created queues. + int QueueSize; +}; + /// Abstract class that holds the common members of the actual kernel devices /// and the host device. Both types should inherit from this class. struct AMDGenericDeviceTy { @@ -1607,9 +1727,8 @@ OMPX_InitialNumSignals("LIBOMPTARGET_AMDGPU_NUM_INITIAL_HSA_SIGNALS", 64), OMPX_StreamBusyWait("LIBOMPTARGET_AMDGPU_STREAM_BUSYWAIT", 2000000), - AMDGPUStreamManager(*this), AMDGPUEventManager(*this), - AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice), - Queues() {} + AMDGPUStreamManager(*this, Agent), AMDGPUEventManager(*this), + AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice) {} ~AMDGPUDeviceTy() {} @@ -1676,17 +1795,12 @@ return Err; // Compute the number of queues and their size. - const uint32_t NumQueues = std::min(OMPX_NumQueues.get(), MaxQueues); - const uint32_t QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize); - - // Construct and initialize each device queue. - Queues = std::vector(NumQueues); - for (AMDGPUQueueTy &Queue : Queues) - if (auto Err = Queue.init(Agent, QueueSize)) - return Err; + OMPX_NumQueues = std::max(1U, std::min(OMPX_NumQueues.get(), MaxQueues)); + OMPX_QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize); // Initialize stream pool. - if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams)) + if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams, + OMPX_NumQueues, OMPX_QueueSize)) return Err; // Initialize event pool. @@ -1725,11 +1839,6 @@ } } - for (AMDGPUQueueTy &Queue : Queues) { - if (auto Err = Queue.deinit()) - return Err; - } - // Invalidate agent reference. Agent = {0}; @@ -2416,19 +2525,8 @@ }); } - /// Get the next queue in a round-robin fashion. - AMDGPUQueueTy &getNextQueue() { - static std::atomic NextQueue(0); - - uint32_t Current = NextQueue.fetch_add(1, std::memory_order_relaxed); - return Queues[Current % Queues.size()]; - } - private: - using AMDGPUStreamRef = AMDGPUResourceRef; using AMDGPUEventRef = AMDGPUResourceRef; - - using AMDGPUStreamManagerTy = GenericDeviceResourceManagerTy; using AMDGPUEventManagerTy = GenericDeviceResourceManagerTy; /// Envar for controlling the number of HSA queues per device. High number of @@ -2484,9 +2582,6 @@ /// Reference to the host device. AMDHostDeviceTy &HostDevice; - - /// List of device packet queues. - std::vector Queues; }; Error AMDGPUDeviceImageTy::loadExecutable(const AMDGPUDeviceTy &Device) { @@ -2558,7 +2653,7 @@ } AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device) - : Agent(Device.getAgent()), Queue(Device.getNextQueue()), + : Agent(Device.getAgent()), Queue(nullptr), SignalManager(Device.getSignalManager()), Device(Device), // Initialize the std::deque with some empty positions. Slots(32), NextSlot(0), SyncCycle(0), RPCServer(nullptr), diff --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h --- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h +++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h @@ -1168,7 +1168,7 @@ /// Deinitialize the resource pool and delete all resources. This function /// must be called before the destructor. - Error deinit() { + virtual Error deinit() { if (NextAvailable) DP("Missing %d resources to be returned\n", NextAvailable); @@ -1252,7 +1252,7 @@ return Plugin::success(); } -private: +protected: /// The resources between \p OldSize and \p NewSize need to be created or /// destroyed. The mutex is locked when this function is called. Error resizeResourcePoolImpl(uint32_t OldSize, uint32_t NewSize) { diff --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp --- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp +++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp @@ -396,9 +396,9 @@ // device initialization. These cannot be consulted until the device is // initialized correctly. We intialize them in GenericDeviceTy::init(). OMPX_TargetStackSize(), OMPX_TargetHeapSize(), - // By default, the initial number of streams and events are 32. - OMPX_InitialNumStreams("LIBOMPTARGET_NUM_INITIAL_STREAMS", 32), - OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 32), + // By default, the initial number of streams and events is 1. + OMPX_InitialNumStreams("LIBOMPTARGET_NUM_INITIAL_STREAMS", 1), + OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 1), DeviceId(DeviceId), GridValues(OMPGridValues), PeerAccesses(NumDevices, PeerAccessState::PENDING), PeerAccessesLock(), PinnedAllocs(*this), RPCServer(nullptr) {