Index: llvm/trunk/utils/lit/lit/run.py =================================================================== --- llvm/trunk/utils/lit/lit/run.py +++ llvm/trunk/utils/lit/lit/run.py @@ -24,140 +24,6 @@ else: os.kill(0, 9) -### -# Test Execution Implementation - -class LockedValue(object): - def __init__(self, value): - self.lock = threading.Lock() - self._value = value - - def _get_value(self): - self.lock.acquire() - try: - return self._value - finally: - self.lock.release() - - def _set_value(self, value): - self.lock.acquire() - try: - self._value = value - finally: - self.lock.release() - - value = property(_get_value, _set_value) - -class TestProvider(object): - def __init__(self, queue_impl, canceled_flag): - self.canceled_flag = canceled_flag - - # Create a shared queue to provide the test indices. - self.queue = queue_impl() - - def queue_tests(self, tests, num_jobs): - for i in range(len(tests)): - self.queue.put(i) - for i in range(num_jobs): - self.queue.put(None) - - def cancel(self): - self.canceled_flag.value = 1 - - def get(self): - # Check if we are canceled. - if self.canceled_flag.value: - return None - - # Otherwise take the next test. - return self.queue.get() - -class Tester(object): - def __init__(self, run_instance, provider, consumer): - self.run_instance = run_instance - self.provider = provider - self.consumer = consumer - - def run(self): - while True: - item = self.provider.get() - if item is None: - break - self.run_test(item) - self.consumer.task_finished() - - def run_test(self, test_index): - test = self.run_instance.tests[test_index] - try: - execute_test(test, self.run_instance.lit_config, - self.run_instance.parallelism_semaphores) - except KeyboardInterrupt: - # This is a sad hack. Unfortunately subprocess goes - # bonkers with ctrl-c and we start forking merrily. - print('\nCtrl-C detected, goodbye.') - abort_now() - self.consumer.update(test_index, test) - -class ThreadResultsConsumer(object): - def __init__(self, display): - self.display = display - self.lock = threading.Lock() - - def update(self, test_index, test): - self.lock.acquire() - try: - self.display.update(test) - finally: - self.lock.release() - - def task_finished(self): - pass - - def handle_results(self): - pass - -class MultiprocessResultsConsumer(object): - def __init__(self, run, display, num_jobs): - self.run = run - self.display = display - self.num_jobs = num_jobs - self.queue = multiprocessing.Queue() - - def update(self, test_index, test): - # This method is called in the child processes, and communicates the - # results to the actual display implementation via an output queue. - self.queue.put((test_index, test.result)) - - def task_finished(self): - # This method is called in the child processes, and communicates that - # individual tasks are complete. - self.queue.put(None) - - def handle_results(self): - # This method is called in the parent, and consumes the results from the - # output queue and dispatches to the actual display. The method will - # complete after each of num_jobs tasks has signalled completion. - completed = 0 - while completed != self.num_jobs: - # Wait for a result item. - item = self.queue.get() - if item is None: - completed += 1 - continue - - # Update the test result in the parent process. - index,result = item - test = self.run.tests[index] - test.result = result - - self.display.update(test) - -def run_one_tester(run, provider, display): - tester = Tester(run, provider, display) - tester.run() - -### - class _Display(object): def __init__(self, display, provider, maxFailures): self.display = display @@ -170,47 +36,6 @@ if self.failedCount == self.maxFailures: self.provider.cancel() -def handleFailures(provider, consumer, maxFailures): - consumer.display = _Display(consumer.display, provider, maxFailures) - -def execute_test(test, lit_config, parallelism_semaphores): - """Execute one test""" - pg = test.config.parallelism_group - if callable(pg): - pg = pg(test) - - result = None - semaphore = None - try: - if pg: - semaphore = parallelism_semaphores[pg] - if semaphore: - semaphore.acquire() - start_time = time.time() - result = test.config.test_format.execute(test, lit_config) - # Support deprecated result from execute() which returned the result - # code and additional output as a tuple. - if isinstance(result, tuple): - code, output = result - result = lit.Test.Result(code, output) - elif not isinstance(result, lit.Test.Result): - raise ValueError("unexpected result from test execution") - result.elapsed = time.time() - start_time - except KeyboardInterrupt: - raise - except: - if lit_config.debug: - raise - output = 'Exception during script execution:\n' - output += traceback.format_exc() - output += '\n' - result = lit.Test.Result(lit.Test.UNRESOLVED, output) - finally: - if semaphore: - semaphore.release() - - test.setResult(result) - class Run(object): """ This class represents a concrete, configured testing run. @@ -221,7 +46,8 @@ self.tests = tests def execute_test(self, test): - return execute_test(test, self.lit_config, self.parallelism_semaphores) + return _execute_test_impl(test, self.lit_config, + self.parallelism_semaphores) def execute_tests(self, display, jobs, max_time=None): """ @@ -350,6 +176,44 @@ self.failure_count == self.lit_config.maxFailures: self.hit_max_failures = True +def _execute_test_impl(test, lit_config, parallelism_semaphores): + """Execute one test""" + pg = test.config.parallelism_group + if callable(pg): + pg = pg(test) + + result = None + semaphore = None + try: + if pg: + semaphore = parallelism_semaphores[pg] + if semaphore: + semaphore.acquire() + start_time = time.time() + result = test.config.test_format.execute(test, lit_config) + # Support deprecated result from execute() which returned the result + # code and additional output as a tuple. + if isinstance(result, tuple): + code, output = result + result = lit.Test.Result(code, output) + elif not isinstance(result, lit.Test.Result): + raise ValueError("unexpected result from test execution") + result.elapsed = time.time() - start_time + except KeyboardInterrupt: + raise + except: + if lit_config.debug: + raise + output = 'Exception during script execution:\n' + output += traceback.format_exc() + output += '\n' + result = lit.Test.Result(lit.Test.UNRESOLVED, output) + finally: + if semaphore: + semaphore.release() + + test.setResult(result) + child_lit_config = None child_parallelism_semaphores = None @@ -375,7 +239,7 @@ the display. """ try: - execute_test(test, child_lit_config, child_parallelism_semaphores) + _execute_test_impl(test, child_lit_config, child_parallelism_semaphores) return (test_index, test) except KeyboardInterrupt as e: # If a worker process gets an interrupt, abort it immediately.