Index: test/dosep.py
===================================================================
--- test/dosep.py
+++ test/dosep.py
@@ -36,8 +36,10 @@
 import os
 import fnmatch
 import platform
+import Queue
 import re
 import dotest_args
+import signal
 import subprocess
 import sys
 
@@ -142,7 +144,7 @@
     return passes, failures, unexpected_successes
 
 
-def call_with_timeout(command, timeout, name):
+def call_with_timeout(command, timeout, name, inferior_pid_events):
     """Run command with a timeout if possible."""
     """-s QUIT will create a coredump if they are enabled on your system"""
     process = None
@@ -161,8 +163,15 @@
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
+    inferior_pid = process.pid
+    # The event queue is None when tests run in-process (single-threaded).
+    if inferior_pid_events is not None:
+        inferior_pid_events.put_nowait(('created', inferior_pid))
     output = process.communicate()
     exit_status = process.returncode
+    if inferior_pid_events is not None:
+        inferior_pid_events.put_nowait(('destroyed', inferior_pid))
+
     passes, failures, unexpected_successes = parse_test_results(output)
     if exit_status == 0:
         # stdout does not have any useful information from 'dotest.py',
@@ -173,7 +182,7 @@
     return name, exit_status, passes, failures, unexpected_successes
 
 
-def process_dir(root, files, test_root, dotest_argv):
+def process_dir(root, files, test_root, dotest_argv, inferior_pid_events):
     """Examine a directory for tests, and invoke any found within it."""
     results = []
     for name in files:
@@ -187,7 +196,8 @@
         timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or
                    getDefaultTimeout(dotest_options.lldb_platform_name))
 
-        results.append(call_with_timeout(command, timeout, name))
+        results.append(call_with_timeout(
+            command, timeout, name, inferior_pid_events))
 
     # result = (name, status, passes, failures, unexpected_successes)
     timed_out = [name for name, status, _, _, _ in results
@@ -208,10 +218,79 @@
 out_q = None
 
 
-def process_dir_worker(arg_tuple):
+def process_dir_worker(a_output_lock, a_test_counter, a_total_tests,
+                       a_test_name_len, a_dotest_options, job_queue,
+                       result_queue, inferior_pid_events):
     """Worker thread main loop when in multithreaded mode.
     Takes one directory specification at a time and works on it."""
-    return process_dir(*arg_tuple)
+
+    # Shut off interrupt handling in the child process.
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    # Setup the global state for the worker process.
+    setup_global_variables(
+        a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
+        a_dotest_options)
+
+    # Keep grabbing entries from the queue until done.
+    while not job_queue.empty():
+        try:
+            job = job_queue.get(block=False)
+            result = process_dir(job[0], job[1], job[2], job[3],
+                                 inferior_pid_events)
+            result_queue.put(result)
+        except Queue.Empty:
+            # Fine, we're done.
+            pass
+
+
+def collect_active_pids_from_pid_events(event_queue):
+    """
+    Returns the set of what should be active inferior pids based on
+    the event stream.
+
+    @param event_queue a multiprocessing.Queue containing events of the
+    form:
+         ('created', pid)
+         ('destroyed', pid)
+
+    @return set of inferior dotest.py pids activated but never completed.
+    """
+    active_pid_set = set()
+    while not event_queue.empty():
+        pid_event = event_queue.get_nowait()
+        if pid_event[0] == 'created':
+            active_pid_set.add(pid_event[1])
+        elif pid_event[0] == 'destroyed':
+            # discard() tolerates a 'destroyed' event with no matching
+            # 'created' event instead of raising KeyError mid-cleanup.
+            active_pid_set.discard(pid_event[1])
+    return active_pid_set
+
+
+def kill_all_worker_processes(workers, inferior_pid_events):
+    """
+    Kills all specified worker processes and their process tree.
+
+    @param workers a list of multiprocess.Process worker objects.
+    @param inferior_pid_events a multiprocess.Queue that contains
+    all inferior create and destroy events. Used to construct
+    the list of child pids still outstanding that need to be killed.
+    """
+    for worker in workers:
+        worker.terminate()
+        worker.join()
+
+    # Add all the child test pids created.
+    active_pid_set = collect_active_pids_from_pid_events(
+        inferior_pid_events)
+    for inferior_pid in active_pid_set:
+        print "killing inferior pid {}".format(inferior_pid)
+        try:
+            os.kill(inferior_pid, signal.SIGKILL)
+        except OSError:
+            # Most likely already exited; keep killing the remaining pids.
+            pass
 
 
 def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads):
@@ -236,7 +315,8 @@
             return name.startswith("Test") and name.endswith(".py")
 
         tests = filter(is_test, files)
-        test_work_items.append((root, tests, test_directory, dotest_argv))
+        test_work_items.append(
+            (root, tests, test_directory, dotest_argv, None))
 
     global output_lock, test_counter, total_tests, test_name_len
     output_lock = multiprocessing.RLock()
@@ -251,12 +331,67 @@
     # Run the items, either in a pool (for multicore speedup) or
     # calling each individually.
     if num_threads > 1:
-        pool = multiprocessing.Pool(
-            num_threads,
-            initializer=setup_global_variables,
-            initargs=(output_lock, test_counter, total_tests, test_name_len,
-                      dotest_options))
-        test_results = pool.map(process_dir_worker, test_work_items)
+        # Create jobs.
+        job_queue = multiprocessing.Queue()
+        for test_work_item in test_work_items:
+            job_queue.put(test_work_item)
+
+        result_queue = multiprocessing.Queue()
+
+        # Create queues for started child pids. Terminating
+        # the multiprocess processes does not terminate the
+        # child processes they spawn. We can remove this tracking
+        # if/when we move to having the multiprocess process directly
+        # perform the test logic. The Queue size needs to be able to
+        # hold 2 * (num inferior dotest.py processes started) entries.
+        inferior_pid_events = multiprocessing.Queue(4096)
+
+        # Create workers. We don't use multiprocessing.Pool due to
+        # challenges with handling ^C keyboard interrupts.
+        workers = []
+        for _ in range(num_threads):
+            worker = multiprocessing.Process(
+                target=process_dir_worker,
+                args=(output_lock,
+                      test_counter,
+                      total_tests,
+                      test_name_len,
+                      dotest_options,
+                      job_queue,
+                      result_queue,
+                      inferior_pid_events))
+            worker.start()
+            workers.append(worker)
+
+        # Wait for all workers to finish, handling ^C as needed.
+        try:
+            for worker in workers:
+                worker.join()
+        except KeyboardInterrupt:
+            # First try to drain the queue of work and let the
+            # running tests complete.
+            while not job_queue.empty():
+                try:
+                    # Just drain it to stop more work from being started.
+                    job_queue.get_nowait()
+                except Queue.Empty:
+                    pass
+
+            print ('\nFirst KeyboardInterrupt received, stopping '
+                   'future work. Press again to hard-stop existing tests.')
+            try:
+                for worker in workers:
+                    worker.join()
+            except KeyboardInterrupt:
+                print ('\nSecond KeyboardInterrupt received, killing '
+                       'all worker process trees.')
+                kill_all_worker_processes(workers, inferior_pid_events)
+
+        test_results = []
+        while not result_queue.empty():
+            test_results.append(result_queue.get(block=False))
     else:
-        test_results = map(process_dir_worker, test_work_items)
+        # Single-threaded: run the work items in this process. Each item
+        # tuple lines up with process_dir's positional parameters.
+        test_results = [process_dir(*item) for item in test_work_items]
 