1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
class _ConnectionRefused(IOError):
    """Raised when the debug stub is not (yet) accepting connections.

    Covers both a literal ECONNREFUSED on connect() and the android/ADB
    case where a connection is accepted and then immediately dropped (see
    GdbRemoteTestCaseBase._verify_socket), so callers can treat both the
    same way and retry.
    """
    pass
28
29
class GdbRemoteTestCaseFactory(type):
    """Metaclass that expands each "test*" method into per-stub variants.

    Every attribute whose name starts with "test" is replaced by one
    generated method per debug server category ("debugserver" / "llgs"),
    named "<original>_<category>". Each generated method is tagged with the
    matching test category and carries a "debug_server" attribute, which is
    read back by GdbRemoteTestCaseBase.getDebugServer(). The original
    "test*" attribute itself is not kept in the new class.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            # Non-test attributes pass through unchanged.
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(
                getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # Bind attrvalue through a default argument so each
                # generated method calls its own original, not the loop's
                # last value (late-binding closure pitfall).
                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(
                cls, name, bases, newattrs)
61
@add_metaclass(GdbRemoteTestCaseFactory)
class GdbRemoteTestCaseBase(Base):

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)

    # Pre-framed gdb-remote "k" (kill) packet, checksum included.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose file log handler; installed in setUp() when the
    # "gdb-remote" channel is requested, removed again in tearDown().
    _verbose_log_handler = None
    # Formatter shared by the stderr handler and the verbose file handler.
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
94
95    def setUpBaseLogging(self):
96        self.logger = logging.getLogger(__name__)
97
98        if len(self.logger.handlers) > 0:
99            return  # We have set up this handler already
100
101        self.logger.propagate = False
102        self.logger.setLevel(logging.DEBUG)
103
104        # log all warnings to stderr
105        handler = logging.StreamHandler()
106        handler.setLevel(logging.WARNING)
107        handler.setFormatter(self._log_formatter)
108        self.logger.addHandler(handler)
109
110    def isVerboseLoggingRequested(self):
111        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
112        # logged.
113        return any(("gdb-remote" in channel)
114                   for channel in lldbtest_config.channels)
115
116    def getDebugServer(self):
117        method = getattr(self, self.testMethodName)
118        return getattr(method, "debug_server", None)
119
120    def setUp(self):
121        super(GdbRemoteTestCaseBase, self).setUp()
122
123        self.setUpBaseLogging()
124        self.debug_monitor_extra_args = []
125
126        if self.isVerboseLoggingRequested():
127            # If requested, full logs go to a log file
128            self._verbose_log_handler = logging.FileHandler(
129                self.getLogBasenameForCurrentTest() + "-host.log")
130            self._verbose_log_handler.setFormatter(self._log_formatter)
131            self._verbose_log_handler.setLevel(logging.DEBUG)
132            self.logger.addHandler(self._verbose_log_handler)
133
134        self.test_sequence = GdbRemoteTestSequence(self.logger)
135        self.set_inferior_startup_launch()
136        self.port = self.get_next_port()
137        self.stub_sends_two_stop_notifications_on_kill = False
138        if configuration.lldb_platform_url:
139            if configuration.lldb_platform_url.startswith('unix-'):
140                url_pattern = '(.+)://\[?(.+?)\]?/.*'
141            else:
142                url_pattern = '(.+)://(.+):\d+'
143            scheme, host = re.match(
144                url_pattern, configuration.lldb_platform_url).groups()
145            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
146                self.stub_device = host
147                self.stub_hostname = 'localhost'
148            else:
149                self.stub_device = None
150                self.stub_hostname = host
151        else:
152            self.stub_hostname = "localhost"
153
154        debug_server = self.getDebugServer()
155        if debug_server == "debugserver":
156            self._init_debugserver_test()
157        else:
158            self._init_llgs_test()
159
160    def tearDown(self):
161        self.logger.removeHandler(self._verbose_log_handler)
162        self._verbose_log_handler = None
163        TestBase.tearDown(self)
164
165    def getLocalServerLogFile(self):
166        return self.getLogBasenameForCurrentTest() + "-server.log"
167
168    def setUpServerLogging(self, is_llgs):
169        if len(lldbtest_config.channels) == 0:
170            return  # No logging requested
171
172        if lldb.remote_platform:
173            log_file = lldbutil.join_remote_paths(
174                lldb.remote_platform.GetWorkingDirectory(), "server.log")
175        else:
176            log_file = self.getLocalServerLogFile()
177
178        if is_llgs:
179            self.debug_monitor_extra_args.append("--log-file=" + log_file)
180            self.debug_monitor_extra_args.append(
181                "--log-channels={}".format(":".join(lldbtest_config.channels)))
182        else:
183            self.debug_monitor_extra_args = [
184                "--log-file=" + log_file, "--log-flags=0x800000"]
185
186    def get_next_port(self):
187        return 12000 + random.randint(0, 3999)
188
    def reset_test_sequence(self):
        # Discard any accumulated packet expectations and start a fresh
        # sequence bound to the current logger.
        self.test_sequence = GdbRemoteTestSequence(self.logger)
191
192
    def _init_llgs_test(self):
        """Configure this test to drive lldb-server (llgs) as the stub.

        Locally, the stub is the built lldb-server and a reverse connection
        is used. On a remote platform, the stub binary is located by
        resolving /proc/<ppid>/exe of the remote shell (presumably the
        platform server process — confirm against the remote setup), and
        reverse connections are disabled.
        """
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            # The symlink target is the last whitespace-separated token of
            # the "ls -l" output.
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
232
    def _init_debugserver_test(self):
        """Configure this test to drive the debugserver stub."""
        self.debug_monitor_exe = get_debugserver_exe()
        self.setUpServerLogging(is_llgs=False)
        self.reverse_connect = True

        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True
241
242    def forward_adb_port(self, source, target, direction, device):
243        adb = ['adb'] + (['-s', device] if device else []) + [direction]
244
245        def remove_port_forward():
246            subprocess.call(adb + ["--remove", "tcp:%d" % source])
247
248        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
249        self.addTearDownHook(remove_port_forward)
250
    def _verify_socket(self, sock):
        """Raise _ConnectionRefused for an ADB-forwarded connection that was
        accepted but immediately dropped on the remote side (android only).
        """
        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
        # the connect() will always be successful, but the connection will be immediately dropped
        # if ADB could not connect on the remote side. This function tries to detect this
        # situation, and report it as "connection refused" so that the upper layers attempt the
        # connection again.
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if not re.match(".*-.*-.*-android", triple):
            return  # Not android.
        # Peek for readability without blocking for more than 100ms.
        can_read, _, _ = select.select([sock], [], [], 0.1)
        if sock not in can_read:
            return  # Data is not available, but the connection is alive.
        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
            raise _ConnectionRefused()  # Got EOF, connection dropped.
266
    def create_socket(self):
        """Create and connect a socket to the debug monitor.

        Falls back to IPv6 when IPv4 is unsupported, sets up ADB port
        forwarding on android targets, and registers a teardown hook that
        sends the kill packet and closes the socket. Raises
        _ConnectionRefused when the stub is not accepting connections yet.
        Returns the connected socket.
        """
        try:
            sock = socket.socket(family=socket.AF_INET)
        except OSError as e:
            if e.errno != errno.EAFNOSUPPORT:
                raise
            # No IPv4 support on this host; try IPv6 instead.
            sock = socket.socket(family=socket.AF_INET6)

        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            # Route the connection through adb so it reaches the device.
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        # Best-effort cleanup: failures here are logged and ignored so a
        # dead stub cannot fail teardown.
        def shutdown_socket():
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        # On android, a "successful" connect may still be a dead forward;
        # detect that and report it as a refused connection.
        self._verify_socket(sock)

        return sock
319
    def set_inferior_startup_launch(self):
        # Inferior is launched by the stub via an $A packet (the mode
        # selected by default in setUp()).
        self._inferior_startup = self._STARTUP_LAUNCH
322
    def set_inferior_startup_attach(self):
        # Inferior is started separately; the stub attaches via --attach on
        # its command line.
        self._inferior_startup = self._STARTUP_ATTACH
325
    def set_inferior_startup_attach_manually(self):
        # Inferior is started separately; the test attaches itself (e.g.
        # $vAttach;pid) rather than via the stub command line.
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
328
329    def get_debug_monitor_command_line_args(self, attach_pid=None):
330        commandline_args = self.debug_monitor_extra_args
331        if attach_pid:
332            commandline_args += ["--attach=%d" % attach_pid]
333        if self.reverse_connect:
334            commandline_args += ["--reverse-connect", self.connect_address]
335        else:
336            if lldb.remote_platform:
337                commandline_args += ["*:{}".format(self.port)]
338            else:
339                commandline_args += ["localhost:{}".format(self.port)]
340
341        return commandline_args
342
343    def get_target_byte_order(self):
344        inferior_exe_path = self.getBuildArtifact("a.out")
345        target = self.dbg.CreateTarget(inferior_exe_path)
346        return target.GetByteOrder()
347
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        """Start the debug monitor (stub) process.

        In reverse-connect mode, first bind and listen on an OS-assigned
        localhost port and pass its address to the stub via
        --reverse-connect; after spawning the stub, accept its connection
        into self.sock. attach_pid is forwarded to the stub command line.
        logfile is currently unused. Returns the spawned server process.
        """
        if self.reverse_connect:
            # Port 0 asks the OS for any free port.
            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
            sock = socket.socket(family, type, proto)
            sock.settimeout(self.DEFAULT_TIMEOUT)

            sock.bind(addr)
            sock.listen(1)
            addr = sock.getsockname()
            # Bracketed "[host]:port" form works for IPv4 and IPv6 alike.
            self.connect_address = "[{}]:{}".format(*addr)


        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            commandline_args,
            install_remote=False)
        self.assertIsNotNone(server)

        if self.reverse_connect:
            # Block until the stub connects back to us.
            self.sock = sock.accept()[0]
            self.sock.settimeout(self.DEFAULT_TIMEOUT)

        return server
376
    def connect_to_debug_monitor(self, attach_pid=None):
        """Launch the stub and establish the gdb-remote connection.

        In reverse-connect mode the stub connects back to us, so a single
        launch suffices. Otherwise, retry across random ports: for each
        launched stub, attempt to connect several times before terminating
        it and trying a fresh port. Returns the server process object;
        raises if no connection could be established.
        """
        if self.reverse_connect:
            # Create the stub.
            server = self.launch_debug_monitor(attach_pid=attach_pid)
            self.assertIsNotNone(server)

            # Schedule debug monitor to be shut down during teardown.
            logger = self.logger

            # launch_debug_monitor() already accepted the stub's connection
            # into self.sock.
            self._server = Server(self.sock, server)
            return server

        # We're using a random port algorithm to try not to collide with other ports,
        # and retry a max # times.
        attempts = 0
        MAX_ATTEMPTS = 20

        while attempts < MAX_ATTEMPTS:
            server = self.launch_debug_monitor(attach_pid=attach_pid)

            # Schedule debug monitor to be shut down during teardown.
            logger = self.logger

            connect_attemps = 0
            MAX_CONNECT_ATTEMPTS = 10

            while connect_attemps < MAX_CONNECT_ATTEMPTS:
                # Create a socket to talk to the server
                try:
                    logger.info("Connect attempt %d", connect_attemps + 1)
                    self.sock = self.create_socket()
                    self._server = Server(self.sock, server)
                    return server
                except _ConnectionRefused as serr:
                    # Ignore, and try again.
                    pass
                time.sleep(0.5)
                connect_attemps += 1

            # We should close the server here to be safe.
            server.terminate()

            # Increment attempts.
            print(
                "connect to debug monitor on port %d failed, attempt #%d of %d" %
                (self.port, attempts + 1, MAX_ATTEMPTS))
            attempts += 1

            # And wait a random length of time before next attempt, to avoid
            # collisions.
            time.sleep(random.randint(1, 5))

            # Now grab a new port number.
            self.port = self.get_next_port()

        raise Exception(
            "failed to create a socket to the launched debug monitor after %d tries" %
            attempts)
435
436    def launch_process_for_attach(
437            self,
438            inferior_args=None,
439            sleep_seconds=3,
440            exe_path=None):
441        # We're going to start a child process that the debug monitor stub can later attach to.
442        # This process needs to be started so that it just hangs around for a while.  We'll
443        # have it sleep.
444        if not exe_path:
445            exe_path = self.getBuildArtifact("a.out")
446
447        args = []
448        if inferior_args:
449            args.extend(inferior_args)
450        if sleep_seconds:
451            args.append("sleep:%d" % sleep_seconds)
452
453        return self.spawnSubprocess(exe_path, args)
454
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        Args:
            inferior_args: extra command line arguments for the inferior.
            inferior_sleep_seconds: how long an attach-mode inferior sleeps.
            inferior_exe_path: inferior binary; defaults to the a.out artifact.
            inferior_env: dict of environment variables to set via
                QEnvironment packets (launch/attach-agnostic).

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Copy the binary to the remote's working directory first.
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        self.do_handshake()

        # Build the expected protocol stream
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
531
532    def do_handshake(self):
533        server = self._server
534        server.send_ack()
535        server.send_packet(b"QStartNoAckMode")
536        self.assertEqual(server.get_normal_packet(), b"+")
537        self.assertEqual(server.get_normal_packet(), b"OK")
538        server.send_ack()
539
540    def add_verified_launch_packets(self, launch_args):
541        self.test_sequence.add_log_lines(
542            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
543             "send packet: $OK#00",
544             "read packet: $qLaunchSuccess#a5",
545             "send packet: $OK#00"],
546            True)
547
548    def add_thread_suffix_request_packets(self):
549        self.test_sequence.add_log_lines(
550            ["read packet: $QThreadSuffixSupported#e4",
551             "send packet: $OK#00",
552             ], True)
553
554    def add_process_info_collection_packets(self):
555        self.test_sequence.add_log_lines(
556            ["read packet: $qProcessInfo#dc",
557             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
558            True)
559
560    def add_set_environment_packets(self, name, value):
561        self.test_sequence.add_log_lines(
562            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
563             "send packet: $OK#00",
564             ], True)
565
    # Keys a stub may legitimately report in a qProcessInfo response;
    # parse_process_info_response() rejects anything else.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
582
583    def parse_process_info_response(self, context):
584        # Ensure we have a process info response.
585        self.assertIsNotNone(context)
586        process_info_raw = context.get("process_info_raw")
587        self.assertIsNotNone(process_info_raw)
588
589        # Pull out key:value; pairs.
590        process_info_dict = {
591            match.group(1): match.group(2) for match in re.finditer(
592                r"([^:]+):([^;]+);", process_info_raw)}
593
594        # Validate keys are known.
595        for (key, val) in list(process_info_dict.items()):
596            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
597            self.assertIsNotNone(val)
598
599        return process_info_dict
600
601    def add_register_info_collection_packets(self):
602        self.test_sequence.add_log_lines(
603            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
604                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
605                "save_key": "reg_info_responses"}],
606            True)
607
608    def parse_register_info_packets(self, context):
609        """Return an array of register info dictionaries, one per register info."""
610        reg_info_responses = context.get("reg_info_responses")
611        self.assertIsNotNone(reg_info_responses)
612
613        # Parse register infos.
614        return [parse_reg_info_response(reg_info_response)
615                for reg_info_response in reg_info_responses]
616
    def expect_gdbremote_sequence(self):
        """Replay the accumulated packet sequence against the server.

        The timeout scales with the number of entries in the sequence.
        Returns the context of values captured by the sequence entries.
        """
        return expect_lldb_gdbserver_replay(
            self,
            self._server,
            self.test_sequence,
            self.DEFAULT_TIMEOUT * len(self.test_sequence),
            self.logger)
624
    # Keys a stub may legitimately report in a qRegisterInfo response;
    # assert_valid_reg_info() rejects anything else.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
642
643    def assert_valid_reg_info(self, reg_info):
644        # Assert we know about all the reginfo keys parsed.
645        for key in reg_info:
646            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
647
648        # Check the bare-minimum expected set of register info keys.
649        self.assertTrue("name" in reg_info)
650        self.assertTrue("bitsize" in reg_info)
651
652        if not self.getArchitecture() == 'aarch64':
653            self.assertTrue("offset" in reg_info)
654
655        self.assertTrue("encoding" in reg_info)
656        self.assertTrue("format" in reg_info)
657
658    def find_pc_reg_info(self, reg_infos):
659        lldb_reg_index = 0
660        for reg_info in reg_infos:
661            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
662                return (lldb_reg_index, reg_info)
663            lldb_reg_index += 1
664
665        return (None, None)
666
667    def add_lldb_register_index(self, reg_infos):
668        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
669
670        We'll use this when we want to call packets like P/p with a register index but do so
671        on only a subset of the full register info set.
672        """
673        self.assertIsNotNone(reg_infos)
674
675        reg_index = 0
676        for reg_info in reg_infos:
677            reg_info["lldb_register_index"] = reg_index
678            reg_index += 1
679
680    def add_query_memory_region_packets(self, address):
681        self.test_sequence.add_log_lines(
682            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
683             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
684            True)
685
    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
        """Parse "key:value;"-formatted text into a dict.

        When allow_dupes is True, values of a repeated key are collected
        into a list; otherwise a repeated key fails the test. Returns the
        resulting dict.
        """
        self.assertIsNotNone(key_val_text)
        kv_dict = {}
        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
            key = match.group(1)
            val = match.group(2)
            if key in kv_dict:
                if allow_dupes:
                    if isinstance(kv_dict[key], list):
                        kv_dict[key].append(val)
                    else:
                        # Promote to list
                        kv_dict[key] = [kv_dict[key], val]
                else:
                    self.fail(
                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
                            key, val, key_val_text, kv_dict))
            else:
                kv_dict[key] = val
        return kv_dict
