1 /* rio.c is a simple stream-oriented I/O abstraction that provides an interface
2 * to write code that can consume/produce data using different concrete input
3 * and output devices. For instance the same rdb.c code using the rio
4 * abstraction can be used to read and write the RDB format using in-memory
5 * buffers or files.
6 *
 * A rio object provides the following methods:
 *  read: read from stream.
 *  write: write to stream.
 *  tell: get the current offset.
 *  flush: flush any buffered data to the target device, if applicable.
11 *
12 * It is also possible to set a 'checksum' method that is used by rio.c in order
13 * to compute a checksum of the data written or read, or to query the rio object
14 * for the current checksum.
15 *
16 * ----------------------------------------------------------------------------
17 *
18 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
19 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
20 * All rights reserved.
21 *
22 * Redistribution and use in source and binary forms, with or without
23 * modification, are permitted provided that the following conditions are met:
24 *
25 * * Redistributions of source code must retain the above copyright notice,
26 * this list of conditions and the following disclaimer.
27 * * Redistributions in binary form must reproduce the above copyright
28 * notice, this list of conditions and the following disclaimer in the
29 * documentation and/or other materials provided with the distribution.
30 * * Neither the name of Redis nor the names of its contributors may be used
31 * to endorse or promote products derived from this software without
32 * specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
35 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
38 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
39 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
40 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
41 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
42 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
43 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
44 * POSSIBILITY OF SUCH DAMAGE.
45 */
46
47
48 #include "fmacros.h"
49 #include <string.h>
50 #include <stdio.h>
51 #include <unistd.h>
52 #include "rio.h"
53 #include "util.h"
54 #include "crc64.h"
55 #include "config.h"
56 #include "server.h"
57
58 /* ------------------------- Buffer I/O implementation ----------------------- */
59
60 /* Returns 1 or 0 for success/failure. */
rioBufferWrite(rio * r,const void * buf,size_t len)61 static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
62 r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
63 r->io.buffer.pos += len;
64 return 1;
65 }
66
67 /* Returns 1 or 0 for success/failure. */
rioBufferRead(rio * r,void * buf,size_t len)68 static size_t rioBufferRead(rio *r, void *buf, size_t len) {
69 if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
70 return 0; /* not enough buffer to return len bytes. */
71 memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
72 r->io.buffer.pos += len;
73 return 1;
74 }
75
76 /* Returns read/write position in buffer. */
rioBufferTell(rio * r)77 static off_t rioBufferTell(rio *r) {
78 return r->io.buffer.pos;
79 }
80
81 /* Flushes any buffer to target device if applicable. Returns 1 on success
82 * and 0 on failures. */
rioBufferFlush(rio * r)83 static int rioBufferFlush(rio *r) {
84 UNUSED(r);
85 return 1; /* Nothing to do, our write just appends to the buffer. */
86 }
87
88 static const rio rioBufferIO = {
89 rioBufferRead,
90 rioBufferWrite,
91 rioBufferTell,
92 rioBufferFlush,
93 NULL, /* update_checksum */
94 0, /* current checksum */
95 0, /* bytes read or written */
96 0, /* read/write chunk size */
97 { { NULL, 0 } } /* union for io-specific vars */
98 };
99
/* Initializes 'r' as a buffer-backed stream operating on the sds string
 * 's', with the position set to the beginning of the buffer. */
void rioInitWithBuffer(rio *r, sds s) {
    rio fresh = rioBufferIO;

    fresh.io.buffer.ptr = s;
    fresh.io.buffer.pos = 0;
    *r = fresh;
}
105
106 /* --------------------- Stdio file pointer implementation ------------------- */
107
108 /* Returns 1 or 0 for success/failure. */
rioFileWrite(rio * r,const void * buf,size_t len)109 static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
110 size_t retval;
111
112 retval = fwrite(buf,len,1,r->io.file.fp);
113 r->io.file.buffered += len;
114
115 if (r->io.file.autosync &&
116 r->io.file.buffered >= r->io.file.autosync)
117 {
118 fflush(r->io.file.fp);
119 aof_fsync(fileno(r->io.file.fp));
120 r->io.file.buffered = 0;
121 }
122 return retval;
123 }
124
125 /* Returns 1 or 0 for success/failure. */
rioFileRead(rio * r,void * buf,size_t len)126 static size_t rioFileRead(rio *r, void *buf, size_t len) {
127 return fread(buf,len,1,r->io.file.fp);
128 }
129
130 /* Returns read/write position in file. */
rioFileTell(rio * r)131 static off_t rioFileTell(rio *r) {
132 return ftello(r->io.file.fp);
133 }
134
135 /* Flushes any buffer to target device if applicable. Returns 1 on success
136 * and 0 on failures. */
rioFileFlush(rio * r)137 static int rioFileFlush(rio *r) {
138 return (fflush(r->io.file.fp) == 0) ? 1 : 0;
139 }
140
141 static const rio rioFileIO = {
142 rioFileRead,
143 rioFileWrite,
144 rioFileTell,
145 rioFileFlush,
146 NULL, /* update_checksum */
147 0, /* current checksum */
148 0, /* bytes read or written */
149 0, /* read/write chunk size */
150 { { NULL, 0 } } /* union for io-specific vars */
151 };
152
/* Initializes 'r' as a stream reading from / writing to the stdio stream
 * 'fp'. The buffered byte counter starts at zero and autosync is disabled
 * (see rioSetAutoSync() to enable it). */
