"""
Test case for testing the gdbremote protocol.

Tests run against debugserver and lldb-server (llgs).
lldb-server tests run where the lldb-server exe is
available.

This class will be broken into smaller test case classes by
gdb remote packet functional areas.  For now it contains
the initial set of tests implemented.
"""

import binascii
import itertools

import unittest2
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.support import seven
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test.lldbdwarf import *
from lldbsuite.test import lldbutil, lldbplatformutil


class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
    """Packet-level tests that drive a gdb-remote stub through scripted exchanges.

    Each test queues expected "read packet"/"send packet" lines (or regex
    matchers with captures) on self.test_sequence and then replays them with
    self.expect_gdbremote_sequence(), asserting on the captured values.
    """

    mydir = TestBase.compute_mydir(__file__)

    def test_thread_suffix_supported(self):
        # The stub must acknowledge $QThreadSuffixSupported with $OK.
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        self.do_handshake()
        self.test_sequence.add_log_lines(
            ["lldb-server < 26> read packet: $QThreadSuffixSupported#e4",
             "lldb-server < 6> send packet: $OK#9a"],
            True)

        self.expect_gdbremote_sequence()

    def test_list_threads_in_stop_reply_supported(self):
        # The stub must acknowledge $QListThreadsInStopReply with $OK.
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        self.do_handshake()
        self.test_sequence.add_log_lines(
            ["lldb-server < 27> read packet: $QListThreadsInStopReply#21",
             "lldb-server < 6> send packet: $OK#9a"],
            True)
        self.expect_gdbremote_sequence()

    def test_c_packet_works(self):
        # Continuing with $c must run the inferior to a clean exit ($W00).
        self.build()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $c#63",
             "send packet: $W00#00"],
            True)

        self.expect_gdbremote_sequence()

    @skipIfWindows  # No pty support to test any inferior output
    def test_inferior_print_exit(self):
        # The inferior's output must be seen before its exit notification.
        self.build()
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["hello, world"])
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8",
             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
             "send packet: $W00#00"],
            True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    def test_first_launch_stop_reply_thread_matches_first_qC(self):
        # The thread id in the first $? stop reply must equal the $qC reply.
        self.build()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(["read packet: $qC#00",
                                          {"direction": "send",
                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
                                           "capture": {1: "thread_id_QC"}},
                                          "read packet: $?#00",
                                          {"direction": "send",
                                           "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
                                           "capture": {1: "thread_id_?"}}],
                                         True)
        context = self.expect_gdbremote_sequence()
        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))

    def test_attach_commandline_continue_app_exits(self):
        # After attaching and continuing, the inferior must exit and be gone.
        self.build()
        self.set_inferior_startup_attach()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8",
             "send packet: $W00#00"],
            True)
        self.expect_gdbremote_sequence()

        # Wait a moment for completed and now-detached inferior process to
        # clear.
        time.sleep(1)

        if not lldb.remote_platform:
            # Process should be dead now. Reap results.
            poll_result = procs["inferior"].poll()
            self.assertIsNotNone(poll_result)

            # Where possible, verify at the system level that the process is not
            # running.
            self.assertFalse(
                lldbgdbserverutils.process_is_running(
                    procs["inferior"].pid, False))

    def test_qRegisterInfo_returns_one_valid_result(self):
        # $qRegisterInfo0 must return a response that parses and validates.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $qRegisterInfo0#00",
             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
            True)

        # Run the stream
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_info_packet = context.get("reginfo_0")
        self.assertIsNotNone(reg_info_packet)
        self.assert_valid_reg_info(
            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))

    def test_qRegisterInfo_returns_all_valid_results(self):
        # Every collected register info entry must validate.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Validate that each register info returned validates.
        for reg_info in self.parse_register_info_packets(context):
            self.assert_valid_reg_info(reg_info)

    def test_qRegisterInfo_contains_required_generics_debugserver(self):
        # The register set must expose the pc, fp, sp and flags generics.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all generic registers found.
        generic_regs = {
            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}

        # Ensure we have a program counter register.
        self.assertIn('pc', generic_regs)

        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
        if self.getArchitecture() != 'powerpc64le':
            self.assertIn('fp', generic_regs)

        # Ensure we have a stack pointer register.
        self.assertIn('sp', generic_regs)

        # Ensure we have a flags register.
        self.assertIn('flags', generic_regs)

    def test_qRegisterInfo_contains_at_least_one_register_set(self):
        # At least one register info entry must name a register set.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all register sets found.
        register_sets = {
            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
        self.assertTrue(len(register_sets) >= 1)

    def targetHasAVX(self):
        """Return whether the target's /proc/cpuinfo lists the " avx " flag.

        Returns True unconditionally when the selected platform's triple does
        not match ".*-.*-linux". For a remote platform the cpuinfo file is
        fetched into the working directory and removed by a tear-down hook.
        """
        triple = self.dbg.GetSelectedPlatform().GetTriple()

        # TODO other platforms, please implement this function
        if not re.match(".*-.*-linux", triple):
            return True

        # Need to do something different for non-Linux/Android targets
        if lldb.remote_platform:
            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
            cpuinfo_path = "cpuinfo"
            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
        else:
            cpuinfo_path = "/proc/cpuinfo"

        # Context manager closes the handle even if the read raises.
        with open(cpuinfo_path, 'r') as f:
            cpuinfo = f.read()
        return " avx " in cpuinfo

    @expectedFailureAll(oslist=["windows"])  # no avx for now.
    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
    @add_test_categories(["llgs"])
    def test_qRegisterInfo_contains_avx_registers(self):
        # The AVX register set must be reported exactly when the target has AVX.
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all register sets found.
        register_sets = {
            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
        self.assertEqual(
            self.targetHasAVX(),
            "Advanced Vector Extensions" in register_sets)

    def qThreadInfo_contains_thread(self):
        """Assert that the collected thread info lists exactly one thread.

        Shared by the launch and attach variants, which build the inferior and
        select the startup mode before calling this.
        """
        procs = self.prep_debug_monitor_and_inferior()
        self.add_threadinfo_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather threadinfo entries.
        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        # We should have exactly one thread.
        self.assertEqual(len(threads), 1)

    def test_qThreadInfo_contains_thread_launch(self):
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_contains_thread()

    @expectedFailureAll(oslist=["windows"])  # expect one more thread stopped
    def test_qThreadInfo_contains_thread_attach(self):
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_contains_thread()

    def qThreadInfo_matches_qC(self):
        """Assert that the single thread from thread info equals the $qC thread id.

        Shared by the launch and attach variants, which build the inferior and
        select the startup mode before calling this.
        """
        procs = self.prep_debug_monitor_and_inferior()

        self.add_threadinfo_collection_packets()
        self.test_sequence.add_log_lines(
            ["read packet: $qC#00",
             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
             ], True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather threadinfo entries.
        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        # We should have exactly one thread from threadinfo.
        self.assertEqual(len(threads), 1)

        # We should have a valid thread_id from $QC.
        QC_thread_id_hex = context.get("thread_id")
        self.assertIsNotNone(QC_thread_id_hex)
        QC_thread_id = int(QC_thread_id_hex, 16)

        # Those two should be the same.
        self.assertEqual(threads[0], QC_thread_id)

    def test_qThreadInfo_matches_qC_launch(self):
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_matches_qC()

    @expectedFailureAll(oslist=["windows"])  # expect one more thread stopped
    def test_qThreadInfo_matches_qC_attach(self):
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_matches_qC()

    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
        # Each $p reply must be as long as the register's reported bitsize
        # implies (two hex characters per byte).
        self.build()
        self.set_inferior_startup_launch()
        procs = self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        self.assertTrue(len(reg_infos) > 0)

        byte_order = self.get_target_byte_order()

        # Read value for each register. The register number sent in $p is the
        # entry's position in reg_infos, so derive it with enumerate(): a
        # hand-maintained counter falls out of step with reg_info whenever an
        # entry is skipped by one of the `continue` statements below.
        # NOTE(review): assumes reg_infos is ordered by qRegisterInfo index --
        # confirm against parse_register_info_packets.
        for reg_index, reg_info in enumerate(reg_infos):
            # Skip registers that don't have a register set.  For x86, these are
            # the DRx registers, which have no LLDB-kind register number and thus
            # cannot be read via normal
            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
            if "set" not in reg_info:
                continue

            # Clear existing packet expectations.
            self.reset_test_sequence()

            # Run the register query
            self.test_sequence.add_log_lines(
                ["read packet: $p{0:x}#00".format(reg_index),
                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
                True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)

            # Skip erroneous (unsupported) registers.
            # TODO: remove this once we make unsupported registers disappear.
            if p_response.startswith("E") and len(p_response) == 3:
                continue

            if "dynamic_size_dwarf_expr_bytes" in reg_info:
                self.updateRegInfoBitsize(reg_info, byte_order)
            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
                             reg_info)

    def Hg_switches_to_3_threads(self, pass_pid=False):
        """Switch to each of three threads with $Hg and check $qC follows.

        pass_pid: when true, prefix the thread id in the $Hg packet with
            "p<inferior pid in hex>.".
        """
        # Startup the inferior with three threads (main + 2 new ones).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["thread:new", "thread:new"])

        # Let the inferior process have a few moments to start up the thread
        # when launched.  (The launch scenario has no time to run, so threads
        # won't be there yet.)
        self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for 3 threads to be present.
        threads = self.wait_for_thread_count(3)
        self.assertEqual(len(threads), 3)

        pid_str = ""
        if pass_pid:
            pid_str = "p{0:x}.".format(procs["inferior"].pid)

        # verify we can $H to each thread, and $qC matches the thread we set.
        for thread in threads:
            # Change to each thread, verify current thread id.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
                 "send packet: $OK#00",
                 "read packet: $qC#00",
                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the thread id.
            self.assertIsNotNone(context.get("thread_id"))
            self.assertEqual(int(context.get("thread_id"), 16), thread)

    @expectedFailureAll(oslist=["windows"])  # expect 4 threads
    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
    def test_Hg_switches_to_3_threads_launch(self):
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_switches_to_3_threads()

    @expectedFailureAll(oslist=["windows"])  # expecting one more thread
    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
    def test_Hg_switches_to_3_threads_attach(self):
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads()

    @expectedFailureAll(oslist=["windows"])  # expect 4 threads
    @add_test_categories(["llgs"])
    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
    def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self):
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads(pass_pid=True)

    def Hg_fails_on_pid(self, pass_pid):
        """Send $Hg with the given pid prefix and expect the $Eff error reply.

        pass_pid: pid to place in the "p<pid>." prefix; -1 is sent literally
            as "p-1.", any other value is formatted as hex.
        """
        # Start the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["thread:new"])

        self.run_process_then_stop(run_seconds=1)

        threads = self.wait_for_thread_count(2)
        self.assertEqual(len(threads), 2)

        if pass_pid == -1:
            pid_str = "p-1."
        else:
            pid_str = "p{0:x}.".format(pass_pid)
        thread = threads[1]

        self.test_sequence.add_log_lines(
            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
             "send packet: $Eff#00"],
            True)

        self.expect_gdbremote_sequence()

    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_another_pid(self):
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(1)

    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_zero_pid(self):
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(0)

    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_minus_one_pid(self):
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(-1)

    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
        """Check that $Hc followed by $C delivers SIGUSR1 to the selected thread.

        segfault_signo: signal number the stop reply is expected to carry when
            an inferior thread segfaults.
        """
        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
        # and the test requires getting stdout from the exe.

        NUM_THREADS = 3

        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
        # inferior_args=["thread:print-ids"]
        inferior_args = ["thread:segfault"]
        for i in range(NUM_THREADS - 1):
            # if i > 0:
            # Give time between thread creation/segfaulting for the handler to work.
            # inferior_args.append("sleep:1")
            inferior_args.append("thread:new")
        inferior_args.append("sleep:10")

        # Launch/attach.  (In our case, this should only ever be launched since
        # we need inferior stdout/stderr).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=inferior_args)
        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
        context = self.expect_gdbremote_sequence()

        # Let the inferior process have a few moments to start up the thread when launched.
        # context = self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for all threads to be present.
        # threads = self.wait_for_thread_count(NUM_THREADS)
        # self.assertEquals(len(threads), NUM_THREADS)

        signaled_tids = {}
        print_thread_ids = {}

        # Switch to each thread, deliver a signal, and verify signal delivery
        for i in range(NUM_THREADS - 1):
            # Run until SIGSEGV comes in.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([{"direction": "send",
                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                                               "capture": {1: "signo",
                                                           2: "thread_id"}}],
                                             True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            signo = context.get("signo")
            self.assertEqual(int(signo, 16), segfault_signo)

            # Ensure we haven't seen this tid yet.
            thread_id = int(context.get("thread_id"), 16)
            self.assertNotIn(thread_id, signaled_tids)
            signaled_tids[thread_id] = 1

            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    # Set the continue thread.
                    # Set current thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",

                    # Continue sending the signal number to the continue thread.
                    # The commented out packet is a way to do this same operation without using
                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),

                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently it's an XFAIL.
                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                    #  "read packet: $c#63",
                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
                ],
                True)

            # Run the sequence.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Ensure the stop signal is the signal we delivered.
            # stop_signo = context.get("stop_signo")
            # self.assertIsNotNone(stop_signo)
            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))

            # Ensure the stop thread is the thread to which we delivered the signal.
            # stop_thread_id = context.get("stop_thread_id")
            # self.assertIsNotNone(stop_thread_id)
            # self.assertEquals(int(stop_thread_id,16), thread_id)

            # Ensure we haven't seen this thread id yet.  The inferior's
            # self-obtained thread ids are not guaranteed to match the stub
            # tids (at least on MacOSX).
            print_thread_id = context.get("print_thread_id")
            self.assertIsNotNone(print_thread_id)
            print_thread_id = int(print_thread_id, 16)
            self.assertNotIn(print_thread_id, print_thread_ids)

            # Now remember this print (i.e. inferior-reflected) thread id and
            # ensure we don't hit it again.
            print_thread_ids[print_thread_id] = 1

            # Ensure post signal-handle thread id matches the thread that
            # initially raised the SIGSEGV.
            post_handle_thread_id = context.get("post_handle_thread_id")
            self.assertIsNotNone(post_handle_thread_id)
            post_handle_thread_id = int(post_handle_thread_id, 16)
            self.assertEqual(post_handle_thread_id, print_thread_id)

    @expectedFailureDarwin
    @skipIfWindows  # no SIGSEGV support
    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
    @expectedFailureNetBSD
    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
        self.build()
        self.set_inferior_startup_launch()

        if self.platformIsDarwin():
            # Darwin debugserver translates some signals like SIGSEGV into some gdb
            # expectations about fixed signal numbers.
            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
        else:
            self.Hc_then_Csignal_signals_correct_thread(
                lldbutil.get_signal_number('SIGSEGV'))

    @skipIfWindows  # No pty support to test any inferior output
    def test_m_packet_reads_memory(self):
        # Memory read back with $m must equal the message the inferior stored.
        self.build()
        self.set_inferior_startup_launch()
        # This is the memory we will write into the inferior and then ensure we
        # can read back with $m.
        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "set-message:%s" %
                MEMORY_CONTENTS,
                "get-data-address-hex:g_message",
                "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the message buffer within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "message_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the message address.
        self.assertIsNotNone(context.get("message_address"))
        message_address = int(context.get("message_address"), 16)

        # Grab contents from the inferior.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Ensure what we read from inferior memory is what we wrote.
        self.assertIsNotNone(context.get("read_contents"))
        read_contents = seven.unhexlify(context.get("read_contents"))
        self.assertEqual(read_contents, MEMORY_CONTENTS)

    def test_qMemoryRegionInfo_is_supported(self):
        # A bare $qMemoryRegionInfo must be answered with $OK.
        self.build()
        self.set_inferior_startup_launch()
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior()

        # Ask if it supports $qMemoryRegionInfo.
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo#00",
             "send packet: $OK#00"
             ], True)
        self.expect_gdbremote_sequence()

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
        # The region containing a code address must be readable and executable.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-code-address-hex:hello", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the code address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "code_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the code address.
        self.assertIsNotNone(context.get("code_address"))
        code_address = int(context.get("code_address"), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(code_address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure code address is readable and executable.
        self.assertIn("permissions", mem_region_dict)
        self.assertIn("r", mem_region_dict["permissions"])
        self.assertIn("x", mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(code_address, mem_region_dict)

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
        # The region containing a stack address must be readable and writable.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-stack-address-hex:", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the stack address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "stack_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the address.
        self.assertIsNotNone(context.get("stack_address"))
        stack_address = int(context.get("stack_address"), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(stack_address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure address is readable and writable.
        self.assertIn("permissions", mem_region_dict)
        self.assertIn("r", mem_region_dict["permissions"])
        self.assertIn("w", mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(
            stack_address, mem_region_dict)

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
        # The region containing a heap address must be readable and writable.
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-heap-address-hex:", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the heap address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "heap_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the address.
        self.assertIsNotNone(context.get("heap_address"))
        heap_address = int(context.get("heap_address"), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(heap_address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure address is readable and writable.
        self.assertIn("permissions", mem_region_dict)
        self.assertIn("r", mem_region_dict["permissions"])
        self.assertIn("w", mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(heap_address, mem_region_dict)

    def breakpoint_set_and_remove_work(self, want_hardware):
        """Set a breakpoint on the inferior's hello function, hit it, then remove it.

        Checks that the stop is reported as SIGTRAP with no inferior output,
        that the PC equals the breakpoint address, and that after removal the
        inferior prints its message and exits.

        want_hardware: truthy to use the Z1 (hardware) packet type, otherwise
            Z0 (software).
        """
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "get-code-address-hex:hello",
                "sleep:1",
                "call-function:hello"])

        # Run the process
        self.add_register_info_collection_packets()
        self.add_process_info_collection_packets()
        self.test_sequence.add_log_lines(
            [  # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the function call entry point.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "function_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather process info - we need endian of target to handle register
        # value conversions.
        process_info = self.parse_process_info_response(context)
        endian = process_info.get("endian")
        self.assertIsNotNone(endian)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)
        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
        self.assertIsNotNone(pc_lldb_reg_index)
        self.assertIsNotNone(pc_reg_info)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Get current target architecture
        target_arch = self.getArchitecture()

        # Set the breakpoint.
        if (target_arch == "arm") or (target_arch == "aarch64"):
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1

        # Packet type Z1 is a hardware breakpoint, Z0 (the default) a
        # software breakpoint.
        z_packet_type = 1 if want_hardware else 0

        self.reset_test_sequence()
        self.add_set_breakpoint_packets(
            function_address,
            z_packet_type,
            do_continue=True,
            breakpoint_kind=BREAKPOINT_KIND)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the stop signal reported was the breakpoint signal number.
        stop_signo = context.get("stop_signo")
        self.assertIsNotNone(stop_signo)
        self.assertEqual(int(stop_signo, 16),
                         lldbutil.get_signal_number('SIGTRAP'))

        # Ensure we did not receive any output.  If the breakpoint was not set, we would
        # see output (from a launched process with captured stdio) printing a hello, world message.
        # That would indicate the breakpoint didn't take.
        self.assertEqual(len(context["O_content"]), 0)

        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
        # This acts as another validation on the register reading code.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                # Print the PC.  This should match the breakpoint address.
                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
                # Capture $p results.
                {"direction": "send",
                 "regex": r"^\$([0-9a-fA-F]+)#",
                 "capture": {1: "p_response"}},
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the PC is where we expect.  Note response is in endianness of
        # the inferior.
        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)

        # Convert from target endian to int.
        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
            endian, p_response)
        self.assertEqual(returned_pc, function_address)

        # Verify that a breakpoint remove and continue gets us the expected
        # output.
        self.reset_test_sequence()

        # Add breakpoint remove packets
        self.add_remove_breakpoint_packets(
            function_address,
            z_packet_type,
            breakpoint_kind=BREAKPOINT_KIND)

        self.test_sequence.add_log_lines(
            [
                # Continue running.
                "read packet: $c#63",
                # We should now receive the output from the call.
                {"type": "output_match", "regex": r"^hello, world\r\n$"},
                # And wait for program completion.
                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    @skipIfWindows  # No pty support to test any inferior output
    def test_software_breakpoint_set_and_remove_work(self):
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
        else:
            self.build()
        self.set_inferior_startup_launch()
        self.breakpoint_set_and_remove_work(want_hardware=False)

    @skipUnlessPlatform(oslist=['linux'])
    @skipIf(archs=no_match(['arm', 'aarch64']))
    def test_hardware_breakpoint_set_and_remove_work(self):
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
        else:
            self.build()
        self.set_inferior_startup_launch()
        self.breakpoint_set_and_remove_work(want_hardware=True)

    def get_qSupported_dict(self, features=[]):
        self.build()
        self.set_inferior_startup_launch()

        # Start up the stub and start/prep the inferior.
        procs = self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(features)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Retrieve the qSupported features.
976 return self.parse_qSupported_response(context) 977 978 def test_qSupported_returns_known_stub_features(self): 979 supported_dict = self.get_qSupported_dict() 980 self.assertIsNotNone(supported_dict) 981 self.assertTrue(len(supported_dict) > 0) 982 983 def test_qSupported_auvx(self): 984 expected = ('+' if lldbplatformutil.getPlatform() 985 in ["freebsd", "linux", "netbsd"] else '-') 986 supported_dict = self.get_qSupported_dict() 987 self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected) 988 989 def test_qSupported_libraries_svr4(self): 990 expected = ('+' if lldbplatformutil.getPlatform() 991 in ["freebsd", "linux", "netbsd"] else '-') 992 supported_dict = self.get_qSupported_dict() 993 self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'), 994 expected) 995 996 def test_qSupported_QPassSignals(self): 997 expected = ('+' if lldbplatformutil.getPlatform() 998 in ["freebsd", "linux", "netbsd"] else '-') 999 supported_dict = self.get_qSupported_dict() 1000 self.assertEqual(supported_dict.get('QPassSignals', '-'), expected) 1001 1002 @add_test_categories(["fork"]) 1003 def test_qSupported_fork_events(self): 1004 supported_dict = ( 1005 self.get_qSupported_dict(['multiprocess+', 'fork-events+'])) 1006 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1007 self.assertEqual(supported_dict.get('fork-events', '-'), '+') 1008 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1009 1010 @add_test_categories(["fork"]) 1011 def test_qSupported_fork_events_without_multiprocess(self): 1012 supported_dict = ( 1013 self.get_qSupported_dict(['fork-events+'])) 1014 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1015 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1016 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1017 1018 @add_test_categories(["fork"]) 1019 def test_qSupported_vfork_events(self): 1020 supported_dict = ( 1021 self.get_qSupported_dict(['multiprocess+', 
'vfork-events+'])) 1022 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1023 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1024 self.assertEqual(supported_dict.get('vfork-events', '-'), '+') 1025 1026 @add_test_categories(["fork"]) 1027 def test_qSupported_vfork_events_without_multiprocess(self): 1028 supported_dict = ( 1029 self.get_qSupported_dict(['vfork-events+'])) 1030 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1031 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1032 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1033 1034 # We need to be able to self.runCmd to get cpuinfo, 1035 # which is not possible when using a remote platform. 1036 @skipIfRemote 1037 def test_qSupported_memory_tagging(self): 1038 supported_dict = self.get_qSupported_dict() 1039 self.assertEqual(supported_dict.get("memory-tagging", '-'), 1040 '+' if self.isAArch64MTE() else '-') 1041 1042 @skipIfWindows # No pty support to test any inferior output 1043 def test_written_M_content_reads_back_correctly(self): 1044 self.build() 1045 self.set_inferior_startup_launch() 1046 1047 TEST_MESSAGE = "Hello, memory" 1048 1049 # Start up the stub and start/prep the inferior. 1050 procs = self.prep_debug_monitor_and_inferior( 1051 inferior_args=[ 1052 "set-message:xxxxxxxxxxxxxX", 1053 "get-data-address-hex:g_message", 1054 "sleep:1", 1055 "print-message:"]) 1056 self.test_sequence.add_log_lines( 1057 [ 1058 # Start running after initial stop. 1059 "read packet: $c#63", 1060 # Match output line that prints the memory address of the message buffer within the inferior. 1061 # Note we require launch-only testing so we can get inferior otuput. 1062 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 1063 "capture": {1: "message_address"}}, 1064 # Now stop the inferior. 1065 "read packet: {}".format(chr(3)), 1066 # And wait for the stop notification. 
1067 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1068 True) 1069 context = self.expect_gdbremote_sequence() 1070 self.assertIsNotNone(context) 1071 1072 # Grab the message address. 1073 self.assertIsNotNone(context.get("message_address")) 1074 message_address = int(context.get("message_address"), 16) 1075 1076 # Hex-encode the test message, adding null termination. 1077 hex_encoded_message = seven.hexlify(TEST_MESSAGE) 1078 1079 # Write the message to the inferior. Verify that we can read it with the hex-encoded (m) 1080 # and binary (x) memory read packets. 1081 self.reset_test_sequence() 1082 self.test_sequence.add_log_lines( 1083 ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message), 1084 "send packet: $OK#00", 1085 "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1086 "send packet: ${0}#00".format(hex_encoded_message), 1087 "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1088 "send packet: ${0}#00".format(TEST_MESSAGE), 1089 "read packet: $m{0:x},4#00".format(message_address), 1090 "send packet: ${0}#00".format(hex_encoded_message[0:8]), 1091 "read packet: $x{0:x},4#00".format(message_address), 1092 "send packet: ${0}#00".format(TEST_MESSAGE[0:4]), 1093 "read packet: $c#63", 1094 {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}}, 1095 "send packet: $W00#00", 1096 ], True) 1097 context = self.expect_gdbremote_sequence() 1098 self.assertIsNotNone(context) 1099 1100 # Ensure what we read from inferior memory is what we wrote. 1101 printed_message = context.get("printed_message") 1102 self.assertIsNotNone(printed_message) 1103 self.assertEqual(printed_message, TEST_MESSAGE + "X") 1104 1105 # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp). 1106 # Come back to this. 
I have the test rigged to verify that at least some 1107 # of the bit-flip writes work. 1108 def test_P_writes_all_gpr_registers(self): 1109 self.build() 1110 self.set_inferior_startup_launch() 1111 1112 # Start inferior debug session, grab all register info. 1113 procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"]) 1114 self.add_register_info_collection_packets() 1115 self.add_process_info_collection_packets() 1116 1117 context = self.expect_gdbremote_sequence() 1118 self.assertIsNotNone(context) 1119 1120 # Process register infos. 1121 reg_infos = self.parse_register_info_packets(context) 1122 self.assertIsNotNone(reg_infos) 1123 self.add_lldb_register_index(reg_infos) 1124 1125 # Process endian. 1126 process_info = self.parse_process_info_response(context) 1127 endian = process_info.get("endian") 1128 self.assertIsNotNone(endian) 1129 1130 # Pull out the register infos that we think we can bit flip 1131 # successfully,. 1132 gpr_reg_infos = [ 1133 reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)] 1134 self.assertTrue(len(gpr_reg_infos) > 0) 1135 1136 # Write flipped bit pattern of existing value to each register. 1137 (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value( 1138 gpr_reg_infos, endian) 1139 self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes)) 1140 self.assertTrue(successful_writes > 0) 1141 1142 # Note: as of this moment, a hefty number of the GPR writes are failing 1143 # with E32 (everything except rax-rdx, rdi, rsi, rbp). 1144 @skipIfWindows 1145 def test_P_and_p_thread_suffix_work(self): 1146 self.build() 1147 self.set_inferior_startup_launch() 1148 1149 # Startup the inferior with three threads. 
1150 procs = self.prep_debug_monitor_and_inferior( 1151 inferior_args=["thread:new", "thread:new"]) 1152 self.add_thread_suffix_request_packets() 1153 self.add_register_info_collection_packets() 1154 self.add_process_info_collection_packets() 1155 1156 context = self.expect_gdbremote_sequence() 1157 self.assertIsNotNone(context) 1158 1159 process_info = self.parse_process_info_response(context) 1160 self.assertIsNotNone(process_info) 1161 endian = process_info.get("endian") 1162 self.assertIsNotNone(endian) 1163 1164 reg_infos = self.parse_register_info_packets(context) 1165 self.assertIsNotNone(reg_infos) 1166 self.add_lldb_register_index(reg_infos) 1167 1168 reg_index = self.select_modifiable_register(reg_infos) 1169 self.assertIsNotNone(reg_index) 1170 reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8 1171 self.assertTrue(reg_byte_size > 0) 1172 1173 # Run the process a bit so threads can start up, and collect register 1174 # info. 1175 context = self.run_process_then_stop(run_seconds=1) 1176 self.assertIsNotNone(context) 1177 1178 # Wait for 3 threads to be present. 1179 threads = self.wait_for_thread_count(3) 1180 self.assertEqual(len(threads), 3) 1181 1182 expected_reg_values = [] 1183 register_increment = 1 1184 next_value = None 1185 1186 # Set the same register in each of 3 threads to a different value. 1187 # Verify each one has the unique value. 1188 for thread in threads: 1189 # If we don't have a next value yet, start it with the initial read 1190 # value + 1 1191 if not next_value: 1192 # Read pre-existing register value. 1193 self.reset_test_sequence() 1194 self.test_sequence.add_log_lines( 1195 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1196 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1197 ], True) 1198 context = self.expect_gdbremote_sequence() 1199 self.assertIsNotNone(context) 1200 1201 # Set the next value to use for writing as the increment plus 1202 # current value. 
1203 p_response = context.get("p_response") 1204 self.assertIsNotNone(p_response) 1205 next_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1206 endian, p_response) 1207 1208 # Set new value using P and thread suffix. 1209 self.reset_test_sequence() 1210 self.test_sequence.add_log_lines( 1211 [ 1212 "read packet: $P{0:x}={1};thread:{2:x}#00".format( 1213 reg_index, 1214 lldbgdbserverutils.pack_register_hex( 1215 endian, 1216 next_value, 1217 byte_size=reg_byte_size), 1218 thread), 1219 "send packet: $OK#00", 1220 ], 1221 True) 1222 context = self.expect_gdbremote_sequence() 1223 self.assertIsNotNone(context) 1224 1225 # Save the value we set. 1226 expected_reg_values.append(next_value) 1227 1228 # Increment value for next thread to use (we want them all 1229 # different so we can verify they wrote to each thread correctly 1230 # next.) 1231 next_value += register_increment 1232 1233 # Revisit each thread and verify they have the expected value set for 1234 # the register we wrote. 1235 thread_index = 0 1236 for thread in threads: 1237 # Read pre-existing register value. 1238 self.reset_test_sequence() 1239 self.test_sequence.add_log_lines( 1240 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1241 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1242 ], True) 1243 context = self.expect_gdbremote_sequence() 1244 self.assertIsNotNone(context) 1245 1246 # Get the register value. 1247 p_response = context.get("p_response") 1248 self.assertIsNotNone(p_response) 1249 read_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1250 endian, p_response) 1251 1252 # Make sure we read back what we wrote. 
1253 self.assertEqual(read_value, expected_reg_values[thread_index]) 1254 thread_index += 1 1255 1256 @skipIfWindows # No pty support to test any inferior output 1257 @add_test_categories(["llgs"]) 1258 def test_launch_via_A(self): 1259 self.build() 1260 exe_path = self.getBuildArtifact("a.out") 1261 args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"] 1262 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1263 1264 server = self.connect_to_debug_monitor() 1265 self.assertIsNotNone(server) 1266 self.do_handshake() 1267 # NB: strictly speaking we should use %x here but this packet 1268 # is deprecated, so no point in changing lldb-server's expectations 1269 self.test_sequence.add_log_lines( 1270 ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" % 1271 tuple(itertools.chain.from_iterable( 1272 [(len(x), x) for x in hex_args])), 1273 "send packet: $OK#00", 1274 "read packet: $c#00", 1275 "send packet: $W00#00"], 1276 True) 1277 context = self.expect_gdbremote_sequence() 1278 self.assertEqual(context["O_content"], 1279 b'arg1\r\narg2\r\narg3\r\n') 1280 1281 @skipIfWindows # No pty support to test any inferior output 1282 @add_test_categories(["llgs"]) 1283 def test_launch_via_vRun(self): 1284 self.build() 1285 exe_path = self.getBuildArtifact("a.out") 1286 args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"] 1287 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1288 1289 server = self.connect_to_debug_monitor() 1290 self.assertIsNotNone(server) 1291 self.do_handshake() 1292 self.test_sequence.add_log_lines( 1293 ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args), 1294 {"direction": "send", 1295 "regex": r"^\$T([0-9a-fA-F]+)"}, 1296 "read packet: $c#00", 1297 "send packet: $W00#00"], 1298 True) 1299 context = self.expect_gdbremote_sequence() 1300 self.assertEqual(context["O_content"], 1301 b'arg1\r\narg2\r\narg3\r\n') 1302 1303 @add_test_categories(["llgs"]) 1304 def test_launch_via_vRun_no_args(self): 1305 
self.build() 1306 exe_path = self.getBuildArtifact("a.out") 1307 hex_path = binascii.b2a_hex(exe_path.encode()).decode() 1308 1309 server = self.connect_to_debug_monitor() 1310 self.assertIsNotNone(server) 1311 self.do_handshake() 1312 self.test_sequence.add_log_lines( 1313 ["read packet: $vRun;%s#00" % (hex_path,), 1314 {"direction": "send", 1315 "regex": r"^\$T([0-9a-fA-F]+)"}, 1316 "read packet: $c#00", 1317 "send packet: $W00#00"], 1318 True) 1319 self.expect_gdbremote_sequence() 1320 1321 @skipIfWindows # No pty support to test any inferior output 1322 @add_test_categories(["llgs"]) 1323 def test_QEnvironment(self): 1324 self.build() 1325 exe_path = self.getBuildArtifact("a.out") 1326 env = {"FOO": "test", "BAR": "a=z"} 1327 args = [exe_path, "print-env:FOO", "print-env:BAR"] 1328 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1329 1330 server = self.connect_to_debug_monitor() 1331 self.assertIsNotNone(server) 1332 self.do_handshake() 1333 1334 for key, value in env.items(): 1335 self.test_sequence.add_log_lines( 1336 ["read packet: $QEnvironment:%s=%s#00" % (key, value), 1337 "send packet: $OK#00"], 1338 True) 1339 self.test_sequence.add_log_lines( 1340 ["read packet: $vRun;%s#00" % (";".join(hex_args),), 1341 {"direction": "send", 1342 "regex": r"^\$T([0-9a-fA-F]+)"}, 1343 "read packet: $c#00", 1344 "send packet: $W00#00"], 1345 True) 1346 context = self.expect_gdbremote_sequence() 1347 self.assertEqual(context["O_content"], b"test\r\na=z\r\n") 1348 1349 @skipIfWindows # No pty support to test any inferior output 1350 @add_test_categories(["llgs"]) 1351 def test_QEnvironmentHexEncoded(self): 1352 self.build() 1353 exe_path = self.getBuildArtifact("a.out") 1354 env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"} 1355 args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"] 1356 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1357 1358 server = self.connect_to_debug_monitor() 1359 self.assertIsNotNone(server) 1360 
self.do_handshake() 1361 1362 for key, value in env.items(): 1363 hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode() 1364 self.test_sequence.add_log_lines( 1365 ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,), 1366 "send packet: $OK#00"], 1367 True) 1368 self.test_sequence.add_log_lines( 1369 ["read packet: $vRun;%s#00" % (";".join(hex_args),), 1370 {"direction": "send", 1371 "regex": r"^\$T([0-9a-fA-F]+)"}, 1372 "read packet: $c#00", 1373 "send packet: $W00#00"], 1374 True) 1375 context = self.expect_gdbremote_sequence() 1376 self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n") 1377