1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62@add_metaclass(GdbRemoteTestCaseFactory) 63class GdbRemoteTestCaseBase(Base): 64 65 # Default time out in seconds. The timeout is increased tenfold under Asan. 66 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 67 # Default sleep time in seconds. The sleep time is doubled under Asan. 68 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 69 70 _GDBREMOTE_KILL_PACKET = b"$k#6b" 71 72 # Start the inferior separately, attach to the inferior on the stub 73 # command line. 
74 _STARTUP_ATTACH = "attach" 75 # Start the inferior separately, start the stub without attaching, allow 76 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 77 _STARTUP_ATTACH_MANUALLY = "attach_manually" 78 # Start the stub, and launch the inferior with an $A packet via the 79 # initial packet stream. 80 _STARTUP_LAUNCH = "launch" 81 82 # GDB Signal numbers that are not target-specific used for common 83 # exceptions 84 TARGET_EXC_BAD_ACCESS = 0x91 85 TARGET_EXC_BAD_INSTRUCTION = 0x92 86 TARGET_EXC_ARITHMETIC = 0x93 87 TARGET_EXC_EMULATION = 0x94 88 TARGET_EXC_SOFTWARE = 0x95 89 TARGET_EXC_BREAKPOINT = 0x96 90 91 _verbose_log_handler = None 92 _log_formatter = logging.Formatter( 93 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 94 95 def setUpBaseLogging(self): 96 self.logger = logging.getLogger(__name__) 97 98 if len(self.logger.handlers) > 0: 99 return # We have set up this handler already 100 101 self.logger.propagate = False 102 self.logger.setLevel(logging.DEBUG) 103 104 # log all warnings to stderr 105 handler = logging.StreamHandler() 106 handler.setLevel(logging.WARNING) 107 handler.setFormatter(self._log_formatter) 108 self.logger.addHandler(handler) 109 110 def isVerboseLoggingRequested(self): 111 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 112 # logged. 
113 return any(("gdb-remote" in channel) 114 for channel in lldbtest_config.channels) 115 116 def getDebugServer(self): 117 method = getattr(self, self.testMethodName) 118 return getattr(method, "debug_server", None) 119 120 def setUp(self): 121 super(GdbRemoteTestCaseBase, self).setUp() 122 123 self.setUpBaseLogging() 124 self.debug_monitor_extra_args = [] 125 126 if self.isVerboseLoggingRequested(): 127 # If requested, full logs go to a log file 128 self._verbose_log_handler = logging.FileHandler( 129 self.getLogBasenameForCurrentTest() + "-host.log") 130 self._verbose_log_handler.setFormatter(self._log_formatter) 131 self._verbose_log_handler.setLevel(logging.DEBUG) 132 self.logger.addHandler(self._verbose_log_handler) 133 134 self.test_sequence = GdbRemoteTestSequence(self.logger) 135 self.set_inferior_startup_launch() 136 self.port = self.get_next_port() 137 self.stub_sends_two_stop_notifications_on_kill = False 138 if configuration.lldb_platform_url: 139 if configuration.lldb_platform_url.startswith('unix-'): 140 url_pattern = '(.+)://\[?(.+?)\]?/.*' 141 else: 142 url_pattern = '(.+)://(.+):\d+' 143 scheme, host = re.match( 144 url_pattern, configuration.lldb_platform_url).groups() 145 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 146 self.stub_device = host 147 self.stub_hostname = 'localhost' 148 else: 149 self.stub_device = None 150 self.stub_hostname = host 151 else: 152 self.stub_hostname = "localhost" 153 154 debug_server = self.getDebugServer() 155 if debug_server == "debugserver": 156 self._init_debugserver_test() 157 else: 158 self._init_llgs_test() 159 160 def tearDown(self): 161 self.logger.removeHandler(self._verbose_log_handler) 162 self._verbose_log_handler = None 163 TestBase.tearDown(self) 164 165 def getLocalServerLogFile(self): 166 return self.getLogBasenameForCurrentTest() + "-server.log" 167 168 def setUpServerLogging(self, is_llgs): 169 if len(lldbtest_config.channels) == 0: 170 return # No logging 
requested 171 172 if lldb.remote_platform: 173 log_file = lldbutil.join_remote_paths( 174 lldb.remote_platform.GetWorkingDirectory(), "server.log") 175 else: 176 log_file = self.getLocalServerLogFile() 177 178 if is_llgs: 179 self.debug_monitor_extra_args.append("--log-file=" + log_file) 180 self.debug_monitor_extra_args.append( 181 "--log-channels={}".format(":".join(lldbtest_config.channels))) 182 else: 183 self.debug_monitor_extra_args = [ 184 "--log-file=" + log_file, "--log-flags=0x800000"] 185 186 def get_next_port(self): 187 return 12000 + random.randint(0, 3999) 188 189 def reset_test_sequence(self): 190 self.test_sequence = GdbRemoteTestSequence(self.logger) 191 192 193 def _init_llgs_test(self): 194 reverse_connect = True 195 if lldb.remote_platform: 196 # Reverse connections may be tricky due to firewalls/NATs. 197 reverse_connect = False 198 199 # FIXME: This is extremely linux-oriented 200 201 # Grab the ppid from /proc/[shell pid]/stat 202 err, retcode, shell_stat = self.run_platform_command( 203 "cat /proc/$$/stat") 204 self.assertTrue( 205 err.Success() and retcode == 0, 206 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 207 (err.GetCString(), 208 retcode)) 209 210 # [pid] ([executable]) [state] [*ppid*] 211 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 212 err, retcode, ls_output = self.run_platform_command( 213 "ls -l /proc/%s/exe" % pid) 214 self.assertTrue( 215 err.Success() and retcode == 0, 216 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 217 (pid, 218 err.GetCString(), 219 retcode)) 220 exe = ls_output.split()[-1] 221 222 # If the binary has been deleted, the link name has " (deleted)" appended. 223 # Remove if it's there. 
224 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 225 else: 226 self.debug_monitor_exe = get_lldb_server_exe() 227 228 self.debug_monitor_extra_args = ["gdbserver"] 229 self.setUpServerLogging(is_llgs=True) 230 231 self.reverse_connect = reverse_connect 232 233 def _init_debugserver_test(self): 234 self.debug_monitor_exe = get_debugserver_exe() 235 self.setUpServerLogging(is_llgs=False) 236 self.reverse_connect = True 237 238 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 239 # when the process truly dies. 240 self.stub_sends_two_stop_notifications_on_kill = True 241 242 def forward_adb_port(self, source, target, direction, device): 243 adb = ['adb'] + (['-s', device] if device else []) + [direction] 244 245 def remove_port_forward(): 246 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 247 248 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 249 self.addTearDownHook(remove_port_forward) 250 251 def _verify_socket(self, sock): 252 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 253 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 254 # the connect() will always be successful, but the connection will be immediately dropped 255 # if ADB could not connect on the remote side. This function tries to detect this 256 # situation, and report it as "connection refused" so that the upper layers attempt the 257 # connection again. 258 triple = self.dbg.GetSelectedPlatform().GetTriple() 259 if not re.match(".*-.*-.*-android", triple): 260 return # Not android. 261 can_read, _, _ = select.select([sock], [], [], 0.1) 262 if sock not in can_read: 263 return # Data is not available, but the connection is alive. 264 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 265 raise _ConnectionRefused() # Got EOF, connection dropped. 
266 267 def create_socket(self): 268 try: 269 sock = socket.socket(family=socket.AF_INET) 270 except OSError as e: 271 if e.errno != errno.EAFNOSUPPORT: 272 raise 273 sock = socket.socket(family=socket.AF_INET6) 274 275 logger = self.logger 276 277 triple = self.dbg.GetSelectedPlatform().GetTriple() 278 if re.match(".*-.*-.*-android", triple): 279 self.forward_adb_port( 280 self.port, 281 self.port, 282 "forward", 283 self.stub_device) 284 285 logger.info( 286 "Connecting to debug monitor on %s:%d", 287 self.stub_hostname, 288 self.port) 289 connect_info = (self.stub_hostname, self.port) 290 try: 291 sock.connect(connect_info) 292 except socket.error as serr: 293 if serr.errno == errno.ECONNREFUSED: 294 raise _ConnectionRefused() 295 raise serr 296 297 def shutdown_socket(): 298 if sock: 299 try: 300 # send the kill packet so lldb-server shuts down gracefully 301 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 302 except: 303 logger.warning( 304 "failed to send kill packet to debug monitor: {}; ignoring".format( 305 sys.exc_info()[0])) 306 307 try: 308 sock.close() 309 except: 310 logger.warning( 311 "failed to close socket to debug monitor: {}; ignoring".format( 312 sys.exc_info()[0])) 313 314 self.addTearDownHook(shutdown_socket) 315 316 self._verify_socket(sock) 317 318 return sock 319 320 def set_inferior_startup_launch(self): 321 self._inferior_startup = self._STARTUP_LAUNCH 322 323 def set_inferior_startup_attach(self): 324 self._inferior_startup = self._STARTUP_ATTACH 325 326 def set_inferior_startup_attach_manually(self): 327 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 328 329 def get_debug_monitor_command_line_args(self, attach_pid=None): 330 commandline_args = self.debug_monitor_extra_args 331 if attach_pid: 332 commandline_args += ["--attach=%d" % attach_pid] 333 if self.reverse_connect: 334 commandline_args += ["--reverse-connect", self.connect_address] 335 else: 336 if lldb.remote_platform: 337 commandline_args += 
["*:{}".format(self.port)] 338 else: 339 commandline_args += ["localhost:{}".format(self.port)] 340 341 return commandline_args 342 343 def get_target_byte_order(self): 344 inferior_exe_path = self.getBuildArtifact("a.out") 345 target = self.dbg.CreateTarget(inferior_exe_path) 346 return target.GetByteOrder() 347 348 def launch_debug_monitor(self, attach_pid=None, logfile=None): 349 if self.reverse_connect: 350 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 351 sock = socket.socket(family, type, proto) 352 sock.settimeout(self.DEFAULT_TIMEOUT) 353 354 sock.bind(addr) 355 sock.listen(1) 356 addr = sock.getsockname() 357 self.connect_address = "[{}]:{}".format(*addr) 358 359 360 # Create the command line. 361 commandline_args = self.get_debug_monitor_command_line_args( 362 attach_pid=attach_pid) 363 364 # Start the server. 365 server = self.spawnSubprocess( 366 self.debug_monitor_exe, 367 commandline_args, 368 install_remote=False) 369 self.assertIsNotNone(server) 370 371 if self.reverse_connect: 372 self.sock = sock.accept()[0] 373 self.sock.settimeout(self.DEFAULT_TIMEOUT) 374 375 return server 376 377 def connect_to_debug_monitor(self, attach_pid=None): 378 if self.reverse_connect: 379 # Create the stub. 380 server = self.launch_debug_monitor(attach_pid=attach_pid) 381 self.assertIsNotNone(server) 382 383 # Schedule debug monitor to be shut down during teardown. 384 logger = self.logger 385 386 self._server = Server(self.sock, server) 387 return server 388 389 # We're using a random port algorithm to try not to collide with other ports, 390 # and retry a max # times. 391 attempts = 0 392 MAX_ATTEMPTS = 20 393 394 while attempts < MAX_ATTEMPTS: 395 server = self.launch_debug_monitor(attach_pid=attach_pid) 396 397 # Schedule debug monitor to be shut down during teardown. 
398 logger = self.logger 399 400 connect_attemps = 0 401 MAX_CONNECT_ATTEMPTS = 10 402 403 while connect_attemps < MAX_CONNECT_ATTEMPTS: 404 # Create a socket to talk to the server 405 try: 406 logger.info("Connect attempt %d", connect_attemps + 1) 407 self.sock = self.create_socket() 408 self._server = Server(self.sock, server) 409 return server 410 except _ConnectionRefused as serr: 411 # Ignore, and try again. 412 pass 413 time.sleep(0.5) 414 connect_attemps += 1 415 416 # We should close the server here to be safe. 417 server.terminate() 418 419 # Increment attempts. 420 print( 421 "connect to debug monitor on port %d failed, attempt #%d of %d" % 422 (self.port, attempts + 1, MAX_ATTEMPTS)) 423 attempts += 1 424 425 # And wait a random length of time before next attempt, to avoid 426 # collisions. 427 time.sleep(random.randint(1, 5)) 428 429 # Now grab a new port number. 430 self.port = self.get_next_port() 431 432 raise Exception( 433 "failed to create a socket to the launched debug monitor after %d tries" % 434 attempts) 435 436 def launch_process_for_attach( 437 self, 438 inferior_args=None, 439 sleep_seconds=3, 440 exe_path=None): 441 # We're going to start a child process that the debug monitor stub can later attach to. 442 # This process needs to be started so that it just hangs around for a while. We'll 443 # have it sleep. 444 if not exe_path: 445 exe_path = self.getBuildArtifact("a.out") 446 447 args = [] 448 if inferior_args: 449 args.extend(inferior_args) 450 if sleep_seconds: 451 args.append("sleep:%d" % sleep_seconds) 452 453 return self.spawnSubprocess(exe_path, args) 454 455 def prep_debug_monitor_and_inferior( 456 self, 457 inferior_args=None, 458 inferior_sleep_seconds=3, 459 inferior_exe_path=None, 460 inferior_env=None): 461 """Prep the debug monitor, the inferior, and the expected packet stream. 462 463 Handle the separate cases of using the debug monitor in attach-to-inferior mode 464 and in launch-inferior mode. 
465 466 For attach-to-inferior mode, the inferior process is first started, then 467 the debug monitor is started in attach to pid mode (using --attach on the 468 stub command line), and the no-ack-mode setup is appended to the packet 469 stream. The packet stream is not yet executed, ready to have more expected 470 packet entries added to it. 471 472 For launch-inferior mode, the stub is first started, then no ack mode is 473 setup on the expected packet stream, then the verified launch packets are added 474 to the expected socket stream. The packet stream is not yet executed, ready 475 to have more expected packet entries added to it. 476 477 The return value is: 478 {inferior:<inferior>, server:<server>} 479 """ 480 inferior = None 481 attach_pid = None 482 483 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 484 # Launch the process that we'll use as the inferior. 485 inferior = self.launch_process_for_attach( 486 inferior_args=inferior_args, 487 sleep_seconds=inferior_sleep_seconds, 488 exe_path=inferior_exe_path) 489 self.assertIsNotNone(inferior) 490 self.assertTrue(inferior.pid > 0) 491 if self._inferior_startup == self._STARTUP_ATTACH: 492 # In this case, we want the stub to attach via the command 493 # line, so set the command line attach pid here. 
494 attach_pid = inferior.pid 495 496 if self._inferior_startup == self._STARTUP_LAUNCH: 497 # Build launch args 498 if not inferior_exe_path: 499 inferior_exe_path = self.getBuildArtifact("a.out") 500 501 if lldb.remote_platform: 502 remote_path = lldbutil.append_to_process_working_directory(self, 503 os.path.basename(inferior_exe_path)) 504 remote_file_spec = lldb.SBFileSpec(remote_path, False) 505 err = lldb.remote_platform.Install(lldb.SBFileSpec( 506 inferior_exe_path, True), remote_file_spec) 507 if err.Fail(): 508 raise Exception( 509 "remote_platform.Install('%s', '%s') failed: %s" % 510 (inferior_exe_path, remote_path, err)) 511 inferior_exe_path = remote_path 512 513 launch_args = [inferior_exe_path] 514 if inferior_args: 515 launch_args.extend(inferior_args) 516 517 # Launch the debug monitor stub, attaching to the inferior. 518 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 519 self.assertIsNotNone(server) 520 521 self.do_handshake() 522 523 # Build the expected protocol stream 524 if inferior_env: 525 for name, value in inferior_env.items(): 526 self.add_set_environment_packets(name, value) 527 if self._inferior_startup == self._STARTUP_LAUNCH: 528 self.add_verified_launch_packets(launch_args) 529 530 return {"inferior": inferior, "server": server} 531 532 def do_handshake(self): 533 server = self._server 534 server.send_ack() 535 server.send_packet(b"QStartNoAckMode") 536 self.assertEqual(server.get_normal_packet(), b"+") 537 self.assertEqual(server.get_normal_packet(), b"OK") 538 server.send_ack() 539 540 def add_verified_launch_packets(self, launch_args): 541 self.test_sequence.add_log_lines( 542 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 543 "send packet: $OK#00", 544 "read packet: $qLaunchSuccess#a5", 545 "send packet: $OK#00"], 546 True) 547 548 def add_thread_suffix_request_packets(self): 549 self.test_sequence.add_log_lines( 550 ["read packet: $QThreadSuffixSupported#e4", 551 "send packet: $OK#00", 552 ], True) 
553 554 def add_process_info_collection_packets(self): 555 self.test_sequence.add_log_lines( 556 ["read packet: $qProcessInfo#dc", 557 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 558 True) 559 560 def add_set_environment_packets(self, name, value): 561 self.test_sequence.add_log_lines( 562 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 563 "send packet: $OK#00", 564 ], True) 565 566 _KNOWN_PROCESS_INFO_KEYS = [ 567 "pid", 568 "parent-pid", 569 "real-uid", 570 "real-gid", 571 "effective-uid", 572 "effective-gid", 573 "cputype", 574 "cpusubtype", 575 "ostype", 576 "triple", 577 "vendor", 578 "endian", 579 "elf_abi", 580 "ptrsize" 581 ] 582 583 def parse_process_info_response(self, context): 584 # Ensure we have a process info response. 585 self.assertIsNotNone(context) 586 process_info_raw = context.get("process_info_raw") 587 self.assertIsNotNone(process_info_raw) 588 589 # Pull out key:value; pairs. 590 process_info_dict = { 591 match.group(1): match.group(2) for match in re.finditer( 592 r"([^:]+):([^;]+);", process_info_raw)} 593 594 # Validate keys are known. 595 for (key, val) in list(process_info_dict.items()): 596 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 597 self.assertIsNotNone(val) 598 599 return process_info_dict 600 601 def add_register_info_collection_packets(self): 602 self.test_sequence.add_log_lines( 603 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 604 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 605 "save_key": "reg_info_responses"}], 606 True) 607 608 def parse_register_info_packets(self, context): 609 """Return an array of register info dictionaries, one per register info.""" 610 reg_info_responses = context.get("reg_info_responses") 611 self.assertIsNotNone(reg_info_responses) 612 613 # Parse register infos. 
614 return [parse_reg_info_response(reg_info_response) 615 for reg_info_response in reg_info_responses] 616 617 def expect_gdbremote_sequence(self): 618 return expect_lldb_gdbserver_replay( 619 self, 620 self._server, 621 self.test_sequence, 622 self.DEFAULT_TIMEOUT * len(self.test_sequence), 623 self.logger) 624 625 _KNOWN_REGINFO_KEYS = [ 626 "name", 627 "alt-name", 628 "bitsize", 629 "offset", 630 "encoding", 631 "format", 632 "set", 633 "gcc", 634 "ehframe", 635 "dwarf", 636 "generic", 637 "container-regs", 638 "invalidate-regs", 639 "dynamic_size_dwarf_expr_bytes", 640 "dynamic_size_dwarf_len" 641 ] 642 643 def assert_valid_reg_info(self, reg_info): 644 # Assert we know about all the reginfo keys parsed. 645 for key in reg_info: 646 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 647 648 # Check the bare-minimum expected set of register info keys. 649 self.assertTrue("name" in reg_info) 650 self.assertTrue("bitsize" in reg_info) 651 652 if not self.getArchitecture() == 'aarch64': 653 self.assertTrue("offset" in reg_info) 654 655 self.assertTrue("encoding" in reg_info) 656 self.assertTrue("format" in reg_info) 657 658 def find_pc_reg_info(self, reg_infos): 659 lldb_reg_index = 0 660 for reg_info in reg_infos: 661 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 662 return (lldb_reg_index, reg_info) 663 lldb_reg_index += 1 664 665 return (None, None) 666 667 def add_lldb_register_index(self, reg_infos): 668 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 669 670 We'll use this when we want to call packets like P/p with a register index but do so 671 on only a subset of the full register info set. 
672 """ 673 self.assertIsNotNone(reg_infos) 674 675 reg_index = 0 676 for reg_info in reg_infos: 677 reg_info["lldb_register_index"] = reg_index 678 reg_index += 1 679 680 def add_query_memory_region_packets(self, address): 681 self.test_sequence.add_log_lines( 682 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 683 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 684 True) 685 686 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 687 self.assertIsNotNone(key_val_text) 688 kv_dict = {} 689 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 690 key = match.group(1) 691 val = match.group(2) 692 if key in kv_dict: 693 if allow_dupes: 694 if isinstance(kv_dict[key], list): 695 kv_dict[key].append(val) 696 else: 697 # Promote to list 698 kv_dict[key] = [kv_dict[key], val] 699 else: 700 self.fail( 701 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 702 key, val, key_val_text, kv_dict)) 703 else: 704 kv_dict[key] = val 705 return kv_dict 706 707 def parse_memory_region_packet(self, context): 708 # Ensure we have a context. 709 self.assertIsNotNone(context.get("memory_region_response")) 710 711 # Pull out key:value; pairs. 712 mem_region_dict = self.parse_key_val_dict( 713 context.get("memory_region_response")) 714 715 # Validate keys are known. 716 for (key, val) in list(mem_region_dict.items()): 717 self.assertIn(key, 718 ["start", 719 "size", 720 "permissions", 721 "flags", 722 "name", 723 "error", 724 "dirty-pages", 725 "type"]) 726 self.assertIsNotNone(val) 727 728 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 729 # Return the dictionary of key-value pairs for the memory region. 
730 return mem_region_dict 731 732 def assert_address_within_memory_region( 733 self, test_address, mem_region_dict): 734 self.assertIsNotNone(mem_region_dict) 735 self.assertTrue("start" in mem_region_dict) 736 self.assertTrue("size" in mem_region_dict) 737 738 range_start = int(mem_region_dict["start"], 16) 739 range_size = int(mem_region_dict["size"], 16) 740 range_end = range_start + range_size 741 742 if test_address < range_start: 743 self.fail( 744 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 745 test_address, 746 range_start, 747 range_end, 748 range_size)) 749 elif test_address >= range_end: 750 self.fail( 751 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 752 test_address, 753 range_start, 754 range_end, 755 range_size)) 756 757 def add_threadinfo_collection_packets(self): 758 self.test_sequence.add_log_lines( 759 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 760 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 761 "save_key": "threadinfo_responses"}], 762 True) 763 764 def parse_threadinfo_packets(self, context): 765 """Return an array of thread ids (decimal ints), one per thread.""" 766 threadinfo_responses = context.get("threadinfo_responses") 767 self.assertIsNotNone(threadinfo_responses) 768 769 thread_ids = [] 770 for threadinfo_response in threadinfo_responses: 771 new_thread_infos = parse_threadinfo_response(threadinfo_response) 772 thread_ids.extend(new_thread_infos) 773 return thread_ids 774 775 def launch_with_threads(self, thread_count): 776 procs = self.prep_debug_monitor_and_inferior( 777 inferior_args=["thread:new"]*(thread_count-1) + ["trap"]) 778 779 self.test_sequence.add_log_lines([ 780 "read packet: $c#00", 781 {"direction": "send", 782 "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$", 783 "capture": {1: "stop_signo", 2: "stop_reply_kv"}}], True) 784 self.add_threadinfo_collection_packets() 785 
context = self.expect_gdbremote_sequence() 786 threads = self.parse_threadinfo_packets(context) 787 self.assertGreaterEqual(len(threads), thread_count) 788 return context, threads 789 790 def add_set_breakpoint_packets( 791 self, 792 address, 793 z_packet_type=0, 794 do_continue=True, 795 breakpoint_kind=1): 796 self.test_sequence.add_log_lines( 797 [ # Set the breakpoint. 798 "read packet: $Z{2},{0:x},{1}#00".format( 799 address, breakpoint_kind, z_packet_type), 800 # Verify the stub could set it. 801 "send packet: $OK#00", 802 ], True) 803 804 if (do_continue): 805 self.test_sequence.add_log_lines( 806 [ # Continue the inferior. 807 "read packet: $c#63", 808 # Expect a breakpoint stop report. 809 {"direction": "send", 810 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 811 "capture": {1: "stop_signo", 812 2: "stop_thread_id"}}, 813 ], True) 814 815 def add_remove_breakpoint_packets( 816 self, 817 address, 818 z_packet_type=0, 819 breakpoint_kind=1): 820 self.test_sequence.add_log_lines( 821 [ # Remove the breakpoint. 822 "read packet: $z{2},{0:x},{1}#00".format( 823 address, breakpoint_kind, z_packet_type), 824 # Verify the stub could unset it. 
825 "send packet: $OK#00", 826 ], True) 827 828 def add_qSupported_packets(self, client_features=[]): 829 features = ''.join(';' + x for x in client_features) 830 self.test_sequence.add_log_lines( 831 ["read packet: $qSupported{}#00".format(features), 832 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 833 ], True) 834 835 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 836 "augmented-libraries-svr4-read", 837 "PacketSize", 838 "QStartNoAckMode", 839 "QThreadSuffixSupported", 840 "QListThreadsInStopReply", 841 "qXfer:auxv:read", 842 "qXfer:libraries:read", 843 "qXfer:libraries-svr4:read", 844 "qXfer:features:read", 845 "qXfer:siginfo:read", 846 "qEcho", 847 "QPassSignals", 848 "multiprocess", 849 "fork-events", 850 "vfork-events", 851 "memory-tagging", 852 "qSaveCore", 853 "native-signals", 854 "QNonStop", 855 ] 856 857 def parse_qSupported_response(self, context): 858 self.assertIsNotNone(context) 859 860 raw_response = context.get("qSupported_response") 861 self.assertIsNotNone(raw_response) 862 863 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 864 # +,-,? is stripped from the key and set as the value. 
865 supported_dict = {} 866 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 867 key = match.group(1) 868 val = match.group(3) 869 870 # key=val: store as is 871 if val and len(val) > 0: 872 supported_dict[key] = val 873 else: 874 if len(key) < 2: 875 raise Exception( 876 "singular stub feature is too short: must be stub_feature{+,-,?}") 877 supported_type = key[-1] 878 key = key[:-1] 879 if not supported_type in ["+", "-", "?"]: 880 raise Exception( 881 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 882 supported_dict[key] = supported_type 883 # Ensure we know the supported element 884 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 885 raise Exception( 886 "unknown qSupported stub feature reported: %s" % 887 key) 888 889 return supported_dict 890 891 def continue_process_and_wait_for_stop(self): 892 self.test_sequence.add_log_lines( 893 [ 894 "read packet: $vCont;c#a8", 895 { 896 "direction": "send", 897 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 898 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 899 }, 900 ], 901 True, 902 ) 903 context = self.expect_gdbremote_sequence() 904 self.assertIsNotNone(context) 905 return self.parse_interrupt_packets(context) 906 907 def select_modifiable_register(self, reg_infos): 908 """Find a register that can be read/written freely.""" 909 PREFERRED_REGISTER_NAMES = set(["rax", ]) 910 911 # First check for the first register from the preferred register name 912 # set. 913 alternative_register_index = None 914 915 self.assertIsNotNone(reg_infos) 916 for reg_info in reg_infos: 917 if ("name" in reg_info) and ( 918 reg_info["name"] in PREFERRED_REGISTER_NAMES): 919 # We found a preferred register. Use it. 920 return reg_info["lldb_register_index"] 921 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 922 reg_info["generic"] == "arg1"): 923 # A frame pointer or first arg register will do as a 924 # register to modify temporarily. 
925 alternative_register_index = reg_info["lldb_register_index"] 926 927 # We didn't find a preferred register. Return whatever alternative register 928 # we found, if any. 929 return alternative_register_index 930 931 def extract_registers_from_stop_notification(self, stop_key_vals_text): 932 self.assertIsNotNone(stop_key_vals_text) 933 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 934 935 registers = {} 936 for (key, val) in list(kv_dict.items()): 937 if re.match(r"^[0-9a-fA-F]+$", key): 938 registers[int(key, 16)] = val 939 return registers 940 941 def gather_register_infos(self): 942 self.reset_test_sequence() 943 self.add_register_info_collection_packets() 944 945 context = self.expect_gdbremote_sequence() 946 self.assertIsNotNone(context) 947 948 reg_infos = self.parse_register_info_packets(context) 949 self.assertIsNotNone(reg_infos) 950 self.add_lldb_register_index(reg_infos) 951 952 return reg_infos 953 954 def find_generic_register_with_name(self, reg_infos, generic_name): 955 self.assertIsNotNone(reg_infos) 956 for reg_info in reg_infos: 957 if ("generic" in reg_info) and ( 958 reg_info["generic"] == generic_name): 959 return reg_info 960 return None 961 962 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 963 self.assertIsNotNone(reg_infos) 964 for reg_info in reg_infos: 965 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 966 return reg_info 967 return None 968 969 def decode_gdbremote_binary(self, encoded_bytes): 970 decoded_bytes = "" 971 i = 0 972 while i < len(encoded_bytes): 973 if encoded_bytes[i] == "}": 974 # Handle escaped char. 975 self.assertTrue(i + 1 < len(encoded_bytes)) 976 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 977 i += 2 978 elif encoded_bytes[i] == "*": 979 # Handle run length encoding. 
980 self.assertTrue(len(decoded_bytes) > 0) 981 self.assertTrue(i + 1 < len(encoded_bytes)) 982 repeat_count = ord(encoded_bytes[i + 1]) - 29 983 decoded_bytes += decoded_bytes[-1] * repeat_count 984 i += 2 985 else: 986 decoded_bytes += encoded_bytes[i] 987 i += 1 988 return decoded_bytes 989 990 def build_auxv_dict(self, endian, word_size, auxv_data): 991 self.assertIsNotNone(endian) 992 self.assertIsNotNone(word_size) 993 self.assertIsNotNone(auxv_data) 994 995 auxv_dict = {} 996 997 # PowerPC64le's auxvec has a special key that must be ignored. 998 # This special key may be used multiple times, resulting in 999 # multiple key/value pairs with the same key, which would otherwise 1000 # break this test check for repeated keys. 1001 # 1002 # AT_IGNOREPPC = 22 1003 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1004 arch = self.getArchitecture() 1005 ignore_keys = None 1006 if arch in ignored_keys_for_arch: 1007 ignore_keys = ignored_keys_for_arch[arch] 1008 1009 while len(auxv_data) > 0: 1010 # Chop off key. 1011 raw_key = auxv_data[:word_size] 1012 auxv_data = auxv_data[word_size:] 1013 1014 # Chop of value. 1015 raw_value = auxv_data[:word_size] 1016 auxv_data = auxv_data[word_size:] 1017 1018 # Convert raw text from target endian. 1019 key = unpack_endian_binary_string(endian, raw_key) 1020 value = unpack_endian_binary_string(endian, raw_value) 1021 1022 if ignore_keys and key in ignore_keys: 1023 continue 1024 1025 # Handle ending entry. 1026 if key == 0: 1027 self.assertEqual(value, 0) 1028 return auxv_dict 1029 1030 # The key should not already be present. 
def read_binary_data_in_chunks(self, command_prefix, chunk_length):
    """Read a binary object from the stub one chunk at a time.

    Repeatedly sends "{command_prefix}{offset:x},{chunk_length:x}:" and
    requires every reply to start with 'm' or 'l'.  Any non-empty payload is
    run through self.decode_gdbremote_binary() and appended to the result.
    The request offset advances by chunk_length after each reply, and an
    'l' reply ends the loop.

    Returns:
        The concatenation of all decoded payloads.
    """
    # Anything that does not start with 'E' is accepted here; the response
    # type is validated after the exchange.
    reply_pattern = re.compile(
        r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
        re.MULTILINE | re.DOTALL)

    pieces = []
    offset = 0
    while True:
        # Request the next chunk.
        request = "read packet: ${}{:x},{:x}:#00".format(
            command_prefix, offset, chunk_length)
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [request,
             {"direction": "send",
              "regex": reply_pattern,
              "capture": {1: "response_type", 2: "content_raw"}}],
            True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        kind = context.get("response_type")
        self.assertIsNotNone(kind)
        self.assertTrue(kind in ["l", "m"])

        # The next request starts one chunk further on.
        offset += chunk_length

        # Decode whatever payload accompanied this reply.
        payload = context.get("content_raw")
        if payload and len(payload) > 0:
            self.assertIsNotNone(payload)
            pieces.append(self.decode_gdbremote_binary(payload))

        # 'l' marks the last chunk.
        if kind == "l":
            break

    return "".join(pieces)
