diff --git a/openmp/libomptarget/include/ompt_buffer_mgr.h b/openmp/libomptarget/include/ompt_buffer_mgr.h
new file mode 100644
--- /dev/null
+++ b/openmp/libomptarget/include/ompt_buffer_mgr.h
@@ -0,0 +1,373 @@
+//===-- ompt_buffer_mgr.h - Target independent OpenMP target RTL -- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+//
+// Interface used for generating and flushing OMPT device trace records.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef _OMPT_BUFFER_MGR_H_
+#define _OMPT_BUFFER_MGR_H_
+
+#include <cassert>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <omp-tools.h>
+
+// TODO Start with 1 helper thread and add more dynamically if required.
+// The number of helper threads must not exceed 32 since the
+// thread-wait-tracker is 32 bits in length.
+#define OMPT_NUM_HELPER_THREADS 4
+
+#ifdef OMPT_SUPPORT
+#define OMPT_TRACING_IF_ENABLED(stmts)                                         \
+  do {                                                                         \
+    stmts                                                                      \
+  } while (0)
+#else
+#define OMPT_TRACING_IF_ENABLED(stmts)
+#endif
+
+/*
+ * Buffer manager for trace records generated by OpenMP master and
+ * worker threads. During device init, a tool may register a
+ * buffer-request and a buffer-completion callback. The buffer-request
+ * callback is used to allocate new buffers as required. The
+ * buffer-completion callback is used to return trace records to the
+ * tool.
+ *
+ * In addition to trace records, this class manages the helper threads
+ * that dispatch ranges of trace records to the tool.
+ */
+class OmptTracingBufferMgr {
+public:
+  /*
+   * A trace record (TR) holds the trace data. Its type can be ompt
+   * or native. Currently, only the ompt type is implemented.
+   */
+
+  /*
+   * A TR can be in the following states:
+   * TR_init: initial state
+   * TR_ready: an OpenMP thread marks a TR ready when it is done
+   * populating the TR
+   * TR_released: a helper thread marks a TR released after it has
+   * finished returning the TR to the tool
+   */
+  enum TRStatus { TR_init, TR_ready, TR_released };
+
+private:
+  // Internal variable for tracking threads to wait for during a flush
+  uint32_t ThreadFlushTracker;
+
+  // Internal variable for tracking threads shutting down
+  uint32_t ThreadShutdownTracker;
+
+  /*
+   * Metadata capturing the state of a buffer of trace records. Once a
+   * buffer is allocated, trace records are carved out of it by the
+   * OpenMP threads.
+   *
+   * Id, Start, and TotalBytes are not changed once set. But Cursor,
+   * RemainingBytes, and isFull can be read/written more than
+   * once. Hence, accesses to the second set of fields must be
+   * synchronized.
+   */
+  struct Buffer {
+    uint64_t Id;           // Unique identifier of the buffer
+    void *Start;           // Start of allocated space for trace records
+    void *Cursor;          // Address of the last trace record carved out
+    size_t TotalBytes;     // Total number of bytes in the allocated space
+    size_t RemainingBytes; // Total number of unused bytes
+                           // corresponding to Cursor
+    bool isFull;           // true if no more trace records can be
+                           // accommodated, otherwise false
+    Buffer(uint64_t id, void *st, void *cr, size_t bytes, size_t rem,
+           bool is_full)
+        : Id{id}, Start{st}, Cursor{cr}, TotalBytes{bytes},
+          RemainingBytes{rem}, isFull{is_full} {}
+    Buffer() = delete;
+    Buffer(const Buffer &) = delete;
+    Buffer &operator=(const Buffer &) = delete;
+  };
+
+  using BufPtr = std::shared_ptr<Buffer>;
+  using MapId2Buf = std::map<uint64_t, BufPtr>;
+
+  // Map from id to corresponding buffer. The ids are assigned in
+  // increasing order of creation.
+  MapId2Buf Id2BufferMap;
+
+  // Trace record metadata
+  struct TraceRecordMd {
+    BufPtr BufAddr;   // Enclosing buffer
+    TRStatus TRState; // Status of a trace record
+    TraceRecordMd(BufPtr buf) : BufAddr{buf}, TRState{TR_init} {}
+    TraceRecordMd() = delete;
+    TraceRecordMd(const TraceRecordMd &) = delete;
+    TraceRecordMd &operator=(const TraceRecordMd &) = delete;
+  };
+
+  using BufMdPtr = std::shared_ptr<TraceRecordMd>;
+  using UMapPtr2BufState = std::unordered_map<void *, BufMdPtr>;
+
+  // A hash map from cursor -> metadata containing the trace record
+  // status and the containing buffer
+  UMapPtr2BufState Cursor2BufMdMap;
+
+  /*
+   * A buffer is flushed when it fills up or when the tool invokes
+   * flush_trace. So it is possible that the same buffer is flushed
+   * more than once. When a buffer is flushed the first time, a unique
+   * id (flush-id) is generated and assigned to that buffer. Even if
+   * it is flushed again, the previously assigned id is maintained for
+   * that buffer. This id is loosely used to determine the order in
+   * which the buffers are processed and the corresponding trace
+   * records released to the tool.
+   */
+
+  struct FlushInfo {
+    uint64_t FlushId;
+    void *FlushCursor;
+    BufPtr FlushBuf;
+    FlushInfo() = default;
+    FlushInfo(uint64_t id, void *cr, BufPtr buf)
+        : FlushId{id}, FlushCursor{cr}, FlushBuf{buf} {}
+  };
+
+  /*
+   * A buffer may be in the following states:
+   * Flush_waiting: when a buffer is flushed, either because it is
+   * full or because the tool invokes ompt_flush_trace
+   * Flush_processing: when a helper thread claims the waiting buffer
+   * and is in the process of dispatching buffer-completion callbacks
+   * on an associated range of trace records. If not all trace records
+   * are released, the state may be reset to Flush_waiting after the
+   * buffer-completion callbacks return
+   */
+  enum BufferFlushStatus { Flush_waiting, Flush_processing };
+  struct FlushMd {
+    void *FlushCursor;
+    BufPtr FlushBuf;
+    BufferFlushStatus FlushStatus;
+    FlushMd(void *cr, BufPtr buf, BufferFlushStatus status)
+        : FlushCursor{cr}, FlushBuf{buf}, FlushStatus{status} {}
+    FlushMd() = delete;
+  };
+
+  using MapId2Md = std::map<uint64_t, FlushMd>;
+
+  /*
+   * A map from a flush-id to metadata containing the current
+   * cursor, the corresponding buffer, and its flushed status.
+   * If a buffer is flushed multiple times, the cursor is updated to
+   * the furthest one.
+   */
+  MapId2Md Id2FlushMdMap;
+
+  using UMapBufPtr2Id = std::unordered_map<BufPtr, uint64_t>;
+
+  // A hash map from a buffer address to the corresponding flush-id
+  UMapBufPtr2Id FlushBufPtr2IdMap;
+
+  using USetCursor = std::unordered_set<void *>;
+
+  // Set of last cursors of ranges handed out in buffer-completion callbacks
+  USetCursor LastCursors;
+
+  using UMapThd2Id = std::unordered_map<std::thread::id, uint32_t>;
+
+  // A hash map from a helper thread id to an integer
+  UMapThd2Id HelperThreadIdMap;
+
+  // Mutex to protect Id2BufferMap and Cursor2BufMdMap
+  std::mutex BufferMgrMutex;
+
+  // Mutex to protect FlushBufPtr2IdMap and Id2FlushMdMap
+  std::mutex FlushMutex;
+
+  // Mutex to protect metadata tracking last cursors of buffer-completion
+  // callbacks
+  std::mutex LastCursorMutex;
+
+  // Condition variable used to wake up helper threads when a flush is
+  // requested
+  std::condition_variable FlushCv;
+
+  // Condition variable used while waiting for flushing to complete
+  std::condition_variable ThreadFlushCv;
+
+  // Condition variable used while waiting for threads to shut down
+  std::condition_variable ThreadShutdownCv;
+
+  // TODO Separate out the helper thread into its own class
+  std::vector<std::thread> CompletionThreads;
+
+  // Called when a buffer may be flushed. setComplete must be called without
+  // holding any lock
+  void setComplete(void *cursor);
+
+  // Called to dispatch buffer-completion callbacks for the trace records in
+  // this buffer
+  void flushBuffer(FlushInfo);
+
+  // Dispatch a buffer-completion callback with a range of trace records
+  void dispatchCallback(void *buffer, void *first_cursor, void *last_cursor);
+
+  // Add a last cursor
+  void addLastCursor(void *cursor) {
+    std::unique_lock<std::mutex> lck(LastCursorMutex);
+    LastCursors.emplace(cursor);
+  }
+
+  // Remove a last cursor
+  void removeLastCursor(void *cursor) {
+    std::unique_lock<std::mutex> lck(LastCursorMutex);
+    assert(LastCursors.find(cursor) != LastCursors.end());
+    LastCursors.erase(cursor);
+  }
+
+  // Reserve a candidate buffer for flushing, preventing other helper threads
+  // from accessing it
+  FlushInfo findAndReserveFlushedBuf(uint64_t id);
+
+  // Unreserve a buffer so that other helper threads can process it
+  void unreserveFlushedBuf(const FlushInfo &);
+
+  // All done with this buffer, so the buffer and its metadata can be removed
+  void destroyFlushedBuf(const FlushInfo &);
+
+  // Add a new buffer by an OpenMP thread so that a helper thread can process
+  // it
+  uint64_t addNewFlushEntry(BufPtr buf, void *cursor);
+
+  // Get the next trace record
+  void *getNextTR(void *tr);
+
+  // Get the size of a trace record.
+  // Only ompt records are supported today.
+  size_t getTRSize() { return sizeof(ompt_record_ompt_t); }
+
+  // Given a buffer, return the latest cursor
+  void *getBufferCursor(BufPtr);
+
+  // Is no more space remaining for trace records in this buffer?
+  bool isBufferFull(const FlushInfo &);
+
+  // Have all trace records in this buffer been returned to the tool?
+  bool isBufferOwned(const FlushInfo &);
+
+  // Dispatch a buffer-completion callback and indicate that the buffer can be
+  // deallocated
+  void dispatchBufferOwnedCallback(const FlushInfo &);
+
+  // Main entry point for a helper thread
+  void driveCompletion();
+
+  // Examine the flushed buffers and dispatch buffer-completion callbacks
+  void invokeCallbacks();
+
+  // The caller must not hold a lock while calling this method
+  void waitForFlushCompletion();
+
+  // Given a thread number, set the corresponding bit in the flush
+  // tracker. The caller must hold the flush lock.
+  void setThreadFlush(uint32_t thd_num) {
+    ThreadFlushTracker |= (1 << thd_num);
+  }
+
+  // Reset this thread's flush bit. The caller must hold the flush lock
+  void resetThisThreadFlush() {
+    std::thread::id id = std::this_thread::get_id();
+    assert(HelperThreadIdMap.find(id) != HelperThreadIdMap.end());
+    ThreadFlushTracker &= ~(1 << HelperThreadIdMap[id]);
+  }
+
+  // Given a thread number, set the corresponding bit in the shutdown
+  // tracker. The caller must hold the flush lock.
+  void setThreadShutdown(uint32_t thd_num) {
+    ThreadShutdownTracker |= (1 << thd_num);
+  }
+
+  // Reset this thread's shutdown bit. The caller must hold the flush
+  // lock
+  void resetThisThreadShutdown() {
+    std::thread::id id = std::this_thread::get_id();
+    assert(HelperThreadIdMap.find(id) != HelperThreadIdMap.end());
+    ThreadShutdownTracker &= ~(1 << HelperThreadIdMap[id]);
+  }
+
+  // Return true if this thread's flush bit is set. The caller must
+  // hold the flush lock
+  bool isThisThreadFlushWaitedUpon() {
+    std::thread::id id = std::this_thread::get_id();
+    assert(HelperThreadIdMap.find(id) != HelperThreadIdMap.end());
+    return (ThreadFlushTracker & (1 << HelperThreadIdMap[id])) != 0;
+  }
+
+  // Return true if this thread's shutdown bit is set. The caller must
+  // hold the flush lock
+  bool isThisThreadShutdownWaitedUpon() {
+    std::thread::id id = std::this_thread::get_id();
+    assert(HelperThreadIdMap.find(id) != HelperThreadIdMap.end());
+    return (ThreadShutdownTracker & (1 << HelperThreadIdMap[id])) != 0;
+  }
+
+  // The caller must not hold the flush lock
+  bool amIHelperThread() {
+    std::unique_lock<std::mutex> flush_lock(FlushMutex);
+    return HelperThreadIdMap.find(std::this_thread::get_id()) !=
+           HelperThreadIdMap.end();
+  }
+
+  // The caller must hold the appropriate lock
+  void init();
+
+  // The caller must hold the flush lock
+  void createHelperThreads();
+
+  // The caller must hold the flush lock
+  void destroyHelperThreads();
+
+public:
+  OmptTracingBufferMgr();
+  ~OmptTracingBufferMgr();
+
+  // The caller must not hold the flush lock
+  void startHelperThreads();
+
+  // The caller must not hold the flush lock
+  void shutdownHelperThreads();
+
+  // Assign a cursor for a new trace record
+  void *assignCursor(ompt_callbacks_t type);
+
+  // Get the status of a trace record
+  TRStatus getTRStatus(void *tr);
+
+  // Set the status of a trace record
+  void setTRStatus(void *tr, TRStatus);
+
+  // Is this a last cursor of a buffer-completion callback?
+  bool isLastCursor(void *cursor) {
+    std::unique_lock<std::mutex> lck(LastCursorMutex);
+    return LastCursors.find(cursor) != LastCursors.end();
+  }
+
+  // Called for flushing outstanding buffers
+  int flushAllBuffers(ompt_device_t *);
+};
+
+#endif // _OMPT_BUFFER_MGR_H_
diff --git a/openmp/libomptarget/include/ompt_device_callbacks.h b/openmp/libomptarget/include/ompt_device_callbacks.h
--- a/openmp/libomptarget/include/ompt_device_callbacks.h
+++ b/openmp/libomptarget/include/ompt_device_callbacks.h
@@ -20,6 +20,8 @@
 //****************************************************************************
 
 #include
+#include <atomic>
+#include <cassert>
 #include
 #include
 
@@ -214,15 +216,103 @@
     }
   };
 
+  void ompt_callback_buffer_request(int device_num, ompt_buffer_t **buffer,
+                                    size_t *bytes) {
+    if (ompt_callback_buffer_request_fn) {
+      ompt_callback_buffer_request_fn(device_num, buffer, bytes);
+    }
+  }
+
+  void ompt_callback_buffer_complete(int device_num, ompt_buffer_t *buffer,
+                                     size_t bytes, ompt_buffer_cursor_t begin,
+                                     int buffer_owned) {
+    if (ompt_callback_buffer_complete_fn) {
+      ompt_callback_buffer_complete_fn(device_num, buffer, bytes, begin,
+                                       buffer_owned);
+    }
+  }
+
   void init() {
     enabled = false;
+    tracing_enabled = false;
+    tracing_type_enabled = 0;
+
 #define init_name(name, type, code) name##_fn = 0;
     FOREACH_OMPT_TARGET_CALLBACK(init_name)
 #undef init_name
-  };
+
+    ompt_callback_buffer_request_fn = 0;
+    ompt_callback_buffer_complete_fn = 0;
+  }
 
   bool is_enabled() { return enabled; }
 
+  bool is_tracing_enabled() { return tracing_enabled; }
+  void set_tracing_enabled(bool b) { tracing_enabled = b; }
+
+  bool is_tracing_type_enabled(unsigned int etype) {
+    assert(etype < 64);
+    if (etype < 64)
+      return (tracing_type_enabled & (1UL << etype)) != 0;
+    return false;
+  }
+
+  void set_tracing_type_enabled(unsigned int etype, bool b) {
+    assert(etype < 64);
+    if (etype < 64) {
+      if (b)
+        tracing_type_enabled |= (1UL << etype);
+      else
+        tracing_type_enabled &= ~(1UL << etype);
+    }
+  }
+
+  ompt_set_result_t set_trace_ompt(ompt_device_t *device, unsigned int enable,
+                                   unsigned int etype) {
+    // TODO handle device
+
+    DP("set_trace_ompt: %d %d\n", etype, enable);
+
+    bool is_event_enabled = enable > 0;
+    if (etype == 0) {
+      /* set/reset all supported types */
+      set_tracing_type_enabled(ompt_callbacks_t::ompt_callback_target,
+                               is_event_enabled);
+      set_tracing_type_enabled(ompt_callbacks_t::ompt_callback_target_data_op,
+                               is_event_enabled);
+      set_tracing_type_enabled(ompt_callbacks_t::ompt_callback_target_submit,
+                               is_event_enabled);
+      set_tracing_type_enabled(ompt_callbacks_t::ompt_callback_target_emi,
+                               is_event_enabled);
+      set_tracing_type_enabled(
+          ompt_callbacks_t::ompt_callback_target_data_op_emi,
+          is_event_enabled);
+      set_tracing_type_enabled(
+          ompt_callbacks_t::ompt_callback_target_submit_emi,
+          is_event_enabled);
+
+      if (is_event_enabled)
+        return ompt_set_sometimes; // a subset is enabled
+      else
+        return ompt_set_always; // we can disable for all
+    }
+    switch (etype) {
+    case ompt_callbacks_t::ompt_callback_target:
+    case ompt_callbacks_t::ompt_callback_target_data_op:
+    case ompt_callbacks_t::ompt_callback_target_submit:
+    case ompt_callbacks_t::ompt_callback_target_emi:
+    case ompt_callbacks_t::ompt_callback_target_data_op_emi:
+    case ompt_callbacks_t::ompt_callback_target_submit_emi: {
+      set_tracing_type_enabled(etype, is_event_enabled);
+      return ompt_set_always;
+    }
+    default: {
+      if (is_event_enabled)
+        return ompt_set_never; // unimplemented
+      else
+        return ompt_set_always; // always disabled anyway
+    }
+    }
+  }
+
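// Illustration (editorial aside, not part of the patch): how set_trace_ompt
// above responds to a tool, for a hypothetical device handle `dev`:
//
//   set_trace_ompt(dev, /*enable=*/1, /*etype=*/0)
//     -> ompt_set_sometimes: etype 0 turns on only the six supported
//        target-related record types.
//   set_trace_ompt(dev, /*enable=*/1, ompt_callback_target_submit)
//     -> ompt_set_always: the target-submit bit is set in
//        tracing_type_enabled.
//   set_trace_ompt(dev, /*enable=*/0, ompt_callback_parallel_begin)
//     -> ompt_set_always: an unsupported type is trivially disabled;
//        enabling it instead would return ompt_set_never.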
   void prepare_devices(int number_of_devices) { resize(number_of_devices); };
 
   void register_callbacks(ompt_function_lookup_t lookup) {
@@ -247,15 +337,29 @@
 
   static ompt_interface_fn_t lookup(const char *interface_function_name);
 
+  void set_buffer_request(ompt_callback_buffer_request_t callback) {
+    ompt_callback_buffer_request_fn = callback;
+  }
+
+  void set_buffer_complete(ompt_callback_buffer_complete_t callback) {
+    ompt_callback_buffer_complete_fn = callback;
+  }
+
 private:
   bool enabled;
+  std::atomic<bool> tracing_enabled;
+  std::atomic<uint64_t> tracing_type_enabled;
 
 #define declare_name(name, type, code) name##_t name##_fn;
   FOREACH_OMPT_TARGET_CALLBACK(declare_name)
 #undef declare_name
 
-  static void resize(int number_of_devices);
+  ompt_callback_buffer_request_t ompt_callback_buffer_request_fn;
+  ompt_callback_buffer_complete_t ompt_callback_buffer_complete_fn;
+  static ompt_device *lookup_device(int device_num);
+
+  static void resize(int number_of_devices);
 
   static const char *documentation;
 };
diff --git a/openmp/libomptarget/plugins/amdgpu/src/ompt_callback.cpp b/openmp/libomptarget/plugins/amdgpu/src/ompt_callback.cpp
--- a/openmp/libomptarget/plugins/amdgpu/src/ompt_callback.cpp
+++ b/openmp/libomptarget/plugins/amdgpu/src/ompt_callback.cpp
@@ -15,8 +15,10 @@
 //****************************************************************************
 
 #include
+#include <dlfcn.h>
 #include
+#include <mutex>
 #include
 
 //****************************************************************************
 // macros
 //****************************************************************************
 
-#define FOREACH_TARGET_FN(macro)
+// Supported device tracing entry points
+#define FOREACH_TARGET_FN(macro)                                               \
+  macro(ompt_set_trace_ompt) macro(ompt_start_trace) macro(ompt_flush_trace)   \
+      macro(ompt_stop_trace) macro(ompt_advance_buffer_cursor)                 \
+          macro(ompt_get_record_ompt)
 
 #define fnptr_to_ptr(x) ((void *)(uint64_t)x)
 
 #define ompt_ptr_unknown ((void *)0)
 
+#define OMPT_API_ROUTINE static
+
+//****************************************************************************
+// private data
+//****************************************************************************
+
+// Mutexes to protect the function pointers
+static std::mutex set_trace_mutex;
+static std::mutex start_trace_mutex;
+static std::mutex flush_trace_mutex;
+static std::mutex stop_trace_mutex;
+static std::mutex advance_buffer_cursor_mutex;
+
 //****************************************************************************
 // global data
 //****************************************************************************
 
 ompt_device_callbacks_t ompt_device_callbacks;
 
+typedef ompt_set_result_t (*libomptarget_ompt_set_trace_ompt_t)(
+    ompt_device_t *device, unsigned int enable, unsigned int etype);
+typedef int (*libomptarget_ompt_start_trace_t)(
+    ompt_callback_buffer_request_t, ompt_callback_buffer_complete_t);
+typedef int (*libomptarget_ompt_flush_trace_t)(ompt_device_t *);
+typedef int (*libomptarget_ompt_stop_trace_t)(ompt_device_t *);
+typedef int (*libomptarget_ompt_advance_buffer_cursor_t)(
+    ompt_device_t *, ompt_buffer_t *, size_t, ompt_buffer_cursor_t,
+    ompt_buffer_cursor_t *);
+
+libomptarget_ompt_set_trace_ompt_t ompt_set_trace_ompt_fn = nullptr;
+libomptarget_ompt_start_trace_t ompt_start_trace_fn = nullptr;
+libomptarget_ompt_flush_trace_t ompt_flush_trace_fn = nullptr;
+libomptarget_ompt_stop_trace_t ompt_stop_trace_fn = nullptr;
+libomptarget_ompt_advance_buffer_cursor_t ompt_advance_buffer_cursor_fn =
+    nullptr;
+
+// Runtime entry-points for device tracing
+
+OMPT_API_ROUTINE ompt_set_result_t ompt_set_trace_ompt(ompt_device_t *device,
+                                                       unsigned int enable,
+                                                       unsigned int etype) {
+  DP("Executing ompt_set_trace_ompt\n");
+
+  // TODO handle device
+
+  {
+    // Protect the function pointer
+    std::unique_lock<std::mutex> lck(set_trace_mutex);
+    // plugin specific
+    ompt_device_callbacks.set_trace_ompt(device, enable, etype);
+    // libomptarget specific
+    if (!ompt_set_trace_ompt_fn) {
+      void *vptr = dlsym(NULL, "libomptarget_ompt_set_trace_ompt");
+      assert(vptr && "OMPT set trace ompt entry point not found");
+      ompt_set_trace_ompt_fn =
+          reinterpret_cast<libomptarget_ompt_set_trace_ompt_t>(vptr);
+    }
+  }
+  return ompt_set_trace_ompt_fn(device, enable, etype);
+}
+
+OMPT_API_ROUTINE int
+ompt_start_trace(ompt_device_t *device, ompt_callback_buffer_request_t request,
+                 ompt_callback_buffer_complete_t complete) {
+  DP("OMPT: Executing ompt_start_trace\n");
+
+  // TODO handle device
+
+  {
+    // Protect the function pointer
+    std::unique_lock<std::mutex> lck(start_trace_mutex);
+    // plugin specific
+    ompt_device_callbacks.set_buffer_request(request);
+    ompt_device_callbacks.set_buffer_complete(complete);
+    if (request && complete)
+      ompt_device_callbacks.set_tracing_enabled(true);
+
+    // libomptarget specific
+    if (!ompt_start_trace_fn) {
+      void *vptr = dlsym(NULL, "libomptarget_ompt_start_trace");
+      assert(vptr && "OMPT start trace entry point not found");
+      ompt_start_trace_fn =
+          reinterpret_cast<libomptarget_ompt_start_trace_t>(vptr);
+    }
+  }
+  return ompt_start_trace_fn(request, complete);
+}
+
+OMPT_API_ROUTINE int ompt_flush_trace(ompt_device_t *device) {
+  DP("OMPT: Executing ompt_flush_trace\n");
+
+  // TODO handle device
+
+  {
+    // Protect the function pointer
+    std::unique_lock<std::mutex> lck(flush_trace_mutex);
+    if (!ompt_flush_trace_fn) {
+      void *vptr = dlsym(NULL, "libomptarget_ompt_flush_trace");
+      assert(vptr && "OMPT flush trace entry point not found");
+      ompt_flush_trace_fn =
+          reinterpret_cast<libomptarget_ompt_flush_trace_t>(vptr);
+    }
+  }
+  return ompt_flush_trace_fn(device);
+}
+
+OMPT_API_ROUTINE int ompt_stop_trace(ompt_device_t *device) {
+  DP("OMPT: Executing ompt_stop_trace\n");
+
+  // TODO handle device
+  {
+    // Protect the function pointer
+    std::unique_lock<std::mutex> lck(stop_trace_mutex);
+    ompt_device_callbacks.set_tracing_enabled(false);
+
+    if (!ompt_stop_trace_fn) {
+      void *vptr = dlsym(NULL, "libomptarget_ompt_stop_trace");
+      assert(vptr && "OMPT stop trace entry point not found");
+      ompt_stop_trace_fn =
+          reinterpret_cast<libomptarget_ompt_stop_trace_t>(vptr);
+    }
+  }
+  return ompt_stop_trace_fn(device);
+}
+
+OMPT_API_ROUTINE ompt_record_ompt_t *
+ompt_get_record_ompt(ompt_buffer_t *buffer, ompt_buffer_cursor_t current) {
+  // TODO In debug mode, get the metadata associated with this buffer
+  // and assert that there are enough bytes for the current record
+
+  // Currently, no synchronization is required since a disjoint set of
+  // trace records is handed over to a thread.
+
+  // Note that current can be nullptr. In that case, we return
+  // nullptr. The tool has to handle that properly.
+  return (ompt_record_ompt_t *)current;
+}
+
+OMPT_API_ROUTINE int
+ompt_advance_buffer_cursor(ompt_device_t *device, ompt_buffer_t *buffer,
+                           size_t size, /* bytes returned in the corresponding
+                                           callback, unused here */
+                           ompt_buffer_cursor_t current,
+                           ompt_buffer_cursor_t *next) {
+  // Advance can be called concurrently, so synchronize setting the
+  // function pointer. The actual libomptarget function does not need
+  // to be synchronized since it must be working on logically disjoint
+  // buffers.
+  {
+    std::unique_lock<std::mutex> lck(advance_buffer_cursor_mutex);
+    if (!ompt_advance_buffer_cursor_fn) {
+      void *vptr = dlsym(NULL, "libomptarget_ompt_advance_buffer_cursor");
+      assert(vptr && "OMPT advance buffer cursor entry point not found");
+      ompt_advance_buffer_cursor_fn =
+          reinterpret_cast<libomptarget_ompt_advance_buffer_cursor_t>(vptr);
+    }
+  }
+  return ompt_advance_buffer_cursor_fn(device, buffer, size, current, next);
+}
+
+// End of runtime entry-points for trace records
+
 //****************************************************************************
 // private data
 //****************************************************************************
diff --git a/openmp/libomptarget/plugins/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins/amdgpu/src/rtl.cpp
--- a/openmp/libomptarget/plugins/amdgpu/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/amdgpu/src/rtl.cpp
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include <mutex>
 #include
 #include
 #include
@@ -50,6 +51,10 @@
 #define OMPT_IF_ENABLED(stmts)
 #endif
 
+typedef void (*libomptarget_ompt_set_granted_teams_t)(uint32_t);
+libomptarget_ompt_set_granted_teams_t ompt_set_granted_teams_fn = nullptr;
+std::mutex granted_teams_mtx;
+
 // hostrpc interface, FIXME: consider moving to its own include these are
 // statically linked into amdgpu/plugin if present from hostrpc_services.a,
 // linked as --whole-archive to override the weak symbols that are used to
@@ -1098,6 +1103,22 @@
   DP("Final %d num_groups and %d threadsPerGroup\n", num_groups,
      threadsPerGroup);
 
+#ifdef OMPT_SUPPORT
+  if (ompt_device_callbacks.is_tracing_enabled()) {
+    {
+      std::unique_lock<std::mutex> granted_teams_fn_lck(granted_teams_mtx);
+      if (!ompt_set_granted_teams_fn) {
+        void *vptr = dlsym(NULL, "libomptarget_ompt_set_granted_teams");
+        assert(vptr && "OMPT set granted teams entry point not found");
+        ompt_set_granted_teams_fn =
+            reinterpret_cast<libomptarget_ompt_set_granted_teams_t>(vptr);
+      }
+    }
+    // No need to hold a lock
+    ompt_set_granted_teams_fn(num_groups);
+  }
+#endif
+
   launchVals res;
   res.WorkgroupSize = threadsPerGroup;
   res.GridSize = threadsPerGroup * num_groups;
diff --git a/openmp/libomptarget/src/CMakeLists.txt b/openmp/libomptarget/src/CMakeLists.txt
--- a/openmp/libomptarget/src/CMakeLists.txt
+++ b/openmp/libomptarget/src/CMakeLists.txt
@@ -16,6 +16,7 @@
   ${CMAKE_CURRENT_SOURCE_DIR}/api.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/device.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/interface.cpp
+  ${CMAKE_CURRENT_SOURCE_DIR}/ompt_buffer_mgr.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/ompt_callback.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/rtl.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/omptarget.cpp
@@ -38,6 +39,7 @@
 endif()
 
 target_link_libraries(omptarget PRIVATE
   ${CMAKE_DL_LIBS}
+  ${OPENMP_PTHREAD_LIB}
   "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/exports")
 
 # Install libomptarget under the lib destination folder.
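The entry points above follow the standard OMPT device-tracing API declared in
omp-tools.h, so the tool side is conventional. The sketch below is an
illustration, not part of the patch: it assumes the tool saved the
device-tracing entry points from the `lookup` argument of its
ompt_device_initialize callback, and the `my_*` names and 1 MB buffer size are
invented for the example.

#include <cstdio>
#include <cstdlib>

#include <omp-tools.h>

// Entry points captured in ompt_device_initialize (illustrative names).
static ompt_set_trace_ompt_t my_set_trace_ompt;
static ompt_start_trace_t my_start_trace;
static ompt_flush_trace_t my_flush_trace;
static ompt_stop_trace_t my_stop_trace;
static ompt_get_record_ompt_t my_get_record_ompt;
static ompt_advance_buffer_cursor_t my_advance_buffer_cursor;

// Called by the runtime (via ompt_callback_buffer_request above) when it
// needs a new trace buffer.
static void on_buffer_request(int device_num, ompt_buffer_t **buffer,
                              size_t *bytes) {
  *bytes = 1024 * 1024; // hand out 1 MB per buffer (illustrative)
  *buffer = std::malloc(*bytes);
}

// Called by a helper thread with a range of ready trace records, or with
// buffer_owned set once every record in the buffer has been released.
static void on_buffer_complete(int device_num, ompt_buffer_t *buffer,
                               size_t bytes, ompt_buffer_cursor_t begin,
                               int buffer_owned) {
  for (ompt_buffer_cursor_t cursor = begin; cursor;) {
    ompt_record_ompt_t *rec = my_get_record_ompt(buffer, cursor);
    std::printf("record: type %d time %lu\n", (int)rec->type,
                (unsigned long)rec->time);
    if (!my_advance_buffer_cursor(/*device=*/NULL, buffer, bytes, cursor,
                                  &cursor))
      break;
  }
  if (buffer_owned)
    std::free(buffer); // the runtime has already destroyed its metadata
}

static void trace_device(ompt_device_t *device) {
  my_set_trace_ompt(device, /*enable=*/1, /*etype=*/0); // all supported types
  my_start_trace(device, on_buffer_request, on_buffer_complete);
  // ... run offloading work ...
  my_flush_trace(device); // force delivery of partially filled buffers
  my_stop_trace(device);  // flushes and shuts down the helper threads
}

Because records are fixed-size ompt_record_ompt_t slots carved out
contiguously (see getTRSize/getNextTR), the cursor arithmetic behind
ompt_advance_buffer_cursor remains valid across an entire delivered range.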
diff --git a/openmp/libomptarget/src/device.cpp b/openmp/libomptarget/src/device.cpp
--- a/openmp/libomptarget/src/device.cpp
+++ b/openmp/libomptarget/src/device.cpp
@@ -497,32 +497,40 @@
 }
 
 void *DeviceTy::allocData(int64_t Size, void *HstPtr, int32_t Kind) {
+  uint64_t start_time = 0;
   void *codeptr = nullptr;
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
       ompt_interface.target_data_alloc_begin(RTLDeviceID, HstPtr, Size,
-                                             codeptr););
+                                             codeptr);
+      start_time = ompt_interface.get_ns_duration_since_epoch(););
 
   void *tgt_ptr = RTL->data_alloc(RTLDeviceID, Size, HstPtr, Kind);
 
-  OMPT_IF_ENABLED(
-      ompt_interface.target_data_alloc_end(RTLDeviceID, HstPtr, Size, codeptr);
-      ompt_interface.ompt_state_clear(););
+  OMPT_IF_ENABLED(ompt_interface.target_data_submit_trace_record_gen(
+      RTLDeviceID, ompt_target_data_alloc, tgt_ptr, HstPtr, Size, start_time);
+                  ompt_interface.target_data_alloc_end(RTLDeviceID, HstPtr,
+                                                       Size, codeptr);
+                  ompt_interface.ompt_state_clear(););
 
   return tgt_ptr;
 }
 
 int32_t DeviceTy::deleteData(void *TgtPtrBegin) {
+  uint64_t start_time = 0;
   void *codeptr = nullptr;
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
       ompt_interface.target_data_delete_begin(RTLDeviceID, TgtPtrBegin,
-                                              codeptr););
+                                              codeptr);
+      start_time = ompt_interface.get_ns_duration_since_epoch(););
 
   int32_t status = RTL->data_delete(RTLDeviceID, TgtPtrBegin);
 
   OMPT_IF_ENABLED(
+      ompt_interface.target_data_submit_trace_record_gen(
+          DeviceID, ompt_target_data_delete, TgtPtrBegin, 0, 0, start_time);
       ompt_interface.target_data_delete_end(RTLDeviceID, TgtPtrBegin, codeptr);
       ompt_interface.ompt_state_clear(););
 
@@ -544,12 +552,14 @@
                      : "unknown");
   }
 
+  uint64_t start_time = 0;
   void *codeptr = nullptr;
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
       ompt_interface.target_data_submit_begin(RTLDeviceID, TgtPtrBegin,
-                                              HstPtrBegin, Size, codeptr););
+                                              HstPtrBegin, Size, codeptr);
+      start_time = ompt_interface.get_ns_duration_since_epoch(););
 
   int32_t status;
   if (!AsyncInfo || !RTL->data_submit_async || !RTL->synchronize)
    status = RTL->data_submit(RTLDeviceID, TgtPtrBegin, HstPtrBegin, Size);
   else
    status = RTL->data_submit_async(RTLDeviceID, TgtPtrBegin, HstPtrBegin,
                                    Size, AsyncInfo);
 
-  OMPT_IF_ENABLED(ompt_interface.target_data_submit_end(
-      RTLDeviceID, TgtPtrBegin, HstPtrBegin, Size, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_data_submit_trace_record_gen(
+      DeviceID, ompt_target_data_transfer_to_device, HstPtrBegin, TgtPtrBegin,
+      Size, start_time);
+                  ompt_interface.target_data_submit_end(
+                      RTLDeviceID, TgtPtrBegin, HstPtrBegin, Size, codeptr);
                   ompt_interface.ompt_state_clear(););
   return status;
 }
@@ -578,12 +591,14 @@
                      : "unknown");
   }
 
+  uint64_t start_time = 0;
   void *codeptr = nullptr;
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
       ompt_interface.target_data_retrieve_begin(RTLDeviceID, HstPtrBegin,
-                                                TgtPtrBegin, Size, codeptr););
+                                                TgtPtrBegin, Size, codeptr);
+      start_time = ompt_interface.get_ns_duration_since_epoch(););
 
   int32_t status;
   if (!RTL->data_retrieve_async || !RTL->synchronize)
    status = RTL->data_retrieve(RTLDeviceID, HstPtrBegin, TgtPtrBegin, Size);
   else
    status = RTL->data_retrieve_async(RTLDeviceID, HstPtrBegin, TgtPtrBegin,
                                      Size, AsyncInfo);
 
-  OMPT_IF_ENABLED(ompt_interface.target_data_retrieve_end(
-      RTLDeviceID, HstPtrBegin, TgtPtrBegin, Size, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_data_submit_trace_record_gen(
+      DeviceID, ompt_target_data_transfer_from_device, TgtPtrBegin,
+      HstPtrBegin, Size, start_time);
+                  ompt_interface.target_data_retrieve_end(
+                      RTLDeviceID, HstPtrBegin, TgtPtrBegin, Size, codeptr);
                   ompt_interface.ompt_state_clear(););
-
   return status;
 }
diff --git a/openmp/libomptarget/src/exports b/openmp/libomptarget/src/exports
--- a/openmp/libomptarget/src/exports
+++ b/openmp/libomptarget/src/exports
@@ -44,6 +44,12 @@
     __tgt_set_info_flag;
     __tgt_print_device_info;
     libomptarget_ompt_connect;
+    libomptarget_ompt_set_trace_ompt;
+    libomptarget_ompt_start_trace;
+    libomptarget_ompt_flush_trace;
+    libomptarget_ompt_stop_trace;
+    libomptarget_ompt_set_granted_teams;
+    libomptarget_ompt_advance_buffer_cursor;
   local:
     *;
 };
diff --git a/openmp/libomptarget/src/interface.cpp b/openmp/libomptarget/src/interface.cpp
--- a/openmp/libomptarget/src/interface.cpp
+++ b/openmp/libomptarget/src/interface.cpp
@@ -112,7 +112,9 @@
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
-      ompt_interface.target_data_enter_begin(device_id, codeptr););
+      ompt_interface.target_data_enter_begin(device_id, codeptr);
+      ompt_interface.target_trace_record_gen(device_id, ompt_target_enter_data,
+                                             ompt_scope_begin, codeptr););
 
   if (checkDeviceAndCtors(device_id, loc)) {
     DP("Not offloading to device %" PRId64 "\n", device_id);
@@ -140,7 +142,9 @@
     rc = AsyncInfo.synchronize();
   handleTargetOutcome(rc == OFFLOAD_SUCCESS, loc);
 
-  OMPT_IF_ENABLED(ompt_interface.target_data_enter_end(device_id, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_trace_record_gen(
+      device_id, ompt_target_enter_data, ompt_scope_end, codeptr);
+                  ompt_interface.target_data_enter_end(device_id, codeptr);
                   ompt_interface.ompt_state_clear(););
 }
 
@@ -211,7 +215,9 @@
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
-      ompt_interface.target_data_exit_begin(device_id, codeptr););
+      ompt_interface.target_data_exit_begin(device_id, codeptr);
+      ompt_interface.target_trace_record_gen(device_id, ompt_target_exit_data,
+                                             ompt_scope_begin, codeptr););
 
   int rc = targetDataEnd(loc, Device, arg_num, args_base, args, arg_sizes,
                          arg_types, arg_names, arg_mappers, AsyncInfo);
@@ -219,7 +225,9 @@
     rc = AsyncInfo.synchronize();
   handleTargetOutcome(rc == OFFLOAD_SUCCESS, loc);
 
-  OMPT_IF_ENABLED(ompt_interface.target_data_exit_end(device_id, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_trace_record_gen(
+      device_id, ompt_target_exit_data, ompt_scope_end, codeptr);
+                  ompt_interface.target_data_exit_end(device_id, codeptr);
                   ompt_interface.ompt_state_clear(););
 }
 
@@ -260,13 +268,6 @@
               void **arg_mappers) {
   TIMESCOPE_WITH_IDENT(loc);
   DP("Entering data update with %d mappings\n", arg_num);
-
-  void *codeptr = nullptr;
-  OMPT_IF_ENABLED(
-      codeptr = OMPT_GET_RETURN_ADDRESS(0);
-      ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
-      ompt_interface.target_update_begin(device_id, codeptr););
-
   if (checkDeviceAndCtors(device_id, loc)) {
     DP("Not offloading to device %" PRId64 "\n", device_id);
     return;
@@ -276,6 +277,14 @@
     printKernelArguments(loc, device_id, arg_num, arg_sizes, arg_types,
                          arg_names, "Updating OpenMP data");
 
+  void *codeptr = nullptr;
+  OMPT_IF_ENABLED(
+      codeptr = OMPT_GET_RETURN_ADDRESS(0);
+      ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
+      ompt_interface.target_update_begin(device_id, codeptr);
+      ompt_interface.target_trace_record_gen(device_id, ompt_target_update,
+                                             ompt_scope_begin, codeptr););
+
   DeviceTy &Device = *PM->Devices[device_id];
   AsyncInfoTy AsyncInfo(Device);
   int rc = targetDataUpdate(loc, Device, arg_num, args_base, args, arg_sizes,
@@ -284,7 +293,9 @@
     rc = AsyncInfo.synchronize();
   handleTargetOutcome(rc == OFFLOAD_SUCCESS, loc);
 
-  OMPT_IF_ENABLED(ompt_interface.target_update_end(device_id, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_trace_record_gen(
+      device_id, ompt_target_update, ompt_scope_end, codeptr);
+                  ompt_interface.target_update_end(device_id, codeptr);
                   ompt_interface.ompt_state_clear(););
 }
 
@@ -350,7 +361,9 @@
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
-      ompt_interface.target_begin(device_id, codeptr););
+      ompt_interface.target_begin(device_id, codeptr);
+      ompt_interface.target_trace_record_gen(device_id, ompt_target,
+                                             ompt_scope_begin, codeptr););
 
   int rc = target(loc, Device, host_ptr, arg_num, args_base, args, arg_sizes,
                   arg_types, arg_names, arg_mappers, 0, 0, false /*team*/,
@@ -359,7 +372,9 @@
     rc = AsyncInfo.synchronize();
   handleTargetOutcome(rc == OFFLOAD_SUCCESS, loc);
 
-  OMPT_IF_ENABLED(ompt_interface.target_end(device_id, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_trace_record_gen(
+      device_id, ompt_target, ompt_scope_end, codeptr);
+                  ompt_interface.target_end(device_id, codeptr);
                   ompt_interface.ompt_state_clear(););
 
   assert(rc == OFFLOAD_SUCCESS && "__tgt_target_mapper unexpected failure!");
@@ -435,7 +450,9 @@
   OMPT_IF_ENABLED(
       codeptr = OMPT_GET_RETURN_ADDRESS(0);
       ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0), codeptr);
-      ompt_interface.target_begin(device_id, codeptr););
+      ompt_interface.target_begin(device_id, codeptr);
+      ompt_interface.target_trace_record_gen(device_id, ompt_target,
+                                             ompt_scope_begin, codeptr););
 
   int rc = target(loc, Device, host_ptr, arg_num, args_base, args, arg_sizes,
                   arg_types, arg_names, arg_mappers, team_num, thread_limit,
@@ -444,7 +461,9 @@
     rc = AsyncInfo.synchronize();
   handleTargetOutcome(rc == OFFLOAD_SUCCESS, loc);
 
-  OMPT_IF_ENABLED(ompt_interface.target_end(device_id, codeptr);
+  OMPT_IF_ENABLED(ompt_interface.target_trace_record_gen(
+      device_id, ompt_target, ompt_scope_end, codeptr);
+                  ompt_interface.target_end(device_id, codeptr);
                   ompt_interface.ompt_state_clear(););
 
   assert(rc == OFFLOAD_SUCCESS &&
diff --git a/openmp/libomptarget/src/ompt_buffer_mgr.cpp b/openmp/libomptarget/src/ompt_buffer_mgr.cpp
new file mode 100644
--- /dev/null
+++ b/openmp/libomptarget/src/ompt_buffer_mgr.cpp
@@ -0,0 +1,663 @@
+//=== ompt_buffer_mgr.cpp - Target independent OpenMP target RTL -- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+//
+// Implementation of OMPT device trace record generation and flushing.
+//
+//===----------------------------------------------------------------------===//
+
+#include <atomic>
+#include <cassert>
+#include <limits>
+#include <thread>
+
+#include <omp-tools.h>
+
+#include "private.h"
+
+#include <ompt_buffer_mgr.h>
+#include <ompt_device_callbacks.h>
+
+extern ompt_device_callbacks_t ompt_device_callbacks;
+
+// When set to true, helper threads terminate their work
+static bool done_tracing{false};
+
+// Unique buffer id in creation order
+static std::atomic<uint64_t> buf_id{0};
+
+// Unique id in buffer flush order
+static std::atomic<uint64_t> flush_id{0};
+
+static uint64_t get_and_inc_buf_id() { return buf_id++; }
+static uint64_t get_buf_id() { return buf_id; }
+
+static uint64_t get_and_inc_flush_id() { return flush_id++; }
+static uint64_t get_flush_id() { return flush_id; }
+
+/*
+ * Used by OpenMP threads for assigning space for a trace record. If
+ * there is no space in the last buffer allocated, the last buffer is
+ * marked full and scheduled for flushing. Otherwise, space is
+ * assigned for a trace record and the new cursor returned.
+ */
+void *OmptTracingBufferMgr::assignCursor(ompt_callbacks_t type) {
+  // TODO Currently, we serialize assignment of space for new
+  // trace records as well as allocation of new buffers. This can be
+  // changed by maintaining thread-local info
+  size_t rec_size = getTRSize();
+  void *to_be_flushed_cursor = nullptr;
+  std::unique_lock<std::mutex> lck(BufferMgrMutex);
+  if (!Id2BufferMap.empty()) {
+    // Try to assign a trace record from the last allocated buffer
+    BufPtr buf = Id2BufferMap.rbegin()->second;
+    if (rec_size <= buf->RemainingBytes) {
+      // This is the new cursor
+      assert((char *)buf->Start + buf->TotalBytes - buf->RemainingBytes ==
+             (char *)buf->Cursor + rec_size);
+      buf->Cursor = (char *)buf->Start + buf->TotalBytes - buf->RemainingBytes;
+      buf->RemainingBytes -= rec_size;
+      assert(Cursor2BufMdMap.find(buf->Cursor) == Cursor2BufMdMap.end());
+      Cursor2BufMdMap[buf->Cursor] = std::make_shared<TraceRecordMd>(buf);
+      DP("Assigned %lu bytes at %p in existing buffer %p\n", rec_size,
+         buf->Cursor, buf->Start);
+      return buf->Cursor;
+    } else {
+      to_be_flushed_cursor = buf->Cursor;
+      buf->isFull = true; // no space for any more trace records
+    }
+  }
+  void *buffer = nullptr;
+  size_t total_bytes;
+  // TODO Move the buffer allocation to a helper thread
+  ompt_device_callbacks.ompt_callback_buffer_request(0 /* device_num */,
+                                                     &buffer, &total_bytes);
+
+  // TODO Instead of asserting, turn OFF tracing
+  assert(total_bytes >= rec_size && "Buffer is too small");
+  assert(buffer != nullptr && "Buffer request function failed");
+
+  uint64_t new_buf_id = get_and_inc_buf_id();
+  auto new_buf = std::make_shared<Buffer>(
+      new_buf_id, buffer /* start */, buffer /* cursor */, total_bytes,
+      total_bytes - rec_size, /* remaining bytes */
+      false /* is full? */);
+  auto buf_md_ptr = std::make_shared<TraceRecordMd>(new_buf);
+  assert(Cursor2BufMdMap.find(new_buf->Cursor) == Cursor2BufMdMap.end());
+  Cursor2BufMdMap[new_buf->Cursor] = buf_md_ptr;
+
+  assert(Id2BufferMap.find(new_buf_id) == Id2BufferMap.end());
+  Id2BufferMap[new_buf_id] = new_buf;
+
+  lck.unlock();
+
+  // Schedule this buffer for flushing up to this cursor
+  if (to_be_flushed_cursor)
+    setComplete(to_be_flushed_cursor);
+
+  DP("Assigned %lu bytes at %p in new buffer with id %lu\n", rec_size, buffer,
+     new_buf_id);
+
+  return buffer;
+}
+
+/*
+ * Called by an OpenMP thread when a buffer fills up and should be
+ * flushed. This function assigns a new flush_id to the buffer, adds
+ * it to the flush-related metadata, and wakes up a helper thread to
+ * dispatch a buffer-completion callback.
+ * This function must be called without holding any lock.
+ * Note lock order: buf_lock -> flush_lock
+ */
+void OmptTracingBufferMgr::setComplete(void *cursor) {
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  auto buf_itr = Cursor2BufMdMap.find(cursor);
+  // Between calling setComplete and this check, a flush-all may have
+  // delivered this buffer to the tool and deleted it. So the buffer
+  // may not exist.
+  if (buf_itr == Cursor2BufMdMap.end())
+    return;
+
+  // Cannot assert that the state of the cursor is ready since a
+  // different thread may be in the process of populating it. If it
+  // remains in init state when the range of trace records is
+  // determined for dispatching the buffer-completion callback, it
+  // will not be included.
+  BufPtr buf = buf_itr->second->BufAddr;
+
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  auto flush_itr = FlushBufPtr2IdMap.find(buf);
+  if (flush_itr == FlushBufPtr2IdMap.end()) {
+    // This buffer has not been flushed yet
+    addNewFlushEntry(buf, cursor);
+  } else {
+    // This buffer has been flushed before
+    uint64_t flush_id = flush_itr->second;
+    auto flush_md_itr = Id2FlushMdMap.find(flush_id);
+    assert(flush_md_itr != Id2FlushMdMap.end());
+    flush_md_itr->second.FlushCursor = cursor; // update the cursor
+    // Do not update the flush status since it may be under processing
+    // by another thread
+    DP("Updated id %lu cursor %p buf %p\n", flush_id, cursor,
+       flush_md_itr->second.FlushBuf->Start);
+  }
+  flush_lock.unlock();
+  buf_lock.unlock();
+
+  // Wake up a helper thread to invoke the buffer-completion callback
+  FlushCv.notify_one();
+}
+
+// This is the driver routine for the completion thread
+void OmptTracingBufferMgr::driveCompletion() {
+  while (true) {
+    bool should_signal_workers = false;
+    std::unique_lock<std::mutex> flush_lock(FlushMutex);
+    if (done_tracing) {
+      // An upper layer serializes flush_trace and stop_trace. In
+      // addition, before done_tracing is set, a flush is performed as
+      // part of stop_trace. So assert that no flush is in progress.
+      assert(ThreadFlushTracker == 0);
+      break;
+    }
+    FlushCv.wait(flush_lock, [this] {
+      return done_tracing ||
+             (!Id2FlushMdMap.empty() &&
+              ompt_device_callbacks.is_tracing_enabled()) ||
+             isThisThreadFlushWaitedUpon();
+    });
+    if (isThisThreadFlushWaitedUpon()) {
+      resetThisThreadFlush();
+      if (ThreadFlushTracker == 0)
+        should_signal_workers = true;
+    }
+    flush_lock.unlock();
+
+    invokeCallbacks();
+
+    if (should_signal_workers)
+      ThreadFlushCv.notify_all();
+
+    // There is a scenario where a buffer was processed but not full
+    // or owned, so it was put back in the waiting state. In that case,
+    // this thread would not wait but keep looping without any actual
+    // work until new trace records are added and this thread is
+    // signaled. Hence, this thread yields.
+    std::this_thread::yield();
+  }
+  bool is_last_helper = false;
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  assert(done_tracing && "Helper thread exiting but not yet done");
+  assert(isThisThreadShutdownWaitedUpon() &&
+         "Helper thread exiting but not waited upon");
+  resetThisThreadShutdown();
+  if (ThreadShutdownTracker == 0)
+    is_last_helper = true;
+  flush_lock.unlock();
+  if (is_last_helper)
+    ThreadShutdownCv.notify_all();
+
+  // Note that some trace records may have been written but not
+  // delivered to the tool. If the flush/stop APIs are not called by
+  // the tool, those trace records may never be delivered to the tool
+  // and the corresponding buffers never reclaimed. TODO Explore whether
+  // this cleanup must be done.
+}
+
+/*
+ * Called by a buffer-completion helper thread. This function examines
+ * the flushed buffers in flush order and dispatches
+ * callbacks. Lock holding is minimized by reserving a buffer,
+ * processing it, and then unreserving it if there are more trace
+ * records to flush later. If all trace records are flushed, a
+ * callback is dispatched informing the tool that the buffer can be
+ * deallocated. If the buffer can be deallocated, all metadata is
+ * destroyed.
+ * Note that this function must be called without holding any locks.
+ */
+void OmptTracingBufferMgr::invokeCallbacks() {
+  DP("Looking for callbacks to invoke\n");
+  auto max_id = std::numeric_limits<uint64_t>::max();
+  auto curr_id = max_id;
+  auto end_id = get_flush_id();
+  DP("End id is %lu\n", end_id);
+  while (true) {
+    // Set the status of the flushed buffer to in-processing so that
+    // another helper thread does not process it concurrently. An
+    // OpenMP worker thread may, however, populate a trace record in a
+    // reserved buffer concurrently.
+    FlushInfo flush_info = findAndReserveFlushedBuf(curr_id);
+
+    // No entry found, nothing to process
+    if (curr_id == max_id && flush_info.FlushCursor == nullptr)
+      return;
+
+    if (flush_info.FlushCursor != nullptr) {
+      // Increment curr_id to get the candidate for the next iteration
+      curr_id = flush_info.FlushId + 1;
+    } else {
+      assert(curr_id != max_id && "Cannot increment max id");
+      ++curr_id;
+    }
+
+    DP("Next id will be %lu\n", curr_id);
+
+    if (flush_info.FlushCursor == nullptr) {
+      // This buffer must have been processed already
+      if (curr_id < end_id)
+        continue;
+      else
+        return; // nothing else to process
+    }
+
+    DP("Buf %p Cursor %p Id %lu will be flushed\n", flush_info.FlushBuf->Start,
+       flush_info.FlushCursor, flush_info.FlushId);
+
+    // Examine the status of the trace records and dispatch
+    // buffer-completion callbacks as appropriate.
+    flushBuffer(flush_info);
+
+    // TODO Optimize to set buffer-owned in the same pass above.
+    // Currently, this is the only way a buffer is deallocated
+    if (isBufferFull(flush_info)) {
+      // All trace records have been delivered to the tool
+      if (isBufferOwned(flush_info)) {
+        // Erase this buffer from the buffer and flush maps
+        destroyFlushedBuf(flush_info);
+
+        // Dispatch a callback with a null range and have the tool
+        // deallocate the buffer
+        dispatchBufferOwnedCallback(flush_info);
+      } else {
+        unreserveFlushedBuf(flush_info);
+      }
+    } else {
+      unreserveFlushedBuf(flush_info);
+    }
+    if (curr_id >= end_id)
+      return;
+  }
+}
+
+/*
+ * This function is called on a buffer that is already reserved by
+ * this thread. Buffer-completion callbacks are dispatched for every
+ * range of trace records that are ready.
+ * This routine must be called without holding any locks.
+ */
+void OmptTracingBufferMgr::flushBuffer(FlushInfo flush_info) {
+  assert(flush_info.FlushBuf && "Cannot flush an empty buffer");
+  assert(flush_info.FlushCursor && "Cannot flush up to a null cursor");
+
+  void *curr_tr = flush_info.FlushBuf->Start;
+  void *last_tr = flush_info.FlushCursor;
+  // Compute a range [first_cursor,last_cursor] to flush
+  void *first_cursor = nullptr;
+  void *last_cursor = nullptr;
+  while (curr_tr <= last_tr) {
+    TRStatus tr_status = getTRStatus(curr_tr);
+    if (tr_status == TR_init || tr_status == TR_released) {
+      if (first_cursor == nullptr) {
+        // This TR won't be part of a range
+        assert(last_cursor == nullptr &&
+               "Begin/last cursors mutually inconsistent");
+      } else {
+        // End the current interval
+        dispatchCallback(flush_info.FlushBuf->Start, first_cursor,
+                         last_cursor);
+        first_cursor = last_cursor = nullptr;
+      }
+    } else {
+      assert(tr_status == TR_ready && "Unknown trace record status");
+      setTRStatus(curr_tr, TR_released);
+      if (first_cursor == nullptr)
+        first_cursor = curr_tr;
+      last_cursor = curr_tr;
+    }
+    curr_tr = getNextTR(curr_tr);
+  }
+  if (first_cursor != nullptr) {
+    assert(last_cursor != nullptr);
+    dispatchCallback(flush_info.FlushBuf->Start, first_cursor, last_cursor);
+  }
+}
+
+// Given a range of trace records, dispatch a buffer-completion callback
+void OmptTracingBufferMgr::dispatchCallback(void *buffer, void *first_cursor,
+                                            void *last_cursor) {
+  assert(first_cursor != nullptr && last_cursor != nullptr &&
+         "Callback with nullptr");
+  addLastCursor(last_cursor);
+
+  // This is best effort.
+  // There is a small window when the buffer-completion callback may
+  // be invoked even after tracing has been disabled.
+  // Note that we don't want to hold a lock when dispatching the callback.
+  if (ompt_device_callbacks.is_tracing_enabled()) {
+    DP("Dispatch callback w/ range (inclusive) to be flushed: %p -> %p\n",
+       first_cursor, last_cursor);
+    ompt_device_callbacks.ompt_callback_buffer_complete(
+        0 /* TODO device num */, buffer,
+        /* bytes returned in this callback */
+        (char *)getNextTR(last_cursor) - (char *)first_cursor,
+        (ompt_buffer_cursor_t)first_cursor, false /* buffer_owned */);
+  }
+
+  removeLastCursor(last_cursor);
+}
+
+// Dispatch a buffer-completion callback with buffer_owned set so that
+// the tool can deallocate the buffer
+void OmptTracingBufferMgr::dispatchBufferOwnedCallback(
+    const FlushInfo &flush_info) {
+  // This is best effort.
+  // There is a small window when the buffer-completion callback may
+  // be invoked even after tracing has been disabled.
+  // Note that we don't want to hold a lock when dispatching the callback.
+  if (ompt_device_callbacks.is_tracing_enabled()) {
+    DP("Dispatch callback with buffer %p owned\n", flush_info.FlushBuf->Start);
+    ompt_device_callbacks.ompt_callback_buffer_complete(
+        0, flush_info.FlushBuf->Start, 0, (ompt_buffer_cursor_t)0,
+        true /* buffer owned */);
+  }
+}
+
+void OmptTracingBufferMgr::setTRStatus(void *rec, TRStatus status) {
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  auto itr = Cursor2BufMdMap.find(rec);
+  assert(itr != Cursor2BufMdMap.end());
+  itr->second->TRState = status;
+}
+
+OmptTracingBufferMgr::TRStatus OmptTracingBufferMgr::getTRStatus(void *rec) {
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  auto itr = Cursor2BufMdMap.find(rec);
+  assert(itr != Cursor2BufMdMap.end());
+  return itr->second->TRState;
+}
+
+void *OmptTracingBufferMgr::getNextTR(void *rec) {
+  size_t rec_size = getTRSize();
+  // Warning: no overflow check done
+  return (char *)rec + rec_size;
+}
+
+bool OmptTracingBufferMgr::isBufferFull(const FlushInfo &flush_info) {
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  return flush_info.FlushBuf->isFull;
+}
+
+void *OmptTracingBufferMgr::getBufferCursor(BufPtr buf) {
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  return buf->Cursor;
+}
+
+/*
+ * Traverse all the trace records of a buffer and return true if all
+ * of them have been released to the tool, otherwise return false
+ */
+bool OmptTracingBufferMgr::isBufferOwned(const FlushInfo &flush_info) {
+  assert(isBufferFull(flush_info) && "Compute buffer-owned only when full");
+  void *curr_tr = flush_info.FlushBuf->Start;
+  // Since the buffer is full, the cursor must be the last valid
+  // TR. Note that this may be more up-to-date than the cursor in the
+  // flush_info. Use the last valid TR to avoid dropping trace records.
+  void *last_tr = getBufferCursor(flush_info.FlushBuf);
+  while (curr_tr <= last_tr) {
+    if (getTRStatus(curr_tr) != TR_released)
+      return false;
+    curr_tr = getNextTR(curr_tr);
+  }
+  return true;
+}
+
+/*
+ * A buffer must be reserved by a thread before it can be processed
+ * and callbacks dispatched for that buffer. Reservation is done by
+ * setting the status to in-processing.
+ *
+ * If a buffer is found in the flush metadata for the given id and it
+ * is not in in-processing mode, reserve it by setting its mode to
+ * in-processing and return the corresponding flush metadata. If the
+ * given id is set to max, return the first waiting buffer in the
+ * list of buffers to be flushed.
+ */
+OmptTracingBufferMgr::FlushInfo
+OmptTracingBufferMgr::findAndReserveFlushedBuf(uint64_t flush_id) {
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  MapId2Md::iterator flush_itr;
+  if (flush_id == std::numeric_limits<uint64_t>::max()) {
+    // Reserve the first waiting buffer and return it
+    if (Id2FlushMdMap.empty())
+      return FlushInfo();
+    for (flush_itr = Id2FlushMdMap.begin(); flush_itr != Id2FlushMdMap.end();
+         ++flush_itr) {
+      // Reserve only if waiting
+      if (flush_itr->second.FlushStatus == Flush_waiting)
+        break;
+    }
+    if (flush_itr == Id2FlushMdMap.end())
+      return FlushInfo();
+  } else {
+    flush_itr = Id2FlushMdMap.find(flush_id);
+    if (flush_itr == Id2FlushMdMap.end() ||
+        flush_itr->second.FlushStatus == Flush_processing)
+      return FlushInfo();
+  }
+  assert(flush_itr->second.FlushStatus == Flush_waiting);
+  flush_itr->second.FlushStatus = Flush_processing;
+  FlushInfo flush_info(flush_itr->first, flush_itr->second.FlushCursor,
+                       flush_itr->second.FlushBuf);
+  DP("Reserved buffer: flush_id:%lu, cursor:%p, buf:%p\n", flush_itr->first,
+     flush_itr->second.FlushCursor, flush_itr->second.FlushBuf->Start);
+  return flush_info;
+}
+
+/*
+ * Given a buffer, verify that it is in processing state and set its
+ * status to waiting, removing the reservation. The same thread that
+ * reserved it should be unreserving it but currently there is no such
+ * check.
+ */
+void OmptTracingBufferMgr::unreserveFlushedBuf(const FlushInfo &flush_info) {
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  auto itr = Id2FlushMdMap.find(flush_info.FlushId);
+  assert(itr != Id2FlushMdMap.end() &&
+         itr->second.FlushStatus == Flush_processing);
+  itr->second.FlushStatus = Flush_waiting;
+  DP("Unreserved buffer: flush_id:%lu, cursor:%p, buf:%p\n",
+     flush_info.FlushId, flush_info.FlushCursor, flush_info.FlushBuf->Start);
+}
+
+/*
+ * This function must be called after all of the trace records in the
+ * buffer have been released to the tool. The buffer is removed from
+ * all metadata maps.
+ * Note lock order: buf_lock -> flush_lock
+ */
+void OmptTracingBufferMgr::destroyFlushedBuf(const FlushInfo &flush_info) {
+  DP("Destroying buffer: flush_id:%lu, cursor:%p, buf:%p\n",
+     flush_info.FlushId, flush_info.FlushCursor, flush_info.FlushBuf->Start);
+
+  BufPtr buf = flush_info.FlushBuf;
+
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  // Mapping info for all cursors in this buffer must be erased. This
+  // can be done since only fully populated buffers are destroyed.
+  char *curr_cursor = (char *)flush_info.FlushBuf->Start;
+  size_t total_valid_bytes = (buf->TotalBytes / getTRSize()) * getTRSize();
+  char *end_cursor = curr_cursor + total_valid_bytes;
+  while (curr_cursor != end_cursor) {
+    auto buf_itr = Cursor2BufMdMap.find(curr_cursor);
+    assert(buf_itr != Cursor2BufMdMap.end() &&
+           "Cursor not found in buffer metadata map");
+    assert(buf_itr->second->BufAddr == buf);
+    Cursor2BufMdMap.erase(buf_itr);
+    curr_cursor += getTRSize();
+  }
+  Id2BufferMap.erase(buf->Id);
+
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  auto flush_itr = Id2FlushMdMap.find(flush_info.FlushId);
+  assert(flush_itr != Id2FlushMdMap.end());
+  assert(flush_itr->second.FlushBuf == buf);
+  Id2FlushMdMap.erase(flush_itr);
+  FlushBufPtr2IdMap.erase(buf);
+}
+
+/*
+ * Generate a new flush id and add the buffer to the flush metadata
+ * maps. This function must be called while holding the flush lock.
+ */
+uint64_t OmptTracingBufferMgr::addNewFlushEntry(BufPtr buf, void *cursor) {
+  assert(FlushBufPtr2IdMap.find(buf) == FlushBufPtr2IdMap.end());
+  uint64_t flush_id = get_and_inc_flush_id();
+  FlushBufPtr2IdMap.emplace(buf, flush_id);
+  assert(Id2FlushMdMap.find(flush_id) == Id2FlushMdMap.end());
+  Id2FlushMdMap.emplace(flush_id, FlushMd(cursor, buf, Flush_waiting));
+
+  DP("Added new flush id %lu cursor %p buf %p\n", flush_id, cursor,
+     buf->Start);
+
+  return flush_id;
+}
+
+/*
+ * Called by ompt_flush_trace and ompt_stop_trace. Traverse the
+ * existing buffers in creation order and flush all the ready TRs
+ */
+int OmptTracingBufferMgr::flushAllBuffers(ompt_device_t *device) {
+  // If flush is called from a helper thread, just bail out
+  if (amIHelperThread())
+    return 0; // failed to flush
+
+  // To avoid holding the mutex for too long, get the ids of the first
+  // and the last buffers under lock, and then go through that range,
+  // holding the mutex for an individual buffer
+  std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+  if (Id2BufferMap.empty())
+    return 1; // no trace records to flush
+  uint64_t curr_buf_id = Id2BufferMap.begin()->first;
+  uint64_t last_buf_id = Id2BufferMap.rbegin()->first;
+  buf_lock.unlock();
+
+  while (curr_buf_id <= last_buf_id) {
+    std::unique_lock<std::mutex> buf_lock(BufferMgrMutex);
+    // Another thread may have deleted this buffer by now
+    auto buf_itr = Id2BufferMap.find(curr_buf_id);
+    if (buf_itr == Id2BufferMap.end()) {
+      ++curr_buf_id;
+      continue;
+    }
+    // If this buffer is in the flush map, skip it. It is either being
+    // processed by another thread or will be processed
+    BufPtr curr_buf = buf_itr->second;
+    std::unique_lock<std::mutex> flush_lock(FlushMutex);
+    auto flush_itr = FlushBufPtr2IdMap.find(curr_buf);
+    if (flush_itr != FlushBufPtr2IdMap.end()) {
+      ++curr_buf_id;
+      continue;
+    }
+    // This buffer has not been flushed yet
+    uint64_t flush_id = addNewFlushEntry(curr_buf, curr_buf->Cursor);
+    DP("flushAllBuffers: Added new id %lu cursor %p buf %p\n", flush_id,
+       curr_buf->Cursor, curr_buf->Start);
+
+    flush_lock.unlock();
+    buf_lock.unlock();
+
+    ++curr_buf_id;
+  }
+
+  // Wake up all helper threads to invoke buffer-completion callbacks
+  FlushCv.notify_all();
+
+  // This is best effort. It is possible that some trace records are
+  // not flushed when the wait is done.
+  waitForFlushCompletion();
+
+  return 1; // success
+}
+
+void OmptTracingBufferMgr::waitForFlushCompletion() {
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  for (uint32_t i = 0; i < OMPT_NUM_HELPER_THREADS; ++i)
+    setThreadFlush(i);
+  ThreadFlushCv.wait(flush_lock, [this] { return ThreadFlushTracker == 0; });
+}
+
+void OmptTracingBufferMgr::init() {
+  ThreadFlushTracker = 0;
+  ThreadShutdownTracker = 0;
+  done_tracing = false; // TODO make it a class member
+}
+
+void OmptTracingBufferMgr::startHelperThreads() {
+  // All helper threads are stopped while holding FlushMutex. So if
+  // any helper thread is present, just return. This takes care of
+  // repeated calls to start-trace.
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  if (!HelperThreadIdMap.empty()) {
+    assert(!done_tracing && "Helper threads exist but tracing is done");
+    return;
+  }
+  init();
+  createHelperThreads();
+}
+
+void OmptTracingBufferMgr::shutdownHelperThreads() {
+  std::unique_lock<std::mutex> flush_lock(FlushMutex);
+  if (done_tracing // If another thread called stop, there is nothing
+                   // to do for this thread
+      || HelperThreadIdMap.empty() // Threads were never started
+  ) {
+    // Don't assert on HelperThreadIdMap since shutdown by another
+    // thread may be in progress
+    return;
+  }
+
+  // If I am destroying the threads, then at least one thread must be present
+  assert(!CompletionThreads.empty());
+  assert(!HelperThreadIdMap.empty());
+  assert(ThreadShutdownTracker == 0);
+
+  // Set the done flag which helper threads will look at
+  done_tracing = true;
+  // Wait to make sure all helper threads exit
+  for (uint32_t i = 0; i < OMPT_NUM_HELPER_THREADS; ++i)
+    setThreadShutdown(i);
+  // Signal that done_tracing is set
+  FlushCv.notify_all();
+  ThreadShutdownCv.wait(flush_lock,
+                        [this] { return ThreadShutdownTracker == 0; });
+
+  // Now destroy all the helper threads
+  destroyHelperThreads();
+}
+
+void OmptTracingBufferMgr::createHelperThreads() {
+  for (uint32_t i = 0; i < OMPT_NUM_HELPER_THREADS; ++i) {
+    CompletionThreads.emplace_back(
+        std::thread(&OmptTracingBufferMgr::driveCompletion, this));
+    HelperThreadIdMap[CompletionThreads.back().get_id()] = i;
+  }
+}
+
+void OmptTracingBufferMgr::destroyHelperThreads() {
+  for (auto &thd : CompletionThreads)
+    thd.join();
+  CompletionThreads.clear();
+  HelperThreadIdMap.clear();
+}
+
+OmptTracingBufferMgr::OmptTracingBufferMgr() {
+  // No need to hold locks for init() since the object is getting
+  // constructed here
+  init();
+}
+
+OmptTracingBufferMgr::~OmptTracingBufferMgr() {
+  OMPT_TRACING_IF_ENABLED(shutdownHelperThreads(););
+}
diff --git a/openmp/libomptarget/src/ompt_callback.h b/openmp/libomptarget/src/ompt_callback.h
--- a/openmp/libomptarget/src/ompt_callback.h
+++ b/openmp/libomptarget/src/ompt_callback.h
@@ -25,8 +25,14 @@
 
 #define OMPT_GET_RETURN_ADDRESS(level) __builtin_return_address(level)
 
+#include <chrono>
+
 #include
 
+using HighResClk = std::chrono::high_resolution_clock;
+using HighResTp = std::chrono::time_point<HighResClk>;
+using DurationNs = std::chrono::nanoseconds;
+
 class OmptInterface {
 public:
   OmptInterface()
@@ -83,6 +89,24 @@
 
   void target_end(int64_t device_id, void *codeptr);
 
+  uint64_t get_ns_duration_since_epoch() {
+    const HighResTp time_point = HighResClk::now();
+    const HighResClk::duration duration_since_epoch =
+        time_point.time_since_epoch();
+    return std::chrono::duration_cast<DurationNs>(duration_since_epoch)
+        .count();
+  }
+
+  ompt_record_ompt_t *target_trace_record_gen(int64_t device_id,
+                                              ompt_target_t kind,
+                                              ompt_scope_endpoint_t endpoint,
+                                              void *code);
+  ompt_record_ompt_t *
+  target_submit_trace_record_gen(uint64_t start_time,
+                                 unsigned int num_teams = 1);
+  ompt_record_ompt_t *target_data_submit_trace_record_gen(
+      int64_t device_id, ompt_target_data_op_t data_op, void *tgt_ptr,
+      void *hst_ptr, size_t bytes, uint64_t start_time);
+
 private:
   void ompt_state_set_helper(void *enter_frame, void *codeptr_ra, int flags,
                              int state);
@@ -103,6 +127,21 @@
   void *_enter_frame;
   void *_codeptr_ra;
   int _state;
+
+  // Called by all trace generation routines
+  void set_trace_record_common(ompt_record_ompt_t *data_ptr,
+                               ompt_callbacks_t cbt, uint64_t start_time);
+  // Type-specific helpers
diff --git a/openmp/libomptarget/src/ompt_callback.cpp b/openmp/libomptarget/src/ompt_callback.cpp
--- a/openmp/libomptarget/src/ompt_callback.cpp
+++ b/openmp/libomptarget/src/ompt_callback.cpp
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include <mutex>
 
 //****************************************************************************
 // local include files
 //****************************************************************************
@@ -25,6 +26,7 @@
 #include "private.h"
 
 #include
+#include <ompt_buffer_mgr.h>
 #include
 
 /*******************************************************************************
@@ -70,6 +72,8 @@
 ompt_device_callbacks_t ompt_device_callbacks;
 
+OmptTracingBufferMgr ompt_trace_record_buffer_mgr;
+
 /*****************************************************************************
 * private data
 *****************************************************************************/
@@ -82,6 +86,13 @@
 const char *ompt_device_callbacks_t::documentation = 0;
 
+static std::atomic<uint64_t> unique_id_ticket(1);
+
+// Mutexes to serialize entry point invocations
+static std::mutex set_trace_mutex;
+// Serialize start/stop/flush
+static std::mutex start_stop_flush_trace_mutex;
+
 /*****************************************************************************
 * Thread local data
 *****************************************************************************/
@@ -93,8 +104,7 @@
 static thread_local ompt_data_t *ompt_task_data = 0;
 static thread_local ompt_data_t *ompt_target_task_data = 0;
 static thread_local ompt_id_t host_op_id = 0;
-
-static std::atomic<uint64_t> unique_id_ticket(1);
+static thread_local uint32_t ompt_num_granted_teams = 0;
 
 /*****************************************************************************
 * OMPT callbacks
@@ -260,6 +270,50 @@
   target_operation_end();
 }
 
+ompt_record_ompt_t *OmptInterface::target_data_submit_trace_record_gen(
+    int64_t device_id, ompt_target_data_op_t data_op, void *src_ptr,
+    void *dest_ptr, size_t bytes, uint64_t start_time) {
+  if (!ompt_device_callbacks.is_tracing_enabled() ||
+      (!ompt_device_callbacks.is_tracing_type_enabled(
+           ompt_callback_target_data_op) &&
+       !ompt_device_callbacks.is_tracing_type_enabled(
+           ompt_callback_target_data_op_emi)))
+    return nullptr;
+
+  ompt_record_ompt_t *data_ptr =
+      (ompt_record_ompt_t *)ompt_trace_record_buffer_mgr.assignCursor(
+          ompt_callback_target_data_op);
+
+  // Logically, this record is now private
+
+  set_trace_record_common(data_ptr, ompt_callback_target_data_op, start_time);
+
+  set_trace_record_target_data_op(&data_ptr->record.target_data_op, device_id,
+                                  data_op, src_ptr, dest_ptr, bytes);
+
+  // The trace record has been created, mark it ready for delivery to the tool
+  ompt_trace_record_buffer_mgr.setTRStatus(data_ptr,
+                                           OmptTracingBufferMgr::TR_ready);
+
+  DP("Generated target_data_submit trace record %p\n", data_ptr);
+  return data_ptr;
+}
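All three trace-record generators in this file follow the same init-then-publish discipline: assignCursor hands out a logically private record, the fields are filled in, and only then does setTRStatus(..., TR_ready) make it eligible for delivery by a helper thread. A self-contained mock of that ordering (the names and the atomic flag are illustrative; the patch itself serializes through the buffer manager's locks and TRStatus states):

    #include <atomic>
    #include <cstdint>

    struct Record {
      uint64_t time = 0;              // filled while the record is private
      std::atomic<bool> ready{false}; // stand-in for TR_init/TR_ready
    };

    void produce(Record &r, uint64_t start_time) {
      r.time = start_time;                            // 1. populate fields
      r.ready.store(true, std::memory_order_release); // 2. publish last
    }

    bool mayDeliver(const Record &r) {
      // A consumer (helper thread) must skip records not yet published.
      return r.ready.load(std::memory_order_acquire);
    }

Publishing last is what allows a helper thread to walk a partially filled buffer and deliver only completed records.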
+
+void OmptInterface::set_trace_record_target_data_op(
+    ompt_record_target_data_op_t *rec, int64_t device_id,
+    ompt_target_data_op_t data_op, void *src_ptr, void *dest_ptr,
+    size_t bytes) {
+  rec->host_op_id = ompt_target_region_opid;
+  rec->optype = data_op;
+  rec->src_addr = src_ptr;
+  rec->src_device_num = device_id;
+  rec->dest_addr = dest_ptr;
+  rec->dest_device_num = device_id;
+  rec->bytes = bytes;
+  rec->end_time = get_ns_duration_since_epoch();
+  rec->codeptr_ra = _codeptr_ra;
+}
+
 void OmptInterface::target_submit_begin(unsigned int num_teams) {
   ompt_device_callbacks.ompt_callback_target_submit_emi(
       ompt_scope_begin, &ompt_target_data, num_teams, opid_create,
@@ -272,6 +326,42 @@
       &ompt_target_region_opid);
 }
 
+ompt_record_ompt_t *
+OmptInterface::target_submit_trace_record_gen(uint64_t start_time,
+                                              unsigned int num_teams) {
+  if (!ompt_device_callbacks.is_tracing_enabled() ||
+      (!ompt_device_callbacks.is_tracing_type_enabled(
+           ompt_callback_target_submit) &&
+       !ompt_device_callbacks.is_tracing_type_enabled(
+           ompt_callback_target_submit_emi)))
+    return nullptr;
+
+  ompt_record_ompt_t *data_ptr =
+      (ompt_record_ompt_t *)ompt_trace_record_buffer_mgr.assignCursor(
+          ompt_callback_target_submit);
+
+  // Logically, this record is now private
+
+  set_trace_record_common(data_ptr, ompt_callback_target_submit, start_time);
+
+  set_trace_record_target_kernel(&data_ptr->record.target_kernel, num_teams);
+
+  // The trace record has been created, mark it ready for delivery to the tool
+  ompt_trace_record_buffer_mgr.setTRStatus(data_ptr,
+                                           OmptTracingBufferMgr::TR_ready);
+
+  DP("Generated target_submit trace record %p\n", data_ptr);
+  return data_ptr;
+}
+
+void OmptInterface::set_trace_record_target_kernel(
+    ompt_record_target_kernel_t *rec, unsigned int num_teams) {
+  rec->host_op_id = ompt_target_region_opid;
+  rec->requested_num_teams = num_teams;
+  rec->granted_num_teams = ompt_num_granted_teams;
+  rec->end_time = get_ns_duration_since_epoch();
+}
+
 void OmptInterface::target_data_enter_begin(int64_t device_id, void *codeptr) {
   target_region_begin();
   ompt_device_callbacks.ompt_callback_target_emi(
@@ -331,6 +421,60 @@
   target_region_end();
 }
 
+ompt_record_ompt_t *
+OmptInterface::target_trace_record_gen(int64_t device_id, ompt_target_t kind,
+                                       ompt_scope_endpoint_t endpoint,
+                                       void *code) {
+  if (!ompt_device_callbacks.is_tracing_enabled() ||
+      (!ompt_device_callbacks.is_tracing_type_enabled(ompt_callback_target) &&
+       !ompt_device_callbacks.is_tracing_type_enabled(
+           ompt_callback_target_emi)))
+    return nullptr;
+
+  uint64_t start_time = ompt_interface.get_ns_duration_since_epoch();
+
+  ompt_record_ompt_t *data_ptr =
+      (ompt_record_ompt_t *)ompt_trace_record_buffer_mgr.assignCursor(
+          ompt_callback_target);
+
+  // Logically, this record is now private
+
+  set_trace_record_common(data_ptr, ompt_callback_target, start_time);
+  set_trace_record_target(&data_ptr->record.target, device_id, kind, endpoint,
+                          code);
+
+  // The trace record has been created, mark it ready for delivery to the tool
+  ompt_trace_record_buffer_mgr.setTRStatus(data_ptr,
+                                           OmptTracingBufferMgr::TR_ready);
+
+  DP("Generated target trace record %p\n", data_ptr);
+
+  return data_ptr;
+}
+
+void OmptInterface::set_trace_record_target(ompt_record_target_t *rec,
+                                            int64_t device_id,
+                                            ompt_target_t kind,
+                                            ompt_scope_endpoint_t endpoint,
+                                            void *code) {
+  rec->kind = kind;
+  rec->endpoint = endpoint;
+  rec->device_num = device_id;
+  assert(ompt_task_data);
+  rec->task_id = ompt_task_data->value;
+  rec->target_id = ompt_target_data.value;
+  rec->codeptr_ra = code;
+}
+
+void OmptInterface::set_trace_record_common(ompt_record_ompt_t *data_ptr,
+                                            ompt_callbacks_t cbt,
+                                            uint64_t start_time) {
+  data_ptr->type = cbt;
+  data_ptr->time = start_time;
+  data_ptr->thread_id = 0; // TODO
+  data_ptr->target_id = ompt_target_data.value;
+}
+
 /*****************************************************************************
 * OMPT interface operations
 *****************************************************************************/
@@ -420,4 +564,72 @@
   }
   DP("OMPT: Leave libomptarget_ompt_connect\n");
 }
+
+// Device-independent entry point for ompt_set_trace_ompt
+ompt_set_result_t libomptarget_ompt_set_trace_ompt(ompt_device_t *device,
+                                                   unsigned int enable,
+                                                   unsigned int etype) {
+  std::unique_lock<std::mutex> lck(set_trace_mutex);
+  return ompt_device_callbacks.set_trace_ompt(device, enable, etype);
+}
+
+// Device-independent entry point for ompt_start_trace
+int libomptarget_ompt_start_trace(ompt_callback_buffer_request_t request,
+                                  ompt_callback_buffer_complete_t complete) {
+  std::unique_lock<std::mutex> lck(start_stop_flush_trace_mutex);
+  ompt_device_callbacks.set_buffer_request(request);
+  ompt_device_callbacks.set_buffer_complete(complete);
+  if (request && complete) {
+    ompt_device_callbacks.set_tracing_enabled(true);
+    ompt_trace_record_buffer_mgr.startHelperThreads();
+    return 1; // success
+  }
+  return 0; // failure
+}
+
+// Device-independent entry point for ompt_flush_trace
+int libomptarget_ompt_flush_trace(ompt_device_t *device) {
+  std::unique_lock<std::mutex> lck(start_stop_flush_trace_mutex);
+  return ompt_trace_record_buffer_mgr.flushAllBuffers(device);
+}
+
+// Device-independent entry point for ompt_stop_trace
+int libomptarget_ompt_stop_trace(ompt_device_t *device) {
+  std::unique_lock<std::mutex> lck(start_stop_flush_trace_mutex);
+  int status = ompt_trace_record_buffer_mgr.flushAllBuffers(device);
+  // TODO shutdown should perhaps return a status
+  ompt_trace_record_buffer_mgr.shutdownHelperThreads();
+  ompt_device_callbacks.set_tracing_enabled(false);
+  return status;
+}
+
+// Device-independent entry point for ompt_advance_buffer_cursor
+// Note: The input parameter size is unused here. It refers to the
+// bytes returned in the corresponding callback.
+int libomptarget_ompt_advance_buffer_cursor(ompt_device_t *device,
+                                            ompt_buffer_t *buffer, size_t size,
+                                            ompt_buffer_cursor_t current,
+                                            ompt_buffer_cursor_t *next) {
+  char *curr_rec = (char *)current;
+  // Don't assert if current is null, just indicate end of buffer
+  if (curr_rec == nullptr ||
+      ompt_trace_record_buffer_mgr.isLastCursor(curr_rec)) {
+    *next = 0;
+    return false;
+  }
+  // TODO In debug mode, assert that the metadata points to the
+  // input parameter buffer
+
+  size_t sz = sizeof(ompt_record_ompt_t);
+  *next = (ompt_buffer_cursor_t)(curr_rec + sz);
+  DP("Advanced buffer pointer by %lu bytes to %p\n", sz, curr_rec + sz);
+  return true;
+}
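Because every record delivered by this implementation has the fixed size sizeof(ompt_record_ompt_t), advancing a cursor is plain pointer arithmetic plus an end-of-buffer test, exactly as the entry point above shows. A self-contained sketch of the same contract (Rec and all names here are illustrative):

    #include <cstddef>

    struct Rec { unsigned long payload[8]; }; // stand-in for ompt_record_ompt_t

    // Mirrors the entry point: writes the next cursor and returns whether
    // it is valid; a null or final cursor terminates the iteration.
    bool advance(const char *curr, const char *last, const char **next) {
      if (curr == nullptr || curr == last) {
        *next = nullptr;
        return false;
      }
      *next = curr + sizeof(Rec); // fixed-size records: constant stride
      return true;
    }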
+
+// This function is invoked before the kernel launch, so by the time
+// the trace record is populated after kernel completion,
+// ompt_num_granted_teams has already been updated
+void libomptarget_ompt_set_granted_teams(uint32_t num_teams) {
+  ompt_num_granted_teams = num_teams;
+}
 }
diff --git a/openmp/libomptarget/src/omptarget.cpp b/openmp/libomptarget/src/omptarget.cpp
--- a/openmp/libomptarget/src/omptarget.cpp
+++ b/openmp/libomptarget/src/omptarget.cpp
@@ -1487,9 +1487,11 @@
   DP("Launching target execution %s with pointer " DPxMOD " (index=%d).\n",
      TargetTable->EntriesBegin[TM->Index].name, DPxPTR(TgtEntryPtr), TM->Index);
 
+  uint64_t start_time = 0;
   OMPT_IF_ENABLED(ompt_interface.ompt_state_set(OMPT_GET_FRAME_ADDRESS(0),
                                                 OMPT_GET_RETURN_ADDRESS(0));
-                  ompt_interface.target_submit_begin(TeamNum););
+                  ompt_interface.target_submit_begin(TeamNum);
+                  start_time = ompt_interface.get_ns_duration_since_epoch(););
 
   {
     TIMESCOPE_WITH_NAME_AND_IDENT(
@@ -1503,14 +1505,16 @@
                                           TgtArgs.size(), AsyncInfo);
   }
 
+  OMPT_IF_ENABLED(
+      ompt_interface.target_submit_trace_record_gen(start_time, TeamNum);
+      ompt_interface.target_submit_end(TeamNum);
+      ompt_interface.ompt_state_clear(););
+
   if (Ret != OFFLOAD_SUCCESS) {
     REPORT("Executing target region abort target.\n");
     return OFFLOAD_FAIL;
   }
 
-  OMPT_IF_ENABLED(ompt_interface.target_submit_end(TeamNum);
-                  ompt_interface.ompt_state_clear(););
-
   if (ArgNum) {
     // Transfer data back and deallocate target memory for (first-)private
     // variables
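The hunk above and libomptarget_ompt_set_granted_teams together rely on a thread-local handoff: the plugin publishes the granted team count at launch on the same host thread that later generates the submit record after kernel completion. A self-contained mock of that handoff (all names are illustrative, not the patch's API):

    #include <cstdint>
    #include <cstdio>

    static thread_local uint32_t granted_teams = 0; // mock of ompt_num_granted_teams

    // Plugin side: called at kernel launch with the actual team count.
    void set_granted_teams(uint32_t n) { granted_teams = n; }

    // Runtime side: called after kernel completion on the same host thread.
    void gen_submit_record(uint32_t requested) {
      std::printf("requested=%u granted=%u\n", requested, granted_teams);
    }

    int main() {
      set_granted_teams(120); // e.g., the device grants fewer teams
      gen_submit_record(256); // the record sees the granted value
      return 0;
    }

Because the variable is thread-local, concurrent launches from other host threads cannot clobber the value observed by this thread's record generation.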
diff --git a/openmp/libomptarget/test/ompt/callbacks.h b/openmp/libomptarget/test/ompt/callbacks.h
--- a/openmp/libomptarget/test/ompt/callbacks.h
+++ b/openmp/libomptarget/test/ompt/callbacks.h
@@ -1,3 +1,4 @@
+#include
 #include
 
 // Tool related code below
@@ -6,7 +7,133 @@
 // For EMI callbacks
 ompt_id_t next_op_id = 0x8000000000000001;
 
-// OMPT callbacks
+#define OMPT_BUFFER_REQUEST_SIZE 256
+
+// OMPT entry point handles
+static ompt_set_trace_ompt_t ompt_set_trace_ompt = 0;
+static ompt_start_trace_t ompt_start_trace = 0;
+static ompt_flush_trace_t ompt_flush_trace = 0;
+static ompt_stop_trace_t ompt_stop_trace = 0;
+static ompt_get_record_ompt_t ompt_get_record_ompt = 0;
+static ompt_advance_buffer_cursor_t ompt_advance_buffer_cursor = 0;
+
+// OMPT trace record utilities
+static void print_record_ompt(ompt_record_ompt_t *rec) {
+  if (rec == NULL) return;
+
+  printf("rec=%p type=%d time=%lu thread_id=%lu target_id=%lu\n",
+         rec, rec->type, rec->time, rec->thread_id, rec->target_id);
+
+  switch (rec->type) {
+  case ompt_callback_target:
+  case ompt_callback_target_emi:
+  {
+    ompt_record_target_t target_rec = rec->record.target;
+    printf("\tRecord Target: kind=%d endpoint=%d device=%d task_id=%lu target_id=%lu codeptr=%p\n",
+           target_rec.kind, target_rec.endpoint, target_rec.device_num,
+           target_rec.task_id, target_rec.target_id, target_rec.codeptr_ra);
+    break;
+  }
+  case ompt_callback_target_data_op:
+  case ompt_callback_target_data_op_emi:
+  {
+    ompt_record_target_data_op_t target_data_op_rec = rec->record.target_data_op;
+    printf("\t Record DataOp: host_op_id=%lu optype=%d src_addr=%p src_device=%d "
+           "dest_addr=%p dest_device=%d bytes=%lu end_time=%lu duration=%lu ns codeptr=%p\n",
+           target_data_op_rec.host_op_id, target_data_op_rec.optype,
+           target_data_op_rec.src_addr, target_data_op_rec.src_device_num,
+           target_data_op_rec.dest_addr, target_data_op_rec.dest_device_num,
+           target_data_op_rec.bytes, target_data_op_rec.end_time,
+           target_data_op_rec.end_time - rec->time,
+           target_data_op_rec.codeptr_ra);
+    break;
+  }
+  case ompt_callback_target_submit:
+  case ompt_callback_target_submit_emi:
+  {
+    ompt_record_target_kernel_t target_kernel_rec = rec->record.target_kernel;
+    printf("\t Record Submit: host_op_id=%lu requested_num_teams=%u granted_num_teams=%u "
+           "end_time=%lu duration=%lu ns\n",
+           target_kernel_rec.host_op_id, target_kernel_rec.requested_num_teams,
+           target_kernel_rec.granted_num_teams, target_kernel_rec.end_time,
+           target_kernel_rec.end_time - rec->time);
+    break;
+  }
+  default:
+    assert(0);
+    break;
+  }
+}
+
+static void delete_buffer_ompt(ompt_buffer_t *buffer) {
+  free(buffer);
+  printf("Deallocated %p\n", buffer);
+}
+
+// OMPT trace record callbacks
+static void on_ompt_callback_buffer_request (
+  int device_num,
+  ompt_buffer_t **buffer,
+  size_t *bytes
+) {
+  *bytes = OMPT_BUFFER_REQUEST_SIZE;
+  *buffer = malloc(*bytes);
+  printf("Allocated %lu bytes at %p in buffer request callback\n", *bytes, *buffer);
+}
+
+// Note: This callback must handle a null begin cursor. Currently,
+// ompt_get_record_ompt, print_record_ompt, and
+// ompt_advance_buffer_cursor handle a null cursor.
+static void on_ompt_callback_buffer_complete (
+  int device_num,
+  ompt_buffer_t *buffer,
+  size_t bytes, /* bytes returned in this callback */
+  ompt_buffer_cursor_t begin,
+  int buffer_owned
+) {
+  printf("Executing buffer complete callback: %d %p %lu %p %d\n",
+         device_num, buffer, bytes, (void*)begin, buffer_owned);
+
+  int status = 1;
+  ompt_buffer_cursor_t current = begin;
+  while (status) {
+    ompt_record_ompt_t *rec = ompt_get_record_ompt(buffer, current);
+    print_record_ompt(rec);
+    status = ompt_advance_buffer_cursor(NULL, /* TODO device */
+                                        buffer,
+                                        bytes,
+                                        current,
+                                        &current);
+  }
+  if (buffer_owned) delete_buffer_ompt(buffer);
+}
+
+// OMPT tracing control utilities
+static ompt_set_result_t set_trace_ompt() {
+  if (!ompt_set_trace_ompt) return ompt_set_error;
+
+  ompt_set_trace_ompt(0, 1, ompt_callback_target);
+  ompt_set_trace_ompt(0, 1, ompt_callback_target_data_op);
+  ompt_set_trace_ompt(0, 1, ompt_callback_target_submit);
+
+  return ompt_set_always;
+}
+
+static int start_trace() {
+  if (!ompt_start_trace) return 0;
+  return ompt_start_trace(0, &on_ompt_callback_buffer_request,
+                          &on_ompt_callback_buffer_complete);
+}
+
+static int flush_trace() {
+  if (!ompt_flush_trace) return 0;
+  return ompt_flush_trace(0);
+}
+
+static int stop_trace() {
+  if (!ompt_stop_trace) return 0;
+  return ompt_stop_trace(0);
+}
 
 // Synchronous callbacks
 static void on_ompt_callback_device_initialize(int device_num, const char *type,
@@ -15,6 +142,28 @@
                                                const char *documentation) {
   printf("Callback Init: device_num=%d type=%s device=%p lookup=%p doc=%p\n",
          device_num, type, device, lookup, documentation);
+
+  if (!lookup) {
+    printf("Trace collection disabled on device %d\n", device_num);
+    return;
+  }
+
+  ompt_set_trace_ompt = (ompt_set_trace_ompt_t) lookup("ompt_set_trace_ompt");
+  ompt_start_trace = (ompt_start_trace_t) lookup("ompt_start_trace");
+  ompt_flush_trace = (ompt_flush_trace_t) lookup("ompt_flush_trace");
+  ompt_stop_trace = (ompt_stop_trace_t) lookup("ompt_stop_trace");
+  ompt_get_record_ompt = (ompt_get_record_ompt_t) lookup("ompt_get_record_ompt");
+  ompt_advance_buffer_cursor = (ompt_advance_buffer_cursor_t) lookup("ompt_advance_buffer_cursor");
+
+  set_trace_ompt();
+
+  // In many scenarios, this is a good place to start the trace. If
+  // start_trace is called from the main program before this callback
+  // is dispatched, the ompt_start_trace handle will still be null,
+  // because this device_init callback is invoked only during the
+  // implementation of the first target construct.
+  start_trace();
 }
 
 static void on_ompt_callback_device_finalize(int device_num) {
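Taken together, these helpers let the test program itself choose the traced window. A minimal driver in the spirit of the veccopy tests below, assuming callbacks.h above is included and the tool has been registered (the exact test bodies differ):

    #include <stdio.h>

    #define N 1000

    int main() {
      int a[N], b[N];
      for (int i = 0; i < N; i++) { a[i] = 0; b[i] = i; }

      start_trace(); // begin generating trace records

    #pragma omp target parallel for map(tofrom: a) map(to: b)
      for (int j = 0; j < N; j++)
        a[j] = b[j];

      flush_trace(); // dispatch buffer-completion callbacks for ready records
      stop_trace();  // final flush, then shut down the helper threads

      int rc = 0;
      for (int i = 0; i < N; i++)
        if (a[i] != b[i]) rc++;
      printf(rc ? "Fail\n" : "Success\n");
      return rc;
    }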
diff --git a/openmp/libomptarget/test/ompt/veccopy.c b/openmp/libomptarget/test/ompt/veccopy.c
--- a/openmp/libomptarget/test/ompt/veccopy.c
+++ b/openmp/libomptarget/test/ompt/veccopy.c
@@ -5,7 +5,8 @@
 // UNSUPPORTED: x86_64-pc-linux-gnu
 
 /*
- * Example OpenMP program that registers non-EMI callbacks
+ * Example OpenMP program that registers non-EMI callbacks and uses
+ * start/stop/flush APIs to control the regions to trace
 */
 #include

[The remaining hunks are unrecoverable in this copy of the patch: the
veccopy.c main-loop hunk (@@ -29,18 +30,24 @@) and a companion test that
drops an include and uses "callbacks.h" (@@ -30,18 +29,26 @@). Consistent
with the updated file comment, they bracket the target regions with
start_trace(), flush_trace(), and stop_trace() calls.]