1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 30 #include "server.h" 31 #include "cluster.h" 32 #include "slowlog.h" 33 #include "bio.h" 34 #include "latency.h" 35 36 #include <time.h> 37 #include <signal.h> 38 #include <sys/wait.h> 39 #include <errno.h> 40 #include <assert.h> 41 #include <ctype.h> 42 #include <stdarg.h> 43 #include <arpa/inet.h> 44 #include <sys/stat.h> 45 #include <fcntl.h> 46 #include <sys/time.h> 47 #include <sys/resource.h> 48 #include <sys/uio.h> 49 #include <sys/un.h> 50 #include <limits.h> 51 #include <float.h> 52 #include <math.h> 53 #include <sys/resource.h> 54 #include <sys/utsname.h> 55 #include <locale.h> 56 #include <sys/socket.h> 57 58 /* Our shared "common" objects */ 59 60 struct sharedObjectsStruct shared; 61 62 /* Global vars that are actually used as constants. The following double 63 * values are used for double on-disk serialization, and are initialized 64 * at runtime to avoid strange compiler optimizations. */ 65 66 double R_Zero, R_PosInf, R_NegInf, R_Nan; 67 68 /*================================= Globals ================================= */ 69 70 /* Global vars */ 71 struct redisServer server; /* server global state */ 72 73 /* Our command table. 74 * 75 * Every entry is composed of the following fields: 76 * 77 * name: a string representing the command name. 78 * function: pointer to the C function implementing the command. 79 * arity: number of arguments, it is possible to use -N to say >= N 80 * sflags: command flags as string. See below for a table of flags. 81 * flags: flags as bitmask. Computed by Redis using the 'sflags' field. 82 * get_keys_proc: an optional function to get key arguments from a command. 83 * This is only used when the following three fields are not 84 * enough to specify what arguments are keys. 85 * first_key_index: first argument that is a key 86 * last_key_index: last argument that is a key 87 * key_step: step to get all the keys from first to last argument. 
For instance
 *           in MSET the step is two since arguments are key,val,key,val,...
 * microseconds: microseconds of total execution time for this command.
 * calls: total number of calls of this command.
 *
 * The flags, microseconds and calls fields are computed by Redis and should
 * always be set to zero.
 *
 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
 *
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
 *    but just a few.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