def add_QSaveRegisterState_packets(self, thread_id=None):
    """Queue a QSaveRegisterState request and capture the stub's reply.

    Args:
        thread_id: optional integer thread id.  When truthy, the request
            uses the ";thread:<hex>" suffix form; otherwise the plain
            packet is sent.  Defaults to None so the signature parallels
            add_QRestoreRegisterState_packets.

    The reply body (everything between '$' and '#') is captured under the
    "save_response" key of the context returned by the packet exchange.
    """
    if thread_id:
        # Use the thread suffix form.
        request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
            thread_id)
    else:
        request = "read packet: $QSaveRegisterState#00"

    self.test_sequence.add_log_lines(
        [request,
         {"direction": "send",
          "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
          "capture": {1: "save_response"}},
         ],
        True)
We're not necessarily 1150 # working off a full set of register infos, so an inferred register 1151 # index could be wrong. 1152 reg_index = reg_info["lldb_register_index"] 1153 self.assertIsNotNone(reg_index) 1154 1155 reg_byte_size = int(reg_info["bitsize"]) // 8 1156 self.assertTrue(reg_byte_size > 0) 1157 1158 # Handle thread suffix. 1159 if thread_id: 1160 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1161 reg_index, thread_id) 1162 else: 1163 p_request = "read packet: $p{:x}#00".format(reg_index) 1164 1165 # Read the existing value. 1166 self.reset_test_sequence() 1167 self.test_sequence.add_log_lines([ 1168 p_request, 1169 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1170 ], True) 1171 context = self.expect_gdbremote_sequence() 1172 self.assertIsNotNone(context) 1173 1174 # Verify the response length. 1175 p_response = context.get("p_response") 1176 self.assertIsNotNone(p_response) 1177 initial_reg_value = unpack_register_hex_unsigned( 1178 endian, p_response) 1179 1180 # Flip the value by xoring with all 1s 1181 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1182 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1183 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1184 1185 # Handle thread suffix for P. 1186 if thread_id: 1187 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1188 reg_index, pack_register_hex( 1189 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1190 else: 1191 P_request = "read packet: $P{:x}={}#00".format( 1192 reg_index, pack_register_hex( 1193 endian, flipped_bits_int, byte_size=reg_byte_size)) 1194 1195 # Write the flipped value to the register. 
1196 self.reset_test_sequence() 1197 self.test_sequence.add_log_lines([P_request, 1198 {"direction": "send", 1199 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1200 "capture": {1: "P_response"}}, 1201 ], 1202 True) 1203 context = self.expect_gdbremote_sequence() 1204 self.assertIsNotNone(context) 1205 1206 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1207 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1208 # all flipping perfectly. 1209 P_response = context.get("P_response") 1210 self.assertIsNotNone(P_response) 1211 if P_response == "OK": 1212 successful_writes += 1 1213 else: 1214 failed_writes += 1 1215 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1216 1217 # Read back the register value, ensure it matches the flipped 1218 # value. 1219 if P_response == "OK": 1220 self.reset_test_sequence() 1221 self.test_sequence.add_log_lines([ 1222 p_request, 1223 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1224 ], True) 1225 context = self.expect_gdbremote_sequence() 1226 self.assertIsNotNone(context) 1227 1228 verify_p_response_raw = context.get("p_response") 1229 self.assertIsNotNone(verify_p_response_raw) 1230 verify_bits = unpack_register_hex_unsigned( 1231 endian, verify_p_response_raw) 1232 1233 if verify_bits != flipped_bits_int: 1234 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 
def is_bit_flippable_register(self, reg_info):
    """Decide whether a register is a candidate for the bit-flip test.

    Returns True only when reg_info is non-empty, belongs to the
    "General Purpose Registers" set, lists no container registers, and its
    name is neither a two-character name ending in "s" nor "psr"/"cpsr".
    """
    if not reg_info:
        return False

    # Only general purpose registers qualify; a missing "set" key fails too.
    if reg_info.get("set") != "General Purpose Registers":
        return False

    # Don't try to bit flip registers contained in another register.
    if len(reg_info.get("container-regs", ())) > 0:
        return False

    # Reject 2-letter names ending in "s" (like a segment register) and the
    # ARM program status registers.
    excluded_name = re.match(r"^(?:.s|c?psr)$", reg_info["name"])
    return excluded_name is None
