Index: test/dosep.py =================================================================== --- test/dosep.py +++ test/dosep.py @@ -32,6 +32,7 @@ echo core.%p | sudo tee /proc/sys/kernel/core_pattern """ +import asyncore import fnmatch import multiprocessing import multiprocessing.pool @@ -44,10 +45,9 @@ import sys import threading +import dotest_channels import dotest_args -from optparse import OptionParser - def get_timeout_command(): """Search for a suitable timeout command.""" @@ -76,6 +76,9 @@ dotest_options = None output_on_success = False +RESULTS_FORMATTER = None +RUNNER_PROCESS_ASYNC_MAP = None +RESULTS_LISTENER_CHANNEL = None def setup_global_variables(lock, counter, total, name_len, options): global output_lock, test_counter, total_tests, test_name_len @@ -147,12 +150,39 @@ return passes, failures, unexpected_successes +def inferior_session_interceptor(forwarding_func, event): + """Intercepts session begin/end events, passing through everything else. + + @param forwarding_func a callable object to pass along the event if it + is not one that gets intercepted. + + @param event the test result event received. + """ + + if event is not None and isinstance(event, dict): + if "event" in event: + if event["event"] == "session_begin": + # Swallow it. Could report on inferior here if we + # cared. + return + elif event["event"] == "session_end": + # Swallow it. Could report on inferior here if we + # cared. More usefully, we can verify that the + # inferior went down hard if we don't receive this. + return + + # Pass it along. + forwarding_func(event) + + def call_with_timeout(command, timeout, name, inferior_pid_events): - """Run command with a timeout if possible.""" - """-s QUIT will create a coredump if they are enabled on your system""" + """Run command with a timeout if possible. 
+ -s QUIT will create a coredump if they are enabled on your system + """ process = None if timeout_command and timeout != "0": command = [timeout_command, '-s', 'QUIT', timeout] + command + # Specifying a value for close_fds is unsupported on Windows when using # subprocess.PIPE if os.name != "nt": @@ -170,7 +200,14 @@ if inferior_pid_events: inferior_pid_events.put_nowait(('created', inferior_pid)) output = process.communicate() + + # The inferior should now be entirely wrapped up. exit_status = process.returncode + if exit_status is None: + raise Exception( + "no exit status available after the inferior dotest.py " + "should have completed") + if inferior_pid_events: inferior_pid_events.put_nowait(('destroyed', inferior_pid)) @@ -180,6 +217,10 @@ # only stderr does. report_test_pass(name, output[1]) else: + # TODO need to differentiate a failing test from a run that + # was broken out of by a SIGTERM/SIGKILL, reporting those as + # an error. If a signal-based completion, need to call that + # an error. report_test_failure(name, command, output[1]) return name, exit_status, passes, failures, unexpected_successes @@ -250,9 +291,7 @@ return process_dir(*args) -def process_dir_worker_threading( - a_test_counter, a_total_tests, a_test_name_len, - a_dotest_options, job_queue, result_queue, inferior_pid_events): +def process_dir_worker_threading(job_queue, result_queue, inferior_pid_events): """Worker thread main loop when in threading mode. This one supports the hand-rolled pooling support. @@ -413,6 +452,150 @@ initialize_global_vars_common(num_threads, test_work_items) +def ctrl_c_loop(main_op_func, done_func, ctrl_c_handler): + """Provides a main loop that is Ctrl-C protected. + + The main loop calls the main_op_func() repeatedly until done_func() + returns true. The ctrl_c_handler() method is called with a single + int parameter that contains the number of times the ctrl_c has been + hit (starting with 1). 
The ctrl_c_handler() should mutate whatever + it needs to have the done_func() return True as soon as it is desired + to exit the loop. + """ + done = False + ctrl_c_count = 0 + + while not done: + try: + # See if we're done. Start with done check since it is + # the first thing executed after a Ctrl-C handler in the + # following loop. + done = done_func() + if not done: + # Run the main op once. + main_op_func() + + except KeyboardInterrupt: + ctrl_c_count += 1 + ctrl_c_handler(ctrl_c_count) + + +def pump_workers_and_asyncore_map(workers, asyncore_map): + """Prunes out completed workers and maintains the asyncore loop. + + The asyncore loop contains the optional socket listener + and handlers. When all workers are complete, this method + takes care of stopping the listener. It also runs the + asyncore loop for the given async map for 10 iterations. + + @param workers the list of worker Thread/Process instances. + + @param asyncore_map the asyncore threading-aware map that + indicates which channels are in use and still alive. + """ + + # Check on all the workers, removing them from the workers + # list as they complete. + dead_workers = [] + for worker in workers: + # This non-blocking join call is what allows us + # to still receive keyboard interrupts. + worker.join(0.01) + if not worker.is_alive(): + dead_workers.append(worker) + # Clear out the completed workers + for dead_worker in dead_workers: + workers.remove(dead_worker) + + # If there are no more workers and there is a listener, + # close the listener. + global RESULTS_LISTENER_CHANNEL + if len(workers) == 0 and RESULTS_LISTENER_CHANNEL is not None: + RESULTS_LISTENER_CHANNEL.close() + RESULTS_LISTENER_CHANNEL = None + + # Pump the asyncore map if it isn't empty. 
+ if len(asyncore_map) > 0: + asyncore.loop(0.1, False, asyncore_map, 10) + + +def handle_ctrl_c(ctrl_c_count, job_queue, workers, inferior_pid_events, + stop_all_inferiors_func): + """Performs the appropriate ctrl-c action for non-pool parallel test runners + + @param ctrl_c_count starting with 1, indicates the number of times ctrl-c + has been intercepted. The value is 1 on the first intercept, 2 on the + second, etc. + + @param job_queue a Queue object that contains the work still outstanding + (i.e. hasn't been assigned to a worker yet). + + @param workers list of Thread or Process workers. + + @param inferior_pid_events specifies a Queue of inferior process + construction and destruction events. Used to build the list of inferior + processes that should be killed if we get that far. + + @param stop_all_inferiors_func a callable object that takes the + workers and inferior_pid_events parameters (in that order) if a hard + stop is to be used on the workers. + """ + + # Print out which Ctrl-C we're handling. + key_name = [ + "first", + "second", + "third", + "many"] + + if ctrl_c_count < len(key_name): + name_index = ctrl_c_count - 1 + else: + name_index = len(key_name) - 1 + message = "\nHandling {} KeyboardInterrupt".format(key_name[name_index]) + with output_lock: + print message + + if ctrl_c_count == 1: + # Remove all outstanding items from the work queue so we stop + # doing any more new work. + while not job_queue.empty(): + try: + # Just drain it to stop more work from being started. + job_queue.get_nowait() + except Queue.Empty: + pass + with output_lock: + print "Stopped more work from being started." + elif ctrl_c_count == 2: + # Try to stop all inferiors, even the ones currently doing work. + stop_all_inferiors_func(workers, inferior_pid_events) + else: + with output_lock: + print "All teardown activities kicked off, should finish soon." 
+ + +def workers_and_async_done(workers, async_map): + """Returns True if the workers list and asyncore channels are all done. + + @param workers list of workers (threads/processes). These must adhere + to the threading Thread or multiprocessing.Process interface. + + @param async_map the threading-aware asyncore channel map to check + for live channels. + + @return False if the workers list exists and has any entries in it, or + if the async_map exists and has any entries left in it; otherwise, True. + """ + if workers is not None and len(workers) > 0: + # We're not done if we still have workers left. + return False + if async_map is not None and len(async_map) > 0: + return False + # We're done. + return True + + def multiprocessing_test_runner(num_threads, test_work_items): """Provides hand-wrapped pooling test runner adapter with Ctrl-C support. @@ -464,36 +647,72 @@ worker.start() workers.append(worker) - # Wait for all workers to finish, handling ^C as needed. - try: - for worker in workers: - worker.join() - except KeyboardInterrupt: - # First try to drain the queue of work and let the - # running tests complete. - while not job_queue.empty(): - try: - # Just drain it to stop more work from being started. - job_queue.get_nowait() - except Queue.Empty: - pass + # Main loop: wait for all workers to finish and wait for + # the socket handlers to wrap up. + ctrl_c_loop( + # Main operation of loop + lambda: pump_workers_and_asyncore_map( + workers, RUNNER_PROCESS_ASYNC_MAP), - print ('\nFirst KeyboardInterrupt received, stopping ' - 'future work. Press again to hard-stop existing tests.') - try: - for worker in workers: - worker.join() - except KeyboardInterrupt: - print ('\nSecond KeyboardInterrupt received, killing ' - 'all worker process trees.') - kill_all_worker_processes(workers, inferior_pid_events) + # Return True when we're done with the main loop. 
+ lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP), + + # Indicate what we do when we receive one or more Ctrl-Cs. + lambda ctrl_c_count: handle_ctrl_c( + ctrl_c_count, job_queue, workers, inferior_pid_events, + kill_all_worker_processes)) + # Reap the test results. test_results = [] while not result_queue.empty(): test_results.append(result_queue.get(block=False)) return test_results +def map_async_run_loop(future, channel_map, listener_channel): + """Blocks until the Pool.map_async completes and the channel completes. + + @param future an AsyncResult instance from a Pool.map_async() call. + + @param channel_map the asyncore dispatch channel map that should be pumped. + Optional: may be None. + + @param listener_channel the channel representing a listener that should be + closed once the map_async results are available. + + @return the results from the async_result instance. + """ + map_results = None + + done = False + while not done: + # Check if we need to reap the map results. + if map_results is None: + if future.ready(): + # Get the results. + map_results = future.get() + + # Close the runner process listener channel if we have + # one: no more connections will be incoming. + if listener_channel is not None: + listener_channel.close() + + # Pump the asyncore loop if we have a listener socket. + if channel_map is not None: + asyncore.loop(0.01, False, channel_map, 10) + + # Figure out if we're done running. + done = map_results is not None + if channel_map is not None: + # We have a runner process async map. Check if it + # is complete. + if len(channel_map) > 0: + # We still have an asyncore channel running. Not done yet. + done = False + + return map_results + + def multiprocessing_test_runner_pool(num_threads, test_work_items): # Initialize our global state. 
initialize_global_vars_multiprocessing(num_threads, test_work_items) @@ -503,7 +722,12 @@ initializer=setup_global_variables, initargs=(output_lock, test_counter, total_tests, test_name_len, dotest_options)) - return pool.map(process_dir_worker_multiprocessing_pool, test_work_items) + + # Start the map operation (async mode). + map_future = pool.map_async( + process_dir_worker_multiprocessing_pool, test_work_items) + return map_async_run_loop( + map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL) def threading_test_runner(num_threads, test_work_items): @@ -541,53 +765,28 @@ for _ in range(num_threads): worker = threading.Thread( target=process_dir_worker_threading, - args=(test_counter, - total_tests, - test_name_len, - dotest_options, - job_queue, + args=(job_queue, result_queue, inferior_pid_events)) worker.start() workers.append(worker) - # Wait for all workers to finish, handling ^C as needed. - try: - # We do some trickery here to ensure we can catch keyboard - # interrupts. - while len(workers) > 0: - # Make a pass throug the workers, checking for who is done. - dead_workers = [] - for worker in workers: - # This non-blocking join call is what allows us - # to still receive keyboard interrupts. - worker.join(0.01) - if not worker.isAlive(): - dead_workers.append(worker) - # Clear out the completed workers - for dead_worker in dead_workers: - workers.remove(dead_worker) + # Main loop: wait for all workers to finish and wait for + # the socket handlers to wrap up. + ctrl_c_loop( + # Main operation of loop + lambda: pump_workers_and_asyncore_map( + workers, RUNNER_PROCESS_ASYNC_MAP), - except KeyboardInterrupt: - # First try to drain the queue of work and let the - # running tests complete. - while not job_queue.empty(): - try: - # Just drain it to stop more work from being started. - job_queue.get_nowait() - except Queue.Empty: - pass + # Return True when we're done with the main loop. 
+ lambda: workers_and_async_done(workers, RUNNER_PROCESS_ASYNC_MAP), - print ('\nFirst KeyboardInterrupt received, stopping ' - 'future work. Press again to hard-stop existing tests.') - try: - for worker in workers: - worker.join() - except KeyboardInterrupt: - print ('\nSecond KeyboardInterrupt received, killing ' - 'all worker process trees.') - kill_all_worker_threads(workers, inferior_pid_events) + # Indicate what we do when we receive one or more Ctrl-Cs. + lambda ctrl_c_count: handle_ctrl_c( + ctrl_c_count, job_queue, workers, inferior_pid_events, + kill_all_worker_threads)) + # Reap the test results. test_results = [] while not result_queue.empty(): test_results.append(result_queue.get(block=False)) @@ -598,20 +797,49 @@ # Initialize our global state. initialize_global_vars_threading(num_threads, test_work_items) - pool = multiprocessing.pool.ThreadPool( - num_threads - # initializer=setup_global_variables, - # initargs=(output_lock, test_counter, total_tests, test_name_len, - # dotest_options) - ) - return pool.map(process_dir_worker_threading_pool, test_work_items) + pool = multiprocessing.pool.ThreadPool(num_threads) + map_future = pool.map_async( + process_dir_worker_threading_pool, test_work_items) + + return map_async_run_loop( + map_future, RUNNER_PROCESS_ASYNC_MAP, RESULTS_LISTENER_CHANNEL) + + +def asyncore_run_loop(channel_map): + try: + asyncore.loop(None, False, channel_map) + except: + # Swallow it, we're seeing: + # error: (9, 'Bad file descriptor') + # when the listener channel is closed. Shouldn't be the case. + pass def inprocess_exec_test_runner(test_work_items): # Initialize our global state. initialize_global_vars_multiprocessing(1, test_work_items) - return map(process_dir_mapper_inprocess, test_work_items) + # Run the listener and related channel maps in a separate thread. 
+ # global RUNNER_PROCESS_ASYNC_MAP + global RESULTS_LISTENER_CHANNEL + if RESULTS_LISTENER_CHANNEL is not None: + socket_thread = threading.Thread( + target=lambda: asyncore_run_loop(RUNNER_PROCESS_ASYNC_MAP)) + socket_thread.start() + + # Do the work. + test_results = map(process_dir_mapper_inprocess, test_work_items) + + # If we have a listener channel, shut it down here. + if RESULTS_LISTENER_CHANNEL is not None: + # Close down the channel. + RESULTS_LISTENER_CHANNEL.close() + RESULTS_LISTENER_CHANNEL = None + + # Wait for the listener and handlers to complete. + socket_thread.join() + + return test_results def walk_and_invoke(test_directory, test_subdir, dotest_argv, test_runner_func): @@ -624,6 +852,22 @@ test_subdir - lldb/test/ or a subfolder with the tests we're interested in running """ + # The async_map is important to keep all thread-related asyncore + # channels distinct when we call asyncore.loop() later on. + global RESULTS_LISTENER_CHANNEL, RUNNER_PROCESS_ASYNC_MAP + RUNNER_PROCESS_ASYNC_MAP = {} + + # If we're outputting side-channel test results, create the socket + # listener channel and tell the inferior to send results to the + # port on which we'll be listening. + if RESULTS_FORMATTER is not None: + forwarding_func = lambda event: inferior_session_interceptor( + RESULTS_FORMATTER.process_event, event) + RESULTS_LISTENER_CHANNEL = ( + dotest_channels.UnpicklingForwardingListenerChannel( + RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func)) + dotest_argv.append("--results-port") + dotest_argv.append(str(RESULTS_LISTENER_CHANNEL.address[1])) # Collect the test files that we'll run. 
test_work_items = [] @@ -654,7 +898,7 @@ if platform_name is None: target = sys.platform else: - m = re.search('remote-(\w+)', platform_name) + m = re.search(r'remote-(\w+)', platform_name) target = m.group(1) expected_timeout = set() @@ -759,20 +1003,94 @@ # threading-pool uses threading for the workers (in-process) # and uses the multiprocessing.pool thread-enabled pool. + # This does not properly support Ctrl-C. "threading-pool": (lambda work_items: threading_test_runner_pool( num_threads, work_items)), # serial uses the subprocess-based, single process # test runner. This provides process isolation but - # no concurrent test running. + # no concurrent test execution. "serial": inprocess_exec_test_runner } +def _remove_option(args, option_name, removal_count): + """Removes option and related option arguments from args array. + @param args the array of command line arguments (in/out) + @param option_name the full command line representation of the + option that will be removed (including '--' or '-'). + @param removal_count the count of elements to remove. A value of 1 will remove + just the found option, while 2 will remove the option and its first + argument. + """ + try: + index = args.index(option_name) + # Handle the exact match case. + del args[index:index+removal_count] + return + except ValueError: + # Thanks to argparse not handling options with known arguments + # like other options parsing libraries (see + # https://bugs.python.org/issue9334), we need to support the + # --results-formatter-options={second-level-arguments} (note + # the equal sign to fool the first-level arguments parser into + # not treating the second-level arguments as first-level + # options). We're certainly at risk of getting this wrong + # since now we're forced into the business of trying to figure + # out what is an argument (although I think this + # implementation will suffice). 
+ regex_string = "^" + option_name + "=" + regex = re.compile(regex_string) + for index in range(len(args)): + match = regex.match(args[index]) + if match: + print "found matching option= at index {}".format(index) + del args[index] + return + print "failed to find regex '{}'".format(regex_string) + + # We didn't find the option but we should have. + raise Exception("failed to find option '{}' in args '{}'".format( + option_name, args)) + + +def adjust_inferior_options(dotest_argv): + """Adjusts the commandline args array for inferiors. + + This method adjusts the inferior dotest commandline options based + on the parallel test runner's options. Some of the inferior options + will need to change to properly handle aggregation functionality. + """ + global dotest_options + + # If we don't have a session directory, create one. + if not dotest_options.s: + # no session log directory, we need to add this to prevent + # every dotest invocation from creating its own directory + import datetime + # The windows platforms don't like ':' in the pathname. + timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S") + dotest_argv.append('-s') + dotest_argv.append(timestamp_started) + dotest_options.s = timestamp_started + + # Adjust inferior results formatter options - if the parallel + # test runner is collecting into the user-specified test results, + # we'll have inferiors spawn with the --results-port option and + # strip the original test runner options. 
+ if dotest_options.results_file is not None: + _remove_option(dotest_argv, "--results-file", 2) + if dotest_options.results_port is not None: + _remove_option(dotest_argv, "--results-port", 2) + if dotest_options.results_formatter is not None: + _remove_option(dotest_argv, "--results-formatter", 2) + if dotest_options.results_formatter_options is not None: + _remove_option(dotest_argv, "--results-formatter-options", 2) + def main(print_details_on_success, num_threads, test_subdir, - test_runner_name): + test_runner_name, results_formatter): """Run dotest.py in inferior mode in parallel. @param print_details_on_success the parsed value of the output-on-success @@ -793,53 +1111,31 @@ system to choose the most appropriate test runner given desired thread count and OS type. + @param results_formatter if specified, provides the TestResultsFormatter + instance that will format and output test result data from the + side-channel test results. When specified, inferior dotest calls + will send test results side-channel data over a socket to the parallel + test runner, which will forward them on to results_formatter. """ dotest_argv = sys.argv[1:] - global output_on_success + global output_on_success, RESULTS_FORMATTER output_on_success = print_details_on_success + RESULTS_FORMATTER = results_formatter # We can't use sys.path[0] to determine the script directory # because it doesn't work under a debugger - test_directory = os.path.dirname(os.path.realpath(__file__)) - parser = OptionParser(usage="""\ -Run lldb test suite using a separate process for each test file. - - Each test will run with a time limit of 10 minutes by default. - - Override the default time limit of 10 minutes by setting - the environment variable LLDB_TEST_TIMEOUT. - - E.g., export LLDB_TEST_TIMEOUT=10m - - Override the time limit for individual tests by setting - the environment variable LLDB_[TEST NAME]_TIMEOUT. 
- - E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m - - Set to "0" to run without time limit. - - E.g., export LLDB_TEST_TIMEOUT=0 - or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0 -""") parser = dotest_args.create_parser() global dotest_options dotest_options = dotest_args.parse_args(parser, dotest_argv) - if not dotest_options.s: - # no session log directory, we need to add this to prevent - # every dotest invocation from creating its own directory - import datetime - # The windows platforms don't like ':' in the pathname. - timestamp_started = datetime.datetime.now().strftime("%F-%H_%M_%S") - dotest_argv.append('-s') - dotest_argv.append(timestamp_started) - dotest_options.s = timestamp_started + adjust_inferior_options(dotest_argv) session_dir = os.path.join(os.getcwd(), dotest_options.s) # The root directory was specified on the command line + test_directory = os.path.dirname(os.path.realpath(__file__)) if test_subdir and len(test_subdir) > 0: test_subdir = os.path.join(test_directory, test_subdir) else: Index: test/dotest.py =================================================================== --- test/dotest.py +++ test/dotest.py @@ -20,15 +20,20 @@ for available options. 
""" +import atexit import commands +import importlib import os import dotest_args import errno import platform import progress import signal +import socket import subprocess import sys +import test_results +from test_results import EventBuilder import inspect import unittest2 import lldbtest_config @@ -251,6 +256,14 @@ no_multiprocess_test_runner = False test_runner_name = None +# Test results handling globals +results_filename = None +results_port = None +results_file_object = None +results_formatter_name = None +results_formatter_object = None +results_formatter_options = None + def usage(parser): parser.print_help() if verbose > 0: @@ -497,6 +510,10 @@ global output_on_success global no_multiprocess_test_runner global test_runner_name + global results_filename + global results_formatter_name + global results_formatter_options + global results_port do_help = False @@ -782,6 +799,24 @@ if args.test_runner_name: test_runner_name = args.test_runner_name + # Capture test results-related args. 
+ if args.results_file: + results_filename = args.results_file + + if args.results_port: + results_port = args.results_port + + if args.results_file and args.results_port: + sys.stderr.write( + "only one of --results-file and --results-port should " + "be specified\n") + usage(args) + + if args.results_formatter: + results_formatter_name = args.results_formatter + if args.results_formatter_options: + results_formatter_options = args.results_formatter_options + if args.lldb_platform_name: lldb_platform_name = args.lldb_platform_name if args.lldb_platform_url: @@ -886,6 +921,85 @@ return result + +def createSocketToLocalPort(port): + def socket_closer(s): + """Close down an opened socket properly.""" + s.shutdown(socket.SHUT_RDWR) + s.close() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + return (sock, lambda: socket_closer(sock)) + + +def setupTestResults(): + """Sets up test results-related objects based on arg settings.""" + global results_filename + global results_file_object + global results_formatter_name + global results_formatter_object + global results_formatter_options + global results_port + + default_formatter_name = None + cleanup_func = None + + if results_filename: + # Open the results file for writing. + results_file_object = open(results_filename, "w") + cleanup_func = results_file_object.close + default_formatter_name = "test_results.XunitFormatter" + elif results_port: + # Connect to the specified localhost port. + results_file_object, cleanup_func = createSocketToLocalPort( + results_port) + default_formatter_name = "test_results.RawPickledFormatter" + + if results_file_object: + # We care about the formatter. Choose user-specified or, if + # none specified, use the default for the output type. + if results_formatter_name: + formatter_name = results_formatter_name + else: + formatter_name = default_formatter_name + + # Create an instance of the class. First figure out the package/module. 
+ components = formatter_name.split(".") + module = importlib.import_module(".".join(components[:-1])) + + # Create the class name we need to load. + clazz = getattr(module, components[-1]) + + # Handle formatter options for the results formatter class. + formatter_arg_parser = clazz.arg_parser() + if results_formatter_options and len(results_formatter_options) > 0: + import shlex + split_options = shlex.split(results_formatter_options) + else: + split_options = [] + + formatter_options = formatter_arg_parser.parse_args(split_options) + + # Create the TestResultsFormatter given the processed options. + results_formatter_object = clazz(results_file_object, formatter_options) + + # Start the results formatter session - we'll only have one + # during a given dotest process invocation. + results_formatter_object.begin_session() + + def shutdown_formatter(): + # Tell the formatter to write out anything it may have + # been saving until the very end (e.g. xUnit results + # can't complete its output until this point). + results_formatter_object.end_session() + + # And now close out the output file-like object. + cleanup_func() + + atexit.register(shutdown_formatter) + + def getOutputPaths(lldbRootDirectory): """ Returns typical build output paths for the lldb executable @@ -1293,13 +1407,20 @@ # parseOptionsAndInitTestdirs() + # Setup test results (test results formatter and output handling). + setupTestResults() + # If we are running as the multiprocess test runner, kick off the # multiprocess test runner here. if isMultiprocessTestRunner(): import dosep dosep.main(output_on_success, num_threads, multiprocess_test_subdir, - test_runner_name) + test_runner_name, results_formatter_object) raise Exception("should never get here") + elif is_inferior_test_runner: + # Shut off Ctrl-C processing in inferiors. The parallel + # test runner handles this more holistically. 
+ signal.signal(signal.SIGINT, signal.SIG_IGN) setupSysPath() setupCrashInfoHook() @@ -1653,6 +1774,7 @@ self.progressbar = progress.ProgressWithEvents(stdout=self.stream,start=0,end=suite.countTestCases(),width=width-10) except: self.progressbar = None + self.results_formatter = results_formatter_object def _config_string(self, test): compiler = getattr(test, "getCompiler", None) @@ -1729,12 +1851,18 @@ if self.showAll: self.stream.write(self.fmt % self.counter) super(LLDBTestResult, self).startTest(test) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_start(test)) def addSuccess(self, test): global parsable super(LLDBTestResult, self).addSuccess(test) if parsable: self.stream.write("PASS: LLDB (%s) :: %s\n" % (self._config_string(test), str(test))) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_success(test)) def addError(self, test, err): global sdir_has_content @@ -1746,6 +1874,9 @@ method() if parsable: self.stream.write("FAIL: LLDB (%s) :: %s\n" % (self._config_string(test), str(test))) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_error(test, err)) def addCleanupError(self, test, err): global sdir_has_content @@ -1757,6 +1888,10 @@ method() if parsable: self.stream.write("CLEANUP ERROR: LLDB (%s) :: %s\n" % (self._config_string(test), str(test))) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_cleanup_error( + test, err)) def addFailure(self, test, err): global sdir_has_content @@ -1776,6 +1911,10 @@ failuresPerCategory[category] = failuresPerCategory[category] + 1 else: failuresPerCategory[category] = 1 + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_failure(test, err)) + def addExpectedFailure(self, test, err, bugnumber): global sdir_has_content @@ -1787,6 +1926,10 @@ method(err, bugnumber) if parsable: 
self.stream.write("XFAIL: LLDB (%s) :: %s\n" % (self._config_string(test), str(test))) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_expected_failure( + test, err, bugnumber)) def addSkip(self, test, reason): global sdir_has_content @@ -1798,6 +1941,9 @@ method() if parsable: self.stream.write("UNSUPPORTED: LLDB (%s) :: %s (%s) \n" % (self._config_string(test), str(test), reason)) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_skip(test, reason)) def addUnexpectedSuccess(self, test, bugnumber): global sdir_has_content @@ -1809,6 +1955,11 @@ method(bugnumber) if parsable: self.stream.write("XPASS: LLDB (%s) :: %s\n" % (self._config_string(test), str(test))) + if self.results_formatter: + self.results_formatter.process_event( + EventBuilder.event_for_unexpected_success( + test, bugnumber)) + if parsable: v = 0 Index: test/dotest_args.py =================================================================== --- test/dotest_args.py +++ test/dotest_args.py @@ -140,6 +140,31 @@ ' multiprocessing-pool, serial, threading, threading-pool') ) + # Test results support. + group = parser.add_argument_group('Test results options') + group.add_argument( + '--results-file', + action='store', + help=('Specifies the file where test results will be written ' + 'according to the results-formatter class used')) + group.add_argument( + '--results-port', + action='store', + type=int, + help=('Specifies the localhost port to which the results ' + 'formatted output should be sent')) + group.add_argument( + '--results-formatter', + action='store', + help=('Specifies the full package/module/class name used to translate ' + 'test events into some kind of meaningful report, written to ' + 'the designated output results file-like object')) + group.add_argument( + '--results-formatter-options', + action='store', + help=('Specify comma-separated options to pass to the formatter. 
' + 'Use --results-formatter-options="--option1[,--option2[,...]]" ' + 'syntax. Note the "=" is critical, and don\'t use whitespace.')) # Remove the reference to our helper function del X Index: test/dotest_channels.py =================================================================== --- /dev/null +++ test/dotest_channels.py @@ -0,0 +1,173 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. + +Sync lldb and related source from a local machine to a remote machine. + +This facilitates working on the lldb sourcecode on multiple machines +and multiple OS types, verifying changes across all. + + +This module provides asyncore channels used within the LLDB test +framework. +""" + +import asyncore +import cPickle +import socket + + +class UnpicklingForwardingReaderChannel(asyncore.dispatcher): + """Provides an unpickling, forwarding asyncore dispatch channel reader. + + Inferior dotest.py processes with side-channel-based test results will + send test result event data in a pickled format, one event at a time. + This class supports reconstructing the pickled data and forwarding it + on to its final destination. + + The channel data is written in the form: + {num_payload_bytes}#{payload_bytes} + + The bulk of this class is devoted to reading and parsing out + the payload bytes. + """ + def __init__(self, file_object, async_map, forwarding_func): + asyncore.dispatcher.__init__(self, sock=file_object, map=async_map) + + self.header_contents = '' + self.packet_bytes_remaining = 0 + self.reading_header = True + self.ibuffer = '' + self.forwarding_func = forwarding_func + if forwarding_func is None: + # This whole class is useless if we do nothing with the + # unpickled results. 
+ raise Exception("forwarding function must be set") + + def deserialize_payload(self): + """Unpickles the collected input buffer bytes and forwards.""" + if len(self.ibuffer) > 0: + self.forwarding_func(cPickle.loads(self.ibuffer)) + self.ibuffer = '' + + def consume_header_bytes(self, data): + """Consumes header bytes from the front of data. + @param data the incoming data stream bytes + @return any data leftover after consuming header bytes. + """ + # We're done if there is no content. + if not data or (len(data) == 0): + return None + + for index in range(len(data)): + byte = data[index] + if byte != '#': + # Header byte. + self.header_contents += byte + else: + # End of header. + self.packet_bytes_remaining = int(self.header_contents) + self.header_contents = '' + self.reading_header = False + return data[(index+1):] + + # If we made it here, we've exhausted the data and + # we're still parsing header content. + return None + + def consume_payload_bytes(self, data): + """Consumes payload bytes from the front of data. + @param data the incoming data stream bytes + @return any data leftover after consuming remaining payload bytes. + """ + if not data or (len(data) == 0): + # We're done and there's nothing to do. + return None + + data_len = len(data) + if data_len <= self.packet_bytes_remaining: + # We're consuming all the data provided. + self.ibuffer += data + self.packet_bytes_remaining -= data_len + + # If we're no longer waiting for payload bytes, + # we flip back to parsing header bytes and we + # unpickle the payload contents. + if self.packet_bytes_remaining < 1: + self.reading_header = True + self.deserialize_payload() + + # We're done, no more data left. + return None + else: + # We're only consuming a portion of the data since + # the data contains more than the payload amount. + self.ibuffer += data[:self.packet_bytes_remaining] + data = data[self.packet_bytes_remaining:] + + # We now move on to reading the header. 
+ self.reading_header = True + self.packet_bytes_remaining = 0 + + # And we can deserialize the payload. + self.deserialize_payload() + + # Return the remaining data. + return data + + def handle_read(self): + data = self.recv(8192) + # print 'driver socket READ: %d bytes' % len(data) + + while data and (len(data) > 0): + # If we're reading the header, gather header bytes. + if self.reading_header: + data = self.consume_header_bytes(data) + else: + data = self.consume_payload_bytes(data) + + def handle_close(self): + # print "socket reader: closing port" + self.close() + + +class UnpicklingForwardingListenerChannel(asyncore.dispatcher): + """Provides a socket listener asyncore channel for unpickling/forwarding. + + This channel will listen on a socket port (use 0 for host-selected). Any + client that connects will have an UnpicklingForwardingReaderChannel handle + communication over the connection. + + The dotest parallel test runners, when collecting test results, open the + test results side channel over a socket. This channel handles connections + from inferiors back to the test runner. Each worker fires up a listener + for each inferior invocation. This simplifies the asyncore.loop() usage, + one of the reasons for implementing with asyncore. This listener shuts + down once a single connection is made to it. + """ + def __init__(self, async_map, host, port, forwarding_func): + asyncore.dispatcher.__init__(self, map=async_map) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((host, port)) + self.address = self.socket.getsockname() + self.listen(5) + self.handler = None + self.async_map = async_map + self.forwarding_func = forwarding_func + if forwarding_func is None: + # This whole class is useless if we do nothing with the + # unpickled results. 
+ raise Exception("forwarding function must be set") + + def handle_accept(self): + (sock, addr) = self.socket.accept() + if sock and addr: + # print 'Incoming connection from %s' % repr(addr) + self.handler = UnpicklingForwardingReaderChannel( + sock, self.async_map, self.forwarding_func) + + def handle_close(self): + self.close() Index: test/settings/TestSettings.py =================================================================== --- test/settings/TestSettings.py +++ test/settings/TestSettings.py @@ -165,7 +165,7 @@ self.expect("settings show auto-confirm", SETTING_MSG("auto-confirm"), startstr = "auto-confirm (boolean) = false") - @skipUnlessArch(['x86-64', 'i386', 'i686']) + @skipUnlessArch(['x86_64', 'i386', 'i686']) def test_disassembler_settings(self): """Test that user options for the disassembler take effect.""" self.buildDefault() @@ -449,7 +449,7 @@ SETTING_MSG("disassembly-format"), substrs = [ 'disassembly-format (format-string) = "foo "']) self.runCmd("settings clear disassembly-format", check=False) - + def test_all_settings_exist (self): self.expect ("settings show", substrs = [ "auto-confirm", Index: test/test_results.py =================================================================== --- /dev/null +++ test/test_results.py @@ -0,0 +1,778 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. + +Provides classes used by the test results reporting infrastructure +within the LLDB test suite. +""" + +import argparse +import cPickle +import inspect +import os +import sys +import threading +import time +import xml.sax.saxutils + + +class EventBuilder(object): + """Helper class to build test result event dictionaries.""" + @staticmethod + def _get_test_name_info(test): + """Returns (test-class-name, test-method-name) from a test case instance. + + @param test a unittest.TestCase instance. 
+ + @return tuple containing (test class name, test method name) + """ + test_class_components = test.id().split(".") + test_class_name = ".".join(test_class_components[:-1]) + test_name = test_class_components[-1] + return (test_class_name, test_name) + + @staticmethod + def _event_dictionary_common(test, event_type): + """Returns an event dictionary setup with values for the given event type. + + @param test the unittest.TestCase instance + + @param event_type the name of the event type (string). + + @return event dictionary with common event fields set. + """ + test_class_name, test_name = EventBuilder._get_test_name_info(test) + return { + "event": event_type, + "test_class": test_class_name, + "test_name": test_name, + "event_time": time.time() + } + + @staticmethod + def _error_tuple_class(error_tuple): + """Returns the unittest error tuple's error class as a string. + + @param error_tuple the error tuple provided by the test framework. + + @return the error type (typically an exception) raised by the + test framework. + """ + type_var = error_tuple[0] + module = inspect.getmodule(type_var) + if module: + return "{}.{}".format(module.__name__, type_var.__name__) + else: + return type_var.__name__ + + @staticmethod + def _error_tuple_message(error_tuple): + """Returns the unittest error tuple's error message. + + @param error_tuple the error tuple provided by the test framework. + + @return the error message provided by the test framework. + """ + return str(error_tuple[1]) + + @staticmethod + def _event_dictionary_test_result(test, status): + """Returns an event dictionary with common test result fields set. + + @param test a unittest.TestCase instance. + + @param status the status/result of the test + (e.g. "success", "failure", etc.) 
+ + @return the event dictionary + """ + event = EventBuilder._event_dictionary_common(test, "test_result") + event["status"] = status + return event + + @staticmethod + def _event_dictionary_issue(test, status, error_tuple): + """Returns an event dictionary with common issue-containing test result + fields set. + + @param test a unittest.TestCase instance. + + @param status the status/result of the test + (e.g. "success", "failure", etc.) + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + event = EventBuilder._event_dictionary_test_result(test, status) + event["issue_class"] = EventBuilder._error_tuple_class(error_tuple) + event["issue_message"] = EventBuilder._error_tuple_message(error_tuple) + return event + + @staticmethod + def event_for_start(test): + """Returns an event dictionary for the test start event. + + @param test a unittest.TestCase instance. + + @return the event dictionary + """ + return EventBuilder._event_dictionary_common(test, "test_start") + + @staticmethod + def event_for_success(test): + """Returns an event dictionary for a successful test. + + @param test a unittest.TestCase instance. + + @return the event dictionary + """ + return EventBuilder._event_dictionary_test_result(test, "success") + + @staticmethod + def event_for_unexpected_success(test, bugnumber): + """Returns an event dictionary for a test that succeeded but was + expected to fail. + + @param test a unittest.TestCase instance. + + @param bugnumber the issue identifier for the bug tracking the + fix request for the test expected to fail (but is in fact + passing here). + + @return the event dictionary + + """ + event = EventBuilder._event_dictionary_test_result( + test, "unexpected_success") + if bugnumber: + event["bugnumber"] = str(bugnumber) + return event + + @staticmethod + def event_for_failure(test, error_tuple): + """Returns an event dictionary for a test that failed. 
+ + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + return EventBuilder._event_dictionary_issue( + test, "failure", error_tuple) + + @staticmethod + def event_for_expected_failure(test, error_tuple, bugnumber): + """Returns an event dictionary for a test that failed as expected. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @param bugnumber the issue identifier for the bug tracking the + fix request for the test expected to fail. + + @return the event dictionary + + """ + event = EventBuilder._event_dictionary_issue( + test, "expected_failure", error_tuple) + if bugnumber: + event["bugnumber"] = str(bugnumber) + return event + + @staticmethod + def event_for_skip(test, reason): + """Returns an event dictionary for a test that was skipped. + + @param test a unittest.TestCase instance. + + @param reason the reason why the test is being skipped. + + @return the event dictionary + """ + event = EventBuilder._event_dictionary_test_result(test, "skip") + event["skip_reason"] = reason + return event + + @staticmethod + def event_for_error(test, error_tuple): + """Returns an event dictionary for a test that hit a test execution error. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + return EventBuilder._event_dictionary_issue(test, "error", error_tuple) + + @staticmethod + def event_for_cleanup_error(test, error_tuple): + """Returns an event dictionary for a test that hit a test execution error + during the test cleanup phase. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). 
+ + @return the event dictionary + """ + event = EventBuilder._event_dictionary_issue( + test, "error", error_tuple) + event["issue_phase"] = "cleanup" + return event + + +class ResultsFormatter(object): + """Provides interface to formatting test results out to a file-like object. + + This class allows the LLDB test framework's raw test-related + events to be processed and formatted in any manner desired. + Test events are represented by python dictionaries, formatted + as in the EventBuilder class above. + + ResultsFormatter instances are given a file-like object in which + to write their results. + + ResultsFormatter lifetime looks like the following: + + # The result formatter is created. + # The argparse options dictionary is generated from calling + # the SomeResultFormatter.arg_parser() with the options data + # passed to dotest.py via the "--results-formatter-options" + # argument. See the help on that for syntactic requirements + # on getting that parsed correctly. + formatter = SomeResultFormatter(file_like_object, argparsed_options_dict) + + # Single call to session start, before parsing any events. + formatter.begin_session() + + # Zero or more calls specified for events recorded during the test session. + # The parallel test runner manages getting results from all the inferior + # dotest processes, so from a new format perspective, don't worry about + # that. The formatter will be presented with a single stream of events + # sandwiched between a single begin_session()/end_session() pair in the + # parallel test runner process/thread. + for event in zero_or_more_test_events(): + formatter.process_event(event) + + # Single call to session end. Formatters that need all the data before + # they can print a correct result (e.g. xUnit/JUnit), this is where + # the final report can be generated. + formatter.end_session() + + It is not the formatter's responsibility to close the file_like_object. + (i.e. do not close it).
+ + The lldb test framework passes these test events in real time, so they + arrive as they come in. + + In the case of the parallel test runner, the dotest inferiors + add a 'pid' field to the dictionary that indicates which inferior + pid generated the event. + + Note more events may be added in the future to support richer test + reporting functionality. One example: creating a true flaky test + result category so that unexpected successes really mean the test + is marked incorrectly (either should be marked flaky, or is indeed + passing consistently now and should have the xfail marker + removed). In this case, a flaky_success and flaky_fail event + likely will be added to capture these and support reporting things + like percentages of flaky test passing so we can see if we're + making some things worse/better with regards to failure rates. + + Another example: announcing all the test methods that are planned + to be run, so we can better support redo operations of various kinds + (redo all non-run tests, redo non-run tests except the one that + was running [perhaps crashed], etc.) + + Implementers are expected to override all the public methods + provided in this class. See each method's docstring to see + expectations about when the call should be chained. + + """ + + @classmethod + def arg_parser(cls): + """@return arg parser used to parse formatter-specific options.""" + parser = argparse.ArgumentParser( + description='{} options'.format(cls.__name__), + usage=('dotest.py --results-formatter-options=' + '"--option1 value1 [--option2 value2 [...]]"')) + return parser + + def __init__(self, out_file, options): + super(ResultsFormatter, self).__init__() + self.out_file = out_file + self.options = options + if not self.out_file: + raise Exception("ResultsFormatter created with no file object") + self.start_time_by_test = {} + + # Lock that we use while mutating inner state, like the + # total test count and the elements. 
We minimize how + # long we hold the lock just to keep inner state safe, not + # entirely consistent from the outside. + self.lock = threading.Lock() + + def begin_session(self): + """Begins a test session. + + All process_event() calls must be sandwiched between + begin_session() and end_session() calls. + + Derived classes may override this but should call this first. + """ + pass + + def end_session(self): + """Ends a test session. + + All process_event() calls must be sandwiched between + begin_session() and end_session() calls. + + All results formatting should be sent to the output + file object by the end of this call. + + Derived classes may override this but should call this after + the derived class's behavior is complete. + """ + pass + + def process_event(self, test_event): + """Processes the test event for collection into the formatter output. + + Derived classes may override this but should call down to this + implementation first. + + @param test_event the test event as formatted by one of the + event_for_* calls. + """ + pass + + def track_start_time(self, test_class, test_name, start_time): + """Tracks the start time of a test so elapsed time can be computed. + + This alleviates the need for test results to be processed serially + by test. It will save the start time for the test so that + elapsed_time_for_test() can compute the elapsed time properly. + """ + if test_class is None or test_name is None: + return + + test_key = "{}.{}".format(test_class, test_name) + with self.lock: + self.start_time_by_test[test_key] = start_time + + def elapsed_time_for_test(self, test_class, test_name, end_time): + """Returns the elapsed time for a test. + + This function can only be called once per test and requires that + the track_start_time() method be called sometime prior to calling + this method.
+ """ + if test_class is None or test_name is None: + return -2.0 + + test_key = "{}.{}".format(test_class, test_name) + with self.lock: + if test_key not in self.start_time_by_test: + return -1.0 + else: + start_time = self.start_time_by_test[test_key] + del self.start_time_by_test[test_key] + return end_time - start_time + + +class XunitFormatter(ResultsFormatter): + """Provides xUnit-style formatted output. + """ + + # Result mapping arguments + RM_IGNORE = 'ignore' + RM_SUCCESS = 'success' + RM_FAILURE = 'failure' + RM_PASSTHRU = 'passthru' + + @staticmethod + def _quote_attribute(text): + """Returns the given text in a manner safe for usage in an XML attribute. + + @param text the text that should appear within an XML attribute. + @return the attribute-escaped version of the input text. + """ + return xml.sax.saxutils.quoteattr(text) + + @classmethod + def arg_parser(cls): + """@return arg parser used to parse formatter-specific options.""" + parser = super(XunitFormatter, cls).arg_parser() + + # These are valid choices for results mapping. + results_mapping_choices = [ + XunitFormatter.RM_IGNORE, + XunitFormatter.RM_SUCCESS, + XunitFormatter.RM_FAILURE, + XunitFormatter.RM_PASSTHRU] + parser.add_argument( + "--xpass", action="store", choices=results_mapping_choices, + default=XunitFormatter.RM_FAILURE, + help=('specify mapping from unexpected success to jUnit/xUnit ' + 'result type')) + parser.add_argument( + "--xfail", action="store", choices=results_mapping_choices, + default=XunitFormatter.RM_IGNORE, + help=('specify mapping from expected failure to jUnit/xUnit ' + 'result type')) + return parser + + def __init__(self, out_file, options): + """Initializes the XunitFormatter instance. + @param out_file file-like object where formatted output is written. + @param options_dict specifies a dictionary of options for the + formatter. 
+ """ + # Initialize the parent + super(XunitFormatter, self).__init__(out_file, options) + self.text_encoding = "UTF-8" + + self.total_test_count = 0 + + self.elements = { + "successes": [], + "errors": [], + "failures": [], + "skips": [], + "unexpected_successes": [], + "expected_failures": [], + "all": [] + } + + self.status_handlers = { + "success": self._handle_success, + "failure": self._handle_failure, + "error": self._handle_error, + "skip": self._handle_skip, + "expected_failure": self._handle_expected_failure, + "unexpected_success": self._handle_unexpected_success + } + + def begin_session(self): + super(XunitFormatter, self).begin_session() + + def process_event(self, test_event): + super(XunitFormatter, self).process_event(test_event) + + event_type = test_event["event"] + if event_type is None: + return + + if event_type == "test_start": + self.track_start_time( + test_event["test_class"], + test_event["test_name"], + test_event["event_time"]) + elif event_type == "test_result": + self._process_test_result(test_event) + else: + sys.stderr.write("unknown event type {} from {}\n".format( + event_type, test_event)) + + def _handle_success(self, test_event): + """Handles a test success. + @param test_event the test event to handle. + """ + result = self._common_add_testcase_entry(test_event) + with self.lock: + self.elements["successes"].append(result) + + def _handle_failure(self, test_event): + """Handles a test failure. + @param test_event the test event to handle. + """ + result = self._common_add_testcase_entry( + test_event, + inner_content='<failure type={} message={} />'.format( + XunitFormatter._quote_attribute(test_event["issue_class"]), + XunitFormatter._quote_attribute(test_event["issue_message"]))) + with self.lock: + self.elements["failures"].append(result) + + def _handle_error(self, test_event): + """Handles a test error. + @param test_event the test event to handle.
+ """ + result = self._common_add_testcase_entry( + test_event, + inner_content='<error type={} message={} />'.format( + XunitFormatter._quote_attribute(test_event["issue_class"]), + XunitFormatter._quote_attribute(test_event["issue_message"]))) + with self.lock: + self.elements["errors"].append(result) + + def _handle_skip(self, test_event): + """Handles a skipped test. + @param test_event the test event to handle. + """ + result = self._common_add_testcase_entry( + test_event, + inner_content='<skipped message={} />'.format( + XunitFormatter._quote_attribute(test_event["skip_reason"]))) + with self.lock: + self.elements["skips"].append(result) + + def _handle_expected_failure(self, test_event): + """Handles a test that failed as expected. + @param test_event the test event to handle. + """ + if self.options.xfail == XunitFormatter.RM_PASSTHRU: + # This is not a natively-supported junit/xunit + # testcase mode, so it might fail a validating + # test results viewer. + if "bugnumber" in test_event: + bug_id_attribute = 'bug-id={} '.format( + XunitFormatter._quote_attribute(test_event["bugnumber"])) + else: + bug_id_attribute = '' + + result = self._common_add_testcase_entry( + test_event, + inner_content=( + '<expected-failure {}type={} message={} />'.format( + bug_id_attribute, + XunitFormatter._quote_attribute( + test_event["issue_class"]), + XunitFormatter._quote_attribute( + test_event["issue_message"])) + )) + with self.lock: + self.elements["expected_failures"].append(result) + elif self.options.xfail == XunitFormatter.RM_SUCCESS: + result = self._common_add_testcase_entry(test_event) + with self.lock: + self.elements["successes"].append(result) + elif self.options.xfail == XunitFormatter.RM_FAILURE: + result = self._common_add_testcase_entry( + test_event, + inner_content='<failure type={} message={} />'.format( + XunitFormatter._quote_attribute(test_event["issue_class"]), + XunitFormatter._quote_attribute( + test_event["issue_message"]))) + with self.lock: + self.elements["failures"].append(result) + elif self.options.xfail == XunitFormatter.RM_IGNORE: + pass + else: + raise
Exception( + "unknown xfail option: {}".format(self.options.xfail)) + + def _handle_unexpected_success(self, test_event): + """Handles a test that passed but was expected to fail. + @param test_event the test event to handle. + """ + if self.options.xpass == XunitFormatter.RM_PASSTHRU: + # This is not a natively-supported junit/xunit + # testcase mode, so it might fail a validating + # test results viewer. + result = self._common_add_testcase_entry( + test_event, + inner_content=("<unexpected-success />")) + with self.lock: + self.elements["unexpected_successes"].append(result) + elif self.options.xpass == XunitFormatter.RM_SUCCESS: + # Treat the xpass as a success. + result = self._common_add_testcase_entry(test_event) + with self.lock: + self.elements["successes"].append(result) + elif self.options.xpass == XunitFormatter.RM_FAILURE: + # Treat the xpass as a failure. + if "bugnumber" in test_event: + message = "unexpected success (bug_id:{})".format( + test_event["bugnumber"]) + else: + message = "unexpected success (bug_id:none)" + result = self._common_add_testcase_entry( + test_event, + inner_content='<failure type={} message={} />'.format( + XunitFormatter._quote_attribute("unexpected_success"), + XunitFormatter._quote_attribute(message))) + with self.lock: + self.elements["failures"].append(result) + elif self.options.xpass == XunitFormatter.RM_IGNORE: + # Ignore the xpass result as far as xUnit reporting goes. + pass + else: + raise Exception("unknown xpass option: {}".format( + self.options.xpass)) + + def _process_test_result(self, test_event): + """Processes the test_event known to be a test result. + + This categorizes the event appropriately and stores the data needed + to generate the final xUnit report. This method skips events that + cannot be represented in xUnit output.
+ """ + if "status" not in test_event: + raise Exception("test event dictionary missing 'status' key") + + status = test_event["status"] + if status not in self.status_handlers: + raise Exception("test event status '{}' unsupported".format( + status)) + + # Call the status handler for the test result. + self.status_handlers[status](test_event) + + def _common_add_testcase_entry(self, test_event, inner_content=None): + """Registers a testcase result, and returns the text created. + + The caller is expected to manage failure/skip/success counts + in some kind of appropriate way. This call simply constructs + the XML and appends the returned result to the self.elements["all"] + list. + + @param test_event the test event dictionary. + + @param inner_content if specified, gets included in the <testcase> + inner section, at the point before stdout and stderr would be + included. This is where a <failure/>, <skipped/>, <error/>, etc. + could go. + + @return the text of the xml testcase element. + """ + + # Get elapsed time. + test_class = test_event["test_class"] + test_name = test_event["test_name"] + event_time = test_event["event_time"] + time_taken = self.elapsed_time_for_test( + test_class, test_name, event_time) + + # Plumb in stdout/stderr once we shift over to only test results. + test_stdout = '' + test_stderr = '' + + # Formulate the output xml. + if not inner_content: + inner_content = "" + result = ( + '<testcase classname="{}" name="{}" time="{:.3f}">' + '{}{}{}</testcase>'.format( + test_class, + test_name, + time_taken, + inner_content, + test_stdout, + test_stderr)) + + # Save the result, update total test count. + with self.lock: + self.total_test_count += 1 + self.elements["all"].append(result) + + return result + + def _end_session_internal(self): + """Flushes out the report of test executions to form valid xml output. + + xUnit output is in XML. The reporting system cannot complete the + formatting of the output without knowing when there is no more input.
+ This call addresses notification of the completed test run and thus is + when we can finish off the report output. + """ + + # Figure out the counts line for the testsuite. If we have + # been counting either unexpected successes or expected + # failures, we'll output those in the counts, at the risk of + # being invalidated by a validating test results viewer. + # These aren't counted by default so they won't show up unless + # the user specified a formatter option to include them. + xfail_count = len(self.elements["expected_failures"]) + xpass_count = len(self.elements["unexpected_successes"]) + if xfail_count > 0 or xpass_count > 0: + extra_testsuite_attributes = ( + ' expected-failures="{}"' + ' unexpected-successes="{}"'.format(xfail_count, xpass_count)) + else: + extra_testsuite_attributes = "" + + # Output the header. + self.out_file.write( + '<?xml version="1.0" encoding="{}"?>\n' + '<testsuite name="{}" tests="{}" errors="{}" failures="{}" skip="{}"{}>\n'.format( + self.text_encoding, + "LLDB test suite", + self.total_test_count, + len(self.elements["errors"]), + len(self.elements["failures"]), + len(self.elements["skips"]), + extra_testsuite_attributes)) + + # Output each of the test result entries. + for result in self.elements["all"]: + self.out_file.write(result + '\n') + + # Close off the test suite. + self.out_file.write('</testsuite>\n') + + super(XunitFormatter, self).end_session() + + def end_session(self): + with self.lock: + self._end_session_internal() + + +class RawPickledFormatter(ResultsFormatter): + """Formats events as a pickled stream. + + The parallel test runner has inferiors pickle their results and send them + over a socket back to the parallel test. The parallel test runner then + aggregates them into the final results formatter (e.g. xUnit).
+ """ + + @classmethod + def arg_parser(cls): + """@return arg parser used to parse formatter-specific options.""" + parser = super(RawPickledFormatter, cls).arg_parser() + return parser + + def __init__(self, out_file, options): + super(RawPickledFormatter, self).__init__(out_file, options) + self.pid = os.getpid() + + def begin_session(self): + super(RawPickledFormatter, self).begin_session() + self.process_event({ + "event": "session_begin", + "event_time": time.time(), + "pid": self.pid + }) + + def process_event(self, test_event): + super(RawPickledFormatter, self).process_event(test_event) + + # Add pid to the event for tracking. + # test_event["pid"] = self.pid + + # Send it as {serialized_length_of_serialized_bytes}#{serialized_bytes} + pickled_message = cPickle.dumps(test_event) + self.out_file.send( + "{}#{}".format(len(pickled_message), pickled_message)) + + def end_session(self): + self.process_event({ + "event": "session_end", + "event_time": time.time(), + "pid": self.pid + }) + super(RawPickledFormatter, self).end_session()