diff --git a/openmp/libomptarget/include/device.h b/openmp/libomptarget/include/device.h
--- a/openmp/libomptarget/include/device.h
+++ b/openmp/libomptarget/include/device.h
@@ -438,6 +438,12 @@
   /// OFFLOAD_SUCCESS/OFFLOAD_FAIL when succeeds/fails.
   int32_t synchronize(AsyncInfoTy &AsyncInfo);

+  /// Query for device/queue/event-based completion on \p AsyncInfo in a
+  /// non-blocking manner and return OFFLOAD_SUCCESS/OFFLOAD_FAIL when it
+  /// succeeds/fails. Must be called repeatedly until AsyncInfo is completed
+  /// and AsyncInfo.isDone() returns true.
+  int32_t queryAsync(AsyncInfoTy &AsyncInfo);
+
   /// Calls the corresponding print in the \p RTLDEVID
   /// device RTL to obtain the information of the specific device.
   bool printDeviceInfo(int32_t RTLDevID);
diff --git a/openmp/libomptarget/include/omptarget.h b/openmp/libomptarget/include/omptarget.h
--- a/openmp/libomptarget/include/omptarget.h
+++ b/openmp/libomptarget/include/omptarget.h
@@ -15,11 +15,15 @@
 #define _OMPTARGET_H_

 #include <cstdint>
+#include <deque>
 #include <stddef.h>
 #include <stdint.h>
+#include <functional>
 #include <type_traits>

+#include "llvm/ADT/SmallVector.h"
+
 #define OFFLOAD_SUCCESS (0)
 #define OFFLOAD_FAIL (~0)
@@ -181,15 +185,29 @@
 /// associated with a libomptarget layer device. RAII semantics to avoid
 /// mistakes.
 class AsyncInfoTy {
+public:
+  enum class SyncTy { BLOCKING, NON_BLOCKING };
+
+private:
   /// Locations we used in (potentially) asynchronous calls which should live
   /// as long as this AsyncInfoTy object.
   std::deque<void *> BufferLocations;

+  /// Post-processing operations executed after a successful synchronization.
+  /// \note The post-processing function should return OFFLOAD_SUCCESS or
+  /// OFFLOAD_FAIL appropriately.
+  using PostProcFuncTy = std::function<int()>;
+  llvm::SmallVector<PostProcFuncTy> PostProcessingFunctions;
+
   __tgt_async_info AsyncInfo;
   DeviceTy &Device;

 public:
-  AsyncInfoTy(DeviceTy &Device) : Device(Device) {}
+  /// Synchronization method to be used.
+  SyncTy SyncType;
+
+  AsyncInfoTy(DeviceTy &Device, SyncTy SyncType = SyncTy::BLOCKING)
+      : Device(Device), SyncType(SyncType) {}
   ~AsyncInfoTy() { synchronize(); }

   /// Implicit conversion to the __tgt_async_info which is used in the
@@ -198,12 +216,54 @@

   /// Synchronize all pending actions.
   ///
+  /// \note Synchronization will be performed in a blocking or non-blocking
+  /// manner, depending on the SyncType.
+  ///
+  /// \note If the operations are completed, the registered post-processing
+  /// functions will be executed once and unregistered afterwards.
+  ///
   /// \returns OFFLOAD_FAIL or OFFLOAD_SUCCESS appropriately.
   int synchronize();

   /// Return a void* reference with a lifetime that is at least as long as this
   /// AsyncInfoTy object. The location can be used as intermediate buffer.
   void *&getVoidPtrLocation();
+
+  /// Check if all asynchronous operations are completed.
+  ///
+  /// \note If the operations are completed, the registered post-processing
+  /// functions will be executed once and unregistered afterwards.
+  ///
+  /// \returns true if there are no pending asynchronous operations, false
+  /// otherwise.
+  bool isDone();
+
+  /// Add a new post-processing function to be executed after synchronization.
+  ///
+  /// \param[in] Function is a templated function (e.g., a function pointer,
+  /// lambda, or std::function) that is convertible to a PostProcFuncTy (i.e.,
+  /// it must have int() as its function signature).
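+  ///
+  /// For example, a cleanup action can be queued to run once the device
+  /// operations complete (an illustrative sketch, not part of the API):
+  /// \code
+  ///   AsyncInfo.addPostProcessingFunction([]() -> int {
+  ///     // Release buffers, restore host shadow pointers, etc.
+  ///     return OFFLOAD_SUCCESS;
+  ///   });
+  /// \endcode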
+  template <typename FuncTy> void addPostProcessingFunction(FuncTy &&Function) {
+    static_assert(std::is_convertible_v<FuncTy, PostProcFuncTy>,
+                  "Invalid post-processing function type. Please check "
+                  "function signature!");
+    PostProcessingFunctions.emplace_back(Function);
+  }
+
+private:
+  /// Run all the post-processing functions sequentially.
+  ///
+  /// \note After a successful execution, all previously registered functions
+  /// are unregistered.
+  ///
+  /// \returns OFFLOAD_FAIL if any post-processing function failed,
+  /// OFFLOAD_SUCCESS otherwise.
+  int32_t runPostProcessing();
+
+  /// Check if the internal asynchronous info queue is empty or not.
+  ///
+  /// \returns true if empty, false otherwise.
+  bool isQueueEmpty() const;
 };

 /// This struct is a record of non-contiguous information
@@ -347,6 +407,15 @@
                      void *DepList, int32_t NoAliasDepNum,
                      void *NoAliasDepList);

+// Non-blocking synchronization for target nowait regions. This function
+// acquires the asynchronous context from the task data of the current task
+// being executed and tries to query for the completion of its operations. If
+// the operations are still pending, the function returns immediately. If the
+// operations are completed, all the post-processing procedures stored in the
+// asynchronous context are executed and the context is removed from the task
+// data.
+void __tgt_target_nowait_query(void **AsyncHandle);
+
 void __tgt_set_info_flag(uint32_t);

 int __tgt_print_device_info(int64_t DeviceId);
diff --git a/openmp/libomptarget/include/omptargetplugin.h b/openmp/libomptarget/include/omptargetplugin.h
--- a/openmp/libomptarget/include/omptargetplugin.h
+++ b/openmp/libomptarget/include/omptargetplugin.h
@@ -156,6 +156,16 @@
 // error code.
 int32_t __tgt_rtl_synchronize(int32_t ID, __tgt_async_info *AsyncInfo);

+// Queries for the completion of asynchronous operations. Instead of blocking
+// the calling thread as __tgt_rtl_synchronize does, the progress of the
+// operations stored in AsyncInfo->Queue is queried in a non-blocking manner,
+// partially advancing their execution. If all operations are completed,
+// AsyncInfo->Queue is set to nullptr. If there are still pending operations,
+// AsyncInfo->Queue is kept as a valid queue. In any case of success (i.e., a
+// successful query with or without completing all operations), return zero.
+// Otherwise, return an error code.
+int32_t __tgt_rtl_query_async(int32_t ID, __tgt_async_info *AsyncInfo);
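+//
+// A libomptarget-side caller is expected to poll this entry point; a minimal
+// sketch (error handling elided) could look like:
+//
+//   do {
+//     if (__tgt_rtl_query_async(ID, AsyncInfo) != 0)
+//       break; // error
+//   } while (AsyncInfo->Queue); // Queue is nullptr once everything completed
+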
 // Set plugin's internal information flag externally.
 void __tgt_rtl_set_info_flag(uint32_t);
diff --git a/openmp/libomptarget/include/rtl.h b/openmp/libomptarget/include/rtl.h
--- a/openmp/libomptarget/include/rtl.h
+++ b/openmp/libomptarget/include/rtl.h
@@ -62,6 +62,7 @@
                                            __tgt_async_info *);
   typedef int64_t(init_requires_ty)(int64_t);
   typedef int32_t(synchronize_ty)(int32_t, __tgt_async_info *);
+  typedef int32_t(query_async_ty)(int32_t, __tgt_async_info *);
   typedef int32_t (*register_lib_ty)(__tgt_bin_desc *);
   typedef int32_t(supports_empty_images_ty)();
   typedef void(print_device_info_ty)(int32_t);
@@ -112,6 +113,7 @@
   run_team_region_async_ty *run_team_region_async = nullptr;
   init_requires_ty *init_requires = nullptr;
   synchronize_ty *synchronize = nullptr;
+  query_async_ty *query_async = nullptr;
   register_lib_ty register_lib = nullptr;
   register_lib_ty unregister_lib = nullptr;
   supports_empty_images_ty *supports_empty_images = nullptr;
diff --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
--- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
+++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
@@ -290,6 +290,11 @@
   Error synchronize(__tgt_async_info *AsyncInfo);
   virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo) = 0;

+  /// Query for the completion of the pending operations on the
+  /// __tgt_async_info structure in a non-blocking manner.
+  Error queryAsync(__tgt_async_info *AsyncInfo);
+  virtual Error queryAsyncImpl(__tgt_async_info &AsyncInfo) = 0;
+
   /// Allocate data on the device or involving the device.
   Expected<void *> dataAlloc(int64_t Size, void *HostPtr, TargetAllocTy Kind);
diff --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
--- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
+++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
@@ -354,6 +354,13 @@
   return synchronizeImpl(*AsyncInfo);
 }

+Error GenericDeviceTy::queryAsync(__tgt_async_info *AsyncInfo) {
+  if (!AsyncInfo || !AsyncInfo->Queue)
+    return Plugin::error("Invalid async info queue");
+
+  return queryAsyncImpl(*AsyncInfo);
+}
+
 Expected<void *> GenericDeviceTy::dataAlloc(int64_t Size, void *HostPtr,
                                             TargetAllocTy Kind) {
   void *Alloc = nullptr;
@@ -791,6 +798,16 @@
   return (bool)Err;
 }

+int32_t __tgt_rtl_query_async(int32_t DeviceId,
+                              __tgt_async_info *AsyncInfoPtr) {
+  auto Err = Plugin::get().getDevice(DeviceId).queryAsync(AsyncInfoPtr);
+  if (Err)
+    REPORT("Failure to query stream %p: %s\n", AsyncInfoPtr->Queue,
+           toString(std::move(Err)).data());
+
+  return (bool)Err;
+}
+
 int32_t __tgt_rtl_run_target_region(int32_t DeviceId, void *TgtEntryPtr,
                                     void **TgtArgs, ptrdiff_t *TgtOffsets,
                                     int32_t NumArgs) {
diff --git a/openmp/libomptarget/plugins-nextgen/cuda/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/cuda/src/rtl.cpp
--- a/openmp/libomptarget/plugins-nextgen/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins-nextgen/cuda/src/rtl.cpp
@@ -486,6 +486,24 @@
     return Plugin::check(Res, "Error in cuStreamSynchronize: %s");
   }

+  /// Query for the completion of the pending operations on the async info.
+  Error queryAsyncImpl(__tgt_async_info &AsyncInfo) override {
+    CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo.Queue);
+    CUresult Res = cuStreamQuery(Stream);
+
+    // Not-ready streams must be considered as successful operations.
+    if (Res == CUDA_ERROR_NOT_READY)
+      return Plugin::success();
+
+    // Once the stream is synchronized and the operations are completed (or an
+    // error occurs), return it to the stream pool and reset AsyncInfo. This
+    // makes sure the synchronization only works for its own tasks.
+    CUDAStreamManager.returnResource(Stream);
+    AsyncInfo.Queue = nullptr;
+
+    return Plugin::check(Res, "Error in cuStreamQuery: %s");
+  }
+
   /// Submit data to the device (host to device transfer).
   Error dataSubmitImpl(void *TgtPtr, const void *HstPtr, int64_t Size,
                        AsyncInfoWrapperTy &AsyncInfoWrapper) override {
diff --git a/openmp/libomptarget/plugins-nextgen/generic-elf-64bit/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/generic-elf-64bit/src/rtl.cpp
--- a/openmp/libomptarget/plugins-nextgen/generic-elf-64bit/src/rtl.cpp
+++ b/openmp/libomptarget/plugins-nextgen/generic-elf-64bit/src/rtl.cpp
@@ -245,6 +245,12 @@
     return Plugin::success();
   }

+  /// All functions are already synchronous. No need to do anything on this
+  /// query function.
+  Error queryAsyncImpl(__tgt_async_info &AsyncInfo) override {
+    return Plugin::success();
+  }
+
   /// This plugin does not support interoperability.
   Error initAsyncInfoImpl(AsyncInfoWrapperTy &AsyncInfoWrapper) override {
     return Plugin::error("initAsyncInfoImpl not supported");
diff --git a/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.h b/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.h
--- a/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.h
+++ b/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.h
@@ -31,6 +31,7 @@
   CUDA_ERROR_INVALID_VALUE = 1,
   CUDA_ERROR_NO_DEVICE = 100,
   CUDA_ERROR_INVALID_HANDLE = 400,
+  CUDA_ERROR_NOT_READY = 600,
   CUDA_ERROR_TOO_MANY_PEERS = 711,
 } CUresult;

@@ -244,6 +245,7 @@
 CUresult cuStreamCreate(CUstream *, unsigned);
 CUresult cuStreamDestroy(CUstream);
 CUresult cuStreamSynchronize(CUstream);
+CUresult cuStreamQuery(CUstream);
 CUresult cuCtxSetCurrent(CUcontext);
 CUresult cuDevicePrimaryCtxRelease(CUdevice);
 CUresult cuDevicePrimaryCtxGetState(CUdevice, unsigned *, int *);
diff --git a/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.cpp b/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.cpp
--- a/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.cpp
+++ b/openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.cpp
@@ -58,6 +58,7 @@
 DLWRAP(cuStreamCreate, 2)
 DLWRAP(cuStreamDestroy, 1)
 DLWRAP(cuStreamSynchronize, 1)
+DLWRAP(cuStreamQuery, 1)
 DLWRAP(cuCtxSetCurrent, 1)
 DLWRAP(cuDevicePrimaryCtxRelease, 1)
 DLWRAP(cuDevicePrimaryCtxGetState, 3)
diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -1267,6 +1267,29 @@
     return (Err == CUDA_SUCCESS) ? OFFLOAD_SUCCESS : OFFLOAD_FAIL;
   }

+  int queryAsync(const int DeviceId, __tgt_async_info *AsyncInfo) const {
+    CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo->Queue);
+    CUresult Err = cuStreamQuery(Stream);
+
+    // Not-ready streams must be considered as successful operations.
+    if (Err == CUDA_ERROR_NOT_READY)
+      return OFFLOAD_SUCCESS;
+
+    // Once the stream is synchronized or an error occurs, return it to the
+    // stream pool and reset AsyncInfo. This makes sure the synchronization
+    // only works for its own tasks.
+    StreamPool[DeviceId]->release(Stream);
+    AsyncInfo->Queue = nullptr;
+
+    if (Err != CUDA_SUCCESS) {
stream = " DPxMOD + ", async info ptr = " DPxMOD "\n", + DPxPTR(Stream), DPxPTR(AsyncInfo)); + CUDA_ERR_STRING(Err); + } + return (Err == CUDA_SUCCESS) ? OFFLOAD_SUCCESS : OFFLOAD_FAIL; + } + void printDeviceInfo(int32_t DeviceId) { char TmpChar[1000]; std::string TmpStr; @@ -1780,6 +1803,15 @@ return DeviceRTL.synchronize(DeviceId, AsyncInfoPtr); } +int32_t __tgt_rtl_query_async(int32_t DeviceId, + __tgt_async_info *AsyncInfoPtr) { + assert(DeviceRTL.isValidDeviceId(DeviceId) && "device_id is invalid"); + assert(AsyncInfoPtr && "async_info_ptr is nullptr"); + assert(AsyncInfoPtr->Queue && "async_info_ptr->Queue is nullptr"); + // NOTE: We don't need to set context for stream query. + return DeviceRTL.queryAsync(DeviceId, AsyncInfoPtr); +} + void __tgt_rtl_set_info_flag(uint32_t NewInfoLevel) { std::atomic &InfoLevel = getInfoLevelInternal(); InfoLevel.store(NewInfoLevel); diff --git a/openmp/libomptarget/src/device.cpp b/openmp/libomptarget/src/device.cpp --- a/openmp/libomptarget/src/device.cpp +++ b/openmp/libomptarget/src/device.cpp @@ -641,6 +641,13 @@ return OFFLOAD_SUCCESS; } +int32_t DeviceTy::queryAsync(AsyncInfoTy &AsyncInfo) { + if (RTL->query_async) + return RTL->query_async(RTLDeviceID, AsyncInfo); + + return synchronize(AsyncInfo); +} + int32_t DeviceTy::createEvent(void **Event) { if (RTL->create_event) return RTL->create_event(RTLDeviceID, Event); diff --git a/openmp/libomptarget/src/exports b/openmp/libomptarget/src/exports --- a/openmp/libomptarget/src/exports +++ b/openmp/libomptarget/src/exports @@ -26,6 +26,7 @@ __tgt_target_teams_nowait_mapper; __tgt_target_kernel; __tgt_target_kernel_nowait; + __tgt_target_nowait_query; __tgt_mapper_num_components; __tgt_push_mapper_component; __kmpc_push_target_tripcount; @@ -60,4 +61,3 @@ local: *; }; - diff --git a/openmp/libomptarget/src/interface.cpp b/openmp/libomptarget/src/interface.cpp --- a/openmp/libomptarget/src/interface.cpp +++ b/openmp/libomptarget/src/interface.cpp @@ -16,10 +16,13 @@ #include "private.h" #include "rtl.h" +#include "Utilities.h" + #include #include #include #include +#include //////////////////////////////////////////////////////////////////////////////// /// adds requires flags @@ -61,28 +64,29 @@ } } -/// creates host-to-target data mapping, stores it in the -/// libomptarget.so internal structure (an entry in a stack of data maps) -/// and passes the data to the device. 
-EXTERN void __tgt_target_data_begin_mapper(ident_t *Loc, int64_t DeviceId,
-                                           int32_t ArgNum, void **ArgsBase,
-                                           void **Args, int64_t *ArgSizes,
-                                           int64_t *ArgTypes,
-                                           map_var_info_t *ArgNames,
-                                           void **ArgMappers) {
+template <typename TargetAsyncInfoTy>
+static inline void
+targetDataMapper(ident_t *Loc, int64_t DeviceId, int32_t ArgNum,
+                 void **ArgsBase, void **Args, int64_t *ArgSizes,
+                 int64_t *ArgTypes, map_var_info_t *ArgNames, void **ArgMappers,
+                 TargetDataFuncPtrTy TargetDataFunction,
+                 const char *RegionTypeMsg, const char *RegionName) {
+  static_assert(std::is_convertible_v<TargetAsyncInfoTy &, AsyncInfoTy &>,
+                "TargetAsyncInfoTy must be convertible to AsyncInfoTy.");
+
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering data begin region for device %" PRId64 " with %d mappings\n",
-     DeviceId, ArgNum);
+
+  DP("Entering data %s region for device %" PRId64 " with %d mappings\n",
+     RegionName, DeviceId, ArgNum);
+
   if (checkDeviceAndCtors(DeviceId, Loc)) {
     DP("Not offloading to device %" PRId64 "\n", DeviceId);
     return;
   }

-  DeviceTy &Device = *PM->Devices[DeviceId];
-
   if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
     printKernelArguments(Loc, DeviceId, ArgNum, ArgSizes, ArgTypes, ArgNames,
-                         "Entering OpenMP data region");
+                         RegionTypeMsg);
 #ifdef OMPTARGET_DEBUG
   for (int I = 0; I < ArgNum; ++I) {
     DP("Entry %2d: Base=" DPxMOD ", Begin=" DPxMOD ", Size=%" PRId64
@@ -92,23 +96,45 @@
   }
 #endif

-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = targetDataBegin(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
-                           ArgTypes, ArgNames, ArgMappers, AsyncInfo);
+  DeviceTy &Device = *PM->Devices[DeviceId];
+  TargetAsyncInfoTy TargetAsyncInfo(Device);
+  AsyncInfoTy &AsyncInfo = TargetAsyncInfo;
+
+  int Rc = OFFLOAD_SUCCESS;
+  Rc = TargetDataFunction(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
+                          ArgTypes, ArgNames, ArgMappers, AsyncInfo,
+                          false /* FromMapper */);
+
   if (Rc == OFFLOAD_SUCCESS)
     Rc = AsyncInfo.synchronize();
+
   handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
 }

+/// creates host-to-target data mapping, stores it in the
+/// libomptarget.so internal structure (an entry in a stack of data maps)
+/// and passes the data to the device.
+EXTERN void __tgt_target_data_begin_mapper(ident_t *Loc, int64_t DeviceId,
+                                           int32_t ArgNum, void **ArgsBase,
+                                           void **Args, int64_t *ArgSizes,
+                                           int64_t *ArgTypes,
+                                           map_var_info_t *ArgNames,
+                                           void **ArgMappers) {
+  TIMESCOPE_WITH_IDENT(Loc);
+  targetDataMapper<AsyncInfoTy>(Loc, DeviceId, ArgNum, ArgsBase, Args,
+                                ArgSizes, ArgTypes, ArgNames, ArgMappers,
+                                targetDataBegin, "Entering OpenMP data region",
+                                "begin");
+}
+
 EXTERN void __tgt_target_data_begin_nowait_mapper(
     ident_t *Loc, int64_t DeviceId, int32_t ArgNum, void **ArgsBase,
     void **Args, int64_t *ArgSizes, int64_t *ArgTypes, map_var_info_t *ArgNames,
     void **ArgMappers, int32_t DepNum, void *DepList, int32_t NoAliasDepNum,
     void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  __tgt_target_data_begin_mapper(Loc, DeviceId, ArgNum, ArgsBase, Args,
-                                 ArgSizes, ArgTypes, ArgNames, ArgMappers);
+  targetDataMapper<TaskAsyncInfoWrapperTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataBegin, "Entering OpenMP data region", "begin");
 }

 /// passes data from the target, releases target memory and destroys
@@ -121,32 +147,9 @@
                                          map_var_info_t *ArgNames,
                                          void **ArgMappers) {
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering data end region with %d mappings\n", ArgNum);
-  if (checkDeviceAndCtors(DeviceId, Loc)) {
-    DP("Not offloading to device %" PRId64 "\n", DeviceId);
-    return;
-  }
-
-  DeviceTy &Device = *PM->Devices[DeviceId];
-
-  if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
-    printKernelArguments(Loc, DeviceId, ArgNum, ArgSizes, ArgTypes, ArgNames,
-                         "Exiting OpenMP data region");
-#ifdef OMPTARGET_DEBUG
-  for (int I = 0; I < ArgNum; ++I) {
-    DP("Entry %2d: Base=" DPxMOD ", Begin=" DPxMOD ", Size=%" PRId64
-       ", Type=0x%" PRIx64 ", Name=%s\n",
-       I, DPxPTR(ArgsBase[I]), DPxPTR(Args[I]), ArgSizes[I], ArgTypes[I],
-       (ArgNames) ? getNameFromMapping(ArgNames[I]).c_str() : "unknown");
-  }
-#endif
-
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = targetDataEnd(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
-                         ArgTypes, ArgNames, ArgMappers, AsyncInfo);
-  if (Rc == OFFLOAD_SUCCESS)
-    Rc = AsyncInfo.synchronize();
-  handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
+  targetDataMapper<AsyncInfoTy>(Loc, DeviceId, ArgNum, ArgsBase, Args,
+                                ArgSizes, ArgTypes, ArgNames, ArgMappers,
+                                targetDataEnd, "Exiting OpenMP data region",
+                                "end");
 }

 EXTERN void __tgt_target_data_end_nowait_mapper(
@@ -155,9 +158,9 @@
     void **ArgMappers, int32_t DepNum, void *DepList, int32_t NoAliasDepNum,
     void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  __tgt_target_data_end_mapper(Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes,
-                               ArgTypes, ArgNames, ArgMappers);
+  targetDataMapper<TaskAsyncInfoWrapperTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataEnd, "Exiting OpenMP data region", "end");
 }

 EXTERN void __tgt_target_data_update_mapper(ident_t *Loc, int64_t DeviceId,
@@ -167,23 +170,9 @@
                                             map_var_info_t *ArgNames,
                                             void **ArgMappers) {
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering data update with %d mappings\n", ArgNum);
-  if (checkDeviceAndCtors(DeviceId, Loc)) {
-    DP("Not offloading to device %" PRId64 "\n", DeviceId);
-    return;
-  }
-
-  if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
-    printKernelArguments(Loc, DeviceId, ArgNum, ArgSizes, ArgTypes, ArgNames,
-                         "Updating OpenMP data");
-
-  DeviceTy &Device = *PM->Devices[DeviceId];
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = targetDataUpdate(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
-                            ArgTypes, ArgNames, ArgMappers, AsyncInfo);
-  if (Rc == OFFLOAD_SUCCESS)
-    Rc = AsyncInfo.synchronize();
-  handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
+  targetDataMapper<AsyncInfoTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataUpdate, "Updating OpenMP data", "update");
 }

 EXTERN void __tgt_target_data_update_nowait_mapper(
@@ -192,37 +181,33 @@
     void **ArgMappers, int32_t DepNum, void *DepList, int32_t NoAliasDepNum,
     void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  __tgt_target_data_update_mapper(Loc, DeviceId, ArgNum, ArgsBase, Args,
-                                  ArgSizes, ArgTypes, ArgNames, ArgMappers);
+  targetDataMapper<TaskAsyncInfoWrapperTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataUpdate, "Updating OpenMP data", "update");
 }

-/// Implements a kernel entry that executes the target region on the specified
-/// device.
-///
-/// \param Loc Source location associated with this target region.
-/// \param DeviceId The device to execute this region, -1 indicated the default.
-/// \param NumTeams Number of teams to launch the region with, -1 indicates a
-/// non-teams region and 0 indicates it was unspecified.
-/// \param ThreadLimit Limit to the number of threads to use in the kernel
-/// launch, 0 indicates it was unspecified.
-/// \param HostPtr The pointer to the host function registered with the kernel.
-/// \param Args All arguments to this kernel launch (see struct definition).
-EXTERN int __tgt_target_kernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
+template <typename TargetAsyncInfoTy>
+static inline int targetKernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
                                int32_t ThreadLimit, void *HostPtr,
                                __tgt_kernel_arguments *Args) {
+  static_assert(std::is_convertible_v<TargetAsyncInfoTy &, AsyncInfoTy &>,
+                "TargetAsyncInfoTy must be convertible to AsyncInfoTy.");
+
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering target region with entry point " DPxMOD " and device Id %" PRId64
+
+  DP("Entering target region for device %" PRId64 " with entry point " DPxMOD
      "\n",
-     DPxPTR(HostPtr), DeviceId);
-  if (Args->Version != 1) {
-    DP("Unexpected ABI version: %d\n", Args->Version);
-  }
+     DeviceId, DPxPTR(HostPtr));
+
   if (checkDeviceAndCtors(DeviceId, Loc)) {
     DP("Not offloading to device %" PRId64 "\n", DeviceId);
     return OMP_TGT_FAIL;
   }

+  if (Args->Version != 1) {
+    DP("Unexpected ABI version: %d\n", Args->Version);
+  }
+
   if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
     printKernelArguments(Loc, DeviceId, Args->NumArgs, Args->ArgSizes,
                          Args->ArgTypes, Args->ArgNames,
@@ -243,26 +228,50 @@
     NumTeams = 0;

   DeviceTy &Device = *PM->Devices[DeviceId];
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = target(Loc, Device, HostPtr, Args->NumArgs, Args->ArgBasePtrs,
-                  Args->ArgPtrs, Args->ArgSizes, Args->ArgTypes, Args->ArgNames,
-                  Args->ArgMappers, NumTeams, ThreadLimit, Args->Tripcount,
-                  IsTeams, AsyncInfo);
+  TargetAsyncInfoTy TargetAsyncInfo(Device);
+  AsyncInfoTy &AsyncInfo = TargetAsyncInfo;
+
+  int Rc = OFFLOAD_SUCCESS;
+  Rc = target(Loc, Device, HostPtr, Args->NumArgs, Args->ArgBasePtrs,
+              Args->ArgPtrs, Args->ArgSizes, Args->ArgTypes, Args->ArgNames,
+              Args->ArgMappers, NumTeams, ThreadLimit, Args->Tripcount, IsTeams,
+              AsyncInfo);
+
   if (Rc == OFFLOAD_SUCCESS)
     Rc = AsyncInfo.synchronize();
+
   handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
   assert(Rc == OFFLOAD_SUCCESS && "__tgt_target_kernel unexpected failure!");
+
   return OMP_TGT_SUCCESS;
 }

+/// Implements a kernel entry that executes the target region on the specified
+/// device.
+///
+/// \param Loc Source location associated with this target region.
+/// \param DeviceId The device to execute this region, -1 indicates the default.
+/// \param NumTeams Number of teams to launch the region with, -1 indicates a
+/// non-teams region and 0 indicates it was unspecified.
+/// \param ThreadLimit Limit to the number of threads to use in the kernel
+/// launch, 0 indicates it was unspecified.
+/// \param HostPtr The pointer to the host function registered with the kernel.
+/// \param Args All arguments to this kernel launch (see struct definition).
+EXTERN int __tgt_target_kernel(ident_t *Loc, int64_t DeviceId,
+                               int32_t NumTeams, int32_t ThreadLimit,
+                               void *HostPtr, __tgt_kernel_arguments *Args) {
+  TIMESCOPE_WITH_IDENT(Loc);
+  return targetKernel<AsyncInfoTy>(Loc, DeviceId, NumTeams, ThreadLimit,
+                                   HostPtr, Args);
+}
+
 EXTERN int __tgt_target_kernel_nowait(
     ident_t *Loc, int64_t DeviceId, int32_t NumTeams, int32_t ThreadLimit,
     void *HostPtr, __tgt_kernel_arguments *Args, int32_t DepNum, void *DepList,
     int32_t NoAliasDepNum, void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  return __tgt_target_kernel(Loc, DeviceId, NumTeams, ThreadLimit, HostPtr,
-                             Args);
+  return targetKernel<TaskAsyncInfoWrapperTy>(Loc, DeviceId, NumTeams,
+                                              ThreadLimit, HostPtr, Args);
 }

 // Get the current number of components for a user-defined mapper.
@@ -303,3 +312,43 @@
   return PM->Devices[DeviceId]->printDeviceInfo(
       PM->Devices[DeviceId]->RTLDeviceID);
 }
+
+EXTERN void __tgt_target_nowait_query(void **AsyncHandle) {
+  if (!AsyncHandle || !*AsyncHandle) {
+    FATAL_MESSAGE0(
+        1, "Received an invalid async handle from the current OpenMP task. Is "
+           "this a target nowait region?\n");
+  }
+
+  // Exponential backoff tries to optimally decide if a thread should just
+  // query for the device operations (work/spin wait on them) or block until
+  // they are completed (use the device-side blocking mechanism). This allows
+  // the runtime to adapt itself when there are a lot of long-running target
+  // regions in flight.
+  using namespace llvm::omp::target;
+  static thread_local ExponentialBackoff QueryCounter(
+      Int64Envar("OMPTARGET_QUERY_COUNT_MAX", 10),
+      Int64Envar("OMPTARGET_QUERY_COUNT_THRESHOLD", 5),
+      Envar<float>("OMPTARGET_QUERY_COUNT_BACKOFF_FACTOR", 0.5f));
+
+  auto *AsyncInfo = (AsyncInfoTy *)*AsyncHandle;
+
+  // If the thread is actively waiting on too many target nowait regions, we
+  // should use the blocking sync type.
+  if (QueryCounter.isAboveThreshold())
+    AsyncInfo->SyncType = AsyncInfoTy::SyncTy::BLOCKING;
+
+  // If there are device operations still pending, return immediately without
+  // deallocating the handle and increase the current thread's query count.
+  if (!AsyncInfo->isDone()) {
+    QueryCounter.increment();
+    return;
+  }
+
+  // When a thread successfully completes a target nowait region, we
+  // exponentially back off its query counter by the backoff factor.
+  QueryCounter.decrement();
+
+  // Delete the handle and unset it from the OpenMP task data.
+  delete AsyncInfo;
+  *AsyncHandle = nullptr;
+}
diff --git a/openmp/libomptarget/src/omptarget.cpp b/openmp/libomptarget/src/omptarget.cpp
--- a/openmp/libomptarget/src/omptarget.cpp
+++ b/openmp/libomptarget/src/omptarget.cpp
@@ -24,13 +24,25 @@

 int AsyncInfoTy::synchronize() {
   int Result = OFFLOAD_SUCCESS;
-  if (AsyncInfo.Queue) {
-    // If we have a queue we need to synchronize it now.
-    Result = Device.synchronize(*this);
-    assert(AsyncInfo.Queue == nullptr &&
-           "The device plugin should have nulled the queue to indicate there "
-           "are no outstanding actions!");
+  if (!isQueueEmpty()) {
+    switch (SyncType) {
+    case SyncTy::BLOCKING:
+      // If we have a queue we need to synchronize it now.
+      Result = Device.synchronize(*this);
+      assert(AsyncInfo.Queue == nullptr &&
+             "The device plugin should have nulled the queue to indicate there "
+             "are no outstanding actions!");
+      break;
+    case SyncTy::NON_BLOCKING:
+      Result = Device.queryAsync(*this);
+      break;
+    }
   }
+
+  // Run any pending post-processing functions registered on this async object.
+  if (Result == OFFLOAD_SUCCESS && isQueueEmpty())
+    Result = runPostProcessing();
+
   return Result;
 }

@@ -39,6 +51,30 @@
   return BufferLocations.back();
 }

+bool AsyncInfoTy::isDone() {
+  synchronize();
+  // The async info operations are completed when the internal queue is empty.
+  return isQueueEmpty();
+}
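+
+// Note: non-blocking users are expected to poll isDone() until it returns
+// true, e.g. (sketch): while (!AsyncInfo->isDone()) { /* do other work */ }
+// Each poll also runs the registered post-processing functions once the
+// internal queue has drained.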
+
+int32_t AsyncInfoTy::runPostProcessing() {
+  size_t Size = PostProcessingFunctions.size();
+  for (size_t I = 0; I < Size; ++I) {
+    const int Result = PostProcessingFunctions[I]();
+    if (Result != OFFLOAD_SUCCESS)
+      return Result;
+  }
+
+  // Clear the vector up until the last known function, since post-processing
+  // procedures might add new procedures themselves.
+  const auto PrevBegin = PostProcessingFunctions.begin();
+  PostProcessingFunctions.erase(PrevBegin, PrevBegin + Size);
+
+  return OFFLOAD_SUCCESS;
+}
+
+bool AsyncInfoTy::isQueueEmpty() const { return AsyncInfo.Queue == nullptr; }
+
 /* All begin addresses for partially mapped structs must be 8-aligned in order
  * to ensure proper alignment of members. E.g.
  *
@@ -696,12 +732,89 @@

 } // namespace

+/// Applies the necessary post-processing procedures to entries listed in \p
+/// EntriesInfo after the execution of all device-side operations from a target
+/// data end. This includes the update of pointers at the host and the removal
+/// of device buffers when needed. It returns OFFLOAD_FAIL or OFFLOAD_SUCCESS
+/// depending on the success of the operations.
+static int
+postProcessingTargetDataEnd(DeviceTy *Device,
+                            SmallVector<PostProcessingInfo> EntriesInfo,
+                            void *FromMapperBase) {
+  int Ret = OFFLOAD_SUCCESS;
+
+  for (PostProcessingInfo &Info : EntriesInfo) {
+    // If we marked the entry to be deleted we need to verify no other
+    // thread reused it by now. If deletion is still supposed to happen by
+    // this thread LR will be set and exclusive access to the HDTT map
+    // will avoid another thread reusing the entry now. Note that we do
+    // not request (exclusive) access to the HDTT map if Info.DelEntry is
+    // not set.
+    LookupResult LR;
+    DeviceTy::HDTTMapAccessorTy HDTTMap =
+        Device->HostDataToTargetMap.getExclusiveAccessor(!Info.DelEntry);
+
+    if (Info.DelEntry) {
+      LR = Device->lookupMapping(HDTTMap, Info.HstPtrBegin, Info.DataSize);
+      if (LR.Entry->getTotalRefCount() != 0 ||
+          LR.Entry->getDeleteThreadId() != std::this_thread::get_id()) {
+        // The thread is not in charge of deletion anymore. Give up access
+        // to the HDTT map and unset the deletion flag.
+        HDTTMap.destroy();
+        Info.DelEntry = false;
+      }
+    }
+
+    // If we copied back to the host a struct/array containing pointers,
+    // we need to restore the original host pointer values from their
+    // shadow copies. If the struct is going to be deallocated, remove any
+    // remaining shadow pointer entries for this struct.
+    auto CB = [&](ShadowPtrListTy::iterator &Itr) {
+      // If we copied the struct to the host, we need to restore the
+      // pointer.
+      if (Info.ArgType & OMP_TGT_MAPTYPE_FROM) {
+        void **ShadowHstPtrAddr = (void **)Itr->first;
+        *ShadowHstPtrAddr = Itr->second.HstPtrVal;
+        DP("Restoring original host pointer value " DPxMOD " for host "
+           "pointer " DPxMOD "\n",
+           DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
+      }
+      // If the struct is to be deallocated, remove the shadow entry.
+      if (Info.DelEntry) {
+        DP("Removing shadow pointer " DPxMOD "\n", DPxPTR((void **)Itr->first));
+        auto OldItr = Itr;
+        Itr++;
+        Device->ShadowPtrMap.erase(OldItr);
+      } else {
+        ++Itr;
+      }
+      return OFFLOAD_SUCCESS;
+    };
+    applyToShadowMapEntries(*Device, CB, Info.HstPtrBegin, Info.DataSize,
+                            Info.TPR);
+
+    // If we are deleting the entry the DataMapMtx is locked and we own
+    // the entry.
+    if (Info.DelEntry) {
+      if (!FromMapperBase || FromMapperBase != Info.HstPtrBegin)
+        Ret = Device->deallocTgtPtr(HDTTMap, LR, Info.DataSize);

+      if (Ret != OFFLOAD_SUCCESS) {
+        REPORT("Deallocating data from device failed.\n");
+        break;
+      }
+    }
+  }
+
+  return Ret;
+}
+
 /// Internal function to undo the mapping and retrieve the data from the
 /// device.
 int targetDataEnd(ident_t *Loc, DeviceTy &Device, int32_t ArgNum,
                   void **ArgBases, void **Args, int64_t *ArgSizes,
                   int64_t *ArgTypes, map_var_info_t *ArgNames,
                   void **ArgMappers, AsyncInfoTy &AsyncInfo, bool FromMapper) {
-  int Ret;
+  int Ret = OFFLOAD_SUCCESS;
   SmallVector<PostProcessingInfo> PostProcessingPtrs;
   void *FromMapperBase = nullptr;
   // process each input.
@@ -861,75 +974,15 @@
     }
   }

-  // TODO: We should not synchronize here but pass the AsyncInfo object to the
-  // allocate/deallocate device APIs.
-  //
-  // We need to synchronize before deallocating data.
-  Ret = AsyncInfo.synchronize();
-  if (Ret != OFFLOAD_SUCCESS)
-    return OFFLOAD_FAIL;
-
-  // Deallocate target pointer
-  for (PostProcessingInfo &Info : PostProcessingPtrs) {
-    // If we marked the entry to be deleted we need to verify no other thread
-    // reused it by now. If deletion is still supposed to happen by this thread
-    // LR will be set and exclusive access to the HDTT map will avoid another
-    // thread reusing the entry now. Note that we do not request (exclusive)
-    // access to the HDTT map if Info.DelEntry is not set.
-    LookupResult LR;
-    DeviceTy::HDTTMapAccessorTy HDTTMap =
-        Device.HostDataToTargetMap.getExclusiveAccessor(!Info.DelEntry);
-
-    if (Info.DelEntry) {
-      LR = Device.lookupMapping(HDTTMap, Info.HstPtrBegin, Info.DataSize);
-      if (LR.Entry->getTotalRefCount() != 0 ||
-          LR.Entry->getDeleteThreadId() != std::this_thread::get_id()) {
-        // The thread is not in charge of deletion anymore. Give up access to
-        // the HDTT map and unset the deletion flag.
-        HDTTMap.destroy();
-        Info.DelEntry = false;
-      }
-    }
-
-    // If we copied back to the host a struct/array containing pointers, we
-    // need to restore the original host pointer values from their shadow
-    // copies. If the struct is going to be deallocated, remove any remaining
-    // shadow pointer entries for this struct.
-    auto CB = [&](ShadowPtrListTy::iterator &Itr) {
-      // If we copied the struct to the host, we need to restore the pointer.
-      if (Info.ArgType & OMP_TGT_MAPTYPE_FROM) {
-        void **ShadowHstPtrAddr = (void **)Itr->first;
-        *ShadowHstPtrAddr = Itr->second.HstPtrVal;
-        DP("Restoring original host pointer value " DPxMOD " for host "
-           "pointer " DPxMOD "\n",
-           DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
-      }
-      // If the struct is to be deallocated, remove the shadow entry.
-      if (Info.DelEntry) {
-        DP("Removing shadow pointer " DPxMOD "\n", DPxPTR((void **)Itr->first));
-        auto OldItr = Itr;
-        Itr++;
-        Device.ShadowPtrMap.erase(OldItr);
-      } else {
-        ++Itr;
-      }
-      return OFFLOAD_SUCCESS;
-    };
-    applyToShadowMapEntries(Device, CB, Info.HstPtrBegin, Info.DataSize,
-                            Info.TPR);
-
-    // If we are deleting the entry the DataMapMtx is locked and we own the
-    // entry.
-    if (Info.DelEntry) {
-      if (!FromMapperBase || FromMapperBase != Info.HstPtrBegin)
-        Ret = Device.deallocTgtPtr(HDTTMap, LR, Info.DataSize);
-
-      if (Ret != OFFLOAD_SUCCESS) {
-        REPORT("Deallocating data from device failed.\n");
-        break;
-      }
-    }
-  }
+  // Add post-processing functions.
+  // TODO: We might want to remove `mutable` in the future by not changing the
+  // captured variables somehow.
+  AsyncInfo.addPostProcessingFunction(
+      [=, Device = &Device,
+       PostProcessingPtrs = std::move(PostProcessingPtrs)]() mutable -> int {
+        return postProcessingTargetDataEnd(Device, PostProcessingPtrs,
+                                           FromMapperBase);
+      });

   return Ret;
 }
@@ -969,20 +1022,22 @@
       return OFFLOAD_FAIL;
     }

-    auto CB = [&](ShadowPtrListTy::iterator &Itr) {
-      void **ShadowHstPtrAddr = (void **)Itr->first;
-      // Wait for device-to-host memcopies for whole struct to complete,
-      // before restoring the correct host pointer.
-      if (AsyncInfo.synchronize() != OFFLOAD_SUCCESS)
-        return OFFLOAD_FAIL;
-      *ShadowHstPtrAddr = Itr->second.HstPtrVal;
-      DP("Restoring original host pointer value " DPxMOD
-         " for host pointer " DPxMOD "\n",
-         DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
-      ++Itr;
+    // Wait for the device-to-host memcopies for the whole struct to complete
+    // before restoring the correct host pointers.
+    AsyncInfo.addPostProcessingFunction([=, Device = &Device]() -> int {
+      auto CB = [&](ShadowPtrListTy::iterator &Itr) {
+        void **ShadowHstPtrAddr = (void **)Itr->first;
+        *ShadowHstPtrAddr = Itr->second.HstPtrVal;
+        DP("Restoring original host pointer value " DPxMOD
+           " for host pointer " DPxMOD "\n",
+           DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
+        ++Itr;
+        return OFFLOAD_SUCCESS;
+      };
+      applyToShadowMapEntries(*Device, CB, HstPtrBegin, ArgSize, TPR);
+
       return OFFLOAD_SUCCESS;
-    };
-    applyToShadowMapEntries(Device, CB, HstPtrBegin, ArgSize, TPR);
+    });
   }

   if (ArgType & OMP_TGT_MAPTYPE_TO) {
@@ -1159,19 +1214,19 @@
 /// first-private arguments and transfer them all at once.
 struct FirstPrivateArgInfoTy {
   /// The index of the element in \p TgtArgs corresponding to the argument
-  const int Index;
+  int Index;
   /// Host pointer begin
-  const char *HstPtrBegin;
+  char *HstPtrBegin;
   /// Host pointer end
-  const char *HstPtrEnd;
+  char *HstPtrEnd;
   /// Aligned size
-  const int64_t AlignedSize;
+  int64_t AlignedSize;
   /// Host pointer name
-  const map_var_info_t HstPtrName = nullptr;
+  map_var_info_t HstPtrName = nullptr;

-  FirstPrivateArgInfoTy(int Index, const void *HstPtr, int64_t Size,
+  FirstPrivateArgInfoTy(int Index, void *HstPtr, int64_t Size,
                         const map_var_info_t HstPtrName = nullptr)
-      : Index(Index), HstPtrBegin(reinterpret_cast<const char *>(HstPtr)),
+      : Index(Index), HstPtrBegin(reinterpret_cast<char *>(HstPtr)),
         HstPtrEnd(HstPtrBegin + Size), AlignedSize(Size + Size % Alignment),
         HstPtrName(HstPtrName) {}
 };
@@ -1473,12 +1528,19 @@
     return OFFLOAD_FAIL;
   }

-  // Free target memory for private arguments
-  Ret = PrivateArgumentManager.free();
-  if (Ret != OFFLOAD_SUCCESS) {
-    REPORT("Failed to deallocate target memory for private args\n");
-    return OFFLOAD_FAIL;
-  }
+  // Free target memory for private arguments after synchronization.
+  // TODO: We might want to remove `mutable` in the future by not changing the
+  // captured variables somehow.
+  AsyncInfo.addPostProcessingFunction(
+      [PrivateArgumentManager =
+           std::move(PrivateArgumentManager)]() mutable -> int {
+        int Ret = PrivateArgumentManager.free();
+        if (Ret != OFFLOAD_SUCCESS) {
+          REPORT("Failed to deallocate target memory for private args\n");
+          return OFFLOAD_FAIL;
+        }
+        return Ret;
+      });

   return OFFLOAD_SUCCESS;
 }
@@ -1530,7 +1592,7 @@

   PrivateArgumentManagerTy PrivateArgumentManager(Device, AsyncInfo);

-  int Ret;
+  int Ret = OFFLOAD_SUCCESS;
   if (ArgNum) {
     // Process data, such as data mapping, before launching the kernel
     Ret = processDataBefore(Loc, DeviceId, HostPtr, ArgNum, ArgBases, Args,
diff --git a/openmp/libomptarget/src/private.h b/openmp/libomptarget/src/private.h
--- a/openmp/libomptarget/src/private.h
+++ b/openmp/libomptarget/src/private.h
@@ -117,6 +117,11 @@
                                   kmp_depend_info_t *dep_list,
                                   kmp_int32 ndeps_noalias,
                                   kmp_depend_info_t *noalias_dep_list)
     __attribute__((weak));
+void **__kmpc_omp_get_target_async_handle_ptr(kmp_int32 gtid)
+    __attribute__((weak));
+bool __kmpc_omp_has_task_team(kmp_int32 gtid) __attribute__((weak));
+// Invalid GTID as defined by libomp; keep in sync.
+#define KMP_GTID_DNE (-2)
 #ifdef __cplusplus
 }
 #endif
@@ -189,6 +194,98 @@
   }
 }

+// Wrapper for task-stored async info objects.
+class TaskAsyncInfoWrapperTy {
+  const int ExecThreadID = KMP_GTID_DNE;
+  AsyncInfoTy LocalAsyncInfo;
+  AsyncInfoTy *AsyncInfo = &LocalAsyncInfo;
+  void **TaskAsyncInfoPtr = nullptr;
+
+public:
+  TaskAsyncInfoWrapperTy(DeviceTy &Device)
+      : ExecThreadID(__kmpc_global_thread_num(NULL)), LocalAsyncInfo(Device) {
+    // If we failed to acquire the current global thread id, we cannot
+    // re-enqueue the current task. Thus we should use the local blocking
+    // async info.
+    if (ExecThreadID == KMP_GTID_DNE)
+      return;
+
+    // Only tasks with an assigned task team can be re-enqueued and thus can
+    // use the non-blocking synchronization scheme. If we don't have one, we
+    // should use the local blocking async info.
+    if (!__kmpc_omp_has_task_team(ExecThreadID))
+      return;
+
+    // Acquire a pointer to the AsyncInfo stored inside the current task being
+    // executed.
+    TaskAsyncInfoPtr = __kmpc_omp_get_target_async_handle_ptr(ExecThreadID);
+
+    // If we cannot acquire such a pointer, fall back to using the local
+    // blocking async info.
+    if (!TaskAsyncInfoPtr)
+      return;
+
+    // When creating a new task async info, the task handle must always be
+    // invalid. We must never overwrite any task async handle and there should
+    // never be any valid handle stored inside the task at this point.
+    assert((*TaskAsyncInfoPtr) == nullptr &&
+           "Task async handle is not empty when dispatching new device "
+           "operations. The handle was not cleared properly or "
+           "__tgt_target_nowait_query should have been called!");
+
+    // If no valid async handle is present, a new AsyncInfo will be allocated
+    // and stored in the current task.
+    AsyncInfo = new AsyncInfoTy(Device, AsyncInfoTy::SyncTy::NON_BLOCKING);
+    *TaskAsyncInfoPtr = (void *)AsyncInfo;
+  }
+
+  ~TaskAsyncInfoWrapperTy() {
+    // Local async info destruction is automatically handled by ~AsyncInfoTy.
+    if (AsyncInfo == &LocalAsyncInfo)
+      return;
+
+    // If there are device operations still pending, return immediately without
+    // deallocating the handle.
+    if (!AsyncInfo->isDone())
+      return;
+
+    // Delete the handle and unset it from the OpenMP task data.
+    delete AsyncInfo;
+    *TaskAsyncInfoPtr = nullptr;
+  }
+
+  operator AsyncInfoTy &() { return *AsyncInfo; }
+};
+
+// Implements exponential backoff counting.
+// Linearly increments until a given maximum; exponentially decrements based
+// on a given backoff factor.
+class ExponentialBackoff {
+  int64_t Count = 0;
+  const int64_t MaxCount = 0;
+  const int64_t CountThreshold = 0;
+  const float BackoffFactor = 0.0f;
+
+public:
+  ExponentialBackoff(int64_t MaxCount, int64_t CountThreshold,
+                     float BackoffFactor)
+      : MaxCount(MaxCount), CountThreshold(CountThreshold),
+        BackoffFactor(BackoffFactor) {
+    assert(MaxCount >= 0 &&
+           "ExponentialBackoff: maximum count value should be non-negative");
+    assert(CountThreshold >= 0 &&
+           "ExponentialBackoff: count threshold value should be non-negative");
+    assert(BackoffFactor >= 0 && BackoffFactor < 1 &&
+           "ExponentialBackoff: backoff factor should be in [0, 1) interval");
+  }
+
+  void increment() { Count = std::min(Count + 1, MaxCount); }
+
+  void decrement() { Count *= BackoffFactor; }
+
+  bool isAboveThreshold() const { return Count > CountThreshold; }
+};
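+
+// Example of the intended dynamics (a sketch using the defaults from
+// __tgt_target_nowait_query: MaxCount = 10, CountThreshold = 5,
+// BackoffFactor = 0.5): a thread that keeps finding pending regions counts
+// 1, 2, ..., saturating at 10; once the count exceeds 5 it switches to
+// blocking synchronization, and each completed region then roughly halves
+// the count (10 -> 5 -> 2 -> 1), steering it back to non-blocking queries.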

 #include "llvm/Support/TimeProfiler.h"
 #define TIMESCOPE() llvm::TimeTraceScope TimeScope(__FUNCTION__)
 #define TIMESCOPE_WITH_IDENT(IDENT)                                            \
diff --git a/openmp/libomptarget/src/rtl.cpp b/openmp/libomptarget/src/rtl.cpp
--- a/openmp/libomptarget/src/rtl.cpp
+++ b/openmp/libomptarget/src/rtl.cpp
@@ -212,6 +212,8 @@
       DynLibrary->getAddressOfSymbol("__tgt_rtl_run_target_team_region_async");
   *((void **)&RTL.synchronize) =
       DynLibrary->getAddressOfSymbol("__tgt_rtl_synchronize");
+  *((void **)&RTL.query_async) =
+      DynLibrary->getAddressOfSymbol("__tgt_rtl_query_async");
   *((void **)&RTL.data_exchange) =
       DynLibrary->getAddressOfSymbol("__tgt_rtl_data_exchange");
   *((void **)&RTL.data_exchange_async) =
diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h
--- a/openmp/runtime/src/kmp.h
+++ b/openmp/runtime/src/kmp.h
@@ -2501,6 +2501,10 @@
 } kmp_tasking_flags_t;

+typedef struct kmp_target_data {
+  void *async_handle; // libomptarget async handle for task completion query
+} kmp_target_data_t;
+
 struct kmp_taskdata { /* aligned during dynamic allocation */
   kmp_int32 td_task_id; /* id, assigned by debugger */
   kmp_tasking_flags_t td_flags; /* task flags */
@@ -2543,6 +2547,7 @@
 #if OMPT_SUPPORT
   ompt_task_info_t ompt_task_info;
 #endif
+  kmp_target_data_t td_target_data;
 }; // struct kmp_taskdata

 // Make sure padding above worked
@@ -4042,6 +4047,10 @@
 KMP_EXPORT void __kmp_set_teams_thread_limit(int limit);
 KMP_EXPORT int __kmp_get_teams_thread_limit(void);

+/* Target task integration interface */
+KMP_EXPORT void **__kmpc_omp_get_target_async_handle_ptr(kmp_int32 gtid);
+KMP_EXPORT bool __kmpc_omp_has_task_team(kmp_int32 gtid);
+
 /* Lock interface routines (fast versions with gtid passed in) */
 KMP_EXPORT void __kmpc_init_lock(ident_t *loc, kmp_int32 gtid,
                                  void **user_lock);
diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp
--- a/openmp/runtime/src/kmp_tasking.cpp
+++ b/openmp/runtime/src/kmp_tasking.cpp
@@ -21,6 +21,9 @@
 #include "ompt-specific.h"
 #endif

+// Declaration of the synchronization function from libomptarget.
+extern "C" void __tgt_target_nowait_query(void **) KMP_WEAK_ATTRIBUTE_INTERNAL; + /* forward declaration */ static void __kmp_enable_tasking(kmp_task_team_t *task_team, kmp_info_t *this_thr); @@ -1063,7 +1066,7 @@ KMP_DEBUG_ASSERT(taskdata->td_flags.started == 1); KMP_DEBUG_ASSERT(taskdata->td_flags.freed == 0); - bool detach = false; + bool completed = true; if (UNLIKELY(taskdata->td_flags.detachable == TASK_DETACHABLE)) { if (taskdata->td_allow_completion_event.type == KMP_EVENT_ALLOW_COMPLETION) { @@ -1087,13 +1090,24 @@ // __kmp_fulfill_event might free taskdata at any time from now taskdata->td_flags.proxy = TASK_PROXY; // proxify! - detach = true; + completed = false; } __kmp_release_tas_lock(&taskdata->td_allow_completion_event.lock, gtid); } } - if (!detach) { + // Tasks with valid target async handles must be re-enqueued. + if (taskdata->td_target_data.async_handle != NULL) { + // Note: no need to translate gtid to its shadow. If the current thread is a + // hidden helper one, then the gtid is already correct. Otherwise, hidden + // helper threads are disabled, and gtid refers to a OpenMP thread. + __kmpc_give_task(task, __kmp_tid_from_gtid(gtid)); + if (KMP_HIDDEN_HELPER_THREAD(gtid)) + __kmp_hidden_helper_worker_thread_signal(); + completed = false; + } + + if (completed) { taskdata->td_flags.complete = 1; // mark the task as completed #if OMPT_SUPPORT @@ -1125,6 +1139,13 @@ // function KMP_DEBUG_ASSERT(taskdata->td_flags.executing == 1); taskdata->td_flags.executing = 0; // suspend the finishing task + + // Decrement the counter of hidden helper tasks to be executed. + if (taskdata->td_flags.hidden_helper) { + // Hidden helper tasks can only be executed by hidden helper threads. + KMP_ASSERT(KMP_HIDDEN_HELPER_THREAD(gtid)); + KMP_ATOMIC_DEC(&__kmp_unexecuted_hidden_helper_tasks); + } } KA_TRACE( @@ -1136,7 +1157,7 @@ // johnmc: if an asynchronous inquiry peers into the runtime system // it doesn't see the freed task as the current task. thread->th.th_current_task = resumed_task; - if (!detach) + if (completed) __kmp_free_task_and_ancestors(gtid, taskdata, thread); // TODO: GEH - make sure root team implicit task is initialized properly. @@ -1532,6 +1553,7 @@ parent_task->td_taskgroup; // task inherits taskgroup from the parent task taskdata->td_dephash = NULL; taskdata->td_depnode = NULL; + taskdata->td_target_data.async_handle = NULL; if (flags->tiedness == TASK_UNTIED) taskdata->td_last_tied = NULL; // will be set when the task is scheduled else @@ -1674,13 +1696,6 @@ } #endif - // Decreament the counter of hidden helper tasks to be executed - if (taskdata->td_flags.hidden_helper) { - // Hidden helper tasks can only be executed by hidden helper threads - KMP_ASSERT(KMP_HIDDEN_HELPER_THREAD(gtid)); - KMP_ATOMIC_DEC(&__kmp_unexecuted_hidden_helper_tasks); - } - // Proxy tasks are not handled by the runtime if (taskdata->td_flags.proxy != TASK_PROXY) { __kmp_task_start(gtid, task, current_task); // OMPT only if not discarded @@ -1783,7 +1798,12 @@ KMP_FSYNC_ACQUIRED(taskdata); // acquired self (new task) #endif - if (task->routine != NULL) { + if (taskdata->td_target_data.async_handle != NULL) { + // If we have a valid target async handle, that means that we have already + // executed the task routine once. We must query for the handle completion + // instead of re-executing the routine. 
+    __tgt_target_nowait_query(&taskdata->td_target_data.async_handle);
+  } else if (task->routine != NULL) {
 #ifdef KMP_GOMP_COMPAT
     if (taskdata->td_flags.native) {
       ((void (*)(void *))(*(task->routine)))(task->shareds);
@@ -5131,3 +5151,45 @@
                          modifier, task_dup);
   KA_TRACE(20, ("__kmpc_taskloop_5(exit): T#%d\n", gtid));
 }
+
+/*!
+@ingroup TASKING
+@param gtid Global Thread ID of the current thread
+@return Returns a pointer to the thread's current task async handle. If no task
+is present or gtid is invalid, returns NULL.
+
+Acquires a pointer to the target async handle from the current task.
+*/
+void **__kmpc_omp_get_target_async_handle_ptr(kmp_int32 gtid) {
+  if (gtid == KMP_GTID_DNE)
+    return NULL;
+
+  kmp_info_t *thread = __kmp_thread_from_gtid(gtid);
+  kmp_taskdata_t *taskdata = thread->th.th_current_task;
+
+  if (!taskdata)
+    return NULL;
+
+  return &taskdata->td_target_data.async_handle;
+}
+
+/*!
+@ingroup TASKING
+@param gtid Global Thread ID of the current thread
+@return Returns TRUE if the current task being executed by the given thread has
+a task team allocated to it. Otherwise, returns FALSE.
+
+Checks if the current thread has a task team.
+*/
+bool __kmpc_omp_has_task_team(kmp_int32 gtid) {
+  if (gtid == KMP_GTID_DNE)
+    return FALSE;
+
+  kmp_info_t *thread = __kmp_thread_from_gtid(gtid);
+  kmp_taskdata_t *taskdata = thread->th.th_current_task;
+
+  if (!taskdata)
+    return FALSE;
+
+  return taskdata->td_task_team != NULL;
+}