1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
26class _ConnectionRefused(IOError):
27    pass
28
29
30class GdbRemoteTestCaseBase(TestBase):
31
    # NOTE(review): presumably read by the lldbsuite TestBase machinery to avoid
    # running one variant per debug-info format -- confirm against lldbtest.
    NO_DEBUG_INFO_TESTCASE = True

    # Timeout (seconds) used by expect_gdbremote_sequence() when the caller
    # does not pass one.
    _TIMEOUT_SECONDS = 120

    # gdb-remote 'k' packet; sent by the socket teardown hook registered in
    # create_socket().
    _GDBREMOTE_KILL_PACKET = "$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # File handler for the detailed host-side log; installed by setUp() only
    # when verbose logging is requested and cleared again in tearDown().
    _verbose_log_handler = None
    # Formatter shared by the stderr handler and the verbose file handler.
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
60
61    def setUpBaseLogging(self):
62        self.logger = logging.getLogger(__name__)
63
64        if len(self.logger.handlers) > 0:
65            return  # We have set up this handler already
66
67        self.logger.propagate = False
68        self.logger.setLevel(logging.DEBUG)
69
70        # log all warnings to stderr
71        handler = logging.StreamHandler()
72        handler.setLevel(logging.WARNING)
73        handler.setFormatter(self._log_formatter)
74        self.logger.addHandler(handler)
75
76    def isVerboseLoggingRequested(self):
77        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
78        # logged.
79        return any(("gdb-remote" in channel)
80                   for channel in lldbtest_config.channels)
81
82    def setUp(self):
83        TestBase.setUp(self)
84
85        self.setUpBaseLogging()
86        self.debug_monitor_extra_args = []
87        self._pump_queues = socket_packet_pump.PumpQueues()
88
89        if self.isVerboseLoggingRequested():
90            # If requested, full logs go to a log file
91            self._verbose_log_handler = logging.FileHandler(
92                self.log_basename + "-host.log")
93            self._verbose_log_handler.setFormatter(self._log_formatter)
94            self._verbose_log_handler.setLevel(logging.DEBUG)
95            self.logger.addHandler(self._verbose_log_handler)
96
97        self.test_sequence = GdbRemoteTestSequence(self.logger)
98        self.set_inferior_startup_launch()
99        self.port = self.get_next_port()
100        self.named_pipe_path = None
101        self.named_pipe = None
102        self.named_pipe_fd = None
103        self.stub_sends_two_stop_notifications_on_kill = False
104        if configuration.lldb_platform_url:
105            if configuration.lldb_platform_url.startswith('unix-'):
106                url_pattern = '(.+)://\[?(.+?)\]?/.*'
107            else:
108                url_pattern = '(.+)://(.+):\d+'
109            scheme, host = re.match(
110                url_pattern, configuration.lldb_platform_url).groups()
111            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
112                self.stub_device = host
113                self.stub_hostname = 'localhost'
114            else:
115                self.stub_device = None
116                self.stub_hostname = host
117        else:
118            self.stub_hostname = "localhost"
119
120    def tearDown(self):
121        self._pump_queues.verify_queues_empty()
122
123        self.logger.removeHandler(self._verbose_log_handler)
124        self._verbose_log_handler = None
125        TestBase.tearDown(self)
126
127    def getLocalServerLogFile(self):
128        return self.log_basename + "-server.log"
129
130    def setUpServerLogging(self, is_llgs):
131        if len(lldbtest_config.channels) == 0:
132            return  # No logging requested
133
134        if lldb.remote_platform:
135            log_file = lldbutil.join_remote_paths(
136                lldb.remote_platform.GetWorkingDirectory(), "server.log")
137        else:
138            log_file = self.getLocalServerLogFile()
139
140        if is_llgs:
141            self.debug_monitor_extra_args.append("--log-file=" + log_file)
142            self.debug_monitor_extra_args.append(
143                "--log-channels={}".format(":".join(lldbtest_config.channels)))
144        else:
145            self.debug_monitor_extra_args = [
146                "--log-file=" + log_file, "--log-flags=0x800000"]
147
148    def get_next_port(self):
149        return 12000 + random.randint(0, 3999)
150
151    def reset_test_sequence(self):
152        self.test_sequence = GdbRemoteTestSequence(self.logger)
153
154    def create_named_pipe(self):
155        # Create a temp dir and name for a pipe.
156        temp_dir = tempfile.mkdtemp()
157        named_pipe_path = os.path.join(temp_dir, "stub_port_number")
158
159        # Create the named pipe.
160        os.mkfifo(named_pipe_path)
161
162        # Open the read side of the pipe in non-blocking mode.  This will
163        # return right away, ready or not.
164        named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
165
166        # Create the file for the named pipe.  Note this will follow semantics of
167        # a non-blocking read side of a named pipe, which has different semantics
168        # than a named pipe opened for read in non-blocking mode.
169        named_pipe = os.fdopen(named_pipe_fd, "r")
170        self.assertIsNotNone(named_pipe)
171
172        def shutdown_named_pipe():
173            # Close the pipe.
174            try:
175                named_pipe.close()
176            except:
177                print("failed to close named pipe")
178                None
179
180            # Delete the pipe.
181            try:
182                os.remove(named_pipe_path)
183            except:
184                print("failed to delete named pipe: {}".format(named_pipe_path))
185                None
186
187            # Delete the temp directory.
188            try:
189                os.rmdir(temp_dir)
190            except:
191                print(
192                    "failed to delete temp dir: {}, directory contents: '{}'".format(
193                        temp_dir, os.listdir(temp_dir)))
194                None
195
196        # Add the shutdown hook to clean up the named pipe.
197        self.addTearDownHook(shutdown_named_pipe)
198
199        # Clear the port so the stub selects a port number.
200        self.port = 0
201
202        return (named_pipe_path, named_pipe, named_pipe_fd)
203
204    def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
205        # Wait for something to read with a max timeout.
206        (ready_readers, _, _) = select.select(
207            [self.named_pipe_fd], [], [], read_timeout_seconds)
208        self.assertIsNotNone(
209            ready_readers,
210            "write side of pipe has not written anything - stub isn't writing to pipe.")
211        self.assertNotEqual(
212            len(ready_readers),
213            0,
214            "write side of pipe has not written anything - stub isn't writing to pipe.")
215
216        # Read the port from the named pipe.
217        stub_port_raw = self.named_pipe.read()
218        self.assertIsNotNone(stub_port_raw)
219        self.assertNotEqual(
220            len(stub_port_raw),
221            0,
222            "no content to read on pipe")
223
224        # Trim null byte, convert to int.
225        stub_port_raw = stub_port_raw[:-1]
226        stub_port = int(stub_port_raw)
227        self.assertTrue(stub_port > 0)
228
229        return stub_port
230
231    def init_llgs_test(self, use_named_pipe=True):
232        if lldb.remote_platform:
233            # Remote platforms don't support named pipe based port negotiation
234            use_named_pipe = False
235
236            triple = self.dbg.GetSelectedPlatform().GetTriple()
237            if re.match(".*-.*-windows", triple):
238                self.skipTest("Remotely testing is not supported on Windows yet.")
239
240            # Grab the ppid from /proc/[shell pid]/stat
241            err, retcode, shell_stat = self.run_platform_command(
242                "cat /proc/$$/stat")
243            self.assertTrue(
244                err.Success() and retcode == 0,
245                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
246                (err.GetCString(),
247                 retcode))
248
249            # [pid] ([executable]) [state] [*ppid*]
250            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
251            err, retcode, ls_output = self.run_platform_command(
252                "ls -l /proc/%s/exe" % pid)
253            self.assertTrue(
254                err.Success() and retcode == 0,
255                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
256                (pid,
257                 err.GetCString(),
258                 retcode))
259            exe = ls_output.split()[-1]
260
261            # If the binary has been deleted, the link name has " (deleted)" appended.
262            # Remove if it's there.
263            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
264        else:
265            # Need to figure out how to create a named pipe on Windows.
266            if platform.system() == 'Windows':
267                use_named_pipe = False
268
269            self.debug_monitor_exe = get_lldb_server_exe()
270            if not self.debug_monitor_exe:
271                self.skipTest("lldb-server exe not found")
272
273        self.debug_monitor_extra_args = ["gdbserver"]
274        self.setUpServerLogging(is_llgs=True)
275
276        if use_named_pipe:
277            (self.named_pipe_path, self.named_pipe,
278             self.named_pipe_fd) = self.create_named_pipe()
279
280    def init_debugserver_test(self, use_named_pipe=True):
281        self.debug_monitor_exe = get_debugserver_exe()
282        if not self.debug_monitor_exe:
283            self.skipTest("debugserver exe not found")
284        self.setUpServerLogging(is_llgs=False)
285        if use_named_pipe:
286            (self.named_pipe_path, self.named_pipe,
287             self.named_pipe_fd) = self.create_named_pipe()
288        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
289        # when the process truly dies.
290        self.stub_sends_two_stop_notifications_on_kill = True
291
292    def forward_adb_port(self, source, target, direction, device):
293        adb = ['adb'] + (['-s', device] if device else []) + [direction]
294
295        def remove_port_forward():
296            subprocess.call(adb + ["--remove", "tcp:%d" % source])
297
298        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
299        self.addTearDownHook(remove_port_forward)
300
301    def _verify_socket(self, sock):
302        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
303        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
304        # the connect() will always be successful, but the connection will be immediately dropped
305        # if ADB could not connect on the remote side. This function tries to detect this
306        # situation, and report it as "connection refused" so that the upper layers attempt the
307        # connection again.
308        triple = self.dbg.GetSelectedPlatform().GetTriple()
309        if not re.match(".*-.*-.*-android", triple):
310            return  # Not android.
311        can_read, _, _ = select.select([sock], [], [], 0.1)
312        if sock not in can_read:
313            return  # Data is not available, but the connection is alive.
314        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
315            raise _ConnectionRefused()  # Got EOF, connection dropped.
316
317    def create_socket(self):
318        sock = socket.socket()
319        logger = self.logger
320
321        triple = self.dbg.GetSelectedPlatform().GetTriple()
322        if re.match(".*-.*-.*-android", triple):
323            self.forward_adb_port(
324                self.port,
325                self.port,
326                "forward",
327                self.stub_device)
328
329        logger.info(
330            "Connecting to debug monitor on %s:%d",
331            self.stub_hostname,
332            self.port)
333        connect_info = (self.stub_hostname, self.port)
334        try:
335            sock.connect(connect_info)
336        except socket.error as serr:
337            if serr.errno == errno.ECONNREFUSED:
338                raise _ConnectionRefused()
339            raise serr
340
341        def shutdown_socket():
342            if sock:
343                try:
344                    # send the kill packet so lldb-server shuts down gracefully
345                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
346                except:
347                    logger.warning(
348                        "failed to send kill packet to debug monitor: {}; ignoring".format(
349                            sys.exc_info()[0]))
350
351                try:
352                    sock.close()
353                except:
354                    logger.warning(
355                        "failed to close socket to debug monitor: {}; ignoring".format(
356                            sys.exc_info()[0]))
357
358        self.addTearDownHook(shutdown_socket)
359
360        self._verify_socket(sock)
361
362        return sock
363
364    def set_inferior_startup_launch(self):
365        self._inferior_startup = self._STARTUP_LAUNCH
366
367    def set_inferior_startup_attach(self):
368        self._inferior_startup = self._STARTUP_ATTACH
369
370    def set_inferior_startup_attach_manually(self):
371        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
372
373    def get_debug_monitor_command_line_args(self, attach_pid=None):
374        if lldb.remote_platform:
375            commandline_args = self.debug_monitor_extra_args + \
376                ["*:{}".format(self.port)]
377        else:
378            commandline_args = self.debug_monitor_extra_args + \
379                ["127.0.0.1:{}".format(self.port)]
380
381        if attach_pid:
382            commandline_args += ["--attach=%d" % attach_pid]
383        if self.named_pipe_path:
384            commandline_args += ["--named-pipe", self.named_pipe_path]
385        return commandline_args
386
387    def get_target_byte_order(self):
388        inferior_exe_path = self.getBuildArtifact("a.out")
389        target = self.dbg.CreateTarget(inferior_exe_path)
390        return target.GetByteOrder()
391
392    def launch_debug_monitor(self, attach_pid=None, logfile=None):
393        # Create the command line.
394        commandline_args = self.get_debug_monitor_command_line_args(
395            attach_pid=attach_pid)
396
397        # Start the server.
398        server = self.spawnSubprocess(
399            self.debug_monitor_exe,
400            commandline_args,
401            install_remote=False)
402        self.addTearDownHook(self.cleanupSubprocesses)
403        self.assertIsNotNone(server)
404
405        # If we're receiving the stub's listening port from the named pipe, do
406        # that here.
407        if self.named_pipe:
408            self.port = self.get_stub_port_from_named_socket()
409
410        return server
411
412    def connect_to_debug_monitor(self, attach_pid=None):
413        if self.named_pipe:
414            # Create the stub.
415            server = self.launch_debug_monitor(attach_pid=attach_pid)
416            self.assertIsNotNone(server)
417
418            def shutdown_debug_monitor():
419                try:
420                    server.terminate()
421                except:
422                    logger.warning(
423                        "failed to terminate server for debug monitor: {}; ignoring".format(
424                            sys.exc_info()[0]))
425            self.addTearDownHook(shutdown_debug_monitor)
426
427            # Schedule debug monitor to be shut down during teardown.
428            logger = self.logger
429
430            # Attach to the stub and return a socket opened to it.
431            self.sock = self.create_socket()
432            return server
433
434        # We're using a random port algorithm to try not to collide with other ports,
435        # and retry a max # times.
436        attempts = 0
437        MAX_ATTEMPTS = 20
438
439        while attempts < MAX_ATTEMPTS:
440            server = self.launch_debug_monitor(attach_pid=attach_pid)
441
442            # Schedule debug monitor to be shut down during teardown.
443            logger = self.logger
444
445            def shutdown_debug_monitor():
446                try:
447                    server.terminate()
448                except:
449                    logger.warning(
450                        "failed to terminate server for debug monitor: {}; ignoring".format(
451                            sys.exc_info()[0]))
452            self.addTearDownHook(shutdown_debug_monitor)
453
454            connect_attemps = 0
455            MAX_CONNECT_ATTEMPTS = 10
456
457            while connect_attemps < MAX_CONNECT_ATTEMPTS:
458                # Create a socket to talk to the server
459                try:
460                    logger.info("Connect attempt %d", connect_attemps + 1)
461                    self.sock = self.create_socket()
462                    return server
463                except _ConnectionRefused as serr:
464                    # Ignore, and try again.
465                    pass
466                time.sleep(0.5)
467                connect_attemps += 1
468
469            # We should close the server here to be safe.
470            server.terminate()
471
472            # Increment attempts.
473            print(
474                "connect to debug monitor on port %d failed, attempt #%d of %d" %
475                (self.port, attempts + 1, MAX_ATTEMPTS))
476            attempts += 1
477
478            # And wait a random length of time before next attempt, to avoid
479            # collisions.
480            time.sleep(random.randint(1, 5))
481
482            # Now grab a new port number.
483            self.port = self.get_next_port()
484
485        raise Exception(
486            "failed to create a socket to the launched debug monitor after %d tries" %
487            attempts)
488
489    def launch_process_for_attach(
490            self,
491            inferior_args=None,
492            sleep_seconds=3,
493            exe_path=None):
494        # We're going to start a child process that the debug monitor stub can later attach to.
495        # This process needs to be started so that it just hangs around for a while.  We'll
496        # have it sleep.
497        if not exe_path:
498            exe_path = self.getBuildArtifact("a.out")
499
500        args = []
501        if inferior_args:
502            args.extend(inferior_args)
503        if sleep_seconds:
504            args.append("sleep:%d" % sleep_seconds)
505
506        inferior = self.spawnSubprocess(exe_path, args)
507
508        def shutdown_process_for_attach():
509            try:
510                inferior.terminate()
511            except:
512                logger.warning(
513                    "failed to terminate inferior process for attach: {}; ignoring".format(
514                        sys.exc_info()[0]))
515        self.addTearDownHook(shutdown_process_for_attach)
516        return inferior
517
518    def prep_debug_monitor_and_inferior(
519            self,
520            inferior_args=None,
521            inferior_sleep_seconds=3,
522            inferior_exe_path=None,
523            inferior_env=None):
524        """Prep the debug monitor, the inferior, and the expected packet stream.
525
526        Handle the separate cases of using the debug monitor in attach-to-inferior mode
527        and in launch-inferior mode.
528
529        For attach-to-inferior mode, the inferior process is first started, then
530        the debug monitor is started in attach to pid mode (using --attach on the
531        stub command line), and the no-ack-mode setup is appended to the packet
532        stream.  The packet stream is not yet executed, ready to have more expected
533        packet entries added to it.
534
535        For launch-inferior mode, the stub is first started, then no ack mode is
536        setup on the expected packet stream, then the verified launch packets are added
537        to the expected socket stream.  The packet stream is not yet executed, ready
538        to have more expected packet entries added to it.
539
540        The return value is:
541        {inferior:<inferior>, server:<server>}
542        """
543        inferior = None
544        attach_pid = None
545
546        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
547            # Launch the process that we'll use as the inferior.
548            inferior = self.launch_process_for_attach(
549                inferior_args=inferior_args,
550                sleep_seconds=inferior_sleep_seconds,
551                exe_path=inferior_exe_path)
552            self.assertIsNotNone(inferior)
553            self.assertTrue(inferior.pid > 0)
554            if self._inferior_startup == self._STARTUP_ATTACH:
555                # In this case, we want the stub to attach via the command
556                # line, so set the command line attach pid here.
557                attach_pid = inferior.pid
558
559        if self._inferior_startup == self._STARTUP_LAUNCH:
560            # Build launch args
561            if not inferior_exe_path:
562                inferior_exe_path = self.getBuildArtifact("a.out")
563
564            if lldb.remote_platform:
565                remote_path = lldbutil.append_to_process_working_directory(self,
566                    os.path.basename(inferior_exe_path))
567                remote_file_spec = lldb.SBFileSpec(remote_path, False)
568                err = lldb.remote_platform.Install(lldb.SBFileSpec(
569                    inferior_exe_path, True), remote_file_spec)
570                if err.Fail():
571                    raise Exception(
572                        "remote_platform.Install('%s', '%s') failed: %s" %
573                        (inferior_exe_path, remote_path, err))
574                inferior_exe_path = remote_path
575
576            launch_args = [inferior_exe_path]
577            if inferior_args:
578                launch_args.extend(inferior_args)
579
580        # Launch the debug monitor stub, attaching to the inferior.
581        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
582        self.assertIsNotNone(server)
583
584        # Build the expected protocol stream
585        self.add_no_ack_remote_stream()
586        if inferior_env:
587            for name, value in inferior_env.items():
588                self.add_set_environment_packets(name, value)
589        if self._inferior_startup == self._STARTUP_LAUNCH:
590            self.add_verified_launch_packets(launch_args)
591
592        return {"inferior": inferior, "server": server}
593
594    def expect_socket_recv(
595            self,
596            sock,
597            expected_content_regex,
598            timeout_seconds):
599        response = ""
600        timeout_time = time.time() + timeout_seconds
601
602        while not expected_content_regex.match(
603                response) and time.time() < timeout_time:
604            can_read, _, _ = select.select([sock], [], [], timeout_seconds)
605            if can_read and sock in can_read:
606                recv_bytes = sock.recv(4096)
607                if recv_bytes:
608                    response += seven.bitcast_to_string(recv_bytes)
609
610        self.assertTrue(expected_content_regex.match(response))
611
612    def expect_socket_send(self, sock, content, timeout_seconds):
613        request_bytes_remaining = content
614        timeout_time = time.time() + timeout_seconds
615
616        while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
617            _, can_write, _ = select.select([], [sock], [], timeout_seconds)
618            if can_write and sock in can_write:
619                written_byte_count = sock.send(request_bytes_remaining.encode())
620                request_bytes_remaining = request_bytes_remaining[
621                    written_byte_count:]
622        self.assertEqual(len(request_bytes_remaining), 0)
623
624    def do_handshake(self, stub_socket, timeout_seconds=5):
625        # Write the ack.
626        self.expect_socket_send(stub_socket, "+", timeout_seconds)
627
628        # Send the start no ack mode packet.
629        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
630        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode())
631        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
632
633        # Receive the ack and "OK"
634        self.expect_socket_recv(stub_socket, re.compile(
635            r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
636
637        # Send the final ack.
638        self.expect_socket_send(stub_socket, "+", timeout_seconds)
639
640    def add_no_ack_remote_stream(self):
641        self.test_sequence.add_log_lines(
642            ["read packet: +",
643             "read packet: $QStartNoAckMode#b0",
644             "send packet: +",
645             "send packet: $OK#9a",
646             "read packet: +"],
647            True)
648
649    def add_verified_launch_packets(self, launch_args):
650        self.test_sequence.add_log_lines(
651            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
652             "send packet: $OK#00",
653             "read packet: $qLaunchSuccess#a5",
654             "send packet: $OK#00"],
655            True)
656
657    def add_thread_suffix_request_packets(self):
658        self.test_sequence.add_log_lines(
659            ["read packet: $QThreadSuffixSupported#e4",
660             "send packet: $OK#00",
661             ], True)
662
663    def add_process_info_collection_packets(self):
664        self.test_sequence.add_log_lines(
665            ["read packet: $qProcessInfo#dc",
666             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
667            True)
668
669    def add_set_environment_packets(self, name, value):
670        self.test_sequence.add_log_lines(
671            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
672             "send packet: $OK#00",
673             ], True)
674
    # Keys that parse_process_info_response() accepts in a qProcessInfo reply;
    # any other key fails the test there.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
691
692    def parse_process_info_response(self, context):
693        # Ensure we have a process info response.
694        self.assertIsNotNone(context)
695        process_info_raw = context.get("process_info_raw")
696        self.assertIsNotNone(process_info_raw)
697
698        # Pull out key:value; pairs.
699        process_info_dict = {
700            match.group(1): match.group(2) for match in re.finditer(
701                r"([^:]+):([^;]+);", process_info_raw)}
702
703        # Validate keys are known.
704        for (key, val) in list(process_info_dict.items()):
705            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
706            self.assertIsNotNone(val)
707
708        return process_info_dict
709
710    def add_register_info_collection_packets(self):
711        self.test_sequence.add_log_lines(
712            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
713                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
714                "save_key": "reg_info_responses"}],
715            True)
716
717    def parse_register_info_packets(self, context):
718        """Return an array of register info dictionaries, one per register info."""
719        reg_info_responses = context.get("reg_info_responses")
720        self.assertIsNotNone(reg_info_responses)
721
722        # Parse register infos.
723        return [parse_reg_info_response(reg_info_response)
724                for reg_info_response in reg_info_responses]
725
726    def expect_gdbremote_sequence(self, timeout_seconds=None):
727        if not timeout_seconds:
728            timeout_seconds = self._TIMEOUT_SECONDS
729        return expect_lldb_gdbserver_replay(
730            self,
731            self.sock,
732            self.test_sequence,
733            self._pump_queues,
734            timeout_seconds,
735            self.logger)
736
    # Keys that assert_valid_reg_info() accepts in a parsed register info
    # dictionary; any other key fails the test there.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
