Index: clangd/CMakeLists.txt =================================================================== --- clangd/CMakeLists.txt +++ clangd/CMakeLists.txt @@ -6,7 +6,6 @@ ClangdLSPServer.cpp ClangdServer.cpp ClangdUnit.cpp - ClangdUnitStore.cpp CodeComplete.cpp CodeCompletionStrings.cpp CompileArgsCache.cpp Index: clangd/ClangdServer.h =================================================================== --- clangd/ClangdServer.h +++ clangd/ClangdServer.h @@ -11,7 +11,6 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "CodeComplete.h" #include "CompileArgsCache.h" #include "DraftStore.h" Index: clangd/ClangdUnitStore.h =================================================================== --- clangd/ClangdUnitStore.h +++ /dev/null @@ -1,73 +0,0 @@ -//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===---------------------------------------------------------------------===// - -#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H -#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H - -#include "ClangdUnit.h" -#include "GlobalCompilationDatabase.h" -#include "Logger.h" -#include "Path.h" -#include "clang/Tooling/CompilationDatabase.h" -#include - -namespace clang { -namespace clangd { - -class Logger; - -/// Thread-safe mapping from FileNames to CppFile. -class CppFileCollection { -public: - /// \p ASTCallback is called when a file is parsed synchronously. This should - /// not be expensive since it blocks diagnostics. 
- explicit CppFileCollection(bool StorePreamblesInMemory, - std::shared_ptr PCHs, - ASTParsedCallback ASTCallback) - : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)), - StorePreamblesInMemory(StorePreamblesInMemory) {} - - std::shared_ptr getOrCreateFile(PathRef File) { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) { - It = OpenedFiles - .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory, - PCHs, ASTCallback)) - .first; - } - return It->second; - } - - std::shared_ptr getFile(PathRef File) const { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - return It->second; - } - - /// Removes a CppFile, stored for \p File, if it's inside collection and - /// returns it. - std::shared_ptr removeIfPresent(PathRef File); - - /// Gets used memory for each of the stored files. - std::vector> getUsedBytesPerFile() const; - -private: - mutable std::mutex Mutex; - llvm::StringMap> OpenedFiles; - ASTParsedCallback ASTCallback; - std::shared_ptr PCHs; - bool StorePreamblesInMemory; -}; -} // namespace clangd -} // namespace clang - -#endif Index: clangd/ClangdUnitStore.cpp =================================================================== --- clangd/ClangdUnitStore.cpp +++ /dev/null @@ -1,37 +0,0 @@ -//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. 
-// -//===----------------------------------------------------------------------===// - -#include "ClangdUnitStore.h" -#include "llvm/Support/Path.h" -#include - -using namespace clang::clangd; -using namespace clang; - -std::shared_ptr CppFileCollection::removeIfPresent(PathRef File) { - std::lock_guard Lock(Mutex); - - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - - std::shared_ptr Result = It->second; - OpenedFiles.erase(It); - return Result; -} -std::vector> -CppFileCollection::getUsedBytesPerFile() const { - std::lock_guard Lock(Mutex); - std::vector> Result; - Result.reserve(OpenedFiles.size()); - for (auto &&PathAndFile : OpenedFiles) - Result.push_back( - {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()}); - return Result; -} Index: clangd/TUScheduler.h =================================================================== --- clangd/TUScheduler.h +++ clangd/TUScheduler.h @@ -10,10 +10,10 @@ #ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H -#include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "Function.h" #include "Threading.h" +#include "ClangdUnit.h" +#include "llvm/ADT/StringMap.h" namespace clang { namespace clangd { @@ -42,6 +42,7 @@ public: TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback); + ~TUScheduler(); /// Returns estimated memory usage for each of the currently open files. /// The order of results is unspecified. @@ -81,11 +82,16 @@ UniqueFunction)> Action); private: - const ParseInputs &getInputs(PathRef File); + /// This class stores per-file data in the Files map. 
+ struct FileData; - llvm::StringMap CachedInputs; - CppFileCollection Files; - ThreadPool Threads; + const bool RunSync; + const bool StorePreamblesInMemory; + const std::shared_ptr PCHs; + const ASTParsedCallback ASTCallback; + Semaphore Barrier; + llvm::StringMap> Files; + AsyncTaskRunner Tasks; }; } // namespace clangd } // namespace clang Index: clangd/TUScheduler.cpp =================================================================== --- clangd/TUScheduler.cpp +++ clangd/TUScheduler.cpp @@ -1,9 +1,276 @@ +//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// For each file, managed by TUScheduler, we store a single ASTWorker that +// manages an AST for that file. All operations that modify or read the AST are +// run on a separate dedicated thread asynchronously in FIFO order. The +// ASTWorker that manages the AST is owned both by the processing thread and the +// TUScheduler. Therefore the destructor of ASTWorker does not need to block, as it is +// always run after the processing loop is finished. When the TUScheduler gives +// up the ownership of the ASTWorker, it signals the ASTWorker to end the +// processing loop. +// +// We start processing each update immediately after we receive it. If two or +// more updates come subsequently without reads in-between, we attempt to drop +// an older one to not waste time building the ASTs we don't need. +// +// The processing thread of the ASTWorker is also responsible for building the +// preamble. However, unlike AST, the same preamble can be read concurrently, so +// we run each of async preamble reads on its own thread.
+// +// To limit the concurrent load that clangd produces we maintain a semaphore that +// keeps more than a fixed number of threads from running concurrently. + #include "TUScheduler.h" #include "clang/Frontend/PCHContainerOperations.h" #include "llvm/Support/Errc.h" +#include +#include namespace clang { namespace clangd { +namespace { +class ASTWorkerHandle; + +/// Owns one instance of the AST, schedules updates and reads of it. +/// Also responsible for building and providing access to the preamble. +/// Each ASTWorker processes the async requests sent to it on a separate +/// dedicated thread. +/// The clients can access this ASTWorker only via the smart-pointer-like +/// ASTWorkerHandle. +class ASTWorker { + friend class ASTWorkerHandle; + ASTWorker(Semaphore &Barrier, std::shared_ptr File, bool RunSync); + +public: + /// Create a new ASTWorker and return a handle to it. + /// The processing thread is spawned using \p Tasks. However, when \p RunSync + /// is true, all requests will be processed on the calling thread + /// synchronously instead. \p Barrier is acquired when processing each + /// request, it is used to limit the number of actively running threads. + static ASTWorkerHandle Create(AsyncTaskRunner &Tasks, Semaphore &Barrier, + std::shared_ptr File, bool RunSync); + ~ASTWorker(); + + void update(ParseInputs Inputs, + UniqueFunction>)> + OnUpdated); + void runWithAST(UniqueFunction)> Action); + + std::shared_ptr getPossiblyStalePreamble() const; + std::size_t getUsedBytes() const; + +private: + // Must be called exactly once on processing thread. Will return after + // stop() is called on a separate thread and all pending requests are + // processed. + void run(); + /// Signal that run() should finish processing pending requests and exit. + void stop(); + + using RequestWithCtx = std::pair, Context>; + + const bool RunSync; + Semaphore &Barrier; + // File and FileInputs are only accessed on the processing thread from run().
+ const std::shared_ptr File; + // Inputs, corresponding to the current state of File. + ParseInputs FileInputs; + mutable std::mutex Mutex; + // Set to true to signal run() to finish processing. + bool Done; /* GUARDED_BY(Mutex) */ + std::queue Requests; /* GUARDED_BY(Mutex) */ + // Only set when last request is an update. This allows us to cancel an update + // that was never read, if a subsequent update comes in. + llvm::Optional LastUpdateCF; /* GUARDED_BY(Mutex) */ + std::condition_variable RequestsCV; +}; + +/// A smart-pointer-like class that points to an active ASTWorker. +/// In destructor, signals to the underlying ASTWorker that no new requests will +/// be sent and the processing loop may exit (after running all pending +/// requests). +class ASTWorkerHandle { + friend class ASTWorker; + ASTWorkerHandle(std::shared_ptr Worker); + +public: + ASTWorkerHandle(const ASTWorkerHandle &) = delete; + ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete; + ASTWorkerHandle(ASTWorkerHandle &&) = default; + ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default; + + ~ASTWorkerHandle(); + + ASTWorker &operator*(); + ASTWorker *operator->(); + + /// Returns an owning reference to the underlying ASTWorker that can outlive + /// the ASTWorkerHandle. However, no new requests to an active ASTWorker can + /// be scheduled via the returned reference, i.e. only reads of the preamble + /// are possible.
+ std::shared_ptr lock(); + +private: + std::shared_ptr Worker; +}; + +ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner &Tasks, Semaphore &Barrier, + std::shared_ptr File, bool RunSync) { + std::shared_ptr Worker( + new ASTWorker(Barrier, std::move(File), RunSync)); + if (!RunSync) + Tasks.runAsync([Worker]() { Worker->run(); }); + + return ASTWorkerHandle(std::move(Worker)); +} + +ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr File, + bool RunSync) + : RunSync(RunSync), Barrier(Barrier), File(std::move(File)), Done(false) { + if (RunSync) + return; +} + +ASTWorker::~ASTWorker() { +#ifndef NDEBUG + std::lock_guard Lock(Mutex); + assert(Done && "handle was not destroyed"); + assert(Requests.empty() && "unprocessed requests when destroying ASTWorker"); +#endif +} + +void ASTWorker::update( + ParseInputs Inputs, + UniqueFunction>)> + OnUpdated) { + auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable { + if (CF.isCancelled()) { + OnUpdated(llvm::None); + return; + } + FileInputs = Inputs; + auto Diags = File->rebuild(std::move(Inputs)); + OnUpdated(std::move(Diags)); + }; + + if (RunSync) { + Task(CancellationFlag(), std::move(OnUpdated)); + return; + } + + { + std::lock_guard Lock(Mutex); + assert(!Done && "update() after stop()"); + if (!Requests.empty() && LastUpdateCF) { + // There were no reads for the last unprocessed update, let's cancel it to + // not waste time on it. 
+ LastUpdateCF->cancel(); + } + LastUpdateCF = CancellationFlag(); + Requests.emplace(BindWithForward(Task, *LastUpdateCF, std::move(OnUpdated)), + Context::current().clone()); + } + RequestsCV.notify_one(); +} + +void ASTWorker::runWithAST( + UniqueFunction)> Action) { + auto Task = [=](decltype(Action) Action) { + auto AST = File->getAST().get(); + AST->runUnderLock([&](ParsedAST *AST) { + if (!AST) { + Action(llvm::make_error( + "invalid AST", llvm::errc::invalid_argument)); + return; + } + Action(InputsAndAST{FileInputs, *AST}); + }); + }; + + if (RunSync) { + Task(std::move(Action)); + return; + } + + { + std::lock_guard Lock(Mutex); + assert(!Done && "runWithAST() after stop()"); + LastUpdateCF = llvm::None; + Requests.emplace(BindWithForward(Task, std::move(Action)), + Context::current().clone()); + } + RequestsCV.notify_one(); +} + +std::shared_ptr +ASTWorker::getPossiblyStalePreamble() const { + return File->getPossiblyStalePreamble(); +} + +std::size_t ASTWorker::getUsedBytes() const { + std::lock_guard Lock(Mutex); + return File->getUsedBytes(); +} + +void ASTWorker::stop() { + { + std::lock_guard Lock(Mutex); + assert(!Done && "stop() called twice"); + Done = true; + } + RequestsCV.notify_one(); +} + +void ASTWorker::run() { + while (true) { + RequestWithCtx Req; + { + std::unique_lock Lock(Mutex); + RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); }); + if (Requests.empty()) { + assert(Done); + return; + } + + Req = std::move(Requests.front()); + Requests.pop(); + } // unlock Mutex + + std::lock_guard BarrierLock(Barrier); + WithContext Guard(std::move(Req.second)); + Req.first(); + } +} + +ASTWorkerHandle::ASTWorkerHandle(std::shared_ptr Worker) + : Worker(std::move(Worker)) { + assert(this->Worker); +} + +ASTWorkerHandle::~ASTWorkerHandle() { + if (Worker) + Worker->stop(); +} + +ASTWorker &ASTWorkerHandle::operator*() { + assert(Worker && "Handle was moved from"); + return *Worker; +} + +ASTWorker *ASTWorkerHandle::operator->() { + 
assert(Worker && "Handle was moved from"); + return Worker.get(); +} + +std::shared_ptr ASTWorkerHandle::lock() { return Worker; } +} // namespace + unsigned getDefaultAsyncThreadsCount() { unsigned HardwareConcurrency = std::thread::hardware_concurrency(); // C++ standard says that hardware_concurrency() @@ -14,110 +281,119 @@ return HardwareConcurrency; } +struct TUScheduler::FileData { + FileData(ParseInputs Inputs, ASTWorkerHandle Worker) + : Inputs(std::move(Inputs)), Worker(std::move(Worker)) {} + + ParseInputs Inputs; + ASTWorkerHandle Worker; +}; + TUScheduler::TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback) + : RunSync(AsyncThreadsCount == 0), + StorePreamblesInMemory(StorePreamblesInMemory), + PCHs(std::make_shared()), + ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) {} - : Files(StorePreamblesInMemory, std::make_shared(), - std::move(ASTCallback)), - Threads(AsyncThreadsCount) {} +TUScheduler::~TUScheduler() { + // Clear all FileData objects to notify all workers that they need to stop. + Files.clear(); + + // Wait for all in-flight tasks to finish. + Tasks.waitForAll(); +} void TUScheduler::update( PathRef File, ParseInputs Inputs, UniqueFunction>)> OnUpdated) { - CachedInputs[File] = Inputs; - - auto Resources = Files.getOrCreateFile(File); - auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs)); + auto It = Files.find(File); + if (It == Files.end()) { + // Create a new worker to process the AST-related tasks. 
+ ASTWorkerHandle Worker = ASTWorker::Create( + Tasks, Barrier, + CppFile::Create(File, StorePreamblesInMemory, PCHs, ASTCallback), + RunSync); + It = Files + .try_emplace( + File, llvm::make_unique(Inputs, std::move(Worker))) + .first; + } else { + It->second->Inputs = Inputs; + } - Threads.addToFront( - [](decltype(OnUpdated) OnUpdated, - decltype(DeferredRebuild) DeferredRebuild) { - auto Diags = DeferredRebuild(); - OnUpdated(Diags); - }, - std::move(OnUpdated), std::move(DeferredRebuild)); + It->second->Worker->update(Inputs, std::move(OnUpdated)); } void TUScheduler::remove(PathRef File, UniqueFunction Action) { - CachedInputs.erase(File); - - auto Resources = Files.removeIfPresent(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to remove non-added document", llvm::errc::invalid_argument)); return; } - - auto DeferredCancel = Resources->deferCancelRebuild(); - Threads.addToFront( - [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) { - DeferredCancel(); - Action(llvm::Error::success()); - }, - std::move(Action), std::move(DeferredCancel)); + Files.erase(It); } void TUScheduler::runWithAST( PathRef File, UniqueFunction)> Action) { - auto Resources = Files.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get AST for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); - // We currently block the calling thread until AST is available and run the - // action on the calling thread to avoid inconsistent states coming from - // subsequent updates. - // FIXME(ibiryukov): this should be moved to the worker threads. 
- Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) { - if (AST) - Action(InputsAndAST{Inputs, *AST}); - else - Action(llvm::make_error( - "Could not build AST for the latest file update", - llvm::errc::invalid_argument)); - }); + It->second->Worker->runWithAST(std::move(Action)); } void TUScheduler::runWithPreamble( PathRef File, UniqueFunction)> Action) { - std::shared_ptr Resources = Files.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get preamble for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); + ParseInputs InputsCopy = It->second->Inputs; std::shared_ptr Preamble = - Resources->getPossiblyStalePreamble(); - Threads.addToFront( - [Resources, Preamble, Inputs](decltype(Action) Action) mutable { - if (!Preamble) - Preamble = Resources->getPossiblyStalePreamble(); + It->second->Worker->getPossiblyStalePreamble(); - Action(InputsAndPreamble{Inputs, Preamble.get()}); - }, - std::move(Action)); -} + if (RunSync) { + Action(InputsAndPreamble{InputsCopy, Preamble.get()}); + return; + } -const ParseInputs &TUScheduler::getInputs(PathRef File) { - auto It = CachedInputs.find(File); - assert(It != CachedInputs.end()); - return It->second; + std::shared_ptr Worker = It->second->Worker.lock(); + auto Task = [InputsCopy, Preamble, Worker, + this](Context Ctx, decltype(Action) Action) mutable { + std::lock_guard BarrierLock(Barrier); + WithContext Guard(std::move(Ctx)); + if (!Preamble) + Preamble = Worker->getPossiblyStalePreamble(); + Action(InputsAndPreamble{InputsCopy, Preamble.get()}); + }; + + Tasks.runAsync( + BindWithForward(Task, Context::current().clone(), std::move(Action))); } std::vector> TUScheduler::getUsedBytesPerFile() const { - return Files.getUsedBytesPerFile(); + std::vector> Result; + Result.reserve(Files.size()); + for (auto &&PathAndFile : Files) + Result.push_back( + {PathAndFile.first(), 
PathAndFile.second->Worker->getUsedBytes()}); + return Result; } + } // namespace clangd } // namespace clang Index: clangd/Threading.h =================================================================== --- clangd/Threading.h +++ clangd/Threading.h @@ -12,74 +12,67 @@ #include "Context.h" #include "Function.h" +#include +#include #include -#include +#include #include -#include #include namespace clang { namespace clangd { -/// A simple fixed-size thread pool implementation. -class ThreadPool { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +/// FIXME: We should split this class into consumers and producers of the +/// cancellation flags. +class CancellationFlag { public: - /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd - /// will be processed synchronously on the calling thread. - // Otherwise, \p AsyncThreadsCount threads will be created to schedule the - // requests. - ThreadPool(unsigned AsyncThreadsCount); - /// Destructor blocks until all requests are processed and worker threads are - /// terminated. - ~ThreadPool(); + CancellationFlag(); - /// Add a new request to run function \p F with args \p As to the start of the - /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... As) { - if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; - } + void cancel() { + assert(WasCancelled && "the object was moved"); + WasCancelled->store(true); + } - { - std::lock_guard Lock(Mutex); - RequestQueue.emplace_front( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); - } - RequestCV.notify_one(); + bool isCancelled() const { + assert(WasCancelled && "the object was moved"); + return WasCancelled->load(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread.
- template void addToEnd(Func &&F, Args &&... As) { - if (RunSynchronously) { - std::forward(F)(std::forward(As)...); - return; - } +private: + std::shared_ptr> WasCancelled; +}; - { - std::lock_guard Lock(Mutex); - RequestQueue.emplace_back( - BindWithForward(std::forward(F), std::forward(As)...), - Context::current().clone()); - } - RequestCV.notify_one(); - } +/// Limits the number of threads that can acquire the lock at the same time. +class Semaphore { +public: + Semaphore(std::size_t MaxLocks); + + void lock(); + void unlock(); + +private: + std::mutex Mutex; + std::condition_variable SlotsChanged; + std::size_t FreeSlots; +}; + +/// Runs tasks on separate (detached) threads and wait for all tasks to finish. +/// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they +/// all complete on destruction. +class AsyncTaskRunner { +public: + /// Destructor waits for all pending tasks to finish. + ~AsyncTaskRunner(); + + void waitForAll(); + void runAsync(UniqueFunction Action); private: - bool RunSynchronously; - mutable std::mutex Mutex; - /// We run some tasks on separate threads(parsing, CppFile cleanup). - /// These threads looks into RequestQueue to find requests to handle and - /// terminate when Done is set to true. - std::vector Workers; - /// Setting Done to true will make the worker threads terminate. - bool Done = false; - /// A queue of requests. - std::deque, Context>> RequestQueue; - /// Condition variable to wake up worker threads. 
- std::condition_variable RequestCV; + std::mutex Mutex; + std::condition_variable TasksReachedZero; + std::size_t InFlightTasks = 0; }; } // namespace clangd } // namespace clang Index: clangd/Threading.cpp =================================================================== --- clangd/Threading.cpp +++ clangd/Threading.cpp @@ -1,63 +1,62 @@ #include "Threading.h" +#include "llvm/ADT/ScopeExit.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" +#include namespace clang { namespace clangd { -ThreadPool::ThreadPool(unsigned AsyncThreadsCount) - : RunSynchronously(AsyncThreadsCount == 0) { - if (RunSynchronously) { - // Don't start the worker thread if we're running synchronously - return; - } - Workers.reserve(AsyncThreadsCount); - for (unsigned I = 0; I < AsyncThreadsCount; ++I) { - Workers.push_back(std::thread([this, I]() { - llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); - while (true) { - UniqueFunction Request; - Context Ctx; - - // Pick request from the queue - { - std::unique_lock Lock(Mutex); - // Wait for more requests. - RequestCV.wait(Lock, - [this] { return !RequestQueue.empty() || Done; }); - if (RequestQueue.empty()) { - assert(Done); - return; - } - - // We process requests starting from the front of the queue. Users of - // ThreadPool have a way to prioritise their requests by putting - // them to the either side of the queue (using either addToEnd or - // addToFront). 
- std::tie(Request, Ctx) = std::move(RequestQueue.front()); - RequestQueue.pop_front(); - } // unlock Mutex - - WithContext WithCtx(std::move(Ctx)); - Request(); - } - })); - } +CancellationFlag::CancellationFlag() + : WasCancelled(std::make_shared>(false)) {} + +Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {} + +void Semaphore::lock() { + std::unique_lock Lock(Mutex); + SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; }); + --FreeSlots; } -ThreadPool::~ThreadPool() { - if (RunSynchronously) - return; // no worker thread is running in that case +void Semaphore::unlock() { + std::unique_lock Lock(Mutex); + ++FreeSlots; + Lock.unlock(); + + SlotsChanged.notify_one(); +} +AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } + +void AsyncTaskRunner::waitForAll() { + std::unique_lock Lock(Mutex); + TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); +} + +void AsyncTaskRunner::runAsync(UniqueFunction Action) { { - std::lock_guard Lock(Mutex); - // Wake up the worker thread - Done = true; - } // unlock Mutex - RequestCV.notify_all(); - - for (auto &Worker : Workers) - Worker.join(); + std::unique_lock Lock(Mutex); + ++InFlightTasks; + } + + auto CleanupTask = llvm::make_scope_exit([this]() { + std::unique_lock Lock(Mutex); + int NewTasksCnt = --InFlightTasks; + Lock.unlock(); + + if (NewTasksCnt == 0) + TasksReachedZero.notify_one(); + }); + + std::thread( + [](decltype(Action) Action, decltype(CleanupTask)) { + Action(); + // Make sure function stored by Action is destroyed before CleanupTask + // is run. + Action = nullptr; + }, + std::move(Action), std::move(CleanupTask)) + .detach(); } } // namespace clangd } // namespace clang