1import gdbremote_testcase 2from lldbsuite.test.decorators import * 3from lldbsuite.test.lldbtest import * 4from lldbsuite.test import lldbutil 5 6class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase): 7 8 fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*" 9 "{}:p([0-9a-f]+)[.]([0-9a-f]+).*") 10 fork_capture = {1: "parent_pid", 2: "parent_tid", 11 3: "child_pid", 4: "child_tid"} 12 procinfo_regex = "[$]pid:([0-9a-f]+);.*" 13 14 @add_test_categories(["fork"]) 15 def test_fork_multithreaded(self): 16 self.build() 17 self.prep_debug_monitor_and_inferior(inferior_args=["thread:new"]*2 + ["fork"]) 18 self.add_qSupported_packets(["multiprocess+", "fork-events+"]) 19 ret = self.expect_gdbremote_sequence() 20 self.assertIn("fork-events+", ret["qSupported_response"]) 21 self.reset_test_sequence() 22 23 # continue and expect fork 24 self.test_sequence.add_log_lines([ 25 "read packet: $c#00", 26 {"direction": "send", "regex": self.fork_regex.format("fork"), 27 "capture": self.fork_capture}, 28 ], True) 29 ret = self.expect_gdbremote_sequence() 30 child_pid = ret["child_pid"] 31 self.reset_test_sequence() 32 33 # detach the forked child 34 self.test_sequence.add_log_lines([ 35 "read packet: $D;{}#00".format(child_pid), 36 "send packet: $OK#00", 37 "read packet: $k#00", 38 ], True) 39 self.expect_gdbremote_sequence() 40 41 def fork_and_detach_test(self, variant): 42 self.build() 43 self.prep_debug_monitor_and_inferior(inferior_args=[variant]) 44 self.add_qSupported_packets(["multiprocess+", 45 "{}-events+".format(variant)]) 46 ret = self.expect_gdbremote_sequence() 47 self.assertIn("{}-events+".format(variant), ret["qSupported_response"]) 48 self.reset_test_sequence() 49 50 # continue and expect fork 51 self.test_sequence.add_log_lines([ 52 "read packet: $c#00", 53 {"direction": "send", "regex": self.fork_regex.format(variant), 54 "capture": self.fork_capture}, 55 ], True) 56 ret = self.expect_gdbremote_sequence() 57 parent_pid = ret["parent_pid"] 58 parent_tid = ret["parent_tid"] 59 child_pid = ret["child_pid"] 60 child_tid = ret["child_tid"] 61 self.reset_test_sequence() 62 63 # detach the forked child 64 self.test_sequence.add_log_lines([ 65 "read packet: $D;{}#00".format(child_pid), 66 "send packet: $OK#00", 67 # verify that the current process is correct 68 "read packet: $qC#00", 69 "send packet: $QC{}#00".format(parent_tid), 70 # verify that the correct processes are detached/available 71 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 72 "send packet: $Eff#00", 73 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 74 "send packet: $OK#00", 75 ], True) 76 self.expect_gdbremote_sequence() 77 self.reset_test_sequence() 78 return parent_pid, parent_tid 79 80 @add_test_categories(["fork"]) 81 def test_fork(self): 82 parent_pid, _ = self.fork_and_detach_test("fork") 83 84 # resume the parent 85 self.test_sequence.add_log_lines([ 86 "read packet: $c#00", 87 "send packet: $W00;process:{}#00".format(parent_pid), 88 ], True) 89 self.expect_gdbremote_sequence() 90 91 @add_test_categories(["fork"]) 92 def test_vfork(self): 93 parent_pid, parent_tid = self.fork_and_detach_test("vfork") 94 95 # resume the parent 96 self.test_sequence.add_log_lines([ 97 "read packet: $c#00", 98 {"direction": "send", 99 "regex": r"[$]T05thread:p{}[.]{}.*vforkdone.*".format(parent_pid, 100 parent_tid), 101 }, 102 "read packet: $c#00", 103 "send packet: $W00;process:{}#00".format(parent_pid), 104 ], True) 105 self.expect_gdbremote_sequence() 106 107 def fork_and_follow_test(self, variant): 108 self.build() 109 self.prep_debug_monitor_and_inferior(inferior_args=[variant]) 110 self.add_qSupported_packets(["multiprocess+", 111 "{}-events+".format(variant)]) 112 ret = self.expect_gdbremote_sequence() 113 self.assertIn("{}-events+".format(variant), ret["qSupported_response"]) 114 self.reset_test_sequence() 115 116 # continue and expect fork 117 self.test_sequence.add_log_lines([ 118 "read packet: $c#00", 119 {"direction": "send", "regex": self.fork_regex.format(variant), 120 "capture": self.fork_capture}, 121 ], True) 122 ret = self.expect_gdbremote_sequence() 123 parent_pid = ret["parent_pid"] 124 parent_tid = ret["parent_tid"] 125 child_pid = ret["child_pid"] 126 child_tid = ret["child_tid"] 127 self.reset_test_sequence() 128 129 # switch to the forked child 130 self.test_sequence.add_log_lines([ 131 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 132 "send packet: $OK#00", 133 "read packet: $Hcp{}.{}#00".format(child_pid, child_tid), 134 "send packet: $OK#00", 135 # detach the parent 136 "read packet: $D;{}#00".format(parent_pid), 137 "send packet: $OK#00", 138 # verify that the correct processes are detached/available 139 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 140 "send packet: $Eff#00", 141 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 142 "send packet: $OK#00", 143 # then resume the child 144 "read packet: $c#00", 145 "send packet: $W00;process:{}#00".format(child_pid), 146 ], True) 147 self.expect_gdbremote_sequence() 148 149 @add_test_categories(["fork"]) 150 def test_fork_follow(self): 151 self.fork_and_follow_test("fork") 152 153 @add_test_categories(["fork"]) 154 def test_vfork_follow(self): 155 self.fork_and_follow_test("vfork") 156 157 @add_test_categories(["fork"]) 158 def test_select_wrong_pid(self): 159 self.build() 160 self.prep_debug_monitor_and_inferior() 161 self.add_qSupported_packets(["multiprocess+"]) 162 ret = self.expect_gdbremote_sequence() 163 self.assertIn("multiprocess+", ret["qSupported_response"]) 164 self.reset_test_sequence() 165 166 # get process pid 167 self.test_sequence.add_log_lines([ 168 "read packet: $qProcessInfo#00", 169 {"direction": "send", "regex": self.procinfo_regex, 170 "capture": {1: "pid"}}, 171 "read packet: $qC#00", 172 {"direction": "send", "regex": "[$]QC([0-9a-f]+)#.*", 173 "capture": {1: "tid"}}, 174 ], True) 175 ret = self.expect_gdbremote_sequence() 176 pid, tid = (int(ret[x], 16) for x in ("pid", "tid")) 177 self.reset_test_sequence() 178 179 self.test_sequence.add_log_lines([ 180 # try switching to correct pid 181 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid), 182 "send packet: $OK#00", 183 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid), 184 "send packet: $OK#00", 185 # try switching to invalid tid 186 "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1), 187 "send packet: $E15#00", 188 "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1), 189 "send packet: $E15#00", 190 # try switching to invalid pid 191 "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid), 192 "send packet: $Eff#00", 193 "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid), 194 "send packet: $Eff#00", 195 ], True) 196 self.expect_gdbremote_sequence() 197 198 @add_test_categories(["fork"]) 199 def test_detach_current(self): 200 self.build() 201 self.prep_debug_monitor_and_inferior() 202 self.add_qSupported_packets(["multiprocess+"]) 203 ret = self.expect_gdbremote_sequence() 204 self.assertIn("multiprocess+", ret["qSupported_response"]) 205 self.reset_test_sequence() 206 207 # get process pid 208 self.test_sequence.add_log_lines([ 209 "read packet: $qProcessInfo#00", 210 {"direction": "send", "regex": self.procinfo_regex, 211 "capture": {1: "pid"}}, 212 ], True) 213 ret = self.expect_gdbremote_sequence() 214 pid = ret["pid"] 215 self.reset_test_sequence() 216 217 # detach the process 218 self.test_sequence.add_log_lines([ 219 "read packet: $D;{}#00".format(pid), 220 "send packet: $OK#00", 221 "read packet: $qC#00", 222 "send packet: $E44#00", 223 ], True) 224 self.expect_gdbremote_sequence() 225 226 @add_test_categories(["fork"]) 227 def test_detach_all(self): 228 self.build() 229 self.prep_debug_monitor_and_inferior(inferior_args=["fork"]) 230 self.add_qSupported_packets(["multiprocess+", 231 "fork-events+"]) 232 ret = self.expect_gdbremote_sequence() 233 self.assertIn("fork-events+", ret["qSupported_response"]) 234 self.reset_test_sequence() 235 236 # continue and expect fork 237 self.test_sequence.add_log_lines([ 238 "read packet: $c#00", 239 {"direction": "send", "regex": self.fork_regex.format("fork"), 240 "capture": self.fork_capture}, 241 ], True) 242 ret = self.expect_gdbremote_sequence() 243 parent_pid = ret["parent_pid"] 244 parent_tid = ret["parent_tid"] 245 child_pid = ret["child_pid"] 246 child_tid = ret["child_tid"] 247 self.reset_test_sequence() 248 249 self.test_sequence.add_log_lines([ 250 # double-check our PIDs 251 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 252 "send packet: $OK#00", 253 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 254 "send packet: $OK#00", 255 # detach all processes 256 "read packet: $D#00", 257 "send packet: $OK#00", 258 # verify that both PIDs are invalid now 259 "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), 260 "send packet: $Eff#00", 261 "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), 262 "send packet: $Eff#00", 263 ], True) 264 self.expect_gdbremote_sequence() 265