Index: clangd/CMakeLists.txt =================================================================== --- clangd/CMakeLists.txt +++ clangd/CMakeLists.txt @@ -6,7 +6,6 @@ ClangdLSPServer.cpp ClangdServer.cpp ClangdUnit.cpp - ClangdUnitStore.cpp CodeComplete.cpp CodeCompletionStrings.cpp CompileArgsCache.cpp @@ -32,6 +31,8 @@ index/Merge.cpp index/SymbolCollector.cpp index/SymbolYAML.cpp + threading/Cancellation.cpp + threading/RequestQueue.cpp LINK_LIBS clangAST Index: clangd/ClangdServer.h =================================================================== --- clangd/ClangdServer.h +++ clangd/ClangdServer.h @@ -11,7 +11,6 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "CodeComplete.h" #include "CompileArgsCache.h" #include "DraftStore.h" @@ -95,8 +94,6 @@ getTaggedFileSystem(PathRef File) override; }; -class ClangdServer; - /// Provides API to manage ASTs for a collection of C++ files and request /// various language features. /// Currently supports async diagnostics, code completion, formatting and goto Index: clangd/ClangdUnitStore.h =================================================================== --- clangd/ClangdUnitStore.h +++ /dev/null @@ -1,73 +0,0 @@ -//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===---------------------------------------------------------------------===// - -#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H -#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H - -#include "ClangdUnit.h" -#include "GlobalCompilationDatabase.h" -#include "Logger.h" -#include "Path.h" -#include "clang/Tooling/CompilationDatabase.h" -#include - -namespace clang { -namespace clangd { - -class Logger; - -/// Thread-safe mapping from FileNames to CppFile. 
-class CppFileCollection { -public: - /// \p ASTCallback is called when a file is parsed synchronously. This should - /// not be expensive since it blocks diagnostics. - explicit CppFileCollection(bool StorePreamblesInMemory, - std::shared_ptr PCHs, - ASTParsedCallback ASTCallback) - : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)), - StorePreamblesInMemory(StorePreamblesInMemory) {} - - std::shared_ptr getOrCreateFile(PathRef File) { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) { - It = OpenedFiles - .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory, - PCHs, ASTCallback)) - .first; - } - return It->second; - } - - std::shared_ptr getFile(PathRef File) const { - std::lock_guard Lock(Mutex); - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - return It->second; - } - - /// Removes a CppFile, stored for \p File, if it's inside collection and - /// returns it. - std::shared_ptr removeIfPresent(PathRef File); - - /// Gets used memory for each of the stored files. - std::vector> getUsedBytesPerFile() const; - -private: - mutable std::mutex Mutex; - llvm::StringMap> OpenedFiles; - ASTParsedCallback ASTCallback; - std::shared_ptr PCHs; - bool StorePreamblesInMemory; -}; -} // namespace clangd -} // namespace clang - -#endif Index: clangd/ClangdUnitStore.cpp =================================================================== --- clangd/ClangdUnitStore.cpp +++ /dev/null @@ -1,37 +0,0 @@ -//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. 
-// -//===----------------------------------------------------------------------===// - -#include "ClangdUnitStore.h" -#include "llvm/Support/Path.h" -#include - -using namespace clang::clangd; -using namespace clang; - -std::shared_ptr CppFileCollection::removeIfPresent(PathRef File) { - std::lock_guard Lock(Mutex); - - auto It = OpenedFiles.find(File); - if (It == OpenedFiles.end()) - return nullptr; - - std::shared_ptr Result = It->second; - OpenedFiles.erase(It); - return Result; -} -std::vector> -CppFileCollection::getUsedBytesPerFile() const { - std::lock_guard Lock(Mutex); - std::vector> Result; - Result.reserve(OpenedFiles.size()); - for (auto &&PathAndFile : OpenedFiles) - Result.push_back( - {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()}); - return Result; -} Index: clangd/TUScheduler.h =================================================================== --- clangd/TUScheduler.h +++ clangd/TUScheduler.h @@ -11,9 +11,10 @@ #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H #include "ClangdUnit.h" -#include "ClangdUnitStore.h" #include "Function.h" #include "Threading.h" +#include "threading/Cancellation.h" +#include namespace clang { namespace clangd { @@ -32,6 +33,66 @@ const PreambleData *Preamble; }; +/// Limits the number of threads that can acquire this semaphore. 
+class CountingSemaphore {
+public:
+  /// Creates a semaphore with \p MaxLocks initially free slots.
+  CountingSemaphore(std::size_t MaxLocks);
+
+  /// Blocks until a slot is free and takes it.
+  void lock();
+  /// Frees one slot and notifies one thread waiting in lock().
+  void unlock();
+
+private:
+  std::mutex Mutex;
+  std::condition_variable SlotsChanged;
+  std::size_t FreeSlots;
+};
+
+/// Processes updates and reads for a single file. With \p RunSync the requests
+/// are executed on the calling thread; otherwise they are queued and executed
+/// by a dedicated worker thread.
+class FileASTThread {
+public:
+  FileASTThread(CountingSemaphore &Barrier, std::shared_ptr File,
+                bool RunSync);
+  /// In asynchronous mode joins the worker thread; setDone() must have been
+  /// called before (asserted in builds without NDEBUG).
+  ~FileASTThread();
+
+  std::shared_ptr getPossiblyStalePreamble() const;
+  std::size_t getUsedBytes() const;
+
+  /// Tells the worker thread to exit after it has run the queued requests.
+  void setDone();
+
+  /// Rebuilds the file with \p Inputs and passes the resulting diagnostics to
+  /// \p OnUpdated.
+  void update(
+      Context Ctx, ParseInputs Inputs,
+      UniqueFunction>)>
+          OnUpdated);
+  /// Runs \p Action with the current inputs and AST, or with an error if no
+  /// AST is available.
+  void enqueueRead(UniqueFunction)> Action);
+
+private:
+  const bool RunSync;
+  mutable std::mutex Mutex;
+  bool Done;
+  std::condition_variable RequestsCV;
+  RequestQueue Requests;
+  std::shared_ptr File;
+  // Inputs, corresponding to the current state of File. Note that Requests may
+  // contain a request to update the inputs.
+  ParseInputs Inputs;
+  std::thread Worker;
+};
+
+/// Runs queued cleanup tasks on a separate background thread.
+class CleanupThread {
+public:
+  CleanupThread();
+  /// Signals the worker thread to stop and joins it.
+  ~CleanupThread();
+
+  /// Calls setDone() on \p Thread and queues a task that takes ownership of it.
+  void cleanupFile(std::unique_ptr Thread);
+  /// Queues a task that waits for \p Task to finish.
+  void waitTillFinished(std::future Task);
+
+private:
+  std::mutex Mutex;
+  std::condition_variable RequestsCV;
+  RequestQueue Requests;
+  bool Done;
+  std::thread Worker;
+};
+
 /// Handles running tasks for ClangdServer and managing the resources (e.g.,
 /// preambles and ASTs) for opened files.
