1import random
2
3import gdbremote_testcase
4from lldbsuite.test.decorators import *
5from lldbsuite.test.lldbtest import *
6from lldbsuite.test import lldbutil
7
8class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
9
10    fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
11                  "{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
12    fork_capture = {1: "parent_pid", 2: "parent_tid",
13                    3: "child_pid", 4: "child_tid"}
14
15    def start_fork_test(self, args, variant="fork"):
16        self.build()
17        self.prep_debug_monitor_and_inferior(inferior_args=args)
18        self.add_qSupported_packets(["multiprocess+",
19                                     "{}-events+".format(variant)])
20        ret = self.expect_gdbremote_sequence()
21        self.assertIn("{}-events+".format(variant), ret["qSupported_response"])
22        self.reset_test_sequence()
23
24        # continue and expect fork
25        self.test_sequence.add_log_lines([
26            "read packet: $c#00",
27            {"direction": "send", "regex": self.fork_regex.format(variant),
28             "capture": self.fork_capture},
29        ], True)
30        ret = self.expect_gdbremote_sequence()
31        self.reset_test_sequence()
32
33        return tuple(ret[x] for x in ("parent_pid", "parent_tid",
34                                      "child_pid", "child_tid"))
35
36    @add_test_categories(["fork"])
37    def test_fork_multithreaded(self):
38        _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])
39
40        # detach the forked child
41        self.test_sequence.add_log_lines([
42            "read packet: $D;{}#00".format(child_pid),
43            "send packet: $OK#00",
44            "read packet: $k#00",
45        ], True)
46        self.expect_gdbremote_sequence()
47
48    def fork_and_detach_test(self, variant):
49        parent_pid, parent_tid, child_pid, child_tid = (
50            self.start_fork_test([variant], variant))
51
52        # detach the forked child
53        self.test_sequence.add_log_lines([
54            "read packet: $D;{}#00".format(child_pid),
55            "send packet: $OK#00",
56            # verify that the current process is correct
57            "read packet: $qC#00",
58            "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
59            # verify that the correct processes are detached/available
60            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
61            "send packet: $Eff#00",
62            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
63            "send packet: $OK#00",
64        ], True)
65        self.expect_gdbremote_sequence()
66        self.reset_test_sequence()
67        return parent_pid, parent_tid
68
69    @add_test_categories(["fork"])
70    def test_fork(self):
71        parent_pid, _ = self.fork_and_detach_test("fork")
72
73        # resume the parent
74        self.test_sequence.add_log_lines([
75            "read packet: $c#00",
76            "send packet: $W00;process:{}#00".format(parent_pid),
77        ], True)
78        self.expect_gdbremote_sequence()
79
80    @add_test_categories(["fork"])
81    def test_vfork(self):
82        parent_pid, parent_tid = self.fork_and_detach_test("vfork")
83
84        # resume the parent
85        self.test_sequence.add_log_lines([
86            "read packet: $c#00",
87            {"direction": "send",
88             "regex": r"[$]T05thread:p{}[.]{}.*vforkdone.*".format(parent_pid,
89                                                                   parent_tid),
90             },
91            "read packet: $c#00",
92            "send packet: $W00;process:{}#00".format(parent_pid),
93        ], True)
94        self.expect_gdbremote_sequence()
95
96    def fork_and_follow_test(self, variant):
97        parent_pid, parent_tid, child_pid, child_tid = (
98            self.start_fork_test([variant], variant))
99
100        # switch to the forked child
101        self.test_sequence.add_log_lines([
102            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
103            "send packet: $OK#00",
104            "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
105            "send packet: $OK#00",
106            # detach the parent
107            "read packet: $D;{}#00".format(parent_pid),
108            "send packet: $OK#00",
109            # verify that the correct processes are detached/available
110            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
111            "send packet: $Eff#00",
112            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
113            "send packet: $OK#00",
114            # then resume the child
115            "read packet: $c#00",
116            "send packet: $W00;process:{}#00".format(child_pid),
117        ], True)
118        self.expect_gdbremote_sequence()
119
120    @add_test_categories(["fork"])
121    def test_fork_follow(self):
122        self.fork_and_follow_test("fork")
123
124    @add_test_categories(["fork"])
125    def test_vfork_follow(self):
126        self.fork_and_follow_test("vfork")
127
128    @add_test_categories(["fork"])
129    def test_select_wrong_pid(self):
130        self.build()
131        self.prep_debug_monitor_and_inferior()
132        self.add_qSupported_packets(["multiprocess+"])
133        ret = self.expect_gdbremote_sequence()
134        self.assertIn("multiprocess+", ret["qSupported_response"])
135        self.reset_test_sequence()
136
137        # get process pid
138        self.test_sequence.add_log_lines([
139            "read packet: $qC#00",
140            {"direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*",
141             "capture": {1: "pid", 2: "tid"}},
142        ], True)
143        ret = self.expect_gdbremote_sequence()
144        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
145        self.reset_test_sequence()
146
147        self.test_sequence.add_log_lines([
148            # try switching to correct pid
149            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
150            "send packet: $OK#00",
151            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
152            "send packet: $OK#00",
153            # try switching to invalid tid
154            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
155            "send packet: $E15#00",
156            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
157            "send packet: $E15#00",
158            # try switching to invalid pid
159            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
160            "send packet: $Eff#00",
161            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
162            "send packet: $Eff#00",
163        ], True)
164        self.expect_gdbremote_sequence()
165
166    @add_test_categories(["fork"])
167    def test_detach_current(self):
168        self.build()
169        self.prep_debug_monitor_and_inferior()
170        self.add_qSupported_packets(["multiprocess+"])
171        ret = self.expect_gdbremote_sequence()
172        self.assertIn("multiprocess+", ret["qSupported_response"])
173        self.reset_test_sequence()
174
175        # get process pid
176        self.test_sequence.add_log_lines([
177            "read packet: $qC#00",
178            {"direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*",
179             "capture": {1: "pid"}},
180        ], True)
181        ret = self.expect_gdbremote_sequence()
182        pid = ret["pid"]
183        self.reset_test_sequence()
184
185        # detach the process
186        self.test_sequence.add_log_lines([
187            "read packet: $D;{}#00".format(pid),
188            "send packet: $OK#00",
189            "read packet: $qC#00",
190            "send packet: $E44#00",
191        ], True)
192        self.expect_gdbremote_sequence()
193
194    @add_test_categories(["fork"])
195    def test_detach_all(self):
196        parent_pid, parent_tid, child_pid, child_tid = (
197            self.start_fork_test(["fork"]))
198
199        self.test_sequence.add_log_lines([
200            # double-check our PIDs
201            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
202            "send packet: $OK#00",
203            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
204            "send packet: $OK#00",
205            # detach all processes
206            "read packet: $D#00",
207            "send packet: $OK#00",
208            # verify that both PIDs are invalid now
209            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
210            "send packet: $Eff#00",
211            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
212            "send packet: $Eff#00",
213        ], True)
214        self.expect_gdbremote_sequence()
215
216    @add_test_categories(["fork"])
217    def test_kill_all(self):
218        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])
219
220        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
221        self.test_sequence.add_log_lines([
222            # kill all processes
223            "read packet: $k#00",
224            {"direction": "send", "regex": exit_regex,
225             "capture": {1: "pid1"}},
226            {"direction": "send", "regex": exit_regex,
227             "capture": {1: "pid2"}},
228        ], True)
229        ret = self.expect_gdbremote_sequence()
230        self.assertEqual(set([ret["pid1"], ret["pid2"]]),
231                         set([parent_pid, child_pid]))
232
233    def vkill_test(self, kill_parent=False, kill_child=False):
234        assert kill_parent or kill_child
235        parent_pid, parent_tid, child_pid, child_tid = (
236            self.start_fork_test(["fork"]))
237
238        if kill_parent:
239            self.test_sequence.add_log_lines([
240                # kill the process
241                "read packet: $vKill;{}#00".format(parent_pid),
242                "send packet: $OK#00",
243            ], True)
244        if kill_child:
245            self.test_sequence.add_log_lines([
246                # kill the process
247                "read packet: $vKill;{}#00".format(child_pid),
248                "send packet: $OK#00",
249            ], True)
250        self.test_sequence.add_log_lines([
251            # check child PID/TID
252            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
253            "send packet: ${}#00".format("Eff" if kill_child else "OK"),
254            # check parent PID/TID
255            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
256            "send packet: ${}#00".format("Eff" if kill_parent else "OK"),
257        ], True)
258        self.expect_gdbremote_sequence()
259
260    @add_test_categories(["fork"])
261    def test_vkill_child(self):
262        self.vkill_test(kill_child=True)
263
264    @add_test_categories(["fork"])
265    def test_vkill_parent(self):
266        self.vkill_test(kill_parent=True)
267
268    @add_test_categories(["fork"])
269    def test_vkill_both(self):
270        self.vkill_test(kill_parent=True, kill_child=True)
271
272    def resume_one_test(self, run_order, use_vCont=False):
273        parent_pid, parent_tid, child_pid, child_tid = (
274            self.start_fork_test(["fork", "trap"]))
275
276        parent_expect = [
277            "[$]T05thread:p{}.{};.*".format(parent_pid, parent_tid),
278            "[$]W00;process:{}#.*".format(parent_pid),
279        ]
280        child_expect = [
281            "[$]T05thread:p{}.{};.*".format(child_pid, child_tid),
282            "[$]W00;process:{}#.*".format(child_pid),
283        ]
284
285        for x in run_order:
286            if x == "parent":
287                pidtid = (parent_pid, parent_tid)
288                expect = parent_expect.pop(0)
289            elif x == "child":
290                pidtid = (child_pid, child_tid)
291                expect = child_expect.pop(0)
292            else:
293                assert False, "unexpected x={}".format(x)
294
295            if use_vCont:
296                self.test_sequence.add_log_lines([
297                    # continue the selected process
298                    "read packet: $vCont;c:p{}.{}#00".format(*pidtid),
299                ], True)
300            else:
301                self.test_sequence.add_log_lines([
302                    # continue the selected process
303                    "read packet: $Hcp{}.{}#00".format(*pidtid),
304                    "send packet: $OK#00",
305                    "read packet: $c#00",
306                ], True)
307            self.test_sequence.add_log_lines([
308                {"direction": "send", "regex": expect},
309            ], True)
310            # if at least one process remained, check both PIDs
311            if parent_expect or child_expect:
312                self.test_sequence.add_log_lines([
313                    "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
314                    "send packet: ${}#00".format("OK" if parent_expect else "Eff"),
315                    "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
316                    "send packet: ${}#00".format("OK" if child_expect else "Eff"),
317                ], True)
318        self.expect_gdbremote_sequence()
319
320    @add_test_categories(["fork"])
321    def test_c_parent(self):
322        self.resume_one_test(run_order=["parent", "parent"])
323
324    @add_test_categories(["fork"])
325    def test_c_child(self):
326        self.resume_one_test(run_order=["child", "child"])
327
328    @add_test_categories(["fork"])
329    def test_c_parent_then_child(self):
330        self.resume_one_test(run_order=["parent", "parent", "child", "child"])
331
332    @add_test_categories(["fork"])
333    def test_c_child_then_parent(self):
334        self.resume_one_test(run_order=["child", "child", "parent", "parent"])
335
336    @add_test_categories(["fork"])
337    def test_c_interspersed(self):
338        self.resume_one_test(run_order=["parent", "child", "parent", "child"])
339
340    @add_test_categories(["fork"])
341    def test_vCont_parent(self):
342        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)
343
344    @add_test_categories(["fork"])
345    def test_vCont_child(self):
346        self.resume_one_test(run_order=["child", "child"], use_vCont=True)
347
348    @add_test_categories(["fork"])
349    def test_vCont_parent_then_child(self):
350        self.resume_one_test(run_order=["parent", "parent", "child", "child"],
351                             use_vCont=True)
352
353    @add_test_categories(["fork"])
354    def test_vCont_child_then_parent(self):
355        self.resume_one_test(run_order=["child", "child", "parent", "parent"],
356                             use_vCont=True)
357
358    @add_test_categories(["fork"])
359    def test_vCont_interspersed(self):
360        self.resume_one_test(run_order=["parent", "child", "parent", "child"],
361                             use_vCont=True)
362
363    @add_test_categories(["fork"])
364    def test_vCont_two_processes(self):
365        parent_pid, parent_tid, child_pid, child_tid = (
366            self.start_fork_test(["fork", "trap"]))
367
368        self.test_sequence.add_log_lines([
369            # try to resume both processes
370            "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
371                parent_pid, parent_tid, child_pid, child_tid),
372            "send packet: $E03#00",
373        ], True)
374        self.expect_gdbremote_sequence()
375
376    @add_test_categories(["fork"])
377    def test_vCont_all_processes_explicit(self):
378        self.start_fork_test(["fork", "trap"])
379
380        self.test_sequence.add_log_lines([
381            # try to resume all processes implicitly
382            "read packet: $vCont;c:p-1.-1#00",
383            "send packet: $E03#00",
384        ], True)
385        self.expect_gdbremote_sequence()
386
387    @add_test_categories(["fork"])
388    def test_vCont_all_processes_implicit(self):
389        self.start_fork_test(["fork", "trap"])
390
391        self.test_sequence.add_log_lines([
392            # try to resume all processes implicitly
393            "read packet: $vCont;c#00",
394            "send packet: $E03#00",
395        ], True)
396        self.expect_gdbremote_sequence()
397
398    @add_test_categories(["fork"])
399    def test_threadinfo(self):
400        parent_pid, parent_tid, child_pid, child_tid = (
401            self.start_fork_test(["fork", "thread:new", "trap"]))
402        pidtids = [
403            (parent_pid, parent_tid),
404            (child_pid, child_tid),
405        ]
406
407        self.add_threadinfo_collection_packets()
408        ret = self.expect_gdbremote_sequence()
409        prev_pidtids = set(self.parse_threadinfo_packets(ret))
410        self.assertEqual(prev_pidtids,
411                         frozenset((int(pid, 16), int(tid, 16))
412                                   for pid, tid in pidtids))
413        self.reset_test_sequence()
414
415        for pidtid in pidtids:
416            self.test_sequence.add_log_lines(
417                ["read packet: $Hcp{}.{}#00".format(*pidtid),
418                 "send packet: $OK#00",
419                 "read packet: $c#00",
420                 {"direction": "send",
421                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
422                  },
423                 ], True)
424            self.add_threadinfo_collection_packets()
425            ret = self.expect_gdbremote_sequence()
426            self.reset_test_sequence()
427            new_pidtids = set(self.parse_threadinfo_packets(ret))
428            added_pidtid = new_pidtids - prev_pidtids
429            prev_pidtids = new_pidtids
430
431            # verify that we've got exactly one new thread, and that
432            # the PID matches
433            self.assertEqual(len(added_pidtid), 1)
434            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))
435
436        for pidtid in new_pidtids:
437            self.test_sequence.add_log_lines(
438                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
439                 "send packet: $OK#00",
440                 ], True)
441        self.expect_gdbremote_sequence()
442
443    @add_test_categories(["fork"])
444    def test_memory_read_write(self):
445        self.build()
446        INITIAL_DATA = "Initial message"
447        self.prep_debug_monitor_and_inferior(
448            inferior_args=["set-message:{}".format(INITIAL_DATA),
449                           "get-data-address-hex:g_message",
450                           "fork",
451                           "print-message:",
452                           "trap",
453                           ])
454        self.add_qSupported_packets(["multiprocess+",
455                                     "fork-events+"])
456        ret = self.expect_gdbremote_sequence()
457        self.assertIn("fork-events+", ret["qSupported_response"])
458        self.reset_test_sequence()
459
460        # continue and expect fork
461        self.test_sequence.add_log_lines([
462            "read packet: $c#00",
463            {"type": "output_match",
464             "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
465             "capture": {1: "addr"}},
466            {"direction": "send", "regex": self.fork_regex.format("fork"),
467             "capture": self.fork_capture},
468        ], True)
469        ret = self.expect_gdbremote_sequence()
470        pidtids = {
471            "parent": (ret["parent_pid"], ret["parent_tid"]),
472            "child": (ret["child_pid"], ret["child_tid"]),
473        }
474        addr = ret["addr"]
475        self.reset_test_sequence()
476
477        for name, pidtid in pidtids.items():
478            self.test_sequence.add_log_lines(
479                ["read packet: $Hgp{}.{}#00".format(*pidtid),
480                 "send packet: $OK#00",
481                 # read the current memory contents
482                 "read packet: $m{},{:x}#00".format(addr,
483                                                    len(INITIAL_DATA) + 1),
484                 {"direction": "send",
485                  "regex": r"^[$](.+)#.*$",
486                  "capture": {1: "data"}},
487                 # write a new value
488                 "read packet: $M{},{:x}:{}#00".format(addr,
489                                                       len(name) + 1,
490                                                       seven.hexlify(
491                                                           name + "\0")),
492                 "send packet: $OK#00",
493                 # resume the process and wait for the trap
494                 "read packet: $Hcp{}.{}#00".format(*pidtid),
495                 "send packet: $OK#00",
496                 "read packet: $c#00",
497                 {"type": "output_match",
498                  "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
499                  "capture": {1: "printed_message"}},
500                 {"direction": "send",
501                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
502                  },
503                 ], True)
504            ret = self.expect_gdbremote_sequence()
505            data = seven.unhexlify(ret["data"])
506            self.assertEqual(data, INITIAL_DATA + "\0")
507            self.assertEqual(ret["printed_message"], name);
508            self.reset_test_sequence()
509
510        # we do the second round separately to make sure that initial data
511        # is correctly preserved while writing into the first process
512
513        for name, pidtid in pidtids.items():
514            self.test_sequence.add_log_lines(
515                ["read packet: $Hgp{}.{}#00".format(*pidtid),
516                 "send packet: $OK#00",
517                 # read the current memory contents
518                 "read packet: $m{},{:x}#00".format(addr,
519                                                    len(name) + 1),
520                 {"direction": "send",
521                  "regex": r"^[$](.+)#.*$",
522                  "capture": {1: "data"}},
523                 ], True)
524            ret = self.expect_gdbremote_sequence()
525            self.assertIsNotNone(ret.get("data"))
526            data = seven.unhexlify(ret.get("data"))
527            self.assertEqual(data, name + "\0")
528            self.reset_test_sequence()
529
530    @add_test_categories(["fork"])
531    def test_register_read_write(self):
532        parent_pid, parent_tid, child_pid, child_tid = (
533            self.start_fork_test(["fork", "thread:new", "trap"]))
534        pidtids = [
535            (parent_pid, parent_tid),
536            (child_pid, child_tid),
537        ]
538
539        for pidtid in pidtids:
540            self.test_sequence.add_log_lines(
541                ["read packet: $Hcp{}.{}#00".format(*pidtid),
542                 "send packet: $OK#00",
543                 "read packet: $c#00",
544                 {"direction": "send",
545                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
546                  },
547                 ], True)
548
549        self.add_threadinfo_collection_packets()
550        ret = self.expect_gdbremote_sequence()
551        self.reset_test_sequence()
552
553        pidtids = set(self.parse_threadinfo_packets(ret))
554        self.assertEqual(len(pidtids), 4)
555        # first, save register values from all the threads
556        thread_regs = {}
557        for pidtid in pidtids:
558            for regno in range(256):
559                self.test_sequence.add_log_lines(
560                    ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
561                     "send packet: $OK#00",
562                     "read packet: $p{:x}#00".format(regno),
563                     {"direction": "send",
564                      "regex": r"^[$](.+)#.*$",
565                      "capture": {1: "data"}},
566                     ], True)
567                ret = self.expect_gdbremote_sequence()
568                data = ret.get("data")
569                self.assertIsNotNone(data)
570                # ignore registers shorter than 32 bits (this also catches
571                # "Exx" errors)
572                if len(data) >= 8:
573                    break
574            else:
575                self.skipTest("no usable register found")
576            thread_regs[pidtid] = (regno, data)
577
578        vals = set(x[1] for x in thread_regs.values())
579        # NB: cheap hack to make the loop below easier
580        new_val = next(iter(vals))
581
582        # then, start altering them and verify that we don't unexpectedly
583        # change the value from another thread
584        for pidtid in pidtids:
585            old_val = thread_regs[pidtid]
586            regno = old_val[0]
587            old_val_length = len(old_val[1])
588            # generate a unique new_val
589            while new_val in vals:
590                new_val = ('{{:0{}x}}'.format(old_val_length)
591                           .format(random.getrandbits(old_val_length*4)))
592            vals.add(new_val)
593
594            self.test_sequence.add_log_lines(
595                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
596                 "send packet: $OK#00",
597                 "read packet: $p{:x}#00".format(regno),
598                 {"direction": "send",
599                  "regex": r"^[$](.+)#.*$",
600                  "capture": {1: "data"}},
601                 "read packet: $P{:x}={}#00".format(regno, new_val),
602                 "send packet: $OK#00",
603                 ], True)
604            ret = self.expect_gdbremote_sequence()
605            data = ret.get("data")
606            self.assertIsNotNone(data)
607            self.assertEqual(data, old_val[1])
608            thread_regs[pidtid] = (regno, new_val)
609
610        # finally, verify that new values took effect
611        for pidtid in pidtids:
612            old_val = thread_regs[pidtid]
613            self.test_sequence.add_log_lines(
614                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
615                 "send packet: $OK#00",
616                 "read packet: $p{:x}#00".format(old_val[0]),
617                 {"direction": "send",
618                  "regex": r"^[$](.+)#.*$",
619                  "capture": {1: "data"}},
620                 ], True)
621            ret = self.expect_gdbremote_sequence()
622            data = ret.get("data")
623            self.assertIsNotNone(data)
624            self.assertEqual(data, old_val[1])
625
626    @add_test_categories(["fork"])
627    def test_qC(self):
628        parent_pid, parent_tid, child_pid, child_tid = (
629            self.start_fork_test(["fork", "thread:new", "trap"]))
630        pidtids = [
631            (parent_pid, parent_tid),
632            (child_pid, child_tid),
633        ]
634
635        for pidtid in pidtids:
636            self.test_sequence.add_log_lines(
637                ["read packet: $Hcp{}.{}#00".format(*pidtid),
638                 "send packet: $OK#00",
639                 "read packet: $c#00",
640                 {"direction": "send",
641                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
642                  },
643                 ], True)
644
645        self.add_threadinfo_collection_packets()
646        ret = self.expect_gdbremote_sequence()
647        self.reset_test_sequence()
648
649        pidtids = set(self.parse_threadinfo_packets(ret))
650        self.assertEqual(len(pidtids), 4)
651        for pidtid in pidtids:
652            self.test_sequence.add_log_lines(
653                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
654                 "send packet: $OK#00",
655                 "read packet: $qC#00",
656                 "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
657                 ], True)
658        self.expect_gdbremote_sequence()
659
660    @add_test_categories(["fork"])
661    def test_T(self):
662        parent_pid, parent_tid, child_pid, child_tid = (
663            self.start_fork_test(["fork", "thread:new", "trap"]))
664        pidtids = [
665            (parent_pid, parent_tid),
666            (child_pid, child_tid),
667        ]
668
669        for pidtid in pidtids:
670            self.test_sequence.add_log_lines(
671                ["read packet: $Hcp{}.{}#00".format(*pidtid),
672                 "send packet: $OK#00",
673                 "read packet: $c#00",
674                 {"direction": "send",
675                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
676                  },
677                 ], True)
678
679        self.add_threadinfo_collection_packets()
680        ret = self.expect_gdbremote_sequence()
681        self.reset_test_sequence()
682
683        pidtids = set(self.parse_threadinfo_packets(ret))
684        self.assertEqual(len(pidtids), 4)
685        max_pid = max(pid for pid, tid in pidtids)
686        max_tid = max(tid for pid, tid in pidtids)
687        bad_pidtids = (
688            (max_pid, max_tid + 1, "E02"),
689            (max_pid + 1, max_tid, "E01"),
690            (max_pid + 1, max_tid + 1, "E01"),
691        )
692
693        for pidtid in pidtids:
694            self.test_sequence.add_log_lines(
695                [
696                 # test explicit PID+TID
697                 "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
698                 "send packet: $OK#00",
699                 # test implicit PID via Hg
700                 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
701                 "send packet: $OK#00",
702                 "read packet: $T{:x}#00".format(max_tid + 1),
703                 "send packet: $E02#00",
704                 "read packet: $T{:x}#00".format(pidtid[1]),
705                 "send packet: $OK#00",
706                 ], True)
707        for pid, tid, expected in bad_pidtids:
708            self.test_sequence.add_log_lines(
709                ["read packet: $Tp{:x}.{:x}#00".format(pid, tid),
710                 "send packet: ${}#00".format(expected),
711                 ], True)
712        self.expect_gdbremote_sequence()
713