124 */ 125 struct redisCommand redisCommandTable[] = { 126 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, 127 {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, 128 {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 129 {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}, 130 {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0}, 131 {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, 132 {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, 133 {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, 134 {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, 135 {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0}, 136 {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0}, 137 {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0}, 138 {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 139 {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 140 {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0}, 141 {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0}, 142 {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0}, 143 {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 144 {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 145 {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 146 {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 147 {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0}, 148 {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0}, 149 {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0}, 150 {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0}, 151 {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0}, 152 {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, 153 {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0}, 154 {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0}, 155 {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0}, 156 {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 157 {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0}, 158 {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0}, 159 {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0}, 160 {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 161 {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0}, 162 
{"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0}, 163 {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0}, 164 {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0}, 165 {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0}, 166 {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0}, 167 {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 168 {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 169 {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 170 {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 171 {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 172 {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 173 {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0}, 174 {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 175 {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0}, 176 {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0}, 177 {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0}, 178 {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0}, 179 {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0}, 180 {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0}, 181 {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0}, 182 {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0}, 183 {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, 184 {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0}, 185 {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0}, 186 {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0}, 187 {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0}, 188 {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0}, 189 {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0}, 190 {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, 191 {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0}, 192 {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0}, 193 {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0}, 194 {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0}, 
195 {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 196 {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0}, 197 {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0}, 198 {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0}, 199 {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0}, 200 {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0}, 201 {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0}, 202 {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0}, 203 {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0}, 204 {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0}, 205 {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0}, 206 {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0}, 207 {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0}, 208 {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0}, 209 {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0}, 210 {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 211 {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0}, 212 {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0}, 213 {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0}, 214 {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0}, 215 {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0}, 216 {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0}, 217 {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0}, 218 {"select",selectCommand,2,"rlF",0,NULL,0,0,0,0,0}, 219 {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0}, 220 {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0}, 221 {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0}, 222 {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0}, 223 {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0}, 224 {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0}, 225 {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0}, 226 {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0}, 227 {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0}, 228 {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0}, 229 {"auth",authCommand,2,"rsltF",0,NULL,0,0,0,0,0}, 230 {"ping",pingCommand,-1,"rtF",0,NULL,0,0,0,0,0}, 231 
{"echo",echoCommand,2,"rF",0,NULL,0,0,0,0,0}, 232 {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0}, 233 {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0}, 234 {"bgrewriteaof",bgrewriteaofCommand,1,"ar",0,NULL,0,0,0,0,0}, 235 {"shutdown",shutdownCommand,-1,"arlt",0,NULL,0,0,0,0,0}, 236 {"lastsave",lastsaveCommand,1,"rRF",0,NULL,0,0,0,0,0}, 237 {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0}, 238 {"multi",multiCommand,1,"rsF",0,NULL,0,0,0,0,0}, 239 {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, 240 {"discard",discardCommand,1,"rsF",0,NULL,0,0,0,0,0}, 241 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, 242 {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0}, 243 {"replconf",replconfCommand,-1,"arslt",0,NULL,0,0,0,0,0}, 244 {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0}, 245 {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0}, 246 {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0}, 247 {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0}, 248 {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0}, 249 {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0}, 250 {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0}, 251 {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0}, 252 {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0}, 253 {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0}, 254 {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0}, 255 {"config",configCommand,-2,"art",0,NULL,0,0,0,0,0}, 256 {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, 257 {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, 258 {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, 259 {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, 260 {"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0}, 261 {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0}, 262 {"watch",watchCommand,-2,"rsF",0,NULL,1,-1,1,0,0}, 263 {"unwatch",unwatchCommand,1,"rsF",0,NULL,0,0,0,0,0}, 264 {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, 265 {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0}, 266 
{"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0}, 267 {"migrate",migrateCommand,-6,"w",0,NULL,3,3,1,0,0}, 268 {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, 269 {"readonly",readonlyCommand,1,"rF",0,NULL,0,0,0,0,0}, 270 {"readwrite",readwriteCommand,1,"rF",0,NULL,0,0,0,0,0}, 271 {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0}, 272 {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0}, 273 {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0}, 274 {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, 275 {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, 276 {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0}, 277 {"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0}, 278 {"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0}, 279 {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, 280 {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}, 281 {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0}, 282 {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0}, 283 {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0}, 284 {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0}, 285 {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0}, 286 {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0}, 287 {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0}, 288 {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0}, 289 {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0}, 290 {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0}, 291 {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0}, 292 {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0}, 293 {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0}, 294 {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}, 295 {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0} 296 }; 297 298 struct evictionPoolEntry *evictionPoolAlloc(void); 299 300 /*============================ Utility functions ============================ */ 301 302 /* Low level logging. To use only for very big messages, otherwise 303 * serverLog() is to prefer. 
*/ 304 void serverLogRaw(int level, const char *msg) { 305 const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING }; 306 const char *c = ".-*#"; 307 FILE *fp; 308 char buf[64]; 309 int rawmode = (level & LL_RAW); 310 int log_to_stdout = server.logfile[0] == '\0'; 311 312 level &= 0xff; /* clear flags */ 313 if (level < server.verbosity) return; 314 315 fp = log_to_stdout ? stdout : fopen(server.logfile,"a"); 316 if (!fp) return; 317 318 if (rawmode) { 319 fprintf(fp,"%s",msg); 320 } else { 321 int off; 322 struct timeval tv; 323 int role_char; 324 pid_t pid = getpid(); 325 326 gettimeofday(&tv,NULL); 327 off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec)); 328 snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000); 329 if (server.sentinel_mode) { 330 role_char = 'X'; /* Sentinel. */ 331 } else if (pid != server.pid) { 332 role_char = 'C'; /* RDB / AOF writing child. */ 333 } else { 334 role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */ 335 } 336 fprintf(fp,"%d:%c %s %c %s\n", 337 (int)getpid(),role_char, buf,c[level],msg); 338 } 339 fflush(fp); 340 341 if (!log_to_stdout) fclose(fp); 342 if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg); 343 } 344 345 /* Like serverLogRaw() but with printf-alike support. This is the function that 346 * is used across the code. The raw version is only used in order to dump 347 * the INFO output on crash. */ 348 void serverLog(int level, const char *fmt, ...) { 349 va_list ap; 350 char msg[LOG_MAX_LEN]; 351 352 if ((level&0xff) < server.verbosity) return; 353 354 va_start(ap, fmt); 355 vsnprintf(msg, sizeof(msg), fmt, ap); 356 va_end(ap); 357 358 serverLogRaw(level,msg); 359 } 360 361 /* Log a fixed message without printf-alike capabilities, in a way that is 362 * safe to call from a signal handler. 363 * 364 * We actually use this only for signals that are not fatal from the point 365 * of view of Redis. 
Signals that are going to kill the server anyway and 366 * where we need printf-alike features are served by serverLog(). */ 367 void serverLogFromHandler(int level, const char *msg) { 368 int fd; 369 int log_to_stdout = server.logfile[0] == '\0'; 370 char buf[64]; 371 372 if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize)) 373 return; 374 fd = log_to_stdout ? STDOUT_FILENO : 375 open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644); 376 if (fd == -1) return; 377 ll2string(buf,sizeof(buf),getpid()); 378 if (write(fd,buf,strlen(buf)) == -1) goto err; 379 if (write(fd,":signal-handler (",17) == -1) goto err; 380 ll2string(buf,sizeof(buf),time(NULL)); 381 if (write(fd,buf,strlen(buf)) == -1) goto err; 382 if (write(fd,") ",2) == -1) goto err; 383 if (write(fd,msg,strlen(msg)) == -1) goto err; 384 if (write(fd,"\n",1) == -1) goto err; 385 err: 386 if (!log_to_stdout) close(fd); 387 } 388 389 /* Return the UNIX time in microseconds */ 390 long long ustime(void) { 391 struct timeval tv; 392 long long ust; 393 394 gettimeofday(&tv, NULL); 395 ust = ((long long)tv.tv_sec)*1000000; 396 ust += tv.tv_usec; 397 return ust; 398 } 399 400 /* Return the UNIX time in milliseconds */ 401 mstime_t mstime(void) { 402 return ustime()/1000; 403 } 404 405 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of 406 * exit(), because the latter may interact with the same file objects used by 407 * the parent process. However if we are testing the coverage normal exit() is 408 * used in order to obtain the right coverage information. */ 409 void exitFromChild(int retcode) { 410 #ifdef COVERAGE_TEST 411 exit(retcode); 412 #else 413 _exit(retcode); 414 #endif 415 } 416 417 /*====================== Hash table type implementation ==================== */ 418 419 /* This is a hash table type that uses the SDS dynamic strings library as 420 * keys and redis objects as values (objects can hold SDS strings, 421 * lists, sets). 
*/ 422 423 void dictVanillaFree(void *privdata, void *val) 424 { 425 DICT_NOTUSED(privdata); 426 zfree(val); 427 } 428 429 void dictListDestructor(void *privdata, void *val) 430 { 431 DICT_NOTUSED(privdata); 432 listRelease((list*)val); 433 } 434 435 int dictSdsKeyCompare(void *privdata, const void *key1, 436 const void *key2) 437 { 438 int l1,l2; 439 DICT_NOTUSED(privdata); 440 441 l1 = sdslen((sds)key1); 442 l2 = sdslen((sds)key2); 443 if (l1 != l2) return 0; 444 return memcmp(key1, key2, l1) == 0; 445 } 446 447 /* A case insensitive version used for the command lookup table and other 448 * places where case insensitive non binary-safe comparison is needed. */ 449 int dictSdsKeyCaseCompare(void *privdata, const void *key1, 450 const void *key2) 451 { 452 DICT_NOTUSED(privdata); 453 454 return strcasecmp(key1, key2) == 0; 455 } 456 457 void dictObjectDestructor(void *privdata, void *val) 458 { 459 DICT_NOTUSED(privdata); 460 461 if (val == NULL) return; /* Values of swapped out keys as set to NULL */ 462 decrRefCount(val); 463 } 464 465 void dictSdsDestructor(void *privdata, void *val) 466 { 467 DICT_NOTUSED(privdata); 468 469 sdsfree(val); 470 } 471 472 int dictObjKeyCompare(void *privdata, const void *key1, 473 const void *key2) 474 { 475 const robj *o1 = key1, *o2 = key2; 476 return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 477 } 478 479 unsigned int dictObjHash(const void *key) { 480 const robj *o = key; 481 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 482 } 483 484 unsigned int dictSdsHash(const void *key) { 485 return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); 486 } 487 488 unsigned int dictSdsCaseHash(const void *key) { 489 return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key)); 490 } 491 492 int dictEncObjKeyCompare(void *privdata, const void *key1, 493 const void *key2) 494 { 495 robj *o1 = (robj*) key1, *o2 = (robj*) key2; 496 int cmp; 497 498 if (o1->encoding == OBJ_ENCODING_INT && 499 o2->encoding == 
OBJ_ENCODING_INT) 500 return o1->ptr == o2->ptr; 501 502 o1 = getDecodedObject(o1); 503 o2 = getDecodedObject(o2); 504 cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 505 decrRefCount(o1); 506 decrRefCount(o2); 507 return cmp; 508 } 509 510 unsigned int dictEncObjHash(const void *key) { 511 robj *o = (robj*) key; 512 513 if (sdsEncodedObject(o)) { 514 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 515 } else { 516 if (o->encoding == OBJ_ENCODING_INT) { 517 char buf[32]; 518 int len; 519 520 len = ll2string(buf,32,(long)o->ptr); 521 return dictGenHashFunction((unsigned char*)buf, len); 522 } else { 523 unsigned int hash; 524 525 o = getDecodedObject(o); 526 hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 527 decrRefCount(o); 528 return hash; 529 } 530 } 531 } 532 533 /* Sets type hash table */ 534 dictType setDictType = { 535 dictEncObjHash, /* hash function */ 536 NULL, /* key dup */ 537 NULL, /* val dup */ 538 dictEncObjKeyCompare, /* key compare */ 539 dictObjectDestructor, /* key destructor */ 540 NULL /* val destructor */ 541 }; 542 543 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ 544 dictType zsetDictType = { 545 dictEncObjHash, /* hash function */ 546 NULL, /* key dup */ 547 NULL, /* val dup */ 548 dictEncObjKeyCompare, /* key compare */ 549 dictObjectDestructor, /* key destructor */ 550 NULL /* val destructor */ 551 }; 552 553 /* Db->dict, keys are sds strings, vals are Redis objects. */ 554 dictType dbDictType = { 555 dictSdsHash, /* hash function */ 556 NULL, /* key dup */ 557 NULL, /* val dup */ 558 dictSdsKeyCompare, /* key compare */ 559 dictSdsDestructor, /* key destructor */ 560 dictObjectDestructor /* val destructor */ 561 }; 562 563 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. 
*/ 564 dictType shaScriptObjectDictType = { 565 dictSdsCaseHash, /* hash function */ 566 NULL, /* key dup */ 567 NULL, /* val dup */ 568 dictSdsKeyCaseCompare, /* key compare */ 569 dictSdsDestructor, /* key destructor */ 570 dictObjectDestructor /* val destructor */ 571 }; 572 573 /* Db->expires */ 574 dictType keyptrDictType = { 575 dictSdsHash, /* hash function */ 576 NULL, /* key dup */ 577 NULL, /* val dup */ 578 dictSdsKeyCompare, /* key compare */ 579 NULL, /* key destructor */ 580 NULL /* val destructor */ 581 }; 582 583 /* Command table. sds string -> command struct pointer. */ 584 dictType commandTableDictType = { 585 dictSdsCaseHash, /* hash function */ 586 NULL, /* key dup */ 587 NULL, /* val dup */ 588 dictSdsKeyCaseCompare, /* key compare */ 589 dictSdsDestructor, /* key destructor */ 590 NULL /* val destructor */ 591 }; 592 593 /* Hash type hash table (note that small hashes are represented with ziplists) */ 594 dictType hashDictType = { 595 dictEncObjHash, /* hash function */ 596 NULL, /* key dup */ 597 NULL, /* val dup */ 598 dictEncObjKeyCompare, /* key compare */ 599 dictObjectDestructor, /* key destructor */ 600 dictObjectDestructor /* val destructor */ 601 }; 602 603 /* Keylist hash table type has unencoded redis objects as keys and 604 * lists as values. It's used for blocking operations (BLPOP) and to 605 * map swapped keys to a list of clients waiting for this keys to be loaded. */ 606 dictType keylistDictType = { 607 dictObjHash, /* hash function */ 608 NULL, /* key dup */ 609 NULL, /* val dup */ 610 dictObjKeyCompare, /* key compare */ 611 dictObjectDestructor, /* key destructor */ 612 dictListDestructor /* val destructor */ 613 }; 614 615 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to 616 * clusterNode structures. 
*/ 617 dictType clusterNodesDictType = { 618 dictSdsHash, /* hash function */ 619 NULL, /* key dup */ 620 NULL, /* val dup */ 621 dictSdsKeyCompare, /* key compare */ 622 dictSdsDestructor, /* key destructor */ 623 NULL /* val destructor */ 624 }; 625 626 /* Cluster re-addition blacklist. This maps node IDs to the time 627 * we can re-add this node. The goal is to avoid readding a removed 628 * node for some time. */ 629 dictType clusterNodesBlackListDictType = { 630 dictSdsCaseHash, /* hash function */ 631 NULL, /* key dup */ 632 NULL, /* val dup */ 633 dictSdsKeyCaseCompare, /* key compare */ 634 dictSdsDestructor, /* key destructor */ 635 NULL /* val destructor */ 636 }; 637 638 /* Migrate cache dict type. */ 639 dictType migrateCacheDictType = { 640 dictSdsHash, /* hash function */ 641 NULL, /* key dup */ 642 NULL, /* val dup */ 643 dictSdsKeyCompare, /* key compare */ 644 dictSdsDestructor, /* key destructor */ 645 NULL /* val destructor */ 646 }; 647 648 /* Replication cached script dict (server.repl_scriptcache_dict). 649 * Keys are sds SHA1 strings, while values are not used at all in the current 650 * implementation. 
*/ 651 dictType replScriptCacheDictType = { 652 dictSdsCaseHash, /* hash function */ 653 NULL, /* key dup */ 654 NULL, /* val dup */ 655 dictSdsKeyCaseCompare, /* key compare */ 656 dictSdsDestructor, /* key destructor */ 657 NULL /* val destructor */ 658 }; 659 660 int htNeedsResize(dict *dict) { 661 long long size, used; 662 663 size = dictSlots(dict); 664 used = dictSize(dict); 665 return (size && used && size > DICT_HT_INITIAL_SIZE && 666 (used*100/size < HASHTABLE_MIN_FILL)); 667 } 668 669 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL 670 * we resize the hash table to save memory */ 671 void tryResizeHashTables(int dbid) { 672 if (htNeedsResize(server.db[dbid].dict)) 673 dictResize(server.db[dbid].dict); 674 if (htNeedsResize(server.db[dbid].expires)) 675 dictResize(server.db[dbid].expires); 676 } 677 678 /* Our hash table implementation performs rehashing incrementally while 679 * we write/read from the hash table. Still if the server is idle, the hash 680 * table will use two tables for a long time. So we try to use 1 millisecond 681 * of CPU time at every call of this function to perform some rehahsing. 682 * 683 * The function returns 1 if some rehashing was performed, otherwise 0 684 * is returned. */ 685 int incrementallyRehash(int dbid) { 686 /* Keys dictionary */ 687 if (dictIsRehashing(server.db[dbid].dict)) { 688 dictRehashMilliseconds(server.db[dbid].dict,1); 689 return 1; /* already used our millisecond for this loop... */ 690 } 691 /* Expires */ 692 if (dictIsRehashing(server.db[dbid].expires)) { 693 dictRehashMilliseconds(server.db[dbid].expires,1); 694 return 1; /* already used our millisecond for this loop... */ 695 } 696 return 0; 697 } 698 699 /* This function is called once a background process of some kind terminates, 700 * as we want to avoid resizing the hash tables when there is a child in order 701 * to play well with copy-on-write (otherwise when a resize happens lots of 702 * memory pages are copied). 
The goal of this function is to update the ability 703 * for dict.c to resize the hash tables accordingly to the fact we have o not 704 * running childs. */ 705 void updateDictResizePolicy(void) { 706 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) 707 dictEnableResize(); 708 else 709 dictDisableResize(); 710 } 711 712 /* ======================= Cron: called every 100 ms ======================== */ 713 714 /* Helper function for the activeExpireCycle() function. 715 * This function will try to expire the key that is stored in the hash table 716 * entry 'de' of the 'expires' hash table of a Redis database. 717 * 718 * If the key is found to be expired, it is removed from the database and 719 * 1 is returned. Otherwise no operation is performed and 0 is returned. 720 * 721 * When a key is expired, server.stat_expiredkeys is incremented. 722 * 723 * The parameter 'now' is the current time in milliseconds as is passed 724 * to the function to avoid too many gettimeofday() syscalls. */ 725 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { 726 long long t = dictGetSignedIntegerVal(de); 727 if (now > t) { 728 sds key = dictGetKey(de); 729 robj *keyobj = createStringObject(key,sdslen(key)); 730 731 propagateExpire(db,keyobj); 732 dbDelete(db,keyobj); 733 notifyKeyspaceEvent(NOTIFY_EXPIRED, 734 "expired",keyobj,db->id); 735 decrRefCount(keyobj); 736 server.stat_expiredkeys++; 737 return 1; 738 } else { 739 return 0; 740 } 741 } 742 743 /* Try to expire a few timed out keys. The algorithm used is adaptive and 744 * will use few CPU cycles if there are few expiring keys, otherwise 745 * it will get more aggressive to avoid that too much memory is used by 746 * keys that can be removed from the keyspace. 747 * 748 * No more than CRON_DBS_PER_CALL databases are tested at every 749 * iteration. 
*
 * This kind of call is used when Redis detects that timelimit_exit is
 * true, so there is more work to do, and we do it more incrementally from
 * the beforeSleep() function of the event loop.
 *
 * Expire cycle type:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than
 * ACTIVE_EXPIRE_CYCLE_FAST_DURATION microseconds, and is not started again
 * before twice that amount of time has elapsed since the previous fast
 * cycle started.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, the normal expire cycle is
 * executed, where the time limit is a percentage of the server.hz period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */

void activeExpireCycle(int type) {
    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    static unsigned int current_db = 0; /* Last DB tested. */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * because of the time limit. Also don't repeat a fast cycle for the
         * same period as the fast cycle total duration itself. */
        if (!timelimit_exit) return;
        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
        last_fast_cycle = start;
    }

    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
     * per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. */
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

    for (j = 0; j < dbs_per_call; j++) {
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;

        /* Continue to expire if at the end of the cycle more than 25%
         * of the keys were expired. */
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots getting random
             * keys is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. */
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

            while (num--) {
                dictEntry *de;
                long long ttl;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                /* The TTL is read before trying to expire the entry, and
                 * already-expired keys count as a TTL of zero. */
                ttl = dictGetSignedIntegerVal(de)-now;
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl < 0) ttl = 0;
                ttl_sum += ttl;
                ttl_samples++;
            }

            /* Update the average TTL stats for this database. */
            if (ttl_samples) {
                long long avg_ttl = ttl_sum/ttl_samples;

                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                /* Smooth the value averaging with the previous one. */
                db->avg_ttl = (db->avg_ttl+avg_ttl)/2;
            }

            /* We can't block forever here even if there are many keys to
             * expire. So after 'timelimit' microseconds return to the
             * caller waiting for the other active expire cycle. */
            iteration++;
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                long long elapsed = ustime()-start;

                latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
                if (elapsed > timelimit) timelimit_exit = 1;
            }
            if (timelimit_exit) return;
            /* We don't repeat the cycle if there are less than 25% of keys
             * found expired in the current DB. */
        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    }
}

/* Return the current LRU clock value: the millisecond time divided by
 * LRU_CLOCK_RESOLUTION and masked with LRU_CLOCK_MAX. */
unsigned int getLRUClock(void) {
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

/* Add a sample to the operations per second array of samples.
 *
 * 'metric' is the index into server.inst_metric, 'current_reading' is the
 * current value of the monotonically sampled counter. The rate is computed
 * from the delta against the previously stored reading and time; when no
 * time elapsed since the previous sample a rate of 0 is stored. */
void trackInstantaneousMetric(int metric, long long current_reading) {
    long long t = mstime() - server.inst_metric[metric].last_sample_time;
    long long ops = current_reading -
                    server.inst_metric[metric].last_sample_count;
    long long ops_sec;

    ops_sec = t > 0 ? (ops*1000/t) : 0;

    /* Samples are stored in a circular buffer of STATS_METRIC_SAMPLES
     * entries. */
    server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
        ops_sec;
    server.inst_metric[metric].idx++;
    server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
    server.inst_metric[metric].last_sample_time = mstime();
    server.inst_metric[metric].last_sample_count = current_reading;
}

/* Return the mean of all the samples. */
long long getInstantaneousMetric(int metric) {
    int j;
    long long sum = 0;

    for (j = 0; j < STATS_METRIC_SAMPLES; j++)
        sum += server.inst_metric[metric].samples[j];
    return sum / STATS_METRIC_SAMPLES;
}

/* Check for timeouts. Returns non-zero if the client was terminated.
 * The function gets the current time in milliseconds as argument since
 * it gets called multiple times in a loop, so calling gettimeofday() for
 * each iteration would be costly without any actual gain. */
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
    time_t now = now_ms/1000;

    if (server.maxidletime &&
        !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
        !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
        !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
        !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
        (now - c->lastinteraction > server.maxidletime))
    {
        serverLog(LL_VERBOSE,"Closing idle client");
        freeClient(c);
        return 1;
    } else if (c->flags & CLIENT_BLOCKED) {
        /* Blocked OPS timeout is handled with milliseconds resolution.
         * However note that the actual resolution is limited by
         * server.hz. */

        if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
            /* Handle blocking operation specific timeout. */
            replyToBlockedClientTimedOut(c);
            unblockClient(c);
        } else if (server.cluster_enabled) {
            /* Cluster: handle unblock & redirect of clients blocked
             * into keys no longer served by this server. */
            if (clusterRedirectBlockedClientIfNeeded(c))
                unblockClient(c);
        }
    }
    return 0;
}

/* The client query buffer is an sds.c string that can end with a lot of
 * free space not used, this function reclaims space if needed.
 *
 * The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
    size_t querybuf_size = sdsAllocSize(c->querybuf);
    time_t idletime = server.unixtime - c->lastinteraction;

    /* There are two conditions to resize the query buffer:
     * 1) Query buffer is > BIG_ARG and too big for latest peak.
     * 2) Client is inactive and the buffer is bigger than 1k. */
    if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
         (querybuf_size/(c->querybuf_peak+1)) > 2) ||
         (querybuf_size > 1024 && idletime > 2))
    {
        /* Only resize the query buffer if it is actually wasting space. */
        if (sdsavail(c->querybuf) > 1024) {
            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
        }
    }
    /* Reset the peak again to capture the peak memory usage in the next
     * cycle. */
    c->querybuf_peak = 0;
    return 0;
}

/* Lower bound on the number of clients examined by each clientsCron()
 * call (or all of them, when fewer are connected). */
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
    /* Make sure to process at least numclients/server.hz of clients
     * per call. Since this function is called server.hz times per second
     * we are sure that in the worst case we process all the clients in 1
     * second. */
    int numclients = listLength(server.clients);
    int iterations = numclients/server.hz;
    mstime_t now = mstime();

    /* Process at least a few clients while we are at it, even if we need
     * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
     * of processing each client once per second. */
    if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
        iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
                     numclients : CLIENTS_CRON_MIN_ITERATIONS;

    while(listLength(server.clients) && iterations--) {
        client *c;
        listNode *head;

        /* Rotate the list, take the current head, process.
         * This way if the client must be removed from the list it's the
         * first element and we don't incur into O(N) computation. */
        listRotate(server.clients);
        head = listFirst(server.clients);
        c = listNodeValue(head);
        /* The following functions do different service checks on the client.
         * The protocol is that they return non-zero if the client was
         * terminated. */
        if (clientsCronHandleTimeout(c,now)) continue;
        if (clientsCronResizeQueryBuffer(c)) continue;
    }
}

/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);

    /* Perform hash tables rehashing if needed, but only if there are no
     * other processes saving the DB on disk. Otherwise rehashing is bad
     * as will cause a lot of copy-on-write of memory pages. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db % server.dbnum);
                rehash_db++;
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                }
            }
        }
    }
}

/* We take a cached value of the unix time in the global state because with
 * virtual memory and aging there is the need to store the current time in
 * objects at every object access, and accuracy is not needed. To access a
 * global var is a lot faster than calling time(NULL) */
void updateCachedTime(void) {
    server.unixtime = time(NULL);
    server.mstime = mstime();
}

/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { ....
} 1082 */ 1083 1084 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 1085 int j; 1086 UNUSED(eventLoop); 1087 UNUSED(id); 1088 UNUSED(clientData); 1089 1090 /* Software watchdog: deliver the SIGALRM that will reach the signal 1091 * handler if we don't return here fast enough. */ 1092 if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); 1093 1094 /* Update the time cache. */ 1095 updateCachedTime(); 1096 1097 run_with_period(100) { 1098 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); 1099 trackInstantaneousMetric(STATS_METRIC_NET_INPUT, 1100 server.stat_net_input_bytes); 1101 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, 1102 server.stat_net_output_bytes); 1103 } 1104 1105 /* We have just LRU_BITS bits per object for LRU information. 1106 * So we use an (eventually wrapping) LRU clock. 1107 * 1108 * Note that even if the counter wraps it's not a big problem, 1109 * everything will still work but some object will appear younger 1110 * to Redis. However for this to happen a given object should never be 1111 * touched for all the time needed to the counter to wrap, which is 1112 * not likely. 1113 * 1114 * Note that you can change the resolution altering the 1115 * LRU_CLOCK_RESOLUTION define. */ 1116 server.lruclock = getLRUClock(); 1117 1118 /* Record the max memory used since the server was started. */ 1119 if (zmalloc_used_memory() > server.stat_peak_memory) 1120 server.stat_peak_memory = zmalloc_used_memory(); 1121 1122 /* Sample the RSS here since this is a relatively slow call. */ 1123 server.resident_set_size = zmalloc_get_rss(); 1124 1125 /* We received a SIGTERM, shutting down here in a safe way, as it is 1126 * not ok doing so inside the signal handler. 
*/ 1127 if (server.shutdown_asap) { 1128 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); 1129 serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); 1130 server.shutdown_asap = 0; 1131 } 1132 1133 /* Show some info about non-empty databases */ 1134 run_with_period(5000) { 1135 for (j = 0; j < server.dbnum; j++) { 1136 long long size, used, vkeys; 1137 1138 size = dictSlots(server.db[j].dict); 1139 used = dictSize(server.db[j].dict); 1140 vkeys = dictSize(server.db[j].expires); 1141 if (used || vkeys) { 1142 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); 1143 /* dictPrintStats(server.dict); */ 1144 } 1145 } 1146 } 1147 1148 /* Show information about connected clients */ 1149 if (!server.sentinel_mode) { 1150 run_with_period(5000) { 1151 serverLog(LL_VERBOSE, 1152 "%lu clients connected (%lu slaves), %zu bytes in use", 1153 listLength(server.clients)-listLength(server.slaves), 1154 listLength(server.slaves), 1155 zmalloc_used_memory()); 1156 } 1157 } 1158 1159 /* We need to do a few operations on clients asynchronously. */ 1160 clientsCron(); 1161 1162 /* Handle background operations on Redis databases. */ 1163 databasesCron(); 1164 1165 /* Start a scheduled AOF rewrite if this was requested by the user while 1166 * a BGSAVE was in progress. */ 1167 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && 1168 server.aof_rewrite_scheduled) 1169 { 1170 rewriteAppendOnlyFileBackground(); 1171 } 1172 1173 /* Check if a background saving or AOF rewrite in progress terminated. 
*/ 1174 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || 1175 ldbPendingChildren()) 1176 { 1177 int statloc; 1178 pid_t pid; 1179 1180 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { 1181 int exitcode = WEXITSTATUS(statloc); 1182 int bysignal = 0; 1183 1184 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); 1185 1186 if (pid == -1) { 1187 serverLog(LL_WARNING,"wait3() returned an error: %s. " 1188 "rdb_child_pid = %d, aof_child_pid = %d", 1189 strerror(errno), 1190 (int) server.rdb_child_pid, 1191 (int) server.aof_child_pid); 1192 } else if (pid == server.rdb_child_pid) { 1193 backgroundSaveDoneHandler(exitcode,bysignal); 1194 } else if (pid == server.aof_child_pid) { 1195 backgroundRewriteDoneHandler(exitcode,bysignal); 1196 } else { 1197 if (!ldbRemoveChild(pid)) { 1198 serverLog(LL_WARNING, 1199 "Warning, detected child with unmatched pid: %ld", 1200 (long)pid); 1201 } 1202 } 1203 updateDictResizePolicy(); 1204 } 1205 } else { 1206 /* If there is not a background saving/rewrite in progress check if 1207 * we have to save/rewrite now */ 1208 for (j = 0; j < server.saveparamslen; j++) { 1209 struct saveparam *sp = server.saveparams+j; 1210 1211 /* Save if we reached the given amount of changes, 1212 * the given amount of seconds, and if the latest bgsave was 1213 * successful or if, in case of an error, at least 1214 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ 1215 if (server.dirty >= sp->changes && 1216 server.unixtime-server.lastsave > sp->seconds && 1217 (server.unixtime-server.lastbgsave_try > 1218 CONFIG_BGSAVE_RETRY_DELAY || 1219 server.lastbgsave_status == C_OK)) 1220 { 1221 serverLog(LL_NOTICE,"%d changes in %d seconds. 
Saving...", 1222 sp->changes, (int)sp->seconds); 1223 rdbSaveBackground(server.rdb_filename); 1224 break; 1225 } 1226 } 1227 1228 /* Trigger an AOF rewrite if needed */ 1229 if (server.rdb_child_pid == -1 && 1230 server.aof_child_pid == -1 && 1231 server.aof_rewrite_perc && 1232 server.aof_current_size > server.aof_rewrite_min_size) 1233 { 1234 long long base = server.aof_rewrite_base_size ? 1235 server.aof_rewrite_base_size : 1; 1236 long long growth = (server.aof_current_size*100/base) - 100; 1237 if (growth >= server.aof_rewrite_perc) { 1238 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); 1239 rewriteAppendOnlyFileBackground(); 1240 } 1241 } 1242 } 1243 1244 1245 /* AOF postponed flush: Try at every cron cycle if the slow fsync 1246 * completed. */ 1247 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); 1248 1249 /* AOF write errors: in this case we have a buffer to flush as well and 1250 * clear the AOF error in case of success to make the DB writable again, 1251 * however to try every second is enough in case of 'hz' is set to 1252 * an higher frequency. */ 1253 run_with_period(1000) { 1254 if (server.aof_last_write_status == C_ERR) 1255 flushAppendOnlyFile(0); 1256 } 1257 1258 /* Close clients that need to be closed asynchronous */ 1259 freeClientsInAsyncFreeQueue(); 1260 1261 /* Clear the paused clients flag if needed. */ 1262 clientsArePaused(); /* Don't check return value, just use the side effect. */ 1263 1264 /* Replication cron function -- used to reconnect to master and 1265 * to detect transfer failures. */ 1266 run_with_period(1000) replicationCron(); 1267 1268 /* Run the Redis Cluster cron. */ 1269 run_with_period(100) { 1270 if (server.cluster_enabled) clusterCron(); 1271 } 1272 1273 /* Run the Sentinel timer if we are in sentinel mode. */ 1274 run_with_period(100) { 1275 if (server.sentinel_mode) sentinelTimer(); 1276 } 1277 1278 /* Cleanup expired MIGRATE cached sockets. 
*/ 1279 run_with_period(1000) { 1280 migrateCloseTimedoutSockets(); 1281 } 1282 1283 server.cronloops++; 1284 return 1000/server.hz; 1285 } 1286 1287 /* This function gets called every time Redis is entering the 1288 * main loop of the event driven library, that is, before to sleep 1289 * for ready file descriptors. */ 1290 void beforeSleep(struct aeEventLoop *eventLoop) { 1291 UNUSED(eventLoop); 1292 1293 /* Call the Redis Cluster before sleep function. Note that this function 1294 * may change the state of Redis Cluster (from ok to fail or vice versa), 1295 * so it's a good idea to call it before serving the unblocked clients 1296 * later in this function. */ 1297 if (server.cluster_enabled) clusterBeforeSleep(); 1298 1299 /* Run a fast expire cycle (the called function will return 1300 * ASAP if a fast cycle is not needed). */ 1301 if (server.active_expire_enabled && server.masterhost == NULL) 1302 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); 1303 1304 /* Send all the slaves an ACK request if at least one client blocked 1305 * during the previous event loop iteration. */ 1306 if (server.get_ack_from_slaves) { 1307 robj *argv[3]; 1308 1309 argv[0] = createStringObject("REPLCONF",8); 1310 argv[1] = createStringObject("GETACK",6); 1311 argv[2] = createStringObject("*",1); /* Not used argument. */ 1312 replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); 1313 decrRefCount(argv[0]); 1314 decrRefCount(argv[1]); 1315 decrRefCount(argv[2]); 1316 server.get_ack_from_slaves = 0; 1317 } 1318 1319 /* Unblock all the clients blocked for synchronous replication 1320 * in WAIT. */ 1321 if (listLength(server.clients_waiting_acks)) 1322 processClientsWaitingReplicas(); 1323 1324 /* Try to process pending commands for clients that were just unblocked. */ 1325 if (listLength(server.unblocked_clients)) 1326 processUnblockedClients(); 1327 1328 /* Write the AOF buffer on disk */ 1329 flushAppendOnlyFile(0); 1330 1331 /* Handle writes with pending output buffers. 
*/ 1332 handleClientsWithPendingWrites(); 1333 } 1334 1335 /* =========================== Server initialization ======================== */ 1336 1337 void createSharedObjects(void) { 1338 int j; 1339 1340 shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n")); 1341 shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n")); 1342 shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n")); 1343 shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n")); 1344 shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n")); 1345 shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n")); 1346 shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n")); 1347 shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n")); 1348 shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n")); 1349 shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n")); 1350 shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n")); 1351 shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n")); 1352 shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n")); 1353 shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew( 1354 "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")); 1355 shared.nokeyerr = createObject(OBJ_STRING,sdsnew( 1356 "-ERR no such key\r\n")); 1357 shared.syntaxerr = createObject(OBJ_STRING,sdsnew( 1358 "-ERR syntax error\r\n")); 1359 shared.sameobjecterr = createObject(OBJ_STRING,sdsnew( 1360 "-ERR source and destination objects are the same\r\n")); 1361 shared.outofrangeerr = createObject(OBJ_STRING,sdsnew( 1362 "-ERR index out of range\r\n")); 1363 shared.noscripterr = createObject(OBJ_STRING,sdsnew( 1364 "-NOSCRIPT No matching script. Please use EVAL.\r\n")); 1365 shared.loadingerr = createObject(OBJ_STRING,sdsnew( 1366 "-LOADING Redis is loading the dataset in memory\r\n")); 1367 shared.slowscripterr = createObject(OBJ_STRING,sdsnew( 1368 "-BUSY Redis is busy running a script. 
You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n")); 1369 shared.masterdownerr = createObject(OBJ_STRING,sdsnew( 1370 "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n")); 1371 shared.bgsaveerr = createObject(OBJ_STRING,sdsnew( 1372 "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n")); 1373 shared.roslaveerr = createObject(OBJ_STRING,sdsnew( 1374 "-READONLY You can't write against a read only slave.\r\n")); 1375 shared.noautherr = createObject(OBJ_STRING,sdsnew( 1376 "-NOAUTH Authentication required.\r\n")); 1377 shared.oomerr = createObject(OBJ_STRING,sdsnew( 1378 "-OOM command not allowed when used memory > 'maxmemory'.\r\n")); 1379 shared.execaborterr = createObject(OBJ_STRING,sdsnew( 1380 "-EXECABORT Transaction discarded because of previous errors.\r\n")); 1381 shared.noreplicaserr = createObject(OBJ_STRING,sdsnew( 1382 "-NOREPLICAS Not enough good slaves to write.\r\n")); 1383 shared.busykeyerr = createObject(OBJ_STRING,sdsnew( 1384 "-BUSYKEY Target key name already exists.\r\n")); 1385 shared.space = createObject(OBJ_STRING,sdsnew(" ")); 1386 shared.colon = createObject(OBJ_STRING,sdsnew(":")); 1387 shared.plus = createObject(OBJ_STRING,sdsnew("+")); 1388 1389 for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) { 1390 char dictid_str[64]; 1391 int dictid_len; 1392 1393 dictid_len = ll2string(dictid_str,sizeof(dictid_str),j); 1394 shared.select[j] = createObject(OBJ_STRING, 1395 sdscatprintf(sdsempty(), 1396 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 1397 dictid_len, dictid_str)); 1398 } 1399 shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); 1400 shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); 1401 shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); 1402 shared.unsubscribebulk = 
createStringObject("$11\r\nunsubscribe\r\n",18); 1403 shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); 1404 shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); 1405 shared.del = createStringObject("DEL",3); 1406 shared.rpop = createStringObject("RPOP",4); 1407 shared.lpop = createStringObject("LPOP",4); 1408 shared.lpush = createStringObject("LPUSH",5); 1409 for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { 1410 shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j); 1411 shared.integers[j]->encoding = OBJ_ENCODING_INT; 1412 } 1413 for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) { 1414 shared.mbulkhdr[j] = createObject(OBJ_STRING, 1415 sdscatprintf(sdsempty(),"*%d\r\n",j)); 1416 shared.bulkhdr[j] = createObject(OBJ_STRING, 1417 sdscatprintf(sdsempty(),"$%d\r\n",j)); 1418 } 1419 /* The following two shared objects, minstring and maxstrings, are not 1420 * actually used for their value but as a special object meaning 1421 * respectively the minimum possible string and the maximum possible 1422 * string in string comparisons for the ZRANGEBYLEX command. */ 1423 shared.minstring = createStringObject("minstring",9); 1424 shared.maxstring = createStringObject("maxstring",9); 1425 } 1426 1427 void initServerConfig(void) { 1428 int j; 1429 1430 getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); 1431 server.configfile = NULL; 1432 server.executable = NULL; 1433 server.hz = CONFIG_DEFAULT_HZ; 1434 server.runid[CONFIG_RUN_ID_SIZE] = '\0'; 1435 server.arch_bits = (sizeof(long) == 8) ? 
64 : 32; 1436 server.port = CONFIG_DEFAULT_SERVER_PORT; 1437 server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG; 1438 server.bindaddr_count = 0; 1439 server.unixsocket = NULL; 1440 server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM; 1441 server.ipfd_count = 0; 1442 server.sofd = -1; 1443 server.dbnum = CONFIG_DEFAULT_DBNUM; 1444 server.verbosity = CONFIG_DEFAULT_VERBOSITY; 1445 server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; 1446 server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; 1447 server.active_expire_enabled = 1; 1448 server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN; 1449 server.saveparams = NULL; 1450 server.loading = 0; 1451 server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE); 1452 server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED; 1453 server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT); 1454 server.syslog_facility = LOG_LOCAL0; 1455 server.daemonize = CONFIG_DEFAULT_DAEMONIZE; 1456 server.supervised = 0; 1457 server.supervised_mode = SUPERVISED_NONE; 1458 server.aof_state = AOF_OFF; 1459 server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC; 1460 server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE; 1461 server.aof_rewrite_perc = AOF_REWRITE_PERC; 1462 server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; 1463 server.aof_rewrite_base_size = 0; 1464 server.aof_rewrite_scheduled = 0; 1465 server.aof_last_fsync = time(NULL); 1466 server.aof_rewrite_time_last = -1; 1467 server.aof_rewrite_time_start = -1; 1468 server.aof_lastbgrewrite_status = C_OK; 1469 server.aof_delayed_fsync = 0; 1470 server.aof_fd = -1; 1471 server.aof_selected_db = -1; /* Make sure the first time will not match */ 1472 server.aof_flush_postponed_start = 0; 1473 server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; 1474 server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; 1475 server.pidfile = NULL; 1476 server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); 1477 server.aof_filename = 
zstrdup(CONFIG_DEFAULT_AOF_FILENAME); 1478 server.requirepass = NULL; 1479 server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION; 1480 server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM; 1481 server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR; 1482 server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING; 1483 server.notify_keyspace_events = 0; 1484 server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS; 1485 server.bpop_blocked_clients = 0; 1486 server.maxmemory = CONFIG_DEFAULT_MAXMEMORY; 1487 server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY; 1488 server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES; 1489 server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES; 1490 server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE; 1491 server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE; 1492 server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH; 1493 server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES; 1494 server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES; 1495 server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE; 1496 server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES; 1497 server.shutdown_asap = 0; 1498 server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; 1499 server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; 1500 server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE; 1501 server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG; 1502 server.cluster_enabled = 0; 1503 server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT; 1504 server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER; 1505 server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY; 1506 server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE; 1507 server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); 1508 server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); 1509 server.next_client_id = 
1; /* Client IDs, start from 1 .*/ 1510 server.loading_process_events_interval_bytes = (1024*1024*2); 1511 1512 server.lruclock = getLRUClock(); 1513 resetServerSaveParams(); 1514 1515 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ 1516 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ 1517 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ 1518 /* Replication related */ 1519 server.masterauth = NULL; 1520 server.masterhost = NULL; 1521 server.masterport = 6379; 1522 server.master = NULL; 1523 server.cached_master = NULL; 1524 server.repl_master_initial_offset = -1; 1525 server.repl_state = REPL_STATE_NONE; 1526 server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; 1527 server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; 1528 server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; 1529 server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ 1530 server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; 1531 server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; 1532 server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; 1533 server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY; 1534 server.master_repl_offset = 0; 1535 1536 /* Replication partial resync backlog */ 1537 server.repl_backlog = NULL; 1538 server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE; 1539 server.repl_backlog_histlen = 0; 1540 server.repl_backlog_idx = 0; 1541 server.repl_backlog_off = 0; 1542 server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT; 1543 server.repl_no_slaves_since = time(NULL); 1544 1545 /* Client output buffer limits */ 1546 for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) 1547 server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; 1548 1549 /* Double constants initialization */ 1550 R_Zero = 0.0; 1551 R_PosInf = 1.0/R_Zero; 1552 R_NegInf = -1.0/R_Zero; 1553 R_Nan = R_Zero/R_Zero; 1554 1555 /* 
Command table -- we initiialize it here as it is part of the 1556 * initial configuration, since command names may be changed via 1557 * redis.conf using the rename-command directive. */ 1558 server.commands = dictCreate(&commandTableDictType,NULL); 1559 server.orig_commands = dictCreate(&commandTableDictType,NULL); 1560 populateCommandTable(); 1561 server.delCommand = lookupCommandByCString("del"); 1562 server.multiCommand = lookupCommandByCString("multi"); 1563 server.lpushCommand = lookupCommandByCString("lpush"); 1564 server.lpopCommand = lookupCommandByCString("lpop"); 1565 server.rpopCommand = lookupCommandByCString("rpop"); 1566 server.sremCommand = lookupCommandByCString("srem"); 1567 server.execCommand = lookupCommandByCString("exec"); 1568 1569 /* Slow log */ 1570 server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; 1571 server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN; 1572 1573 /* Latency monitor */ 1574 server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; 1575 1576 /* Debugging */ 1577 server.assert_failed = "<no assertion failed>"; 1578 server.assert_file = "<no file>"; 1579 server.assert_line = 0; 1580 server.bug_report_start = 0; 1581 server.watchdog_period = 0; 1582 } 1583 1584 extern char **environ; 1585 1586 /* Restart the server, executing the same executable that started this 1587 * instance, with the same arguments and configuration file. 1588 * 1589 * The function is designed to directly call execve() so that the new 1590 * server instance will retain the PID of the previous one. 1591 * 1592 * The list of flags, that may be bitwise ORed together, alter the 1593 * behavior of this function: 1594 * 1595 * RESTART_SERVER_NONE No flags. 1596 * RESTART_SERVER_GRACEFULLY Do a proper shutdown before restarting. 1597 * RESTART_SERVER_CONFIG_REWRITE Rewrite the config file before restarting. 
1598 * 1599 * On success the function does not return, because the process turns into 1600 * a different process. On error C_ERR is returned. */ 1601 int restartServer(int flags, mstime_t delay) { 1602 int j; 1603 1604 /* Check if we still have accesses to the executable that started this 1605 * server instance. */ 1606 if (access(server.executable,X_OK) == -1) return C_ERR; 1607 1608 /* Config rewriting. */ 1609 if (flags & RESTART_SERVER_CONFIG_REWRITE && 1610 server.configfile && 1611 rewriteConfig(server.configfile) == -1) return C_ERR; 1612 1613 /* Perform a proper shutdown. */ 1614 if (flags & RESTART_SERVER_GRACEFULLY && 1615 prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR; 1616 1617 /* Close all file descriptors, with the exception of stdin, stdout, strerr 1618 * which are useful if we restart a Redis server which is not daemonized. */ 1619 for (j = 3; j < (int)server.maxclients + 1024; j++) close(j); 1620 1621 /* Execute the server with the original command line. */ 1622 if (delay) usleep(delay*1000); 1623 execve(server.executable,server.exec_argv,environ); 1624 1625 /* If an error occurred here, there is nothing we can do, but exit. */ 1626 _exit(1); 1627 1628 return C_ERR; /* Never reached. */ 1629 } 1630 1631 /* This function will try to raise the max number of open files accordingly to 1632 * the configured max number of clients. It also reserves a number of file 1633 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of 1634 * persistence, listening sockets, log files and so forth. 1635 * 1636 * If it will not be possible to set the limit accordingly to the configured 1637 * max number of clients, the function will do the reverse setting 1638 * server.maxclients to the value that we can actually handle. 
*/ 1639 void adjustOpenFilesLimit(void) { 1640 rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS; 1641 struct rlimit limit; 1642 1643 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { 1644 serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.", 1645 strerror(errno)); 1646 server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS; 1647 } else { 1648 rlim_t oldlimit = limit.rlim_cur; 1649 1650 /* Set the max number of files if the current limit is not enough 1651 * for our needs. */ 1652 if (oldlimit < maxfiles) { 1653 rlim_t bestlimit; 1654 int setrlimit_error = 0; 1655 1656 /* Try to set the file limit to match 'maxfiles' or at least 1657 * to the higher value supported less than maxfiles. */ 1658 bestlimit = maxfiles; 1659 while(bestlimit > oldlimit) { 1660 rlim_t decr_step = 16; 1661 1662 limit.rlim_cur = bestlimit; 1663 limit.rlim_max = bestlimit; 1664 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break; 1665 setrlimit_error = errno; 1666 1667 /* We failed to set file limit to 'bestlimit'. Try with a 1668 * smaller limit decrementing by a few FDs per iteration. */ 1669 if (bestlimit < decr_step) break; 1670 bestlimit -= decr_step; 1671 } 1672 1673 /* Assume that the limit we get initially is still valid if 1674 * our last try was even lower. */ 1675 if (bestlimit < oldlimit) bestlimit = oldlimit; 1676 1677 if (bestlimit < maxfiles) { 1678 int old_maxclients = server.maxclients; 1679 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS; 1680 if (server.maxclients < 1) { 1681 serverLog(LL_WARNING,"Your current 'ulimit -n' " 1682 "of %llu is not enough for the server to start. " 1683 "Please increase your open file limit to at least " 1684 "%llu. 
Exiting.", 1685 (unsigned long long) oldlimit, 1686 (unsigned long long) maxfiles); 1687 exit(1); 1688 } 1689 serverLog(LL_WARNING,"You requested maxclients of %d " 1690 "requiring at least %llu max file descriptors.", 1691 old_maxclients, 1692 (unsigned long long) maxfiles); 1693 serverLog(LL_WARNING,"Server can't set maximum open files " 1694 "to %llu because of OS error: %s.", 1695 (unsigned long long) maxfiles, strerror(setrlimit_error)); 1696 serverLog(LL_WARNING,"Current maximum open files is %llu. " 1697 "maxclients has been reduced to %d to compensate for " 1698 "low ulimit. " 1699 "If you need higher maxclients increase 'ulimit -n'.", 1700 (unsigned long long) bestlimit, server.maxclients); 1701 } else { 1702 serverLog(LL_NOTICE,"Increased maximum number of open files " 1703 "to %llu (it was originally set to %llu).", 1704 (unsigned long long) maxfiles, 1705 (unsigned long long) oldlimit); 1706 } 1707 } 1708 } 1709 } 1710 1711 /* Check that server.tcp_backlog can be actually enforced in Linux according 1712 * to the value of /proc/sys/net/core/somaxconn, or warn about it. */ 1713 void checkTcpBacklogSettings(void) { 1714 #ifdef HAVE_PROC_SOMAXCONN 1715 FILE *fp = fopen("/proc/sys/net/core/somaxconn","r"); 1716 char buf[1024]; 1717 if (!fp) return; 1718 if (fgets(buf,sizeof(buf),fp) != NULL) { 1719 int somaxconn = atoi(buf); 1720 if (somaxconn > 0 && somaxconn < server.tcp_backlog) { 1721 serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn); 1722 } 1723 } 1724 fclose(fp); 1725 #endif 1726 } 1727 1728 /* Initialize a set of file descriptors to listen to the specified 'port' 1729 * binding the addresses specified in the Redis server configuration. 1730 * 1731 * The listening file descriptors are stored in the integer array 'fds' 1732 * and their number is set in '*count'. 
*
 * The addresses to bind are specified in the global server.bindaddr array
 * and their number is server.bindaddr_count. If the server configuration
 * contains no specific addresses to bind, this function will try to
 * bind * (all addresses) for both the IPv4 and IPv6 protocols.
 *
 * On success the function returns C_OK.
 *
 * On error the function returns C_ERR. For the function to be on
 * error, at least one of the server.bindaddr addresses was
 * impossible to bind, or no bind addresses were specified in the server
 * configuration but the function is not able to bind * for at least
 * one of the IPv4 or IPv6 protocols.
 *
 * Note that '*count' is used as the index of the next free slot of 'fds'
 * and is only incremented here, never reset: the caller is expected to
 * initialize it before calling this function. */
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            }
            fds[*count] = anetTcpServer(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            }
            /* Exit the loop if we were able to bind * on IPv4 or IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error.
             *
             * NOTE(review): the test is on '*count' being non zero, so a
             * single successful bind (or a non zero '*count' on entry) is
             * enough to leave the loop with C_OK. */
            if (*count) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        /* Common error path for the explicit address branches, and for the
         * wildcard branch when nothing could be bound. */
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Creating Server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

/* Resets the stats that we expose via INFO or other means that we want
 * to reset via CONFIG RESETSTAT. The function is also used in order to
 * initialize these fields in initServer() at server startup. */
void resetServerStats(void) {
    int j;

    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_expiredkeys = 0;
    server.stat_evictedkeys = 0;
    server.stat_keyspace_misses = 0;
    server.stat_keyspace_hits = 0;
    server.stat_fork_time = 0;
    server.stat_fork_rate = 0;
    server.stat_rejected_conn = 0;
    server.stat_sync_full = 0;
    server.stat_sync_partial_ok = 0;
    server.stat_sync_partial_err = 0;
    /* Restart every instantaneous metric: sample index and samples are
     * zeroed, and the last sample time is set to the current time. */
    for (j = 0; j < STATS_METRIC_COUNT; j++) {
        server.inst_metric[j].idx = 0;
        server.inst_metric[j].last_sample_time = mstime();
        server.inst_metric[j].last_sample_count = 0;
        memset(server.inst_metric[j].samples,0,
            sizeof(server.inst_metric[j].samples));
    }
    server.stat_net_input_bytes = 0;
    server.stat_net_output_bytes = 0;
    server.aof_delayed_fsync = 0;
}

/* Initialize the runtime state of the server: signal dispositions, syslog,
 * client lists, shared objects, the event loop, the listening sockets, the
 * databases, the serverCron() timer, the accept handlers, the AOF file
 * descriptor and the other subsystems initialized at the end of the
 * function.
 *
 * The function does not return an error: conditions that prevent the server
 * from starting (no socket to listen to, AOF file that can't be opened,
 * and so forth) terminate the process via exit(1) or serverPanic(). */
void initServer(void) {
    int j;

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();

    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

    server.pid = getpid();
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate();
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    server.system_memory_size = zmalloc_get_memory_size();

    createSharedObjects();
    /* adjustOpenFilesLimit() may lower server.maxclients, so it is called
     * before the event loop is sized using that value. */
    adjustOpenFilesLimit();
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0 && server.sofd < 0) {
        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
    server.cronloops = 0;
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.resident_set_size = 0;
    server.lastbgsave_status = C_OK;
    server.aof_last_write_status = C_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;
    updateCachedTime();

    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    /* NOTE(review): this test is 'sofd > 0', while the "no listening
     * sockets" check above uses 'sofd < 0' and closeListeningSockets() uses
     * 'sofd != -1': a Unix socket on descriptor 0 would get no handler. */
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

    /* Open the AOF file if needed. */
    if (server.aof_state == AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            serverLog(LL_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    /* 32 bit instances are limited to 4GB of address space, so if there is
     * no explicit limit in the user provided configuration we set a limit
     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
     * useless crashes of the Redis instance for out of memory. */
    if (server.arch_bits == 32 && server.maxmemory == 0) {
        serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
    }

    if (server.cluster_enabled) clusterInit();
    replicationScriptCacheInit();
    scriptingInit(1);
    slowlogInit();
    latencyMonitorInit();
    bioInit();
}

/* Populates the Redis Command Table starting from the hard coded list
 * we have on top of redis.c file.
*/ 1967 void populateCommandTable(void) { 1968 int j; 1969 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 1970 1971 for (j = 0; j < numcommands; j++) { 1972 struct redisCommand *c = redisCommandTable+j; 1973 char *f = c->sflags; 1974 int retval1, retval2; 1975 1976 while(*f != '\0') { 1977 switch(*f) { 1978 case 'w': c->flags |= CMD_WRITE; break; 1979 case 'r': c->flags |= CMD_READONLY; break; 1980 case 'm': c->flags |= CMD_DENYOOM; break; 1981 case 'a': c->flags |= CMD_ADMIN; break; 1982 case 'p': c->flags |= CMD_PUBSUB; break; 1983 case 's': c->flags |= CMD_NOSCRIPT; break; 1984 case 'R': c->flags |= CMD_RANDOM; break; 1985 case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break; 1986 case 'l': c->flags |= CMD_LOADING; break; 1987 case 't': c->flags |= CMD_STALE; break; 1988 case 'M': c->flags |= CMD_SKIP_MONITOR; break; 1989 case 'k': c->flags |= CMD_ASKING; break; 1990 case 'F': c->flags |= CMD_FAST; break; 1991 default: serverPanic("Unsupported command flag"); break; 1992 } 1993 f++; 1994 } 1995 1996 retval1 = dictAdd(server.commands, sdsnew(c->name), c); 1997 /* Populate an additional dictionary that will be unaffected 1998 * by rename-command statements in redis.conf. 
*/ 1999 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); 2000 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); 2001 } 2002 } 2003 2004 void resetCommandTableStats(void) { 2005 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 2006 int j; 2007 2008 for (j = 0; j < numcommands; j++) { 2009 struct redisCommand *c = redisCommandTable+j; 2010 2011 c->microseconds = 0; 2012 c->calls = 0; 2013 } 2014 } 2015 2016 /* ========================== Redis OP Array API ============================ */ 2017 2018 void redisOpArrayInit(redisOpArray *oa) { 2019 oa->ops = NULL; 2020 oa->numops = 0; 2021 } 2022 2023 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid, 2024 robj **argv, int argc, int target) 2025 { 2026 redisOp *op; 2027 2028 oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); 2029 op = oa->ops+oa->numops; 2030 op->cmd = cmd; 2031 op->dbid = dbid; 2032 op->argv = argv; 2033 op->argc = argc; 2034 op->target = target; 2035 oa->numops++; 2036 return oa->numops; 2037 } 2038 2039 void redisOpArrayFree(redisOpArray *oa) { 2040 while(oa->numops) { 2041 int j; 2042 redisOp *op; 2043 2044 oa->numops--; 2045 op = oa->ops+oa->numops; 2046 for (j = 0; j < op->argc; j++) 2047 decrRefCount(op->argv[j]); 2048 zfree(op->argv); 2049 } 2050 zfree(oa->ops); 2051 } 2052 2053 /* ====================== Commands lookup and execution ===================== */ 2054 2055 struct redisCommand *lookupCommand(sds name) { 2056 return dictFetchValue(server.commands, name); 2057 } 2058 2059 struct redisCommand *lookupCommandByCString(char *s) { 2060 struct redisCommand *cmd; 2061 sds name = sdsnew(s); 2062 2063 cmd = dictFetchValue(server.commands, name); 2064 sdsfree(name); 2065 return cmd; 2066 } 2067 2068 /* Lookup the command in the current table, if not found also check in 2069 * the original table containing the original command names unaffected by 2070 * redis.conf rename-command statement. 
2071 * 2072 * This is used by functions rewriting the argument vector such as 2073 * rewriteClientCommandVector() in order to set client->cmd pointer 2074 * correctly even if the command was renamed. */ 2075 struct redisCommand *lookupCommandOrOriginal(sds name) { 2076 struct redisCommand *cmd = dictFetchValue(server.commands, name); 2077 2078 if (!cmd) cmd = dictFetchValue(server.orig_commands,name); 2079 return cmd; 2080 } 2081 2082 /* Propagate the specified command (in the context of the specified database id) 2083 * to AOF and Slaves. 2084 * 2085 * flags are an xor between: 2086 * + PROPAGATE_NONE (no propagation of command at all) 2087 * + PROPAGATE_AOF (propagate into the AOF file if is enabled) 2088 * + PROPAGATE_REPL (propagate into the replication link) 2089 * 2090 * This should not be used inside commands implementation. Use instead 2091 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). 2092 */ 2093 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2094 int flags) 2095 { 2096 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) 2097 feedAppendOnlyFile(cmd,dbid,argv,argc); 2098 if (flags & PROPAGATE_REPL) 2099 replicationFeedSlaves(server.slaves,dbid,argv,argc); 2100 } 2101 2102 /* Used inside commands to schedule the propagation of additional commands 2103 * after the current command is propagated to AOF / Replication. 2104 * 2105 * 'cmd' must be a pointer to the Redis command to replicate, dbid is the 2106 * database ID the command should be propagated into. 2107 * Arguments of the command to propagte are passed as an array of redis 2108 * objects pointers of len 'argc', using the 'argv' vector. 2109 * 2110 * The function does not take a reference to the passed 'argv' vector, 2111 * so it is up to the caller to release the passed argv (but it is usually 2112 * stack allocated). The function autoamtically increments ref count of 2113 * passed objects, so the caller does not need to. 
*/ 2114 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2115 int target) 2116 { 2117 robj **argvcopy; 2118 int j; 2119 2120 if (server.loading) return; /* No propagation during loading. */ 2121 2122 argvcopy = zmalloc(sizeof(robj*)*argc); 2123 for (j = 0; j < argc; j++) { 2124 argvcopy[j] = argv[j]; 2125 incrRefCount(argv[j]); 2126 } 2127 redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target); 2128 } 2129 2130 /* It is possible to call the function forceCommandPropagation() inside a 2131 * Redis command implementation in order to to force the propagation of a 2132 * specific command execution into AOF / Replication. */ 2133 void forceCommandPropagation(client *c, int flags) { 2134 if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; 2135 if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF; 2136 } 2137 2138 /* Avoid that the executed command is propagated at all. This way we 2139 * are free to just propagate what we want using the alsoPropagate() 2140 * API. */ 2141 void preventCommandPropagation(client *c) { 2142 c->flags |= CLIENT_PREVENT_PROP; 2143 } 2144 2145 /* AOF specific version of preventCommandPropagation(). */ 2146 void preventCommandAOF(client *c) { 2147 c->flags |= CLIENT_PREVENT_AOF_PROP; 2148 } 2149 2150 /* Replication specific version of preventCommandPropagation(). */ 2151 void preventCommandReplication(client *c) { 2152 c->flags |= CLIENT_PREVENT_REPL_PROP; 2153 } 2154 2155 /* Call() is the core of Redis execution of a command. 2156 * 2157 * The following flags can be passed: 2158 * CMD_CALL_NONE No flags. 2159 * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. 2160 * CMD_CALL_STATS Populate command stats. 2161 * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset 2162 * or if the client flags are forcing propagation. 
* CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 *
 * The exact propagation behavior depends on the client flags.
 * Specifically:
 *
 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 *    in the call flags, then the command is propagated even if the
 *    dataset was not affected by the command.
 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 *    are set, the propagation into AOF or to slaves is not performed even
 *    if the command modified the dataset.
 *
 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 * slaves propagation will never occur.
 *
 * Client flags are modified by the implementation of a given command
 * using the following API:
 *
 * forceCommandPropagation(client *c, int flags);
 * preventCommandPropagation(client *c);
 * preventCommandAOF(client *c);
 * preventCommandReplication(client *c);
 *
 */