706
    def parse_memory_region_packet(self, context):
        """Parse the captured qMemoryRegionInfo reply into a dict.

        Validates that only known memory-region keys appear, hex-decodes
        the "name" field, and returns the key/value dict.
        """
        # Ensure we have a context.
        self.assertIsNotNone(context.get("memory_region_response"))

        # Pull out key:value; pairs.
        mem_region_dict = self.parse_key_val_dict(
            context.get("memory_region_response"))

        # Validate keys are known.
        for (key, val) in list(mem_region_dict.items()):
            self.assertIn(key,
                ["start",
                 "size",
                 "permissions",
                 "flags",
                 "name",
                 "error",
                 "dirty-pages",
                 "type"])
            self.assertIsNotNone(val)

        # The region name is transmitted hex-encoded.
        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
        # Return the dictionary of key-value pairs for the memory region.
        return mem_region_dict
731
732    def assert_address_within_memory_region(
733            self, test_address, mem_region_dict):
734        self.assertIsNotNone(mem_region_dict)
735        self.assertTrue("start" in mem_region_dict)
736        self.assertTrue("size" in mem_region_dict)
737
738        range_start = int(mem_region_dict["start"], 16)
739        range_size = int(mem_region_dict["size"], 16)
740        range_end = range_start + range_size
741
742        if test_address < range_start:
743            self.fail(
744                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
745                    test_address,
746                    range_start,
747                    range_end,
748                    range_size))
749        elif test_address >= range_end:
750            self.fail(
751                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
752                    test_address,
753                    range_start,
754                    range_end,
755                    range_size))
756
757    def add_threadinfo_collection_packets(self):
758        self.test_sequence.add_log_lines(
759            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
760                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
761                "save_key": "threadinfo_responses"}],
762            True)
763
764    def parse_threadinfo_packets(self, context):
765        """Return an array of thread ids (decimal ints), one per thread."""
766        threadinfo_responses = context.get("threadinfo_responses")
767        self.assertIsNotNone(threadinfo_responses)
768
769        thread_ids = []
770        for threadinfo_response in threadinfo_responses:
771            new_thread_infos = parse_threadinfo_response(threadinfo_response)
772            thread_ids.extend(new_thread_infos)
773        return thread_ids
774
    def launch_with_threads(self, thread_count):
        """Launch an inferior that spawns thread_count threads and traps.

        Continues to the trap, collects thread info, and returns
        (context, thread_ids); asserts at least thread_count threads exist.
        """
        # thread_count - 1 new threads plus the main thread, then a trap.
        procs = self.prep_debug_monitor_and_inferior(
                inferior_args=["thread:new"]*(thread_count-1) + ["trap"])

        # Continue and expect a T (stop) reply.
        self.test_sequence.add_log_lines([
                "read packet: $c#00",
                {"direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$",
                    "capture": {1: "stop_signo", 2: "stop_reply_kv"}}], True)
        self.add_threadinfo_collection_packets()
        context = self.expect_gdbremote_sequence()
        threads = self.parse_threadinfo_packets(context)
        self.assertGreaterEqual(len(threads), thread_count)
        return context, threads
