Index: packages/Python/lldbsuite/test/curses_results.py =================================================================== --- packages/Python/lldbsuite/test/curses_results.py +++ /dev/null @@ -1,224 +0,0 @@ -#!/usr/bin/env python - -""" - The LLVM Compiler Infrastructure - - This file is distributed under the University of Illinois Open Source - License. See LICENSE.TXT for details. - -Configuration options for lldbtest.py set by dotest.py during initialization -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import curses -import datetime -import math -import sys -import time - -# Third-party modules - -# LLDB modules -from . import lldbcurses -from . import result_formatter -from .result_formatter import EventBuilder - - -class Curses(result_formatter.ResultsFormatter): - """Receives live results from tests that are running and reports them to the terminal in a curses GUI""" - - def __init__(self, out_file, options): - # Initialize the parent - super(Curses, self).__init__(out_file, options) - self.using_terminal = True - self.have_curses = True - self.initialize_event = None - self.jobs = [None] * 64 - self.job_tests = [None] * 64 - self.results = list() - try: - self.main_window = lldbcurses.intialize_curses() - self.main_window.add_key_action('\t', self.main_window.select_next_first_responder, "Switch between views that can respond to keyboard input") - self.main_window.refresh() - self.job_panel = None - self.results_panel = None - self.status_panel = None - self.info_panel = None - self.hide_status_list = list() - self.start_time = time.time() - except: - self.have_curses = False - lldbcurses.terminate_curses() - self.using_terminal = False - print("Unexpected error:", sys.exc_info()[0]) - raise - - self.line_dict = dict() - # self.events_file = open("/tmp/events.txt", "w") - # self.formatters = list() - # if tee_results_formatter: - # self.formatters.append(tee_results_formatter) - - def status_to_short_str(self, status): - if status == EventBuilder.STATUS_SUCCESS: - return '.' - elif status == EventBuilder.STATUS_FAILURE: - return 'F' - elif status == EventBuilder.STATUS_UNEXPECTED_SUCCESS: - return '?' - elif status == EventBuilder.STATUS_EXPECTED_FAILURE: - return 'X' - elif status == EventBuilder.STATUS_SKIP: - return 'S' - elif status == EventBuilder.STATUS_ERROR: - return 'E' - else: - return status - - def show_info_panel(self): - selected_idx = self.results_panel.get_selected_idx() - if selected_idx >= 0 and selected_idx < len(self.results): - if self.info_panel is None: - info_frame = self.results_panel.get_contained_rect(top_inset=10, left_inset=10, right_inset=10, height=30) - self.info_panel = lldbcurses.BoxedPanel(info_frame, "Result Details") - # Add a key action for any key that will hide this panel when any key is pressed - self.info_panel.add_key_action(-1, self.hide_info_panel, 'Hide the info panel') - self.info_panel.top() - else: - self.info_panel.show() - - self.main_window.push_first_responder(self.info_panel) - test_start = self.results[selected_idx][0] - test_result = self.results[selected_idx][1] - self.info_panel.set_line(0, "File: %s" % (test_start['test_filename'])) - self.info_panel.set_line(1, "Test: %s.%s" % (test_start['test_class'], test_start['test_name'])) - self.info_panel.set_line(2, "Time: %s" % (test_result['elapsed_time'])) - self.info_panel.set_line(3, "Status: %s" % (test_result['status'])) - - def hide_info_panel(self): - self.main_window.pop_first_responder(self.info_panel) - self.info_panel.hide() - self.main_window.refresh() - - def toggle_status(self, status): - if status: - # Toggle showing and hiding results whose status matches "status" in "Results" window - if status in self.hide_status_list: - self.hide_status_list.remove(status) - else: - self.hide_status_list.append(status) - self.update_results() - - def update_results(self, update=True): - '''Called after a category of test have been show/hidden to update the results list with - what the user desires to see.''' - self.results_panel.clear(update=False) - for result in self.results: - test_result = result[1] - status = test_result['status'] - if status in self.hide_status_list: - continue - name = test_result['test_class'] + '.' + test_result['test_name'] - self.results_panel.append_line('%s (%6.2f sec) %s' % (self.status_to_short_str(status), test_result['elapsed_time'], name)) - if update: - self.main_window.refresh() - - def handle_event(self, test_event): - with self.lock: - super(Curses, self).handle_event(test_event) - # for formatter in self.formatters: - # formatter.process_event(test_event) - if self.have_curses: - worker_index = -1 - if 'worker_index' in test_event: - worker_index = test_event['worker_index'] - if 'event' in test_event: - check_for_one_key = True - #print(str(test_event), file=self.events_file) - event = test_event['event'] - if self.status_panel: - self.status_panel.update_status('time', str(datetime.timedelta(seconds=math.floor(time.time() - self.start_time)))) - if event == 'test_start': - name = test_event['test_class'] + '.' + test_event['test_name'] - self.job_tests[worker_index] = test_event - if 'pid' in test_event: - line = 'pid: %5d ' % (test_event['pid']) + name - else: - line = name - self.job_panel.set_line(worker_index, line) - self.main_window.refresh() - elif event == 'test_result': - status = test_event['status'] - self.status_panel.increment_status(status) - if 'pid' in test_event: - line = 'pid: %5d ' % (test_event['pid']) - else: - line = '' - self.job_panel.set_line(worker_index, line) - name = test_event['test_class'] + '.' + test_event['test_name'] - elapsed_time = test_event['event_time'] - self.job_tests[worker_index]['event_time'] - if not status in self.hide_status_list: - self.results_panel.append_line('%s (%6.2f sec) %s' % (self.status_to_short_str(status), elapsed_time, name)) - self.main_window.refresh() - # Append the result pairs - test_event['elapsed_time'] = elapsed_time - self.results.append([self.job_tests[worker_index], test_event]) - self.job_tests[worker_index] = '' - elif event == 'job_begin': - self.jobs[worker_index] = test_event - if 'pid' in test_event: - line = 'pid: %5d ' % (test_event['pid']) - else: - line = '' - self.job_panel.set_line(worker_index, line) - elif event == 'job_end': - self.jobs[worker_index] = '' - self.job_panel.set_line(worker_index, '') - elif event == 'initialize': - self.initialize_event = test_event - num_jobs = test_event['worker_count'] - job_frame = self.main_window.get_contained_rect(height=num_jobs+2) - results_frame = self.main_window.get_contained_rect(top_inset=num_jobs+2, bottom_inset=1) - status_frame = self.main_window.get_contained_rect(height=1, top_inset=self.main_window.get_size().h-1) - self.job_panel = lldbcurses.BoxedPanel(frame=job_frame, title="Jobs") - self.results_panel = lldbcurses.BoxedPanel(frame=results_frame, title="Results") - - self.results_panel.add_key_action(curses.KEY_UP, self.results_panel.select_prev , "Select the previous list entry") - self.results_panel.add_key_action(curses.KEY_DOWN, self.results_panel.select_next , "Select the next list entry") - self.results_panel.add_key_action(curses.KEY_HOME, self.results_panel.scroll_begin , "Scroll to the start of the list") - self.results_panel.add_key_action(curses.KEY_END, self.results_panel.scroll_end , "Scroll to the end of the list") - self.results_panel.add_key_action(curses.KEY_ENTER, self.show_info_panel , "Display info for the selected result item") - self.results_panel.add_key_action('.', lambda : self.toggle_status(EventBuilder.STATUS_SUCCESS) , "Toggle showing/hiding tests whose status is 'success'") - self.results_panel.add_key_action('e', lambda : self.toggle_status(EventBuilder.STATUS_ERROR) , "Toggle showing/hiding tests whose status is 'error'") - self.results_panel.add_key_action('f', lambda : self.toggle_status(EventBuilder.STATUS_FAILURE) , "Toggle showing/hiding tests whose status is 'failure'") - self.results_panel.add_key_action('s', lambda : self.toggle_status(EventBuilder.STATUS_SKIP) , "Toggle showing/hiding tests whose status is 'skip'") - self.results_panel.add_key_action('x', lambda : self.toggle_status(EventBuilder.STATUS_EXPECTED_FAILURE) , "Toggle showing/hiding tests whose status is 'expected_failure'") - self.results_panel.add_key_action('?', lambda : self.toggle_status(EventBuilder.STATUS_UNEXPECTED_SUCCESS), "Toggle showing/hiding tests whose status is 'unexpected_success'") - self.status_panel = lldbcurses.StatusPanel(frame=status_frame) - - self.main_window.add_child(self.job_panel) - self.main_window.add_child(self.results_panel) - self.main_window.add_child(self.status_panel) - self.main_window.set_first_responder(self.results_panel) - - self.status_panel.add_status_item(name="time", title="Elapsed", format="%s", width=20, value="0:00:00", update=False) - self.status_panel.add_status_item(name=EventBuilder.STATUS_SUCCESS, title="Success", format="%u", width=20, value=0, update=False) - self.status_panel.add_status_item(name=EventBuilder.STATUS_FAILURE, title="Failure", format="%u", width=20, value=0, update=False) - self.status_panel.add_status_item(name=EventBuilder.STATUS_ERROR, title="Error", format="%u", width=20, value=0, update=False) - self.status_panel.add_status_item(name=EventBuilder.STATUS_SKIP, title="Skipped", format="%u", width=20, value=0, update=True) - self.status_panel.add_status_item(name=EventBuilder.STATUS_EXPECTED_FAILURE, title="Expected Failure", format="%u", width=30, value=0, update=False) - self.status_panel.add_status_item(name=EventBuilder.STATUS_UNEXPECTED_SUCCESS, title="Unexpected Success", format="%u", width=30, value=0, update=False) - self.main_window.refresh() - elif event == 'terminate': - #self.main_window.key_event_loop() - lldbcurses.terminate_curses() - check_for_one_key = False - self.using_terminal = False - # Check for 1 keypress with no delay - - # Check for 1 keypress with no delay - if check_for_one_key: - self.main_window.key_event_loop(0, 1) Index: packages/Python/lldbsuite/test/decorators.py =================================================================== --- packages/Python/lldbsuite/test/decorators.py +++ packages/Python/lldbsuite/test/decorators.py @@ -1,10 +1,9 @@ -from __future__ import print_function from __future__ import absolute_import +from __future__ import print_function # System modules from distutils.version import LooseVersion, StrictVersion from functools import wraps -import itertools import os import re import sys @@ -19,7 +18,7 @@ import lldb from . import configuration from . import test_categories -from .result_formatter import EventBuilder +from lldbsuite.test_event.event_builder import EventBuilder from lldbsuite.support import funcutils from lldbsuite.test import lldbplatform from lldbsuite.test import lldbplatformutil @@ -77,7 +76,6 @@ raise Exception("Decorator can only be used to decorate a test method") @wraps(func) def wrapper(*args, **kwargs): - from unittest2 import case self = args[0] if funcutils.requires_self(expected_fn): xfail_reason = expected_fn(self) @@ -110,7 +108,6 @@ @wraps(func) def wrapper(*args, **kwargs): - from unittest2 import case self = args[0] if funcutils.requires_self(expected_fn): reason = expected_fn(self) Index: packages/Python/lldbsuite/test/dosep.py =================================================================== --- packages/Python/lldbsuite/test/dosep.py +++ packages/Python/lldbsuite/test/dosep.py @@ -30,8 +30,8 @@ echo core.%p | sudo tee /proc/sys/kernel/core_pattern """ -from __future__ import print_function from __future__ import absolute_import +from __future__ import print_function # system packages and modules import asyncore @@ -52,13 +52,12 @@ import lldbsuite import lldbsuite.support.seven as seven -from lldbsuite.support import optional_with from . import configuration -from . import dotest_channels from . import dotest_args -from . import result_formatter - -from .result_formatter import EventBuilder +from lldbsuite.support import optional_with +from lldbsuite.test_event import dotest_channels +from lldbsuite.test_event.event_builder import EventBuilder +from lldbsuite.test_event import formatter from .test_runner import process_control @@ -299,9 +298,9 @@ event_port = int(command[arg_index]) # Create results formatter connected back to collector via socket. - config = result_formatter.FormatterConfig() + config = formatter.FormatterConfig() config.port = event_port - formatter_spec = result_formatter.create_results_formatter(config) + formatter_spec = formatter.create_results_formatter(config) if formatter_spec is None or formatter_spec.formatter is None: raise Exception( "Failed to create socket-based ResultsFormatter " Index: packages/Python/lldbsuite/test/dotest.py =================================================================== --- packages/Python/lldbsuite/test/dotest.py +++ packages/Python/lldbsuite/test/dotest.py @@ -23,7 +23,6 @@ # System modules import atexit -import importlib import os import errno import platform @@ -31,7 +30,6 @@ import socket import subprocess import sys -import inspect # Third-party modules import six @@ -43,9 +41,9 @@ from . import dotest_args from . import lldbtest_config from . import test_categories -from . import result_formatter +from lldbsuite.test_event import formatter from . import test_result -from .result_formatter import EventBuilder +from lldbsuite.test_event.event_builder import EventBuilder from ..support import seven def is_exe(fpath): @@ -359,7 +357,7 @@ # Capture test results-related args. if args.curses and not args.inferior: # Act as if the following args were set. - args.results_formatter = "lldbsuite.test.curses_results.Curses" + args.results_formatter = "lldbsuite.test_event.formatter.curses.Curses" args.results_file = "stdout" if args.results_file: @@ -383,7 +381,7 @@ # and we're not a test inferior. if not args.inferior and configuration.results_formatter_name is None: configuration.results_formatter_name = ( - "lldbsuite.test.result_formatter.ResultsFormatter") + "lldbsuite.test_event.formatter.results_formatter.ResultsFormatter") # rerun-related arguments configuration.rerun_all_issues = args.rerun_all_issues @@ -412,7 +410,7 @@ # Tell the event builder to create all events with these # key/val pairs in them. if len(entries) > 0: - result_formatter.EventBuilder.add_entries_to_all_events(entries) + EventBuilder.add_entries_to_all_events(entries) # Gather all the dirs passed on the command line. if len(args.args) > 0: @@ -453,7 +451,7 @@ def setupTestResults(): """Sets up test results-related objects based on arg settings.""" # Setup the results formatter configuration. - formatter_config = result_formatter.FormatterConfig() + formatter_config = formatter.FormatterConfig() formatter_config.filename = configuration.results_filename formatter_config.formatter_name = configuration.results_formatter_name formatter_config.formatter_options = ( @@ -461,12 +459,12 @@ formatter_config.port = configuration.results_port # Create the results formatter. - formatter_spec = result_formatter.create_results_formatter( + formatter_spec = formatter.create_results_formatter( formatter_config) if formatter_spec is not None and formatter_spec.formatter is not None: configuration.results_formatter_object = formatter_spec.formatter - # Send an intialize message to the formatter. + # Send an initialize message to the formatter. initialize_event = EventBuilder.bare_event("initialize") if isMultiprocessTestRunner(): if (configuration.test_runner_name is not None and Index: packages/Python/lldbsuite/test/dotest_channels.py =================================================================== --- packages/Python/lldbsuite/test/dotest_channels.py +++ /dev/null @@ -1,208 +0,0 @@ -""" - The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Sync lldb and related source from a local machine to a remote machine. - -This facilitates working on the lldb sourcecode on multiple machines -and multiple OS types, verifying changes across all. - - -This module provides asyncore channels used within the LLDB test -framework. -""" - -from __future__ import print_function -from __future__ import absolute_import - - -# System modules -import asyncore -import socket - -# Third-party modules -from six.moves import cPickle - -# LLDB modules - - -class UnpicklingForwardingReaderChannel(asyncore.dispatcher): - """Provides an unpickling, forwarding asyncore dispatch channel reader. - - Inferior dotest.py processes with side-channel-based test results will - send test result event data in a pickled format, one event at a time. - This class supports reconstructing the pickled data and forwarding it - on to its final destination. - - The channel data is written in the form: - {num_payload_bytes}#{payload_bytes} - - The bulk of this class is devoted to reading and parsing out - the payload bytes. - """ - def __init__(self, file_object, async_map, forwarding_func): - asyncore.dispatcher.__init__(self, sock=file_object, map=async_map) - - self.header_contents = b"" - self.packet_bytes_remaining = 0 - self.reading_header = True - self.ibuffer = b'' - self.forwarding_func = forwarding_func - if forwarding_func is None: - # This whole class is useless if we do nothing with the - # unpickled results. - raise Exception("forwarding function must be set") - - # Initiate all connections by sending an ack. This allows - # the initiators of the socket to await this to ensure - # that this end is up and running (and therefore already - # into the async map). - ack_bytes = bytearray() - ack_bytes.append(chr(42)) - file_object.send(ack_bytes) - - def deserialize_payload(self): - """Unpickles the collected input buffer bytes and forwards.""" - if len(self.ibuffer) > 0: - self.forwarding_func(cPickle.loads(self.ibuffer)) - self.ibuffer = b'' - - def consume_header_bytes(self, data): - """Consumes header bytes from the front of data. - @param data the incoming data stream bytes - @return any data leftover after consuming header bytes. - """ - # We're done if there is no content. - if not data or (len(data) == 0): - return None - - full_header_len = 4 - - assert len(self.header_contents) < full_header_len - - bytes_avail = len(data) - bytes_needed = full_header_len - len(self.header_contents) - header_bytes_avail = min(bytes_needed, bytes_avail) - self.header_contents += data[:header_bytes_avail] - if len(self.header_contents) == full_header_len: - import struct - # End of header. - self.packet_bytes_remaining = struct.unpack( - "!I", self.header_contents)[0] - self.header_contents = b"" - self.reading_header = False - return data[header_bytes_avail:] - - # If we made it here, we've exhausted the data and - # we're still parsing header content. - return None - - def consume_payload_bytes(self, data): - """Consumes payload bytes from the front of data. - @param data the incoming data stream bytes - @return any data leftover after consuming remaining payload bytes. - """ - if not data or (len(data) == 0): - # We're done and there's nothing to do. - return None - - data_len = len(data) - if data_len <= self.packet_bytes_remaining: - # We're consuming all the data provided. - self.ibuffer += data - self.packet_bytes_remaining -= data_len - - # If we're no longer waiting for payload bytes, - # we flip back to parsing header bytes and we - # unpickle the payload contents. - if self.packet_bytes_remaining < 1: - self.reading_header = True - self.deserialize_payload() - - # We're done, no more data left. - return None - else: - # We're only consuming a portion of the data since - # the data contains more than the payload amount. - self.ibuffer += data[:self.packet_bytes_remaining] - data = data[self.packet_bytes_remaining:] - - # We now move on to reading the header. - self.reading_header = True - self.packet_bytes_remaining = 0 - - # And we can deserialize the payload. - self.deserialize_payload() - - # Return the remaining data. - return data - - def handle_read(self): - # Read some data from the socket. - try: - data = self.recv(8192) - # print('driver socket READ: %d bytes' % len(data)) - except socket.error as socket_error: - print( - "\nINFO: received socket error when reading data " - "from test inferior:\n{}".format(socket_error)) - raise - except Exception as general_exception: - print( - "\nERROR: received non-socket error when reading data " - "from the test inferior:\n{}".format(general_exception)) - raise - - # Consume the message content. - while data and (len(data) > 0): - # If we're reading the header, gather header bytes. - if self.reading_header: - data = self.consume_header_bytes(data) - else: - data = self.consume_payload_bytes(data) - - def handle_close(self): - # print("socket reader: closing port") - self.close() - - -class UnpicklingForwardingListenerChannel(asyncore.dispatcher): - """Provides a socket listener asyncore channel for unpickling/forwarding. - - This channel will listen on a socket port (use 0 for host-selected). Any - client that connects will have an UnpicklingForwardingReaderChannel handle - communication over the connection. - - The dotest parallel test runners, when collecting test results, open the - test results side channel over a socket. This channel handles connections - from inferiors back to the test runner. Each worker fires up a listener - for each inferior invocation. This simplifies the asyncore.loop() usage, - one of the reasons for implementing with asyncore. This listener shuts - down once a single connection is made to it. - """ - def __init__(self, async_map, host, port, backlog_count, forwarding_func): - asyncore.dispatcher.__init__(self, map=async_map) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((host, port)) - self.address = self.socket.getsockname() - self.listen(backlog_count) - self.handler = None - self.async_map = async_map - self.forwarding_func = forwarding_func - if forwarding_func is None: - # This whole class is useless if we do nothing with the - # unpickled results. - raise Exception("forwarding function must be set") - - def handle_accept(self): - (sock, addr) = self.socket.accept() - if sock and addr: - # print('Incoming connection from %s' % repr(addr)) - self.handler = UnpicklingForwardingReaderChannel( - sock, self.async_map, self.forwarding_func) - - def handle_close(self): - self.close() Index: packages/Python/lldbsuite/test/lldbtest.py =================================================================== --- packages/Python/lldbsuite/test/lldbtest.py +++ packages/Python/lldbsuite/test/lldbtest.py @@ -31,8 +31,8 @@ $ """ -from __future__ import print_function from __future__ import absolute_import +from __future__ import print_function # System modules import abc @@ -42,12 +42,13 @@ import glob import inspect import io -import os, sys, traceback import os.path import re import signal from subprocess import * +import sys import time +import traceback import types # Third-party modules @@ -68,8 +69,6 @@ from lldbsuite.support import encoded_file from lldbsuite.support import funcutils -from .result_formatter import EventBuilder - # dosep.py starts lots and lots of dotest instances # This option helps you find if two (or more) dotest instances are using the same # directory at the same time Index: packages/Python/lldbsuite/test/result_formatter.py =================================================================== --- packages/Python/lldbsuite/test/result_formatter.py +++ /dev/null @@ -1,1323 +0,0 @@ -""" - The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Provides classes used by the test results reporting infrastructure -within the LLDB test suite. -""" - -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import argparse -import importlib -import inspect -import os -import pprint -import socket -import sys -import threading -import time -import traceback - -# Third-party modules -import six -from six.moves import cPickle - -# LLDB modules -from . import configuration - -import lldbsuite - - -# Ignore method count on DTOs. -# pylint: disable=too-few-public-methods -class FormatterConfig(object): - """Provides formatter configuration info to create_results_formatter().""" - def __init__(self): - self.filename = None - self.port = None - self.formatter_name = None - self.formatter_options = None - - -# Ignore method count on DTOs. -# pylint: disable=too-few-public-methods -class CreatedFormatter(object): - """Provides transfer object for returns from create_results_formatter().""" - def __init__(self, formatter, cleanup_func): - self.formatter = formatter - self.cleanup_func = cleanup_func - - -def create_results_formatter(config): - """Sets up a test results formatter. - - @param config an instance of FormatterConfig - that indicates how to setup the ResultsFormatter. - - @return an instance of CreatedFormatter. - """ - def create_socket(port): - """Creates a socket to the localhost on the given port. - - @param port the port number of the listening port on - the localhost. - - @return (socket object, socket closing function) - """ - def socket_closer(open_sock): - """Close down an opened socket properly.""" - open_sock.shutdown(socket.SHUT_RDWR) - open_sock.close() - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(("localhost", port)) - - # Wait for the ack from the listener side. - # This is needed to prevent a race condition - # in the main dosep.py processing loop: we - # can't allow a worker queue thread to die - # that has outstanding messages to a listener - # socket before the listener socket asyncore - # listener socket gets spun up; otherwise, - # we lose the test result info. - read_bytes = sock.recv(1) - # print("\n** socket creation: received ack: {}".format(ord(read_bytes[0])), file=sys.stderr) - - return (sock, lambda: socket_closer(sock)) - - default_formatter_name = None - results_file_object = None - cleanup_func = None - - if config.filename: - # Open the results file for writing. - if config.filename == 'stdout': - results_file_object = sys.stdout - cleanup_func = None - elif config.filename == 'stderr': - results_file_object = sys.stderr - cleanup_func = None - else: - results_file_object = open(config.filename, "w") - cleanup_func = results_file_object.close - default_formatter_name = ( - "lldbsuite.test.xunit_formatter.XunitFormatter") - elif config.port: - # Connect to the specified localhost port. - results_file_object, cleanup_func = create_socket(config.port) - default_formatter_name = ( - "lldbsuite.test.result_formatter.RawPickledFormatter") - - # If we have a results formatter name specified and we didn't specify - # a results file, we should use stdout. - if config.formatter_name is not None and results_file_object is None: - # Use stdout. - results_file_object = sys.stdout - cleanup_func = None - - if results_file_object: - # We care about the formatter. Choose user-specified or, if - # none specified, use the default for the output type. - if config.formatter_name: - formatter_name = config.formatter_name - else: - formatter_name = default_formatter_name - - # Create an instance of the class. - # First figure out the package/module. - components = formatter_name.split(".") - module = importlib.import_module(".".join(components[:-1])) - - # Create the class name we need to load. - cls = getattr(module, components[-1]) - - # Handle formatter options for the results formatter class. - formatter_arg_parser = cls.arg_parser() - if config.formatter_options and len(config.formatter_options) > 0: - command_line_options = config.formatter_options - else: - command_line_options = [] - - formatter_options = formatter_arg_parser.parse_args( - command_line_options) - - # Create the TestResultsFormatter given the processed options. - results_formatter_object = cls(results_file_object, formatter_options) - - def shutdown_formatter(): - """Shuts down the formatter when it is no longer needed.""" - # Tell the formatter to write out anything it may have - # been saving until the very end (e.g. xUnit results - # can't complete its output until this point). - results_formatter_object.send_terminate_as_needed() - - # And now close out the output file-like object. - if cleanup_func is not None: - cleanup_func() - - return CreatedFormatter( - results_formatter_object, - shutdown_formatter) - else: - return None - - -class EventBuilder(object): - """Helper class to build test result event dictionaries.""" - - BASE_DICTIONARY = None - - # Test Event Types - TYPE_JOB_RESULT = "job_result" - TYPE_TEST_RESULT = "test_result" - TYPE_TEST_START = "test_start" - TYPE_MARK_TEST_RERUN_ELIGIBLE = "test_eligible_for_rerun" - TYPE_MARK_TEST_EXPECTED_FAILURE = "test_expected_failure" - TYPE_SESSION_TERMINATE = "terminate" - - RESULT_TYPES = set([ - TYPE_JOB_RESULT, - TYPE_TEST_RESULT - ]) - - # Test/Job Status Tags - STATUS_EXCEPTIONAL_EXIT = "exceptional_exit" - STATUS_SUCCESS = "success" - STATUS_FAILURE = "failure" - STATUS_EXPECTED_FAILURE = "expected_failure" - STATUS_EXPECTED_TIMEOUT = "expected_timeout" - STATUS_UNEXPECTED_SUCCESS = "unexpected_success" - STATUS_SKIP = "skip" - STATUS_ERROR = "error" - STATUS_TIMEOUT = "timeout" - - """Test methods or jobs with a status matching any of these - status values will cause a testrun failure, unless - the test methods rerun and do not trigger an issue when rerun.""" - TESTRUN_ERROR_STATUS_VALUES = set([ - STATUS_ERROR, - STATUS_EXCEPTIONAL_EXIT, - STATUS_FAILURE, - STATUS_TIMEOUT - ]) - - @staticmethod - def _get_test_name_info(test): - """Returns (test-class-name, test-method-name) from a test case instance. - - @param test a unittest.TestCase instance. - - @return tuple containing (test class name, test method name) - """ - test_class_components = test.id().split(".") - test_class_name = ".".join(test_class_components[:-1]) - test_name = test_class_components[-1] - return (test_class_name, test_name) - - @staticmethod - def bare_event(event_type): - """Creates an event with default additions, event type and timestamp. - - @param event_type the value set for the "event" key, used - to distinguish events. - - @returns an event dictionary with all default additions, the "event" - key set to the passed in event_type, and the event_time value set to - time.time(). - """ - if EventBuilder.BASE_DICTIONARY is not None: - # Start with a copy of the "always include" entries. - event = dict(EventBuilder.BASE_DICTIONARY) - else: - event = {} - - event.update({ - "event": event_type, - "event_time": time.time() - }) - return event - - @staticmethod - def _assert_is_python_sourcefile(test_filename): - if test_filename is not None: - if not test_filename.endswith(".py"): - raise Exception("source python filename has unexpected extension: {}".format(test_filename)) - return test_filename - - @staticmethod - def _event_dictionary_common(test, event_type): - """Returns an event dictionary setup with values for the given event type. - - @param test the unittest.TestCase instance - - @param event_type the name of the event type (string). - - @return event dictionary with common event fields set. - """ - test_class_name, test_name = EventBuilder._get_test_name_info(test) - - # Determine the filename for the test case. If there is an attribute - # for it, use it. Otherwise, determine from the TestCase class path. - if hasattr(test, "test_filename"): - test_filename = EventBuilder._assert_is_python_sourcefile(test.test_filename) - else: - test_filename = EventBuilder._assert_is_python_sourcefile(inspect.getsourcefile(test.__class__)) - - event = EventBuilder.bare_event(event_type) - event.update({ - "test_class": test_class_name, - "test_name": test_name, - "test_filename": test_filename - }) - - return event - - @staticmethod - def _error_tuple_class(error_tuple): - """Returns the unittest error tuple's error class as a string. - - @param error_tuple the error tuple provided by the test framework. - - @return the error type (typically an exception) raised by the - test framework. - """ - type_var = error_tuple[0] - module = inspect.getmodule(type_var) - if module: - return "{}.{}".format(module.__name__, type_var.__name__) - else: - return type_var.__name__ - - @staticmethod - def _error_tuple_message(error_tuple): - """Returns the unittest error tuple's error message. - - @param error_tuple the error tuple provided by the test framework. - - @return the error message provided by the test framework. - """ - return str(error_tuple[1]) - - @staticmethod - def _error_tuple_traceback(error_tuple): - """Returns the unittest error tuple's error message. - - @param error_tuple the error tuple provided by the test framework. - - @return the error message provided by the test framework. - """ - return error_tuple[2] - - @staticmethod - def _event_dictionary_test_result(test, status): - """Returns an event dictionary with common test result fields set. - - @param test a unittest.TestCase instance. - - @param status the status/result of the test - (e.g. "success", "failure", etc.) - - @return the event dictionary - """ - event = EventBuilder._event_dictionary_common( - test, EventBuilder.TYPE_TEST_RESULT) - event["status"] = status - return event - - @staticmethod - def _event_dictionary_issue(test, status, error_tuple): - """Returns an event dictionary with common issue-containing test result - fields set. - - @param test a unittest.TestCase instance. - - @param status the status/result of the test - (e.g. "success", "failure", etc.) - - @param error_tuple the error tuple as reported by the test runner. - This is of the form (type, error). - - @return the event dictionary - """ - event = EventBuilder._event_dictionary_test_result(test, status) - event["issue_class"] = EventBuilder._error_tuple_class(error_tuple) - event["issue_message"] = EventBuilder._error_tuple_message(error_tuple) - backtrace = EventBuilder._error_tuple_traceback(error_tuple) - if backtrace is not None: - event["issue_backtrace"] = traceback.format_tb(backtrace) - return event - - @staticmethod - def event_for_start(test): - """Returns an event dictionary for the test start event. - - @param test a unittest.TestCase instance. - - @return the event dictionary - """ - return EventBuilder._event_dictionary_common( - test, EventBuilder.TYPE_TEST_START) - - @staticmethod - def event_for_success(test): - """Returns an event dictionary for a successful test. - - @param test a unittest.TestCase instance. - - @return the event dictionary - """ - return EventBuilder._event_dictionary_test_result( - test, EventBuilder.STATUS_SUCCESS) - - @staticmethod - def event_for_unexpected_success(test, bugnumber): - """Returns an event dictionary for a test that succeeded but was - expected to fail. - - @param test a unittest.TestCase instance. - - @param bugnumber the issue identifier for the bug tracking the - fix request for the test expected to fail (but is in fact - passing here). - - @return the event dictionary - - """ - event = EventBuilder._event_dictionary_test_result( - test, EventBuilder.STATUS_UNEXPECTED_SUCCESS) - if bugnumber: - event["bugnumber"] = str(bugnumber) - return event - - @staticmethod - def event_for_failure(test, error_tuple): - """Returns an event dictionary for a test that failed. - - @param test a unittest.TestCase instance. - - @param error_tuple the error tuple as reported by the test runner. - This is of the form (type, error). - - @return the event dictionary - """ - return EventBuilder._event_dictionary_issue( - test, EventBuilder.STATUS_FAILURE, error_tuple) - - @staticmethod - def event_for_expected_failure(test, error_tuple, bugnumber): - """Returns an event dictionary for a test that failed as expected. - - @param test a unittest.TestCase instance. - - @param error_tuple the error tuple as reported by the test runner. - This is of the form (type, error). - - @param bugnumber the issue identifier for the bug tracking the - fix request for the test expected to fail. - - @return the event dictionary - - """ - event = EventBuilder._event_dictionary_issue( - test, EventBuilder.STATUS_EXPECTED_FAILURE, error_tuple) - if bugnumber: - event["bugnumber"] = str(bugnumber) - return event - - @staticmethod - def event_for_skip(test, reason): - """Returns an event dictionary for a test that was skipped. - - @param test a unittest.TestCase instance. - - @param reason the reason why the test is being skipped. - - @return the event dictionary - """ - event = EventBuilder._event_dictionary_test_result( - test, EventBuilder.STATUS_SKIP) - event["skip_reason"] = reason - return event - - @staticmethod - def event_for_error(test, error_tuple): - """Returns an event dictionary for a test that hit a test execution error. - - @param test a unittest.TestCase instance. - - @param error_tuple the error tuple as reported by the test runner. - This is of the form (type, error). - - @return the event dictionary - """ - return EventBuilder._event_dictionary_issue( - test, EventBuilder.STATUS_ERROR, error_tuple) - - @staticmethod - def event_for_cleanup_error(test, error_tuple): - """Returns an event dictionary for a test that hit a test execution error - during the test cleanup phase. - - @param test a unittest.TestCase instance. - - @param error_tuple the error tuple as reported by the test runner. - This is of the form (type, error). - - @return the event dictionary - """ - event = EventBuilder._event_dictionary_issue( - test, EventBuilder.STATUS_ERROR, error_tuple) - event["issue_phase"] = "cleanup" - return event - - @staticmethod - def event_for_job_exceptional_exit( - pid, worker_index, exception_code, exception_description, - test_filename, command_line): - """Creates an event for a job (i.e. process) exit due to signal. - - @param pid the process id for the job that failed - @param worker_index optional id for the job queue running the process - @param exception_code optional code - (e.g. SIGTERM integer signal number) - @param exception_description optional string containing symbolic - representation of the issue (e.g. "SIGTERM") - @param test_filename the path to the test filename that exited - in some exceptional way. - @param command_line the Popen-style list provided as the command line - for the process that timed out. - - @return an event dictionary coding the job completion description. - """ - event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) - event["status"] = EventBuilder.STATUS_EXCEPTIONAL_EXIT - if pid is not None: - event["pid"] = pid - if worker_index is not None: - event["worker_index"] = int(worker_index) - if exception_code is not None: - event["exception_code"] = exception_code - if exception_description is not None: - event["exception_description"] = exception_description - if test_filename is not None: - event["test_filename"] = EventBuilder._assert_is_python_sourcefile(test_filename) - if command_line is not None: - event["command_line"] = command_line - return event - - @staticmethod - def event_for_job_timeout(pid, worker_index, test_filename, command_line): - """Creates an event for a job (i.e. process) timeout. - - @param pid the process id for the job that timed out - @param worker_index optional id for the job queue running the process - @param test_filename the path to the test filename that timed out. - @param command_line the Popen-style list provided as the command line - for the process that timed out. - - @return an event dictionary coding the job completion description. - """ - event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) - event["status"] = "timeout" - if pid is not None: - event["pid"] = pid - if worker_index is not None: - event["worker_index"] = int(worker_index) - if test_filename is not None: - event["test_filename"] = EventBuilder._assert_is_python_sourcefile(test_filename) - if command_line is not None: - event["command_line"] = command_line - return event - - @staticmethod - def event_for_mark_test_rerun_eligible(test): - """Creates an event that indicates the specified test is explicitly - eligible for rerun. - - Note there is a mode that will enable test rerun eligibility at the - global level. These markings for explicit rerun eligibility are - intended for the mode of running where only explicitly rerunnable - tests are rerun upon hitting an issue. - - @param test the TestCase instance to which this pertains. - - @return an event that specifies the given test as being eligible to - be rerun. - """ - event = EventBuilder._event_dictionary_common( - test, - EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE) - return event - - @staticmethod - def event_for_mark_test_expected_failure(test): - """Creates an event that indicates the specified test is expected - to fail. - - @param test the TestCase instance to which this pertains. - - @return an event that specifies the given test is expected to fail. - """ - event = EventBuilder._event_dictionary_common( - test, - EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE) - return event - - @staticmethod - def add_entries_to_all_events(entries_dict): - """Specifies a dictionary of entries to add to all test events. - - This provides a mechanism for, say, a parallel test runner to - indicate to each inferior dotest.py that it should add a - worker index to each. - - Calling this method replaces all previous entries added - by a prior call to this. - - Event build methods will overwrite any entries that collide. - Thus, the passed in dictionary is the base, which gets merged - over by event building when keys collide. - - @param entries_dict a dictionary containing key and value - pairs that should be merged into all events created by the - event generator. May be None to clear out any extra entries. - """ - EventBuilder.BASE_DICTIONARY = dict(entries_dict) - - -class ResultsFormatter(object): - """Provides interface to formatting test results out to a file-like object. - - This class allows the LLDB test framework's raw test-realted - events to be processed and formatted in any manner desired. - Test events are represented by python dictionaries, formatted - as in the EventBuilder class above. - - ResultFormatter instances are given a file-like object in which - to write their results. - - ResultFormatter lifetime looks like the following: - - # The result formatter is created. - # The argparse options dictionary is generated from calling - # the SomeResultFormatter.arg_parser() with the options data - # passed to dotest.py via the "--results-formatter-options" - # argument. See the help on that for syntactic requirements - # on getting that parsed correctly. - formatter = SomeResultFormatter(file_like_object, argpared_options_dict) - - # Single call to session start, before parsing any events. - formatter.begin_session() - - formatter.handle_event({"event":"initialize",...}) - - # Zero or more calls specified for events recorded during the test session. - # The parallel test runner manages getting results from all the inferior - # dotest processes, so from a new format perspective, don't worry about - # that. The formatter will be presented with a single stream of events - # sandwiched between a single begin_session()/end_session() pair in the - # parallel test runner process/thread. - for event in zero_or_more_test_events(): - formatter.handle_event(event) - - # Single call to terminate/wrap-up. Formatters that need all the - # data before they can print a correct result (e.g. xUnit/JUnit), - # this is where the final report can be generated. - formatter.handle_event({"event":"terminate",...}) - - It is not the formatter's responsibility to close the file_like_object. - (i.e. do not close it). - - The lldb test framework passes these test events in real time, so they - arrive as they come in. - - In the case of the parallel test runner, the dotest inferiors - add a 'pid' field to the dictionary that indicates which inferior - pid generated the event. - - Note more events may be added in the future to support richer test - reporting functionality. One example: creating a true flaky test - result category so that unexpected successes really mean the test - is marked incorrectly (either should be marked flaky, or is indeed - passing consistently now and should have the xfail marker - removed). In this case, a flaky_success and flaky_fail event - likely will be added to capture these and support reporting things - like percentages of flaky test passing so we can see if we're - making some things worse/better with regards to failure rates. - - Another example: announcing all the test methods that are planned - to be run, so we can better support redo operations of various kinds - (redo all non-run tests, redo non-run tests except the one that - was running [perhaps crashed], etc.) - - Implementers are expected to override all the public methods - provided in this class. See each method's docstring to see - expectations about when the call should be chained. - - """ - @classmethod - def arg_parser(cls): - """@return arg parser used to parse formatter-specific options.""" - parser = argparse.ArgumentParser( - description='{} options'.format(cls.__name__), - usage=('dotest.py --results-formatter-options=' - '"--option1 value1 [--option2 value2 [...]]"')) - parser.add_argument( - "--dump-results", - action="store_true", - help=('dump the raw results data after printing ' - 'the summary output.')) - return parser - - def __init__(self, out_file, options): - super(ResultsFormatter, self).__init__() - self.out_file = out_file - self.options = options - self.using_terminal = False - if not self.out_file: - raise Exception("ResultsFormatter created with no file object") - self.start_time_by_test = {} - self.terminate_called = False - - # Store counts of test_result events by status. - self.result_status_counts = { - EventBuilder.STATUS_SUCCESS: 0, - EventBuilder.STATUS_EXPECTED_FAILURE: 0, - EventBuilder.STATUS_EXPECTED_TIMEOUT: 0, - EventBuilder.STATUS_SKIP: 0, - EventBuilder.STATUS_UNEXPECTED_SUCCESS: 0, - EventBuilder.STATUS_FAILURE: 0, - EventBuilder.STATUS_ERROR: 0, - EventBuilder.STATUS_TIMEOUT: 0, - EventBuilder.STATUS_EXCEPTIONAL_EXIT: 0 - } - - # Track the most recent test start event by worker index. - # We'll use this to assign TIMEOUT and exceptional - # exits to the most recent test started on a given - # worker index. - self.started_tests_by_worker = {} - - # Store the most recent test_method/job status. - self.result_events = {} - - # Track the number of test method reruns. - self.test_method_rerun_count = 0 - - # Lock that we use while mutating inner state, like the - # total test count and the elements. We minimize how - # long we hold the lock just to keep inner state safe, not - # entirely consistent from the outside. - self.lock = threading.RLock() - - # Keeps track of the test base filenames for tests that - # are expected to timeout. If a timeout occurs in any test - # basename that matches this list, that result should be - # converted into a non-issue. We'll create an expected - # timeout test status for this. - self.expected_timeouts_by_basename = set() - - # Tests which have reported that they are expecting to fail. These will - # be marked as expected failures even if they return a failing status, - # probably because they crashed or deadlocked. - self.expected_failures = set() - - # Keep track of rerun-eligible tests. - # This is a set that contains tests saved as: - # {test_filename}:{test_class}:{test_name} - self.rerun_eligible_tests = set() - - # A dictionary of test files that had a failing - # test, in the format of: - # key = test path, value = array of test methods that need rerun - self.tests_for_rerun = {} - - @classmethod - def _make_key(cls, result_event): - """Creates a key from a test or job result event. - - This key attempts to be as unique as possible. For - test result events, it will be unique per test method. - For job events (ones not promoted to a test result event), - it will be unique per test case file. - - @return a string-based key of the form - {test_filename}:{test_class}.{test_name} - """ - if result_event is None: - return None - component_count = 0 - if "test_filename" in result_event: - key = result_event["test_filename"] - component_count += 1 - if "test_class" in result_event: - if component_count > 0: - key += ":" - key += result_event["test_class"] - component_count += 1 - if "test_name" in result_event: - if component_count > 0: - key += "." - key += result_event["test_name"] - component_count += 1 - return key - - def _mark_test_as_expected_failure(self, test_result_event): - key = self._make_key(test_result_event) - if key is not None: - self.expected_failures.add(key) - else: - sys.stderr.write( - "\nerror: test marked as expected failure but " - "failed to create key.\n") - - def _mark_test_for_rerun_eligibility(self, test_result_event): - key = self._make_key(test_result_event) - if key is not None: - self.rerun_eligible_tests.add(key) - else: - sys.stderr.write( - "\nerror: test marked for re-run eligibility but " - "failed to create key.\n") - - def _maybe_add_test_to_rerun_list(self, result_event): - key = self._make_key(result_event) - if key is not None: - if (key in self.rerun_eligible_tests or - configuration.rerun_all_issues): - test_filename = result_event.get("test_filename", None) - if test_filename is not None: - test_name = result_event.get("test_name", None) - if test_filename not in self.tests_for_rerun: - self.tests_for_rerun[test_filename] = [] - if test_name is not None: - self.tests_for_rerun[test_filename].append(test_name) - else: - sys.stderr.write( - "\nerror: couldn't add testrun-failing test to rerun " - "list because no eligibility key could be created.\n") - - def _maybe_remap_job_result_event(self, test_event): - """Remaps timeout/exceptional exit job results to last test method running. - - @param test_event the job_result test event. This is an in/out - parameter. It will be modified if it can be mapped to a test_result - of the same status, using details from the last-running test method - known to be most recently started on the same worker index. - """ - test_start = None - - job_status = test_event["status"] - if job_status in [ - EventBuilder.STATUS_TIMEOUT, - EventBuilder.STATUS_EXCEPTIONAL_EXIT]: - worker_index = test_event.get("worker_index", None) - if worker_index is not None: - test_start = self.started_tests_by_worker.get( - worker_index, None) - - # If we have a test start to remap, do it here. - if test_start is not None: - test_event["event"] = EventBuilder.TYPE_TEST_RESULT - - # Fill in all fields from test start not present in - # job status message. - for (start_key, start_value) in test_start.items(): - if start_key not in test_event: - test_event[start_key] = start_value - - def _maybe_remap_expected_timeout(self, event): - if event is None: - return - - status = event.get("status", None) - if status is None or status != EventBuilder.STATUS_TIMEOUT: - return - - # Check if the timeout test's basename is in the expected timeout - # list. If so, convert to an expected timeout. - basename = os.path.basename(event.get("test_filename", "")) - if basename in self.expected_timeouts_by_basename: - # Convert to an expected timeout. - event["status"] = EventBuilder.STATUS_EXPECTED_TIMEOUT - - def _maybe_remap_expected_failure(self, event): - if event is None: - return - - key = self._make_key(event) - if key not in self.expected_failures: - return - - status = event.get("status", None) - if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES: - event["status"] = EventBuilder.STATUS_EXPECTED_FAILURE - elif status == EventBuilder.STATUS_SUCCESS: - event["status"] = EventBuilder.STATUS_UNEXPECTED_SUCCESS - - def handle_event(self, test_event): - """Handles the test event for collection into the formatter output. - - Derived classes may override this but should call down to this - implementation first. - - @param test_event the test event as formatted by one of the - event_for_* calls. - """ - with self.lock: - # Keep track of whether terminate was received. We do this so - # that a process can call the 'terminate' event on its own, to - # close down a formatter at the appropriate time. Then the - # atexit() cleanup can call the "terminate if it hasn't been - # called yet". - if test_event is not None: - event_type = test_event.get("event", "") - # We intentionally allow event_type to be checked anew - # after this check below since this check may rewrite - # the event type - if event_type == EventBuilder.TYPE_JOB_RESULT: - # Possibly convert the job status (timeout, exceptional exit) - # to an appropriate test_result event. - self._maybe_remap_job_result_event(test_event) - event_type = test_event.get("event", "") - - # Remap timeouts to expected timeouts. - if event_type in EventBuilder.RESULT_TYPES: - self._maybe_remap_expected_timeout(test_event) - self._maybe_remap_expected_failure(test_event) - event_type = test_event.get("event", "") - - if event_type == "terminate": - self.terminate_called = True - elif event_type in EventBuilder.RESULT_TYPES: - # Keep track of event counts per test/job result status type. - # The only job (i.e. inferior process) results that make it - # here are ones that cannot be remapped to the most recently - # started test for the given worker index. - status = test_event["status"] - self.result_status_counts[status] += 1 - # Clear the most recently started test for the related worker. - worker_index = test_event.get("worker_index", None) - if worker_index is not None: - self.started_tests_by_worker.pop(worker_index, None) - - if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES: - # A test/job status value in any of those status values - # causes a testrun failure. If such a test fails, check - # whether it can be rerun. If it can be rerun, add it - # to the rerun job. - self._maybe_add_test_to_rerun_list(test_event) - - # Build the test key. - test_key = self._make_key(test_event) - if test_key is None: - raise Exception( - "failed to find test filename for " - "test event {}".format(test_event)) - - # Save the most recent test event for the test key. - # This allows a second test phase to overwrite the most - # recent result for the test key (unique per method). - # We do final reporting at the end, so we'll report based - # on final results. - # We do this so that a re-run caused by, perhaps, the need - # to run a low-load, single-worker test run can have the final - # run's results to always be used. - if test_key in self.result_events: - # We are replacing the result of something that was - # already counted by the base class. Remove the double - # counting by reducing by one the count for the test - # result status. - old_status = self.result_events[test_key]["status"] - self.result_status_counts[old_status] -= 1 - self.test_method_rerun_count += 1 - self.result_events[test_key] = test_event - elif event_type == EventBuilder.TYPE_TEST_START: - # Track the start time for the test method. - self.track_start_time( - test_event["test_class"], - test_event["test_name"], - test_event["event_time"]) - # Track of the most recent test method start event - # for the related worker. This allows us to figure - # out whether a process timeout or exceptional exit - # can be charged (i.e. assigned) to a test method. - worker_index = test_event.get("worker_index", None) - if worker_index is not None: - self.started_tests_by_worker[worker_index] = test_event - - elif event_type == EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE: - self._mark_test_for_rerun_eligibility(test_event) - elif event_type == EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE: - self._mark_test_as_expected_failure(test_event) - - def set_expected_timeouts_by_basename(self, basenames): - """Specifies a list of test file basenames that are allowed to timeout - without being called out as a timeout issue. - - These fall into a new status category called STATUS_EXPECTED_TIMEOUT. - """ - if basenames is not None: - for basename in basenames: - self.expected_timeouts_by_basename.add(basename) - - def track_start_time(self, test_class, test_name, start_time): - """tracks the start time of a test so elapsed time can be computed. - - this alleviates the need for test results to be processed serially - by test. it will save the start time for the test so that - elapsed_time_for_test() can compute the elapsed time properly. - """ - if test_class is None or test_name is None: - return - - test_key = "{}.{}".format(test_class, test_name) - self.start_time_by_test[test_key] = start_time - - def elapsed_time_for_test(self, test_class, test_name, end_time): - """returns the elapsed time for a test. - - this function can only be called once per test and requires that - the track_start_time() method be called sometime prior to calling - this method. - """ - if test_class is None or test_name is None: - return -2.0 - - test_key = "{}.{}".format(test_class, test_name) - if test_key not in self.start_time_by_test: - return -1.0 - else: - start_time = self.start_time_by_test[test_key] - del self.start_time_by_test[test_key] - return end_time - start_time - - def is_using_terminal(self): - """returns true if this results formatter is using the terminal and - output should be avoided.""" - return self.using_terminal - - def send_terminate_as_needed(self): - """sends the terminate event if it hasn't been received yet.""" - if not self.terminate_called: - terminate_event = EventBuilder.bare_event("terminate") - self.handle_event(terminate_event) - - # Derived classes may require self access - # pylint: disable=no-self-use - def replaces_summary(self): - """Returns whether the results formatter includes a summary - suitable to replace the old lldb test run results. - - @return True if the lldb test runner can skip its summary - generation when using this results formatter; False otherwise. - """ - return False - - def counts_by_test_result_status(self, status): - """Returns number of test method results for the given status. - - @status_result a test result status (e.g. success, fail, skip) - as defined by the EventBuilder.STATUS_* class members. - - @return an integer returning the number of test methods matching - the given test result status. - """ - return self.result_status_counts[status] - - @classmethod - def _event_sort_key(cls, event): - """Returns the sort key to be used for a test event. - - This method papers over the differences in a test method result vs. a - job (i.e. inferior process) result. - - @param event a test result or job result event. - @return a key useful for sorting events by name (test name preferably, - then by test filename). - """ - if "test_name" in event: - return event["test_name"] - else: - return event.get("test_filename", None) - - def _partition_results_by_status(self, categories): - """Partitions the captured test results by event status. - - This permits processing test results by the category ids. - - @param categories the list of categories on which to partition. - Follows the format described in _report_category_details(). - - @return a dictionary where each key is the test result status, - and each entry is a list containing all the test result events - that matched that test result status. Result status IDs with - no matching entries will have a zero-length list. - """ - partitioned_events = {} - for category in categories: - result_status_id = category[0] - matching_events = [ - [key, event] for (key, event) in self.result_events.items() - if event.get("status", "") == result_status_id] - partitioned_events[result_status_id] = sorted( - matching_events, - key=lambda x: self._event_sort_key(x[1])) - return partitioned_events - - def _print_banner(self, out_file, banner_text): - """Prints an ASCII banner around given text. - - Output goes to the out file for the results formatter. - - @param out_file a file-like object where output will be written. - @param banner_text the text to display, with a banner - of '=' around the line above and line below. - """ - banner_separator = "".ljust(len(banner_text), "=") - - out_file.write("\n{}\n{}\n{}\n".format( - banner_separator, - banner_text, - banner_separator)) - - def _print_summary_counts( - self, out_file, categories, result_events_by_status, extra_rows): - """Prints summary counts for all categories. - - @param out_file a file-like object used to print output. - - @param categories the list of categories on which to partition. - Follows the format described in _report_category_details(). - - @param result_events_by_status the partitioned list of test - result events in a dictionary, with the key set to the test - result status id and the value set to the list of test method - results that match the status id. - """ - - # Get max length for category printed name - category_with_max_printed_name = max( - categories, key=lambda x: len(x[1])) - max_category_name_length = len(category_with_max_printed_name[1]) - - # If we are provided with extra rows, consider these row name lengths. - if extra_rows is not None: - for row in extra_rows: - name_length = len(row[0]) - if name_length > max_category_name_length: - max_category_name_length = name_length - - self._print_banner(out_file, "Test Result Summary") - - # Prepend extra rows - if extra_rows is not None: - for row in extra_rows: - extra_label = "{}:".format(row[0]).ljust( - max_category_name_length + 1) - out_file.write("{} {:4}\n".format(extra_label, row[1])) - - for category in categories: - result_status_id = category[0] - result_label = "{}:".format(category[1]).ljust( - max_category_name_length + 1) - count = len(result_events_by_status[result_status_id]) - out_file.write("{} {:4}\n".format( - result_label, - count)) - - @classmethod - def _has_printable_details(cls, categories, result_events_by_status): - """Returns whether there are any test result details that need to be printed. - - This will spin through the results and see if any result in a category - that is printable has any results to print. - - @param categories the list of categories on which to partition. - Follows the format described in _report_category_details(). - - @param result_events_by_status the partitioned list of test - result events in a dictionary, with the key set to the test - result status id and the value set to the list of test method - results that match the status id. - - @return True if there are any details (i.e. test results - for failures, errors, unexpected successes); False otherwise. - """ - for category in categories: - result_status_id = category[0] - print_matching_tests = category[2] - if print_matching_tests: - if len(result_events_by_status[result_status_id]) > 0: - # We found a printable details test result status - # that has details to print. - return True - # We didn't find any test result category with printable - # details. - return False - - def _report_category_details(self, out_file, category, result_events_by_status): - """Reports all test results matching the given category spec. - - @param out_file a file-like object used to print output. - - @param category a category spec of the format [test_event_name, - printed_category_name, print_matching_entries?] - - @param result_events_by_status the partitioned list of test - result events in a dictionary, with the key set to the test - result status id and the value set to the list of test method - results that match the status id. - """ - result_status_id = category[0] - print_matching_tests = category[2] - detail_label = category[3] - - if print_matching_tests: - # Sort by test name - for (_, event) in result_events_by_status[result_status_id]: - # Convert full test path into test-root-relative. - test_relative_path = os.path.relpath( - os.path.realpath(event["test_filename"]), - lldbsuite.lldb_test_root) - - # Create extra info component (used for exceptional exit info) - if result_status_id == EventBuilder.STATUS_EXCEPTIONAL_EXIT: - extra_info = "[EXCEPTIONAL EXIT {} ({})] ".format( - event["exception_code"], - event["exception_description"]) - else: - extra_info = "" - - # Figure out the identity we will use for this test. - if configuration.verbose and ("test_class" in event): - test_id = "{}.{}".format( - event["test_class"], event["test_name"]) - elif "test_name" in event: - test_id = event["test_name"] - else: - test_id = "" - - # Display the info. - out_file.write("{}: {}{} ({})\n".format( - detail_label, - extra_info, - test_id, - test_relative_path)) - - def print_results(self, out_file): - """Writes the test result report to the output file. - - @param out_file a file-like object used for printing summary - results. This is different than self.out_file, which might - be something else for non-summary data. - """ - extra_results = [ - # Total test methods processed, excluding reruns. - ["Test Methods", len(self.result_events)], - ["Reruns", self.test_method_rerun_count]] - - # Output each of the test result entries. - categories = [ - # result id, printed name, print matching tests?, detail label - [EventBuilder.STATUS_SUCCESS, - "Success", False, None], - [EventBuilder.STATUS_EXPECTED_FAILURE, - "Expected Failure", False, None], - [EventBuilder.STATUS_FAILURE, - "Failure", True, "FAIL"], - [EventBuilder.STATUS_ERROR, - "Error", True, "ERROR"], - [EventBuilder.STATUS_EXCEPTIONAL_EXIT, - "Exceptional Exit", True, "ERROR"], - [EventBuilder.STATUS_UNEXPECTED_SUCCESS, - "Unexpected Success", True, "UNEXPECTED SUCCESS"], - [EventBuilder.STATUS_SKIP, "Skip", False, None], - [EventBuilder.STATUS_TIMEOUT, - "Timeout", True, "TIMEOUT"], - [EventBuilder.STATUS_EXPECTED_TIMEOUT, - # Intentionally using the unusual hyphenation in TIME-OUT to - # prevent buildbots from thinking it is an issue when scanning - # for TIMEOUT. - "Expected Timeout", True, "EXPECTED TIME-OUT"] - ] - - # Partition all the events by test result status - result_events_by_status = self._partition_results_by_status( - categories) - - # Print the details - have_details = self._has_printable_details( - categories, result_events_by_status) - if have_details: - self._print_banner(out_file, "Issue Details") - for category in categories: - self._report_category_details( - out_file, category, result_events_by_status) - - # Print the summary - self._print_summary_counts( - out_file, categories, result_events_by_status, extra_results) - - if self.options.dump_results: - # Debug dump of the key/result info for all categories. - self._print_banner("Results Dump") - for status, events_by_key in result_events_by_status.items(): - out_file.write("\nSTATUS: {}\n".format(status)) - for key, event in events_by_key: - out_file.write("key: {}\n".format(key)) - out_file.write("event: {}\n".format(event)) - - -class RawPickledFormatter(ResultsFormatter): - """Formats events as a pickled stream. - - The parallel test runner has inferiors pickle their results and send them - over a socket back to the parallel test. The parallel test runner then - aggregates them into the final results formatter (e.g. xUnit). - """ - - @classmethod - def arg_parser(cls): - """@return arg parser used to parse formatter-specific options.""" - parser = super(RawPickledFormatter, cls).arg_parser() - return parser - - def __init__(self, out_file, options): - super(RawPickledFormatter, self).__init__(out_file, options) - self.pid = os.getpid() - - def handle_event(self, test_event): - super(RawPickledFormatter, self).handle_event(test_event) - - # Convert initialize/terminate events into job_begin/job_end events. - event_type = test_event["event"] - if event_type is None: - return - - if event_type == "initialize": - test_event["event"] = "job_begin" - elif event_type == "terminate": - test_event["event"] = "job_end" - - # Tack on the pid. - test_event["pid"] = self.pid - - # Send it as {serialized_length_of_serialized_bytes}{serialized_bytes} - import struct - msg = cPickle.dumps(test_event) - packet = struct.pack("!I%ds" % len(msg), len(msg), msg) - self.out_file.send(packet) - - -class DumpFormatter(ResultsFormatter): - """Formats events to the file as their raw python dictionary format.""" - - def handle_event(self, test_event): - super(DumpFormatter, self).handle_event(test_event) - self.out_file.write("\n" + pprint.pformat(test_event) + "\n") Index: packages/Python/lldbsuite/test/test_result.py =================================================================== --- packages/Python/lldbsuite/test/test_result.py +++ packages/Python/lldbsuite/test/test_result.py @@ -18,9 +18,8 @@ import unittest2 # LLDB Modules -import lldbsuite from . import configuration -from .result_formatter import EventBuilder +from lldbsuite.test_event.event_builder import EventBuilder class LLDBTestResult(unittest2.TextTestResult): Index: packages/Python/lldbsuite/test/xunit_formatter.py =================================================================== --- packages/Python/lldbsuite/test/xunit_formatter.py +++ /dev/null @@ -1,565 +0,0 @@ -""" - The LLVM Compiler Infrastructure - -This file is distributed under the University of Illinois Open Source -License. See LICENSE.TXT for details. - -Provides an xUnit ResultsFormatter for integrating the LLDB -test suite with the Jenkins xUnit aggregator and other xUnit-compliant -test output processors. -""" -from __future__ import print_function -from __future__ import absolute_import - -# System modules -import re -import sys -import xml.sax.saxutils - -# Third-party modules -import six - -# Local modules -from .result_formatter import EventBuilder -from .result_formatter import ResultsFormatter - - -class XunitFormatter(ResultsFormatter): - """Provides xUnit-style formatted output. - """ - - # Result mapping arguments - RM_IGNORE = 'ignore' - RM_SUCCESS = 'success' - RM_FAILURE = 'failure' - RM_PASSTHRU = 'passthru' - - @staticmethod - def _build_illegal_xml_regex(): - """Contructs a regex to match all illegal xml characters. - - Expects to be used against a unicode string.""" - # Construct the range pairs of invalid unicode chareacters. - illegal_chars_u = [ - (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84), - (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)] - - # For wide builds, we have more. - if sys.maxunicode >= 0x10000: - illegal_chars_u.extend( - [(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF), - (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF), - (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF), - (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF), - (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF), - (0x10FFFE, 0x10FFFF)]) - - # Build up an array of range expressions. - illegal_ranges = [ - "%s-%s" % (six.unichr(low), six.unichr(high)) - for (low, high) in illegal_chars_u] - - # Compile the regex - return re.compile(six.u('[%s]') % six.u('').join(illegal_ranges)) - - @staticmethod - def _quote_attribute(text): - """Returns the given text in a manner safe for usage in an XML attribute. - - @param text the text that should appear within an XML attribute. - @return the attribute-escaped version of the input text. - """ - return xml.sax.saxutils.quoteattr(text) - - def _replace_invalid_xml(self, str_or_unicode): - """Replaces invalid XML characters with a '?'. - - @param str_or_unicode a string to replace invalid XML - characters within. Can be unicode or not. If not unicode, - assumes it is a byte string in utf-8 encoding. - - @returns a utf-8-encoded byte string with invalid - XML replaced with '?'. - """ - # Get the content into unicode - if isinstance(str_or_unicode, str): - unicode_content = str_or_unicode.decode('utf-8') - else: - unicode_content = str_or_unicode - return self.invalid_xml_re.sub( - six.u('?'), unicode_content).encode('utf-8') - - @classmethod - def arg_parser(cls): - """@return arg parser used to parse formatter-specific options.""" - parser = super(XunitFormatter, cls).arg_parser() - - # These are valid choices for results mapping. - results_mapping_choices = [ - XunitFormatter.RM_IGNORE, - XunitFormatter.RM_SUCCESS, - XunitFormatter.RM_FAILURE, - XunitFormatter.RM_PASSTHRU] - parser.add_argument( - "--assert-on-unknown-events", - action="store_true", - help=('cause unknown test events to generate ' - 'a python assert. Default is to ignore.')) - parser.add_argument( - "--ignore-skip-name", - "-n", - metavar='PATTERN', - action="append", - dest='ignore_skip_name_patterns', - help=('a python regex pattern, where ' - 'any skipped test with a test method name where regex ' - 'matches (via search) will be ignored for xUnit test ' - 'result purposes. Can be specified multiple times.')) - parser.add_argument( - "--ignore-skip-reason", - "-r", - metavar='PATTERN', - action="append", - dest='ignore_skip_reason_patterns', - help=('a python regex pattern, where ' - 'any skipped test with a skip reason where the regex ' - 'matches (via search) will be ignored for xUnit test ' - 'result purposes. Can be specified multiple times.')) - parser.add_argument( - "--xpass", action="store", choices=results_mapping_choices, - default=XunitFormatter.RM_FAILURE, - help=('specify mapping from unexpected success to jUnit/xUnit ' - 'result type')) - parser.add_argument( - "--xfail", action="store", choices=results_mapping_choices, - default=XunitFormatter.RM_IGNORE, - help=('specify mapping from expected failure to jUnit/xUnit ' - 'result type')) - return parser - - @staticmethod - def _build_regex_list_from_patterns(patterns): - """Builds a list of compiled regexes from option value. - - @param option string containing a comma-separated list of regex - patterns. Zero-length or None will produce an empty regex list. - - @return list of compiled regular expressions, empty if no - patterns provided. - """ - regex_list = [] - if patterns is not None: - for pattern in patterns: - regex_list.append(re.compile(pattern)) - return regex_list - - def __init__(self, out_file, options): - """Initializes the XunitFormatter instance. - @param out_file file-like object where formatted output is written. - @param options_dict specifies a dictionary of options for the - formatter. - """ - # Initialize the parent - super(XunitFormatter, self).__init__(out_file, options) - self.text_encoding = "UTF-8" - self.invalid_xml_re = XunitFormatter._build_illegal_xml_regex() - self.total_test_count = 0 - self.ignore_skip_name_regexes = ( - XunitFormatter._build_regex_list_from_patterns( - options.ignore_skip_name_patterns)) - self.ignore_skip_reason_regexes = ( - XunitFormatter._build_regex_list_from_patterns( - options.ignore_skip_reason_patterns)) - - self.elements = { - "successes": [], - "errors": [], - "failures": [], - "skips": [], - "unexpected_successes": [], - "expected_failures": [], - "all": [] - } - - self.status_handlers = { - EventBuilder.STATUS_SUCCESS: self._handle_success, - EventBuilder.STATUS_FAILURE: self._handle_failure, - EventBuilder.STATUS_ERROR: self._handle_error, - EventBuilder.STATUS_SKIP: self._handle_skip, - EventBuilder.STATUS_EXPECTED_FAILURE: - self._handle_expected_failure, - EventBuilder.STATUS_EXPECTED_TIMEOUT: - self._handle_expected_timeout, - EventBuilder.STATUS_UNEXPECTED_SUCCESS: - self._handle_unexpected_success, - EventBuilder.STATUS_EXCEPTIONAL_EXIT: - self._handle_exceptional_exit, - EventBuilder.STATUS_TIMEOUT: - self._handle_timeout - } - - RESULT_TYPES = set( - [EventBuilder.TYPE_TEST_RESULT, - EventBuilder.TYPE_JOB_RESULT]) - - def handle_event(self, test_event): - super(XunitFormatter, self).handle_event(test_event) - - event_type = test_event["event"] - if event_type is None: - return - - if event_type == "terminate": - # Process all the final result events into their - # XML counterparts. - for result_event in self.result_events.values(): - self._process_test_result(result_event) - self._finish_output() - else: - # This is an unknown event. - if self.options.assert_on_unknown_events: - raise Exception("unknown event type {} from {}\n".format( - event_type, test_event)) - - def _handle_success(self, test_event): - """Handles a test success. - @param test_event the test event to handle. - """ - result = self._common_add_testcase_entry(test_event) - with self.lock: - self.elements["successes"].append(result) - - def _handle_failure(self, test_event): - """Handles a test failure. - @param test_event the test event to handle. - """ - message = self._replace_invalid_xml(test_event["issue_message"]) - backtrace = self._replace_invalid_xml( - "".join(test_event.get("issue_backtrace", []))) - - result = self._common_add_testcase_entry( - test_event, - inner_content=( - ''.format( - XunitFormatter._quote_attribute(test_event["issue_class"]), - XunitFormatter._quote_attribute(message), - backtrace) - )) - with self.lock: - self.elements["failures"].append(result) - - def _handle_error(self, test_event): - """Handles a test error. - @param test_event the test event to handle. - """ - message = self._replace_invalid_xml(test_event["issue_message"]) - backtrace = self._replace_invalid_xml( - "".join(test_event.get("issue_backtrace", []))) - - result = self._common_add_testcase_entry( - test_event, - inner_content=( - ''.format( - XunitFormatter._quote_attribute(test_event["issue_class"]), - XunitFormatter._quote_attribute(message), - backtrace) - )) - with self.lock: - self.elements["errors"].append(result) - - def _handle_exceptional_exit(self, test_event): - """Handles an exceptional exit. - @param test_event the test method or job result event to handle. - """ - if "test_name" in test_event: - name = test_event["test_name"] - else: - name = test_event.get("test_filename", "") - - message_text = "ERROR: {} ({}): {}".format( - test_event.get("exception_code", 0), - test_event.get("exception_description", ""), - name) - message = self._replace_invalid_xml(message_text) - - result = self._common_add_testcase_entry( - test_event, - inner_content=( - ''.format( - "exceptional_exit", - XunitFormatter._quote_attribute(message)) - )) - with self.lock: - self.elements["errors"].append(result) - - def _handle_timeout(self, test_event): - """Handles a test method or job timeout. - @param test_event the test method or job result event to handle. - """ - if "test_name" in test_event: - name = test_event["test_name"] - else: - name = test_event.get("test_filename", "") - - message_text = "TIMEOUT: {}".format(name) - message = self._replace_invalid_xml(message_text) - - result = self._common_add_testcase_entry( - test_event, - inner_content=( - ''.format( - "timeout", - XunitFormatter._quote_attribute(message)) - )) - with self.lock: - self.elements["errors"].append(result) - - @staticmethod - def _ignore_based_on_regex_list(test_event, test_key, regex_list): - """Returns whether to ignore a test event based on patterns. - - @param test_event the test event dictionary to check. - @param test_key the key within the dictionary to check. - @param regex_list a list of zero or more regexes. May contain - zero or more compiled regexes. - - @return True if any o the regex list match based on the - re.search() method; false otherwise. - """ - for regex in regex_list: - match = regex.search(test_event.get(test_key, '')) - if match: - return True - return False - - def _handle_skip(self, test_event): - """Handles a skipped test. - @param test_event the test event to handle. - """ - - # Are we ignoring this test based on test name? - if XunitFormatter._ignore_based_on_regex_list( - test_event, 'test_name', self.ignore_skip_name_regexes): - return - - # Are we ignoring this test based on skip reason? - if XunitFormatter._ignore_based_on_regex_list( - test_event, 'skip_reason', self.ignore_skip_reason_regexes): - return - - # We're not ignoring this test. Process the skip. - reason = self._replace_invalid_xml(test_event.get("skip_reason", "")) - result = self._common_add_testcase_entry( - test_event, - inner_content=''.format( - XunitFormatter._quote_attribute(reason))) - with self.lock: - self.elements["skips"].append(result) - - def _handle_expected_failure(self, test_event): - """Handles a test that failed as expected. - @param test_event the test event to handle. - """ - if self.options.xfail == XunitFormatter.RM_PASSTHRU: - # This is not a natively-supported junit/xunit - # testcase mode, so it might fail a validating - # test results viewer. - if "bugnumber" in test_event: - bug_id_attribute = 'bug-id={} '.format( - XunitFormatter._quote_attribute(test_event["bugnumber"])) - else: - bug_id_attribute = '' - - result = self._common_add_testcase_entry( - test_event, - inner_content=( - ''.format( - bug_id_attribute, - XunitFormatter._quote_attribute( - test_event["issue_class"]), - XunitFormatter._quote_attribute( - test_event["issue_message"])) - )) - with self.lock: - self.elements["expected_failures"].append(result) - elif self.options.xfail == XunitFormatter.RM_SUCCESS: - result = self._common_add_testcase_entry(test_event) - with self.lock: - self.elements["successes"].append(result) - elif self.options.xfail == XunitFormatter.RM_FAILURE: - result = self._common_add_testcase_entry( - test_event, - inner_content=''.format( - XunitFormatter._quote_attribute(test_event["issue_class"]), - XunitFormatter._quote_attribute( - test_event["issue_message"]))) - with self.lock: - self.elements["failures"].append(result) - elif self.options.xfail == XunitFormatter.RM_IGNORE: - pass - else: - raise Exception( - "unknown xfail option: {}".format(self.options.xfail)) - - def _handle_expected_timeout(self, test_event): - """Handles expected_timeout. - @param test_event the test event to handle. - """ - # We don't do anything with expected timeouts, not even report. - pass - - def _handle_unexpected_success(self, test_event): - """Handles a test that passed but was expected to fail. - @param test_event the test event to handle. - """ - if self.options.xpass == XunitFormatter.RM_PASSTHRU: - # This is not a natively-supported junit/xunit - # testcase mode, so it might fail a validating - # test results viewer. - result = self._common_add_testcase_entry( - test_event, - inner_content=("")) - with self.lock: - self.elements["unexpected_successes"].append(result) - elif self.options.xpass == XunitFormatter.RM_SUCCESS: - # Treat the xpass as a success. - result = self._common_add_testcase_entry(test_event) - with self.lock: - self.elements["successes"].append(result) - elif self.options.xpass == XunitFormatter.RM_FAILURE: - # Treat the xpass as a failure. - if "bugnumber" in test_event: - message = "unexpected success (bug_id:{})".format( - test_event["bugnumber"]) - else: - message = "unexpected success (bug_id:none)" - result = self._common_add_testcase_entry( - test_event, - inner_content=''.format( - XunitFormatter._quote_attribute("unexpected_success"), - XunitFormatter._quote_attribute(message))) - with self.lock: - self.elements["failures"].append(result) - elif self.options.xpass == XunitFormatter.RM_IGNORE: - # Ignore the xpass result as far as xUnit reporting goes. - pass - else: - raise Exception("unknown xpass option: {}".format( - self.options.xpass)) - - def _process_test_result(self, test_event): - """Processes the test_event known to be a test result. - - This categorizes the event appropriately and stores the data needed - to generate the final xUnit report. This method skips events that - cannot be represented in xUnit output. - """ - if "status" not in test_event: - raise Exception("test event dictionary missing 'status' key") - - status = test_event["status"] - if status not in self.status_handlers: - raise Exception("test event status '{}' unsupported".format( - status)) - - # Call the status handler for the test result. - self.status_handlers[status](test_event) - - def _common_add_testcase_entry(self, test_event, inner_content=None): - """Registers a testcase result, and returns the text created. - - The caller is expected to manage failure/skip/success counts - in some kind of appropriate way. This call simply constructs - the XML and appends the returned result to the self.all_results - list. - - @param test_event the test event dictionary. - - @param inner_content if specified, gets included in the - inner section, at the point before stdout and stderr would be - included. This is where a , , , etc. - could go. - - @return the text of the xml testcase element. - """ - - # Get elapsed time. - test_class = test_event.get("test_class", "") - test_name = test_event.get("test_name", "") - event_time = test_event["event_time"] - time_taken = self.elapsed_time_for_test( - test_class, test_name, event_time) - - # Plumb in stdout/stderr once we shift over to only test results. - test_stdout = '' - test_stderr = '' - - # Formulate the output xml. - if not inner_content: - inner_content = "" - result = ( - '' - '{}{}{}'.format( - test_class, - test_name, - time_taken, - inner_content, - test_stdout, - test_stderr)) - - # Save the result, update total test count. - with self.lock: - self.total_test_count += 1 - self.elements["all"].append(result) - - return result - - def _finish_output_no_lock(self): - """Flushes out the report of test executions to form valid xml output. - - xUnit output is in XML. The reporting system cannot complete the - formatting of the output without knowing when there is no more input. - This call addresses notifcation of the completed test run and thus is - when we can finish off the report output. - """ - - # Figure out the counts line for the testsuite. If we have - # been counting either unexpected successes or expected - # failures, we'll output those in the counts, at the risk of - # being invalidated by a validating test results viewer. - # These aren't counted by default so they won't show up unless - # the user specified a formatter option to include them. - xfail_count = len(self.elements["expected_failures"]) - xpass_count = len(self.elements["unexpected_successes"]) - if xfail_count > 0 or xpass_count > 0: - extra_testsuite_attributes = ( - ' expected-failures="{}"' - ' unexpected-successes="{}"'.format(xfail_count, xpass_count)) - else: - extra_testsuite_attributes = "" - - # Output the header. - self.out_file.write( - '\n' - '' - '\n'.format( - self.text_encoding, - "LLDB test suite", - self.total_test_count, - len(self.elements["errors"]), - len(self.elements["failures"]), - len(self.elements["skips"]), - extra_testsuite_attributes)) - - # Output each of the test result entries. - for result in self.elements["all"]: - self.out_file.write(result + '\n') - - # Close off the test suite. - self.out_file.write('\n') - - def _finish_output(self): - """Finish writing output as all incoming events have arrived.""" - with self.lock: - self._finish_output_no_lock() Index: packages/Python/lldbsuite/test_event/dotest_channels.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/dotest_channels.py @@ -0,0 +1,208 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. + +Sync lldb and related source from a local machine to a remote machine. + +This facilitates working on the lldb sourcecode on multiple machines +and multiple OS types, verifying changes across all. + + +This module provides asyncore channels used within the LLDB test +framework. +""" + +from __future__ import print_function +from __future__ import absolute_import + + +# System modules +import asyncore +import socket + +# Third-party modules +from six.moves import cPickle + +# LLDB modules + + +class UnpicklingForwardingReaderChannel(asyncore.dispatcher): + """Provides an unpickling, forwarding asyncore dispatch channel reader. + + Inferior dotest.py processes with side-channel-based test results will + send test result event data in a pickled format, one event at a time. + This class supports reconstructing the pickled data and forwarding it + on to its final destination. + + The channel data is written in the form: + {num_payload_bytes}#{payload_bytes} + + The bulk of this class is devoted to reading and parsing out + the payload bytes. + """ + def __init__(self, file_object, async_map, forwarding_func): + asyncore.dispatcher.__init__(self, sock=file_object, map=async_map) + + self.header_contents = b"" + self.packet_bytes_remaining = 0 + self.reading_header = True + self.ibuffer = b'' + self.forwarding_func = forwarding_func + if forwarding_func is None: + # This whole class is useless if we do nothing with the + # unpickled results. + raise Exception("forwarding function must be set") + + # Initiate all connections by sending an ack. This allows + # the initiators of the socket to await this to ensure + # that this end is up and running (and therefore already + # into the async map). + ack_bytes = bytearray() + ack_bytes.append(chr(42)) + file_object.send(ack_bytes) + + def deserialize_payload(self): + """Unpickles the collected input buffer bytes and forwards.""" + if len(self.ibuffer) > 0: + self.forwarding_func(cPickle.loads(self.ibuffer)) + self.ibuffer = b'' + + def consume_header_bytes(self, data): + """Consumes header bytes from the front of data. + @param data the incoming data stream bytes + @return any data leftover after consuming header bytes. + """ + # We're done if there is no content. + if not data or (len(data) == 0): + return None + + full_header_len = 4 + + assert len(self.header_contents) < full_header_len + + bytes_avail = len(data) + bytes_needed = full_header_len - len(self.header_contents) + header_bytes_avail = min(bytes_needed, bytes_avail) + self.header_contents += data[:header_bytes_avail] + if len(self.header_contents) == full_header_len: + import struct + # End of header. + self.packet_bytes_remaining = struct.unpack( + "!I", self.header_contents)[0] + self.header_contents = b"" + self.reading_header = False + return data[header_bytes_avail:] + + # If we made it here, we've exhausted the data and + # we're still parsing header content. + return None + + def consume_payload_bytes(self, data): + """Consumes payload bytes from the front of data. + @param data the incoming data stream bytes + @return any data leftover after consuming remaining payload bytes. + """ + if not data or (len(data) == 0): + # We're done and there's nothing to do. + return None + + data_len = len(data) + if data_len <= self.packet_bytes_remaining: + # We're consuming all the data provided. + self.ibuffer += data + self.packet_bytes_remaining -= data_len + + # If we're no longer waiting for payload bytes, + # we flip back to parsing header bytes and we + # unpickle the payload contents. + if self.packet_bytes_remaining < 1: + self.reading_header = True + self.deserialize_payload() + + # We're done, no more data left. + return None + else: + # We're only consuming a portion of the data since + # the data contains more than the payload amount. + self.ibuffer += data[:self.packet_bytes_remaining] + data = data[self.packet_bytes_remaining:] + + # We now move on to reading the header. + self.reading_header = True + self.packet_bytes_remaining = 0 + + # And we can deserialize the payload. + self.deserialize_payload() + + # Return the remaining data. + return data + + def handle_read(self): + # Read some data from the socket. + try: + data = self.recv(8192) + # print('driver socket READ: %d bytes' % len(data)) + except socket.error as socket_error: + print( + "\nINFO: received socket error when reading data " + "from test inferior:\n{}".format(socket_error)) + raise + except Exception as general_exception: + print( + "\nERROR: received non-socket error when reading data " + "from the test inferior:\n{}".format(general_exception)) + raise + + # Consume the message content. + while data and (len(data) > 0): + # If we're reading the header, gather header bytes. + if self.reading_header: + data = self.consume_header_bytes(data) + else: + data = self.consume_payload_bytes(data) + + def handle_close(self): + # print("socket reader: closing port") + self.close() + + +class UnpicklingForwardingListenerChannel(asyncore.dispatcher): + """Provides a socket listener asyncore channel for unpickling/forwarding. + + This channel will listen on a socket port (use 0 for host-selected). Any + client that connects will have an UnpicklingForwardingReaderChannel handle + communication over the connection. + + The dotest parallel test runners, when collecting test results, open the + test results side channel over a socket. This channel handles connections + from inferiors back to the test runner. Each worker fires up a listener + for each inferior invocation. This simplifies the asyncore.loop() usage, + one of the reasons for implementing with asyncore. This listener shuts + down once a single connection is made to it. + """ + def __init__(self, async_map, host, port, backlog_count, forwarding_func): + asyncore.dispatcher.__init__(self, map=async_map) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((host, port)) + self.address = self.socket.getsockname() + self.listen(backlog_count) + self.handler = None + self.async_map = async_map + self.forwarding_func = forwarding_func + if forwarding_func is None: + # This whole class is useless if we do nothing with the + # unpickled results. + raise Exception("forwarding function must be set") + + def handle_accept(self): + (sock, addr) = self.socket.accept() + if sock and addr: + # print('Incoming connection from %s' % repr(addr)) + self.handler = UnpicklingForwardingReaderChannel( + sock, self.async_map, self.forwarding_func) + + def handle_close(self): + self.close() Index: packages/Python/lldbsuite/test_event/event_builder.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/event_builder.py @@ -0,0 +1,435 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. + +Provides a class to build Python test event data structures. +""" + +from __future__ import print_function +from __future__ import absolute_import + +# System modules +import inspect +import time +import traceback + +# Third-party modules + +# LLDB modules + + +class EventBuilder(object): + """Helper class to build test result event dictionaries.""" + + BASE_DICTIONARY = None + + # Test Event Types + TYPE_JOB_RESULT = "job_result" + TYPE_TEST_RESULT = "test_result" + TYPE_TEST_START = "test_start" + TYPE_MARK_TEST_RERUN_ELIGIBLE = "test_eligible_for_rerun" + TYPE_MARK_TEST_EXPECTED_FAILURE = "test_expected_failure" + TYPE_SESSION_TERMINATE = "terminate" + + RESULT_TYPES = {TYPE_JOB_RESULT, TYPE_TEST_RESULT} + + # Test/Job Status Tags + STATUS_EXCEPTIONAL_EXIT = "exceptional_exit" + STATUS_SUCCESS = "success" + STATUS_FAILURE = "failure" + STATUS_EXPECTED_FAILURE = "expected_failure" + STATUS_EXPECTED_TIMEOUT = "expected_timeout" + STATUS_UNEXPECTED_SUCCESS = "unexpected_success" + STATUS_SKIP = "skip" + STATUS_ERROR = "error" + STATUS_TIMEOUT = "timeout" + + """Test methods or jobs with a status matching any of these + status values will cause a testrun failure, unless + the test methods rerun and do not trigger an issue when rerun.""" + TESTRUN_ERROR_STATUS_VALUES = {STATUS_ERROR, STATUS_EXCEPTIONAL_EXIT, STATUS_FAILURE, STATUS_TIMEOUT} + + @staticmethod + def _get_test_name_info(test): + """Returns (test-class-name, test-method-name) from a test case instance. + + @param test a unittest.TestCase instance. + + @return tuple containing (test class name, test method name) + """ + test_class_components = test.id().split(".") + test_class_name = ".".join(test_class_components[:-1]) + test_name = test_class_components[-1] + return test_class_name, test_name + + @staticmethod + def bare_event(event_type): + """Creates an event with default additions, event type and timestamp. + + @param event_type the value set for the "event" key, used + to distinguish events. + + @returns an event dictionary with all default additions, the "event" + key set to the passed in event_type, and the event_time value set to + time.time(). + """ + if EventBuilder.BASE_DICTIONARY is not None: + # Start with a copy of the "always include" entries. + event = dict(EventBuilder.BASE_DICTIONARY) + else: + event = {} + + event.update({ + "event": event_type, + "event_time": time.time() + }) + return event + + @staticmethod + def _assert_is_python_sourcefile(test_filename): + if test_filename is not None: + if not test_filename.endswith(".py"): + raise Exception("source python filename has unexpected extension: {}".format(test_filename)) + return test_filename + + @staticmethod + def _event_dictionary_common(test, event_type): + """Returns an event dictionary setup with values for the given event type. + + @param test the unittest.TestCase instance + + @param event_type the name of the event type (string). + + @return event dictionary with common event fields set. + """ + test_class_name, test_name = EventBuilder._get_test_name_info(test) + + # Determine the filename for the test case. If there is an attribute + # for it, use it. Otherwise, determine from the TestCase class path. + if hasattr(test, "test_filename"): + test_filename = EventBuilder._assert_is_python_sourcefile(test.test_filename) + else: + test_filename = EventBuilder._assert_is_python_sourcefile(inspect.getsourcefile(test.__class__)) + + event = EventBuilder.bare_event(event_type) + event.update({ + "test_class": test_class_name, + "test_name": test_name, + "test_filename": test_filename + }) + + return event + + @staticmethod + def _error_tuple_class(error_tuple): + """Returns the unittest error tuple's error class as a string. + + @param error_tuple the error tuple provided by the test framework. + + @return the error type (typically an exception) raised by the + test framework. + """ + type_var = error_tuple[0] + module = inspect.getmodule(type_var) + if module: + return "{}.{}".format(module.__name__, type_var.__name__) + else: + return type_var.__name__ + + @staticmethod + def _error_tuple_message(error_tuple): + """Returns the unittest error tuple's error message. + + @param error_tuple the error tuple provided by the test framework. + + @return the error message provided by the test framework. + """ + return str(error_tuple[1]) + + @staticmethod + def _error_tuple_traceback(error_tuple): + """Returns the unittest error tuple's error message. + + @param error_tuple the error tuple provided by the test framework. + + @return the error message provided by the test framework. + """ + return error_tuple[2] + + @staticmethod + def _event_dictionary_test_result(test, status): + """Returns an event dictionary with common test result fields set. + + @param test a unittest.TestCase instance. + + @param status the status/result of the test + (e.g. "success", "failure", etc.) + + @return the event dictionary + """ + event = EventBuilder._event_dictionary_common( + test, EventBuilder.TYPE_TEST_RESULT) + event["status"] = status + return event + + @staticmethod + def _event_dictionary_issue(test, status, error_tuple): + """Returns an event dictionary with common issue-containing test result + fields set. + + @param test a unittest.TestCase instance. + + @param status the status/result of the test + (e.g. "success", "failure", etc.) + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + event = EventBuilder._event_dictionary_test_result(test, status) + event["issue_class"] = EventBuilder._error_tuple_class(error_tuple) + event["issue_message"] = EventBuilder._error_tuple_message(error_tuple) + backtrace = EventBuilder._error_tuple_traceback(error_tuple) + if backtrace is not None: + event["issue_backtrace"] = traceback.format_tb(backtrace) + return event + + @staticmethod + def event_for_start(test): + """Returns an event dictionary for the test start event. + + @param test a unittest.TestCase instance. + + @return the event dictionary + """ + return EventBuilder._event_dictionary_common( + test, EventBuilder.TYPE_TEST_START) + + @staticmethod + def event_for_success(test): + """Returns an event dictionary for a successful test. + + @param test a unittest.TestCase instance. + + @return the event dictionary + """ + return EventBuilder._event_dictionary_test_result( + test, EventBuilder.STATUS_SUCCESS) + + @staticmethod + def event_for_unexpected_success(test, bugnumber): + """Returns an event dictionary for a test that succeeded but was + expected to fail. + + @param test a unittest.TestCase instance. + + @param bugnumber the issue identifier for the bug tracking the + fix request for the test expected to fail (but is in fact + passing here). + + @return the event dictionary + + """ + event = EventBuilder._event_dictionary_test_result( + test, EventBuilder.STATUS_UNEXPECTED_SUCCESS) + if bugnumber: + event["bugnumber"] = str(bugnumber) + return event + + @staticmethod + def event_for_failure(test, error_tuple): + """Returns an event dictionary for a test that failed. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + return EventBuilder._event_dictionary_issue( + test, EventBuilder.STATUS_FAILURE, error_tuple) + + @staticmethod + def event_for_expected_failure(test, error_tuple, bugnumber): + """Returns an event dictionary for a test that failed as expected. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @param bugnumber the issue identifier for the bug tracking the + fix request for the test expected to fail. + + @return the event dictionary + + """ + event = EventBuilder._event_dictionary_issue( + test, EventBuilder.STATUS_EXPECTED_FAILURE, error_tuple) + if bugnumber: + event["bugnumber"] = str(bugnumber) + return event + + @staticmethod + def event_for_skip(test, reason): + """Returns an event dictionary for a test that was skipped. + + @param test a unittest.TestCase instance. + + @param reason the reason why the test is being skipped. + + @return the event dictionary + """ + event = EventBuilder._event_dictionary_test_result( + test, EventBuilder.STATUS_SKIP) + event["skip_reason"] = reason + return event + + @staticmethod + def event_for_error(test, error_tuple): + """Returns an event dictionary for a test that hit a test execution error. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + return EventBuilder._event_dictionary_issue( + test, EventBuilder.STATUS_ERROR, error_tuple) + + @staticmethod + def event_for_cleanup_error(test, error_tuple): + """Returns an event dictionary for a test that hit a test execution error + during the test cleanup phase. + + @param test a unittest.TestCase instance. + + @param error_tuple the error tuple as reported by the test runner. + This is of the form (type, error). + + @return the event dictionary + """ + event = EventBuilder._event_dictionary_issue( + test, EventBuilder.STATUS_ERROR, error_tuple) + event["issue_phase"] = "cleanup" + return event + + @staticmethod + def event_for_job_exceptional_exit( + pid, worker_index, exception_code, exception_description, + test_filename, command_line): + """Creates an event for a job (i.e. process) exit due to signal. + + @param pid the process id for the job that failed + @param worker_index optional id for the job queue running the process + @param exception_code optional code + (e.g. SIGTERM integer signal number) + @param exception_description optional string containing symbolic + representation of the issue (e.g. "SIGTERM") + @param test_filename the path to the test filename that exited + in some exceptional way. + @param command_line the Popen()-style list provided as the command line + for the process that timed out. + + @return an event dictionary coding the job completion description. + """ + event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) + event["status"] = EventBuilder.STATUS_EXCEPTIONAL_EXIT + if pid is not None: + event["pid"] = pid + if worker_index is not None: + event["worker_index"] = int(worker_index) + if exception_code is not None: + event["exception_code"] = exception_code + if exception_description is not None: + event["exception_description"] = exception_description + if test_filename is not None: + event["test_filename"] = EventBuilder._assert_is_python_sourcefile(test_filename) + if command_line is not None: + event["command_line"] = command_line + return event + + @staticmethod + def event_for_job_timeout(pid, worker_index, test_filename, command_line): + """Creates an event for a job (i.e. process) timeout. + + @param pid the process id for the job that timed out + @param worker_index optional id for the job queue running the process + @param test_filename the path to the test filename that timed out. + @param command_line the Popen-style list provided as the command line + for the process that timed out. + + @return an event dictionary coding the job completion description. + """ + event = EventBuilder.bare_event(EventBuilder.TYPE_JOB_RESULT) + event["status"] = "timeout" + if pid is not None: + event["pid"] = pid + if worker_index is not None: + event["worker_index"] = int(worker_index) + if test_filename is not None: + event["test_filename"] = EventBuilder._assert_is_python_sourcefile(test_filename) + if command_line is not None: + event["command_line"] = command_line + return event + + @staticmethod + def event_for_mark_test_rerun_eligible(test): + """Creates an event that indicates the specified test is explicitly + eligible for rerun. + + Note there is a mode that will enable test rerun eligibility at the + global level. These markings for explicit rerun eligibility are + intended for the mode of running where only explicitly re-runnable + tests are rerun upon hitting an issue. + + @param test the TestCase instance to which this pertains. + + @return an event that specifies the given test as being eligible to + be rerun. + """ + event = EventBuilder._event_dictionary_common( + test, + EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE) + return event + + @staticmethod + def event_for_mark_test_expected_failure(test): + """Creates an event that indicates the specified test is expected + to fail. + + @param test the TestCase instance to which this pertains. + + @return an event that specifies the given test is expected to fail. + """ + event = EventBuilder._event_dictionary_common( + test, + EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE) + return event + + @staticmethod + def add_entries_to_all_events(entries_dict): + """Specifies a dictionary of entries to add to all test events. + + This provides a mechanism for, say, a parallel test runner to + indicate to each inferior dotest.py that it should add a + worker index to each. + + Calling this method replaces all previous entries added + by a prior call to this. + + Event build methods will overwrite any entries that collide. + Thus, the passed in dictionary is the base, which gets merged + over by event building when keys collide. + + @param entries_dict a dictionary containing key and value + pairs that should be merged into all events created by the + event generator. May be None to clear out any extra entries. + """ + EventBuilder.BASE_DICTIONARY = dict(entries_dict) Index: packages/Python/lldbsuite/test_event/formatter/__init__.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/formatter/__init__.py @@ -0,0 +1,160 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. +""" + +from __future__ import print_function +from __future__ import absolute_import + +# System modules +import importlib +import socket +import sys + +# Third-party modules + +# LLDB modules + + +# Ignore method count on DTOs. +# pylint: disable=too-few-public-methods +class FormatterConfig(object): + """Provides formatter configuration info to create_results_formatter().""" + + def __init__(self): + self.filename = None + self.port = None + self.formatter_name = None + self.formatter_options = None + + +# Ignore method count on DTOs. +# pylint: disable=too-few-public-methods +class CreatedFormatter(object): + """Provides transfer object for returns from create_results_formatter().""" + + def __init__(self, formatter, cleanup_func): + self.formatter = formatter + self.cleanup_func = cleanup_func + + +SOCKET_ACK_BYTE_VALUE = b'*' # ASCII for chr(42) + + +def create_results_formatter(config): + """Sets up a test results formatter. + + @param config an instance of FormatterConfig + that indicates how to setup the ResultsFormatter. + + @return an instance of CreatedFormatter. + """ + + def create_socket(port): + """Creates a socket to the localhost on the given port. + + @param port the port number of the listening port on + the localhost. + + @return (socket object, socket closing function) + """ + + def socket_closer(open_sock): + """Close down an opened socket properly.""" + open_sock.shutdown(socket.SHUT_RDWR) + open_sock.close() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + + # Wait for the ack from the listener side. + # This is needed to prevent a race condition + # in the main dosep.py processing loop: we + # can't allow a worker queue thread to die + # that has outstanding messages to a listener + # socket before the listener socket asyncore + # listener socket gets spun up; otherwise, + # we lose the test result info. + read_bytes = sock.recv(1) + if read_bytes is None or (len(read_bytes) < 1) or (read_bytes[0] != SOCKET_ACK_BYTE_VALUE): + raise Exception("listening socket did not respond with ack byte: response={}".format(read_bytes)) + + return sock, lambda: socket_closer(sock) + + default_formatter_name = None + results_file_object = None + cleanup_func = None + + if config.filename: + # Open the results file for writing. + if config.filename == 'stdout': + results_file_object = sys.stdout + cleanup_func = None + elif config.filename == 'stderr': + results_file_object = sys.stderr + cleanup_func = None + else: + results_file_object = open(config.filename, "w") + cleanup_func = results_file_object.close + default_formatter_name = ( + "lldbsuite.test_event.formatter.xunit.XunitFormatter") + elif config.port: + # Connect to the specified localhost port. + results_file_object, cleanup_func = create_socket(config.port) + default_formatter_name = ( + "lldbsuite.test_event.formatter.pickled.RawPickledFormatter") + + # If we have a results formatter name specified and we didn't specify + # a results file, we should use stdout. + if config.formatter_name is not None and results_file_object is None: + # Use stdout. + results_file_object = sys.stdout + cleanup_func = None + + if results_file_object: + # We care about the formatter. Choose user-specified or, if + # none specified, use the default for the output type. + if config.formatter_name: + formatter_name = config.formatter_name + else: + formatter_name = default_formatter_name + + # Create an instance of the class. + # First figure out the package/module. + components = formatter_name.split(".") + module = importlib.import_module(".".join(components[:-1])) + + # Create the class name we need to load. + cls = getattr(module, components[-1]) + + # Handle formatter options for the results formatter class. + formatter_arg_parser = cls.arg_parser() + if config.formatter_options and len(config.formatter_options) > 0: + command_line_options = config.formatter_options + else: + command_line_options = [] + + formatter_options = formatter_arg_parser.parse_args( + command_line_options) + + # Create the TestResultsFormatter given the processed options. + results_formatter_object = cls(results_file_object, formatter_options) + + def shutdown_formatter(): + """Shuts down the formatter when it is no longer needed.""" + # Tell the formatter to write out anything it may have + # been saving until the very end (e.g. xUnit results + # can't complete its output until this point). + results_formatter_object.send_terminate_as_needed() + + # And now close out the output file-like object. + if cleanup_func is not None: + cleanup_func() + + return CreatedFormatter( + results_formatter_object, + shutdown_formatter) + else: + return None Index: packages/Python/lldbsuite/test_event/formatter/curses.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/formatter/curses.py @@ -0,0 +1,225 @@ +""" + The LLVM Compiler Infrastructure + + This file is distributed under the University of Illinois Open Source + License. See LICENSE.TXT for details. +""" + +from __future__ import absolute_import +from __future__ import print_function + +# System modules +import curses +import datetime +import math +import sys +import time + +# Third-party modules + +# LLDB modules +from lldbsuite.test import lldbcurses + +from . import results_formatter +from ..event_builder import EventBuilder + + +class Curses(results_formatter.ResultsFormatter): + """Receives live results from tests that are running and reports them to the terminal in a curses GUI""" + + def __init__(self, out_file, options): + # Initialize the parent + super(Curses, self).__init__(out_file, options) + self.using_terminal = True + self.have_curses = True + self.initialize_event = None + self.jobs = [None] * 64 + self.job_tests = [None] * 64 + self.results = list() + try: + self.main_window = lldbcurses.intialize_curses() + self.main_window.add_key_action('\t', self.main_window.select_next_first_responder, "Switch between views that can respond to keyboard input") + self.main_window.refresh() + self.job_panel = None + self.results_panel = None + self.status_panel = None + self.info_panel = None + self.hide_status_list = list() + self.start_time = time.time() + except: + self.have_curses = False + lldbcurses.terminate_curses() + self.using_terminal = False + print("Unexpected error:", sys.exc_info()[0]) + raise + + self.line_dict = dict() + # self.events_file = open("/tmp/events.txt", "w") + # self.formatters = list() + # if tee_results_formatter: + # self.formatters.append(tee_results_formatter) + + def status_to_short_str(self, status): + if status == EventBuilder.STATUS_SUCCESS: + return '.' + elif status == EventBuilder.STATUS_FAILURE: + return 'F' + elif status == EventBuilder.STATUS_UNEXPECTED_SUCCESS: + return '?' + elif status == EventBuilder.STATUS_EXPECTED_FAILURE: + return 'X' + elif status == EventBuilder.STATUS_SKIP: + return 'S' + elif status == EventBuilder.STATUS_ERROR: + return 'E' + elif status == EventBuilder.STATUS_TIMEOUT: + return 'T' + elif status == EventBuilder.STATUS_EXPECTED_TIMEOUT: + return 't' + else: + return status + + def show_info_panel(self): + selected_idx = self.results_panel.get_selected_idx() + if selected_idx >= 0 and selected_idx < len(self.results): + if self.info_panel is None: + info_frame = self.results_panel.get_contained_rect(top_inset=10, left_inset=10, right_inset=10, height=30) + self.info_panel = lldbcurses.BoxedPanel(info_frame, "Result Details") + # Add a key action for any key that will hide this panel when any key is pressed + self.info_panel.add_key_action(-1, self.hide_info_panel, 'Hide the info panel') + self.info_panel.top() + else: + self.info_panel.show() + + self.main_window.push_first_responder(self.info_panel) + test_start = self.results[selected_idx][0] + test_result = self.results[selected_idx][1] + self.info_panel.set_line(0, "File: %s" % (test_start['test_filename'])) + self.info_panel.set_line(1, "Test: %s.%s" % (test_start['test_class'], test_start['test_name'])) + self.info_panel.set_line(2, "Time: %s" % (test_result['elapsed_time'])) + self.info_panel.set_line(3, "Status: %s" % (test_result['status'])) + + def hide_info_panel(self): + self.main_window.pop_first_responder(self.info_panel) + self.info_panel.hide() + self.main_window.refresh() + + def toggle_status(self, status): + if status: + # Toggle showing and hiding results whose status matches "status" in "Results" window + if status in self.hide_status_list: + self.hide_status_list.remove(status) + else: + self.hide_status_list.append(status) + self.update_results() + + def update_results(self, update=True): + '''Called after a category of test have been show/hidden to update the results list with + what the user desires to see.''' + self.results_panel.clear(update=False) + for result in self.results: + test_result = result[1] + status = test_result['status'] + if status in self.hide_status_list: + continue + name = test_result['test_class'] + '.' + test_result['test_name'] + self.results_panel.append_line('%s (%6.2f sec) %s' % (self.status_to_short_str(status), test_result['elapsed_time'], name)) + if update: + self.main_window.refresh() + + def handle_event(self, test_event): + with self.lock: + super(Curses, self).handle_event(test_event) + # for formatter in self.formatters: + # formatter.process_event(test_event) + if self.have_curses: + worker_index = -1 + if 'worker_index' in test_event: + worker_index = test_event['worker_index'] + if 'event' in test_event: + check_for_one_key = True + #print(str(test_event), file=self.events_file) + event = test_event['event'] + if self.status_panel: + self.status_panel.update_status('time', str(datetime.timedelta(seconds=math.floor(time.time() - self.start_time)))) + if event == 'test_start': + name = test_event['test_class'] + '.' + test_event['test_name'] + self.job_tests[worker_index] = test_event + if 'pid' in test_event: + line = 'pid: %5d ' % (test_event['pid']) + name + else: + line = name + self.job_panel.set_line(worker_index, line) + self.main_window.refresh() + elif event == 'test_result': + status = test_event['status'] + self.status_panel.increment_status(status) + if 'pid' in test_event: + line = 'pid: %5d ' % (test_event['pid']) + else: + line = '' + self.job_panel.set_line(worker_index, line) + name = test_event['test_class'] + '.' + test_event['test_name'] + elapsed_time = test_event['event_time'] - self.job_tests[worker_index]['event_time'] + if not status in self.hide_status_list: + self.results_panel.append_line('%s (%6.2f sec) %s' % (self.status_to_short_str(status), elapsed_time, name)) + self.main_window.refresh() + # Append the result pairs + test_event['elapsed_time'] = elapsed_time + self.results.append([self.job_tests[worker_index], test_event]) + self.job_tests[worker_index] = '' + elif event == 'job_begin': + self.jobs[worker_index] = test_event + if 'pid' in test_event: + line = 'pid: %5d ' % (test_event['pid']) + else: + line = '' + self.job_panel.set_line(worker_index, line) + elif event == 'job_end': + self.jobs[worker_index] = '' + self.job_panel.set_line(worker_index, '') + elif event == 'initialize': + self.initialize_event = test_event + num_jobs = test_event['worker_count'] + job_frame = self.main_window.get_contained_rect(height=num_jobs+2) + results_frame = self.main_window.get_contained_rect(top_inset=num_jobs+2, bottom_inset=1) + status_frame = self.main_window.get_contained_rect(height=1, top_inset=self.main_window.get_size().h-1) + self.job_panel = lldbcurses.BoxedPanel(frame=job_frame, title="Jobs") + self.results_panel = lldbcurses.BoxedPanel(frame=results_frame, title="Results") + + self.results_panel.add_key_action(curses.KEY_UP, self.results_panel.select_prev , "Select the previous list entry") + self.results_panel.add_key_action(curses.KEY_DOWN, self.results_panel.select_next , "Select the next list entry") + self.results_panel.add_key_action(curses.KEY_HOME, self.results_panel.scroll_begin , "Scroll to the start of the list") + self.results_panel.add_key_action(curses.KEY_END, self.results_panel.scroll_end , "Scroll to the end of the list") + self.results_panel.add_key_action(curses.KEY_ENTER, self.show_info_panel , "Display info for the selected result item") + self.results_panel.add_key_action('.', lambda : self.toggle_status(EventBuilder.STATUS_SUCCESS) , "Toggle showing/hiding tests whose status is 'success'") + self.results_panel.add_key_action('e', lambda : self.toggle_status(EventBuilder.STATUS_ERROR) , "Toggle showing/hiding tests whose status is 'error'") + self.results_panel.add_key_action('f', lambda : self.toggle_status(EventBuilder.STATUS_FAILURE) , "Toggle showing/hiding tests whose status is 'failure'") + self.results_panel.add_key_action('s', lambda : self.toggle_status(EventBuilder.STATUS_SKIP) , "Toggle showing/hiding tests whose status is 'skip'") + self.results_panel.add_key_action('x', lambda : self.toggle_status(EventBuilder.STATUS_EXPECTED_FAILURE) , "Toggle showing/hiding tests whose status is 'expected_failure'") + self.results_panel.add_key_action('?', lambda : self.toggle_status(EventBuilder.STATUS_UNEXPECTED_SUCCESS), "Toggle showing/hiding tests whose status is 'unexpected_success'") + self.status_panel = lldbcurses.StatusPanel(frame=status_frame) + + self.main_window.add_child(self.job_panel) + self.main_window.add_child(self.results_panel) + self.main_window.add_child(self.status_panel) + self.main_window.set_first_responder(self.results_panel) + + self.status_panel.add_status_item(name="time", title="Elapsed", format="%s", width=20, value="0:00:00", update=False) + self.status_panel.add_status_item(name=EventBuilder.STATUS_SUCCESS, title="Success", format="%u", width=20, value=0, update=False) + self.status_panel.add_status_item(name=EventBuilder.STATUS_FAILURE, title="Failure", format="%u", width=20, value=0, update=False) + self.status_panel.add_status_item(name=EventBuilder.STATUS_ERROR, title="Error", format="%u", width=20, value=0, update=False) + self.status_panel.add_status_item(name=EventBuilder.STATUS_SKIP, title="Skipped", format="%u", width=20, value=0, update=True) + self.status_panel.add_status_item(name=EventBuilder.STATUS_EXPECTED_FAILURE, title="Expected Failure", format="%u", width=30, value=0, update=False) + self.status_panel.add_status_item(name=EventBuilder.STATUS_UNEXPECTED_SUCCESS, title="Unexpected Success", format="%u", width=30, value=0, update=False) + self.main_window.refresh() + elif event == 'terminate': + #self.main_window.key_event_loop() + lldbcurses.terminate_curses() + check_for_one_key = False + self.using_terminal = False + # Check for 1 keypress with no delay + + # Check for 1 keypress with no delay + if check_for_one_key: + self.main_window.key_event_loop(0, 1) Index: packages/Python/lldbsuite/test_event/formatter/dump_formatter.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/formatter/dump_formatter.py @@ -0,0 +1,23 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. +""" + +from __future__ import print_function +from __future__ import absolute_import + +# System modules +import pprint + +# Our modules +from .results_formatter import ResultsFormatter + + +class DumpFormatter(ResultsFormatter): + """Formats events to the file as their raw python dictionary format.""" + + def handle_event(self, test_event): + super(DumpFormatter, self).handle_event(test_event) + self.out_file.write("\n" + pprint.pformat(test_event) + "\n") Index: packages/Python/lldbsuite/test_event/formatter/pickled.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/formatter/pickled.py @@ -0,0 +1,57 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. +""" + +from __future__ import print_function +from __future__ import absolute_import + +# System modules +import os + +# Our modules +from .results_formatter import ResultsFormatter +from six.moves import cPickle + + +class RawPickledFormatter(ResultsFormatter): + """Formats events as a pickled stream. + + The parallel test runner has inferiors pickle their results and send them + over a socket back to the parallel test. The parallel test runner then + aggregates them into the final results formatter (e.g. xUnit). + """ + + @classmethod + def arg_parser(cls): + """@return arg parser used to parse formatter-specific options.""" + parser = super(RawPickledFormatter, cls).arg_parser() + return parser + + def __init__(self, out_file, options): + super(RawPickledFormatter, self).__init__(out_file, options) + self.pid = os.getpid() + + def handle_event(self, test_event): + super(RawPickledFormatter, self).handle_event(test_event) + + # Convert initialize/terminate events into job_begin/job_end events. + event_type = test_event["event"] + if event_type is None: + return + + if event_type == "initialize": + test_event["event"] = "job_begin" + elif event_type == "terminate": + test_event["event"] = "job_end" + + # Tack on the pid. + test_event["pid"] = self.pid + + # Send it as {serialized_length_of_serialized_bytes}{serialized_bytes} + import struct + msg = cPickle.dumps(test_event) + packet = struct.pack("!I%ds" % len(msg), len(msg), msg) + self.out_file.send(packet) Index: packages/Python/lldbsuite/test_event/formatter/results_formatter.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/formatter/results_formatter.py @@ -0,0 +1,716 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. + +Provides classes used by the test results reporting infrastructure +within the LLDB test suite. +""" + +from __future__ import print_function +from __future__ import absolute_import + +# System modules +import argparse +import os +import sys +import threading + +# Third-party modules + + +# LLDB modules +from lldbsuite.test import configuration +from ..event_builder import EventBuilder + +import lldbsuite + + +class ResultsFormatter(object): + """Provides interface to formatting test results out to a file-like object. + + This class allows the LLDB test framework's raw test-related + events to be processed and formatted in any manner desired. + Test events are represented by python dictionaries, formatted + as in the EventBuilder class above. + + ResultFormatter instances are given a file-like object in which + to write their results. + + ResultFormatter lifetime looks like the following: + + # The result formatter is created. + # The argparse options dictionary is generated from calling + # the SomeResultFormatter.arg_parser() with the options data + # passed to dotest.py via the "--results-formatter-options" + # argument. See the help on that for syntactic requirements + # on getting that parsed correctly. + formatter = SomeResultFormatter(file_like_object, argparse_options_dict) + + # Single call to session start, before parsing any events. + formatter.begin_session() + + formatter.handle_event({"event":"initialize",...}) + + # Zero or more calls specified for events recorded during the test session. + # The parallel test runner manages getting results from all the inferior + # dotest processes, so from a new format perspective, don't worry about + # that. The formatter will be presented with a single stream of events + # sandwiched between a single begin_session()/end_session() pair in the + # parallel test runner process/thread. + for event in zero_or_more_test_events(): + formatter.handle_event(event) + + # Single call to terminate/wrap-up. For formatters that need all the + # data before they can print a correct result (e.g. xUnit/JUnit), + # this is where the final report can be generated. + formatter.handle_event({"event":"terminate",...}) + + It is not the formatter's responsibility to close the file_like_object. + (i.e. do not close it). + + The lldb test framework passes these test events in real time, so they + arrive as they come in. + + In the case of the parallel test runner, the dotest inferiors + add a 'pid' field to the dictionary that indicates which inferior + pid generated the event. + + Note more events may be added in the future to support richer test + reporting functionality. One example: creating a true flaky test + result category so that unexpected successes really mean the test + is marked incorrectly (either should be marked flaky, or is indeed + passing consistently now and should have the xfail marker + removed). In this case, a flaky_success and flaky_fail event + likely will be added to capture these and support reporting things + like percentages of flaky test passing so we can see if we're + making some things worse/better with regards to failure rates. + + Another example: announcing all the test methods that are planned + to be run, so we can better support redo operations of various kinds + (redo all non-run tests, redo non-run tests except the one that + was running [perhaps crashed], etc.) + + Implementers are expected to override all the public methods + provided in this class. See each method's docstring to see + expectations about when the call should be chained. + + """ + @classmethod + def arg_parser(cls): + """@return arg parser used to parse formatter-specific options.""" + parser = argparse.ArgumentParser( + description='{} options'.format(cls.__name__), + usage=('dotest.py --results-formatter-options=' + '"--option1 value1 [--option2 value2 [...]]"')) + parser.add_argument( + "--dump-results", + action="store_true", + help=('dump the raw results data after printing ' + 'the summary output.')) + return parser + + def __init__(self, out_file, options): + super(ResultsFormatter, self).__init__() + self.out_file = out_file + self.options = options + self.using_terminal = False + if not self.out_file: + raise Exception("ResultsFormatter created with no file object") + self.start_time_by_test = {} + self.terminate_called = False + + # Store counts of test_result events by status. + self.result_status_counts = { + EventBuilder.STATUS_SUCCESS: 0, + EventBuilder.STATUS_EXPECTED_FAILURE: 0, + EventBuilder.STATUS_EXPECTED_TIMEOUT: 0, + EventBuilder.STATUS_SKIP: 0, + EventBuilder.STATUS_UNEXPECTED_SUCCESS: 0, + EventBuilder.STATUS_FAILURE: 0, + EventBuilder.STATUS_ERROR: 0, + EventBuilder.STATUS_TIMEOUT: 0, + EventBuilder.STATUS_EXCEPTIONAL_EXIT: 0 + } + + # Track the most recent test start event by worker index. + # We'll use this to assign TIMEOUT and exceptional + # exits to the most recent test started on a given + # worker index. + self.started_tests_by_worker = {} + + # Store the most recent test_method/job status. + self.result_events = {} + + # Track the number of test method reruns. + self.test_method_rerun_count = 0 + + # Lock that we use while mutating inner state, like the + # total test count and the elements. We minimize how + # long we hold the lock just to keep inner state safe, not + # entirely consistent from the outside. + self.lock = threading.RLock() + + # Keeps track of the test base filenames for tests that + # are expected to timeout. If a timeout occurs in any test + # basename that matches this list, that result should be + # converted into a non-issue. We'll create an expected + # timeout test status for this. + self.expected_timeouts_by_basename = set() + + # Tests which have reported that they are expecting to fail. These will + # be marked as expected failures even if they return a failing status, + # probably because they crashed or deadlocked. + self.expected_failures = set() + + # Keep track of rerun-eligible tests. + # This is a set that contains tests saved as: + # {test_filename}:{test_class}:{test_name} + self.rerun_eligible_tests = set() + + # A dictionary of test files that had a failing + # test, in the format of: + # key = test path, value = array of test methods that need rerun + self.tests_for_rerun = {} + + @classmethod + def _make_key(cls, result_event): + """Creates a key from a test or job result event. + + This key attempts to be as unique as possible. For + test result events, it will be unique per test method. + For job events (ones not promoted to a test result event), + it will be unique per test case file. + + @return a string-based key of the form + {test_filename}:{test_class}.{test_name} + """ + if result_event is None: + return None + component_count = 0 + if "test_filename" in result_event: + key = result_event["test_filename"] + component_count += 1 + else: + key = "" + if "test_class" in result_event: + if component_count > 0: + key += ":" + key += result_event["test_class"] + component_count += 1 + if "test_name" in result_event: + if component_count > 0: + key += "." + key += result_event["test_name"] + component_count += 1 + return key + + def _mark_test_as_expected_failure(self, test_result_event): + key = self._make_key(test_result_event) + if key is not None: + self.expected_failures.add(key) + else: + sys.stderr.write( + "\nerror: test marked as expected failure but " + "failed to create key.\n") + + def _mark_test_for_rerun_eligibility(self, test_result_event): + key = self._make_key(test_result_event) + if key is not None: + self.rerun_eligible_tests.add(key) + else: + sys.stderr.write( + "\nerror: test marked for re-run eligibility but " + "failed to create key.\n") + + def _maybe_add_test_to_rerun_list(self, result_event): + key = self._make_key(result_event) + if key is not None: + if (key in self.rerun_eligible_tests or + configuration.rerun_all_issues): + test_filename = result_event.get("test_filename", None) + if test_filename is not None: + test_name = result_event.get("test_name", None) + if test_filename not in self.tests_for_rerun: + self.tests_for_rerun[test_filename] = [] + if test_name is not None: + self.tests_for_rerun[test_filename].append(test_name) + else: + sys.stderr.write( + "\nerror: couldn't add testrun-failing test to rerun " + "list because no eligibility key could be created.\n") + + def _maybe_remap_job_result_event(self, test_event): + """Remaps timeout/exceptional exit job results to last test method running. + + @param test_event the job_result test event. This is an in/out + parameter. It will be modified if it can be mapped to a test_result + of the same status, using details from the last-running test method + known to be most recently started on the same worker index. + """ + test_start = None + + job_status = test_event["status"] + if job_status in [ + EventBuilder.STATUS_TIMEOUT, + EventBuilder.STATUS_EXCEPTIONAL_EXIT]: + worker_index = test_event.get("worker_index", None) + if worker_index is not None: + test_start = self.started_tests_by_worker.get( + worker_index, None) + + # If we have a test start to remap, do it here. + if test_start is not None: + test_event["event"] = EventBuilder.TYPE_TEST_RESULT + + # Fill in all fields from test start not present in + # job status message. + for (start_key, start_value) in test_start.items(): + if start_key not in test_event: + test_event[start_key] = start_value + + def _maybe_remap_expected_timeout(self, event): + if event is None: + return + + status = event.get("status", None) + if status is None or status != EventBuilder.STATUS_TIMEOUT: + return + + # Check if the timeout test's basename is in the expected timeout + # list. If so, convert to an expected timeout. + basename = os.path.basename(event.get("test_filename", "")) + if basename in self.expected_timeouts_by_basename: + # Convert to an expected timeout. + event["status"] = EventBuilder.STATUS_EXPECTED_TIMEOUT + + def _maybe_remap_expected_failure(self, event): + if event is None: + return + + key = self._make_key(event) + if key not in self.expected_failures: + return + + status = event.get("status", None) + if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES: + event["status"] = EventBuilder.STATUS_EXPECTED_FAILURE + elif status == EventBuilder.STATUS_SUCCESS: + event["status"] = EventBuilder.STATUS_UNEXPECTED_SUCCESS + + def handle_event(self, test_event): + """Handles the test event for collection into the formatter output. + + Derived classes may override this but should call down to this + implementation first. + + @param test_event the test event as formatted by one of the + event_for_* calls. + """ + with self.lock: + # Keep track of whether terminate was received. We do this so + # that a process can call the 'terminate' event on its own, to + # close down a formatter at the appropriate time. Then the + # atexit() cleanup can call the "terminate if it hasn't been + # called yet". + if test_event is not None: + event_type = test_event.get("event", "") + # We intentionally allow event_type to be checked anew + # after this check below since this check may rewrite + # the event type + if event_type == EventBuilder.TYPE_JOB_RESULT: + # Possibly convert the job status (timeout, exceptional exit) + # to an appropriate test_result event. + self._maybe_remap_job_result_event(test_event) + event_type = test_event.get("event", "") + + # Remap timeouts to expected timeouts. + if event_type in EventBuilder.RESULT_TYPES: + self._maybe_remap_expected_timeout(test_event) + self._maybe_remap_expected_failure(test_event) + event_type = test_event.get("event", "") + + if event_type == "terminate": + self.terminate_called = True + elif event_type in EventBuilder.RESULT_TYPES: + # Keep track of event counts per test/job result status type. + # The only job (i.e. inferior process) results that make it + # here are ones that cannot be remapped to the most recently + # started test for the given worker index. + status = test_event["status"] + self.result_status_counts[status] += 1 + # Clear the most recently started test for the related worker. + worker_index = test_event.get("worker_index", None) + if worker_index is not None: + self.started_tests_by_worker.pop(worker_index, None) + + if status in EventBuilder.TESTRUN_ERROR_STATUS_VALUES: + # A test/job status value in any of those status values + # causes a testrun failure. If such a test fails, check + # whether it can be rerun. If it can be rerun, add it + # to the rerun job. + self._maybe_add_test_to_rerun_list(test_event) + + # Build the test key. + test_key = self._make_key(test_event) + if test_key is None: + raise Exception( + "failed to find test filename for " + "test event {}".format(test_event)) + + # Save the most recent test event for the test key. + # This allows a second test phase to overwrite the most + # recent result for the test key (unique per method). + # We do final reporting at the end, so we'll report based + # on final results. + # We do this so that a re-run caused by, perhaps, the need + # to run a low-load, single-worker test run can have the final + # run's results to always be used. + if test_key in self.result_events: + # We are replacing the result of something that was + # already counted by the base class. Remove the double + # counting by reducing by one the count for the test + # result status. + old_status = self.result_events[test_key]["status"] + self.result_status_counts[old_status] -= 1 + self.test_method_rerun_count += 1 + self.result_events[test_key] = test_event + elif event_type == EventBuilder.TYPE_TEST_START: + # Track the start time for the test method. + self.track_start_time( + test_event["test_class"], + test_event["test_name"], + test_event["event_time"]) + # Track of the most recent test method start event + # for the related worker. This allows us to figure + # out whether a process timeout or exceptional exit + # can be charged (i.e. assigned) to a test method. + worker_index = test_event.get("worker_index", None) + if worker_index is not None: + self.started_tests_by_worker[worker_index] = test_event + + elif event_type == EventBuilder.TYPE_MARK_TEST_RERUN_ELIGIBLE: + self._mark_test_for_rerun_eligibility(test_event) + elif event_type == EventBuilder.TYPE_MARK_TEST_EXPECTED_FAILURE: + self._mark_test_as_expected_failure(test_event) + + def set_expected_timeouts_by_basename(self, basenames): + """Specifies a list of test file basenames that are allowed to timeout + without being called out as a timeout issue. + + These fall into a new status category called STATUS_EXPECTED_TIMEOUT. + """ + if basenames is not None: + for basename in basenames: + self.expected_timeouts_by_basename.add(basename) + + def track_start_time(self, test_class, test_name, start_time): + """tracks the start time of a test so elapsed time can be computed. + + this alleviates the need for test results to be processed serially + by test. it will save the start time for the test so that + elapsed_time_for_test() can compute the elapsed time properly. + """ + if test_class is None or test_name is None: + return + + test_key = "{}.{}".format(test_class, test_name) + self.start_time_by_test[test_key] = start_time + + def elapsed_time_for_test(self, test_class, test_name, end_time): + """returns the elapsed time for a test. + + this function can only be called once per test and requires that + the track_start_time() method be called sometime prior to calling + this method. + """ + if test_class is None or test_name is None: + return -2.0 + + test_key = "{}.{}".format(test_class, test_name) + if test_key not in self.start_time_by_test: + return -1.0 + else: + start_time = self.start_time_by_test[test_key] + del self.start_time_by_test[test_key] + return end_time - start_time + + def is_using_terminal(self): + """returns true if this results formatter is using the terminal and + output should be avoided.""" + return self.using_terminal + + def send_terminate_as_needed(self): + """sends the terminate event if it hasn't been received yet.""" + if not self.terminate_called: + terminate_event = EventBuilder.bare_event("terminate") + self.handle_event(terminate_event) + + # Derived classes may require self access + # pylint: disable=no-self-use + # noinspection PyMethodMayBeStatic,PyMethodMayBeStatic + def replaces_summary(self): + """Returns whether the results formatter includes a summary + suitable to replace the old lldb test run results. + + @return True if the lldb test runner can skip its summary + generation when using this results formatter; False otherwise. + """ + return False + + def counts_by_test_result_status(self, status): + """Returns number of test method results for the given status. + + @status_result a test result status (e.g. success, fail, skip) + as defined by the EventBuilder.STATUS_* class members. + + @return an integer returning the number of test methods matching + the given test result status. + """ + return self.result_status_counts[status] + + @classmethod + def _event_sort_key(cls, event): + """Returns the sort key to be used for a test event. + + This method papers over the differences in a test method result vs. a + job (i.e. inferior process) result. + + @param event a test result or job result event. + @return a key useful for sorting events by name (test name preferably, + then by test filename). + """ + if "test_name" in event: + return event["test_name"] + else: + return event.get("test_filename", None) + + def _partition_results_by_status(self, categories): + """Partitions the captured test results by event status. + + This permits processing test results by the category ids. + + @param categories the list of categories on which to partition. + Follows the format described in _report_category_details(). + + @return a dictionary where each key is the test result status, + and each entry is a list containing all the test result events + that matched that test result status. Result status IDs with + no matching entries will have a zero-length list. + """ + partitioned_events = {} + for category in categories: + result_status_id = category[0] + matching_events = [ + [key, event] for (key, event) in self.result_events.items() + if event.get("status", "") == result_status_id] + partitioned_events[result_status_id] = sorted( + matching_events, + key=lambda x: self._event_sort_key(x[1])) + return partitioned_events + + @staticmethod + def _print_banner(out_file, banner_text): + """Prints an ASCII banner around given text. + + Output goes to the out file for the results formatter. + + @param out_file a file-like object where output will be written. + @param banner_text the text to display, with a banner + of '=' around the line above and line below. + """ + banner_separator = "".ljust(len(banner_text), "=") + + out_file.write("\n{}\n{}\n{}\n".format( + banner_separator, + banner_text, + banner_separator)) + + def _print_summary_counts( + self, out_file, categories, result_events_by_status, extra_rows): + """Prints summary counts for all categories. + + @param out_file a file-like object used to print output. + + @param categories the list of categories on which to partition. + Follows the format described in _report_category_details(). + + @param result_events_by_status the partitioned list of test + result events in a dictionary, with the key set to the test + result status id and the value set to the list of test method + results that match the status id. + """ + + # Get max length for category printed name + category_with_max_printed_name = max( + categories, key=lambda x: len(x[1])) + max_category_name_length = len(category_with_max_printed_name[1]) + + # If we are provided with extra rows, consider these row name lengths. + if extra_rows is not None: + for row in extra_rows: + name_length = len(row[0]) + if name_length > max_category_name_length: + max_category_name_length = name_length + + self._print_banner(out_file, "Test Result Summary") + + # Prepend extra rows + if extra_rows is not None: + for row in extra_rows: + extra_label = "{}:".format(row[0]).ljust( + max_category_name_length + 1) + out_file.write("{} {:4}\n".format(extra_label, row[1])) + + for category in categories: + result_status_id = category[0] + result_label = "{}:".format(category[1]).ljust( + max_category_name_length + 1) + count = len(result_events_by_status[result_status_id]) + out_file.write("{} {:4}\n".format( + result_label, + count)) + + @classmethod + def _has_printable_details(cls, categories, result_events_by_status): + """Returns whether there are any test result details that need to be printed. + + This will spin through the results and see if any result in a category + that is printable has any results to print. + + @param categories the list of categories on which to partition. + Follows the format described in _report_category_details(). + + @param result_events_by_status the partitioned list of test + result events in a dictionary, with the key set to the test + result status id and the value set to the list of test method + results that match the status id. + + @return True if there are any details (i.e. test results + for failures, errors, unexpected successes); False otherwise. + """ + for category in categories: + result_status_id = category[0] + print_matching_tests = category[2] + if print_matching_tests: + if len(result_events_by_status[result_status_id]) > 0: + # We found a printable details test result status + # that has details to print. + return True + # We didn't find any test result category with printable + # details. + return False + + @staticmethod + def _report_category_details(out_file, category, result_events_by_status): + """Reports all test results matching the given category spec. + + @param out_file a file-like object used to print output. + + @param category a category spec of the format [test_event_name, + printed_category_name, print_matching_entries?] + + @param result_events_by_status the partitioned list of test + result events in a dictionary, with the key set to the test + result status id and the value set to the list of test method + results that match the status id. + """ + result_status_id = category[0] + print_matching_tests = category[2] + detail_label = category[3] + + if print_matching_tests: + # Sort by test name + for (_, event) in result_events_by_status[result_status_id]: + # Convert full test path into test-root-relative. + test_relative_path = os.path.relpath( + os.path.realpath(event["test_filename"]), + lldbsuite.lldb_test_root) + + # Create extra info component (used for exceptional exit info) + if result_status_id == EventBuilder.STATUS_EXCEPTIONAL_EXIT: + extra_info = "[EXCEPTIONAL EXIT {} ({})] ".format( + event["exception_code"], + event["exception_description"]) + else: + extra_info = "" + + # Figure out the identity we will use for this test. + if configuration.verbose and ("test_class" in event): + test_id = "{}.{}".format( + event["test_class"], event["test_name"]) + elif "test_name" in event: + test_id = event["test_name"] + else: + test_id = "" + + # Display the info. + out_file.write("{}: {}{} ({})\n".format( + detail_label, + extra_info, + test_id, + test_relative_path)) + + def print_results(self, out_file): + """Writes the test result report to the output file. + + @param out_file a file-like object used for printing summary + results. This is different than self.out_file, which might + be something else for non-summary data. + """ + extra_results = [ + # Total test methods processed, excluding reruns. + ["Test Methods", len(self.result_events)], + ["Reruns", self.test_method_rerun_count]] + + # Output each of the test result entries. + categories = [ + # result id, printed name, print matching tests?, detail label + [EventBuilder.STATUS_SUCCESS, + "Success", False, None], + [EventBuilder.STATUS_EXPECTED_FAILURE, + "Expected Failure", False, None], + [EventBuilder.STATUS_FAILURE, + "Failure", True, "FAIL"], + [EventBuilder.STATUS_ERROR, + "Error", True, "ERROR"], + [EventBuilder.STATUS_EXCEPTIONAL_EXIT, + "Exceptional Exit", True, "ERROR"], + [EventBuilder.STATUS_UNEXPECTED_SUCCESS, + "Unexpected Success", True, "UNEXPECTED SUCCESS"], + [EventBuilder.STATUS_SKIP, "Skip", False, None], + [EventBuilder.STATUS_TIMEOUT, + "Timeout", True, "TIMEOUT"], + [EventBuilder.STATUS_EXPECTED_TIMEOUT, + # Intentionally using the unusual hyphenation in TIME-OUT to + # prevent buildbots from thinking it is an issue when scanning + # for TIMEOUT. + "Expected Timeout", True, "EXPECTED TIME-OUT"] + ] + + # Partition all the events by test result status + result_events_by_status = self._partition_results_by_status( + categories) + + # Print the details + have_details = self._has_printable_details( + categories, result_events_by_status) + if have_details: + self._print_banner(out_file, "Issue Details") + for category in categories: + self._report_category_details( + out_file, category, result_events_by_status) + + # Print the summary + self._print_summary_counts( + out_file, categories, result_events_by_status, extra_results) + + if self.options.dump_results: + # Debug dump of the key/result info for all categories. + self._print_banner(out_file, "Results Dump") + for status, events_by_key in result_events_by_status.items(): + out_file.write("\nSTATUS: {}\n".format(status)) + for key, event in events_by_key: + out_file.write("key: {}\n".format(key)) + out_file.write("event: {}\n".format(event)) Index: packages/Python/lldbsuite/test_event/formatter/xunit.py =================================================================== --- /dev/null +++ packages/Python/lldbsuite/test_event/formatter/xunit.py @@ -0,0 +1,564 @@ +""" + The LLVM Compiler Infrastructure + +This file is distributed under the University of Illinois Open Source +License. See LICENSE.TXT for details. + +Provides an xUnit ResultsFormatter for integrating the LLDB +test suite with the Jenkins xUnit aggregator and other xUnit-compliant +test output processors. +""" +from __future__ import absolute_import +from __future__ import print_function + +# System modules +import re +import sys +import xml.sax.saxutils + +# Third-party modules +import six + +# Local modules +from ..event_builder import EventBuilder +from .results_formatter import ResultsFormatter + + +class XunitFormatter(ResultsFormatter): + """Provides xUnit-style formatted output. + """ + + # Result mapping arguments + RM_IGNORE = 'ignore' + RM_SUCCESS = 'success' + RM_FAILURE = 'failure' + RM_PASSTHRU = 'passthru' + + @staticmethod + def _build_illegal_xml_regex(): + """Constructs a regex to match all illegal xml characters. + + Expects to be used against a unicode string.""" + # Construct the range pairs of invalid unicode characters. + illegal_chars_u = [ + (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84), + (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)] + + # For wide builds, we have more. + if sys.maxunicode >= 0x10000: + illegal_chars_u.extend( + [(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF), + (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF), + (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF), + (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF), + (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF), + (0x10FFFE, 0x10FFFF)]) + + # Build up an array of range expressions. + illegal_ranges = [ + "%s-%s" % (six.unichr(low), six.unichr(high)) + for (low, high) in illegal_chars_u] + + # Compile the regex + return re.compile(six.u('[%s]') % six.u('').join(illegal_ranges)) + + @staticmethod + def _quote_attribute(text): + """Returns the given text in a manner safe for usage in an XML attribute. + + @param text the text that should appear within an XML attribute. + @return the attribute-escaped version of the input text. + """ + return xml.sax.saxutils.quoteattr(text) + + def _replace_invalid_xml(self, str_or_unicode): + """Replaces invalid XML characters with a '?'. + + @param str_or_unicode a string to replace invalid XML + characters within. Can be unicode or not. If not unicode, + assumes it is a byte string in utf-8 encoding. + + @returns a utf-8-encoded byte string with invalid + XML replaced with '?'. + """ + # Get the content into unicode + if isinstance(str_or_unicode, str): + unicode_content = str_or_unicode.decode('utf-8') + else: + unicode_content = str_or_unicode + return self.invalid_xml_re.sub( + six.u('?'), unicode_content).encode('utf-8') + + @classmethod + def arg_parser(cls): + """@return arg parser used to parse formatter-specific options.""" + parser = super(XunitFormatter, cls).arg_parser() + + # These are valid choices for results mapping. + results_mapping_choices = [ + XunitFormatter.RM_IGNORE, + XunitFormatter.RM_SUCCESS, + XunitFormatter.RM_FAILURE, + XunitFormatter.RM_PASSTHRU] + parser.add_argument( + "--assert-on-unknown-events", + action="store_true", + help=('cause unknown test events to generate ' + 'a python assert. Default is to ignore.')) + parser.add_argument( + "--ignore-skip-name", + "-n", + metavar='PATTERN', + action="append", + dest='ignore_skip_name_patterns', + help=('a python regex pattern, where ' + 'any skipped test with a test method name where regex ' + 'matches (via search) will be ignored for xUnit test ' + 'result purposes. Can be specified multiple times.')) + parser.add_argument( + "--ignore-skip-reason", + "-r", + metavar='PATTERN', + action="append", + dest='ignore_skip_reason_patterns', + help=('a python regex pattern, where ' + 'any skipped test with a skip reason where the regex ' + 'matches (via search) will be ignored for xUnit test ' + 'result purposes. Can be specified multiple times.')) + parser.add_argument( + "--xpass", action="store", choices=results_mapping_choices, + default=XunitFormatter.RM_FAILURE, + help=('specify mapping from unexpected success to jUnit/xUnit ' + 'result type')) + parser.add_argument( + "--xfail", action="store", choices=results_mapping_choices, + default=XunitFormatter.RM_IGNORE, + help=('specify mapping from expected failure to jUnit/xUnit ' + 'result type')) + return parser + + @staticmethod + def _build_regex_list_from_patterns(patterns): + """Builds a list of compiled regular expressions from option value. + + @param patterns contains a list of regular expression + patterns. + + @return list of compiled regular expressions, empty if no + patterns provided. + """ + regex_list = [] + if patterns is not None: + for pattern in patterns: + regex_list.append(re.compile(pattern)) + return regex_list + + def __init__(self, out_file, options): + """Initializes the XunitFormatter instance. + @param out_file file-like object where formatted output is written. + @param options specifies a dictionary of options for the + formatter. + """ + # Initialize the parent + super(XunitFormatter, self).__init__(out_file, options) + self.text_encoding = "UTF-8" + self.invalid_xml_re = XunitFormatter._build_illegal_xml_regex() + self.total_test_count = 0 + self.ignore_skip_name_regexes = ( + XunitFormatter._build_regex_list_from_patterns( + options.ignore_skip_name_patterns)) + self.ignore_skip_reason_regexes = ( + XunitFormatter._build_regex_list_from_patterns( + options.ignore_skip_reason_patterns)) + + self.elements = { + "successes": [], + "errors": [], + "failures": [], + "skips": [], + "unexpected_successes": [], + "expected_failures": [], + "all": [] + } + + self.status_handlers = { + EventBuilder.STATUS_SUCCESS: self._handle_success, + EventBuilder.STATUS_FAILURE: self._handle_failure, + EventBuilder.STATUS_ERROR: self._handle_error, + EventBuilder.STATUS_SKIP: self._handle_skip, + EventBuilder.STATUS_EXPECTED_FAILURE: + self._handle_expected_failure, + EventBuilder.STATUS_EXPECTED_TIMEOUT: + self._handle_expected_timeout, + EventBuilder.STATUS_UNEXPECTED_SUCCESS: + self._handle_unexpected_success, + EventBuilder.STATUS_EXCEPTIONAL_EXIT: + self._handle_exceptional_exit, + EventBuilder.STATUS_TIMEOUT: + self._handle_timeout + } + + RESULT_TYPES = {EventBuilder.TYPE_TEST_RESULT, EventBuilder.TYPE_JOB_RESULT} + + def handle_event(self, test_event): + super(XunitFormatter, self).handle_event(test_event) + + event_type = test_event["event"] + if event_type is None: + return + + if event_type == "terminate": + # Process all the final result events into their + # XML counterparts. + for result_event in self.result_events.values(): + self._process_test_result(result_event) + self._finish_output() + else: + # This is an unknown event. + if self.options.assert_on_unknown_events: + raise Exception("unknown event type {} from {}\n".format( + event_type, test_event)) + + def _handle_success(self, test_event): + """Handles a test success. + @param test_event the test event to handle. + """ + result = self._common_add_testcase_entry(test_event) + with self.lock: + self.elements["successes"].append(result) + + def _handle_failure(self, test_event): + """Handles a test failure. + @param test_event the test event to handle. + """ + message = self._replace_invalid_xml(test_event["issue_message"]) + backtrace = self._replace_invalid_xml( + "".join(test_event.get("issue_backtrace", []))) + + result = self._common_add_testcase_entry( + test_event, + inner_content=( + ''.format( + XunitFormatter._quote_attribute(test_event["issue_class"]), + XunitFormatter._quote_attribute(message), + backtrace) + )) + with self.lock: + self.elements["failures"].append(result) + + def _handle_error(self, test_event): + """Handles a test error. + @param test_event the test event to handle. + """ + message = self._replace_invalid_xml(test_event["issue_message"]) + backtrace = self._replace_invalid_xml( + "".join(test_event.get("issue_backtrace", []))) + + result = self._common_add_testcase_entry( + test_event, + inner_content=( + ''.format( + XunitFormatter._quote_attribute(test_event["issue_class"]), + XunitFormatter._quote_attribute(message), + backtrace) + )) + with self.lock: + self.elements["errors"].append(result) + + def _handle_exceptional_exit(self, test_event): + """Handles an exceptional exit. + @param test_event the test method or job result event to handle. + """ + if "test_name" in test_event: + name = test_event["test_name"] + else: + name = test_event.get("test_filename", "") + + message_text = "ERROR: {} ({}): {}".format( + test_event.get("exception_code", 0), + test_event.get("exception_description", ""), + name) + message = self._replace_invalid_xml(message_text) + + result = self._common_add_testcase_entry( + test_event, + inner_content=( + ''.format( + "exceptional_exit", + XunitFormatter._quote_attribute(message)) + )) + with self.lock: + self.elements["errors"].append(result) + + def _handle_timeout(self, test_event): + """Handles a test method or job timeout. + @param test_event the test method or job result event to handle. + """ + if "test_name" in test_event: + name = test_event["test_name"] + else: + name = test_event.get("test_filename", "") + + message_text = "TIMEOUT: {}".format(name) + message = self._replace_invalid_xml(message_text) + + result = self._common_add_testcase_entry( + test_event, + inner_content=( + ''.format( + "timeout", + XunitFormatter._quote_attribute(message)) + )) + with self.lock: + self.elements["errors"].append(result) + + @staticmethod + def _ignore_based_on_regex_list(test_event, test_key, regex_list): + """Returns whether to ignore a test event based on patterns. + + @param test_event the test event dictionary to check. + @param test_key the key within the dictionary to check. + @param regex_list a list of zero or more regexes. May contain + zero or more compiled regexes. + + @return True if any o the regex list match based on the + re.search() method; false otherwise. + """ + for regex in regex_list: + match = regex.search(test_event.get(test_key, '')) + if match: + return True + return False + + def _handle_skip(self, test_event): + """Handles a skipped test. + @param test_event the test event to handle. + """ + + # Are we ignoring this test based on test name? + if XunitFormatter._ignore_based_on_regex_list( + test_event, 'test_name', self.ignore_skip_name_regexes): + return + + # Are we ignoring this test based on skip reason? + if XunitFormatter._ignore_based_on_regex_list( + test_event, 'skip_reason', self.ignore_skip_reason_regexes): + return + + # We're not ignoring this test. Process the skip. + reason = self._replace_invalid_xml(test_event.get("skip_reason", "")) + result = self._common_add_testcase_entry( + test_event, + inner_content=''.format( + XunitFormatter._quote_attribute(reason))) + with self.lock: + self.elements["skips"].append(result) + + def _handle_expected_failure(self, test_event): + """Handles a test that failed as expected. + @param test_event the test event to handle. + """ + if self.options.xfail == XunitFormatter.RM_PASSTHRU: + # This is not a natively-supported junit/xunit + # testcase mode, so it might fail a validating + # test results viewer. + if "bugnumber" in test_event: + bug_id_attribute = 'bug-id={} '.format( + XunitFormatter._quote_attribute(test_event["bugnumber"])) + else: + bug_id_attribute = '' + + result = self._common_add_testcase_entry( + test_event, + inner_content=( + ''.format( + bug_id_attribute, + XunitFormatter._quote_attribute( + test_event["issue_class"]), + XunitFormatter._quote_attribute( + test_event["issue_message"])) + )) + with self.lock: + self.elements["expected_failures"].append(result) + elif self.options.xfail == XunitFormatter.RM_SUCCESS: + result = self._common_add_testcase_entry(test_event) + with self.lock: + self.elements["successes"].append(result) + elif self.options.xfail == XunitFormatter.RM_FAILURE: + result = self._common_add_testcase_entry( + test_event, + inner_content=''.format( + XunitFormatter._quote_attribute(test_event["issue_class"]), + XunitFormatter._quote_attribute( + test_event["issue_message"]))) + with self.lock: + self.elements["failures"].append(result) + elif self.options.xfail == XunitFormatter.RM_IGNORE: + pass + else: + raise Exception( + "unknown xfail option: {}".format(self.options.xfail)) + + @staticmethod + def _handle_expected_timeout(test_event): + """Handles expected_timeout. + @param test_event the test event to handle. + """ + # We don't do anything with expected timeouts, not even report. + pass + + def _handle_unexpected_success(self, test_event): + """Handles a test that passed but was expected to fail. + @param test_event the test event to handle. + """ + if self.options.xpass == XunitFormatter.RM_PASSTHRU: + # This is not a natively-supported junit/xunit + # testcase mode, so it might fail a validating + # test results viewer. + result = self._common_add_testcase_entry( + test_event, + inner_content="") + with self.lock: + self.elements["unexpected_successes"].append(result) + elif self.options.xpass == XunitFormatter.RM_SUCCESS: + # Treat the xpass as a success. + result = self._common_add_testcase_entry(test_event) + with self.lock: + self.elements["successes"].append(result) + elif self.options.xpass == XunitFormatter.RM_FAILURE: + # Treat the xpass as a failure. + if "bugnumber" in test_event: + message = "unexpected success (bug_id:{})".format( + test_event["bugnumber"]) + else: + message = "unexpected success (bug_id:none)" + result = self._common_add_testcase_entry( + test_event, + inner_content=''.format( + XunitFormatter._quote_attribute("unexpected_success"), + XunitFormatter._quote_attribute(message))) + with self.lock: + self.elements["failures"].append(result) + elif self.options.xpass == XunitFormatter.RM_IGNORE: + # Ignore the xpass result as far as xUnit reporting goes. + pass + else: + raise Exception("unknown xpass option: {}".format( + self.options.xpass)) + + def _process_test_result(self, test_event): + """Processes the test_event known to be a test result. + + This categorizes the event appropriately and stores the data needed + to generate the final xUnit report. This method skips events that + cannot be represented in xUnit output. + """ + if "status" not in test_event: + raise Exception("test event dictionary missing 'status' key") + + status = test_event["status"] + if status not in self.status_handlers: + raise Exception("test event status '{}' unsupported".format( + status)) + + # Call the status handler for the test result. + self.status_handlers[status](test_event) + + def _common_add_testcase_entry(self, test_event, inner_content=None): + """Registers a testcase result, and returns the text created. + + The caller is expected to manage failure/skip/success counts + in some kind of appropriate way. This call simply constructs + the XML and appends the returned result to the self.all_results + list. + + @param test_event the test event dictionary. + + @param inner_content if specified, gets included in the + inner section, at the point before stdout and stderr would be + included. This is where a , , , etc. + could go. + + @return the text of the xml testcase element. + """ + + # Get elapsed time. + test_class = test_event.get("test_class", "") + test_name = test_event.get("test_name", "") + event_time = test_event["event_time"] + time_taken = self.elapsed_time_for_test( + test_class, test_name, event_time) + + # Plumb in stdout/stderr once we shift over to only test results. + test_stdout = '' + test_stderr = '' + + # Formulate the output xml. + if not inner_content: + inner_content = "" + result = ( + '' + '{}{}{}'.format( + test_class, + test_name, + time_taken, + inner_content, + test_stdout, + test_stderr)) + + # Save the result, update total test count. + with self.lock: + self.total_test_count += 1 + self.elements["all"].append(result) + + return result + + def _finish_output_no_lock(self): + """Flushes out the report of test executions to form valid xml output. + + xUnit output is in XML. The reporting system cannot complete the + formatting of the output without knowing when there is no more input. + This call addresses notification of the completed test run and thus is + when we can finish off the report output. + """ + + # Figure out the counts line for the testsuite. If we have + # been counting either unexpected successes or expected + # failures, we'll output those in the counts, at the risk of + # being invalidated by a validating test results viewer. + # These aren't counted by default so they won't show up unless + # the user specified a formatter option to include them. + xfail_count = len(self.elements["expected_failures"]) + xpass_count = len(self.elements["unexpected_successes"]) + if xfail_count > 0 or xpass_count > 0: + extra_testsuite_attributes = ( + ' expected-failures="{}"' + ' unexpected-successes="{}"'.format(xfail_count, xpass_count)) + else: + extra_testsuite_attributes = "" + + # Output the header. + self.out_file.write( + '\n' + '' + '\n'.format( + self.text_encoding, + "LLDB test suite", + self.total_test_count, + len(self.elements["errors"]), + len(self.elements["failures"]), + len(self.elements["skips"]), + extra_testsuite_attributes)) + + # Output each of the test result entries. + for result in self.elements["all"]: + self.out_file.write(result + '\n') + + # Close off the test suite. + self.out_file.write('\n') + + def _finish_output(self): + """Finish writing output as all incoming events have arrived.""" + with self.lock: + self._finish_output_no_lock()