diff --git a/llvm/include/llvm/Support/Parallel.h b/llvm/include/llvm/Support/Parallel.h
--- a/llvm/include/llvm/Support/Parallel.h
+++ b/llvm/include/llvm/Support/Parallel.h
@@ -11,6 +11,7 @@
 
 #include "llvm/ADT/STLExtras.h"
 #include "llvm/Config/llvm-config.h"
+#include "llvm/Support/Error.h"
 #include "llvm/Support/MathExtras.h"
 #include "llvm/Support/Threading.h"
 
@@ -120,13 +121,17 @@
                       llvm::Log2_64(std::distance(Start, End)) + 1);
 }
 
+// TaskGroup has a relatively high overhead, so we want to reduce
+// the number of spawn() calls. We'll create up to 1024 tasks here.
+// (Note that 1024 is an arbitrary number. This code probably needs
+// improving to take the number of available cores into account.)
+enum { MaxTasksPerGroup = 1024 };
+
 template <class IterTy, class FuncTy>
 void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
-  // TaskGroup has a relatively high overhead, so we want to reduce
-  // the number of spawn() calls. We'll create up to 1024 tasks here.
-  // (Note that 1024 is an arbitrary number. This code probably needs
-  // improving to take the number of available cores into account.)
-  ptrdiff_t TaskSize = std::distance(Begin, End) / 1024;
+  // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
+  // overhead on large inputs.
+  ptrdiff_t TaskSize = std::distance(Begin, End) / MaxTasksPerGroup;
   if (TaskSize == 0)
     TaskSize = 1;
 
@@ -140,7 +145,9 @@
 
 template <class IndexTy, class FuncTy>
 void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
-  ptrdiff_t TaskSize = (End - Begin) / 1024;
+  // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
+  // overhead on large inputs.
+  ptrdiff_t TaskSize = (End - Begin) / MaxTasksPerGroup;
   if (TaskSize == 0)
     TaskSize = 1;
 
@@ -156,6 +163,50 @@
     Fn(J);
 }
 
+template <class IterTy, class ResultTy, class ReduceFuncTy,
+          class TransformFuncTy>
+ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init,
+                                   ReduceFuncTy Reduce,
+                                   TransformFuncTy Transform) {
+  // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
+  // overhead on large inputs.
+  size_t NumInputs = std::distance(Begin, End);
+  if (NumInputs == 0)
+    return std::move(Init);
+  size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs);
+  std::vector<ResultTy> Results(NumTasks, Init);
+  {
+    // Each task processes either TaskSize or TaskSize+1 inputs. Any inputs
+    // remaining after dividing them equally amongst tasks are distributed as
+    // one extra input over the first tasks.
+    TaskGroup TG;
+    size_t TaskSize = NumInputs / NumTasks;
+    size_t RemainingInputs = NumInputs % NumTasks;
+    IterTy TBegin = Begin;
+    for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) {
+      IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0);
+      TG.spawn([=, &Transform, &Reduce, &Results] {
+        // Reduce the result of transformation eagerly within each task.
+        ResultTy R = Init;
+        for (IterTy It = TBegin; It != TEnd; ++It)
+          R = Reduce(R, Transform(*It));
+        Results[TaskId] = R;
+      });
+      TBegin = TEnd;
+    }
+    assert(TBegin == End);
+  }
+
+  // Do a final reduction. There are at most 1024 tasks, so this only adds
+  // constant single-threaded overhead for large inputs. Hopefully most
+  // reductions are cheaper than the transformation.
+  ResultTy FinalResult = std::move(Results.front());
+  for (ResultTy &PartialResult :
+       makeMutableArrayRef(Results.data() + 1, Results.size() - 1))
+    FinalResult = Reduce(FinalResult, std::move(PartialResult));
+  return std::move(FinalResult);
+}
+
 #endif
 
 } // namespace detail
@@ -198,6 +249,22 @@
     Fn(I);
 }
 
+template <class IterTy, class ResultTy, class ReduceFuncTy,
+          class TransformFuncTy>
+ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init,
+                                 ReduceFuncTy Reduce,
+                                 TransformFuncTy Transform) {
+#if LLVM_ENABLE_THREADS
+  if (parallel::strategy.ThreadsRequested != 1) {
+    return parallel::detail::parallel_transform_reduce(Begin, End, Init, Reduce,
+                                                       Transform);
+  }
+#endif
+  for (IterTy I = Begin; I != End; ++I)
+    Init = Reduce(std::move(Init), Transform(*I));
+  return std::move(Init);
+}
+
 // Range wrappers.
 template <class RangeTy,
           class Comparator = std::less<decltype(*std::begin(RangeTy()))>>
@@ -210,6 +277,31 @@
   parallelForEach(std::begin(R), std::end(R), Fn);
 }
 
+template <class RangeTy, class ResultTy, class ReduceFuncTy,
+          class TransformFuncTy>
+ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init,
+                                 ReduceFuncTy Reduce,
+                                 TransformFuncTy Transform) {
+  return parallelTransformReduce(std::begin(R), std::end(R), Init, Reduce,
+                                 Transform);
+}
+
+// Parallel for-each, but with error handling.
+template <class RangeTy, class FuncTy>
+Error parallelForEachError(RangeTy &&R, FuncTy Fn) {
+  // The transform_reduce algorithm requires that the initial value be copyable.
+  // Error objects are uncopyable. We only need to copy initial success values,
+  // so work around this mismatch via the C API. The C API represents success
+  // values with a null pointer. The joinErrors discards null values and joins
+  // multiple errors into an ErrorList.
+  return unwrap(parallelTransformReduce(
+      std::begin(R), std::end(R), wrap(Error::success()),
+      [](LLVMErrorRef Lhs, LLVMErrorRef Rhs) {
+        return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs)));
+      },
+      [&Fn](auto &&V) { return wrap(Fn(V)); }));
+}
+
 } // namespace llvm
 
 #endif // LLVM_SUPPORT_PARALLEL_H
diff --git a/llvm/unittests/Support/ParallelTest.cpp b/llvm/unittests/Support/ParallelTest.cpp
--- a/llvm/unittests/Support/ParallelTest.cpp
+++ b/llvm/unittests/Support/ParallelTest.cpp
@@ -49,4 +49,47 @@
   ASSERT_EQ(range[2049], 1u);
 }
 
+TEST(Parallel, TransformReduce) {
+  // Sum an empty list, check that it works.
+  auto identity = [](uint32_t v) { return v; };
+  uint32_t sum = parallelTransformReduce(ArrayRef<uint32_t>(), 0U,
+                                         std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 0U);
+
+  // Sum the lengths of these strings in parallel.
+  const char *strs[] = {"a", "ab", "abc", "abcd", "abcde", "abcdef"};
+  size_t lenSum =
+      parallelTransformReduce(strs, static_cast<size_t>(0), std::plus<size_t>(),
+                              [](const char *s) { return strlen(s); });
+  EXPECT_EQ(lenSum, static_cast<size_t>(21));
+
+  // Check that we handle non-divisible task sizes as above.
+  uint32_t range[2050];
+  std::fill(std::begin(range), std::end(range), 1);
+  sum = parallelTransformReduce(range, 0U, std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 2050U);
+
+  std::fill(std::begin(range), std::end(range), 2);
+  sum = parallelTransformReduce(range, 0U, std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 4100U);
+
+  // Avoid one large task.
+  uint32_t range2[3060];
+  std::fill(std::begin(range2), std::end(range2), 1);
+  sum = parallelTransformReduce(range2, 0U, std::plus<uint32_t>(), identity);
+  EXPECT_EQ(sum, 3060U);
+}
+
+TEST(Parallel, ForEachError) {
+  int nums[] = {1, 2, 3, 4, 5, 6};
+  Error e = parallelForEachError(nums, [](int v) -> Error {
+    if ((v & 1) == 0)
+      return createStringError(std::errc::invalid_argument, "asdf");
+    return Error::success();
+  });
+  EXPECT_TRUE(e.isA<ErrorList>());
+  std::string errText = toString(std::move(e));
+  EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
+}
+
 #endif
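
Usage sketch (not part of the patch): a minimal example of how a caller might use the new range-based parallelTransformReduce and parallelForEachError entry points added above. The function name sumAndValidate, the word list, and the "empty word" error condition are hypothetical; only llvm::parallelTransformReduce, llvm::parallelForEachError, llvm::createStringError, and llvm::Error come from the APIs in this diff.

  // Hypothetical caller, for illustration only.
  #include "llvm/ADT/StringRef.h"
  #include "llvm/Support/Error.h"
  #include "llvm/Support/Parallel.h"
  #include <functional>
  #include <system_error>
  #include <vector>

  llvm::Error sumAndValidate(const std::vector<llvm::StringRef> &Words,
                             size_t &TotalLen) {
    // Transform each word to its length and reduce with '+'; the transforms
    // run across up to MaxTasksPerGroup tasks, then partial sums are combined.
    TotalLen = llvm::parallelTransformReduce(
        Words, static_cast<size_t>(0), std::plus<size_t>(),
        [](llvm::StringRef W) { return W.size(); });

    // Run a fallible check over every word in parallel; individual failures
    // are joined into a single ErrorList by parallelForEachError.
    return llvm::parallelForEachError(Words, [](llvm::StringRef W) -> llvm::Error {
      if (W.empty())
        return llvm::createStringError(std::errc::invalid_argument, "empty word");
      return llvm::Error::success();
    });
  }

Note that the reduce callback must be associative and insensitive to evaluation order, since the per-task partial results are combined in an unspecified grouping.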