1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import binascii
14import itertools
15import struct
16
17import unittest2
18import gdbremote_testcase
19import lldbgdbserverutils
20from lldbsuite.support import seven
21from lldbsuite.test.decorators import *
22from lldbsuite.test.lldbtest import *
23from lldbsuite.test.lldbdwarf import *
24from lldbsuite.test import lldbutil, lldbplatformutil
25
26
27class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
28
29    mydir = TestBase.compute_mydir(__file__)
30
31    def test_thread_suffix_supported(self):
32        server = self.connect_to_debug_monitor()
33        self.assertIsNotNone(server)
34
35        self.do_handshake()
36        self.test_sequence.add_log_lines(
37            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
38             "lldb-server <   6> send packet: $OK#9a"],
39            True)
40
41        self.expect_gdbremote_sequence()
42
43
44    def test_list_threads_in_stop_reply_supported(self):
45        server = self.connect_to_debug_monitor()
46        self.assertIsNotNone(server)
47
48        self.do_handshake()
49        self.test_sequence.add_log_lines(
50            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
51             "lldb-server <   6> send packet: $OK#9a"],
52            True)
53        self.expect_gdbremote_sequence()
54
55    def test_c_packet_works(self):
56        self.build()
57        procs = self.prep_debug_monitor_and_inferior()
58        self.test_sequence.add_log_lines(
59            ["read packet: $c#63",
60             "send packet: $W00#00"],
61            True)
62
63        self.expect_gdbremote_sequence()
64
65    @skipIfWindows # No pty support to test any inferior output
66    def test_inferior_print_exit(self):
67        self.build()
68        procs = self.prep_debug_monitor_and_inferior(
69                inferior_args=["hello, world"])
70        self.test_sequence.add_log_lines(
71            ["read packet: $vCont;c#a8",
72             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
73             "send packet: $W00#00"],
74            True)
75
76        context = self.expect_gdbremote_sequence()
77        self.assertIsNotNone(context)
78
79    def test_first_launch_stop_reply_thread_matches_first_qC(self):
80        self.build()
81        procs = self.prep_debug_monitor_and_inferior()
82        self.test_sequence.add_log_lines(["read packet: $qC#00",
83                                          {"direction": "send",
84                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
85                                           "capture": {1: "thread_id_QC"}},
86                                          "read packet: $?#00",
87                                          {"direction": "send",
88                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
89                                              "capture": {1: "thread_id_?"}}],
90                                         True)
91        context = self.expect_gdbremote_sequence()
92        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))
93
94    def test_attach_commandline_continue_app_exits(self):
95        self.build()
96        self.set_inferior_startup_attach()
97        procs = self.prep_debug_monitor_and_inferior()
98        self.test_sequence.add_log_lines(
99            ["read packet: $vCont;c#a8",
100             "send packet: $W00#00"],
101            True)
102        self.expect_gdbremote_sequence()
103
104        # Wait a moment for completed and now-detached inferior process to
105        # clear.
106        time.sleep(1)
107
108        if not lldb.remote_platform:
109            # Process should be dead now. Reap results.
110            poll_result = procs["inferior"].poll()
111            self.assertIsNotNone(poll_result)
112
113        # Where possible, verify at the system level that the process is not
114        # running.
115        self.assertFalse(
116            lldbgdbserverutils.process_is_running(
117                procs["inferior"].pid, False))
118
119    def test_qRegisterInfo_returns_one_valid_result(self):
120        self.build()
121        self.prep_debug_monitor_and_inferior()
122        self.test_sequence.add_log_lines(
123            ["read packet: $qRegisterInfo0#00",
124             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
125            True)
126
127        # Run the stream
128        context = self.expect_gdbremote_sequence()
129        self.assertIsNotNone(context)
130
131        reg_info_packet = context.get("reginfo_0")
132        self.assertIsNotNone(reg_info_packet)
133        self.assert_valid_reg_info(
134            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
135
136    def test_qRegisterInfo_returns_all_valid_results(self):
137        self.build()
138        self.prep_debug_monitor_and_inferior()
139        self.add_register_info_collection_packets()
140
141        # Run the stream.
142        context = self.expect_gdbremote_sequence()
143        self.assertIsNotNone(context)
144
145        # Validate that each register info returned validates.
146        for reg_info in self.parse_register_info_packets(context):
147            self.assert_valid_reg_info(reg_info)
148
149    def test_qRegisterInfo_contains_required_generics_debugserver(self):
150        self.build()
151        self.prep_debug_monitor_and_inferior()
152        self.add_register_info_collection_packets()
153
154        # Run the packet stream.
155        context = self.expect_gdbremote_sequence()
156        self.assertIsNotNone(context)
157
158        # Gather register info entries.
159        reg_infos = self.parse_register_info_packets(context)
160
161        # Collect all generic registers found.
162        generic_regs = {
163            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
164
165        # Ensure we have a program counter register.
166        self.assertIn('pc', generic_regs)
167
168        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
169        if self.getArchitecture() != 'powerpc64le':
170            self.assertIn('fp', generic_regs)
171
172        # Ensure we have a stack pointer register.
173        self.assertIn('sp', generic_regs)
174
175        # Ensure we have a flags register.
176        self.assertIn('flags', generic_regs)
177
178    def test_qRegisterInfo_contains_at_least_one_register_set(self):
179        self.build()
180        self.prep_debug_monitor_and_inferior()
181        self.add_register_info_collection_packets()
182
183        # Run the packet stream.
184        context = self.expect_gdbremote_sequence()
185        self.assertIsNotNone(context)
186
187        # Gather register info entries.
188        reg_infos = self.parse_register_info_packets(context)
189
190        # Collect all register sets found.
191        register_sets = {
192            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
193        self.assertTrue(len(register_sets) >= 1)
194
195    def targetHasAVX(self):
196        triple = self.dbg.GetSelectedPlatform().GetTriple()
197
198        # TODO other platforms, please implement this function
199        if not re.match(".*-.*-linux", triple):
200            return True
201
202        # Need to do something different for non-Linux/Android targets
203        if lldb.remote_platform:
204            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
205            cpuinfo_path = "cpuinfo"
206            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
207        else:
208            cpuinfo_path = "/proc/cpuinfo"
209
210        f = open(cpuinfo_path, 'r')
211        cpuinfo = f.read()
212        f.close()
213        return " avx " in cpuinfo
214
215    @expectedFailureAll(oslist=["windows"]) # no avx for now.
216    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
217    @add_test_categories(["llgs"])
218    def test_qRegisterInfo_contains_avx_registers(self):
219        self.build()
220        self.prep_debug_monitor_and_inferior()
221        self.add_register_info_collection_packets()
222
223        # Run the packet stream.
224        context = self.expect_gdbremote_sequence()
225        self.assertIsNotNone(context)
226
227        # Gather register info entries.
228        reg_infos = self.parse_register_info_packets(context)
229
230        # Collect all generics found.
231        register_sets = {
232            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
233        self.assertEqual(
234            self.targetHasAVX(),
235            "Advanced Vector Extensions" in register_sets)
236
237    def qThreadInfo_contains_thread(self):
238        procs = self.prep_debug_monitor_and_inferior()
239        self.add_threadinfo_collection_packets()
240
241        # Run the packet stream.
242        context = self.expect_gdbremote_sequence()
243        self.assertIsNotNone(context)
244
245        # Gather threadinfo entries.
246        threads = self.parse_threadinfo_packets(context)
247        self.assertIsNotNone(threads)
248
249        # We should have exactly one thread.
250        self.assertEqual(len(threads), 1)
251
252    def test_qThreadInfo_contains_thread_launch(self):
253        self.build()
254        self.set_inferior_startup_launch()
255        self.qThreadInfo_contains_thread()
256
257    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
258    def test_qThreadInfo_contains_thread_attach(self):
259        self.build()
260        self.set_inferior_startup_attach()
261        self.qThreadInfo_contains_thread()
262
263    def qThreadInfo_matches_qC(self):
264        procs = self.prep_debug_monitor_and_inferior()
265
266        self.add_threadinfo_collection_packets()
267        self.test_sequence.add_log_lines(
268            ["read packet: $qC#00",
269             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
270             ], True)
271
272        # Run the packet stream.
273        context = self.expect_gdbremote_sequence()
274        self.assertIsNotNone(context)
275
276        # Gather threadinfo entries.
277        threads = self.parse_threadinfo_packets(context)
278        self.assertIsNotNone(threads)
279
280        # We should have exactly one thread from threadinfo.
281        self.assertEqual(len(threads), 1)
282
283        # We should have a valid thread_id from $QC.
284        QC_thread_id_hex = context.get("thread_id")
285        self.assertIsNotNone(QC_thread_id_hex)
286        QC_thread_id = int(QC_thread_id_hex, 16)
287
288        # Those two should be the same.
289        self.assertEqual(threads[0], QC_thread_id)
290
291    def test_qThreadInfo_matches_qC_launch(self):
292        self.build()
293        self.set_inferior_startup_launch()
294        self.qThreadInfo_matches_qC()
295
296    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
297    def test_qThreadInfo_matches_qC_attach(self):
298        self.build()
299        self.set_inferior_startup_attach()
300        self.qThreadInfo_matches_qC()
301
302    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
303        self.build()
304        self.set_inferior_startup_launch()
305        procs = self.prep_debug_monitor_and_inferior()
306        self.add_register_info_collection_packets()
307
308        # Run the packet stream.
309        context = self.expect_gdbremote_sequence()
310        self.assertIsNotNone(context)
311
312        # Gather register info entries.
313        reg_infos = self.parse_register_info_packets(context)
314        self.assertIsNotNone(reg_infos)
315        self.assertTrue(len(reg_infos) > 0)
316
317        byte_order = self.get_target_byte_order()
318
319        # Read value for each register.
320        reg_index = 0
321        for reg_info in reg_infos:
322            # Skip registers that don't have a register set.  For x86, these are
323            # the DRx registers, which have no LLDB-kind register number and thus
324            # cannot be read via normal
325            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
326            if not "set" in reg_info:
327                continue
328
329            # Clear existing packet expectations.
330            self.reset_test_sequence()
331
332            # Run the register query
333            self.test_sequence.add_log_lines(
334                ["read packet: $p{0:x}#00".format(reg_index),
335                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
336                True)
337            context = self.expect_gdbremote_sequence()
338            self.assertIsNotNone(context)
339
340            # Verify the response length.
341            p_response = context.get("p_response")
342            self.assertIsNotNone(p_response)
343
344            # Skip erraneous (unsupported) registers.
345            # TODO: remove this once we make unsupported registers disappear.
346            if p_response.startswith("E") and len(p_response) == 3:
347                continue
348
349            if "dynamic_size_dwarf_expr_bytes" in reg_info:
350                self.updateRegInfoBitsize(reg_info, byte_order)
351            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
352                             reg_info)
353
354            # Increment loop
355            reg_index += 1
356
357    def Hg_switches_to_3_threads(self, pass_pid=False):
358        _, threads = self.launch_with_threads(3)
359
360        pid_str = ""
361        if pass_pid:
362            pid_str = "p{0:x}.".format(procs["inferior"].pid)
363
364        # verify we can $H to each thead, and $qC matches the thread we set.
365        for thread in threads:
366            # Change to each thread, verify current thread id.
367            self.reset_test_sequence()
368            self.test_sequence.add_log_lines(
369                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
370                 "send packet: $OK#00",
371                 "read packet: $qC#00",
372                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
373                True)
374
375            context = self.expect_gdbremote_sequence()
376            self.assertIsNotNone(context)
377
378            # Verify the thread id.
379            self.assertIsNotNone(context.get("thread_id"))
380            self.assertEqual(int(context.get("thread_id"), 16), thread)
381
382    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
383    def test_Hg_switches_to_3_threads_launch(self):
384        self.build()
385        self.set_inferior_startup_launch()
386        self.Hg_switches_to_3_threads()
387
388    def Hg_fails_on_pid(self, pass_pid):
389        _, threads = self.launch_with_threads(2)
390
391        if pass_pid == -1:
392            pid_str = "p-1."
393        else:
394            pid_str = "p{0:x}.".format(pass_pid)
395        thread = threads[1]
396
397        self.test_sequence.add_log_lines(
398            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
399             "send packet: $Eff#00"],
400            True)
401
402        self.expect_gdbremote_sequence()
403
404    @add_test_categories(["llgs"])
405    def test_Hg_fails_on_another_pid(self):
406        self.build()
407        self.set_inferior_startup_launch()
408        self.Hg_fails_on_pid(1)
409
410    @add_test_categories(["llgs"])
411    def test_Hg_fails_on_zero_pid(self):
412        self.build()
413        self.set_inferior_startup_launch()
414        self.Hg_fails_on_pid(0)
415
416    @add_test_categories(["llgs"])
417    def test_Hg_fails_on_minus_one_pid(self):
418        self.build()
419        self.set_inferior_startup_launch()
420        self.Hg_fails_on_pid(-1)
421
    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
        """Check that $Hc followed by $C delivers a signal to the chosen thread.

        The inferior is started with NUM_THREADS - 1 "thread:new" arguments
        after a "thread:segfault" argument.  For each expected stop, the test
        waits for a stop reply whose signal number equals segfault_signo,
        selects the reported thread with $Hc, continues it with $C carrying
        SIGUSR1, and then matches the inferior's output to confirm which
        thread handled the signal.

        segfault_signo: the signal number expected in the stop reply when a
            thread faults.
        """
        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
        # and the test requires getting stdout from the exe.

        NUM_THREADS = 3

        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
        # inferior_args=["thread:print-ids"]
        inferior_args = ["thread:segfault"]
        for i in range(NUM_THREADS - 1):
            # if i > 0:
                # Give time between thread creation/segfaulting for the handler to work.
                # inferior_args.append("sleep:1")
            inferior_args.append("thread:new")
        inferior_args.append("sleep:10")

        # Launch/attach.  (In our case, this should only ever be launched since
        # we need inferior stdout/stderr).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=inferior_args)
        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
        context = self.expect_gdbremote_sequence()

        # Both dicts are used as sets: thread ids seen in stop replies, and
        # thread ids the inferior printed from its signal handler.
        signaled_tids = {}
        print_thread_ids = {}

        # Switch to each thread, deliver a signal, and verify signal delivery
        for i in range(NUM_THREADS - 1):
            # Run until SIGSEGV comes in.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([{"direction": "send",
                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                                               "capture": {1: "signo",
                                                            2: "thread_id"}}],
                                             True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            signo = context.get("signo")
            self.assertEqual(int(signo, 16), segfault_signo)

            # Ensure we haven't seen this tid yet.
            thread_id = int(context.get("thread_id"), 16)
            self.assertNotIn(thread_id, signaled_tids)
            signaled_tids[thread_id] = 1

            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    # Set the continue thread.
                    # Set current thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",

                    # Continue sending the signal number to the continue thread.
                    # The commented out packet is a way to do this same operation without using
                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),

                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently it's an XFAIL.
                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                    #  "read packet: $c#63",
                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
                ],
                True)

            # Run the sequence.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Disabled together with the stop-reply expectation in the FIXME
            # above:
            # Ensure the stop signal is the signal we delivered.
            # stop_signo = context.get("stop_signo")
            # self.assertIsNotNone(stop_signo)
            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))

            # Ensure the stop thread is the thread to which we delivered the signal.
            # stop_thread_id = context.get("stop_thread_id")
            # self.assertIsNotNone(stop_thread_id)
            # self.assertEquals(int(stop_thread_id,16), thread_id)

            # Ensure we haven't seen this thread id yet.  The inferior's
            # self-obtained thread ids are not guaranteed to match the stub
            # tids (at least on MacOSX).
            print_thread_id = context.get("print_thread_id")
            self.assertIsNotNone(print_thread_id)
            print_thread_id = int(print_thread_id, 16)
            self.assertNotIn(print_thread_id, print_thread_ids)

            # Now remember this print (i.e. inferior-reflected) thread id and
            # ensure we don't hit it again.
            print_thread_ids[print_thread_id] = 1

            # Ensure the thread id printed after the signal handler returned
            # ("past SIGSEGV") matches the thread id the handler itself
            # printed.
            post_handle_thread_id = context.get("post_handle_thread_id")
            self.assertIsNotNone(post_handle_thread_id)
            post_handle_thread_id = int(post_handle_thread_id, 16)
            self.assertEqual(post_handle_thread_id, print_thread_id)
