1from __future__ import print_function
2
3# lldb test suite imports
4from lldbsuite.test.decorators import *
5from lldbsuite.test.lldbtest import TestBase
6
7# gdb-remote-specific imports
8import lldbgdbserverutils
9from gdbremote_testcase import GdbRemoteTestCaseBase
10
11import binascii
12import stat
13import tempfile
14
15
class TestGdbRemotePlatformFile(GdbRemoteTestCaseBase):
    """Exercise the vFile:open/pread/pwrite/close packets of the debug monitor.

    Every ``test_platform_file_*`` method picks one combination of open flags
    and delegates to :meth:`vFile_test`, which drives the full
    open -> pread -> pwrite -> close sequence and then inspects the file on
    disk. The two ``*_fail`` tests only check that an invalid open request is
    answered with an error reply.
    """

    mydir = TestBase.compute_mydir(__file__)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_rdonly(self):
        """Open an existing file read-only."""
        self.vFile_test(read=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly(self):
        """Open an existing file write-only."""
        self.vFile_test(write=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_rdwr(self):
        """Open an existing file for both reading and writing."""
        self.vFile_test(read=True, write=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly_append(self):
        """Open an existing file write-only in append mode."""
        self.vFile_test(write=True, append=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_rdwr_append(self):
        """Open an existing file read-write in append mode."""
        self.vFile_test(read=True, write=True, append=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly_trunc(self):
        """Open an existing file write-only, truncating it."""
        self.vFile_test(write=True, trunc=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_rdwr_trunc(self):
        """Open an existing file read-write, truncating it."""
        self.vFile_test(read=True, write=True, trunc=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly_creat(self):
        """Create a new file and open it write-only."""
        self.vFile_test(write=True, creat=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly_creat_excl(self):
        """Create a new file exclusively and open it write-only."""
        self.vFile_test(write=True, creat=True, excl=True)

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly_fail(self):
        """Opening a nonexistent file without the create flag must fail."""
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        # A path inside a fresh temporary directory is known not to exist.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = os.path.join(temp_dir, "test")
            self.assertFalse(os.path.exists(temp_path))

            # attempt to open the file without O_CREAT
            # (flags = 1: write-only, mode = 0)
            self.do_handshake()
            self.test_sequence.add_log_lines(
                ["read packet: $vFile:open:%s,1,0#00" % (
                    binascii.b2a_hex(temp_path.encode()).decode(),)],
                True)
            self.expect_error()

    @skipIfWindows
    @add_test_categories(["llgs"])
    def test_platform_file_wronly_creat_excl_fail(self):
        """Exclusive creation of an already existing file must fail."""
        server = self.connect_to_debug_monitor()
        self.assertIsNotNone(server)

        # The named temporary file exists for the duration of the block.
        with tempfile.NamedTemporaryFile() as temp_file:
            # attempt to open the file with O_CREAT|O_EXCL
            # (flags = 0xa01: 0x800 | 0x200 | write-only, mode = 0)
            self.do_handshake()
            self.test_sequence.add_log_lines(
                ["read packet: $vFile:open:%s,a01,0#00" % (
                    binascii.b2a_hex(temp_file.name.encode()).decode(),)],
                True)
            self.expect_error()

    def expect_error(self):
        """Expect the next reply to be an error and run the queued sequence.

        The reply must have the form ``$F-1,<hex digits>``, i.e. a result of
        -1 followed by a hexadecimal error code.
        """
        self.test_sequence.add_log_lines(
            [{"direction": "send",
             "regex": r"^\$F-1,[0-9a-fA-F]+#[0-9a-fA-F]{2}$"}],
            True)
        self.expect_gdbremote_sequence()

    def vFile_test(self, read=False, write=False, append=False, trunc=False,
                   creat=False, excl=False):
        """Run an open/pread/pwrite/close round trip with the given flags.

        Args:
            read: open the file for reading; pread is expected to succeed.
            write: open the file for writing; pwrite is expected to succeed.
            append: add the append flag (0x8) to the open flags.
            trunc: add the truncate flag (0x400) to the open flags.
            creat: add the create flag (0x200); the target is then a path
                that does not exist yet instead of a pre-filled file.
            excl: add the exclusive flag (0x800) to the open flags.

        When neither ``read`` nor ``write`` is set the file is opened
        read-only (flags value 0). Operations not permitted by the chosen
        access mode are expected to yield an error reply. If ``write`` is
        set, the resulting file contents (and, with ``creat``, the file
        permissions) are verified on disk after the file is closed.
        """
        # Access mode: 0 = read-only, 1 = write-only, 2 = read-write.
        if read and write:
            mode = 2
        elif write:
            mode = 1
        else:  # read
            mode = 0
        # Optional modifier bits, OR-ed into the access mode.
        if append:
            mode |= 8
        if creat:
            mode |= 0x200
        if trunc:
            mode |= 0x400
        if excl:
            mode |= 0x800

        # Start the monitor under a known umask so that the permission check
        # on a newly created file below is deterministic; restore the
        # previous umask for the test process right afterwards.
        old_umask = os.umask(0o22)
        try:
            server = self.connect_to_debug_monitor()
        finally:
            os.umask(old_umask)
        self.assertIsNotNone(server)

        # create a temporary file with some data (or, for creat, only a
        # directory in which the monitor is asked to create the file)
        test_data = 'some test data longer than 16 bytes\n'
        if creat:
            temp_dir = tempfile.TemporaryDirectory()
        else:
            temp_file = tempfile.NamedTemporaryFile()

        try:
            if creat:
                temp_path = os.path.join(temp_dir.name, "test")
                self.assertFalse(os.path.exists(temp_path))
            else:
                temp_file.write(test_data.encode())
                temp_file.flush()
                temp_path = temp_file.name

            # open the file with the requested flags; the path is sent
            # hex-encoded and 0x1a0 (== 0o640) is the creation mode
            self.do_handshake()
            self.test_sequence.add_log_lines(
                ["read packet: $vFile:open:%s,%x,1a0#00" % (
                    binascii.b2a_hex(temp_path.encode()).decode(),
                    mode),
                 {"direction": "send",
                 "regex": r"^\$F([0-9a-fA-F]+)#[0-9a-fA-F]{2}$",
                 "capture": {1: "fd"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            # The reply carries the file descriptor as a hex number.
            fd = int(context["fd"], 16)

            # read data from the file: 0x11 bytes starting at offset 0x10
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $vFile:pread:%x,11,10#00" % (fd,)],
                True)
            if read:
                self.test_sequence.add_log_lines(
                    [{"direction": "send",
                     "regex": r"^\$F([0-9a-fA-F]+);(.*)#[0-9a-fA-F]{2}$",
                     "capture": {1: "size", 2: "data"}}],
                    True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)
                if trunc:
                    # The file was emptied on open, so nothing can be read.
                    self.assertEqual(context["size"], "0")
                    self.assertEqual(context["data"], "")
                else:
                    self.assertEqual(context["size"], "11")  # hex
                    self.assertEqual(context["data"],
                                     test_data[0x10:0x10 + 0x11])
            else:
                self.expect_error()

            # another offset: 6 bytes starting at offset 3
            if read and not trunc:
                self.reset_test_sequence()
                self.test_sequence.add_log_lines(
                    ["read packet: $vFile:pread:%x,6,3#00" % (fd,),
                     {"direction": "send",
                     "regex": r"^\$F([0-9a-fA-F]+);(.+)#[0-9a-fA-F]{2}$",
                     "capture": {1: "size", 2: "data"}}],
                    True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)
                self.assertEqual(context["size"], "6")  # hex
                self.assertEqual(context["data"], test_data[3:3 + 6])

            # write data to the file: the 8 bytes "somedata" at offset 6
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $vFile:pwrite:%x,6,somedata#00" % (fd,)],
                True)
            if write:
                self.test_sequence.add_log_lines(
                    ["send packet: $F8#00"],
                    True)
                self.expect_gdbremote_sequence()
            else:
                self.expect_error()

            # close the file
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                ["read packet: $vFile:close:%x#00" % (fd,),
                 "send packet: $F0#00"],
                True)
            self.expect_gdbremote_sequence()

            if write:
                # check if the data was actually written
                data = test_data.encode()
                if trunc or creat:
                    # The file started out empty: the 6 bytes before the
                    # write offset read back as NUL bytes.
                    data = b"\0" * 6 + b"somedata"
                elif append:
                    # In append mode the offset is ignored and the data ends
                    # up after the original contents.
                    data += b"somedata"
                else:
                    # Plain overwrite of 8 bytes at offset 6.
                    data = data[:6] + b"somedata" + data[6 + 8:]

                if creat:
                    # The file was created by the monitor; open it here and
                    # make sure the handle is closed again before cleanup.
                    with open(temp_path, "rb") as created_file:
                        self.assertEqual(
                            os.fstat(created_file.fileno()).st_mode & 0o7777,
                            0o640)
                        self.assertEqual(created_file.read(), data)
                else:
                    temp_file.seek(0)
                    self.assertEqual(temp_file.read(), data)
        finally:
            if creat:
                temp_dir.cleanup()
            else:
                temp_file.close()
242