1import random
2
3from lldbsuite.test.decorators import *
4from lldbsuite.test.lldbtest import *
5
6from fork_testbase import GdbRemoteForkTestBase
7
8
9class TestGdbRemoteFork(GdbRemoteForkTestBase):
10    @add_test_categories(["fork"])
11    def test_fork_multithreaded(self):
12        _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])
13
14        # detach the forked child
15        self.test_sequence.add_log_lines([
16            "read packet: $D;{}#00".format(child_pid),
17            "send packet: $OK#00",
18            "read packet: $k#00",
19        ], True)
20        self.expect_gdbremote_sequence()
21
22    @add_test_categories(["fork"])
23    def test_fork(self):
24        parent_pid, _ = self.fork_and_detach_test("fork")
25
26        # resume the parent
27        self.test_sequence.add_log_lines([
28            "read packet: $c#00",
29            "send packet: $W00;process:{}#00".format(parent_pid),
30        ], True)
31        self.expect_gdbremote_sequence()
32
33    @add_test_categories(["fork"])
34    def test_vfork(self):
35        parent_pid, parent_tid = self.fork_and_detach_test("vfork")
36
37        # resume the parent
38        self.test_sequence.add_log_lines([
39            "read packet: $c#00",
40            {"direction": "send",
41             "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*"
42                      .format(parent_pid, parent_tid),
43             },
44            "read packet: $c#00",
45            "send packet: $W00;process:{}#00".format(parent_pid),
46        ], True)
47        self.expect_gdbremote_sequence()
48
49    @add_test_categories(["fork"])
50    def test_fork_follow(self):
51        self.fork_and_follow_test("fork")
52
53    @add_test_categories(["fork"])
54    def test_vfork_follow(self):
55        self.fork_and_follow_test("vfork")
56
57    @add_test_categories(["fork"])
58    def test_select_wrong_pid(self):
59        self.build()
60        self.prep_debug_monitor_and_inferior()
61        self.add_qSupported_packets(["multiprocess+"])
62        ret = self.expect_gdbremote_sequence()
63        self.assertIn("multiprocess+", ret["qSupported_response"])
64        self.reset_test_sequence()
65
66        # get process pid
67        self.test_sequence.add_log_lines([
68            "read packet: $qC#00",
69            {"direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*",
70             "capture": {1: "pid", 2: "tid"}},
71        ], True)
72        ret = self.expect_gdbremote_sequence()
73        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
74        self.reset_test_sequence()
75
76        self.test_sequence.add_log_lines([
77            # try switching to correct pid
78            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
79            "send packet: $OK#00",
80            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
81            "send packet: $OK#00",
82            # try switching to invalid tid
83            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
84            "send packet: $E15#00",
85            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
86            "send packet: $E15#00",
87            # try switching to invalid pid
88            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
89            "send packet: $Eff#00",
90            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
91            "send packet: $Eff#00",
92        ], True)
93        self.expect_gdbremote_sequence()
94
95    @add_test_categories(["fork"])
96    def test_detach_current(self):
97        self.build()
98        self.prep_debug_monitor_and_inferior()
99        self.add_qSupported_packets(["multiprocess+"])
100        ret = self.expect_gdbremote_sequence()
101        self.assertIn("multiprocess+", ret["qSupported_response"])
102        self.reset_test_sequence()
103
104        # get process pid
105        self.test_sequence.add_log_lines([
106            "read packet: $qC#00",
107            {"direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*",
108             "capture": {1: "pid"}},
109        ], True)
110        ret = self.expect_gdbremote_sequence()
111        pid = ret["pid"]
112        self.reset_test_sequence()
113
114        # detach the process
115        self.test_sequence.add_log_lines([
116            "read packet: $D;{}#00".format(pid),
117            "send packet: $OK#00",
118            "read packet: $qC#00",
119            "send packet: $E44#00",
120        ], True)
121        self.expect_gdbremote_sequence()
122
123    @add_test_categories(["fork"])
124    def test_detach_all(self):
125        self.detach_all_test()
126
127    @add_test_categories(["fork"])
128    def test_kill_all(self):
129        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])
130
131        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
132        self.test_sequence.add_log_lines([
133            # kill all processes
134            "read packet: $k#00",
135            {"direction": "send", "regex": exit_regex,
136             "capture": {1: "pid1"}},
137            {"direction": "send", "regex": exit_regex,
138             "capture": {1: "pid2"}},
139        ], True)
140        ret = self.expect_gdbremote_sequence()
141        self.assertEqual(set([ret["pid1"], ret["pid2"]]),
142                         set([parent_pid, child_pid]))
143
144    @add_test_categories(["fork"])
145    def test_vkill_child(self):
146        self.vkill_test(kill_child=True)
147
148    @add_test_categories(["fork"])
149    def test_vkill_parent(self):
150        self.vkill_test(kill_parent=True)
151
152    @add_test_categories(["fork"])
153    def test_vkill_both(self):
154        self.vkill_test(kill_parent=True, kill_child=True)
155
156    @add_test_categories(["fork"])
157    def test_c_parent(self):
158        self.resume_one_test(run_order=["parent", "parent"])
159
160    @add_test_categories(["fork"])
161    def test_c_child(self):
162        self.resume_one_test(run_order=["child", "child"])
163
164    @add_test_categories(["fork"])
165    def test_c_parent_then_child(self):
166        self.resume_one_test(run_order=["parent", "parent", "child", "child"])
167
168    @add_test_categories(["fork"])
169    def test_c_child_then_parent(self):
170        self.resume_one_test(run_order=["child", "child", "parent", "parent"])
171
172    @add_test_categories(["fork"])
173    def test_c_interspersed(self):
174        self.resume_one_test(run_order=["parent", "child", "parent", "child"])
175
176    @add_test_categories(["fork"])
177    def test_vCont_parent(self):
178        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)
179
180    @add_test_categories(["fork"])
181    def test_vCont_child(self):
182        self.resume_one_test(run_order=["child", "child"], use_vCont=True)
183
184    @add_test_categories(["fork"])
185    def test_vCont_parent_then_child(self):
186        self.resume_one_test(run_order=["parent", "parent", "child", "child"],
187                             use_vCont=True)
188
189    @add_test_categories(["fork"])
190    def test_vCont_child_then_parent(self):
191        self.resume_one_test(run_order=["child", "child", "parent", "parent"],
192                             use_vCont=True)
193
194    @add_test_categories(["fork"])
195    def test_vCont_interspersed(self):
196        self.resume_one_test(run_order=["parent", "child", "parent", "child"],
197                             use_vCont=True)
198
199    @add_test_categories(["fork"])
200    def test_vCont_two_processes(self):
201        parent_pid, parent_tid, child_pid, child_tid = (
202            self.start_fork_test(["fork", "stop"]))
203
204        self.test_sequence.add_log_lines([
205            # try to resume both processes
206            "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
207                parent_pid, parent_tid, child_pid, child_tid),
208            "send packet: $E03#00",
209        ], True)
210        self.expect_gdbremote_sequence()
211
212    @add_test_categories(["fork"])
213    def test_vCont_all_processes_explicit(self):
214        self.start_fork_test(["fork", "stop"])
215
216        self.test_sequence.add_log_lines([
217            # try to resume all processes implicitly
218            "read packet: $vCont;c:p-1.-1#00",
219            "send packet: $E03#00",
220        ], True)
221        self.expect_gdbremote_sequence()
222
223    @add_test_categories(["fork"])
224    def test_vCont_all_processes_implicit(self):
225        self.start_fork_test(["fork", "stop"])
226
227        self.test_sequence.add_log_lines([
228            # try to resume all processes implicitly
229            "read packet: $vCont;c#00",
230            "send packet: $E03#00",
231        ], True)
232        self.expect_gdbremote_sequence()
233
234    @add_test_categories(["fork"])
235    def test_threadinfo(self):
236        parent_pid, parent_tid, child_pid, child_tid = (
237            self.start_fork_test(["fork", "thread:new", "stop"]))
238        pidtids = [
239            (parent_pid, parent_tid),
240            (child_pid, child_tid),
241        ]
242
243        self.add_threadinfo_collection_packets()
244        ret = self.expect_gdbremote_sequence()
245        prev_pidtids = set(self.parse_threadinfo_packets(ret))
246        self.assertEqual(prev_pidtids,
247                         frozenset((int(pid, 16), int(tid, 16))
248                                   for pid, tid in pidtids))
249        self.reset_test_sequence()
250
251        for pidtid in pidtids:
252            self.test_sequence.add_log_lines(
253                ["read packet: $Hcp{}.{}#00".format(*pidtid),
254                 "send packet: $OK#00",
255                 "read packet: $c#00",
256                 {"direction": "send",
257                  "regex": self.stop_regex.format(*pidtid),
258                  },
259                 ], True)
260            self.add_threadinfo_collection_packets()
261            ret = self.expect_gdbremote_sequence()
262            self.reset_test_sequence()
263            new_pidtids = set(self.parse_threadinfo_packets(ret))
264            added_pidtid = new_pidtids - prev_pidtids
265            prev_pidtids = new_pidtids
266
267            # verify that we've got exactly one new thread, and that
268            # the PID matches
269            self.assertEqual(len(added_pidtid), 1)
270            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))
271
272        for pidtid in new_pidtids:
273            self.test_sequence.add_log_lines(
274                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
275                 "send packet: $OK#00",
276                 ], True)
277        self.expect_gdbremote_sequence()
278
279    @add_test_categories(["fork"])
280    def test_memory_read_write(self):
281        self.build()
282        INITIAL_DATA = "Initial message"
283        self.prep_debug_monitor_and_inferior(
284            inferior_args=["set-message:{}".format(INITIAL_DATA),
285                           "get-data-address-hex:g_message",
286                           "fork",
287                           "print-message:",
288                           "stop",
289                           ])
290        self.add_qSupported_packets(["multiprocess+",
291                                     "fork-events+"])
292        ret = self.expect_gdbremote_sequence()
293        self.assertIn("fork-events+", ret["qSupported_response"])
294        self.reset_test_sequence()
295
296        # continue and expect fork
297        self.test_sequence.add_log_lines([
298            "read packet: $c#00",
299            {"type": "output_match",
300             "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
301             "capture": {1: "addr"}},
302            {"direction": "send", "regex": self.fork_regex.format("fork"),
303             "capture": self.fork_capture},
304        ], True)
305        ret = self.expect_gdbremote_sequence()
306        pidtids = {
307            "parent": (ret["parent_pid"], ret["parent_tid"]),
308            "child": (ret["child_pid"], ret["child_tid"]),
309        }
310        addr = ret["addr"]
311        self.reset_test_sequence()
312
313        for name, pidtid in pidtids.items():
314            self.test_sequence.add_log_lines(
315                ["read packet: $Hgp{}.{}#00".format(*pidtid),
316                 "send packet: $OK#00",
317                 # read the current memory contents
318                 "read packet: $m{},{:x}#00".format(addr,
319                                                    len(INITIAL_DATA) + 1),
320                 {"direction": "send",
321                  "regex": r"^[$](.+)#.*$",
322                  "capture": {1: "data"}},
323                 # write a new value
324                 "read packet: $M{},{:x}:{}#00".format(addr,
325                                                       len(name) + 1,
326                                                       seven.hexlify(
327                                                           name + "\0")),
328                 "send packet: $OK#00",
329                 # resume the process and wait for the trap
330                 "read packet: $Hcp{}.{}#00".format(*pidtid),
331                 "send packet: $OK#00",
332                 "read packet: $c#00",
333                 {"type": "output_match",
334                  "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
335                  "capture": {1: "printed_message"}},
336                 {"direction": "send",
337                  "regex": self.stop_regex.format(*pidtid),
338                  },
339                 ], True)
340            ret = self.expect_gdbremote_sequence()
341            data = seven.unhexlify(ret["data"])
342            self.assertEqual(data, INITIAL_DATA + "\0")
343            self.assertEqual(ret["printed_message"], name);
344            self.reset_test_sequence()
345
346        # we do the second round separately to make sure that initial data
347        # is correctly preserved while writing into the first process
348
349        for name, pidtid in pidtids.items():
350            self.test_sequence.add_log_lines(
351                ["read packet: $Hgp{}.{}#00".format(*pidtid),
352                 "send packet: $OK#00",
353                 # read the current memory contents
354                 "read packet: $m{},{:x}#00".format(addr,
355                                                    len(name) + 1),
356                 {"direction": "send",
357                  "regex": r"^[$](.+)#.*$",
358                  "capture": {1: "data"}},
359                 ], True)
360            ret = self.expect_gdbremote_sequence()
361            self.assertIsNotNone(ret.get("data"))
362            data = seven.unhexlify(ret.get("data"))
363            self.assertEqual(data, name + "\0")
364            self.reset_test_sequence()
365
366    @add_test_categories(["fork"])
367    def test_register_read_write(self):
368        parent_pid, parent_tid, child_pid, child_tid = (
369            self.start_fork_test(["fork", "thread:new", "stop"]))
370        pidtids = [
371            (parent_pid, parent_tid),
372            (child_pid, child_tid),
373        ]
374
375        for pidtid in pidtids:
376            self.test_sequence.add_log_lines(
377                ["read packet: $Hcp{}.{}#00".format(*pidtid),
378                 "send packet: $OK#00",
379                 "read packet: $c#00",
380                 {"direction": "send",
381                  "regex": self.stop_regex.format(*pidtid),
382                  },
383                 ], True)
384
385        self.add_threadinfo_collection_packets()
386        ret = self.expect_gdbremote_sequence()
387        self.reset_test_sequence()
388
389        pidtids = set(self.parse_threadinfo_packets(ret))
390        self.assertEqual(len(pidtids), 4)
391        # first, save register values from all the threads
392        thread_regs = {}
393        for pidtid in pidtids:
394            for regno in range(256):
395                self.test_sequence.add_log_lines(
396                    ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
397                     "send packet: $OK#00",
398                     "read packet: $p{:x}#00".format(regno),
399                     {"direction": "send",
400                      "regex": r"^[$](.+)#.*$",
401                      "capture": {1: "data"}},
402                     ], True)
403                ret = self.expect_gdbremote_sequence()
404                data = ret.get("data")
405                self.assertIsNotNone(data)
406                # ignore registers shorter than 32 bits (this also catches
407                # "Exx" errors)
408                if len(data) >= 8:
409                    break
410            else:
411                self.skipTest("no usable register found")
412            thread_regs[pidtid] = (regno, data)
413
414        vals = set(x[1] for x in thread_regs.values())
415        # NB: cheap hack to make the loop below easier
416        new_val = next(iter(vals))
417
418        # then, start altering them and verify that we don't unexpectedly
419        # change the value from another thread
420        for pidtid in pidtids:
421            old_val = thread_regs[pidtid]
422            regno = old_val[0]
423            old_val_length = len(old_val[1])
424            # generate a unique new_val
425            while new_val in vals:
426                new_val = ('{{:0{}x}}'.format(old_val_length)
427                           .format(random.getrandbits(old_val_length*4)))
428            vals.add(new_val)
429
430            self.test_sequence.add_log_lines(
431                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
432                 "send packet: $OK#00",
433                 "read packet: $p{:x}#00".format(regno),
434                 {"direction": "send",
435                  "regex": r"^[$](.+)#.*$",
436                  "capture": {1: "data"}},
437                 "read packet: $P{:x}={}#00".format(regno, new_val),
438                 "send packet: $OK#00",
439                 ], True)
440            ret = self.expect_gdbremote_sequence()
441            data = ret.get("data")
442            self.assertIsNotNone(data)
443            self.assertEqual(data, old_val[1])
444            thread_regs[pidtid] = (regno, new_val)
445
446        # finally, verify that new values took effect
447        for pidtid in pidtids:
448            old_val = thread_regs[pidtid]
449            self.test_sequence.add_log_lines(
450                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
451                 "send packet: $OK#00",
452                 "read packet: $p{:x}#00".format(old_val[0]),
453                 {"direction": "send",
454                  "regex": r"^[$](.+)#.*$",
455                  "capture": {1: "data"}},
456                 ], True)
457            ret = self.expect_gdbremote_sequence()
458            data = ret.get("data")
459            self.assertIsNotNone(data)
460            self.assertEqual(data, old_val[1])
461
462    @add_test_categories(["fork"])
463    def test_qC(self):
464        parent_pid, parent_tid, child_pid, child_tid = (
465            self.start_fork_test(["fork", "thread:new", "stop"]))
466        pidtids = [
467            (parent_pid, parent_tid),
468            (child_pid, child_tid),
469        ]
470
471        for pidtid in pidtids:
472            self.test_sequence.add_log_lines(
473                ["read packet: $Hcp{}.{}#00".format(*pidtid),
474                 "send packet: $OK#00",
475                 "read packet: $c#00",
476                 {"direction": "send",
477                  "regex": self.stop_regex.format(*pidtid),
478                  },
479                 ], True)
480
481        self.add_threadinfo_collection_packets()
482        ret = self.expect_gdbremote_sequence()
483        self.reset_test_sequence()
484
485        pidtids = set(self.parse_threadinfo_packets(ret))
486        self.assertEqual(len(pidtids), 4)
487        for pidtid in pidtids:
488            self.test_sequence.add_log_lines(
489                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
490                 "send packet: $OK#00",
491                 "read packet: $qC#00",
492                 "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
493                 ], True)
494        self.expect_gdbremote_sequence()
495
496    @add_test_categories(["fork"])
497    def test_T(self):
498        parent_pid, parent_tid, child_pid, child_tid = (
499            self.start_fork_test(["fork", "thread:new", "stop"]))
500        pidtids = [
501            (parent_pid, parent_tid),
502            (child_pid, child_tid),
503        ]
504
505        for pidtid in pidtids:
506            self.test_sequence.add_log_lines(
507                ["read packet: $Hcp{}.{}#00".format(*pidtid),
508                 "send packet: $OK#00",
509                 "read packet: $c#00",
510                 {"direction": "send",
511                  "regex": self.stop_regex.format(*pidtid),
512                  },
513                 ], True)
514
515        self.add_threadinfo_collection_packets()
516        ret = self.expect_gdbremote_sequence()
517        self.reset_test_sequence()
518
519        pidtids = set(self.parse_threadinfo_packets(ret))
520        self.assertEqual(len(pidtids), 4)
521        max_pid = max(pid for pid, tid in pidtids)
522        max_tid = max(tid for pid, tid in pidtids)
523        bad_pidtids = (
524            (max_pid, max_tid + 1, "E02"),
525            (max_pid + 1, max_tid, "E01"),
526            (max_pid + 1, max_tid + 1, "E01"),
527        )
528
529        for pidtid in pidtids:
530            self.test_sequence.add_log_lines(
531                [
532                 # test explicit PID+TID
533                 "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
534                 "send packet: $OK#00",
535                 # test implicit PID via Hg
536                 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
537                 "send packet: $OK#00",
538                 "read packet: $T{:x}#00".format(max_tid + 1),
539                 "send packet: $E02#00",
540                 "read packet: $T{:x}#00".format(pidtid[1]),
541                 "send packet: $OK#00",
542                 ], True)
543        for pid, tid, expected in bad_pidtids:
544            self.test_sequence.add_log_lines(
545                ["read packet: $Tp{:x}.{:x}#00".format(pid, tid),
546                 "send packet: ${}#00".format(expected),
547                 ], True)
548        self.expect_gdbremote_sequence()
549