void call(client *c, int flags) {
    long long dirty, start, duration;
    /* Saved so that the propagation related bits can be restored on exit. */
    int client_old_flags = c->flags;

    /* Sent the command to clients in MONITOR mode, only if the commands are
     * not generated from reading an AOF. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. 'dirty' ends up holding the increment of
     * server.dirty caused by the command, clamped to zero, and 'duration'
     * the difference between the two ustime() readings. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called
     * from Lua to go into the slowlog or to populate statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate
     * the script if the command flag or client flag are forcing the
     * propagation. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats.
     * EXEC itself is never logged here. */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
        c->cmd->microseconds += duration;
        c->cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link.
     * The block is skipped only when every bit of CLIENT_PREVENT_PROP is
     * set (presumably the union of the AOF and REPL prevent flags -- TODO
     * confirm against the flag definitions). */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementation called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. */
        if (propagate_flags != PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the old replication flags, since call() can be executed
     * recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Handle the alsoPropagate() API to handle commands that want to propagate
     * multiple separated commands. Note that alsoPropagate() is not affected
     * by CLIENT_PREVENT_PROP flag. */
    if (server.also_propagate.numops) {
        int j;
        redisOp *rop;

        if (flags & CMD_CALL_PROPAGATE) {
            for (j = 0; j < server.also_propagate.numops; j++) {
                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        /* The queued operations are released even when they were not
         * propagated because of the call() flags. */
        redisOpArrayFree(&server.also_propagate);
    }
    server.stat_numcommands++;
}

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client is going to be closed: in this
 * function that only happens for QUIT, which flags the client with
 * CLIENT_CLOSE_AFTER_REPLY. Commands rejected by one of the checks below
 * get an error reply and the function still returns C_OK. */
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth.
     * A positive arity must match argc exactly, a negative arity -N means
     * that at least N arguments are required. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master (directly, or as the
     *    caller of the Lua script issuing the command).
     * 2) The command has no key arguments.
     *
     * NOTE(review): server.lua_caller is dereferenced whenever CLIENT_LUA is
     * set, so it is assumed to be non-NULL for Lua clients here. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return C_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return C_OK;
            }
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. The error is returned only for commands
     * flagged as CMD_DENYOOM. */
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();
        if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. PING is refused as well. */
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == C_ERR) ||
          server.aof_last_write_status == C_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        /* Pick the error according to which of the two conditions failed:
         * the AOF write error takes precedence. */
        if (server.aof_last_write_status == C_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow PING, (P)SUBSCRIBE and (P)UNSUBSCRIBE in the context of
     * Pub/Sub (QUIT was already handled at the top of the function). */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow commands flagged as CMD_STALE (such as INFO and SLAVEOF)
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands:
     * AUTH, REPLCONF, SHUTDOWN with a single argument starting with 'n'
     * and SCRIPT with a single argument starting with 'k' (the first
     * character is compared case insensitively). */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command: inside a MULTI block everything but EXEC, DISCARD,
     * MULTI and WATCH is queued instead of being executed. */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return C_OK;
}

