1import random 2 3import gdbremote_testcase 4from lldbsuite.test.decorators import * 5from lldbsuite.test.lldbtest import * 6from lldbsuite.test import lldbutil 7 8class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase): 9 10 fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*" 11 "{}:p([0-9a-f]+)[.]([0-9a-f]+).*") 12 fork_capture = {1: "parent_pid", 2: "parent_tid", 13 3: "child_pid", 4: "child_tid"} 14 15 def start_fork_test(self, args, variant="fork"): 16 self.build() 17 self.prep_debug_monitor_and_inferior(inferior_args=args) 18 self.add_qSupported_packets(["multiprocess+", 19 "{}-events+".format(variant)]) 20 ret = self.expect_gdbremote_sequence() 21 self.assertIn("{}-events+".format(variant), ret["qSupported_response"]) 22 self.reset_test_sequence() 23 24 # continue and expect fork 25 self.test_sequence.add_log_lines([ 26 "read packet: $c#00", 27 {"direction": "send", "regex": self.fork_regex.format(variant), 28 "capture": self.fork_capture}, 29 ], True) 30 ret = self.expect_gdbremote_sequence() 31 self.reset_test_sequence() 32 33 return tuple(ret[x] for x in ("parent_pid", "parent_tid", 34 "child_pid", "child_tid")) 35 36 @add_test_categories(["fork"]) 37 def test_fork_multithreaded(self): 38 _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"]) 39 40 # detach the forked child 41 self.test_sequence.add_log_lines([ 42 "read packet: $D;{}#00".format(child_pid), 43 "send packet: $OK#00", 44 "read packet: $k#00", 45 ], True) 46 self.expect_gdbremote_sequence() 47 48 def fork_and_detach_test(self, variant): 49 parent_pid, parent_tid, child_pid, child_tid = ( 50 self.start_fork_test([variant], variant)) 51 52 # detach the forked child 53 self.test_sequence.add_log_lines([ 54 "read packet: $D;{}#00".format(child_pid), 55 "send packet: $OK#00", 56 # verify that the current process is correct 57 "read packet: $qC#00", 58 "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid), 59 # verify that the correct processes are detached/available 60 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 61 "send packet: $Eff#00", 62 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 63 "send packet: $OK#00", 64 ], True) 65 self.expect_gdbremote_sequence() 66 self.reset_test_sequence() 67 return parent_pid, parent_tid 68 69 @add_test_categories(["fork"]) 70 def test_fork(self): 71 parent_pid, _ = self.fork_and_detach_test("fork") 72 73 # resume the parent 74 self.test_sequence.add_log_lines([ 75 "read packet: $c#00", 76 "send packet: $W00;process:{}#00".format(parent_pid), 77 ], True) 78 self.expect_gdbremote_sequence() 79 80 @add_test_categories(["fork"]) 81 def test_vfork(self): 82 parent_pid, parent_tid = self.fork_and_detach_test("vfork") 83 84 # resume the parent 85 self.test_sequence.add_log_lines([ 86 "read packet: $c#00", 87 {"direction": "send", 88 "regex": r"[$]T05thread:p{}[.]{}.*vforkdone.*".format(parent_pid, 89 parent_tid), 90 }, 91 "read packet: $c#00", 92 "send packet: $W00;process:{}#00".format(parent_pid), 93 ], True) 94 self.expect_gdbremote_sequence() 95 96 def fork_and_follow_test(self, variant): 97 parent_pid, parent_tid, child_pid, child_tid = ( 98 self.start_fork_test([variant], variant)) 99 100 # switch to the forked child 101 self.test_sequence.add_log_lines([ 102 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 103 "send packet: $OK#00", 104 "read packet: $Hcp{}.{}#00".format(child_pid, child_tid), 105 "send packet: $OK#00", 106 # detach the parent 107 "read packet: $D;{}#00".format(parent_pid), 108 "send packet: $OK#00", 109 # verify that the correct processes are detached/available 110 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 111 "send packet: $Eff#00", 112 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 113 "send packet: $OK#00", 114 # then resume the child 115 "read packet: $c#00", 116 "send packet: $W00;process:{}#00".format(child_pid), 117 ], True) 118 self.expect_gdbremote_sequence() 119 120 @add_test_categories(["fork"]) 121 def test_fork_follow(self): 122 self.fork_and_follow_test("fork") 123 124 @add_test_categories(["fork"]) 125 def test_vfork_follow(self): 126 self.fork_and_follow_test("vfork") 127 128 @add_test_categories(["fork"]) 129 def test_select_wrong_pid(self): 130 self.build() 131 self.prep_debug_monitor_and_inferior() 132 self.add_qSupported_packets(["multiprocess+"]) 133 ret = self.expect_gdbremote_sequence() 134 self.assertIn("multiprocess+", ret["qSupported_response"]) 135 self.reset_test_sequence() 136 137 # get process pid 138 self.test_sequence.add_log_lines([ 139 "read packet: $qC#00", 140 {"direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*", 141 "capture": {1: "pid", 2: "tid"}}, 142 ], True) 143 ret = self.expect_gdbremote_sequence() 144 pid, tid = (int(ret[x], 16) for x in ("pid", "tid")) 145 self.reset_test_sequence() 146 147 self.test_sequence.add_log_lines([ 148 # try switching to correct pid 149 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid), 150 "send packet: $OK#00", 151 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid), 152 "send packet: $OK#00", 153 # try switching to invalid tid 154 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1), 155 "send packet: $E15#00", 156 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1), 157 "send packet: $E15#00", 158 # try switching to invalid pid 159 "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid), 160 "send packet: $Eff#00", 161 "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid), 162 "send packet: $Eff#00", 163 ], True) 164 self.expect_gdbremote_sequence() 165 166 @add_test_categories(["fork"]) 167 def test_detach_current(self): 168 self.build() 169 self.prep_debug_monitor_and_inferior() 170 self.add_qSupported_packets(["multiprocess+"]) 171 ret = self.expect_gdbremote_sequence() 172 self.assertIn("multiprocess+", ret["qSupported_response"]) 173 self.reset_test_sequence() 174 175 # get process pid 176 self.test_sequence.add_log_lines([ 177 "read packet: $qC#00", 178 {"direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*", 179 "capture": {1: "pid"}}, 180 ], True) 181 ret = self.expect_gdbremote_sequence() 182 pid = ret["pid"] 183 self.reset_test_sequence() 184 185 # detach the process 186 self.test_sequence.add_log_lines([ 187 "read packet: $D;{}#00".format(pid), 188 "send packet: $OK#00", 189 "read packet: $qC#00", 190 "send packet: $E44#00", 191 ], True) 192 self.expect_gdbremote_sequence() 193 194 @add_test_categories(["fork"]) 195 def test_detach_all(self): 196 parent_pid, parent_tid, child_pid, child_tid = ( 197 self.start_fork_test(["fork"])) 198 199 self.test_sequence.add_log_lines([ 200 # double-check our PIDs 201 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 202 "send packet: $OK#00", 203 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 204 "send packet: $OK#00", 205 # detach all processes 206 "read packet: $D#00", 207 "send packet: $OK#00", 208 # verify that both PIDs are invalid now 209 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 210 "send packet: $Eff#00", 211 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 212 "send packet: $Eff#00", 213 ], True) 214 self.expect_gdbremote_sequence() 215 216 @add_test_categories(["fork"]) 217 def test_kill_all(self): 218 parent_pid, _, child_pid, _ = self.start_fork_test(["fork"]) 219 220 exit_regex = "[$]X09;process:([0-9a-f]+)#.*" 221 self.test_sequence.add_log_lines([ 222 # kill all processes 223 "read packet: $k#00", 224 {"direction": "send", "regex": exit_regex, 225 "capture": {1: "pid1"}}, 226 {"direction": "send", "regex": exit_regex, 227 "capture": {1: "pid2"}}, 228 ], True) 229 ret = self.expect_gdbremote_sequence() 230 self.assertEqual(set([ret["pid1"], ret["pid2"]]), 231 set([parent_pid, child_pid])) 232 233 def vkill_test(self, kill_parent=False, kill_child=False): 234 assert kill_parent or kill_child 235 parent_pid, parent_tid, child_pid, child_tid = ( 236 self.start_fork_test(["fork"])) 237 238 if kill_parent: 239 self.test_sequence.add_log_lines([ 240 # kill the process 241 "read packet: $vKill;{}#00".format(parent_pid), 242 "send packet: $OK#00", 243 ], True) 244 if kill_child: 245 self.test_sequence.add_log_lines([ 246 # kill the process 247 "read packet: $vKill;{}#00".format(child_pid), 248 "send packet: $OK#00", 249 ], True) 250 self.test_sequence.add_log_lines([ 251 # check child PID/TID 252 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 253 "send packet: ${}#00".format("Eff" if kill_child else "OK"), 254 # check parent PID/TID 255 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 256 "send packet: ${}#00".format("Eff" if kill_parent else "OK"), 257 ], True) 258 self.expect_gdbremote_sequence() 259 260 @add_test_categories(["fork"]) 261 def test_vkill_child(self): 262 self.vkill_test(kill_child=True) 263 264 @add_test_categories(["fork"]) 265 def test_vkill_parent(self): 266 self.vkill_test(kill_parent=True) 267 268 @add_test_categories(["fork"]) 269 def test_vkill_both(self): 270 self.vkill_test(kill_parent=True, kill_child=True) 271 272 def resume_one_test(self, run_order, use_vCont=False): 273 parent_pid, parent_tid, child_pid, child_tid = ( 274 self.start_fork_test(["fork", "trap"])) 275 276 parent_expect = [ 277 "[$]T05thread:p{}.{};.*".format(parent_pid, parent_tid), 278 "[$]W00;process:{}#.*".format(parent_pid), 279 ] 280 child_expect = [ 281 "[$]T05thread:p{}.{};.*".format(child_pid, child_tid), 282 "[$]W00;process:{}#.*".format(child_pid), 283 ] 284 285 for x in run_order: 286 if x == "parent": 287 pidtid = (parent_pid, parent_tid) 288 expect = parent_expect.pop(0) 289 elif x == "child": 290 pidtid = (child_pid, child_tid) 291 expect = child_expect.pop(0) 292 else: 293 assert False, "unexpected x={}".format(x) 294 295 if use_vCont: 296 self.test_sequence.add_log_lines([ 297 # continue the selected process 298 "read packet: $vCont;c:p{}.{}#00".format(*pidtid), 299 ], True) 300 else: 301 self.test_sequence.add_log_lines([ 302 # continue the selected process 303 "read packet: $Hcp{}.{}#00".format(*pidtid), 304 "send packet: $OK#00", 305 "read packet: $c#00", 306 ], True) 307 self.test_sequence.add_log_lines([ 308 {"direction": "send", "regex": expect}, 309 ], True) 310 # if at least one process remained, check both PIDs 311 if parent_expect or child_expect: 312 self.test_sequence.add_log_lines([ 313 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 314 "send packet: ${}#00".format("OK" if parent_expect else "Eff"), 315 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 316 "send packet: ${}#00".format("OK" if child_expect else "Eff"), 317 ], True) 318 self.expect_gdbremote_sequence() 319 320 @add_test_categories(["fork"]) 321 def test_c_parent(self): 322 self.resume_one_test(run_order=["parent", "parent"]) 323 324 @add_test_categories(["fork"]) 325 def test_c_child(self): 326 self.resume_one_test(run_order=["child", "child"]) 327 328 @add_test_categories(["fork"]) 329 def test_c_parent_then_child(self): 330 self.resume_one_test(run_order=["parent", "parent", "child", "child"]) 331 332 @add_test_categories(["fork"]) 333 def test_c_child_then_parent(self): 334 self.resume_one_test(run_order=["child", "child", "parent", "parent"]) 335 336 @add_test_categories(["fork"]) 337 def test_c_interspersed(self): 338 self.resume_one_test(run_order=["parent", "child", "parent", "child"]) 339 340 @add_test_categories(["fork"]) 341 def test_vCont_parent(self): 342 self.resume_one_test(run_order=["parent", "parent"], use_vCont=True) 343 344 @add_test_categories(["fork"]) 345 def test_vCont_child(self): 346 self.resume_one_test(run_order=["child", "child"], use_vCont=True) 347 348 @add_test_categories(["fork"]) 349 def test_vCont_parent_then_child(self): 350 self.resume_one_test(run_order=["parent", "parent", "child", "child"], 351 use_vCont=True) 352 353 @add_test_categories(["fork"]) 354 def test_vCont_child_then_parent(self): 355 self.resume_one_test(run_order=["child", "child", "parent", "parent"], 356 use_vCont=True) 357 358 @add_test_categories(["fork"]) 359 def test_vCont_interspersed(self): 360 self.resume_one_test(run_order=["parent", "child", "parent", "child"], 361 use_vCont=True) 362 363 @add_test_categories(["fork"]) 364 def test_vCont_two_processes(self): 365 parent_pid, parent_tid, child_pid, child_tid = ( 366 self.start_fork_test(["fork", "trap"])) 367 368 self.test_sequence.add_log_lines([ 369 # try to resume both processes 370 "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format( 371 parent_pid, parent_tid, child_pid, child_tid), 372 "send packet: $E03#00", 373 ], True) 374 self.expect_gdbremote_sequence() 375 376 @add_test_categories(["fork"]) 377 def test_vCont_all_processes_explicit(self): 378 self.start_fork_test(["fork", "trap"]) 379 380 self.test_sequence.add_log_lines([ 381 # try to resume all processes implicitly 382 "read packet: $vCont;c:p-1.-1#00", 383 "send packet: $E03#00", 384 ], True) 385 self.expect_gdbremote_sequence() 386 387 @add_test_categories(["fork"]) 388 def test_vCont_all_processes_implicit(self): 389 self.start_fork_test(["fork", "trap"]) 390 391 self.test_sequence.add_log_lines([ 392 # try to resume all processes implicitly 393 "read packet: $vCont;c#00", 394 "send packet: $E03#00", 395 ], True) 396 self.expect_gdbremote_sequence() 397 398 @add_test_categories(["fork"]) 399 def test_threadinfo(self): 400 parent_pid, parent_tid, child_pid, child_tid = ( 401 self.start_fork_test(["fork", "thread:new", "trap"])) 402 pidtids = [ 403 (parent_pid, parent_tid), 404 (child_pid, child_tid), 405 ] 406 407 self.add_threadinfo_collection_packets() 408 ret = self.expect_gdbremote_sequence() 409 prev_pidtids = set(self.parse_threadinfo_packets(ret)) 410 self.assertEqual(prev_pidtids, 411 frozenset((int(pid, 16), int(tid, 16)) 412 for pid, tid in pidtids)) 413 self.reset_test_sequence() 414 415 for pidtid in pidtids: 416 self.test_sequence.add_log_lines( 417 ["read packet: $Hcp{}.{}#00".format(*pidtid), 418 "send packet: $OK#00", 419 "read packet: $c#00", 420 {"direction": "send", 421 "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid), 422 }, 423 ], True) 424 self.add_threadinfo_collection_packets() 425 ret = self.expect_gdbremote_sequence() 426 self.reset_test_sequence() 427 new_pidtids = set(self.parse_threadinfo_packets(ret)) 428 added_pidtid = new_pidtids - prev_pidtids 429 prev_pidtids = new_pidtids 430 431 # verify that we've got exactly one new thread, and that 432 # the PID matches 433 self.assertEqual(len(added_pidtid), 1) 434 self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16)) 435 436 for pidtid in new_pidtids: 437 self.test_sequence.add_log_lines( 438 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 439 "send packet: $OK#00", 440 ], True) 441 self.expect_gdbremote_sequence() 442 443 @add_test_categories(["fork"]) 444 def test_memory_read_write(self): 445 self.build() 446 INITIAL_DATA = "Initial message" 447 self.prep_debug_monitor_and_inferior( 448 inferior_args=["set-message:{}".format(INITIAL_DATA), 449 "get-data-address-hex:g_message", 450 "fork", 451 "print-message:", 452 "trap", 453 ]) 454 self.add_qSupported_packets(["multiprocess+", 455 "fork-events+"]) 456 ret = self.expect_gdbremote_sequence() 457 self.assertIn("fork-events+", ret["qSupported_response"]) 458 self.reset_test_sequence() 459 460 # continue and expect fork 461 self.test_sequence.add_log_lines([ 462 "read packet: $c#00", 463 {"type": "output_match", 464 "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 465 "capture": {1: "addr"}}, 466 {"direction": "send", "regex": self.fork_regex.format("fork"), 467 "capture": self.fork_capture}, 468 ], True) 469 ret = self.expect_gdbremote_sequence() 470 pidtids = { 471 "parent": (ret["parent_pid"], ret["parent_tid"]), 472 "child": (ret["child_pid"], ret["child_tid"]), 473 } 474 addr = ret["addr"] 475 self.reset_test_sequence() 476 477 for name, pidtid in pidtids.items(): 478 self.test_sequence.add_log_lines( 479 ["read packet: $Hgp{}.{}#00".format(*pidtid), 480 "send packet: $OK#00", 481 # read the current memory contents 482 "read packet: $m{},{:x}#00".format(addr, 483 len(INITIAL_DATA) + 1), 484 {"direction": "send", 485 "regex": r"^[$](.+)#.*$", 486 "capture": {1: "data"}}, 487 # write a new value 488 "read packet: $M{},{:x}:{}#00".format(addr, 489 len(name) + 1, 490 seven.hexlify( 491 name + "\0")), 492 "send packet: $OK#00", 493 # resume the process and wait for the trap 494 "read packet: $Hcp{}.{}#00".format(*pidtid), 495 "send packet: $OK#00", 496 "read packet: $c#00", 497 {"type": "output_match", 498 "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"), 499 "capture": {1: "printed_message"}}, 500 {"direction": "send", 501 "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid), 502 }, 503 ], True) 504 ret = self.expect_gdbremote_sequence() 505 data = seven.unhexlify(ret["data"]) 506 self.assertEqual(data, INITIAL_DATA + "\0") 507 self.assertEqual(ret["printed_message"], name); 508 self.reset_test_sequence() 509 510 # we do the second round separately to make sure that initial data 511 # is correctly preserved while writing into the first process 512 513 for name, pidtid in pidtids.items(): 514 self.test_sequence.add_log_lines( 515 ["read packet: $Hgp{}.{}#00".format(*pidtid), 516 "send packet: $OK#00", 517 # read the current memory contents 518 "read packet: $m{},{:x}#00".format(addr, 519 len(name) + 1), 520 {"direction": "send", 521 "regex": r"^[$](.+)#.*$", 522 "capture": {1: "data"}}, 523 ], True) 524 ret = self.expect_gdbremote_sequence() 525 self.assertIsNotNone(ret.get("data")) 526 data = seven.unhexlify(ret.get("data")) 527 self.assertEqual(data, name + "\0") 528 self.reset_test_sequence() 529 530 @add_test_categories(["fork"]) 531 def test_register_read_write(self): 532 parent_pid, parent_tid, child_pid, child_tid = ( 533 self.start_fork_test(["fork", "thread:new", "trap"])) 534 pidtids = [ 535 (parent_pid, parent_tid), 536 (child_pid, child_tid), 537 ] 538 539 for pidtid in pidtids: 540 self.test_sequence.add_log_lines( 541 ["read packet: $Hcp{}.{}#00".format(*pidtid), 542 "send packet: $OK#00", 543 "read packet: $c#00", 544 {"direction": "send", 545 "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid), 546 }, 547 ], True) 548 549 self.add_threadinfo_collection_packets() 550 ret = self.expect_gdbremote_sequence() 551 self.reset_test_sequence() 552 553 pidtids = set(self.parse_threadinfo_packets(ret)) 554 self.assertEqual(len(pidtids), 4) 555 # first, save register values from all the threads 556 thread_regs = {} 557 for pidtid in pidtids: 558 for regno in range(256): 559 self.test_sequence.add_log_lines( 560 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 561 "send packet: $OK#00", 562 "read packet: $p{:x}#00".format(regno), 563 {"direction": "send", 564 "regex": r"^[$](.+)#.*$", 565 "capture": {1: "data"}}, 566 ], True) 567 ret = self.expect_gdbremote_sequence() 568 data = ret.get("data") 569 self.assertIsNotNone(data) 570 # ignore registers shorter than 32 bits (this also catches 571 # "Exx" errors) 572 if len(data) >= 8: 573 break 574 else: 575 self.skipTest("no usable register found") 576 thread_regs[pidtid] = (regno, data) 577 578 vals = set(x[1] for x in thread_regs.values()) 579 # NB: cheap hack to make the loop below easier 580 new_val = next(iter(vals)) 581 582 # then, start altering them and verify that we don't unexpectedly 583 # change the value from another thread 584 for pidtid in pidtids: 585 old_val = thread_regs[pidtid] 586 regno = old_val[0] 587 old_val_length = len(old_val[1]) 588 # generate a unique new_val 589 while new_val in vals: 590 new_val = ('{{:0{}x}}'.format(old_val_length) 591 .format(random.getrandbits(old_val_length*4))) 592 vals.add(new_val) 593 594 self.test_sequence.add_log_lines( 595 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 596 "send packet: $OK#00", 597 "read packet: $p{:x}#00".format(regno), 598 {"direction": "send", 599 "regex": r"^[$](.+)#.*$", 600 "capture": {1: "data"}}, 601 "read packet: $P{:x}={}#00".format(regno, new_val), 602 "send packet: $OK#00", 603 ], True) 604 ret = self.expect_gdbremote_sequence() 605 data = ret.get("data") 606 self.assertIsNotNone(data) 607 self.assertEqual(data, old_val[1]) 608 thread_regs[pidtid] = (regno, new_val) 609 610 # finally, verify that new values took effect 611 for pidtid in pidtids: 612 old_val = thread_regs[pidtid] 613 self.test_sequence.add_log_lines( 614 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 615 "send packet: $OK#00", 616 "read packet: $p{:x}#00".format(old_val[0]), 617 {"direction": "send", 618 "regex": r"^[$](.+)#.*$", 619 "capture": {1: "data"}}, 620 ], True) 621 ret = self.expect_gdbremote_sequence() 622 data = ret.get("data") 623 self.assertIsNotNone(data) 624 self.assertEqual(data, old_val[1]) 625 626 @add_test_categories(["fork"]) 627 def test_qC(self): 628 parent_pid, parent_tid, child_pid, child_tid = ( 629 self.start_fork_test(["fork", "thread:new", "trap"])) 630 pidtids = [ 631 (parent_pid, parent_tid), 632 (child_pid, child_tid), 633 ] 634 635 for pidtid in pidtids: 636 self.test_sequence.add_log_lines( 637 ["read packet: $Hcp{}.{}#00".format(*pidtid), 638 "send packet: $OK#00", 639 "read packet: $c#00", 640 {"direction": "send", 641 "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid), 642 }, 643 ], True) 644 645 self.add_threadinfo_collection_packets() 646 ret = self.expect_gdbremote_sequence() 647 self.reset_test_sequence() 648 649 pidtids = set(self.parse_threadinfo_packets(ret)) 650 self.assertEqual(len(pidtids), 4) 651 for pidtid in pidtids: 652 self.test_sequence.add_log_lines( 653 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 654 "send packet: $OK#00", 655 "read packet: $qC#00", 656 "send packet: $QCp{:x}.{:x}#00".format(*pidtid), 657 ], True) 658 self.expect_gdbremote_sequence() 659 660 @add_test_categories(["fork"]) 661 def test_T(self): 662 parent_pid, parent_tid, child_pid, child_tid = ( 663 self.start_fork_test(["fork", "thread:new", "trap"])) 664 pidtids = [ 665 (parent_pid, parent_tid), 666 (child_pid, child_tid), 667 ] 668 669 for pidtid in pidtids: 670 self.test_sequence.add_log_lines( 671 ["read packet: $Hcp{}.{}#00".format(*pidtid), 672 "send packet: $OK#00", 673 "read packet: $c#00", 674 {"direction": "send", 675 "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid), 676 }, 677 ], True) 678 679 self.add_threadinfo_collection_packets() 680 ret = self.expect_gdbremote_sequence() 681 self.reset_test_sequence() 682 683 pidtids = set(self.parse_threadinfo_packets(ret)) 684 self.assertEqual(len(pidtids), 4) 685 max_pid = max(pid for pid, tid in pidtids) 686 max_tid = max(tid for pid, tid in pidtids) 687 bad_pidtids = ( 688 (max_pid, max_tid + 1, "E02"), 689 (max_pid + 1, max_tid, "E01"), 690 (max_pid + 1, max_tid + 1, "E01"), 691 ) 692 693 for pidtid in pidtids: 694 self.test_sequence.add_log_lines( 695 [ 696 # test explicit PID+TID 697 "read packet: $Tp{:x}.{:x}#00".format(*pidtid), 698 "send packet: $OK#00", 699 # test implicit PID via Hg 700 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 701 "send packet: $OK#00", 702 "read packet: $T{:x}#00".format(max_tid + 1), 703 "send packet: $E02#00", 704 "read packet: $T{:x}#00".format(pidtid[1]), 705 "send packet: $OK#00", 706 ], True) 707 for pid, tid, expected in bad_pidtids: 708 self.test_sequence.add_log_lines( 709 ["read packet: $Tp{:x}.{:x}#00".format(pid, tid), 710 "send packet: ${}#00".format(expected), 711 ], True) 712 self.expect_gdbremote_sequence() 713