diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h
--- a/openmp/runtime/src/kmp.h
+++ b/openmp/runtime/src/kmp.h
@@ -2481,6 +2481,62 @@
   } ed;
 } kmp_event_t;
 
+// Maximum number of TDGs
+#define NUM_TDG_LIMIT 100
+// Initial number of allocated nodes while recording
+#define INIT_MAPSIZE 50
+
+typedef struct kmp_taskgraph_flags { /* This needs to be exactly 32 bits */
+  unsigned nowait : 1;
+  unsigned re_record : 1;
+  unsigned reserved : 30;
+} kmp_taskgraph_flags_t;
+
+/// Represents a TDG node
+typedef struct kmp_node_info {
+  kmp_task_t *task; // Pointer to the actual task
+  kmp_int32 *successors; // Array of the successors ids
+  kmp_int32 nsuccessors; // Number of successors of the node
+  std::atomic<kmp_int32>
+      npredecessors_counter; // Number of predecessors on the fly
+  kmp_int32 npredecessors; // Total number of predecessors
+  kmp_int32 successors_size; // Number of allocated successors ids
+  kmp_taskdata_t *parent_task; // Parent implicit task
+} kmp_node_info_t;
+
+/// Represents a TDG's current status
+typedef enum kmp_tdg_status {
+  KMP_TDG_NONE = 0,
+  KMP_TDG_RECORDING = 1,
+  KMP_TDG_READY = 2
+} kmp_tdg_status_t;
+
+/// Structure that contains a TDG
+typedef struct kmp_tdg_info {
+  kmp_int32 tdg_id; // Unique identifier of the TDG
+  kmp_taskgraph_flags_t tdg_flags; // Flags related to a TDG
+  kmp_int32 map_size; // Number of allocated TDG nodes
+  kmp_int32 num_roots; // Number of root tasks in the TDG
+  kmp_int32 *root_tasks; // Array of identifiers of the root tasks
+  kmp_node_info_t *record_map; // Array of TDG nodes
+  kmp_tdg_status_t tdg_status =
+      KMP_TDG_NONE; // Status of the TDG (recording, ready...)
+  std::atomic<kmp_int32> num_tasks; // Number of TDG nodes
+  kmp_bootstrap_lock_t
+      graph_lock; // Protect graph attributes when updated via taskloop_recur
+  // Taskloop reduction related
+  void *rec_taskred_data; // Data to pass to __kmpc_task_reduction_init or
+                          // __kmpc_taskred_init
+  kmp_int32 rec_num_taskred;
+} kmp_tdg_info_t;
+
+extern kmp_tdg_info_t *__kmp_global_tdgs[NUM_TDG_LIMIT];
+extern kmp_int32 __kmp_curr_tdg_idx;
+extern kmp_int32 __kmp_successors_size;
+extern std::atomic<kmp_int32> __kmp_tdg_task_id;
+extern kmp_int32 __kmp_max_nesting;
+extern kmp_int32 __kmp_num_tdg;
+
 #ifdef BUILD_TIED_TASK_STACK
 
 /* Tied Task stack definitions */
@@ -2528,7 +2584,8 @@
   unsigned complete : 1; /* 1==complete, 0==not complete */
   unsigned freed : 1; /* 1==freed, 0==allocated */
   unsigned native : 1; /* 1==gcc-compiled task, 0==intel */
-  unsigned reserved31 : 7; /* reserved for library use */
+  unsigned onced : 1; /* 1==ran once already, 0==never ran; record & replay purposes */
+  unsigned reserved31 : 6; /* reserved for library use */
 
 } kmp_tasking_flags_t;
 
@@ -2578,6 +2635,8 @@
 #if OMPT_SUPPORT
   ompt_task_info_t ompt_task_info;
 #endif
+  bool is_taskgraph = 0; // whether the task is within a TDG
+  kmp_tdg_info_t *tdg; // used to associate a task with a TDG
   kmp_target_data_t td_target_data;
 }; // struct kmp_taskdata
 
@@ -4118,6 +4177,18 @@
                                                void **user_lock,
                                                uintptr_t hint);
 
+// Taskgraph's Record & Replay mechanism
+// __kmp_tdg_is_recording: check whether a given TDG is recording
+// status: the TDG's current status
+static inline bool __kmp_tdg_is_recording(kmp_tdg_status_t status) {
  return status == KMP_TDG_RECORDING;
+}
+
+KMP_EXPORT kmp_int32 __kmpc_start_record_task(ident_t *loc, kmp_int32 gtid,
+                                              kmp_int32 input_flags,
+                                              kmp_int32 tdg_id);
+KMP_EXPORT void __kmpc_end_record_task(ident_t *loc, kmp_int32 gtid,
+                                       kmp_int32 input_flags, kmp_int32 tdg_id);
 
 /* Interface to fast scalable reduce methods routines */
 
 KMP_EXPORT kmp_int32 __kmpc_reduce_nowait(
diff --git a/openmp/runtime/src/kmp_global.cpp b/openmp/runtime/src/kmp_global.cpp
--- a/openmp/runtime/src/kmp_global.cpp
+++ b/openmp/runtime/src/kmp_global.cpp
@@ -557,4 +557,13 @@
 int __kmp_nesting_mode_nlevels = 1;
 int *__kmp_nesting_nth_level;
 
-// end of file //
+// TDG record & replay
+kmp_tdg_info_t *__kmp_global_tdgs[NUM_TDG_LIMIT];
+kmp_int32
+    __kmp_curr_tdg_idx; // Id of the current TDG being recorded or executed
+kmp_int32 __kmp_num_tdg = 0;
+kmp_int32 __kmp_successors_size = 10; // Initial size of the successors list
+                                      // used while recording
+kmp_int32 __kmp_max_nesting = 4; // Nesting level when erasing edges
+std::atomic<kmp_int32> __kmp_tdg_task_id = 0;
+// end of file //
\ No newline at end of file
diff --git a/openmp/runtime/src/kmp_taskdeps.h b/openmp/runtime/src/kmp_taskdeps.h
--- a/openmp/runtime/src/kmp_taskdeps.h
+++ b/openmp/runtime/src/kmp_taskdeps.h
@@ -92,6 +92,21 @@
 extern void __kmpc_give_task(kmp_task_t *ptask, kmp_int32 start);
 
 static inline void __kmp_release_deps(kmp_int32 gtid, kmp_taskdata_t *task) {
+
+  if (task->is_taskgraph && !(__kmp_tdg_is_recording(task->tdg->tdg_status))) {
+    kmp_node_info_t *TaskInfo = &(task->tdg->record_map[task->td_task_id]);
+
+    for (int i = 0; i < TaskInfo->nsuccessors; i++) {
+      kmp_int32 successorNumber = TaskInfo->successors[i];
+      kmp_node_info_t *successor = &(task->tdg->record_map[successorNumber]);
+      kmp_int32 npredecessors = KMP_ATOMIC_DEC(&successor->npredecessors_counter) - 1;
+      if (successor->task != nullptr && npredecessors == 0) {
+        __kmp_omp_task(gtid, successor->task, false);
+      }
+    }
+    return;
+  }
+
   kmp_info_t *thread = __kmp_threads[gtid];
   kmp_depnode_t *node = task->td_depnode;
@@ -120,8 +135,10 @@
              gtid, task));
 
   KMP_ACQUIRE_DEPNODE(gtid, node);
-  node->dn.task =
-      NULL; // mark this task as finished, so no new dependencies are generated
+  if (!task->is_taskgraph ||
+      (task->is_taskgraph && !__kmp_tdg_is_recording(task->tdg->tdg_status)))
+    node->dn.task =
+        NULL; // mark this task as finished, so no new dependencies are generated
   KMP_RELEASE_DEPNODE(gtid, node);
 
   kmp_depnode_list_t *next;
diff --git a/openmp/runtime/src/kmp_taskdeps.cpp b/openmp/runtime/src/kmp_taskdeps.cpp
--- a/openmp/runtime/src/kmp_taskdeps.cpp
+++ b/openmp/runtime/src/kmp_taskdeps.cpp
@@ -218,6 +218,44 @@
 static inline void __kmp_track_dependence(kmp_int32 gtid, kmp_depnode_t *source,
                                           kmp_depnode_t *sink,
                                           kmp_task_t *sink_task) {
+  kmp_taskdata_t *task_source = KMP_TASK_TO_TASKDATA(source->dn.task);
+  kmp_taskdata_t *task_sink = KMP_TASK_TO_TASKDATA(sink_task);
+  if (source->dn.task && sink_task) {
+    // Not supporting a dependency between two tasks where one is within the
+    // TDG and the other is not
+    KMP_ASSERT(task_source->is_taskgraph == task_sink->is_taskgraph);
+  }
+  if (task_sink->is_taskgraph &&
+      __kmp_tdg_is_recording(task_sink->tdg->tdg_status)) {
+    kmp_node_info_t *source_info =
+        &task_sink->tdg->record_map[task_source->td_task_id];
+    bool exists = false;
+    for (int i = 0; i < source_info->nsuccessors; i++) {
+      if (source_info->successors[i] == task_sink->td_task_id) {
+        exists = true;
+        break;
+      }
+    }
+    if (!exists) {
+      if (source_info->nsuccessors >= source_info->successors_size) {
+        source_info->successors_size = 2 * source_info->successors_size;
+        kmp_int32 *old_succ_ids = source_info->successors;
+        kmp_int32 *new_succ_ids = (kmp_int32 *)__kmp_allocate(
+            source_info->successors_size * sizeof(kmp_int32));
+        // keep the successors recorded so far when growing the array
+        KMP_MEMCPY(new_succ_ids, old_succ_ids,
+                   source_info->nsuccessors * sizeof(kmp_int32));
+        source_info->successors = new_succ_ids;
+        __kmp_free(old_succ_ids);
+      }
+
+      source_info->successors[source_info->nsuccessors] = task_sink->td_task_id;
+      source_info->nsuccessors++;
+
+      kmp_node_info_t *sink_info =
+          &(task_sink->tdg->record_map[task_sink->td_task_id]);
+      sink_info->npredecessors++;
+    }
+  }
 #ifdef KMP_SUPPORT_GRAPH_OUTPUT
   kmp_taskdata_t *task_source = KMP_TASK_TO_TASKDATA(source->dn.task);
   // do not use sink->dn.task as that is only filled after the dependences
@@ -256,10 +292,19 @@
   // link node as successor of list elements
   for (kmp_depnode_list_t *p = plist; p; p = p->next) {
     kmp_depnode_t *dep = p->node;
+    kmp_tdg_status tdg_status = KMP_TDG_NONE;
+    if (task) {
+      kmp_taskdata_t *td = KMP_TASK_TO_TASKDATA(task);
+      if (td->is_taskgraph)
+        tdg_status = KMP_TASK_TO_TASKDATA(task)->tdg->tdg_status;
+      if (__kmp_tdg_is_recording(tdg_status))
+        __kmp_track_dependence(gtid, dep, node, task);
+    }
     if (dep->dn.task) {
       KMP_ACQUIRE_DEPNODE(gtid, dep);
       if (dep->dn.task) {
-        __kmp_track_dependence(gtid, dep, node, task);
+        if (!(__kmp_tdg_is_recording(tdg_status)) && task)
+          __kmp_track_dependence(gtid, dep, node, task);
         dep->dn.successors = __kmp_add_node(thread, dep->dn.successors, node);
         KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to "
                       "%p\n",
@@ -281,16 +326,36 @@
   if (!sink)
     return 0;
   kmp_int32 npredecessors = 0;
+  kmp_tdg_status tdg_status = KMP_TDG_NONE;
+  kmp_taskdata_t *td = KMP_TASK_TO_TASKDATA(task);
+  if (task) {
+    if (td->is_taskgraph)
+      tdg_status = KMP_TASK_TO_TASKDATA(task)->tdg->tdg_status;
+    if (__kmp_tdg_is_recording(tdg_status) && sink->dn.task)
+      __kmp_track_dependence(gtid, sink, source, task);
+  }
   if (sink->dn.task) {
     // synchronously add source to sink' list of successors
     KMP_ACQUIRE_DEPNODE(gtid, sink);
     if (sink->dn.task) {
-      __kmp_track_dependence(gtid, sink, source, task);
+      if (!(__kmp_tdg_is_recording(tdg_status)) && task)
+        __kmp_track_dependence(gtid, sink, source, task);
       sink->dn.successors = __kmp_add_node(thread, sink->dn.successors, source);
       KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to "
                     "%p\n",
                     gtid, KMP_TASK_TO_TASKDATA(sink->dn.task),
                     KMP_TASK_TO_TASKDATA(task)));
+      if (__kmp_tdg_is_recording(tdg_status)) {
+        kmp_taskdata_t *tdd = KMP_TASK_TO_TASKDATA(sink->dn.task);
+        if (tdd->is_taskgraph) {
+          if (tdd->td_flags.onced)
+            // decrement npredecessors if sink->dn.task belongs to a taskgraph
+            // and
+            // 1) the task is reset to its initial state (by kmp_free_task) or
+            // 2) the task is complete but not yet reset
+            npredecessors--;
+        }
+      }
       npredecessors++;
     }
     KMP_RELEASE_DEPNODE(gtid, sink);
@@ -595,6 +660,46 @@
   kmp_info_t *thread = __kmp_threads[gtid];
   kmp_taskdata_t *current_task = thread->th.th_current_task;
 
+  // record a TDG with deps
+  if (new_taskdata->is_taskgraph &&
+      __kmp_tdg_is_recording(new_taskdata->tdg->tdg_status)) {
+    kmp_tdg_info_t *tdg = new_taskdata->tdg;
+    // extend record_map if needed
+    if (new_taskdata->td_task_id >= tdg->map_size) {
+      __kmp_acquire_bootstrap_lock(&tdg->graph_lock);
+      if (new_taskdata->td_task_id >= tdg->map_size) {
+        kmp_uint old_size = tdg->map_size;
+        kmp_uint new_size = old_size * 2;
+        kmp_node_info_t *old_record = tdg->record_map;
+        kmp_node_info_t *new_record = (kmp_node_info_t *)__kmp_allocate(
+            new_size * sizeof(kmp_node_info_t));
+        KMP_MEMCPY(new_record, tdg->record_map,
+                   old_size * sizeof(kmp_node_info_t));
+        tdg->record_map = new_record;
+
+        __kmp_free(old_record);
+
+        for (kmp_int i = old_size; i < new_size; i++) {
+          kmp_int32 *successorsList = (kmp_int32 *)__kmp_allocate(
+              __kmp_successors_size * sizeof(kmp_int32));
+          new_record[i].task = nullptr;
+          new_record[i].successors = successorsList;
+          new_record[i].nsuccessors = 0;
+          new_record[i].npredecessors = 0;
+          new_record[i].successors_size = __kmp_successors_size;
+          KMP_ATOMIC_ST_REL(&new_record[i].npredecessors_counter, 0);
+        }
+        // update the size at the end, so that we avoid other
+        // threads using old_record while map_size is already updated
+        tdg->map_size = new_size;
+      }
+      __kmp_release_bootstrap_lock(&tdg->graph_lock);
+    }
+    tdg->record_map[new_taskdata->td_task_id].task = new_task;
+    tdg->record_map[new_taskdata->td_task_id].parent_task =
+        new_taskdata->td_parent;
+    KMP_ATOMIC_INC(&tdg->num_tasks);
+  }
+
 #if OMPT_SUPPORT
   if (ompt_enabled.enabled) {
     if (!current_task->ompt_task_info.frame.enter_frame.ptr)
diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp
--- a/openmp/runtime/src/kmp_tasking.cpp
+++ b/openmp/runtime/src/kmp_tasking.cpp
@@ -16,7 +16,7 @@
 #include "kmp_stats.h"
 #include "kmp_wait_release.h"
 #include "kmp_taskdeps.h"
-
+#include
 #if OMPT_SUPPORT
 #include "ompt-specific.h"
 #endif
@@ -34,6 +34,7 @@
 static int __kmp_realloc_task_threads_data(kmp_info_t *thread,
                                            kmp_task_team_t *task_team);
 static void __kmp_bottom_half_finish_proxy(kmp_int32 gtid, kmp_task_t *ptask);
+int __kmp_taskloop_task(int gtid, void *ptask);
 
 #ifdef BUILD_TIED_TASK_STACK
@@ -278,7 +279,7 @@
   }
   // Check mutexinoutset dependencies, acquire locks
   kmp_depnode_t *node = tasknew->td_depnode;
-  if (UNLIKELY(node && (node->dn.mtx_num_locks > 0))) {
+  if (!tasknew->is_taskgraph && UNLIKELY(node && (node->dn.mtx_num_locks > 0))) {
     for (int i = 0; i < node->dn.mtx_num_locks; ++i) {
       KMP_DEBUG_ASSERT(node->dn.mtx_locks[i] != NULL);
       if (__kmp_test_lock(node->dn.mtx_locks[i], gtid))
@@ -885,12 +886,30 @@
   task->data2.priority = 0;
 
   taskdata->td_flags.freed = 1;
+  // do not free tasks in a taskgraph
+  if (!taskdata->is_taskgraph) {
 // deallocate the taskdata and shared variable blocks associated with this task
 #if USE_FAST_MEMORY
   __kmp_fast_free(thread, taskdata);
 #else /* ! USE_FAST_MEMORY */
   __kmp_thread_free(thread, taskdata);
 #endif
+  } else {
+    taskdata->td_flags.complete = 0;
+    taskdata->td_flags.started = 0;
+    taskdata->td_flags.freed = 0;
+    taskdata->td_flags.executing = 0;
+    taskdata->td_flags.task_serial =
+        (taskdata->td_parent->td_flags.final ||
+         taskdata->td_flags.team_serial || taskdata->td_flags.tasking_ser);
+
+    // taskdata->td_allow_completion_event.pending_events_count = 1;
+    KMP_ATOMIC_ST_RLX(&taskdata->td_untied_count, 0);
+    KMP_ATOMIC_ST_RLX(&taskdata->td_incomplete_child_tasks, 0);
+    // start at one because counts current task and children
+    KMP_ATOMIC_ST_RLX(&taskdata->td_allocated_child_tasks, 1);
+  }
 
   KA_TRACE(20, ("__kmp_free_task: T#%d freed task %p\n", gtid, taskdata));
 }
@@ -977,6 +996,8 @@
       flags.detachable == TASK_DETACHABLE || flags.hidden_helper;
   ret = ret ||
         KMP_ATOMIC_LD_ACQ(&taskdata->td_parent->td_incomplete_child_tasks) > 0;
+  if (taskdata->td_taskgroup && taskdata->is_taskgraph)
+    ret = ret || KMP_ATOMIC_LD_ACQ(&taskdata->td_taskgroup->count) > 0;
   return ret;
 }
@@ -996,6 +1017,8 @@
   kmp_info_t *thread = __kmp_threads[gtid];
   kmp_task_team_t *task_team =
      thread->th.th_task_team; // might be NULL for serial teams...
+  // to avoid a segmentation fault when we need to access taskdata->td_flags
+  // after free when using vanilla taskloop
+  kmp_int32 is_taskgraph;
 #if KMP_DEBUG
   kmp_int32 children = 0;
 #endif
@@ -1005,6 +1028,11 @@
   KMP_DEBUG_ASSERT(taskdata->td_flags.tasktype == TASK_EXPLICIT);
 
+  if (!taskdata->is_taskgraph)
+    is_taskgraph = 0;
+  else
+    is_taskgraph = 1;
+
   // Pop task from stack if tied
 #ifdef BUILD_TIED_TASK_STACK
   if (taskdata->td_flags.tiedness == TASK_TIED) {
@@ -1111,6 +1139,7 @@
   if (completed) {
     taskdata->td_flags.complete = 1; // mark the task as completed
+    taskdata->td_flags.onced = 1; // mark the task as having run once already
 
 #if OMPT_SUPPORT
     // This is not a detached task, we are done here
@@ -1127,7 +1156,7 @@
 #endif
     KMP_ATOMIC_DEC(&taskdata->td_parent->td_incomplete_child_tasks);
     KMP_DEBUG_ASSERT(children >= 0);
-    if (taskdata->td_taskgroup)
+    if (taskdata->td_taskgroup && !taskdata->is_taskgraph)
       KMP_ATOMIC_DEC(&taskdata->td_taskgroup->count);
   } else if (task_team && (task_team->tt.tt_found_proxy_tasks ||
                            task_team->tt.tt_hidden_helper_task_encountered)) {
@@ -1166,6 +1195,20 @@
   // KMP_DEBUG_ASSERT( resumed_task->td_flags.executing == 0 );
   resumed_task->td_flags.executing = 1; // resume previous task
 
+  if (is_taskgraph) {
+    if (__kmp_track_children_task(taskdata)) {
+      if (taskdata->td_taskgroup) {
+        // TDG: we only release the taskgroup barrier here because
+        // free_task_and_ancestors will call __kmp_free_task, which resets all
+        // task parameters such as taskdata->started, etc. If we released the
+        // barrier earlier, these parameters could be read before being reset.
+        // This is not an issue for the non-TDG implementation because we never
+        // reuse a task(data) structure.
+        KMP_ATOMIC_DEC(&taskdata->td_taskgroup->count);
+      }
+    }
+  }
+
   KA_TRACE(
       10, ("__kmp_task_finish(exit): T#%d finished task %p, resuming task %p\n",
            gtid, taskdata, resumed_task));
@@ -1282,6 +1325,7 @@
   task->td_flags.executing = 1;
   task->td_flags.complete = 0;
   task->td_flags.freed = 0;
+  task->td_flags.onced = 0;
 
   task->td_depnode = NULL;
   task->td_last_tied = task;
@@ -1318,6 +1362,7 @@
   if (task->td_dephash) {
     int children;
     task->td_flags.complete = 1;
+    task->td_flags.onced = 1;
     children = KMP_ATOMIC_LD_ACQ(&task->td_incomplete_child_tasks);
     kmp_tasking_flags_t flags_old = task->td_flags;
     if (children == 0 && flags_old.complete == 1) {
@@ -1547,7 +1592,7 @@
   taskdata->td_flags.executing = 0;
   taskdata->td_flags.complete = 0;
   taskdata->td_flags.freed = 0;
-
+  taskdata->td_flags.onced = 0;
   KMP_ATOMIC_ST_RLX(&taskdata->td_incomplete_child_tasks, 0);
   // start at one because counts current task and children
   KMP_ATOMIC_ST_RLX(&taskdata->td_allocated_child_tasks, 1);
@@ -1583,6 +1628,13 @@
     }
   }
 
+  if (__kmp_tdg_is_recording(
+          __kmp_global_tdgs[__kmp_curr_tdg_idx]->tdg_status) &&
+      (task_entry != (kmp_routine_entry_t)__kmp_taskloop_task)) {
+    taskdata->is_taskgraph = 1;
+    taskdata->tdg = __kmp_global_tdgs[__kmp_curr_tdg_idx];
+    taskdata->td_task_id = KMP_ATOMIC_INC(&__kmp_tdg_task_id);
+  }
   KA_TRACE(20, ("__kmp_task_alloc(exit): T#%d created task %p parent=%p\n",
                 gtid, taskdata, taskdata->td_parent));
 
@@ -1925,6 +1977,51 @@
                                   bool serialize_immediate) {
   kmp_taskdata_t *new_taskdata = KMP_TASK_TO_TASKDATA(new_task);
 
+  if (new_taskdata->is_taskgraph &&
+      __kmp_tdg_is_recording(new_taskdata->tdg->tdg_status)) {
+    kmp_tdg_info_t *tdg = new_taskdata->tdg;
+    // extend the record_map if needed
+    if (new_taskdata->td_task_id >= new_taskdata->tdg->map_size) {
+      __kmp_acquire_bootstrap_lock(&tdg->graph_lock);
+      // map_size could have been updated by another thread in the case of a
+      // recursive taskloop
+      if (new_taskdata->td_task_id >= tdg->map_size) {
+        kmp_uint old_size = tdg->map_size;
+        kmp_uint new_size = old_size * 2;
+        kmp_node_info_t *old_record = tdg->record_map;
+        kmp_node_info_t *new_record = (kmp_node_info_t *)__kmp_allocate(
+            new_size * sizeof(kmp_node_info_t));
+
+        KMP_MEMCPY(new_record, old_record, old_size * sizeof(kmp_node_info_t));
+        tdg->record_map = new_record;
+
+        __kmp_free(old_record);
+
+        for (kmp_int i = old_size; i < new_size; i++) {
+          kmp_int32 *successorsList = (kmp_int32 *)__kmp_allocate(
+              __kmp_successors_size * sizeof(kmp_int32));
+          new_record[i].task = nullptr;
+          new_record[i].successors = successorsList;
+          new_record[i].nsuccessors = 0;
+          new_record[i].npredecessors = 0;
+          new_record[i].successors_size = __kmp_successors_size;
+          KMP_ATOMIC_ST_REL(&new_record[i].npredecessors_counter, 0);
+        }
+        // update the size at the end, so that we avoid other
+        // threads using old_record while map_size is already updated
+        tdg->map_size = new_size;
+      }
+      __kmp_release_bootstrap_lock(&tdg->graph_lock);
+    }
+    // record a task
+    if (tdg->record_map[new_taskdata->td_task_id].task == nullptr) {
+      tdg->record_map[new_taskdata->td_task_id].task = new_task;
+      tdg->record_map[new_taskdata->td_task_id].parent_task =
+          new_taskdata->td_parent;
+      KMP_ATOMIC_INC(&tdg->num_tasks);
+    }
+  }
+
   /* Should we execute the new task or queue it? For now, let's just always try
      to queue it. If the queue fills up, then we'll execute it. */
   if (new_taskdata->td_flags.proxy == TASK_PROXY ||
@@ -2173,7 +2270,6 @@
       taskdata->ompt_task_info.frame.enter_frame = ompt_data_none;
   }
 #endif // OMPT_SUPPORT && OMPT_OPTIONAL
-
 }
 
   KA_TRACE(10, ("__kmpc_omp_taskwait(exit): T#%d task %p finished waiting, "
@@ -2441,6 +2537,15 @@
    without help of the runtime library. */
 void *__kmpc_task_reduction_init(int gtid, int num, void *data) {
+  if (__kmp_tdg_is_recording(
+          __kmp_global_tdgs[__kmp_curr_tdg_idx]->tdg_status)) {
+    kmp_tdg_info_t *this_tdg = __kmp_global_tdgs[__kmp_curr_tdg_idx];
+    this_tdg->rec_taskred_data =
+        __kmp_allocate(sizeof(kmp_task_red_input_t) * num);
+    this_tdg->rec_num_taskred = num;
+    KMP_MEMCPY(this_tdg->rec_taskred_data, data,
+               sizeof(kmp_task_red_input_t) * num);
+  }
   return __kmp_task_reduction_init(gtid, num, (kmp_task_red_input_t *)data);
 }
@@ -2457,6 +2562,15 @@
    has two parameters, pointer to object to be initialized and pointer to
    omp_orig */
 void *__kmpc_taskred_init(int gtid, int num, void *data) {
+  if (__kmp_tdg_is_recording(
+          __kmp_global_tdgs[__kmp_curr_tdg_idx]->tdg_status)) {
+    kmp_tdg_info_t *this_tdg = __kmp_global_tdgs[__kmp_curr_tdg_idx];
+    this_tdg->rec_taskred_data =
+        __kmp_allocate(sizeof(kmp_task_red_input_t) * num);
+    this_tdg->rec_num_taskred = num;
+    KMP_MEMCPY(this_tdg->rec_taskred_data, data,
+               sizeof(kmp_task_red_input_t) * num);
+  }
   return __kmp_task_reduction_init(gtid, num, (kmp_taskred_input_t *)data);
 }
@@ -2503,6 +2617,16 @@
   kmp_int32 num = tg->reduce_num_data;
   kmp_int32 tid = thread->th.th_info.ds.ds_tid;
 
+  if ((thread->th.th_current_task->is_taskgraph) &&
+      (!__kmp_tdg_is_recording(
+          __kmp_global_tdgs[__kmp_curr_tdg_idx]->tdg_status))) {
+    tg = thread->th.th_current_task->td_taskgroup;
+    KMP_ASSERT(tg != NULL);
+    KMP_ASSERT(tg->reduce_data != NULL);
+    arr = (kmp_taskred_data_t *)(tg->reduce_data);
+    num = tg->reduce_num_data;
+  }
+
   KMP_ASSERT(data != NULL);
   while (tg != NULL) {
     for (int i = 0; i < num; ++i) {
@@ -3024,7 +3148,6 @@
   thread_data->td.td_deque_tail = tail;
   TCW_4(thread_data->td.td_deque_ntasks, thread_data->td.td_deque_ntasks - 1);
-
   __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
 
   KA_TRACE(10, ("__kmp_remove_my_task(exit #4): T#%d task %p removed: "
@@ -4228,6 +4351,7 @@
   KMP_DEBUG_ASSERT(taskdata->td_flags.freed == 0);
   taskdata->td_flags.complete = 1; // mark the task as completed
+  taskdata->td_flags.onced = 1;
 
   if (taskdata->td_taskgroup)
     KMP_ATOMIC_DEC(&taskdata->td_taskgroup->count);
@@ -4426,8 +4550,11 @@
 //
 // thread: allocating thread
 // task_src: pointer to source task to be duplicated
+// taskloop_recur: used only when dealing with a taskgraph,
+//                 indicating whether we need to update task->td_task_id
 // returns: a pointer to the allocated kmp_task_t structure (task).
-kmp_task_t *__kmp_task_dup_alloc(kmp_info_t *thread, kmp_task_t *task_src) {
+kmp_task_t *__kmp_task_dup_alloc(kmp_info_t *thread, kmp_task_t *task_src,
+                                 int taskloop_recur) {
   kmp_task_t *task;
   kmp_taskdata_t *taskdata;
   kmp_taskdata_t *taskdata_src = KMP_TASK_TO_TASKDATA(task_src);
@@ -4455,7 +4582,11 @@
   task = KMP_TASKDATA_TO_TASK(taskdata);
 
   // Initialize new task (only specific fields not affected by memcpy)
-  taskdata->td_task_id = KMP_GEN_TASK_ID();
+  if (!taskdata->is_taskgraph || taskloop_recur)
+    taskdata->td_task_id = KMP_GEN_TASK_ID();
+  else if (taskdata->is_taskgraph &&
+           __kmp_tdg_is_recording(taskdata_src->tdg->tdg_status))
+    taskdata->td_task_id = KMP_ATOMIC_INC(&__kmp_tdg_task_id);
   if (task->shareds != NULL) { // need setup shareds pointer
     shareds_offset = (char *)task_src->shareds - (char *)taskdata_src;
     task->shareds = &((char *)taskdata)[shareds_offset];
@@ -4682,7 +4813,9 @@
       lastpriv = 1;
     }
   }
-  next_task = __kmp_task_dup_alloc(thread, task); // allocate new task
+  next_task =
+      __kmp_task_dup_alloc(thread, task,
+                           /* taskloop_recur */ 0); // allocate new task
   kmp_taskdata_t *next_taskdata = KMP_TASK_TO_TASKDATA(next_task);
   kmp_taskloop_bounds_t next_task_bounds =
       kmp_taskloop_bounds_t(next_task, task_bounds);
@@ -4879,7 +5012,9 @@
   lb1 = ub0 + st;
 
   // create pattern task for 2nd half of the loop
-  next_task = __kmp_task_dup_alloc(thread, task); // duplicate the task
+  next_task =
+      __kmp_task_dup_alloc(thread, task,
+                           /* taskloop_recur */ 1); // duplicate the task
   // adjust lower bound (upper bound is not changed) for the 2nd half
   *(kmp_uint64 *)((char *)next_task + lower_offset) = lb1;
   if (ptask_dup != NULL) // construct firstprivates, etc.
@@ -4912,6 +5047,10 @@
     p->codeptr_ra = codeptr_ra;
 #endif
 
+  kmp_taskdata_t *new_task_data = KMP_TASK_TO_TASKDATA(new_task);
+  new_task_data->tdg = taskdata->tdg;
+  new_task_data->is_taskgraph = 0;
+
 #if OMPT_SUPPORT
   // schedule new task with correct return address for OMPT events
   __kmp_omp_taskloop_task(NULL, gtid, new_task, codeptr_ra);
@@ -4951,6 +5090,7 @@
     __kmpc_taskgroup(loc, gtid);
   }
 
+  KMP_ATOMIC_DEC(&__kmp_tdg_task_id);
   // =========================================================================
   // calculate loop parameters
   kmp_taskloop_bounds_t task_bounds(task, lb, ub);
@@ -5198,3 +5338,201 @@
   return taskdata->td_task_team != NULL;
 }
+
+// __kmp_find_tdg: identify a TDG through its ID
+// tdg_id: ID of the TDG
+// returns: if a TDG corresponding to this ID is found and is not in its
+//          initial state, a pointer to it, otherwise nullptr
+kmp_tdg_info_t *__kmp_find_tdg(kmp_int32 tdg_id) {
+  kmp_tdg_info_t *res = nullptr;
+  if ((__kmp_global_tdgs[tdg_id]) &&
+      (__kmp_global_tdgs[tdg_id]->tdg_status != KMP_TDG_NONE))
+    res = __kmp_global_tdgs[tdg_id];
+  return res;
+}
+
+// __kmp_exec_tdg: launch the execution of a previously recorded TDG
+// gtid: Global Thread ID
+// tdg: Pointer to the TDG to execute
+void __kmp_exec_tdg(kmp_int32 gtid, kmp_tdg_info_t *tdg) {
+  KMP_DEBUG_ASSERT(tdg->tdg_status == KMP_TDG_READY);
+  KA_TRACE(10, ("__kmp_exec_tdg(enter): T#%d tdg_id=%d num_roots=%d\n", gtid,
+                tdg->tdg_id, tdg->num_roots));
+  kmp_node_info_t *this_record_map = tdg->record_map;
+  kmp_int32 *this_root_tasks = tdg->root_tasks;
+  kmp_int32 this_num_roots = tdg->num_roots;
+  kmp_int32 this_num_tasks = KMP_ATOMIC_LD_RLX(&tdg->num_tasks);
+
+  kmp_info_t *thread = __kmp_threads[gtid];
+  kmp_taskdata_t *parent_task = thread->th.th_current_task;
+
+  if (tdg->rec_taskred_data) {
+    __kmpc_taskred_init(gtid, tdg->rec_num_taskred, tdg->rec_taskred_data);
+  }
+
+  for (kmp_int32 j = 0; j < this_num_tasks; j++) {
+    kmp_taskdata_t *td = KMP_TASK_TO_TASKDATA(this_record_map[j].task);
+
+    td->td_parent = parent_task;
+    this_record_map[j].parent_task = parent_task;
+
+    kmp_taskgroup_t *parent_taskgroup =
+        this_record_map[j].parent_task->td_taskgroup;
+
+    KMP_ATOMIC_ST_RLX(&this_record_map[j].npredecessors_counter,
+                      this_record_map[j].npredecessors);
+    KMP_ATOMIC_INC(&this_record_map[j].parent_task->td_incomplete_child_tasks);
+
+    if (parent_taskgroup) {
+      KMP_ATOMIC_INC(&parent_taskgroup->count);
+      // The taskgroup is different, so we must update it
+      td->td_taskgroup = parent_taskgroup;
+    } else if (td->td_taskgroup != nullptr) {
+      // If the parent doesn't have a taskgroup, remove it from the task
+      td->td_taskgroup = nullptr;
+    }
+    if (this_record_map[j].parent_task->td_flags.tasktype == TASK_EXPLICIT)
+      KMP_ATOMIC_INC(&this_record_map[j].parent_task->td_allocated_child_tasks);
+  }
+
+  for (kmp_int32 j = 0; j < this_num_roots; ++j) {
+    __kmp_omp_task(gtid, this_record_map[this_root_tasks[j]].task, true);
+  }
+  KA_TRACE(10, ("__kmp_exec_tdg(exit): T#%d tdg_id=%d num_roots=%d\n", gtid,
+                tdg->tdg_id, tdg->num_roots));
+}
+
+// __kmp_start_record: set up a TDG structure and turn the
+// recording flag to true
+// gtid: Global Thread ID of the encountering thread
+// flags: Flags associated with the TDG
+// tdg_id: ID of the TDG to record
+static inline void __kmp_start_record(kmp_int32 gtid,
+                                      kmp_taskgraph_flags_t *flags,
+                                      kmp_int32 tdg_id) {
+  kmp_tdg_info_t *tdg =
+      (kmp_tdg_info_t *)__kmp_allocate(sizeof(kmp_tdg_info_t));
+  __kmp_global_tdgs[__kmp_curr_tdg_idx] = tdg;
+  // Initializing the TDG structure
+  tdg->tdg_id = tdg_id;
+  tdg->map_size = INIT_MAPSIZE;
+  tdg->num_roots = -1;
+  tdg->root_tasks = nullptr;
+  tdg->tdg_status = KMP_TDG_RECORDING;
+  tdg->rec_num_taskred = 0;
+  tdg->rec_taskred_data = nullptr;
+  KMP_ATOMIC_ST_RLX(&tdg->num_tasks, 0);
+
+  // Initializing the list of nodes in this TDG
+  kmp_node_info_t *this_record_map =
+      (kmp_node_info_t *)__kmp_allocate(INIT_MAPSIZE * sizeof(kmp_node_info_t));
+  for (kmp_int32 i = 0; i < INIT_MAPSIZE; i++) {
+    kmp_int32 *successorsList =
+        (kmp_int32 *)__kmp_allocate(__kmp_successors_size * sizeof(kmp_int32));
+    this_record_map[i].task = nullptr;
+    this_record_map[i].successors = successorsList;
+    this_record_map[i].nsuccessors = 0;
+    this_record_map[i].npredecessors = 0;
+    this_record_map[i].successors_size = __kmp_successors_size;
+    KMP_ATOMIC_ST_RLX(&this_record_map[i].npredecessors_counter, 0);
+  }
+
+  __kmp_global_tdgs[__kmp_curr_tdg_idx]->record_map = this_record_map;
+}
+
+// __kmpc_start_record_task: wrapper around __kmp_start_record to mark
+// the beginning of the record process of a task region
+// loc_ref: Location of the TDG, not used yet
+// gtid: Global Thread ID of the encountering thread
+// input_flags: Flags associated with the TDG
+// tdg_id: ID of the TDG to record; for now, an incremental integer
+// returns: 1 if we record, otherwise 0
+kmp_int32 __kmpc_start_record_task(ident_t *loc_ref, kmp_int32 gtid,
+                                   kmp_int32 input_flags, kmp_int32 tdg_id) {
+
+  kmp_int32 res;
+  kmp_taskgraph_flags_t *flags = (kmp_taskgraph_flags_t *)&input_flags;
+  KA_TRACE(10,
+           ("__kmpc_start_record_task(enter): T#%d loc=%p flags=%d tdg_id=%d\n",
+            gtid, loc_ref, input_flags, tdg_id));
+
+  __kmpc_taskgroup(loc_ref, gtid);
+  if (kmp_tdg_info_t *tdg = __kmp_find_tdg(tdg_id)) {
+    // TODO: use re_record flag
+    __kmp_exec_tdg(gtid, tdg);
+    res = 0;
+  } else {
+    __kmp_curr_tdg_idx = __kmp_num_tdg;
+    KMP_DEBUG_ASSERT(__kmp_curr_tdg_idx < NUM_TDG_LIMIT);
+    __kmp_start_record(gtid, flags, tdg_id);
+    __kmp_num_tdg++;
+    res = 1;
+  }
+  KA_TRACE(10, ("__kmpc_start_record_task(exit): T#%d TDG %d starts to %s\n",
+                gtid, tdg_id, res ? "record" : "execute"));
+  return res;
+}
+
+// __kmp_end_record: set up a TDG after recording it
+// gtid: Global Thread ID
+// tdg: Pointer to the TDG
+void __kmp_end_record(kmp_int32 gtid, kmp_tdg_info_t *tdg) {
+  // Store roots
+  kmp_node_info_t *this_record_map = tdg->record_map;
+  kmp_int32 this_num_tasks = KMP_ATOMIC_LD_RLX(&tdg->num_tasks);
+  kmp_int32 *this_root_tasks =
+      (kmp_int32 *)__kmp_allocate(this_num_tasks * sizeof(kmp_int32));
+  kmp_int32 this_map_size = tdg->map_size;
+  kmp_int32 this_num_roots = 0;
+  kmp_info_t *thread = __kmp_threads[gtid];
+
+  for (kmp_int32 i = 0; i < this_num_tasks; i++) {
+    if (this_record_map[i].npredecessors == 0) {
+      this_root_tasks[this_num_roots++] = i;
+    }
+  }
+
+  // Update with roots info and mapsize
+  tdg->map_size = this_map_size;
+  tdg->num_roots = this_num_roots;
+  tdg->root_tasks = this_root_tasks;
+  KMP_DEBUG_ASSERT(tdg->tdg_status == KMP_TDG_RECORDING);
+  tdg->tdg_status = KMP_TDG_READY;
+
+  if (thread->th.th_current_task->td_dephash) {
+    __kmp_dephash_free(thread, thread->th.th_current_task->td_dephash);
+    thread->th.th_current_task->td_dephash = NULL;
+  }
+
+  // Reset the predecessor counters
+  for (kmp_int32 i = 0; i < this_num_tasks; i++) {
+    KMP_ATOMIC_ST_RLX(&this_record_map[i].npredecessors_counter,
+                      this_record_map[i].npredecessors);
+  }
+  KMP_ATOMIC_ST_RLX(&__kmp_tdg_task_id, 0);
+}
+
+// __kmpc_end_record_task: wrapper around __kmp_end_record to mark
+// the end of the recording phase
+//
+// loc_ref: Source location information
+// gtid: Global Thread ID
+// input_flags: Flags attached to the graph
+// tdg_id: ID of the TDG that just finished recording
+void __kmpc_end_record_task(ident_t *loc_ref, kmp_int32 gtid,
+                            kmp_int32 input_flags, kmp_int32 tdg_id) {
+  kmp_tdg_info_t *tdg = __kmp_find_tdg(tdg_id);
+
+  KA_TRACE(10, ("__kmpc_end_record_task(enter): T#%d loc=%p finishes recording"
+                " tdg=%d with flags=%d\n",
+                gtid, loc_ref, tdg_id, input_flags));
+  // TODO: use input_flags->nowait
+  __kmpc_end_taskgroup(loc_ref, gtid);
+  if (__kmp_tdg_is_recording(tdg->tdg_status)) {
+    __kmp_end_record(gtid, tdg);
+  }
+  KA_TRACE(10, ("__kmpc_end_record_task(exit): T#%d loc=%p finished recording"
+                " tdg=%d, its status is now READY\n",
+                gtid, loc_ref, tdg_id));
+}
diff --git a/openmp/runtime/test/tasking/omp_record_replay.cpp b/openmp/runtime/test/tasking/omp_record_replay.cpp
new file mode 100644
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_record_replay.cpp
@@ -0,0 +1,47 @@
+// RUN: %libomp-cxx-compile-and-run
+#include <iostream>
+#include <cassert>
+#define NT 100
+
+// Compiler-generated code (emulation)
+typedef struct ident {
+  void *dummy;
+} ident_t;
+
+#ifdef __cplusplus
+extern "C" {
+int __kmpc_global_thread_num(ident_t *);
+int __kmpc_start_record_task(ident_t *, int, int, int);
+void __kmpc_end_record_task(ident_t *, int, int, int);
+}
+#endif
+
+void func(int *num_exec) {
+  (*num_exec)++;
+}
+
+int main() {
+  int num_exec = 0;
+  int num_tasks = 0;
+  #pragma omp parallel
+  #pragma omp single
+  for (int iter = 0; iter < NT; ++iter) {
+    int gtid = __kmpc_global_thread_num(nullptr);
+    int res = __kmpc_start_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0,
+                                       /* tdg_id */ 0);
+    if (res) {
+      num_tasks++;
+      #pragma omp task
+      func(&num_exec);
+    }
+    __kmpc_end_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0, /* tdg_id */ 0);
+  }
+
+  assert(num_tasks == 1);
+  assert(num_exec == NT);
+
+  std::cout << "Passed" << std::endl;
+  return 0;
+}
+// CHECK: Passed
\ No newline at end of file
diff --git a/openmp/runtime/test/tasking/omp_record_replay_deps.cpp b/openmp/runtime/test/tasking/omp_record_replay_deps.cpp
new file mode 100644
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_record_replay_deps.cpp
@@ -0,0 +1,62 @@
+// RUN: %libomp-cxx-compile-and-run
+#include <iostream>
+#include <cassert>
+#define NT 100
+#define MULTIPLIER 100
+#define DECREMENT 5
+
+int val;
+// Compiler-generated code (emulation)
+typedef struct ident {
+  void *dummy;
+} ident_t;
+
+#ifdef __cplusplus
+extern "C" {
+int __kmpc_global_thread_num(ident_t *);
+int __kmpc_start_record_task(ident_t *, int, int, int);
+void __kmpc_end_record_task(ident_t *, int, int, int);
+}
+#endif
+
+void sub() {
+  #pragma omp atomic
+  val -= DECREMENT;
+}
+
+void add() {
+  #pragma omp atomic
+  val += DECREMENT;
+}
+
+void mult() {
+  // no atomicity needed, can only be executed by 1 thread
+  // and no concurrency with other tasks possible
+  val *= MULTIPLIER;
+}
+
+int main() {
+  val = 0;
+  int *x, *y;
+  #pragma omp parallel
+  #pragma omp single
+  for (int iter = 0; iter < NT; ++iter) {
+    int gtid = __kmpc_global_thread_num(nullptr);
+    int res = __kmpc_start_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0,
+                                       /* tdg_id */ 0);
+    if (res) {
+      #pragma omp task depend(out:y)
+      add();
+      #pragma omp task depend(out:x)
+      sub();
+      #pragma omp task depend(in:x,y)
+      mult();
+    }
+    __kmpc_end_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0, /* tdg_id */ 0);
+  }
+  assert(val == 0);
+
+  std::cout << "Passed" << std::endl;
+  return 0;
+}
+// CHECK: Passed
\ No newline at end of file
diff --git a/openmp/runtime/test/tasking/omp_record_replay_multiTDGs.cpp b/openmp/runtime/test/tasking/omp_record_replay_multiTDGs.cpp
new file mode 100644
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_record_replay_multiTDGs.cpp
@@ -0,0 +1,75 @@
+// RUN: %libomp-cxx-compile-and-run
+#include <iostream>
+#include <cassert>
+#define NT 20
+#define MULTIPLIER 100
+#define DECREMENT 5
+
+// Compiler-generated code (emulation)
+typedef struct ident {
+  void *dummy;
+} ident_t;
+
+int val;
+#ifdef __cplusplus
+extern "C" {
+int __kmpc_global_thread_num(ident_t *);
+int __kmpc_start_record_task(ident_t *, int, int, int);
+void __kmpc_end_record_task(ident_t *, int, int, int);
+}
+#endif
+
+void sub() {
+  #pragma omp atomic
+  val -= DECREMENT;
+}
+
+void add() {
+  #pragma omp atomic
+  val += DECREMENT;
+}
+
+void mult() {
+  // no atomicity needed, can only be executed by 1 thread
+  // and no concurrency with other tasks possible
+  val *= MULTIPLIER;
+}
+
+int main() {
+  int num_tasks = 0;
+  int *x, *y;
+  #pragma omp parallel
+  #pragma omp single
+  for (int iter = 0; iter < NT; ++iter) {
+    int gtid = __kmpc_global_thread_num(nullptr);
+    int res = __kmpc_start_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0,
+                                       /* tdg_id */ 0);
+    if (res) {
+      num_tasks++;
+      #pragma omp task depend(out:y)
+      add();
+      #pragma omp task depend(out:x)
+      sub();
+      #pragma omp task depend(in:x,y)
+      mult();
+    }
+    __kmpc_end_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0, /* tdg_id */ 0);
+    res = __kmpc_start_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0,
+                                   /* tdg_id */ 1);
+    if (res) {
+      num_tasks++;
+      #pragma omp task depend(out:y)
+      add();
+      #pragma omp task depend(out:x)
+      sub();
+      #pragma omp task depend(in:x,y)
+      mult();
+    }
+    __kmpc_end_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0, /* tdg_id */ 1);
+  }
+
+  assert(num_tasks == 2);
+  assert(val == 0);
+
+  std::cout << "Passed" << std::endl;
+  return 0;
+}
+// CHECK: Passed
\ No newline at end of file
diff --git a/openmp/runtime/test/tasking/omp_record_replay_taskloop.cpp b/openmp/runtime/test/tasking/omp_record_replay_taskloop.cpp
new file mode 100644
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_record_replay_taskloop.cpp
@@ -0,0 +1,49 @@
+// RUN: %libomp-cxx-compile-and-run
+#include <iostream>
+#include <cassert>
+
+#define NT 20
+#define N 128 * 128
+
+typedef struct ident {
+  void *dummy;
+} ident_t;
+
+#ifdef __cplusplus
+extern "C" {
+int __kmpc_global_thread_num(ident_t *);
+int __kmpc_start_record_task(ident_t *, int, int, int);
+void __kmpc_end_record_task(ident_t *, int, int, int);
+}
+#endif
+
+int main() {
+  int num_tasks = 0;
+
+  int array[N];
+  for (int i = 0; i < N; ++i)
+    array[i] = 1;
+
+  long sum = 0;
+  #pragma omp parallel
+  #pragma omp single
+  for (int iter = 0; iter < NT; ++iter) {
+    int gtid = __kmpc_global_thread_num(nullptr);
+    int res = __kmpc_start_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0,
+                                       /* tdg_id */ 0);
+    if (res) {
+      num_tasks++;
+      #pragma omp taskloop reduction(+:sum) num_tasks(4096)
+      for (int i = 0; i < N; ++i) {
+        sum += array[i];
+      }
+    }
+    __kmpc_end_record_task(nullptr, gtid, /* kmp_tdg_flags */ 0, /* tdg_id */ 0);
+  }
+  assert(sum == N * NT);
+  assert(num_tasks == 1);
+
+  std::cout << "Passed" << std::endl;
+  return 0;
+}
+// CHECK: Passed
\ No newline at end of file
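
Note on the replay scheduler: during replay, __kmp_exec_tdg seeds every node's npredecessors_counter with the recorded npredecessors and launches the root tasks, while __kmp_release_deps (kmp_taskdeps.h above) decrements the counter of each recorded successor and schedules any successor whose counter reaches zero. The following standalone sketch shows that counter-based release in isolation; it is a simplified, single-threaded illustration in plain C++ (all names here are hypothetical, not libomp APIs):

#include <atomic>
#include <cstdio>
#include <queue>
#include <vector>

struct Node {
  std::vector<int> successors; // recorded successor ids
  int npredecessors = 0;       // total recorded predecessors
  std::atomic<int> counter{0}; // predecessors still pending
};

// Mirrors __kmp_exec_tdg + __kmp_release_deps: seed the counters, start the
// roots, and release a successor when its last predecessor finishes.
void replay(std::vector<Node> &tdg) {
  std::queue<int> ready;
  for (int i = 0; i < (int)tdg.size(); ++i) {
    tdg[i].counter.store(tdg[i].npredecessors);
    if (tdg[i].npredecessors == 0) // root task
      ready.push(i);
  }
  while (!ready.empty()) {
    int id = ready.front();
    ready.pop();
    std::printf("run task %d\n", id); // stand-in for __kmp_omp_task
    for (int s : tdg[id].successors)
      if (tdg[s].counter.fetch_sub(1) - 1 == 0)
        ready.push(s);
  }
}

int main() {
  // Shape of omp_record_replay_deps.cpp: add (0) and sub (1) feed mult (2)
  std::vector<Node> tdg(3);
  tdg[0].successors = {2};
  tdg[1].successors = {2};
  tdg[2].npredecessors = 2;
  replay(tdg); // prints tasks 0 and 1 before task 2
  return 0;
}

This also explains why __kmp_end_record reseeds npredecessors_counter after recording: the counters are consumed by each replay, while npredecessors keeps the immutable total used to reset them between executions.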