1import os,json,struct,signal
2
3from typing import Any, Dict
4
5import lldb
6from lldb.plugins.scripted_process import ScriptedProcess
7from lldb.plugins.scripted_process import ScriptedThread
8
9from lldb.macosx.crashlog import CrashLog,CrashLogParser
10
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose state is rebuilt from a crash log file.

    The crash log location is read from the ``crashlog_path`` string entry of
    the structured-data arguments passed to the constructor.
    """

    def parse_crashlog(self):
        """Parse ``self.crashlog_path`` and populate the process state.

        On success this records the process id and the crashed-thread index
        reported by the crash log, adds the images referenced by every crashed
        thread to ``self.target`` and stores one ``CrashLogScriptedThread`` per
        crash log thread in ``self.threads`` (keyed by the thread index).

        Parsing is best effort: if the parser raises, the error is printed and
        the attributes keep whatever values they had before the call.
        """
        try:
            crash_log = CrashLogParser().parse(self.dbg, self.crashlog_path, False)
        except Exception as e:
            # Keep the best-effort behaviour (no exception escapes), but do
            # not hide the reason the process ends up empty.
            print("error: couldn't parse crashlog '{}': {}".format(
                self.crashlog_path, e))
            return

        self.pid = crash_log.process_id
        self.crashed_thread_idx = crash_log.crashed_thread_idx
        self.loaded_images = []

        for thread in crash_log.threads:
            if thread.did_crash():
                self._load_images_for_thread(crash_log, thread)
            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

    def _load_images_for_thread(self, crash_log, thread):
        """Add every image referenced by ``thread.idents`` to the target.

        Images that were added without error are appended to
        ``self.loaded_images``; for the others the error is printed.
        """
        for ident in thread.idents:
            images = crash_log.find_images_with_identifier(ident)
            if not images:
                continue
            for image in images:
                # A truthy return value from add_module is treated as an
                # error description.
                err = image.add_module(self.target)
                if err:
                    print(err)
                else:
                    self.loaded_images.append(image)

    def __init__(self, target: lldb.SBTarget, args: lldb.SBStructuredData):
        """Create the process and, when possible, parse its crash log.

        Args:
            target: Target the scripted process is attached to.
            args: Structured data expected to hold a ``crashlog_path`` string.

        If the target is invalid or no crash log path is provided, the
        process is left with default values and no crash log is parsed.
        """
        super().__init__(target, args)

        # Set the defaults before any early return so that the accessors
        # below (get_process_id, get_loaded_images) work on every path.
        self.crashlog_path = None
        self.pid = super().get_process_id()
        self.crashed_thread_idx = 0
        self.loaded_images = []

        if not self.target or not self.target.IsValid():
            return

        crashlog_path = args.GetValueForKey("crashlog_path")
        if crashlog_path and crashlog_path.IsValid():
            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
                self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            return

        self.parse_crashlog()

    def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo:
        """Return ``None``: no memory region information is provided here."""
        return None

    def get_thread_with_id(self, tid: int):
        """Return an empty dict regardless of ``tid``."""
        return {}

    def get_registers_for_thread(self, tid: int):
        """Return an empty dict regardless of ``tid``."""
        return {}

    def read_memory_at_address(self, addr: int, size: int) -> lldb.SBData:
        """Return an empty ``lldb.SBData`` regardless of ``addr`` and ``size``."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the images that were successfully added to the target."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def get_process_id(self) -> int:
        """Return the process id (taken from the crash log once parsed)."""
        return self.pid

    def should_stop(self) -> bool:
        """Always return ``True``."""
        return True

    def is_alive(self) -> bool:
        """Always return ``True``."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` of the scripted thread class."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__
85
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by one thread entry of a parsed crash log."""

    def _zeroed_register_ctx(self):
        """Return a dict mapping every known register name to 0."""
        return dict.fromkeys(
            (reg['name'] for reg in self.register_info['registers']), 0)

    def create_register_ctx(self):
        """Build the register-name -> value mapping for this thread.

        A thread that did not crash, or whose crash log entry has no
        registers, gets every register set to 0. For the crashed thread the
        values come from the crash log entry, with 0 for any register the
        entry does not list; that mapping is also stored in
        ``self.register_ctx``.

        Returns:
            A dict ordered like ``self.register_info['registers']``.
        """
        if (not self.has_crashed
                or not self.backing_thread
                or not self.backing_thread.registers):
            return self._zeroed_register_ctx()

        crashed_registers = self.backing_thread.registers
        # Build a fresh dict instead of writing into self.register_ctx, which
        # the caller only guarantees to be falsy (it might not be a dict).
        register_ctx = {}
        for reg in self.register_info['registers']:
            reg_name = reg['name']
            if reg_name in crashed_registers:
                register_ctx[reg_name] = crashed_registers[reg_name]
            else:
                register_ctx[reg_name] = 0

        self.register_ctx = register_ctx
        return self.register_ctx

    def create_stackframes(self):
        """Append the crashed thread's frames to ``self.frames``.

        Each appended entry is a dict with the keys ``"idx"`` and ``"pc"``.
        Frames whose pc does not yield a valid ``lldb.SBAddress`` for the
        target are skipped.

        Returns:
            ``self.frames``, or ``None`` when the thread did not crash or the
            crash log entry has no frames.
        """
        if not self.has_crashed:
            return None

        if not self.backing_thread or not self.backing_thread.frames:
            return None

        for frame in self.backing_thread.frames:
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(frame.pc, self.target)
            if not sym_addr.IsValid():
                continue
            self.frames.append({"idx": frame.index, "pc": frame.pc})

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Wrap ``crashlog_thread`` and pre-compute its stack frames.

        Args:
            process: Owning scripted process, forwarded to the base class.
            args: Extra arguments, forwarded to the base class.
            crashlog_thread: Crash log thread entry; its ``index`` is used as
                this thread's id.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        # The thread is "crashed" when its index matches the one the owning
        # process recorded as crashed.
        self.has_crashed = (self.scripted_process.crashed_thread_idx == self.idx)
        self.create_stackframes()

    def get_thread_id(self) -> int:
        """Return the crash log index of this thread."""
        return self.idx

    def get_name(self) -> str:
        """Return a name of the form ``CrashLogScriptedThread.thread-<idx>``."""
        return CrashLogScriptedThread.__name__ + ".thread-" + str(self.idx)

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason as a dict with ``"type"`` and ``"data"`` keys."""
        if not self.has_crashed:
            return { "type": lldb.eStopReasonNone, "data": {  }}
        # TODO: Investigate what stop reason should be reported when crashed
        return { "type": lldb.eStopReasonException, "data": { "desc": "EXC_BAD_ACCESS" }}

    def get_register_context(self) -> bytes:
        """Return the register values packed with struct's ``Q`` format.

        The register mapping is created on first use. The values are packed
        in the iteration order of the mapping, one ``Q`` item per register.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values())
149