Index: llvm/utils/lit/lit/main.py
===================================================================
--- llvm/utils/lit/lit/main.py
+++ llvm/utils/lit/lit/main.py
@@ -278,12 +278,15 @@
     debug_group.add_argument("--show-tests", dest="showTests",
                              help="Show all discovered tests",
                              action="store_true", default=False)
-    debug_group.add_argument("--use-processes", dest="useProcesses",
+    debug_group.add_argument("--use-process-pool", dest="executionStrategy",
+                             help="Run tests in parallel with a process pool",
+                             action="store_const", const="PROCESS_POOL")
+    debug_group.add_argument("--use-processes", dest="executionStrategy",
                              help="Run tests in parallel with processes (not threads)",
-                             action="store_true", default=True)
-    debug_group.add_argument("--use-threads", dest="useProcesses",
+                             action="store_const", const="PROCESSES")
+    debug_group.add_argument("--use-threads", dest="executionStrategy",
                              help="Run tests in parallel with threads (not processes)",
-                             action="store_false", default=True)
+                             action="store_const", const="THREADS")
 
     opts = parser.parse_args()
     args = opts.test_paths
@@ -298,6 +301,9 @@
     if opts.numThreads is None:
         opts.numThreads = lit.util.detectCPUs()
 
+    if opts.executionStrategy is None:
+        opts.executionStrategy = 'PROCESS_POOL'
+
     if opts.maxFailures == 0:
         parser.error("Setting --max-failures to 0 does not have any effect.")
 
@@ -481,7 +487,7 @@
     display = TestingProgressDisplay(opts, len(run.tests), progressBar)
     try:
         run.execute_tests(display, opts.numThreads, opts.maxTime,
-                          opts.useProcesses)
+                          opts.executionStrategy)
     except KeyboardInterrupt:
         sys.exit(2)
     display.finish()
Index: llvm/utils/lit/lit/run.py
===================================================================
--- llvm/utils/lit/lit/run.py
+++ llvm/utils/lit/lit/run.py
@@ -1,4 +1,5 @@
 import os
+import sys
 import threading
 import time
 import traceback
@@ -84,11 +85,13 @@
     def run_test(self, test_index):
         test = self.run_instance.tests[test_index]
         try:
-            self.run_instance.execute_test(test)
+            execute_test(test, self.run_instance.lit_config,
+                         self.run_instance.parallelism_semaphores)
         except KeyboardInterrupt:
             # This is a sad hack. Unfortunately subprocess goes
             # bonkers with ctrl-c and we start forking merrily.
             print('\nCtrl-C detected, goodbye.')
+            sys.stdout.flush()
             os.kill(0,9)
         self.consumer.update(test_index, test)
 
@@ -167,6 +170,44 @@
 def handleFailures(provider, consumer, maxFailures):
     consumer.display = _Display(consumer.display, provider, maxFailures)
 
+def execute_test(test, lit_config, parallelism_semaphores):
+    """Run one test (within its parallelism group) and test.setResult() it."""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    result = None
+    semaphore = None
+    try:
+        if pg:
+            semaphore = parallelism_semaphores[pg]
+        if semaphore:
+            semaphore.acquire()
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+    finally:
+        if semaphore:
+            semaphore.release()
+
+    test.setResult(result)
+
 class Run(object):
     """
     This class represents a concrete, configured testing run.
@@ -177,42 +218,10 @@
         self.tests = tests
 
     def execute_test(self, test):
-        pg = test.config.parallelism_group
-        if callable(pg): pg = pg(test)
-
-        result = None
-        semaphore = None
-        try:
-            if pg: semaphore = self.parallelism_semaphores[pg]
-            if semaphore: semaphore.acquire()
-            start_time = time.time()
-            result = test.config.test_format.execute(test, self.lit_config)
-
-            # Support deprecated result from execute() which returned the result
-            # code and additional output as a tuple.
-            if isinstance(result, tuple):
-                code, output = result
-                result = lit.Test.Result(code, output)
-            elif not isinstance(result, lit.Test.Result):
-                raise ValueError("unexpected result from test execution")
-
-            result.elapsed = time.time() - start_time
-        except KeyboardInterrupt:
-            raise
-        except:
-            if self.lit_config.debug:
-                raise
-            output = 'Exception during script execution:\n'
-            output += traceback.format_exc()
-            output += '\n'
-            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-        finally:
-            if semaphore: semaphore.release()
-
-        test.setResult(result)
+        return execute_test(test, self.lit_config, self.parallelism_semaphores)
 
     def execute_tests(self, display, jobs, max_time=None,
-                      use_processes=False):
+                      execution_strategy=None):
         """
         execute_tests(display, jobs, [max_time])
 
