xref: /f-stack/app/redis-5.0.5/src/rio.c (revision 572c4311)
1 /* rio.c is a simple stream-oriented I/O abstraction that provides an interface
2  * to write code that can consume/produce data using different concrete input
3  * and output devices. For instance the same rdb.c code using the rio
4  * abstraction can be used to read and write the RDB format using in-memory
5  * buffers or files.
6  *
7  * A rio object provides the following methods:
8  *  read: read from stream.
9  *  write: write to stream.
10  *  tell: get the current offset.
11  *
12  * It is also possible to set a 'checksum' method that is used by rio.c in order
13  * to compute a checksum of the data written or read, or to query the rio object
14  * for the current checksum.
15  *
16  * ----------------------------------------------------------------------------
17  *
18  * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
19  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
20  * All rights reserved.
21  *
22  * Redistribution and use in source and binary forms, with or without
23  * modification, are permitted provided that the following conditions are met:
24  *
25  *   * Redistributions of source code must retain the above copyright notice,
26  *     this list of conditions and the following disclaimer.
27  *   * Redistributions in binary form must reproduce the above copyright
28  *     notice, this list of conditions and the following disclaimer in the
29  *     documentation and/or other materials provided with the distribution.
30  *   * Neither the name of Redis nor the names of its contributors may be used
31  *     to endorse or promote products derived from this software without
32  *     specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
35  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
38  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
39  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
40  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
41  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
42  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
43  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
44  * POSSIBILITY OF SUCH DAMAGE.
45  */
46 
47 
48 #include "fmacros.h"
49 #include <string.h>
50 #include <stdio.h>
51 #include <unistd.h>
52 #include "rio.h"
53 #include "util.h"
54 #include "crc64.h"
55 #include "config.h"
56 #include "server.h"
57 
58 /* ------------------------- Buffer I/O implementation ----------------------- */
59 
60 /* Returns 1 or 0 for success/failure. */
rioBufferWrite(rio * r,const void * buf,size_t len)61 static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
62     r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
63     r->io.buffer.pos += len;
64     return 1;
65 }
66 
67 /* Returns 1 or 0 for success/failure. */
rioBufferRead(rio * r,void * buf,size_t len)68 static size_t rioBufferRead(rio *r, void *buf, size_t len) {
69     if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
70         return 0; /* not enough buffer to return len bytes. */
71     memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
72     r->io.buffer.pos += len;
73     return 1;
74 }
75 
76 /* Returns read/write position in buffer. */
rioBufferTell(rio * r)77 static off_t rioBufferTell(rio *r) {
78     return r->io.buffer.pos;
79 }
80 
81 /* Flushes any buffer to target device if applicable. Returns 1 on success
82  * and 0 on failures. */
rioBufferFlush(rio * r)83 static int rioBufferFlush(rio *r) {
84     UNUSED(r);
85     return 1; /* Nothing to do, our write just appends to the buffer. */
86 }
87 
88 static const rio rioBufferIO = {
89     rioBufferRead,
90     rioBufferWrite,
91     rioBufferTell,
92     rioBufferFlush,
93     NULL,           /* update_checksum */
94     0,              /* current checksum */
95     0,              /* bytes read or written */
96     0,              /* read/write chunk size */
97     { { NULL, 0 } } /* union for io-specific vars */
98 };
99 
/* Initialize 'r' as a buffer-backed rio operating on the sds string 's',
 * with the read/write position reset to the beginning. */