/*================================== Shutdown =============================== */

/* Close listening sockets. Also unlink the unix domain socket if
 * unlink_unix_socket is non-zero.
 *
 * The sockets closed are the TCP ones in server.ipfd, the Unix socket
 * (when server.sofd is not -1) and, if cluster is enabled, the ones in
 * server.cfd. */
void closeListeningSockets(int unlink_unix_socket) {
    int j;

    for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
    if (server.sofd != -1) close(server.sofd);
    if (server.cluster_enabled)
        for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
    if (unlink_unix_socket && server.unixsocket) {
        serverLog(LL_NOTICE,"Removing the unix socket file.");
        unlink(server.unixsocket); /* don't care if this fails */
    }
}

/* Perform the operations needed before the process can exit: kill the
 * saving children, fsync the AOF, save the final RDB snapshot if needed,
 * remove the pid file, flush the slaves output buffers and close the
 * listening sockets.
 *
 * 'flags' may contain SHUTDOWN_SAVE, to force the RDB snapshot even if no
 * save points are configured, or SHUTDOWN_NOSAVE, to skip the snapshot
 * that would otherwise be taken because save points are configured.
 *
 * The function returns C_OK when the server is ready to exit, or C_ERR if
 * the shutdown can't be performed, that is, when the initial AOF is still
 * being written or when the final RDB save failed. The function itself
 * never terminates the process. */
