1import random
2
3from lldbsuite.test.decorators import *
4from lldbsuite.test.lldbtest import *
5
6from fork_testbase import GdbRemoteForkTestBase
7
8
class TestGdbRemoteFork(GdbRemoteForkTestBase):
    """gdb-remote protocol tests for inferiors that call fork()/vfork().

    Most scenarios are driven through helpers inherited from
    GdbRemoteForkTestBase (start_fork_test, fork_and_detach_test,
    resume_one_test, vkill_test, ...); the tests in this class supply the
    inferior arguments and the packet exchange that has to be verified.

    NOTE(review): in the log lines below, "read packet" entries appear to be
    the packets delivered to the stub, and "send packet" entries (or dicts
    with "direction": "send") the replies expected from it -- confirm
    against the test-sequence helper.
    """

    def setUp(self):
        """Run the base-class setup, then skip on Arm/AArch64 Linux."""
        GdbRemoteForkTestBase.setUp(self)
        if self.getPlatform() == "linux" and self.getArchitecture() in ['arm', 'aarch64']:
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    def _resume_both_and_collect_threads(self):
        """Fork, resume parent and child once each, and list all threads.

        Starts the inferior with ["fork", "thread:new", "stop"], then for the
        parent and the child in turn selects the thread with Hc, continues,
        and waits for a stop reply matching self.stop_regex.  Afterwards the
        thread list is collected and the test sequence is reset.

        Returns:
            The set of (pid, tid) pairs yielded by parse_threadinfo_packets();
            exactly four entries are asserted (callers format them with
            "{:x}", so they are expected to be integers).
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "stop"]))
        initial_pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        for pidtid in initial_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": self.stop_regex.format(*pidtid),
                  },
                 ], True)

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        return pidtids

    @staticmethod
    def _read_register_lines(pidtid, regno):
        """Build the log lines that select a thread and read one register.

        Args:
            pidtid: (pid, tid) pair of integers; formatted as hex into Hg.
            regno: integer register number; formatted as hex into the p
                packet.

        Returns:
            A new list of log lines.  The payload of the reply to the p
            packet is captured under the "data" key.
        """
        return [
            "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
            "send packet: $OK#00",
            "read packet: $p{:x}#00".format(regno),
            {"direction": "send",
             "regex": r"^[$](.+)#.*$",
             "capture": {1: "data"}},
        ]

    @add_test_categories(["fork"])
    def test_fork_multithreaded(self):
        """Fork after creating two threads, detach the child, then kill."""
        _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])

        # detach the forked child, then send the kill packet
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            "read packet: $k#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork(self):
        """After fork_and_detach_test("fork"), the parent runs to exit 0."""
        parent_pid, _ = self.fork_and_detach_test("fork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vfork(self):
        """After fork_and_detach_test("vfork"), expect vforkdone, then exit.

        The first continue must produce a stop reply for the parent thread
        that mentions "vforkdone"; the second continue must end with the
        parent's W00 exit reply.
        """
        parent_pid, parent_tid = self.fork_and_detach_test("vfork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"direction": "send",
             "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*"
                      .format(parent_pid, parent_tid),
             },
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow(self):
        """Run the shared fork_and_follow_test scenario for "fork"."""
        self.fork_and_follow_test("fork")

    @add_test_categories(["fork"])
    def test_vfork_follow(self):
        """Run the shared fork_and_follow_test scenario for "vfork"."""
        self.fork_and_follow_test("vfork")

    @add_test_categories(["fork"])
    def test_select_wrong_pid(self):
        """Hg/Hc accept the real pid.tid and reject a wrong tid or pid.

        A wrong tid is expected to yield E15 and a wrong pid Eff.
        """
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qC#00",
            # "[.]" pins the literal pid/tid separator instead of matching
            # any character
            {"direction": "send",
             "regex": r"[$]QCp([0-9a-f]+)[.]([0-9a-f]+)#.*",
             "capture": {1: "pid", 2: "tid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
        self.reset_test_sequence()

        self.test_sequence.add_log_lines([
            # try switching to correct pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            # try switching to invalid tid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            # try switching to invalid pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_current(self):
        """Detach the only process; a following qC must fail with E44."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qC#00",
            # "[.]" pins the literal pid/tid separator instead of matching
            # any character
            {"direction": "send",
             "regex": r"[$]QCp([0-9a-f]+)[.][0-9a-f]+#.*",
             "capture": {1: "pid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        # kept as the captured hex string; it is pasted into D verbatim
        pid = ret["pid"]
        self.reset_test_sequence()

        # detach the process
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(pid),
            "send packet: $OK#00",
            "read packet: $qC#00",
            "send packet: $E44#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_all(self):
        """Run the shared detach_all_test scenario."""
        self.detach_all_test()

    @add_test_categories(["fork"])
    def test_kill_all(self):
        """The k packet must produce X09 exit replies for parent and child.

        The two replies may arrive in either order, so the captured pids are
        compared as sets.
        """
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])

        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
        self.test_sequence.add_log_lines([
            # kill all processes
            "read packet: $k#00",
            {"direction": "send", "regex": exit_regex,
             "capture": {1: "pid1"}},
            {"direction": "send", "regex": exit_regex,
             "capture": {1: "pid2"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        self.assertEqual({ret["pid1"], ret["pid2"]},
                         {parent_pid, child_pid})

    @add_test_categories(["fork"])
    def test_vkill_child(self):
        """Run vkill_test with kill_child=True."""
        self.vkill_test(kill_child=True)

    @add_test_categories(["fork"])
    def test_vkill_parent(self):
        """Run vkill_test with kill_parent=True."""
        self.vkill_test(kill_parent=True)

    @add_test_categories(["fork"])
    def test_vkill_both(self):
        """Run vkill_test with both kill_parent and kill_child set."""
        self.vkill_test(kill_parent=True, kill_child=True)

    @add_test_categories(["fork"])
    def test_c_parent(self):
        """resume_one_test with run order parent, parent (no use_vCont)."""
        self.resume_one_test(run_order=["parent", "parent"])

    @add_test_categories(["fork"])
    def test_c_child(self):
        """resume_one_test with run order child, child (no use_vCont)."""
        self.resume_one_test(run_order=["child", "child"])

    @add_test_categories(["fork"])
    def test_c_parent_then_child(self):
        """resume_one_test: parent twice, then child twice (no use_vCont)."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"])

    @add_test_categories(["fork"])
    def test_c_child_then_parent(self):
        """resume_one_test: child twice, then parent twice (no use_vCont)."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"])

    @add_test_categories(["fork"])
    def test_c_interspersed(self):
        """resume_one_test alternating parent and child (no use_vCont)."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"])

    @add_test_categories(["fork"])
    def test_vCont_parent(self):
        """resume_one_test with run order parent, parent and use_vCont."""
        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_child(self):
        """resume_one_test with run order child, child and use_vCont."""
        self.resume_one_test(run_order=["child", "child"], use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_parent_then_child(self):
        """resume_one_test: parent twice, then child twice, with use_vCont."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_child_then_parent(self):
        """resume_one_test: child twice, then parent twice, with use_vCont."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_interspersed(self):
        """resume_one_test alternating parent and child, with use_vCont."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_two_processes(self):
        """A vCont naming threads of both processes must fail with E03."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "stop"]))

        self.test_sequence.add_log_lines([
            # try to resume both processes
            "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                parent_pid, parent_tid, child_pid, child_tid),
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_explicit(self):
        """A vCont addressed to p-1.-1 must fail with E03."""
        self.start_fork_test(["fork", "stop"])

        self.test_sequence.add_log_lines([
            # try to resume all processes explicitly
            "read packet: $vCont;c:p-1.-1#00",
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_implicit(self):
        """A vCont;c without any thread-id must fail with E03."""
        self.start_fork_test(["fork", "stop"])

        self.test_sequence.add_log_lines([
            # try to resume all processes implicitly
            "read packet: $vCont;c#00",
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_threadinfo(self):
        """Thread listing covers both processes and tracks new threads.

        Right after the fork the listing must contain exactly the parent and
        child threads.  Each process is then resumed in turn; every resume
        must add exactly one thread, belonging to the resumed process.
        Finally Hg must accept every listed pid.tid.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "stop"]))
        # pids/tids are hex strings here, hence the int(..., 16) below
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        prev_pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(prev_pidtids,
                         frozenset((int(pid, 16), int(tid, 16))
                                   for pid, tid in pidtids))
        self.reset_test_sequence()

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": self.stop_regex.format(*pidtid),
                  },
                 ], True)
            self.add_threadinfo_collection_packets()
            ret = self.expect_gdbremote_sequence()
            self.reset_test_sequence()
            new_pidtids = set(self.parse_threadinfo_packets(ret))
            added_pidtid = new_pidtids - prev_pidtids
            prev_pidtids = new_pidtids

            # verify that we've got exactly one new thread, and that
            # the PID matches
            self.assertEqual(len(added_pidtid), 1)
            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))

        # prev_pidtids now holds the most recent listing
        for pidtid in prev_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_memory_read_write(self):
        """Memory writes in parent and child must not affect each other.

        Both processes start with the same message at the same address.
        Round one reads the initial message from each process, overwrites it
        with the process' own name ("parent"/"child") and checks that the
        resumed process prints that name.  Round two re-reads both processes
        and checks that each still holds its own name.
        """
        self.build()
        INITIAL_DATA = "Initial message"
        self.prep_debug_monitor_and_inferior(
            inferior_args=["set-message:{}".format(INITIAL_DATA),
                           "get-data-address-hex:g_message",
                           "fork",
                           "print-message:",
                           "stop",
                           ])
        self.add_qSupported_packets(["multiprocess+",
                                     "fork-events+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("fork-events+", ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"type": "output_match",
             "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
             "capture": {1: "addr"}},
            {"direction": "send", "regex": self.fork_regex.format("fork"),
             "capture": self.fork_capture},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pidtids = {
            "parent": (ret["parent_pid"], ret["parent_tid"]),
            "child": (ret["child_pid"], ret["child_tid"]),
        }
        addr = ret["addr"]
        self.reset_test_sequence()

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # read the current memory contents
                 "read packet: $m{},{:x}#00".format(addr,
                                                    len(INITIAL_DATA) + 1),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 # write a new value
                 "read packet: $M{},{:x}:{}#00".format(addr,
                                                       len(name) + 1,
                                                       seven.hexlify(
                                                           name + "\0")),
                 "send packet: $OK#00",
                 # resume the process and wait for the trap
                 "read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"type": "output_match",
                  "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
                  "capture": {1: "printed_message"}},
                 {"direction": "send",
                  "regex": self.stop_regex.format(*pidtid),
                  },
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = seven.unhexlify(ret["data"])
            self.assertEqual(data, INITIAL_DATA + "\0")
            self.assertEqual(ret["printed_message"], name)
            self.reset_test_sequence()

        # we do the second round separately to make sure that initial data
        # is correctly preserved while writing into the first process

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # read the current memory contents
                 "read packet: $m{},{:x}#00".format(addr,
                                                    len(name) + 1),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 ], True)
            ret = self.expect_gdbremote_sequence()
            self.assertIsNotNone(ret.get("data"))
            data = seven.unhexlify(ret.get("data"))
            self.assertEqual(data, name + "\0")
            self.reset_test_sequence()

    @add_test_categories(["fork"])
    def test_register_read_write(self):
        """Register writes must stay private to the thread they target.

        For each of the four threads a register whose hex reply is at least
        8 characters long is picked and its value saved.  Each thread's
        register is then overwritten with a value not used by any other
        thread, checking on the way that the old value is still intact, and
        finally every thread is re-read to confirm its new value.
        """
        pidtids = self._resume_both_and_collect_threads()

        # first, save register values from all the threads
        thread_regs = {}
        for pidtid in pidtids:
            for regno in range(256):
                self.test_sequence.add_log_lines(
                    self._read_register_lines(pidtid, regno), True)
                ret = self.expect_gdbremote_sequence()
                data = ret.get("data")
                self.assertIsNotNone(data)
                # ignore registers shorter than 32 bits (this also catches
                # "Exx" errors)
                if len(data) >= 8:
                    break
            else:
                self.skipTest("no usable register found")
            thread_regs[pidtid] = (regno, data)

        vals = set(x[1] for x in thread_regs.values())
        # NB: cheap hack to make the loop below easier
        new_val = next(iter(vals))

        # then, start altering them and verify that we don't unexpectedly
        # change the value from another thread
        for pidtid in pidtids:
            regno, old_data = thread_regs[pidtid]
            old_val_length = len(old_data)
            # generate a unique new_val
            while new_val in vals:
                new_val = ('{{:0{}x}}'.format(old_val_length)
                           .format(random.getrandbits(old_val_length*4)))
            vals.add(new_val)

            self.test_sequence.add_log_lines(
                self._read_register_lines(pidtid, regno) + [
                    "read packet: $P{:x}={}#00".format(regno, new_val),
                    "send packet: $OK#00",
                ], True)
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            self.assertEqual(data, old_data)
            thread_regs[pidtid] = (regno, new_val)

        # finally, verify that new values took effect
        for pidtid in pidtids:
            regno, expected = thread_regs[pidtid]
            self.test_sequence.add_log_lines(
                self._read_register_lines(pidtid, regno), True)
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            self.assertEqual(data, expected)

    @add_test_categories(["fork"])
    def test_qC(self):
        """After Hg selects a thread, qC must report that same pid.tid."""
        pidtids = self._resume_both_and_collect_threads()

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $qC#00",
                 "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
                 ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_T(self):
        """The T packet accepts known threads and rejects unknown ones.

        Every listed thread must be accepted both with an explicit pid.tid
        and with a bare tid after Hg selected its process.  An unknown tid is
        expected to yield E02 and an unknown pid E01.
        """
        pidtids = self._resume_both_and_collect_threads()

        # ids one past the largest known pid/tid are taken as nonexistent
        max_pid = max(pid for pid, tid in pidtids)
        max_tid = max(tid for pid, tid in pidtids)
        bad_pidtids = (
            (max_pid, max_tid + 1, "E02"),
            (max_pid + 1, max_tid, "E01"),
            (max_pid + 1, max_tid + 1, "E01"),
        )

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                 # test explicit PID+TID
                 "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # test implicit PID via Hg
                 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $T{:x}#00".format(max_tid + 1),
                 "send packet: $E02#00",
                 "read packet: $T{:x}#00".format(pidtid[1]),
                 "send packet: $OK#00",
                 ], True)
        for pid, tid, expected in bad_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Tp{:x}.{:x}#00".format(pid, tid),
                 "send packet: ${}#00".format(expected),
                 ], True)
        self.expect_gdbremote_sequence()
554