1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import binascii
14import itertools
15
16import unittest2
17import gdbremote_testcase
18import lldbgdbserverutils
19from lldbsuite.support import seven
20from lldbsuite.test.decorators import *
21from lldbsuite.test.lldbtest import *
22from lldbsuite.test.lldbdwarf import *
23from lldbsuite.test import lldbutil, lldbplatformutil
24
25
26class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
27
28    mydir = TestBase.compute_mydir(__file__)
29
30    def test_thread_suffix_supported(self):
31        server = self.connect_to_debug_monitor()
32        self.assertIsNotNone(server)
33
34        self.do_handshake()
35        self.test_sequence.add_log_lines(
36            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
37             "lldb-server <   6> send packet: $OK#9a"],
38            True)
39
40        self.expect_gdbremote_sequence()
41
42
43    def test_list_threads_in_stop_reply_supported(self):
44        server = self.connect_to_debug_monitor()
45        self.assertIsNotNone(server)
46
47        self.do_handshake()
48        self.test_sequence.add_log_lines(
49            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
50             "lldb-server <   6> send packet: $OK#9a"],
51            True)
52        self.expect_gdbremote_sequence()
53
54    def test_c_packet_works(self):
55        self.build()
56        procs = self.prep_debug_monitor_and_inferior()
57        self.test_sequence.add_log_lines(
58            ["read packet: $c#63",
59             "send packet: $W00#00"],
60            True)
61
62        self.expect_gdbremote_sequence()
63
64    @skipIfWindows # No pty support to test any inferior output
65    def test_inferior_print_exit(self):
66        self.build()
67        procs = self.prep_debug_monitor_and_inferior(
68                inferior_args=["hello, world"])
69        self.test_sequence.add_log_lines(
70            ["read packet: $vCont;c#a8",
71             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
72             "send packet: $W00#00"],
73            True)
74
75        context = self.expect_gdbremote_sequence()
76        self.assertIsNotNone(context)
77
78    def test_first_launch_stop_reply_thread_matches_first_qC(self):
79        self.build()
80        procs = self.prep_debug_monitor_and_inferior()
81        self.test_sequence.add_log_lines(["read packet: $qC#00",
82                                          {"direction": "send",
83                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
84                                           "capture": {1: "thread_id_QC"}},
85                                          "read packet: $?#00",
86                                          {"direction": "send",
87                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
88                                              "capture": {1: "thread_id_?"}}],
89                                         True)
90        context = self.expect_gdbremote_sequence()
91        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))
92
93    def test_attach_commandline_continue_app_exits(self):
94        self.build()
95        self.set_inferior_startup_attach()
96        procs = self.prep_debug_monitor_and_inferior()
97        self.test_sequence.add_log_lines(
98            ["read packet: $vCont;c#a8",
99             "send packet: $W00#00"],
100            True)
101        self.expect_gdbremote_sequence()
102
103        # Wait a moment for completed and now-detached inferior process to
104        # clear.
105        time.sleep(1)
106
107        if not lldb.remote_platform:
108            # Process should be dead now. Reap results.
109            poll_result = procs["inferior"].poll()
110            self.assertIsNotNone(poll_result)
111
112        # Where possible, verify at the system level that the process is not
113        # running.
114        self.assertFalse(
115            lldbgdbserverutils.process_is_running(
116                procs["inferior"].pid, False))
117
118    def test_qRegisterInfo_returns_one_valid_result(self):
119        self.build()
120        self.prep_debug_monitor_and_inferior()
121        self.test_sequence.add_log_lines(
122            ["read packet: $qRegisterInfo0#00",
123             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
124            True)
125
126        # Run the stream
127        context = self.expect_gdbremote_sequence()
128        self.assertIsNotNone(context)
129
130        reg_info_packet = context.get("reginfo_0")
131        self.assertIsNotNone(reg_info_packet)
132        self.assert_valid_reg_info(
133            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
134
135    def test_qRegisterInfo_returns_all_valid_results(self):
136        self.build()
137        self.prep_debug_monitor_and_inferior()
138        self.add_register_info_collection_packets()
139
140        # Run the stream.
141        context = self.expect_gdbremote_sequence()
142        self.assertIsNotNone(context)
143
144        # Validate that each register info returned validates.
145        for reg_info in self.parse_register_info_packets(context):
146            self.assert_valid_reg_info(reg_info)
147
148    def test_qRegisterInfo_contains_required_generics_debugserver(self):
149        self.build()
150        self.prep_debug_monitor_and_inferior()
151        self.add_register_info_collection_packets()
152
153        # Run the packet stream.
154        context = self.expect_gdbremote_sequence()
155        self.assertIsNotNone(context)
156
157        # Gather register info entries.
158        reg_infos = self.parse_register_info_packets(context)
159
160        # Collect all generic registers found.
161        generic_regs = {
162            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
163
164        # Ensure we have a program counter register.
165        self.assertIn('pc', generic_regs)
166
167        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
168        if self.getArchitecture() != 'powerpc64le':
169            self.assertIn('fp', generic_regs)
170
171        # Ensure we have a stack pointer register.
172        self.assertIn('sp', generic_regs)
173
174        # Ensure we have a flags register.
175        self.assertIn('flags', generic_regs)
176
177    def test_qRegisterInfo_contains_at_least_one_register_set(self):
178        self.build()
179        self.prep_debug_monitor_and_inferior()
180        self.add_register_info_collection_packets()
181
182        # Run the packet stream.
183        context = self.expect_gdbremote_sequence()
184        self.assertIsNotNone(context)
185
186        # Gather register info entries.
187        reg_infos = self.parse_register_info_packets(context)
188
189        # Collect all register sets found.
190        register_sets = {
191            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
192        self.assertTrue(len(register_sets) >= 1)
193
194    def targetHasAVX(self):
195        triple = self.dbg.GetSelectedPlatform().GetTriple()
196
197        # TODO other platforms, please implement this function
198        if not re.match(".*-.*-linux", triple):
199            return True
200
201        # Need to do something different for non-Linux/Android targets
202        if lldb.remote_platform:
203            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
204            cpuinfo_path = "cpuinfo"
205            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
206        else:
207            cpuinfo_path = "/proc/cpuinfo"
208
209        f = open(cpuinfo_path, 'r')
210        cpuinfo = f.read()
211        f.close()
212        return " avx " in cpuinfo
213
214    @expectedFailureAll(oslist=["windows"]) # no avx for now.
215    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
216    @add_test_categories(["llgs"])
217    def test_qRegisterInfo_contains_avx_registers(self):
218        self.build()
219        self.prep_debug_monitor_and_inferior()
220        self.add_register_info_collection_packets()
221
222        # Run the packet stream.
223        context = self.expect_gdbremote_sequence()
224        self.assertIsNotNone(context)
225
226        # Gather register info entries.
227        reg_infos = self.parse_register_info_packets(context)
228
229        # Collect all generics found.
230        register_sets = {
231            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
232        self.assertEqual(
233            self.targetHasAVX(),
234            "Advanced Vector Extensions" in register_sets)
235
236    def qThreadInfo_contains_thread(self):
237        procs = self.prep_debug_monitor_and_inferior()
238        self.add_threadinfo_collection_packets()
239
240        # Run the packet stream.
241        context = self.expect_gdbremote_sequence()
242        self.assertIsNotNone(context)
243
244        # Gather threadinfo entries.
245        threads = self.parse_threadinfo_packets(context)
246        self.assertIsNotNone(threads)
247
248        # We should have exactly one thread.
249        self.assertEqual(len(threads), 1)
250
    def test_qThreadInfo_contains_thread_launch(self):
        """Run qThreadInfo_contains_thread() after selecting launch startup."""
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_contains_thread()
255
    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
    def test_qThreadInfo_contains_thread_attach(self):
        """Run qThreadInfo_contains_thread() after selecting attach startup."""
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_contains_thread()
261
262    def qThreadInfo_matches_qC(self):
263        procs = self.prep_debug_monitor_and_inferior()
264
265        self.add_threadinfo_collection_packets()
266        self.test_sequence.add_log_lines(
267            ["read packet: $qC#00",
268             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
269             ], True)
270
271        # Run the packet stream.
272        context = self.expect_gdbremote_sequence()
273        self.assertIsNotNone(context)
274
275        # Gather threadinfo entries.
276        threads = self.parse_threadinfo_packets(context)
277        self.assertIsNotNone(threads)
278
279        # We should have exactly one thread from threadinfo.
280        self.assertEqual(len(threads), 1)
281
282        # We should have a valid thread_id from $QC.
283        QC_thread_id_hex = context.get("thread_id")
284        self.assertIsNotNone(QC_thread_id_hex)
285        QC_thread_id = int(QC_thread_id_hex, 16)
286
287        # Those two should be the same.
288        self.assertEqual(threads[0], QC_thread_id)
289
    def test_qThreadInfo_matches_qC_launch(self):
        """Run qThreadInfo_matches_qC() after selecting launch startup."""
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadInfo_matches_qC()
294
    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
    def test_qThreadInfo_matches_qC_attach(self):
        """Run qThreadInfo_matches_qC() after selecting attach startup."""
        self.build()
        self.set_inferior_startup_attach()
        self.qThreadInfo_matches_qC()
300
301    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
302        self.build()
303        self.set_inferior_startup_launch()
304        procs = self.prep_debug_monitor_and_inferior()
305        self.add_register_info_collection_packets()
306
307        # Run the packet stream.
308        context = self.expect_gdbremote_sequence()
309        self.assertIsNotNone(context)
310
311        # Gather register info entries.
312        reg_infos = self.parse_register_info_packets(context)
313        self.assertIsNotNone(reg_infos)
314        self.assertTrue(len(reg_infos) > 0)
315
316        byte_order = self.get_target_byte_order()
317
318        # Read value for each register.
319        reg_index = 0
320        for reg_info in reg_infos:
321            # Skip registers that don't have a register set.  For x86, these are
322            # the DRx registers, which have no LLDB-kind register number and thus
323            # cannot be read via normal
324            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
325            if not "set" in reg_info:
326                continue
327
328            # Clear existing packet expectations.
329            self.reset_test_sequence()
330
331            # Run the register query
332            self.test_sequence.add_log_lines(
333                ["read packet: $p{0:x}#00".format(reg_index),
334                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
335                True)
336            context = self.expect_gdbremote_sequence()
337            self.assertIsNotNone(context)
338
339            # Verify the response length.
340            p_response = context.get("p_response")
341            self.assertIsNotNone(p_response)
342
343            # Skip erraneous (unsupported) registers.
344            # TODO: remove this once we make unsupported registers disappear.
345            if p_response.startswith("E") and len(p_response) == 3:
346                continue
347
348            if "dynamic_size_dwarf_expr_bytes" in reg_info:
349                self.updateRegInfoBitsize(reg_info, byte_order)
350            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
351                             reg_info)
352
353            # Increment loop
354            reg_index += 1
355
356    def Hg_switches_to_3_threads(self, pass_pid=False):
357        # Startup the inferior with three threads (main + 2 new ones).
358        procs = self.prep_debug_monitor_and_inferior(
359            inferior_args=["thread:new", "thread:new"])
360
361        # Let the inferior process have a few moments to start up the thread
362        # when launched.  (The launch scenario has no time to run, so threads
363        # won't be there yet.)
364        self.run_process_then_stop(run_seconds=1)
365
366        # Wait at most x seconds for 3 threads to be present.
367        threads = self.wait_for_thread_count(3)
368        self.assertEqual(len(threads), 3)
369
370        pid_str = ""
371        if pass_pid:
372            pid_str = "p{0:x}.".format(procs["inferior"].pid)
373
374        # verify we can $H to each thead, and $qC matches the thread we set.
375        for thread in threads:
376            # Change to each thread, verify current thread id.
377            self.reset_test_sequence()
378            self.test_sequence.add_log_lines(
379                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
380                 "send packet: $OK#00",
381                 "read packet: $qC#00",
382                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
383                True)
384
385            context = self.expect_gdbremote_sequence()
386            self.assertIsNotNone(context)
387
388            # Verify the thread id.
389            self.assertIsNotNone(context.get("thread_id"))
390            self.assertEqual(int(context.get("thread_id"), 16), thread)
391
    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
    def test_Hg_switches_to_3_threads_launch(self):
        """Run Hg_switches_to_3_threads() after selecting launch startup."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_switches_to_3_threads()
398
    @expectedFailureAll(oslist=["windows"]) # expecting one more thread
    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
    def test_Hg_switches_to_3_threads_attach(self):
        """Run Hg_switches_to_3_threads() after selecting attach startup."""
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads()
405
    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
    @add_test_categories(["llgs"])
    @skipIf(compiler="clang", compiler_version=['<', '11.0'])
    def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self):
        """Run Hg_switches_to_3_threads(pass_pid=True) after selecting attach startup."""
        self.build()
        self.set_inferior_startup_attach()
        self.Hg_switches_to_3_threads(pass_pid=True)
413
414    def Hg_fails_on_pid(self, pass_pid):
415        # Start the inferior.
416        procs = self.prep_debug_monitor_and_inferior(
417            inferior_args=["thread:new"])
418
419        self.run_process_then_stop(run_seconds=1)
420
421        threads = self.wait_for_thread_count(2)
422        self.assertEqual(len(threads), 2)
423
424        if pass_pid == -1:
425            pid_str = "p-1."
426        else:
427            pid_str = "p{0:x}.".format(pass_pid)
428        thread = threads[1]
429
430        self.test_sequence.add_log_lines(
431            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
432             "send packet: $Eff#00"],
433            True)
434
435        self.expect_gdbremote_sequence()
436
    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_another_pid(self):
        """Run Hg_fails_on_pid() with process id 1 after selecting launch startup."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(1)
443
    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_zero_pid(self):
        """Run Hg_fails_on_pid() with process id 0 after selecting launch startup."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(0)
450
    @expectedFailureAll(oslist=["windows"])
    @add_test_categories(["llgs"])
    def test_Hg_fails_on_minus_one_pid(self):
        """Run Hg_fails_on_pid() with process id -1 after selecting launch startup."""
        self.build()
        self.set_inferior_startup_launch()
        self.Hg_fails_on_pid(-1)
457
    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
        """Shared body: deliver SIGUSR1 via $Hc + $C to each thread that stops.

        The inferior is started with "thread:segfault", NUM_THREADS - 1
        "thread:new" arguments and a final "sleep:10", then continued with $c.
        For each of NUM_THREADS - 1 rounds the test:

        * waits for a stop reply whose signal number equals segfault_signo
          and whose thread id has not been seen in an earlier round;
        * selects that thread with $Hc and continues with $C carrying the
          SIGUSR1 signal number;
        * matches the inferior output "received SIGUSR1 on thread id: X"
          followed by "thread Y: past SIGSEGV", and asserts that X has not
          been seen in an earlier round and that Y equals X.

        Args:
            segfault_signo: signal number expected in the stop reply (compared
                against the reply's two hex digits parsed as an integer).
        """
        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
        # and the test requires getting stdout from the exe.

        NUM_THREADS = 3

        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
        # inferior_args=["thread:print-ids"]
        inferior_args = ["thread:segfault"]
        for i in range(NUM_THREADS - 1):
            # if i > 0:
                # Give time between thread creation/segfaulting for the handler to work.
                # inferior_args.append("sleep:1")
            inferior_args.append("thread:new")
        inferior_args.append("sleep:10")

        # Launch/attach.  (In our case, this should only ever be launched since
        # we need inferior stdout/stderr).
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=inferior_args)
        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
        context = self.expect_gdbremote_sequence()

        # Let the inferior process have a few moments to start up the thread when launched.
        # context = self.run_process_then_stop(run_seconds=1)

        # Wait at most x seconds for all threads to be present.
        # threads = self.wait_for_thread_count(NUM_THREADS)
        # self.assertEquals(len(threads), NUM_THREADS)

        # Dicts used as sets: stub-reported thread ids that have stopped with
        # the expected signal, and thread ids printed by the inferior itself.
        signaled_tids = {}
        print_thread_ids = {}

        # Switch to each thread, deliver a signal, and verify signal delivery
        for i in range(NUM_THREADS - 1):
            # Run until SIGSEGV comes in.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([{"direction": "send",
                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                                               "capture": {1: "signo",
                                                            2: "thread_id"}}],
                                             True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            signo = context.get("signo")
            self.assertEqual(int(signo, 16), segfault_signo)

            # Ensure we haven't seen this tid yet.
            thread_id = int(context.get("thread_id"), 16)
            self.assertNotIn(thread_id, signaled_tids)
            signaled_tids[thread_id] = 1

            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    # Set the continue thread.
                    # Set current thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",

                    # Continue sending the signal number to the continue thread.
                    # The commented out packet is a way to do this same operation without using
                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),

                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently it's an XFAIL.
                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                    #  "read packet: $c#63",
                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
                ],
                True)

            # Run the sequence.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Ensure the stop signal is the signal we delivered.
            # stop_signo = context.get("stop_signo")
            # self.assertIsNotNone(stop_signo)
            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))

            # Ensure the stop thread is the thread to which we delivered the signal.
            # stop_thread_id = context.get("stop_thread_id")
            # self.assertIsNotNone(stop_thread_id)
            # self.assertEquals(int(stop_thread_id,16), thread_id)

            # Ensure we haven't seen this thread id yet.  The inferior's
            # self-obtained thread ids are not guaranteed to match the stub
            # tids (at least on MacOSX).
            print_thread_id = context.get("print_thread_id")
            self.assertIsNotNone(print_thread_id)
            print_thread_id = int(print_thread_id, 16)
            self.assertNotIn(print_thread_id, print_thread_ids)

            # Now remember this print (i.e. inferior-reflected) thread id and
            # ensure we don't hit it again.
            print_thread_ids[print_thread_id] = 1

            # Ensure the thread id printed after the signal handler returned
            # ("past SIGSEGV") matches the thread id the handler printed.
            post_handle_thread_id = context.get("post_handle_thread_id")
            self.assertIsNotNone(post_handle_thread_id)
            post_handle_thread_id = int(post_handle_thread_id, 16)
            self.assertEqual(post_handle_thread_id, print_thread_id)
568
569    @expectedFailureDarwin
570    @skipIfWindows # no SIGSEGV support
571    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
572    @expectedFailureNetBSD
573    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
574        self.build()
575        self.set_inferior_startup_launch()
576
577        if self.platformIsDarwin():
578            # Darwin debugserver translates some signals like SIGSEGV into some gdb
579            # expectations about fixed signal numbers.
580            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
581        else:
582            self.Hc_then_Csignal_signals_correct_thread(
583                lldbutil.get_signal_number('SIGSEGV'))
584
585    @skipIfWindows # No pty support to test any inferior output
586    def test_m_packet_reads_memory(self):
587        self.build()
588        self.set_inferior_startup_launch()
589        # This is the memory we will write into the inferior and then ensure we
590        # can read back with $m.
591        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
592
593        # Start up the inferior.
594        procs = self.prep_debug_monitor_and_inferior(
595            inferior_args=[
596                "set-message:%s" %
597                MEMORY_CONTENTS,
598                "get-data-address-hex:g_message",
599                "sleep:5"])
600
601        # Run the process
602        self.test_sequence.add_log_lines(
603            [
604                # Start running after initial stop.
605                "read packet: $c#63",
606                # Match output line that prints the memory address of the message buffer within the inferior.
607                # Note we require launch-only testing so we can get inferior otuput.
608                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
609                 "capture": {1: "message_address"}},
610                # Now stop the inferior.
611                "read packet: {}".format(chr(3)),
612                # And wait for the stop notification.
613                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
614            True)
615
616        # Run the packet stream.
617        context = self.expect_gdbremote_sequence()
618        self.assertIsNotNone(context)
619
620        # Grab the message address.
621        self.assertIsNotNone(context.get("message_address"))
622        message_address = int(context.get("message_address"), 16)
623
624        # Grab contents from the inferior.
625        self.reset_test_sequence()
626        self.test_sequence.add_log_lines(
627            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
628             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
629            True)
630
631        # Run the packet stream.
632        context = self.expect_gdbremote_sequence()
633        self.assertIsNotNone(context)
634
635        # Ensure what we read from inferior memory is what we wrote.
636        self.assertIsNotNone(context.get("read_contents"))
637        read_contents = seven.unhexlify(context.get("read_contents"))
638        self.assertEqual(read_contents, MEMORY_CONTENTS)
639
640    def test_qMemoryRegionInfo_is_supported(self):
641        self.build()
642        self.set_inferior_startup_launch()
643        # Start up the inferior.
644        procs = self.prep_debug_monitor_and_inferior()
645
646        # Ask if it supports $qMemoryRegionInfo.
647        self.test_sequence.add_log_lines(
648            ["read packet: $qMemoryRegionInfo#00",
649             "send packet: $OK#00"
650             ], True)
651        self.expect_gdbremote_sequence()
652
653    @skipIfWindows # No pty support to test any inferior output
654    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
655        self.build()
656        self.set_inferior_startup_launch()
657
658        # Start up the inferior.
659        procs = self.prep_debug_monitor_and_inferior(
660            inferior_args=["get-code-address-hex:hello", "sleep:5"])
661
662        # Run the process
663        self.test_sequence.add_log_lines(
664            [
665                # Start running after initial stop.
666                "read packet: $c#63",
667                # Match output line that prints the memory address of the message buffer within the inferior.
668                # Note we require launch-only testing so we can get inferior otuput.
669                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
670                 "capture": {1: "code_address"}},
671                # Now stop the inferior.
672                "read packet: {}".format(chr(3)),
673                # And wait for the stop notification.
674                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
675            True)
676
677        # Run the packet stream.
678        context = self.expect_gdbremote_sequence()
679        self.assertIsNotNone(context)
680
681        # Grab the code address.
682        self.assertIsNotNone(context.get("code_address"))
683        code_address = int(context.get("code_address"), 16)
684
685        # Grab memory region info from the inferior.
686        self.reset_test_sequence()
687        self.add_query_memory_region_packets(code_address)
688
689        # Run the packet stream.
690        context = self.expect_gdbremote_sequence()
691        self.assertIsNotNone(context)
692        mem_region_dict = self.parse_memory_region_packet(context)
693
694        # Ensure there are no errors reported.
695        self.assertNotIn("error", mem_region_dict)
696
697        # Ensure code address is readable and executable.
698        self.assertIn("permissions", mem_region_dict)
699        self.assertIn("r", mem_region_dict["permissions"])
700        self.assertIn("x", mem_region_dict["permissions"])
701
702        # Ensure the start address and size encompass the address we queried.
703        self.assert_address_within_memory_region(code_address, mem_region_dict)
704
705    @skipIfWindows # No pty support to test any inferior output
706    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
707        self.build()
708        self.set_inferior_startup_launch()
709
710        # Start up the inferior.
711        procs = self.prep_debug_monitor_and_inferior(
712            inferior_args=["get-stack-address-hex:", "sleep:5"])
713
714        # Run the process
715        self.test_sequence.add_log_lines(
716            [
717                # Start running after initial stop.
718                "read packet: $c#63",
719                # Match output line that prints the memory address of the message buffer within the inferior.
720                # Note we require launch-only testing so we can get inferior otuput.
721                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
722                 "capture": {1: "stack_address"}},
723                # Now stop the inferior.
724                "read packet: {}".format(chr(3)),
725                # And wait for the stop notification.
726                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
727            True)
728
729        # Run the packet stream.
730        context = self.expect_gdbremote_sequence()
731        self.assertIsNotNone(context)
732
733        # Grab the address.
734        self.assertIsNotNone(context.get("stack_address"))
735        stack_address = int(context.get("stack_address"), 16)
736
737        # Grab memory region info from the inferior.
738        self.reset_test_sequence()
739        self.add_query_memory_region_packets(stack_address)
740
741        # Run the packet stream.
742        context = self.expect_gdbremote_sequence()
743        self.assertIsNotNone(context)
744        mem_region_dict = self.parse_memory_region_packet(context)
745
746        # Ensure there are no errors reported.
747        self.assertNotIn("error", mem_region_dict)
748
749        # Ensure address is readable and executable.
750        self.assertIn("permissions", mem_region_dict)
751        self.assertIn("r", mem_region_dict["permissions"])
752        self.assertIn("w", mem_region_dict["permissions"])
753
754        # Ensure the start address and size encompass the address we queried.
755        self.assert_address_within_memory_region(
756            stack_address, mem_region_dict)
757
758    @skipIfWindows # No pty support to test any inferior output
759    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
760        self.build()
761        self.set_inferior_startup_launch()
762
763        # Start up the inferior.
764        procs = self.prep_debug_monitor_and_inferior(
765            inferior_args=["get-heap-address-hex:", "sleep:5"])
766
767        # Run the process
768        self.test_sequence.add_log_lines(
769            [
770                # Start running after initial stop.
771                "read packet: $c#63",
772                # Match output line that prints the memory address of the message buffer within the inferior.
773                # Note we require launch-only testing so we can get inferior otuput.
774                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
775                 "capture": {1: "heap_address"}},
776                # Now stop the inferior.
777                "read packet: {}".format(chr(3)),
778                # And wait for the stop notification.
779                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
780            True)
781
782        # Run the packet stream.
783        context = self.expect_gdbremote_sequence()
784        self.assertIsNotNone(context)
785
786        # Grab the address.
787        self.assertIsNotNone(context.get("heap_address"))
788        heap_address = int(context.get("heap_address"), 16)
789
790        # Grab memory region info from the inferior.
791        self.reset_test_sequence()
792        self.add_query_memory_region_packets(heap_address)
793
794        # Run the packet stream.
795        context = self.expect_gdbremote_sequence()
796        self.assertIsNotNone(context)
797        mem_region_dict = self.parse_memory_region_packet(context)
798
799        # Ensure there are no errors reported.
800        self.assertNotIn("error", mem_region_dict)
801
802        # Ensure address is readable and executable.
803        self.assertIn("permissions", mem_region_dict)
804        self.assertIn("r", mem_region_dict["permissions"])
805        self.assertIn("w", mem_region_dict["permissions"])
806
807        # Ensure the start address and size encompass the address we queried.
808        self.assert_address_within_memory_region(heap_address, mem_region_dict)
809
810    def breakpoint_set_and_remove_work(self, want_hardware):
811        # Start up the inferior.
812        procs = self.prep_debug_monitor_and_inferior(
813            inferior_args=[
814                "get-code-address-hex:hello",
815                "sleep:1",
816                "call-function:hello"])
817
818        # Run the process
819        self.add_register_info_collection_packets()
820        self.add_process_info_collection_packets()
821        self.test_sequence.add_log_lines(
822            [  # Start running after initial stop.
823                "read packet: $c#63",
824                # Match output line that prints the memory address of the function call entry point.
825                # Note we require launch-only testing so we can get inferior otuput.
826                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
827                 "capture": {1: "function_address"}},
828                # Now stop the inferior.
829                "read packet: {}".format(chr(3)),
830                # And wait for the stop notification.
831                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
832            True)
833
834        # Run the packet stream.
835        context = self.expect_gdbremote_sequence()
836        self.assertIsNotNone(context)
837
838        # Gather process info - we need endian of target to handle register
839        # value conversions.
840        process_info = self.parse_process_info_response(context)
841        endian = process_info.get("endian")
842        self.assertIsNotNone(endian)
843
844        # Gather register info entries.
845        reg_infos = self.parse_register_info_packets(context)
846        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
847        self.assertIsNotNone(pc_lldb_reg_index)
848        self.assertIsNotNone(pc_reg_info)
849
850        # Grab the function address.
851        self.assertIsNotNone(context.get("function_address"))
852        function_address = int(context.get("function_address"), 16)
853
854        # Get current target architecture
855        target_arch = self.getArchitecture()
856
857        # Set the breakpoint.
858        if (target_arch == "arm") or (target_arch == "aarch64"):
859            # TODO: Handle case when setting breakpoint in thumb code
860            BREAKPOINT_KIND = 4
861        else:
862            BREAKPOINT_KIND = 1
863
864        # Set default packet type to Z0 (software breakpoint)
865        z_packet_type = 0
866
867        # If hardware breakpoint is requested set packet type to Z1
868        if want_hardware == True:
869            z_packet_type = 1
870
871        self.reset_test_sequence()
872        self.add_set_breakpoint_packets(
873            function_address,
874            z_packet_type,
875            do_continue=True,
876            breakpoint_kind=BREAKPOINT_KIND)
877
878        # Run the packet stream.
879        context = self.expect_gdbremote_sequence()
880        self.assertIsNotNone(context)
881
882        # Verify the stop signal reported was the breakpoint signal number.
883        stop_signo = context.get("stop_signo")
884        self.assertIsNotNone(stop_signo)
885        self.assertEqual(int(stop_signo, 16),
886                         lldbutil.get_signal_number('SIGTRAP'))
887
888        # Ensure we did not receive any output.  If the breakpoint was not set, we would
889        # see output (from a launched process with captured stdio) printing a hello, world message.
890        # That would indicate the breakpoint didn't take.
891        self.assertEqual(len(context["O_content"]), 0)
892
893        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
894        # This acts as a another validation on the register reading code.
895        self.reset_test_sequence()
896        self.test_sequence.add_log_lines(
897            [
898                # Print the PC.  This should match the breakpoint address.
899                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
900                # Capture $p results.
901                {"direction": "send",
902                 "regex": r"^\$([0-9a-fA-F]+)#",
903                 "capture": {1: "p_response"}},
904            ], True)
905
906        context = self.expect_gdbremote_sequence()
907        self.assertIsNotNone(context)
908
909        # Verify the PC is where we expect.  Note response is in endianness of
910        # the inferior.
911        p_response = context.get("p_response")
912        self.assertIsNotNone(p_response)
913
914        # Convert from target endian to int.
915        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
916            endian, p_response)
917        self.assertEqual(returned_pc, function_address)
918
919        # Verify that a breakpoint remove and continue gets us the expected
920        # output.
921        self.reset_test_sequence()
922
923        # Add breakpoint remove packets
924        self.add_remove_breakpoint_packets(
925            function_address,
926            z_packet_type,
927            breakpoint_kind=BREAKPOINT_KIND)
928
929        self.test_sequence.add_log_lines(
930            [
931                # Continue running.
932                "read packet: $c#63",
933                # We should now receive the output from the call.
934                {"type": "output_match", "regex": r"^hello, world\r\n$"},
935                # And wait for program completion.
936                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
937            ], True)
938
939        context = self.expect_gdbremote_sequence()
940        self.assertIsNotNone(context)
941
942    @skipIfWindows # No pty support to test any inferior output
943    def test_software_breakpoint_set_and_remove_work(self):
944        if self.getArchitecture() == "arm":
945            # TODO: Handle case when setting breakpoint in thumb code
946            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
947        else:
948            self.build()
949        self.set_inferior_startup_launch()
950        self.breakpoint_set_and_remove_work(want_hardware=False)
951
952    @skipUnlessPlatform(oslist=['linux'])
953    @skipIf(archs=no_match(['arm', 'aarch64']))
954    def test_hardware_breakpoint_set_and_remove_work(self):
955        if self.getArchitecture() == "arm":
956            # TODO: Handle case when setting breakpoint in thumb code
957            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
958        else:
959            self.build()
960        self.set_inferior_startup_launch()
961        self.breakpoint_set_and_remove_work(want_hardware=True)
962
963    def get_qSupported_dict(self, features=[]):
964        self.build()
965        self.set_inferior_startup_launch()
966
967        # Start up the stub and start/prep the inferior.
968        procs = self.prep_debug_monitor_and_inferior()
969        self.add_qSupported_packets(features)
970
971        # Run the packet stream.
972        context = self.expect_gdbremote_sequence()
973        self.assertIsNotNone(context)
974
975        # Retrieve the qSupported features.
976        return self.parse_qSupported_response(context)
977
978    def test_qSupported_returns_known_stub_features(self):
979        supported_dict = self.get_qSupported_dict()
980        self.assertIsNotNone(supported_dict)
981        self.assertTrue(len(supported_dict) > 0)
982
983    def test_qSupported_auvx(self):
984        expected = ('+' if lldbplatformutil.getPlatform()
985                    in ["freebsd", "linux", "netbsd"] else '-')
986        supported_dict = self.get_qSupported_dict()
987        self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected)
988
989    def test_qSupported_libraries_svr4(self):
990        expected = ('+' if lldbplatformutil.getPlatform()
991                    in ["freebsd", "linux", "netbsd"] else '-')
992        supported_dict = self.get_qSupported_dict()
993        self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'),
994                         expected)
995
996    def test_qSupported_QPassSignals(self):
997        expected = ('+' if lldbplatformutil.getPlatform()
998                    in ["freebsd", "linux", "netbsd"] else '-')
999        supported_dict = self.get_qSupported_dict()
1000        self.assertEqual(supported_dict.get('QPassSignals', '-'), expected)
1001
1002    @add_test_categories(["fork"])
1003    def test_qSupported_fork_events(self):
1004        supported_dict = (
1005            self.get_qSupported_dict(['multiprocess+', 'fork-events+']))
1006        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1007        self.assertEqual(supported_dict.get('fork-events', '-'), '+')
1008        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1009
1010    @add_test_categories(["fork"])
1011    def test_qSupported_fork_events_without_multiprocess(self):
1012        supported_dict = (
1013            self.get_qSupported_dict(['fork-events+']))
1014        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1015        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1016        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1017
1018    @add_test_categories(["fork"])
1019    def test_qSupported_vfork_events(self):
1020        supported_dict = (
1021            self.get_qSupported_dict(['multiprocess+', 'vfork-events+']))
1022        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1023        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1024        self.assertEqual(supported_dict.get('vfork-events', '-'), '+')
1025
1026    @add_test_categories(["fork"])
1027    def test_qSupported_vfork_events_without_multiprocess(self):
1028        supported_dict = (
1029            self.get_qSupported_dict(['vfork-events+']))
1030        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1031        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1032        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1033
1034    # We need to be able to self.runCmd to get cpuinfo,
1035    # which is not possible when using a remote platform.
1036    @skipIfRemote
1037    def test_qSupported_memory_tagging(self):
1038        supported_dict = self.get_qSupported_dict()
1039        self.assertEqual(supported_dict.get("memory-tagging", '-'),
1040                         '+' if self.isAArch64MTE() else '-')
1041
1042    @skipIfWindows # No pty support to test any inferior output
1043    def test_written_M_content_reads_back_correctly(self):
1044        self.build()
1045        self.set_inferior_startup_launch()
1046
1047        TEST_MESSAGE = "Hello, memory"
1048
1049        # Start up the stub and start/prep the inferior.
1050        procs = self.prep_debug_monitor_and_inferior(
1051            inferior_args=[
1052                "set-message:xxxxxxxxxxxxxX",
1053                "get-data-address-hex:g_message",
1054                "sleep:1",
1055                "print-message:"])
1056        self.test_sequence.add_log_lines(
1057            [
1058                # Start running after initial stop.
1059                "read packet: $c#63",
1060                # Match output line that prints the memory address of the message buffer within the inferior.
1061                # Note we require launch-only testing so we can get inferior otuput.
1062                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
1063                 "capture": {1: "message_address"}},
1064                # Now stop the inferior.
1065                "read packet: {}".format(chr(3)),
1066                # And wait for the stop notification.
1067                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1068            True)
1069        context = self.expect_gdbremote_sequence()
1070        self.assertIsNotNone(context)
1071
1072        # Grab the message address.
1073        self.assertIsNotNone(context.get("message_address"))
1074        message_address = int(context.get("message_address"), 16)
1075
1076        # Hex-encode the test message, adding null termination.
1077        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
1078
1079        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
1080        # and binary (x) memory read packets.
1081        self.reset_test_sequence()
1082        self.test_sequence.add_log_lines(
1083            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
1084             "send packet: $OK#00",
1085             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1086             "send packet: ${0}#00".format(hex_encoded_message),
1087             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1088             "send packet: ${0}#00".format(TEST_MESSAGE),
1089             "read packet: $m{0:x},4#00".format(message_address),
1090             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
1091             "read packet: $x{0:x},4#00".format(message_address),
1092             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
1093             "read packet: $c#63",
1094             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
1095             "send packet: $W00#00",
1096             ], True)
1097        context = self.expect_gdbremote_sequence()
1098        self.assertIsNotNone(context)
1099
1100        # Ensure what we read from inferior memory is what we wrote.
1101        printed_message = context.get("printed_message")
1102        self.assertIsNotNone(printed_message)
1103        self.assertEqual(printed_message, TEST_MESSAGE + "X")
1104
1105    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
1106    # Come back to this.  I have the test rigged to verify that at least some
1107    # of the bit-flip writes work.
1108    def test_P_writes_all_gpr_registers(self):
1109        self.build()
1110        self.set_inferior_startup_launch()
1111
1112        # Start inferior debug session, grab all register info.
1113        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
1114        self.add_register_info_collection_packets()
1115        self.add_process_info_collection_packets()
1116
1117        context = self.expect_gdbremote_sequence()
1118        self.assertIsNotNone(context)
1119
1120        # Process register infos.
1121        reg_infos = self.parse_register_info_packets(context)
1122        self.assertIsNotNone(reg_infos)
1123        self.add_lldb_register_index(reg_infos)
1124
1125        # Process endian.
1126        process_info = self.parse_process_info_response(context)
1127        endian = process_info.get("endian")
1128        self.assertIsNotNone(endian)
1129
1130        # Pull out the register infos that we think we can bit flip
1131        # successfully,.
1132        gpr_reg_infos = [
1133            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1134        self.assertTrue(len(gpr_reg_infos) > 0)
1135
1136        # Write flipped bit pattern of existing value to each register.
1137        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1138            gpr_reg_infos, endian)
1139        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1140        self.assertTrue(successful_writes > 0)
1141
1142    # Note: as of this moment, a hefty number of the GPR writes are failing
1143    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1144    @skipIfWindows
1145    def test_P_and_p_thread_suffix_work(self):
1146        self.build()
1147        self.set_inferior_startup_launch()
1148
1149        # Startup the inferior with three threads.
1150        procs = self.prep_debug_monitor_and_inferior(
1151            inferior_args=["thread:new", "thread:new"])
1152        self.add_thread_suffix_request_packets()
1153        self.add_register_info_collection_packets()
1154        self.add_process_info_collection_packets()
1155
1156        context = self.expect_gdbremote_sequence()
1157        self.assertIsNotNone(context)
1158
1159        process_info = self.parse_process_info_response(context)
1160        self.assertIsNotNone(process_info)
1161        endian = process_info.get("endian")
1162        self.assertIsNotNone(endian)
1163
1164        reg_infos = self.parse_register_info_packets(context)
1165        self.assertIsNotNone(reg_infos)
1166        self.add_lldb_register_index(reg_infos)
1167
1168        reg_index = self.select_modifiable_register(reg_infos)
1169        self.assertIsNotNone(reg_index)
1170        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1171        self.assertTrue(reg_byte_size > 0)
1172
1173        # Run the process a bit so threads can start up, and collect register
1174        # info.
1175        context = self.run_process_then_stop(run_seconds=1)
1176        self.assertIsNotNone(context)
1177
1178        # Wait for 3 threads to be present.
1179        threads = self.wait_for_thread_count(3)
1180        self.assertEqual(len(threads), 3)
1181
1182        expected_reg_values = []
1183        register_increment = 1
1184        next_value = None
1185
1186        # Set the same register in each of 3 threads to a different value.
1187        # Verify each one has the unique value.
1188        for thread in threads:
1189            # If we don't have a next value yet, start it with the initial read
1190            # value + 1
1191            if not next_value:
1192                # Read pre-existing register value.
1193                self.reset_test_sequence()
1194                self.test_sequence.add_log_lines(
1195                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1196                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1197                     ], True)
1198                context = self.expect_gdbremote_sequence()
1199                self.assertIsNotNone(context)
1200
1201                # Set the next value to use for writing as the increment plus
1202                # current value.
1203                p_response = context.get("p_response")
1204                self.assertIsNotNone(p_response)
1205                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1206                    endian, p_response)
1207
1208            # Set new value using P and thread suffix.
1209            self.reset_test_sequence()
1210            self.test_sequence.add_log_lines(
1211                [
1212                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1213                        reg_index,
1214                        lldbgdbserverutils.pack_register_hex(
1215                            endian,
1216                            next_value,
1217                            byte_size=reg_byte_size),
1218                        thread),
1219                    "send packet: $OK#00",
1220                ],
1221                True)
1222            context = self.expect_gdbremote_sequence()
1223            self.assertIsNotNone(context)
1224
1225            # Save the value we set.
1226            expected_reg_values.append(next_value)
1227
1228            # Increment value for next thread to use (we want them all
1229            # different so we can verify they wrote to each thread correctly
1230            # next.)
1231            next_value += register_increment
1232
1233        # Revisit each thread and verify they have the expected value set for
1234        # the register we wrote.
1235        thread_index = 0
1236        for thread in threads:
1237            # Read pre-existing register value.
1238            self.reset_test_sequence()
1239            self.test_sequence.add_log_lines(
1240                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1241                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1242                 ], True)
1243            context = self.expect_gdbremote_sequence()
1244            self.assertIsNotNone(context)
1245
1246            # Get the register value.
1247            p_response = context.get("p_response")
1248            self.assertIsNotNone(p_response)
1249            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1250                endian, p_response)
1251
1252            # Make sure we read back what we wrote.
1253            self.assertEqual(read_value, expected_reg_values[thread_index])
1254            thread_index += 1
1255
1256    @skipIfWindows # No pty support to test any inferior output
1257    @add_test_categories(["llgs"])
1258    def test_launch_via_A(self):
1259        self.build()
1260        exe_path = self.getBuildArtifact("a.out")
1261        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1262        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1263
1264        server = self.connect_to_debug_monitor()
1265        self.assertIsNotNone(server)
1266        self.do_handshake()
1267        # NB: strictly speaking we should use %x here but this packet
1268        # is deprecated, so no point in changing lldb-server's expectations
1269        self.test_sequence.add_log_lines(
1270            ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" %
1271             tuple(itertools.chain.from_iterable(
1272                 [(len(x), x) for x in hex_args])),
1273             "send packet: $OK#00",
1274             "read packet: $c#00",
1275             "send packet: $W00#00"],
1276            True)
1277        context = self.expect_gdbremote_sequence()
1278        self.assertEqual(context["O_content"],
1279                         b'arg1\r\narg2\r\narg3\r\n')
1280
1281    @skipIfWindows # No pty support to test any inferior output
1282    @add_test_categories(["llgs"])
1283    def test_launch_via_vRun(self):
1284        self.build()
1285        exe_path = self.getBuildArtifact("a.out")
1286        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1287        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1288
1289        server = self.connect_to_debug_monitor()
1290        self.assertIsNotNone(server)
1291        self.do_handshake()
1292        self.test_sequence.add_log_lines(
1293            ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args),
1294             {"direction": "send",
1295              "regex": r"^\$T([0-9a-fA-F]+)"},
1296             "read packet: $c#00",
1297             "send packet: $W00#00"],
1298            True)
1299        context = self.expect_gdbremote_sequence()
1300        self.assertEqual(context["O_content"],
1301                         b'arg1\r\narg2\r\narg3\r\n')
1302
1303    @add_test_categories(["llgs"])
1304    def test_launch_via_vRun_no_args(self):
1305        self.build()
1306        exe_path = self.getBuildArtifact("a.out")
1307        hex_path = binascii.b2a_hex(exe_path.encode()).decode()
1308
1309        server = self.connect_to_debug_monitor()
1310        self.assertIsNotNone(server)
1311        self.do_handshake()
1312        self.test_sequence.add_log_lines(
1313            ["read packet: $vRun;%s#00" % (hex_path,),
1314             {"direction": "send",
1315              "regex": r"^\$T([0-9a-fA-F]+)"},
1316             "read packet: $c#00",
1317             "send packet: $W00#00"],
1318            True)
1319        self.expect_gdbremote_sequence()
1320
1321    @skipIfWindows # No pty support to test any inferior output
1322    @add_test_categories(["llgs"])
1323    def test_QEnvironment(self):
1324        self.build()
1325        exe_path = self.getBuildArtifact("a.out")
1326        env = {"FOO": "test", "BAR": "a=z"}
1327        args = [exe_path, "print-env:FOO", "print-env:BAR"]
1328        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1329
1330        server = self.connect_to_debug_monitor()
1331        self.assertIsNotNone(server)
1332        self.do_handshake()
1333
1334        for key, value in env.items():
1335            self.test_sequence.add_log_lines(
1336                ["read packet: $QEnvironment:%s=%s#00" % (key, value),
1337                 "send packet: $OK#00"],
1338                True)
1339        self.test_sequence.add_log_lines(
1340            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1341             {"direction": "send",
1342              "regex": r"^\$T([0-9a-fA-F]+)"},
1343             "read packet: $c#00",
1344             "send packet: $W00#00"],
1345            True)
1346        context = self.expect_gdbremote_sequence()
1347        self.assertEqual(context["O_content"], b"test\r\na=z\r\n")
1348
1349    @skipIfWindows # No pty support to test any inferior output
1350    @add_test_categories(["llgs"])
1351    def test_QEnvironmentHexEncoded(self):
1352        self.build()
1353        exe_path = self.getBuildArtifact("a.out")
1354        env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"}
1355        args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"]
1356        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1357
1358        server = self.connect_to_debug_monitor()
1359        self.assertIsNotNone(server)
1360        self.do_handshake()
1361
1362        for key, value in env.items():
1363            hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode()
1364            self.test_sequence.add_log_lines(
1365                ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,),
1366                 "send packet: $OK#00"],
1367                True)
1368        self.test_sequence.add_log_lines(
1369            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1370             {"direction": "send",
1371              "regex": r"^\$T([0-9a-fA-F]+)"},
1372             "read packet: $c#00",
1373             "send packet: $W00#00"],
1374            True)
1375        context = self.expect_gdbremote_sequence()
1376        self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n")
1377