1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseBase(TestBase): 31 32 NO_DEBUG_INFO_TESTCASE = True 33 34 _TIMEOUT_SECONDS = 120 35 36 _GDBREMOTE_KILL_PACKET = "$k#6b" 37 38 # Start the inferior separately, attach to the inferior on the stub 39 # command line. 40 _STARTUP_ATTACH = "attach" 41 # Start the inferior separately, start the stub without attaching, allow 42 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 43 _STARTUP_ATTACH_MANUALLY = "attach_manually" 44 # Start the stub, and launch the inferior with an $A packet via the 45 # initial packet stream. 46 _STARTUP_LAUNCH = "launch" 47 48 # GDB Signal numbers that are not target-specific used for common 49 # exceptions 50 TARGET_EXC_BAD_ACCESS = 0x91 51 TARGET_EXC_BAD_INSTRUCTION = 0x92 52 TARGET_EXC_ARITHMETIC = 0x93 53 TARGET_EXC_EMULATION = 0x94 54 TARGET_EXC_SOFTWARE = 0x95 55 TARGET_EXC_BREAKPOINT = 0x96 56 57 _verbose_log_handler = None 58 _log_formatter = logging.Formatter( 59 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 60 61 def setUpBaseLogging(self): 62 self.logger = logging.getLogger(__name__) 63 64 if len(self.logger.handlers) > 0: 65 return # We have set up this handler already 66 67 self.logger.propagate = False 68 self.logger.setLevel(logging.DEBUG) 69 70 # log all warnings to stderr 71 handler = logging.StreamHandler() 72 handler.setLevel(logging.WARNING) 73 handler.setFormatter(self._log_formatter) 74 self.logger.addHandler(handler) 75 76 def isVerboseLoggingRequested(self): 77 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 78 # logged. 79 return any(("gdb-remote" in channel) 80 for channel in lldbtest_config.channels) 81 82 def setUp(self): 83 TestBase.setUp(self) 84 85 self.setUpBaseLogging() 86 self.debug_monitor_extra_args = [] 87 self._pump_queues = socket_packet_pump.PumpQueues() 88 89 if self.isVerboseLoggingRequested(): 90 # If requested, full logs go to a log file 91 self._verbose_log_handler = logging.FileHandler( 92 self.log_basename + "-host.log") 93 self._verbose_log_handler.setFormatter(self._log_formatter) 94 self._verbose_log_handler.setLevel(logging.DEBUG) 95 self.logger.addHandler(self._verbose_log_handler) 96 97 self.test_sequence = GdbRemoteTestSequence(self.logger) 98 self.set_inferior_startup_launch() 99 self.port = self.get_next_port() 100 self.named_pipe_path = None 101 self.named_pipe = None 102 self.named_pipe_fd = None 103 self.stub_sends_two_stop_notifications_on_kill = False 104 if configuration.lldb_platform_url: 105 if configuration.lldb_platform_url.startswith('unix-'): 106 url_pattern = '(.+)://\[?(.+?)\]?/.*' 107 else: 108 url_pattern = '(.+)://(.+):\d+' 109 scheme, host = re.match( 110 url_pattern, configuration.lldb_platform_url).groups() 111 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 112 self.stub_device = host 113 self.stub_hostname = 'localhost' 114 else: 115 self.stub_device = None 116 self.stub_hostname = host 117 else: 118 self.stub_hostname = "localhost" 119 120 def tearDown(self): 121 self._pump_queues.verify_queues_empty() 122 123 self.logger.removeHandler(self._verbose_log_handler) 124 self._verbose_log_handler = None 125 TestBase.tearDown(self) 126 127 def getLocalServerLogFile(self): 128 return self.log_basename + "-server.log" 129 130 def setUpServerLogging(self, is_llgs): 131 if len(lldbtest_config.channels) == 0: 132 return # No logging requested 133 134 if lldb.remote_platform: 135 log_file = lldbutil.join_remote_paths( 136 lldb.remote_platform.GetWorkingDirectory(), "server.log") 137 else: 138 log_file = self.getLocalServerLogFile() 139 140 if is_llgs: 141 self.debug_monitor_extra_args.append("--log-file=" + log_file) 142 self.debug_monitor_extra_args.append( 143 "--log-channels={}".format(":".join(lldbtest_config.channels))) 144 else: 145 self.debug_monitor_extra_args = [ 146 "--log-file=" + log_file, "--log-flags=0x800000"] 147 148 def get_next_port(self): 149 return 12000 + random.randint(0, 3999) 150 151 def reset_test_sequence(self): 152 self.test_sequence = GdbRemoteTestSequence(self.logger) 153 154 def create_named_pipe(self): 155 # Create a temp dir and name for a pipe. 156 temp_dir = tempfile.mkdtemp() 157 named_pipe_path = os.path.join(temp_dir, "stub_port_number") 158 159 # Create the named pipe. 160 os.mkfifo(named_pipe_path) 161 162 # Open the read side of the pipe in non-blocking mode. This will 163 # return right away, ready or not. 164 named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK) 165 166 # Create the file for the named pipe. Note this will follow semantics of 167 # a non-blocking read side of a named pipe, which has different semantics 168 # than a named pipe opened for read in non-blocking mode. 169 named_pipe = os.fdopen(named_pipe_fd, "r") 170 self.assertIsNotNone(named_pipe) 171 172 def shutdown_named_pipe(): 173 # Close the pipe. 174 try: 175 named_pipe.close() 176 except: 177 print("failed to close named pipe") 178 None 179 180 # Delete the pipe. 181 try: 182 os.remove(named_pipe_path) 183 except: 184 print("failed to delete named pipe: {}".format(named_pipe_path)) 185 None 186 187 # Delete the temp directory. 188 try: 189 os.rmdir(temp_dir) 190 except: 191 print( 192 "failed to delete temp dir: {}, directory contents: '{}'".format( 193 temp_dir, os.listdir(temp_dir))) 194 None 195 196 # Add the shutdown hook to clean up the named pipe. 197 self.addTearDownHook(shutdown_named_pipe) 198 199 # Clear the port so the stub selects a port number. 200 self.port = 0 201 202 return (named_pipe_path, named_pipe, named_pipe_fd) 203 204 def get_stub_port_from_named_socket(self, read_timeout_seconds=5): 205 # Wait for something to read with a max timeout. 206 (ready_readers, _, _) = select.select( 207 [self.named_pipe_fd], [], [], read_timeout_seconds) 208 self.assertIsNotNone( 209 ready_readers, 210 "write side of pipe has not written anything - stub isn't writing to pipe.") 211 self.assertNotEqual( 212 len(ready_readers), 213 0, 214 "write side of pipe has not written anything - stub isn't writing to pipe.") 215 216 # Read the port from the named pipe. 217 stub_port_raw = self.named_pipe.read() 218 self.assertIsNotNone(stub_port_raw) 219 self.assertNotEqual( 220 len(stub_port_raw), 221 0, 222 "no content to read on pipe") 223 224 # Trim null byte, convert to int. 225 stub_port_raw = stub_port_raw[:-1] 226 stub_port = int(stub_port_raw) 227 self.assertTrue(stub_port > 0) 228 229 return stub_port 230 231 def init_llgs_test(self, use_named_pipe=True): 232 if lldb.remote_platform: 233 # Remote platforms don't support named pipe based port negotiation 234 use_named_pipe = False 235 236 triple = self.dbg.GetSelectedPlatform().GetTriple() 237 if re.match(".*-.*-windows", triple): 238 self.skipTest("Remotely testing is not supported on Windows yet.") 239 240 # Grab the ppid from /proc/[shell pid]/stat 241 err, retcode, shell_stat = self.run_platform_command( 242 "cat /proc/$$/stat") 243 self.assertTrue( 244 err.Success() and retcode == 0, 245 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 246 (err.GetCString(), 247 retcode)) 248 249 # [pid] ([executable]) [state] [*ppid*] 250 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 251 err, retcode, ls_output = self.run_platform_command( 252 "ls -l /proc/%s/exe" % pid) 253 self.assertTrue( 254 err.Success() and retcode == 0, 255 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 256 (pid, 257 err.GetCString(), 258 retcode)) 259 exe = ls_output.split()[-1] 260 261 # If the binary has been deleted, the link name has " (deleted)" appended. 262 # Remove if it's there. 263 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 264 else: 265 # Need to figure out how to create a named pipe on Windows. 266 if platform.system() == 'Windows': 267 use_named_pipe = False 268 269 self.debug_monitor_exe = get_lldb_server_exe() 270 if not self.debug_monitor_exe: 271 self.skipTest("lldb-server exe not found") 272 273 self.debug_monitor_extra_args = ["gdbserver"] 274 self.setUpServerLogging(is_llgs=True) 275 276 if use_named_pipe: 277 (self.named_pipe_path, self.named_pipe, 278 self.named_pipe_fd) = self.create_named_pipe() 279 280 def init_debugserver_test(self, use_named_pipe=True): 281 self.debug_monitor_exe = get_debugserver_exe() 282 if not self.debug_monitor_exe: 283 self.skipTest("debugserver exe not found") 284 self.setUpServerLogging(is_llgs=False) 285 if use_named_pipe: 286 (self.named_pipe_path, self.named_pipe, 287 self.named_pipe_fd) = self.create_named_pipe() 288 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 289 # when the process truly dies. 290 self.stub_sends_two_stop_notifications_on_kill = True 291 292 def forward_adb_port(self, source, target, direction, device): 293 adb = ['adb'] + (['-s', device] if device else []) + [direction] 294 295 def remove_port_forward(): 296 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 297 298 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 299 self.addTearDownHook(remove_port_forward) 300 301 def _verify_socket(self, sock): 302 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 303 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 304 # the connect() will always be successful, but the connection will be immediately dropped 305 # if ADB could not connect on the remote side. This function tries to detect this 306 # situation, and report it as "connection refused" so that the upper layers attempt the 307 # connection again. 308 triple = self.dbg.GetSelectedPlatform().GetTriple() 309 if not re.match(".*-.*-.*-android", triple): 310 return # Not android. 311 can_read, _, _ = select.select([sock], [], [], 0.1) 312 if sock not in can_read: 313 return # Data is not available, but the connection is alive. 314 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 315 raise _ConnectionRefused() # Got EOF, connection dropped. 316 317 def create_socket(self): 318 sock = socket.socket() 319 logger = self.logger 320 321 triple = self.dbg.GetSelectedPlatform().GetTriple() 322 if re.match(".*-.*-.*-android", triple): 323 self.forward_adb_port( 324 self.port, 325 self.port, 326 "forward", 327 self.stub_device) 328 329 logger.info( 330 "Connecting to debug monitor on %s:%d", 331 self.stub_hostname, 332 self.port) 333 connect_info = (self.stub_hostname, self.port) 334 try: 335 sock.connect(connect_info) 336 except socket.error as serr: 337 if serr.errno == errno.ECONNREFUSED: 338 raise _ConnectionRefused() 339 raise serr 340 341 def shutdown_socket(): 342 if sock: 343 try: 344 # send the kill packet so lldb-server shuts down gracefully 345 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 346 except: 347 logger.warning( 348 "failed to send kill packet to debug monitor: {}; ignoring".format( 349 sys.exc_info()[0])) 350 351 try: 352 sock.close() 353 except: 354 logger.warning( 355 "failed to close socket to debug monitor: {}; ignoring".format( 356 sys.exc_info()[0])) 357 358 self.addTearDownHook(shutdown_socket) 359 360 self._verify_socket(sock) 361 362 return sock 363 364 def set_inferior_startup_launch(self): 365 self._inferior_startup = self._STARTUP_LAUNCH 366 367 def set_inferior_startup_attach(self): 368 self._inferior_startup = self._STARTUP_ATTACH 369 370 def set_inferior_startup_attach_manually(self): 371 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 372 373 def get_debug_monitor_command_line_args(self, attach_pid=None): 374 if lldb.remote_platform: 375 commandline_args = self.debug_monitor_extra_args + \ 376 ["*:{}".format(self.port)] 377 else: 378 commandline_args = self.debug_monitor_extra_args + \ 379 ["127.0.0.1:{}".format(self.port)] 380 381 if attach_pid: 382 commandline_args += ["--attach=%d" % attach_pid] 383 if self.named_pipe_path: 384 commandline_args += ["--named-pipe", self.named_pipe_path] 385 return commandline_args 386 387 def get_target_byte_order(self): 388 inferior_exe_path = self.getBuildArtifact("a.out") 389 target = self.dbg.CreateTarget(inferior_exe_path) 390 return target.GetByteOrder() 391 392 def launch_debug_monitor(self, attach_pid=None, logfile=None): 393 # Create the command line. 394 commandline_args = self.get_debug_monitor_command_line_args( 395 attach_pid=attach_pid) 396 397 # Start the server. 398 server = self.spawnSubprocess( 399 self.debug_monitor_exe, 400 commandline_args, 401 install_remote=False) 402 self.addTearDownHook(self.cleanupSubprocesses) 403 self.assertIsNotNone(server) 404 405 # If we're receiving the stub's listening port from the named pipe, do 406 # that here. 407 if self.named_pipe: 408 self.port = self.get_stub_port_from_named_socket() 409 410 return server 411 412 def connect_to_debug_monitor(self, attach_pid=None): 413 if self.named_pipe: 414 # Create the stub. 415 server = self.launch_debug_monitor(attach_pid=attach_pid) 416 self.assertIsNotNone(server) 417 418 def shutdown_debug_monitor(): 419 try: 420 server.terminate() 421 except: 422 logger.warning( 423 "failed to terminate server for debug monitor: {}; ignoring".format( 424 sys.exc_info()[0])) 425 self.addTearDownHook(shutdown_debug_monitor) 426 427 # Schedule debug monitor to be shut down during teardown. 428 logger = self.logger 429 430 # Attach to the stub and return a socket opened to it. 431 self.sock = self.create_socket() 432 return server 433 434 # We're using a random port algorithm to try not to collide with other ports, 435 # and retry a max # times. 436 attempts = 0 437 MAX_ATTEMPTS = 20 438 439 while attempts < MAX_ATTEMPTS: 440 server = self.launch_debug_monitor(attach_pid=attach_pid) 441 442 # Schedule debug monitor to be shut down during teardown. 443 logger = self.logger 444 445 def shutdown_debug_monitor(): 446 try: 447 server.terminate() 448 except: 449 logger.warning( 450 "failed to terminate server for debug monitor: {}; ignoring".format( 451 sys.exc_info()[0])) 452 self.addTearDownHook(shutdown_debug_monitor) 453 454 connect_attemps = 0 455 MAX_CONNECT_ATTEMPTS = 10 456 457 while connect_attemps < MAX_CONNECT_ATTEMPTS: 458 # Create a socket to talk to the server 459 try: 460 logger.info("Connect attempt %d", connect_attemps + 1) 461 self.sock = self.create_socket() 462 return server 463 except _ConnectionRefused as serr: 464 # Ignore, and try again. 465 pass 466 time.sleep(0.5) 467 connect_attemps += 1 468 469 # We should close the server here to be safe. 470 server.terminate() 471 472 # Increment attempts. 473 print( 474 "connect to debug monitor on port %d failed, attempt #%d of %d" % 475 (self.port, attempts + 1, MAX_ATTEMPTS)) 476 attempts += 1 477 478 # And wait a random length of time before next attempt, to avoid 479 # collisions. 480 time.sleep(random.randint(1, 5)) 481 482 # Now grab a new port number. 483 self.port = self.get_next_port() 484 485 raise Exception( 486 "failed to create a socket to the launched debug monitor after %d tries" % 487 attempts) 488 489 def launch_process_for_attach( 490 self, 491 inferior_args=None, 492 sleep_seconds=3, 493 exe_path=None): 494 # We're going to start a child process that the debug monitor stub can later attach to. 495 # This process needs to be started so that it just hangs around for a while. We'll 496 # have it sleep. 497 if not exe_path: 498 exe_path = self.getBuildArtifact("a.out") 499 500 args = [] 501 if inferior_args: 502 args.extend(inferior_args) 503 if sleep_seconds: 504 args.append("sleep:%d" % sleep_seconds) 505 506 inferior = self.spawnSubprocess(exe_path, args) 507 508 def shutdown_process_for_attach(): 509 try: 510 inferior.terminate() 511 except: 512 logger.warning( 513 "failed to terminate inferior process for attach: {}; ignoring".format( 514 sys.exc_info()[0])) 515 self.addTearDownHook(shutdown_process_for_attach) 516 return inferior 517 518 def prep_debug_monitor_and_inferior( 519 self, 520 inferior_args=None, 521 inferior_sleep_seconds=3, 522 inferior_exe_path=None, 523 inferior_env=None): 524 """Prep the debug monitor, the inferior, and the expected packet stream. 525 526 Handle the separate cases of using the debug monitor in attach-to-inferior mode 527 and in launch-inferior mode. 528 529 For attach-to-inferior mode, the inferior process is first started, then 530 the debug monitor is started in attach to pid mode (using --attach on the 531 stub command line), and the no-ack-mode setup is appended to the packet 532 stream. The packet stream is not yet executed, ready to have more expected 533 packet entries added to it. 534 535 For launch-inferior mode, the stub is first started, then no ack mode is 536 setup on the expected packet stream, then the verified launch packets are added 537 to the expected socket stream. The packet stream is not yet executed, ready 538 to have more expected packet entries added to it. 539 540 The return value is: 541 {inferior:<inferior>, server:<server>} 542 """ 543 inferior = None 544 attach_pid = None 545 546 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 547 # Launch the process that we'll use as the inferior. 548 inferior = self.launch_process_for_attach( 549 inferior_args=inferior_args, 550 sleep_seconds=inferior_sleep_seconds, 551 exe_path=inferior_exe_path) 552 self.assertIsNotNone(inferior) 553 self.assertTrue(inferior.pid > 0) 554 if self._inferior_startup == self._STARTUP_ATTACH: 555 # In this case, we want the stub to attach via the command 556 # line, so set the command line attach pid here. 557 attach_pid = inferior.pid 558 559 if self._inferior_startup == self._STARTUP_LAUNCH: 560 # Build launch args 561 if not inferior_exe_path: 562 inferior_exe_path = self.getBuildArtifact("a.out") 563 564 if lldb.remote_platform: 565 remote_path = lldbutil.append_to_process_working_directory(self, 566 os.path.basename(inferior_exe_path)) 567 remote_file_spec = lldb.SBFileSpec(remote_path, False) 568 err = lldb.remote_platform.Install(lldb.SBFileSpec( 569 inferior_exe_path, True), remote_file_spec) 570 if err.Fail(): 571 raise Exception( 572 "remote_platform.Install('%s', '%s') failed: %s" % 573 (inferior_exe_path, remote_path, err)) 574 inferior_exe_path = remote_path 575 576 launch_args = [inferior_exe_path] 577 if inferior_args: 578 launch_args.extend(inferior_args) 579 580 # Launch the debug monitor stub, attaching to the inferior. 581 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 582 self.assertIsNotNone(server) 583 584 # Build the expected protocol stream 585 self.add_no_ack_remote_stream() 586 if inferior_env: 587 for name, value in inferior_env.items(): 588 self.add_set_environment_packets(name, value) 589 if self._inferior_startup == self._STARTUP_LAUNCH: 590 self.add_verified_launch_packets(launch_args) 591 592 return {"inferior": inferior, "server": server} 593 594 def expect_socket_recv( 595 self, 596 sock, 597 expected_content_regex, 598 timeout_seconds): 599 response = "" 600 timeout_time = time.time() + timeout_seconds 601 602 while not expected_content_regex.match( 603 response) and time.time() < timeout_time: 604 can_read, _, _ = select.select([sock], [], [], timeout_seconds) 605 if can_read and sock in can_read: 606 recv_bytes = sock.recv(4096) 607 if recv_bytes: 608 response += seven.bitcast_to_string(recv_bytes) 609 610 self.assertTrue(expected_content_regex.match(response)) 611 612 def expect_socket_send(self, sock, content, timeout_seconds): 613 request_bytes_remaining = content 614 timeout_time = time.time() + timeout_seconds 615 616 while len(request_bytes_remaining) > 0 and time.time() < timeout_time: 617 _, can_write, _ = select.select([], [sock], [], timeout_seconds) 618 if can_write and sock in can_write: 619 written_byte_count = sock.send(request_bytes_remaining.encode()) 620 request_bytes_remaining = request_bytes_remaining[ 621 written_byte_count:] 622 self.assertEqual(len(request_bytes_remaining), 0) 623 624 def do_handshake(self, stub_socket, timeout_seconds=5): 625 # Write the ack. 626 self.expect_socket_send(stub_socket, "+", timeout_seconds) 627 628 # Send the start no ack mode packet. 629 NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0" 630 bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode()) 631 self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST)) 632 633 # Receive the ack and "OK" 634 self.expect_socket_recv(stub_socket, re.compile( 635 r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds) 636 637 # Send the final ack. 638 self.expect_socket_send(stub_socket, "+", timeout_seconds) 639 640 def add_no_ack_remote_stream(self): 641 self.test_sequence.add_log_lines( 642 ["read packet: +", 643 "read packet: $QStartNoAckMode#b0", 644 "send packet: +", 645 "send packet: $OK#9a", 646 "read packet: +"], 647 True) 648 649 def add_verified_launch_packets(self, launch_args): 650 self.test_sequence.add_log_lines( 651 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 652 "send packet: $OK#00", 653 "read packet: $qLaunchSuccess#a5", 654 "send packet: $OK#00"], 655 True) 656 657 def add_thread_suffix_request_packets(self): 658 self.test_sequence.add_log_lines( 659 ["read packet: $QThreadSuffixSupported#e4", 660 "send packet: $OK#00", 661 ], True) 662 663 def add_process_info_collection_packets(self): 664 self.test_sequence.add_log_lines( 665 ["read packet: $qProcessInfo#dc", 666 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 667 True) 668 669 def add_set_environment_packets(self, name, value): 670 self.test_sequence.add_log_lines( 671 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 672 "send packet: $OK#00", 673 ], True) 674 675 _KNOWN_PROCESS_INFO_KEYS = [ 676 "pid", 677 "parent-pid", 678 "real-uid", 679 "real-gid", 680 "effective-uid", 681 "effective-gid", 682 "cputype", 683 "cpusubtype", 684 "ostype", 685 "triple", 686 "vendor", 687 "endian", 688 "elf_abi", 689 "ptrsize" 690 ] 691 692 def parse_process_info_response(self, context): 693 # Ensure we have a process info response. 694 self.assertIsNotNone(context) 695 process_info_raw = context.get("process_info_raw") 696 self.assertIsNotNone(process_info_raw) 697 698 # Pull out key:value; pairs. 699 process_info_dict = { 700 match.group(1): match.group(2) for match in re.finditer( 701 r"([^:]+):([^;]+);", process_info_raw)} 702 703 # Validate keys are known. 704 for (key, val) in list(process_info_dict.items()): 705 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 706 self.assertIsNotNone(val) 707 708 return process_info_dict 709 710 def add_register_info_collection_packets(self): 711 self.test_sequence.add_log_lines( 712 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 713 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 714 "save_key": "reg_info_responses"}], 715 True) 716 717 def parse_register_info_packets(self, context): 718 """Return an array of register info dictionaries, one per register info.""" 719 reg_info_responses = context.get("reg_info_responses") 720 self.assertIsNotNone(reg_info_responses) 721 722 # Parse register infos. 723 return [parse_reg_info_response(reg_info_response) 724 for reg_info_response in reg_info_responses] 725 726 def expect_gdbremote_sequence(self, timeout_seconds=None): 727 if not timeout_seconds: 728 timeout_seconds = self._TIMEOUT_SECONDS 729 return expect_lldb_gdbserver_replay( 730 self, 731 self.sock, 732 self.test_sequence, 733 self._pump_queues, 734 timeout_seconds, 735 self.logger) 736 737 _KNOWN_REGINFO_KEYS = [ 738 "name", 739 "alt-name", 740 "bitsize", 741 "offset", 742 "encoding", 743 "format", 744 "set", 745 "gcc", 746 "ehframe", 747 "dwarf", 748 "generic", 749 "container-regs", 750 "invalidate-regs", 751 "dynamic_size_dwarf_expr_bytes", 752 "dynamic_size_dwarf_len" 753 ] 754 755 def assert_valid_reg_info(self, reg_info): 756 # Assert we know about all the reginfo keys parsed. 757 for key in reg_info: 758 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 759 760 # Check the bare-minimum expected set of register info keys. 761 self.assertTrue("name" in reg_info) 762 self.assertTrue("bitsize" in reg_info) 763 self.assertTrue("offset" in reg_info) 764 self.assertTrue("encoding" in reg_info) 765 self.assertTrue("format" in reg_info) 766 767 def find_pc_reg_info(self, reg_infos): 768 lldb_reg_index = 0 769 for reg_info in reg_infos: 770 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 771 return (lldb_reg_index, reg_info) 772 lldb_reg_index += 1 773 774 return (None, None) 775 776 def add_lldb_register_index(self, reg_infos): 777 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 778 779 We'll use this when we want to call packets like P/p with a register index but do so 780 on only a subset of the full register info set. 781 """ 782 self.assertIsNotNone(reg_infos) 783 784 reg_index = 0 785 for reg_info in reg_infos: 786 reg_info["lldb_register_index"] = reg_index 787 reg_index += 1 788 789 def add_query_memory_region_packets(self, address): 790 self.test_sequence.add_log_lines( 791 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 792 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 793 True) 794 795 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 796 self.assertIsNotNone(key_val_text) 797 kv_dict = {} 798 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 799 key = match.group(1) 800 val = match.group(2) 801 if key in kv_dict: 802 if allow_dupes: 803 if isinstance(kv_dict[key], list): 804 kv_dict[key].append(val) 805 else: 806 # Promote to list 807 kv_dict[key] = [kv_dict[key], val] 808 else: 809 self.fail( 810 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 811 key, val, key_val_text, kv_dict)) 812 else: 813 kv_dict[key] = val 814 return kv_dict 815 816 def parse_memory_region_packet(self, context): 817 # Ensure we have a context. 818 self.assertIsNotNone(context.get("memory_region_response")) 819 820 # Pull out key:value; pairs. 821 mem_region_dict = self.parse_key_val_dict( 822 context.get("memory_region_response")) 823 824 # Validate keys are known. 825 for (key, val) in list(mem_region_dict.items()): 826 self.assertTrue( 827 key in [ 828 "start", 829 "size", 830 "permissions", 831 "name", 832 "error"]) 833 self.assertIsNotNone(val) 834 835 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 836 # Return the dictionary of key-value pairs for the memory region. 837 return mem_region_dict 838 839 def assert_address_within_memory_region( 840 self, test_address, mem_region_dict): 841 self.assertIsNotNone(mem_region_dict) 842 self.assertTrue("start" in mem_region_dict) 843 self.assertTrue("size" in mem_region_dict) 844 845 range_start = int(mem_region_dict["start"], 16) 846 range_size = int(mem_region_dict["size"], 16) 847 range_end = range_start + range_size 848 849 if test_address < range_start: 850 self.fail( 851 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 852 test_address, 853 range_start, 854 range_end, 855 range_size)) 856 elif test_address >= range_end: 857 self.fail( 858 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 859 test_address, 860 range_start, 861 range_end, 862 range_size)) 863 864 def add_threadinfo_collection_packets(self): 865 self.test_sequence.add_log_lines( 866 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 867 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 868 "save_key": "threadinfo_responses"}], 869 True) 870 871 def parse_threadinfo_packets(self, context): 872 """Return an array of thread ids (decimal ints), one per thread.""" 873 threadinfo_responses = context.get("threadinfo_responses") 874 self.assertIsNotNone(threadinfo_responses) 875 876 thread_ids = [] 877 for threadinfo_response in threadinfo_responses: 878 new_thread_infos = parse_threadinfo_response(threadinfo_response) 879 thread_ids.extend(new_thread_infos) 880 return thread_ids 881 882 def wait_for_thread_count(self, thread_count, timeout_seconds=3): 883 start_time = time.time() 884 timeout_time = start_time + timeout_seconds 885 886 actual_thread_count = 0 887 while actual_thread_count < thread_count: 888 self.reset_test_sequence() 889 self.add_threadinfo_collection_packets() 890 891 context = self.expect_gdbremote_sequence() 892 self.assertIsNotNone(context) 893 894 threads = self.parse_threadinfo_packets(context) 895 self.assertIsNotNone(threads) 896 897 actual_thread_count = len(threads) 898 899 if time.time() > timeout_time: 900 raise Exception( 901 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format( 902 timeout_seconds, thread_count, actual_thread_count)) 903 904 return threads 905 906 def add_set_breakpoint_packets( 907 self, 908 address, 909 z_packet_type=0, 910 do_continue=True, 911 breakpoint_kind=1): 912 self.test_sequence.add_log_lines( 913 [ # Set the breakpoint. 914 "read packet: $Z{2},{0:x},{1}#00".format( 915 address, breakpoint_kind, z_packet_type), 916 # Verify the stub could set it. 917 "send packet: $OK#00", 918 ], True) 919 920 if (do_continue): 921 self.test_sequence.add_log_lines( 922 [ # Continue the inferior. 923 "read packet: $c#63", 924 # Expect a breakpoint stop report. 925 {"direction": "send", 926 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 927 "capture": {1: "stop_signo", 928 2: "stop_thread_id"}}, 929 ], True) 930 931 def add_remove_breakpoint_packets( 932 self, 933 address, 934 z_packet_type=0, 935 breakpoint_kind=1): 936 self.test_sequence.add_log_lines( 937 [ # Remove the breakpoint. 938 "read packet: $z{2},{0:x},{1}#00".format( 939 address, breakpoint_kind, z_packet_type), 940 # Verify the stub could unset it. 941 "send packet: $OK#00", 942 ], True) 943 944 def add_qSupported_packets(self): 945 self.test_sequence.add_log_lines( 946 ["read packet: $qSupported#00", 947 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 948 ], True) 949 950 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 951 "augmented-libraries-svr4-read", 952 "PacketSize", 953 "QStartNoAckMode", 954 "QThreadSuffixSupported", 955 "QListThreadsInStopReply", 956 "qXfer:auxv:read", 957 "qXfer:libraries:read", 958 "qXfer:libraries-svr4:read", 959 "qXfer:features:read", 960 "qEcho", 961 "QPassSignals" 962 ] 963 964 def parse_qSupported_response(self, context): 965 self.assertIsNotNone(context) 966 967 raw_response = context.get("qSupported_response") 968 self.assertIsNotNone(raw_response) 969 970 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 971 # +,-,? is stripped from the key and set as the value. 972 supported_dict = {} 973 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 974 key = match.group(1) 975 val = match.group(3) 976 977 # key=val: store as is 978 if val and len(val) > 0: 979 supported_dict[key] = val 980 else: 981 if len(key) < 2: 982 raise Exception( 983 "singular stub feature is too short: must be stub_feature{+,-,?}") 984 supported_type = key[-1] 985 key = key[:-1] 986 if not supported_type in ["+", "-", "?"]: 987 raise Exception( 988 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 989 supported_dict[key] = supported_type 990 # Ensure we know the supported element 991 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 992 raise Exception( 993 "unknown qSupported stub feature reported: %s" % 994 key) 995 996 return supported_dict 997 998 def run_process_then_stop(self, run_seconds=1): 999 # Tell the stub to continue. 1000 self.test_sequence.add_log_lines( 1001 ["read packet: $vCont;c#a8"], 1002 True) 1003 context = self.expect_gdbremote_sequence() 1004 1005 # Wait for run_seconds. 1006 time.sleep(run_seconds) 1007 1008 # Send an interrupt, capture a T response. 1009 self.reset_test_sequence() 1010 self.test_sequence.add_log_lines( 1011 ["read packet: {}".format(chr(3)), 1012 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], 1013 True) 1014 context = self.expect_gdbremote_sequence() 1015 self.assertIsNotNone(context) 1016 self.assertIsNotNone(context.get("stop_result")) 1017 1018 return context 1019 1020 def continue_process_and_wait_for_stop(self): 1021 self.test_sequence.add_log_lines( 1022 [ 1023 "read packet: $vCont;c#a8", 1024 { 1025 "direction": "send", 1026 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1027 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 1028 }, 1029 ], 1030 True, 1031 ) 1032 context = self.expect_gdbremote_sequence() 1033 self.assertIsNotNone(context) 1034 return self.parse_interrupt_packets(context) 1035 1036 def select_modifiable_register(self, reg_infos): 1037 """Find a register that can be read/written freely.""" 1038 PREFERRED_REGISTER_NAMES = set(["rax", ]) 1039 1040 # First check for the first register from the preferred register name 1041 # set. 1042 alternative_register_index = None 1043 1044 self.assertIsNotNone(reg_infos) 1045 for reg_info in reg_infos: 1046 if ("name" in reg_info) and ( 1047 reg_info["name"] in PREFERRED_REGISTER_NAMES): 1048 # We found a preferred register. Use it. 1049 return reg_info["lldb_register_index"] 1050 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 1051 reg_info["generic"] == "arg1"): 1052 # A frame pointer or first arg register will do as a 1053 # register to modify temporarily. 1054 alternative_register_index = reg_info["lldb_register_index"] 1055 1056 # We didn't find a preferred register. Return whatever alternative register 1057 # we found, if any. 1058 return alternative_register_index 1059 1060 def extract_registers_from_stop_notification(self, stop_key_vals_text): 1061 self.assertIsNotNone(stop_key_vals_text) 1062 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 1063 1064 registers = {} 1065 for (key, val) in list(kv_dict.items()): 1066 if re.match(r"^[0-9a-fA-F]+$", key): 1067 registers[int(key, 16)] = val 1068 return registers 1069 1070 def gather_register_infos(self): 1071 self.reset_test_sequence() 1072 self.add_register_info_collection_packets() 1073 1074 context = self.expect_gdbremote_sequence() 1075 self.assertIsNotNone(context) 1076 1077 reg_infos = self.parse_register_info_packets(context) 1078 self.assertIsNotNone(reg_infos) 1079 self.add_lldb_register_index(reg_infos) 1080 1081 return reg_infos 1082 1083 def find_generic_register_with_name(self, reg_infos, generic_name): 1084 self.assertIsNotNone(reg_infos) 1085 for reg_info in reg_infos: 1086 if ("generic" in reg_info) and ( 1087 reg_info["generic"] == generic_name): 1088 return reg_info 1089 return None 1090 1091 def decode_gdbremote_binary(self, encoded_bytes): 1092 decoded_bytes = "" 1093 i = 0 1094 while i < len(encoded_bytes): 1095 if encoded_bytes[i] == "}": 1096 # Handle escaped char. 1097 self.assertTrue(i + 1 < len(encoded_bytes)) 1098 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1099 i += 2 1100 elif encoded_bytes[i] == "*": 1101 # Handle run length encoding. 1102 self.assertTrue(len(decoded_bytes) > 0) 1103 self.assertTrue(i + 1 < len(encoded_bytes)) 1104 repeat_count = ord(encoded_bytes[i + 1]) - 29 1105 decoded_bytes += decoded_bytes[-1] * repeat_count 1106 i += 2 1107 else: 1108 decoded_bytes += encoded_bytes[i] 1109 i += 1 1110 return decoded_bytes 1111 1112 def build_auxv_dict(self, endian, word_size, auxv_data): 1113 self.assertIsNotNone(endian) 1114 self.assertIsNotNone(word_size) 1115 self.assertIsNotNone(auxv_data) 1116 1117 auxv_dict = {} 1118 1119 # PowerPC64le's auxvec has a special key that must be ignored. 1120 # This special key may be used multiple times, resulting in 1121 # multiple key/value pairs with the same key, which would otherwise 1122 # break this test check for repeated keys. 1123 # 1124 # AT_IGNOREPPC = 22 1125 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1126 arch = self.getArchitecture() 1127 ignore_keys = None 1128 if arch in ignored_keys_for_arch: 1129 ignore_keys = ignored_keys_for_arch[arch] 1130 1131 while len(auxv_data) > 0: 1132 # Chop off key. 1133 raw_key = auxv_data[:word_size] 1134 auxv_data = auxv_data[word_size:] 1135 1136 # Chop of value. 1137 raw_value = auxv_data[:word_size] 1138 auxv_data = auxv_data[word_size:] 1139 1140 # Convert raw text from target endian. 1141 key = unpack_endian_binary_string(endian, raw_key) 1142 value = unpack_endian_binary_string(endian, raw_value) 1143 1144 if ignore_keys and key in ignore_keys: 1145 continue 1146 1147 # Handle ending entry. 1148 if key == 0: 1149 self.assertEqual(value, 0) 1150 return auxv_dict 1151 1152 # The key should not already be present. 1153 self.assertFalse(key in auxv_dict) 1154 auxv_dict[key] = value 1155 1156 self.fail( 1157 "should not reach here - implies required double zero entry not found") 1158 return auxv_dict 1159 1160 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1161 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1162 offset = 0 1163 done = False 1164 decoded_data = "" 1165 1166 while not done: 1167 # Grab the next iteration of data. 1168 self.reset_test_sequence() 1169 self.test_sequence.add_log_lines( 1170 [ 1171 "read packet: ${}{:x},{:x}:#00".format( 1172 command_prefix, 1173 offset, 1174 chunk_length), 1175 { 1176 "direction": "send", 1177 "regex": re.compile( 1178 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1179 re.MULTILINE | re.DOTALL), 1180 "capture": { 1181 1: "response_type", 1182 2: "content_raw"}}], 1183 True) 1184 1185 context = self.expect_gdbremote_sequence() 1186 self.assertIsNotNone(context) 1187 1188 response_type = context.get("response_type") 1189 self.assertIsNotNone(response_type) 1190 self.assertTrue(response_type in ["l", "m"]) 1191 1192 # Move offset along. 1193 offset += chunk_length 1194 1195 # Figure out if we're done. We're done if the response type is l. 1196 done = response_type == "l" 1197 1198 # Decode binary data. 1199 content_raw = context.get("content_raw") 1200 if content_raw and len(content_raw) > 0: 1201 self.assertIsNotNone(content_raw) 1202 decoded_data += self.decode_gdbremote_binary(content_raw) 1203 return decoded_data 1204 1205 def add_interrupt_packets(self): 1206 self.test_sequence.add_log_lines([ 1207 # Send the intterupt. 1208 "read packet: {}".format(chr(3)), 1209 # And wait for the stop notification. 1210 {"direction": "send", 1211 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1212 "capture": {1: "stop_signo", 1213 2: "stop_key_val_text"}}, 1214 ], True) 1215 1216 def parse_interrupt_packets(self, context): 1217 self.assertIsNotNone(context.get("stop_signo")) 1218 self.assertIsNotNone(context.get("stop_key_val_text")) 1219 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1220 context["stop_key_val_text"])) 1221 1222 def add_QSaveRegisterState_packets(self, thread_id): 1223 if thread_id: 1224 # Use the thread suffix form. 1225 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1226 thread_id) 1227 else: 1228 request = "read packet: $QSaveRegisterState#00" 1229 1230 self.test_sequence.add_log_lines([request, 1231 {"direction": "send", 1232 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1233 "capture": {1: "save_response"}}, 1234 ], 1235 True) 1236 1237 def parse_QSaveRegisterState_response(self, context): 1238 self.assertIsNotNone(context) 1239 1240 save_response = context.get("save_response") 1241 self.assertIsNotNone(save_response) 1242 1243 if len(save_response) < 1 or save_response[0] == "E": 1244 # error received 1245 return (False, None) 1246 else: 1247 return (True, int(save_response)) 1248 1249 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1250 if thread_id: 1251 # Use the thread suffix form. 1252 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1253 save_id, thread_id) 1254 else: 1255 request = "read packet: $QRestoreRegisterState:{}#00".format( 1256 save_id) 1257 1258 self.test_sequence.add_log_lines([ 1259 request, 1260 "send packet: $OK#00" 1261 ], True) 1262 1263 def flip_all_bits_in_each_register_value( 1264 self, reg_infos, endian, thread_id=None): 1265 self.assertIsNotNone(reg_infos) 1266 1267 successful_writes = 0 1268 failed_writes = 0 1269 1270 for reg_info in reg_infos: 1271 # Use the lldb register index added to the reg info. We're not necessarily 1272 # working off a full set of register infos, so an inferred register 1273 # index could be wrong. 1274 reg_index = reg_info["lldb_register_index"] 1275 self.assertIsNotNone(reg_index) 1276 1277 reg_byte_size = int(reg_info["bitsize"]) // 8 1278 self.assertTrue(reg_byte_size > 0) 1279 1280 # Handle thread suffix. 1281 if thread_id: 1282 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1283 reg_index, thread_id) 1284 else: 1285 p_request = "read packet: $p{:x}#00".format(reg_index) 1286 1287 # Read the existing value. 1288 self.reset_test_sequence() 1289 self.test_sequence.add_log_lines([ 1290 p_request, 1291 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1292 ], True) 1293 context = self.expect_gdbremote_sequence() 1294 self.assertIsNotNone(context) 1295 1296 # Verify the response length. 1297 p_response = context.get("p_response") 1298 self.assertIsNotNone(p_response) 1299 initial_reg_value = unpack_register_hex_unsigned( 1300 endian, p_response) 1301 1302 # Flip the value by xoring with all 1s 1303 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1304 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1305 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1306 1307 # Handle thread suffix for P. 1308 if thread_id: 1309 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1310 reg_index, pack_register_hex( 1311 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1312 else: 1313 P_request = "read packet: $P{:x}={}#00".format( 1314 reg_index, pack_register_hex( 1315 endian, flipped_bits_int, byte_size=reg_byte_size)) 1316 1317 # Write the flipped value to the register. 1318 self.reset_test_sequence() 1319 self.test_sequence.add_log_lines([P_request, 1320 {"direction": "send", 1321 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1322 "capture": {1: "P_response"}}, 1323 ], 1324 True) 1325 context = self.expect_gdbremote_sequence() 1326 self.assertIsNotNone(context) 1327 1328 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1329 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1330 # all flipping perfectly. 1331 P_response = context.get("P_response") 1332 self.assertIsNotNone(P_response) 1333 if P_response == "OK": 1334 successful_writes += 1 1335 else: 1336 failed_writes += 1 1337 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1338 1339 # Read back the register value, ensure it matches the flipped 1340 # value. 1341 if P_response == "OK": 1342 self.reset_test_sequence() 1343 self.test_sequence.add_log_lines([ 1344 p_request, 1345 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1346 ], True) 1347 context = self.expect_gdbremote_sequence() 1348 self.assertIsNotNone(context) 1349 1350 verify_p_response_raw = context.get("p_response") 1351 self.assertIsNotNone(verify_p_response_raw) 1352 verify_bits = unpack_register_hex_unsigned( 1353 endian, verify_p_response_raw) 1354 1355 if verify_bits != flipped_bits_int: 1356 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1357 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1358 successful_writes -= 1 1359 failed_writes += 1 1360 1361 return (successful_writes, failed_writes) 1362 1363 def is_bit_flippable_register(self, reg_info): 1364 if not reg_info: 1365 return False 1366 if not "set" in reg_info: 1367 return False 1368 if reg_info["set"] != "General Purpose Registers": 1369 return False 1370 if ("container-regs" in reg_info) and ( 1371 len(reg_info["container-regs"]) > 0): 1372 # Don't try to bit flip registers contained in another register. 1373 return False 1374 if re.match("^.s$", reg_info["name"]): 1375 # This is a 2-letter register name that ends in "s", like a segment register. 1376 # Don't try to bit flip these. 1377 return False 1378 if re.match("^(c|)psr$", reg_info["name"]): 1379 # This is an ARM program status register; don't flip it. 1380 return False 1381 # Okay, this looks fine-enough. 1382 return True 1383 1384 def read_register_values(self, reg_infos, endian, thread_id=None): 1385 self.assertIsNotNone(reg_infos) 1386 values = {} 1387 1388 for reg_info in reg_infos: 1389 # We append a register index when load reg infos so we can work 1390 # with subsets. 1391 reg_index = reg_info.get("lldb_register_index") 1392 self.assertIsNotNone(reg_index) 1393 1394 # Handle thread suffix. 1395 if thread_id: 1396 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1397 reg_index, thread_id) 1398 else: 1399 p_request = "read packet: $p{:x}#00".format(reg_index) 1400 1401 # Read it with p. 1402 self.reset_test_sequence() 1403 self.test_sequence.add_log_lines([ 1404 p_request, 1405 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1406 ], True) 1407 context = self.expect_gdbremote_sequence() 1408 self.assertIsNotNone(context) 1409 1410 # Convert value from target endian to integral. 1411 p_response = context.get("p_response") 1412 self.assertIsNotNone(p_response) 1413 self.assertTrue(len(p_response) > 0) 1414 self.assertFalse(p_response[0] == "E") 1415 1416 values[reg_index] = unpack_register_hex_unsigned( 1417 endian, p_response) 1418 1419 return values 1420 1421 def add_vCont_query_packets(self): 1422 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1423 {"direction": "send", 1424 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1425 "capture": {2: "vCont_query_response"}}, 1426 ], 1427 True) 1428 1429 def parse_vCont_query_response(self, context): 1430 self.assertIsNotNone(context) 1431 vCont_query_response = context.get("vCont_query_response") 1432 1433 # Handle case of no vCont support at all - in which case the capture 1434 # group will be none or zero length. 1435 if not vCont_query_response or len(vCont_query_response) == 0: 1436 return {} 1437 1438 return {key: 1 for key in vCont_query_response.split( 1439 ";") if key and len(key) > 0} 1440 1441 def count_single_steps_until_true( 1442 self, 1443 thread_id, 1444 predicate, 1445 args, 1446 max_step_count=100, 1447 use_Hc_packet=True, 1448 step_instruction="s"): 1449 """Used by single step test that appears in a few different contexts.""" 1450 single_step_count = 0 1451 1452 while single_step_count < max_step_count: 1453 self.assertIsNotNone(thread_id) 1454 1455 # Build the packet for the single step instruction. We replace 1456 # {thread}, if present, with the thread_id. 1457 step_packet = "read packet: ${}#00".format( 1458 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1459 # print("\nstep_packet created: {}\n".format(step_packet)) 1460 1461 # Single step. 1462 self.reset_test_sequence() 1463 if use_Hc_packet: 1464 self.test_sequence.add_log_lines( 1465 [ # Set the continue thread. 1466 "read packet: $Hc{0:x}#00".format(thread_id), 1467 "send packet: $OK#00", 1468 ], True) 1469 self.test_sequence.add_log_lines([ 1470 # Single step. 1471 step_packet, 1472 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1473 # Expect a breakpoint stop report. 1474 {"direction": "send", 1475 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1476 "capture": {1: "stop_signo", 1477 2: "stop_thread_id"}}, 1478 ], True) 1479 context = self.expect_gdbremote_sequence() 1480 self.assertIsNotNone(context) 1481 self.assertIsNotNone(context.get("stop_signo")) 1482 self.assertEqual(int(context.get("stop_signo"), 16), 1483 lldbutil.get_signal_number('SIGTRAP')) 1484 1485 single_step_count += 1 1486 1487 # See if the predicate is true. If so, we're done. 1488 if predicate(args): 1489 return (True, single_step_count) 1490 1491 # The predicate didn't return true within the runaway step count. 1492 return (False, single_step_count) 1493 1494 def g_c1_c2_contents_are(self, args): 1495 """Used by single step test that appears in a few different contexts.""" 1496 g_c1_address = args["g_c1_address"] 1497 g_c2_address = args["g_c2_address"] 1498 expected_g_c1 = args["expected_g_c1"] 1499 expected_g_c2 = args["expected_g_c2"] 1500 1501 # Read g_c1 and g_c2 contents. 1502 self.reset_test_sequence() 1503 self.test_sequence.add_log_lines( 1504 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1505 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1506 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1507 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1508 True) 1509 1510 # Run the packet stream. 1511 context = self.expect_gdbremote_sequence() 1512 self.assertIsNotNone(context) 1513 1514 # Check if what we read from inferior memory is what we are expecting. 1515 self.assertIsNotNone(context.get("g_c1_contents")) 1516 self.assertIsNotNone(context.get("g_c2_contents")) 1517 1518 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1519 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1520 1521 def single_step_only_steps_one_instruction( 1522 self, use_Hc_packet=True, step_instruction="s"): 1523 """Used by single step test that appears in a few different contexts.""" 1524 # Start up the inferior. 1525 procs = self.prep_debug_monitor_and_inferior( 1526 inferior_args=[ 1527 "get-code-address-hex:swap_chars", 1528 "get-data-address-hex:g_c1", 1529 "get-data-address-hex:g_c2", 1530 "sleep:1", 1531 "call-function:swap_chars", 1532 "sleep:5"]) 1533 1534 # Run the process 1535 self.test_sequence.add_log_lines( 1536 [ # Start running after initial stop. 1537 "read packet: $c#63", 1538 # Match output line that prints the memory address of the function call entry point. 1539 # Note we require launch-only testing so we can get inferior otuput. 1540 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1541 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1542 # Now stop the inferior. 1543 "read packet: {}".format(chr(3)), 1544 # And wait for the stop notification. 1545 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1546 True) 1547 1548 # Run the packet stream. 1549 context = self.expect_gdbremote_sequence() 1550 self.assertIsNotNone(context) 1551 1552 # Grab the main thread id. 1553 self.assertIsNotNone(context.get("stop_thread_id")) 1554 main_thread_id = int(context.get("stop_thread_id"), 16) 1555 1556 # Grab the function address. 1557 self.assertIsNotNone(context.get("function_address")) 1558 function_address = int(context.get("function_address"), 16) 1559 1560 # Grab the data addresses. 1561 self.assertIsNotNone(context.get("g_c1_address")) 1562 g_c1_address = int(context.get("g_c1_address"), 16) 1563 1564 self.assertIsNotNone(context.get("g_c2_address")) 1565 g_c2_address = int(context.get("g_c2_address"), 16) 1566 1567 # Set a breakpoint at the given address. 1568 if self.getArchitecture() == "arm": 1569 # TODO: Handle case when setting breakpoint in thumb code 1570 BREAKPOINT_KIND = 4 1571 else: 1572 BREAKPOINT_KIND = 1 1573 self.reset_test_sequence() 1574 self.add_set_breakpoint_packets( 1575 function_address, 1576 do_continue=True, 1577 breakpoint_kind=BREAKPOINT_KIND) 1578 context = self.expect_gdbremote_sequence() 1579 self.assertIsNotNone(context) 1580 1581 # Remove the breakpoint. 1582 self.reset_test_sequence() 1583 self.add_remove_breakpoint_packets( 1584 function_address, breakpoint_kind=BREAKPOINT_KIND) 1585 context = self.expect_gdbremote_sequence() 1586 self.assertIsNotNone(context) 1587 1588 # Verify g_c1 and g_c2 match expected initial state. 1589 args = {} 1590 args["g_c1_address"] = g_c1_address 1591 args["g_c2_address"] = g_c2_address 1592 args["expected_g_c1"] = "0" 1593 args["expected_g_c2"] = "1" 1594 1595 self.assertTrue(self.g_c1_c2_contents_are(args)) 1596 1597 # Verify we take only a small number of steps to hit the first state. 1598 # Might need to work through function entry prologue code. 1599 args["expected_g_c1"] = "1" 1600 args["expected_g_c2"] = "1" 1601 (state_reached, 1602 step_count) = self.count_single_steps_until_true(main_thread_id, 1603 self.g_c1_c2_contents_are, 1604 args, 1605 max_step_count=25, 1606 use_Hc_packet=use_Hc_packet, 1607 step_instruction=step_instruction) 1608 self.assertTrue(state_reached) 1609 1610 # Verify we hit the next state. 1611 args["expected_g_c1"] = "1" 1612 args["expected_g_c2"] = "0" 1613 (state_reached, 1614 step_count) = self.count_single_steps_until_true(main_thread_id, 1615 self.g_c1_c2_contents_are, 1616 args, 1617 max_step_count=5, 1618 use_Hc_packet=use_Hc_packet, 1619 step_instruction=step_instruction) 1620 self.assertTrue(state_reached) 1621 expected_step_count = 1 1622 arch = self.getArchitecture() 1623 1624 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1625 # of variable value 1626 if re.match("mips", arch): 1627 expected_step_count = 3 1628 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1629 # variable value 1630 if re.match("s390x", arch): 1631 expected_step_count = 2 1632 self.assertEqual(step_count, expected_step_count) 1633 1634 # Verify we hit the next state. 1635 args["expected_g_c1"] = "0" 1636 args["expected_g_c2"] = "0" 1637 (state_reached, 1638 step_count) = self.count_single_steps_until_true(main_thread_id, 1639 self.g_c1_c2_contents_are, 1640 args, 1641 max_step_count=5, 1642 use_Hc_packet=use_Hc_packet, 1643 step_instruction=step_instruction) 1644 self.assertTrue(state_reached) 1645 self.assertEqual(step_count, expected_step_count) 1646 1647 # Verify we hit the next state. 1648 args["expected_g_c1"] = "0" 1649 args["expected_g_c2"] = "1" 1650 (state_reached, 1651 step_count) = self.count_single_steps_until_true(main_thread_id, 1652 self.g_c1_c2_contents_are, 1653 args, 1654 max_step_count=5, 1655 use_Hc_packet=use_Hc_packet, 1656 step_instruction=step_instruction) 1657 self.assertTrue(state_reached) 1658 self.assertEqual(step_count, expected_step_count) 1659 1660 def maybe_strict_output_regex(self, regex): 1661 return '.*' + regex + \ 1662 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1663 1664 def install_and_create_launch_args(self): 1665 exe_path = self.getBuildArtifact("a.out") 1666 if not lldb.remote_platform: 1667 return [exe_path] 1668 remote_path = lldbutil.append_to_process_working_directory(self, 1669 os.path.basename(exe_path)) 1670 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1671 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1672 remote_file_spec) 1673 if err.Fail(): 1674 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1675 (exe_path, remote_path, err)) 1676 return [remote_path] 1677