1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import print_function 6 7 8import errno 9import os 10import os.path 11import platform 12import random 13import re 14import select 15import signal 16import socket 17import subprocess 18import sys 19import tempfile 20import time 21from lldbsuite.test import configuration 22from lldbsuite.test.lldbtest import * 23from lldbgdbserverutils import * 24import logging 25 26 27class _ConnectionRefused(IOError): 28 pass 29 30 31class GdbRemoteTestCaseBase(TestBase): 32 33 NO_DEBUG_INFO_TESTCASE = True 34 35 _TIMEOUT_SECONDS = 7 36 37 _GDBREMOTE_KILL_PACKET = "$k#6b" 38 39 # Start the inferior separately, attach to the inferior on the stub 40 # command line. 41 _STARTUP_ATTACH = "attach" 42 # Start the inferior separately, start the stub without attaching, allow 43 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 44 _STARTUP_ATTACH_MANUALLY = "attach_manually" 45 # Start the stub, and launch the inferior with an $A packet via the 46 # initial packet stream. 
    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # File handler for the verbose host-side log; created in setUp() when
    # verbose logging is requested, removed again in tearDown().
    _verbose_log_handler = None
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')

    def setUpBaseLogging(self):
        """Create self.logger and attach a stderr handler for warnings.

        Idempotent: if the logger already has handlers, it was set up by a
        previous test and is left untouched.
        """
        self.logger = logging.getLogger(__name__)

        if len(self.logger.handlers) > 0:
            return  # We have set up this handler already

        self.logger.propagate = False
        self.logger.setLevel(logging.DEBUG)

        # log all warnings to stderr
        handler = logging.StreamHandler()
        handler.setLevel(logging.WARNING)
        handler.setFormatter(self._log_formatter)
        self.logger.addHandler(handler)

    def isVerboseLoggingRequested(self):
        """Return True if the user asked for any "gdb-remote" log channel."""
        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
        # logged.
        return any(("gdb-remote" in channel)
                   for channel in lldbtest_config.channels)