789
    def add_set_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            do_continue=True,
            breakpoint_kind=1):
        """Queue a $Z packet setting a breakpoint/watchpoint at address.

        z_packet_type selects the Z packet variant (0 = software
        breakpoint); breakpoint_kind is the packet's kind/size field. When
        do_continue is set, also queue a continue and expect a stop reply,
        capturing stop_signo and stop_thread_id.
        """
        self.test_sequence.add_log_lines(
            [  # Set the breakpoint.
                "read packet: $Z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could set it.
                "send packet: $OK#00",
            ], True)

        if (do_continue):
            self.test_sequence.add_log_lines(
                [  # Continue the inferior.
                    "read packet: $c#63",
                    # Expect a breakpoint stop report.
                    {"direction": "send",
                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                     "capture": {1: "stop_signo",
                                 2: "stop_thread_id"}},
                ], True)
814
815    def add_remove_breakpoint_packets(
816            self,
817            address,
818            z_packet_type=0,
819            breakpoint_kind=1):
820        self.test_sequence.add_log_lines(
821            [  # Remove the breakpoint.
822                "read packet: $z{2},{0:x},{1}#00".format(
823                    address, breakpoint_kind, z_packet_type),
824                # Verify the stub could unset it.
825                "send packet: $OK#00",
826            ], True)
827
828    def add_qSupported_packets(self, client_features=[]):
829        features = ''.join(';' + x for x in client_features)
830        self.test_sequence.add_log_lines(
831            ["read packet: $qSupported{}#00".format(features),
832             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
833             ], True)
834
    # Stub features this test suite recognizes in a qSupported reply.
    # parse_qSupported_response() raises if a stub reports a feature that is
    # not listed here, so new stub-side features must be added to this list.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qXfer:siginfo:read",
        "qEcho",
        "QPassSignals",
        "multiprocess",
        "fork-events",
        "vfork-events",
        "memory-tagging",
        "qSaveCore",
        "native-signals",
        "QNonStop",
    ]
