1""" 2Test case for testing the gdbremote protocol. 3 4Tests run against debugserver and lldb-server (llgs). 5lldb-server tests run where the lldb-server exe is 6available. 7 8This class will be broken into smaller test case classes by 9gdb remote packet functional areas. For now it contains 10the initial set of tests implemented. 11""" 12 13import binascii 14import itertools 15 16import unittest2 17import gdbremote_testcase 18import lldbgdbserverutils 19from lldbsuite.support import seven 20from lldbsuite.test.decorators import * 21from lldbsuite.test.lldbtest import * 22from lldbsuite.test.lldbdwarf import * 23from lldbsuite.test import lldbutil, lldbplatformutil 24 25 26class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser): 27 28 mydir = TestBase.compute_mydir(__file__) 29 30 def test_thread_suffix_supported(self): 31 server = self.connect_to_debug_monitor() 32 self.assertIsNotNone(server) 33 34 self.do_handshake() 35 self.test_sequence.add_log_lines( 36 ["lldb-server < 26> read packet: $QThreadSuffixSupported#e4", 37 "lldb-server < 6> send packet: $OK#9a"], 38 True) 39 40 self.expect_gdbremote_sequence() 41 42 43 def test_list_threads_in_stop_reply_supported(self): 44 server = self.connect_to_debug_monitor() 45 self.assertIsNotNone(server) 46 47 self.do_handshake() 48 self.test_sequence.add_log_lines( 49 ["lldb-server < 27> read packet: $QListThreadsInStopReply#21", 50 "lldb-server < 6> send packet: $OK#9a"], 51 True) 52 self.expect_gdbremote_sequence() 53 54 def test_c_packet_works(self): 55 self.build() 56 procs = self.prep_debug_monitor_and_inferior() 57 self.test_sequence.add_log_lines( 58 ["read packet: $c#63", 59 "send packet: $W00#00"], 60 True) 61 62 self.expect_gdbremote_sequence() 63 64 @skipIfWindows # No pty support to test any inferior output 65 def test_inferior_print_exit(self): 66 self.build() 67 procs = self.prep_debug_monitor_and_inferior( 68 inferior_args=["hello, world"]) 69 self.test_sequence.add_log_lines( 70 ["read packet: $vCont;c#a8", 71 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")}, 72 "send packet: $W00#00"], 73 True) 74 75 context = self.expect_gdbremote_sequence() 76 self.assertIsNotNone(context) 77 78 def test_first_launch_stop_reply_thread_matches_first_qC(self): 79 self.build() 80 procs = self.prep_debug_monitor_and_inferior() 81 self.test_sequence.add_log_lines(["read packet: $qC#00", 82 {"direction": "send", 83 "regex": r"^\$QC([0-9a-fA-F]+)#", 84 "capture": {1: "thread_id_QC"}}, 85 "read packet: $?#00", 86 {"direction": "send", 87 "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)", 88 "capture": {1: "thread_id_?"}}], 89 True) 90 context = self.expect_gdbremote_sequence() 91 self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?")) 92 93 def test_attach_commandline_continue_app_exits(self): 94 self.build() 95 self.set_inferior_startup_attach() 96 procs = self.prep_debug_monitor_and_inferior() 97 self.test_sequence.add_log_lines( 98 ["read packet: $vCont;c#a8", 99 "send packet: $W00#00"], 100 True) 101 self.expect_gdbremote_sequence() 102 103 # Wait a moment for completed and now-detached inferior process to 104 # clear. 105 time.sleep(1) 106 107 if not lldb.remote_platform: 108 # Process should be dead now. Reap results. 109 poll_result = procs["inferior"].poll() 110 self.assertIsNotNone(poll_result) 111 112 # Where possible, verify at the system level that the process is not 113 # running. 114 self.assertFalse( 115 lldbgdbserverutils.process_is_running( 116 procs["inferior"].pid, False)) 117 118 def test_qRegisterInfo_returns_one_valid_result(self): 119 self.build() 120 self.prep_debug_monitor_and_inferior() 121 self.test_sequence.add_log_lines( 122 ["read packet: $qRegisterInfo0#00", 123 {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}], 124 True) 125 126 # Run the stream 127 context = self.expect_gdbremote_sequence() 128 self.assertIsNotNone(context) 129 130 reg_info_packet = context.get("reginfo_0") 131 self.assertIsNotNone(reg_info_packet) 132 self.assert_valid_reg_info( 133 lldbgdbserverutils.parse_reg_info_response(reg_info_packet)) 134 135 def test_qRegisterInfo_returns_all_valid_results(self): 136 self.build() 137 self.prep_debug_monitor_and_inferior() 138 self.add_register_info_collection_packets() 139 140 # Run the stream. 141 context = self.expect_gdbremote_sequence() 142 self.assertIsNotNone(context) 143 144 # Validate that each register info returned validates. 145 for reg_info in self.parse_register_info_packets(context): 146 self.assert_valid_reg_info(reg_info) 147 148 def test_qRegisterInfo_contains_required_generics_debugserver(self): 149 self.build() 150 self.prep_debug_monitor_and_inferior() 151 self.add_register_info_collection_packets() 152 153 # Run the packet stream. 154 context = self.expect_gdbremote_sequence() 155 self.assertIsNotNone(context) 156 157 # Gather register info entries. 158 reg_infos = self.parse_register_info_packets(context) 159 160 # Collect all generic registers found. 161 generic_regs = { 162 reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info} 163 164 # Ensure we have a program counter register. 165 self.assertIn('pc', generic_regs) 166 167 # Ensure we have a frame pointer register. PPC64le's FP is the same as SP 168 if self.getArchitecture() != 'powerpc64le': 169 self.assertIn('fp', generic_regs) 170 171 # Ensure we have a stack pointer register. 172 self.assertIn('sp', generic_regs) 173 174 # Ensure we have a flags register. 175 self.assertIn('flags', generic_regs) 176 177 def test_qRegisterInfo_contains_at_least_one_register_set(self): 178 self.build() 179 self.prep_debug_monitor_and_inferior() 180 self.add_register_info_collection_packets() 181 182 # Run the packet stream. 183 context = self.expect_gdbremote_sequence() 184 self.assertIsNotNone(context) 185 186 # Gather register info entries. 187 reg_infos = self.parse_register_info_packets(context) 188 189 # Collect all register sets found. 190 register_sets = { 191 reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info} 192 self.assertTrue(len(register_sets) >= 1) 193 194 def targetHasAVX(self): 195 triple = self.dbg.GetSelectedPlatform().GetTriple() 196 197 # TODO other platforms, please implement this function 198 if not re.match(".*-.*-linux", triple): 199 return True 200 201 # Need to do something different for non-Linux/Android targets 202 if lldb.remote_platform: 203 self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"') 204 cpuinfo_path = "cpuinfo" 205 self.addTearDownHook(lambda: os.unlink("cpuinfo")) 206 else: 207 cpuinfo_path = "/proc/cpuinfo" 208 209 f = open(cpuinfo_path, 'r') 210 cpuinfo = f.read() 211 f.close() 212 return " avx " in cpuinfo 213 214 @expectedFailureAll(oslist=["windows"]) # no avx for now. 215 @skipIf(archs=no_match(['amd64', 'i386', 'x86_64'])) 216 @add_test_categories(["llgs"]) 217 def test_qRegisterInfo_contains_avx_registers(self): 218 self.build() 219 self.prep_debug_monitor_and_inferior() 220 self.add_register_info_collection_packets() 221 222 # Run the packet stream. 223 context = self.expect_gdbremote_sequence() 224 self.assertIsNotNone(context) 225 226 # Gather register info entries. 227 reg_infos = self.parse_register_info_packets(context) 228 229 # Collect all generics found. 230 register_sets = { 231 reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info} 232 self.assertEqual( 233 self.targetHasAVX(), 234 "Advanced Vector Extensions" in register_sets) 235 236 def qThreadInfo_contains_thread(self): 237 procs = self.prep_debug_monitor_and_inferior() 238 self.add_threadinfo_collection_packets() 239 240 # Run the packet stream. 241 context = self.expect_gdbremote_sequence() 242 self.assertIsNotNone(context) 243 244 # Gather threadinfo entries. 245 threads = self.parse_threadinfo_packets(context) 246 self.assertIsNotNone(threads) 247 248 # We should have exactly one thread. 249 self.assertEqual(len(threads), 1) 250 251 def test_qThreadInfo_contains_thread_launch(self): 252 self.build() 253 self.set_inferior_startup_launch() 254 self.qThreadInfo_contains_thread() 255 256 @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped 257 def test_qThreadInfo_contains_thread_attach(self): 258 self.build() 259 self.set_inferior_startup_attach() 260 self.qThreadInfo_contains_thread() 261 262 def qThreadInfo_matches_qC(self): 263 procs = self.prep_debug_monitor_and_inferior() 264 265 self.add_threadinfo_collection_packets() 266 self.test_sequence.add_log_lines( 267 ["read packet: $qC#00", 268 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}} 269 ], True) 270 271 # Run the packet stream. 272 context = self.expect_gdbremote_sequence() 273 self.assertIsNotNone(context) 274 275 # Gather threadinfo entries. 276 threads = self.parse_threadinfo_packets(context) 277 self.assertIsNotNone(threads) 278 279 # We should have exactly one thread from threadinfo. 280 self.assertEqual(len(threads), 1) 281 282 # We should have a valid thread_id from $QC. 283 QC_thread_id_hex = context.get("thread_id") 284 self.assertIsNotNone(QC_thread_id_hex) 285 QC_thread_id = int(QC_thread_id_hex, 16) 286 287 # Those two should be the same. 288 self.assertEqual(threads[0], QC_thread_id) 289 290 def test_qThreadInfo_matches_qC_launch(self): 291 self.build() 292 self.set_inferior_startup_launch() 293 self.qThreadInfo_matches_qC() 294 295 @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped 296 def test_qThreadInfo_matches_qC_attach(self): 297 self.build() 298 self.set_inferior_startup_attach() 299 self.qThreadInfo_matches_qC() 300 301 def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self): 302 self.build() 303 self.set_inferior_startup_launch() 304 procs = self.prep_debug_monitor_and_inferior() 305 self.add_register_info_collection_packets() 306 307 # Run the packet stream. 308 context = self.expect_gdbremote_sequence() 309 self.assertIsNotNone(context) 310 311 # Gather register info entries. 312 reg_infos = self.parse_register_info_packets(context) 313 self.assertIsNotNone(reg_infos) 314 self.assertTrue(len(reg_infos) > 0) 315 316 byte_order = self.get_target_byte_order() 317 318 # Read value for each register. 319 reg_index = 0 320 for reg_info in reg_infos: 321 # Skip registers that don't have a register set. For x86, these are 322 # the DRx registers, which have no LLDB-kind register number and thus 323 # cannot be read via normal 324 # NativeRegisterContext::ReadRegister(reg_info,...) calls. 325 if not "set" in reg_info: 326 continue 327 328 # Clear existing packet expectations. 329 self.reset_test_sequence() 330 331 # Run the register query 332 self.test_sequence.add_log_lines( 333 ["read packet: $p{0:x}#00".format(reg_index), 334 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}], 335 True) 336 context = self.expect_gdbremote_sequence() 337 self.assertIsNotNone(context) 338 339 # Verify the response length. 340 p_response = context.get("p_response") 341 self.assertIsNotNone(p_response) 342 343 # Skip erraneous (unsupported) registers. 344 # TODO: remove this once we make unsupported registers disappear. 345 if p_response.startswith("E") and len(p_response) == 3: 346 continue 347 348 if "dynamic_size_dwarf_expr_bytes" in reg_info: 349 self.updateRegInfoBitsize(reg_info, byte_order) 350 self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8, 351 reg_info) 352 353 # Increment loop 354 reg_index += 1 355 356 def Hg_switches_to_3_threads(self, pass_pid=False): 357 # Startup the inferior with three threads (main + 2 new ones). 358 procs = self.prep_debug_monitor_and_inferior( 359 inferior_args=["thread:new", "thread:new"]) 360 361 # Let the inferior process have a few moments to start up the thread 362 # when launched. (The launch scenario has no time to run, so threads 363 # won't be there yet.) 364 self.run_process_then_stop(run_seconds=1) 365 366 # Wait at most x seconds for 3 threads to be present. 367 threads = self.wait_for_thread_count(3) 368 self.assertEqual(len(threads), 3) 369 370 pid_str = "" 371 if pass_pid: 372 pid_str = "p{0:x}.".format(procs["inferior"].pid) 373 374 # verify we can $H to each thead, and $qC matches the thread we set. 375 for thread in threads: 376 # Change to each thread, verify current thread id. 377 self.reset_test_sequence() 378 self.test_sequence.add_log_lines( 379 ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread), # Set current thread. 380 "send packet: $OK#00", 381 "read packet: $qC#00", 382 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}], 383 True) 384 385 context = self.expect_gdbremote_sequence() 386 self.assertIsNotNone(context) 387 388 # Verify the thread id. 389 self.assertIsNotNone(context.get("thread_id")) 390 self.assertEqual(int(context.get("thread_id"), 16), thread) 391 392 @expectedFailureAll(oslist=["windows"]) # expect 4 threads 393 def test_Hg_switches_to_3_threads_launch(self): 394 self.build() 395 self.set_inferior_startup_launch() 396 self.Hg_switches_to_3_threads() 397 398 @expectedFailureAll(oslist=["windows"]) # expecting one more thread 399 def test_Hg_switches_to_3_threads_attach(self): 400 self.build() 401 self.set_inferior_startup_attach() 402 self.Hg_switches_to_3_threads() 403 404 @expectedFailureAll(oslist=["windows"]) # expect 4 threads 405 @add_test_categories(["llgs"]) 406 def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self): 407 self.build() 408 self.set_inferior_startup_attach() 409 self.Hg_switches_to_3_threads(pass_pid=True) 410 411 def Hg_fails_on_pid(self, pass_pid): 412 # Start the inferior. 413 procs = self.prep_debug_monitor_and_inferior( 414 inferior_args=["thread:new"]) 415 416 self.run_process_then_stop(run_seconds=1) 417 418 threads = self.wait_for_thread_count(2) 419 self.assertEqual(len(threads), 2) 420 421 if pass_pid == -1: 422 pid_str = "p-1." 423 else: 424 pid_str = "p{0:x}.".format(pass_pid) 425 thread = threads[1] 426 427 self.test_sequence.add_log_lines( 428 ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread), # Set current thread. 429 "send packet: $Eff#00"], 430 True) 431 432 self.expect_gdbremote_sequence() 433 434 @expectedFailureAll(oslist=["windows"]) 435 @add_test_categories(["llgs"]) 436 def test_Hg_fails_on_another_pid(self): 437 self.build() 438 self.set_inferior_startup_launch() 439 self.Hg_fails_on_pid(1) 440 441 @expectedFailureAll(oslist=["windows"]) 442 @add_test_categories(["llgs"]) 443 def test_Hg_fails_on_zero_pid(self): 444 self.build() 445 self.set_inferior_startup_launch() 446 self.Hg_fails_on_pid(0) 447 448 @expectedFailureAll(oslist=["windows"]) 449 @add_test_categories(["llgs"]) 450 def test_Hg_fails_on_minus_one_pid(self): 451 self.build() 452 self.set_inferior_startup_launch() 453 self.Hg_fails_on_pid(-1) 454 455 def Hc_then_Csignal_signals_correct_thread(self, segfault_signo): 456 # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached, 457 # and the test requires getting stdout from the exe. 458 459 NUM_THREADS = 3 460 461 # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads). 462 # inferior_args=["thread:print-ids"] 463 inferior_args = ["thread:segfault"] 464 for i in range(NUM_THREADS - 1): 465 # if i > 0: 466 # Give time between thread creation/segfaulting for the handler to work. 467 # inferior_args.append("sleep:1") 468 inferior_args.append("thread:new") 469 inferior_args.append("sleep:10") 470 471 # Launch/attach. (In our case, this should only ever be launched since 472 # we need inferior stdout/stderr). 473 procs = self.prep_debug_monitor_and_inferior( 474 inferior_args=inferior_args) 475 self.test_sequence.add_log_lines(["read packet: $c#63"], True) 476 context = self.expect_gdbremote_sequence() 477 478 # Let the inferior process have a few moments to start up the thread when launched. 479 # context = self.run_process_then_stop(run_seconds=1) 480 481 # Wait at most x seconds for all threads to be present. 482 # threads = self.wait_for_thread_count(NUM_THREADS) 483 # self.assertEquals(len(threads), NUM_THREADS) 484 485 signaled_tids = {} 486 print_thread_ids = {} 487 488 # Switch to each thread, deliver a signal, and verify signal delivery 489 for i in range(NUM_THREADS - 1): 490 # Run until SIGSEGV comes in. 491 self.reset_test_sequence() 492 self.test_sequence.add_log_lines([{"direction": "send", 493 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 494 "capture": {1: "signo", 495 2: "thread_id"}}], 496 True) 497 498 context = self.expect_gdbremote_sequence() 499 self.assertIsNotNone(context) 500 signo = context.get("signo") 501 self.assertEqual(int(signo, 16), segfault_signo) 502 503 # Ensure we haven't seen this tid yet. 504 thread_id = int(context.get("thread_id"), 16) 505 self.assertNotIn(thread_id, signaled_tids) 506 signaled_tids[thread_id] = 1 507 508 # Send SIGUSR1 to the thread that signaled the SIGSEGV. 509 self.reset_test_sequence() 510 self.test_sequence.add_log_lines( 511 [ 512 # Set the continue thread. 513 # Set current thread. 514 "read packet: $Hc{0:x}#00".format(thread_id), 515 "send packet: $OK#00", 516 517 # Continue sending the signal number to the continue thread. 518 # The commented out packet is a way to do this same operation without using 519 # a $Hc (but this test is testing $Hc, so we'll stick with the former). 520 "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')), 521 # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id), 522 523 # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here). MacOSX debugserver does. 524 # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL. 525 # Need to rectify behavior here. The linux behavior is more intuitive to me since we're essentially swapping out 526 # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal. 527 # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }, 528 # "read packet: $c#63", 529 {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}}, 530 ], 531 True) 532 533 # Run the sequence. 534 context = self.expect_gdbremote_sequence() 535 self.assertIsNotNone(context) 536 537 # Ensure the stop signal is the signal we delivered. 538 # stop_signo = context.get("stop_signo") 539 # self.assertIsNotNone(stop_signo) 540 # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1')) 541 542 # Ensure the stop thread is the thread to which we delivered the signal. 543 # stop_thread_id = context.get("stop_thread_id") 544 # self.assertIsNotNone(stop_thread_id) 545 # self.assertEquals(int(stop_thread_id,16), thread_id) 546 547 # Ensure we haven't seen this thread id yet. The inferior's 548 # self-obtained thread ids are not guaranteed to match the stub 549 # tids (at least on MacOSX). 550 print_thread_id = context.get("print_thread_id") 551 self.assertIsNotNone(print_thread_id) 552 print_thread_id = int(print_thread_id, 16) 553 self.assertNotIn(print_thread_id, print_thread_ids) 554 555 # Now remember this print (i.e. inferior-reflected) thread id and 556 # ensure we don't hit it again. 557 print_thread_ids[print_thread_id] = 1 558 559 # Ensure post signal-handle thread id matches the thread that 560 # initially raised the SIGSEGV. 561 post_handle_thread_id = context.get("post_handle_thread_id") 562 self.assertIsNotNone(post_handle_thread_id) 563 post_handle_thread_id = int(post_handle_thread_id, 16) 564 self.assertEqual(post_handle_thread_id, print_thread_id) 565 566 @expectedFailureDarwin 567 @skipIfWindows # no SIGSEGV support 568 @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419") 569 @expectedFailureNetBSD 570 def test_Hc_then_Csignal_signals_correct_thread_launch(self): 571 self.build() 572 self.set_inferior_startup_launch() 573 574 if self.platformIsDarwin(): 575 # Darwin debugserver translates some signals like SIGSEGV into some gdb 576 # expectations about fixed signal numbers. 577 self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS) 578 else: 579 self.Hc_then_Csignal_signals_correct_thread( 580 lldbutil.get_signal_number('SIGSEGV')) 581 582 @skipIfWindows # No pty support to test any inferior output 583 def test_m_packet_reads_memory(self): 584 self.build() 585 self.set_inferior_startup_launch() 586 # This is the memory we will write into the inferior and then ensure we 587 # can read back with $m. 588 MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz" 589 590 # Start up the inferior. 591 procs = self.prep_debug_monitor_and_inferior( 592 inferior_args=[ 593 "set-message:%s" % 594 MEMORY_CONTENTS, 595 "get-data-address-hex:g_message", 596 "sleep:5"]) 597 598 # Run the process 599 self.test_sequence.add_log_lines( 600 [ 601 # Start running after initial stop. 602 "read packet: $c#63", 603 # Match output line that prints the memory address of the message buffer within the inferior. 604 # Note we require launch-only testing so we can get inferior otuput. 605 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 606 "capture": {1: "message_address"}}, 607 # Now stop the inferior. 608 "read packet: {}".format(chr(3)), 609 # And wait for the stop notification. 610 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 611 True) 612 613 # Run the packet stream. 614 context = self.expect_gdbremote_sequence() 615 self.assertIsNotNone(context) 616 617 # Grab the message address. 618 self.assertIsNotNone(context.get("message_address")) 619 message_address = int(context.get("message_address"), 16) 620 621 # Grab contents from the inferior. 622 self.reset_test_sequence() 623 self.test_sequence.add_log_lines( 624 ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)), 625 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}], 626 True) 627 628 # Run the packet stream. 629 context = self.expect_gdbremote_sequence() 630 self.assertIsNotNone(context) 631 632 # Ensure what we read from inferior memory is what we wrote. 633 self.assertIsNotNone(context.get("read_contents")) 634 read_contents = seven.unhexlify(context.get("read_contents")) 635 self.assertEqual(read_contents, MEMORY_CONTENTS) 636 637 def test_qMemoryRegionInfo_is_supported(self): 638 self.build() 639 self.set_inferior_startup_launch() 640 # Start up the inferior. 641 procs = self.prep_debug_monitor_and_inferior() 642 643 # Ask if it supports $qMemoryRegionInfo. 644 self.test_sequence.add_log_lines( 645 ["read packet: $qMemoryRegionInfo#00", 646 "send packet: $OK#00" 647 ], True) 648 self.expect_gdbremote_sequence() 649 650 @skipIfWindows # No pty support to test any inferior output 651 def test_qMemoryRegionInfo_reports_code_address_as_executable(self): 652 self.build() 653 self.set_inferior_startup_launch() 654 655 # Start up the inferior. 656 procs = self.prep_debug_monitor_and_inferior( 657 inferior_args=["get-code-address-hex:hello", "sleep:5"]) 658 659 # Run the process 660 self.test_sequence.add_log_lines( 661 [ 662 # Start running after initial stop. 663 "read packet: $c#63", 664 # Match output line that prints the memory address of the message buffer within the inferior. 665 # Note we require launch-only testing so we can get inferior otuput. 666 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"), 667 "capture": {1: "code_address"}}, 668 # Now stop the inferior. 669 "read packet: {}".format(chr(3)), 670 # And wait for the stop notification. 671 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 672 True) 673 674 # Run the packet stream. 675 context = self.expect_gdbremote_sequence() 676 self.assertIsNotNone(context) 677 678 # Grab the code address. 679 self.assertIsNotNone(context.get("code_address")) 680 code_address = int(context.get("code_address"), 16) 681 682 # Grab memory region info from the inferior. 683 self.reset_test_sequence() 684 self.add_query_memory_region_packets(code_address) 685 686 # Run the packet stream. 687 context = self.expect_gdbremote_sequence() 688 self.assertIsNotNone(context) 689 mem_region_dict = self.parse_memory_region_packet(context) 690 691 # Ensure there are no errors reported. 692 self.assertNotIn("error", mem_region_dict) 693 694 # Ensure code address is readable and executable. 695 self.assertIn("permissions", mem_region_dict) 696 self.assertIn("r", mem_region_dict["permissions"]) 697 self.assertIn("x", mem_region_dict["permissions"]) 698 699 # Ensure the start address and size encompass the address we queried. 700 self.assert_address_within_memory_region(code_address, mem_region_dict) 701 702 @skipIfWindows # No pty support to test any inferior output 703 def test_qMemoryRegionInfo_reports_stack_address_as_rw(self): 704 self.build() 705 self.set_inferior_startup_launch() 706 707 # Start up the inferior. 708 procs = self.prep_debug_monitor_and_inferior( 709 inferior_args=["get-stack-address-hex:", "sleep:5"]) 710 711 # Run the process 712 self.test_sequence.add_log_lines( 713 [ 714 # Start running after initial stop. 715 "read packet: $c#63", 716 # Match output line that prints the memory address of the message buffer within the inferior. 717 # Note we require launch-only testing so we can get inferior otuput. 718 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"), 719 "capture": {1: "stack_address"}}, 720 # Now stop the inferior. 721 "read packet: {}".format(chr(3)), 722 # And wait for the stop notification. 723 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 724 True) 725 726 # Run the packet stream. 727 context = self.expect_gdbremote_sequence() 728 self.assertIsNotNone(context) 729 730 # Grab the address. 731 self.assertIsNotNone(context.get("stack_address")) 732 stack_address = int(context.get("stack_address"), 16) 733 734 # Grab memory region info from the inferior. 735 self.reset_test_sequence() 736 self.add_query_memory_region_packets(stack_address) 737 738 # Run the packet stream. 739 context = self.expect_gdbremote_sequence() 740 self.assertIsNotNone(context) 741 mem_region_dict = self.parse_memory_region_packet(context) 742 743 # Ensure there are no errors reported. 744 self.assertNotIn("error", mem_region_dict) 745 746 # Ensure address is readable and executable. 747 self.assertIn("permissions", mem_region_dict) 748 self.assertIn("r", mem_region_dict["permissions"]) 749 self.assertIn("w", mem_region_dict["permissions"]) 750 751 # Ensure the start address and size encompass the address we queried. 752 self.assert_address_within_memory_region( 753 stack_address, mem_region_dict) 754 755 @skipIfWindows # No pty support to test any inferior output 756 def test_qMemoryRegionInfo_reports_heap_address_as_rw(self): 757 self.build() 758 self.set_inferior_startup_launch() 759 760 # Start up the inferior. 761 procs = self.prep_debug_monitor_and_inferior( 762 inferior_args=["get-heap-address-hex:", "sleep:5"]) 763 764 # Run the process 765 self.test_sequence.add_log_lines( 766 [ 767 # Start running after initial stop. 768 "read packet: $c#63", 769 # Match output line that prints the memory address of the message buffer within the inferior. 770 # Note we require launch-only testing so we can get inferior otuput. 771 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"), 772 "capture": {1: "heap_address"}}, 773 # Now stop the inferior. 774 "read packet: {}".format(chr(3)), 775 # And wait for the stop notification. 776 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 777 True) 778 779 # Run the packet stream. 780 context = self.expect_gdbremote_sequence() 781 self.assertIsNotNone(context) 782 783 # Grab the address. 784 self.assertIsNotNone(context.get("heap_address")) 785 heap_address = int(context.get("heap_address"), 16) 786 787 # Grab memory region info from the inferior. 788 self.reset_test_sequence() 789 self.add_query_memory_region_packets(heap_address) 790 791 # Run the packet stream. 792 context = self.expect_gdbremote_sequence() 793 self.assertIsNotNone(context) 794 mem_region_dict = self.parse_memory_region_packet(context) 795 796 # Ensure there are no errors reported. 797 self.assertNotIn("error", mem_region_dict) 798 799 # Ensure address is readable and executable. 800 self.assertIn("permissions", mem_region_dict) 801 self.assertIn("r", mem_region_dict["permissions"]) 802 self.assertIn("w", mem_region_dict["permissions"]) 803 804 # Ensure the start address and size encompass the address we queried. 805 self.assert_address_within_memory_region(heap_address, mem_region_dict) 806 807 def breakpoint_set_and_remove_work(self, want_hardware): 808 # Start up the inferior. 809 procs = self.prep_debug_monitor_and_inferior( 810 inferior_args=[ 811 "get-code-address-hex:hello", 812 "sleep:1", 813 "call-function:hello"]) 814 815 # Run the process 816 self.add_register_info_collection_packets() 817 self.add_process_info_collection_packets() 818 self.test_sequence.add_log_lines( 819 [ # Start running after initial stop. 820 "read packet: $c#63", 821 # Match output line that prints the memory address of the function call entry point. 822 # Note we require launch-only testing so we can get inferior otuput. 823 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"), 824 "capture": {1: "function_address"}}, 825 # Now stop the inferior. 826 "read packet: {}".format(chr(3)), 827 # And wait for the stop notification. 828 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 829 True) 830 831 # Run the packet stream. 832 context = self.expect_gdbremote_sequence() 833 self.assertIsNotNone(context) 834 835 # Gather process info - we need endian of target to handle register 836 # value conversions. 837 process_info = self.parse_process_info_response(context) 838 endian = process_info.get("endian") 839 self.assertIsNotNone(endian) 840 841 # Gather register info entries. 842 reg_infos = self.parse_register_info_packets(context) 843 (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos) 844 self.assertIsNotNone(pc_lldb_reg_index) 845 self.assertIsNotNone(pc_reg_info) 846 847 # Grab the function address. 848 self.assertIsNotNone(context.get("function_address")) 849 function_address = int(context.get("function_address"), 16) 850 851 # Get current target architecture 852 target_arch = self.getArchitecture() 853 854 # Set the breakpoint. 855 if (target_arch == "arm") or (target_arch == "aarch64"): 856 # TODO: Handle case when setting breakpoint in thumb code 857 BREAKPOINT_KIND = 4 858 else: 859 BREAKPOINT_KIND = 1 860 861 # Set default packet type to Z0 (software breakpoint) 862 z_packet_type = 0 863 864 # If hardware breakpoint is requested set packet type to Z1 865 if want_hardware == True: 866 z_packet_type = 1 867 868 self.reset_test_sequence() 869 self.add_set_breakpoint_packets( 870 function_address, 871 z_packet_type, 872 do_continue=True, 873 breakpoint_kind=BREAKPOINT_KIND) 874 875 # Run the packet stream. 876 context = self.expect_gdbremote_sequence() 877 self.assertIsNotNone(context) 878 879 # Verify the stop signal reported was the breakpoint signal number. 880 stop_signo = context.get("stop_signo") 881 self.assertIsNotNone(stop_signo) 882 self.assertEqual(int(stop_signo, 16), 883 lldbutil.get_signal_number('SIGTRAP')) 884 885 # Ensure we did not receive any output. If the breakpoint was not set, we would 886 # see output (from a launched process with captured stdio) printing a hello, world message. 887 # That would indicate the breakpoint didn't take. 888 self.assertEqual(len(context["O_content"]), 0) 889 890 # Verify that the PC for the main thread is where we expect it - right at the breakpoint address. 891 # This acts as a another validation on the register reading code. 892 self.reset_test_sequence() 893 self.test_sequence.add_log_lines( 894 [ 895 # Print the PC. This should match the breakpoint address. 896 "read packet: $p{0:x}#00".format(pc_lldb_reg_index), 897 # Capture $p results. 898 {"direction": "send", 899 "regex": r"^\$([0-9a-fA-F]+)#", 900 "capture": {1: "p_response"}}, 901 ], True) 902 903 context = self.expect_gdbremote_sequence() 904 self.assertIsNotNone(context) 905 906 # Verify the PC is where we expect. Note response is in endianness of 907 # the inferior. 908 p_response = context.get("p_response") 909 self.assertIsNotNone(p_response) 910 911 # Convert from target endian to int. 912 returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned( 913 endian, p_response) 914 self.assertEqual(returned_pc, function_address) 915 916 # Verify that a breakpoint remove and continue gets us the expected 917 # output. 918 self.reset_test_sequence() 919 920 # Add breakpoint remove packets 921 self.add_remove_breakpoint_packets( 922 function_address, 923 z_packet_type, 924 breakpoint_kind=BREAKPOINT_KIND) 925 926 self.test_sequence.add_log_lines( 927 [ 928 # Continue running. 929 "read packet: $c#63", 930 # We should now receive the output from the call. 931 {"type": "output_match", "regex": r"^hello, world\r\n$"}, 932 # And wait for program completion. 933 {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"}, 934 ], True) 935 936 context = self.expect_gdbremote_sequence() 937 self.assertIsNotNone(context) 938 939 @skipIfWindows # No pty support to test any inferior output 940 def test_software_breakpoint_set_and_remove_work(self): 941 if self.getArchitecture() == "arm": 942 # TODO: Handle case when setting breakpoint in thumb code 943 self.build(dictionary={'CFLAGS_EXTRAS': '-marm'}) 944 else: 945 self.build() 946 self.set_inferior_startup_launch() 947 self.breakpoint_set_and_remove_work(want_hardware=False) 948 949 @skipUnlessPlatform(oslist=['linux']) 950 @skipIf(archs=no_match(['arm', 'aarch64'])) 951 def test_hardware_breakpoint_set_and_remove_work(self): 952 if self.getArchitecture() == "arm": 953 # TODO: Handle case when setting breakpoint in thumb code 954 self.build(dictionary={'CFLAGS_EXTRAS': '-marm'}) 955 else: 956 self.build() 957 self.set_inferior_startup_launch() 958 self.breakpoint_set_and_remove_work(want_hardware=True) 959 960 def get_qSupported_dict(self, features=[]): 961 self.build() 962 self.set_inferior_startup_launch() 963 964 # Start up the stub and start/prep the inferior. 965 procs = self.prep_debug_monitor_and_inferior() 966 self.add_qSupported_packets(features) 967 968 # Run the packet stream. 969 context = self.expect_gdbremote_sequence() 970 self.assertIsNotNone(context) 971 972 # Retrieve the qSupported features. 973 return self.parse_qSupported_response(context) 974 975 def test_qSupported_returns_known_stub_features(self): 976 supported_dict = self.get_qSupported_dict() 977 self.assertIsNotNone(supported_dict) 978 self.assertTrue(len(supported_dict) > 0) 979 980 def test_qSupported_auvx(self): 981 expected = ('+' if lldbplatformutil.getPlatform() 982 in ["freebsd", "linux", "netbsd"] else '-') 983 supported_dict = self.get_qSupported_dict() 984 self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected) 985 986 def test_qSupported_libraries_svr4(self): 987 expected = ('+' if lldbplatformutil.getPlatform() 988 in ["freebsd", "linux", "netbsd"] else '-') 989 supported_dict = self.get_qSupported_dict() 990 self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'), 991 expected) 992 993 def test_qSupported_QPassSignals(self): 994 expected = ('+' if lldbplatformutil.getPlatform() 995 in ["freebsd", "linux", "netbsd"] else '-') 996 supported_dict = self.get_qSupported_dict() 997 self.assertEqual(supported_dict.get('QPassSignals', '-'), expected) 998 999 @add_test_categories(["fork"]) 1000 def test_qSupported_fork_events(self): 1001 supported_dict = ( 1002 self.get_qSupported_dict(['multiprocess+', 'fork-events+'])) 1003 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1004 self.assertEqual(supported_dict.get('fork-events', '-'), '+') 1005 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1006 1007 @add_test_categories(["fork"]) 1008 def test_qSupported_fork_events_without_multiprocess(self): 1009 supported_dict = ( 1010 self.get_qSupported_dict(['fork-events+'])) 1011 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1012 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1013 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1014 1015 @add_test_categories(["fork"]) 1016 def test_qSupported_vfork_events(self): 1017 supported_dict = ( 1018 self.get_qSupported_dict(['multiprocess+', 'vfork-events+'])) 1019 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1020 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1021 self.assertEqual(supported_dict.get('vfork-events', '-'), '+') 1022 1023 @add_test_categories(["fork"]) 1024 def test_qSupported_vfork_events_without_multiprocess(self): 1025 supported_dict = ( 1026 self.get_qSupported_dict(['vfork-events+'])) 1027 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1028 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1029 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1030 1031 # We need to be able to self.runCmd to get cpuinfo, 1032 # which is not possible when using a remote platform. 1033 @skipIfRemote 1034 def test_qSupported_memory_tagging(self): 1035 supported_dict = self.get_qSupported_dict() 1036 self.assertEqual(supported_dict.get("memory-tagging", '-'), 1037 '+' if self.isAArch64MTE() else '-') 1038 1039 @skipIfWindows # No pty support to test any inferior output 1040 def test_written_M_content_reads_back_correctly(self): 1041 self.build() 1042 self.set_inferior_startup_launch() 1043 1044 TEST_MESSAGE = "Hello, memory" 1045 1046 # Start up the stub and start/prep the inferior. 1047 procs = self.prep_debug_monitor_and_inferior( 1048 inferior_args=[ 1049 "set-message:xxxxxxxxxxxxxX", 1050 "get-data-address-hex:g_message", 1051 "sleep:1", 1052 "print-message:"]) 1053 self.test_sequence.add_log_lines( 1054 [ 1055 # Start running after initial stop. 1056 "read packet: $c#63", 1057 # Match output line that prints the memory address of the message buffer within the inferior. 1058 # Note we require launch-only testing so we can get inferior otuput. 1059 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 1060 "capture": {1: "message_address"}}, 1061 # Now stop the inferior. 1062 "read packet: {}".format(chr(3)), 1063 # And wait for the stop notification. 1064 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1065 True) 1066 context = self.expect_gdbremote_sequence() 1067 self.assertIsNotNone(context) 1068 1069 # Grab the message address. 1070 self.assertIsNotNone(context.get("message_address")) 1071 message_address = int(context.get("message_address"), 16) 1072 1073 # Hex-encode the test message, adding null termination. 1074 hex_encoded_message = seven.hexlify(TEST_MESSAGE) 1075 1076 # Write the message to the inferior. Verify that we can read it with the hex-encoded (m) 1077 # and binary (x) memory read packets. 1078 self.reset_test_sequence() 1079 self.test_sequence.add_log_lines( 1080 ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message), 1081 "send packet: $OK#00", 1082 "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1083 "send packet: ${0}#00".format(hex_encoded_message), 1084 "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1085 "send packet: ${0}#00".format(TEST_MESSAGE), 1086 "read packet: $m{0:x},4#00".format(message_address), 1087 "send packet: ${0}#00".format(hex_encoded_message[0:8]), 1088 "read packet: $x{0:x},4#00".format(message_address), 1089 "send packet: ${0}#00".format(TEST_MESSAGE[0:4]), 1090 "read packet: $c#63", 1091 {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}}, 1092 "send packet: $W00#00", 1093 ], True) 1094 context = self.expect_gdbremote_sequence() 1095 self.assertIsNotNone(context) 1096 1097 # Ensure what we read from inferior memory is what we wrote. 1098 printed_message = context.get("printed_message") 1099 self.assertIsNotNone(printed_message) 1100 self.assertEqual(printed_message, TEST_MESSAGE + "X") 1101 1102 # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp). 1103 # Come back to this. I have the test rigged to verify that at least some 1104 # of the bit-flip writes work. 1105 def test_P_writes_all_gpr_registers(self): 1106 self.build() 1107 self.set_inferior_startup_launch() 1108 1109 # Start inferior debug session, grab all register info. 1110 procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"]) 1111 self.add_register_info_collection_packets() 1112 self.add_process_info_collection_packets() 1113 1114 context = self.expect_gdbremote_sequence() 1115 self.assertIsNotNone(context) 1116 1117 # Process register infos. 1118 reg_infos = self.parse_register_info_packets(context) 1119 self.assertIsNotNone(reg_infos) 1120 self.add_lldb_register_index(reg_infos) 1121 1122 # Process endian. 1123 process_info = self.parse_process_info_response(context) 1124 endian = process_info.get("endian") 1125 self.assertIsNotNone(endian) 1126 1127 # Pull out the register infos that we think we can bit flip 1128 # successfully,. 1129 gpr_reg_infos = [ 1130 reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)] 1131 self.assertTrue(len(gpr_reg_infos) > 0) 1132 1133 # Write flipped bit pattern of existing value to each register. 1134 (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value( 1135 gpr_reg_infos, endian) 1136 self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes)) 1137 self.assertTrue(successful_writes > 0) 1138 1139 # Note: as of this moment, a hefty number of the GPR writes are failing 1140 # with E32 (everything except rax-rdx, rdi, rsi, rbp). 1141 @skipIfWindows 1142 def test_P_and_p_thread_suffix_work(self): 1143 self.build() 1144 self.set_inferior_startup_launch() 1145 1146 # Startup the inferior with three threads. 1147 procs = self.prep_debug_monitor_and_inferior( 1148 inferior_args=["thread:new", "thread:new"]) 1149 self.add_thread_suffix_request_packets() 1150 self.add_register_info_collection_packets() 1151 self.add_process_info_collection_packets() 1152 1153 context = self.expect_gdbremote_sequence() 1154 self.assertIsNotNone(context) 1155 1156 process_info = self.parse_process_info_response(context) 1157 self.assertIsNotNone(process_info) 1158 endian = process_info.get("endian") 1159 self.assertIsNotNone(endian) 1160 1161 reg_infos = self.parse_register_info_packets(context) 1162 self.assertIsNotNone(reg_infos) 1163 self.add_lldb_register_index(reg_infos) 1164 1165 reg_index = self.select_modifiable_register(reg_infos) 1166 self.assertIsNotNone(reg_index) 1167 reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8 1168 self.assertTrue(reg_byte_size > 0) 1169 1170 # Run the process a bit so threads can start up, and collect register 1171 # info. 1172 context = self.run_process_then_stop(run_seconds=1) 1173 self.assertIsNotNone(context) 1174 1175 # Wait for 3 threads to be present. 1176 threads = self.wait_for_thread_count(3) 1177 self.assertEqual(len(threads), 3) 1178 1179 expected_reg_values = [] 1180 register_increment = 1 1181 next_value = None 1182 1183 # Set the same register in each of 3 threads to a different value. 1184 # Verify each one has the unique value. 1185 for thread in threads: 1186 # If we don't have a next value yet, start it with the initial read 1187 # value + 1 1188 if not next_value: 1189 # Read pre-existing register value. 1190 self.reset_test_sequence() 1191 self.test_sequence.add_log_lines( 1192 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1193 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1194 ], True) 1195 context = self.expect_gdbremote_sequence() 1196 self.assertIsNotNone(context) 1197 1198 # Set the next value to use for writing as the increment plus 1199 # current value. 1200 p_response = context.get("p_response") 1201 self.assertIsNotNone(p_response) 1202 next_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1203 endian, p_response) 1204 1205 # Set new value using P and thread suffix. 1206 self.reset_test_sequence() 1207 self.test_sequence.add_log_lines( 1208 [ 1209 "read packet: $P{0:x}={1};thread:{2:x}#00".format( 1210 reg_index, 1211 lldbgdbserverutils.pack_register_hex( 1212 endian, 1213 next_value, 1214 byte_size=reg_byte_size), 1215 thread), 1216 "send packet: $OK#00", 1217 ], 1218 True) 1219 context = self.expect_gdbremote_sequence() 1220 self.assertIsNotNone(context) 1221 1222 # Save the value we set. 1223 expected_reg_values.append(next_value) 1224 1225 # Increment value for next thread to use (we want them all 1226 # different so we can verify they wrote to each thread correctly 1227 # next.) 1228 next_value += register_increment 1229 1230 # Revisit each thread and verify they have the expected value set for 1231 # the register we wrote. 1232 thread_index = 0 1233 for thread in threads: 1234 # Read pre-existing register value. 1235 self.reset_test_sequence() 1236 self.test_sequence.add_log_lines( 1237 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1238 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1239 ], True) 1240 context = self.expect_gdbremote_sequence() 1241 self.assertIsNotNone(context) 1242 1243 # Get the register value. 1244 p_response = context.get("p_response") 1245 self.assertIsNotNone(p_response) 1246 read_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1247 endian, p_response) 1248 1249 # Make sure we read back what we wrote. 1250 self.assertEqual(read_value, expected_reg_values[thread_index]) 1251 thread_index += 1 1252 1253 @skipIfWindows # No pty support to test any inferior output 1254 @add_test_categories(["llgs"]) 1255 def test_launch_via_A(self): 1256 self.build() 1257 exe_path = self.getBuildArtifact("a.out") 1258 args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"] 1259 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1260 1261 server = self.connect_to_debug_monitor() 1262 self.assertIsNotNone(server) 1263 self.do_handshake() 1264 # NB: strictly speaking we should use %x here but this packet 1265 # is deprecated, so no point in changing lldb-server's expectations 1266 self.test_sequence.add_log_lines( 1267 ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" % 1268 tuple(itertools.chain.from_iterable( 1269 [(len(x), x) for x in hex_args])), 1270 "send packet: $OK#00", 1271 "read packet: $c#00", 1272 "send packet: $W00#00"], 1273 True) 1274 context = self.expect_gdbremote_sequence() 1275 self.assertEqual(context["O_content"], 1276 b'arg1\r\narg2\r\narg3\r\n') 1277 1278 @skipIfWindows # No pty support to test any inferior output 1279 @add_test_categories(["llgs"]) 1280 def test_launch_via_vRun(self): 1281 self.build() 1282 exe_path = self.getBuildArtifact("a.out") 1283 args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"] 1284 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1285 1286 server = self.connect_to_debug_monitor() 1287 self.assertIsNotNone(server) 1288 self.do_handshake() 1289 self.test_sequence.add_log_lines( 1290 ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args), 1291 {"direction": "send", 1292 "regex": r"^\$T([0-9a-fA-F]+)"}, 1293 "read packet: $c#00", 1294 "send packet: $W00#00"], 1295 True) 1296 context = self.expect_gdbremote_sequence() 1297 self.assertEqual(context["O_content"], 1298 b'arg1\r\narg2\r\narg3\r\n') 1299 1300 @add_test_categories(["llgs"]) 1301 def test_launch_via_vRun_no_args(self): 1302 self.build() 1303 exe_path = self.getBuildArtifact("a.out") 1304 hex_path = binascii.b2a_hex(exe_path.encode()).decode() 1305 1306 server = self.connect_to_debug_monitor() 1307 self.assertIsNotNone(server) 1308 self.do_handshake() 1309 self.test_sequence.add_log_lines( 1310 ["read packet: $vRun;%s#00" % (hex_path,), 1311 {"direction": "send", 1312 "regex": r"^\$T([0-9a-fA-F]+)"}, 1313 "read packet: $c#00", 1314 "send packet: $W00#00"], 1315 True) 1316 self.expect_gdbremote_sequence() 1317 1318 @skipIfWindows # No pty support to test any inferior output 1319 @add_test_categories(["llgs"]) 1320 def test_QEnvironment(self): 1321 self.build() 1322 exe_path = self.getBuildArtifact("a.out") 1323 env = {"FOO": "test", "BAR": "a=z"} 1324 args = [exe_path, "print-env:FOO", "print-env:BAR"] 1325 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1326 1327 server = self.connect_to_debug_monitor() 1328 self.assertIsNotNone(server) 1329 self.do_handshake() 1330 1331 for key, value in env.items(): 1332 self.test_sequence.add_log_lines( 1333 ["read packet: $QEnvironment:%s=%s#00" % (key, value), 1334 "send packet: $OK#00"], 1335 True) 1336 self.test_sequence.add_log_lines( 1337 ["read packet: $vRun;%s#00" % (";".join(hex_args),), 1338 {"direction": "send", 1339 "regex": r"^\$T([0-9a-fA-F]+)"}, 1340 "read packet: $c#00", 1341 "send packet: $W00#00"], 1342 True) 1343 context = self.expect_gdbremote_sequence() 1344 self.assertEqual(context["O_content"], b"test\r\na=z\r\n") 1345 1346 @skipIfWindows # No pty support to test any inferior output 1347 @add_test_categories(["llgs"]) 1348 def test_QEnvironmentHexEncoded(self): 1349 self.build() 1350 exe_path = self.getBuildArtifact("a.out") 1351 env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"} 1352 args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"] 1353 hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args] 1354 1355 server = self.connect_to_debug_monitor() 1356 self.assertIsNotNone(server) 1357 self.do_handshake() 1358 1359 for key, value in env.items(): 1360 hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode() 1361 self.test_sequence.add_log_lines( 1362 ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,), 1363 "send packet: $OK#00"], 1364 True) 1365 self.test_sequence.add_log_lines( 1366 ["read packet: $vRun;%s#00" % (";".join(hex_args),), 1367 {"direction": "send", 1368 "regex": r"^\$T([0-9a-fA-F]+)"}, 1369 "read packet: $c#00", 1370 "send packet: $W00#00"], 1371 True) 1372 context = self.expect_gdbremote_sequence() 1373 self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n") 1374