@@ -234,6 +243,14 @@
         be given an UNRESOLVED result.
         """
 
+        if execution_strategy == 'PROCESS_POOL':
+            self.execute_tests_with_mp_pool(display, jobs, max_time)
+            return
+        # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+        # the other two strategies.
+
+        use_processes = execution_strategy == 'PROCESSES'
+
         # Choose the appropriate parallel execution implementation.
         consumer = None
         if jobs != 1 and use_processes and multiprocessing:
@@ -263,8 +280,8 @@
         provider = TestProvider(queue_impl, canceled_flag)
         handleFailures(provider, consumer, self.lit_config.maxFailures)
 
-        # Queue the tests outside the main thread because we can't guarantee
-        # that we can put() all the tests without blocking:
+        # Putting tasks into the threading or multiprocessing Queue may block,
+        # so do it in a separate thread.
         # https://docs.python.org/2/library/multiprocessing.html
         # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
         # without taking any out.
@@ -317,3 +334,129 @@
         # Wait for all the tasks to complete.
         for t in tasks:
             t.join()
+
+    def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+        # Don't do anything if we aren't going to run any tests.
+        if not self.tests or jobs == 0:
+            return
+
+        # Set up semaphores to limit parallelism of certain classes of tests.
+        # For example, some ASan tests require lots of virtual memory and run
+        # faster with less parallelism on OS X.
+        self.parallelism_semaphores = \
+                {k: multiprocessing.Semaphore(v) for k, v in
+                 self.lit_config.parallelism_groups.items()}
+
+        # Save the display object on the runner so that we can update it from
+        # our task completion callback.
+        self.display = display
+
+        # Honor max_time: tests still unfinished at the deadline are left
+        # without a result and get marked UNRESOLVED below.
+        deadline = None
+        if max_time is not None:
+            deadline = time.time() + max_time
+
+        # Start a process pool. Copy over the data shared between all test runs.
+        pool = multiprocessing.Pool(jobs, worker_initializer,
+                                    (self.lit_config,
+                                     self.parallelism_semaphores))
+
+        # Install a console-control signal handler on Windows.
+        if win32api is not None:
+            def console_ctrl_handler(ctrl_type):
+                print('\nCtrl-C detected, terminating.')
+                sys.stdout.flush()
+                pool.terminate()
+                pool.join()
+                # On Windows os.kill(0, 9) fails (OpenProcess rejects pid 0),
+                # so terminate this process directly instead.
+                win32api.TerminateProcess(win32api.GetCurrentProcess(), 2)
+                return True
+            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+        # FIXME: --max-failures is not honored by this execution strategy.
+        try:
+            async_results = [pool.apply_async(worker_run_one_test,
+                                              args=(test_index, test),
+                                              callback=self.consume_test_result)
+                             for test_index, test in enumerate(self.tests)]
+
+            # Wait for all results to come in. The callback that runs in the
+            # parent process will update the display.
+            for a in async_results:
+                if deadline is not None:
+                    a.wait(max(0, deadline - time.time()))
+                    if not a.ready():
+                        break  # Out of time.
+                else:
+                    # On Python 2 a wait without a timeout cannot be
+                    # interrupted by Ctrl-C, so poll with a short timeout.
+                    while not a.ready():
+                        a.wait(1)
+                if not a.successful():
+                    a.get() # Exceptions raised here come from the worker.
+        finally:
+            pool.terminate()
+            pool.join()
+
+        # Mark any tests that weren't run as UNRESOLVED.
+        for test in self.tests:
+            if test.result is None:
+                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+    def consume_test_result(self, pool_result):
+        """Test completion callback for worker_run_one_test
+
+        Updates the test result status in the parent process. Each task in the
+        pool returns the test index and the test object with its result set,
+        and we use the index to look up the original test object. Also updates
+        the progress bar as tasks complete.
+        """
+        (test_index, test_with_result) = pool_result
+        # Update the parent process copy of the test. This includes the result,
+        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+        assert self.tests[test_index].file_path == test_with_result.file_path, \
+            "parent and child disagree on test path"
+        self.tests[test_index] = test_with_result
+        self.display.update(test_with_result)
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global child_lit_config
+    child_lit_config = lit_config
+    global child_parallelism_semaphores
+    child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the child_* global variables. This reduces the
+    cost of each task.
+
+    Returns (test_index, test), where test now carries its result; the parent
+    process uses them to update its copy of the test and the display.
+    """
+    try:
+        execute_test(test, child_lit_config, child_parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt:
+        # This is a sad hack. Unfortunately subprocess goes
+        # bonkers with ctrl-c and we start forking merrily.
+        print('\nCtrl-C detected, goodbye.')
+        traceback.print_exc()
+        sys.stdout.flush()
+        os.kill(0,9)
+    except:
+        # Re-raise so the parent's AsyncResult.get() reports the error;
+        # returning None would break the unpacking in consume_test_result.
+        traceback.print_exc()
+        raise