"""
Test case for testing the gdbremote protocol.

Tests run against debugserver and lldb-server (llgs).
lldb-server tests run where the lldb-server exe is
available.

This class will be broken into smaller test case classes by
gdb remote packet functional areas. For now it contains
the initial set of tests implemented.
"""

# Standard-library modules used directly below.  They were previously only
# reachable through the star-imports; importing them first keeps whatever the
# star-imports bind as the final definition, so behavior is unchanged.
import os
import re
import time

import unittest2
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.support import seven
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test.lldbdwarf import *
from lldbsuite.test import lldbutil, lldbplatformutil


class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
    """gdb-remote packet tests driven through the GdbRemoteTestCaseBase helpers.

    Methods whose names do not start with ``test_`` are shared bodies that the
    ``test_*`` methods call after choosing launch or attach startup.
    """

    mydir = TestBase.compute_mydir(__file__)

    def test_thread_suffix_supported(self):
        """Expect an OK reply to QThreadSuffixSupported after the handshake."""
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        self.do_handshake()
        self.test_sequence.add_log_lines(
            ["lldb-server < 26> read packet: $QThreadSuffixSupported#e4",
             "lldb-server < 6> send packet: $OK#9a"],
            True)

        self.expect_gdbremote_sequence()

    def test_list_threads_in_stop_reply_supported(self):
        """Expect an OK reply to QListThreadsInStopReply after the handshake."""
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        self.do_handshake()
        self.test_sequence.add_log_lines(
            ["lldb-server < 27> read packet: $QListThreadsInStopReply#21",
             "lldb-server < 6> send packet: $OK#9a"],
            True)
        self.expect_gdbremote_sequence()

    def test_c_packet_works(self):
        """Continue with $c and expect the inferior to exit with $W00."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $c#63",
             "send packet: $W00#00"],
            True)

        self.expect_gdbremote_sequence()

    @skipIfWindows  # No pty support to test any inferior output
    def test_inferior_print_exit(self):
        """Continue with $vCont;c, match the inferior's output, then expect $W00."""
        self.build()
        self.prep_debug_monitor_and_inferior(
            inferior_args=["hello, world"])
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8",
             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
             "send packet: $W00#00"],
            True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    def test_first_launch_stop_reply_thread_matches_first_qC(self):
        """Check the thread id from $qC equals the one in the $? stop reply."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(["read packet: $qC#00",
                                          {"direction": "send",
                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
                                           "capture": {1: "thread_id_QC"}},
                                          "read packet: $?#00",
                                          {"direction": "send",
                                           "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
                                           "capture": {1: "thread_id_?"}}],
                                         True)
        context = self.expect_gdbremote_sequence()
        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))

    def test_attach_commandline_continue_app_exits(self):
        """Attach, continue to exit, and verify the inferior is no longer running."""
        self.build()
        self.set_inferior_startup_attach()
        procs = self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8",
             "send packet: $W00#00"],
            True)
        self.expect_gdbremote_sequence()

        # Wait a moment for completed and now-detached inferior process to
        # clear.
        time.sleep(1)

        if not lldb.remote_platform:
            # Process should be dead now. Reap results.
            poll_result = procs["inferior"].poll()
            self.assertIsNotNone(poll_result)

        # Where possible, verify at the system level that the process is not
        # running.
        self.assertFalse(
            lldbgdbserverutils.process_is_running(
                procs["inferior"].pid, False))

    def test_qRegisterInfo_returns_one_valid_result(self):
        """Query $qRegisterInfo0 and validate the parsed response."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            ["read packet: $qRegisterInfo0#00",
             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
            True)

        # Run the stream
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_info_packet = context.get("reginfo_0")
        self.assertIsNotNone(reg_info_packet)
        self.assert_valid_reg_info(
            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))

    def test_qRegisterInfo_returns_all_valid_results(self):
        """Collect every register info packet and validate each one."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Validate that each register info returned validates.
        for reg_info in self.parse_register_info_packets(context):
            self.assert_valid_reg_info(reg_info)

    def test_qRegisterInfo_contains_required_generics_debugserver(self):
        """Check the pc, sp and flags generics (and fp, except on powerpc64le)."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all generic registers found.
        generic_regs = {
            reg_info['generic'] for reg_info in reg_infos if 'generic' in reg_info}

        # Ensure we have a program counter register.
        self.assertIn('pc', generic_regs)

        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
        if self.getArchitecture() != 'powerpc64le':
            self.assertIn('fp', generic_regs)

        # Ensure we have a stack pointer register.
        self.assertIn('sp', generic_regs)

        # Ensure we have a flags register.
        self.assertIn('flags', generic_regs)

    def test_qRegisterInfo_contains_at_least_one_register_set(self):
        """Check that at least one register info entry names a register set."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all register sets found.
        register_sets = {
            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
        self.assertGreaterEqual(len(register_sets), 1)

    def targetHasAVX(self):
        """Return whether the target is expected to expose AVX registers.

        Returns True unconditionally when the selected platform's triple does
        not match ``.*-.*-linux``; otherwise returns whether " avx " appears
        in the target's /proc/cpuinfo (fetched via ``platform get-file`` when
        running against a remote platform).
        """
        triple = self.dbg.GetSelectedPlatform().GetTriple()

        # TODO other platforms, please implement this function.
        # Something different is needed for non-Linux/Android targets; until
        # then they are assumed to have AVX.
        if not re.match(".*-.*-linux", triple):
            return True

        if lldb.remote_platform:
            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
            cpuinfo_path = "cpuinfo"
            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
        else:
            cpuinfo_path = "/proc/cpuinfo"

        # Context manager so the handle is closed even if read() raises.
        with open(cpuinfo_path, 'r') as f:
            cpuinfo = f.read()
        return " avx " in cpuinfo

    @expectedFailureAll(oslist=["windows"])  # no avx for now.
    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
    @add_test_categories(["llgs"])
    def test_qRegisterInfo_contains_avx_registers(self):
        """Check the AVX register set is reported exactly when targetHasAVX()."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)

        # Collect all register sets found.
        register_sets = {
            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
        self.assertEqual(
            self.targetHasAVX(),
            "Advanced Vector Extensions" in register_sets)

    def qThreadInfo_contains_thread(self):
        """Shared body: thread info collection must report exactly one thread."""
        self.prep_debug_monitor_and_inferior()
        self.add_threadinfo_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather threadinfo entries.
        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        # We should have exactly one thread.
        self.assertEqual(len(threads), 1)

    def test_qThreadInfo_contains_thread_launch(self):
        """Run qThreadInfo_contains_thread with a launched inferior."""
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_contains_thread()

    @expectedFailureAll(oslist=["windows"])  # expect one more thread stopped
    def test_qThreadInfo_contains_thread_attach(self):
        """Run qThreadInfo_contains_thread with an attached inferior."""
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_contains_thread()

    def qThreadInfo_matches_qC(self):
        """Shared body: the single reported thread must equal the $qC thread id."""
        self.prep_debug_monitor_and_inferior()

        self.add_threadinfo_collection_packets()
        self.test_sequence.add_log_lines(
            ["read packet: $qC#00",
             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
             ], True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather threadinfo entries.
        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        # We should have exactly one thread from threadinfo.
        self.assertEqual(len(threads), 1)

        # We should have a valid thread_id from $QC.
        QC_thread_id_hex = context.get("thread_id")
        self.assertIsNotNone(QC_thread_id_hex)
        QC_thread_id = int(QC_thread_id_hex, 16)

        # Those two should be the same.
        self.assertEqual(threads[0], QC_thread_id)

    def test_qThreadInfo_matches_qC_launch(self):
        """Run qThreadInfo_matches_qC with a launched inferior."""
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_matches_qC()

    @expectedFailureAll(oslist=["windows"])  # expect one more thread stopped
    def test_qThreadInfo_matches_qC_attach(self):
        """Run qThreadInfo_matches_qC with an attached inferior."""
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_matches_qC()

    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
        """Read each register with $p and check the reply length against bitsize."""
        self.build()
        self.set_inferior_startup_launch()
        self.prep_debug_monitor_and_inferior()
        self.add_register_info_collection_packets()

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        self.assertTrue(len(reg_infos) > 0)

        byte_order = self.get_target_byte_order()

        # Read value for each register.  The index comes from enumerate() so
        # that it advances for skipped entries as well; previously the
        # increment sat at the bottom of the loop and both `continue` paths
        # bypassed it, leaving $p out of step with reg_info.
        # NOTE(review): assumes the $p register number is the entry's position
        # in reg_infos (as find_pc_reg_info's index is used elsewhere in this
        # file) -- confirm.
        for reg_index, reg_info in enumerate(reg_infos):
            # Skip registers that don't have a register set. For x86, these are
            # the DRx registers, which have no LLDB-kind register number and thus
            # cannot be read via normal
            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
            if "set" not in reg_info:
                continue

            # Clear existing packet expectations.
            self.reset_test_sequence()

            # Run the register query
            self.test_sequence.add_log_lines(
                ["read packet: $p{0:x}#00".format(reg_index),
                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
                True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)

            # Skip erroneous (unsupported) registers.
            # TODO: remove this once we make unsupported registers disappear.
            if p_response.startswith("E") and len(p_response) == 3:
                continue

            if "dynamic_size_dwarf_expr_bytes" in reg_info:
                self.updateRegInfoBitsize(reg_info, byte_order)
            # Two hex characters per byte; integer division keeps the
            # expected length an int.
            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) // 8,
                             reg_info)

    def Hg_switches_to_3_threads(self, pass_pid=False):
        """Shared body: $Hg to each of three threads and confirm with $qC.

        When pass_pid is true the thread id is prefixed with
        ``p<inferior pid in hex>.``.
        """
        # Startup the inferior with three threads (main + 2 new ones).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["thread:new", "thread:new"])

        # Let the inferior process have a few moments to start up the thread
        # when launched. (The launch scenario has no time to run, so threads
        # won't be there yet.)
        self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for 3 threads to be present.
        threads = self.wait_for_thread_count(3)
        self.assertEqual(len(threads), 3)

        pid_str = ""
        if pass_pid:
            pid_str = "p{0:x}.".format(procs["inferior"].pid)

        # verify we can $H to each thread, and $qC matches the thread we set.
        for thread in threads:
            # Change to each thread, verify current thread id.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
                 "send packet: $OK#00",
                 "read packet: $qC#00",
                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the thread id.
            self.assertIsNotNone(context.get("thread_id"))
            self.assertEqual(int(context.get("thread_id"), 16), thread)

    @expectedFailureAll(oslist=["windows"])  # expect 4 threads
    def test_Hg_switches_to_3_threads_launch(self):
        """Run Hg_switches_to_3_threads with a launched inferior."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_switches_to_3_threads()

    @expectedFailureAll(oslist=["windows"])  # expecting one more thread
    def test_Hg_switches_to_3_threads_attach(self):
        """Run Hg_switches_to_3_threads with an attached inferior."""
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads()

    @expectedFailureAll(oslist=["windows"])  # expect 4 threads
    @add_test_categories(["llgs"])
    def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self):
        """Run Hg_switches_to_3_threads attached, passing the inferior's pid."""
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads(pass_pid=True)

    def Hg_fails_on_pid(self, pass_pid):
        """Shared body: $Hg with the given pid prefix must be answered with $Eff.

        pass_pid of -1 is sent literally as ``p-1.``; any other value is sent
        as ``p<hex>.``.
        """
        # Start the inferior.
        self.prep_debug_monitor_and_inferior(
            inferior_args=["thread:new"])

        self.run_process_then_stop(run_seconds=1)

        threads = self.wait_for_thread_count(2)
        self.assertEqual(len(threads), 2)

        if pass_pid == -1:
            pid_str = "p-1."
        else:
            pid_str = "p{0:x}.".format(pass_pid)
        thread = threads[1]

        self.test_sequence.add_log_lines(
            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
             "send packet: $Eff#00"],
            True)

        self.expect_gdbremote_sequence()

    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_another_pid(self):
        """Run Hg_fails_on_pid with pid 1."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(1)

    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_zero_pid(self):
        """Run Hg_fails_on_pid with pid 0."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(0)

    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_minus_one_pid(self):
        """Run Hg_fails_on_pid with pid -1."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(-1)

    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
        """Shared body: after each SIGSEGV stop, $Hc + $C delivers SIGUSR1.

        segfault_signo is the signal number expected in the stop reply for the
        segfault.  For each of the NUM_THREADS - 1 stops handled, the stopped
        thread must be new, and the inferior's output must show SIGUSR1 being
        received on a not-yet-seen thread that then gets past the SIGSEGV.
        """
        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
        # and the test requires getting stdout from the exe.

        NUM_THREADS = 3

        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
        # inferior_args=["thread:print-ids"]
        inferior_args = ["thread:segfault"]
        for _ in range(NUM_THREADS - 1):
            # if i > 0:
            # Give time between thread creation/segfaulting for the handler to work.
            # inferior_args.append("sleep:1")
            inferior_args.append("thread:new")
        inferior_args.append("sleep:10")

        # Launch/attach. (In our case, this should only ever be launched since
        # we need inferior stdout/stderr).
        self.prep_debug_monitor_and_inferior(
            inferior_args=inferior_args)
        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
        self.expect_gdbremote_sequence()

        # Let the inferior process have a few moments to start up the thread when launched.
        # context = self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for all threads to be present.
        # threads = self.wait_for_thread_count(NUM_THREADS)
        # self.assertEquals(len(threads), NUM_THREADS)

        signaled_tids = set()
        print_thread_ids = set()

        # Switch to each thread, deliver a signal, and verify signal delivery
        for _ in range(NUM_THREADS - 1):
            # Run until SIGSEGV comes in.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([{"direction": "send",
                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                                               "capture": {1: "signo",
                                                           2: "thread_id"}}],
                                             True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            signo = context.get("signo")
            self.assertEqual(int(signo, 16), segfault_signo)

            # Ensure we haven't seen this tid yet.
            thread_id = int(context.get("thread_id"), 16)
            self.assertNotIn(thread_id, signaled_tids)
            signaled_tids.add(thread_id)

            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    # Set the continue thread.
                    # Set current thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",

                    # Continue sending the signal number to the continue thread.
                    # The commented out packet is a way to do this same operation without using
                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),

                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here). MacOSX debugserver does.
                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently it's an XFAIL.
                    # Need to rectify behavior here. The linux behavior is more intuitive to me since we're essentially swapping out
                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                    # "read packet: $c#63",
                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
                ],
                True)

            # Run the sequence.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Ensure the stop signal is the signal we delivered.
            # stop_signo = context.get("stop_signo")
            # self.assertIsNotNone(stop_signo)
            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))

            # Ensure the stop thread is the thread to which we delivered the signal.
            # stop_thread_id = context.get("stop_thread_id")
            # self.assertIsNotNone(stop_thread_id)
            # self.assertEquals(int(stop_thread_id,16), thread_id)

            # Ensure we haven't seen this thread id yet. The inferior's
            # self-obtained thread ids are not guaranteed to match the stub
            # tids (at least on MacOSX).
            print_thread_id = context.get("print_thread_id")
            self.assertIsNotNone(print_thread_id)
            print_thread_id = int(print_thread_id, 16)
            self.assertNotIn(print_thread_id, print_thread_ids)

            # Now remember this print (i.e. inferior-reflected) thread id and
            # ensure we don't hit it again.
            print_thread_ids.add(print_thread_id)

            # Ensure post signal-handle thread id matches the thread that
            # initially raised the SIGSEGV.
            post_handle_thread_id = context.get("post_handle_thread_id")
            self.assertIsNotNone(post_handle_thread_id)
            post_handle_thread_id = int(post_handle_thread_id, 16)
            self.assertEqual(post_handle_thread_id, print_thread_id)

    @expectedFailureDarwin
    @skipIfWindows  # no SIGSEGV support
    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
    @expectedFailureNetBSD
    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
        """Run Hc_then_Csignal_signals_correct_thread with the platform's segfault signo."""
        self.build()
        self.set_inferior_startup_launch()

        if self.platformIsDarwin():
            # Darwin debugserver translates some signals like SIGSEGV into some gdb
            # expectations about fixed signal numbers.
            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
        else:
            self.Hc_then_Csignal_signals_correct_thread(
                lldbutil.get_signal_number('SIGSEGV'))

    @skipIfWindows  # No pty support to test any inferior output
    def test_m_packet_reads_memory(self):
        """Have the inferior store a message, then read it back with $m."""
        self.build()
        self.set_inferior_startup_launch()
        # This is the memory we will write into the inferior and then ensure we
        # can read back with $m.
        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"

        # Start up the inferior.
        self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "set-message:%s" %
                MEMORY_CONTENTS,
                "get-data-address-hex:g_message",
                "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the message buffer within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "message_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the message address.
        self.assertIsNotNone(context.get("message_address"))
        message_address = int(context.get("message_address"), 16)

        # Grab contents from the inferior.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Ensure what we read from inferior memory is what we wrote.
        self.assertIsNotNone(context.get("read_contents"))
        read_contents = seven.unhexlify(context.get("read_contents"))
        self.assertEqual(read_contents, MEMORY_CONTENTS)

    def test_qMemoryRegionInfo_is_supported(self):
        """Expect an OK reply to a bare $qMemoryRegionInfo query."""
        self.build()
        self.set_inferior_startup_launch()
        # Start up the inferior.
        self.prep_debug_monitor_and_inferior()

        # Ask if it supports $qMemoryRegionInfo.
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo#00",
             "send packet: $OK#00"
             ], True)
        self.expect_gdbremote_sequence()

    def _check_memory_region_of_printed_address(
            self, inferior_arg, address_label, required_permissions):
        """Query the memory region of an address the inferior prints.

        Builds and launches the inferior with [inferior_arg, "sleep:5"],
        captures the hex address from its "<address_label> address: 0x..."
        output line, interrupts it, then queries the memory region for that
        address.  The reply must carry no error, must include every character
        of required_permissions in its permissions, and must span the address.
        """
        self.build()
        self.set_inferior_startup_launch()

        # Start up the inferior.
        self.prep_debug_monitor_and_inferior(
            inferior_args=[inferior_arg, "sleep:5"])

        capture_name = address_label + "_address"

        # Run the process
        self.test_sequence.add_log_lines(
            [
                # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the requested address within the inferior.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match",
                 "regex": self.maybe_strict_output_regex(
                     r"{0} address: 0x([0-9a-fA-F]+)\r\n".format(address_label)),
                 "capture": {1: capture_name}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the address.
        self.assertIsNotNone(context.get(capture_name))
        address = int(context.get(capture_name), 16)

        # Grab memory region info from the inferior.
        self.reset_test_sequence()
        self.add_query_memory_region_packets(address)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        mem_region_dict = self.parse_memory_region_packet(context)

        # Ensure there are no errors reported.
        self.assertNotIn("error", mem_region_dict)

        # Ensure the region grants every required permission.
        self.assertIn("permissions", mem_region_dict)
        for permission in required_permissions:
            self.assertIn(permission, mem_region_dict["permissions"])

        # Ensure the start address and size encompass the address we queried.
        self.assert_address_within_memory_region(address, mem_region_dict)

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
        """A code address must lie in a readable and executable region."""
        self._check_memory_region_of_printed_address(
            "get-code-address-hex:hello", "code", "rx")

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
        """A stack address must lie in a readable and writable region."""
        self._check_memory_region_of_printed_address(
            "get-stack-address-hex:", "stack", "rw")

    @skipIfWindows  # No pty support to test any inferior output
    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
        """A heap address must lie in a readable and writable region."""
        self._check_memory_region_of_printed_address(
            "get-heap-address-hex:", "heap", "rw")

    def breakpoint_set_and_remove_work(self, want_hardware):
        """Shared body: set a breakpoint on hello(), hit it, remove it, finish.

        want_hardware selects the Z1 (hardware) packet type instead of Z0
        (software).  The stop must be SIGTRAP with no inferior output, the PC
        must equal the function address, and after removal the inferior must
        print "hello, world" and exit with $W00.
        """
        # Start up the inferior.
        self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "get-code-address-hex:hello",
                "sleep:1",
                "call-function:hello"])

        # Run the process
        self.add_register_info_collection_packets()
        self.add_process_info_collection_packets()
        self.test_sequence.add_log_lines(
            [  # Start running after initial stop.
                "read packet: $c#63",
                # Match output line that prints the memory address of the function call entry point.
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
                 "capture": {1: "function_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather process info - we need endian of target to handle register
        # value conversions.
        process_info = self.parse_process_info_response(context)
        endian = process_info.get("endian")
        self.assertIsNotNone(endian)

        # Gather register info entries.
        reg_infos = self.parse_register_info_packets(context)
        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
        self.assertIsNotNone(pc_lldb_reg_index)
        self.assertIsNotNone(pc_reg_info)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Get current target architecture
        target_arch = self.getArchitecture()

        # Set the breakpoint.
        if target_arch in ("arm", "aarch64"):
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1

        # Z0 is a software breakpoint; Z1 is used when a hardware breakpoint
        # is requested.
        z_packet_type = 1 if want_hardware else 0

        self.reset_test_sequence()
        self.add_set_breakpoint_packets(
            function_address,
            z_packet_type,
            do_continue=True,
            breakpoint_kind=BREAKPOINT_KIND)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the stop signal reported was the breakpoint signal number.
        stop_signo = context.get("stop_signo")
        self.assertIsNotNone(stop_signo)
        self.assertEqual(int(stop_signo, 16),
                         lldbutil.get_signal_number('SIGTRAP'))

        # Ensure we did not receive any output. If the breakpoint was not set, we would
        # see output (from a launched process with captured stdio) printing a hello, world message.
        # That would indicate the breakpoint didn't take.
        self.assertEqual(len(context["O_content"]), 0)

        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
        # This acts as another validation on the register reading code.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                # Print the PC. This should match the breakpoint address.
                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
                # Capture $p results.
                {"direction": "send",
                 "regex": r"^\$([0-9a-fA-F]+)#",
                 "capture": {1: "p_response"}},
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the PC is where we expect. Note response is in endianness of
        # the inferior.
        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)

        # Convert from target endian to int.
        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
            endian, p_response)
        self.assertEqual(returned_pc, function_address)

        # Verify that a breakpoint remove and continue gets us the expected
        # output.
        self.reset_test_sequence()

        # Add breakpoint remove packets
        self.add_remove_breakpoint_packets(
            function_address,
            z_packet_type,
            breakpoint_kind=BREAKPOINT_KIND)

        self.test_sequence.add_log_lines(
            [
                # Continue running.
                "read packet: $c#63",
                # We should now receive the output from the call.
                {"type": "output_match", "regex": r"^hello, world\r\n$"},
                # And wait for program completion.
                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    @skipIfWindows  # No pty support to test any inferior output
    def test_software_breakpoint_set_and_remove_work(self):
        """Run breakpoint_set_and_remove_work with a software breakpoint."""
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
        else:
            self.build()
        self.set_inferior_startup_launch()
        self.breakpoint_set_and_remove_work(want_hardware=False)

    @skipUnlessPlatform(oslist=['linux'])
    @skipIf(archs=no_match(['arm', 'aarch64']))
    def test_hardware_breakpoint_set_and_remove_work(self):
        """Run breakpoint_set_and_remove_work with a hardware breakpoint."""
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
        else:
            self.build()
        self.set_inferior_startup_launch()
        self.breakpoint_set_and_remove_work(want_hardware=True)

    def get_qSupported_dict(self, features=[]):
        self.build()
        self.set_inferior_startup_launch()

        # Start up the stub and start/prep the inferior.
        procs = self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(features)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Retrieve the qSupported features.
970 return self.parse_qSupported_response(context) 971 972 def test_qSupported_returns_known_stub_features(self): 973 supported_dict = self.get_qSupported_dict() 974 self.assertIsNotNone(supported_dict) 975 self.assertTrue(len(supported_dict) > 0) 976 977 def test_qSupported_auvx(self): 978 expected = ('+' if lldbplatformutil.getPlatform() 979 in ["freebsd", "linux", "netbsd"] else '-') 980 supported_dict = self.get_qSupported_dict() 981 self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected) 982 983 def test_qSupported_libraries_svr4(self): 984 expected = ('+' if lldbplatformutil.getPlatform() 985 in ["freebsd", "linux", "netbsd"] else '-') 986 supported_dict = self.get_qSupported_dict() 987 self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'), 988 expected) 989 990 def test_qSupported_QPassSignals(self): 991 expected = ('+' if lldbplatformutil.getPlatform() 992 in ["freebsd", "linux", "netbsd"] else '-') 993 supported_dict = self.get_qSupported_dict() 994 self.assertEqual(supported_dict.get('QPassSignals', '-'), expected) 995 996 @add_test_categories(["fork"]) 997 def test_qSupported_fork_events(self): 998 supported_dict = ( 999 self.get_qSupported_dict(['multiprocess+', 'fork-events+'])) 1000 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1001 self.assertEqual(supported_dict.get('fork-events', '-'), '+') 1002 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1003 1004 @add_test_categories(["fork"]) 1005 def test_qSupported_fork_events_without_multiprocess(self): 1006 supported_dict = ( 1007 self.get_qSupported_dict(['fork-events+'])) 1008 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1009 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1010 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1011 1012 @add_test_categories(["fork"]) 1013 def test_qSupported_vfork_events(self): 1014 supported_dict = ( 1015 self.get_qSupported_dict(['multiprocess+', 
'vfork-events+'])) 1016 self.assertEqual(supported_dict.get('multiprocess', '-'), '+') 1017 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1018 self.assertEqual(supported_dict.get('vfork-events', '-'), '+') 1019 1020 @add_test_categories(["fork"]) 1021 def test_qSupported_vfork_events_without_multiprocess(self): 1022 supported_dict = ( 1023 self.get_qSupported_dict(['vfork-events+'])) 1024 self.assertEqual(supported_dict.get('multiprocess', '-'), '-') 1025 self.assertEqual(supported_dict.get('fork-events', '-'), '-') 1026 self.assertEqual(supported_dict.get('vfork-events', '-'), '-') 1027 1028 # We need to be able to self.runCmd to get cpuinfo, 1029 # which is not possible when using a remote platform. 1030 @skipIfRemote 1031 def test_qSupported_memory_tagging(self): 1032 supported_dict = self.get_qSupported_dict() 1033 self.assertEqual(supported_dict.get("memory-tagging", '-'), 1034 '+' if self.isAArch64MTE() else '-') 1035 1036 @skipIfWindows # No pty support to test any inferior output 1037 def test_written_M_content_reads_back_correctly(self): 1038 self.build() 1039 self.set_inferior_startup_launch() 1040 1041 TEST_MESSAGE = "Hello, memory" 1042 1043 # Start up the stub and start/prep the inferior. 1044 procs = self.prep_debug_monitor_and_inferior( 1045 inferior_args=[ 1046 "set-message:xxxxxxxxxxxxxX", 1047 "get-data-address-hex:g_message", 1048 "sleep:1", 1049 "print-message:"]) 1050 self.test_sequence.add_log_lines( 1051 [ 1052 # Start running after initial stop. 1053 "read packet: $c#63", 1054 # Match output line that prints the memory address of the message buffer within the inferior. 1055 # Note we require launch-only testing so we can get inferior otuput. 1056 {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 1057 "capture": {1: "message_address"}}, 1058 # Now stop the inferior. 1059 "read packet: {}".format(chr(3)), 1060 # And wait for the stop notification. 
1061 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1062 True) 1063 context = self.expect_gdbremote_sequence() 1064 self.assertIsNotNone(context) 1065 1066 # Grab the message address. 1067 self.assertIsNotNone(context.get("message_address")) 1068 message_address = int(context.get("message_address"), 16) 1069 1070 # Hex-encode the test message, adding null termination. 1071 hex_encoded_message = seven.hexlify(TEST_MESSAGE) 1072 1073 # Write the message to the inferior. Verify that we can read it with the hex-encoded (m) 1074 # and binary (x) memory read packets. 1075 self.reset_test_sequence() 1076 self.test_sequence.add_log_lines( 1077 ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message), 1078 "send packet: $OK#00", 1079 "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1080 "send packet: ${0}#00".format(hex_encoded_message), 1081 "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)), 1082 "send packet: ${0}#00".format(TEST_MESSAGE), 1083 "read packet: $m{0:x},4#00".format(message_address), 1084 "send packet: ${0}#00".format(hex_encoded_message[0:8]), 1085 "read packet: $x{0:x},4#00".format(message_address), 1086 "send packet: ${0}#00".format(TEST_MESSAGE[0:4]), 1087 "read packet: $c#63", 1088 {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}}, 1089 "send packet: $W00#00", 1090 ], True) 1091 context = self.expect_gdbremote_sequence() 1092 self.assertIsNotNone(context) 1093 1094 # Ensure what we read from inferior memory is what we wrote. 1095 printed_message = context.get("printed_message") 1096 self.assertIsNotNone(printed_message) 1097 self.assertEqual(printed_message, TEST_MESSAGE + "X") 1098 1099 # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp). 1100 # Come back to this. 
I have the test rigged to verify that at least some 1101 # of the bit-flip writes work. 1102 def test_P_writes_all_gpr_registers(self): 1103 self.build() 1104 self.set_inferior_startup_launch() 1105 1106 # Start inferior debug session, grab all register info. 1107 procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"]) 1108 self.add_register_info_collection_packets() 1109 self.add_process_info_collection_packets() 1110 1111 context = self.expect_gdbremote_sequence() 1112 self.assertIsNotNone(context) 1113 1114 # Process register infos. 1115 reg_infos = self.parse_register_info_packets(context) 1116 self.assertIsNotNone(reg_infos) 1117 self.add_lldb_register_index(reg_infos) 1118 1119 # Process endian. 1120 process_info = self.parse_process_info_response(context) 1121 endian = process_info.get("endian") 1122 self.assertIsNotNone(endian) 1123 1124 # Pull out the register infos that we think we can bit flip 1125 # successfully,. 1126 gpr_reg_infos = [ 1127 reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)] 1128 self.assertTrue(len(gpr_reg_infos) > 0) 1129 1130 # Write flipped bit pattern of existing value to each register. 1131 (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value( 1132 gpr_reg_infos, endian) 1133 self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes)) 1134 self.assertTrue(successful_writes > 0) 1135 1136 # Note: as of this moment, a hefty number of the GPR writes are failing 1137 # with E32 (everything except rax-rdx, rdi, rsi, rbp). 1138 @skipIfWindows 1139 def test_P_and_p_thread_suffix_work(self): 1140 self.build() 1141 self.set_inferior_startup_launch() 1142 1143 # Startup the inferior with three threads. 
1144 procs = self.prep_debug_monitor_and_inferior( 1145 inferior_args=["thread:new", "thread:new"]) 1146 self.add_thread_suffix_request_packets() 1147 self.add_register_info_collection_packets() 1148 self.add_process_info_collection_packets() 1149 1150 context = self.expect_gdbremote_sequence() 1151 self.assertIsNotNone(context) 1152 1153 process_info = self.parse_process_info_response(context) 1154 self.assertIsNotNone(process_info) 1155 endian = process_info.get("endian") 1156 self.assertIsNotNone(endian) 1157 1158 reg_infos = self.parse_register_info_packets(context) 1159 self.assertIsNotNone(reg_infos) 1160 self.add_lldb_register_index(reg_infos) 1161 1162 reg_index = self.select_modifiable_register(reg_infos) 1163 self.assertIsNotNone(reg_index) 1164 reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8 1165 self.assertTrue(reg_byte_size > 0) 1166 1167 # Run the process a bit so threads can start up, and collect register 1168 # info. 1169 context = self.run_process_then_stop(run_seconds=1) 1170 self.assertIsNotNone(context) 1171 1172 # Wait for 3 threads to be present. 1173 threads = self.wait_for_thread_count(3) 1174 self.assertEqual(len(threads), 3) 1175 1176 expected_reg_values = [] 1177 register_increment = 1 1178 next_value = None 1179 1180 # Set the same register in each of 3 threads to a different value. 1181 # Verify each one has the unique value. 1182 for thread in threads: 1183 # If we don't have a next value yet, start it with the initial read 1184 # value + 1 1185 if not next_value: 1186 # Read pre-existing register value. 1187 self.reset_test_sequence() 1188 self.test_sequence.add_log_lines( 1189 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1190 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1191 ], True) 1192 context = self.expect_gdbremote_sequence() 1193 self.assertIsNotNone(context) 1194 1195 # Set the next value to use for writing as the increment plus 1196 # current value. 
1197 p_response = context.get("p_response") 1198 self.assertIsNotNone(p_response) 1199 next_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1200 endian, p_response) 1201 1202 # Set new value using P and thread suffix. 1203 self.reset_test_sequence() 1204 self.test_sequence.add_log_lines( 1205 [ 1206 "read packet: $P{0:x}={1};thread:{2:x}#00".format( 1207 reg_index, 1208 lldbgdbserverutils.pack_register_hex( 1209 endian, 1210 next_value, 1211 byte_size=reg_byte_size), 1212 thread), 1213 "send packet: $OK#00", 1214 ], 1215 True) 1216 context = self.expect_gdbremote_sequence() 1217 self.assertIsNotNone(context) 1218 1219 # Save the value we set. 1220 expected_reg_values.append(next_value) 1221 1222 # Increment value for next thread to use (we want them all 1223 # different so we can verify they wrote to each thread correctly 1224 # next.) 1225 next_value += register_increment 1226 1227 # Revisit each thread and verify they have the expected value set for 1228 # the register we wrote. 1229 thread_index = 0 1230 for thread in threads: 1231 # Read pre-existing register value. 1232 self.reset_test_sequence() 1233 self.test_sequence.add_log_lines( 1234 ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread), 1235 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1236 ], True) 1237 context = self.expect_gdbremote_sequence() 1238 self.assertIsNotNone(context) 1239 1240 # Get the register value. 1241 p_response = context.get("p_response") 1242 self.assertIsNotNone(p_response) 1243 read_value = lldbgdbserverutils.unpack_register_hex_unsigned( 1244 endian, p_response) 1245 1246 # Make sure we read back what we wrote. 1247 self.assertEqual(read_value, expected_reg_values[thread_index]) 1248 thread_index += 1 1249