1import lldb
2import binascii
3from lldbsuite.test.lldbtest import *
4from lldbsuite.test.decorators import *
5from gdbclientutils import *
6
7
8class TestGDBRemoteClient(GDBRemoteTestBase):
9
10    class gPacketResponder(MockGDBServerResponder):
11        def readRegisters(self):
12            return '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
13
14    def setUp(self):
15        super(TestGDBRemoteClient, self).setUp()
16        self._initial_platform = lldb.DBG.GetSelectedPlatform()
17
18    def tearDown(self):
19        lldb.DBG.SetSelectedPlatform(self._initial_platform)
20        super(TestGDBRemoteClient, self).tearDown()
21
22    def test_connect(self):
23        """Test connecting to a remote gdb server"""
24        target = self.createTarget("a.yaml")
25        process = self.connect(target)
26        self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"])
27
28    def test_attach_fail(self):
29        error_msg = "mock-error-msg"
30
31        class MyResponder(MockGDBServerResponder):
32            # Pretend we don't have any process during the initial queries.
33            def qC(self):
34                return "E42"
35
36            def qfThreadInfo(self):
37                return "OK" # No threads.
38
39            # Then, when we are asked to attach, error out.
40            def vAttach(self, pid):
41                return "E42;" + binascii.hexlify(error_msg.encode()).decode()
42
43        self.server.responder = MyResponder()
44
45        target = self.dbg.CreateTarget("")
46        process = self.connect(target)
47        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])
48
49        error = lldb.SBError()
50        target.AttachToProcessWithID(lldb.SBListener(), 47, error)
51        self.assertEquals(error_msg, error.GetCString())
52
53    def test_launch_fail(self):
54        class MyResponder(MockGDBServerResponder):
55            # Pretend we don't have any process during the initial queries.
56            def qC(self):
57                return "E42"
58
59            def qfThreadInfo(self):
60                return "OK" # No threads.
61
62            # Then, when we are asked to attach, error out.
63            def A(self, packet):
64                return "E47"
65
66        self.server.responder = MyResponder()
67
68        target = self.createTarget("a.yaml")
69        process = self.connect(target)
70        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])
71
72        error = lldb.SBError()
73        target.Launch(lldb.SBListener(), None, None, None, None, None,
74                None, 0, True, error)
75        self.assertEquals("'A' packet returned an error: 71", error.GetCString())
76
77    def test_read_registers_using_g_packets(self):
78        """Test reading registers using 'g' packets (default behavior)"""
79        self.dbg.HandleCommand(
80                "settings set plugin.process.gdb-remote.use-g-packet-for-reading true")
81        self.addTearDownHook(lambda:
82                self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false"))
83        self.server.responder = self.gPacketResponder()
84        target = self.createTarget("a.yaml")
85        process = self.connect(target)
86
87        self.assertEquals(1, self.server.responder.packetLog.count("g"))
88        self.server.responder.packetLog = []
89        self.read_registers(process)
90        # Reading registers should not cause any 'p' packets to be exchanged.
91        self.assertEquals(
92                0, len([p for p in self.server.responder.packetLog if p.startswith("p")]))
93
94    def test_read_registers_using_p_packets(self):
95        """Test reading registers using 'p' packets"""
96        self.dbg.HandleCommand(
97                "settings set plugin.process.gdb-remote.use-g-packet-for-reading false")
98        target = self.createTarget("a.yaml")
99        process = self.connect(target)
100
101        self.read_registers(process)
102        self.assertFalse("g" in self.server.responder.packetLog)
103        self.assertGreater(
104                len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0)
105
106    def test_write_registers_using_P_packets(self):
107        """Test writing registers using 'P' packets (default behavior)"""
108        self.server.responder = self.gPacketResponder()
109        target = self.createTarget("a.yaml")
110        process = self.connect(target)
111
112        self.write_registers(process)
113        self.assertEquals(0, len(
114                [p for p in self.server.responder.packetLog if p.startswith("G")]))
115        self.assertGreater(
116                len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0)
117
118    def test_write_registers_using_G_packets(self):
119        """Test writing registers using 'G' packets"""
120
121        class MyResponder(self.gPacketResponder):
122            def readRegister(self, register):
123                # empty string means unsupported
124                return ""
125
126        self.server.responder = MyResponder()
127        target = self.createTarget("a.yaml")
128        process = self.connect(target)
129
130        self.write_registers(process)
131        self.assertEquals(0, len(
132                [p for p in self.server.responder.packetLog if p.startswith("P")]))
133        self.assertGreater(len(
134                [p for p in self.server.responder.packetLog if p.startswith("G")]), 0)
135
136    def read_registers(self, process):
137        self.for_each_gpr(
138                process, lambda r: self.assertEquals("0x00000000", r.GetValue()))
139
140    def write_registers(self, process):
141        self.for_each_gpr(
142                process, lambda r: r.SetValueFromCString("0x00000000"))
143
144    def for_each_gpr(self, process, operation):
145        registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters()
146        self.assertGreater(registers.GetSize(), 0)
147        regSet = registers[0]
148        numChildren = regSet.GetNumChildren()
149        self.assertGreater(numChildren, 0)
150        for i in range(numChildren):
151            operation(regSet.GetChildAtIndex(i))
152