1import random 2 3from lldbsuite.test.decorators import * 4from lldbsuite.test.lldbtest import * 5 6from fork_testbase import GdbRemoteForkTestBase 7 8 9class TestGdbRemoteFork(GdbRemoteForkTestBase): 10 def setUp(self): 11 GdbRemoteForkTestBase.setUp(self) 12 if self.getPlatform() == "linux" and self.getArchitecture() in ['arm', 'aarch64']: 13 self.skipTest("Unsupported for Arm/AArch64 Linux") 14 15 @add_test_categories(["fork"]) 16 def test_fork_multithreaded(self): 17 _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"]) 18 19 # detach the forked child 20 self.test_sequence.add_log_lines([ 21 "read packet: $D;{}#00".format(child_pid), 22 "send packet: $OK#00", 23 "read packet: $k#00", 24 ], True) 25 self.expect_gdbremote_sequence() 26 27 @add_test_categories(["fork"]) 28 def test_fork(self): 29 parent_pid, _ = self.fork_and_detach_test("fork") 30 31 # resume the parent 32 self.test_sequence.add_log_lines([ 33 "read packet: $c#00", 34 "send packet: $W00;process:{}#00".format(parent_pid), 35 ], True) 36 self.expect_gdbremote_sequence() 37 38 @add_test_categories(["fork"]) 39 def test_vfork(self): 40 parent_pid, parent_tid = self.fork_and_detach_test("vfork") 41 42 # resume the parent 43 self.test_sequence.add_log_lines([ 44 "read packet: $c#00", 45 {"direction": "send", 46 "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*" 47 .format(parent_pid, parent_tid), 48 }, 49 "read packet: $c#00", 50 "send packet: $W00;process:{}#00".format(parent_pid), 51 ], True) 52 self.expect_gdbremote_sequence() 53 54 @add_test_categories(["fork"]) 55 def test_fork_follow(self): 56 self.fork_and_follow_test("fork") 57 58 @add_test_categories(["fork"]) 59 def test_vfork_follow(self): 60 self.fork_and_follow_test("vfork") 61 62 @add_test_categories(["fork"]) 63 def test_select_wrong_pid(self): 64 self.build() 65 self.prep_debug_monitor_and_inferior() 66 self.add_qSupported_packets(["multiprocess+"]) 67 ret = self.expect_gdbremote_sequence() 68 self.assertIn("multiprocess+", ret["qSupported_response"]) 69 self.reset_test_sequence() 70 71 # get process pid 72 self.test_sequence.add_log_lines([ 73 "read packet: $qC#00", 74 {"direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*", 75 "capture": {1: "pid", 2: "tid"}}, 76 ], True) 77 ret = self.expect_gdbremote_sequence() 78 pid, tid = (int(ret[x], 16) for x in ("pid", "tid")) 79 self.reset_test_sequence() 80 81 self.test_sequence.add_log_lines([ 82 # try switching to correct pid 83 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid), 84 "send packet: $OK#00", 85 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid), 86 "send packet: $OK#00", 87 # try switching to invalid tid 88 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1), 89 "send packet: $E15#00", 90 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1), 91 "send packet: $E15#00", 92 # try switching to invalid pid 93 "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid), 94 "send packet: $Eff#00", 95 "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid), 96 "send packet: $Eff#00", 97 ], True) 98 self.expect_gdbremote_sequence() 99 100 @add_test_categories(["fork"]) 101 def test_detach_current(self): 102 self.build() 103 self.prep_debug_monitor_and_inferior() 104 self.add_qSupported_packets(["multiprocess+"]) 105 ret = self.expect_gdbremote_sequence() 106 self.assertIn("multiprocess+", ret["qSupported_response"]) 107 self.reset_test_sequence() 108 109 # get process pid 110 self.test_sequence.add_log_lines([ 111 "read packet: $qC#00", 112 {"direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*", 113 "capture": {1: "pid"}}, 114 ], True) 115 ret = self.expect_gdbremote_sequence() 116 pid = ret["pid"] 117 self.reset_test_sequence() 118 119 # detach the process 120 self.test_sequence.add_log_lines([ 121 "read packet: $D;{}#00".format(pid), 122 "send packet: $OK#00", 123 "read packet: $qC#00", 124 "send packet: $E44#00", 125 ], True) 126 self.expect_gdbremote_sequence() 127 128 @add_test_categories(["fork"]) 129 def test_detach_all(self): 130 self.detach_all_test() 131 132 @add_test_categories(["fork"]) 133 def test_kill_all(self): 134 parent_pid, _, child_pid, _ = self.start_fork_test(["fork"]) 135 136 exit_regex = "[$]X09;process:([0-9a-f]+)#.*" 137 self.test_sequence.add_log_lines([ 138 # kill all processes 139 "read packet: $k#00", 140 {"direction": "send", "regex": exit_regex, 141 "capture": {1: "pid1"}}, 142 {"direction": "send", "regex": exit_regex, 143 "capture": {1: "pid2"}}, 144 ], True) 145 ret = self.expect_gdbremote_sequence() 146 self.assertEqual(set([ret["pid1"], ret["pid2"]]), 147 set([parent_pid, child_pid])) 148 149 @add_test_categories(["fork"]) 150 def test_vkill_child(self): 151 self.vkill_test(kill_child=True) 152 153 @add_test_categories(["fork"]) 154 def test_vkill_parent(self): 155 self.vkill_test(kill_parent=True) 156 157 @add_test_categories(["fork"]) 158 def test_vkill_both(self): 159 self.vkill_test(kill_parent=True, kill_child=True) 160 161 @add_test_categories(["fork"]) 162 def test_c_parent(self): 163 self.resume_one_test(run_order=["parent", "parent"]) 164 165 @add_test_categories(["fork"]) 166 def test_c_child(self): 167 self.resume_one_test(run_order=["child", "child"]) 168 169 @add_test_categories(["fork"]) 170 def test_c_parent_then_child(self): 171 self.resume_one_test(run_order=["parent", "parent", "child", "child"]) 172 173 @add_test_categories(["fork"]) 174 def test_c_child_then_parent(self): 175 self.resume_one_test(run_order=["child", "child", "parent", "parent"]) 176 177 @add_test_categories(["fork"]) 178 def test_c_interspersed(self): 179 self.resume_one_test(run_order=["parent", "child", "parent", "child"]) 180 181 @add_test_categories(["fork"]) 182 def test_vCont_parent(self): 183 self.resume_one_test(run_order=["parent", "parent"], use_vCont=True) 184 185 @add_test_categories(["fork"]) 186 def test_vCont_child(self): 187 self.resume_one_test(run_order=["child", "child"], use_vCont=True) 188 189 @add_test_categories(["fork"]) 190 def test_vCont_parent_then_child(self): 191 self.resume_one_test(run_order=["parent", "parent", "child", "child"], 192 use_vCont=True) 193 194 @add_test_categories(["fork"]) 195 def test_vCont_child_then_parent(self): 196 self.resume_one_test(run_order=["child", "child", "parent", "parent"], 197 use_vCont=True) 198 199 @add_test_categories(["fork"]) 200 def test_vCont_interspersed(self): 201 self.resume_one_test(run_order=["parent", "child", "parent", "child"], 202 use_vCont=True) 203 204 @add_test_categories(["fork"]) 205 def test_vCont_two_processes(self): 206 parent_pid, parent_tid, child_pid, child_tid = ( 207 self.start_fork_test(["fork", "stop"])) 208 209 self.test_sequence.add_log_lines([ 210 # try to resume both processes 211 "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format( 212 parent_pid, parent_tid, child_pid, child_tid), 213 "send packet: $E03#00", 214 ], True) 215 self.expect_gdbremote_sequence() 216 217 @add_test_categories(["fork"]) 218 def test_vCont_all_processes_explicit(self): 219 self.start_fork_test(["fork", "stop"]) 220 221 self.test_sequence.add_log_lines([ 222 # try to resume all processes implicitly 223 "read packet: $vCont;c:p-1.-1#00", 224 "send packet: $E03#00", 225 ], True) 226 self.expect_gdbremote_sequence() 227 228 @add_test_categories(["fork"]) 229 def test_vCont_all_processes_implicit(self): 230 self.start_fork_test(["fork", "stop"]) 231 232 self.test_sequence.add_log_lines([ 233 # try to resume all processes implicitly 234 "read packet: $vCont;c#00", 235 "send packet: $E03#00", 236 ], True) 237 self.expect_gdbremote_sequence() 238 239 @add_test_categories(["fork"]) 240 def test_threadinfo(self): 241 parent_pid, parent_tid, child_pid, child_tid = ( 242 self.start_fork_test(["fork", "thread:new", "stop"])) 243 pidtids = [ 244 (parent_pid, parent_tid), 245 (child_pid, child_tid), 246 ] 247 248 self.add_threadinfo_collection_packets() 249 ret = self.expect_gdbremote_sequence() 250 prev_pidtids = set(self.parse_threadinfo_packets(ret)) 251 self.assertEqual(prev_pidtids, 252 frozenset((int(pid, 16), int(tid, 16)) 253 for pid, tid in pidtids)) 254 self.reset_test_sequence() 255 256 for pidtid in pidtids: 257 self.test_sequence.add_log_lines( 258 ["read packet: $Hcp{}.{}#00".format(*pidtid), 259 "send packet: $OK#00", 260 "read packet: $c#00", 261 {"direction": "send", 262 "regex": self.stop_regex.format(*pidtid), 263 }, 264 ], True) 265 self.add_threadinfo_collection_packets() 266 ret = self.expect_gdbremote_sequence() 267 self.reset_test_sequence() 268 new_pidtids = set(self.parse_threadinfo_packets(ret)) 269 added_pidtid = new_pidtids - prev_pidtids 270 prev_pidtids = new_pidtids 271 272 # verify that we've got exactly one new thread, and that 273 # the PID matches 274 self.assertEqual(len(added_pidtid), 1) 275 self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16)) 276 277 for pidtid in new_pidtids: 278 self.test_sequence.add_log_lines( 279 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 280 "send packet: $OK#00", 281 ], True) 282 self.expect_gdbremote_sequence() 283 284 @add_test_categories(["fork"]) 285 def test_memory_read_write(self): 286 self.build() 287 INITIAL_DATA = "Initial message" 288 self.prep_debug_monitor_and_inferior( 289 inferior_args=["set-message:{}".format(INITIAL_DATA), 290 "get-data-address-hex:g_message", 291 "fork", 292 "print-message:", 293 "stop", 294 ]) 295 self.add_qSupported_packets(["multiprocess+", 296 "fork-events+"]) 297 ret = self.expect_gdbremote_sequence() 298 self.assertIn("fork-events+", ret["qSupported_response"]) 299 self.reset_test_sequence() 300 301 # continue and expect fork 302 self.test_sequence.add_log_lines([ 303 "read packet: $c#00", 304 {"type": "output_match", 305 "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 306 "capture": {1: "addr"}}, 307 {"direction": "send", "regex": self.fork_regex.format("fork"), 308 "capture": self.fork_capture}, 309 ], True) 310 ret = self.expect_gdbremote_sequence() 311 pidtids = { 312 "parent": (ret["parent_pid"], ret["parent_tid"]), 313 "child": (ret["child_pid"], ret["child_tid"]), 314 } 315 addr = ret["addr"] 316 self.reset_test_sequence() 317 318 for name, pidtid in pidtids.items(): 319 self.test_sequence.add_log_lines( 320 ["read packet: $Hgp{}.{}#00".format(*pidtid), 321 "send packet: $OK#00", 322 # read the current memory contents 323 "read packet: $m{},{:x}#00".format(addr, 324 len(INITIAL_DATA) + 1), 325 {"direction": "send", 326 "regex": r"^[$](.+)#.*$", 327 "capture": {1: "data"}}, 328 # write a new value 329 "read packet: $M{},{:x}:{}#00".format(addr, 330 len(name) + 1, 331 seven.hexlify( 332 name + "\0")), 333 "send packet: $OK#00", 334 # resume the process and wait for the trap 335 "read packet: $Hcp{}.{}#00".format(*pidtid), 336 "send packet: $OK#00", 337 "read packet: $c#00", 338 {"type": "output_match", 339 "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"), 340 "capture": {1: "printed_message"}}, 341 {"direction": "send", 342 "regex": self.stop_regex.format(*pidtid), 343 }, 344 ], True) 345 ret = self.expect_gdbremote_sequence() 346 data = seven.unhexlify(ret["data"]) 347 self.assertEqual(data, INITIAL_DATA + "\0") 348 self.assertEqual(ret["printed_message"], name); 349 self.reset_test_sequence() 350 351 # we do the second round separately to make sure that initial data 352 # is correctly preserved while writing into the first process 353 354 for name, pidtid in pidtids.items(): 355 self.test_sequence.add_log_lines( 356 ["read packet: $Hgp{}.{}#00".format(*pidtid), 357 "send packet: $OK#00", 358 # read the current memory contents 359 "read packet: $m{},{:x}#00".format(addr, 360 len(name) + 1), 361 {"direction": "send", 362 "regex": r"^[$](.+)#.*$", 363 "capture": {1: "data"}}, 364 ], True) 365 ret = self.expect_gdbremote_sequence() 366 self.assertIsNotNone(ret.get("data")) 367 data = seven.unhexlify(ret.get("data")) 368 self.assertEqual(data, name + "\0") 369 self.reset_test_sequence() 370 371 @add_test_categories(["fork"]) 372 def test_register_read_write(self): 373 parent_pid, parent_tid, child_pid, child_tid = ( 374 self.start_fork_test(["fork", "thread:new", "stop"])) 375 pidtids = [ 376 (parent_pid, parent_tid), 377 (child_pid, child_tid), 378 ] 379 380 for pidtid in pidtids: 381 self.test_sequence.add_log_lines( 382 ["read packet: $Hcp{}.{}#00".format(*pidtid), 383 "send packet: $OK#00", 384 "read packet: $c#00", 385 {"direction": "send", 386 "regex": self.stop_regex.format(*pidtid), 387 }, 388 ], True) 389 390 self.add_threadinfo_collection_packets() 391 ret = self.expect_gdbremote_sequence() 392 self.reset_test_sequence() 393 394 pidtids = set(self.parse_threadinfo_packets(ret)) 395 self.assertEqual(len(pidtids), 4) 396 # first, save register values from all the threads 397 thread_regs = {} 398 for pidtid in pidtids: 399 for regno in range(256): 400 self.test_sequence.add_log_lines( 401 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 402 "send packet: $OK#00", 403 "read packet: $p{:x}#00".format(regno), 404 {"direction": "send", 405 "regex": r"^[$](.+)#.*$", 406 "capture": {1: "data"}}, 407 ], True) 408 ret = self.expect_gdbremote_sequence() 409 data = ret.get("data") 410 self.assertIsNotNone(data) 411 # ignore registers shorter than 32 bits (this also catches 412 # "Exx" errors) 413 if len(data) >= 8: 414 break 415 else: 416 self.skipTest("no usable register found") 417 thread_regs[pidtid] = (regno, data) 418 419 vals = set(x[1] for x in thread_regs.values()) 420 # NB: cheap hack to make the loop below easier 421 new_val = next(iter(vals)) 422 423 # then, start altering them and verify that we don't unexpectedly 424 # change the value from another thread 425 for pidtid in pidtids: 426 old_val = thread_regs[pidtid] 427 regno = old_val[0] 428 old_val_length = len(old_val[1]) 429 # generate a unique new_val 430 while new_val in vals: 431 new_val = ('{{:0{}x}}'.format(old_val_length) 432 .format(random.getrandbits(old_val_length*4))) 433 vals.add(new_val) 434 435 self.test_sequence.add_log_lines( 436 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 437 "send packet: $OK#00", 438 "read packet: $p{:x}#00".format(regno), 439 {"direction": "send", 440 "regex": r"^[$](.+)#.*$", 441 "capture": {1: "data"}}, 442 "read packet: $P{:x}={}#00".format(regno, new_val), 443 "send packet: $OK#00", 444 ], True) 445 ret = self.expect_gdbremote_sequence() 446 data = ret.get("data") 447 self.assertIsNotNone(data) 448 self.assertEqual(data, old_val[1]) 449 thread_regs[pidtid] = (regno, new_val) 450 451 # finally, verify that new values took effect 452 for pidtid in pidtids: 453 old_val = thread_regs[pidtid] 454 self.test_sequence.add_log_lines( 455 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 456 "send packet: $OK#00", 457 "read packet: $p{:x}#00".format(old_val[0]), 458 {"direction": "send", 459 "regex": r"^[$](.+)#.*$", 460 "capture": {1: "data"}}, 461 ], True) 462 ret = self.expect_gdbremote_sequence() 463 data = ret.get("data") 464 self.assertIsNotNone(data) 465 self.assertEqual(data, old_val[1]) 466 467 @add_test_categories(["fork"]) 468 def test_qC(self): 469 parent_pid, parent_tid, child_pid, child_tid = ( 470 self.start_fork_test(["fork", "thread:new", "stop"])) 471 pidtids = [ 472 (parent_pid, parent_tid), 473 (child_pid, child_tid), 474 ] 475 476 for pidtid in pidtids: 477 self.test_sequence.add_log_lines( 478 ["read packet: $Hcp{}.{}#00".format(*pidtid), 479 "send packet: $OK#00", 480 "read packet: $c#00", 481 {"direction": "send", 482 "regex": self.stop_regex.format(*pidtid), 483 }, 484 ], True) 485 486 self.add_threadinfo_collection_packets() 487 ret = self.expect_gdbremote_sequence() 488 self.reset_test_sequence() 489 490 pidtids = set(self.parse_threadinfo_packets(ret)) 491 self.assertEqual(len(pidtids), 4) 492 for pidtid in pidtids: 493 self.test_sequence.add_log_lines( 494 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 495 "send packet: $OK#00", 496 "read packet: $qC#00", 497 "send packet: $QCp{:x}.{:x}#00".format(*pidtid), 498 ], True) 499 self.expect_gdbremote_sequence() 500 501 @add_test_categories(["fork"]) 502 def test_T(self): 503 parent_pid, parent_tid, child_pid, child_tid = ( 504 self.start_fork_test(["fork", "thread:new", "stop"])) 505 pidtids = [ 506 (parent_pid, parent_tid), 507 (child_pid, child_tid), 508 ] 509 510 for pidtid in pidtids: 511 self.test_sequence.add_log_lines( 512 ["read packet: $Hcp{}.{}#00".format(*pidtid), 513 "send packet: $OK#00", 514 "read packet: $c#00", 515 {"direction": "send", 516 "regex": self.stop_regex.format(*pidtid), 517 }, 518 ], True) 519 520 self.add_threadinfo_collection_packets() 521 ret = self.expect_gdbremote_sequence() 522 self.reset_test_sequence() 523 524 pidtids = set(self.parse_threadinfo_packets(ret)) 525 self.assertEqual(len(pidtids), 4) 526 max_pid = max(pid for pid, tid in pidtids) 527 max_tid = max(tid for pid, tid in pidtids) 528 bad_pidtids = ( 529 (max_pid, max_tid + 1, "E02"), 530 (max_pid + 1, max_tid, "E01"), 531 (max_pid + 1, max_tid + 1, "E01"), 532 ) 533 534 for pidtid in pidtids: 535 self.test_sequence.add_log_lines( 536 [ 537 # test explicit PID+TID 538 "read packet: $Tp{:x}.{:x}#00".format(*pidtid), 539 "send packet: $OK#00", 540 # test implicit PID via Hg 541 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 542 "send packet: $OK#00", 543 "read packet: $T{:x}#00".format(max_tid + 1), 544 "send packet: $E02#00", 545 "read packet: $T{:x}#00".format(pidtid[1]), 546 "send packet: $OK#00", 547 ], True) 548 for pid, tid, expected in bad_pidtids: 549 self.test_sequence.add_log_lines( 550 ["read packet: $Tp{:x}.{:x}#00".format(pid, tid), 551 "send packet: ${}#00".format(expected), 552 ], True) 553 self.expect_gdbremote_sequence() 554