Index: test/dosep.py =================================================================== --- test/dosep.py +++ test/dosep.py @@ -32,14 +32,19 @@ echo core.%p | sudo tee /proc/sys/kernel/core_pattern """ +import fnmatch import multiprocessing +import multiprocessing.pool import os -import fnmatch import platform +import Queue import re -import dotest_args +import signal import subprocess import sys +import threading + +import dotest_args from optparse import OptionParser @@ -142,7 +147,7 @@ return passes, failures, unexpected_successes -def call_with_timeout(command, timeout, name): +def call_with_timeout(command, timeout, name, inferior_pid_events): """Run command with a timeout if possible.""" """-s QUIT will create a coredump if they are enabled on your system""" process = None @@ -161,8 +166,14 @@ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + inferior_pid = process.pid + if inferior_pid_events: + inferior_pid_events.put_nowait(('created', inferior_pid)) output = process.communicate() exit_status = process.returncode + if inferior_pid_events: + inferior_pid_events.put_nowait(('destroyed', inferior_pid)) + passes, failures, unexpected_successes = parse_test_results(output) if exit_status == 0: # stdout does not have any useful information from 'dotest.py', @@ -173,7 +184,7 @@ return name, exit_status, passes, failures, unexpected_successes -def process_dir(root, files, test_root, dotest_argv): +def process_dir(root, files, test_root, dotest_argv, inferior_pid_events): """Examine a directory for tests, and invoke any found within it.""" results = [] for name in files: @@ -187,7 +198,8 @@ timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or getDefaultTimeout(dotest_options.lldb_platform_name)) - results.append(call_with_timeout(command, timeout, name)) + results.append(call_with_timeout( + command, timeout, name, inferior_pid_events)) # result = (name, status, passes, failures, unexpected_successes) timed_out = [name for name, status, 
_, _, _ in results @@ -208,39 +220,175 @@ out_q = None -def process_dir_worker(arg_tuple): - """Worker thread main loop when in multithreaded mode. +def process_dir_worker_multiprocessing( + a_output_lock, a_test_counter, a_total_tests, a_test_name_len, + a_dotest_options, job_queue, result_queue, inferior_pid_events): + """Worker thread main loop when in multiprocessing mode. Takes one directory specification at a time and works on it.""" - return process_dir(*arg_tuple) + # Shut off interrupt handling in the child process. + signal.signal(signal.SIGINT, signal.SIG_IGN) -def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads): - """Look for matched files and invoke test driver on each one. - In single-threaded mode, each test driver is invoked directly. - In multi-threaded mode, submit each test driver to a worker - queue, and then wait for all to complete. + # Setup the global state for the worker process. + setup_global_variables( + a_output_lock, a_test_counter, a_total_tests, a_test_name_len, + a_dotest_options) - test_directory - lldb/test/ directory - test_subdir - lldb/test/ or a subfolder with the tests we're interested in - running + # Keep grabbing entries from the queue until done. + while not job_queue.empty(): + try: + job = job_queue.get(block=False) + result = process_dir(job[0], job[1], job[2], job[3], + inferior_pid_events) + result_queue.put(result) + except Queue.Empty: + # Fine, we're done. + pass + + +def process_dir_worker_multiprocessing_pool(args): + return process_dir(*args) + + +def process_dir_worker_threading( + a_test_counter, a_total_tests, a_test_name_len, + a_dotest_options, job_queue, result_queue, inferior_pid_events): + """Worker thread main loop when in threading mode. + + This one supports the hand-rolled pooling support. + + Takes one directory specification at a time and works on it.""" + + # Keep grabbing entries from the queue until done. 
+ while not job_queue.empty(): + try: + job = job_queue.get(block=False) + result = process_dir(job[0], job[1], job[2], job[3], + inferior_pid_events) + result_queue.put(result) + except Queue.Empty: + # Fine, we're done. + pass + + +def process_dir_worker_threading_pool(args): + return process_dir(*args) + + +def process_dir_mapper_inprocess(args): + """Map adapter for running the subprocess-based, non-threaded test runner. + + @param args the process work item tuple + @return the test result tuple """ + return process_dir(*args) - # Collect the test files that we'll run. - test_work_items = [] - for root, dirs, files in os.walk(test_subdir, topdown=False): - def is_test(name): + +def collect_active_pids_from_pid_events(event_queue): + """ + Returns the set of what should be active inferior pids based on + the event stream. + + @param event_queue a multiprocessing.Queue containing events of the + form: + ('created', pid) + ('destroyed', pid) + + @return set of inferior dotest.py pids activated but never completed. + """ + active_pid_set = set() + while not event_queue.empty(): + pid_event = event_queue.get_nowait() + if pid_event[0] == 'created': + active_pid_set.add(pid_event[1]) + elif pid_event[0] == 'destroyed': + active_pid_set.remove(pid_event[1]) + return active_pid_set + + +def kill_all_worker_processes(workers, inferior_pid_events): + """ + Kills all specified worker processes and their process tree. + + @param workers a list of multiprocessing.Process worker objects. + @param inferior_pid_events a multiprocessing.Queue that contains + all inferior create and destroy events. Used to construct + the list of child pids still outstanding that need to be killed. + """ + for worker in workers: + worker.terminate() + worker.join() + + # Add all the child test pids created.
+ active_pid_set = collect_active_pids_from_pid_events( + inferior_pid_events) + for inferior_pid in active_pid_set: + print "killing inferior pid {}".format(inferior_pid) + os.kill(inferior_pid, signal.SIGKILL) + + +def kill_all_worker_threads(workers, inferior_pid_events): + """ + Kills all specified worker threads and their process tree. + + @param workers a list of threading.Thread worker objects. + @param inferior_pid_events a Queue.Queue that contains + all inferior create and destroy events. Used to construct + the list of child pids still outstanding that need to be killed. + """ + + # Add all the child test pids created. + active_pid_set = collect_active_pids_from_pid_events( + inferior_pid_events) + for inferior_pid in active_pid_set: + print "killing inferior pid {}".format(inferior_pid) + os.kill(inferior_pid, signal.SIGKILL) + + # We don't have a way to nuke the threads. However, since we killed + # all the inferiors, and we drained the job queue, this will be + # good enough. Wait cleanly for each worker thread to wrap up. + for worker in workers: + worker.join() + + +def find_test_files_in_dir_tree(dir_root, found_func): + """Calls found_func for all the test files in the given dir hierarchy. + + @param dir_root the path to the directory to start scanning + for test files. All files in this directory and all its children + directory trees will be searched. + + @param found_func a callable object that will be passed + the parent directory (relative to dir_root) and the list of + test files from within that directory. + """ + for root, _, files in os.walk(dir_root, topdown=False): + def is_test_filename(test_dir, base_filename): + """Returns True if the given filename matches the test name format. + + @param test_dir the directory to check. Should be absolute or + relative to current working directory. + + @param base_filename the base name of the filename to check for + adherence to the python test case filename format.
+ + @return True if name matches the python test case filename format. + """ # Not interested in symbolically linked files. - if os.path.islink(os.path.join(root, name)): + if os.path.islink(os.path.join(test_dir, base_filename)): return False # Only interested in test files with the "Test*.py" naming pattern. - return name.startswith("Test") and name.endswith(".py") + return (base_filename.startswith("Test") and + base_filename.endswith(".py")) - tests = filter(is_test, files) - test_work_items.append((root, tests, test_directory, dotest_argv)) + tests = [filename for filename in files + if is_test_filename(root, filename)] + if tests: + found_func(root, tests) - global output_lock, test_counter, total_tests, test_name_len - output_lock = multiprocessing.RLock() - # item = (root, tests, test_directory, dotest_argv) + +def initialize_global_vars_common(num_threads, test_work_items): + global total_tests, test_counter, test_name_len total_tests = sum([len(item[1]) for item in test_work_items]) test_counter = multiprocessing.Value('i', 0) test_name_len = multiprocessing.Value('i', 0) @@ -248,19 +396,246 @@ total_tests, num_threads, (num_threads > 1) * "s") update_progress() - # Run the items, either in a pool (for multicore speedup) or - # calling each individually. - if num_threads > 1: - pool = multiprocessing.Pool( - num_threads, - initializer=setup_global_variables, - initargs=(output_lock, test_counter, total_tests, test_name_len, - dotest_options)) - test_results = pool.map(process_dir_worker, test_work_items) - else: - test_results = map(process_dir_worker, test_work_items) - # result = (timed_out, failed, passed, unexpected_successes, fail_count, pass_count) +def initialize_global_vars_multiprocessing(num_threads, test_work_items): + # Initialize the global state we'll use to communicate with the + # rest of the flat module. 
+ global output_lock + output_lock = multiprocessing.RLock() + initialize_global_vars_common(num_threads, test_work_items) + + +def initialize_global_vars_threading(num_threads, test_work_items): + # Initialize the global state we'll use to communicate with the + # rest of the flat module. + global output_lock + output_lock = threading.RLock() + initialize_global_vars_common(num_threads, test_work_items) + + +def multiprocessing_test_runner(num_threads, test_work_items): + """Provides hand-wrapped pooling test runner adapter with Ctrl-C support. + + This concurrent test runner is based on the multiprocessing + library, and rolls its own worker pooling strategy so it + can handle Ctrl-C properly. + + This test runner is known to have an issue running on + Windows platforms. + + @param num_threads the number of worker processes to use. + + @param test_work_items the iterable of test work item tuples + to run. + """ + + # Initialize our global state. + initialize_global_vars_multiprocessing(num_threads, test_work_items) + + # Create jobs. + job_queue = multiprocessing.Queue(len(test_work_items)) + for test_work_item in test_work_items: + job_queue.put(test_work_item) + + result_queue = multiprocessing.Queue(len(test_work_items)) + + # Create queues for started child pids. Terminating + # the multiprocess processes does not terminate the + # child processes they spawn. We can remove this tracking + # if/when we move to having the multiprocess process directly + # perform the test logic. The Queue size needs to be able to + # hold 2 * (num inferior dotest.py processes started) entries. + inferior_pid_events = multiprocessing.Queue(4096) + + # Create workers. We don't use multiprocessing.Pool due to + # challenges with handling ^C keyboard interrupts. 
+ workers = [] + for _ in range(num_threads): + worker = multiprocessing.Process( + target=process_dir_worker_multiprocessing, + args=(output_lock, + test_counter, + total_tests, + test_name_len, + dotest_options, + job_queue, + result_queue, + inferior_pid_events)) + worker.start() + workers.append(worker) + + # Wait for all workers to finish, handling ^C as needed. + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + # First try to drain the queue of work and let the + # running tests complete. + while not job_queue.empty(): + try: + # Just drain it to stop more work from being started. + job_queue.get_nowait() + except Queue.Empty: + pass + + print ('\nFirst KeyboardInterrupt received, stopping ' + 'future work. Press again to hard-stop existing tests.') + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + print ('\nSecond KeyboardInterrupt received, killing ' + 'all worker process trees.') + kill_all_worker_processes(workers, inferior_pid_events) + + test_results = [] + while not result_queue.empty(): + test_results.append(result_queue.get(block=False)) + return test_results + + +def multiprocessing_test_runner_pool(num_threads, test_work_items): + # Initialize our global state. + initialize_global_vars_multiprocessing(num_threads, test_work_items) + + pool = multiprocessing.Pool( + num_threads, + initializer=setup_global_variables, + initargs=(output_lock, test_counter, total_tests, test_name_len, + dotest_options)) + return pool.map(process_dir_worker_multiprocessing_pool, test_work_items) + + +def threading_test_runner(num_threads, test_work_items): + """Provides hand-wrapped pooling threading-based test runner adapter + with Ctrl-C support. + + This concurrent test runner is based on the threading + library, and rolls its own worker pooling strategy so it + can handle Ctrl-C properly. + + @param num_threads the number of worker threads to use.
+ + @param test_work_items the iterable of test work item tuples + to run. + """ + + # Initialize our global state. + initialize_global_vars_threading(num_threads, test_work_items) + + # Create jobs. + job_queue = Queue.Queue() + for test_work_item in test_work_items: + job_queue.put(test_work_item) + + result_queue = Queue.Queue() + + # Create queues for started child pids. Terminating + # the threading threads does not terminate the + # child processes they spawn. + inferior_pid_events = Queue.Queue() + + # Create workers. We don't use multiprocessing.pool.ThreadPool + # due to challenges with handling ^C keyboard interrupts. + workers = [] + for _ in range(num_threads): + worker = threading.Thread( + target=process_dir_worker_threading, + args=(test_counter, + total_tests, + test_name_len, + dotest_options, + job_queue, + result_queue, + inferior_pid_events)) + worker.start() + workers.append(worker) + + # Wait for all workers to finish, handling ^C as needed. + try: + # We do some trickery here to ensure we can catch keyboard + # interrupts. + while len(workers) > 0: + # Make a pass through the workers, checking for who is done. + dead_workers = [] + for worker in workers: + # This non-blocking join call is what allows us + # to still receive keyboard interrupts. + worker.join(0.01) + if not worker.isAlive(): + dead_workers.append(worker) + # Clear out the completed workers + for dead_worker in dead_workers: + workers.remove(dead_worker) + + except KeyboardInterrupt: + # First try to drain the queue of work and let the + # running tests complete. + while not job_queue.empty(): + try: + # Just drain it to stop more work from being started. + job_queue.get_nowait() + except Queue.Empty: + pass + + print ('\nFirst KeyboardInterrupt received, stopping ' + 'future work.
Press again to hard-stop existing tests.') + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + print ('\nSecond KeyboardInterrupt received, killing ' + 'all worker process trees.') + kill_all_worker_threads(workers, inferior_pid_events) + + test_results = [] + while not result_queue.empty(): + test_results.append(result_queue.get(block=False)) + return test_results + + +def threading_test_runner_pool(num_threads, test_work_items): + # Initialize our global state. + initialize_global_vars_threading(num_threads, test_work_items) + + pool = multiprocessing.pool.ThreadPool( + num_threads + # initializer=setup_global_variables, + # initargs=(output_lock, test_counter, total_tests, test_name_len, + # dotest_options) + ) + return pool.map(process_dir_worker_threading_pool, test_work_items) + + +def inprocess_exec_test_runner(test_work_items): + # Initialize our global state. + initialize_global_vars_multiprocessing(1, test_work_items) + return map(process_dir_mapper_inprocess, test_work_items) + + +def walk_and_invoke(test_directory, test_subdir, dotest_argv, + test_runner_func): + """Look for matched files and invoke test driver on each one. + In single-threaded mode, each test driver is invoked directly. + In multi-threaded mode, submit each test driver to a worker + queue, and then wait for all to complete. + + test_directory - lldb/test/ directory + test_subdir - lldb/test/ or a subfolder with the tests we're interested in + running + """ + + # Collect the test files that we'll run. + test_work_items = [] + find_test_files_in_dir_tree( + test_subdir, lambda testdir, test_files: test_work_items.append([ + test_subdir, test_files, test_directory, dotest_argv, None])) + + # Convert test work items into test results using whatever + # was provided as the test run function. + test_results = test_runner_func(test_work_items) + + # Summarize the results and return to caller. 
timed_out = sum([result[0] for result in test_results], []) passed = sum([result[1] for result in test_results], []) failed = sum([result[2] for result in test_results], []) @@ -268,7 +643,8 @@ pass_count = sum([result[4] for result in test_results]) fail_count = sum([result[5] for result in test_results]) - return (timed_out, passed, failed, unexpected_successes, pass_count, fail_count) + return (timed_out, passed, failed, unexpected_successes, pass_count, + fail_count) def getExpectedTimeouts(platform_name): @@ -277,7 +653,6 @@ host = sys.platform if platform_name is None: target = sys.platform - remote = False else: m = re.search('remote-(\w+)', platform_name) target = m.group(1) @@ -354,7 +729,51 @@ return result -def main(print_details_on_success, num_threads, test_subdir): +def get_test_runner_strategies(num_threads): + """Returns the test runner strategies by name in a dictionary. + + @param num_threads specifies the number of threads/processes + that will be used for concurrent test runners. + + @return dictionary with key as test runner strategy name and + value set to a callable object that takes the test work item + and returns a test result tuple. + """ + return { + # multiprocessing supports ctrl-c and does not use + # multiprocessing.Pool. + "multiprocessing": + (lambda work_items: multiprocessing_test_runner( + num_threads, work_items)), + + # multiprocessing-pool uses multiprocessing.Pool but + # does not support Ctrl-C. + "multiprocessing-pool": + (lambda work_items: multiprocessing_test_runner_pool( + num_threads, work_items)), + + # threading uses a hand-rolled worker pool much + # like multiprocessing, but instead uses in-process + # worker threads. This one supports Ctrl-C. + "threading": + (lambda work_items: threading_test_runner(num_threads, work_items)), + + # threading-pool uses threading for the workers (in-process) + # and uses the multiprocessing.pool thread-enabled pool. 
+ "threading-pool": + (lambda work_items: threading_test_runner_pool( + num_threads, work_items)), + + # serial uses the subprocess-based, single process + # test runner. This provides process isolation but + # no concurrent test running. + "serial": + inprocess_exec_test_runner + } + + +def main(print_details_on_success, num_threads, test_subdir, + test_runner_name): """Run dotest.py in inferior mode in parallel. @param print_details_on_success the parsed value of the output-on-success @@ -368,6 +787,13 @@ @param test_subdir optionally specifies a subdir to limit testing within. May be None if the entire test tree is to be used. This subdir is assumed to be relative to the lldb/test root of the test hierarchy. + + @param test_runner_name if specified, contains the test runner + name which selects the strategy used to run the isolated and + optionally concurrent test runner. Specify None to allow the + system to choose the most appropriate test runner given desired + thread count and OS type. + """ dotest_argv = sys.argv[1:] @@ -435,8 +861,38 @@ num_threads = 1 system_info = " ".join(platform.uname()) - (timed_out, passed, failed, unexpected_successes, pass_count, fail_count) = walk_and_invoke( - test_directory, test_subdir, dotest_argv, num_threads) + + # Figure out which testrunner strategy we'll use. + runner_strategies_by_name = get_test_runner_strategies(num_threads) + + # If the user didn't specify a test runner strategy, determine + # the default now based on number of threads and OS type. + if not test_runner_name: + if num_threads == 1: + # Use the serial runner. + test_runner_name = "serial" + elif os.name == "nt": + # Currently the multiprocessing test runner with ctrl-c + # support isn't running correctly on nt. Use the pool + # support without ctrl-c. + test_runner_name = "multiprocessing-pool" + else: + # For everyone else, use the ctrl-c-enabled + # multiprocessing support. 
+ test_runner_name = "multiprocessing" + + if test_runner_name not in runner_strategies_by_name: + raise Exception("specified testrunner name '{}' unknown. " + "Valid choices: {}".format( + test_runner_name, + runner_strategies_by_name.keys())) + test_runner_func = runner_strategies_by_name[test_runner_name] + + summary_results = walk_and_invoke( + test_directory, test_subdir, dotest_argv, test_runner_func) + + (timed_out, passed, failed, unexpected_successes, pass_count, + fail_count) = summary_results timed_out = set(timed_out) num_test_files = len(passed) + len(failed) Index: test/dotest.py =================================================================== --- test/dotest.py +++ test/dotest.py @@ -249,6 +249,7 @@ num_threads = None output_on_success = False no_multiprocess_test_runner = False +test_runner_name = None def usage(parser): parser.print_help() @@ -495,6 +496,7 @@ global num_threads global output_on_success global no_multiprocess_test_runner + global test_runner_name do_help = False @@ -756,7 +758,8 @@ if args.inferior: is_inferior_test_runner = True - if args.output_on_success: + # Turn on output_on_success if either explicitly added or -v specified. + if args.output_on_success or args.v: output_on_success = True if args.num_threads: @@ -765,6 +768,9 @@ if args.test_subdir: multiprocess_test_subdir = args.test_subdir + if args.test_runner_name: + test_runner_name = args.test_runner_name + if args.lldb_platform_name: lldb_platform_name = args.lldb_platform_name if args.lldb_platform_url: @@ -1278,8 +1284,9 @@ # multiprocess test runner here.
if isMultiprocessTestRunner(): import dosep - dosep.main(output_on_success, num_threads, multiprocess_test_subdir) - raise "should never get here" + dosep.main(output_on_success, num_threads, multiprocess_test_subdir, + test_runner_name) + raise Exception("should never get here") setupSysPath() setupCrashInfoHook() Index: test/dotest_args.py =================================================================== --- test/dotest_args.py +++ test/dotest_args.py @@ -128,11 +128,17 @@ dest='num_threads', help=('The number of threads/processes to use when running tests ' 'separately, defaults to the number of CPU cores available')) - parser.add_argument( + group.add_argument( '--test-subdir', action='store', help='Specify a test subdirectory to use relative to the test root dir' ) + group.add_argument( + '--test-runner-name', + action='store', + help=('Specify a test runner strategy. Valid values: multiprocessing,' + ' multiprocessing-pool, serial, threading, threading-pool') + ) # Remove the reference to our helper function del X