Index: openmp/runtime/src/kmp.h =================================================================== --- openmp/runtime/src/kmp.h +++ openmp/runtime/src/kmp.h @@ -2361,7 +2361,11 @@ extern kmp_tasking_mode_t __kmp_tasking_mode; /* determines how/when to execute tasks */ extern int __kmp_task_stealing_constraint; +extern std::atomic<kmp_int32> __kmp_n_tasks_in_flight; extern int __kmp_enable_task_throttling; +extern kmp_int32 __kmp_task_maximum; +extern kmp_int32 __kmp_task_maximum_ready_per_thread; + extern kmp_int32 __kmp_default_device; // Set via OMP_DEFAULT_DEVICE if // specified, defaults to 0 otherwise // Set via OMP_MAX_TASK_PRIORITY if specified, defaults to 0 otherwise Index: openmp/runtime/src/kmp_global.cpp =================================================================== --- openmp/runtime/src/kmp_global.cpp +++ openmp/runtime/src/kmp_global.cpp @@ -349,8 +349,19 @@ KMP_BUILD_ASSERT(sizeof(kmp_tasking_flags_t) == 4); int __kmp_task_stealing_constraint = 1; /* Constrain task stealing by default */ -int __kmp_enable_task_throttling = 1; +std::atomic<kmp_int32> __kmp_n_tasks_in_flight = 0; /* tasks in flight */ + +int __kmp_enable_task_throttling = 1; /* Serialize tasks once a threshold + is reached, such as the number of + ready tasks or the total number of + tasks */ + +kmp_int32 __kmp_task_maximum = 65536; /* maximum number of tasks in flight + before serializing */ + +kmp_int32 __kmp_task_maximum_ready_per_thread = 256; /* number of ready tasks + before serializing */ #ifdef DEBUG_SUSPEND int __kmp_suspend_count = 0; #endif Index: openmp/runtime/src/kmp_settings.cpp =================================================================== --- openmp/runtime/src/kmp_settings.cpp +++ openmp/runtime/src/kmp_settings.cpp @@ -5338,6 +5338,33 @@ __kmp_stg_print_bool(buffer, name, __kmp_enable_task_throttling); } // __kmp_stg_print_task_throttling +// ----------------------------------------------------------------------------- +// KMP_TASK_MAXIMUM +static void 
__kmp_stg_parse_task_maximum(char const *name, char const *value, + void *data) { + __kmp_stg_parse_int(name, value, 1, INT_MAX, &__kmp_task_maximum); +} // __kmp_stg_parse_task_maximum + +static void __kmp_stg_print_task_maximum(kmp_str_buf_t *buffer, + char const *name, void *data) { + __kmp_stg_print_int(buffer, name, __kmp_task_maximum); +} // __kmp_stg_print_task_maximum + +// ----------------------------------------------------------------------------- +// KMP_TASK_MAXIMUM_READY_PER_THREAD +static void __kmp_stg_parse_task_maximum_ready_per_thread(char const *name, + char const *value, + void *data) { + __kmp_stg_parse_int(name, value, 1, INT_MAX, + &__kmp_task_maximum_ready_per_thread); +} // __kmp_stg_parse_task_maximum_ready_per_thread + +static void __kmp_stg_print_task_maximum_ready_per_thread(kmp_str_buf_t *buffer, + char const *name, + void *data) { + __kmp_stg_print_int(buffer, name, __kmp_task_maximum_ready_per_thread); +} // __kmp_stg_print_task_maximum_ready_per_thread + #if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT // ----------------------------------------------------------------------------- // KMP_USER_LEVEL_MWAIT @@ -5728,6 +5755,13 @@ {"KMP_ENABLE_TASK_THROTTLING", __kmp_stg_parse_task_throttling, __kmp_stg_print_task_throttling, NULL, 0, 0}, + {"KMP_TASK_MAXIMUM", __kmp_stg_parse_task_maximum, + __kmp_stg_print_task_maximum, NULL, 0, 0}, + + {"KMP_TASK_MAXIMUM_READY_PER_THREAD", + __kmp_stg_parse_task_maximum_ready_per_thread, + __kmp_stg_print_task_maximum_ready_per_thread, NULL, 0, 0}, + {"OMP_DISPLAY_ENV", __kmp_stg_parse_omp_display_env, __kmp_stg_print_omp_display_env, NULL, 0, 0}, {"OMP_CANCELLATION", __kmp_stg_parse_omp_cancellation, @@ -5742,7 +5776,8 @@ #if OMPX_TASKGRAPH {"KMP_MAX_TDGS", __kmp_stg_parse_max_tdgs, __kmp_std_print_max_tdgs, NULL, 0, 0}, - {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0, 0}, + {"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0, + 0}, #endif #if OMPT_SUPPORT
Index: openmp/runtime/src/kmp_tasking.cpp =================================================================== --- openmp/runtime/src/kmp_tasking.cpp +++ openmp/runtime/src/kmp_tasking.cpp @@ -438,10 +438,16 @@ __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock); // Check if deque is full - if (TCR_4(thread_data->td.td_deque_ntasks) >= - TASK_DEQUE_SIZE(thread_data->td)) { - if (__kmp_enable_task_throttling && + int requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >= + TASK_DEQUE_SIZE(thread_data->td); + int requires_throttling = + __kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >= + __kmp_task_maximum_ready_per_thread; + // A full deque must still grow when throttling is disabled or when the + // ready-task threshold is larger than the current deque size. + if (requires_resize || requires_throttling) { + if (requires_throttling && __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata, thread->th.th_current_task)) { __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); KA_TRACE(20, ("__kmp_push_priority_task: T#%d deque is full; returning " @@ -543,40 +549,50 @@ int locked = 0; // Check if deque is full - if (TCR_4(thread_data->td.td_deque_ntasks) >= - TASK_DEQUE_SIZE(thread_data->td)) { - if (__kmp_enable_task_throttling && + int requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >= + TASK_DEQUE_SIZE(thread_data->td); + int requires_throttling = + __kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >= + __kmp_task_maximum_ready_per_thread; + if (requires_resize || requires_throttling) { + // Only evaluate __kmp_task_is_allowed when throttling would execute the + // task immediately instead of queueing it. + if (requires_throttling && __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata, thread->th.th_current_task)) { KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning " "TASK_NOT_PUSHED for task %p\n", gtid, taskdata)); return TASK_NOT_PUSHED; } else { __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock); locked = 1; - if (TCR_4(thread_data->td.td_deque_ntasks) >= - TASK_DEQUE_SIZE(thread_data->td)) { - // expand deque to push the task which is not allowed to execute
+ requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >= + TASK_DEQUE_SIZE(thread_data->td); + // expand deque to push the task which is not allowed to execute + if (requires_resize) __kmp_realloc_task_deque(thread, thread_data); - } } } // Lock the deque for the task push operation if (!locked) { __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock); // Need to recheck as we can get a proxy task from thread outside of OpenMP - if (TCR_4(thread_data->td.td_deque_ntasks) >= - TASK_DEQUE_SIZE(thread_data->td)) { - if (__kmp_enable_task_throttling && + requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >= + TASK_DEQUE_SIZE(thread_data->td); + requires_throttling = __kmp_enable_task_throttling && + TCR_4(thread_data->td.td_deque_ntasks) >= + __kmp_task_maximum_ready_per_thread; + if (requires_resize || requires_throttling) { + if (requires_throttling && __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata, thread->th.th_current_task)) { __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; " "returning TASK_NOT_PUSHED for task %p\n", gtid, taskdata)); return TASK_NOT_PUSHED; - } else { + } else if (requires_resize) { // expand deque to push the task which is not allowed to execute __kmp_realloc_task_deque(thread, thread_data); } @@ -906,6 +922,7 @@ #else /* !
USE_FAST_MEMORY */ __kmp_thread_free(thread, taskdata); #endif + --__kmp_n_tasks_in_flight; #if OMPX_TASKGRAPH } else { taskdata->td_flags.complete = 0; @@ -1456,6 +1473,13 @@ if (UNLIKELY(!TCR_4(__kmp_init_middle))) __kmp_middle_initialize(); + // Task throttling: too many tasks are in flight, so execute queued tasks + // until the count drops below the threshold. TODO: this never returns if + // no in-flight task can finish and be freed. + if (__kmp_enable_task_throttling) + while (TCR_4(__kmp_n_tasks_in_flight.load()) >= __kmp_task_maximum) + __kmpc_omp_taskyield(NULL, gtid, 0); + if (flags->hidden_helper) { if (__kmp_enable_hidden_helper) { if (!TCR_4(__kmp_init_hidden_helper)) @@ -1550,6 +1574,9 @@ taskdata = (kmp_taskdata_t *)__kmp_thread_malloc(thread, shareds_offset + sizeof_shareds); #endif /* USE_FAST_MEMORY */ + // Balanced by the decrement performed when the task is freed. + // NOTE(review): confirm other allocation paths freed there increment it too. + ++__kmp_n_tasks_in_flight; task = KMP_TASKDATA_TO_TASK(taskdata); Index: openmp/runtime/test/tasking/omp_throttling_max.c =================================================================== --- /dev/null +++ openmp/runtime/test/tasking/omp_throttling_max.c @@ -0,0 +1,62 @@ +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=0 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=1 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=256 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=65536 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=100000 %libomp-run + +/** + * This test ensures that task throttling on the maximum number of tasks + * threshold works properly.
+ * + * It creates 2 threads (1 producer, 1 consumer). + * The producer keeps creating tasks 'T_i' until one of them is executed. + * The consumer busy-waits, so tasks only run once the producer throttles. + * Executing any 'T_i' releases the consumer and stops the producer. + * + * The assertion ensures that the producer does not create more tasks than + * the threshold set through KMP_TASK_MAXIMUM (plus one). + */ + +#include <assert.h> +#include <omp.h> +#include <stdlib.h> + +/* default value */ +#define MAX_TASKS_DEFAULT (65536) + +int main(void) { + /* maximum number of tasks in-flight */ + char *max_tasks_str = getenv("KMP_TASK_MAXIMUM"); + int max_tasks = max_tasks_str ? atoi(max_tasks_str) : MAX_TASKS_DEFAULT; + if (max_tasks <= 0) + max_tasks = 1; + + /* check if throttling is enabled (it is by default) */ + char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING"); + int throttling = throttling_str ? *throttling_str == '1' : 1; + assert(throttling); + + volatile int done = 0; + +/* testing KMP_TASK_MAXIMUM: the depend chain keeps at most one task ready */ +#pragma omp parallel num_threads(2) default(none) \ + shared(max_tasks, throttling, done) + { + if (omp_get_thread_num() == 1) + while (!done) + ; + +#pragma omp master + { + int ntasks = 0; + while (!done) { +#pragma omp task default(none) shared(done) depend(out : max_tasks, throttling) + done = 1; + + assert(++ntasks <= max_tasks + 1); + } + } + } + + return 0; +} Index: openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c =================================================================== --- /dev/null +++ openmp/runtime/test/tasking/omp_throttling_max_ready_per_thread.c @@ -0,0 +1,61 @@ +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=0 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=1 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=256 %libomp-run +// RUN: 
%libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=65536 %libomp-run +// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=100000 %libomp-run + +/** + * This test ensures that task throttling on the maximum number of ready tasks + * per thread threshold works properly. + * + * It creates 2 threads (1 producer, 1 consumer). + * The producer keeps creating tasks 'T_i' until one of them is executed. + * The consumer busy-waits, so tasks only run once the producer throttles. + * Executing any 'T_i' releases the consumer and stops the producer. + * + * The assertion ensures that the producer does not create more tasks than + * the threshold set through KMP_TASK_MAXIMUM_READY_PER_THREAD (plus one). + */ + +#include <assert.h> +#include <omp.h> +#include <stdlib.h> + +#define MAX_TASKS_READY_DEFAULT (1 << 8) + +int main(void) { + /* maximum number of ready tasks in-flight */ + char *max_tasks_ready_str = getenv("KMP_TASK_MAXIMUM_READY_PER_THREAD"); + int max_tasks_ready = + max_tasks_ready_str ? atoi(max_tasks_ready_str) : MAX_TASKS_READY_DEFAULT; + if (max_tasks_ready <= 0) + max_tasks_ready = 1; + + /* check if throttling is enabled (it is by default) */ + char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING"); + int throttling = throttling_str ? *throttling_str == '1' : 1; + + volatile int done = 0; + +/* testing KMP_TASK_MAXIMUM_READY_PER_THREAD */ +#pragma omp parallel num_threads(2) default(none) \ + shared(max_tasks_ready, throttling, done) + { + if (omp_get_thread_num() == 1) + while (!done) + ; + +#pragma omp master + { + int ntasks = 0; + while (!done) { +#pragma omp task default(none) shared(done) + done = 1; + + assert(++ntasks <= max_tasks_ready + 1); + } + } + } + + return 0; +}