80 return any(("gdb-remote" in channel) 81 for channel in lldbtest_config.channels) 82 83 def setUp(self): 84 TestBase.setUp(self) 85 86 self.setUpBaseLogging() 87 self.debug_monitor_extra_args = [] 88 self._pump_queues = socket_packet_pump.PumpQueues() 89 90 if self.isVerboseLoggingRequested(): 91 # If requested, full logs go to a log file 92 self._verbose_log_handler = logging.FileHandler( 93 self.log_basename + "-host.log") 94 self._verbose_log_handler.setFormatter(self._log_formatter) 95 self._verbose_log_handler.setLevel(logging.DEBUG) 96 self.logger.addHandler(self._verbose_log_handler) 97 98 self.test_sequence = GdbRemoteTestSequence(self.logger) 99 self.set_inferior_startup_launch() 100 self.port = self.get_next_port() 101 self.named_pipe_path = None 102 self.named_pipe = None 103 self.named_pipe_fd = None 104 self.stub_sends_two_stop_notifications_on_kill = False 105 if configuration.lldb_platform_url: 106 if configuration.lldb_platform_url.startswith('unix-'): 107 url_pattern = '(.+)://\[?(.+?)\]?/.*' 108 else: 109 url_pattern = '(.+)://(.+):\d+' 110 scheme, host = re.match( 111 url_pattern, configuration.lldb_platform_url).groups() 112 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 113 self.stub_device = host 114 self.stub_hostname = 'localhost' 115 else: 116 self.stub_device = None 117 self.stub_hostname = host 118 else: 119 self.stub_hostname = "localhost" 120 121 def tearDown(self): 122 self._pump_queues.verify_queues_empty() 123 124 self.logger.removeHandler(self._verbose_log_handler) 125 self._verbose_log_handler = None 126 TestBase.tearDown(self) 127 128 def getLocalServerLogFile(self): 129 return self.log_basename + "-server.log" 130 131 def setUpServerLogging(self, is_llgs): 132 if len(lldbtest_config.channels) == 0: 133 return # No logging requested 134 135 if lldb.remote_platform: 136 log_file = lldbutil.join_remote_paths( 137 lldb.remote_platform.GetWorkingDirectory(), "server.log") 138 else: 139 log_file = 
self.getLocalServerLogFile() 140 141 if is_llgs: 142 self.debug_monitor_extra_args.append("--log-file=" + log_file) 143 self.debug_monitor_extra_args.append( 144 "--log-channels={}".format(":".join(lldbtest_config.channels))) 145 else: 146 self.debug_monitor_extra_args = [ 147 "--log-file=" + log_file, "--log-flags=0x800000"] 148 149 def get_next_port(self): 150 return 12000 + random.randint(0, 3999) 151 152 def reset_test_sequence(self): 153 self.test_sequence = GdbRemoteTestSequence(self.logger) 154 155 def create_named_pipe(self): 156 # Create a temp dir and name for a pipe. 157 temp_dir = tempfile.mkdtemp() 158 named_pipe_path = os.path.join(temp_dir, "stub_port_number") 159 160 # Create the named pipe. 161 os.mkfifo(named_pipe_path) 162 163 # Open the read side of the pipe in non-blocking mode. This will 164 # return right away, ready or not. 165 named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK) 166 167 # Create the file for the named pipe. Note this will follow semantics of 168 # a non-blocking read side of a named pipe, which has different semantics 169 # than a named pipe opened for read in non-blocking mode. 170 named_pipe = os.fdopen(named_pipe_fd, "r") 171 self.assertIsNotNone(named_pipe) 172 173 def shutdown_named_pipe(): 174 # Close the pipe. 175 try: 176 named_pipe.close() 177 except: 178 print("failed to close named pipe") 179 None 180 181 # Delete the pipe. 182 try: 183 os.remove(named_pipe_path) 184 except: 185 print("failed to delete named pipe: {}".format(named_pipe_path)) 186 None 187 188 # Delete the temp directory. 189 try: 190 os.rmdir(temp_dir) 191 except: 192 print( 193 "failed to delete temp dir: {}, directory contents: '{}'".format( 194 temp_dir, os.listdir(temp_dir))) 195 None 196 197 # Add the shutdown hook to clean up the named pipe. 198 self.addTearDownHook(shutdown_named_pipe) 199 200 # Clear the port so the stub selects a port number. 
    def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
        """Read the stub's listening port (an int) from the named pipe.

        Blocks via select() for up to read_timeout_seconds waiting for the
        stub to write its port number, then strips the trailing null byte
        and converts to int. Fails the test on timeout or empty read.
        """
        # Wait for something to read with a max timeout.
        (ready_readers, _, _) = select.select(
            [self.named_pipe_fd], [], [], read_timeout_seconds)
        self.assertIsNotNone(
            ready_readers,
            "write side of pipe has not written anything - stub isn't writing to pipe.")
        self.assertNotEqual(
            len(ready_readers),
            0,
            "write side of pipe has not written anything - stub isn't writing to pipe.")

        # Read the port from the named pipe.
        stub_port_raw = self.named_pipe.read()
        self.assertIsNotNone(stub_port_raw)
        self.assertNotEqual(
            len(stub_port_raw),
            0,
            "no content to read on pipe")

        # Trim null byte, convert to int.
        stub_port_raw = stub_port_raw[:-1]
        stub_port = int(stub_port_raw)
        self.assertTrue(stub_port > 0)

        return stub_port

    def init_llgs_test(self, use_named_pipe=True):
        """Locate lldb-server and set it up as the debug monitor under test.

        On a remote platform the lldb-server binary is found by resolving
        /proc/<shell-ppid>/exe on the target (named-pipe port negotiation is
        not supported there); locally get_lldb_server_exe() is used, skipping
        the test if it is not found.
        """
        if lldb.remote_platform:
            # Remote platforms don't support named pipe based port negotiation
            use_named_pipe = False

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()
            if not self.debug_monitor_exe:
                self.skipTest("lldb-server exe not found")

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        if use_named_pipe:
            (self.named_pipe_path, self.named_pipe,
             self.named_pipe_fd) = self.create_named_pipe()
    def init_debugserver_test(self, use_named_pipe=True):
        """Locate debugserver and set it up as the debug monitor under test.

        Skips the test when debugserver is not available.
        """
        self.debug_monitor_exe = get_debugserver_exe()
        if not self.debug_monitor_exe:
            self.skipTest("debugserver exe not found")
        self.setUpServerLogging(is_llgs=False)
        if use_named_pipe:
            (self.named_pipe_path, self.named_pipe,
             self.named_pipe_fd) = self.create_named_pipe()
        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True

    def forward_adb_port(self, source, target, direction, device):
        """Set up an adb port forward/reverse and register its removal.

        direction is the adb subcommand name ("forward" or "reverse");
        device, if given, is passed via `adb -s`.
        """
        adb = ['adb'] + (['-s', device] if device else []) + [direction]

        def remove_port_forward():
            subprocess.call(adb + ["--remove", "tcp:%d" % source])

        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
        self.addTearDownHook(remove_port_forward)

    def _verify_socket(self, sock):
        """Raise _ConnectionRefused if an android adb-forwarded connection
        was accepted locally but immediately dropped on the remote side."""
        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
        # the connect() will always be successful, but the connection will be immediately dropped
        # if ADB could not connect on the remote side. This function tries to detect this
        # situation, and report it as "connection refused" so that the upper layers attempt the
        # connection again.
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if not re.match(".*-.*-.*-android", triple):
            return  # Not android.
        can_read, _, _ = select.select([sock], [], [], 0.1)
        if sock not in can_read:
            return  # Data is not available, but the connection is alive.
        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
            raise _ConnectionRefused()  # Got EOF, connection dropped.
    def create_socket(self):
        """Connect a TCP socket to the stub at self.stub_hostname:self.port.

        On android targets an adb forward for self.port is set up first.
        Registers a teardown hook that sends the 'k' packet and closes the
        socket. Raises _ConnectionRefused when the stub is not accepting
        connections (so callers can retry).
        """
        sock = socket.socket()
        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        self._verify_socket(sock)

        return sock

    def set_inferior_startup_launch(self):
        """Have the stub launch the inferior via an $A packet (default)."""
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        """Have the stub attach to a pre-launched inferior via --attach."""
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        """Pre-launch the inferior but let the test attach to it itself."""
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY

    def get_debug_monitor_command_line_args(self, attach_pid=None):
        """Build the stub's command-line argument list.

        Includes the listen address, optional --attach=<pid>, and the
        --named-pipe argument when port negotiation via pipe is in use.
        """
        if lldb.remote_platform:
            commandline_args = self.debug_monitor_extra_args + \
                ["*:{}".format(self.port)]
        else:
            commandline_args = self.debug_monitor_extra_args + \
                ["localhost:{}".format(self.port)]

        if attach_pid:
            commandline_args += ["--attach=%d" % attach_pid]
        if self.named_pipe_path:
            commandline_args += ["--named-pipe", self.named_pipe_path]
        return commandline_args
