Index: lldb/include/lldb/Core/Communication.h
===================================================================
--- lldb/include/lldb/Core/Communication.h
+++ lldb/include/lldb/Core/Communication.h
@@ -10,6 +10,7 @@
 #define LLDB_CORE_COMMUNICATION_H
 
 #include "lldb/Host/HostThread.h"
+#include "lldb/Host/MainLoop.h"
 #include "lldb/Utility/Broadcaster.h"
 #include "lldb/Utility/Timeout.h"
 #include "lldb/lldb-defines.h"
@@ -318,6 +319,8 @@
                                       ///by this communications class.
   HostThread m_read_thread; ///< The read thread handle in case we need to
                             ///cancel the thread.
+  std::unique_ptr<MainLoop> m_io_loop; ///< The loop instance used by
+                                       ///the read thread.
   std::atomic<bool> m_read_thread_enabled;
   std::atomic<bool> m_read_thread_did_exit;
   std::string
Index: lldb/source/Core/Communication.cpp
===================================================================
--- lldb/source/Core/Communication.cpp
+++ lldb/source/Core/Communication.cpp
@@ -228,6 +228,8 @@
 
   m_read_thread_enabled = true;
   m_read_thread_did_exit = false;
+  // Allocate the I/O loop in main thread to avoid races.
+  m_io_loop = std::make_unique<MainLoop>();
   auto maybe_thread = ThreadLauncher::LaunchThread(
       thread_name, [this] { return ReadThread(); });
   if (maybe_thread) {
@@ -258,9 +260,13 @@
 
   BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
 
-  // error = m_read_thread.Cancel();
+  // Ask the read thread's I/O loop to terminate so that Join() can return.
+  m_io_loop->AddPendingCallback(
+      [](MainLoopBase &loop) { loop.RequestTermination(); });
 
   Status error = m_read_thread.Join(nullptr);
+  if (error.Success())
+    m_io_loop.reset();
   return error.Success();
 }
 
@@ -332,56 +338,77 @@
 
   LLDB_LOG(log, "Communication({0}) thread starting...", this);
 
-  uint8_t buf[1024];
-
-  Status error;
-  ConnectionStatus status = eConnectionStatusSuccess;
-  bool done = false;
   bool disconnect = false;
-  while (!done && m_read_thread_enabled) {
-    size_t bytes_read = ReadFromConnection(
-        buf, sizeof(buf), std::chrono::seconds(5), status, &error);
-    if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
-      AppendBytesToCache(buf, bytes_read, true, status);
-
-    switch (status) {
-    case eConnectionStatusSuccess:
-      break;
-
-    case eConnectionStatusEndOfFile:
-      done = true;
-      disconnect = GetCloseOnEOF();
-      break;
-    case eConnectionStatusError: // Check GetError() for details
-      if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
-        // EIO on a pipe is usually caused by remote shutdown
-        disconnect = GetCloseOnEOF();
-        done = true;
-      }
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
-    case eConnectionStatusInterrupted: // Synchronization signal from
-                                       // SynchronizeWithReadThread()
-      // The connection returns eConnectionStatusInterrupted only when there is
-      // no input pending to be read, so we can signal that.
-      BroadcastEvent(eBroadcastBitNoMorePendingInput);
-      break;
-    case eConnectionStatusNoConnection:   // No connection
-    case eConnectionStatusLostConnection: // Lost connection while connected to
-                                          // a valid connection
-      done = true;
-      [[fallthrough]];
-    case eConnectionStatusTimedOut: // Request timed out
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
+  ConnectionStatus status = eConnectionStatusSuccess;
+  Status error;
+  if (IsConnected()) {
+    Status loop_error;
+    // The callback performs one read and decides whether the loop continues.
+    auto handle = m_io_loop->RegisterReadObject(
+        m_connection_sp->GetReadObject(),
+        [this, &disconnect, &status, &error](MainLoopBase &loop) {
+          Log *log = GetLog(LLDBLog::Communication);
+
+          if (!m_read_thread_enabled) {
+            loop.RequestTermination();
+            return;
+          }
+
+          uint8_t buf[1024];
+          size_t bytes_read = ReadFromConnection(
+              buf, sizeof(buf), std::chrono::seconds(5), status, &error);
+          if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+            AppendBytesToCache(buf, bytes_read, true, status);
+
+          switch (status) {
+          case eConnectionStatusSuccess:
+          case eConnectionStatusInterrupted:
+            break;
+
+          case eConnectionStatusEndOfFile:
+            loop.RequestTermination();
+            disconnect = GetCloseOnEOF();
+            break;
+          case eConnectionStatusError: // Check GetError() for details
+            if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+              // EIO on a pipe is usually caused by remote shutdown
+              loop.RequestTermination();
+              disconnect = GetCloseOnEOF();
+            }
+            if (error.Fail())
+              LLDB_LOG(log, "error: {0}, status = {1}", error,
+                       Communication::ConnectionStatusAsString(status));
+            break;
+          case eConnectionStatusNoConnection:   // No connection
+          case eConnectionStatusLostConnection: // Lost connection while
+                                                // connected to a valid
+                                                // connection
+            loop.RequestTermination();
+            [[fallthrough]];
+          case eConnectionStatusTimedOut: // Request timed out
+            if (error.Fail())
+              LLDB_LOG(log, "error: {0}, status = {1}", error,
+                       Communication::ConnectionStatusAsString(status));
+            break;
+          }
+        },
+        loop_error);
+    if (loop_error.Success())
+      loop_error = m_io_loop->Run();
+    if (!loop_error.Success()) {
+      error = std::move(loop_error);
+      status = lldb::eConnectionStatusError;
     }
+  } else {
+    if (m_connection_sp)
+      status = lldb::eConnectionStatusLostConnection;
+    else
+      status = lldb::eConnectionStatusNoConnection;
+    error.SetErrorString(ConnectionStatusAsString(status));
   }
   m_pass_status = status;
   m_pass_error = std::move(error);
