1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import binascii
14import itertools
15
16import unittest2
17import gdbremote_testcase
18import lldbgdbserverutils
19from lldbsuite.support import seven
20from lldbsuite.test.decorators import *
21from lldbsuite.test.lldbtest import *
22from lldbsuite.test.lldbdwarf import *
23from lldbsuite.test import lldbutil, lldbplatformutil
24
25
26class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
27
28    mydir = TestBase.compute_mydir(__file__)
29
30    def test_thread_suffix_supported(self):
31        server = self.connect_to_debug_monitor()
32        self.assertIsNotNone(server)
33
34        self.do_handshake()
35        self.test_sequence.add_log_lines(
36            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
37             "lldb-server <   6> send packet: $OK#9a"],
38            True)
39
40        self.expect_gdbremote_sequence()
41
42
43    def test_list_threads_in_stop_reply_supported(self):
44        server = self.connect_to_debug_monitor()
45        self.assertIsNotNone(server)
46
47        self.do_handshake()
48        self.test_sequence.add_log_lines(
49            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
50             "lldb-server <   6> send packet: $OK#9a"],
51            True)
52        self.expect_gdbremote_sequence()
53
54    def test_c_packet_works(self):
55        self.build()
56        procs = self.prep_debug_monitor_and_inferior()
57        self.test_sequence.add_log_lines(
58            ["read packet: $c#63",
59             "send packet: $W00#00"],
60            True)
61
62        self.expect_gdbremote_sequence()
63
64    @skipIfWindows # No pty support to test any inferior output
65    def test_inferior_print_exit(self):
66        self.build()
67        procs = self.prep_debug_monitor_and_inferior(
68                inferior_args=["hello, world"])
69        self.test_sequence.add_log_lines(
70            ["read packet: $vCont;c#a8",
71             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
72             "send packet: $W00#00"],
73            True)
74
75        context = self.expect_gdbremote_sequence()
76        self.assertIsNotNone(context)
77
78    def test_first_launch_stop_reply_thread_matches_first_qC(self):
79        self.build()
80        procs = self.prep_debug_monitor_and_inferior()
81        self.test_sequence.add_log_lines(["read packet: $qC#00",
82                                          {"direction": "send",
83                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
84                                           "capture": {1: "thread_id_QC"}},
85                                          "read packet: $?#00",
86                                          {"direction": "send",
87                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
88                                              "capture": {1: "thread_id_?"}}],
89                                         True)
90        context = self.expect_gdbremote_sequence()
91        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))
92
93    def test_attach_commandline_continue_app_exits(self):
94        self.build()
95        self.set_inferior_startup_attach()
96        procs = self.prep_debug_monitor_and_inferior()
97        self.test_sequence.add_log_lines(
98            ["read packet: $vCont;c#a8",
99             "send packet: $W00#00"],
100            True)
101        self.expect_gdbremote_sequence()
102
103        # Wait a moment for completed and now-detached inferior process to
104        # clear.
105        time.sleep(1)
106
107        if not lldb.remote_platform:
108            # Process should be dead now. Reap results.
109            poll_result = procs["inferior"].poll()
110            self.assertIsNotNone(poll_result)
111
112        # Where possible, verify at the system level that the process is not
113        # running.
114        self.assertFalse(
115            lldbgdbserverutils.process_is_running(
116                procs["inferior"].pid, False))
117
118    def test_qRegisterInfo_returns_one_valid_result(self):
119        self.build()
120        self.prep_debug_monitor_and_inferior()
121        self.test_sequence.add_log_lines(
122            ["read packet: $qRegisterInfo0#00",
123             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
124            True)
125
126        # Run the stream
127        context = self.expect_gdbremote_sequence()
128        self.assertIsNotNone(context)
129
130        reg_info_packet = context.get("reginfo_0")
131        self.assertIsNotNone(reg_info_packet)
132        self.assert_valid_reg_info(
133            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
134
135    def test_qRegisterInfo_returns_all_valid_results(self):
136        self.build()
137        self.prep_debug_monitor_and_inferior()
138        self.add_register_info_collection_packets()
139
140        # Run the stream.
141        context = self.expect_gdbremote_sequence()
142        self.assertIsNotNone(context)
143
144        # Validate that each register info returned validates.
145        for reg_info in self.parse_register_info_packets(context):
146            self.assert_valid_reg_info(reg_info)
147
148    def test_qRegisterInfo_contains_required_generics_debugserver(self):
149        self.build()
150        self.prep_debug_monitor_and_inferior()
151        self.add_register_info_collection_packets()
152
153        # Run the packet stream.
154        context = self.expect_gdbremote_sequence()
155        self.assertIsNotNone(context)
156
157        # Gather register info entries.
158        reg_infos = self.parse_register_info_packets(context)
159
160        # Collect all generic registers found.
161        generic_regs = {
162            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
163
164        # Ensure we have a program counter register.
165        self.assertIn('pc', generic_regs)
166
167        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
168        if self.getArchitecture() != 'powerpc64le':
169            self.assertIn('fp', generic_regs)
170
171        # Ensure we have a stack pointer register.
172        self.assertIn('sp', generic_regs)
173
174        # Ensure we have a flags register.
175        self.assertIn('flags', generic_regs)
176
177    def test_qRegisterInfo_contains_at_least_one_register_set(self):
178        self.build()
179        self.prep_debug_monitor_and_inferior()
180        self.add_register_info_collection_packets()
181
182        # Run the packet stream.
183        context = self.expect_gdbremote_sequence()
184        self.assertIsNotNone(context)
185
186        # Gather register info entries.
187        reg_infos = self.parse_register_info_packets(context)
188
189        # Collect all register sets found.
190        register_sets = {
191            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
192        self.assertTrue(len(register_sets) >= 1)
193
194    def targetHasAVX(self):
195        triple = self.dbg.GetSelectedPlatform().GetTriple()
196
197        # TODO other platforms, please implement this function
198        if not re.match(".*-.*-linux", triple):
199            return True
200
201        # Need to do something different for non-Linux/Android targets
202        if lldb.remote_platform:
203            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
204            cpuinfo_path = "cpuinfo"
205            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
206        else:
207            cpuinfo_path = "/proc/cpuinfo"
208
209        f = open(cpuinfo_path, 'r')
210        cpuinfo = f.read()
211        f.close()
212        return " avx " in cpuinfo
213
214    @expectedFailureAll(oslist=["windows"]) # no avx for now.
215    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
216    @add_test_categories(["llgs"])
217    def test_qRegisterInfo_contains_avx_registers(self):
218        self.build()
219        self.prep_debug_monitor_and_inferior()
220        self.add_register_info_collection_packets()
221
222        # Run the packet stream.
223        context = self.expect_gdbremote_sequence()
224        self.assertIsNotNone(context)
225
226        # Gather register info entries.
227        reg_infos = self.parse_register_info_packets(context)
228
229        # Collect all generics found.
230        register_sets = {
231            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
232        self.assertEqual(
233            self.targetHasAVX(),
234            "Advanced Vector Extensions" in register_sets)
235
236    def qThreadInfo_contains_thread(self):
237        procs = self.prep_debug_monitor_and_inferior()
238        self.add_threadinfo_collection_packets()
239
240        # Run the packet stream.
241        context = self.expect_gdbremote_sequence()
242        self.assertIsNotNone(context)
243
244        # Gather threadinfo entries.
245        threads = self.parse_threadinfo_packets(context)
246        self.assertIsNotNone(threads)
247
248        # We should have exactly one thread.
249        self.assertEqual(len(threads), 1)
250
251    def test_qThreadInfo_contains_thread_launch(self):
252        self.build()
253        self.set_inferior_startup_launch()
254        self.qThreadInfo_contains_thread()
255
256    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
257    def test_qThreadInfo_contains_thread_attach(self):
258        self.build()
259        self.set_inferior_startup_attach()
260        self.qThreadInfo_contains_thread()
261
262    def qThreadInfo_matches_qC(self):
263        procs = self.prep_debug_monitor_and_inferior()
264
265        self.add_threadinfo_collection_packets()
266        self.test_sequence.add_log_lines(
267            ["read packet: $qC#00",
268             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
269             ], True)
270
271        # Run the packet stream.
272        context = self.expect_gdbremote_sequence()
273        self.assertIsNotNone(context)
274
275        # Gather threadinfo entries.
276        threads = self.parse_threadinfo_packets(context)
277        self.assertIsNotNone(threads)
278
279        # We should have exactly one thread from threadinfo.
280        self.assertEqual(len(threads), 1)
281
282        # We should have a valid thread_id from $QC.
283        QC_thread_id_hex = context.get("thread_id")
284        self.assertIsNotNone(QC_thread_id_hex)
285        QC_thread_id = int(QC_thread_id_hex, 16)
286
287        # Those two should be the same.
288        self.assertEqual(threads[0], QC_thread_id)
289
290    def test_qThreadInfo_matches_qC_launch(self):
291        self.build()
292        self.set_inferior_startup_launch()
293        self.qThreadInfo_matches_qC()
294
295    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
296    def test_qThreadInfo_matches_qC_attach(self):
297        self.build()
298        self.set_inferior_startup_attach()
299        self.qThreadInfo_matches_qC()
300
301    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
302        self.build()
303        self.set_inferior_startup_launch()
304        procs = self.prep_debug_monitor_and_inferior()
305        self.add_register_info_collection_packets()
306
307        # Run the packet stream.
308        context = self.expect_gdbremote_sequence()
309        self.assertIsNotNone(context)
310
311        # Gather register info entries.
312        reg_infos = self.parse_register_info_packets(context)
313        self.assertIsNotNone(reg_infos)
314        self.assertTrue(len(reg_infos) > 0)
315
316        byte_order = self.get_target_byte_order()
317
318        # Read value for each register.
319        reg_index = 0
320        for reg_info in reg_infos:
321            # Skip registers that don't have a register set.  For x86, these are
322            # the DRx registers, which have no LLDB-kind register number and thus
323            # cannot be read via normal
324            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
325            if not "set" in reg_info:
326                continue
327
328            # Clear existing packet expectations.
329            self.reset_test_sequence()
330
331            # Run the register query
332            self.test_sequence.add_log_lines(
333                ["read packet: $p{0:x}#00".format(reg_index),
334                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
335                True)
336            context = self.expect_gdbremote_sequence()
337            self.assertIsNotNone(context)
338
339            # Verify the response length.
340            p_response = context.get("p_response")
341            self.assertIsNotNone(p_response)
342
343            # Skip erraneous (unsupported) registers.
344            # TODO: remove this once we make unsupported registers disappear.
345            if p_response.startswith("E") and len(p_response) == 3:
346                continue
347
348            if "dynamic_size_dwarf_expr_bytes" in reg_info:
349                self.updateRegInfoBitsize(reg_info, byte_order)
350            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
351                             reg_info)
352
353            # Increment loop
354            reg_index += 1
355
356    def Hg_switches_to_3_threads(self, pass_pid=False):
357        # Startup the inferior with three threads (main + 2 new ones).
358        procs = self.prep_debug_monitor_and_inferior(
359            inferior_args=["thread:new", "thread:new"])
360
361        # Let the inferior process have a few moments to start up the thread
362        # when launched.  (The launch scenario has no time to run, so threads
363        # won't be there yet.)
364        self.run_process_then_stop(run_seconds=1)
365
366        # Wait at most x seconds for 3 threads to be present.
367        threads = self.wait_for_thread_count(3)
368        self.assertEqual(len(threads), 3)
369
370        pid_str = ""
371        if pass_pid:
372            pid_str = "p{0:x}.".format(procs["inferior"].pid)
373
374        # verify we can $H to each thead, and $qC matches the thread we set.
375        for thread in threads:
376            # Change to each thread, verify current thread id.
377            self.reset_test_sequence()
378            self.test_sequence.add_log_lines(
379                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
380                 "send packet: $OK#00",
381                 "read packet: $qC#00",
382                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
383                True)
384
385            context = self.expect_gdbremote_sequence()
386            self.assertIsNotNone(context)
387
388            # Verify the thread id.
389            self.assertIsNotNone(context.get("thread_id"))
390            self.assertEqual(int(context.get("thread_id"), 16), thread)
391
392    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
393    def test_Hg_switches_to_3_threads_launch(self):
394        self.build()
395        self.set_inferior_startup_launch()
396        self.Hg_switches_to_3_threads()
397
398    @expectedFailureAll(oslist=["windows"]) # expecting one more thread
399    def test_Hg_switches_to_3_threads_attach(self):
400        self.build()
401        self.set_inferior_startup_attach()
402        self.Hg_switches_to_3_threads()
403
404    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
405    @add_test_categories(["llgs"])
406    def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self):
407        self.build()
408        self.set_inferior_startup_attach()
409        self.Hg_switches_to_3_threads(pass_pid=True)
410
411    def Hg_fails_on_pid(self, pass_pid):
412        # Start the inferior.
413        procs = self.prep_debug_monitor_and_inferior(
414            inferior_args=["thread:new"])
415
416        self.run_process_then_stop(run_seconds=1)
417
418        threads = self.wait_for_thread_count(2)
419        self.assertEqual(len(threads), 2)
420
421        if pass_pid == -1:
422            pid_str = "p-1."
423        else:
424            pid_str = "p{0:x}.".format(pass_pid)
425        thread = threads[1]
426
427        self.test_sequence.add_log_lines(
428            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
429             "send packet: $Eff#00"],
430            True)
431
432        self.expect_gdbremote_sequence()
433
434    @expectedFailureAll(oslist=["windows"])
435    @add_test_categories(["llgs"])
436    def test_Hg_fails_on_another_pid(self):
437        self.build()
438        self.set_inferior_startup_launch()
439        self.Hg_fails_on_pid(1)
440
441    @expectedFailureAll(oslist=["windows"])
442    @add_test_categories(["llgs"])
443    def test_Hg_fails_on_zero_pid(self):
444        self.build()
445        self.set_inferior_startup_launch()
446        self.Hg_fails_on_pid(0)
447
448    @expectedFailureAll(oslist=["windows"])
449    @add_test_categories(["llgs"])
450    def test_Hg_fails_on_minus_one_pid(self):
451        self.build()
452        self.set_inferior_startup_launch()
453        self.Hg_fails_on_pid(-1)
454
455    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
456        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
457        # and the test requires getting stdout from the exe.
458
459        NUM_THREADS = 3
460
461        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
462        # inferior_args=["thread:print-ids"]
463        inferior_args = ["thread:segfault"]
464        for i in range(NUM_THREADS - 1):
465            # if i > 0:
466                # Give time between thread creation/segfaulting for the handler to work.
467                # inferior_args.append("sleep:1")
468            inferior_args.append("thread:new")
469        inferior_args.append("sleep:10")
470
471        # Launch/attach.  (In our case, this should only ever be launched since
472        # we need inferior stdout/stderr).
473        procs = self.prep_debug_monitor_and_inferior(
474            inferior_args=inferior_args)
475        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
476        context = self.expect_gdbremote_sequence()
477
478        # Let the inferior process have a few moments to start up the thread when launched.
479        # context = self.run_process_then_stop(run_seconds=1)
480
481        # Wait at most x seconds for all threads to be present.
482        # threads = self.wait_for_thread_count(NUM_THREADS)
483        # self.assertEquals(len(threads), NUM_THREADS)
484
485        signaled_tids = {}
486        print_thread_ids = {}
487
488        # Switch to each thread, deliver a signal, and verify signal delivery
489        for i in range(NUM_THREADS - 1):
490            # Run until SIGSEGV comes in.
491            self.reset_test_sequence()
492            self.test_sequence.add_log_lines([{"direction": "send",
493                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
494                                               "capture": {1: "signo",
495                                                            2: "thread_id"}}],
496                                             True)
497
498            context = self.expect_gdbremote_sequence()
499            self.assertIsNotNone(context)
500            signo = context.get("signo")
501            self.assertEqual(int(signo, 16), segfault_signo)
502
503            # Ensure we haven't seen this tid yet.
504            thread_id = int(context.get("thread_id"), 16)
505            self.assertNotIn(thread_id, signaled_tids)
506            signaled_tids[thread_id] = 1
507
508            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
509            self.reset_test_sequence()
510            self.test_sequence.add_log_lines(
511                [
512                    # Set the continue thread.
513                    # Set current thread.
514                    "read packet: $Hc{0:x}#00".format(thread_id),
515                    "send packet: $OK#00",
516
517                    # Continue sending the signal number to the continue thread.
518                    # The commented out packet is a way to do this same operation without using
519                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
520                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
521                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),
522
523                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
524                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL.
525                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
526                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
527                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
528                    #  "read packet: $c#63",
529                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
530                ],
531                True)
532
533            # Run the sequence.
534            context = self.expect_gdbremote_sequence()
535            self.assertIsNotNone(context)
536
537            # Ensure the stop signal is the signal we delivered.
538            # stop_signo = context.get("stop_signo")
539            # self.assertIsNotNone(stop_signo)
540            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))
541
542            # Ensure the stop thread is the thread to which we delivered the signal.
543            # stop_thread_id = context.get("stop_thread_id")
544            # self.assertIsNotNone(stop_thread_id)
545            # self.assertEquals(int(stop_thread_id,16), thread_id)
546
547            # Ensure we haven't seen this thread id yet.  The inferior's
548            # self-obtained thread ids are not guaranteed to match the stub
549            # tids (at least on MacOSX).
550            print_thread_id = context.get("print_thread_id")
551            self.assertIsNotNone(print_thread_id)
552            print_thread_id = int(print_thread_id, 16)
553            self.assertNotIn(print_thread_id, print_thread_ids)
554
555            # Now remember this print (i.e. inferior-reflected) thread id and
556            # ensure we don't hit it again.
557            print_thread_ids[print_thread_id] = 1
558
559            # Ensure post signal-handle thread id matches the thread that
560            # initially raised the SIGSEGV.
561            post_handle_thread_id = context.get("post_handle_thread_id")
562            self.assertIsNotNone(post_handle_thread_id)
563            post_handle_thread_id = int(post_handle_thread_id, 16)
564            self.assertEqual(post_handle_thread_id, print_thread_id)
565
566    @expectedFailureDarwin
567    @skipIfWindows # no SIGSEGV support
568    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
569    @expectedFailureNetBSD
570    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
571        self.build()
572        self.set_inferior_startup_launch()
573
574        if self.platformIsDarwin():
575            # Darwin debugserver translates some signals like SIGSEGV into some gdb
576            # expectations about fixed signal numbers.
577            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
578        else:
579            self.Hc_then_Csignal_signals_correct_thread(
580                lldbutil.get_signal_number('SIGSEGV'))
581
582    @skipIfWindows # No pty support to test any inferior output
583    def test_m_packet_reads_memory(self):
584        self.build()
585        self.set_inferior_startup_launch()
586        # This is the memory we will write into the inferior and then ensure we
587        # can read back with $m.
588        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
589
590        # Start up the inferior.
591        procs = self.prep_debug_monitor_and_inferior(
592            inferior_args=[
593                "set-message:%s" %
594                MEMORY_CONTENTS,
595                "get-data-address-hex:g_message",
596                "sleep:5"])
597
598        # Run the process
599        self.test_sequence.add_log_lines(
600            [
601                # Start running after initial stop.
602                "read packet: $c#63",
603                # Match output line that prints the memory address of the message buffer within the inferior.
604                # Note we require launch-only testing so we can get inferior otuput.
605                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
606                 "capture": {1: "message_address"}},
607                # Now stop the inferior.
608                "read packet: {}".format(chr(3)),
609                # And wait for the stop notification.
610                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
611            True)
612
613        # Run the packet stream.
614        context = self.expect_gdbremote_sequence()
615        self.assertIsNotNone(context)
616
617        # Grab the message address.
618        self.assertIsNotNone(context.get("message_address"))
619        message_address = int(context.get("message_address"), 16)
620
621        # Grab contents from the inferior.
622        self.reset_test_sequence()
623        self.test_sequence.add_log_lines(
624            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
625             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
626            True)
627
628        # Run the packet stream.
629        context = self.expect_gdbremote_sequence()
630        self.assertIsNotNone(context)
631
632        # Ensure what we read from inferior memory is what we wrote.
633        self.assertIsNotNone(context.get("read_contents"))
634        read_contents = seven.unhexlify(context.get("read_contents"))
635        self.assertEqual(read_contents, MEMORY_CONTENTS)
636
637    def test_qMemoryRegionInfo_is_supported(self):
638        self.build()
639        self.set_inferior_startup_launch()
640        # Start up the inferior.
641        procs = self.prep_debug_monitor_and_inferior()
642
643        # Ask if it supports $qMemoryRegionInfo.
644        self.test_sequence.add_log_lines(
645            ["read packet: $qMemoryRegionInfo#00",
646             "send packet: $OK#00"
647             ], True)
648        self.expect_gdbremote_sequence()
649
650    @skipIfWindows # No pty support to test any inferior output
651    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
652        self.build()
653        self.set_inferior_startup_launch()
654
655        # Start up the inferior.
656        procs = self.prep_debug_monitor_and_inferior(
657            inferior_args=["get-code-address-hex:hello", "sleep:5"])
658
659        # Run the process
660        self.test_sequence.add_log_lines(
661            [
662                # Start running after initial stop.
663                "read packet: $c#63",
664                # Match output line that prints the memory address of the message buffer within the inferior.
665                # Note we require launch-only testing so we can get inferior otuput.
666                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
667                 "capture": {1: "code_address"}},
668                # Now stop the inferior.
669                "read packet: {}".format(chr(3)),
670                # And wait for the stop notification.
671                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
672            True)
673
674        # Run the packet stream.
675        context = self.expect_gdbremote_sequence()
676        self.assertIsNotNone(context)
677
678        # Grab the code address.
679        self.assertIsNotNone(context.get("code_address"))
680        code_address = int(context.get("code_address"), 16)
681
682        # Grab memory region info from the inferior.
683        self.reset_test_sequence()
684        self.add_query_memory_region_packets(code_address)
685
686        # Run the packet stream.
687        context = self.expect_gdbremote_sequence()
688        self.assertIsNotNone(context)
689        mem_region_dict = self.parse_memory_region_packet(context)
690
691        # Ensure there are no errors reported.
692        self.assertNotIn("error", mem_region_dict)
693
694        # Ensure code address is readable and executable.
695        self.assertIn("permissions", mem_region_dict)
696        self.assertIn("r", mem_region_dict["permissions"])
697        self.assertIn("x", mem_region_dict["permissions"])
698
699        # Ensure the start address and size encompass the address we queried.
700        self.assert_address_within_memory_region(code_address, mem_region_dict)
701
702    @skipIfWindows # No pty support to test any inferior output
703    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
704        self.build()
705        self.set_inferior_startup_launch()
706
707        # Start up the inferior.
708        procs = self.prep_debug_monitor_and_inferior(
709            inferior_args=["get-stack-address-hex:", "sleep:5"])
710
711        # Run the process
712        self.test_sequence.add_log_lines(
713            [
714                # Start running after initial stop.
715                "read packet: $c#63",
716                # Match output line that prints the memory address of the message buffer within the inferior.
717                # Note we require launch-only testing so we can get inferior otuput.
718                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
719                 "capture": {1: "stack_address"}},
720                # Now stop the inferior.
721                "read packet: {}".format(chr(3)),
722                # And wait for the stop notification.
723                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
724            True)
725
726        # Run the packet stream.
727        context = self.expect_gdbremote_sequence()
728        self.assertIsNotNone(context)
729
730        # Grab the address.
731        self.assertIsNotNone(context.get("stack_address"))
732        stack_address = int(context.get("stack_address"), 16)
733
734        # Grab memory region info from the inferior.
735        self.reset_test_sequence()
736        self.add_query_memory_region_packets(stack_address)
737
738        # Run the packet stream.
739        context = self.expect_gdbremote_sequence()
740        self.assertIsNotNone(context)
741        mem_region_dict = self.parse_memory_region_packet(context)
742
743        # Ensure there are no errors reported.
744        self.assertNotIn("error", mem_region_dict)
745
746        # Ensure address is readable and executable.
747        self.assertIn("permissions", mem_region_dict)
748        self.assertIn("r", mem_region_dict["permissions"])
749        self.assertIn("w", mem_region_dict["permissions"])
750
751        # Ensure the start address and size encompass the address we queried.
752        self.assert_address_within_memory_region(
753            stack_address, mem_region_dict)
754
755    @skipIfWindows # No pty support to test any inferior output
756    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
757        self.build()
758        self.set_inferior_startup_launch()
759
760        # Start up the inferior.
761        procs = self.prep_debug_monitor_and_inferior(
762            inferior_args=["get-heap-address-hex:", "sleep:5"])
763
764        # Run the process
765        self.test_sequence.add_log_lines(
766            [
767                # Start running after initial stop.
768                "read packet: $c#63",
769                # Match output line that prints the memory address of the message buffer within the inferior.
770                # Note we require launch-only testing so we can get inferior otuput.
771                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
772                 "capture": {1: "heap_address"}},
773                # Now stop the inferior.
774                "read packet: {}".format(chr(3)),
775                # And wait for the stop notification.
776                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
777            True)
778
779        # Run the packet stream.
780        context = self.expect_gdbremote_sequence()
781        self.assertIsNotNone(context)
782
783        # Grab the address.
784        self.assertIsNotNone(context.get("heap_address"))
785        heap_address = int(context.get("heap_address"), 16)
786
787        # Grab memory region info from the inferior.
788        self.reset_test_sequence()
789        self.add_query_memory_region_packets(heap_address)
790
791        # Run the packet stream.
792        context = self.expect_gdbremote_sequence()
793        self.assertIsNotNone(context)
794        mem_region_dict = self.parse_memory_region_packet(context)
795
796        # Ensure there are no errors reported.
797        self.assertNotIn("error", mem_region_dict)
798
799        # Ensure address is readable and executable.
800        self.assertIn("permissions", mem_region_dict)
801        self.assertIn("r", mem_region_dict["permissions"])
802        self.assertIn("w", mem_region_dict["permissions"])
803
804        # Ensure the start address and size encompass the address we queried.
805        self.assert_address_within_memory_region(heap_address, mem_region_dict)
806
807    def breakpoint_set_and_remove_work(self, want_hardware):
808        # Start up the inferior.
809        procs = self.prep_debug_monitor_and_inferior(
810            inferior_args=[
811                "get-code-address-hex:hello",
812                "sleep:1",
813                "call-function:hello"])
814
815        # Run the process
816        self.add_register_info_collection_packets()
817        self.add_process_info_collection_packets()
818        self.test_sequence.add_log_lines(
819            [  # Start running after initial stop.
820                "read packet: $c#63",
821                # Match output line that prints the memory address of the function call entry point.
822                # Note we require launch-only testing so we can get inferior otuput.
823                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
824                 "capture": {1: "function_address"}},
825                # Now stop the inferior.
826                "read packet: {}".format(chr(3)),
827                # And wait for the stop notification.
828                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
829            True)
830
831        # Run the packet stream.
832        context = self.expect_gdbremote_sequence()
833        self.assertIsNotNone(context)
834
835        # Gather process info - we need endian of target to handle register
836        # value conversions.
837        process_info = self.parse_process_info_response(context)
838        endian = process_info.get("endian")
839        self.assertIsNotNone(endian)
840
841        # Gather register info entries.
842        reg_infos = self.parse_register_info_packets(context)
843        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
844        self.assertIsNotNone(pc_lldb_reg_index)
845        self.assertIsNotNone(pc_reg_info)
846
847        # Grab the function address.
848        self.assertIsNotNone(context.get("function_address"))
849        function_address = int(context.get("function_address"), 16)
850
851        # Get current target architecture
852        target_arch = self.getArchitecture()
853
854        # Set the breakpoint.
855        if (target_arch == "arm") or (target_arch == "aarch64"):
856            # TODO: Handle case when setting breakpoint in thumb code
857            BREAKPOINT_KIND = 4
858        else:
859            BREAKPOINT_KIND = 1
860
861        # Set default packet type to Z0 (software breakpoint)
862        z_packet_type = 0
863
864        # If hardware breakpoint is requested set packet type to Z1
865        if want_hardware == True:
866            z_packet_type = 1
867
868        self.reset_test_sequence()
869        self.add_set_breakpoint_packets(
870            function_address,
871            z_packet_type,
872            do_continue=True,
873            breakpoint_kind=BREAKPOINT_KIND)
874
875        # Run the packet stream.
876        context = self.expect_gdbremote_sequence()
877        self.assertIsNotNone(context)
878
879        # Verify the stop signal reported was the breakpoint signal number.
880        stop_signo = context.get("stop_signo")
881        self.assertIsNotNone(stop_signo)
882        self.assertEqual(int(stop_signo, 16),
883                         lldbutil.get_signal_number('SIGTRAP'))
884
885        # Ensure we did not receive any output.  If the breakpoint was not set, we would
886        # see output (from a launched process with captured stdio) printing a hello, world message.
887        # That would indicate the breakpoint didn't take.
888        self.assertEqual(len(context["O_content"]), 0)
889
890        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
891        # This acts as a another validation on the register reading code.
892        self.reset_test_sequence()
893        self.test_sequence.add_log_lines(
894            [
895                # Print the PC.  This should match the breakpoint address.
896                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
897                # Capture $p results.
898                {"direction": "send",
899                 "regex": r"^\$([0-9a-fA-F]+)#",
900                 "capture": {1: "p_response"}},
901            ], True)
902
903        context = self.expect_gdbremote_sequence()
904        self.assertIsNotNone(context)
905
906        # Verify the PC is where we expect.  Note response is in endianness of
907        # the inferior.
908        p_response = context.get("p_response")
909        self.assertIsNotNone(p_response)
910
911        # Convert from target endian to int.
912        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
913            endian, p_response)
914        self.assertEqual(returned_pc, function_address)
915
916        # Verify that a breakpoint remove and continue gets us the expected
917        # output.
918        self.reset_test_sequence()
919
920        # Add breakpoint remove packets
921        self.add_remove_breakpoint_packets(
922            function_address,
923            z_packet_type,
924            breakpoint_kind=BREAKPOINT_KIND)
925
926        self.test_sequence.add_log_lines(
927            [
928                # Continue running.
929                "read packet: $c#63",
930                # We should now receive the output from the call.
931                {"type": "output_match", "regex": r"^hello, world\r\n$"},
932                # And wait for program completion.
933                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
934            ], True)
935
936        context = self.expect_gdbremote_sequence()
937        self.assertIsNotNone(context)
938
939    @skipIfWindows # No pty support to test any inferior output
940    def test_software_breakpoint_set_and_remove_work(self):
941        if self.getArchitecture() == "arm":
942            # TODO: Handle case when setting breakpoint in thumb code
943            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
944        else:
945            self.build()
946        self.set_inferior_startup_launch()
947        self.breakpoint_set_and_remove_work(want_hardware=False)
948
949    @skipUnlessPlatform(oslist=['linux'])
950    @skipIf(archs=no_match(['arm', 'aarch64']))
951    def test_hardware_breakpoint_set_and_remove_work(self):
952        if self.getArchitecture() == "arm":
953            # TODO: Handle case when setting breakpoint in thumb code
954            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
955        else:
956            self.build()
957        self.set_inferior_startup_launch()
958        self.breakpoint_set_and_remove_work(want_hardware=True)
959
960    def get_qSupported_dict(self, features=[]):
961        self.build()
962        self.set_inferior_startup_launch()
963
964        # Start up the stub and start/prep the inferior.
965        procs = self.prep_debug_monitor_and_inferior()
966        self.add_qSupported_packets(features)
967
968        # Run the packet stream.
969        context = self.expect_gdbremote_sequence()
970        self.assertIsNotNone(context)
971
972        # Retrieve the qSupported features.
973        return self.parse_qSupported_response(context)
974
975    def test_qSupported_returns_known_stub_features(self):
976        supported_dict = self.get_qSupported_dict()
977        self.assertIsNotNone(supported_dict)
978        self.assertTrue(len(supported_dict) > 0)
979
980    def test_qSupported_auvx(self):
981        expected = ('+' if lldbplatformutil.getPlatform()
982                    in ["freebsd", "linux", "netbsd"] else '-')
983        supported_dict = self.get_qSupported_dict()
984        self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected)
985
986    def test_qSupported_libraries_svr4(self):
987        expected = ('+' if lldbplatformutil.getPlatform()
988                    in ["freebsd", "linux", "netbsd"] else '-')
989        supported_dict = self.get_qSupported_dict()
990        self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'),
991                         expected)
992
993    def test_qSupported_QPassSignals(self):
994        expected = ('+' if lldbplatformutil.getPlatform()
995                    in ["freebsd", "linux", "netbsd"] else '-')
996        supported_dict = self.get_qSupported_dict()
997        self.assertEqual(supported_dict.get('QPassSignals', '-'), expected)
998
999    @add_test_categories(["fork"])
1000    def test_qSupported_fork_events(self):
1001        supported_dict = (
1002            self.get_qSupported_dict(['multiprocess+', 'fork-events+']))
1003        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1004        self.assertEqual(supported_dict.get('fork-events', '-'), '+')
1005        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1006
1007    @add_test_categories(["fork"])
1008    def test_qSupported_fork_events_without_multiprocess(self):
1009        supported_dict = (
1010            self.get_qSupported_dict(['fork-events+']))
1011        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1012        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1013        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1014
1015    @add_test_categories(["fork"])
1016    def test_qSupported_vfork_events(self):
1017        supported_dict = (
1018            self.get_qSupported_dict(['multiprocess+', 'vfork-events+']))
1019        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1020        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1021        self.assertEqual(supported_dict.get('vfork-events', '-'), '+')
1022
1023    @add_test_categories(["fork"])
1024    def test_qSupported_vfork_events_without_multiprocess(self):
1025        supported_dict = (
1026            self.get_qSupported_dict(['vfork-events+']))
1027        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1028        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1029        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1030
1031    # We need to be able to self.runCmd to get cpuinfo,
1032    # which is not possible when using a remote platform.
1033    @skipIfRemote
1034    def test_qSupported_memory_tagging(self):
1035        supported_dict = self.get_qSupported_dict()
1036        self.assertEqual(supported_dict.get("memory-tagging", '-'),
1037                         '+' if self.isAArch64MTE() else '-')
1038
1039    @skipIfWindows # No pty support to test any inferior output
1040    def test_written_M_content_reads_back_correctly(self):
1041        self.build()
1042        self.set_inferior_startup_launch()
1043
1044        TEST_MESSAGE = "Hello, memory"
1045
1046        # Start up the stub and start/prep the inferior.
1047        procs = self.prep_debug_monitor_and_inferior(
1048            inferior_args=[
1049                "set-message:xxxxxxxxxxxxxX",
1050                "get-data-address-hex:g_message",
1051                "sleep:1",
1052                "print-message:"])
1053        self.test_sequence.add_log_lines(
1054            [
1055                # Start running after initial stop.
1056                "read packet: $c#63",
1057                # Match output line that prints the memory address of the message buffer within the inferior.
1058                # Note we require launch-only testing so we can get inferior otuput.
1059                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
1060                 "capture": {1: "message_address"}},
1061                # Now stop the inferior.
1062                "read packet: {}".format(chr(3)),
1063                # And wait for the stop notification.
1064                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1065            True)
1066        context = self.expect_gdbremote_sequence()
1067        self.assertIsNotNone(context)
1068
1069        # Grab the message address.
1070        self.assertIsNotNone(context.get("message_address"))
1071        message_address = int(context.get("message_address"), 16)
1072
1073        # Hex-encode the test message, adding null termination.
1074        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
1075
1076        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
1077        # and binary (x) memory read packets.
1078        self.reset_test_sequence()
1079        self.test_sequence.add_log_lines(
1080            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
1081             "send packet: $OK#00",
1082             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1083             "send packet: ${0}#00".format(hex_encoded_message),
1084             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1085             "send packet: ${0}#00".format(TEST_MESSAGE),
1086             "read packet: $m{0:x},4#00".format(message_address),
1087             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
1088             "read packet: $x{0:x},4#00".format(message_address),
1089             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
1090             "read packet: $c#63",
1091             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
1092             "send packet: $W00#00",
1093             ], True)
1094        context = self.expect_gdbremote_sequence()
1095        self.assertIsNotNone(context)
1096
1097        # Ensure what we read from inferior memory is what we wrote.
1098        printed_message = context.get("printed_message")
1099        self.assertIsNotNone(printed_message)
1100        self.assertEqual(printed_message, TEST_MESSAGE + "X")
1101
1102    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
1103    # Come back to this.  I have the test rigged to verify that at least some
1104    # of the bit-flip writes work.
1105    def test_P_writes_all_gpr_registers(self):
1106        self.build()
1107        self.set_inferior_startup_launch()
1108
1109        # Start inferior debug session, grab all register info.
1110        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
1111        self.add_register_info_collection_packets()
1112        self.add_process_info_collection_packets()
1113
1114        context = self.expect_gdbremote_sequence()
1115        self.assertIsNotNone(context)
1116
1117        # Process register infos.
1118        reg_infos = self.parse_register_info_packets(context)
1119        self.assertIsNotNone(reg_infos)
1120        self.add_lldb_register_index(reg_infos)
1121
1122        # Process endian.
1123        process_info = self.parse_process_info_response(context)
1124        endian = process_info.get("endian")
1125        self.assertIsNotNone(endian)
1126
1127        # Pull out the register infos that we think we can bit flip
1128        # successfully,.
1129        gpr_reg_infos = [
1130            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1131        self.assertTrue(len(gpr_reg_infos) > 0)
1132
1133        # Write flipped bit pattern of existing value to each register.
1134        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1135            gpr_reg_infos, endian)
1136        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1137        self.assertTrue(successful_writes > 0)
1138
1139    # Note: as of this moment, a hefty number of the GPR writes are failing
1140    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1141    @skipIfWindows
1142    def test_P_and_p_thread_suffix_work(self):
1143        self.build()
1144        self.set_inferior_startup_launch()
1145
1146        # Startup the inferior with three threads.
1147        procs = self.prep_debug_monitor_and_inferior(
1148            inferior_args=["thread:new", "thread:new"])
1149        self.add_thread_suffix_request_packets()
1150        self.add_register_info_collection_packets()
1151        self.add_process_info_collection_packets()
1152
1153        context = self.expect_gdbremote_sequence()
1154        self.assertIsNotNone(context)
1155
1156        process_info = self.parse_process_info_response(context)
1157        self.assertIsNotNone(process_info)
1158        endian = process_info.get("endian")
1159        self.assertIsNotNone(endian)
1160
1161        reg_infos = self.parse_register_info_packets(context)
1162        self.assertIsNotNone(reg_infos)
1163        self.add_lldb_register_index(reg_infos)
1164
1165        reg_index = self.select_modifiable_register(reg_infos)
1166        self.assertIsNotNone(reg_index)
1167        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1168        self.assertTrue(reg_byte_size > 0)
1169
1170        # Run the process a bit so threads can start up, and collect register
1171        # info.
1172        context = self.run_process_then_stop(run_seconds=1)
1173        self.assertIsNotNone(context)
1174
1175        # Wait for 3 threads to be present.
1176        threads = self.wait_for_thread_count(3)
1177        self.assertEqual(len(threads), 3)
1178
1179        expected_reg_values = []
1180        register_increment = 1
1181        next_value = None
1182
1183        # Set the same register in each of 3 threads to a different value.
1184        # Verify each one has the unique value.
1185        for thread in threads:
1186            # If we don't have a next value yet, start it with the initial read
1187            # value + 1
1188            if not next_value:
1189                # Read pre-existing register value.
1190                self.reset_test_sequence()
1191                self.test_sequence.add_log_lines(
1192                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1193                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1194                     ], True)
1195                context = self.expect_gdbremote_sequence()
1196                self.assertIsNotNone(context)
1197
1198                # Set the next value to use for writing as the increment plus
1199                # current value.
1200                p_response = context.get("p_response")
1201                self.assertIsNotNone(p_response)
1202                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1203                    endian, p_response)
1204
1205            # Set new value using P and thread suffix.
1206            self.reset_test_sequence()
1207            self.test_sequence.add_log_lines(
1208                [
1209                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1210                        reg_index,
1211                        lldbgdbserverutils.pack_register_hex(
1212                            endian,
1213                            next_value,
1214                            byte_size=reg_byte_size),
1215                        thread),
1216                    "send packet: $OK#00",
1217                ],
1218                True)
1219            context = self.expect_gdbremote_sequence()
1220            self.assertIsNotNone(context)
1221
1222            # Save the value we set.
1223            expected_reg_values.append(next_value)
1224
1225            # Increment value for next thread to use (we want them all
1226            # different so we can verify they wrote to each thread correctly
1227            # next.)
1228            next_value += register_increment
1229
1230        # Revisit each thread and verify they have the expected value set for
1231        # the register we wrote.
1232        thread_index = 0
1233        for thread in threads:
1234            # Read pre-existing register value.
1235            self.reset_test_sequence()
1236            self.test_sequence.add_log_lines(
1237                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1238                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1239                 ], True)
1240            context = self.expect_gdbremote_sequence()
1241            self.assertIsNotNone(context)
1242
1243            # Get the register value.
1244            p_response = context.get("p_response")
1245            self.assertIsNotNone(p_response)
1246            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1247                endian, p_response)
1248
1249            # Make sure we read back what we wrote.
1250            self.assertEqual(read_value, expected_reg_values[thread_index])
1251            thread_index += 1
1252
1253    @skipIfWindows # No pty support to test any inferior output
1254    @add_test_categories(["llgs"])
1255    def test_launch_via_A(self):
1256        self.build()
1257        exe_path = self.getBuildArtifact("a.out")
1258        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1259        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1260
1261        server = self.connect_to_debug_monitor()
1262        self.assertIsNotNone(server)
1263        self.do_handshake()
1264        # NB: strictly speaking we should use %x here but this packet
1265        # is deprecated, so no point in changing lldb-server's expectations
1266        self.test_sequence.add_log_lines(
1267            ["read packet: $A %d,0,%s,%d,1,%s,%d,2,%s,%d,3,%s#00" %
1268             tuple(itertools.chain.from_iterable(
1269                 [(len(x), x) for x in hex_args])),
1270             "send packet: $OK#00",
1271             "read packet: $c#00",
1272             "send packet: $W00#00"],
1273            True)
1274        context = self.expect_gdbremote_sequence()
1275        self.assertEqual(context["O_content"],
1276                         b'arg1\r\narg2\r\narg3\r\n')
1277
1278    @skipIfWindows # No pty support to test any inferior output
1279    @add_test_categories(["llgs"])
1280    def test_launch_via_vRun(self):
1281        self.build()
1282        exe_path = self.getBuildArtifact("a.out")
1283        args = [exe_path, "stderr:arg1", "stderr:arg2", "stderr:arg3"]
1284        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1285
1286        server = self.connect_to_debug_monitor()
1287        self.assertIsNotNone(server)
1288        self.do_handshake()
1289        self.test_sequence.add_log_lines(
1290            ["read packet: $vRun;%s;%s;%s;%s#00" % tuple(hex_args),
1291             {"direction": "send",
1292              "regex": r"^\$T([0-9a-fA-F]+)"},
1293             "read packet: $c#00",
1294             "send packet: $W00#00"],
1295            True)
1296        context = self.expect_gdbremote_sequence()
1297        self.assertEqual(context["O_content"],
1298                         b'arg1\r\narg2\r\narg3\r\n')
1299
1300    @add_test_categories(["llgs"])
1301    def test_launch_via_vRun_no_args(self):
1302        self.build()
1303        exe_path = self.getBuildArtifact("a.out")
1304        hex_path = binascii.b2a_hex(exe_path.encode()).decode()
1305
1306        server = self.connect_to_debug_monitor()
1307        self.assertIsNotNone(server)
1308        self.do_handshake()
1309        self.test_sequence.add_log_lines(
1310            ["read packet: $vRun;%s#00" % (hex_path,),
1311             {"direction": "send",
1312              "regex": r"^\$T([0-9a-fA-F]+)"},
1313             "read packet: $c#00",
1314             "send packet: $W00#00"],
1315            True)
1316        self.expect_gdbremote_sequence()
1317
1318    @skipIfWindows # No pty support to test any inferior output
1319    @add_test_categories(["llgs"])
1320    def test_QEnvironment(self):
1321        self.build()
1322        exe_path = self.getBuildArtifact("a.out")
1323        env = {"FOO": "test", "BAR": "a=z"}
1324        args = [exe_path, "print-env:FOO", "print-env:BAR"]
1325        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1326
1327        server = self.connect_to_debug_monitor()
1328        self.assertIsNotNone(server)
1329        self.do_handshake()
1330
1331        for key, value in env.items():
1332            self.test_sequence.add_log_lines(
1333                ["read packet: $QEnvironment:%s=%s#00" % (key, value),
1334                 "send packet: $OK#00"],
1335                True)
1336        self.test_sequence.add_log_lines(
1337            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1338             {"direction": "send",
1339              "regex": r"^\$T([0-9a-fA-F]+)"},
1340             "read packet: $c#00",
1341             "send packet: $W00#00"],
1342            True)
1343        context = self.expect_gdbremote_sequence()
1344        self.assertEqual(context["O_content"], b"test\r\na=z\r\n")
1345
1346    @skipIfWindows # No pty support to test any inferior output
1347    @add_test_categories(["llgs"])
1348    def test_QEnvironmentHexEncoded(self):
1349        self.build()
1350        exe_path = self.getBuildArtifact("a.out")
1351        env = {"FOO": "test", "BAR": "a=z", "BAZ": "a*}#z"}
1352        args = [exe_path, "print-env:FOO", "print-env:BAR", "print-env:BAZ"]
1353        hex_args = [binascii.b2a_hex(x.encode()).decode() for x in args]
1354
1355        server = self.connect_to_debug_monitor()
1356        self.assertIsNotNone(server)
1357        self.do_handshake()
1358
1359        for key, value in env.items():
1360            hex_enc = binascii.b2a_hex(("%s=%s" % (key, value)).encode()).decode()
1361            self.test_sequence.add_log_lines(
1362                ["read packet: $QEnvironmentHexEncoded:%s#00" % (hex_enc,),
1363                 "send packet: $OK#00"],
1364                True)
1365        self.test_sequence.add_log_lines(
1366            ["read packet: $vRun;%s#00" % (";".join(hex_args),),
1367             {"direction": "send",
1368              "regex": r"^\$T([0-9a-fA-F]+)"},
1369             "read packet: $c#00",
1370             "send packet: $W00#00"],
1371            True)
1372        context = self.expect_gdbremote_sequence()
1373        self.assertEqual(context["O_content"], b"test\r\na=z\r\na*}#z\r\n")
1374