path: root/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
author     Kate Stone <katherine.stone@apple.com>  2016-09-06 20:57:50 +0000
committer  Kate Stone <katherine.stone@apple.com>  2016-09-06 20:57:50 +0000
commit     b9c1b51e45b845debb76d8658edabca70ca56079 (patch)
tree       dfcb5a13ef2b014202340f47036da383eaee74aa /lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
parent     d5aa73376966339caad04013510626ec2e42c760 (diff)
*** This commit represents a complete reformatting of the LLDB source code
*** to conform to clang-format’s LLVM style. This kind of mass change has
*** two obvious implications:

Firstly, merging this particular commit into a downstream fork may be a huge effort. Alternatively, it may be worth merging all changes up to this commit, performing the same reformatting operation locally, and then discarding the merge for this particular commit. The commands used to accomplish this reformatting were as follows (with the current working directory as the root of the repository):

    find . \( -iname "*.c" -or -iname "*.cpp" -or -iname "*.h" -or -iname "*.mm" \) -exec clang-format -i {} +
    find . -iname "*.py" -exec autopep8 --in-place --aggressive --aggressive {} + ;

The version of clang-format used was 3.9.0, and autopep8 was 1.2.4.

Secondly, “blame” style tools will generally point to this commit instead of a meaningful prior commit. There are alternatives available that will attempt to look through this change and find the appropriate prior commit. YMMV.

llvm-svn: 280751
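For a downstream fork kept in git (an assumption; the upstream history here is SVN-based), the two implications above suggest a concrete workflow: merge everything up to the parent commit listed above, rerun the same formatting pass locally with the stated tool versions, record this reformatting commit as merged without taking its diff, and tell blame tools to skip it. A minimal sketch, assuming a git mirror and using only standard git commands (--ignore-rev requires git 2.23 or later, long after this commit landed); the local commit message below is hypothetical:

    # Merge up to the parent of the reformatting commit, then reformat locally
    # with the same tools (clang-format 3.9.0, autopep8 1.2.4).
    git merge d5aa73376966339caad04013510626ec2e42c760
    find . \( -iname "*.c" -or -iname "*.cpp" -or -iname "*.h" -or -iname "*.mm" \) -exec clang-format -i {} +
    find . -iname "*.py" -exec autopep8 --in-place --aggressive --aggressive {} +
    git commit -am "Reformat to match upstream clang-format/autopep8 pass"

    # Record the reformatting commit as merged while keeping the local result,
    # i.e. "discarding the merge for this particular commit".
    git merge -s ours b9c1b51e45b845debb76d8658edabca70ca56079

    # Keep blame useful by skipping this revision.
    git blame --ignore-rev b9c1b51e45b845debb76d8658edabca70ca56079 \
        lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py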
Diffstat (limited to 'lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py')
-rw-r--r--  lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py  665
1 file changed, 446 insertions, 219 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
index 12c1033cba1..7fc25807435 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
@@ -5,7 +5,6 @@ Base class for gdb-remote test cases.
from __future__ import print_function
-
import errno
import os
import os.path
@@ -24,9 +23,11 @@ from lldbsuite.test.lldbtest import *
from lldbgdbserverutils import *
import logging
+
class _ConnectionRefused(IOError):
pass
+
class GdbRemoteTestCaseBase(TestBase):
NO_DEBUG_INFO_TESTCASE = True
@@ -35,29 +36,34 @@ class GdbRemoteTestCaseBase(TestBase):
_GDBREMOTE_KILL_PACKET = "$k#6b"
- # Start the inferior separately, attach to the inferior on the stub command line.
+ # Start the inferior separately, attach to the inferior on the stub
+ # command line.
_STARTUP_ATTACH = "attach"
- # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid).
+ # Start the inferior separately, start the stub without attaching, allow
+ # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
_STARTUP_ATTACH_MANUALLY = "attach_manually"
- # Start the stub, and launch the inferior with an $A packet via the initial packet stream.
+ # Start the stub, and launch the inferior with an $A packet via the
+ # initial packet stream.
_STARTUP_LAUNCH = "launch"
- # GDB Signal numbers that are not target-specific used for common exceptions
- TARGET_EXC_BAD_ACCESS = 0x91
+ # GDB Signal numbers that are not target-specific used for common
+ # exceptions
+ TARGET_EXC_BAD_ACCESS = 0x91
TARGET_EXC_BAD_INSTRUCTION = 0x92
- TARGET_EXC_ARITHMETIC = 0x93
- TARGET_EXC_EMULATION = 0x94
- TARGET_EXC_SOFTWARE = 0x95
- TARGET_EXC_BREAKPOINT = 0x96
+ TARGET_EXC_ARITHMETIC = 0x93
+ TARGET_EXC_EMULATION = 0x94
+ TARGET_EXC_SOFTWARE = 0x95
+ TARGET_EXC_BREAKPOINT = 0x96
_verbose_log_handler = None
- _log_formatter = logging.Formatter(fmt='%(asctime)-15s %(levelname)-8s %(message)s')
+ _log_formatter = logging.Formatter(
+ fmt='%(asctime)-15s %(levelname)-8s %(message)s')
def setUpBaseLogging(self):
self.logger = logging.getLogger(__name__)
if len(self.logger.handlers) > 0:
- return # We have set up this handler already
+ return # We have set up this handler already
self.logger.propagate = False
self.logger.setLevel(logging.DEBUG)
@@ -68,11 +74,11 @@ class GdbRemoteTestCaseBase(TestBase):
handler.setFormatter(self._log_formatter)
self.logger.addHandler(handler)
-
def isVerboseLoggingRequested(self):
# We will report our detailed logs if the user requested that the "gdb-remote" channel is
# logged.
- return any(("gdb-remote" in channel) for channel in lldbtest_config.channels)
+ return any(("gdb-remote" in channel)
+ for channel in lldbtest_config.channels)
def setUp(self):
TestBase.setUp(self)
@@ -83,7 +89,8 @@ class GdbRemoteTestCaseBase(TestBase):
if self.isVerboseLoggingRequested():
# If requested, full logs go to a log file
- self._verbose_log_handler = logging.FileHandler(self.log_basename + "-host.log")
+ self._verbose_log_handler = logging.FileHandler(
+ self.log_basename + "-host.log")
self._verbose_log_handler.setFormatter(self._log_formatter)
self._verbose_log_handler.setLevel(logging.DEBUG)
self.logger.addHandler(self._verbose_log_handler)
@@ -100,7 +107,8 @@ class GdbRemoteTestCaseBase(TestBase):
url_pattern = '(.+)://\[?(.+?)\]?/.*'
else:
url_pattern = '(.+)://(.+):\d+'
- scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups()
+ scheme, host = re.match(
+ url_pattern, configuration.lldb_platform_url).groups()
if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
self.stub_device = host
self.stub_hostname = 'localhost'
@@ -122,21 +130,24 @@ class GdbRemoteTestCaseBase(TestBase):
def setUpServerLogging(self, is_llgs):
if len(lldbtest_config.channels) == 0:
- return # No logging requested
+ return # No logging requested
if lldb.remote_platform:
- log_file = lldbutil.join_remote_paths(lldb.remote_platform.GetWorkingDirectory(), "server.log")
+ log_file = lldbutil.join_remote_paths(
+ lldb.remote_platform.GetWorkingDirectory(), "server.log")
else:
log_file = self.getLocalServerLogFile()
if is_llgs:
self.debug_monitor_extra_args.append("--log-file=" + log_file)
- self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels)))
+ self.debug_monitor_extra_args.append(
+ "--log-channels={}".format(":".join(lldbtest_config.channels)))
else:
- self.debug_monitor_extra_args = ["--log-file=" + log_file, "--log-flags=0x800000"]
+ self.debug_monitor_extra_args = [
+ "--log-file=" + log_file, "--log-flags=0x800000"]
def get_next_port(self):
- return 12000 + random.randint(0,3999)
+ return 12000 + random.randint(0, 3999)
def reset_test_sequence(self):
self.test_sequence = GdbRemoteTestSequence(self.logger)
@@ -149,7 +160,8 @@ class GdbRemoteTestCaseBase(TestBase):
# Create the named pipe.
os.mkfifo(named_pipe_path)
- # Open the read side of the pipe in non-blocking mode. This will return right away, ready or not.
+ # Open the read side of the pipe in non-blocking mode. This will
+ # return right away, ready or not.
named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
# Create the file for the named pipe. Note this will follow semantics of
@@ -177,7 +189,9 @@ class GdbRemoteTestCaseBase(TestBase):
try:
os.rmdir(temp_dir)
except:
- print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir)))
+ print(
+ "failed to delete temp dir: {}, directory contents: '{}'".format(
+ temp_dir, os.listdir(temp_dir)))
None
# Add the shutdown hook to clean up the named pipe.
@@ -190,14 +204,23 @@ class GdbRemoteTestCaseBase(TestBase):
def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
# Wait for something to read with a max timeout.
- (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds)
- self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.")
- self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.")
+ (ready_readers, _, _) = select.select(
+ [self.named_pipe_fd], [], [], read_timeout_seconds)
+ self.assertIsNotNone(
+ ready_readers,
+ "write side of pipe has not written anything - stub isn't writing to pipe.")
+ self.assertNotEqual(
+ len(ready_readers),
+ 0,
+ "write side of pipe has not written anything - stub isn't writing to pipe.")
# Read the port from the named pipe.
stub_port_raw = self.named_pipe.read()
self.assertIsNotNone(stub_port_raw)
- self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe")
+ self.assertNotEqual(
+ len(stub_port_raw),
+ 0,
+ "no content to read on pipe")
# Trim null byte, convert to int.
stub_port_raw = stub_port_raw[:-1]
@@ -212,15 +235,24 @@ class GdbRemoteTestCaseBase(TestBase):
use_named_pipe = False
# Grab the ppid from /proc/[shell pid]/stat
- err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat")
- self.assertTrue(err.Success() and retcode == 0,
- "Failed to read file /proc/$$/stat: %s, retcode: %d" % (err.GetCString(), retcode))
+ err, retcode, shell_stat = self.run_platform_command(
+ "cat /proc/$$/stat")
+ self.assertTrue(
+ err.Success() and retcode == 0,
+ "Failed to read file /proc/$$/stat: %s, retcode: %d" %
+ (err.GetCString(),
+ retcode))
# [pid] ([executable]) [state] [*ppid*]
pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
- err, retcode, ls_output = self.run_platform_command("ls -l /proc/%s/exe" % pid)
- self.assertTrue(err.Success() and retcode == 0,
- "Failed to read file /proc/%s/exe: %s, retcode: %d" % (pid, err.GetCString(), retcode))
+ err, retcode, ls_output = self.run_platform_command(
+ "ls -l /proc/%s/exe" % pid)
+ self.assertTrue(
+ err.Success() and retcode == 0,
+ "Failed to read file /proc/%s/exe: %s, retcode: %d" %
+ (pid,
+ err.GetCString(),
+ retcode))
exe = ls_output.split()[-1]
# If the binary has been deleted, the link name has " (deleted)" appended.
@@ -235,7 +267,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.setUpServerLogging(is_llgs=True)
if use_named_pipe:
- (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
+ (self.named_pipe_path, self.named_pipe,
+ self.named_pipe_fd) = self.create_named_pipe()
def init_debugserver_test(self, use_named_pipe=True):
self.debug_monitor_exe = get_debugserver_exe()
@@ -243,17 +276,19 @@ class GdbRemoteTestCaseBase(TestBase):
self.skipTest("debugserver exe not found")
self.setUpServerLogging(is_llgs=False)
if use_named_pipe:
- (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
+ (self.named_pipe_path, self.named_pipe,
+ self.named_pipe_fd) = self.create_named_pipe()
# The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
# when the process truly dies.
self.stub_sends_two_stop_notifications_on_kill = True
def forward_adb_port(self, source, target, direction, device):
- adb = [ 'adb' ] + ([ '-s', device ] if device else []) + [ direction ]
+ adb = ['adb'] + (['-s', device] if device else []) + [direction]
+
def remove_port_forward():
- subprocess.call(adb + [ "--remove", "tcp:%d" % source])
+ subprocess.call(adb + ["--remove", "tcp:%d" % source])
- subprocess.call(adb + [ "tcp:%d" % source, "tcp:%d" % target])
+ subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
self.addTearDownHook(remove_port_forward)
def _verify_socket(self, sock):
@@ -265,12 +300,12 @@ class GdbRemoteTestCaseBase(TestBase):
# connection again.
triple = self.dbg.GetSelectedPlatform().GetTriple()
if not re.match(".*-.*-.*-android", triple):
- return # Not android.
+ return # Not android.
can_read, _, _ = select.select([sock], [], [], 0.1)
if sock not in can_read:
- return # Data is not available, but the connection is alive.
+ return # Data is not available, but the connection is alive.
if len(sock.recv(1, socket.MSG_PEEK)) == 0:
- raise _ConnectionRefused() # Got EOF, connection dropped.
+ raise _ConnectionRefused() # Got EOF, connection dropped.
def create_socket(self):
sock = socket.socket()
@@ -278,9 +313,16 @@ class GdbRemoteTestCaseBase(TestBase):
triple = self.dbg.GetSelectedPlatform().GetTriple()
if re.match(".*-.*-.*-android", triple):
- self.forward_adb_port(self.port, self.port, "forward", self.stub_device)
-
- logger.info("Connecting to debug monitor on %s:%d", self.stub_hostname, self.port)
+ self.forward_adb_port(
+ self.port,
+ self.port,
+ "forward",
+ self.stub_device)
+
+ logger.info(
+ "Connecting to debug monitor on %s:%d",
+ self.stub_hostname,
+ self.port)
connect_info = (self.stub_hostname, self.port)
try:
sock.connect(connect_info)
@@ -295,12 +337,16 @@ class GdbRemoteTestCaseBase(TestBase):
# send the kill packet so lldb-server shuts down gracefully
sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
except:
- logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to send kill packet to debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
try:
sock.close()
except:
- logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to close socket to debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_socket)
@@ -319,9 +365,11 @@ class GdbRemoteTestCaseBase(TestBase):
def get_debug_monitor_command_line_args(self, attach_pid=None):
if lldb.remote_platform:
- commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)]
+ commandline_args = self.debug_monitor_extra_args + \
+ ["*:{}".format(self.port)]
else:
- commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)]
+ commandline_args = self.debug_monitor_extra_args + \
+ ["localhost:{}".format(self.port)]
if attach_pid:
commandline_args += ["--attach=%d" % attach_pid]
@@ -331,14 +379,19 @@ class GdbRemoteTestCaseBase(TestBase):
def launch_debug_monitor(self, attach_pid=None, logfile=None):
# Create the command line.
- commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid)
+ commandline_args = self.get_debug_monitor_command_line_args(
+ attach_pid=attach_pid)
# Start the server.
- server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False)
+ server = self.spawnSubprocess(
+ self.debug_monitor_exe,
+ commandline_args,
+ install_remote=False)
self.addTearDownHook(self.cleanupSubprocesses)
self.assertIsNotNone(server)
- # If we're receiving the stub's listening port from the named pipe, do that here.
+ # If we're receiving the stub's listening port from the named pipe, do
+ # that here.
if self.named_pipe:
self.port = self.get_stub_port_from_named_socket()
@@ -354,7 +407,9 @@ class GdbRemoteTestCaseBase(TestBase):
try:
server.terminate()
except:
- logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to terminate server for debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_debug_monitor)
# Schedule debug monitor to be shut down during teardown.
@@ -374,11 +429,14 @@ class GdbRemoteTestCaseBase(TestBase):
# Schedule debug monitor to be shut down during teardown.
logger = self.logger
+
def shutdown_debug_monitor():
try:
server.terminate()
except:
- logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to terminate server for debug monitor: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_debug_monitor)
connect_attemps = 0
@@ -387,7 +445,7 @@ class GdbRemoteTestCaseBase(TestBase):
while connect_attemps < MAX_CONNECT_ATTEMPTS:
# Create a socket to talk to the server
try:
- logger.info("Connect attempt %d", connect_attemps+1)
+ logger.info("Connect attempt %d", connect_attemps + 1)
self.sock = self.create_socket()
return server
except _ConnectionRefused as serr:
@@ -400,18 +458,27 @@ class GdbRemoteTestCaseBase(TestBase):
server.terminate()
# Increment attempts.
- print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS))
+ print(
+ "connect to debug monitor on port %d failed, attempt #%d of %d" %
+ (self.port, attempts + 1, MAX_ATTEMPTS))
attempts += 1
- # And wait a random length of time before next attempt, to avoid collisions.
- time.sleep(random.randint(1,5))
-
+ # And wait a random length of time before next attempt, to avoid
+ # collisions.
+ time.sleep(random.randint(1, 5))
+
# Now grab a new port number.
self.port = self.get_next_port()
- raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts)
+ raise Exception(
+ "failed to create a socket to the launched debug monitor after %d tries" %
+ attempts)
- def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None):
+ def launch_process_for_attach(
+ self,
+ inferior_args=None,
+ sleep_seconds=3,
+ exe_path=None):
# We're going to start a child process that the debug monitor stub can later attach to.
# This process needs to be started so that it just hangs around for a while. We'll
# have it sleep.
@@ -425,15 +492,22 @@ class GdbRemoteTestCaseBase(TestBase):
args.append("sleep:%d" % sleep_seconds)
inferior = self.spawnSubprocess(exe_path, args)
+
def shutdown_process_for_attach():
try:
inferior.terminate()
except:
- logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0]))
+ logger.warning(
+ "failed to terminate inferior process for attach: {}; ignoring".format(
+ sys.exc_info()[0]))
self.addTearDownHook(shutdown_process_for_attach)
return inferior
- def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None):
+ def prep_debug_monitor_and_inferior(
+ self,
+ inferior_args=None,
+ inferior_sleep_seconds=3,
+ inferior_exe_path=None):
"""Prep the debug monitor, the inferior, and the expected packet stream.
Handle the separate cases of using the debug monitor in attach-to-inferior mode
@@ -458,11 +532,15 @@ class GdbRemoteTestCaseBase(TestBase):
if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
# Launch the process that we'll use as the inferior.
- inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path)
+ inferior = self.launch_process_for_attach(
+ inferior_args=inferior_args,
+ sleep_seconds=inferior_sleep_seconds,
+ exe_path=inferior_exe_path)
self.assertIsNotNone(inferior)
self.assertTrue(inferior.pid > 0)
if self._inferior_startup == self._STARTUP_ATTACH:
- # In this case, we want the stub to attach via the command line, so set the command line attach pid here.
+ # In this case, we want the stub to attach via the command
+ # line, so set the command line attach pid here.
attach_pid = inferior.pid
if self._inferior_startup == self._STARTUP_LAUNCH:
@@ -471,11 +549,15 @@ class GdbRemoteTestCaseBase(TestBase):
inferior_exe_path = os.path.abspath("a.out")
if lldb.remote_platform:
- remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path))
+ remote_path = lldbutil.append_to_process_working_directory(
+ os.path.basename(inferior_exe_path))
remote_file_spec = lldb.SBFileSpec(remote_path, False)
- err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec)
+ err = lldb.remote_platform.Install(lldb.SBFileSpec(
+ inferior_exe_path, True), remote_file_spec)
if err.Fail():
- raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err))
+ raise Exception(
+ "remote_platform.Install('%s', '%s') failed: %s" %
+ (inferior_exe_path, remote_path, err))
inferior_exe_path = remote_path
launch_args = [inferior_exe_path]
@@ -491,13 +573,18 @@ class GdbRemoteTestCaseBase(TestBase):
if self._inferior_startup == self._STARTUP_LAUNCH:
self.add_verified_launch_packets(launch_args)
- return {"inferior":inferior, "server":server}
+ return {"inferior": inferior, "server": server}
- def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds):
+ def expect_socket_recv(
+ self,
+ sock,
+ expected_content_regex,
+ timeout_seconds):
response = ""
timeout_time = time.time() + timeout_seconds
- while not expected_content_regex.match(response) and time.time() < timeout_time:
+ while not expected_content_regex.match(
+ response) and time.time() < timeout_time:
can_read, _, _ = select.select([sock], [], [], timeout_seconds)
if can_read and sock in can_read:
recv_bytes = sock.recv(4096)
@@ -514,7 +601,8 @@ class GdbRemoteTestCaseBase(TestBase):
_, can_write, _ = select.select([], [sock], [], timeout_seconds)
if can_write and sock in can_write:
written_byte_count = sock.send(request_bytes_remaining)
- request_bytes_remaining = request_bytes_remaining[written_byte_count:]
+ request_bytes_remaining = request_bytes_remaining[
+ written_byte_count:]
self.assertEqual(len(request_bytes_remaining), 0)
def do_handshake(self, stub_socket, timeout_seconds=5):
@@ -527,7 +615,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
# Receive the ack and "OK"
- self.expect_socket_recv(stub_socket, re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
+ self.expect_socket_recv(stub_socket, re.compile(
+ r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
# Send the final ack.
self.expect_socket_send(stub_socket, "+", timeout_seconds)
@@ -553,12 +642,12 @@ class GdbRemoteTestCaseBase(TestBase):
self.test_sequence.add_log_lines(
["read packet: $QThreadSuffixSupported#e4",
"send packet: $OK#00",
- ], True)
+ ], True)
def add_process_info_collection_packets(self):
self.test_sequence.add_log_lines(
["read packet: $qProcessInfo#dc",
- { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }],
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
True)
_KNOWN_PROCESS_INFO_KEYS = [
@@ -575,7 +664,7 @@ class GdbRemoteTestCaseBase(TestBase):
"vendor",
"endian",
"ptrsize"
- ]
+ ]
def parse_process_info_response(self, context):
# Ensure we have a process info response.
@@ -584,7 +673,9 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(process_info_raw)
# Pull out key:value; pairs.
- process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
+ process_info_dict = {
+ match.group(1): match.group(2) for match in re.finditer(
+ r"([^:]+):([^;]+);", process_info_raw)}
# Validate keys are known.
for (key, val) in list(process_info_dict.items()):
@@ -595,9 +686,9 @@ class GdbRemoteTestCaseBase(TestBase):
def add_register_info_collection_packets(self):
self.test_sequence.add_log_lines(
- [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
- "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
- "save_key":"reg_info_responses" } ],
+ [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
+ "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
+ "save_key": "reg_info_responses"}],
True)
def parse_register_info_packets(self, context):
@@ -606,13 +697,19 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(reg_info_responses)
# Parse register infos.
- return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
+ return [parse_reg_info_response(reg_info_response)
+ for reg_info_response in reg_info_responses]
def expect_gdbremote_sequence(self, timeout_seconds=None):
if not timeout_seconds:
timeout_seconds = self._TIMEOUT_SECONDS
- return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence,
- self._pump_queues, timeout_seconds, self.logger)
+ return expect_lldb_gdbserver_replay(
+ self,
+ self.sock,
+ self.test_sequence,
+ self._pump_queues,
+ timeout_seconds,
+ self.logger)
_KNOWN_REGINFO_KEYS = [
"name",
@@ -667,7 +764,7 @@ class GdbRemoteTestCaseBase(TestBase):
def add_query_memory_region_packets(self, address):
self.test_sequence.add_log_lines(
["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
True)
def parse_key_val_dict(self, key_val_text, allow_dupes=True):
@@ -678,13 +775,15 @@ class GdbRemoteTestCaseBase(TestBase):
val = match.group(2)
if key in kv_dict:
if allow_dupes:
- if type(kv_dict[key]) == list:
+ if isinstance(kv_dict[key], list):
kv_dict[key].append(val)
else:
# Promote to list
kv_dict[key] = [kv_dict[key], val]
else:
- self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict))
+ self.fail(
+ "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
+ key, val, key_val_text, kv_dict))
else:
kv_dict[key] = val
return kv_dict
@@ -694,17 +793,25 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(context.get("memory_region_response"))
# Pull out key:value; pairs.
- mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
+ mem_region_dict = self.parse_key_val_dict(
+ context.get("memory_region_response"))
# Validate keys are known.
for (key, val) in list(mem_region_dict.items()):
- self.assertTrue(key in ["start", "size", "permissions", "name", "error"])
+ self.assertTrue(
+ key in [
+ "start",
+ "size",
+ "permissions",
+ "name",
+ "error"])
self.assertIsNotNone(val)
# Return the dictionary of key-value pairs for the memory region.
return mem_region_dict
- def assert_address_within_memory_region(self, test_address, mem_region_dict):
+ def assert_address_within_memory_region(
+ self, test_address, mem_region_dict):
self.assertIsNotNone(mem_region_dict)
self.assertTrue("start" in mem_region_dict)
self.assertTrue("size" in mem_region_dict)
@@ -714,15 +821,25 @@ class GdbRemoteTestCaseBase(TestBase):
range_end = range_start + range_size
if test_address < range_start:
- self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+ self.fail(
+ "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
+ test_address,
+ range_start,
+ range_end,
+ range_size))
elif test_address >= range_end:
- self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+ self.fail(
+ "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
+ test_address,
+ range_start,
+ range_end,
+ range_size))
def add_threadinfo_collection_packets(self):
self.test_sequence.add_log_lines(
- [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
- "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
- "save_key":"threadinfo_responses" } ],
+ [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
+ "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
+ "save_key": "threadinfo_responses"}],
True)
def parse_threadinfo_packets(self, context):
@@ -760,35 +877,44 @@ class GdbRemoteTestCaseBase(TestBase):
return threads
- def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
+ def add_set_breakpoint_packets(
+ self,
+ address,
+ do_continue=True,
+ breakpoint_kind=1):
self.test_sequence.add_log_lines(
- [# Set the breakpoint.
- "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
- # Verify the stub could set it.
- "send packet: $OK#00",
- ], True)
+ [ # Set the breakpoint.
+ "read packet: $Z0,{0:x},{1}#00".format(
+ address, breakpoint_kind),
+ # Verify the stub could set it.
+ "send packet: $OK#00",
+ ], True)
if (do_continue):
self.test_sequence.add_log_lines(
- [# Continue the inferior.
- "read packet: $c#63",
- # Expect a breakpoint stop report.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
- ], True)
+ [ # Continue the inferior.
+ "read packet: $c#63",
+ # Expect a breakpoint stop report.
+ {"direction": "send",
+ "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
+ "capture": {1: "stop_signo",
+ 2: "stop_thread_id"}},
+ ], True)
def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
self.test_sequence.add_log_lines(
- [# Remove the breakpoint.
- "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
- # Verify the stub could unset it.
- "send packet: $OK#00",
+ [ # Remove the breakpoint.
+ "read packet: $z0,{0:x},{1}#00".format(
+ address, breakpoint_kind),
+ # Verify the stub could unset it.
+ "send packet: $OK#00",
], True)
def add_qSupported_packets(self):
self.test_sequence.add_log_lines(
["read packet: $qSupported#00",
- {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
- ], True)
+ {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
+ ], True)
_KNOWN_QSUPPORTED_STUB_FEATURES = [
"augmented-libraries-svr4-read",
@@ -821,23 +947,27 @@ class GdbRemoteTestCaseBase(TestBase):
supported_dict[key] = val
else:
if len(key) < 2:
- raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
+ raise Exception(
+ "singular stub feature is too short: must be stub_feature{+,-,?}")
supported_type = key[-1]
key = key[:-1]
if not supported_type in ["+", "-", "?"]:
- raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
- supported_dict[key] = supported_type
+ raise Exception(
+ "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
+ supported_dict[key] = supported_type
# Ensure we know the supported element
- if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
- raise Exception("unknown qSupported stub feature reported: %s" % key)
+ if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
+ raise Exception(
+ "unknown qSupported stub feature reported: %s" %
+ key)
return supported_dict
def run_process_then_stop(self, run_seconds=1):
# Tell the stub to continue.
self.test_sequence.add_log_lines(
- ["read packet: $vCont;c#a8"],
- True)
+ ["read packet: $vCont;c#a8"],
+ True)
context = self.expect_gdbremote_sequence()
# Wait for run_seconds.
@@ -847,7 +977,7 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
["read packet: {}".format(chr(3)),
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
+ {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -857,18 +987,21 @@ class GdbRemoteTestCaseBase(TestBase):
def select_modifiable_register(self, reg_infos):
"""Find a register that can be read/written freely."""
- PREFERRED_REGISTER_NAMES = set(["rax",])
+ PREFERRED_REGISTER_NAMES = set(["rax", ])
- # First check for the first register from the preferred register name set.
+ # First check for the first register from the preferred register name
+ # set.
alternative_register_index = None
self.assertIsNotNone(reg_infos)
for reg_info in reg_infos:
- if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
+ if ("name" in reg_info) and (
+ reg_info["name"] in PREFERRED_REGISTER_NAMES):
# We found a preferred register. Use it.
return reg_info["lldb_register_index"]
if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
- # A frame pointer register will do as a register to modify temporarily.
+ # A frame pointer register will do as a register to modify
+ # temporarily.
alternative_register_index = reg_info["lldb_register_index"]
# We didn't find a preferred register. Return whatever alternative register
@@ -901,7 +1034,8 @@ class GdbRemoteTestCaseBase(TestBase):
def find_generic_register_with_name(self, reg_infos, generic_name):
self.assertIsNotNone(reg_infos)
for reg_info in reg_infos:
- if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
+ if ("generic" in reg_info) and (
+ reg_info["generic"] == generic_name):
return reg_info
return None
@@ -912,13 +1046,13 @@ class GdbRemoteTestCaseBase(TestBase):
if encoded_bytes[i] == "}":
# Handle escaped char.
self.assertTrue(i + 1 < len(encoded_bytes))
- decoded_bytes += chr(ord(encoded_bytes[i+1]) ^ 0x20)
- i +=2
+ decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
+ i += 2
elif encoded_bytes[i] == "*":
# Handle run length encoding.
self.assertTrue(len(decoded_bytes) > 0)
self.assertTrue(i + 1 < len(encoded_bytes))
- repeat_count = ord(encoded_bytes[i+1]) - 29
+ repeat_count = ord(encoded_bytes[i + 1]) - 29
decoded_bytes += decoded_bytes[-1] * repeat_count
i += 2
else:
@@ -955,7 +1089,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertFalse(key in auxv_dict)
auxv_dict[key] = value
- self.fail("should not reach here - implies required double zero entry not found")
+ self.fail(
+ "should not reach here - implies required double zero entry not found")
return auxv_dict
def read_binary_data_in_chunks(self, command_prefix, chunk_length):
@@ -967,10 +1102,21 @@ class GdbRemoteTestCaseBase(TestBase):
while not done:
# Grab the next iteration of data.
self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length),
- {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} }
- ], True)
+ self.test_sequence.add_log_lines(
+ [
+ "read packet: ${}{:x},{:x}:#00".format(
+ command_prefix,
+ offset,
+ chunk_length),
+ {
+ "direction": "send",
+ "regex": re.compile(
+ r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
+ re.MULTILINE | re.DOTALL),
+ "capture": {
+ 1: "response_type",
+ 2: "content_raw"}}],
+ True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -997,25 +1143,32 @@ class GdbRemoteTestCaseBase(TestBase):
# Send the intterupt.
"read packet: {}".format(chr(3)),
# And wait for the stop notification.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } },
- ], True)
+ {"direction": "send",
+ "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
+ "capture": {1: "stop_signo",
+ 2: "stop_key_val_text"}},
+ ], True)
def parse_interrupt_packets(self, context):
self.assertIsNotNone(context.get("stop_signo"))
self.assertIsNotNone(context.get("stop_key_val_text"))
- return (int(context["stop_signo"], 16), self.parse_key_val_dict(context["stop_key_val_text"]))
+ return (int(context["stop_signo"], 16), self.parse_key_val_dict(
+ context["stop_key_val_text"]))
def add_QSaveRegisterState_packets(self, thread_id):
if thread_id:
# Use the thread suffix form.
- request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id)
+ request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
+ thread_id)
else:
request = "read packet: $QSaveRegisterState#00"
-
- self.test_sequence.add_log_lines([
- request,
- {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } },
- ], True)
+
+ self.test_sequence.add_log_lines([request,
+ {"direction": "send",
+ "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
+ "capture": {1: "save_response"}},
+ ],
+ True)
def parse_QSaveRegisterState_response(self, context):
self.assertIsNotNone(context)
@@ -1032,16 +1185,19 @@ class GdbRemoteTestCaseBase(TestBase):
def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
if thread_id:
# Use the thread suffix form.
- request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id)
+ request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
+ save_id, thread_id)
else:
- request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
+ request = "read packet: $QRestoreRegisterState:{}#00".format(
+ save_id)
self.test_sequence.add_log_lines([
request,
"send packet: $OK#00"
- ], True)
+ ], True)
- def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
+ def flip_all_bits_in_each_register_value(
+ self, reg_infos, endian, thread_id=None):
self.assertIsNotNone(reg_infos)
successful_writes = 0
@@ -1049,16 +1205,18 @@ class GdbRemoteTestCaseBase(TestBase):
for reg_info in reg_infos:
# Use the lldb register index added to the reg info. We're not necessarily
- # working off a full set of register infos, so an inferred register index could be wrong.
+ # working off a full set of register infos, so an inferred register
+ # index could be wrong.
reg_index = reg_info["lldb_register_index"]
self.assertIsNotNone(reg_index)
- reg_byte_size = int(reg_info["bitsize"])/8
+ reg_byte_size = int(reg_info["bitsize"]) / 8
self.assertTrue(reg_byte_size > 0)
# Handle thread suffix.
if thread_id:
- p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
+ p_request = "read packet: $p{:x};thread:{:x}#00".format(
+ reg_index, thread_id)
else:
p_request = "read packet: $p{:x}#00".format(reg_index)
@@ -1066,15 +1224,16 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines([
p_request,
- { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
- ], True)
+ {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Verify the response length.
p_response = context.get("p_response")
self.assertIsNotNone(p_response)
- initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
+ initial_reg_value = unpack_register_hex_unsigned(
+ endian, p_response)
# Flip the value by xoring with all 1s
all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
@@ -1083,16 +1242,22 @@ class GdbRemoteTestCaseBase(TestBase):
# Handle thread suffix for P.
if thread_id:
- P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
+ P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
+ reg_index, pack_register_hex(
+ endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
else:
- P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size))
+ P_request = "read packet: $P{:x}={}#00".format(
+ reg_index, pack_register_hex(
+ endian, flipped_bits_int, byte_size=reg_byte_size))
# Write the flipped value to the register.
self.reset_test_sequence()
- self.test_sequence.add_log_lines([
- P_request,
- { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
- ], True)
+ self.test_sequence.add_log_lines([P_request,
+ {"direction": "send",
+ "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
+ "capture": {1: "P_response"}},
+ ],
+ True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -1107,25 +1272,27 @@ class GdbRemoteTestCaseBase(TestBase):
failed_writes += 1
# print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
- # Read back the register value, ensure it matches the flipped value.
+ # Read back the register value, ensure it matches the flipped
+ # value.
if P_response == "OK":
self.reset_test_sequence()
self.test_sequence.add_log_lines([
p_request,
- { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
- ], True)
+ {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
verify_p_response_raw = context.get("p_response")
self.assertIsNotNone(verify_p_response_raw)
- verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
+ verify_bits = unpack_register_hex_unsigned(
+ endian, verify_p_response_raw)
if verify_bits != flipped_bits_int:
# Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
# print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
successful_writes -= 1
- failed_writes +=1
+ failed_writes += 1
return (successful_writes, failed_writes)
@@ -1136,7 +1303,8 @@ class GdbRemoteTestCaseBase(TestBase):
return False
if reg_info["set"] != "General Purpose Registers":
return False
- if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
+ if ("container-regs" in reg_info) and (
+ len(reg_info["container-regs"]) > 0):
# Don't try to bit flip registers contained in another register.
return False
if re.match("^.s$", reg_info["name"]):
@@ -1154,13 +1322,15 @@ class GdbRemoteTestCaseBase(TestBase):
values = {}
for reg_info in reg_infos:
- # We append a register index when load reg infos so we can work with subsets.
+ # We append a register index when load reg infos so we can work
+ # with subsets.
reg_index = reg_info.get("lldb_register_index")
self.assertIsNotNone(reg_index)
# Handle thread suffix.
if thread_id:
- p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
+ p_request = "read packet: $p{:x};thread:{:x}#00".format(
+ reg_index, thread_id)
else:
p_request = "read packet: $p{:x}#00".format(reg_index)
@@ -1168,8 +1338,8 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines([
p_request,
- { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
- ], True)
+ {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -1178,58 +1348,75 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(p_response)
self.assertTrue(len(p_response) > 0)
self.assertFalse(p_response[0] == "E")
-
- values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
-
+
+ values[reg_index] = unpack_register_hex_unsigned(
+ endian, p_response)
+
return values
def add_vCont_query_packets(self):
- self.test_sequence.add_log_lines([
- "read packet: $vCont?#49",
- {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } },
- ], True)
+ self.test_sequence.add_log_lines(["read packet: $vCont?#49",
+ {"direction": "send",
+ "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
+ "capture": {2: "vCont_query_response"}},
+ ],
+ True)
def parse_vCont_query_response(self, context):
self.assertIsNotNone(context)
vCont_query_response = context.get("vCont_query_response")
- # Handle case of no vCont support at all - in which case the capture group will be none or zero length.
+ # Handle case of no vCont support at all - in which case the capture
+ # group will be none or zero length.
if not vCont_query_response or len(vCont_query_response) == 0:
return {}
- return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0}
-
- def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"):
+ return {key: 1 for key in vCont_query_response.split(
+ ";") if key and len(key) > 0}
+
+ def count_single_steps_until_true(
+ self,
+ thread_id,
+ predicate,
+ args,
+ max_step_count=100,
+ use_Hc_packet=True,
+ step_instruction="s"):
"""Used by single step test that appears in a few different contexts."""
single_step_count = 0
while single_step_count < max_step_count:
self.assertIsNotNone(thread_id)
- # Build the packet for the single step instruction. We replace {thread}, if present, with the thread_id.
- step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
+ # Build the packet for the single step instruction. We replace
+ # {thread}, if present, with the thread_id.
+ step_packet = "read packet: ${}#00".format(
+ re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
# print("\nstep_packet created: {}\n".format(step_packet))
# Single step.
self.reset_test_sequence()
if use_Hc_packet:
self.test_sequence.add_log_lines(
- [# Set the continue thread.
- "read packet: $Hc{0:x}#00".format(thread_id),
- "send packet: $OK#00",
- ], True)
+ [ # Set the continue thread.
+ "read packet: $Hc{0:x}#00".format(thread_id),
+ "send packet: $OK#00",
+ ], True)
self.test_sequence.add_log_lines([
- # Single step.
- step_packet,
- # "read packet: $vCont;s:{0:x}#00".format(thread_id),
- # Expect a breakpoint stop report.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
- ], True)
+ # Single step.
+ step_packet,
+ # "read packet: $vCont;s:{0:x}#00".format(thread_id),
+ # Expect a breakpoint stop report.
+ {"direction": "send",
+ "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
+ "capture": {1: "stop_signo",
+ 2: "stop_thread_id"}},
+ ], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
self.assertIsNotNone(context.get("stop_signo"))
self.assertEqual(int(context.get("stop_signo"), 16),
- lldbutil.get_signal_number('SIGTRAP'))
+ lldbutil.get_signal_number('SIGTRAP'))
single_step_count += 1
@@ -1251,9 +1438,9 @@ class GdbRemoteTestCaseBase(TestBase):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} },
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
"read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }],
+ {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
True)
# Run the packet stream.
@@ -1264,26 +1451,34 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertIsNotNone(context.get("g_c1_contents"))
self.assertIsNotNone(context.get("g_c2_contents"))
- return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2)
+ return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (
+ context.get("g_c2_contents").decode("hex") == expected_g_c2)
- def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"):
+ def single_step_only_steps_one_instruction(
+ self, use_Hc_packet=True, step_instruction="s"):
"""Used by single step test that appears in a few different contexts."""
# Start up the inferior.
procs = self.prep_debug_monitor_and_inferior(
- inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"])
+ inferior_args=[
+ "get-code-address-hex:swap_chars",
+ "get-data-address-hex:g_c1",
+ "get-data-address-hex:g_c2",
+ "sleep:1",
+ "call-function:swap_chars",
+ "sleep:5"])
# Run the process
self.test_sequence.add_log_lines(
- [# Start running after initial stop.
- "read packet: $c#63",
- # Match output line that prints the memory address of the function call entry point.
- # Note we require launch-only testing so we can get inferior otuput.
- { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
- "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} },
- # Now stop the inferior.
- "read packet: {}".format(chr(3)),
- # And wait for the stop notification.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
+ [ # Start running after initial stop.
+ "read packet: $c#63",
+ # Match output line that prints the memory address of the function call entry point.
+ # Note we require launch-only testing so we can get inferior otuput.
+ {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
+ "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
+ # Now stop the inferior.
+ "read packet: {}".format(chr(3)),
+ # And wait for the stop notification.
+ {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
True)
# Run the packet stream.
@@ -1312,13 +1507,17 @@ class GdbRemoteTestCaseBase(TestBase):
else:
BREAKPOINT_KIND = 1
self.reset_test_sequence()
- self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND)
+ self.add_set_breakpoint_packets(
+ function_address,
+ do_continue=True,
+ breakpoint_kind=BREAKPOINT_KIND)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Remove the breakpoint.
self.reset_test_sequence()
- self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND)
+ self.add_remove_breakpoint_packets(
+ function_address, breakpoint_kind=BREAKPOINT_KIND)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@@ -1331,44 +1530,72 @@ class GdbRemoteTestCaseBase(TestBase):
self.assertTrue(self.g_c1_c2_contents_are(args))
- # Verify we take only a small number of steps to hit the first state. Might need to work through function entry prologue code.
+ # Verify we take only a small number of steps to hit the first state.
+ # Might need to work through function entry prologue code.
args["expected_g_c1"] = "1"
args["expected_g_c2"] = "1"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=25,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
# Verify we hit the next state.
args["expected_g_c1"] = "1"
args["expected_g_c2"] = "0"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=5,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
expected_step_count = 1
arch = self.getArchitecture()
- #MIPS required "3" (ADDIU, SB, LD) machine instructions for updation of variable value
- if re.match("mips",arch):
- expected_step_count = 3
- #S390X requires "2" (LARL, MVI) machine instructions for updation of variable value
- if re.match("s390x",arch):
- expected_step_count = 2
+ # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
+ # of variable value
+ if re.match("mips", arch):
+ expected_step_count = 3
+ # S390X requires "2" (LARL, MVI) machine instructions for updation of
+ # variable value
+ if re.match("s390x", arch):
+ expected_step_count = 2
self.assertEqual(step_count, expected_step_count)
# Verify we hit the next state.
args["expected_g_c1"] = "0"
args["expected_g_c2"] = "0"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=5,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
self.assertEqual(step_count, expected_step_count)
# Verify we hit the next state.
args["expected_g_c1"] = "0"
args["expected_g_c2"] = "1"
- (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
+ (state_reached,
+ step_count) = self.count_single_steps_until_true(main_thread_id,
+ self.g_c1_c2_contents_are,
+ args,
+ max_step_count=5,
+ use_Hc_packet=use_Hc_packet,
+ step_instruction=step_instruction)
self.assertTrue(state_reached)
self.assertEqual(step_count, expected_step_count)
def maybe_strict_output_regex(self, regex):
- return '.*'+regex+'.*' if lldbplatformutil.hasChattyStderr(self) else '^'+regex+'$'
+ return '.*' + regex + \
+ '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
def install_and_create_launch_args(self):
exe_path = os.path.abspath('a.out')