def parse_vCont_query_response(self, context):
    """Turn the captured vCont? reply into a dict of supported actions.

    Reads the "vCont_query_response" entry of context and splits it on ';'.
    Each non-empty piece becomes a key mapped to 1.  A missing or empty
    entry (no vCont support at all) yields an empty dict.
    """
    self.assertIsNotNone(context)

    response = context.get("vCont_query_response")
    if not response:
        # No vCont support: the capture group is None or zero length.
        return {}

    supported = {}
    for action in response.split(";"):
        if action:
            supported[action] = 1
    return supported
def g_c1_c2_contents_are(self, args):
    """Predicate used by the single step test in a few different contexts.

    Reads one byte at args["g_c1_address"] and one at args["g_c2_address"]
    with $m packets, then compares the unhexlified replies against
    args["expected_g_c1"] and args["expected_g_c2"].

    Returns:
        True when both bytes equal their expected values, False otherwise.
    """
    c1_address = args["g_c1_address"]
    c2_address = args["g_c2_address"]
    wanted_c1 = args["expected_g_c1"]
    wanted_c2 = args["expected_g_c2"]

    # Queue two single-byte memory reads, one per variable.
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        [
            "read packet: $m{0:x},{1:x}#00".format(c1_address, 1),
            {"direction": "send",
             "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
             "capture": {1: "g_c1_contents"}},
            "read packet: $m{0:x},{1:x}#00".format(c2_address, 1),
            {"direction": "send",
             "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
             "capture": {1: "g_c2_contents"}},
        ],
        True)

    # Run the packet stream.
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Both reads must have produced a payload.
    c1_hex = context.get("g_c1_contents")
    c2_hex = context.get("g_c2_contents")
    self.assertIsNotNone(c1_hex)
    self.assertIsNotNone(c2_hex)

    # Compare what was read from inferior memory with what is expected;
    # g_c2 is only decoded when g_c1 already matches.
    if not (seven.unhexlify(c1_hex) == wanted_c1):
        return False
    return seven.unhexlify(c2_hex) == wanted_c2
