1import json 2import re 3 4import gdbremote_testcase 5from lldbsuite.test.decorators import * 6from lldbsuite.test.lldbtest import * 7from lldbsuite.test import lldbutil 8 9class TestGdbRemoteThreadsInStopReply( 10 gdbremote_testcase.GdbRemoteTestCaseBase): 11 12 ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [ 13 "read packet: $QListThreadsInStopReply#21", 14 "send packet: $OK#00", 15 ] 16 17 def gather_stop_reply_fields(self, thread_count, field_names): 18 context, threads = self.launch_with_threads(thread_count) 19 key_vals_text = context.get("stop_reply_kv") 20 self.assertIsNotNone(key_vals_text) 21 22 self.reset_test_sequence() 23 self.add_register_info_collection_packets() 24 self.add_process_info_collection_packets() 25 26 context = self.expect_gdbremote_sequence() 27 self.assertIsNotNone(context) 28 hw_info = self.parse_hw_info(context) 29 30 # Parse the stop reply contents. 31 kv_dict = self.parse_key_val_dict(key_vals_text) 32 33 result = dict(); 34 result["pc_register"] = hw_info["pc_register"] 35 result["little_endian"] = hw_info["little_endian"] 36 for key_field in field_names: 37 result[key_field] = kv_dict.get(key_field) 38 39 return result 40 41 def gather_stop_reply_threads(self, thread_count): 42 # Pull out threads from stop response. 43 stop_reply_threads_text = self.gather_stop_reply_fields( 44 thread_count, ["threads"])["threads"] 45 if stop_reply_threads_text: 46 return [int(thread_id, 16) 47 for thread_id in stop_reply_threads_text.split(",")] 48 else: 49 return [] 50 51 def gather_stop_reply_pcs(self, thread_count): 52 results = self.gather_stop_reply_fields(thread_count, ["threads", "thread-pcs"]) 53 if not results: 54 return [] 55 56 threads_text = results["threads"] 57 pcs_text = results["thread-pcs"] 58 thread_ids = threads_text.split(",") 59 pcs = pcs_text.split(",") 60 self.assertEquals(len(thread_ids), len(pcs)) 61 62 thread_pcs = dict() 63 for i in range(0, len(pcs)): 64 thread_pcs[int(thread_ids[i], 16)] = pcs[i] 65 66 result = dict() 67 result["thread_pcs"] = thread_pcs 68 result["pc_register"] = results["pc_register"] 69 result["little_endian"] = results["little_endian"] 70 return result 71 72 def switch_endian(self, egg): 73 return "".join(reversed(re.findall("..", egg))) 74 75 def parse_hw_info(self, context): 76 self.assertIsNotNone(context) 77 process_info = self.parse_process_info_response(context) 78 endian = process_info.get("endian") 79 reg_info = self.parse_register_info_packets(context) 80 (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info) 81 82 hw_info = dict() 83 hw_info["pc_register"] = pc_lldb_reg_index 84 hw_info["little_endian"] = (endian == "little") 85 return hw_info 86 87 def gather_threads_info_pcs(self, pc_register, little_endian): 88 self.reset_test_sequence() 89 self.test_sequence.add_log_lines( 90 [ 91 "read packet: $jThreadsInfo#c1", 92 { 93 "direction": "send", 94 "regex": r"^\$(.*)#[0-9a-fA-F]{2}$", 95 "capture": { 96 1: "threads_info"}}, 97 ], 98 True) 99 100 context = self.expect_gdbremote_sequence() 101 self.assertIsNotNone(context) 102 threads_info = context.get("threads_info") 103 register = str(pc_register) 104 # The jThreadsInfo response is not valid JSON data, so we have to 105 # clean it up first. 106 jthreads_info = json.loads(re.sub(r"}]", "}", threads_info)) 107 thread_pcs = dict() 108 for thread_info in jthreads_info: 109 tid = thread_info["tid"] 110 pc = thread_info["registers"][register] 111 thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc 112 113 return thread_pcs 114 115 116 def test_QListThreadsInStopReply_supported(self): 117 self.build() 118 self.set_inferior_startup_launch() 119 procs = self.prep_debug_monitor_and_inferior() 120 self.test_sequence.add_log_lines( 121 self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True) 122 123 context = self.expect_gdbremote_sequence() 124 self.assertIsNotNone(context) 125 126 @skipIfNetBSD 127 @expectedFailureAll(oslist=["windows"]) # Extra threads present 128 def test_stop_reply_reports_multiple_threads(self): 129 self.build() 130 self.set_inferior_startup_launch() 131 # Gather threads from stop notification when QThreadsInStopReply is 132 # enabled. 133 self.test_sequence.add_log_lines( 134 self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True) 135 stop_reply_threads = self.gather_stop_reply_threads(5) 136 self.assertEqual(len(stop_reply_threads), 5) 137 138 @skipIfNetBSD 139 def test_no_QListThreadsInStopReply_supplies_no_threads(self): 140 self.build() 141 self.set_inferior_startup_launch() 142 # Gather threads from stop notification when QThreadsInStopReply is not 143 # enabled. 144 stop_reply_threads = self.gather_stop_reply_threads(5) 145 self.assertEqual(len(stop_reply_threads), 0) 146 147 @skipIfNetBSD 148 def test_stop_reply_reports_correct_threads(self): 149 self.build() 150 self.set_inferior_startup_launch() 151 # Gather threads from stop notification when QThreadsInStopReply is 152 # enabled. 153 thread_count = 5 154 self.test_sequence.add_log_lines( 155 self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True) 156 stop_reply_threads = self.gather_stop_reply_threads(thread_count) 157 158 # Gather threads from q{f,s}ThreadInfo. 159 self.reset_test_sequence() 160 self.add_threadinfo_collection_packets() 161 162 context = self.expect_gdbremote_sequence() 163 self.assertIsNotNone(context) 164 165 threads = self.parse_threadinfo_packets(context) 166 self.assertIsNotNone(threads) 167 self.assertGreaterEqual(len(threads), thread_count) 168 169 # Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads 170 for tid in threads: 171 self.assertIn(tid, stop_reply_threads) 172 173 @skipIfNetBSD 174 def test_stop_reply_contains_thread_pcs(self): 175 self.build() 176 self.set_inferior_startup_launch() 177 thread_count = 5 178 self.test_sequence.add_log_lines( 179 self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True) 180 results = self.gather_stop_reply_pcs(thread_count) 181 stop_reply_pcs = results["thread_pcs"] 182 pc_register = results["pc_register"] 183 little_endian = results["little_endian"] 184 self.assertGreaterEqual(len(stop_reply_pcs), thread_count) 185 186 threads_info_pcs = self.gather_threads_info_pcs(pc_register, 187 little_endian) 188 189 self.assertEqual(len(threads_info_pcs), len(stop_reply_pcs)) 190 for thread_id in stop_reply_pcs: 191 self.assertIn(thread_id, threads_info_pcs) 192 self.assertEqual(int(stop_reply_pcs[thread_id], 16), 193 int(threads_info_pcs[thread_id], 16)) 194