void rioInitWithBuffer(rio *r, sds s) {
    *r = rioBufferIO; /* Start from the buffer template (methods + zeroed state). */
    r->io.buffer.pos = 0;
    r->io.buffer.ptr = s;
}
105 
106 /* --------------------- Stdio file pointer implementation ------------------- */
107 
108 /* Returns 1 or 0 for success/failure. */
rioFileWrite(rio * r,const void * buf,size_t len)109 static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
110     size_t retval;
111 
112     retval = fwrite(buf,len,1,r->io.file.fp);
113     r->io.file.buffered += len;
114 
115     if (r->io.file.autosync &&
116         r->io.file.buffered >= r->io.file.autosync)
117     {
118         fflush(r->io.file.fp);
119         redis_fsync(fileno(r->io.file.fp));
120         r->io.file.buffered = 0;
121     }
122     return retval;
123 }
124 
125 /* Returns 1 or 0 for success/failure. */
rioFileRead(rio * r,void * buf,size_t len)126 static size_t rioFileRead(rio *r, void *buf, size_t len) {
127     return fread(buf,len,1,r->io.file.fp);
128 }
129 
130 /* Returns read/write position in file. */
rioFileTell(rio * r)131 static off_t rioFileTell(rio *r) {
132     return ftello(r->io.file.fp);
133 }
134 
135 /* Flushes any buffer to target device if applicable. Returns 1 on success
136  * and 0 on failures. */
rioFileFlush(rio * r)137 static int rioFileFlush(rio *r) {
138     return (fflush(r->io.file.fp) == 0) ? 1 : 0;
139 }
140 
141 static const rio rioFileIO = {
142     rioFileRead,
143     rioFileWrite,
144     rioFileTell,
145     rioFileFlush,
146     NULL,           /* update_checksum */
147     0,              /* current checksum */
148     0,              /* bytes read or written */
149     0,              /* read/write chunk size */
150     { { NULL, 0 } } /* union for io-specific vars */
151 };
152 
/* Initialize 'r' as a file-backed rio operating on the stdio stream 'fp'.
 * Automatic syncing starts disabled (autosync == 0) and the counter of
 * bytes written since the last sync starts at zero. */
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO; /* Start from the file template (methods + zeroed state). */
    r->io.file.autosync = 0;
    r->io.file.buffered = 0;
    r->io.file.fp = fp;
}
159 
160 /* ------------------- File descriptors set implementation ------------------- */
161 
162 /* Returns 1 or 0 for success/failure.
163  * The function returns success as long as we are able to correctly write
164  * to at least one file descriptor.
165  *
166  * When buf is NULL and len is 0, the function performs a flush operation
167  * if there is some pending buffer, so this function is also used in order
168  * to implement rioFdsetFlush(). */
rioFdsetWrite(rio * r,const void * buf,size_t len)169 static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
170     ssize_t retval;
171     int j;
172     unsigned char *p = (unsigned char*) buf;
173     int doflush = (buf == NULL && len == 0);
174 
175     /* To start we always append to our buffer. If it gets larger than
176      * a given size, we actually write to the sockets. */
177     if (len) {
178         r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
179         len = 0; /* Prevent entering the while below if we don't flush. */
180         if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1;
181     }
182 
183     if (doflush) {
184         p = (unsigned char*) r->io.fdset.buf;
185         len = sdslen(r->io.fdset.buf);
186     }
187 
188     /* Write in little chunchs so that when there are big writes we
189      * parallelize while the kernel is sending data in background to
190      * the TCP socket. */
191     while(len) {
192         size_t count = len < 1024 ? len : 1024;
193         int broken = 0;
194         for (j = 0; j < r->io.fdset.numfds; j++) {
195             if (r->io.fdset.state[j] != 0) {
196                 /* Skip FDs alraedy in error. */
197                 broken++;
198                 continue;
199             }
200 
201             /* Make sure to write 'count' bytes to the socket regardless
202              * of short writes. */
203             size_t nwritten = 0;
204             while(nwritten != count) {
205                 retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
206                 if (retval <= 0) {
207                     /* With blocking sockets, which is the sole user of this
208                      * rio target, EWOULDBLOCK is returned only because of
209                      * the SO_SNDTIMEO socket option, so we translate the error
210                      * into one more recognizable by the user. */
211                     if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
212                     break;
213                 }
214                 nwritten += retval;
215             }
216 
217             if (nwritten != count) {
218                 /* Mark this FD as broken. */
219                 r->io.fdset.state[j] = errno;
220                 if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
221             }
222         }
223         if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
224         p += count;
225         len -= count;
226         r->io.fdset.pos += count;
227     }
228 
229     if (doflush) sdsclear(r->io.fdset.buf);
230     return 1;
231 }
232 
233 /* Returns 1 or 0 for success/failure. */
rioFdsetRead(rio * r,void * buf,size_t len)234 static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
235     UNUSED(r);
236     UNUSED(buf);
237     UNUSED(len);
238     return 0; /* Error, this target does not support reading. */
239 }
240 
241 /* Returns read/write position in file. */
rioFdsetTell(rio * r)242 static off_t rioFdsetTell(rio *r) {
243     return r->io.fdset.pos;
244 }
245 
246 /* Flushes any buffer to target device if applicable. Returns 1 on success
247  * and 0 on failures. */
rioFdsetFlush(rio * r)248 static int rioFdsetFlush(rio *r) {
249     /* Our flush is implemented by the write method, that recognizes a
250      * buffer set to NULL with a count of zero as a flush request. */
251     return rioFdsetWrite(r,NULL,0);
252 }
253 
254 static const rio rioFdsetIO = {
255     rioFdsetRead,
256     rioFdsetWrite,
257     rioFdsetTell,
258     rioFdsetFlush,
259     NULL,           /* update_checksum */
260     0,              /* current checksum */
261     0,              /* bytes read or written */
262     0,              /* read/write chunk size */
263     { { NULL, 0 } } /* union for io-specific vars */
264 };
265 
/* Initialize 'r' as a rio writing to the 'numfds' file descriptors listed in
 * 'fds'. The descriptor list is copied into memory owned by the rio, every
 * descriptor starts with a zero (no error) state, the position starts at
 * zero and the pending buffer starts empty. The memory allocated here is
 * released by rioFreeFdset(). */
