1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62@add_metaclass(GdbRemoteTestCaseFactory) 63class GdbRemoteTestCaseBase(Base): 64 65 # Default time out in seconds. The timeout is increased tenfold under Asan. 66 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 67 # Default sleep time in seconds. The sleep time is doubled under Asan. 68 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 69 70 _GDBREMOTE_KILL_PACKET = b"$k#6b" 71 72 # Start the inferior separately, attach to the inferior on the stub 73 # command line. 74 _STARTUP_ATTACH = "attach" 75 # Start the inferior separately, start the stub without attaching, allow 76 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 77 _STARTUP_ATTACH_MANUALLY = "attach_manually" 78 # Start the stub, and launch the inferior with an $A packet via the 79 # initial packet stream. 80 _STARTUP_LAUNCH = "launch" 81 82 # GDB Signal numbers that are not target-specific used for common 83 # exceptions 84 TARGET_EXC_BAD_ACCESS = 0x91 85 TARGET_EXC_BAD_INSTRUCTION = 0x92 86 TARGET_EXC_ARITHMETIC = 0x93 87 TARGET_EXC_EMULATION = 0x94 88 TARGET_EXC_SOFTWARE = 0x95 89 TARGET_EXC_BREAKPOINT = 0x96 90 91 _verbose_log_handler = None 92 _log_formatter = logging.Formatter( 93 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 94 95 def setUpBaseLogging(self): 96 self.logger = logging.getLogger(__name__) 97 98 if len(self.logger.handlers) > 0: 99 return # We have set up this handler already 100 101 self.logger.propagate = False 102 self.logger.setLevel(logging.DEBUG) 103 104 # log all warnings to stderr 105 handler = logging.StreamHandler() 106 handler.setLevel(logging.WARNING) 107 handler.setFormatter(self._log_formatter) 108 self.logger.addHandler(handler) 109 110 def isVerboseLoggingRequested(self): 111 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 112 # logged. 113 return any(("gdb-remote" in channel) 114 for channel in lldbtest_config.channels) 115 116 def getDebugServer(self): 117 method = getattr(self, self.testMethodName) 118 return getattr(method, "debug_server", None) 119 120 def setUp(self): 121 super(GdbRemoteTestCaseBase, self).setUp() 122 123 self.setUpBaseLogging() 124 self.debug_monitor_extra_args = [] 125 126 if self.isVerboseLoggingRequested(): 127 # If requested, full logs go to a log file 128 self._verbose_log_handler = logging.FileHandler( 129 self.getLogBasenameForCurrentTest() + "-host.log") 130 self._verbose_log_handler.setFormatter(self._log_formatter) 131 self._verbose_log_handler.setLevel(logging.DEBUG) 132 self.logger.addHandler(self._verbose_log_handler) 133 134 self.test_sequence = GdbRemoteTestSequence(self.logger) 135 self.set_inferior_startup_launch() 136 self.port = self.get_next_port() 137 self.stub_sends_two_stop_notifications_on_kill = False 138 if configuration.lldb_platform_url: 139 if configuration.lldb_platform_url.startswith('unix-'): 140 url_pattern = '(.+)://\[?(.+?)\]?/.*' 141 else: 142 url_pattern = '(.+)://(.+):\d+' 143 scheme, host = re.match( 144 url_pattern, configuration.lldb_platform_url).groups() 145 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 146 self.stub_device = host 147 self.stub_hostname = 'localhost' 148 else: 149 self.stub_device = None 150 self.stub_hostname = host 151 else: 152 self.stub_hostname = "localhost" 153 154 debug_server = self.getDebugServer() 155 if debug_server == "debugserver": 156 self._init_debugserver_test() 157 else: 158 self._init_llgs_test() 159 160 def tearDown(self): 161 self.logger.removeHandler(self._verbose_log_handler) 162 self._verbose_log_handler = None 163 TestBase.tearDown(self) 164 165 def getLocalServerLogFile(self): 166 return self.getLogBasenameForCurrentTest() + "-server.log" 167 168 def setUpServerLogging(self, is_llgs): 169 if len(lldbtest_config.channels) == 0: 170 return # No logging requested 171 172 if lldb.remote_platform: 173 log_file = lldbutil.join_remote_paths( 174 lldb.remote_platform.GetWorkingDirectory(), "server.log") 175 else: 176 log_file = self.getLocalServerLogFile() 177 178 if is_llgs: 179 self.debug_monitor_extra_args.append("--log-file=" + log_file) 180 self.debug_monitor_extra_args.append( 181 "--log-channels={}".format(":".join(lldbtest_config.channels))) 182 else: 183 self.debug_monitor_extra_args = [ 184 "--log-file=" + log_file, "--log-flags=0x800000"] 185 186 def get_next_port(self): 187 return 12000 + random.randint(0, 3999) 188 189 def reset_test_sequence(self): 190 self.test_sequence = GdbRemoteTestSequence(self.logger) 191 192 193 def _init_llgs_test(self): 194 reverse_connect = True 195 if lldb.remote_platform: 196 # Reverse connections may be tricky due to firewalls/NATs. 197 reverse_connect = False 198 199 # FIXME: This is extremely linux-oriented 200 201 # Grab the ppid from /proc/[shell pid]/stat 202 err, retcode, shell_stat = self.run_platform_command( 203 "cat /proc/$$/stat") 204 self.assertTrue( 205 err.Success() and retcode == 0, 206 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 207 (err.GetCString(), 208 retcode)) 209 210 # [pid] ([executable]) [state] [*ppid*] 211 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 212 err, retcode, ls_output = self.run_platform_command( 213 "ls -l /proc/%s/exe" % pid) 214 self.assertTrue( 215 err.Success() and retcode == 0, 216 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 217 (pid, 218 err.GetCString(), 219 retcode)) 220 exe = ls_output.split()[-1] 221 222 # If the binary has been deleted, the link name has " (deleted)" appended. 223 # Remove if it's there. 224 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 225 else: 226 self.debug_monitor_exe = get_lldb_server_exe() 227 228 self.debug_monitor_extra_args = ["gdbserver"] 229 self.setUpServerLogging(is_llgs=True) 230 231 self.reverse_connect = reverse_connect 232 233 def _init_debugserver_test(self): 234 self.debug_monitor_exe = get_debugserver_exe() 235 self.setUpServerLogging(is_llgs=False) 236 self.reverse_connect = True 237 238 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 239 # when the process truly dies. 240 self.stub_sends_two_stop_notifications_on_kill = True 241 242 def forward_adb_port(self, source, target, direction, device): 243 adb = ['adb'] + (['-s', device] if device else []) + [direction] 244 245 def remove_port_forward(): 246 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 247 248 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 249 self.addTearDownHook(remove_port_forward) 250 251 def _verify_socket(self, sock): 252 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 253 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 254 # the connect() will always be successful, but the connection will be immediately dropped 255 # if ADB could not connect on the remote side. This function tries to detect this 256 # situation, and report it as "connection refused" so that the upper layers attempt the 257 # connection again. 258 triple = self.dbg.GetSelectedPlatform().GetTriple() 259 if not re.match(".*-.*-.*-android", triple): 260 return # Not android. 261 can_read, _, _ = select.select([sock], [], [], 0.1) 262 if sock not in can_read: 263 return # Data is not available, but the connection is alive. 264 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 265 raise _ConnectionRefused() # Got EOF, connection dropped. 266 267 def create_socket(self): 268 try: 269 sock = socket.socket(family=socket.AF_INET) 270 except OSError as e: 271 if e.errno != errno.EAFNOSUPPORT: 272 raise 273 sock = socket.socket(family=socket.AF_INET6) 274 275 logger = self.logger 276 277 triple = self.dbg.GetSelectedPlatform().GetTriple() 278 if re.match(".*-.*-.*-android", triple): 279 self.forward_adb_port( 280 self.port, 281 self.port, 282 "forward", 283 self.stub_device) 284 285 logger.info( 286 "Connecting to debug monitor on %s:%d", 287 self.stub_hostname, 288 self.port) 289 connect_info = (self.stub_hostname, self.port) 290 try: 291 sock.connect(connect_info) 292 except socket.error as serr: 293 if serr.errno == errno.ECONNREFUSED: 294 raise _ConnectionRefused() 295 raise serr 296 297 def shutdown_socket(): 298 if sock: 299 try: 300 # send the kill packet so lldb-server shuts down gracefully 301 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 302 except: 303 logger.warning( 304 "failed to send kill packet to debug monitor: {}; ignoring".format( 305 sys.exc_info()[0])) 306 307 try: 308 sock.close() 309 except: 310 logger.warning( 311 "failed to close socket to debug monitor: {}; ignoring".format( 312 sys.exc_info()[0])) 313 314 self.addTearDownHook(shutdown_socket) 315 316 self._verify_socket(sock) 317 318 return sock 319 320 def set_inferior_startup_launch(self): 321 self._inferior_startup = self._STARTUP_LAUNCH 322 323 def set_inferior_startup_attach(self): 324 self._inferior_startup = self._STARTUP_ATTACH 325 326 def set_inferior_startup_attach_manually(self): 327 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 328 329 def get_debug_monitor_command_line_args(self, attach_pid=None): 330 commandline_args = self.debug_monitor_extra_args 331 if attach_pid: 332 commandline_args += ["--attach=%d" % attach_pid] 333 if self.reverse_connect: 334 commandline_args += ["--reverse-connect", self.connect_address] 335 else: 336 if lldb.remote_platform: 337 commandline_args += ["*:{}".format(self.port)] 338 else: 339 commandline_args += ["localhost:{}".format(self.port)] 340 341 return commandline_args 342 343 def get_target_byte_order(self): 344 inferior_exe_path = self.getBuildArtifact("a.out") 345 target = self.dbg.CreateTarget(inferior_exe_path) 346 return target.GetByteOrder() 347 348 def launch_debug_monitor(self, attach_pid=None, logfile=None): 349 if self.reverse_connect: 350 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 351 sock = socket.socket(family, type, proto) 352 sock.settimeout(self.DEFAULT_TIMEOUT) 353 354 sock.bind(addr) 355 sock.listen(1) 356 addr = sock.getsockname() 357 self.connect_address = "[{}]:{}".format(*addr) 358 359 360 # Create the command line. 361 commandline_args = self.get_debug_monitor_command_line_args( 362 attach_pid=attach_pid) 363 364 # Start the server. 365 server = self.spawnSubprocess( 366 self.debug_monitor_exe, 367 commandline_args, 368 install_remote=False) 369 self.assertIsNotNone(server) 370 371 if self.reverse_connect: 372 self.sock = sock.accept()[0] 373 self.sock.settimeout(self.DEFAULT_TIMEOUT) 374 375 return server 376 377 def connect_to_debug_monitor(self, attach_pid=None): 378 if self.reverse_connect: 379 # Create the stub. 380 server = self.launch_debug_monitor(attach_pid=attach_pid) 381 self.assertIsNotNone(server) 382 383 # Schedule debug monitor to be shut down during teardown. 384 logger = self.logger 385 386 self._server = Server(self.sock, server) 387 return server 388 389 # We're using a random port algorithm to try not to collide with other ports, 390 # and retry a max # times. 391 attempts = 0 392 MAX_ATTEMPTS = 20 393 394 while attempts < MAX_ATTEMPTS: 395 server = self.launch_debug_monitor(attach_pid=attach_pid) 396 397 # Schedule debug monitor to be shut down during teardown. 398 logger = self.logger 399 400 connect_attemps = 0 401 MAX_CONNECT_ATTEMPTS = 10 402 403 while connect_attemps < MAX_CONNECT_ATTEMPTS: 404 # Create a socket to talk to the server 405 try: 406 logger.info("Connect attempt %d", connect_attemps + 1) 407 self.sock = self.create_socket() 408 self._server = Server(self.sock, server) 409 return server 410 except _ConnectionRefused as serr: 411 # Ignore, and try again. 412 pass 413 time.sleep(0.5) 414 connect_attemps += 1 415 416 # We should close the server here to be safe. 417 server.terminate() 418 419 # Increment attempts. 420 print( 421 "connect to debug monitor on port %d failed, attempt #%d of %d" % 422 (self.port, attempts + 1, MAX_ATTEMPTS)) 423 attempts += 1 424 425 # And wait a random length of time before next attempt, to avoid 426 # collisions. 427 time.sleep(random.randint(1, 5)) 428 429 # Now grab a new port number. 430 self.port = self.get_next_port() 431 432 raise Exception( 433 "failed to create a socket to the launched debug monitor after %d tries" % 434 attempts) 435 436 def launch_process_for_attach( 437 self, 438 inferior_args=None, 439 sleep_seconds=3, 440 exe_path=None): 441 # We're going to start a child process that the debug monitor stub can later attach to. 442 # This process needs to be started so that it just hangs around for a while. We'll 443 # have it sleep. 444 if not exe_path: 445 exe_path = self.getBuildArtifact("a.out") 446 447 args = [] 448 if inferior_args: 449 args.extend(inferior_args) 450 if sleep_seconds: 451 args.append("sleep:%d" % sleep_seconds) 452 453 return self.spawnSubprocess(exe_path, args) 454 455 def prep_debug_monitor_and_inferior( 456 self, 457 inferior_args=None, 458 inferior_sleep_seconds=3, 459 inferior_exe_path=None, 460 inferior_env=None): 461 """Prep the debug monitor, the inferior, and the expected packet stream. 462 463 Handle the separate cases of using the debug monitor in attach-to-inferior mode 464 and in launch-inferior mode. 465 466 For attach-to-inferior mode, the inferior process is first started, then 467 the debug monitor is started in attach to pid mode (using --attach on the 468 stub command line), and the no-ack-mode setup is appended to the packet 469 stream. The packet stream is not yet executed, ready to have more expected 470 packet entries added to it. 471 472 For launch-inferior mode, the stub is first started, then no ack mode is 473 setup on the expected packet stream, then the verified launch packets are added 474 to the expected socket stream. The packet stream is not yet executed, ready 475 to have more expected packet entries added to it. 476 477 The return value is: 478 {inferior:<inferior>, server:<server>} 479 """ 480 inferior = None 481 attach_pid = None 482 483 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 484 # Launch the process that we'll use as the inferior. 485 inferior = self.launch_process_for_attach( 486 inferior_args=inferior_args, 487 sleep_seconds=inferior_sleep_seconds, 488 exe_path=inferior_exe_path) 489 self.assertIsNotNone(inferior) 490 self.assertTrue(inferior.pid > 0) 491 if self._inferior_startup == self._STARTUP_ATTACH: 492 # In this case, we want the stub to attach via the command 493 # line, so set the command line attach pid here. 494 attach_pid = inferior.pid 495 496 if self._inferior_startup == self._STARTUP_LAUNCH: 497 # Build launch args 498 if not inferior_exe_path: 499 inferior_exe_path = self.getBuildArtifact("a.out") 500 501 if lldb.remote_platform: 502 remote_path = lldbutil.append_to_process_working_directory(self, 503 os.path.basename(inferior_exe_path)) 504 remote_file_spec = lldb.SBFileSpec(remote_path, False) 505 err = lldb.remote_platform.Install(lldb.SBFileSpec( 506 inferior_exe_path, True), remote_file_spec) 507 if err.Fail(): 508 raise Exception( 509 "remote_platform.Install('%s', '%s') failed: %s" % 510 (inferior_exe_path, remote_path, err)) 511 inferior_exe_path = remote_path 512 513 launch_args = [inferior_exe_path] 514 if inferior_args: 515 launch_args.extend(inferior_args) 516 517 # Launch the debug monitor stub, attaching to the inferior. 518 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 519 self.assertIsNotNone(server) 520 521 self.do_handshake() 522 523 # Build the expected protocol stream 524 if inferior_env: 525 for name, value in inferior_env.items(): 526 self.add_set_environment_packets(name, value) 527 if self._inferior_startup == self._STARTUP_LAUNCH: 528 self.add_verified_launch_packets(launch_args) 529 530 return {"inferior": inferior, "server": server} 531 532 def do_handshake(self): 533 server = self._server 534 server.send_ack() 535 server.send_packet(b"QStartNoAckMode") 536 self.assertEqual(server.get_normal_packet(), b"+") 537 self.assertEqual(server.get_normal_packet(), b"OK") 538 server.send_ack() 539 540 def add_verified_launch_packets(self, launch_args): 541 self.test_sequence.add_log_lines( 542 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 543 "send packet: $OK#00", 544 "read packet: $qLaunchSuccess#a5", 545 "send packet: $OK#00"], 546 True) 547 548 def add_thread_suffix_request_packets(self): 549 self.test_sequence.add_log_lines( 550 ["read packet: $QThreadSuffixSupported#e4", 551 "send packet: $OK#00", 552 ], True) 553 554 def add_process_info_collection_packets(self): 555 self.test_sequence.add_log_lines( 556 ["read packet: $qProcessInfo#dc", 557 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 558 True) 559 560 def add_set_environment_packets(self, name, value): 561 self.test_sequence.add_log_lines( 562 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 563 "send packet: $OK#00", 564 ], True) 565 566 _KNOWN_PROCESS_INFO_KEYS = [ 567 "pid", 568 "parent-pid", 569 "real-uid", 570 "real-gid", 571 "effective-uid", 572 "effective-gid", 573 "cputype", 574 "cpusubtype", 575 "ostype", 576 "triple", 577 "vendor", 578 "endian", 579 "elf_abi", 580 "ptrsize" 581 ] 582 583 def parse_process_info_response(self, context): 584 # Ensure we have a process info response. 585 self.assertIsNotNone(context) 586 process_info_raw = context.get("process_info_raw") 587 self.assertIsNotNone(process_info_raw) 588 589 # Pull out key:value; pairs. 590 process_info_dict = { 591 match.group(1): match.group(2) for match in re.finditer( 592 r"([^:]+):([^;]+);", process_info_raw)} 593 594 # Validate keys are known. 595 for (key, val) in list(process_info_dict.items()): 596 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 597 self.assertIsNotNone(val) 598 599 return process_info_dict 600 601 def add_register_info_collection_packets(self): 602 self.test_sequence.add_log_lines( 603 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 604 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 605 "save_key": "reg_info_responses"}], 606 True) 607 608 def parse_register_info_packets(self, context): 609 """Return an array of register info dictionaries, one per register info.""" 610 reg_info_responses = context.get("reg_info_responses") 611 self.assertIsNotNone(reg_info_responses) 612 613 # Parse register infos. 614 return [parse_reg_info_response(reg_info_response) 615 for reg_info_response in reg_info_responses] 616 617 def expect_gdbremote_sequence(self): 618 return expect_lldb_gdbserver_replay( 619 self, 620 self._server, 621 self.test_sequence, 622 self.DEFAULT_TIMEOUT * len(self.test_sequence), 623 self.logger) 624 625 _KNOWN_REGINFO_KEYS = [ 626 "name", 627 "alt-name", 628 "bitsize", 629 "offset", 630 "encoding", 631 "format", 632 "set", 633 "gcc", 634 "ehframe", 635 "dwarf", 636 "generic", 637 "container-regs", 638 "invalidate-regs", 639 "dynamic_size_dwarf_expr_bytes", 640 "dynamic_size_dwarf_len" 641 ] 642 643 def assert_valid_reg_info(self, reg_info): 644 # Assert we know about all the reginfo keys parsed. 645 for key in reg_info: 646 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 647 648 # Check the bare-minimum expected set of register info keys. 649 self.assertTrue("name" in reg_info) 650 self.assertTrue("bitsize" in reg_info) 651 652 if not self.getArchitecture() == 'aarch64': 653 self.assertTrue("offset" in reg_info) 654 655 self.assertTrue("encoding" in reg_info) 656 self.assertTrue("format" in reg_info) 657 658 def find_pc_reg_info(self, reg_infos): 659 lldb_reg_index = 0 660 for reg_info in reg_infos: 661 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 662 return (lldb_reg_index, reg_info) 663 lldb_reg_index += 1 664 665 return (None, None) 666 667 def add_lldb_register_index(self, reg_infos): 668 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 669 670 We'll use this when we want to call packets like P/p with a register index but do so 671 on only a subset of the full register info set. 672 """ 673 self.assertIsNotNone(reg_infos) 674 675 reg_index = 0 676 for reg_info in reg_infos: 677 reg_info["lldb_register_index"] = reg_index 678 reg_index += 1 679 680 def add_query_memory_region_packets(self, address): 681 self.test_sequence.add_log_lines( 682 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 683 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 684 True) 685 686 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 687 self.assertIsNotNone(key_val_text) 688 kv_dict = {} 689 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 690 key = match.group(1) 691 val = match.group(2) 692 if key in kv_dict: 693 if allow_dupes: 694 if isinstance(kv_dict[key], list): 695 kv_dict[key].append(val) 696 else: 697 # Promote to list 698 kv_dict[key] = [kv_dict[key], val] 699 else: 700 self.fail( 701 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 702 key, val, key_val_text, kv_dict)) 703 else: 704 kv_dict[key] = val 705 return kv_dict 706 707 def parse_memory_region_packet(self, context): 708 # Ensure we have a context. 709 self.assertIsNotNone(context.get("memory_region_response")) 710 711 # Pull out key:value; pairs. 712 mem_region_dict = self.parse_key_val_dict( 713 context.get("memory_region_response")) 714 715 # Validate keys are known. 716 for (key, val) in list(mem_region_dict.items()): 717 self.assertIn(key, 718 ["start", 719 "size", 720 "permissions", 721 "flags", 722 "name", 723 "error", 724 "dirty-pages", 725 "type"]) 726 self.assertIsNotNone(val) 727 728 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 729 # Return the dictionary of key-value pairs for the memory region. 730 return mem_region_dict 731 732 def assert_address_within_memory_region( 733 self, test_address, mem_region_dict): 734 self.assertIsNotNone(mem_region_dict) 735 self.assertTrue("start" in mem_region_dict) 736 self.assertTrue("size" in mem_region_dict) 737 738 range_start = int(mem_region_dict["start"], 16) 739 range_size = int(mem_region_dict["size"], 16) 740 range_end = range_start + range_size 741 742 if test_address < range_start: 743 self.fail( 744 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 745 test_address, 746 range_start, 747 range_end, 748 range_size)) 749 elif test_address >= range_end: 750 self.fail( 751 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 752 test_address, 753 range_start, 754 range_end, 755 range_size)) 756 757 def add_threadinfo_collection_packets(self): 758 self.test_sequence.add_log_lines( 759 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 760 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 761 "save_key": "threadinfo_responses"}], 762 True) 763 764 def parse_threadinfo_packets(self, context): 765 """Return an array of thread ids (decimal ints), one per thread.""" 766 threadinfo_responses = context.get("threadinfo_responses") 767 self.assertIsNotNone(threadinfo_responses) 768 769 thread_ids = [] 770 for threadinfo_response in threadinfo_responses: 771 new_thread_infos = parse_threadinfo_response(threadinfo_response) 772 thread_ids.extend(new_thread_infos) 773 return thread_ids 774 775 def wait_for_thread_count(self, thread_count): 776 start_time = time.time() 777 timeout_time = start_time + self.DEFAULT_TIMEOUT 778 779 actual_thread_count = 0 780 while actual_thread_count < thread_count: 781 self.reset_test_sequence() 782 self.add_threadinfo_collection_packets() 783 784 context = self.expect_gdbremote_sequence() 785 self.assertIsNotNone(context) 786 787 threads = self.parse_threadinfo_packets(context) 788 self.assertIsNotNone(threads) 789 790 actual_thread_count = len(threads) 791 792 if time.time() > timeout_time: 793 raise Exception( 794 'timed out after {} seconds while waiting for threads: waiting for at least {} threads, found {}'.format( 795 self.DEFAULT_TIMEOUT, thread_count, actual_thread_count)) 796 797 return threads 798 799 def add_set_breakpoint_packets( 800 self, 801 address, 802 z_packet_type=0, 803 do_continue=True, 804 breakpoint_kind=1): 805 self.test_sequence.add_log_lines( 806 [ # Set the breakpoint. 807 "read packet: $Z{2},{0:x},{1}#00".format( 808 address, breakpoint_kind, z_packet_type), 809 # Verify the stub could set it. 810 "send packet: $OK#00", 811 ], True) 812 813 if (do_continue): 814 self.test_sequence.add_log_lines( 815 [ # Continue the inferior. 816 "read packet: $c#63", 817 # Expect a breakpoint stop report. 818 {"direction": "send", 819 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 820 "capture": {1: "stop_signo", 821 2: "stop_thread_id"}}, 822 ], True) 823 824 def add_remove_breakpoint_packets( 825 self, 826 address, 827 z_packet_type=0, 828 breakpoint_kind=1): 829 self.test_sequence.add_log_lines( 830 [ # Remove the breakpoint. 831 "read packet: $z{2},{0:x},{1}#00".format( 832 address, breakpoint_kind, z_packet_type), 833 # Verify the stub could unset it. 834 "send packet: $OK#00", 835 ], True) 836 837 def add_qSupported_packets(self, client_features=[]): 838 features = ''.join(';' + x for x in client_features) 839 self.test_sequence.add_log_lines( 840 ["read packet: $qSupported{}#00".format(features), 841 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 842 ], True) 843 844 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 845 "augmented-libraries-svr4-read", 846 "PacketSize", 847 "QStartNoAckMode", 848 "QThreadSuffixSupported", 849 "QListThreadsInStopReply", 850 "qXfer:auxv:read", 851 "qXfer:libraries:read", 852 "qXfer:libraries-svr4:read", 853 "qXfer:features:read", 854 "qXfer:siginfo:read", 855 "qEcho", 856 "QPassSignals", 857 "multiprocess", 858 "fork-events", 859 "vfork-events", 860 "memory-tagging", 861 "qSaveCore", 862 "native-signals", 863 ] 864 865 def parse_qSupported_response(self, context): 866 self.assertIsNotNone(context) 867 868 raw_response = context.get("qSupported_response") 869 self.assertIsNotNone(raw_response) 870 871 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 872 # +,-,? is stripped from the key and set as the value. 873 supported_dict = {} 874 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 875 key = match.group(1) 876 val = match.group(3) 877 878 # key=val: store as is 879 if val and len(val) > 0: 880 supported_dict[key] = val 881 else: 882 if len(key) < 2: 883 raise Exception( 884 "singular stub feature is too short: must be stub_feature{+,-,?}") 885 supported_type = key[-1] 886 key = key[:-1] 887 if not supported_type in ["+", "-", "?"]: 888 raise Exception( 889 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 890 supported_dict[key] = supported_type 891 # Ensure we know the supported element 892 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 893 raise Exception( 894 "unknown qSupported stub feature reported: %s" % 895 key) 896 897 return supported_dict 898 899 def run_process_then_stop(self, run_seconds=1): 900 # Tell the stub to continue. 901 self.test_sequence.add_log_lines( 902 ["read packet: $vCont;c#a8"], 903 True) 904 context = self.expect_gdbremote_sequence() 905 906 # Wait for run_seconds. 907 time.sleep(run_seconds) 908 909 # Send an interrupt, capture a T response. 910 self.reset_test_sequence() 911 self.test_sequence.add_log_lines( 912 ["read packet: {}".format(chr(3)), 913 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], 914 True) 915 context = self.expect_gdbremote_sequence() 916 self.assertIsNotNone(context) 917 self.assertIsNotNone(context.get("stop_result")) 918 919 return context 920 921 def continue_process_and_wait_for_stop(self): 922 self.test_sequence.add_log_lines( 923 [ 924 "read packet: $vCont;c#a8", 925 { 926 "direction": "send", 927 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 928 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 929 }, 930 ], 931 True, 932 ) 933 context = self.expect_gdbremote_sequence() 934 self.assertIsNotNone(context) 935 return self.parse_interrupt_packets(context) 936 937 def select_modifiable_register(self, reg_infos): 938 """Find a register that can be read/written freely.""" 939 PREFERRED_REGISTER_NAMES = set(["rax", ]) 940 941 # First check for the first register from the preferred register name 942 # set. 943 alternative_register_index = None 944 945 self.assertIsNotNone(reg_infos) 946 for reg_info in reg_infos: 947 if ("name" in reg_info) and ( 948 reg_info["name"] in PREFERRED_REGISTER_NAMES): 949 # We found a preferred register. Use it. 950 return reg_info["lldb_register_index"] 951 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 952 reg_info["generic"] == "arg1"): 953 # A frame pointer or first arg register will do as a 954 # register to modify temporarily. 955 alternative_register_index = reg_info["lldb_register_index"] 956 957 # We didn't find a preferred register. Return whatever alternative register 958 # we found, if any. 959 return alternative_register_index 960 961 def extract_registers_from_stop_notification(self, stop_key_vals_text): 962 self.assertIsNotNone(stop_key_vals_text) 963 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 964 965 registers = {} 966 for (key, val) in list(kv_dict.items()): 967 if re.match(r"^[0-9a-fA-F]+$", key): 968 registers[int(key, 16)] = val 969 return registers 970 971 def gather_register_infos(self): 972 self.reset_test_sequence() 973 self.add_register_info_collection_packets() 974 975 context = self.expect_gdbremote_sequence() 976 self.assertIsNotNone(context) 977 978 reg_infos = self.parse_register_info_packets(context) 979 self.assertIsNotNone(reg_infos) 980 self.add_lldb_register_index(reg_infos) 981 982 return reg_infos 983 984 def find_generic_register_with_name(self, reg_infos, generic_name): 985 self.assertIsNotNone(reg_infos) 986 for reg_info in reg_infos: 987 if ("generic" in reg_info) and ( 988 reg_info["generic"] == generic_name): 989 return reg_info 990 return None 991 992 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 993 self.assertIsNotNone(reg_infos) 994 for reg_info in reg_infos: 995 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 996 return reg_info 997 return None 998 999 def decode_gdbremote_binary(self, encoded_bytes): 1000 decoded_bytes = "" 1001 i = 0 1002 while i < len(encoded_bytes): 1003 if encoded_bytes[i] == "}": 1004 # Handle escaped char. 1005 self.assertTrue(i + 1 < len(encoded_bytes)) 1006 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1007 i += 2 1008 elif encoded_bytes[i] == "*": 1009 # Handle run length encoding. 1010 self.assertTrue(len(decoded_bytes) > 0) 1011 self.assertTrue(i + 1 < len(encoded_bytes)) 1012 repeat_count = ord(encoded_bytes[i + 1]) - 29 1013 decoded_bytes += decoded_bytes[-1] * repeat_count 1014 i += 2 1015 else: 1016 decoded_bytes += encoded_bytes[i] 1017 i += 1 1018 return decoded_bytes 1019 1020 def build_auxv_dict(self, endian, word_size, auxv_data): 1021 self.assertIsNotNone(endian) 1022 self.assertIsNotNone(word_size) 1023 self.assertIsNotNone(auxv_data) 1024 1025 auxv_dict = {} 1026 1027 # PowerPC64le's auxvec has a special key that must be ignored. 1028 # This special key may be used multiple times, resulting in 1029 # multiple key/value pairs with the same key, which would otherwise 1030 # break this test check for repeated keys. 1031 # 1032 # AT_IGNOREPPC = 22 1033 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1034 arch = self.getArchitecture() 1035 ignore_keys = None 1036 if arch in ignored_keys_for_arch: 1037 ignore_keys = ignored_keys_for_arch[arch] 1038 1039 while len(auxv_data) > 0: 1040 # Chop off key. 1041 raw_key = auxv_data[:word_size] 1042 auxv_data = auxv_data[word_size:] 1043 1044 # Chop of value. 1045 raw_value = auxv_data[:word_size] 1046 auxv_data = auxv_data[word_size:] 1047 1048 # Convert raw text from target endian. 1049 key = unpack_endian_binary_string(endian, raw_key) 1050 value = unpack_endian_binary_string(endian, raw_value) 1051 1052 if ignore_keys and key in ignore_keys: 1053 continue 1054 1055 # Handle ending entry. 1056 if key == 0: 1057 self.assertEqual(value, 0) 1058 return auxv_dict 1059 1060 # The key should not already be present. 1061 self.assertFalse(key in auxv_dict) 1062 auxv_dict[key] = value 1063 1064 self.fail( 1065 "should not reach here - implies required double zero entry not found") 1066 return auxv_dict 1067 1068 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1069 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1070 offset = 0 1071 done = False 1072 decoded_data = "" 1073 1074 while not done: 1075 # Grab the next iteration of data. 1076 self.reset_test_sequence() 1077 self.test_sequence.add_log_lines( 1078 [ 1079 "read packet: ${}{:x},{:x}:#00".format( 1080 command_prefix, 1081 offset, 1082 chunk_length), 1083 { 1084 "direction": "send", 1085 "regex": re.compile( 1086 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1087 re.MULTILINE | re.DOTALL), 1088 "capture": { 1089 1: "response_type", 1090 2: "content_raw"}}], 1091 True) 1092 1093 context = self.expect_gdbremote_sequence() 1094 self.assertIsNotNone(context) 1095 1096 response_type = context.get("response_type") 1097 self.assertIsNotNone(response_type) 1098 self.assertTrue(response_type in ["l", "m"]) 1099 1100 # Move offset along. 1101 offset += chunk_length 1102 1103 # Figure out if we're done. We're done if the response type is l. 1104 done = response_type == "l" 1105 1106 # Decode binary data. 1107 content_raw = context.get("content_raw") 1108 if content_raw and len(content_raw) > 0: 1109 self.assertIsNotNone(content_raw) 1110 decoded_data += self.decode_gdbremote_binary(content_raw) 1111 return decoded_data 1112 1113 def add_interrupt_packets(self): 1114 self.test_sequence.add_log_lines([ 1115 # Send the intterupt. 1116 "read packet: {}".format(chr(3)), 1117 # And wait for the stop notification. 1118 {"direction": "send", 1119 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1120 "capture": {1: "stop_signo", 1121 2: "stop_key_val_text"}}, 1122 ], True) 1123 1124 def parse_interrupt_packets(self, context): 1125 self.assertIsNotNone(context.get("stop_signo")) 1126 self.assertIsNotNone(context.get("stop_key_val_text")) 1127 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1128 context["stop_key_val_text"])) 1129 1130 def add_QSaveRegisterState_packets(self, thread_id): 1131 if thread_id: 1132 # Use the thread suffix form. 1133 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1134 thread_id) 1135 else: 1136 request = "read packet: $QSaveRegisterState#00" 1137 1138 self.test_sequence.add_log_lines([request, 1139 {"direction": "send", 1140 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1141 "capture": {1: "save_response"}}, 1142 ], 1143 True) 1144 1145 def parse_QSaveRegisterState_response(self, context): 1146 self.assertIsNotNone(context) 1147 1148 save_response = context.get("save_response") 1149 self.assertIsNotNone(save_response) 1150 1151 if len(save_response) < 1 or save_response[0] == "E": 1152 # error received 1153 return (False, None) 1154 else: 1155 return (True, int(save_response)) 1156 1157 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1158 if thread_id: 1159 # Use the thread suffix form. 1160 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1161 save_id, thread_id) 1162 else: 1163 request = "read packet: $QRestoreRegisterState:{}#00".format( 1164 save_id) 1165 1166 self.test_sequence.add_log_lines([ 1167 request, 1168 "send packet: $OK#00" 1169 ], True) 1170 1171 def flip_all_bits_in_each_register_value( 1172 self, reg_infos, endian, thread_id=None): 1173 self.assertIsNotNone(reg_infos) 1174 1175 successful_writes = 0 1176 failed_writes = 0 1177 1178 for reg_info in reg_infos: 1179 # Use the lldb register index added to the reg info. We're not necessarily 1180 # working off a full set of register infos, so an inferred register 1181 # index could be wrong. 1182 reg_index = reg_info["lldb_register_index"] 1183 self.assertIsNotNone(reg_index) 1184 1185 reg_byte_size = int(reg_info["bitsize"]) // 8 1186 self.assertTrue(reg_byte_size > 0) 1187 1188 # Handle thread suffix. 1189 if thread_id: 1190 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1191 reg_index, thread_id) 1192 else: 1193 p_request = "read packet: $p{:x}#00".format(reg_index) 1194 1195 # Read the existing value. 1196 self.reset_test_sequence() 1197 self.test_sequence.add_log_lines([ 1198 p_request, 1199 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1200 ], True) 1201 context = self.expect_gdbremote_sequence() 1202 self.assertIsNotNone(context) 1203 1204 # Verify the response length. 1205 p_response = context.get("p_response") 1206 self.assertIsNotNone(p_response) 1207 initial_reg_value = unpack_register_hex_unsigned( 1208 endian, p_response) 1209 1210 # Flip the value by xoring with all 1s 1211 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1212 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1213 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1214 1215 # Handle thread suffix for P. 1216 if thread_id: 1217 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1218 reg_index, pack_register_hex( 1219 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1220 else: 1221 P_request = "read packet: $P{:x}={}#00".format( 1222 reg_index, pack_register_hex( 1223 endian, flipped_bits_int, byte_size=reg_byte_size)) 1224 1225 # Write the flipped value to the register. 1226 self.reset_test_sequence() 1227 self.test_sequence.add_log_lines([P_request, 1228 {"direction": "send", 1229 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1230 "capture": {1: "P_response"}}, 1231 ], 1232 True) 1233 context = self.expect_gdbremote_sequence() 1234 self.assertIsNotNone(context) 1235 1236 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1237 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1238 # all flipping perfectly. 1239 P_response = context.get("P_response") 1240 self.assertIsNotNone(P_response) 1241 if P_response == "OK": 1242 successful_writes += 1 1243 else: 1244 failed_writes += 1 1245 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1246 1247 # Read back the register value, ensure it matches the flipped 1248 # value. 1249 if P_response == "OK": 1250 self.reset_test_sequence() 1251 self.test_sequence.add_log_lines([ 1252 p_request, 1253 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1254 ], True) 1255 context = self.expect_gdbremote_sequence() 1256 self.assertIsNotNone(context) 1257 1258 verify_p_response_raw = context.get("p_response") 1259 self.assertIsNotNone(verify_p_response_raw) 1260 verify_bits = unpack_register_hex_unsigned( 1261 endian, verify_p_response_raw) 1262 1263 if verify_bits != flipped_bits_int: 1264 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1265 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1266 successful_writes -= 1 1267 failed_writes += 1 1268 1269 return (successful_writes, failed_writes) 1270 1271 def is_bit_flippable_register(self, reg_info): 1272 if not reg_info: 1273 return False 1274 if not "set" in reg_info: 1275 return False 1276 if reg_info["set"] != "General Purpose Registers": 1277 return False 1278 if ("container-regs" in reg_info) and ( 1279 len(reg_info["container-regs"]) > 0): 1280 # Don't try to bit flip registers contained in another register. 1281 return False 1282 if re.match("^.s$", reg_info["name"]): 1283 # This is a 2-letter register name that ends in "s", like a segment register. 1284 # Don't try to bit flip these. 1285 return False 1286 if re.match("^(c|)psr$", reg_info["name"]): 1287 # This is an ARM program status register; don't flip it. 1288 return False 1289 # Okay, this looks fine-enough. 1290 return True 1291 1292 def read_register_values(self, reg_infos, endian, thread_id=None): 1293 self.assertIsNotNone(reg_infos) 1294 values = {} 1295 1296 for reg_info in reg_infos: 1297 # We append a register index when load reg infos so we can work 1298 # with subsets. 1299 reg_index = reg_info.get("lldb_register_index") 1300 self.assertIsNotNone(reg_index) 1301 1302 # Handle thread suffix. 1303 if thread_id: 1304 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1305 reg_index, thread_id) 1306 else: 1307 p_request = "read packet: $p{:x}#00".format(reg_index) 1308 1309 # Read it with p. 1310 self.reset_test_sequence() 1311 self.test_sequence.add_log_lines([ 1312 p_request, 1313 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1314 ], True) 1315 context = self.expect_gdbremote_sequence() 1316 self.assertIsNotNone(context) 1317 1318 # Convert value from target endian to integral. 1319 p_response = context.get("p_response") 1320 self.assertIsNotNone(p_response) 1321 self.assertTrue(len(p_response) > 0) 1322 self.assertFalse(p_response[0] == "E") 1323 1324 values[reg_index] = unpack_register_hex_unsigned( 1325 endian, p_response) 1326 1327 return values 1328 1329 def add_vCont_query_packets(self): 1330 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1331 {"direction": "send", 1332 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1333 "capture": {2: "vCont_query_response"}}, 1334 ], 1335 True) 1336 1337 def parse_vCont_query_response(self, context): 1338 self.assertIsNotNone(context) 1339 vCont_query_response = context.get("vCont_query_response") 1340 1341 # Handle case of no vCont support at all - in which case the capture 1342 # group will be none or zero length. 1343 if not vCont_query_response or len(vCont_query_response) == 0: 1344 return {} 1345 1346 return {key: 1 for key in vCont_query_response.split( 1347 ";") if key and len(key) > 0} 1348 1349 def count_single_steps_until_true( 1350 self, 1351 thread_id, 1352 predicate, 1353 args, 1354 max_step_count=100, 1355 use_Hc_packet=True, 1356 step_instruction="s"): 1357 """Used by single step test that appears in a few different contexts.""" 1358 single_step_count = 0 1359 1360 while single_step_count < max_step_count: 1361 self.assertIsNotNone(thread_id) 1362 1363 # Build the packet for the single step instruction. We replace 1364 # {thread}, if present, with the thread_id. 1365 step_packet = "read packet: ${}#00".format( 1366 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1367 # print("\nstep_packet created: {}\n".format(step_packet)) 1368 1369 # Single step. 1370 self.reset_test_sequence() 1371 if use_Hc_packet: 1372 self.test_sequence.add_log_lines( 1373 [ # Set the continue thread. 1374 "read packet: $Hc{0:x}#00".format(thread_id), 1375 "send packet: $OK#00", 1376 ], True) 1377 self.test_sequence.add_log_lines([ 1378 # Single step. 1379 step_packet, 1380 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1381 # Expect a breakpoint stop report. 1382 {"direction": "send", 1383 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1384 "capture": {1: "stop_signo", 1385 2: "stop_thread_id"}}, 1386 ], True) 1387 context = self.expect_gdbremote_sequence() 1388 self.assertIsNotNone(context) 1389 self.assertIsNotNone(context.get("stop_signo")) 1390 self.assertEqual(int(context.get("stop_signo"), 16), 1391 lldbutil.get_signal_number('SIGTRAP')) 1392 1393 single_step_count += 1 1394 1395 # See if the predicate is true. If so, we're done. 1396 if predicate(args): 1397 return (True, single_step_count) 1398 1399 # The predicate didn't return true within the runaway step count. 1400 return (False, single_step_count) 1401 1402 def g_c1_c2_contents_are(self, args): 1403 """Used by single step test that appears in a few different contexts.""" 1404 g_c1_address = args["g_c1_address"] 1405 g_c2_address = args["g_c2_address"] 1406 expected_g_c1 = args["expected_g_c1"] 1407 expected_g_c2 = args["expected_g_c2"] 1408 1409 # Read g_c1 and g_c2 contents. 1410 self.reset_test_sequence() 1411 self.test_sequence.add_log_lines( 1412 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1413 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1414 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1415 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1416 True) 1417 1418 # Run the packet stream. 1419 context = self.expect_gdbremote_sequence() 1420 self.assertIsNotNone(context) 1421 1422 # Check if what we read from inferior memory is what we are expecting. 1423 self.assertIsNotNone(context.get("g_c1_contents")) 1424 self.assertIsNotNone(context.get("g_c2_contents")) 1425 1426 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1427 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1428 1429 def single_step_only_steps_one_instruction( 1430 self, use_Hc_packet=True, step_instruction="s"): 1431 """Used by single step test that appears in a few different contexts.""" 1432 # Start up the inferior. 1433 procs = self.prep_debug_monitor_and_inferior( 1434 inferior_args=[ 1435 "get-code-address-hex:swap_chars", 1436 "get-data-address-hex:g_c1", 1437 "get-data-address-hex:g_c2", 1438 "sleep:1", 1439 "call-function:swap_chars", 1440 "sleep:5"]) 1441 1442 # Run the process 1443 self.test_sequence.add_log_lines( 1444 [ # Start running after initial stop. 1445 "read packet: $c#63", 1446 # Match output line that prints the memory address of the function call entry point. 1447 # Note we require launch-only testing so we can get inferior otuput. 1448 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1449 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1450 # Now stop the inferior. 1451 "read packet: {}".format(chr(3)), 1452 # And wait for the stop notification. 1453 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1454 True) 1455 1456 # Run the packet stream. 1457 context = self.expect_gdbremote_sequence() 1458 self.assertIsNotNone(context) 1459 1460 # Grab the main thread id. 1461 self.assertIsNotNone(context.get("stop_thread_id")) 1462 main_thread_id = int(context.get("stop_thread_id"), 16) 1463 1464 # Grab the function address. 1465 self.assertIsNotNone(context.get("function_address")) 1466 function_address = int(context.get("function_address"), 16) 1467 1468 # Grab the data addresses. 1469 self.assertIsNotNone(context.get("g_c1_address")) 1470 g_c1_address = int(context.get("g_c1_address"), 16) 1471 1472 self.assertIsNotNone(context.get("g_c2_address")) 1473 g_c2_address = int(context.get("g_c2_address"), 16) 1474 1475 # Set a breakpoint at the given address. 1476 if self.getArchitecture().startswith("arm"): 1477 # TODO: Handle case when setting breakpoint in thumb code 1478 BREAKPOINT_KIND = 4 1479 else: 1480 BREAKPOINT_KIND = 1 1481 self.reset_test_sequence() 1482 self.add_set_breakpoint_packets( 1483 function_address, 1484 do_continue=True, 1485 breakpoint_kind=BREAKPOINT_KIND) 1486 context = self.expect_gdbremote_sequence() 1487 self.assertIsNotNone(context) 1488 1489 # Remove the breakpoint. 1490 self.reset_test_sequence() 1491 self.add_remove_breakpoint_packets( 1492 function_address, breakpoint_kind=BREAKPOINT_KIND) 1493 context = self.expect_gdbremote_sequence() 1494 self.assertIsNotNone(context) 1495 1496 # Verify g_c1 and g_c2 match expected initial state. 1497 args = {} 1498 args["g_c1_address"] = g_c1_address 1499 args["g_c2_address"] = g_c2_address 1500 args["expected_g_c1"] = "0" 1501 args["expected_g_c2"] = "1" 1502 1503 self.assertTrue(self.g_c1_c2_contents_are(args)) 1504 1505 # Verify we take only a small number of steps to hit the first state. 1506 # Might need to work through function entry prologue code. 1507 args["expected_g_c1"] = "1" 1508 args["expected_g_c2"] = "1" 1509 (state_reached, 1510 step_count) = self.count_single_steps_until_true(main_thread_id, 1511 self.g_c1_c2_contents_are, 1512 args, 1513 max_step_count=25, 1514 use_Hc_packet=use_Hc_packet, 1515 step_instruction=step_instruction) 1516 self.assertTrue(state_reached) 1517 1518 # Verify we hit the next state. 1519 args["expected_g_c1"] = "1" 1520 args["expected_g_c2"] = "0" 1521 (state_reached, 1522 step_count) = self.count_single_steps_until_true(main_thread_id, 1523 self.g_c1_c2_contents_are, 1524 args, 1525 max_step_count=5, 1526 use_Hc_packet=use_Hc_packet, 1527 step_instruction=step_instruction) 1528 self.assertTrue(state_reached) 1529 expected_step_count = 1 1530 arch = self.getArchitecture() 1531 1532 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1533 # of variable value 1534 if re.match("mips", arch): 1535 expected_step_count = 3 1536 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1537 # variable value 1538 if re.match("s390x", arch): 1539 expected_step_count = 2 1540 # ARM64 requires "4" instructions: 2 to compute the address (adrp, add), 1541 # one to materialize the constant (mov) and the store 1542 if re.match("arm64", arch): 1543 expected_step_count = 4 1544 1545 self.assertEqual(step_count, expected_step_count) 1546 1547 # ARM64: Once addresses and constants are materialized, only one 1548 # instruction is needed. 1549 if re.match("arm64", arch): 1550 expected_step_count = 1 1551 1552 # Verify we hit the next state. 1553 args["expected_g_c1"] = "0" 1554 args["expected_g_c2"] = "0" 1555 (state_reached, 1556 step_count) = self.count_single_steps_until_true(main_thread_id, 1557 self.g_c1_c2_contents_are, 1558 args, 1559 max_step_count=5, 1560 use_Hc_packet=use_Hc_packet, 1561 step_instruction=step_instruction) 1562 self.assertTrue(state_reached) 1563 self.assertEqual(step_count, expected_step_count) 1564 1565 # Verify we hit the next state. 1566 args["expected_g_c1"] = "0" 1567 args["expected_g_c2"] = "1" 1568 (state_reached, 1569 step_count) = self.count_single_steps_until_true(main_thread_id, 1570 self.g_c1_c2_contents_are, 1571 args, 1572 max_step_count=5, 1573 use_Hc_packet=use_Hc_packet, 1574 step_instruction=step_instruction) 1575 self.assertTrue(state_reached) 1576 self.assertEqual(step_count, expected_step_count) 1577 1578 def maybe_strict_output_regex(self, regex): 1579 return '.*' + regex + \ 1580 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1581 1582 def install_and_create_launch_args(self): 1583 exe_path = self.getBuildArtifact("a.out") 1584 if not lldb.remote_platform: 1585 return [exe_path] 1586 remote_path = lldbutil.append_to_process_working_directory(self, 1587 os.path.basename(exe_path)) 1588 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1589 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1590 remote_file_spec) 1591 if err.Fail(): 1592 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1593 (exe_path, remote_path, err)) 1594 return [remote_path] 1595