Run the Communication read thread on a MainLoop instance (m_io_loop).
The per-read logic becomes a callback registered on the connection's read
object, and the thread is stopped by queuing a RequestTermination() callback
on that loop. Adds a unit test checking StopReadThread() returns in time.

Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -10,6 +10,7 @@
 #define LLDB_CORE_COMMUNICATION_H
 
 #include "lldb/Host/HostThread.h"
+#include "lldb/Host/MainLoop.h"
 #include "lldb/Utility/Broadcaster.h"
 #include "lldb/Utility/Timeout.h"
 #include "lldb/lldb-defines.h"
@@ -318,6 +319,7 @@
                                       ///by this communications class.
   HostThread m_read_thread; ///< The read thread handle in case we need to
                             ///cancel the thread.
+  MainLoop m_io_loop; ///< Loop instance used by the read thread.
   std::atomic<bool> m_read_thread_enabled;
   std::atomic<bool> m_read_thread_did_exit;
   std::string
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -262,7 +262,11 @@
 
   BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
 
-  // error = m_read_thread.Cancel();
+  // Ask the read thread's loop to terminate: queue a callback that requests
+  // termination, trigger the pending callbacks, then join the thread below.
+  m_io_loop.AddPendingCallback(
+      [](MainLoopBase &loop) { loop.RequestTermination(); });
+  m_io_loop.TriggerPendingCallbacks();
 
   Status error = m_read_thread.Join(nullptr);
   return error.Success();
@@ -336,54 +340,76 @@
 
   LLDB_LOG(log, "Communication({0}) thread starting...", this);
 
-  uint8_t buf[1024];
-
-  Status error;
-  ConnectionStatus status = eConnectionStatusSuccess;
-  bool done = false;
   bool disconnect = false;
-  while (!done && m_read_thread_enabled) {
-    size_t bytes_read = ReadFromConnection(
-        buf, sizeof(buf), std::chrono::seconds(5), status, &error);
-    if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
-      AppendBytesToCache(buf, bytes_read, true, status);
-
-    switch (status) {
-    case eConnectionStatusSuccess:
-      break;
-
-    case eConnectionStatusEndOfFile:
-      done = true;
-      disconnect = GetCloseOnEOF();
-      break;
-    case eConnectionStatusError: // Check GetError() for details
-      if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
-        // EIO on a pipe is usually caused by remote shutdown
-        disconnect = GetCloseOnEOF();
-        done = true;
-      }
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
-    case eConnectionStatusInterrupted: // Synchronization signal from
-                                       // SynchronizeWithReadThread()
-      // The connection returns eConnectionStatusInterrupted only when there is
-      // no input pending to be read, so we can signal that.
-      BroadcastEvent(eBroadcastBitNoMorePendingInput);
-      break;
-    case eConnectionStatusNoConnection:   // No connection
-    case eConnectionStatusLostConnection: // Lost connection while connected to
-                                          // a valid connection
-      done = true;
-      [[fallthrough]];
-    case eConnectionStatusTimedOut: // Request timed out
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
-    }
+  if (m_connection_sp) {
+    Status error;
+    // Register the read logic as the loop callback for the connection's read
+    // object. NOTE(review): `handle` presumably must outlive Run() to keep
+    // the registration alive -- confirm against MainLoopBase.
+    auto handle = m_io_loop.RegisterReadObject(
+        m_connection_sp->GetReadObject(),
+        [this, &disconnect](MainLoopBase &loop) {
+          Log *log = GetLog(LLDBLog::Communication);
+
+          if (!m_read_thread_enabled) {
+            loop.RequestTermination();
+            return;
+          }
+
+          uint8_t buf[1024];
+          ConnectionStatus status = eConnectionStatusSuccess;
+          Status error;
+          size_t bytes_read = ReadFromConnection(
+              buf, sizeof(buf), std::chrono::seconds(5), status, &error);
+          if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+            AppendBytesToCache(buf, bytes_read, true, status);
+
+          switch (status) {
+          case eConnectionStatusSuccess:
+            break;
+
+          case eConnectionStatusEndOfFile:
+            loop.RequestTermination();
+            disconnect = GetCloseOnEOF();
+            break;
+          case eConnectionStatusError: // Check GetError() for details
+            if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+              // EIO on a pipe is usually caused by remote shutdown
+              loop.RequestTermination();
+              disconnect = GetCloseOnEOF();
+            }
+            if (error.Fail())
+              LLDB_LOG(log, "error: {0}, status = {1}", error,
+                       Communication::ConnectionStatusAsString(status));
+            break;
+          case eConnectionStatusInterrupted: // Synchronization signal from
+                                             // SynchronizeWithReadThread()
+            // The connection returns eConnectionStatusInterrupted only when
+            // there is no input pending to be read, so we can signal that.
+            BroadcastEvent(eBroadcastBitNoMorePendingInput);
+            break;
+          case eConnectionStatusNoConnection:   // No connection
+          case eConnectionStatusLostConnection: // Lost connection while
+                                                // connected to a valid
+                                                // connection
+            loop.RequestTermination();
+            [[fallthrough]];
+          case eConnectionStatusTimedOut: // Request timed out
+            if (error.Fail())
+              LLDB_LOG(log, "error: {0}, status = {1}", error,
+                       Communication::ConnectionStatusAsString(status));
+            break;
+          }
+        },
+        error);
+    // Registration and Run() can fail at run time; an assert() would vanish
+    // in NDEBUG builds, so check the Status explicitly and log any failure.
+    if (error.Success())
+      error = m_io_loop.Run();
+    if (error.Fail())
+      LLDB_LOG(log, "error: {0}", error);
   }
