import os
import json
import struct
import signal

from typing import Any, Dict

import lldb
from lldb.plugins.scripted_process import ScriptedProcess
from lldb.plugins.scripted_process import ScriptedThread

from lldb.macosx.crashlog import CrashLog, CrashLogParser


class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose threads and images come from a parsed crash log.

    The crash log location is read from the ``crashlog_path`` string entry of
    the structured-data arguments. The optional ``load_all_images`` boolean
    entry selects whether every image of the crash log is loaded, or only the
    images referenced by the crashed thread.
    """

    def parse_crashlog(self):
        """Parse ``self.crashlog_path`` and populate pid, images and threads.

        On a parsing failure the error is printed and the process keeps the
        defaults assigned by ``__init__`` (no threads, no loaded images).
        """
        try:
            # NOTE(review): `self.dbg` is not assigned anywhere in this file.
            # Use it when something else provides it, otherwise fall back to
            # the debugger that owns the target.
            debugger = getattr(self, "dbg", None)
            if debugger is None:
                debugger = self.target.GetDebugger()
            crash_log = CrashLogParser().parse(debugger, self.crashlog_path, False)
        except Exception as e:
            # Best effort: report the failure instead of hiding it, but do not
            # propagate it out of the plugin.
            print("error: failed to parse crashlog '{}': {}".format(
                self.crashlog_path, e))
            return

        self.pid = crash_log.process_id
        self.crashed_thread_idx = crash_log.crashed_thread_idx
        self.loaded_images = []

        def load_images(self, images):
            """Add each not-yet-loaded image to the target and remember it."""
            # TODO: Add to self.loaded_images and load images in lldb
            if not images:
                return
            for image in images:
                if image in self.loaded_images:
                    continue
                err = image.add_module(self.target)
                if err:
                    print(err)
                else:
                    self.loaded_images.append(image)

        for thread in crash_log.threads:
            if self.load_all_images:
                load_images(self, crash_log.images)
            elif thread.did_crash():
                # Only the images named by the crashed thread are needed.
                for ident in thread.idents:
                    load_images(self, crash_log.find_images_with_identifier(ident))

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

    def __init__(self, target: lldb.SBTarget, args: lldb.SBStructuredData):
        """Read the plugin arguments and parse the crash log.

        Args:
            target: The target this scripted process is attached to.
            args: Structured data holding ``crashlog_path`` (string) and,
                optionally, ``load_all_images`` (boolean).
        """
        super().__init__(target, args)

        # Assign every attribute the accessors below read *before* any early
        # return, so a half-initialized process never raises AttributeError.
        self.crashlog_path = None
        self.load_all_images = False
        self.loaded_images = []
        self.pid = super().get_process_id()
        self.crashed_thread_idx = 0

        if not self.target or not self.target.IsValid():
            return

        crashlog_path = args.GetValueForKey("crashlog_path")
        if crashlog_path and crashlog_path.IsValid():
            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
                self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            return

        load_all_images = args.GetValueForKey("load_all_images")
        if load_all_images and load_all_images.IsValid():
            if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean:
                # Normalize falsy values to False, as the original code did.
                self.load_all_images = load_all_images.GetBooleanValue() or False

        self.parse_crashlog()

    def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo:
        """Return None: no memory region is reported for any address."""
        return None

    def get_thread_with_id(self, tid: int):
        """Return an empty dictionary regardless of ``tid``."""
        return {}

    def get_registers_for_thread(self, tid: int):
        """Return an empty dictionary regardless of ``tid``."""
        return {}

    def read_memory_at_address(self, addr: int, size: int) -> lldb.SBData:
        """Return an empty ``SBData`` for every read."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the list of images successfully added to the target."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def get_process_id(self) -> int:
        """Return the process id taken from the crash log, or the default."""
        return self.pid

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the fully qualified name of the scripted thread class."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__


class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by one thread entry of a parsed crash log."""

    def create_register_ctx(self):
        """Build the register-name to value mapping for this thread.

        Registers are zero-filled unless this is the crashed thread and the
        crash log recorded registers for it; registers missing from the crash
        log are reported as 0.
        """
        registers = self.register_info['registers']

        has_recorded_registers = (
            self.has_crashed
            and self.backing_thread
            and self.backing_thread.registers
        )
        if not has_recorded_registers:
            return dict.fromkeys([reg['name'] for reg in registers], 0)

        recorded = self.backing_thread.registers
        for reg in registers:
            reg_name = reg['name']
            self.register_ctx[reg_name] = recorded.get(reg_name, 0) \
                if hasattr(recorded, "get") else (
                    recorded[reg_name] if reg_name in recorded else 0)

        return self.register_ctx

    def create_stackframes(self):
        """Append one ``{"idx", "pc"}`` entry per crash log frame.

        Returns:
            ``self.frames``, or None when frames are not wanted for this
            thread (not crashed and not loading all images) or the crash log
            has no frames for it.
        """
        if not (self.scripted_process.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not self.backing_thread.frames:
            return None

        for frame in self.backing_thread.frames:
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(frame.pc, self.target)
            # Skip frames whose pc cannot be turned into a valid address.
            if not sym_addr.IsValid():
                continue
            self.frames.append({"idx": frame.index, "pc": frame.pc})

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Mirror the crash log thread's identity and build its frames.

        Args:
            process: The owning scripted process.
            args: Forwarded unchanged to the base class.
            crashlog_thread: The crash log thread this object represents.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = (self.scripted_process.crashed_thread_idx == self.idx)
        self.create_stackframes()

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason dictionary for this thread."""
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone, "data": {}}
        # TODO: Investigate what stop reason should be reported when crashed
        return {"type": lldb.eStopReasonException, "data": {"desc": "EXC_BAD_ACCESS"}}

    def get_register_context(self) -> str:
        """Return the register values packed as consecutive 64-bit unsigned ints.

        The register context is created on first use and cached.
        """
        # NOTE: the annotation says `str`, but `struct.pack` returns `bytes`;
        # the annotation is kept as-is to leave the method signature untouched.
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values())