1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 30 #include "server.h" 31 #include "cluster.h" 32 #include "slowlog.h" 33 #include "bio.h" 34 #include "latency.h" 35 36 #include <time.h> 37 #include <signal.h> 38 #include <sys/wait.h> 39 #include <errno.h> 40 #include <assert.h> 41 #include <ctype.h> 42 #include <stdarg.h> 43 #include <arpa/inet.h> 44 #include <sys/stat.h> 45 #include <fcntl.h> 46 #include <sys/time.h> 47 #include <sys/resource.h> 48 #include <sys/uio.h> 49 #include <sys/un.h> 50 #include <limits.h> 51 #include <float.h> 52 #include <math.h> 53 #include <sys/resource.h> 54 #include <sys/utsname.h> 55 #include <locale.h> 56 #include <sys/socket.h> 57 58 /* Our shared "common" objects */ 59 60 struct sharedObjectsStruct shared; 61 62 /* Global vars that are actually used as constants. The following double 63 * values are used for double on-disk serialization, and are initialized 64 * at runtime to avoid strange compiler optimizations. */ 65 66 double R_Zero, R_PosInf, R_NegInf, R_Nan; 67 68 /*================================= Globals ================================= */ 69 70 /* Global vars */ 71 struct redisServer server; /* server global state */ 72 73 /* Our command table. 74 * 75 * Every entry is composed of the following fields: 76 * 77 * name: a string representing the command name. 78 * function: pointer to the C function implementing the command. 79 * arity: number of arguments, it is possible to use -N to say >= N 80 * sflags: command flags as string. See below for a table of flags. 81 * flags: flags as bitmask. Computed by Redis using the 'sflags' field. 82 * get_keys_proc: an optional function to get key arguments from a command. 83 * This is only used when the following three fields are not 84 * enough to specify what arguments are keys. 85 * first_key_index: first argument that is a key 86 * last_key_index: last argument that is a key 87 * key_step: step to get all the keys from first to last argument. 
For instance
 *               in MSET the step is two since arguments are key,val,key,val,...
 * microseconds: microseconds of total execution time for this command.
 * calls: total number of calls of this command.
 *
 * The flags, microseconds and calls fields are computed by Redis and should
 * always be set to zero.
 *
 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
 *
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command  (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
 *    but just a few.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
124 */ 125 struct redisCommand redisCommandTable[] = { 126 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, 127 {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, 128 {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 129 {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}, 130 {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0}, 131 {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, 132 {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, 133 {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, 134 {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, 135 {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0}, 136 {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0}, 137 {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0}, 138 {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0}, 139 {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 140 {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 141 {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0}, 142 {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0}, 143 {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0}, 144 {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 145 {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 146 {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 147 {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 148 {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0}, 149 {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0}, 150 {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0}, 151 {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0}, 152 {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0}, 153 {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, 154 {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0}, 155 {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0}, 156 {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0}, 157 {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 158 {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0}, 159 {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0}, 160 {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0}, 161 {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 162 
{"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0}, 163 {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0}, 164 {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0}, 165 {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0}, 166 {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0}, 167 {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0}, 168 {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 169 {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 170 {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 171 {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 172 {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 173 {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 174 {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0}, 175 {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 176 {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0}, 177 {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0}, 178 {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0}, 179 {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0}, 180 {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0}, 181 {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0}, 182 {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0}, 183 {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0}, 184 {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, 185 {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0}, 186 {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0}, 187 {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0}, 188 {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0}, 189 {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0}, 190 {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0}, 191 {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, 192 {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0}, 193 {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0}, 194 {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0}, 195 
{"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0}, 196 {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 197 {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0}, 198 {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0}, 199 {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0}, 200 {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0}, 201 {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0}, 202 {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0}, 203 {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0}, 204 {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0}, 205 {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0}, 206 {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0}, 207 {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0}, 208 {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0}, 209 {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0}, 210 {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0}, 211 {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 212 {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0}, 213 {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0}, 214 {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0}, 215 {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0}, 216 {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0}, 217 {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0}, 218 {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0}, 219 {"select",selectCommand,2,"rlF",0,NULL,0,0,0,0,0}, 220 {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0}, 221 {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0}, 222 {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0}, 223 {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0}, 224 {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0}, 225 {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0}, 226 {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0}, 227 {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0}, 228 {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0}, 229 {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0}, 230 {"auth",authCommand,2,"rsltF",0,NULL,0,0,0,0,0}, 231 
{"ping",pingCommand,-1,"rtF",0,NULL,0,0,0,0,0}, 232 {"echo",echoCommand,2,"rF",0,NULL,0,0,0,0,0}, 233 {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0}, 234 {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0}, 235 {"bgrewriteaof",bgrewriteaofCommand,1,"ar",0,NULL,0,0,0,0,0}, 236 {"shutdown",shutdownCommand,-1,"arlt",0,NULL,0,0,0,0,0}, 237 {"lastsave",lastsaveCommand,1,"rRF",0,NULL,0,0,0,0,0}, 238 {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0}, 239 {"multi",multiCommand,1,"rsF",0,NULL,0,0,0,0,0}, 240 {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, 241 {"discard",discardCommand,1,"rsF",0,NULL,0,0,0,0,0}, 242 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, 243 {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0}, 244 {"replconf",replconfCommand,-1,"arslt",0,NULL,0,0,0,0,0}, 245 {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0}, 246 {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0}, 247 {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0}, 248 {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0}, 249 {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0}, 250 {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0}, 251 {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0}, 252 {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0}, 253 {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0}, 254 {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0}, 255 {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0}, 256 {"config",configCommand,-2,"art",0,NULL,0,0,0,0,0}, 257 {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, 258 {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, 259 {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, 260 {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, 261 {"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0}, 262 {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0}, 263 {"watch",watchCommand,-2,"rsF",0,NULL,1,-1,1,0,0}, 264 {"unwatch",unwatchCommand,1,"rsF",0,NULL,0,0,0,0,0}, 265 {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, 266 
{"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0}, 267 {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0}, 268 {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0}, 269 {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, 270 {"readonly",readonlyCommand,1,"rF",0,NULL,0,0,0,0,0}, 271 {"readwrite",readwriteCommand,1,"rF",0,NULL,0,0,0,0,0}, 272 {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0}, 273 {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0}, 274 {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0}, 275 {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, 276 {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, 277 {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0}, 278 {"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0}, 279 {"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0}, 280 {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, 281 {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}, 282 {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0}, 283 {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0}, 284 {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0}, 285 {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0}, 286 {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0}, 287 {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0}, 288 {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0}, 289 {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0}, 290 {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0}, 291 {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0}, 292 {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0}, 293 {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0}, 294 {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0}, 295 {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}, 296 {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0} 297 }; 298 299 struct evictionPoolEntry *evictionPoolAlloc(void); 300 301 /*============================ Utility functions ============================ */ 302 303 /* Low level logging. 
To use only for very big messages, otherwise 304 * serverLog() is to prefer. */ 305 void serverLogRaw(int level, const char *msg) { 306 const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING }; 307 const char *c = ".-*#"; 308 FILE *fp; 309 char buf[64]; 310 int rawmode = (level & LL_RAW); 311 int log_to_stdout = server.logfile[0] == '\0'; 312 313 level &= 0xff; /* clear flags */ 314 if (level < server.verbosity) return; 315 316 fp = log_to_stdout ? stdout : fopen(server.logfile,"a"); 317 if (!fp) return; 318 319 if (rawmode) { 320 fprintf(fp,"%s",msg); 321 } else { 322 int off; 323 struct timeval tv; 324 int role_char; 325 pid_t pid = getpid(); 326 327 gettimeofday(&tv,NULL); 328 off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec)); 329 snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000); 330 if (server.sentinel_mode) { 331 role_char = 'X'; /* Sentinel. */ 332 } else if (pid != server.pid) { 333 role_char = 'C'; /* RDB / AOF writing child. */ 334 } else { 335 role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */ 336 } 337 fprintf(fp,"%d:%c %s %c %s\n", 338 (int)getpid(),role_char, buf,c[level],msg); 339 } 340 fflush(fp); 341 342 if (!log_to_stdout) fclose(fp); 343 if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg); 344 } 345 346 /* Like serverLogRaw() but with printf-alike support. This is the function that 347 * is used across the code. The raw version is only used in order to dump 348 * the INFO output on crash. */ 349 void serverLog(int level, const char *fmt, ...) { 350 va_list ap; 351 char msg[LOG_MAX_LEN]; 352 353 if ((level&0xff) < server.verbosity) return; 354 355 va_start(ap, fmt); 356 vsnprintf(msg, sizeof(msg), fmt, ap); 357 va_end(ap); 358 359 serverLogRaw(level,msg); 360 } 361 362 /* Log a fixed message without printf-alike capabilities, in a way that is 363 * safe to call from a signal handler. 
364 * 365 * We actually use this only for signals that are not fatal from the point 366 * of view of Redis. Signals that are going to kill the server anyway and 367 * where we need printf-alike features are served by serverLog(). */ 368 void serverLogFromHandler(int level, const char *msg) { 369 int fd; 370 int log_to_stdout = server.logfile[0] == '\0'; 371 char buf[64]; 372 373 if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize)) 374 return; 375 fd = log_to_stdout ? STDOUT_FILENO : 376 open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644); 377 if (fd == -1) return; 378 ll2string(buf,sizeof(buf),getpid()); 379 if (write(fd,buf,strlen(buf)) == -1) goto err; 380 if (write(fd,":signal-handler (",17) == -1) goto err; 381 ll2string(buf,sizeof(buf),time(NULL)); 382 if (write(fd,buf,strlen(buf)) == -1) goto err; 383 if (write(fd,") ",2) == -1) goto err; 384 if (write(fd,msg,strlen(msg)) == -1) goto err; 385 if (write(fd,"\n",1) == -1) goto err; 386 err: 387 if (!log_to_stdout) close(fd); 388 } 389 390 /* Return the UNIX time in microseconds */ 391 long long ustime(void) { 392 struct timeval tv; 393 long long ust; 394 395 gettimeofday(&tv, NULL); 396 ust = ((long long)tv.tv_sec)*1000000; 397 ust += tv.tv_usec; 398 return ust; 399 } 400 401 /* Return the UNIX time in milliseconds */ 402 mstime_t mstime(void) { 403 return ustime()/1000; 404 } 405 406 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of 407 * exit(), because the latter may interact with the same file objects used by 408 * the parent process. However if we are testing the coverage normal exit() is 409 * used in order to obtain the right coverage information. 
*/ 410 void exitFromChild(int retcode) { 411 #ifdef COVERAGE_TEST 412 exit(retcode); 413 #else 414 _exit(retcode); 415 #endif 416 } 417 418 /*====================== Hash table type implementation ==================== */ 419 420 /* This is a hash table type that uses the SDS dynamic strings library as 421 * keys and redis objects as values (objects can hold SDS strings, 422 * lists, sets). */ 423 424 void dictVanillaFree(void *privdata, void *val) 425 { 426 DICT_NOTUSED(privdata); 427 zfree(val); 428 } 429 430 void dictListDestructor(void *privdata, void *val) 431 { 432 DICT_NOTUSED(privdata); 433 listRelease((list*)val); 434 } 435 436 int dictSdsKeyCompare(void *privdata, const void *key1, 437 const void *key2) 438 { 439 int l1,l2; 440 DICT_NOTUSED(privdata); 441 442 l1 = sdslen((sds)key1); 443 l2 = sdslen((sds)key2); 444 if (l1 != l2) return 0; 445 return memcmp(key1, key2, l1) == 0; 446 } 447 448 /* A case insensitive version used for the command lookup table and other 449 * places where case insensitive non binary-safe comparison is needed. 
*/ 450 int dictSdsKeyCaseCompare(void *privdata, const void *key1, 451 const void *key2) 452 { 453 DICT_NOTUSED(privdata); 454 455 return strcasecmp(key1, key2) == 0; 456 } 457 458 void dictObjectDestructor(void *privdata, void *val) 459 { 460 DICT_NOTUSED(privdata); 461 462 if (val == NULL) return; /* Values of swapped out keys as set to NULL */ 463 decrRefCount(val); 464 } 465 466 void dictSdsDestructor(void *privdata, void *val) 467 { 468 DICT_NOTUSED(privdata); 469 470 sdsfree(val); 471 } 472 473 int dictObjKeyCompare(void *privdata, const void *key1, 474 const void *key2) 475 { 476 const robj *o1 = key1, *o2 = key2; 477 return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 478 } 479 480 unsigned int dictObjHash(const void *key) { 481 const robj *o = key; 482 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 483 } 484 485 unsigned int dictSdsHash(const void *key) { 486 return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); 487 } 488 489 unsigned int dictSdsCaseHash(const void *key) { 490 return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key)); 491 } 492 493 int dictEncObjKeyCompare(void *privdata, const void *key1, 494 const void *key2) 495 { 496 robj *o1 = (robj*) key1, *o2 = (robj*) key2; 497 int cmp; 498 499 if (o1->encoding == OBJ_ENCODING_INT && 500 o2->encoding == OBJ_ENCODING_INT) 501 return o1->ptr == o2->ptr; 502 503 o1 = getDecodedObject(o1); 504 o2 = getDecodedObject(o2); 505 cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 506 decrRefCount(o1); 507 decrRefCount(o2); 508 return cmp; 509 } 510 511 unsigned int dictEncObjHash(const void *key) { 512 robj *o = (robj*) key; 513 514 if (sdsEncodedObject(o)) { 515 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 516 } else { 517 if (o->encoding == OBJ_ENCODING_INT) { 518 char buf[32]; 519 int len; 520 521 len = ll2string(buf,32,(long)o->ptr); 522 return dictGenHashFunction((unsigned char*)buf, len); 523 } else { 524 unsigned int hash; 525 526 o = 
getDecodedObject(o); 527 hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 528 decrRefCount(o); 529 return hash; 530 } 531 } 532 } 533 534 /* Sets type hash table */ 535 dictType setDictType = { 536 dictEncObjHash, /* hash function */ 537 NULL, /* key dup */ 538 NULL, /* val dup */ 539 dictEncObjKeyCompare, /* key compare */ 540 dictObjectDestructor, /* key destructor */ 541 NULL /* val destructor */ 542 }; 543 544 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ 545 dictType zsetDictType = { 546 dictEncObjHash, /* hash function */ 547 NULL, /* key dup */ 548 NULL, /* val dup */ 549 dictEncObjKeyCompare, /* key compare */ 550 dictObjectDestructor, /* key destructor */ 551 NULL /* val destructor */ 552 }; 553 554 /* Db->dict, keys are sds strings, vals are Redis objects. */ 555 dictType dbDictType = { 556 dictSdsHash, /* hash function */ 557 NULL, /* key dup */ 558 NULL, /* val dup */ 559 dictSdsKeyCompare, /* key compare */ 560 dictSdsDestructor, /* key destructor */ 561 dictObjectDestructor /* val destructor */ 562 }; 563 564 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */ 565 dictType shaScriptObjectDictType = { 566 dictSdsCaseHash, /* hash function */ 567 NULL, /* key dup */ 568 NULL, /* val dup */ 569 dictSdsKeyCaseCompare, /* key compare */ 570 dictSdsDestructor, /* key destructor */ 571 dictObjectDestructor /* val destructor */ 572 }; 573 574 /* Db->expires */ 575 dictType keyptrDictType = { 576 dictSdsHash, /* hash function */ 577 NULL, /* key dup */ 578 NULL, /* val dup */ 579 dictSdsKeyCompare, /* key compare */ 580 NULL, /* key destructor */ 581 NULL /* val destructor */ 582 }; 583 584 /* Command table. sds string -> command struct pointer. 
*/ 585 dictType commandTableDictType = { 586 dictSdsCaseHash, /* hash function */ 587 NULL, /* key dup */ 588 NULL, /* val dup */ 589 dictSdsKeyCaseCompare, /* key compare */ 590 dictSdsDestructor, /* key destructor */ 591 NULL /* val destructor */ 592 }; 593 594 /* Hash type hash table (note that small hashes are represented with ziplists) */ 595 dictType hashDictType = { 596 dictEncObjHash, /* hash function */ 597 NULL, /* key dup */ 598 NULL, /* val dup */ 599 dictEncObjKeyCompare, /* key compare */ 600 dictObjectDestructor, /* key destructor */ 601 dictObjectDestructor /* val destructor */ 602 }; 603 604 /* Keylist hash table type has unencoded redis objects as keys and 605 * lists as values. It's used for blocking operations (BLPOP) and to 606 * map swapped keys to a list of clients waiting for this keys to be loaded. */ 607 dictType keylistDictType = { 608 dictObjHash, /* hash function */ 609 NULL, /* key dup */ 610 NULL, /* val dup */ 611 dictObjKeyCompare, /* key compare */ 612 dictObjectDestructor, /* key destructor */ 613 dictListDestructor /* val destructor */ 614 }; 615 616 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to 617 * clusterNode structures. */ 618 dictType clusterNodesDictType = { 619 dictSdsHash, /* hash function */ 620 NULL, /* key dup */ 621 NULL, /* val dup */ 622 dictSdsKeyCompare, /* key compare */ 623 dictSdsDestructor, /* key destructor */ 624 NULL /* val destructor */ 625 }; 626 627 /* Cluster re-addition blacklist. This maps node IDs to the time 628 * we can re-add this node. The goal is to avoid readding a removed 629 * node for some time. */ 630 dictType clusterNodesBlackListDictType = { 631 dictSdsCaseHash, /* hash function */ 632 NULL, /* key dup */ 633 NULL, /* val dup */ 634 dictSdsKeyCaseCompare, /* key compare */ 635 dictSdsDestructor, /* key destructor */ 636 NULL /* val destructor */ 637 }; 638 639 /* Migrate cache dict type. 
*/ 640 dictType migrateCacheDictType = { 641 dictSdsHash, /* hash function */ 642 NULL, /* key dup */ 643 NULL, /* val dup */ 644 dictSdsKeyCompare, /* key compare */ 645 dictSdsDestructor, /* key destructor */ 646 NULL /* val destructor */ 647 }; 648 649 /* Replication cached script dict (server.repl_scriptcache_dict). 650 * Keys are sds SHA1 strings, while values are not used at all in the current 651 * implementation. */ 652 dictType replScriptCacheDictType = { 653 dictSdsCaseHash, /* hash function */ 654 NULL, /* key dup */ 655 NULL, /* val dup */ 656 dictSdsKeyCaseCompare, /* key compare */ 657 dictSdsDestructor, /* key destructor */ 658 NULL /* val destructor */ 659 }; 660 661 int htNeedsResize(dict *dict) { 662 long long size, used; 663 664 size = dictSlots(dict); 665 used = dictSize(dict); 666 return (size && used && size > DICT_HT_INITIAL_SIZE && 667 (used*100/size < HASHTABLE_MIN_FILL)); 668 } 669 670 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL 671 * we resize the hash table to save memory */ 672 void tryResizeHashTables(int dbid) { 673 if (htNeedsResize(server.db[dbid].dict)) 674 dictResize(server.db[dbid].dict); 675 if (htNeedsResize(server.db[dbid].expires)) 676 dictResize(server.db[dbid].expires); 677 } 678 679 /* Our hash table implementation performs rehashing incrementally while 680 * we write/read from the hash table. Still if the server is idle, the hash 681 * table will use two tables for a long time. So we try to use 1 millisecond 682 * of CPU time at every call of this function to perform some rehahsing. 683 * 684 * The function returns 1 if some rehashing was performed, otherwise 0 685 * is returned. */ 686 int incrementallyRehash(int dbid) { 687 /* Keys dictionary */ 688 if (dictIsRehashing(server.db[dbid].dict)) { 689 dictRehashMilliseconds(server.db[dbid].dict,1); 690 return 1; /* already used our millisecond for this loop... 
*/ 691 } 692 /* Expires */ 693 if (dictIsRehashing(server.db[dbid].expires)) { 694 dictRehashMilliseconds(server.db[dbid].expires,1); 695 return 1; /* already used our millisecond for this loop... */ 696 } 697 return 0; 698 } 699 700 /* This function is called once a background process of some kind terminates, 701 * as we want to avoid resizing the hash tables when there is a child in order 702 * to play well with copy-on-write (otherwise when a resize happens lots of 703 * memory pages are copied). The goal of this function is to update the ability 704 * for dict.c to resize the hash tables accordingly to the fact we have o not 705 * running childs. */ 706 void updateDictResizePolicy(void) { 707 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) 708 dictEnableResize(); 709 else 710 dictDisableResize(); 711 } 712 713 /* ======================= Cron: called every 100 ms ======================== */ 714 715 /* Helper function for the activeExpireCycle() function. 716 * This function will try to expire the key that is stored in the hash table 717 * entry 'de' of the 'expires' hash table of a Redis database. 718 * 719 * If the key is found to be expired, it is removed from the database and 720 * 1 is returned. Otherwise no operation is performed and 0 is returned. 721 * 722 * When a key is expired, server.stat_expiredkeys is incremented. 723 * 724 * The parameter 'now' is the current time in milliseconds as is passed 725 * to the function to avoid too many gettimeofday() syscalls. 
*/ 726 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { 727 long long t = dictGetSignedIntegerVal(de); 728 if (now > t) { 729 sds key = dictGetKey(de); 730 robj *keyobj = createStringObject(key,sdslen(key)); 731 732 propagateExpire(db,keyobj); 733 dbDelete(db,keyobj); 734 notifyKeyspaceEvent(NOTIFY_EXPIRED, 735 "expired",keyobj,db->id); 736 decrRefCount(keyobj); 737 server.stat_expiredkeys++; 738 return 1; 739 } else { 740 return 0; 741 } 742 } 743 744 /* Try to expire a few timed out keys. The algorithm used is adaptive and 745 * will use few CPU cycles if there are few expiring keys, otherwise 746 * it will get more aggressive to avoid that too much memory is used by 747 * keys that can be removed from the keyspace. 748 * 749 * No more than CRON_DBS_PER_CALL databases are tested at every 750 * iteration. 751 * 752 * This kind of call is used when Redis detects that timelimit_exit is 753 * true, so there is more work to do, and we do it more incrementally from 754 * the beforeSleep() function of the event loop. 755 * 756 * Expire cycle type: 757 * 758 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a 759 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION 760 * microseconds, and is not repeated again before the same amount of time. 761 * 762 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is 763 * executed, where the time limit is a percentage of the REDIS_HZ period 764 * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */ 765 766 void activeExpireCycle(int type) { 767 /* This function has some global state in order to continue the work 768 * incrementally across calls. */ 769 static unsigned int current_db = 0; /* Last DB tested. */ 770 static int timelimit_exit = 0; /* Time limit hit in previous call? */ 771 static long long last_fast_cycle = 0; /* When last fast cycle ran. 
*/ 772 773 int j, iteration = 0; 774 int dbs_per_call = CRON_DBS_PER_CALL; 775 long long start = ustime(), timelimit; 776 777 if (type == ACTIVE_EXPIRE_CYCLE_FAST) { 778 /* Don't start a fast cycle if the previous cycle did not exited 779 * for time limt. Also don't repeat a fast cycle for the same period 780 * as the fast cycle total duration itself. */ 781 if (!timelimit_exit) return; 782 if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; 783 last_fast_cycle = start; 784 } 785 786 /* We usually should test CRON_DBS_PER_CALL per iteration, with 787 * two exceptions: 788 * 789 * 1) Don't test more DBs than we have. 790 * 2) If last time we hit the time limit, we want to scan all DBs 791 * in this iteration, as there is work to do in some DB and we don't want 792 * expired keys to use memory for too much time. */ 793 if (dbs_per_call > server.dbnum || timelimit_exit) 794 dbs_per_call = server.dbnum; 795 796 /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time 797 * per iteration. Since this function gets called with a frequency of 798 * server.hz times per second, the following is the max amount of 799 * microseconds we can spend in this function. */ 800 timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; 801 timelimit_exit = 0; 802 if (timelimit <= 0) timelimit = 1; 803 804 if (type == ACTIVE_EXPIRE_CYCLE_FAST) 805 timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ 806 807 for (j = 0; j < dbs_per_call; j++) { 808 int expired; 809 redisDb *db = server.db+(current_db % server.dbnum); 810 811 /* Increment the DB now so we are sure if we run out of time 812 * in the current DB we'll restart from the next. This allows to 813 * distribute the time evenly across DBs. */ 814 current_db++; 815 816 /* Continue to expire if at the end of the cycle more than 25% 817 * of the keys were expired. 
*/ 818 do { 819 unsigned long num, slots; 820 long long now, ttl_sum; 821 int ttl_samples; 822 823 /* If there is nothing to expire try next DB ASAP. */ 824 if ((num = dictSize(db->expires)) == 0) { 825 db->avg_ttl = 0; 826 break; 827 } 828 slots = dictSlots(db->expires); 829 now = mstime(); 830 831 /* When there are less than 1% filled slots getting random 832 * keys is expensive, so stop here waiting for better times... 833 * The dictionary will be resized asap. */ 834 if (num && slots > DICT_HT_INITIAL_SIZE && 835 (num*100/slots < 1)) break; 836 837 /* The main collection cycle. Sample random keys among keys 838 * with an expire set, checking for expired ones. */ 839 expired = 0; 840 ttl_sum = 0; 841 ttl_samples = 0; 842 843 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) 844 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; 845 846 while (num--) { 847 dictEntry *de; 848 long long ttl; 849 850 if ((de = dictGetRandomKey(db->expires)) == NULL) break; 851 ttl = dictGetSignedIntegerVal(de)-now; 852 if (activeExpireCycleTryExpire(db,de,now)) expired++; 853 if (ttl > 0) { 854 /* We want the average TTL of keys yet not expired. */ 855 ttl_sum += ttl; 856 ttl_samples++; 857 } 858 } 859 860 /* Update the average TTL stats for this database. */ 861 if (ttl_samples) { 862 long long avg_ttl = ttl_sum/ttl_samples; 863 864 /* Do a simple running average with a few samples. 865 * We just use the current estimate with a weight of 2% 866 * and the previous estimate with a weight of 98%. */ 867 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; 868 db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); 869 } 870 871 /* We can't block forever here even if there are many keys to 872 * expire. So after a given amount of milliseconds return to the 873 * caller waiting for the other active expire cycle. */ 874 iteration++; 875 if ((iteration & 0xf) == 0) { /* check once every 16 iterations. 
*/ 876 long long elapsed = ustime()-start; 877 878 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); 879 if (elapsed > timelimit) timelimit_exit = 1; 880 } 881 if (timelimit_exit) return; 882 /* We don't repeat the cycle if there are less than 25% of keys 883 * found expired in the current DB. */ 884 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); 885 } 886 } 887 888 unsigned int getLRUClock(void) { 889 return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX; 890 } 891 892 /* Add a sample to the operations per second array of samples. */ 893 void trackInstantaneousMetric(int metric, long long current_reading) { 894 long long t = mstime() - server.inst_metric[metric].last_sample_time; 895 long long ops = current_reading - 896 server.inst_metric[metric].last_sample_count; 897 long long ops_sec; 898 899 ops_sec = t > 0 ? (ops*1000/t) : 0; 900 901 server.inst_metric[metric].samples[server.inst_metric[metric].idx] = 902 ops_sec; 903 server.inst_metric[metric].idx++; 904 server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; 905 server.inst_metric[metric].last_sample_time = mstime(); 906 server.inst_metric[metric].last_sample_count = current_reading; 907 } 908 909 /* Return the mean of all the samples. */ 910 long long getInstantaneousMetric(int metric) { 911 int j; 912 long long sum = 0; 913 914 for (j = 0; j < STATS_METRIC_SAMPLES; j++) 915 sum += server.inst_metric[metric].samples[j]; 916 return sum / STATS_METRIC_SAMPLES; 917 } 918 919 /* Check for timeouts. Returns non-zero if the client was terminated. 920 * The function gets the current time in milliseconds as argument since 921 * it gets called multiple times in a loop, so calling gettimeofday() for 922 * each iteration would be costly without any actual gain. 
*/ 923 int clientsCronHandleTimeout(client *c, mstime_t now_ms) { 924 time_t now = now_ms/1000; 925 926 if (server.maxidletime && 927 !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */ 928 !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ 929 !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ 930 !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ 931 (now - c->lastinteraction > server.maxidletime)) 932 { 933 serverLog(LL_VERBOSE,"Closing idle client"); 934 freeClient(c); 935 return 1; 936 } else if (c->flags & CLIENT_BLOCKED) { 937 /* Blocked OPS timeout is handled with milliseconds resolution. 938 * However note that the actual resolution is limited by 939 * server.hz. */ 940 941 if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { 942 /* Handle blocking operation specific timeout. */ 943 replyToBlockedClientTimedOut(c); 944 unblockClient(c); 945 } else if (server.cluster_enabled) { 946 /* Cluster: handle unblock & redirect of clients blocked 947 * into keys no longer served by this server. */ 948 if (clusterRedirectBlockedClientIfNeeded(c)) 949 unblockClient(c); 950 } 951 } 952 return 0; 953 } 954 955 /* The client query buffer is an sds.c string that can end with a lot of 956 * free space not used, this function reclaims space if needed. 957 * 958 * The function always returns 0 as it never terminates the client. */ 959 int clientsCronResizeQueryBuffer(client *c) { 960 size_t querybuf_size = sdsAllocSize(c->querybuf); 961 time_t idletime = server.unixtime - c->lastinteraction; 962 963 /* There are two conditions to resize the query buffer: 964 * 1) Query buffer is > BIG_ARG and too big for latest peak. 965 * 2) Client is inactive and the buffer is bigger than 1k. */ 966 if (((querybuf_size > PROTO_MBULK_BIG_ARG) && 967 (querybuf_size/(c->querybuf_peak+1)) > 2) || 968 (querybuf_size > 1024 && idletime > 2)) 969 { 970 /* Only resize the query buffer if it is actually wasting space. 
*/ 971 if (sdsavail(c->querybuf) > 1024) { 972 c->querybuf = sdsRemoveFreeSpace(c->querybuf); 973 } 974 } 975 /* Reset the peak again to capture the peak memory usage in the next 976 * cycle. */ 977 c->querybuf_peak = 0; 978 return 0; 979 } 980 981 #define CLIENTS_CRON_MIN_ITERATIONS 5 982 void clientsCron(void) { 983 /* Make sure to process at least numclients/server.hz of clients 984 * per call. Since this function is called server.hz times per second 985 * we are sure that in the worst case we process all the clients in 1 986 * second. */ 987 int numclients = listLength(server.clients); 988 int iterations = numclients/server.hz; 989 mstime_t now = mstime(); 990 991 /* Process at least a few clients while we are at it, even if we need 992 * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract 993 * of processing each client once per second. */ 994 if (iterations < CLIENTS_CRON_MIN_ITERATIONS) 995 iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? 996 numclients : CLIENTS_CRON_MIN_ITERATIONS; 997 998 while(listLength(server.clients) && iterations--) { 999 client *c; 1000 listNode *head; 1001 1002 /* Rotate the list, take the current head, process. 1003 * This way if the client must be removed from the list it's the 1004 * first element and we don't incur into O(N) computation. */ 1005 listRotate(server.clients); 1006 head = listFirst(server.clients); 1007 c = listNodeValue(head); 1008 /* The following functions do different service checks on the client. 1009 * The protocol is that they return non-zero if the client was 1010 * terminated. */ 1011 if (clientsCronHandleTimeout(c,now)) continue; 1012 if (clientsCronResizeQueryBuffer(c)) continue; 1013 } 1014 } 1015 1016 /* This function handles 'background' operations we are required to do 1017 * incrementally in Redis databases, such as active key expiring, resizing, 1018 * rehashing. */ 1019 void databasesCron(void) { 1020 /* Expire keys by random sampling. 
Not required for slaves 1021 * as master will synthesize DELs for us. */ 1022 if (server.active_expire_enabled && server.masterhost == NULL) 1023 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); 1024 1025 /* Perform hash tables rehashing if needed, but only if there are no 1026 * other processes saving the DB on disk. Otherwise rehashing is bad 1027 * as will cause a lot of copy-on-write of memory pages. */ 1028 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 1029 /* We use global counters so if we stop the computation at a given 1030 * DB we'll be able to start from the successive in the next 1031 * cron loop iteration. */ 1032 static unsigned int resize_db = 0; 1033 static unsigned int rehash_db = 0; 1034 int dbs_per_call = CRON_DBS_PER_CALL; 1035 int j; 1036 1037 /* Don't test more DBs than we have. */ 1038 if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; 1039 1040 /* Resize */ 1041 for (j = 0; j < dbs_per_call; j++) { 1042 tryResizeHashTables(resize_db % server.dbnum); 1043 resize_db++; 1044 } 1045 1046 /* Rehash */ 1047 if (server.activerehashing) { 1048 for (j = 0; j < dbs_per_call; j++) { 1049 int work_done = incrementallyRehash(rehash_db % server.dbnum); 1050 rehash_db++; 1051 if (work_done) { 1052 /* If the function did some work, stop here, we'll do 1053 * more at the next cron loop. */ 1054 break; 1055 } 1056 } 1057 } 1058 } 1059 } 1060 1061 /* We take a cached value of the unix time in the global state because with 1062 * virtual memory and aging there is to store the current time in objects at 1063 * every object access, and accuracy is not needed. To access a global var is 1064 * a lot faster than calling time(NULL) */ 1065 void updateCachedTime(void) { 1066 server.unixtime = time(NULL); 1067 server.mstime = mstime(); 1068 } 1069 1070 /* This is our timer interrupt, called server.hz times per second. 1071 * Here is where we do a number of things that need to be done asynchronously. 
1072 * For instance: 1073 * 1074 * - Active expired keys collection (it is also performed in a lazy way on 1075 * lookup). 1076 * - Software watchdog. 1077 * - Update some statistic. 1078 * - Incremental rehashing of the DBs hash tables. 1079 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children. 1080 * - Clients timeout of different kinds. 1081 * - Replication reconnection. 1082 * - Many more... 1083 * 1084 * Everything directly called here will be called server.hz times per second, 1085 * so in order to throttle execution of things we want to do less frequently 1086 * a macro is used: run_with_period(milliseconds) { .... } 1087 */ 1088 1089 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 1090 int j; 1091 UNUSED(eventLoop); 1092 UNUSED(id); 1093 UNUSED(clientData); 1094 1095 /* Software watchdog: deliver the SIGALRM that will reach the signal 1096 * handler if we don't return here fast enough. */ 1097 if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); 1098 1099 /* Update the time cache. */ 1100 updateCachedTime(); 1101 1102 run_with_period(100) { 1103 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); 1104 trackInstantaneousMetric(STATS_METRIC_NET_INPUT, 1105 server.stat_net_input_bytes); 1106 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, 1107 server.stat_net_output_bytes); 1108 } 1109 1110 /* We have just LRU_BITS bits per object for LRU information. 1111 * So we use an (eventually wrapping) LRU clock. 1112 * 1113 * Note that even if the counter wraps it's not a big problem, 1114 * everything will still work but some object will appear younger 1115 * to Redis. However for this to happen a given object should never be 1116 * touched for all the time needed to the counter to wrap, which is 1117 * not likely. 1118 * 1119 * Note that you can change the resolution altering the 1120 * LRU_CLOCK_RESOLUTION define. 
*/ 1121 server.lruclock = getLRUClock(); 1122 1123 /* Record the max memory used since the server was started. */ 1124 if (zmalloc_used_memory() > server.stat_peak_memory) 1125 server.stat_peak_memory = zmalloc_used_memory(); 1126 1127 /* Sample the RSS here since this is a relatively slow call. */ 1128 server.resident_set_size = zmalloc_get_rss(); 1129 1130 /* We received a SIGTERM, shutting down here in a safe way, as it is 1131 * not ok doing so inside the signal handler. */ 1132 if (server.shutdown_asap) { 1133 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); 1134 serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); 1135 server.shutdown_asap = 0; 1136 } 1137 1138 /* Show some info about non-empty databases */ 1139 run_with_period(5000) { 1140 for (j = 0; j < server.dbnum; j++) { 1141 long long size, used, vkeys; 1142 1143 size = dictSlots(server.db[j].dict); 1144 used = dictSize(server.db[j].dict); 1145 vkeys = dictSize(server.db[j].expires); 1146 if (used || vkeys) { 1147 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); 1148 /* dictPrintStats(server.dict); */ 1149 } 1150 } 1151 } 1152 1153 /* Show information about connected clients */ 1154 if (!server.sentinel_mode) { 1155 run_with_period(5000) { 1156 serverLog(LL_VERBOSE, 1157 "%lu clients connected (%lu slaves), %zu bytes in use", 1158 listLength(server.clients)-listLength(server.slaves), 1159 listLength(server.slaves), 1160 zmalloc_used_memory()); 1161 } 1162 } 1163 1164 /* We need to do a few operations on clients asynchronously. */ 1165 clientsCron(); 1166 1167 /* Handle background operations on Redis databases. */ 1168 databasesCron(); 1169 1170 /* Start a scheduled AOF rewrite if this was requested by the user while 1171 * a BGSAVE was in progress. 
*/ 1172 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && 1173 server.aof_rewrite_scheduled) 1174 { 1175 rewriteAppendOnlyFileBackground(); 1176 } 1177 1178 /* Check if a background saving or AOF rewrite in progress terminated. */ 1179 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || 1180 ldbPendingChildren()) 1181 { 1182 int statloc; 1183 pid_t pid; 1184 1185 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { 1186 int exitcode = WEXITSTATUS(statloc); 1187 int bysignal = 0; 1188 1189 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); 1190 1191 if (pid == -1) { 1192 serverLog(LL_WARNING,"wait3() returned an error: %s. " 1193 "rdb_child_pid = %d, aof_child_pid = %d", 1194 strerror(errno), 1195 (int) server.rdb_child_pid, 1196 (int) server.aof_child_pid); 1197 } else if (pid == server.rdb_child_pid) { 1198 backgroundSaveDoneHandler(exitcode,bysignal); 1199 } else if (pid == server.aof_child_pid) { 1200 backgroundRewriteDoneHandler(exitcode,bysignal); 1201 } else { 1202 if (!ldbRemoveChild(pid)) { 1203 serverLog(LL_WARNING, 1204 "Warning, detected child with unmatched pid: %ld", 1205 (long)pid); 1206 } 1207 } 1208 updateDictResizePolicy(); 1209 } 1210 } else { 1211 /* If there is not a background saving/rewrite in progress check if 1212 * we have to save/rewrite now */ 1213 for (j = 0; j < server.saveparamslen; j++) { 1214 struct saveparam *sp = server.saveparams+j; 1215 1216 /* Save if we reached the given amount of changes, 1217 * the given amount of seconds, and if the latest bgsave was 1218 * successful or if, in case of an error, at least 1219 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ 1220 if (server.dirty >= sp->changes && 1221 server.unixtime-server.lastsave > sp->seconds && 1222 (server.unixtime-server.lastbgsave_try > 1223 CONFIG_BGSAVE_RETRY_DELAY || 1224 server.lastbgsave_status == C_OK)) 1225 { 1226 serverLog(LL_NOTICE,"%d changes in %d seconds. 
Saving...", 1227 sp->changes, (int)sp->seconds); 1228 rdbSaveBackground(server.rdb_filename); 1229 break; 1230 } 1231 } 1232 1233 /* Trigger an AOF rewrite if needed */ 1234 if (server.rdb_child_pid == -1 && 1235 server.aof_child_pid == -1 && 1236 server.aof_rewrite_perc && 1237 server.aof_current_size > server.aof_rewrite_min_size) 1238 { 1239 long long base = server.aof_rewrite_base_size ? 1240 server.aof_rewrite_base_size : 1; 1241 long long growth = (server.aof_current_size*100/base) - 100; 1242 if (growth >= server.aof_rewrite_perc) { 1243 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); 1244 rewriteAppendOnlyFileBackground(); 1245 } 1246 } 1247 } 1248 1249 1250 /* AOF postponed flush: Try at every cron cycle if the slow fsync 1251 * completed. */ 1252 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); 1253 1254 /* AOF write errors: in this case we have a buffer to flush as well and 1255 * clear the AOF error in case of success to make the DB writable again, 1256 * however to try every second is enough in case of 'hz' is set to 1257 * an higher frequency. */ 1258 run_with_period(1000) { 1259 if (server.aof_last_write_status == C_ERR) 1260 flushAppendOnlyFile(0); 1261 } 1262 1263 /* Close clients that need to be closed asynchronous */ 1264 freeClientsInAsyncFreeQueue(); 1265 1266 /* Clear the paused clients flag if needed. */ 1267 clientsArePaused(); /* Don't check return value, just use the side effect. */ 1268 1269 /* Replication cron function -- used to reconnect to master and 1270 * to detect transfer failures. */ 1271 run_with_period(1000) replicationCron(); 1272 1273 /* Run the Redis Cluster cron. */ 1274 run_with_period(100) { 1275 if (server.cluster_enabled) clusterCron(); 1276 } 1277 1278 /* Run the Sentinel timer if we are in sentinel mode. */ 1279 run_with_period(100) { 1280 if (server.sentinel_mode) sentinelTimer(); 1281 } 1282 1283 /* Cleanup expired MIGRATE cached sockets. 
*/ 1284 run_with_period(1000) { 1285 migrateCloseTimedoutSockets(); 1286 } 1287 1288 server.cronloops++; 1289 return 1000/server.hz; 1290 } 1291 1292 /* This function gets called every time Redis is entering the 1293 * main loop of the event driven library, that is, before to sleep 1294 * for ready file descriptors. */ 1295 void beforeSleep(struct aeEventLoop *eventLoop) { 1296 UNUSED(eventLoop); 1297 1298 /* Call the Redis Cluster before sleep function. Note that this function 1299 * may change the state of Redis Cluster (from ok to fail or vice versa), 1300 * so it's a good idea to call it before serving the unblocked clients 1301 * later in this function. */ 1302 if (server.cluster_enabled) clusterBeforeSleep(); 1303 1304 /* Run a fast expire cycle (the called function will return 1305 * ASAP if a fast cycle is not needed). */ 1306 if (server.active_expire_enabled && server.masterhost == NULL) 1307 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); 1308 1309 /* Send all the slaves an ACK request if at least one client blocked 1310 * during the previous event loop iteration. */ 1311 if (server.get_ack_from_slaves) { 1312 robj *argv[3]; 1313 1314 argv[0] = createStringObject("REPLCONF",8); 1315 argv[1] = createStringObject("GETACK",6); 1316 argv[2] = createStringObject("*",1); /* Not used argument. */ 1317 replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); 1318 decrRefCount(argv[0]); 1319 decrRefCount(argv[1]); 1320 decrRefCount(argv[2]); 1321 server.get_ack_from_slaves = 0; 1322 } 1323 1324 /* Unblock all the clients blocked for synchronous replication 1325 * in WAIT. */ 1326 if (listLength(server.clients_waiting_acks)) 1327 processClientsWaitingReplicas(); 1328 1329 /* Try to process pending commands for clients that were just unblocked. */ 1330 if (listLength(server.unblocked_clients)) 1331 processUnblockedClients(); 1332 1333 /* Write the AOF buffer on disk */ 1334 flushAppendOnlyFile(0); 1335 1336 /* Handle writes with pending output buffers. 
*/ 1337 handleClientsWithPendingWrites(); 1338 } 1339 1340 /* =========================== Server initialization ======================== */ 1341 1342 void createSharedObjects(void) { 1343 int j; 1344 1345 shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n")); 1346 shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n")); 1347 shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n")); 1348 shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n")); 1349 shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n")); 1350 shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n")); 1351 shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n")); 1352 shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n")); 1353 shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n")); 1354 shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n")); 1355 shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n")); 1356 shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n")); 1357 shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n")); 1358 shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew( 1359 "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")); 1360 shared.nokeyerr = createObject(OBJ_STRING,sdsnew( 1361 "-ERR no such key\r\n")); 1362 shared.syntaxerr = createObject(OBJ_STRING,sdsnew( 1363 "-ERR syntax error\r\n")); 1364 shared.sameobjecterr = createObject(OBJ_STRING,sdsnew( 1365 "-ERR source and destination objects are the same\r\n")); 1366 shared.outofrangeerr = createObject(OBJ_STRING,sdsnew( 1367 "-ERR index out of range\r\n")); 1368 shared.noscripterr = createObject(OBJ_STRING,sdsnew( 1369 "-NOSCRIPT No matching script. Please use EVAL.\r\n")); 1370 shared.loadingerr = createObject(OBJ_STRING,sdsnew( 1371 "-LOADING Redis is loading the dataset in memory\r\n")); 1372 shared.slowscripterr = createObject(OBJ_STRING,sdsnew( 1373 "-BUSY Redis is busy running a script. 
You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n")); 1374 shared.masterdownerr = createObject(OBJ_STRING,sdsnew( 1375 "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n")); 1376 shared.bgsaveerr = createObject(OBJ_STRING,sdsnew( 1377 "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n")); 1378 shared.roslaveerr = createObject(OBJ_STRING,sdsnew( 1379 "-READONLY You can't write against a read only slave.\r\n")); 1380 shared.noautherr = createObject(OBJ_STRING,sdsnew( 1381 "-NOAUTH Authentication required.\r\n")); 1382 shared.oomerr = createObject(OBJ_STRING,sdsnew( 1383 "-OOM command not allowed when used memory > 'maxmemory'.\r\n")); 1384 shared.execaborterr = createObject(OBJ_STRING,sdsnew( 1385 "-EXECABORT Transaction discarded because of previous errors.\r\n")); 1386 shared.noreplicaserr = createObject(OBJ_STRING,sdsnew( 1387 "-NOREPLICAS Not enough good slaves to write.\r\n")); 1388 shared.busykeyerr = createObject(OBJ_STRING,sdsnew( 1389 "-BUSYKEY Target key name already exists.\r\n")); 1390 shared.space = createObject(OBJ_STRING,sdsnew(" ")); 1391 shared.colon = createObject(OBJ_STRING,sdsnew(":")); 1392 shared.plus = createObject(OBJ_STRING,sdsnew("+")); 1393 1394 for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) { 1395 char dictid_str[64]; 1396 int dictid_len; 1397 1398 dictid_len = ll2string(dictid_str,sizeof(dictid_str),j); 1399 shared.select[j] = createObject(OBJ_STRING, 1400 sdscatprintf(sdsempty(), 1401 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 1402 dictid_len, dictid_str)); 1403 } 1404 shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); 1405 shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); 1406 shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); 1407 shared.unsubscribebulk = 
createStringObject("$11\r\nunsubscribe\r\n",18); 1408 shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); 1409 shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); 1410 shared.del = createStringObject("DEL",3); 1411 shared.rpop = createStringObject("RPOP",4); 1412 shared.lpop = createStringObject("LPOP",4); 1413 shared.lpush = createStringObject("LPUSH",5); 1414 for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { 1415 shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j); 1416 shared.integers[j]->encoding = OBJ_ENCODING_INT; 1417 } 1418 for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) { 1419 shared.mbulkhdr[j] = createObject(OBJ_STRING, 1420 sdscatprintf(sdsempty(),"*%d\r\n",j)); 1421 shared.bulkhdr[j] = createObject(OBJ_STRING, 1422 sdscatprintf(sdsempty(),"$%d\r\n",j)); 1423 } 1424 /* The following two shared objects, minstring and maxstrings, are not 1425 * actually used for their value but as a special object meaning 1426 * respectively the minimum possible string and the maximum possible 1427 * string in string comparisons for the ZRANGEBYLEX command. */ 1428 shared.minstring = createStringObject("minstring",9); 1429 shared.maxstring = createStringObject("maxstring",9); 1430 } 1431 1432 void initServerConfig(void) { 1433 int j; 1434 1435 getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); 1436 server.configfile = NULL; 1437 server.executable = NULL; 1438 server.hz = CONFIG_DEFAULT_HZ; 1439 server.runid[CONFIG_RUN_ID_SIZE] = '\0'; 1440 server.arch_bits = (sizeof(long) == 8) ? 
64 : 32; 1441 server.port = CONFIG_DEFAULT_SERVER_PORT; 1442 server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG; 1443 server.bindaddr_count = 0; 1444 server.unixsocket = NULL; 1445 server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM; 1446 server.ipfd_count = 0; 1447 server.sofd = -1; 1448 server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE; 1449 server.dbnum = CONFIG_DEFAULT_DBNUM; 1450 server.verbosity = CONFIG_DEFAULT_VERBOSITY; 1451 server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; 1452 server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; 1453 server.active_expire_enabled = 1; 1454 server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN; 1455 server.saveparams = NULL; 1456 server.loading = 0; 1457 server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE); 1458 server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED; 1459 server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT); 1460 server.syslog_facility = LOG_LOCAL0; 1461 server.daemonize = CONFIG_DEFAULT_DAEMONIZE; 1462 server.supervised = 0; 1463 server.supervised_mode = SUPERVISED_NONE; 1464 server.aof_state = AOF_OFF; 1465 server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC; 1466 server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE; 1467 server.aof_rewrite_perc = AOF_REWRITE_PERC; 1468 server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; 1469 server.aof_rewrite_base_size = 0; 1470 server.aof_rewrite_scheduled = 0; 1471 server.aof_last_fsync = time(NULL); 1472 server.aof_rewrite_time_last = -1; 1473 server.aof_rewrite_time_start = -1; 1474 server.aof_lastbgrewrite_status = C_OK; 1475 server.aof_delayed_fsync = 0; 1476 server.aof_fd = -1; 1477 server.aof_selected_db = -1; /* Make sure the first time will not match */ 1478 server.aof_flush_postponed_start = 0; 1479 server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; 1480 server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; 1481 server.pidfile = NULL; 1482 server.rdb_filename = 
zstrdup(CONFIG_DEFAULT_RDB_FILENAME); 1483 server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); 1484 server.requirepass = NULL; 1485 server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION; 1486 server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM; 1487 server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR; 1488 server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING; 1489 server.notify_keyspace_events = 0; 1490 server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS; 1491 server.bpop_blocked_clients = 0; 1492 server.maxmemory = CONFIG_DEFAULT_MAXMEMORY; 1493 server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY; 1494 server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES; 1495 server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES; 1496 server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE; 1497 server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE; 1498 server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH; 1499 server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES; 1500 server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES; 1501 server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE; 1502 server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES; 1503 server.shutdown_asap = 0; 1504 server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; 1505 server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; 1506 server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE; 1507 server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG; 1508 server.cluster_enabled = 0; 1509 server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT; 1510 server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER; 1511 server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY; 1512 server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE; 1513 server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); 1514 server.migrate_cached_sockets = 
dictCreate(&migrateCacheDictType,NULL); 1515 server.next_client_id = 1; /* Client IDs, start from 1 .*/ 1516 server.loading_process_events_interval_bytes = (1024*1024*2); 1517 1518 server.lruclock = getLRUClock(); 1519 resetServerSaveParams(); 1520 1521 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ 1522 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ 1523 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ 1524 /* Replication related */ 1525 server.masterauth = NULL; 1526 server.masterhost = NULL; 1527 server.masterport = 6379; 1528 server.master = NULL; 1529 server.cached_master = NULL; 1530 server.repl_master_initial_offset = -1; 1531 server.repl_state = REPL_STATE_NONE; 1532 server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; 1533 server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; 1534 server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; 1535 server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ 1536 server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; 1537 server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; 1538 server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; 1539 server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY; 1540 server.master_repl_offset = 0; 1541 1542 /* Replication partial resync backlog */ 1543 server.repl_backlog = NULL; 1544 server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE; 1545 server.repl_backlog_histlen = 0; 1546 server.repl_backlog_idx = 0; 1547 server.repl_backlog_off = 0; 1548 server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT; 1549 server.repl_no_slaves_since = time(NULL); 1550 1551 /* Client output buffer limits */ 1552 for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) 1553 server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; 1554 1555 /* Double constants initialization */ 1556 R_Zero = 0.0; 1557 R_PosInf = 1.0/R_Zero; 1558 R_NegInf = -1.0/R_Zero; 1559 R_Nan = R_Zero/R_Zero; 1560 1561 /* Command table -- we initiialize it here as it is part of the 1562 * initial configuration, since command names may be changed via 1563 * redis.conf using the rename-command directive. 
*/ 1564 server.commands = dictCreate(&commandTableDictType,NULL); 1565 server.orig_commands = dictCreate(&commandTableDictType,NULL); 1566 populateCommandTable(); 1567 server.delCommand = lookupCommandByCString("del"); 1568 server.multiCommand = lookupCommandByCString("multi"); 1569 server.lpushCommand = lookupCommandByCString("lpush"); 1570 server.lpopCommand = lookupCommandByCString("lpop"); 1571 server.rpopCommand = lookupCommandByCString("rpop"); 1572 server.sremCommand = lookupCommandByCString("srem"); 1573 server.execCommand = lookupCommandByCString("exec"); 1574 1575 /* Slow log */ 1576 server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; 1577 server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN; 1578 1579 /* Latency monitor */ 1580 server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; 1581 1582 /* Debugging */ 1583 server.assert_failed = "<no assertion failed>"; 1584 server.assert_file = "<no file>"; 1585 server.assert_line = 0; 1586 server.bug_report_start = 0; 1587 server.watchdog_period = 0; 1588 } 1589 1590 extern char **environ; 1591 1592 /* Restart the server, executing the same executable that started this 1593 * instance, with the same arguments and configuration file. 1594 * 1595 * The function is designed to directly call execve() so that the new 1596 * server instance will retain the PID of the previous one. 1597 * 1598 * The list of flags, that may be bitwise ORed together, alter the 1599 * behavior of this function: 1600 * 1601 * RESTART_SERVER_NONE No flags. 1602 * RESTART_SERVER_GRACEFULLY Do a proper shutdown before restarting. 1603 * RESTART_SERVER_CONFIG_REWRITE Rewrite the config file before restarting. 1604 * 1605 * On success the function does not return, because the process turns into 1606 * a different process. On error C_ERR is returned. 
*/
int restartServer(int flags, mstime_t delay) {
    int fd, fd_end;

    /* The restart is performed via execve() on the original binary: give
     * up immediately if we are no longer allowed to execute it. */
    if (access(server.executable,X_OK) == -1) {
        return C_ERR;
    }

    /* Rewrite the configuration file when requested, but only if the
     * server was started with one. */
    if ((flags & RESTART_SERVER_CONFIG_REWRITE) && server.configfile) {
        if (rewriteConfig(server.configfile) == -1) {
            return C_ERR;
        }
    }

    /* Go through the regular shutdown procedure when requested. */
    if (flags & RESTART_SERVER_GRACEFULLY) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) {
            return C_ERR;
        }
    }

    /* Close every descriptor above stdin, stdout and stderr: the three
     * standard streams are kept since they are useful when the restarted
     * server is not daemonized. */
    fd_end = (int)server.maxclients + 1024;
    for (fd = 3; fd < fd_end; fd++) {
        close(fd);
    }

    /* Wait for the requested delay (milliseconds), then replace the
     * process image using the original command line. */
    if (delay != 0) {
        usleep(delay*1000);
    }
    execve(server.executable,server.exec_argv,environ);

    /* execve() only returns on failure, and at this point there is
     * nothing left to do but exit. */
    _exit(1);

    return C_ERR; /* Never reached. */
}

