1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62@add_metaclass(GdbRemoteTestCaseFactory) 63class GdbRemoteTestCaseBase(Base): 64 65 # Default time out in seconds. The timeout is increased tenfold under Asan. 66 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 67 # Default sleep time in seconds. The sleep time is doubled under Asan. 68 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 69 70 _GDBREMOTE_KILL_PACKET = b"$k#6b" 71 72 # Start the inferior separately, attach to the inferior on the stub 73 # command line. 74 _STARTUP_ATTACH = "attach" 75 # Start the inferior separately, start the stub without attaching, allow 76 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 77 _STARTUP_ATTACH_MANUALLY = "attach_manually" 78 # Start the stub, and launch the inferior with an $A packet via the 79 # initial packet stream. 80 _STARTUP_LAUNCH = "launch" 81 82 # GDB Signal numbers that are not target-specific used for common 83 # exceptions 84 TARGET_EXC_BAD_ACCESS = 0x91 85 TARGET_EXC_BAD_INSTRUCTION = 0x92 86 TARGET_EXC_ARITHMETIC = 0x93 87 TARGET_EXC_EMULATION = 0x94 88 TARGET_EXC_SOFTWARE = 0x95 89 TARGET_EXC_BREAKPOINT = 0x96 90 91 _verbose_log_handler = None 92 _log_formatter = logging.Formatter( 93 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 94 95 def setUpBaseLogging(self): 96 self.logger = logging.getLogger(__name__) 97 98 if len(self.logger.handlers) > 0: 99 return # We have set up this handler already 100 101 self.logger.propagate = False 102 self.logger.setLevel(logging.DEBUG) 103 104 # log all warnings to stderr 105 handler = logging.StreamHandler() 106 handler.setLevel(logging.WARNING) 107 handler.setFormatter(self._log_formatter) 108 self.logger.addHandler(handler) 109 110 def isVerboseLoggingRequested(self): 111 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 112 # logged. 113 return any(("gdb-remote" in channel) 114 for channel in lldbtest_config.channels) 115 116 def getDebugServer(self): 117 method = getattr(self, self.testMethodName) 118 return getattr(method, "debug_server", None) 119 120 def setUp(self): 121 super(GdbRemoteTestCaseBase, self).setUp() 122 123 self.setUpBaseLogging() 124 self.debug_monitor_extra_args = [] 125 126 if self.isVerboseLoggingRequested(): 127 # If requested, full logs go to a log file 128 self._verbose_log_handler = logging.FileHandler( 129 self.getLogBasenameForCurrentTest() + "-host.log") 130 self._verbose_log_handler.setFormatter(self._log_formatter) 131 self._verbose_log_handler.setLevel(logging.DEBUG) 132 self.logger.addHandler(self._verbose_log_handler) 133 134 self.test_sequence = GdbRemoteTestSequence(self.logger) 135 self.set_inferior_startup_launch() 136 self.port = self.get_next_port() 137 self.stub_sends_two_stop_notifications_on_kill = False 138 if configuration.lldb_platform_url: 139 if configuration.lldb_platform_url.startswith('unix-'): 140 url_pattern = '(.+)://\[?(.+?)\]?/.*' 141 else: 142 url_pattern = '(.+)://(.+):\d+' 143 scheme, host = re.match( 144 url_pattern, configuration.lldb_platform_url).groups() 145 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 146 self.stub_device = host 147 self.stub_hostname = 'localhost' 148 else: 149 self.stub_device = None 150 self.stub_hostname = host 151 else: 152 self.stub_hostname = "localhost" 153 154 debug_server = self.getDebugServer() 155 if debug_server == "debugserver": 156 self._init_debugserver_test() 157 else: 158 self._init_llgs_test() 159 160 def tearDown(self): 161 self.logger.removeHandler(self._verbose_log_handler) 162 self._verbose_log_handler = None 163 TestBase.tearDown(self) 164 165 def getLocalServerLogFile(self): 166 return self.getLogBasenameForCurrentTest() + "-server.log" 167 168 def setUpServerLogging(self, is_llgs): 169 if len(lldbtest_config.channels) == 0: 170 return # No logging requested 171 172 if lldb.remote_platform: 173 log_file = lldbutil.join_remote_paths( 174 lldb.remote_platform.GetWorkingDirectory(), "server.log") 175 else: 176 log_file = self.getLocalServerLogFile() 177 178 if is_llgs: 179 self.debug_monitor_extra_args.append("--log-file=" + log_file) 180 self.debug_monitor_extra_args.append( 181 "--log-channels={}".format(":".join(lldbtest_config.channels))) 182 else: 183 self.debug_monitor_extra_args = [ 184 "--log-file=" + log_file, "--log-flags=0x800000"] 185 186 def get_next_port(self): 187 return 12000 + random.randint(0, 3999) 188 189 def reset_test_sequence(self): 190 self.test_sequence = GdbRemoteTestSequence(self.logger) 191 192 193 def _init_llgs_test(self): 194 reverse_connect = True 195 if lldb.remote_platform: 196 # Reverse connections may be tricky due to firewalls/NATs. 197 reverse_connect = False 198 199 # FIXME: This is extremely linux-oriented 200 201 # Grab the ppid from /proc/[shell pid]/stat 202 err, retcode, shell_stat = self.run_platform_command( 203 "cat /proc/$$/stat") 204 self.assertTrue( 205 err.Success() and retcode == 0, 206 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 207 (err.GetCString(), 208 retcode)) 209 210 # [pid] ([executable]) [state] [*ppid*] 211 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 212 err, retcode, ls_output = self.run_platform_command( 213 "ls -l /proc/%s/exe" % pid) 214 self.assertTrue( 215 err.Success() and retcode == 0, 216 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 217 (pid, 218 err.GetCString(), 219 retcode)) 220 exe = ls_output.split()[-1] 221 222 # If the binary has been deleted, the link name has " (deleted)" appended. 223 # Remove if it's there. 224 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 225 else: 226 self.debug_monitor_exe = get_lldb_server_exe() 227 228 self.debug_monitor_extra_args = ["gdbserver"] 229 self.setUpServerLogging(is_llgs=True) 230 231 self.reverse_connect = reverse_connect 232 233 def _init_debugserver_test(self): 234 self.debug_monitor_exe = get_debugserver_exe() 235 self.setUpServerLogging(is_llgs=False) 236 self.reverse_connect = True 237 238 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 239 # when the process truly dies. 240 self.stub_sends_two_stop_notifications_on_kill = True 241 242 def forward_adb_port(self, source, target, direction, device): 243 adb = ['adb'] + (['-s', device] if device else []) + [direction] 244 245 def remove_port_forward(): 246 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 247 248 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 249 self.addTearDownHook(remove_port_forward) 250 251 def _verify_socket(self, sock): 252 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 253 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 254 # the connect() will always be successful, but the connection will be immediately dropped 255 # if ADB could not connect on the remote side. This function tries to detect this 256 # situation, and report it as "connection refused" so that the upper layers attempt the 257 # connection again. 258 triple = self.dbg.GetSelectedPlatform().GetTriple() 259 if not re.match(".*-.*-.*-android", triple): 260 return # Not android. 261 can_read, _, _ = select.select([sock], [], [], 0.1) 262 if sock not in can_read: 263 return # Data is not available, but the connection is alive. 264 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 265 raise _ConnectionRefused() # Got EOF, connection dropped. 266 267 def create_socket(self): 268 try: 269 sock = socket.socket(family=socket.AF_INET) 270 except OSError as e: 271 if e.errno != errno.EAFNOSUPPORT: 272 raise 273 sock = socket.socket(family=socket.AF_INET6) 274 275 logger = self.logger 276 277 triple = self.dbg.GetSelectedPlatform().GetTriple() 278 if re.match(".*-.*-.*-android", triple): 279 self.forward_adb_port( 280 self.port, 281 self.port, 282 "forward", 283 self.stub_device) 284 285 logger.info( 286 "Connecting to debug monitor on %s:%d", 287 self.stub_hostname, 288 self.port) 289 connect_info = (self.stub_hostname, self.port) 290 try: 291 sock.connect(connect_info) 292 except socket.error as serr: 293 if serr.errno == errno.ECONNREFUSED: 294 raise _ConnectionRefused() 295 raise serr 296 297 def shutdown_socket(): 298 if sock: 299 try: 300 # send the kill packet so lldb-server shuts down gracefully 301 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 302 except: 303 logger.warning( 304 "failed to send kill packet to debug monitor: {}; ignoring".format( 305 sys.exc_info()[0])) 306 307 try: 308 sock.close() 309 except: 310 logger.warning( 311 "failed to close socket to debug monitor: {}; ignoring".format( 312 sys.exc_info()[0])) 313 314 self.addTearDownHook(shutdown_socket) 315 316 self._verify_socket(sock) 317 318 return sock 319 320 def set_inferior_startup_launch(self): 321 self._inferior_startup = self._STARTUP_LAUNCH 322 323 def set_inferior_startup_attach(self): 324 self._inferior_startup = self._STARTUP_ATTACH 325 326 def set_inferior_startup_attach_manually(self): 327 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 328 329 def get_debug_monitor_command_line_args(self, attach_pid=None): 330 commandline_args = self.debug_monitor_extra_args 331 if attach_pid: 332 commandline_args += ["--attach=%d" % attach_pid] 333 if self.reverse_connect: 334 commandline_args += ["--reverse-connect", self.connect_address] 335 else: 336 if lldb.remote_platform: 337 commandline_args += ["*:{}".format(self.port)] 338 else: 339 commandline_args += ["localhost:{}".format(self.port)] 340 341 return commandline_args 342 343 def get_target_byte_order(self): 344 inferior_exe_path = self.getBuildArtifact("a.out") 345 target = self.dbg.CreateTarget(inferior_exe_path) 346 return target.GetByteOrder() 347 348 def launch_debug_monitor(self, attach_pid=None, logfile=None): 349 if self.reverse_connect: 350 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 351 sock = socket.socket(family, type, proto) 352 sock.settimeout(self.DEFAULT_TIMEOUT) 353 354 sock.bind(addr) 355 sock.listen(1) 356 addr = sock.getsockname() 357 self.connect_address = "[{}]:{}".format(*addr) 358 359 360 # Create the command line. 361 commandline_args = self.get_debug_monitor_command_line_args( 362 attach_pid=attach_pid) 363 364 # Start the server. 365 server = self.spawnSubprocess( 366 self.debug_monitor_exe, 367 commandline_args, 368 install_remote=False) 369 self.assertIsNotNone(server) 370 371 if self.reverse_connect: 372 self.sock = sock.accept()[0] 373 self.sock.settimeout(self.DEFAULT_TIMEOUT) 374 375 return server 376 377 def connect_to_debug_monitor(self, attach_pid=None): 378 if self.reverse_connect: 379 # Create the stub. 380 server = self.launch_debug_monitor(attach_pid=attach_pid) 381 self.assertIsNotNone(server) 382 383 # Schedule debug monitor to be shut down during teardown. 384 logger = self.logger 385 386 self._server = Server(self.sock, server) 387 return server 388 389 # We're using a random port algorithm to try not to collide with other ports, 390 # and retry a max # times. 391 attempts = 0 392 MAX_ATTEMPTS = 20 393 394 while attempts < MAX_ATTEMPTS: 395 server = self.launch_debug_monitor(attach_pid=attach_pid) 396 397 # Schedule debug monitor to be shut down during teardown. 398 logger = self.logger 399 400 connect_attemps = 0 401 MAX_CONNECT_ATTEMPTS = 10 402 403 while connect_attemps < MAX_CONNECT_ATTEMPTS: 404 # Create a socket to talk to the server 405 try: 406 logger.info("Connect attempt %d", connect_attemps + 1) 407 self.sock = self.create_socket() 408 self._server = Server(self.sock, server) 409 return server 410 except _ConnectionRefused as serr: 411 # Ignore, and try again. 412 pass 413 time.sleep(0.5) 414 connect_attemps += 1 415 416 # We should close the server here to be safe. 417 server.terminate() 418 419 # Increment attempts. 420 print( 421 "connect to debug monitor on port %d failed, attempt #%d of %d" % 422 (self.port, attempts + 1, MAX_ATTEMPTS)) 423 attempts += 1 424 425 # And wait a random length of time before next attempt, to avoid 426 # collisions. 427 time.sleep(random.randint(1, 5)) 428 429 # Now grab a new port number. 430 self.port = self.get_next_port() 431 432 raise Exception( 433 "failed to create a socket to the launched debug monitor after %d tries" % 434 attempts) 435 436 def launch_process_for_attach( 437 self, 438 inferior_args=None, 439 sleep_seconds=3, 440 exe_path=None): 441 # We're going to start a child process that the debug monitor stub can later attach to. 442 # This process needs to be started so that it just hangs around for a while. We'll 443 # have it sleep. 444 if not exe_path: 445 exe_path = self.getBuildArtifact("a.out") 446 447 args = [] 448 if inferior_args: 449 args.extend(inferior_args) 450 if sleep_seconds: 451 args.append("sleep:%d" % sleep_seconds) 452 453 return self.spawnSubprocess(exe_path, args) 454 455 def prep_debug_monitor_and_inferior( 456 self, 457 inferior_args=None, 458 inferior_sleep_seconds=3, 459 inferior_exe_path=None, 460 inferior_env=None): 461 """Prep the debug monitor, the inferior, and the expected packet stream. 462 463 Handle the separate cases of using the debug monitor in attach-to-inferior mode 464 and in launch-inferior mode. 465 466 For attach-to-inferior mode, the inferior process is first started, then 467 the debug monitor is started in attach to pid mode (using --attach on the 468 stub command line), and the no-ack-mode setup is appended to the packet 469 stream. The packet stream is not yet executed, ready to have more expected 470 packet entries added to it. 471 472 For launch-inferior mode, the stub is first started, then no ack mode is 473 setup on the expected packet stream, then the verified launch packets are added 474 to the expected socket stream. The packet stream is not yet executed, ready 475 to have more expected packet entries added to it. 476 477 The return value is: 478 {inferior:<inferior>, server:<server>} 479 """ 480 inferior = None 481 attach_pid = None 482 483 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 484 # Launch the process that we'll use as the inferior. 485 inferior = self.launch_process_for_attach( 486 inferior_args=inferior_args, 487 sleep_seconds=inferior_sleep_seconds, 488 exe_path=inferior_exe_path) 489 self.assertIsNotNone(inferior) 490 self.assertTrue(inferior.pid > 0) 491 if self._inferior_startup == self._STARTUP_ATTACH: 492 # In this case, we want the stub to attach via the command 493 # line, so set the command line attach pid here. 494 attach_pid = inferior.pid 495 496 if self._inferior_startup == self._STARTUP_LAUNCH: 497 # Build launch args 498 if not inferior_exe_path: 499 inferior_exe_path = self.getBuildArtifact("a.out") 500 501 if lldb.remote_platform: 502 remote_path = lldbutil.append_to_process_working_directory(self, 503 os.path.basename(inferior_exe_path)) 504 remote_file_spec = lldb.SBFileSpec(remote_path, False) 505 err = lldb.remote_platform.Install(lldb.SBFileSpec( 506 inferior_exe_path, True), remote_file_spec) 507 if err.Fail(): 508 raise Exception( 509 "remote_platform.Install('%s', '%s') failed: %s" % 510 (inferior_exe_path, remote_path, err)) 511 inferior_exe_path = remote_path 512 513 launch_args = [inferior_exe_path] 514 if inferior_args: 515 launch_args.extend(inferior_args) 516 517 # Launch the debug monitor stub, attaching to the inferior. 518 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 519 self.assertIsNotNone(server) 520 521 self.do_handshake() 522 523 # Build the expected protocol stream 524 if inferior_env: 525 for name, value in inferior_env.items(): 526 self.add_set_environment_packets(name, value) 527 if self._inferior_startup == self._STARTUP_LAUNCH: 528 self.add_verified_launch_packets(launch_args) 529 530 return {"inferior": inferior, "server": server} 531 532 def do_handshake(self): 533 server = self._server 534 server.send_ack() 535 server.send_packet(b"QStartNoAckMode") 536 self.assertEqual(server.get_normal_packet(), b"+") 537 self.assertEqual(server.get_normal_packet(), b"OK") 538 server.send_ack() 539 540 def add_verified_launch_packets(self, launch_args): 541 self.test_sequence.add_log_lines( 542 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 543 "send packet: $OK#00", 544 "read packet: $qLaunchSuccess#a5", 545 "send packet: $OK#00"], 546 True) 547 548 def add_thread_suffix_request_packets(self): 549 self.test_sequence.add_log_lines( 550 ["read packet: $QThreadSuffixSupported#e4", 551 "send packet: $OK#00", 552 ], True) 553 554 def add_process_info_collection_packets(self): 555 self.test_sequence.add_log_lines( 556 ["read packet: $qProcessInfo#dc", 557 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 558 True) 559 560 def add_set_environment_packets(self, name, value): 561 self.test_sequence.add_log_lines( 562 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 563 "send packet: $OK#00", 564 ], True) 565 566 _KNOWN_PROCESS_INFO_KEYS = [ 567 "pid", 568 "parent-pid", 569 "real-uid", 570 "real-gid", 571 "effective-uid", 572 "effective-gid", 573 "cputype", 574 "cpusubtype", 575 "ostype", 576 "triple", 577 "vendor", 578 "endian", 579 "elf_abi", 580 "ptrsize" 581 ] 582 583 def parse_process_info_response(self, context): 584 # Ensure we have a process info response. 585 self.assertIsNotNone(context) 586 process_info_raw = context.get("process_info_raw") 587 self.assertIsNotNone(process_info_raw) 588 589 # Pull out key:value; pairs. 590 process_info_dict = { 591 match.group(1): match.group(2) for match in re.finditer( 592 r"([^:]+):([^;]+);", process_info_raw)} 593 594 # Validate keys are known. 595 for (key, val) in list(process_info_dict.items()): 596 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 597 self.assertIsNotNone(val) 598 599 return process_info_dict 600 601 def add_register_info_collection_packets(self): 602 self.test_sequence.add_log_lines( 603 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 604 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 605 "save_key": "reg_info_responses"}], 606 True) 607 608 def parse_register_info_packets(self, context): 609 """Return an array of register info dictionaries, one per register info.""" 610 reg_info_responses = context.get("reg_info_responses") 611 self.assertIsNotNone(reg_info_responses) 612 613 # Parse register infos. 614 return [parse_reg_info_response(reg_info_response) 615 for reg_info_response in reg_info_responses] 616 617 def expect_gdbremote_sequence(self): 618 return expect_lldb_gdbserver_replay( 619 self, 620 self._server, 621 self.test_sequence, 622 self.DEFAULT_TIMEOUT * len(self.test_sequence), 623 self.logger) 624 625 _KNOWN_REGINFO_KEYS = [ 626 "name", 627 "alt-name", 628 "bitsize", 629 "offset", 630 "encoding", 631 "format", 632 "set", 633 "gcc", 634 "ehframe", 635 "dwarf", 636 "generic", 637 "container-regs", 638 "invalidate-regs", 639 "dynamic_size_dwarf_expr_bytes", 640 "dynamic_size_dwarf_len" 641 ] 642 643 def assert_valid_reg_info(self, reg_info): 644 # Assert we know about all the reginfo keys parsed. 645 for key in reg_info: 646 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 647 648 # Check the bare-minimum expected set of register info keys. 649 self.assertTrue("name" in reg_info) 650 self.assertTrue("bitsize" in reg_info) 651 652 if not self.getArchitecture() == 'aarch64': 653 self.assertTrue("offset" in reg_info) 654 655 self.assertTrue("encoding" in reg_info) 656 self.assertTrue("format" in reg_info) 657 658 def find_pc_reg_info(self, reg_infos): 659 lldb_reg_index = 0 660 for reg_info in reg_infos: 661 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 662 return (lldb_reg_index, reg_info) 663 lldb_reg_index += 1 664 665 return (None, None) 666 667 def add_lldb_register_index(self, reg_infos): 668 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 669 670 We'll use this when we want to call packets like P/p with a register index but do so 671 on only a subset of the full register info set. 672 """ 673 self.assertIsNotNone(reg_infos) 674 675 reg_index = 0 676 for reg_info in reg_infos: 677 reg_info["lldb_register_index"] = reg_index 678 reg_index += 1 679 680 def add_query_memory_region_packets(self, address): 681 self.test_sequence.add_log_lines( 682 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 683 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 684 True) 685 686 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 687 self.assertIsNotNone(key_val_text) 688 kv_dict = {} 689 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 690 key = match.group(1) 691 val = match.group(2) 692 if key in kv_dict: 693 if allow_dupes: 694 if isinstance(kv_dict[key], list): 695 kv_dict[key].append(val) 696 else: 697 # Promote to list 698 kv_dict[key] = [kv_dict[key], val] 699 else: 700 self.fail( 701 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 702 key, val, key_val_text, kv_dict)) 703 else: 704 kv_dict[key] = val 705 return kv_dict 706 707 def parse_memory_region_packet(self, context): 708 # Ensure we have a context. 709 self.assertIsNotNone(context.get("memory_region_response")) 710 711 # Pull out key:value; pairs. 712 mem_region_dict = self.parse_key_val_dict( 713 context.get("memory_region_response")) 714 715 # Validate keys are known. 716 for (key, val) in list(mem_region_dict.items()): 717 self.assertIn(key, 718 ["start", 719 "size", 720 "permissions", 721 "flags", 722 "name", 723 "error", 724 "dirty-pages", 725 "type"]) 726 self.assertIsNotNone(val) 727 728 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 729 # Return the dictionary of key-value pairs for the memory region. 730 return mem_region_dict 731 732 def assert_address_within_memory_region( 733 self, test_address, mem_region_dict): 734 self.assertIsNotNone(mem_region_dict) 735 self.assertTrue("start" in mem_region_dict) 736 self.assertTrue("size" in mem_region_dict) 737 738 range_start = int(mem_region_dict["start"], 16) 739 range_size = int(mem_region_dict["size"], 16) 740 range_end = range_start + range_size 741 742 if test_address < range_start: 743 self.fail( 744 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 745 test_address, 746 range_start, 747 range_end, 748 range_size)) 749 elif test_address >= range_end: 750 self.fail( 751 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 752 test_address, 753 range_start, 754 range_end, 755 range_size)) 756 757 def add_threadinfo_collection_packets(self): 758 self.test_sequence.add_log_lines( 759 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 760 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 761 "save_key": "threadinfo_responses"}], 762 True) 763 764 def parse_threadinfo_packets(self, context): 765 """Return an array of thread ids (decimal ints), one per thread.""" 766 threadinfo_responses = context.get("threadinfo_responses") 767 self.assertIsNotNone(threadinfo_responses) 768 769 thread_ids = [] 770 for threadinfo_response in threadinfo_responses: 771 new_thread_infos = parse_threadinfo_response(threadinfo_response) 772 thread_ids.extend(new_thread_infos) 773 return thread_ids 774 775 def launch_with_threads(self, thread_count): 776 procs = self.prep_debug_monitor_and_inferior( 777 inferior_args=["thread:new"]*(thread_count-1) + ["trap"]) 778 779 self.test_sequence.add_log_lines([ 780 "read packet: $c#00", 781 {"direction": "send", 782 "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$", 783 "capture": {1: "stop_signo", 2: "stop_reply_kv"}}], True) 784 self.add_threadinfo_collection_packets() 785 context = self.expect_gdbremote_sequence() 786 threads = self.parse_threadinfo_packets(context) 787 self.assertGreaterEqual(len(threads), thread_count) 788 return context, threads 789 790 def add_set_breakpoint_packets( 791 self, 792 address, 793 z_packet_type=0, 794 do_continue=True, 795 breakpoint_kind=1): 796 self.test_sequence.add_log_lines( 797 [ # Set the breakpoint. 798 "read packet: $Z{2},{0:x},{1}#00".format( 799 address, breakpoint_kind, z_packet_type), 800 # Verify the stub could set it. 801 "send packet: $OK#00", 802 ], True) 803 804 if (do_continue): 805 self.test_sequence.add_log_lines( 806 [ # Continue the inferior. 807 "read packet: $c#63", 808 # Expect a breakpoint stop report. 809 {"direction": "send", 810 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 811 "capture": {1: "stop_signo", 812 2: "stop_thread_id"}}, 813 ], True) 814 815 def add_remove_breakpoint_packets( 816 self, 817 address, 818 z_packet_type=0, 819 breakpoint_kind=1): 820 self.test_sequence.add_log_lines( 821 [ # Remove the breakpoint. 822 "read packet: $z{2},{0:x},{1}#00".format( 823 address, breakpoint_kind, z_packet_type), 824 # Verify the stub could unset it. 825 "send packet: $OK#00", 826 ], True) 827 828 def add_qSupported_packets(self, client_features=[]): 829 features = ''.join(';' + x for x in client_features) 830 self.test_sequence.add_log_lines( 831 ["read packet: $qSupported{}#00".format(features), 832 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 833 ], True) 834 835 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 836 "augmented-libraries-svr4-read", 837 "PacketSize", 838 "QStartNoAckMode", 839 "QThreadSuffixSupported", 840 "QListThreadsInStopReply", 841 "qXfer:auxv:read", 842 "qXfer:libraries:read", 843 "qXfer:libraries-svr4:read", 844 "qXfer:features:read", 845 "qXfer:siginfo:read", 846 "qEcho", 847 "QPassSignals", 848 "multiprocess", 849 "fork-events", 850 "vfork-events", 851 "memory-tagging", 852 "qSaveCore", 853 "native-signals", 854 ] 855 856 def parse_qSupported_response(self, context): 857 self.assertIsNotNone(context) 858 859 raw_response = context.get("qSupported_response") 860 self.assertIsNotNone(raw_response) 861 862 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 863 # +,-,? is stripped from the key and set as the value. 864 supported_dict = {} 865 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 866 key = match.group(1) 867 val = match.group(3) 868 869 # key=val: store as is 870 if val and len(val) > 0: 871 supported_dict[key] = val 872 else: 873 if len(key) < 2: 874 raise Exception( 875 "singular stub feature is too short: must be stub_feature{+,-,?}") 876 supported_type = key[-1] 877 key = key[:-1] 878 if not supported_type in ["+", "-", "?"]: 879 raise Exception( 880 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 881 supported_dict[key] = supported_type 882 # Ensure we know the supported element 883 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 884 raise Exception( 885 "unknown qSupported stub feature reported: %s" % 886 key) 887 888 return supported_dict 889 890 def continue_process_and_wait_for_stop(self): 891 self.test_sequence.add_log_lines( 892 [ 893 "read packet: $vCont;c#a8", 894 { 895 "direction": "send", 896 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 897 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 898 }, 899 ], 900 True, 901 ) 902 context = self.expect_gdbremote_sequence() 903 self.assertIsNotNone(context) 904 return self.parse_interrupt_packets(context) 905 906 def select_modifiable_register(self, reg_infos): 907 """Find a register that can be read/written freely.""" 908 PREFERRED_REGISTER_NAMES = set(["rax", ]) 909 910 # First check for the first register from the preferred register name 911 # set. 912 alternative_register_index = None 913 914 self.assertIsNotNone(reg_infos) 915 for reg_info in reg_infos: 916 if ("name" in reg_info) and ( 917 reg_info["name"] in PREFERRED_REGISTER_NAMES): 918 # We found a preferred register. Use it. 919 return reg_info["lldb_register_index"] 920 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 921 reg_info["generic"] == "arg1"): 922 # A frame pointer or first arg register will do as a 923 # register to modify temporarily. 924 alternative_register_index = reg_info["lldb_register_index"] 925 926 # We didn't find a preferred register. Return whatever alternative register 927 # we found, if any. 928 return alternative_register_index 929 930 def extract_registers_from_stop_notification(self, stop_key_vals_text): 931 self.assertIsNotNone(stop_key_vals_text) 932 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 933 934 registers = {} 935 for (key, val) in list(kv_dict.items()): 936 if re.match(r"^[0-9a-fA-F]+$", key): 937 registers[int(key, 16)] = val 938 return registers 939 940 def gather_register_infos(self): 941 self.reset_test_sequence() 942 self.add_register_info_collection_packets() 943 944 context = self.expect_gdbremote_sequence() 945 self.assertIsNotNone(context) 946 947 reg_infos = self.parse_register_info_packets(context) 948 self.assertIsNotNone(reg_infos) 949 self.add_lldb_register_index(reg_infos) 950 951 return reg_infos 952 953 def find_generic_register_with_name(self, reg_infos, generic_name): 954 self.assertIsNotNone(reg_infos) 955 for reg_info in reg_infos: 956 if ("generic" in reg_info) and ( 957 reg_info["generic"] == generic_name): 958 return reg_info 959 return None 960 961 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 962 self.assertIsNotNone(reg_infos) 963 for reg_info in reg_infos: 964 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 965 return reg_info 966 return None 967 968 def decode_gdbremote_binary(self, encoded_bytes): 969 decoded_bytes = "" 970 i = 0 971 while i < len(encoded_bytes): 972 if encoded_bytes[i] == "}": 973 # Handle escaped char. 974 self.assertTrue(i + 1 < len(encoded_bytes)) 975 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 976 i += 2 977 elif encoded_bytes[i] == "*": 978 # Handle run length encoding. 979 self.assertTrue(len(decoded_bytes) > 0) 980 self.assertTrue(i + 1 < len(encoded_bytes)) 981 repeat_count = ord(encoded_bytes[i + 1]) - 29 982 decoded_bytes += decoded_bytes[-1] * repeat_count 983 i += 2 984 else: 985 decoded_bytes += encoded_bytes[i] 986 i += 1 987 return decoded_bytes 988 989 def build_auxv_dict(self, endian, word_size, auxv_data): 990 self.assertIsNotNone(endian) 991 self.assertIsNotNone(word_size) 992 self.assertIsNotNone(auxv_data) 993 994 auxv_dict = {} 995 996 # PowerPC64le's auxvec has a special key that must be ignored. 997 # This special key may be used multiple times, resulting in 998 # multiple key/value pairs with the same key, which would otherwise 999 # break this test check for repeated keys. 1000 # 1001 # AT_IGNOREPPC = 22 1002 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1003 arch = self.getArchitecture() 1004 ignore_keys = None 1005 if arch in ignored_keys_for_arch: 1006 ignore_keys = ignored_keys_for_arch[arch] 1007 1008 while len(auxv_data) > 0: 1009 # Chop off key. 1010 raw_key = auxv_data[:word_size] 1011 auxv_data = auxv_data[word_size:] 1012 1013 # Chop of value. 1014 raw_value = auxv_data[:word_size] 1015 auxv_data = auxv_data[word_size:] 1016 1017 # Convert raw text from target endian. 1018 key = unpack_endian_binary_string(endian, raw_key) 1019 value = unpack_endian_binary_string(endian, raw_value) 1020 1021 if ignore_keys and key in ignore_keys: 1022 continue 1023 1024 # Handle ending entry. 1025 if key == 0: 1026 self.assertEqual(value, 0) 1027 return auxv_dict 1028 1029 # The key should not already be present. 1030 self.assertFalse(key in auxv_dict) 1031 auxv_dict[key] = value 1032 1033 self.fail( 1034 "should not reach here - implies required double zero entry not found") 1035 return auxv_dict 1036 1037 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1038 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1039 offset = 0 1040 done = False 1041 decoded_data = "" 1042 1043 while not done: 1044 # Grab the next iteration of data. 1045 self.reset_test_sequence() 1046 self.test_sequence.add_log_lines( 1047 [ 1048 "read packet: ${}{:x},{:x}:#00".format( 1049 command_prefix, 1050 offset, 1051 chunk_length), 1052 { 1053 "direction": "send", 1054 "regex": re.compile( 1055 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1056 re.MULTILINE | re.DOTALL), 1057 "capture": { 1058 1: "response_type", 1059 2: "content_raw"}}], 1060 True) 1061 1062 context = self.expect_gdbremote_sequence() 1063 self.assertIsNotNone(context) 1064 1065 response_type = context.get("response_type") 1066 self.assertIsNotNone(response_type) 1067 self.assertTrue(response_type in ["l", "m"]) 1068 1069 # Move offset along. 1070 offset += chunk_length 1071 1072 # Figure out if we're done. We're done if the response type is l. 1073 done = response_type == "l" 1074 1075 # Decode binary data. 1076 content_raw = context.get("content_raw") 1077 if content_raw and len(content_raw) > 0: 1078 self.assertIsNotNone(content_raw) 1079 decoded_data += self.decode_gdbremote_binary(content_raw) 1080 return decoded_data 1081 1082 def add_interrupt_packets(self): 1083 self.test_sequence.add_log_lines([ 1084 # Send the intterupt. 1085 "read packet: {}".format(chr(3)), 1086 # And wait for the stop notification. 1087 {"direction": "send", 1088 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1089 "capture": {1: "stop_signo", 1090 2: "stop_key_val_text"}}, 1091 ], True) 1092 1093 def parse_interrupt_packets(self, context): 1094 self.assertIsNotNone(context.get("stop_signo")) 1095 self.assertIsNotNone(context.get("stop_key_val_text")) 1096 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1097 context["stop_key_val_text"])) 1098 1099 def add_QSaveRegisterState_packets(self, thread_id): 1100 if thread_id: 1101 # Use the thread suffix form. 1102 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1103 thread_id) 1104 else: 1105 request = "read packet: $QSaveRegisterState#00" 1106 1107 self.test_sequence.add_log_lines([request, 1108 {"direction": "send", 1109 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1110 "capture": {1: "save_response"}}, 1111 ], 1112 True) 1113 1114 def parse_QSaveRegisterState_response(self, context): 1115 self.assertIsNotNone(context) 1116 1117 save_response = context.get("save_response") 1118 self.assertIsNotNone(save_response) 1119 1120 if len(save_response) < 1 or save_response[0] == "E": 1121 # error received 1122 return (False, None) 1123 else: 1124 return (True, int(save_response)) 1125 1126 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1127 if thread_id: 1128 # Use the thread suffix form. 1129 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1130 save_id, thread_id) 1131 else: 1132 request = "read packet: $QRestoreRegisterState:{}#00".format( 1133 save_id) 1134 1135 self.test_sequence.add_log_lines([ 1136 request, 1137 "send packet: $OK#00" 1138 ], True) 1139 1140 def flip_all_bits_in_each_register_value( 1141 self, reg_infos, endian, thread_id=None): 1142 self.assertIsNotNone(reg_infos) 1143 1144 successful_writes = 0 1145 failed_writes = 0 1146 1147 for reg_info in reg_infos: 1148 # Use the lldb register index added to the reg info. We're not necessarily 1149 # working off a full set of register infos, so an inferred register 1150 # index could be wrong. 1151 reg_index = reg_info["lldb_register_index"] 1152 self.assertIsNotNone(reg_index) 1153 1154 reg_byte_size = int(reg_info["bitsize"]) // 8 1155 self.assertTrue(reg_byte_size > 0) 1156 1157 # Handle thread suffix. 1158 if thread_id: 1159 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1160 reg_index, thread_id) 1161 else: 1162 p_request = "read packet: $p{:x}#00".format(reg_index) 1163 1164 # Read the existing value. 1165 self.reset_test_sequence() 1166 self.test_sequence.add_log_lines([ 1167 p_request, 1168 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1169 ], True) 1170 context = self.expect_gdbremote_sequence() 1171 self.assertIsNotNone(context) 1172 1173 # Verify the response length. 1174 p_response = context.get("p_response") 1175 self.assertIsNotNone(p_response) 1176 initial_reg_value = unpack_register_hex_unsigned( 1177 endian, p_response) 1178 1179 # Flip the value by xoring with all 1s 1180 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1181 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1182 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1183 1184 # Handle thread suffix for P. 1185 if thread_id: 1186 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1187 reg_index, pack_register_hex( 1188 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1189 else: 1190 P_request = "read packet: $P{:x}={}#00".format( 1191 reg_index, pack_register_hex( 1192 endian, flipped_bits_int, byte_size=reg_byte_size)) 1193 1194 # Write the flipped value to the register. 1195 self.reset_test_sequence() 1196 self.test_sequence.add_log_lines([P_request, 1197 {"direction": "send", 1198 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1199 "capture": {1: "P_response"}}, 1200 ], 1201 True) 1202 context = self.expect_gdbremote_sequence() 1203 self.assertIsNotNone(context) 1204 1205 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1206 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1207 # all flipping perfectly. 1208 P_response = context.get("P_response") 1209 self.assertIsNotNone(P_response) 1210 if P_response == "OK": 1211 successful_writes += 1 1212 else: 1213 failed_writes += 1 1214 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1215 1216 # Read back the register value, ensure it matches the flipped 1217 # value. 1218 if P_response == "OK": 1219 self.reset_test_sequence() 1220 self.test_sequence.add_log_lines([ 1221 p_request, 1222 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1223 ], True) 1224 context = self.expect_gdbremote_sequence() 1225 self.assertIsNotNone(context) 1226 1227 verify_p_response_raw = context.get("p_response") 1228 self.assertIsNotNone(verify_p_response_raw) 1229 verify_bits = unpack_register_hex_unsigned( 1230 endian, verify_p_response_raw) 1231 1232 if verify_bits != flipped_bits_int: 1233 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1234 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1235 successful_writes -= 1 1236 failed_writes += 1 1237 1238 return (successful_writes, failed_writes) 1239 1240 def is_bit_flippable_register(self, reg_info): 1241 if not reg_info: 1242 return False 1243 if not "set" in reg_info: 1244 return False 1245 if reg_info["set"] != "General Purpose Registers": 1246 return False 1247 if ("container-regs" in reg_info) and ( 1248 len(reg_info["container-regs"]) > 0): 1249 # Don't try to bit flip registers contained in another register. 1250 return False 1251 if re.match("^.s$", reg_info["name"]): 1252 # This is a 2-letter register name that ends in "s", like a segment register. 1253 # Don't try to bit flip these. 1254 return False 1255 if re.match("^(c|)psr$", reg_info["name"]): 1256 # This is an ARM program status register; don't flip it. 1257 return False 1258 # Okay, this looks fine-enough. 1259 return True 1260 1261 def read_register_values(self, reg_infos, endian, thread_id=None): 1262 self.assertIsNotNone(reg_infos) 1263 values = {} 1264 1265 for reg_info in reg_infos: 1266 # We append a register index when load reg infos so we can work 1267 # with subsets. 1268 reg_index = reg_info.get("lldb_register_index") 1269 self.assertIsNotNone(reg_index) 1270 1271 # Handle thread suffix. 1272 if thread_id: 1273 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1274 reg_index, thread_id) 1275 else: 1276 p_request = "read packet: $p{:x}#00".format(reg_index) 1277 1278 # Read it with p. 1279 self.reset_test_sequence() 1280 self.test_sequence.add_log_lines([ 1281 p_request, 1282 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1283 ], True) 1284 context = self.expect_gdbremote_sequence() 1285 self.assertIsNotNone(context) 1286 1287 # Convert value from target endian to integral. 1288 p_response = context.get("p_response") 1289 self.assertIsNotNone(p_response) 1290 self.assertTrue(len(p_response) > 0) 1291 self.assertFalse(p_response[0] == "E") 1292 1293 values[reg_index] = unpack_register_hex_unsigned( 1294 endian, p_response) 1295 1296 return values 1297 1298 def add_vCont_query_packets(self): 1299 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1300 {"direction": "send", 1301 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1302 "capture": {2: "vCont_query_response"}}, 1303 ], 1304 True) 1305 1306 def parse_vCont_query_response(self, context): 1307 self.assertIsNotNone(context) 1308 vCont_query_response = context.get("vCont_query_response") 1309 1310 # Handle case of no vCont support at all - in which case the capture 1311 # group will be none or zero length. 1312 if not vCont_query_response or len(vCont_query_response) == 0: 1313 return {} 1314 1315 return {key: 1 for key in vCont_query_response.split( 1316 ";") if key and len(key) > 0} 1317 1318 def count_single_steps_until_true( 1319 self, 1320 thread_id, 1321 predicate, 1322 args, 1323 max_step_count=100, 1324 use_Hc_packet=True, 1325 step_instruction="s"): 1326 """Used by single step test that appears in a few different contexts.""" 1327 single_step_count = 0 1328 1329 while single_step_count < max_step_count: 1330 self.assertIsNotNone(thread_id) 1331 1332 # Build the packet for the single step instruction. We replace 1333 # {thread}, if present, with the thread_id. 1334 step_packet = "read packet: ${}#00".format( 1335 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1336 # print("\nstep_packet created: {}\n".format(step_packet)) 1337 1338 # Single step. 1339 self.reset_test_sequence() 1340 if use_Hc_packet: 1341 self.test_sequence.add_log_lines( 1342 [ # Set the continue thread. 1343 "read packet: $Hc{0:x}#00".format(thread_id), 1344 "send packet: $OK#00", 1345 ], True) 1346 self.test_sequence.add_log_lines([ 1347 # Single step. 1348 step_packet, 1349 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1350 # Expect a breakpoint stop report. 1351 {"direction": "send", 1352 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1353 "capture": {1: "stop_signo", 1354 2: "stop_thread_id"}}, 1355 ], True) 1356 context = self.expect_gdbremote_sequence() 1357 self.assertIsNotNone(context) 1358 self.assertIsNotNone(context.get("stop_signo")) 1359 self.assertEqual(int(context.get("stop_signo"), 16), 1360 lldbutil.get_signal_number('SIGTRAP')) 1361 1362 single_step_count += 1 1363 1364 # See if the predicate is true. If so, we're done. 1365 if predicate(args): 1366 return (True, single_step_count) 1367 1368 # The predicate didn't return true within the runaway step count. 1369 return (False, single_step_count) 1370 1371 def g_c1_c2_contents_are(self, args): 1372 """Used by single step test that appears in a few different contexts.""" 1373 g_c1_address = args["g_c1_address"] 1374 g_c2_address = args["g_c2_address"] 1375 expected_g_c1 = args["expected_g_c1"] 1376 expected_g_c2 = args["expected_g_c2"] 1377 1378 # Read g_c1 and g_c2 contents. 1379 self.reset_test_sequence() 1380 self.test_sequence.add_log_lines( 1381 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1382 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1383 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1384 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1385 True) 1386 1387 # Run the packet stream. 1388 context = self.expect_gdbremote_sequence() 1389 self.assertIsNotNone(context) 1390 1391 # Check if what we read from inferior memory is what we are expecting. 1392 self.assertIsNotNone(context.get("g_c1_contents")) 1393 self.assertIsNotNone(context.get("g_c2_contents")) 1394 1395 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1396 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1397 1398 def single_step_only_steps_one_instruction( 1399 self, use_Hc_packet=True, step_instruction="s"): 1400 """Used by single step test that appears in a few different contexts.""" 1401 # Start up the inferior. 1402 procs = self.prep_debug_monitor_and_inferior( 1403 inferior_args=[ 1404 "get-code-address-hex:swap_chars", 1405 "get-data-address-hex:g_c1", 1406 "get-data-address-hex:g_c2", 1407 "sleep:1", 1408 "call-function:swap_chars", 1409 "sleep:5"]) 1410 1411 # Run the process 1412 self.test_sequence.add_log_lines( 1413 [ # Start running after initial stop. 1414 "read packet: $c#63", 1415 # Match output line that prints the memory address of the function call entry point. 1416 # Note we require launch-only testing so we can get inferior otuput. 1417 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1418 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1419 # Now stop the inferior. 1420 "read packet: {}".format(chr(3)), 1421 # And wait for the stop notification. 1422 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1423 True) 1424 1425 # Run the packet stream. 1426 context = self.expect_gdbremote_sequence() 1427 self.assertIsNotNone(context) 1428 1429 # Grab the main thread id. 1430 self.assertIsNotNone(context.get("stop_thread_id")) 1431 main_thread_id = int(context.get("stop_thread_id"), 16) 1432 1433 # Grab the function address. 1434 self.assertIsNotNone(context.get("function_address")) 1435 function_address = int(context.get("function_address"), 16) 1436 1437 # Grab the data addresses. 1438 self.assertIsNotNone(context.get("g_c1_address")) 1439 g_c1_address = int(context.get("g_c1_address"), 16) 1440 1441 self.assertIsNotNone(context.get("g_c2_address")) 1442 g_c2_address = int(context.get("g_c2_address"), 16) 1443 1444 # Set a breakpoint at the given address. 1445 if self.getArchitecture().startswith("arm"): 1446 # TODO: Handle case when setting breakpoint in thumb code 1447 BREAKPOINT_KIND = 4 1448 else: 1449 BREAKPOINT_KIND = 1 1450 self.reset_test_sequence() 1451 self.add_set_breakpoint_packets( 1452 function_address, 1453 do_continue=True, 1454 breakpoint_kind=BREAKPOINT_KIND) 1455 context = self.expect_gdbremote_sequence() 1456 self.assertIsNotNone(context) 1457 1458 # Remove the breakpoint. 1459 self.reset_test_sequence() 1460 self.add_remove_breakpoint_packets( 1461 function_address, breakpoint_kind=BREAKPOINT_KIND) 1462 context = self.expect_gdbremote_sequence() 1463 self.assertIsNotNone(context) 1464 1465 # Verify g_c1 and g_c2 match expected initial state. 1466 args = {} 1467 args["g_c1_address"] = g_c1_address 1468 args["g_c2_address"] = g_c2_address 1469 args["expected_g_c1"] = "0" 1470 args["expected_g_c2"] = "1" 1471 1472 self.assertTrue(self.g_c1_c2_contents_are(args)) 1473 1474 # Verify we take only a small number of steps to hit the first state. 1475 # Might need to work through function entry prologue code. 1476 args["expected_g_c1"] = "1" 1477 args["expected_g_c2"] = "1" 1478 (state_reached, 1479 step_count) = self.count_single_steps_until_true(main_thread_id, 1480 self.g_c1_c2_contents_are, 1481 args, 1482 max_step_count=25, 1483 use_Hc_packet=use_Hc_packet, 1484 step_instruction=step_instruction) 1485 self.assertTrue(state_reached) 1486 1487 # Verify we hit the next state. 1488 args["expected_g_c1"] = "1" 1489 args["expected_g_c2"] = "0" 1490 (state_reached, 1491 step_count) = self.count_single_steps_until_true(main_thread_id, 1492 self.g_c1_c2_contents_are, 1493 args, 1494 max_step_count=5, 1495 use_Hc_packet=use_Hc_packet, 1496 step_instruction=step_instruction) 1497 self.assertTrue(state_reached) 1498 expected_step_count = 1 1499 arch = self.getArchitecture() 1500 1501 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1502 # of variable value 1503 if re.match("mips", arch): 1504 expected_step_count = 3 1505 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1506 # variable value 1507 if re.match("s390x", arch): 1508 expected_step_count = 2 1509 # ARM64 requires "4" instructions: 2 to compute the address (adrp, 1510 # add), one to materialize the constant (mov) and the store. Once 1511 # addresses and constants are materialized, only one instruction is 1512 # needed. 1513 if re.match("arm64", arch): 1514 before_materialization_step_count = 4 1515 after_matrialization_step_count = 1 1516 self.assertIn(step_count, [before_materialization_step_count, 1517 after_matrialization_step_count]) 1518 expected_step_count = after_matrialization_step_count 1519 else: 1520 self.assertEqual(step_count, expected_step_count) 1521 1522 # Verify we hit the next state. 1523 args["expected_g_c1"] = "0" 1524 args["expected_g_c2"] = "0" 1525 (state_reached, 1526 step_count) = self.count_single_steps_until_true(main_thread_id, 1527 self.g_c1_c2_contents_are, 1528 args, 1529 max_step_count=5, 1530 use_Hc_packet=use_Hc_packet, 1531 step_instruction=step_instruction) 1532 self.assertTrue(state_reached) 1533 self.assertEqual(step_count, expected_step_count) 1534 1535 # Verify we hit the next state. 1536 args["expected_g_c1"] = "0" 1537 args["expected_g_c2"] = "1" 1538 (state_reached, 1539 step_count) = self.count_single_steps_until_true(main_thread_id, 1540 self.g_c1_c2_contents_are, 1541 args, 1542 max_step_count=5, 1543 use_Hc_packet=use_Hc_packet, 1544 step_instruction=step_instruction) 1545 self.assertTrue(state_reached) 1546 self.assertEqual(step_count, expected_step_count) 1547 1548 def maybe_strict_output_regex(self, regex): 1549 return '.*' + regex + \ 1550 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1551 1552 def install_and_create_launch_args(self): 1553 exe_path = self.getBuildArtifact("a.out") 1554 if not lldb.remote_platform: 1555 return [exe_path] 1556 remote_path = lldbutil.append_to_process_working_directory(self, 1557 os.path.basename(exe_path)) 1558 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1559 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1560 remote_file_spec) 1561 if err.Fail(): 1562 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1563 (exe_path, remote_path, err)) 1564 return [remote_path] 1565