1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import print_function
6
7
8import errno
9import os
10import os.path
11import platform
12import random
13import re
14import select
15import signal
16import socket
17import subprocess
18import sys
19import tempfile
20import time
21from lldbsuite.test import configuration
22from lldbsuite.test.lldbtest import *
23from lldbgdbserverutils import *
24import logging
25
26
class _ConnectionRefused(IOError):
    # Raised by create_socket() when connect() fails with ECONNREFUSED and by
    # _verify_socket() when the peer has already dropped the connection, so
    # that connect_to_debug_monitor() can retry.
    pass
29
30
class GdbRemoteTestCaseBase(TestBase):
    """Base class for gdb-remote test cases.

    Holds the shared constants, logging setup and helpers used to start a
    debug monitor stub, connect to it and exchange gdb-remote packets.
    """

    NO_DEBUG_INFO_TESTCASE = True

    # Default timeout (seconds) used by expect_gdbremote_sequence() when the
    # caller does not supply one.
    _TIMEOUT_SECONDS = 7

    # Kill packet sent to the stub when the test socket is shut down.
    _GDBREMOTE_KILL_PACKET = "$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # File handler for the detailed host-side log; setUp() creates it only
    # when verbose logging is requested, tearDown() detaches it again.
    _verbose_log_handler = None
    # Formatter shared by the stderr handler and the verbose file handler.
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
61
62    def setUpBaseLogging(self):
63        self.logger = logging.getLogger(__name__)
64
65        if len(self.logger.handlers) > 0:
66            return  # We have set up this handler already
67
68        self.logger.propagate = False
69        self.logger.setLevel(logging.DEBUG)
70
71        # log all warnings to stderr
72        handler = logging.StreamHandler()
73        handler.setLevel(logging.WARNING)
74        handler.setFormatter(self._log_formatter)
75        self.logger.addHandler(handler)
76
77    def isVerboseLoggingRequested(self):
78        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
79        # logged.
80        return any(("gdb-remote" in channel)
81                   for channel in lldbtest_config.channels)
82
83    def setUp(self):
84        TestBase.setUp(self)
85
86        self.setUpBaseLogging()
87        self.debug_monitor_extra_args = []
88        self._pump_queues = socket_packet_pump.PumpQueues()
89
90        if self.isVerboseLoggingRequested():
91            # If requested, full logs go to a log file
92            self._verbose_log_handler = logging.FileHandler(
93                self.log_basename + "-host.log")
94            self._verbose_log_handler.setFormatter(self._log_formatter)
95            self._verbose_log_handler.setLevel(logging.DEBUG)
96            self.logger.addHandler(self._verbose_log_handler)
97
98        self.test_sequence = GdbRemoteTestSequence(self.logger)
99        self.set_inferior_startup_launch()
100        self.port = self.get_next_port()
101        self.named_pipe_path = None
102        self.named_pipe = None
103        self.named_pipe_fd = None
104        self.stub_sends_two_stop_notifications_on_kill = False
105        if configuration.lldb_platform_url:
106            if configuration.lldb_platform_url.startswith('unix-'):
107                url_pattern = '(.+)://\[?(.+?)\]?/.*'
108            else:
109                url_pattern = '(.+)://(.+):\d+'
110            scheme, host = re.match(
111                url_pattern, configuration.lldb_platform_url).groups()
112            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
113                self.stub_device = host
114                self.stub_hostname = 'localhost'
115            else:
116                self.stub_device = None
117                self.stub_hostname = host
118        else:
119            self.stub_hostname = "localhost"
120
121    def tearDown(self):
122        self._pump_queues.verify_queues_empty()
123
124        self.logger.removeHandler(self._verbose_log_handler)
125        self._verbose_log_handler = None
126        TestBase.tearDown(self)
127
128    def getLocalServerLogFile(self):
129        return self.log_basename + "-server.log"
130
131    def setUpServerLogging(self, is_llgs):
132        if len(lldbtest_config.channels) == 0:
133            return  # No logging requested
134
135        if lldb.remote_platform:
136            log_file = lldbutil.join_remote_paths(
137                lldb.remote_platform.GetWorkingDirectory(), "server.log")
138        else:
139            log_file = self.getLocalServerLogFile()
140
141        if is_llgs:
142            self.debug_monitor_extra_args.append("--log-file=" + log_file)
143            self.debug_monitor_extra_args.append(
144                "--log-channels={}".format(":".join(lldbtest_config.channels)))
145        else:
146            self.debug_monitor_extra_args = [
147                "--log-file=" + log_file, "--log-flags=0x800000"]
148
149    def get_next_port(self):
150        return 12000 + random.randint(0, 3999)
151
152    def reset_test_sequence(self):
153        self.test_sequence = GdbRemoteTestSequence(self.logger)
154
155    def create_named_pipe(self):
156        # Create a temp dir and name for a pipe.
157        temp_dir = tempfile.mkdtemp()
158        named_pipe_path = os.path.join(temp_dir, "stub_port_number")
159
160        # Create the named pipe.
161        os.mkfifo(named_pipe_path)
162
163        # Open the read side of the pipe in non-blocking mode.  This will
164        # return right away, ready or not.
165        named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
166
167        # Create the file for the named pipe.  Note this will follow semantics of
168        # a non-blocking read side of a named pipe, which has different semantics
169        # than a named pipe opened for read in non-blocking mode.
170        named_pipe = os.fdopen(named_pipe_fd, "r")
171        self.assertIsNotNone(named_pipe)
172
173        def shutdown_named_pipe():
174            # Close the pipe.
175            try:
176                named_pipe.close()
177            except:
178                print("failed to close named pipe")
179                None
180
181            # Delete the pipe.
182            try:
183                os.remove(named_pipe_path)
184            except:
185                print("failed to delete named pipe: {}".format(named_pipe_path))
186                None
187
188            # Delete the temp directory.
189            try:
190                os.rmdir(temp_dir)
191            except:
192                print(
193                    "failed to delete temp dir: {}, directory contents: '{}'".format(
194                        temp_dir, os.listdir(temp_dir)))
195                None
196
197        # Add the shutdown hook to clean up the named pipe.
198        self.addTearDownHook(shutdown_named_pipe)
199
200        # Clear the port so the stub selects a port number.
201        self.port = 0
202
203        return (named_pipe_path, named_pipe, named_pipe_fd)
204
205    def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
206        # Wait for something to read with a max timeout.
207        (ready_readers, _, _) = select.select(
208            [self.named_pipe_fd], [], [], read_timeout_seconds)
209        self.assertIsNotNone(
210            ready_readers,
211            "write side of pipe has not written anything - stub isn't writing to pipe.")
212        self.assertNotEqual(
213            len(ready_readers),
214            0,
215            "write side of pipe has not written anything - stub isn't writing to pipe.")
216
217        # Read the port from the named pipe.
218        stub_port_raw = self.named_pipe.read()
219        self.assertIsNotNone(stub_port_raw)
220        self.assertNotEqual(
221            len(stub_port_raw),
222            0,
223            "no content to read on pipe")
224
225        # Trim null byte, convert to int.
226        stub_port_raw = stub_port_raw[:-1]
227        stub_port = int(stub_port_raw)
228        self.assertTrue(stub_port > 0)
229
230        return stub_port
231
232    def init_llgs_test(self, use_named_pipe=True):
233        if lldb.remote_platform:
234            # Remote platforms don't support named pipe based port negotiation
235            use_named_pipe = False
236
237            # Grab the ppid from /proc/[shell pid]/stat
238            err, retcode, shell_stat = self.run_platform_command(
239                "cat /proc/$$/stat")
240            self.assertTrue(
241                err.Success() and retcode == 0,
242                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
243                (err.GetCString(),
244                 retcode))
245
246            # [pid] ([executable]) [state] [*ppid*]
247            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
248            err, retcode, ls_output = self.run_platform_command(
249                "ls -l /proc/%s/exe" % pid)
250            self.assertTrue(
251                err.Success() and retcode == 0,
252                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
253                (pid,
254                 err.GetCString(),
255                 retcode))
256            exe = ls_output.split()[-1]
257
258            # If the binary has been deleted, the link name has " (deleted)" appended.
259            # Remove if it's there.
260            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
261        else:
262            self.debug_monitor_exe = get_lldb_server_exe()
263            if not self.debug_monitor_exe:
264                self.skipTest("lldb-server exe not found")
265
266        self.debug_monitor_extra_args = ["gdbserver"]
267        self.setUpServerLogging(is_llgs=True)
268
269        if use_named_pipe:
270            (self.named_pipe_path, self.named_pipe,
271             self.named_pipe_fd) = self.create_named_pipe()
272
273    def init_debugserver_test(self, use_named_pipe=True):
274        self.debug_monitor_exe = get_debugserver_exe()
275        if not self.debug_monitor_exe:
276            self.skipTest("debugserver exe not found")
277        self.setUpServerLogging(is_llgs=False)
278        if use_named_pipe:
279            (self.named_pipe_path, self.named_pipe,
280             self.named_pipe_fd) = self.create_named_pipe()
281        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
282        # when the process truly dies.
283        self.stub_sends_two_stop_notifications_on_kill = True
284
285    def forward_adb_port(self, source, target, direction, device):
286        adb = ['adb'] + (['-s', device] if device else []) + [direction]
287
288        def remove_port_forward():
289            subprocess.call(adb + ["--remove", "tcp:%d" % source])
290
291        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
292        self.addTearDownHook(remove_port_forward)
293
294    def _verify_socket(self, sock):
295        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
296        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
297        # the connect() will always be successful, but the connection will be immediately dropped
298        # if ADB could not connect on the remote side. This function tries to detect this
299        # situation, and report it as "connection refused" so that the upper layers attempt the
300        # connection again.
301        triple = self.dbg.GetSelectedPlatform().GetTriple()
302        if not re.match(".*-.*-.*-android", triple):
303            return  # Not android.
304        can_read, _, _ = select.select([sock], [], [], 0.1)
305        if sock not in can_read:
306            return  # Data is not available, but the connection is alive.
307        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
308            raise _ConnectionRefused()  # Got EOF, connection dropped.
309
310    def create_socket(self):
311        sock = socket.socket()
312        logger = self.logger
313
314        triple = self.dbg.GetSelectedPlatform().GetTriple()
315        if re.match(".*-.*-.*-android", triple):
316            self.forward_adb_port(
317                self.port,
318                self.port,
319                "forward",
320                self.stub_device)
321
322        logger.info(
323            "Connecting to debug monitor on %s:%d",
324            self.stub_hostname,
325            self.port)
326        connect_info = (self.stub_hostname, self.port)
327        try:
328            sock.connect(connect_info)
329        except socket.error as serr:
330            if serr.errno == errno.ECONNREFUSED:
331                raise _ConnectionRefused()
332            raise serr
333
334        def shutdown_socket():
335            if sock:
336                try:
337                    # send the kill packet so lldb-server shuts down gracefully
338                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
339                except:
340                    logger.warning(
341                        "failed to send kill packet to debug monitor: {}; ignoring".format(
342                            sys.exc_info()[0]))
343
344                try:
345                    sock.close()
346                except:
347                    logger.warning(
348                        "failed to close socket to debug monitor: {}; ignoring".format(
349                            sys.exc_info()[0]))
350
351        self.addTearDownHook(shutdown_socket)
352
353        self._verify_socket(sock)
354
355        return sock
356
357    def set_inferior_startup_launch(self):
358        self._inferior_startup = self._STARTUP_LAUNCH
359
360    def set_inferior_startup_attach(self):
361        self._inferior_startup = self._STARTUP_ATTACH
362
363    def set_inferior_startup_attach_manually(self):
364        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
365
366    def get_debug_monitor_command_line_args(self, attach_pid=None):
367        if lldb.remote_platform:
368            commandline_args = self.debug_monitor_extra_args + \
369                ["*:{}".format(self.port)]
370        else:
371            commandline_args = self.debug_monitor_extra_args + \
372                ["localhost:{}".format(self.port)]
373
374        if attach_pid:
375            commandline_args += ["--attach=%d" % attach_pid]
376        if self.named_pipe_path:
377            commandline_args += ["--named-pipe", self.named_pipe_path]
378        return commandline_args
379
380    def launch_debug_monitor(self, attach_pid=None, logfile=None):
381        # Create the command line.
382        commandline_args = self.get_debug_monitor_command_line_args(
383            attach_pid=attach_pid)
384
385        # Start the server.
386        server = self.spawnSubprocess(
387            self.debug_monitor_exe,
388            commandline_args,
389            install_remote=False)
390        self.addTearDownHook(self.cleanupSubprocesses)
391        self.assertIsNotNone(server)
392
393        # If we're receiving the stub's listening port from the named pipe, do
394        # that here.
395        if self.named_pipe:
396            self.port = self.get_stub_port_from_named_socket()
397
398        return server
399
400    def connect_to_debug_monitor(self, attach_pid=None):
401        if self.named_pipe:
402            # Create the stub.
403            server = self.launch_debug_monitor(attach_pid=attach_pid)
404            self.assertIsNotNone(server)
405
406            def shutdown_debug_monitor():
407                try:
408                    server.terminate()
409                except:
410                    logger.warning(
411                        "failed to terminate server for debug monitor: {}; ignoring".format(
412                            sys.exc_info()[0]))
413            self.addTearDownHook(shutdown_debug_monitor)
414
415            # Schedule debug monitor to be shut down during teardown.
416            logger = self.logger
417
418            # Attach to the stub and return a socket opened to it.
419            self.sock = self.create_socket()
420            return server
421
422        # We're using a random port algorithm to try not to collide with other ports,
423        # and retry a max # times.
424        attempts = 0
425        MAX_ATTEMPTS = 20
426
427        while attempts < MAX_ATTEMPTS:
428            server = self.launch_debug_monitor(attach_pid=attach_pid)
429
430            # Schedule debug monitor to be shut down during teardown.
431            logger = self.logger
432
433            def shutdown_debug_monitor():
434                try:
435                    server.terminate()
436                except:
437                    logger.warning(
438                        "failed to terminate server for debug monitor: {}; ignoring".format(
439                            sys.exc_info()[0]))
440            self.addTearDownHook(shutdown_debug_monitor)
441
442            connect_attemps = 0
443            MAX_CONNECT_ATTEMPTS = 10
444
445            while connect_attemps < MAX_CONNECT_ATTEMPTS:
446                # Create a socket to talk to the server
447                try:
448                    logger.info("Connect attempt %d", connect_attemps + 1)
449                    self.sock = self.create_socket()
450                    return server
451                except _ConnectionRefused as serr:
452                    # Ignore, and try again.
453                    pass
454                time.sleep(0.5)
455                connect_attemps += 1
456
457            # We should close the server here to be safe.
458            server.terminate()
459
460            # Increment attempts.
461            print(
462                "connect to debug monitor on port %d failed, attempt #%d of %d" %
463                (self.port, attempts + 1, MAX_ATTEMPTS))
464            attempts += 1
465
466            # And wait a random length of time before next attempt, to avoid
467            # collisions.
468            time.sleep(random.randint(1, 5))
469
470            # Now grab a new port number.
471            self.port = self.get_next_port()
472
473        raise Exception(
474            "failed to create a socket to the launched debug monitor after %d tries" %
475            attempts)
476
477    def launch_process_for_attach(
478            self,
479            inferior_args=None,
480            sleep_seconds=3,
481            exe_path=None):
482        # We're going to start a child process that the debug monitor stub can later attach to.
483        # This process needs to be started so that it just hangs around for a while.  We'll
484        # have it sleep.
485        if not exe_path:
486            exe_path = os.path.abspath("a.out")
487
488        args = []
489        if inferior_args:
490            args.extend(inferior_args)
491        if sleep_seconds:
492            args.append("sleep:%d" % sleep_seconds)
493
494        inferior = self.spawnSubprocess(exe_path, args)
495
496        def shutdown_process_for_attach():
497            try:
498                inferior.terminate()
499            except:
500                logger.warning(
501                    "failed to terminate inferior process for attach: {}; ignoring".format(
502                        sys.exc_info()[0]))
503        self.addTearDownHook(shutdown_process_for_attach)
504        return inferior
505
506    def prep_debug_monitor_and_inferior(
507            self,
508            inferior_args=None,
509            inferior_sleep_seconds=3,
510            inferior_exe_path=None):
511        """Prep the debug monitor, the inferior, and the expected packet stream.
512
513        Handle the separate cases of using the debug monitor in attach-to-inferior mode
514        and in launch-inferior mode.
515
516        For attach-to-inferior mode, the inferior process is first started, then
517        the debug monitor is started in attach to pid mode (using --attach on the
518        stub command line), and the no-ack-mode setup is appended to the packet
519        stream.  The packet stream is not yet executed, ready to have more expected
520        packet entries added to it.
521
522        For launch-inferior mode, the stub is first started, then no ack mode is
523        setup on the expected packet stream, then the verified launch packets are added
524        to the expected socket stream.  The packet stream is not yet executed, ready
525        to have more expected packet entries added to it.
526
527        The return value is:
528        {inferior:<inferior>, server:<server>}
529        """
530        inferior = None
531        attach_pid = None
532
533        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
534            # Launch the process that we'll use as the inferior.
535            inferior = self.launch_process_for_attach(
536                inferior_args=inferior_args,
537                sleep_seconds=inferior_sleep_seconds,
538                exe_path=inferior_exe_path)
539            self.assertIsNotNone(inferior)
540            self.assertTrue(inferior.pid > 0)
541            if self._inferior_startup == self._STARTUP_ATTACH:
542                # In this case, we want the stub to attach via the command
543                # line, so set the command line attach pid here.
544                attach_pid = inferior.pid
545
546        if self._inferior_startup == self._STARTUP_LAUNCH:
547            # Build launch args
548            if not inferior_exe_path:
549                inferior_exe_path = os.path.abspath("a.out")
550
551            if lldb.remote_platform:
552                remote_path = lldbutil.append_to_process_working_directory(
553                    os.path.basename(inferior_exe_path))
554                remote_file_spec = lldb.SBFileSpec(remote_path, False)
555                err = lldb.remote_platform.Install(lldb.SBFileSpec(
556                    inferior_exe_path, True), remote_file_spec)
557                if err.Fail():
558                    raise Exception(
559                        "remote_platform.Install('%s', '%s') failed: %s" %
560                        (inferior_exe_path, remote_path, err))
561                inferior_exe_path = remote_path
562
563            launch_args = [inferior_exe_path]
564            if inferior_args:
565                launch_args.extend(inferior_args)
566
567        # Launch the debug monitor stub, attaching to the inferior.
568        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
569        self.assertIsNotNone(server)
570
571        # Build the expected protocol stream
572        self.add_no_ack_remote_stream()
573        if self._inferior_startup == self._STARTUP_LAUNCH:
574            self.add_verified_launch_packets(launch_args)
575
576        return {"inferior": inferior, "server": server}
577
578    def expect_socket_recv(
579            self,
580            sock,
581            expected_content_regex,
582            timeout_seconds):
583        response = ""
584        timeout_time = time.time() + timeout_seconds
585
586        while not expected_content_regex.match(
587                response) and time.time() < timeout_time:
588            can_read, _, _ = select.select([sock], [], [], timeout_seconds)
589            if can_read and sock in can_read:
590                recv_bytes = sock.recv(4096)
591                if recv_bytes:
592                    response += recv_bytes
593
594        self.assertTrue(expected_content_regex.match(response))
595
596    def expect_socket_send(self, sock, content, timeout_seconds):
597        request_bytes_remaining = content
598        timeout_time = time.time() + timeout_seconds
599
600        while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
601            _, can_write, _ = select.select([], [sock], [], timeout_seconds)
602            if can_write and sock in can_write:
603                written_byte_count = sock.send(request_bytes_remaining)
604                request_bytes_remaining = request_bytes_remaining[
605                    written_byte_count:]
606        self.assertEqual(len(request_bytes_remaining), 0)
607
608    def do_handshake(self, stub_socket, timeout_seconds=5):
609        # Write the ack.
610        self.expect_socket_send(stub_socket, "+", timeout_seconds)
611
612        # Send the start no ack mode packet.
613        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
614        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST)
615        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
616
617        # Receive the ack and "OK"
618        self.expect_socket_recv(stub_socket, re.compile(
619            r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
620
621        # Send the final ack.
622        self.expect_socket_send(stub_socket, "+", timeout_seconds)
623
624    def add_no_ack_remote_stream(self):
625        self.test_sequence.add_log_lines(
626            ["read packet: +",
627             "read packet: $QStartNoAckMode#b0",
628             "send packet: +",
629             "send packet: $OK#9a",
630             "read packet: +"],
631            True)
632
633    def add_verified_launch_packets(self, launch_args):
634        self.test_sequence.add_log_lines(
635            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
636             "send packet: $OK#00",
637             "read packet: $qLaunchSuccess#a5",
638             "send packet: $OK#00"],
639            True)
640
641    def add_thread_suffix_request_packets(self):
642        self.test_sequence.add_log_lines(
643            ["read packet: $QThreadSuffixSupported#e4",
644             "send packet: $OK#00",
645             ], True)
646
647    def add_process_info_collection_packets(self):
648        self.test_sequence.add_log_lines(
649            ["read packet: $qProcessInfo#dc",
650             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
651            True)
652
    # Keys accepted by parse_process_info_response(); a response containing
    # any other key fails the test.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
669
670    def parse_process_info_response(self, context):
671        # Ensure we have a process info response.
672        self.assertIsNotNone(context)
673        process_info_raw = context.get("process_info_raw")
674        self.assertIsNotNone(process_info_raw)
675
676        # Pull out key:value; pairs.
677        process_info_dict = {
678            match.group(1): match.group(2) for match in re.finditer(
679                r"([^:]+):([^;]+);", process_info_raw)}
680
681        # Validate keys are known.
682        for (key, val) in list(process_info_dict.items()):
683            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
684            self.assertIsNotNone(val)
685
686        return process_info_dict
687
688    def add_register_info_collection_packets(self):
689        self.test_sequence.add_log_lines(
690            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
691                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
692                "save_key": "reg_info_responses"}],
693            True)
694
695    def parse_register_info_packets(self, context):
696        """Return an array of register info dictionaries, one per register info."""
697        reg_info_responses = context.get("reg_info_responses")
698        self.assertIsNotNone(reg_info_responses)
699
700        # Parse register infos.
701        return [parse_reg_info_response(reg_info_response)
702                for reg_info_response in reg_info_responses]
703
704    def expect_gdbremote_sequence(self, timeout_seconds=None):
705        if not timeout_seconds:
706            timeout_seconds = self._TIMEOUT_SECONDS
707        return expect_lldb_gdbserver_replay(
708            self,
709            self.sock,
710            self.test_sequence,
711            self._pump_queues,
712            timeout_seconds,
713            self.logger)
714
    # Register-info keys accepted by assert_valid_reg_info(); a register info
    # containing any other key fails the check.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
