1import json
2import re
3
4import gdbremote_testcase
5from lldbsuite.test.decorators import *
6from lldbsuite.test.lldbtest import *
7from lldbsuite.test import lldbutil
8
class TestGdbRemoteThreadsInStopReply(
        gdbremote_testcase.GdbRemoteTestCaseBase):
    """Tests for the thread information carried in gdb-remote stop replies.

    The tests in this class read the "threads" and "thread-pcs" fields of a
    stop reply, with and without the QListThreadsInStopReply packet having
    been sent first, and compare them with data gathered via other packets.
    """

    mydir = TestBase.compute_mydir(__file__)

    # Log lines for the packet exchange that turns on thread lists in stop
    # replies: the QListThreadsInStopReply packet and the expected OK reply.
    ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
        "read packet: $QListThreadsInStopReply#21",
        "send packet: $OK#00",
    ]
18
19    def gather_stop_reply_fields(self, post_startup_log_lines, thread_count,
20            field_names):
21        # Set up the inferior args.
22        inferior_args = []
23        for i in range(thread_count - 1):
24            inferior_args.append("thread:new")
25        inferior_args.append("sleep:10")
26        procs = self.prep_debug_monitor_and_inferior(
27            inferior_args=inferior_args)
28
29        self.add_register_info_collection_packets()
30        self.add_process_info_collection_packets()
31
32        # Assumes test_sequence has anything added needed to setup the initial state.
33        # (Like optionally enabling QThreadsInStopReply.)
34        if post_startup_log_lines:
35            self.test_sequence.add_log_lines(post_startup_log_lines, True)
36        self.test_sequence.add_log_lines([
37            "read packet: $c#63"
38        ], True)
39        context = self.expect_gdbremote_sequence()
40        self.assertIsNotNone(context)
41        hw_info = self.parse_hw_info(context)
42
43        # Give threads time to start up, then break.
44        time.sleep(self.DEFAULT_SLEEP)
45        self.reset_test_sequence()
46        self.test_sequence.add_log_lines(
47            [
48                "read packet: {}".format(
49                    chr(3)),
50                {
51                    "direction": "send",
52                    "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
53                    "capture": {
54                        1: "stop_result",
55                        2: "key_vals_text"}},
56            ],
57            True)
58        context = self.expect_gdbremote_sequence()
59        self.assertIsNotNone(context)
60
61        # Wait until all threads have started.
62        threads = self.wait_for_thread_count(thread_count)
63        self.assertIsNotNone(threads)
64        self.assertEqual(len(threads), thread_count)
65
66        # Run, then stop the process, grab the stop reply content.
67        self.reset_test_sequence()
68        self.test_sequence.add_log_lines(["read packet: $c#63",
69                                          "read packet: {}".format(chr(3)),
70                                          {"direction": "send",
71                                           "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
72                                           "capture": {1: "stop_result",
73                                                       2: "key_vals_text"}},
74                                          ],
75                                         True)
76        context = self.expect_gdbremote_sequence()
77        self.assertIsNotNone(context)
78
79        # Parse the stop reply contents.
80        key_vals_text = context.get("key_vals_text")
81        self.assertIsNotNone(key_vals_text)
82        kv_dict = self.parse_key_val_dict(key_vals_text)
83        self.assertIsNotNone(kv_dict)
84
85        result = dict();
86        result["pc_register"] = hw_info["pc_register"]
87        result["little_endian"] = hw_info["little_endian"]
88        for key_field in field_names:
89            result[key_field] = kv_dict.get(key_field)
90
91        return result
92
93    def gather_stop_reply_threads(self, post_startup_log_lines, thread_count):
94        # Pull out threads from stop response.
95        stop_reply_threads_text = self.gather_stop_reply_fields(
96                post_startup_log_lines, thread_count, ["threads"])["threads"]
97        if stop_reply_threads_text:
98            return [int(thread_id, 16)
99                    for thread_id in stop_reply_threads_text.split(",")]
100        else:
101            return []
102
103    def gather_stop_reply_pcs(self, post_startup_log_lines, thread_count):
104        results = self.gather_stop_reply_fields( post_startup_log_lines,
105                thread_count, ["threads", "thread-pcs"])
106        if not results:
107            return []
108
109        threads_text = results["threads"]
110        pcs_text = results["thread-pcs"]
111        thread_ids = threads_text.split(",")
112        pcs = pcs_text.split(",")
113        self.assertEquals(len(thread_ids), len(pcs))
114
115        thread_pcs = dict()
116        for i in range(0, len(pcs)):
117            thread_pcs[int(thread_ids[i], 16)] = pcs[i]
118
119        result = dict()
120        result["thread_pcs"] = thread_pcs
121        result["pc_register"] = results["pc_register"]
122        result["little_endian"] = results["little_endian"]
123        return result
124
125    def switch_endian(self, egg):
126        return "".join(reversed(re.findall("..", egg)))
127
128    def parse_hw_info(self, context):
129        self.assertIsNotNone(context)
130        process_info = self.parse_process_info_response(context)
131        endian = process_info.get("endian")
132        reg_info = self.parse_register_info_packets(context)
133        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
134
135        hw_info = dict()
136        hw_info["pc_register"] = pc_lldb_reg_index
137        hw_info["little_endian"] = (endian == "little")
138        return hw_info
139
140    def gather_threads_info_pcs(self, pc_register, little_endian):
141        self.reset_test_sequence()
142        self.test_sequence.add_log_lines(
143                [
144                    "read packet: $jThreadsInfo#c1",
145                    {
146                        "direction": "send",
147                        "regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
148                        "capture": {
149                            1: "threads_info"}},
150                ],
151                True)
152
153        context = self.expect_gdbremote_sequence()
154        self.assertIsNotNone(context)
155        threads_info = context.get("threads_info")
156        register = str(pc_register)
157        # The jThreadsInfo response is not valid JSON data, so we have to
158        # clean it up first.
159        jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
160        thread_pcs = dict()
161        for thread_info in jthreads_info:
162            tid = thread_info["tid"]
163            pc = thread_info["registers"][register]
164            thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
165
166        return thread_pcs
167
168
169    def test_QListThreadsInStopReply_supported(self):
170        self.build()
171        self.set_inferior_startup_launch()
172        procs = self.prep_debug_monitor_and_inferior()
173        self.test_sequence.add_log_lines(
174            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
175
176        context = self.expect_gdbremote_sequence()
177        self.assertIsNotNone(context)
178
    # In the current implementation of llgs on Windows, in response to a '\x03' packet the
    # debugger of the native process triggers a call to DebugBreakProcess, which creates a new
    # thread to handle the exception debug event. As a result, one extra stopped thread is
    # reported to the delegate (e.g. llgs), so the tests below that assert on the number of
    # stopped threads are all expected to fail on Windows.
183    @expectedFailureAll(oslist=["windows"])
184    @skipIfNetBSD
185    def test_stop_reply_reports_multiple_threads(self):
186        self.build()
187        self.set_inferior_startup_launch()
188        # Gather threads from stop notification when QThreadsInStopReply is
189        # enabled.
190        stop_reply_threads = self.gather_stop_reply_threads(
191            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, 5)
192        self.assertEqual(len(stop_reply_threads), 5)
193
194    @expectedFailureAll(oslist=["windows"])
195    @skipIfNetBSD
196    def test_no_QListThreadsInStopReply_supplies_no_threads(self):
197        self.build()
198        self.set_inferior_startup_launch()
199        # Gather threads from stop notification when QThreadsInStopReply is not
200        # enabled.
201        stop_reply_threads = self.gather_stop_reply_threads(None, 5)
202        self.assertEqual(len(stop_reply_threads), 0)
203
204    @expectedFailureAll(oslist=["windows"])
205    @skipIfNetBSD
206    def test_stop_reply_reports_correct_threads(self):
207        self.build()
208        self.set_inferior_startup_launch()
209        # Gather threads from stop notification when QThreadsInStopReply is
210        # enabled.
211        thread_count = 5
212        stop_reply_threads = self.gather_stop_reply_threads(
213            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
214        self.assertEqual(len(stop_reply_threads), thread_count)
215
216        # Gather threads from q{f,s}ThreadInfo.
217        self.reset_test_sequence()
218        self.add_threadinfo_collection_packets()
219
220        context = self.expect_gdbremote_sequence()
221        self.assertIsNotNone(context)
222
223        threads = self.parse_threadinfo_packets(context)
224        self.assertIsNotNone(threads)
225        self.assertEqual(len(threads), thread_count)
226
227        # Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
228        for tid in threads:
229            self.assertIn(tid, stop_reply_threads)
230
231    @expectedFailureAll(oslist=["windows"])
232    @skipIfNetBSD
233    def test_stop_reply_contains_thread_pcs(self):
234        self.build()
235        self.set_inferior_startup_launch()
236        thread_count = 5
237        results = self.gather_stop_reply_pcs(
238                self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
239        stop_reply_pcs = results["thread_pcs"]
240        pc_register = results["pc_register"]
241        little_endian = results["little_endian"]
242        self.assertEqual(len(stop_reply_pcs), thread_count)
243
244        threads_info_pcs = self.gather_threads_info_pcs(pc_register,
245                little_endian)
246
247        self.assertEqual(len(threads_info_pcs), thread_count)
248        for thread_id in stop_reply_pcs:
249            self.assertIn(thread_id, threads_info_pcs)
250            self.assertEqual(int(stop_reply_pcs[thread_id], 16),
251                    int(threads_info_pcs[thread_id], 16))
252