diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include <mutex>
 #include
 #include
 
@@ -80,7 +81,8 @@
 };
 
 /// Device environment data
-/// Manually sync with the deviceRTL side for now, move to a dedicated header file later.
+/// Manually sync with the deviceRTL side for now, move to a dedicated header
+/// file later.
 struct omptarget_device_environmentTy {
   int32_t debug_level;
 };
@@ -89,16 +91,164 @@
 /// FIXME: we may need this to be per device and per library.
 std::list KernelsList;
 
+namespace {
+/// Return true on CUDA_SUCCESS; otherwise report ErrMsg/Err, return false.
+bool checkResult(CUresult Err, const char *ErrMsg) {
+  if (Err == CUDA_SUCCESS)
+    return true;
+
+  DP("%s", ErrMsg);
+  CUDA_ERR_STRING(Err);
+  return false;
+}
+} // namespace
+
+/// Per-device pool of CUDA streams. getStream() and returnStream() hand streams
+/// out and take them back under a per-device mutex; the pool grows on demand.
+class StreamManagerTy {
+  int NumberOfDevices;
+  // Per-device stream mutex
+  std::vector<std::unique_ptr<std::mutex>> StreamMtx;
+  // Per-device stream Id indicates the next available stream in the pool
+  std::vector<int> NextStreamId;
+  // Per-device stream pool
+  std::vector<std::vector<CUstream>> StreamPool;
+  // Reference to the per-device contexts, which are not owned by this class
+  std::vector<CUcontext> &ContextsPtr;
+
+  // If there is no CUstream left in the pool, we will resize the pool to
+  // allocate more CUstream. This function must be called with the per-device
+  // mutex held, and the pool is never resized to a smaller one.
+  void resizeStreamPool(const int DeviceId, const size_t NewSize) {
+    std::vector<CUstream> &Pool = StreamPool[DeviceId];
+    const size_t CurrentSize = Pool.size();
+    assert(NewSize > CurrentSize && "new size is not larger than current size");
+
+    Pool.resize(NewSize, nullptr);
+
+    // If we cannot switch to the right context, return without creating any
+    // stream, so that we do not create a bunch of streams that do not
+    // correspond to the right device. The new slots then stay nullptr, and
+    // that is what getStream() hands out for them.
+    CUresult err = cuCtxSetCurrent(ContextsPtr[DeviceId]);
+    if (!checkResult(err, "Error when setting current CUDA context\n"))
+      return;
+
+    for (size_t I = CurrentSize; I < NewSize; ++I) {
+      err = cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING);
+      checkResult(err,
+                  "Error when creating CUDA stream to resize stream pool\n");
+    }
+  }
+
+public:
+  StreamManagerTy(const int NumberOfDevices, std::vector<CUcontext> &CtxPtr)
+      : NumberOfDevices(NumberOfDevices), ContextsPtr(CtxPtr) {
+    StreamPool.resize(NumberOfDevices);
+    NextStreamId.resize(NumberOfDevices);
+    StreamMtx.resize(NumberOfDevices);
+
+    // Start with 32 streams per device. An override is clamped to at least 1:
+    // getStream() grows the pool by doubling, which cannot grow an empty pool.
+    int EnvNumInitialStreams = 32;
+    if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS"))
+      EnvNumInitialStreams = std::max(1, std::stoi(EnvStr));
+
+    // Size the stream pool of each device; the slots are filled with nullptr
+    for (std::vector<CUstream> &S : StreamPool)
+      S.resize(EnvNumInitialStreams);
+
+    // Initialize the next stream id
+    std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
+
+    // Initialize stream mutex
+    for (std::unique_ptr<std::mutex> &Ptr : StreamMtx)
+      Ptr = std::make_unique<std::mutex>();
+  }
+
+  ~StreamManagerTy() {
+    // Destroy every stream that was created, device by device
+    for (int I = 0; I < NumberOfDevices; ++I) {
+      CUresult err = cuCtxSetCurrent(ContextsPtr[I]);
+      checkResult(err, "Error when setting current CUDA context\n");
+
+      for (CUstream &S : StreamPool[I]) {
+        if (!S)
+          continue;
+        err = cuStreamDestroy(S);
+        checkResult(err, "Error when destroying CUDA stream\n");
+      }
+    }
+  }
+
+  // Get a CUstream from pool. Per-device next stream id always points to the
+  // next available CUstream. That means, CUstreams [0, id-1] have been
+  // assigned, and [id,] are still available. If there is no CUstream left, we
+  // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned,
+  // the id is increased by one.
+  // xxxxxs+++++++++
+  //      ^
+  //      id
+  // After assignment, the pool becomes the following and s is assigned.
+  // xxxxxs+++++++++
+  //       ^
+  //       id
+  CUstream getStream(const int DeviceId) {
+    assert(DeviceId >= 0 &&
+           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+           "Unexpected device id");
+
+    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
+    int &Id = NextStreamId[DeviceId];
+    // No CUstream left in the pool: double it by requesting more from CUDA RT
+    if (static_cast<size_t>(Id) == StreamPool[DeviceId].size())
+      resizeStreamPool(DeviceId, static_cast<size_t>(Id) * 2);
+    return StreamPool[DeviceId][Id++];
+  }
+
+  // Return a CUstream back to pool. As mentioned above, per-device next
+  // stream id always points to the next available CUstream, so when we return
+  // a CUstream, we need to first decrease the id, and then copy the CUstream
+  // back.
+  // It is worth noting that, the order of streams return might be different
+  // from that they're assigned, that is to say, at some point, there might be
+  // two identical CUstreams.
+  // xxax+a+++++
+  //     ^
+  //     id
+  // However, it doesn't matter, because they're always on the two sides of
+  // id. The left one will in the end be overwritten by another CUstream.
+  // Therefore, after several executions, the order of pool might be different
+  // from its initial state.
+  void returnStream(const int DeviceId, CUstream Stream) {
+    assert(DeviceId >= 0 &&
+           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+           "Unexpected device id");
+
+    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
+    int &Id = NextStreamId[DeviceId];
+    assert(Id > 0 && "Wrong stream ID");
+    StreamPool[DeviceId][--Id] = Stream;
+  }
+
+  void initializeDevice(int DeviceId) {
+    // Create the initial streams; call this after setting the right context
+    for (CUstream &Stream : StreamPool[DeviceId]) {
+      CUresult Err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING);
+      checkResult(Err, "Error when creating CUDA stream\n");
+    }
+  }
+};
+
 /// Class containing all the device information.