754
755    def assert_valid_reg_info(self, reg_info):
756        # Assert we know about all the reginfo keys parsed.
757        for key in reg_info:
758            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
759
760        # Check the bare-minimum expected set of register info keys.
761        self.assertTrue("name" in reg_info)
762        self.assertTrue("bitsize" in reg_info)
763        self.assertTrue("offset" in reg_info)
764        self.assertTrue("encoding" in reg_info)
765        self.assertTrue("format" in reg_info)
766
767    def find_pc_reg_info(self, reg_infos):
768        lldb_reg_index = 0
769        for reg_info in reg_infos:
770            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
771                return (lldb_reg_index, reg_info)
772            lldb_reg_index += 1
773
774        return (None, None)
775
776    def add_lldb_register_index(self, reg_infos):
777        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
778
779        We'll use this when we want to call packets like P/p with a register index but do so
780        on only a subset of the full register info set.
781        """
782        self.assertIsNotNone(reg_infos)
783
784        reg_index = 0
785        for reg_info in reg_infos:
786            reg_info["lldb_register_index"] = reg_index
787            reg_index += 1
788
789    def add_query_memory_region_packets(self, address):
790        self.test_sequence.add_log_lines(
791            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
792             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
793            True)
794
795    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
796        self.assertIsNotNone(key_val_text)
797        kv_dict = {}
798        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
799            key = match.group(1)
800            val = match.group(2)
801            if key in kv_dict:
802                if allow_dupes:
803                    if isinstance(kv_dict[key], list):
804                        kv_dict[key].append(val)
805                    else:
806                        # Promote to list
807                        kv_dict[key] = [kv_dict[key], val]
808                else:
809                    self.fail(
810                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
811                            key, val, key_val_text, kv_dict))
812            else:
813                kv_dict[key] = val
814        return kv_dict
815
816    def parse_memory_region_packet(self, context):
817        # Ensure we have a context.
818        self.assertIsNotNone(context.get("memory_region_response"))
819
820        # Pull out key:value; pairs.
821        mem_region_dict = self.parse_key_val_dict(
822            context.get("memory_region_response"))
823
824        # Validate keys are known.
825        for (key, val) in list(mem_region_dict.items()):
826            self.assertTrue(
827                key in [
828                    "start",
829                    "size",
830                    "permissions",
831                    "name",
832                    "error"])
833            self.assertIsNotNone(val)
834
835        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
836        # Return the dictionary of key-value pairs for the memory region.
837        return mem_region_dict
838
839    def assert_address_within_memory_region(
840            self, test_address, mem_region_dict):
841        self.assertIsNotNone(mem_region_dict)
842        self.assertTrue("start" in mem_region_dict)
843        self.assertTrue("size" in mem_region_dict)
844
845        range_start = int(mem_region_dict["start"], 16)
846        range_size = int(mem_region_dict["size"], 16)
847        range_end = range_start + range_size
848
849        if test_address < range_start:
850            self.fail(
851                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
852                    test_address,
853                    range_start,
854                    range_end,
855                    range_size))
856        elif test_address >= range_end:
857            self.fail(
858                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
859                    test_address,
860                    range_start,
861                    range_end,
862                    range_size))
863
864    def add_threadinfo_collection_packets(self):
865        self.test_sequence.add_log_lines(
866            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
867                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
868                "save_key": "threadinfo_responses"}],
869            True)
870
871    def parse_threadinfo_packets(self, context):
872        """Return an array of thread ids (decimal ints), one per thread."""
873        threadinfo_responses = context.get("threadinfo_responses")
874        self.assertIsNotNone(threadinfo_responses)
875
876        thread_ids = []
877        for threadinfo_response in threadinfo_responses:
878            new_thread_infos = parse_threadinfo_response(threadinfo_response)
879            thread_ids.extend(new_thread_infos)
880        return thread_ids
881
882    def wait_for_thread_count(self, thread_count, timeout_seconds=3):
883        start_time = time.time()
884        timeout_time = start_time + timeout_seconds
885
886        actual_thread_count = 0
887        while actual_thread_count < thread_count:
888            self.reset_test_sequence()
889            self.add_threadinfo_collection_packets()
890
891            context = self.expect_gdbremote_sequence()
892            self.assertIsNotNone(context)
893
894            threads = self.parse_threadinfo_packets(context)
895            self.assertIsNotNone(threads)
896
897            actual_thread_count = len(threads)
898
899            if time.time() > timeout_time:
900                raise Exception(
901                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
902                        timeout_seconds, thread_count, actual_thread_count))
903
904        return threads
905
906    def add_set_breakpoint_packets(
907            self,
908            address,
909            z_packet_type=0,
910            do_continue=True,
911            breakpoint_kind=1):
912        self.test_sequence.add_log_lines(
913            [  # Set the breakpoint.
914                "read packet: $Z{2},{0:x},{1}#00".format(
915                    address, breakpoint_kind, z_packet_type),
916                # Verify the stub could set it.
917                "send packet: $OK#00",
918            ], True)
919
920        if (do_continue):
921            self.test_sequence.add_log_lines(
922                [  # Continue the inferior.
923                    "read packet: $c#63",
924                    # Expect a breakpoint stop report.
925                    {"direction": "send",
926                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
927                     "capture": {1: "stop_signo",
928                                 2: "stop_thread_id"}},
929                ], True)
930
931    def add_remove_breakpoint_packets(
932            self,
933            address,
934            z_packet_type=0,
935            breakpoint_kind=1):
936        self.test_sequence.add_log_lines(
937            [  # Remove the breakpoint.
938                "read packet: $z{2},{0:x},{1}#00".format(
939                    address, breakpoint_kind, z_packet_type),
940                # Verify the stub could unset it.
941                "send packet: $OK#00",
942            ], True)
943
944    def add_qSupported_packets(self):
945        self.test_sequence.add_log_lines(
946            ["read packet: $qSupported#00",
947             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
948             ], True)
949
    # Feature names that parse_qSupported_response accepts in a qSupported
    # reply.  Names are listed without the trailing +/-/? marker; a reply
    # naming any feature not listed here makes that parser raise.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho",
        "QPassSignals"
    ]
963
964    def parse_qSupported_response(self, context):
965        self.assertIsNotNone(context)
966
967        raw_response = context.get("qSupported_response")
968        self.assertIsNotNone(raw_response)
969
970        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
971        # +,-,? is stripped from the key and set as the value.
972        supported_dict = {}
973        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
974            key = match.group(1)
975            val = match.group(3)
976
977            # key=val: store as is
978            if val and len(val) > 0:
979                supported_dict[key] = val
980            else:
981                if len(key) < 2:
982                    raise Exception(
983                        "singular stub feature is too short: must be stub_feature{+,-,?}")
984                supported_type = key[-1]
985                key = key[:-1]
986                if not supported_type in ["+", "-", "?"]:
987                    raise Exception(
988                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
989                supported_dict[key] = supported_type
990            # Ensure we know the supported element
991            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
992                raise Exception(
993                    "unknown qSupported stub feature reported: %s" %
994                    key)
995
996        return supported_dict
997
998    def run_process_then_stop(self, run_seconds=1):
999        # Tell the stub to continue.
1000        self.test_sequence.add_log_lines(
1001            ["read packet: $vCont;c#a8"],
1002            True)
1003        context = self.expect_gdbremote_sequence()
1004
1005        # Wait for run_seconds.
1006        time.sleep(run_seconds)
1007
1008        # Send an interrupt, capture a T response.
1009        self.reset_test_sequence()
1010        self.test_sequence.add_log_lines(
1011            ["read packet: {}".format(chr(3)),
1012             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
1013            True)
1014        context = self.expect_gdbremote_sequence()
1015        self.assertIsNotNone(context)
1016        self.assertIsNotNone(context.get("stop_result"))
1017
1018        return context
1019
1020    def continue_process_and_wait_for_stop(self):
1021        self.test_sequence.add_log_lines(
1022            [
1023                "read packet: $vCont;c#a8",
1024                {
1025                    "direction": "send",
1026                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1027                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
1028                },
1029            ],
1030            True,
1031        )
1032        context = self.expect_gdbremote_sequence()
1033        self.assertIsNotNone(context)
1034        return self.parse_interrupt_packets(context)
1035
1036    def select_modifiable_register(self, reg_infos):
1037        """Find a register that can be read/written freely."""
1038        PREFERRED_REGISTER_NAMES = set(["rax", ])
1039
1040        # First check for the first register from the preferred register name
1041        # set.
1042        alternative_register_index = None
1043
1044        self.assertIsNotNone(reg_infos)
1045        for reg_info in reg_infos:
1046            if ("name" in reg_info) and (
1047                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
1048                # We found a preferred register.  Use it.
1049                return reg_info["lldb_register_index"]
1050            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
1051                    reg_info["generic"] == "arg1"):
1052                # A frame pointer or first arg register will do as a
1053                # register to modify temporarily.
1054                alternative_register_index = reg_info["lldb_register_index"]
1055
1056        # We didn't find a preferred register.  Return whatever alternative register
1057        # we found, if any.
1058        return alternative_register_index
1059
1060    def extract_registers_from_stop_notification(self, stop_key_vals_text):
1061        self.assertIsNotNone(stop_key_vals_text)
1062        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
1063
1064        registers = {}
1065        for (key, val) in list(kv_dict.items()):
1066            if re.match(r"^[0-9a-fA-F]+$", key):
1067                registers[int(key, 16)] = val
1068        return registers
1069
1070    def gather_register_infos(self):
1071        self.reset_test_sequence()
1072        self.add_register_info_collection_packets()
1073
1074        context = self.expect_gdbremote_sequence()
1075        self.assertIsNotNone(context)
1076
1077        reg_infos = self.parse_register_info_packets(context)
1078        self.assertIsNotNone(reg_infos)
1079        self.add_lldb_register_index(reg_infos)
1080
1081        return reg_infos
1082
1083    def find_generic_register_with_name(self, reg_infos, generic_name):
1084        self.assertIsNotNone(reg_infos)
1085        for reg_info in reg_infos:
1086            if ("generic" in reg_info) and (
1087                    reg_info["generic"] == generic_name):
1088                return reg_info
1089        return None
1090
1091    def decode_gdbremote_binary(self, encoded_bytes):
1092        decoded_bytes = ""
1093        i = 0
1094        while i < len(encoded_bytes):
1095            if encoded_bytes[i] == "}":
1096                # Handle escaped char.
1097                self.assertTrue(i + 1 < len(encoded_bytes))
1098                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1099                i += 2
1100            elif encoded_bytes[i] == "*":
1101                # Handle run length encoding.
1102                self.assertTrue(len(decoded_bytes) > 0)
1103                self.assertTrue(i + 1 < len(encoded_bytes))
1104                repeat_count = ord(encoded_bytes[i + 1]) - 29
1105                decoded_bytes += decoded_bytes[-1] * repeat_count
1106                i += 2
1107            else:
1108                decoded_bytes += encoded_bytes[i]
1109                i += 1
1110        return decoded_bytes
1111
1112    def build_auxv_dict(self, endian, word_size, auxv_data):
1113        self.assertIsNotNone(endian)
1114        self.assertIsNotNone(word_size)
1115        self.assertIsNotNone(auxv_data)
1116
1117        auxv_dict = {}
1118
1119        # PowerPC64le's auxvec has a special key that must be ignored.
1120        # This special key may be used multiple times, resulting in
1121        # multiple key/value pairs with the same key, which would otherwise
1122        # break this test check for repeated keys.
1123        #
1124        # AT_IGNOREPPC = 22
1125        ignored_keys_for_arch = { 'powerpc64le' : [22] }
1126        arch = self.getArchitecture()
1127        ignore_keys = None
1128        if arch in ignored_keys_for_arch:
1129            ignore_keys = ignored_keys_for_arch[arch]
1130
1131        while len(auxv_data) > 0:
1132            # Chop off key.
1133            raw_key = auxv_data[:word_size]
1134            auxv_data = auxv_data[word_size:]
1135
1136            # Chop of value.
1137            raw_value = auxv_data[:word_size]
1138            auxv_data = auxv_data[word_size:]
1139
1140            # Convert raw text from target endian.
1141            key = unpack_endian_binary_string(endian, raw_key)
1142            value = unpack_endian_binary_string(endian, raw_value)
1143
1144            if ignore_keys and key in ignore_keys:
1145                continue
1146
1147            # Handle ending entry.
1148            if key == 0:
1149                self.assertEqual(value, 0)
1150                return auxv_dict
1151
1152            # The key should not already be present.
1153            self.assertFalse(key in auxv_dict)
1154            auxv_dict[key] = value
1155
1156        self.fail(
1157            "should not reach here - implies required double zero entry not found")
1158        return auxv_dict
1159
1160    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
1161        """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1162        offset = 0
1163        done = False
1164        decoded_data = ""
1165
1166        while not done:
1167            # Grab the next iteration of data.
1168            self.reset_test_sequence()
1169            self.test_sequence.add_log_lines(
1170                [
1171                    "read packet: ${}{:x},{:x}:#00".format(
1172                        command_prefix,
1173                        offset,
1174                        chunk_length),
1175                    {
1176                        "direction": "send",
1177                        "regex": re.compile(
1178                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1179                            re.MULTILINE | re.DOTALL),
1180                        "capture": {
1181                            1: "response_type",
1182                            2: "content_raw"}}],
1183                True)
1184
1185            context = self.expect_gdbremote_sequence()
1186            self.assertIsNotNone(context)
1187
1188            response_type = context.get("response_type")
1189            self.assertIsNotNone(response_type)
1190            self.assertTrue(response_type in ["l", "m"])
1191
1192            # Move offset along.
1193            offset += chunk_length
1194
1195            # Figure out if we're done.  We're done if the response type is l.
1196            done = response_type == "l"
1197
1198            # Decode binary data.
1199            content_raw = context.get("content_raw")
1200            if content_raw and len(content_raw) > 0:
1201                self.assertIsNotNone(content_raw)
1202                decoded_data += self.decode_gdbremote_binary(content_raw)
1203        return decoded_data
1204
1205    def add_interrupt_packets(self):
1206        self.test_sequence.add_log_lines([
1207            # Send the intterupt.
1208            "read packet: {}".format(chr(3)),
1209            # And wait for the stop notification.
1210            {"direction": "send",
1211             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1212             "capture": {1: "stop_signo",
1213                         2: "stop_key_val_text"}},
1214        ], True)
1215
1216    def parse_interrupt_packets(self, context):
1217        self.assertIsNotNone(context.get("stop_signo"))
1218        self.assertIsNotNone(context.get("stop_key_val_text"))
1219        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1220            context["stop_key_val_text"]))
1221
1222    def add_QSaveRegisterState_packets(self, thread_id):
1223        if thread_id:
1224            # Use the thread suffix form.
1225            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1226                thread_id)
1227        else:
1228            request = "read packet: $QSaveRegisterState#00"
1229
1230        self.test_sequence.add_log_lines([request,
1231                                          {"direction": "send",
1232                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1233                                           "capture": {1: "save_response"}},
1234                                          ],
1235                                         True)
1236
1237    def parse_QSaveRegisterState_response(self, context):
1238        self.assertIsNotNone(context)
1239
1240        save_response = context.get("save_response")
1241        self.assertIsNotNone(save_response)
1242
1243        if len(save_response) < 1 or save_response[0] == "E":
1244            # error received
1245            return (False, None)
1246        else:
1247            return (True, int(save_response))
1248
1249    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1250        if thread_id:
1251            # Use the thread suffix form.
1252            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1253                save_id, thread_id)
1254        else:
1255            request = "read packet: $QRestoreRegisterState:{}#00".format(
1256                save_id)
1257
1258        self.test_sequence.add_log_lines([
1259            request,
1260            "send packet: $OK#00"
1261        ], True)
1262
1263    def flip_all_bits_in_each_register_value(
1264            self, reg_infos, endian, thread_id=None):
1265        self.assertIsNotNone(reg_infos)
1266
1267        successful_writes = 0
1268        failed_writes = 0
1269
1270        for reg_info in reg_infos:
1271            # Use the lldb register index added to the reg info.  We're not necessarily
1272            # working off a full set of register infos, so an inferred register
1273            # index could be wrong.
1274            reg_index = reg_info["lldb_register_index"]
1275            self.assertIsNotNone(reg_index)
1276
1277            reg_byte_size = int(reg_info["bitsize"]) // 8
1278            self.assertTrue(reg_byte_size > 0)
1279
1280            # Handle thread suffix.
1281            if thread_id:
1282                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1283                    reg_index, thread_id)
1284            else:
1285                p_request = "read packet: $p{:x}#00".format(reg_index)
1286
1287            # Read the existing value.
1288            self.reset_test_sequence()
1289            self.test_sequence.add_log_lines([
1290                p_request,
1291                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1292            ], True)
1293            context = self.expect_gdbremote_sequence()
1294            self.assertIsNotNone(context)
1295
1296            # Verify the response length.
1297            p_response = context.get("p_response")
1298            self.assertIsNotNone(p_response)
1299            initial_reg_value = unpack_register_hex_unsigned(
1300                endian, p_response)
1301
1302            # Flip the value by xoring with all 1s
1303            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
1304            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1305            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1306
1307            # Handle thread suffix for P.
1308            if thread_id:
1309                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
1310                    reg_index, pack_register_hex(
1311                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
1312            else:
1313                P_request = "read packet: $P{:x}={}#00".format(
1314                    reg_index, pack_register_hex(
1315                        endian, flipped_bits_int, byte_size=reg_byte_size))
1316
1317            # Write the flipped value to the register.
1318            self.reset_test_sequence()
1319            self.test_sequence.add_log_lines([P_request,
1320                                              {"direction": "send",
1321                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
1322                                               "capture": {1: "P_response"}},
1323                                              ],
1324                                             True)
1325            context = self.expect_gdbremote_sequence()
1326            self.assertIsNotNone(context)
1327
1328            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
1329            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
1330            # all flipping perfectly.
1331            P_response = context.get("P_response")
1332            self.assertIsNotNone(P_response)
1333            if P_response == "OK":
1334                successful_writes += 1
1335            else:
1336                failed_writes += 1
1337                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1338
1339            # Read back the register value, ensure it matches the flipped
1340            # value.
1341            if P_response == "OK":
1342                self.reset_test_sequence()
1343                self.test_sequence.add_log_lines([
1344                    p_request,
1345                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1346                ], True)
1347                context = self.expect_gdbremote_sequence()
1348                self.assertIsNotNone(context)
1349
1350                verify_p_response_raw = context.get("p_response")
1351                self.assertIsNotNone(verify_p_response_raw)
1352                verify_bits = unpack_register_hex_unsigned(
1353                    endian, verify_p_response_raw)
1354
1355                if verify_bits != flipped_bits_int:
1356                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
1357                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1358                    successful_writes -= 1
1359                    failed_writes += 1
1360
1361        return (successful_writes, failed_writes)
1362
1363    def is_bit_flippable_register(self, reg_info):
1364        if not reg_info:
1365            return False
1366        if not "set" in reg_info:
1367            return False
1368        if reg_info["set"] != "General Purpose Registers":
1369            return False
1370        if ("container-regs" in reg_info) and (
1371                len(reg_info["container-regs"]) > 0):
1372            # Don't try to bit flip registers contained in another register.
1373            return False
1374        if re.match("^.s$", reg_info["name"]):
1375            # This is a 2-letter register name that ends in "s", like a segment register.
1376            # Don't try to bit flip these.
1377            return False
1378        if re.match("^(c|)psr$", reg_info["name"]):
1379            # This is an ARM program status register; don't flip it.
1380            return False
1381        # Okay, this looks fine-enough.
1382        return True
1383
1384    def read_register_values(self, reg_infos, endian, thread_id=None):
1385        self.assertIsNotNone(reg_infos)
1386        values = {}
1387
1388        for reg_info in reg_infos:
1389            # We append a register index when load reg infos so we can work
1390            # with subsets.
1391            reg_index = reg_info.get("lldb_register_index")
1392            self.assertIsNotNone(reg_index)
1393
1394            # Handle thread suffix.
1395            if thread_id:
1396                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1397                    reg_index, thread_id)
1398            else:
1399                p_request = "read packet: $p{:x}#00".format(reg_index)
1400
1401            # Read it with p.
1402            self.reset_test_sequence()
1403            self.test_sequence.add_log_lines([
1404                p_request,
1405                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1406            ], True)
1407            context = self.expect_gdbremote_sequence()
1408            self.assertIsNotNone(context)
1409
1410            # Convert value from target endian to integral.
1411            p_response = context.get("p_response")
1412            self.assertIsNotNone(p_response)
1413            self.assertTrue(len(p_response) > 0)
1414            self.assertFalse(p_response[0] == "E")
1415
1416            values[reg_index] = unpack_register_hex_unsigned(
1417                endian, p_response)
1418
1419        return values
1420
1421    def add_vCont_query_packets(self):
1422        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1423                                          {"direction": "send",
1424                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1425                                           "capture": {2: "vCont_query_response"}},
1426                                          ],
1427                                         True)
1428
1429    def parse_vCont_query_response(self, context):
1430        self.assertIsNotNone(context)
1431        vCont_query_response = context.get("vCont_query_response")
1432
1433        # Handle case of no vCont support at all - in which case the capture
1434        # group will be none or zero length.
1435        if not vCont_query_response or len(vCont_query_response) == 0:
1436            return {}
1437
1438        return {key: 1 for key in vCont_query_response.split(
1439            ";") if key and len(key) > 0}
1440
1441    def count_single_steps_until_true(
1442            self,
1443            thread_id,
1444            predicate,
1445            args,
1446            max_step_count=100,
1447            use_Hc_packet=True,
1448            step_instruction="s"):
1449        """Used by single step test that appears in a few different contexts."""
1450        single_step_count = 0
1451
1452        while single_step_count < max_step_count:
1453            self.assertIsNotNone(thread_id)
1454
1455            # Build the packet for the single step instruction.  We replace
1456            # {thread}, if present, with the thread_id.
1457            step_packet = "read packet: ${}#00".format(
1458                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
1459            # print("\nstep_packet created: {}\n".format(step_packet))
1460
1461            # Single step.
1462            self.reset_test_sequence()
1463            if use_Hc_packet:
1464                self.test_sequence.add_log_lines(
1465                    [  # Set the continue thread.
1466                        "read packet: $Hc{0:x}#00".format(thread_id),
1467                        "send packet: $OK#00",
1468                    ], True)
1469            self.test_sequence.add_log_lines([
1470                # Single step.
1471                step_packet,
1472                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
1473                # Expect a breakpoint stop report.
1474                {"direction": "send",
1475                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1476                 "capture": {1: "stop_signo",
1477                             2: "stop_thread_id"}},
1478            ], True)
1479            context = self.expect_gdbremote_sequence()
1480            self.assertIsNotNone(context)
1481            self.assertIsNotNone(context.get("stop_signo"))
1482            self.assertEqual(int(context.get("stop_signo"), 16),
1483                             lldbutil.get_signal_number('SIGTRAP'))
1484
1485            single_step_count += 1
1486
1487            # See if the predicate is true.  If so, we're done.
1488            if predicate(args):
1489                return (True, single_step_count)
1490
1491        # The predicate didn't return true within the runaway step count.
1492        return (False, single_step_count)
1493
1494    def g_c1_c2_contents_are(self, args):
1495        """Used by single step test that appears in a few different contexts."""
1496        g_c1_address = args["g_c1_address"]
1497        g_c2_address = args["g_c2_address"]
1498        expected_g_c1 = args["expected_g_c1"]
1499        expected_g_c2 = args["expected_g_c2"]
1500
1501        # Read g_c1 and g_c2 contents.
1502        self.reset_test_sequence()
1503        self.test_sequence.add_log_lines(
1504            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1505             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
1506             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1507             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
1508            True)
1509
1510        # Run the packet stream.
1511        context = self.expect_gdbremote_sequence()
1512        self.assertIsNotNone(context)
1513
1514        # Check if what we read from inferior memory is what we are expecting.
1515        self.assertIsNotNone(context.get("g_c1_contents"))
1516        self.assertIsNotNone(context.get("g_c2_contents"))
1517
1518        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
1519            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1520
1521    def single_step_only_steps_one_instruction(
1522            self, use_Hc_packet=True, step_instruction="s"):
1523        """Used by single step test that appears in a few different contexts."""
1524        # Start up the inferior.
1525        procs = self.prep_debug_monitor_and_inferior(
1526            inferior_args=[
1527                "get-code-address-hex:swap_chars",
1528                "get-data-address-hex:g_c1",
1529                "get-data-address-hex:g_c2",
1530                "sleep:1",
1531                "call-function:swap_chars",
1532                "sleep:5"])
1533
1534        # Run the process
1535        self.test_sequence.add_log_lines(
1536            [  # Start running after initial stop.
1537                "read packet: $c#63",
1538                # Match output line that prints the memory address of the function call entry point.
1539                # Note we require launch-only testing so we can get inferior otuput.
1540                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1541                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1542                # Now stop the inferior.
1543                "read packet: {}".format(chr(3)),
1544                # And wait for the stop notification.
1545                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1546            True)
1547
1548        # Run the packet stream.
1549        context = self.expect_gdbremote_sequence()
1550        self.assertIsNotNone(context)
1551
1552        # Grab the main thread id.
1553        self.assertIsNotNone(context.get("stop_thread_id"))
1554        main_thread_id = int(context.get("stop_thread_id"), 16)
1555
1556        # Grab the function address.
1557        self.assertIsNotNone(context.get("function_address"))
1558        function_address = int(context.get("function_address"), 16)
1559
1560        # Grab the data addresses.
1561        self.assertIsNotNone(context.get("g_c1_address"))
1562        g_c1_address = int(context.get("g_c1_address"), 16)
1563
1564        self.assertIsNotNone(context.get("g_c2_address"))
1565        g_c2_address = int(context.get("g_c2_address"), 16)
1566
1567        # Set a breakpoint at the given address.
1568        if self.getArchitecture() == "arm":
1569            # TODO: Handle case when setting breakpoint in thumb code
1570            BREAKPOINT_KIND = 4
1571        else:
1572            BREAKPOINT_KIND = 1
1573        self.reset_test_sequence()
1574        self.add_set_breakpoint_packets(
1575            function_address,
1576            do_continue=True,
1577            breakpoint_kind=BREAKPOINT_KIND)
1578        context = self.expect_gdbremote_sequence()
1579        self.assertIsNotNone(context)
1580
1581        # Remove the breakpoint.
1582        self.reset_test_sequence()
1583        self.add_remove_breakpoint_packets(
1584            function_address, breakpoint_kind=BREAKPOINT_KIND)
1585        context = self.expect_gdbremote_sequence()
1586        self.assertIsNotNone(context)
1587
1588        # Verify g_c1 and g_c2 match expected initial state.
1589        args = {}
1590        args["g_c1_address"] = g_c1_address
1591        args["g_c2_address"] = g_c2_address
1592        args["expected_g_c1"] = "0"
1593        args["expected_g_c2"] = "1"
1594
1595        self.assertTrue(self.g_c1_c2_contents_are(args))
1596
1597        # Verify we take only a small number of steps to hit the first state.
1598        # Might need to work through function entry prologue code.
1599        args["expected_g_c1"] = "1"
1600        args["expected_g_c2"] = "1"
1601        (state_reached,
1602         step_count) = self.count_single_steps_until_true(main_thread_id,
1603                                                          self.g_c1_c2_contents_are,
1604                                                          args,
1605                                                          max_step_count=25,
1606                                                          use_Hc_packet=use_Hc_packet,
1607                                                          step_instruction=step_instruction)
1608        self.assertTrue(state_reached)
1609
1610        # Verify we hit the next state.
1611        args["expected_g_c1"] = "1"
1612        args["expected_g_c2"] = "0"
1613        (state_reached,
1614         step_count) = self.count_single_steps_until_true(main_thread_id,
1615                                                          self.g_c1_c2_contents_are,
1616                                                          args,
1617                                                          max_step_count=5,
1618                                                          use_Hc_packet=use_Hc_packet,
1619                                                          step_instruction=step_instruction)
1620        self.assertTrue(state_reached)
1621        expected_step_count = 1
1622        arch = self.getArchitecture()
1623
1624        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1625        # of variable value
1626        if re.match("mips", arch):
1627            expected_step_count = 3
1628        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1629        # variable value
1630        if re.match("s390x", arch):
1631            expected_step_count = 2
1632        self.assertEqual(step_count, expected_step_count)
1633
1634        # Verify we hit the next state.
1635        args["expected_g_c1"] = "0"
1636        args["expected_g_c2"] = "0"
1637        (state_reached,
1638         step_count) = self.count_single_steps_until_true(main_thread_id,
1639                                                          self.g_c1_c2_contents_are,
1640                                                          args,
1641                                                          max_step_count=5,
1642                                                          use_Hc_packet=use_Hc_packet,
1643                                                          step_instruction=step_instruction)
1644        self.assertTrue(state_reached)
1645        self.assertEqual(step_count, expected_step_count)
1646
1647        # Verify we hit the next state.
1648        args["expected_g_c1"] = "0"
1649        args["expected_g_c2"] = "1"
1650        (state_reached,
1651         step_count) = self.count_single_steps_until_true(main_thread_id,
1652                                                          self.g_c1_c2_contents_are,
1653                                                          args,
1654                                                          max_step_count=5,
1655                                                          use_Hc_packet=use_Hc_packet,
1656                                                          step_instruction=step_instruction)
1657        self.assertTrue(state_reached)
1658        self.assertEqual(step_count, expected_step_count)
1659
1660    def maybe_strict_output_regex(self, regex):
1661        return '.*' + regex + \
1662            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1663
1664    def install_and_create_launch_args(self):
1665        exe_path = self.getBuildArtifact("a.out")
1666        if not lldb.remote_platform:
1667            return [exe_path]
1668        remote_path = lldbutil.append_to_process_working_directory(self,
1669            os.path.basename(exe_path))
1670        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1671        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1672                                           remote_file_spec)
1673        if err.Fail():
1674            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1675                            (exe_path, remote_path, err))
1676        return [remote_path]
1677