Index: openmp/runtime/src/kmp.h
===================================================================
--- openmp/runtime/src/kmp.h
+++ openmp/runtime/src/kmp.h
@@ -2361,7 +2361,11 @@
 extern kmp_tasking_mode_t
     __kmp_tasking_mode; /* determines how/when to execute tasks */
 extern int __kmp_task_stealing_constraint;
+extern std::atomic<kmp_int32> __kmp_n_tasks_in_flight;
 extern int __kmp_enable_task_throttling;
+extern kmp_int32 __kmp_task_maximum;
+extern kmp_int32 __kmp_task_maximum_ready_per_thread;
+
 extern kmp_int32 __kmp_default_device; // Set via OMP_DEFAULT_DEVICE if
 // specified, defaults to 0 otherwise
 // Set via OMP_MAX_TASK_PRIORITY if specified, defaults to 0 otherwise
Index: openmp/runtime/src/kmp_global.cpp
===================================================================
--- openmp/runtime/src/kmp_global.cpp
+++ openmp/runtime/src/kmp_global.cpp
@@ -349,8 +349,19 @@
 KMP_BUILD_ASSERT(sizeof(kmp_tasking_flags_t) == 4);
 
 int __kmp_task_stealing_constraint = 1; /* Constrain task stealing by default */
+/* Number of tasks in flight (allocated and not yet freed) */
+std::atomic<kmp_int32> __kmp_n_tasks_in_flight{0};
+
+/* Serialize tasks once a threshold is reached, such as the number of ready
+   tasks or the total number of tasks */
 int __kmp_enable_task_throttling = 1;
+
+/* Number of tasks threshold before serializing (KMP_TASK_MAXIMUM) */
+kmp_int32 __kmp_task_maximum = 65536;
+
+/* Number of ready tasks per thread before serializing
+   (KMP_TASK_MAXIMUM_READY_PER_THREAD) */
+kmp_int32 __kmp_task_maximum_ready_per_thread = 256;
 
 #ifdef DEBUG_SUSPEND
 int __kmp_suspend_count = 0;
 #endif
Index: openmp/runtime/src/kmp_settings.cpp
===================================================================
--- openmp/runtime/src/kmp_settings.cpp
+++ openmp/runtime/src/kmp_settings.cpp
@@ -5338,6 +5338,33 @@
   __kmp_stg_print_bool(buffer, name, __kmp_enable_task_throttling);
 } // __kmp_stg_print_task_throttling
 
+// -----------------------------------------------------------------------------
+// KMP_TASK_MAXIMUM
+static void __kmp_stg_parse_task_maximum(char const *name, char const *value,
+                                         void *data) {
+  __kmp_stg_parse_int(name, value, 1, INT_MAX, &__kmp_task_maximum);
+} // __kmp_stg_parse_task_maximum
+
+static void __kmp_stg_print_task_maximum(kmp_str_buf_t *buffer,
+                                         char const *name, void *data) {
+  __kmp_stg_print_int(buffer, name, __kmp_task_maximum);
+} // __kmp_stg_print_task_maximum
+
+// -----------------------------------------------------------------------------
+// KMP_TASK_MAXIMUM_READY_PER_THREAD
+static void __kmp_stg_parse_task_maximum_ready_per_thread(char const *name,
+                                                          char const *value,
+                                                          void *data) {
+  __kmp_stg_parse_int(name, value, 1, INT_MAX,
+                      &__kmp_task_maximum_ready_per_thread);
+} // __kmp_stg_parse_task_maximum_ready_per_thread
+
+static void __kmp_stg_print_task_maximum_ready_per_thread(kmp_str_buf_t *buffer,
+                                                          char const *name,
+                                                          void *data) {
+  __kmp_stg_print_int(buffer, name, __kmp_task_maximum_ready_per_thread);
+} // __kmp_stg_print_task_maximum_ready_per_thread
+
 #if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
 // -----------------------------------------------------------------------------
 // KMP_USER_LEVEL_MWAIT
@@ -5728,6 +5755,13 @@
     {"KMP_ENABLE_TASK_THROTTLING", __kmp_stg_parse_task_throttling,
      __kmp_stg_print_task_throttling, NULL, 0, 0},
 