/// TUScheduler is not thread-safe, only one thread should be providing updates
@@ -82,11 +143,31 @@
       UniqueFunction)> Action);

 private:
-  const ParseInputs &getInputs(PathRef File);
+  /// Per-file state: the latest inputs and the worker processing the file.
+  struct FileData {
+    FileData() = default;
+    FileData(ParseInputs Inputs, std::unique_ptr Worker);
+
+    // A FileASTThread has to be marked done before it is destroyed, so do it
+    // here unless ownership of the worker was transferred elsewhere.
+    ~FileData() {
+      if (!Worker)
+        return;
+      Worker->setDone();
+    }
+
+    ParseInputs Inputs;
+    std::unique_ptr Worker;
+  };
+
+  /// Parameters forwarded to CppFile::Create for every new file.
+  struct AuxData {
+    bool StorePreamblesInMemory;
+    std::shared_ptr PCHs;
+    ASTParsedCallback ASTCallback;
+  };

-  llvm::StringMap CachedInputs;
-  CppFileCollection Files;
-  ThreadPool Threads;
+  const bool RunSync;
+  const AuxData Data;
+  CountingSemaphore Barrier;
+  llvm::StringMap> Files;
+  CleanupThread GCThread;
 };
 } // namespace clangd
 } // namespace clang
Index: clangd/TUScheduler.cpp
===================================================================
--- clangd/TUScheduler.cpp
+++ clangd/TUScheduler.cpp
@@ -14,111 +14,269 @@
   return HardwareConcurrency;
 }

