diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -16,6 +16,7 @@
 #include <cuda.h>
 #include <list>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -92,13 +93,32 @@
 /// Class containing all the device information.
 class RTLDeviceInfoTy {
   std::vector<std::list<FuncOrGblEntryTy>> FuncGblEntries;
-  std::vector<std::unique_ptr<std::atomic_uint>> NextStreamId;
+  std::vector<std::unique_ptr<std::mutex>> StreamLock;
+  std::vector<int> NextStreamId;
+
+  // If there is no CUstream left in the pool, we will resize the pool to
+  // allocate more CUstream. This function should be called with device lock,
+  // and we do not resize to smaller one.
+  void resizeStreamPool(const int DeviceId, const size_t NewSize) {
+    std::vector<CUstream> &Pool = StreamPool[DeviceId];
+    const size_t CurrentSize = Pool.size();
+    assert(NewSize > CurrentSize && "new size is not larger than current size");
+    Pool.resize(NewSize);
+    CUresult err;
+    for (size_t I = CurrentSize; I < NewSize; ++I) {
+      err = cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING);
+      if (err != CUDA_SUCCESS) {
+        DP("Error when creating CUDA stream to resize stream pool\n");
+        CUDA_ERR_STRING(err);
+      }
+    }
+  }
 
 public:
   int NumberOfDevices;
   std::vector<CUmodule> Modules;
   std::vector<CUcontext> Contexts;
-  std::vector<std::vector<CUstream>> Streams;
+  std::vector<std::vector<CUstream>> StreamPool;
 
   // Device properties
   std::vector<int> ThreadsPerBlock;
@@ -112,7 +132,7 @@
   // OpenMP Environment properties
   int EnvNumTeams;
   int EnvTeamLimit;
-  int EnvNumStreams;
+  int EnvNumInitialStreams;
 
   // OpenMP Requires Flags
   int64_t RequiresFlags;
@@ -178,13 +198,32 @@
     E.Table.EntriesBegin = E.Table.EntriesEnd = 0;
   }
 
-  // Get the next stream on a given device in a round robin manner
-  CUstream &getNextStream(const int DeviceId) {
+  CUstream getStream(const int DeviceId) {
     assert(DeviceId >= 0 &&
            static_cast<size_t>(DeviceId) < NextStreamId.size() &&
-           "Unexpected device id!");
-    const unsigned int Id = NextStreamId[DeviceId]->fetch_add(1);
-    return Streams[DeviceId][Id % EnvNumStreams];
+           "Unexpected device id");
+    {
+      const std::lock_guard<std::mutex> Lock(*StreamLock[DeviceId]);
+      int &Id = NextStreamId[DeviceId];
+      // No CUstream left in the pool, we need to request from CUDA RT
+      if (Id == StreamPool[DeviceId].size()) {
+        // By default we double the stream pool every time
+        resizeStreamPool(DeviceId, Id * 2);
+      }
+      return StreamPool[DeviceId][Id++];
+    }
+  }
+
+  void returnStream(const int DeviceId, CUstream Stream) {
+    assert(DeviceId >= 0 &&
+           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+           "Unexpected device id");
+    {
+      const std::lock_guard<std::mutex> Lock(*StreamLock[DeviceId]);
+      int &Id = NextStreamId[DeviceId];
+      assert(Id > 0 && "Wrong stream ID");
+      StreamPool[DeviceId][--Id] = Stream;
+    }
   }
 
   RTLDeviceInfoTy() {
@@ -219,7 +258,7 @@
     FuncGblEntries.resize(NumberOfDevices);
     Contexts.resize(NumberOfDevices);
-    Streams.resize(NumberOfDevices);
+    StreamPool.resize(NumberOfDevices);
     NextStreamId.resize(NumberOfDevices);
     ThreadsPerBlock.resize(NumberOfDevices);
     BlocksPerGrid.resize(NumberOfDevices);
     WarpSize.resize(NumberOfDevices);
@@ -245,21 +284,24 @@
       EnvNumTeams = -1;
     }
 
