Index: clangd/CMakeLists.txt =================================================================== --- clangd/CMakeLists.txt +++ clangd/CMakeLists.txt @@ -6,7 +6,6 @@ ClangdLSPServer.cpp ClangdServer.cpp ClangdUnit.cpp - ClangdUnitStore.cpp CodeComplete.cpp CodeCompletionStrings.cpp CompileArgsCache.cpp Index: clangd/ClangdServer.h =================================================================== --- clangd/ClangdServer.h +++ clangd/ClangdServer.h @@ -11,7 +11,6 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "CodeComplete.h" #include "CompileArgsCache.h" #include "DraftStore.h" Index: clangd/ClangdUnit.h =================================================================== --- clangd/ClangdUnit.h +++ clangd/ClangdUnit.h @@ -151,6 +151,8 @@ /// Manages resources, required by clangd. Allows to rebuild file with new /// contents, and provides AST and Preamble for it. +/// NOTE: Threading-related bits of CppFile are now deprecated and will be +/// removed soon. class CppFile : public std::enable_shared_from_this { public: // We only allow to create CppFile as shared_ptr, because a future returned by @@ -178,6 +180,7 @@ /// that will wait for any ongoing rebuilds to finish and actually set the AST /// and Preamble to nulls. It can be run on a different thread. This function /// is useful to cancel ongoing rebuilds, if any, before removing CppFile. + /// DEPRECATED. This function will be removed soon, please do not use it. UniqueFunction deferCancelRebuild(); /// Rebuild AST and Preamble synchronously on the calling thread. @@ -200,6 +203,7 @@ /// The future to finish rebuild returns a list of diagnostics built during /// reparse, or None, if another deferRebuild was called before this /// rebuild was finished. + /// DEPRECATED. This function will be removed soon, please do not use it. UniqueFunction>()> deferRebuild(ParseInputs &&Inputs); Index: clangd/ClangdUnitStore.h =================================================================== --- clangd/ClangdUnitStore.h +++ /dev/null @@ -1,73 +0,0 @@ -//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===---------------------------------------------------------------------===// - -#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H -#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H - -#include "ClangdUnit.h" -#include "GlobalCompilationDatabase.h" -#include "Logger.h" -#include "Path.h" -#include "clang/Tooling/CompilationDatabase.h" -#include - -namespace clang { -namespace clangd { - -class Logger; - -/// Thread-safe mapping from FileNames to CppFile. -class CppFileCollection { -public: - /// \p ASTCallback is called when a file is parsed synchronously. This should - /// not be expensive since it blocks diagnostics. - explicit CppFileCollection(bool StorePreamblesInMemory, - std::shared_ptr PCHs, - ASTParsedCallback ASTCallback) - : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)), - StorePreamblesInMemory(StorePreamblesInMemory) {} - - std::shared_ptr getOrCreateFile(PathRef File) { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) { - It = OpenedFiles - .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory, - PCHs, ASTCallback)) - .first; - } - return It->second; - } - - std::shared_ptr getFile(PathRef File) const { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - return It->second; - } - - /// Removes a CppFile, stored for \p File, if it's inside collection and - /// returns it. - std::shared_ptr removeIfPresent(PathRef File); - - /// Gets used memory for each of the stored files. - std::vector> getUsedBytesPerFile() const; - -private: - mutable std::mutex Mutex; - llvm::StringMap> OpenedFiles; - ASTParsedCallback ASTCallback; - std::shared_ptr PCHs; - bool StorePreamblesInMemory; -}; -} // namespace clangd -} // namespace clang - -#endif Index: clangd/ClangdUnitStore.cpp =================================================================== --- clangd/ClangdUnitStore.cpp +++ /dev/null @@ -1,37 +0,0 @@ -//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// - -#include "ClangdUnitStore.h" -#include "llvm/Support/Path.h" -#include - -using namespace clang::clangd; -using namespace clang; - -std::shared_ptr CppFileCollection::removeIfPresent(PathRef File) { - std::lock_guard Lock(Mutex); - - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - - std::shared_ptr Result = It->second; - OpenedFiles.erase(It); - return Result; -} -std::vector> -CppFileCollection::getUsedBytesPerFile() const { - std::lock_guard Lock(Mutex); - std::vector> Result; - Result.reserve(OpenedFiles.size()); - for (auto &&PathAndFile : OpenedFiles) - Result.push_back( - {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()}); - return Result; -} Index: clangd/TUScheduler.h =================================================================== --- clangd/TUScheduler.h +++ clangd/TUScheduler.h @@ -11,9 +11,9 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "Function.h" #include "Threading.h" +#include "llvm/ADT/StringMap.h" namespace clang { namespace clangd { @@ -42,6 +42,7 @@ public: TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback); + ~TUScheduler(); /// Returns estimated memory usage for each of the currently open files. /// The order of results is unspecified. @@ -81,11 +82,17 @@ UniqueFunction)> Action); private: - const ParseInputs &getInputs(PathRef File); + /// This class stores per-file data in the Files map. + struct FileData; - llvm::StringMap CachedInputs; - CppFileCollection Files; - ThreadPool Threads; + const bool StorePreamblesInMemory; + const std::shared_ptr PCHOps; + const ASTParsedCallback ASTCallback; + Semaphore Barrier; + llvm::StringMap> Files; + // None when running tasks synchronously and non-None when running tasks + // asynchronously. + llvm::Optional Tasks; }; } // namespace clangd } // namespace clang Index: clangd/TUScheduler.cpp =================================================================== --- clangd/TUScheduler.cpp +++ clangd/TUScheduler.cpp @@ -1,9 +1,303 @@ +//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// For each file, managed by TUScheduler, we create a single ASTWorker that +// manages an AST for that file. All operations that modify or read the AST are +// run on a separate dedicated thread asynchronously in FIFO order. +// +// We start processing each update immediately after we receive it. If two or +// more updates come subsequently without reads in-between, we attempt to drop +// an older one to not waste time building the ASTs we don't need. +// +// The processing thread of the ASTWorker is also responsible for building the +// preamble. However, unlike AST, the same preamble can be read concurrently, so +// we run each of async preamble reads on its own thread. +// +// To limit the concurrent load that clangd produces we mantain a semaphore that +// keeps more than a fixed number of threads from running concurrently. +// +// Rationale for cancelling updates. +// LSP clients can send updates to clangd on each keystroke. Some files take +// significant time to parse (e.g. a few seconds) and clangd can get starved by +// the updates to those files. Therefore we try to process only the last update, +// if possible. +// Our current strategy to do that is the following: +// - For each update we immediately schedule rebuild of the AST. +// - Rebuild of the AST checks if it was cancelled before doing any actual work. +// If it was, it does not do an actual rebuild, only reports llvm::None to the +// callback +// - When adding an update, we cancel the last update in the queue if it didn't +// have any reads. +// There is probably a optimal ways to do that. One approach we might take is +// the following: +// - For each update we remember the pending inputs, but delay rebuild of the +// AST for some timeout. +// - If subsequent updates come before rebuild was started, we replace the +// pending inputs and reset the timer. +// - If any reads of the AST are scheduled, we start building the AST +// immediately. + #include "TUScheduler.h" #include "clang/Frontend/PCHContainerOperations.h" #include "llvm/Support/Errc.h" +#include +#include namespace clang { namespace clangd { +namespace { +class ASTWorkerHandle; + +/// Owns one instance of the AST, schedules updates and reads of it. +/// Also responsible for building and providing access to the preamble. +/// Each ASTWorker processes the async requests sent to it on a separate +/// dedicated thread. +/// The ASTWorker that manages the AST is shared by both the processing thread +/// and the TUScheduler. The TUScheduler should discard an ASTWorker when +/// remove() is called, but its thread may be busy and we don't want to block. +/// So the workers are accessed via an ASTWorkerHandle. Destroying the handle +/// signals the worker to exit its run loop and gives up shared ownership of the +/// worker. +class ASTWorker { + friend class ASTWorkerHandle; + ASTWorker(Semaphore &Barrier, std::shared_ptr AST, bool RunSync); + +public: + /// Create a new ASTWorker and return a handle to it. + /// The processing thread is spawned using \p Tasks. However, when \p Tasks + /// is null, all requests will be processed on the calling thread + /// synchronously instead. \p Barrier is acquired when processing each + /// request, it is be used to limit the number of actively running threads. + static ASTWorkerHandle Create(AsyncTaskRunner *Tasks, Semaphore &Barrier, + std::shared_ptr AST); + ~ASTWorker(); + + void update(ParseInputs Inputs, + UniqueFunction>)> + OnUpdated); + void runWithAST(UniqueFunction)> Action); + + std::shared_ptr getPossiblyStalePreamble() const; + std::size_t getUsedBytes() const; + +private: + // Must be called exactly once on processing thread. Will return after + // stop() is called on a separate thread and all pending requests are + // processed. + void run(); + /// Signal that run() should finish processing pending requests and exit. + void stop(); + /// Adds a new task to the end of the request queue. + void startTask(UniqueFunction Task, bool isUpdate, + llvm::Optional CF); + + using RequestWithCtx = std::pair, Context>; + + const bool RunSync; + Semaphore &Barrier; + // AST and FileInputs are only accessed on the processing thread from run(). + const std::shared_ptr AST; + // Inputs, corresponding to the current state of AST. + ParseInputs FileInputs; + // Guards members used by both TUScheduler and the worker thread. + mutable std::mutex Mutex; + // Set to true to signal run() to finish processing. + bool Done; /* GUARDED_BY(Mutex) */ + std::queue Requests; /* GUARDED_BY(Mutex) */ + // Only set when last request is an update. This allows us to cancel an update + // that was never read, if a subsequent update comes in. + llvm::Optional LastUpdateCF; /* GUARDED_BY(Mutex) */ + std::condition_variable RequestsCV; +}; + +/// A smart-pointer-like class that points to an active ASTWorker. +/// In destructor, signals to the underlying ASTWorker that no new requests will +/// be sent and the processing loop may exit (after running all pending +/// requests). +class ASTWorkerHandle { + friend class ASTWorker; + ASTWorkerHandle(std::shared_ptr Worker) + : Worker(std::move(Worker)) { + assert(this->Worker); + } + +public: + ASTWorkerHandle(const ASTWorkerHandle &) = delete; + ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete; + ASTWorkerHandle(ASTWorkerHandle &&) = default; + ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default; + + ~ASTWorkerHandle() { + if (Worker) + Worker->stop(); + } + + ASTWorker &operator*() { + assert(Worker && "Handle was moved from"); + return *Worker; + } + + ASTWorker *operator->() { + assert(Worker && "Handle was moved from"); + return Worker.get(); + } + + /// Returns an owning reference to the underlying ASTWorker that can outlive + /// the ASTWorkerHandle. However, no new requests to an active ASTWorker can + /// be schedule via the returned reference, i.e. only reads of the preamble + /// are possible. + std::shared_ptr lock() { return Worker; } + +private: + std::shared_ptr Worker; +}; + +ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner *Tasks, Semaphore &Barrier, + std::shared_ptr AST) { + std::shared_ptr Worker( + new ASTWorker(Barrier, std::move(AST), /*RunSync=*/!Tasks)); + if (Tasks) + Tasks->runAsync([Worker]() { Worker->run(); }); + + return ASTWorkerHandle(std::move(Worker)); +} + +ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr AST, + bool RunSync) + : RunSync(RunSync), Barrier(Barrier), AST(std::move(AST)), Done(false) { + if (RunSync) + return; +} + +ASTWorker::~ASTWorker() { +#ifndef NDEBUG + std::lock_guard Lock(Mutex); + assert(Done && "handle was not destroyed"); + assert(Requests.empty() && "unprocessed requests when destroying ASTWorker"); +#endif +} + +void ASTWorker::update( + ParseInputs Inputs, + UniqueFunction>)> + OnUpdated) { + auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable { + if (CF.isCancelled()) { + OnUpdated(llvm::None); + return; + } + FileInputs = Inputs; + auto Diags = AST->rebuild(std::move(Inputs)); + // We want to report the diagnostics even if this update was cancelled. + // It seems more useful than making the clients wait indefinitely if they + // spam us with updates. + OnUpdated(std::move(Diags)); + }; + + CancellationFlag UpdateCF; + startTask(BindWithForward(Task, UpdateCF, std::move(OnUpdated)), + /*isUpdate=*/true, UpdateCF); +} + +void ASTWorker::runWithAST( + UniqueFunction)> Action) { + auto Task = [=](decltype(Action) Action) { + auto ASTWrapper = this->AST->getAST().get(); + // FIXME: no need to lock here, cleanup the CppFile interface to get rid of + // them. + ASTWrapper->runUnderLock([&](ParsedAST *AST) { + if (!AST) { + Action(llvm::make_error( + "invalid AST", llvm::errc::invalid_argument)); + return; + } + Action(InputsAndAST{FileInputs, *AST}); + }); + }; + + startTask(BindWithForward(Task, std::move(Action)), /*isUpdate=*/false, + llvm::None); +} + +std::shared_ptr +ASTWorker::getPossiblyStalePreamble() const { + return AST->getPossiblyStalePreamble(); +} + +std::size_t ASTWorker::getUsedBytes() const { + // FIXME(ibiryukov): we'll need to take locks here after we remove + // thread-safety from CppFile. For now, CppFile is thread-safe and we can + // safely call methods on it without acquiring a lock. + return AST->getUsedBytes(); +} + +void ASTWorker::stop() { + { + std::lock_guard Lock(Mutex); + assert(!Done && "stop() called twice"); + Done = true; + } + RequestsCV.notify_one(); +} + +void ASTWorker::startTask(UniqueFunction Task, bool isUpdate, + llvm::Optional CF) { + assert(isUpdate == CF.hasValue() && + "Only updates are expected to pass CancellationFlag"); + + if (RunSync) { + assert(!Done && "running a task after stop()"); + Task(); + return; + } + + { + std::lock_guard Lock(Mutex); + assert(!Done && "running a task after stop()"); + if (isUpdate) { + if (!Requests.empty() && LastUpdateCF) { + // There were no reads for the last unprocessed update, let's cancel it + // to not waste time on it. + LastUpdateCF->cancel(); + } + LastUpdateCF = std::move(*CF); + } else { + LastUpdateCF = llvm::None; + } + Requests.emplace(std::move(Task), Context::current().clone()); + } // unlock Mutex. + RequestsCV.notify_one(); +} + +void ASTWorker::run() { + while (true) { + RequestWithCtx Req; + { + std::unique_lock Lock(Mutex); + RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); }); + if (Requests.empty()) { + assert(Done); + return; + } + // Even when Done is true, we finish processing all pending requests + // before exiting the processing loop. + + Req = std::move(Requests.front()); + Requests.pop(); + } // unlock Mutex + + std::lock_guard BarrierLock(Barrier); + WithContext Guard(std::move(Req.second)); + Req.first(); + } +} +} // namespace + unsigned getDefaultAsyncThreadsCount() { unsigned HardwareConcurrency = std::thread::hardware_concurrency(); // C++ standard says that hardware_concurrency() @@ -14,110 +308,114 @@ return HardwareConcurrency; } +struct TUScheduler::FileData { + /// Latest inputs, passed to TUScheduler::update(). + ParseInputs Inputs; + ASTWorkerHandle Worker; +}; + TUScheduler::TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback) + : StorePreamblesInMemory(StorePreamblesInMemory), + PCHOps(std::make_shared()), + ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) { + if (0 < AsyncThreadsCount) + Tasks.emplace(); +} + +TUScheduler::~TUScheduler() { + // Notify all workers that they need to stop. + Files.clear(); - : Files(StorePreamblesInMemory, std::make_shared(), - std::move(ASTCallback)), - Threads(AsyncThreadsCount) {} + // Wait for all in-flight tasks to finish. + if (Tasks) + Tasks->waitForAll(); +} void TUScheduler::update( PathRef File, ParseInputs Inputs, UniqueFunction>)> OnUpdated) { - CachedInputs[File] = Inputs; - - auto Resources = Files.getOrCreateFile(File); - auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs)); - - Threads.addToFront( - [](decltype(OnUpdated) OnUpdated, - decltype(DeferredRebuild) DeferredRebuild) { - auto Diags = DeferredRebuild(); - OnUpdated(Diags); - }, - std::move(OnUpdated), std::move(DeferredRebuild)); + std::unique_ptr &FD = Files[File]; + if (!FD) { + // Create a new worker to process the AST-related tasks. + ASTWorkerHandle Worker = ASTWorker::Create( + Tasks ? Tasks.getPointer() : nullptr, Barrier, + CppFile::Create(File, StorePreamblesInMemory, PCHOps, ASTCallback)); + FD = std::unique_ptr(new FileData{Inputs, std::move(Worker)}); + } else { + FD->Inputs = Inputs; + } + FD->Worker->update(std::move(Inputs), std::move(OnUpdated)); } void TUScheduler::remove(PathRef File, UniqueFunction Action) { - CachedInputs.erase(File); - - auto Resources = Files.removeIfPresent(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to remove non-added document", llvm::errc::invalid_argument)); return; } - - auto DeferredCancel = Resources->deferCancelRebuild(); - Threads.addToFront( - [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) { - DeferredCancel(); - Action(llvm::Error::success()); - }, - std::move(Action), std::move(DeferredCancel)); + Files.erase(It); } void TUScheduler::runWithAST( PathRef File, UniqueFunction)> Action) { - auto Resources = Files.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get AST for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); - // We currently block the calling thread until AST is available and run the - // action on the calling thread to avoid inconsistent states coming from - // subsequent updates. - // FIXME(ibiryukov): this should be moved to the worker threads. - Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) { - if (AST) - Action(InputsAndAST{Inputs, *AST}); - else - Action(llvm::make_error( - "Could not build AST for the latest file update", - llvm::errc::invalid_argument)); - }); + It->second->Worker->runWithAST(std::move(Action)); } void TUScheduler::runWithPreamble( PathRef File, UniqueFunction)> Action) { - std::shared_ptr Resources = Files.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get preamble for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); - std::shared_ptr Preamble = - Resources->getPossiblyStalePreamble(); - Threads.addToFront( - [Resources, Preamble, Inputs](decltype(Action) Action) mutable { - if (!Preamble) - Preamble = Resources->getPossiblyStalePreamble(); + if (!Tasks) { + std::shared_ptr Preamble = + It->second->Worker->getPossiblyStalePreamble(); + Action(InputsAndPreamble{It->second->Inputs, Preamble.get()}); + return; + } - Action(InputsAndPreamble{Inputs, Preamble.get()}); - }, - std::move(Action)); -} + ParseInputs InputsCopy = It->second->Inputs; + std::shared_ptr Worker = It->second->Worker.lock(); + auto Task = [InputsCopy, Worker, this](Context Ctx, + decltype(Action) Action) mutable { + std::lock_guard BarrierLock(Barrier); + WithContext Guard(std::move(Ctx)); + std::shared_ptr Preamble = + Worker->getPossiblyStalePreamble(); + Action(InputsAndPreamble{InputsCopy, Preamble.get()}); + }; -const ParseInputs &TUScheduler::getInputs(PathRef File) { - auto It = CachedInputs.find(File); - assert(It != CachedInputs.end()); - return It->second; + Tasks->runAsync( + BindWithForward(Task, Context::current().clone(), std::move(Action))); } std::vector> TUScheduler::getUsedBytesPerFile() const { - return Files.getUsedBytesPerFile(); + std::vector> Result; + Result.reserve(Files.size()); + for (auto &&PathAndFile : Files) + Result.push_back( + {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()}); + return Result; } + } // namespace clangd } // namespace clang Index: clangd/Threading.h =================================================================== --- clangd/Threading.h +++ clangd/Threading.h @@ -12,74 +12,65 @@ #include "Context.h" #include "Function.h" +#include +#include #include -#include +#include #include -#include #include namespace clang { namespace clangd { -/// A simple fixed-size thread pool implementation. -class ThreadPool { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +class CancellationFlag { public: - /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd - /// will be processed synchronously on the calling thread. - // Otherwise, \p AsyncThreadsCount threads will be created to schedule the - // requests. - ThreadPool(unsigned AsyncThreadsCount); - /// Destructor blocks until all requests are processed and worker threads are - /// terminated. - ~ThreadPool(); + CancellationFlag(); - /// Add a new request to run function \p F with args \p As to the start of the - /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... As) { - if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; - } + void cancel() { + assert(WasCancelled && "the object was moved"); + WasCancelled->store(true); + } - { - std::lock_guard Lock(Mutex); - RequestQueue.emplace_front( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); - } - RequestCV.notify_one(); + bool isCancelled() const { + assert(WasCancelled && "the object was moved"); + return WasCancelled->load(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { - if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; - } +private: + std::shared_ptr> WasCancelled; +}; - { - std::lock_guard Lock(Mutex); - RequestQueue.emplace_back( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); - } - RequestCV.notify_one(); - } +/// Limits the number of threads that can acquire the lock at the same time. +class Semaphore { +public: + Semaphore(std::size_t MaxLocks); + + void lock(); + void unlock(); + +private: + std::mutex Mutex; + std::condition_variable SlotsChanged; + std::size_t FreeSlots; +}; + +/// Runs tasks on separate (detached) threads and wait for all tasks to finish. +/// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they +/// all complete on destruction. +class AsyncTaskRunner { +public: + /// Destructor waits for all pending tasks to finish. + ~AsyncTaskRunner(); + + void waitForAll(); + void runAsync(UniqueFunction Action); private: - bool RunSynchronously; - mutable std::mutex Mutex; - /// We run some tasks on separate threads(parsing, CppFile cleanup). - /// These threads looks into RequestQueue to find requests to handle and - /// terminate when Done is set to true. - std::vector Workers; - /// Setting Done to true will make the worker threads terminate. - bool Done = false; - /// A queue of requests. - std::deque, Context>> RequestQueue; - /// Condition variable to wake up worker threads. - std::condition_variable RequestCV; + std::mutex Mutex; + std::condition_variable TasksReachedZero; + std::size_t InFlightTasks = 0; }; } // namespace clangd } // namespace clang Index: clangd/Threading.cpp =================================================================== --- clangd/Threading.cpp +++ clangd/Threading.cpp @@ -1,63 +1,62 @@ #include "Threading.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" +#include namespace clang { namespace clangd { -ThreadPool::ThreadPool(unsigned AsyncThreadsCount) - : RunSynchronously(AsyncThreadsCount == 0) { - if (RunSynchronously) { - // Don't start the worker thread if we're running synchronously - return; - } - Workers.reserve(AsyncThreadsCount); - for (unsigned I = 0; I < AsyncThreadsCount; ++I) { - Workers.push_back(std::thread([this, I]() { - llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); - while (true) { - UniqueFunction Request; - Context Ctx; - - // Pick request from the queue - { - std::unique_lock Lock(Mutex); - // Wait for more requests. - RequestCV.wait(Lock, - [this] { return !RequestQueue.empty() || Done; }); - if (RequestQueue.empty()) { - assert(Done); - return; - } - - // We process requests starting from the front of the queue. Users of - // ThreadPool have a way to prioritise their requests by putting - // them to the either side of the queue (using either addToEnd or - // addToFront). - std::tie(Request, Ctx) = std::move(RequestQueue.front()); - RequestQueue.pop_front(); - } // unlock Mutex - - WithContext WithCtx(std::move(Ctx)); - Request(); - } - })); - } +CancellationFlag::CancellationFlag() + : WasCancelled(std::make_shared>(false)) {} + +Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {} + +void Semaphore::lock() { + std::unique_lock Lock(Mutex); + SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; }); + --FreeSlots; } -ThreadPool::~ThreadPool() { - if (RunSynchronously) - return; // no worker thread is running in that case +void Semaphore::unlock() { + std::unique_lock Lock(Mutex); + ++FreeSlots; + Lock.unlock(); + + SlotsChanged.notify_one(); +} +AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } + +void AsyncTaskRunner::waitForAll() { + std::unique_lock Lock(Mutex); + TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); +} + +void AsyncTaskRunner::runAsync(UniqueFunction Action) { { - std::lock_guard Lock(Mutex); - // Wake up the worker thread - Done = true; - } // unlock Mutex - RequestCV.notify_all(); - - for (auto &Worker : Workers) - Worker.join(); + std::unique_lock Lock(Mutex); + ++InFlightTasks; + } + + auto CleanupTask = llvm::make_scope_exit([this]() { + std::unique_lock Lock(Mutex); + int NewTasksCnt = --InFlightTasks; + Lock.unlock(); + + if (NewTasksCnt == 0) + TasksReachedZero.notify_one(); + }); + + std::thread( + [](decltype(Action) Action, decltype(CleanupTask)) { + Action(); + // Make sure function stored by Action is destroyed before CleanupTask + // is run. + Action = nullptr; + }, + std::move(Action), std::move(CleanupTask)) + .detach(); } } // namespace clangd } // namespace clang Index: unittests/clangd/CMakeLists.txt =================================================================== --- unittests/clangd/CMakeLists.txt +++ unittests/clangd/CMakeLists.txt @@ -21,6 +21,7 @@ JSONExprTests.cpp URITests.cpp TestFS.cpp + ThreadingTests.cpp TraceTests.cpp TUSchedulerTests.cpp SourceCodeTests.cpp Index: unittests/clangd/ThreadingTests.cpp =================================================================== --- /dev/null +++ unittests/clangd/ThreadingTests.cpp @@ -0,0 +1,45 @@ +//===-- ThreadingTests.cpp --------------------------------------*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "Threading.h" +#include "gtest/gtest.h" +#include + +namespace clang { +namespace clangd { +class ThreadingTest : public ::testing::Test {}; + +TEST_F(ThreadingTest, TaskRunner) { + const int TasksCnt = 100; + const int IncrementsPerTask = 1000; + + std::atomic Counter(0); + { + AsyncTaskRunner Tasks; + auto scheduleIncrements = [&]() { + for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) { + Tasks.runAsync([&Counter]() { + for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) + ++Counter; + }); + } + }; + + scheduleIncrements(); + Tasks.waitForAll(); + ASSERT_EQ(Counter.load(), TasksCnt * IncrementsPerTask); + + Counter = 0; + scheduleIncrements(); + } + // Check that destructor has waited for tasks to finish. + ASSERT_EQ(Counter.load(), TasksCnt * IncrementsPerTask); +} +} // namespace clangd +} // namespace clang