Index: llvm/trunk/utils/lit/lit/run.py
===================================================================
--- llvm/trunk/utils/lit/lit/run.py
+++ llvm/trunk/utils/lit/lit/run.py
@@ -1,28 +1,9 @@
-import os
-import sys
-import threading
+import multiprocessing
 import time
-import traceback
-try:
-    import Queue as queue
-except ImportError:
-    import queue
-
-try:
-    import win32api
-except ImportError:
-    win32api = None
 
-import multiprocessing
 import lit.Test
-
-def abort_now():
-    """Abort the current process without doing any exception teardown"""
-    sys.stdout.flush()
-    if win32api:
-        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
-    else:
-        os.kill(0, 9)
+import lit.util
+import lit.worker
 
 class _Display(object):
     def __init__(self, display, provider, maxFailures):
@@ -48,12 +29,11 @@
         # For example, some ASan tests require lots of virtual memory and run
         # faster with less parallelism on OS X.
         self.parallelism_semaphores = \
-                {k: multiprocessing.Semaphore(v) for k, v in
+                {k: multiprocessing.BoundedSemaphore(v) for k, v in
                  self.lit_config.parallelism_groups.items()}
 
     def execute_test(self, test):
-        return _execute_test_impl(test, self.lit_config,
-                                  self.parallelism_semaphores)
+        return lit.worker._execute_test(test, self.lit_config)
 
     def execute_tests_in_pool(self, jobs, max_time):
         # We need to issue many wait calls, so compute the final deadline and
@@ -67,22 +47,22 @@
         # interrupts the workers before we make it into our task callback, they
         # will each raise a KeyboardInterrupt exception and print to stderr at
         # the same time.
-        pool = multiprocessing.Pool(jobs, worker_initializer,
+        pool = multiprocessing.Pool(jobs, lit.worker.initializer,
                                     (self.lit_config,
                                      self.parallelism_semaphores))
 
         # Install a console-control signal handler on Windows.
-        if win32api is not None:
+        if lit.util.win32api is not None:
             def console_ctrl_handler(type):
                 print('\nCtrl-C detected, terminating.')
                 pool.terminate()
                 pool.join()
-                abort_now()
+                lit.util.abort_now()
                 return True
-            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+            lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
 
         try:
-            async_results = [pool.apply_async(worker_run_one_test,
+            async_results = [pool.apply_async(lit.worker.run_one_test,
                                               args=(test_index, test),
                                               callback=self.consume_test_result)
                              for test_index, test in enumerate(self.tests)]
@@ -143,11 +123,9 @@
         self.failure_count = 0
         self.hit_max_failures = False
         if jobs == 1:
-            global child_lit_config
-            child_lit_config = self.lit_config
             for test_index, test in enumerate(self.tests):
-                result = worker_run_one_test(test_index, test)
-                self.consume_test_result(result)
+                lit.worker._execute_test(test, self.lit_config)
+                self.consume_test_result((test_index, test))
                 if self.hit_max_failures:
                     break
         else:
@@ -159,7 +137,7 @@
                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
 
     def consume_test_result(self, pool_result):
-        """Test completion callback for worker_run_one_test
+        """Test completion callback for lit.worker.run_one_test
 
         Updates the test result status in the parent process. Each task in the
         pool returns the test index and the result, and we use the index to look
@@ -186,74 +164,3 @@
         if self.lit_config.maxFailures and \
                 self.failure_count == self.lit_config.maxFailures:
             self.hit_max_failures = True
-
-def _execute_test_impl(test, lit_config, parallelism_semaphores):
-    """Execute one test"""
-    pg = test.config.parallelism_group
-    if callable(pg):
-        pg = pg(test)
-
-    result = None
-    semaphore = None
-    try:
-        if pg:
-            semaphore = parallelism_semaphores[pg]
-        if semaphore:
-            semaphore.acquire()
-        start_time = time.time()
-        result = test.config.test_format.execute(test, lit_config)
-        # Support deprecated result from execute() which returned the result
-        # code and additional output as a tuple.
-        if isinstance(result, tuple):
-            code, output = result
-            result = lit.Test.Result(code, output)
-        elif not isinstance(result, lit.Test.Result):
-            raise ValueError("unexpected result from test execution")
-        result.elapsed = time.time() - start_time
-    except KeyboardInterrupt:
-        raise
-    except:
-        if lit_config.debug:
-            raise
-        output = 'Exception during script execution:\n'
-        output += traceback.format_exc()
-        output += '\n'
-        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-    finally:
-        if semaphore:
-            semaphore.release()
-
-    test.setResult(result)
-
-child_lit_config = None
-child_parallelism_semaphores = None
-
-def worker_initializer(lit_config, parallelism_semaphores):
-    """Copy expensive repeated data into worker processes"""
-    global child_lit_config
-    child_lit_config = lit_config
-    global child_parallelism_semaphores
-    child_parallelism_semaphores = parallelism_semaphores
-
-def worker_run_one_test(test_index, test):
-    """Run one test in a multiprocessing.Pool
-
-    Side effects in this function and functions it calls are not visible in the
-    main lit process.
-
-    Arguments and results of this function are pickled, so they should be cheap
-    to copy. For efficiency, we copy all data needed to execute all tests into
-    each worker and store it in the child_* global variables. This reduces the
-    cost of each task.
-
-    Returns an index and a Result, which the parent process uses to update
-    the display.
-    """
-    try:
-        _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
-        return (test_index, test)
-    except KeyboardInterrupt as e:
-        # If a worker process gets an interrupt, abort it immediately.
-        abort_now()
-    except:
-        traceback.print_exc()
Index: llvm/trunk/utils/lit/lit/util.py
===================================================================
--- llvm/trunk/utils/lit/lit/util.py
+++ llvm/trunk/utils/lit/lit/util.py
@@ -424,3 +424,17 @@
         psutilProc.kill()
     except psutil.NoSuchProcess:
         pass
+
+
+try:
+    import win32api
+except ImportError:
+    win32api = None
+
+def abort_now():
+    """Abort the current process without doing any exception teardown"""
+    sys.stdout.flush()
+    if win32api:
+        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
+    else:
+        os.kill(0, 9)
Index: llvm/trunk/utils/lit/lit/worker.py
===================================================================
--- llvm/trunk/utils/lit/lit/worker.py
+++ llvm/trunk/utils/lit/lit/worker.py
@@ -0,0 +1,85 @@
+# The functions in this module are meant to run on a separate worker process.
+# Exception: in single process mode _execute_test is called directly.
+import time
+import traceback
+
+import lit.Test
+import lit.util
+
+_lit_config = None
+_parallelism_semaphores = None
+
+def initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global _lit_config
+    global _parallelism_semaphores
+    _lit_config = lit_config
+    _parallelism_semaphores = parallelism_semaphores
+
+def run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the module-level _lit_config and
+    _parallelism_semaphores variables. This reduces the cost of each task.
+
+    Returns the test index and the test (with its result set), which the
+    parent process uses to update the display.
+    """
+    try:
+        _execute_test_in_parallelism_group(test, _lit_config,
+                                           _parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt:
+        # If a worker process gets an interrupt, abort it immediately.
+        lit.util.abort_now()
+    except:
+        traceback.print_exc()
+
+def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores):
+    """Execute one test inside the appropriate parallelism group"""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    if pg:
+        semaphore = parallelism_semaphores[pg]
+        # Acquire outside the try block: if acquire() itself is interrupted,
+        # the finally clause must not release a BoundedSemaphore we never
+        # held, which would raise ValueError and mask the real exception.
+        semaphore.acquire()
+        try:
+            _execute_test(test, lit_config)
+        finally:
+            semaphore.release()
+    else:
+        _execute_test(test, lit_config)
+
+def _execute_test(test, lit_config):
+    """Execute one test"""
+    try:
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+
+    test.setResult(result)