+// Starts the worker thread unless RunSync is set. The worker sleeps until
+// there is something to process, runs all queued requests while holding a
+// slot of Barrier, and exits once setDone() has been called.
+FileASTThread::FileASTThread(CountingSemaphore &Barrier,
+                             std::shared_ptr File, bool RunSync)
+    : RunSync(RunSync), Done(false), Requests(RequestsCV),
+      File(std::move(File)) {
+  if (RunSync)
+    return;
+  // The thread outlives this constructor, so name the captures explicitly:
+  // the object itself and the semaphore the Barrier parameter refers to.
+  Worker = std::thread([this, &Barrier]() {
+    while (true) {
+      {
+        std::unique_lock Lock(Mutex);
+        RequestsCV.wait(Lock,
+                        [&]() { return Done || Requests.needsProcessing(); });
+      }
+
+      {
+        std::lock_guard BarrierLock(Barrier);
+        Requests.startProcessing();
+        while (auto Req = Requests.pop())
+          (*Req)();
+        Requests.stopProcessing();
+      }
+
+      // Done is written by setDone() under Mutex; read it under the same
+      // mutex instead of racing with that write.
+      std::lock_guard<std::mutex> Lock(Mutex);
+      if (Done)
+        return;
+    }
+  });
+}
+
+// Joins the worker thread (asynchronous mode only).
+FileASTThread::~FileASTThread() {
+  if (RunSync)
+    return;
+
+#ifndef NDEBUG
+  std::unique_lock Lock(Mutex);
+  assert(Done && "setDone must be called before running destructor");
+  Lock.unlock();
+#endif
+
+  Worker.join();
+}
+
+std::shared_ptr
+FileASTThread::getPossiblyStalePreamble() const {
+  std::lock_guard Lock(Mutex);
+  return File->getPossiblyStalePreamble();
+}
+
+std::size_t FileASTThread::getUsedBytes() const {
+  std::lock_guard
Lock(Mutex);
+  return File->getUsedBytes();
+}
+
+// Marks the thread as finished and wakes the worker so that it can exit.
+void FileASTThread::setDone() {
+  {
+    std::lock_guard Lock(Mutex);
+    Done = true;
+    Requests.setScheduledForRemoval();
+  }
+  RequestsCV.notify_one();
+}
+
+// Rebuilds the file with the new inputs and reports the diagnostics through
+// OnUpdated. Runs inline when RunSync is set, on the worker thread otherwise.
+void FileASTThread::update(
+    Context Ctx, ParseInputs Inputs,
+    UniqueFunction>)>
+        OnUpdated) {
+
+  auto Task = [=](Context Ctx, decltype(OnUpdated) OnUpdated) {
+    this->Inputs = Inputs;
+    auto Diags = File->rebuild(Ctx, ParseInputs(Inputs));
+    OnUpdated(std::move(Ctx), std::move(Diags));
+  };
+
+  if (RunSync) {
+    Task(std::move(Ctx), std::move(OnUpdated));
+    return;
+  }
+
+  // The worker evaluates its wait predicate while holding Mutex. Enqueue under
+  // the same mutex, otherwise the notification can arrive between the
+  // predicate check and the actual wait and be lost.
+  std::lock_guard<std::mutex> Lock(Mutex);
+  Requests.addToBack(
+      BindWithForward(Task, std::move(Ctx), std::move(OnUpdated)));
+}
+
+// Runs Action with the current inputs and AST, or with an error when there is
+// no AST. Runs inline when RunSync is set, on the worker thread otherwise.
+void FileASTThread::enqueueRead(
+    UniqueFunction)> Action) {
+  auto Task = [=](decltype(Action) Action) {
+    auto AST = File->getAST().get();
+
+    AST->runUnderLock([&](ParsedAST *AST) {
+      if (!AST) {
+        Action(llvm::make_error(
+            "invalid AST", llvm::errc::invalid_argument));
+        return;
+      }
+      Action(InputsAndAST{Inputs, *AST});
+    });
+  };
+
+  if (RunSync) {
+    Task(std::move(Action));
+    return;
+  }
+
+  // Enqueue under Mutex for the same reason as in update(): the worker checks
+  // the queue under Mutex before going to sleep.
+  std::lock_guard<std::mutex> Lock(Mutex);
+  Requests.addToBack(BindWithForward(Task, std::move(Action)));
+}
+
+// Starts a thread that runs queued cleanup tasks one at a time until the
+// queue is empty and Done is set.
+CleanupThread::CleanupThread() : Requests(RequestsCV), Done(false) {
+  Worker = std::thread([&]() {
+    while (true) {
+      std::unique_lock Lock(Mutex);
+      RequestsCV.wait(Lock,
+                      [&]() { return Done || Requests.needsProcessing(); });
+      if (!Requests.needsProcessing()) {
+        assert(Done);
+        return;
+      }
+
+      auto Req = Requests.pop();
+      // Run the task without holding Mutex.
+      Lock.unlock();
+      (*Req)();
+    }
+  });
+}
+
+CleanupThread::~CleanupThread() {
+  std::unique_lock Lock(Mutex);
+  Done = true;
+  Lock.unlock();
+
+  RequestsCV.notify_one();
+  Worker.join();
+}
+
+void CleanupThread::cleanupFile(std::unique_ptr Thread) {
+  assert(Thread);
+
+  auto CleanupTask = [](std::unique_ptr) {
+    // Wait for FileASTThread's destructor to finish. Nothing else to do.
+  };
+  Thread->setDone();
+  // The cleanup worker checks the queue while holding Mutex; enqueue under the
+  // same mutex so that the wakeup cannot be lost.
+  std::lock_guard<std::mutex> Lock(Mutex);
+  Requests.addToBack(BindWithForward(CleanupTask, std::move(Thread)));
+}
+
+// Queues a task that blocks the cleanup thread until Task has finished.
+void CleanupThread::waitTillFinished(std::future Task) {
+  auto CleanupTask = [](std::future Task) { Task.wait(); };
+  // Enqueue under Mutex, see cleanupFile().
+  std::lock_guard<std::mutex> Lock(Mutex);
+  Requests.addToBack(BindWithForward(CleanupTask, std::move(Task)));
+}
+
+CountingSemaphore::CountingSemaphore(std::size_t MaxLocks)
+    : FreeSlots(MaxLocks) {}
+
+// Blocks until a slot is free, then takes it.
+void CountingSemaphore::lock() {
+  std::unique_lock Lock(Mutex);
+  SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; });
+  --FreeSlots;
+}
+
+// Returns a slot and wakes up one thread blocked in lock().
+void CountingSemaphore::unlock() {
+  std::unique_lock Lock(Mutex);
+  ++FreeSlots;
+  Lock.unlock();
+
+  SlotsChanged.notify_one();
+}
+
 TUScheduler::TUScheduler(unsigned AsyncThreadsCount,
                          bool StorePreamblesInMemory,
                          ASTParsedCallback ASTCallback)
