1""" 2Test lldb Python event APIs. 3""" 4 5from __future__ import print_function 6 7 8import re 9import lldb 10from lldbsuite.test.decorators import * 11from lldbsuite.test.lldbtest import * 12from lldbsuite.test import lldbutil 13 14 15@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV 16class EventAPITestCase(TestBase): 17 18 mydir = TestBase.compute_mydir(__file__) 19 NO_DEBUG_INFO_TESTCASE = True 20 21 def setUp(self): 22 # Call super's setUp(). 23 TestBase.setUp(self) 24 # Find the line number to of function 'c'. 25 self.line = line_number( 26 'main.c', '// Find the line number of function "c" here.') 27 28 @expectedFailureAll( 29 oslist=["linux"], 30 bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases") 31 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 32 @skipIfNetBSD 33 def test_listen_for_and_print_event(self): 34 """Exercise SBEvent API.""" 35 self.build() 36 exe = self.getBuildArtifact("a.out") 37 38 self.dbg.SetAsync(True) 39 40 # Create a target by the debugger. 41 target = self.dbg.CreateTarget(exe) 42 self.assertTrue(target, VALID_TARGET) 43 44 # Now create a breakpoint on main.c by name 'c'. 45 breakpoint = target.BreakpointCreateByName('c', 'a.out') 46 47 listener = lldb.SBListener("my listener") 48 49 # Now launch the process, and do not stop at the entry point. 50 error = lldb.SBError() 51 flags = target.GetLaunchInfo().GetLaunchFlags() 52 process = target.Launch(listener, 53 None, # argv 54 None, # envp 55 None, # stdin_path 56 None, # stdout_path 57 None, # stderr_path 58 None, # working directory 59 flags, # launch flags 60 False, # Stop at entry 61 error) # error 62 63 self.assertEqual( 64 process.GetState(), lldb.eStateStopped, 65 PROCESS_STOPPED) 66 67 # Create an empty event object. 68 event = lldb.SBEvent() 69 70 traceOn = self.TraceOn() 71 if traceOn: 72 lldbutil.print_stacktraces(process) 73 74 # Create MyListeningThread class to wait for any kind of event. 75 import threading 76 77 class MyListeningThread(threading.Thread): 78 79 def run(self): 80 count = 0 81 # Let's only try at most 4 times to retrieve any kind of event. 82 # After that, the thread exits. 83 while not count > 3: 84 if traceOn: 85 print("Try wait for event...") 86 if listener.WaitForEvent(5, event): 87 if traceOn: 88 desc = lldbutil.get_description(event) 89 print("Event description:", desc) 90 print("Event data flavor:", event.GetDataFlavor()) 91 print( 92 "Process state:", 93 lldbutil.state_type_to_str( 94 process.GetState())) 95 print() 96 else: 97 if traceOn: 98 print("timeout occurred waiting for event...") 99 count = count + 1 100 listener.Clear() 101 return 102 103 # Let's start the listening thread to retrieve the events. 104 my_thread = MyListeningThread() 105 my_thread.start() 106 107 # Use Python API to continue the process. The listening thread should be 108 # able to receive the state changed events. 109 process.Continue() 110 111 # Use Python API to kill the process. The listening thread should be 112 # able to receive the state changed event, too. 113 process.Kill() 114 115 # Wait until the 'MyListeningThread' terminates. 116 my_thread.join() 117 118 # Shouldn't we be testing against some kind of expectation here? 119 120 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases 121 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 122 @skipIfNetBSD 123 def test_wait_for_event(self): 124 """Exercise SBListener.WaitForEvent() API.""" 125 self.build() 126 exe = self.getBuildArtifact("a.out") 127 128 self.dbg.SetAsync(True) 129 130 # Create a target by the debugger. 131 target = self.dbg.CreateTarget(exe) 132 self.assertTrue(target, VALID_TARGET) 133 134 # Now create a breakpoint on main.c by name 'c'. 135 breakpoint = target.BreakpointCreateByName('c', 'a.out') 136 self.trace("breakpoint:", breakpoint) 137 self.assertTrue(breakpoint and 138 breakpoint.GetNumLocations() == 1, 139 VALID_BREAKPOINT) 140 141 # Get the debugger listener. 142 listener = self.dbg.GetListener() 143 144 # Now launch the process, and do not stop at entry point. 145 error = lldb.SBError() 146 flags = target.GetLaunchInfo().GetLaunchFlags() 147 process = target.Launch(listener, 148 None, # argv 149 None, # envp 150 None, # stdin_path 151 None, # stdout_path 152 None, # stderr_path 153 None, # working directory 154 flags, # launch flags 155 False, # Stop at entry 156 error) # error 157 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 158 159 # Create an empty event object. 160 event = lldb.SBEvent() 161 self.assertFalse(event, "Event should not be valid initially") 162 163 # Create MyListeningThread to wait for any kind of event. 164 import threading 165 166 class MyListeningThread(threading.Thread): 167 168 def run(self): 169 count = 0 170 # Let's only try at most 3 times to retrieve any kind of event. 171 while not count > 3: 172 if listener.WaitForEvent(5, event): 173 self.context.trace("Got a valid event:", event) 174 self.context.trace("Event data flavor:", event.GetDataFlavor()) 175 self.context.trace("Event type:", lldbutil.state_type_to_str(event.GetType())) 176 listener.Clear() 177 return 178 count = count + 1 179 print("Timeout: listener.WaitForEvent") 180 listener.Clear() 181 return 182 183 # Use Python API to kill the process. The listening thread should be 184 # able to receive a state changed event. 185 process.Kill() 186 187 # Let's start the listening thread to retrieve the event. 188 my_thread = MyListeningThread() 189 my_thread.context = self 190 my_thread.start() 191 192 # Wait until the 'MyListeningThread' terminates. 193 my_thread.join() 194 195 self.assertTrue(event, 196 "My listening thread successfully received an event") 197 198 @expectedFailureAll( 199 oslist=["linux"], 200 bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases") 201 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 202 @expectedFailureNetBSD 203 def test_add_listener_to_broadcaster(self): 204 """Exercise some SBBroadcaster APIs.""" 205 self.build() 206 exe = self.getBuildArtifact("a.out") 207 208 self.dbg.SetAsync(True) 209 210 # Create a target by the debugger. 211 target = self.dbg.CreateTarget(exe) 212 self.assertTrue(target, VALID_TARGET) 213 214 # Now create a breakpoint on main.c by name 'c'. 215 breakpoint = target.BreakpointCreateByName('c', 'a.out') 216 self.trace("breakpoint:", breakpoint) 217 self.assertTrue(breakpoint and 218 breakpoint.GetNumLocations() == 1, 219 VALID_BREAKPOINT) 220 221 listener = lldb.SBListener("my listener") 222 223 # Now launch the process, and do not stop at the entry point. 224 error = lldb.SBError() 225 flags = target.GetLaunchInfo().GetLaunchFlags() 226 process = target.Launch(listener, 227 None, # argv 228 None, # envp 229 None, # stdin_path 230 None, # stdout_path 231 None, # stderr_path 232 None, # working directory 233 flags, # launch flags 234 False, # Stop at entry 235 error) # error 236 237 # Create an empty event object. 238 event = lldb.SBEvent() 239 self.assertFalse(event, "Event should not be valid initially") 240 241 # The finite state machine for our custom listening thread, with an 242 # initial state of None, which means no event has been received. 243 # It changes to 'connected' after 'connected' event is received (for remote platforms) 244 # It changes to 'running' after 'running' event is received (should happen only if the 245 # currentstate is either 'None' or 'connected') 246 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the 247 # current state is 'running'.) 248 self.state = None 249 250 # Create MyListeningThread to wait for state changed events. 251 # By design, a "running" event is expected following by a "stopped" 252 # event. 253 import threading 254 255 class MyListeningThread(threading.Thread): 256 257 def run(self): 258 self.context.trace("Running MyListeningThread:", self) 259 260 # Regular expression pattern for the event description. 261 pattern = re.compile("data = {.*, state = (.*)}$") 262 263 # Let's only try at most 6 times to retrieve our events. 264 count = 0 265 while True: 266 if listener.WaitForEvent(5, event): 267 desc = lldbutil.get_description(event) 268 self.context.trace("Event description:", desc) 269 match = pattern.search(desc) 270 if not match: 271 break 272 if match.group(1) == 'connected': 273 # When debugging remote targets with lldb-server, we 274 # first get the 'connected' event. 275 self.context.assertTrue(self.context.state is None) 276 self.context.state = 'connected' 277 continue 278 elif match.group(1) == 'running': 279 self.context.assertTrue( 280 self.context.state is None or self.context.state == 'connected') 281 self.context.state = 'running' 282 continue 283 elif match.group(1) == 'stopped': 284 self.context.assertTrue( 285 self.context.state == 'running') 286 # Whoopee, both events have been received! 287 self.context.state = 'stopped' 288 break 289 else: 290 break 291 print("Timeout: listener.WaitForEvent") 292 count = count + 1 293 if count > 6: 294 break 295 listener.Clear() 296 return 297 298 # Use Python API to continue the process. The listening thread should be 299 # able to receive the state changed events. 300 process.Continue() 301 302 # Start the listening thread to receive the "running" followed by the 303 # "stopped" events. 304 my_thread = MyListeningThread() 305 # Supply the enclosing context so that our listening thread can access 306 # the 'state' variable. 307 my_thread.context = self 308 my_thread.start() 309 310 # Wait until the 'MyListeningThread' terminates. 311 my_thread.join() 312 313 # The final judgement. :-) 314 self.assertEqual(self.state, 'stopped', 315 "Both expected state changed events received") 316