diff --git a/openmp/runtime/src/i18n/en_US.txt b/openmp/runtime/src/i18n/en_US.txt --- a/openmp/runtime/src/i18n/en_US.txt +++ b/openmp/runtime/src/i18n/en_US.txt @@ -269,6 +269,7 @@ Using_uint_Value "%1$s value \"%2$u\" will be used." Using_uint64_Value "%1$s value \"%2$s\" will be used." Using_str_Value "%1$s value \"%2$s\" will be used." +BarrierPatternOverride "Mixing other barrier patterns with dist is prohibited. Using dist for all barrier patterns." MaxValueUsing "%1$s maximum value \"%2$d\" will be used." MinValueUsing "%1$s minimum value \"%2$d\" will be used." MemoryAllocFailed "Memory allocation failed." diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h --- a/openmp/runtime/src/kmp.h +++ b/openmp/runtime/src/kmp.h @@ -115,6 +115,7 @@ #include "kmp_debug.h" #include "kmp_lock.h" #include "kmp_version.h" +#include "kmp_barrier.h" #if USE_DEBUGGER #include "kmp_debugger.h" #endif @@ -263,6 +264,7 @@ template class kmp_flag_32; template class kmp_flag_64; +template class kmp_atomic_flag_64; class kmp_flag_oncore; #ifdef __cplusplus @@ -1890,6 +1892,15 @@ 0 // Thread th_reap_state: not safe to reap (tasking) #define KMP_SAFE_TO_REAP 1 // Thread th_reap_state: safe to reap (not tasking) +// The flag_type describes the storage used for the flag. +enum flag_type { + flag32, /**< atomic 32 bit flags */ + flag64, /**< 64 bit flags */ + atomic_flag64, /**< atomic 64 bit flags */ + flag_oncore, /**< special 64-bit flag for on-core barrier (hierarchical) */ + flag_unset +}; + enum barrier_type { bs_plain_barrier = 0, /* 0, All non-fork/join barriers (except reduction barriers if enabled) */ @@ -1913,6 +1924,7 @@ bp_hyper_bar = 2, /* Hypercube-embedded tree with min branching factor 2^n */ bp_hierarchical_bar = 3, /* Machine hierarchy tree */ + bp_dist_bar = 4, /* Distributed barrier */ bp_last_bar /* Placeholder to mark the end */ } kmp_bar_pat_e; @@ -2637,6 +2649,7 @@ /* while awaiting queuing lock acquire */ volatile void *th_sleep_loc; // this points at a kmp_flag + flag_type th_sleep_loc_type; // enum type of flag stored in th_sleep_loc ident_t *th_ident; unsigned th_x; // Random number generator data @@ -2657,6 +2670,9 @@ written by the worker thread) */ kmp_uint8 th_active_in_pool; // included in count of #active threads in pool int th_active; // ! 
sleeping; 32 bits for TCR/TCW + std::atomic th_used_in_team; // Flag indicating use in team + // 0 = not used in team; 1 = used in team; + // 2 = transitioning to not used in team; 3 = transitioning to used in team struct cons_header *th_cons; // used for consistency check #if KMP_USE_HIER_SCHED // used for hierarchical scheduling @@ -2836,6 +2852,7 @@ #if USE_ITT_BUILD void *t_stack_id; // team specific stack stitching id (for ittnotify) #endif /* USE_ITT_BUILD */ + distributedBarrier *b; // Distributed barrier data associated with team } kmp_base_team_t; union KMP_ALIGN_CACHE kmp_team { @@ -4137,18 +4154,26 @@ extern void __kmp_suspend_32(int th_gtid, kmp_flag_32 *flag); template extern void __kmp_suspend_64(int th_gtid, kmp_flag_64 *flag); +template +extern void __kmp_atomic_suspend_64(int th_gtid, + kmp_atomic_flag_64 *flag); extern void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag); #if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT template extern void __kmp_mwait_32(int th_gtid, kmp_flag_32 *flag); template extern void __kmp_mwait_64(int th_gtid, kmp_flag_64 *flag); +template +extern void __kmp_atomic_mwait_64(int th_gtid, kmp_atomic_flag_64 *flag); extern void __kmp_mwait_oncore(int th_gtid, kmp_flag_oncore *flag); #endif template extern void __kmp_resume_32(int target_gtid, kmp_flag_32 *flag); template extern void __kmp_resume_64(int target_gtid, kmp_flag_64 *flag); +template +extern void __kmp_atomic_resume_64(int target_gtid, + kmp_atomic_flag_64 *flag); extern void __kmp_resume_oncore(int target_gtid, kmp_flag_oncore *flag); template @@ -4167,6 +4192,14 @@ void *itt_sync_obj, #endif /* USE_ITT_BUILD */ kmp_int32 is_constrained); +template +int __kmp_atomic_execute_tasks_64(kmp_info_t *thread, kmp_int32 gtid, + kmp_atomic_flag_64 *flag, + int final_spin, int *thread_finished, +#if USE_ITT_BUILD + void *itt_sync_obj, +#endif /* USE_ITT_BUILD */ + kmp_int32 is_constrained); int __kmp_execute_tasks_oncore(kmp_info_t *thread, kmp_int32 gtid, kmp_flag_oncore *flag, int final_spin, int *thread_finished, diff --git a/openmp/runtime/src/kmp_atomic.cpp b/openmp/runtime/src/kmp_atomic.cpp --- a/openmp/runtime/src/kmp_atomic.cpp +++ b/openmp/runtime/src/kmp_atomic.cpp @@ -732,7 +732,7 @@ #define OP_UPDATE_CRITICAL(TYPE, OP, LCK_ID) \ __kmp_acquire_atomic_lock(&ATOMIC_LOCK##LCK_ID, gtid); \ - (*lhs) = (TYPE)((*lhs)OP((TYPE)rhs)); \ + (*lhs) = (TYPE)((*lhs)OP rhs); \ __kmp_release_atomic_lock(&ATOMIC_LOCK##LCK_ID, gtid); // ------------------------------------------------------------------------ @@ -791,14 +791,14 @@ { \ TYPE old_value, new_value; \ old_value = *(TYPE volatile *)lhs; \ - new_value = (TYPE)(old_value OP((TYPE)rhs)); \ + new_value = (TYPE)(old_value OP rhs); \ while (!KMP_COMPARE_AND_STORE_ACQ##BITS( \ (kmp_int##BITS *)lhs, *VOLATILE_CAST(kmp_int##BITS *) & old_value, \ *VOLATILE_CAST(kmp_int##BITS *) & new_value)) { \ KMP_DO_PAUSE; \ \ old_value = *(TYPE volatile *)lhs; \ - new_value = (TYPE)(old_value OP((TYPE)rhs)); \ + new_value = (TYPE)(old_value OP rhs); \ } \ } diff --git a/openmp/runtime/src/kmp_barrier.h b/openmp/runtime/src/kmp_barrier.h new file mode 100644 --- /dev/null +++ b/openmp/runtime/src/kmp_barrier.h @@ -0,0 +1,109 @@ +/* + * kmp_barrier.h + */ + +//===----------------------------------------------------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. 
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#ifndef KMP_BARRIER_H +#define KMP_BARRIER_H + +#include "kmp.h" + +// Use four cache lines: MLC tends to prefetch the next or previous cache line +// creating a possible fake conflict between cores, so this is the only way to +// guarantee that no such prefetch can happen. +#ifndef KMP_FOURLINE_ALIGN_CACHE +#define KMP_FOURLINE_ALIGN_CACHE KMP_ALIGN(4 * CACHE_LINE) +#endif + +#define KMP_OPTIMIZE_FOR_REDUCTIONS 0 + +class distributedBarrier { + struct flags_s { + kmp_uint32 volatile KMP_FOURLINE_ALIGN_CACHE stillNeed; + }; + + struct go_s { + std::atomic KMP_FOURLINE_ALIGN_CACHE go; + }; + + struct iter_s { + kmp_uint64 volatile KMP_FOURLINE_ALIGN_CACHE iter; + }; + + struct sleep_s { + std::atomic KMP_FOURLINE_ALIGN_CACHE sleep; + }; + + void init(size_t nthr); + void resize(size_t nthr); + void computeGo(size_t n); + void computeVarsForN(size_t n); + +public: + enum { + MAX_ITERS = 3, + MAX_GOS = 8, + IDEAL_GOS = 4, + IDEAL_CONTENTION = 16, + }; + + flags_s *flags[MAX_ITERS]; + go_s *go; + iter_s *iter; + sleep_s *sleep; + + size_t KMP_ALIGN_CACHE num_threads; // number of threads in barrier + size_t KMP_ALIGN_CACHE max_threads; // size of arrays in data structure + // number of go signals each requiring one write per iteration + size_t KMP_ALIGN_CACHE num_gos; + // number of groups of gos + size_t KMP_ALIGN_CACHE num_groups; + // threads per go signal + size_t KMP_ALIGN_CACHE threads_per_go; + bool KMP_ALIGN_CACHE fix_threads_per_go; + // threads per group + size_t KMP_ALIGN_CACHE threads_per_group; + // number of go signals in a group + size_t KMP_ALIGN_CACHE gos_per_group; + void *team_icvs; + + distributedBarrier() = delete; + ~distributedBarrier() = delete; + + // Used instead of constructor to create aligned data + static distributedBarrier *allocate(int nThreads) { + distributedBarrier *d = (distributedBarrier *)_mm_malloc( + sizeof(distributedBarrier), 4 * CACHE_LINE); + d->num_threads = 0; + d->max_threads = 0; + for (int i = 0; i < MAX_ITERS; ++i) + d->flags[i] = NULL; + d->go = NULL; + d->iter = NULL; + d->sleep = NULL; + d->team_icvs = NULL; + d->fix_threads_per_go = false; + // calculate gos and groups ONCE on base size + d->computeGo(nThreads); + d->init(nThreads); + return d; + } + + static void deallocate(distributedBarrier *db) { _mm_free(db); } + + void update_num_threads(size_t nthr) { init(nthr); } + + bool need_resize(size_t new_nthr) { return (new_nthr > max_threads); } + size_t get_num_threads() { return num_threads; } + kmp_uint64 go_release(); + void go_reset(); +}; + +#endif // KMP_BARRIER_H diff --git a/openmp/runtime/src/kmp_barrier.cpp b/openmp/runtime/src/kmp_barrier.cpp --- a/openmp/runtime/src/kmp_barrier.cpp +++ b/openmp/runtime/src/kmp_barrier.cpp @@ -10,12 +10,14 @@ // //===----------------------------------------------------------------------===// -#include "kmp.h" #include "kmp_wait_release.h" +#include "kmp_barrier.h" #include "kmp_itt.h" #include "kmp_os.h" #include "kmp_stats.h" #include "ompt-specific.h" +// for distributed barrier +#include "kmp_affinity.h" #if KMP_MIC #include @@ -40,6 +42,517 @@ void __kmp_print_structure(void); // Forward declaration // ---------------------------- Barrier Algorithms ---------------------------- +// Distributed barrier + +// Compute how many threads to have polling each cache-line. +// We want to limit the number of writes to IDEAL_GO_RESOLUTION. 
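// --- Illustrative aside (not part of the patch) -----------------------------
// computeGo() below picks the smallest number of go flags such that at most
// IDEAL_CONTENTION threads poll each one, then caps the flag count at MAX_GOS;
// computeVarsForN() refines the grouping with topology data when available.
// A standalone model of the topology-free arithmetic, using the same constants
// but a hypothetical helper name:
#include <cstddef>
#include <cstdio>

static void sketch_compute_go(std::size_t n) {
  const std::size_t IDEAL_CONTENTION = 16, MAX_GOS = 8;
  std::size_t num_gos = 1;
  while (IDEAL_CONTENTION * num_gos < n) // smallest count, <= 16 pollers each
    ++num_gos;
  std::size_t threads_per_go = (n + num_gos - 1) / num_gos; // ceil(n/num_gos)
  while (num_gos > MAX_GOS) { // never keep more than MAX_GOS go flags
    ++threads_per_go;
    num_gos = (n + threads_per_go - 1) / threads_per_go;
  }
  std::printf("n=%zu -> %zu go flags, %zu threads per flag\n", n, num_gos,
              threads_per_go);
}
// For example, n = 200 settles at 8 go flags with 25 threads polling each.
// ----------------------------------------------------------------------------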
+void distributedBarrier::computeVarsForN(size_t n) { + int nsockets = 1; + if (__kmp_topology) { + int socket_level = __kmp_topology->get_level(KMP_HW_SOCKET); + int core_level = __kmp_topology->get_level(KMP_HW_CORE); + int ncores_per_socket = + __kmp_topology->calculate_ratio(core_level, socket_level); + nsockets = __kmp_topology->get_count(socket_level); + + if (nsockets <= 0) + nsockets = 1; + if (ncores_per_socket <= 0) + ncores_per_socket = 1; + + threads_per_go = ncores_per_socket >> 1; + if (!fix_threads_per_go) { + // Minimize num_gos + if (threads_per_go > 4) { + if (KMP_OPTIMIZE_FOR_REDUCTIONS) { + threads_per_go = threads_per_go >> 1; + } + if (threads_per_go > 4 && nsockets == 1) + threads_per_go = threads_per_go >> 1; + } + } + if (threads_per_go == 0) + threads_per_go = 1; + fix_threads_per_go = true; + num_gos = n / threads_per_go; + if (n % threads_per_go) + num_gos++; + if (nsockets == 1 || num_gos == 1) + num_groups = 1; + else { + num_groups = num_gos / nsockets; + if (num_gos % nsockets) + num_groups++; + } + if (num_groups <= 0) + num_groups = 1; + gos_per_group = num_gos / num_groups; + if (num_gos % num_groups) + gos_per_group++; + threads_per_group = threads_per_go * gos_per_group; + } else { + num_gos = n / threads_per_go; + if (n % threads_per_go) + num_gos++; + if (num_gos == 1) + num_groups = 1; + else { + num_groups = num_gos / 2; + if (num_gos % 2) + num_groups++; + } + gos_per_group = num_gos / num_groups; + if (num_gos % num_groups) + gos_per_group++; + threads_per_group = threads_per_go * gos_per_group; + } +} + +void distributedBarrier::computeGo(size_t n) { + // Minimize num_gos + for (num_gos = 1;; num_gos++) + if (IDEAL_CONTENTION * num_gos >= n) + break; + threads_per_go = n / num_gos; + if (n % num_gos) + threads_per_go++; + while (num_gos > MAX_GOS) { + threads_per_go++; + num_gos = n / threads_per_go; + if (n % threads_per_go) + num_gos++; + } + computeVarsForN(n); +} + +// This function is to resize the barrier arrays when the new number of threads +// exceeds max_threads, which is the current size of all the arrays +void distributedBarrier::resize(size_t nthr) { + KMP_DEBUG_ASSERT(nthr > max_threads); + + // expand to requested size * 2 + max_threads = nthr * 2; + + // allocate arrays to new max threads + for (int i = 0; i < MAX_ITERS; ++i) { + if (flags[i]) + flags[i] = (flags_s *)KMP_INTERNAL_REALLOC(flags[i], + max_threads * sizeof(flags_s)); + else + flags[i] = (flags_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(flags_s)); + } + + if (go) + go = (go_s *)KMP_INTERNAL_REALLOC(go, max_threads * sizeof(go_s)); + else + go = (go_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(go_s)); + + if (iter) + iter = (iter_s *)KMP_INTERNAL_REALLOC(iter, max_threads * sizeof(iter_s)); + else + iter = (iter_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(iter_s)); + + if (sleep) + sleep = + (sleep_s *)KMP_INTERNAL_REALLOC(sleep, max_threads * sizeof(sleep_s)); + else + sleep = (sleep_s *)KMP_INTERNAL_MALLOC(max_threads * sizeof(sleep_s)); +} + +// This function is to set all the go flags that threads might be waiting +// on, and when blocktime is not infinite, it should be followed by a wake-up +// call to each thread +kmp_uint64 distributedBarrier::go_release() { + kmp_uint64 next_go = iter[0].iter + distributedBarrier::MAX_ITERS; + for (size_t j = 0; j < num_gos; j++) { + go[j].go.store(next_go); + } + return next_go; +} + +void distributedBarrier::go_reset() { + for (size_t j = 0; j < max_threads; ++j) { + for (size_t i = 0; i < distributedBarrier::MAX_ITERS; 
++i) { + flags[i][j].stillNeed = 1; + } + go[j].go.store(0); + iter[j].iter = 0; + } +} + +// This function inits/re-inits the distributed barrier for a particular number +// of threads. If a resize of arrays is needed, it calls the resize function. +void distributedBarrier::init(size_t nthr) { + size_t old_max = max_threads; + if (nthr > max_threads) { // need more space in arrays + resize(nthr); + } + + for (size_t i = 0; i < max_threads; i++) { + for (size_t j = 0; j < distributedBarrier::MAX_ITERS; j++) { + flags[j][i].stillNeed = 1; + } + go[i].go.store(0); + iter[i].iter = 0; + if (i >= old_max) + sleep[i].sleep = false; + } + + // Recalculate num_gos, etc. based on new nthr + computeVarsForN(nthr); + + num_threads = nthr; + + if (team_icvs == NULL) + team_icvs = __kmp_allocate(sizeof(kmp_internal_control_t)); +} + +// This function is used only when KMP_BLOCKTIME is not infinite. +// static +void __kmp_dist_barrier_wakeup(enum barrier_type bt, kmp_team_t *team, + size_t start, size_t stop, size_t inc, + size_t tid) { + KMP_DEBUG_ASSERT(__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME); + if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done)) + return; + + kmp_info_t **other_threads = team->t.t_threads; + for (size_t thr = start; thr < stop; thr += inc) { + KMP_DEBUG_ASSERT(other_threads[thr]); + int gtid = other_threads[thr]->th.th_info.ds.ds_gtid; + // Wake up worker regardless of if it appears to be sleeping or not + __kmp_atomic_resume_64(gtid, (kmp_atomic_flag_64<> *)NULL); + } +} + +static void +__kmp_dist_barrier_gather(enum barrier_type bt, kmp_info_t *this_thr, int gtid, + int tid, void (*reduce)(void *, void *) + USE_ITT_BUILD_ARG(void *itt_sync_obj)) { + KMP_TIME_DEVELOPER_PARTITIONED_BLOCK(KMP_dist_gather); + kmp_team_t *team; + distributedBarrier *b; + kmp_info_t **other_threads; + kmp_uint64 my_current_iter, my_next_iter; + kmp_uint32 nproc; + bool group_leader; + + team = this_thr->th.th_team; + nproc = this_thr->th.th_team_nproc; + other_threads = team->t.t_threads; + b = team->t.b; + my_current_iter = b->iter[tid].iter; + my_next_iter = (my_current_iter + 1) % distributedBarrier::MAX_ITERS; + group_leader = ((tid % b->threads_per_group) == 0); + + KA_TRACE(20, + ("__kmp_dist_barrier_gather: T#%d(%d:%d) enter; barrier type %d\n", + gtid, team->t.t_id, tid, bt)); + +#if USE_ITT_BUILD && USE_ITT_NOTIFY + // Barrier imbalance - save arrive time to the thread + if (__kmp_forkjoin_frames_mode == 3 || __kmp_forkjoin_frames_mode == 2) { + this_thr->th.th_bar_arrive_time = this_thr->th.th_bar_min_time = + __itt_get_timestamp(); + } +#endif + + if (group_leader) { + // Start from the thread after the group leader + size_t group_start = tid + 1; + size_t group_end = tid + b->threads_per_group; + size_t threads_pending = 0; + + if (group_end > nproc) + group_end = nproc; + do { // wait for threads in my group + threads_pending = 0; + // Check all the flags every time to avoid branch misspredict + for (size_t thr = group_start; thr < group_end; thr++) { + // Each thread uses a different cache line + threads_pending += b->flags[my_current_iter][thr].stillNeed; + } + // Execute tasks here + if (__kmp_tasking_mode != tskm_immediate_exec) { + kmp_task_team_t *task_team = this_thr->th.th_task_team; + if (task_team != NULL) { + if (TCR_SYNC_4(task_team->tt.tt_active)) { + if (KMP_TASKING_ENABLED(task_team)) { + int tasks_completed = FALSE; + __kmp_atomic_execute_tasks_64( + this_thr, gtid, (kmp_atomic_flag_64<> *)NULL, FALSE, + &tasks_completed USE_ITT_BUILD_ARG(itt_sync_obj), 0); + } 
else + this_thr->th.th_reap_state = KMP_SAFE_TO_REAP; + } + } else { + this_thr->th.th_reap_state = KMP_SAFE_TO_REAP; + } // if + } + if (TCR_4(__kmp_global.g.g_done)) { + if (__kmp_global.g.g_abort) + __kmp_abort_thread(); + break; + } else if (__kmp_tasking_mode != tskm_immediate_exec && + this_thr->th.th_reap_state == KMP_SAFE_TO_REAP) { + this_thr->th.th_reap_state = KMP_NOT_SAFE_TO_REAP; + } + } while (threads_pending > 0); + + if (reduce) { // Perform reduction if needed + OMPT_REDUCTION_DECL(this_thr, gtid); + OMPT_REDUCTION_BEGIN; + // Group leader reduces all threads in group + for (size_t thr = group_start; thr < group_end; thr++) { + (*reduce)(this_thr->th.th_local.reduce_data, + other_threads[thr]->th.th_local.reduce_data); + } + OMPT_REDUCTION_END; + } + + // Set flag for next iteration + b->flags[my_next_iter][tid].stillNeed = 1; + // Each thread uses a different cache line; resets stillNeed to 0 to + // indicate it has reached the barrier + b->flags[my_current_iter][tid].stillNeed = 0; + + do { // wait for all group leaders + threads_pending = 0; + for (size_t thr = 0; thr < nproc; thr += b->threads_per_group) { + threads_pending += b->flags[my_current_iter][thr].stillNeed; + } + // Execute tasks here + if (__kmp_tasking_mode != tskm_immediate_exec) { + kmp_task_team_t *task_team = this_thr->th.th_task_team; + if (task_team != NULL) { + if (TCR_SYNC_4(task_team->tt.tt_active)) { + if (KMP_TASKING_ENABLED(task_team)) { + int tasks_completed = FALSE; + __kmp_atomic_execute_tasks_64( + this_thr, gtid, (kmp_atomic_flag_64<> *)NULL, FALSE, + &tasks_completed USE_ITT_BUILD_ARG(itt_sync_obj), 0); + } else + this_thr->th.th_reap_state = KMP_SAFE_TO_REAP; + } + } else { + this_thr->th.th_reap_state = KMP_SAFE_TO_REAP; + } // if + } + if (TCR_4(__kmp_global.g.g_done)) { + if (__kmp_global.g.g_abort) + __kmp_abort_thread(); + break; + } else if (__kmp_tasking_mode != tskm_immediate_exec && + this_thr->th.th_reap_state == KMP_SAFE_TO_REAP) { + this_thr->th.th_reap_state = KMP_NOT_SAFE_TO_REAP; + } + } while (threads_pending > 0); + + if (reduce) { // Perform reduction if needed + if (KMP_MASTER_TID(tid)) { // Master reduces over group leaders + OMPT_REDUCTION_DECL(this_thr, gtid); + OMPT_REDUCTION_BEGIN; + for (size_t thr = b->threads_per_group; thr < nproc; + thr += b->threads_per_group) { + (*reduce)(this_thr->th.th_local.reduce_data, + other_threads[thr]->th.th_local.reduce_data); + } + OMPT_REDUCTION_END; + } + } + } else { + // Set flag for next iteration + b->flags[my_next_iter][tid].stillNeed = 1; + // Each thread uses a different cache line; resets stillNeed to 0 to + // indicate it has reached the barrier + b->flags[my_current_iter][tid].stillNeed = 0; + } + + KMP_MFENCE(); + + KA_TRACE(20, + ("__kmp_dist_barrier_gather: T#%d(%d:%d) exit for barrier type %d\n", + gtid, team->t.t_id, tid, bt)); +} + +static void __kmp_dist_barrier_release( + enum barrier_type bt, kmp_info_t *this_thr, int gtid, int tid, + int propagate_icvs USE_ITT_BUILD_ARG(void *itt_sync_obj)) { + KMP_TIME_DEVELOPER_PARTITIONED_BLOCK(KMP_dist_release); + kmp_team_t *team; + distributedBarrier *b; + kmp_bstate_t *thr_bar; + kmp_uint64 my_current_iter, next_go; + size_t my_go_index; + bool group_leader; + + KA_TRACE(20, ("__kmp_dist_barrier_release: T#%d(%d) enter; barrier type %d\n", + gtid, tid, bt)); + + thr_bar = &this_thr->th.th_bar[bt].bb; + + if (!KMP_MASTER_TID(tid)) { + // workers and non-master group leaders need to check their presence in team + do { + if (this_thr->th.th_used_in_team.load() != 1 && + 
this_thr->th.th_used_in_team.load() != 3) { + // Thread is not in use in a team. Wait on location in tid's thread + // struct. The 0 value tells anyone looking that this thread is spinning + // or sleeping until this location becomes 3 again; 3 is the transition + // state to get to 1 which is waiting on go and being in the team + kmp_flag_32 my_flag(&(this_thr->th.th_used_in_team), 3); + if (KMP_COMPARE_AND_STORE_ACQ32(&(this_thr->th.th_used_in_team), 2, + 0) || + this_thr->th.th_used_in_team.load() == 0) { + my_flag.wait(this_thr, true, itt_sync_obj); + } +#if USE_ITT_BUILD && USE_ITT_NOTIFY + if ((__itt_sync_create_ptr && itt_sync_obj == NULL) || KMP_ITT_DEBUG) { + // In fork barrier where we could not get the object reliably + itt_sync_obj = + __kmp_itt_barrier_object(gtid, bs_forkjoin_barrier, 0, -1); + // Cancel wait on previous parallel region... + __kmp_itt_task_starting(itt_sync_obj); + + if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done)) + return; + + itt_sync_obj = __kmp_itt_barrier_object(gtid, bs_forkjoin_barrier); + if (itt_sync_obj != NULL) + // Call prepare as early as possible for "new" barrier + __kmp_itt_task_finished(itt_sync_obj); + } else +#endif /* USE_ITT_BUILD && USE_ITT_NOTIFY */ + if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done)) + return; + } + if (this_thr->th.th_used_in_team.load() != 1 && + this_thr->th.th_used_in_team.load() != 3) // spurious wake-up? + continue; + if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done)) + return; + + // At this point, the thread thinks it is in use in a team, or in + // transition to be used in a team, but it might have reached this barrier + // before it was marked unused by the team. Unused threads are awoken and + // shifted to wait on local thread struct elsewhere. It also might reach + // this point by being picked up for use by a different team. Either way, + // we need to update the tid. + tid = __kmp_tid_from_gtid(gtid); + team = this_thr->th.th_team; + KMP_DEBUG_ASSERT(tid >= 0); + KMP_DEBUG_ASSERT(team); + b = team->t.b; + my_current_iter = b->iter[tid].iter; + next_go = my_current_iter + distributedBarrier::MAX_ITERS; + my_go_index = tid / b->threads_per_go; + if (this_thr->th.th_used_in_team.load() == 3) { + KMP_COMPARE_AND_STORE_ACQ32(&(this_thr->th.th_used_in_team), 3, 1); + } + // Check if go flag is set + if (b->go[my_go_index].go.load() != next_go) { + // Wait on go flag on team + kmp_atomic_flag_64 my_flag( + &(b->go[my_go_index].go), next_go, &(b->sleep[tid].sleep)); + my_flag.wait(this_thr, true, itt_sync_obj); + KMP_DEBUG_ASSERT(my_current_iter == b->iter[tid].iter || + b->iter[tid].iter == 0); + KMP_DEBUG_ASSERT(b->sleep[tid].sleep == false); + } + + if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done)) + return; + // At this point, the thread's go location was set. This means the primary + // thread is safely in the barrier, and so this thread's data is + // up-to-date, but we should check again that this thread is really in + // use in the team, as it could have been woken up for the purpose of + // changing team size, or reaping threads at shutdown. + if (this_thr->th.th_used_in_team.load() == 1) + break; + } while (1); + + if (bt == bs_forkjoin_barrier && TCR_4(__kmp_global.g.g_done)) + return; + + group_leader = ((tid % b->threads_per_group) == 0); + if (group_leader) { + // Tell all the threads in my group they can go! 
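// --- Illustrative aside (not part of the patch) -----------------------------
// The value stored into a go flag is always the waiter's iteration count plus
// MAX_ITERS, and each thread's iter counter cycles through 0..MAX_ITERS-1, so
// consecutive barriers publish distinct non-zero values: the go flags never
// need to be reset between barriers, and the MAX_ITERS copies of stillNeed let
// a thread arm the next iteration's flag before clearing the current one.
// A minimal single-flag model with hypothetical names; acquire/release
// ordering stands in for the runtime's atomics plus KMP_MFENCE:
#include <atomic>
#include <cstdint>

struct go_flag_model {
  static constexpr std::uint64_t MAX_ITERS = 3; // mirrors distributedBarrier
  std::atomic<std::uint64_t> go{0};             // like go[my_go_index].go
  std::uint64_t iter = 0;                       // like iter[tid].iter

  std::uint64_t release() { // primary / group-leader side
    std::uint64_t next_go = iter + MAX_ITERS; // 3, 4, 5, 3, ...
    go.store(next_go, std::memory_order_release);
    return next_go;
  }
  bool released() const { // worker side: poll (or sleep) until this holds
    return go.load(std::memory_order_acquire) == iter + MAX_ITERS;
  }
  void next_barrier() { iter = (iter + 1) % MAX_ITERS; }
};
// ----------------------------------------------------------------------------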
+ for (size_t go_idx = my_go_index + 1; + go_idx < my_go_index + b->gos_per_group; go_idx++) { + b->go[go_idx].go.store(next_go); + } + // Fence added so that workers can see changes to go. sfence inadequate. + KMP_MFENCE(); + } + +#if KMP_BARRIER_ICV_PUSH + if (propagate_icvs) { // copy ICVs to final dest + __kmp_init_implicit_task(team->t.t_ident, team->t.t_threads[tid], team, + tid, FALSE); + copy_icvs(&team->t.t_implicit_task_taskdata[tid].td_icvs, + (kmp_internal_control_t *)team->t.b->team_icvs); + copy_icvs(&thr_bar->th_fixed_icvs, + &team->t.t_implicit_task_taskdata[tid].td_icvs); + } +#endif + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME && group_leader) { + // This thread is now awake and participating in the barrier; + // wake up the other threads in the group + size_t nproc = this_thr->th.th_team_nproc; + size_t group_end = tid + b->threads_per_group; + if (nproc < group_end) + group_end = nproc; + __kmp_dist_barrier_wakeup(bt, team, tid + 1, group_end, 1, tid); + } + } else { // Primary thread + team = this_thr->th.th_team; + b = team->t.b; + my_current_iter = b->iter[tid].iter; + next_go = my_current_iter + distributedBarrier::MAX_ITERS; +#if KMP_BARRIER_ICV_PUSH + if (propagate_icvs) { + // primary thread has ICVs in final destination; copy + copy_icvs(&thr_bar->th_fixed_icvs, + &team->t.t_implicit_task_taskdata[tid].td_icvs); + } +#endif + // Tell all the group leaders they can go! + for (size_t go_idx = 0; go_idx < b->num_gos; go_idx += b->gos_per_group) { + b->go[go_idx].go.store(next_go); + } + + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { + // Wake-up the group leaders + size_t nproc = this_thr->th.th_team_nproc; + __kmp_dist_barrier_wakeup(bt, team, tid + b->threads_per_group, nproc, + b->threads_per_group, tid); + } + + // Tell all the threads in my group they can go! + for (size_t go_idx = 1; go_idx < b->gos_per_group; go_idx++) { + b->go[go_idx].go.store(next_go); + } + + // Fence added so that workers can see changes to go. sfence inadequate. 
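// --- Aside (not part of the patch) ------------------------------------------
// KMP_MFENCE is introduced in kmp_os.h later in this diff: on x86/x86_64 it
// expands to a full memory fence (_mm_mfence / MemoryBarrier /
// __sync_synchronize) guarded by a cpuid check for SSE2. A portable C++11
// stand-in for what the macro is assumed to guarantee:
#include <atomic>

inline void full_fence_sketch() {
  // Orders all earlier loads and stores before all later ones, like mfence.
  std::atomic_thread_fence(std::memory_order_seq_cst);
}
// ----------------------------------------------------------------------------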
+ KMP_MFENCE(); + + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { + // Wake-up the other threads in my group + size_t nproc = this_thr->th.th_team_nproc; + size_t group_end = tid + b->threads_per_group; + if (nproc < group_end) + group_end = nproc; + __kmp_dist_barrier_wakeup(bt, team, tid + 1, group_end, 1, tid); + } + } + // Update to next iteration + KMP_ASSERT(my_current_iter == b->iter[tid].iter); + b->iter[tid].iter = (b->iter[tid].iter + 1) % distributedBarrier::MAX_ITERS; + + KA_TRACE( + 20, ("__kmp_dist_barrier_release: T#%d(%d:%d) exit for barrier type %d\n", + gtid, team->t.t_id, tid, bt)); +} // Linear Barrier template @@ -1394,6 +1907,11 @@ bt, this_thr, gtid, tid, reduce USE_ITT_BUILD_ARG(itt_sync_obj)); } else { switch (__kmp_barrier_gather_pattern[bt]) { + case bp_dist_bar: { + __kmp_dist_barrier_gather(bt, this_thr, gtid, tid, + reduce USE_ITT_BUILD_ARG(itt_sync_obj)); + break; + } case bp_hyper_bar: { // don't set branch bits to 0; use linear KMP_ASSERT(__kmp_barrier_gather_branch_bits[bt]); @@ -1507,6 +2025,12 @@ bt, this_thr, gtid, tid, FALSE USE_ITT_BUILD_ARG(itt_sync_obj)); } else { switch (__kmp_barrier_release_pattern[bt]) { + case bp_dist_bar: { + KMP_ASSERT(__kmp_barrier_release_branch_bits[bt]); + __kmp_dist_barrier_release(bt, this_thr, gtid, tid, + FALSE USE_ITT_BUILD_ARG(itt_sync_obj)); + break; + } case bp_hyper_bar: { KMP_ASSERT(__kmp_barrier_release_branch_bits[bt]); __kmp_hyper_barrier_release(bt, this_thr, gtid, tid, @@ -1638,6 +2162,11 @@ if (!team->t.t_serialized) { if (KMP_MASTER_GTID(gtid)) { switch (__kmp_barrier_release_pattern[bt]) { + case bp_dist_bar: { + __kmp_dist_barrier_release(bt, this_thr, gtid, tid, + FALSE USE_ITT_BUILD_ARG(NULL)); + break; + } case bp_hyper_bar: { KMP_ASSERT(__kmp_barrier_release_branch_bits[bt]); __kmp_hyper_barrier_release(bt, this_thr, gtid, tid, @@ -1749,8 +2278,8 @@ if (__kmp_tasking_mode == tskm_extra_barrier) { __kmp_tasking_barrier(team, this_thr, gtid); - KA_TRACE(10, ("__kmp_join_barrier: T#%d(%d:%d) past taking barrier\n", gtid, - team_id, tid)); + KA_TRACE(10, ("__kmp_join_barrier: T#%d(%d:%d) past tasking barrier\n", + gtid, team_id, tid)); } #ifdef KMP_DEBUG if (__kmp_tasking_mode != tskm_immediate_exec) { @@ -1759,8 +2288,9 @@ __kmp_gtid_from_thread(this_thr), team_id, team->t.t_task_team[this_thr->th.th_task_state], this_thr->th.th_task_team)); - KMP_DEBUG_ASSERT(this_thr->th.th_task_team == - team->t.t_task_team[this_thr->th.th_task_state]); + if (this_thr->th.th_task_team) + KMP_DEBUG_ASSERT(this_thr->th.th_task_team == + team->t.t_task_team[this_thr->th.th_task_state]); } #endif /* KMP_DEBUG */ @@ -1786,6 +2316,11 @@ #endif /* USE_ITT_BUILD */ switch (__kmp_barrier_gather_pattern[bs_forkjoin_barrier]) { + case bp_dist_bar: { + __kmp_dist_barrier_gather(bs_forkjoin_barrier, this_thr, gtid, tid, + NULL USE_ITT_BUILD_ARG(itt_sync_obj)); + break; + } case bp_hyper_bar: { KMP_ASSERT(__kmp_barrier_gather_branch_bits[bs_forkjoin_barrier]); __kmp_hyper_barrier_gather(bs_forkjoin_barrier, this_thr, gtid, tid, @@ -1831,8 +2366,7 @@ team_thread->th.th_stats->setIdleFlag(); if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME && team_thread->th.th_sleep_loc != NULL) - __kmp_null_resume_wrapper(__kmp_gtid_from_thread(team_thread), - team_thread->th.th_sleep_loc); + __kmp_null_resume_wrapper(team_thread); } #endif #if USE_ITT_BUILD @@ -1979,6 +2513,11 @@ } // primary thread switch (__kmp_barrier_release_pattern[bs_forkjoin_barrier]) { + case bp_dist_bar: { + __kmp_dist_barrier_release(bs_forkjoin_barrier, this_thr, 
gtid, tid, + TRUE USE_ITT_BUILD_ARG(NULL)); + break; + } case bp_hyper_bar: { KMP_ASSERT(__kmp_barrier_release_branch_bits[bs_forkjoin_barrier]); __kmp_hyper_barrier_release(bs_forkjoin_barrier, this_thr, gtid, tid, diff --git a/openmp/runtime/src/kmp_global.cpp b/openmp/runtime/src/kmp_global.cpp --- a/openmp/runtime/src/kmp_global.cpp +++ b/openmp/runtime/src/kmp_global.cpp @@ -110,8 +110,8 @@ "reduction" #endif // KMP_FAST_REDUCTION_BARRIER }; -char const *__kmp_barrier_pattern_name[bp_last_bar] = {"linear", "tree", - "hyper", "hierarchical"}; +char const *__kmp_barrier_pattern_name[bp_last_bar] = { + "linear", "tree", "hyper", "hierarchical", "dist"}; int __kmp_allThreadsSpecified = 0; size_t __kmp_align_alloc = CACHE_LINE; diff --git a/openmp/runtime/src/kmp_os.h b/openmp/runtime/src/kmp_os.h --- a/openmp/runtime/src/kmp_os.h +++ b/openmp/runtime/src/kmp_os.h @@ -1019,6 +1019,27 @@ #define KMP_MB() /* nothing to do */ #endif +#if KMP_ARCH_X86 || KMP_ARCH_X86_64 +#if KMP_COMPILER_ICC +#define KMP_MFENCE_() _mm_mfence() +#define KMP_SFENCE_() _mm_sfence() +#elif KMP_COMPILER_MSVC +#define KMP_MFENCE_() MemoryBarrier() +#define KMP_SFENCE_() MemoryBarrier() +#else +#define KMP_MFENCE_() __sync_synchronize() +#define KMP_SFENCE_() __sync_synchronize() +#endif +#define KMP_MFENCE() \ + if (UNLIKELY(!__kmp_cpuinfo.initialized)) { \ + __kmp_query_cpuid(&__kmp_cpuinfo); \ + } \ + if (__kmp_cpuinfo.sse2) { \ + KMP_MFENCE_(); \ + } +#define KMP_SFENCE() KMP_SFENCE_() +#endif + #ifndef KMP_IMB #define KMP_IMB() /* nothing to do */ #endif diff --git a/openmp/runtime/src/kmp_runtime.cpp b/openmp/runtime/src/kmp_runtime.cpp --- a/openmp/runtime/src/kmp_runtime.cpp +++ b/openmp/runtime/src/kmp_runtime.cpp @@ -109,6 +109,10 @@ static void __kmp_reap_thread(kmp_info_t *thread, int is_root); kmp_info_t *__kmp_thread_pool_insert_pt = NULL; +void __kmp_resize_dist_barrier(kmp_team_t *team, int old_nthreads, + int new_nthreads); +void __kmp_add_threads_to_team(kmp_team_t *team, int new_nthreads); + /* Calculate the identifier of the current thread */ /* fast (and somewhat portable) way to get unique identifier of executing thread. Returns KMP_GTID_DNE if we haven't been assigned a gtid. */ @@ -1206,7 +1210,7 @@ this_thr->th.th_team = serial_team; serial_team->t.t_master_tid = this_thr->th.th_info.ds.ds_tid; - KF_TRACE(10, ("__kmpc_serialized_parallel: T#d curtask=%p\n", global_tid, + KF_TRACE(10, ("__kmpc_serialized_parallel: T#%d curtask=%p\n", global_tid, this_thr->th.th_current_task)); KMP_ASSERT(this_thr->th.th_current_task->td_flags.executing == 1); this_thr->th.th_current_task->td_flags.executing = 0; @@ -1565,15 +1569,24 @@ /* Change number of threads in the team if requested */ if (master_set_numthreads) { // The parallel has num_threads clause - if (master_set_numthreads < master_th->th.th_teams_size.nth) { + if (master_set_numthreads <= master_th->th.th_teams_size.nth) { // AC: only can reduce number of threads dynamically, can't increase kmp_info_t **other_threads = parent_team->t.t_threads; + // NOTE: if using distributed barrier, we need to run this code block + // even when the team size appears not to have changed from the max. 
+ int old_proc = master_th->th.th_teams_size.nth; + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == + bp_dist_bar) { + __kmp_resize_dist_barrier(parent_team, old_proc, + master_set_numthreads); + __kmp_add_threads_to_team(parent_team, master_set_numthreads); + } parent_team->t.t_nproc = master_set_numthreads; for (i = 0; i < master_set_numthreads; ++i) { other_threads[i]->th.th_team_nproc = master_set_numthreads; } - // Keep extra threads hot in the team for possible next parallels } + // Keep extra threads hot in the team for possible next parallels master_th->th.th_set_nproc = 0; } @@ -1637,6 +1650,9 @@ } #endif + // Need this to happen before we determine the number of threads, not while + // we are allocating the team + //__kmp_push_current_task_to_thread(master_th, parent_team, 0); int enter_teams = 0; if (parent_team->t.t_active_level >= master_th->th.th_current_task->td_icvs.max_active_levels) { @@ -1644,13 +1660,10 @@ } else { enter_teams = ((ap == NULL && active_level == 0) || (ap && teams_level > 0 && teams_level == level)); - nthreads = - master_set_numthreads - ? master_set_numthreads - : get__nproc_2( - parent_team, - master_tid); // TODO: get nproc directly from current task - + nthreads = master_set_numthreads + ? master_set_numthreads + // TODO: get nproc directly from current task + : get__nproc_2(parent_team, master_tid); // Check if we need to take forkjoin lock? (no need for serialized // parallel out of teams construct). This code moved here from // __kmp_reserve_threads() to speedup nested serialized parallels. @@ -1985,6 +1998,8 @@ #endif proc_bind, &new_icvs, argc USE_NESTED_HOT_ARG(master_th)); + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) + copy_icvs((kmp_internal_control_t *)team->t.b->team_icvs, &new_icvs); } else { /* allocate a new parallel team */ KF_TRACE(10, ("__kmp_fork_call: before __kmp_allocate_team\n")); @@ -1995,6 +2010,9 @@ proc_bind, &master_th->th.th_current_task->td_icvs, argc USE_NESTED_HOT_ARG(master_th)); + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) + copy_icvs((kmp_internal_control_t *)team->t.b->team_icvs, + &master_th->th.th_current_task->td_icvs); } KF_TRACE( 10, ("__kmp_fork_call: after __kmp_allocate_team - team = %p\n", team)); @@ -2361,6 +2379,12 @@ parent_team->t.t_stack_id = NULL; } #endif + + if (team->t.t_nproc > 1 && + __kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + team->t.b->update_num_threads(team->t.t_nproc); + __kmp_add_threads_to_team(team, team->t.t_nproc); + } } KMP_MB(); @@ -2648,6 +2672,9 @@ __kmp_acquire_bootstrap_lock(&__kmp_forkjoin_lock); + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + __kmp_resize_dist_barrier(hot_team, hot_team->t.t_nproc, new_nth); + } // Release the extra threads we don't need any more. for (f = new_nth; f < hot_team->t.t_nproc; f++) { KMP_DEBUG_ASSERT(hot_team->t.t_threads[f] != NULL); @@ -2667,6 +2694,11 @@ } #endif + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + hot_team->t.b->update_num_threads(new_nth); + __kmp_add_threads_to_team(hot_team, new_nth); + } + __kmp_release_bootstrap_lock(&__kmp_forkjoin_lock); // Update the t_nproc field in the threads that are still active. 
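// --- Illustrative aside (not part of the patch) -----------------------------
// The __kmp_resize_dist_barrier() / __kmp_add_threads_to_team() calls above
// drive a small state machine on the th_used_in_team field added to
// kmp_base_info (0 = not in team, 1 = in team, 2 = leaving, 3 = joining).
// A hypothetical condensation of the primary thread's side, with
// std::atomic<int> standing in for the runtime's kmp_int32 atomics:
#include <atomic>

enum used_in_team_state : int {
  UIT_NOT_IN_TEAM = 0, // worker parked on its own thread struct
  UIT_IN_TEAM = 1,     // worker polling the team's distributed barrier
  UIT_LEAVING = 2,     // primary asked the worker to leave (1 -> 2 -> 0)
  UIT_JOINING = 3      // primary asked the worker to join  (0 -> 3 -> 1)
};

inline void primary_ask_worker_to_join(std::atomic<int> &th_used_in_team) {
  int expected = UIT_NOT_IN_TEAM;
  // Mirrors KMP_COMPARE_AND_STORE_ACQ32(&th_used_in_team, 0, 3) in
  // __kmp_add_threads_to_team(); the worker completes 3 -> 1 itself inside
  // __kmp_dist_barrier_release().
  th_used_in_team.compare_exchange_strong(expected, UIT_JOINING);
}
// ----------------------------------------------------------------------------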
@@ -4114,7 +4146,6 @@ this_thr->th.th_team_nproc = team->t.t_nproc; this_thr->th.th_team_master = master; this_thr->th.th_team_serialized = team->t.t_serialized; - TCW_PTR(this_thr->th.th_sleep_loc, NULL); KMP_DEBUG_ASSERT(team->t.t_implicit_task_taskdata); @@ -4283,6 +4314,12 @@ new_thr->th.th_task_state_top = 0; new_thr->th.th_task_state_stack_sz = 4; + if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + // Make sure pool thread has transitioned to waiting on own thread struct + KMP_DEBUG_ASSERT(new_thr->th.th_used_in_team.load() == 0); + // Thread activated in __kmp_allocate_team when increasing team size + } + #ifdef KMP_ADJUST_BLOCKTIME /* Adjust blocktime back to zero if necessary */ /* Middle initialization might not have occurred yet */ @@ -4450,6 +4487,9 @@ balign[b].bb.use_oncore_barrier = 0; } + TCW_PTR(new_thr->th.th_sleep_loc, NULL); + new_thr->th.th_sleep_loc_type = flag_unset; + new_thr->th.th_spin_here = FALSE; new_thr->th.th_next_waiting = 0; #if KMP_OS_UNIX @@ -5029,6 +5069,13 @@ } #endif + if (team->t.t_nproc != new_nproc && + __kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + // Distributed barrier may need a resize + int old_nthr = team->t.t_nproc; + __kmp_resize_dist_barrier(team, old_nthr, new_nproc); + } + // Has the number of threads changed? /* Let's assume the most common case is that the number of threads is unchanged, and put that case first. */ @@ -5078,6 +5125,11 @@ new_nproc)); team->t.t_size_changed = 1; + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + // Barrier size already reduced earlier in this function + // Activate team threads via th_used_in_team + __kmp_add_threads_to_team(team, new_nproc); + } #if KMP_NESTED_HOT_TEAMS if (__kmp_hot_teams_mode == 0) { // AC: saved number of threads should correspond to team's value in this @@ -5154,7 +5206,7 @@ KA_TRACE(20, ("__kmp_allocate_team: increasing hot team thread count to %d\n", new_nproc)); - + int old_nproc = team->t.t_nproc; // save old value and use to update only team->t.t_size_changed = 1; #if KMP_NESTED_HOT_TEAMS @@ -5181,10 +5233,9 @@ KMP_DEBUG_ASSERT(__kmp_hot_teams_mode == 1); team->t.t_nproc = new_nproc; // just get reserved threads involved } else { - // we may have some threads in reserve, but not enough - team->t.t_nproc = - hot_teams[level] - .hot_team_nth; // get reserved threads involved if any + // We may have some threads in reserve, but not enough; + // get reserved threads involved if any. + team->t.t_nproc = hot_teams[level].hot_team_nth; hot_teams[level].hot_team_nth = new_nproc; // adjust hot team max size #endif // KMP_NESTED_HOT_TEAMS if (team->t.t_max_nproc < new_nproc) { @@ -5239,8 +5290,12 @@ #if KMP_NESTED_HOT_TEAMS } // end of check of t_nproc vs. new_nproc vs. 
hot_team_nth #endif // KMP_NESTED_HOT_TEAMS + if (__kmp_barrier_release_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + // Barrier size already increased earlier in this function + // Activate team threads via th_used_in_team + __kmp_add_threads_to_team(team, new_nproc); + } /* make sure everyone is syncronized */ - int old_nproc = team->t.t_nproc; // save old value and use to update only // new threads below __kmp_initialize_team(team, new_nproc, new_icvs, root->r.r_uber_thread->th.th_ident); @@ -5344,6 +5399,13 @@ /* take this team from the team pool */ __kmp_team_pool = team->t.t_next_pool; + if (max_nproc > 1 && + __kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + if (!team->t.b) { // Allocate barrier structure + team->t.b = distributedBarrier::allocate(__kmp_dflt_team_nth_ub); + } + } + /* setup the team for fresh use */ __kmp_initialize_team(team, new_nproc, new_icvs, NULL); @@ -5399,6 +5461,12 @@ /* and set it up */ team->t.t_max_nproc = max_nproc; + if (max_nproc > 1 && + __kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + // Allocate barrier structure + team->t.b = distributedBarrier::allocate(__kmp_dflt_team_nth_ub); + } + /* NOTE well, for some reason allocating one big buffer and dividing it up seems to really hurt performance a lot on the P4, so, let's not use this */ __kmp_allocate_team_arrays(team, max_nproc); @@ -5555,10 +5623,43 @@ /* free the worker threads */ for (f = 1; f < team->t.t_nproc; ++f) { KMP_DEBUG_ASSERT(team->t.t_threads[f]); + if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + KMP_COMPARE_AND_STORE_ACQ32(&(team->t.t_threads[f]->th.th_used_in_team), + 1, 2); + } __kmp_free_thread(team->t.t_threads[f]); + } + + if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + if (team->t.b) { + // wake up thread at old location + team->t.b->go_release(); + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { + for (f = 1; f < team->t.t_nproc; ++f) { + if (team->t.b->sleep[f].sleep) { + __kmp_atomic_resume_64( + team->t.t_threads[f]->th.th_info.ds.ds_gtid, + (kmp_atomic_flag_64<> *)NULL); + } + } + } + // Wait for threads to be removed from team + for (int f = 1; f < team->t.t_nproc; ++f) { + while (team->t.t_threads[f]->th.th_used_in_team.load() != 0) + KMP_CPU_PAUSE(); + } + } + } + + for (f = 1; f < team->t.t_nproc; ++f) { team->t.t_threads[f] = NULL; } + if (team->t.t_max_nproc > 1 && + __kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + distributedBarrier::deallocate(team->t.b); + team->t.b = NULL; + } /* put the team back in the team pool */ /* TODO limit size of team pool, call reap_team if pool too large */ team->t.t_next_pool = CCAST(kmp_team_t *, __kmp_team_pool); @@ -5957,12 +6058,19 @@ KA_TRACE( 20, ("__kmp_reap_thread: releasing T#%d from fork barrier for reap\n", gtid)); - /* Need release fence here to prevent seg faults for tree forkjoin barrier - * (GEH) */ - ANNOTATE_HAPPENS_BEFORE(thread); - kmp_flag_64<> flag(&thread->th.th_bar[bs_forkjoin_barrier].bb.b_go, - thread); - __kmp_release_64(&flag); + if (__kmp_barrier_gather_pattern[bs_forkjoin_barrier] == bp_dist_bar) { + while ( + !KMP_COMPARE_AND_STORE_ACQ32(&(thread->th.th_used_in_team), 0, 3)) + KMP_CPU_PAUSE(); + __kmp_resume_32(gtid, (kmp_flag_32 *)NULL); + } else { + /* Need release fence here to prevent seg faults for tree forkjoin + barrier (GEH) */ + ANNOTATE_HAPPENS_BEFORE(thread); + kmp_flag_64<> flag(&thread->th.th_bar[bs_forkjoin_barrier].bb.b_go, + thread); + __kmp_release_64(&flag); + } } // 
Terminate OS thread. @@ -6836,8 +6944,8 @@ #if KMP_FAST_REDUCTION_BARRIER #define kmp_reduction_barrier_gather_bb ((int)1) #define kmp_reduction_barrier_release_bb ((int)1) -#define kmp_reduction_barrier_gather_pat bp_hyper_bar -#define kmp_reduction_barrier_release_pat bp_hyper_bar +#define kmp_reduction_barrier_gather_pat __kmp_barrier_gather_pat_dflt +#define kmp_reduction_barrier_release_pat __kmp_barrier_release_pat_dflt #endif // KMP_FAST_REDUCTION_BARRIER for (i = bs_plain_barrier; i < bs_last_barrier; i++) { __kmp_barrier_gather_branch_bits[i] = __kmp_barrier_gather_bb_dflt; @@ -8694,6 +8802,96 @@ __kmp_release_bootstrap_lock(&__kmp_initz_lock); } +// The team size is changing, so distributed barrier must be modified +void __kmp_resize_dist_barrier(kmp_team_t *team, int old_nthreads, + int new_nthreads) { + KMP_DEBUG_ASSERT(__kmp_barrier_release_pattern[bs_forkjoin_barrier] == + bp_dist_bar); + kmp_info_t **other_threads = team->t.t_threads; + + // We want all the workers to stop waiting on the barrier while we adjust the + // size of the team. + for (int f = 1; f < old_nthreads; ++f) { + KMP_DEBUG_ASSERT(other_threads[f] != NULL); + // Ignore threads that are already inactive or not present in the team + if (team->t.t_threads[f]->th.th_used_in_team.load() == 0) { + // teams construct causes thread_limit to get passed in, and some of + // those could be inactive; just ignore them + continue; + } + // If thread is transitioning still to in_use state, wait for it + if (team->t.t_threads[f]->th.th_used_in_team.load() == 3) { + while (team->t.t_threads[f]->th.th_used_in_team.load() == 3) + KMP_CPU_PAUSE(); + } + // The thread should be in_use now + KMP_DEBUG_ASSERT(team->t.t_threads[f]->th.th_used_in_team.load() == 1); + // Transition to unused state + team->t.t_threads[f]->th.th_used_in_team.store(2); + KMP_DEBUG_ASSERT(team->t.t_threads[f]->th.th_used_in_team.load() == 2); + } + // Release all the workers + kmp_uint64 new_value; // new value for go + new_value = team->t.b->go_release(); + + KMP_MFENCE(); + + // Workers should see transition status 2 and move to 0; but may need to be + // woken up first + size_t my_go_index; + int count = old_nthreads - 1; + while (count > 0) { + count = old_nthreads - 1; + for (int f = 1; f < old_nthreads; ++f) { + my_go_index = f / team->t.b->threads_per_go; + if (other_threads[f]->th.th_used_in_team.load() != 0) { + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { // Wake up the workers + kmp_atomic_flag_64<> *flag = (kmp_atomic_flag_64<> *)CCAST( + void *, other_threads[f]->th.th_sleep_loc); + __kmp_atomic_resume_64(other_threads[f]->th.th_info.ds.ds_gtid, flag); + } + } else { + KMP_DEBUG_ASSERT(team->t.t_threads[f]->th.th_used_in_team.load() == 0); + count--; + } + } + } + // Now update the barrier size + team->t.b->update_num_threads(new_nthreads); + team->t.b->go_reset(); +} + +void __kmp_add_threads_to_team(kmp_team_t *team, int new_nthreads) { + // Add the threads back to the team + KMP_DEBUG_ASSERT(team); + // Threads were paused and pointed at th_used_in_team temporarily during a + // resize of the team. We're going to set th_used_in_team to 3 to indicate to + // the thread that it should transition itself back into the team. Then, if + // blocktime isn't infinite, the thread could be sleeping, so we send a resume + // to wake it up. 
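// --- Illustrative aside (not part of the patch) -----------------------------
// The loop below only requests the 0 -> 3 transition; each worker acknowledges
// it from inside __kmp_dist_barrier_release() by moving 3 -> 1 before it
// starts polling the team's go flag. A condensed, hypothetical model of that
// worker-side wait (the real code sleeps via a kmp_flag_32 on th_used_in_team
// and also handles the 2 -> 0 "leaving" case):
#include <atomic>

inline void worker_wait_until_rejoined(std::atomic<int> &th_used_in_team) {
  int v = th_used_in_team.load();
  while (v != 3 && v != 1) // parked until the primary asks us to (re)join
    v = th_used_in_team.load();
  if (v == 3) // acknowledge: transitioning -> fully in the team
    th_used_in_team.compare_exchange_strong(v, 1);
}
// ----------------------------------------------------------------------------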
+ for (int f = 1; f < new_nthreads; ++f) { + KMP_DEBUG_ASSERT(team->t.t_threads[f]); + KMP_COMPARE_AND_STORE_ACQ32(&(team->t.t_threads[f]->th.th_used_in_team), 0, + 3); + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { // Wake up sleeping threads + __kmp_resume_32(team->t.t_threads[f]->th.th_info.ds.ds_gtid, + (kmp_flag_32 *)NULL); + } + } + // The threads should be transitioning to the team; when they are done, they + // should have set th_used_in_team to 1. This loop forces master to wait until + // all threads have moved into the team and are waiting in the barrier. + int count = new_nthreads - 1; + while (count > 0) { + count = new_nthreads - 1; + for (int f = 1; f < new_nthreads; ++f) { + if (team->t.t_threads[f]->th.th_used_in_team.load() == 1) { + count--; + } + } + } +} + // Globals and functions for hidden helper task kmp_info_t **__kmp_hidden_helper_threads; kmp_info_t *__kmp_hidden_helper_main_thread; diff --git a/openmp/runtime/src/kmp_settings.cpp b/openmp/runtime/src/kmp_settings.cpp --- a/openmp/runtime/src/kmp_settings.cpp +++ b/openmp/runtime/src/kmp_settings.cpp @@ -1684,6 +1684,8 @@ const char *var; /* ---------- Barrier method control ------------ */ + static int dist_req = 0, non_dist_req = 0; + static bool warn = 1; for (int i = bs_plain_barrier; i < bs_last_barrier; i++) { var = __kmp_barrier_pattern_env_name[i]; @@ -1695,6 +1697,11 @@ for (j = bp_linear_bar; j < bp_last_bar; j++) { if (__kmp_match_with_sentinel(__kmp_barrier_pattern_name[j], value, 1, ',')) { + if (j == bp_dist_bar) { + dist_req++; + } else { + non_dist_req++; + } __kmp_barrier_gather_pattern[i] = (kmp_bar_pat_e)j; break; } @@ -1709,6 +1716,11 @@ if (comma != NULL) { for (j = bp_linear_bar; j < bp_last_bar; j++) { if (__kmp_str_match(__kmp_barrier_pattern_name[j], 1, comma + 1)) { + if (j == bp_dist_bar) { + dist_req++; + } else { + non_dist_req++; + } __kmp_barrier_release_pattern[i] = (kmp_bar_pat_e)j; break; } @@ -1723,6 +1735,28 @@ } } } + if ((dist_req == 0) && (non_dist_req != 0)) { + // Something was set to a barrier other than dist; set all others to hyper + for (int i = bs_plain_barrier; i < bs_last_barrier; i++) { + if (__kmp_barrier_release_pattern[i] == bp_dist_bar) + __kmp_barrier_release_pattern[i] = bp_hyper_bar; + if (__kmp_barrier_gather_pattern[i] == bp_dist_bar) + __kmp_barrier_gather_pattern[i] = bp_hyper_bar; + } + } else if (non_dist_req != 0) { + // some requests for dist, plus requests for others; set all to dist + if (non_dist_req > 0 && dist_req > 0 && warn) { + KMP_INFORM(BarrierPatternOverride, name, + __kmp_barrier_pattern_name[bp_dist_bar]); + warn = 0; + } + for (int i = bs_plain_barrier; i < bs_last_barrier; i++) { + if (__kmp_barrier_release_pattern[i] != bp_dist_bar) + __kmp_barrier_release_pattern[i] = bp_dist_bar; + if (__kmp_barrier_gather_pattern[i] != bp_dist_bar) + __kmp_barrier_gather_pattern[i] = bp_dist_bar; + } + } } // __kmp_stg_parse_barrier_pattern static void __kmp_stg_print_barrier_pattern(kmp_str_buf_t *buffer, @@ -1739,7 +1773,7 @@ __kmp_str_buf_print(buffer, " %s='", __kmp_barrier_pattern_env_name[i]); } - KMP_DEBUG_ASSERT(j < bs_last_barrier && k < bs_last_barrier); + KMP_DEBUG_ASSERT(j < bp_last_bar && k < bp_last_bar); __kmp_str_buf_print(buffer, "%s,%s'\n", __kmp_barrier_pattern_name[j], __kmp_barrier_pattern_name[k]); } diff --git a/openmp/runtime/src/kmp_stats.h b/openmp/runtime/src/kmp_stats.h --- a/openmp/runtime/src/kmp_stats.h +++ b/openmp/runtime/src/kmp_stats.h @@ -246,6 +246,8 @@ // KMP_tree_release -- time in 
__kmp_tree_barrier_release // KMP_hyper_gather -- time in __kmp_hyper_barrier_gather // KMP_hyper_release -- time in __kmp_hyper_barrier_release +// KMP_dist_gather -- time in __kmp_dist_barrier_gather +// KMP_dist_release -- time in __kmp_dist_barrier_release // clang-format off #define KMP_FOREACH_DEVELOPER_TIMER(macro, arg) \ macro(KMP_fork_call, 0, arg) \ @@ -255,6 +257,8 @@ macro(KMP_hier_release, 0, arg) \ macro(KMP_hyper_gather, 0, arg) \ macro(KMP_hyper_release, 0, arg) \ + macro(KMP_dist_gather, 0, arg) \ + macro(KMP_dist_release, 0, arg) \ macro(KMP_linear_gather, 0, arg) \ macro(KMP_linear_release, 0, arg) \ macro(KMP_tree_gather, 0, arg) \ diff --git a/openmp/runtime/src/kmp_str.h b/openmp/runtime/src/kmp_str.h --- a/openmp/runtime/src/kmp_str.h +++ b/openmp/runtime/src/kmp_str.h @@ -106,6 +106,7 @@ char *__kmp_str_format(char const *format, ...); void __kmp_str_free(char **str); int __kmp_str_match(char const *target, int len, char const *data); +bool __kmp_str_contains(char const *target, int len, char const *data); int __kmp_str_match_false(char const *data); int __kmp_str_match_true(char const *data); void __kmp_str_replace(char *str, char search_for, char replace_with); diff --git a/openmp/runtime/src/kmp_str.cpp b/openmp/runtime/src/kmp_str.cpp --- a/openmp/runtime/src/kmp_str.cpp +++ b/openmp/runtime/src/kmp_str.cpp @@ -515,6 +515,31 @@ return ((len > 0) ? i >= len : (!target[i] && (len || !data[i]))); } // __kmp_str_match +// If data contains all of target, returns true, otherwise returns false. +// len should be the length of target +bool __kmp_str_contains(char const *target, int len, char const *data) { + int i = 0, j = 0, start = 0; + if (target == NULL || data == NULL) { + return FALSE; + } + while (target[i]) { + if (!data[j]) + return FALSE; + if (TOLOWER(target[i]) != TOLOWER(data[j])) { + j = start + 1; + start = j; + i = 0; + } else { + if (i == 0) + start = j; + j++; + i++; + } + } + + return i == len; +} // __kmp_str_contains + int __kmp_str_match_false(char const *data) { int result = __kmp_str_match("false", 1, data) || __kmp_str_match("off", 2, data) || diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp --- a/openmp/runtime/src/kmp_tasking.cpp +++ b/openmp/runtime/src/kmp_tasking.cpp @@ -2963,8 +2963,7 @@ (TCR_PTR(CCAST(void *, other_thread->th.th_sleep_loc)) != NULL)) { asleep = 1; - __kmp_null_resume_wrapper(__kmp_gtid_from_thread(other_thread), - other_thread->th.th_sleep_loc); + __kmp_null_resume_wrapper(other_thread); // A sleeping thread should not have any tasks on it's queue. 
// There is a slight possibility that it resumes, steals a task // from another thread, which spawns more tasks, all in the time @@ -3113,6 +3112,16 @@ thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained); } +template +int __kmp_atomic_execute_tasks_64( + kmp_info_t *thread, kmp_int32 gtid, kmp_atomic_flag_64 *flag, + int final_spin, int *thread_finished USE_ITT_BUILD_ARG(void *itt_sync_obj), + kmp_int32 is_constrained) { + return __kmp_execute_tasks_template( + thread, gtid, flag, final_spin, + thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained); +} + int __kmp_execute_tasks_oncore( kmp_info_t *thread, kmp_int32 gtid, kmp_flag_oncore *flag, int final_spin, int *thread_finished USE_ITT_BUILD_ARG(void *itt_sync_obj), @@ -3139,6 +3148,14 @@ int *USE_ITT_BUILD_ARG(void *), kmp_int32); +template int __kmp_atomic_execute_tasks_64( + kmp_info_t *, kmp_int32, kmp_atomic_flag_64 *, int, + int *USE_ITT_BUILD_ARG(void *), kmp_int32); + +template int __kmp_atomic_execute_tasks_64( + kmp_info_t *, kmp_int32, kmp_atomic_flag_64 *, int, + int *USE_ITT_BUILD_ARG(void *), kmp_int32); + // __kmp_enable_tasking: Allocate task team and resume threads sleeping at the // next barrier so they can assist in executing enqueued tasks. // First thread in allocates the task team atomically. @@ -3177,7 +3194,7 @@ // tasks and execute them. In extra barrier mode, tasks do not sleep // at the separate tasking barrier, so this isn't a problem. for (i = 0; i < nthreads; i++) { - volatile void *sleep_loc; + void *sleep_loc; kmp_info_t *thread = threads_data[i].td.td_thr; if (i == this_thr->th.th_info.ds.ds_tid) { @@ -3194,7 +3211,7 @@ KF_TRACE(50, ("__kmp_enable_tasking: T#%d waking up thread T#%d\n", __kmp_gtid_from_thread(this_thr), __kmp_gtid_from_thread(thread))); - __kmp_null_resume_wrapper(__kmp_gtid_from_thread(thread), sleep_loc); + __kmp_null_resume_wrapper(thread); } else { KF_TRACE(50, ("__kmp_enable_tasking: T#%d don't wake up thread T#%d\n", __kmp_gtid_from_thread(this_thr), @@ -3564,7 +3581,7 @@ __kmp_gtid_from_thread(thread))); if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { - volatile void *sleep_loc; + void *sleep_loc; // If the thread is sleeping, awaken it. if ((sleep_loc = TCR_PTR(CCAST(void *, thread->th.th_sleep_loc))) != NULL) { @@ -3572,7 +3589,7 @@ 10, ("__kmp_wait_to_unref_task_team: T#%d waking up thread T#%d\n", __kmp_gtid_from_thread(thread), __kmp_gtid_from_thread(thread))); - __kmp_null_resume_wrapper(__kmp_gtid_from_thread(thread), sleep_loc); + __kmp_null_resume_wrapper(thread); } } } diff --git a/openmp/runtime/src/kmp_wait_release.h b/openmp/runtime/src/kmp_wait_release.h --- a/openmp/runtime/src/kmp_wait_release.h +++ b/openmp/runtime/src/kmp_wait_release.h @@ -33,96 +33,285 @@ @{ */ -/*! - * The flag_type describes the storage used for the flag. - */ -enum flag_type { - flag32, /**< 32 bit flags */ - flag64, /**< 64 bit flags */ - flag_oncore /**< special 64-bit flag for on-core barrier (hierarchical) */ -}; - struct flag_properties { unsigned int type : 16; unsigned int reserved : 16; }; -/*! 
- * Base class for wait/release volatile flag - */ -template class kmp_flag_native { - volatile P *loc; - flag_properties t; +template struct flag_traits {}; + +template <> struct flag_traits { + typedef kmp_uint32 flag_t; + static const flag_type t = flag32; + static inline flag_t tcr(flag_t f) { return TCR_4(f); } + static inline flag_t test_then_add4(volatile flag_t *f) { + return KMP_TEST_THEN_ADD4_32(RCAST(volatile kmp_int32 *, f)); + } + static inline flag_t test_then_or(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_OR32(f, v); + } + static inline flag_t test_then_and(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_AND32(f, v); + } +}; + +template <> struct flag_traits { + typedef kmp_uint64 flag_t; + static const flag_type t = atomic_flag64; + static inline flag_t tcr(flag_t f) { return TCR_8(f); } + static inline flag_t test_then_add4(volatile flag_t *f) { + return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f)); + } + static inline flag_t test_then_or(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_OR64(f, v); + } + static inline flag_t test_then_and(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_AND64(f, v); + } +}; + +template <> struct flag_traits { + typedef kmp_uint64 flag_t; + static const flag_type t = flag64; + static inline flag_t tcr(flag_t f) { return TCR_8(f); } + static inline flag_t test_then_add4(volatile flag_t *f) { + return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f)); + } + static inline flag_t test_then_or(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_OR64(f, v); + } + static inline flag_t test_then_and(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_AND64(f, v); + } +}; + +template <> struct flag_traits { + typedef kmp_uint64 flag_t; + static const flag_type t = flag_oncore; + static inline flag_t tcr(flag_t f) { return TCR_8(f); } + static inline flag_t test_then_add4(volatile flag_t *f) { + return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f)); + } + static inline flag_t test_then_or(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_OR64(f, v); + } + static inline flag_t test_then_and(volatile flag_t *f, flag_t v) { + return KMP_TEST_THEN_AND64(f, v); + } +}; + +/*! Base class for all flags */ +template class kmp_flag { +protected: + flag_properties t; /**< "Type" of the flag in loc */ + kmp_info_t *waiting_threads[1]; /**< Threads sleeping on this thread. */ + kmp_uint32 num_waiting_threads; /**< #threads sleeping on this thread. */ + std::atomic *sleepLoc; public: - typedef P flag_t; - kmp_flag_native(volatile P *p, flag_type ft) - : loc(p), t({(short unsigned int)ft, 0U}) {} - volatile P *get() { return loc; } - void *get_void_p() { return RCAST(void *, CCAST(P *, loc)); } - void set(volatile P *new_loc) { loc = new_loc; } + typedef flag_traits traits_type; + kmp_flag() : t({FlagType, 0U}), num_waiting_threads(0), sleepLoc(nullptr) {} + kmp_flag(int nwaiters) + : t({FlagType, 0U}), num_waiting_threads(nwaiters), sleepLoc(nullptr) {} + kmp_flag(std::atomic *sloc) + : t({FlagType, 0U}), num_waiting_threads(0), sleepLoc(sloc) {} + /*! @result the flag_type */ flag_type get_type() { return (flag_type)(t.type); } - P load() { return *loc; } - void store(P val) { *loc = val; } + + /*! param i in index into waiting_threads + * @result the thread that is waiting at index i */ + kmp_info_t *get_waiter(kmp_uint32 i) { + KMP_DEBUG_ASSERT(i < num_waiting_threads); + return waiting_threads[i]; + } + /*! 
@result num_waiting_threads */ + kmp_uint32 get_num_waiters() { return num_waiting_threads; } + /*! @param thr in the thread which is now waiting + * Insert a waiting thread at index 0. */ + void set_waiter(kmp_info_t *thr) { + waiting_threads[0] = thr; + num_waiting_threads = 1; + } + enum barrier_type get_bt() { return bs_last_barrier; } }; -/*! - * Base class for wait/release atomic flag - */ -template <typename P> class kmp_flag { - std::atomic<P>
- *loc; /**< Pointer to the flag storage that is modified by another thread - */ - flag_properties t; /**< "Type" of the flag in loc */ +/*! Base class for wait/release volatile flag */ +template <typename PtrType, flag_type FlagType, bool Sleepable> +class kmp_flag_native : public kmp_flag<FlagType> { +protected: + volatile PtrType *loc; + PtrType checker; /**< When flag==checker, it has been released. */ + typedef flag_traits<FlagType> traits_type; + public: - typedef P flag_t; - kmp_flag(std::atomic<P>
*p, flag_type ft) - : loc(p), t({(short unsigned int)ft, 0U}) {} - /*! - * @result the pointer to the actual flag - */ - std::atomic<P>
*get() { return loc; } - /*! - * @result void* pointer to the actual flag - */ + typedef PtrType flag_t; + kmp_flag_native(volatile PtrType *p) : kmp_flag(), loc(p) {} + kmp_flag_native(volatile PtrType *p, kmp_info_t *thr) + : kmp_flag(1), loc(p) { + this->waiting_threads[0] = thr; + } + kmp_flag_native(volatile PtrType *p, PtrType c) + : kmp_flag(), loc(p), checker(c) {} + kmp_flag_native(volatile PtrType *p, PtrType c, std::atomic *sloc) + : kmp_flag(sloc), loc(p), checker(c) {} + volatile PtrType *get() { return loc; } + void *get_void_p() { return RCAST(void *, CCAST(PtrType *, loc)); } + void set(volatile PtrType *new_loc) { loc = new_loc; } + PtrType load() { return *loc; } + void store(PtrType val) { *loc = val; } + /*! @result true if the flag object has been released. */ + virtual bool done_check() { + if (Sleepable && !(this->sleepLoc)) + return (traits_type::tcr(*(this->get())) & ~KMP_BARRIER_SLEEP_STATE) == + checker; + else + return traits_type::tcr(*(this->get())) == checker; + } + /*! @param old_loc in old value of flag + * @result true if the flag's old value indicates it was released. */ + virtual bool done_check_val(PtrType old_loc) { return old_loc == checker; } + /*! @result true if the flag object is not yet released. + * Used in __kmp_wait_template like: + * @code + * while (flag.notdone_check()) { pause(); } + * @endcode */ + virtual bool notdone_check() { + return traits_type::tcr(*(this->get())) != checker; + } + /*! @result Actual flag value before release was applied. + * Trigger all waiting threads to run by modifying flag to release state. */ + void internal_release() { + (void)traits_type::test_then_add4((volatile PtrType *)this->get()); + } + /*! @result Actual flag value before sleep bit(s) set. + * Notes that there is at least one thread sleeping on the flag by setting + * sleep bit(s). */ + PtrType set_sleeping() { + if (this->sleepLoc) { + this->sleepLoc->store(true); + return *(this->get()); + } + return traits_type::test_then_or((volatile PtrType *)this->get(), + KMP_BARRIER_SLEEP_STATE); + } + /*! @result Actual flag value before sleep bit(s) cleared. + * Notes that there are no longer threads sleeping on the flag by clearing + * sleep bit(s). */ + void unset_sleeping() { + if (this->sleepLoc) { + this->sleepLoc->store(false); + return; + } + traits_type::test_then_and((volatile PtrType *)this->get(), + ~KMP_BARRIER_SLEEP_STATE); + } + /*! @param old_loc in old value of flag + * Test if there are threads sleeping on the flag's old value in old_loc. */ + bool is_sleeping_val(PtrType old_loc) { + if (this->sleepLoc) + return this->sleepLoc->load(); + return old_loc & KMP_BARRIER_SLEEP_STATE; + } + /*! Test whether there are threads sleeping on the flag. */ + bool is_sleeping() { + if (this->sleepLoc) + return this->sleepLoc->load(); + return is_sleeping_val(*(this->get())); + } + bool is_any_sleeping() { + if (this->sleepLoc) + return this->sleepLoc->load(); + return is_sleeping_val(*(this->get())); + } + kmp_uint8 *get_stolen() { return NULL; } +}; + +/*! Base class for wait/release atomic flag */ +template +class kmp_flag_atomic : public kmp_flag { +protected: + std::atomic *loc; /**< Pointer to flag location to wait on */ + PtrType checker; /**< Flag == checker means it has been released. 
*/ +public: + typedef flag_traits traits_type; + typedef PtrType flag_t; + kmp_flag_atomic(std::atomic *p) : kmp_flag(), loc(p) {} + kmp_flag_atomic(std::atomic *p, kmp_info_t *thr) + : kmp_flag(1), loc(p) { + this->waiting_threads[0] = thr; + } + kmp_flag_atomic(std::atomic *p, PtrType c) + : kmp_flag(), loc(p), checker(c) {} + kmp_flag_atomic(std::atomic *p, PtrType c, std::atomic *sloc) + : kmp_flag(sloc), loc(p), checker(c) {} + /*! @result the pointer to the actual flag */ + std::atomic *get() { return loc; } + /*! @result void* pointer to the actual flag */ void *get_void_p() { return RCAST(void *, loc); } - /*! - * @param new_loc in set loc to point at new_loc - */ - void set(std::atomic
<P>
*new_loc) { loc = new_loc; } - /*! - * @result the flag_type - */ - flag_type get_type() { return (flag_type)(t.type); } - /*! - * @result flag value - */ - P load() { return loc->load(std::memory_order_acquire); } - /*! - * @param val the new flag value to be stored - */ - void store(P val) { loc->store(val, std::memory_order_release); } - // Derived classes must provide the following: - /* - kmp_info_t * get_waiter(kmp_uint32 i); - kmp_uint32 get_num_waiters(); - bool done_check(); - bool done_check_val(P old_loc); - bool notdone_check(); - P internal_release(); - void suspend(int th_gtid); - void mwait(int th_gtid); - void resume(int th_gtid); - P set_sleeping(); - P unset_sleeping(); - bool is_sleeping(); - bool is_any_sleeping(); - bool is_sleeping_val(P old_loc); - int execute_tasks(kmp_info_t *this_thr, kmp_int32 gtid, int final_spin, - int *thread_finished - USE_ITT_BUILD_ARG(void * itt_sync_obj), kmp_int32 - is_constrained); - */ + /*! @param new_loc in set loc to point at new_loc */ + void set(std::atomic *new_loc) { loc = new_loc; } + /*! @result flag value */ + PtrType load() { return loc->load(std::memory_order_acquire); } + /*! @param val the new flag value to be stored */ + void store(PtrType val) { loc->store(val, std::memory_order_release); } + /*! @result true if the flag object has been released. */ + bool done_check() { + if (Sleepable && !(this->sleepLoc)) + return (this->load() & ~KMP_BARRIER_SLEEP_STATE) == checker; + else + return this->load() == checker; + } + /*! @param old_loc in old value of flag + * @result true if the flag's old value indicates it was released. */ + bool done_check_val(PtrType old_loc) { return old_loc == checker; } + /*! @result true if the flag object is not yet released. + * Used in __kmp_wait_template like: + * @code + * while (flag.notdone_check()) { pause(); } + * @endcode */ + bool notdone_check() { return this->load() != checker; } + /*! @result Actual flag value before release was applied. + * Trigger all waiting threads to run by modifying flag to release state. */ + void internal_release() { KMP_ATOMIC_ADD(this->get(), 4); } + /*! @result Actual flag value before sleep bit(s) set. + * Notes that there is at least one thread sleeping on the flag by setting + * sleep bit(s). */ + PtrType set_sleeping() { + if (this->sleepLoc) { + this->sleepLoc->store(true); + return *(this->get()); + } + return KMP_ATOMIC_OR(this->get(), KMP_BARRIER_SLEEP_STATE); + } + /*! @result Actual flag value before sleep bit(s) cleared. + * Notes that there are no longer threads sleeping on the flag by clearing + * sleep bit(s). */ + void unset_sleeping() { + if (this->sleepLoc) { + this->sleepLoc->store(false); + return; + } + KMP_ATOMIC_AND(this->get(), ~KMP_BARRIER_SLEEP_STATE); + } + /*! @param old_loc in old value of flag + * Test whether there are threads sleeping on flag's old value in old_loc. */ + bool is_sleeping_val(PtrType old_loc) { + if (this->sleepLoc) + return this->sleepLoc->load(); + return old_loc & KMP_BARRIER_SLEEP_STATE; + } + /*! Test whether there are threads sleeping on the flag. 
*/ + bool is_sleeping() { + if (this->sleepLoc) + return this->sleepLoc->load(); + return is_sleeping_val(this->load()); + } + bool is_any_sleeping() { + if (this->sleepLoc) + return this->sleepLoc->load(); + return is_sleeping_val(this->load()); + } + kmp_uint8 *get_stolen() { return NULL; } }; #if OMPT_SUPPORT @@ -264,8 +453,9 @@ ompt_entry_state = this_thr->th.ompt_thread_info.state; if (!final_spin || ompt_entry_state != ompt_state_wait_barrier_implicit || KMP_MASTER_TID(this_thr->th.th_info.ds.ds_tid)) { - ompt_lw_taskteam_t *team = - this_thr->th.th_team->t.ompt_serialized_team_info; + ompt_lw_taskteam_t *team = NULL; + if (this_thr->th.th_team) + team = this_thr->th.th_team->t.ompt_serialized_team_info; if (team) { tId = &(team->ompt_task_info.task_data); } else { @@ -340,11 +530,11 @@ disabled (KMP_TASKING=0). */ if (task_team != NULL) { if (TCR_SYNC_4(task_team->tt.tt_active)) { - if (KMP_TASKING_ENABLED(task_team)) + if (KMP_TASKING_ENABLED(task_team)) { flag->execute_tasks( this_thr, th_gtid, final_spin, &tasks_completed USE_ITT_BUILD_ARG(itt_sync_obj), 0); - else + } else this_thr->th.th_reap_state = KMP_SAFE_TO_REAP; } else { KMP_DEBUG_ASSERT(!KMP_MASTER_TID(this_thr->th.th_info.ds.ds_tid)); @@ -557,6 +747,7 @@ else { // if flag changes here, wake-up happens immediately TCW_PTR(th->th.th_sleep_loc, (void *)flag); + th->th.th_sleep_loc_type = flag->get_type(); __kmp_unlock_suspend_mx(th); KF_TRACE(50, ("__kmp_mwait_template: T#%d calling mwait\n", th_gtid)); #if KMP_HAVE_UMWAIT @@ -574,6 +765,7 @@ if (flag->is_sleeping()) flag->unset_sleeping(); TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; } // Mark thread as active again th->th.th_active = TRUE; @@ -624,251 +816,15 @@ } } -template struct flag_traits {}; - -template <> struct flag_traits { - typedef kmp_uint32 flag_t; - static const flag_type t = flag32; - static inline flag_t tcr(flag_t f) { return TCR_4(f); } - static inline flag_t test_then_add4(volatile flag_t *f) { - return KMP_TEST_THEN_ADD4_32(RCAST(volatile kmp_int32 *, f)); - } - static inline flag_t test_then_or(volatile flag_t *f, flag_t v) { - return KMP_TEST_THEN_OR32(f, v); - } - static inline flag_t test_then_and(volatile flag_t *f, flag_t v) { - return KMP_TEST_THEN_AND32(f, v); - } -}; - -template <> struct flag_traits { - typedef kmp_uint64 flag_t; - static const flag_type t = flag64; - static inline flag_t tcr(flag_t f) { return TCR_8(f); } - static inline flag_t test_then_add4(volatile flag_t *f) { - return KMP_TEST_THEN_ADD4_64(RCAST(volatile kmp_int64 *, f)); - } - static inline flag_t test_then_or(volatile flag_t *f, flag_t v) { - return KMP_TEST_THEN_OR64(f, v); - } - static inline flag_t test_then_and(volatile flag_t *f, flag_t v) { - return KMP_TEST_THEN_AND64(f, v); - } -}; - -// Basic flag that does not use C11 Atomics -template -class kmp_basic_flag_native : public kmp_flag_native { - typedef flag_traits traits_type; - FlagType checker; /**< Value to compare flag to to check if flag has been - released. */ - kmp_info_t - *waiting_threads[1]; /**< Array of threads sleeping on this thread. */ - kmp_uint32 - num_waiting_threads; /**< Number of threads sleeping on this thread. 
*/ -public: - kmp_basic_flag_native(volatile FlagType *p) - : kmp_flag_native(p, traits_type::t), num_waiting_threads(0) {} - kmp_basic_flag_native(volatile FlagType *p, kmp_info_t *thr) - : kmp_flag_native(p, traits_type::t), num_waiting_threads(1) { - waiting_threads[0] = thr; - } - kmp_basic_flag_native(volatile FlagType *p, FlagType c) - : kmp_flag_native(p, traits_type::t), checker(c), - num_waiting_threads(0) {} - /*! - * param i in index into waiting_threads - * @result the thread that is waiting at index i - */ - kmp_info_t *get_waiter(kmp_uint32 i) { - KMP_DEBUG_ASSERT(i < num_waiting_threads); - return waiting_threads[i]; - } - /*! - * @result num_waiting_threads - */ - kmp_uint32 get_num_waiters() { return num_waiting_threads; } - /*! - * @param thr in the thread which is now waiting - * - * Insert a waiting thread at index 0. - */ - void set_waiter(kmp_info_t *thr) { - waiting_threads[0] = thr; - num_waiting_threads = 1; - } - /*! - * @result true if the flag object has been released. - */ - bool done_check() { - if (Sleepable) - return (traits_type::tcr(*(this->get())) & ~KMP_BARRIER_SLEEP_STATE) == - checker; - else - return traits_type::tcr(*(this->get())) == checker; - } - /*! - * @param old_loc in old value of flag - * @result true if the flag's old value indicates it was released. - */ - bool done_check_val(FlagType old_loc) { return old_loc == checker; } - /*! - * @result true if the flag object is not yet released. - * Used in __kmp_wait_template like: - * @code - * while (flag.notdone_check()) { pause(); } - * @endcode - */ - bool notdone_check() { return traits_type::tcr(*(this->get())) != checker; } - /*! - * @result Actual flag value before release was applied. - * Trigger all waiting threads to run by modifying flag to release state. - */ - void internal_release() { - (void)traits_type::test_then_add4((volatile FlagType *)this->get()); - } - /*! - * @result Actual flag value before sleep bit(s) set. - * Notes that there is at least one thread sleeping on the flag by setting - * sleep bit(s). - */ - FlagType set_sleeping() { - return traits_type::test_then_or((volatile FlagType *)this->get(), - KMP_BARRIER_SLEEP_STATE); - } - /*! - * @result Actual flag value before sleep bit(s) cleared. - * Notes that there are no longer threads sleeping on the flag by clearing - * sleep bit(s). - */ - FlagType unset_sleeping() { - return traits_type::test_then_and((volatile FlagType *)this->get(), - ~KMP_BARRIER_SLEEP_STATE); - } - /*! - * @param old_loc in old value of flag - * Test whether there are threads sleeping on the flag's old value in old_loc. - */ - bool is_sleeping_val(FlagType old_loc) { - return old_loc & KMP_BARRIER_SLEEP_STATE; - } - /*! - * Test whether there are threads sleeping on the flag. - */ - bool is_sleeping() { return is_sleeping_val(*(this->get())); } - bool is_any_sleeping() { return is_sleeping_val(*(this->get())); } - kmp_uint8 *get_stolen() { return NULL; } - enum barrier_type get_bt() { return bs_last_barrier; } -}; - -template -class kmp_basic_flag : public kmp_flag { - typedef flag_traits traits_type; - FlagType checker; /**< Value to compare flag to to check if flag has been - released. */ - kmp_info_t - *waiting_threads[1]; /**< Array of threads sleeping on this thread. */ - kmp_uint32 - num_waiting_threads; /**< Number of threads sleeping on this thread. 
*/ -public: - kmp_basic_flag(std::atomic *p) - : kmp_flag(p, traits_type::t), num_waiting_threads(0) {} - kmp_basic_flag(std::atomic *p, kmp_info_t *thr) - : kmp_flag(p, traits_type::t), num_waiting_threads(1) { - waiting_threads[0] = thr; - } - kmp_basic_flag(std::atomic *p, FlagType c) - : kmp_flag(p, traits_type::t), checker(c), - num_waiting_threads(0) {} - /*! - * param i in index into waiting_threads - * @result the thread that is waiting at index i - */ - kmp_info_t *get_waiter(kmp_uint32 i) { - KMP_DEBUG_ASSERT(i < num_waiting_threads); - return waiting_threads[i]; - } - /*! - * @result num_waiting_threads - */ - kmp_uint32 get_num_waiters() { return num_waiting_threads; } - /*! - * @param thr in the thread which is now waiting - * - * Insert a waiting thread at index 0. - */ - void set_waiter(kmp_info_t *thr) { - waiting_threads[0] = thr; - num_waiting_threads = 1; - } - /*! - * @result true if the flag object has been released. - */ - bool done_check() { - if (Sleepable) - return (this->load() & ~KMP_BARRIER_SLEEP_STATE) == checker; - else - return this->load() == checker; - } - /*! - * @param old_loc in old value of flag - * @result true if the flag's old value indicates it was released. - */ - bool done_check_val(FlagType old_loc) { return old_loc == checker; } - /*! - * @result true if the flag object is not yet released. - * Used in __kmp_wait_template like: - * @code - * while (flag.notdone_check()) { pause(); } - * @endcode - */ - bool notdone_check() { return this->load() != checker; } - /*! - * @result Actual flag value before release was applied. - * Trigger all waiting threads to run by modifying flag to release state. - */ - void internal_release() { KMP_ATOMIC_ADD(this->get(), 4); } - /*! - * @result Actual flag value before sleep bit(s) set. - * Notes that there is at least one thread sleeping on the flag by setting - * sleep bit(s). - */ - FlagType set_sleeping() { - return KMP_ATOMIC_OR(this->get(), KMP_BARRIER_SLEEP_STATE); - } - /*! - * @result Actual flag value before sleep bit(s) cleared. - * Notes that there are no longer threads sleeping on the flag by clearing - * sleep bit(s). - */ - FlagType unset_sleeping() { - return KMP_ATOMIC_AND(this->get(), ~KMP_BARRIER_SLEEP_STATE); - } - /*! - * @param old_loc in old value of flag - * Test whether there are threads sleeping on the flag's old value in old_loc. - */ - bool is_sleeping_val(FlagType old_loc) { - return old_loc & KMP_BARRIER_SLEEP_STATE; - } - /*! - * Test whether there are threads sleeping on the flag. 
- */ - bool is_sleeping() { return is_sleeping_val(this->load()); } - bool is_any_sleeping() { return is_sleeping_val(this->load()); } - kmp_uint8 *get_stolen() { return NULL; } - enum barrier_type get_bt() { return bs_last_barrier; } -}; - template -class kmp_flag_32 : public kmp_basic_flag { +class kmp_flag_32 : public kmp_flag_atomic { public: kmp_flag_32(std::atomic *p) - : kmp_basic_flag(p) {} + : kmp_flag_atomic(p) {} kmp_flag_32(std::atomic *p, kmp_info_t *thr) - : kmp_basic_flag(p, thr) {} + : kmp_flag_atomic(p, thr) {} kmp_flag_32(std::atomic *p, kmp_uint32 c) - : kmp_basic_flag(p, c) {} + : kmp_flag_atomic(p, c) {} void suspend(int th_gtid) { __kmp_suspend_32(th_gtid, this); } #if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT void mwait(int th_gtid) { __kmp_mwait_32(th_gtid, this); } @@ -895,14 +851,16 @@ }; template -class kmp_flag_64 : public kmp_basic_flag_native { +class kmp_flag_64 : public kmp_flag_native { public: kmp_flag_64(volatile kmp_uint64 *p) - : kmp_basic_flag_native(p) {} + : kmp_flag_native(p) {} kmp_flag_64(volatile kmp_uint64 *p, kmp_info_t *thr) - : kmp_basic_flag_native(p, thr) {} + : kmp_flag_native(p, thr) {} kmp_flag_64(volatile kmp_uint64 *p, kmp_uint64 c) - : kmp_basic_flag_native(p, c) {} + : kmp_flag_native(p, c) {} + kmp_flag_64(volatile kmp_uint64 *p, kmp_uint64 c, std::atomic *loc) + : kmp_flag_native(p, c, loc) {} void suspend(int th_gtid) { __kmp_suspend_64(th_gtid, this); } #if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT void mwait(int th_gtid) { __kmp_mwait_64(th_gtid, this); } @@ -928,20 +886,52 @@ flag_type get_ptr_type() { return flag64; } }; +template +class kmp_atomic_flag_64 + : public kmp_flag_atomic { +public: + kmp_atomic_flag_64(std::atomic *p) + : kmp_flag_atomic(p) {} + kmp_atomic_flag_64(std::atomic *p, kmp_info_t *thr) + : kmp_flag_atomic(p, thr) {} + kmp_atomic_flag_64(std::atomic *p, kmp_uint64 c) + : kmp_flag_atomic(p, c) {} + kmp_atomic_flag_64(std::atomic *p, kmp_uint64 c, + std::atomic *loc) + : kmp_flag_atomic(p, c, loc) {} + void suspend(int th_gtid) { __kmp_atomic_suspend_64(th_gtid, this); } + void mwait(int th_gtid) { __kmp_atomic_mwait_64(th_gtid, this); } + void resume(int th_gtid) { __kmp_atomic_resume_64(th_gtid, this); } + int execute_tasks(kmp_info_t *this_thr, kmp_int32 gtid, int final_spin, + int *thread_finished USE_ITT_BUILD_ARG(void *itt_sync_obj), + kmp_int32 is_constrained) { + return __kmp_atomic_execute_tasks_64( + this_thr, gtid, this, final_spin, + thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained); + } + bool wait(kmp_info_t *this_thr, + int final_spin USE_ITT_BUILD_ARG(void *itt_sync_obj)) { + if (final_spin) + return __kmp_wait_template( + this_thr, this USE_ITT_BUILD_ARG(itt_sync_obj)); + else + return __kmp_wait_template( + this_thr, this USE_ITT_BUILD_ARG(itt_sync_obj)); + } + void release() { __kmp_release_template(this); } + flag_type get_ptr_type() { return atomic_flag64; } +}; + // Hierarchical 64-bit on-core barrier instantiation -class kmp_flag_oncore : public kmp_flag_native { - kmp_uint64 checker; - kmp_info_t *waiting_threads[1]; - kmp_uint32 num_waiting_threads; - kmp_uint32 - offset; /**< Portion of flag that is of interest for an operation. */ +class kmp_flag_oncore : public kmp_flag_native { + kmp_uint32 offset; /**< Portion of flag of interest for an operation. */ bool flag_switch; /**< Indicates a switch in flag location. */ enum barrier_type bt; /**< Barrier type. */ - kmp_info_t *this_thr; /**< Thread that may be redirected to different flag - location. 
*/ + kmp_info_t *this_thr; /**< Thread to redirect to different flag location. */ #if USE_ITT_BUILD - void * - itt_sync_obj; /**< ITT object that must be passed to new flag location. */ + void *itt_sync_obj; /**< ITT object to pass to new flag location. */ #endif unsigned char &byteref(volatile kmp_uint64 *loc, size_t offset) { return (RCAST(unsigned char *, CCAST(kmp_uint64 *, loc)))[offset]; @@ -949,26 +939,17 @@ public: kmp_flag_oncore(volatile kmp_uint64 *p) - : kmp_flag_native(p, flag_oncore), num_waiting_threads(0), - flag_switch(false) {} + : kmp_flag_native(p), flag_switch(false) { + } kmp_flag_oncore(volatile kmp_uint64 *p, kmp_uint32 idx) - : kmp_flag_native(p, flag_oncore), num_waiting_threads(0), - offset(idx), flag_switch(false) {} + : kmp_flag_native(p), offset(idx), + flag_switch(false), bt(bs_last_barrier), itt_sync_obj(nullptr) {} kmp_flag_oncore(volatile kmp_uint64 *p, kmp_uint64 c, kmp_uint32 idx, enum barrier_type bar_t, kmp_info_t *thr USE_ITT_BUILD_ARG(void *itt)) - : kmp_flag_native(p, flag_oncore), checker(c), - num_waiting_threads(0), offset(idx), flag_switch(false), bt(bar_t), + : kmp_flag_native(p, c), offset(idx), + flag_switch(false), bt(bar_t), this_thr(thr) USE_ITT_BUILD_ARG(itt_sync_obj(itt)) {} - kmp_info_t *get_waiter(kmp_uint32 i) { - KMP_DEBUG_ASSERT(i < num_waiting_threads); - return waiting_threads[i]; - } - kmp_uint32 get_num_waiters() { return num_waiting_threads; } - void set_waiter(kmp_info_t *thr) { - waiting_threads[0] = thr; - num_waiting_threads = 1; - } bool done_check_val(kmp_uint64 old_loc) { return byteref(&old_loc, offset) == checker; } @@ -997,17 +978,6 @@ KMP_TEST_THEN_OR64(get(), mask); } } - kmp_uint64 set_sleeping() { - return KMP_TEST_THEN_OR64(get(), KMP_BARRIER_SLEEP_STATE); - } - kmp_uint64 unset_sleeping() { - return KMP_TEST_THEN_AND64(get(), ~KMP_BARRIER_SLEEP_STATE); - } - bool is_sleeping_val(kmp_uint64 old_loc) { - return old_loc & KMP_BARRIER_SLEEP_STATE; - } - bool is_sleeping() { return is_sleeping_val(*get()); } - bool is_any_sleeping() { return is_sleeping_val(*get()); } void wait(kmp_info_t *this_thr, int final_spin) { if (final_spin) __kmp_wait_template( @@ -1038,27 +1008,39 @@ thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained); #endif } - kmp_uint8 *get_stolen() { return NULL; } enum barrier_type get_bt() { return bt; } flag_type get_ptr_type() { return flag_oncore; } }; -// Used to wake up threads, volatile void* flag is usually the th_sleep_loc -// associated with int gtid. 
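
The rewritten __kmp_null_resume_wrapper in the next hunk no longer guesses a flag's type by casting the stored pointer; it dispatches on the th_sleep_loc_type tag recorded alongside th_sleep_loc. Below is a minimal standalone sketch of that dispatch-on-recorded-type pattern, using simplified stand-in types rather than the runtime's own:

#include <atomic>
#include <cstdint>
#include <cstdio>

// Simplified stand-ins for the runtime's flag-type tag and thread descriptor.
enum sketch_flag_type { sketch_flag32, sketch_flag64, sketch_flag_unset };

struct sketch_thread {
  void *sleep_loc = nullptr;                           // what it sleeps on
  sketch_flag_type sleep_loc_type = sketch_flag_unset; // how to interpret it
};

// "Resume" here just nudges the flag value the way a release would.
static void sketch_resume_32(std::atomic<uint32_t> *f) { f->fetch_add(4); }
static void sketch_resume_64(volatile uint64_t *f) { *f += 4; }

static void sketch_null_resume(sketch_thread &th) {
  if (!th.sleep_loc)
    return; // nothing published, thread is not sleeping
  switch (th.sleep_loc_type) {
  case sketch_flag32:
    sketch_resume_32(static_cast<std::atomic<uint32_t> *>(th.sleep_loc));
    break;
  case sketch_flag64:
    sketch_resume_64(static_cast<volatile uint64_t *>(th.sleep_loc));
    break;
  case sketch_flag_unset:
    break; // tag already cleared by another waker; nothing to do
  }
}

int main() {
  std::atomic<uint32_t> flag{0};
  sketch_thread th;
  th.sleep_loc = &flag;
  th.sleep_loc_type = sketch_flag32;
  sketch_null_resume(th);
  std::printf("flag after resume: %u\n", flag.load());
  return 0;
}
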
-static inline void __kmp_null_resume_wrapper(int gtid, volatile void *flag) { +static inline void __kmp_null_resume_wrapper(kmp_info_t *thr) { + int gtid = __kmp_gtid_from_thread(thr); + void *flag = CCAST(void *, thr->th.th_sleep_loc); + flag_type type = thr->th.th_sleep_loc_type; if (!flag) return; - - switch (RCAST(kmp_flag_64<> *, CCAST(void *, flag))->get_type()) { + // Attempt to wake up a thread: examine its type and call appropriate template + switch (type) { case flag32: - __kmp_resume_32(gtid, (kmp_flag_32<> *)NULL); + __kmp_resume_32(gtid, RCAST(kmp_flag_32<> *, flag)); break; case flag64: - __kmp_resume_64(gtid, (kmp_flag_64<> *)NULL); + __kmp_resume_64(gtid, RCAST(kmp_flag_64<> *, flag)); + break; + case atomic_flag64: + __kmp_atomic_resume_64(gtid, RCAST(kmp_atomic_flag_64<> *, flag)); break; case flag_oncore: - __kmp_resume_oncore(gtid, (kmp_flag_oncore *)NULL); + __kmp_resume_oncore(gtid, RCAST(kmp_flag_oncore *, flag)); + break; +#ifdef KMP_DEBUG + case flag_unset: + KF_TRACE(100, ("__kmp_null_resume_wrapper: flag type %d is unset\n", type)); break; + default: + KF_TRACE(100, ("__kmp_null_resume_wrapper: flag type %d does not match any " + "known flag type\n", + type)); +#endif } } diff --git a/openmp/runtime/src/kmp_wait_release.cpp b/openmp/runtime/src/kmp_wait_release.cpp --- a/openmp/runtime/src/kmp_wait_release.cpp +++ b/openmp/runtime/src/kmp_wait_release.cpp @@ -33,6 +33,10 @@ void __kmp_mwait_64(int th_gtid, kmp_flag_64 *flag) { __kmp_mwait_template(th_gtid, flag); } +template +void __kmp_atomic_mwait_64(int th_gtid, kmp_atomic_flag_64 *flag) { + __kmp_mwait_template(th_gtid, flag); +} void __kmp_mwait_oncore(int th_gtid, kmp_flag_oncore *flag) { __kmp_mwait_template(th_gtid, flag); } @@ -40,4 +44,8 @@ template void __kmp_mwait_32(int, kmp_flag_32 *); template void __kmp_mwait_64(int, kmp_flag_64 *); template void __kmp_mwait_64(int, kmp_flag_64 *); +template void +__kmp_atomic_mwait_64(int, kmp_atomic_flag_64 *); +template void +__kmp_atomic_mwait_64(int, kmp_atomic_flag_64 *); #endif diff --git a/openmp/runtime/src/z_Linux_util.cpp b/openmp/runtime/src/z_Linux_util.cpp --- a/openmp/runtime/src/z_Linux_util.cpp +++ b/openmp/runtime/src/z_Linux_util.cpp @@ -1409,9 +1409,13 @@ /* TODO: shouldn't this use release semantics to ensure that __kmp_suspend_initialize_thread gets called first? 
*/ old_spin = flag->set_sleeping(); + TCW_PTR(th->th.th_sleep_loc, (void *)flag); + th->th.th_sleep_loc_type = flag->get_type(); if (__kmp_dflt_blocktime == KMP_MAX_BLOCKTIME && __kmp_pause_status != kmp_soft_paused) { flag->unset_sleeping(); + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; __kmp_unlock_suspend_mx(th); return; } @@ -1419,8 +1423,10 @@ " was %x\n", th_gtid, flag->get(), flag->load(), old_spin)); - if (flag->done_check_val(old_spin)) { - old_spin = flag->unset_sleeping(); + if (flag->done_check_val(old_spin) || flag->done_check()) { + flag->unset_sleeping(); + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; KF_TRACE(5, ("__kmp_suspend_template: T#%d false alarm, reset sleep bit " "for spin(%p)\n", th_gtid, flag->get())); @@ -1429,7 +1435,6 @@ "with low probability" return when the condition variable has not been signaled or broadcast */ int deactivated = FALSE; - TCW_PTR(th->th.th_sleep_loc, (void *)flag); while (flag->is_sleeping()) { #ifdef DEBUG_SUSPEND @@ -1451,6 +1456,9 @@ deactivated = TRUE; } + KMP_DEBUG_ASSERT(th->th.th_sleep_loc); + KMP_DEBUG_ASSERT(flag->get_type() == th->th.th_sleep_loc_type); + #if USE_SUSPEND_TIMEOUT struct timespec now; struct timeval tval; @@ -1480,6 +1488,18 @@ if ((status != 0) && (status != EINTR) && (status != ETIMEDOUT)) { KMP_SYSFAIL("pthread_cond_wait", status); } + + KMP_DEBUG_ASSERT(flag->get_type() == flag->get_ptr_type()); + + if (!flag->is_sleeping() && + ((status == EINTR) || (status == ETIMEDOUT))) { + // if interrupt or timeout, and thread is no longer sleeping, we need to + // make sure sleep_loc gets reset; however, this shouldn't be needed if + // we woke up with resume + flag->unset_sleeping(); + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; + } #ifdef KMP_DEBUG if (status == ETIMEDOUT) { if (flag->is_sleeping()) { @@ -1489,6 +1509,8 @@ KF_TRACE(2, ("__kmp_suspend_template: T#%d timeout wakeup, sleep bit " "not set!\n", th_gtid)); + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; } } else if (flag->is_sleeping()) { KF_TRACE(100, @@ -1506,6 +1528,13 @@ } } } + // We may have had the loop variable set before entering the loop body; + // so we need to reset sleep_loc. + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; + + KMP_DEBUG_ASSERT(!flag->is_sleeping()); + KMP_DEBUG_ASSERT(!th->th.th_sleep_loc); #ifdef DEBUG_SUSPEND { char buffer[128]; @@ -1527,6 +1556,10 @@ void __kmp_suspend_64(int th_gtid, kmp_flag_64 *flag) { __kmp_suspend_template(th_gtid, flag); } +template +void __kmp_atomic_suspend_64(int th_gtid, kmp_atomic_flag_64 *flag) { + __kmp_suspend_template(th_gtid, flag); +} void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag) { __kmp_suspend_template(th_gtid, flag); } @@ -1534,6 +1567,10 @@ template void __kmp_suspend_32(int, kmp_flag_32 *); template void __kmp_suspend_64(int, kmp_flag_64 *); template void __kmp_suspend_64(int, kmp_flag_64 *); +template void +__kmp_atomic_suspend_64(int, kmp_atomic_flag_64 *); +template void +__kmp_atomic_suspend_64(int, kmp_atomic_flag_64 *); /* This routine signals the thread specified by target_gtid to wake up after setting the sleep bit indicated by the flag argument to FALSE. 
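
The z_Linux_util.cpp changes above pair every set_sleeping() with publishing (th_sleep_loc, th_sleep_loc_type) under the suspend mutex, and clear both again on every exit path. A condensed standalone sketch of that publish/clear handshake follows; std::mutex and std::condition_variable stand in for the runtime's suspend primitives, and the names are illustrative only:

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

constexpr uint64_t SLEEP_BIT = 1;

struct sketch_waiter {
  std::mutex mx;                 // stands in for th_suspend_mx
  std::condition_variable cv;    // stands in for th_suspend_cv
  uint64_t flag = 0;             // the value being waited on
  uint64_t *sleep_loc = nullptr; // stands in for th_sleep_loc
};

void sketch_suspend(sketch_waiter &w) {
  std::unique_lock<std::mutex> lk(w.mx);
  w.flag |= SLEEP_BIT;   // set_sleeping()
  w.sleep_loc = &w.flag; // publish the location while holding the mutex
  w.cv.wait(lk, [&] { return (w.flag & SLEEP_BIT) == 0; });
  w.sleep_loc = nullptr; // always cleared again before returning
}

void sketch_resume(sketch_waiter &w) {
  std::lock_guard<std::mutex> lk(w.mx);
  if (!w.sleep_loc)
    return;              // already awake; nothing published
  w.flag &= ~SLEEP_BIT;  // unset_sleeping()
  w.sleep_loc = nullptr; // clear before signaling, as the patch does
  w.cv.notify_one();
}

int main() {
  sketch_waiter w;
  std::thread sleeper(sketch_suspend, std::ref(w));
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  sketch_resume(w);
  sleeper.join();
  return 0;
}
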
@@ -1556,36 +1593,50 @@ __kmp_lock_suspend_mx(th); - if (!flag) { // coming from __kmp_null_resume_wrapper + if (!flag || flag != th->th.th_sleep_loc) { + // coming from __kmp_null_resume_wrapper, or thread is now sleeping on a + // different location; wake up at new location flag = (C *)CCAST(void *, th->th.th_sleep_loc); } // First, check if the flag is null or its type has changed. If so, someone // else woke it up. - if (!flag || flag->get_type() != flag->get_ptr_type()) { // get_ptr_type - // simply shows what flag was cast to + if (!flag) { // Thread doesn't appear to be sleeping on anything KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already " "awake: flag(%p)\n", - gtid, target_gtid, NULL)); + gtid, target_gtid, (void *)NULL)); + __kmp_unlock_suspend_mx(th); + return; + } else if (flag->get_type() != th->th.th_sleep_loc_type) { + // Flag type does not appear to match this function template; possibly the + // thread is sleeping on something else. Try null resume again. + KF_TRACE( + 5, + ("__kmp_resume_template: T#%d retrying, thread T#%d Mismatch flag(%p), " + "spin(%p) type=%d ptr_type=%d\n", + gtid, target_gtid, flag, flag->get(), flag->get_type(), + th->th.th_sleep_loc_type)); __kmp_unlock_suspend_mx(th); + __kmp_null_resume_wrapper(th); return; } else { // if multiple threads are sleeping, flag should be internally // referring to a specific thread here - typename C::flag_t old_spin = flag->unset_sleeping(); - if (!flag->is_sleeping_val(old_spin)) { + if (!flag->is_sleeping()) { KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already " - "awake: flag(%p): " - "%u => %u\n", - gtid, target_gtid, flag->get(), old_spin, flag->load())); + "awake: flag(%p): %u\n", + gtid, target_gtid, flag->get(), (unsigned int)flag->load())); __kmp_unlock_suspend_mx(th); return; } - KF_TRACE(5, ("__kmp_resume_template: T#%d about to wakeup T#%d, reset " - "sleep bit for flag's loc(%p): " - "%u => %u\n", - gtid, target_gtid, flag->get(), old_spin, flag->load())); } + KMP_DEBUG_ASSERT(flag); + flag->unset_sleeping(); TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; + + KF_TRACE(5, ("__kmp_resume_template: T#%d about to wakeup T#%d, reset " + "sleep bit for flag's loc(%p): %u\n", + gtid, target_gtid, flag->get(), (unsigned int)flag->load())); #ifdef DEBUG_SUSPEND { @@ -1611,12 +1662,19 @@ void __kmp_resume_64(int target_gtid, kmp_flag_64 *flag) { __kmp_resume_template(target_gtid, flag); } +template +void __kmp_atomic_resume_64(int target_gtid, kmp_atomic_flag_64 *flag) { + __kmp_resume_template(target_gtid, flag); +} void __kmp_resume_oncore(int target_gtid, kmp_flag_oncore *flag) { __kmp_resume_template(target_gtid, flag); } template void __kmp_resume_32(int, kmp_flag_32 *); +template void __kmp_resume_32(int, kmp_flag_32 *); template void __kmp_resume_64(int, kmp_flag_64 *); +template void +__kmp_atomic_resume_64(int, kmp_atomic_flag_64 *); #if KMP_USE_MONITOR void __kmp_resume_monitor() { diff --git a/openmp/runtime/src/z_Windows_NT_util.cpp b/openmp/runtime/src/z_Windows_NT_util.cpp --- a/openmp/runtime/src/z_Windows_NT_util.cpp +++ b/openmp/runtime/src/z_Windows_NT_util.cpp @@ -240,13 +240,12 @@ continue; } // condition fulfilled, exiting - old_f = flag->unset_sleeping(); - KMP_DEBUG_ASSERT(old_f & KMP_BARRIER_SLEEP_STATE); + flag->unset_sleeping(); TCW_PTR(th->th.th_sleep_loc, NULL); - KF_TRACE(50, - ("__kmp_win32_cond_wait: exiting, condition " - "fulfilled: flag's loc(%p): %u => %u\n", - flag->get(), (unsigned int)old_f, (unsigned 
int)flag->load())); + th->th.th_sleep_loc_type = flag_unset; + KF_TRACE(50, ("__kmp_win32_cond_wait: exiting, condition " + "fulfilled: flag's loc(%p): %u\n", + flag->get(), (unsigned int)flag->load())); __kmp_win32_mutex_lock(&cv->waiters_count_lock_); KMP_DEBUG_ASSERT(cv->waiters_count_ > 0); @@ -376,9 +375,13 @@ /* TODO: shouldn't this use release semantics to ensure that __kmp_suspend_initialize_thread gets called first? */ old_spin = flag->set_sleeping(); + TCW_PTR(th->th.th_sleep_loc, (void *)flag); + th->th.th_sleep_loc_type = flag->get_type(); if (__kmp_dflt_blocktime == KMP_MAX_BLOCKTIME && __kmp_pause_status != kmp_soft_paused) { flag->unset_sleeping(); + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; __kmp_unlock_suspend_mx(th); return; } @@ -387,8 +390,10 @@ " loc(%p)==%u\n", th_gtid, flag->get(), (unsigned int)flag->load())); - if (flag->done_check_val(old_spin)) { - old_spin = flag->unset_sleeping(); + if (flag->done_check_val(old_spin) || flag->done_check()) { + flag->unset_sleeping(); + TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; KF_TRACE(5, ("__kmp_suspend_template: T#%d false alarm, reset sleep bit " "for flag's loc(%p)\n", th_gtid, flag->get())); @@ -400,7 +405,7 @@ low probability" return when the condition variable has not been signaled or broadcast */ int deactivated = FALSE; - TCW_PTR(th->th.th_sleep_loc, (void *)flag); + while (flag->is_sleeping()) { KF_TRACE(15, ("__kmp_suspend_template: T#%d about to perform " "kmp_win32_cond_wait()\n", @@ -415,13 +420,14 @@ KMP_DEBUG_ASSERT(TCR_4(__kmp_thread_pool_active_nth) >= 0); } deactivated = TRUE; - __kmp_win32_cond_wait(&th->th.th_suspend_cv, &th->th.th_suspend_mx, th, - flag); - } else { - __kmp_win32_cond_wait(&th->th.th_suspend_cv, &th->th.th_suspend_mx, th, - flag); } + KMP_DEBUG_ASSERT(th->th.th_sleep_loc); + KMP_DEBUG_ASSERT(th->th.th_sleep_loc_type == flag->get_type()); + + __kmp_win32_cond_wait(&th->th.th_suspend_cv, &th->th.th_suspend_mx, th, + flag); + #ifdef KMP_DEBUG if (flag->is_sleeping()) { KF_TRACE(100, @@ -431,6 +437,14 @@ } // while + // We may have had the loop variable set before entering the loop body; + // so we need to reset sleep_loc. 
+ TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; + + KMP_DEBUG_ASSERT(!flag->is_sleeping()); + KMP_DEBUG_ASSERT(!th->th.th_sleep_loc); + // Mark the thread as active again (if it was previous marked as inactive) if (deactivated) { th->th.th_active = TRUE; @@ -453,6 +467,10 @@ void __kmp_suspend_64(int th_gtid, kmp_flag_64 *flag) { __kmp_suspend_template(th_gtid, flag); } +template +void __kmp_atomic_suspend_64(int th_gtid, kmp_atomic_flag_64 *flag) { + __kmp_suspend_template(th_gtid, flag); +} void __kmp_suspend_oncore(int th_gtid, kmp_flag_oncore *flag) { __kmp_suspend_template(th_gtid, flag); } @@ -460,6 +478,10 @@ template void __kmp_suspend_32(int, kmp_flag_32 *); template void __kmp_suspend_64(int, kmp_flag_64 *); template void __kmp_suspend_64(int, kmp_flag_64 *); +template void +__kmp_atomic_suspend_64(int, kmp_atomic_flag_64 *); +template void +__kmp_atomic_suspend_64(int, kmp_atomic_flag_64 *); /* This routine signals the thread specified by target_gtid to wake up after setting the sleep bit indicated by the flag argument to FALSE */ @@ -477,32 +499,35 @@ __kmp_suspend_initialize_thread(th); __kmp_lock_suspend_mx(th); - if (!flag) { // coming from __kmp_null_resume_wrapper + if (!flag || flag != th->th.th_sleep_loc) { + // coming from __kmp_null_resume_wrapper, or thread is now sleeping on a + // different location; wake up at new location flag = (C *)th->th.th_sleep_loc; } // First, check if the flag is null or its type has changed. If so, someone // else woke it up. - if (!flag || flag->get_type() != flag->get_ptr_type()) { // get_ptr_type - // simply shows what - // flag was cast to + if (!flag || flag->get_type() != th->th.th_sleep_loc_type) { + // simply shows what flag was cast to KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already " "awake: flag's loc(%p)\n", gtid, target_gtid, NULL)); __kmp_unlock_suspend_mx(th); return; } else { - typename C::flag_t old_spin = flag->unset_sleeping(); - if (!flag->is_sleeping_val(old_spin)) { + if (!flag->is_sleeping()) { KF_TRACE(5, ("__kmp_resume_template: T#%d exiting, thread T#%d already " - "awake: flag's loc(%p): %u => %u\n", - gtid, target_gtid, flag->get(), (unsigned int)old_spin, - (unsigned int)flag->load())); + "awake: flag's loc(%p): %u\n", + gtid, target_gtid, flag->get(), (unsigned int)flag->load())); __kmp_unlock_suspend_mx(th); return; } } + KMP_DEBUG_ASSERT(flag); + flag->unset_sleeping(); TCW_PTR(th->th.th_sleep_loc, NULL); + th->th.th_sleep_loc_type = flag_unset; + KF_TRACE(5, ("__kmp_resume_template: T#%d about to wakeup T#%d, reset sleep " "bit for flag's loc(%p)\n", gtid, target_gtid, flag->get())); @@ -523,12 +548,19 @@ void __kmp_resume_64(int target_gtid, kmp_flag_64 *flag) { __kmp_resume_template(target_gtid, flag); } +template +void __kmp_atomic_resume_64(int target_gtid, kmp_atomic_flag_64 *flag) { + __kmp_resume_template(target_gtid, flag); +} void __kmp_resume_oncore(int target_gtid, kmp_flag_oncore *flag) { __kmp_resume_template(target_gtid, flag); } template void __kmp_resume_32(int, kmp_flag_32 *); +template void __kmp_resume_32(int, kmp_flag_32 *); template void __kmp_resume_64(int, kmp_flag_64 *); +template void +__kmp_atomic_resume_64(int, kmp_atomic_flag_64 *); void __kmp_yield() { Sleep(0); } diff --git a/openmp/runtime/test/barrier/omp_barrier.c b/openmp/runtime/test/barrier/omp_barrier.c --- a/openmp/runtime/test/barrier/omp_barrier.c +++ b/openmp/runtime/test/barrier/omp_barrier.c @@ -2,6 +2,8 @@ // RUN: %libomp-compile && env KMP_BLOCKTIME=infinite 
%libomp-run // RUN: %libomp-compile && env KMP_PLAIN_BARRIER_PATTERN='hierarchical,hierarchical' KMP_FORKJOIN_BARRIER_PATTERN='hierarchical,hierarchical' %libomp-run // RUN: %libomp-compile && env KMP_BLOCKTIME=infinite KMP_PLAIN_BARRIER_PATTERN='hierarchical,hierarchical' KMP_FORKJOIN_BARRIER_PATTERN='hierarchical,hierarchical' %libomp-run +// RUN: %libomp-compile && env KMP_PLAIN_BARRIER_PATTERN='dist,dist' KMP_FORKJOIN_BARRIER_PATTERN='dist,dist' KMP_REDUCTION_BARRIER_PATTERN='dist,dist' %libomp-run +// RUN: %libomp-compile && env KMP_BLOCKTIME=infinite KMP_PLAIN_BARRIER_PATTERN='dist,dist' KMP_FORKJOIN_BARRIER_PATTERN='dist,dist' KMP_REDUCTION_BARRIER_PATTERN='dist,dist' %libomp-run #include <stdio.h> #include "omp_testsuite.h" #include "omp_my_sleep.h"
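
For reference, a small hypothetical program (not part of this patch) that exercises whichever barrier pattern the RUN lines above select through KMP_PLAIN_BARRIER_PATTERN, KMP_FORKJOIN_BARRIER_PATTERN, and KMP_REDUCTION_BARRIER_PATTERN; the pattern choice is purely an environment-variable matter, so no source changes are needed:

// Run e.g. as:
//   KMP_PLAIN_BARRIER_PATTERN='dist,dist' \
//   KMP_FORKJOIN_BARRIER_PATTERN='dist,dist' \
//   KMP_REDUCTION_BARRIER_PATTERN='dist,dist' ./a.out
#include <omp.h>
#include <stdio.h>

int main(void) {
  int hits = 0;
#pragma omp parallel shared(hits)
  {
#pragma omp atomic
    hits++;
#pragma omp barrier /* released by whichever pattern the env vars selected */
#pragma omp single
    printf("threads past the barrier: %d of %d\n", hits, omp_get_num_threads());
  }
  return 0;
}
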