int prepareForShutdown(int flags) {
    int save = flags & SHUTDOWN_SAVE;
    int nosave = flags & SHUTDOWN_NOSAVE;

    serverLog(LL_WARNING,"User requested shutdown...");

    /* Kill all the Lua debugger forked sessions. */
    ldbKillForkedSessions();

    /* Kill the saving child if there is a background saving in progress.
       We want to avoid race conditions, for instance our saving child may
       overwrite the synchronous saving did by SHUTDOWN. */
    if (server.rdb_child_pid != -1) {
        serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }

    if (server.aof_state != AOF_OFF) {
        /* Kill the AOF saving child as the AOF we already have may be longer
         * but contains the full dataset anyway. */
        if (server.aof_child_pid != -1) {
            /* If we have AOF enabled but haven't written the AOF yet, don't
             * shutdown or else the dataset will be lost. */
            if (server.aof_state == AOF_WAIT_REWRITE) {
                serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
                return C_ERR;
            }
            serverLog(LL_WARNING,
                "There is a child rewriting the AOF. Killing it!");
            kill(server.aof_child_pid,SIGUSR1);
        }
        /* Append only file: fsync() the AOF and exit */
        serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
        aof_fsync(server.aof_fd);
    }

    /* Create a new RDB file before exiting. */
    if ((server.saveparamslen > 0 && !nosave) || save) {
        serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
        /* Snapshotting. Perform a SYNC SAVE and exit */
        if (rdbSave(server.rdb_filename) != C_OK) {
            /* Ooops.. error saving! The best we can do is to continue
             * operating. Note that if there was a background saving process,
             * in the next cron() Redis will be notified that the background
             * saving aborted, handling special stuff like slaves pending for
             * synchronization... */
            serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
            return C_ERR;
        }
    }

    /* Remove the pid file if possible and needed.
     * NOTE(review): server.pidfile is passed to unlink() even when only
     * server.daemonize is set -- presumably the pid file path is always
     * populated for daemonized instances; verify against the startup code. */
    if (server.daemonize || server.pidfile) {
        serverLog(LL_NOTICE,"Removing the pid file.");
        unlink(server.pidfile);
    }

    /* Best effort flush of slave output buffers, so that we hopefully
     * send them pending writes. */
    flushSlavesOutputBuffers();

    /* Close the listening sockets. Apparently this allows faster restarts. */
    closeListeningSockets(1);
    serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
        server.sentinel_mode ? "Sentinel" : "Redis");
    return C_OK;
}

/*================================== Commands =============================== */

/* Return zero if strings are the same, non-zero if they are not.
 * The comparison is performed in a way that prevents an attacker to obtain
 * information about the nature of the strings just monitoring the execution
 * time of the function.
 *
 * Note that limiting the comparison length to strings up to 512 bytes we
 * can avoid leaking any information about the password length and any
 * possible branch misprediction related leak.
 */
int time_independent_strcmp(char *a, char *b) {
    char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
    /* The above two strlen perform len(a) + len(b) operations where either
     * a or b are fixed (our password) length, and the difference is only
     * relative to the length of the user provided string, so no information
     * leak is possible in the following two lines of code. */
    unsigned int alen = strlen(a);
    unsigned int blen = strlen(b);
    unsigned int j;
    int diff = 0;

    /* We can't compare strings longer than our static buffers.
     * Note that this will never pass the first test in practical circumstances
     * so there is no info leak. */
    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;

    memset(bufa,0,sizeof(bufa));        /* Constant time.
*/ 2609 memset(bufb,0,sizeof(bufb)); /* Constant time. */ 2610 /* Again the time of the following two copies is proportional to 2611 * len(a) + len(b) so no info is leaked. */ 2612 memcpy(bufa,a,alen); 2613 memcpy(bufb,b,blen); 2614 2615 /* Always compare all the chars in the two buffers without 2616 * conditional expressions. */ 2617 for (j = 0; j < sizeof(bufa); j++) { 2618 diff |= (bufa[j] ^ bufb[j]); 2619 } 2620 /* Length must be equal as well. */ 2621 diff |= alen ^ blen; 2622 return diff; /* If zero strings are the same. */ 2623 } 2624 2625 void authCommand(client *c) { 2626 if (!server.requirepass) { 2627 addReplyError(c,"Client sent AUTH, but no password is set"); 2628 } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) { 2629 c->authenticated = 1; 2630 addReply(c,shared.ok); 2631 } else { 2632 c->authenticated = 0; 2633 addReplyError(c,"invalid password"); 2634 } 2635 } 2636 2637 /* The PING command. It works in a different way if the client is in 2638 * in Pub/Sub mode. */ 2639 void pingCommand(client *c) { 2640 /* The command takes zero or one arguments. */ 2641 if (c->argc > 2) { 2642 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", 2643 c->cmd->name); 2644 return; 2645 } 2646 2647 if (c->flags & CLIENT_PUBSUB) { 2648 addReply(c,shared.mbulkhdr[2]); 2649 addReplyBulkCBuffer(c,"pong",4); 2650 if (c->argc == 1) 2651 addReplyBulkCBuffer(c,"",0); 2652 else 2653 addReplyBulk(c,c->argv[1]); 2654 } else { 2655 if (c->argc == 1) 2656 addReply(c,shared.pong); 2657 else 2658 addReplyBulk(c,c->argv[1]); 2659 } 2660 } 2661 2662 void echoCommand(client *c) { 2663 addReplyBulk(c,c->argv[1]); 2664 } 2665 2666 void timeCommand(client *c) { 2667 struct timeval tv; 2668 2669 /* gettimeofday() can only fail if &tv is a bad address so we 2670 * don't check for errors. 
*/ 2671 gettimeofday(&tv,NULL); 2672 addReplyMultiBulkLen(c,2); 2673 addReplyBulkLongLong(c,tv.tv_sec); 2674 addReplyBulkLongLong(c,tv.tv_usec); 2675 } 2676 2677 /* Helper function for addReplyCommand() to output flags. */ 2678 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) { 2679 if (cmd->flags & f) { 2680 addReplyStatus(c, reply); 2681 return 1; 2682 } 2683 return 0; 2684 } 2685 2686 /* Output the representation of a Redis command. Used by the COMMAND command. */ 2687 void addReplyCommand(client *c, struct redisCommand *cmd) { 2688 if (!cmd) { 2689 addReply(c, shared.nullbulk); 2690 } else { 2691 /* We are adding: command name, arg count, flags, first, last, offset */ 2692 addReplyMultiBulkLen(c, 6); 2693 addReplyBulkCString(c, cmd->name); 2694 addReplyLongLong(c, cmd->arity); 2695 2696 int flagcount = 0; 2697 void *flaglen = addDeferredMultiBulkLength(c); 2698 flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write"); 2699 flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly"); 2700 flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom"); 2701 flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin"); 2702 flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub"); 2703 flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript"); 2704 flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random"); 2705 flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script"); 2706 flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading"); 2707 flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale"); 2708 flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor"); 2709 flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); 2710 flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); 2711 if (cmd->getkeys_proc) { 2712 addReplyStatus(c, "movablekeys"); 2713 flagcount += 1; 2714 } 2715 setDeferredMultiBulkLength(c, flaglen, flagcount); 2716 2717 addReplyLongLong(c, 
cmd->firstkey); 2718 addReplyLongLong(c, cmd->lastkey); 2719 addReplyLongLong(c, cmd->keystep); 2720 } 2721 } 2722 2723 /* COMMAND <subcommand> <args> */ 2724 void commandCommand(client *c) { 2725 dictIterator *di; 2726 dictEntry *de; 2727 2728 if (c->argc == 1) { 2729 addReplyMultiBulkLen(c, dictSize(server.commands)); 2730 di = dictGetIterator(server.commands); 2731 while ((de = dictNext(di)) != NULL) { 2732 addReplyCommand(c, dictGetVal(de)); 2733 } 2734 dictReleaseIterator(di); 2735 } else if (!strcasecmp(c->argv[1]->ptr, "info")) { 2736 int i; 2737 addReplyMultiBulkLen(c, c->argc-2); 2738 for (i = 2; i < c->argc; i++) { 2739 addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr)); 2740 } 2741 } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) { 2742 addReplyLongLong(c, dictSize(server.commands)); 2743 } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) { 2744 struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr); 2745 int *keys, numkeys, j; 2746 2747 if (!cmd) { 2748 addReplyErrorFormat(c,"Invalid command specified"); 2749 return; 2750 } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) || 2751 ((c->argc-2) < -cmd->arity)) 2752 { 2753 addReplyError(c,"Invalid number of arguments specified for command"); 2754 return; 2755 } 2756 2757 keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys); 2758 addReplyMultiBulkLen(c,numkeys); 2759 for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]); 2760 getKeysFreeResult(keys); 2761 } else { 2762 addReplyError(c, "Unknown subcommand or wrong number of arguments."); 2763 return; 2764 } 2765 } 2766 2767 /* Convert an amount of bytes into a human readable string in the form 2768 * of 100B, 2G, 100M, 4K, and so forth. 
/* bytesToHuman() stores in 's' a short human readable rendering of the byte
 * count 'n', using 1024-based units:
 *
 *   n < 1024          -> "<n>B"
 *   n < 1024^2 .. ^6  -> two decimals followed by K, M, G, T or P
 *   anything bigger   -> "<n>B" again
 *
 * No bounds checking is done on 's': the caller must provide a buffer large
 * enough for the result (the callers in this file use 64 bytes). */
