1import unittest2 2import gdbremote_testcase 3from lldbsuite.test.decorators import * 4from lldbsuite.test.lldbtest import * 5from lldbsuite.test import lldbutil 6 7class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase): 8 THREAD_COUNT = 5 9 10 def gather_stop_replies_via_qThreadStopInfo(self, threads): 11 # Grab stop reply for each thread via qThreadStopInfo{tid:hex}. 12 stop_replies = {} 13 thread_dicts = {} 14 for thread in threads: 15 # Run the qThreadStopInfo command. 16 self.reset_test_sequence() 17 self.test_sequence.add_log_lines( 18 [ 19 "read packet: $qThreadStopInfo{:x}#00".format(thread), 20 { 21 "direction": "send", 22 "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", 23 "capture": { 24 1: "stop_result", 25 2: "key_vals_text"}}, 26 ], 27 True) 28 context = self.expect_gdbremote_sequence() 29 self.assertIsNotNone(context) 30 31 # Parse stop reply contents. 32 key_vals_text = context.get("key_vals_text") 33 self.assertIsNotNone(key_vals_text) 34 kv_dict = self.parse_key_val_dict(key_vals_text) 35 self.assertIsNotNone(kv_dict) 36 37 # Verify there is a thread and that it matches the expected thread 38 # id. 39 kv_thread = kv_dict.get("thread") 40 self.assertIsNotNone(kv_thread) 41 kv_thread_id = int(kv_thread, 16) 42 self.assertEqual(kv_thread_id, thread) 43 44 # Grab the stop id reported. 45 stop_result_text = context.get("stop_result") 46 self.assertIsNotNone(stop_result_text) 47 stop_replies[kv_thread_id] = int(stop_result_text, 16) 48 49 # Hang on to the key-val dictionary for the thread. 50 thread_dicts[kv_thread_id] = kv_dict 51 52 return stop_replies 53 54 @skipIfNetBSD 55 def test_qThreadStopInfo_works_for_multiple_threads(self): 56 self.build() 57 self.set_inferior_startup_launch() 58 _, threads = self.launch_with_threads(self.THREAD_COUNT) 59 stop_replies = self.gather_stop_replies_via_qThreadStopInfo(threads) 60 triple = self.dbg.GetSelectedPlatform().GetTriple() 61 # Consider one more thread created by calling DebugBreakProcess. 62 if re.match(".*-.*-windows", triple): 63 self.assertGreaterEqual(len(stop_replies), self.THREAD_COUNT) 64 else: 65 self.assertEqual(len(stop_replies), self.THREAD_COUNT) 66 67 @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48418") 68 @expectedFailureNetBSD 69 @expectedFailureAll(oslist=["windows"]) # Output forwarding not implemented 70 def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self): 71 self.build() 72 self.set_inferior_startup_launch() 73 procs = self.prep_debug_monitor_and_inferior( 74 inferior_args=["thread:new"]*4 + ["stop-me-now", "sleep:60"]) 75 76 self.test_sequence.add_log_lines([ 77 "read packet: $c#00", 78 {"type": "output_match", 79 "regex": self.maybe_strict_output_regex(r"stop-me-now\r\n")}, 80 "read packet: \x03", 81 {"direction": "send", 82 "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$"}], True) 83 self.add_threadinfo_collection_packets() 84 context = self.expect_gdbremote_sequence() 85 threads = self.parse_threadinfo_packets(context) 86 87 stop_replies = self.gather_stop_replies_via_qThreadStopInfo(threads) 88 self.assertIsNotNone(stop_replies) 89 90 no_stop_reason_count = sum( 91 1 for stop_reason in list( 92 stop_replies.values()) if stop_reason == 0) 93 with_stop_reason_count = sum( 94 1 for stop_reason in list( 95 stop_replies.values()) if stop_reason != 0) 96 97 # All but one thread should report no stop reason. 98 triple = self.dbg.GetSelectedPlatform().GetTriple() 99 100 # Consider one more thread created by calling DebugBreakProcess. 101 if re.match(".*-.*-windows", triple): 102 self.assertGreaterEqual(no_stop_reason_count, self.THREAD_COUNT - 1) 103 else: 104 self.assertEqual(no_stop_reason_count, self.THREAD_COUNT - 1) 105 106 # Only one thread should should indicate a stop reason. 107 self.assertEqual(with_stop_reason_count, 1) 108