1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
26class _ConnectionRefused(IOError):
27    pass
28
29
class GdbRemoteTestCaseFactory(type):
    """Metaclass that expands each "test*" method into per-debug-server
    variants.

    Every test method is duplicated once per applicable category
    ("debugserver", "llgs"); the copy is named <test>_<category>, tagged
    with that category via decorators.add_test_categories, and carries a
    .debug_server attribute that setUp() later reads to pick the stub.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            # Non-test attributes pass through unchanged.
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(
                getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # attrvalue is bound through a default argument so each
                # generated method calls its own original test (avoids the
                # late-binding-closure pitfall).
                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(
                cls, name, bases, newattrs)
61
@add_metaclass(GdbRemoteTestCaseFactory)
class GdbRemoteTestCaseBase(Base):
    """Base class for gdb-remote protocol test cases (llgs and debugserver)."""

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)

    # Raw 'k' (kill) packet with its precomputed checksum (0x6b).
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose log handler; installed by setUp() when requested,
    # removed again in tearDown().
    _verbose_log_handler = None
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
94
95    def setUpBaseLogging(self):
96        self.logger = logging.getLogger(__name__)
97
98        if len(self.logger.handlers) > 0:
99            return  # We have set up this handler already
100
101        self.logger.propagate = False
102        self.logger.setLevel(logging.DEBUG)
103
104        # log all warnings to stderr
105        handler = logging.StreamHandler()
106        handler.setLevel(logging.WARNING)
107        handler.setFormatter(self._log_formatter)
108        self.logger.addHandler(handler)
109
110    def isVerboseLoggingRequested(self):
111        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
112        # logged.
113        return any(("gdb-remote" in channel)
114                   for channel in lldbtest_config.channels)
115
116    def getDebugServer(self):
117        method = getattr(self, self.testMethodName)
118        return getattr(method, "debug_server", None)
119
120    def setUp(self):
121        super(GdbRemoteTestCaseBase, self).setUp()
122
123        self.setUpBaseLogging()
124        self.debug_monitor_extra_args = []
125
126        if self.isVerboseLoggingRequested():
127            # If requested, full logs go to a log file
128            self._verbose_log_handler = logging.FileHandler(
129                self.getLogBasenameForCurrentTest() + "-host.log")
130            self._verbose_log_handler.setFormatter(self._log_formatter)
131            self._verbose_log_handler.setLevel(logging.DEBUG)
132            self.logger.addHandler(self._verbose_log_handler)
133
134        self.test_sequence = GdbRemoteTestSequence(self.logger)
135        self.set_inferior_startup_launch()
136        self.port = self.get_next_port()
137        self.stub_sends_two_stop_notifications_on_kill = False
138        if configuration.lldb_platform_url:
139            if configuration.lldb_platform_url.startswith('unix-'):
140                url_pattern = '(.+)://\[?(.+?)\]?/.*'
141            else:
142                url_pattern = '(.+)://(.+):\d+'
143            scheme, host = re.match(
144                url_pattern, configuration.lldb_platform_url).groups()
145            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
146                self.stub_device = host
147                self.stub_hostname = 'localhost'
148            else:
149                self.stub_device = None
150                self.stub_hostname = host
151        else:
152            self.stub_hostname = "localhost"
153
154        debug_server = self.getDebugServer()
155        if debug_server == "debugserver":
156            self._init_debugserver_test()
157        else:
158            self._init_llgs_test()
159
    def tearDown(self):
        # Detach the per-test verbose log handler installed by setUp() so
        # the next test does not log into this test's file. removeHandler
        # is a no-op if the handler is None/absent.
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        # NOTE(review): setUp() chains via super(), but this calls
        # TestBase.tearDown() directly — presumably equivalent for this
        # hierarchy; confirm against the Base/TestBase relationship.
        TestBase.tearDown(self)
164
165    def getLocalServerLogFile(self):
166        return self.getLogBasenameForCurrentTest() + "-server.log"
167
168    def setUpServerLogging(self, is_llgs):
169        if len(lldbtest_config.channels) == 0:
170            return  # No logging requested
171
172        if lldb.remote_platform:
173            log_file = lldbutil.join_remote_paths(
174                lldb.remote_platform.GetWorkingDirectory(), "server.log")
175        else:
176            log_file = self.getLocalServerLogFile()
177
178        if is_llgs:
179            self.debug_monitor_extra_args.append("--log-file=" + log_file)
180            self.debug_monitor_extra_args.append(
181                "--log-channels={}".format(":".join(lldbtest_config.channels)))
182        else:
183            self.debug_monitor_extra_args = [
184                "--log-file=" + log_file, "--log-flags=0x800000"]
185
186    def get_next_port(self):
187        return 12000 + random.randint(0, 3999)
188
    def reset_test_sequence(self):
        # Discard any accumulated packet expectations; start a fresh
        # sequence bound to this test's logger.
        self.test_sequence = GdbRemoteTestSequence(self.logger)
191
192
    def _init_llgs_test(self):
        """Configure this test to run against lldb-server (llgs).

        Locates the lldb-server binary: on a remote platform by resolving
        /proc/<platform shell pid>/exe over the platform connection,
        locally via get_lldb_server_exe(). Also decides whether
        reverse-connect can be used (disabled for remote platforms).
        """
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            # The link target is the last whitespace-separated field of
            # the "ls -l" output.
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()

        # lldb-server runs in its gdbserver mode for these tests.
        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
232
    def _init_debugserver_test(self):
        """Configure this test to run against debugserver."""
        self.debug_monitor_exe = get_debugserver_exe()
        self.setUpServerLogging(is_llgs=False)
        # debugserver always uses reverse-connect here.
        self.reverse_connect = True

        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True
241
242    def forward_adb_port(self, source, target, direction, device):
243        adb = ['adb'] + (['-s', device] if device else []) + [direction]
244
245        def remove_port_forward():
246            subprocess.call(adb + ["--remove", "tcp:%d" % source])
247
248        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
249        self.addTearDownHook(remove_port_forward)
250
251    def _verify_socket(self, sock):
252        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
253        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
254        # the connect() will always be successful, but the connection will be immediately dropped
255        # if ADB could not connect on the remote side. This function tries to detect this
256        # situation, and report it as "connection refused" so that the upper layers attempt the
257        # connection again.
258        triple = self.dbg.GetSelectedPlatform().GetTriple()
259        if not re.match(".*-.*-.*-android", triple):
260            return  # Not android.
261        can_read, _, _ = select.select([sock], [], [], 0.1)
262        if sock not in can_read:
263            return  # Data is not available, but the connection is alive.
264        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
265            raise _ConnectionRefused()  # Got EOF, connection dropped.
266
    def create_socket(self):
        """Create and connect a socket to the debug monitor.

        Registers a teardown hook that sends the gdb-remote kill packet
        and closes the socket. Raises _ConnectionRefused when the stub is
        not (yet) accepting connections so callers can retry.
        """
        try:
            sock = socket.socket(family=socket.AF_INET)
        except OSError as e:
            if e.errno != errno.EAFNOSUPPORT:
                raise
            # IPv4 unsupported on this host; fall back to IPv6.
            sock = socket.socket(family=socket.AF_INET6)

        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            # Route the connection to the device through adb forwarding.
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        # Detect the adb accepted-then-dropped case and convert it into
        # _ConnectionRefused before handing the socket out.
        self._verify_socket(sock)

        return sock
319
    def set_inferior_startup_launch(self):
        # The stub will launch the inferior itself via an $A packet.
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        # The inferior is started separately; the stub attaches to it via
        # its command line (--attach).
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        # The inferior is started separately; the test attaches however it
        # chooses (e.g. $vAttach;pid).
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
328
329    def get_debug_monitor_command_line_args(self, attach_pid=None):
330        commandline_args = self.debug_monitor_extra_args
331        if attach_pid:
332            commandline_args += ["--attach=%d" % attach_pid]
333        if self.reverse_connect:
334            commandline_args += ["--reverse-connect", self.connect_address]
335        else:
336            if lldb.remote_platform:
337                commandline_args += ["*:{}".format(self.port)]
338            else:
339                commandline_args += ["localhost:{}".format(self.port)]
340
341        return commandline_args
342
343    def get_target_byte_order(self):
344        inferior_exe_path = self.getBuildArtifact("a.out")
345        target = self.dbg.CreateTarget(inferior_exe_path)
346        return target.GetByteOrder()
347
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        """Start the debug monitor process.

        In reverse-connect mode, first bind an ephemeral listening socket
        and advertise it via self.connect_address; after spawning the
        stub, accept its connection back as self.sock. Returns the server
        process object. (logfile is currently unused.)
        """
        if self.reverse_connect:
            # Bind an ephemeral port; the stub will connect back to it.
            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
            sock = socket.socket(family, type, proto)
            sock.settimeout(self.DEFAULT_TIMEOUT)

            sock.bind(addr)
            sock.listen(1)
            addr = sock.getsockname()
            # Bracketed host form works for both IPv4 and IPv6 literals.
            self.connect_address = "[{}]:{}".format(*addr)


        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            commandline_args,
            install_remote=False)
        self.assertIsNotNone(server)

        if self.reverse_connect:
            # Block (up to DEFAULT_TIMEOUT) until the stub connects back.
            self.sock = sock.accept()[0]
            self.sock.settimeout(self.DEFAULT_TIMEOUT)

        return server
376
377    def connect_to_debug_monitor(self, attach_pid=None):
378        if self.reverse_connect:
379            # Create the stub.
380            server = self.launch_debug_monitor(attach_pid=attach_pid)
381            self.assertIsNotNone(server)
382
383            # Schedule debug monitor to be shut down during teardown.
384            logger = self.logger
385
386            self._server = Server(self.sock, server)
387            return server
388
389        # We're using a random port algorithm to try not to collide with other ports,
390        # and retry a max # times.
391        attempts = 0
392        MAX_ATTEMPTS = 20
393
394        while attempts < MAX_ATTEMPTS:
395            server = self.launch_debug_monitor(attach_pid=attach_pid)
396
397            # Schedule debug monitor to be shut down during teardown.
398            logger = self.logger
399
400            connect_attemps = 0
401            MAX_CONNECT_ATTEMPTS = 10
402
403            while connect_attemps < MAX_CONNECT_ATTEMPTS:
404                # Create a socket to talk to the server
405                try:
406                    logger.info("Connect attempt %d", connect_attemps + 1)
407                    self.sock = self.create_socket()
408                    self._server = Server(self.sock, server)
409                    return server
410                except _ConnectionRefused as serr:
411                    # Ignore, and try again.
412                    pass
413                time.sleep(0.5)
414                connect_attemps += 1
415
416            # We should close the server here to be safe.
417            server.terminate()
418
419            # Increment attempts.
420            print(
421                "connect to debug monitor on port %d failed, attempt #%d of %d" %
422                (self.port, attempts + 1, MAX_ATTEMPTS))
423            attempts += 1
424
425            # And wait a random length of time before next attempt, to avoid
426            # collisions.
427            time.sleep(random.randint(1, 5))
428
429            # Now grab a new port number.
430            self.port = self.get_next_port()
431
432        raise Exception(
433            "failed to create a socket to the launched debug monitor after %d tries" %
434            attempts)
435
436    def launch_process_for_attach(
437            self,
438            inferior_args=None,
439            sleep_seconds=3,
440            exe_path=None):
441        # We're going to start a child process that the debug monitor stub can later attach to.
442        # This process needs to be started so that it just hangs around for a while.  We'll
443        # have it sleep.
444        if not exe_path:
445            exe_path = self.getBuildArtifact("a.out")
446
447        args = []
448        if inferior_args:
449            args.extend(inferior_args)
450        if sleep_seconds:
451            args.append("sleep:%d" % sleep_seconds)
452
453        return self.spawnSubprocess(exe_path, args)
454
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Install the inferior into the remote platform's working
                # directory and launch it from there.
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Enter no-ack mode before queueing further packets.
        self.do_handshake()

        # Build the expected protocol stream
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
531
532    def do_handshake(self):
533        server = self._server
534        server.send_ack()
535        server.send_packet(b"QStartNoAckMode")
536        self.assertEqual(server.get_normal_packet(), b"+")
537        self.assertEqual(server.get_normal_packet(), b"OK")
538        server.send_ack()
539
540    def add_verified_launch_packets(self, launch_args):
541        self.test_sequence.add_log_lines(
542            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
543             "send packet: $OK#00",
544             "read packet: $qLaunchSuccess#a5",
545             "send packet: $OK#00"],
546            True)
547
548    def add_thread_suffix_request_packets(self):
549        self.test_sequence.add_log_lines(
550            ["read packet: $QThreadSuffixSupported#e4",
551             "send packet: $OK#00",
552             ], True)
553
554    def add_process_info_collection_packets(self):
555        self.test_sequence.add_log_lines(
556            ["read packet: $qProcessInfo#dc",
557             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
558            True)
559
560    def add_set_environment_packets(self, name, value):
561        self.test_sequence.add_log_lines(
562            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
563             "send packet: $OK#00",
564             ], True)
565
    # Keys a stub may legally report in a qProcessInfo response; used by
    # parse_process_info_response() to reject unknown keys.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
582
583    def parse_process_info_response(self, context):
584        # Ensure we have a process info response.
585        self.assertIsNotNone(context)
586        process_info_raw = context.get("process_info_raw")
587        self.assertIsNotNone(process_info_raw)
588
589        # Pull out key:value; pairs.
590        process_info_dict = {
591            match.group(1): match.group(2) for match in re.finditer(
592                r"([^:]+):([^;]+);", process_info_raw)}
593
594        # Validate keys are known.
595        for (key, val) in list(process_info_dict.items()):
596            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
597            self.assertIsNotNone(val)
598
599        return process_info_dict
600
601    def add_register_info_collection_packets(self):
602        self.test_sequence.add_log_lines(
603            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
604                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
605                "save_key": "reg_info_responses"}],
606            True)
607
608    def parse_register_info_packets(self, context):
609        """Return an array of register info dictionaries, one per register info."""
610        reg_info_responses = context.get("reg_info_responses")
611        self.assertIsNotNone(reg_info_responses)
612
613        # Parse register infos.
614        return [parse_reg_info_response(reg_info_response)
615                for reg_info_response in reg_info_responses]
616
617    def expect_gdbremote_sequence(self):
618        return expect_lldb_gdbserver_replay(
619            self,
620            self._server,
621            self.test_sequence,
622            self.DEFAULT_TIMEOUT * len(self.test_sequence),
623            self.logger)
624
    # Keys a stub may legally report in a qRegisterInfo response; used by
    # assert_valid_reg_info() to reject unknown keys.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
642
643    def assert_valid_reg_info(self, reg_info):
644        # Assert we know about all the reginfo keys parsed.
645        for key in reg_info:
646            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
647
648        # Check the bare-minimum expected set of register info keys.
649        self.assertTrue("name" in reg_info)
650        self.assertTrue("bitsize" in reg_info)
651
652        if not self.getArchitecture() == 'aarch64':
653            self.assertTrue("offset" in reg_info)
654
655        self.assertTrue("encoding" in reg_info)
656        self.assertTrue("format" in reg_info)
657
658    def find_pc_reg_info(self, reg_infos):
659        lldb_reg_index = 0
660        for reg_info in reg_infos:
661            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
662                return (lldb_reg_index, reg_info)
663            lldb_reg_index += 1
664
665        return (None, None)
666
667    def add_lldb_register_index(self, reg_infos):
668        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
669
670        We'll use this when we want to call packets like P/p with a register index but do so
671        on only a subset of the full register info set.
672        """
673        self.assertIsNotNone(reg_infos)
674
675        reg_index = 0
676        for reg_info in reg_infos:
677            reg_info["lldb_register_index"] = reg_index
678            reg_index += 1
679
680    def add_query_memory_region_packets(self, address):
681        self.test_sequence.add_log_lines(
682            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
683             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
684            True)
685
686    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
687        self.assertIsNotNone(key_val_text)
688        kv_dict = {}
689        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
690            key = match.group(1)
691            val = match.group(2)
692            if key in kv_dict:
693                if allow_dupes:
694                    if isinstance(kv_dict[key], list):
695                        kv_dict[key].append(val)
696                    else:
697                        # Promote to list
698                        kv_dict[key] = [kv_dict[key], val]
699                else:
700                    self.fail(
701                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
702                            key, val, key_val_text, kv_dict))
703            else:
704                kv_dict[key] = val
705        return kv_dict
706
707    def parse_memory_region_packet(self, context):
708        # Ensure we have a context.
709        self.assertIsNotNone(context.get("memory_region_response"))
710
711        # Pull out key:value; pairs.
712        mem_region_dict = self.parse_key_val_dict(
713            context.get("memory_region_response"))
714
715        # Validate keys are known.
716        for (key, val) in list(mem_region_dict.items()):
717            self.assertIn(key,
718                ["start",
719                 "size",
720                 "permissions",
721                 "flags",
722                 "name",
723                 "error",
724                 "dirty-pages",
725                 "type"])
726            self.assertIsNotNone(val)
727
728        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
729        # Return the dictionary of key-value pairs for the memory region.
730        return mem_region_dict
731
732    def assert_address_within_memory_region(
733            self, test_address, mem_region_dict):
734        self.assertIsNotNone(mem_region_dict)
735        self.assertTrue("start" in mem_region_dict)
736        self.assertTrue("size" in mem_region_dict)
737
738        range_start = int(mem_region_dict["start"], 16)
739        range_size = int(mem_region_dict["size"], 16)
740        range_end = range_start + range_size
741
742        if test_address < range_start:
743            self.fail(
744                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
745                    test_address,
746                    range_start,
747                    range_end,
748                    range_size))
749        elif test_address >= range_end:
750            self.fail(
751                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
752                    test_address,
753                    range_start,
754                    range_end,
755                    range_size))
756
757    def add_threadinfo_collection_packets(self):
758        self.test_sequence.add_log_lines(
759            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
760                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
761                "save_key": "threadinfo_responses"}],
762            True)
763
764    def parse_threadinfo_packets(self, context):
765        """Return an array of thread ids (decimal ints), one per thread."""
766        threadinfo_responses = context.get("threadinfo_responses")
767        self.assertIsNotNone(threadinfo_responses)
768
769        thread_ids = []
770        for threadinfo_response in threadinfo_responses:
771            new_thread_infos = parse_threadinfo_response(threadinfo_response)
772            thread_ids.extend(new_thread_infos)
773        return thread_ids
774
    def wait_for_thread_count(self, thread_count):
        """Poll the stub's thread list until at least thread_count threads
        exist, returning the final thread-id list.

        Raises after DEFAULT_TIMEOUT seconds. Note the timeout is only
        checked after each full query round-trip, so the wall-clock wait
        can overshoot slightly. Assumes thread_count >= 1 (with 0 the
        loop body never runs and `threads` would be unbound).
        """
        start_time = time.time()
        timeout_time = start_time + self.DEFAULT_TIMEOUT

        actual_thread_count = 0
        while actual_thread_count < thread_count:
            # Issue a fresh qfThreadInfo/qsThreadInfo query each iteration.
            self.reset_test_sequence()
            self.add_threadinfo_collection_packets()

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            threads = self.parse_threadinfo_packets(context)
            self.assertIsNotNone(threads)

            actual_thread_count = len(threads)

            if time.time() > timeout_time:
                raise Exception(
                    'timed out after {} seconds while waiting for threads: waiting for at least {} threads, found {}'.format(
                        self.DEFAULT_TIMEOUT, thread_count, actual_thread_count))

        return threads
