Index: test/test_runner/lib/process_control.py =================================================================== --- test/test_runner/lib/process_control.py +++ test/test_runner/lib/process_control.py @@ -13,6 +13,7 @@ """ # System imports +import datetime import os import re import signal @@ -21,31 +22,40 @@ import threading -class CommunicatorThread(threading.Thread): - """Provides a thread class that communicates with a subprocess.""" - def __init__(self, process, event, output_file): - super(CommunicatorThread, self).__init__() +class CallAndNotifyThread(threading.Thread): + """Provides a thread class that calls a method, then notifies. + + Implements a thread that sits on a synchronous call, then notifies + the client of the return and (if an exception was raised while waiting), + any exception that occurred. + """ + + def __init__(self, call_func, completion_func): + """Initializes the CallAndNotifyThread object. + + @param call_func the callable object that should be called as + result = call_func() + + @param completion_func the callable object that will be called + as completion_func(call_func_result, None) if no exception occurred, + or completion_func(None, exception) if an exception occurred. + """ + super(CallAndNotifyThread, self).__init__() # Don't let this thread prevent shutdown. self.daemon = True - self.process = process - self.pid = process.pid - self.event = event - self.output_file = output_file - self.output = None + self.call_func = call_func + self.completion_func = completion_func def run(self): try: - # Communicate with the child process. - # This will not complete until the child process terminates. - self.output = self.process.communicate() + # Call the call_func() + result = self.call_func() + # Synchronous method completed. Notify client. 
+ self.completion_func(result, None) except Exception as exception: # pylint: disable=broad-except - if self.output_file: - self.output_file.write( - "exception while using communicate() for pid: {}\n".format( - exception)) - finally: - # Signal that the thread's run is complete. - self.event.set() + # Notify client that we hit an exception while + # waiting for our synchronous call to complete. + self.completion_func(None, exception) # Provides a regular expression for matching gtimeout-based durations. @@ -404,21 +414,34 @@ super(ProcessDriver, self).__init__() self.process_helper = ProcessHelper.process_helper() self.pid = None - # Create the synchronization event for notifying when the - # inferior dotest process is complete. - self.done_event = threading.Event() - self.io_thread = None + self.process = None + # Number of seconds to wait for the soft terminate to # wrap up, before moving to more drastic measures. # Might want this longer if core dumps are generated and # take a long time to write out. self.soft_terminate_timeout = soft_terminate_timeout + # Number of seconds to wait for the hard terminate to # wrap up, before giving up on the io thread. This should # be fast. self.hard_terminate_timeout = 5.0 + + # Number of seconds to wait after one of the process + # completion events succeeds but we're now waiting on + # the other (quite likely wait() succeeded, but stdout/stderr + # are not closed, possibly due to child sharing of file + # descriptors). Used during normal (non-timeout) completion + # of any of the child process completion events. + self.wait_for_all_completion_timeout = 5.0 + + # This condition variable protects the returncode and output state below. + self.ending_condition = threading.Condition() + self.wait_thread = None self.returncode = None + self.communicate_thread = None + self.output = None # ============================================= # Methods for subclasses to override if desired. 
@@ -444,23 +467,26 @@ def run_command(self, command): # Start up the child process and the thread that does the # communication pump. - self._start_process_and_io_thread(command) + self._start_process_and_completion_threads(command) + + # Wait for either the wait() or communicate() thread to + # complete + self._wait_for_any_completion_event() + cleanup_result = self._wait_for_all_completion_events( + 10.0) + if not cleanup_result: + self._raise_on_incomplete_state() - # Wait indefinitely for the child process to finish - # communicating. This indicates it has closed stdout/stderr - # pipes and is done. - self.io_thread.join() - self.returncode = self.process.wait() if self.returncode is None: raise Exception( "no exit status available for pid {} after the " - " inferior dotest.py should have completed".format( + " child process should have completed".format( self.process.pid)) # Notify of non-timeout exit. self.on_process_exited( command, - self.io_thread.output, + self.output, False, self.returncode) @@ -470,7 +496,7 @@ # Start up the child process and the thread that does the # communication pump. - self._start_process_and_io_thread(command) + self._start_process_and_completion_threads(command) self._wait_with_timeout(timeout_seconds, command, want_core) @@ -478,20 +504,207 @@ # Internal details. # ================ - def _start_process_and_io_thread(self, command): - # Create the process. 
+ def _raise_on_incomplete_state(self): + do_raise = False + self.ending_condition.acquire() + try: + if self.output is not None: + communicate_state = "completed" + else: + communicate_state = "incomplete" + do_raise = True + if self.returncode is not None: + wait_state = "completed" + else: + wait_state = "incomplete" + do_raise = True + finally: + self.ending_condition.release() + + if do_raise: + raise Exception( + "failed waiting on one of wait() and communicate(): " + "communicate()={}, wait()={}".format( + communicate_state, wait_state)) + + def _wait_completed(self, result, exception): + """Notifies that the wait() on the child process completed. + + result will be non-None if there was no exception caught + while calling the method; otherwise, result will be None + and exception will be non-None, indicating the exception + thrown. + + @param result the result of calling subprocess.Popen.wait(). + This will be the returncode from wait(), unless an exception + occurred. In that case, it will be None. + + @param exception specifies the exception that occurred while + calling wait(); otherwise, None on normal operation. + """ + # Report any errors. + if exception is not None: + self.write("caught exception during wait() on process: {}".format( + exception)) + + # Grab the condition lock. + self.ending_condition.acquire() + + try: + # Update our state. + if exception is not None: + # Save it as the returncode. We'll check for + # exceptions when the condition variable is triggered. + # This allows us to wrap up, but also know we totally + # failed to reap the process (along with why). + self.returncode = exception + else: + self.returncode = result + + # Notify that we have new state. + self.ending_condition.notifyAll() + finally: + # And release the lock. + self.ending_condition.release() + + def _communicate_completed(self, output, exception): + """Notifies that the communicate() on the child process completed. 
+ + @param output the (stdout, stderr) from the child process; None + if an exception occurred while waiting for communicate() to return. + + @param exception specifies the exception that occurred while + calling communicate(); otherwise, None on normal operation with + communicate() completing successfully. + """ + # Report any errors. + if exception is not None: + self.write( + "caught exception during communicate() on process: {}".format( + exception)) + + # Grab the condition lock. + self.ending_condition.acquire() + + try: + # Update our state. + if exception is None: + if output is None: + # This will create some problems as we're + # expecting self.output to have something (which + # can be a tuple with empty strings, or an exception). + # Note the issue, as something went awry if we + # didn't get an exception *and* the return value + # was None. We'll later use self.output is None + # to validate that the communicate() thread is + # still doing work. + output = ('', '') + self.write( + "unexpected: communicate() completed but return value " + "was None, replacing with ('', '')") + self.output = output + else: + self.output = exception + + # Notify that we have new state. + self.ending_condition.notifyAll() + finally: + # And release the lock. + self.ending_condition.release() + + def _wait_for_any_completion_event(self, timeout=None): + """Waits for either wait() or communicate() to complete, or timeout. + + @param timeout float representing the number of seconds to wait for + one of the completion events to occur. May be None (or 0) if no + timeout is desired, in which case it will wait indefinitely for one of + them to occur. + + @return True if either of the completion events triggered the return, + False if the timeout was reached first. 
+ """ + if timeout: + end_time = datetime.datetime.now() + datetime.timedelta(0, timeout) + else: + end_time = None + + # Do the following while we haven't timed out (or don't have a timeout) + self.ending_condition.acquire() + try: + # While we haven't timed out, and none of the completion + # events have occurred, we'll wait. + while ((not end_time or datetime.datetime.now() < end_time) and + (self.returncode is None) and (self.output is None)): + # Wait with a timeout. The reason for the timeout on + # this wait is to support our overall child process + # timeout mechanism. We need to be able to kill this + # thing if it isn't ending. The fidelity of this timer + # only needs to be good enough to catch a timeout + # issue. + self.ending_condition.wait(5.0) + return (self.returncode is not None) or (self.output is not None) + finally: + # Make sure we release the lock. + self.ending_condition.release() + + def _wait_for_all_completion_events(self, timeout): + """Given that we received at least one of the completion events, + wait for the remaining. + + The intent of this method is to provide a short, graceful collection + of the completion event that has not yet occurred. e.g. if the + wait() returns but the communicate() hasn't fully drained the read + side of the pipe, we need the communicate to wrap up cleanly in order + to parse output properly. The timeout should be short here, just + a few seconds should do. + + Unlike _wait_for_any_completion_event(), this call requires a + timeout. + + @param timeout a float indicating the maximum number of seconds to + wait for the remaining completion event. + + @return True if all completion events are now complete; False + otherwise (i.e. a timeout occurred). 
+ """ + # Compute the deadline; a timeout is always required for this call. + end_time = datetime.datetime.now() + datetime.timedelta(0, timeout) + + self.ending_condition.acquire() + try: + # While we haven't timed out, and either of the completion + # events has not yet occurred, we'll wait. + while ((datetime.datetime.now() < end_time) and + ((self.returncode is None) or (self.output is None))): + # Wait with a timeout. The reason for the timeout on + # this wait is to support our overall child process + # timeout mechanism. We need to be able to kill this + # thing if it isn't ending. The fidelity of this timer + # only needs to be good enough to catch a timeout + # issue. + self.ending_condition.wait(1.0) + return (self.returncode is not None) and (self.output is not None) + finally: + # Make sure we release the lock. + self.ending_condition.release() + + def _start_process_and_completion_threads(self, command): + # Create and start the process. self.process = self.process_helper.create_piped_process(command) self.pid = self.process.pid self.on_process_started() - # Ensure the event is cleared that is used for signaling - # from the communication() thread when communication is - # complete (i.e. the inferior process has finished). - self.done_event.clear() + # Create the thread that waits on Popen.communicate() + self.communicate_thread = CallAndNotifyThread( + self.process.communicate, + self._communicate_completed) + self.communicate_thread.start() - self.io_thread = CommunicatorThread( - self.process, self.done_event, self.write) - self.io_thread.start() + # Create the thread that waits on Popen.wait() + self.wait_thread = CallAndNotifyThread( + self.process.wait, + self._wait_completed) + self.wait_thread.start() def _attempt_soft_kill(self, want_core): # The inferior dotest timed out. 
Attempt to clean it @@ -503,23 +716,14 @@ want_core=want_core, log_file=self) - # Now wait up to a certain timeout period for the io thread - # to say that the communication ended. If that wraps up - # within our soft terminate timeout, we're all done here. - self.io_thread.join(self.soft_terminate_timeout) - if not self.io_thread.is_alive(): - # stdout/stderr were closed on the child process side. We - # should be able to wait and reap the child process here. - self.returncode = self.process.wait() - # We terminated, and the done_trying result is n/a - terminated = True - done_trying = None - else: + # This will wait for all completion events to complete. + terminated = self._wait_for_all_completion_events( + self.soft_terminate_timeout) + if not terminated: self.write("soft kill attempt of process {} timed out " "after {} seconds\n".format( self.process.pid, self.soft_terminate_timeout)) - terminated = False - done_trying = False + done_trying = False return terminated, done_trying def _attempt_hard_kill(self): @@ -529,27 +733,11 @@ self.process, log_file=self) - # Reap the child process. This should not hang as the - # hard_kill() mechanism is supposed to really kill it. - # Improvement option: - # If this does ever hang, convert to a self.process.poll() - # loop checking on self.process.returncode until it is not - # None or the timeout occurs. - self.returncode = self.process.wait() - - # Wait a few moments for the io thread to finish... - self.io_thread.join(self.hard_terminate_timeout) - if self.io_thread.is_alive(): - # ... but this is not critical if it doesn't end for some - # reason. - self.write( - "hard kill of process {} timed out after {} seconds waiting " - "for the io thread (ignoring)\n".format( - self.process.pid, self.hard_terminate_timeout)) + # This will wait for all completion events to complete. + terminated = self._wait_for_all_completion_events( + self.hard_terminate_timeout) - # Set if it terminated. 
(Set up for optional improvement above). - terminated = self.returncode is not None - # Nothing else to try. + # Nothing else to try if this didn't work. done_trying = True return terminated, done_trying @@ -565,7 +753,9 @@ return self._attempt_hard_kill() else: # We don't have anything else to try. - terminated = self.returncode is not None + terminated = ( + self.returncode is not None and + self.output is not None) done_trying = True return terminated, done_trying else: @@ -575,20 +765,43 @@ return self._attempt_hard_kill() else: # We don't have anything else to try. - terminated = self.returncode is not None + terminated = ( + self.returncode is not None and + self.output is not None) done_trying = True return terminated, done_trying def _wait_with_timeout(self, timeout_seconds, command, want_core): # Allow up to timeout seconds for the io thread to wrap up. # If that completes, the child process should be done. - completed_normally = self.done_event.wait(timeout_seconds) - if completed_normally: - # Reap the child process here. - self.returncode = self.process.wait() + one_event_received = self._wait_for_any_completion_event( + timeout=timeout_seconds) + if one_event_received: + # Make sure we get the other completion event. At most + # this should need to wait a few seconds for output or reaping + # to complete. If longer than that, we may have the + # process group leader dead but with shared out stdout/stderr + # pipes. + all_events_received = self._wait_for_all_completion_events( + self.wait_for_all_completion_timeout) else: - # Prepare to stop the process - process_terminated = completed_normally + all_events_received = False + + if not all_events_received: + # Prepare to stop the process/process group. 
We're doing + # this because one of two cases happened: + # 1) the child process launched is still running (the + # obvious case) + # or + # 2) the child process finished, but it shared out its + # stdout/stderr with a child it spawned, which is still + # running. + # + # In case 2, the process itself is done, but we need to + # terminate its children. As long as we're using + # process groups, we'll be fine terminating the whole + # process group. + process_terminated = False terminate_attempt_count = 0 # Try as many attempts as we support for trying to shut down @@ -608,6 +821,8 @@ # attempts, or we failed but gave it our best effort. self.on_process_exited( command, - self.io_thread.output, - not completed_normally, + self.output, + # We were a timeout if we never received at least one of + # the wait()/communicate() events before the timeout. + not one_event_received, self.returncode) Index: test/test_runner/test/inferior.py =================================================================== --- test/test_runner/test/inferior.py +++ test/test_runner/test/inferior.py @@ -140,4 +140,6 @@ return options.return_code if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + RETURN_CODE = main(sys.argv[1:]) + print "returning {}".format(RETURN_CODE) + sys.exit(RETURN_CODE) Index: test/test_runner/test/process_control_tests.py =================================================================== --- test/test_runner/test/process_control_tests.py +++ test/test_runner/test/process_control_tests.py @@ -14,7 +14,6 @@ # System imports. 
import os -import platform import unittest import sys import threading @@ -27,9 +26,11 @@ class TestInferiorDriver(process_control.ProcessDriver): - def __init__(self, soft_terminate_timeout=None): + def __init__(self, soft_terminate_timeout=5.0): + # override the default super(TestInferiorDriver, self).__init__( soft_terminate_timeout=soft_terminate_timeout) + self.started_event = threading.Event() self.started_event.clear() @@ -105,10 +106,13 @@ def test_run_completes_with_code(self): """Test that running completes and gets expected stdout/stderr.""" driver = TestInferiorDriver() - driver.run_command(self.inferior_command(options="-r10")) + expected_returncode = 10 + driver.run_command(self.inferior_command(options="-r{}".format( + expected_returncode))) self.assertTrue( driver.completed_event.wait(5), "process failed to complete") - self.assertEqual(driver.returncode, 10, "return code does not match") + self.assertIsNotNone(driver.returncode) + self.assertEqual(driver.returncode, expected_returncode) class ProcessControlTimeoutTests(ProcessControlTests): @@ -204,27 +208,30 @@ """ driver = TestInferiorDriver() + timeout_seconds = 5 + return_code = 3 # Create the inferior (I1), and instruct it to create a child (C1) # that shares the stdout/stderr handles with the inferior. # C1 will then loop forever. driver.run_command_with_timeout( self.inferior_command( - options="--launch-child-share-handles --return-code 3"), - "5s", + options="--launch-child-share-handles --return-code {}".format( + return_code)), + "{}s".format(timeout_seconds), False) # We should complete without a timetout. I1 should end # immediately after launching C1. self.assertTrue( - driver.completed_event.wait(5), + driver.completed_event.wait(timeout_seconds), "process failed to complete") # Ensure we didn't receive a timeout. 
- self.assertTrue( + self.assertFalse( driver.was_timeout, "inferior should have completed normally") self.assertEqual( - driver.returncode, 3, + driver.returncode, return_code, "expected inferior process to end with expected returncode")