-
-    : Files(StorePreamblesInMemory, std::make_shared(),
-            std::move(ASTCallback)),
-      Threads(AsyncThreadsCount) {}
+    : RunSync(AsyncThreadsCount == 0),
+      Data{StorePreamblesInMemory, std::make_shared(),
+           std::move(ASTCallback)},
+      Barrier(AsyncThreadsCount) {}

 void TUScheduler::update(
     Context Ctx, PathRef File, ParseInputs Inputs,
     UniqueFunction>)>
         OnUpdated) {
-  CachedInputs[File] = Inputs;
-
-  auto Resources = Files.getOrCreateFile(File);
-  auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
+  // Create the per-file worker on the first update, otherwise only remember
+  // the latest inputs.
+  auto It = Files.find(File);
+  if (It == Files.end()) {
+    auto Worker = llvm::make_unique(
+        Barrier,
+        CppFile::Create(File, Data.StorePreamblesInMemory, Data.PCHs,
+                        Data.ASTCallback),
+        RunSync);
+    It = Files
+             .insert({File, std::unique_ptr(
+                                new FileData{Inputs, std::move(Worker)})})
+             .first;
+  } else {
+    It->second->Inputs = Inputs;
+  }

-  Threads.addToFront(
-      [](Context Ctx, decltype(OnUpdated) OnUpdated,
-         decltype(DeferredRebuild) DeferredRebuild) {
-        auto Diags = DeferredRebuild(Ctx);
-        OnUpdated(std::move(Ctx), Diags);
-      },
-      std::move(Ctx), std::move(OnUpdated), std::move(DeferredRebuild));
+  // This is the last use of Inputs, so hand it over instead of copying it.
+  It->second->Worker->update(std::move(Ctx), std::move(Inputs),
+                             std::move(OnUpdated));
 }

 void
TUScheduler::remove(PathRef File, UniqueFunction Action) {
-  CachedInputs.erase(File);
-
-  auto Resources = Files.removeIfPresent(File);
-  if (!Resources) {
+  // Stops tracking File and hands its worker over to the cleanup thread.
+  // Action is called with an error if File is not tracked and with success
+  // otherwise.
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error(
         "trying to remove non-added document", llvm::errc::invalid_argument));
     return;
   }

-  auto DeferredCancel = Resources->deferCancelRebuild();
-  Threads.addToFront(
-      [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
-        DeferredCancel();
-        Action(llvm::Error::success());
-      },
-      std::move(Action), std::move(DeferredCancel));
+  // Named Removed rather than Data to avoid shadowing the Data member.
+  std::unique_ptr Removed = std::move(It->second);
+  Files.erase(It);
+
+  GCThread.cleanupFile(std::move(Removed->Worker));
+  // The file is no longer tracked at this point; the worker itself is torn
+  // down asynchronously. Report success so that the callback is not dropped.
+  Action(llvm::Error::success());
 }

 void TUScheduler::runWithAST(
     PathRef File, UniqueFunction)> Action) {
-  auto Resources = Files.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error(
         "trying to get AST for non-added document",
         llvm::errc::invalid_argument));
     return;
   }

