summaryrefslogtreecommitdiffstats
path: root/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
diff options
context:
space:
mode:
authorRaphael Isemann <teemperor@gmail.com>2019-07-19 15:55:23 +0000
committerRaphael Isemann <teemperor@gmail.com>2019-07-19 15:55:23 +0000
commitb45853f173139c7c3078b97f53e7a6eba6148c13 (patch)
tree3b24eec01a7b23edd4364911d9bf6490ce2c1422 /lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
parent005423018182120f3ae2a54ff5fd3390c96fb527 (diff)
downloadbcm5719-llvm-b45853f173139c7c3078b97f53e7a6eba6148c13.tar.gz
bcm5719-llvm-b45853f173139c7c3078b97f53e7a6eba6148c13.zip
[lldb][NFC] Cleanup mentions and code related to lldb-mi
Summary: lldb-mi has been removed, but there are still a bunch of references in the code base. This patch removes all of them. Reviewers: JDevlieghere, jfb Reviewed By: JDevlieghere Subscribers: dexonsmith, ki.stfu, mgorny, abidh, jfb, lldb-commits Tags: #lldb Differential Revision: https://reviews.llvm.org/D64992 llvm-svn: 366590
Diffstat (limited to 'lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py')
-rw-r--r--lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py1102
1 files changed, 0 insertions, 1102 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
deleted file mode 100644
index 74ee4a6c44c..00000000000
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py
+++ /dev/null
@@ -1,1102 +0,0 @@
-#!/usr/bin/env python
-
-import binascii
-import json
-import optparse
-import os
-import pprint
-import socket
-import string
-import subprocess
-import sys
-import threading
-
-
-def dump_memory(base_addr, data, num_per_line, outfile):
-
- data_len = len(data)
- hex_string = binascii.hexlify(data)
- addr = base_addr
- ascii_str = ''
- i = 0
- while i < data_len:
- outfile.write('0x%8.8x: ' % (addr + i))
- bytes_left = data_len - i
- if bytes_left >= num_per_line:
- curr_data_len = num_per_line
- else:
- curr_data_len = bytes_left
- hex_start_idx = i * 2
- hex_end_idx = hex_start_idx + curr_data_len * 2
- curr_hex_str = hex_string[hex_start_idx:hex_end_idx]
- # 'curr_hex_str' now contains the hex byte string for the
- # current line with no spaces between bytes
- t = iter(curr_hex_str)
- # Print hex bytes separated by space
- outfile.write(' '.join(a + b for a, b in zip(t, t)))
- # Print two spaces
- outfile.write(' ')
- # Calculate ASCII string for bytes into 'ascii_str'
- ascii_str = ''
- for j in range(i, i + curr_data_len):
- ch = data[j]
- if ch in string.printable and ch not in string.whitespace:
- ascii_str += '%c' % (ch)
- else:
- ascii_str += '.'
- # Print ASCII representation and newline
- outfile.write(ascii_str)
- i = i + curr_data_len
- outfile.write('\n')
-
-
-def read_packet(f, verbose=False, trace_file=None):
- '''Decode a JSON packet that starts with the content length and is
- followed by the JSON bytes from a file 'f'. Returns None on EOF.
- '''
- line = f.readline().decode("utf-8")
- if len(line) == 0:
- return None # EOF.
-
- # Watch for line that starts with the prefix
- prefix = 'Content-Length: '
- if line.startswith(prefix):
- # Decode length of JSON bytes
- if verbose:
- print('content: "%s"' % (line))
- length = int(line[len(prefix):])
- if verbose:
- print('length: "%u"' % (length))
- # Skip empty line
- line = f.readline()
- if verbose:
- print('empty: "%s"' % (line))
- # Read JSON bytes
- json_str = f.read(length)
- if verbose:
- print('json: "%s"' % (json_str))
- if trace_file:
- trace_file.write('from adaptor:\n%s\n' % (json_str))
- # Decode the JSON bytes into a python dictionary
- return json.loads(json_str)
-
- return None
-
-
-def packet_type_is(packet, packet_type):
- return 'type' in packet and packet['type'] == packet_type
-
-
-def read_packet_thread(vs_comm):
- done = False
- while not done:
- packet = read_packet(vs_comm.recv, trace_file=vs_comm.trace_file)
- # `packet` will be `None` on EOF. We want to pass it down to
- # handle_recv_packet anyway so the main thread can handle unexpected
- # termination of lldb-vscode and stop waiting for new packets.
- done = not vs_comm.handle_recv_packet(packet)
-
-
-class DebugCommunication(object):
-
- def __init__(self, recv, send):
- self.trace_file = None
- self.send = send
- self.recv = recv
- self.recv_packets = []
- self.recv_condition = threading.Condition()
- self.recv_thread = threading.Thread(target=read_packet_thread,
- args=(self,))
- self.process_event_body = None
- self.exit_status = None
- self.initialize_body = None
- self.thread_stop_reasons = {}
- self.sequence = 1
- self.threads = None
- self.recv_thread.start()
- self.output_condition = threading.Condition()
- self.output = {}
- self.configuration_done_sent = False
- self.frame_scopes = {}
-
- @classmethod
- def encode_content(cls, s):
- return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
-
- @classmethod
- def validate_response(cls, command, response):
- if command['command'] != response['command']:
- raise ValueError('command mismatch in response')
- if command['seq'] != response['request_seq']:
- raise ValueError('seq mismatch in response')
-
- def get_output(self, category, timeout=0.0, clear=True):
- self.output_condition.acquire()
- output = None
- if category in self.output:
- output = self.output[category]
- if clear:
- del self.output[category]
- elif timeout != 0.0:
- self.output_condition.wait(timeout)
- if category in self.output:
- output = self.output[category]
- if clear:
- del self.output[category]
- self.output_condition.release()
- return output
-
- def enqueue_recv_packet(self, packet):
- self.recv_condition.acquire()
- self.recv_packets.append(packet)
- self.recv_condition.notify()
- self.recv_condition.release()
-
- def handle_recv_packet(self, packet):
- '''Called by the read thread that is waiting for all incoming packets
- to store the incoming packet in "self.recv_packets" in a thread safe
- way. This function will then signal the "self.recv_condition" to
- indicate a new packet is available. Returns True if the caller
- should keep calling this function for more packets.
- '''
- # If EOF, notify the read thread by enqueing a None.
- if not packet:
- self.enqueue_recv_packet(None)
- return False
-
- # Check the packet to see if is an event packet
- keepGoing = True
- packet_type = packet['type']
- if packet_type == 'event':
- event = packet['event']
- body = None
- if 'body' in packet:
- body = packet['body']
- # Handle the event packet and cache information from these packets
- # as they come in
- if event == 'output':
- # Store any output we receive so clients can retrieve it later.
- category = body['category']
- output = body['output']
- self.output_condition.acquire()
- if category in self.output:
- self.output[category] += output
- else:
- self.output[category] = output
- self.output_condition.notify()
- self.output_condition.release()
- # no need to add 'output' packets to our packets list
- return keepGoing
- elif event == 'process':
- # When a new process is attached or launched, remember the
- # details that are available in the body of the event
- self.process_event_body = body
- elif event == 'stopped':
- # Each thread that stops with a reason will send a
- # 'stopped' event. We need to remember the thread stop
- # reasons since the 'threads' command doesn't return
- # that information.
- self._process_stopped()
- tid = body['threadId']
- self.thread_stop_reasons[tid] = body
- elif packet_type == 'response':
- if packet['command'] == 'disconnect':
- keepGoing = False
- self.enqueue_recv_packet(packet)
- return keepGoing
-
- def send_packet(self, command_dict, set_sequence=True):
- '''Take the "command_dict" python dictionary and encode it as a JSON
- string and send the contents as a packet to the VSCode debug
- adaptor'''
- # Set the sequence ID for this command automatically
- if set_sequence:
- command_dict['seq'] = self.sequence
- self.sequence += 1
- # Encode our command dictionary as a JSON string
- json_str = json.dumps(command_dict, separators=(',', ':'))
- if self.trace_file:
- self.trace_file.write('to adaptor:\n%s\n' % (json_str))
- length = len(json_str)
- if length > 0:
- # Send the encoded JSON packet and flush the 'send' file
- self.send.write(self.encode_content(json_str))
- self.send.flush()
-
- def recv_packet(self, filter_type=None, filter_event=None, timeout=None):
- '''Get a JSON packet from the VSCode debug adaptor. This function
- assumes a thread that reads packets is running and will deliver
- any received packets by calling handle_recv_packet(...). This
- function will wait for the packet to arrive and return it when
- it does.'''
- while True:
- try:
- self.recv_condition.acquire()
- packet = None
- while True:
- for (i, curr_packet) in enumerate(self.recv_packets):
- if not curr_packet:
- raise EOFError
- packet_type = curr_packet['type']
- if filter_type is None or packet_type in filter_type:
- if (filter_event is None or
- (packet_type == 'event' and
- curr_packet['event'] in filter_event)):
- packet = self.recv_packets.pop(i)
- break
- if packet:
- break
- # Sleep until packet is received
- len_before = len(self.recv_packets)
- self.recv_condition.wait(timeout)
- len_after = len(self.recv_packets)
- if len_before == len_after:
- return None # Timed out
- return packet
- except EOFError:
- return None
- finally:
- self.recv_condition.release()
-
- return None
-
- def send_recv(self, command):
- '''Send a command python dictionary as JSON and receive the JSON
- response. Validates that the response is the correct sequence and
- command in the reply. Any events that are received are added to the
- events list in this object'''
- self.send_packet(command)
- done = False
- while not done:
- response = self.recv_packet(filter_type='response')
- if response is None:
- desc = 'no response for "%s"' % (command['command'])
- raise ValueError(desc)
- self.validate_response(command, response)
- return response
- return None
-
- def wait_for_event(self, filter=None, timeout=None):
- while True:
- return self.recv_packet(filter_type='event', filter_event=filter,
- timeout=timeout)
- return None
-
- def wait_for_stopped(self, timeout=None):
- stopped_events = []
- stopped_event = self.wait_for_event(filter=['stopped', 'exited'],
- timeout=timeout)
- exited = False
- while stopped_event:
- stopped_events.append(stopped_event)
- # If we exited, then we are done
- if stopped_event['event'] == 'exited':
- self.exit_status = stopped_event['body']['exitCode']
- exited = True
- break
- # Otherwise we stopped and there might be one or more 'stopped'
- # events for each thread that stopped with a reason, so keep
- # checking for more 'stopped' events and return all of them
- stopped_event = self.wait_for_event(filter='stopped', timeout=0.25)
- if exited:
- self.threads = []
- return stopped_events
-
- def wait_for_exited(self):
- event_dict = self.wait_for_event('exited')
- if event_dict is None:
- raise ValueError("didn't get stopped event")
- return event_dict
-
- def get_initialize_value(self, key):
- '''Get a value for the given key if it there is a key/value pair in
- the "initialize" request response body.
- '''
- if self.initialize_body and key in self.initialize_body:
- return self.initialize_body[key]
- return None
-
- def get_threads(self):
- if self.threads is None:
- self.request_threads()
- return self.threads
-
- def get_thread_id(self, threadIndex=0):
- '''Utility function to get the first thread ID in the thread list.
- If the thread list is empty, then fetch the threads.
- '''
- if self.threads is None:
- self.request_threads()
- if self.threads and threadIndex < len(self.threads):
- return self.threads[threadIndex]['id']
- return None
-
- def get_stackFrame(self, frameIndex=0, threadId=None):
- '''Get a single "StackFrame" object from a "stackTrace" request and
- return the "StackFrame as a python dictionary, or None on failure
- '''
- if threadId is None:
- threadId = self.get_thread_id()
- if threadId is None:
- print('invalid threadId')
- return None
- response = self.request_stackTrace(threadId, startFrame=frameIndex,
- levels=1)
- if response:
- return response['body']['stackFrames'][0]
- print('invalid response')
- return None
-
- def get_scope_variables(self, scope_name, frameIndex=0, threadId=None):
- stackFrame = self.get_stackFrame(frameIndex=frameIndex,
- threadId=threadId)
- if stackFrame is None:
- return []
- frameId = stackFrame['id']
- if frameId in self.frame_scopes:
- frame_scopes = self.frame_scopes[frameId]
- else:
- scopes_response = self.request_scopes(frameId)
- frame_scopes = scopes_response['body']['scopes']
- self.frame_scopes[frameId] = frame_scopes
- for scope in frame_scopes:
- if scope['name'] == scope_name:
- varRef = scope['variablesReference']
- variables_response = self.request_variables(varRef)
- if variables_response:
- if 'body' in variables_response:
- body = variables_response['body']
- if 'variables' in body:
- vars = body['variables']
- return vars
- return []
-
- def get_global_variables(self, frameIndex=0, threadId=None):
- return self.get_scope_variables('Globals', frameIndex=frameIndex,
- threadId=threadId)
-
- def get_local_variables(self, frameIndex=0, threadId=None):
- return self.get_scope_variables('Locals', frameIndex=frameIndex,
- threadId=threadId)
-
- def get_local_variable(self, name, frameIndex=0, threadId=None):
- locals = self.get_local_variables(frameIndex=frameIndex,
- threadId=threadId)
- for local in locals:
- if 'name' in local and local['name'] == name:
- return local
- return None
-
- def get_local_variable_value(self, name, frameIndex=0, threadId=None):
- variable = self.get_local_variable(name, frameIndex=frameIndex,
- threadId=threadId)
- if variable and 'value' in variable:
- return variable['value']
- return None
-
- def replay_packets(self, replay_file_path):
- f = open(replay_file_path, 'r')
- mode = 'invalid'
- set_sequence = False
- command_dict = None
- while mode != 'eof':
- if mode == 'invalid':
- line = f.readline()
- if line.startswith('to adapter:'):
- mode = 'send'
- elif line.startswith('from adapter:'):
- mode = 'recv'
- elif mode == 'send':
- command_dict = read_packet(f)
- # Skip the end of line that follows the JSON
- f.readline()
- if command_dict is None:
- raise ValueError('decode packet failed from replay file')
- print('Sending:')
- pprint.PrettyPrinter(indent=2).pprint(command_dict)
- # raw_input('Press ENTER to send:')
- self.send_packet(command_dict, set_sequence)
- mode = 'invalid'
- elif mode == 'recv':
- print('Replay response:')
- replay_response = read_packet(f)
- # Skip the end of line that follows the JSON
- f.readline()
- pprint.PrettyPrinter(indent=2).pprint(replay_response)
- actual_response = self.recv_packet()
- if actual_response:
- type = actual_response['type']
- print('Actual response:')
- if type == 'response':
- self.validate_response(command_dict, actual_response)
- pprint.PrettyPrinter(indent=2).pprint(actual_response)
- else:
- print("error: didn't get a valid response")
- mode = 'invalid'
-
- def request_attach(self, program=None, pid=None, waitFor=None, trace=None,
- initCommands=None, preRunCommands=None,
- stopCommands=None, exitCommands=None,
- attachCommands=None):
- args_dict = {}
- if pid is not None:
- args_dict['pid'] = pid
- if program is not None:
- args_dict['program'] = program
- if waitFor is not None:
- args_dict['waitFor'] = waitFor
- if trace:
- args_dict['trace'] = trace
- args_dict['initCommands'] = [
- 'settings set symbols.enable-external-lookup false']
- if initCommands:
- args_dict['initCommands'].extend(initCommands)
- if preRunCommands:
- args_dict['preRunCommands'] = preRunCommands
- if stopCommands:
- args_dict['stopCommands'] = stopCommands
- if exitCommands:
- args_dict['exitCommands'] = exitCommands
- if attachCommands:
- args_dict['attachCommands'] = attachCommands
- command_dict = {
- 'command': 'attach',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_configurationDone(self):
- command_dict = {
- 'command': 'configurationDone',
- 'type': 'request',
- 'arguments': {}
- }
- response = self.send_recv(command_dict)
- if response:
- self.configuration_done_sent = True
- return response
-
- def _process_stopped(self):
- self.threads = None
- self.frame_scopes = {}
-
- def request_continue(self, threadId=None):
- if self.exit_status is not None:
- raise ValueError('request_continue called after process exited')
- # If we have launched or attached, then the first continue is done by
- # sending the 'configurationDone' request
- if not self.configuration_done_sent:
- return self.request_configurationDone()
- args_dict = {}
- if threadId is None:
- threadId = self.get_thread_id()
- args_dict['threadId'] = threadId
- command_dict = {
- 'command': 'continue',
- 'type': 'request',
- 'arguments': args_dict
- }
- response = self.send_recv(command_dict)
- recv_packets = []
- self.recv_condition.acquire()
- for event in self.recv_packets:
- if event['event'] != 'stopped':
- recv_packets.append(event)
- self.recv_packets = recv_packets
- self.recv_condition.release()
- return response
-
- def request_disconnect(self, terminateDebuggee=None):
- args_dict = {}
- if terminateDebuggee is not None:
- if terminateDebuggee:
- args_dict['terminateDebuggee'] = True
- else:
- args_dict['terminateDebuggee'] = False
- command_dict = {
- 'command': 'disconnect',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_evaluate(self, expression, frameIndex=0, threadId=None):
- stackFrame = self.get_stackFrame(frameIndex=frameIndex,
- threadId=threadId)
- if stackFrame is None:
- return []
- args_dict = {
- 'expression': expression,
- 'frameId': stackFrame['id'],
- }
- command_dict = {
- 'command': 'evaluate',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_initialize(self):
- command_dict = {
- 'command': 'initialize',
- 'type': 'request',
- 'arguments': {
- 'adapterID': 'lldb-native',
- 'clientID': 'vscode',
- 'columnsStartAt1': True,
- 'linesStartAt1': True,
- 'locale': 'en-us',
- 'pathFormat': 'path',
- 'supportsRunInTerminalRequest': True,
- 'supportsVariablePaging': True,
- 'supportsVariableType': True
- }
- }
- response = self.send_recv(command_dict)
- if response:
- if 'body' in response:
- self.initialize_body = response['body']
- return response
-
- def request_launch(self, program, args=None, cwd=None, env=None,
- stopOnEntry=False, disableASLR=True,
- disableSTDIO=False, shellExpandArguments=False,
- trace=False, initCommands=None, preRunCommands=None,
- stopCommands=None, exitCommands=None, sourcePath=None,
- debuggerRoot=None):
- args_dict = {
- 'program': program
- }
- if args:
- args_dict['args'] = args
- if cwd:
- args_dict['cwd'] = cwd
- if env:
- args_dict['env'] = env
- if stopOnEntry:
- args_dict['stopOnEntry'] = stopOnEntry
- if disableASLR:
- args_dict['disableASLR'] = disableASLR
- if disableSTDIO:
- args_dict['disableSTDIO'] = disableSTDIO
- if shellExpandArguments:
- args_dict['shellExpandArguments'] = shellExpandArguments
- if trace:
- args_dict['trace'] = trace
- args_dict['initCommands'] = [
- 'settings set symbols.enable-external-lookup false']
- if initCommands:
- args_dict['initCommands'].extend(initCommands)
- if preRunCommands:
- args_dict['preRunCommands'] = preRunCommands
- if stopCommands:
- args_dict['stopCommands'] = stopCommands
- if exitCommands:
- args_dict['exitCommands'] = exitCommands
- if sourcePath:
- args_dict['sourcePath'] = sourcePath
- if debuggerRoot:
- args_dict['debuggerRoot'] = debuggerRoot
- command_dict = {
- 'command': 'launch',
- 'type': 'request',
- 'arguments': args_dict
- }
- response = self.send_recv(command_dict)
-
- # Wait for a 'process' and 'initialized' event in any order
- self.wait_for_event(filter=['process', 'initialized'])
- self.wait_for_event(filter=['process', 'initialized'])
- return response
-
- def request_next(self, threadId):
- if self.exit_status is not None:
- raise ValueError('request_continue called after process exited')
- args_dict = {'threadId': threadId}
- command_dict = {
- 'command': 'next',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_stepIn(self, threadId):
- if self.exit_status is not None:
- raise ValueError('request_continue called after process exited')
- args_dict = {'threadId': threadId}
- command_dict = {
- 'command': 'stepIn',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_stepOut(self, threadId):
- if self.exit_status is not None:
- raise ValueError('request_continue called after process exited')
- args_dict = {'threadId': threadId}
- command_dict = {
- 'command': 'stepOut',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_pause(self, threadId=None):
- if self.exit_status is not None:
- raise ValueError('request_continue called after process exited')
- if threadId is None:
- threadId = self.get_thread_id()
- args_dict = {'threadId': threadId}
- command_dict = {
- 'command': 'pause',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_scopes(self, frameId):
- args_dict = {'frameId': frameId}
- command_dict = {
- 'command': 'scopes',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_setBreakpoints(self, file_path, line_array, condition=None,
- hitCondition=None):
- (dir, base) = os.path.split(file_path)
- breakpoints = []
- for line in line_array:
- bp = {'line': line}
- if condition is not None:
- bp['condition'] = condition
- if hitCondition is not None:
- bp['hitCondition'] = hitCondition
- breakpoints.append(bp)
- source_dict = {
- 'name': base,
- 'path': file_path
- }
- args_dict = {
- 'source': source_dict,
- 'breakpoints': breakpoints,
- 'lines': '%s' % (line_array),
- 'sourceModified': False,
- }
- command_dict = {
- 'command': 'setBreakpoints',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_setExceptionBreakpoints(self, filters):
- args_dict = {'filters': filters}
- command_dict = {
- 'command': 'setExceptionBreakpoints',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_setFunctionBreakpoints(self, names, condition=None,
- hitCondition=None):
- breakpoints = []
- for name in names:
- bp = {'name': name}
- if condition is not None:
- bp['condition'] = condition
- if hitCondition is not None:
- bp['hitCondition'] = hitCondition
- breakpoints.append(bp)
- args_dict = {'breakpoints': breakpoints}
- command_dict = {
- 'command': 'setFunctionBreakpoints',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_stackTrace(self, threadId=None, startFrame=None, levels=None,
- dump=False):
- if threadId is None:
- threadId = self.get_thread_id()
- args_dict = {'threadId': threadId}
- if startFrame is not None:
- args_dict['startFrame'] = startFrame
- if levels is not None:
- args_dict['levels'] = levels
- command_dict = {
- 'command': 'stackTrace',
- 'type': 'request',
- 'arguments': args_dict
- }
- response = self.send_recv(command_dict)
- if dump:
- for (idx, frame) in enumerate(response['body']['stackFrames']):
- name = frame['name']
- if 'line' in frame and 'source' in frame:
- source = frame['source']
- if 'sourceReference' not in source:
- if 'name' in source:
- source_name = source['name']
- line = frame['line']
- print("[%3u] %s @ %s:%u" % (idx, name, source_name,
- line))
- continue
- print("[%3u] %s" % (idx, name))
- return response
-
- def request_threads(self):
- '''Request a list of all threads and combine any information from any
- "stopped" events since those contain more information about why a
- thread actually stopped. Returns an array of thread dictionaries
- with information about all threads'''
- command_dict = {
- 'command': 'threads',
- 'type': 'request',
- 'arguments': {}
- }
- response = self.send_recv(command_dict)
- body = response['body']
- # Fill in "self.threads" correctly so that clients that call
- # self.get_threads() or self.get_thread_id(...) can get information
- # on threads when the process is stopped.
- if 'threads' in body:
- self.threads = body['threads']
- for thread in self.threads:
- # Copy the thread dictionary so we can add key/value pairs to
- # it without affecfting the original info from the "threads"
- # command.
- tid = thread['id']
- if tid in self.thread_stop_reasons:
- thread_stop_info = self.thread_stop_reasons[tid]
- copy_keys = ['reason', 'description', 'text']
- for key in copy_keys:
- if key in thread_stop_info:
- thread[key] = thread_stop_info[key]
- else:
- self.threads = None
- return response
-
- def request_variables(self, variablesReference, start=None, count=None):
- args_dict = {'variablesReference': variablesReference}
- if start is not None:
- args_dict['start'] = start
- if count is not None:
- args_dict['count'] = count
- command_dict = {
- 'command': 'variables',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_setVariable(self, containingVarRef, name, value, id=None):
- args_dict = {
- 'variablesReference': containingVarRef,
- 'name': name,
- 'value': str(value)
- }
- if id is not None:
- args_dict['id'] = id
- command_dict = {
- 'command': 'setVariable',
- 'type': 'request',
- 'arguments': args_dict
- }
- return self.send_recv(command_dict)
-
- def request_testGetTargetBreakpoints(self):
- '''A request packet used in the LLDB test suite to get all currently
- set breakpoint infos for all breakpoints currently set in the
- target.
- '''
- command_dict = {
- 'command': '_testGetTargetBreakpoints',
- 'type': 'request',
- 'arguments': {}
- }
- return self.send_recv(command_dict)
-
- def terminate(self):
- self.send.close()
- # self.recv.close()
-
-
-class DebugAdaptor(DebugCommunication):
- def __init__(self, executable=None, port=None):
- self.process = None
- if executable is not None:
- self.process = subprocess.Popen([executable],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- DebugCommunication.__init__(self, self.process.stdout,
- self.process.stdin)
- elif port is not None:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('127.0.0.1', port))
- DebugCommunication.__init__(self, s.makefile('r'), s.makefile('w'))
-
- def get_pid(self):
- if self.process:
- return self.process.pid
- return -1
-
- def terminate(self):
- super(DebugAdaptor, self).terminate()
- if self.process is not None:
- self.process.terminate()
- self.process.wait()
- self.process = None
-
-
-def attach_options_specified(options):
- if options.pid is not None:
- return True
- if options.waitFor:
- return True
- if options.attach:
- return True
- if options.attachCmds:
- return True
- return False
-
-
-def run_vscode(dbg, args, options):
- dbg.request_initialize()
- if attach_options_specified(options):
- response = dbg.request_attach(program=options.program,
- pid=options.pid,
- waitFor=options.waitFor,
- attachCommands=options.attachCmds,
- initCommands=options.initCmds,
- preRunCommands=options.preRunCmds,
- stopCommands=options.stopCmds,
- exitCommands=options.exitCmds)
- else:
- response = dbg.request_launch(options.program,
- args=args,
- env=options.envs,
- cwd=options.workingDir,
- debuggerRoot=options.debuggerRoot,
- sourcePath=options.sourcePath,
- initCommands=options.initCmds,
- preRunCommands=options.preRunCmds,
- stopCommands=options.stopCmds,
- exitCommands=options.exitCmds)
-
- if response['success']:
- if options.sourceBreakpoints:
- source_to_lines = {}
- for file_line in options.sourceBreakpoints:
- (path, line) = file_line.split(':')
- if len(path) == 0 or len(line) == 0:
- print('error: invalid source with line "%s"' %
- (file_line))
-
- else:
- if path in source_to_lines:
- source_to_lines[path].append(int(line))
- else:
- source_to_lines[path] = [int(line)]
- for source in source_to_lines:
- dbg.request_setBreakpoints(source, source_to_lines[source])
- if options.funcBreakpoints:
- dbg.request_setFunctionBreakpoints(options.funcBreakpoints)
- dbg.request_configurationDone()
- dbg.wait_for_stopped()
- else:
- if 'message' in response:
- print(response['message'])
- dbg.request_disconnect(terminateDebuggee=True)
-
-
-def main():
- parser = optparse.OptionParser(
- description=('A testing framework for the Visual Studio Code Debug '
- 'Adaptor protocol'))
-
- parser.add_option(
- '--vscode',
- type='string',
- dest='vscode_path',
- help=('The path to the command line program that implements the '
- 'Visual Studio Code Debug Adaptor protocol.'),
- default=None)
-
- parser.add_option(
- '--program',
- type='string',
- dest='program',
- help='The path to the program to debug.',
- default=None)
-
- parser.add_option(
- '--workingDir',
- type='string',
- dest='workingDir',
- default=None,
- help='Set the working directory for the process we launch.')
-
- parser.add_option(
- '--sourcePath',
- type='string',
- dest='sourcePath',
- default=None,
- help=('Set the relative source root for any debug info that has '
- 'relative paths in it.'))
-
- parser.add_option(
- '--debuggerRoot',
- type='string',
- dest='debuggerRoot',
- default=None,
- help=('Set the working directory for lldb-vscode for any object files '
- 'with relative paths in the Mach-o debug map.'))
-
- parser.add_option(
- '-r', '--replay',
- type='string',
- dest='replay',
- help=('Specify a file containing a packet log to replay with the '
- 'current Visual Studio Code Debug Adaptor executable.'),
- default=None)
-
- parser.add_option(
- '-g', '--debug',
- action='store_true',
- dest='debug',
- default=False,
- help='Pause waiting for a debugger to attach to the debug adaptor')
-
- parser.add_option(
- '--port',
- type='int',
- dest='port',
- help="Attach a socket to a port instead of using STDIN for VSCode",
- default=None)
-
- parser.add_option(
- '--pid',
- type='int',
- dest='pid',
- help="The process ID to attach to",
- default=None)
-
- parser.add_option(
- '--attach',
- action='store_true',
- dest='attach',
- default=False,
- help=('Specify this option to attach to a process by name. The '
- 'process name is the basanme of the executable specified with '
- 'the --program option.'))
-
- parser.add_option(
- '-f', '--function-bp',
- type='string',
- action='append',
- dest='funcBreakpoints',
- help=('Specify the name of a function to break at. '
- 'Can be specified more than once.'),
- default=[])
-
- parser.add_option(
- '-s', '--source-bp',
- type='string',
- action='append',
- dest='sourceBreakpoints',
- default=[],
- help=('Specify source breakpoints to set in the format of '
- '<source>:<line>. '
- 'Can be specified more than once.'))
-
- parser.add_option(
- '--attachCommand',
- type='string',
- action='append',
- dest='attachCmds',
- default=[],
- help=('Specify a LLDB command that will attach to a process. '
- 'Can be specified more than once.'))
-
- parser.add_option(
- '--initCommand',
- type='string',
- action='append',
- dest='initCmds',
- default=[],
- help=('Specify a LLDB command that will be executed before the target '
- 'is created. Can be specified more than once.'))
-
- parser.add_option(
- '--preRunCommand',
- type='string',
- action='append',
- dest='preRunCmds',
- default=[],
- help=('Specify a LLDB command that will be executed after the target '
- 'has been created. Can be specified more than once.'))
-
- parser.add_option(
- '--stopCommand',
- type='string',
- action='append',
- dest='stopCmds',
- default=[],
- help=('Specify a LLDB command that will be executed each time the'
- 'process stops. Can be specified more than once.'))
-
- parser.add_option(
- '--exitCommand',
- type='string',
- action='append',
- dest='exitCmds',
- default=[],
- help=('Specify a LLDB command that will be executed when the process '
- 'exits. Can be specified more than once.'))
-
- parser.add_option(
- '--env',
- type='string',
- action='append',
- dest='envs',
- default=[],
- help=('Specify environment variables to pass to the launched '
- 'process.'))
-
- parser.add_option(
- '--waitFor',
- action='store_true',
- dest='waitFor',
- default=False,
- help=('Wait for the next process to be launched whose name matches '
- 'the basename of the program specified with the --program '
- 'option'))
-
- (options, args) = parser.parse_args(sys.argv[1:])
-
- if options.vscode_path is None and options.port is None:
- print('error: must either specify a path to a Visual Studio Code '
- 'Debug Adaptor vscode executable path using the --vscode '
- 'option, or a port to attach to for an existing lldb-vscode '
- 'using the --port option')
- return
- dbg = DebugAdaptor(executable=options.vscode_path, port=options.port)
- if options.debug:
- raw_input('Waiting for debugger to attach pid "%i"' % (
- dbg.get_pid()))
- if options.replay:
- dbg.replay_packets(options.replay)
- else:
- run_vscode(dbg, args, options)
- dbg.terminate()
-
-
-if __name__ == '__main__':
- main()
OpenPOWER on IntegriCloud