class RTLDeviceInfoTy { std::vector> FuncGblEntries; - std::vector> NextStreamId; + std::shared_ptr StreamManager; public: int NumberOfDevices; std::vector Modules; std::vector Contexts; - std::vector> Streams; // Device properties std::vector ThreadsPerBlock; @@ -112,17 +262,31 @@ // OpenMP Environment properties int EnvNumTeams; int EnvTeamLimit; - int EnvNumStreams; // OpenMP Requires Flags int64_t RequiresFlags; - //static int EnvNumThreads; - static const int HardTeamLimit = 1<<16; // 64k + // static int EnvNumThreads; + static const int HardTeamLimit = 1 << 16; // 64k static const int HardThreadLimit = 1024; static const int DefaultNumTeams = 128; static const int DefaultNumThreads = 128; + std::shared_ptr getStreamManager() { return StreamManager; } + + CUstream getStream(const int DeviceId) { + return StreamManager->getStream(DeviceId); + } + + void returnStream(const int DeviceId, __tgt_async_info *AsyncInfoPtr) { + assert(AsyncInfoPtr && "AsyncInfoPtr is nullptr"); + assert(AsyncInfoPtr->Queue && "AsyncInfoPtr->Queue is nullptr"); + + StreamManager->returnStream( + DeviceId, reinterpret_cast(AsyncInfoPtr->Queue)); + AsyncInfoPtr->Queue = nullptr; + } + // Record entry point associated with device void addOffloadEntry(int32_t device_id, __tgt_offload_entry entry) { assert(device_id < (int32_t)FuncGblEntries.size() && @@ -178,15 +342,6 @@ E.Table.EntriesBegin = E.Table.EntriesEnd = 0; } - // Get the next stream on a given device in a round robin manner - CUstream &getNextStream(const int DeviceId) { - assert(DeviceId >= 0 && - static_cast(DeviceId) < NextStreamId.size() && - "Unexpected device id!"); - const unsigned int Id = NextStreamId[DeviceId]->fetch_add(1); - return Streams[DeviceId][Id % EnvNumStreams]; - } - RTLDeviceInfoTy() { #ifdef OMPTARGET_DEBUG if (char *envStr = getenv("LIBOMPTARGET_DEBUG")) { @@ -219,8 +374,6 @@ FuncGblEntries.resize(NumberOfDevices); Contexts.resize(NumberOfDevices); - Streams.resize(NumberOfDevices); - 
NextStreamId.resize(NumberOfDevices); ThreadsPerBlock.resize(NumberOfDevices); BlocksPerGrid.resize(NumberOfDevices); WarpSize.resize(NumberOfDevices); @@ -245,28 +398,17 @@ EnvNumTeams = -1; } - // By default let's create 256 streams per device - EnvNumStreams = 256; - envStr = getenv("LIBOMPTARGET_NUM_STREAMS"); - if (envStr) { - EnvNumStreams = std::stoi(envStr); - } - - // Initialize streams for each device - for (std::vector &S : Streams) { - S.resize(EnvNumStreams); - } - - // Initialize the next stream id - for (std::unique_ptr &Ptr : NextStreamId) { - Ptr = std::make_unique(0); - } + StreamManager = + std::make_shared(NumberOfDevices, Contexts); // Default state. RequiresFlags = OMP_REQ_UNDEFINED; } ~RTLDeviceInfoTy() { + // First destruct stream manager in case of Contexts is destructed before it + StreamManager = nullptr; + // Close modules for (auto &module : Modules) if (module) { @@ -277,24 +419,6 @@ } } - // Destroy streams before contexts - for (int I = 0; I < NumberOfDevices; ++I) { - CUresult err = cuCtxSetCurrent(Contexts[I]); - if (err != CUDA_SUCCESS) { - DP("Error when setting current CUDA context\n"); - CUDA_ERR_STRING(err); - } - - for (auto &S : Streams[I]) - if (S) { - err = cuStreamDestroy(S); - if (err != CUDA_SUCCESS) { - DP("Error when destroying CUDA stream\n"); - CUDA_ERR_STRING(err); - } - } - } - // Destroy contexts for (auto &ctx : Contexts) if (ctx) { @@ -310,14 +434,13 @@ static RTLDeviceInfoTy DeviceInfo; namespace { -CUstream selectStream(int32_t Id, __tgt_async_info *AsyncInfo) { - if (!AsyncInfo) - return DeviceInfo.getNextStream(Id); +CUstream getStream(int32_t DeviceId, __tgt_async_info *AsyncInfoPtr) { + assert(AsyncInfoPtr && "AsyncInfoPtr is nullptr"); - if (!AsyncInfo->Queue) - AsyncInfo->Queue = DeviceInfo.getNextStream(Id); + if (!AsyncInfoPtr->Queue) + AsyncInfoPtr->Queue = DeviceInfo.getStream(DeviceId); - return reinterpret_cast(AsyncInfo->Queue); + return reinterpret_cast(AsyncInfoPtr->Queue); } int32_t 
dataRetrieve(int32_t DeviceId, void *HstPtr, void *TgtPtr, int64_t Size, @@ -331,7 +454,7 @@ return OFFLOAD_FAIL; } - CUstream Stream = selectStream(DeviceId, AsyncInfoPtr); + CUstream Stream = getStream(DeviceId, AsyncInfoPtr); err = cuMemcpyDtoHAsync(HstPtr, (CUdeviceptr)TgtPtr, Size, Stream); if (err != CUDA_SUCCESS) { @@ -356,7 +479,7 @@ return OFFLOAD_FAIL; } - CUstream Stream = selectStream(DeviceId, AsyncInfoPtr); + CUstream Stream = getStream(DeviceId, AsyncInfoPtr); err = cuMemcpyHtoDAsync((CUdeviceptr)TgtPtr, HstPtr, Size, Stream); if (err != CUDA_SUCCESS) { @@ -413,13 +536,8 @@ CUDA_ERR_STRING(err); } - for (CUstream &Stream : DeviceInfo.Streams[device_id]) { - err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING); - if (err != CUDA_SUCCESS) { - DP("Error when creating CUDA stream\n"); - CUDA_ERR_STRING(err); - } - } + // Initialize stream pool + DeviceInfo.getStreamManager()->initializeDevice(device_id); // Query attributes to determine number of threads/block and blocks/grid. int maxGridDimX; @@ -894,7 +1012,7 @@ DP("Launch kernel with %d blocks and %d threads\n", cudaBlocksPerGrid, cudaThreadsPerBlock); - CUstream Stream = selectStream(device_id, async_info); + CUstream Stream = getStream(device_id, async_info); err = cuLaunchKernel(KernelInfo->Func, cudaBlocksPerGrid, 1, 1, cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/, Stream, &args[0], 0); @@ -948,6 +1066,12 @@ CUDA_ERR_STRING(Err); return OFFLOAD_FAIL; } + + // Once the stream is synchronized, return it to stream pool and reset + // async_info. This is to make sure the synchronization only works for its own + // tasks. + DeviceInfo.returnStream(device_id, async_info); + return OFFLOAD_SUCCESS; }