Index: clangd/CMakeLists.txt =================================================================== --- clangd/CMakeLists.txt +++ clangd/CMakeLists.txt @@ -6,7 +6,6 @@ ClangdLSPServer.cpp ClangdServer.cpp ClangdUnit.cpp - ClangdUnitStore.cpp CodeComplete.cpp CodeCompletionStrings.cpp CompileArgsCache.cpp @@ -30,6 +29,8 @@ index/Merge.cpp index/SymbolCollector.cpp index/SymbolYAML.cpp + threading/Cancellation.cpp + threading/RequestQueue.cpp LINK_LIBS clangAST Index: clangd/ClangdServer.h =================================================================== --- clangd/ClangdServer.h +++ clangd/ClangdServer.h @@ -11,7 +11,6 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "CodeComplete.h" #include "CompileArgsCache.h" #include "DraftStore.h" @@ -19,6 +18,8 @@ #include "GlobalCompilationDatabase.h" #include "Protocol.h" #include "index/FileIndex.h" +#include "threading/Cancellation.h" +#include "threading/RequestQueue.h" #include "clang/Tooling/CompilationDatabase.h" #include "clang/Tooling/Core/Replacement.h" #include "llvm/ADT/IntrusiveRefCntPtr.h" @@ -97,14 +98,48 @@ getTaggedFileSystem(PathRef File) override; }; -class ClangdServer; - /// Returns a number of a default async threads to use for Scheduler. /// Returned value is always >= 1 (i.e. will not cause requests to be processed /// synchronously). unsigned getDefaultAsyncThreadsCount(); -/// A simple fixed-size thread pool implementation. +/// A handle to a queue managed by ThreadPool. Can be used to schedule +/// computation on a queue by calling ThreadPool::scheduleOnQueue or signal that +/// the queue is not used anymore by calling ThreadPool::removeQueue. 
+class QueueHandle { + friend class ThreadPool; + +public: + QueueHandle() = default; + + QueueHandle(const QueueHandle &) = delete; + QueueHandle &operator=(const QueueHandle &) = delete; + + QueueHandle(QueueHandle &&Other) { *this = std::move(Other); } + + QueueHandle &operator=(QueueHandle &&Other) { + assert(Other.isValid()); + Queue = Other.Queue; + Other.Queue = nullptr; + return *this; + } + + bool isValid() const { return Queue != nullptr; } + +private: + explicit QueueHandle(RequestQueue &Queue) : Queue(&Queue) {} + +private: + RequestQueue *Queue = nullptr; +}; + +/// Asynchronous tasks executor backed by a fixed size thread pool. +/// There are two ways to schedule computations in ThreadPool: +/// - schedule will schedule computation to be executed on one of +/// the working threads as soon as an idle thread is available. +/// - scheduleOnQueue will schedule to a specific queue. Requests from the +/// same queue are not processed concurrently. Requests in each queue are +/// executed in the FIFO order. class ThreadPool { public: /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd @@ -116,8 +151,7 @@ /// Add a new request to run function \p F with args \p As to the start of the /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... As) { + template void schedule(Func &&F, Args &&... As) { if (RunSynchronously) { std::forward(F)(std::forward(As)...); return; @@ -125,28 +159,35 @@ { std::lock_guard Lock(Mutex); - RequestQueue.push_front( + Requests.addToFront( BindWithForward(std::forward(F), std::forward(As)...)); } RequestCV.notify_one(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { + /// Add a new request to the end of the queue specified by \p Q. Requests on + /// the same queue will not be processed concurrently. 
+ template + void scheduleOnQueue(const QueueHandle &Q, Func &&F, Args &&... As) { + assert(Q.isValid()); + if (RunSynchronously) { std::forward(F)(std::forward(As)...); return; } - { - std::lock_guard Lock(Mutex); - RequestQueue.push_back( - BindWithForward(std::forward(F), std::forward(As)...)); - } - RequestCV.notify_one(); + Q.Queue->addToBack( + BindWithForward(std::forward(F), std::forward(As)...)); } + /// Create a new queue and return a handle to it. You can schedule new + /// requests on the queue by passing the handle to scheduleOnQueue. + QueueHandle createQueue(); + /// Remove a queue, corresponding to the passed handle \p Q. No new requests + /// can be added to the queue after calling this function, but the pending + /// requests will be executed. + void removeQueue(QueueHandle &&Q); + private: bool RunSynchronously; mutable std::mutex Mutex; @@ -156,13 +197,14 @@ std::vector Workers; /// Setting Done to true will make the worker threads terminate. bool Done = false; - /// A queue of requests. - std::deque> RequestQueue; + /// Separate queues, created for the process. + std::map> ExclusiveQueues; /// Condition variable to wake up worker threads. std::condition_variable RequestCV; + /// A queue of requests. + RequestQueue Requests; }; - struct InputsAndAST { const ParseInputs &Inputs; ParsedAST * @@ -170,7 +212,7 @@ struct InputsAndPreamble { const ParseInputs &Inputs; - const PreambleData* Preamble; + const PreambleData *Preamble; }; /// Handles running tasks for ClangdServer and managing the resources (e.g., @@ -180,6 +222,10 @@ Scheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback); + /// Returns estimated memory usage for each of the currently open files. + /// The order of results is unspecified. + std::vector> getUsedBytesPerFile() const; + /// Schedule an update for \p File. Adds \p File to a list of tracked files if /// \p File was not part of it before.
void scheduleUpdate( @@ -212,11 +258,26 @@ UniqueFunction)> Action); private: - const ParseInputs& getInputs(PathRef File); - -private: - llvm::StringMap CachedInputs; - CppFileCollection Units; + struct FileData { + FileData() = default; + FileData(ParseInputs Inputs, std::shared_ptr Resources, + QueueHandle Queue); + + ParseInputs Inputs; + std::shared_ptr Resources; + QueueHandle Queue; + bool LastRequestIsUpdate; + CancellationFlag LastUpdateCF; + }; + + struct AuxData { + bool StorePreamblesInMemory; + std::shared_ptr PCHs; + ASTParsedCallback ASTCallback; + }; + + llvm::StringMap> Files; + AuxData Data; ThreadPool Executor; }; @@ -398,7 +459,6 @@ scheduleReparseAndDiags(Context Ctx, PathRef File, VersionedDraft Contents, Tagged> TaggedFS); - CompileArgsCache CompileArgs; DiagnosticsConsumer &DiagConsumer; FileSystemProvider &FSProvider; Index: clangd/ClangdServer.cpp =================================================================== --- clangd/ClangdServer.cpp +++ clangd/ClangdServer.cpp @@ -97,7 +97,7 @@ } ThreadPool::ThreadPool(unsigned AsyncThreadsCount) - : RunSynchronously(AsyncThreadsCount == 0) { + : RunSynchronously(AsyncThreadsCount == 0), Requests(RequestCV) { if (RunSynchronously) { // Don't start the worker thread if we're running synchronously return; @@ -109,27 +109,52 @@ llvm::set_thread_name(llvm::formatv("scheduler/{0}", I)); while (true) { UniqueFunction Request; + RequestQueue *ExclQueue = nullptr; + std::unique_ptr QueueToRemove; - // Pick request from the queue + // Pick request from the queue. { std::unique_lock Lock(Mutex); // Wait for more requests. 
- RequestCV.wait(Lock, - [this] { return !RequestQueue.empty() || Done; }); + RequestCV.wait(Lock, [&] { + if (Done) + return true; + if (auto PriorityReq = Requests.pop()) { + Request = std::move(*PriorityReq); + return true; + } + for (auto It = ExclusiveQueues.begin(), End = ExclusiveQueues.end(); + It != End; ++It) { + if (!It->first->needsProcessing()) + continue; + + ExclQueue = It->first; + ExclQueue->startProcessing(); + if (ExclQueue->isScheduledForRemoval()) { + QueueToRemove = std::move(It->second); + ExclusiveQueues.erase(It); + } + return true; + } + return false; + }); + if (Done) return; - - assert(!RequestQueue.empty() && "RequestQueue was empty"); - - // We process requests starting from the front of the queue. Users of - // ThreadPool have a way to prioritise their requests by putting - // them to the either side of the queue (using either addToEnd or - // addToFront). - Request = std::move(RequestQueue.front()); - RequestQueue.pop_front(); + assert(Request || ExclQueue); } // unlock Mutex - Request(); + // We're processing a foreground request. + if (Request) { + Request(); + continue; + } + + // We're processing a non-empty request queue. 
+ assert(ExclQueue); + while (auto ExclRequest = ExclQueue->pop()) + (*ExclRequest)(); + ExclQueue->stopProcessing(); } })); } @@ -150,10 +175,24 @@ Worker.join(); } +QueueHandle ThreadPool::createQueue() { + auto NewQueueOwner = llvm::make_unique(RequestCV); + auto NewQueue = NewQueueOwner.get(); + { + std::lock_guard Lock(Mutex); + ExclusiveQueues.emplace(NewQueue, std::move(NewQueueOwner)); + } + return QueueHandle(*NewQueue); +} + +void ThreadPool::removeQueue(QueueHandle &&Q) { + Q.Queue->setScheduledForRemoval(); +} + Scheduler::Scheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory, ASTParsedCallback ASTCallback) - : Units(StorePreamblesInMemory, std::make_shared(), - std::move(ASTCallback)), + : Data{StorePreamblesInMemory, std::make_shared(), + std::move(ASTCallback)}, Executor(AsyncThreadsCount) {} void Scheduler::scheduleUpdate( @@ -161,95 +200,136 @@ UniqueFunction>)> OnUpdated) { - CachedInputs[File] = Inputs; - - auto Resources = Units.getOrCreateFile(File); - auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs)); + auto It = Files.find(File); + if (It == Files.end()) { + QueueHandle Queue = Executor.createQueue(); + It = Files + .insert( + {File, std::unique_ptr(new FileData{ + Inputs, + CppFile::Create(File, Data.StorePreamblesInMemory, + Data.PCHs, Data.ASTCallback), + std::move(Queue)})}) + .first; + } else { + It->second->Inputs = Inputs; + } - Executor.addToFront( - [](Context Ctx, decltype(OnUpdated) OnUpdated, - decltype(DeferredRebuild) DeferredRebuild) { - auto Diags = DeferredRebuild(Ctx); - OnUpdated(std::move(Ctx), Diags); - }, - std::move(Ctx), std::move(OnUpdated), std::move(DeferredRebuild)); -} + CancellationFlag CF; + CppFile *Resources = It->second->Resources.get(); + auto Task = [Inputs, Resources, CF](Context Ctx, + decltype(OnUpdated) OnUpdated) mutable { + if (CF.isCancelled()) { + // Our request got cancelled, report it to the caller and don't do the + // rebuild. 
+ OnUpdated(std::move(Ctx), llvm::None); + return; + } + auto Diags = Resources->rebuild(Ctx, std::move(Inputs)); + OnUpdated(std::move(Ctx), Diags); + }; -void Scheduler::scheduleRemove(PathRef File, - UniqueFunction Action) { - CachedInputs.erase(File); + FileData *FD = It->second.get(); + Executor.scheduleOnQueue(FD->Queue, Task, std::move(Ctx), + std::move(OnUpdated)); - auto Resources = Units.removeIfPresent(File); - if (!Resources) { - Action(llvm::make_error( - "trying to remove non-added document", llvm::errc::invalid_argument)); - return; - } - - auto DeferredCancel = Resources->deferCancelRebuild(); - Executor.addToFront( - [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) { - DeferredCancel(); - Action(llvm::Error::success()); - }, - std::move(Action), std::move(DeferredCancel)); + if (FD->LastRequestIsUpdate) + FD->LastUpdateCF.setCancelled(); + FD->LastRequestIsUpdate = true; + FD->LastUpdateCF = std::move(CF); } void Scheduler::scheduleASTRead( PathRef File, UniqueFunction)> Action) { - auto Resources = Units.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get AST for non-added document", llvm::errc::invalid_argument)); return; } - const ParseInputs &Inputs = getInputs(File); - // We currently block the calling thread until AST is available and run the - // action on the calling thread to avoid inconsistent states coming from - // subsequent updates. - // FIXME(ibiryukov): this should be moved to the worker threads. 
- Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) { - if (AST) + ParseInputs Inputs = It->second->Inputs; + CppFile *Resources = It->second->Resources.get(); + auto Task = [Inputs, Resources](decltype(Action) Action) { + Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) { + // Stop after reporting the error: *AST below must not see a null AST. + if (!AST) + return Action(llvm::make_error( + "invalid AST", llvm::errc::invalid_argument)); Action(InputsAndAST{Inputs, *AST}); - else - Action(llvm::make_error( - "Could not build AST for the latest file update", - llvm::errc::invalid_argument)); - }); + }); + }; + + Executor.scheduleOnQueue(It->second->Queue, std::move(Task), + std::move(Action)); + It->second->LastRequestIsUpdate = false; } void Scheduler::schedulePreambleRead( PathRef File, UniqueFunction)> Action) { - std::shared_ptr Resources = Units.getFile(File); - if (!Resources) { + auto It = Files.find(File); + if (It == Files.end()) { Action(llvm::make_error( "trying to get preamble for non-added document", llvm::errc::invalid_argument)); return; } - ParseInputs Inputs = getInputs(File); + ParseInputs InputsCopy = It->second->Inputs; + std::shared_ptr Resources = It->second->Resources; std::shared_ptr Preamble = Resources->getPossiblyStalePreamble(); - Executor.addToFront( - [Resources, Preamble, Inputs](decltype(Action) Action) mutable { - if (!Preamble) - Preamble = Resources->getPossiblyStalePreamble(); + auto Task = [InputsCopy, Resources, + Preamble](decltype(Action) Action) mutable { + if (!Preamble) + Preamble = Resources->getPossiblyStalePreamble(); + Action(InputsAndPreamble{InputsCopy, Preamble.get()}); + }; - Action(InputsAndPreamble{Inputs, Preamble.get()}); - }, - std::move(Action)); + Executor.schedule(std::move(Task), std::move(Action)); +} + +void Scheduler::scheduleRemove(PathRef File, + UniqueFunction Action) { + auto It = Files.find(File); + if (It == Files.end()) { + Action(llvm::make_error( + "trying to remove non-added document", llvm::errc::invalid_argument)); + return; + } + +
std::unique_ptr Data = std::move(It->second); + Files.erase(It); + + if (Data->LastRequestIsUpdate) + Data->LastUpdateCF.setCancelled(); + + auto Cleanup = [](decltype(Data) /*Data*/, decltype(Action) Done) { + Done(llvm::Error::success()); // The file's data is destroyed on return. + }; + QueueHandle Queue = std::move(Data->Queue); + Executor.scheduleOnQueue(Queue, Cleanup, std::move(Data), std::move(Action)); + Executor.removeQueue(std::move(Queue)); } -const ParseInputs &Scheduler::getInputs(PathRef File) { - auto It = CachedInputs.find(File); - assert(It != CachedInputs.end()); - return It->second; +std::vector> +Scheduler::getUsedBytesPerFile() const { + std::vector> Result; + Result.reserve(Files.size()); + for (auto &&PathAndFile : Files) + Result.push_back( + {PathAndFile.first(), PathAndFile.second->Resources->getUsedBytes()}); + return Result; } +Scheduler::FileData::FileData(ParseInputs Inputs, + std::shared_ptr Resources, + QueueHandle Queue) + : Inputs(std::move(Inputs)), Resources(std::move(Resources)), + Queue(std::move(Queue)), LastRequestIsUpdate(false) {} + ClangdServer::ClangdServer(GlobalCompilationDatabase &CDB, DiagnosticsConsumer &DiagConsumer, FileSystemProvider &FSProvider, @@ -706,5 +786,5 @@ std::vector> ClangdServer::getUsedBytesPerFile() const { - return Units.getUsedBytesPerFile(); + return WorkScheduler.getUsedBytesPerFile(); } Index: clangd/ClangdUnitStore.h =================================================================== --- clangd/ClangdUnitStore.h +++ /dev/null @@ -1,73 +0,0 @@ -//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details.
-// -//===---------------------------------------------------------------------===// - -#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H -#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H - -#include "ClangdUnit.h" -#include "GlobalCompilationDatabase.h" -#include "Logger.h" -#include "Path.h" -#include "clang/Tooling/CompilationDatabase.h" -#include - -namespace clang { -namespace clangd { - -class Logger; - -/// Thread-safe mapping from FileNames to CppFile. -class CppFileCollection { -public: - /// \p ASTCallback is called when a file is parsed synchronously. This should - /// not be expensive since it blocks diagnostics. - explicit CppFileCollection(bool StorePreamblesInMemory, - std::shared_ptr PCHs, - ASTParsedCallback ASTCallback) - : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)), - StorePreamblesInMemory(StorePreamblesInMemory) {} - - std::shared_ptr getOrCreateFile(PathRef File) { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) { - It = OpenedFiles - .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory, - PCHs, ASTCallback)) - .first; - } - return It->second; - } - - std::shared_ptr getFile(PathRef File) const { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - return It->second; - } - - /// Removes a CppFile, stored for \p File, if it's inside collection and - /// returns it. - std::shared_ptr removeIfPresent(PathRef File); - - /// Gets used memory for each of the stored files. 
- std::vector> getUsedBytesPerFile() const; - -private: - mutable std::mutex Mutex; - llvm::StringMap> OpenedFiles; - ASTParsedCallback ASTCallback; - std::shared_ptr PCHs; - bool StorePreamblesInMemory; -}; -} // namespace clangd -} // namespace clang - -#endif Index: clangd/ClangdUnitStore.cpp =================================================================== --- clangd/ClangdUnitStore.cpp +++ /dev/null @@ -1,37 +0,0 @@ -//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// - -#include "ClangdUnitStore.h" -#include "llvm/Support/Path.h" -#include - -using namespace clang::clangd; -using namespace clang; - -std::shared_ptr CppFileCollection::removeIfPresent(PathRef File) { - std::lock_guard Lock(Mutex); - - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - - std::shared_ptr Result = It->second; - OpenedFiles.erase(It); - return Result; -} -std::vector> -CppFileCollection::getUsedBytesPerFile() const { - std::lock_guard Lock(Mutex); - std::vector> Result; - Result.reserve(OpenedFiles.size()); - for (auto &&PathAndFile : OpenedFiles) - Result.push_back( - {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()}); - return Result; -} Index: clangd/threading/Cancellation.h =================================================================== --- /dev/null +++ clangd/threading/Cancellation.h @@ -0,0 +1,44 @@ +//===--- Cancellation.h -----------------------------------------*- C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. 
+// +//===---------------------------------------------------------------------===// + +#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H +#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H + +#include +#include +#include + +namespace clang { +namespace clangd { + +/// A shared boolean flag indicating if the computation was cancelled. +/// Once cancelled, cannot be returned to the previous state. +/// FIXME: We should split this class into consumers and producers of the +/// cancellation flags. +class CancellationFlag { +public: + CancellationFlag(); + + void setCancelled() { + assert(WasCancelled && "the object was moved"); + WasCancelled->store(true); + } + + bool isCancelled() const { + assert(WasCancelled && "the object was moved"); + return WasCancelled->load(); + } + +private: + std::shared_ptr> WasCancelled; +}; +} // namespace clangd +} // namespace clang + +#endif Index: clangd/threading/Cancellation.cpp =================================================================== --- /dev/null +++ clangd/threading/Cancellation.cpp @@ -0,0 +1,18 @@ +//===--- Cancellation.cpp ---------------------------------------*- C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details.
+// +//===---------------------------------------------------------------------===// + +#include "Cancellation.h" +namespace clang { +namespace clangd { + +CancellationFlag::CancellationFlag() + : WasCancelled(std::make_shared>(false)) {} + +} // namespace clangd +} // namespace clang Index: clangd/threading/RequestQueue.h =================================================================== --- /dev/null +++ clangd/threading/RequestQueue.h @@ -0,0 +1,50 @@ +//===--- RequestQueue.h -----------------------------------------*- C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===---------------------------------------------------------------------===// + +#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H +#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H + +#include "Function.h" +#include +#include +#include + +namespace clang { +namespace clangd { + +/// A thread-safe request queue managed by ThreadPool. This is an implementation +/// detail, see QueueHandle and ThreadPool for user-facing APIs. 
+class RequestQueue { +public: + RequestQueue(std::condition_variable &Condition); + + void addToFront(UniqueFunction Req); + void addToBack(UniqueFunction Req); + + llvm::Optional> pop(); + + bool needsProcessing() const; + + bool isScheduledForRemoval() const; + void setScheduledForRemoval(); + + void startProcessing(); + void stopProcessing(); + +private: + std::condition_variable &Condition; + mutable std::mutex Mutex; + bool IsProcessing; + bool IsScheduledForRemoval; + std::deque> Requests; +}; + +} // namespace clangd +} // namespace clang +#endif Index: clangd/threading/RequestQueue.cpp =================================================================== --- /dev/null +++ clangd/threading/RequestQueue.cpp @@ -0,0 +1,78 @@ +//===--- RequestQueue.cpp ---------------------------------------*- C++-*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===---------------------------------------------------------------------===// +#include "RequestQueue.h" + +namespace clang { +namespace clangd { + +RequestQueue::RequestQueue(std::condition_variable &Condition) + : Condition(Condition), IsProcessing(false), IsScheduledForRemoval(false) {} + +void RequestQueue::addToFront(UniqueFunction Req) { + std::unique_lock Lock(Mutex); + Requests.emplace_front(std::move(Req)); + + Lock.unlock(); + Condition.notify_one(); +} + +void RequestQueue::addToBack(UniqueFunction Req) { + std::unique_lock Lock(Mutex); + Requests.emplace_back(std::move(Req)); + + Lock.unlock(); + Condition.notify_one(); +} + +llvm::Optional> RequestQueue::pop() { + std::unique_lock Lock(Mutex); + if (Requests.empty()) + return llvm::None; + + UniqueFunction Result = std::move(Requests.front()); + Requests.pop_front(); + return Result; +} + +bool RequestQueue::needsProcessing() const { + std::unique_lock Lock(Mutex); + if (IsProcessing) + return false; + return !Requests.empty() || 
IsScheduledForRemoval; +} + +bool RequestQueue::isScheduledForRemoval() const { + std::unique_lock Lock(Mutex); + return IsScheduledForRemoval; +} + +void RequestQueue::setScheduledForRemoval() { + std::unique_lock Lock(Mutex); + assert(!IsScheduledForRemoval); + IsScheduledForRemoval = true; +} + +void RequestQueue::startProcessing() { + std::unique_lock Lock(Mutex); + assert(!IsProcessing); + assert(!Requests.empty() || IsScheduledForRemoval); + IsProcessing = true; +} + +void RequestQueue::stopProcessing() { + std::unique_lock Lock(Mutex); + assert(IsProcessing); + IsProcessing = false; + + Lock.unlock(); + Condition.notify_one(); +} + +} // namespace clangd +} // namespace clang