diff --git a/lldb/examples/python/lldb-os-plugin/arch/__init__.py b/lldb/examples/python/lldb-os-plugin/arch/__init__.py new file mode 100644 diff --git a/lldb/examples/python/lldb-os-plugin/arch/arc.py b/lldb/examples/python/lldb-os-plugin/arch/arc.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/arch/arc.py @@ -0,0 +1,65 @@ +import logging + +from typing import Dict, Union, Any + +import os +import sys +import struct + +from core.register_set_common import * + +logger = logging.getLogger(__name__) + +import lldb +# Find the location of registers description file. +_interp = lldb.debugger.GetCommandInterpreter() +_result = lldb.SBCommandReturnObject() +_interp.HandleCommand('settings show plugin.process.icd.hardware.registers', _result) +sys.path.append(os.path.dirname(_result.GetOutput().split()[-1][1:])) +try: + import registers_description as rd +except ImportError: + logger.critical("ARC architecture is chosen, but " + "'plugin.process.icd.hardware.registers' setting points to wrong location") + raise + + +class RegisterSet(RegisterSetCommon): + + register_info = rd.get_dynamic_setting(None, # We do not actually need a target for that. + 'target-registers-description') + + regname_to_regnum = dict() + for index, reg_info in enumerate(register_info['registers']): + regname_to_regnum[reg_info['name']] = index + if 'alt-name' in reg_info: + regname_to_regnum[reg_info['alt-name']] = index + + regname_to_regnum['lp_count'] = regname_to_regnum['lp-count'] + + def __init__(self, init_values: Dict[Union[str, int], int] = None): + super().__init__() + + self._register_count = len(self.register_info['registers']) + self._registers = [None] * self._register_count + + if init_values is not None: + self.fill(init_values) + + def reset_register_values(self) -> None: + self._registers = [None] * self._register_count + + def get_packed_register_state(self) -> bytes: + self._registers = [0 if elem is None else elem for elem in self._registers] + return struct.pack('%uI' % self._register_count, *self._registers) + + def set_register_value(self, key: Union[str, int], value: int) -> None: + if isinstance(key, str): + key = self.regname_to_regnum[key] + self._registers[key] = value + + def fill(self, registers: Dict[Union[str, int], int]) -> None: + self.reset_register_values() + + for k, v in registers.items(): + self.set_register_value(k, v) diff --git a/lldb/examples/python/lldb-os-plugin/arch/common.py b/lldb/examples/python/lldb-os-plugin/arch/common.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/arch/common.py @@ -0,0 +1,38 @@ +from typing import Type + +from core.register_set_common import * + + +""" +This file contains common functionality for arch-specific code, helper functions, +and definitions of architecture names. + +IMPORTANT: +Architecture name should be a valid name of that architecture in the LLVM. +By convention: + 1) arch-specific code must be placed under $(ARCH_NAME).py file; + 2) each architecture should implement a class named 'RegisterSet' + inherited from core.RegisterSetCommon class. +""" + +ARC_ARCH = 'arc' + + +def arch_to_regset_type(arch: str) -> Type[RegisterSetCommon]: + """ + Helper function that for an architecture returns a type of corresponding register set. + + :param Arch arch: Target architecture. + + :returns Type of register set for that architecture. + + :raises ValueError if the architecture is not supported by the plug-in. + """ + import importlib.util + spec = importlib.util.find_spec('arch.' + arch) + if spec is None: + raise ValueError(f'Unsupported architecture: {arch}') + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module.RegisterSet diff --git a/lldb/examples/python/lldb-os-plugin/core/__init__.py b/lldb/examples/python/lldb-os-plugin/core/__init__.py new file mode 100644 diff --git a/lldb/examples/python/lldb-os-plugin/core/cvalue.py b/lldb/examples/python/lldb-os-plugin/core/cvalue.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/core/cvalue.py @@ -0,0 +1,306 @@ +from typing import Any + +from utilities.tracing import traced + +import lldb + + +class value: + """ + A class designed to wrap lldb.SBValue() objects so the resulting object + can be used as a variable. So if you have a Point structure variable in + your code in the current frame named "pt", you can initialize an instance + of this class with it: + + pt = lldb.value(lldb.frame.FindVariable("pt")) + print pt + print pt.x + print pt.y + + pt = lldb.value(lldb.frame.FindVariable("rectangle_array")) + print rectangle_array[12] + print rectangle_array[5].origin.x + + If you need to access wrapped SBValue, use get_sbvalue() method since this class does not have any public + attributes in order to avoid collisions with SBValue's fields. + + :param lldb.SBValue sb_value: SBValue object to wrap. + """ + def __init__(self, sb_value: lldb.SBValue): + self.sbvalue = sb_value + + def get_sbvalue(self) -> lldb.SBValue: + """ + Access wrapped value. + + :returns Unwrapped SBValue object. + """ + return object.__getattribute__(self, 'sbvalue') + + def __nonzero__(self): + return self.get_sbvalue().__nonzero__() and self._get_value_as_unsigned() != 0 + + def __repr__(self): + self.get_sbvalue().__str__() + + def __str__(self): + summary = self.get_sbvalue().GetSummary() + if summary: + return summary.strip('"') + return self.get_sbvalue().__str__() + + def __len__(self): + return self.get_sbvalue().GetNumChildren() + + def __iter__(self): + return ValueIter(self) + + def __getitem__(self, key): + # Allow array access if this value has children. + if isinstance(key, slice): + start = int(key.start) + end = int(key.stop) + step = 1 if key.step is None else int(key.step) + return [self[i] for i in range(start, end, step)] + if isinstance(key, value): + key = int(key) + if isinstance(key, int): + child_sbvlaue = self.get_sbvalue().GetChildAtIndex(key) + if child_sbvlaue and child_sbvlaue.IsValid(): + return value(child_sbvlaue) + raise IndexError(f"Index '{key}' is out of range") + raise TypeError(f"Cannot fetch array item for type '{type(key)}'. " + "Accepted types: slice, value, int") + + def __getattribute__(self, item): + if item in dir(value): + return object.__getattribute__(self, item) + + child_sbvalue = self.get_sbvalue().GetChildMemberWithName(item) + if child_sbvalue and child_sbvalue.IsValid(): + return value(child_sbvalue) + raise AttributeError(f"Attribute '{item}' is not defined") + + def __int__(self): + if self.get_sbvalue().GetType().IsPointerType(): + return self._get_value_as_unsigned() + + is_num, is_sign = lldb.is_numeric_type(self.get_sbvalue().GetType().GetCanonicalType().GetBasicType()) + if is_num and not is_sign: + return self._get_value_as_unsigned() + return self._get_value_as_signed() + + def __add__(self, other): + return int(self) + int(other) + + def __radd__(self, other): + return int(self) + int(other) + + def __sub__(self, other): + return int(self) - int(other) + + def __rsub__(self, other): + return int(other) - int(self) + + def __mul__(self, other): + return int(self) * int(other) + + def __rmul__(self, other): + return int(self) * int(other) + + def __floordiv__(self, other): + return int(self) // int(other) + + def __mod__(self, other): + return int(self) % int(other) + + def __rmod__(self, other): + return int(other) % int(self) + + def __divmod__(self, other): + return int(self) % int(other) + + def __rdivmod__(self, other): + return int(other) % int(self) + + def __pow__(self, other, modulo=None): + if modulo is None: + return int(self) ** int(other) + return self.__pow__(other) % int(modulo) + + def __lshift__(self, other): + return int(self) << int(other) + + def __rshift__(self, other): + return int(self) >> int(other) + + def __and__(self, other): + return int(self) & int(other) + + def __rand(self, other): + return int(self) & int(other) + + def __xor__(self, other): + return int(self) ^ int(other) + + def __or__(self, other): + return int(self) | int(other) + + def __div__(self, other): + return int(self) / int(other) + + def __rdiv__(self, other): + return int(other) / int(self) + + def __truediv__(self, other): + return int(self) / int(other) + + def __iadd__(self, other): + result = self.__add__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __isub__(self, other): + result = self.__sub__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __imul__(self, other): + result = self.__mul__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __idiv__(self, other): + result = self.__div__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __itruediv__(self, other): + result = self.__truediv__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __ifloordiv__(self, other): + result = self.__floordiv__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __imod__(self, other): + result = self.__and__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __ipow__(self, other, modulo=None): + result = self.__pow__(other, modulo) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __ilshift__(self, other): + result = self.__lshift__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __irshift__(self, other): + result = self.__rshift__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __iand__(self, other): + result = self.__and__(other) + self.get_sbvalue().SetValueFromCString(str(result)) + return result + + def __eq__(self, other): + if isinstance(other, int): + return int(self) == other + elif isinstance(other, str): + return str(self) == other + elif isinstance(other, value): + self_err = lldb.SBError() + other_err = lldb.SBError() + self_val = self.get_sbvalue().GetValueAsUnsigned(self_err) + if self_err.fail: + raise ValueError('Unable to extract value of self') + other_val = other.get_sbvalue().GetValueAsUnsigned(other_err) + if other_err.fail: + raise ValueError('Unable to extract value of other') + return self_val == other_val + raise TypeError(f"Unknown type: '{type(other)}', No equality operation defined") + + def _get_value_as_unsigned(self): + sbvalue = self.get_sbvalue() + + serr = lldb.SBError() + retval = sbvalue.GetValueAsUnsigned(serr) + if serr.success: + return retval + raise ValueError("Failed to read unsigned data. " + f"{sbvalue}(type={sbvalue.GetType()}) " + f"Error description: {serr.GetCString()}") + + def _get_value_as_signed(self): + sbvalue = self.get_sbvalue() + + serr = lldb.SBError() + retval = sbvalue.GetValueAsSigned(serr) + if serr.success: + return retval + raise ValueError("Failed to read signed data. " + f"{sbvalue}(type={sbvalue.GetType()}) " + f"Error description: {serr.GetCString()}") + + +class ValueIter: + + def __init__(self, val: value): + self.index = 0 + self.val = val + self.length = len(val) + + def __iter__(self): + return self + + def __next__(self): + if self.index == self.length: + raise StopIteration + child_value = self.val[self.index] + self.index += 1 + return child_value + + +@traced +def unsigned(val: Any) -> int: + """ + Helper function to get unsigned value from core.value + + :param Any val: value (see value class above) to be representated as integer. + + :returns Unsigned int representation of the val. + + :raises ValueError if the type cannot be represented as unsigned integer. + """ + if isinstance(val, value): + return val._get_value_as_unsigned() + return int(val) + + +@traced +def dereference(val: value) -> value: + """ + Get a dereferenced object for a pointer type object. + + Example: + val = dereference(ptr_obj) # python + is same as: + ptr_obj = (int *)0x1234; # C + val = *ptr_obj; # C + + :param value val: value object representing a pointer type. + + :returns Dereferenced value. + + :raises TypeError if the type cannot be dereferenced. + """ + if val.get_sbvalue().GetType().IsPointerType(): + return value(val.get_sbvalue().Dereference()) + raise TypeError('Cannot dereference a non-pointer type') diff --git a/lldb/examples/python/lldb-os-plugin/core/operating_system.py b/lldb/examples/python/lldb-os-plugin/core/operating_system.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/core/operating_system.py @@ -0,0 +1,56 @@ +from typing import Iterable + +from abc import ABCMeta, abstractmethod + +from core.thread import * +from utilities.tracing import traced + + +class OperatingSystemInterface(metaclass=ABCMeta): + """ + Class that defines an interface each operating system needs to implement to be supported by this plug-in. + """ + + @traced + @abstractmethod + def update(self) -> None: + """ + Method that updates operating system's thread list. It is used by OSPlugInCommon to keep the data up-to-date. + """ + raise NotImplementedError + + @traced + @abstractmethod + def get_threads(self) -> Iterable[OSThread]: + """ + Method that returns current threads in the operating system. + + :returns An iterable object with OSThread objects. + """ + raise NotImplementedError + + @traced + @abstractmethod + def get_registers(self, tid: int) -> RegisterSetCommon: + """ + Method that fetches registers for the specified thread. + + :param int tid: ID of a thread to fetch registers from. + + :returns RegisterSet object filled with register values. + """ + raise NotImplementedError + + @traced + @abstractmethod + def create_thread(self, tid: int, context: int) -> OSThread: + """ + Method that created operating system's thread. It is used by OSPlugInCommon to create thread info as operating + system defines it. + + :param int tid: ID of a thread. + :param int context: pointer to the thread's context. + + :returns Created thread object. + """ + raise NotImplementedError diff --git a/lldb/examples/python/lldb-os-plugin/core/osplugin_common.py b/lldb/examples/python/lldb-os-plugin/core/osplugin_common.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/core/osplugin_common.py @@ -0,0 +1,108 @@ +import os +import yaml +import logging.config + +from typing import Dict, List, Any + +import lldb + +from arch.common import * +from arch.arc import * +from core.register_set_common import RegisterSetCommon +from core.cvalue import * +from core.operating_system import OperatingSystemInterface +from utilities.tracing import traced + + +# Set up logging. +_cur_dir = os.path.dirname(__file__) +_path = os.path.join(_cur_dir, '..', 'logging.yaml') +with open(_path, 'r') as f: + _config = yaml.safe_load(f.read()) + _log_file = _config['handlers']['debug_file_handler']['filename'] + _config['handlers']['debug_file_handler']['filename'] = os.path.join(_cur_dir, '..', 'logs', _log_file) +logging.config.dictConfig(_config) + +logger = logging.getLogger(__name__) + + +class OSPlugInCommon: + """ + Class that provides data for an instance of a LLDB 'OperatingSystemPython' plug-in class. + + OSPlugInCommon is used to communicate with LLDB. Each new OS needs to create its own OperatingSystemPython + class implementing OperatingSystemInterface and inheriting from OSPluginCommon. + + IMPORTANT: do not override following methods when inheriting from this classas LLDB uses them + to communicate with the plug-in: + create_thread + get_thread_info + get_register_info + get_register_data + + :param lldb.SBProcess process: SBProcess object representing executing program. + :param OperatingSystemInterface operating_system: An object that implements OperatingSystemInterfacez. + + :var lldb.SBTarget target: SBTarget object representing debug target. + :var Arch arch: An object representing target architecture. + """ + def __init__(self, process: lldb.SBProcess, operating_system: OperatingSystemInterface): + # Initialization needs a valid.SBProcess object. + if not (isinstance(process, lldb.SBProcess) and process.IsValid()): + error_str = 'Invalid SBProcess object' + logger.critical(error_str) + raise ValueError(error_str) + + self.__process = process + self.__operating_system = operating_system + self.__thread_cache = dict() + + self.target = self.__process.target + + self.arch = self.target.triple.split('-')[0].lower() + + self.__register_set_type = arch_to_regset_type(self.arch) + + logger.info('OSPlugIn has been initialized') + + @traced + def __fetch_hardware_registers(self, cpu: int = 0) -> RegisterSetCommon: + hardware_registers = self.__process.GetThreadAtIndex(cpu).GetFrameAtIndex(0).GetRegisters() + registers = dict() + + regnum = 0 + for i in range(hardware_registers.GetSize()): + _set = value(hardware_registers.GetValueAtIndex(i)) + for reg in _set: + registers[regnum] = unsigned(reg) + regnum += 1 + + reg_set = self.__register_set_type() + reg_set.fill(registers) + + return reg_set + + @traced + def create_thread(self, tid: int, context: int) -> Dict: + return self.__operating_system.create_thread(tid, context).get_info() + + @traced + def get_thread_info(self) -> List[Dict[str, Any]]: + self.__operating_system.update() + + res = [] + for thread in self.__operating_system.get_threads(): + self.__thread_cache[thread.id] = thread + res.append(thread.get_info()) + + return res + + @traced + def get_register_info(self) -> Dict: + return self.__register_set_type.register_info + + @traced + def get_register_data(self, tid: int) -> bytes: + if self.__thread_cache[tid].is_active: + return self.__fetch_hardware_registers().get_packed_register_state() + return self.__operating_system.get_registers(tid).get_packed_register_state() diff --git a/lldb/examples/python/lldb-os-plugin/core/register_set_common.py b/lldb/examples/python/lldb-os-plugin/core/register_set_common.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/core/register_set_common.py @@ -0,0 +1,68 @@ +from typing import Any, Dict, Union + +from abc import ABCMeta, abstractmethod + +from utilities.tracing import traced + + +class RegisterSetCommon(metaclass=ABCMeta): + """ + Class that provides functionality to work with architecture-specific register set. + + Each architecture should define a register set and register_info in format DynamicRegisterInfo expects. + """ + + register_info = {'sets': [], 'registers': []} + + @traced + @abstractmethod + def reset_register_values(self) -> None: + """ + Method that makes a reset of all registers. + """ + raise NotImplementedError + + @traced + @abstractmethod + def get_packed_register_state(self) -> bytes: + """ + Method that packs register values as bytes in a specific format. + + You could use python module 'struct' to convert register values to a sequence of bytes. + + :returns Sequence of bytes representing register values. + """ + raise NotImplementedError + + @traced + @abstractmethod + def set_register_value(self, key: Union[str, int], value: Any) -> None: + """ + Method that sets register value by key. + + :param key: Key to find register (register's name of number) + :type key: str or int + :param Any value: Value to assign to register. + """ + raise NotImplementedError + + @traced + @abstractmethod + def fill(self, registers: Dict[Union[str, int], Any]) -> None: + """ + Method that takes register values and fills the register set. + + :param registers: Dictionary (register's name or number => value). + :type registers: Dict[str or int, Any] + """ + raise NotImplementedError + + @classmethod + @traced + def get_register_info(cls) -> Dict: + """ + Method that returns register info for this register set. + + :returns Dictionary in a format DynamicRegisterInfo expects. + """ + return cls.register_info diff --git a/lldb/examples/python/lldb-os-plugin/core/thread.py b/lldb/examples/python/lldb-os-plugin/core/thread.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/core/thread.py @@ -0,0 +1,85 @@ +import lldb + +from core.register_set_common import * +from arch.common import * +from utilities.tracing import traced + + +class OSThread(metaclass=ABCMeta): + """ + Class that defines an interface operating system specific thread should implement. + + :param lldb.SBTarget target: SBTarget object representing debug target. + :param Arch arch: Current architecture. + :param int address: Address of a thread structure. + :param int tid: Thread's ID. + :param str name: Name of the thread, will be set to 'thread_at_%address' if not specified. + :param bool is_active: Specifies if the thread is active or not. Defaults to False. + + :var lldb.SBTarget target: Debug target the thread belongs to. + :var Arch arch: Target architecture. + :var RegisterSet register_set: Architecture specific register set. + :var int id: Thread's ID. + :var int address: Thread's address. + :var bool is_active: Shows is thread active or not. + """ + + @abstractmethod + def __init__(self, target: lldb.SBTarget, arch: str, address: int, + tid: int, name: str = None, is_active: bool = False): + self.id = tid + self.address = address + self.is_active = is_active + if name is None: + self.name = 'thread_at_' + hex(address) + else: + self.name = name + + self.target = target + self.arch = arch + self.register_set = arch_to_regset_type(arch)() + + @traced + @abstractmethod + def fetch_registers(self) -> RegisterSetCommon: + """ + Method that fetches saved thread's registers. + + :returns RegisterSet object filled with register values. + """ + raise NotImplementedError + + @traced + def get_info(self) -> Dict[str, Any]: + """ + Method for getting basic thread information. + + Possible fields: + tid (mandatory) -> thread ID + name (optional) -> thread name + queue (optional) -> thread dispatch queue name + state (mandatory) -> thread state (set to 'stopped' for now) + stop_reason (mandatory) -> thread stop reason (usually set to 'none') + Possible values: + 'breakpoint' -> is the thread is stopped at a breakpoint + 'none' -> thread is just stopped because the process is stopped + 'trace' -> the thread just single stepped + register_data_addr (optional) -> the address of the register data in memory (optional key/value pair) + Specifying this key/value pair for a thread will avoid a call to get_register_data() + and can be used when your registers are in a thread context structure that is contiguous + in memory. Don't specify this if your register layout in memory doesn't match the layout + described by the dictionary returned from a call to the get_register_info() method. + + :returns Dictionary with basic thread information. + """ + return { + 'tid': self.id, + 'name': self.name, + 'state': 'stopped', + 'stop_reason': 'none' + } + + def __eq__(self, other): + if isinstance(other, OSThread): + return self.id == other.id + return False diff --git a/lldb/examples/python/lldb-os-plugin/logging.yaml b/lldb/examples/python/lldb-os-plugin/logging.yaml new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/logging.yaml @@ -0,0 +1,24 @@ +version: 1 +disable_existing_loggers: False +formatters: + light: + format: "%(levelname)s - %(name)s - %(message)s" + debug: + format: "%(asctime)s - %(levelname)s - %(name)s - %(message)s" +handlers: + console: + class: logging.StreamHandler + level: WARNING + formatter: light + stream: ext://sys.stdout + debug_file_handler: + class: logging.handlers.RotatingFileHandler + level: DEBUG + formatter: debug + filename: debug.log + maxBytes: 10485760 # 10 MB + backupCount: 10 + encoding: utf8 +root: + level: DEBUG + handlers: [debug_file_handler, console] \ No newline at end of file diff --git a/lldb/examples/python/lldb-os-plugin/logs/.gitkeep b/lldb/examples/python/lldb-os-plugin/logs/.gitkeep new file mode 100644 diff --git a/lldb/examples/python/lldb-os-plugin/utilities/helpers.py b/lldb/examples/python/lldb-os-plugin/utilities/helpers.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/utilities/helpers.py @@ -0,0 +1,84 @@ +from typing import Dict, Union + +import lldb + +from core.cvalue import * + + +_dummy_name = 'dummy_name' + + +def create_value_from_address(target: lldb.SBTarget, val_type: Union[str, lldb.SBType], + address: Union[int, lldb.SBAddress], name: str = None) -> value: + """ + Helper function to create value object from given address and type. + + :param lldb.SBTarget target: SBTarget object representing debug target. + :param val_type: Type of object to create. + :type val_type: str or lldb.SBType + :param address: Address in memory where the object is. + :type address: int or lldb.SBAddress + :param str name: Name to give to the object. + + :returns value object with wrapped object. + """ + if isinstance(val_type, str): + val_type = target.FindFirstType(val_type) + if isinstance(address, int): + address = lldb.SBAddress(address, target) + if name is None: + name = _dummy_name + + return value(target.CreateValueFromAddress(name, address, val_type)) + + +def find_global_variable(target: lldb.SBTarget, var: str) -> value: + """ + Helper function to find global variable in the debug target. + + :param lldb.SBTarget target: SBTarget object representing debug target. + :param str var: Name of variable to find. + + :returns value object with found variable. + """ + return value(target.FindGlobalVariables(var, 1).GetValueAtIndex(0)) + + +def register_value_to_dict(val: value) -> Dict[str, int]: + """ + Helper function to create a map (register name => register value) from a value. + + Examples: + 1) + struct callee_saved { # C + uint32_t reg1; + uint32_t reg2; + ... + } + val = create_value_from_address(target, 'struct callee_saved', 0x1234) # python + registers = register_value_to_dict(val) # python + /* + reg1: 0x11111111 + reg2: 0x22222222 + ... + */ + + 2) + val = create_value_from_address(target, 'uint32_t', 0x1234, 'regname') # python + registers = register_value_to_dict(val) # python + /* + regname: 0x11111111 + */ + + :param value val: value that contains register(s). + + :returns Dictionary object (register name => register value). + """ + if len(val) == 0: + return {val.get_sbvalue().GetName(): unsigned(val)} + + registers = dict() + for elem in val: + registers[elem.get_sbvalue().GetName()] = unsigned(elem) + + return registers diff --git a/lldb/examples/python/lldb-os-plugin/utilities/tracing.py b/lldb/examples/python/lldb-os-plugin/utilities/tracing.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/utilities/tracing.py @@ -0,0 +1,50 @@ +import logging +from functools import wraps + +from typing import Callable + + +def _format_arg_value(arg_val): + """ + Return a string representing a (name, value) pair. + + _format_arg_value(('x', (1, 2, 3))) + 'x=(1, 2, 3)' + """ + arg, val = arg_val + return f'{arg}={val}' + + +def traced(func: Callable) -> Callable: + """ + Trace calls to a function. + + :returns A decorated version of the input function which "traces" calls + made to it by writing out the function's name and the arguments it was + called with. + """ + + # Unpack function's arg count, arg names, arg defaults. + code = func.__code__ + arg_count = code.co_argcount + arg_names = code.co_varnames[:arg_count] + fn_defaults = func.__defaults__ or list() + arg_defs = dict(zip(arg_names[-len(fn_defaults):], fn_defaults)) + + @wraps(func) + def wrapped(*v, **k): + # Collect function arguments by chaining together positional, + # defaulted, extra positional and keyword arguments. + positional = [_format_arg_value(arg_val) for arg_val in zip(arg_names, v)] + defaulted = [_format_arg_value((name, arg_defs[name])) + for name in arg_names[len(v):] if name not in k] + nameless = [repr(elem) for elem in v[arg_count:]] + keyword = [_format_arg_value(arg_val) for arg_val in k.items()] + args = positional + defaulted + nameless + keyword + + logger = logging.getLogger(func.__module__) + logger.debug(func.__name__ + '( ' + ', '.join(args) + ' )') + + return func(*v, **k) + + return wrapped diff --git a/lldb/examples/python/lldb-os-plugin/zephyr.py b/lldb/examples/python/lldb-os-plugin/zephyr.py new file mode 100644 --- /dev/null +++ b/lldb/examples/python/lldb-os-plugin/zephyr.py @@ -0,0 +1,156 @@ +import logging + +from typing import Iterator + +import lldb + +from core.operating_system import * +from core.osplugin_common import * +from core.cvalue import * +from arch.common import * +from utilities.helpers import * + +# Common defines. +ZEPHYR_THREAD_TYPE = 'struct k_thread' + +# ARC-related defines. +ARC_CALLEE_SAVED_TYPE = 'struct _callee_saved_stack' +ARC_IRQ_STACK_FRAME_TYPE = 'struct _irq_stack_frame' +ARC_REGISTER_TYPE = 'u32_t' +ARC_CAUSE_NONE = 0 +ARC_CAUSE_COOP = 1 +ARC_CAUSE_RIRQ = 2 +ARC_CAUSE_FIRQ = 3 + +logger = logging.getLogger(__name__) + + +def create_k_thread(target: lldb.SBTarget, address: int) -> value: + return create_value_from_address(target, ZEPHYR_THREAD_TYPE, address) + + +class ZephyrThread(OSThread): + + def __init__(self, target: lldb.SBTarget, arch: str, address: int, is_active: bool = False): + self._k_thread = create_k_thread(target, address) + + # Zephyr thread has 'name' attribute only if CONFIG_THREAD_NAME option is enabled. + try: + th_name = self._k_thread.name + except AttributeError: + super().__init__(target, arch, address, address, is_active=is_active) + return + + super().__init__(target, arch, address, address, name=str(th_name), is_active=is_active) + + def _arc_register_context_create(self) -> Dict[str, int]: + stack_ptr = unsigned(self._callee_saved.sp) + + registers = register_value_to_dict( + create_value_from_address(self.target, ARC_CALLEE_SAVED_TYPE, stack_ptr)) + + callee_saved_size = self.target.FindFirstType(ARC_CALLEE_SAVED_TYPE).GetByteSize() + + relinquish_cause = self._thread_arch.relinquish_cause + if relinquish_cause == ARC_CAUSE_NONE: + logger.warning(f'Bad relinquish cause for thread {hex(self.id)}') + elif relinquish_cause == ARC_CAUSE_COOP: + stack_ptr += callee_saved_size + 4 + # TODO: check status32' AE bit + status32 = register_value_to_dict(create_value_from_address( + self.target, ARC_REGISTER_TYPE, stack_ptr, 'status32')) + pc = register_value_to_dict(create_value_from_address( + self.target, ARC_REGISTER_TYPE, stack_ptr + 4, 'pc')) + + registers = {**registers, **status32, **pc} + # In case of returning from coop, we should also copy BLINK to PC, STATUS32 to R3, and restore SP. + registers['blink'] = registers['pc'] + registers['r3'] = registers['status32'] + registers['sp'] = stack_ptr + 8 + elif relinquish_cause == ARC_CAUSE_RIRQ or relinquish_cause == ARC_CAUSE_FIRQ: + irq_stack_frame_ptr = stack_ptr + callee_saved_size + irq_stack_frame = register_value_to_dict(create_value_from_address( + self.target, ARC_IRQ_STACK_FRAME_TYPE, irq_stack_frame_ptr)) + # Restore SP. + irq_stack_frame_size = self.target.FindFirstType(ARC_IRQ_STACK_FRAME_TYPE).GetByteSize() + irq_stack_frame['sp'] = irq_stack_frame_ptr + irq_stack_frame_size + 4 + + registers = {**registers, **irq_stack_frame} + + return registers + + def fetch_registers(self) -> RegisterSetCommon: + # At start-up Zephyr has a k_thread object at nullptr, so we should check this to avoid crushing. + if not self._k_thread: + return self.register_set + + if self.arch == ARC_ARCH: + registers = self._arc_register_context_create() + else: + logger.error(f'Unsupported architecture: {self.arch.value}') + return self.register_set + + self.register_set.fill(registers) + return self.register_set + + @property + def _callee_saved(self) -> value: + return self._k_thread.callee_saved + + @property + def _thread_arch(self) -> value: + return self._k_thread.arch + + +@OperatingSystemInterface.register +class OperatingSystemPlugIn(OSPlugInCommon): + + def __init__(self, process: lldb.SBProcess): + super().__init__(process, self) + + self._threads = dict() + self._current_threads = list() + self._kernel = find_global_variable(self.target, '_kernel') + + def update(self) -> None: + self._threads.clear() + self._current_threads.clear() + self._build_thread_list() + + def get_threads(self) -> Iterable[ZephyrThread]: + return self._threads.values() + + def get_registers(self, tid: int) -> RegisterSetCommon: + return self._threads[tid].fetch_registers() + + def _build_thread_list(self) -> None: + try: + threads = self._kernel.threads + except AttributeError: + logger.warning('CONFIG_THREAD_MONITOR should be enabled to access kernel threads') + return + + # Collect current threads. + for cpu in self._kernel.cpus: + self._current_threads.append(cpu.current) + + for thread in self._iterate_threads(threads): + ptr = unsigned(thread) + self._threads[ptr] = ZephyrThread( + self.target, self.arch, ptr, is_active=(thread in self._current_threads)) + + def _iterate_threads(self, thread_val: value) -> Iterator[value]: + visited = set() + + while True: + thread_addr = unsigned(thread_val) + if thread_addr == 0 or thread_addr in visited: + raise StopIteration + yield thread_val + visited.add(thread_addr) + thread_val = thread_val.next_thread + + def create_thread(self, tid: int, context: int) -> ZephyrThread: + thread = ZephyrThread(self.target, self.arch, context) + self._threads[thread.id] = thread + return thread