diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -187,135 +187,152 @@
   int NumThreads = 0;
 };
 
-class StreamManagerTy {
-  int NumberOfDevices;
-  // The initial size of stream pool
-  int EnvNumInitialStreams;
-  // Per-device stream mutex
-  std::vector<std::unique_ptr<std::mutex>> StreamMtx;
-  // Per-device stream Id indicates the next available stream in the pool
-  std::vector<int> NextStreamId;
-  // Per-device stream pool
-  std::vector<std::vector<CUstream>> StreamPool;
-  // Reference to per-device data
-  std::vector<DeviceDataTy> &DeviceData;
+/// A generic pool of resources.
+///
+/// `Ty` is the type of the resource. It should be copyable.
+/// `AllocatorTy` should provide two functions:
+/// - `int AllocatorTy::create(Ty &)`: create one resource.
+/// - `int AllocatorTy::destroy(Ty)`: destroy the resource.
+/// Both functions return OFFLOAD_SUCCESS or OFFLOAD_FAIL accordingly.
+template <typename Ty, typename AllocatorTy> class ResourcePoolTy {
+  /// Index of the next available resource.
+  size_t Next = 0;
+  /// Mutex to guard the pool.
+  std::mutex Mutex;
+  /// Pool of resources.
+  std::vector<Ty> Resources;
+  /// A reference to the corresponding allocator.
+  AllocatorTy &Allocator;
 
-  // If there is no CUstream left in the pool, we will resize the pool to
-  // allocate more CUstream. This function should be called with device mutex,
-  // and we do not resize to smaller one.
-  void resizeStreamPool(const int DeviceId, const size_t NewSize) {
-    std::vector<CUstream> &Pool = StreamPool[DeviceId];
-    const size_t CurrentSize = Pool.size();
-    assert(NewSize > CurrentSize && "new size is not larger than current size");
-
-    CUresult Err = cuCtxSetCurrent(DeviceData[DeviceId].Context);
-    if (!checkResult(Err, "Error returned from cuCtxSetCurrent\n")) {
-      // We will return if cannot switch to the right context in case of
-      // creating bunch of streams that are not corresponding to the right
-      // device. The offloading will fail later because selected CUstream is
-      // nullptr.
-      return;
-    }
-
-    Pool.resize(NewSize, nullptr);
-
-    for (size_t I = CurrentSize; I < NewSize; ++I) {
-      checkResult(cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING),
-                  "Error returned from cuStreamCreate\n");
+  /// If `Resources` is used up, we will fill in more resources. It assumes
+  /// that the new size `Size` is always larger than the current size.
+  bool resize(size_t Size) {
+    auto CurSize = Resources.size();
+    assert(Size > CurSize && "Unexpected smaller size");
+    Resources.reserve(Size);
+    for (auto I = CurSize; I < Size; ++I) {
+      Ty NewItem;
+      int Ret = Allocator.create(NewItem);
+      if (Ret != OFFLOAD_SUCCESS)
+        return false;
+      Resources.push_back(NewItem);
     }
+    return true;
   }
 
 public:
-  StreamManagerTy(const int NumberOfDevices,
-                  std::vector<DeviceDataTy> &DeviceData)
-      : NumberOfDevices(NumberOfDevices), EnvNumInitialStreams(32),
-        DeviceData(DeviceData) {
-    StreamPool.resize(NumberOfDevices);
-    NextStreamId.resize(NumberOfDevices);
-    StreamMtx.resize(NumberOfDevices);
+  ResourcePoolTy(AllocatorTy &A, size_t Size = 0) noexcept : Allocator(A) {
+    (void)resize(Size);
+  }
+  ~ResourcePoolTy() noexcept {
+    for (auto &R : Resources)
+      (void)Allocator.destroy(R);
+  }
+
+  /// Get a resource from the pool. `Next` always points to the next available
+  /// resource. That means, `[0, Next-1]` have been assigned, and `[Next, ...)`
+  /// are still available. If there is no resource left, we will ask for more.
+  /// Each time a resource is assigned, `Next` is increased by one.
+  /// xxxxxs+++++++++
+  ///      ^
+  ///      Next
+  /// After assignment, the pool becomes the following and s is assigned.
+  /// xxxxxs+++++++++
+  ///       ^
+  ///       Next
+  int acquire(Ty &R) noexcept {
+    std::lock_guard<std::mutex> LG(Mutex);
+    if (Next == Resources.size() && !resize(Resources.size() * 2))
+      return OFFLOAD_FAIL;
+
+    R = Resources[Next++];
+
+    return OFFLOAD_SUCCESS;
+  }
+
+  /// Return the resource back to the pool. When we return a resource, we need
+  /// to first decrease `Next`, and then copy the resource back. It is worth
+  /// noting that the order in which resources are returned may differ from the
+  /// order in which they were assigned; that is, at some point there might be
+  /// two identical resources.
+  /// xxax+a+++++
+  ///    ^
+  ///    Next
+  /// However, this does not matter, because they are always on the two sides
+  /// of `Next`. The left one will eventually be overwritten by another
+  /// resource. Therefore, after several executions, the order of the pool
+  /// might differ from its initial state.
+  void release(Ty R) noexcept {
+    std::lock_guard<std::mutex> LG(Mutex);
+    Resources[--Next] = R;
+  }
+};
+
+/// Allocator for CUstream.
+class StreamAllocatorTy {
+  DeviceDataTy &DeviceData;
+
+public:
+  StreamAllocatorTy(DeviceDataTy &DD) noexcept : DeviceData(DD) {}
+
+  int create(CUstream &Stream) noexcept {
+    if (!checkResult(cuCtxSetCurrent(DeviceData.Context),
+                     "Error returned from cuCtxSetCurrent\n"))
+      return OFFLOAD_FAIL;
+
+    if (!checkResult(cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING),
+                     "Error returned from cuStreamCreate\n"))
+      return OFFLOAD_FAIL;
+
+    return OFFLOAD_SUCCESS;
+  }
+
+  int destroy(CUstream Stream) noexcept {
+    if (!checkResult(cuCtxSetCurrent(DeviceData.Context),
+                     "Error returned from cuCtxSetCurrent\n"))
+      return OFFLOAD_FAIL;
+    if (!checkResult(cuStreamDestroy(Stream),
+                     "Error returned from cuStreamDestroy\n"))
+      return OFFLOAD_FAIL;
+
+    return OFFLOAD_SUCCESS;
+  }
+};
+
+class StreamPoolTy {
+  /// The initial size of stream pool.
+  int NumInitialStreams = 32;
+  /// Per-device stream allocators.
+  std::vector<StreamAllocatorTy> StreamAllocators;
+  /// Per-device stream pools.
+  using PoolTy = ResourcePoolTy<CUstream, StreamAllocatorTy>;
+  std::vector<std::unique_ptr<PoolTy>> Pools;
+
+public:
+  StreamPoolTy(int NumDevices, std::vector<DeviceDataTy> &DeviceData)
+      : Pools(NumDevices) {
     if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS"))
-      EnvNumInitialStreams = std::stoi(EnvStr);
+      NumInitialStreams = std::stoi(EnvStr);
 
-    // Initialize the next stream id
-    std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
-
-    // Initialize stream mutex
-    for (std::unique_ptr<std::mutex> &Ptr : StreamMtx)
-      Ptr = std::make_unique<std::mutex>();
+    for (unsigned I = 0; I < NumDevices; ++I)
+      StreamAllocators.emplace_back(DeviceData[I]);
   }
 
-  ~StreamManagerTy() {
-    // Destroy streams
-    for (int I = 0; I < NumberOfDevices; ++I) {
-      checkResult(cuCtxSetCurrent(DeviceData[I].Context),
-                  "Error returned from cuCtxSetCurrent\n");
-
-      for (CUstream &S : StreamPool[I]) {
-        if (S)
-          checkResult(cuStreamDestroy(S),
-                      "Error returned from cuStreamDestroy\n");
-      }
-    }
+  int acquire(int DeviceId, CUstream &Stream) {
+    return Pools[DeviceId]->acquire(Stream);
   }
 
-  // Get a CUstream from pool. Per-device next stream id always points to the
-  // next available CUstream. That means, CUstreams [0, id-1] have been
-  // assigned, and [id,] are still available. If there is no CUstream left, we
-  // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned,
-  // the id will increase one.
-  // xxxxxs+++++++++
-  //      ^
-  //      id
-  // After assignment, the pool becomes the following and s is assigned.
-  // xxxxxs+++++++++
-  //       ^
-  //       id
-  CUstream getStream(const int DeviceId) {
-    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
-    int &Id = NextStreamId[DeviceId];
-    // No CUstream left in the pool, we need to request from CUDA RT
-    if (Id == static_cast<int>(StreamPool[DeviceId].size())) {
-      // By default we double the stream pool every time
-      resizeStreamPool(DeviceId, Id * 2);
-    }
-    return StreamPool[DeviceId][Id++];
+  void release(int DeviceId, CUstream Stream) {
+    (void)Pools[DeviceId]->release(Stream);
   }
 
-  // Return a CUstream back to pool. As mentioned above, per-device next
-  // stream is always points to the next available CUstream, so when we return
-  // a CUstream, we need to first decrease the id, and then copy the CUstream
-  // back.
-  // It is worth noting that, the order of streams return might be different
-  // from that they're assigned, that saying, at some point, there might be
-  // two identical CUstreams.
-  // xxax+a+++++
-  //    ^
-  //    id
-  // However, it doesn't matter, because they're always on the two sides of
-  // id. The left one will in the end be overwritten by another CUstream.
-  // Therefore, after several execution, the order of pool might be different
-  // from its initial state.
-  void returnStream(const int DeviceId, CUstream Stream) {
-    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
-    int &Id = NextStreamId[DeviceId];
-    assert(Id > 0 && "Wrong stream ID");
-    StreamPool[DeviceId][--Id] = Stream;
-  }
+  bool initialize(int DeviceId) {
+    if (Pools[DeviceId])
+      return true;
 
-  bool initializeDeviceStreamPool(const int DeviceId) {
-    assert(StreamPool[DeviceId].empty() && "stream pool has been initialized");
-
-    resizeStreamPool(DeviceId, EnvNumInitialStreams);
-
-    // Check the size of stream pool
-    if (static_cast<int>(StreamPool[DeviceId].size()) != EnvNumInitialStreams)
-      return false;
-
-    // Check whether each stream is valid
-    for (CUstream &S : StreamPool[DeviceId])
-      if (!S)
-        return false;
+    Pools[DeviceId] =
+        std::make_unique<PoolTy>(StreamAllocators[DeviceId], NumInitialStreams);
 
     return true;
   }
@@ -337,7 +354,7 @@
   static constexpr const int DefaultNumTeams = 128;
   static constexpr const int DefaultNumThreads = 128;
 
-  std::unique_ptr<StreamManagerTy> StreamManager;
+  std::unique_ptr<StreamPoolTy> StreamPool;
   std::vector<DeviceDataTy> DeviceData;
   std::vector<CUmodule> Modules;
@@ -471,8 +488,13 @@
   CUstream getStream(const int DeviceId, __tgt_async_info *AsyncInfo) const {
     assert(AsyncInfo && "AsyncInfo is nullptr");
 
-    if (!AsyncInfo->Queue)
-      AsyncInfo->Queue = StreamManager->getStream(DeviceId);
+    if (!AsyncInfo->Queue) {
+      CUstream S;
+      if (StreamPool->acquire(DeviceId, S) != OFFLOAD_SUCCESS)
+        return nullptr;
+
+      AsyncInfo->Queue = S;
+    }
 
     return reinterpret_cast<CUstream>(AsyncInfo->Queue);
   }
@@ -533,8 +555,7 @@
                     DynamicMemorySize);
     }
 
-    StreamManager =
-        std::make_unique<StreamManagerTy>(NumberOfDevices, DeviceData);
+    StreamPool = std::make_unique<StreamPoolTy>(NumberOfDevices, DeviceData);
 
    for (int I = 0; I < NumberOfDevices; ++I)
       DeviceAllocators.emplace_back(I, DeviceData);
@@ -556,7 +577,7 @@
     for (auto &M : MemoryManagers)
       M.release();
 
-    StreamManager = nullptr;
+    StreamPool = nullptr;
 
     for (CUmodule &M : Modules)
       // Close module
@@ -627,7 +648,7 @@
       return OFFLOAD_FAIL;
 
     // Initialize stream pool
-    if (!StreamManager->initializeDeviceStreamPool(DeviceId))
+    if (!StreamPool->initialize(DeviceId))
       return OFFLOAD_FAIL;
 
     // Query attributes to determine number of threads/block and blocks/grid.
@@ -1195,8 +1216,7 @@
     // Once the stream is synchronized, return it to stream pool and reset
     // AsyncInfo. This is to make sure the synchronization only works for its
     // own tasks.
-    StreamManager->returnStream(DeviceId,
-                                reinterpret_cast<CUstream>(AsyncInfo->Queue));
+    StreamPool->release(DeviceId, reinterpret_cast<CUstream>(AsyncInfo->Queue));
     AsyncInfo->Queue = nullptr;
 
     if (Err != CUDA_SUCCESS) {
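For reference, here is a minimal standalone sketch of the same acquire/release pattern introduced by the patch. It is not part of the patch: `BufferAllocatorTy` is a hypothetical allocator used in place of `StreamAllocatorTy` so the example has no CUDA dependency, the `OFFLOAD_*` constants are redefined locally, and a `std::max` guard handles the empty-pool corner case in this self-contained setting.

```cpp
// Standalone sketch of the ResourcePoolTy acquire/release pattern.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <mutex>
#include <vector>

constexpr int OFFLOAD_SUCCESS = 0;
constexpr int OFFLOAD_FAIL = ~0;

template <typename Ty, typename AllocatorTy> class ResourcePoolTy {
  std::size_t Next = 0;      // Index of the next available resource.
  std::mutex Mutex;          // Guards Next and Resources.
  std::vector<Ty> Resources; // [0, Next-1] handed out, [Next, ...) free.
  AllocatorTy &Allocator;

  // Grow the pool to Size elements; Size must exceed the current size.
  bool resize(std::size_t Size) {
    std::size_t CurSize = Resources.size();
    assert(Size > CurSize && "Unexpected smaller size");
    Resources.reserve(Size);
    for (std::size_t I = CurSize; I < Size; ++I) {
      Ty NewItem;
      if (Allocator.create(NewItem) != OFFLOAD_SUCCESS)
        return false;
      Resources.push_back(NewItem);
    }
    return true;
  }

public:
  ResourcePoolTy(AllocatorTy &A, std::size_t Size = 0) : Allocator(A) {
    (void)resize(Size);
  }
  ~ResourcePoolTy() {
    for (Ty &R : Resources)
      (void)Allocator.destroy(R);
  }

  int acquire(Ty &R) {
    std::lock_guard<std::mutex> LG(Mutex);
    // Double the pool when it runs dry (std::max covers an empty pool).
    if (Next == Resources.size() &&
        !resize(std::max<std::size_t>(1, Resources.size() * 2)))
      return OFFLOAD_FAIL;
    R = Resources[Next++];
    return OFFLOAD_SUCCESS;
  }

  void release(Ty R) {
    std::lock_guard<std::mutex> LG(Mutex);
    Resources[--Next] = R;
  }
};

// Hypothetical allocator: hands out small heap buffers instead of CUstreams.
struct BufferAllocatorTy {
  int create(void *&Buf) {
    Buf = std::malloc(64);
    return Buf ? OFFLOAD_SUCCESS : OFFLOAD_FAIL;
  }
  int destroy(void *Buf) {
    std::free(Buf);
    return OFFLOAD_SUCCESS;
  }
};

int main() {
  BufferAllocatorTy Allocator;
  ResourcePoolTy<void *, BufferAllocatorTy> Pool(Allocator, /*Size=*/2);

  void *A = nullptr, *B = nullptr, *C = nullptr;
  Pool.acquire(A);
  Pool.acquire(B);
  Pool.acquire(C); // Pool exhausted: triggers the doubling resize.

  Pool.release(C);
  Pool.release(A); // Release order may differ from acquisition order; a
  Pool.release(B); // stale duplicate left of Next is harmless.
  return 0;
}
```

The per-device `StreamPoolTy` in the patch wraps this same structure, holding one pool and one allocator per device; since each `ResourcePoolTy` carries its own mutex, the separate per-device `StreamMtx` vector of the old `StreamManagerTy` is no longer needed.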