-  const ParseInputs &Inputs = getInputs(File);
-  // We currently block the calling thread until AST is available and run the
-  // action on the calling thread to avoid inconsistent states coming from
-  // subsequent updates.
-  // FIXME(ibiryukov): this should be moved to the worker threads.
-  Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
-    if (AST)
-      Action(InputsAndAST{Inputs, *AST});
-    else
-      Action(llvm::make_error(
-          "Could not build AST for the latest file update",
-          llvm::errc::invalid_argument));
-  });
+  // The read is handed to the file's worker, see FileASTThread::enqueueRead.
+  It->second->Worker->enqueueRead(std::move(Action));
 }

 void TUScheduler::runWithPreamble(
     PathRef File,
     UniqueFunction)> Action) {
-  std::shared_ptr Resources = Files.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error(
         "trying to get preamble for non-added document",
         llvm::errc::invalid_argument));
     return;
   }

-  const ParseInputs &Inputs = getInputs(File);
+  // Take copies of the inputs and of the preamble pointer: in asynchronous
+  // mode they are captured by value in the task defined below.
+  ParseInputs InputsCopy = It->second->Inputs;
   std::shared_ptr Preamble =
-      Resources->getPossiblyStalePreamble();
-  Threads.addToFront(
-      [Resources, Preamble, Inputs](decltype(Action) Action) mutable {
-        if (!Preamble)
-          Preamble = Resources->getPossiblyStalePreamble();
+      It->second->Worker->getPossiblyStalePreamble();

-        Action(InputsAndPreamble{Inputs, Preamble.get()});
-      },
-      std::move(Action));
-}
+  // In synchronous mode run the action right away on the calling thread.
+  if (RunSync) {
+    Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+    return;
+  }

-const ParseInputs &TUScheduler::getInputs(PathRef File) {
-  auto It = CachedInputs.find(File);
-  assert(It != CachedInputs.end());
-  return It->second;
+  auto Task = [InputsCopy, Preamble, this](decltype(Action) Action) mutable {
+    // Hold a slot of Barrier for as long as the action runs.
+    std::lock_guard BarrierLock(Barrier);
+    // XXX: what if preamble got built by this time?
+    // if (!Preamble)
+    //   Preamble = Resources->getPossiblyStalePreamble();
+    Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+  };
+
+  // Launch the task asynchronously and let the cleanup thread wait for the
+  // returned future.
+  GCThread.waitTillFinished(
+      std::async(std::launch::async, Task, std::move(Action)));
 }

 std::vector>
 TUScheduler::getUsedBytesPerFile() const {
-  return Files.getUsedBytesPerFile();
+  // One entry per tracked file: its path and what its worker reports as used.
+  std::vector> Result;
+  Result.reserve(Files.size());
+  for (auto &&PathAndFile : Files)
+    Result.push_back(
+        {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()});
+  return Result;
 }
+
+TUScheduler::FileData::FileData(ParseInputs Inputs,
+                                std::unique_ptr Worker)
+    : Inputs(std::move(Inputs)), Worker(std::move(Worker)) {}
 } // namespace clangd
 } // namespace clang
Index: clangd/Threading.h
===================================================================
--- clangd/Threading.h
+++ clangd/Threading.h
@@ -11,15 +11,53 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_H

 #include "Function.h"
+#include "threading/RequestQueue.h"
 #include
 #include
 #include
 #include
 #include
