Index: lldb/include/lldb/API/SBCommunication.h =================================================================== --- lldb/include/lldb/API/SBCommunication.h +++ lldb/include/lldb/API/SBCommunication.h @@ -75,7 +75,7 @@ SBCommunication(const SBCommunication &) = delete; const SBCommunication &operator=(const SBCommunication &) = delete; - lldb_private::Communication *m_opaque = nullptr; + lldb_private::ThreadedCommunication *m_opaque = nullptr; bool m_opaque_owned = false; }; Index: lldb/include/lldb/Core/Communication.h =================================================================== --- lldb/include/lldb/Core/Communication.h +++ lldb/include/lldb/Core/Communication.h @@ -9,22 +9,15 @@ #ifndef LLDB_CORE_COMMUNICATION_H #define LLDB_CORE_COMMUNICATION_H -#include "lldb/Host/HostThread.h" -#include "lldb/Utility/Broadcaster.h" #include "lldb/Utility/Timeout.h" #include "lldb/lldb-defines.h" #include "lldb/lldb-enumerations.h" #include "lldb/lldb-forward.h" #include "lldb/lldb-types.h" -#include #include -#include #include -#include -#include - namespace lldb_private { class Connection; class ConstString; @@ -38,90 +31,22 @@ /// approach has a couple of advantages: it allows a single instance of this /// class to be used even though its connection can change. Connections could /// negotiate for different connections based on abilities like starting with -/// Bluetooth and negotiating up to WiFi if available. It also allows this -/// class to be subclassed by any interfaces that don't want to give bytes but -/// want to validate and give out packets. This can be done by overriding: -/// -/// AppendBytesToCache (const uint8_t *src, size_t src_len, bool broadcast); -/// -/// Communication inherits from Broadcaster which means it can be used in -/// conjunction with Listener to wait for multiple broadcaster objects and -/// multiple events from each of those objects. Communication defines a set of -/// pre-defined event bits (see enumerations definitions that start with -/// "eBroadcastBit" below). -/// -/// There are two modes in which communications can occur: -/// \li single-threaded -/// \li multi-threaded -/// -/// In single-threaded mode, all reads and writes happen synchronously on the -/// calling thread. -/// -/// In multi-threaded mode, a read thread is spawned that continually reads -/// data and caches any received bytes. To start the read thread clients call: -/// -/// bool Communication::StartReadThread (Status *); -/// -/// If true is returned a read thread has been spawned that will continually -/// execute a call to the pure virtual DoRead function: +/// Bluetooth and negotiating up to WiFi if available. /// -/// size_t Communication::ReadFromConnection (void *, size_t, uint32_t); -/// -/// When bytes are received the data gets cached in \a m_bytes and this class -/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that -/// want packet based communication should override AppendBytesToCache. The -/// subclasses can choose to call the built in AppendBytesToCache with the \a -/// broadcast parameter set to false. This will cause the \b -/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the -/// subclass can post a \b eBroadcastBitPacketAvailable event when a full -/// packet of data has been received. -/// -/// If the connection is disconnected a \b eBroadcastBitDisconnected event -/// gets broadcast. If the read thread exits a \b -/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also -/// post a \b eBroadcastBitReadThreadShouldExit event to this object which -/// will cause the read thread to exit. -class Communication : public Broadcaster { +/// When using this class, all reads and writes happen synchronously on the +/// calling thread. There is also a ThreadedCommunication class that supports +/// multi-threaded mode. +class Communication { public: - FLAGS_ANONYMOUS_ENUM(){ - eBroadcastBitDisconnected = - (1u << 0), ///< Sent when the communications connection is lost. - eBroadcastBitReadThreadGotBytes = - (1u << 1), ///< Sent by the read thread when bytes become available. - eBroadcastBitReadThreadDidExit = - (1u - << 2), ///< Sent by the read thread when it exits to inform clients. - eBroadcastBitReadThreadShouldExit = - (1u << 3), ///< Sent by clients that need to cancel the read thread. - eBroadcastBitPacketAvailable = - (1u << 4), ///< Sent when data received makes a complete packet. - eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread - ///to indicate all pending - ///input has been processed. - kLoUserBroadcastBit = - (1u << 16), ///< Subclasses can used bits 31:16 for any needed events. - kHiUserBroadcastBit = (1u << 31), - eAllEventBits = 0xffffffff}; - - typedef void (*ReadThreadBytesReceived)(void *baton, const void *src, - size_t src_len); - - /// Construct the Communication object with the specified name for the - /// Broadcaster that this object inherits from. - /// - /// \param[in] broadcaster_name - /// The name of the broadcaster object. This name should be as - /// complete as possible to uniquely identify this object. The - /// broadcaster name can be updated after the connect function - /// is called. - Communication(const char *broadcaster_name); + /// Construct the Communication object. + Communication(); /// Destructor. /// /// The destructor is virtual since this class gets subclassed. - ~Communication() override; + virtual ~Communication(); - void Clear(); + virtual void Clear(); /// Connect using the current connection by passing \a url to its connect /// function. string. @@ -148,7 +73,7 @@ /// /// \see Status& Communication::GetError (); /// \see bool Connection::Disconnect (); - lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr); + virtual lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr); /// Check if the connection is valid. /// @@ -166,13 +91,6 @@ /// If no read thread is running, this function call the connection's /// Connection::Read(...) function to get any available. /// - /// If a read thread has been started, this function will check for any - /// cached bytes that have already been read and return any currently - /// available bytes. If no bytes are cached, it will wait for the bytes to - /// become available by listening for the \a eBroadcastBitReadThreadGotBytes - /// event. If this function consumes all of the bytes in the cache, it will - /// reset the \a eBroadcastBitReadThreadGotBytes event bit. - /// /// \param[in] dst /// A destination buffer that must be at least \a dst_len bytes /// long. @@ -188,8 +106,9 @@ /// The number of bytes actually read. /// /// \see size_t Connection::Read (void *, size_t); - size_t Read(void *dst, size_t dst_len, const Timeout &timeout, - lldb::ConnectionStatus &status, Status *error_ptr); + virtual size_t Read(void *dst, size_t dst_len, + const Timeout &timeout, + lldb::ConnectionStatus &status, Status *error_ptr); /// The actual write function that attempts to write to the communications /// protocol. @@ -237,69 +156,7 @@ /// /// \see /// class Connection - void SetConnection(std::unique_ptr connection); - - /// Starts a read thread whose sole purpose it to read bytes from the - /// current connection. This function will call connection's read function: - /// - /// size_t Connection::Read (void *, size_t); - /// - /// When bytes are read and cached, this function will call: - /// - /// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, - /// bool - /// broadcast); - /// - /// Subclasses should override this function if they wish to override the - /// default action of caching the bytes and broadcasting a \b - /// eBroadcastBitReadThreadGotBytes event. - /// - /// \return - /// \b True if the read thread was successfully started, \b - /// false otherwise. - /// - /// \see size_t Connection::Read (void *, size_t); - /// \see void Communication::AppendBytesToCache (const uint8_t * bytes, - /// size_t len, bool broadcast); - virtual bool StartReadThread(Status *error_ptr = nullptr); - - /// Stops the read thread by cancelling it. - /// - /// \return - /// \b True if the read thread was successfully canceled, \b - /// false otherwise. - virtual bool StopReadThread(Status *error_ptr = nullptr); - - virtual bool JoinReadThread(Status *error_ptr = nullptr); - /// Checks if there is a currently running read thread. - /// - /// \return - /// \b True if the read thread is running, \b false otherwise. - bool ReadThreadIsRunning(); - - /// The read thread function. This function will call the "DoRead" - /// function continuously and wait for data to become available. When data - /// is received it will append the available data to the internal cache and - /// broadcast a \b eBroadcastBitReadThreadGotBytes event. - /// - /// \param[in] comm_ptr - /// A pointer to an instance of this class. - /// - /// \return - /// \b NULL. - /// - /// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t); - lldb::thread_result_t ReadThread(); - - void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback, - void *callback_baton); - - /// Wait for the read thread to process all outstanding data. - /// - /// After this function returns, the read thread has processed all data that - /// has been waiting in the Connection queue. - /// - void SynchronizeWithReadThread(); + virtual void SetConnection(std::unique_ptr connection); static std::string ConnectionStatusAsString(lldb::ConnectionStatus status); @@ -307,76 +164,17 @@ void SetCloseOnEOF(bool b) { m_close_on_eof = b; } - static ConstString &GetStaticBroadcasterClass(); - - ConstString &GetBroadcasterClass() const override { - return GetStaticBroadcasterClass(); - } - protected: lldb::ConnectionSP m_connection_sp; ///< The connection that is current in use ///by this communications class. - HostThread m_read_thread; ///< The read thread handle in case we need to - ///cancel the thread. - std::atomic m_read_thread_enabled; - std::atomic m_read_thread_did_exit; - std::string - m_bytes; ///< A buffer to cache bytes read in the ReadThread function. - std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded - ///access to the cached bytes. - lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough - ///from read thread. - Status m_pass_error; ///< Error passthrough from read thread. std::mutex m_write_mutex; ///< Don't let multiple threads write at the same time... - std::mutex m_synchronize_mutex; - ReadThreadBytesReceived m_callback; - void *m_callback_baton; bool m_close_on_eof; size_t ReadFromConnection(void *dst, size_t dst_len, const Timeout &timeout, lldb::ConnectionStatus &status, Status *error_ptr); - /// Append new bytes that get read from the read thread into the internal - /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes - /// event to be broadcast if \a broadcast is true. - /// - /// Subclasses can override this function in order to inspect the received - /// data and check if a packet is available. - /// - /// Subclasses can also still call this function from the overridden method - /// to allow the caching to correctly happen and suppress the broadcasting - /// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast - /// to false. - /// - /// \param[in] src - /// A source buffer that must be at least \a src_len bytes - /// long. - /// - /// \param[in] src_len - /// The number of bytes to append to the cache. - virtual void AppendBytesToCache(const uint8_t *src, size_t src_len, - bool broadcast, - lldb::ConnectionStatus status); - - /// Get any available bytes from our data cache. If this call empties the - /// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset - /// to signify no more bytes are available. - /// - /// \param[in] dst - /// A destination buffer that must be at least \a dst_len bytes - /// long. - /// - /// \param[in] dst_len - /// The number of bytes to attempt to read from the cache, - /// and also the max number of bytes that can be placed into - /// \a dst. - /// - /// \return - /// The number of bytes extracted from the data cache. - size_t GetCachedBytes(void *dst, size_t dst_len); - private: Communication(const Communication &) = delete; const Communication &operator=(const Communication &) = delete; Index: lldb/include/lldb/Core/ThreadedCommunication.h =================================================================== --- /dev/null +++ lldb/include/lldb/Core/ThreadedCommunication.h @@ -0,0 +1,288 @@ +//===-- ThreadedCommunication.h ---------------------------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#ifndef LLDB_CORE_THREADEDCOMMUNICATION_H +#define LLDB_CORE_THREADEDCOMMUNICATION_H + +#include "lldb/Core/Communication.h" +#include "lldb/Host/HostThread.h" +#include "lldb/Utility/Broadcaster.h" + +#include +#include +#include + +#include +#include + +namespace lldb_private { + +/// \class ThreadedCommunication ThreadedCommunication.h +/// "lldb/Core/ThreadedCommunication.h" Variation of Communication that +/// supports threaded reads. +/// +/// ThreadedCommunication enhances the base Communication class with support +/// for multi-threaded mode. In this mode, a read thread is spawned that +/// continually reads data and caches any received bytes. To start the read +/// thread clients call: +/// +/// bool ThreadedCommunication::StartReadThread (Status *); +/// +/// If true is returned a read thread has been spawned that will continually +/// execute a call to the pure virtual DoRead function: +/// +/// size_t Communication::ReadFromConnection (void *, size_t, uint32_t); +/// +/// When bytes are received the data gets cached in \a m_bytes and this class +/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that +/// want packet based communication should override AppendBytesToCache. The +/// subclasses can choose to call the built in AppendBytesToCache with the \a +/// broadcast parameter set to false. This will cause the \b +/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the +/// subclass can post a \b eBroadcastBitPacketAvailable event when a full +/// packet of data has been received. +/// +/// If the connection is disconnected a \b eBroadcastBitDisconnected event +/// gets broadcast. If the read thread exits a \b +/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also +/// post a \b eBroadcastBitReadThreadShouldExit event to this object which +/// will cause the read thread to exit. +/// +/// ThreadedCommunication inherits from Broadcaster which means it can be used +/// in conjunction with Listener to wait for multiple broadcaster objects and +/// multiple events from each of those objects. ThreadedCommunication defines a +/// set of pre-defined event bits (see enumerations definitions that start with +/// "eBroadcastBit" below). +class ThreadedCommunication : public Communication, public Broadcaster { + using Communication::Communication; + +public: + FLAGS_ANONYMOUS_ENUM(){ + eBroadcastBitDisconnected = + (1u << 0), ///< Sent when the communications connection is lost. + eBroadcastBitReadThreadGotBytes = + (1u << 1), ///< Sent by the read thread when bytes become available. + eBroadcastBitReadThreadDidExit = + (1u + << 2), ///< Sent by the read thread when it exits to inform clients. + eBroadcastBitReadThreadShouldExit = + (1u << 3), ///< Sent by clients that need to cancel the read thread. + eBroadcastBitPacketAvailable = + (1u << 4), ///< Sent when data received makes a complete packet. + eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread + /// to indicate all pending + /// input has been processed. + }; + + typedef void (*ReadThreadBytesReceived)(void *baton, const void *src, + size_t src_len); + + /// Construct the ThreadedCommunication object with the specified name for the + /// Broadcaster that this object inherits from. + /// + /// \param[in] broadcaster_name + /// The name of the broadcaster object. This name should be as + /// complete as possible to uniquely identify this object. The + /// broadcaster name can be updated after the connect function + /// is called. + ThreadedCommunication(const char *broadcaster_name); + + /// Destructor. + /// + /// The destructor is virtual since this class gets subclassed. + ~ThreadedCommunication() override; + + void Clear() override; + + /// Disconnect the communications connection if one is currently connected. + /// + /// \return + /// \b True if the disconnect succeeded, \b false otherwise. The + /// internal error object should be filled in with an + /// appropriate value based on the result of this function. + /// + /// \see Status& Communication::GetError (); + /// \see bool Connection::Disconnect (); + lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr) override; + + /// Read bytes from the current connection. + /// + /// If no read thread is running, this function call the connection's + /// Connection::Read(...) function to get any available. + /// + /// If a read thread has been started, this function will check for any + /// cached bytes that have already been read and return any currently + /// available bytes. If no bytes are cached, it will wait for the bytes to + /// become available by listening for the \a eBroadcastBitReadThreadGotBytes + /// event. If this function consumes all of the bytes in the cache, it will + /// reset the \a eBroadcastBitReadThreadGotBytes event bit. + /// + /// \param[in] dst + /// A destination buffer that must be at least \a dst_len bytes + /// long. + /// + /// \param[in] dst_len + /// The number of bytes to attempt to read, and also the max + /// number of bytes that can be placed into \a dst. + /// + /// \param[in] timeout + /// A timeout value or llvm::None for no timeout. + /// + /// \return + /// The number of bytes actually read. + /// + /// \see size_t Connection::Read (void *, size_t); + size_t Read(void *dst, size_t dst_len, const Timeout &timeout, + lldb::ConnectionStatus &status, Status *error_ptr) override; + + /// Sets the connection that it to be used by this class. + /// + /// By making a communication class that uses different connections it + /// allows a single communication interface to negotiate and change its + /// connection without any interruption to the client. It also allows the + /// Communication class to be subclassed for packet based communication. + /// + /// \param[in] connection + /// A connection that this class will own and destroy. + /// + /// \see + /// class Connection + void SetConnection(std::unique_ptr connection) override; + + /// Starts a read thread whose sole purpose it to read bytes from the + /// current connection. This function will call connection's read function: + /// + /// size_t Connection::Read (void *, size_t); + /// + /// When bytes are read and cached, this function will call: + /// + /// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len, + /// bool + /// broadcast); + /// + /// Subclasses should override this function if they wish to override the + /// default action of caching the bytes and broadcasting a \b + /// eBroadcastBitReadThreadGotBytes event. + /// + /// \return + /// \b True if the read thread was successfully started, \b + /// false otherwise. + /// + /// \see size_t Connection::Read (void *, size_t); + /// \see void Communication::AppendBytesToCache (const uint8_t * bytes, + /// size_t len, bool broadcast); + virtual bool StartReadThread(Status *error_ptr = nullptr); + + /// Stops the read thread by cancelling it. + /// + /// \return + /// \b True if the read thread was successfully canceled, \b + /// false otherwise. + virtual bool StopReadThread(Status *error_ptr = nullptr); + + virtual bool JoinReadThread(Status *error_ptr = nullptr); + /// Checks if there is a currently running read thread. + /// + /// \return + /// \b True if the read thread is running, \b false otherwise. + bool ReadThreadIsRunning(); + + /// The read thread function. This function will call the "DoRead" + /// function continuously and wait for data to become available. When data + /// is received it will append the available data to the internal cache and + /// broadcast a \b eBroadcastBitReadThreadGotBytes event. + /// + /// \param[in] comm_ptr + /// A pointer to an instance of this class. + /// + /// \return + /// \b NULL. + /// + /// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t); + lldb::thread_result_t ReadThread(); + + void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback, + void *callback_baton); + + /// Wait for the read thread to process all outstanding data. + /// + /// After this function returns, the read thread has processed all data that + /// has been waiting in the Connection queue. + /// + void SynchronizeWithReadThread(); + + static ConstString &GetStaticBroadcasterClass(); + + ConstString &GetBroadcasterClass() const override { + return GetStaticBroadcasterClass(); + } + +protected: + HostThread m_read_thread; ///< The read thread handle in case we need to + /// cancel the thread. + std::atomic m_read_thread_enabled; + std::atomic m_read_thread_did_exit; + std::string + m_bytes; ///< A buffer to cache bytes read in the ReadThread function. + std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded + /// access to the cached bytes. + lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough + /// from read thread. + Status m_pass_error; ///< Error passthrough from read thread. + std::mutex m_synchronize_mutex; + ReadThreadBytesReceived m_callback; + void *m_callback_baton; + + /// Append new bytes that get read from the read thread into the internal + /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes + /// event to be broadcast if \a broadcast is true. + /// + /// Subclasses can override this function in order to inspect the received + /// data and check if a packet is available. + /// + /// Subclasses can also still call this function from the overridden method + /// to allow the caching to correctly happen and suppress the broadcasting + /// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast + /// to false. + /// + /// \param[in] src + /// A source buffer that must be at least \a src_len bytes + /// long. + /// + /// \param[in] src_len + /// The number of bytes to append to the cache. + virtual void AppendBytesToCache(const uint8_t *src, size_t src_len, + bool broadcast, + lldb::ConnectionStatus status); + + /// Get any available bytes from our data cache. If this call empties the + /// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset + /// to signify no more bytes are available. + /// + /// \param[in] dst + /// A destination buffer that must be at least \a dst_len bytes + /// long. + /// + /// \param[in] dst_len + /// The number of bytes to attempt to read from the cache, + /// and also the max number of bytes that can be placed into + /// \a dst. + /// + /// \return + /// The number of bytes extracted from the data cache. + size_t GetCachedBytes(void *dst, size_t dst_len); + +private: + ThreadedCommunication(const ThreadedCommunication &) = delete; + const ThreadedCommunication & + operator=(const ThreadedCommunication &) = delete; +}; + +} // namespace lldb_private + +#endif // LLDB_CORE_THREADEDCOMMUNICATION_H Index: lldb/include/lldb/Interpreter/ScriptInterpreter.h =================================================================== --- lldb/include/lldb/Interpreter/ScriptInterpreter.h +++ lldb/include/lldb/Interpreter/ScriptInterpreter.h @@ -13,10 +13,10 @@ #include "lldb/API/SBError.h" #include "lldb/API/SBMemoryRegionInfo.h" #include "lldb/Breakpoint/BreakpointOptions.h" -#include "lldb/Core/Communication.h" #include "lldb/Core/PluginInterface.h" #include "lldb/Core/SearchFilter.h" #include "lldb/Core/StreamFile.h" +#include "lldb/Core/ThreadedCommunication.h" #include "lldb/Host/PseudoTerminal.h" #include "lldb/Interpreter/ScriptedProcessInterface.h" #include "lldb/Utility/Broadcaster.h" @@ -119,7 +119,7 @@ lldb::FileSP m_input_file_sp; lldb::StreamFileSP m_output_file_sp; lldb::StreamFileSP m_error_file_sp; - Communication m_communication; + ThreadedCommunication m_communication; bool m_disconnect; }; Index: lldb/include/lldb/Target/Process.h =================================================================== --- lldb/include/lldb/Target/Process.h +++ lldb/include/lldb/Target/Process.h @@ -22,10 +22,10 @@ #include #include "lldb/Breakpoint/BreakpointSiteList.h" -#include "lldb/Core/Communication.h" #include "lldb/Core/LoadedModuleInfoList.h" #include "lldb/Core/PluginInterface.h" #include "lldb/Core/ThreadSafeValue.h" +#include "lldb/Core/ThreadedCommunication.h" #include "lldb/Core/UserSettingsController.h" #include "lldb/Host/HostThread.h" #include "lldb/Host/ProcessLaunchInfo.h" @@ -2883,7 +2883,7 @@ m_unix_signals_sp; /// This is the current signal set for this process. lldb::ABISP m_abi_sp; lldb::IOHandlerSP m_process_input_reader; - Communication m_stdio_communication; + ThreadedCommunication m_stdio_communication; std::recursive_mutex m_stdio_communication_mutex; bool m_stdin_forward; /// Remember if stdin must be forwarded to remote debug /// server Index: lldb/include/lldb/lldb-forward.h =================================================================== --- lldb/include/lldb/lldb-forward.h +++ lldb/include/lldb/lldb-forward.h @@ -236,6 +236,7 @@ class ThreadPlanTracer; class ThreadSpec; class ThreadPostMortemTrace; +class ThreadedCommunication; class Trace; class TraceCursor; class TraceExporter; Index: lldb/source/API/SBCommunication.cpp =================================================================== --- lldb/source/API/SBCommunication.cpp +++ lldb/source/API/SBCommunication.cpp @@ -8,7 +8,7 @@ #include "lldb/API/SBCommunication.h" #include "lldb/API/SBBroadcaster.h" -#include "lldb/Core/Communication.h" +#include "lldb/Core/ThreadedCommunication.h" #include "lldb/Host/ConnectionFileDescriptor.h" #include "lldb/Host/Host.h" #include "lldb/Utility/Instrumentation.h" @@ -19,7 +19,8 @@ SBCommunication::SBCommunication() { LLDB_INSTRUMENT_VA(this); } SBCommunication::SBCommunication(const char *broadcaster_name) - : m_opaque(new Communication(broadcaster_name)), m_opaque_owned(true) { + : m_opaque(new ThreadedCommunication(broadcaster_name)), + m_opaque_owned(true) { LLDB_INSTRUMENT_VA(this, broadcaster_name); } @@ -169,5 +170,5 @@ const char *SBCommunication::GetBroadcasterClass() { LLDB_INSTRUMENT(); - return Communication::GetStaticBroadcasterClass().AsCString(); + return ThreadedCommunication::GetStaticBroadcasterClass().AsCString(); } Index: lldb/source/Core/CMakeLists.txt =================================================================== --- lldb/source/Core/CMakeLists.txt +++ lldb/source/Core/CMakeLists.txt @@ -54,6 +54,7 @@ SourceManager.cpp StreamAsynchronousIO.cpp StreamFile.cpp + ThreadedCommunication.cpp UserSettingsController.cpp Value.cpp ValueObject.cpp Index: lldb/source/Core/Communication.cpp =================================================================== --- lldb/source/Core/Communication.cpp +++ lldb/source/Core/Communication.cpp @@ -8,22 +8,14 @@ #include "lldb/Core/Communication.h" -#include "lldb/Host/HostThread.h" -#include "lldb/Host/ThreadLauncher.h" #include "lldb/Utility/Connection.h" -#include "lldb/Utility/ConstString.h" -#include "lldb/Utility/Event.h" #include "lldb/Utility/LLDBLog.h" -#include "lldb/Utility/Listener.h" #include "lldb/Utility/Log.h" #include "lldb/Utility/Status.h" -#include "llvm/ADT/None.h" -#include "llvm/ADT/Optional.h" #include "llvm/Support/Compiler.h" #include -#include #include #include @@ -34,42 +26,15 @@ using namespace lldb; using namespace lldb_private; -ConstString &Communication::GetStaticBroadcasterClass() { - static ConstString class_name("lldb.communication"); - return class_name; -} - -Communication::Communication(const char *name) - : Broadcaster(nullptr, name), m_connection_sp(), - m_read_thread_enabled(false), m_read_thread_did_exit(false), m_bytes(), - m_bytes_mutex(), m_write_mutex(), m_synchronize_mutex(), - m_callback(nullptr), m_callback_baton(nullptr), m_close_on_eof(true) - -{ - - LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), - "{0} Communication::Communication (name = {1})", this, name); - - SetEventName(eBroadcastBitDisconnected, "disconnected"); - SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); - SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); - SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); - SetEventName(eBroadcastBitPacketAvailable, "packet available"); - SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); - - CheckInWithManager(); +Communication::Communication() + : m_connection_sp(), m_write_mutex(), m_close_on_eof(true) { } Communication::~Communication() { - LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), - "{0} Communication::~Communication (name = {1})", this, - GetBroadcasterName().AsCString()); Clear(); } void Communication::Clear() { - SetReadThreadBytesReceivedCallback(nullptr, nullptr); - StopReadThread(nullptr); Disconnect(nullptr); } @@ -91,8 +56,6 @@ LLDB_LOG(GetLog(LLDBLog::Communication), "{0} Communication::Disconnect ()", this); - assert((!m_read_thread_enabled || m_read_thread_did_exit) && - "Disconnecting while the read thread is running is racy!"); lldb::ConnectionSP connection_sp(m_connection_sp); if (connection_sp) { ConnectionStatus status = connection_sp->Disconnect(error_ptr); @@ -129,58 +92,6 @@ "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}", this, dst, dst_len, timeout, m_connection_sp.get()); - if (m_read_thread_enabled) { - // We have a dedicated read thread that is getting data for us - size_t cached_bytes = GetCachedBytes(dst, dst_len); - if (cached_bytes > 0) { - status = eConnectionStatusSuccess; - return cached_bytes; - } - if (timeout && timeout->count() == 0) { - if (error_ptr) - error_ptr->SetErrorString("Timed out."); - status = eConnectionStatusTimedOut; - return 0; - } - - if (!m_connection_sp) { - if (error_ptr) - error_ptr->SetErrorString("Invalid connection."); - status = eConnectionStatusNoConnection; - return 0; - } - - ListenerSP listener_sp(Listener::MakeListener("Communication::Read")); - listener_sp->StartListeningForEvents( - this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); - EventSP event_sp; - while (listener_sp->GetEvent(event_sp, timeout)) { - const uint32_t event_type = event_sp->GetType(); - if (event_type & eBroadcastBitReadThreadGotBytes) { - return GetCachedBytes(dst, dst_len); - } - - if (event_type & eBroadcastBitReadThreadDidExit) { - // If the thread exited of its own accord, it either means it - // hit an end-of-file condition or an error. - status = m_pass_status; - if (error_ptr) - *error_ptr = std::move(m_pass_error); - - if (GetCloseOnEOF()) - Disconnect(nullptr); - return 0; - } - } - - if (error_ptr) - error_ptr->SetErrorString("Timed out."); - status = eConnectionStatusTimedOut; - return 0; - } - - // We aren't using a read thread, just read the data synchronously in this - // thread. return ReadFromConnection(dst, dst_len, timeout, status, error_ptr); } @@ -213,104 +124,6 @@ return total_written; } -bool Communication::StartReadThread(Status *error_ptr) { - if (error_ptr) - error_ptr->Clear(); - - if (m_read_thread.IsJoinable()) - return true; - - LLDB_LOG(GetLog(LLDBLog::Communication), - "{0} Communication::StartReadThread ()", this); - - const std::string thread_name = - llvm::formatv("", GetBroadcasterName()); - - m_read_thread_enabled = true; - m_read_thread_did_exit = false; - auto maybe_thread = ThreadLauncher::LaunchThread( - thread_name, [this] { return ReadThread(); }); - if (maybe_thread) { - m_read_thread = *maybe_thread; - } else { - if (error_ptr) - *error_ptr = Status(maybe_thread.takeError()); - else { - LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}", - llvm::toString(maybe_thread.takeError())); - } - } - - if (!m_read_thread.IsJoinable()) - m_read_thread_enabled = false; - - return m_read_thread_enabled; -} - -bool Communication::StopReadThread(Status *error_ptr) { - if (!m_read_thread.IsJoinable()) - return true; - - LLDB_LOG(GetLog(LLDBLog::Communication), - "{0} Communication::StopReadThread ()", this); - - m_read_thread_enabled = false; - - BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); - - // error = m_read_thread.Cancel(); - - Status error = m_read_thread.Join(nullptr); - return error.Success(); -} - -bool Communication::JoinReadThread(Status *error_ptr) { - if (!m_read_thread.IsJoinable()) - return true; - - Status error = m_read_thread.Join(nullptr); - return error.Success(); -} - -size_t Communication::GetCachedBytes(void *dst, size_t dst_len) { - std::lock_guard guard(m_bytes_mutex); - if (!m_bytes.empty()) { - // If DST is nullptr and we have a thread, then return the number of bytes - // that are available so the caller can call again - if (dst == nullptr) - return m_bytes.size(); - - const size_t len = std::min(dst_len, m_bytes.size()); - - ::memcpy(dst, m_bytes.c_str(), len); - m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); - - return len; - } - return 0; -} - -void Communication::AppendBytesToCache(const uint8_t *bytes, size_t len, - bool broadcast, - ConnectionStatus status) { - LLDB_LOG(GetLog(LLDBLog::Communication), - "{0} Communication::AppendBytesToCache (src = {1}, src_len = {2}, " - "broadcast = {3})", - this, bytes, (uint64_t)len, broadcast); - if ((bytes == nullptr || len == 0) && - (status != lldb::eConnectionStatusEndOfFile)) - return; - if (m_callback) { - // If the user registered a callback, then call it and do not broadcast - m_callback(m_callback_baton, bytes, len); - } else if (bytes != nullptr && len > 0) { - std::lock_guard guard(m_bytes_mutex); - m_bytes.append((const char *)bytes, len); - if (broadcast) - BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes); - } -} - size_t Communication::ReadFromConnection(void *dst, size_t dst_len, const Timeout &timeout, ConnectionStatus &status, @@ -325,115 +138,8 @@ return 0; } -bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; } - -lldb::thread_result_t Communication::ReadThread() { - Log *log = GetLog(LLDBLog::Communication); - - LLDB_LOG(log, "Communication({0}) thread starting...", this); - - uint8_t buf[1024]; - - Status error; - ConnectionStatus status = eConnectionStatusSuccess; - bool done = false; - bool disconnect = false; - while (!done && m_read_thread_enabled) { - size_t bytes_read = ReadFromConnection( - buf, sizeof(buf), std::chrono::seconds(5), status, &error); - if (bytes_read > 0 || status == eConnectionStatusEndOfFile) - AppendBytesToCache(buf, bytes_read, true, status); - - switch (status) { - case eConnectionStatusSuccess: - break; - - case eConnectionStatusEndOfFile: - done = true; - disconnect = GetCloseOnEOF(); - break; - case eConnectionStatusError: // Check GetError() for details - if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { - // EIO on a pipe is usually caused by remote shutdown - disconnect = GetCloseOnEOF(); - done = true; - } - if (error.Fail()) - LLDB_LOG(log, "error: {0}, status = {1}", error, - Communication::ConnectionStatusAsString(status)); - break; - case eConnectionStatusInterrupted: // Synchronization signal from - // SynchronizeWithReadThread() - // The connection returns eConnectionStatusInterrupted only when there is - // no input pending to be read, so we can signal that. - BroadcastEvent(eBroadcastBitNoMorePendingInput); - break; - case eConnectionStatusNoConnection: // No connection - case eConnectionStatusLostConnection: // Lost connection while connected to - // a valid connection - done = true; - [[fallthrough]]; - case eConnectionStatusTimedOut: // Request timed out - if (error.Fail()) - LLDB_LOG(log, "error: {0}, status = {1}", error, - Communication::ConnectionStatusAsString(status)); - break; - } - } - m_pass_status = status; - m_pass_error = std::move(error); - LLDB_LOG(log, "Communication({0}) thread exiting...", this); - - // Handle threads wishing to synchronize with us. - { - // Prevent new ones from showing up. - m_read_thread_did_exit = true; - - // Unblock any existing thread waiting for the synchronization signal. - BroadcastEvent(eBroadcastBitNoMorePendingInput); - - // Wait for the thread to finish... - std::lock_guard guard(m_synchronize_mutex); - // ... and disconnect. - if (disconnect) - Disconnect(); - } - - // Let clients know that this thread is exiting - BroadcastEvent(eBroadcastBitReadThreadDidExit); - return {}; -} - -void Communication::SetReadThreadBytesReceivedCallback( - ReadThreadBytesReceived callback, void *callback_baton) { - m_callback = callback; - m_callback_baton = callback_baton; -} - -void Communication::SynchronizeWithReadThread() { - // Only one thread can do the synchronization dance at a time. - std::lock_guard guard(m_synchronize_mutex); - - // First start listening for the synchronization event. - ListenerSP listener_sp( - Listener::MakeListener("Communication::SyncronizeWithReadThread")); - listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); - - // If the thread is not running, there is no point in synchronizing. - if (!m_read_thread_enabled || m_read_thread_did_exit) - return; - - // Notify the read thread. - m_connection_sp->InterruptRead(); - - // Wait for the synchronization event. - EventSP event_sp; - listener_sp->GetEvent(event_sp, llvm::None); -} - void Communication::SetConnection(std::unique_ptr connection) { Disconnect(nullptr); - StopReadThread(nullptr); m_connection_sp = std::move(connection); } Index: lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h =================================================================== --- lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h +++ lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h @@ -21,8 +21,6 @@ class CommunicationKDP : public lldb_private::Communication { public: - enum { eBroadcastBitRunPacketSent = kLoUserBroadcastBit }; - const static uint32_t kMaxPacketSize = 1200; const static uint32_t kMaxDataSize = 1024; typedef lldb_private::StreamBuffer<4096> PacketStreamType; Index: lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp =================================================================== --- lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp +++ lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp @@ -29,7 +29,7 @@ // CommunicationKDP constructor CommunicationKDP::CommunicationKDP(const char *comm_name) - : Communication(comm_name), m_addr_byte_size(4), + : Communication(), m_addr_byte_size(4), m_byte_order(eByteOrderLittle), m_packet_timeout(5), m_sequence_mutex(), m_is_running(false), m_session_key(0u), m_request_sequence_id(0u), m_exception_sequence_id(0u), m_kdp_version_version(0u), Index: lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h =================================================================== --- lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h +++ lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h @@ -80,10 +80,10 @@ class ProcessGDBRemote; -class GDBRemoteCommunication : public Communication { +class GDBRemoteCommunication : public Communication, public Broadcaster { public: enum { - eBroadcastBitRunPacketSent = kLoUserBroadcastBit, + eBroadcastBitRunPacketSent = (1u << 0), }; enum class PacketType { Invalid = 0, Standard, Notify }; @@ -180,6 +180,8 @@ // false if this class represents a debug session for // a single process + std::string m_bytes; + std::recursive_mutex m_bytes_mutex; CompressionType m_compression_type; PacketResult SendPacketNoLock(llvm::StringRef payload); Index: lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp =================================================================== --- lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp +++ lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp @@ -60,7 +60,7 @@ // GDBRemoteCommunication constructor GDBRemoteCommunication::GDBRemoteCommunication(const char *comm_name, const char *listener_name) - : Communication(comm_name), + : Communication(), Broadcaster(nullptr, comm_name), #ifdef LLDB_CONFIGURATION_DEBUG m_packet_timeout(1000), #else Index: lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp =================================================================== --- lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp +++ lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp @@ -74,7 +74,7 @@ "gdb-remote.server.rx_packet"), m_mainloop(mainloop), m_process_factory(process_factory), m_current_process(nullptr), m_continue_process(nullptr), - m_stdio_communication("process.stdio") { + m_stdio_communication() { RegisterPacketHandlers(); } Index: lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp =================================================================== --- lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp +++ lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp @@ -285,14 +285,6 @@ __FUNCTION__); } - const uint32_t gdb_event_mask = Communication::eBroadcastBitReadThreadDidExit; - if (m_async_listener_sp->StartListeningForEvents( - &m_gdb_comm, gdb_event_mask) != gdb_event_mask) { - LLDB_LOGF(log, - "ProcessGDBRemote::%s failed to listen for m_gdb_comm events", - __FUNCTION__); - } - const uint64_t timeout_seconds = GetGlobalPluginProperties().GetPacketTimeout(); if (timeout_seconds > 0) @@ -3567,21 +3559,6 @@ done = true; break; - default: - LLDB_LOGF(log, - "ProcessGDBRemote::%s(pid = %" PRIu64 - ") got unknown event 0x%8.8x", - __FUNCTION__, GetID(), event_type); - done = true; - break; - } - } else if (event_sp->BroadcasterIs(&m_gdb_comm)) { - switch (event_type) { - case Communication::eBroadcastBitReadThreadDidExit: - SetExitStatus(-1, "lost connection"); - done = true; - break; - default: LLDB_LOGF(log, "ProcessGDBRemote::%s(pid = %" PRIu64 Index: lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp =================================================================== --- lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp +++ lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp @@ -25,9 +25,9 @@ #include "lldb/API/SBValue.h" #include "lldb/Breakpoint/StoppointCallbackContext.h" #include "lldb/Breakpoint/WatchpointOptions.h" -#include "lldb/Core/Communication.h" #include "lldb/Core/Debugger.h" #include "lldb/Core/PluginManager.h" +#include "lldb/Core/ThreadedCommunication.h" #include "lldb/Core/ValueObject.h" #include "lldb/DataFormatters/TypeSummary.h" #include "lldb/Host/FileSystem.h" Index: lldb/unittests/Core/CommunicationTest.cpp =================================================================== --- lldb/unittests/Core/CommunicationTest.cpp +++ lldb/unittests/Core/CommunicationTest.cpp @@ -7,6 +7,7 @@ //===----------------------------------------------------------------------===// #include "lldb/Core/Communication.h" +#include "lldb/Core/ThreadedCommunication.h" #include "lldb/Host/Config.h" #include "lldb/Host/ConnectionFileDescriptor.h" #include "lldb/Host/Pipe.h" @@ -37,7 +38,7 @@ ASSERT_THAT_ERROR(a->Write("test", num_bytes).ToError(), llvm::Succeeded()); ASSERT_EQ(num_bytes, 4U); - Communication comm("test"); + ThreadedCommunication comm("test"); comm.SetConnection(std::make_unique(b.release())); comm.SetCloseOnEOF(true); @@ -118,7 +119,7 @@ std::unique_ptr a, b; ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b)); - Communication comm("test"); + ThreadedCommunication comm("test"); comm.SetConnection(std::make_unique(b.release())); comm.SetCloseOnEOF(true); ASSERT_TRUE(comm.StartReadThread()); @@ -146,7 +147,7 @@ ConnectionFileDescriptor read_conn{pipe.ReleaseReadFileDescriptor(), /*owns_fd=*/true}; - Communication write_comm("test"); + Communication write_comm; write_comm.SetConnection( std::make_unique(write_fd, /*owns_fd=*/true));