798
799    def add_set_breakpoint_packets(
800            self,
801            address,
802            z_packet_type=0,
803            do_continue=True,
804            breakpoint_kind=1):
805        self.test_sequence.add_log_lines(
806            [  # Set the breakpoint.
807                "read packet: $Z{2},{0:x},{1}#00".format(
808                    address, breakpoint_kind, z_packet_type),
809                # Verify the stub could set it.
810                "send packet: $OK#00",
811            ], True)
812
813        if (do_continue):
814            self.test_sequence.add_log_lines(
815                [  # Continue the inferior.
816                    "read packet: $c#63",
817                    # Expect a breakpoint stop report.
818                    {"direction": "send",
819                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
820                     "capture": {1: "stop_signo",
821                                 2: "stop_thread_id"}},
822                ], True)
823
824    def add_remove_breakpoint_packets(
825            self,
826            address,
827            z_packet_type=0,
828            breakpoint_kind=1):
829        self.test_sequence.add_log_lines(
830            [  # Remove the breakpoint.
831                "read packet: $z{2},{0:x},{1}#00".format(
832                    address, breakpoint_kind, z_packet_type),
833                # Verify the stub could unset it.
834                "send packet: $OK#00",
835            ], True)
836
837    def add_qSupported_packets(self, client_features=[]):
838        features = ''.join(';' + x for x in client_features)
839        self.test_sequence.add_log_lines(
840            ["read packet: $qSupported{}#00".format(features),
841             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
842             ], True)
843
    # The set of qSupported stub feature names this suite recognizes.
    # parse_qSupported_response raises on any feature not in this list, so
    # a stub-side feature must be registered here before tests can use it.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qXfer:siginfo:read",
        "qEcho",
        "QPassSignals",
        "multiprocess",
        "fork-events",
        "vfork-events",
        "memory-tagging",
        "qSaveCore",
        "native-signals",
    ]
