Index: openmp/libomptarget/plugins/cuda/src/rtl.cpp
===================================================================
--- openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include <mutex>
 #include
 #include
@@ -91,14 +92,166 @@
 /// Class containing all the device information.
 class RTLDeviceInfoTy {
+  class StreamManagerTy {
+    int NumberOfDevices;
+    // Per-device stream mutex
+    std::vector<std::unique_ptr<std::mutex>> StreamMtx;
+    // Per-device stream id indicating the next available stream in the pool
+    std::vector<int> NextStreamId;
+    // Per-device stream pool
+    std::vector<std::vector<CUstream>> StreamPool;
+    // Pointer to the per-device contexts
+    std::vector<CUcontext> *ContextsPtr;
+
+    // If there is no CUstream left in the pool, we will resize the pool to
+    // allocate more CUstreams. This function should be called with the device
+    // mutex held, and we never resize the pool to a smaller size.
+    void resizeStreamPool(const int DeviceId, const size_t NewSize) {
+      std::vector<CUstream> &Pool = StreamPool[DeviceId];
+      const size_t CurrentSize = Pool.size();
+      assert(NewSize > CurrentSize &&
+             "new size is not larger than current size");
+
+      Pool.resize(NewSize, nullptr);
+
+      CUresult err = cuCtxSetCurrent((*ContextsPtr)[DeviceId]);
+      if (err != CUDA_SUCCESS) {
+        DP("Error when setting current CUDA context\n");
+        CUDA_ERR_STRING(err);
+        // We return if we cannot switch to the right context, to avoid
+        // creating a bunch of streams that do not correspond to the right
+        // device. The offloading will fail later because the selected
+        // CUstream is nullptr.
+        return;
+      }
+
+      for (size_t I = CurrentSize; I < NewSize; ++I) {
+        err = cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING);
+        if (err != CUDA_SUCCESS) {
+          DP("Error when creating CUDA stream to resize stream pool\n");
+          CUDA_ERR_STRING(err);
+        }
+      }
+    }
+
+  public:
+    // Shilei: I don't like putting this friend function here, but it seems
+    // like the best way to do it without refactoring the whole file.
+    friend int32_t __tgt_rtl_init_device(int32_t device_id);
+
+    StreamManagerTy(const int NumberOfDevices, std::vector<CUcontext> *CtxPtr)
+        : NumberOfDevices(NumberOfDevices), ContextsPtr(CtxPtr) {
+      StreamPool.resize(NumberOfDevices);
+      NextStreamId.resize(NumberOfDevices);
+      StreamMtx.resize(NumberOfDevices);
+
+      // Initially let's create 32 streams for each device
+      int EnvNumInitialStreams = 32;
+      char *envStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS");
+      if (envStr) {
+        EnvNumInitialStreams = std::stoi(envStr);
+      }
+
+      // Initialize the stream pool for each device
+      for (std::vector<CUstream> &S : StreamPool) {
+        S.resize(EnvNumInitialStreams);
+      }
+
+      // Initialize the next stream id
+      std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
+
+      // Initialize the stream mutexes
+      for (std::unique_ptr<std::mutex> &Ptr : StreamMtx) {
+        Ptr = std::make_unique<std::mutex>();
+      }
+    }
+
+    ~StreamManagerTy() {
+      // Destroy streams
+      for (int I = 0; I < NumberOfDevices; ++I) {
+        CUresult err = cuCtxSetCurrent((*ContextsPtr)[I]);
+        if (err != CUDA_SUCCESS) {
+          DP("Error when setting current CUDA context\n");
+          CUDA_ERR_STRING(err);
+        }
+
+        for (CUstream &S : StreamPool[I]) {
+          if (!S)
+            continue;
+          err = cuStreamDestroy(S);
+          if (err != CUDA_SUCCESS) {
+            DP("Error when destroying CUDA stream\n");
+            CUDA_ERR_STRING(err);
+          }
+        }
+      }
+    }
+
+    // Get a CUstream from the pool. The per-device next stream id always
+    // points to the next available CUstream. That means CUstreams [0, id-1]
+    // have been assigned, and [id, ...) are still available. If there is no
+    // CUstream left, we will ask CUDA RT for more.
+    // Each time a CUstream is assigned, the id increases by one.
+    // xxxxxs+++++++++
+    //      ^
+    //      id
+    // After assignment, the pool becomes the following, and s is assigned.
+    // xxxxxs+++++++++
+    //       ^
+    //       id
+    CUstream getStream(const int DeviceId) {
+      assert(DeviceId >= 0 &&
+             static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+             "Unexpected device id");
+
+      const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
+      int &Id = NextStreamId[DeviceId];
+      // No CUstream left in the pool; we need to request more from CUDA RT
+      if (Id == StreamPool[DeviceId].size()) {
+        // By default we double the stream pool every time
+        resizeStreamPool(DeviceId, Id * 2);
+      }
+      return StreamPool[DeviceId][Id++];
+    }
+
+    // Return a CUstream back to the pool. As mentioned above, the per-device
+    // next stream id always points to the next available CUstream, so when we
+    // return a CUstream, we first decrease the id and then copy the CUstream
+    // back.
+    // It is worth noting that the order in which streams are returned might
+    // differ from the order in which they were assigned; that is, at some
+    // point there might be two identical CUstreams in the pool.
+    // xxax+a+++++
+    //    ^
+    //    id
+    // However, this doesn't matter, because they are always on the two sides
+    // of id. The left one will eventually be overwritten by another CUstream.
+    // Therefore, after several executions, the order of the pool might differ
+    // from its initial state.
+    void returnStream(const int DeviceId, CUstream Stream) {
+      assert(DeviceId >= 0 &&
+             static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+             "Unexpected device id");
+      {
+        const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
+        int &Id = NextStreamId[DeviceId];
+        assert(Id > 0 && "Wrong stream ID");
+        StreamPool[DeviceId][--Id] = Stream;
+      }
+    }
+  };
+
   std::vector<std::list<FuncOrGblEntryTy>> FuncGblEntries;
-  std::vector<std::unique_ptr<std::atomic_uint>> NextStreamId;
+  std::shared_ptr<StreamManagerTy> StreamManager;

 public:
+  // Shilei: I don't like putting this friend function here, but it seems like
+  // the best way to do it without refactoring the whole file.
+  friend int32_t __tgt_rtl_init_device(int32_t device_id);
+
   int NumberOfDevices;
   std::vector<CUmodule> Modules;
   std::vector<CUcontext> Contexts;
-  std::vector<std::vector<CUstream>> Streams;

   // Device properties
   std::vector<int> ThreadsPerBlock;
@@ -112,7 +265,6 @@
   // OpenMP Environment properties
   int EnvNumTeams;
   int EnvTeamLimit;
-  int EnvNumStreams;

   // OpenMP Requires Flags
   int64_t RequiresFlags;
@@ -178,13 +330,12 @@
     E.Table.EntriesBegin = E.Table.EntriesEnd = 0;
   }

-  // Get the next stream on a given device in a round robin manner
-  CUstream &getNextStream(const int DeviceId) {
-    assert(DeviceId >= 0 &&
-           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
-           "Unexpected device id!");
-    const unsigned int Id = NextStreamId[DeviceId]->fetch_add(1);
-    return Streams[DeviceId][Id % EnvNumStreams];
+  CUstream getStream(const int DeviceId) {
+    return StreamManager->getStream(DeviceId);
+  }
+
+  void returnStream(const int DeviceId, CUstream Stream) {
+    StreamManager->returnStream(DeviceId, Stream);
   }

   RTLDeviceInfoTy() {
@@ -219,8 +370,6 @@
     FuncGblEntries.resize(NumberOfDevices);
     Contexts.resize(NumberOfDevices);
-    Streams.resize(NumberOfDevices);
-    NextStreamId.resize(NumberOfDevices);
     ThreadsPerBlock.resize(NumberOfDevices);
     BlocksPerGrid.resize(NumberOfDevices);
     WarpSize.resize(NumberOfDevices);
@@ -245,28 +394,17 @@
       EnvNumTeams = -1;
     }

-    // By default let's create 256 streams per device
-    EnvNumStreams = 256;
-    envStr = getenv("LIBOMPTARGET_NUM_STREAMS");
-    if (envStr) {
-      EnvNumStreams = std::stoi(envStr);
-    }
-
-    // Initialize streams for each device
-    for (std::vector<CUstream> &S : Streams) {
-      S.resize(EnvNumStreams);
-    }
-
-    // Initialize the next stream id
-    for (std::unique_ptr<std::atomic_uint> &Ptr : NextStreamId) {
-      Ptr = std::make_unique<std::atomic_uint>(0);
-    }
+    StreamManager =
+        std::make_shared<StreamManagerTy>(NumberOfDevices, &Contexts);

     // Default state.
     RequiresFlags = OMP_REQ_UNDEFINED;
   }

   ~RTLDeviceInfoTy() {
+    // Destroy the stream manager first, in case Contexts is destructed before it
+    StreamManager = nullptr;
+
     // Close modules
     for (auto &module : Modules)
       if (module) {
@@ -277,24 +415,6 @@
        }
      }

-    // Destroy streams before contexts
-    for (int I = 0; I < NumberOfDevices; ++I) {
-      CUresult err = cuCtxSetCurrent(Contexts[I]);
-      if (err != CUDA_SUCCESS) {
-        DP("Error when setting current CUDA context\n");
-        CUDA_ERR_STRING(err);
-      }
-
-      for (auto &S : Streams[I])
-        if (S) {
-          err = cuStreamDestroy(S);
-          if (err != CUDA_SUCCESS) {
-            DP("Error when destroying CUDA stream\n");
-            CUDA_ERR_STRING(err);
-          }
-        }
-    }
-
     // Destroy contexts
     for (auto &ctx : Contexts)
       if (ctx) {
@@ -310,12 +430,12 @@
 static RTLDeviceInfoTy DeviceInfo;

 namespace {
-CUstream selectStream(int32_t Id, __tgt_async_info *AsyncInfo) {
+CUstream getStream(int32_t Id, __tgt_async_info *AsyncInfo) {
   if (!AsyncInfo)
-    return DeviceInfo.getNextStream(Id);
+    return DeviceInfo.getStream(Id);

   if (!AsyncInfo->Queue)
-    AsyncInfo->Queue = DeviceInfo.getNextStream(Id);
+    AsyncInfo->Queue = DeviceInfo.getStream(Id);

   return reinterpret_cast<CUstream>(AsyncInfo->Queue);
 }
@@ -338,7 +458,6 @@
 }

 int32_t __tgt_rtl_init_device(int32_t device_id) {
-  CUdevice cuDevice;
   DP("Getting device %d\n", device_id);
   CUresult err = cuDeviceGet(&cuDevice, device_id);
@@ -363,7 +482,7 @@
     CUDA_ERR_STRING(err);
   }

-  for (CUstream &Stream : DeviceInfo.Streams[device_id]) {
+  for (CUstream &Stream : DeviceInfo.StreamManager->StreamPool[device_id]) {
     err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING);
     if (err != CUDA_SUCCESS) {
       DP("Error when creating CUDA stream\n");
@@ -685,7 +804,7 @@
     return OFFLOAD_FAIL;
   }

-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);

   err = cuMemcpyHtoDAsync((CUdeviceptr)tgt_ptr, hst_ptr, size, Stream);
   if (err != CUDA_SUCCESS) {
@@ -723,7 +842,7 @@
     return OFFLOAD_FAIL;
   }

-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);

   err = cuMemcpyDtoHAsync(hst_ptr, (CUdeviceptr)tgt_ptr, size, Stream);
   if (err != CUDA_SUCCESS) {
@@ -870,7 +989,7 @@
   DP("Launch kernel with %d blocks and %d threads\n", cudaBlocksPerGrid,
      cudaThreadsPerBlock);

-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);
   err = cuLaunchKernel(KernelInfo->Func, cudaBlocksPerGrid, 1, 1,
                        cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/,
                        Stream, &args[0], 0);
@@ -911,6 +1030,11 @@
     CUDA_ERR_STRING(Err);
     return OFFLOAD_FAIL;
   }
+  // Once the stream is synchronized, return it to the stream pool and reset
+  // async_info. This is to make sure the synchronization only applies to its
+  // own tasks.
+  DeviceInfo.returnStream(device_id, Stream);
+  async_info->Queue = nullptr;

   return OFFLOAD_SUCCESS;
 }
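
For reference, below is a minimal standalone sketch of the pool discipline this patch implements: grow on demand by doubling, hand out the next unused slot, and return streams LIFO under a mutex. It is not the patch's code; the names SimpleStreamPool and StreamT are illustrative stand-ins, and plain integer handles replace CUstream/cuStreamCreate so the sketch compiles and runs without CUDA.

#include <cassert>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <vector>

// Illustrative stand-in for CUstream; the real pool stores CUstream handles
// created with cuStreamCreate under the device's context.
using StreamT = int;

class SimpleStreamPool {
  std::mutex Mtx;
  std::vector<StreamT> Pool; // [0, NextId) assigned, [NextId, size()) available
  size_t NextId = 0;
  StreamT NextHandle = 0;

  // Grow the pool; the patch's resizeStreamPool calls cuStreamCreate here.
  void grow(size_t NewSize) {
    while (Pool.size() < NewSize)
      Pool.push_back(NextHandle++);
  }

public:
  explicit SimpleStreamPool(size_t InitialSize) { grow(InitialSize); }

  StreamT get() {
    std::lock_guard<std::mutex> Lock(Mtx);
    if (NextId == Pool.size())
      grow(Pool.size() * 2); // double on exhaustion, like resizeStreamPool
    return Pool[NextId++];
  }

  void put(StreamT S) {
    std::lock_guard<std::mutex> Lock(Mtx);
    assert(NextId > 0 && "returning to a full pool");
    Pool[--NextId] = S; // LIFO: the returned stream is handed out next
  }
};

int main() {
  SimpleStreamPool P(2);
  StreamT A = P.get(), B = P.get();
  StreamT C = P.get(); // pool exhausted, so it doubles to 4 entries
  P.put(B);
  P.put(A);
  std::cout << A << ' ' << B << ' ' << C << '\n'; // prints "0 1 2"
  return 0;
}

As in the patch, returns may interleave with assignments, so the pool's order drifts from its initial state over time; that is harmless because only slots left of the next-id cursor are ever overwritten.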