+#include

 namespace clang {
 namespace clangd {
-/// A simple fixed-size thread pool implementation.
+/// A handle to a queue managed by ThreadPool. Can be used to schedule
+/// computation on a queue by calling ThreadPool::scheduleOnQueue or signal that
+/// the queue is not used anymore by calling ThreadPool::removeQueue.
+/// Handles are move-only. A default-constructed or moved-from handle is
+/// invalid.
+class QueueHandle {
+  friend class ThreadPool;
+
+public:
+  QueueHandle() = default;
+
+  QueueHandle(const QueueHandle &) = delete;
+  QueueHandle &operator=(const QueueHandle &) = delete;
+
+  QueueHandle(QueueHandle &&Other) { *this = std::move(Other); }
+
+  /// Takes over the queue of \p Other and leaves \p Other invalid.
+  /// \p Other must be valid (asserted).
+  QueueHandle &operator=(QueueHandle &&Other) {
+    assert(Other.isValid());
+    Queue = Other.Queue;
+    Other.Queue = nullptr;
+    return *this;
+  }
+
+  /// Returns true if the handle refers to a queue.
+  bool isValid() const { return Queue != nullptr; }
+
+private:
+  // Only ThreadPool (a friend) creates handles that refer to a queue.
+  explicit QueueHandle(RequestQueue &Queue) : Queue(&Queue) {}
+
+private:
+  RequestQueue *Queue = nullptr;
+};
+
+/// Asynchronous tasks executor backed by a fixed size thread pool.
+/// There are two ways to schedule computations in ThreadPool: +/// - schedule will schedule computation to be executed on one of +/// the working threads as soon as an idle thread is available. +/// - scheduleOnQueue will schedule to a specific queue. Requests from the +/// same queue are not processed concurrently. Requests in each queue are +/// executed in the FIFO order. class ThreadPool { public: /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd @@ -31,8 +69,7 @@ /// Add a new request to run function \p F with args \p As to the start of the /// queue. The request will be run on a separate thread. - template - void addToFront(Func &&F, Args &&... As) { + template void schedule(Func &&F, Args &&... As) { if (RunSynchronously) { std::forward(F)(std::forward(As)...); return; @@ -40,28 +77,35 @@ { std::lock_guard Lock(Mutex); - RequestQueue.push_front( + Requests.addToFront( BindWithForward(std::forward(F), std::forward(As)...)); } RequestCV.notify_one(); } - /// Add a new request to run function \p F with args \p As to the end of the - /// queue. The request will be run on a separate thread. - template void addToEnd(Func &&F, Args &&... As) { + /// Add a new request to the end of the queue specified by \p Q. Requests on + /// the same queue will not be processed concurrently. + template + void scheduleOnQueue(const QueueHandle &Q, Func &&F, Args &&... As) { + assert(Q.isValid()); + if (RunSynchronously) { std::forward(F)(std::forward(As)...); return; } - { - std::lock_guard Lock(Mutex); - RequestQueue.push_back( - BindWithForward(std::forward(F), std::forward(As)...)); - } - RequestCV.notify_one(); + Q.Queue->addToBack( + BindWithForward(std::forward(F), std::forward(As)...)); } + /// Create a new queue and return a handle to it. You can schedule new + /// requests on the queue by passing the handle to scheduleOnQueue. + QueueHandle createQueue(); + /// Remove a queue, corresponding to the passed handle \p Q. 
No new requests
+  /// can be added to the queue after calling this function, but the pending
+  /// requests will be executed.
+  void removeQueue(QueueHandle &&Q);
+
 private:
   bool RunSynchronously;
   mutable std::mutex Mutex;
@@ -71,11 +115,14 @@
   std::vector Workers;
   /// Setting Done to true will make the worker threads terminate.
   bool Done = false;
-  /// A queue of requests.
-  std::deque> RequestQueue;
+  /// Queues created by createQueue(), keyed by the address of the queue.
+  std::map> ExclusiveQueues;
   /// Condition variable to wake up worker threads.
   std::condition_variable RequestCV;
+  /// A queue of requests added with schedule().
+  RequestQueue Requests;
 };
+
 } // namespace clangd
 } // namespace clang
 #endif