-    // By default let's create 256 streams per device
-    EnvNumStreams = 256;
-    envStr = getenv("LIBOMPTARGET_NUM_STREAMS");
+    // Initially let's create 32 streams for each device
+    EnvNumInitialStreams = 32;
+    envStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS");
     if (envStr) {
-      EnvNumStreams = std::stoi(envStr);
+      EnvNumInitialStreams = std::stoi(envStr);
     }
 
-    // Initialize streams for each device
-    for (std::vector<CUstream> &S : Streams) {
-      S.resize(EnvNumStreams);
+    // Initialize the stream pool for each device
+    for (std::vector<CUstream> &S : StreamPool) {
+      S.resize(EnvNumInitialStreams);
     }
 
     // Initialize the next stream id
-    for (std::unique_ptr<std::atomic_uint> &Ptr : NextStreamId) {
-      Ptr = std::make_unique<std::atomic_uint>(0);
+    std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
+
+    // Initialize stream locks
+    for (std::unique_ptr<std::mutex> &Ptr : StreamLock) {
+      Ptr = std::make_unique<std::mutex>();
     }
 
     // Default state.
@@ -285,7 +327,7 @@
         CUDA_ERR_STRING(err);
       }
 
-      for (auto &S : Streams[I])
+      for (auto &S : StreamPool[I])
         if (S) {
           err = cuStreamDestroy(S);
           if (err != CUDA_SUCCESS) {
@@ -310,12 +352,12 @@
 static RTLDeviceInfoTy DeviceInfo;
 
 namespace {
-CUstream selectStream(int32_t Id, __tgt_async_info *AsyncInfo) {
+CUstream getStream(int32_t Id, __tgt_async_info *AsyncInfo) {
   if (!AsyncInfo)
-    return DeviceInfo.getNextStream(Id);
+    return DeviceInfo.getStream(Id);
 
   if (!AsyncInfo->Queue)
-    AsyncInfo->Queue = DeviceInfo.getNextStream(Id);
+    AsyncInfo->Queue = DeviceInfo.getStream(Id);
 
   return reinterpret_cast<CUstream>(AsyncInfo->Queue);
 }
@@ -363,7 +405,7 @@
     CUDA_ERR_STRING(err);
   }
 
-  for (CUstream &Stream : DeviceInfo.Streams[device_id]) {
+  for (CUstream &Stream : DeviceInfo.StreamPool[device_id]) {
     err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING);
     if (err != CUDA_SUCCESS) {
       DP("Error when creating CUDA stream\n");
@@ -685,7 +727,7 @@
     return OFFLOAD_FAIL;
   }
 
-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);
   err = cuMemcpyHtoDAsync((CUdeviceptr)tgt_ptr, hst_ptr, size, Stream);
 
   if (err != CUDA_SUCCESS) {
@@ -723,7 +765,7 @@
     return OFFLOAD_FAIL;
   }
 
-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);
  err = cuMemcpyDtoHAsync(hst_ptr, (CUdeviceptr)tgt_ptr, size, Stream);
 
   if (err != CUDA_SUCCESS) {
@@ -870,7 +912,7 @@
   DP("Launch kernel with %d blocks and %d threads\n", cudaBlocksPerGrid,
      cudaThreadsPerBlock);
 
-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);
   err = cuLaunchKernel(KernelInfo->Func, cudaBlocksPerGrid, 1, 1,
                        cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/,
                        Stream, &args[0], 0);
@@ -911,6 +953,11 @@
     CUDA_ERR_STRING(Err);
     return OFFLOAD_FAIL;
   }
+  // Once the stream is synchronized, return it to stream pool and reset
+  // async_info. This is to make sure the synchronization only works for its own
+  // tasks.
+  DeviceInfo.returnStream(device_id, Stream);
+  async_info->Queue = nullptr;
 
   return OFFLOAD_SUCCESS;
 }