1import lldb 2import binascii 3import os.path 4from lldbsuite.test.lldbtest import * 5from lldbsuite.test.decorators import * 6from gdbclientutils import * 7 8 9class TestGDBRemoteClient(GDBRemoteTestBase): 10 11 class gPacketResponder(MockGDBServerResponder): 12 def readRegisters(self): 13 return '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000' 14 15 def test_connect(self): 16 """Test connecting to a remote gdb server""" 17 target = self.createTarget("a.yaml") 18 process = self.connect(target) 19 self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"]) 20 21 def test_attach_fail(self): 22 error_msg = "mock-error-msg" 23 24 class MyResponder(MockGDBServerResponder): 25 # Pretend we don't have any process during the initial queries. 26 def qC(self): 27 return "E42" 28 29 def qfThreadInfo(self): 30 return "OK" # No threads. 31 32 # Then, when we are asked to attach, error out. 33 def vAttach(self, pid): 34 return "E42;" + binascii.hexlify(error_msg.encode()).decode() 35 36 self.server.responder = MyResponder() 37 38 target = self.dbg.CreateTarget("") 39 process = self.connect(target) 40 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected]) 41 42 error = lldb.SBError() 43 target.AttachToProcessWithID(lldb.SBListener(), 47, error) 44 self.assertEquals(error_msg, error.GetCString()) 45 46 def test_launch_fail(self): 47 class MyResponder(MockGDBServerResponder): 48 # Pretend we don't have any process during the initial queries. 49 def qC(self): 50 return "E42" 51 52 def qfThreadInfo(self): 53 return "OK" # No threads. 54 55 # Then, when we are asked to attach, error out. 56 def A(self, packet): 57 return "E47" 58 59 self.server.responder = MyResponder() 60 61 target = self.createTarget("a.yaml") 62 process = self.connect(target) 63 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected]) 64 65 error = lldb.SBError() 66 target.Launch(lldb.SBListener(), None, None, None, None, None, 67 None, 0, True, error) 68 self.assertEquals("'A' packet returned an error: 71", error.GetCString()) 69 70 def test_read_registers_using_g_packets(self): 71 """Test reading registers using 'g' packets (default behavior)""" 72 self.dbg.HandleCommand( 73 "settings set plugin.process.gdb-remote.use-g-packet-for-reading true") 74 self.addTearDownHook(lambda: 75 self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false")) 76 self.server.responder = self.gPacketResponder() 77 target = self.createTarget("a.yaml") 78 process = self.connect(target) 79 80 self.assertEquals(1, self.server.responder.packetLog.count("g")) 81 self.server.responder.packetLog = [] 82 self.read_registers(process) 83 # Reading registers should not cause any 'p' packets to be exchanged. 84 self.assertEquals( 85 0, len([p for p in self.server.responder.packetLog if p.startswith("p")])) 86 87 def test_read_registers_using_p_packets(self): 88 """Test reading registers using 'p' packets""" 89 self.dbg.HandleCommand( 90 "settings set plugin.process.gdb-remote.use-g-packet-for-reading false") 91 target = self.createTarget("a.yaml") 92 process = self.connect(target) 93 94 self.read_registers(process) 95 self.assertNotIn("g", self.server.responder.packetLog) 96 self.assertGreater( 97 len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0) 98 99 def test_write_registers_using_P_packets(self): 100 """Test writing registers using 'P' packets (default behavior)""" 101 self.server.responder = self.gPacketResponder() 102 target = self.createTarget("a.yaml") 103 process = self.connect(target) 104 105 self.write_registers(process) 106 self.assertEquals(0, len( 107 [p for p in self.server.responder.packetLog if p.startswith("G")])) 108 self.assertGreater( 109 len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0) 110 111 def test_write_registers_using_G_packets(self): 112 """Test writing registers using 'G' packets""" 113 114 class MyResponder(self.gPacketResponder): 115 def readRegister(self, register): 116 # empty string means unsupported 117 return "" 118 119 self.server.responder = MyResponder() 120 target = self.createTarget("a.yaml") 121 process = self.connect(target) 122 123 self.write_registers(process) 124 self.assertEquals(0, len( 125 [p for p in self.server.responder.packetLog if p.startswith("P")])) 126 self.assertGreater(len( 127 [p for p in self.server.responder.packetLog if p.startswith("G")]), 0) 128 129 def read_registers(self, process): 130 self.for_each_gpr( 131 process, lambda r: self.assertEquals("0x00000000", r.GetValue())) 132 133 def write_registers(self, process): 134 self.for_each_gpr( 135 process, lambda r: r.SetValueFromCString("0x00000000")) 136 137 def for_each_gpr(self, process, operation): 138 registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters() 139 self.assertGreater(registers.GetSize(), 0) 140 regSet = registers[0] 141 numChildren = regSet.GetNumChildren() 142 self.assertGreater(numChildren, 0) 143 for i in range(numChildren): 144 operation(regSet.GetChildAtIndex(i)) 145 146 def test_launch_A(self): 147 class MyResponder(MockGDBServerResponder): 148 def __init__(self, *args, **kwargs): 149 self.started = False 150 return super().__init__(*args, **kwargs) 151 152 def qC(self): 153 if self.started: 154 return "QCp10.10" 155 else: 156 return "E42" 157 158 def qfThreadInfo(self): 159 if self.started: 160 return "mp10.10" 161 else: 162 return "E42" 163 164 def qsThreadInfo(self): 165 return "l" 166 167 def A(self, packet): 168 self.started = True 169 return "OK" 170 171 def qLaunchSuccess(self): 172 if self.started: 173 return "OK" 174 return "E42" 175 176 self.server.responder = MyResponder() 177 178 target = self.createTarget("a.yaml") 179 # NB: apparently GDB packets are using "/" on Windows too 180 exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/') 181 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 182 process = self.connect(target) 183 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 184 [lldb.eStateConnected]) 185 186 target.Launch(lldb.SBListener(), 187 ["arg1", "arg2", "arg3"], # argv 188 [], # envp 189 None, # stdin_path 190 None, # stdout_path 191 None, # stderr_path 192 None, # working_directory 193 0, # launch_flags 194 True, # stop_at_entry 195 lldb.SBError()) # error 196 self.assertTrue(process, PROCESS_IS_VALID) 197 self.assertEqual(process.GetProcessID(), 16) 198 199 self.assertPacketLogContains([ 200 "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % ( 201 len(exe_hex), exe_hex), 202 ]) 203 204 def test_launch_vRun(self): 205 class MyResponder(MockGDBServerResponder): 206 def __init__(self, *args, **kwargs): 207 self.started = False 208 return super().__init__(*args, **kwargs) 209 210 def qC(self): 211 if self.started: 212 return "QCp10.10" 213 else: 214 return "E42" 215 216 def qfThreadInfo(self): 217 if self.started: 218 return "mp10.10" 219 else: 220 return "E42" 221 222 def qsThreadInfo(self): 223 return "l" 224 225 def vRun(self, packet): 226 self.started = True 227 return "T13" 228 229 def A(self, packet): 230 return "E28" 231 232 self.server.responder = MyResponder() 233 234 target = self.createTarget("a.yaml") 235 # NB: apparently GDB packets are using "/" on Windows too 236 exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/') 237 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 238 process = self.connect(target) 239 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 240 [lldb.eStateConnected]) 241 242 process = target.Launch(lldb.SBListener(), 243 ["arg1", "arg2", "arg3"], # argv 244 [], # envp 245 None, # stdin_path 246 None, # stdout_path 247 None, # stderr_path 248 None, # working_directory 249 0, # launch_flags 250 True, # stop_at_entry 251 lldb.SBError()) # error 252 self.assertTrue(process, PROCESS_IS_VALID) 253 self.assertEqual(process.GetProcessID(), 16) 254 255 self.assertPacketLogContains([ 256 "vRun;%s;61726731;61726732;61726733" % (exe_hex,) 257 ]) 258 259 def test_launch_QEnvironment(self): 260 class MyResponder(MockGDBServerResponder): 261 def qC(self): 262 return "E42" 263 264 def qfThreadInfo(self): 265 return "E42" 266 267 def vRun(self, packet): 268 self.started = True 269 return "E28" 270 271 self.server.responder = MyResponder() 272 273 target = self.createTarget("a.yaml") 274 process = self.connect(target) 275 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 276 [lldb.eStateConnected]) 277 278 target.Launch(lldb.SBListener(), 279 [], # argv 280 ["PLAIN=foo", 281 "NEEDSENC=frob$", 282 "NEEDSENC2=fr*ob", 283 "NEEDSENC3=fro}b", 284 "NEEDSENC4=f#rob", 285 "EQUALS=foo=bar", 286 ], # envp 287 None, # stdin_path 288 None, # stdout_path 289 None, # stderr_path 290 None, # working_directory 291 0, # launch_flags 292 True, # stop_at_entry 293 lldb.SBError()) # error 294 295 self.assertPacketLogContains([ 296 "QEnvironment:PLAIN=foo", 297 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 298 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 299 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 300 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 301 "QEnvironment:EQUALS=foo=bar", 302 ]) 303 304 def test_launch_QEnvironmentHexEncoded_only(self): 305 class MyResponder(MockGDBServerResponder): 306 def qC(self): 307 return "E42" 308 309 def qfThreadInfo(self): 310 return "E42" 311 312 def vRun(self, packet): 313 self.started = True 314 return "E28" 315 316 def QEnvironment(self, packet): 317 return "" 318 319 self.server.responder = MyResponder() 320 321 target = self.createTarget("a.yaml") 322 process = self.connect(target) 323 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 324 [lldb.eStateConnected]) 325 326 target.Launch(lldb.SBListener(), 327 [], # argv 328 ["PLAIN=foo", 329 "NEEDSENC=frob$", 330 "NEEDSENC2=fr*ob", 331 "NEEDSENC3=fro}b", 332 "NEEDSENC4=f#rob", 333 "EQUALS=foo=bar", 334 ], # envp 335 None, # stdin_path 336 None, # stdout_path 337 None, # stderr_path 338 None, # working_directory 339 0, # launch_flags 340 True, # stop_at_entry 341 lldb.SBError()) # error 342 343 self.assertPacketLogContains([ 344 "QEnvironmentHexEncoded:504c41494e3d666f6f", 345 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 346 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 347 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 348 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 349 "QEnvironmentHexEncoded:455155414c533d666f6f3d626172", 350 ]) 351 352 def test_detach_no_multiprocess(self): 353 class MyResponder(MockGDBServerResponder): 354 def __init__(self): 355 super().__init__() 356 self.detached = None 357 358 def qfThreadInfo(self): 359 return "10200" 360 361 def D(self, packet): 362 self.detached = packet 363 return "OK" 364 365 self.server.responder = MyResponder() 366 target = self.dbg.CreateTarget('') 367 process = self.connect(target) 368 process.Detach() 369 self.assertEqual(self.server.responder.detached, "D") 370 371 def test_detach_pid(self): 372 class MyResponder(MockGDBServerResponder): 373 def __init__(self, test_case): 374 super().__init__() 375 self.test_case = test_case 376 self.detached = None 377 378 def qSupported(self, client_supported): 379 self.test_case.assertIn("multiprocess+", client_supported) 380 return "multiprocess+;" + super().qSupported(client_supported) 381 382 def qfThreadInfo(self): 383 return "mp400.10200" 384 385 def D(self, packet): 386 self.detached = packet 387 return "OK" 388 389 self.server.responder = MyResponder(self) 390 target = self.dbg.CreateTarget('') 391 process = self.connect(target) 392 process.Detach() 393 self.assertRegex(self.server.responder.detached, r"D;0*400") 394