525
526    @expectedFailureDarwin
527    @skipIfWindows # no SIGSEGV support
528    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
529    @expectedFailureNetBSD
530    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
531        self.build()
532        self.set_inferior_startup_launch()
533
534        if self.platformIsDarwin():
535            # Darwin debugserver translates some signals like SIGSEGV into some gdb
536            # expectations about fixed signal numbers.
537            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
538        else:
539            self.Hc_then_Csignal_signals_correct_thread(
540                lldbutil.get_signal_number('SIGSEGV'))
541
542    @skipIfWindows # No pty support to test any inferior output
543    def test_m_packet_reads_memory(self):
544        self.build()
545        self.set_inferior_startup_launch()
546        # This is the memory we will write into the inferior and then ensure we
547        # can read back with $m.
548        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
549
550        # Start up the inferior.
551        procs = self.prep_debug_monitor_and_inferior(
552            inferior_args=[
553                "set-message:%s" %
554                MEMORY_CONTENTS,
555                "get-data-address-hex:g_message",
556                "sleep:5"])
557
558        # Run the process
559        self.test_sequence.add_log_lines(
560            [
561                # Start running after initial stop.
562                "read packet: $c#63",
563                # Match output line that prints the memory address of the message buffer within the inferior.
564                # Note we require launch-only testing so we can get inferior otuput.
565                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
566                 "capture": {1: "message_address"}},
567                # Now stop the inferior.
568                "read packet: {}".format(chr(3)),
569                # And wait for the stop notification.
570                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
571            True)
572
573        # Run the packet stream.
574        context = self.expect_gdbremote_sequence()
575        self.assertIsNotNone(context)
576
577        # Grab the message address.
578        self.assertIsNotNone(context.get("message_address"))
579        message_address = int(context.get("message_address"), 16)
580
581        # Grab contents from the inferior.
582        self.reset_test_sequence()
583        self.test_sequence.add_log_lines(
584            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
585             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
586            True)
587
588        # Run the packet stream.
589        context = self.expect_gdbremote_sequence()
590        self.assertIsNotNone(context)
591
592        # Ensure what we read from inferior memory is what we wrote.
593        self.assertIsNotNone(context.get("read_contents"))
594        read_contents = seven.unhexlify(context.get("read_contents"))
595        self.assertEqual(read_contents, MEMORY_CONTENTS)
596
597    def test_qMemoryRegionInfo_is_supported(self):
598        self.build()
599        self.set_inferior_startup_launch()
600        # Start up the inferior.
601        procs = self.prep_debug_monitor_and_inferior()
602
603        # Ask if it supports $qMemoryRegionInfo.
604        self.test_sequence.add_log_lines(
605            ["read packet: $qMemoryRegionInfo#00",
606             "send packet: $OK#00"
607             ], True)
608        self.expect_gdbremote_sequence()
609
610    @skipIfWindows # No pty support to test any inferior output
611    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
612        self.build()
613        self.set_inferior_startup_launch()
614
615        # Start up the inferior.
616        procs = self.prep_debug_monitor_and_inferior(
617            inferior_args=["get-code-address-hex:hello", "sleep:5"])
618
619        # Run the process
620        self.test_sequence.add_log_lines(
621            [
622                # Start running after initial stop.
623                "read packet: $c#63",
624                # Match output line that prints the memory address of the message buffer within the inferior.
625                # Note we require launch-only testing so we can get inferior otuput.
626                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
627                 "capture": {1: "code_address"}},
628                # Now stop the inferior.
629                "read packet: {}".format(chr(3)),
630                # And wait for the stop notification.
631                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
632            True)
633
634        # Run the packet stream.
635        context = self.expect_gdbremote_sequence()
636        self.assertIsNotNone(context)
637
638        # Grab the code address.
639        self.assertIsNotNone(context.get("code_address"))
640        code_address = int(context.get("code_address"), 16)
641
642        # Grab memory region info from the inferior.
643        self.reset_test_sequence()
644        self.add_query_memory_region_packets(code_address)
645
646        # Run the packet stream.
647        context = self.expect_gdbremote_sequence()
648        self.assertIsNotNone(context)
649        mem_region_dict = self.parse_memory_region_packet(context)
650
651        # Ensure there are no errors reported.
652        self.assertNotIn("error", mem_region_dict)
653
654        # Ensure code address is readable and executable.
655        self.assertIn("permissions", mem_region_dict)
656        self.assertIn("r", mem_region_dict["permissions"])
657        self.assertIn("x", mem_region_dict["permissions"])
658
659        # Ensure the start address and size encompass the address we queried.
660        self.assert_address_within_memory_region(code_address, mem_region_dict)
661
662    @skipIfWindows # No pty support to test any inferior output
663    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
664        self.build()
665        self.set_inferior_startup_launch()
666
667        # Start up the inferior.
668        procs = self.prep_debug_monitor_and_inferior(
669            inferior_args=["get-stack-address-hex:", "sleep:5"])
670
671        # Run the process
672        self.test_sequence.add_log_lines(
673            [
674                # Start running after initial stop.
675                "read packet: $c#63",
676                # Match output line that prints the memory address of the message buffer within the inferior.
677                # Note we require launch-only testing so we can get inferior otuput.
678                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
679                 "capture": {1: "stack_address"}},
680                # Now stop the inferior.
681                "read packet: {}".format(chr(3)),
682                # And wait for the stop notification.
683                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
684            True)
685
686        # Run the packet stream.
687        context = self.expect_gdbremote_sequence()
688        self.assertIsNotNone(context)
689
690        # Grab the address.
691        self.assertIsNotNone(context.get("stack_address"))
692        stack_address = int(context.get("stack_address"), 16)
693
694        # Grab memory region info from the inferior.
695        self.reset_test_sequence()
696        self.add_query_memory_region_packets(stack_address)
697
698        # Run the packet stream.
699        context = self.expect_gdbremote_sequence()
700        self.assertIsNotNone(context)
701        mem_region_dict = self.parse_memory_region_packet(context)
702
703        # Ensure there are no errors reported.
704        self.assertNotIn("error", mem_region_dict)
705
706        # Ensure address is readable and executable.
707        self.assertIn("permissions", mem_region_dict)
708        self.assertIn("r", mem_region_dict["permissions"])
709        self.assertIn("w", mem_region_dict["permissions"])
710
711        # Ensure the start address and size encompass the address we queried.
712        self.assert_address_within_memory_region(
713            stack_address, mem_region_dict)
714
715    @skipIfWindows # No pty support to test any inferior output
716    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
717        self.build()
718        self.set_inferior_startup_launch()
719
720        # Start up the inferior.
721        procs = self.prep_debug_monitor_and_inferior(
722            inferior_args=["get-heap-address-hex:", "sleep:5"])
723
724        # Run the process
725        self.test_sequence.add_log_lines(
726            [
727                # Start running after initial stop.
728                "read packet: $c#63",
729                # Match output line that prints the memory address of the message buffer within the inferior.
730                # Note we require launch-only testing so we can get inferior otuput.
731                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
732                 "capture": {1: "heap_address"}},
733                # Now stop the inferior.
734                "read packet: {}".format(chr(3)),
735                # And wait for the stop notification.
736                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
737            True)
738
739        # Run the packet stream.
740        context = self.expect_gdbremote_sequence()
741        self.assertIsNotNone(context)
742
743        # Grab the address.
744        self.assertIsNotNone(context.get("heap_address"))
745        heap_address = int(context.get("heap_address"), 16)
746
747        # Grab memory region info from the inferior.
748        self.reset_test_sequence()
749        self.add_query_memory_region_packets(heap_address)
750
751        # Run the packet stream.
752        context = self.expect_gdbremote_sequence()
753        self.assertIsNotNone(context)
754        mem_region_dict = self.parse_memory_region_packet(context)
755
756        # Ensure there are no errors reported.
757        self.assertNotIn("error", mem_region_dict)
758
759        # Ensure address is readable and executable.
760        self.assertIn("permissions", mem_region_dict)
761        self.assertIn("r", mem_region_dict["permissions"])
762        self.assertIn("w", mem_region_dict["permissions"])
763
764        # Ensure the start address and size encompass the address we queried.
765        self.assert_address_within_memory_region(heap_address, mem_region_dict)
766
767    def breakpoint_set_and_remove_work(self, want_hardware):
768        # Start up the inferior.
769        procs = self.prep_debug_monitor_and_inferior(
770            inferior_args=[
771                "get-code-address-hex:hello",
772                "sleep:1",
773                "call-function:hello"])
774
775        # Run the process
776        self.add_register_info_collection_packets()
777        self.add_process_info_collection_packets()
778        self.test_sequence.add_log_lines(
779            [  # Start running after initial stop.
780                "read packet: $c#63",
781                # Match output line that prints the memory address of the function call entry point.
782                # Note we require launch-only testing so we can get inferior otuput.
783                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
784                 "capture": {1: "function_address"}},
785                # Now stop the inferior.
786                "read packet: {}".format(chr(3)),
787                # And wait for the stop notification.
788                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
789            True)
790
791        # Run the packet stream.
792        context = self.expect_gdbremote_sequence()
793        self.assertIsNotNone(context)
794
795        # Gather process info - we need endian of target to handle register
796        # value conversions.
797        process_info = self.parse_process_info_response(context)
798        endian = process_info.get("endian")
799        self.assertIsNotNone(endian)
800
801        # Gather register info entries.
802        reg_infos = self.parse_register_info_packets(context)
803        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
804        self.assertIsNotNone(pc_lldb_reg_index)
805        self.assertIsNotNone(pc_reg_info)
806
807        # Grab the function address.
808        self.assertIsNotNone(context.get("function_address"))
809        function_address = int(context.get("function_address"), 16)
810
811        # Get current target architecture
812        target_arch = self.getArchitecture()
813
814        # Set the breakpoint.
815        if target_arch in ["arm", "arm64", "aarch64"]:
816            # TODO: Handle case when setting breakpoint in thumb code
817            BREAKPOINT_KIND = 4
818        else:
819            BREAKPOINT_KIND = 1
820
821        # Set default packet type to Z0 (software breakpoint)
822        z_packet_type = 0
823
824        # If hardware breakpoint is requested set packet type to Z1
825        if want_hardware == True:
826            z_packet_type = 1
827
828        self.reset_test_sequence()
829        self.add_set_breakpoint_packets(
830            function_address,
831            z_packet_type,
832            do_continue=True,
833            breakpoint_kind=BREAKPOINT_KIND)
834
835        # Run the packet stream.
836        context = self.expect_gdbremote_sequence()
837        self.assertIsNotNone(context)
838
839        # Verify the stop signal reported was the breakpoint signal number.
840        stop_signo = context.get("stop_signo")
841        self.assertIsNotNone(stop_signo)
842        self.assertEqual(int(stop_signo, 16),
843                         lldbutil.get_signal_number('SIGTRAP'))
844
845        # Ensure we did not receive any output.  If the breakpoint was not set, we would
846        # see output (from a launched process with captured stdio) printing a hello, world message.
847        # That would indicate the breakpoint didn't take.
848        self.assertEqual(len(context["O_content"]), 0)
849
850        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
851        # This acts as a another validation on the register reading code.
852        self.reset_test_sequence()
853        self.test_sequence.add_log_lines(
854            [
855                # Print the PC.  This should match the breakpoint address.
856                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
857                # Capture $p results.
858                {"direction": "send",
859                 "regex": r"^\$([0-9a-fA-F]+)#",
860                 "capture": {1: "p_response"}},
861            ], True)
862
863        context = self.expect_gdbremote_sequence()
864        self.assertIsNotNone(context)
865
866        # Verify the PC is where we expect.  Note response is in endianness of
867        # the inferior.
868        p_response = context.get("p_response")
869        self.assertIsNotNone(p_response)
870
871        # Convert from target endian to int.
872        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
873            endian, p_response)
874        self.assertEqual(returned_pc, function_address)
875
876        # Verify that a breakpoint remove and continue gets us the expected
877        # output.
878        self.reset_test_sequence()
879
880        # Add breakpoint remove packets
881        self.add_remove_breakpoint_packets(
882            function_address,
883            z_packet_type,
884            breakpoint_kind=BREAKPOINT_KIND)
885
886        self.test_sequence.add_log_lines(
887            [
888                # Continue running.
889                "read packet: $c#63",
890                # We should now receive the output from the call.
891                {"type": "output_match", "regex": r"^hello, world\r\n$"},
892                # And wait for program completion.
893                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
894            ], True)
895
896        context = self.expect_gdbremote_sequence()
897        self.assertIsNotNone(context)
898
899    @skipIfWindows # No pty support to test any inferior output
900    def test_software_breakpoint_set_and_remove_work(self):
901        if self.getArchitecture() == "arm":
902            # TODO: Handle case when setting breakpoint in thumb code
903            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
904        else:
905            self.build()
906        self.set_inferior_startup_launch()
907        self.breakpoint_set_and_remove_work(want_hardware=False)
908
909    @skipUnlessPlatform(oslist=['linux'])
910    @skipIf(archs=no_match(['arm', 'aarch64']))
911    def test_hardware_breakpoint_set_and_remove_work(self):
912        if self.getArchitecture() == "arm":
913            # TODO: Handle case when setting breakpoint in thumb code
914            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
915        else:
916            self.build()
917        self.set_inferior_startup_launch()
918        self.breakpoint_set_and_remove_work(want_hardware=True)
919
920    def get_qSupported_dict(self, features=[]):
921        self.build()
922        self.set_inferior_startup_launch()
923
924        # Start up the stub and start/prep the inferior.
925        procs = self.prep_debug_monitor_and_inferior()
926        self.add_qSupported_packets(features)
927
928        # Run the packet stream.
929        context = self.expect_gdbremote_sequence()
930        self.assertIsNotNone(context)
931
932        # Retrieve the qSupported features.
933        return self.parse_qSupported_response(context)
934
935    def test_qSupported_returns_known_stub_features(self):
936        supported_dict = self.get_qSupported_dict()
937        self.assertIsNotNone(supported_dict)
938        self.assertTrue(len(supported_dict) > 0)
939
940    def test_qSupported_auvx(self):
941        expected = ('+' if lldbplatformutil.getPlatform()
942                    in ["freebsd", "linux", "netbsd"] else '-')
943        supported_dict = self.get_qSupported_dict()
944        self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected)
945
946    def test_qSupported_libraries_svr4(self):
947        expected = ('+' if lldbplatformutil.getPlatform()
948                    in ["freebsd", "linux", "netbsd"] else '-')
949        supported_dict = self.get_qSupported_dict()
950        self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'),
951                         expected)
952
953    def test_qSupported_siginfo_read(self):
954        expected = ('+' if lldbplatformutil.getPlatform()
955                    in ["freebsd", "linux"] else '-')
956        supported_dict = self.get_qSupported_dict()
957        self.assertEqual(supported_dict.get('qXfer:siginfo:read', '-'),
958                         expected)
959
960    def test_qSupported_QPassSignals(self):
961        expected = ('+' if lldbplatformutil.getPlatform()
962                    in ["freebsd", "linux", "netbsd"] else '-')
963        supported_dict = self.get_qSupported_dict()
964        self.assertEqual(supported_dict.get('QPassSignals', '-'), expected)
965
966    @add_test_categories(["fork"])
967    def test_qSupported_fork_events(self):
968        supported_dict = (
969            self.get_qSupported_dict(['multiprocess+', 'fork-events+']))
970        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
971        self.assertEqual(supported_dict.get('fork-events', '-'), '+')
972        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
973
974    @add_test_categories(["fork"])
975    def test_qSupported_fork_events_without_multiprocess(self):
976        supported_dict = (
977            self.get_qSupported_dict(['fork-events+']))
978        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
979        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
980        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
981
982    @add_test_categories(["fork"])
983    def test_qSupported_vfork_events(self):
984        supported_dict = (
985            self.get_qSupported_dict(['multiprocess+', 'vfork-events+']))
986        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
987        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
988        self.assertEqual(supported_dict.get('vfork-events', '-'), '+')
989
990    @add_test_categories(["fork"])
991    def test_qSupported_vfork_events_without_multiprocess(self):
992        supported_dict = (
993            self.get_qSupported_dict(['vfork-events+']))
994        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
995        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
996        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
997
998    # We need to be able to self.runCmd to get cpuinfo,
999    # which is not possible when using a remote platform.
1000    @skipIfRemote
1001    def test_qSupported_memory_tagging(self):
1002        supported_dict = self.get_qSupported_dict()
1003        self.assertEqual(supported_dict.get("memory-tagging", '-'),
1004                         '+' if self.isAArch64MTE() else '-')
1005
1006    @skipIfWindows # No pty support to test any inferior output
1007    def test_written_M_content_reads_back_correctly(self):
1008        self.build()
1009        self.set_inferior_startup_launch()
1010
1011        TEST_MESSAGE = "Hello, memory"
1012
1013        # Start up the stub and start/prep the inferior.
1014        procs = self.prep_debug_monitor_and_inferior(
1015            inferior_args=[
1016                "set-message:xxxxxxxxxxxxxX",
1017                "get-data-address-hex:g_message",
1018                "sleep:1",
1019                "print-message:"])
1020        self.test_sequence.add_log_lines(
1021            [
1022                # Start running after initial stop.
1023                "read packet: $c#63",
1024                # Match output line that prints the memory address of the message buffer within the inferior.
1025                # Note we require launch-only testing so we can get inferior otuput.
1026                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
1027                 "capture": {1: "message_address"}},
1028                # Now stop the inferior.
1029                "read packet: {}".format(chr(3)),
1030                # And wait for the stop notification.
1031                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1032            True)
1033        context = self.expect_gdbremote_sequence()
1034        self.assertIsNotNone(context)
1035
1036        # Grab the message address.
1037        self.assertIsNotNone(context.get("message_address"))
1038        message_address = int(context.get("message_address"), 16)
1039
1040        # Hex-encode the test message, adding null termination.
1041        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
1042
1043        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
1044        # and binary (x) memory read packets.
1045        self.reset_test_sequence()
1046        self.test_sequence.add_log_lines(
1047            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
1048             "send packet: $OK#00",
1049             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1050             "send packet: ${0}#00".format(hex_encoded_message),
1051             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1052             "send packet: ${0}#00".format(TEST_MESSAGE),
1053             "read packet: $m{0:x},4#00".format(message_address),
1054             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
1055             "read packet: $x{0:x},4#00".format(message_address),
1056             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
1057             "read packet: $c#63",
1058             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
1059             "send packet: $W00#00",
1060             ], True)
1061        context = self.expect_gdbremote_sequence()
1062        self.assertIsNotNone(context)
1063
1064        # Ensure what we read from inferior memory is what we wrote.
1065        printed_message = context.get("printed_message")
1066        self.assertIsNotNone(printed_message)
1067        self.assertEqual(printed_message, TEST_MESSAGE + "X")
1068
1069    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
1070    # Come back to this.  I have the test rigged to verify that at least some
1071    # of the bit-flip writes work.
1072    def test_P_writes_all_gpr_registers(self):
1073        self.build()
1074        self.set_inferior_startup_launch()
1075
1076        # Start inferior debug session, grab all register info.
1077        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
1078        self.add_register_info_collection_packets()
1079        self.add_process_info_collection_packets()
1080
1081        context = self.expect_gdbremote_sequence()
1082        self.assertIsNotNone(context)
1083
1084        # Process register infos.
1085        reg_infos = self.parse_register_info_packets(context)
1086        self.assertIsNotNone(reg_infos)
1087        self.add_lldb_register_index(reg_infos)
1088
1089        # Process endian.
1090        process_info = self.parse_process_info_response(context)
1091        endian = process_info.get("endian")
1092        self.assertIsNotNone(endian)
1093
1094        # Pull out the register infos that we think we can bit flip
1095        # successfully,.
1096        gpr_reg_infos = [
1097            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1098        self.assertTrue(len(gpr_reg_infos) > 0)
1099
1100        # Write flipped bit pattern of existing value to each register.
1101        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1102            gpr_reg_infos, endian)
1103        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1104        self.assertTrue(successful_writes > 0)
1105
1106    # Note: as of this moment, a hefty number of the GPR writes are failing
1107    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1108    @skipIfWindows
1109    def test_P_and_p_thread_suffix_work(self):
1110        self.build()
1111        self.set_inferior_startup_launch()
1112
1113        # Startup the inferior with three threads.
1114        _, threads = self.launch_with_threads(3)
1115
1116        self.reset_test_sequence()
1117        self.add_thread_suffix_request_packets()
1118        self.add_register_info_collection_packets()
1119        self.add_process_info_collection_packets()
1120
1121        context = self.expect_gdbremote_sequence()
1122        self.assertIsNotNone(context)
1123
1124        process_info = self.parse_process_info_response(context)
1125        self.assertIsNotNone(process_info)
1126        endian = process_info.get("endian")
1127        self.assertIsNotNone(endian)
1128
1129        reg_infos = self.parse_register_info_packets(context)
1130        self.assertIsNotNone(reg_infos)
1131        self.add_lldb_register_index(reg_infos)
1132
1133        reg_index = self.select_modifiable_register(reg_infos)
1134        self.assertIsNotNone(reg_index)
1135        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1136        self.assertTrue(reg_byte_size > 0)
1137
1138        expected_reg_values = []
1139        register_increment = 1
1140        next_value = None
1141
1142        # Set the same register in each of 3 threads to a different value.
1143        # Verify each one has the unique value.
1144        for thread in threads:
1145            # If we don't have a next value yet, start it with the initial read
1146            # value + 1
1147            if not next_value:
1148                # Read pre-existing register value.
1149                self.reset_test_sequence()
1150                self.test_sequence.add_log_lines(
1151                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1152                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1153                     ], True)
1154                context = self.expect_gdbremote_sequence()
1155                self.assertIsNotNone(context)
1156
1157                # Set the next value to use for writing as the increment plus
1158                # current value.
1159                p_response = context.get("p_response")
1160                self.assertIsNotNone(p_response)
1161                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1162                    endian, p_response)
1163
1164            # Set new value using P and thread suffix.
1165            self.reset_test_sequence()
1166            self.test_sequence.add_log_lines(
1167                [
1168                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1169                        reg_index,
1170                        lldbgdbserverutils.pack_register_hex(
1171                            endian,
1172                            next_value,
1173                            byte_size=reg_byte_size),
1174                        thread),
1175                    "send packet: $OK#00",
1176                ],
1177                True)
1178            context = self.expect_gdbremote_sequence()
1179            self.assertIsNotNone(context)
1180
1181            # Save the value we set.
1182            expected_reg_values.append(next_value)
1183
1184            # Increment value for next thread to use (we want them all
1185            # different so we can verify they wrote to each thread correctly
1186            # next.)
1187            next_value += register_increment
1188
1189        # Revisit each thread and verify they have the expected value set for
1190        # the register we wrote.
1191        thread_index = 0
1192        for thread in threads:
1193            # Read pre-existing register value.
1194            self.reset_test_sequence()
1195            self.test_sequence.add_log_lines(
1196                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1197                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1198                 ], True)
1199            context = self.expect_gdbremote_sequence()
1200            self.assertIsNotNone(context)
1201
1202            # Get the register value.
1203            p_response = context.get("p_response")
1204            self.assertIsNotNone(p_response)
1205            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1206                endian, p_response)
1207
1208            # Make sure we read back what we wrote.
1209            self.assertEqual(read_value, expected_reg_values[thread_index])
1210            thread_index += 1
1211
1212    @skipIfWindows # No pty support to test any inferior output
1213    @add_test_categories(["llgs"])
1214    def test_launch_via_A(self):
1215        self.build()
1216        exe_path = self.getBuildArtifact("a.out")
1217        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1218        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1219
1220        server = self.connect_to_debug_monitor()
1221        self.assertIsNotNone(server)
1222        self.do_handshake()
1223        # NB: strictly speaking we should use %x here but this packet
1224        # is deprecated, so no point in changing lldb-server's expectations
1225        self.test_sequence.add_log_lines(
1226            ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" %
1227             tuple(itertools.chain.from_iterable(
1228                 [(len(x), x) for x in hex_args])),
1229             "send packet: $OK#00",
1230             "read packet: $c#00",
1231             "send packet: $W00#00"],
1232            True)
1233        context = self.expect_gdbremote_sequence()
1234        self.assertEqual(context["O_content"],
1235                         b'arg1\r\narg2\r\narg3\r\n')
1236
1237    @skipIfWindows # No pty support to test any inferior output
1238    @add_test_categories(["llgs"])
1239    def test_launch_via_vRun(self):
1240        self.build()
1241        exe_path = self.getBuildArtifact("a.out")
1242        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1243        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1244
1245        server = self.connect_to_debug_monitor()
1246        self.assertIsNotNone(server)
1247        self.do_handshake()
1248        self.test_sequence.add_log_lines(
1249            ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args),
1250             {"direction": "send",
1251              "regex": r"^\$T([0-9a-fA-F]+)"},
1252             "read packet: $c#00",
1253             "send packet: $W00#00"],
1254            True)
1255        context = self.expect_gdbremote_sequence()
1256        self.assertEqual(context["O_content"],
1257                         b'arg1\r\narg2\r\narg3\r\n')
1258
1259    @add_test_categories(["llgs"])
1260    def test_launch_via_vRun_no_args(self):
1261        self.build()
1262        exe_path = self.getBuildArtifact("a.out")
1263        hex_path = binascii.b2a_hex(exe_path.encode()).decode()
1264
1265        server = self.connect_to_debug_monitor()
1266        self.assertIsNotNone(server)
1267        self.do_handshake()
1268        self.test_sequence.add_log_lines(
1269            ["read packet: $vRun;%s#00" % (hex_path,),
1270             {"direction": "send",
1271              "regex": r"^\$T([0-9a-fA-F]+)"},
1272             "read packet: $c#00",
1273             "send packet: $W00#00"],
1274            True)
1275        self.expect_gdbremote_sequence()
1276
1277    @skipIfWindows # No pty support to test any inferior output
1278    @add_test_categories(["llgs"])
1279    def test_QEnvironment(self):
1280        self.build()
1281        exe_path = self.getBuildArtifact("a.out")
1282        env = {"FOO": "test", "BAR": "a=z"}
1283        args = [exe_path, "print-env:FOO", "print-env:BAR"]
1284        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1285
1286        server = self.connect_to_debug_monitor()
1287        self.assertIsNotNone(server)
1288        self.do_handshake()
1289
1290        for key, value in env.items():
1291            self.test_sequence.add_log_lines(
1292                ["read packet: $QEnvironment:%s=%s#00" % (key, value),
1293                 "send packet: $OK#00"],
1294                True)
1295        self.test_sequence.add_log_lines(
1296            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1297             {"direction": "send",
1298              "regex": r"^\$T([0-9a-fA-F]+)"},
1299             "read packet: $c#00",
1300             "send packet: $W00#00"],
1301            True)
1302        context = self.expect_gdbremote_sequence()
1303        self.assertEqual(context["O_content"], b"test\r\na=z\r\n")
1304
1305    @skipIfWindows # No pty support to test any inferior output
1306    @add_test_categories(["llgs"])
1307    def test_QEnvironmentHexEncoded(self):
1308        self.build()
1309        exe_path = self.getBuildArtifact("a.out")
1310        env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"}
1311        args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"]
1312        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1313
1314        server = self.connect_to_debug_monitor()
1315        self.assertIsNotNone(server)
1316        self.do_handshake()
1317
1318        for key, value in env.items():
1319            hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode()
1320            self.test_sequence.add_log_lines(
1321                ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,),
1322                 "send packet: $OK#00"],
1323                True)
1324        self.test_sequence.add_log_lines(
1325            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1326             {"direction": "send",
1327              "regex": r"^\$T([0-9a-fA-F]+)"},
1328             "read packet: $c#00",
1329             "send packet: $W00#00"],
1330            True)
1331        context = self.expect_gdbremote_sequence()
1332        self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n")
1333
1334    @skipUnlessPlatform(oslist=["freebsd", "linux"])
1335    @add_test_categories(["llgs"])
1336    def test_qXfer_siginfo_read(self):
1337        self.build()
1338        self.set_inferior_startup_launch()
1339        procs = self.prep_debug_monitor_and_inferior(
1340                inferior_args=["thread:segfault", "thread:new", "sleep:10"])
1341        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
1342        self.expect_gdbremote_sequence()
1343
1344        # Run until SIGSEGV comes in.
1345        self.reset_test_sequence()
1346        self.test_sequence.add_log_lines(
1347            [{"direction": "send",
1348              "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1349              "capture": {1: "signo", 2: "thread_id"},
1350              }], True)
1351
1352        # Figure out which thread crashed.
1353        context = self.expect_gdbremote_sequence()
1354        self.assertIsNotNone(context)
1355        self.assertEqual(int(context["signo"], 16),
1356                         lldbutil.get_signal_number('SIGSEGV'))
1357        crashing_thread = int(context["thread_id"], 16)
1358
1359        # Grab siginfo for the crashing thread.
1360        self.reset_test_sequence()
1361        self.add_process_info_collection_packets()
1362        self.test_sequence.add_log_lines(
1363            ["read packet: $Hg{:x}#00".format(crashing_thread),
1364             "send packet: $OK#00",
1365             "read packet: $qXfer:siginfo:read::0,80:#00",
1366             {"direction": "send",
1367              "regex": re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1368                                  re.MULTILINE | re.DOTALL),
1369              "capture": {1: "response_type", 2: "content_raw"},
1370              }], True)
1371        context = self.expect_gdbremote_sequence()
1372        self.assertIsNotNone(context)
1373
1374        # Ensure we end up with all data in one packet.
1375        self.assertEqual(context.get("response_type"), "l")
1376
1377        # Decode binary data.
1378        content_raw = context.get("content_raw")
1379        self.assertIsNotNone(content_raw)
1380        content = self.decode_gdbremote_binary(content_raw).encode("latin1")
1381
1382        # Decode siginfo_t.
1383        process_info = self.parse_process_info_response(context)
1384        pad = ""
1385        if process_info["ptrsize"] == "8":
1386            pad = "i"
1387        signo_idx = 0
1388        errno_idx = 1
1389        code_idx = 2
1390        addr_idx = -1
1391        SEGV_MAPERR = 1
1392        if process_info["ostype"] == "linux":
1393            # si_signo, si_errno, si_code, [pad], _sifields._sigfault.si_addr
1394            format_str = "iii{}P".format(pad)
1395        elif process_info["ostype"].startswith("freebsd"):
1396            # si_signo, si_errno, si_code, si_pid, si_uid, si_status, si_addr
1397            format_str = "iiiiiiP"
1398        elif process_info["ostype"].startswith("netbsd"):
1399            # _signo, _code, _errno, [pad], _reason._fault._addr
1400            format_str = "iii{}P".format(pad)
1401            errno_idx = 2
1402            code_idx = 1
1403        else:
1404            assert False, "unknown ostype"
1405
1406        decoder = struct.Struct(format_str)
1407        decoded = decoder.unpack(content[:decoder.size])
1408        self.assertEqual(decoded[signo_idx],
1409                         lldbutil.get_signal_number('SIGSEGV'))
1410        self.assertEqual(decoded[errno_idx], 0)  # si_errno
1411        self.assertEqual(decoded[code_idx], SEGV_MAPERR)  # si_code
1412        self.assertEqual(decoded[addr_idx], 0)  # si_addr
1413