1import gdbremote_testcase
2from lldbsuite.test.decorators import *
3from lldbsuite.test.lldbtest import *
4from lldbsuite.test import lldbutil
5
class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Packet-level tests for fork/vfork events, process selection and detach.

    Every test scripts the packet exchange it expects with
    test_sequence.add_log_lines() and replays it with
    expect_gdbremote_sequence().
    """

    # Template for the stop reply that reports a fork-like event: a "T05"
    # stop whose "thread:" field is "p<pid>.<tid>" and which carries a
    # "<variant>:p<pid>.<tid>" field.  The "{}" placeholder is filled in with
    # the variant name through str.format().
    fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
                  "{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
    # Names under which the four groups of fork_regex are captured.
    fork_capture = {1: "parent_pid", 2: "parent_tid",
                    3: "child_pid", 4: "child_tid"}
    # Matches a reply starting with "pid:<hex>;" and captures the pid.
    procinfo_regex = "[$]pid:([0-9a-f]+);.*"

    def _start_fork_test(self, inferior_args, variant="fork"):
        """Run the inferior until it reports a fork-like event.

        Builds the test program, prepares the debug monitor and inferior,
        requests "multiprocess+" and "<variant>-events+" through qSupported,
        asserts that the response contains "<variant>-events+", and then
        continues until a stop reply matching fork_regex is seen.  The test
        sequence has been reset when this returns.

        inferior_args -- list of arguments handed to the inferior.
        variant -- name of the event to wait for, e.g. "fork" or "vfork".

        Returns the tuple (parent_pid, parent_tid, child_pid, child_tid)
        holding the values captured from the stop reply, unconverted.
        """
        event_feature = "{}-events+".format(variant)

        self.build()
        self.prep_debug_monitor_and_inferior(inferior_args=inferior_args)
        self.add_qSupported_packets(["multiprocess+", event_feature])
        ret = self.expect_gdbremote_sequence()
        self.assertIn(event_feature, ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"direction": "send", "regex": self.fork_regex.format(variant),
             "capture": self.fork_capture},
        ], True)
        ret = self.expect_gdbremote_sequence()
        ids = tuple(ret[key] for key in ("parent_pid", "parent_tid",
                                         "child_pid", "child_tid"))
        self.reset_test_sequence()
        return ids

    @add_test_categories(["fork"])
    def test_fork_multithreaded(self):
        """Detach the forked child of an inferior run with two "thread:new"
        arguments, then send the "k" packet."""
        _, _, child_pid, _ = self._start_fork_test(
            ["thread:new"] * 2 + ["fork"])

        # detach the forked child
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            "read packet: $k#00",
        ], True)
        self.expect_gdbremote_sequence()

    def fork_and_detach_test(self, variant):
        """Wait for a `variant` event, detach the child and keep the parent.

        After the detach, checks that qC still reports the parent thread,
        that selecting the child fails with Eff and that selecting the
        parent succeeds.  The test sequence is reset before returning.

        variant -- "fork" or "vfork".

        Returns (parent_pid, parent_tid) so the caller can resume the parent.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self._start_fork_test([variant], variant))

        # detach the forked child
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            # verify that the current process is correct
            "read packet: $qC#00",
            "send packet: $QC{}#00".format(parent_tid),
            # verify that the correct processes are detached/available
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $OK#00",
        ], True)
        self.expect_gdbremote_sequence()
        self.reset_test_sequence()
        return parent_pid, parent_tid

    @add_test_categories(["fork"])
    def test_fork(self):
        """Detach a forked child and expect the resumed parent to report
        W00."""
        parent_pid, _ = self.fork_and_detach_test("fork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vfork(self):
        """Detach a vforked child; the resumed parent must first stop with
        a vforkdone report and then report W00."""
        parent_pid, parent_tid = self.fork_and_detach_test("vfork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"direction": "send",
             "regex": r"[$]T05thread:p{}[.]{}.*vforkdone.*".format(parent_pid,
                                                                   parent_tid),
             },
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    def fork_and_follow_test(self, variant):
        """Wait for a `variant` event, switch to the child and detach the
        parent.

        Checks afterwards that selecting the parent fails with Eff while the
        child can still be selected, then resumes the child and expects it
        to report W00.

        variant -- "fork" or "vfork".
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self._start_fork_test([variant], variant))

        # switch to the forked child
        self.test_sequence.add_log_lines([
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # detach the parent
            "read packet: $D;{}#00".format(parent_pid),
            "send packet: $OK#00",
            # verify that the correct processes are detached/available
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # then resume the child
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(child_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow(self):
        """Follow the child of a fork."""
        self.fork_and_follow_test("fork")

    @add_test_categories(["fork"])
    def test_vfork_follow(self):
        """Follow the child of a vfork."""
        self.fork_and_follow_test("vfork")

    @add_test_categories(["fork"])
    def test_select_wrong_pid(self):
        """Hg/Hc must accept the real pid.tid pair, answer E15 for a wrong
        tid and Eff for a wrong pid."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qProcessInfo#00",
            {"direction": "send", "regex": self.procinfo_regex,
             "capture": {1: "pid"}},
            "read packet: $qC#00",
            {"direction": "send", "regex": "[$]QC([0-9a-f]+)#.*",
             "capture": {1: "tid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        # The captures are hex text; convert them so that "+1" below yields
        # an id different from the real one.
        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
        self.reset_test_sequence()

        self.test_sequence.add_log_lines([
            # try switching to correct pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            # try switching to invalid tid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            # try switching to invalid pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_current(self):
        """After detaching the only process by pid, qC must answer E44."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qProcessInfo#00",
            {"direction": "send", "regex": self.procinfo_regex,
             "capture": {1: "pid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pid = ret["pid"]
        self.reset_test_sequence()

        # detach the process
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(pid),
            "send packet: $OK#00",
            "read packet: $qC#00",
            "send packet: $E44#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_all(self):
        """A bare D packet must detach both the parent and the forked child,
        so that neither can be selected afterwards."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self._start_fork_test(["fork"]))

        self.test_sequence.add_log_lines([
            # double-check our PIDs
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $OK#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # detach all processes
            "read packet: $D#00",
            "send packet: $OK#00",
            # verify that both PIDs are invalid now
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()