1 /* Synchronous socket and file I/O operations useful across the core. 2 * 3 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * * Neither the name of Redis nor the names of its contributors may be used 15 * to endorse or promote products derived from this software without 16 * specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #include "server.h" 32 33 /* ----------------- Blocking sockets I/O with timeouts --------------------- */ 34 35 /* Redis performs most of the I/O in a nonblocking way, with the exception 36 * of the SYNC command where the slave does it in a blocking way, and 37 * the MIGRATE command that must be blocking in order to be atomic from the 38 * point of view of the two instances (one migrating the key and one receiving 39 * the key). This is why need the following blocking I/O functions. 40 * 41 * All the functions take the timeout in milliseconds. */ 42 43 #define SYNCIO__RESOLUTION 10 /* Resolution in milliseconds */ 44 45 /* Write the specified payload to 'fd'. If writing the whole payload will be 46 * done within 'timeout' milliseconds the operation succeeds and 'size' is 47 * returned. Otherwise the operation fails, -1 is returned, and an unspecified 48 * partial write could be performed against the file descriptor. */ 49 ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) { 50 ssize_t nwritten, ret = size; 51 long long start = mstime(); 52 long long remaining = timeout; 53 54 while(1) { 55 long long wait = (remaining > SYNCIO__RESOLUTION) ? 56 remaining : SYNCIO__RESOLUTION; 57 long long elapsed; 58 59 /* Optimistically try to write before checking if the file descriptor 60 * is actually writable. At worst we get EAGAIN. */ 61 nwritten = write(fd,ptr,size); 62 if (nwritten == -1) { 63 if (errno != EAGAIN) return -1; 64 } else { 65 ptr += nwritten; 66 size -= nwritten; 67 } 68 if (size == 0) return ret; 69 70 /* Wait */ 71 aeWait(fd,AE_WRITABLE,wait); 72 elapsed = mstime() - start; 73 if (elapsed >= timeout) { 74 errno = ETIMEDOUT; 75 return -1; 76 } 77 remaining = timeout - elapsed; 78 } 79 } 80 81 /* Read the specified amount of bytes from 'fd'. If all the bytes are read 82 * within 'timeout' milliseconds the operation succeed and 'size' is returned. 83 * Otherwise the operation fails, -1 is returned, and an unspecified amount of 84 * data could be read from the file descriptor. */ 85 ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) { 86 ssize_t nread, totread = 0; 87 long long start = mstime(); 88 long long remaining = timeout; 89 90 if (size == 0) return 0; 91 while(1) { 92 long long wait = (remaining > SYNCIO__RESOLUTION) ? 93 remaining : SYNCIO__RESOLUTION; 94 long long elapsed; 95 96 /* Optimistically try to read before checking if the file descriptor 97 * is actually readable. At worst we get EAGAIN. */ 98 nread = read(fd,ptr,size); 99 if (nread == 0) return -1; /* short read. */ 100 if (nread == -1) { 101 if (errno != EAGAIN) return -1; 102 } else { 103 ptr += nread; 104 size -= nread; 105 totread += nread; 106 } 107 if (size == 0) return totread; 108 109 /* Wait */ 110 aeWait(fd,AE_READABLE,wait); 111 elapsed = mstime() - start; 112 if (elapsed >= timeout) { 113 errno = ETIMEDOUT; 114 return -1; 115 } 116 remaining = timeout - elapsed; 117 } 118 } 119 120 /* Read a line making sure that every char will not require more than 'timeout' 121 * milliseconds to be read. 122 * 123 * On success the number of bytes read is returned, otherwise -1. 124 * On success the string is always correctly terminated with a 0 byte. */ 125 ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) { 126 ssize_t nread = 0; 127 128 size--; 129 while(size) { 130 char c; 131 132 if (syncRead(fd,&c,1,timeout) == -1) return -1; 133 if (c == '\n') { 134 *ptr = '\0'; 135 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0'; 136 return nread; 137 } else { 138 *ptr++ = c; 139 *ptr = '\0'; 140 nread++; 141 } 142 size--; 143 } 144 return nread; 145 } 146