1import random 2 3from lldbsuite.test.decorators import * 4from lldbsuite.test.lldbtest import * 5 6from fork_testbase import GdbRemoteForkTestBase 7 8 9class TestGdbRemoteFork(GdbRemoteForkTestBase): 10 @add_test_categories(["fork"]) 11 def test_fork_multithreaded(self): 12 _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"]) 13 14 # detach the forked child 15 self.test_sequence.add_log_lines([ 16 "read packet: $D;{}#00".format(child_pid), 17 "send packet: $OK#00", 18 "read packet: $k#00", 19 ], True) 20 self.expect_gdbremote_sequence() 21 22 @add_test_categories(["fork"]) 23 def test_fork(self): 24 parent_pid, _ = self.fork_and_detach_test("fork") 25 26 # resume the parent 27 self.test_sequence.add_log_lines([ 28 "read packet: $c#00", 29 "send packet: $W00;process:{}#00".format(parent_pid), 30 ], True) 31 self.expect_gdbremote_sequence() 32 33 @add_test_categories(["fork"]) 34 def test_vfork(self): 35 parent_pid, parent_tid = self.fork_and_detach_test("vfork") 36 37 # resume the parent 38 self.test_sequence.add_log_lines([ 39 "read packet: $c#00", 40 {"direction": "send", 41 "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*" 42 .format(parent_pid, parent_tid), 43 }, 44 "read packet: $c#00", 45 "send packet: $W00;process:{}#00".format(parent_pid), 46 ], True) 47 self.expect_gdbremote_sequence() 48 49 @add_test_categories(["fork"]) 50 def test_fork_follow(self): 51 self.fork_and_follow_test("fork") 52 53 @add_test_categories(["fork"]) 54 def test_vfork_follow(self): 55 self.fork_and_follow_test("vfork") 56 57 @add_test_categories(["fork"]) 58 def test_select_wrong_pid(self): 59 self.build() 60 self.prep_debug_monitor_and_inferior() 61 self.add_qSupported_packets(["multiprocess+"]) 62 ret = self.expect_gdbremote_sequence() 63 self.assertIn("multiprocess+", ret["qSupported_response"]) 64 self.reset_test_sequence() 65 66 # get process pid 67 self.test_sequence.add_log_lines([ 68 "read packet: $qC#00", 69 {"direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*", 70 "capture": {1: "pid", 2: "tid"}}, 71 ], True) 72 ret = self.expect_gdbremote_sequence() 73 pid, tid = (int(ret[x], 16) for x in ("pid", "tid")) 74 self.reset_test_sequence() 75 76 self.test_sequence.add_log_lines([ 77 # try switching to correct pid 78 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid), 79 "send packet: $OK#00", 80 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid), 81 "send packet: $OK#00", 82 # try switching to invalid tid 83 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1), 84 "send packet: $E15#00", 85 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1), 86 "send packet: $E15#00", 87 # try switching to invalid pid 88 "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid), 89 "send packet: $Eff#00", 90 "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid), 91 "send packet: $Eff#00", 92 ], True) 93 self.expect_gdbremote_sequence() 94 95 @add_test_categories(["fork"]) 96 def test_detach_current(self): 97 self.build() 98 self.prep_debug_monitor_and_inferior() 99 self.add_qSupported_packets(["multiprocess+"]) 100 ret = self.expect_gdbremote_sequence() 101 self.assertIn("multiprocess+", ret["qSupported_response"]) 102 self.reset_test_sequence() 103 104 # get process pid 105 self.test_sequence.add_log_lines([ 106 "read packet: $qC#00", 107 {"direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*", 108 "capture": {1: "pid"}}, 109 ], True) 110 ret = self.expect_gdbremote_sequence() 111 pid = ret["pid"] 112 self.reset_test_sequence() 113 114 # detach the process 115 self.test_sequence.add_log_lines([ 116 "read packet: $D;{}#00".format(pid), 117 "send packet: $OK#00", 118 "read packet: $qC#00", 119 "send packet: $E44#00", 120 ], True) 121 self.expect_gdbremote_sequence() 122 123 @add_test_categories(["fork"]) 124 def test_detach_all(self): 125 self.detach_all_test() 126 127 @add_test_categories(["fork"]) 128 def test_kill_all(self): 129 parent_pid, _, child_pid, _ = self.start_fork_test(["fork"]) 130 131 exit_regex = "[$]X09;process:([0-9a-f]+)#.*" 132 self.test_sequence.add_log_lines([ 133 # kill all processes 134 "read packet: $k#00", 135 {"direction": "send", "regex": exit_regex, 136 "capture": {1: "pid1"}}, 137 {"direction": "send", "regex": exit_regex, 138 "capture": {1: "pid2"}}, 139 ], True) 140 ret = self.expect_gdbremote_sequence() 141 self.assertEqual(set([ret["pid1"], ret["pid2"]]), 142 set([parent_pid, child_pid])) 143 144 @add_test_categories(["fork"]) 145 def test_vkill_child(self): 146 self.vkill_test(kill_child=True) 147 148 @add_test_categories(["fork"]) 149 def test_vkill_parent(self): 150 self.vkill_test(kill_parent=True) 151 152 @add_test_categories(["fork"]) 153 def test_vkill_both(self): 154 self.vkill_test(kill_parent=True, kill_child=True) 155 156 @add_test_categories(["fork"]) 157 def test_c_parent(self): 158 self.resume_one_test(run_order=["parent", "parent"]) 159 160 @add_test_categories(["fork"]) 161 def test_c_child(self): 162 self.resume_one_test(run_order=["child", "child"]) 163 164 @add_test_categories(["fork"]) 165 def test_c_parent_then_child(self): 166 self.resume_one_test(run_order=["parent", "parent", "child", "child"]) 167 168 @add_test_categories(["fork"]) 169 def test_c_child_then_parent(self): 170 self.resume_one_test(run_order=["child", "child", "parent", "parent"]) 171 172 @add_test_categories(["fork"]) 173 def test_c_interspersed(self): 174 self.resume_one_test(run_order=["parent", "child", "parent", "child"]) 175 176 @add_test_categories(["fork"]) 177 def test_vCont_parent(self): 178 self.resume_one_test(run_order=["parent", "parent"], use_vCont=True) 179 180 @add_test_categories(["fork"]) 181 def test_vCont_child(self): 182 self.resume_one_test(run_order=["child", "child"], use_vCont=True) 183 184 @add_test_categories(["fork"]) 185 def test_vCont_parent_then_child(self): 186 self.resume_one_test(run_order=["parent", "parent", "child", "child"], 187 use_vCont=True) 188 189 @add_test_categories(["fork"]) 190 def test_vCont_child_then_parent(self): 191 self.resume_one_test(run_order=["child", "child", "parent", "parent"], 192 use_vCont=True) 193 194 @add_test_categories(["fork"]) 195 def test_vCont_interspersed(self): 196 self.resume_one_test(run_order=["parent", "child", "parent", "child"], 197 use_vCont=True) 198 199 @add_test_categories(["fork"]) 200 def test_vCont_two_processes(self): 201 parent_pid, parent_tid, child_pid, child_tid = ( 202 self.start_fork_test(["fork", "stop"])) 203 204 self.test_sequence.add_log_lines([ 205 # try to resume both processes 206 "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format( 207 parent_pid, parent_tid, child_pid, child_tid), 208 "send packet: $E03#00", 209 ], True) 210 self.expect_gdbremote_sequence() 211 212 @add_test_categories(["fork"]) 213 def test_vCont_all_processes_explicit(self): 214 self.start_fork_test(["fork", "stop"]) 215 216 self.test_sequence.add_log_lines([ 217 # try to resume all processes implicitly 218 "read packet: $vCont;c:p-1.-1#00", 219 "send packet: $E03#00", 220 ], True) 221 self.expect_gdbremote_sequence() 222 223 @add_test_categories(["fork"]) 224 def test_vCont_all_processes_implicit(self): 225 self.start_fork_test(["fork", "stop"]) 226 227 self.test_sequence.add_log_lines([ 228 # try to resume all processes implicitly 229 "read packet: $vCont;c#00", 230 "send packet: $E03#00", 231 ], True) 232 self.expect_gdbremote_sequence() 233 234 @add_test_categories(["fork"]) 235 def test_threadinfo(self): 236 parent_pid, parent_tid, child_pid, child_tid = ( 237 self.start_fork_test(["fork", "thread:new", "stop"])) 238 pidtids = [ 239 (parent_pid, parent_tid), 240 (child_pid, child_tid), 241 ] 242 243 self.add_threadinfo_collection_packets() 244 ret = self.expect_gdbremote_sequence() 245 prev_pidtids = set(self.parse_threadinfo_packets(ret)) 246 self.assertEqual(prev_pidtids, 247 frozenset((int(pid, 16), int(tid, 16)) 248 for pid, tid in pidtids)) 249 self.reset_test_sequence() 250 251 for pidtid in pidtids: 252 self.test_sequence.add_log_lines( 253 ["read packet: $Hcp{}.{}#00".format(*pidtid), 254 "send packet: $OK#00", 255 "read packet: $c#00", 256 {"direction": "send", 257 "regex": self.stop_regex.format(*pidtid), 258 }, 259 ], True) 260 self.add_threadinfo_collection_packets() 261 ret = self.expect_gdbremote_sequence() 262 self.reset_test_sequence() 263 new_pidtids = set(self.parse_threadinfo_packets(ret)) 264 added_pidtid = new_pidtids - prev_pidtids 265 prev_pidtids = new_pidtids 266 267 # verify that we've got exactly one new thread, and that 268 # the PID matches 269 self.assertEqual(len(added_pidtid), 1) 270 self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16)) 271 272 for pidtid in new_pidtids: 273 self.test_sequence.add_log_lines( 274 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 275 "send packet: $OK#00", 276 ], True) 277 self.expect_gdbremote_sequence() 278 279 @add_test_categories(["fork"]) 280 def test_memory_read_write(self): 281 self.build() 282 INITIAL_DATA = "Initial message" 283 self.prep_debug_monitor_and_inferior( 284 inferior_args=["set-message:{}".format(INITIAL_DATA), 285 "get-data-address-hex:g_message", 286 "fork", 287 "print-message:", 288 "stop", 289 ]) 290 self.add_qSupported_packets(["multiprocess+", 291 "fork-events+"]) 292 ret = self.expect_gdbremote_sequence() 293 self.assertIn("fork-events+", ret["qSupported_response"]) 294 self.reset_test_sequence() 295 296 # continue and expect fork 297 self.test_sequence.add_log_lines([ 298 "read packet: $c#00", 299 {"type": "output_match", 300 "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"), 301 "capture": {1: "addr"}}, 302 {"direction": "send", "regex": self.fork_regex.format("fork"), 303 "capture": self.fork_capture}, 304 ], True) 305 ret = self.expect_gdbremote_sequence() 306 pidtids = { 307 "parent": (ret["parent_pid"], ret["parent_tid"]), 308 "child": (ret["child_pid"], ret["child_tid"]), 309 } 310 addr = ret["addr"] 311 self.reset_test_sequence() 312 313 for name, pidtid in pidtids.items(): 314 self.test_sequence.add_log_lines( 315 ["read packet: $Hgp{}.{}#00".format(*pidtid), 316 "send packet: $OK#00", 317 # read the current memory contents 318 "read packet: $m{},{:x}#00".format(addr, 319 len(INITIAL_DATA) + 1), 320 {"direction": "send", 321 "regex": r"^[$](.+)#.*$", 322 "capture": {1: "data"}}, 323 # write a new value 324 "read packet: $M{},{:x}:{}#00".format(addr, 325 len(name) + 1, 326 seven.hexlify( 327 name + "\0")), 328 "send packet: $OK#00", 329 # resume the process and wait for the trap 330 "read packet: $Hcp{}.{}#00".format(*pidtid), 331 "send packet: $OK#00", 332 "read packet: $c#00", 333 {"type": "output_match", 334 "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"), 335 "capture": {1: "printed_message"}}, 336 {"direction": "send", 337 "regex": self.stop_regex.format(*pidtid), 338 }, 339 ], True) 340 ret = self.expect_gdbremote_sequence() 341 data = seven.unhexlify(ret["data"]) 342 self.assertEqual(data, INITIAL_DATA + "\0") 343 self.assertEqual(ret["printed_message"], name); 344 self.reset_test_sequence() 345 346 # we do the second round separately to make sure that initial data 347 # is correctly preserved while writing into the first process 348 349 for name, pidtid in pidtids.items(): 350 self.test_sequence.add_log_lines( 351 ["read packet: $Hgp{}.{}#00".format(*pidtid), 352 "send packet: $OK#00", 353 # read the current memory contents 354 "read packet: $m{},{:x}#00".format(addr, 355 len(name) + 1), 356 {"direction": "send", 357 "regex": r"^[$](.+)#.*$", 358 "capture": {1: "data"}}, 359 ], True) 360 ret = self.expect_gdbremote_sequence() 361 self.assertIsNotNone(ret.get("data")) 362 data = seven.unhexlify(ret.get("data")) 363 self.assertEqual(data, name + "\0") 364 self.reset_test_sequence() 365 366 @add_test_categories(["fork"]) 367 def test_register_read_write(self): 368 parent_pid, parent_tid, child_pid, child_tid = ( 369 self.start_fork_test(["fork", "thread:new", "stop"])) 370 pidtids = [ 371 (parent_pid, parent_tid), 372 (child_pid, child_tid), 373 ] 374 375 for pidtid in pidtids: 376 self.test_sequence.add_log_lines( 377 ["read packet: $Hcp{}.{}#00".format(*pidtid), 378 "send packet: $OK#00", 379 "read packet: $c#00", 380 {"direction": "send", 381 "regex": self.stop_regex.format(*pidtid), 382 }, 383 ], True) 384 385 self.add_threadinfo_collection_packets() 386 ret = self.expect_gdbremote_sequence() 387 self.reset_test_sequence() 388 389 pidtids = set(self.parse_threadinfo_packets(ret)) 390 self.assertEqual(len(pidtids), 4) 391 # first, save register values from all the threads 392 thread_regs = {} 393 for pidtid in pidtids: 394 for regno in range(256): 395 self.test_sequence.add_log_lines( 396 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 397 "send packet: $OK#00", 398 "read packet: $p{:x}#00".format(regno), 399 {"direction": "send", 400 "regex": r"^[$](.+)#.*$", 401 "capture": {1: "data"}}, 402 ], True) 403 ret = self.expect_gdbremote_sequence() 404 data = ret.get("data") 405 self.assertIsNotNone(data) 406 # ignore registers shorter than 32 bits (this also catches 407 # "Exx" errors) 408 if len(data) >= 8: 409 break 410 else: 411 self.skipTest("no usable register found") 412 thread_regs[pidtid] = (regno, data) 413 414 vals = set(x[1] for x in thread_regs.values()) 415 # NB: cheap hack to make the loop below easier 416 new_val = next(iter(vals)) 417 418 # then, start altering them and verify that we don't unexpectedly 419 # change the value from another thread 420 for pidtid in pidtids: 421 old_val = thread_regs[pidtid] 422 regno = old_val[0] 423 old_val_length = len(old_val[1]) 424 # generate a unique new_val 425 while new_val in vals: 426 new_val = ('{{:0{}x}}'.format(old_val_length) 427 .format(random.getrandbits(old_val_length*4))) 428 vals.add(new_val) 429 430 self.test_sequence.add_log_lines( 431 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 432 "send packet: $OK#00", 433 "read packet: $p{:x}#00".format(regno), 434 {"direction": "send", 435 "regex": r"^[$](.+)#.*$", 436 "capture": {1: "data"}}, 437 "read packet: $P{:x}={}#00".format(regno, new_val), 438 "send packet: $OK#00", 439 ], True) 440 ret = self.expect_gdbremote_sequence() 441 data = ret.get("data") 442 self.assertIsNotNone(data) 443 self.assertEqual(data, old_val[1]) 444 thread_regs[pidtid] = (regno, new_val) 445 446 # finally, verify that new values took effect 447 for pidtid in pidtids: 448 old_val = thread_regs[pidtid] 449 self.test_sequence.add_log_lines( 450 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 451 "send packet: $OK#00", 452 "read packet: $p{:x}#00".format(old_val[0]), 453 {"direction": "send", 454 "regex": r"^[$](.+)#.*$", 455 "capture": {1: "data"}}, 456 ], True) 457 ret = self.expect_gdbremote_sequence() 458 data = ret.get("data") 459 self.assertIsNotNone(data) 460 self.assertEqual(data, old_val[1]) 461 462 @add_test_categories(["fork"]) 463 def test_qC(self): 464 parent_pid, parent_tid, child_pid, child_tid = ( 465 self.start_fork_test(["fork", "thread:new", "stop"])) 466 pidtids = [ 467 (parent_pid, parent_tid), 468 (child_pid, child_tid), 469 ] 470 471 for pidtid in pidtids: 472 self.test_sequence.add_log_lines( 473 ["read packet: $Hcp{}.{}#00".format(*pidtid), 474 "send packet: $OK#00", 475 "read packet: $c#00", 476 {"direction": "send", 477 "regex": self.stop_regex.format(*pidtid), 478 }, 479 ], True) 480 481 self.add_threadinfo_collection_packets() 482 ret = self.expect_gdbremote_sequence() 483 self.reset_test_sequence() 484 485 pidtids = set(self.parse_threadinfo_packets(ret)) 486 self.assertEqual(len(pidtids), 4) 487 for pidtid in pidtids: 488 self.test_sequence.add_log_lines( 489 ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 490 "send packet: $OK#00", 491 "read packet: $qC#00", 492 "send packet: $QCp{:x}.{:x}#00".format(*pidtid), 493 ], True) 494 self.expect_gdbremote_sequence() 495 496 @add_test_categories(["fork"]) 497 def test_T(self): 498 parent_pid, parent_tid, child_pid, child_tid = ( 499 self.start_fork_test(["fork", "thread:new", "stop"])) 500 pidtids = [ 501 (parent_pid, parent_tid), 502 (child_pid, child_tid), 503 ] 504 505 for pidtid in pidtids: 506 self.test_sequence.add_log_lines( 507 ["read packet: $Hcp{}.{}#00".format(*pidtid), 508 "send packet: $OK#00", 509 "read packet: $c#00", 510 {"direction": "send", 511 "regex": self.stop_regex.format(*pidtid), 512 }, 513 ], True) 514 515 self.add_threadinfo_collection_packets() 516 ret = self.expect_gdbremote_sequence() 517 self.reset_test_sequence() 518 519 pidtids = set(self.parse_threadinfo_packets(ret)) 520 self.assertEqual(len(pidtids), 4) 521 max_pid = max(pid for pid, tid in pidtids) 522 max_tid = max(tid for pid, tid in pidtids) 523 bad_pidtids = ( 524 (max_pid, max_tid + 1, "E02"), 525 (max_pid + 1, max_tid, "E01"), 526 (max_pid + 1, max_tid + 1, "E01"), 527 ) 528 529 for pidtid in pidtids: 530 self.test_sequence.add_log_lines( 531 [ 532 # test explicit PID+TID 533 "read packet: $Tp{:x}.{:x}#00".format(*pidtid), 534 "send packet: $OK#00", 535 # test implicit PID via Hg 536 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), 537 "send packet: $OK#00", 538 "read packet: $T{:x}#00".format(max_tid + 1), 539 "send packet: $E02#00", 540 "read packet: $T{:x}#00".format(pidtid[1]), 541 "send packet: $OK#00", 542 ], True) 543 for pid, tid, expected in bad_pidtids: 544 self.test_sequence.add_log_lines( 545 ["read packet: $Tp{:x}.{:x}#00".format(pid, tid), 546 "send packet: ${}#00".format(expected), 547 ], True) 548 self.expect_gdbremote_sequence() 549