self.debug_monitor_extra_args + \ 369 ["*:{}".format(self.port)] 370 else: 371 commandline_args = self.debug_monitor_extra_args + \ 372 ["localhost:{}".format(self.port)] 373 374 if attach_pid: 375 commandline_args += ["--attach=%d" % attach_pid] 376 if self.named_pipe_path: 377 commandline_args += ["--named-pipe", self.named_pipe_path] 378 return commandline_args 379 380 def launch_debug_monitor(self, attach_pid=None, logfile=None): 381 # Create the command line. 382 commandline_args = self.get_debug_monitor_command_line_args( 383 attach_pid=attach_pid) 384 385 # Start the server. 386 server = self.spawnSubprocess( 387 self.debug_monitor_exe, 388 commandline_args, 389 install_remote=False) 390 self.addTearDownHook(self.cleanupSubprocesses) 391 self.assertIsNotNone(server) 392 393 # If we're receiving the stub's listening port from the named pipe, do 394 # that here. 395 if self.named_pipe: 396 self.port = self.get_stub_port_from_named_socket() 397 398 return server 399 400 def connect_to_debug_monitor(self, attach_pid=None): 401 if self.named_pipe: 402 # Create the stub. 403 server = self.launch_debug_monitor(attach_pid=attach_pid) 404 self.assertIsNotNone(server) 405 406 def shutdown_debug_monitor(): 407 try: 408 server.terminate() 409 except: 410 logger.warning( 411 "failed to terminate server for debug monitor: {}; ignoring".format( 412 sys.exc_info()[0])) 413 self.addTearDownHook(shutdown_debug_monitor) 414 415 # Schedule debug monitor to be shut down during teardown. 416 logger = self.logger 417 418 # Attach to the stub and return a socket opened to it. 419 self.sock = self.create_socket() 420 return server 421 422 # We're using a random port algorithm to try not to collide with other ports, 423 # and retry a max # times. 424 attempts = 0 425 MAX_ATTEMPTS = 20 426 427 while attempts < MAX_ATTEMPTS: 428 server = self.launch_debug_monitor(attach_pid=attach_pid) 429 430 # Schedule debug monitor to be shut down during teardown. 
431 logger = self.logger 432 433 def shutdown_debug_monitor(): 434 try: 435 server.terminate() 436 except: 437 logger.warning( 438 "failed to terminate server for debug monitor: {}; ignoring".format( 439 sys.exc_info()[0])) 440 self.addTearDownHook(shutdown_debug_monitor) 441 442 connect_attemps = 0 443 MAX_CONNECT_ATTEMPTS = 10 444 445 while connect_attemps < MAX_CONNECT_ATTEMPTS: 446 # Create a socket to talk to the server 447 try: 448 logger.info("Connect attempt %d", connect_attemps + 1) 449 self.sock = self.create_socket() 450 return server 451 except _ConnectionRefused as serr: 452 # Ignore, and try again. 453 pass 454 time.sleep(0.5) 455 connect_attemps += 1 456 457 # We should close the server here to be safe. 458 server.terminate() 459 460 # Increment attempts. 461 print( 462 "connect to debug monitor on port %d failed, attempt #%d of %d" % 463 (self.port, attempts + 1, MAX_ATTEMPTS)) 464 attempts += 1 465 466 # And wait a random length of time before next attempt, to avoid 467 # collisions. 468 time.sleep(random.randint(1, 5)) 469 470 # Now grab a new port number. 471 self.port = self.get_next_port() 472 473 raise Exception( 474 "failed to create a socket to the launched debug monitor after %d tries" % 475 attempts) 476 477 def launch_process_for_attach( 478 self, 479 inferior_args=None, 480 sleep_seconds=3, 481 exe_path=None): 482 # We're going to start a child process that the debug monitor stub can later attach to. 483 # This process needs to be started so that it just hangs around for a while. We'll 484 # have it sleep. 
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream. The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream. The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is: 
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = os.path.abspath("a.out")

            if lldb.remote_platform:
                # Copy the inferior binary to the remote target's working
                # directory before asking the stub to launch it.
                remote_path = lldbutil.append_to_process_working_directory(
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Build the expected protocol stream
        self.add_no_ack_remote_stream()
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
    def expect_socket_recv(
            self,
            sock,
            expected_content_regex,
            timeout_seconds):
        """Read from sock until expected_content_regex matches the data
        accumulated so far, or timeout_seconds elapse; fail on timeout."""
        response = ""
        timeout_time = time.time() + timeout_seconds

        while not expected_content_regex.match(
                response) and time.time() < timeout_time:
            can_read, _, _ = select.select([sock], [], [], timeout_seconds)
            if can_read and sock in can_read:
                recv_bytes = sock.recv(4096)
                if recv_bytes:
                    response += recv_bytes

        self.assertTrue(expected_content_regex.match(response))

    def expect_socket_send(self, sock, content, timeout_seconds):
        """Write all of content to sock (handling partial sends) within
        timeout_seconds; fail if any bytes remain unsent."""
        request_bytes_remaining = content
        timeout_time = time.time() + timeout_seconds

        while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
            _, can_write, _ = select.select([], [sock], [], timeout_seconds)
            if can_write and sock in can_write:
                written_byte_count = sock.send(request_bytes_remaining)
                request_bytes_remaining = request_bytes_remaining[
                    written_byte_count:]
        self.assertEqual(len(request_bytes_remaining), 0)

    def do_handshake(self, stub_socket, timeout_seconds=5):
        """Perform the initial gdb-remote handshake on stub_socket:
        ack, QStartNoAckMode, expect "+$OK#..", final ack."""
        # Write the ack.
        self.expect_socket_send(stub_socket, "+", timeout_seconds)

        # Send the start no ack mode packet.
        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST)
        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))

        # Receive the ack and "OK"
        self.expect_socket_recv(stub_socket, re.compile(
            r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)

        # Send the final ack.
        self.expect_socket_send(stub_socket, "+", timeout_seconds)
    def add_no_ack_remote_stream(self):
        """Append the standard no-ack-mode handshake to the test sequence."""
        self.test_sequence.add_log_lines(
            ["read packet: +",
             "read packet: $QStartNoAckMode#b0",
             "send packet: +",
             "send packet: $OK#9a",
             "read packet: +"],
            True)

    def add_verified_launch_packets(self, launch_args):
        """Append an $A launch packet for launch_args plus the
        qLaunchSuccess check to the test sequence."""
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)

    def add_thread_suffix_request_packets(self):
        """Append a QThreadSuffixSupported request/OK to the test sequence."""
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
             ], True)

    def add_process_info_collection_packets(self):
        """Append a qProcessInfo request, capturing the raw reply as
        "process_info_raw" in the sequence context."""
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
            True)

    # Keys a stub may legitimately report in a qProcessInfo response.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]

    def parse_process_info_response(self, context):
        """Parse the captured qProcessInfo reply into a key->value dict,
        asserting every key is one of _KNOWN_PROCESS_INFO_KEYS."""
        # Ensure we have a process info response.
        self.assertIsNotNone(context)
        process_info_raw = context.get("process_info_raw")
        self.assertIsNotNone(process_info_raw)

        # Pull out key:value; pairs.
        process_info_dict = {
            match.group(1): match.group(2) for match in re.finditer(
                r"([^:]+):([^;]+);", process_info_raw)}

        # Validate keys are known.
        for (key, val) in list(process_info_dict.items()):
            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
            self.assertIsNotNone(val)

        return process_info_dict
    def add_register_info_collection_packets(self):
        """Append iterated qRegisterInfo<n> queries; replies accumulate
        under "reg_info_responses" until an error/empty reply ends them."""
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
              "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
              "save_key": "reg_info_responses"}],
            True)

    def parse_register_info_packets(self, context):
        """Return an array of register info dictionaries, one per register info."""
        reg_info_responses = context.get("reg_info_responses")
        self.assertIsNotNone(reg_info_responses)

        # Parse register infos.
        return [parse_reg_info_response(reg_info_response)
                for reg_info_response in reg_info_responses]

    def expect_gdbremote_sequence(self, timeout_seconds=None):
        """Run the accumulated test_sequence against self.sock and return
        the resulting capture context (defaults to _TIMEOUT_SECONDS)."""
        if not timeout_seconds:
            timeout_seconds = self._TIMEOUT_SECONDS
        return expect_lldb_gdbserver_replay(
            self,
            self.sock,
            self.test_sequence,
            self._pump_queues,
            timeout_seconds,
            self.logger)

    # Keys a stub may legitimately report in a qRegisterInfo response.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]

    def assert_valid_reg_info(self, reg_info):
        """Assert reg_info only uses known keys and contains the minimum
        required set (name/bitsize/offset/encoding/format)."""
        # Assert we know about all the reginfo keys parsed.
        for key in reg_info:
            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)

        # Check the bare-minimum expected set of register info keys.
        self.assertTrue("name" in reg_info)
        self.assertTrue("bitsize" in reg_info)
        self.assertTrue("offset" in reg_info)
        self.assertTrue("encoding" in reg_info)
        self.assertTrue("format" in reg_info)
    def find_pc_reg_info(self, reg_infos):
        """Return (lldb_index, reg_info) for the generic "pc" register, or
        (None, None) if no register is marked generic:pc."""
        lldb_reg_index = 0
        for reg_info in reg_infos:
            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
                return (lldb_reg_index, reg_info)
            lldb_reg_index += 1

        return (None, None)

    def add_lldb_register_index(self, reg_infos):
        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.

        We'll use this when we want to call packets like P/p with a register index but do so
        on only a subset of the full register info set.
        """
        self.assertIsNotNone(reg_infos)

        reg_index = 0
        for reg_info in reg_infos:
            reg_info["lldb_register_index"] = reg_index
            reg_index += 1

    def add_query_memory_region_packets(self, address):
        """Append a qMemoryRegionInfo query for address, capturing the raw
        reply as "memory_region_response"."""
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
            True)

    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
        """Parse "key:value;" text into a dict.

        With allow_dupes, repeated keys are collected into a list; without
        it, a duplicate key fails the test.
        """
        self.assertIsNotNone(key_val_text)
        kv_dict = {}
        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
            key = match.group(1)
            val = match.group(2)
            if key in kv_dict:
                if allow_dupes:
                    if isinstance(kv_dict[key], list):
                        kv_dict[key].append(val)
                    else:
                        # Promote to list
                        kv_dict[key] = [kv_dict[key], val]
                else:
                    self.fail(
                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
                            key, val, key_val_text, kv_dict))
            else:
                kv_dict[key] = val
        return kv_dict

    def parse_memory_region_packet(self, context):
        """Parse the captured qMemoryRegionInfo reply into a dict and
        validate its keys (start/size/permissions/name/error)."""
        # Ensure we have a context.
        self.assertIsNotNone(context.get("memory_region_response"))

        # Pull out key:value; pairs.
        mem_region_dict = self.parse_key_val_dict(
            context.get("memory_region_response"))

        # Validate keys are known.
        for (key, val) in list(mem_region_dict.items()):
            self.assertTrue(
                key in [
                    "start",
                    "size",
                    "permissions",
                    "name",
                    "error"])
            self.assertIsNotNone(val)

        # Return the dictionary of key-value pairs for the memory region.
        return mem_region_dict
    def assert_address_within_memory_region(
            self, test_address, mem_region_dict):
        """Fail unless test_address lies inside the [start, start+size)
        range described by mem_region_dict (hex string fields)."""
        self.assertIsNotNone(mem_region_dict)
        self.assertTrue("start" in mem_region_dict)
        self.assertTrue("size" in mem_region_dict)

        range_start = int(mem_region_dict["start"], 16)
        range_size = int(mem_region_dict["size"], 16)
        range_end = range_start + range_size

        if test_address < range_start:
            self.fail(
                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))
        elif test_address >= range_end:
            self.fail(
                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))

    def add_threadinfo_collection_packets(self):
        """Append qfThreadInfo/qsThreadInfo iteration; replies accumulate
        under "threadinfo_responses" until the "$l#.." terminator."""
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
              "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
              "save_key": "threadinfo_responses"}],
            True)

    def parse_threadinfo_packets(self, context):
        """Return an array of thread ids (decimal ints), one per thread."""
        threadinfo_responses = context.get("threadinfo_responses")
        self.assertIsNotNone(threadinfo_responses)

        thread_ids = []
        for threadinfo_response in threadinfo_responses:
            new_thread_infos = parse_threadinfo_response(threadinfo_response)
            thread_ids.extend(new_thread_infos)
        return thread_ids
856 thread_ids.extend(new_thread_infos) 857 return thread_ids 858 859 def wait_for_thread_count(self, thread_count, timeout_seconds=3): 860 start_time = time.time() 861 timeout_time = start_time + timeout_seconds 862 863 actual_thread_count = 0 864 while actual_thread_count < thread_count: 865 self.reset_test_sequence() 866 self.add_threadinfo_collection_packets() 867 868 context = self.expect_gdbremote_sequence() 869 self.assertIsNotNone(context) 870 871 threads = self.parse_threadinfo_packets(context) 872 self.assertIsNotNone(threads) 873 874 actual_thread_count = len(threads) 875 876 if time.time() > timeout_time: 877 raise Exception( 878 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format( 879 timeout_seconds, thread_count, actual_thread_count)) 880 881 return threads 882 883 def add_set_breakpoint_packets( 884 self, 885 address, 886 do_continue=True, 887 breakpoint_kind=1): 888 self.test_sequence.add_log_lines( 889 [ # Set the breakpoint. 890 "read packet: $Z0,{0:x},{1}#00".format( 891 address, breakpoint_kind), 892 # Verify the stub could set it. 893 "send packet: $OK#00", 894 ], True) 895 896 if (do_continue): 897 self.test_sequence.add_log_lines( 898 [ # Continue the inferior. 899 "read packet: $c#63", 900 # Expect a breakpoint stop report. 901 {"direction": "send", 902 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 903 "capture": {1: "stop_signo", 904 2: "stop_thread_id"}}, 905 ], True) 906 907 def add_remove_breakpoint_packets(self, address, breakpoint_kind=1): 908 self.test_sequence.add_log_lines( 909 [ # Remove the breakpoint. 910 "read packet: $z0,{0:x},{1}#00".format( 911 address, breakpoint_kind), 912 # Verify the stub could unset it. 
913 "send packet: $OK#00", 914 ], True) 915 916 def add_qSupported_packets(self): 917 self.test_sequence.add_log_lines( 918 ["read packet: $qSupported#00", 919 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 920 ], True) 921 922 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 923 "augmented-libraries-svr4-read", 924 "PacketSize", 925 "QStartNoAckMode", 926 "QThreadSuffixSupported", 927 "QListThreadsInStopReply", 928 "qXfer:auxv:read", 929 "qXfer:libraries:read", 930 "qXfer:libraries-svr4:read", 931 "qXfer:features:read", 932 "qEcho" 933 ] 934 935 def parse_qSupported_response(self, context): 936 self.assertIsNotNone(context) 937 938 raw_response = context.get("qSupported_response") 939 self.assertIsNotNone(raw_response) 940 941 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 942 # +,-,? is stripped from the key and set as the value. 943 supported_dict = {} 944 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 945 key = match.group(1) 946 val = match.group(3) 947 948 # key=val: store as is 949 if val and len(val) > 0: 950 supported_dict[key] = val 951 else: 952 if len(key) < 2: 953 raise Exception( 954 "singular stub feature is too short: must be stub_feature{+,-,?}") 955 supported_type = key[-1] 956 key = key[:-1] 957 if not supported_type in ["+", "-", "?"]: 958 raise Exception( 959 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 960 supported_dict[key] = supported_type 961 # Ensure we know the supported element 962 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 963 raise Exception( 964 "unknown qSupported stub feature reported: %s" % 965 key) 966 967 return supported_dict 968 969 def run_process_then_stop(self, run_seconds=1): 970 # Tell the stub to continue. 
    def select_modifiable_register(self, reg_infos):
        """Find a register that can be read/written freely."""
        PREFERRED_REGISTER_NAMES = set(["rax", ])

        # First check for the first register from the preferred register name
        # set.
        alternative_register_index = None

        self.assertIsNotNone(reg_infos)
        for reg_info in reg_infos:
            if ("name" in reg_info) and (
                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
                # We found a preferred register.  Use it.
                return reg_info["lldb_register_index"]
            if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
                # A frame pointer register will do as a register to modify
                # temporarily.
                alternative_register_index = reg_info["lldb_register_index"]

        # We didn't find a preferred register.  Return whatever alternative register
        # we found, if any.
        return alternative_register_index
1012 return alternative_register_index 1013 1014 def extract_registers_from_stop_notification(self, stop_key_vals_text): 1015 self.assertIsNotNone(stop_key_vals_text) 1016 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 1017 1018 registers = {} 1019 for (key, val) in list(kv_dict.items()): 1020 if re.match(r"^[0-9a-fA-F]+$", key): 1021 registers[int(key, 16)] = val 1022 return registers 1023 1024 def gather_register_infos(self): 1025 self.reset_test_sequence() 1026 self.add_register_info_collection_packets() 1027 1028 context = self.expect_gdbremote_sequence() 1029 self.assertIsNotNone(context) 1030 1031 reg_infos = self.parse_register_info_packets(context) 1032 self.assertIsNotNone(reg_infos) 1033 self.add_lldb_register_index(reg_infos) 1034 1035 return reg_infos 1036 1037 def find_generic_register_with_name(self, reg_infos, generic_name): 1038 self.assertIsNotNone(reg_infos) 1039 for reg_info in reg_infos: 1040 if ("generic" in reg_info) and ( 1041 reg_info["generic"] == generic_name): 1042 return reg_info 1043 return None 1044 1045 def decode_gdbremote_binary(self, encoded_bytes): 1046 decoded_bytes = "" 1047 i = 0 1048 while i < len(encoded_bytes): 1049 if encoded_bytes[i] == "}": 1050 # Handle escaped char. 1051 self.assertTrue(i + 1 < len(encoded_bytes)) 1052 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1053 i += 2 1054 elif encoded_bytes[i] == "*": 1055 # Handle run length encoding. 1056 self.assertTrue(len(decoded_bytes) > 0) 1057 self.assertTrue(i + 1 < len(encoded_bytes)) 1058 repeat_count = ord(encoded_bytes[i + 1]) - 29 1059 decoded_bytes += decoded_bytes[-1] * repeat_count 1060 i += 2 1061 else: 1062 decoded_bytes += encoded_bytes[i] 1063 i += 1 1064 return decoded_bytes 1065 1066 def build_auxv_dict(self, endian, word_size, auxv_data): 1067 self.assertIsNotNone(endian) 1068 self.assertIsNotNone(word_size) 1069 self.assertIsNotNone(auxv_data) 1070 1071 auxv_dict = {} 1072 1073 while len(auxv_data) > 0: 1074 # Chop off key. 
            raw_key = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Chop off value.
            raw_value = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Convert raw text from target endian.
            key = unpack_endian_binary_string(endian, raw_key)
            value = unpack_endian_binary_string(endian, raw_value)

            # Handle ending entry: an AT_NULL (0, 0) pair terminates the auxv.
            if key == 0:
                self.assertEqual(value, 0)
                return auxv_dict

            # The key should not already be present.
            self.assertFalse(key in auxv_dict)
            auxv_dict[key] = value

        # Fell off the end of the data without seeing the terminating pair.
        self.fail(
            "should not reach here - implies required double zero entry not found")
        return auxv_dict

    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect command_prefix{offset:x},{chunk_length:x} responses until an 'l' response (possibly with data) is returned."""
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix,
                        offset,
                        chunk_length),
                    {
                        "direction": "send",
                        # First non-'E' char is the response type ('l' or 'm'),
                        # the rest is the chunk payload.
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                            re.MULTILINE | re.DOTALL),
                        "capture": {
                            1: "response_type",
                            2: "content_raw"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is l.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data

    def add_interrupt_packets(self):
        """Queue an interrupt (raw 0x03 byte) and expect a $T stop-reply.

        Captures the stop signal number into "stop_signo" and the remaining
        key/value text into "stop_key_val_text".
        """
        self.test_sequence.add_log_lines([
            # Send the interrupt.
            "read packet: {}".format(chr(3)),
            # And wait for the stop notification.
            {"direction": "send",
             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
             "capture": {1: "stop_signo",
                         2: "stop_key_val_text"}},
        ], True)

    def parse_interrupt_packets(self, context):
        """Return (stop_signo_int, stop_key_val_dict) from an interrupt context."""
        self.assertIsNotNone(context.get("stop_signo"))
        self.assertIsNotNone(context.get("stop_key_val_text"))
        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
            context["stop_key_val_text"]))

    def add_QSaveRegisterState_packets(self, thread_id):
        """Queue a QSaveRegisterState request, optionally thread-suffixed.

        The stub's reply (a save id, or an Exx error) is captured into the
        "save_response" context key.
        """
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
                thread_id)
        else:
            request = "read packet: $QSaveRegisterState#00"

        self.test_sequence.add_log_lines([request,
                                          {"direction": "send",
                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
                                           "capture": {1: "save_response"}},
                                          ],
                                         True)

    def parse_QSaveRegisterState_response(self, context):
        """Return (success, save_id) parsed from the "save_response" capture.

        On an Exx error (or empty reply) returns (False, None); otherwise the
        reply is the decimal save id and (True, save_id) is returned.
        """
        self.assertIsNotNone(context)

        save_response = context.get("save_response")
        self.assertIsNotNone(save_response)

        if len(save_response) < 1 or save_response[0] == "E":
            # error received
            return (False, None)
        else:
            return (True, int(save_response))

    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
        if thread_id:
            # Use the thread suffix form.