void rioInitWithFdset(rio *r, int *fds, int numfds) {
    int idx;

    *r = rioFdsetIO; /* Start from the fdset template (methods + zeroed state). */
    r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
    r->io.fdset.state = zmalloc(sizeof(int)*numfds);

    /* Take a private copy of each descriptor and mark it as healthy. */
    for (idx = 0; idx < numfds; idx++) {
        r->io.fdset.fds[idx] = fds[idx];
        r->io.fdset.state[idx] = 0;
    }

    r->io.fdset.numfds = numfds;
    r->io.fdset.pos = 0;
    r->io.fdset.buf = sdsempty();
}
278 
279 /* release the rio stream. */
rioFreeFdset(rio * r)280 void rioFreeFdset(rio *r) {
281     zfree(r->io.fdset.fds);
282     zfree(r->io.fdset.state);
283     sdsfree(r->io.fdset.buf);
284 }
285 
286 /* ---------------------------- Generic functions ---------------------------- */
287 
288 /* This function can be installed both in memory and file streams when checksum
289  * computation is needed. */
rioGenericUpdateChecksum(rio * r,const void * buf,size_t len)290 void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
291     r->cksum = crc64(r->cksum,buf,len);
292 }
293 
294 /* Set the file-based rio object to auto-fsync every 'bytes' file written.
295  * By default this is set to zero that means no automatic file sync is
296  * performed.
297  *
298  * This feature is useful in a few contexts since when we rely on OS write
299  * buffers sometimes the OS buffers way too much, resulting in too many
300  * disk I/O concentrated in very little time. When we fsync in an explicit
301  * way instead the I/O pressure is more distributed across time. */
rioSetAutoSync(rio * r,off_t bytes)302 void rioSetAutoSync(rio *r, off_t bytes) {
303     serverAssert(r->read == rioFileIO.read);
304     r->io.file.autosync = bytes;
305 }
306 
307 /* --------------------------- Higher level interface --------------------------
308  *
309  * The following higher level functions use lower level rio.c functions to help
310  * generating the Redis protocol for the Append Only File. */
311 
312 /* Write multi bulk count in the format: "*<count>\r\n". */
rioWriteBulkCount(rio * r,char prefix,long count)313 size_t rioWriteBulkCount(rio *r, char prefix, long count) {
314     char cbuf[128];
315     int clen;
316 
317     cbuf[0] = prefix;
318     clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
319     cbuf[clen++] = '\r';
320     cbuf[clen++] = '\n';
321     if (rioWrite(r,cbuf,clen) == 0) return 0;
322     return clen;
323 }
324 
325 /* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */
rioWriteBulkString(rio * r,const char * buf,size_t len)326 size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
327     size_t nwritten;
328 
329     if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
330     if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
331     if (rioWrite(r,"\r\n",2) == 0) return 0;
332     return nwritten+len+2;
333 }
334 
335 /* Write a long long value in format: "$<count>\r\n<payload>\r\n". */
rioWriteBulkLongLong(rio * r,long long l)336 size_t rioWriteBulkLongLong(rio *r, long long l) {
337     char lbuf[32];
338     unsigned int llen;
339 
340     llen = ll2string(lbuf,sizeof(lbuf),l);
341     return rioWriteBulkString(r,lbuf,llen);
342 }
343 
344 /* Write a double value in the format: "$<count>\r\n<payload>\r\n" */
rioWriteBulkDouble(rio * r,double d)345 size_t rioWriteBulkDouble(rio *r, double d) {
346     char dbuf[128];
347     unsigned int dlen;
348 
349     dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
350     return rioWriteBulkString(r,dbuf,dlen);
351 }
352