+    {"KMP_TASK_MAXIMUM", __kmp_stg_parse_task_maximum,
+     __kmp_stg_print_task_maximum, NULL, 0, 0},
+
+    {"KMP_TASK_MAXIMUM_READY_PER_THREAD",
+     __kmp_stg_parse_task_maximum_ready_per_thread,
+     __kmp_stg_print_task_maximum_ready_per_thread, NULL, 0, 0},
+
     {"OMP_DISPLAY_ENV", __kmp_stg_parse_omp_display_env,
      __kmp_stg_print_omp_display_env, NULL, 0, 0},
     {"OMP_CANCELLATION", __kmp_stg_parse_omp_cancellation,
@@ -5742,7 +5776,8 @@
 #if OMPX_TASKGRAPH
     {"KMP_MAX_TDGS", __kmp_stg_parse_max_tdgs, __kmp_std_print_max_tdgs, NULL,
      0, 0},
-    {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0, 0},
+    {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0,
+     0},
 #endif
 
 #if OMPT_SUPPORT
Index: openmp/runtime/src/kmp_tasking.cpp
===================================================================
--- openmp/runtime/src/kmp_tasking.cpp
+++ openmp/runtime/src/kmp_tasking.cpp
@@ -438,10 +438,16 @@
 
   __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
+  // Task throttling: serialize the task once this thread already holds the
+  // maximum number of ready tasks and __kmp_task_is_allowed accepts the task
+  bool throttled =
+      __kmp_enable_task_throttling &&
+      TCR_4(thread_data->td.td_deque_ntasks) >=
+          __kmp_task_maximum_ready_per_thread &&
+      __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
+                            thread->th.th_current_task);
   // Check if deque is full
