1import lldb
2import binascii
3import os.path
4from lldbsuite.test.lldbtest import *
5from lldbsuite.test.decorators import *
6from gdbclientutils import *
7
8
9class TestGDBRemoteClient(GDBRemoteTestBase):
10
11    class gPacketResponder(MockGDBServerResponder):
12        def readRegisters(self):
13            return '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
14
15    def test_connect(self):
16        """Test connecting to a remote gdb server"""
17        target = self.createTarget("a.yaml")
18        process = self.connect(target)
19        self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"])
20
21    def test_attach_fail(self):
22        error_msg = "mock-error-msg"
23
24        class MyResponder(MockGDBServerResponder):
25            # Pretend we don't have any process during the initial queries.
26            def qC(self):
27                return "E42"
28
29            def qfThreadInfo(self):
30                return "OK" # No threads.
31
32            # Then, when we are asked to attach, error out.
33            def vAttach(self, pid):
34                return "E42;" + binascii.hexlify(error_msg.encode()).decode()
35
36        self.server.responder = MyResponder()
37
38        target = self.dbg.CreateTarget("")
39        process = self.connect(target)
40        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])
41
42        error = lldb.SBError()
43        target.AttachToProcessWithID(lldb.SBListener(), 47, error)
44        self.assertEquals(error_msg, error.GetCString())
45
46    def test_launch_fail(self):
47        class MyResponder(MockGDBServerResponder):
48            # Pretend we don't have any process during the initial queries.
49            def qC(self):
50                return "E42"
51
52            def qfThreadInfo(self):
53                return "OK" # No threads.
54
55            # Then, when we are asked to attach, error out.
56            def A(self, packet):
57                return "E47"
58
59        self.server.responder = MyResponder()
60
61        target = self.createTarget("a.yaml")
62        process = self.connect(target)
63        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])
64
65        error = lldb.SBError()
66        target.Launch(lldb.SBListener(), None, None, None, None, None,
67                None, 0, True, error)
68        self.assertEquals("'A' packet returned an error: 71", error.GetCString())
69
70    def test_read_registers_using_g_packets(self):
71        """Test reading registers using 'g' packets (default behavior)"""
72        self.dbg.HandleCommand(
73                "settings set plugin.process.gdb-remote.use-g-packet-for-reading true")
74        self.addTearDownHook(lambda:
75                self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false"))
76        self.server.responder = self.gPacketResponder()
77        target = self.createTarget("a.yaml")
78        process = self.connect(target)
79
80        self.assertEquals(1, self.server.responder.packetLog.count("g"))
81        self.server.responder.packetLog = []
82        self.read_registers(process)
83        # Reading registers should not cause any 'p' packets to be exchanged.
84        self.assertEquals(
85                0, len([p for p in self.server.responder.packetLog if p.startswith("p")]))
86
87    def test_read_registers_using_p_packets(self):
88        """Test reading registers using 'p' packets"""
89        self.dbg.HandleCommand(
90                "settings set plugin.process.gdb-remote.use-g-packet-for-reading false")
91        target = self.createTarget("a.yaml")
92        process = self.connect(target)
93
94        self.read_registers(process)
95        self.assertNotIn("g", self.server.responder.packetLog)
96        self.assertGreater(
97                len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0)
98
99    def test_write_registers_using_P_packets(self):
100        """Test writing registers using 'P' packets (default behavior)"""
101        self.server.responder = self.gPacketResponder()
102        target = self.createTarget("a.yaml")
103        process = self.connect(target)
104
105        self.write_registers(process)
106        self.assertEquals(0, len(
107                [p for p in self.server.responder.packetLog if p.startswith("G")]))
108        self.assertGreater(
109                len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0)
110
111    def test_write_registers_using_G_packets(self):
112        """Test writing registers using 'G' packets"""
113
114        class MyResponder(self.gPacketResponder):
115            def readRegister(self, register):
116                # empty string means unsupported
117                return ""
118
119        self.server.responder = MyResponder()
120        target = self.createTarget("a.yaml")
121        process = self.connect(target)
122
123        self.write_registers(process)
124        self.assertEquals(0, len(
125                [p for p in self.server.responder.packetLog if p.startswith("P")]))
126        self.assertGreater(len(
127                [p for p in self.server.responder.packetLog if p.startswith("G")]), 0)
128
129    def read_registers(self, process):
130        self.for_each_gpr(
131                process, lambda r: self.assertEquals("0x00000000", r.GetValue()))
132
133    def write_registers(self, process):
134        self.for_each_gpr(
135                process, lambda r: r.SetValueFromCString("0x00000000"))
136
137    def for_each_gpr(self, process, operation):
138        registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters()
139        self.assertGreater(registers.GetSize(), 0)
140        regSet = registers[0]
141        numChildren = regSet.GetNumChildren()
142        self.assertGreater(numChildren, 0)
143        for i in range(numChildren):
144            operation(regSet.GetChildAtIndex(i))
145
146    def test_launch_A(self):
147        class MyResponder(MockGDBServerResponder):
148            def __init__(self, *args, **kwargs):
149                self.started = False
150                return super().__init__(*args, **kwargs)
151
152            def qC(self):
153                if self.started:
154                    return "QCp10.10"
155                else:
156                    return "E42"
157
158            def qfThreadInfo(self):
159                if self.started:
160                    return "mp10.10"
161                else:
162                   return "E42"
163
164            def qsThreadInfo(self):
165                return "l"
166
167            def A(self, packet):
168                self.started = True
169                return "OK"
170
171            def qLaunchSuccess(self):
172                if self.started:
173                    return "OK"
174                return "E42"
175
176        self.server.responder = MyResponder()
177
178        target = self.createTarget("a.yaml")
179        # NB: apparently GDB packets are using "/" on Windows too
180        exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/')
181        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
182        process = self.connect(target)
183        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
184                                      [lldb.eStateConnected])
185
186        target.Launch(lldb.SBListener(),
187                      ["arg1", "arg2", "arg3"],  # argv
188                      [],  # envp
189                      None,  # stdin_path
190                      None,  # stdout_path
191                      None,  # stderr_path
192                      None,  # working_directory
193                      0,  # launch_flags
194                      True,  # stop_at_entry
195                      lldb.SBError())  # error
196        self.assertTrue(process, PROCESS_IS_VALID)
197        self.assertEqual(process.GetProcessID(), 16)
198
199        self.assertPacketLogContains([
200          "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % (
201              len(exe_hex), exe_hex),
202        ])
203
204    def test_launch_vRun(self):
205        class MyResponder(MockGDBServerResponder):
206            def __init__(self, *args, **kwargs):
207                self.started = False
208                return super().__init__(*args, **kwargs)
209
210            def qC(self):
211                if self.started:
212                    return "QCp10.10"
213                else:
214                    return "E42"
215
216            def qfThreadInfo(self):
217                if self.started:
218                    return "mp10.10"
219                else:
220                   return "E42"
221
222            def qsThreadInfo(self):
223                return "l"
224
225            def vRun(self, packet):
226                self.started = True
227                return "T13"
228
229            def A(self, packet):
230                return "E28"
231
232        self.server.responder = MyResponder()
233
234        target = self.createTarget("a.yaml")
235        # NB: apparently GDB packets are using "/" on Windows too
236        exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/')
237        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
238        process = self.connect(target)
239        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
240                                      [lldb.eStateConnected])
241
242        process = target.Launch(lldb.SBListener(),
243                                ["arg1", "arg2", "arg3"],  # argv
244                                [],  # envp
245                                None,  # stdin_path
246                                None,  # stdout_path
247                                None,  # stderr_path
248                                None,  # working_directory
249                                0,  # launch_flags
250                                True,  # stop_at_entry
251                                lldb.SBError())  # error
252        self.assertTrue(process, PROCESS_IS_VALID)
253        self.assertEqual(process.GetProcessID(), 16)
254
255        self.assertPacketLogContains([
256          "vRun;%s;61726731;61726732;61726733" % (exe_hex,)
257        ])
258
259    def test_launch_QEnvironment(self):
260        class MyResponder(MockGDBServerResponder):
261            def qC(self):
262                return "E42"
263
264            def qfThreadInfo(self):
265               return "E42"
266
267            def vRun(self, packet):
268                self.started = True
269                return "E28"
270
271        self.server.responder = MyResponder()
272
273        target = self.createTarget("a.yaml")
274        process = self.connect(target)
275        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
276                                      [lldb.eStateConnected])
277
278        target.Launch(lldb.SBListener(),
279                      [],  # argv
280                      ["PLAIN=foo",
281                       "NEEDSENC=frob$",
282                       "NEEDSENC2=fr*ob",
283                       "NEEDSENC3=fro}b",
284                       "NEEDSENC4=f#rob",
285                       "EQUALS=foo=bar",
286                       ],  # envp
287                      None,  # stdin_path
288                      None,  # stdout_path
289                      None,  # stderr_path
290                      None,  # working_directory
291                      0,  # launch_flags
292                      True,  # stop_at_entry
293                      lldb.SBError())  # error
294
295        self.assertPacketLogContains([
296          "QEnvironment:PLAIN=foo",
297          "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
298          "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
299          "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
300          "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
301          "QEnvironment:EQUALS=foo=bar",
302        ])
303
304    def test_launch_QEnvironmentHexEncoded_only(self):
305        class MyResponder(MockGDBServerResponder):
306            def qC(self):
307                return "E42"
308
309            def qfThreadInfo(self):
310               return "E42"
311
312            def vRun(self, packet):
313                self.started = True
314                return "E28"
315
316            def QEnvironment(self, packet):
317                return ""
318
319        self.server.responder = MyResponder()
320
321        target = self.createTarget("a.yaml")
322        process = self.connect(target)
323        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
324                                      [lldb.eStateConnected])
325
326        target.Launch(lldb.SBListener(),
327                      [],  # argv
328                      ["PLAIN=foo",
329                       "NEEDSENC=frob$",
330                       "NEEDSENC2=fr*ob",
331                       "NEEDSENC3=fro}b",
332                       "NEEDSENC4=f#rob",
333                       "EQUALS=foo=bar",
334                       ],  # envp
335                      None,  # stdin_path
336                      None,  # stdout_path
337                      None,  # stderr_path
338                      None,  # working_directory
339                      0,  # launch_flags
340                      True,  # stop_at_entry
341                      lldb.SBError())  # error
342
343        self.assertPacketLogContains([
344          "QEnvironmentHexEncoded:504c41494e3d666f6f",
345          "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
346          "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
347          "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
348          "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
349          "QEnvironmentHexEncoded:455155414c533d666f6f3d626172",
350        ])
351
352    def test_detach_no_multiprocess(self):
353        class MyResponder(MockGDBServerResponder):
354            def __init__(self):
355                super().__init__()
356                self.detached = None
357
358            def qfThreadInfo(self):
359                return "10200"
360
361            def D(self, packet):
362                self.detached = packet
363                return "OK"
364
365        self.server.responder = MyResponder()
366        target = self.dbg.CreateTarget('')
367        process = self.connect(target)
368        process.Detach()
369        self.assertEqual(self.server.responder.detached, "D")
370
371    def test_detach_pid(self):
372        class MyResponder(MockGDBServerResponder):
373            def __init__(self, test_case):
374                super().__init__()
375                self.test_case = test_case
376                self.detached = None
377
378            def qSupported(self, client_supported):
379                self.test_case.assertIn("multiprocess+", client_supported)
380                return "multiprocess+;" + super().qSupported(client_supported)
381
382            def qfThreadInfo(self):
383                return "mp400.10200"
384
385            def D(self, packet):
386                self.detached = packet
387                return "OK"
388
389        self.server.responder = MyResponder(self)
390        target = self.dbg.CreateTarget('')
391        process = self.connect(target)
392        process.Detach()
393        self.assertRegex(self.server.responder.detached, r"D;0*400")
394