1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import binascii
14import itertools
15import struct
16
17import unittest2
18import gdbremote_testcase
19import lldbgdbserverutils
20from lldbsuite.support import seven
21from lldbsuite.test.decorators import *
22from lldbsuite.test.lldbtest import *
23from lldbsuite.test.lldbdwarf import *
24from lldbsuite.test import lldbutil, lldbplatformutil
25
26
27class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
28
29    mydir = TestBase.compute_mydir(__file__)
30
31    def test_thread_suffix_supported(self):
32        server = self.connect_to_debug_monitor()
33        self.assertIsNotNone(server)
34
35        self.do_handshake()
36        self.test_sequence.add_log_lines(
37            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
38             "lldb-server <   6> send packet: $OK#9a"],
39            True)
40
41        self.expect_gdbremote_sequence()
42
43
44    def test_list_threads_in_stop_reply_supported(self):
45        server = self.connect_to_debug_monitor()
46        self.assertIsNotNone(server)
47
48        self.do_handshake()
49        self.test_sequence.add_log_lines(
50            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
51             "lldb-server <   6> send packet: $OK#9a"],
52            True)
53        self.expect_gdbremote_sequence()
54
55    def test_c_packet_works(self):
56        self.build()
57        procs = self.prep_debug_monitor_and_inferior()
58        self.test_sequence.add_log_lines(
59            ["read packet: $c#63",
60             "send packet: $W00#00"],
61            True)
62
63        self.expect_gdbremote_sequence()
64
65    @skipIfWindows # No pty support to test any inferior output
66    def test_inferior_print_exit(self):
67        self.build()
68        procs = self.prep_debug_monitor_and_inferior(
69                inferior_args=["hello, world"])
70        self.test_sequence.add_log_lines(
71            ["read packet: $vCont;c#a8",
72             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
73             "send packet: $W00#00"],
74            True)
75
76        context = self.expect_gdbremote_sequence()
77        self.assertIsNotNone(context)
78
79    def test_first_launch_stop_reply_thread_matches_first_qC(self):
80        self.build()
81        procs = self.prep_debug_monitor_and_inferior()
82        self.test_sequence.add_log_lines(["read packet: $qC#00",
83                                          {"direction": "send",
84                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
85                                           "capture": {1: "thread_id_QC"}},
86                                          "read packet: $?#00",
87                                          {"direction": "send",
88                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
89                                              "capture": {1: "thread_id_?"}}],
90                                         True)
91        context = self.expect_gdbremote_sequence()
92        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))
93
94    def test_attach_commandline_continue_app_exits(self):
95        self.build()
96        self.set_inferior_startup_attach()
97        procs = self.prep_debug_monitor_and_inferior()
98        self.test_sequence.add_log_lines(
99            ["read packet: $vCont;c#a8",
100             "send packet: $W00#00"],
101            True)
102        self.expect_gdbremote_sequence()
103
104        # Wait a moment for completed and now-detached inferior process to
105        # clear.
106        time.sleep(1)
107
108        if not lldb.remote_platform:
109            # Process should be dead now. Reap results.
110            poll_result = procs["inferior"].poll()
111            self.assertIsNotNone(poll_result)
112
113        # Where possible, verify at the system level that the process is not
114        # running.
115        self.assertFalse(
116            lldbgdbserverutils.process_is_running(
117                procs["inferior"].pid, False))
118
119    def test_qRegisterInfo_returns_one_valid_result(self):
120        self.build()
121        self.prep_debug_monitor_and_inferior()
122        self.test_sequence.add_log_lines(
123            ["read packet: $qRegisterInfo0#00",
124             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
125            True)
126
127        # Run the stream
128        context = self.expect_gdbremote_sequence()
129        self.assertIsNotNone(context)
130
131        reg_info_packet = context.get("reginfo_0")
132        self.assertIsNotNone(reg_info_packet)
133        self.assert_valid_reg_info(
134            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
135
136    def test_qRegisterInfo_returns_all_valid_results(self):
137        self.build()
138        self.prep_debug_monitor_and_inferior()
139        self.add_register_info_collection_packets()
140
141        # Run the stream.
142        context = self.expect_gdbremote_sequence()
143        self.assertIsNotNone(context)
144
145        # Validate that each register info returned validates.
146        for reg_info in self.parse_register_info_packets(context):
147            self.assert_valid_reg_info(reg_info)
148
149    def test_qRegisterInfo_contains_required_generics_debugserver(self):
150        self.build()
151        self.prep_debug_monitor_and_inferior()
152        self.add_register_info_collection_packets()
153
154        # Run the packet stream.
155        context = self.expect_gdbremote_sequence()
156        self.assertIsNotNone(context)
157
158        # Gather register info entries.
159        reg_infos = self.parse_register_info_packets(context)
160
161        # Collect all generic registers found.
162        generic_regs = {
163            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
164
165        # Ensure we have a program counter register.
166        self.assertIn('pc', generic_regs)
167
168        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
169        if self.getArchitecture() != 'powerpc64le':
170            self.assertIn('fp', generic_regs)
171
172        # Ensure we have a stack pointer register.
173        self.assertIn('sp', generic_regs)
174
175        # Ensure we have a flags register.
176        self.assertIn('flags', generic_regs)
177
178    def test_qRegisterInfo_contains_at_least_one_register_set(self):
179        self.build()
180        self.prep_debug_monitor_and_inferior()
181        self.add_register_info_collection_packets()
182
183        # Run the packet stream.
184        context = self.expect_gdbremote_sequence()
185        self.assertIsNotNone(context)
186
187        # Gather register info entries.
188        reg_infos = self.parse_register_info_packets(context)
189
190        # Collect all register sets found.
191        register_sets = {
192            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
193        self.assertTrue(len(register_sets) >= 1)
194
195    def targetHasAVX(self):
196        triple = self.dbg.GetSelectedPlatform().GetTriple()
197
198        # TODO other platforms, please implement this function
199        if not re.match(".*-.*-linux", triple):
200            return True
201
202        # Need to do something different for non-Linux/Android targets
203        if lldb.remote_platform:
204            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
205            cpuinfo_path = "cpuinfo"
206            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
207        else:
208            cpuinfo_path = "/proc/cpuinfo"
209
210        f = open(cpuinfo_path, 'r')
211        cpuinfo = f.read()
212        f.close()
213        return " avx " in cpuinfo
214
215    @expectedFailureAll(oslist=["windows"]) # no avx for now.
216    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
217    @add_test_categories(["llgs"])
218    def test_qRegisterInfo_contains_avx_registers(self):
219        self.build()
220        self.prep_debug_monitor_and_inferior()
221        self.add_register_info_collection_packets()
222
223        # Run the packet stream.
224        context = self.expect_gdbremote_sequence()
225        self.assertIsNotNone(context)
226
227        # Gather register info entries.
228        reg_infos = self.parse_register_info_packets(context)
229
230        # Collect all generics found.
231        register_sets = {
232            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
233        self.assertEqual(
234            self.targetHasAVX(),
235            "Advanced Vector Extensions" in register_sets)
236
237    def qThreadInfo_contains_thread(self):
238        procs = self.prep_debug_monitor_and_inferior()
239        self.add_threadinfo_collection_packets()
240
241        # Run the packet stream.
242        context = self.expect_gdbremote_sequence()
243        self.assertIsNotNone(context)
244
245        # Gather threadinfo entries.
246        threads = self.parse_threadinfo_packets(context)
247        self.assertIsNotNone(threads)
248
249        # We should have exactly one thread.
250        self.assertEqual(len(threads), 1)
251
252    def test_qThreadInfo_contains_thread_launch(self):
253        self.build()
254        self.set_inferior_startup_launch()
255        self.qThreadInfo_contains_thread()
256
257    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
258    def test_qThreadInfo_contains_thread_attach(self):
259        self.build()
260        self.set_inferior_startup_attach()
261        self.qThreadInfo_contains_thread()
262
263    def qThreadInfo_matches_qC(self):
264        procs = self.prep_debug_monitor_and_inferior()
265
266        self.add_threadinfo_collection_packets()
267        self.test_sequence.add_log_lines(
268            ["read packet: $qC#00",
269             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
270             ], True)
271
272        # Run the packet stream.
273        context = self.expect_gdbremote_sequence()
274        self.assertIsNotNone(context)
275
276        # Gather threadinfo entries.
277        threads = self.parse_threadinfo_packets(context)
278        self.assertIsNotNone(threads)
279
280        # We should have exactly one thread from threadinfo.
281        self.assertEqual(len(threads), 1)
282
283        # We should have a valid thread_id from $QC.
284        QC_thread_id_hex = context.get("thread_id")
285        self.assertIsNotNone(QC_thread_id_hex)
286        QC_thread_id = int(QC_thread_id_hex, 16)
287
288        # Those two should be the same.
289        self.assertEqual(threads[0], QC_thread_id)
290
291    def test_qThreadInfo_matches_qC_launch(self):
292        self.build()
293        self.set_inferior_startup_launch()
294        self.qThreadInfo_matches_qC()
295
296    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
297    def test_qThreadInfo_matches_qC_attach(self):
298        self.build()
299        self.set_inferior_startup_attach()
300        self.qThreadInfo_matches_qC()
301
302    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
303        self.build()
304        self.set_inferior_startup_launch()
305        procs = self.prep_debug_monitor_and_inferior()
306        self.add_register_info_collection_packets()
307
308        # Run the packet stream.
309        context = self.expect_gdbremote_sequence()
310        self.assertIsNotNone(context)
311
312        # Gather register info entries.
313        reg_infos = self.parse_register_info_packets(context)
314        self.assertIsNotNone(reg_infos)
315        self.assertTrue(len(reg_infos) > 0)
316
317        byte_order = self.get_target_byte_order()
318
319        # Read value for each register.
320        reg_index = 0
321        for reg_info in reg_infos:
322            # Skip registers that don't have a register set.  For x86, these are
323            # the DRx registers, which have no LLDB-kind register number and thus
324            # cannot be read via normal
325            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
326            if not "set" in reg_info:
327                continue
328
329            # Clear existing packet expectations.
330            self.reset_test_sequence()
331
332            # Run the register query
333            self.test_sequence.add_log_lines(
334                ["read packet: $p{0:x}#00".format(reg_index),
335                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
336                True)
337            context = self.expect_gdbremote_sequence()
338            self.assertIsNotNone(context)
339
340            # Verify the response length.
341            p_response = context.get("p_response")
342            self.assertIsNotNone(p_response)
343
344            # Skip erraneous (unsupported) registers.
345            # TODO: remove this once we make unsupported registers disappear.
346            if p_response.startswith("E") and len(p_response) == 3:
347                continue
348
349            if "dynamic_size_dwarf_expr_bytes" in reg_info:
350                self.updateRegInfoBitsize(reg_info, byte_order)
351            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
352                             reg_info)
353
354            # Increment loop
355            reg_index += 1
356
357    def Hg_switches_to_3_threads(self, pass_pid=False):
358        # Startup the inferior with three threads (main + 2 new ones).
359        procs = self.prep_debug_monitor_and_inferior(
360            inferior_args=["thread:new", "thread:new"])
361
362        # Let the inferior process have a few moments to start up the thread
363        # when launched.  (The launch scenario has no time to run, so threads
364        # won't be there yet.)
365        self.run_process_then_stop(run_seconds=1)
366
367        # Wait at most x seconds for 3 threads to be present.
368        threads = self.wait_for_thread_count(3)
369        self.assertEqual(len(threads), 3)
370
371        pid_str = ""
372        if pass_pid:
373            pid_str = "p{0:x}.".format(procs["inferior"].pid)
374
375        # verify we can $H to each thead, and $qC matches the thread we set.
376        for thread in threads:
377            # Change to each thread, verify current thread id.
378            self.reset_test_sequence()
379            self.test_sequence.add_log_lines(
380                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
381                 "send packet: $OK#00",
382                 "read packet: $qC#00",
383                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
384                True)
385
386            context = self.expect_gdbremote_sequence()
387            self.assertIsNotNone(context)
388
389            # Verify the thread id.
390            self.assertIsNotNone(context.get("thread_id"))
391            self.assertEqual(int(context.get("thread_id"), 16), thread)
392
393    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
394    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
395    def test_Hg_switches_to_3_threads_launch(self):
396        self.build()
397        self.set_inferior_startup_launch()
398        self.Hg_switches_to_3_threads()
399
400    @expectedFailureAll(oslist=["windows"]) # expecting one more thread
401    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
402    def test_Hg_switches_to_3_threads_attach(self):
403        self.build()
404        self.set_inferior_startup_attach()
405        self.Hg_switches_to_3_threads()
406
407    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
408    @add_test_categories(["llgs"])
409    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
410    def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self):
411        self.build()
412        self.set_inferior_startup_attach()
413        self.Hg_switches_to_3_threads(pass_pid=True)
414
415    def Hg_fails_on_pid(self, pass_pid):
416        # Start the inferior.
417        procs = self.prep_debug_monitor_and_inferior(
418            inferior_args=["thread:new"])
419
420        self.run_process_then_stop(run_seconds=1)
421
422        threads = self.wait_for_thread_count(2)
423        self.assertEqual(len(threads), 2)
424
425        if pass_pid == -1:
426            pid_str = "p-1."
427        else:
428            pid_str = "p{0:x}.".format(pass_pid)
429        thread = threads[1]
430
431        self.test_sequence.add_log_lines(
432            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
433             "send packet: $Eff#00"],
434            True)
435
436        self.expect_gdbremote_sequence()
437
438    @expectedFailureAll(oslist=["windows"])
439    @add_test_categories(["llgs"])
440    def test_Hg_fails_on_another_pid(self):
441        self.build()
442        self.set_inferior_startup_launch()
443        self.Hg_fails_on_pid(1)
444
445    @expectedFailureAll(oslist=["windows"])
446    @add_test_categories(["llgs"])
447    def test_Hg_fails_on_zero_pid(self):
448        self.build()
449        self.set_inferior_startup_launch()
450        self.Hg_fails_on_pid(0)
451
452    @expectedFailureAll(oslist=["windows"])
453    @add_test_categories(["llgs"])
454    def test_Hg_fails_on_minus_one_pid(self):
455        self.build()
456        self.set_inferior_startup_launch()
457        self.Hg_fails_on_pid(-1)
458
    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
        """Shared body: $Hc followed by $C<SIGUSR1> must reach the chosen thread.

        The inferior is started with "thread:segfault" plus NUM_THREADS - 1
        "thread:new" arguments and continued.  For each of NUM_THREADS - 1
        rounds the test waits for a stop reply, checks its signal number
        against segfault_signo, selects the stopped thread with $Hc, resumes
        with $C carrying SIGUSR1, and checks the inferior's printed output.

        segfault_signo -- signal number expected in every stop reply (the
                          reply's hex field is converted before comparing).
        """
        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
        # and the test requires getting stdout from the exe.

        NUM_THREADS = 3

        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
        # inferior_args=["thread:print-ids"]
        inferior_args = ["thread:segfault"]
        for i in range(NUM_THREADS - 1):
            # if i > 0:
                # Give time between thread creation/segfaulting for the handler to work.
                # inferior_args.append("sleep:1")
            inferior_args.append("thread:new")
        inferior_args.append("sleep:10")

        # Launch/attach.  (In our case, this should only ever be launched since
        # we need inferior stdout/stderr).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=inferior_args)
        # Start the inferior running; the stop replies are consumed in the
        # loop below.
        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
        context = self.expect_gdbremote_sequence()

        # Let the inferior process have a few moments to start up the thread when launched.
        # context = self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for all threads to be present.
        # threads = self.wait_for_thread_count(NUM_THREADS)
        # self.assertEquals(len(threads), NUM_THREADS)

        # Both dicts are used as sets (the value 1 is never read): stub-side
        # thread ids that stopped, and inferior-printed thread ids.
        signaled_tids = {}
        print_thread_ids = {}

        # Switch to each thread, deliver a signal, and verify signal delivery
        for i in range(NUM_THREADS - 1):
            # Run until SIGSEGV comes in.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([{"direction": "send",
                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                                               "capture": {1: "signo",
                                                            2: "thread_id"}}],
                                             True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            signo = context.get("signo")
            self.assertEqual(int(signo, 16), segfault_signo)

            # Ensure we haven't seen this tid yet.
            thread_id = int(context.get("thread_id"), 16)
            self.assertNotIn(thread_id, signaled_tids)
            signaled_tids[thread_id] = 1

            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    # Set the continue thread.
                    # Set current thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",

                    # Continue sending the signal number to the continue thread.
                    # The commented out packet is a way to do this same operation without using
                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),

                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL.
                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                    #  "read packet: $c#63",
                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
                ],
                True)

            # Run the sequence.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Ensure the stop signal is the signal we delivered.
            # stop_signo = context.get("stop_signo")
            # self.assertIsNotNone(stop_signo)
            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))

            # Ensure the stop thread is the thread to which we delivered the signal.
            # stop_thread_id = context.get("stop_thread_id")
            # self.assertIsNotNone(stop_thread_id)
            # self.assertEquals(int(stop_thread_id,16), thread_id)

            # Ensure we haven't seen this thread id yet.  The inferior's
            # self-obtained thread ids are not guaranteed to match the stub
            # tids (at least on MacOSX).
            print_thread_id = context.get("print_thread_id")
            self.assertIsNotNone(print_thread_id)
            print_thread_id = int(print_thread_id, 16)
            self.assertNotIn(print_thread_id, print_thread_ids)

            # Now remember this print (i.e. inferior-reflected) thread id and
            # ensure we don't hit it again.
            print_thread_ids[print_thread_id] = 1

            # Ensure the thread id printed in the "past SIGSEGV" line equals
            # the id printed in the "received SIGUSR1" line.  Both values come
            # from the inferior's own output, so this compares inferior-side
            # ids only (not the stub tid from the stop reply).
            post_handle_thread_id = context.get("post_handle_thread_id")
            self.assertIsNotNone(post_handle_thread_id)
            post_handle_thread_id = int(post_handle_thread_id, 16)
            self.assertEqual(post_handle_thread_id, print_thread_id)
