diff options
Diffstat (limited to 'debuginfo-tests/dexter/dex/debugger/DebuggerBase.py')
-rw-r--r-- | debuginfo-tests/dexter/dex/debugger/DebuggerBase.py | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/debuginfo-tests/dexter/dex/debugger/DebuggerBase.py b/debuginfo-tests/dexter/dex/debugger/DebuggerBase.py new file mode 100644 index 00000000000..8013ceb6436 --- /dev/null +++ b/debuginfo-tests/dexter/dex/debugger/DebuggerBase.py @@ -0,0 +1,227 @@ +# DExTer : Debugging Experience Tester +# ~~~~~~ ~ ~~ ~ ~~ +# +# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +# See https://llvm.org/LICENSE.txt for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +"""Base class for all debugger interface implementations.""" + +import abc +from itertools import chain +import os +import sys +import time +import traceback + +from dex.dextIR import DebuggerIR, ValueIR +from dex.utils.Exceptions import DebuggerException +from dex.utils.Exceptions import NotYetLoadedDebuggerException +from dex.utils.ReturnCode import ReturnCode + + +class DebuggerBase(object, metaclass=abc.ABCMeta): + def __init__(self, context, step_collection): + self.context = context + self.steps = step_collection + self._interface = None + self.has_loaded = False + self._loading_error = NotYetLoadedDebuggerException() + self.watches = set() + + try: + self._interface = self._load_interface() + self.has_loaded = True + self._loading_error = None + except DebuggerException: + self._loading_error = sys.exc_info() + + self.step_index = 0 + + def __enter__(self): + try: + self._custom_init() + self.clear_breakpoints() + self.add_breakpoints() + except DebuggerException: + self._loading_error = sys.exc_info() + return self + + def __exit__(self, *args): + self._custom_exit() + + def _custom_init(self): + pass + + def _custom_exit(self): + pass + + @property + def debugger_info(self): + return DebuggerIR(name=self.name, version=self.version) + + @property + def is_available(self): + return self.has_loaded and self.loading_error is None + + @property + def loading_error(self): + return (str(self._loading_error[1]) + if self._loading_error is not None else None) + + @property + def loading_error_trace(self): + if not self._loading_error: + return None + + tb = traceback.format_exception(*self._loading_error) + + if self._loading_error[1].orig_exception is not None: + orig_exception = traceback.format_exception( + *self._loading_error[1].orig_exception) + + if ''.join(orig_exception) not in ''.join(tb): + tb.extend(['\n']) + tb.extend(orig_exception) + + tb = ''.join(tb).splitlines(True) + return tb + + def add_breakpoints(self): + for s in self.context.options.source_files: + with open(s, 'r') as fp: + num_lines = len(fp.readlines()) + for line in range(1, num_lines + 1): + self.add_breakpoint(s, line) + + def _update_step_watches(self, step_info): + loc = step_info.current_location + watch_cmds = ['DexUnreachable', 'DexExpectStepOrder'] + towatch = chain.from_iterable(self.steps.commands[x] + for x in watch_cmds + if x in self.steps.commands) + try: + # Iterate over all watches of the types named in watch_cmds + for watch in towatch: + if (os.path.exists(loc.path) + and os.path.samefile(watch.path, loc.path) + and watch.lineno == loc.lineno): + result = watch.eval(self) + step_info.watches.update(result) + break + except KeyError: + pass + + def _sanitize_function_name(self, name): # pylint: disable=no-self-use + """If the function name returned by the debugger needs any post- + processing to make it fit (for example, if it includes a byte offset), + do that here. + """ + return name + + def start(self): + self.steps.clear_steps() + self.launch() + + for command_obj in chain.from_iterable(self.steps.commands.values()): + self.watches.update(command_obj.get_watches()) + + max_steps = self.context.options.max_steps + for _ in range(max_steps): + while self.is_running: + pass + + if self.is_finished: + break + + self.step_index += 1 + step_info = self.get_step_info() + + if step_info.current_frame: + self._update_step_watches(step_info) + self.steps.new_step(self.context, step_info) + + if self.in_source_file(step_info): + self.step() + else: + self.go() + + time.sleep(self.context.options.pause_between_steps) + else: + raise DebuggerException( + 'maximum number of steps reached ({})'.format(max_steps)) + + def in_source_file(self, step_info): + if not step_info.current_frame: + return False + if not os.path.exists(step_info.current_location.path): + return False + return any(os.path.samefile(step_info.current_location.path, f) \ + for f in self.context.options.source_files) + + @abc.abstractmethod + def _load_interface(self): + pass + + @classmethod + def get_option_name(cls): + """Short name that will be used on the command line to specify this + debugger. + """ + raise NotImplementedError() + + @classmethod + def get_name(cls): + """Full name of this debugger.""" + raise NotImplementedError() + + @property + def name(self): + return self.__class__.get_name() + + @property + def option_name(self): + return self.__class__.get_option_name() + + @abc.abstractproperty + def version(self): + pass + + @abc.abstractmethod + def clear_breakpoints(self): + pass + + @abc.abstractmethod + def add_breakpoint(self, file_, line): + pass + + @abc.abstractmethod + def launch(self): + pass + + @abc.abstractmethod + def step(self): + pass + + @abc.abstractmethod + def go(self) -> ReturnCode: + pass + + @abc.abstractmethod + def get_step_info(self): + pass + + @abc.abstractproperty + def is_running(self): + pass + + @abc.abstractproperty + def is_finished(self): + pass + + @abc.abstractproperty + def frames_below_main(self): + pass + + @abc.abstractmethod + def evaluate_expression(self, expression, frame_idx=0) -> ValueIR: + pass |