Index: test/dosep.py =================================================================== --- test/dosep.py +++ test/dosep.py @@ -80,7 +80,12 @@ RUNNER_PROCESS_ASYNC_MAP = None RESULTS_LISTENER_CHANNEL = None -def setup_global_variables(lock, counter, total, name_len, options): +# Optional callable that returns the 0-based worker index for the +# thread/process calling it. None until a test runner installs one. +GET_WORKER_INDEX = None + +def setup_global_variables( + lock, counter, total, name_len, options, worker_index_map=None): global output_lock, test_counter, total_tests, test_name_len global dotest_options output_lock = lock @@ -89,7 +94,23 @@ test_name_len = name_len dotest_options = options + if worker_index_map is not None: + # We'll use the output lock for this to avoid sharing another lock. + # This won't be used much. + index_lock = lock + def get_worker_index_use_pid(): + """Returns a 0-based, process-unique index for the worker.""" + pid = os.getpid() + with index_lock: + if pid not in worker_index_map: + worker_index_map[pid] = len(worker_index_map) + return worker_index_map[pid] + + global GET_WORKER_INDEX + GET_WORKER_INDEX = get_worker_index_use_pid + + def report_test_failure(name, command, output): global output_lock with output_lock: @@ -150,31 +171,6 @@ return passes, failures, unexpected_successes -def inferior_session_interceptor(forwarding_func, event): - """Intercepts session begin/end events, passing through everyting else. - - @param forwarding_func a callable object to pass along the event if it - is not one that gets intercepted. - - @param event the test result event received. - """ - - if event is not None and isinstance(event, dict): - if "event" in event: - if event["event"] == "session_begin": - # Swallow it. Could report on inferior here if we - # cared. - return - elif event["event"] == "session_end": - # Swallow it. Could report on inferior here if we - # cared. 
More usefully, we can verify that the - # inferior went down hard if we don't receive this. - return - - # Pass it along. - forwarding_func(event) - - def call_with_timeout(command, timeout, name, inferior_pid_events): """Run command with a timeout if possible. -s QUIT will create a coredump if they are enabled on your system @@ -183,6 +179,10 @@ if timeout_command and timeout != "0": command = [timeout_command, '-s', 'QUIT', timeout] + command + if GET_WORKER_INDEX is not None: + worker_index = GET_WORKER_INDEX() + command = command + [ + "--event-add-entries", "worker_index={}".format(worker_index)] # Specifying a value for close_fds is unsupported on Windows when using # subprocess.PIPE if os.name != "nt": @@ -263,7 +263,8 @@ def process_dir_worker_multiprocessing( a_output_lock, a_test_counter, a_total_tests, a_test_name_len, - a_dotest_options, job_queue, result_queue, inferior_pid_events): + a_dotest_options, job_queue, result_queue, inferior_pid_events, + worker_index_map): """Worker thread main loop when in multiprocessing mode. Takes one directory specification at a time and works on it.""" @@ -273,7 +274,7 @@ # Setup the global state for the worker process. setup_global_variables( a_output_lock, a_test_counter, a_total_tests, a_test_name_len, - a_dotest_options) + a_dotest_options, worker_index_map) # Keep grabbing entries from the queue until done. while not job_queue.empty(): @@ -441,14 +442,36 @@ # rest of the flat module. global output_lock output_lock = multiprocessing.RLock() + initialize_global_vars_common(num_threads, test_work_items) def initialize_global_vars_threading(num_threads, test_work_items): + """Initializes global variables used in threading mode. + @param num_threads specifies the number of workers used. + @param test_work_items specifies all the work items + that will be processed. + """ # Initialize the global state we'll use to communicate with the # rest of the flat module. 
global output_lock output_lock = threading.RLock() + + index_lock = threading.RLock() + index_map = {} + + def get_worker_index_threading(): + """Returns a 0-based, thread-unique index for the worker thread.""" + thread_id = threading.current_thread().ident + with index_lock: + if thread_id not in index_map: + index_map[thread_id] = len(index_map) + return index_map[thread_id] + + + global GET_WORKER_INDEX + GET_WORKER_INDEX = get_worker_index_threading + initialize_global_vars_common(num_threads, test_work_items) @@ -630,6 +653,10 @@ # hold 2 * (num inferior dotest.py processes started) entries. inferior_pid_events = multiprocessing.Queue(4096) + # Worker dictionary allows each worker to figure out its worker index. + manager = multiprocessing.Manager() + worker_index_map = manager.dict() + # Create workers. We don't use multiprocessing.Pool due to # challenges with handling ^C keyboard interrupts. workers = [] @@ -643,7 +670,8 @@ dotest_options, job_queue, result_queue, - inferior_pid_events)) + inferior_pid_events, + worker_index_map)) worker.start() workers.append(worker) @@ -717,11 +745,14 @@ # Initialize our global state. initialize_global_vars_multiprocessing(num_threads, test_work_items) + manager = multiprocessing.Manager() + worker_index_map = manager.dict() + pool = multiprocessing.Pool( num_threads, initializer=setup_global_variables, initargs=(output_lock, test_counter, total_tests, test_name_len, - dotest_options)) + dotest_options, worker_index_map)) # Start the map operation (async mode). map_future = pool.map_async( @@ -819,6 +850,10 @@ # Initialize our global state. initialize_global_vars_multiprocessing(1, test_work_items) + # We're always worker index 0 + global GET_WORKER_INDEX + GET_WORKER_INDEX = lambda: 0 + # Run the listener and related channel maps in a separate thread. 
# global RUNNER_PROCESS_ASYNC_MAP global RESULTS_LISTENER_CHANNEL @@ -861,8 +896,7 @@ # listener channel and tell the inferior to send results to the # port on which we'll be listening. if RESULTS_FORMATTER is not None: - forwarding_func = lambda event: inferior_session_interceptor( - RESULTS_FORMATTER.process_event, event) + forwarding_func = RESULTS_FORMATTER.process_event RESULTS_LISTENER_CHANNEL = ( dotest_channels.UnpicklingForwardingListenerChannel( RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func)) @@ -1088,6 +1122,11 @@ if dotest_options.results_formatter_options is not None: _remove_option(dotest_argv, "--results-formatter-options", 2) + # Remove test runner name if present. + if dotest_options.test_runner_name is not None: + _remove_option(dotest_argv, "--test-runner-name", 2) + + def main(print_details_on_success, num_threads, test_subdir, test_runner_name, results_formatter): """Run dotest.py in inferior mode in parallel. Index: test/dotest.py =================================================================== --- test/dotest.py +++ test/dotest.py @@ -823,6 +823,19 @@ lldb_platform_url = args.lldb_platform_url if args.lldb_platform_working_dir: lldb_platform_working_dir = args.lldb_platform_working_dir + + if args.event_add_entries: + entries = {} + # Parse comma-separated key=val pairs; split on the first '=' only. + for keyval in args.event_add_entries.split(","): + key_val_entry = keyval.split("=", 1) + if len(key_val_entry) == 2: + entries[key_val_entry[0]] = key_val_entry[1] + # Tell the event builder to create all events with these + # key/val pairs in them. + if entries: + test_results.EventBuilder.add_entries_to_all_events(entries) + # Gather all the dirs passed on the command line. if len(args.args) > 0: testdirs = map(os.path.abspath, args.args) @@ -947,8 +960,15 @@ if results_filename: # Open the results file for writing. 
- results_file_object = open(results_filename, "w") - cleanup_func = results_file_object.close + if results_filename == 'stdout': + results_file_object = sys.stdout + cleanup_func = None + elif results_filename == 'stderr': + results_file_object = sys.stderr + cleanup_func = None + else: + results_file_object = open(results_filename, "w") + cleanup_func = results_file_object.close default_formatter_name = "test_results.XunitFormatter" elif results_port: # Connect to the specified localhost port. @@ -995,7 +1015,8 @@ results_formatter_object.end_session() # And now close out the output file-like object. - cleanup_func() + if cleanup_func is not None: + cleanup_func() atexit.register(shutdown_formatter) Index: test/dotest_args.py =================================================================== --- test/dotest_args.py +++ test/dotest_args.py @@ -165,6 +165,11 @@ help=('Specify comma-separated options to pass to the formatter. ' 'Use --results-formatter-options="--option1[,--option2[,...]]" ' 'syntax. Note the "=" is critical, and don\'t use whitespace.')) + group.add_argument( + '--event-add-entries', + action='store', + help=('Specify comma-separated KEY=VAL entries to add key and value ' + 'pairs to all test events generated by this test run.')) # Remove the reference to our helper function del X Index: test/test_results.py =================================================================== --- test/test_results.py +++ test/test_results.py @@ -12,6 +12,7 @@ import cPickle import inspect import os +import pprint import re import sys import threading @@ -22,6 +23,9 @@ class EventBuilder(object): """Helper class to build test result event dictionaries.""" + + BASE_DICTIONARY = None + @staticmethod def _get_test_name_info(test): """Returns (test-class-name, test-method-name) from a test case instance. @@ -46,12 +50,19 @@ @return event dictionary with common event fields set. 
""" test_class_name, test_name = EventBuilder._get_test_name_info(test) - return { + + if EventBuilder.BASE_DICTIONARY is not None: + # Start with a copy of the "always include" entries. + result = dict(EventBuilder.BASE_DICTIONARY) + else: + result = {} + result.update({ "event": event_type, "test_class": test_class_name, "test_name": test_name, "event_time": time.time() - } + }) + return result @staticmethod def _error_tuple_class(error_tuple): @@ -122,9 +133,9 @@ event = EventBuilder._event_dictionary_test_result(test, status) event["issue_class"] = EventBuilder._error_tuple_class(error_tuple) event["issue_message"] = EventBuilder._error_tuple_message(error_tuple) - tb = EventBuilder._error_tuple_traceback(error_tuple) - if tb is not None: - event["issue_backtrace"] = traceback.format_tb(tb) + backtrace = EventBuilder._error_tuple_traceback(error_tuple) + if backtrace is not None: + event["issue_backtrace"] = traceback.format_tb(backtrace) return event @staticmethod @@ -246,8 +257,38 @@ event["issue_phase"] = "cleanup" return event + @staticmethod + def add_entries_to_all_events(entries_dict): + """Specifies a dictionary of entries to add to all test events. + This provides a mechanism for, say, a parallel test runner to + indicate to each inferior dotest.py that it should add a + worker index to each. + + Calling this method replaces all previous entries added + by a prior call to this. + + Event build methods will overwrite any entries that collide. + Thus, the passed in dictionary is the base, which gets merged + over by event building when keys collide. + + @param entries_dict a dictionary containing key and value + pairs that should be merged into all events created by the + event generator. May be None to clear out any extra entries. 
+ """ + EventBuilder.BASE_DICTIONARY = dict(entries_dict or {}) + + @staticmethod + def base_event(): + """@return the base event dictionary that all events should contain.""" + if EventBuilder.BASE_DICTIONARY is not None: + return dict(EventBuilder.BASE_DICTIONARY) + else: + return None + + class ResultsFormatter(object): + """Provides interface to formatting test results out to a file-like object. This class allows the LLDB test framework's raw test-realted @@ -420,6 +461,9 @@ @staticmethod def _build_illegal_xml_regex(): + """Constructs a regex to match all illegal xml characters. + + Expects to be used against a unicode string.""" # Construct the range pairs of invalid unicode chareacters. illegal_chars_u = [ (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84), @@ -453,6 +497,15 @@ return xml.sax.saxutils.quoteattr(text) def _replace_invalid_xml(self, str_or_unicode): + """Replaces invalid XML characters with a '?'. + + @param str_or_unicode a string to replace invalid XML + characters within. Can be unicode or not. If not unicode, + assumes it is a byte string in utf-8 encoding. + + @returns a utf-8-encoded byte string with invalid + XML replaced with '?'. + """ # Get the content into unicode if isinstance(str_or_unicode, str): unicode_content = str_or_unicode.decode('utf-8') @@ -472,6 +525,11 @@ XunitFormatter.RM_FAILURE, XunitFormatter.RM_PASSTHRU] parser.add_argument( + "--assert-on-unknown-events", + action="store_true", + help=('cause unknown test events to generate ' + 'a python assert. Default is to ignore.')) + parser.add_argument( "--xpass", action="store", choices=results_mapping_choices, default=XunitFormatter.RM_FAILURE, help=('specify mapping from unexpected success to jUnit/xUnit ' @@ -533,8 +591,10 @@ elif event_type == "test_result": self._process_test_result(test_event) else: - sys.stderr.write("unknown event type {} from {}\n".format( - event_type, test_event)) + # This is an unknown event. 
+ if self.options.assert_on_unknown_events: + raise Exception("unknown event type {} from {}\n".format( + event_type, test_event)) def _handle_success(self, test_event): """Handles a test success. @@ -817,27 +877,40 @@ def begin_session(self): super(RawPickledFormatter, self).begin_session() - self.process_event({ + event = EventBuilder.base_event() + if event is None: + event = {} + event.update({ "event": "session_begin", "event_time": time.time(), "pid": self.pid }) + self.process_event(event) def process_event(self, test_event): super(RawPickledFormatter, self).process_event(test_event) - # Add pid to the event for tracking. - # test_event["pid"] = self.pid - # Send it as {serialized_length_of_serialized_bytes}#{serialized_bytes} pickled_message = cPickle.dumps(test_event) self.out_file.send( "{}#{}".format(len(pickled_message), pickled_message)) def end_session(self): - self.process_event({ + event = EventBuilder.base_event() + if event is None: + event = {} + event.update({ "event": "session_end", "event_time": time.time(), "pid": self.pid }) + self.process_event(event) super(RawPickledFormatter, self).end_session() + + +class DumpFormatter(ResultsFormatter): + """Formats events to the file as their raw python dictionary format.""" + + def process_event(self, test_event): + super(DumpFormatter, self).process_event(test_event) + self.out_file.write("\n" + pprint.pformat(test_event) + "\n")