864
865    def parse_qSupported_response(self, context):
866        self.assertIsNotNone(context)
867
868        raw_response = context.get("qSupported_response")
869        self.assertIsNotNone(raw_response)
870
871        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
872        # +,-,? is stripped from the key and set as the value.
873        supported_dict = {}
874        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
875            key = match.group(1)
876            val = match.group(3)
877
878            # key=val: store as is
879            if val and len(val) > 0:
880                supported_dict[key] = val
881            else:
882                if len(key) < 2:
883                    raise Exception(
884                        "singular stub feature is too short: must be stub_feature{+,-,?}")
885                supported_type = key[-1]
886                key = key[:-1]
887                if not supported_type in ["+", "-", "?"]:
888                    raise Exception(
889                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
890                supported_dict[key] = supported_type
891            # Ensure we know the supported element
892            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
893                raise Exception(
894                    "unknown qSupported stub feature reported: %s" %
895                    key)
896
897        return supported_dict
898
899    def run_process_then_stop(self, run_seconds=1):
900        # Tell the stub to continue.
901        self.test_sequence.add_log_lines(
902            ["read packet: $vCont;c#a8"],
903            True)
904        context = self.expect_gdbremote_sequence()
905
906        # Wait for run_seconds.
907        time.sleep(run_seconds)
908
909        # Send an interrupt, capture a T response.
910        self.reset_test_sequence()
911        self.test_sequence.add_log_lines(
912            ["read packet: {}".format(chr(3)),
913             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
914            True)
915        context = self.expect_gdbremote_sequence()
916        self.assertIsNotNone(context)
917        self.assertIsNotNone(context.get("stop_result"))
918
919        return context
920
921    def continue_process_and_wait_for_stop(self):
922        self.test_sequence.add_log_lines(
923            [
924                "read packet: $vCont;c#a8",
925                {
926                    "direction": "send",
927                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
928                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
929                },
930            ],
931            True,
932        )
933        context = self.expect_gdbremote_sequence()
934        self.assertIsNotNone(context)
935        return self.parse_interrupt_packets(context)
936
937    def select_modifiable_register(self, reg_infos):
938        """Find a register that can be read/written freely."""
939        PREFERRED_REGISTER_NAMES = set(["rax", ])
940
941        # First check for the first register from the preferred register name
942        # set.
943        alternative_register_index = None
944
945        self.assertIsNotNone(reg_infos)
946        for reg_info in reg_infos:
947            if ("name" in reg_info) and (
948                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
949                # We found a preferred register.  Use it.
950                return reg_info["lldb_register_index"]
951            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
952                    reg_info["generic"] == "arg1"):
953                # A frame pointer or first arg register will do as a
954                # register to modify temporarily.
955                alternative_register_index = reg_info["lldb_register_index"]
956
957        # We didn't find a preferred register.  Return whatever alternative register
958        # we found, if any.
959        return alternative_register_index
960
961    def extract_registers_from_stop_notification(self, stop_key_vals_text):
962        self.assertIsNotNone(stop_key_vals_text)
963        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
964
965        registers = {}
966        for (key, val) in list(kv_dict.items()):
967            if re.match(r"^[0-9a-fA-F]+$", key):
968                registers[int(key, 16)] = val
969        return registers
970
971    def gather_register_infos(self):
972        self.reset_test_sequence()
973        self.add_register_info_collection_packets()
974
975        context = self.expect_gdbremote_sequence()
976        self.assertIsNotNone(context)
977
978        reg_infos = self.parse_register_info_packets(context)
979        self.assertIsNotNone(reg_infos)
980        self.add_lldb_register_index(reg_infos)
981
982        return reg_infos
983
984    def find_generic_register_with_name(self, reg_infos, generic_name):
985        self.assertIsNotNone(reg_infos)
986        for reg_info in reg_infos:
987            if ("generic" in reg_info) and (
988                    reg_info["generic"] == generic_name):
989                return reg_info
990        return None
991
992    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
993        self.assertIsNotNone(reg_infos)
994        for reg_info in reg_infos:
995            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
996                return reg_info
997        return None
998
999    def decode_gdbremote_binary(self, encoded_bytes):
1000        decoded_bytes = ""
1001        i = 0
1002        while i < len(encoded_bytes):
1003            if encoded_bytes[i] == "}":
1004                # Handle escaped char.
1005                self.assertTrue(i + 1 < len(encoded_bytes))
1006                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1007                i += 2
1008            elif encoded_bytes[i] == "*":
1009                # Handle run length encoding.
1010                self.assertTrue(len(decoded_bytes) > 0)
1011                self.assertTrue(i + 1 < len(encoded_bytes))
1012                repeat_count = ord(encoded_bytes[i + 1]) - 29
1013                decoded_bytes += decoded_bytes[-1] * repeat_count
1014                i += 2
1015            else:
1016                decoded_bytes += encoded_bytes[i]
1017                i += 1
1018        return decoded_bytes
1019
1020    def build_auxv_dict(self, endian, word_size, auxv_data):
1021        self.assertIsNotNone(endian)
1022        self.assertIsNotNone(word_size)
1023        self.assertIsNotNone(auxv_data)
1024
1025        auxv_dict = {}
1026
1027        # PowerPC64le's auxvec has a special key that must be ignored.
1028        # This special key may be used multiple times, resulting in
1029        # multiple key/value pairs with the same key, which would otherwise
1030        # break this test check for repeated keys.
1031        #
1032        # AT_IGNOREPPC = 22
1033        ignored_keys_for_arch = { 'powerpc64le' : [22] }
1034        arch = self.getArchitecture()
1035        ignore_keys = None
1036        if arch in ignored_keys_for_arch:
1037            ignore_keys = ignored_keys_for_arch[arch]
1038
1039        while len(auxv_data) > 0:
1040            # Chop off key.
1041            raw_key = auxv_data[:word_size]
1042            auxv_data = auxv_data[word_size:]
1043
1044            # Chop of value.
1045            raw_value = auxv_data[:word_size]
1046            auxv_data = auxv_data[word_size:]
1047
1048            # Convert raw text from target endian.
1049            key = unpack_endian_binary_string(endian, raw_key)
1050            value = unpack_endian_binary_string(endian, raw_value)
1051
1052            if ignore_keys and key in ignore_keys:
1053                continue
1054
1055            # Handle ending entry.
1056            if key == 0:
1057                self.assertEqual(value, 0)
1058                return auxv_dict
1059
1060            # The key should not already be present.
1061            self.assertFalse(key in auxv_dict)
1062            auxv_dict[key] = value
1063
1064        self.fail(
1065            "should not reach here - implies required double zero entry not found")
1066        return auxv_dict
1067
1068    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
1069        """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1070        offset = 0
1071        done = False
1072        decoded_data = ""
1073
1074        while not done:
1075            # Grab the next iteration of data.
1076            self.reset_test_sequence()
1077            self.test_sequence.add_log_lines(
1078                [
1079                    "read packet: ${}{:x},{:x}:#00".format(
1080                        command_prefix,
1081                        offset,
1082                        chunk_length),
1083                    {
1084                        "direction": "send",
1085                        "regex": re.compile(
1086                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1087                            re.MULTILINE | re.DOTALL),
1088                        "capture": {
1089                            1: "response_type",
1090                            2: "content_raw"}}],
1091                True)
1092
1093            context = self.expect_gdbremote_sequence()
1094            self.assertIsNotNone(context)
1095
1096            response_type = context.get("response_type")
1097            self.assertIsNotNone(response_type)
1098            self.assertTrue(response_type in ["l", "m"])
1099
1100            # Move offset along.
1101            offset += chunk_length
1102
1103            # Figure out if we're done.  We're done if the response type is l.
1104            done = response_type == "l"
1105
1106            # Decode binary data.
1107            content_raw = context.get("content_raw")
1108            if content_raw and len(content_raw) > 0:
1109                self.assertIsNotNone(content_raw)
1110                decoded_data += self.decode_gdbremote_binary(content_raw)
1111        return decoded_data
1112
1113    def add_interrupt_packets(self):
1114        self.test_sequence.add_log_lines([
1115            # Send the intterupt.
1116            "read packet: {}".format(chr(3)),
1117            # And wait for the stop notification.
1118            {"direction": "send",
1119             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1120             "capture": {1: "stop_signo",
1121                         2: "stop_key_val_text"}},
1122        ], True)
1123
1124    def parse_interrupt_packets(self, context):
1125        self.assertIsNotNone(context.get("stop_signo"))
1126        self.assertIsNotNone(context.get("stop_key_val_text"))
1127        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1128            context["stop_key_val_text"]))
1129
1130    def add_QSaveRegisterState_packets(self, thread_id):
1131        if thread_id:
1132            # Use the thread suffix form.
1133            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1134                thread_id)
1135        else:
1136            request = "read packet: $QSaveRegisterState#00"
1137
1138        self.test_sequence.add_log_lines([request,
1139                                          {"direction": "send",
1140                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1141                                           "capture": {1: "save_response"}},
1142                                          ],
1143                                         True)
1144
1145    def parse_QSaveRegisterState_response(self, context):
1146        self.assertIsNotNone(context)
1147
1148        save_response = context.get("save_response")
1149        self.assertIsNotNone(save_response)
1150
1151        if len(save_response) < 1 or save_response[0] == "E":
1152            # error received
1153            return (False, None)
1154        else:
1155            return (True, int(save_response))
1156
1157    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1158        if thread_id:
1159            # Use the thread suffix form.
1160            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1161                save_id, thread_id)
1162        else:
1163            request = "read packet: $QRestoreRegisterState:{}#00".format(
1164                save_id)
1165
1166        self.test_sequence.add_log_lines([
1167            request,
1168            "send packet: $OK#00"
1169        ], True)
1170
    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        """Read each register, write back its bitwise complement, and verify.

        For every entry in reg_infos: read the current value with 'p', xor
        it with an all-ones mask of the register's bit width, write the
        flipped value back with 'P', and - when the write reports OK - read
        it again to confirm the flipped value stuck.

        Args:
            reg_infos: register info dicts; each must provide
                "lldb_register_index" and "bitsize".
            endian: target endianness passed to the register pack/unpack
                helpers.
            thread_id: optional thread id; when set, the p/P packets carry
                a ";thread:{id}" suffix.

        Returns:
            (successful_writes, failed_writes) tuple of counts.  A write
            that reports OK but does not read back exactly is moved from
            the success count to the failure count.
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1270
1271    def is_bit_flippable_register(self, reg_info):
1272        if not reg_info:
1273            return False
1274        if not "set" in reg_info:
1275            return False
1276        if reg_info["set"] != "General Purpose Registers":
1277            return False
1278        if ("container-regs" in reg_info) and (
1279                len(reg_info["container-regs"]) > 0):
1280            # Don't try to bit flip registers contained in another register.
1281            return False
1282        if re.match("^.s$", reg_info["name"]):
1283            # This is a 2-letter register name that ends in "s", like a segment register.
1284            # Don't try to bit flip these.
1285            return False
1286        if re.match("^(c|)psr$", reg_info["name"]):
1287            # This is an ARM program status register; don't flip it.
1288            return False
1289        # Okay, this looks fine-enough.
1290        return True
1291
1292    def read_register_values(self, reg_infos, endian, thread_id=None):
1293        self.assertIsNotNone(reg_infos)
1294        values = {}
1295
1296        for reg_info in reg_infos:
1297            # We append a register index when load reg infos so we can work
1298            # with subsets.
1299            reg_index = reg_info.get("lldb_register_index")
1300            self.assertIsNotNone(reg_index)
1301
1302            # Handle thread suffix.
1303            if thread_id:
1304                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1305                    reg_index, thread_id)
1306            else:
1307                p_request = "read packet: $p{:x}#00".format(reg_index)
1308
1309            # Read it with p.
1310            self.reset_test_sequence()
1311            self.test_sequence.add_log_lines([
1312                p_request,
1313                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1314            ], True)
1315            context = self.expect_gdbremote_sequence()
1316            self.assertIsNotNone(context)
1317
1318            # Convert value from target endian to integral.
1319            p_response = context.get("p_response")
1320            self.assertIsNotNone(p_response)
1321            self.assertTrue(len(p_response) > 0)
1322            self.assertFalse(p_response[0] == "E")
1323
1324            values[reg_index] = unpack_register_hex_unsigned(
1325                endian, p_response)
1326
1327        return values
1328
1329    def add_vCont_query_packets(self):
1330        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1331                                          {"direction": "send",
1332                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1333                                           "capture": {2: "vCont_query_response"}},
1334                                          ],
1335                                         True)
1336
1337    def parse_vCont_query_response(self, context):
1338        self.assertIsNotNone(context)
1339        vCont_query_response = context.get("vCont_query_response")
1340
1341        # Handle case of no vCont support at all - in which case the capture
1342        # group will be none or zero length.
1343        if not vCont_query_response or len(vCont_query_response) == 0:
1344            return {}
1345
1346        return {key: 1 for key in vCont_query_response.split(
1347            ";") if key and len(key) > 0}
1348
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step thread_id until predicate(args) is true or the step
        budget is exhausted.

        Used by single step test that appears in a few different contexts.

        Args:
            thread_id: thread to step; substituted for "{thread}" in
                step_instruction and used for the Hc packet.
            predicate: callable evaluated after each step; stepping stops
                once it returns true.
            args: opaque argument forwarded to predicate.
            max_step_count: maximum number of single steps to attempt.
            use_Hc_packet: when True, select the continue thread with an Hc
                packet before each step.
            step_instruction: step packet payload; any "{thread}" marker is
                replaced with the hex thread id.

        Returns:
            (predicate_became_true, steps_taken) tuple.  Each step is
            expected to stop with SIGTRAP.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Each step must report a SIGTRAP stop.
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1401
1402    def g_c1_c2_contents_are(self, args):
1403        """Used by single step test that appears in a few different contexts."""
1404        g_c1_address = args["g_c1_address"]
1405        g_c2_address = args["g_c2_address"]
1406        expected_g_c1 = args["expected_g_c1"]
1407        expected_g_c2 = args["expected_g_c2"]
1408
1409        # Read g_c1 and g_c2 contents.
1410        self.reset_test_sequence()
1411        self.test_sequence.add_log_lines(
1412            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1413             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
1414             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1415             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
1416            True)
1417
1418        # Run the packet stream.
1419        context = self.expect_gdbremote_sequence()
1420        self.assertIsNotNone(context)
1421
1422        # Check if what we read from inferior memory is what we are expecting.
1423        self.assertIsNotNone(context.get("g_c1_contents"))
1424        self.assertIsNotNone(context.get("g_c2_contents"))
1425
1426        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
1427            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1428
1429    def single_step_only_steps_one_instruction(
1430            self, use_Hc_packet=True, step_instruction="s"):
1431        """Used by single step test that appears in a few different contexts."""
1432        # Start up the inferior.
1433        procs = self.prep_debug_monitor_and_inferior(
1434            inferior_args=[
1435                "get-code-address-hex:swap_chars",
1436                "get-data-address-hex:g_c1",
1437                "get-data-address-hex:g_c2",
1438                "sleep:1",
1439                "call-function:swap_chars",
1440                "sleep:5"])
1441
1442        # Run the process
1443        self.test_sequence.add_log_lines(
1444            [  # Start running after initial stop.
1445                "read packet: $c#63",
1446                # Match output line that prints the memory address of the function call entry point.
1447                # Note we require launch-only testing so we can get inferior otuput.
1448                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1449                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1450                # Now stop the inferior.
1451                "read packet: {}".format(chr(3)),
1452                # And wait for the stop notification.
1453                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1454            True)
1455
1456        # Run the packet stream.
1457        context = self.expect_gdbremote_sequence()
1458        self.assertIsNotNone(context)
1459
1460        # Grab the main thread id.
1461        self.assertIsNotNone(context.get("stop_thread_id"))
1462        main_thread_id = int(context.get("stop_thread_id"), 16)
1463
1464        # Grab the function address.
1465        self.assertIsNotNone(context.get("function_address"))
1466        function_address = int(context.get("function_address"), 16)
1467
1468        # Grab the data addresses.
1469        self.assertIsNotNone(context.get("g_c1_address"))
1470        g_c1_address = int(context.get("g_c1_address"), 16)
1471
1472        self.assertIsNotNone(context.get("g_c2_address"))
1473        g_c2_address = int(context.get("g_c2_address"), 16)
1474
1475        # Set a breakpoint at the given address.
1476        if self.getArchitecture().startswith("arm"):
1477            # TODO: Handle case when setting breakpoint in thumb code
1478            BREAKPOINT_KIND = 4
1479        else:
1480            BREAKPOINT_KIND = 1
1481        self.reset_test_sequence()
1482        self.add_set_breakpoint_packets(
1483            function_address,
1484            do_continue=True,
1485            breakpoint_kind=BREAKPOINT_KIND)
1486        context = self.expect_gdbremote_sequence()
1487        self.assertIsNotNone(context)
1488
1489        # Remove the breakpoint.
1490        self.reset_test_sequence()
1491        self.add_remove_breakpoint_packets(
1492            function_address, breakpoint_kind=BREAKPOINT_KIND)
1493        context = self.expect_gdbremote_sequence()
1494        self.assertIsNotNone(context)
1495
1496        # Verify g_c1 and g_c2 match expected initial state.
1497        args = {}
1498        args["g_c1_address"] = g_c1_address
1499        args["g_c2_address"] = g_c2_address
1500        args["expected_g_c1"] = "0"
1501        args["expected_g_c2"] = "1"
1502
1503        self.assertTrue(self.g_c1_c2_contents_are(args))
1504
1505        # Verify we take only a small number of steps to hit the first state.
1506        # Might need to work through function entry prologue code.
1507        args["expected_g_c1"] = "1"
1508        args["expected_g_c2"] = "1"
1509        (state_reached,
1510         step_count) = self.count_single_steps_until_true(main_thread_id,
1511                                                          self.g_c1_c2_contents_are,
1512                                                          args,
1513                                                          max_step_count=25,
1514                                                          use_Hc_packet=use_Hc_packet,
1515                                                          step_instruction=step_instruction)
1516        self.assertTrue(state_reached)
1517
1518        # Verify we hit the next state.
1519        args["expected_g_c1"] = "1"
1520        args["expected_g_c2"] = "0"
1521        (state_reached,
1522         step_count) = self.count_single_steps_until_true(main_thread_id,
1523                                                          self.g_c1_c2_contents_are,
1524                                                          args,
1525                                                          max_step_count=5,
1526                                                          use_Hc_packet=use_Hc_packet,
1527                                                          step_instruction=step_instruction)
1528        self.assertTrue(state_reached)
1529        expected_step_count = 1
1530        arch = self.getArchitecture()
1531
1532        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1533        # of variable value
1534        if re.match("mips", arch):
1535            expected_step_count = 3
1536        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1537        # variable value
1538        if re.match("s390x", arch):
1539            expected_step_count = 2
1540        # ARM64 requires "4" instructions: 2 to compute the address (adrp, add),
1541        # one to materialize the constant (mov) and the store
1542        if re.match("arm64", arch):
1543            expected_step_count = 4
1544
1545        self.assertEqual(step_count, expected_step_count)
1546
1547        # ARM64: Once addresses and constants are materialized, only one
1548        # instruction is needed.
1549        if re.match("arm64", arch):
1550            expected_step_count = 1
1551
1552        # Verify we hit the next state.
1553        args["expected_g_c1"] = "0"
1554        args["expected_g_c2"] = "0"
1555        (state_reached,
1556         step_count) = self.count_single_steps_until_true(main_thread_id,
1557                                                          self.g_c1_c2_contents_are,
1558                                                          args,
1559                                                          max_step_count=5,
1560                                                          use_Hc_packet=use_Hc_packet,
1561                                                          step_instruction=step_instruction)
1562        self.assertTrue(state_reached)
1563        self.assertEqual(step_count, expected_step_count)
1564
1565        # Verify we hit the next state.
1566        args["expected_g_c1"] = "0"
1567        args["expected_g_c2"] = "1"
1568        (state_reached,
1569         step_count) = self.count_single_steps_until_true(main_thread_id,
1570                                                          self.g_c1_c2_contents_are,
1571                                                          args,
1572                                                          max_step_count=5,
1573                                                          use_Hc_packet=use_Hc_packet,
1574                                                          step_instruction=step_instruction)
1575        self.assertTrue(state_reached)
1576        self.assertEqual(step_count, expected_step_count)
1577
1578    def maybe_strict_output_regex(self, regex):
1579        return '.*' + regex + \
1580            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1581
1582    def install_and_create_launch_args(self):
1583        exe_path = self.getBuildArtifact("a.out")
1584        if not lldb.remote_platform:
1585            return [exe_path]
1586        remote_path = lldbutil.append_to_process_working_directory(self,
1587            os.path.basename(exe_path))
1588        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1589        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1590                                           remote_file_spec)
1591        if err.Fail():
1592            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1593                            (exe_path, remote_path, err))
1594        return [remote_path]
1595