1439 self.assertIsNotNone(context.get("g_c1_address")) 1440 g_c1_address = int(context.get("g_c1_address"), 16) 1441 1442 self.assertIsNotNone(context.get("g_c2_address")) 1443 g_c2_address = int(context.get("g_c2_address"), 16) 1444 1445 # Set a breakpoint at the given address. 1446 if self.getArchitecture().startswith("arm"): 1447 # TODO: Handle case when setting breakpoint in thumb code 1448 BREAKPOINT_KIND = 4 1449 else: 1450 BREAKPOINT_KIND = 1 1451 self.reset_test_sequence() 1452 self.add_set_breakpoint_packets( 1453 function_address, 1454 do_continue=True, 1455 breakpoint_kind=BREAKPOINT_KIND) 1456 context = self.expect_gdbremote_sequence() 1457 self.assertIsNotNone(context) 1458 1459 # Remove the breakpoint. 1460 self.reset_test_sequence() 1461 self.add_remove_breakpoint_packets( 1462 function_address, breakpoint_kind=BREAKPOINT_KIND) 1463 context = self.expect_gdbremote_sequence() 1464 self.assertIsNotNone(context) 1465 1466 # Verify g_c1 and g_c2 match expected initial state. 1467 args = {} 1468 args["g_c1_address"] = g_c1_address 1469 args["g_c2_address"] = g_c2_address 1470 args["expected_g_c1"] = "0" 1471 args["expected_g_c2"] = "1" 1472 1473 self.assertTrue(self.g_c1_c2_contents_are(args)) 1474 1475 # Verify we take only a small number of steps to hit the first state. 1476 # Might need to work through function entry prologue code. 1477 args["expected_g_c1"] = "1" 1478 args["expected_g_c2"] = "1" 1479 (state_reached, 1480 step_count) = self.count_single_steps_until_true(main_thread_id, 1481 self.g_c1_c2_contents_are, 1482 args, 1483 max_step_count=25, 1484 use_Hc_packet=use_Hc_packet, 1485 step_instruction=step_instruction) 1486 self.assertTrue(state_reached) 1487 1488 # Verify we hit the next state. 
1489 args["expected_g_c1"] = "1" 1490 args["expected_g_c2"] = "0" 1491 (state_reached, 1492 step_count) = self.count_single_steps_until_true(main_thread_id, 1493 self.g_c1_c2_contents_are, 1494 args, 1495 max_step_count=5, 1496 use_Hc_packet=use_Hc_packet, 1497 step_instruction=step_instruction) 1498 self.assertTrue(state_reached) 1499 expected_step_count = 1 1500 arch = self.getArchitecture() 1501 1502 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1503 # of variable value 1504 if re.match("mips", arch): 1505 expected_step_count = 3 1506 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1507 # variable value 1508 if re.match("s390x", arch): 1509 expected_step_count = 2 1510 # ARM64 requires "4" instructions: 2 to compute the address (adrp, 1511 # add), one to materialize the constant (mov) and the store. Once 1512 # addresses and constants are materialized, only one instruction is 1513 # needed. 1514 if re.match("arm64", arch): 1515 before_materialization_step_count = 4 1516 after_matrialization_step_count = 1 1517 self.assertIn(step_count, [before_materialization_step_count, 1518 after_matrialization_step_count]) 1519 expected_step_count = after_matrialization_step_count 1520 else: 1521 self.assertEqual(step_count, expected_step_count) 1522 1523 # Verify we hit the next state. 1524 args["expected_g_c1"] = "0" 1525 args["expected_g_c2"] = "0" 1526 (state_reached, 1527 step_count) = self.count_single_steps_until_true(main_thread_id, 1528 self.g_c1_c2_contents_are, 1529 args, 1530 max_step_count=5, 1531 use_Hc_packet=use_Hc_packet, 1532 step_instruction=step_instruction) 1533 self.assertTrue(state_reached) 1534 self.assertEqual(step_count, expected_step_count) 1535 1536 # Verify we hit the next state. 
def maybe_strict_output_regex(self, regex):
    """Wrap an output regex loosely or strictly for the current platform.

    When lldbplatformutil.hasChattyStderr(self) is true, the pattern is
    surrounded by '.*' on both sides so extra output is tolerated.
    Otherwise it is anchored with '^' and '$'.
    """
    if lldbplatformutil.hasChattyStderr(self):
        return '.*' + regex + '.*'
    return '^' + regex + '$'