1from lldbsuite.test.decorators import *
2from lldbsuite.test.lldbtest import *
3
4import gdbremote_testcase
5
6
class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Non-stop mode tests for the gdb-remote debug monitor ("llgs" category).

    Every test launches the inferior, switches the connection to non-stop
    mode with ``QNonStop:1``, resumes with ``c`` and then checks the stop
    notifications and replies that are exchanged afterwards.
    """

    mydir = TestBase.compute_mydir(__file__)

    # Asynchronous notification announcing a thread stop.
    # Groups: 1 = stop packet body, 2 = signal number (hex digits),
    # 3 = thread id (hex digits).
    _NONSTOP_NOTIFICATION_REGEX = (
        r"^%Stop:(T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)")
    # Reply to `?` / `vStopped`: either another thread stop (same groups as
    # above) or a bare "OK", in which case only group 1 is set.
    _NONSTOP_REPLY_REGEX = (
        r"^\$(OK|T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)")

    def _nonstop_launch(self, **prep_kwargs):
        """Build the inferior and prepare the debug monitor for a launch.

        Keyword arguments are forwarded unchanged to
        ``prep_debug_monitor_and_inferior``; its return value is not needed
        by any test in this class and is therefore discarded.
        """
        self.build()
        self.set_inferior_startup_launch()
        self.prep_debug_monitor_and_inferior(**prep_kwargs)

    def _nonstop_resume_packets(self):
        """Return a fresh list with the handshake shared by all tests.

        The exchange enables non-stop mode and resumes the inferior; both
        requests are expected to be acknowledged with ``OK``.
        """
        return [
            "read packet: $QNonStop:1#00",
            "send packet: $OK#00",
            "read packet: $c#63",
            "send packet: $OK#00",
        ]

    def _nonstop_stop_matcher(self, regex):
        """Return a new log-line entry matching a stop packet with `regex`.

        The entry captures the regex groups as "packet", "signo" and
        "thread_id".  A new dict is built on every call so that no entry
        object is shared between test sequences.
        """
        return {
            "direction": "send",
            "regex": regex,
            "capture": {1: "packet", 2: "signo", 3: "thread_id"},
        }

    def _nonstop_collect_stop_replies(self, request):
        """Send `request`, then `vStopped` repeatedly, until "OK" comes back.

        `request` is the "read packet: ..." log line used for the first
        query; every following query uses ``vStopped``.

        Returns the list of captured stop replies (dicts holding "packet",
        "signo" and "thread_id"), in the order they were received, with the
        "O_content" key removed so that the dicts can be compared directly.
        The terminating "OK" reply is not included.
        """
        replies = []
        while True:
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [request,
                 self._nonstop_stop_matcher(self._NONSTOP_REPLY_REGEX),
                 ], True)
            context = self.expect_gdbremote_sequence()
            if context["packet"] == "OK":
                return replies
            del context["O_content"]
            replies.append(context)
            request = "read packet: $vStopped#00"

    @skipIfWindows  # no SIGSEGV support
    @add_test_categories(["llgs"])
    def test_run(self):
        """Verify stop reporting while several threads segfault in turn.

        For each expected segfault the test waits for a ``%Stop``
        notification, drains the remaining stop replies with ``vStopped``,
        checks that exactly one not-yet-seen thread reports SIGSEGV while
        all others report signal 0, confirms that ``?`` yields the same
        list, and finally resumes with the signal delivered to the faulting
        thread.
        """
        thread_num = 3
        self._nonstop_launch(
            inferior_args=["thread:segfault"] + thread_num * ["thread:new"])
        self.test_sequence.add_log_lines(
            self._nonstop_resume_packets(), True)
        self.expect_gdbremote_sequence()

        segv_signo = lldbutil.get_signal_number("SIGSEGV")
        all_threads = set()
        all_segv_threads = []

        # we should get segfaults from all the threads
        for _ in range(thread_num):
            # first wait for the notification event
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [self._nonstop_stop_matcher(
                    self._NONSTOP_NOTIFICATION_REGEX),
                 ], True)
            first_stop = self.expect_gdbremote_sequence()
            del first_stop["O_content"]

            # then we may get events for the remaining threads
            # (but note that not all threads may have been started yet)
            threads = [first_stop]
            threads.extend(
                self._nonstop_collect_stop_replies(
                    "read packet: $vStopped#00"))

            # split the reported threads by the signal they stopped with
            segv_threads = []
            other_threads = []
            for stop in threads:
                signo = int(stop["signo"], 16)
                if signo == segv_signo:
                    segv_threads.append(stop["thread_id"])
                else:
                    # any thread that did not segfault must report signal 0
                    self.assertEqual(signo, 0)
                    other_threads.append(stop["thread_id"])

            # verify that exactly one thread segfaulted
            self.assertEqual(len(segv_threads), 1)
            # we should get only one segv from every thread
            self.assertNotIn(segv_threads[0], all_segv_threads)
            all_segv_threads.extend(segv_threads)
            # the threads reported now should always be a superset of
            # all_threads, i.e. we should get states for all threads
            # already started
            reported_threads = other_threads + segv_threads
            self.assertFalse(all_threads.difference(reported_threads))
            all_threads.update(reported_threads)

            # verify that `?` returns the same result
            threads_verify = self._nonstop_collect_stop_replies(
                "read packet: $?#00")
            self.assertEqual(threads, threads_verify)

            # resume, passing the segfault signal to the faulting thread
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $vCont;C{:02x}:{};c#00"
                 .format(segv_signo, segv_threads[0]),
                 "send packet: $OK#00",
                 ], True)
            self.expect_gdbremote_sequence()

        # finally, verify that all threads have started
        self.assertEqual(len(all_threads), thread_num + 1)

    @add_test_categories(["llgs"])
    def test_vCtrlC(self):
        """Verify that ``vCtrlC`` is acknowledged and followed by a stop.

        After the inferior is resumed in non-stop mode, ``vCtrlC`` must be
        answered with ``OK`` and a ``%Stop:T...`` notification must follow.
        """
        self._nonstop_launch(inferior_args=["thread:new"])
        self.test_sequence.add_log_lines(
            self._nonstop_resume_packets() + [
                "read packet: $vCtrlC#00",
                "send packet: $OK#00",
                {"direction": "send",
                 "regex": r"^%Stop:T",
                 },
            ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["llgs"])
    def test_exit(self):
        """Verify the exit notification of an inferior run to completion.

        The monitor must send ``%Stop:W00`` and then answer the following
        ``vStopped`` with ``OK``.
        """
        self._nonstop_launch()
        self.test_sequence.add_log_lines(
            self._nonstop_resume_packets() + [
                "send packet: %Stop:W00#00",
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ], True)
        self.expect_gdbremote_sequence()

    @skipIfWindows  # no clue, the result makes zero sense
    @add_test_categories(["llgs"])
    def test_exit_query(self):
        """Verify that ``?`` repeats the exit status after the notification.

        Following ``%Stop:W00``, a ``?`` query must be answered with
        ``W00`` and the subsequent ``vStopped`` with ``OK``.
        """
        self._nonstop_launch()
        self.test_sequence.add_log_lines(
            self._nonstop_resume_packets() + [
                "send packet: %Stop:W00#00",
                "read packet: $?#00",
                "send packet: $W00#00",
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ], True)
        self.expect_gdbremote_sequence()
173