+
   LLDB_LOG(log, "Communication({0}) thread exiting...", this);
 
   // Handle threads wishing to synchronize with us.
@@ -424,7 +451,9 @@
     return;
 
   // Notify the read thread.
-  m_connection_sp->InterruptRead();
+  m_io_loop->AddPendingCallback([this](MainLoopBase &loop) {
+    BroadcastEvent(eBroadcastBitNoMorePendingInput);
+  });
 
   // Wait for the synchronization event.
   EventSP event_sp;
Index: lldb/unittests/Core/CommunicationTest.cpp
===================================================================
--- lldb/unittests/Core/CommunicationTest.cpp
+++ lldb/unittests/Core/CommunicationTest.cpp
@@ -16,6 +16,8 @@
 #include "TestingSupport/SubsystemRAII.h"
 
 #include <chrono>
+#include <future>
+#include <memory>
 #include <thread>
 
 #if LLDB_ENABLE_POSIX
@@ -114,6 +116,44 @@
   CommunicationReadTest(/*use_thread=*/true);
 }
 
+// Verify that StopReadThread() returns promptly while the connection is
+// idle. The start/stop sequence runs on a helper thread so that a hang is
+// reported as a failure instead of blocking the whole test binary.
+TEST_F(CommunicationTest, StopReadThread) {
+  // Shared ownership keeps the promise alive even if the helper thread has to
+  // be abandoned after a timeout.
+  auto finished = std::make_shared<std::promise<void>>();
+  std::future<void> finished_future = finished->get_future();
+
+  std::thread test_thread{[finished]() {
+    // Run the assertions in a nested lambda: a fatal ASSERT_* only returns
+    // from the innermost function, so the promise below is always fulfilled.
+    [] {
+      std::unique_ptr<TCPSocket> a, b;
+      ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
+
+      Communication comm("test");
+      comm.SetConnection(
+          std::make_unique<ConnectionFileDescriptor>(b.release()));
+      comm.SetCloseOnEOF(true);
+
+      ASSERT_TRUE(comm.StartReadThread());
+      ASSERT_TRUE(comm.StopReadThread());
+    }();
+    finished->set_value();
+  }};
+
+  // StopReadThread() can hang, so force an external timeout.
+  if (finished_future.wait_for(std::chrono::seconds(3)) !=
+      std::future_status::ready) {
+    // A hung thread can never be joined; detach it so that the std::thread
+    // destructor does not call std::terminate().
+    test_thread.detach();
+    FAIL() << "StopReadThread() did not return within 3 seconds";
+  }
+  test_thread.join();
+}
+
 TEST_F(CommunicationTest, SynchronizeWhileClosing) {
   std::unique_ptr<TCPSocket> a, b;
   ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));