diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h
--- a/openmp/runtime/src/kmp.h
+++ b/openmp/runtime/src/kmp.h
@@ -2548,11 +2548,22 @@
   char td_pad[KMP_PAD(kmp_base_thread_data_t, CACHE_LINE)];
 } kmp_thread_data_t;
 
+typedef struct kmp_task_pri {
+  kmp_thread_data_t td;
+  kmp_int32 priority;
+  kmp_task_pri *next;
+} kmp_task_pri_t;
+
 // Data for task teams which are used when tasking is enabled for the team
 typedef struct kmp_base_task_team {
   kmp_bootstrap_lock_t tt_threads_lock; /* Lock used to allocate per-thread
                                            part of task team */
   /* must be bootstrap lock since used at library shutdown*/
+
+  // TODO: check performance vs kmp_tas_lock_t
+  kmp_bootstrap_lock_t tt_task_pri_lock; /* Lock to access priority tasks */
+  kmp_task_pri_t *tt_task_pri_list;
+
   kmp_task_team_t *tt_next; /* For linking the task team free list */
   kmp_thread_data_t *tt_threads_data; /* Array of per-thread structures for
                                          task team */
@@ -2564,6 +2575,7 @@
   kmp_int32 tt_max_threads; // # entries allocated for threads_data array
   kmp_int32 tt_found_proxy_tasks; // found proxy tasks since last barrier
   kmp_int32 tt_untied_task_encountered;
+  std::atomic<kmp_int32> tt_num_task_pri; // number of priority tasks enqueued
   // There is hidden helper thread encountered in this task team so that we
   // must wait when waiting on task team
   kmp_int32 tt_hidden_helper_task_encountered;
diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp
--- a/openmp/runtime/src/kmp_tasking.cpp
+++ b/openmp/runtime/src/kmp_tasking.cpp
@@ -319,6 +319,144 @@
   thread_data->td.td_deque_size = new_size;
 }
 
+static kmp_task_pri_t *__kmp_alloc_task_pri_list() {
+  kmp_task_pri_t *l = (kmp_task_pri_t *)__kmp_allocate(sizeof(kmp_task_pri_t));
+  kmp_thread_data_t *thread_data = &l->td;
+  __kmp_init_bootstrap_lock(&thread_data->td.td_deque_lock);
+  thread_data->td.td_deque_last_stolen = -1;
+  KE_TRACE(20, ("__kmp_alloc_task_pri_list: T#%d allocating deque[%d] "
+                "for thread_data %p\n",
+                __kmp_get_gtid(), INITIAL_TASK_DEQUE_SIZE, thread_data));
+  thread_data->td.td_deque = (kmp_taskdata_t **)__kmp_allocate(
+      INITIAL_TASK_DEQUE_SIZE * sizeof(kmp_taskdata_t *));
+  thread_data->td.td_deque_size = INITIAL_TASK_DEQUE_SIZE;
+  return l;
+}
+
+// The function finds the deque of priority tasks with the given priority, or
+// allocates a new deque and puts it into the sorted (high -> low) list of
+// deques. Deques of non-default priority tasks are shared between all threads
+// in the team, as opposed to the per-thread deques of tasks with default
+// priority.
+// The function must be called with task_team->tt.tt_task_pri_lock held.
+static kmp_thread_data_t *
+__kmp_get_priority_deque_data(kmp_task_team_t *task_team, kmp_int32 pri) {
+  kmp_thread_data_t *thread_data;
+  kmp_task_pri_t *lst = task_team->tt.tt_task_pri_list;
+  if (lst->priority == pri) {
+    // Found queue of tasks with given priority.
+    thread_data = &lst->td;
+  } else if (lst->priority < pri) {
+    // All current priority queues contain tasks with lower priority.
+    // Allocate a new deque for tasks with the given priority.
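+    // The new queue will hold the highest priority in the list, so link it in
+    // as the new list head.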
+    kmp_task_pri_t *list = __kmp_alloc_task_pri_list();
+    thread_data = &list->td;
+    list->priority = pri;
+    list->next = lst;
+    task_team->tt.tt_task_pri_list = list;
+  } else { // task_team->tt.tt_task_pri_list->priority > pri
+    kmp_task_pri_t *next_queue = lst->next;
+    while (next_queue && next_queue->priority > pri) {
+      lst = next_queue;
+      next_queue = lst->next;
+    }
+    // lst->priority > pri && (next_queue == NULL || pri >= next_queue->priority)
+    if (next_queue == NULL) {
+      // No queue with pri priority, need to allocate a new one.
+      kmp_task_pri_t *list = __kmp_alloc_task_pri_list();
+      thread_data = &list->td;
+      list->priority = pri;
+      list->next = NULL;
+      lst->next = list;
+    } else if (next_queue->priority == pri) {
+      // Found queue of tasks with given priority.
+      thread_data = &next_queue->td;
+    } else { // lst->priority > pri > next_queue->priority
+      // Insert the newly allocated queue between the existing queues.
+      kmp_task_pri_t *list = __kmp_alloc_task_pri_list();
+      thread_data = &list->td;
+      list->priority = pri;
+      list->next = next_queue;
+      lst->next = list;
+    }
+  }
+  return thread_data;
+}
+
+// __kmp_push_priority_task: Add a task to the team's priority task deque
+static kmp_int32 __kmp_push_priority_task(kmp_int32 gtid, kmp_info_t *thread,
+                                          kmp_taskdata_t *taskdata,
+                                          kmp_task_team_t *task_team,
+                                          kmp_int32 pri) {
+  kmp_thread_data_t *thread_data = NULL;
+  KA_TRACE(20,
+           ("__kmp_push_priority_task: T#%d trying to push task %p, pri %d.\n",
+            gtid, taskdata, pri));
+
+  // Find task queue specific to priority value
+  kmp_task_pri_t *lst = task_team->tt.tt_task_pri_list;
+  if (UNLIKELY(lst == NULL)) {
+    __kmp_acquire_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
+    if (task_team->tt.tt_task_pri_list == NULL) {
+      // List of queues is still empty, allocate one.
+      kmp_task_pri_t *list = __kmp_alloc_task_pri_list();
+      thread_data = &list->td;
+      list->priority = pri;
+      list->next = NULL;
+      task_team->tt.tt_task_pri_list = list;
+    } else {
+      // Another thread initialized the list meanwhile. Find the right queue
+      // and get its thread_data.
+      thread_data = __kmp_get_priority_deque_data(task_team, pri);
+    }
+    __kmp_release_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
+  } else {
+    if (lst->priority == pri) {
+      // Found queue of tasks with given priority.
+      thread_data = &lst->td;
+    } else {
+      __kmp_acquire_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
+      thread_data = __kmp_get_priority_deque_data(task_team, pri);
+      __kmp_release_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
+    }
+  }
+  KMP_DEBUG_ASSERT(thread_data);
+
+  __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
+  // Check if deque is full
+  if (TCR_4(thread_data->td.td_deque_ntasks) >=
+      TASK_DEQUE_SIZE(thread_data->td)) {
+    if (__kmp_enable_task_throttling &&
+        __kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
+                              thread->th.th_current_task)) {
+      __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
+      KA_TRACE(20, ("__kmp_push_priority_task: T#%d deque is full; returning "
+                    "TASK_NOT_PUSHED for task %p\n",
+                    gtid, taskdata));
+      return TASK_NOT_PUSHED;
+    } else {
+      // Expand the deque to push the task which is not allowed to execute now.
+      __kmp_realloc_task_deque(thread, thread_data);
+    }
+  }
+  KMP_DEBUG_ASSERT(TCR_4(thread_data->td.td_deque_ntasks) <
+                   TASK_DEQUE_SIZE(thread_data->td));
+  // Push taskdata.
+  thread_data->td.td_deque[thread_data->td.td_deque_tail] = taskdata;
+  // Wrap index.
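+  // TASK_DEQUE_MASK(td) is td_deque_size - 1; deque sizes are kept a power of
+  // two, so the bitwise AND wraps the index without a branch.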
+  thread_data->td.td_deque_tail =
+      (thread_data->td.td_deque_tail + 1) & TASK_DEQUE_MASK(thread_data->td);
+  TCW_4(thread_data->td.td_deque_ntasks,
+        TCR_4(thread_data->td.td_deque_ntasks) + 1); // Adjust task count
+  KMP_FSYNC_RELEASING(thread->th.th_current_task); // releasing self
+  KMP_FSYNC_RELEASING(taskdata); // releasing child
+  KA_TRACE(20, ("__kmp_push_priority_task: T#%d returning "
+                "TASK_SUCCESSFULLY_PUSHED: task=%p ntasks=%d head=%u tail=%u\n",
+                gtid, taskdata, thread_data->td.td_deque_ntasks,
+                thread_data->td.td_deque_head, thread_data->td.td_deque_tail));
+  __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
+  task_team->tt.tt_num_task_pri++; // atomic inc
+  return TASK_SUCCESSFULLY_PUSHED;
+}
+
 // __kmp_push_task: Add a task to the thread's deque
 static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) {
   kmp_info_t *thread = __kmp_threads[gtid];
@@ -371,6 +509,12 @@
   KMP_DEBUG_ASSERT(TCR_4(task_team->tt.tt_found_tasks) == TRUE);
   KMP_DEBUG_ASSERT(TCR_PTR(task_team->tt.tt_threads_data) != NULL);
 
+  if (taskdata->td_flags.priority_specified && task->data2.priority > 0 &&
+      __kmp_max_task_priority > 0) {
+    int pri = KMP_MIN(task->data2.priority, __kmp_max_task_priority);
+    return __kmp_push_priority_task(gtid, thread, taskdata, task_team, pri);
+  }
+
   // Find tasking deque specific to encountering thread
   thread_data = &task_team->tt.tt_threads_data[tid];
 
@@ -728,6 +872,10 @@
   KMP_DEBUG_ASSERT(taskdata->td_allocated_child_tasks == 0 ||
                    taskdata->td_flags.task_serial == 1);
   KMP_DEBUG_ASSERT(taskdata->td_incomplete_child_tasks == 0);
+  kmp_task_t *task = KMP_TASKDATA_TO_TASK(taskdata);
+  // Clear data to not be re-used later by mistake.
+  task->data1.destructors = NULL;
+  task->data2.priority = 0;
 
   taskdata->td_flags.freed = 1;
 
   // deallocate the taskdata and shared variable blocks associated with this task
@@ -2667,6 +2815,105 @@
 #endif
 }
 
+static kmp_task_t *__kmp_get_priority_task(kmp_int32 gtid,
+                                           kmp_task_team_t *task_team,
+                                           kmp_int32 is_constrained) {
+  kmp_task_t *task = NULL;
+  kmp_taskdata_t *taskdata;
+  kmp_taskdata_t *current;
+  kmp_thread_data_t *thread_data;
+  int ntasks = task_team->tt.tt_num_task_pri;
+  if (ntasks == 0) {
+    KA_TRACE(
+        20, ("__kmp_get_priority_task(exit #1): T#%d No tasks to get\n", gtid));
+    return NULL;
+  }
+  do {
+    // Decrement num_tasks to "reserve" one task to get for execution.
+    if (__kmp_atomic_compare_store(&task_team->tt.tt_num_task_pri, ntasks,
+                                   ntasks - 1))
+      break; // reserved a task to get
+    ntasks = task_team->tt.tt_num_task_pri; // re-read the counter and retry
+  } while (ntasks > 0);
+  if (ntasks == 0) {
+    KA_TRACE(20, ("__kmp_get_priority_task(exit #2): T#%d No tasks to get\n",
+                  __kmp_get_gtid()));
+    return NULL;
+  }
+  // We got a "ticket" to get a "reserved" priority task.
+  int deque_ntasks;
+  kmp_task_pri_t *list = task_team->tt.tt_task_pri_list;
+  do {
+    KMP_ASSERT(list != NULL);
+    thread_data = &list->td;
+    __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
+    deque_ntasks = thread_data->td.td_deque_ntasks;
+    if (deque_ntasks == 0) {
+      __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
+      KA_TRACE(20, ("__kmp_get_priority_task: T#%d No tasks to get from %p\n",
+                    __kmp_get_gtid(), thread_data));
+      list = list->next;
+    }
+  } while (deque_ntasks == 0);
+  KMP_DEBUG_ASSERT(deque_ntasks);
+  int target = thread_data->td.td_deque_head;
+  current = __kmp_threads[gtid]->th.th_current_task;
+  taskdata = thread_data->td.td_deque[target];
+  if (__kmp_task_is_allowed(gtid, is_constrained, taskdata, current)) {
+    // Bump head pointer and wrap.
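+    // The head task satisfies the task scheduling constraint; claim it by
+    // advancing the head index.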
+    thread_data->td.td_deque_head =
+        (target + 1) & TASK_DEQUE_MASK(thread_data->td);
+  } else {
+    if (!task_team->tt.tt_untied_task_encountered) {
+      // The task scheduling constraint (TSC) does not allow stealing the
+      // victim task.
+      __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
+      KA_TRACE(20, ("__kmp_get_priority_task(exit #3): T#%d could not get task "
+                    "from %p: task_team=%p ntasks=%d head=%u tail=%u\n",
+                    gtid, thread_data, task_team, deque_ntasks, target,
+                    thread_data->td.td_deque_tail));
+      task_team->tt.tt_num_task_pri++; // atomic inc, restore value
+      return NULL;
+    }
+    int i;
+    // Walk through the deque trying to steal any allowed task.
+    taskdata = NULL;
+    for (i = 1; i < deque_ntasks; ++i) {
+      target = (target + 1) & TASK_DEQUE_MASK(thread_data->td);
+      taskdata = thread_data->td.td_deque[target];
+      if (__kmp_task_is_allowed(gtid, is_constrained, taskdata, current)) {
+        break; // found task to execute
+      } else {
+        taskdata = NULL;
+      }
+    }
+    if (taskdata == NULL) {
+      // No appropriate candidate found to execute.
+      __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
+      KA_TRACE(
+          10, ("__kmp_get_priority_task(exit #4): T#%d could not get task from "
+               "%p: task_team=%p ntasks=%d head=%u tail=%u\n",
+               gtid, thread_data, task_team, deque_ntasks,
+               thread_data->td.td_deque_head, thread_data->td.td_deque_tail));
+      task_team->tt.tt_num_task_pri++; // atomic inc, restore value
+      return NULL;
+    }
+    int prev = target;
+    for (i = i + 1; i < deque_ntasks; ++i) {
+      // Shift remaining tasks in the deque left by 1.
+      target = (target + 1) & TASK_DEQUE_MASK(thread_data->td);
+      thread_data->td.td_deque[prev] = thread_data->td.td_deque[target];
+      prev = target;
+    }
+    KMP_DEBUG_ASSERT(
+        thread_data->td.td_deque_tail ==
+        (kmp_uint32)((target + 1) & TASK_DEQUE_MASK(thread_data->td)));
+    thread_data->td.td_deque_tail = target; // tail -= 1 (wrapped)
+  }
+  thread_data->td.td_deque_ntasks = deque_ntasks - 1;
+  __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
+  task = KMP_TASKDATA_TO_TASK(taskdata);
+  return task;
+}
+
 // __kmp_remove_my_task: remove a task from my own deque
 static kmp_task_t *__kmp_remove_my_task(kmp_info_t *thread, kmp_int32 gtid,
                                         kmp_task_team_t *task_team,
@@ -2916,10 +3163,13 @@
   // getting tasks from target constructs
   while (1) { // Inner loop to find a task and execute it
     task = NULL;
-    if (use_own_tasks) { // check on own queue first
+    if (task_team->tt.tt_num_task_pri) { // get a priority task first
+      task = __kmp_get_priority_task(gtid, task_team, is_constrained);
+    }
+    if (task == NULL && use_own_tasks) { // check own queue next
       task = __kmp_remove_my_task(thread, gtid, task_team, is_constrained);
     }
-    if ((task == NULL) && (nthreads > 1)) { // Steal a task
+    if ((task == NULL) && (nthreads > 1)) { // Finally, try to steal a task
       int asleep = 1;
       use_own_tasks = 0;
       // Try to steal from the last place I stole from successfully.
@@ -3440,6 +3690,24 @@
   __kmp_release_bootstrap_lock(&task_team->tt.tt_threads_lock);
 }
 
+// __kmp_free_task_pri_list:
+// Deallocates tasking deques used for priority tasks.
+// Only occurs at library shutdown.
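+// The list lock is still acquired, for consistency with the other accessors
+// of the list.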
+static void __kmp_free_task_pri_list(kmp_task_team_t *task_team) {
+  __kmp_acquire_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
+  if (task_team->tt.tt_task_pri_list != NULL) {
+    kmp_task_pri_t *list = task_team->tt.tt_task_pri_list;
+    while (list != NULL) {
+      kmp_task_pri_t *next = list->next;
+      __kmp_free_task_deque(&list->td);
+      __kmp_free(list);
+      list = next;
+    }
+    task_team->tt.tt_task_pri_list = NULL;
+  }
+  __kmp_release_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
+}
+
 // __kmp_allocate_task_team:
 // Allocates a task team associated with a specific team, taking it from
 // the global task team free list if possible. Also initializes data
@@ -3471,6 +3739,7 @@
     // __kmp_thread_malloc because threads not around for kmp_reap_task_team.
     task_team = (kmp_task_team_t *)__kmp_allocate(sizeof(kmp_task_team_t));
     __kmp_init_bootstrap_lock(&task_team->tt.tt_threads_lock);
+    __kmp_init_bootstrap_lock(&task_team->tt.tt_task_pri_lock);
 #if USE_ITT_BUILD && USE_ITT_NOTIFY && KMP_DEBUG
     // suppress race conditions detection on synchronization flags in debug mode
     // this helps to analyze library internals eliminating false positives
@@ -3540,6 +3809,9 @@
     if (task_team->tt.tt_threads_data != NULL) {
       __kmp_free_task_threads_data(task_team);
    }
+    if (task_team->tt.tt_task_pri_list != NULL) {
+      __kmp_free_task_pri_list(task_team);
+    }
     __kmp_free(task_team);
   }
   __kmp_release_bootstrap_lock(&__kmp_task_team_lock);
diff --git a/openmp/runtime/test/tasking/omp_task_priority2.c b/openmp/runtime/test/tasking/omp_task_priority2.c
new file mode 100644
--- /dev/null
+++ b/openmp/runtime/test/tasking/omp_task_priority2.c
@@ -0,0 +1,145 @@
+// RUN: %libomp-compile && env OMP_MAX_TASK_PRIORITY='2' %libomp-run | FileCheck %s
+
+// Test OMP 4.5 task priorities.
+// A higher-priority task is supposed to be executed before a lower-priority
+// one.
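+//
+// The test covers two scenarios: in the first parallel region the generating
+// (primary) thread executes its own tasks once it reaches the implicit
+// barrier; in the second region a worker thread picks up the tasks that the
+// primary thread generated.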
+
+#include <stdio.h>
+#include <omp.h>
+
+#include "omp_my_sleep.h"
+// delay(n) - sleep n ms
+#define delay(n) my_sleep(((double)n)/1000.0)
+
+int main(void) {
+  int passed;
+  passed = (omp_get_max_task_priority() == 2);
+  printf("Got %d max priority via env\n", omp_get_max_task_priority());
+  if (!passed) {
+    printf("failed\n");
+    return 1;
+  }
+  printf("parallel 1 spawns 4 tasks for primary thread to execute\n");
+  #pragma omp parallel num_threads(2)
+  {
+    int th = omp_get_thread_num();
+    if (th == 0) // primary thread
+    {
+      #pragma omp task priority(1)
+      { // middle priority
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P1: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(10); // sleep 10 ms
+      }
+      #pragma omp task priority(2)
+      { // high priority
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P2: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(20); // sleep 20 ms
+      }
+      #pragma omp task priority(0)
+      { // low priority specified explicitly
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P0exp: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(1); // sleep 1 ms
+      }
+      #pragma omp task
+      { // low priority by default
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P0imp: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(1); // sleep 1 ms
+      }
+    } else {
+      // wait for the primary thread to finish all tasks
+      int wait = 0;
+      do {
+        delay(5);
+        #pragma omp atomic read
+        wait = passed;
+      } while (wait < 5);
+    }
+  }
+  printf("parallel 2 spawns 4 tasks for worker thread to execute\n");
+  #pragma omp parallel num_threads(2)
+  {
+    int th = omp_get_thread_num();
+    if (th == 0) // primary thread
+    {
+      #pragma omp task priority(1)
+      { // middle priority
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P1: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(10); // sleep 10 ms
+      }
+      #pragma omp task priority(2)
+      { // high priority
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P2: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(20); // sleep 20 ms
+      }
+      #pragma omp task priority(0)
+      { // low priority specified explicitly
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P0exp: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(1); // sleep 1 ms
+      }
+      #pragma omp task
+      { // low priority by default
+        int val, t = omp_get_thread_num();
+        #pragma omp atomic capture
+        val = passed++;
+        printf("P0imp: val = %d, thread gen %d, thread exe %d\n", val, th, t);
+        delay(1); // sleep 1 ms
+      }
+      // signal creation of all tasks: passed = 5 + 1 = 6
+      #pragma omp atomic
+      passed++;
+      // wait for completion of all 4 tasks
+      int wait = 0;
+      do {
+        delay(5);
+        #pragma omp atomic read
+        wait = passed;
+      } while (wait < 10); // passed = 6 + 4 = 10
+    } else {
+      // wait for the primary thread to create all tasks
+      int wait = 0;
+      do {
+        delay(5);
+        #pragma omp atomic read
+        wait = passed;
+      } while (wait < 6);
+      // now go and execute the 4 tasks created by the primary thread
+    }
+  }
+  if (passed != 10) {
+    printf("failed, passed = %d (should be 10)\n", passed);
+    return 1;
+  }
+  printf("passed\n");
+  return 0;
+}
+// CHECK: parallel 1
+// CHECK-NEXT: P2
+// CHECK-NEXT: P1
+// CHECK-NEXT: P0
+// CHECK-NEXT: P0
+// CHECK-NEXT: parallel 2
+// CHECK-NEXT: P2
+// CHECK-NEXT: P1
+// CHECK-NEXT: P0
+// CHECK-NEXT: P0
+// CHECK: passed