"""
Make sure that if threads are suspended outside of lldb, debugserver
won't make them run, even if we call an expression on the thread.
"""

import lldb
from lldbsuite.test.decorators import *
import lldbsuite.test.lldbutil as lldbutil
from lldbsuite.test.lldbtest import *


class TestSuspendedThreadHandling(TestBase):
    """Check that a thread suspended by the inferior itself stays suspended
    while the debugger continues past breakpoints and evaluates expressions."""

    NO_DEBUG_INFO_TESTCASE = True

    @skipUnlessDarwin
    def test_suspended_threads(self):
        """Test that debugserver doesn't disturb the suspend count of a thread
        that has been suspended from within a program, when navigating breakpoints
        on other threads, or calling functions both on the suspended thread and
        on other threads."""
        self.build()
        self.main_source_file = lldb.SBFileSpec("main.c")
        self.suspended_thread_test()

    def setUp(self):
        # Call super's setUp(); this test needs no extra per-test state.
        TestBase.setUp(self)

    def try_an_expression(self, thread, correct_value, test_bp):
        """Call function_to_call() in frame 0 of `thread` and check the result.

        thread        -- the thread on which the expression is evaluated.
        correct_value -- the signed integer the expression must return.
        test_bp       -- the breakpoint in the suspended thread; its hit count
                         must still be 0 once the expression has finished.
        """
        frame = thread.frames[0]

        value = frame.EvaluateExpression('function_to_call()')
        self.assertSuccess(value.GetError(), "Successfully called the function")
        self.assertEqual(value.GetValueAsSigned(), correct_value,
                         "Got expected value for expression")

        # Again, make sure we didn't let the suspend thread breakpoint run.
        # This helper is used for every expression in the test, so the message
        # must not claim to be about the first one only.
        self.assertEqual(test_bp.GetHitCount(), 0,
                         "Expression allowed the suspend thread to run")

    def make_bkpt(self, pattern):
        """Set a source-regex breakpoint for `pattern` in the main source file.

        Asserts that the breakpoint resolved to exactly one location and
        returns the breakpoint.
        """
        bp = self.target.BreakpointCreateBySourceRegex(pattern, self.main_source_file)
        self.assertEqual(bp.GetNumLocations(), 1, "Locations for %s" % (pattern))
        return bp

    def suspended_thread_test(self):
        """Drive the inferior through its breakpoints, checking at each stop
        that the suspended thread's breakpoint has not been hit until the
        point where it is expected to run."""
        # Only the target and process are needed from the launch helper; the
        # stop thread and the initial breakpoint are not used afterwards.
        (self.target, process, _, _) = lldbutil.run_to_source_breakpoint(
            self, "Stop here to get things going", self.main_source_file)

        # Make a breakpoint in the running thread, so that we will have to stop
        # a number of times while handling breakpoints.  The first couple of
        # times we hit it we will run expressions as well.  Make sure we don't
        # let the suspended thread run during those operations.
        rt_bp = self.make_bkpt("Break here to show we can handle breakpoints")

        # Make a breakpoint that we will hit when the running thread exits:
        rt_exit_bp = self.make_bkpt("Break here after thread_join")

        # Make a breakpoint in the suspended thread.  We should not hit this
        # till we resume it after joining the running thread.
        st_bp = self.make_bkpt("We allowed the suspend thread to run")

        # Make a breakpoint after pthread_join of the suspend thread to ensure
        # that we didn't keep the thread from exiting normally.
        st_exit_bp = self.make_bkpt(" Break here to make sure the thread exited normally")

        threads = lldbutil.continue_to_breakpoint(process, rt_bp)
        self.assertEqual(len(threads), 1, "Hit the running_func breakpoint")

        # Make sure we didn't hit the suspend thread breakpoint:
        self.assertEqual(st_bp.GetHitCount(), 0,
                         "Continue allowed the suspend thread to run")

        # Now try an expression on the running thread:
        self.try_an_expression(threads[0], 0, st_bp)

        # Continue, and check the same things:
        threads = lldbutil.continue_to_breakpoint(process, rt_bp)
        self.assertEqual(len(threads), 1, "We didn't hit running breakpoint")

        # Try an expression on the suspended thread.  Keep the result in its
        # own variable that is only assigned on a match: reusing the loop
        # variable would leave it pointing at the last (valid) thread when no
        # name matches, and the validity check below could never fail.
        suspended_thread = lldb.SBThread()
        for candidate in process.threads:
            th_name = candidate.GetName()
            if th_name is None:
                continue
            if "Look for me" in th_name:
                suspended_thread = candidate
                break
        self.assertTrue(suspended_thread.IsValid(), "We found the suspend thread.")
        self.try_an_expression(suspended_thread, 1, st_bp)

        # Now set the running thread breakpoint to auto-continue and let it
        # run a bit to make sure we still don't let the suspend thread run.
        rt_bp.SetAutoContinue(True)
        threads = lldbutil.continue_to_breakpoint(process, rt_exit_bp)
        self.assertEqual(len(threads), 1, "Hit the thread_join breakpoint")
        self.assertEqual(st_bp.GetHitCount(), 0,
                         "Continue again let suspended thread run")

        # Now continue and we SHOULD hit the suspend_func breakpoint:
        threads = lldbutil.continue_to_breakpoint(process, st_bp)
        self.assertEqual(len(threads), 1, "The thread resumed successfully")

        # Finally, continue again and we should get out of the last
        # pthread_join and the process should be about to exit.
        threads = lldbutil.continue_to_breakpoint(process, st_exit_bp)
        self.assertEqual(len(threads), 1, "pthread_join exited successfully")