/* This function will try to raise the max number of open files accordingly to
 * the configured max number of clients. It also reserves a number of file
 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
 * persistence, listening sockets, log files and so forth.
 *
 * If it will not be possible to set the limit accordingly to the configured
 * max number of clients, the function will do the reverse setting
 * server.maxclients to the value that we can actually handle.
*/ 1645 void adjustOpenFilesLimit(void) { 1646 rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS; 1647 struct rlimit limit; 1648 1649 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { 1650 serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.", 1651 strerror(errno)); 1652 server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS; 1653 } else { 1654 rlim_t oldlimit = limit.rlim_cur; 1655 1656 /* Set the max number of files if the current limit is not enough 1657 * for our needs. */ 1658 if (oldlimit < maxfiles) { 1659 rlim_t bestlimit; 1660 int setrlimit_error = 0; 1661 1662 /* Try to set the file limit to match 'maxfiles' or at least 1663 * to the higher value supported less than maxfiles. */ 1664 bestlimit = maxfiles; 1665 while(bestlimit > oldlimit) { 1666 rlim_t decr_step = 16; 1667 1668 limit.rlim_cur = bestlimit; 1669 limit.rlim_max = bestlimit; 1670 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break; 1671 setrlimit_error = errno; 1672 1673 /* We failed to set file limit to 'bestlimit'. Try with a 1674 * smaller limit decrementing by a few FDs per iteration. */ 1675 if (bestlimit < decr_step) break; 1676 bestlimit -= decr_step; 1677 } 1678 1679 /* Assume that the limit we get initially is still valid if 1680 * our last try was even lower. */ 1681 if (bestlimit < oldlimit) bestlimit = oldlimit; 1682 1683 if (bestlimit < maxfiles) { 1684 int old_maxclients = server.maxclients; 1685 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS; 1686 if (server.maxclients < 1) { 1687 serverLog(LL_WARNING,"Your current 'ulimit -n' " 1688 "of %llu is not enough for the server to start. " 1689 "Please increase your open file limit to at least " 1690 "%llu. 
Exiting.", 1691 (unsigned long long) oldlimit, 1692 (unsigned long long) maxfiles); 1693 exit(1); 1694 } 1695 serverLog(LL_WARNING,"You requested maxclients of %d " 1696 "requiring at least %llu max file descriptors.", 1697 old_maxclients, 1698 (unsigned long long) maxfiles); 1699 serverLog(LL_WARNING,"Server can't set maximum open files " 1700 "to %llu because of OS error: %s.", 1701 (unsigned long long) maxfiles, strerror(setrlimit_error)); 1702 serverLog(LL_WARNING,"Current maximum open files is %llu. " 1703 "maxclients has been reduced to %d to compensate for " 1704 "low ulimit. " 1705 "If you need higher maxclients increase 'ulimit -n'.", 1706 (unsigned long long) bestlimit, server.maxclients); 1707 } else { 1708 serverLog(LL_NOTICE,"Increased maximum number of open files " 1709 "to %llu (it was originally set to %llu).", 1710 (unsigned long long) maxfiles, 1711 (unsigned long long) oldlimit); 1712 } 1713 } 1714 } 1715 } 1716 1717 /* Check that server.tcp_backlog can be actually enforced in Linux according 1718 * to the value of /proc/sys/net/core/somaxconn, or warn about it. */ 1719 void checkTcpBacklogSettings(void) { 1720 #ifdef HAVE_PROC_SOMAXCONN 1721 FILE *fp = fopen("/proc/sys/net/core/somaxconn","r"); 1722 char buf[1024]; 1723 if (!fp) return; 1724 if (fgets(buf,sizeof(buf),fp) != NULL) { 1725 int somaxconn = atoi(buf); 1726 if (somaxconn > 0 && somaxconn < server.tcp_backlog) { 1727 serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn); 1728 } 1729 } 1730 fclose(fp); 1731 #endif 1732 } 1733 1734 /* Initialize a set of file descriptors to listen to the specified 'port' 1735 * binding the addresses specified in the Redis server configuration. 1736 * 1737 * The listening file descriptors are stored in the integer array 'fds' 1738 * and their number is set in '*count'. 
1739 * 1740 * The addresses to bind are specified in the global server.bindaddr array 1741 * and their number is server.bindaddr_count. If the server configuration 1742 * contains no specific addresses to bind, this function will try to 1743 * bind * (all addresses) for both the IPv4 and IPv6 protocols. 1744 * 1745 * On success the function returns C_OK. 1746 * 1747 * On error the function returns C_ERR. For the function to be on 1748 * error, at least one of the server.bindaddr addresses was 1749 * impossible to bind, or no bind addresses were specified in the server 1750 * configuration but the function is not able to bind * for at least 1751 * one of the IPv4 or IPv6 protocols. */ 1752 int listenToPort(int port, int *fds, int *count) { 1753 int j; 1754 1755 /* Force binding of 0.0.0.0 if no bind address is specified, always 1756 * entering the loop if j == 0. */ 1757 if (server.bindaddr_count == 0) server.bindaddr[0] = NULL; 1758 for (j = 0; j < server.bindaddr_count || j == 0; j++) { 1759 if (server.bindaddr[j] == NULL) { 1760 /* Bind * for both IPv6 and IPv4, we enter here only if 1761 * server.bindaddr_count == 0. */ 1762 fds[*count] = anetTcp6Server(server.neterr,port,NULL, 1763 server.tcp_backlog); 1764 if (fds[*count] != ANET_ERR) { 1765 anetNonBlock(NULL,fds[*count]); 1766 (*count)++; 1767 } 1768 fds[*count] = anetTcpServer(server.neterr,port,NULL, 1769 server.tcp_backlog); 1770 if (fds[*count] != ANET_ERR) { 1771 anetNonBlock(NULL,fds[*count]); 1772 (*count)++; 1773 } 1774 /* Exit the loop if we were able to bind * on IPv4 or IPv6, 1775 * otherwise fds[*count] will be ANET_ERR and we'll print an 1776 * error and return to the caller with an error. */ 1777 if (*count) break; 1778 } else if (strchr(server.bindaddr[j],':')) { 1779 /* Bind IPv6 address. */ 1780 fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], 1781 server.tcp_backlog); 1782 } else { 1783 /* Bind IPv4 address. 
*/ 1784 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], 1785 server.tcp_backlog); 1786 } 1787 if (fds[*count] == ANET_ERR) { 1788 serverLog(LL_WARNING, 1789 "Creating Server TCP listening socket %s:%d: %s", 1790 server.bindaddr[j] ? server.bindaddr[j] : "*", 1791 port, server.neterr); 1792 return C_ERR; 1793 } 1794 anetNonBlock(NULL,fds[*count]); 1795 (*count)++; 1796 } 1797 return C_OK; 1798 } 1799 1800 /* Resets the stats that we expose via INFO or other means that we want 1801 * to reset via CONFIG RESETSTAT. The function is also used in order to 1802 * initialize these fields in initServer() at server startup. */ 1803 void resetServerStats(void) { 1804 int j; 1805 1806 server.stat_numcommands = 0; 1807 server.stat_numconnections = 0; 1808 server.stat_expiredkeys = 0; 1809 server.stat_evictedkeys = 0; 1810 server.stat_keyspace_misses = 0; 1811 server.stat_keyspace_hits = 0; 1812 server.stat_fork_time = 0; 1813 server.stat_fork_rate = 0; 1814 server.stat_rejected_conn = 0; 1815 server.stat_sync_full = 0; 1816 server.stat_sync_partial_ok = 0; 1817 server.stat_sync_partial_err = 0; 1818 for (j = 0; j < STATS_METRIC_COUNT; j++) { 1819 server.inst_metric[j].idx = 0; 1820 server.inst_metric[j].last_sample_time = mstime(); 1821 server.inst_metric[j].last_sample_count = 0; 1822 memset(server.inst_metric[j].samples,0, 1823 sizeof(server.inst_metric[j].samples)); 1824 } 1825 server.stat_net_input_bytes = 0; 1826 server.stat_net_output_bytes = 0; 1827 server.aof_delayed_fsync = 0; 1828 } 1829 1830 void initServer(void) { 1831 int j; 1832 1833 signal(SIGHUP, SIG_IGN); 1834 signal(SIGPIPE, SIG_IGN); 1835 setupSignalHandlers(); 1836 1837 if (server.syslog_enabled) { 1838 openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, 1839 server.syslog_facility); 1840 } 1841 1842 server.pid = getpid(); 1843 server.current_client = NULL; 1844 server.clients = listCreate(); 1845 server.clients_to_close = listCreate(); 1846 server.slaves = listCreate(); 1847 
server.monitors = listCreate(); 1848 server.clients_pending_write = listCreate(); 1849 server.slaveseldb = -1; /* Force to emit the first SELECT command. */ 1850 server.unblocked_clients = listCreate(); 1851 server.ready_keys = listCreate(); 1852 server.clients_waiting_acks = listCreate(); 1853 server.get_ack_from_slaves = 0; 1854 server.clients_paused = 0; 1855 server.system_memory_size = zmalloc_get_memory_size(); 1856 1857 createSharedObjects(); 1858 adjustOpenFilesLimit(); 1859 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 1860 server.db = zmalloc(sizeof(redisDb)*server.dbnum); 1861 1862 /* Open the TCP listening socket for the user commands. */ 1863 if (server.port != 0 && 1864 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) 1865 exit(1); 1866 1867 /* Open the listening Unix domain socket. */ 1868 if (server.unixsocket != NULL) { 1869 unlink(server.unixsocket); /* don't care if this fails */ 1870 server.sofd = anetUnixServer(server.neterr,server.unixsocket, 1871 server.unixsocketperm, server.tcp_backlog); 1872 if (server.sofd == ANET_ERR) { 1873 serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); 1874 exit(1); 1875 } 1876 anetNonBlock(NULL,server.sofd); 1877 } 1878 1879 /* Abort if there are no listening sockets at all. */ 1880 if (server.ipfd_count == 0 && server.sofd < 0) { 1881 serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); 1882 exit(1); 1883 } 1884 1885 /* Create the Redis databases, and initialize other internal state. 
*/ 1886 for (j = 0; j < server.dbnum; j++) { 1887 server.db[j].dict = dictCreate(&dbDictType,NULL); 1888 server.db[j].expires = dictCreate(&keyptrDictType,NULL); 1889 server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); 1890 server.db[j].ready_keys = dictCreate(&setDictType,NULL); 1891 server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); 1892 server.db[j].eviction_pool = evictionPoolAlloc(); 1893 server.db[j].id = j; 1894 server.db[j].avg_ttl = 0; 1895 } 1896 server.pubsub_channels = dictCreate(&keylistDictType,NULL); 1897 server.pubsub_patterns = listCreate(); 1898 listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); 1899 listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); 1900 server.cronloops = 0; 1901 server.rdb_child_pid = -1; 1902 server.aof_child_pid = -1; 1903 server.rdb_child_type = RDB_CHILD_TYPE_NONE; 1904 aofRewriteBufferReset(); 1905 server.aof_buf = sdsempty(); 1906 server.lastsave = time(NULL); /* At startup we consider the DB saved. */ 1907 server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ 1908 server.rdb_save_time_last = -1; 1909 server.rdb_save_time_start = -1; 1910 server.dirty = 0; 1911 resetServerStats(); 1912 /* A few stats we don't want to reset: server startup time, and peak mem. */ 1913 server.stat_starttime = time(NULL); 1914 server.stat_peak_memory = 0; 1915 server.resident_set_size = 0; 1916 server.lastbgsave_status = C_OK; 1917 server.aof_last_write_status = C_OK; 1918 server.aof_last_write_errno = 0; 1919 server.repl_good_slaves_count = 0; 1920 updateCachedTime(); 1921 1922 /* Create the serverCron() time event, that's our main way to process 1923 * background operations. */ 1924 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { 1925 serverPanic("Can't create the serverCron time event."); 1926 exit(1); 1927 } 1928 1929 /* Create an event handler for accepting new connections in TCP and Unix 1930 * domain sockets. 
*/ 1931 for (j = 0; j < server.ipfd_count; j++) { 1932 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, 1933 acceptTcpHandler,NULL) == AE_ERR) 1934 { 1935 serverPanic( 1936 "Unrecoverable error creating server.ipfd file event."); 1937 } 1938 } 1939 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, 1940 acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); 1941 1942 /* Open the AOF file if needed. */ 1943 if (server.aof_state == AOF_ON) { 1944 server.aof_fd = open(server.aof_filename, 1945 O_WRONLY|O_APPEND|O_CREAT,0644); 1946 if (server.aof_fd == -1) { 1947 serverLog(LL_WARNING, "Can't open the append-only file: %s", 1948 strerror(errno)); 1949 exit(1); 1950 } 1951 } 1952 1953 /* 32 bit instances are limited to 4GB of address space, so if there is 1954 * no explicit limit in the user provided configuration we set a limit 1955 * at 3 GB using maxmemory with 'noeviction' policy'. This avoids 1956 * useless crashes of the Redis instance for out of memory. */ 1957 if (server.arch_bits == 32 && server.maxmemory == 0) { 1958 serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); 1959 server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ 1960 server.maxmemory_policy = MAXMEMORY_NO_EVICTION; 1961 } 1962 1963 if (server.cluster_enabled) clusterInit(); 1964 replicationScriptCacheInit(); 1965 scriptingInit(1); 1966 slowlogInit(); 1967 latencyMonitorInit(); 1968 bioInit(); 1969 } 1970 1971 /* Populates the Redis Command Table starting from the hard coded list 1972 * we have on top of redis.c file. 
*/ 1973 void populateCommandTable(void) { 1974 int j; 1975 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 1976 1977 for (j = 0; j < numcommands; j++) { 1978 struct redisCommand *c = redisCommandTable+j; 1979 char *f = c->sflags; 1980 int retval1, retval2; 1981 1982 while(*f != '\0') { 1983 switch(*f) { 1984 case 'w': c->flags |= CMD_WRITE; break; 1985 case 'r': c->flags |= CMD_READONLY; break; 1986 case 'm': c->flags |= CMD_DENYOOM; break; 1987 case 'a': c->flags |= CMD_ADMIN; break; 1988 case 'p': c->flags |= CMD_PUBSUB; break; 1989 case 's': c->flags |= CMD_NOSCRIPT; break; 1990 case 'R': c->flags |= CMD_RANDOM; break; 1991 case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break; 1992 case 'l': c->flags |= CMD_LOADING; break; 1993 case 't': c->flags |= CMD_STALE; break; 1994 case 'M': c->flags |= CMD_SKIP_MONITOR; break; 1995 case 'k': c->flags |= CMD_ASKING; break; 1996 case 'F': c->flags |= CMD_FAST; break; 1997 default: serverPanic("Unsupported command flag"); break; 1998 } 1999 f++; 2000 } 2001 2002 retval1 = dictAdd(server.commands, sdsnew(c->name), c); 2003 /* Populate an additional dictionary that will be unaffected 2004 * by rename-command statements in redis.conf. 
*/ 2005 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); 2006 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); 2007 } 2008 } 2009 2010 void resetCommandTableStats(void) { 2011 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 2012 int j; 2013 2014 for (j = 0; j < numcommands; j++) { 2015 struct redisCommand *c = redisCommandTable+j; 2016 2017 c->microseconds = 0; 2018 c->calls = 0; 2019 } 2020 } 2021 2022 /* ========================== Redis OP Array API ============================ */ 2023 2024 void redisOpArrayInit(redisOpArray *oa) { 2025 oa->ops = NULL; 2026 oa->numops = 0; 2027 } 2028 2029 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid, 2030 robj **argv, int argc, int target) 2031 { 2032 redisOp *op; 2033 2034 oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); 2035 op = oa->ops+oa->numops; 2036 op->cmd = cmd; 2037 op->dbid = dbid; 2038 op->argv = argv; 2039 op->argc = argc; 2040 op->target = target; 2041 oa->numops++; 2042 return oa->numops; 2043 } 2044 2045 void redisOpArrayFree(redisOpArray *oa) { 2046 while(oa->numops) { 2047 int j; 2048 redisOp *op; 2049 2050 oa->numops--; 2051 op = oa->ops+oa->numops; 2052 for (j = 0; j < op->argc; j++) 2053 decrRefCount(op->argv[j]); 2054 zfree(op->argv); 2055 } 2056 zfree(oa->ops); 2057 } 2058 2059 /* ====================== Commands lookup and execution ===================== */ 2060 2061 struct redisCommand *lookupCommand(sds name) { 2062 return dictFetchValue(server.commands, name); 2063 } 2064 2065 struct redisCommand *lookupCommandByCString(char *s) { 2066 struct redisCommand *cmd; 2067 sds name = sdsnew(s); 2068 2069 cmd = dictFetchValue(server.commands, name); 2070 sdsfree(name); 2071 return cmd; 2072 } 2073 2074 /* Lookup the command in the current table, if not found also check in 2075 * the original table containing the original command names unaffected by 2076 * redis.conf rename-command statement. 
2077 * 2078 * This is used by functions rewriting the argument vector such as 2079 * rewriteClientCommandVector() in order to set client->cmd pointer 2080 * correctly even if the command was renamed. */ 2081 struct redisCommand *lookupCommandOrOriginal(sds name) { 2082 struct redisCommand *cmd = dictFetchValue(server.commands, name); 2083 2084 if (!cmd) cmd = dictFetchValue(server.orig_commands,name); 2085 return cmd; 2086 } 2087 2088 /* Propagate the specified command (in the context of the specified database id) 2089 * to AOF and Slaves. 2090 * 2091 * flags are an xor between: 2092 * + PROPAGATE_NONE (no propagation of command at all) 2093 * + PROPAGATE_AOF (propagate into the AOF file if is enabled) 2094 * + PROPAGATE_REPL (propagate into the replication link) 2095 * 2096 * This should not be used inside commands implementation. Use instead 2097 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). 2098 */ 2099 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2100 int flags) 2101 { 2102 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) 2103 feedAppendOnlyFile(cmd,dbid,argv,argc); 2104 if (flags & PROPAGATE_REPL) 2105 replicationFeedSlaves(server.slaves,dbid,argv,argc); 2106 } 2107 2108 /* Used inside commands to schedule the propagation of additional commands 2109 * after the current command is propagated to AOF / Replication. 2110 * 2111 * 'cmd' must be a pointer to the Redis command to replicate, dbid is the 2112 * database ID the command should be propagated into. 2113 * Arguments of the command to propagte are passed as an array of redis 2114 * objects pointers of len 'argc', using the 'argv' vector. 2115 * 2116 * The function does not take a reference to the passed 'argv' vector, 2117 * so it is up to the caller to release the passed argv (but it is usually 2118 * stack allocated). The function autoamtically increments ref count of 2119 * passed objects, so the caller does not need to. 
*/ 2120 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2121 int target) 2122 { 2123 robj **argvcopy; 2124 int j; 2125 2126 if (server.loading) return; /* No propagation during loading. */ 2127 2128 argvcopy = zmalloc(sizeof(robj*)*argc); 2129 for (j = 0; j < argc; j++) { 2130 argvcopy[j] = argv[j]; 2131 incrRefCount(argv[j]); 2132 } 2133 redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target); 2134 } 2135 2136 /* It is possible to call the function forceCommandPropagation() inside a 2137 * Redis command implementation in order to to force the propagation of a 2138 * specific command execution into AOF / Replication. */ 2139 void forceCommandPropagation(client *c, int flags) { 2140 if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; 2141 if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF; 2142 } 2143 2144 /* Avoid that the executed command is propagated at all. This way we 2145 * are free to just propagate what we want using the alsoPropagate() 2146 * API. */ 2147 void preventCommandPropagation(client *c) { 2148 c->flags |= CLIENT_PREVENT_PROP; 2149 } 2150 2151 /* AOF specific version of preventCommandPropagation(). */ 2152 void preventCommandAOF(client *c) { 2153 c->flags |= CLIENT_PREVENT_AOF_PROP; 2154 } 2155 2156 /* Replication specific version of preventCommandPropagation(). */ 2157 void preventCommandReplication(client *c) { 2158 c->flags |= CLIENT_PREVENT_REPL_PROP; 2159 } 2160 2161 /* Call() is the core of Redis execution of a command. 2162 * 2163 * The following flags can be passed: 2164 * CMD_CALL_NONE No flags. 2165 * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. 2166 * CMD_CALL_STATS Populate command stats. 2167 * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset 2168 * or if the client flags are forcing propagation. 
2169 * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset 2170 * or if the client flags are forcing propagation. 2171 * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. 2172 * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. 2173 * 2174 * The exact propagation behavior depends on the client flags. 2175 * Specifically: 2176 * 2177 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set 2178 * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set 2179 * in the call flags, then the command is propagated even if the 2180 * dataset was not affected by the command. 2181 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP 2182 * are set, the propagation into AOF or to slaves is not performed even 2183 * if the command modified the dataset. 2184 * 2185 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF 2186 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or 2187 * slaves propagation will never occur. 2188 * 2189 * Client flags are modified by the implementation of a given command 2190 * using the following API: 2191 * 2192 * forceCommandPropagation(client *c, int flags); 2193 * preventCommandPropagation(client *c); 2194 * preventCommandAOF(client *c); 2195 * preventCommandReplication(client *c); 2196 * 2197 */ 2198 void call(client *c, int flags) { 2199 long long dirty, start, duration; 2200 int client_old_flags = c->flags; 2201 2202 /* Sent the command to clients in MONITOR mode, only if the commands are 2203 * not generated from reading an AOF. */ 2204 if (listLength(server.monitors) && 2205 !server.loading && 2206 !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) 2207 { 2208 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); 2209 } 2210 2211 /* Initialization: clear the flags that must be set by the command on 2212 * demand, and initialize the array for additional commands propagation. 
*/ 2213 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); 2214 redisOpArrayInit(&server.also_propagate); 2215 2216 /* Call the command. */ 2217 dirty = server.dirty; 2218 start = ustime(); 2219 c->cmd->proc(c); 2220 duration = ustime()-start; 2221 dirty = server.dirty-dirty; 2222 if (dirty < 0) dirty = 0; 2223 2224 /* When EVAL is called loading the AOF we don't want commands called 2225 * from Lua to go into the slowlog or to populate statistics. */ 2226 if (server.loading && c->flags & CLIENT_LUA) 2227 flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); 2228 2229 /* If the caller is Lua, we want to force the EVAL caller to propagate 2230 * the script if the command flag or client flag are forcing the 2231 * propagation. */ 2232 if (c->flags & CLIENT_LUA && server.lua_caller) { 2233 if (c->flags & CLIENT_FORCE_REPL) 2234 server.lua_caller->flags |= CLIENT_FORCE_REPL; 2235 if (c->flags & CLIENT_FORCE_AOF) 2236 server.lua_caller->flags |= CLIENT_FORCE_AOF; 2237 } 2238 2239 /* Log the command into the Slow log if needed, and populate the 2240 * per-command statistics that we show in INFO commandstats. */ 2241 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { 2242 char *latency_event = (c->cmd->flags & CMD_FAST) ? 2243 "fast-command" : "command"; 2244 latencyAddSampleIfNeeded(latency_event,duration/1000); 2245 slowlogPushEntryIfNeeded(c->argv,c->argc,duration); 2246 } 2247 if (flags & CMD_CALL_STATS) { 2248 c->lastcmd->microseconds += duration; 2249 c->lastcmd->calls++; 2250 } 2251 2252 /* Propagate the command into the AOF and replication link */ 2253 if (flags & CMD_CALL_PROPAGATE && 2254 (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) 2255 { 2256 int propagate_flags = PROPAGATE_NONE; 2257 2258 /* Check if the command operated changes in the data set. If so 2259 * set for replication / AOF propagation. 
*/ 2260 if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); 2261 2262 /* If the client forced AOF / replication of the command, set 2263 * the flags regardless of the command effects on the data set. */ 2264 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; 2265 if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; 2266 2267 /* However prevent AOF / replication propagation if the command 2268 * implementatino called preventCommandPropagation() or similar, 2269 * or if we don't have the call() flags to do so. */ 2270 if (c->flags & CLIENT_PREVENT_REPL_PROP || 2271 !(flags & CMD_CALL_PROPAGATE_REPL)) 2272 propagate_flags &= ~PROPAGATE_REPL; 2273 if (c->flags & CLIENT_PREVENT_AOF_PROP || 2274 !(flags & CMD_CALL_PROPAGATE_AOF)) 2275 propagate_flags &= ~PROPAGATE_AOF; 2276 2277 /* Call propagate() only if at least one of AOF / replication 2278 * propagation is needed. */ 2279 if (propagate_flags != PROPAGATE_NONE) 2280 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); 2281 } 2282 2283 /* Restore the old replication flags, since call() can be executed 2284 * recursively. */ 2285 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); 2286 c->flags |= client_old_flags & 2287 (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); 2288 2289 /* Handle the alsoPropagate() API to handle commands that want to propagate 2290 * multiple separated commands. Note that alsoPropagate() is not affected 2291 * by CLIENT_PREVENT_PROP flag. */ 2292 if (server.also_propagate.numops) { 2293 int j; 2294 redisOp *rop; 2295 2296 if (flags & CMD_CALL_PROPAGATE) { 2297 for (j = 0; j < server.also_propagate.numops; j++) { 2298 rop = &server.also_propagate.ops[j]; 2299 int target = rop->target; 2300 /* Whatever the command wish is, we honor the call() flags. 
*/ 2301 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; 2302 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; 2303 if (target) 2304 propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); 2305 } 2306 } 2307 redisOpArrayFree(&server.also_propagate); 2308 } 2309 server.stat_numcommands++; 2310 } 2311 2312 /* If this function gets called we already read a whole 2313 * command, arguments are in the client argv/argc fields. 2314 * processCommand() execute the command or prepare the 2315 * server for a bulk read from the client. 2316 * 2317 * If C_OK is returned the client is still alive and valid and 2318 * other operations can be performed by the caller. Otherwise 2319 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ 2320 int processCommand(client *c) { 2321 /* The QUIT command is handled separately. Normal command procs will 2322 * go through checking for replication and QUIT will cause trouble 2323 * when FORCE_REPLICATION is enabled and would be implemented in 2324 * a regular command proc. */ 2325 if (!strcasecmp(c->argv[0]->ptr,"quit")) { 2326 addReply(c,shared.ok); 2327 c->flags |= CLIENT_CLOSE_AFTER_REPLY; 2328 return C_ERR; 2329 } 2330 2331 /* Now lookup the command and check ASAP about trivial error conditions 2332 * such as wrong arity, bad command name and so forth. 
*/ 2333 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); 2334 if (!c->cmd) { 2335 flagTransaction(c); 2336 addReplyErrorFormat(c,"unknown command '%s'", 2337 (char*)c->argv[0]->ptr); 2338 return C_OK; 2339 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || 2340 (c->argc < -c->cmd->arity)) { 2341 flagTransaction(c); 2342 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", 2343 c->cmd->name); 2344 return C_OK; 2345 } 2346 2347 /* Check if the user is authenticated */ 2348 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) 2349 { 2350 flagTransaction(c); 2351 addReply(c,shared.noautherr); 2352 return C_OK; 2353 } 2354 2355 /* If cluster is enabled perform the cluster redirection here. 2356 * However we don't perform the redirection if: 2357 * 1) The sender of this command is our master. 2358 * 2) The command has no key arguments. */ 2359 if (server.cluster_enabled && 2360 !(c->flags & CLIENT_MASTER) && 2361 !(c->flags & CLIENT_LUA && 2362 server.lua_caller->flags & CLIENT_MASTER) && 2363 !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) 2364 { 2365 int hashslot; 2366 2367 if (server.cluster->state != CLUSTER_OK) { 2368 flagTransaction(c); 2369 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE); 2370 return C_OK; 2371 } else { 2372 int error_code; 2373 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); 2374 if (n == NULL || n != server.cluster->myself) { 2375 flagTransaction(c); 2376 clusterRedirectClient(c,n,hashslot,error_code); 2377 return C_OK; 2378 } 2379 } 2380 } 2381 2382 /* Handle the maxmemory directive. 2383 * 2384 * First we try to free some memory if possible (if there are volatile 2385 * keys in the dataset). If there are not the only thing we can do 2386 * is returning an error. */ 2387 if (server.maxmemory) { 2388 int retval = freeMemoryIfNeeded(); 2389 /* freeMemoryIfNeeded may flush slave output buffers. 
This may result 2390 * into a slave, that may be the active client, to be freed. */ 2391 if (server.current_client == NULL) return C_ERR; 2392 2393 /* It was impossible to free enough memory, and the command the client 2394 * is trying to execute is denied during OOM conditions? Error. */ 2395 if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) { 2396 flagTransaction(c); 2397 addReply(c, shared.oomerr); 2398 return C_OK; 2399 } 2400 } 2401 2402 /* Don't accept write commands if there are problems persisting on disk 2403 * and if this is a master instance. */ 2404 if (((server.stop_writes_on_bgsave_err && 2405 server.saveparamslen > 0 && 2406 server.lastbgsave_status == C_ERR) || 2407 server.aof_last_write_status == C_ERR) && 2408 server.masterhost == NULL && 2409 (c->cmd->flags & CMD_WRITE || 2410 c->cmd->proc == pingCommand)) 2411 { 2412 flagTransaction(c); 2413 if (server.aof_last_write_status == C_OK) 2414 addReply(c, shared.bgsaveerr); 2415 else 2416 addReplySds(c, 2417 sdscatprintf(sdsempty(), 2418 "-MISCONF Errors writing to the AOF file: %s\r\n", 2419 strerror(server.aof_last_write_errno))); 2420 return C_OK; 2421 } 2422 2423 /* Don't accept write commands if there are not enough good slaves and 2424 * user configured the min-slaves-to-write option. */ 2425 if (server.masterhost == NULL && 2426 server.repl_min_slaves_to_write && 2427 server.repl_min_slaves_max_lag && 2428 c->cmd->flags & CMD_WRITE && 2429 server.repl_good_slaves_count < server.repl_min_slaves_to_write) 2430 { 2431 flagTransaction(c); 2432 addReply(c, shared.noreplicaserr); 2433 return C_OK; 2434 } 2435 2436 /* Don't accept write commands if this is a read only slave. But 2437 * accept write commands if this is our master. 
*/ 2438 if (server.masterhost && server.repl_slave_ro && 2439 !(c->flags & CLIENT_MASTER) && 2440 c->cmd->flags & CMD_WRITE) 2441 { 2442 addReply(c, shared.roslaveerr); 2443 return C_OK; 2444 } 2445 2446 /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ 2447 if (c->flags & CLIENT_PUBSUB && 2448 c->cmd->proc != pingCommand && 2449 c->cmd->proc != subscribeCommand && 2450 c->cmd->proc != unsubscribeCommand && 2451 c->cmd->proc != psubscribeCommand && 2452 c->cmd->proc != punsubscribeCommand) { 2453 addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); 2454 return C_OK; 2455 } 2456 2457 /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and 2458 * we are a slave with a broken link with master. */ 2459 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && 2460 server.repl_serve_stale_data == 0 && 2461 !(c->cmd->flags & CMD_STALE)) 2462 { 2463 flagTransaction(c); 2464 addReply(c, shared.masterdownerr); 2465 return C_OK; 2466 } 2467 2468 /* Loading DB? Return an error if the command has not the 2469 * CMD_LOADING flag. */ 2470 if (server.loading && !(c->cmd->flags & CMD_LOADING)) { 2471 addReply(c, shared.loadingerr); 2472 return C_OK; 2473 } 2474 2475 /* Lua script too slow? Only allow a limited number of commands. 
*/ 2476 if (server.lua_timedout && 2477 c->cmd->proc != authCommand && 2478 c->cmd->proc != replconfCommand && 2479 !(c->cmd->proc == shutdownCommand && 2480 c->argc == 2 && 2481 tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && 2482 !(c->cmd->proc == scriptCommand && 2483 c->argc == 2 && 2484 tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) 2485 { 2486 flagTransaction(c); 2487 addReply(c, shared.slowscripterr); 2488 return C_OK; 2489 } 2490 2491 /* Exec the command */ 2492 if (c->flags & CLIENT_MULTI && 2493 c->cmd->proc != execCommand && c->cmd->proc != discardCommand && 2494 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) 2495 { 2496 queueMultiCommand(c); 2497 addReply(c,shared.queued); 2498 } else { 2499 call(c,CMD_CALL_FULL); 2500 c->woff = server.master_repl_offset; 2501 if (listLength(server.ready_keys)) 2502 handleClientsBlockedOnLists(); 2503 } 2504 return C_OK; 2505 } 2506 2507 /*================================== Shutdown =============================== */ 2508 2509 /* Close listening sockets. Also unlink the unix domain socket if 2510 * unlink_unix_socket is non-zero. */ 2511 void closeListeningSockets(int unlink_unix_socket) { 2512 int j; 2513 2514 for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]); 2515 if (server.sofd != -1) close(server.sofd); 2516 if (server.cluster_enabled) 2517 for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]); 2518 if (unlink_unix_socket && server.unixsocket) { 2519 serverLog(LL_NOTICE,"Removing the unix socket file."); 2520 unlink(server.unixsocket); /* don't care if this fails */ 2521 } 2522 } 2523 2524 int prepareForShutdown(int flags) { 2525 int save = flags & SHUTDOWN_SAVE; 2526 int nosave = flags & SHUTDOWN_NOSAVE; 2527 2528 serverLog(LL_WARNING,"User requested shutdown..."); 2529 2530 /* Kill all the Lua debugger forked sessions. */ 2531 ldbKillForkedSessions(); 2532 2533 /* Kill the saving child if there is a background saving in progress. 
2534 We want to avoid race conditions, for instance our saving child may 2535 overwrite the synchronous saving did by SHUTDOWN. */ 2536 if (server.rdb_child_pid != -1) { 2537 serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!"); 2538 kill(server.rdb_child_pid,SIGUSR1); 2539 rdbRemoveTempFile(server.rdb_child_pid); 2540 } 2541 2542 if (server.aof_state != AOF_OFF) { 2543 /* Kill the AOF saving child as the AOF we already have may be longer 2544 * but contains the full dataset anyway. */ 2545 if (server.aof_child_pid != -1) { 2546 /* If we have AOF enabled but haven't written the AOF yet, don't 2547 * shutdown or else the dataset will be lost. */ 2548 if (server.aof_state == AOF_WAIT_REWRITE) { 2549 serverLog(LL_WARNING, "Writing initial AOF, can't exit."); 2550 return C_ERR; 2551 } 2552 serverLog(LL_WARNING, 2553 "There is a child rewriting the AOF. Killing it!"); 2554 kill(server.aof_child_pid,SIGUSR1); 2555 } 2556 /* Append only file: fsync() the AOF and exit */ 2557 serverLog(LL_NOTICE,"Calling fsync() on the AOF file."); 2558 aof_fsync(server.aof_fd); 2559 } 2560 2561 /* Create a new RDB file before exiting. */ 2562 if ((server.saveparamslen > 0 && !nosave) || save) { 2563 serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting."); 2564 /* Snapshotting. Perform a SYNC SAVE and exit */ 2565 if (rdbSave(server.rdb_filename) != C_OK) { 2566 /* Ooops.. error saving! The best we can do is to continue 2567 * operating. Note that if there was a background saving process, 2568 * in the next cron() Redis will be notified that the background 2569 * saving aborted, handling special stuff like slaves pending for 2570 * synchronization... */ 2571 serverLog(LL_WARNING,"Error trying to save the DB, can't exit."); 2572 return C_ERR; 2573 } 2574 } 2575 2576 /* Remove the pid file if possible and needed. 
*/ 2577 if (server.daemonize || server.pidfile) { 2578 serverLog(LL_NOTICE,"Removing the pid file."); 2579 unlink(server.pidfile); 2580 } 2581 2582 /* Best effort flush of slave output buffers, so that we hopefully 2583 * send them pending writes. */ 2584 flushSlavesOutputBuffers(); 2585 2586 /* Close the listening sockets. Apparently this allows faster restarts. */ 2587 closeListeningSockets(1); 2588 serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", 2589 server.sentinel_mode ? "Sentinel" : "Redis"); 2590 return C_OK; 2591 } 2592 2593 /*================================== Commands =============================== */ 2594 2595 /* Return zero if strings are the same, non-zero if they are not. 2596 * The comparison is performed in a way that prevents an attacker to obtain 2597 * information about the nature of the strings just monitoring the execution 2598 * time of the function. 2599 * 2600 * Note that limiting the comparison length to strings up to 512 bytes we 2601 * can avoid leaking any information about the password length and any 2602 * possible branch misprediction related leak. 2603 */ 2604 int time_independent_strcmp(char *a, char *b) { 2605 char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN]; 2606 /* The above two strlen perform len(a) + len(b) operations where either 2607 * a or b are fixed (our password) length, and the difference is only 2608 * relative to the length of the user provided string, so no information 2609 * leak is possible in the following two lines of code. */ 2610 unsigned int alen = strlen(a); 2611 unsigned int blen = strlen(b); 2612 unsigned int j; 2613 int diff = 0; 2614 2615 /* We can't compare strings longer than our static buffers. 2616 * Note that this will never pass the first test in practical circumstances 2617 * so there is no info leak. */ 2618 if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1; 2619 2620 memset(bufa,0,sizeof(bufa)); /* Constant time. 
*/ 2621 memset(bufb,0,sizeof(bufb)); /* Constant time. */ 2622 /* Again the time of the following two copies is proportional to 2623 * len(a) + len(b) so no info is leaked. */ 2624 memcpy(bufa,a,alen); 2625 memcpy(bufb,b,blen); 2626 2627 /* Always compare all the chars in the two buffers without 2628 * conditional expressions. */ 2629 for (j = 0; j < sizeof(bufa); j++) { 2630 diff |= (bufa[j] ^ bufb[j]); 2631 } 2632 /* Length must be equal as well. */ 2633 diff |= alen ^ blen; 2634 return diff; /* If zero strings are the same. */ 2635 } 2636 2637 void authCommand(client *c) { 2638 if (!server.requirepass) { 2639 addReplyError(c,"Client sent AUTH, but no password is set"); 2640 } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) { 2641 c->authenticated = 1; 2642 addReply(c,shared.ok); 2643 } else { 2644 c->authenticated = 0; 2645 addReplyError(c,"invalid password"); 2646 } 2647 } 2648 2649 /* The PING command. It works in a different way if the client is in 2650 * in Pub/Sub mode. */ 2651 void pingCommand(client *c) { 2652 /* The command takes zero or one arguments. */ 2653 if (c->argc > 2) { 2654 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", 2655 c->cmd->name); 2656 return; 2657 } 2658 2659 if (c->flags & CLIENT_PUBSUB) { 2660 addReply(c,shared.mbulkhdr[2]); 2661 addReplyBulkCBuffer(c,"pong",4); 2662 if (c->argc == 1) 2663 addReplyBulkCBuffer(c,"",0); 2664 else 2665 addReplyBulk(c,c->argv[1]); 2666 } else { 2667 if (c->argc == 1) 2668 addReply(c,shared.pong); 2669 else 2670 addReplyBulk(c,c->argv[1]); 2671 } 2672 } 2673 2674 void echoCommand(client *c) { 2675 addReplyBulk(c,c->argv[1]); 2676 } 2677 2678 void timeCommand(client *c) { 2679 struct timeval tv; 2680 2681 /* gettimeofday() can only fail if &tv is a bad address so we 2682 * don't check for errors. 
*/ 2683 gettimeofday(&tv,NULL); 2684 addReplyMultiBulkLen(c,2); 2685 addReplyBulkLongLong(c,tv.tv_sec); 2686 addReplyBulkLongLong(c,tv.tv_usec); 2687 } 2688 2689 /* Helper function for addReplyCommand() to output flags. */ 2690 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) { 2691 if (cmd->flags & f) { 2692 addReplyStatus(c, reply); 2693 return 1; 2694 } 2695 return 0; 2696 } 2697 2698 /* Output the representation of a Redis command. Used by the COMMAND command. */ 2699 void addReplyCommand(client *c, struct redisCommand *cmd) { 2700 if (!cmd) { 2701 addReply(c, shared.nullbulk); 2702 } else { 2703 /* We are adding: command name, arg count, flags, first, last, offset */ 2704 addReplyMultiBulkLen(c, 6); 2705 addReplyBulkCString(c, cmd->name); 2706 addReplyLongLong(c, cmd->arity); 2707 2708 int flagcount = 0; 2709 void *flaglen = addDeferredMultiBulkLength(c); 2710 flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write"); 2711 flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly"); 2712 flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom"); 2713 flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin"); 2714 flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub"); 2715 flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript"); 2716 flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random"); 2717 flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script"); 2718 flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading"); 2719 flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale"); 2720 flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor"); 2721 flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); 2722 flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); 2723 if (cmd->getkeys_proc) { 2724 addReplyStatus(c, "movablekeys"); 2725 flagcount += 1; 2726 } 2727 setDeferredMultiBulkLength(c, flaglen, flagcount); 2728 2729 addReplyLongLong(c, 
cmd->firstkey); 2730 addReplyLongLong(c, cmd->lastkey); 2731 addReplyLongLong(c, cmd->keystep); 2732 } 2733 } 2734 2735 /* COMMAND <subcommand> <args> */ 2736 void commandCommand(client *c) { 2737 dictIterator *di; 2738 dictEntry *de; 2739 2740 if (c->argc == 1) { 2741 addReplyMultiBulkLen(c, dictSize(server.commands)); 2742 di = dictGetIterator(server.commands); 2743 while ((de = dictNext(di)) != NULL) { 2744 addReplyCommand(c, dictGetVal(de)); 2745 } 2746 dictReleaseIterator(di); 2747 } else if (!strcasecmp(c->argv[1]->ptr, "info")) { 2748 int i; 2749 addReplyMultiBulkLen(c, c->argc-2); 2750 for (i = 2; i < c->argc; i++) { 2751 addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr)); 2752 } 2753 } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) { 2754 addReplyLongLong(c, dictSize(server.commands)); 2755 } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) { 2756 struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr); 2757 int *keys, numkeys, j; 2758 2759 if (!cmd) { 2760 addReplyErrorFormat(c,"Invalid command specified"); 2761 return; 2762 } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) || 2763 ((c->argc-2) < -cmd->arity)) 2764 { 2765 addReplyError(c,"Invalid number of arguments specified for command"); 2766 return; 2767 } 2768 2769 keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys); 2770 addReplyMultiBulkLen(c,numkeys); 2771 for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]); 2772 getKeysFreeResult(keys); 2773 } else { 2774 addReplyError(c, "Unknown subcommand or wrong number of arguments."); 2775 return; 2776 } 2777 } 2778 2779 /* Convert an amount of bytes into a human readable string in the form 2780 * of 100B, 2G, 100M, 4K, and so forth. 
*/
/* 's' receives the NUL terminated result and must be large enough for it:
 * the longest output is a 20 digits byte count followed by 'B'.
 *
 * Values below 1024 are printed as plain bytes. Larger values are printed
 * with two decimals in the biggest unit (K, M, G, T, P) for which the value
 * is below 1024 units. Values of 1024 P or more are printed as plain bytes
 * again. */
