xref: /vim-8.2.3635/src/testdir/test_channel.py (revision 890ee4e2)
1#!/usr/bin/env python
2#
3# Server that will accept connections from a Vim channel.
4# Used by test_channel.vim.
5#
6# This requires Python 2.6 or later.
7
8from __future__ import print_function
9import json
10import socket
11import sys
12import time
13import threading
14
15try:
16    # Python 3
17    import socketserver
18except ImportError:
19    # Python 2
20    import SocketServer as socketserver
21
22class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
23
24    def setup(self):
25        self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
26
27    def handle(self):
28        print("=== socket opened ===")
29        while True:
30            try:
31                received = self.request.recv(4096).decode('utf-8')
32            except socket.error:
33                print("=== socket error ===")
34                break
35            except IOError:
36                print("=== socket closed ===")
37                break
38            if received == '':
39                print("=== socket closed ===")
40                break
41            print("received: {0}".format(received))
42
43            # We may receive two messages at once. Take the part up to the
44            # newline, which should be after the matching "]".
45            todo = received
46            while todo != '':
47                splitidx = todo.find('\n')
48                if splitidx < 0:
49                     used = todo
50                     todo = ''
51                else:
52                     used = todo[:splitidx]
53                     todo = todo[splitidx + 1:]
54                if used != received:
55                    print("using: {0}".format(used))
56
57                try:
58                    decoded = json.loads(used)
59                except ValueError:
60                    print("json decoding failed")
61                    decoded = [-1, '']
62
63                # Send a response if the sequence number is positive.
64                if decoded[0] >= 0:
65                    if decoded[1] == 'hello!':
66                        # simply send back a string
67                        response = "got it"
68                    elif decoded[1] == 'malformed1':
69                        cmd = '["ex",":"]wrong!["ex","smi"]'
70                        print("sending: {0}".format(cmd))
71                        self.request.sendall(cmd.encode('utf-8'))
72                        response = "ok"
73                        # Need to wait for Vim to give up, otherwise it
74                        # sometimes fails on OS X.
75                        time.sleep(0.2)
76                    elif decoded[1] == 'malformed2':
77                        cmd = '"unterminated string'
78                        print("sending: {0}".format(cmd))
79                        self.request.sendall(cmd.encode('utf-8'))
80                        response = "ok"
81                        # Need to wait for Vim to give up, otherwise the double
82                        # quote in the "ok" response terminates the string.
83                        time.sleep(0.2)
84                    elif decoded[1] == 'malformed3':
85                        cmd = '["ex","missing ]"'
86                        print("sending: {0}".format(cmd))
87                        self.request.sendall(cmd.encode('utf-8'))
88                        response = "ok"
89                        # Need to wait for Vim to give up, otherwise the ]
90                        # in the "ok" response terminates the list.
91                        time.sleep(0.2)
92                    elif decoded[1] == 'split':
93                        cmd = '["ex","let '
94                        print("sending: {0}".format(cmd))
95                        self.request.sendall(cmd.encode('utf-8'))
96                        time.sleep(0.01)
97                        cmd = 'g:split = 123"]'
98                        print("sending: {0}".format(cmd))
99                        self.request.sendall(cmd.encode('utf-8'))
100                        response = "ok"
101                    elif decoded[1].startswith("echo "):
102                        # send back the argument
103                        response = decoded[1][5:]
104                    elif decoded[1] == 'make change':
105                        # Send two ex commands at the same time, before
106                        # replying to the request.
107                        cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
108                        cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
109                        print("sending: {0}".format(cmd))
110                        self.request.sendall(cmd.encode('utf-8'))
111                        response = "ok"
112                    elif decoded[1] == 'echoerr':
113                        cmd = '["ex","echoerr \\\"this is an error\\\""]'
114                        print("sending: {0}".format(cmd))
115                        self.request.sendall(cmd.encode('utf-8'))
116                        response = "ok"
117                        # Wait a bit, so that the "ex" command is handled
118                        # before the "ch_evalexpr() returns.  Otherwise we are
119                        # outside the try/catch when the "ex" command is
120                        # handled.
121                        time.sleep(0.02)
122                    elif decoded[1] == 'bad command':
123                        cmd = '["ex","foo bar"]'
124                        print("sending: {0}".format(cmd))
125                        self.request.sendall(cmd.encode('utf-8'))
126                        response = "ok"
127                    elif decoded[1] == 'do normal':
128                        # Send a normal command.
129                        cmd = '["normal","G$s more\u001b"]'
130                        print("sending: {0}".format(cmd))
131                        self.request.sendall(cmd.encode('utf-8'))
132                        response = "ok"
133                    elif decoded[1] == 'eval-works':
134                        # Send an eval request.  We ignore the response.
135                        cmd = '["expr","\\"foo\\" . 123", -1]'
136                        print("sending: {0}".format(cmd))
137                        self.request.sendall(cmd.encode('utf-8'))
138                        response = "ok"
139                    elif decoded[1] == 'eval-special':
140                        # Send an eval request.  We ignore the response.
141                        cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]'
142                        print("sending: {0}".format(cmd))
143                        self.request.sendall(cmd.encode('utf-8'))
144                        response = "ok"
145                    elif decoded[1] == 'eval-getline':
146                        # Send an eval request.  We ignore the response.
147                        cmd = '["expr","getline(3)", -3]'
148                        print("sending: {0}".format(cmd))
149                        self.request.sendall(cmd.encode('utf-8'))
150                        response = "ok"
151                    elif decoded[1] == 'eval-fails':
152                        # Send an eval request that will fail.
153                        cmd = '["expr","xxx", -4]'
154                        print("sending: {0}".format(cmd))
155                        self.request.sendall(cmd.encode('utf-8'))
156                        response = "ok"
157                    elif decoded[1] == 'eval-error':
158                        # Send an eval request that works but the result can't
159                        # be encoded.
160                        cmd = '["expr","function(\\"tr\\")", -5]'
161                        print("sending: {0}".format(cmd))
162                        self.request.sendall(cmd.encode('utf-8'))
163                        response = "ok"
164                    elif decoded[1] == 'eval-bad':
165                        # Send an eval request missing the third argument.
166                        cmd = '["expr","xxx"]'
167                        print("sending: {0}".format(cmd))
168                        self.request.sendall(cmd.encode('utf-8'))
169                        response = "ok"
170                    elif decoded[1] == 'an expr':
171                        # Send an expr request.
172                        cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]'
173                        print("sending: {0}".format(cmd))
174                        self.request.sendall(cmd.encode('utf-8'))
175                        response = "ok"
176                    elif decoded[1] == 'call-func':
177                        cmd = '["call","MyFunction",[1,2,3], 0]'
178                        print("sending: {0}".format(cmd))
179                        self.request.sendall(cmd.encode('utf-8'))
180                        response = "ok"
181                    elif decoded[1] == 'redraw':
182                        cmd = '["redraw",""]'
183                        print("sending: {0}".format(cmd))
184                        self.request.sendall(cmd.encode('utf-8'))
185                        response = "ok"
186                    elif decoded[1] == 'redraw!':
187                        cmd = '["redraw","force"]'
188                        print("sending: {0}".format(cmd))
189                        self.request.sendall(cmd.encode('utf-8'))
190                        response = "ok"
191                    elif decoded[1] == 'empty-request':
192                        cmd = '[]'
193                        print("sending: {0}".format(cmd))
194                        self.request.sendall(cmd.encode('utf-8'))
195                        response = "ok"
196                    elif decoded[1] == 'eval-result':
197                        # Send back the last received eval result.
198                        response = last_eval
199                    elif decoded[1] == 'call me':
200                        cmd = '[0,"we called you"]'
201                        print("sending: {0}".format(cmd))
202                        self.request.sendall(cmd.encode('utf-8'))
203                        response = "ok"
204                    elif decoded[1] == 'call me again':
205                        cmd = '[0,"we did call you"]'
206                        print("sending: {0}".format(cmd))
207                        self.request.sendall(cmd.encode('utf-8'))
208                        response = ""
209                    elif decoded[1] == 'send zero':
210                        cmd = '[0,"zero index"]'
211                        print("sending: {0}".format(cmd))
212                        self.request.sendall(cmd.encode('utf-8'))
213                        response = "sent zero"
214                    elif decoded[1] == 'close me':
215                        print("closing")
216                        self.request.close()
217                        response = ""
218                    elif decoded[1] == 'wait a bit':
219                        time.sleep(0.2)
220                        response = "waited"
221                    elif decoded[1] == '!quit!':
222                        # we're done
223                        self.server.shutdown()
224                        return
225                    elif decoded[1] == '!crash!':
226                        # Crash!
227                        42 / 0
228                    else:
229                        response = "what?"
230
231                    if response == "":
232                        print("no response")
233                    else:
234                        encoded = json.dumps([decoded[0], response])
235                        print("sending: {0}".format(encoded))
236                        self.request.sendall(encoded.encode('utf-8'))
237
238                # Negative numbers are used for "eval" responses.
239                elif decoded[0] < 0:
240                    last_eval = decoded
241
242class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
243    pass
244
245def writePortInFile(port):
246    # Write the port number in Xportnr, so that the test knows it.
247    f = open("Xportnr", "w")
248    f.write("{0}".format(port))
249    f.close()
250
251def main(host, port, server_class=ThreadedTCPServer):
252    # Wait half a second before opening the port to test waittime in ch_open().
253    # We do want to get the port number, get that first.  We cannot open the
254    # socket, guess a port is free.
255    if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
256        port = 13684
257        writePortInFile(port)
258
259        print("Wait for it...")
260        time.sleep(0.5)
261
262    server = server_class((host, port), ThreadedTCPRequestHandler)
263    ip, port = server.server_address[0:2]
264
265    # Start a thread with the server.  That thread will then start a new thread
266    # for each connection.
267    server_thread = threading.Thread(target=server.serve_forever)
268    server_thread.start()
269
270    writePortInFile(port)
271
272    print("Listening on port {0}".format(port))
273
274    # Main thread terminates, but the server continues running
275    # until server.shutdown() is called.
276    try:
277        while server_thread.is_alive():
278            server_thread.join(1)
279    except (KeyboardInterrupt, SystemExit):
280        server.shutdown()
281
282if __name__ == "__main__":
283    main("localhost", 0)
284