+
   log = GetLog(LLDBLog::Communication);
   LLDB_LOG(log, "Communication({0}) thread exiting...", this);
 
Index: lldb/unittests/Core/CommunicationTest.cpp
===================================================================
--- lldb/unittests/Core/CommunicationTest.cpp
+++ lldb/unittests/Core/CommunicationTest.cpp
@@ -13,6 +13,9 @@
 #include "llvm/Testing/Support/Error.h"
 #include "gtest/gtest.h"
 
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
 #include <thread>
 
 #if LLDB_ENABLE_POSIX
@@ -88,6 +91,40 @@
   CommunicationReadTest(/*use_thread=*/true);
 }
 
+TEST(CommunicationTest, StopReadThread) {
+  std::condition_variable finished;
+  std::mutex finished_mutex;
+  // Guarded by finished_mutex. Waiting on this flag rather than on the bare
+  // notification avoids both a lost wakeup (worker finishing before we start
+  // waiting) and spurious wakeups.
+  bool is_finished = false;
+
+  std::thread test_thread{[&finished, &finished_mutex, &is_finished]() {
+    Pipe pipe;
+    ASSERT_THAT_ERROR(pipe.CreateNew(/*child_process_inherit=*/false).ToError(),
+                      llvm::Succeeded());
+
+    Communication comm("test");
+    comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(
+        pipe.ReleaseReadFileDescriptor(), /*owns_fd=*/true));
+    comm.SetCloseOnEOF(true);
+
+    ASSERT_TRUE(comm.StartReadThread());
+    ASSERT_TRUE(comm.StopReadThread());
+    {
+      std::lock_guard<std::mutex> guard{finished_mutex};
+      is_finished = true;
+    }
+    finished.notify_all();
+  }};
+
+  // StopReadThread() can hang, so force an external timeout.
+  std::unique_lock<std::mutex> lock{finished_mutex};
+  ASSERT_TRUE(finished.wait_for(lock, std::chrono::seconds(3),
+                                [&is_finished] { return is_finished; }));
+  test_thread.join();
+}
+
 TEST(CommunicationTest, SynchronizeWhileClosing) {
   // Set up a communication object reading from a pipe.
   Pipe pipe;