1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import binascii
14import itertools
15import struct
16
17import unittest2
18import gdbremote_testcase
19import lldbgdbserverutils
20from lldbsuite.support import seven
21from lldbsuite.test.decorators import *
22from lldbsuite.test.lldbtest import *
23from lldbsuite.test.lldbdwarf import *
24from lldbsuite.test import lldbutil, lldbplatformutil
25
26
27class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
28
29    def test_thread_suffix_supported(self):
30        server = self.connect_to_debug_monitor()
31        self.assertIsNotNone(server)
32
33        self.do_handshake()
34        self.test_sequence.add_log_lines(
35            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
36             "lldb-server <   6> send packet: $OK#9a"],
37            True)
38
39        self.expect_gdbremote_sequence()
40
41
42    def test_list_threads_in_stop_reply_supported(self):
43        server = self.connect_to_debug_monitor()
44        self.assertIsNotNone(server)
45
46        self.do_handshake()
47        self.test_sequence.add_log_lines(
48            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
49             "lldb-server <   6> send packet: $OK#9a"],
50            True)
51        self.expect_gdbremote_sequence()
52
53    def test_c_packet_works(self):
54        self.build()
55        procs = self.prep_debug_monitor_and_inferior()
56        self.test_sequence.add_log_lines(
57            ["read packet: $c#63",
58             "send packet: $W00#00"],
59            True)
60
61        self.expect_gdbremote_sequence()
62
63    @skipIfWindows # No pty support to test any inferior output
64    def test_inferior_print_exit(self):
65        self.build()
66        procs = self.prep_debug_monitor_and_inferior(
67                inferior_args=["hello, world"])
68        self.test_sequence.add_log_lines(
69            ["read packet: $vCont;c#a8",
70             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
71             "send packet: $W00#00"],
72            True)
73
74        context = self.expect_gdbremote_sequence()
75        self.assertIsNotNone(context)
76
77    def test_first_launch_stop_reply_thread_matches_first_qC(self):
78        self.build()
79        procs = self.prep_debug_monitor_and_inferior()
80        self.test_sequence.add_log_lines(["read packet: $qC#00",
81                                          {"direction": "send",
82                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
83                                           "capture": {1: "thread_id_QC"}},
84                                          "read packet: $?#00",
85                                          {"direction": "send",
86                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
87                                              "capture": {1: "thread_id_?"}}],
88                                         True)
89        context = self.expect_gdbremote_sequence()
90        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))
91
92    def test_attach_commandline_continue_app_exits(self):
93        self.build()
94        self.set_inferior_startup_attach()
95        procs = self.prep_debug_monitor_and_inferior()
96        self.test_sequence.add_log_lines(
97            ["read packet: $vCont;c#a8",
98             "send packet: $W00#00"],
99            True)
100        self.expect_gdbremote_sequence()
101
102        # Wait a moment for completed and now-detached inferior process to
103        # clear.
104        time.sleep(1)
105
106        if not lldb.remote_platform:
107            # Process should be dead now. Reap results.
108            poll_result = procs["inferior"].poll()
109            self.assertIsNotNone(poll_result)
110
111        # Where possible, verify at the system level that the process is not
112        # running.
113        self.assertFalse(
114            lldbgdbserverutils.process_is_running(
115                procs["inferior"].pid, False))
116
117    def test_qRegisterInfo_returns_one_valid_result(self):
118        self.build()
119        self.prep_debug_monitor_and_inferior()
120        self.test_sequence.add_log_lines(
121            ["read packet: $qRegisterInfo0#00",
122             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
123            True)
124
125        # Run the stream
126        context = self.expect_gdbremote_sequence()
127        self.assertIsNotNone(context)
128
129        reg_info_packet = context.get("reginfo_0")
130        self.assertIsNotNone(reg_info_packet)
131        self.assert_valid_reg_info(
132            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
133
134    def test_qRegisterInfo_returns_all_valid_results(self):
135        self.build()
136        self.prep_debug_monitor_and_inferior()
137        self.add_register_info_collection_packets()
138
139        # Run the stream.
140        context = self.expect_gdbremote_sequence()
141        self.assertIsNotNone(context)
142
143        # Validate that each register info returned validates.
144        for reg_info in self.parse_register_info_packets(context):
145            self.assert_valid_reg_info(reg_info)
146
147    def test_qRegisterInfo_contains_required_generics_debugserver(self):
148        self.build()
149        self.prep_debug_monitor_and_inferior()
150        self.add_register_info_collection_packets()
151
152        # Run the packet stream.
153        context = self.expect_gdbremote_sequence()
154        self.assertIsNotNone(context)
155
156        # Gather register info entries.
157        reg_infos = self.parse_register_info_packets(context)
158
159        # Collect all generic registers found.
160        generic_regs = {
161            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
162
163        # Ensure we have a program counter register.
164        self.assertIn('pc', generic_regs)
165
166        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
167        if self.getArchitecture() != 'powerpc64le':
168            self.assertIn('fp', generic_regs)
169
170        # Ensure we have a stack pointer register.
171        self.assertIn('sp', generic_regs)
172
173        # Ensure we have a flags register.
174        self.assertIn('flags', generic_regs)
175
176    def test_qRegisterInfo_contains_at_least_one_register_set(self):
177        self.build()
178        self.prep_debug_monitor_and_inferior()
179        self.add_register_info_collection_packets()
180
181        # Run the packet stream.
182        context = self.expect_gdbremote_sequence()
183        self.assertIsNotNone(context)
184
185        # Gather register info entries.
186        reg_infos = self.parse_register_info_packets(context)
187
188        # Collect all register sets found.
189        register_sets = {
190            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
191        self.assertTrue(len(register_sets) >= 1)
192
193    def targetHasAVX(self):
194        triple = self.dbg.GetSelectedPlatform().GetTriple()
195
196        # TODO other platforms, please implement this function
197        if not re.match(".*-.*-linux", triple):
198            return True
199
200        # Need to do something different for non-Linux/Android targets
201        if lldb.remote_platform:
202            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
203            cpuinfo_path = "cpuinfo"
204            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
205        else:
206            cpuinfo_path = "/proc/cpuinfo"
207
208        f = open(cpuinfo_path, 'r')
209        cpuinfo = f.read()
210        f.close()
211        return " avx " in cpuinfo
212
213    @expectedFailureAll(oslist=["windows"]) # no avx for now.
214    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
215    @add_test_categories(["llgs"])
216    def test_qRegisterInfo_contains_avx_registers(self):
217        self.build()
218        self.prep_debug_monitor_and_inferior()
219        self.add_register_info_collection_packets()
220
221        # Run the packet stream.
222        context = self.expect_gdbremote_sequence()
223        self.assertIsNotNone(context)
224
225        # Gather register info entries.
226        reg_infos = self.parse_register_info_packets(context)
227
228        # Collect all generics found.
229        register_sets = {
230            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
231        self.assertEqual(
232            self.targetHasAVX(),
233            "Advanced Vector Extensions" in register_sets)
234
235    def qThreadInfo_contains_thread(self):
236        procs = self.prep_debug_monitor_and_inferior()
237        self.add_threadinfo_collection_packets()
238
239        # Run the packet stream.
240        context = self.expect_gdbremote_sequence()
241        self.assertIsNotNone(context)
242
243        # Gather threadinfo entries.
244        threads = self.parse_threadinfo_packets(context)
245        self.assertIsNotNone(threads)
246
247        # We should have exactly one thread.
248        self.assertEqual(len(threads), 1)
249
250    def test_qThreadInfo_contains_thread_launch(self):
251        self.build()
252        self.set_inferior_startup_launch()
253        self.qThreadInfo_contains_thread()
254
255    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
256    def test_qThreadInfo_contains_thread_attach(self):
257        self.build()
258        self.set_inferior_startup_attach()
259        self.qThreadInfo_contains_thread()
260
261    def qThreadInfo_matches_qC(self):
262        procs = self.prep_debug_monitor_and_inferior()
263
264        self.add_threadinfo_collection_packets()
265        self.test_sequence.add_log_lines(
266            ["read packet: $qC#00",
267             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
268             ], True)
269
270        # Run the packet stream.
271        context = self.expect_gdbremote_sequence()
272        self.assertIsNotNone(context)
273
274        # Gather threadinfo entries.
275        threads = self.parse_threadinfo_packets(context)
276        self.assertIsNotNone(threads)
277
278        # We should have exactly one thread from threadinfo.
279        self.assertEqual(len(threads), 1)
280
281        # We should have a valid thread_id from $QC.
282        QC_thread_id_hex = context.get("thread_id")
283        self.assertIsNotNone(QC_thread_id_hex)
284        QC_thread_id = int(QC_thread_id_hex, 16)
285
286        # Those two should be the same.
287        self.assertEqual(threads[0], QC_thread_id)
288
289    def test_qThreadInfo_matches_qC_launch(self):
290        self.build()
291        self.set_inferior_startup_launch()
292        self.qThreadInfo_matches_qC()
293
294    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
295    def test_qThreadInfo_matches_qC_attach(self):
296        self.build()
297        self.set_inferior_startup_attach()
298        self.qThreadInfo_matches_qC()
299
300    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
301        self.build()
302        self.set_inferior_startup_launch()
303        procs = self.prep_debug_monitor_and_inferior()
304        self.add_register_info_collection_packets()
305
306        # Run the packet stream.
307        context = self.expect_gdbremote_sequence()
308        self.assertIsNotNone(context)
309
310        # Gather register info entries.
311        reg_infos = self.parse_register_info_packets(context)
312        self.assertIsNotNone(reg_infos)
313        self.assertTrue(len(reg_infos) > 0)
314
315        byte_order = self.get_target_byte_order()
316
317        # Read value for each register.
318        reg_index = 0
319        for reg_info in reg_infos:
320            # Skip registers that don't have a register set.  For x86, these are
321            # the DRx registers, which have no LLDB-kind register number and thus
322            # cannot be read via normal
323            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
324            if not "set" in reg_info:
325                continue
326
327            # Clear existing packet expectations.
328            self.reset_test_sequence()
329
330            # Run the register query
331            self.test_sequence.add_log_lines(
332                ["read packet: $p{0:x}#00".format(reg_index),
333                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
334                True)
335            context = self.expect_gdbremote_sequence()
336            self.assertIsNotNone(context)
337
338            # Verify the response length.
339            p_response = context.get("p_response")
340            self.assertIsNotNone(p_response)
341
342            # Skip erraneous (unsupported) registers.
343            # TODO: remove this once we make unsupported registers disappear.
344            if p_response.startswith("E") and len(p_response) == 3:
345                continue
346
347            if "dynamic_size_dwarf_expr_bytes" in reg_info:
348                self.updateRegInfoBitsize(reg_info, byte_order)
349            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
350                             reg_info)
351
352            # Increment loop
353            reg_index += 1
354
355    def Hg_switches_to_3_threads(self, pass_pid=False):
356        _, threads = self.launch_with_threads(3)
357
358        pid_str = ""
359        if pass_pid:
360            pid_str = "p{0:x}.".format(procs["inferior"].pid)
361
362        # verify we can $H to each thead, and $qC matches the thread we set.
363        for thread in threads:
364            # Change to each thread, verify current thread id.
365            self.reset_test_sequence()
366            self.test_sequence.add_log_lines(
367                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
368                 "send packet: $OK#00",
369                 "read packet: $qC#00",
370                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
371                True)
372
373            context = self.expect_gdbremote_sequence()
374            self.assertIsNotNone(context)
375
376            # Verify the thread id.
377            self.assertIsNotNone(context.get("thread_id"))
378            self.assertEqual(int(context.get("thread_id"), 16), thread)
379
380    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
381    def test_Hg_switches_to_3_threads_launch(self):
382        self.build()
383        self.set_inferior_startup_launch()
384        self.Hg_switches_to_3_threads()
385
386    def Hg_fails_on_pid(self, pass_pid):
387        _, threads = self.launch_with_threads(2)
388
389        if pass_pid == -1:
390            pid_str = "p-1."
391        else:
392            pid_str = "p{0:x}.".format(pass_pid)
393        thread = threads[1]
394
395        self.test_sequence.add_log_lines(
396            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
397             "send packet: $Eff#00"],
398            True)
399
400        self.expect_gdbremote_sequence()
401
402    @add_test_categories(["llgs"])
403    def test_Hg_fails_on_another_pid(self):
404        self.build()
405        self.set_inferior_startup_launch()
406        self.Hg_fails_on_pid(1)
407
408    @add_test_categories(["llgs"])
409    def test_Hg_fails_on_zero_pid(self):
410        self.build()
411        self.set_inferior_startup_launch()
412        self.Hg_fails_on_pid(0)
413
414    @add_test_categories(["llgs"])
415    def test_Hg_fails_on_minus_one_pid(self):
416        self.build()
417        self.set_inferior_startup_launch()
418        self.Hg_fails_on_pid(-1)
419
420    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
421        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
422        # and the test requires getting stdout from the exe.
423
424        NUM_THREADS = 3
425
426        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
427        # inferior_args=["thread:print-ids"]
428        inferior_args = ["thread:segfault"]
429        for i in range(NUM_THREADS - 1):
430            # if i > 0:
431                # Give time between thread creation/segfaulting for the handler to work.
432                # inferior_args.append("sleep:1")
433            inferior_args.append("thread:new")
434        inferior_args.append("sleep:10")
435
436        # Launch/attach.  (In our case, this should only ever be launched since
437        # we need inferior stdout/stderr).
438        procs = self.prep_debug_monitor_and_inferior(
439            inferior_args=inferior_args)
440        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
441        context = self.expect_gdbremote_sequence()
442
443        signaled_tids = {}
444        print_thread_ids = {}
445
446        # Switch to each thread, deliver a signal, and verify signal delivery
447        for i in range(NUM_THREADS - 1):
448            # Run until SIGSEGV comes in.
449            self.reset_test_sequence()
450            self.test_sequence.add_log_lines([{"direction": "send",
451                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
452                                               "capture": {1: "signo",
453                                                            2: "thread_id"}}],
454                                             True)
455
456            context = self.expect_gdbremote_sequence()
457            self.assertIsNotNone(context)
458            signo = context.get("signo")
459            self.assertEqual(int(signo, 16), segfault_signo)
460
461            # Ensure we haven't seen this tid yet.
462            thread_id = int(context.get("thread_id"), 16)
463            self.assertNotIn(thread_id, signaled_tids)
464            signaled_tids[thread_id] = 1
465
466            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
467            self.reset_test_sequence()
468            self.test_sequence.add_log_lines(
469                [
470                    # Set the continue thread.
471                    # Set current thread.
472                    "read packet: $Hc{0:x}#00".format(thread_id),
473                    "send packet: $OK#00",
474
475                    # Continue sending the signal number to the continue thread.
476                    # The commented out packet is a way to do this same operation without using
477                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
478                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
479                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),
480
481                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
482                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL.
483                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
484                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
485                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
486                    #  "read packet: $c#63",
487                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
488                ],
489                True)
490
491            # Run the sequence.
492            context = self.expect_gdbremote_sequence()
493            self.assertIsNotNone(context)
494
495            # Ensure the stop signal is the signal we delivered.
496            # stop_signo = context.get("stop_signo")
497            # self.assertIsNotNone(stop_signo)
498            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))
499
500            # Ensure the stop thread is the thread to which we delivered the signal.
501            # stop_thread_id = context.get("stop_thread_id")
502            # self.assertIsNotNone(stop_thread_id)
503            # self.assertEquals(int(stop_thread_id,16), thread_id)
504
505            # Ensure we haven't seen this thread id yet.  The inferior's
506            # self-obtained thread ids are not guaranteed to match the stub
507            # tids (at least on MacOSX).
508            print_thread_id = context.get("print_thread_id")
509            self.assertIsNotNone(print_thread_id)
510            print_thread_id = int(print_thread_id, 16)
511            self.assertNotIn(print_thread_id, print_thread_ids)
512
513            # Now remember this print (i.e. inferior-reflected) thread id and
514            # ensure we don't hit it again.
515            print_thread_ids[print_thread_id] = 1
516
517            # Ensure post signal-handle thread id matches the thread that
518            # initially raised the SIGSEGV.
519            post_handle_thread_id = context.get("post_handle_thread_id")
520            self.assertIsNotNone(post_handle_thread_id)
521            post_handle_thread_id = int(post_handle_thread_id, 16)
522            self.assertEqual(post_handle_thread_id, print_thread_id)
523
524    @expectedFailureDarwin
525    @skipIfWindows # no SIGSEGV support
526    @expectedFailureNetBSD
527    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
528        self.build()
529        self.set_inferior_startup_launch()
530
531        if self.platformIsDarwin():
532            # Darwin debugserver translates some signals like SIGSEGV into some gdb
533            # expectations about fixed signal numbers.
534            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
535        else:
536            self.Hc_then_Csignal_signals_correct_thread(
537                lldbutil.get_signal_number('SIGSEGV'))
538
539    @skipIfWindows # No pty support to test any inferior output
540    def test_m_packet_reads_memory(self):
541        self.build()
542        self.set_inferior_startup_launch()
543        # This is the memory we will write into the inferior and then ensure we
544        # can read back with $m.
545        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
546
547        # Start up the inferior.
548        procs = self.prep_debug_monitor_and_inferior(
549            inferior_args=[
550                "set-message:%s" %
551                MEMORY_CONTENTS,
552                "get-data-address-hex:g_message",
553                "sleep:5"])
554
555        # Run the process
556        self.test_sequence.add_log_lines(
557            [
558                # Start running after initial stop.
559                "read packet: $c#63",
560                # Match output line that prints the memory address of the message buffer within the inferior.
561                # Note we require launch-only testing so we can get inferior otuput.
562                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
563                 "capture": {1: "message_address"}},
564                # Now stop the inferior.
565                "read packet: {}".format(chr(3)),
566                # And wait for the stop notification.
567                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
568            True)
569
570        # Run the packet stream.
571        context = self.expect_gdbremote_sequence()
572        self.assertIsNotNone(context)
573
574        # Grab the message address.
575        self.assertIsNotNone(context.get("message_address"))
576        message_address = int(context.get("message_address"), 16)
577
578        # Grab contents from the inferior.
579        self.reset_test_sequence()
580        self.test_sequence.add_log_lines(
581            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
582             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
583            True)
584
585        # Run the packet stream.
586        context = self.expect_gdbremote_sequence()
587        self.assertIsNotNone(context)
588
589        # Ensure what we read from inferior memory is what we wrote.
590        self.assertIsNotNone(context.get("read_contents"))
591        read_contents = seven.unhexlify(context.get("read_contents"))
592        self.assertEqual(read_contents, MEMORY_CONTENTS)
593
594    def test_qMemoryRegionInfo_is_supported(self):
595        self.build()
596        self.set_inferior_startup_launch()
597        # Start up the inferior.
598        procs = self.prep_debug_monitor_and_inferior()
599
600        # Ask if it supports $qMemoryRegionInfo.
601        self.test_sequence.add_log_lines(
602            ["read packet: $qMemoryRegionInfo#00",
603             "send packet: $OK#00"
604             ], True)
605        self.expect_gdbremote_sequence()
606
607    @skipIfWindows # No pty support to test any inferior output
608    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
609        self.build()
610        self.set_inferior_startup_launch()
611
612        # Start up the inferior.
613        procs = self.prep_debug_monitor_and_inferior(
614            inferior_args=["get-code-address-hex:hello", "sleep:5"])
615
616        # Run the process
617        self.test_sequence.add_log_lines(
618            [
619                # Start running after initial stop.
620                "read packet: $c#63",
621                # Match output line that prints the memory address of the message buffer within the inferior.
622                # Note we require launch-only testing so we can get inferior otuput.
623                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
624                 "capture": {1: "code_address"}},
625                # Now stop the inferior.
626                "read packet: {}".format(chr(3)),
627                # And wait for the stop notification.
628                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
629            True)
630
631        # Run the packet stream.
632        context = self.expect_gdbremote_sequence()
633        self.assertIsNotNone(context)
634
635        # Grab the code address.
636        self.assertIsNotNone(context.get("code_address"))
637        code_address = int(context.get("code_address"), 16)
638
639        # Grab memory region info from the inferior.
640        self.reset_test_sequence()
641        self.add_query_memory_region_packets(code_address)
642
643        # Run the packet stream.
644        context = self.expect_gdbremote_sequence()
645        self.assertIsNotNone(context)
646        mem_region_dict = self.parse_memory_region_packet(context)
647
648        # Ensure there are no errors reported.
649        self.assertNotIn("error", mem_region_dict)
650
651        # Ensure code address is readable and executable.
652        self.assertIn("permissions", mem_region_dict)
653        self.assertIn("r", mem_region_dict["permissions"])
654        self.assertIn("x", mem_region_dict["permissions"])
655
656        # Ensure the start address and size encompass the address we queried.
657        self.assert_address_within_memory_region(code_address, mem_region_dict)
658
659    @skipIfWindows # No pty support to test any inferior output
660    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
661        self.build()
662        self.set_inferior_startup_launch()
663
664        # Start up the inferior.
665        procs = self.prep_debug_monitor_and_inferior(
666            inferior_args=["get-stack-address-hex:", "sleep:5"])
667
668        # Run the process
669        self.test_sequence.add_log_lines(
670            [
671                # Start running after initial stop.
672                "read packet: $c#63",
673                # Match output line that prints the memory address of the message buffer within the inferior.
674                # Note we require launch-only testing so we can get inferior otuput.
675                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
676                 "capture": {1: "stack_address"}},
677                # Now stop the inferior.
678                "read packet: {}".format(chr(3)),
679                # And wait for the stop notification.
680                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
681            True)
682
683        # Run the packet stream.
684        context = self.expect_gdbremote_sequence()
685        self.assertIsNotNone(context)
686
687        # Grab the address.
688        self.assertIsNotNone(context.get("stack_address"))
689        stack_address = int(context.get("stack_address"), 16)
690
691        # Grab memory region info from the inferior.
692        self.reset_test_sequence()
693        self.add_query_memory_region_packets(stack_address)
694
695        # Run the packet stream.
696        context = self.expect_gdbremote_sequence()
697        self.assertIsNotNone(context)
698        mem_region_dict = self.parse_memory_region_packet(context)
699
700        # Ensure there are no errors reported.
701        self.assertNotIn("error", mem_region_dict)
702
703        # Ensure address is readable and executable.
704        self.assertIn("permissions", mem_region_dict)
705        self.assertIn("r", mem_region_dict["permissions"])
706        self.assertIn("w", mem_region_dict["permissions"])
707
708        # Ensure the start address and size encompass the address we queried.
709        self.assert_address_within_memory_region(
710            stack_address, mem_region_dict)
711
712    @skipIfWindows # No pty support to test any inferior output
713    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
714        self.build()
715        self.set_inferior_startup_launch()
716
717        # Start up the inferior.
718        procs = self.prep_debug_monitor_and_inferior(
719            inferior_args=["get-heap-address-hex:", "sleep:5"])
720
721        # Run the process
722        self.test_sequence.add_log_lines(
723            [
724                # Start running after initial stop.
725                "read packet: $c#63",
726                # Match output line that prints the memory address of the message buffer within the inferior.
727                # Note we require launch-only testing so we can get inferior otuput.
728                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
729                 "capture": {1: "heap_address"}},
730                # Now stop the inferior.
731                "read packet: {}".format(chr(3)),
732                # And wait for the stop notification.
733                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
734            True)
735
736        # Run the packet stream.
737        context = self.expect_gdbremote_sequence()
738        self.assertIsNotNone(context)
739
740        # Grab the address.
741        self.assertIsNotNone(context.get("heap_address"))
742        heap_address = int(context.get("heap_address"), 16)
743
744        # Grab memory region info from the inferior.
745        self.reset_test_sequence()
746        self.add_query_memory_region_packets(heap_address)
747
748        # Run the packet stream.
749        context = self.expect_gdbremote_sequence()
750        self.assertIsNotNone(context)
751        mem_region_dict = self.parse_memory_region_packet(context)
752
753        # Ensure there are no errors reported.
754        self.assertNotIn("error", mem_region_dict)
755
756        # Ensure address is readable and executable.
757        self.assertIn("permissions", mem_region_dict)
758        self.assertIn("r", mem_region_dict["permissions"])
759        self.assertIn("w", mem_region_dict["permissions"])
760
761        # Ensure the start address and size encompass the address we queried.
762        self.assert_address_within_memory_region(heap_address, mem_region_dict)
763
764    def breakpoint_set_and_remove_work(self, want_hardware):
765        # Start up the inferior.
766        procs = self.prep_debug_monitor_and_inferior(
767            inferior_args=[
768                "get-code-address-hex:hello",
769                "sleep:1",
770                "call-function:hello"])
771
772        # Run the process
773        self.add_register_info_collection_packets()
774        self.add_process_info_collection_packets()
775        self.test_sequence.add_log_lines(
776            [  # Start running after initial stop.
777                "read packet: $c#63",
778                # Match output line that prints the memory address of the function call entry point.
779                # Note we require launch-only testing so we can get inferior otuput.
780                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
781                 "capture": {1: "function_address"}},
782                # Now stop the inferior.
783                "read packet: {}".format(chr(3)),
784                # And wait for the stop notification.
785                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
786            True)
787
788        # Run the packet stream.
789        context = self.expect_gdbremote_sequence()
790        self.assertIsNotNone(context)
791
792        # Gather process info - we need endian of target to handle register
793        # value conversions.
794        process_info = self.parse_process_info_response(context)
795        endian = process_info.get("endian")
796        self.assertIsNotNone(endian)
797
798        # Gather register info entries.
799        reg_infos = self.parse_register_info_packets(context)
800        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
801        self.assertIsNotNone(pc_lldb_reg_index)
802        self.assertIsNotNone(pc_reg_info)
803
804        # Grab the function address.
805        self.assertIsNotNone(context.get("function_address"))
806        function_address = int(context.get("function_address"), 16)
807
808        # Get current target architecture
809        target_arch = self.getArchitecture()
810
811        # Set the breakpoint.
812        if target_arch in ["arm", "arm64", "aarch64"]:
813            # TODO: Handle case when setting breakpoint in thumb code
814            BREAKPOINT_KIND = 4
815        else:
816            BREAKPOINT_KIND = 1
817
818        # Set default packet type to Z0 (software breakpoint)
819        z_packet_type = 0
820
821        # If hardware breakpoint is requested set packet type to Z1
822        if want_hardware == True:
823            z_packet_type = 1
824
825        self.reset_test_sequence()
826        self.add_set_breakpoint_packets(
827            function_address,
828            z_packet_type,
829            do_continue=True,
830            breakpoint_kind=BREAKPOINT_KIND)
831
832        # Run the packet stream.
833        context = self.expect_gdbremote_sequence()
834        self.assertIsNotNone(context)
835
836        # Verify the stop signal reported was the breakpoint signal number.
837        stop_signo = context.get("stop_signo")
838        self.assertIsNotNone(stop_signo)
839        self.assertEqual(int(stop_signo, 16),
840                         lldbutil.get_signal_number('SIGTRAP'))
841
842        # Ensure we did not receive any output.  If the breakpoint was not set, we would
843        # see output (from a launched process with captured stdio) printing a hello, world message.
844        # That would indicate the breakpoint didn't take.
845        self.assertEqual(len(context["O_content"]), 0)
846
847        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
848        # This acts as a another validation on the register reading code.
849        self.reset_test_sequence()
850        self.test_sequence.add_log_lines(
851            [
852                # Print the PC.  This should match the breakpoint address.
853                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
854                # Capture $p results.
855                {"direction": "send",
856                 "regex": r"^\$([0-9a-fA-F]+)#",
857                 "capture": {1: "p_response"}},
858            ], True)
859
860        context = self.expect_gdbremote_sequence()
861        self.assertIsNotNone(context)
862
863        # Verify the PC is where we expect.  Note response is in endianness of
864        # the inferior.
865        p_response = context.get("p_response")
866        self.assertIsNotNone(p_response)
867
868        # Convert from target endian to int.
869        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
870            endian, p_response)
871        self.assertEqual(returned_pc, function_address)
872
873        # Verify that a breakpoint remove and continue gets us the expected
874        # output.
875        self.reset_test_sequence()
876
877        # Add breakpoint remove packets
878        self.add_remove_breakpoint_packets(
879            function_address,
880            z_packet_type,
881            breakpoint_kind=BREAKPOINT_KIND)
882
883        self.test_sequence.add_log_lines(
884            [
885                # Continue running.
886                "read packet: $c#63",
887                # We should now receive the output from the call.
888                {"type": "output_match", "regex": r"^hello, world\r\n$"},
889                # And wait for program completion.
890                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
891            ], True)
892
893        context = self.expect_gdbremote_sequence()
894        self.assertIsNotNone(context)
895
896    @skipIfWindows # No pty support to test any inferior output
897    def test_software_breakpoint_set_and_remove_work(self):
898        if self.getArchitecture() == "arm":
899            # TODO: Handle case when setting breakpoint in thumb code
900            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
901        else:
902            self.build()
903        self.set_inferior_startup_launch()
904        self.breakpoint_set_and_remove_work(want_hardware=False)
905
906    @skipUnlessPlatform(oslist=['linux'])
907    @skipIf(archs=no_match(['arm', 'aarch64']))
908    def test_hardware_breakpoint_set_and_remove_work(self):
909        if self.getArchitecture() == "arm":
910            # TODO: Handle case when setting breakpoint in thumb code
911            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
912        else:
913            self.build()
914        self.set_inferior_startup_launch()
915        self.breakpoint_set_and_remove_work(want_hardware=True)
916
917    def get_qSupported_dict(self, features=[]):
918        self.build()
919        self.set_inferior_startup_launch()
920
921        # Start up the stub and start/prep the inferior.
922        procs = self.prep_debug_monitor_and_inferior()
923        self.add_qSupported_packets(features)
924
925        # Run the packet stream.
926        context = self.expect_gdbremote_sequence()
927        self.assertIsNotNone(context)
928
929        # Retrieve the qSupported features.
930        return self.parse_qSupported_response(context)
931
932    def test_qSupported_returns_known_stub_features(self):
933        supported_dict = self.get_qSupported_dict()
934        self.assertIsNotNone(supported_dict)
935        self.assertTrue(len(supported_dict) > 0)
936
937    def test_qSupported_auvx(self):
938        expected = ('+' if lldbplatformutil.getPlatform()
939                    in ["freebsd", "linux", "netbsd"] else '-')
940        supported_dict = self.get_qSupported_dict()
941        self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected)
942
943    def test_qSupported_libraries_svr4(self):
944        expected = ('+' if lldbplatformutil.getPlatform()
945                    in ["freebsd", "linux", "netbsd"] else '-')
946        supported_dict = self.get_qSupported_dict()
947        self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'),
948                         expected)
949
950    def test_qSupported_siginfo_read(self):
951        expected = ('+' if lldbplatformutil.getPlatform()
952                    in ["freebsd", "linux"] else '-')
953        supported_dict = self.get_qSupported_dict()
954        self.assertEqual(supported_dict.get('qXfer:siginfo:read', '-'),
955                         expected)
956
957    def test_qSupported_QPassSignals(self):
958        expected = ('+' if lldbplatformutil.getPlatform()
959                    in ["freebsd", "linux", "netbsd"] else '-')
960        supported_dict = self.get_qSupported_dict()
961        self.assertEqual(supported_dict.get('QPassSignals', '-'), expected)
962
963    @add_test_categories(["fork"])
964    def test_qSupported_fork_events(self):
965        supported_dict = (
966            self.get_qSupported_dict(['multiprocess+', 'fork-events+']))
967        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
968        self.assertEqual(supported_dict.get('fork-events', '-'), '+')
969        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
970
971    @add_test_categories(["fork"])
972    def test_qSupported_fork_events_without_multiprocess(self):
973        supported_dict = (
974            self.get_qSupported_dict(['fork-events+']))
975        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
976        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
977        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
978
979    @add_test_categories(["fork"])
980    def test_qSupported_vfork_events(self):
981        supported_dict = (
982            self.get_qSupported_dict(['multiprocess+', 'vfork-events+']))
983        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
984        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
985        self.assertEqual(supported_dict.get('vfork-events', '-'), '+')
986
987    @add_test_categories(["fork"])
988    def test_qSupported_vfork_events_without_multiprocess(self):
989        supported_dict = (
990            self.get_qSupported_dict(['vfork-events+']))
991        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
992        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
993        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
994
995    # We need to be able to self.runCmd to get cpuinfo,
996    # which is not possible when using a remote platform.
997    @skipIfRemote
998    def test_qSupported_memory_tagging(self):
999        supported_dict = self.get_qSupported_dict()
1000        self.assertEqual(supported_dict.get("memory-tagging", '-'),
1001                         '+' if self.isAArch64MTE() else '-')
1002
1003    @skipIfWindows # No pty support to test any inferior output
1004    def test_written_M_content_reads_back_correctly(self):
1005        self.build()
1006        self.set_inferior_startup_launch()
1007
1008        TEST_MESSAGE = "Hello, memory"
1009
1010        # Start up the stub and start/prep the inferior.
1011        procs = self.prep_debug_monitor_and_inferior(
1012            inferior_args=[
1013                "set-message:xxxxxxxxxxxxxX",
1014                "get-data-address-hex:g_message",
1015                "sleep:1",
1016                "print-message:"])
1017        self.test_sequence.add_log_lines(
1018            [
1019                # Start running after initial stop.
1020                "read packet: $c#63",
1021                # Match output line that prints the memory address of the message buffer within the inferior.
1022                # Note we require launch-only testing so we can get inferior otuput.
1023                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
1024                 "capture": {1: "message_address"}},
1025                # Now stop the inferior.
1026                "read packet: {}".format(chr(3)),
1027                # And wait for the stop notification.
1028                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1029            True)
1030        context = self.expect_gdbremote_sequence()
1031        self.assertIsNotNone(context)
1032
1033        # Grab the message address.
1034        self.assertIsNotNone(context.get("message_address"))
1035        message_address = int(context.get("message_address"), 16)
1036
1037        # Hex-encode the test message, adding null termination.
1038        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
1039
1040        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
1041        # and binary (x) memory read packets.
1042        self.reset_test_sequence()
1043        self.test_sequence.add_log_lines(
1044            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
1045             "send packet: $OK#00",
1046             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1047             "send packet: ${0}#00".format(hex_encoded_message),
1048             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1049             "send packet: ${0}#00".format(TEST_MESSAGE),
1050             "read packet: $m{0:x},4#00".format(message_address),
1051             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
1052             "read packet: $x{0:x},4#00".format(message_address),
1053             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
1054             "read packet: $c#63",
1055             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
1056             "send packet: $W00#00",
1057             ], True)
1058        context = self.expect_gdbremote_sequence()
1059        self.assertIsNotNone(context)
1060
1061        # Ensure what we read from inferior memory is what we wrote.
1062        printed_message = context.get("printed_message")
1063        self.assertIsNotNone(printed_message)
1064        self.assertEqual(printed_message, TEST_MESSAGE + "X")
1065
1066    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
1067    # Come back to this.  I have the test rigged to verify that at least some
1068    # of the bit-flip writes work.
1069    def test_P_writes_all_gpr_registers(self):
1070        self.build()
1071        self.set_inferior_startup_launch()
1072
1073        # Start inferior debug session, grab all register info.
1074        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
1075        self.add_register_info_collection_packets()
1076        self.add_process_info_collection_packets()
1077
1078        context = self.expect_gdbremote_sequence()
1079        self.assertIsNotNone(context)
1080
1081        # Process register infos.
1082        reg_infos = self.parse_register_info_packets(context)
1083        self.assertIsNotNone(reg_infos)
1084        self.add_lldb_register_index(reg_infos)
1085
1086        # Process endian.
1087        process_info = self.parse_process_info_response(context)
1088        endian = process_info.get("endian")
1089        self.assertIsNotNone(endian)
1090
1091        # Pull out the register infos that we think we can bit flip
1092        # successfully,.
1093        gpr_reg_infos = [
1094            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1095        self.assertTrue(len(gpr_reg_infos) > 0)
1096
1097        # Write flipped bit pattern of existing value to each register.
1098        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1099            gpr_reg_infos, endian)
1100        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1101        self.assertTrue(successful_writes > 0)
1102
1103    # Note: as of this moment, a hefty number of the GPR writes are failing
1104    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1105    @skipIfWindows
1106    def test_P_and_p_thread_suffix_work(self):
1107        self.build()
1108        self.set_inferior_startup_launch()
1109
1110        # Startup the inferior with three threads.
1111        _, threads = self.launch_with_threads(3)
1112
1113        self.reset_test_sequence()
1114        self.add_thread_suffix_request_packets()
1115        self.add_register_info_collection_packets()
1116        self.add_process_info_collection_packets()
1117
1118        context = self.expect_gdbremote_sequence()
1119        self.assertIsNotNone(context)
1120
1121        process_info = self.parse_process_info_response(context)
1122        self.assertIsNotNone(process_info)
1123        endian = process_info.get("endian")
1124        self.assertIsNotNone(endian)
1125
1126        reg_infos = self.parse_register_info_packets(context)
1127        self.assertIsNotNone(reg_infos)
1128        self.add_lldb_register_index(reg_infos)
1129
1130        reg_index = self.select_modifiable_register(reg_infos)
1131        self.assertIsNotNone(reg_index)
1132        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1133        self.assertTrue(reg_byte_size > 0)
1134
1135        expected_reg_values = []
1136        register_increment = 1
1137        next_value = None
1138
1139        # Set the same register in each of 3 threads to a different value.
1140        # Verify each one has the unique value.
1141        for thread in threads:
1142            # If we don't have a next value yet, start it with the initial read
1143            # value + 1
1144            if not next_value:
1145                # Read pre-existing register value.
1146                self.reset_test_sequence()
1147                self.test_sequence.add_log_lines(
1148                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1149                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1150                     ], True)
1151                context = self.expect_gdbremote_sequence()
1152                self.assertIsNotNone(context)
1153
1154                # Set the next value to use for writing as the increment plus
1155                # current value.
1156                p_response = context.get("p_response")
1157                self.assertIsNotNone(p_response)
1158                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1159                    endian, p_response)
1160
1161            # Set new value using P and thread suffix.
1162            self.reset_test_sequence()
1163            self.test_sequence.add_log_lines(
1164                [
1165                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1166                        reg_index,
1167                        lldbgdbserverutils.pack_register_hex(
1168                            endian,
1169                            next_value,
1170                            byte_size=reg_byte_size),
1171                        thread),
1172                    "send packet: $OK#00",
1173                ],
1174                True)
1175            context = self.expect_gdbremote_sequence()
1176            self.assertIsNotNone(context)
1177
1178            # Save the value we set.
1179            expected_reg_values.append(next_value)
1180
1181            # Increment value for next thread to use (we want them all
1182            # different so we can verify they wrote to each thread correctly
1183            # next.)
1184            next_value += register_increment
1185
1186        # Revisit each thread and verify they have the expected value set for
1187        # the register we wrote.
1188        thread_index = 0
1189        for thread in threads:
1190            # Read pre-existing register value.
1191            self.reset_test_sequence()
1192            self.test_sequence.add_log_lines(
1193                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1194                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1195                 ], True)
1196            context = self.expect_gdbremote_sequence()
1197            self.assertIsNotNone(context)
1198
1199            # Get the register value.
1200            p_response = context.get("p_response")
1201            self.assertIsNotNone(p_response)
1202            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1203                endian, p_response)
1204
1205            # Make sure we read back what we wrote.
1206            self.assertEqual(read_value, expected_reg_values[thread_index])
1207            thread_index += 1
1208
1209    @skipIfWindows # No pty support to test any inferior output
1210    @add_test_categories(["llgs"])
1211    def test_launch_via_A(self):
1212        self.build()
1213        exe_path = self.getBuildArtifact("a.out")
1214        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1215        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1216
1217        server = self.connect_to_debug_monitor()
1218        self.assertIsNotNone(server)
1219        self.do_handshake()
1220        # NB: strictly speaking we should use %x here but this packet
1221        # is deprecated, so no point in changing lldb-server's expectations
1222        self.test_sequence.add_log_lines(
1223            ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" %
1224             tuple(itertools.chain.from_iterable(
1225                 [(len(x), x) for x in hex_args])),
1226             "send packet: $OK#00",
1227             "read packet: $c#00",
1228             "send packet: $W00#00"],
1229            True)
1230        context = self.expect_gdbremote_sequence()
1231        self.assertEqual(context["O_content"],
1232                         b'arg1\r\narg2\r\narg3\r\n')
1233
1234    @skipIfWindows # No pty support to test any inferior output
1235    @add_test_categories(["llgs"])
1236    def test_launch_via_vRun(self):
1237        self.build()
1238        exe_path = self.getBuildArtifact("a.out")
1239        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1240        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1241
1242        server = self.connect_to_debug_monitor()
1243        self.assertIsNotNone(server)
1244        self.do_handshake()
1245        self.test_sequence.add_log_lines(
1246            ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args),
1247             {"direction": "send",
1248              "regex": r"^\$T([0-9a-fA-F]+)"},
1249             "read packet: $c#00",
1250             "send packet: $W00#00"],
1251            True)
1252        context = self.expect_gdbremote_sequence()
1253        self.assertEqual(context["O_content"],
1254                         b'arg1\r\narg2\r\narg3\r\n')
1255
1256    @add_test_categories(["llgs"])
1257    def test_launch_via_vRun_no_args(self):
1258        self.build()
1259        exe_path = self.getBuildArtifact("a.out")
1260        hex_path = binascii.b2a_hex(exe_path.encode()).decode()
1261
1262        server = self.connect_to_debug_monitor()
1263        self.assertIsNotNone(server)
1264        self.do_handshake()
1265        self.test_sequence.add_log_lines(
1266            ["read packet: $vRun;%s#00" % (hex_path,),
1267             {"direction": "send",
1268              "regex": r"^\$T([0-9a-fA-F]+)"},
1269             "read packet: $c#00",
1270             "send packet: $W00#00"],
1271            True)
1272        self.expect_gdbremote_sequence()
1273
1274    @skipIfWindows # No pty support to test any inferior output
1275    @add_test_categories(["llgs"])
1276    def test_QEnvironment(self):
1277        self.build()
1278        exe_path = self.getBuildArtifact("a.out")
1279        env = {"FOO": "test", "BAR": "a=z"}
1280        args = [exe_path, "print-env:FOO", "print-env:BAR"]
1281        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1282
1283        server = self.connect_to_debug_monitor()
1284        self.assertIsNotNone(server)
1285        self.do_handshake()
1286
1287        for key, value in env.items():
1288            self.test_sequence.add_log_lines(
1289                ["read packet: $QEnvironment:%s=%s#00" % (key, value),
1290                 "send packet: $OK#00"],
1291                True)
1292        self.test_sequence.add_log_lines(
1293            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1294             {"direction": "send",
1295              "regex": r"^\$T([0-9a-fA-F]+)"},
1296             "read packet: $c#00",
1297             "send packet: $W00#00"],
1298            True)
1299        context = self.expect_gdbremote_sequence()
1300        self.assertEqual(context["O_content"], b"test\r\na=z\r\n")
1301
1302    @skipIfWindows # No pty support to test any inferior output
1303    @add_test_categories(["llgs"])
1304    def test_QEnvironmentHexEncoded(self):
1305        self.build()
1306        exe_path = self.getBuildArtifact("a.out")
1307        env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"}
1308        args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"]
1309        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1310
1311        server = self.connect_to_debug_monitor()
1312        self.assertIsNotNone(server)
1313        self.do_handshake()
1314
1315        for key, value in env.items():
1316            hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode()
1317            self.test_sequence.add_log_lines(
1318                ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,),
1319                 "send packet: $OK#00"],
1320                True)
1321        self.test_sequence.add_log_lines(
1322            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1323             {"direction": "send",
1324              "regex": r"^\$T([0-9a-fA-F]+)"},
1325             "read packet: $c#00",
1326             "send packet: $W00#00"],
1327            True)
1328        context = self.expect_gdbremote_sequence()
1329        self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n")
1330
1331    @skipUnlessPlatform(oslist=["freebsd", "linux"])
1332    @add_test_categories(["llgs"])
1333    def test_qXfer_siginfo_read(self):
1334        self.build()
1335        self.set_inferior_startup_launch()
1336        procs = self.prep_debug_monitor_and_inferior(
1337                inferior_args=["thread:segfault", "thread:new", "sleep:10"])
1338        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
1339        self.expect_gdbremote_sequence()
1340
1341        # Run until SIGSEGV comes in.
1342        self.reset_test_sequence()
1343        self.test_sequence.add_log_lines(
1344            [{"direction": "send",
1345              "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1346              "capture": {1: "signo", 2: "thread_id"},
1347              }], True)
1348
1349        # Figure out which thread crashed.
1350        context = self.expect_gdbremote_sequence()
1351        self.assertIsNotNone(context)
1352        self.assertEqual(int(context["signo"], 16),
1353                         lldbutil.get_signal_number('SIGSEGV'))
1354        crashing_thread = int(context["thread_id"], 16)
1355
1356        # Grab siginfo for the crashing thread.
1357        self.reset_test_sequence()
1358        self.add_process_info_collection_packets()
1359        self.test_sequence.add_log_lines(
1360            ["read packet: $Hg{:x}#00".format(crashing_thread),
1361             "send packet: $OK#00",
1362             "read packet: $qXfer:siginfo:read::0,80:#00",
1363             {"direction": "send",
1364              "regex": re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1365                                  re.MULTILINE | re.DOTALL),
1366              "capture": {1: "response_type", 2: "content_raw"},
1367              }], True)
1368        context = self.expect_gdbremote_sequence()
1369        self.assertIsNotNone(context)
1370
1371        # Ensure we end up with all data in one packet.
1372        self.assertEqual(context.get("response_type"), "l")
1373
1374        # Decode binary data.
1375        content_raw = context.get("content_raw")
1376        self.assertIsNotNone(content_raw)
1377        content = self.decode_gdbremote_binary(content_raw).encode("latin1")
1378
1379        # Decode siginfo_t.
1380        process_info = self.parse_process_info_response(context)
1381        pad = ""
1382        if process_info["ptrsize"] == "8":
1383            pad = "i"
1384        signo_idx = 0
1385        errno_idx = 1
1386        code_idx = 2
1387        addr_idx = -1
1388        SEGV_MAPERR = 1
1389        if process_info["ostype"] == "linux":
1390            # si_signo, si_errno, si_code, [pad], _sifields._sigfault.si_addr
1391            format_str = "iii{}P".format(pad)
1392        elif process_info["ostype"].startswith("freebsd"):
1393            # si_signo, si_errno, si_code, si_pid, si_uid, si_status, si_addr
1394            format_str = "iiiiiiP"
1395        elif process_info["ostype"].startswith("netbsd"):
1396            # _signo, _code, _errno, [pad], _reason._fault._addr
1397            format_str = "iii{}P".format(pad)
1398            errno_idx = 2
1399            code_idx = 1
1400        else:
1401            assert False, "unknown ostype"
1402
1403        decoder = struct.Struct(format_str)
1404        decoded = decoder.unpack(content[:decoder.size])
1405        self.assertEqual(decoded[signo_idx],
1406                         lldbutil.get_signal_number('SIGSEGV'))
1407        self.assertEqual(decoded[errno_idx], 0)  # si_errno
1408        self.assertEqual(decoded[code_idx], SEGV_MAPERR)  # si_code
1409        self.assertEqual(decoded[addr_idx], 0)  # si_addr
1410