-  if (TCR_4(thread_data->td.td_deque_ntasks) >=
-      TASK_DEQUE_SIZE(thread_data->td)) {
-    if (__kmp_enable_task_throttling &&
-        __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
-                              thread->th.th_current_task)) {
+  if (throttled || TCR_4(thread_data->td.td_deque_ntasks) >=
+                       TASK_DEQUE_SIZE(thread_data->td)) {
+    if (throttled) {
       __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
       KA_TRACE(20, ("__kmp_push_priority_task: T#%d deque is full; returning "
@@ -543,10 +549,16 @@
 
   int locked = 0;
+  // Task throttling: serialize the task once this thread already holds the
+  // maximum number of ready tasks and __kmp_task_is_allowed accepts the task
+  bool throttled =
+      __kmp_enable_task_throttling &&
+      TCR_4(thread_data->td.td_deque_ntasks) >=
+          __kmp_task_maximum_ready_per_thread &&
+      __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
+                            thread->th.th_current_task);
   // Check if deque is full
-  if (TCR_4(thread_data->td.td_deque_ntasks) >=
-      TASK_DEQUE_SIZE(thread_data->td)) {
-    if (__kmp_enable_task_throttling &&
-        __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
-                              thread->th.th_current_task)) {
+  if (throttled || TCR_4(thread_data->td.td_deque_ntasks) >=
+                       TASK_DEQUE_SIZE(thread_data->td)) {
+    if (throttled) {
       KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning "
                     "TASK_NOT_PUSHED for task %p\n",
@@ -566,10 +578,15 @@
   if (!locked) {
     __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
     // Need to recheck as we can get a proxy task from thread outside of OpenMP
-    if (TCR_4(thread_data->td.td_deque_ntasks) >=
-        TASK_DEQUE_SIZE(thread_data->td)) {
-      if (__kmp_enable_task_throttling &&
-          __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
-                                thread->th.th_current_task)) {
+    // Re-evaluate the throttling decision now that the lock is held
+    bool still_throttled =
+        __kmp_enable_task_throttling &&
+        TCR_4(thread_data->td.td_deque_ntasks) >=
+            __kmp_task_maximum_ready_per_thread &&
+        __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
+                              thread->th.th_current_task);
+    if (still_throttled || TCR_4(thread_data->td.td_deque_ntasks) >=
+                               TASK_DEQUE_SIZE(thread_data->td)) {
+      if (still_throttled) {
         __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
         KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; "
@@ -906,6 +923,7 @@
 #else /* ! USE_FAST_MEMORY */
   __kmp_thread_free(thread, taskdata);
 #endif
+  --__kmp_n_tasks_in_flight;
 #if OMPX_TASKGRAPH
   } else {
     taskdata->td_flags.complete = 0;
@@ -1456,6 +1474,11 @@
   if (UNLIKELY(!TCR_4(__kmp_init_middle)))
     __kmp_middle_initialize();
 
+  // task throttling: too many tasks co-existing, emptying queue now
+  if (__kmp_enable_task_throttling)
+    while (TCR_4(__kmp_n_tasks_in_flight.load()) >= __kmp_task_maximum)
+      __kmpc_omp_taskyield(NULL, gtid, 0);
+
   if (flags->hidden_helper) {
     if (__kmp_enable_hidden_helper) {
       if (!TCR_4(__kmp_init_hidden_helper))
@@ -1550,6 +1573,7 @@
   taskdata = (kmp_taskdata_t *)__kmp_thread_malloc(thread, shareds_offset +
                                                                sizeof_shareds);
 #endif /* USE_FAST_MEMORY */
+  ++__kmp_n_tasks_in_flight;
 
   task = KMP_TASKDATA_TO_TASK(taskdata);
 
Index: openmp/runtime/test/tasking/omp_throttling_max.c
===================================================================
--- /dev/null
+++ openmp/runtime/test/tasking/omp_throttling_max.c
@@ -0,0 +1,70 @@
+// RUN: %libomp-compile
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM=0 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM=1 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM=256 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM=65536 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM=100000 %libomp-run
+
+/**
+ * This test ensures that task throttling on the maximum number of tasks
+ * threshold works properly.
+ *
+ * It creates 2 threads (1 producer, 1 consumer)
+ * The producer creates tasks 'T_i' indefinitely until one is executed
+ * The consumer is blocked until the producer starts throttling
+ * Executing any 'T_i' unblocks the consumer and stops the producer
+ *
+ * The assertion ensures that the producer does not create more than the
+ * total number of tasks provided by the programmer
+ */
+
+#include <assert.h>
+#include <omp.h>
+#include <stdlib.h>
+
+/* default value */
+#define MAX_TASKS_DEFAULT (65536)
+
+int main(void) {
+  /* maximum number of tasks in-flight */
+  char *max_tasks_str = getenv("KMP_TASK_MAXIMUM");
+  int max_tasks = max_tasks_str ? atoi(max_tasks_str) : MAX_TASKS_DEFAULT;
+  if (max_tasks <= 0)
+    max_tasks = 1;
+
+  /* check if throttling is enabled (it is by default) */
+  char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
+  int throttling = throttling_str ? *throttling_str == '1' : 1;
+  assert(throttling);
+
+  volatile int done = 0;
+
+/* testing KMP_TASK_MAXIMUM */
+#pragma omp parallel num_threads(2) default(none)                              \
+    shared(max_tasks, throttling, done)
+  {
+    if (omp_get_thread_num() == 1)
+      while (!done)
+        ;
+
+#pragma omp master
+    {
+      int ntasks = 0;
+      while (!done) {
+#pragma omp task default(none) shared(done) depend(out : max_tasks, throttling)
+        done = 1;
+
+        /* count outside assert() so the increment survives NDEBUG builds */
+        ++ntasks;
+        assert(ntasks <= max_tasks + 1);
+      }
+    }
+  }
+
+  return 0;
+}
Index: openmp/runtime/test/tasking/omp_throttling_max_ready.c
===================================================================
--- /dev/null
+++ openmp/runtime/test/tasking/omp_throttling_max_ready.c
@@ -0,0 +1,69 @@
+// RUN: %libomp-compile
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM_READY_PER_THREAD=0 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM_READY_PER_THREAD=1 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM_READY_PER_THREAD=256 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM_READY_PER_THREAD=65536 %libomp-run
+// RUN: env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 \
+// RUN:   KMP_TASK_MAXIMUM_READY_PER_THREAD=100000 %libomp-run
+
+/**
+ * This test ensures that task throttling on the maximum number of ready tasks
+ * per thread threshold works properly.
+ *
+ * It creates 2 threads (1 producer, 1 consumer)
+ * The producer creates tasks 'T_i' indefinitely until one is executed
+ * The consumer is blocked until the producer starts throttling
+ * Executing any 'T_i' unblocks the consumer and stops the producer
+ *
+ * The assertion ensures that the producer does not create more than the
+ * total number of tasks provided by the programmer
+ */
+
+#include <assert.h>
+#include <omp.h>
+#include <stdlib.h>
+
+#define MAX_TASKS_READY_DEFAULT (1 << 8)
+
+int main(void) {
+  /* maximum number of ready tasks in-flight */
+  char *max_tasks_ready_str = getenv("KMP_TASK_MAXIMUM_READY_PER_THREAD");
+  int max_tasks_ready =
+      max_tasks_ready_str ? atoi(max_tasks_ready_str) : MAX_TASKS_READY_DEFAULT;
+  if (max_tasks_ready <= 0)
+    max_tasks_ready = 1;
+
+  /* check if throttling is enabled (it is by default) */
+  char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
+  int throttling = throttling_str ? *throttling_str == '1' : 1;
+
+  volatile int done = 0;
+
+/* testing KMP_TASK_MAXIMUM_READY_PER_THREAD */
+#pragma omp parallel num_threads(2) default(none)                              \
+    shared(max_tasks_ready, throttling, done)
+  {
+    if (omp_get_thread_num() == 1)
+      while (!done)
+        ;
+
+#pragma omp master
+    {
+      int ntasks = 0;
+      while (!done) {
+#pragma omp task default(none) shared(done)
+        done = 1;
+
+        /* count outside assert() so the increment survives NDEBUG builds */
+        ++ntasks;
+        assert(ntasks <= max_tasks_ready + 1);
+      }
+    }
+  }
+
+  return 0;
+}