Index: clang-tools-extra/trunk/clangd/CMakeLists.txt =================================================================== --- clang-tools-extra/trunk/clangd/CMakeLists.txt +++ clang-tools-extra/trunk/clangd/CMakeLists.txt @@ -73,8 +73,9 @@ XRefs.cpp index/Background.cpp - index/BackgroundRebuild.cpp index/BackgroundIndexStorage.cpp + index/BackgroundQueue.cpp + index/BackgroundRebuild.cpp index/CanonicalIncludes.cpp index/FileIndex.cpp index/Index.cpp Index: clang-tools-extra/trunk/clangd/index/Background.h =================================================================== --- clang-tools-extra/trunk/clangd/index/Background.h +++ clang-tools-extra/trunk/clangd/index/Background.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -59,6 +60,47 @@ static Factory createDiskBackedStorageFactory(); }; +// A priority queue of tasks which can be run on (external) worker threads. +class BackgroundQueue { +public: + /// A work item on the thread pool's queue. + struct Task { + template + explicit Task(Func &&F) : Run(std::forward(F)){}; + + std::function Run; + llvm::ThreadPriority ThreadPri = llvm::ThreadPriority::Background; + // Higher-priority tasks will run first. + unsigned QueuePri = 0; + + bool operator<(const Task &O) const { return QueuePri < O.QueuePri; } + }; + + // Add tasks to the queue. + void push(Task); + void append(std::vector); + + // Process items on the queue until the queue is stopped. + // If the queue becomes empty, OnIdle will be called (on one worker). + void work(std::function OnIdle = nullptr); + + // Stop processing new tasks, allowing all work() calls to return soon. + void stop(); + + // Disables thread priority lowering to ensure progress on loaded systems. + // Only affects tasks that run after the call. 
+ static void preventThreadStarvationInTests(); + LLVM_NODISCARD bool + blockUntilIdleForTest(llvm::Optional TimeoutSeconds); + +private: + std::mutex Mu; + unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks. + std::condition_variable CV; + bool ShouldStop = false; + std::vector Queue; // max-heap +}; + // Builds an in-memory index by by running the static indexer action over // all commands in a compilation database. Indexing happens in the background. // FIXME: it should also persist its state on disk for fast start. @@ -78,19 +120,22 @@ // Enqueue translation units for indexing. // The indexing happens in a background thread, so the symbols will be // available sometime later. - void enqueue(const std::vector &ChangedFiles); + void enqueue(const std::vector &ChangedFiles) { + Queue.push(changedFilesTask(ChangedFiles)); + } // Cause background threads to stop after ther current task, any remaining // tasks will be discarded. - void stop(); + void stop() { + Rebuilder.shutdown(); + Queue.stop(); + } // Wait until the queue is empty, to allow deterministic testing. LLVM_NODISCARD bool - blockUntilIdleForTest(llvm::Optional TimeoutSeconds = 10); - - // Disables thread priority lowering in background index to make sure it can - // progress on loaded systems. Only affects tasks that run after the call. - static void preventThreadStarvationInTests(); + blockUntilIdleForTest(llvm::Optional TimeoutSeconds = 10) { + return Queue.blockUntilIdleForTest(TimeoutSeconds); + } private: /// Represents the state of a single file when indexing was performed. @@ -111,11 +156,8 @@ const GlobalCompilationDatabase &CDB; Context BackgroundContext; - // index state llvm::Error index(tooling::CompileCommand, BackgroundIndexStorage *IndexStorage); - std::mutex IndexMu; - std::condition_variable IndexCV; FileSymbols IndexedSymbols; BackgroundIndexRebuilder Rebuilder; @@ -137,19 +179,18 @@ // Tries to load shards for the ChangedFiles. 
std::vector> loadShards(std::vector ChangedFiles); - void enqueue(tooling::CompileCommand Cmd, BackgroundIndexStorage *Storage); - // queue management - using Task = std::function; - void run(); // Main loop executed by Thread. Runs tasks from Queue. - void enqueueTask(Task T, llvm::ThreadPriority Prioirty); - void enqueueLocked(tooling::CompileCommand Cmd, - BackgroundIndexStorage *IndexStorage); - std::mutex QueueMu; - unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks. - std::condition_variable QueueCV; - bool ShouldStop = false; - std::deque> Queue; + BackgroundQueue::Task + changedFilesTask(const std::vector &ChangedFiles); + BackgroundQueue::Task indexFileTask(tooling::CompileCommand Cmd, + BackgroundIndexStorage *Storage); + + // from lowest to highest priority + enum QueuePriority { + IndexFile, + LoadShards, + }; + BackgroundQueue Queue; AsyncTaskRunner ThreadPool; GlobalCompilationDatabase::CommandChanged::Subscription CommandsChanged; }; Index: clang-tools-extra/trunk/clangd/index/Background.cpp =================================================================== --- clang-tools-extra/trunk/clangd/index/Background.cpp +++ clang-tools-extra/trunk/clangd/index/Background.cpp @@ -9,6 +9,7 @@ #include "index/Background.h" #include "ClangdUnit.h" #include "Compiler.h" +#include "Context.h" #include "Headers.h" #include "Logger.h" #include "Path.h" @@ -33,8 +34,10 @@ #include "llvm/ADT/StringRef.h" #include "llvm/ADT/StringSet.h" #include "llvm/Support/Error.h" +#include "llvm/Support/Path.h" #include "llvm/Support/Threading.h" +#include #include #include #include @@ -50,8 +53,6 @@ namespace clangd { namespace { -static std::atomic PreventStarvation = {false}; - // Resolves URI to file paths with cache. 
class URIToFileCache { public: @@ -134,8 +135,10 @@ assert(ThreadPoolSize > 0 && "Thread pool size can't be zero."); assert(this->IndexStorageFactory && "Storage factory can not be null!"); for (unsigned I = 0; I < ThreadPoolSize; ++I) { - ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1), - [this] { run(); }); + ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1), [this] { + WithContext Ctx(this->BackgroundContext.clone()); + Queue.work([&] { Rebuilder.idle(); }); + }); } } @@ -144,113 +147,42 @@ ThreadPool.wait(); } -void BackgroundIndex::stop() { - Rebuilder.shutdown(); - { - std::lock_guard QueueLock(QueueMu); - std::lock_guard IndexLock(IndexMu); - ShouldStop = true; - } - QueueCV.notify_all(); - IndexCV.notify_all(); -} - -void BackgroundIndex::run() { - WithContext Background(BackgroundContext.clone()); - while (true) { - llvm::Optional Task; - llvm::ThreadPriority Priority; - { - std::unique_lock Lock(QueueMu); - QueueCV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); }); - if (ShouldStop) { - Queue.clear(); - QueueCV.notify_all(); - return; - } - ++NumActiveTasks; - std::tie(Task, Priority) = std::move(Queue.front()); - Queue.pop_front(); - } - - if (Priority != llvm::ThreadPriority::Default && !PreventStarvation.load()) - llvm::set_thread_priority(Priority); - (*Task)(); - if (Priority != llvm::ThreadPriority::Default) - llvm::set_thread_priority(llvm::ThreadPriority::Default); - - { - std::unique_lock Lock(QueueMu); - if (NumActiveTasks == 1 && Queue.empty()) { - // We just finished the last item, the queue is going idle. 
- Lock.unlock(); - Rebuilder.idle(); - Lock.lock(); - } - assert(NumActiveTasks > 0 && "before decrementing"); - --NumActiveTasks; - } - QueueCV.notify_all(); - } -} - -bool BackgroundIndex::blockUntilIdleForTest( - llvm::Optional TimeoutSeconds) { - std::unique_lock Lock(QueueMu); - return wait(Lock, QueueCV, timeoutSeconds(TimeoutSeconds), - [&] { return Queue.empty() && NumActiveTasks == 0; }); -} - -void BackgroundIndex::enqueue(const std::vector &ChangedFiles) { - enqueueTask( - [this, ChangedFiles] { - trace::Span Tracer("BackgroundIndexEnqueue"); - // We're doing this asynchronously, because we'll read shards here too. - log("Enqueueing {0} commands for indexing", ChangedFiles.size()); - SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size())); - - auto NeedsReIndexing = loadShards(std::move(ChangedFiles)); - // Run indexing for files that need to be updated. - std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(), - std::mt19937(std::random_device{}())); - for (auto &Elem : NeedsReIndexing) - enqueue(std::move(Elem.first), Elem.second); - }, - llvm::ThreadPriority::Default); -} - -void BackgroundIndex::enqueue(tooling::CompileCommand Cmd, - BackgroundIndexStorage *Storage) { - enqueueTask(Bind( - [this, Storage](tooling::CompileCommand Cmd) { - // We can't use llvm::StringRef here since we are going to - // move from Cmd during the call below. - const std::string FileName = Cmd.Filename; - if (auto Error = index(std::move(Cmd), Storage)) - elog("Indexing {0} failed: {1}", FileName, - std::move(Error)); - }, - std::move(Cmd)), - llvm::ThreadPriority::Background); -} - -void BackgroundIndex::enqueueTask(Task T, llvm::ThreadPriority Priority) { - { - std::lock_guard Lock(QueueMu); - auto I = Queue.end(); - // We first store the tasks with Normal priority in the front of the queue. - // Then we store low priority tasks. 
Normal priority tasks are pretty rare, - // they should not grow beyond single-digit numbers, so it is OK to do - // linear search and insert after that. - if (Priority == llvm::ThreadPriority::Default) { - I = llvm::find_if( - Queue, [](const std::pair &Elem) { - return Elem.second == llvm::ThreadPriority::Background; - }); - } - Queue.insert(I, {std::move(T), Priority}); - } - QueueCV.notify_all(); +BackgroundQueue::Task BackgroundIndex::changedFilesTask( + const std::vector &ChangedFiles) { + BackgroundQueue::Task T([this, ChangedFiles]() mutable { + trace::Span Tracer("BackgroundIndexEnqueue"); + // We're doing this asynchronously, because we'll read shards here too. + log("Enqueueing {0} commands for indexing", ChangedFiles.size()); + SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size())); + + auto NeedsReIndexing = loadShards(std::move(ChangedFiles)); + // Run indexing for files that need to be updated. + std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(), + std::mt19937(std::random_device{}())); + std::vector Tasks; + Tasks.reserve(NeedsReIndexing.size()); + for (auto &Elem : NeedsReIndexing) + Tasks.push_back(indexFileTask(std::move(Elem.first), Elem.second)); + Queue.append(std::move(Tasks)); + }); + + T.QueuePri = LoadShards; + T.ThreadPri = llvm::ThreadPriority::Default; + return T; +} + +BackgroundQueue::Task +BackgroundIndex::indexFileTask(tooling::CompileCommand Cmd, + BackgroundIndexStorage *Storage) { + BackgroundQueue::Task T([this, Storage, Cmd]() mutable { + // We can't use llvm::StringRef here since we are going to + // move from Cmd during the call below (hence the mutable lambda).
+ const std::string FileName = Cmd.Filename; + if (auto Error = index(std::move(Cmd), Storage)) + elog("Indexing {0} failed: {1}", FileName, std::move(Error)); + }); + T.QueuePri = IndexFile; + return T; } /// Given index results from a TU, only update symbols coming from files that @@ -649,9 +581,5 @@ return NeedsReIndexing; } -void BackgroundIndex::preventThreadStarvationInTests() { - PreventStarvation.store(true); -} - } // namespace clangd } // namespace clang Index: clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp =================================================================== --- clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp +++ clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp @@ -0,0 +1,93 @@ +//===-- BackgroundQueue.cpp - Task queue for background index -------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include "index/Background.h" + +namespace clang { +namespace clangd { + +static std::atomic PreventStarvation = {false}; + +void BackgroundQueue::preventThreadStarvationInTests() { + PreventStarvation.store(true); +} + +void BackgroundQueue::work(std::function OnIdle) { + while (true) { + llvm::Optional Task; + { + std::unique_lock Lock(Mu); + CV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); }); + if (ShouldStop) { + Queue.clear(); + CV.notify_all(); + return; + } + ++NumActiveTasks; + std::pop_heap(Queue.begin(), Queue.end()); + Task = std::move(Queue.back()); + Queue.pop_back(); + } + + if (Task->ThreadPri != llvm::ThreadPriority::Default && + !PreventStarvation.load()) + llvm::set_thread_priority(Task->ThreadPri); + Task->Run(); + if (Task->ThreadPri != llvm::ThreadPriority::Default) + llvm::set_thread_priority(llvm::ThreadPriority::Default); + + { + 
std::unique_lock Lock(Mu); + if (NumActiveTasks == 1 && Queue.empty() && OnIdle) { + // We just finished the last item, the queue is going idle. + Lock.unlock(); + OnIdle(); + Lock.lock(); + } + assert(NumActiveTasks > 0 && "before decrementing"); + --NumActiveTasks; + } + CV.notify_all(); + } +} + +void BackgroundQueue::stop() { + { + std::lock_guard QueueLock(Mu); + ShouldStop = true; + } + CV.notify_all(); +} + +void BackgroundQueue::push(Task T) { + { + std::lock_guard Lock(Mu); + Queue.push_back(std::move(T)); + std::push_heap(Queue.begin(), Queue.end()); + } + CV.notify_all(); +} + +void BackgroundQueue::append(std::vector Tasks) { + { + std::lock_guard Lock(Mu); + std::move(Tasks.begin(), Tasks.end(), std::back_inserter(Queue)); + std::make_heap(Queue.begin(), Queue.end()); + } + CV.notify_all(); +} + +bool BackgroundQueue::blockUntilIdleForTest( + llvm::Optional TimeoutSeconds) { + std::unique_lock Lock(Mu); + return wait(Lock, CV, timeoutSeconds(TimeoutSeconds), + [&] { return Queue.empty() && NumActiveTasks == 0; }); +} + +} // namespace clangd +} // namespace clang Index: clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp =================================================================== --- clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp +++ clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp @@ -356,7 +356,7 @@ LogLevel = Logger::Verbose; PrettyPrint = true; // Ensure background index makes progress. 
- BackgroundIndex::preventThreadStarvationInTests(); + BackgroundQueue::preventThreadStarvationInTests(); } if (Test || EnableTestScheme) { static URISchemeRegistry::Add X( Index: clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp =================================================================== --- clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp +++ clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp @@ -82,7 +82,7 @@ class BackgroundIndexTest : public ::testing::Test { protected: - BackgroundIndexTest() { BackgroundIndex::preventThreadStarvationInTests(); } + BackgroundIndexTest() { BackgroundQueue::preventThreadStarvationInTests(); } }; TEST_F(BackgroundIndexTest, NoCrashOnErrorFile) { @@ -646,5 +646,37 @@ EXPECT_TRUE(checkRebuild([&] { Rebuilder.doneLoading(); })); } +TEST(BackgroundQueueTest, Priority) { + // Create high and low priority tasks. + // Once a bunch of high priority tasks have run, the queue is stopped. + // So the low priority tasks should never run. + BackgroundQueue Q; + std::atomic HiRan(0), LoRan(0); + BackgroundQueue::Task Lo([&] { ++LoRan; }); + BackgroundQueue::Task Hi([&] { + if (++HiRan >= 10) + Q.stop(); + }); + Hi.QueuePri = 100; + + // Enqueuing the low-priority ones first shouldn't make them run first. + Q.append(std::vector(30, Lo)); + for (unsigned I = 0; I < 30; ++I) + Q.push(Hi); + + AsyncTaskRunner ThreadPool; + for (unsigned I = 0; I < 5; ++I) + ThreadPool.runAsync("worker", [&] { Q.work(); }); + // We should test enqueue with active workers, but it's hard to avoid races. + // Just make sure we don't crash. + Q.push(Lo); + Q.append(std::vector(2, Hi)); + + // After finishing, check the tasks that ran. + ThreadPool.wait(); + EXPECT_GE(HiRan, 10u); + EXPECT_EQ(LoRan, 0u); +} + } // namespace clangd } // namespace clang