void rioInitWithFile(rio *r, FILE *fp) {
    rio fresh = rioFileIO;

    fresh.io.file.fp = fp;
    fresh.io.file.buffered = 0;
    fresh.io.file.autosync = 0;
    *r = fresh;
}
159
160 /* ------------------- File descriptors set implementation ------------------- */
161
162 /* Returns 1 or 0 for success/failure.
163 * The function returns success as long as we are able to correctly write
164 * to at least one file descriptor.
165 *
166 * When buf is NULL and len is 0, the function performs a flush operation
167 * if there is some pending buffer, so this function is also used in order
168 * to implement rioFdsetFlush(). */
rioFdsetWrite(rio * r,const void * buf,size_t len)169 static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
170 ssize_t retval;
171 int j;
172 unsigned char *p = (unsigned char*) buf;
173 int doflush = (buf == NULL && len == 0);
174
175 /* To start we always append to our buffer. If it gets larger than
176 * a given size, we actually write to the sockets. */
177 if (len) {
178 r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
179 len = 0; /* Prevent entering the while below if we don't flush. */
180 if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1;
181 }
182
183 if (doflush) {
184 p = (unsigned char*) r->io.fdset.buf;
185 len = sdslen(r->io.fdset.buf);
186 }
187
188 /* Write in little chunchs so that when there are big writes we
189 * parallelize while the kernel is sending data in background to
190 * the TCP socket. */
191 while(len) {
192 size_t count = len < 1024 ? len : 1024;
193 int broken = 0;
194 for (j = 0; j < r->io.fdset.numfds; j++) {
195 if (r->io.fdset.state[j] != 0) {
196 /* Skip FDs alraedy in error. */
197 broken++;
198 continue;
199 }
200
201 /* Make sure to write 'count' bytes to the socket regardless
202 * of short writes. */
203 size_t nwritten = 0;
204 while(nwritten != count) {
205 retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
206 if (retval <= 0) {
207 /* With blocking sockets, which is the sole user of this
208 * rio target, EWOULDBLOCK is returned only because of
209 * the SO_SNDTIMEO socket option, so we translate the error
210 * into one more recognizable by the user. */
211 if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
212 break;
213 }
214 nwritten += retval;
215 }
216
217 if (nwritten != count) {
218 /* Mark this FD as broken. */
219 r->io.fdset.state[j] = errno;
220 if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
221 }
222 }
223 if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
224 p += count;
225 len -= count;
226 r->io.fdset.pos += count;
227 }
228
229 if (doflush) sdsclear(r->io.fdset.buf);
230 return 1;
231 }
232
233 /* Returns 1 or 0 for success/failure. */
rioFdsetRead(rio * r,void * buf,size_t len)234 static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
235 UNUSED(r);
236 UNUSED(buf);
237 UNUSED(len);
238 return 0; /* Error, this target does not support reading. */
239 }
240
241 /* Returns read/write position in file. */
rioFdsetTell(rio * r)242 static off_t rioFdsetTell(rio *r) {
243 return r->io.fdset.pos;
244 }
245
246 /* Flushes any buffer to target device if applicable. Returns 1 on success
247 * and 0 on failures. */
rioFdsetFlush(rio * r)248 static int rioFdsetFlush(rio *r) {
249 /* Our flush is implemented by the write method, that recognizes a
250 * buffer set to NULL with a count of zero as a flush request. */
251 return rioFdsetWrite(r,NULL,0);
252 }
253
254 static const rio rioFdsetIO = {
255 rioFdsetRead,
256 rioFdsetWrite,
257 rioFdsetTell,
258 rioFdsetFlush,
259 NULL, /* update_checksum */
260 0, /* current checksum */
261 0, /* bytes read or written */
262 0, /* read/write chunk size */
263 { { NULL, 0 } } /* union for io-specific vars */
264 };
265
/* Initializes 'r' as a stream writing to the 'numfds' file descriptors
 * listed in 'fds'.
 *
 * The descriptors are copied into a newly allocated array, so the caller
 * keeps ownership of 'fds'. A parallel array of per-descriptor states is
 * allocated and zeroed (0 meaning no error so far), the position is reset
 * and an empty pending buffer is created. Release everything with
 * rioFreeFdset(). */