1191 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1192 save_id, thread_id) 1193 else: 1194 request = "read packet: $QRestoreRegisterState:{}#00".format( 1195 save_id) 1196 1197 self.test_sequence.add_log_lines([ 1198 request, 1199 "send packet: $OK#00" 1200 ], True) 1201 1202 def flip_all_bits_in_each_register_value( 1203 self, reg_infos, endian, thread_id=None): 1204 self.assertIsNotNone(reg_infos) 1205 1206 successful_writes = 0 1207 failed_writes = 0 1208 1209 for reg_info in reg_infos: 1210 # Use the lldb register index added to the reg info. We're not necessarily 1211 # working off a full set of register infos, so an inferred register 1212 # index could be wrong. 1213 reg_index = reg_info["lldb_register_index"] 1214 self.assertIsNotNone(reg_index) 1215 1216 reg_byte_size = int(reg_info["bitsize"]) / 8 1217 self.assertTrue(reg_byte_size > 0) 1218 1219 # Handle thread suffix. 1220 if thread_id: 1221 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1222 reg_index, thread_id) 1223 else: 1224 p_request = "read packet: $p{:x}#00".format(reg_index) 1225 1226 # Read the existing value. 1227 self.reset_test_sequence() 1228 self.test_sequence.add_log_lines([ 1229 p_request, 1230 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1231 ], True) 1232 context = self.expect_gdbremote_sequence() 1233 self.assertIsNotNone(context) 1234 1235 # Verify the response length. 
1236 p_response = context.get("p_response") 1237 self.assertIsNotNone(p_response) 1238 initial_reg_value = unpack_register_hex_unsigned( 1239 endian, p_response) 1240 1241 # Flip the value by xoring with all 1s 1242 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8) 1243 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1244 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1245 1246 # Handle thread suffix for P. 1247 if thread_id: 1248 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1249 reg_index, pack_register_hex( 1250 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1251 else: 1252 P_request = "read packet: $P{:x}={}#00".format( 1253 reg_index, pack_register_hex( 1254 endian, flipped_bits_int, byte_size=reg_byte_size)) 1255 1256 # Write the flipped value to the register. 1257 self.reset_test_sequence() 1258 self.test_sequence.add_log_lines([P_request, 1259 {"direction": "send", 1260 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1261 "capture": {1: "P_response"}}, 1262 ], 1263 True) 1264 context = self.expect_gdbremote_sequence() 1265 self.assertIsNotNone(context) 1266 1267 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1268 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1269 # all flipping perfectly. 1270 P_response = context.get("P_response") 1271 self.assertIsNotNone(P_response) 1272 if P_response == "OK": 1273 successful_writes += 1 1274 else: 1275 failed_writes += 1 1276 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1277 1278 # Read back the register value, ensure it matches the flipped 1279 # value. 
1280 if P_response == "OK": 1281 self.reset_test_sequence() 1282 self.test_sequence.add_log_lines([ 1283 p_request, 1284 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1285 ], True) 1286 context = self.expect_gdbremote_sequence() 1287 self.assertIsNotNone(context) 1288 1289 verify_p_response_raw = context.get("p_response") 1290 self.assertIsNotNone(verify_p_response_raw) 1291 verify_bits = unpack_register_hex_unsigned( 1292 endian, verify_p_response_raw) 1293 1294 if verify_bits != flipped_bits_int: 1295 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1296 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1297 successful_writes -= 1 1298 failed_writes += 1 1299 1300 return (successful_writes, failed_writes) 1301 1302 def is_bit_flippable_register(self, reg_info): 1303 if not reg_info: 1304 return False 1305 if not "set" in reg_info: 1306 return False 1307 if reg_info["set"] != "General Purpose Registers": 1308 return False 1309 if ("container-regs" in reg_info) and ( 1310 len(reg_info["container-regs"]) > 0): 1311 # Don't try to bit flip registers contained in another register. 1312 return False 1313 if re.match("^.s$", reg_info["name"]): 1314 # This is a 2-letter register name that ends in "s", like a segment register. 1315 # Don't try to bit flip these. 1316 return False 1317 if re.match("^(c|)psr$", reg_info["name"]): 1318 # This is an ARM program status register; don't flip it. 1319 return False 1320 # Okay, this looks fine-enough. 1321 return True 1322 1323 def read_register_values(self, reg_infos, endian, thread_id=None): 1324 self.assertIsNotNone(reg_infos) 1325 values = {} 1326 1327 for reg_info in reg_infos: 1328 # We append a register index when load reg infos so we can work 1329 # with subsets. 
            reg_index = reg_info.get("lldb_register_index")
            self.assertIsNotNone(reg_index)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read it with p.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Convert value from target endian to integral.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            self.assertTrue(len(p_response) > 0)
            # A leading 'E' would indicate an error reply rather than data.
            self.assertFalse(p_response[0] == "E")

            values[reg_index] = unpack_register_hex_unsigned(
                endian, p_response)

        return values

    def add_vCont_query_packets(self):
        """Queue a $vCont? query; the supported-actions text (e.g. ";c;C;s;S")
        is captured into the "vCont_query_response" context key."""
        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
                                          {"direction": "send",
                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
                                           "capture": {2: "vCont_query_response"}},
                                          ],
                                         True)

    def parse_vCont_query_response(self, context):
        """Return a dict of supported vCont actions (action -> 1)."""
        self.assertIsNotNone(context)
        vCont_query_response = context.get("vCont_query_response")

        # Handle case of no vCont support at all - in which case the capture
        # group will be none or zero length.
        if not vCont_query_response or len(vCont_query_response) == 0:
            return {}

        return {key: 1 for key in vCont_query_response.split(
            ";") if key and len(key) > 0}

    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step thread_id until predicate(args) is true.

        Used by single step tests that appear in a few different contexts.
        Each step expects a SIGTRAP stop reply.  Returns a tuple
        (predicate_became_true, steps_taken); the first element is False if
        max_step_count steps elapsed without the predicate becoming true.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Every single step should stop with SIGTRAP.
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
1431 return (False, single_step_count) 1432 1433 def g_c1_c2_contents_are(self, args): 1434 """Used by single step test that appears in a few different contexts.""" 1435 g_c1_address = args["g_c1_address"] 1436 g_c2_address = args["g_c2_address"] 1437 expected_g_c1 = args["expected_g_c1"] 1438 expected_g_c2 = args["expected_g_c2"] 1439 1440 # Read g_c1 and g_c2 contents. 1441 self.reset_test_sequence() 1442 self.test_sequence.add_log_lines( 1443 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1444 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1445 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1446 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1447 True) 1448 1449 # Run the packet stream. 1450 context = self.expect_gdbremote_sequence() 1451 self.assertIsNotNone(context) 1452 1453 # Check if what we read from inferior memory is what we are expecting. 1454 self.assertIsNotNone(context.get("g_c1_contents")) 1455 self.assertIsNotNone(context.get("g_c2_contents")) 1456 1457 return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and ( 1458 context.get("g_c2_contents").decode("hex") == expected_g_c2) 1459 1460 def single_step_only_steps_one_instruction( 1461 self, use_Hc_packet=True, step_instruction="s"): 1462 """Used by single step test that appears in a few different contexts.""" 1463 # Start up the inferior. 1464 procs = self.prep_debug_monitor_and_inferior( 1465 inferior_args=[ 1466 "get-code-address-hex:swap_chars", 1467 "get-data-address-hex:g_c1", 1468 "get-data-address-hex:g_c2", 1469 "sleep:1", 1470 "call-function:swap_chars", 1471 "sleep:5"]) 1472 1473 # Run the process 1474 self.test_sequence.add_log_lines( 1475 [ # Start running after initial stop. 1476 "read packet: $c#63", 1477 # Match output line that prints the memory address of the function call entry point. 
                # Note we require launch-only testing so we can get inferior output.
                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
                # Now stop the inferior.
                "read packet: {}".format(chr(3)),
                # And wait for the stop notification.
                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the main thread id.
        self.assertIsNotNone(context.get("stop_thread_id"))
        main_thread_id = int(context.get("stop_thread_id"), 16)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Grab the data addresses.
        self.assertIsNotNone(context.get("g_c1_address"))
        g_c1_address = int(context.get("g_c1_address"), 16)

        self.assertIsNotNone(context.get("g_c2_address"))
        g_c2_address = int(context.get("g_c2_address"), 16)

        # Set a breakpoint at the given address.  The breakpoint kind is the
        # target-specific breakpoint byte size.
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1
        self.reset_test_sequence()
        self.add_set_breakpoint_packets(
            function_address,
            do_continue=True,
            breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Remove the breakpoint.
        self.reset_test_sequence()
        self.add_remove_breakpoint_packets(
            function_address, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify g_c1 and g_c2 match expected initial state.
        args = {}
        args["g_c1_address"] = g_c1_address
        args["g_c2_address"] = g_c2_address
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"

        self.assertTrue(self.g_c1_c2_contents_are(args))

        # Verify we take only a small number of steps to hit the first state.
        # Might need to work through function entry prologue code.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "1"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=25,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)

        # Verify we hit the next state.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "0"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        expected_step_count = 1
        arch = self.getArchitecture()

        # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update
        # the variable value.
        if re.match("mips", arch):
            expected_step_count = 3
        # S390X requires 2 machine instructions (LARL, MVI) to update the
        # variable value.
        if re.match("s390x", arch):
            expected_step_count = 2
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "0"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"
        (state_reached,
         step_count) = self.count_single_steps_until_true(main_thread_id,
                                                          self.g_c1_c2_contents_are,
                                                          args,
                                                          max_step_count=5,
                                                          use_Hc_packet=use_Hc_packet,
                                                          step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

    def maybe_strict_output_regex(self, regex):
        """Wrap regex loosely (.*regex.*) on platforms with chatty stderr,
        otherwise anchor it strictly (^regex$)."""
        return '.*' + regex + \
            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'

    def install_and_create_launch_args(self):
        """Install a.out to the remote platform (if any) and return the argv
        list to launch it with.

        For local runs this is just [abs_path_to_a.out].  Raises Exception
        when the remote install fails.
        """
        exe_path = os.path.abspath('a.out')
        if not lldb.remote_platform:
            return [exe_path]
        remote_path = lldbutil.append_to_process_working_directory(
            os.path.basename(exe_path))
        remote_file_spec = lldb.SBFileSpec(remote_path, False)
        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
                                           remote_file_spec)
        if err.Fail():
            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
                            (exe_path, remote_path, err))
        return [remote_path]