diff options
author | Raphael Isemann <teemperor@gmail.com> | 2019-07-19 15:55:23 +0000 |
---|---|---|
committer | Raphael Isemann <teemperor@gmail.com> | 2019-07-19 15:55:23 +0000 |
commit | b45853f173139c7c3078b97f53e7a6eba6148c13 (patch) | |
tree | 3b24eec01a7b23edd4364911d9bf6490ce2c1422 /lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py | |
parent | 005423018182120f3ae2a54ff5fd3390c96fb527 (diff) | |
download | bcm5719-llvm-b45853f173139c7c3078b97f53e7a6eba6148c13.tar.gz bcm5719-llvm-b45853f173139c7c3078b97f53e7a6eba6148c13.zip |
[lldb][NFC] Cleanup mentions and code related to lldb-mi
Summary: lldb-mi has been removed, but there are still a bunch of references in the code base. This patch removes all of them.
Reviewers: JDevlieghere, jfb
Reviewed By: JDevlieghere
Subscribers: dexonsmith, ki.stfu, mgorny, abidh, jfb, lldb-commits
Tags: #lldb
Differential Revision: https://reviews.llvm.org/D64992
llvm-svn: 366590
Diffstat (limited to 'lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py')
-rw-r--r-- | lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py | 1102 |
1 files changed, 0 insertions, 1102 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py deleted file mode 100644 index 74ee4a6c44c..00000000000 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-vscode/vscode.py +++ /dev/null @@ -1,1102 +0,0 @@ -#!/usr/bin/env python - -import binascii -import json -import optparse -import os -import pprint -import socket -import string -import subprocess -import sys -import threading - - -def dump_memory(base_addr, data, num_per_line, outfile): - - data_len = len(data) - hex_string = binascii.hexlify(data) - addr = base_addr - ascii_str = '' - i = 0 - while i < data_len: - outfile.write('0x%8.8x: ' % (addr + i)) - bytes_left = data_len - i - if bytes_left >= num_per_line: - curr_data_len = num_per_line - else: - curr_data_len = bytes_left - hex_start_idx = i * 2 - hex_end_idx = hex_start_idx + curr_data_len * 2 - curr_hex_str = hex_string[hex_start_idx:hex_end_idx] - # 'curr_hex_str' now contains the hex byte string for the - # current line with no spaces between bytes - t = iter(curr_hex_str) - # Print hex bytes separated by space - outfile.write(' '.join(a + b for a, b in zip(t, t))) - # Print two spaces - outfile.write(' ') - # Calculate ASCII string for bytes into 'ascii_str' - ascii_str = '' - for j in range(i, i + curr_data_len): - ch = data[j] - if ch in string.printable and ch not in string.whitespace: - ascii_str += '%c' % (ch) - else: - ascii_str += '.' - # Print ASCII representation and newline - outfile.write(ascii_str) - i = i + curr_data_len - outfile.write('\n') - - -def read_packet(f, verbose=False, trace_file=None): - '''Decode a JSON packet that starts with the content length and is - followed by the JSON bytes from a file 'f'. Returns None on EOF. - ''' - line = f.readline().decode("utf-8") - if len(line) == 0: - return None # EOF. - - # Watch for line that starts with the prefix - prefix = 'Content-Length: ' - if line.startswith(prefix): - # Decode length of JSON bytes - if verbose: - print('content: "%s"' % (line)) - length = int(line[len(prefix):]) - if verbose: - print('length: "%u"' % (length)) - # Skip empty line - line = f.readline() - if verbose: - print('empty: "%s"' % (line)) - # Read JSON bytes - json_str = f.read(length) - if verbose: - print('json: "%s"' % (json_str)) - if trace_file: - trace_file.write('from adaptor:\n%s\n' % (json_str)) - # Decode the JSON bytes into a python dictionary - return json.loads(json_str) - - return None - - -def packet_type_is(packet, packet_type): - return 'type' in packet and packet['type'] == packet_type - - -def read_packet_thread(vs_comm): - done = False - while not done: - packet = read_packet(vs_comm.recv, trace_file=vs_comm.trace_file) - # `packet` will be `None` on EOF. We want to pass it down to - # handle_recv_packet anyway so the main thread can handle unexpected - # termination of lldb-vscode and stop waiting for new packets. - done = not vs_comm.handle_recv_packet(packet) - - -class DebugCommunication(object): - - def __init__(self, recv, send): - self.trace_file = None - self.send = send - self.recv = recv - self.recv_packets = [] - self.recv_condition = threading.Condition() - self.recv_thread = threading.Thread(target=read_packet_thread, - args=(self,)) - self.process_event_body = None - self.exit_status = None - self.initialize_body = None - self.thread_stop_reasons = {} - self.sequence = 1 - self.threads = None - self.recv_thread.start() - self.output_condition = threading.Condition() - self.output = {} - self.configuration_done_sent = False - self.frame_scopes = {} - - @classmethod - def encode_content(cls, s): - return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8") - - @classmethod - def validate_response(cls, command, response): - if command['command'] != response['command']: - raise ValueError('command mismatch in response') - if command['seq'] != response['request_seq']: - raise ValueError('seq mismatch in response') - - def get_output(self, category, timeout=0.0, clear=True): - self.output_condition.acquire() - output = None - if category in self.output: - output = self.output[category] - if clear: - del self.output[category] - elif timeout != 0.0: - self.output_condition.wait(timeout) - if category in self.output: - output = self.output[category] - if clear: - del self.output[category] - self.output_condition.release() - return output - - def enqueue_recv_packet(self, packet): - self.recv_condition.acquire() - self.recv_packets.append(packet) - self.recv_condition.notify() - self.recv_condition.release() - - def handle_recv_packet(self, packet): - '''Called by the read thread that is waiting for all incoming packets - to store the incoming packet in "self.recv_packets" in a thread safe - way. This function will then signal the "self.recv_condition" to - indicate a new packet is available. Returns True if the caller - should keep calling this function for more packets. - ''' - # If EOF, notify the read thread by enqueing a None. - if not packet: - self.enqueue_recv_packet(None) - return False - - # Check the packet to see if is an event packet - keepGoing = True - packet_type = packet['type'] - if packet_type == 'event': - event = packet['event'] - body = None - if 'body' in packet: - body = packet['body'] - # Handle the event packet and cache information from these packets - # as they come in - if event == 'output': - # Store any output we receive so clients can retrieve it later. - category = body['category'] - output = body['output'] - self.output_condition.acquire() - if category in self.output: - self.output[category] += output - else: - self.output[category] = output - self.output_condition.notify() - self.output_condition.release() - # no need to add 'output' packets to our packets list - return keepGoing - elif event == 'process': - # When a new process is attached or launched, remember the - # details that are available in the body of the event - self.process_event_body = body - elif event == 'stopped': - # Each thread that stops with a reason will send a - # 'stopped' event. We need to remember the thread stop - # reasons since the 'threads' command doesn't return - # that information. - self._process_stopped() - tid = body['threadId'] - self.thread_stop_reasons[tid] = body - elif packet_type == 'response': - if packet['command'] == 'disconnect': - keepGoing = False - self.enqueue_recv_packet(packet) - return keepGoing - - def send_packet(self, command_dict, set_sequence=True): - '''Take the "command_dict" python dictionary and encode it as a JSON - string and send the contents as a packet to the VSCode debug - adaptor''' - # Set the sequence ID for this command automatically - if set_sequence: - command_dict['seq'] = self.sequence - self.sequence += 1 - # Encode our command dictionary as a JSON string - json_str = json.dumps(command_dict, separators=(',', ':')) - if self.trace_file: - self.trace_file.write('to adaptor:\n%s\n' % (json_str)) - length = len(json_str) - if length > 0: - # Send the encoded JSON packet and flush the 'send' file - self.send.write(self.encode_content(json_str)) - self.send.flush() - - def recv_packet(self, filter_type=None, filter_event=None, timeout=None): - '''Get a JSON packet from the VSCode debug adaptor. This function - assumes a thread that reads packets is running and will deliver - any received packets by calling handle_recv_packet(...). This - function will wait for the packet to arrive and return it when - it does.''' - while True: - try: - self.recv_condition.acquire() - packet = None - while True: - for (i, curr_packet) in enumerate(self.recv_packets): - if not curr_packet: - raise EOFError - packet_type = curr_packet['type'] - if filter_type is None or packet_type in filter_type: - if (filter_event is None or - (packet_type == 'event' and - curr_packet['event'] in filter_event)): - packet = self.recv_packets.pop(i) - break - if packet: - break - # Sleep until packet is received - len_before = len(self.recv_packets) - self.recv_condition.wait(timeout) - len_after = len(self.recv_packets) - if len_before == len_after: - return None # Timed out - return packet - except EOFError: - return None - finally: - self.recv_condition.release() - - return None - - def send_recv(self, command): - '''Send a command python dictionary as JSON and receive the JSON - response. Validates that the response is the correct sequence and - command in the reply. Any events that are received are added to the - events list in this object''' - self.send_packet(command) - done = False - while not done: - response = self.recv_packet(filter_type='response') - if response is None: - desc = 'no response for "%s"' % (command['command']) - raise ValueError(desc) - self.validate_response(command, response) - return response - return None - - def wait_for_event(self, filter=None, timeout=None): - while True: - return self.recv_packet(filter_type='event', filter_event=filter, - timeout=timeout) - return None - - def wait_for_stopped(self, timeout=None): - stopped_events = [] - stopped_event = self.wait_for_event(filter=['stopped', 'exited'], - timeout=timeout) - exited = False - while stopped_event: - stopped_events.append(stopped_event) - # If we exited, then we are done - if stopped_event['event'] == 'exited': - self.exit_status = stopped_event['body']['exitCode'] - exited = True - break - # Otherwise we stopped and there might be one or more 'stopped' - # events for each thread that stopped with a reason, so keep - # checking for more 'stopped' events and return all of them - stopped_event = self.wait_for_event(filter='stopped', timeout=0.25) - if exited: - self.threads = [] - return stopped_events - - def wait_for_exited(self): - event_dict = self.wait_for_event('exited') - if event_dict is None: - raise ValueError("didn't get stopped event") - return event_dict - - def get_initialize_value(self, key): - '''Get a value for the given key if it there is a key/value pair in - the "initialize" request response body. - ''' - if self.initialize_body and key in self.initialize_body: - return self.initialize_body[key] - return None - - def get_threads(self): - if self.threads is None: - self.request_threads() - return self.threads - - def get_thread_id(self, threadIndex=0): - '''Utility function to get the first thread ID in the thread list. - If the thread list is empty, then fetch the threads. - ''' - if self.threads is None: - self.request_threads() - if self.threads and threadIndex < len(self.threads): - return self.threads[threadIndex]['id'] - return None - - def get_stackFrame(self, frameIndex=0, threadId=None): - '''Get a single "StackFrame" object from a "stackTrace" request and - return the "StackFrame as a python dictionary, or None on failure - ''' - if threadId is None: - threadId = self.get_thread_id() - if threadId is None: - print('invalid threadId') - return None - response = self.request_stackTrace(threadId, startFrame=frameIndex, - levels=1) - if response: - return response['body']['stackFrames'][0] - print('invalid response') - return None - - def get_scope_variables(self, scope_name, frameIndex=0, threadId=None): - stackFrame = self.get_stackFrame(frameIndex=frameIndex, - threadId=threadId) - if stackFrame is None: - return [] - frameId = stackFrame['id'] - if frameId in self.frame_scopes: - frame_scopes = self.frame_scopes[frameId] - else: - scopes_response = self.request_scopes(frameId) - frame_scopes = scopes_response['body']['scopes'] - self.frame_scopes[frameId] = frame_scopes - for scope in frame_scopes: - if scope['name'] == scope_name: - varRef = scope['variablesReference'] - variables_response = self.request_variables(varRef) - if variables_response: - if 'body' in variables_response: - body = variables_response['body'] - if 'variables' in body: - vars = body['variables'] - return vars - return [] - - def get_global_variables(self, frameIndex=0, threadId=None): - return self.get_scope_variables('Globals', frameIndex=frameIndex, - threadId=threadId) - - def get_local_variables(self, frameIndex=0, threadId=None): - return self.get_scope_variables('Locals', frameIndex=frameIndex, - threadId=threadId) - - def get_local_variable(self, name, frameIndex=0, threadId=None): - locals = self.get_local_variables(frameIndex=frameIndex, - threadId=threadId) - for local in locals: - if 'name' in local and local['name'] == name: - return local - return None - - def get_local_variable_value(self, name, frameIndex=0, threadId=None): - variable = self.get_local_variable(name, frameIndex=frameIndex, - threadId=threadId) - if variable and 'value' in variable: - return variable['value'] - return None - - def replay_packets(self, replay_file_path): - f = open(replay_file_path, 'r') - mode = 'invalid' - set_sequence = False - command_dict = None - while mode != 'eof': - if mode == 'invalid': - line = f.readline() - if line.startswith('to adapter:'): - mode = 'send' - elif line.startswith('from adapter:'): - mode = 'recv' - elif mode == 'send': - command_dict = read_packet(f) - # Skip the end of line that follows the JSON - f.readline() - if command_dict is None: - raise ValueError('decode packet failed from replay file') - print('Sending:') - pprint.PrettyPrinter(indent=2).pprint(command_dict) - # raw_input('Press ENTER to send:') - self.send_packet(command_dict, set_sequence) - mode = 'invalid' - elif mode == 'recv': - print('Replay response:') - replay_response = read_packet(f) - # Skip the end of line that follows the JSON - f.readline() - pprint.PrettyPrinter(indent=2).pprint(replay_response) - actual_response = self.recv_packet() - if actual_response: - type = actual_response['type'] - print('Actual response:') - if type == 'response': - self.validate_response(command_dict, actual_response) - pprint.PrettyPrinter(indent=2).pprint(actual_response) - else: - print("error: didn't get a valid response") - mode = 'invalid' - - def request_attach(self, program=None, pid=None, waitFor=None, trace=None, - initCommands=None, preRunCommands=None, - stopCommands=None, exitCommands=None, - attachCommands=None): - args_dict = {} - if pid is not None: - args_dict['pid'] = pid - if program is not None: - args_dict['program'] = program - if waitFor is not None: - args_dict['waitFor'] = waitFor - if trace: - args_dict['trace'] = trace - args_dict['initCommands'] = [ - 'settings set symbols.enable-external-lookup false'] - if initCommands: - args_dict['initCommands'].extend(initCommands) - if preRunCommands: - args_dict['preRunCommands'] = preRunCommands - if stopCommands: - args_dict['stopCommands'] = stopCommands - if exitCommands: - args_dict['exitCommands'] = exitCommands - if attachCommands: - args_dict['attachCommands'] = attachCommands - command_dict = { - 'command': 'attach', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_configurationDone(self): - command_dict = { - 'command': 'configurationDone', - 'type': 'request', - 'arguments': {} - } - response = self.send_recv(command_dict) - if response: - self.configuration_done_sent = True - return response - - def _process_stopped(self): - self.threads = None - self.frame_scopes = {} - - def request_continue(self, threadId=None): - if self.exit_status is not None: - raise ValueError('request_continue called after process exited') - # If we have launched or attached, then the first continue is done by - # sending the 'configurationDone' request - if not self.configuration_done_sent: - return self.request_configurationDone() - args_dict = {} - if threadId is None: - threadId = self.get_thread_id() - args_dict['threadId'] = threadId - command_dict = { - 'command': 'continue', - 'type': 'request', - 'arguments': args_dict - } - response = self.send_recv(command_dict) - recv_packets = [] - self.recv_condition.acquire() - for event in self.recv_packets: - if event['event'] != 'stopped': - recv_packets.append(event) - self.recv_packets = recv_packets - self.recv_condition.release() - return response - - def request_disconnect(self, terminateDebuggee=None): - args_dict = {} - if terminateDebuggee is not None: - if terminateDebuggee: - args_dict['terminateDebuggee'] = True - else: - args_dict['terminateDebuggee'] = False - command_dict = { - 'command': 'disconnect', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_evaluate(self, expression, frameIndex=0, threadId=None): - stackFrame = self.get_stackFrame(frameIndex=frameIndex, - threadId=threadId) - if stackFrame is None: - return [] - args_dict = { - 'expression': expression, - 'frameId': stackFrame['id'], - } - command_dict = { - 'command': 'evaluate', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_initialize(self): - command_dict = { - 'command': 'initialize', - 'type': 'request', - 'arguments': { - 'adapterID': 'lldb-native', - 'clientID': 'vscode', - 'columnsStartAt1': True, - 'linesStartAt1': True, - 'locale': 'en-us', - 'pathFormat': 'path', - 'supportsRunInTerminalRequest': True, - 'supportsVariablePaging': True, - 'supportsVariableType': True - } - } - response = self.send_recv(command_dict) - if response: - if 'body' in response: - self.initialize_body = response['body'] - return response - - def request_launch(self, program, args=None, cwd=None, env=None, - stopOnEntry=False, disableASLR=True, - disableSTDIO=False, shellExpandArguments=False, - trace=False, initCommands=None, preRunCommands=None, - stopCommands=None, exitCommands=None, sourcePath=None, - debuggerRoot=None): - args_dict = { - 'program': program - } - if args: - args_dict['args'] = args - if cwd: - args_dict['cwd'] = cwd - if env: - args_dict['env'] = env - if stopOnEntry: - args_dict['stopOnEntry'] = stopOnEntry - if disableASLR: - args_dict['disableASLR'] = disableASLR - if disableSTDIO: - args_dict['disableSTDIO'] = disableSTDIO - if shellExpandArguments: - args_dict['shellExpandArguments'] = shellExpandArguments - if trace: - args_dict['trace'] = trace - args_dict['initCommands'] = [ - 'settings set symbols.enable-external-lookup false'] - if initCommands: - args_dict['initCommands'].extend(initCommands) - if preRunCommands: - args_dict['preRunCommands'] = preRunCommands - if stopCommands: - args_dict['stopCommands'] = stopCommands - if exitCommands: - args_dict['exitCommands'] = exitCommands - if sourcePath: - args_dict['sourcePath'] = sourcePath - if debuggerRoot: - args_dict['debuggerRoot'] = debuggerRoot - command_dict = { - 'command': 'launch', - 'type': 'request', - 'arguments': args_dict - } - response = self.send_recv(command_dict) - - # Wait for a 'process' and 'initialized' event in any order - self.wait_for_event(filter=['process', 'initialized']) - self.wait_for_event(filter=['process', 'initialized']) - return response - - def request_next(self, threadId): - if self.exit_status is not None: - raise ValueError('request_continue called after process exited') - args_dict = {'threadId': threadId} - command_dict = { - 'command': 'next', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_stepIn(self, threadId): - if self.exit_status is not None: - raise ValueError('request_continue called after process exited') - args_dict = {'threadId': threadId} - command_dict = { - 'command': 'stepIn', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_stepOut(self, threadId): - if self.exit_status is not None: - raise ValueError('request_continue called after process exited') - args_dict = {'threadId': threadId} - command_dict = { - 'command': 'stepOut', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_pause(self, threadId=None): - if self.exit_status is not None: - raise ValueError('request_continue called after process exited') - if threadId is None: - threadId = self.get_thread_id() - args_dict = {'threadId': threadId} - command_dict = { - 'command': 'pause', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_scopes(self, frameId): - args_dict = {'frameId': frameId} - command_dict = { - 'command': 'scopes', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_setBreakpoints(self, file_path, line_array, condition=None, - hitCondition=None): - (dir, base) = os.path.split(file_path) - breakpoints = [] - for line in line_array: - bp = {'line': line} - if condition is not None: - bp['condition'] = condition - if hitCondition is not None: - bp['hitCondition'] = hitCondition - breakpoints.append(bp) - source_dict = { - 'name': base, - 'path': file_path - } - args_dict = { - 'source': source_dict, - 'breakpoints': breakpoints, - 'lines': '%s' % (line_array), - 'sourceModified': False, - } - command_dict = { - 'command': 'setBreakpoints', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_setExceptionBreakpoints(self, filters): - args_dict = {'filters': filters} - command_dict = { - 'command': 'setExceptionBreakpoints', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_setFunctionBreakpoints(self, names, condition=None, - hitCondition=None): - breakpoints = [] - for name in names: - bp = {'name': name} - if condition is not None: - bp['condition'] = condition - if hitCondition is not None: - bp['hitCondition'] = hitCondition - breakpoints.append(bp) - args_dict = {'breakpoints': breakpoints} - command_dict = { - 'command': 'setFunctionBreakpoints', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_stackTrace(self, threadId=None, startFrame=None, levels=None, - dump=False): - if threadId is None: - threadId = self.get_thread_id() - args_dict = {'threadId': threadId} - if startFrame is not None: - args_dict['startFrame'] = startFrame - if levels is not None: - args_dict['levels'] = levels - command_dict = { - 'command': 'stackTrace', - 'type': 'request', - 'arguments': args_dict - } - response = self.send_recv(command_dict) - if dump: - for (idx, frame) in enumerate(response['body']['stackFrames']): - name = frame['name'] - if 'line' in frame and 'source' in frame: - source = frame['source'] - if 'sourceReference' not in source: - if 'name' in source: - source_name = source['name'] - line = frame['line'] - print("[%3u] %s @ %s:%u" % (idx, name, source_name, - line)) - continue - print("[%3u] %s" % (idx, name)) - return response - - def request_threads(self): - '''Request a list of all threads and combine any information from any - "stopped" events since those contain more information about why a - thread actually stopped. Returns an array of thread dictionaries - with information about all threads''' - command_dict = { - 'command': 'threads', - 'type': 'request', - 'arguments': {} - } - response = self.send_recv(command_dict) - body = response['body'] - # Fill in "self.threads" correctly so that clients that call - # self.get_threads() or self.get_thread_id(...) can get information - # on threads when the process is stopped. - if 'threads' in body: - self.threads = body['threads'] - for thread in self.threads: - # Copy the thread dictionary so we can add key/value pairs to - # it without affecfting the original info from the "threads" - # command. - tid = thread['id'] - if tid in self.thread_stop_reasons: - thread_stop_info = self.thread_stop_reasons[tid] - copy_keys = ['reason', 'description', 'text'] - for key in copy_keys: - if key in thread_stop_info: - thread[key] = thread_stop_info[key] - else: - self.threads = None - return response - - def request_variables(self, variablesReference, start=None, count=None): - args_dict = {'variablesReference': variablesReference} - if start is not None: - args_dict['start'] = start - if count is not None: - args_dict['count'] = count - command_dict = { - 'command': 'variables', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_setVariable(self, containingVarRef, name, value, id=None): - args_dict = { - 'variablesReference': containingVarRef, - 'name': name, - 'value': str(value) - } - if id is not None: - args_dict['id'] = id - command_dict = { - 'command': 'setVariable', - 'type': 'request', - 'arguments': args_dict - } - return self.send_recv(command_dict) - - def request_testGetTargetBreakpoints(self): - '''A request packet used in the LLDB test suite to get all currently - set breakpoint infos for all breakpoints currently set in the - target. - ''' - command_dict = { - 'command': '_testGetTargetBreakpoints', - 'type': 'request', - 'arguments': {} - } - return self.send_recv(command_dict) - - def terminate(self): - self.send.close() - # self.recv.close() - - -class DebugAdaptor(DebugCommunication): - def __init__(self, executable=None, port=None): - self.process = None - if executable is not None: - self.process = subprocess.Popen([executable], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - DebugCommunication.__init__(self, self.process.stdout, - self.process.stdin) - elif port is not None: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('127.0.0.1', port)) - DebugCommunication.__init__(self, s.makefile('r'), s.makefile('w')) - - def get_pid(self): - if self.process: - return self.process.pid - return -1 - - def terminate(self): - super(DebugAdaptor, self).terminate() - if self.process is not None: - self.process.terminate() - self.process.wait() - self.process = None - - -def attach_options_specified(options): - if options.pid is not None: - return True - if options.waitFor: - return True - if options.attach: - return True - if options.attachCmds: - return True - return False - - -def run_vscode(dbg, args, options): - dbg.request_initialize() - if attach_options_specified(options): - response = dbg.request_attach(program=options.program, - pid=options.pid, - waitFor=options.waitFor, - attachCommands=options.attachCmds, - initCommands=options.initCmds, - preRunCommands=options.preRunCmds, - stopCommands=options.stopCmds, - exitCommands=options.exitCmds) - else: - response = dbg.request_launch(options.program, - args=args, - env=options.envs, - cwd=options.workingDir, - debuggerRoot=options.debuggerRoot, - sourcePath=options.sourcePath, - initCommands=options.initCmds, - preRunCommands=options.preRunCmds, - stopCommands=options.stopCmds, - exitCommands=options.exitCmds) - - if response['success']: - if options.sourceBreakpoints: - source_to_lines = {} - for file_line in options.sourceBreakpoints: - (path, line) = file_line.split(':') - if len(path) == 0 or len(line) == 0: - print('error: invalid source with line "%s"' % - (file_line)) - - else: - if path in source_to_lines: - source_to_lines[path].append(int(line)) - else: - source_to_lines[path] = [int(line)] - for source in source_to_lines: - dbg.request_setBreakpoints(source, source_to_lines[source]) - if options.funcBreakpoints: - dbg.request_setFunctionBreakpoints(options.funcBreakpoints) - dbg.request_configurationDone() - dbg.wait_for_stopped() - else: - if 'message' in response: - print(response['message']) - dbg.request_disconnect(terminateDebuggee=True) - - -def main(): - parser = optparse.OptionParser( - description=('A testing framework for the Visual Studio Code Debug ' - 'Adaptor protocol')) - - parser.add_option( - '--vscode', - type='string', - dest='vscode_path', - help=('The path to the command line program that implements the ' - 'Visual Studio Code Debug Adaptor protocol.'), - default=None) - - parser.add_option( - '--program', - type='string', - dest='program', - help='The path to the program to debug.', - default=None) - - parser.add_option( - '--workingDir', - type='string', - dest='workingDir', - default=None, - help='Set the working directory for the process we launch.') - - parser.add_option( - '--sourcePath', - type='string', - dest='sourcePath', - default=None, - help=('Set the relative source root for any debug info that has ' - 'relative paths in it.')) - - parser.add_option( - '--debuggerRoot', - type='string', - dest='debuggerRoot', - default=None, - help=('Set the working directory for lldb-vscode for any object files ' - 'with relative paths in the Mach-o debug map.')) - - parser.add_option( - '-r', '--replay', - type='string', - dest='replay', - help=('Specify a file containing a packet log to replay with the ' - 'current Visual Studio Code Debug Adaptor executable.'), - default=None) - - parser.add_option( - '-g', '--debug', - action='store_true', - dest='debug', - default=False, - help='Pause waiting for a debugger to attach to the debug adaptor') - - parser.add_option( - '--port', - type='int', - dest='port', - help="Attach a socket to a port instead of using STDIN for VSCode", - default=None) - - parser.add_option( - '--pid', - type='int', - dest='pid', - help="The process ID to attach to", - default=None) - - parser.add_option( - '--attach', - action='store_true', - dest='attach', - default=False, - help=('Specify this option to attach to a process by name. The ' - 'process name is the basanme of the executable specified with ' - 'the --program option.')) - - parser.add_option( - '-f', '--function-bp', - type='string', - action='append', - dest='funcBreakpoints', - help=('Specify the name of a function to break at. ' - 'Can be specified more than once.'), - default=[]) - - parser.add_option( - '-s', '--source-bp', - type='string', - action='append', - dest='sourceBreakpoints', - default=[], - help=('Specify source breakpoints to set in the format of ' - '<source>:<line>. ' - 'Can be specified more than once.')) - - parser.add_option( - '--attachCommand', - type='string', - action='append', - dest='attachCmds', - default=[], - help=('Specify a LLDB command that will attach to a process. ' - 'Can be specified more than once.')) - - parser.add_option( - '--initCommand', - type='string', - action='append', - dest='initCmds', - default=[], - help=('Specify a LLDB command that will be executed before the target ' - 'is created. Can be specified more than once.')) - - parser.add_option( - '--preRunCommand', - type='string', - action='append', - dest='preRunCmds', - default=[], - help=('Specify a LLDB command that will be executed after the target ' - 'has been created. Can be specified more than once.')) - - parser.add_option( - '--stopCommand', - type='string', - action='append', - dest='stopCmds', - default=[], - help=('Specify a LLDB command that will be executed each time the' - 'process stops. Can be specified more than once.')) - - parser.add_option( - '--exitCommand', - type='string', - action='append', - dest='exitCmds', - default=[], - help=('Specify a LLDB command that will be executed when the process ' - 'exits. Can be specified more than once.')) - - parser.add_option( - '--env', - type='string', - action='append', - dest='envs', - default=[], - help=('Specify environment variables to pass to the launched ' - 'process.')) - - parser.add_option( - '--waitFor', - action='store_true', - dest='waitFor', - default=False, - help=('Wait for the next process to be launched whose name matches ' - 'the basename of the program specified with the --program ' - 'option')) - - (options, args) = parser.parse_args(sys.argv[1:]) - - if options.vscode_path is None and options.port is None: - print('error: must either specify a path to a Visual Studio Code ' - 'Debug Adaptor vscode executable path using the --vscode ' - 'option, or a port to attach to for an existing lldb-vscode ' - 'using the --port option') - return - dbg = DebugAdaptor(executable=options.vscode_path, port=options.port) - if options.debug: - raw_input('Waiting for debugger to attach pid "%i"' % ( - dbg.get_pid())) - if options.replay: - dbg.replay_packets(options.replay) - else: - run_vscode(dbg, args, options) - dbg.terminate() - - -if __name__ == '__main__': - main() |