856
857    def parse_qSupported_response(self, context):
858        self.assertIsNotNone(context)
859
860        raw_response = context.get("qSupported_response")
861        self.assertIsNotNone(raw_response)
862
863        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
864        # +,-,? is stripped from the key and set as the value.
865        supported_dict = {}
866        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
867            key = match.group(1)
868            val = match.group(3)
869
870            # key=val: store as is
871            if val and len(val) > 0:
872                supported_dict[key] = val
873            else:
874                if len(key) < 2:
875                    raise Exception(
876                        "singular stub feature is too short: must be stub_feature{+,-,?}")
877                supported_type = key[-1]
878                key = key[:-1]
879                if not supported_type in ["+", "-", "?"]:
880                    raise Exception(
881                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
882                supported_dict[key] = supported_type
883            # Ensure we know the supported element
884            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
885                raise Exception(
886                    "unknown qSupported stub feature reported: %s" %
887                    key)
888
889        return supported_dict
890
891    def continue_process_and_wait_for_stop(self):
892        self.test_sequence.add_log_lines(
893            [
894                "read packet: $vCont;c#a8",
895                {
896                    "direction": "send",
897                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
898                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
899                },
900            ],
901            True,
902        )
903        context = self.expect_gdbremote_sequence()
904        self.assertIsNotNone(context)
905        return self.parse_interrupt_packets(context)
906
907    def select_modifiable_register(self, reg_infos):
908        """Find a register that can be read/written freely."""
909        PREFERRED_REGISTER_NAMES = set(["rax", ])
910
911        # First check for the first register from the preferred register name
912        # set.
913        alternative_register_index = None
914
915        self.assertIsNotNone(reg_infos)
916        for reg_info in reg_infos:
917            if ("name" in reg_info) and (
918                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
919                # We found a preferred register.  Use it.
920                return reg_info["lldb_register_index"]
921            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
922                    reg_info["generic"] == "arg1"):
923                # A frame pointer or first arg register will do as a
924                # register to modify temporarily.
925                alternative_register_index = reg_info["lldb_register_index"]
926
927        # We didn't find a preferred register.  Return whatever alternative register
928        # we found, if any.
929        return alternative_register_index
930
931    def extract_registers_from_stop_notification(self, stop_key_vals_text):
932        self.assertIsNotNone(stop_key_vals_text)
933        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
934
935        registers = {}
936        for (key, val) in list(kv_dict.items()):
937            if re.match(r"^[0-9a-fA-F]+$", key):
938                registers[int(key, 16)] = val
939        return registers
940
941    def gather_register_infos(self):
942        self.reset_test_sequence()
943        self.add_register_info_collection_packets()
944
945        context = self.expect_gdbremote_sequence()
946        self.assertIsNotNone(context)
947
948        reg_infos = self.parse_register_info_packets(context)
949        self.assertIsNotNone(reg_infos)
950        self.add_lldb_register_index(reg_infos)
951
952        return reg_infos
953
954    def find_generic_register_with_name(self, reg_infos, generic_name):
955        self.assertIsNotNone(reg_infos)
956        for reg_info in reg_infos:
957            if ("generic" in reg_info) and (
958                    reg_info["generic"] == generic_name):
959                return reg_info
960        return None
961
962    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
963        self.assertIsNotNone(reg_infos)
964        for reg_info in reg_infos:
965            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
966                return reg_info
967        return None
968
969    def decode_gdbremote_binary(self, encoded_bytes):
970        decoded_bytes = ""
971        i = 0
972        while i < len(encoded_bytes):
973            if encoded_bytes[i] == "}":
974                # Handle escaped char.
975                self.assertTrue(i + 1 < len(encoded_bytes))
976                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
977                i += 2
978            elif encoded_bytes[i] == "*":
979                # Handle run length encoding.
980                self.assertTrue(len(decoded_bytes) > 0)
981                self.assertTrue(i + 1 < len(encoded_bytes))
982                repeat_count = ord(encoded_bytes[i + 1]) - 29
983                decoded_bytes += decoded_bytes[-1] * repeat_count
984                i += 2
985            else:
986                decoded_bytes += encoded_bytes[i]
987                i += 1
988        return decoded_bytes
989
    def build_auxv_dict(self, endian, word_size, auxv_data):
        """Parse a raw auxv blob into a {key: value} dict of ints.

        endian: target byte order indicator (passed through to
        unpack_endian_binary_string).
        word_size: size in bytes of one auxv word on the target.
        auxv_data: raw auxv bytes; must contain a terminating AT_NULL entry
        (key 0, value 0).  Fails the test if the terminator is missing or a
        non-ignored key repeats.
        """
        self.assertIsNotNone(endian)
        self.assertIsNotNone(word_size)
        self.assertIsNotNone(auxv_data)

        auxv_dict = {}

        # PowerPC64le's auxvec has a special key that must be ignored.
        # This special key may be used multiple times, resulting in
        # multiple key/value pairs with the same key, which would otherwise
        # break this test check for repeated keys.
        #
        # AT_IGNOREPPC = 22
        ignored_keys_for_arch = { 'powerpc64le' : [22] }
        arch = self.getArchitecture()
        ignore_keys = None
        if arch in ignored_keys_for_arch:
            ignore_keys = ignored_keys_for_arch[arch]

        while len(auxv_data) > 0:
            # Chop off key.
            raw_key = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Chop off value.
            raw_value = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Convert raw text from target endian.
            key = unpack_endian_binary_string(endian, raw_key)
            value = unpack_endian_binary_string(endian, raw_value)

            if ignore_keys and key in ignore_keys:
                continue

            # Handle ending entry (AT_NULL terminator).
            if key == 0:
                self.assertEqual(value, 0)
                return auxv_dict

            # The key should not already be present.
            self.assertFalse(key in auxv_dict)
            auxv_dict[key] = value

        self.fail(
            "should not reach here - implies required double zero entry not found")
        return auxv_dict
1037
    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect command_prefix{offset:x},{chunk_length:x} responses until
        an 'l' (final) response is returned, with or without data.

        Issues qXfer-style chunked reads: 'm' responses carry more data and
        keep the loop going; the terminating 'l' response may also carry a
        final payload.  Returns the concatenation of all decoded binary
        payloads as a string.
        """
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix,
                        offset,
                        chunk_length),
                    {
                        "direction": "send",
                        # DOTALL/MULTILINE: the binary payload may contain
                        # newline bytes.
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                            re.MULTILINE | re.DOTALL),
                        "capture": {
                            1: "response_type",
                            2: "content_raw"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is l.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data
1082
1083    def add_interrupt_packets(self):
1084        self.test_sequence.add_log_lines([
1085            # Send the intterupt.
1086            "read packet: {}".format(chr(3)),
1087            # And wait for the stop notification.
1088            {"direction": "send",
1089             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1090             "capture": {1: "stop_signo",
1091                         2: "stop_key_val_text"}},
1092        ], True)
1093
1094    def parse_interrupt_packets(self, context):
1095        self.assertIsNotNone(context.get("stop_signo"))
1096        self.assertIsNotNone(context.get("stop_key_val_text"))
1097        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1098            context["stop_key_val_text"]))
1099
1100    def add_QSaveRegisterState_packets(self, thread_id):
1101        if thread_id:
1102            # Use the thread suffix form.
1103            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1104                thread_id)
1105        else:
1106            request = "read packet: $QSaveRegisterState#00"
1107
1108        self.test_sequence.add_log_lines([request,
1109                                          {"direction": "send",
1110                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1111                                           "capture": {1: "save_response"}},
1112                                          ],
1113                                         True)
1114
1115    def parse_QSaveRegisterState_response(self, context):
1116        self.assertIsNotNone(context)
1117
1118        save_response = context.get("save_response")
1119        self.assertIsNotNone(save_response)
1120
1121        if len(save_response) < 1 or save_response[0] == "E":
1122            # error received
1123            return (False, None)
1124        else:
1125            return (True, int(save_response))
1126
1127    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1128        if thread_id:
1129            # Use the thread suffix form.
1130            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1131                save_id, thread_id)
1132        else:
1133            request = "read packet: $QRestoreRegisterState:{}#00".format(
1134                save_id)
1135
1136        self.test_sequence.add_log_lines([
1137            request,
1138            "send packet: $OK#00"
1139        ], True)
1140
    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        """Read each register, write back its bitwise complement, and verify.

        For every reg info (which must carry "lldb_register_index" and
        "bitsize"), reads the current value with $p, writes the value XORed
        with all ones via $P, then reads it back to confirm the write stuck.
        Some registers (flags, segment selectors, masks, ...) legitimately
        reject or permute writes; those count toward failed_writes rather
        than failing the test.

        Returns a (successful_writes, failed_writes) tuple.
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1240
1241    def is_bit_flippable_register(self, reg_info):
1242        if not reg_info:
1243            return False
1244        if not "set" in reg_info:
1245            return False
1246        if reg_info["set"] != "General Purpose Registers":
1247            return False
1248        if ("container-regs" in reg_info) and (
1249                len(reg_info["container-regs"]) > 0):
1250            # Don't try to bit flip registers contained in another register.
1251            return False
1252        if re.match("^.s$", reg_info["name"]):
1253            # This is a 2-letter register name that ends in "s", like a segment register.
1254            # Don't try to bit flip these.
1255            return False
1256        if re.match("^(c|)psr$", reg_info["name"]):
1257            # This is an ARM program status register; don't flip it.
1258            return False
1259        # Okay, this looks fine-enough.
1260        return True
1261
    def read_register_values(self, reg_infos, endian, thread_id=None):
        """Read each register in reg_infos with $p packets.

        endian: target byte order indicator for decoding the raw hex reply.
        thread_id: when truthy, uses the thread-suffix form of $p.

        Returns a dict mapping lldb register index -> unsigned integer
        value.  Asserts (fails the test) on any empty or error ("E...")
        response.
        """
        self.assertIsNotNone(reg_infos)
        values = {}

        for reg_info in reg_infos:
            # We append a register index when load reg infos so we can work
            # with subsets.
            reg_index = reg_info.get("lldb_register_index")
            self.assertIsNotNone(reg_index)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read it with p.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Convert value from target endian to integral.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            self.assertTrue(len(p_response) > 0)
            self.assertFalse(p_response[0] == "E")

            values[reg_index] = unpack_register_hex_unsigned(
                endian, p_response)

        return values
1298
1299    def add_vCont_query_packets(self):
1300        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1301                                          {"direction": "send",
1302                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1303                                           "capture": {2: "vCont_query_response"}},
1304                                          ],
1305                                         True)
1306
1307    def parse_vCont_query_response(self, context):
1308        self.assertIsNotNone(context)
1309        vCont_query_response = context.get("vCont_query_response")
1310
1311        # Handle case of no vCont support at all - in which case the capture
1312        # group will be none or zero length.
1313        if not vCont_query_response or len(vCont_query_response) == 0:
1314            return {}
1315
1316        return {key: 1 for key in vCont_query_response.split(
1317            ";") if key and len(key) > 0}
1318
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step thread_id until predicate(args) returns True.

        thread_id: the thread to step; substituted for any "{thread}"
        placeholder in step_instruction.
        predicate/args: predicate(args) is evaluated after every step; the
        loop ends as soon as it returns True.
        max_step_count: give up after this many steps.
        use_Hc_packet: when True, select the continue thread with $Hc before
        each step; otherwise step_instruction itself must address the thread.
        step_instruction: packet body used to step, e.g. "s" or
        "vCont;s:{thread}".

        Returns (predicate_became_true, steps_taken).

        Used by single step test that appears in a few different contexts.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Each step must stop with SIGTRAP.
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1371
    def g_c1_c2_contents_are(self, args):
        """Check whether the inferior's g_c1/g_c2 bytes match expectations.

        args must contain "g_c1_address" and "g_c2_address" (inferior data
        addresses) plus "expected_g_c1" and "expected_g_c2" (one-character
        strings).  Reads one byte from each address with $m packets and
        returns True when both match.

        Used by single step test that appears in a few different contexts.
        """
        g_c1_address = args["g_c1_address"]
        g_c2_address = args["g_c2_address"]
        expected_g_c1 = args["expected_g_c1"]
        expected_g_c2 = args["expected_g_c2"]

        # Read g_c1 and g_c2 contents.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Check if what we read from inferior memory is what we are expecting.
        self.assertIsNotNone(context.get("g_c1_contents"))
        self.assertIsNotNone(context.get("g_c2_contents"))

        # The $m replies are hex-encoded; unhexlify before comparing.
        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1398