Index: clangd/Threading.cpp
===================================================================
--- clangd/Threading.cpp
+++ clangd/Threading.cpp
@@ -5,7 +5,7 @@
 namespace clang {
 namespace clangd {
 ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
-    : RunSynchronously(AsyncThreadsCount == 0) {
+    : RunSynchronously(AsyncThreadsCount == 0), Requests(RequestCV) {
   if (RunSynchronously) {
     // Don't start the worker thread if we're running synchronously
     return;
@@ -17,27 +17,53 @@
       llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
       while (true) {
         UniqueFunction Request;
+        // Set when this worker picked a whole queue instead of one request.
+        RequestQueue *ExclQueue = nullptr;
+        // Owns the picked queue if it was scheduled for removal.
+        std::unique_ptr QueueToRemove;

-        // Pick request from the queue
+        // Pick request from the queue.
         {
           std::unique_lock Lock(Mutex);
           // Wait for more requests.
-          RequestCV.wait(Lock,
-                         [this] { return !RequestQueue.empty() || Done; });
-          if (RequestQueue.empty()) {
+          // NOTE: the predicate has side effects. Besides checking for work
+          // it also claims it: it either pops a request from Requests or
+          // marks one of the exclusive queues as being processed.
+          RequestCV.wait(Lock, [&] {
+            // Requests added with schedule() are looked at first.
+            if (auto PriorityReq = Requests.pop()) {
+              Request = std::move(*PriorityReq);
+              return true;
+            }
+            for (auto It = ExclusiveQueues.begin(), End = ExclusiveQueues.end();
+                 It != End; ++It) {
+              if (!It->first->needsProcessing())
+                continue;
+
+              ExclQueue = It->first;
+              ExclQueue->startProcessing();
+              // Take ownership of a queue that was scheduled for removal, so
+              // that it goes away after this worker is done with it.
+              if (ExclQueue->isScheduledForRemoval()) {
+                QueueToRemove = std::move(It->second);
+                ExclusiveQueues.erase(It);
+              }
+              return true;
+            }
+            if (Done)
+              return true;
+            return false;
+          });
+
+          // Nothing was claimed, so the wait ended because of Done.
+          if (!Request && !ExclQueue) {
             assert(Done);
             return;
           }
+        } // unlock Mutex

-          // We process requests starting from the front of the queue. Users of
-          // ThreadPool have a way to prioritise their requests by putting
-          // them to the either side of the queue (using either addToEnd or
-          // addToFront).
-          Request = std::move(RequestQueue.front());
-          RequestQueue.pop_front();
-        } // unlock Mutex
+        // We're processing a foreground request.
+        if (Request) {
+          Request();
+          continue;
+        }

-        Request();
+        // We're processing a non-empty request queue.
+        assert(ExclQueue);
+        while (auto ExclRequest = ExclQueue->pop())
+          (*ExclRequest)();
+        ExclQueue->stopProcessing();
       }
     }));
   }
@@ -57,5 +83,19 @@
   for (auto &Worker : Workers)
     Worker.join();
 }
+
+// Creates a queue owned by the pool and returns a handle referring to it.
+QueueHandle ThreadPool::createQueue() {
+  auto NewQueueOwner = llvm::make_unique(RequestCV);
+  auto NewQueue = NewQueueOwner.get();
+  {
+    std::lock_guard Lock(Mutex);
+    ExclusiveQueues.emplace(NewQueue, std::move(NewQueueOwner));
+  }
+  return QueueHandle(*NewQueue);
+}
+
+// Schedules the queue behind Q for removal. Pending requests still run; the
+// queue is destroyed by the worker that picks it up afterwards.
+void ThreadPool::removeQueue(QueueHandle &&Q) {
+  assert(Q.isValid());
+  {
+    // Workers inspect the queues while holding Mutex, so change the state
+    // they are waiting on under the same mutex.
+    std::lock_guard<std::mutex> Lock(Mutex);
+    Q.Queue->setScheduledForRemoval();
+  }
+  // The queue may be destroyed by a worker at any moment from now on. Make
+  // the handle invalid so that it cannot be used to reach the queue again.
+  Q.Queue = nullptr;
+  // Wake a worker up, otherwise an idle pool would keep the queue alive until
+  // the next unrelated request arrives.
+  RequestCV.notify_one();
+}
 } // namespace clangd
 } // namespace clang
