Index: lldb/test/API/functionalities/gdb_remote_client/TestPty.py
===================================================================
--- /dev/null
+++ lldb/test/API/functionalities/gdb_remote_client/TestPty.py
@@ -0,0 +1,37 @@
+import lldb
+from lldbsuite.test.lldbtest import *
+from lldbsuite.test.decorators import *
+from gdbclientutils import *
+
+
+@skipIfWindows
+class TestPty(GDBRemoteTestBase):
+    mydir = TestBase.compute_mydir(__file__)
+    server_socket_class = PtyServerSocket
+
+    def test_process_connect_sync(self):
+        """Test the process connect command in synchronous mode"""
+        try:
+            self.dbg.SetAsync(False)
+            self.expect("platform select remote-gdb-server",
+                        substrs=['Platform: remote-gdb-server', 'Connected: no'])
+            self.expect("process connect file://" +
+                        self.server.get_connect_address(),
+                        substrs=['Process', 'stopped'])
+        finally:
+            self.dbg.GetSelectedPlatform().DisconnectRemote()
+
+    def test_process_connect_async(self):
+        """Test the process connect command in asynchronous mode"""
+        try:
+            self.dbg.SetAsync(True)
+            self.expect("platform select remote-gdb-server",
+                        substrs=['Platform: remote-gdb-server', 'Connected: no'])
+            self.expect("process connect file://" +
+                        self.server.get_connect_address(),
+                        matching=False,
+                        substrs=['Process', 'stopped'])
+            lldbutil.expect_state_changes(self, self.dbg.GetListener(),
+                                          self.process(), [lldb.eStateStopped])
+        finally:
+            self.dbg.GetSelectedPlatform().DisconnectRemote()
Index: lldb/test/API/functionalities/gdb_remote_client/gdbclientutils.py
===================================================================
--- lldb/test/API/functionalities/gdb_remote_client/gdbclientutils.py
+++ lldb/test/API/functionalities/gdb_remote_client/gdbclientutils.py
@@ -1,8 +1,12 @@
+import ctypes
 import errno
+import io
 import os
 import os.path
+import pty
 import threading
 import socket
+import tty
 import lldb
 import binascii
 import traceback
@@ -332,6 +336,101 @@
         pass
 
 
