1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import unittest2
14import gdbremote_testcase
15import lldbgdbserverutils
16from lldbsuite.support import seven
17from lldbsuite.test.decorators import *
18from lldbsuite.test.lldbtest import *
19from lldbsuite.test.lldbdwarf import *
20from lldbsuite.test import lldbutil
21
22
23class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
24
25    mydir = TestBase.compute_mydir(__file__)
26
27    def test_thread_suffix_supported(self):
28        server = self.connect_to_debug_monitor()
29        self.assertIsNotNone(server)
30
31        self.add_no_ack_remote_stream()
32        self.test_sequence.add_log_lines(
33            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
34             "lldb-server <   6> send packet: $OK#9a"],
35            True)
36
37        self.expect_gdbremote_sequence()
38
39
40    def test_list_threads_in_stop_reply_supported(self):
41        server = self.connect_to_debug_monitor()
42        self.assertIsNotNone(server)
43
44        self.add_no_ack_remote_stream()
45        self.test_sequence.add_log_lines(
46            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
47             "lldb-server <   6> send packet: $OK#9a"],
48            True)
49        self.expect_gdbremote_sequence()
50
51    def test_c_packet_works(self):
52        self.build()
53        procs = self.prep_debug_monitor_and_inferior()
54        self.test_sequence.add_log_lines(
55            ["read packet: $c#63",
56             "send packet: $W00#00"],
57            True)
58
59        self.expect_gdbremote_sequence()
60
61    @skipIfWindows # No pty support to test any inferior output
62    def test_inferior_print_exit(self):
63        self.build()
64        procs = self.prep_debug_monitor_and_inferior(
65                inferior_args=["hello, world"])
66        self.test_sequence.add_log_lines(
67            ["read packet: $vCont;c#a8",
68             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
69             "send packet: $W00#00"],
70            True)
71
72        context = self.expect_gdbremote_sequence()
73        self.assertIsNotNone(context)
74
75    def test_first_launch_stop_reply_thread_matches_first_qC(self):
76        self.build()
77        procs = self.prep_debug_monitor_and_inferior()
78        self.test_sequence.add_log_lines(["read packet: $qC#00",
79                                          {"direction": "send",
80                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
81                                           "capture": {1: "thread_id"}},
82                                          "read packet: $?#00",
83                                          {"direction": "send",
84                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
85                                              "expect_captures": {1: "thread_id"}}],
86                                         True)
87        self.expect_gdbremote_sequence()
88
89    def test_attach_commandline_continue_app_exits(self):
90        self.build()
91        self.set_inferior_startup_attach()
92        procs = self.prep_debug_monitor_and_inferior()
93        self.test_sequence.add_log_lines(
94            ["read packet: $vCont;c#a8",
95             "send packet: $W00#00"],
96            True)
97        self.expect_gdbremote_sequence()
98
99        # Wait a moment for completed and now-detached inferior process to
100        # clear.
101        time.sleep(1)
102
103        if not lldb.remote_platform:
104            # Process should be dead now. Reap results.
105            poll_result = procs["inferior"].poll()
106            self.assertIsNotNone(poll_result)
107
108        # Where possible, verify at the system level that the process is not
109        # running.
110        self.assertFalse(
111            lldbgdbserverutils.process_is_running(
112                procs["inferior"].pid, False))
113
114    def test_qRegisterInfo_returns_one_valid_result(self):
115        self.build()
116        self.prep_debug_monitor_and_inferior()
117        self.test_sequence.add_log_lines(
118            ["read packet: $qRegisterInfo0#00",
119             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
120            True)
121
122        # Run the stream
123        context = self.expect_gdbremote_sequence()
124        self.assertIsNotNone(context)
125
126        reg_info_packet = context.get("reginfo_0")
127        self.assertIsNotNone(reg_info_packet)
128        self.assert_valid_reg_info(
129            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
130
131    def test_qRegisterInfo_returns_all_valid_results(self):
132        self.build()
133        self.prep_debug_monitor_and_inferior()
134        self.add_register_info_collection_packets()
135
136        # Run the stream.
137        context = self.expect_gdbremote_sequence()
138        self.assertIsNotNone(context)
139
140        # Validate that each register info returned validates.
141        for reg_info in self.parse_register_info_packets(context):
142            self.assert_valid_reg_info(reg_info)
143
144    def test_qRegisterInfo_contains_required_generics_debugserver(self):
145        self.build()
146        self.prep_debug_monitor_and_inferior()
147        self.add_register_info_collection_packets()
148
149        # Run the packet stream.
150        context = self.expect_gdbremote_sequence()
151        self.assertIsNotNone(context)
152
153        # Gather register info entries.
154        reg_infos = self.parse_register_info_packets(context)
155
156        # Collect all generic registers found.
157        generic_regs = {
158            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
159
160        # Ensure we have a program counter register.
161        self.assertIn('pc', generic_regs)
162
163        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
164        if self.getArchitecture() != 'powerpc64le':
165            self.assertIn('fp', generic_regs)
166
167        # Ensure we have a stack pointer register.
168        self.assertIn('sp', generic_regs)
169
170        # Ensure we have a flags register.
171        self.assertIn('flags', generic_regs)
172
173    def test_qRegisterInfo_contains_at_least_one_register_set(self):
174        self.build()
175        self.prep_debug_monitor_and_inferior()
176        self.add_register_info_collection_packets()
177
178        # Run the packet stream.
179        context = self.expect_gdbremote_sequence()
180        self.assertIsNotNone(context)
181
182        # Gather register info entries.
183        reg_infos = self.parse_register_info_packets(context)
184
185        # Collect all register sets found.
186        register_sets = {
187            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
188        self.assertTrue(len(register_sets) >= 1)
189
190    def targetHasAVX(self):
191        triple = self.dbg.GetSelectedPlatform().GetTriple()
192
193        # TODO other platforms, please implement this function
194        if not re.match(".*-.*-linux", triple):
195            return True
196
197        # Need to do something different for non-Linux/Android targets
198        if lldb.remote_platform:
199            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
200            cpuinfo_path = "cpuinfo"
201            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
202        else:
203            cpuinfo_path = "/proc/cpuinfo"
204
205        f = open(cpuinfo_path, 'r')
206        cpuinfo = f.read()
207        f.close()
208        return " avx " in cpuinfo
209
210    @expectedFailureAll(oslist=["windows"]) # no avx for now.
211    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
212    @add_test_categories(["llgs"])
213    def test_qRegisterInfo_contains_avx_registers(self):
214        self.build()
215        self.prep_debug_monitor_and_inferior()
216        self.add_register_info_collection_packets()
217
218        # Run the packet stream.
219        context = self.expect_gdbremote_sequence()
220        self.assertIsNotNone(context)
221
222        # Gather register info entries.
223        reg_infos = self.parse_register_info_packets(context)
224
225        # Collect all generics found.
226        register_sets = {
227            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
228        self.assertEqual(
229            self.targetHasAVX(),
230            "Advanced Vector Extensions" in register_sets)
231
232    def qThreadInfo_contains_thread(self):
233        procs = self.prep_debug_monitor_and_inferior()
234        self.add_threadinfo_collection_packets()
235
236        # Run the packet stream.
237        context = self.expect_gdbremote_sequence()
238        self.assertIsNotNone(context)
239
240        # Gather threadinfo entries.
241        threads = self.parse_threadinfo_packets(context)
242        self.assertIsNotNone(threads)
243
244        # We should have exactly one thread.
245        self.assertEqual(len(threads), 1)
246
247    def test_qThreadInfo_contains_thread_launch(self):
248        self.build()
249        self.set_inferior_startup_launch()
250        self.qThreadInfo_contains_thread()
251
252    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
253    def test_qThreadInfo_contains_thread_attach(self):
254        self.build()
255        self.set_inferior_startup_attach()
256        self.qThreadInfo_contains_thread()
257
258    def qThreadInfo_matches_qC(self):
259        procs = self.prep_debug_monitor_and_inferior()
260
261        self.add_threadinfo_collection_packets()
262        self.test_sequence.add_log_lines(
263            ["read packet: $qC#00",
264             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
265             ], True)
266
267        # Run the packet stream.
268        context = self.expect_gdbremote_sequence()
269        self.assertIsNotNone(context)
270
271        # Gather threadinfo entries.
272        threads = self.parse_threadinfo_packets(context)
273        self.assertIsNotNone(threads)
274
275        # We should have exactly one thread from threadinfo.
276        self.assertEqual(len(threads), 1)
277
278        # We should have a valid thread_id from $QC.
279        QC_thread_id_hex = context.get("thread_id")
280        self.assertIsNotNone(QC_thread_id_hex)
281        QC_thread_id = int(QC_thread_id_hex, 16)
282
283        # Those two should be the same.
284        self.assertEqual(threads[0], QC_thread_id)
285
286    def test_qThreadInfo_matches_qC_launch(self):
287        self.build()
288        self.set_inferior_startup_launch()
289        self.qThreadInfo_matches_qC()
290
291    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
292    def test_qThreadInfo_matches_qC_attach(self):
293        self.build()
294        self.set_inferior_startup_attach()
295        self.qThreadInfo_matches_qC()
296
297    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
298        self.build()
299        self.set_inferior_startup_launch()
300        procs = self.prep_debug_monitor_and_inferior()
301        self.add_register_info_collection_packets()
302
303        # Run the packet stream.
304        context = self.expect_gdbremote_sequence()
305        self.assertIsNotNone(context)
306
307        # Gather register info entries.
308        reg_infos = self.parse_register_info_packets(context)
309        self.assertIsNotNone(reg_infos)
310        self.assertTrue(len(reg_infos) > 0)
311
312        byte_order = self.get_target_byte_order()
313
314        # Read value for each register.
315        reg_index = 0
316        for reg_info in reg_infos:
317            # Skip registers that don't have a register set.  For x86, these are
318            # the DRx registers, which have no LLDB-kind register number and thus
319            # cannot be read via normal
320            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
321            if not "set" in reg_info:
322                continue
323
324            # Clear existing packet expectations.
325            self.reset_test_sequence()
326
327            # Run the register query
328            self.test_sequence.add_log_lines(
329                ["read packet: $p{0:x}#00".format(reg_index),
330                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
331                True)
332            context = self.expect_gdbremote_sequence()
333            self.assertIsNotNone(context)
334
335            # Verify the response length.
336            p_response = context.get("p_response")
337            self.assertIsNotNone(p_response)
338
339            # Skip erraneous (unsupported) registers.
340            # TODO: remove this once we make unsupported registers disappear.
341            if p_response.startswith("E") and len(p_response) == 3:
342                continue
343
344            if "dynamic_size_dwarf_expr_bytes" in reg_info:
345                self.updateRegInfoBitsize(reg_info, byte_order)
346            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
347                             reg_info)
348
349            # Increment loop
350            reg_index += 1
351
352    def Hg_switches_to_3_threads(self):
353        # Startup the inferior with three threads (main + 2 new ones).
354        procs = self.prep_debug_monitor_and_inferior(
355            inferior_args=["thread:new", "thread:new"])
356
357        # Let the inferior process have a few moments to start up the thread
358        # when launched.  (The launch scenario has no time to run, so threads
359        # won't be there yet.)
360        self.run_process_then_stop(run_seconds=1)
361
362        # Wait at most x seconds for 3 threads to be present.
363        threads = self.wait_for_thread_count(3)
364        self.assertEqual(len(threads), 3)
365
366        # verify we can $H to each thead, and $qC matches the thread we set.
367        for thread in threads:
368            # Change to each thread, verify current thread id.
369            self.reset_test_sequence()
370            self.test_sequence.add_log_lines(
371                ["read packet: $Hg{0:x}#00".format(thread),  # Set current thread.
372                 "send packet: $OK#00",
373                 "read packet: $qC#00",
374                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
375                True)
376
377            context = self.expect_gdbremote_sequence()
378            self.assertIsNotNone(context)
379
380            # Verify the thread id.
381            self.assertIsNotNone(context.get("thread_id"))
382            self.assertEqual(int(context.get("thread_id"), 16), thread)
383
384    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
385    def test_Hg_switches_to_3_threads_launch(self):
386        self.build()
387        self.set_inferior_startup_launch()
388        self.Hg_switches_to_3_threads()
389
390    @expectedFailureAll(oslist=["windows"]) # expecting one more thread
391    def test_Hg_switches_to_3_threads_attach(self):
392        self.build()
393        self.set_inferior_startup_attach()
394        self.Hg_switches_to_3_threads()
395
396    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
397        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
398        # and the test requires getting stdout from the exe.
399
400        NUM_THREADS = 3
401
402        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
403        # inferior_args=["thread:print-ids"]
404        inferior_args = ["thread:segfault"]
405        for i in range(NUM_THREADS - 1):
406            # if i > 0:
407                # Give time between thread creation/segfaulting for the handler to work.
408                # inferior_args.append("sleep:1")
409            inferior_args.append("thread:new")
410        inferior_args.append("sleep:10")
411
412        # Launch/attach.  (In our case, this should only ever be launched since
413        # we need inferior stdout/stderr).
414        procs = self.prep_debug_monitor_and_inferior(
415            inferior_args=inferior_args)
416        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
417        context = self.expect_gdbremote_sequence()
418
419        # Let the inferior process have a few moments to start up the thread when launched.
420        # context = self.run_process_then_stop(run_seconds=1)
421
422        # Wait at most x seconds for all threads to be present.
423        # threads = self.wait_for_thread_count(NUM_THREADS)
424        # self.assertEquals(len(threads), NUM_THREADS)
425
426        signaled_tids = {}
427        print_thread_ids = {}
428
429        # Switch to each thread, deliver a signal, and verify signal delivery
430        for i in range(NUM_THREADS - 1):
431            # Run until SIGSEGV comes in.
432            self.reset_test_sequence()
433            self.test_sequence.add_log_lines([{"direction": "send",
434                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
435                                               "capture": {1: "signo",
436                                                            2: "thread_id"}}],
437                                             True)
438
439            context = self.expect_gdbremote_sequence()
440            self.assertIsNotNone(context)
441            signo = context.get("signo")
442            self.assertEqual(int(signo, 16), segfault_signo)
443
444            # Ensure we haven't seen this tid yet.
445            thread_id = int(context.get("thread_id"), 16)
446            self.assertNotIn(thread_id, signaled_tids)
447            signaled_tids[thread_id] = 1
448
449            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
450            self.reset_test_sequence()
451            self.test_sequence.add_log_lines(
452                [
453                    # Set the continue thread.
454                    # Set current thread.
455                    "read packet: $Hc{0:x}#00".format(thread_id),
456                    "send packet: $OK#00",
457
458                    # Continue sending the signal number to the continue thread.
459                    # The commented out packet is a way to do this same operation without using
460                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
461                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
462                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),
463
464                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
465                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL.
466                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
467                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
468                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
469                    #  "read packet: $c#63",
470                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
471                ],
472                True)
473
474            # Run the sequence.
475            context = self.expect_gdbremote_sequence()
476            self.assertIsNotNone(context)
477
478            # Ensure the stop signal is the signal we delivered.
479            # stop_signo = context.get("stop_signo")
480            # self.assertIsNotNone(stop_signo)
481            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))
482
483            # Ensure the stop thread is the thread to which we delivered the signal.
484            # stop_thread_id = context.get("stop_thread_id")
485            # self.assertIsNotNone(stop_thread_id)
486            # self.assertEquals(int(stop_thread_id,16), thread_id)
487
488            # Ensure we haven't seen this thread id yet.  The inferior's
489            # self-obtained thread ids are not guaranteed to match the stub
490            # tids (at least on MacOSX).
491            print_thread_id = context.get("print_thread_id")
492            self.assertIsNotNone(print_thread_id)
493            print_thread_id = int(print_thread_id, 16)
494            self.assertNotIn(print_thread_id, print_thread_ids)
495
496            # Now remember this print (i.e. inferior-reflected) thread id and
497            # ensure we don't hit it again.
498            print_thread_ids[print_thread_id] = 1
499
500            # Ensure post signal-handle thread id matches the thread that
501            # initially raised the SIGSEGV.
502            post_handle_thread_id = context.get("post_handle_thread_id")
503            self.assertIsNotNone(post_handle_thread_id)
504            post_handle_thread_id = int(post_handle_thread_id, 16)
505            self.assertEqual(post_handle_thread_id, print_thread_id)
506
507    @expectedFailureDarwin
508    @skipIfWindows # no SIGSEGV support
509    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
510    @expectedFailureNetBSD
511    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
512        self.build()
513        self.set_inferior_startup_launch()
514
515        if self.platformIsDarwin():
516            # Darwin debugserver translates some signals like SIGSEGV into some gdb
517            # expectations about fixed signal numbers.
518            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
519        else:
520            self.Hc_then_Csignal_signals_correct_thread(
521                lldbutil.get_signal_number('SIGSEGV'))
522
523    @skipIfWindows # No pty support to test any inferior output
524    def test_m_packet_reads_memory(self):
525        self.build()
526        self.set_inferior_startup_launch()
527        # This is the memory we will write into the inferior and then ensure we
528        # can read back with $m.
529        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
530
531        # Start up the inferior.
532        procs = self.prep_debug_monitor_and_inferior(
533            inferior_args=[
534                "set-message:%s" %
535                MEMORY_CONTENTS,
536                "get-data-address-hex:g_message",
537                "sleep:5"])
538
539        # Run the process
540        self.test_sequence.add_log_lines(
541            [
542                # Start running after initial stop.
543                "read packet: $c#63",
544                # Match output line that prints the memory address of the message buffer within the inferior.
545                # Note we require launch-only testing so we can get inferior otuput.
546                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
547                 "capture": {1: "message_address"}},
548                # Now stop the inferior.
549                "read packet: {}".format(chr(3)),
550                # And wait for the stop notification.
551                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
552            True)
553
554        # Run the packet stream.
555        context = self.expect_gdbremote_sequence()
556        self.assertIsNotNone(context)
557
558        # Grab the message address.
559        self.assertIsNotNone(context.get("message_address"))
560        message_address = int(context.get("message_address"), 16)
561
562        # Grab contents from the inferior.
563        self.reset_test_sequence()
564        self.test_sequence.add_log_lines(
565            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
566             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
567            True)
568
569        # Run the packet stream.
570        context = self.expect_gdbremote_sequence()
571        self.assertIsNotNone(context)
572
573        # Ensure what we read from inferior memory is what we wrote.
574        self.assertIsNotNone(context.get("read_contents"))
575        read_contents = seven.unhexlify(context.get("read_contents"))
576        self.assertEqual(read_contents, MEMORY_CONTENTS)
577
578    def test_qMemoryRegionInfo_is_supported(self):
579        self.build()
580        self.set_inferior_startup_launch()
581        # Start up the inferior.
582        procs = self.prep_debug_monitor_and_inferior()
583
584        # Ask if it supports $qMemoryRegionInfo.
585        self.test_sequence.add_log_lines(
586            ["read packet: $qMemoryRegionInfo#00",
587             "send packet: $OK#00"
588             ], True)
589        self.expect_gdbremote_sequence()
590
591    @skipIfWindows # No pty support to test any inferior output
592    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
593        self.build()
594        self.set_inferior_startup_launch()
595
596        # Start up the inferior.
597        procs = self.prep_debug_monitor_and_inferior(
598            inferior_args=["get-code-address-hex:hello", "sleep:5"])
599
600        # Run the process
601        self.test_sequence.add_log_lines(
602            [
603                # Start running after initial stop.
604                "read packet: $c#63",
605                # Match output line that prints the memory address of the message buffer within the inferior.
606                # Note we require launch-only testing so we can get inferior otuput.
607                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
608                 "capture": {1: "code_address"}},
609                # Now stop the inferior.
610                "read packet: {}".format(chr(3)),
611                # And wait for the stop notification.
612                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
613            True)
614
615        # Run the packet stream.
616        context = self.expect_gdbremote_sequence()
617        self.assertIsNotNone(context)
618
619        # Grab the code address.
620        self.assertIsNotNone(context.get("code_address"))
621        code_address = int(context.get("code_address"), 16)
622
623        # Grab memory region info from the inferior.
624        self.reset_test_sequence()
625        self.add_query_memory_region_packets(code_address)
626
627        # Run the packet stream.
628        context = self.expect_gdbremote_sequence()
629        self.assertIsNotNone(context)
630        mem_region_dict = self.parse_memory_region_packet(context)
631
632        # Ensure there are no errors reported.
633        self.assertNotIn("error", mem_region_dict)
634
635        # Ensure code address is readable and executable.
636        self.assertIn("permissions", mem_region_dict)
637        self.assertIn("r", mem_region_dict["permissions"])
638        self.assertIn("x", mem_region_dict["permissions"])
639
640        # Ensure the start address and size encompass the address we queried.
641        self.assert_address_within_memory_region(code_address, mem_region_dict)
642
643    @skipIfWindows # No pty support to test any inferior output
644    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
645        self.build()
646        self.set_inferior_startup_launch()
647
648        # Start up the inferior.
649        procs = self.prep_debug_monitor_and_inferior(
650            inferior_args=["get-stack-address-hex:", "sleep:5"])
651
652        # Run the process
653        self.test_sequence.add_log_lines(
654            [
655                # Start running after initial stop.
656                "read packet: $c#63",
657                # Match output line that prints the memory address of the message buffer within the inferior.
658                # Note we require launch-only testing so we can get inferior otuput.
659                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
660                 "capture": {1: "stack_address"}},
661                # Now stop the inferior.
662                "read packet: {}".format(chr(3)),
663                # And wait for the stop notification.
664                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
665            True)
666
667        # Run the packet stream.
668        context = self.expect_gdbremote_sequence()
669        self.assertIsNotNone(context)
670
671        # Grab the address.
672        self.assertIsNotNone(context.get("stack_address"))
673        stack_address = int(context.get("stack_address"), 16)
674
675        # Grab memory region info from the inferior.
676        self.reset_test_sequence()
677        self.add_query_memory_region_packets(stack_address)
678
679        # Run the packet stream.
680        context = self.expect_gdbremote_sequence()
681        self.assertIsNotNone(context)
682        mem_region_dict = self.parse_memory_region_packet(context)
683
684        # Ensure there are no errors reported.
685        self.assertNotIn("error", mem_region_dict)
686
687        # Ensure address is readable and executable.
688        self.assertIn("permissions", mem_region_dict)
689        self.assertIn("r", mem_region_dict["permissions"])
690        self.assertIn("w", mem_region_dict["permissions"])
691
692        # Ensure the start address and size encompass the address we queried.
693        self.assert_address_within_memory_region(
694            stack_address, mem_region_dict)
695
696    @skipIfWindows # No pty support to test any inferior output
697    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
698        self.build()
699        self.set_inferior_startup_launch()
700
701        # Start up the inferior.
702        procs = self.prep_debug_monitor_and_inferior(
703            inferior_args=["get-heap-address-hex:", "sleep:5"])
704
705        # Run the process
706        self.test_sequence.add_log_lines(
707            [
708                # Start running after initial stop.
709                "read packet: $c#63",
710                # Match output line that prints the memory address of the message buffer within the inferior.
711                # Note we require launch-only testing so we can get inferior otuput.
712                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
713                 "capture": {1: "heap_address"}},
714                # Now stop the inferior.
715                "read packet: {}".format(chr(3)),
716                # And wait for the stop notification.
717                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
718            True)
719
720        # Run the packet stream.
721        context = self.expect_gdbremote_sequence()
722        self.assertIsNotNone(context)
723
724        # Grab the address.
725        self.assertIsNotNone(context.get("heap_address"))
726        heap_address = int(context.get("heap_address"), 16)
727
728        # Grab memory region info from the inferior.
729        self.reset_test_sequence()
730        self.add_query_memory_region_packets(heap_address)
731
732        # Run the packet stream.
733        context = self.expect_gdbremote_sequence()
734        self.assertIsNotNone(context)
735        mem_region_dict = self.parse_memory_region_packet(context)
736
737        # Ensure there are no errors reported.
738        self.assertNotIn("error", mem_region_dict)
739
740        # Ensure address is readable and executable.
741        self.assertIn("permissions", mem_region_dict)
742        self.assertIn("r", mem_region_dict["permissions"])
743        self.assertIn("w", mem_region_dict["permissions"])
744
745        # Ensure the start address and size encompass the address we queried.
746        self.assert_address_within_memory_region(heap_address, mem_region_dict)
747
748    def breakpoint_set_and_remove_work(self, want_hardware):
749        # Start up the inferior.
750        procs = self.prep_debug_monitor_and_inferior(
751            inferior_args=[
752                "get-code-address-hex:hello",
753                "sleep:1",
754                "call-function:hello"])
755
756        # Run the process
757        self.add_register_info_collection_packets()
758        self.add_process_info_collection_packets()
759        self.test_sequence.add_log_lines(
760            [  # Start running after initial stop.
761                "read packet: $c#63",
762                # Match output line that prints the memory address of the function call entry point.
763                # Note we require launch-only testing so we can get inferior otuput.
764                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
765                 "capture": {1: "function_address"}},
766                # Now stop the inferior.
767                "read packet: {}".format(chr(3)),
768                # And wait for the stop notification.
769                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
770            True)
771
772        # Run the packet stream.
773        context = self.expect_gdbremote_sequence()
774        self.assertIsNotNone(context)
775
776        # Gather process info - we need endian of target to handle register
777        # value conversions.
778        process_info = self.parse_process_info_response(context)
779        endian = process_info.get("endian")
780        self.assertIsNotNone(endian)
781
782        # Gather register info entries.
783        reg_infos = self.parse_register_info_packets(context)
784        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
785        self.assertIsNotNone(pc_lldb_reg_index)
786        self.assertIsNotNone(pc_reg_info)
787
788        # Grab the function address.
789        self.assertIsNotNone(context.get("function_address"))
790        function_address = int(context.get("function_address"), 16)
791
792        # Get current target architecture
793        target_arch = self.getArchitecture()
794
795        # Set the breakpoint.
796        if (target_arch == "arm") or (target_arch == "aarch64"):
797            # TODO: Handle case when setting breakpoint in thumb code
798            BREAKPOINT_KIND = 4
799        else:
800            BREAKPOINT_KIND = 1
801
802        # Set default packet type to Z0 (software breakpoint)
803        z_packet_type = 0
804
805        # If hardware breakpoint is requested set packet type to Z1
806        if want_hardware == True:
807            z_packet_type = 1
808
809        self.reset_test_sequence()
810        self.add_set_breakpoint_packets(
811            function_address,
812            z_packet_type,
813            do_continue=True,
814            breakpoint_kind=BREAKPOINT_KIND)
815
816        # Run the packet stream.
817        context = self.expect_gdbremote_sequence()
818        self.assertIsNotNone(context)
819
820        # Verify the stop signal reported was the breakpoint signal number.
821        stop_signo = context.get("stop_signo")
822        self.assertIsNotNone(stop_signo)
823        self.assertEqual(int(stop_signo, 16),
824                         lldbutil.get_signal_number('SIGTRAP'))
825
826        # Ensure we did not receive any output.  If the breakpoint was not set, we would
827        # see output (from a launched process with captured stdio) printing a hello, world message.
828        # That would indicate the breakpoint didn't take.
829        self.assertEqual(len(context["O_content"]), 0)
830
831        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
832        # This acts as a another validation on the register reading code.
833        self.reset_test_sequence()
834        self.test_sequence.add_log_lines(
835            [
836                # Print the PC.  This should match the breakpoint address.
837                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
838                # Capture $p results.
839                {"direction": "send",
840                 "regex": r"^\$([0-9a-fA-F]+)#",
841                 "capture": {1: "p_response"}},
842            ], True)
843
844        context = self.expect_gdbremote_sequence()
845        self.assertIsNotNone(context)
846
847        # Verify the PC is where we expect.  Note response is in endianness of
848        # the inferior.
849        p_response = context.get("p_response")
850        self.assertIsNotNone(p_response)
851
852        # Convert from target endian to int.
853        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
854            endian, p_response)
855        self.assertEqual(returned_pc, function_address)
856
857        # Verify that a breakpoint remove and continue gets us the expected
858        # output.
859        self.reset_test_sequence()
860
861        # Add breakpoint remove packets
862        self.add_remove_breakpoint_packets(
863            function_address,
864            z_packet_type,
865            breakpoint_kind=BREAKPOINT_KIND)
866
867        self.test_sequence.add_log_lines(
868            [
869                # Continue running.
870                "read packet: $c#63",
871                # We should now receive the output from the call.
872                {"type": "output_match", "regex": r"^hello, world\r\n$"},
873                # And wait for program completion.
874                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
875            ], True)
876
877        context = self.expect_gdbremote_sequence()
878        self.assertIsNotNone(context)
879
880    @skipIfWindows # No pty support to test any inferior output
881    def test_software_breakpoint_set_and_remove_work(self):
882        if self.getArchitecture() == "arm":
883            # TODO: Handle case when setting breakpoint in thumb code
884            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
885        else:
886            self.build()
887        self.set_inferior_startup_launch()
888        self.breakpoint_set_and_remove_work(want_hardware=False)
889
890    @skipUnlessPlatform(oslist=['linux'])
891    @skipIf(archs=no_match(['arm', 'aarch64']))
892    def test_hardware_breakpoint_set_and_remove_work(self):
893        if self.getArchitecture() == "arm":
894            # TODO: Handle case when setting breakpoint in thumb code
895            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
896        else:
897            self.build()
898        self.set_inferior_startup_launch()
899        self.breakpoint_set_and_remove_work(want_hardware=True)
900
901    def test_qSupported_returns_known_stub_features(self):
902        self.build()
903        self.set_inferior_startup_launch()
904
905        # Start up the stub and start/prep the inferior.
906        procs = self.prep_debug_monitor_and_inferior()
907        self.add_qSupported_packets()
908
909        # Run the packet stream.
910        context = self.expect_gdbremote_sequence()
911        self.assertIsNotNone(context)
912
913        # Retrieve the qSupported features.
914        supported_dict = self.parse_qSupported_response(context)
915        self.assertIsNotNone(supported_dict)
916        self.assertTrue(len(supported_dict) > 0)
917
918    @skipIfWindows # No pty support to test any inferior output
919    def test_written_M_content_reads_back_correctly(self):
920        self.build()
921        self.set_inferior_startup_launch()
922
923        TEST_MESSAGE = "Hello, memory"
924
925        # Start up the stub and start/prep the inferior.
926        procs = self.prep_debug_monitor_and_inferior(
927            inferior_args=[
928                "set-message:xxxxxxxxxxxxxX",
929                "get-data-address-hex:g_message",
930                "sleep:1",
931                "print-message:"])
932        self.test_sequence.add_log_lines(
933            [
934                # Start running after initial stop.
935                "read packet: $c#63",
936                # Match output line that prints the memory address of the message buffer within the inferior.
937                # Note we require launch-only testing so we can get inferior otuput.
938                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
939                 "capture": {1: "message_address"}},
940                # Now stop the inferior.
941                "read packet: {}".format(chr(3)),
942                # And wait for the stop notification.
943                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
944            True)
945        context = self.expect_gdbremote_sequence()
946        self.assertIsNotNone(context)
947
948        # Grab the message address.
949        self.assertIsNotNone(context.get("message_address"))
950        message_address = int(context.get("message_address"), 16)
951
952        # Hex-encode the test message, adding null termination.
953        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
954
955        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
956        # and binary (x) memory read packets.
957        self.reset_test_sequence()
958        self.test_sequence.add_log_lines(
959            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
960             "send packet: $OK#00",
961             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
962             "send packet: ${0}#00".format(hex_encoded_message),
963             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
964             "send packet: ${0}#00".format(TEST_MESSAGE),
965             "read packet: $m{0:x},4#00".format(message_address),
966             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
967             "read packet: $x{0:x},4#00".format(message_address),
968             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
969             "read packet: $c#63",
970             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
971             "send packet: $W00#00",
972             ], True)
973        context = self.expect_gdbremote_sequence()
974        self.assertIsNotNone(context)
975
976        # Ensure what we read from inferior memory is what we wrote.
977        printed_message = context.get("printed_message")
978        self.assertIsNotNone(printed_message)
979        self.assertEqual(printed_message, TEST_MESSAGE + "X")
980
981    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
982    # Come back to this.  I have the test rigged to verify that at least some
983    # of the bit-flip writes work.
984    def test_P_writes_all_gpr_registers(self):
985        self.build()
986        self.set_inferior_startup_launch()
987
988        # Start inferior debug session, grab all register info.
989        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
990        self.add_register_info_collection_packets()
991        self.add_process_info_collection_packets()
992
993        context = self.expect_gdbremote_sequence()
994        self.assertIsNotNone(context)
995
996        # Process register infos.
997        reg_infos = self.parse_register_info_packets(context)
998        self.assertIsNotNone(reg_infos)
999        self.add_lldb_register_index(reg_infos)
1000
1001        # Process endian.
1002        process_info = self.parse_process_info_response(context)
1003        endian = process_info.get("endian")
1004        self.assertIsNotNone(endian)
1005
1006        # Pull out the register infos that we think we can bit flip
1007        # successfully,.
1008        gpr_reg_infos = [
1009            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1010        self.assertTrue(len(gpr_reg_infos) > 0)
1011
1012        # Write flipped bit pattern of existing value to each register.
1013        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1014            gpr_reg_infos, endian)
1015        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1016        self.assertTrue(successful_writes > 0)
1017
1018    # Note: as of this moment, a hefty number of the GPR writes are failing
1019    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1020    @skipIfWindows
1021    def test_P_and_p_thread_suffix_work(self):
1022        self.build()
1023        self.set_inferior_startup_launch()
1024
1025        # Startup the inferior with three threads.
1026        procs = self.prep_debug_monitor_and_inferior(
1027            inferior_args=["thread:new", "thread:new"])
1028        self.add_thread_suffix_request_packets()
1029        self.add_register_info_collection_packets()
1030        self.add_process_info_collection_packets()
1031
1032        context = self.expect_gdbremote_sequence()
1033        self.assertIsNotNone(context)
1034
1035        process_info = self.parse_process_info_response(context)
1036        self.assertIsNotNone(process_info)
1037        endian = process_info.get("endian")
1038        self.assertIsNotNone(endian)
1039
1040        reg_infos = self.parse_register_info_packets(context)
1041        self.assertIsNotNone(reg_infos)
1042        self.add_lldb_register_index(reg_infos)
1043
1044        reg_index = self.select_modifiable_register(reg_infos)
1045        self.assertIsNotNone(reg_index)
1046        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1047        self.assertTrue(reg_byte_size > 0)
1048
1049        # Run the process a bit so threads can start up, and collect register
1050        # info.
1051        context = self.run_process_then_stop(run_seconds=1)
1052        self.assertIsNotNone(context)
1053
1054        # Wait for 3 threads to be present.
1055        threads = self.wait_for_thread_count(3)
1056        self.assertEqual(len(threads), 3)
1057
1058        expected_reg_values = []
1059        register_increment = 1
1060        next_value = None
1061
1062        # Set the same register in each of 3 threads to a different value.
1063        # Verify each one has the unique value.
1064        for thread in threads:
1065            # If we don't have a next value yet, start it with the initial read
1066            # value + 1
1067            if not next_value:
1068                # Read pre-existing register value.
1069                self.reset_test_sequence()
1070                self.test_sequence.add_log_lines(
1071                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1072                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1073                     ], True)
1074                context = self.expect_gdbremote_sequence()
1075                self.assertIsNotNone(context)
1076
1077                # Set the next value to use for writing as the increment plus
1078                # current value.
1079                p_response = context.get("p_response")
1080                self.assertIsNotNone(p_response)
1081                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1082                    endian, p_response)
1083
1084            # Set new value using P and thread suffix.
1085            self.reset_test_sequence()
1086            self.test_sequence.add_log_lines(
1087                [
1088                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1089                        reg_index,
1090                        lldbgdbserverutils.pack_register_hex(
1091                            endian,
1092                            next_value,
1093                            byte_size=reg_byte_size),
1094                        thread),
1095                    "send packet: $OK#00",
1096                ],
1097                True)
1098            context = self.expect_gdbremote_sequence()
1099            self.assertIsNotNone(context)
1100
1101            # Save the value we set.
1102            expected_reg_values.append(next_value)
1103
1104            # Increment value for next thread to use (we want them all
1105            # different so we can verify they wrote to each thread correctly
1106            # next.)
1107            next_value += register_increment
1108
1109        # Revisit each thread and verify they have the expected value set for
1110        # the register we wrote.
1111        thread_index = 0
1112        for thread in threads:
1113            # Read pre-existing register value.
1114            self.reset_test_sequence()
1115            self.test_sequence.add_log_lines(
1116                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1117                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1118                 ], True)
1119            context = self.expect_gdbremote_sequence()
1120            self.assertIsNotNone(context)
1121
1122            # Get the register value.
1123            p_response = context.get("p_response")
1124            self.assertIsNotNone(p_response)
1125            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1126                endian, p_response)
1127
1128            # Make sure we read back what we wrote.
1129            self.assertEqual(read_value, expected_reg_values[thread_index])
1130            thread_index += 1
1131