1399    def single_step_only_steps_one_instruction(
1400            self, use_Hc_packet=True, step_instruction="s"):
1401        """Used by single step test that appears in a few different contexts."""
1402        # Start up the inferior.
1403        procs = self.prep_debug_monitor_and_inferior(
1404            inferior_args=[
1405                "get-code-address-hex:swap_chars",
1406                "get-data-address-hex:g_c1",
1407                "get-data-address-hex:g_c2",
1408                "sleep:1",
1409                "call-function:swap_chars",
1410                "sleep:5"])
1411
1412        # Run the process
1413        self.test_sequence.add_log_lines(
1414            [  # Start running after initial stop.
1415                "read packet: $c#63",
1416                # Match output line that prints the memory address of the function call entry point.
1417                # Note we require launch-only testing so we can get inferior otuput.
1418                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1419                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1420                # Now stop the inferior.
1421                "read packet: {}".format(chr(3)),
1422                # And wait for the stop notification.
1423                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1424            True)
1425
1426        # Run the packet stream.
1427        context = self.expect_gdbremote_sequence()
1428        self.assertIsNotNone(context)
1429
1430        # Grab the main thread id.
1431        self.assertIsNotNone(context.get("stop_thread_id"))
1432        main_thread_id = int(context.get("stop_thread_id"), 16)
1433
1434        # Grab the function address.
1435        self.assertIsNotNone(context.get("function_address"))
1436        function_address = int(context.get("function_address"), 16)
1437
1438        # Grab the data addresses.
1439        self.assertIsNotNone(context.get("g_c1_address"))
1440        g_c1_address = int(context.get("g_c1_address"), 16)
1441
1442        self.assertIsNotNone(context.get("g_c2_address"))
1443        g_c2_address = int(context.get("g_c2_address"), 16)
1444
1445        # Set a breakpoint at the given address.
1446        if self.getArchitecture().startswith("arm"):
1447            # TODO: Handle case when setting breakpoint in thumb code
1448            BREAKPOINT_KIND = 4
1449        else:
1450            BREAKPOINT_KIND = 1
1451        self.reset_test_sequence()
1452        self.add_set_breakpoint_packets(
1453            function_address,
1454            do_continue=True,
1455            breakpoint_kind=BREAKPOINT_KIND)
1456        context = self.expect_gdbremote_sequence()
1457        self.assertIsNotNone(context)
1458
1459        # Remove the breakpoint.
1460        self.reset_test_sequence()
1461        self.add_remove_breakpoint_packets(
1462            function_address, breakpoint_kind=BREAKPOINT_KIND)
1463        context = self.expect_gdbremote_sequence()
1464        self.assertIsNotNone(context)
1465
1466        # Verify g_c1 and g_c2 match expected initial state.
1467        args = {}
1468        args["g_c1_address"] = g_c1_address
1469        args["g_c2_address"] = g_c2_address
1470        args["expected_g_c1"] = "0"
1471        args["expected_g_c2"] = "1"
1472
1473        self.assertTrue(self.g_c1_c2_contents_are(args))
1474
1475        # Verify we take only a small number of steps to hit the first state.
1476        # Might need to work through function entry prologue code.
1477        args["expected_g_c1"] = "1"
1478        args["expected_g_c2"] = "1"
1479        (state_reached,
1480         step_count) = self.count_single_steps_until_true(main_thread_id,
1481                                                          self.g_c1_c2_contents_are,
1482                                                          args,
1483                                                          max_step_count=25,
1484                                                          use_Hc_packet=use_Hc_packet,
1485                                                          step_instruction=step_instruction)
1486        self.assertTrue(state_reached)
1487
1488        # Verify we hit the next state.
1489        args["expected_g_c1"] = "1"
1490        args["expected_g_c2"] = "0"
1491        (state_reached,
1492         step_count) = self.count_single_steps_until_true(main_thread_id,
1493                                                          self.g_c1_c2_contents_are,
1494                                                          args,
1495                                                          max_step_count=5,
1496                                                          use_Hc_packet=use_Hc_packet,
1497                                                          step_instruction=step_instruction)
1498        self.assertTrue(state_reached)
1499        expected_step_count = 1
1500        arch = self.getArchitecture()
1501
1502        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1503        # of variable value
1504        if re.match("mips", arch):
1505            expected_step_count = 3
1506        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1507        # variable value
1508        if re.match("s390x", arch):
1509            expected_step_count = 2
1510        # ARM64 requires "4" instructions: 2 to compute the address (adrp,
1511        # add), one to materialize the constant (mov) and the store. Once
1512        # addresses and constants are materialized, only one instruction is
1513        # needed.
1514        if re.match("arm64", arch):
1515            before_materialization_step_count = 4
1516            after_matrialization_step_count = 1
1517            self.assertIn(step_count, [before_materialization_step_count,
1518                                       after_matrialization_step_count])
1519            expected_step_count = after_matrialization_step_count
1520        else:
1521            self.assertEqual(step_count, expected_step_count)
1522
1523        # Verify we hit the next state.
1524        args["expected_g_c1"] = "0"
1525        args["expected_g_c2"] = "0"
1526        (state_reached,
1527         step_count) = self.count_single_steps_until_true(main_thread_id,
1528                                                          self.g_c1_c2_contents_are,
1529                                                          args,
1530                                                          max_step_count=5,
1531                                                          use_Hc_packet=use_Hc_packet,
1532                                                          step_instruction=step_instruction)
1533        self.assertTrue(state_reached)
1534        self.assertEqual(step_count, expected_step_count)
1535
1536        # Verify we hit the next state.
1537        args["expected_g_c1"] = "0"
1538        args["expected_g_c2"] = "1"
1539        (state_reached,
1540         step_count) = self.count_single_steps_until_true(main_thread_id,
1541                                                          self.g_c1_c2_contents_are,
1542                                                          args,
1543                                                          max_step_count=5,
1544                                                          use_Hc_packet=use_Hc_packet,
1545                                                          step_instruction=step_instruction)
1546        self.assertTrue(state_reached)
1547        self.assertEqual(step_count, expected_step_count)
1548
1549    def maybe_strict_output_regex(self, regex):
1550        return '.*' + regex + \
1551            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1552
1553    def install_and_create_launch_args(self):
1554        exe_path = self.getBuildArtifact("a.out")
1555        if not lldb.remote_platform:
1556            return [exe_path]
1557        remote_path = lldbutil.append_to_process_working_directory(self,
1558            os.path.basename(exe_path))
1559        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1560        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1561                                           remote_file_spec)
1562        if err.Fail():
1563            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1564                            (exe_path, remote_path, err))
1565        return [remote_path]
1566