1""" 2Test case for testing the gdbremote protocol. 3 4Tests run against debugserver and lldb-server (llgs). 5lldb-server tests run where the lldb-server exe is 6available. 7 8This class will be broken into smaller test case classes by 9gdb remote packet functional areas. For now it contains 10the initial set of tests implemented. 11""" 12 13import binascii 14import itertools 15import struct 16 17import unittest2 18import gdbremote_testcase 19import lldbgdbserverutils 20from lldbsuite.support import seven 21from lldbsuite.test.decorators import * 22from lldbsuite.test.lldbtest import * 23from lldbsuite.test.lldbdwarf import * 24from lldbsuite.test import lldbutil, lldbplatformutil 25 26 27class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser): 28 29 mydir = TestBase.compute_mydir(__file__) 30 31 def test_thread_suffix_supported(self): 32 server = self.connect_to_debug_monitor() 33 self.assertIsNotNone(server) 34 35 self.do_handshake() 36 self.test_sequence.add_log_lines( 37 ["lldb-server < 26> read packet: $QThreadSuffixSupported#e4", 38 "lldb-server < 6> send packet: $OK#9a"], 39 True) 40 41 self.expect_gdbremote_sequence() 42 43 44 def test_list_threads_in_stop_reply_supported(self): 45 server = self.connect_to_debug_monitor() 46 self.assertIsNotNone(server) 47 48 self.do_handshake() 49 self.test_sequence.add_log_lines( 50 ["lldb-server < 27> read packet: $QListThreadsInStopReply#21", 51 "lldb-server < 6> send packet: $OK#9a"], 52 True) 53 self.expect_gdbremote_sequence() 54 55 def test_c_packet_works(self): 56 self.build() 57 procs = self.prep_debug_monitor_and_inferior() 58 self.test_sequence.add_log_lines( 59 ["read packet: $c#63", 60 "send packet: $W00#00"], 61 True) 62 63 self.expect_gdbremote_sequence() 64 65 @skipIfWindows # No pty support to test any inferior output 66 def test_inferior_print_exit(self): 67 self.build() 68 procs = self.prep_debug_monitor_and_inferior( 69 inferior_args=["hello, world"]) 70 self.test_sequence.add_log_lines( 71 ["read packet: $vCont;c#a8", 72 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")}, 73 "send packet: $W00#00"], 74 True) 75 76 context = self.expect_gdbremote_sequence() 77 self.assertIsNotNone(context) 78 79 def test_first_launch_stop_reply_thread_matches_first_qC(self): 80 self.build() 81 procs = self.prep_debug_monitor_and_inferior() 82 self.test_sequence.add_log_lines(["read packet: $qC#00", 83 {"direction": "send", 84 "regex": r"^\$QC([0-9a-fA-F]+)#", 85 "capture": {1: "thread_id_QC"}}, 86 "read packet: $?#00", 87 {"direction": "send", 88 "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)", 89 "capture": {1: "thread_id_?"}}], 90 True) 91 context = self.expect_gdbremote_sequence() 92 self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?")) 93 94 def test_attach_commandline_continue_app_exits(self): 95 self.build() 96 self.set_inferior_startup_attach() 97 procs = self.prep_debug_monitor_and_inferior() 98 self.test_sequence.add_log_lines( 99 ["read packet: $vCont;c#a8", 100 "send packet: $W00#00"], 101 True) 102 self.expect_gdbremote_sequence() 103 104 # Wait a moment for completed and now-detached inferior process to 105 # clear. 106 time.sleep(1) 107 108 if not lldb.remote_platform: 109 # Process should be dead now. Reap results. 110 poll_result = procs["inferior"].poll() 111 self.assertIsNotNone(poll_result) 112 113 # Where possible, verify at the system level that the process is not 114 # running. 115 self.assertFalse( 116 lldbgdbserverutils.process_is_running( 117 procs["inferior"].pid, False)) 118 119 def test_qRegisterInfo_returns_one_valid_result(self): 120 self.build() 121 self.prep_debug_monitor_and_inferior() 122 self.test_sequence.add_log_lines( 123 ["read packet: $qRegisterInfo0#00", 124 {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}], 125 True) 126 127 # Run the stream 128 context = self.expect_gdbremote_sequence() 129 self.assertIsNotNone(context) 130 131 reg_info_packet = context.get("reginfo_0") 132 self.assertIsNotNone(reg_info_packet) 133 self.assert_valid_reg_info( 134 lldbgdbserverutils.parse_reg_info_response(reg_info_packet)) 135 136 def test_qRegisterInfo_returns_all_valid_results(self): 137 self.build() 138 self.prep_debug_monitor_and_inferior() 139 self.add_register_info_collection_packets() 140 141 # Run the stream. 142 context = self.expect_gdbremote_sequence() 143 self.assertIsNotNone(context) 144 145 # Validate that each register info returned validates. 146 for reg_info in self.parse_register_info_packets(context): 147 self.assert_valid_reg_info(reg_info) 148 149 def test_qRegisterInfo_contains_required_generics_debugserver(self): 150 self.build() 151 self.prep_debug_monitor_and_inferior() 152 self.add_register_info_collection_packets() 153 154 # Run the packet stream. 155 context = self.expect_gdbremote_sequence() 156 self.assertIsNotNone(context) 157 158 # Gather register info entries. 159 reg_infos = self.parse_register_info_packets(context) 160 161 # Collect all generic registers found. 162 generic_regs = { 163 reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info} 164 165 # Ensure we have a program counter register. 166 self.assertIn('pc', generic_regs) 167 168 # Ensure we have a frame pointer register. PPC64le's FP is the same as SP 169 if self.getArchitecture() != 'powerpc64le': 170 self.assertIn('fp', generic_regs) 171 172 # Ensure we have a stack pointer register. 173 self.assertIn('sp', generic_regs) 174 175 # Ensure we have a flags register. 176 self.assertIn('flags', generic_regs) 177 178 def test_qRegisterInfo_contains_at_least_one_register_set(self): 179 self.build() 180 self.prep_debug_monitor_and_inferior() 181 self.add_register_info_collection_packets() 182 183 # Run the packet stream. 184 context = self.expect_gdbremote_sequence() 185 self.assertIsNotNone(context) 186 187 # Gather register info entries. 188 reg_infos = self.parse_register_info_packets(context) 189 190 # Collect all register sets found. 191 register_sets = { 192 reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info} 193 self.assertTrue(len(register_sets) >= 1) 194 195 def targetHasAVX(self): 196 triple = self.dbg.GetSelectedPlatform().GetTriple() 197 198 # TODO other platforms, please implement this function 199 if not re.match(".*-.*-linux", triple): 200 return True 201 202 # Need to do something different for non-Linux/Android targets 203 if lldb.remote_platform: 204 self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"') 205 cpuinfo_path = "cpuinfo" 206 self.addTearDownHook(lambda: os.unlink("cpuinfo")) 207 else: 208 cpuinfo_path = "/proc/cpuinfo" 209 210 f = open(cpuinfo_path, 'r') 211 cpuinfo = f.read() 212 f.close() 213 return " avx " in cpuinfo 214 215 @expectedFailureAll(oslist=["windows"]) # no avx for now. 216 @skipIf(archs=no_match(['amd64', 'i386', 'x86_64'])) 217 @add_test_categories(["llgs"]) 218 def test_qRegisterInfo_contains_avx_registers(self): 219 self.build() 220 self.prep_debug_monitor_and_inferior() 221 self.add_register_info_collection_packets() 222 223 # Run the packet stream. 224 context = self.expect_gdbremote_sequence() 225 self.assertIsNotNone(context) 226 227 # Gather register info entries. 228 reg_infos = self.parse_register_info_packets(context) 229 230 # Collect all generics found. 231 register_sets = { 232 reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info} 233 self.assertEqual( 234 self.targetHasAVX(), 235 "Advanced Vector Extensions" in register_sets) 236 237 def qThreadInfo_contains_thread(self): 238 procs = self.prep_debug_monitor_and_inferior() 239 self.add_threadinfo_collection_packets() 240 241 # Run the packet stream. 242 context = self.expect_gdbremote_sequence() 243 self.assertIsNotNone(context) 244 245 # Gather threadinfo entries. 246 threads = self.parse_threadinfo_packets(context) 247 self.assertIsNotNone(threads) 248 249 # We should have exactly one thread. 250 self.assertEqual(len(threads), 1) 251 252 def test_qThreadInfo_contains_thread_launch(self): 253 self.build() 254 self.set_inferior_startup_launch() 255 self.qThreadInfo_contains_thread() 256 257 @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped 258 def test_qThreadInfo_contains_thread_attach(self): 259 self.build() 260 self.set_inferior_startup_attach() 261 self.qThreadInfo_contains_thread() 262 263 def qThreadInfo_matches_qC(self): 264 procs = self.prep_debug_monitor_and_inferior() 265 266 self.add_threadinfo_collection_packets() 267 self.test_sequence.add_log_lines( 268 ["read packet: $qC#00", 269 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}} 270 ], True) 271 272 # Run the packet stream. 273 context = self.expect_gdbremote_sequence() 274 self.assertIsNotNone(context) 275 276 # Gather threadinfo entries. 277 threads = self.parse_threadinfo_packets(context) 278 self.assertIsNotNone(threads) 279 280 # We should have exactly one thread from threadinfo. 281 self.assertEqual(len(threads), 1) 282 283 # We should have a valid thread_id from $QC. 284 QC_thread_id_hex = context.get("thread_id") 285 self.assertIsNotNone(QC_thread_id_hex) 286 QC_thread_id = int(QC_thread_id_hex, 16) 287 288 # Those two should be the same. 289 self.assertEqual(threads[0], QC_thread_id) 290 291 def test_qThreadInfo_matches_qC_launch(self): 292 self.build() 293 self.set_inferior_startup_launch() 294 self.qThreadInfo_matches_qC() 295 296 @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped 297 def test_qThreadInfo_matches_qC_attach(self): 298 self.build() 299 self.set_inferior_startup_attach() 300 self.qThreadInfo_matches_qC() 301 302 def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self): 303 self.build() 304 self.set_inferior_startup_launch() 305 procs = self.prep_debug_monitor_and_inferior() 306 self.add_register_info_collection_packets() 307 308 # Run the packet stream. 309 context = self.expect_gdbremote_sequence() 310 self.assertIsNotNone(context) 311 312 # Gather register info entries. 313 reg_infos = self.parse_register_info_packets(context) 314 self.assertIsNotNone(reg_infos) 315 self.assertTrue(len(reg_infos) > 0) 316 317 byte_order = self.get_target_byte_order() 318 319 # Read value for each register. 320 reg_index = 0 321 for reg_info in reg_infos: 322 # Skip registers that don't have a register set. For x86, these are 323 # the DRx registers, which have no LLDB-kind register number and thus 324 # cannot be read via normal 325 # NativeRegisterContext::ReadRegister(reg_info,...) calls. 326 if not "set" in reg_info: 327 continue 328 329 # Clear existing packet expectations. 330 self.reset_test_sequence() 331 332 # Run the register query 333 self.test_sequence.add_log_lines( 334 ["read packet: $p{0:x}#00".format(reg_index), 335 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}], 336 True) 337 context = self.expect_gdbremote_sequence() 338 self.assertIsNotNone(context) 339 340 # Verify the response length. 341 p_response = context.get("p_response") 342 self.assertIsNotNone(p_response) 343 344 # Skip erraneous (unsupported) registers. 345 # TODO: remove this once we make unsupported registers disappear. 346 if p_response.startswith("E") and len(p_response) == 3: 347 continue 348 349 if "dynamic_size_dwarf_expr_bytes" in reg_info: 350 self.updateRegInfoBitsize(reg_info, byte_order) 351 self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8, 352 reg_info) 353 354 # Increment loop 355 reg_index += 1 356 357 def Hg_switches_to_3_threads(self, pass_pid=False): 358 # Startup the inferior with three threads (main + 2 new ones). 359 procs = self.prep_debug_monitor_and_inferior( 360 inferior_args=["thread:new", "thread:new"]) 361 362 # Let the inferior process have a few moments to start up the thread 363 # when launched. (The launch scenario has no time to run, so threads 364 # won't be there yet.) 365 self.run_process_then_stop(run_seconds=1) 366 367 # Wait at most x seconds for 3 threads to be present. 368 threads = self.wait_for_thread_count(3) 369 self.assertEqual(len(threads), 3) 370 371 pid_str = "" 372 if pass_pid: 373 pid_str = "p{0:x}.".format(procs["inferior"].pid) 374 375 # verify we can $H to each thead, and $qC matches the thread we set. 376 for thread in threads: 377 # Change to each thread, verify current thread id. 378 self.reset_test_sequence() 379 self.test_sequence.add_log_lines( 380 ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread), # Set current thread. 381 "send packet: $OK#00", 382 "read packet: $qC#00", 383 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}], 384 True) 385 386 context = self.expect_gdbremote_sequence() 387 self.assertIsNotNone(context) 388 389 # Verify the thread id. 390 self.assertIsNotNone(context.get("thread_id")) 391 self.assertEqual(int(context.get("thread_id"), 16), thread) 392 393 @expectedFailureAll(oslist=["windows"]) # expect 4 threads 394 @skipIf(compiler="clang", compiler_version=['<', '11.0']) 395 def test_Hg_switches_to_3_threads_launch(self): 396 self.build() 397 self.set_inferior_startup_launch() 398 self.Hg_switches_to_3_threads() 399 400 @expectedFailureAll(oslist=["windows"]) # expecting one more thread 401 @skipIf(compiler="clang", compiler_version=['<', '11.0']) 402 def test_Hg_switches_to_3_threads_attach(self): 403 self.build() 404 self.set_inferior_startup_attach() 405 self.Hg_switches_to_3_threads() 406 407 @expectedFailureAll(oslist=["windows"]) # expect 4 threads 408 @add_test_categories(["llgs"]) 409 @skipIf(compiler="clang", compiler_version=['<', '11.0']) 410 def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self): 411 self.build() 412 self.set_inferior_startup_attach() 413 self.Hg_switches_to_3_threads(pass_pid=True) 414 415 def Hg_fails_on_pid(self, pass_pid): 416 # Start the inferior. 417 procs = self.prep_debug_monitor_and_inferior( 418 inferior_args=["thread:new"]) 419 420 self.run_process_then_stop(run_seconds=1) 421 422 threads = self.wait_for_thread_count(2) 423 self.assertEqual(len(threads), 2) 424 425 if pass_pid == -1: 426 pid_str = "p-1." 427 else: 428 pid_str = "p{0:x}.".format(pass_pid) 429 thread = threads[1] 430 431 self.test_sequence.add_log_lines( 432 ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread), # Set current thread. 433 "send packet: $Eff#00"], 434 True) 435 436 self.expect_gdbremote_sequence() 437 438 @expectedFailureAll(oslist=["windows"]) 439 @add_test_categories(["llgs"]) 440 def test_Hg_fails_on_another_pid(self): 441 self.build() 442 self.set_inferior_startup_launch() 443 self.Hg_fails_on_pid(1) 444 445 @expectedFailureAll(oslist=["windows"]) 446 @add_test_categories(["llgs"]) 447 def test_Hg_fails_on_zero_pid(self): 448 self.build() 449 self.set_inferior_startup_launch() 450 self.Hg_fails_on_pid(0) 451 452 @expectedFailureAll(oslist=["windows"]) 453 @add_test_categories(["llgs"]) 454 def test_Hg_fails_on_minus_one_pid(self): 455 self.build() 456 self.set_inferior_startup_launch() 457 self.Hg_fails_on_pid(-1) 458 459 def Hc_then_Csignal_signals_correct_thread(self, segfault_signo): 460 # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached, 461 # and the test requires getting stdout from the exe. 462 463 NUM_THREADS = 3 464 465 # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads). 466 # inferior_args=["thread:print-ids"] 467 inferior_args = ["thread:segfault"] 468 for i in range(NUM_THREADS - 1): 469 # if i > 0: 470 # Give time between thread creation/segfaulting for the handler to work. 471 # inferior_args.append("sleep:1") 472 inferior_args.append("thread:new") 473 inferior_args.append("sleep:10") 474 475 # Launch/attach. (In our case, this should only ever be launched since 476 # we need inferior stdout/stderr). 477 procs = self.prep_debug_monitor_and_inferior( 478 inferior_args=inferior_args) 479 self.test_sequence.add_log_lines(["read packet: $c#63"], True) 480 context = self.expect_gdbremote_sequence() 481 482 # Let the inferior process have a few moments to start up the thread when launched. 483 # context = self.run_process_then_stop(run_seconds=1) 484 485 # Wait at most x seconds for all threads to be present. 486 # threads = self.wait_for_thread_count(NUM_THREADS) 487 # self.assertEquals(len(threads), NUM_THREADS) 488 489 signaled_tids = {} 490 print_thread_ids = {} 491 492 # Switch to each thread, deliver a signal, and verify signal delivery 493 for i in range(NUM_THREADS - 1): 494 # Run until SIGSEGV comes in. 495 self.reset_test_sequence() 496 self.test_sequence.add_log_lines([{"direction": "send", 497 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 498 "capture": {1: "signo", 499 2: "thread_id"}}], 500 True) 501 502 context = self.expect_gdbremote_sequence() 503 self.assertIsNotNone(context) 504 signo = context.get("signo") 505 self.assertEqual(int(signo, 16), segfault_signo) 506 507 # Ensure we haven't seen this tid yet. 508 thread_id = int(context.get("thread_id"), 16) 509 self.assertNotIn(thread_id, signaled_tids) 510 signaled_tids[thread_id] = 1 511 512 # Send SIGUSR1 to the thread that signaled the SIGSEGV. 513 self.reset_test_sequence() 514 self.test_sequence.add_log_lines( 515 [ 516 # Set the continue thread. 517 # Set current thread. 518 "read packet: $Hc{0:x}#00".format(thread_id), 519 "send packet: $OK#00", 520 521 # Continue sending the signal number to the continue thread. 522 # The commented out packet is a way to do this same operation without using 523 # a $Hc (but this test is testing $Hc, so we'll stick with the former). 524 "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')), 525 # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id), 526 527 # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here). MacOSX debugserver does. 528 # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL. 529 # Need to rectify behavior here. The linux behavior is more intuitive to me since we're essentially swapping out 530 # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal. 531 # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }, 532 # "read packet: $c#63", 533 {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}}, 534 ], 535 True) 536 537 # Run the sequence. 538 context = self.expect_gdbremote_sequence() 539 self.assertIsNotNone(context) 540 541 # Ensure the stop signal is the signal we delivered. 542 # stop_signo = context.get("stop_signo") 543 # self.assertIsNotNone(stop_signo) 544 # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1')) 545 546 # Ensure the stop thread is the thread to which we delivered the signal. 547 # stop_thread_id = context.get("stop_thread_id") 548 # self.assertIsNotNone(stop_thread_id) 549 # self.assertEquals(int(stop_thread_id,16), thread_id) 550 551 # Ensure we haven't seen this thread id yet. The inferior's 552 # self-obtained thread ids are not guaranteed to match the stub 553 # tids (at least on MacOSX). 554 print_thread_id = context.get("print_thread_id") 555 self.assertIsNotNone(print_thread_id) 556 print_thread_id = int(print_thread_id, 16) 557 self.assertNotIn(print_thread_id, print_thread_ids) 558 559 # Now remember this print (i.e. inferior-reflected) thread id and 560 # ensure we don't hit it again. 561 print_thread_ids[print_thread_id] = 1 562 563 # Ensure post signal-handle thread id matches the thread that 564 # initially raised the SIGSEGV. 565 post_handle_thread_id = context.get("post_handle_thread_id") 566 self.assertIsNotNone(post_handle_thread_id) 567 post_handle_thread_id = int(post_handle_thread_id, 16) 568 self.assertEqual(post_handle_thread_id, print_thread_id) 569 570 @expectedFailureDarwin 571 @skipIfWindows # no SIGSEGV support 572 @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419") 573 @expectedFailureNetBSD 574 def test_Hc_then_Csignal_signals_correct_thread_launch(self): 575 self.build() 576 self.set_inferior_startup_launch() 577 578 if self.platformIsDarwin(): 579 # Darwin debugserver translates some signals like SIGSEGV into some gdb 580 # expectations about fixed signal numbers. 581 self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS) 582 else: 583 self.Hc_then_Csignal_signals_correct_thread( 584 lldbutil.get_signal_number('SIGSEGV')) 585 586 @skipIfWindows # No pty support to test any inferior output 587 def test_m_packet_reads_memory(self): 588 self.build() 589 self.set_inferior_startup_launch() 590 # This is the memory we will write into the inferior and then ensure we 591 # can read back with $m. 592 MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz" 593 594 # Start up the inferior. 595 procs = self.prep_debug_monitor_and_inferior( 596 inferior_args=[ 597 "set-message:%s" % 598 MEMORY_CONTENTS, 599 "get-data-address-hex:g_message", 600 "sleep:5"]) 601 602 # Run the process 603 self.test_sequence.add_log_lines( 604 [ 605 # Start running after initial stop. 606 "read packet: $c#63", 607 # Match output line that prints the memory address of the message buffer within the inferior. 608 # Note we require launch-only testing so we can get inferior otuput. 609 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 610 "capture": {1: "message_address"}}, 611 # Now stop the inferior. 612 "read packet: {}".format(chr(3)), 613 # And wait for the stop notification. 614 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 615 True) 616 617 # Run the packet stream. 618 context = self.expect_gdbremote_sequence() 619 self.assertIsNotNone(context) 620 621 # Grab the message address. 622 self.assertIsNotNone(context.get("message_address")) 623 message_address = int(context.get("message_address"), 16) 624 625 # Grab contents from the inferior. 626 self.reset_test_sequence() 627 self.test_sequence.add_log_lines( 628 ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)), 629 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}], 630 True) 631 632 # Run the packet stream. 633 context = self.expect_gdbremote_sequence() 634 self.assertIsNotNone(context) 635 636 # Ensure what we read from inferior memory is what we wrote. 637 self.assertIsNotNone(context.get("read_contents")) 638 read_contents = seven.unhexlify(context.get("read_contents")) 639 self.assertEqual(read_contents, MEMORY_CONTENTS) 640 641 def test_qMemoryRegionInfo_is_supported(self): 642 self.build() 643 self.set_inferior_startup_launch() 644 # Start up the inferior. 645 procs = self.prep_debug_monitor_and_inferior() 646 647 # Ask if it supports $qMemoryRegionInfo. 648 self.test_sequence.add_log_lines( 649 ["read packet: $qMemoryRegionInfo#00", 650 "send packet: $OK#00" 651 ], True) 652 self.expect_gdbremote_sequence() 653 654 @skipIfWindows # No pty support to test any inferior output 655 def test_qMemoryRegionInfo_reports_code_address_as_executable(self): 656 self.build() 657 self.set_inferior_startup_launch() 658 659 # Start up the inferior. 660 procs = self.prep_debug_monitor_and_inferior( 661 inferior_args=["get-code-address-hex:hello", "sleep:5"]) 662 663 # Run the process 664 self.test_sequence.add_log_lines( 665 [ 666 # Start running after initial stop. 667 "read packet: $c#63", 668 # Match output line that prints the memory address of the message buffer within the inferior. 669 # Note we require launch-only testing so we can get inferior otuput. 670 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"), 671 "capture": {1: "code_address"}}, 672 # Now stop the inferior. 673 "read packet: {}".format(chr(3)), 674 # And wait for the stop notification. 675 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 676 True) 677 678 # Run the packet stream. 679 context = self.expect_gdbremote_sequence() 680 self.assertIsNotNone(context) 681 682 # Grab the code address. 683 self.assertIsNotNone(context.get("code_address")) 684 code_address = int(context.get("code_address"), 16) 685 686 # Grab memory region info from the inferior. 687 self.reset_test_sequence() 688 self.add_query_memory_region_packets(code_address) 689 690 # Run the packet stream. 691 context = self.expect_gdbremote_sequence() 692 self.assertIsNotNone(context) 693 mem_region_dict = self.parse_memory_region_packet(context) 694 695 # Ensure there are no errors reported. 696 self.assertNotIn("error", mem_region_dict) 697 698 # Ensure code address is readable and executable. 699 self.assertIn("permissions", mem_region_dict) 700 self.assertIn("r", mem_region_dict["permissions"]) 701 self.assertIn("x", mem_region_dict["permissions"]) 702 703 # Ensure the start address and size encompass the address we queried. 704 self.assert_address_within_memory_region(code_address, mem_region_dict) 705 706 @skipIfWindows # No pty support to test any inferior output 707 def test_qMemoryRegionInfo_reports_stack_address_as_rw(self): 708 self.build() 709 self.set_inferior_startup_launch() 710 711 # Start up the inferior. 712 procs = self.prep_debug_monitor_and_inferior( 713 inferior_args=["get-stack-address-hex:", "sleep:5"]) 714 715 # Run the process 716 self.test_sequence.add_log_lines( 717 [ 718 # Start running after initial stop. 719 "read packet: $c#63", 720 # Match output line that prints the memory address of the message buffer within the inferior. 721 # Note we require launch-only testing so we can get inferior otuput. 722 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"), 723 "capture": {1: "stack_address"}}, 724 # Now stop the inferior. 725 "read packet: {}".format(chr(3)), 726 # And wait for the stop notification. 727 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 728 True) 729 730 # Run the packet stream. 731 context = self.expect_gdbremote_sequence() 732 self.assertIsNotNone(context) 733 734 # Grab the address. 735 self.assertIsNotNone(context.get("stack_address")) 736 stack_address = int(context.get("stack_address"), 16) 737 738 # Grab memory region info from the inferior. 739 self.reset_test_sequence() 740 self.add_query_memory_region_packets(stack_address) 741 742 # Run the packet stream. 743 context = self.expect_gdbremote_sequence() 744 self.assertIsNotNone(context) 745 mem_region_dict = self.parse_memory_region_packet(context) 746 747 # Ensure there are no errors reported. 748 self.assertNotIn("error", mem_region_dict) 749 750 # Ensure address is readable and executable. 751 self.assertIn("permissions", mem_region_dict) 752 self.assertIn("r", mem_region_dict["permissions"]) 753 self.assertIn("w", mem_region_dict["permissions"]) 754 755 # Ensure the start address and size encompass the address we queried. 756 self.assert_address_within_memory_region( 757 stack_address, mem_region_dict) 758 759 @skipIfWindows # No pty support to test any inferior output 760 def test_qMemoryRegionInfo_reports_heap_address_as_rw(self): 761 self.build() 762 self.set_inferior_startup_launch() 763 764 # Start up the inferior. 765 procs = self.prep_debug_monitor_and_inferior( 766 inferior_args=["get-heap-address-hex:", "sleep:5"]) 767 768 # Run the process 769 self.test_sequence.add_log_lines( 770 [ 771 # Start running after initial stop. 772 "read packet: $c#63", 773 # Match output line that prints the memory address of the message buffer within the inferior. 774 # Note we require launch-only testing so we can get inferior otuput. 775 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"), 776 "capture": {1: "heap_address"}}, 777 # Now stop the inferior. 778 "read packet: {}".format(chr(3)), 779 # And wait for the stop notification. 780 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 781 True) 782 783 # Run the packet stream. 784 context = self.expect_gdbremote_sequence() 785 self.assertIsNotNone(context) 786 787 # Grab the address. 788 self.assertIsNotNone(context.get("heap_address")) 789 heap_address = int(context.get("heap_address"), 16) 790 791 # Grab memory region info from the inferior. 792 self.reset_test_sequence() 793 self.add_query_memory_region_packets(heap_address) 794 795 # Run the packet stream. 796 context = self.expect_gdbremote_sequence() 797 self.assertIsNotNone(context) 798 mem_region_dict = self.parse_memory_region_packet(context) 799 800 # Ensure there are no errors reported. 801 self.assertNotIn("error", mem_region_dict) 802 803 # Ensure address is readable and executable. 804 self.assertIn("permissions", mem_region_dict) 805 self.assertIn("r", mem_region_dict["permissions"]) 806 self.assertIn("w", mem_region_dict["permissions"]) 807 808 # Ensure the start address and size encompass the address we queried. 809 self.assert_address_within_memory_region(heap_address, mem_region_dict) 810 811 def breakpoint_set_and_remove_work(self, want_hardware): 812 # Start up the inferior. 813 procs = self.prep_debug_monitor_and_inferior( 814 inferior_args=[ 815 "get-code-address-hex:hello", 816 "sleep:1", 817 "call-function:hello"]) 818 819 # Run the process 820 self.add_register_info_collection_packets() 821 self.add_process_info_collection_packets() 822 self.test_sequence.add_log_lines( 823 [ # Start running after initial stop. 824 "read packet: $c#63", 825 # Match output line that prints the memory address of the function call entry point. 826 # Note we require launch-only testing so we can get inferior otuput. 827 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"), 828 "capture": {1: "function_address"}}, 829 # Now stop the inferior. 830 "read packet: {}".format(chr(3)), 831 # And wait for the stop notification. 832 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 833 True) 834 835 # Run the packet stream. 836 context = self.expect_gdbremote_sequence() 837 self.assertIsNotNone(context) 838 839 # Gather process info - we need endian of target to handle register 840 # value conversions. 841 process_info = self.parse_process_info_response(context) 842 endian = process_info.get("endian") 843 self.assertIsNotNone(endian) 844 845 # Gather register info entries. 846 reg_infos = self.parse_register_info_packets(context) 847 (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos) 848 self.assertIsNotNone(pc_lldb_reg_index) 849 self.assertIsNotNone(pc_reg_info) 850 851 # Grab the function address. 852 self.assertIsNotNone(context.get("function_address")) 853 function_address = int(context.get("function_address"), 16) 854 855 # Get current target architecture 856 target_arch = self.getArchitecture() 857 858 # Set the breakpoint. 859 if (target_arch == "arm") or (target_arch == "aarch64"): 860 # TODO: Handle case when setting breakpoint in thumb code 861 BREAKPOINT_KIND = 4 862 else: 863 BREAKPOINT_KIND = 1 864 865 # Set default packet type to Z0 (software breakpoint) 866 z_packet_type = 0 867 868 # If hardware breakpoint is requested set packet type to Z1 869 if want_hardware == True: 870 z_packet_type = 1 871 872 self.reset_test_sequence() 873 self.add_set_breakpoint_packets( 874 function_address, 875 z_packet_type, 876 do_continue=True, 877 breakpoint_kind=BREAKPOINT_KIND) 878 879 # Run the packet stream. 880 context = self.expect_gdbremote_sequence() 881 self.assertIsNotNone(context) 882 883 # Verify the stop signal reported was the breakpoint signal number. 884 stop_signo = context.get("stop_signo") 885 self.assertIsNotNone(stop_signo) 886 self.assertEqual(int(stop_signo, 16), 887 lldbutil.get_signal_number('SIGTRAP')) 888 889 # Ensure we did not receive any output. If the breakpoint was not set, we would 890 # see output (from a launched process with captured stdio) printing a hello, world message. 891 # That would indicate the breakpoint didn't take. 892 self.assertEqual(len(context["O_content"]), 0) 893 894 # Verify that the PC for the main thread is where we expect it - right at the breakpoint address. 895 # This acts as a another validation on the register reading code. 896 self.reset_test_sequence() 897 self.test_sequence.add_log_lines( 898 [ 899 # Print the PC. This should match the breakpoint address. 900 "read packet: $p{0:x}#00".format(pc_lldb_reg_index), 901 # Capture $p results. 902 {"direction": "send", 903 "regex": r"^\$([0-9a-fA-F]+)#", 904 "capture": {1: "p_response"}}, 905 ], True) 906 907 context = self.expect_gdbremote_sequence() 908 self.assertIsNotNone(context) 909 910 # Verify the PC is where we expect. Note response is in endianness of 911 # the inferior. 912 p_response = context.get("p_response") 913 self.assertIsNotNone(p_response) 914 915 # Convert from target endian to int. 916 returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned( 917 endian, p_response) 918 self.assertEqual(returned_pc, function_address) 919 920 # Verify that a breakpoint remove and continue gets us the expected 921 # output. 922 self.reset_test_sequence() 923 924 # Add breakpoint remove packets 925 self.add_remove_breakpoint_packets( 926 function_address, 927 z_packet_type, 928 breakpoint_kind=BREAKPOINT_KIND) 929 930 self.test_sequence.add_log_lines( 931 [ 932 # Continue running. 933 "read packet: $c#63", 934 # We should now receive the output from the call. 935 {"type": "output_match", "regex": r"^hello, world\r\n$"}, 936 # And wait for program completion. 937 {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"}, 938 ], True) 939 940 context = self.expect_gdbremote_sequence() 941 self.assertIsNotNone(context) 942 943 @skipIfWindows # No pty support to test any inferior output 944 def test_software_breakpoint_set_and_remove_work(self): 945 if self.getArchitecture() == "arm": 946 # TODO: Handle case when setting breakpoint in thumb code 947 self.build(dictionary={'CFLAGS_EXTRAS': '-marm'}) 948 else: 949 self.build() 950 self.set_inferior_startup_launch() 951 self.breakpoint_set_and_remove_work(want_hardware=False) 952 953 @skipUnlessPlatform(oslist=['linux']) 954 @skipIf(archs=no_match(['arm', 'aarch64'])) 955 def test_hardware_breakpoint_set_and_remove_work(self): 956 if self.getArchitecture() == "arm": 957 # TODO: Handle case when setting breakpoint in thumb code 958 self.build(dictionary={'CFLAGS_EXTRAS': '-marm'}) 959 else: 960 self.build() 961 self.set_inferior_startup_launch() 962 self.breakpoint_set_and_remove_work(want_hardware=True) 963 964 def get_qSupported_dict(self, features=[]): 965 self.build() 966 self.set_inferior_startup_launch() 967 968 # Start up the stub and start/prep the inferior. 969 procs = self.prep_debug_monitor_and_inferior() 970 self.add_qSupported_packets(features) 971 972 # Run the packet stream. 973 context = self.expect_gdbremote_sequence() 974 self.assertIsNotNone(context) 975 976 # Retrieve the qSupported features. 977 return self.parse_qSupported_response(context) 978 979 def test_qSupported_returns_known_stub_features(self): 980 supported_dict = self.get_qSupported_dict() 981 self.assertIsNotNone(supported_dict) 982 self.assertTrue(len(supported_dict) > 0) 983 984 def test_qSupported_auvx(self): 985 expected = ('+' if lldbplatformutil.getPlatform() 986 in ["freebsd", "linux", "netbsd"] else '-') 987 supported_dict = self.get_qSupported_dict() 988 self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected) 989 990 def test_qSupported_libraries_svr4(self): 991 expected = ('+' if lldbplatformutil.getPlatform() 992 in ["freebsd", "linux", "netbsd"] else '-') 993 supported_dict = self.get_qSupported_dict() 994 self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'), 995 expected) 996 997 def test_qSupported_siginfo_read(self): 998 expected = ('+' if lldbplatformutil.getPlatform() 999 in ["freebsd", "linux"] else '-') 1000 supported_dict = self.get_qSupported_dict() 1001 self.assertEqual(supported_dict.get('qXfer:siginfo:read', '-'), 1002 expected) 1003 1004 def test_qSupported_QPassSignals(self): 1005 expected = ('+' if lldbplatformutil.getPlatform() 1006 in ["freebsd", "linux", "netbsd"] else '-') 1007 supported_dict = self.get_qSupported_dict() 1008 self.assertEqual(supported_dict.get('QPassSignals', '-'), expected) 1009 1010 @add_test_categories(["fork"]) 1011 def test_qSupported_fork_events(self): 1012 supported_dict = ( 1013 self.get_qSupported_dict(['multiprocess+', 'fork-events+'])) 1014 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1015 self.assertEqual(supported_dict.get('fork-events', '-'), '+') 1016 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1017 1018 @add_test_categories(["fork"]) 1019 def test_qSupported_fork_events_without_multiprocess(self): 1020 supported_dict = ( 1021 self.get_qSupported_dict(['fork-events+'])) 1022 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1023 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1024 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1025 1026 @add_test_categories(["fork"]) 1027 def test_qSupported_vfork_events(self): 1028 supported_dict = ( 1029 self.get_qSupported_dict(['multiprocess+', 'vfork-events+'])) 1030 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1031 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1032 self.assertEqual(supported_dict.get('vfork-events', '-'), '+') 1033 1034 @add_test_categories(["fork"]) 1035 def test_qSupported_vfork_events_without_multiprocess(self): 1036 supported_dict = ( 1037 self.get_qSupported_dict(['vfork-events+'])) 1038 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1039 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1040 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1041 1042 # We need to be able to self.runCmd to get cpuinfo, 1043 # which is not possible when using a remote platform. 1044 @skipIfRemote 1045 def test_qSupported_memory_tagging(self): 1046 supported_dict = self.get_qSupported_dict() 1047 self.assertEqual(supported_dict.get("memory-tagging", '-'), 1048 '+' if self.isAArch64MTE() else '-') 1049 1050 @skipIfWindows # No pty support to test any inferior output 1051 def test_written_M_content_reads_back_correctly(self): 1052 self.build() 1053 self.set_inferior_startup_launch() 1054 1055 TEST_MESSAGE = "Hello, memory" 1056 1057 # Start up the stub and start/prep the inferior. 1058 procs = self.prep_debug_monitor_and_inferior( 1059 inferior_args=[ 1060 "set-message:xxxxxxxxxxxxxX", 1061 "get-data-address-hex:g_message", 1062 "sleep:1", 1063 "print-message:"]) 1064 self.test_sequence.add_log_lines( 1065 [ 1066 # Start running after initial stop. 1067 "read packet: $c#63", 1068 # Match output line that prints the memory address of the message buffer within the inferior. 1069 # Note we require launch-only testing so we can get inferior otuput. 1070 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 1071 "capture": {1: "message_address"}}, 1072 # Now stop the inferior. 1073 "read packet: {}".format(chr(3)), 1074 # And wait for the stop notification. 1075 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1076 True) 1077 context = self.expect_gdbremote_sequence() 1078 self.assertIsNotNone(context) 1079 1080 # Grab the message address. 1081 self.assertIsNotNone(context.get("message_address")) 1082 message_address = int(context.get("message_address"), 16) 1083 1084 # Hex-encode the test message, adding null termination. 1085 hex_encoded_message = seven.hexlify(TEST_MESSAGE) 1086 1087 # Write the message to the inferior. Verify that we can read it with the hex-encoded (m) 1088 # and binary (x) memory read packets. 1089 self.reset_test_sequence() 1090 self.test_sequence.add_log_lines( 1091 ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message), 1092 "send packet: $OK#00", 1093 "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1094 "send packet: ${0}#00".format(hex_encoded_message), 1095 "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1096 "send packet: ${0}#00".format(TEST_MESSAGE), 1097 "read packet: $m{0:x},4#00".format(message_address), 1098 "send packet: ${0}#00".format(hex_encoded_message[0:8]), 1099 "read packet: $x{0:x},4#00".format(message_address), 1100 "send packet: ${0}#00".format(TEST_MESSAGE[0:4]), 1101 "read packet: $c#63", 1102 {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}}, 1103 "send packet: $W00#00", 1104 ], True) 1105 context = self.expect_gdbremote_sequence() 1106 self.assertIsNotNone(context) 1107 1108 # Ensure what we read from inferior memory is what we wrote. 1109 printed_message = context.get("printed_message") 1110 self.assertIsNotNone(printed_message) 1111 self.assertEqual(printed_message, TEST_MESSAGE + "X") 1112 1113 # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp). 1114 # Come back to this. I have the test rigged to verify that at least some 1115 # of the bit-flip writes work. 1116 def test_P_writes_all_gpr_registers(self): 1117 self.build() 1118 self.set_inferior_startup_launch() 1119 1120 # Start inferior debug session, grab all register info. 1121 procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"]) 1122 self.add_register_info_collection_packets() 1123 self.add_process_info_collection_packets() 1124 1125 context = self.expect_gdbremote_sequence() 1126 self.assertIsNotNone(context) 1127 1128 # Process register infos. 1129 reg_infos = self.parse_register_info_packets(context) 1130 self.assertIsNotNone(reg_infos) 1131 self.add_lldb_register_index(reg_infos) 1132 1133 # Process endian. 1134 process_info = self.parse_process_info_response(context) 1135 endian = process_info.get("endian") 1136 self.assertIsNotNone(endian) 1137 1138 # Pull out the register infos that we think we can bit flip 1139 # successfully,. 1140 gpr_reg_infos = [ 1141 reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)] 1142 self.assertTrue(len(gpr_reg_infos) > 0) 1143 1144 # Write flipped bit pattern of existing value to each register. 1145 (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value( 1146 gpr_reg_infos, endian) 1147 self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes)) 1148 self.assertTrue(successful_writes > 0) 1149 1150 # Note: as of this moment, a hefty number of the GPR writes are failing 1151 # with E32 (everything except rax-rdx, rdi, rsi, rbp). 1152 @skipIfWindows 1153 def test_P_and_p_thread_suffix_work(self): 1154 self.build() 1155 self.set_inferior_startup_launch() 1156 1157 # Startup the inferior with three threads. 1158 procs = self.prep_debug_monitor_and_inferior( 1159 inferior_args=["thread:new", "thread:new"]) 1160 self.add_thread_suffix_request_packets() 1161 self.add_register_info_collection_packets() 1162 self.add_process_info_collection_packets() 1163 1164 context = self.expect_gdbremote_sequence() 1165 self.assertIsNotNone(context) 1166 1167 process_info = self.parse_process_info_response(context) 1168 self.assertIsNotNone(process_info) 1169 endian = process_info.get("endian") 1170 self.assertIsNotNone(endian) 1171 1172 reg_infos = self.parse_register_info_packets(context) 1173 self.assertIsNotNone(reg_infos) 1174 self.add_lldb_register_index(reg_infos) 1175 1176 reg_index = self.select_modifiable_register(reg_infos) 1177 self.assertIsNotNone(reg_index) 1178 reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8 1179 self.assertTrue(reg_byte_size > 0) 1180 1181 # Run the process a bit so threads can start up, and collect register 1182 # info. 1183 context = self.run_process_then_stop(run_seconds=1) 1184 self.assertIsNotNone(context) 1185 1186 # Wait for 3 threads to be present. 1187 threads = self.wait_for_thread_count(3) 1188 self.assertEqual(len(threads), 3) 1189 1190 expected_reg_values = [] 1191 register_increment = 1 1192 next_value = None 1193 1194 # Set the same register in each of 3 threads to a different value. 1195 # Verify each one has the unique value. 1196 for thread in threads: 1197 # If we don't have a next value yet, start it with the initial read 1198 # value + 1 1199 if not next_value: 1200 # Read pre-existing register value. 1201 self.reset_test_sequence() 1202 self.test_sequence.add_log_lines( 1203 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1204 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1205 ], True) 1206 context = self.expect_gdbremote_sequence() 1207 self.assertIsNotNone(context) 1208 1209 # Set the next value to use for writing as the increment plus 1210 # current value. 1211 p_response = context.get("p_response") 1212 self.assertIsNotNone(p_response) 1213 next_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1214 endian, p_response) 1215 1216 # Set new value using P and thread suffix. 1217 self.reset_test_sequence() 1218 self.test_sequence.add_log_lines( 1219 [ 1220 "read packet: $P{0:x}={1};thread:{2:x}#00".format( 1221 reg_index, 1222 lldbgdbserverutils.pack_register_hex( 1223 endian, 1224 next_value, 1225 byte_size=reg_byte_size), 1226 thread), 1227 "send packet: $OK#00", 1228 ], 1229 True) 1230 context = self.expect_gdbremote_sequence() 1231 self.assertIsNotNone(context) 1232 1233 # Save the value we set. 1234 expected_reg_values.append(next_value) 1235 1236 # Increment value for next thread to use (we want them all 1237 # different so we can verify they wrote to each thread correctly 1238 # next.) 1239 next_value += register_increment 1240 1241 # Revisit each thread and verify they have the expected value set for 1242 # the register we wrote. 1243 thread_index = 0 1244 for thread in threads: 1245 # Read pre-existing register value. 1246 self.reset_test_sequence() 1247 self.test_sequence.add_log_lines( 1248 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1249 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1250 ], True) 1251 context = self.expect_gdbremote_sequence() 1252 self.assertIsNotNone(context) 1253 1254 # Get the register value. 1255 p_response = context.get("p_response") 1256 self.assertIsNotNone(p_response) 1257 read_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1258 endian, p_response) 1259 1260 # Make sure we read back what we wrote. 1261 self.assertEqual(read_value, expected_reg_values[thread_index]) 1262 thread_index += 1 1263 1264 @skipIfWindows # No pty support to test any inferior output 1265 @add_test_categories(["llgs"]) 1266 def test_launch_via_A(self): 1267 self.build() 1268 exe_path = self.getBuildArtifact("a.out") 1269 args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"] 1270 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1271 1272 server = self.connect_to_debug_monitor() 1273 self.assertIsNotNone(server) 1274 self.do_handshake() 1275 # NB: strictly speaking we should use %x here but this packet 1276 # is deprecated, so no point in changing lldb-server's expectations 1277 self.test_sequence.add_log_lines( 1278 ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" % 1279 tuple(itertools.chain.from_iterable( 1280 [(len(x), x) for x in hex_args])), 1281 "send packet: $OK#00", 1282 "read packet: $c#00", 1283 "send packet: $W00#00"], 1284 True) 1285 context = self.expect_gdbremote_sequence() 1286 self.assertEqual(context["O_content"], 1287 b'arg1\r\narg2\r\narg3\r\n') 1288 1289 @skipIfWindows # No pty support to test any inferior output 1290 @add_test_categories(["llgs"]) 1291 def test_launch_via_vRun(self): 1292 self.build() 1293 exe_path = self.getBuildArtifact("a.out") 1294 args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"] 1295 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1296 1297 server = self.connect_to_debug_monitor() 1298 self.assertIsNotNone(server) 1299 self.do_handshake() 1300 self.test_sequence.add_log_lines( 1301 ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args), 1302 {"direction": "send", 1303 "regex": r"^\$T([0-9a-fA-F]+)"}, 1304 "read packet: $c#00", 1305 "send packet: $W00#00"], 1306 True) 1307 context = self.expect_gdbremote_sequence() 1308 self.assertEqual(context["O_content"], 1309 b'arg1\r\narg2\r\narg3\r\n') 1310 1311 @add_test_categories(["llgs"]) 1312 def test_launch_via_vRun_no_args(self): 1313 self.build() 1314 exe_path = self.getBuildArtifact("a.out") 1315 hex_path = binascii.b2a_hex(exe_path.encode()).decode() 1316 1317 server = self.connect_to_debug_monitor() 1318 self.assertIsNotNone(server) 1319 self.do_handshake() 1320 self.test_sequence.add_log_lines( 1321 ["read packet: $vRun;%s#00" % (hex_path,), 1322 {"direction": "send", 1323 "regex": r"^\$T([0-9a-fA-F]+)"}, 1324 "read packet: $c#00", 1325 "send packet: $W00#00"], 1326 True) 1327 self.expect_gdbremote_sequence() 1328 1329 @skipIfWindows # No pty support to test any inferior output 1330 @add_test_categories(["llgs"]) 1331 def test_QEnvironment(self): 1332 self.build() 1333 exe_path = self.getBuildArtifact("a.out") 1334 env = {"FOO": "test", "BAR": "a=z"} 1335 args = [exe_path, "print-env:FOO", "print-env:BAR"] 1336 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1337 1338 server = self.connect_to_debug_monitor() 1339 self.assertIsNotNone(server) 1340 self.do_handshake() 1341 1342 for key, value in env.items(): 1343 self.test_sequence.add_log_lines( 1344 ["read packet: $QEnvironment:%s=%s#00" % (key, value), 1345 "send packet: $OK#00"], 1346 True) 1347 self.test_sequence.add_log_lines( 1348 ["read packet: $vRun;%s#00" % (";".join(hex_args),), 1349 {"direction": "send", 1350 "regex": r"^\$T([0-9a-fA-F]+)"}, 1351 "read packet: $c#00", 1352 "send packet: $W00#00"], 1353 True) 1354 context = self.expect_gdbremote_sequence() 1355 self.assertEqual(context["O_content"], b"test\r\na=z\r\n") 1356 1357 @skipIfWindows # No pty support to test any inferior output 1358 @add_test_categories(["llgs"]) 1359 def test_QEnvironmentHexEncoded(self): 1360 self.build() 1361 exe_path = self.getBuildArtifact("a.out") 1362 env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"} 1363 args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"] 1364 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1365 1366 server = self.connect_to_debug_monitor() 1367 self.assertIsNotNone(server) 1368 self.do_handshake() 1369 1370 for key, value in env.items(): 1371 hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode() 1372 self.test_sequence.add_log_lines( 1373 ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,), 1374 "send packet: $OK#00"], 1375 True) 1376 self.test_sequence.add_log_lines( 1377 ["read packet: $vRun;%s#00" % (";".join(hex_args),), 1378 {"direction": "send", 1379 "regex": r"^\$T([0-9a-fA-F]+)"}, 1380 "read packet: $c#00", 1381 "send packet: $W00#00"], 1382 True) 1383 context = self.expect_gdbremote_sequence() 1384 self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n") 1385 1386 @skipUnlessPlatform(oslist=["freebsd", "linux"]) 1387 @add_test_categories(["llgs"]) 1388 def test_qXfer_siginfo_read(self): 1389 self.build() 1390 self.set_inferior_startup_launch() 1391 procs = self.prep_debug_monitor_and_inferior( 1392 inferior_args=["thread:segfault", "thread:new", "sleep:10"]) 1393 self.test_sequence.add_log_lines(["read packet: $c#63"], True) 1394 self.expect_gdbremote_sequence() 1395 1396 # Run until SIGSEGV comes in. 1397 self.reset_test_sequence() 1398 self.test_sequence.add_log_lines( 1399 [{"direction": "send", 1400 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1401 "capture": {1: "signo", 2: "thread_id"}, 1402 }], True) 1403 1404 # Figure out which thread crashed. 1405 context = self.expect_gdbremote_sequence() 1406 self.assertIsNotNone(context) 1407 self.assertEqual(int(context["signo"], 16), 1408 lldbutil.get_signal_number('SIGSEGV')) 1409 crashing_thread = int(context["thread_id"], 16) 1410 1411 # Grab siginfo for the crashing thread. 1412 self.reset_test_sequence() 1413 self.add_process_info_collection_packets() 1414 self.test_sequence.add_log_lines( 1415 ["read packet: $Hg{:x}#00".format(crashing_thread), 1416 "send packet: $OK#00", 1417 "read packet: $qXfer:siginfo:read::0,80:#00", 1418 {"direction": "send", 1419 "regex": re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1420 re.MULTILINE | re.DOTALL), 1421 "capture": {1: "response_type", 2: "content_raw"}, 1422 }], True) 1423 context = self.expect_gdbremote_sequence() 1424 self.assertIsNotNone(context) 1425 1426 # Ensure we end up with all data in one packet. 1427 self.assertEqual(context.get("response_type"), "l") 1428 1429 # Decode binary data. 1430 content_raw = context.get("content_raw") 1431 self.assertIsNotNone(content_raw) 1432 content = self.decode_gdbremote_binary(content_raw).encode("latin1") 1433 1434 # Decode siginfo_t. 1435 process_info = self.parse_process_info_response(context) 1436 pad = "" 1437 if process_info["ptrsize"] == "8": 1438 pad = "i" 1439 signo_idx = 0 1440 errno_idx = 1 1441 code_idx = 2 1442 addr_idx = -1 1443 SEGV_MAPERR = 1 1444 if process_info["ostype"] == "linux": 1445 # si_signo, si_errno, si_code, [pad], _sifields._sigfault.si_addr 1446 format_str = "iii{}P".format(pad) 1447 elif process_info["ostype"].startswith("freebsd"): 1448 # si_signo, si_errno, si_code, si_pid, si_uid, si_status, si_addr 1449 format_str = "iiiiiiP" 1450 elif process_info["ostype"].startswith("netbsd"): 1451 # _signo, _code, _errno, [pad], _reason._fault._addr 1452 format_str = "iii{}P".format(pad) 1453 errno_idx = 2 1454 code_idx = 1 1455 else: 1456 assert False, "unknown ostype" 1457 1458 decoder = struct.Struct(format_str) 1459 decoded = decoder.unpack(content[:decoder.size]) 1460 self.assertEqual(decoded[signo_idx], 1461 lldbutil.get_signal_number('SIGSEGV')) 1462 self.assertEqual(decoded[errno_idx], 0) # si_errno 1463 self.assertEqual(decoded[code_idx], SEGV_MAPERR) # si_code 1464 self.assertEqual(decoded[addr_idx], 0) # si_addr 1465