569
570    @expectedFailureDarwin
571    @skipIfWindows # no SIGSEGV support
572    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
573    @expectedFailureNetBSD
574    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
575        self.build()
576        self.set_inferior_startup_launch()
577
578        if self.platformIsDarwin():
579            # Darwin debugserver translates some signals like SIGSEGV into some gdb
580            # expectations about fixed signal numbers.
581            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
582        else:
583            self.Hc_then_Csignal_signals_correct_thread(
584                lldbutil.get_signal_number('SIGSEGV'))
585
586    @skipIfWindows # No pty support to test any inferior output
587    def test_m_packet_reads_memory(self):
588        self.build()
589        self.set_inferior_startup_launch()
590        # This is the memory we will write into the inferior and then ensure we
591        # can read back with $m.
592        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
593
594        # Start up the inferior.
595        procs = self.prep_debug_monitor_and_inferior(
596            inferior_args=[
597                "set-message:%s" %
598                MEMORY_CONTENTS,
599                "get-data-address-hex:g_message",
600                "sleep:5"])
601
602        # Run the process
603        self.test_sequence.add_log_lines(
604            [
605                # Start running after initial stop.
606                "read packet: $c#63",
607                # Match output line that prints the memory address of the message buffer within the inferior.
608                # Note we require launch-only testing so we can get inferior otuput.
609                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
610                 "capture": {1: "message_address"}},
611                # Now stop the inferior.
612                "read packet: {}".format(chr(3)),
613                # And wait for the stop notification.
614                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
615            True)
616
617        # Run the packet stream.
618        context = self.expect_gdbremote_sequence()
619        self.assertIsNotNone(context)
620
621        # Grab the message address.
622        self.assertIsNotNone(context.get("message_address"))
623        message_address = int(context.get("message_address"), 16)
624
625        # Grab contents from the inferior.
626        self.reset_test_sequence()
627        self.test_sequence.add_log_lines(
628            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
629             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
630            True)
631
632        # Run the packet stream.
633        context = self.expect_gdbremote_sequence()
634        self.assertIsNotNone(context)
635
636        # Ensure what we read from inferior memory is what we wrote.
637        self.assertIsNotNone(context.get("read_contents"))
638        read_contents = seven.unhexlify(context.get("read_contents"))
639        self.assertEqual(read_contents, MEMORY_CONTENTS)
640
641    def test_qMemoryRegionInfo_is_supported(self):
642        self.build()
643        self.set_inferior_startup_launch()
644        # Start up the inferior.
645        procs = self.prep_debug_monitor_and_inferior()
646
647        # Ask if it supports $qMemoryRegionInfo.
648        self.test_sequence.add_log_lines(
649            ["read packet: $qMemoryRegionInfo#00",
650             "send packet: $OK#00"
651             ], True)
652        self.expect_gdbremote_sequence()
653
654    @skipIfWindows # No pty support to test any inferior output
655    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
656        self.build()
657        self.set_inferior_startup_launch()
658
659        # Start up the inferior.
660        procs = self.prep_debug_monitor_and_inferior(
661            inferior_args=["get-code-address-hex:hello", "sleep:5"])
662
663        # Run the process
664        self.test_sequence.add_log_lines(
665            [
666                # Start running after initial stop.
667                "read packet: $c#63",
668                # Match output line that prints the memory address of the message buffer within the inferior.
669                # Note we require launch-only testing so we can get inferior otuput.
670                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
671                 "capture": {1: "code_address"}},
672                # Now stop the inferior.
673                "read packet: {}".format(chr(3)),
674                # And wait for the stop notification.
675                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
676            True)
677
678        # Run the packet stream.
679        context = self.expect_gdbremote_sequence()
680        self.assertIsNotNone(context)
681
682        # Grab the code address.
683        self.assertIsNotNone(context.get("code_address"))
684        code_address = int(context.get("code_address"), 16)
685
686        # Grab memory region info from the inferior.
687        self.reset_test_sequence()
688        self.add_query_memory_region_packets(code_address)
689
690        # Run the packet stream.
691        context = self.expect_gdbremote_sequence()
692        self.assertIsNotNone(context)
693        mem_region_dict = self.parse_memory_region_packet(context)
694
695        # Ensure there are no errors reported.
696        self.assertNotIn("error", mem_region_dict)
697
698        # Ensure code address is readable and executable.
699        self.assertIn("permissions", mem_region_dict)
700        self.assertIn("r", mem_region_dict["permissions"])
701        self.assertIn("x", mem_region_dict["permissions"])
702
703        # Ensure the start address and size encompass the address we queried.
704        self.assert_address_within_memory_region(code_address, mem_region_dict)
705
706    @skipIfWindows # No pty support to test any inferior output
707    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
708        self.build()
709        self.set_inferior_startup_launch()
710
711        # Start up the inferior.
712        procs = self.prep_debug_monitor_and_inferior(
713            inferior_args=["get-stack-address-hex:", "sleep:5"])
714
715        # Run the process
716        self.test_sequence.add_log_lines(
717            [
718                # Start running after initial stop.
719                "read packet: $c#63",
720                # Match output line that prints the memory address of the message buffer within the inferior.
721                # Note we require launch-only testing so we can get inferior otuput.
722                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
723                 "capture": {1: "stack_address"}},
724                # Now stop the inferior.
725                "read packet: {}".format(chr(3)),
726                # And wait for the stop notification.
727                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
728            True)
729
730        # Run the packet stream.
731        context = self.expect_gdbremote_sequence()
732        self.assertIsNotNone(context)
733
734        # Grab the address.
735        self.assertIsNotNone(context.get("stack_address"))
736        stack_address = int(context.get("stack_address"), 16)
737
738        # Grab memory region info from the inferior.
739        self.reset_test_sequence()
740        self.add_query_memory_region_packets(stack_address)
741
742        # Run the packet stream.
743        context = self.expect_gdbremote_sequence()
744        self.assertIsNotNone(context)
745        mem_region_dict = self.parse_memory_region_packet(context)
746
747        # Ensure there are no errors reported.
748        self.assertNotIn("error", mem_region_dict)
749
750        # Ensure address is readable and executable.
751        self.assertIn("permissions", mem_region_dict)
752        self.assertIn("r", mem_region_dict["permissions"])
753        self.assertIn("w", mem_region_dict["permissions"])
754
755        # Ensure the start address and size encompass the address we queried.
756        self.assert_address_within_memory_region(
757            stack_address, mem_region_dict)
758
759    @skipIfWindows # No pty support to test any inferior output
760    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
761        self.build()
762        self.set_inferior_startup_launch()
763
764        # Start up the inferior.
765        procs = self.prep_debug_monitor_and_inferior(
766            inferior_args=["get-heap-address-hex:", "sleep:5"])
767
768        # Run the process
769        self.test_sequence.add_log_lines(
770            [
771                # Start running after initial stop.
772                "read packet: $c#63",
773                # Match output line that prints the memory address of the message buffer within the inferior.
774                # Note we require launch-only testing so we can get inferior otuput.
775                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
776                 "capture": {1: "heap_address"}},
777                # Now stop the inferior.
778                "read packet: {}".format(chr(3)),
779                # And wait for the stop notification.
780                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
781            True)
782
783        # Run the packet stream.
784        context = self.expect_gdbremote_sequence()
785        self.assertIsNotNone(context)
786
787        # Grab the address.
788        self.assertIsNotNone(context.get("heap_address"))
789        heap_address = int(context.get("heap_address"), 16)
790
791        # Grab memory region info from the inferior.
792        self.reset_test_sequence()
793        self.add_query_memory_region_packets(heap_address)
794
795        # Run the packet stream.
796        context = self.expect_gdbremote_sequence()
797        self.assertIsNotNone(context)
798        mem_region_dict = self.parse_memory_region_packet(context)
799
800        # Ensure there are no errors reported.
801        self.assertNotIn("error", mem_region_dict)
802
803        # Ensure address is readable and executable.
804        self.assertIn("permissions", mem_region_dict)
805        self.assertIn("r", mem_region_dict["permissions"])
806        self.assertIn("w", mem_region_dict["permissions"])
807
808        # Ensure the start address and size encompass the address we queried.
809        self.assert_address_within_memory_region(heap_address, mem_region_dict)
810
811    def breakpoint_set_and_remove_work(self, want_hardware):
812        # Start up the inferior.
813        procs = self.prep_debug_monitor_and_inferior(
814            inferior_args=[
815                "get-code-address-hex:hello",
816                "sleep:1",
817                "call-function:hello"])
818
819        # Run the process
820        self.add_register_info_collection_packets()
821        self.add_process_info_collection_packets()
822        self.test_sequence.add_log_lines(
823            [  # Start running after initial stop.
824                "read packet: $c#63",
825                # Match output line that prints the memory address of the function call entry point.
826                # Note we require launch-only testing so we can get inferior otuput.
827                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
828                 "capture": {1: "function_address"}},
829                # Now stop the inferior.
830                "read packet: {}".format(chr(3)),
831                # And wait for the stop notification.
832                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
833            True)
834
835        # Run the packet stream.
836        context = self.expect_gdbremote_sequence()
837        self.assertIsNotNone(context)
838
839        # Gather process info - we need endian of target to handle register
840        # value conversions.
841        process_info = self.parse_process_info_response(context)
842        endian = process_info.get("endian")
843        self.assertIsNotNone(endian)
844
845        # Gather register info entries.
846        reg_infos = self.parse_register_info_packets(context)
847        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
848        self.assertIsNotNone(pc_lldb_reg_index)
849        self.assertIsNotNone(pc_reg_info)
850
851        # Grab the function address.
852        self.assertIsNotNone(context.get("function_address"))
853        function_address = int(context.get("function_address"), 16)
854
855        # Get current target architecture
856        target_arch = self.getArchitecture()
857
858        # Set the breakpoint.
859        if (target_arch == "arm") or (target_arch == "aarch64"):
860            # TODO: Handle case when setting breakpoint in thumb code
861            BREAKPOINT_KIND = 4
862        else:
863            BREAKPOINT_KIND = 1
864
865        # Set default packet type to Z0 (software breakpoint)
866        z_packet_type = 0
867
868        # If hardware breakpoint is requested set packet type to Z1
869        if want_hardware == True:
870            z_packet_type = 1
871
872        self.reset_test_sequence()
873        self.add_set_breakpoint_packets(
874            function_address,
875            z_packet_type,
876            do_continue=True,
877            breakpoint_kind=BREAKPOINT_KIND)
878
879        # Run the packet stream.
880        context = self.expect_gdbremote_sequence()
881        self.assertIsNotNone(context)
882
883        # Verify the stop signal reported was the breakpoint signal number.
884        stop_signo = context.get("stop_signo")
885        self.assertIsNotNone(stop_signo)
886        self.assertEqual(int(stop_signo, 16),
887                         lldbutil.get_signal_number('SIGTRAP'))
888
889        # Ensure we did not receive any output.  If the breakpoint was not set, we would
890        # see output (from a launched process with captured stdio) printing a hello, world message.
891        # That would indicate the breakpoint didn't take.
892        self.assertEqual(len(context["O_content"]), 0)
893
894        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
895        # This acts as a another validation on the register reading code.
896        self.reset_test_sequence()
897        self.test_sequence.add_log_lines(
898            [
899                # Print the PC.  This should match the breakpoint address.
900                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
901                # Capture $p results.
902                {"direction": "send",
903                 "regex": r"^\$([0-9a-fA-F]+)#",
904                 "capture": {1: "p_response"}},
905            ], True)
906
907        context = self.expect_gdbremote_sequence()
908        self.assertIsNotNone(context)
909
910        # Verify the PC is where we expect.  Note response is in endianness of
911        # the inferior.
912        p_response = context.get("p_response")
913        self.assertIsNotNone(p_response)
914
915        # Convert from target endian to int.
916        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
917            endian, p_response)
918        self.assertEqual(returned_pc, function_address)
919
920        # Verify that a breakpoint remove and continue gets us the expected
921        # output.
922        self.reset_test_sequence()
923
924        # Add breakpoint remove packets
925        self.add_remove_breakpoint_packets(
926            function_address,
927            z_packet_type,
928            breakpoint_kind=BREAKPOINT_KIND)
929
930        self.test_sequence.add_log_lines(
931            [
932                # Continue running.
933                "read packet: $c#63",
934                # We should now receive the output from the call.
935                {"type": "output_match", "regex": r"^hello, world\r\n$"},
936                # And wait for program completion.
937                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
938            ], True)
939
940        context = self.expect_gdbremote_sequence()
941        self.assertIsNotNone(context)
942
943    @skipIfWindows # No pty support to test any inferior output
944    def test_software_breakpoint_set_and_remove_work(self):
945        if self.getArchitecture() == "arm":
946            # TODO: Handle case when setting breakpoint in thumb code
947            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
948        else:
949            self.build()
950        self.set_inferior_startup_launch()
951        self.breakpoint_set_and_remove_work(want_hardware=False)
952
953    @skipUnlessPlatform(oslist=['linux'])
954    @skipIf(archs=no_match(['arm', 'aarch64']))
955    def test_hardware_breakpoint_set_and_remove_work(self):
956        if self.getArchitecture() == "arm":
957            # TODO: Handle case when setting breakpoint in thumb code
958            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
959        else:
960            self.build()
961        self.set_inferior_startup_launch()
962        self.breakpoint_set_and_remove_work(want_hardware=True)
963
964    def get_qSupported_dict(self, features=[]):
965        self.build()
966        self.set_inferior_startup_launch()
967
968        # Start up the stub and start/prep the inferior.
969        procs = self.prep_debug_monitor_and_inferior()
970        self.add_qSupported_packets(features)
971
972        # Run the packet stream.
973        context = self.expect_gdbremote_sequence()
974        self.assertIsNotNone(context)
975
976        # Retrieve the qSupported features.
977        return self.parse_qSupported_response(context)
978
979    def test_qSupported_returns_known_stub_features(self):
980        supported_dict = self.get_qSupported_dict()
981        self.assertIsNotNone(supported_dict)
982        self.assertTrue(len(supported_dict) > 0)
983
984    def test_qSupported_auvx(self):
985        expected = ('+' if lldbplatformutil.getPlatform()
986                    in ["freebsd", "linux", "netbsd"] else '-')
987        supported_dict = self.get_qSupported_dict()
988        self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected)
989
990    def test_qSupported_libraries_svr4(self):
991        expected = ('+' if lldbplatformutil.getPlatform()
992                    in ["freebsd", "linux", "netbsd"] else '-')
993        supported_dict = self.get_qSupported_dict()
994        self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'),
995                         expected)
996
997    def test_qSupported_siginfo_read(self):
998        expected = ('+' if lldbplatformutil.getPlatform()
999                    in ["freebsd", "linux"] else '-')
1000        supported_dict = self.get_qSupported_dict()
1001        self.assertEqual(supported_dict.get('qXfer:siginfo:read', '-'),
1002                         expected)
1003
1004    def test_qSupported_QPassSignals(self):
1005        expected = ('+' if lldbplatformutil.getPlatform()
1006                    in ["freebsd", "linux", "netbsd"] else '-')
1007        supported_dict = self.get_qSupported_dict()
1008        self.assertEqual(supported_dict.get('QPassSignals', '-'), expected)
1009
1010    @add_test_categories(["fork"])
1011    def test_qSupported_fork_events(self):
1012        supported_dict = (
1013            self.get_qSupported_dict(['multiprocess+', 'fork-events+']))
1014        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1015        self.assertEqual(supported_dict.get('fork-events', '-'), '+')
1016        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1017
1018    @add_test_categories(["fork"])
1019    def test_qSupported_fork_events_without_multiprocess(self):
1020        supported_dict = (
1021            self.get_qSupported_dict(['fork-events+']))
1022        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1023        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1024        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1025
1026    @add_test_categories(["fork"])
1027    def test_qSupported_vfork_events(self):
1028        supported_dict = (
1029            self.get_qSupported_dict(['multiprocess+', 'vfork-events+']))
1030        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1031        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1032        self.assertEqual(supported_dict.get('vfork-events', '-'), '+')
1033
1034    @add_test_categories(["fork"])
1035    def test_qSupported_vfork_events_without_multiprocess(self):
1036        supported_dict = (
1037            self.get_qSupported_dict(['vfork-events+']))
1038        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1039        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1040        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1041
1042    # We need to be able to self.runCmd to get cpuinfo,
1043    # which is not possible when using a remote platform.
1044    @skipIfRemote
1045    def test_qSupported_memory_tagging(self):
1046        supported_dict = self.get_qSupported_dict()
1047        self.assertEqual(supported_dict.get("memory-tagging", '-'),
1048                         '+' if self.isAArch64MTE() else '-')
1049
1050    @skipIfWindows # No pty support to test any inferior output
1051    def test_written_M_content_reads_back_correctly(self):
1052        self.build()
1053        self.set_inferior_startup_launch()
1054
1055        TEST_MESSAGE = "Hello, memory"
1056
1057        # Start up the stub and start/prep the inferior.
1058        procs = self.prep_debug_monitor_and_inferior(
1059            inferior_args=[
1060                "set-message:xxxxxxxxxxxxxX",
1061                "get-data-address-hex:g_message",
1062                "sleep:1",
1063                "print-message:"])
1064        self.test_sequence.add_log_lines(
1065            [
1066                # Start running after initial stop.
1067                "read packet: $c#63",
1068                # Match output line that prints the memory address of the message buffer within the inferior.
1069                # Note we require launch-only testing so we can get inferior otuput.
1070                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
1071                 "capture": {1: "message_address"}},
1072                # Now stop the inferior.
1073                "read packet: {}".format(chr(3)),
1074                # And wait for the stop notification.
1075                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1076            True)
1077        context = self.expect_gdbremote_sequence()
1078        self.assertIsNotNone(context)
1079
1080        # Grab the message address.
1081        self.assertIsNotNone(context.get("message_address"))
1082        message_address = int(context.get("message_address"), 16)
1083
1084        # Hex-encode the test message, adding null termination.
1085        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
1086
1087        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
1088        # and binary (x) memory read packets.
1089        self.reset_test_sequence()
1090        self.test_sequence.add_log_lines(
1091            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
1092             "send packet: $OK#00",
1093             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1094             "send packet: ${0}#00".format(hex_encoded_message),
1095             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1096             "send packet: ${0}#00".format(TEST_MESSAGE),
1097             "read packet: $m{0:x},4#00".format(message_address),
1098             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
1099             "read packet: $x{0:x},4#00".format(message_address),
1100             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
1101             "read packet: $c#63",
1102             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
1103             "send packet: $W00#00",
1104             ], True)
1105        context = self.expect_gdbremote_sequence()
1106        self.assertIsNotNone(context)
1107
1108        # Ensure what we read from inferior memory is what we wrote.
1109        printed_message = context.get("printed_message")
1110        self.assertIsNotNone(printed_message)
1111        self.assertEqual(printed_message, TEST_MESSAGE + "X")
1112
1113    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
1114    # Come back to this.  I have the test rigged to verify that at least some
1115    # of the bit-flip writes work.
1116    def test_P_writes_all_gpr_registers(self):
1117        self.build()
1118        self.set_inferior_startup_launch()
1119
1120        # Start inferior debug session, grab all register info.
1121        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
1122        self.add_register_info_collection_packets()
1123        self.add_process_info_collection_packets()
1124
1125        context = self.expect_gdbremote_sequence()
1126        self.assertIsNotNone(context)
1127
1128        # Process register infos.
1129        reg_infos = self.parse_register_info_packets(context)
1130        self.assertIsNotNone(reg_infos)
1131        self.add_lldb_register_index(reg_infos)
1132
1133        # Process endian.
1134        process_info = self.parse_process_info_response(context)
1135        endian = process_info.get("endian")
1136        self.assertIsNotNone(endian)
1137
1138        # Pull out the register infos that we think we can bit flip
1139        # successfully,.
1140        gpr_reg_infos = [
1141            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1142        self.assertTrue(len(gpr_reg_infos) > 0)
1143
1144        # Write flipped bit pattern of existing value to each register.
1145        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1146            gpr_reg_infos, endian)
1147        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1148        self.assertTrue(successful_writes > 0)
1149
1150    # Note: as of this moment, a hefty number of the GPR writes are failing
1151    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1152    @skipIfWindows
1153    def test_P_and_p_thread_suffix_work(self):
1154        self.build()
1155        self.set_inferior_startup_launch()
1156
1157        # Startup the inferior with three threads.
1158        procs = self.prep_debug_monitor_and_inferior(
1159            inferior_args=["thread:new", "thread:new"])
1160        self.add_thread_suffix_request_packets()
1161        self.add_register_info_collection_packets()
1162        self.add_process_info_collection_packets()
1163
1164        context = self.expect_gdbremote_sequence()
1165        self.assertIsNotNone(context)
1166
1167        process_info = self.parse_process_info_response(context)
1168        self.assertIsNotNone(process_info)
1169        endian = process_info.get("endian")
1170        self.assertIsNotNone(endian)
1171
1172        reg_infos = self.parse_register_info_packets(context)
1173        self.assertIsNotNone(reg_infos)
1174        self.add_lldb_register_index(reg_infos)
1175
1176        reg_index = self.select_modifiable_register(reg_infos)
1177        self.assertIsNotNone(reg_index)
1178        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1179        self.assertTrue(reg_byte_size > 0)
1180
1181        # Run the process a bit so threads can start up, and collect register
1182        # info.
1183        context = self.run_process_then_stop(run_seconds=1)
1184        self.assertIsNotNone(context)
1185
1186        # Wait for 3 threads to be present.
1187        threads = self.wait_for_thread_count(3)
1188        self.assertEqual(len(threads), 3)
1189
1190        expected_reg_values = []
1191        register_increment = 1
1192        next_value = None
1193
1194        # Set the same register in each of 3 threads to a different value.
1195        # Verify each one has the unique value.
1196        for thread in threads:
1197            # If we don't have a next value yet, start it with the initial read
1198            # value + 1
1199            if not next_value:
1200                # Read pre-existing register value.
1201                self.reset_test_sequence()
1202                self.test_sequence.add_log_lines(
1203                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1204                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1205                     ], True)
1206                context = self.expect_gdbremote_sequence()
1207                self.assertIsNotNone(context)
1208
1209                # Set the next value to use for writing as the increment plus
1210                # current value.
1211                p_response = context.get("p_response")
1212                self.assertIsNotNone(p_response)
1213                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1214                    endian, p_response)
1215
1216            # Set new value using P and thread suffix.
1217            self.reset_test_sequence()
1218            self.test_sequence.add_log_lines(
1219                [
1220                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1221                        reg_index,
1222                        lldbgdbserverutils.pack_register_hex(
1223                            endian,
1224                            next_value,
1225                            byte_size=reg_byte_size),
1226                        thread),
1227                    "send packet: $OK#00",
1228                ],
1229                True)
1230            context = self.expect_gdbremote_sequence()
1231            self.assertIsNotNone(context)
1232
1233            # Save the value we set.
1234            expected_reg_values.append(next_value)
1235
1236            # Increment value for next thread to use (we want them all
1237            # different so we can verify they wrote to each thread correctly
1238            # next.)
1239            next_value += register_increment
1240
1241        # Revisit each thread and verify they have the expected value set for
1242        # the register we wrote.
1243        thread_index = 0
1244        for thread in threads:
1245            # Read pre-existing register value.
1246            self.reset_test_sequence()
1247            self.test_sequence.add_log_lines(
1248                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1249                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1250                 ], True)
1251            context = self.expect_gdbremote_sequence()
1252            self.assertIsNotNone(context)
1253
1254            # Get the register value.
1255            p_response = context.get("p_response")
1256            self.assertIsNotNone(p_response)
1257            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1258                endian, p_response)
1259
1260            # Make sure we read back what we wrote.
1261            self.assertEqual(read_value, expected_reg_values[thread_index])
1262            thread_index += 1
1263
1264    @skipIfWindows # No pty support to test any inferior output
1265    @add_test_categories(["llgs"])
1266    def test_launch_via_A(self):
1267        self.build()
1268        exe_path = self.getBuildArtifact("a.out")
1269        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1270        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1271
1272        server = self.connect_to_debug_monitor()
1273        self.assertIsNotNone(server)
1274        self.do_handshake()
1275        # NB: strictly speaking we should use %x here but this packet
1276        # is deprecated, so no point in changing lldb-server's expectations
1277        self.test_sequence.add_log_lines(
1278            ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" %
1279             tuple(itertools.chain.from_iterable(
1280                 [(len(x), x) for x in hex_args])),
1281             "send packet: $OK#00",
1282             "read packet: $c#00",
1283             "send packet: $W00#00"],
1284            True)
1285        context = self.expect_gdbremote_sequence()
1286        self.assertEqual(context["O_content"],
1287                         b'arg1\r\narg2\r\narg3\r\n')
1288
1289    @skipIfWindows # No pty support to test any inferior output
1290    @add_test_categories(["llgs"])
1291    def test_launch_via_vRun(self):
1292        self.build()
1293        exe_path = self.getBuildArtifact("a.out")
1294        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1295        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1296
1297        server = self.connect_to_debug_monitor()
1298        self.assertIsNotNone(server)
1299        self.do_handshake()
1300        self.test_sequence.add_log_lines(
1301            ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args),
1302             {"direction": "send",
1303              "regex": r"^\$T([0-9a-fA-F]+)"},
1304             "read packet: $c#00",
1305             "send packet: $W00#00"],
1306            True)
1307        context = self.expect_gdbremote_sequence()
1308        self.assertEqual(context["O_content"],
1309                         b'arg1\r\narg2\r\narg3\r\n')
1310
1311    @add_test_categories(["llgs"])
1312    def test_launch_via_vRun_no_args(self):
1313        self.build()
1314        exe_path = self.getBuildArtifact("a.out")
1315        hex_path = binascii.b2a_hex(exe_path.encode()).decode()
1316
1317        server = self.connect_to_debug_monitor()
1318        self.assertIsNotNone(server)
1319        self.do_handshake()
1320        self.test_sequence.add_log_lines(
1321            ["read packet: $vRun;%s#00" % (hex_path,),
1322             {"direction": "send",
1323              "regex": r"^\$T([0-9a-fA-F]+)"},
1324             "read packet: $c#00",
1325             "send packet: $W00#00"],
1326            True)
1327        self.expect_gdbremote_sequence()
1328
1329    @skipIfWindows # No pty support to test any inferior output
1330    @add_test_categories(["llgs"])
1331    def test_QEnvironment(self):
1332        self.build()
1333        exe_path = self.getBuildArtifact("a.out")
1334        env = {"FOO": "test", "BAR": "a=z"}
1335        args = [exe_path, "print-env:FOO", "print-env:BAR"]
1336        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1337
1338        server = self.connect_to_debug_monitor()
1339        self.assertIsNotNone(server)
1340        self.do_handshake()
1341
1342        for key, value in env.items():
1343            self.test_sequence.add_log_lines(
1344                ["read packet: $QEnvironment:%s=%s#00" % (key, value),
1345                 "send packet: $OK#00"],
1346                True)
1347        self.test_sequence.add_log_lines(
1348            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1349             {"direction": "send",
1350              "regex": r"^\$T([0-9a-fA-F]+)"},
1351             "read packet: $c#00",
1352             "send packet: $W00#00"],
1353            True)
1354        context = self.expect_gdbremote_sequence()
1355        self.assertEqual(context["O_content"], b"test\r\na=z\r\n")
1356
1357    @skipIfWindows # No pty support to test any inferior output
1358    @add_test_categories(["llgs"])
1359    def test_QEnvironmentHexEncoded(self):
1360        self.build()
1361        exe_path = self.getBuildArtifact("a.out")
1362        env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"}
1363        args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"]
1364        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1365
1366        server = self.connect_to_debug_monitor()
1367        self.assertIsNotNone(server)
1368        self.do_handshake()
1369
1370        for key, value in env.items():
1371            hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode()
1372            self.test_sequence.add_log_lines(
1373                ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,),
1374                 "send packet: $OK#00"],
1375                True)
1376        self.test_sequence.add_log_lines(
1377            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1378             {"direction": "send",
1379              "regex": r"^\$T([0-9a-fA-F]+)"},
1380             "read packet: $c#00",
1381             "send packet: $W00#00"],
1382            True)
1383        context = self.expect_gdbremote_sequence()
1384        self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n")
1385
1386    @skipUnlessPlatform(oslist=["freebsd", "linux"])
1387    @add_test_categories(["llgs"])
1388    def test_qXfer_siginfo_read(self):
1389        self.build()
1390        self.set_inferior_startup_launch()
1391        procs = self.prep_debug_monitor_and_inferior(
1392                inferior_args=["thread:segfault", "thread:new", "sleep:10"])
1393        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
1394        self.expect_gdbremote_sequence()
1395
1396        # Run until SIGSEGV comes in.
1397        self.reset_test_sequence()
1398        self.test_sequence.add_log_lines(
1399            [{"direction": "send",
1400              "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1401              "capture": {1: "signo", 2: "thread_id"},
1402              }], True)
1403
1404        # Figure out which thread crashed.
1405        context = self.expect_gdbremote_sequence()
1406        self.assertIsNotNone(context)
1407        self.assertEqual(int(context["signo"], 16),
1408                         lldbutil.get_signal_number('SIGSEGV'))
1409        crashing_thread = int(context["thread_id"], 16)
1410
1411        # Grab siginfo for the crashing thread.
1412        self.reset_test_sequence()
1413        self.add_process_info_collection_packets()
1414        self.test_sequence.add_log_lines(
1415            ["read packet: $Hg{:x}#00".format(crashing_thread),
1416             "send packet: $OK#00",
1417             "read packet: $qXfer:siginfo:read::0,80:#00",
1418             {"direction": "send",
1419              "regex": re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1420                                  re.MULTILINE | re.DOTALL),
1421              "capture": {1: "response_type", 2: "content_raw"},
1422              }], True)
1423        context = self.expect_gdbremote_sequence()
1424        self.assertIsNotNone(context)
1425
1426        # Ensure we end up with all data in one packet.
1427        self.assertEqual(context.get("response_type"), "l")
1428
1429        # Decode binary data.
1430        content_raw = context.get("content_raw")
1431        self.assertIsNotNone(content_raw)
1432        content = self.decode_gdbremote_binary(content_raw).encode("latin1")
1433
1434        # Decode siginfo_t.
1435        process_info = self.parse_process_info_response(context)
1436        pad = ""
1437        if process_info["ptrsize"] == "8":
1438            pad = "i"
1439        signo_idx = 0
1440        errno_idx = 1
1441        code_idx = 2
1442        addr_idx = -1
1443        SEGV_MAPERR = 1
1444        if process_info["ostype"] == "linux":
1445            # si_signo, si_errno, si_code, [pad], _sifields._sigfault.si_addr
1446            format_str = "iii{}P".format(pad)
1447        elif process_info["ostype"].startswith("freebsd"):
1448            # si_signo, si_errno, si_code, si_pid, si_uid, si_status, si_addr
1449            format_str = "iiiiiiP"
1450        elif process_info["ostype"].startswith("netbsd"):
1451            # _signo, _code, _errno, [pad], _reason._fault._addr
1452            format_str = "iii{}P".format(pad)
1453            errno_idx = 2
1454            code_idx = 1
1455        else:
1456            assert False, "unknown ostype"
1457
1458        decoder = struct.Struct(format_str)
1459        decoded = decoder.unpack(content[:decoder.size])
1460        self.assertEqual(decoded[signo_idx],
1461                         lldbutil.get_signal_number('SIGSEGV'))
1462        self.assertEqual(decoded[errno_idx], 0)  # si_errno
1463        self.assertEqual(decoded[code_idx], SEGV_MAPERR)  # si_code
1464        self.assertEqual(decoded[addr_idx], 0)  # si_addr
1465