1import os,json,struct,signal 2 3from typing import Any, Dict 4 5import lldb 6from lldb.plugins.scripted_process import ScriptedProcess 7from lldb.plugins.scripted_process import ScriptedThread 8 9from lldb.macosx.crashlog import CrashLog,CrashLogParser 10 11class CrashLogScriptedProcess(ScriptedProcess): 12 def parse_crashlog(self): 13 try: 14 crash_log = CrashLogParser().parse(self.dbg, self.crashlog_path, False) 15 except Exception as e: 16 return 17 18 self.pid = crash_log.process_id 19 self.crashed_thread_idx = crash_log.crashed_thread_idx 20 self.loaded_images = [] 21 22 for thread in crash_log.threads: 23 if thread.did_crash(): 24 for ident in thread.idents: 25 images = crash_log.find_images_with_identifier(ident) 26 if images: 27 for image in images: 28 #TODO: Add to self.loaded_images and load images in lldb 29 err = image.add_module(self.target) 30 if err: 31 print(err) 32 else: 33 self.loaded_images.append(image) 34 self.threads[thread.index] = CrashLogScriptedThread(self, None, thread) 35 36 def __init__(self, target: lldb.SBTarget, args : lldb.SBStructuredData): 37 super().__init__(target, args) 38 39 if not self.target or not self.target.IsValid(): 40 return 41 42 self.crashlog_path = None 43 44 crashlog_path = args.GetValueForKey("crashlog_path") 45 if crashlog_path and crashlog_path.IsValid(): 46 if crashlog_path.GetType() == lldb.eStructuredDataTypeString: 47 self.crashlog_path = crashlog_path.GetStringValue(4096) 48 49 if not self.crashlog_path: 50 return 51 52 self.pid = super().get_process_id() 53 self.crashed_thread_idx = 0 54 self.parse_crashlog() 55 56 def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo: 57 return None 58 59 def get_thread_with_id(self, tid: int): 60 return {} 61 62 def get_registers_for_thread(self, tid: int): 63 return {} 64 65 def read_memory_at_address(self, addr: int, size: int) -> lldb.SBData: 66 # NOTE: CrashLogs don't contain any memory. 67 return lldb.SBData() 68 69 def get_loaded_images(self): 70 # TODO: Iterate over corefile_target modules and build a data structure 71 # from it. 72 return self.loaded_images 73 74 def get_process_id(self) -> int: 75 return self.pid 76 77 def should_stop(self) -> bool: 78 return True 79 80 def is_alive(self) -> bool: 81 return True 82 83 def get_scripted_thread_plugin(self): 84 return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__ 85 86class CrashLogScriptedThread(ScriptedThread): 87 def create_register_ctx(self): 88 if not self.has_crashed: 89 return dict.fromkeys([*map(lambda reg: reg['name'], self.register_info['registers'])] , 0) 90 91 if not self.backing_thread or not len(self.backing_thread.registers): 92 return dict.fromkeys([*map(lambda reg: reg['name'], self.register_info['registers'])] , 0) 93 94 for reg in self.register_info['registers']: 95 reg_name = reg['name'] 96 if reg_name in self.backing_thread.registers: 97 self.register_ctx[reg_name] = self.backing_thread.registers[reg_name] 98 else: 99 self.register_ctx[reg_name] = 0 100 101 return self.register_ctx 102 103 def create_stackframes(self): 104 if not self.has_crashed: 105 return None 106 107 if not self.backing_thread or not len(self.backing_thread.frames): 108 return None 109 110 for frame in self.backing_thread.frames: 111 sym_addr = lldb.SBAddress() 112 sym_addr.SetLoadAddress(frame.pc, self.target) 113 if not sym_addr.IsValid(): 114 continue 115 self.frames.append({"idx": frame.index, "pc": frame.pc}) 116 117 return self.frames 118 119 def __init__(self, process, args, crashlog_thread): 120 super().__init__(process, args) 121 122 self.backing_thread = crashlog_thread 123 self.idx = self.backing_thread.index 124 self.has_crashed = (self.scripted_process.crashed_thread_idx == self.idx) 125 self.create_stackframes() 126 127 def get_thread_id(self) -> int: 128 return self.idx 129 130 def get_name(self) -> str: 131 return CrashLogScriptedThread.__name__ + ".thread-" + str(self.idx) 132 133 def get_state(self): 134 if not self.has_crashed: 135 return lldb.eStateStopped 136 return lldb.eStateCrashed 137 138 def get_stop_reason(self) -> Dict[str, Any]: 139 if not self.has_crashed: 140 return { "type": lldb.eStopReasonNone, "data": { }} 141 # TODO: Investigate what stop reason should be reported when crashed 142 return { "type": lldb.eStopReasonException, "data": { "desc": "EXC_BAD_ACCESS" }} 143 144 def get_register_context(self) -> str: 145 if not self.register_ctx: 146 self.register_ctx = self.create_register_ctx() 147 148 return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values()) 149