Index: clangd/threading/Cancellation.h
===================================================================
--- /dev/null
+++ clangd/threading/Cancellation.h
@@ -0,0 +1,44 @@
+//===--- Cancellation.h -----------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_CANCELLATION_H
+
+#include
+#include
+#include
+
+namespace clang {
+namespace clangd {
+
+/// A shared boolean flag indicating if the computation was cancelled.
+/// Once cancelled, cannot be returned to the previous state.
+/// FIXME: We should split this class into consumers and producers of the
+/// cancellation flags.
+class CancellationFlag {
+public:
+  /// Creates a new flag that is not cancelled.
+  CancellationFlag();
+
+  /// Marks the computation as cancelled.
+  void setCancelled() {
+    assert(WasCancelled && "the object was moved");
+    WasCancelled->store(true);
+  }
+
+  /// Returns true if setCancelled() was called on this flag.
+  bool isCancelled() const {
+    assert(WasCancelled && "the object was moved");
+    return WasCancelled->load();
+  }
+
+private:
+  // Held through a shared_ptr, so copies of a CancellationFlag refer to the
+  // same underlying flag.
+  std::shared_ptr> WasCancelled;
+};
+} // namespace clangd
+} // namespace clang
+
+#endif
Index: clangd/threading/Cancellation.cpp
===================================================================
--- /dev/null
+++ clangd/threading/Cancellation.cpp
@@ -0,0 +1,18 @@
+//===--- Cancellation.cpp ---------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#include "Cancellation.h"
+namespace clang {
+namespace clangd {
+
+CancellationFlag::CancellationFlag()
+    : WasCancelled(std::make_shared>(false)) {}
+
+} // namespace clangd
+} // namespace clang
Index: clangd/threading/RequestQueue.h
===================================================================
--- /dev/null
+++ clangd/threading/RequestQueue.h
@@ -0,0 +1,50 @@
+//===--- RequestQueue.h -----------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+
+#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H
+#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_THREADING_REQUESTQUEUE_H
+
+#include "Function.h"
+#include
+#include
+#include
+
+namespace clang {
+namespace clangd {
+
+/// A thread-safe request queue managed by ThreadPool. This is an implementation
+/// detail, see QueueHandle and ThreadPool for user-facing APIs.
+class RequestQueue {
+public:
+  /// \p Condition is notified when a request is added and when processing of
+  /// the queue stops.
+  RequestQueue(std::condition_variable &Condition);
+
+  /// Adds \p Req to the front of the queue and notifies the condition.
+  void addToFront(UniqueFunction Req);
+  /// Adds \p Req to the back of the queue and notifies the condition.
+  void addToBack(UniqueFunction Req);
+
+  /// Removes and returns the request at the front, or None if the queue is
+  /// empty.
+  llvm::Optional> pop();
+
+  /// Returns false while the queue is being processed. Otherwise returns true
+  /// if there are pending requests or the queue was scheduled for removal.
+  bool needsProcessing() const;
+
+  bool isScheduledForRemoval() const;
+  /// May be called only once (asserted).
+  void setScheduledForRemoval();
+
+  /// Marks the queue as being processed; must not be nested (asserted).
+  void startProcessing();
+  /// Clears the processing mark and notifies the condition.
+  void stopProcessing();
+
+private:
+  std::condition_variable &Condition;
+  // Guards the members declared below.
+  mutable std::mutex Mutex;
+  bool IsProcessing;
+  bool IsScheduledForRemoval;
+  std::deque> Requests;
+};
+
+} // namespace clangd
+} // namespace clang
+#endif
Index: clangd/threading/RequestQueue.cpp
===================================================================
--- /dev/null
+++ clangd/threading/RequestQueue.cpp
@@ -0,0 +1,78 @@
+//===--- RequestQueue.cpp ---------------------------------------*- C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===---------------------------------------------------------------------===//
+#include "RequestQueue.h"
+
+namespace clang {
+namespace clangd {
+
+RequestQueue::RequestQueue(std::condition_variable &Condition)
+    : Condition(Condition), IsProcessing(false), IsScheduledForRemoval(false) {}
+
+void RequestQueue::addToFront(UniqueFunction Req) {
+  std::unique_lock Lock(Mutex);
+  Requests.emplace_front(std::move(Req));
+
+  // Notify after releasing the queue's own mutex.
+  Lock.unlock();
+  Condition.notify_one();
+}
+
+void RequestQueue::addToBack(UniqueFunction Req) {
+  std::unique_lock Lock(Mutex);
+  Requests.emplace_back(std::move(Req));
+
+  // Notify after releasing the queue's own mutex.
+  Lock.unlock();
+  Condition.notify_one();
+}
+
+llvm::Optional> RequestQueue::pop() {
+  std::unique_lock Lock(Mutex);
+  if (Requests.empty())
+    return llvm::None;
+
+  UniqueFunction Result = std::move(Requests.front());
+  Requests.pop_front();
+  return Result;
+}
+
+bool RequestQueue::needsProcessing() const {
+  std::unique_lock Lock(Mutex);
+  // A queue that is already being processed must not be picked up again.
+  if (IsProcessing)
+    return false;
+  return !Requests.empty() ||
IsScheduledForRemoval;
+}
+
+bool RequestQueue::isScheduledForRemoval() const {
+  std::unique_lock Lock(Mutex);
+  return IsScheduledForRemoval;
+}
+
+// NOTE: unlike the other mutators this one does not notify Condition; the
+// callers visible in this patch either notify themselves or do not notify.
+void RequestQueue::setScheduledForRemoval() {
+  std::unique_lock Lock(Mutex);
+  assert(!IsScheduledForRemoval);
+  IsScheduledForRemoval = true;
+}
+
+void RequestQueue::startProcessing() {
+  std::unique_lock Lock(Mutex);
+  assert(!IsProcessing);
+  // Processing only starts when there is something to do: pending requests or
+  // a scheduled removal.
+  assert(!Requests.empty() || IsScheduledForRemoval);
+  IsProcessing = true;
+}
+
+void RequestQueue::stopProcessing() {
+  std::unique_lock Lock(Mutex);
+  assert(IsProcessing);
+  IsProcessing = false;
+
+  // needsProcessing() may return true again now, so wake up a waiter.
+  Lock.unlock();
+  Condition.notify_one();
+}
+
+} // namespace clangd
+} // namespace clang