"""
Test case for testing the gdbremote protocol.

Tests run against debugserver and lldb-server (llgs).
lldb-server tests run where the lldb-server exe is
available.

This class will be broken into smaller test case classes by
gdb remote packet functional areas. For now it contains
the initial set of tests implemented.
"""

import unittest2
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.support import seven
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test.lldbdwarf import *
from lldbsuite.test import lldbutil


class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):

    mydir = TestBase.compute_mydir(__file__)

    def test_thread_suffix_supported(self):
        # Verify the stub replies $OK to $QThreadSuffixSupported.
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        self.add_no_ack_remote_stream()
        self.test_sequence.add_log_lines(
            ["lldb-server < 26> read packet: $QThreadSuffixSupported#e4",
             "lldb-server < 6> send packet: $OK#9a"],
            True)

        self.expect_gdbremote_sequence()

    def test_list_threads_in_stop_reply_supported(self):
        # Verify the stub replies $OK to $QListThreadsInStopReply.
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        self.add_no_ack_remote_stream()
        self.test_sequence.add_log_lines(
            ["lldb-server < 27> read packet: $QListThreadsInStopReply#21",
             "lldb-server < 6> send packet: $OK#9a"],
            True)
        self.expect_gdbremote_sequence()

    def test_c_packet_works(self):
        # Continue the inferior with $c and expect a clean exit ($W00).
        self.build()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $c#63",
             "send packet: $W00#00"],
            True)

        self.expect_gdbremote_sequence()

    @skipIfWindows  # No pty support to test any inferior output
    def test_inferior_print_exit(self):
        # Continue with $vCont;c, expect the inferior's output and then a
        # clean exit ($W00).
        self.build()
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["hello, world"])
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8",
             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
             "send packet: $W00#00"],
            True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    def test_first_launch_stop_reply_thread_matches_first_qC(self):
        # The thread id captured from the $qC reply must match the thread id
        # in the $? stop reply.
        self.build()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(["read packet: $qC#00",
                                          {"direction": "send",
                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
                                           "capture": {1: "thread_id"}},
                                          "read packet: $?#00",
                                          {"direction": "send",
                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
                                              "expect_captures": {1: "thread_id"}}],
                                         True)
        self.expect_gdbremote_sequence()

    def test_attach_commandline_continue_app_exits(self):
        # Attach to the inferior, continue it, and verify it exits and is no
        # longer running.
        self.build()
        self.set_inferior_startup_attach()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8",
             "send packet: $W00#00"],
            True)
        self.expect_gdbremote_sequence()

        # Wait a moment for completed and now-detached inferior process to
        # clear.
        time.sleep(1)

        if not lldb.remote_platform:
            # Process should be dead now. Reap results.
            poll_result = procs["inferior"].poll()
            self.assertIsNotNone(poll_result)

            # Where possible, verify at the system level that the process is not
            # running.
            self.assertFalse(
                lldbgdbserverutils.process_is_running(
                    procs["inferior"].pid, False))

    def test_qRegisterInfo_returns_one_valid_result(self):
        # Query $qRegisterInfo0 and validate the single parsed response.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $qRegisterInfo0#00",
             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
            True)

        # Run the stream
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_info_packet = context.get("reginfo_0")
        self.assertIsNotNone(reg_info_packet)
        self.assert_valid_reg_info(
            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))

    def test_qRegisterInfo_returns_all_valid_results(self):
        # Collect every register info packet and validate each one.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Validate that each register info returned validates.
        for reg_info in self.parse_register_info_packets(context):
            self.assert_valid_reg_info(reg_info)

    def test_qRegisterInfo_contains_required_generics_debugserver(self):
        # The register infos must advertise the pc, fp (except on
        # powerpc64le), sp and flags generic registers.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all generic registers found.
        generic_regs = {
            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}

        # Ensure we have a program counter register.
        self.assertIn('pc', generic_regs)

        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
        if self.getArchitecture() != 'powerpc64le':
            self.assertIn('fp', generic_regs)

        # Ensure we have a stack pointer register.
        self.assertIn('sp', generic_regs)

        # Ensure we have a flags register.
        self.assertIn('flags', generic_regs)

    def test_qRegisterInfo_contains_at_least_one_register_set(self):
        # At least one register info entry must name a register set.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all register sets found.
        register_sets = {
            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
        self.assertTrue(len(register_sets) >= 1)

    def targetHasAVX(self):
        # Return True when the target is expected to expose AVX registers.
        # Only Linux-style triples are actually probed (via /proc/cpuinfo).
        triple = self.dbg.GetSelectedPlatform().GetTriple()

        # TODO other platforms, please implement this function.  Until then
        # every triple that does not match Linux is reported as having AVX.
        if not re.match(".*-.*-linux", triple):
            return True

        # On a remote platform the cpuinfo file has to be copied to the host
        # first; the local copy is removed again at teardown.
        if lldb.remote_platform:
            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
            cpuinfo_path = "cpuinfo"
            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
        else:
            cpuinfo_path = "/proc/cpuinfo"

        # Use a context manager so the handle is closed even if read() fails.
        with open(cpuinfo_path, 'r') as f:
            cpuinfo = f.read()
        return " avx " in cpuinfo

    @expectedFailureAll(oslist=["windows"])  # no avx for now.
    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
    @add_test_categories(["llgs"])
    def test_qRegisterInfo_contains_avx_registers(self):
        # The "Advanced Vector Extensions" register set must be reported
        # exactly when targetHasAVX() says the target has AVX.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all register sets found.
        register_sets = {
            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
        self.assertEqual(
            self.targetHasAVX(),
            "Advanced Vector Extensions" in register_sets)

    def qThreadInfo_contains_thread(self):
        # Shared body: the thread info packets must report exactly one thread.
        procs = self.prep_debug_monitor_and_inferior()
        self.add_threadinfo_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather threadinfo entries.
        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        # We should have exactly one thread.
        self.assertEqual(len(threads), 1)

    def test_qThreadInfo_contains_thread_launch(self):
        # Launch variant of qThreadInfo_contains_thread.
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_contains_thread()

    @expectedFailureAll(oslist=["windows"])  # expect one more thread stopped
    def test_qThreadInfo_contains_thread_attach(self):
        # Attach variant of qThreadInfo_contains_thread.
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_contains_thread()

    def qThreadInfo_matches_qC(self):
        # Shared body: the single thread reported by the thread info packets
        # must be the thread reported by $qC.
        procs = self.prep_debug_monitor_and_inferior()

        self.add_threadinfo_collection_packets()
        self.test_sequence.add_log_lines(
            ["read packet: $qC#00",
             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
             ], True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather threadinfo entries.
        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        # We should have exactly one thread from threadinfo.
        self.assertEqual(len(threads), 1)

        # We should have a valid thread_id from $QC.
        QC_thread_id_hex = context.get("thread_id")
        self.assertIsNotNone(QC_thread_id_hex)
        QC_thread_id = int(QC_thread_id_hex, 16)

        # Those two should be the same.
        self.assertEqual(threads[0], QC_thread_id)

    def test_qThreadInfo_matches_qC_launch(self):
        # Launch variant of qThreadInfo_matches_qC.
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_matches_qC()

    @expectedFailureAll(oslist=["windows"])  # expect one more thread stopped
    def test_qThreadInfo_matches_qC_attach(self):
        # Attach variant of qThreadInfo_matches_qC.
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_matches_qC()

    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
        # For every register that belongs to a register set, read it with $p
        # and check the hex reply is as long as the advertised bitsize.
        self.build()
        self.set_inferior_startup_launch()
        procs = self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        self.assertTrue(len(reg_infos) > 0)

        byte_order = self.get_target_byte_order()

        # Read value for each register.  The index sent with $p is the
        # position of the entry in reg_infos (assumed to be in qRegisterInfo
        # order, as the previous hand-rolled counter also assumed).  Using
        # enumerate keeps the index in step with reg_info even when an entry
        # is skipped; the old counter was only bumped at the bottom of the
        # loop, so any `continue` made all later reads target the wrong
        # register.
        for reg_index, reg_info in enumerate(reg_infos):
            # Skip registers that don't have a register set. For x86, these are
            # the DRx registers, which have no LLDB-kind register number and thus
            # cannot be read via normal
            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
            if "set" not in reg_info:
                continue

            # Clear existing packet expectations.
            self.reset_test_sequence()

            # Run the register query
            self.test_sequence.add_log_lines(
                ["read packet: $p{0:x}#00".format(reg_index),
                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
                True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)

            # Skip erroneous (unsupported) registers.
            # TODO: remove this once we make unsupported registers disappear.
            if p_response.startswith("E") and len(p_response) == 3:
                continue

            if "dynamic_size_dwarf_expr_bytes" in reg_info:
                self.updateRegInfoBitsize(reg_info, byte_order)
            # Two hex characters per byte.  Integer division keeps the
            # expected length an int on Python 3 as well.
            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) // 8,
                             reg_info)

    def Hg_switches_to_3_threads(self):
        # Shared body: with three threads running, $Hg must switch to each of
        # them and $qC must then report the selected thread.

        # Startup the inferior with three threads (main + 2 new ones).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["thread:new", "thread:new"])

        # Let the inferior process have a few moments to start up the thread
        # when launched. (The launch scenario has no time to run, so threads
        # won't be there yet.)
        self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for 3 threads to be present.
        threads = self.wait_for_thread_count(3)
        self.assertEqual(len(threads), 3)

        # verify we can $H to each thread, and $qC matches the thread we set.
        for thread in threads:
            # Change to each thread, verify current thread id.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $Hg{0:x}#00".format(thread),  # Set current thread.
                 "send packet: $OK#00",
                 "read packet: $qC#00",
                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the thread id.
            self.assertIsNotNone(context.get("thread_id"))
            self.assertEqual(int(context.get("thread_id"), 16), thread)

    @expectedFailureAll(oslist=["windows"])  # expect 4 threads
    def test_Hg_switches_to_3_threads_launch(self):
        # Launch variant of Hg_switches_to_3_threads.
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_switches_to_3_threads()

    @expectedFailureAll(oslist=["windows"])  # expecting one more thread
    def test_Hg_switches_to_3_threads_attach(self):
        # Attach variant of Hg_switches_to_3_threads.
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads()

    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
        # Shared body: after each worker thread stops with segfault_signo,
        # select it with $Hc, continue it with $C<SIGUSR1> and check the
        # inferior's output to see which thread handled the signal.
        #
        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
        # and the test requires getting stdout from the exe.

        NUM_THREADS = 3

        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
        # inferior_args=["thread:print-ids"]
        inferior_args = ["thread:segfault"]
        for i in range(NUM_THREADS - 1):
            # if i > 0:
            # Give time between thread creation/segfaulting for the handler to work.
            # inferior_args.append("sleep:1")
            inferior_args.append("thread:new")
        inferior_args.append("sleep:10")

        # Launch/attach. (In our case, this should only ever be launched since
        # we need inferior stdout/stderr).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=inferior_args)
        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
        context = self.expect_gdbremote_sequence()

        # Let the inferior process have a few moments to start up the thread when launched.
        # context = self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for all threads to be present.
        # threads = self.wait_for_thread_count(NUM_THREADS)
        # self.assertEquals(len(threads), NUM_THREADS)

        signaled_tids = {}
        print_thread_ids = {}

        # Switch to each thread, deliver a signal, and verify signal delivery
        for i in range(NUM_THREADS - 1):
            # Run until SIGSEGV comes in.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([{"direction": "send",
                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                                               "capture": {1: "signo",
                                                            2: "thread_id"}}],
                                             True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            signo = context.get("signo")
            self.assertEqual(int(signo, 16), segfault_signo)

            # Ensure we haven't seen this tid yet.
            thread_id = int(context.get("thread_id"), 16)
            self.assertNotIn(thread_id, signaled_tids)
            signaled_tids[thread_id] = 1

            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    # Set the continue thread.
                    # Set current thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",

                    # Continue sending the signal number to the continue thread.
                    # The commented out packet is a way to do this same operation without using
                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),

                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here). MacOSX debugserver does.
                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL.
                    # Need to rectify behavior here. The linux behavior is more intuitive to me since we're essentially swapping out
                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                    # "read packet: $c#63",
                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
                ],
                True)

            # Run the sequence.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Ensure the stop signal is the signal we delivered.
            # stop_signo = context.get("stop_signo")
            # self.assertIsNotNone(stop_signo)
            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))

            # Ensure the stop thread is the thread to which we delivered the signal.
            # stop_thread_id = context.get("stop_thread_id")
            # self.assertIsNotNone(stop_thread_id)
            # self.assertEquals(int(stop_thread_id,16), thread_id)

            # Ensure we haven't seen this thread id yet. The inferior's
            # self-obtained thread ids are not guaranteed to match the stub
            # tids (at least on MacOSX).
            print_thread_id = context.get("print_thread_id")
            self.assertIsNotNone(print_thread_id)
            print_thread_id = int(print_thread_id, 16)
            self.assertNotIn(print_thread_id, print_thread_ids)

            # Now remember this print (i.e. inferior-reflected) thread id and
            # ensure we don't hit it again.
            print_thread_ids[print_thread_id] = 1

            # Ensure post signal-handle thread id matches the thread that
            # initially raised the SIGSEGV.
            post_handle_thread_id = context.get("post_handle_thread_id")
            self.assertIsNotNone(post_handle_thread_id)
            post_handle_thread_id = int(post_handle_thread_id, 16)
            self.assertEqual(post_handle_thread_id, print_thread_id)

    @expectedFailureDarwin
    @skipIfWindows  # no SIGSEGV support
    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
    @expectedFailureNetBSD
    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
        # Launch variant of Hc_then_Csignal_signals_correct_thread, passing
        # the platform's expected stop signal number.
        self.build()
        self.set_inferior_startup_launch()

        if self.platformIsDarwin():
            # Darwin debugserver translates some signals like SIGSEGV into some gdb
            # expectations about fixed signal numbers.
            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
        else:
            self.Hc_then_Csignal_signals_correct_thread(
                lldbutil.get_signal_number('SIGSEGV'))

    @skipIfWindows  # No pty support to test any inferior output
    def test_m_packet_reads_memory(self):
        # Have the inferior store a known message, then read it back with $m
        # and compare.
        self.build()
        self.set_inferior_startup_launch()
        # This is the memory we will write into the inferior and then ensure we
        # can read back with $m.
        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "set-message:%s" %
                MEMORY_CONTENTS,
                "get-data-address-hex:g_message",
                "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the message buffer within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "message_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the message address.
        self.assertIsNotNone(context.get("message_address"))
        message_address = int(context.get("message_address"), 16)

        # Grab contents from the inferior.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Ensure what we read from inferior memory is what we wrote.
        self.assertIsNotNone(context.get("read_contents"))
        read_contents = seven.unhexlify(context.get("read_contents"))
        self.assertEqual(read_contents, MEMORY_CONTENTS)

    def test_qMemoryRegionInfo_is_supported(self):
        # A bare $qMemoryRegionInfo query must be answered with $OK.
        self.build()
        self.set_inferior_startup_launch()
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior()

        # Ask if it supports $qMemoryRegionInfo.
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo#00",
             "send packet: $OK#00"
             ], True)
        self.expect_gdbremote_sequence()

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
        # The memory region containing a code address printed by the inferior
        # must be reported as readable and executable.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-code-address-hex:hello", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the code address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "code_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the code address.
        self.assertIsNotNone(context.get("code_address"))
        code_address = int(context.get("code_address"), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(code_address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure code address is readable and executable.
        self.assertIn("permissions", mem_region_dict)
        self.assertIn("r", mem_region_dict["permissions"])
        self.assertIn("x", mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(code_address, mem_region_dict)

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
        # The memory region containing a stack address printed by the inferior
        # must be reported as readable and writable.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-stack-address-hex:", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the stack address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "stack_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the address.
        self.assertIsNotNone(context.get("stack_address"))
        stack_address = int(context.get("stack_address"), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(stack_address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure address is readable and writable.
        self.assertIn("permissions", mem_region_dict)
        self.assertIn("r", mem_region_dict["permissions"])
        self.assertIn("w", mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(
            stack_address, mem_region_dict)

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
        # The memory region containing a heap address printed by the inferior
        # must be reported as readable and writable.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-heap-address-hex:", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the heap address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "heap_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the address.
        self.assertIsNotNone(context.get("heap_address"))
        heap_address = int(context.get("heap_address"), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(heap_address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure address is readable and writable.
        self.assertIn("permissions", mem_region_dict)
        self.assertIn("r", mem_region_dict["permissions"])
        self.assertIn("w", mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(heap_address, mem_region_dict)

    def breakpoint_set_and_remove_work(self, want_hardware):
        # Shared body: set a breakpoint (Z1 when want_hardware is true, Z0
        # otherwise) on a function the inferior is about to call, verify the
        # stop and the PC, then remove the breakpoint and verify the inferior
        # runs to completion.

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "get-code-address-hex:hello",
                "sleep:1",
                "call-function:hello"])

        # Run the process
        self.add_register_info_collection_packets()
        self.add_process_info_collection_packets()
        self.test_sequence.add_log_lines(
            [  # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the function call entry point.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "function_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather process info - we need endian of target to handle register
        # value conversions.
        process_info = self.parse_process_info_response(context)
        endian = process_info.get("endian")
        self.assertIsNotNone(endian)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)
        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
        self.assertIsNotNone(pc_lldb_reg_index)
        self.assertIsNotNone(pc_reg_info)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Get current target architecture
        target_arch = self.getArchitecture()

        # Set the breakpoint.
        if (target_arch == "arm") or (target_arch == "aarch64"):
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1

        # Packet type is Z1 (hardware breakpoint) when requested, otherwise
        # Z0 (software breakpoint).
        z_packet_type = 1 if want_hardware else 0

        self.reset_test_sequence()
        self.add_set_breakpoint_packets(
            function_address,
            z_packet_type,
            do_continue=True,
            breakpoint_kind=BREAKPOINT_KIND)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the stop signal reported was the breakpoint signal number.
        stop_signo = context.get("stop_signo")
        self.assertIsNotNone(stop_signo)
        self.assertEqual(int(stop_signo, 16),
                         lldbutil.get_signal_number('SIGTRAP'))

        # Ensure we did not receive any output. If the breakpoint was not set, we would
        # see output (from a launched process with captured stdio) printing a hello, world message.
        # That would indicate the breakpoint didn't take.
        self.assertEqual(len(context["O_content"]), 0)

        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
        # This acts as a another validation on the register reading code.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                # Print the PC. This should match the breakpoint address.
                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
                # Capture $p results.
                {"direction": "send",
                 "regex": r"^\$([0-9a-fA-F]+)#",
                 "capture": {1: "p_response"}},
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the PC is where we expect. Note response is in endianness of
        # the inferior.
        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)

        # Convert from target endian to int.
        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
            endian, p_response)
        self.assertEqual(returned_pc, function_address)

        # Verify that a breakpoint remove and continue gets us the expected
        # output.
        self.reset_test_sequence()

        # Add breakpoint remove packets
        self.add_remove_breakpoint_packets(
            function_address,
            z_packet_type,
            breakpoint_kind=BREAKPOINT_KIND)

        self.test_sequence.add_log_lines(
            [
                # Continue running.
                "read packet: $c#63",
                # We should now receive the output from the call.
                {"type": "output_match", "regex": r"^hello, world\r\n$"},
                # And wait for program completion.
                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    @skipIfWindows  # No pty support to test any inferior output
    def test_software_breakpoint_set_and_remove_work(self):
        # Software-breakpoint variant of breakpoint_set_and_remove_work.
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
        else:
            self.build()
        self.set_inferior_startup_launch()
        self.breakpoint_set_and_remove_work(want_hardware=False)

    @skipUnlessPlatform(oslist=['linux'])
    @skipIf(archs=no_match(['arm', 'aarch64']))
    def test_hardware_breakpoint_set_and_remove_work(self):
        # Hardware-breakpoint variant of breakpoint_set_and_remove_work.
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
        else:
            self.build()
        self.set_inferior_startup_launch()
        self.breakpoint_set_and_remove_work(want_hardware=True)

    def test_qSupported_returns_known_stub_features(self):
        # The parsed qSupported response must contain at least one feature.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the stub and start/prep the inferior.
        procs = self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Retrieve the qSupported features.
        supported_dict = self.parse_qSupported_response(context)
        self.assertIsNotNone(supported_dict)
        self.assertTrue(len(supported_dict) > 0)

    @skipIfWindows  # No pty support to test any inferior output
    def test_written_M_content_reads_back_correctly(self):
        self.build()
        self.set_inferior_startup_launch()

        TEST_MESSAGE = "Hello, memory"

        # Start up the stub and start/prep the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "set-message:xxxxxxxxxxxxxX",
                "get-data-address-hex:g_message",
                "sleep:1",
                "print-message:"])
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the message buffer within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "message_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the message address.
        self.assertIsNotNone(context.get("message_address"))
        message_address = int(context.get("message_address"), 16)

        # Hex-encode the test message, adding null termination.
        hex_encoded_message = seven.hexlify(TEST_MESSAGE)

        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
        # and binary (x) memory read packets.
957 self.reset_test_sequence() 958 self.test_sequence.add_log_lines( 959 ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message), 960 "send packet: $OK#00", 961 "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 962 "send packet: ${0}#00".format(hex_encoded_message), 963 "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 964 "send packet: ${0}#00".format(TEST_MESSAGE), 965 "read packet: $m{0:x},4#00".format(message_address), 966 "send packet: ${0}#00".format(hex_encoded_message[0:8]), 967 "read packet: $x{0:x},4#00".format(message_address), 968 "send packet: ${0}#00".format(TEST_MESSAGE[0:4]), 969 "read packet: $c#63", 970 {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}}, 971 "send packet: $W00#00", 972 ], True) 973 context = self.expect_gdbremote_sequence() 974 self.assertIsNotNone(context) 975 976 # Ensure what we read from inferior memory is what we wrote. 977 printed_message = context.get("printed_message") 978 self.assertIsNotNone(printed_message) 979 self.assertEqual(printed_message, TEST_MESSAGE + "X") 980 981 # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp). 982 # Come back to this. I have the test rigged to verify that at least some 983 # of the bit-flip writes work. 984 def test_P_writes_all_gpr_registers(self): 985 self.build() 986 self.set_inferior_startup_launch() 987 988 # Start inferior debug session, grab all register info. 989 procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"]) 990 self.add_register_info_collection_packets() 991 self.add_process_info_collection_packets() 992 993 context = self.expect_gdbremote_sequence() 994 self.assertIsNotNone(context) 995 996 # Process register infos. 
997 reg_infos = self.parse_register_info_packets(context) 998 self.assertIsNotNone(reg_infos) 999 self.add_lldb_register_index(reg_infos) 1000 1001 # Process endian. 1002 process_info = self.parse_process_info_response(context) 1003 endian = process_info.get("endian") 1004 self.assertIsNotNone(endian) 1005 1006 # Pull out the register infos that we think we can bit flip 1007 # successfully,. 1008 gpr_reg_infos = [ 1009 reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)] 1010 self.assertTrue(len(gpr_reg_infos) > 0) 1011 1012 # Write flipped bit pattern of existing value to each register. 1013 (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value( 1014 gpr_reg_infos, endian) 1015 self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes)) 1016 self.assertTrue(successful_writes > 0) 1017 1018 # Note: as of this moment, a hefty number of the GPR writes are failing 1019 # with E32 (everything except rax-rdx, rdi, rsi, rbp). 1020 @skipIfWindows 1021 def test_P_and_p_thread_suffix_work(self): 1022 self.build() 1023 self.set_inferior_startup_launch() 1024 1025 # Startup the inferior with three threads. 
1026 procs = self.prep_debug_monitor_and_inferior( 1027 inferior_args=["thread:new", "thread:new"]) 1028 self.add_thread_suffix_request_packets() 1029 self.add_register_info_collection_packets() 1030 self.add_process_info_collection_packets() 1031 1032 context = self.expect_gdbremote_sequence() 1033 self.assertIsNotNone(context) 1034 1035 process_info = self.parse_process_info_response(context) 1036 self.assertIsNotNone(process_info) 1037 endian = process_info.get("endian") 1038 self.assertIsNotNone(endian) 1039 1040 reg_infos = self.parse_register_info_packets(context) 1041 self.assertIsNotNone(reg_infos) 1042 self.add_lldb_register_index(reg_infos) 1043 1044 reg_index = self.select_modifiable_register(reg_infos) 1045 self.assertIsNotNone(reg_index) 1046 reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8 1047 self.assertTrue(reg_byte_size > 0) 1048 1049 # Run the process a bit so threads can start up, and collect register 1050 # info. 1051 context = self.run_process_then_stop(run_seconds=1) 1052 self.assertIsNotNone(context) 1053 1054 # Wait for 3 threads to be present. 1055 threads = self.wait_for_thread_count(3) 1056 self.assertEqual(len(threads), 3) 1057 1058 expected_reg_values = [] 1059 register_increment = 1 1060 next_value = None 1061 1062 # Set the same register in each of 3 threads to a different value. 1063 # Verify each one has the unique value. 1064 for thread in threads: 1065 # If we don't have a next value yet, start it with the initial read 1066 # value + 1 1067 if not next_value: 1068 # Read pre-existing register value. 1069 self.reset_test_sequence() 1070 self.test_sequence.add_log_lines( 1071 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1072 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1073 ], True) 1074 context = self.expect_gdbremote_sequence() 1075 self.assertIsNotNone(context) 1076 1077 # Set the next value to use for writing as the increment plus 1078 # current value. 
1079 p_response = context.get("p_response") 1080 self.assertIsNotNone(p_response) 1081 next_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1082 endian, p_response) 1083 1084 # Set new value using P and thread suffix. 1085 self.reset_test_sequence() 1086 self.test_sequence.add_log_lines( 1087 [ 1088 "read packet: $P{0:x}={1};thread:{2:x}#00".format( 1089 reg_index, 1090 lldbgdbserverutils.pack_register_hex( 1091 endian, 1092 next_value, 1093 byte_size=reg_byte_size), 1094 thread), 1095 "send packet: $OK#00", 1096 ], 1097 True) 1098 context = self.expect_gdbremote_sequence() 1099 self.assertIsNotNone(context) 1100 1101 # Save the value we set. 1102 expected_reg_values.append(next_value) 1103 1104 # Increment value for next thread to use (we want them all 1105 # different so we can verify they wrote to each thread correctly 1106 # next.) 1107 next_value += register_increment 1108 1109 # Revisit each thread and verify they have the expected value set for 1110 # the register we wrote. 1111 thread_index = 0 1112 for thread in threads: 1113 # Read pre-existing register value. 1114 self.reset_test_sequence() 1115 self.test_sequence.add_log_lines( 1116 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1117 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1118 ], True) 1119 context = self.expect_gdbremote_sequence() 1120 self.assertIsNotNone(context) 1121 1122 # Get the register value. 1123 p_response = context.get("p_response") 1124 self.assertIsNotNone(p_response) 1125 read_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1126 endian, p_response) 1127 1128 # Make sure we read back what we wrote. 1129 self.assertEqual(read_value, expected_reg_values[thread_index]) 1130 thread_index += 1 1131