diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp --- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp +++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp @@ -187,135 +187,141 @@ int NumThreads = 0; }; -class StreamManagerTy { - int NumberOfDevices; - // The initial size of stream pool - int EnvNumInitialStreams; - // Per-device stream mutex - std::vector> StreamMtx; - // Per-device stream Id indicates the next available stream in the pool - std::vector NextStreamId; - // Per-device stream pool - std::vector> StreamPool; - // Reference to per-device data - std::vector &DeviceData; +/// A resource manager serving as a pool of resources. +/// +/// `Ty` is the type of the resource. It should be copyable. +/// `AllocatorTy` should provide two functions: +/// - `Ty AllocatorTy::allocate()`: allocate one resource. +/// - `void AllocatorTy::deallocate(Ty)`: deallocate the resource. +template class ResourceManagerTy { + /// Index of the next available resource. + size_t Next = 0; + /// Mutex to guard the pool. + std::mutex Mutex; + /// Pool of resources. + std::vector Resources; + /// A reference to the corresponding allocator. + AllocatorTy &Allocator; - // If there is no CUstream left in the pool, we will resize the pool to - // allocate more CUstream. This function should be called with device mutex, - // and we do not resize to smaller one. - void resizeStreamPool(const int DeviceId, const size_t NewSize) { - std::vector &Pool = StreamPool[DeviceId]; - const size_t CurrentSize = Pool.size(); - assert(NewSize > CurrentSize && "new size is not larger than current size"); - - CUresult Err = cuCtxSetCurrent(DeviceData[DeviceId].Context); - if (!checkResult(Err, "Error returned from cuCtxSetCurrent\n")) { - // We will return if cannot switch to the right context in case of - // creating bunch of streams that are not corresponding to the right - // device. 
The offloading will fail later because selected CUstream is - // nullptr. - return; - } - - Pool.resize(NewSize, nullptr); - - for (size_t I = CurrentSize; I < NewSize; ++I) { - checkResult(cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING), - "Error returned from cuStreamCreate\n"); - } + /// If `Resources` is used up, we will fill in more resources. It assumes that + /// the new size `Size` should be always larger than the current size. + void resizeTo(size_t Size) { + auto CurSize = Resources.size(); + assert(Size > CurSize && "Unexpected smaller size"); + Resources.reserve(Size); + for (size_t I = CurSize; I < Size; ++I) + Resources.push_back(Allocator.allocate()); } +public: + ResourceManagerTy(AllocatorTy &A, size_t Size = 0) noexcept : Allocator(A) { + resizeTo(Size); + } + + ~ResourceManagerTy() noexcept { + for (auto &R : Resources) + Allocator.deallocate(R); + } + + /// Get a resource from pool. `Next` always points to the next available + /// resource. That means, `[0, Next-1]` have been assigned, and `[Next,]` are + /// still available. If there is no resource left, we will ask for more. Each + /// time a resource is assigned, `Next` will increase by one. + /// xxxxxs+++++++++ + /// ^ + /// Next + /// After assignment, the pool becomes the following and s is assigned. + /// xxxxxs+++++++++ + /// ^ + /// Next + Ty get() noexcept { + std::lock_guard LG(Mutex); + if (Next == Resources.size()) + resizeTo(Resources.size() * 2); + + return Resources[Next++]; + } + + /// Return the resource back to the pool. When we return a resource, we need + /// to first decrease `Next`, and then copy the resource back. It is worth + /// noting that, the order of resources return might be different from that + /// they're assigned, that is to say, at some point, there might be two identical + /// resources. + /// xxax+a+++++ + /// ^ + /// Next + /// However, it doesn't matter, because they're always on the two sides of + /// `Next`. 
The left one will in the end be overwritten by another resource. + /// Therefore, after several executions, the order of the pool might be different + /// from its initial state. + void put(Ty R) noexcept { + std::lock_guard LG(Mutex); + Resources[--Next] = R; + } +}; + +/// Allocator for CUstream. +class StreamAllocatorTy { + DeviceDataTy &DeviceData; + +public: + StreamAllocatorTy(DeviceDataTy &DD) noexcept : DeviceData(DD) {} + + CUstream allocate() noexcept { + if (!checkResult(cuCtxSetCurrent(DeviceData.Context), + "Error returned from cuCtxSetCurrent\n")) + return nullptr; + + CUstream Stream; + if (!checkResult(cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING), + "Error returned from cuStreamCreate\n")) + return nullptr; + + return Stream; + } + + void deallocate(CUstream Stream) noexcept { + checkResult(cuCtxSetCurrent(DeviceData.Context), + "Error returned from cuCtxSetCurrent\n"); + checkResult(cuStreamDestroy(Stream), + "Error returned from cuStreamDestroy\n"); + } +}; + +class StreamManagerTy { + /// The initial size of stream pool + int EnvNumInitialStreams; + /// Per-device stream allocator + std::vector StreamAllocator; + /// Per-device stream manager. 
+ using RMTy = ResourceManagerTy; + std::vector> ResourceManagers; + public: StreamManagerTy(const int NumberOfDevices, std::vector &DeviceData) - : NumberOfDevices(NumberOfDevices), EnvNumInitialStreams(32), - DeviceData(DeviceData) { - StreamPool.resize(NumberOfDevices); - NextStreamId.resize(NumberOfDevices); - StreamMtx.resize(NumberOfDevices); - + : EnvNumInitialStreams(32), ResourceManagers(NumberOfDevices) { if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS")) EnvNumInitialStreams = std::stoi(EnvStr); - // Initialize the next stream id - std::fill(NextStreamId.begin(), NextStreamId.end(), 0); - - // Initialize stream mutex - for (std::unique_ptr &Ptr : StreamMtx) - Ptr = std::make_unique(); + for (unsigned I = 0; I < NumberOfDevices; ++I) + StreamAllocator.emplace_back(DeviceData[I]); } - ~StreamManagerTy() { - // Destroy streams - for (int I = 0; I < NumberOfDevices; ++I) { - checkResult(cuCtxSetCurrent(DeviceData[I].Context), - "Error returned from cuCtxSetCurrent\n"); - - for (CUstream &S : StreamPool[I]) { - if (S) - checkResult(cuStreamDestroy(S), - "Error returned from cuStreamDestroy\n"); - } - } + CUstream getStream(int DeviceId) { + return ResourceManagers[DeviceId]->get(); } - // Get a CUstream from pool. Per-device next stream id always points to the - // next available CUstream. That means, CUstreams [0, id-1] have been - // assigned, and [id,] are still available. If there is no CUstream left, we - // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned, - // the id will increase one. - // xxxxxs+++++++++ - // ^ - // id - // After assignment, the pool becomes the following and s is assigned. 
- // xxxxxs+++++++++ - // ^ - // id - CUstream getStream(const int DeviceId) { - const std::lock_guard Lock(*StreamMtx[DeviceId]); - int &Id = NextStreamId[DeviceId]; - // No CUstream left in the pool, we need to request from CUDA RT - if (Id == static_cast(StreamPool[DeviceId].size())) { - // By default we double the stream pool every time - resizeStreamPool(DeviceId, Id * 2); - } - return StreamPool[DeviceId][Id++]; + void returnStream(int DeviceId, CUstream Stream) { + ResourceManagers[DeviceId]->put(Stream); } - // Return a CUstream back to pool. As mentioned above, per-device next - // stream is always points to the next available CUstream, so when we return - // a CUstream, we need to first decrease the id, and then copy the CUstream - // back. - // It is worth noting that, the order of streams return might be different - // from that they're assigned, that saying, at some point, there might be - // two identical CUstreams. - // xxax+a+++++ - // ^ - // id - // However, it doesn't matter, because they're always on the two sides of - // id. The left one will in the end be overwritten by another CUstream. - // Therefore, after several execution, the order of pool might be different - // from its initial state. 
- void returnStream(const int DeviceId, CUstream Stream) { - const std::lock_guard Lock(*StreamMtx[DeviceId]); - int &Id = NextStreamId[DeviceId]; - assert(Id > 0 && "Wrong stream ID"); - StreamPool[DeviceId][--Id] = Stream; - } + bool initializeDeviceStreamPool(int DeviceId) { + if (ResourceManagers[DeviceId]) + return true; - bool initializeDeviceStreamPool(const int DeviceId) { - assert(StreamPool[DeviceId].empty() && "stream pool has been initialized"); - - resizeStreamPool(DeviceId, EnvNumInitialStreams); - - // Check the size of stream pool - if (static_cast(StreamPool[DeviceId].size()) != EnvNumInitialStreams) - return false; - - // Check whether each stream is valid - for (CUstream &S : StreamPool[DeviceId]) - if (!S) - return false; + ResourceManagers[DeviceId] = + std::make_unique(StreamAllocator[DeviceId], EnvNumInitialStreams); return true; }