diff --git a/lld/ELF/OutputSections.cpp b/lld/ELF/OutputSections.cpp
--- a/lld/ELF/OutputSections.cpp
+++ b/lld/ELF/OutputSections.cpp
@@ -534,7 +534,7 @@
     taskSize += sections[i]->getSize();
     bool done = ++i == numSections;
     if (done || taskSize >= taskSizeLimit) {
-      tg.execute([=] { fn(begin, i); });
+      tg.spawn([=] { fn(begin, i); });
       if (done)
         break;
       begin = i;
diff --git a/lld/ELF/Relocations.cpp b/lld/ELF/Relocations.cpp
--- a/lld/ELF/Relocations.cpp
+++ b/lld/ELF/Relocations.cpp
@@ -1534,16 +1534,13 @@
           scanner.template scanSection<ELFT>(*s);
       }
     };
-    if (serial)
-      fn();
-    else
-      tg.execute(fn);
+    tg.spawn(fn, serial);
   }
 
   // Both the main thread and thread pool index 0 use getThreadIndex()==0. Be
   // careful that they don't concurrently run scanSections. When serial is
   // true, fn() has finished at this point, so running execute is safe.
-  tg.execute([] {
+  tg.spawn([] {
     RelocationScanner scanner;
     for (Partition &part : partitions) {
       for (EhInputSection *sec : part.ehFrame->sections)
diff --git a/llvm/include/llvm/Support/Parallel.h b/llvm/include/llvm/Support/Parallel.h
--- a/llvm/include/llvm/Support/Parallel.h
+++ b/llvm/include/llvm/Support/Parallel.h
@@ -84,24 +84,11 @@
   ~TaskGroup();
 
   // Spawn a task, but does not wait for it to finish.
-  void spawn(std::function<void()> f);
-
-  // Similar to spawn, but execute the task immediately when ThreadsRequested ==
-  // 1. The difference is to give the following pattern a more intuitive order
-  // when single threading is requested.
-  //
-  // for (size_t begin = 0, i = 0, taskSize = 0;;) {
-  //   taskSize += ...
-  //   bool done = ++i == end;
-  //   if (done || taskSize >= taskSizeLimit) {
-  //     tg.execute([=] { fn(begin, i); });
-  //     if (done)
-  //       break;
-  //     begin = i;
-  //     taskSize = 0;
-  //   }
-  // }
-  void execute(std::function<void()> f);
+  // Tasks marked with \p Sequential will be executed
+  // exactly in the order which they were spawned.
+  // Note: Sequential tasks may be executed on different
+  // threads, but strictly in sequential order.
+  void spawn(std::function<void()> f, bool Sequential = false);
 
   void sync() const { L.sync(); }
 };
diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp
--- a/llvm/lib/Support/Parallel.cpp
+++ b/llvm/lib/Support/Parallel.cpp
@@ -12,8 +12,8 @@
 #include "llvm/Support/Threading.h"
 
 #include <atomic>
+#include <deque>
 #include <future>
-#include <stack>
 #include <thread>
 #include <vector>
@@ -39,7 +39,7 @@
 class Executor {
 public:
   virtual ~Executor() = default;
-  virtual void add(std::function<void()> func) = 0;
+  virtual void add(std::function<void()> func, bool Sequential = false) = 0;
 
   static Executor *getDefaultExecutor();
 };
@@ -97,32 +97,56 @@
     static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
   };
 
-  void add(std::function<void()> F) override {
+  void add(std::function<void()> F, bool Sequential = false) override {
     {
+      bool UseSequentialQueue =
+          Sequential || parallel::strategy.ThreadsRequested == 1;
       std::lock_guard<std::mutex> Lock(Mutex);
-      WorkStack.push(std::move(F));
+      if (UseSequentialQueue)
+        WorkQueueSequential.emplace_front(std::move(F));
+      else
+        WorkQueue.emplace_back(std::move(F));
     }
     Cond.notify_one();
   }
 
 private:
+  bool hasSequentialTasks() const {
+    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
+  }
+
+  bool hasGeneralTasks() const { return !WorkQueue.empty(); }
+
   void work(ThreadPoolStrategy S, unsigned ThreadID) {
     threadIndex = ThreadID;
     S.apply_thread_strategy(ThreadID);
     while (true) {
       std::unique_lock<std::mutex> Lock(Mutex);
-      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
+      Cond.wait(Lock, [&] {
+        return Stop || hasGeneralTasks() || hasSequentialTasks();
+      });
       if (Stop)
         break;
-      auto Task = std::move(WorkStack.top());
-      WorkStack.pop();
+      bool Sequential = hasSequentialTasks();
+      if (Sequential)
+        SequentialQueueIsLocked = true;
+      else
+        assert(hasGeneralTasks());
+
+      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
+      auto Task = std::move(Queue.back());
+      Queue.pop_back();
       Lock.unlock();
       Task();
+      if (Sequential)
+        SequentialQueueIsLocked = false;
     }
   }
 
   std::atomic<bool> Stop{false};
-  std::stack<std::function<void()>> WorkStack;
+  std::atomic<bool> SequentialQueueIsLocked{false};
+  std::deque<std::function<void()>> WorkQueue;
+  std::deque<std::function<void()>> WorkQueueSequential;
   std::mutex Mutex;
   std::condition_variable Cond;
   std::promise<void> ThreadsCreated;
@@ -172,26 +196,22 @@
   --TaskGroupInstances;
 }
 
-void TaskGroup::spawn(std::function<void()> F) {
+void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
 #if LLVM_ENABLE_THREADS
   if (Parallel) {
     L.inc();
-    detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
-      F();
-      L.dec();
-    });
+    detail::Executor::getDefaultExecutor()->add(
+        [&, F = std::move(F)] {
+          F();
+          L.dec();
+        },
+        Sequential);
     return;
   }
 #endif
   F();
 }
 
-void TaskGroup::execute(std::function<void()> F) {
-  if (parallel::strategy.ThreadsRequested == 1)
-    F();
-  else
-    spawn(F);
-}
 } // namespace parallel
 } // namespace llvm
diff --git a/llvm/unittests/Support/ParallelTest.cpp b/llvm/unittests/Support/ParallelTest.cpp
--- a/llvm/unittests/Support/ParallelTest.cpp
+++ b/llvm/unittests/Support/ParallelTest.cpp
@@ -92,4 +92,14 @@
   EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
 }
 
+TEST(Parallel, TaskGroupSequentialFor) {
+  size_t Count = 0;
+  {
+    parallel::TaskGroup tg;
+    for (size_t Idx = 0; Idx < 500; Idx++)
+      tg.spawn([&Count, Idx]() { EXPECT_EQ(Count++, Idx); }, true);
+  }
+  EXPECT_EQ(Count, 500ul);
+}
+
 #endif