"""LLDB scripted-process plugin that presents a parsed crash log as a process."""

import json
import os
import signal
import struct

from typing import Any, Dict

import lldb
from lldb.plugins.scripted_process import ScriptedProcess
from lldb.plugins.scripted_process import ScriptedThread

from lldb.macosx.crashlog import CrashLog, CrashLogParser


class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose threads and images come from a crash log file.

    Recognised keys in the ``args`` structured data:

    * ``crashlog_path`` (string): path of the crash log to parse.
    * ``load_all_images`` (boolean): when true, every image listed in the
      crash log is added to the target; otherwise only the images referenced
      by the crashed thread are added.
    """

    def _load_images(self, images):
        """Add each not-yet-loaded image to the target.

        Images whose ``add_module`` call reports an error are printed and
        skipped; the others are appended to ``self.loaded_images``.

        Args:
            images: iterable of crash log images, or a falsy value (no-op).
        """
        if not images:
            return

        for image in images:
            if image in self.loaded_images:
                continue
            err = image.add_module(self.target)
            if err:
                print(err)
            else:
                self.loaded_images.append(image)

    def parse_crashlog(self):
        """Parse ``self.crashlog_path`` and populate the process from it.

        On success, the process id, address mask and crashed thread index are
        taken from the crash log, the relevant images are loaded and one
        ``CrashLogScriptedThread`` is registered per crash log thread.

        Parsing is best effort: if the parser raises, the error is printed and
        the process is left without threads.
        """
        try:
            crash_log = CrashLogParser().parse(self.dbg, self.crashlog_path, False)
        except Exception as e:
            # Keep the original best-effort behaviour (no exception escapes),
            # but report the failure instead of swallowing it silently.
            print("error: failed to parse crash log '{}': {}".format(
                self.crashlog_path, e))
            return

        self.pid = crash_log.process_id
        self.addr_mask = crash_log.addr_mask
        self.crashed_thread_idx = crash_log.crashed_thread_idx
        self.loaded_images = []

        # Loading every image does not depend on the thread being visited, so
        # do it once up front rather than once per thread.
        if self.load_all_images:
            self._load_images(crash_log.images)

        for thread in crash_log.threads:
            if not self.load_all_images and thread.did_crash():
                # Only the images referenced by the crashed thread are needed.
                for ident in thread.idents:
                    self._load_images(
                        crash_log.find_images_with_identifier(ident))

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

    def __init__(self, target: lldb.SBTarget, args: lldb.SBStructuredData):
        """Initialise the process and parse the crash log named in ``args``.

        Args:
            target: target the scripted process is attached to.
            args: structured data carrying ``crashlog_path`` and, optionally,
                ``load_all_images``.
        """
        super().__init__(target, args)

        # Establish defaults first so that the accessors below keep working
        # even when one of the early returns is taken.
        self.crashlog_path = None
        self.load_all_images = False
        self.loaded_images = []
        self.crashed_thread_idx = 0
        self.pid = super().get_process_id()

        if not self.target or not self.target.IsValid():
            return

        crashlog_path = args.GetValueForKey("crashlog_path")
        if crashlog_path and crashlog_path.IsValid():
            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
                self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            return

        # The flag stays False unless a valid boolean value was supplied.
        load_all_images = args.GetValueForKey("load_all_images")
        if load_all_images and load_all_images.IsValid():
            if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean:
                self.load_all_images = bool(load_all_images.GetBooleanValue())

        self.parse_crashlog()

    def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo:
        """Return ``None``: no memory region information is provided."""
        return None

    def get_thread_with_id(self, tid: int):
        """Return an empty dictionary (not implemented for crash logs)."""
        return {}

    def get_registers_for_thread(self, tid: int):
        """Return an empty dictionary (not implemented for crash logs)."""
        return {}

    def read_memory_at_address(self, addr: int, size: int) -> lldb.SBData:
        """Return an empty ``SBData``."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the list of images successfully added to the target."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def get_process_id(self) -> int:
        """Return the process id (taken from the crash log once parsed)."""
        return self.pid

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread plugin."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__


class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread mirroring one thread entry of a parsed crash log."""

    def create_register_ctx(self):
        """Build the mapping of register name to value for this thread.

        Returns:
            A new all-zero mapping when the thread did not crash or the crash
            log holds no registers for it; otherwise ``self.register_ctx``,
            filled with the crash log values (0 for registers it lacks).
        """
        registers = self.register_info['registers']
        backing = self.backing_thread

        if not self.has_crashed or not backing or not len(backing.registers):
            return dict.fromkeys((reg['name'] for reg in registers), 0)

        for reg in registers:
            reg_name = reg['name']
            if reg_name in backing.registers:
                self.register_ctx[reg_name] = backing.registers[reg_name]
            else:
                self.register_ctx[reg_name] = 0

        return self.register_ctx

    def create_stackframes(self):
        """Fill ``self.frames`` from the crash log frames of this thread.

        Returns:
            ``self.frames``, or ``None`` when frames are not wanted (the
            thread did not crash and not all images were loaded) or the
            crash log has no frames for the thread.
        """
        if not (self.scripted_process.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not len(self.backing_thread.frames):
            return None

        addr_mask = self.scripted_process.addr_mask
        for frame in self.backing_thread.frames:
            frame_pc = frame.pc & addr_mask
            # Frame 0 and null addresses are used as-is; other frames are
            # moved back by one byte (presumably because they hold return
            # addresses -- confirm against the crash log format).
            if frame.index == 0 or frame_pc == 0:
                pc = frame_pc
            else:
                pc = frame_pc - 1

            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, self.target)
            if not sym_addr.IsValid():
                continue
            self.frames.append({"idx": frame.index, "pc": pc})

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Wrap ``crashlog_thread`` as a scripted thread of ``process``.

        Args:
            process: owning ``CrashLogScriptedProcess``.
            args: forwarded unchanged to ``ScriptedThread.__init__``.
            crashlog_thread: parsed crash log thread providing the index, id,
                name, queue, registers and frames.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = (self.scripted_process.crashed_thread_idx == self.idx)
        self.create_stackframes()

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason dictionary (``type`` and ``data`` keys)."""
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone, "data": {}}
        # TODO: Investigate what stop reason should be reported when crashed
        return {"type": lldb.eStopReasonException, "data": {"desc": "EXC_BAD_ACCESS"}}

    def get_register_context(self) -> str:
        """Return the register values packed as consecutive 64-bit unsigned ints.

        The register context is created lazily on first use. The ``str``
        annotation is kept from the original signature; ``struct.pack``
        actually produces ``bytes``.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values())