import re
import unittest2
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil


class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Tests for the ``qThreadStopInfo`` gdb-remote packet."""

    mydir = TestBase.compute_mydir(__file__)
    # Number of threads the tests expect to see stop replies for.
    THREAD_COUNT = 5

    def _selected_platform_is_windows(self):
        """Return True when the selected platform's triple names Windows."""
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        return re.match(".*-.*-windows", triple) is not None

    def gather_stop_replies_via_qThreadStopInfo(self, threads):
        """Query ``qThreadStopInfo`` for every thread and collect the results.

        For each thread id in ``threads`` a ``qThreadStopInfo{tid:hex}``
        packet is sent and the ``$T...`` reply is parsed.  The reply must
        carry a ``thread`` key whose (hex) value equals the queried id.

        Returns a dict mapping each thread id to the stop result reported
        for it (the hex digits following ``$T``, converted to an int).
        """
        stop_replies = {}
        for thread in threads:
            # Run the qThreadStopInfo command.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: $qThreadStopInfo{:x}#00".format(thread),
                    {
                        "direction": "send",
                        "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
                        "capture": {
                            1: "stop_result",
                            2: "key_vals_text",
                        },
                    },
                ],
                True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Parse stop reply contents.
            key_vals_text = context.get("key_vals_text")
            self.assertIsNotNone(key_vals_text)
            kv_dict = self.parse_key_val_dict(key_vals_text)
            self.assertIsNotNone(kv_dict)

            # Verify there is a thread and that it matches the expected
            # thread id.
            kv_thread = kv_dict.get("thread")
            self.assertIsNotNone(kv_thread)
            kv_thread_id = int(kv_thread, 16)
            self.assertEqual(kv_thread_id, thread)

            # Grab the stop id reported.
            stop_result_text = context.get("stop_result")
            self.assertIsNotNone(stop_result_text)
            stop_replies[kv_thread_id] = int(stop_result_text, 16)

        return stop_replies

    @skipIfNetBSD
    def test_qThreadStopInfo_works_for_multiple_threads(self):
        """Every launched thread must answer a qThreadStopInfo query."""
        self.build()
        self.set_inferior_startup_launch()
        _, threads = self.launch_with_threads(self.THREAD_COUNT)
        stop_replies = self.gather_stop_replies_via_qThreadStopInfo(threads)

        # Consider one more thread created by calling DebugBreakProcess.
        if self._selected_platform_is_windows():
            self.assertGreaterEqual(len(stop_replies), self.THREAD_COUNT)
        else:
            self.assertEqual(len(stop_replies), self.THREAD_COUNT)

    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48418")
    @expectedFailureNetBSD
    @expectedFailureAll(oslist=["windows"])  # Output forwarding not implemented
    def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self):
        """After an interrupt, exactly one thread may report a stop reason."""
        self.build()
        self.set_inferior_startup_launch()
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["thread:new"] * 4 + ["stop-me-now", "sleep:60"])

        # Continue, wait for the inferior's marker output, then interrupt
        # it and expect a stop reply.
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                {
                    "type": "output_match",
                    "regex": self.maybe_strict_output_regex(r"stop-me-now\r\n"),
                },
                "read packet: \x03",
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$",
                },
            ],
            True)
        self.add_threadinfo_collection_packets()
        context = self.expect_gdbremote_sequence()
        threads = self.parse_threadinfo_packets(context)

        stop_replies = self.gather_stop_replies_via_qThreadStopInfo(threads)
        self.assertIsNotNone(stop_replies)

        # A stop result of 0 means the thread reported no stop reason; every
        # other entry therefore carries one.
        no_stop_reason_count = sum(
            1 for stop_reason in stop_replies.values() if stop_reason == 0)
        with_stop_reason_count = len(stop_replies) - no_stop_reason_count

        # All but one thread should report no stop reason.
        # Consider one more thread created by calling DebugBreakProcess.
        if self._selected_platform_is_windows():
            self.assertGreaterEqual(no_stop_reason_count, self.THREAD_COUNT - 1)
        else:
            self.assertEqual(no_stop_reason_count, self.THREAD_COUNT - 1)

        # Only one thread should indicate a stop reason.
        self.assertEqual(with_stop_reason_count, 1)