diff --git a/lnt/server/db/testsuitedb.py b/lnt/server/db/testsuitedb.py index 51fbdda..0d6a64a 100644 --- a/lnt/server/db/testsuitedb.py +++ b/lnt/server/db/testsuitedb.py @@ -1,1212 +1,1248 @@ """ Database models for the TestSuite databases themselves. These are a bit magical because the models themselves are driven by the test suite metadata, so we only create the classes at runtime. """ from __future__ import absolute_import import datetime import json import os import itertools import aniso8601 import sqlalchemy import flask from sqlalchemy import Float, String, Integer, Column, ForeignKey, Binary, DateTime from sqlalchemy.orm import relation from sqlalchemy.orm.exc import ObjectDeletedError from lnt.util import logger from . import testsuite import lnt.testing.profile.profile as profile import lnt from lnt.server.ui.util import convert_revision def _dict_update_abort_on_duplicates(base_dict, to_merge): '''This behaves like base_dict.update(to_merge) but asserts that none of the keys in to_merge is present in base_dict yet.''' for key, value in to_merge.items(): assert base_dict.get(key, None) is None base_dict[key] = value class MachineInfoChanged(ValueError): pass class TestSuiteDB(object): """ Wrapper object for an individual test suites database tables. This wrapper is somewhat special in that it handles specializing the metatable instances for the given test suite. Clients are expected to only access the test suite database tables by going through the model classes constructed by this wrapper object. """ def __init__(self, v4db, name, test_suite): self.v4db = v4db self.name = name self.test_suite = test_suite # Save caches of the various fields. self.machine_fields = list(self.test_suite.machine_fields) self.order_fields = list(self.test_suite.order_fields) self.run_fields = list(self.test_suite.run_fields) self.sample_fields = list(sorted(self.test_suite.sample_fields, key=lambda s: s.schema_index)) self.machine_to_latest_order_cache = {} sample_field_indexes = dict() for i, field in enumerate(self.sample_fields): sample_field_indexes[field.name] = i self.sample_field_indexes = sample_field_indexes self.base = sqlalchemy.ext.declarative.declarative_base() # Create parameterized model classes for this test suite. class ParameterizedMixin(object): # Class variable to allow finding the associated test suite from # model instances. testsuite = self # Class variable (expected to be defined by subclasses) to allow # easy access to the field list for parameterized model classes. fields = None def get_field(self, field): return getattr(self, field.name) def set_field(self, field, value): return setattr(self, field.name, value) def get_fields(self): result = dict() for field in self.fields: value = self.get_field(field) if value is None: continue result[field.name] = value return result def set_fields_pop(self, data_dict): for field in self.fields: value = data_dict.pop(field.name, None) self.set_field(field, value) db_key_name = self.test_suite.db_key_name class Machine(self.base, ParameterizedMixin): __tablename__ = db_key_name + '_Machine' __table_args__ = {'mysql_collate': 'utf8_bin'} DEFAULT_BASELINE_REVISION = v4db.baseline_revision fields = self.machine_fields id = Column("ID", Integer, primary_key=True) name = Column("Name", String(256), index=True) # The parameters blob is used to store any additional information # reported by the run but not promoted into the machine record. # Such data is stored as a JSON encoded blob. 
parameters_data = Column("Parameters", Binary) # Dynamically create fields for all of the test suite defined # machine fields. class_dict = locals() for item in fields: iname = item.name if iname in class_dict: raise ValueError("test suite defines reserved key %r" % ( iname)) item.column = testsuite.make_machine_column(iname) class_dict[iname] = item.column def __init__(self, name_value): self.id = None self.name = name_value def __repr__(self): return '%s_%s%r' % (db_key_name, self.__class__.__name__, (self.id, self.name)) @property def parameters(self): """dictionary access to the BLOB encoded parameters data""" return dict(json.loads(self.parameters_data)) @parameters.setter def parameters(self, data): self.parameters_data = json.dumps(sorted(data.items())).encode("utf-8") def get_baseline_run(self, session): ts = Machine.testsuite user_baseline = ts.get_users_baseline(session) if user_baseline: return self.get_closest_previously_reported_run( session, user_baseline.order) else: mach_base = Machine.DEFAULT_BASELINE_REVISION # If we have an int, convert it to a proper string. if isinstance(mach_base, int): mach_base = '% 7d' % mach_base return self.get_closest_previously_reported_run( session, ts.Order(llvm_project_revision=mach_base)) def get_closest_previously_reported_run(self, session, order_to_find): """ Find the closest previous run to the requested order, for which this machine also reported. """ # FIXME: Scalability! Pretty fast in practice, but still. ts = Machine.testsuite # Search for best order. best_order = None for order in session.query(ts.Order).\ join(ts.Run).\ filter(ts.Run.machine_id == self.id).distinct(): if order >= order_to_find and \ (best_order is None or order < best_order): best_order = order # Find the most recent run on this machine that used # that order. closest_run = None if best_order: closest_run = session.query(ts.Run)\ .filter(ts.Run.machine_id == self.id)\ .filter(ts.Run.order_id == best_order.id)\ .order_by(ts.Run.start_time.desc()).first() return closest_run def set_from_dict(self, data): data_name = data.pop('name', None) # This function is not meant for renaming. Abort on mismatch. if data_name is not None and data_name != self.name: raise ValueError("Mismatching machine name") data.pop('id', None) self.set_fields_pop(data) self.parameters = data def __json__(self): result = dict() result['name'] = self.name result['id'] = self.id _dict_update_abort_on_duplicates(result, self.get_fields()) _dict_update_abort_on_duplicates(result, self.parameters) return result class Order(self.base, ParameterizedMixin): __tablename__ = db_key_name + '_Order' # We guarantee that our fields are stored in the order they are # supposed to be lexicographically compared, the rich comparison # methods rely on this. fields = sorted(self.order_fields, key=lambda of: of.ordinal) id = Column("ID", Integer, primary_key=True) # Define two common columns which are used to store the previous # and next links for the total ordering amongst run orders. next_order_id = Column("NextOrder", Integer, ForeignKey(id)) previous_order_id = Column("PreviousOrder", Integer, ForeignKey(id)) # This will implicitly create the previous_order relation. backref = sqlalchemy.orm.backref('previous_order', uselist=False, remote_side=id) join = 'Order.previous_order_id==Order.id' next_order = relation("Order", backref=backref, primaryjoin=join, uselist=False) order_name_cache = {} # Dynamically create fields for all of the test suite defined order # fields. 
class_dict = locals() for item in self.order_fields: if item.name in class_dict: raise ValueError("test suite defines reserved key %r" % ( item.name,)) class_dict[item.name] = item.column = Column( item.name, String(256)) def __init__(self, previous_order_id=None, next_order_id=None, **kwargs): self.previous_order_id = previous_order_id self.next_order_id = next_order_id # Initialize fields (defaulting to None, for now). for item in self.fields: self.set_field(item, kwargs.get(item.name)) def __repr__(self): fields = dict((item.name, self.get_field(item)) for item in self.fields) return '%s_%s(%r, %r, **%r)' % ( db_key_name, self.__class__.__name__, self.previous_order_id, self.next_order_id, fields) def as_ordered_string(self): """Return a readable value of the order object by printing the fields in lexicographic order.""" # If there is only a single field, return it. if len(self.fields) == 1: return self.get_field(self.fields[0]) # Otherwise, print as a tuple of string. return '(%s)' % ( ', '.join(self.get_field(field) for field in self.fields),) @property def name(self): return self.as_ordered_string() def _get_comparison_discriminant(self, b): """Return a representative pair of converted revision from self and b. Order of the element on this pair is the same as the order of self relative to b. """ # SA occasionally uses comparison to check model instances # versus some sentinels, so we ensure we support comparison # against non-instances. if self.__class__ is not b.__class__: return (0, 1) # Pair converted revision from self and b. converted_revisions = map( lambda item: ( convert_revision( self.get_field(item), cache=Order.order_name_cache ), convert_revision( b.get_field(item), cache=Order.order_name_cache ), ), self.fields, ) # Return the first unequal pair, or (0, 0) otherwise. return next( itertools.dropwhile(lambda x: x[0] == x[1], converted_revisions), (0, 0), ) def __hash__(self): converted_fields = map( lambda item: convert_revision( self.get_field(item), cache=Order.order_name_cache ), self.fields, ) return hash(tuple(converted_fields)) def __eq__(self, b): discriminant = self._get_comparison_discriminant(b) return discriminant[0] == discriminant[1] def __ne__(self, b): discriminant = self._get_comparison_discriminant(b) return discriminant[0] != discriminant[1] def __lt__(self, b): discriminant = self._get_comparison_discriminant(b) return discriminant[0] < discriminant[1] def __le__(self, b): discriminant = self._get_comparison_discriminant(b) return discriminant[0] <= discriminant[1] def __gt__(self, b): discriminant = self._get_comparison_discriminant(b) return discriminant[0] > discriminant[1] def __ge__(self, b): discriminant = self._get_comparison_discriminant(b) return discriminant[0] >= discriminant[1] def __json__(self, include_id=True): result = {} if include_id: result['id'] = self.id _dict_update_abort_on_duplicates(result, self.get_fields()) return result class Run(self.base, ParameterizedMixin): __tablename__ = db_key_name + '_Run' fields = self.run_fields id = Column("ID", Integer, primary_key=True) machine_id = Column("MachineID", Integer, ForeignKey(Machine.id), index=True) order_id = Column("OrderID", Integer, ForeignKey(Order.id), index=True) imported_from = Column("ImportedFrom", String(512)) start_time = Column("StartTime", DateTime) end_time = Column("EndTime", DateTime) simple_run_id = Column("SimpleRunID", Integer) # The parameters blob is used to store any additional information # reported by the run but not promoted into the machine record. 
# Such data is stored as a JSON encoded blob. parameters_data = Column("Parameters", Binary, index=False, unique=False) machine = relation(Machine) order = relation(Order) # Dynamically create fields for all of the test suite defined run # fields. # # FIXME: We are probably going to want to index on some of these, # but need a bit for that in the test suite definition. class_dict = locals() for item in fields: iname = item.name if iname in class_dict: raise ValueError("test suite defines reserved key %r" % (iname,)) item.column = testsuite.make_run_column(iname) class_dict[iname] = item.column def __init__(self, new_id, machine, order, start_time, end_time): self.id = new_id self.machine = machine self.order = order self.start_time = start_time self.end_time = end_time self.imported_from = None def __repr__(self): return '%s_%s%r' % (db_key_name, self.__class__.__name__, (self.id, self.machine, self.order, self.start_time, self.end_time)) @property def parameters(self): """dictionary access to the BLOB encoded parameters data""" return dict(json.loads(self.parameters_data)) @parameters.setter def parameters(self, data): self.parameters_data = json.dumps(sorted(data.items())).encode("utf-8") def __json__(self, flatten_order=True): result = { 'id': self.id, 'start_time': self.start_time, 'end_time': self.end_time, } # Leave out: machine_id, simple_run_id, imported_from if flatten_order: _dict_update_abort_on_duplicates( result, self.order.__json__(include_id=False)) result['order_by'] = \ ','.join([f.name for f in self.order.fields]) result['order_id'] = self.order_id else: result['order_id'] = self.order_id _dict_update_abort_on_duplicates(result, self.get_fields()) _dict_update_abort_on_duplicates(result, self.parameters) return result Machine.runs = relation(Run, back_populates='machine', cascade="all, delete-orphan") Order.runs = relation(Run, back_populates='order', cascade="all, delete-orphan") class Test(self.base, ParameterizedMixin): __tablename__ = db_key_name + '_Test' # utf8_bin for case sensitive compare __table_args__ = {'mysql_collate': 'utf8_bin'} id = Column("ID", Integer, primary_key=True) name = Column("Name", String(256), unique=True, index=True) def __init__(self, name): self.id = None self.name = name def __repr__(self): return '%s_%s%r' % (db_key_name, self.__class__.__name__, (self.id, self.name)) def __json__(self, include_id=True): result = {'name': self.name} if include_id: result['id'] = self.id return result class Profile(self.base): __tablename__ = db_key_name + '_Profile' id = Column("ID", Integer, primary_key=True) created_time = Column("CreatedTime", DateTime) accessed_time = Column("AccessedTime", DateTime) filename = Column("Filename", String(256)) counters = Column("Counters", String(512)) def __init__(self, encoded, config, testid): self.created_time = datetime.datetime.now() self.accessed_time = datetime.datetime.now() if config is not None: profileDir = config.config.profileDir prefix = 't-%s-s-' % os.path.basename(testid) self.filename = \ profile.Profile.saveFromRendered(encoded, profileDir=profileDir, prefix=prefix) p = profile.Profile.fromRendered(encoded) s = ','.join('%s=%s' % (k, v) for k, v in p.getTopLevelCounters().items()) self.counters = s[:512] def getTopLevelCounters(self): d = dict() for i in self.counters.split('='): k, v = i.split(',') d[k] = v return d def load(self, profileDir): return profile.Profile.fromFile(os.path.join(profileDir, self.filename)) class Sample(self.base, ParameterizedMixin): __tablename__ = db_key_name + '_Sample' 
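            # Note (sketch, not part of the original patch): beyond the
            # fixed columns declared below, the concrete columns of this
            # table are generated from the suite's sample fields, so the
            # schema differs per test suite. For the default 'nts' suite
            # (using the metric names from the rename tables in
            # lnt/testing/__init__.py below) this yields columns such as
            # execution_time, compile_time, hash and mem_bytes alongside
            # the ID, RunID and TestID columns.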
fields = list( sorted(self.sample_fields, key=lambda x: self.sample_field_indexes[x.name]) ) id = Column("ID", Integer, primary_key=True) # We do not need an index on run_id, this is covered by the # compound (Run(ID),Test(ID)) index we create below. run_id = Column("RunID", Integer, ForeignKey(Run.id), index=True) test_id = Column("TestID", Integer, ForeignKey(Test.id), index=True) profile_id = Column("ProfileID", Integer, ForeignKey(Profile.id)) run = relation(Run) test = relation(Test) profile = relation(Profile) @staticmethod def get_primary_fields(): """ get_primary_fields() -> [SampleField*] Get the primary sample fields (those which are not associated with some other sample field). """ status_fields = set(s.status_field for s in self.Sample.fields if s.status_field is not None) for field in self.Sample.fields: if field not in status_fields: yield field @staticmethod def get_metric_fields(): """ get_metric_fields() -> [SampleField*] Get the sample fields which represent some kind of metric, i.e. those which have a value that can be interpreted as better or worse than other potential values for this field. """ for field in Sample.fields: if field.type.name in ['Real', 'Integer']: yield field @staticmethod def get_hash_of_binary_field(): """ get_hash_of_binary_field() -> SampleField Get the sample field which represents a hash of the binary being tested. This field will compare equal iff two binaries are considered to be identical, e.g. two different compilers producing identical code output. Returns None if such a field isn't available. """ for field in self.Sample.fields: if field.name == 'hash': return field return None # Dynamically create fields for all of the test suite defined # sample fields. # # FIXME: We might want to index some of these, but for a different # reason than above. It is possible worth it to turn the compound # index below into a covering index. We should evaluate this once # the new UI is up. class_dict = locals() for item in self.sample_fields: iname = item.name if iname in class_dict: raise ValueError("test suite defines reserved key %r" % (iname,)) item.column = testsuite.make_sample_column(iname, item.type.name) class_dict[iname] = item.column def __init__(self, run, test, **kwargs): self.id = None self.run = run self.test = test # Initialize sample fields (defaulting to 0, for now). for item in self.fields: self.set_field(item, kwargs.get(item.name, None)) def __repr__(self): fields = dict((item.name, self.get_field(item)) for item in self.fields) return '%s_%s(%r, %r, %r, **%r)' % ( db_key_name, self.__class__.__name__, self.id, self.run, self.test, fields) def __json__(self, flatten_test=False, include_id=True): result = {} if include_id: result['id'] = self.id # Leave out: run_id # TODO: What about profile/profile_id? if flatten_test: _dict_update_abort_on_duplicates( result, self.test.__json__(include_id=False)) else: result['test_id'] = self.test_id _dict_update_abort_on_duplicates(result, self.get_fields()) return result Run.samples = relation(Sample, back_populates='run', cascade="all, delete-orphan") class FieldChange(self.base, ParameterizedMixin): """FieldChange represents a change in between the values of the same field belonging to two samples from consecutive runs. 
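
            For example (values hypothetical): a FieldChange whose
            old_value is 1.0 and whose new_value is 1.25 for the
            execution_time field of one test on one machine records a 25%
            slowdown between its start_order and end_order.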
""" __tablename__ = db_key_name + '_FieldChangeV2' id = Column("ID", Integer, primary_key=True) old_value = Column("OldValue", Float) new_value = Column("NewValue", Float) start_order_id = Column("StartOrderID", Integer, ForeignKey(Order.id), index=True) end_order_id = Column("EndOrderID", Integer, ForeignKey(Order.id)) test_id = Column("TestID", Integer, ForeignKey(Test.id)) machine_id = Column("MachineID", Integer, ForeignKey(Machine.id)) field_id = Column("FieldID", Integer, ForeignKey(testsuite.SampleField.id)) # Could be from many runs, but most recent one is interesting. run_id = Column("RunID", Integer, ForeignKey(Run.id)) start_order = relation(Order, primaryjoin='FieldChange.' 'start_order_id==Order.id') end_order = relation(Order, primaryjoin='FieldChange.' 'end_order_id==Order.id') test = relation(Test) machine = relation(Machine) field = relation(testsuite.SampleField) run = relation(Run) def __init__(self, start_order, end_order, machine, test, field_id): self.start_order = start_order self.end_order = end_order self.machine = machine self.test = test self.field_id = field_id def __repr__(self): return '%s_%s%r' % (db_key_name, self.__class__.__name__, (self.start_order, self.end_order, self.test, self.machine, self.field)) def __json__(self): return { 'id': self.id, 'old_value': self.old_value, 'new_value': self.new_value, 'start_order_id': self.start_order_id, 'end_order_id': self.end_order_id, 'test_id': self.test_id, 'machine_id': self.machine_id, 'field_id': self.field_id, 'run_id': self.run_id, } Machine.fieldchanges = relation(FieldChange, back_populates='machine', cascade="all, delete-orphan") Run.fieldchanges = relation(FieldChange, back_populates='run', cascade="all, delete-orphan") class Regression(self.base, ParameterizedMixin): """Regressions hold data about a set of RegressionIndices.""" __tablename__ = db_key_name + '_Regression' id = Column("ID", Integer, primary_key=True) title = Column("Title", String(256), unique=False, index=False) bug = Column("BugLink", String(256), unique=False, index=False) state = Column("State", Integer) def __init__(self, title, bug, state): self.title = title self.bug = bug self.state = state def __repr__(self): """String representation of the Regression for debugging. 
            Sometimes we try to print deleted regressions; in that case
            don't die, and return a placeholder title instead.
            """
            try:
                return '{}_{}:"{}"'.format(db_key_name,
                                           self.__class__.__name__,
                                           self.title)
            except ObjectDeletedError:
                return '{}_{}:"{}"'.format(db_key_name,
                                           self.__class__.__name__,
                                           "")

        def __json__(self):
            return {
                'id': self.id,
                'title': self.title,
                'bug': self.bug,
                'state': self.state,
            }

    class RegressionIndicator(self.base, ParameterizedMixin):
        """Relates a regression to a fieldchange."""
        __tablename__ = db_key_name + '_RegressionIndicator'
        id = Column("ID", Integer, primary_key=True)

        regression_id = Column("RegressionID", Integer,
                               ForeignKey(Regression.id), index=True)
        field_change_id = Column("FieldChangeID", Integer,
                                 ForeignKey(FieldChange.id))

        regression = relation(Regression)
        field_change = relation(FieldChange)

        def __init__(self, regression, field_change):
            self.regression = regression
            self.field_change = field_change

        def __repr__(self):
            return '%s_%s%r' % (db_key_name, self.__class__.__name__,
                                (self.id, self.regression,
                                 self.field_change))

        def __json__(self):
            return {
                'RegressionIndicatorID': self.id,
                'Regression': self.regression,
                'FieldChange': self.field_change
            }

    FieldChange.regression_indicators = \
        relation(RegressionIndicator, back_populates='field_change',
                 cascade="all, delete-orphan")

    class ChangeIgnore(self.base, ParameterizedMixin):
        """Changes to ignore in the web interface."""
        __tablename__ = db_key_name + '_ChangeIgnore'

        id = Column("ID", Integer, primary_key=True)
        field_change_id = Column("ChangeIgnoreID", Integer,
                                 ForeignKey(FieldChange.id))
        field_change = relation(FieldChange)

        def __init__(self, field_change):
            self.field_change = field_change

        def __repr__(self):
            return '%s_%s%r' % (db_key_name, self.__class__.__name__,
                                (self.id, self.field_change))

    class Baseline(self.base, ParameterizedMixin):
        """Baselines to compare runs to."""
        __tablename__ = db_key_name + '_Baseline'
        __table_args__ = {'mysql_collate': 'utf8_bin'}

        id = Column("ID", Integer, primary_key=True)
        name = Column("Name", String(32), unique=True)
        comment = Column("Comment", String(256))
        order_id = Column("OrderID", Integer, ForeignKey(Order.id),
                          index=True)
        order = relation(Order)

        def __str__(self):
            return "Baseline({})".format(self.name)

    self.Machine = Machine
    self.Run = Run
    self.Test = Test
    self.Profile = Profile
    self.Sample = Sample
    self.Order = Order
    self.FieldChange = FieldChange
    self.Regression = Regression
    self.RegressionIndicator = RegressionIndicator
    self.ChangeIgnore = ChangeIgnore
    self.Baseline = Baseline

    # Create the compound index we cannot declare inline.
    sqlalchemy.schema.Index("ix_%s_Sample_RunID_TestID" % db_key_name,
                            Sample.run_id, Sample.test_id)

def create_tables(self, engine):
    self.base.metadata.create_all(engine)

def get_baselines(self, session):
    return session.query(self.Baseline).all()

def get_users_baseline(self, session):
    try:
        baseline_key = lnt.server.ui.util.baseline_key(self.name)
        session_baseline = flask.session.get(baseline_key)
    except RuntimeError:
        # Sometimes this is called from outside the app context.
        # In that case, don't get the user's session baseline.
return None if session_baseline: return session.query(self.Baseline).get(session_baseline) return None def _getIncompatibleFields(self, existing_machine, new_machine): incompatible_fields = set() for field in self.machine_fields: existing_value = existing_machine.get_field(field) new_value = new_machine.get_field(field) if new_value is None or existing_value == new_value: continue if existing_value is not None: incompatible_fields.add(field.name) existing_parameters = existing_machine.parameters for key, new_value in new_machine.parameters.items(): existing_value = existing_parameters.get(key, None) if new_value is None or existing_value == new_value: continue if existing_value is not None: incompatible_fields.add(key) return incompatible_fields def _updateMachine(self, existing_machine, new_machine): for field in self.machine_fields: new_value = new_machine.get_field(field) if new_value is None: continue existing_machine.set_field(field, new_value) parameters = existing_machine.parameters for key, new_value in new_machine.parameters.items(): if new_value is None and parameters.get(key, None) is not None: continue parameters[key] = new_value existing_machine.parameters = parameters def _getOrCreateMachine(self, session, machine_data, select_machine): """ _getOrCreateMachine(data, select_machine) -> Machine Add or create (and insert) a Machine record from the given machine data (as recorded by the test interchange format). select_machine strategies: 'match': Abort if the existing machine doesn't match the new machine data. 'update': Update the existing machine in cases where the new machine data doesn't match the existing data. 'split': On parameter mismatch create a new machine with a `$NN` suffix added, or choose an existing compatible machine with such a suffix. """ assert select_machine == 'match' or select_machine == 'update' \ or select_machine == 'split' # Convert the machine data into a machine record. machine_parameters = machine_data.copy() name = machine_parameters.pop('name') machine = self.Machine(name) machine_parameters.pop('id', None) for item in self.machine_fields: value = machine_parameters.pop(item.name, None) machine.set_field(item, value) machine.parameters = machine_parameters # Look for an existing machine. existing_machines = session.query(self.Machine) \ .filter(self.Machine.name == name) \ .order_by(self.Machine.id.desc()) \ .all() # No existing machine? Add one. if len(existing_machines) == 0: session.add(machine) return machine # Search for a compatible machine. existing_machine = None incompatible_fields_0 = [] for m in existing_machines: incompatible_fields = self._getIncompatibleFields(m, machine) if len(incompatible_fields) == 0: existing_machine = m break if len(incompatible_fields_0) == 0: incompatible_fields_0 = incompatible_fields # All existing machines are incompatible? if existing_machine is None: if select_machine == 'split': # Add a new machine. session.add(machine) return machine if select_machine == 'match': raise MachineInfoChanged("'%s' on machine '%s' changed." % (', '.join(incompatible_fields_0), name)) else: assert select_machine == 'update' # Just pick the first and update it below. existing_machine = existing_machines[0] self._updateMachine(existing_machine, machine) return existing_machine def _getOrCreateOrder(self, session, run_parameters): """ _getOrCreateOrder(data) -> Order Add or create (and insert) an Order record based on the given run parameters (as recorded by the test interchange format). 
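        As an illustrative sketch (key names hypothetical): with
        order_fields == ['llvm_project_revision'], run parameters of
        {'llvm_project_revision': '305000', 'extra_param': '1'} select or
        create the Order whose llvm_project_revision is '305000', while
        'extra_param' is left behind for the Run record.
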
        The run parameters that define the order will be removed from the
        provided data argument.
        """
        query = session.query(self.Order)
        order = self.Order()

        # First, extract all of the specified order fields.
        for item in self.order_fields:
            value = run_parameters.pop(item.name, None)
            if value is None:
                # We require that all of the order fields be present.
                raise ValueError("Supplied run is missing parameter: %r" %
                                 (item.name))
            query = query.filter(item.column == value)
            order.set_field(item, value)

        # Execute the query to see if we already have this order.
        existing = query.first()
        if existing is not None:
            return existing

        # If not, then we need to insert this order into the total
        # ordering linked list.

        # Add the new order and commit, to assign an ID.
        session.add(order)
        session.commit()

        # Load all the orders and sort them to form the total ordering.
        orders = sorted(session.query(self.Order))

        # Find the order we just added.
        index = orders.index(order)

        # Insert this order into the linked list which forms the total
        # ordering.
        if index > 0:
            previous_order = orders[index - 1]
            previous_order.next_order_id = order.id
            order.previous_order_id = previous_order.id
        if index + 1 < len(orders):
            next_order = orders[index + 1]
            next_order.previous_order_id = order.id
            order.next_order_id = next_order.id

        return order

    def _getOrCreateRun(self, session, run_data, machine, merge):
        """
        _getOrCreateRun(session, run_data, machine, merge) -> Run

        Add a new Run record from the given data (as recorded by the test
        interchange format).

        merge comes into play when there is already a run with the same
        order fields:
        - 'reject': Reject the submission (raise ValueError).
        - 'replace': Remove the existing submission(s), then add the new
          one.
        - 'append': Add the new submission.
        """
        # Extract the run parameters that define the order.
        run_parameters = run_data.copy()

        # Ignore incoming ids; we will create our own.
        run_parameters.pop('id', None)
        # Added by the REST API; we will replace these as well.
        run_parameters.pop('order_by', None)
        run_parameters.pop('order_id', None)
        run_parameters.pop('machine_id', None)
        run_parameters.pop('imported_from', None)
        run_parameters.pop('simple_run_id', None)

        # Find the order record.
        order = self._getOrCreateOrder(session, run_parameters)

        new_id = None
        if merge != 'append':
            existing_runs = session.query(self.Run) \
                .filter(self.Run.machine_id == machine.id) \
                .filter(self.Run.order_id == order.id) \
                .all()
            if len(existing_runs) > 0:
                if merge == 'reject':
                    raise ValueError("Duplicate submission for '%s'" %
                                     order.name)
                elif merge == 'replace':
                    for previous_run in existing_runs:
                        logger.info("Duplicate submission for order %r: "
                                    "deleting previous run %r" %
                                    (order, previous_run))
                        # Keep the latest ID so the URL is still valid
                        # after a replace.
                        new_id = previous_run.id
                        session.delete(previous_run)
                else:
                    raise ValueError('Invalid Run mergeStrategy %r' % merge)

        # We'd like ISO 8601 timestamps, but will also accept the old
        # format.
        try:
            start_time = aniso8601.parse_datetime(run_data['start_time'])
        except ValueError:
            start_time = datetime.datetime.strptime(
                run_data['start_time'], "%Y-%m-%d %H:%M:%S")
        run_parameters.pop('start_time')
        try:
            end_time = aniso8601.parse_datetime(run_data['end_time'])
        except ValueError:
            end_time = datetime.datetime.strptime(
                run_data['end_time'], "%Y-%m-%d %H:%M:%S")
        run_parameters.pop('end_time')

        run = self.Run(new_id, machine, order, start_time, end_time)

        # First, extract all of the specified run fields.
        for item in self.run_fields:
            value = run_parameters.pop(item.name, None)
            run.set_field(item, value)

        # Any remaining parameters are saved as a JSON encoded array.
        run.parameters = run_parameters

        session.add(run)
        return run

    def _importSampleValues(self, session, tests_data, run, config):
        # Load a map of all the tests, which we will extend when we find
        # tests that need to be added.
        # Downcast to str, so we match on MySQL.
        test_cache = dict((test.name, test)
                          for test in session.query(self.Test))

-        profiles = dict()
        field_dict = dict([(f.name, f) for f in self.sample_fields])
        all_samples_to_add = []

+        def is_profile_only(test_data):
+            return len(test_data) == 2 and 'profile' in test_data
+
        for test_data in tests_data:
+            if is_profile_only(test_data):
+                # For now, ignore profile data that arrives without any
+                # other metrics; it is merged in the second pass below.
+                continue
+
            name = test_data['name']
            test = test_cache.get(name)
            if test is None:
                test = self.Test(test_data['name'])
                test_cache[name] = test
                session.add(test)

            samples = []
            for key, values in test_data.items():
                if key == 'name' or key == 'id' or key.endswith('_id'):
                    continue
                field = field_dict.get(key)
                if field is None and key != 'profile':
                    raise ValueError("test %s: Metric '%s' unknown in suite "
                                     % (name, key))

                if not isinstance(values, list):
                    values = [values]
                while len(samples) < len(values):
                    sample = self.Sample(run, test)
                    samples.append(sample)
                    all_samples_to_add.append(sample)
                for sample, value in zip(samples, values):
                    if key == 'profile':
-                        profile = self.Profile(value, config, name)
-                        sample.profile = profiles.get(hash(value), profile)
+                        sample.profile = self.Profile(value, config, name)
                    else:
                        sample.set_field(field, value)
+
+        # Second pass: attach profile-only entries to the samples of the
+        # test(s) they belong to.
+        for test_data in tests_data:
+            if not is_profile_only(test_data):
+                continue
+            name = test_data['name']
+            test = test_cache.get(name)
+            tests = [test_cache[test_name] for test_name in test_cache
+                     if test_name.startswith(name + '.test:')]
+            if test is not None:
+                tests.append(test)
+
+            value = test_data['profile']
+            new_profile = self.Profile(value, config, name)
+            count = 0
+            for test in tests:
+                sample_exists = False
+                for sample in all_samples_to_add:
+                    if sample.test == test:
+                        sample_exists = True
+                        if sample.profile is None:
+                            sample.profile = new_profile
+                            count += 1
+                        else:
+                            logger.warning('Test %s already has profile '
+                                           'data; profile %s was ignored.',
+                                           test.name, name)
+                if not sample_exists:
+                    logger.warning('Test %s is invalid: it has a profile '
+                                   'but no samples. Consider removing it.',
+                                   test.name)
+            if count == 0:
+                logger.warning('Cannot find test(s) for profile %s.', name)
+            else:
+                logger.info('Profile %s was added to %d test(s).',
+                            name, count)
+
        session.add_all(all_samples_to_add)

    def importDataFromDict(self, session, data, config, select_machine,
                           merge_run):
        """
        importDataFromDict(session, data, config, select_machine,
                           merge_run) -> Run

        Import a new run from the provided test interchange data, and
        return the constructed Run record. May throw ValueError exceptions
        in cases like mismatching machine data or duplicate run submission
        with merge_run == 'reject'.
        """
        machine = self._getOrCreateMachine(session, data['machine'],
                                           select_machine)
        run = self._getOrCreateRun(session, data['run'], machine,
                                   merge_run)
        self._importSampleValues(session, data['tests'], run, config)
        return run

    # Simple query support (mostly used by templates).

    def machines(self, session, name=None):
        q = session.query(self.Machine)
        if name:
            q = q.filter_by(name=name)
        return q

    def getMachine(self, session, id):
        return session.query(self.Machine).filter_by(id=id).one()

    def getRun(self, session, id):
        return session.query(self.Run).filter_by(id=id).one()

    def get_adjacent_runs_on_machine(self, session, run, N, direction=-1):
        """
        get_adjacent_runs_on_machine(run, N, direction=-1) -> [Run*]

        Return the N runs which have been submitted to the same machine
        and are adjacent to the given run. The actual number of runs
        returned may be greater than N in situations where multiple
        reports were received for the same order.

        The runs will be reported starting with the runs closest to the
        given run's order.

        The direction must be -1 or 1 and specifies whether the preceding
        or the following runs should be returned.
        """
        assert N >= 0, "invalid count"
        assert direction in (-1, 1), "invalid direction"
        if N == 0:
            return []

        # The obvious algorithm here is to step through the run orders in
        # the appropriate direction and yield any runs on the same machine
        # which were reported at that order.
        #
        # However, this has one large problem. In some cases, the gap
        # between orders reported on that machine may be quite high. This
        # will be particularly true when a machine has stopped reporting
        # for a while, for example, as there may be a large gap between
        # the largest reported order and the last order the machine
        # reported at.
        #
        # In such cases, we could end up executing a large number of
        # individual SA object materializations in traversing the order
        # list, which is very bad.
        #
        # We currently solve this by instead finding all the orders
        # reported on this machine, ordering those programmatically, and
        # then iterating over that. This performs worse (O(N) instead of
        # O(1)) than the obvious algorithm in the common case, but it is
        # more uniform and significantly better in the worst case, and I
        # prefer that response times be uniform. In practice, this appears
        # to perform fine even for quite large (~1GB, ~20k runs)
        # databases.

        # Find all the orders on this machine, then sort them.
        #
        # FIXME: Scalability! However, pretty fast in practice, see the
        # elaborate explanation above.
        all_machine_orders = sorted(
            session.query(self.Order)
            .join(self.Run)
            .filter(self.Run.machine == run.machine)
            .distinct()
            .all()
        )

        # Find the index of the current run.
        index = all_machine_orders.index(run.order)

        # Gather the next N orders.
        if direction == -1:
            orders_to_return = all_machine_orders[max(0, index - N):index]
        else:
            orders_to_return = all_machine_orders[index + 1:index + N]

        # Get all the runs for those orders on this machine in a single
        # query.
        ids_to_fetch = [o.id for o in orders_to_return]
        if not ids_to_fetch:
            return []
        runs = session.query(self.Run).\
            filter(self.Run.machine == run.machine).\
            filter(self.Run.order_id.in_(ids_to_fetch)).all()

        # Sort the result by order, accounting for direction to satisfy
        # our requirement of returning the runs in adjacency order.
        #
        # Even though we already know the right order, this is faster
        # than issuing separate queries.
runs.sort(key=lambda r: r.order, reverse=(direction == -1)) return runs def get_previous_runs_on_machine(self, session, run, N): return self.get_adjacent_runs_on_machine(session, run, N, direction=-1) def get_next_runs_on_machine(self, session, run, N): return self.get_adjacent_runs_on_machine(session, run, N, direction=1) def __repr__(self): return "TestSuiteDB('%s')" % self.name def getNumMachines(self, session): return session.query(sqlalchemy.func.count(self.Machine.id)).scalar() def getNumRuns(self, session): return session.query(sqlalchemy.func.count(self.Run.id)).scalar() def getNumSamples(self, session): return session.query(sqlalchemy.func.count(self.Sample.id)).scalar() def getNumTests(self, session): return session.query(sqlalchemy.func.count(self.Test.id)).scalar() def get_field_index(self, sample_field): return self.sample_field_indexes[sample_field.name] diff --git a/lnt/testing/__init__.py b/lnt/testing/__init__.py index 2b6ec02..5ab1d2c 100644 --- a/lnt/testing/__init__.py +++ b/lnt/testing/__init__.py @@ -1,666 +1,667 @@ """ Utilities for working with the LNT test format. Clients can easily generate LNT test format data by creating Report objects for the runs they wish to submit, and using Report.render to convert them to JSON data suitable for submitting to the server. """ import datetime import json import re from lnt.util import logger # We define the following constants for use as sample values by # convention. PASS = 0 FAIL = 1 XFAIL = 2 def normalize_time(t): if isinstance(t, float): t = datetime.datetime.utcfromtimestamp(t) elif not isinstance(t, datetime.datetime): t = datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S') return t.strftime('%Y-%m-%d %H:%M:%S') class Report: """Information on a single testing run. In the LNT test model, every test run should define exactly one machine and run, and any number of test samples. """ def __init__(self, machine, run, tests, report_version=1): """Construct a LNT report file format in the given format version.""" self.machine = machine self.run = run self.tests = list(tests) self.report_version = report_version self.check() def check(self): """Check that object members are adequate to generate an LNT json report file of the version specified at construction when rendering that instance. """ # Check requested report version is supported by this library assert self.report_version <= 2, "Only v2 or older LNT report format supported." assert isinstance(self.machine, Machine), "Unexpected type for machine." assert ( self.machine.report_version == self.report_version ), "Mismatch between machine and report version." assert isinstance(self.run, Run), "Unexpected type for run." assert ( self.run.report_version == self.report_version ), "Mismatch between run and report version." for t in self.tests: if self.report_version == 2: assert isinstance(t, Test), "Unexpected type for test" assert ( t.report_version == self.report_version ), "Mismatch between test and report version." else: assert isinstance(t, TestSamples), "Unexpected type for test samples." def update_report(self, new_tests_samples, end_time=None): """Add extra samples to this report, and update the end time of the run. """ self.check() self.tests.extend(new_tests_samples) self.run.update_endtime(end_time) self.check() def render(self, indent=4): """Return a LNT json report file format of the version specified at construction as a string, where each object is indented by indent spaces compared to its parent. 
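
        As a sketch (all values hypothetical), a version 2 report renders
        roughly as:

            {
                "format_version": "2",
                "machine": {"Name": "example-machine", ...},
                "run": {"llvm_project_revision": "305000",
                        "start_time": "2017-06-01 12:00:00", ...},
                "tests": [{"Name": "nts.some-benchmark",
                           "execution_time": [0.1, 0.11]}, ...]
            }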
""" if self.report_version == 2: return json.dumps({'format_version': str(self.report_version), 'machine': self.machine.render(), 'run': self.run.render(), 'tests': [t.render() for t in self.tests]}, sort_keys=True, indent=indent) else: return json.dumps({'Machine': self.machine.render(), 'Run': self.run.render(), 'Tests': [t.render() for t in self.tests]}, sort_keys=True, indent=indent) class Machine: """Information on the machine the test was run on. The info dictionary can be used to describe additional information about the machine, for example the hardware resources or the operating environment. Machines entries in the database are uniqued by their name and the entire contents of the info dictionary. """ def __init__(self, name, info={}, report_version=1): self.name = str(name) self.info = dict((str(key), str(value)) for key, value in info.items()) self.report_version = report_version self.check() def check(self): """Check object members are adequate to generate an LNT json report file of the version specified at construction when rendering that instance. """ # Check requested version is supported by this library assert ( self.report_version <= 2 ), "Only v2 or older supported for LNT report format Machine objects." def render(self): """Return info from this instance in a dictionary that respects the LNT report format in the version specified at construction when printed as json. """ if self.report_version == 2: d = dict(self.info) d['Name'] = self.name return d else: return {'Name': self.name, 'Info': self.info} class Run: """Information on the particular test run. At least one parameter must be supplied and is used as ordering among several runs. When generating a report in format 1 or earlier, both start_time and end_time are used for that effect and the current date is used if their value is None. As with Machine, the info dictionary can be used to describe additional information on the run. This dictionary should be used to describe information on the software-under-test that is constant across the test run, for example the revision number being tested. It can also be used to describe information about the current state which could be useful in analysis, for example the current machine load. """ def __init__(self, start_time=None, end_time=None, info={}, report_version=1): if report_version <= 1: if start_time is None: start_time = datetime.datetime.utcnow() if end_time is None: end_time = datetime.datetime.utcnow() self.start_time = normalize_time(start_time) if start_time is not None else None self.end_time = normalize_time(end_time) if end_time is not None else None self.info = dict() # Convert keys/values that are not json encodable to strings. for key, value in info.items(): key = str(key) value = str(value) self.info[key] = value self.report_version = report_version if self.report_version <= 1: if 'tag' not in self.info: raise ValueError("Missing 'tag' entry in 'info' dictionary") if 'run_order' not in self.info: raise ValueError("Missing 'run_order' entry in 'info' dictionary") else: if 'llvm_project_revision' not in self.info: raise ValueError("Missing 'llvm_project_revision' entry in 'info' dictionary") if '__report_version__' in info: raise ValueError("'__report_version__' key is reserved") if report_version == 1: self.info['__report_version__'] = '1' self.check() def check(self): """Check object members are adequate to generate an LNT json report file of the version specified at construction when rendering that instance. 
""" # Check requested version is supported by this library assert ( self.report_version <= 2 ), "Only v2 or older supported for LNT report format Run objects." if self.start_time is None and self.end_time is None and not bool(self.info): raise ValueError("No data defined in this Run") def update_endtime(self, end_time=None): """Update the end time of this run.""" if self.report_version <= 1 and end_time is None: end_time = datetime.datetime.utcnow() self.end_time = normalize_time(end_time) if end_time else None self.check() def render(self): """Return info from this instance in a dictionary that respects the LNT report format in the version specified at construction when printed as json. """ if self.report_version == 2: d = dict(self.info) if self.start_time is not None: d['start_time'] = self.start_time if self.end_time is not None: d['end_time'] = self.end_time return d else: info = dict(self.info) if self.report_version == 1: info['__report_version__'] = '1' return {'Start Time': self.start_time, 'End Time': self.end_time, 'Info': info} class Test: """Information on a particular test in the run and its associated samples. The server automatically creates test database objects whenever a new test name is seen. Test should be used to generate report in version 2 or later of LNT JSON report file format. Test names are intended to be a persistent, recognizable identifier for what is being executed. Currently, most formats use some form of dotted notation for the test name, and this may become enshrined in the format in the future. In general, the test names should be independent of the software-under-test and refer to some known quantity, for example the software under test. For example, 'CINT2006.403_gcc' is a meaningful test name. The test info dictionary is intended to hold information on the particular permutation of the test that was run. This might include variables specific to the software-under-test . This could include, for example, the compile flags the test was built with, or the runtime parameters that were used. As a general rule, if two test samples are meaningfully and directly comparable, then they should have the same test name but different info paramaters. """ def __init__(self, name, samples, info={}, report_version=2): self.name = name self.samples = samples self.info = dict() # Convert keys/values that are not json encodable to strings. for key, value in info.items(): key = str(key) value = str(value) self.info[key] = value self.report_version = report_version self.check() def check(self): """Check object members are adequate to generate an LNT json report file of the version specified at construction when rendering that instance. """ # Check requested version is supported by this library and is # valid for this object. assert ( self.report_version == 2 ), "Only v2 supported for LNT report format Test objects." for s in self.samples: assert isinstance(s, MetricSamples), "Unexpected type for metric sample." assert ( s.report_version == self.report_version ), "Mismatch between test and metric samples." def render(self): """Return info from this instance in a dictionary that respects the LNT report format in the version specified at construction when printed as json. """ d = dict(self.info) d.update([s.render().popitem() for s in self.samples]) d['Name'] = self.name return d class TestSamples: """Information on a given test and its associated samples data. Samples data must all relate to the same metric. 
    When several metrics are available for a given test, the convention
    is to have one TestSamples per metric and to encode the metric into
    the name, e.g. Benchmark1.exec.

    The server automatically creates test database objects whenever a
    new test name is seen. TestSamples should only be used to generate
    reports in version 1 or earlier of the LNT JSON report file format.

    Test names are intended to be a persistent, recognizable identifier
    for what is being executed. Currently, most formats use some form of
    dotted notation for the test name, and this may become enshrined in
    the format in the future. In general, the test names should be
    independent of the software-under-test and refer to some known
    quantity, for example the software under test. For example,
    'CINT2006.403_gcc' is a meaningful test name.

    The test info dictionary is intended to hold information on the
    particular permutation of the test that was run. This might include
    variables specific to the software-under-test. This could include,
    for example, the compile flags the test was built with, or the
    runtime parameters that were used. As a general rule, if two test
    samples are meaningfully and directly comparable, then they should
    have the same test name but different info parameters.

    The report may include an arbitrary number of samples for each test
    for situations where the same test is run multiple times to gather
    statistical data.
    """

    def __init__(self, name, data, info={}, conv_f=float):
        """Create an instance representing the samples converted into
        floating-point values using the conv_f function.
        """
        self.name = str(name)
        self.info = dict((str(key), str(value))
                         for key, value in info.items())
        self.data = list(map(conv_f, data))

    def render(self):
        """Return info from this instance in a dictionary that respects
        the LNT report format in the version specified at construction
        when printed as json.
        """
        return {'Name': self.name,
                'Info': self.info,
                'Data': self.data}

    def __repr__(self):
        # TODO: remove this.
        return "TestSample({}): {} - {}".format(self.name,
                                                self.data,
                                                self.info)


class MetricSamples:
    """Samples data for a given metric of a given test.

    An arbitrary number of samples for a given metric is allowed for
    situations where the same metric is obtained several times for a
    given test to gather statistical data.

    MetricSamples should be used to generate reports in version 2 or
    later of the LNT JSON report file format.
    """

    def __init__(self, metric, data, conv_f=float, report_version=2):
        self.metric = str(metric)
        self.data = list(map(conv_f, data))
        self.report_version = report_version
        self.check()

    def check(self):
        """Check object members are adequate to generate an LNT json
        report file of the version specified at construction when
        rendering that instance.
        """
        # Check the requested version is supported by this library and
        # is valid for this object.
        assert (
            self.report_version == 2
        ), "Only v2 supported for LNT report format MetricSamples objects."

    def add_samples(self, new_samples, conv_f=float):
        """Add samples for this metric, converted to float by calling
        function conv_f.
        """
        self.data.extend(map(conv_f, new_samples))

    def render(self):
        """Return info from this instance in a dictionary that respects
        the LNT report format in the version specified at construction
        when printed as json.
        """
        return {self.metric: self.data
                if len(self.data) > 1
                else self.data[0]}


###
# Format Versioning

# We record information on the report "version" to allow the server to
# support some level of auto-upgrading data from submissions of older
# reports.
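#
# As a sketch of the version detection implemented by _get_format_version
# below: a v1 submission carries Run.Info['__report_version__'] == '1',
# while a v2 submission carries a top-level 'format_version' key.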
#
# We record the report version as a reserved key in the run information
# (primarily so that it can be accessed post-import on the server).
#
# Version 0 -- : initial (and unversioned).
#
# Version 1 -- 2012-04-12: run_order was changed to not be padded, and to
# allow non-integral values.
#
# Version 2 -- 2017-06: Revamped JSON format:
# - Directly uses LNT names (no 'info_key' names anymore)
# - Flattens Machine.Info and Run.Info into the Machine and Run records
# - One record for each test (not one record per test+metric) with one
#   entry for each metric.


def _get_format_version(data):
    format_version = data.get('format_version')
    if format_version is not None:
        return int(format_version)

    # Older versions had a Run.Info.__report_version__ field.
    run = data.get('Run')
    if run is not None:
        info = run.get('Info')
        if info is not None:
            report_version = info.get('__report_version__', '0')
            return int(report_version)

    return None


def upgrade_0_to_1(data):
    # We recompute the run_order here if it looks like this run_order was
    # derived (we presume from sniffing a compiler).
    run_info = data['Run']['Info']
    run_order = run_info.get('run_order')
    inferred_run_order = run_info.get('inferred_run_order')

    # If the run order is missing, or wasn't the inferred one, do nothing.
    if run_order is None or (run_order != inferred_run_order and
                             inferred_run_order is not None):
        return data

    # Otherwise, assume this run order was derived.

    # Trim whitespace.
    run_order = run_order.strip()
    run_info['run_order'] = run_info['inferred_run_order'] = run_order

    # If this was a production Clang build, try to recompute the src tag.
    if 'clang' in run_info.get('cc_name', '') and \
            run_info.get('cc_build') == 'PROD' and \
            run_info.get('cc_src_tag') and \
            run_order == run_info['cc_src_tag'].strip():
        # Extract the version line.
        version_ln = None
        for ln in run_info.get('cc_version', '').split('\n'):
            if ' version ' in ln:
                version_ln = ln
                break
        else:
            # We are done if we didn't find one.
            return data

        # Extract the build string.
        m = re.match(r'(.*) version ([^ ]*) (\([^(]*\))(.*)', version_ln)
        if not m:
            return data

        cc_name, cc_version_num, cc_build_string, cc_extra = m.groups()
        m = re.search('clang-([0-9.]*)', cc_build_string)
        if m:
            run_info['run_order'] = run_info['inferred_run_order'] = \
                run_info['cc_src_tag'] = m.group(1)

    data['Run']['Info']['__report_version__'] = "1"
    return data


# Upgrading from version 1 to version 2 needs some schema in place.
class _UpgradeSchema(object):
    def __init__(self, metric_rename, machine_param_rename,
                 run_param_rename):
        self.metric_rename = metric_rename
        self.machine_param_rename = machine_param_rename
        self.run_param_rename = run_param_rename


_nts_upgrade = _UpgradeSchema(
    metric_rename={
        '.code_size': 'code_size',
        '.compile': 'compile_time',
        '.compile.status': 'compile_status',
        '.exec': 'execution_time',
        '.exec.status': 'execution_status',
        '.hash': 'hash',
        '.hash.status': 'hash_status',
        '.mem': 'mem_bytes',
        '.score': 'score',
+        '.profile': 'profile',
    },
    machine_param_rename={
        'name': 'hostname',  # Avoid name clash with actual machine name.
}, run_param_rename={ 'run_order': 'llvm_project_revision', } ) _compile_upgrade = _UpgradeSchema( metric_rename={ '.mem': 'mem_bytes', '.mem.status': 'mem_status', '.size': 'size_bytes', '.size.status': 'size_status', '.sys': 'sys_time', '.sys.status': 'sys_status', '.user': 'user_time', '.user.status': 'user_status', '.wall': 'wall_time', '.wall.status': 'wall_status', }, machine_param_rename={ 'hw.model': 'hardware', 'kern.version': 'os_version', 'name': 'hostname', }, run_param_rename={ 'run_order': 'llvm_project_revision', } ) _default_upgrade = _UpgradeSchema( metric_rename={}, machine_param_rename={}, run_param_rename={ 'run_order': 'llvm_project_revision', } ) _upgrades = { 'nts': _nts_upgrade, 'compile': _compile_upgrade } def upgrade_1_to_2(data, ts_name): result = dict() # Pull version and database schema to toplevel result['format_version'] = '2' report_version = data['Run']['Info'].pop('__report_version__', '1') # We should not be in upgrade_1_to_2 for other versions assert(report_version == '1') tag = data['Run']['Info'].pop('tag', None) if tag is not None and tag != ts_name: raise ValueError("Importing '%s' data into '%s' testsuite" % (tag, ts_name)) upgrade = _upgrades.get(tag) if upgrade is None: logger.warning("No upgrade schema known for '%s'\n" % tag) upgrade = _default_upgrade # Flatten Machine.Info into machine Machine = data['Machine'] result_machine = {'name': Machine['Name']} for key, value in Machine['Info'].items(): newname = upgrade.machine_param_rename.get(key, key) if newname in result_machine: raise ValueError("Name clash for machine info '%s'" % newname) result_machine[newname] = value result['machine'] = result_machine # Flatten Result.Info into result Run = data['Run'] result_run = {} start_time = Run.get('Start Time') if start_time is not None: result_run['start_time'] = start_time end_time = Run.get('End Time') if end_time is not None: result_run['end_time'] = end_time for key, value in Run['Info'].items(): newname = upgrade.run_param_rename.get(key, key) if newname in result_run: raise ValueError("Name clash for run info '%s'" % newname) result_run[newname] = value result['run'] = result_run # Merge tests result_tests = list() result_tests_dict = dict() Tests = data['Tests'] for test in Tests: test_Name = test['Name'] # Old testnames always started with 'tag.', split that part. if len(test['Info']) != 0: # The Info record didn't work with the v4 database anyway... raise ValueError("Tests/%s: cannot convert non-empty Info record" % test_Name) tag_dot = '%s.' % ts_name if not test_Name.startswith(tag_dot): raise ValueError("Tests/%s: test name does not start with '%s'" % (test_Name, tag_dot)) name_metric = test_Name[len(tag_dot):] found_metric = False for oldname, newname in upgrade.metric_rename.items(): assert(oldname.startswith('.')) if name_metric.endswith(oldname): name = name_metric[:-len(oldname)] metric = newname found_metric = True break if not found_metric: # Fallback logic for unknown metrics: Assume they are '.xxxx' name, dot, metric = name_metric.rpartition('.') if dot != '.': raise ValueError("Tests/%s: name does not end in .metric" % test_Name) logger.warning("Found unknown metric '%s'" % metric) upgrade.metric_rename['.'+metric] = metric result_test = result_tests_dict.get(name) if result_test is None: result_test = {'name': name} result_tests_dict[name] = result_test result_tests.append(result_test) data = test['Data'] if metric not in result_test: # Do not construct a list for the very common case of just a # single datum. 
if len(data) == 1: data = data[0] result_test[metric] = data elif len(data) > 0: # Transform the test data into a list if not isinstance(result_test[metric], list): result_test[metric] = [result_test[metric]] result_test[metric] += data result['tests'] = result_tests return result def upgrade_and_normalize_report(data, ts_name): # Get the report version. V2 has it at the top level, older version # in Run.Info. format_version = _get_format_version(data) if format_version is None: data['format_version'] = '2' format_version = 2 if format_version == 0: data = upgrade_0_to_1(data) format_version = 1 if format_version == 1: data = upgrade_1_to_2(data, ts_name) format_version = 2 if format_version != 2 or data['format_version'] != '2': raise ValueError("Unknown format version") if 'run' not in data: import pprint logger.info(pprint.pformat(data)) raise ValueError("No 'run' section in submission") if 'machine' not in data: raise ValueError("No 'machine' section in submission") if 'tests' not in data: raise ValueError("No 'tests' section in submission") run = data['run'] if 'start_time' not in run: time = datetime.datetime.utcnow().replace(microsecond=0).isoformat() run['start_time'] = time run['end_time'] = time elif 'end_time' not in run: run['end_time'] = run['start_time'] return data __all__ = ['Report', 'Machine', 'Run', 'TestSamples']
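
As a usage sketch tying the two halves of this patch together, a client can
emit a version 2 report through lnt.testing; all concrete values below
(machine name, revision, benchmark name, samples) are hypothetical:

    import lnt.testing

    machine = lnt.testing.Machine('example-machine', info={'os': 'Linux'},
                                  report_version=2)
    run = lnt.testing.Run(start_time='2017-06-01 12:00:00',
                          end_time='2017-06-01 12:10:00',
                          info={'llvm_project_revision': '305000'},
                          report_version=2)
    tests = [
        lnt.testing.Test('nts.some-benchmark',
                         [lnt.testing.MetricSamples('execution_time',
                                                    [0.1, 0.11])],
                         report_version=2),
    ]
    report = lnt.testing.Report(machine, run, tests, report_version=2)
    # Renders the v2 JSON that the server normalizes and imports via
    # upgrade_and_normalize_report() and importDataFromDict() above.
    print(report.render())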