1import os,json,struct,signal
2
3from typing import Any, Dict
4
5import lldb
6from lldb.plugins.scripted_process import ScriptedProcess
7from lldb.plugins.scripted_process import ScriptedThread
8
9from lldb.macosx.crashlog import CrashLog,CrashLogParser
10
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process that exposes the state recorded in a crash log.

    The crash log named by the ``crashlog_path`` entry of the structured-data
    arguments is parsed at construction time, and every thread found in it
    is registered in ``self.threads`` as a ``CrashLogScriptedThread``.
    """

    def _load_images(self, images):
        """Add each image in *images* to the target and record the successes.

        Images already present in ``self.loaded_images`` are skipped. When
        ``add_module`` reports an error, the error is printed and the image
        is left out of ``self.loaded_images``.
        """
        if not images:
            return

        for image in images:
            if image in self.loaded_images:
                continue
            err = image.add_module(self.target)
            if err:
                print(err)
            else:
                self.loaded_images.append(image)

    def parse_crashlog(self):
        """Parse ``self.crashlog_path`` and populate pid, images and threads.

        Parsing is best-effort: if the parser raises, the error is printed
        and the process keeps the defaults it had before the call, with an
        empty ``self.loaded_images``.
        """
        # Reset before parsing so get_loaded_images() keeps working even when
        # the parser fails below.
        self.loaded_images = []

        try:
            crash_log = CrashLogParser().parse(self.dbg, self.crashlog_path, False)
        except Exception as e:
            # Report the failure instead of dropping it silently; the caller
            # still gets a usable (empty) process.
            print("error: couldn't parse crashlog '{}': {}".format(
                self.crashlog_path, e))
            return

        self.pid = crash_log.process_id
        self.crashed_thread_idx = crash_log.crashed_thread_idx

        # Loading every image does not depend on the thread being visited, so
        # do it once up front rather than once per thread.
        if self.load_all_images:
            self._load_images(crash_log.images)

        for thread in crash_log.threads:
            # Without load_all_images, only the images referenced by the
            # crashed thread are loaded.
            if not self.load_all_images and thread.did_crash():
                for ident in thread.idents:
                    self._load_images(crash_log.find_images_with_identifier(ident))

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

    def __init__(self, target: lldb.SBTarget, args : lldb.SBStructuredData):
        """Create the process and parse the crash log named in *args*.

        Args:
            target: Target the scripted process is attached to.
            args: Structured data read for two keys: ``crashlog_path``
                (string, required for anything to be parsed) and
                ``load_all_images`` (boolean, treated as False when missing
                or of another type).
        """
        super().__init__(target, args)

        # Establish every attribute the accessors read before any early
        # return, so a process built with an invalid target or without a
        # crash log path never raises AttributeError later on.
        self.crashlog_path = None
        self.load_all_images = False
        self.loaded_images = []
        self.crashed_thread_idx = 0
        self.pid = super().get_process_id()

        if not self.target or not self.target.IsValid():
            return

        crashlog_path = args.GetValueForKey("crashlog_path")
        if crashlog_path and crashlog_path.IsValid():
            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
                # NOTE(review): 4096 is presumably the maximum string length
                # to fetch -- confirm against the SBStructuredData API.
                self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            return

        load_all_images = args.GetValueForKey("load_all_images")
        if load_all_images and load_all_images.IsValid():
            if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean:
                # Normalise falsy results to False, as the original did.
                self.load_all_images = load_all_images.GetBooleanValue() or False

        self.parse_crashlog()

    def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo:
        """Return None: no memory region information is provided."""
        return None

    def get_thread_with_id(self, tid: int):
        """Return an empty dict regardless of *tid*."""
        return {}

    def get_registers_for_thread(self, tid: int):
        """Return an empty dict regardless of *tid*."""
        return {}

    def read_memory_at_address(self, addr: int, size: int) -> lldb.SBData:
        """Return an empty ``lldb.SBData`` for every read request."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the list of images successfully added to the target."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def get_process_id(self) -> int:
        """Return the pid from the crash log, or the base-class default."""
        return self.pid

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__
99
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by one thread entry of a parsed crash log."""

    def create_register_ctx(self):
        """Build the register-name to value mapping for this thread.

        A thread that did not crash, has no backing thread, or has no
        recorded registers gets a fresh dict with every register set to 0.
        Otherwise ``self.register_ctx`` is filled from the backing thread's
        registers (0 for any register it does not list) and returned.
        """
        use_recorded = (
            self.has_crashed
            and self.backing_thread
            and len(self.backing_thread.registers) != 0
        )
        if not use_recorded:
            reg_names = [entry['name'] for entry in self.register_info['registers']]
            return dict.fromkeys(reg_names, 0)

        recorded = self.backing_thread.registers
        for entry in self.register_info['registers']:
            name = entry['name']
            self.register_ctx[name] = recorded[name] if name in recorded else 0

        return self.register_ctx

    def create_stackframes(self):
        """Append the backing thread's frames to ``self.frames``.

        Returns None without touching ``self.frames`` when frames are not
        wanted (the thread did not crash and the process is not loading all
        images) or when there is no backing thread or no recorded frame.
        Otherwise returns ``self.frames``.
        """
        wants_frames = self.scripted_process.load_all_images or self.has_crashed
        if not wants_frames:
            return None

        if not self.backing_thread or len(self.backing_thread.frames) == 0:
            return None

        for entry in self.backing_thread.frames:
            load_addr = lldb.SBAddress()
            load_addr.SetLoadAddress(entry.pc, self.target)
            # Only keep frames whose pc yields a valid address in the target.
            if load_addr.IsValid():
                self.frames.append({"idx": entry.index, "pc": entry.pc})

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Wrap *crashlog_thread* as a thread of scripted process *process*.

        Copies index, id, name and queue from the crash log thread, marks
        the thread as crashed when its index matches the process's
        ``crashed_thread_idx``, then builds its stack frames.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = crashlog_thread.index
        self.tid = crashlog_thread.id
        self.name = crashlog_thread.name
        self.queue = crashlog_thread.queue
        self.has_crashed = (self.idx == self.scripted_process.crashed_thread_idx)
        self.create_stackframes()

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        return lldb.eStateCrashed if self.has_crashed else lldb.eStateStopped

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop-reason dictionary for this thread."""
        if self.has_crashed:
            # TODO: Investigate what stop reason should be reported when crashed
            return { "type": lldb.eStopReasonException, "data": { "desc": "EXC_BAD_ACCESS" }}
        return { "type": lldb.eStopReasonNone, "data": {  }}

    def get_register_context(self) -> str:
        """Return the register values packed as unsigned 64-bit integers.

        The register context is created on first use. Values are packed in
        the dict's iteration order with ``struct.pack``, so the object
        returned is the packed byte string despite the ``str`` annotation.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        layout = "{}Q".format(len(self.register_ctx))
        values = self.register_ctx.values()
        return struct.pack(layout, *values)
160