1from lldbsuite.test.decorators import *
2from lldbsuite.test.lldbtest import *
3
4import gdbremote_testcase
5
6
7class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase):
8
9    @skipIfWindows  # no SIGSEGV support
10    @add_test_categories(["llgs"])
11    def test_run(self):
12        self.build()
13        self.set_inferior_startup_launch()
14        thread_num = 3
15        procs = self.prep_debug_monitor_and_inferior(
16                inferior_args=["thread:segfault"] + thread_num * ["thread:new"])
17        self.test_sequence.add_log_lines(
18            ["read packet: $QNonStop:1#00",
19             "send packet: $OK#00",
20             "read packet: $c#63",
21             "send packet: $OK#00",
22             ], True)
23        self.expect_gdbremote_sequence()
24
25        segv_signo = lldbutil.get_signal_number('SIGSEGV')
26        all_threads = set()
27        all_segv_threads = []
28
29        # we should get segfaults from all the threads
30        for segv_no in range(thread_num):
31            # first wait for the notification event
32            self.reset_test_sequence()
33            self.test_sequence.add_log_lines(
34                [{"direction": "send",
35                  "regex": r"^%Stop:(T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)",
36                  "capture": {1: "packet", 2: "signo", 3: "thread_id"},
37                  },
38                 ], True)
39            m = self.expect_gdbremote_sequence()
40            del m["O_content"]
41            threads = [m]
42
43            # then we may get events for the remaining threads
44            # (but note that not all threads may have been started yet)
45            while True:
46                self.reset_test_sequence()
47                self.test_sequence.add_log_lines(
48                    ["read packet: $vStopped#00",
49                     {"direction": "send",
50                      "regex": r"^\$(OK|T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)",
51                      "capture": {1: "packet", 2: "signo", 3: "thread_id"},
52                      },
53                     ], True)
54                m = self.expect_gdbremote_sequence()
55                if m["packet"] == "OK":
56                    break
57                del m["O_content"]
58                threads.append(m)
59
60            segv_threads = []
61            other_threads = []
62            for t in threads:
63                signo = int(t["signo"], 16)
64                if signo == segv_signo:
65                    segv_threads.append(t["thread_id"])
66                else:
67                    self.assertEqual(signo, 0)
68                    other_threads.append(t["thread_id"])
69
70            # verify that exactly one thread segfaulted
71            self.assertEqual(len(segv_threads), 1)
72            # we should get only one segv from every thread
73            self.assertNotIn(segv_threads[0], all_segv_threads)
74            all_segv_threads.extend(segv_threads)
75            # segv_threads + other_threads should always be a superset
76            # of all_threads, i.e. we should get states for all threads
77            # already started
78            self.assertFalse(
79                    all_threads.difference(other_threads + segv_threads))
80            all_threads.update(other_threads + segv_threads)
81
82            # verify that `?` returns the same result
83            self.reset_test_sequence()
84            self.test_sequence.add_log_lines(
85                ["read packet: $?#00",
86                 ], True)
87            threads_verify = []
88            while True:
89                self.test_sequence.add_log_lines(
90                    [{"direction": "send",
91                      "regex": r"^\$(OK|T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)",
92                      "capture": {1: "packet", 2: "signo", 3: "thread_id"},
93                      },
94                     ], True)
95                m = self.expect_gdbremote_sequence()
96                if m["packet"] == "OK":
97                    break
98                del m["O_content"]
99                threads_verify.append(m)
100                self.reset_test_sequence()
101                self.test_sequence.add_log_lines(
102                    ["read packet: $vStopped#00",
103                     ], True)
104
105            self.assertEqual(threads, threads_verify)
106
107            self.reset_test_sequence()
108            self.test_sequence.add_log_lines(
109                ["read packet: $vCont;C{:02x}:{};c#00"
110                 .format(segv_signo, segv_threads[0]),
111                 "send packet: $OK#00",
112                 ], True)
113            self.expect_gdbremote_sequence()
114
115        # finally, verify that all threads have started
116        self.assertEqual(len(all_threads), thread_num + 1)
117
118    @add_test_categories(["llgs"])
119    def test_vCtrlC(self):
120        self.build()
121        self.set_inferior_startup_launch()
122        procs = self.prep_debug_monitor_and_inferior(
123                inferior_args=["thread:new"])
124        self.test_sequence.add_log_lines(
125            ["read packet: $QNonStop:1#00",
126             "send packet: $OK#00",
127             "read packet: $c#63",
128             "send packet: $OK#00",
129             "read packet: $vCtrlC#00",
130             "send packet: $OK#00",
131             {"direction": "send",
132              "regex": r"^%Stop:T",
133              },
134             ], True)
135        self.expect_gdbremote_sequence()
136
137    @add_test_categories(["llgs"])
138    def test_exit(self):
139        self.build()
140        self.set_inferior_startup_launch()
141        procs = self.prep_debug_monitor_and_inferior()
142        self.test_sequence.add_log_lines(
143            ["read packet: $QNonStop:1#00",
144             "send packet: $OK#00",
145             "read packet: $c#63",
146             "send packet: $OK#00",
147             "send packet: %Stop:W00#00",
148             "read packet: $vStopped#00",
149             "send packet: $OK#00",
150             ], True)
151        self.expect_gdbremote_sequence()
152
153    @skipIfWindows  # no clue, the result makes zero sense
154    @add_test_categories(["llgs"])
155    def test_exit_query(self):
156        self.build()
157        self.set_inferior_startup_launch()
158        procs = self.prep_debug_monitor_and_inferior()
159        self.test_sequence.add_log_lines(
160            ["read packet: $QNonStop:1#00",
161             "send packet: $OK#00",
162             "read packet: $c#63",
163             "send packet: $OK#00",
164             "send packet: %Stop:W00#00",
165             "read packet: $?#00",
166             "send packet: $W00#00",
167             "read packet: $vStopped#00",
168             "send packet: $OK#00",
169             ], True)
170        self.expect_gdbremote_sequence()
171
172    def multiple_resume_test(self, second_command):
173        self.build()
174        self.set_inferior_startup_launch()
175        procs = self.prep_debug_monitor_and_inferior(
176                inferior_args=["sleep:15"])
177        self.test_sequence.add_log_lines(
178            ["read packet: $QNonStop:1#00",
179             "send packet: $OK#00",
180             "read packet: $c#63",
181             "send packet: $OK#00",
182             "read packet: ${}#00".format(second_command),
183             "send packet: $E37#00",
184             ], True)
185        self.expect_gdbremote_sequence()
186
187    @add_test_categories(["llgs"])
188    def test_multiple_C(self):
189        self.multiple_resume_test("C05")
190
191    @add_test_categories(["llgs"])
192    def test_multiple_c(self):
193        self.multiple_resume_test("c")
194
195    @add_test_categories(["llgs"])
196    def test_multiple_s(self):
197        self.multiple_resume_test("s")
198
199    @skipIfWindows
200    @add_test_categories(["llgs"])
201    def test_multiple_vCont(self):
202        self.build()
203        self.set_inferior_startup_launch()
204        procs = self.prep_debug_monitor_and_inferior(
205                inferior_args=["thread:new", "stop", "sleep:15"])
206        self.test_sequence.add_log_lines(
207            ["read packet: $QNonStop:1#00",
208             "send packet: $OK#00",
209             "read packet: $c#63",
210             "send packet: $OK#00",
211             {"direction": "send",
212              "regex": r"^%Stop:T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
213              "capture": {1: "tid1"},
214              },
215             "read packet: $vStopped#63",
216             {"direction": "send",
217              "regex": r"^[$]T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
218              "capture": {1: "tid2"},
219              },
220             "read packet: $vStopped#63",
221             "send packet: $OK#00",
222             ], True)
223        ret = self.expect_gdbremote_sequence()
224
225        self.reset_test_sequence()
226        self.test_sequence.add_log_lines(
227            ["read packet: $vCont;c:{}#00".format(ret["tid1"]),
228             "send packet: $OK#00",
229             "read packet: $vCont;c:{}#00".format(ret["tid2"]),
230             "send packet: $E37#00",
231             ], True)
232        self.expect_gdbremote_sequence()
233
234    @add_test_categories(["llgs"])
235    def test_vCont_then_stop(self):
236        self.build()
237        self.set_inferior_startup_launch()
238        procs = self.prep_debug_monitor_and_inferior(
239                inferior_args=["sleep:15"])
240        self.test_sequence.add_log_lines(
241            ["read packet: $QNonStop:1#00",
242             "send packet: $OK#00",
243             "read packet: $c#63",
244             "send packet: $OK#00",
245             "read packet: $vCont;t#00",
246             "send packet: $OK#00",
247             ], True)
248        self.expect_gdbremote_sequence()
249
250    def vCont_then_partial_stop_test(self, run_both):
251        self.build()
252        self.set_inferior_startup_launch()
253        procs = self.prep_debug_monitor_and_inferior(
254                inferior_args=["thread:new", "stop", "sleep:15"])
255        self.test_sequence.add_log_lines(
256            ["read packet: $QNonStop:1#00",
257             "send packet: $OK#00",
258             "read packet: $c#63",
259             "send packet: $OK#00",
260             {"direction": "send",
261              "regex": r"^%Stop:T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
262              "capture": {1: "tid1"},
263              },
264             "read packet: $vStopped#63",
265             {"direction": "send",
266              "regex": r"^[$]T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
267              "capture": {1: "tid2"},
268              },
269             "read packet: $vStopped#63",
270             "send packet: $OK#00",
271             ], True)
272        ret = self.expect_gdbremote_sequence()
273
274        self.reset_test_sequence()
275        if run_both:
276            self.test_sequence.add_log_lines(
277                ["read packet: $vCont;c#00",
278                 ], True)
279        else:
280            self.test_sequence.add_log_lines(
281                ["read packet: $vCont;c:{}#00".format(ret["tid1"]),
282                 ], True)
283        self.test_sequence.add_log_lines(
284            ["send packet: $OK#00",
285             "read packet: $vCont;t:{}#00".format(ret["tid2"]),
286             "send packet: $E03#00",
287             ], True)
288        self.expect_gdbremote_sequence()
289
290    @skipIfWindows
291    @add_test_categories(["llgs"])
292    def test_vCont_then_partial_stop(self):
293        self.vCont_then_partial_stop_test(False)
294
295    @skipIfWindows
296    @add_test_categories(["llgs"])
297    def test_vCont_then_partial_stop_run_both(self):
298        self.vCont_then_partial_stop_test(True)
299
300    @skipIfWindows
301    @add_test_categories(["llgs"])
302    def test_stdio(self):
303        self.build()
304        self.set_inferior_startup_launch()
305        # Since we can't easily ensure that lldb will send output in two parts,
306        # just put a stop in the middle.  Since we don't clear vStdio,
307        # the second message won't be delivered immediately.
308        self.prep_debug_monitor_and_inferior(
309            inferior_args=["message 1", "stop", "message 2"])
310        self.test_sequence.add_log_lines(
311            ["read packet: $QNonStop:1#00",
312             "send packet: $OK#00",
313             "read packet: $c#63",
314             "send packet: $OK#00",
315             {"direction": "send", "regex": r"^%Stop:T.*"},
316             "read packet: $vStopped#00",
317             "send packet: $OK#00",
318             "read packet: $c#63",
319             "send packet: $OK#00",
320             "send packet: %Stop:W00#00",
321             ], True)
322        ret = self.expect_gdbremote_sequence()
323        self.assertIn(ret["O_content"], b"message 1\r\n")
324
325        # Now, this is somewhat messy.  expect_gdbremote_sequence() will
326        # automatically consume output packets, so we just send vStdio,
327        # assume the first reply was consumed, send another one and expect
328        # a non-consumable "OK" reply.
329        self.reset_test_sequence()
330        self.test_sequence.add_log_lines(
331            ["read packet: $vStdio#00",
332             "read packet: $vStdio#00",
333             "send packet: $OK#00",
334             ], True)
335        ret = self.expect_gdbremote_sequence()
336        self.assertIn(ret["O_content"], b"message 2\r\n")
337
338        self.reset_test_sequence()
339        self.test_sequence.add_log_lines(
340            ["read packet: $vStopped#00",
341             "send packet: $OK#00",
342             ], True)
343        self.expect_gdbremote_sequence()
344
345    @skipIfWindows
346    @add_test_categories(["llgs"])
347    def test_stop_reason_while_running(self):
348        self.build()
349        self.set_inferior_startup_launch()
350        procs = self.prep_debug_monitor_and_inferior(
351                inferior_args=["thread:new", "thread:new", "stop", "sleep:15"])
352        self.test_sequence.add_log_lines(
353            ["read packet: $QNonStop:1#00",
354             "send packet: $OK#00",
355             # stop is used to synchronize starting threads
356             "read packet: $c#63",
357             "send packet: $OK#00",
358             {"direction": "send", "regex": "%Stop:T.*"},
359             "read packet: $c#63",
360             "send packet: $OK#00",
361             "read packet: $?#00",
362             "send packet: $OK#00",
363             ], True)
364        self.expect_gdbremote_sequence()
365
366    @skipIfWindows
367    @add_test_categories(["llgs"])
368    def test_leave_nonstop(self):
369        self.build()
370        self.set_inferior_startup_launch()
371        procs = self.prep_debug_monitor_and_inferior(
372                inferior_args=["thread:new", "thread:new", "stop", "sleep:15"])
373        self.test_sequence.add_log_lines(
374            ["read packet: $QNonStop:1#00",
375             "send packet: $OK#00",
376             # stop is used to synchronize starting threads
377             "read packet: $c#63",
378             "send packet: $OK#00",
379             {"direction": "send", "regex": "%Stop:T.*"},
380             "read packet: $c#63",
381             "send packet: $OK#00",
382             # verify that the threads are running now
383             "read packet: $?#00",
384             "send packet: $OK#00",
385             "read packet: $QNonStop:0#00",
386             "send packet: $OK#00",
387             # we should issue some random request now to verify that the stub
388             # did not send stop reasons -- we may verify whether notification
389             # queue was cleared while at it
390             "read packet: $vStopped#00",
391             "send packet: $Eff#00",
392             "read packet: $?#00",
393             {"direction": "send", "regex": "[$]T.*"},
394             ], True)
395        self.expect_gdbremote_sequence()
396