1import os,json,struct,signal
2
3from typing import Any, Dict
4
5import lldb
6from lldb.plugins.scripted_process import ScriptedProcess
7from lldb.plugins.scripted_process import ScriptedThread
8
class StackCoreScriptedProcess(ScriptedProcess):
    """Scripted process backed by the process of another target.

    The backing target is looked up in the debugger by the index supplied
    through the ``backing_target_idx`` launch argument. Memory reads and
    memory-region queries are forwarded to that target's process, and one
    ``StackCoreScriptedThread`` is created per thread of that process.
    """

    def get_module_with_name(self, target, name):
        """Return the first module of `target` whose file name contains `name`.

        Returns None when `target` is None or when no module matches.
        """
        if target is None:
            return None
        for module in target.modules:
            filename = module.GetFileSpec().GetFilename()
            # Skip modules that report no file name instead of failing on
            # the substring test.
            if filename is not None and name in filename:
                return module
        return None

    def __init__(self, target: lldb.SBTarget, args: lldb.SBStructuredData):
        """Set up the scripted process from the launch arguments.

        Args:
            target: The target this scripted process is attached to.
            args: Structured data; the ``backing_target_idx`` key (integer or
                numeric string) selects the backing target in the debugger.

        Raises:
            ValueError: If ``backing_target_idx`` is a string that cannot be
                parsed as an integer.
        """
        super().__init__(target, args)

        self.corefile_target = None
        self.corefile_process = None

        self.backing_target_idx = args.GetValueForKey("backing_target_idx")

        # Resolve the index up front so an unsupported value type is treated
        # like a missing key rather than leaving `idx` unbound.
        idx = None
        if self.backing_target_idx and self.backing_target_idx.IsValid():
            data_type = self.backing_target_idx.GetType()
            if data_type == lldb.eStructuredDataTypeInteger:
                idx = self.backing_target_idx.GetIntegerValue(42)
            elif data_type == lldb.eStructuredDataTypeString:
                idx = int(self.backing_target_idx.GetStringValue(100))

        if idx is not None:
            self.corefile_target = target.GetDebugger().GetTargetAtIndex(idx)
            self.corefile_process = self.corefile_target.GetProcess()
            for corefile_thread in self.corefile_process:
                structured_data = lldb.SBStructuredData()
                structured_data.SetFromJSON(json.dumps({
                    "backing_target_idx": idx,
                    "thread_idx": corefile_thread.GetIndexID()
                }))

                self.threads[corefile_thread.GetThreadID()] = \
                    StackCoreScriptedThread(self, structured_data)

        if len(self.threads) == 2:
            # `self.threads` is keyed by thread ID, so the key `len - 1` only
            # exists when thread IDs happen to be small indices. Fall back to
            # the most recently added thread rather than raising KeyError.
            # NOTE(review): presumably the last thread is the one meant to be
            # reported as stopped -- confirm.
            stopped_thread = self.threads.get(len(self.threads) - 1)
            if stopped_thread is None:
                stopped_thread = list(self.threads.values())[-1]
            stopped_thread.is_stopped = True

        # Without a backing target there is no module list to inspect.
        if self.corefile_target is None:
            return

        corefile_module = self.get_module_with_name(self.corefile_target,
                                                    "libbaz.dylib")
        if not corefile_module or not corefile_module.IsValid():
            return

        module_dir = corefile_module.GetFileSpec().GetDirectory()
        module_filename = corefile_module.GetFileSpec().GetFilename()
        if module_dir is None or module_filename is None:
            return
        module_path = os.path.join(module_dir, module_filename)
        if not os.path.exists(module_path):
            return
        module_load_addr = corefile_module.GetObjectFileHeaderAddress().GetLoadAddress(self.corefile_target)

        self.loaded_images.append({"path": module_path,
                                   "load_addr": module_load_addr})

    def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo:
        """Return the backing process's memory region containing `addr`.

        Returns None when there is no backing process or the query fails.
        """
        if self.corefile_process is None:
            return None
        mem_region = lldb.SBMemoryRegionInfo()
        error = self.corefile_process.GetMemoryRegionInfo(addr, mem_region)
        if error.Fail():
            return None
        return mem_region

    def get_thread_with_id(self, tid: int):
        """Placeholder implementation; always returns an empty dict."""
        return {}

    def get_registers_for_thread(self, tid: int):
        """Placeholder implementation; always returns an empty dict."""
        return {}

    def read_memory_at_address(self, addr: int, size: int) -> lldb.SBData:
        """Read `size` bytes at `addr` from the backing process.

        Returns an empty SBData when there is no backing process or the read
        fails; otherwise the SBData wraps the bytes using the backing target's
        byte order and address size.
        """
        data = lldb.SBData()
        if self.corefile_process is None or self.corefile_target is None:
            return data

        error = lldb.SBError()
        bytes_read = self.corefile_process.ReadMemory(addr, size, error)

        if error.Fail():
            return data

        data.SetDataWithOwnership(error, bytes_read,
                                  self.corefile_target.GetByteOrder(),
                                  self.corefile_target.GetAddressByteSize())

        return data

    def get_loaded_images(self):
        """Return the list of image dictionaries collected in `__init__`."""
        return self.loaded_images

    def get_process_id(self) -> int:
        """Return the fixed process ID (42) reported for this process."""
        return 42

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return StackCoreScriptedThread.__module__ + "." + StackCoreScriptedThread.__name__
96
97
class StackCoreScriptedThread(ScriptedThread):
    """Scripted thread that mirrors one thread of a backing target's process.

    The backing target and thread are selected by the ``backing_target_idx``
    and ``thread_idx`` keys of the structured-data arguments.
    """

    def __init__(self, process, args):
        """Resolve the backing target, process and thread from `args`.

        Args:
            process: The scripted process that owns this thread.
            args: Structured data with ``backing_target_idx`` (index of the
                backing target in the debugger) and ``thread_idx`` (index ID
                of the thread in the backing process). Each may be an integer
                or a numeric string.

        Raises:
            ValueError: If a string-typed value cannot be parsed as an int.
        """
        super().__init__(process, args)
        backing_target_idx = args.GetValueForKey("backing_target_idx")
        thread_idx = args.GetValueForKey("thread_idx")
        # Toggled by the owning process to mark the thread that reports a stop.
        self.is_stopped = False

        def extract_value_from_structured_data(data, default_val):
            """Return `data` as an int, or None if it is missing, invalid, or
            neither an integer nor a string value."""
            if not (data and data.IsValid()):
                return None
            data_type = data.GetType()
            if data_type == lldb.eStructuredDataTypeInteger:
                return data.GetIntegerValue(default_val)
            if data_type == lldb.eStructuredDataTypeString:
                return int(data.GetStringValue(100))
            return None

        #TODO: Change to Walrus operator (:=) with oneline if assignment
        # Requires python 3.8
        val = extract_value_from_structured_data(thread_idx, 0)
        if val is not None:
            self.idx = val

        self.corefile_target = None
        self.corefile_process = None
        self.corefile_thread = None

        #TODO: Change to Walrus operator (:=) with oneline if assignment
        # Requires python 3.8
        val = extract_value_from_structured_data(backing_target_idx, 42)
        if val is not None:
            self.corefile_target = self.target.GetDebugger().GetTargetAtIndex(val)
            self.corefile_process = self.corefile_target.GetProcess()
            # `self.idx` is only assigned above when `thread_idx` was usable;
            # skip the lookup when no index ID is available rather than
            # passing a missing/None value to GetThreadByIndexID.
            thread_index_id = getattr(self, "idx", None)
            if thread_index_id is not None:
                self.corefile_thread = self.corefile_process.GetThreadByIndexID(thread_index_id)

        if self.corefile_thread:
            self.id = self.corefile_thread.GetThreadID()

    def get_thread_id(self) -> int:
        """Return the thread ID copied from the backing thread."""
        return self.id

    def get_name(self) -> str:
        """Return a name of the form ``StackCoreScriptedThread.thread-<id>``."""
        return StackCoreScriptedThread.__name__ + ".thread-" + str(self.id)

    def get_stop_reason(self) -> Dict[str, Any]:
        """Build the stop-reason dictionary for this thread.

        Returns:
            A dict with a ``type`` key (an lldb stop-reason constant) and a
            ``data`` dict. The type is ``eStopReasonInvalid`` when there is no
            valid backing thread with a matching thread ID, and
            ``eStopReasonNone`` for a matching thread that is not marked as
            stopped. For a stopped thread it depends on the owning process's
            architecture: an exception with the backing thread's stop
            description on arm64, a SIGTRAP signal on x86_64, and the backing
            thread's own stop reason otherwise.
        """
        stop_reason = {"type": lldb.eStopReasonInvalid, "data": {}}

        if self.corefile_thread and self.corefile_thread.IsValid() \
            and self.get_thread_id() == self.corefile_thread.GetThreadID():
            stop_reason["type"] = lldb.eStopReasonNone

            if self.is_stopped:
                if 'arm64' in self.scripted_process.arch:
                    stop_reason["type"] = lldb.eStopReasonException
                    stop_reason["data"]["desc"] = self.corefile_thread.GetStopDescription(100)
                elif self.scripted_process.arch == 'x86_64':
                    stop_reason["type"] = lldb.eStopReasonSignal
                    stop_reason["data"]["signal"] = signal.SIGTRAP
                else:
                    stop_reason["type"] = self.corefile_thread.GetStopReason()

        return stop_reason

    def get_register_context(self) -> str:
        """Pack the backing thread's general purpose registers of frame 0.

        Returns:
            The register values packed back to back as unsigned 64-bit
            integers (``struct`` format ``Q``), or None when there is no
            backing thread, it has no frames, or no register set whose name
            contains "general purpose" is found. Despite the ``str``
            annotation, the packed value is a ``bytes`` object.
        """
        if not self.corefile_thread or self.corefile_thread.GetNumFrames() == 0:
            return None
        frame = self.corefile_thread.GetFrameAtIndex(0)

        GPRs = None
        registerSet = frame.registers # Returns an SBValueList.
        for regs in registerSet:
            if 'general purpose' in regs.name.lower():
                GPRs = regs
                break

        if not GPRs:
            return None

        # Register values are parsed as hexadecimal strings.
        for reg in GPRs:
            self.register_ctx[reg.name] = int(reg.value, base=16)

        return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values())
178
179
def __lldb_init_module(debugger, dict):
    """Module-initialization hook for LLDB.

    Launches a process managed by ``StackCoreScriptedProcess`` through the
    given debugger, unless the ``SKIP_SCRIPTED_PROCESS_LAUNCH`` environment
    variable is set, in which case the managing class name is only printed.
    """
    class_path = (__name__, StackCoreScriptedProcess.__name__)

    if 'SKIP_SCRIPTED_PROCESS_LAUNCH' in os.environ:
        print("Name of the class that will manage the scripted process: '%s.%s'"
              % class_path)
        return

    debugger.HandleCommand("process launch -C %s.%s" % class_path)
188