1
2import json
3import re
4
5import gdbremote_testcase
6from lldbsuite.test.decorators import *
7from lldbsuite.test.lldbtest import *
8from lldbsuite.test import lldbutil
9
10class TestGdbRemoteThreadsInStopReply(
11        gdbremote_testcase.GdbRemoteTestCaseBase):
12
13    mydir = TestBase.compute_mydir(__file__)
14
15    ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
16        "read packet: $QListThreadsInStopReply#21",
17        "send packet: $OK#00",
18    ]
19
20    def gather_stop_reply_fields(self, post_startup_log_lines, thread_count,
21            field_names):
22        # Set up the inferior args.
23        inferior_args = []
24        for i in range(thread_count - 1):
25            inferior_args.append("thread:new")
26        inferior_args.append("sleep:10")
27        procs = self.prep_debug_monitor_and_inferior(
28            inferior_args=inferior_args)
29
30        self.add_register_info_collection_packets()
31        self.add_process_info_collection_packets()
32
33        # Assumes test_sequence has anything added needed to setup the initial state.
34        # (Like optionally enabling QThreadsInStopReply.)
35        if post_startup_log_lines:
36            self.test_sequence.add_log_lines(post_startup_log_lines, True)
37        self.test_sequence.add_log_lines([
38            "read packet: $c#63"
39        ], True)
40        context = self.expect_gdbremote_sequence()
41        self.assertIsNotNone(context)
42        hw_info = self.parse_hw_info(context)
43
44        # Give threads time to start up, then break.
45        time.sleep(self._WAIT_TIMEOUT)
46        self.reset_test_sequence()
47        self.test_sequence.add_log_lines(
48            [
49                "read packet: {}".format(
50                    chr(3)),
51                {
52                    "direction": "send",
53                    "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
54                    "capture": {
55                        1: "stop_result",
56                        2: "key_vals_text"}},
57            ],
58            True)
59        context = self.expect_gdbremote_sequence()
60        self.assertIsNotNone(context)
61
62        # Wait until all threads have started.
63        threads = self.wait_for_thread_count(thread_count,
64                                             timeout_seconds=self._WAIT_TIMEOUT)
65        self.assertIsNotNone(threads)
66        self.assertEqual(len(threads), thread_count)
67
68        # Run, then stop the process, grab the stop reply content.
69        self.reset_test_sequence()
70        self.test_sequence.add_log_lines(["read packet: $c#63",
71                                          "read packet: {}".format(chr(3)),
72                                          {"direction": "send",
73                                           "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
74                                           "capture": {1: "stop_result",
75                                                       2: "key_vals_text"}},
76                                          ],
77                                         True)
78        context = self.expect_gdbremote_sequence()
79        self.assertIsNotNone(context)
80
81        # Parse the stop reply contents.
82        key_vals_text = context.get("key_vals_text")
83        self.assertIsNotNone(key_vals_text)
84        kv_dict = self.parse_key_val_dict(key_vals_text)
85        self.assertIsNotNone(kv_dict)
86
87        result = dict();
88        result["pc_register"] = hw_info["pc_register"]
89        result["little_endian"] = hw_info["little_endian"]
90        for key_field in field_names:
91            result[key_field] = kv_dict.get(key_field)
92
93        return result
94
95    def gather_stop_reply_threads(self, post_startup_log_lines, thread_count):
96        # Pull out threads from stop response.
97        stop_reply_threads_text = self.gather_stop_reply_fields(
98                post_startup_log_lines, thread_count, ["threads"])["threads"]
99        if stop_reply_threads_text:
100            return [int(thread_id, 16)
101                    for thread_id in stop_reply_threads_text.split(",")]
102        else:
103            return []
104
105    def gather_stop_reply_pcs(self, post_startup_log_lines, thread_count):
106        results = self.gather_stop_reply_fields( post_startup_log_lines,
107                thread_count, ["threads", "thread-pcs"])
108        if not results:
109            return []
110
111        threads_text = results["threads"]
112        pcs_text = results["thread-pcs"]
113        thread_ids = threads_text.split(",")
114        pcs = pcs_text.split(",")
115        self.assertEquals(len(thread_ids), len(pcs))
116
117        thread_pcs = dict()
118        for i in range(0, len(pcs)):
119            thread_pcs[int(thread_ids[i], 16)] = pcs[i]
120
121        result = dict()
122        result["thread_pcs"] = thread_pcs
123        result["pc_register"] = results["pc_register"]
124        result["little_endian"] = results["little_endian"]
125        return result
126
127    def switch_endian(self, egg):
128        return "".join(reversed(re.findall("..", egg)))
129
130    def parse_hw_info(self, context):
131        self.assertIsNotNone(context)
132        process_info = self.parse_process_info_response(context)
133        endian = process_info.get("endian")
134        reg_info = self.parse_register_info_packets(context)
135        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
136
137        hw_info = dict()
138        hw_info["pc_register"] = pc_lldb_reg_index
139        hw_info["little_endian"] = (endian == "little")
140        return hw_info
141
142    def gather_threads_info_pcs(self, pc_register, little_endian):
143        self.reset_test_sequence()
144        self.test_sequence.add_log_lines(
145                [
146                    "read packet: $jThreadsInfo#c1",
147                    {
148                        "direction": "send",
149                        "regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
150                        "capture": {
151                            1: "threads_info"}},
152                ],
153                True)
154
155        context = self.expect_gdbremote_sequence()
156        self.assertIsNotNone(context)
157        threads_info = context.get("threads_info")
158        register = str(pc_register)
159        # The jThreadsInfo response is not valid JSON data, so we have to
160        # clean it up first.
161        jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
162        thread_pcs = dict()
163        for thread_info in jthreads_info:
164            tid = thread_info["tid"]
165            pc = thread_info["registers"][register]
166            thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
167
168        return thread_pcs
169
170    def QListThreadsInStopReply_supported(self):
171        procs = self.prep_debug_monitor_and_inferior()
172        self.test_sequence.add_log_lines(
173            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
174
175        context = self.expect_gdbremote_sequence()
176        self.assertIsNotNone(context)
177
178    @skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
179    @debugserver_test
180    def test_QListThreadsInStopReply_supported_debugserver(self):
181        self.init_debugserver_test()
182        self.build()
183        self.set_inferior_startup_launch()
184        self.QListThreadsInStopReply_supported()
185
186    @llgs_test
187    def test_QListThreadsInStopReply_supported_llgs(self):
188        self.init_llgs_test()
189        self.build()
190        self.set_inferior_startup_launch()
191        self.QListThreadsInStopReply_supported()
192
193    def stop_reply_reports_multiple_threads(self, thread_count):
194        # Gather threads from stop notification when QThreadsInStopReply is
195        # enabled.
196        stop_reply_threads = self.gather_stop_reply_threads(
197            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
198        self.assertEqual(len(stop_reply_threads), thread_count)
199
200    @skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
201    @debugserver_test
202    def test_stop_reply_reports_multiple_threads_debugserver(self):
203        self.init_debugserver_test()
204        self.build()
205        self.set_inferior_startup_launch()
206        self.stop_reply_reports_multiple_threads(5)
207
208    # In current implementation of llgs on Windows, as a response to '\x03' packet, the debugger
209    # of the native process will trigger a call to DebugBreakProcess that will create a new thread
210    # to handle the exception debug event. So one more stop thread will be notified to the
211    # delegate, e.g. llgs.  So tests below to assert the stop threads number will all fail.
212    @expectedFailureAll(oslist=["windows"])
213    @skipIfNetBSD
214    @llgs_test
215    def test_stop_reply_reports_multiple_threads_llgs(self):
216        self.init_llgs_test()
217        self.build()
218        self.set_inferior_startup_launch()
219        self.stop_reply_reports_multiple_threads(5)
220
221    def no_QListThreadsInStopReply_supplies_no_threads(self, thread_count):
222        # Gather threads from stop notification when QThreadsInStopReply is not
223        # enabled.
224        stop_reply_threads = self.gather_stop_reply_threads(None, thread_count)
225        self.assertEqual(len(stop_reply_threads), 0)
226
227    @skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
228    @debugserver_test
229    def test_no_QListThreadsInStopReply_supplies_no_threads_debugserver(self):
230        self.init_debugserver_test()
231        self.build()
232        self.set_inferior_startup_launch()
233        self.no_QListThreadsInStopReply_supplies_no_threads(5)
234
235    @expectedFailureAll(oslist=["windows"])
236    @skipIfNetBSD
237    @llgs_test
238    def test_no_QListThreadsInStopReply_supplies_no_threads_llgs(self):
239        self.init_llgs_test()
240        self.build()
241        self.set_inferior_startup_launch()
242        self.no_QListThreadsInStopReply_supplies_no_threads(5)
243
244    def stop_reply_reports_correct_threads(self, thread_count):
245        # Gather threads from stop notification when QThreadsInStopReply is
246        # enabled.
247        stop_reply_threads = self.gather_stop_reply_threads(
248            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
249        self.assertEqual(len(stop_reply_threads), thread_count)
250
251        # Gather threads from q{f,s}ThreadInfo.
252        self.reset_test_sequence()
253        self.add_threadinfo_collection_packets()
254
255        context = self.expect_gdbremote_sequence()
256        self.assertIsNotNone(context)
257
258        threads = self.parse_threadinfo_packets(context)
259        self.assertIsNotNone(threads)
260        self.assertEqual(len(threads), thread_count)
261
262        # Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
263        for tid in threads:
264            self.assertTrue(tid in stop_reply_threads)
265
266    @skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
267    @debugserver_test
268    def test_stop_reply_reports_correct_threads_debugserver(self):
269        self.init_debugserver_test()
270        self.build()
271        self.set_inferior_startup_launch()
272        self.stop_reply_reports_correct_threads(5)
273
274    @expectedFailureAll(oslist=["windows"])
275    @skipIfNetBSD
276    @llgs_test
277    def test_stop_reply_reports_correct_threads_llgs(self):
278        self.init_llgs_test()
279        self.build()
280        self.set_inferior_startup_launch()
281        self.stop_reply_reports_correct_threads(5)
282
283    def stop_reply_contains_thread_pcs(self, thread_count):
284        results = self.gather_stop_reply_pcs(
285                self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
286        stop_reply_pcs = results["thread_pcs"]
287        pc_register = results["pc_register"]
288        little_endian = results["little_endian"]
289        self.assertEqual(len(stop_reply_pcs), thread_count)
290
291        threads_info_pcs = self.gather_threads_info_pcs(pc_register,
292                little_endian)
293
294        self.assertEqual(len(threads_info_pcs), thread_count)
295        for thread_id in stop_reply_pcs:
296            self.assertTrue(thread_id in threads_info_pcs)
297            self.assertTrue(int(stop_reply_pcs[thread_id], 16)
298                    == int(threads_info_pcs[thread_id], 16))
299
300    @expectedFailureAll(oslist=["windows"])
301    @skipIfNetBSD
302    @llgs_test
303    def test_stop_reply_contains_thread_pcs_llgs(self):
304        self.init_llgs_test()
305        self.build()
306        self.set_inferior_startup_launch()
307        self.stop_reply_contains_thread_pcs(5)
308
309    @skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
310    @debugserver_test
311    def test_stop_reply_contains_thread_pcs_debugserver(self):
312        self.init_debugserver_test()
313        self.build()
314        self.set_inferior_startup_launch()
315        self.stop_reply_contains_thread_pcs(5)
316