xref: /f-stack/app/redis-5.0.5/src/syncio.c (revision 572c4311)
1 /* Synchronous socket and file I/O operations useful across the core.
2  *
3  * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "server.h"
32 
33 /* ----------------- Blocking sockets I/O with timeouts --------------------- */
34 
/* Redis performs most of the I/O in a nonblocking way, with the exception
 * of the SYNC command where the slave does it in a blocking way, and
 * the MIGRATE command that must be blocking in order to be atomic from the
 * point of view of the two instances (one migrating the key and one receiving
 * the key). This is why we need the following blocking I/O functions.
 *
 * All the functions take the timeout in milliseconds. */
42 
43 #define SYNCIO__RESOLUTION 10 /* Resolution in milliseconds */
44 
45 /* Write the specified payload to 'fd'. If writing the whole payload will be
46  * done within 'timeout' milliseconds the operation succeeds and 'size' is
47  * returned. Otherwise the operation fails, -1 is returned, and an unspecified
48  * partial write could be performed against the file descriptor. */
syncWrite(int fd,char * ptr,ssize_t size,long long timeout)49 ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
50     ssize_t nwritten, ret = size;
51     long long start = mstime();
52     long long remaining = timeout;
53 
54     while(1) {
55         long long wait = (remaining > SYNCIO__RESOLUTION) ?
56                           remaining : SYNCIO__RESOLUTION;
57         long long elapsed;
58 
59         /* Optimistically try to write before checking if the file descriptor
60          * is actually writable. At worst we get EAGAIN. */
61         nwritten = write(fd,ptr,size);
62         if (nwritten == -1) {
63             if (errno != EAGAIN) return -1;
64         } else {
65             ptr += nwritten;
66             size -= nwritten;
67         }
68         if (size == 0) return ret;
69 
70         /* Wait */
71         aeWait(fd,AE_WRITABLE,wait);
72         elapsed = mstime() - start;
73         if (elapsed >= timeout) {
74             errno = ETIMEDOUT;
75             return -1;
76         }
77         remaining = timeout - elapsed;
78     }
79 }
80 
81 /* Read the specified amount of bytes from 'fd'. If all the bytes are read
82  * within 'timeout' milliseconds the operation succeed and 'size' is returned.
83  * Otherwise the operation fails, -1 is returned, and an unspecified amount of
84  * data could be read from the file descriptor. */
syncRead(int fd,char * ptr,ssize_t size,long long timeout)85 ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
86     ssize_t nread, totread = 0;
87     long long start = mstime();
88     long long remaining = timeout;
89 
90     if (size == 0) return 0;
91     while(1) {
92         long long wait = (remaining > SYNCIO__RESOLUTION) ?
93                           remaining : SYNCIO__RESOLUTION;
94         long long elapsed;
95 
96         /* Optimistically try to read before checking if the file descriptor
97          * is actually readable. At worst we get EAGAIN. */
98         nread = read(fd,ptr,size);
99         if (nread == 0) return -1; /* short read. */
100         if (nread == -1) {
101             if (errno != EAGAIN) return -1;
102         } else {
103             ptr += nread;
104             size -= nread;
105             totread += nread;
106         }
107         if (size == 0) return totread;
108 
109         /* Wait */
110         aeWait(fd,AE_READABLE,wait);
111         elapsed = mstime() - start;
112         if (elapsed >= timeout) {
113             errno = ETIMEDOUT;
114             return -1;
115         }
116         remaining = timeout - elapsed;
117     }
118 }
119 
/* Read a line from 'fd' into the buffer 'ptr' of capacity 'size' bytes.
 *
 * The line is fetched one byte at a time through syncRead(), and every
 * single byte is given the full 'timeout' (milliseconds), so the timeout
 * bounds the wait for each character, not for the whole line.
 *
 * Reading stops at the first '\n', which is not stored. If the byte stored
 * right before it is '\r', that byte is overwritten with a terminator.
 * Reading also stops once 'size'-1 characters have been stored without
 * meeting a newline; the rest of the line is then left unread.
 *
 * Returns the number of characters stored before the newline. A stripped
 * '\r' is still included in this count. Returns -1 if syncRead() fails, or
 * if 'size' is not positive (errno is set to EINVAL), since such a buffer
 * cannot even hold the terminator.
 *
 * Whenever a value other than -1 is returned the buffer is 0-terminated. */
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
    ssize_t nread = 0;

    /* Without this guard a non-positive size would turn negative below
     * and the loop would write past the caller's buffer. */
    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }

    /* Terminate up front so that the result is a valid string even when
     * the loop body never runs (size == 1). */
    *ptr = '\0';

    size--; /* Reserve one byte for the terminator. */
    while (size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            /* Drop the '\r' of a "\r\n" line ending. */
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0'; /* Keep the buffer terminated after every byte. */
        nread++;
        size--;
    }
    return nread;
}
146