void bytesToHuman(char *s, unsigned long long n) {
    const unsigned long long kib = 1024ULL;
    const unsigned long long mib = kib * 1024;
    const unsigned long long gib = mib * 1024;
    const unsigned long long tib = gib * 1024;
    const unsigned long long pib = tib * 1024;
    const unsigned long long eib = pib * 1024;

    if (n < kib) {
        /* Plain bytes. */
        sprintf(s,"%lluB",n);
        return;
    }
    if (n < mib) {
        sprintf(s,"%.2fK",(double)n/kib);
        return;
    }
    if (n < gib) {
        sprintf(s,"%.2fM",(double)n/mib);
        return;
    }
    if (n < tib) {
        sprintf(s,"%.2fG",(double)n/gib);
        return;
    }
    if (n < pib) {
        sprintf(s,"%.2fT",(double)n/tib);
        return;
    }
    if (n < eib) {
        sprintf(s,"%.2fP",(double)n/pib);
        return;
    }
    /* Let's hope we never need this */
    sprintf(s,"%lluB",n);
}

/* Create the string returned by the INFO command. This is decoupled
 * by the INFO command itself as we need to report the same information
 * on memory corruption problems. */
/* genRedisInfoString() builds the text of the INFO reply.
 *
 * 'section' selects what is emitted (compared case-insensitively):
 *   NULL or "default"  every section except "commandstats";
 *   "all"              every section, "commandstats" included;
 *   any other name     only the section with that name, if any.
 *
 * Every section starts with a "# Name" line, and sections after the first
 * are preceded by an empty line. Returns a new sds string. As a side effect
 * server.stat_peak_memory may be raised when the memory section is built. */
sds genRedisInfoString(char *section) {
    sds info = sdsempty();
    time_t uptime = server.unixtime-server.stat_starttime;
    int j, numcommands;
    struct rusage self_ru, c_ru;
    unsigned long lol, bib; /* Filled by getClientsMaxBuffers(): reported as
                             * client_longest_output_list and
                             * client_biggest_input_buf. */
    int allsections = 0, defsections = 0;
    int sections = 0; /* Number of sections emitted so far. */

    if (section == NULL) section = "default";
    allsections = strcasecmp(section,"all") == 0;
    defsections = strcasecmp(section,"default") == 0;

    /* These are collected unconditionally, even if the sections using
     * them are not requested. */
    getrusage(RUSAGE_SELF, &self_ru);
    getrusage(RUSAGE_CHILDREN, &c_ru);
    getClientsMaxBuffers(&lol,&bib);

    /* Server */
    if (allsections || defsections || !strcasecmp(section,"server")) {
        static int call_uname = 1;
        static struct utsname name;
        char *mode;

        if (server.cluster_enabled) mode = "cluster";
        else if (server.sentinel_mode) mode = "sentinel";
        else mode = "standalone";

        if (sections++) info = sdscat(info,"\r\n");

        if (call_uname) {
            /* Uname can be slow and is always the same output. Cache it. */
            uname(&name);
            call_uname = 0;
        }

        /* The arguments below are positional: each one must stay aligned
         * with its conversion in the format string. */
        info = sdscatprintf(info,
            "# Server\r\n"
            "redis_version:%s\r\n"
            "redis_git_sha1:%s\r\n"
            "redis_git_dirty:%d\r\n"
            "redis_build_id:%llx\r\n"
            "redis_mode:%s\r\n"
            "os:%s %s %s\r\n"
            "arch_bits:%d\r\n"
            "multiplexing_api:%s\r\n"
            "gcc_version:%d.%d.%d\r\n"
            "process_id:%ld\r\n"
            "run_id:%s\r\n"
            "tcp_port:%d\r\n"
            "uptime_in_seconds:%jd\r\n"
            "uptime_in_days:%jd\r\n"
            "hz:%d\r\n"
            "lru_clock:%ld\r\n"
            "executable:%s\r\n"
            "config_file:%s\r\n",
            REDIS_VERSION,
            redisGitSHA1(),
            strtol(redisGitDirty(),NULL,10) > 0,
            (unsigned long long) redisBuildId(),
            mode,
            name.sysname, name.release, name.machine,
            server.arch_bits,
            aeGetApiName(),
#ifdef __GNUC__
            __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
#else
            0,0,0,
#endif
            (long) getpid(),
            server.runid,
            server.port,
            (intmax_t)uptime,
            (intmax_t)(uptime/(3600*24)),
            server.hz,
            (unsigned long) server.lruclock,
            server.executable ? server.executable : "",
            server.configfile ? server.configfile : "");
    }

    /* Clients */
    if (allsections || defsections || !strcasecmp(section,"clients")) {
        if (sections++) info = sdscat(info,"\r\n");
        /* connected_clients does not count the clients that are in the
         * slaves list. */
        info = sdscatprintf(info,
            "# Clients\r\n"
            "connected_clients:%lu\r\n"
            "client_longest_output_list:%lu\r\n"
            "client_biggest_input_buf:%lu\r\n"
            "blocked_clients:%d\r\n",
            listLength(server.clients)-listLength(server.slaves),
            lol, bib,
            server.bpop_blocked_clients);
    }

    /* Memory */
    if (allsections || defsections || !strcasecmp(section,"memory")) {
        char hmem[64];
        char peak_hmem[64];
        char total_system_hmem[64];
        char used_memory_lua_hmem[64];
        char used_memory_rss_hmem[64];
        char maxmemory_hmem[64];
        size_t zmalloc_used = zmalloc_used_memory();
        size_t total_system_mem = server.system_memory_size;
        const char *evict_policy = maxmemoryToString();
        /* LUA_GCCOUNT is scaled by 1024 to obtain bytes. */
        long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;

        /* Peak memory is updated from time to time by serverCron() so it
         * may happen that the instantaneous value is slightly bigger than
         * the peak value. This may confuse users, so we update the peak
         * if found smaller than the current memory usage. */
        if (zmalloc_used > server.stat_peak_memory)
            server.stat_peak_memory = zmalloc_used;

        /* Human readable versions of the raw figures printed below. */
        bytesToHuman(hmem,zmalloc_used);
        bytesToHuman(peak_hmem,server.stat_peak_memory);
        bytesToHuman(total_system_hmem,total_system_mem);
        bytesToHuman(used_memory_lua_hmem,memory_lua);
        bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
        bytesToHuman(maxmemory_hmem,server.maxmemory);

        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info,
            "# Memory\r\n"
            "used_memory:%zu\r\n"
            "used_memory_human:%s\r\n"
            "used_memory_rss:%zu\r\n"
            "used_memory_rss_human:%s\r\n"
            "used_memory_peak:%zu\r\n"
            "used_memory_peak_human:%s\r\n"
            "total_system_memory:%lu\r\n"
            "total_system_memory_human:%s\r\n"
            "used_memory_lua:%lld\r\n"
            "used_memory_lua_human:%s\r\n"
            "maxmemory:%lld\r\n"
            "maxmemory_human:%s\r\n"
            "maxmemory_policy:%s\r\n"
            "mem_fragmentation_ratio:%.2f\r\n"
            "mem_allocator:%s\r\n",
            zmalloc_used,
            hmem,
            server.resident_set_size,
            used_memory_rss_hmem,
            server.stat_peak_memory,
            peak_hmem,
            (unsigned long)total_system_mem,
            total_system_hmem,
            memory_lua,
            used_memory_lua_hmem,
            server.maxmemory,
            maxmemory_hmem,
            evict_policy,
            zmalloc_get_fragmentation_ratio(server.resident_set_size),
            ZMALLOC_LIB
            );
    }

    /* Persistence */
    if (allsections || defsections || !strcasecmp(section,"persistence")) {
        if (sections++) info = sdscat(info,"\r\n");
        /* The two "current ... time_sec" fields are -1 when the related
         * child process is not running. */
        info = sdscatprintf(info,
            "# Persistence\r\n"
            "loading:%d\r\n"
            "rdb_changes_since_last_save:%lld\r\n"
            "rdb_bgsave_in_progress:%d\r\n"
            "rdb_last_save_time:%jd\r\n"
            "rdb_last_bgsave_status:%s\r\n"
            "rdb_last_bgsave_time_sec:%jd\r\n"
            "rdb_current_bgsave_time_sec:%jd\r\n"
            "aof_enabled:%d\r\n"
            "aof_rewrite_in_progress:%d\r\n"
            "aof_rewrite_scheduled:%d\r\n"
            "aof_last_rewrite_time_sec:%jd\r\n"
            "aof_current_rewrite_time_sec:%jd\r\n"
            "aof_last_bgrewrite_status:%s\r\n"
            "aof_last_write_status:%s\r\n",
            server.loading,
            server.dirty,
            server.rdb_child_pid != -1,
            (intmax_t)server.lastsave,
            (server.lastbgsave_status == C_OK) ? "ok" : "err",
            (intmax_t)server.rdb_save_time_last,
            (intmax_t)((server.rdb_child_pid == -1) ?
                -1 : time(NULL)-server.rdb_save_time_start),
            server.aof_state != AOF_OFF,
            server.aof_child_pid != -1,
            server.aof_rewrite_scheduled,
            (intmax_t)server.aof_rewrite_time_last,
            (intmax_t)((server.aof_child_pid == -1) ?
                -1 : time(NULL)-server.aof_rewrite_time_start),
            (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
            (server.aof_last_write_status == C_OK) ? "ok" : "err");

        /* Extra AOF fields, only when AOF is not off. */
        if (server.aof_state != AOF_OFF) {
            info = sdscatprintf(info,
                "aof_current_size:%lld\r\n"
                "aof_base_size:%lld\r\n"
                "aof_pending_rewrite:%d\r\n"
                "aof_buffer_length:%zu\r\n"
                "aof_rewrite_buffer_length:%lu\r\n"
                "aof_pending_bio_fsync:%llu\r\n"
                "aof_delayed_fsync:%lu\r\n",
                (long long) server.aof_current_size,
                (long long) server.aof_rewrite_base_size,
                server.aof_rewrite_scheduled,
                sdslen(server.aof_buf),
                aofRewriteBufferSize(),
                bioPendingJobsOfType(BIO_AOF_FSYNC),
                server.aof_delayed_fsync);
        }

        /* Loading progress, only while a dataset is being loaded. */
        if (server.loading) {
            double perc;
            time_t eta, elapsed;
            off_t remaining_bytes = server.loading_total_bytes-
                                    server.loading_loaded_bytes;

            /* The +1 in the divisors avoids a division by zero. */
            perc = ((double)server.loading_loaded_bytes /
                   (server.loading_total_bytes+1)) * 100;

            elapsed = time(NULL)-server.loading_start_time;
            if (elapsed == 0) {
                eta = 1; /* A fake 1 second figure if we don't have
                            enough info */
            } else {
                /* Linear estimate: remaining bytes at the average speed
                 * observed so far. */
                eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
            }

            info = sdscatprintf(info,
                "loading_start_time:%jd\r\n"
                "loading_total_bytes:%llu\r\n"
                "loading_loaded_bytes:%llu\r\n"
                "loading_loaded_perc:%.2f\r\n"
                "loading_eta_seconds:%jd\r\n",
                (intmax_t) server.loading_start_time,
                (unsigned long long) server.loading_total_bytes,
                (unsigned long long) server.loading_loaded_bytes,
                perc,
                (intmax_t)eta
            );
        }
    }

    /* Stats */
    if (allsections || defsections || !strcasecmp(section,"stats")) {
        if (sections++) info = sdscat(info,"\r\n");
        /* The two kbps figures are the instantaneous network metrics
         * divided by 1024. */
        info = sdscatprintf(info,
            "# Stats\r\n"
            "total_connections_received:%lld\r\n"
            "total_commands_processed:%lld\r\n"
            "instantaneous_ops_per_sec:%lld\r\n"
            "total_net_input_bytes:%lld\r\n"
            "total_net_output_bytes:%lld\r\n"
            "instantaneous_input_kbps:%.2f\r\n"
            "instantaneous_output_kbps:%.2f\r\n"
            "rejected_connections:%lld\r\n"
            "sync_full:%lld\r\n"
            "sync_partial_ok:%lld\r\n"
            "sync_partial_err:%lld\r\n"
            "expired_keys:%lld\r\n"
            "evicted_keys:%lld\r\n"
            "keyspace_hits:%lld\r\n"
            "keyspace_misses:%lld\r\n"
            "pubsub_channels:%ld\r\n"
            "pubsub_patterns:%lu\r\n"
            "latest_fork_usec:%lld\r\n"
            "migrate_cached_sockets:%ld\r\n",
            server.stat_numconnections,
            server.stat_numcommands,
            getInstantaneousMetric(STATS_METRIC_COMMAND),
            server.stat_net_input_bytes,
            server.stat_net_output_bytes,
            (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
            (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
            server.stat_rejected_conn,
            server.stat_sync_full,
            server.stat_sync_partial_ok,
            server.stat_sync_partial_err,
            server.stat_expiredkeys,
            server.stat_evictedkeys,
            server.stat_keyspace_hits,
            server.stat_keyspace_misses,
            dictSize(server.pubsub_channels),
            listLength(server.pubsub_patterns),
            server.stat_fork_time,
            dictSize(server.migrate_cached_sockets));
    }

    /* Replication */
    if (allsections || defsections || !strcasecmp(section,"replication")) {
        if (sections++) info = sdscat(info,"\r\n");
        /* The role is "slave" whenever a master host is configured. */
        info = sdscatprintf(info,
            "# Replication\r\n"
            "role:%s\r\n",
            server.masterhost == NULL ? "master" : "slave");
        if (server.masterhost) {
            /* Offset reported when there is neither a master client nor a
             * cached master to read it from. */
            long long slave_repl_offset = 1;

            if (server.master)
                slave_repl_offset = server.master->reploff;
            else if (server.cached_master)
                slave_repl_offset = server.cached_master->reploff;

            /* master_last_io_seconds_ago is -1 without a master client. */
            info = sdscatprintf(info,
                "master_host:%s\r\n"
                "master_port:%d\r\n"
                "master_link_status:%s\r\n"
                "master_last_io_seconds_ago:%d\r\n"
                "master_sync_in_progress:%d\r\n"
                "slave_repl_offset:%lld\r\n"
                ,server.masterhost,
                server.masterport,
                (server.repl_state == REPL_STATE_CONNECTED) ?
                    "up" : "down",
                server.master ?
                ((int)(server.unixtime-server.master->lastinteraction)) : -1,
                server.repl_state == REPL_STATE_TRANSFER,
                slave_repl_offset
            );

            /* Transfer progress, only while in the transfer state. */
            if (server.repl_state == REPL_STATE_TRANSFER) {
                info = sdscatprintf(info,
                    "master_sync_left_bytes:%lld\r\n"
                    "master_sync_last_io_seconds_ago:%d\r\n"
                    , (long long)
                    (server.repl_transfer_size - server.repl_transfer_read),
                    (int)(server.unixtime-server.repl_transfer_lastio)
                );
            }

            if (server.repl_state != REPL_STATE_CONNECTED) {
                info = sdscatprintf(info,
                    "master_link_down_since_seconds:%jd\r\n",
                    (intmax_t)server.unixtime-server.repl_down_since);
            }
            info = sdscatprintf(info,
                "slave_priority:%d\r\n"
                "slave_read_only:%d\r\n",
                server.slave_priority,
                server.repl_slave_ro);
        }

        info = sdscatprintf(info,
            "connected_slaves:%lu\r\n",
            listLength(server.slaves));

        /* If min-slaves-to-write is active, write the number of slaves
         * currently considered 'good'. */
        if (server.repl_min_slaves_to_write &&
            server.repl_min_slaves_max_lag) {
            info = sdscatprintf(info,
                "min_slaves_good_slaves:%d\r\n",
                server.repl_good_slaves_count);
        }

        /* One "slaveN:" line per attached slave. */
        if (listLength(server.slaves)) {
            int slaveid = 0;
            listNode *ln;
            listIter li;

            listRewind(server.slaves,&li);
            while((ln = listNext(&li))) {
                client *slave = listNodeValue(ln);
                char *state = NULL;
                char ip[NET_IP_STR_LEN];
                int port; /* Set by anetPeerToString() but not printed: the
                           * line reports slave_listening_port instead. */
                long lag = 0;

                /* Slaves whose peer address can't be obtained are skipped
                 * and do not consume a slave id. */
                if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue;
                switch(slave->replstate) {
                case SLAVE_STATE_WAIT_BGSAVE_START:
                case SLAVE_STATE_WAIT_BGSAVE_END:
                    state = "wait_bgsave";
                    break;
                case SLAVE_STATE_SEND_BULK:
                    state = "send_bulk";
                    break;
                case SLAVE_STATE_ONLINE:
                    state = "online";
                    break;
                }
                /* Any other replication state is not reported. */
                if (state == NULL) continue;
                /* The lag is only computed for online slaves, otherwise
                 * it is reported as 0. */
                if (slave->replstate == SLAVE_STATE_ONLINE)
                    lag = time(NULL) - slave->repl_ack_time;

                info = sdscatprintf(info,
                    "slave%d:ip=%s,port=%d,state=%s,"
                    "offset=%lld,lag=%ld\r\n",
                    slaveid,ip,slave->slave_listening_port,state,
                    slave->repl_ack_off, lag);
                slaveid++;
            }
        }
        info = sdscatprintf(info,
            "master_repl_offset:%lld\r\n"
            "repl_backlog_active:%d\r\n"
            "repl_backlog_size:%lld\r\n"
            "repl_backlog_first_byte_offset:%lld\r\n"
            "repl_backlog_histlen:%lld\r\n",
            server.master_repl_offset,
            server.repl_backlog != NULL,
            server.repl_backlog_size,
            server.repl_backlog_off,
            server.repl_backlog_histlen);
    }

    /* CPU */
    if (allsections || defsections || !strcasecmp(section,"cpu")) {
        if (sections++) info = sdscat(info,"\r\n");
        /* Seconds plus microseconds/1000000, for this process and for its
         * children, as returned by getrusage() above. */
        info = sdscatprintf(info,
        "# CPU\r\n"
        "used_cpu_sys:%.2f\r\n"
        "used_cpu_user:%.2f\r\n"
        "used_cpu_sys_children:%.2f\r\n"
        "used_cpu_user_children:%.2f\r\n",
        (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
        (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
        (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
        (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000);
    }

    /* Command statistics. Note: this section is not part of "default", it
     * is emitted only for "all" or when requested by name. */
    if (allsections || !strcasecmp(section,"commandstats")) {
        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info, "# Commandstats\r\n");
        numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
        for (j = 0; j < numcommands; j++) {
            struct redisCommand *c = redisCommandTable+j;

            /* Commands never called are not listed. */
            if (!c->calls) continue;
            info = sdscatprintf(info,
                "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
                c->name, c->calls, c->microseconds,
                (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
        }
    }

    /* Cluster */
    if (allsections || defsections || !strcasecmp(section,"cluster")) {
        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info,
        "# Cluster\r\n"
        "cluster_enabled:%d\r\n",
        server.cluster_enabled);
    }

    /* Key space: one line per database holding at least one key (or at
     * least one entry in its expires dictionary). */
    if (allsections || defsections || !strcasecmp(section,"keyspace")) {
        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info, "# Keyspace\r\n");
        for (j = 0; j < server.dbnum; j++) {
            long long keys, vkeys;

            keys = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (keys || vkeys) {
                info = sdscatprintf(info,
                    "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
                    j, keys, vkeys, server.db[j].avg_ttl);
            }
        }
    }
    return info;
}

/* INFO [section]: reply with the string built by genRedisInfoString().
 * More than one argument is a syntax error. */
void infoCommand(client *c) {
    /* With more than one argument 'section' is set to "default" here, but
     * it is never used since the check below returns first. */
    char *section = c->argc == 2 ? c->argv[1]->ptr : "default";

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }
    addReplyBulkSds(c, genRedisInfoString(section));
}

/* MONITOR: flag the client as both slave and monitor, and append it to the
 * server.monitors list. */
void monitorCommand(client *c) {
    /* ignore MONITOR if already slave or in monitor mode */
    if (c->flags & CLIENT_SLAVE) return;

    c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
    listAddNodeTail(server.monitors,c);
    addReply(c,shared.ok);
}

/* ============================ Maxmemory directive  ======================== */

/* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
 * file to limit the max memory used by the server, before processing a
 * command.
 *
 * The goal of the function is to free enough memory to keep Redis under the
 * configured memory limit.
 *
 * The function starts calculating how many bytes should be freed to keep
 * Redis under the limit, and enters a loop selecting the best keys to
 * evict accordingly to the configured policy.
 *
 * If all the bytes needed to return back under the limit were freed the
 * function returns C_OK, otherwise C_ERR is returned, and the caller
 * should block the execution of commands that will result in more memory
 * used by the server.
 *
 * ------------------------------------------------------------------------
 *
 * LRU approximation algorithm
 *
 * Redis uses an approximation of the LRU algorithm that runs in constant
 * memory. Every time there is a key to expire, we sample N keys (with
 * N very small, usually around 5) to populate a pool of best keys to
 * evict of M keys (the pool size is defined by MAXMEMORY_EVICTION_POOL_SIZE).
 *
 * The N keys sampled are added in the pool of good keys to expire (the one
 * with an old access time) if they are better than one of the current keys
 * in the pool.
 *
 * After the pool is populated, the best key we have in the pool is expired.
 * However note that we don't remove keys from the pool when they are deleted
 * so the pool may contain keys that no longer exist.
 *
 * When we try to evict a key, and all the entries in the pool don't exist
 * we populate it again. This time we'll be sure that the pool has at least
 * one key that can be evicted, if there is at least one key that can be
 * evicted in the whole database. */

/* Create a new eviction pool: an array of MAXMEMORY_EVICTION_POOL_SIZE
 * entries, all of them empty (NULL key, zero idle time). */
struct evictionPoolEntry *evictionPoolAlloc(void) {
    struct evictionPoolEntry *ep;
    int j;

    ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
    for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
        ep[j].idle = 0;
        ep[j].key = NULL;
    }
    return ep;
}

/* This is an helper function for freeMemoryIfNeeded(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time greater than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys on place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right.
 *
 * 'sampledict' is the dictionary keys are sampled from, 'keydict' the one
 * used to fetch the value object of each sampled key. The pool stores its
 * own copy of every key name (sdsdup), released with sdsfree() when the
 * entry is discarded. */

#define EVICTION_SAMPLES_ARRAY_SIZE 16
void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
    dictEntry **samples;

    /* Try to use a static buffer: this function is a big hit...
     * Note: it was actually measured that this helps. */
    if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
        samples = _samples;
    } else {
        samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
    }

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (sampledict != keydict) de = dictFind(keydict, key);
        o = dictGetVal(de);
        idle = estimateObjectIdleTime(o);

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket whose idle time is not smaller than our idle time. */
        k = 0;
        while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is not more idle than the least
             * idle element we have and there are no empty buckets. */
            continue;
        } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * with an idle time >= the one of the element to insert, or is
             * one past the end of the pool. */
            if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
            }
        }
        /* Slot k is now free to use: store a private copy of the key. */
        pool[k].key = sdsdup(key);
        pool[k].idle = idle;
    }
    /* Release the samples array only if it was heap allocated. */
    if (samples != _samples) zfree(samples);
}