void rioInitWithFdset(rio *r, int *fds, int numfds) {
    size_t arraysize = sizeof(int)*numfds;
    int i;

    *r = rioFdsetIO;
    r->io.fdset.fds = zmalloc(arraysize);
    r->io.fdset.state = zmalloc(arraysize);
    memcpy(r->io.fdset.fds, fds, arraysize);
    for (i = 0; i < numfds; i++) {
        r->io.fdset.state[i] = 0;
    }
    r->io.fdset.numfds = numfds;
    r->io.fdset.pos = 0;
    r->io.fdset.buf = sdsempty();
}
278
279 /* release the rio stream. */
rioFreeFdset(rio * r)280 void rioFreeFdset(rio *r) {
281 zfree(r->io.fdset.fds);
282 zfree(r->io.fdset.state);
283 sdsfree(r->io.fdset.buf);
284 }
285
286 /* ---------------------------- Generic functions ---------------------------- */
287
288 /* This function can be installed both in memory and file streams when checksum
289 * computation is needed. */
rioGenericUpdateChecksum(rio * r,const void * buf,size_t len)290 void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
291 r->cksum = crc64(r->cksum,buf,len);
292 }
293
294 /* Set the file-based rio object to auto-fsync every 'bytes' file written.
295 * By default this is set to zero that means no automatic file sync is
296 * performed.
297 *
298 * This feature is useful in a few contexts since when we rely on OS write
299 * buffers sometimes the OS buffers way too much, resulting in too many
300 * disk I/O concentrated in very little time. When we fsync in an explicit
301 * way instead the I/O pressure is more distributed across time. */
rioSetAutoSync(rio * r,off_t bytes)302 void rioSetAutoSync(rio *r, off_t bytes) {
303 serverAssert(r->read == rioFileIO.read);
304 r->io.file.autosync = bytes;
305 }
306
/* --------------------------- Higher level interface --------------------------
 *
 * The following higher level functions use lower level rio.c functions to help
 * generate the Redis protocol for the Append Only File. */
311
312 /* Write multi bulk count in the format: "*<count>\r\n". */
rioWriteBulkCount(rio * r,char prefix,int count)313 size_t rioWriteBulkCount(rio *r, char prefix, int count) {
314 char cbuf[128];
315 int clen;
316
317 cbuf[0] = prefix;
318 clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
319 cbuf[clen++] = '\r';
320 cbuf[clen++] = '\n';
321 if (rioWrite(r,cbuf,clen) == 0) return 0;
322 return clen;
323 }
324
325 /* Write binary-safe string in the format: "$<count>\r\n<payload>\r\n". */
rioWriteBulkString(rio * r,const char * buf,size_t len)326 size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
327 size_t nwritten;
328
329 if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;
330 if (len > 0 && rioWrite(r,buf,len) == 0) return 0;
331 if (rioWrite(r,"\r\n",2) == 0) return 0;
332 return nwritten+len+2;
333 }
334
335 /* Write a long long value in format: "$<count>\r\n<payload>\r\n". */
rioWriteBulkLongLong(rio * r,long long l)336 size_t rioWriteBulkLongLong(rio *r, long long l) {
337 char lbuf[32];
338 unsigned int llen;
339
340 llen = ll2string(lbuf,sizeof(lbuf),l);
341 return rioWriteBulkString(r,lbuf,llen);
342 }
343
344 /* Write a double value in the format: "$<count>\r\n<payload>\r\n" */
rioWriteBulkDouble(rio * r,double d)345 size_t rioWriteBulkDouble(rio *r, double d) {
346 char dbuf[128];
347 unsigned int dlen;
348
349 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
350 return rioWriteBulkString(r,dbuf,dlen);
351 }
352