void bytesToHuman(char *s, unsigned long long n) {
    static const char suffixes[] = "KMGTP";
    unsigned long long unit = 1024; /* Bytes in one unit of suffixes[u]. */
    size_t u;

    if (n < unit) {
        /* Bytes */
        sprintf(s,"%lluB",n);
        return;
    }

    for (u = 0; suffixes[u] != '\0'; u++) {
        if (n < unit*1024) {
            sprintf(s,"%.2f%c",(double)n/unit,suffixes[u]);
            return;
        }
        unit *= 1024;
    }

    /* Let's hope we never need this */
    sprintf(s,"%lluB",n);
}

/* Create the string returned by the INFO command. This is decoupled
 * from the INFO command itself as we need to report the same information
 * on memory corruption problems.
*/ 2812 sds genRedisInfoString(char *section) { 2813 sds info = sdsempty(); 2814 time_t uptime = server.unixtime-server.stat_starttime; 2815 int j, numcommands; 2816 struct rusage self_ru, c_ru; 2817 unsigned long lol, bib; 2818 int allsections = 0, defsections = 0; 2819 int sections = 0; 2820 2821 if (section == NULL) section = "default"; 2822 allsections = strcasecmp(section,"all") == 0; 2823 defsections = strcasecmp(section,"default") == 0; 2824 2825 getrusage(RUSAGE_SELF, &self_ru); 2826 getrusage(RUSAGE_CHILDREN, &c_ru); 2827 getClientsMaxBuffers(&lol,&bib); 2828 2829 /* Server */ 2830 if (allsections || defsections || !strcasecmp(section,"server")) { 2831 static int call_uname = 1; 2832 static struct utsname name; 2833 char *mode; 2834 2835 if (server.cluster_enabled) mode = "cluster"; 2836 else if (server.sentinel_mode) mode = "sentinel"; 2837 else mode = "standalone"; 2838 2839 if (sections++) info = sdscat(info,"\r\n"); 2840 2841 if (call_uname) { 2842 /* Uname can be slow and is always the same output. Cache it. 
*/ 2843 uname(&name); 2844 call_uname = 0; 2845 } 2846 2847 info = sdscatprintf(info, 2848 "# Server\r\n" 2849 "redis_version:%s\r\n" 2850 "redis_git_sha1:%s\r\n" 2851 "redis_git_dirty:%d\r\n" 2852 "redis_build_id:%llx\r\n" 2853 "redis_mode:%s\r\n" 2854 "os:%s %s %s\r\n" 2855 "arch_bits:%d\r\n" 2856 "multiplexing_api:%s\r\n" 2857 "gcc_version:%d.%d.%d\r\n" 2858 "process_id:%ld\r\n" 2859 "run_id:%s\r\n" 2860 "tcp_port:%d\r\n" 2861 "uptime_in_seconds:%jd\r\n" 2862 "uptime_in_days:%jd\r\n" 2863 "hz:%d\r\n" 2864 "lru_clock:%ld\r\n" 2865 "executable:%s\r\n" 2866 "config_file:%s\r\n", 2867 REDIS_VERSION, 2868 redisGitSHA1(), 2869 strtol(redisGitDirty(),NULL,10) > 0, 2870 (unsigned long long) redisBuildId(), 2871 mode, 2872 name.sysname, name.release, name.machine, 2873 server.arch_bits, 2874 aeGetApiName(), 2875 #ifdef __GNUC__ 2876 __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__, 2877 #else 2878 0,0,0, 2879 #endif 2880 (long) getpid(), 2881 server.runid, 2882 server.port, 2883 (intmax_t)uptime, 2884 (intmax_t)(uptime/(3600*24)), 2885 server.hz, 2886 (unsigned long) server.lruclock, 2887 server.executable ? server.executable : "", 2888 server.configfile ? 
server.configfile : ""); 2889 } 2890 2891 /* Clients */ 2892 if (allsections || defsections || !strcasecmp(section,"clients")) { 2893 if (sections++) info = sdscat(info,"\r\n"); 2894 info = sdscatprintf(info, 2895 "# Clients\r\n" 2896 "connected_clients:%lu\r\n" 2897 "client_longest_output_list:%lu\r\n" 2898 "client_biggest_input_buf:%lu\r\n" 2899 "blocked_clients:%d\r\n", 2900 listLength(server.clients)-listLength(server.slaves), 2901 lol, bib, 2902 server.bpop_blocked_clients); 2903 } 2904 2905 /* Memory */ 2906 if (allsections || defsections || !strcasecmp(section,"memory")) { 2907 char hmem[64]; 2908 char peak_hmem[64]; 2909 char total_system_hmem[64]; 2910 char used_memory_lua_hmem[64]; 2911 char used_memory_rss_hmem[64]; 2912 char maxmemory_hmem[64]; 2913 size_t zmalloc_used = zmalloc_used_memory(); 2914 size_t total_system_mem = server.system_memory_size; 2915 const char *evict_policy = maxmemoryToString(); 2916 long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024; 2917 2918 /* Peak memory is updated from time to time by serverCron() so it 2919 * may happen that the instantaneous value is slightly bigger than 2920 * the peak value. This may confuse users, so we update the peak 2921 * if found smaller than the current memory usage. 
*/ 2922 if (zmalloc_used > server.stat_peak_memory) 2923 server.stat_peak_memory = zmalloc_used; 2924 2925 bytesToHuman(hmem,zmalloc_used); 2926 bytesToHuman(peak_hmem,server.stat_peak_memory); 2927 bytesToHuman(total_system_hmem,total_system_mem); 2928 bytesToHuman(used_memory_lua_hmem,memory_lua); 2929 bytesToHuman(used_memory_rss_hmem,server.resident_set_size); 2930 bytesToHuman(maxmemory_hmem,server.maxmemory); 2931 2932 if (sections++) info = sdscat(info,"\r\n"); 2933 info = sdscatprintf(info, 2934 "# Memory\r\n" 2935 "used_memory:%zu\r\n" 2936 "used_memory_human:%s\r\n" 2937 "used_memory_rss:%zu\r\n" 2938 "used_memory_rss_human:%s\r\n" 2939 "used_memory_peak:%zu\r\n" 2940 "used_memory_peak_human:%s\r\n" 2941 "total_system_memory:%lu\r\n" 2942 "total_system_memory_human:%s\r\n" 2943 "used_memory_lua:%lld\r\n" 2944 "used_memory_lua_human:%s\r\n" 2945 "maxmemory:%lld\r\n" 2946 "maxmemory_human:%s\r\n" 2947 "maxmemory_policy:%s\r\n" 2948 "mem_fragmentation_ratio:%.2f\r\n" 2949 "mem_allocator:%s\r\n", 2950 zmalloc_used, 2951 hmem, 2952 server.resident_set_size, 2953 used_memory_rss_hmem, 2954 server.stat_peak_memory, 2955 peak_hmem, 2956 (unsigned long)total_system_mem, 2957 total_system_hmem, 2958 memory_lua, 2959 used_memory_lua_hmem, 2960 server.maxmemory, 2961 maxmemory_hmem, 2962 evict_policy, 2963 zmalloc_get_fragmentation_ratio(server.resident_set_size), 2964 ZMALLOC_LIB 2965 ); 2966 } 2967 2968 /* Persistence */ 2969 if (allsections || defsections || !strcasecmp(section,"persistence")) { 2970 if (sections++) info = sdscat(info,"\r\n"); 2971 info = sdscatprintf(info, 2972 "# Persistence\r\n" 2973 "loading:%d\r\n" 2974 "rdb_changes_since_last_save:%lld\r\n" 2975 "rdb_bgsave_in_progress:%d\r\n" 2976 "rdb_last_save_time:%jd\r\n" 2977 "rdb_last_bgsave_status:%s\r\n" 2978 "rdb_last_bgsave_time_sec:%jd\r\n" 2979 "rdb_current_bgsave_time_sec:%jd\r\n" 2980 "aof_enabled:%d\r\n" 2981 "aof_rewrite_in_progress:%d\r\n" 2982 "aof_rewrite_scheduled:%d\r\n" 2983 
"aof_last_rewrite_time_sec:%jd\r\n" 2984 "aof_current_rewrite_time_sec:%jd\r\n" 2985 "aof_last_bgrewrite_status:%s\r\n" 2986 "aof_last_write_status:%s\r\n", 2987 server.loading, 2988 server.dirty, 2989 server.rdb_child_pid != -1, 2990 (intmax_t)server.lastsave, 2991 (server.lastbgsave_status == C_OK) ? "ok" : "err", 2992 (intmax_t)server.rdb_save_time_last, 2993 (intmax_t)((server.rdb_child_pid == -1) ? 2994 -1 : time(NULL)-server.rdb_save_time_start), 2995 server.aof_state != AOF_OFF, 2996 server.aof_child_pid != -1, 2997 server.aof_rewrite_scheduled, 2998 (intmax_t)server.aof_rewrite_time_last, 2999 (intmax_t)((server.aof_child_pid == -1) ? 3000 -1 : time(NULL)-server.aof_rewrite_time_start), 3001 (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", 3002 (server.aof_last_write_status == C_OK) ? "ok" : "err"); 3003 3004 if (server.aof_state != AOF_OFF) { 3005 info = sdscatprintf(info, 3006 "aof_current_size:%lld\r\n" 3007 "aof_base_size:%lld\r\n" 3008 "aof_pending_rewrite:%d\r\n" 3009 "aof_buffer_length:%zu\r\n" 3010 "aof_rewrite_buffer_length:%lu\r\n" 3011 "aof_pending_bio_fsync:%llu\r\n" 3012 "aof_delayed_fsync:%lu\r\n", 3013 (long long) server.aof_current_size, 3014 (long long) server.aof_rewrite_base_size, 3015 server.aof_rewrite_scheduled, 3016 sdslen(server.aof_buf), 3017 aofRewriteBufferSize(), 3018 bioPendingJobsOfType(BIO_AOF_FSYNC), 3019 server.aof_delayed_fsync); 3020 } 3021 3022 if (server.loading) { 3023 double perc; 3024 time_t eta, elapsed; 3025 off_t remaining_bytes = server.loading_total_bytes- 3026 server.loading_loaded_bytes; 3027 3028 perc = ((double)server.loading_loaded_bytes / 3029 (server.loading_total_bytes+1)) * 100; 3030 3031 elapsed = time(NULL)-server.loading_start_time; 3032 if (elapsed == 0) { 3033 eta = 1; /* A fake 1 second figure if we don't have 3034 enough info */ 3035 } else { 3036 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1); 3037 } 3038 3039 info = sdscatprintf(info, 3040 "loading_start_time:%jd\r\n" 
3041 "loading_total_bytes:%llu\r\n" 3042 "loading_loaded_bytes:%llu\r\n" 3043 "loading_loaded_perc:%.2f\r\n" 3044 "loading_eta_seconds:%jd\r\n", 3045 (intmax_t) server.loading_start_time, 3046 (unsigned long long) server.loading_total_bytes, 3047 (unsigned long long) server.loading_loaded_bytes, 3048 perc, 3049 (intmax_t)eta 3050 ); 3051 } 3052 } 3053 3054 /* Stats */ 3055 if (allsections || defsections || !strcasecmp(section,"stats")) { 3056 if (sections++) info = sdscat(info,"\r\n"); 3057 info = sdscatprintf(info, 3058 "# Stats\r\n" 3059 "total_connections_received:%lld\r\n" 3060 "total_commands_processed:%lld\r\n" 3061 "instantaneous_ops_per_sec:%lld\r\n" 3062 "total_net_input_bytes:%lld\r\n" 3063 "total_net_output_bytes:%lld\r\n" 3064 "instantaneous_input_kbps:%.2f\r\n" 3065 "instantaneous_output_kbps:%.2f\r\n" 3066 "rejected_connections:%lld\r\n" 3067 "sync_full:%lld\r\n" 3068 "sync_partial_ok:%lld\r\n" 3069 "sync_partial_err:%lld\r\n" 3070 "expired_keys:%lld\r\n" 3071 "evicted_keys:%lld\r\n" 3072 "keyspace_hits:%lld\r\n" 3073 "keyspace_misses:%lld\r\n" 3074 "pubsub_channels:%ld\r\n" 3075 "pubsub_patterns:%lu\r\n" 3076 "latest_fork_usec:%lld\r\n" 3077 "migrate_cached_sockets:%ld\r\n", 3078 server.stat_numconnections, 3079 server.stat_numcommands, 3080 getInstantaneousMetric(STATS_METRIC_COMMAND), 3081 server.stat_net_input_bytes, 3082 server.stat_net_output_bytes, 3083 (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, 3084 (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, 3085 server.stat_rejected_conn, 3086 server.stat_sync_full, 3087 server.stat_sync_partial_ok, 3088 server.stat_sync_partial_err, 3089 server.stat_expiredkeys, 3090 server.stat_evictedkeys, 3091 server.stat_keyspace_hits, 3092 server.stat_keyspace_misses, 3093 dictSize(server.pubsub_channels), 3094 listLength(server.pubsub_patterns), 3095 server.stat_fork_time, 3096 dictSize(server.migrate_cached_sockets)); 3097 } 3098 3099 /* Replication */ 3100 if (allsections || 
defsections || !strcasecmp(section,"replication")) { 3101 if (sections++) info = sdscat(info,"\r\n"); 3102 info = sdscatprintf(info, 3103 "# Replication\r\n" 3104 "role:%s\r\n", 3105 server.masterhost == NULL ? "master" : "slave"); 3106 if (server.masterhost) { 3107 long long slave_repl_offset = 1; 3108 3109 if (server.master) 3110 slave_repl_offset = server.master->reploff; 3111 else if (server.cached_master) 3112 slave_repl_offset = server.cached_master->reploff; 3113 3114 info = sdscatprintf(info, 3115 "master_host:%s\r\n" 3116 "master_port:%d\r\n" 3117 "master_link_status:%s\r\n" 3118 "master_last_io_seconds_ago:%d\r\n" 3119 "master_sync_in_progress:%d\r\n" 3120 "slave_repl_offset:%lld\r\n" 3121 ,server.masterhost, 3122 server.masterport, 3123 (server.repl_state == REPL_STATE_CONNECTED) ? 3124 "up" : "down", 3125 server.master ? 3126 ((int)(server.unixtime-server.master->lastinteraction)) : -1, 3127 server.repl_state == REPL_STATE_TRANSFER, 3128 slave_repl_offset 3129 ); 3130 3131 if (server.repl_state == REPL_STATE_TRANSFER) { 3132 info = sdscatprintf(info, 3133 "master_sync_left_bytes:%lld\r\n" 3134 "master_sync_last_io_seconds_ago:%d\r\n" 3135 , (long long) 3136 (server.repl_transfer_size - server.repl_transfer_read), 3137 (int)(server.unixtime-server.repl_transfer_lastio) 3138 ); 3139 } 3140 3141 if (server.repl_state != REPL_STATE_CONNECTED) { 3142 info = sdscatprintf(info, 3143 "master_link_down_since_seconds:%jd\r\n", 3144 (intmax_t)server.unixtime-server.repl_down_since); 3145 } 3146 info = sdscatprintf(info, 3147 "slave_priority:%d\r\n" 3148 "slave_read_only:%d\r\n", 3149 server.slave_priority, 3150 server.repl_slave_ro); 3151 } 3152 3153 info = sdscatprintf(info, 3154 "connected_slaves:%lu\r\n", 3155 listLength(server.slaves)); 3156 3157 /* If min-slaves-to-write is active, write the number of slaves 3158 * currently considered 'good'. 
*/ 3159 if (server.repl_min_slaves_to_write && 3160 server.repl_min_slaves_max_lag) { 3161 info = sdscatprintf(info, 3162 "min_slaves_good_slaves:%d\r\n", 3163 server.repl_good_slaves_count); 3164 } 3165 3166 if (listLength(server.slaves)) { 3167 int slaveid = 0; 3168 listNode *ln; 3169 listIter li; 3170 3171 listRewind(server.slaves,&li); 3172 while((ln = listNext(&li))) { 3173 client *slave = listNodeValue(ln); 3174 char *state = NULL; 3175 char ip[NET_IP_STR_LEN]; 3176 int port; 3177 long lag = 0; 3178 3179 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue; 3180 switch(slave->replstate) { 3181 case SLAVE_STATE_WAIT_BGSAVE_START: 3182 case SLAVE_STATE_WAIT_BGSAVE_END: 3183 state = "wait_bgsave"; 3184 break; 3185 case SLAVE_STATE_SEND_BULK: 3186 state = "send_bulk"; 3187 break; 3188 case SLAVE_STATE_ONLINE: 3189 state = "online"; 3190 break; 3191 } 3192 if (state == NULL) continue; 3193 if (slave->replstate == SLAVE_STATE_ONLINE) 3194 lag = time(NULL) - slave->repl_ack_time; 3195 3196 info = sdscatprintf(info, 3197 "slave%d:ip=%s,port=%d,state=%s," 3198 "offset=%lld,lag=%ld\r\n", 3199 slaveid,ip,slave->slave_listening_port,state, 3200 slave->repl_ack_off, lag); 3201 slaveid++; 3202 } 3203 } 3204 info = sdscatprintf(info, 3205 "master_repl_offset:%lld\r\n" 3206 "repl_backlog_active:%d\r\n" 3207 "repl_backlog_size:%lld\r\n" 3208 "repl_backlog_first_byte_offset:%lld\r\n" 3209 "repl_backlog_histlen:%lld\r\n", 3210 server.master_repl_offset, 3211 server.repl_backlog != NULL, 3212 server.repl_backlog_size, 3213 server.repl_backlog_off, 3214 server.repl_backlog_histlen); 3215 } 3216 3217 /* CPU */ 3218 if (allsections || defsections || !strcasecmp(section,"cpu")) { 3219 if (sections++) info = sdscat(info,"\r\n"); 3220 info = sdscatprintf(info, 3221 "# CPU\r\n" 3222 "used_cpu_sys:%.2f\r\n" 3223 "used_cpu_user:%.2f\r\n" 3224 "used_cpu_sys_children:%.2f\r\n" 3225 "used_cpu_user_children:%.2f\r\n", 3226 
(float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000, 3227 (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000, 3228 (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, 3229 (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000); 3230 } 3231 3232 /* cmdtime */ 3233 if (allsections || !strcasecmp(section,"commandstats")) { 3234 if (sections++) info = sdscat(info,"\r\n"); 3235 info = sdscatprintf(info, "# Commandstats\r\n"); 3236 numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 3237 for (j = 0; j < numcommands; j++) { 3238 struct redisCommand *c = redisCommandTable+j; 3239 3240 if (!c->calls) continue; 3241 info = sdscatprintf(info, 3242 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n", 3243 c->name, c->calls, c->microseconds, 3244 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls)); 3245 } 3246 } 3247 3248 /* Cluster */ 3249 if (allsections || defsections || !strcasecmp(section,"cluster")) { 3250 if (sections++) info = sdscat(info,"\r\n"); 3251 info = sdscatprintf(info, 3252 "# Cluster\r\n" 3253 "cluster_enabled:%d\r\n", 3254 server.cluster_enabled); 3255 } 3256 3257 /* Key space */ 3258 if (allsections || defsections || !strcasecmp(section,"keyspace")) { 3259 if (sections++) info = sdscat(info,"\r\n"); 3260 info = sdscatprintf(info, "# Keyspace\r\n"); 3261 for (j = 0; j < server.dbnum; j++) { 3262 long long keys, vkeys; 3263 3264 keys = dictSize(server.db[j].dict); 3265 vkeys = dictSize(server.db[j].expires); 3266 if (keys || vkeys) { 3267 info = sdscatprintf(info, 3268 "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", 3269 j, keys, vkeys, server.db[j].avg_ttl); 3270 } 3271 } 3272 } 3273 return info; 3274 } 3275 3276 void infoCommand(client *c) { 3277 char *section = c->argc == 2 ? 
c->argv[1]->ptr : "default"; 3278 3279 if (c->argc > 2) { 3280 addReply(c,shared.syntaxerr); 3281 return; 3282 } 3283 addReplyBulkSds(c, genRedisInfoString(section)); 3284 } 3285 3286 void monitorCommand(client *c) { 3287 /* ignore MONITOR if already slave or in monitor mode */ 3288 if (c->flags & CLIENT_SLAVE) return; 3289 3290 c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); 3291 listAddNodeTail(server.monitors,c); 3292 addReply(c,shared.ok); 3293 } 3294 3295 /* ============================ Maxmemory directive ======================== */ 3296 3297 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config 3298 * file to limit the max memory used by the server, before processing a 3299 * command. 3300 * 3301 * The goal of the function is to free enough memory to keep Redis under the 3302 * configured memory limit. 3303 * 3304 * The function starts calculating how many bytes should be freed to keep 3305 * Redis under the limit, and enters a loop selecting the best keys to 3306 * evict accordingly to the configured policy. 3307 * 3308 * If all the bytes needed to return back under the limit were freed the 3309 * function returns C_OK, otherwise C_ERR is returned, and the caller 3310 * should block the execution of commands that will result in more memory 3311 * used by the server. 3312 * 3313 * ------------------------------------------------------------------------ 3314 * 3315 * LRU approximation algorithm 3316 * 3317 * Redis uses an approximation of the LRU algorithm that runs in constant 3318 * memory. Every time there is a key to expire, we sample N keys (with 3319 * N very small, usually in around 5) to populate a pool of best keys to 3320 * evict of M keys (the pool size is defined by MAXMEMORY_EVICTION_POOL_SIZE). 3321 * 3322 * The N keys sampled are added in the pool of good keys to expire (the one 3323 * with an old access time) if they are better than one of the current keys 3324 * in the pool. 
 *
 * After the pool is populated, the best key we have in the pool is expired.
 * However note that we don't remove keys from the pool when they are deleted
 * so the pool may contain keys that no longer exist.
 *
 * When we try to evict a key, and all the entries in the pool don't exist
 * we populate it again. This time we'll be sure that the pool has at least
 * one key that can be evicted, if there is at least one key that can be
 * evicted in the whole database. */

/* Create a new eviction pool: an array of MAXMEMORY_EVICTION_POOL_SIZE
 * entries allocated with zmalloc(), every entry starting empty (NULL key,
 * idle time 0). */
struct evictionPoolEntry *evictionPoolAlloc(void) {
    struct evictionPoolEntry *ep;
    int j;

    ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
    for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
        ep[j].idle = 0;
        ep[j].key = NULL;
    }
    return ep;
}

/* This is a helper function for freeMemoryIfNeeded(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time greater than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys in place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right.
 *
 * 'sampledict' is the dictionary the keys are sampled from, 'keydict' the
 * one used to fetch the value object of each sampled key, 'pool' the pool
 * to update. The pool stores its own copy of every key name. */

#define EVICTION_SAMPLES_ARRAY_SIZE 16
void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
    dictEntry **samples;

    /* Try to use a static buffer: this function is a big hit...
     * Note: it was actually measured that this helps. */
    if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
        samples = _samples;
    } else {
        samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
    }

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (sampledict != keydict) de = dictFind(keydict, key);
        o = dictGetVal(de);
        idle = estimateObjectIdleTime(o);

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket whose idle time is not smaller than our idle time. */
        k = 0;
        while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * that is not smaller than the element to insert, or is one
             * past the end of the pool. */
            if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
            }
        }
        /* The pool owns a private copy of the key name. */
        pool[k].key = sdsdup(key);
        pool[k].idle = idle;
    }
    /* Release the samples array only if it was heap allocated above. */
    if (samples != _samples) zfree(samples);
}

int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);
    mstime_t latency, eviction_latency;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = zmalloc_used_memory();
    if (slaves) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = listNodeValue(ln);
            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
            if (obuf_bytes > mem_used)
                mem_used = 0;
            else
                mem_used -= obuf_bytes;
        }
    }
    if (server.aof_state != AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        return C_ERR; /* We need to free memory, but policy forbids. */

    /* Compute how much memory we need to free.
*/ 3457 mem_tofree = mem_used - server.maxmemory; 3458 mem_freed = 0; 3459 latencyStartMonitor(latency); 3460 while (mem_freed < mem_tofree) { 3461 int j, k, keys_freed = 0; 3462 3463 for (j = 0; j < server.dbnum; j++) { 3464 long bestval = 0; /* just to prevent warning */ 3465 sds bestkey = NULL; 3466 dictEntry *de; 3467 redisDb *db = server.db+j; 3468 dict *dict; 3469 3470 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU || 3471 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) 3472 { 3473 dict = server.db[j].dict; 3474 } else { 3475 dict = server.db[j].expires; 3476 } 3477 if (dictSize(dict) == 0) continue; 3478 3479 /* volatile-random and allkeys-random policy */ 3480 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || 3481 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) 3482 { 3483 de = dictGetRandomKey(dict); 3484 bestkey = dictGetKey(de); 3485 } 3486 3487 /* volatile-lru and allkeys-lru policy */ 3488 else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU || 3489 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU) 3490 { 3491 struct evictionPoolEntry *pool = db->eviction_pool; 3492 3493 while(bestkey == NULL) { 3494 evictionPoolPopulate(dict, db->dict, db->eviction_pool); 3495 /* Go backward from best to worst element to evict. */ 3496 for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) { 3497 if (pool[k].key == NULL) continue; 3498 de = dictFind(dict,pool[k].key); 3499 3500 /* Remove the entry from the pool. */ 3501 sdsfree(pool[k].key); 3502 /* Shift all elements on its right to left. */ 3503 memmove(pool+k,pool+k+1, 3504 sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1)); 3505 /* Clear the element on the right which is empty 3506 * since we shifted one position to the left. */ 3507 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL; 3508 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0; 3509 3510 /* If the key exists, is our pick. Otherwise it is 3511 * a ghost and we need to try the next element. 
*/ 3512 if (de) { 3513 bestkey = dictGetKey(de); 3514 break; 3515 } else { 3516 /* Ghost... */ 3517 continue; 3518 } 3519 } 3520 } 3521 } 3522 3523 /* volatile-ttl */ 3524 else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { 3525 for (k = 0; k < server.maxmemory_samples; k++) { 3526 sds thiskey; 3527 long thisval; 3528 3529 de = dictGetRandomKey(dict); 3530 thiskey = dictGetKey(de); 3531 thisval = (long) dictGetVal(de); 3532 3533 /* Expire sooner (minor expire unix timestamp) is better 3534 * candidate for deletion */ 3535 if (bestkey == NULL || thisval < bestval) { 3536 bestkey = thiskey; 3537 bestval = thisval; 3538 } 3539 } 3540 } 3541 3542 /* Finally remove the selected key. */ 3543 if (bestkey) { 3544 long long delta; 3545 3546 robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); 3547 propagateExpire(db,keyobj); 3548 /* We compute the amount of memory freed by dbDelete() alone. 3549 * It is possible that actually the memory needed to propagate 3550 * the DEL in AOF and replication link is greater than the one 3551 * we are freeing removing the key, but we can't account for 3552 * that otherwise we would never exit the loop. 3553 * 3554 * AOF and Output buffer memory will be freed eventually so 3555 * we only care about memory used by the key space. 
*/ 3556 delta = (long long) zmalloc_used_memory(); 3557 latencyStartMonitor(eviction_latency); 3558 dbDelete(db,keyobj); 3559 latencyEndMonitor(eviction_latency); 3560 latencyAddSampleIfNeeded("eviction-del",eviction_latency); 3561 latencyRemoveNestedEvent(latency,eviction_latency); 3562 delta -= (long long) zmalloc_used_memory(); 3563 mem_freed += delta; 3564 server.stat_evictedkeys++; 3565 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", 3566 keyobj, db->id); 3567 decrRefCount(keyobj); 3568 keys_freed++; 3569 3570 /* When the memory to free starts to be big enough, we may 3571 * start spending so much time here that is impossible to 3572 * deliver data to the slaves fast enough, so we force the 3573 * transmission here inside the loop. */ 3574 if (slaves) flushSlavesOutputBuffers(); 3575 } 3576 } 3577 if (!keys_freed) { 3578 latencyEndMonitor(latency); 3579 latencyAddSampleIfNeeded("eviction-cycle",latency); 3580 return C_ERR; /* nothing to free... */ 3581 } 3582 } 3583 latencyEndMonitor(latency); 3584 latencyAddSampleIfNeeded("eviction-cycle",latency); 3585 return C_OK; 3586 } 3587 3588 /* =================================== Main! ================================ */ 3589 3590 #ifdef __linux__ 3591 int linuxOvercommitMemoryValue(void) { 3592 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r"); 3593 char buf[64]; 3594 3595 if (!fp) return -1; 3596 if (fgets(buf,64,fp) == NULL) { 3597 fclose(fp); 3598 return -1; 3599 } 3600 fclose(fp); 3601 3602 return atoi(buf); 3603 } 3604 3605 void linuxMemoryWarnings(void) { 3606 if (linuxOvercommitMemoryValue() == 0) { 3607 serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); 3608 } 3609 if (THPIsEnabled()) { 3610 serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled."); 3611 } 3612 } 3613 #endif /* __linux__ */ 3614 3615 void createPidFile(void) { 3616 /* If pidfile requested, but no pidfile defined, use 3617 * default pidfile path */ 3618 if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE); 3619 3620 /* Try to write the pid file in a best-effort way. */ 3621 FILE *fp = fopen(server.pidfile,"w"); 3622 if (fp) { 3623 fprintf(fp,"%d\n",(int)getpid()); 3624 fclose(fp); 3625 } 3626 } 3627 3628 void daemonize(void) { 3629 int fd; 3630 3631 if (fork() != 0) exit(0); /* parent exits */ 3632 setsid(); /* create a new session */ 3633 3634 /* Every output goes to /dev/null. If Redis is daemonized but 3635 * the 'logfile' is set to 'stdout' in the configuration file 3636 * it will not log at all. */ 3637 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { 3638 dup2(fd, STDIN_FILENO); 3639 dup2(fd, STDOUT_FILENO); 3640 dup2(fd, STDERR_FILENO); 3641 if (fd > STDERR_FILENO) close(fd); 3642 } 3643 } 3644 3645 void version(void) { 3646 printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n", 3647 REDIS_VERSION, 3648 redisGitSHA1(), 3649 atoi(redisGitDirty()) > 0, 3650 ZMALLOC_LIB, 3651 sizeof(long) == 4 ? 
32 : 64, 3652 (unsigned long long) redisBuildId()); 3653 exit(0); 3654 } 3655 3656 void usage(void) { 3657 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n"); 3658 fprintf(stderr," ./redis-server - (read config from stdin)\n"); 3659 fprintf(stderr," ./redis-server -v or --version\n"); 3660 fprintf(stderr," ./redis-server -h or --help\n"); 3661 fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n"); 3662 fprintf(stderr,"Examples:\n"); 3663 fprintf(stderr," ./redis-server (run the server with default conf)\n"); 3664 fprintf(stderr," ./redis-server /etc/redis/6379.conf\n"); 3665 fprintf(stderr," ./redis-server --port 7777\n"); 3666 fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n"); 3667 fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n"); 3668 fprintf(stderr,"Sentinel mode:\n"); 3669 fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n"); 3670 exit(1); 3671 } 3672 3673 void redisAsciiArt(void) { 3674 #include "asciilogo.h" 3675 char *buf = zmalloc(1024*16); 3676 char *mode; 3677 3678 if (server.cluster_enabled) mode = "cluster"; 3679 else if (server.sentinel_mode) mode = "sentinel"; 3680 else mode = "standalone"; 3681 3682 if (server.syslog_enabled) { 3683 serverLog(LL_NOTICE, 3684 "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.", 3685 REDIS_VERSION, 3686 redisGitSHA1(), 3687 strtol(redisGitDirty(),NULL,10) > 0, 3688 (sizeof(long) == 8) ? "64" : "32", 3689 mode, server.port, 3690 (long) getpid() 3691 ); 3692 } else { 3693 snprintf(buf,1024*16,ascii_logo, 3694 REDIS_VERSION, 3695 redisGitSHA1(), 3696 strtol(redisGitDirty(),NULL,10) > 0, 3697 (sizeof(long) == 8) ? 
"64" : "32", 3698 mode, server.port, 3699 (long) getpid() 3700 ); 3701 serverLogRaw(LL_NOTICE|LL_RAW,buf); 3702 } 3703 zfree(buf); 3704 } 3705 3706 static void sigShutdownHandler(int sig) { 3707 char *msg; 3708 3709 switch (sig) { 3710 case SIGINT: 3711 msg = "Received SIGINT scheduling shutdown..."; 3712 break; 3713 case SIGTERM: 3714 msg = "Received SIGTERM scheduling shutdown..."; 3715 break; 3716 default: 3717 msg = "Received shutdown signal, scheduling shutdown..."; 3718 }; 3719 3720 /* SIGINT is often delivered via Ctrl+C in an interactive session. 3721 * If we receive the signal the second time, we interpret this as 3722 * the user really wanting to quit ASAP without waiting to persist 3723 * on disk. */ 3724 if (server.shutdown_asap && sig == SIGINT) { 3725 serverLogFromHandler(LL_WARNING, "You insist... exiting now."); 3726 rdbRemoveTempFile(getpid()); 3727 exit(1); /* Exit with an error since this was not a clean shutdown. */ 3728 } else if (server.loading) { 3729 exit(0); 3730 } 3731 3732 serverLogFromHandler(LL_WARNING, msg); 3733 server.shutdown_asap = 1; 3734 } 3735 3736 void setupSignalHandlers(void) { 3737 struct sigaction act; 3738 3739 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. 3740 * Otherwise, sa_handler is used. */ 3741 sigemptyset(&act.sa_mask); 3742 act.sa_flags = 0; 3743 act.sa_handler = sigShutdownHandler; 3744 sigaction(SIGTERM, &act, NULL); 3745 sigaction(SIGINT, &act, NULL); 3746 3747 #ifdef HAVE_BACKTRACE 3748 sigemptyset(&act.sa_mask); 3749 act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO; 3750 act.sa_sigaction = sigsegvHandler; 3751 sigaction(SIGSEGV, &act, NULL); 3752 sigaction(SIGBUS, &act, NULL); 3753 sigaction(SIGFPE, &act, NULL); 3754 sigaction(SIGILL, &act, NULL); 3755 #endif 3756 return; 3757 } 3758 3759 void memtest(size_t megabytes, int passes); 3760 3761 /* Returns 1 if there is --sentinel among the arguments or if 3762 * argv[0] is exactly "redis-sentinel". 
*/ 3763 int checkForSentinelMode(int argc, char **argv) { 3764 int j; 3765 3766 if (strstr(argv[0],"redis-sentinel") != NULL) return 1; 3767 for (j = 1; j < argc; j++) 3768 if (!strcmp(argv[j],"--sentinel")) return 1; 3769 return 0; 3770 } 3771 3772 /* Function called at startup to load RDB or AOF file in memory. */ 3773 void loadDataFromDisk(void) { 3774 long long start = ustime(); 3775 if (server.aof_state == AOF_ON) { 3776 if (loadAppendOnlyFile(server.aof_filename) == C_OK) 3777 serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); 3778 } else { 3779 if (rdbLoad(server.rdb_filename) == C_OK) { 3780 serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", 3781 (float)(ustime()-start)/1000000); 3782 } else if (errno != ENOENT) { 3783 serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); 3784 exit(1); 3785 } 3786 } 3787 } 3788 3789 void redisOutOfMemoryHandler(size_t allocation_size) { 3790 serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!", 3791 allocation_size); 3792 serverPanic("Redis aborting for OUT OF MEMORY"); 3793 } 3794 3795 void redisSetProcTitle(char *title) { 3796 #ifdef USE_SETPROCTITLE 3797 char *server_mode = ""; 3798 if (server.cluster_enabled) server_mode = " [cluster]"; 3799 else if (server.sentinel_mode) server_mode = " [sentinel]"; 3800 3801 setproctitle("%s %s:%d%s", 3802 title, 3803 server.bindaddr_count ? server.bindaddr[0] : "*", 3804 server.port, 3805 server_mode); 3806 #else 3807 UNUSED(title); 3808 #endif 3809 } 3810 3811 /* 3812 * Check whether systemd or upstart have been used to start redis. 
3813 */ 3814 3815 int redisSupervisedUpstart(void) { 3816 const char *upstart_job = getenv("UPSTART_JOB"); 3817 3818 if (!upstart_job) { 3819 serverLog(LL_WARNING, 3820 "upstart supervision requested, but UPSTART_JOB not found"); 3821 return 0; 3822 } 3823 3824 serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness"); 3825 raise(SIGSTOP); 3826 unsetenv("UPSTART_JOB"); 3827 return 1; 3828 } 3829 3830 int redisSupervisedSystemd(void) { 3831 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3832 int fd = 1; 3833 struct sockaddr_un su; 3834 struct iovec iov; 3835 struct msghdr hdr; 3836 int sendto_flags = 0; 3837 3838 if (!notify_socket) { 3839 serverLog(LL_WARNING, 3840 "systemd supervision requested, but NOTIFY_SOCKET not found"); 3841 return 0; 3842 } 3843 3844 if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) { 3845 return 0; 3846 } 3847 3848 serverLog(LL_NOTICE, "supervised by systemd, will signal readiness"); 3849 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) { 3850 serverLog(LL_WARNING, 3851 "Can't connect to systemd socket %s", notify_socket); 3852 return 0; 3853 } 3854 3855 memset(&su, 0, sizeof(su)); 3856 su.sun_family = AF_UNIX; 3857 strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1); 3858 su.sun_path[sizeof(su.sun_path) - 1] = '\0'; 3859 3860 if (notify_socket[0] == '@') 3861 su.sun_path[0] = '\0'; 3862 3863 memset(&iov, 0, sizeof(iov)); 3864 iov.iov_base = "READY=1"; 3865 iov.iov_len = strlen("READY=1"); 3866 3867 memset(&hdr, 0, sizeof(hdr)); 3868 hdr.msg_name = &su; 3869 hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) + 3870 strlen(notify_socket); 3871 hdr.msg_iov = &iov; 3872 hdr.msg_iovlen = 1; 3873 3874 unsetenv("NOTIFY_SOCKET"); 3875 #ifdef HAVE_MSG_NOSIGNAL 3876 sendto_flags |= MSG_NOSIGNAL; 3877 #endif 3878 if (sendmsg(fd, &hdr, sendto_flags) < 0) { 3879 serverLog(LL_WARNING, "Can't send notification to systemd"); 3880 close(fd); 3881 return 0; 3882 } 3883 close(fd); 3884 return 
1; 3885 } 3886 3887 int redisIsSupervised(int mode) { 3888 if (mode == SUPERVISED_AUTODETECT) { 3889 const char *upstart_job = getenv("UPSTART_JOB"); 3890 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3891 3892 if (upstart_job) { 3893 redisSupervisedUpstart(); 3894 } else if (notify_socket) { 3895 redisSupervisedSystemd(); 3896 } 3897 } else if (mode == SUPERVISED_UPSTART) { 3898 return redisSupervisedUpstart(); 3899 } else if (mode == SUPERVISED_SYSTEMD) { 3900 return redisSupervisedSystemd(); 3901 } 3902 3903 return 0; 3904 } 3905 3906 3907 int main(int argc, char **argv) { 3908 struct timeval tv; 3909 int j; 3910 3911 #ifdef REDIS_TEST 3912 if (argc == 3 && !strcasecmp(argv[1], "test")) { 3913 if (!strcasecmp(argv[2], "ziplist")) { 3914 return ziplistTest(argc, argv); 3915 } else if (!strcasecmp(argv[2], "quicklist")) { 3916 quicklistTest(argc, argv); 3917 } else if (!strcasecmp(argv[2], "intset")) { 3918 return intsetTest(argc, argv); 3919 } else if (!strcasecmp(argv[2], "zipmap")) { 3920 return zipmapTest(argc, argv); 3921 } else if (!strcasecmp(argv[2], "sha1test")) { 3922 return sha1Test(argc, argv); 3923 } else if (!strcasecmp(argv[2], "util")) { 3924 return utilTest(argc, argv); 3925 } else if (!strcasecmp(argv[2], "sds")) { 3926 return sdsTest(argc, argv); 3927 } else if (!strcasecmp(argv[2], "endianconv")) { 3928 return endianconvTest(argc, argv); 3929 } else if (!strcasecmp(argv[2], "crc64")) { 3930 return crc64Test(argc, argv); 3931 } 3932 3933 return -1; /* test not found */ 3934 } 3935 #endif 3936 3937 /* We need to initialize our libraries, and the server configuration. 
*/ 3938 #ifdef INIT_SETPROCTITLE_REPLACEMENT 3939 spt_init(argc, argv); 3940 #endif 3941 setlocale(LC_COLLATE,""); 3942 zmalloc_enable_thread_safeness(); 3943 zmalloc_set_oom_handler(redisOutOfMemoryHandler); 3944 srand(time(NULL)^getpid()); 3945 gettimeofday(&tv,NULL); 3946 dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); 3947 server.sentinel_mode = checkForSentinelMode(argc,argv); 3948 initServerConfig(); 3949 3950 /* Store the executable path and arguments in a safe place in order 3951 * to be able to restart the server later. */ 3952 server.executable = getAbsolutePath(argv[0]); 3953 server.exec_argv = zmalloc(sizeof(char*)*(argc+1)); 3954 server.exec_argv[argc] = NULL; 3955 for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]); 3956 3957 /* We need to init sentinel right now as parsing the configuration file 3958 * in sentinel mode will have the effect of populating the sentinel 3959 * data structures with master nodes to monitor. */ 3960 if (server.sentinel_mode) { 3961 initSentinelConfig(); 3962 initSentinel(); 3963 } 3964 3965 /* Check if we need to start in redis-check-rdb mode. We just execute 3966 * the program main. However the program is part of the Redis executable 3967 * so that we can easily execute an RDB check on loading errors. 
*/ 3968 if (strstr(argv[0],"redis-check-rdb") != NULL) 3969 exit(redis_check_rdb_main(argv,argc)); 3970 3971 if (argc >= 2) { 3972 j = 1; /* First option to parse in argv[] */ 3973 sds options = sdsempty(); 3974 char *configfile = NULL; 3975 3976 /* Handle special options --help and --version */ 3977 if (strcmp(argv[1], "-v") == 0 || 3978 strcmp(argv[1], "--version") == 0) version(); 3979 if (strcmp(argv[1], "--help") == 0 || 3980 strcmp(argv[1], "-h") == 0) usage(); 3981 if (strcmp(argv[1], "--test-memory") == 0) { 3982 if (argc == 3) { 3983 memtest(atoi(argv[2]),50); 3984 exit(0); 3985 } else { 3986 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); 3987 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n"); 3988 exit(1); 3989 } 3990 } 3991 3992 /* First argument is the config file name? */ 3993 if (argv[j][0] != '-' || argv[j][1] != '-') { 3994 configfile = argv[j]; 3995 server.configfile = getAbsolutePath(configfile); 3996 /* Replace the config file in server.exec_argv with 3997 * its absoulte path. */ 3998 zfree(server.exec_argv[j]); 3999 server.exec_argv[j] = zstrdup(server.configfile); 4000 j++; 4001 } 4002 4003 /* All the other options are parsed and conceptually appended to the 4004 * configuration file. For instance --port 6380 will generate the 4005 * string "port 6380\n" to be parsed after the actual file name 4006 * is parsed, if any. */ 4007 while(j != argc) { 4008 if (argv[j][0] == '-' && argv[j][1] == '-') { 4009 /* Option name */ 4010 if (!strcmp(argv[j], "--check-rdb")) { 4011 /* Argument has no options, need to skip for parsing. 
*/ 4012 j++; 4013 continue; 4014 } 4015 if (sdslen(options)) options = sdscat(options,"\n"); 4016 options = sdscat(options,argv[j]+2); 4017 options = sdscat(options," "); 4018 } else { 4019 /* Option argument */ 4020 options = sdscatrepr(options,argv[j],strlen(argv[j])); 4021 options = sdscat(options," "); 4022 } 4023 j++; 4024 } 4025 if (server.sentinel_mode && configfile && *configfile == '-') { 4026 serverLog(LL_WARNING, 4027 "Sentinel config from STDIN not allowed."); 4028 serverLog(LL_WARNING, 4029 "Sentinel needs config file on disk to save state. Exiting..."); 4030 exit(1); 4031 } 4032 resetServerSaveParams(); 4033 loadServerConfig(configfile,options); 4034 sdsfree(options); 4035 } else { 4036 serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); 4037 } 4038 4039 server.supervised = redisIsSupervised(server.supervised_mode); 4040 int background = server.daemonize && !server.supervised; 4041 if (background) daemonize(); 4042 4043 initServer(); 4044 if (background || server.pidfile) createPidFile(); 4045 redisSetProcTitle(argv[0]); 4046 redisAsciiArt(); 4047 checkTcpBacklogSettings(); 4048 4049 if (!server.sentinel_mode) { 4050 /* Things not needed when running in Sentinel mode. */ 4051 serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION); 4052 #ifdef __linux__ 4053 linuxMemoryWarnings(); 4054 #endif 4055 loadDataFromDisk(); 4056 if (server.cluster_enabled) { 4057 if (verifyClusterConfigWithData() == C_ERR) { 4058 serverLog(LL_WARNING, 4059 "You can't have keys in a DB different than DB 0 when in " 4060 "Cluster mode. 
Exiting."); 4061 exit(1); 4062 } 4063 } 4064 if (server.ipfd_count > 0) 4065 serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port); 4066 if (server.sofd > 0) 4067 serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); 4068 } else { 4069 sentinelIsRunning(); 4070 } 4071 4072 /* Warning the user about suspicious maxmemory setting. */ 4073 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { 4074 serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); 4075 } 4076 4077 aeSetBeforeSleepProc(server.el,beforeSleep); 4078 aeMain(server.el); 4079 aeDeleteEventLoop(server.el); 4080 return 0; 4081 } 4082 4083 /* The End */ 4084