summaryrefslogtreecommitdiffstats
path: root/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
diff options
context:
space:
mode:
authorRaphael Isemann <teemperor@gmail.com>2019-07-19 15:55:23 +0000
committerRaphael Isemann <teemperor@gmail.com>2019-07-19 15:55:23 +0000
commitb45853f173139c7c3078b97f53e7a6eba6148c13 (patch)
tree3b24eec01a7b23edd4364911d9bf6490ce2c1422 /lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
parent005423018182120f3ae2a54ff5fd3390c96fb527 (diff)
downloadbcm5719-llvm-b45853f173139c7c3078b97f53e7a6eba6148c13.tar.gz
bcm5719-llvm-b45853f173139c7c3078b97f53e7a6eba6148c13.zip
[lldb][NFC] Cleanup mentions and code related to lldb-mi
Summary: lldb-mi has been removed, but there are still a bunch of references in the code base. This patch removes all of them. Reviewers: JDevlieghere, jfb Reviewed By: JDevlieghere Subscribers: dexonsmith, ki.stfu, mgorny, abidh, jfb, lldb-commits Tags: #lldb Differential Revision: https://reviews.llvm.org/D64992 llvm-svn: 366590
Diffstat (limited to 'lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py')
-rw-r--r--lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py1643
1 files changed, 0 insertions, 1643 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
deleted file mode 100644
index e7c63bf21e8..00000000000
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
+++ /dev/null
@@ -1,1643 +0,0 @@
-"""
-Base class for gdb-remote test cases.
-"""
-
-from __future__ import division, print_function
-
-
-import errno
-import os
-import os.path
-import platform
-import random
-import re
-import select
-import signal
-import socket
-import subprocess
-import sys
-import tempfile
-import time
-from lldbsuite.test import configuration
-from lldbsuite.test.lldbtest import *
-from lldbsuite.support import seven
-from lldbgdbserverutils import *
-import logging
-
-
-class _ConnectionRefused(IOError):
- pass
-
-
-class GdbRemoteTestCaseBase(TestBase):
-
- NO_DEBUG_INFO_TESTCASE = True
-
- _TIMEOUT_SECONDS = 120
-
- _GDBREMOTE_KILL_PACKET = "$k#6b"
-
- # Start the inferior separately, attach to the inferior on the stub
- # command line.
- _STARTUP_ATTACH = "attach"
- # Start the inferior separately, start the stub without attaching, allow
- # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
- _STARTUP_ATTACH_MANUALLY = "attach_manually"
- # Start the stub, and launch the inferior with an $A packet via the
- # initial packet stream.
- _STARTUP_LAUNCH = "launch"
-
- # GDB Signal numbers that are not target-specific used for common
- # exceptions
- TARGET_EXC_BAD_ACCESS = 0x91
- TARGET_EXC_BAD_INSTRUCTION = 0x92
- TARGET_EXC_ARITHMETIC = 0x93
- TARGET_EXC_EMULATION = 0x94
- TARGET_EXC_SOFTWARE = 0x95
- TARGET_EXC_BREAKPOINT = 0x96
-
- _verbose_log_handler = None
- _log_formatter = logging.Formatter(
- fmt='%(asctime)-15s %(levelname)-8s %(message)s')
-
- def setUpBaseLogging(self):
- self.logger = logging.getLogger(__name__)
-
- if len(self.logger.handlers) > 0:
- return # We have set up this handler already
-
- self.logger.propagate = False
- self.logger.setLevel(logging.DEBUG)
-
- # log all warnings to stderr
- handler = logging.StreamHandler()
- handler.setLevel(logging.WARNING)
- handler.setFormatter(self._log_formatter)
- self.logger.addHandler(handler)
-
- def isVerboseLoggingRequested(self):
- # We will report our detailed logs if the user requested that the "gdb-remote" channel is
- # logged.
- return any(("gdb-remote" in channel)
- for channel in lldbtest_config.channels)
-
- def setUp(self):
- TestBase.setUp(self)
-
- self.setUpBaseLogging()
- self.debug_monitor_extra_args = []
- self._pump_queues = socket_packet_pump.PumpQueues()
-
- if self.isVerboseLoggingRequested():
- # If requested, full logs go to a log file
- self._verbose_log_handler = logging.FileHandler(
- self.log_basename + "-host.log")
- self._verbose_log_handler.setFormatter(self._log_formatter)
- self._verbose_log_handler.setLevel(logging.DEBUG)
- self.logger.addHandler(self._verbose_log_handler)
-
- self.test_sequence = GdbRemoteTestSequence(self.logger)
- self.set_inferior_startup_launch()
- self.port = self.get_next_port()
- self.named_pipe_path = None
- self.named_pipe = None
- self.named_pipe_fd = None
- self.stub_sends_two_stop_notifications_on_kill = False
- if configuration.lldb_platform_url:
- if configuration.lldb_platform_url.startswith('unix-'):
- url_pattern = '(.+)://\[?(.+?)\]?/.*'
- else:
- url_pattern = '(.+)://(.+):\d+'
- scheme, host = re.match(
- url_pattern, configuration.lldb_platform_url).groups()
- if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
- self.stub_device = host
- self.stub_hostname = 'localhost'
- else:
- self.stub_device = None
- self.stub_hostname = host
- else:
- self.stub_hostname = "localhost"
-
- def tearDown(self):
- self._pump_queues.verify_queues_empty()
-
- self.logger.removeHandler(self._verbose_log_handler)
- self._verbose_log_handler = None
- TestBase.tearDown(self)
-
- def getLocalServerLogFile(self):
- return self.log_basename + "-server.log"
-
- def setUpServerLogging(self, is_llgs):
- if len(lldbtest_config.channels) == 0:
- return # No logging requested
-
- if lldb.remote_platform:
- log_file = lldbutil.join_remote_paths(
- lldb.remote_platform.GetWorkingDirectory(), "server.log")
- else:
- log_file = self.getLocalServerLogFile()
-
- if is_llgs:
- self.debug_monitor_extra_args.append("--log-file=" + log_file)
- self.debug_monitor_extra_args.append(
- "--log-channels={}".format(":".join(lldbtest_config.channels)))
- else:
- self.debug_monitor_extra_args = [
- "--log-file=" + log_file, "--log-flags=0x800000"]
-
- def get_next_port(self):
- return 12000 + random.randint(0, 3999)
-
- def reset_test_sequence(self):
- self.test_sequence = GdbRemoteTestSequence(self.logger)
-
- def create_named_pipe(self):
- # Create a temp dir and name for a pipe.
- temp_dir = tempfile.mkdtemp()
- named_pipe_path = os.path.join(temp_dir, "stub_port_number")
-
- # Create the named pipe.
- os.mkfifo(named_pipe_path)
-
- # Open the read side of the pipe in non-blocking mode. This will
- # return right away, ready or not.
- named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
-
- # Create the file for the named pipe. Note this will follow semantics of
- # a non-blocking read side of a named pipe, which has different semantics
- # than a named pipe opened for read in non-blocking mode.
- named_pipe = os.fdopen(named_pipe_fd, "r")
- self.assertIsNotNone(named_pipe)
-
- def shutdown_named_pipe():
- # Close the pipe.
- try:
- named_pipe.close()
- except:
- print("failed to close named pipe")
- None
-
- # Delete the pipe.
- try:
- os.remove(named_pipe_path)
- except:
- print("failed to delete named pipe: {}".format(named_pipe_path))
- None
-
- # Delete the temp directory.
- try:
- os.rmdir(temp_dir)
- except:
- print(
- "failed to delete temp dir: {}, directory contents: '{}'".format(
- temp_dir, os.listdir(temp_dir)))
- None
-
- # Add the shutdown hook to clean up the named pipe.
- self.addTearDownHook(shutdown_named_pipe)
-
- # Clear the port so the stub selects a port number.
- self.port = 0
-
- return (named_pipe_path, named_pipe, named_pipe_fd)
-
- def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
- # Wait for something to read with a max timeout.
- (ready_readers, _, _) = select.select(
- [self.named_pipe_fd], [], [], read_timeout_seconds)
- self.assertIsNotNone(
- ready_readers,
- "write side of pipe has not written anything - stub isn't writing to pipe.")
- self.assertNotEqual(
- len(ready_readers),
- 0,
- "write side of pipe has not written anything - stub isn't writing to pipe.")
-
- # Read the port from the named pipe.
- stub_port_raw = self.named_pipe.read()
- self.assertIsNotNone(stub_port_raw)
- self.assertNotEqual(
- len(stub_port_raw),
- 0,
- "no content to read on pipe")
-
- # Trim null byte, convert to int.
- stub_port_raw = stub_port_raw[:-1]
- stub_port = int(stub_port_raw)
- self.assertTrue(stub_port > 0)
-
- return stub_port
-
- def init_llgs_test(self, use_named_pipe=True):
- if lldb.remote_platform:
- # Remote platforms don't support named pipe based port negotiation
- use_named_pipe = False
-
- # Grab the ppid from /proc/[shell pid]/stat
- err, retcode, shell_stat = self.run_platform_command(
- "cat /proc/$$/stat")
- self.assertTrue(
- err.Success() and retcode == 0,
- "Failed to read file /proc/$$/stat: %s, retcode: %d" %
- (err.GetCString(),
- retcode))
-
- # [pid] ([executable]) [state] [*ppid*]
- pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
- err, retcode, ls_output = self.run_platform_command(
- "ls -l /proc/%s/exe" % pid)
- self.assertTrue(
- err.Success() and retcode == 0,
- "Failed to read file /proc/%s/exe: %s, retcode: %d" %
- (pid,
- err.GetCString(),
- retcode))
- exe = ls_output.split()[-1]
-
- # If the binary has been deleted, the link name has " (deleted)" appended.
- # Remove if it's there.
- self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
- else:
- self.debug_monitor_exe = get_lldb_server_exe()
- if not self.debug_monitor_exe:
- self.skipTest("lldb-server exe not found")
-
- self.debug_monitor_extra_args = ["gdbserver"]
- self.setUpServerLogging(is_llgs=True)
-
- if use_named_pipe:
- (self.named_pipe_path, self.named_pipe,
- self.named_pipe_fd) = self.create_named_pipe()
-
- def init_debugserver_test(self, use_named_pipe=True):
- self.debug_monitor_exe = get_debugserver_exe()
- if not self.debug_monitor_exe:
- self.skipTest("debugserver exe not found")
- self.setUpServerLogging(is_llgs=False)
- if use_named_pipe:
- (self.named_pipe_path, self.named_pipe,
- self.named_pipe_fd) = self.create_named_pipe()
- # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
- # when the process truly dies.
- self.stub_sends_two_stop_notifications_on_kill = True
-
- def forward_adb_port(self, source, target, direction, device):
- adb = ['adb'] + (['-s', device] if device else []) + [direction]
-
- def remove_port_forward():
- subprocess.call(adb + ["--remove", "tcp:%d" % source])
-
- subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
- self.addTearDownHook(remove_port_forward)
-
- def _verify_socket(self, sock):
- # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
- # connect() attempt. However, due to the way how ADB forwarding works, on android targets
- # the connect() will always be successful, but the connection will be immediately dropped
- # if ADB could not connect on the remote side. This function tries to detect this
- # situation, and report it as "connection refused" so that the upper layers attempt the
- # connection again.
- triple = self.dbg.GetSelectedPlatform().GetTriple()
- if not re.match(".*-.*-.*-android", triple):
- return # Not android.
- can_read, _, _ = select.select([sock], [], [], 0.1)
- if sock not in can_read:
- return # Data is not available, but the connection is alive.
- if len(sock.recv(1, socket.MSG_PEEK)) == 0:
- raise _ConnectionRefused() # Got EOF, connection dropped.
-
- def create_socket(self):
- sock = socket.socket()
- logger = self.logger
-
- triple = self.dbg.GetSelectedPlatform().GetTriple()
- if re.match(".*-.*-.*-android", triple):
- self.forward_adb_port(
- self.port,
- self.port,
- "forward",
- self.stub_device)
-
- logger.info(
- "Connecting to debug monitor on %s:%d",
- self.stub_hostname,
- self.port)
- connect_info = (self.stub_hostname, self.port)
- try:
- sock.connect(connect_info)
- except socket.error as serr:
- if serr.errno == errno.ECONNREFUSED:
- raise _ConnectionRefused()
- raise serr
-
- def shutdown_socket():
- if sock:
- try:
- # send the kill packet so lldb-server shuts down gracefully
- sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
- except:
- logger.warning(
- "failed to send kill packet to debug monitor: {}; ignoring".format(
- sys.exc_info()[0]))
-
- try:
- sock.close()
- except:
- logger.warning(
- "failed to close socket to debug monitor: {}; ignoring".format(
- sys.exc_info()[0]))
-
- self.addTearDownHook(shutdown_socket)
-
- self._verify_socket(sock)
-
- return sock
-
- def set_inferior_startup_launch(self):
- self._inferior_startup = self._STARTUP_LAUNCH
-
- def set_inferior_startup_attach(self):
- self._inferior_startup = self._STARTUP_ATTACH
-
- def set_inferior_startup_attach_manually(self):
- self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
-
- def get_debug_monitor_command_line_args(self, attach_pid=None):
- if lldb.remote_platform:
- commandline_args = self.debug_monitor_extra_args + \
- ["*:{}".format(self.port)]
- else:
- commandline_args = self.debug_monitor_extra_args + \
- ["127.0.0.1:{}".format(self.port)]
-
- if attach_pid:
- commandline_args += ["--attach=%d" % attach_pid]
- if self.named_pipe_path:
- commandline_args += ["--named-pipe", self.named_pipe_path]
- return commandline_args
-
- def get_target_byte_order(self):
- inferior_exe_path = self.getBuildArtifact("a.out")
- target = self.dbg.CreateTarget(inferior_exe_path)
- return target.GetByteOrder()
-
- def launch_debug_monitor(self, attach_pid=None, logfile=None):
- # Create the command line.
- commandline_args = self.get_debug_monitor_command_line_args(
- attach_pid=attach_pid)
-
- # Start the server.
- server = self.spawnSubprocess(
- self.debug_monitor_exe,
- commandline_args,
- install_remote=False)
- self.addTearDownHook(self.cleanupSubprocesses)
- self.assertIsNotNone(server)
-
- # If we're receiving the stub's listening port from the named pipe, do
- # that here.
- if self.named_pipe:
- self.port = self.get_stub_port_from_named_socket()
-
- return server
-
- def connect_to_debug_monitor(self, attach_pid=None):
- if self.named_pipe:
- # Create the stub.
- server = self.launch_debug_monitor(attach_pid=attach_pid)
- self.assertIsNotNone(server)
-
- def shutdown_debug_monitor():
- try:
- server.terminate()
- except:
- logger.warning(
- "failed to terminate server for debug monitor: {}; ignoring".format(
- sys.exc_info()[0]))
- self.addTearDownHook(shutdown_debug_monitor)
-
- # Schedule debug monitor to be shut down during teardown.
- logger = self.logger
-
- # Attach to the stub and return a socket opened to it.
- self.sock = self.create_socket()
- return server
-
- # We're using a random port algorithm to try not to collide with other ports,
- # and retry a max # times.
- attempts = 0
- MAX_ATTEMPTS = 20
-
- while attempts < MAX_ATTEMPTS:
- server = self.launch_debug_monitor(attach_pid=attach_pid)
-
- # Schedule debug monitor to be shut down during teardown.
- logger = self.logger
-
- def shutdown_debug_monitor():
- try:
- server.terminate()
- except:
- logger.warning(
- "failed to terminate server for debug monitor: {}; ignoring".format(
- sys.exc_info()[0]))
- self.addTearDownHook(shutdown_debug_monitor)
-
- connect_attemps = 0
- MAX_CONNECT_ATTEMPTS = 10
-
- while connect_attemps < MAX_CONNECT_ATTEMPTS:
- # Create a socket to talk to the server
- try:
- logger.info("Connect attempt %d", connect_attemps + 1)
- self.sock = self.create_socket()
- return server
- except _ConnectionRefused as serr:
- # Ignore, and try again.
- pass
- time.sleep(0.5)
- connect_attemps += 1
-
- # We should close the server here to be safe.
- server.terminate()
-
- # Increment attempts.
- print(
- "connect to debug monitor on port %d failed, attempt #%d of %d" %
- (self.port, attempts + 1, MAX_ATTEMPTS))
- attempts += 1
-
- # And wait a random length of time before next attempt, to avoid
- # collisions.
- time.sleep(random.randint(1, 5))
-
- # Now grab a new port number.
- self.port = self.get_next_port()
-
- raise Exception(
- "failed to create a socket to the launched debug monitor after %d tries" %
- attempts)
-
- def launch_process_for_attach(
- self,
- inferior_args=None,
- sleep_seconds=3,
- exe_path=None):
- # We're going to start a child process that the debug monitor stub can later attach to.
- # This process needs to be started so that it just hangs around for a while. We'll
- # have it sleep.
- if not exe_path:
- exe_path = self.getBuildArtifact("a.out")
-
- args = []
- if inferior_args:
- args.extend(inferior_args)
- if sleep_seconds:
- args.append("sleep:%d" % sleep_seconds)
-
- inferior = self.spawnSubprocess(exe_path, args)
-
- def shutdown_process_for_attach():
- try:
- inferior.terminate()
- except:
- logger.warning(
- "failed to terminate inferior process for attach: {}; ignoring".format(
- sys.exc_info()[0]))
- self.addTearDownHook(shutdown_process_for_attach)
- return inferior
-
- def prep_debug_monitor_and_inferior(
- self,
- inferior_args=None,
- inferior_sleep_seconds=3,
- inferior_exe_path=None):
- """Prep the debug monitor, the inferior, and the expected packet stream.
-
- Handle the separate cases of using the debug monitor in attach-to-inferior mode
- and in launch-inferior mode.
-
- For attach-to-inferior mode, the inferior process is first started, then
- the debug monitor is started in attach to pid mode (using --attach on the
- stub command line), and the no-ack-mode setup is appended to the packet
- stream. The packet stream is not yet executed, ready to have more expected
- packet entries added to it.
-
- For launch-inferior mode, the stub is first started, then no ack mode is
- setup on the expected packet stream, then the verified launch packets are added
- to the expected socket stream. The packet stream is not yet executed, ready
- to have more expected packet entries added to it.
-
- The return value is:
- {inferior:<inferior>, server:<server>}
- """
- inferior = None
- attach_pid = None
-
- if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
- # Launch the process that we'll use as the inferior.
- inferior = self.launch_process_for_attach(
- inferior_args=inferior_args,
- sleep_seconds=inferior_sleep_seconds,
- exe_path=inferior_exe_path)
- self.assertIsNotNone(inferior)
- self.assertTrue(inferior.pid > 0)
- if self._inferior_startup == self._STARTUP_ATTACH:
- # In this case, we want the stub to attach via the command
- # line, so set the command line attach pid here.
- attach_pid = inferior.pid
-
- if self._inferior_startup == self._STARTUP_LAUNCH:
- # Build launch args
- if not inferior_exe_path:
- inferior_exe_path = self.getBuildArtifact("a.out")
-
- if lldb.remote_platform:
- remote_path = lldbutil.append_to_process_working_directory(self,
- os.path.basename(inferior_exe_path))
- remote_file_spec = lldb.SBFileSpec(remote_path, False)
- err = lldb.remote_platform.Install(lldb.SBFileSpec(
- inferior_exe_path, True), remote_file_spec)
- if err.Fail():
- raise Exception(
- "remote_platform.Install('%s', '%s') failed: %s" %
- (inferior_exe_path, remote_path, err))
- inferior_exe_path = remote_path
-
- launch_args = [inferior_exe_path]
- if inferior_args:
- launch_args.extend(inferior_args)
-
- # Launch the debug monitor stub, attaching to the inferior.
- server = self.connect_to_debug_monitor(attach_pid=attach_pid)
- self.assertIsNotNone(server)
-
- # Build the expected protocol stream
- self.add_no_ack_remote_stream()
- if self._inferior_startup == self._STARTUP_LAUNCH:
- self.add_verified_launch_packets(launch_args)
-
- return {"inferior": inferior, "server": server}
-
- def expect_socket_recv(
- self,
- sock,
- expected_content_regex,
- timeout_seconds):
- response = ""
- timeout_time = time.time() + timeout_seconds
-
- while not expected_content_regex.match(
- response) and time.time() < timeout_time:
- can_read, _, _ = select.select([sock], [], [], timeout_seconds)
- if can_read and sock in can_read:
- recv_bytes = sock.recv(4096)
- if recv_bytes:
- response += seven.bitcast_to_string(recv_bytes)
-
- self.assertTrue(expected_content_regex.match(response))
-
- def expect_socket_send(self, sock, content, timeout_seconds):
- request_bytes_remaining = content
- timeout_time = time.time() + timeout_seconds
-
- while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
- _, can_write, _ = select.select([], [sock], [], timeout_seconds)
- if can_write and sock in can_write:
- written_byte_count = sock.send(request_bytes_remaining.encode())
- request_bytes_remaining = request_bytes_remaining[
- written_byte_count:]
- self.assertEqual(len(request_bytes_remaining), 0)
-
- def do_handshake(self, stub_socket, timeout_seconds=5):
- # Write the ack.
- self.expect_socket_send(stub_socket, "+", timeout_seconds)
-
- # Send the start no ack mode packet.
- NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
- bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode())
- self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
-
- # Receive the ack and "OK"
- self.expect_socket_recv(stub_socket, re.compile(
- r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
-
- # Send the final ack.
- self.expect_socket_send(stub_socket, "+", timeout_seconds)
-
- def add_no_ack_remote_stream(self):
- self.test_sequence.add_log_lines(
- ["read packet: +",
- "read packet: $QStartNoAckMode#b0",
- "send packet: +",
- "send packet: $OK#9a",
- "read packet: +"],
- True)
-
- def add_verified_launch_packets(self, launch_args):
- self.test_sequence.add_log_lines(
- ["read packet: %s" % build_gdbremote_A_packet(launch_args),
- "send packet: $OK#00",
- "read packet: $qLaunchSuccess#a5",
- "send packet: $OK#00"],
- True)
-
- def add_thread_suffix_request_packets(self):
- self.test_sequence.add_log_lines(
- ["read packet: $QThreadSuffixSupported#e4",
- "send packet: $OK#00",
- ], True)
-
- def add_process_info_collection_packets(self):
- self.test_sequence.add_log_lines(
- ["read packet: $qProcessInfo#dc",
- {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
- True)
-
- _KNOWN_PROCESS_INFO_KEYS = [
- "pid",
- "parent-pid",
- "real-uid",
- "real-gid",
- "effective-uid",
- "effective-gid",
- "cputype",
- "cpusubtype",
- "ostype",
- "triple",
- "vendor",
- "endian",
- "elf_abi",
- "ptrsize"
- ]
-
- def parse_process_info_response(self, context):
- # Ensure we have a process info response.
- self.assertIsNotNone(context)
- process_info_raw = context.get("process_info_raw")
- self.assertIsNotNone(process_info_raw)
-
- # Pull out key:value; pairs.
- process_info_dict = {
- match.group(1): match.group(2) for match in re.finditer(
- r"([^:]+):([^;]+);", process_info_raw)}
-
- # Validate keys are known.
- for (key, val) in list(process_info_dict.items()):
- self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
- self.assertIsNotNone(val)
-
- return process_info_dict
-
- def add_register_info_collection_packets(self):
- self.test_sequence.add_log_lines(
- [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
- "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
- "save_key": "reg_info_responses"}],
- True)
-
- def parse_register_info_packets(self, context):
- """Return an array of register info dictionaries, one per register info."""
- reg_info_responses = context.get("reg_info_responses")
- self.assertIsNotNone(reg_info_responses)
-
- # Parse register infos.
- return [parse_reg_info_response(reg_info_response)
- for reg_info_response in reg_info_responses]
-
- def expect_gdbremote_sequence(self, timeout_seconds=None):
- if not timeout_seconds:
- timeout_seconds = self._TIMEOUT_SECONDS
- return expect_lldb_gdbserver_replay(
- self,
- self.sock,
- self.test_sequence,
- self._pump_queues,
- timeout_seconds,
- self.logger)
-
- _KNOWN_REGINFO_KEYS = [
- "name",
- "alt-name",
- "bitsize",
- "offset",
- "encoding",
- "format",
- "set",
- "gcc",
- "ehframe",
- "dwarf",
- "generic",
- "container-regs",
- "invalidate-regs",
- "dynamic_size_dwarf_expr_bytes",
- "dynamic_size_dwarf_len"
- ]
-
- def assert_valid_reg_info(self, reg_info):
- # Assert we know about all the reginfo keys parsed.
- for key in reg_info:
- self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
-
- # Check the bare-minimum expected set of register info keys.
- self.assertTrue("name" in reg_info)
- self.assertTrue("bitsize" in reg_info)
- self.assertTrue("offset" in reg_info)
- self.assertTrue("encoding" in reg_info)
- self.assertTrue("format" in reg_info)
-
- def find_pc_reg_info(self, reg_infos):
- lldb_reg_index = 0
- for reg_info in reg_infos:
- if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
- return (lldb_reg_index, reg_info)
- lldb_reg_index += 1
-
- return (None, None)
-
- def add_lldb_register_index(self, reg_infos):
- """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
-
- We'll use this when we want to call packets like P/p with a register index but do so
- on only a subset of the full register info set.
- """
- self.assertIsNotNone(reg_infos)
-
- reg_index = 0
- for reg_info in reg_infos:
- reg_info["lldb_register_index"] = reg_index
- reg_index += 1
-
- def add_query_memory_region_packets(self, address):
- self.test_sequence.add_log_lines(
- ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
- {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
- True)
-
- def parse_key_val_dict(self, key_val_text, allow_dupes=True):
- self.assertIsNotNone(key_val_text)
- kv_dict = {}
- for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
- key = match.group(1)
- val = match.group(2)
- if key in kv_dict:
- if allow_dupes:
- if isinstance(kv_dict[key], list):
- kv_dict[key].append(val)
- else:
- # Promote to list
- kv_dict[key] = [kv_dict[key], val]
- else:
- self.fail(
- "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
- key, val, key_val_text, kv_dict))
- else:
- kv_dict[key] = val
- return kv_dict
-
- def parse_memory_region_packet(self, context):
- # Ensure we have a context.
- self.assertIsNotNone(context.get("memory_region_response"))
-
- # Pull out key:value; pairs.
- mem_region_dict = self.parse_key_val_dict(
- context.get("memory_region_response"))
-
- # Validate keys are known.
- for (key, val) in list(mem_region_dict.items()):
- self.assertTrue(
- key in [
- "start",
- "size",
- "permissions",
- "name",
- "error"])
- self.assertIsNotNone(val)
-
- # Return the dictionary of key-value pairs for the memory region.
- return mem_region_dict
-
- def assert_address_within_memory_region(
- self, test_address, mem_region_dict):
- self.assertIsNotNone(mem_region_dict)
- self.assertTrue("start" in mem_region_dict)
- self.assertTrue("size" in mem_region_dict)
-
- range_start = int(mem_region_dict["start"], 16)
- range_size = int(mem_region_dict["size"], 16)
- range_end = range_start + range_size
-
- if test_address < range_start:
- self.fail(
- "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
- test_address,
- range_start,
- range_end,
- range_size))
- elif test_address >= range_end:
- self.fail(
- "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
- test_address,
- range_start,
- range_end,
- range_size))
-
- def add_threadinfo_collection_packets(self):
- self.test_sequence.add_log_lines(
- [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
- "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
- "save_key": "threadinfo_responses"}],
- True)
-
- def parse_threadinfo_packets(self, context):
- """Return an array of thread ids (decimal ints), one per thread."""
- threadinfo_responses = context.get("threadinfo_responses")
- self.assertIsNotNone(threadinfo_responses)
-
- thread_ids = []
- for threadinfo_response in threadinfo_responses:
- new_thread_infos = parse_threadinfo_response(threadinfo_response)
- thread_ids.extend(new_thread_infos)
- return thread_ids
-
- def wait_for_thread_count(self, thread_count, timeout_seconds=3):
- start_time = time.time()
- timeout_time = start_time + timeout_seconds
-
- actual_thread_count = 0
- while actual_thread_count < thread_count:
- self.reset_test_sequence()
- self.add_threadinfo_collection_packets()
-
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- threads = self.parse_threadinfo_packets(context)
- self.assertIsNotNone(threads)
-
- actual_thread_count = len(threads)
-
- if time.time() > timeout_time:
- raise Exception(
- 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
- timeout_seconds, thread_count, actual_thread_count))
-
- return threads
-
- def add_set_breakpoint_packets(
- self,
- address,
- z_packet_type=0,
- do_continue=True,
- breakpoint_kind=1):
- self.test_sequence.add_log_lines(
- [ # Set the breakpoint.
- "read packet: $Z{2},{0:x},{1}#00".format(
- address, breakpoint_kind, z_packet_type),
- # Verify the stub could set it.
- "send packet: $OK#00",
- ], True)
-
- if (do_continue):
- self.test_sequence.add_log_lines(
- [ # Continue the inferior.
- "read packet: $c#63",
- # Expect a breakpoint stop report.
- {"direction": "send",
- "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
- "capture": {1: "stop_signo",
- 2: "stop_thread_id"}},
- ], True)
-
- def add_remove_breakpoint_packets(
- self,
- address,
- z_packet_type=0,
- breakpoint_kind=1):
- self.test_sequence.add_log_lines(
- [ # Remove the breakpoint.
- "read packet: $z{2},{0:x},{1}#00".format(
- address, breakpoint_kind, z_packet_type),
- # Verify the stub could unset it.
- "send packet: $OK#00",
- ], True)
-
- def add_qSupported_packets(self):
- self.test_sequence.add_log_lines(
- ["read packet: $qSupported#00",
- {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
- ], True)
-
- _KNOWN_QSUPPORTED_STUB_FEATURES = [
- "augmented-libraries-svr4-read",
- "PacketSize",
- "QStartNoAckMode",
- "QThreadSuffixSupported",
- "QListThreadsInStopReply",
- "qXfer:auxv:read",
- "qXfer:libraries:read",
- "qXfer:libraries-svr4:read",
- "qXfer:features:read",
- "qEcho",
- "QPassSignals"
- ]
-
- def parse_qSupported_response(self, context):
- self.assertIsNotNone(context)
-
- raw_response = context.get("qSupported_response")
- self.assertIsNotNone(raw_response)
-
- # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the
- # +,-,? is stripped from the key and set as the value.
- supported_dict = {}
- for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
- key = match.group(1)
- val = match.group(3)
-
- # key=val: store as is
- if val and len(val) > 0:
- supported_dict[key] = val
- else:
- if len(key) < 2:
- raise Exception(
- "singular stub feature is too short: must be stub_feature{+,-,?}")
- supported_type = key[-1]
- key = key[:-1]
- if not supported_type in ["+", "-", "?"]:
- raise Exception(
- "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
- supported_dict[key] = supported_type
- # Ensure we know the supported element
- if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
- raise Exception(
- "unknown qSupported stub feature reported: %s" %
- key)
-
- return supported_dict
-
- def run_process_then_stop(self, run_seconds=1):
- # Tell the stub to continue.
- self.test_sequence.add_log_lines(
- ["read packet: $vCont;c#a8"],
- True)
- context = self.expect_gdbremote_sequence()
-
- # Wait for run_seconds.
- time.sleep(run_seconds)
-
- # Send an interrupt, capture a T response.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines(
- ["read packet: {}".format(chr(3)),
- {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
- True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
- self.assertIsNotNone(context.get("stop_result"))
-
- return context
-
- def select_modifiable_register(self, reg_infos):
- """Find a register that can be read/written freely."""
- PREFERRED_REGISTER_NAMES = set(["rax", ])
-
- # First check for the first register from the preferred register name
- # set.
- alternative_register_index = None
-
- self.assertIsNotNone(reg_infos)
- for reg_info in reg_infos:
- if ("name" in reg_info) and (
- reg_info["name"] in PREFERRED_REGISTER_NAMES):
- # We found a preferred register. Use it.
- return reg_info["lldb_register_index"]
- if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
- reg_info["generic"] == "arg1"):
- # A frame pointer or first arg register will do as a
- # register to modify temporarily.
- alternative_register_index = reg_info["lldb_register_index"]
-
- # We didn't find a preferred register. Return whatever alternative register
- # we found, if any.
- return alternative_register_index
-
- def extract_registers_from_stop_notification(self, stop_key_vals_text):
- self.assertIsNotNone(stop_key_vals_text)
- kv_dict = self.parse_key_val_dict(stop_key_vals_text)
-
- registers = {}
- for (key, val) in list(kv_dict.items()):
- if re.match(r"^[0-9a-fA-F]+$", key):
- registers[int(key, 16)] = val
- return registers
-
- def gather_register_infos(self):
- self.reset_test_sequence()
- self.add_register_info_collection_packets()
-
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- reg_infos = self.parse_register_info_packets(context)
- self.assertIsNotNone(reg_infos)
- self.add_lldb_register_index(reg_infos)
-
- return reg_infos
-
- def find_generic_register_with_name(self, reg_infos, generic_name):
- self.assertIsNotNone(reg_infos)
- for reg_info in reg_infos:
- if ("generic" in reg_info) and (
- reg_info["generic"] == generic_name):
- return reg_info
- return None
-
- def decode_gdbremote_binary(self, encoded_bytes):
- decoded_bytes = ""
- i = 0
- while i < len(encoded_bytes):
- if encoded_bytes[i] == "}":
- # Handle escaped char.
- self.assertTrue(i + 1 < len(encoded_bytes))
- decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
- i += 2
- elif encoded_bytes[i] == "*":
- # Handle run length encoding.
- self.assertTrue(len(decoded_bytes) > 0)
- self.assertTrue(i + 1 < len(encoded_bytes))
- repeat_count = ord(encoded_bytes[i + 1]) - 29
- decoded_bytes += decoded_bytes[-1] * repeat_count
- i += 2
- else:
- decoded_bytes += encoded_bytes[i]
- i += 1
- return decoded_bytes
-
- def build_auxv_dict(self, endian, word_size, auxv_data):
- self.assertIsNotNone(endian)
- self.assertIsNotNone(word_size)
- self.assertIsNotNone(auxv_data)
-
- auxv_dict = {}
-
- # PowerPC64le's auxvec has a special key that must be ignored.
- # This special key may be used multiple times, resulting in
- # multiple key/value pairs with the same key, which would otherwise
- # break this test check for repeated keys.
- #
- # AT_IGNOREPPC = 22
- ignored_keys_for_arch = { 'powerpc64le' : [22] }
- arch = self.getArchitecture()
- ignore_keys = None
- if arch in ignored_keys_for_arch:
- ignore_keys = ignored_keys_for_arch[arch]
-
- while len(auxv_data) > 0:
- # Chop off key.
- raw_key = auxv_data[:word_size]
- auxv_data = auxv_data[word_size:]
-
- # Chop of value.
- raw_value = auxv_data[:word_size]
- auxv_data = auxv_data[word_size:]
-
- # Convert raw text from target endian.
- key = unpack_endian_binary_string(endian, raw_key)
- value = unpack_endian_binary_string(endian, raw_value)
-
- if ignore_keys and key in ignore_keys:
- continue
-
- # Handle ending entry.
- if key == 0:
- self.assertEqual(value, 0)
- return auxv_dict
-
- # The key should not already be present.
- self.assertFalse(key in auxv_dict)
- auxv_dict[key] = value
-
- self.fail(
- "should not reach here - implies required double zero entry not found")
- return auxv_dict
-
- def read_binary_data_in_chunks(self, command_prefix, chunk_length):
- """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
- offset = 0
- done = False
- decoded_data = ""
-
- while not done:
- # Grab the next iteration of data.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines(
- [
- "read packet: ${}{:x},{:x}:#00".format(
- command_prefix,
- offset,
- chunk_length),
- {
- "direction": "send",
- "regex": re.compile(
- r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
- re.MULTILINE | re.DOTALL),
- "capture": {
- 1: "response_type",
- 2: "content_raw"}}],
- True)
-
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- response_type = context.get("response_type")
- self.assertIsNotNone(response_type)
- self.assertTrue(response_type in ["l", "m"])
-
- # Move offset along.
- offset += chunk_length
-
- # Figure out if we're done. We're done if the response type is l.
- done = response_type == "l"
-
- # Decode binary data.
- content_raw = context.get("content_raw")
- if content_raw and len(content_raw) > 0:
- self.assertIsNotNone(content_raw)
- decoded_data += self.decode_gdbremote_binary(content_raw)
- return decoded_data
-
- def add_interrupt_packets(self):
- self.test_sequence.add_log_lines([
- # Send the intterupt.
- "read packet: {}".format(chr(3)),
- # And wait for the stop notification.
- {"direction": "send",
- "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
- "capture": {1: "stop_signo",
- 2: "stop_key_val_text"}},
- ], True)
-
- def parse_interrupt_packets(self, context):
- self.assertIsNotNone(context.get("stop_signo"))
- self.assertIsNotNone(context.get("stop_key_val_text"))
- return (int(context["stop_signo"], 16), self.parse_key_val_dict(
- context["stop_key_val_text"]))
-
- def add_QSaveRegisterState_packets(self, thread_id):
- if thread_id:
- # Use the thread suffix form.
- request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
- thread_id)
- else:
- request = "read packet: $QSaveRegisterState#00"
-
- self.test_sequence.add_log_lines([request,
- {"direction": "send",
- "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
- "capture": {1: "save_response"}},
- ],
- True)
-
- def parse_QSaveRegisterState_response(self, context):
- self.assertIsNotNone(context)
-
- save_response = context.get("save_response")
- self.assertIsNotNone(save_response)
-
- if len(save_response) < 1 or save_response[0] == "E":
- # error received
- return (False, None)
- else:
- return (True, int(save_response))
-
- def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
- if thread_id:
- # Use the thread suffix form.
- request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
- save_id, thread_id)
- else:
- request = "read packet: $QRestoreRegisterState:{}#00".format(
- save_id)
-
- self.test_sequence.add_log_lines([
- request,
- "send packet: $OK#00"
- ], True)
-
- def flip_all_bits_in_each_register_value(
- self, reg_infos, endian, thread_id=None):
- self.assertIsNotNone(reg_infos)
-
- successful_writes = 0
- failed_writes = 0
-
- for reg_info in reg_infos:
- # Use the lldb register index added to the reg info. We're not necessarily
- # working off a full set of register infos, so an inferred register
- # index could be wrong.
- reg_index = reg_info["lldb_register_index"]
- self.assertIsNotNone(reg_index)
-
- reg_byte_size = int(reg_info["bitsize"]) // 8
- self.assertTrue(reg_byte_size > 0)
-
- # Handle thread suffix.
- if thread_id:
- p_request = "read packet: $p{:x};thread:{:x}#00".format(
- reg_index, thread_id)
- else:
- p_request = "read packet: $p{:x}#00".format(reg_index)
-
- # Read the existing value.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- p_request,
- {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
- ], True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Verify the response length.
- p_response = context.get("p_response")
- self.assertIsNotNone(p_response)
- initial_reg_value = unpack_register_hex_unsigned(
- endian, p_response)
-
- # Flip the value by xoring with all 1s
- all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
- flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
- # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
-
- # Handle thread suffix for P.
- if thread_id:
- P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
- reg_index, pack_register_hex(
- endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
- else:
- P_request = "read packet: $P{:x}={}#00".format(
- reg_index, pack_register_hex(
- endian, flipped_bits_int, byte_size=reg_byte_size))
-
- # Write the flipped value to the register.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines([P_request,
- {"direction": "send",
- "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
- "capture": {1: "P_response"}},
- ],
- True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail
- # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them
- # all flipping perfectly.
- P_response = context.get("P_response")
- self.assertIsNotNone(P_response)
- if P_response == "OK":
- successful_writes += 1
- else:
- failed_writes += 1
- # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
-
- # Read back the register value, ensure it matches the flipped
- # value.
- if P_response == "OK":
- self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- p_request,
- {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
- ], True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- verify_p_response_raw = context.get("p_response")
- self.assertIsNotNone(verify_p_response_raw)
- verify_bits = unpack_register_hex_unsigned(
- endian, verify_p_response_raw)
-
- if verify_bits != flipped_bits_int:
- # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
- # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
- successful_writes -= 1
- failed_writes += 1
-
- return (successful_writes, failed_writes)
-
- def is_bit_flippable_register(self, reg_info):
- if not reg_info:
- return False
- if not "set" in reg_info:
- return False
- if reg_info["set"] != "General Purpose Registers":
- return False
- if ("container-regs" in reg_info) and (
- len(reg_info["container-regs"]) > 0):
- # Don't try to bit flip registers contained in another register.
- return False
- if re.match("^.s$", reg_info["name"]):
- # This is a 2-letter register name that ends in "s", like a segment register.
- # Don't try to bit flip these.
- return False
- if re.match("^(c|)psr$", reg_info["name"]):
- # This is an ARM program status register; don't flip it.
- return False
- # Okay, this looks fine-enough.
- return True
-
- def read_register_values(self, reg_infos, endian, thread_id=None):
- self.assertIsNotNone(reg_infos)
- values = {}
-
- for reg_info in reg_infos:
- # We append a register index when load reg infos so we can work
- # with subsets.
- reg_index = reg_info.get("lldb_register_index")
- self.assertIsNotNone(reg_index)
-
- # Handle thread suffix.
- if thread_id:
- p_request = "read packet: $p{:x};thread:{:x}#00".format(
- reg_index, thread_id)
- else:
- p_request = "read packet: $p{:x}#00".format(reg_index)
-
- # Read it with p.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- p_request,
- {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
- ], True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Convert value from target endian to integral.
- p_response = context.get("p_response")
- self.assertIsNotNone(p_response)
- self.assertTrue(len(p_response) > 0)
- self.assertFalse(p_response[0] == "E")
-
- values[reg_index] = unpack_register_hex_unsigned(
- endian, p_response)
-
- return values
-
- def add_vCont_query_packets(self):
- self.test_sequence.add_log_lines(["read packet: $vCont?#49",
- {"direction": "send",
- "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
- "capture": {2: "vCont_query_response"}},
- ],
- True)
-
- def parse_vCont_query_response(self, context):
- self.assertIsNotNone(context)
- vCont_query_response = context.get("vCont_query_response")
-
- # Handle case of no vCont support at all - in which case the capture
- # group will be none or zero length.
- if not vCont_query_response or len(vCont_query_response) == 0:
- return {}
-
- return {key: 1 for key in vCont_query_response.split(
- ";") if key and len(key) > 0}
-
- def count_single_steps_until_true(
- self,
- thread_id,
- predicate,
- args,
- max_step_count=100,
- use_Hc_packet=True,
- step_instruction="s"):
- """Used by single step test that appears in a few different contexts."""
- single_step_count = 0
-
- while single_step_count < max_step_count:
- self.assertIsNotNone(thread_id)
-
- # Build the packet for the single step instruction. We replace
- # {thread}, if present, with the thread_id.
- step_packet = "read packet: ${}#00".format(
- re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
- # print("\nstep_packet created: {}\n".format(step_packet))
-
- # Single step.
- self.reset_test_sequence()
- if use_Hc_packet:
- self.test_sequence.add_log_lines(
- [ # Set the continue thread.
- "read packet: $Hc{0:x}#00".format(thread_id),
- "send packet: $OK#00",
- ], True)
- self.test_sequence.add_log_lines([
- # Single step.
- step_packet,
- # "read packet: $vCont;s:{0:x}#00".format(thread_id),
- # Expect a breakpoint stop report.
- {"direction": "send",
- "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
- "capture": {1: "stop_signo",
- 2: "stop_thread_id"}},
- ], True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
- self.assertIsNotNone(context.get("stop_signo"))
- self.assertEqual(int(context.get("stop_signo"), 16),
- lldbutil.get_signal_number('SIGTRAP'))
-
- single_step_count += 1
-
- # See if the predicate is true. If so, we're done.
- if predicate(args):
- return (True, single_step_count)
-
- # The predicate didn't return true within the runaway step count.
- return (False, single_step_count)
-
- def g_c1_c2_contents_are(self, args):
- """Used by single step test that appears in a few different contexts."""
- g_c1_address = args["g_c1_address"]
- g_c2_address = args["g_c2_address"]
- expected_g_c1 = args["expected_g_c1"]
- expected_g_c2 = args["expected_g_c2"]
-
- # Read g_c1 and g_c2 contents.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines(
- ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
- {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
- "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
- {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
- True)
-
- # Run the packet stream.
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Check if what we read from inferior memory is what we are expecting.
- self.assertIsNotNone(context.get("g_c1_contents"))
- self.assertIsNotNone(context.get("g_c2_contents"))
-
- return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
- seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
-
- def single_step_only_steps_one_instruction(
- self, use_Hc_packet=True, step_instruction="s"):
- """Used by single step test that appears in a few different contexts."""
- # Start up the inferior.
- procs = self.prep_debug_monitor_and_inferior(
- inferior_args=[
- "get-code-address-hex:swap_chars",
- "get-data-address-hex:g_c1",
- "get-data-address-hex:g_c2",
- "sleep:1",
- "call-function:swap_chars",
- "sleep:5"])
-
- # Run the process
- self.test_sequence.add_log_lines(
- [ # Start running after initial stop.
- "read packet: $c#63",
- # Match output line that prints the memory address of the function call entry point.
- # Note we require launch-only testing so we can get inferior otuput.
- {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
- "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
- # Now stop the inferior.
- "read packet: {}".format(chr(3)),
- # And wait for the stop notification.
- {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
- True)
-
- # Run the packet stream.
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Grab the main thread id.
- self.assertIsNotNone(context.get("stop_thread_id"))
- main_thread_id = int(context.get("stop_thread_id"), 16)
-
- # Grab the function address.
- self.assertIsNotNone(context.get("function_address"))
- function_address = int(context.get("function_address"), 16)
-
- # Grab the data addresses.
- self.assertIsNotNone(context.get("g_c1_address"))
- g_c1_address = int(context.get("g_c1_address"), 16)
-
- self.assertIsNotNone(context.get("g_c2_address"))
- g_c2_address = int(context.get("g_c2_address"), 16)
-
- # Set a breakpoint at the given address.
- if self.getArchitecture() == "arm":
- # TODO: Handle case when setting breakpoint in thumb code
- BREAKPOINT_KIND = 4
- else:
- BREAKPOINT_KIND = 1
- self.reset_test_sequence()
- self.add_set_breakpoint_packets(
- function_address,
- do_continue=True,
- breakpoint_kind=BREAKPOINT_KIND)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Remove the breakpoint.
- self.reset_test_sequence()
- self.add_remove_breakpoint_packets(
- function_address, breakpoint_kind=BREAKPOINT_KIND)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- # Verify g_c1 and g_c2 match expected initial state.
- args = {}
- args["g_c1_address"] = g_c1_address
- args["g_c2_address"] = g_c2_address
- args["expected_g_c1"] = "0"
- args["expected_g_c2"] = "1"
-
- self.assertTrue(self.g_c1_c2_contents_are(args))
-
- # Verify we take only a small number of steps to hit the first state.
- # Might need to work through function entry prologue code.
- args["expected_g_c1"] = "1"
- args["expected_g_c2"] = "1"
- (state_reached,
- step_count) = self.count_single_steps_until_true(main_thread_id,
- self.g_c1_c2_contents_are,
- args,
- max_step_count=25,
- use_Hc_packet=use_Hc_packet,
- step_instruction=step_instruction)
- self.assertTrue(state_reached)
-
- # Verify we hit the next state.
- args["expected_g_c1"] = "1"
- args["expected_g_c2"] = "0"
- (state_reached,
- step_count) = self.count_single_steps_until_true(main_thread_id,
- self.g_c1_c2_contents_are,
- args,
- max_step_count=5,
- use_Hc_packet=use_Hc_packet,
- step_instruction=step_instruction)
- self.assertTrue(state_reached)
- expected_step_count = 1
- arch = self.getArchitecture()
-
- # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
- # of variable value
- if re.match("mips", arch):
- expected_step_count = 3
- # S390X requires "2" (LARL, MVI) machine instructions for updation of
- # variable value
- if re.match("s390x", arch):
- expected_step_count = 2
- self.assertEqual(step_count, expected_step_count)
-
- # Verify we hit the next state.
- args["expected_g_c1"] = "0"
- args["expected_g_c2"] = "0"
- (state_reached,
- step_count) = self.count_single_steps_until_true(main_thread_id,
- self.g_c1_c2_contents_are,
- args,
- max_step_count=5,
- use_Hc_packet=use_Hc_packet,
- step_instruction=step_instruction)
- self.assertTrue(state_reached)
- self.assertEqual(step_count, expected_step_count)
-
- # Verify we hit the next state.
- args["expected_g_c1"] = "0"
- args["expected_g_c2"] = "1"
- (state_reached,
- step_count) = self.count_single_steps_until_true(main_thread_id,
- self.g_c1_c2_contents_are,
- args,
- max_step_count=5,
- use_Hc_packet=use_Hc_packet,
- step_instruction=step_instruction)
- self.assertTrue(state_reached)
- self.assertEqual(step_count, expected_step_count)
-
- def maybe_strict_output_regex(self, regex):
- return '.*' + regex + \
- '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
-
- def install_and_create_launch_args(self):
- exe_path = self.getBuildArtifact("a.out")
- if not lldb.remote_platform:
- return [exe_path]
- remote_path = lldbutil.append_to_process_working_directory(self,
- os.path.basename(exe_path))
- remote_file_spec = lldb.SBFileSpec(remote_path, False)
- err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
- remote_file_spec)
- if err.Fail():
- raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
- (exe_path, remote_path, err))
- return [remote_path]
OpenPOWER on IntegriCloud