diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp --- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp +++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp @@ -187,137 +187,125 @@ int NumThreads = 0; }; -class StreamManagerTy { - int NumberOfDevices; - // The initial size of stream pool - int EnvNumInitialStreams; - // Per-device stream mutex - std::vector<std::unique_ptr<std::mutex>> StreamMtx; - // Per-device stream Id indicates the next available stream in the pool - std::vector<int> NextStreamId; - // Per-device stream pool - std::vector<std::vector<CUstream>> StreamPool; - // Reference to per-device data - std::vector<DeviceDataTy> &DeviceData; - - // If there is no CUstream left in the pool, we will resize the pool to - // allocate more CUstream. This function should be called with device mutex, - // and we do not resize to smaller one. - void resizeStreamPool(const int DeviceId, const size_t NewSize) { - std::vector<CUstream> &Pool = StreamPool[DeviceId]; - const size_t CurrentSize = Pool.size(); - assert(NewSize > CurrentSize && "new size is not larger than current size"); - - CUresult Err = cuCtxSetCurrent(DeviceData[DeviceId].Context); - if (!checkResult(Err, "Error returned from cuCtxSetCurrent\n")) { - // We will return if cannot switch to the right context in case of - // creating bunch of streams that are not corresponding to the right - // device. The offloading will fail later because selected CUstream is - // nullptr. - return; - } - - Pool.resize(NewSize, nullptr); +/// Resource allocator where \p T is the resource type. +/// Functions \p create and \p destroy return OFFLOAD_SUCCESS and OFFLOAD_FAIL +/// accordingly. The implementation should not raise any exception. +template <typename T> class AllocatorTy { +public: + /// Create a resource and assign to R. + int create(T &R) noexcept; + /// Destroy the resource.
+ int destroy(T) noexcept; +}; - for (size_t I = CurrentSize; I < NewSize; ++I) { - checkResult(cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING), - "Error returned from cuStreamCreate\n"); - } - } +/// Allocator for CUstream. +template <> class AllocatorTy<CUstream> { + CUcontext Context; public: - StreamManagerTy(const int NumberOfDevices, - std::vector<DeviceDataTy> &DeviceData) - : NumberOfDevices(NumberOfDevices), EnvNumInitialStreams(32), - DeviceData(DeviceData) { - StreamPool.resize(NumberOfDevices); - NextStreamId.resize(NumberOfDevices); - StreamMtx.resize(NumberOfDevices); + AllocatorTy(CUcontext C) noexcept : Context(C) {} - if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS")) - EnvNumInitialStreams = std::stoi(EnvStr); + /// See AllocatorTy::create. + int create(CUstream &Stream) noexcept { + if (!checkResult(cuCtxSetCurrent(Context), + "Error returned from cuCtxSetCurrent\n")) + return OFFLOAD_FAIL; - // Initialize the next stream id - std::fill(NextStreamId.begin(), NextStreamId.end(), 0); + if (!checkResult(cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING), + "Error returned from cuStreamCreate\n")) + return OFFLOAD_FAIL; - // Initialize stream mutex - for (std::unique_ptr<std::mutex> &Ptr : StreamMtx) - Ptr = std::make_unique<std::mutex>(); + return OFFLOAD_SUCCESS; } - ~StreamManagerTy() { - // Destroy streams - for (int I = 0; I < NumberOfDevices; ++I) { - checkResult(cuCtxSetCurrent(DeviceData[I].Context), - "Error returned from cuCtxSetCurrent\n"); + /// See AllocatorTy::destroy. + int destroy(CUstream Stream) noexcept { + if (!checkResult(cuCtxSetCurrent(Context), + "Error returned from cuCtxSetCurrent\n")) + return OFFLOAD_FAIL; + if (!checkResult(cuStreamDestroy(Stream), + "Error returned from cuStreamDestroy\n")) + return OFFLOAD_FAIL; - for (CUstream &S : StreamPool[I]) { - if (S) - checkResult(cuStreamDestroy(S), - "Error returned from cuStreamDestroy\n"); - } - } + return OFFLOAD_SUCCESS; } +}; - // Get a CUstream from pool.
Per-device next stream id always points to the - // next available CUstream. That means, CUstreams [0, id-1] have been - // assigned, and [id,] are still available. If there is no CUstream left, we - // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned, - // the id will increase one. - // xxxxxs+++++++++ - // ^ - // id - // After assignment, the pool becomes the following and s is assigned. - // xxxxxs+++++++++ - // ^ - // id - CUstream getStream(const int DeviceId) { - const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]); - int &Id = NextStreamId[DeviceId]; - // No CUstream left in the pool, we need to request from CUDA RT - if (Id == static_cast<int>(StreamPool[DeviceId].size())) { - // By default we double the stream pool every time - resizeStreamPool(DeviceId, Id * 2); +/// A generic pool of resources where \p T is the resource type. +/// \p T should be copyable as the object is stored in \p std::vector<T> . +template <typename T> class ResourcePoolTy { + /// Index of the next available resource. + size_t Next = 0; + /// Mutex to guard the pool. + std::mutex Mutex; + /// Pool of resources. + std::vector<T> Resources; + /// The allocator used to create and destroy the resources. + AllocatorTy<T> Allocator; + + /// If `Resources` is used up, we will fill in more resources. It assumes that + /// the new size `Size` is never smaller than the current size. + bool resize(size_t Size) { + auto CurSize = Resources.size(); + assert(Size >= CurSize && "Unexpected smaller size"); + Resources.reserve(Size); + for (auto I = CurSize; I < Size; ++I) { + T NewItem; + int Ret = Allocator.create(NewItem); + if (Ret != OFFLOAD_SUCCESS) + return false; + Resources.push_back(NewItem); } - return StreamPool[DeviceId][Id++]; + return true; } - // Return a CUstream back to pool. As mentioned above, per-device next - // stream is always points to the next available CUstream, so when we return - // a CUstream, we need to first decrease the id, and then copy the CUstream - // back.
- // It is worth noting that, the order of streams return might be different - // from that they're assigned, that saying, at some point, there might be - // two identical CUstreams. - // xxax+a+++++ - // ^ - // id - // However, it doesn't matter, because they're always on the two sides of - // id. The left one will in the end be overwritten by another CUstream. - // Therefore, after several execution, the order of pool might be different - // from its initial state. - void returnStream(const int DeviceId, CUstream Stream) { - const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]); - int &Id = NextStreamId[DeviceId]; - assert(Id > 0 && "Wrong stream ID"); - StreamPool[DeviceId][--Id] = Stream; +public: + ResourcePoolTy(AllocatorTy<T> &&A, size_t Size = 0) noexcept + : Allocator(std::move(A)) { + (void)resize(Size); } - bool initializeDeviceStreamPool(const int DeviceId) { - assert(StreamPool[DeviceId].empty() && "stream pool has been initialized"); + ~ResourcePoolTy() noexcept { + for (auto &R : Resources) + (void)Allocator.destroy(R); + } - resizeStreamPool(DeviceId, EnvNumInitialStreams); + /// Get a resource from pool. `Next` always points to the next available + /// resource, i.e. `[0, Next-1]` have been assigned and `[Next,]` are still + /// available. If none is left, the pool is doubled (an empty pool grows to + /// one). Each time a resource is assigned, `Next` increases by one. + /// xxxxxs+++++++++ + /// ^ + /// Next + /// After assignment, the pool becomes the following and s is assigned.
+ /// xxxxxs+++++++++ + /// ^ + /// Next + int acquire(T &R) noexcept { + std::lock_guard<std::mutex> LG(Mutex); + if (Next == Resources.size() && !resize(Next ? Next * 2 : 1)) + return OFFLOAD_FAIL; - // Check the size of stream pool - if (static_cast<int>(StreamPool[DeviceId].size()) != EnvNumInitialStreams) - return false; + R = Resources[Next++]; - // Check whether each stream is valid - for (CUstream &S : StreamPool[DeviceId]) - if (!S) - return false; + return OFFLOAD_SUCCESS; + } - return true; + /// Return the resource back to the pool. When we return a resource, we need + /// to first decrease `Next`, and then copy the resource back. It is worth + /// noting that, the order of resources return might be different from that + /// they're assigned, that is to say, at some point, there might be two identical + /// resources. + /// xxax+a+++++ + /// ^ + /// Next + /// However, it doesn't matter, because they're always on the two sides of + /// `Next`. The left one will in the end be overwritten by another resource. + /// Therefore, after several executions, the order of pool might be different + /// from its initial state. + void release(T R) noexcept { + std::lock_guard<std::mutex> LG(Mutex); + Resources[--Next] = R; } }; @@ -331,13 +319,18 @@ int64_t RequiresFlags; // Amount of dynamic shared memory to use at launch. uint64_t DynamicMemorySize; + // Number of initial streams for each device.
+ int NumInitialStreams = 32; static constexpr const int HardTeamLimit = 1U << 16U; // 64k static constexpr const int HardThreadLimit = 1024; static constexpr const int DefaultNumTeams = 128; static constexpr const int DefaultNumThreads = 128; - std::unique_ptr<StreamManagerTy> StreamManager; + using StreamPoolTy = ResourcePoolTy<CUstream>; + using StreamAllocatorTy = AllocatorTy<CUstream>; + std::vector<std::unique_ptr<StreamPoolTy>> StreamPool; + std::vector<DeviceDataTy> DeviceData; std::vector<CUmodule> Modules; @@ -471,8 +464,13 @@ CUstream getStream(const int DeviceId, __tgt_async_info *AsyncInfo) const { assert(AsyncInfo && "AsyncInfo is nullptr"); - if (!AsyncInfo->Queue) - AsyncInfo->Queue = StreamManager->getStream(DeviceId); + if (!AsyncInfo->Queue) { + CUstream S; + if (StreamPool[DeviceId]->acquire(S) != OFFLOAD_SUCCESS) + return nullptr; + + AsyncInfo->Queue = S; + } return reinterpret_cast<CUstream>(AsyncInfo->Queue); } @@ -509,6 +507,7 @@ } DeviceData.resize(NumberOfDevices); + StreamPool.resize(NumberOfDevices); // Get environment variables regarding teams if (const char *EnvStr = getenv("OMP_TEAM_LIMIT")) { @@ -532,9 +531,11 @@ DP("Parsed LIBOMPTARGET_SHARED_MEMORY_SIZE = %" PRIu64 "\n", DynamicMemorySize); } - - StreamManager = - std::make_unique<StreamManagerTy>(NumberOfDevices, DeviceData); + if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS")) { + // LIBOMPTARGET_NUM_INITIAL_STREAMS has been set + NumInitialStreams = std::stoi(EnvStr); + DP("Parsed LIBOMPTARGET_NUM_INITIAL_STREAMS=%d\n", NumInitialStreams); + } for (int I = 0; I < NumberOfDevices; ++I) DeviceAllocators.emplace_back(I, DeviceData); @@ -556,13 +557,14 @@ for (auto &M : MemoryManagers) M.release(); - StreamManager = nullptr; - for (CUmodule &M : Modules) // Close module if (M) checkResult(cuModuleUnload(M), "Error returned from cuModuleUnload\n"); + for (auto &S : StreamPool) + S = nullptr; + for (DeviceDataTy &D : DeviceData) { // Destroy context if (D.Context) { @@ -627,8 +629,9 @@ return OFFLOAD_FAIL; // Initialize stream pool - if
(!StreamManager->initializeDeviceStreamPool(DeviceId)) - return OFFLOAD_FAIL; + if (!StreamPool[DeviceId]) + StreamPool[DeviceId] = std::make_unique<StreamPoolTy>( + StreamAllocatorTy(DeviceData[DeviceId].Context), NumInitialStreams); // Query attributes to determine number of threads/block and blocks/grid. int MaxGridDimX; @@ -1195,8 +1198,7 @@ // Once the stream is synchronized, return it to stream pool and reset // AsyncInfo. This is to make sure the synchronization only works for its // own tasks. - StreamManager->returnStream(DeviceId, - reinterpret_cast<CUstream>(AsyncInfo->Queue)); + StreamPool[DeviceId]->release(reinterpret_cast<CUstream>(AsyncInfo->Queue)); AsyncInfo->Queue = nullptr; if (Err != CUDA_SUCCESS) {