1import json
2import re
3
4import gdbremote_testcase
5from lldbsuite.test.decorators import *
6from lldbsuite.test.lldbtest import *
7from lldbsuite.test import lldbutil
8
9class TestGdbRemoteThreadsInStopReply(
10        gdbremote_testcase.GdbRemoteTestCaseBase):
11
12    ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
13        "read packet: $QListThreadsInStopReply#21",
14        "send packet: $OK#00",
15    ]
16
17    def gather_stop_reply_fields(self, thread_count, field_names):
18        context, threads = self.launch_with_threads(thread_count)
19        key_vals_text = context.get("stop_reply_kv")
20        self.assertIsNotNone(key_vals_text)
21
22        self.reset_test_sequence()
23        self.add_register_info_collection_packets()
24        self.add_process_info_collection_packets()
25
26        context = self.expect_gdbremote_sequence()
27        self.assertIsNotNone(context)
28        hw_info = self.parse_hw_info(context)
29
30        # Parse the stop reply contents.
31        kv_dict = self.parse_key_val_dict(key_vals_text)
32
33        result = dict();
34        result["pc_register"] = hw_info["pc_register"]
35        result["little_endian"] = hw_info["little_endian"]
36        for key_field in field_names:
37            result[key_field] = kv_dict.get(key_field)
38
39        return result
40
41    def gather_stop_reply_threads(self, thread_count):
42        # Pull out threads from stop response.
43        stop_reply_threads_text = self.gather_stop_reply_fields(
44                thread_count, ["threads"])["threads"]
45        if stop_reply_threads_text:
46            return [int(thread_id, 16)
47                    for thread_id in stop_reply_threads_text.split(",")]
48        else:
49            return []
50
51    def gather_stop_reply_pcs(self, thread_count):
52        results = self.gather_stop_reply_fields(thread_count, ["threads", "thread-pcs"])
53        if not results:
54            return []
55
56        threads_text = results["threads"]
57        pcs_text = results["thread-pcs"]
58        thread_ids = threads_text.split(",")
59        pcs = pcs_text.split(",")
60        self.assertEquals(len(thread_ids), len(pcs))
61
62        thread_pcs = dict()
63        for i in range(0, len(pcs)):
64            thread_pcs[int(thread_ids[i], 16)] = pcs[i]
65
66        result = dict()
67        result["thread_pcs"] = thread_pcs
68        result["pc_register"] = results["pc_register"]
69        result["little_endian"] = results["little_endian"]
70        return result
71
72    def switch_endian(self, egg):
73        return "".join(reversed(re.findall("..", egg)))
74
75    def parse_hw_info(self, context):
76        self.assertIsNotNone(context)
77        process_info = self.parse_process_info_response(context)
78        endian = process_info.get("endian")
79        reg_info = self.parse_register_info_packets(context)
80        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
81
82        hw_info = dict()
83        hw_info["pc_register"] = pc_lldb_reg_index
84        hw_info["little_endian"] = (endian == "little")
85        return hw_info
86
87    def gather_threads_info_pcs(self, pc_register, little_endian):
88        self.reset_test_sequence()
89        self.test_sequence.add_log_lines(
90                [
91                    "read packet: $jThreadsInfo#c1",
92                    {
93                        "direction": "send",
94                        "regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
95                        "capture": {
96                            1: "threads_info"}},
97                ],
98                True)
99
100        context = self.expect_gdbremote_sequence()
101        self.assertIsNotNone(context)
102        threads_info = context.get("threads_info")
103        register = str(pc_register)
104        # The jThreadsInfo response is not valid JSON data, so we have to
105        # clean it up first.
106        jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
107        thread_pcs = dict()
108        for thread_info in jthreads_info:
109            tid = thread_info["tid"]
110            pc = thread_info["registers"][register]
111            thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
112
113        return thread_pcs
114
115
116    def test_QListThreadsInStopReply_supported(self):
117        self.build()
118        self.set_inferior_startup_launch()
119        procs = self.prep_debug_monitor_and_inferior()
120        self.test_sequence.add_log_lines(
121            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
122
123        context = self.expect_gdbremote_sequence()
124        self.assertIsNotNone(context)
125
126    @skipIfNetBSD
127    @expectedFailureAll(oslist=["windows"]) # Extra threads present
128    def test_stop_reply_reports_multiple_threads(self):
129        self.build()
130        self.set_inferior_startup_launch()
131        # Gather threads from stop notification when QThreadsInStopReply is
132        # enabled.
133        self.test_sequence.add_log_lines(
134            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
135        stop_reply_threads = self.gather_stop_reply_threads(5)
136        self.assertEqual(len(stop_reply_threads), 5)
137
138    @skipIfNetBSD
139    def test_no_QListThreadsInStopReply_supplies_no_threads(self):
140        self.build()
141        self.set_inferior_startup_launch()
142        # Gather threads from stop notification when QThreadsInStopReply is not
143        # enabled.
144        stop_reply_threads = self.gather_stop_reply_threads(5)
145        self.assertEqual(len(stop_reply_threads), 0)
146
147    @skipIfNetBSD
148    def test_stop_reply_reports_correct_threads(self):
149        self.build()
150        self.set_inferior_startup_launch()
151        # Gather threads from stop notification when QThreadsInStopReply is
152        # enabled.
153        thread_count = 5
154        self.test_sequence.add_log_lines(
155            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
156        stop_reply_threads = self.gather_stop_reply_threads(thread_count)
157
158        # Gather threads from q{f,s}ThreadInfo.
159        self.reset_test_sequence()
160        self.add_threadinfo_collection_packets()
161
162        context = self.expect_gdbremote_sequence()
163        self.assertIsNotNone(context)
164
165        threads = self.parse_threadinfo_packets(context)
166        self.assertIsNotNone(threads)
167        self.assertGreaterEqual(len(threads), thread_count)
168
169        # Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
170        for tid in threads:
171            self.assertIn(tid, stop_reply_threads)
172
173    @skipIfNetBSD
174    def test_stop_reply_contains_thread_pcs(self):
175        self.build()
176        self.set_inferior_startup_launch()
177        thread_count = 5
178        self.test_sequence.add_log_lines(
179            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
180        results = self.gather_stop_reply_pcs(thread_count)
181        stop_reply_pcs = results["thread_pcs"]
182        pc_register = results["pc_register"]
183        little_endian = results["little_endian"]
184        self.assertGreaterEqual(len(stop_reply_pcs), thread_count)
185
186        threads_info_pcs = self.gather_threads_info_pcs(pc_register,
187                little_endian)
188
189        self.assertEqual(len(threads_info_pcs), len(stop_reply_pcs))
190        for thread_id in stop_reply_pcs:
191            self.assertIn(thread_id, threads_info_pcs)
192            self.assertEqual(int(stop_reply_pcs[thread_id], 16),
193                    int(threads_info_pcs[thread_id], 16))
194