732
733    def assert_valid_reg_info(self, reg_info):
734        # Assert we know about all the reginfo keys parsed.
735        for key in reg_info:
736            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
737
738        # Check the bare-minimum expected set of register info keys.
739        self.assertTrue("name" in reg_info)
740        self.assertTrue("bitsize" in reg_info)
741        self.assertTrue("offset" in reg_info)
742        self.assertTrue("encoding" in reg_info)
743        self.assertTrue("format" in reg_info)
744
745    def find_pc_reg_info(self, reg_infos):
746        lldb_reg_index = 0
747        for reg_info in reg_infos:
748            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
749                return (lldb_reg_index, reg_info)
750            lldb_reg_index += 1
751
752        return (None, None)
753
754    def add_lldb_register_index(self, reg_infos):
755        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
756
757        We'll use this when we want to call packets like P/p with a register index but do so
758        on only a subset of the full register info set.
759        """
760        self.assertIsNotNone(reg_infos)
761
762        reg_index = 0
763        for reg_info in reg_infos:
764            reg_info["lldb_register_index"] = reg_index
765            reg_index += 1
766
767    def add_query_memory_region_packets(self, address):
768        self.test_sequence.add_log_lines(
769            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
770             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
771            True)
772
773    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
774        self.assertIsNotNone(key_val_text)
775        kv_dict = {}
776        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
777            key = match.group(1)
778            val = match.group(2)
779            if key in kv_dict:
780                if allow_dupes:
781                    if isinstance(kv_dict[key], list):
782                        kv_dict[key].append(val)
783                    else:
784                        # Promote to list
785                        kv_dict[key] = [kv_dict[key], val]
786                else:
787                    self.fail(
788                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
789                            key, val, key_val_text, kv_dict))
790            else:
791                kv_dict[key] = val
792        return kv_dict
793
794    def parse_memory_region_packet(self, context):
795        # Ensure we have a context.
796        self.assertIsNotNone(context.get("memory_region_response"))
797
798        # Pull out key:value; pairs.
799        mem_region_dict = self.parse_key_val_dict(
800            context.get("memory_region_response"))
801
802        # Validate keys are known.
803        for (key, val) in list(mem_region_dict.items()):
804            self.assertTrue(
805                key in [
806                    "start",
807                    "size",
808                    "permissions",
809                    "name",
810                    "error"])
811            self.assertIsNotNone(val)
812
813        # Return the dictionary of key-value pairs for the memory region.
814        return mem_region_dict
815
816    def assert_address_within_memory_region(
817            self, test_address, mem_region_dict):
818        self.assertIsNotNone(mem_region_dict)
819        self.assertTrue("start" in mem_region_dict)
820        self.assertTrue("size" in mem_region_dict)
821
822        range_start = int(mem_region_dict["start"], 16)
823        range_size = int(mem_region_dict["size"], 16)
824        range_end = range_start + range_size
825
826        if test_address < range_start:
827            self.fail(
828                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
829                    test_address,
830                    range_start,
831                    range_end,
832                    range_size))
833        elif test_address >= range_end:
834            self.fail(
835                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
836                    test_address,
837                    range_start,
838                    range_end,
839                    range_size))
840
841    def add_threadinfo_collection_packets(self):
842        self.test_sequence.add_log_lines(
843            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
844                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
845                "save_key": "threadinfo_responses"}],
846            True)
847
848    def parse_threadinfo_packets(self, context):
849        """Return an array of thread ids (decimal ints), one per thread."""
850        threadinfo_responses = context.get("threadinfo_responses")
851        self.assertIsNotNone(threadinfo_responses)
852
853        thread_ids = []
854        for threadinfo_response in threadinfo_responses:
855            new_thread_infos = parse_threadinfo_response(threadinfo_response)
856            thread_ids.extend(new_thread_infos)
857        return thread_ids
858
859    def wait_for_thread_count(self, thread_count, timeout_seconds=3):
860        start_time = time.time()
861        timeout_time = start_time + timeout_seconds
862
863        actual_thread_count = 0
864        while actual_thread_count < thread_count:
865            self.reset_test_sequence()
866            self.add_threadinfo_collection_packets()
867
868            context = self.expect_gdbremote_sequence()
869            self.assertIsNotNone(context)
870
871            threads = self.parse_threadinfo_packets(context)
872            self.assertIsNotNone(threads)
873
874            actual_thread_count = len(threads)
875
876            if time.time() > timeout_time:
877                raise Exception(
878                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
879                        timeout_seconds, thread_count, actual_thread_count))
880
881        return threads
882
883    def add_set_breakpoint_packets(
884            self,
885            address,
886            do_continue=True,
887            breakpoint_kind=1):
888        self.test_sequence.add_log_lines(
889            [  # Set the breakpoint.
890                "read packet: $Z0,{0:x},{1}#00".format(
891                    address, breakpoint_kind),
892                # Verify the stub could set it.
893                "send packet: $OK#00",
894            ], True)
895
896        if (do_continue):
897            self.test_sequence.add_log_lines(
898                [  # Continue the inferior.
899                    "read packet: $c#63",
900                    # Expect a breakpoint stop report.
901                    {"direction": "send",
902                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
903                     "capture": {1: "stop_signo",
904                                 2: "stop_thread_id"}},
905                ], True)
906
907    def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
908        self.test_sequence.add_log_lines(
909            [  # Remove the breakpoint.
910                "read packet: $z0,{0:x},{1}#00".format(
911                    address, breakpoint_kind),
912                # Verify the stub could unset it.
913                "send packet: $OK#00",
914            ], True)
915
916    def add_qSupported_packets(self):
917        self.test_sequence.add_log_lines(
918            ["read packet: $qSupported#00",
919             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
920             ], True)
921
    # Feature names that parse_qSupported_response() accepts in a qSupported
    # reply.  Names are listed without the trailing +/-/? marker or "=value"
    # part; any reported feature not listed here makes that method raise.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho"
    ]
934
935    def parse_qSupported_response(self, context):
936        self.assertIsNotNone(context)
937
938        raw_response = context.get("qSupported_response")
939        self.assertIsNotNone(raw_response)
940
941        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
942        # +,-,? is stripped from the key and set as the value.
943        supported_dict = {}
944        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
945            key = match.group(1)
946            val = match.group(3)
947
948            # key=val: store as is
949            if val and len(val) > 0:
950                supported_dict[key] = val
951            else:
952                if len(key) < 2:
953                    raise Exception(
954                        "singular stub feature is too short: must be stub_feature{+,-,?}")
955                supported_type = key[-1]
956                key = key[:-1]
957                if not supported_type in ["+", "-", "?"]:
958                    raise Exception(
959                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
960                supported_dict[key] = supported_type
961            # Ensure we know the supported element
962            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
963                raise Exception(
964                    "unknown qSupported stub feature reported: %s" %
965                    key)
966
967        return supported_dict
968
969    def run_process_then_stop(self, run_seconds=1):
970        # Tell the stub to continue.
971        self.test_sequence.add_log_lines(
972            ["read packet: $vCont;c#a8"],
973            True)
974        context = self.expect_gdbremote_sequence()
975
976        # Wait for run_seconds.
977        time.sleep(run_seconds)
978
979        # Send an interrupt, capture a T response.
980        self.reset_test_sequence()
981        self.test_sequence.add_log_lines(
982            ["read packet: {}".format(chr(3)),
983             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
984            True)
985        context = self.expect_gdbremote_sequence()
986        self.assertIsNotNone(context)
987        self.assertIsNotNone(context.get("stop_result"))
988
989        return context
990
991    def select_modifiable_register(self, reg_infos):
992        """Find a register that can be read/written freely."""
993        PREFERRED_REGISTER_NAMES = set(["rax", ])
994
995        # First check for the first register from the preferred register name
996        # set.
997        alternative_register_index = None
998
999        self.assertIsNotNone(reg_infos)
1000        for reg_info in reg_infos:
1001            if ("name" in reg_info) and (
1002                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
1003                # We found a preferred register.  Use it.
1004                return reg_info["lldb_register_index"]
1005            if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
1006                # A frame pointer register will do as a register to modify
1007                # temporarily.
1008                alternative_register_index = reg_info["lldb_register_index"]
1009
1010        # We didn't find a preferred register.  Return whatever alternative register
1011        # we found, if any.
1012        return alternative_register_index
1013
1014    def extract_registers_from_stop_notification(self, stop_key_vals_text):
1015        self.assertIsNotNone(stop_key_vals_text)
1016        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
1017
1018        registers = {}
1019        for (key, val) in list(kv_dict.items()):
1020            if re.match(r"^[0-9a-fA-F]+$", key):
1021                registers[int(key, 16)] = val
1022        return registers
1023
1024    def gather_register_infos(self):
1025        self.reset_test_sequence()
1026        self.add_register_info_collection_packets()
1027
1028        context = self.expect_gdbremote_sequence()
1029        self.assertIsNotNone(context)
1030
1031        reg_infos = self.parse_register_info_packets(context)
1032        self.assertIsNotNone(reg_infos)
1033        self.add_lldb_register_index(reg_infos)
1034
1035        return reg_infos
1036
1037    def find_generic_register_with_name(self, reg_infos, generic_name):
1038        self.assertIsNotNone(reg_infos)
1039        for reg_info in reg_infos:
1040            if ("generic" in reg_info) and (
1041                    reg_info["generic"] == generic_name):
1042                return reg_info
1043        return None
1044
1045    def decode_gdbremote_binary(self, encoded_bytes):
1046        decoded_bytes = ""
1047        i = 0
1048        while i < len(encoded_bytes):
1049            if encoded_bytes[i] == "}":
1050                # Handle escaped char.
1051                self.assertTrue(i + 1 < len(encoded_bytes))
1052                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1053                i += 2
1054            elif encoded_bytes[i] == "*":
1055                # Handle run length encoding.
1056                self.assertTrue(len(decoded_bytes) > 0)
1057                self.assertTrue(i + 1 < len(encoded_bytes))
1058                repeat_count = ord(encoded_bytes[i + 1]) - 29
1059                decoded_bytes += decoded_bytes[-1] * repeat_count
1060                i += 2
1061            else:
1062                decoded_bytes += encoded_bytes[i]
1063                i += 1
1064        return decoded_bytes
1065
1066    def build_auxv_dict(self, endian, word_size, auxv_data):
1067        self.assertIsNotNone(endian)
1068        self.assertIsNotNone(word_size)
1069        self.assertIsNotNone(auxv_data)
1070
1071        auxv_dict = {}
1072
1073        while len(auxv_data) > 0:
1074            # Chop off key.
1075            raw_key = auxv_data[:word_size]
1076            auxv_data = auxv_data[word_size:]
1077
1078            # Chop of value.
1079            raw_value = auxv_data[:word_size]
1080            auxv_data = auxv_data[word_size:]
1081
1082            # Convert raw text from target endian.
1083            key = unpack_endian_binary_string(endian, raw_key)
1084            value = unpack_endian_binary_string(endian, raw_value)
1085
1086            # Handle ending entry.
1087            if key == 0:
1088                self.assertEqual(value, 0)
1089                return auxv_dict
1090
1091            # The key should not already be present.
1092            self.assertFalse(key in auxv_dict)
1093            auxv_dict[key] = value
1094
1095        self.fail(
1096            "should not reach here - implies required double zero entry not found")
1097        return auxv_dict
1098
1099    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
1100        """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1101        offset = 0
1102        done = False
1103        decoded_data = ""
1104
1105        while not done:
1106            # Grab the next iteration of data.
1107            self.reset_test_sequence()
1108            self.test_sequence.add_log_lines(
1109                [
1110                    "read packet: ${}{:x},{:x}:#00".format(
1111                        command_prefix,
1112                        offset,
1113                        chunk_length),
1114                    {
1115                        "direction": "send",
1116                        "regex": re.compile(
1117                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1118                            re.MULTILINE | re.DOTALL),
1119                        "capture": {
1120                            1: "response_type",
1121                            2: "content_raw"}}],
1122                True)
1123
1124            context = self.expect_gdbremote_sequence()
1125            self.assertIsNotNone(context)
1126
1127            response_type = context.get("response_type")
1128            self.assertIsNotNone(response_type)
1129            self.assertTrue(response_type in ["l", "m"])
1130
1131            # Move offset along.
1132            offset += chunk_length
1133
1134            # Figure out if we're done.  We're done if the response type is l.
1135            done = response_type == "l"
1136
1137            # Decode binary data.
1138            content_raw = context.get("content_raw")
1139            if content_raw and len(content_raw) > 0:
1140                self.assertIsNotNone(content_raw)
1141                decoded_data += self.decode_gdbremote_binary(content_raw)
1142        return decoded_data
1143
1144    def add_interrupt_packets(self):
1145        self.test_sequence.add_log_lines([
1146            # Send the intterupt.
1147            "read packet: {}".format(chr(3)),
1148            # And wait for the stop notification.
1149            {"direction": "send",
1150             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1151             "capture": {1: "stop_signo",
1152                         2: "stop_key_val_text"}},
1153        ], True)
1154
1155    def parse_interrupt_packets(self, context):
1156        self.assertIsNotNone(context.get("stop_signo"))
1157        self.assertIsNotNone(context.get("stop_key_val_text"))
1158        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1159            context["stop_key_val_text"]))
1160
1161    def add_QSaveRegisterState_packets(self, thread_id):
1162        if thread_id:
1163            # Use the thread suffix form.
1164            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1165                thread_id)
1166        else:
1167            request = "read packet: $QSaveRegisterState#00"
1168
1169        self.test_sequence.add_log_lines([request,
1170                                          {"direction": "send",
1171                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1172                                           "capture": {1: "save_response"}},
1173                                          ],
1174                                         True)
1175
1176    def parse_QSaveRegisterState_response(self, context):
1177        self.assertIsNotNone(context)
1178
1179        save_response = context.get("save_response")
1180        self.assertIsNotNone(save_response)
1181
1182        if len(save_response) < 1 or save_response[0] == "E":
1183            # error received
1184            return (False, None)
1185        else:
1186            return (True, int(save_response))
1187
1188    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1189        if thread_id:
1190            # Use the thread suffix form.
1191            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1192                save_id, thread_id)
1193        else:
1194            request = "read packet: $QRestoreRegisterState:{}#00".format(
1195                save_id)
1196
1197        self.test_sequence.add_log_lines([
1198            request,
1199            "send packet: $OK#00"
1200        ], True)
1201
1202    def flip_all_bits_in_each_register_value(
1203            self, reg_infos, endian, thread_id=None):
1204        self.assertIsNotNone(reg_infos)
1205
1206        successful_writes = 0
1207        failed_writes = 0
1208
1209        for reg_info in reg_infos:
1210            # Use the lldb register index added to the reg info.  We're not necessarily
1211            # working off a full set of register infos, so an inferred register
1212            # index could be wrong.
1213            reg_index = reg_info["lldb_register_index"]
1214            self.assertIsNotNone(reg_index)
1215
1216            reg_byte_size = int(reg_info["bitsize"]) / 8
1217            self.assertTrue(reg_byte_size > 0)
1218
1219            # Handle thread suffix.
1220            if thread_id:
1221                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1222                    reg_index, thread_id)
1223            else:
1224                p_request = "read packet: $p{:x}#00".format(reg_index)
1225
1226            # Read the existing value.
1227            self.reset_test_sequence()
1228            self.test_sequence.add_log_lines([
1229                p_request,
1230                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1231            ], True)
1232            context = self.expect_gdbremote_sequence()
1233            self.assertIsNotNone(context)
1234
1235            # Verify the response length.
1236            p_response = context.get("p_response")
1237            self.assertIsNotNone(p_response)
1238            initial_reg_value = unpack_register_hex_unsigned(
1239                endian, p_response)
1240
1241            # Flip the value by xoring with all 1s
1242            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
1243            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1244            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1245
1246            # Handle thread suffix for P.
1247            if thread_id:
1248                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
1249                    reg_index, pack_register_hex(
1250                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
1251            else:
1252                P_request = "read packet: $P{:x}={}#00".format(
1253                    reg_index, pack_register_hex(
1254                        endian, flipped_bits_int, byte_size=reg_byte_size))
1255
1256            # Write the flipped value to the register.
1257            self.reset_test_sequence()
1258            self.test_sequence.add_log_lines([P_request,
1259                                              {"direction": "send",
1260                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
1261                                               "capture": {1: "P_response"}},
1262                                              ],
1263                                             True)
1264            context = self.expect_gdbremote_sequence()
1265            self.assertIsNotNone(context)
1266
1267            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
1268            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
1269            # all flipping perfectly.
1270            P_response = context.get("P_response")
1271            self.assertIsNotNone(P_response)
1272            if P_response == "OK":
1273                successful_writes += 1
1274            else:
1275                failed_writes += 1
1276                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1277
1278            # Read back the register value, ensure it matches the flipped
1279            # value.
1280            if P_response == "OK":
1281                self.reset_test_sequence()
1282                self.test_sequence.add_log_lines([
1283                    p_request,
1284                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1285                ], True)
1286                context = self.expect_gdbremote_sequence()
1287                self.assertIsNotNone(context)
1288
1289                verify_p_response_raw = context.get("p_response")
1290                self.assertIsNotNone(verify_p_response_raw)
1291                verify_bits = unpack_register_hex_unsigned(
1292                    endian, verify_p_response_raw)
1293
1294                if verify_bits != flipped_bits_int:
1295                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
1296                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1297                    successful_writes -= 1
1298                    failed_writes += 1
1299
1300        return (successful_writes, failed_writes)
1301
1302    def is_bit_flippable_register(self, reg_info):
1303        if not reg_info:
1304            return False
1305        if not "set" in reg_info:
1306            return False
1307        if reg_info["set"] != "General Purpose Registers":
1308            return False
1309        if ("container-regs" in reg_info) and (
1310                len(reg_info["container-regs"]) > 0):
1311            # Don't try to bit flip registers contained in another register.
1312            return False
1313        if re.match("^.s$", reg_info["name"]):
1314            # This is a 2-letter register name that ends in "s", like a segment register.
1315            # Don't try to bit flip these.
1316            return False
1317        if re.match("^(c|)psr$", reg_info["name"]):
1318            # This is an ARM program status register; don't flip it.
1319            return False
1320        # Okay, this looks fine-enough.
1321        return True
1322
1323    def read_register_values(self, reg_infos, endian, thread_id=None):
1324        self.assertIsNotNone(reg_infos)
1325        values = {}
1326
1327        for reg_info in reg_infos:
1328            # We append a register index when load reg infos so we can work
1329            # with subsets.
1330            reg_index = reg_info.get("lldb_register_index")
1331            self.assertIsNotNone(reg_index)
1332
1333            # Handle thread suffix.
1334            if thread_id:
1335                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1336                    reg_index, thread_id)
1337            else:
1338                p_request = "read packet: $p{:x}#00".format(reg_index)
1339
1340            # Read it with p.
1341            self.reset_test_sequence()
1342            self.test_sequence.add_log_lines([
1343                p_request,
1344                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1345            ], True)
1346            context = self.expect_gdbremote_sequence()
1347            self.assertIsNotNone(context)
1348
1349            # Convert value from target endian to integral.
1350            p_response = context.get("p_response")
1351            self.assertIsNotNone(p_response)
1352            self.assertTrue(len(p_response) > 0)
1353            self.assertFalse(p_response[0] == "E")
1354
1355            values[reg_index] = unpack_register_hex_unsigned(
1356                endian, p_response)
1357
1358        return values
1359
1360    def add_vCont_query_packets(self):
1361        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1362                                          {"direction": "send",
1363                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1364                                           "capture": {2: "vCont_query_response"}},
1365                                          ],
1366                                         True)
1367
1368    def parse_vCont_query_response(self, context):
1369        self.assertIsNotNone(context)
1370        vCont_query_response = context.get("vCont_query_response")
1371
1372        # Handle case of no vCont support at all - in which case the capture
1373        # group will be none or zero length.
1374        if not vCont_query_response or len(vCont_query_response) == 0:
1375            return {}
1376
1377        return {key: 1 for key in vCont_query_response.split(
1378            ";") if key and len(key) > 0}
1379
1380    def count_single_steps_until_true(
1381            self,
1382            thread_id,
1383            predicate,
1384            args,
1385            max_step_count=100,
1386            use_Hc_packet=True,
1387            step_instruction="s"):
1388        """Used by single step test that appears in a few different contexts."""
1389        single_step_count = 0
1390
1391        while single_step_count < max_step_count:
1392            self.assertIsNotNone(thread_id)
1393
1394            # Build the packet for the single step instruction.  We replace
1395            # {thread}, if present, with the thread_id.
1396            step_packet = "read packet: ${}#00".format(
1397                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
1398            # print("\nstep_packet created: {}\n".format(step_packet))
1399
1400            # Single step.
1401            self.reset_test_sequence()
1402            if use_Hc_packet:
1403                self.test_sequence.add_log_lines(
1404                    [  # Set the continue thread.
1405                        "read packet: $Hc{0:x}#00".format(thread_id),
1406                        "send packet: $OK#00",
1407                    ], True)
1408            self.test_sequence.add_log_lines([
1409                # Single step.
1410                step_packet,
1411                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
1412                # Expect a breakpoint stop report.
1413                {"direction": "send",
1414                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1415                 "capture": {1: "stop_signo",
1416                             2: "stop_thread_id"}},
1417            ], True)
1418            context = self.expect_gdbremote_sequence()
1419            self.assertIsNotNone(context)
1420            self.assertIsNotNone(context.get("stop_signo"))
1421            self.assertEqual(int(context.get("stop_signo"), 16),
1422                             lldbutil.get_signal_number('SIGTRAP'))
1423
1424            single_step_count += 1
1425
1426            # See if the predicate is true.  If so, we're done.
1427            if predicate(args):
1428                return (True, single_step_count)
1429
1430        # The predicate didn't return true within the runaway step count.
1431        return (False, single_step_count)
1432
1433    def g_c1_c2_contents_are(self, args):
1434        """Used by single step test that appears in a few different contexts."""
1435        g_c1_address = args["g_c1_address"]
1436        g_c2_address = args["g_c2_address"]
1437        expected_g_c1 = args["expected_g_c1"]
1438        expected_g_c2 = args["expected_g_c2"]
1439
1440        # Read g_c1 and g_c2 contents.
1441        self.reset_test_sequence()
1442        self.test_sequence.add_log_lines(
1443            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1444             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
1445             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1446             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
1447            True)
1448
1449        # Run the packet stream.
1450        context = self.expect_gdbremote_sequence()
1451        self.assertIsNotNone(context)
1452
1453        # Check if what we read from inferior memory is what we are expecting.
1454        self.assertIsNotNone(context.get("g_c1_contents"))
1455        self.assertIsNotNone(context.get("g_c2_contents"))
1456
1457        return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (
1458            context.get("g_c2_contents").decode("hex") == expected_g_c2)
1459
1460    def single_step_only_steps_one_instruction(
1461            self, use_Hc_packet=True, step_instruction="s"):
1462        """Used by single step test that appears in a few different contexts."""
1463        # Start up the inferior.
1464        procs = self.prep_debug_monitor_and_inferior(
1465            inferior_args=[
1466                "get-code-address-hex:swap_chars",
1467                "get-data-address-hex:g_c1",
1468                "get-data-address-hex:g_c2",
1469                "sleep:1",
1470                "call-function:swap_chars",
1471                "sleep:5"])
1472
1473        # Run the process
1474        self.test_sequence.add_log_lines(
1475            [  # Start running after initial stop.
1476                "read packet: $c#63",
1477                # Match output line that prints the memory address of the function call entry point.
1478                # Note we require launch-only testing so we can get inferior otuput.
1479                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1480                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1481                # Now stop the inferior.
1482                "read packet: {}".format(chr(3)),
1483                # And wait for the stop notification.
1484                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1485            True)
1486
1487        # Run the packet stream.
1488        context = self.expect_gdbremote_sequence()
1489        self.assertIsNotNone(context)
1490
1491        # Grab the main thread id.
1492        self.assertIsNotNone(context.get("stop_thread_id"))
1493        main_thread_id = int(context.get("stop_thread_id"), 16)
1494
1495        # Grab the function address.
1496        self.assertIsNotNone(context.get("function_address"))
1497        function_address = int(context.get("function_address"), 16)
1498
1499        # Grab the data addresses.
1500        self.assertIsNotNone(context.get("g_c1_address"))
1501        g_c1_address = int(context.get("g_c1_address"), 16)
1502
1503        self.assertIsNotNone(context.get("g_c2_address"))
1504        g_c2_address = int(context.get("g_c2_address"), 16)
1505
1506        # Set a breakpoint at the given address.
1507        if self.getArchitecture() == "arm":
1508            # TODO: Handle case when setting breakpoint in thumb code
1509            BREAKPOINT_KIND = 4
1510        else:
1511            BREAKPOINT_KIND = 1
1512        self.reset_test_sequence()
1513        self.add_set_breakpoint_packets(
1514            function_address,
1515            do_continue=True,
1516            breakpoint_kind=BREAKPOINT_KIND)
1517        context = self.expect_gdbremote_sequence()
1518        self.assertIsNotNone(context)
1519
1520        # Remove the breakpoint.
1521        self.reset_test_sequence()
1522        self.add_remove_breakpoint_packets(
1523            function_address, breakpoint_kind=BREAKPOINT_KIND)
1524        context = self.expect_gdbremote_sequence()
1525        self.assertIsNotNone(context)
1526
1527        # Verify g_c1 and g_c2 match expected initial state.
1528        args = {}
1529        args["g_c1_address"] = g_c1_address
1530        args["g_c2_address"] = g_c2_address
1531        args["expected_g_c1"] = "0"
1532        args["expected_g_c2"] = "1"
1533
1534        self.assertTrue(self.g_c1_c2_contents_are(args))
1535
1536        # Verify we take only a small number of steps to hit the first state.
1537        # Might need to work through function entry prologue code.
1538        args["expected_g_c1"] = "1"
1539        args["expected_g_c2"] = "1"
1540        (state_reached,
1541         step_count) = self.count_single_steps_until_true(main_thread_id,
1542                                                          self.g_c1_c2_contents_are,
1543                                                          args,
1544                                                          max_step_count=25,
1545                                                          use_Hc_packet=use_Hc_packet,
1546                                                          step_instruction=step_instruction)
1547        self.assertTrue(state_reached)
1548
1549        # Verify we hit the next state.
1550        args["expected_g_c1"] = "1"
1551        args["expected_g_c2"] = "0"
1552        (state_reached,
1553         step_count) = self.count_single_steps_until_true(main_thread_id,
1554                                                          self.g_c1_c2_contents_are,
1555                                                          args,
1556                                                          max_step_count=5,
1557                                                          use_Hc_packet=use_Hc_packet,
1558                                                          step_instruction=step_instruction)
1559        self.assertTrue(state_reached)
1560        expected_step_count = 1
1561        arch = self.getArchitecture()
1562
        # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update the
        # variable value.
1565        if re.match("mips", arch):
1566            expected_step_count = 3
        # S390X requires 2 machine instructions (LARL, MVI) to update the
        # variable value.
1569        if re.match("s390x", arch):
1570            expected_step_count = 2
1571        self.assertEqual(step_count, expected_step_count)
1572
1573        # Verify we hit the next state.
1574        args["expected_g_c1"] = "0"
1575        args["expected_g_c2"] = "0"
1576        (state_reached,
1577         step_count) = self.count_single_steps_until_true(main_thread_id,
1578                                                          self.g_c1_c2_contents_are,
1579                                                          args,
1580                                                          max_step_count=5,
1581                                                          use_Hc_packet=use_Hc_packet,
1582                                                          step_instruction=step_instruction)
1583        self.assertTrue(state_reached)
1584        self.assertEqual(step_count, expected_step_count)
1585
1586        # Verify we hit the next state.
1587        args["expected_g_c1"] = "0"
1588        args["expected_g_c2"] = "1"
1589        (state_reached,
1590         step_count) = self.count_single_steps_until_true(main_thread_id,
1591                                                          self.g_c1_c2_contents_are,
1592                                                          args,
1593                                                          max_step_count=5,
1594                                                          use_Hc_packet=use_Hc_packet,
1595                                                          step_instruction=step_instruction)
1596        self.assertTrue(state_reached)
1597        self.assertEqual(step_count, expected_step_count)
1598
1599    def maybe_strict_output_regex(self, regex):
1600        return '.*' + regex + \
1601            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1602
1603    def install_and_create_launch_args(self):
1604        exe_path = os.path.abspath('a.out')
1605        if not lldb.remote_platform:
1606            return [exe_path]
1607        remote_path = lldbutil.append_to_process_working_directory(
1608            os.path.basename(exe_path))
1609        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1610        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1611                                           remote_file_spec)
1612        if err.Fail():
1613            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1614                            (exe_path, remote_path, err))
1615        return [remote_path]
1616