int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);
    mstime_t latency, eviction_latency;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = zmalloc_used_memory();
    if (slaves) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = listNodeValue(ln);
            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
            if (obuf_bytes > mem_used)
                mem_used = 0;
            else
                mem_used -= obuf_bytes;
        }
    }
    if (server.aof_state != AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        return C_ERR; /* We need to free memory, but policy forbids. */

    /* Compute how much memory we need to free.
*/ 3445 mem_tofree = mem_used - server.maxmemory; 3446 mem_freed = 0; 3447 latencyStartMonitor(latency); 3448 while (mem_freed < mem_tofree) { 3449 int j, k, keys_freed = 0; 3450 3451 for (j = 0; j < server.dbnum; j++) { 3452 long bestval = 0; /* just to prevent warning */ 3453 sds bestkey = NULL; 3454 dictEntry *de; 3455 redisDb *db = server.db+j; 3456 dict *dict; 3457 3458 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU || 3459 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) 3460 { 3461 dict = server.db[j].dict; 3462 } else { 3463 dict = server.db[j].expires; 3464 } 3465 if (dictSize(dict) == 0) continue; 3466 3467 /* volatile-random and allkeys-random policy */ 3468 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || 3469 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) 3470 { 3471 de = dictGetRandomKey(dict); 3472 bestkey = dictGetKey(de); 3473 } 3474 3475 /* volatile-lru and allkeys-lru policy */ 3476 else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU || 3477 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU) 3478 { 3479 struct evictionPoolEntry *pool = db->eviction_pool; 3480 3481 while(bestkey == NULL) { 3482 evictionPoolPopulate(dict, db->dict, db->eviction_pool); 3483 /* Go backward from best to worst element to evict. */ 3484 for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) { 3485 if (pool[k].key == NULL) continue; 3486 de = dictFind(dict,pool[k].key); 3487 3488 /* Remove the entry from the pool. */ 3489 sdsfree(pool[k].key); 3490 /* Shift all elements on its right to left. */ 3491 memmove(pool+k,pool+k+1, 3492 sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1)); 3493 /* Clear the element on the right which is empty 3494 * since we shifted one position to the left. */ 3495 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL; 3496 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0; 3497 3498 /* If the key exists, is our pick. Otherwise it is 3499 * a ghost and we need to try the next element. 
*/ 3500 if (de) { 3501 bestkey = dictGetKey(de); 3502 break; 3503 } else { 3504 /* Ghost... */ 3505 continue; 3506 } 3507 } 3508 } 3509 } 3510 3511 /* volatile-ttl */ 3512 else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { 3513 for (k = 0; k < server.maxmemory_samples; k++) { 3514 sds thiskey; 3515 long thisval; 3516 3517 de = dictGetRandomKey(dict); 3518 thiskey = dictGetKey(de); 3519 thisval = (long) dictGetVal(de); 3520 3521 /* Expire sooner (minor expire unix timestamp) is better 3522 * candidate for deletion */ 3523 if (bestkey == NULL || thisval < bestval) { 3524 bestkey = thiskey; 3525 bestval = thisval; 3526 } 3527 } 3528 } 3529 3530 /* Finally remove the selected key. */ 3531 if (bestkey) { 3532 long long delta; 3533 3534 robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); 3535 propagateExpire(db,keyobj); 3536 /* We compute the amount of memory freed by dbDelete() alone. 3537 * It is possible that actually the memory needed to propagate 3538 * the DEL in AOF and replication link is greater than the one 3539 * we are freeing removing the key, but we can't account for 3540 * that otherwise we would never exit the loop. 3541 * 3542 * AOF and Output buffer memory will be freed eventually so 3543 * we only care about memory used by the key space. 
*/ 3544 delta = (long long) zmalloc_used_memory(); 3545 latencyStartMonitor(eviction_latency); 3546 dbDelete(db,keyobj); 3547 latencyEndMonitor(eviction_latency); 3548 latencyAddSampleIfNeeded("eviction-del",eviction_latency); 3549 latencyRemoveNestedEvent(latency,eviction_latency); 3550 delta -= (long long) zmalloc_used_memory(); 3551 mem_freed += delta; 3552 server.stat_evictedkeys++; 3553 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", 3554 keyobj, db->id); 3555 decrRefCount(keyobj); 3556 keys_freed++; 3557 3558 /* When the memory to free starts to be big enough, we may 3559 * start spending so much time here that is impossible to 3560 * deliver data to the slaves fast enough, so we force the 3561 * transmission here inside the loop. */ 3562 if (slaves) flushSlavesOutputBuffers(); 3563 } 3564 } 3565 if (!keys_freed) { 3566 latencyEndMonitor(latency); 3567 latencyAddSampleIfNeeded("eviction-cycle",latency); 3568 return C_ERR; /* nothing to free... */ 3569 } 3570 } 3571 latencyEndMonitor(latency); 3572 latencyAddSampleIfNeeded("eviction-cycle",latency); 3573 return C_OK; 3574 } 3575 3576 /* =================================== Main! ================================ */ 3577 3578 #ifdef __linux__ 3579 int linuxOvercommitMemoryValue(void) { 3580 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r"); 3581 char buf[64]; 3582 3583 if (!fp) return -1; 3584 if (fgets(buf,64,fp) == NULL) { 3585 fclose(fp); 3586 return -1; 3587 } 3588 fclose(fp); 3589 3590 return atoi(buf); 3591 } 3592 3593 void linuxMemoryWarnings(void) { 3594 if (linuxOvercommitMemoryValue() == 0) { 3595 serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); 3596 } 3597 if (THPIsEnabled()) { 3598 serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled."); 3599 } 3600 } 3601 #endif /* __linux__ */ 3602 3603 void createPidFile(void) { 3604 /* If pidfile requested, but no pidfile defined, use 3605 * default pidfile path */ 3606 if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE); 3607 3608 /* Try to write the pid file in a best-effort way. */ 3609 FILE *fp = fopen(server.pidfile,"w"); 3610 if (fp) { 3611 fprintf(fp,"%d\n",(int)getpid()); 3612 fclose(fp); 3613 } 3614 } 3615 3616 void daemonize(void) { 3617 int fd; 3618 3619 if (fork() != 0) exit(0); /* parent exits */ 3620 setsid(); /* create a new session */ 3621 3622 /* Every output goes to /dev/null. If Redis is daemonized but 3623 * the 'logfile' is set to 'stdout' in the configuration file 3624 * it will not log at all. */ 3625 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { 3626 dup2(fd, STDIN_FILENO); 3627 dup2(fd, STDOUT_FILENO); 3628 dup2(fd, STDERR_FILENO); 3629 if (fd > STDERR_FILENO) close(fd); 3630 } 3631 } 3632 3633 void version(void) { 3634 printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n", 3635 REDIS_VERSION, 3636 redisGitSHA1(), 3637 atoi(redisGitDirty()) > 0, 3638 ZMALLOC_LIB, 3639 sizeof(long) == 4 ? 
32 : 64, 3640 (unsigned long long) redisBuildId()); 3641 exit(0); 3642 } 3643 3644 void usage(void) { 3645 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n"); 3646 fprintf(stderr," ./redis-server - (read config from stdin)\n"); 3647 fprintf(stderr," ./redis-server -v or --version\n"); 3648 fprintf(stderr," ./redis-server -h or --help\n"); 3649 fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n"); 3650 fprintf(stderr,"Examples:\n"); 3651 fprintf(stderr," ./redis-server (run the server with default conf)\n"); 3652 fprintf(stderr," ./redis-server /etc/redis/6379.conf\n"); 3653 fprintf(stderr," ./redis-server --port 7777\n"); 3654 fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n"); 3655 fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n"); 3656 fprintf(stderr,"Sentinel mode:\n"); 3657 fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n"); 3658 exit(1); 3659 } 3660 3661 void redisAsciiArt(void) { 3662 #include "asciilogo.h" 3663 char *buf = zmalloc(1024*16); 3664 char *mode; 3665 3666 if (server.cluster_enabled) mode = "cluster"; 3667 else if (server.sentinel_mode) mode = "sentinel"; 3668 else mode = "standalone"; 3669 3670 if (server.syslog_enabled) { 3671 serverLog(LL_NOTICE, 3672 "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.", 3673 REDIS_VERSION, 3674 redisGitSHA1(), 3675 strtol(redisGitDirty(),NULL,10) > 0, 3676 (sizeof(long) == 8) ? "64" : "32", 3677 mode, server.port, 3678 (long) getpid() 3679 ); 3680 } else { 3681 snprintf(buf,1024*16,ascii_logo, 3682 REDIS_VERSION, 3683 redisGitSHA1(), 3684 strtol(redisGitDirty(),NULL,10) > 0, 3685 (sizeof(long) == 8) ? 
"64" : "32", 3686 mode, server.port, 3687 (long) getpid() 3688 ); 3689 serverLogRaw(LL_NOTICE|LL_RAW,buf); 3690 } 3691 zfree(buf); 3692 } 3693 3694 static void sigShutdownHandler(int sig) { 3695 char *msg; 3696 3697 switch (sig) { 3698 case SIGINT: 3699 msg = "Received SIGINT scheduling shutdown..."; 3700 break; 3701 case SIGTERM: 3702 msg = "Received SIGTERM scheduling shutdown..."; 3703 break; 3704 default: 3705 msg = "Received shutdown signal, scheduling shutdown..."; 3706 }; 3707 3708 /* SIGINT is often delivered via Ctrl+C in an interactive session. 3709 * If we receive the signal the second time, we interpret this as 3710 * the user really wanting to quit ASAP without waiting to persist 3711 * on disk. */ 3712 if (server.shutdown_asap && sig == SIGINT) { 3713 serverLogFromHandler(LL_WARNING, "You insist... exiting now."); 3714 rdbRemoveTempFile(getpid()); 3715 exit(1); /* Exit with an error since this was not a clean shutdown. */ 3716 } else if (server.loading) { 3717 exit(0); 3718 } 3719 3720 serverLogFromHandler(LL_WARNING, msg); 3721 server.shutdown_asap = 1; 3722 } 3723 3724 void setupSignalHandlers(void) { 3725 struct sigaction act; 3726 3727 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. 3728 * Otherwise, sa_handler is used. */ 3729 sigemptyset(&act.sa_mask); 3730 act.sa_flags = 0; 3731 act.sa_handler = sigShutdownHandler; 3732 sigaction(SIGTERM, &act, NULL); 3733 sigaction(SIGINT, &act, NULL); 3734 3735 #ifdef HAVE_BACKTRACE 3736 sigemptyset(&act.sa_mask); 3737 act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO; 3738 act.sa_sigaction = sigsegvHandler; 3739 sigaction(SIGSEGV, &act, NULL); 3740 sigaction(SIGBUS, &act, NULL); 3741 sigaction(SIGFPE, &act, NULL); 3742 sigaction(SIGILL, &act, NULL); 3743 #endif 3744 return; 3745 } 3746 3747 void memtest(size_t megabytes, int passes); 3748 3749 /* Returns 1 if there is --sentinel among the arguments or if 3750 * argv[0] is exactly "redis-sentinel". 
*/ 3751 int checkForSentinelMode(int argc, char **argv) { 3752 int j; 3753 3754 if (strstr(argv[0],"redis-sentinel") != NULL) return 1; 3755 for (j = 1; j < argc; j++) 3756 if (!strcmp(argv[j],"--sentinel")) return 1; 3757 return 0; 3758 } 3759 3760 /* Function called at startup to load RDB or AOF file in memory. */ 3761 void loadDataFromDisk(void) { 3762 long long start = ustime(); 3763 if (server.aof_state == AOF_ON) { 3764 if (loadAppendOnlyFile(server.aof_filename) == C_OK) 3765 serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); 3766 } else { 3767 if (rdbLoad(server.rdb_filename) == C_OK) { 3768 serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", 3769 (float)(ustime()-start)/1000000); 3770 } else if (errno != ENOENT) { 3771 serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); 3772 exit(1); 3773 } 3774 } 3775 } 3776 3777 void redisOutOfMemoryHandler(size_t allocation_size) { 3778 serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!", 3779 allocation_size); 3780 serverPanic("Redis aborting for OUT OF MEMORY"); 3781 } 3782 3783 void redisSetProcTitle(char *title) { 3784 #ifdef USE_SETPROCTITLE 3785 char *server_mode = ""; 3786 if (server.cluster_enabled) server_mode = " [cluster]"; 3787 else if (server.sentinel_mode) server_mode = " [sentinel]"; 3788 3789 setproctitle("%s %s:%d%s", 3790 title, 3791 server.bindaddr_count ? server.bindaddr[0] : "*", 3792 server.port, 3793 server_mode); 3794 #else 3795 UNUSED(title); 3796 #endif 3797 } 3798 3799 /* 3800 * Check whether systemd or upstart have been used to start redis. 
3801 */ 3802 3803 int redisSupervisedUpstart(void) { 3804 const char *upstart_job = getenv("UPSTART_JOB"); 3805 3806 if (!upstart_job) { 3807 serverLog(LL_WARNING, 3808 "upstart supervision requested, but UPSTART_JOB not found"); 3809 return 0; 3810 } 3811 3812 serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readyness"); 3813 raise(SIGSTOP); 3814 unsetenv("UPSTART_JOB"); 3815 return 1; 3816 } 3817 3818 int redisSupervisedSystemd(void) { 3819 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3820 int fd = 1; 3821 struct sockaddr_un su; 3822 struct iovec iov; 3823 struct msghdr hdr; 3824 int sendto_flags = 0; 3825 3826 if (!notify_socket) { 3827 serverLog(LL_WARNING, 3828 "systemd supervision requested, but NOTIFY_SOCKET not found"); 3829 return 0; 3830 } 3831 3832 if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) { 3833 return 0; 3834 } 3835 3836 serverLog(LL_NOTICE, "supervised by systemd, will signal readyness"); 3837 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) { 3838 serverLog(LL_WARNING, 3839 "Can't connect to systemd socket %s", notify_socket); 3840 return 0; 3841 } 3842 3843 memset(&su, 0, sizeof(su)); 3844 su.sun_family = AF_UNIX; 3845 strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1); 3846 su.sun_path[sizeof(su.sun_path) - 1] = '\0'; 3847 3848 if (notify_socket[0] == '@') 3849 su.sun_path[0] = '\0'; 3850 3851 memset(&iov, 0, sizeof(iov)); 3852 iov.iov_base = "READY=1"; 3853 iov.iov_len = strlen("READY=1"); 3854 3855 memset(&hdr, 0, sizeof(hdr)); 3856 hdr.msg_name = &su; 3857 hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) + 3858 strlen(notify_socket); 3859 hdr.msg_iov = &iov; 3860 hdr.msg_iovlen = 1; 3861 3862 unsetenv("NOTIFY_SOCKET"); 3863 #ifdef HAVE_MSG_NOSIGNAL 3864 sendto_flags |= MSG_NOSIGNAL; 3865 #endif 3866 if (sendmsg(fd, &hdr, sendto_flags) < 0) { 3867 serverLog(LL_WARNING, "Can't send notification to systemd"); 3868 close(fd); 3869 return 0; 3870 } 3871 close(fd); 3872 return 
1; 3873 } 3874 3875 int redisIsSupervised(int mode) { 3876 if (mode == SUPERVISED_AUTODETECT) { 3877 const char *upstart_job = getenv("UPSTART_JOB"); 3878 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3879 3880 if (upstart_job) { 3881 redisSupervisedUpstart(); 3882 } else if (notify_socket) { 3883 redisSupervisedSystemd(); 3884 } 3885 } else if (mode == SUPERVISED_UPSTART) { 3886 return redisSupervisedUpstart(); 3887 } else if (mode == SUPERVISED_SYSTEMD) { 3888 return redisSupervisedSystemd(); 3889 } 3890 3891 return 0; 3892 } 3893 3894 3895 int main(int argc, char **argv) { 3896 struct timeval tv; 3897 int j; 3898 3899 #ifdef REDIS_TEST 3900 if (argc == 3 && !strcasecmp(argv[1], "test")) { 3901 if (!strcasecmp(argv[2], "ziplist")) { 3902 return ziplistTest(argc, argv); 3903 } else if (!strcasecmp(argv[2], "quicklist")) { 3904 quicklistTest(argc, argv); 3905 } else if (!strcasecmp(argv[2], "intset")) { 3906 return intsetTest(argc, argv); 3907 } else if (!strcasecmp(argv[2], "zipmap")) { 3908 return zipmapTest(argc, argv); 3909 } else if (!strcasecmp(argv[2], "sha1test")) { 3910 return sha1Test(argc, argv); 3911 } else if (!strcasecmp(argv[2], "util")) { 3912 return utilTest(argc, argv); 3913 } else if (!strcasecmp(argv[2], "sds")) { 3914 return sdsTest(argc, argv); 3915 } else if (!strcasecmp(argv[2], "endianconv")) { 3916 return endianconvTest(argc, argv); 3917 } else if (!strcasecmp(argv[2], "crc64")) { 3918 return crc64Test(argc, argv); 3919 } 3920 3921 return -1; /* test not found */ 3922 } 3923 #endif 3924 3925 /* We need to initialize our libraries, and the server configuration. 
*/ 3926 #ifdef INIT_SETPROCTITLE_REPLACEMENT 3927 spt_init(argc, argv); 3928 #endif 3929 setlocale(LC_COLLATE,""); 3930 zmalloc_enable_thread_safeness(); 3931 zmalloc_set_oom_handler(redisOutOfMemoryHandler); 3932 srand(time(NULL)^getpid()); 3933 gettimeofday(&tv,NULL); 3934 dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); 3935 server.sentinel_mode = checkForSentinelMode(argc,argv); 3936 initServerConfig(); 3937 3938 /* Store the executable path and arguments in a safe place in order 3939 * to be able to restart the server later. */ 3940 server.executable = getAbsolutePath(argv[0]); 3941 server.exec_argv = zmalloc(sizeof(char*)*(argc+1)); 3942 server.exec_argv[argc] = NULL; 3943 for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]); 3944 3945 /* We need to init sentinel right now as parsing the configuration file 3946 * in sentinel mode will have the effect of populating the sentinel 3947 * data structures with master nodes to monitor. */ 3948 if (server.sentinel_mode) { 3949 initSentinelConfig(); 3950 initSentinel(); 3951 } 3952 3953 /* Check if we need to start in redis-check-rdb mode. We just execute 3954 * the program main. However the program is part of the Redis executable 3955 * so that we can easily execute an RDB check on loading errors. 
*/ 3956 if (strstr(argv[0],"redis-check-rdb") != NULL) 3957 exit(redis_check_rdb_main(argv,argc)); 3958 3959 if (argc >= 2) { 3960 j = 1; /* First option to parse in argv[] */ 3961 sds options = sdsempty(); 3962 char *configfile = NULL; 3963 3964 /* Handle special options --help and --version */ 3965 if (strcmp(argv[1], "-v") == 0 || 3966 strcmp(argv[1], "--version") == 0) version(); 3967 if (strcmp(argv[1], "--help") == 0 || 3968 strcmp(argv[1], "-h") == 0) usage(); 3969 if (strcmp(argv[1], "--test-memory") == 0) { 3970 if (argc == 3) { 3971 memtest(atoi(argv[2]),50); 3972 exit(0); 3973 } else { 3974 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); 3975 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n"); 3976 exit(1); 3977 } 3978 } 3979 3980 /* First argument is the config file name? */ 3981 if (argv[j][0] != '-' || argv[j][1] != '-') { 3982 configfile = argv[j]; 3983 server.configfile = getAbsolutePath(configfile); 3984 /* Replace the config file in server.exec_argv with 3985 * its absoulte path. */ 3986 zfree(server.exec_argv[j]); 3987 server.exec_argv[j] = zstrdup(server.configfile); 3988 j++; 3989 } 3990 3991 /* All the other options are parsed and conceptually appended to the 3992 * configuration file. For instance --port 6380 will generate the 3993 * string "port 6380\n" to be parsed after the actual file name 3994 * is parsed, if any. */ 3995 while(j != argc) { 3996 if (argv[j][0] == '-' && argv[j][1] == '-') { 3997 /* Option name */ 3998 if (!strcmp(argv[j], "--check-rdb")) { 3999 /* Argument has no options, need to skip for parsing. 
*/ 4000 j++; 4001 continue; 4002 } 4003 if (sdslen(options)) options = sdscat(options,"\n"); 4004 options = sdscat(options,argv[j]+2); 4005 options = sdscat(options," "); 4006 } else { 4007 /* Option argument */ 4008 options = sdscatrepr(options,argv[j],strlen(argv[j])); 4009 options = sdscat(options," "); 4010 } 4011 j++; 4012 } 4013 if (server.sentinel_mode && configfile && *configfile == '-') { 4014 serverLog(LL_WARNING, 4015 "Sentinel config from STDIN not allowed."); 4016 serverLog(LL_WARNING, 4017 "Sentinel needs config file on disk to save state. Exiting..."); 4018 exit(1); 4019 } 4020 resetServerSaveParams(); 4021 loadServerConfig(configfile,options); 4022 sdsfree(options); 4023 } else { 4024 serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); 4025 } 4026 4027 server.supervised = redisIsSupervised(server.supervised_mode); 4028 int background = server.daemonize && !server.supervised; 4029 if (background) daemonize(); 4030 4031 initServer(); 4032 if (background || server.pidfile) createPidFile(); 4033 redisSetProcTitle(argv[0]); 4034 redisAsciiArt(); 4035 checkTcpBacklogSettings(); 4036 4037 if (!server.sentinel_mode) { 4038 /* Things not needed when running in Sentinel mode. */ 4039 serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION); 4040 #ifdef __linux__ 4041 linuxMemoryWarnings(); 4042 #endif 4043 loadDataFromDisk(); 4044 if (server.cluster_enabled) { 4045 if (verifyClusterConfigWithData() == C_ERR) { 4046 serverLog(LL_WARNING, 4047 "You can't have keys in a DB different than DB 0 when in " 4048 "Cluster mode. 
Exiting."); 4049 exit(1); 4050 } 4051 } 4052 if (server.ipfd_count > 0) 4053 serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port); 4054 if (server.sofd > 0) 4055 serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); 4056 } else { 4057 sentinelIsRunning(); 4058 } 4059 4060 /* Warning the user about suspicious maxmemory setting. */ 4061 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { 4062 serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); 4063 } 4064 4065 aeSetBeforeSleepProc(server.el,beforeSleep); 4066 aeMain(server.el); 4067 aeDeleteEventLoop(server.el); 4068 return 0; 4069 } 4070 4071 /* The End */ 4072