1"""
2Test case for testing the gdbremote protocol.
3
4Tests run against debugserver and lldb-server (llgs).
5lldb-server tests run where the lldb-server exe is
6available.
7
8This class will be broken into smaller test case classes by
9gdb remote packet functional areas.  For now it contains
10the initial set of tests implemented.
11"""
12
13import unittest2
14import gdbremote_testcase
15import lldbgdbserverutils
16from lldbsuite.support import seven
17from lldbsuite.test.decorators import *
18from lldbsuite.test.lldbtest import *
19from lldbsuite.test.lldbdwarf import *
20from lldbsuite.test import lldbutil, lldbplatformutil
21
22
23class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase, DwarfOpcodeParser):
24
25    mydir = TestBase.compute_mydir(__file__)
26
27    def test_thread_suffix_supported(self):
28        server = self.connect_to_debug_monitor()
29        self.assertIsNotNone(server)
30
31        self.do_handshake()
32        self.test_sequence.add_log_lines(
33            ["lldb-server <  26> read packet: $QThreadSuffixSupported#e4",
34             "lldb-server <   6> send packet: $OK#9a"],
35            True)
36
37        self.expect_gdbremote_sequence()
38
39
40    def test_list_threads_in_stop_reply_supported(self):
41        server = self.connect_to_debug_monitor()
42        self.assertIsNotNone(server)
43
44        self.do_handshake()
45        self.test_sequence.add_log_lines(
46            ["lldb-server <  27> read packet: $QListThreadsInStopReply#21",
47             "lldb-server <   6> send packet: $OK#9a"],
48            True)
49        self.expect_gdbremote_sequence()
50
51    def test_c_packet_works(self):
52        self.build()
53        procs = self.prep_debug_monitor_and_inferior()
54        self.test_sequence.add_log_lines(
55            ["read packet: $c#63",
56             "send packet: $W00#00"],
57            True)
58
59        self.expect_gdbremote_sequence()
60
61    @skipIfWindows # No pty support to test any inferior output
62    def test_inferior_print_exit(self):
63        self.build()
64        procs = self.prep_debug_monitor_and_inferior(
65                inferior_args=["hello, world"])
66        self.test_sequence.add_log_lines(
67            ["read packet: $vCont;c#a8",
68             {"type": "output_match", "regex": self.maybe_strict_output_regex(r"hello, world\r\n")},
69             "send packet: $W00#00"],
70            True)
71
72        context = self.expect_gdbremote_sequence()
73        self.assertIsNotNone(context)
74
75    def test_first_launch_stop_reply_thread_matches_first_qC(self):
76        self.build()
77        procs = self.prep_debug_monitor_and_inferior()
78        self.test_sequence.add_log_lines(["read packet: $qC#00",
79                                          {"direction": "send",
80                                           "regex": r"^\$QC([0-9a-fA-F]+)#",
81                                           "capture": {1: "thread_id_QC"}},
82                                          "read packet: $?#00",
83                                          {"direction": "send",
84                                              "regex": r"^\$T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+)",
85                                              "capture": {1: "thread_id_?"}}],
86                                         True)
87        context = self.expect_gdbremote_sequence()
88        self.assertEqual(context.get("thread_id_QC"), context.get("thread_id_?"))
89
90    def test_attach_commandline_continue_app_exits(self):
91        self.build()
92        self.set_inferior_startup_attach()
93        procs = self.prep_debug_monitor_and_inferior()
94        self.test_sequence.add_log_lines(
95            ["read packet: $vCont;c#a8",
96             "send packet: $W00#00"],
97            True)
98        self.expect_gdbremote_sequence()
99
100        # Wait a moment for completed and now-detached inferior process to
101        # clear.
102        time.sleep(1)
103
104        if not lldb.remote_platform:
105            # Process should be dead now. Reap results.
106            poll_result = procs["inferior"].poll()
107            self.assertIsNotNone(poll_result)
108
109        # Where possible, verify at the system level that the process is not
110        # running.
111        self.assertFalse(
112            lldbgdbserverutils.process_is_running(
113                procs["inferior"].pid, False))
114
115    def test_qRegisterInfo_returns_one_valid_result(self):
116        self.build()
117        self.prep_debug_monitor_and_inferior()
118        self.test_sequence.add_log_lines(
119            ["read packet: $qRegisterInfo0#00",
120             {"direction": "send", "regex": r"^\$(.+);#[0-9A-Fa-f]{2}", "capture": {1: "reginfo_0"}}],
121            True)
122
123        # Run the stream
124        context = self.expect_gdbremote_sequence()
125        self.assertIsNotNone(context)
126
127        reg_info_packet = context.get("reginfo_0")
128        self.assertIsNotNone(reg_info_packet)
129        self.assert_valid_reg_info(
130            lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
131
132    def test_qRegisterInfo_returns_all_valid_results(self):
133        self.build()
134        self.prep_debug_monitor_and_inferior()
135        self.add_register_info_collection_packets()
136
137        # Run the stream.
138        context = self.expect_gdbremote_sequence()
139        self.assertIsNotNone(context)
140
141        # Validate that each register info returned validates.
142        for reg_info in self.parse_register_info_packets(context):
143            self.assert_valid_reg_info(reg_info)
144
145    def test_qRegisterInfo_contains_required_generics_debugserver(self):
146        self.build()
147        self.prep_debug_monitor_and_inferior()
148        self.add_register_info_collection_packets()
149
150        # Run the packet stream.
151        context = self.expect_gdbremote_sequence()
152        self.assertIsNotNone(context)
153
154        # Gather register info entries.
155        reg_infos = self.parse_register_info_packets(context)
156
157        # Collect all generic registers found.
158        generic_regs = {
159            reg_info['generic']: 1 for reg_info in reg_infos if 'generic' in reg_info}
160
161        # Ensure we have a program counter register.
162        self.assertIn('pc', generic_regs)
163
164        # Ensure we have a frame pointer register. PPC64le's FP is the same as SP
165        if self.getArchitecture() != 'powerpc64le':
166            self.assertIn('fp', generic_regs)
167
168        # Ensure we have a stack pointer register.
169        self.assertIn('sp', generic_regs)
170
171        # Ensure we have a flags register.
172        self.assertIn('flags', generic_regs)
173
174    def test_qRegisterInfo_contains_at_least_one_register_set(self):
175        self.build()
176        self.prep_debug_monitor_and_inferior()
177        self.add_register_info_collection_packets()
178
179        # Run the packet stream.
180        context = self.expect_gdbremote_sequence()
181        self.assertIsNotNone(context)
182
183        # Gather register info entries.
184        reg_infos = self.parse_register_info_packets(context)
185
186        # Collect all register sets found.
187        register_sets = {
188            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
189        self.assertTrue(len(register_sets) >= 1)
190
191    def targetHasAVX(self):
192        triple = self.dbg.GetSelectedPlatform().GetTriple()
193
194        # TODO other platforms, please implement this function
195        if not re.match(".*-.*-linux", triple):
196            return True
197
198        # Need to do something different for non-Linux/Android targets
199        if lldb.remote_platform:
200            self.runCmd('platform get-file "/proc/cpuinfo" "cpuinfo"')
201            cpuinfo_path = "cpuinfo"
202            self.addTearDownHook(lambda: os.unlink("cpuinfo"))
203        else:
204            cpuinfo_path = "/proc/cpuinfo"
205
206        f = open(cpuinfo_path, 'r')
207        cpuinfo = f.read()
208        f.close()
209        return " avx " in cpuinfo
210
211    @expectedFailureAll(oslist=["windows"]) # no avx for now.
212    @skipIf(archs=no_match(['amd64', 'i386', 'x86_64']))
213    @add_test_categories(["llgs"])
214    def test_qRegisterInfo_contains_avx_registers(self):
215        self.build()
216        self.prep_debug_monitor_and_inferior()
217        self.add_register_info_collection_packets()
218
219        # Run the packet stream.
220        context = self.expect_gdbremote_sequence()
221        self.assertIsNotNone(context)
222
223        # Gather register info entries.
224        reg_infos = self.parse_register_info_packets(context)
225
226        # Collect all generics found.
227        register_sets = {
228            reg_info['set']: 1 for reg_info in reg_infos if 'set' in reg_info}
229        self.assertEqual(
230            self.targetHasAVX(),
231            "Advanced Vector Extensions" in register_sets)
232
233    def qThreadInfo_contains_thread(self):
234        procs = self.prep_debug_monitor_and_inferior()
235        self.add_threadinfo_collection_packets()
236
237        # Run the packet stream.
238        context = self.expect_gdbremote_sequence()
239        self.assertIsNotNone(context)
240
241        # Gather threadinfo entries.
242        threads = self.parse_threadinfo_packets(context)
243        self.assertIsNotNone(threads)
244
245        # We should have exactly one thread.
246        self.assertEqual(len(threads), 1)
247
248    def test_qThreadInfo_contains_thread_launch(self):
249        self.build()
250        self.set_inferior_startup_launch()
251        self.qThreadInfo_contains_thread()
252
253    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
254    def test_qThreadInfo_contains_thread_attach(self):
255        self.build()
256        self.set_inferior_startup_attach()
257        self.qThreadInfo_contains_thread()
258
259    def qThreadInfo_matches_qC(self):
260        procs = self.prep_debug_monitor_and_inferior()
261
262        self.add_threadinfo_collection_packets()
263        self.test_sequence.add_log_lines(
264            ["read packet: $qC#00",
265             {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}
266             ], True)
267
268        # Run the packet stream.
269        context = self.expect_gdbremote_sequence()
270        self.assertIsNotNone(context)
271
272        # Gather threadinfo entries.
273        threads = self.parse_threadinfo_packets(context)
274        self.assertIsNotNone(threads)
275
276        # We should have exactly one thread from threadinfo.
277        self.assertEqual(len(threads), 1)
278
279        # We should have a valid thread_id from $QC.
280        QC_thread_id_hex = context.get("thread_id")
281        self.assertIsNotNone(QC_thread_id_hex)
282        QC_thread_id = int(QC_thread_id_hex, 16)
283
284        # Those two should be the same.
285        self.assertEqual(threads[0], QC_thread_id)
286
287    def test_qThreadInfo_matches_qC_launch(self):
288        self.build()
289        self.set_inferior_startup_launch()
290        self.qThreadInfo_matches_qC()
291
292    @expectedFailureAll(oslist=["windows"]) # expect one more thread stopped
293    def test_qThreadInfo_matches_qC_attach(self):
294        self.build()
295        self.set_inferior_startup_attach()
296        self.qThreadInfo_matches_qC()
297
298    def test_p_returns_correct_data_size_for_each_qRegisterInfo_launch(self):
299        self.build()
300        self.set_inferior_startup_launch()
301        procs = self.prep_debug_monitor_and_inferior()
302        self.add_register_info_collection_packets()
303
304        # Run the packet stream.
305        context = self.expect_gdbremote_sequence()
306        self.assertIsNotNone(context)
307
308        # Gather register info entries.
309        reg_infos = self.parse_register_info_packets(context)
310        self.assertIsNotNone(reg_infos)
311        self.assertTrue(len(reg_infos) > 0)
312
313        byte_order = self.get_target_byte_order()
314
315        # Read value for each register.
316        reg_index = 0
317        for reg_info in reg_infos:
318            # Skip registers that don't have a register set.  For x86, these are
319            # the DRx registers, which have no LLDB-kind register number and thus
320            # cannot be read via normal
321            # NativeRegisterContext::ReadRegister(reg_info,...) calls.
322            if not "set" in reg_info:
323                continue
324
325            # Clear existing packet expectations.
326            self.reset_test_sequence()
327
328            # Run the register query
329            self.test_sequence.add_log_lines(
330                ["read packet: $p{0:x}#00".format(reg_index),
331                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}],
332                True)
333            context = self.expect_gdbremote_sequence()
334            self.assertIsNotNone(context)
335
336            # Verify the response length.
337            p_response = context.get("p_response")
338            self.assertIsNotNone(p_response)
339
340            # Skip erraneous (unsupported) registers.
341            # TODO: remove this once we make unsupported registers disappear.
342            if p_response.startswith("E") and len(p_response) == 3:
343                continue
344
345            if "dynamic_size_dwarf_expr_bytes" in reg_info:
346                self.updateRegInfoBitsize(reg_info, byte_order)
347            self.assertEqual(len(p_response), 2 * int(reg_info["bitsize"]) / 8,
348                             reg_info)
349
350            # Increment loop
351            reg_index += 1
352
353    def Hg_switches_to_3_threads(self, pass_pid=False):
354        # Startup the inferior with three threads (main + 2 new ones).
355        procs = self.prep_debug_monitor_and_inferior(
356            inferior_args=["thread:new", "thread:new"])
357
358        # Let the inferior process have a few moments to start up the thread
359        # when launched.  (The launch scenario has no time to run, so threads
360        # won't be there yet.)
361        self.run_process_then_stop(run_seconds=1)
362
363        # Wait at most x seconds for 3 threads to be present.
364        threads = self.wait_for_thread_count(3)
365        self.assertEqual(len(threads), 3)
366
367        pid_str = ""
368        if pass_pid:
369            pid_str = "p{0:x}.".format(procs["inferior"].pid)
370
371        # verify we can $H to each thead, and $qC matches the thread we set.
372        for thread in threads:
373            # Change to each thread, verify current thread id.
374            self.reset_test_sequence()
375            self.test_sequence.add_log_lines(
376                ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
377                 "send packet: $OK#00",
378                 "read packet: $qC#00",
379                 {"direction": "send", "regex": r"^\$QC([0-9a-fA-F]+)#", "capture": {1: "thread_id"}}],
380                True)
381
382            context = self.expect_gdbremote_sequence()
383            self.assertIsNotNone(context)
384
385            # Verify the thread id.
386            self.assertIsNotNone(context.get("thread_id"))
387            self.assertEqual(int(context.get("thread_id"), 16), thread)
388
389    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
390    def test_Hg_switches_to_3_threads_launch(self):
391        self.build()
392        self.set_inferior_startup_launch()
393        self.Hg_switches_to_3_threads()
394
395    @expectedFailureAll(oslist=["windows"]) # expecting one more thread
396    def test_Hg_switches_to_3_threads_attach(self):
397        self.build()
398        self.set_inferior_startup_attach()
399        self.Hg_switches_to_3_threads()
400
401    @expectedFailureAll(oslist=["windows"]) # expect 4 threads
402    @add_test_categories(["llgs"])
403    def test_Hg_switches_to_3_threads_attach_pass_correct_pid(self):
404        self.build()
405        self.set_inferior_startup_attach()
406        self.Hg_switches_to_3_threads(pass_pid=True)
407
408    def Hg_fails_on_pid(self, pass_pid):
409        # Start the inferior.
410        procs = self.prep_debug_monitor_and_inferior(
411            inferior_args=["thread:new"])
412
413        self.run_process_then_stop(run_seconds=1)
414
415        threads = self.wait_for_thread_count(2)
416        self.assertEqual(len(threads), 2)
417
418        if pass_pid == -1:
419            pid_str = "p-1."
420        else:
421            pid_str = "p{0:x}.".format(pass_pid)
422        thread = threads[1]
423
424        self.test_sequence.add_log_lines(
425            ["read packet: $Hg{0}{1:x}#00".format(pid_str, thread),  # Set current thread.
426             "send packet: $Eff#00"],
427            True)
428
429        self.expect_gdbremote_sequence()
430
431    @expectedFailureAll(oslist=["windows"])
432    @add_test_categories(["llgs"])
433    def test_Hg_fails_on_another_pid(self):
434        self.build()
435        self.set_inferior_startup_launch()
436        self.Hg_fails_on_pid(1)
437
438    @expectedFailureAll(oslist=["windows"])
439    @add_test_categories(["llgs"])
440    def test_Hg_fails_on_zero_pid(self):
441        self.build()
442        self.set_inferior_startup_launch()
443        self.Hg_fails_on_pid(0)
444
445    @expectedFailureAll(oslist=["windows"])
446    @add_test_categories(["llgs"])
447    def test_Hg_fails_on_minus_one_pid(self):
448        self.build()
449        self.set_inferior_startup_launch()
450        self.Hg_fails_on_pid(-1)
451
452    def Hc_then_Csignal_signals_correct_thread(self, segfault_signo):
453        # NOTE only run this one in inferior-launched mode: we can't grab inferior stdout when running attached,
454        # and the test requires getting stdout from the exe.
455
456        NUM_THREADS = 3
457
458        # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
459        # inferior_args=["thread:print-ids"]
460        inferior_args = ["thread:segfault"]
461        for i in range(NUM_THREADS - 1):
462            # if i > 0:
463                # Give time between thread creation/segfaulting for the handler to work.
464                # inferior_args.append("sleep:1")
465            inferior_args.append("thread:new")
466        inferior_args.append("sleep:10")
467
468        # Launch/attach.  (In our case, this should only ever be launched since
469        # we need inferior stdout/stderr).
470        procs = self.prep_debug_monitor_and_inferior(
471            inferior_args=inferior_args)
472        self.test_sequence.add_log_lines(["read packet: $c#63"], True)
473        context = self.expect_gdbremote_sequence()
474
475        # Let the inferior process have a few moments to start up the thread when launched.
476        # context = self.run_process_then_stop(run_seconds=1)
477
478        # Wait at most x seconds for all threads to be present.
479        # threads = self.wait_for_thread_count(NUM_THREADS)
480        # self.assertEquals(len(threads), NUM_THREADS)
481
482        signaled_tids = {}
483        print_thread_ids = {}
484
485        # Switch to each thread, deliver a signal, and verify signal delivery
486        for i in range(NUM_THREADS - 1):
487            # Run until SIGSEGV comes in.
488            self.reset_test_sequence()
489            self.test_sequence.add_log_lines([{"direction": "send",
490                                               "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
491                                               "capture": {1: "signo",
492                                                            2: "thread_id"}}],
493                                             True)
494
495            context = self.expect_gdbremote_sequence()
496            self.assertIsNotNone(context)
497            signo = context.get("signo")
498            self.assertEqual(int(signo, 16), segfault_signo)
499
500            # Ensure we haven't seen this tid yet.
501            thread_id = int(context.get("thread_id"), 16)
502            self.assertNotIn(thread_id, signaled_tids)
503            signaled_tids[thread_id] = 1
504
505            # Send SIGUSR1 to the thread that signaled the SIGSEGV.
506            self.reset_test_sequence()
507            self.test_sequence.add_log_lines(
508                [
509                    # Set the continue thread.
510                    # Set current thread.
511                    "read packet: $Hc{0:x}#00".format(thread_id),
512                    "send packet: $OK#00",
513
514                    # Continue sending the signal number to the continue thread.
515                    # The commented out packet is a way to do this same operation without using
516                    # a $Hc (but this test is testing $Hc, so we'll stick with the former).
517                    "read packet: $C{0:x}#00".format(lldbutil.get_signal_number('SIGUSR1')),
518                    # "read packet: $vCont;C{0:x}:{1:x};c#00".format(lldbutil.get_signal_number('SIGUSR1'), thread_id),
519
520                    # FIXME: Linux does not report the thread stop on the delivered signal (SIGUSR1 here).  MacOSX debugserver does.
521                    # But MacOSX debugserver isn't guaranteeing the thread the signal handler runs on, so currently its an XFAIL.
522                    # Need to rectify behavior here.  The linux behavior is more intuitive to me since we're essentially swapping out
523                    # an about-to-be-delivered signal (for which we already sent a stop packet) to a different signal.
524                    # {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
525                    #  "read packet: $c#63",
526                    {"type": "output_match", "regex": r"^received SIGUSR1 on thread id: ([0-9a-fA-F]+)\r\nthread ([0-9a-fA-F]+): past SIGSEGV\r\n", "capture": {1: "print_thread_id", 2: "post_handle_thread_id"}},
527                ],
528                True)
529
530            # Run the sequence.
531            context = self.expect_gdbremote_sequence()
532            self.assertIsNotNone(context)
533
534            # Ensure the stop signal is the signal we delivered.
535            # stop_signo = context.get("stop_signo")
536            # self.assertIsNotNone(stop_signo)
537            # self.assertEquals(int(stop_signo,16), lldbutil.get_signal_number('SIGUSR1'))
538
539            # Ensure the stop thread is the thread to which we delivered the signal.
540            # stop_thread_id = context.get("stop_thread_id")
541            # self.assertIsNotNone(stop_thread_id)
542            # self.assertEquals(int(stop_thread_id,16), thread_id)
543
544            # Ensure we haven't seen this thread id yet.  The inferior's
545            # self-obtained thread ids are not guaranteed to match the stub
546            # tids (at least on MacOSX).
547            print_thread_id = context.get("print_thread_id")
548            self.assertIsNotNone(print_thread_id)
549            print_thread_id = int(print_thread_id, 16)
550            self.assertNotIn(print_thread_id, print_thread_ids)
551
552            # Now remember this print (i.e. inferior-reflected) thread id and
553            # ensure we don't hit it again.
554            print_thread_ids[print_thread_id] = 1
555
556            # Ensure post signal-handle thread id matches the thread that
557            # initially raised the SIGSEGV.
558            post_handle_thread_id = context.get("post_handle_thread_id")
559            self.assertIsNotNone(post_handle_thread_id)
560            post_handle_thread_id = int(post_handle_thread_id, 16)
561            self.assertEqual(post_handle_thread_id, print_thread_id)
562
563    @expectedFailureDarwin
564    @skipIfWindows # no SIGSEGV support
565    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48419")
566    @expectedFailureNetBSD
567    def test_Hc_then_Csignal_signals_correct_thread_launch(self):
568        self.build()
569        self.set_inferior_startup_launch()
570
571        if self.platformIsDarwin():
572            # Darwin debugserver translates some signals like SIGSEGV into some gdb
573            # expectations about fixed signal numbers.
574            self.Hc_then_Csignal_signals_correct_thread(self.TARGET_EXC_BAD_ACCESS)
575        else:
576            self.Hc_then_Csignal_signals_correct_thread(
577                lldbutil.get_signal_number('SIGSEGV'))
578
579    @skipIfWindows # No pty support to test any inferior output
580    def test_m_packet_reads_memory(self):
581        self.build()
582        self.set_inferior_startup_launch()
583        # This is the memory we will write into the inferior and then ensure we
584        # can read back with $m.
585        MEMORY_CONTENTS = "Test contents 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz"
586
587        # Start up the inferior.
588        procs = self.prep_debug_monitor_and_inferior(
589            inferior_args=[
590                "set-message:%s" %
591                MEMORY_CONTENTS,
592                "get-data-address-hex:g_message",
593                "sleep:5"])
594
595        # Run the process
596        self.test_sequence.add_log_lines(
597            [
598                # Start running after initial stop.
599                "read packet: $c#63",
600                # Match output line that prints the memory address of the message buffer within the inferior.
601                # Note we require launch-only testing so we can get inferior otuput.
602                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
603                 "capture": {1: "message_address"}},
604                # Now stop the inferior.
605                "read packet: {}".format(chr(3)),
606                # And wait for the stop notification.
607                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
608            True)
609
610        # Run the packet stream.
611        context = self.expect_gdbremote_sequence()
612        self.assertIsNotNone(context)
613
614        # Grab the message address.
615        self.assertIsNotNone(context.get("message_address"))
616        message_address = int(context.get("message_address"), 16)
617
618        # Grab contents from the inferior.
619        self.reset_test_sequence()
620        self.test_sequence.add_log_lines(
621            ["read packet: $m{0:x},{1:x}#00".format(message_address, len(MEMORY_CONTENTS)),
622             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "read_contents"}}],
623            True)
624
625        # Run the packet stream.
626        context = self.expect_gdbremote_sequence()
627        self.assertIsNotNone(context)
628
629        # Ensure what we read from inferior memory is what we wrote.
630        self.assertIsNotNone(context.get("read_contents"))
631        read_contents = seven.unhexlify(context.get("read_contents"))
632        self.assertEqual(read_contents, MEMORY_CONTENTS)
633
634    def test_qMemoryRegionInfo_is_supported(self):
635        self.build()
636        self.set_inferior_startup_launch()
637        # Start up the inferior.
638        procs = self.prep_debug_monitor_and_inferior()
639
640        # Ask if it supports $qMemoryRegionInfo.
641        self.test_sequence.add_log_lines(
642            ["read packet: $qMemoryRegionInfo#00",
643             "send packet: $OK#00"
644             ], True)
645        self.expect_gdbremote_sequence()
646
647    @skipIfWindows # No pty support to test any inferior output
648    def test_qMemoryRegionInfo_reports_code_address_as_executable(self):
649        self.build()
650        self.set_inferior_startup_launch()
651
652        # Start up the inferior.
653        procs = self.prep_debug_monitor_and_inferior(
654            inferior_args=["get-code-address-hex:hello", "sleep:5"])
655
656        # Run the process
657        self.test_sequence.add_log_lines(
658            [
659                # Start running after initial stop.
660                "read packet: $c#63",
661                # Match output line that prints the memory address of the message buffer within the inferior.
662                # Note we require launch-only testing so we can get inferior otuput.
663                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
664                 "capture": {1: "code_address"}},
665                # Now stop the inferior.
666                "read packet: {}".format(chr(3)),
667                # And wait for the stop notification.
668                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
669            True)
670
671        # Run the packet stream.
672        context = self.expect_gdbremote_sequence()
673        self.assertIsNotNone(context)
674
675        # Grab the code address.
676        self.assertIsNotNone(context.get("code_address"))
677        code_address = int(context.get("code_address"), 16)
678
679        # Grab memory region info from the inferior.
680        self.reset_test_sequence()
681        self.add_query_memory_region_packets(code_address)
682
683        # Run the packet stream.
684        context = self.expect_gdbremote_sequence()
685        self.assertIsNotNone(context)
686        mem_region_dict = self.parse_memory_region_packet(context)
687
688        # Ensure there are no errors reported.
689        self.assertNotIn("error", mem_region_dict)
690
691        # Ensure code address is readable and executable.
692        self.assertIn("permissions", mem_region_dict)
693        self.assertIn("r", mem_region_dict["permissions"])
694        self.assertIn("x", mem_region_dict["permissions"])
695
696        # Ensure the start address and size encompass the address we queried.
697        self.assert_address_within_memory_region(code_address, mem_region_dict)
698
699    @skipIfWindows # No pty support to test any inferior output
700    def test_qMemoryRegionInfo_reports_stack_address_as_rw(self):
701        self.build()
702        self.set_inferior_startup_launch()
703
704        # Start up the inferior.
705        procs = self.prep_debug_monitor_and_inferior(
706            inferior_args=["get-stack-address-hex:", "sleep:5"])
707
708        # Run the process
709        self.test_sequence.add_log_lines(
710            [
711                # Start running after initial stop.
712                "read packet: $c#63",
713                # Match output line that prints the memory address of the message buffer within the inferior.
714                # Note we require launch-only testing so we can get inferior otuput.
715                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"stack address: 0x([0-9a-fA-F]+)\r\n"),
716                 "capture": {1: "stack_address"}},
717                # Now stop the inferior.
718                "read packet: {}".format(chr(3)),
719                # And wait for the stop notification.
720                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
721            True)
722
723        # Run the packet stream.
724        context = self.expect_gdbremote_sequence()
725        self.assertIsNotNone(context)
726
727        # Grab the address.
728        self.assertIsNotNone(context.get("stack_address"))
729        stack_address = int(context.get("stack_address"), 16)
730
731        # Grab memory region info from the inferior.
732        self.reset_test_sequence()
733        self.add_query_memory_region_packets(stack_address)
734
735        # Run the packet stream.
736        context = self.expect_gdbremote_sequence()
737        self.assertIsNotNone(context)
738        mem_region_dict = self.parse_memory_region_packet(context)
739
740        # Ensure there are no errors reported.
741        self.assertNotIn("error", mem_region_dict)
742
743        # Ensure address is readable and executable.
744        self.assertIn("permissions", mem_region_dict)
745        self.assertIn("r", mem_region_dict["permissions"])
746        self.assertIn("w", mem_region_dict["permissions"])
747
748        # Ensure the start address and size encompass the address we queried.
749        self.assert_address_within_memory_region(
750            stack_address, mem_region_dict)
751
752    @skipIfWindows # No pty support to test any inferior output
753    def test_qMemoryRegionInfo_reports_heap_address_as_rw(self):
754        self.build()
755        self.set_inferior_startup_launch()
756
757        # Start up the inferior.
758        procs = self.prep_debug_monitor_and_inferior(
759            inferior_args=["get-heap-address-hex:", "sleep:5"])
760
761        # Run the process
762        self.test_sequence.add_log_lines(
763            [
764                # Start running after initial stop.
765                "read packet: $c#63",
766                # Match output line that prints the memory address of the message buffer within the inferior.
767                # Note we require launch-only testing so we can get inferior otuput.
768                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"heap address: 0x([0-9a-fA-F]+)\r\n"),
769                 "capture": {1: "heap_address"}},
770                # Now stop the inferior.
771                "read packet: {}".format(chr(3)),
772                # And wait for the stop notification.
773                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
774            True)
775
776        # Run the packet stream.
777        context = self.expect_gdbremote_sequence()
778        self.assertIsNotNone(context)
779
780        # Grab the address.
781        self.assertIsNotNone(context.get("heap_address"))
782        heap_address = int(context.get("heap_address"), 16)
783
784        # Grab memory region info from the inferior.
785        self.reset_test_sequence()
786        self.add_query_memory_region_packets(heap_address)
787
788        # Run the packet stream.
789        context = self.expect_gdbremote_sequence()
790        self.assertIsNotNone(context)
791        mem_region_dict = self.parse_memory_region_packet(context)
792
793        # Ensure there are no errors reported.
794        self.assertNotIn("error", mem_region_dict)
795
796        # Ensure address is readable and executable.
797        self.assertIn("permissions", mem_region_dict)
798        self.assertIn("r", mem_region_dict["permissions"])
799        self.assertIn("w", mem_region_dict["permissions"])
800
801        # Ensure the start address and size encompass the address we queried.
802        self.assert_address_within_memory_region(heap_address, mem_region_dict)
803
804    def breakpoint_set_and_remove_work(self, want_hardware):
805        # Start up the inferior.
806        procs = self.prep_debug_monitor_and_inferior(
807            inferior_args=[
808                "get-code-address-hex:hello",
809                "sleep:1",
810                "call-function:hello"])
811
812        # Run the process
813        self.add_register_info_collection_packets()
814        self.add_process_info_collection_packets()
815        self.test_sequence.add_log_lines(
816            [  # Start running after initial stop.
817                "read packet: $c#63",
818                # Match output line that prints the memory address of the function call entry point.
819                # Note we require launch-only testing so we can get inferior otuput.
820                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"code address: 0x([0-9a-fA-F]+)\r\n"),
821                 "capture": {1: "function_address"}},
822                # Now stop the inferior.
823                "read packet: {}".format(chr(3)),
824                # And wait for the stop notification.
825                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
826            True)
827
828        # Run the packet stream.
829        context = self.expect_gdbremote_sequence()
830        self.assertIsNotNone(context)
831
832        # Gather process info - we need endian of target to handle register
833        # value conversions.
834        process_info = self.parse_process_info_response(context)
835        endian = process_info.get("endian")
836        self.assertIsNotNone(endian)
837
838        # Gather register info entries.
839        reg_infos = self.parse_register_info_packets(context)
840        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_infos)
841        self.assertIsNotNone(pc_lldb_reg_index)
842        self.assertIsNotNone(pc_reg_info)
843
844        # Grab the function address.
845        self.assertIsNotNone(context.get("function_address"))
846        function_address = int(context.get("function_address"), 16)
847
848        # Get current target architecture
849        target_arch = self.getArchitecture()
850
851        # Set the breakpoint.
852        if (target_arch == "arm") or (target_arch == "aarch64"):
853            # TODO: Handle case when setting breakpoint in thumb code
854            BREAKPOINT_KIND = 4
855        else:
856            BREAKPOINT_KIND = 1
857
858        # Set default packet type to Z0 (software breakpoint)
859        z_packet_type = 0
860
861        # If hardware breakpoint is requested set packet type to Z1
862        if want_hardware == True:
863            z_packet_type = 1
864
865        self.reset_test_sequence()
866        self.add_set_breakpoint_packets(
867            function_address,
868            z_packet_type,
869            do_continue=True,
870            breakpoint_kind=BREAKPOINT_KIND)
871
872        # Run the packet stream.
873        context = self.expect_gdbremote_sequence()
874        self.assertIsNotNone(context)
875
876        # Verify the stop signal reported was the breakpoint signal number.
877        stop_signo = context.get("stop_signo")
878        self.assertIsNotNone(stop_signo)
879        self.assertEqual(int(stop_signo, 16),
880                         lldbutil.get_signal_number('SIGTRAP'))
881
882        # Ensure we did not receive any output.  If the breakpoint was not set, we would
883        # see output (from a launched process with captured stdio) printing a hello, world message.
884        # That would indicate the breakpoint didn't take.
885        self.assertEqual(len(context["O_content"]), 0)
886
887        # Verify that the PC for the main thread is where we expect it - right at the breakpoint address.
888        # This acts as a another validation on the register reading code.
889        self.reset_test_sequence()
890        self.test_sequence.add_log_lines(
891            [
892                # Print the PC.  This should match the breakpoint address.
893                "read packet: $p{0:x}#00".format(pc_lldb_reg_index),
894                # Capture $p results.
895                {"direction": "send",
896                 "regex": r"^\$([0-9a-fA-F]+)#",
897                 "capture": {1: "p_response"}},
898            ], True)
899
900        context = self.expect_gdbremote_sequence()
901        self.assertIsNotNone(context)
902
903        # Verify the PC is where we expect.  Note response is in endianness of
904        # the inferior.
905        p_response = context.get("p_response")
906        self.assertIsNotNone(p_response)
907
908        # Convert from target endian to int.
909        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(
910            endian, p_response)
911        self.assertEqual(returned_pc, function_address)
912
913        # Verify that a breakpoint remove and continue gets us the expected
914        # output.
915        self.reset_test_sequence()
916
917        # Add breakpoint remove packets
918        self.add_remove_breakpoint_packets(
919            function_address,
920            z_packet_type,
921            breakpoint_kind=BREAKPOINT_KIND)
922
923        self.test_sequence.add_log_lines(
924            [
925                # Continue running.
926                "read packet: $c#63",
927                # We should now receive the output from the call.
928                {"type": "output_match", "regex": r"^hello, world\r\n$"},
929                # And wait for program completion.
930                {"direction": "send", "regex": r"^\$W00(.*)#[0-9a-fA-F]{2}$"},
931            ], True)
932
933        context = self.expect_gdbremote_sequence()
934        self.assertIsNotNone(context)
935
936    @skipIfWindows # No pty support to test any inferior output
937    def test_software_breakpoint_set_and_remove_work(self):
938        if self.getArchitecture() == "arm":
939            # TODO: Handle case when setting breakpoint in thumb code
940            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
941        else:
942            self.build()
943        self.set_inferior_startup_launch()
944        self.breakpoint_set_and_remove_work(want_hardware=False)
945
946    @skipUnlessPlatform(oslist=['linux'])
947    @skipIf(archs=no_match(['arm', 'aarch64']))
948    def test_hardware_breakpoint_set_and_remove_work(self):
949        if self.getArchitecture() == "arm":
950            # TODO: Handle case when setting breakpoint in thumb code
951            self.build(dictionary={'CFLAGS_EXTRAS': '-marm'})
952        else:
953            self.build()
954        self.set_inferior_startup_launch()
955        self.breakpoint_set_and_remove_work(want_hardware=True)
956
957    def get_qSupported_dict(self, features=[]):
958        self.build()
959        self.set_inferior_startup_launch()
960
961        # Start up the stub and start/prep the inferior.
962        procs = self.prep_debug_monitor_and_inferior()
963        self.add_qSupported_packets(features)
964
965        # Run the packet stream.
966        context = self.expect_gdbremote_sequence()
967        self.assertIsNotNone(context)
968
969        # Retrieve the qSupported features.
970        return self.parse_qSupported_response(context)
971
972    def test_qSupported_returns_known_stub_features(self):
973        supported_dict = self.get_qSupported_dict()
974        self.assertIsNotNone(supported_dict)
975        self.assertTrue(len(supported_dict) > 0)
976
977    def test_qSupported_auvx(self):
978        expected = ('+' if lldbplatformutil.getPlatform()
979                    in ["freebsd", "linux", "netbsd"] else '-')
980        supported_dict = self.get_qSupported_dict()
981        self.assertEqual(supported_dict.get('qXfer:auxv:read', '-'), expected)
982
983    def test_qSupported_libraries_svr4(self):
984        expected = ('+' if lldbplatformutil.getPlatform()
985                    in ["freebsd", "linux", "netbsd"] else '-')
986        supported_dict = self.get_qSupported_dict()
987        self.assertEqual(supported_dict.get('qXfer:libraries-svr4:read', '-'),
988                         expected)
989
990    def test_qSupported_QPassSignals(self):
991        expected = ('+' if lldbplatformutil.getPlatform()
992                    in ["freebsd", "linux", "netbsd"] else '-')
993        supported_dict = self.get_qSupported_dict()
994        self.assertEqual(supported_dict.get('QPassSignals', '-'), expected)
995
996    @add_test_categories(["fork"])
997    def test_qSupported_fork_events(self):
998        supported_dict = (
999            self.get_qSupported_dict(['multiprocess+', 'fork-events+']))
1000        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1001        self.assertEqual(supported_dict.get('fork-events', '-'), '+')
1002        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1003
1004    @add_test_categories(["fork"])
1005    def test_qSupported_fork_events_without_multiprocess(self):
1006        supported_dict = (
1007            self.get_qSupported_dict(['fork-events+']))
1008        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1009        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1010        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1011
1012    @add_test_categories(["fork"])
1013    def test_qSupported_vfork_events(self):
1014        supported_dict = (
1015            self.get_qSupported_dict(['multiprocess+', 'vfork-events+']))
1016        self.assertEqual(supported_dict.get('multiprocess', '-'), '+')
1017        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1018        self.assertEqual(supported_dict.get('vfork-events', '-'), '+')
1019
1020    @add_test_categories(["fork"])
1021    def test_qSupported_vfork_events_without_multiprocess(self):
1022        supported_dict = (
1023            self.get_qSupported_dict(['vfork-events+']))
1024        self.assertEqual(supported_dict.get('multiprocess', '-'), '-')
1025        self.assertEqual(supported_dict.get('fork-events', '-'), '-')
1026        self.assertEqual(supported_dict.get('vfork-events', '-'), '-')
1027
1028    # We need to be able to self.runCmd to get cpuinfo,
1029    # which is not possible when using a remote platform.
1030    @skipIfRemote
1031    def test_qSupported_memory_tagging(self):
1032        supported_dict = self.get_qSupported_dict()
1033        self.assertEqual(supported_dict.get("memory-tagging", '-'),
1034                         '+' if self.isAArch64MTE() else '-')
1035
1036    @skipIfWindows # No pty support to test any inferior output
1037    def test_written_M_content_reads_back_correctly(self):
1038        self.build()
1039        self.set_inferior_startup_launch()
1040
1041        TEST_MESSAGE = "Hello, memory"
1042
1043        # Start up the stub and start/prep the inferior.
1044        procs = self.prep_debug_monitor_and_inferior(
1045            inferior_args=[
1046                "set-message:xxxxxxxxxxxxxX",
1047                "get-data-address-hex:g_message",
1048                "sleep:1",
1049                "print-message:"])
1050        self.test_sequence.add_log_lines(
1051            [
1052                # Start running after initial stop.
1053                "read packet: $c#63",
1054                # Match output line that prints the memory address of the message buffer within the inferior.
1055                # Note we require launch-only testing so we can get inferior otuput.
1056                {"type": "output_match", "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
1057                 "capture": {1: "message_address"}},
1058                # Now stop the inferior.
1059                "read packet: {}".format(chr(3)),
1060                # And wait for the stop notification.
1061                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1062            True)
1063        context = self.expect_gdbremote_sequence()
1064        self.assertIsNotNone(context)
1065
1066        # Grab the message address.
1067        self.assertIsNotNone(context.get("message_address"))
1068        message_address = int(context.get("message_address"), 16)
1069
1070        # Hex-encode the test message, adding null termination.
1071        hex_encoded_message = seven.hexlify(TEST_MESSAGE)
1072
1073        # Write the message to the inferior. Verify that we can read it with the hex-encoded (m)
1074        # and binary (x) memory read packets.
1075        self.reset_test_sequence()
1076        self.test_sequence.add_log_lines(
1077            ["read packet: $M{0:x},{1:x}:{2}#00".format(message_address, len(TEST_MESSAGE), hex_encoded_message),
1078             "send packet: $OK#00",
1079             "read packet: $m{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1080             "send packet: ${0}#00".format(hex_encoded_message),
1081             "read packet: $x{0:x},{1:x}#00".format(message_address, len(TEST_MESSAGE)),
1082             "send packet: ${0}#00".format(TEST_MESSAGE),
1083             "read packet: $m{0:x},4#00".format(message_address),
1084             "send packet: ${0}#00".format(hex_encoded_message[0:8]),
1085             "read packet: $x{0:x},4#00".format(message_address),
1086             "send packet: ${0}#00".format(TEST_MESSAGE[0:4]),
1087             "read packet: $c#63",
1088             {"type": "output_match", "regex": r"^message: (.+)\r\n$", "capture": {1: "printed_message"}},
1089             "send packet: $W00#00",
1090             ], True)
1091        context = self.expect_gdbremote_sequence()
1092        self.assertIsNotNone(context)
1093
1094        # Ensure what we read from inferior memory is what we wrote.
1095        printed_message = context.get("printed_message")
1096        self.assertIsNotNone(printed_message)
1097        self.assertEqual(printed_message, TEST_MESSAGE + "X")
1098
1099    # Note: as of this moment, a hefty number of the GPR writes are failing with E32 (everything except rax-rdx, rdi, rsi, rbp).
1100    # Come back to this.  I have the test rigged to verify that at least some
1101    # of the bit-flip writes work.
1102    def test_P_writes_all_gpr_registers(self):
1103        self.build()
1104        self.set_inferior_startup_launch()
1105
1106        # Start inferior debug session, grab all register info.
1107        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
1108        self.add_register_info_collection_packets()
1109        self.add_process_info_collection_packets()
1110
1111        context = self.expect_gdbremote_sequence()
1112        self.assertIsNotNone(context)
1113
1114        # Process register infos.
1115        reg_infos = self.parse_register_info_packets(context)
1116        self.assertIsNotNone(reg_infos)
1117        self.add_lldb_register_index(reg_infos)
1118
1119        # Process endian.
1120        process_info = self.parse_process_info_response(context)
1121        endian = process_info.get("endian")
1122        self.assertIsNotNone(endian)
1123
1124        # Pull out the register infos that we think we can bit flip
1125        # successfully,.
1126        gpr_reg_infos = [
1127            reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
1128        self.assertTrue(len(gpr_reg_infos) > 0)
1129
1130        # Write flipped bit pattern of existing value to each register.
1131        (successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
1132            gpr_reg_infos, endian)
1133        self.trace("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
1134        self.assertTrue(successful_writes > 0)
1135
1136    # Note: as of this moment, a hefty number of the GPR writes are failing
1137    # with E32 (everything except rax-rdx, rdi, rsi, rbp).
1138    @skipIfWindows
1139    def test_P_and_p_thread_suffix_work(self):
1140        self.build()
1141        self.set_inferior_startup_launch()
1142
1143        # Startup the inferior with three threads.
1144        procs = self.prep_debug_monitor_and_inferior(
1145            inferior_args=["thread:new", "thread:new"])
1146        self.add_thread_suffix_request_packets()
1147        self.add_register_info_collection_packets()
1148        self.add_process_info_collection_packets()
1149
1150        context = self.expect_gdbremote_sequence()
1151        self.assertIsNotNone(context)
1152
1153        process_info = self.parse_process_info_response(context)
1154        self.assertIsNotNone(process_info)
1155        endian = process_info.get("endian")
1156        self.assertIsNotNone(endian)
1157
1158        reg_infos = self.parse_register_info_packets(context)
1159        self.assertIsNotNone(reg_infos)
1160        self.add_lldb_register_index(reg_infos)
1161
1162        reg_index = self.select_modifiable_register(reg_infos)
1163        self.assertIsNotNone(reg_index)
1164        reg_byte_size = int(reg_infos[reg_index]["bitsize"]) // 8
1165        self.assertTrue(reg_byte_size > 0)
1166
1167        # Run the process a bit so threads can start up, and collect register
1168        # info.
1169        context = self.run_process_then_stop(run_seconds=1)
1170        self.assertIsNotNone(context)
1171
1172        # Wait for 3 threads to be present.
1173        threads = self.wait_for_thread_count(3)
1174        self.assertEqual(len(threads), 3)
1175
1176        expected_reg_values = []
1177        register_increment = 1
1178        next_value = None
1179
1180        # Set the same register in each of 3 threads to a different value.
1181        # Verify each one has the unique value.
1182        for thread in threads:
1183            # If we don't have a next value yet, start it with the initial read
1184            # value + 1
1185            if not next_value:
1186                # Read pre-existing register value.
1187                self.reset_test_sequence()
1188                self.test_sequence.add_log_lines(
1189                    ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1190                     {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1191                     ], True)
1192                context = self.expect_gdbremote_sequence()
1193                self.assertIsNotNone(context)
1194
1195                # Set the next value to use for writing as the increment plus
1196                # current value.
1197                p_response = context.get("p_response")
1198                self.assertIsNotNone(p_response)
1199                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1200                    endian, p_response)
1201
1202            # Set new value using P and thread suffix.
1203            self.reset_test_sequence()
1204            self.test_sequence.add_log_lines(
1205                [
1206                    "read packet: $P{0:x}={1};thread:{2:x}#00".format(
1207                        reg_index,
1208                        lldbgdbserverutils.pack_register_hex(
1209                            endian,
1210                            next_value,
1211                            byte_size=reg_byte_size),
1212                        thread),
1213                    "send packet: $OK#00",
1214                ],
1215                True)
1216            context = self.expect_gdbremote_sequence()
1217            self.assertIsNotNone(context)
1218
1219            # Save the value we set.
1220            expected_reg_values.append(next_value)
1221
1222            # Increment value for next thread to use (we want them all
1223            # different so we can verify they wrote to each thread correctly
1224            # next.)
1225            next_value += register_increment
1226
1227        # Revisit each thread and verify they have the expected value set for
1228        # the register we wrote.
1229        thread_index = 0
1230        for thread in threads:
1231            # Read pre-existing register value.
1232            self.reset_test_sequence()
1233            self.test_sequence.add_log_lines(
1234                ["read packet: $p{0:x};thread:{1:x}#00".format(reg_index, thread),
1235                 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1236                 ], True)
1237            context = self.expect_gdbremote_sequence()
1238            self.assertIsNotNone(context)
1239
1240            # Get the register value.
1241            p_response = context.get("p_response")
1242            self.assertIsNotNone(p_response)
1243            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(
1244                endian, p_response)
1245
1246            # Make sure we read back what we wrote.
1247            self.assertEqual(read_value, expected_reg_values[thread_index])
1248            thread_index += 1
1249