+class ServerSocket:
+    """
+    A wrapper class for TCP or pty-based server.
+    """
+
+    def get_connect_address(self):
+        """Get address for the client to connect to."""
+
+    def close_server(self):
+        """Close all resources used by the server."""
+
+    def accept(self):
+        """Accept a single client connection to the server."""
+
+    def close_connection(self):
+        """Close all resources used by the accepted connection."""
+
+    def recv(self):
+        """Receive a data packet from the connected client."""
+
+    def sendall(self, data):
+        """Send the data to the connected client."""
+
+
+class TCPServerSocket(ServerSocket):
+    def __init__(self):
+        family, type, proto, _, addr = socket.getaddrinfo(
+                "localhost", 0, proto=socket.IPPROTO_TCP)[0]
+        self._server_socket = socket.socket(family, type, proto)
+        self._connection = None
+
+        self._server_socket.bind(addr)
+        self._server_socket.listen(1)
+
+    def get_connect_address(self):
+        return "[{}]:{}".format(*self._server_socket.getsockname())
+
+    def close_server(self):
+        self._server_socket.close()
+
+    def accept(self):
+        assert self._connection is None
+        # accept() is stubborn and won't fail even when the socket is
+        # shutdown, so we'll use a timeout
+        self._server_socket.settimeout(30.0)
+        client, client_addr = self._server_socket.accept()
+        # The connected client inherits its timeout from self._socket,
+        # but we'll use a blocking socket for the client
+        client.settimeout(None)
+        self._connection = client
+
+    def close_connection(self):
+        assert self._connection is not None
+        self._connection.close()
+        self._connection = None
+
+    def recv(self):
+        assert self._connection is not None
+        return self._connection.recv(4096)
+
+    def sendall(self, data):
+        assert self._connection is not None
+        return self._connection.sendall(data)
+
+
+class PtyServerSocket(ServerSocket):
+    def __init__(self):
+        master, slave = pty.openpty()
+        tty.setraw(master)
+        self._master = io.FileIO(master, 'r+b')
+        self._slave = io.FileIO(slave, 'r+b')
+
+    def get_connect_address(self):
+        libc = ctypes.CDLL(None)
+        libc.ptsname.argtypes = (ctypes.c_int,)
+        libc.ptsname.restype = ctypes.c_char_p
+        return libc.ptsname(self._master.fileno()).decode()
+
+    def close_server(self):
+        self._slave.close()
+        self._master.close()
+
+    def recv(self):
+        try:
+            return self._master.read(4096)
+        except OSError as e:
+            # closing the pty results in EIO on Linux, convert it to EOF
+            if e.errno == errno.EIO:
+                return b''
+            raise
+
+    def sendall(self, data):
+        return self._master.write(data)
+
+
 class MockGDBServer:
     """
     A simple TCP-based GDB server that can test client behavior by receiving
@@ -343,48 +442,34 @@
 
     responder = None
     _socket = None
-    _client = None
     _thread = None
     _receivedData = None
     _receivedDataOffset = None
     _shouldSendAck = True
 
-    def __init__(self):
+    def __init__(self, socket_class):
+        self._socket_class = socket_class
         self.responder = MockGDBServerResponder()
 
     def start(self):
-        family, type, proto, _, addr = socket.getaddrinfo("localhost", 0,
-                proto=socket.IPPROTO_TCP)[0]
-        self._socket = socket.socket(family, type, proto)
-
-
-        self._socket.bind(addr)
-        self._socket.listen(1)
-
+        self._socket = self._socket_class()
         # Start a thread that waits for a client connection.
         self._thread = threading.Thread(target=self._run)
         self._thread.start()
 
     def stop(self):
-        self._socket.close()
+        self._socket.close_server()
         self._thread.join()
         self._thread = None
 
     def get_connect_address(self):
-        return "[{}]:{}".format(*self._socket.getsockname())
+        return self._socket.get_connect_address()
 
     def _run(self):
         # For testing purposes, we only need to worry about one client
         # connecting just one time.
         try:
-            # accept() is stubborn and won't fail even when the socket is
-            # shutdown, so we'll use a timeout
-            self._socket.settimeout(30.0)
-            client, client_addr = self._socket.accept()
-            self._client = client
-            # The connected client inherits its timeout from self._socket,
-            # but we'll use a blocking socket for the client
-            self._client.settimeout(None)
+            self._socket.accept()
         except:
             return
         self._shouldSendAck = True
@@ -393,14 +478,14 @@
         data = None
         while True:
             try:
-                data = seven.bitcast_to_string(self._client.recv(4096))
+                data = seven.bitcast_to_string(self._socket.recv())
                 if data is None or len(data) == 0:
                     break
                 self._receive(data)
             except Exception as e:
                 print("An exception happened when receiving the response from the gdb server. Closing the client...")
                 traceback.print_exc()
-                self._client.close()
+                self._socket.close_connection()
                 break
 
     def _receive(self, data):
@@ -415,7 +500,7 @@
                 self._handlePacket(packet)
                 packet = self._parsePacket()
         except self.InvalidPacketException:
-            self._client.close()
+            self._socket.close_connection()
 
     def _parsePacket(self):
         """
@@ -492,7 +577,7 @@
         # We'll handle the ack stuff here since it's not something any of the
         # tests will be concerned about, and it'll get turned off quickly anyway.
         if self._shouldSendAck:
-            self._client.sendall(seven.bitcast_to_bytes('+'))
+            self._socket.sendall(seven.bitcast_to_bytes('+'))
         if packet == "QStartNoAckMode":
             self._shouldSendAck = False
             response = "OK"
@@ -502,7 +587,7 @@
         # Handle packet framing since we don't want to bother tests with it.
         if response is not None:
             framed = frame_packet(response)
-            self._client.sendall(seven.bitcast_to_bytes(framed))
+            self._socket.sendall(seven.bitcast_to_bytes(framed))
 
     PACKET_ACK = object()
     PACKET_INTERRUPT = object()
@@ -510,6 +595,7 @@
     class InvalidPacketException(Exception):
         pass
 
+
 class GDBRemoteTestBase(TestBase):
     """
     Base class for GDB client tests.
@@ -522,10 +608,11 @@
     NO_DEBUG_INFO_TESTCASE = True
     mydir = TestBase.compute_mydir(__file__)
     server = None
+    server_socket_class = TCPServerSocket
 
     def setUp(self):
         TestBase.setUp(self)
-        self.server = MockGDBServer()
+        self.server = MockGDBServer(socket_class=self.server_socket_class)
         self.server.start()
 
     def tearDown(self):