Index: clangd/ASTWorker.h =================================================================== --- /dev/null +++ clangd/ASTWorker.h @@ -0,0 +1,118 @@ +//===--- ASTWorker.h ----------------------------------------------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_ASTWORKER_H +#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_ASTWORKER_H + +#include "ClangdUnit.h" +#include "Threading.h" +#include +#include + +namespace clang { +namespace clangd { +struct InputsAndAST { + const ParseInputs &Inputs; + ParsedAST &AST; +}; + +struct InputsAndPreamble { + const ParseInputs &Inputs; + const PreambleData *Preamble; +}; + +class ASTWorkerHandle; + +/// Owns one instance of the AST, schedules updated and reads of it. +/// Also responsible for building and providing access to the preamble. +/// Each ASTWorker processes the async requests sent to it on a separate +/// dedicated thread. +/// The clients can access this ASTWorker only via the smart-pointer-like +/// ASTWorkerHandler. +class ASTWorker { + friend class ASTWorkerHandle; + ASTWorker(Semaphore &Barrier, std::shared_ptr File, bool RunSync); + +public: + /// Create a new ASTWorker and return a handle to it. + /// The processing thread is spawned using \p Tasks. However, when \p RunSync + /// is true, all requests will be processed on the calling thread + /// synchronously instead. \p Barrier is acquired when processing each + /// request, it is be used to limit the number of actively running threads. + static ASTWorkerHandle Create(AsyncTaskRunner &Tasks, Semaphore &Barrier, + std::shared_ptr File, bool RunSync); + ~ASTWorker(); + + std::shared_ptr getPossiblyStalePreamble() const; + std::size_t getUsedBytes() const; + + void update(ParseInputs Inputs, + UniqueFunction>)> + OnUpdated); + void runWithAST(UniqueFunction)> Action); + +private: + /// Signal that run() should finish processing pending requests and exit. + void setDone(); + // Must be called exactly once on processing thread. Will return after + // setDone() is called on a separate thread and all pending requests are + // processed. + void run(); + + using RequestWithCtx = std::pair, Context>; + + const bool RunSync; + Semaphore &Barrier; + // File and FileInputs are only accessed on the processing thread from run(). + // FIXME(ibiryukov): group CppFile and FileInputs into a separate class. + const std::shared_ptr File; + // Inputs, corresponding to the current state of File. + ParseInputs FileInputs; + mutable std::mutex Mutex; + // Set to true to signal run() to finish processin + bool Done; /* GUARDED_BY(Mutex) */ + std::queue Requests; /* GUARDED_BY(Mutex) */ + // Only set when last request is an update. + llvm::Optional LastUpdateCF; /* GUARDED_BY(Mutex) */ + std::condition_variable RequestsCV; +}; + +/// A smart-pointer-like class that points to an active ASTWorker. +/// In destructor, signals to the underlying ASTWorker that no new requests will +/// be sent and the processing loop may exit (after running all pending +/// requests). +class ASTWorkerHandle { + friend class ASTWorker; + ASTWorkerHandle(std::shared_ptr Worker); + +public: + ASTWorkerHandle(const ASTWorkerHandle &) = delete; + ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete; + ASTWorkerHandle(ASTWorkerHandle &&) = default; + ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default; + + ~ASTWorkerHandle(); + + ASTWorker &operator*(); + ASTWorker *operator->(); + + /// Returns an owning reference to the underlying ASTWorker that can outlive + /// the ASTWorkerHandle. However, no new requests to an active ASTWorker can + /// be schedule via the returned reference, i.e. only reads of the preamble + /// are possible. + std::shared_ptr lock(); + +private: + std::shared_ptr Worker; +}; + +} // namespace clangd +} // namespace clang + +#endif Index: clangd/ASTWorker.cpp =================================================================== --- /dev/null +++ clangd/ASTWorker.cpp @@ -0,0 +1,167 @@ +//===--- ASTWorker.cpp --------------------------------------------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "ASTWorker.h" +#include "llvm/Support/Errc.h" + +namespace clang { +namespace clangd { + +ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner &Tasks, Semaphore &Barrier, + std::shared_ptr File, bool RunSync) { + std::shared_ptr Worker( + new ASTWorker(Barrier, std::move(File), RunSync)); + if (!RunSync) + Tasks.runAsync([Worker]() { Worker->run(); }); + + return ASTWorkerHandle(std::move(Worker)); +} + +ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr File, + bool RunSync) + : RunSync(RunSync), Barrier(Barrier), File(std::move(File)), Done(false) { + if (RunSync) + return; +} + +ASTWorker::~ASTWorker() { +#ifndef NDEBUG + std::lock_guard Lock(Mutex); + assert(Done && "handle was not destroyed"); + assert(Requests.empty() && "unprocessed requests when destroying ASTWorker"); +#endif +} + +std::shared_ptr +ASTWorker::getPossiblyStalePreamble() const { + return File->getPossiblyStalePreamble(); +} + +std::size_t ASTWorker::getUsedBytes() const { + std::lock_guard Lock(Mutex); + return File->getUsedBytes(); +} + +void ASTWorker::update( + ParseInputs Inputs, + UniqueFunction>)> + OnUpdated) { + auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable { + if (CF.isCancelled()) { + OnUpdated(llvm::None); + return; + } + FileInputs = Inputs; + auto Diags = File->rebuild(std::move(Inputs)); + OnUpdated(std::move(Diags)); + }; + + if (RunSync) { + Task(CancellationFlag(), std::move(OnUpdated)); + return; + } + + { + std::lock_guard Lock(Mutex); + assert(!Done && "update() after setDone()"); + if (!Requests.empty() && LastUpdateCF) { + // There were no reads for the last unprocessed update, let's cancel it to + // not waste time on it. + LastUpdateCF->cancel(); + } + LastUpdateCF = CancellationFlag(); + Requests.emplace(BindWithForward(Task, *LastUpdateCF, std::move(OnUpdated)), + Context::current().clone()); + } + RequestsCV.notify_one(); +} + +void ASTWorker::runWithAST( + UniqueFunction)> Action) { + auto Task = [=](decltype(Action) Action) { + auto AST = File->getAST().get(); + AST->runUnderLock([&](ParsedAST *AST) { + if (!AST) { + Action(llvm::make_error( + "invalid AST", llvm::errc::invalid_argument)); + return; + } + Action(InputsAndAST{FileInputs, *AST}); + }); + }; + + if (RunSync) { + Task(std::move(Action)); + return; + } + + { + std::lock_guard Lock(Mutex); + assert(!Done && "runWithAST() after setDone()"); + LastUpdateCF = llvm::None; + Requests.emplace(BindWithForward(Task, std::move(Action)), + Context::current().clone()); + } + RequestsCV.notify_one(); +} + +void ASTWorker::run() { + while (true) { + RequestWithCtx Req; + { + std::unique_lock Lock(Mutex); + RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); }); + if (Requests.empty()) { + assert(Done); + return; + } + + Req = std::move(Requests.front()); + Requests.pop(); + } // unlock Mutex + + std::lock_guard BarrierLock(Barrier); + WithContext Guard(std::move(Req.second)); + Req.first(); + } +} + +void ASTWorker::setDone() { + { + std::lock_guard Lock(Mutex); + assert(!Done && "setDone() called twice"); + Done = true; + } + RequestsCV.notify_one(); +} + +ASTWorkerHandle::ASTWorkerHandle(std::shared_ptr Worker) + : Worker(std::move(Worker)) { + assert(this->Worker); +} + +ASTWorkerHandle::~ASTWorkerHandle() { + if (Worker) + Worker->setDone(); +} + +ASTWorker &ASTWorkerHandle::operator*() { + assert(Worker && "Handle was moved from"); + return *Worker; +} + +ASTWorker *ASTWorkerHandle::operator->() { + assert(Worker && "Handle was moved from"); + return Worker.get(); +} + +std::shared_ptr ASTWorkerHandle::lock() { return Worker; } + +} // namespace clangd +} // namespace clang Index: clangd/CMakeLists.txt =================================================================== --- clangd/CMakeLists.txt +++ clangd/CMakeLists.txt @@ -3,10 +3,10 @@ ) add_clang_library(clangDaemon + ASTWorker.cpp ClangdLSPServer.cpp ClangdServer.cpp ClangdUnit.cpp - ClangdUnitStore.cpp CodeComplete.cpp CodeCompletionStrings.cpp CompileArgsCache.cpp Index: clangd/ClangdServer.h =================================================================== --- clangd/ClangdServer.h +++ clangd/ClangdServer.h @@ -11,7 +11,6 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "CodeComplete.h" #include "CompileArgsCache.h" #include "DraftStore.h" Index: clangd/ClangdUnitStore.h =================================================================== --- clangd/ClangdUnitStore.h +++ /dev/null @@ -1,73 +0,0 @@ -//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===---------------------------------------------------------------------===// - -#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H -#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H - -#include "ClangdUnit.h" -#include "GlobalCompilationDatabase.h" -#include "Logger.h" -#include "Path.h" -#include "clang/Tooling/CompilationDatabase.h" -#include - -namespace clang { -namespace clangd { - -class Logger; - -/// Thread-safe mapping from FileNames to CppFile. -class CppFileCollection { -public: - /// \p ASTCallback is called when a file is parsed synchronously. This should - /// not be expensive since it blocks diagnostics. - explicit CppFileCollection(bool StorePreamblesInMemory, - std::shared_ptr PCHs, - ASTParsedCallback ASTCallback) - : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)), - StorePreamblesInMemory(StorePreamblesInMemory) {} - - std::shared_ptr getOrCreateFile(PathRef File) { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) { - It = OpenedFiles - .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory, - PCHs, ASTCallback)) - .first; - } - return It->second; - } - - std::shared_ptr getFile(PathRef File) const { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - return It->second; - } - - /// Removes a CppFile, stored for \p File, if it's inside collection and - /// returns it. - std::shared_ptr removeIfPresent(PathRef File); - - /// Gets used memory for each of the stored files. - std::vector> getUsedBytesPerFile() const; - -private: - mutable std::mutex Mutex; - llvm::StringMap> OpenedFiles; - ASTParsedCallback ASTCallback; - std::shared_ptr PCHs; - bool StorePreamblesInMemory; -}; -} // namespace clangd -} // namespace clang - -#endif Index: clangd/ClangdUnitStore.cpp =================================================================== --- clangd/ClangdUnitStore.cpp +++ /dev/null @@ -1,37 +0,0 @@ -//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// - -#include "ClangdUnitStore.h" -#include "llvm/Support/Path.h" -#include - -using namespace clang::clangd; -using namespace clang; - -std::shared_ptr CppFileCollection::removeIfPresent(PathRef File) { - std::lock_guard Lock(Mutex); - - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - - std::shared_ptr Result = It->second; - OpenedFiles.erase(It); - return Result; -} -std::vector> -CppFileCollection::getUsedBytesPerFile() const { - std::lock_guard Lock(Mutex); - std::vector> Result; - Result.reserve(OpenedFiles.size()); - for (auto &&PathAndFile : OpenedFiles) - Result.push_back( - {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()}); - return Result; -} Index: clangd/TUScheduler.h =================================================================== --- clangd/TUScheduler.h +++ clangd/TUScheduler.h @@ -10,10 +10,10 @@ #ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H -#include "ClangdUnit.h" -#include "ClangdUnitStore.h" +#include "ASTWorker.h" #include "Function.h" #include "Threading.h" +#include "llvm/ADT/StringMap.h" namespace clang { namespace clangd { @@ -22,16 +22,6 @@ /// synchronously). unsigned getDefaultAsyncThreadsCount(); -struct InputsAndAST { - const ParseInputs &Inputs; - ParsedAST &AST; -}; - -struct InputsAndPreamble { - const ParseInputs &Inputs; - const PreambleData *Preamble; -}; - /// Handles running tasks for ClangdServer and managing the resources (e.g., /// preambles and ASTs) for opened files. /// TUScheduler is not thread-safe, only one thread should be providing updates @@ -42,6 +32,7 @@ public: TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback); + ~TUScheduler(); /// Returns estimated memory usage for each of the currently open files. /// The order of results is unspecified. @@ -81,11 +72,25 @@ UniqueFunction)> Action); private: - const ParseInputs &getInputs(PathRef File); + /// This class stores per-file data in the Files map. + struct FileData { + FileData(ParseInputs Inputs, ASTWorkerHandle Worker); + + ParseInputs Inputs; + ASTWorkerHandle Worker; + }; + + struct AuxData { + bool StorePreamblesInMemory; + std::shared_ptr PCHs; + ASTParsedCallback ASTCallback; + }; - llvm::StringMap CachedInputs; - CppFileCollection Files; - ThreadPool Threads; + const bool RunSync; + const AuxData Data; + Semaphore Barrier; + llvm::StringMap> Files; + AsyncTaskRunner Tasks; }; } // namespace clangd } // namespace clang Index: clangd/TUScheduler.cpp =================================================================== --- clangd/TUScheduler.cpp +++ clangd/TUScheduler.cpp @@ -1,3 +1,11 @@ +//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// #include "TUScheduler.h" #include "clang/Frontend/PCHContainerOperations.h" #include "llvm/Support/Errc.h" @@ -17,107 +25,112 @@ TUScheduler::TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback) + : RunSync(AsyncThreadsCount == 0), + Data{StorePreamblesInMemory, std::make_shared(), + std::move(ASTCallback)}, + Barrier(AsyncThreadsCount) {} - : Files(StorePreamblesInMemory, std::make_shared(), - std::move(ASTCallback)), - Threads(AsyncThreadsCount) {} +TUScheduler::~TUScheduler() { + // Clear all FileData objects to notify all workers that they need to stop. + Files.clear(); + + // Wait for all in-flight tasks to finish. + Tasks.waitForAll(); +} void TUScheduler::update( PathRef File, ParseInputs Inputs, UniqueFunction>)> OnUpdated) { - CachedInputs[File] = Inputs; - - auto Resources = Files.getOrCreateFile(File); - auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs)); - - Threads.addToFront( - [](decltype(OnUpdated) OnUpdated, - decltype(DeferredRebuild) DeferredRebuild) { - auto Diags = DeferredRebuild(); - OnUpdated(Diags); - }, - std::move(OnUpdated), std::move(DeferredRebuild)); + auto It = Files.find(File); + if (It == Files.end()) { + // Create a new worker to process the AST-related tasks. + ASTWorkerHandle Worker = + ASTWorker::Create(Tasks, Barrier, + CppFile::Create(File, Data.StorePreamblesInMemory, + Data.PCHs, Data.ASTCallback), + RunSync); + It = Files + .try_emplace( + File, llvm::make_unique(Inputs, std::move(Worker))) + .first; + } else { + It->second->Inputs = Inputs; + } + + It->second->Worker->update(Inputs, std::move(OnUpdated)); } void TUScheduler::remove(PathRef File, UniqueFunction Action) { - CachedInputs.erase(File); - - auto Resources = Files.removeIfPresent(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to remove non-added document", llvm::errc::invalid_argument)); return; } - - auto DeferredCancel = Resources->deferCancelRebuild(); - Threads.addToFront( - [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) { - DeferredCancel(); - Action(llvm::Error::success()); - }, - std::move(Action), std::move(DeferredCancel)); + Files.erase(It); } void TUScheduler::runWithAST( PathRef File, UniqueFunction)> Action) { - auto Resources = Files.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get AST for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); - // We currently block the calling thread until AST is available and run the - // action on the calling thread to avoid inconsistent states coming from - // subsequent updates. - // FIXME(ibiryukov): this should be moved to the worker threads. - Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) { - if (AST) - Action(InputsAndAST{Inputs, *AST}); - else - Action(llvm::make_error( - "Could not build AST for the latest file update", - llvm::errc::invalid_argument)); - }); + It->second->Worker->runWithAST(std::move(Action)); } void TUScheduler::runWithPreamble( PathRef File, UniqueFunction)> Action) { - std::shared_ptr Resources = Files.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get preamble for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); + ParseInputs InputsCopy = It->second->Inputs; std::shared_ptr Preamble = - Resources->getPossiblyStalePreamble(); - Threads.addToFront( - [Resources, Preamble, Inputs](decltype(Action) Action) mutable { - if (!Preamble) - Preamble = Resources->getPossiblyStalePreamble(); - - Action(InputsAndPreamble{Inputs, Preamble.get()}); - }, - std::move(Action)); -} + It->second->Worker->getPossiblyStalePreamble(); + + if (RunSync) { + Action(InputsAndPreamble{InputsCopy, Preamble.get()}); + return; + } -const ParseInputs &TUScheduler::getInputs(PathRef File) { - auto It = CachedInputs.find(File); - assert(It != CachedInputs.end()); - return It->second; + std::shared_ptr Worker = It->second->Worker.lock(); + auto Task = [InputsCopy, Preamble, Worker, + this](Context Ctx, decltype(Action) Action) mutable { + std::lock_guard BarrierLock(Barrier); + WithContext Guard(std::move(Ctx)); + if (!Preamble) + Preamble = Worker->getPossiblyStalePreamble(); + Action(InputsAndPreamble{InputsCopy, Preamble.get()}); + }; + + Tasks.runAsync( + BindWithForward(Task, Context::current().clone(), std::move(Action))); } std::vector> TUScheduler::getUsedBytesPerFile() const { - return Files.getUsedBytesPerFile(); + std::vector> Result; + Result.reserve(Files.size()); + for (auto &&PathAndFile : Files) + Result.push_back( + {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()}); + return Result; } + +TUScheduler::FileData::FileData(ParseInputs Inputs, ASTWorkerHandle Worker) + : Inputs(std::move(Inputs)), Worker(std::move(Worker)) {} + } // namespace clangd } // namespace clang Index: clangd/Threading.h =================================================================== --- clangd/Threading.h +++ clangd/Threading.h @@ -12,74 +12,65 @@ #include "Context.h" #include "Function.h" +#include +#include #include -#include +#include #include -#include #include namespace clang { namespace clangd { -/// A simple fixed-size thread pool implementation. -class ThreadPool { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +/// FIXME: We should split this class it into consumers and producers of the +/// cancellation flags. +class CancellationFlag { public: - /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd - /// will be processed synchronously on the calling thread. - // Otherwise, \p AsyncThreadsCount threads will be created to schedule the - // requests. - ThreadPool(unsigned AsyncThreadsCount); - /// Destructor blocks until all requests are processed and worker threads are - /// terminated. - ~ThreadPool(); + CancellationFlag(); - /// Add a new request to run function \p F with args \p As to the start of the - /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... As) { - if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; - } + void cancel() { + assert(WasCancelled && "the object was moved"); + WasCancelled->store(true); + } - { - std::lock_guard Lock(Mutex); - RequestQueue.emplace_front( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); - } - RequestCV.notify_one(); + bool isCancelled() const { + assert(WasCancelled && "the object was moved"); + return WasCancelled->load(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { - if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; - } +private: + std::shared_ptr> WasCancelled; +}; - { - std::lock_guard Lock(Mutex); - RequestQueue.emplace_back( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); - } - RequestCV.notify_one(); - } +/// Limits the number of threads that can acquire the lock at the same time. +class Semaphore { +public: + Semaphore(std::size_t MaxLocks); + + void lock(); + void unlock(); + +private: + std::mutex Mutex; + std::condition_variable SlotsChanged; + std::size_t FreeSlots; +}; + +/// Allows to run tasks on separate (detached) threads and wait for all tasks to +/// finish. +class AsyncTaskRunner { +public: + ~AsyncTaskRunner(); + + void waitForAll(); + void runAsync(UniqueFunction Action); private: - bool RunSynchronously; - mutable std::mutex Mutex; - /// We run some tasks on separate threads(parsing, CppFile cleanup). - /// These threads looks into RequestQueue to find requests to handle and - /// terminate when Done is set to true. - std::vector Workers; - /// Setting Done to true will make the worker threads terminate. - bool Done = false; - /// A queue of requests. - std::deque, Context>> RequestQueue; - /// Condition variable to wake up worker threads. - std::condition_variable RequestCV; + std::mutex Mutex; + std::condition_variable TasksReachedZero; + std::size_t InFlightTasks = 0; }; } // namespace clangd } // namespace clang Index: clangd/Threading.cpp =================================================================== --- clangd/Threading.cpp +++ clangd/Threading.cpp @@ -1,63 +1,62 @@ #include "Threading.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" +#include namespace clang { namespace clangd { -ThreadPool::ThreadPool(unsigned AsyncThreadsCount) - : RunSynchronously(AsyncThreadsCount == 0) { - if (RunSynchronously) { - // Don't start the worker thread if we're running synchronously - return; - } - Workers.reserve(AsyncThreadsCount); - for (unsigned I = 0; I < AsyncThreadsCount; ++I) { - Workers.push_back(std::thread([this, I]() { - llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); - while (true) { - UniqueFunction Request; - Context Ctx; - - // Pick request from the queue - { - std::unique_lock Lock(Mutex); - // Wait for more requests. - RequestCV.wait(Lock, - [this] { return !RequestQueue.empty() || Done; }); - if (RequestQueue.empty()) { - assert(Done); - return; - } - - // We process requests starting from the front of the queue. Users of - // ThreadPool have a way to prioritise their requests by putting - // them to the either side of the queue (using either addToEnd or - // addToFront). - std::tie(Request, Ctx) = std::move(RequestQueue.front()); - RequestQueue.pop_front(); - } // unlock Mutex - - WithContext WithCtx(std::move(Ctx)); - Request(); - } - })); - } +CancellationFlag::CancellationFlag() + : WasCancelled(std::make_shared>(false)) {} + +Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {} + +void Semaphore::lock() { + std::unique_lock Lock(Mutex); + SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; }); + --FreeSlots; } -ThreadPool::~ThreadPool() { - if (RunSynchronously) - return; // no worker thread is running in that case +void Semaphore::unlock() { + std::unique_lock Lock(Mutex); + ++FreeSlots; + Lock.unlock(); + + SlotsChanged.notify_one(); +} +AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } + +void AsyncTaskRunner::waitForAll() { + std::unique_lock Lock(Mutex); + TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); +} + +void AsyncTaskRunner::runAsync(UniqueFunction Action) { { - std::lock_guard Lock(Mutex); - // Wake up the worker thread - Done = true; - } // unlock Mutex - RequestCV.notify_all(); - - for (auto &Worker : Workers) - Worker.join(); + std::unique_lock Lock(Mutex); + ++InFlightTasks; + } + + auto CleanupTask = llvm::make_scope_exit([this]() { + std::unique_lock Lock(Mutex); + int NewTasksCnt = --InFlightTasks; + Lock.unlock(); + + if (NewTasksCnt == 0) + TasksReachedZero.notify_one(); + }); + + std::thread( + [](decltype(Action) Action, decltype(CleanupTask)) { + Action(); + // Make sure function stored by Action is destroyed before CleanupTask + // is run. + Action = nullptr; + }, + std::move(Action), std::move(CleanupTask)) + .detach(); } } // namespace clangd } // namespace clang