1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
*/

#include "server.h"
#include "cluster.h"
#include "slowlog.h"
#include "bio.h"
#include "latency.h"

#include <time.h>
#include <signal.h>
#include <sys/wait.h>
#include <errno.h>
#include <assert.h>
#include <ctype.h>
#include <stdarg.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <limits.h>
#include <float.h>
#include <math.h>
/* NOTE(review): <sys/resource.h> is already included above; this second
 * include is redundant (harmless) and could be dropped. */
#include <sys/resource.h>
#include <sys/utsname.h>
#include <locale.h>
#include <sys/socket.h>

/* Our shared "common" objects. Only the storage is defined here; the type
 * comes from server.h and the objects are presumably created at startup
 * elsewhere in this file (TODO confirm). */

struct sharedObjectsStruct shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

double R_Zero, R_PosInf, R_NegInf, R_Nan;

/*================================= Globals ================================= */

/* Global vars */
struct redisServer server; /* server global state */

/* Our command table.
 *
 * Every entry is composed of the following fields:
 *
 * name: a string representing the command name.
 * function: pointer to the C function implementing the command.
 * arity: number of arguments, counting the command name itself (GET key has
 *        arity 2). It is possible to use -N to say >= N.
 * sflags: command flags as string. See below for a table of flags.
 * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
 * get_keys_proc: an optional function to get key arguments from a command.
 *                This is only used when the following three fields are not
 *                enough to specify what arguments are keys.
 * first_key_index: first argument that is a key (0 for commands without key
 *                  arguments, or whose keys are found by get_keys_proc).
 * last_key_index: last argument that is a key. Negative values count from
 *                 the end of the argument vector: -1 is the last argument,
 *                 -2 the one before it (e.g. BLPOP, whose last argument is
 *                 the timeout rather than a key).
 * key_step: step to get all the keys from first to last argument.
For instance
 *           in MSET the step is two since arguments are key,val,key,val,...
 * microseconds: microseconds of total execution time for this command.
 * calls: total number of calls of this command.
 *
 * The flags, microseconds and calls fields are computed by Redis and should
 * always be set to zero.
 *
 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
 *
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command  (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally only a few commands are accepted in this
 *    condition.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
124 */ 125 struct redisCommand redisCommandTable[] = { 126 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, 127 {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, 128 {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 129 {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0}, 130 {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0}, 131 {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0}, 132 {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0}, 133 {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}, 134 {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0}, 135 {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0}, 136 {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0}, 137 {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0}, 138 {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0}, 139 {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 140 {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 141 {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0}, 142 {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0}, 143 {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0}, 144 {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 145 {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 146 {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 147 {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, 148 {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0}, 149 {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0}, 150 {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0}, 151 {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0}, 152 {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0}, 153 {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, 154 {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0}, 155 {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0}, 156 {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0}, 157 {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0}, 158 {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0}, 159 {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0}, 160 {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0}, 161 {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0}, 162 
{"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0}, 163 {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0}, 164 {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0}, 165 {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0}, 166 {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0}, 167 {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0}, 168 {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 169 {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 170 {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 171 {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 172 {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0}, 173 {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, 174 {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0}, 175 {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 176 {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0}, 177 {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0}, 178 {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0}, 179 {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0}, 180 {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0}, 181 {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0}, 182 {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0}, 183 {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0}, 184 {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, 185 {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0}, 186 {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0}, 187 {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0}, 188 {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0}, 189 {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0}, 190 {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0}, 191 {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, 192 {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0}, 193 {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0}, 194 {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0}, 195 
{"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0}, 196 {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 197 {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0}, 198 {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0}, 199 {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0}, 200 {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0}, 201 {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0}, 202 {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0}, 203 {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0}, 204 {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0}, 205 {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0}, 206 {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0}, 207 {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0}, 208 {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0}, 209 {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0}, 210 {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0}, 211 {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, 212 {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0}, 213 {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0}, 214 {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0}, 215 {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0}, 216 {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0}, 217 {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0}, 218 {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0}, 219 {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0}, 220 {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0}, 221 {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0}, 222 {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0}, 223 {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0}, 224 {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0}, 225 {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0}, 226 {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0}, 227 {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0}, 228 {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0}, 229 {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0}, 230 {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}, 231 
{"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0}, 232 {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0}, 233 {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0}, 234 {"bgsave",bgsaveCommand,1,"a",0,NULL,0,0,0,0,0}, 235 {"bgrewriteaof",bgrewriteaofCommand,1,"a",0,NULL,0,0,0,0,0}, 236 {"shutdown",shutdownCommand,-1,"alt",0,NULL,0,0,0,0,0}, 237 {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0}, 238 {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0}, 239 {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0}, 240 {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, 241 {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0}, 242 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, 243 {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0}, 244 {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0}, 245 {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0}, 246 {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0}, 247 {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0}, 248 {"info",infoCommand,-1,"lt",0,NULL,0,0,0,0,0}, 249 {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0}, 250 {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0}, 251 {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0}, 252 {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0}, 253 {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0}, 254 {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0}, 255 {"debug",debugCommand,-1,"as",0,NULL,0,0,0,0,0}, 256 {"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0}, 257 {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0}, 258 {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0}, 259 {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0}, 260 {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0}, 261 {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0}, 262 {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0}, 263 {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0}, 264 {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0}, 265 {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0}, 266 
{"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0}, 267 {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0}, 268 {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0}, 269 {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0}, 270 {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0}, 271 {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0}, 272 {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0}, 273 {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0}, 274 {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0}, 275 {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, 276 {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0}, 277 {"slowlog",slowlogCommand,-2,"a",0,NULL,0,0,0,0,0}, 278 {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0}, 279 {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0}, 280 {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, 281 {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}, 282 {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0}, 283 {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0}, 284 {"command",commandCommand,0,"lt",0,NULL,0,0,0,0,0}, 285 {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0}, 286 {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0}, 287 {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0}, 288 {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0}, 289 {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0}, 290 {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0}, 291 {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0}, 292 {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0}, 293 {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0}, 294 {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0}, 295 {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}, 296 {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} 297 }; 298 299 struct evictionPoolEntry *evictionPoolAlloc(void); 300 301 /*============================ Utility functions ============================ */ 302 303 /* Low level logging. 
To use only for very big messages, otherwise 304 * serverLog() is to prefer. */ 305 void serverLogRaw(int level, const char *msg) { 306 const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING }; 307 const char *c = ".-*#"; 308 FILE *fp; 309 char buf[64]; 310 int rawmode = (level & LL_RAW); 311 int log_to_stdout = server.logfile[0] == '\0'; 312 313 level &= 0xff; /* clear flags */ 314 if (level < server.verbosity) return; 315 316 fp = log_to_stdout ? stdout : fopen(server.logfile,"a"); 317 if (!fp) return; 318 319 if (rawmode) { 320 fprintf(fp,"%s",msg); 321 } else { 322 int off; 323 struct timeval tv; 324 int role_char; 325 pid_t pid = getpid(); 326 327 gettimeofday(&tv,NULL); 328 off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec)); 329 snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000); 330 if (server.sentinel_mode) { 331 role_char = 'X'; /* Sentinel. */ 332 } else if (pid != server.pid) { 333 role_char = 'C'; /* RDB / AOF writing child. */ 334 } else { 335 role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */ 336 } 337 fprintf(fp,"%d:%c %s %c %s\n", 338 (int)getpid(),role_char, buf,c[level],msg); 339 } 340 fflush(fp); 341 342 if (!log_to_stdout) fclose(fp); 343 if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg); 344 } 345 346 /* Like serverLogRaw() but with printf-alike support. This is the function that 347 * is used across the code. The raw version is only used in order to dump 348 * the INFO output on crash. */ 349 void serverLog(int level, const char *fmt, ...) { 350 va_list ap; 351 char msg[LOG_MAX_LEN]; 352 353 if ((level&0xff) < server.verbosity) return; 354 355 va_start(ap, fmt); 356 vsnprintf(msg, sizeof(msg), fmt, ap); 357 va_end(ap); 358 359 serverLogRaw(level,msg); 360 } 361 362 /* Log a fixed message without printf-alike capabilities, in a way that is 363 * safe to call from a signal handler. 
364 * 365 * We actually use this only for signals that are not fatal from the point 366 * of view of Redis. Signals that are going to kill the server anyway and 367 * where we need printf-alike features are served by serverLog(). */ 368 void serverLogFromHandler(int level, const char *msg) { 369 int fd; 370 int log_to_stdout = server.logfile[0] == '\0'; 371 char buf[64]; 372 373 if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize)) 374 return; 375 fd = log_to_stdout ? STDOUT_FILENO : 376 open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644); 377 if (fd == -1) return; 378 ll2string(buf,sizeof(buf),getpid()); 379 if (write(fd,buf,strlen(buf)) == -1) goto err; 380 if (write(fd,":signal-handler (",17) == -1) goto err; 381 ll2string(buf,sizeof(buf),time(NULL)); 382 if (write(fd,buf,strlen(buf)) == -1) goto err; 383 if (write(fd,") ",2) == -1) goto err; 384 if (write(fd,msg,strlen(msg)) == -1) goto err; 385 if (write(fd,"\n",1) == -1) goto err; 386 err: 387 if (!log_to_stdout) close(fd); 388 } 389 390 /* Return the UNIX time in microseconds */ 391 long long ustime(void) { 392 struct timeval tv; 393 long long ust; 394 395 gettimeofday(&tv, NULL); 396 ust = ((long long)tv.tv_sec)*1000000; 397 ust += tv.tv_usec; 398 return ust; 399 } 400 401 /* Return the UNIX time in milliseconds */ 402 mstime_t mstime(void) { 403 return ustime()/1000; 404 } 405 406 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of 407 * exit(), because the latter may interact with the same file objects used by 408 * the parent process. However if we are testing the coverage normal exit() is 409 * used in order to obtain the right coverage information. 
*/ 410 void exitFromChild(int retcode) { 411 #ifdef COVERAGE_TEST 412 exit(retcode); 413 #else 414 _exit(retcode); 415 #endif 416 } 417 418 /*====================== Hash table type implementation ==================== */ 419 420 /* This is a hash table type that uses the SDS dynamic strings library as 421 * keys and redis objects as values (objects can hold SDS strings, 422 * lists, sets). */ 423 424 void dictVanillaFree(void *privdata, void *val) 425 { 426 DICT_NOTUSED(privdata); 427 zfree(val); 428 } 429 430 void dictListDestructor(void *privdata, void *val) 431 { 432 DICT_NOTUSED(privdata); 433 listRelease((list*)val); 434 } 435 436 int dictSdsKeyCompare(void *privdata, const void *key1, 437 const void *key2) 438 { 439 int l1,l2; 440 DICT_NOTUSED(privdata); 441 442 l1 = sdslen((sds)key1); 443 l2 = sdslen((sds)key2); 444 if (l1 != l2) return 0; 445 return memcmp(key1, key2, l1) == 0; 446 } 447 448 /* A case insensitive version used for the command lookup table and other 449 * places where case insensitive non binary-safe comparison is needed. 
*/ 450 int dictSdsKeyCaseCompare(void *privdata, const void *key1, 451 const void *key2) 452 { 453 DICT_NOTUSED(privdata); 454 455 return strcasecmp(key1, key2) == 0; 456 } 457 458 void dictObjectDestructor(void *privdata, void *val) 459 { 460 DICT_NOTUSED(privdata); 461 462 if (val == NULL) return; /* Values of swapped out keys as set to NULL */ 463 decrRefCount(val); 464 } 465 466 void dictSdsDestructor(void *privdata, void *val) 467 { 468 DICT_NOTUSED(privdata); 469 470 sdsfree(val); 471 } 472 473 int dictObjKeyCompare(void *privdata, const void *key1, 474 const void *key2) 475 { 476 const robj *o1 = key1, *o2 = key2; 477 return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 478 } 479 480 unsigned int dictObjHash(const void *key) { 481 const robj *o = key; 482 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 483 } 484 485 unsigned int dictSdsHash(const void *key) { 486 return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); 487 } 488 489 unsigned int dictSdsCaseHash(const void *key) { 490 return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key)); 491 } 492 493 int dictEncObjKeyCompare(void *privdata, const void *key1, 494 const void *key2) 495 { 496 robj *o1 = (robj*) key1, *o2 = (robj*) key2; 497 int cmp; 498 499 if (o1->encoding == OBJ_ENCODING_INT && 500 o2->encoding == OBJ_ENCODING_INT) 501 return o1->ptr == o2->ptr; 502 503 o1 = getDecodedObject(o1); 504 o2 = getDecodedObject(o2); 505 cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 506 decrRefCount(o1); 507 decrRefCount(o2); 508 return cmp; 509 } 510 511 unsigned int dictEncObjHash(const void *key) { 512 robj *o = (robj*) key; 513 514 if (sdsEncodedObject(o)) { 515 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 516 } else { 517 if (o->encoding == OBJ_ENCODING_INT) { 518 char buf[32]; 519 int len; 520 521 len = ll2string(buf,32,(long)o->ptr); 522 return dictGenHashFunction((unsigned char*)buf, len); 523 } else { 524 unsigned int hash; 525 526 o = 
getDecodedObject(o); 527 hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 528 decrRefCount(o); 529 return hash; 530 } 531 } 532 } 533 534 /* Sets type hash table */ 535 dictType setDictType = { 536 dictEncObjHash, /* hash function */ 537 NULL, /* key dup */ 538 NULL, /* val dup */ 539 dictEncObjKeyCompare, /* key compare */ 540 dictObjectDestructor, /* key destructor */ 541 NULL /* val destructor */ 542 }; 543 544 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ 545 dictType zsetDictType = { 546 dictEncObjHash, /* hash function */ 547 NULL, /* key dup */ 548 NULL, /* val dup */ 549 dictEncObjKeyCompare, /* key compare */ 550 dictObjectDestructor, /* key destructor */ 551 NULL /* val destructor */ 552 }; 553 554 /* Db->dict, keys are sds strings, vals are Redis objects. */ 555 dictType dbDictType = { 556 dictSdsHash, /* hash function */ 557 NULL, /* key dup */ 558 NULL, /* val dup */ 559 dictSdsKeyCompare, /* key compare */ 560 dictSdsDestructor, /* key destructor */ 561 dictObjectDestructor /* val destructor */ 562 }; 563 564 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */ 565 dictType shaScriptObjectDictType = { 566 dictSdsCaseHash, /* hash function */ 567 NULL, /* key dup */ 568 NULL, /* val dup */ 569 dictSdsKeyCaseCompare, /* key compare */ 570 dictSdsDestructor, /* key destructor */ 571 dictObjectDestructor /* val destructor */ 572 }; 573 574 /* Db->expires */ 575 dictType keyptrDictType = { 576 dictSdsHash, /* hash function */ 577 NULL, /* key dup */ 578 NULL, /* val dup */ 579 dictSdsKeyCompare, /* key compare */ 580 NULL, /* key destructor */ 581 NULL /* val destructor */ 582 }; 583 584 /* Command table. sds string -> command struct pointer. 
*/
/* Keys are command names, hashed and compared case-insensitively; the sds
 * keys are freed by the dict, while the command structs the values point to
 * are not (no val destructor). */
dictType commandTableDictType = {
    dictSdsCaseHash,           /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCaseCompare,     /* key compare */
    dictSdsDestructor,         /* key destructor */
    NULL                       /* val destructor */
};

/* Hash type hash table (note that small hashes are represented with ziplists).
 * Both fields and values are Redis objects, each released through
 * dictObjectDestructor when removed. */
dictType hashDictType = {
    dictEncObjHash,             /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictEncObjKeyCompare,       /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictObjectDestructor        /* val destructor */
};

/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) and to
 * map swapped keys to a list of clients waiting for these keys to be loaded.
 * Keys are hashed and compared through the sds string in their 'ptr'
 * (dictObjHash / dictObjKeyCompare), which is why they must be unencoded;
 * values are released with listRelease() via dictListDestructor. */
dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictListDestructor          /* val destructor */
};

/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
 * clusterNode structures. Keys are compared case-sensitively.
 * NOTE(review): confirm whether the keys are node addresses or node IDs;
 * the users of this type are outside this section. */
dictType clusterNodesDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Cluster re-addition blacklist. This maps node IDs to the time
 * we can re-add this node. The goal is to avoid re-adding a removed
 * node for some time. Node IDs are hashed and compared case-insensitively. */
dictType clusterNodesBlackListDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Migrate cache dict type.
*/ 640 dictType migrateCacheDictType = { 641 dictSdsHash, /* hash function */ 642 NULL, /* key dup */ 643 NULL, /* val dup */ 644 dictSdsKeyCompare, /* key compare */ 645 dictSdsDestructor, /* key destructor */ 646 NULL /* val destructor */ 647 }; 648 649 /* Replication cached script dict (server.repl_scriptcache_dict). 650 * Keys are sds SHA1 strings, while values are not used at all in the current 651 * implementation. */ 652 dictType replScriptCacheDictType = { 653 dictSdsCaseHash, /* hash function */ 654 NULL, /* key dup */ 655 NULL, /* val dup */ 656 dictSdsKeyCaseCompare, /* key compare */ 657 dictSdsDestructor, /* key destructor */ 658 NULL /* val destructor */ 659 }; 660 661 int htNeedsResize(dict *dict) { 662 long long size, used; 663 664 size = dictSlots(dict); 665 used = dictSize(dict); 666 return (size > DICT_HT_INITIAL_SIZE && 667 (used*100/size < HASHTABLE_MIN_FILL)); 668 } 669 670 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL 671 * we resize the hash table to save memory */ 672 void tryResizeHashTables(int dbid) { 673 if (htNeedsResize(server.db[dbid].dict)) 674 dictResize(server.db[dbid].dict); 675 if (htNeedsResize(server.db[dbid].expires)) 676 dictResize(server.db[dbid].expires); 677 } 678 679 /* Our hash table implementation performs rehashing incrementally while 680 * we write/read from the hash table. Still if the server is idle, the hash 681 * table will use two tables for a long time. So we try to use 1 millisecond 682 * of CPU time at every call of this function to perform some rehahsing. 683 * 684 * The function returns 1 if some rehashing was performed, otherwise 0 685 * is returned. */ 686 int incrementallyRehash(int dbid) { 687 /* Keys dictionary */ 688 if (dictIsRehashing(server.db[dbid].dict)) { 689 dictRehashMilliseconds(server.db[dbid].dict,1); 690 return 1; /* already used our millisecond for this loop... 
*/ 691 } 692 /* Expires */ 693 if (dictIsRehashing(server.db[dbid].expires)) { 694 dictRehashMilliseconds(server.db[dbid].expires,1); 695 return 1; /* already used our millisecond for this loop... */ 696 } 697 return 0; 698 } 699 700 /* This function is called once a background process of some kind terminates, 701 * as we want to avoid resizing the hash tables when there is a child in order 702 * to play well with copy-on-write (otherwise when a resize happens lots of 703 * memory pages are copied). The goal of this function is to update the ability 704 * for dict.c to resize the hash tables accordingly to the fact we have o not 705 * running childs. */ 706 void updateDictResizePolicy(void) { 707 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) 708 dictEnableResize(); 709 else 710 dictDisableResize(); 711 } 712 713 /* ======================= Cron: called every 100 ms ======================== */ 714 715 /* Helper function for the activeExpireCycle() function. 716 * This function will try to expire the key that is stored in the hash table 717 * entry 'de' of the 'expires' hash table of a Redis database. 718 * 719 * If the key is found to be expired, it is removed from the database and 720 * 1 is returned. Otherwise no operation is performed and 0 is returned. 721 * 722 * When a key is expired, server.stat_expiredkeys is incremented. 723 * 724 * The parameter 'now' is the current time in milliseconds as is passed 725 * to the function to avoid too many gettimeofday() syscalls. 
*/ 726 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { 727 long long t = dictGetSignedIntegerVal(de); 728 if (now > t) { 729 sds key = dictGetKey(de); 730 robj *keyobj = createStringObject(key,sdslen(key)); 731 732 propagateExpire(db,keyobj); 733 dbDelete(db,keyobj); 734 notifyKeyspaceEvent(NOTIFY_EXPIRED, 735 "expired",keyobj,db->id); 736 decrRefCount(keyobj); 737 server.stat_expiredkeys++; 738 return 1; 739 } else { 740 return 0; 741 } 742 } 743 744 /* Try to expire a few timed out keys. The algorithm used is adaptive and 745 * will use few CPU cycles if there are few expiring keys, otherwise 746 * it will get more aggressive to avoid that too much memory is used by 747 * keys that can be removed from the keyspace. 748 * 749 * No more than CRON_DBS_PER_CALL databases are tested at every 750 * iteration. 751 * 752 * This kind of call is used when Redis detects that timelimit_exit is 753 * true, so there is more work to do, and we do it more incrementally from 754 * the beforeSleep() function of the event loop. 755 * 756 * Expire cycle type: 757 * 758 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a 759 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION 760 * microseconds, and is not repeated again before the same amount of time. 761 * 762 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is 763 * executed, where the time limit is a percentage of the REDIS_HZ period 764 * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */ 765 766 void activeExpireCycle(int type) { 767 /* This function has some global state in order to continue the work 768 * incrementally across calls. */ 769 static unsigned int current_db = 0; /* Last DB tested. */ 770 static int timelimit_exit = 0; /* Time limit hit in previous call? */ 771 static long long last_fast_cycle = 0; /* When last fast cycle ran. 
*/ 772 773 int j, iteration = 0; 774 int dbs_per_call = CRON_DBS_PER_CALL; 775 long long start = ustime(), timelimit; 776 777 if (type == ACTIVE_EXPIRE_CYCLE_FAST) { 778 /* Don't start a fast cycle if the previous cycle did not exited 779 * for time limt. Also don't repeat a fast cycle for the same period 780 * as the fast cycle total duration itself. */ 781 if (!timelimit_exit) return; 782 if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; 783 last_fast_cycle = start; 784 } 785 786 /* We usually should test CRON_DBS_PER_CALL per iteration, with 787 * two exceptions: 788 * 789 * 1) Don't test more DBs than we have. 790 * 2) If last time we hit the time limit, we want to scan all DBs 791 * in this iteration, as there is work to do in some DB and we don't want 792 * expired keys to use memory for too much time. */ 793 if (dbs_per_call > server.dbnum || timelimit_exit) 794 dbs_per_call = server.dbnum; 795 796 /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time 797 * per iteration. Since this function gets called with a frequency of 798 * server.hz times per second, the following is the max amount of 799 * microseconds we can spend in this function. */ 800 timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; 801 timelimit_exit = 0; 802 if (timelimit <= 0) timelimit = 1; 803 804 if (type == ACTIVE_EXPIRE_CYCLE_FAST) 805 timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ 806 807 for (j = 0; j < dbs_per_call; j++) { 808 int expired; 809 redisDb *db = server.db+(current_db % server.dbnum); 810 811 /* Increment the DB now so we are sure if we run out of time 812 * in the current DB we'll restart from the next. This allows to 813 * distribute the time evenly across DBs. */ 814 current_db++; 815 816 /* Continue to expire if at the end of the cycle more than 25% 817 * of the keys were expired. 
*/ 818 do { 819 unsigned long num, slots; 820 long long now, ttl_sum; 821 int ttl_samples; 822 823 /* If there is nothing to expire try next DB ASAP. */ 824 if ((num = dictSize(db->expires)) == 0) { 825 db->avg_ttl = 0; 826 break; 827 } 828 slots = dictSlots(db->expires); 829 now = mstime(); 830 831 /* When there are less than 1% filled slots getting random 832 * keys is expensive, so stop here waiting for better times... 833 * The dictionary will be resized asap. */ 834 if (num && slots > DICT_HT_INITIAL_SIZE && 835 (num*100/slots < 1)) break; 836 837 /* The main collection cycle. Sample random keys among keys 838 * with an expire set, checking for expired ones. */ 839 expired = 0; 840 ttl_sum = 0; 841 ttl_samples = 0; 842 843 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) 844 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; 845 846 while (num--) { 847 dictEntry *de; 848 long long ttl; 849 850 if ((de = dictGetRandomKey(db->expires)) == NULL) break; 851 ttl = dictGetSignedIntegerVal(de)-now; 852 if (activeExpireCycleTryExpire(db,de,now)) expired++; 853 if (ttl > 0) { 854 /* We want the average TTL of keys yet not expired. */ 855 ttl_sum += ttl; 856 ttl_samples++; 857 } 858 } 859 860 /* Update the average TTL stats for this database. */ 861 if (ttl_samples) { 862 long long avg_ttl = ttl_sum/ttl_samples; 863 864 /* Do a simple running average with a few samples. 865 * We just use the current estimate with a weight of 2% 866 * and the previous estimate with a weight of 98%. */ 867 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; 868 db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); 869 } 870 871 /* We can't block forever here even if there are many keys to 872 * expire. So after a given amount of milliseconds return to the 873 * caller waiting for the other active expire cycle. */ 874 iteration++; 875 if ((iteration & 0xf) == 0) { /* check once every 16 iterations. 
*/ 876 long long elapsed = ustime()-start; 877 878 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); 879 if (elapsed > timelimit) timelimit_exit = 1; 880 } 881 if (timelimit_exit) return; 882 /* We don't repeat the cycle if there are less than 25% of keys 883 * found expired in the current DB. */ 884 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); 885 } 886 } 887 888 unsigned int getLRUClock(void) { 889 return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX; 890 } 891 892 /* Add a sample to the operations per second array of samples. */ 893 void trackInstantaneousMetric(int metric, long long current_reading) { 894 long long t = mstime() - server.inst_metric[metric].last_sample_time; 895 long long ops = current_reading - 896 server.inst_metric[metric].last_sample_count; 897 long long ops_sec; 898 899 ops_sec = t > 0 ? (ops*1000/t) : 0; 900 901 server.inst_metric[metric].samples[server.inst_metric[metric].idx] = 902 ops_sec; 903 server.inst_metric[metric].idx++; 904 server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; 905 server.inst_metric[metric].last_sample_time = mstime(); 906 server.inst_metric[metric].last_sample_count = current_reading; 907 } 908 909 /* Return the mean of all the samples. */ 910 long long getInstantaneousMetric(int metric) { 911 int j; 912 long long sum = 0; 913 914 for (j = 0; j < STATS_METRIC_SAMPLES; j++) 915 sum += server.inst_metric[metric].samples[j]; 916 return sum / STATS_METRIC_SAMPLES; 917 } 918 919 /* Check for timeouts. Returns non-zero if the client was terminated. 920 * The function gets the current time in milliseconds as argument since 921 * it gets called multiple times in a loop, so calling gettimeofday() for 922 * each iteration would be costly without any actual gain. 
*/ 923 int clientsCronHandleTimeout(client *c, mstime_t now_ms) { 924 time_t now = now_ms/1000; 925 926 if (server.maxidletime && 927 !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */ 928 !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ 929 !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ 930 !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ 931 (now - c->lastinteraction > server.maxidletime)) 932 { 933 serverLog(LL_VERBOSE,"Closing idle client"); 934 freeClient(c); 935 return 1; 936 } else if (c->flags & CLIENT_BLOCKED) { 937 /* Blocked OPS timeout is handled with milliseconds resolution. 938 * However note that the actual resolution is limited by 939 * server.hz. */ 940 941 if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { 942 /* Handle blocking operation specific timeout. */ 943 replyToBlockedClientTimedOut(c); 944 unblockClient(c); 945 } else if (server.cluster_enabled) { 946 /* Cluster: handle unblock & redirect of clients blocked 947 * into keys no longer served by this server. */ 948 if (clusterRedirectBlockedClientIfNeeded(c)) 949 unblockClient(c); 950 } 951 } 952 return 0; 953 } 954 955 /* The client query buffer is an sds.c string that can end with a lot of 956 * free space not used, this function reclaims space if needed. 957 * 958 * The function always returns 0 as it never terminates the client. */ 959 int clientsCronResizeQueryBuffer(client *c) { 960 size_t querybuf_size = sdsAllocSize(c->querybuf); 961 time_t idletime = server.unixtime - c->lastinteraction; 962 963 /* There are two conditions to resize the query buffer: 964 * 1) Query buffer is > BIG_ARG and too big for latest peak. 965 * 2) Client is inactive and the buffer is bigger than 1k. */ 966 if (((querybuf_size > PROTO_MBULK_BIG_ARG) && 967 (querybuf_size/(c->querybuf_peak+1)) > 2) || 968 (querybuf_size > 1024 && idletime > 2)) 969 { 970 /* Only resize the query buffer if it is actually wasting space. 
*/ 971 if (sdsavail(c->querybuf) > 1024) { 972 c->querybuf = sdsRemoveFreeSpace(c->querybuf); 973 } 974 } 975 /* Reset the peak again to capture the peak memory usage in the next 976 * cycle. */ 977 c->querybuf_peak = 0; 978 return 0; 979 } 980 981 #define CLIENTS_CRON_MIN_ITERATIONS 5 982 void clientsCron(void) { 983 /* Make sure to process at least numclients/server.hz of clients 984 * per call. Since this function is called server.hz times per second 985 * we are sure that in the worst case we process all the clients in 1 986 * second. */ 987 int numclients = listLength(server.clients); 988 int iterations = numclients/server.hz; 989 mstime_t now = mstime(); 990 991 /* Process at least a few clients while we are at it, even if we need 992 * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract 993 * of processing each client once per second. */ 994 if (iterations < CLIENTS_CRON_MIN_ITERATIONS) 995 iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? 996 numclients : CLIENTS_CRON_MIN_ITERATIONS; 997 998 while(listLength(server.clients) && iterations--) { 999 client *c; 1000 listNode *head; 1001 1002 /* Rotate the list, take the current head, process. 1003 * This way if the client must be removed from the list it's the 1004 * first element and we don't incur into O(N) computation. */ 1005 listRotate(server.clients); 1006 head = listFirst(server.clients); 1007 c = listNodeValue(head); 1008 /* The following functions do different service checks on the client. 1009 * The protocol is that they return non-zero if the client was 1010 * terminated. */ 1011 if (clientsCronHandleTimeout(c,now)) continue; 1012 if (clientsCronResizeQueryBuffer(c)) continue; 1013 } 1014 } 1015 1016 /* This function handles 'background' operations we are required to do 1017 * incrementally in Redis databases, such as active key expiring, resizing, 1018 * rehashing. */ 1019 void databasesCron(void) { 1020 /* Expire keys by random sampling. 
Not required for slaves 1021 * as master will synthesize DELs for us. */ 1022 if (server.active_expire_enabled && server.masterhost == NULL) 1023 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); 1024 1025 /* Perform hash tables rehashing if needed, but only if there are no 1026 * other processes saving the DB on disk. Otherwise rehashing is bad 1027 * as will cause a lot of copy-on-write of memory pages. */ 1028 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 1029 /* We use global counters so if we stop the computation at a given 1030 * DB we'll be able to start from the successive in the next 1031 * cron loop iteration. */ 1032 static unsigned int resize_db = 0; 1033 static unsigned int rehash_db = 0; 1034 int dbs_per_call = CRON_DBS_PER_CALL; 1035 int j; 1036 1037 /* Don't test more DBs than we have. */ 1038 if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; 1039 1040 /* Resize */ 1041 for (j = 0; j < dbs_per_call; j++) { 1042 tryResizeHashTables(resize_db % server.dbnum); 1043 resize_db++; 1044 } 1045 1046 /* Rehash */ 1047 if (server.activerehashing) { 1048 for (j = 0; j < dbs_per_call; j++) { 1049 int work_done = incrementallyRehash(rehash_db % server.dbnum); 1050 rehash_db++; 1051 if (work_done) { 1052 /* If the function did some work, stop here, we'll do 1053 * more at the next cron loop. */ 1054 break; 1055 } 1056 } 1057 } 1058 } 1059 } 1060 1061 /* We take a cached value of the unix time in the global state because with 1062 * virtual memory and aging there is to store the current time in objects at 1063 * every object access, and accuracy is not needed. To access a global var is 1064 * a lot faster than calling time(NULL) */ 1065 void updateCachedTime(void) { 1066 server.unixtime = time(NULL); 1067 server.mstime = mstime(); 1068 } 1069 1070 /* This is our timer interrupt, called server.hz times per second. 1071 * Here is where we do a number of things that need to be done asynchronously. 
1072 * For instance: 1073 * 1074 * - Active expired keys collection (it is also performed in a lazy way on 1075 * lookup). 1076 * - Software watchdog. 1077 * - Update some statistic. 1078 * - Incremental rehashing of the DBs hash tables. 1079 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children. 1080 * - Clients timeout of different kinds. 1081 * - Replication reconnection. 1082 * - Many more... 1083 * 1084 * Everything directly called here will be called server.hz times per second, 1085 * so in order to throttle execution of things we want to do less frequently 1086 * a macro is used: run_with_period(milliseconds) { .... } 1087 */ 1088 1089 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 1090 int j; 1091 UNUSED(eventLoop); 1092 UNUSED(id); 1093 UNUSED(clientData); 1094 1095 /* Software watchdog: deliver the SIGALRM that will reach the signal 1096 * handler if we don't return here fast enough. */ 1097 if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); 1098 1099 /* Update the time cache. */ 1100 updateCachedTime(); 1101 1102 run_with_period(100) { 1103 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); 1104 trackInstantaneousMetric(STATS_METRIC_NET_INPUT, 1105 server.stat_net_input_bytes); 1106 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, 1107 server.stat_net_output_bytes); 1108 } 1109 1110 /* We have just LRU_BITS bits per object for LRU information. 1111 * So we use an (eventually wrapping) LRU clock. 1112 * 1113 * Note that even if the counter wraps it's not a big problem, 1114 * everything will still work but some object will appear younger 1115 * to Redis. However for this to happen a given object should never be 1116 * touched for all the time needed to the counter to wrap, which is 1117 * not likely. 1118 * 1119 * Note that you can change the resolution altering the 1120 * LRU_CLOCK_RESOLUTION define. 
*/ 1121 server.lruclock = getLRUClock(); 1122 1123 /* Record the max memory used since the server was started. */ 1124 if (zmalloc_used_memory() > server.stat_peak_memory) 1125 server.stat_peak_memory = zmalloc_used_memory(); 1126 1127 /* Sample the RSS here since this is a relatively slow call. */ 1128 server.resident_set_size = zmalloc_get_rss(); 1129 1130 /* We received a SIGTERM, shutting down here in a safe way, as it is 1131 * not ok doing so inside the signal handler. */ 1132 if (server.shutdown_asap) { 1133 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); 1134 serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); 1135 server.shutdown_asap = 0; 1136 } 1137 1138 /* Show some info about non-empty databases */ 1139 run_with_period(5000) { 1140 for (j = 0; j < server.dbnum; j++) { 1141 long long size, used, vkeys; 1142 1143 size = dictSlots(server.db[j].dict); 1144 used = dictSize(server.db[j].dict); 1145 vkeys = dictSize(server.db[j].expires); 1146 if (used || vkeys) { 1147 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); 1148 /* dictPrintStats(server.dict); */ 1149 } 1150 } 1151 } 1152 1153 /* Show information about connected clients */ 1154 if (!server.sentinel_mode) { 1155 run_with_period(5000) { 1156 serverLog(LL_VERBOSE, 1157 "%lu clients connected (%lu slaves), %zu bytes in use", 1158 listLength(server.clients)-listLength(server.slaves), 1159 listLength(server.slaves), 1160 zmalloc_used_memory()); 1161 } 1162 } 1163 1164 /* We need to do a few operations on clients asynchronously. */ 1165 clientsCron(); 1166 1167 /* Handle background operations on Redis databases. */ 1168 databasesCron(); 1169 1170 /* Start a scheduled AOF rewrite if this was requested by the user while 1171 * a BGSAVE was in progress. 
*/ 1172 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && 1173 server.aof_rewrite_scheduled) 1174 { 1175 rewriteAppendOnlyFileBackground(); 1176 } 1177 1178 /* Check if a background saving or AOF rewrite in progress terminated. */ 1179 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || 1180 ldbPendingChildren()) 1181 { 1182 int statloc; 1183 pid_t pid; 1184 1185 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { 1186 int exitcode = WEXITSTATUS(statloc); 1187 int bysignal = 0; 1188 1189 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); 1190 1191 if (pid == -1) { 1192 serverLog(LL_WARNING,"wait3() returned an error: %s. " 1193 "rdb_child_pid = %d, aof_child_pid = %d", 1194 strerror(errno), 1195 (int) server.rdb_child_pid, 1196 (int) server.aof_child_pid); 1197 } else if (pid == server.rdb_child_pid) { 1198 backgroundSaveDoneHandler(exitcode,bysignal); 1199 } else if (pid == server.aof_child_pid) { 1200 backgroundRewriteDoneHandler(exitcode,bysignal); 1201 } else { 1202 if (!ldbRemoveChild(pid)) { 1203 serverLog(LL_WARNING, 1204 "Warning, detected child with unmatched pid: %ld", 1205 (long)pid); 1206 } 1207 } 1208 updateDictResizePolicy(); 1209 } 1210 } else { 1211 /* If there is not a background saving/rewrite in progress check if 1212 * we have to save/rewrite now */ 1213 for (j = 0; j < server.saveparamslen; j++) { 1214 struct saveparam *sp = server.saveparams+j; 1215 1216 /* Save if we reached the given amount of changes, 1217 * the given amount of seconds, and if the latest bgsave was 1218 * successful or if, in case of an error, at least 1219 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ 1220 if (server.dirty >= sp->changes && 1221 server.unixtime-server.lastsave > sp->seconds && 1222 (server.unixtime-server.lastbgsave_try > 1223 CONFIG_BGSAVE_RETRY_DELAY || 1224 server.lastbgsave_status == C_OK)) 1225 { 1226 serverLog(LL_NOTICE,"%d changes in %d seconds. 
Saving...", 1227 sp->changes, (int)sp->seconds); 1228 rdbSaveBackground(server.rdb_filename); 1229 break; 1230 } 1231 } 1232 1233 /* Trigger an AOF rewrite if needed */ 1234 if (server.rdb_child_pid == -1 && 1235 server.aof_child_pid == -1 && 1236 server.aof_rewrite_perc && 1237 server.aof_current_size > server.aof_rewrite_min_size) 1238 { 1239 long long base = server.aof_rewrite_base_size ? 1240 server.aof_rewrite_base_size : 1; 1241 long long growth = (server.aof_current_size*100/base) - 100; 1242 if (growth >= server.aof_rewrite_perc) { 1243 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); 1244 rewriteAppendOnlyFileBackground(); 1245 } 1246 } 1247 } 1248 1249 1250 /* AOF postponed flush: Try at every cron cycle if the slow fsync 1251 * completed. */ 1252 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); 1253 1254 /* AOF write errors: in this case we have a buffer to flush as well and 1255 * clear the AOF error in case of success to make the DB writable again, 1256 * however to try every second is enough in case of 'hz' is set to 1257 * an higher frequency. */ 1258 run_with_period(1000) { 1259 if (server.aof_last_write_status == C_ERR) 1260 flushAppendOnlyFile(0); 1261 } 1262 1263 /* Close clients that need to be closed asynchronous */ 1264 freeClientsInAsyncFreeQueue(); 1265 1266 /* Clear the paused clients flag if needed. */ 1267 clientsArePaused(); /* Don't check return value, just use the side effect. */ 1268 1269 /* Replication cron function -- used to reconnect to master and 1270 * to detect transfer failures. */ 1271 run_with_period(1000) replicationCron(); 1272 1273 /* Run the Redis Cluster cron. */ 1274 run_with_period(100) { 1275 if (server.cluster_enabled) clusterCron(); 1276 } 1277 1278 /* Run the Sentinel timer if we are in sentinel mode. */ 1279 run_with_period(100) { 1280 if (server.sentinel_mode) sentinelTimer(); 1281 } 1282 1283 /* Cleanup expired MIGRATE cached sockets. 
*/ 1284 run_with_period(1000) { 1285 migrateCloseTimedoutSockets(); 1286 } 1287 1288 server.cronloops++; 1289 return 1000/server.hz; 1290 } 1291 1292 /* This function gets called every time Redis is entering the 1293 * main loop of the event driven library, that is, before to sleep 1294 * for ready file descriptors. */ 1295 void beforeSleep(struct aeEventLoop *eventLoop) { 1296 UNUSED(eventLoop); 1297 1298 /* Call the Redis Cluster before sleep function. Note that this function 1299 * may change the state of Redis Cluster (from ok to fail or vice versa), 1300 * so it's a good idea to call it before serving the unblocked clients 1301 * later in this function. */ 1302 if (server.cluster_enabled) clusterBeforeSleep(); 1303 1304 /* Run a fast expire cycle (the called function will return 1305 * ASAP if a fast cycle is not needed). */ 1306 if (server.active_expire_enabled && server.masterhost == NULL) 1307 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); 1308 1309 /* Send all the slaves an ACK request if at least one client blocked 1310 * during the previous event loop iteration. */ 1311 if (server.get_ack_from_slaves) { 1312 robj *argv[3]; 1313 1314 argv[0] = createStringObject("REPLCONF",8); 1315 argv[1] = createStringObject("GETACK",6); 1316 argv[2] = createStringObject("*",1); /* Not used argument. */ 1317 replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); 1318 decrRefCount(argv[0]); 1319 decrRefCount(argv[1]); 1320 decrRefCount(argv[2]); 1321 server.get_ack_from_slaves = 0; 1322 } 1323 1324 /* Unblock all the clients blocked for synchronous replication 1325 * in WAIT. */ 1326 if (listLength(server.clients_waiting_acks)) 1327 processClientsWaitingReplicas(); 1328 1329 /* Try to process pending commands for clients that were just unblocked. */ 1330 if (listLength(server.unblocked_clients)) 1331 processUnblockedClients(); 1332 1333 /* Write the AOF buffer on disk */ 1334 flushAppendOnlyFile(0); 1335 1336 /* Handle writes with pending output buffers. 
*/ 1337 handleClientsWithPendingWrites(); 1338 } 1339 1340 /* =========================== Server initialization ======================== */ 1341 1342 void createSharedObjects(void) { 1343 int j; 1344 1345 shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n")); 1346 shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n")); 1347 shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n")); 1348 shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n")); 1349 shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n")); 1350 shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n")); 1351 shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n")); 1352 shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n")); 1353 shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n")); 1354 shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n")); 1355 shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n")); 1356 shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n")); 1357 shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n")); 1358 shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew( 1359 "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")); 1360 shared.nokeyerr = createObject(OBJ_STRING,sdsnew( 1361 "-ERR no such key\r\n")); 1362 shared.syntaxerr = createObject(OBJ_STRING,sdsnew( 1363 "-ERR syntax error\r\n")); 1364 shared.sameobjecterr = createObject(OBJ_STRING,sdsnew( 1365 "-ERR source and destination objects are the same\r\n")); 1366 shared.outofrangeerr = createObject(OBJ_STRING,sdsnew( 1367 "-ERR index out of range\r\n")); 1368 shared.noscripterr = createObject(OBJ_STRING,sdsnew( 1369 "-NOSCRIPT No matching script. Please use EVAL.\r\n")); 1370 shared.loadingerr = createObject(OBJ_STRING,sdsnew( 1371 "-LOADING Redis is loading the dataset in memory\r\n")); 1372 shared.slowscripterr = createObject(OBJ_STRING,sdsnew( 1373 "-BUSY Redis is busy running a script. 
You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n")); 1374 shared.masterdownerr = createObject(OBJ_STRING,sdsnew( 1375 "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n")); 1376 shared.bgsaveerr = createObject(OBJ_STRING,sdsnew( 1377 "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n")); 1378 shared.roslaveerr = createObject(OBJ_STRING,sdsnew( 1379 "-READONLY You can't write against a read only slave.\r\n")); 1380 shared.noautherr = createObject(OBJ_STRING,sdsnew( 1381 "-NOAUTH Authentication required.\r\n")); 1382 shared.oomerr = createObject(OBJ_STRING,sdsnew( 1383 "-OOM command not allowed when used memory > 'maxmemory'.\r\n")); 1384 shared.execaborterr = createObject(OBJ_STRING,sdsnew( 1385 "-EXECABORT Transaction discarded because of previous errors.\r\n")); 1386 shared.noreplicaserr = createObject(OBJ_STRING,sdsnew( 1387 "-NOREPLICAS Not enough good slaves to write.\r\n")); 1388 shared.busykeyerr = createObject(OBJ_STRING,sdsnew( 1389 "-BUSYKEY Target key name already exists.\r\n")); 1390 shared.space = createObject(OBJ_STRING,sdsnew(" ")); 1391 shared.colon = createObject(OBJ_STRING,sdsnew(":")); 1392 shared.plus = createObject(OBJ_STRING,sdsnew("+")); 1393 1394 for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) { 1395 char dictid_str[64]; 1396 int dictid_len; 1397 1398 dictid_len = ll2string(dictid_str,sizeof(dictid_str),j); 1399 shared.select[j] = createObject(OBJ_STRING, 1400 sdscatprintf(sdsempty(), 1401 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", 1402 dictid_len, dictid_str)); 1403 } 1404 shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); 1405 shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); 1406 shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); 1407 shared.unsubscribebulk = 
createStringObject("$11\r\nunsubscribe\r\n",18); 1408 shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); 1409 shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); 1410 shared.del = createStringObject("DEL",3); 1411 shared.rpop = createStringObject("RPOP",4); 1412 shared.lpop = createStringObject("LPOP",4); 1413 shared.lpush = createStringObject("LPUSH",5); 1414 for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { 1415 shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j); 1416 shared.integers[j]->encoding = OBJ_ENCODING_INT; 1417 } 1418 for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) { 1419 shared.mbulkhdr[j] = createObject(OBJ_STRING, 1420 sdscatprintf(sdsempty(),"*%d\r\n",j)); 1421 shared.bulkhdr[j] = createObject(OBJ_STRING, 1422 sdscatprintf(sdsempty(),"$%d\r\n",j)); 1423 } 1424 /* The following two shared objects, minstring and maxstrings, are not 1425 * actually used for their value but as a special object meaning 1426 * respectively the minimum possible string and the maximum possible 1427 * string in string comparisons for the ZRANGEBYLEX command. */ 1428 shared.minstring = createStringObject("minstring",9); 1429 shared.maxstring = createStringObject("maxstring",9); 1430 } 1431 1432 void initServerConfig(void) { 1433 int j; 1434 1435 getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); 1436 server.configfile = NULL; 1437 server.executable = NULL; 1438 server.hz = CONFIG_DEFAULT_HZ; 1439 server.runid[CONFIG_RUN_ID_SIZE] = '\0'; 1440 server.arch_bits = (sizeof(long) == 8) ? 
64 : 32; 1441 server.port = CONFIG_DEFAULT_SERVER_PORT; 1442 server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG; 1443 server.bindaddr_count = 0; 1444 server.unixsocket = NULL; 1445 server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM; 1446 server.ipfd_count = 0; 1447 server.sofd = -1; 1448 server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE; 1449 server.dbnum = CONFIG_DEFAULT_DBNUM; 1450 server.verbosity = CONFIG_DEFAULT_VERBOSITY; 1451 server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; 1452 server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; 1453 server.active_expire_enabled = 1; 1454 server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN; 1455 server.saveparams = NULL; 1456 server.loading = 0; 1457 server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE); 1458 server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED; 1459 server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT); 1460 server.syslog_facility = LOG_LOCAL0; 1461 server.daemonize = CONFIG_DEFAULT_DAEMONIZE; 1462 server.supervised = 0; 1463 server.supervised_mode = SUPERVISED_NONE; 1464 server.aof_state = AOF_OFF; 1465 server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC; 1466 server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE; 1467 server.aof_rewrite_perc = AOF_REWRITE_PERC; 1468 server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; 1469 server.aof_rewrite_base_size = 0; 1470 server.aof_rewrite_scheduled = 0; 1471 server.aof_last_fsync = time(NULL); 1472 server.aof_rewrite_time_last = -1; 1473 server.aof_rewrite_time_start = -1; 1474 server.aof_lastbgrewrite_status = C_OK; 1475 server.aof_delayed_fsync = 0; 1476 server.aof_fd = -1; 1477 server.aof_selected_db = -1; /* Make sure the first time will not match */ 1478 server.aof_flush_postponed_start = 0; 1479 server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; 1480 server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; 1481 server.pidfile = NULL; 1482 server.rdb_filename = 
zstrdup(CONFIG_DEFAULT_RDB_FILENAME); 1483 server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); 1484 server.requirepass = NULL; 1485 server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION; 1486 server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM; 1487 server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR; 1488 server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING; 1489 server.notify_keyspace_events = 0; 1490 server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS; 1491 server.bpop_blocked_clients = 0; 1492 server.maxmemory = CONFIG_DEFAULT_MAXMEMORY; 1493 server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY; 1494 server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES; 1495 server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES; 1496 server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE; 1497 server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE; 1498 server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH; 1499 server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES; 1500 server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES; 1501 server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE; 1502 server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES; 1503 server.shutdown_asap = 0; 1504 server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; 1505 server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; 1506 server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE; 1507 server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG; 1508 server.cluster_enabled = 0; 1509 server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT; 1510 server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER; 1511 server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY; 1512 server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE; 1513 server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); 1514 server.migrate_cached_sockets = 
dictCreate(&migrateCacheDictType,NULL); 1515 server.next_client_id = 1; /* Client IDs, start from 1 .*/ 1516 server.loading_process_events_interval_bytes = (1024*1024*2); 1517 1518 server.lruclock = getLRUClock(); 1519 resetServerSaveParams(); 1520 1521 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ 1522 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ 1523 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ 1524 /* Replication related */ 1525 server.masterauth = NULL; 1526 server.masterhost = NULL; 1527 server.masterport = 6379; 1528 server.master = NULL; 1529 server.cached_master = NULL; 1530 server.repl_master_initial_offset = -1; 1531 server.repl_state = REPL_STATE_NONE; 1532 server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; 1533 server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; 1534 server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; 1535 server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ 1536 server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; 1537 server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; 1538 server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; 1539 server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY; 1540 server.master_repl_offset = 0; 1541 1542 /* Replication partial resync backlog */ 1543 server.repl_backlog = NULL; 1544 server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE; 1545 server.repl_backlog_histlen = 0; 1546 server.repl_backlog_idx = 0; 1547 server.repl_backlog_off = 0; 1548 server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT; 1549 server.repl_no_slaves_since = time(NULL); 1550 1551 /* Client output buffer limits */ 1552 for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) 1553 server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; 1554 1555 /* Double constants initialization */ 1556 R_Zero = 0.0; 1557 R_PosInf = 1.0/R_Zero; 1558 R_NegInf = -1.0/R_Zero; 1559 R_Nan = R_Zero/R_Zero; 1560 1561 /* Command table -- we initiialize it here as it is part of the 1562 * initial configuration, since command names may be changed via 1563 * redis.conf using the rename-command directive. 
*/ 1564 server.commands = dictCreate(&commandTableDictType,NULL); 1565 server.orig_commands = dictCreate(&commandTableDictType,NULL); 1566 populateCommandTable(); 1567 server.delCommand = lookupCommandByCString("del"); 1568 server.multiCommand = lookupCommandByCString("multi"); 1569 server.lpushCommand = lookupCommandByCString("lpush"); 1570 server.lpopCommand = lookupCommandByCString("lpop"); 1571 server.rpopCommand = lookupCommandByCString("rpop"); 1572 server.sremCommand = lookupCommandByCString("srem"); 1573 server.execCommand = lookupCommandByCString("exec"); 1574 1575 /* Slow log */ 1576 server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; 1577 server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN; 1578 1579 /* Latency monitor */ 1580 server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; 1581 1582 /* Debugging */ 1583 server.assert_failed = "<no assertion failed>"; 1584 server.assert_file = "<no file>"; 1585 server.assert_line = 0; 1586 server.bug_report_start = 0; 1587 server.watchdog_period = 0; 1588 } 1589 1590 extern char **environ; 1591 1592 /* Restart the server, executing the same executable that started this 1593 * instance, with the same arguments and configuration file. 1594 * 1595 * The function is designed to directly call execve() so that the new 1596 * server instance will retain the PID of the previous one. 1597 * 1598 * The list of flags, that may be bitwise ORed together, alter the 1599 * behavior of this function: 1600 * 1601 * RESTART_SERVER_NONE No flags. 1602 * RESTART_SERVER_GRACEFULLY Do a proper shutdown before restarting. 1603 * RESTART_SERVER_CONFIG_REWRITE Rewrite the config file before restarting. 1604 * 1605 * On success the function does not return, because the process turns into 1606 * a different process. On error C_ERR is returned. 
*/
int restartServer(int flags, mstime_t delay) {
    int fd, last_fd;

    /* Give up immediately if the executable that started this instance
     * can no longer be executed by us. */
    if (access(server.executable,X_OK) == -1) return C_ERR;

    /* Optional config rewrite: only attempted when a config file is known. */
    if ((flags & RESTART_SERVER_CONFIG_REWRITE) && server.configfile) {
        if (rewriteConfig(server.configfile) == -1) return C_ERR;
    }

    /* Optional graceful shutdown before re-executing. */
    if (flags & RESTART_SERVER_GRACEFULLY) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
    }

    /* Close every descriptor from 3 up to maxclients+1024. stdin, stdout
     * and stderr stay open: they are useful when the restarted server is
     * not daemonized. */
    last_fd = (int)server.maxclients + 1024;
    for (fd = 3; fd < last_fd; fd++) close(fd);

    /* 'delay' is scaled by 1000 for usleep(), i.e. treated as ms. */
    if (delay) usleep(delay*1000);

    /* Replace the process image with the original command line and the
     * current environment, so the PID is preserved. */
    execve(server.executable,server.exec_argv,environ);

    /* execve() returned, so it failed: there is nothing left but to exit. */
    _exit(1);

    return C_ERR; /* Never reached. */
}

/* This function will try to raise the max number of open files according to
 * the configured max number of clients. It also reserves a number of file
 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
 * persistence, listening sockets, log files and so forth.
 *
 * If it is not possible to set the limit according to the configured
 * max number of clients, the function will do the reverse, setting
 * server.maxclients to the value that we can actually handle.
*/ 1645 void adjustOpenFilesLimit(void) { 1646 rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS; 1647 struct rlimit limit; 1648 1649 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { 1650 serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.", 1651 strerror(errno)); 1652 server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS; 1653 } else { 1654 rlim_t oldlimit = limit.rlim_cur; 1655 1656 /* Set the max number of files if the current limit is not enough 1657 * for our needs. */ 1658 if (oldlimit < maxfiles) { 1659 rlim_t bestlimit; 1660 int setrlimit_error = 0; 1661 1662 /* Try to set the file limit to match 'maxfiles' or at least 1663 * to the higher value supported less than maxfiles. */ 1664 bestlimit = maxfiles; 1665 while(bestlimit > oldlimit) { 1666 rlim_t decr_step = 16; 1667 1668 limit.rlim_cur = bestlimit; 1669 limit.rlim_max = bestlimit; 1670 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break; 1671 setrlimit_error = errno; 1672 1673 /* We failed to set file limit to 'bestlimit'. Try with a 1674 * smaller limit decrementing by a few FDs per iteration. */ 1675 if (bestlimit < decr_step) break; 1676 bestlimit -= decr_step; 1677 } 1678 1679 /* Assume that the limit we get initially is still valid if 1680 * our last try was even lower. */ 1681 if (bestlimit < oldlimit) bestlimit = oldlimit; 1682 1683 if (bestlimit < maxfiles) { 1684 int old_maxclients = server.maxclients; 1685 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS; 1686 if (server.maxclients < 1) { 1687 serverLog(LL_WARNING,"Your current 'ulimit -n' " 1688 "of %llu is not enough for the server to start. " 1689 "Please increase your open file limit to at least " 1690 "%llu. 
Exiting.", 1691 (unsigned long long) oldlimit, 1692 (unsigned long long) maxfiles); 1693 exit(1); 1694 } 1695 serverLog(LL_WARNING,"You requested maxclients of %d " 1696 "requiring at least %llu max file descriptors.", 1697 old_maxclients, 1698 (unsigned long long) maxfiles); 1699 serverLog(LL_WARNING,"Server can't set maximum open files " 1700 "to %llu because of OS error: %s.", 1701 (unsigned long long) maxfiles, strerror(setrlimit_error)); 1702 serverLog(LL_WARNING,"Current maximum open files is %llu. " 1703 "maxclients has been reduced to %d to compensate for " 1704 "low ulimit. " 1705 "If you need higher maxclients increase 'ulimit -n'.", 1706 (unsigned long long) bestlimit, server.maxclients); 1707 } else { 1708 serverLog(LL_NOTICE,"Increased maximum number of open files " 1709 "to %llu (it was originally set to %llu).", 1710 (unsigned long long) maxfiles, 1711 (unsigned long long) oldlimit); 1712 } 1713 } 1714 } 1715 } 1716 1717 /* Check that server.tcp_backlog can be actually enforced in Linux according 1718 * to the value of /proc/sys/net/core/somaxconn, or warn about it. */ 1719 void checkTcpBacklogSettings(void) { 1720 #ifdef HAVE_PROC_SOMAXCONN 1721 FILE *fp = fopen("/proc/sys/net/core/somaxconn","r"); 1722 char buf[1024]; 1723 if (!fp) return; 1724 if (fgets(buf,sizeof(buf),fp) != NULL) { 1725 int somaxconn = atoi(buf); 1726 if (somaxconn > 0 && somaxconn < server.tcp_backlog) { 1727 serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn); 1728 } 1729 } 1730 fclose(fp); 1731 #endif 1732 } 1733 1734 /* Initialize a set of file descriptors to listen to the specified 'port' 1735 * binding the addresses specified in the Redis server configuration. 1736 * 1737 * The listening file descriptors are stored in the integer array 'fds' 1738 * and their number is set in '*count'. 
1739 * 1740 * The addresses to bind are specified in the global server.bindaddr array 1741 * and their number is server.bindaddr_count. If the server configuration 1742 * contains no specific addresses to bind, this function will try to 1743 * bind * (all addresses) for both the IPv4 and IPv6 protocols. 1744 * 1745 * On success the function returns C_OK. 1746 * 1747 * On error the function returns C_ERR. For the function to be on 1748 * error, at least one of the server.bindaddr addresses was 1749 * impossible to bind, or no bind addresses were specified in the server 1750 * configuration but the function is not able to bind * for at least 1751 * one of the IPv4 or IPv6 protocols. */ 1752 int listenToPort(int port, int *fds, int *count) { 1753 int j; 1754 1755 /* Force binding of 0.0.0.0 if no bind address is specified, always 1756 * entering the loop if j == 0. */ 1757 if (server.bindaddr_count == 0) server.bindaddr[0] = NULL; 1758 for (j = 0; j < server.bindaddr_count || j == 0; j++) { 1759 if (server.bindaddr[j] == NULL) { 1760 /* Bind * for both IPv6 and IPv4, we enter here only if 1761 * server.bindaddr_count == 0. */ 1762 fds[*count] = anetTcp6Server(server.neterr,port,NULL, 1763 server.tcp_backlog); 1764 if (fds[*count] != ANET_ERR) { 1765 anetNonBlock(NULL,fds[*count]); 1766 (*count)++; 1767 1768 /* Bind the IPv4 address as well. */ 1769 fds[*count] = anetTcpServer(server.neterr,port,NULL, 1770 server.tcp_backlog); 1771 if (fds[*count] != ANET_ERR) { 1772 anetNonBlock(NULL,fds[*count]); 1773 (*count)++; 1774 } 1775 } 1776 /* Exit the loop if we were able to bind * on IPv4 and IPv6, 1777 * otherwise fds[*count] will be ANET_ERR and we'll print an 1778 * error and return to the caller with an error. */ 1779 if (*count == 2) break; 1780 } else if (strchr(server.bindaddr[j],':')) { 1781 /* Bind IPv6 address. */ 1782 fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], 1783 server.tcp_backlog); 1784 } else { 1785 /* Bind IPv4 address. 
*/ 1786 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], 1787 server.tcp_backlog); 1788 } 1789 if (fds[*count] == ANET_ERR) { 1790 serverLog(LL_WARNING, 1791 "Creating Server TCP listening socket %s:%d: %s", 1792 server.bindaddr[j] ? server.bindaddr[j] : "*", 1793 port, server.neterr); 1794 return C_ERR; 1795 } 1796 anetNonBlock(NULL,fds[*count]); 1797 (*count)++; 1798 } 1799 return C_OK; 1800 } 1801 1802 /* Resets the stats that we expose via INFO or other means that we want 1803 * to reset via CONFIG RESETSTAT. The function is also used in order to 1804 * initialize these fields in initServer() at server startup. */ 1805 void resetServerStats(void) { 1806 int j; 1807 1808 server.stat_numcommands = 0; 1809 server.stat_numconnections = 0; 1810 server.stat_expiredkeys = 0; 1811 server.stat_evictedkeys = 0; 1812 server.stat_keyspace_misses = 0; 1813 server.stat_keyspace_hits = 0; 1814 server.stat_fork_time = 0; 1815 server.stat_fork_rate = 0; 1816 server.stat_rejected_conn = 0; 1817 server.stat_sync_full = 0; 1818 server.stat_sync_partial_ok = 0; 1819 server.stat_sync_partial_err = 0; 1820 for (j = 0; j < STATS_METRIC_COUNT; j++) { 1821 server.inst_metric[j].idx = 0; 1822 server.inst_metric[j].last_sample_time = mstime(); 1823 server.inst_metric[j].last_sample_count = 0; 1824 memset(server.inst_metric[j].samples,0, 1825 sizeof(server.inst_metric[j].samples)); 1826 } 1827 server.stat_net_input_bytes = 0; 1828 server.stat_net_output_bytes = 0; 1829 server.aof_delayed_fsync = 0; 1830 } 1831 1832 void initServer(void) { 1833 int j; 1834 1835 signal(SIGHUP, SIG_IGN); 1836 signal(SIGPIPE, SIG_IGN); 1837 setupSignalHandlers(); 1838 1839 if (server.syslog_enabled) { 1840 openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, 1841 server.syslog_facility); 1842 } 1843 1844 server.pid = getpid(); 1845 server.current_client = NULL; 1846 server.clients = listCreate(); 1847 server.clients_to_close = listCreate(); 1848 server.slaves = listCreate(); 1849 
server.monitors = listCreate(); 1850 server.clients_pending_write = listCreate(); 1851 server.slaveseldb = -1; /* Force to emit the first SELECT command. */ 1852 server.unblocked_clients = listCreate(); 1853 server.ready_keys = listCreate(); 1854 server.clients_waiting_acks = listCreate(); 1855 server.get_ack_from_slaves = 0; 1856 server.clients_paused = 0; 1857 server.system_memory_size = zmalloc_get_memory_size(); 1858 1859 createSharedObjects(); 1860 adjustOpenFilesLimit(); 1861 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 1862 server.db = zmalloc(sizeof(redisDb)*server.dbnum); 1863 1864 /* Open the TCP listening socket for the user commands. */ 1865 if (server.port != 0 && 1866 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) 1867 exit(1); 1868 1869 /* Open the listening Unix domain socket. */ 1870 if (server.unixsocket != NULL) { 1871 unlink(server.unixsocket); /* don't care if this fails */ 1872 server.sofd = anetUnixServer(server.neterr,server.unixsocket, 1873 server.unixsocketperm, server.tcp_backlog); 1874 if (server.sofd == ANET_ERR) { 1875 serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); 1876 exit(1); 1877 } 1878 anetNonBlock(NULL,server.sofd); 1879 } 1880 1881 /* Abort if there are no listening sockets at all. */ 1882 if (server.ipfd_count == 0 && server.sofd < 0) { 1883 serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); 1884 exit(1); 1885 } 1886 1887 /* Create the Redis databases, and initialize other internal state. 
*/ 1888 for (j = 0; j < server.dbnum; j++) { 1889 server.db[j].dict = dictCreate(&dbDictType,NULL); 1890 server.db[j].expires = dictCreate(&keyptrDictType,NULL); 1891 server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); 1892 server.db[j].ready_keys = dictCreate(&setDictType,NULL); 1893 server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); 1894 server.db[j].eviction_pool = evictionPoolAlloc(); 1895 server.db[j].id = j; 1896 server.db[j].avg_ttl = 0; 1897 } 1898 server.pubsub_channels = dictCreate(&keylistDictType,NULL); 1899 server.pubsub_patterns = listCreate(); 1900 listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); 1901 listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); 1902 server.cronloops = 0; 1903 server.rdb_child_pid = -1; 1904 server.aof_child_pid = -1; 1905 server.rdb_child_type = RDB_CHILD_TYPE_NONE; 1906 aofRewriteBufferReset(); 1907 server.aof_buf = sdsempty(); 1908 server.lastsave = time(NULL); /* At startup we consider the DB saved. */ 1909 server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ 1910 server.rdb_save_time_last = -1; 1911 server.rdb_save_time_start = -1; 1912 server.dirty = 0; 1913 resetServerStats(); 1914 /* A few stats we don't want to reset: server startup time, and peak mem. */ 1915 server.stat_starttime = time(NULL); 1916 server.stat_peak_memory = 0; 1917 server.resident_set_size = 0; 1918 server.lastbgsave_status = C_OK; 1919 server.aof_last_write_status = C_OK; 1920 server.aof_last_write_errno = 0; 1921 server.repl_good_slaves_count = 0; 1922 updateCachedTime(); 1923 1924 /* Create the serverCron() time event, that's our main way to process 1925 * background operations. */ 1926 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { 1927 serverPanic("Can't create the serverCron time event."); 1928 exit(1); 1929 } 1930 1931 /* Create an event handler for accepting new connections in TCP and Unix 1932 * domain sockets. 
*/ 1933 for (j = 0; j < server.ipfd_count; j++) { 1934 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, 1935 acceptTcpHandler,NULL) == AE_ERR) 1936 { 1937 serverPanic( 1938 "Unrecoverable error creating server.ipfd file event."); 1939 } 1940 } 1941 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, 1942 acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); 1943 1944 /* Open the AOF file if needed. */ 1945 if (server.aof_state == AOF_ON) { 1946 server.aof_fd = open(server.aof_filename, 1947 O_WRONLY|O_APPEND|O_CREAT,0644); 1948 if (server.aof_fd == -1) { 1949 serverLog(LL_WARNING, "Can't open the append-only file: %s", 1950 strerror(errno)); 1951 exit(1); 1952 } 1953 } 1954 1955 /* 32 bit instances are limited to 4GB of address space, so if there is 1956 * no explicit limit in the user provided configuration we set a limit 1957 * at 3 GB using maxmemory with 'noeviction' policy'. This avoids 1958 * useless crashes of the Redis instance for out of memory. */ 1959 if (server.arch_bits == 32 && server.maxmemory == 0) { 1960 serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); 1961 server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ 1962 server.maxmemory_policy = MAXMEMORY_NO_EVICTION; 1963 } 1964 1965 if (server.cluster_enabled) clusterInit(); 1966 replicationScriptCacheInit(); 1967 scriptingInit(1); 1968 slowlogInit(); 1969 latencyMonitorInit(); 1970 bioInit(); 1971 } 1972 1973 /* Populates the Redis Command Table starting from the hard coded list 1974 * we have on top of redis.c file. 
*/ 1975 void populateCommandTable(void) { 1976 int j; 1977 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 1978 1979 for (j = 0; j < numcommands; j++) { 1980 struct redisCommand *c = redisCommandTable+j; 1981 char *f = c->sflags; 1982 int retval1, retval2; 1983 1984 while(*f != '\0') { 1985 switch(*f) { 1986 case 'w': c->flags |= CMD_WRITE; break; 1987 case 'r': c->flags |= CMD_READONLY; break; 1988 case 'm': c->flags |= CMD_DENYOOM; break; 1989 case 'a': c->flags |= CMD_ADMIN; break; 1990 case 'p': c->flags |= CMD_PUBSUB; break; 1991 case 's': c->flags |= CMD_NOSCRIPT; break; 1992 case 'R': c->flags |= CMD_RANDOM; break; 1993 case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break; 1994 case 'l': c->flags |= CMD_LOADING; break; 1995 case 't': c->flags |= CMD_STALE; break; 1996 case 'M': c->flags |= CMD_SKIP_MONITOR; break; 1997 case 'k': c->flags |= CMD_ASKING; break; 1998 case 'F': c->flags |= CMD_FAST; break; 1999 default: serverPanic("Unsupported command flag"); break; 2000 } 2001 f++; 2002 } 2003 2004 retval1 = dictAdd(server.commands, sdsnew(c->name), c); 2005 /* Populate an additional dictionary that will be unaffected 2006 * by rename-command statements in redis.conf. 
*/ 2007 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); 2008 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); 2009 } 2010 } 2011 2012 void resetCommandTableStats(void) { 2013 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 2014 int j; 2015 2016 for (j = 0; j < numcommands; j++) { 2017 struct redisCommand *c = redisCommandTable+j; 2018 2019 c->microseconds = 0; 2020 c->calls = 0; 2021 } 2022 } 2023 2024 /* ========================== Redis OP Array API ============================ */ 2025 2026 void redisOpArrayInit(redisOpArray *oa) { 2027 oa->ops = NULL; 2028 oa->numops = 0; 2029 } 2030 2031 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid, 2032 robj **argv, int argc, int target) 2033 { 2034 redisOp *op; 2035 2036 oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); 2037 op = oa->ops+oa->numops; 2038 op->cmd = cmd; 2039 op->dbid = dbid; 2040 op->argv = argv; 2041 op->argc = argc; 2042 op->target = target; 2043 oa->numops++; 2044 return oa->numops; 2045 } 2046 2047 void redisOpArrayFree(redisOpArray *oa) { 2048 while(oa->numops) { 2049 int j; 2050 redisOp *op; 2051 2052 oa->numops--; 2053 op = oa->ops+oa->numops; 2054 for (j = 0; j < op->argc; j++) 2055 decrRefCount(op->argv[j]); 2056 zfree(op->argv); 2057 } 2058 zfree(oa->ops); 2059 } 2060 2061 /* ====================== Commands lookup and execution ===================== */ 2062 2063 struct redisCommand *lookupCommand(sds name) { 2064 return dictFetchValue(server.commands, name); 2065 } 2066 2067 struct redisCommand *lookupCommandByCString(char *s) { 2068 struct redisCommand *cmd; 2069 sds name = sdsnew(s); 2070 2071 cmd = dictFetchValue(server.commands, name); 2072 sdsfree(name); 2073 return cmd; 2074 } 2075 2076 /* Lookup the command in the current table, if not found also check in 2077 * the original table containing the original command names unaffected by 2078 * redis.conf rename-command statement. 
2079 * 2080 * This is used by functions rewriting the argument vector such as 2081 * rewriteClientCommandVector() in order to set client->cmd pointer 2082 * correctly even if the command was renamed. */ 2083 struct redisCommand *lookupCommandOrOriginal(sds name) { 2084 struct redisCommand *cmd = dictFetchValue(server.commands, name); 2085 2086 if (!cmd) cmd = dictFetchValue(server.orig_commands,name); 2087 return cmd; 2088 } 2089 2090 /* Propagate the specified command (in the context of the specified database id) 2091 * to AOF and Slaves. 2092 * 2093 * flags are an xor between: 2094 * + PROPAGATE_NONE (no propagation of command at all) 2095 * + PROPAGATE_AOF (propagate into the AOF file if is enabled) 2096 * + PROPAGATE_REPL (propagate into the replication link) 2097 * 2098 * This should not be used inside commands implementation. Use instead 2099 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). 2100 */ 2101 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2102 int flags) 2103 { 2104 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) 2105 feedAppendOnlyFile(cmd,dbid,argv,argc); 2106 if (flags & PROPAGATE_REPL) 2107 replicationFeedSlaves(server.slaves,dbid,argv,argc); 2108 } 2109 2110 /* Used inside commands to schedule the propagation of additional commands 2111 * after the current command is propagated to AOF / Replication. 2112 * 2113 * 'cmd' must be a pointer to the Redis command to replicate, dbid is the 2114 * database ID the command should be propagated into. 2115 * Arguments of the command to propagte are passed as an array of redis 2116 * objects pointers of len 'argc', using the 'argv' vector. 2117 * 2118 * The function does not take a reference to the passed 'argv' vector, 2119 * so it is up to the caller to release the passed argv (but it is usually 2120 * stack allocated). The function autoamtically increments ref count of 2121 * passed objects, so the caller does not need to. 
*/ 2122 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2123 int target) 2124 { 2125 robj **argvcopy; 2126 int j; 2127 2128 if (server.loading) return; /* No propagation during loading. */ 2129 2130 argvcopy = zmalloc(sizeof(robj*)*argc); 2131 for (j = 0; j < argc; j++) { 2132 argvcopy[j] = argv[j]; 2133 incrRefCount(argv[j]); 2134 } 2135 redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target); 2136 } 2137 2138 /* It is possible to call the function forceCommandPropagation() inside a 2139 * Redis command implementation in order to to force the propagation of a 2140 * specific command execution into AOF / Replication. */ 2141 void forceCommandPropagation(client *c, int flags) { 2142 if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; 2143 if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF; 2144 } 2145 2146 /* Avoid that the executed command is propagated at all. This way we 2147 * are free to just propagate what we want using the alsoPropagate() 2148 * API. */ 2149 void preventCommandPropagation(client *c) { 2150 c->flags |= CLIENT_PREVENT_PROP; 2151 } 2152 2153 /* AOF specific version of preventCommandPropagation(). */ 2154 void preventCommandAOF(client *c) { 2155 c->flags |= CLIENT_PREVENT_AOF_PROP; 2156 } 2157 2158 /* Replication specific version of preventCommandPropagation(). */ 2159 void preventCommandReplication(client *c) { 2160 c->flags |= CLIENT_PREVENT_REPL_PROP; 2161 } 2162 2163 /* Call() is the core of Redis execution of a command. 2164 * 2165 * The following flags can be passed: 2166 * CMD_CALL_NONE No flags. 2167 * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. 2168 * CMD_CALL_STATS Populate command stats. 2169 * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset 2170 * or if the client flags are forcing propagation. 
2171 * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset 2172 * or if the client flags are forcing propagation. 2173 * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. 2174 * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. 2175 * 2176 * The exact propagation behavior depends on the client flags. 2177 * Specifically: 2178 * 2179 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set 2180 * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set 2181 * in the call flags, then the command is propagated even if the 2182 * dataset was not affected by the command. 2183 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP 2184 * are set, the propagation into AOF or to slaves is not performed even 2185 * if the command modified the dataset. 2186 * 2187 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF 2188 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or 2189 * slaves propagation will never occur. 2190 * 2191 * Client flags are modified by the implementation of a given command 2192 * using the following API: 2193 * 2194 * forceCommandPropagation(client *c, int flags); 2195 * preventCommandPropagation(client *c); 2196 * preventCommandAOF(client *c); 2197 * preventCommandReplication(client *c); 2198 * 2199 */ 2200 void call(client *c, int flags) { 2201 long long dirty, start, duration; 2202 int client_old_flags = c->flags; 2203 2204 /* Sent the command to clients in MONITOR mode, only if the commands are 2205 * not generated from reading an AOF. */ 2206 if (listLength(server.monitors) && 2207 !server.loading && 2208 !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) 2209 { 2210 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); 2211 } 2212 2213 /* Initialization: clear the flags that must be set by the command on 2214 * demand, and initialize the array for additional commands propagation. 
*/ 2215 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); 2216 redisOpArrayInit(&server.also_propagate); 2217 2218 /* Call the command. */ 2219 dirty = server.dirty; 2220 start = ustime(); 2221 c->cmd->proc(c); 2222 duration = ustime()-start; 2223 dirty = server.dirty-dirty; 2224 if (dirty < 0) dirty = 0; 2225 2226 /* When EVAL is called loading the AOF we don't want commands called 2227 * from Lua to go into the slowlog or to populate statistics. */ 2228 if (server.loading && c->flags & CLIENT_LUA) 2229 flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); 2230 2231 /* If the caller is Lua, we want to force the EVAL caller to propagate 2232 * the script if the command flag or client flag are forcing the 2233 * propagation. */ 2234 if (c->flags & CLIENT_LUA && server.lua_caller) { 2235 if (c->flags & CLIENT_FORCE_REPL) 2236 server.lua_caller->flags |= CLIENT_FORCE_REPL; 2237 if (c->flags & CLIENT_FORCE_AOF) 2238 server.lua_caller->flags |= CLIENT_FORCE_AOF; 2239 } 2240 2241 /* Log the command into the Slow log if needed, and populate the 2242 * per-command statistics that we show in INFO commandstats. */ 2243 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { 2244 char *latency_event = (c->cmd->flags & CMD_FAST) ? 2245 "fast-command" : "command"; 2246 latencyAddSampleIfNeeded(latency_event,duration/1000); 2247 slowlogPushEntryIfNeeded(c->argv,c->argc,duration); 2248 } 2249 if (flags & CMD_CALL_STATS) { 2250 c->lastcmd->microseconds += duration; 2251 c->lastcmd->calls++; 2252 } 2253 2254 /* Propagate the command into the AOF and replication link */ 2255 if (flags & CMD_CALL_PROPAGATE && 2256 (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) 2257 { 2258 int propagate_flags = PROPAGATE_NONE; 2259 2260 /* Check if the command operated changes in the data set. If so 2261 * set for replication / AOF propagation. 
*/ 2262 if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); 2263 2264 /* If the client forced AOF / replication of the command, set 2265 * the flags regardless of the command effects on the data set. */ 2266 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; 2267 if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; 2268 2269 /* However prevent AOF / replication propagation if the command 2270 * implementatino called preventCommandPropagation() or similar, 2271 * or if we don't have the call() flags to do so. */ 2272 if (c->flags & CLIENT_PREVENT_REPL_PROP || 2273 !(flags & CMD_CALL_PROPAGATE_REPL)) 2274 propagate_flags &= ~PROPAGATE_REPL; 2275 if (c->flags & CLIENT_PREVENT_AOF_PROP || 2276 !(flags & CMD_CALL_PROPAGATE_AOF)) 2277 propagate_flags &= ~PROPAGATE_AOF; 2278 2279 /* Call propagate() only if at least one of AOF / replication 2280 * propagation is needed. */ 2281 if (propagate_flags != PROPAGATE_NONE) 2282 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); 2283 } 2284 2285 /* Restore the old replication flags, since call() can be executed 2286 * recursively. */ 2287 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); 2288 c->flags |= client_old_flags & 2289 (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); 2290 2291 /* Handle the alsoPropagate() API to handle commands that want to propagate 2292 * multiple separated commands. Note that alsoPropagate() is not affected 2293 * by CLIENT_PREVENT_PROP flag. */ 2294 if (server.also_propagate.numops) { 2295 int j; 2296 redisOp *rop; 2297 2298 if (flags & CMD_CALL_PROPAGATE) { 2299 for (j = 0; j < server.also_propagate.numops; j++) { 2300 rop = &server.also_propagate.ops[j]; 2301 int target = rop->target; 2302 /* Whatever the command wish is, we honor the call() flags. 
*/ 2303 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; 2304 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; 2305 if (target) 2306 propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); 2307 } 2308 } 2309 redisOpArrayFree(&server.also_propagate); 2310 } 2311 server.stat_numcommands++; 2312 } 2313 2314 /* If this function gets called we already read a whole 2315 * command, arguments are in the client argv/argc fields. 2316 * processCommand() execute the command or prepare the 2317 * server for a bulk read from the client. 2318 * 2319 * If C_OK is returned the client is still alive and valid and 2320 * other operations can be performed by the caller. Otherwise 2321 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ 2322 int processCommand(client *c) { 2323 /* The QUIT command is handled separately. Normal command procs will 2324 * go through checking for replication and QUIT will cause trouble 2325 * when FORCE_REPLICATION is enabled and would be implemented in 2326 * a regular command proc. */ 2327 if (!strcasecmp(c->argv[0]->ptr,"quit")) { 2328 addReply(c,shared.ok); 2329 c->flags |= CLIENT_CLOSE_AFTER_REPLY; 2330 return C_ERR; 2331 } 2332 2333 /* Now lookup the command and check ASAP about trivial error conditions 2334 * such as wrong arity, bad command name and so forth. 
*/ 2335 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); 2336 if (!c->cmd) { 2337 flagTransaction(c); 2338 addReplyErrorFormat(c,"unknown command '%s'", 2339 (char*)c->argv[0]->ptr); 2340 return C_OK; 2341 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || 2342 (c->argc < -c->cmd->arity)) { 2343 flagTransaction(c); 2344 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", 2345 c->cmd->name); 2346 return C_OK; 2347 } 2348 2349 /* Check if the user is authenticated */ 2350 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) 2351 { 2352 flagTransaction(c); 2353 addReply(c,shared.noautherr); 2354 return C_OK; 2355 } 2356 2357 /* If cluster is enabled perform the cluster redirection here. 2358 * However we don't perform the redirection if: 2359 * 1) The sender of this command is our master. 2360 * 2) The command has no key arguments. */ 2361 if (server.cluster_enabled && 2362 !(c->flags & CLIENT_MASTER) && 2363 !(c->flags & CLIENT_LUA && 2364 server.lua_caller->flags & CLIENT_MASTER) && 2365 !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && 2366 c->cmd->proc != execCommand)) 2367 { 2368 int hashslot; 2369 int error_code; 2370 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, 2371 &hashslot,&error_code); 2372 if (n == NULL || n != server.cluster->myself) { 2373 if (c->cmd->proc == execCommand) { 2374 discardTransaction(c); 2375 } else { 2376 flagTransaction(c); 2377 } 2378 clusterRedirectClient(c,n,hashslot,error_code); 2379 return C_OK; 2380 } 2381 } 2382 2383 /* Handle the maxmemory directive. 2384 * 2385 * First we try to free some memory if possible (if there are volatile 2386 * keys in the dataset). If there are not the only thing we can do 2387 * is returning an error. */ 2388 if (server.maxmemory) { 2389 int retval = freeMemoryIfNeeded(); 2390 /* freeMemoryIfNeeded may flush slave output buffers. This may result 2391 * into a slave, that may be the active client, to be freed. 
*/ 2392 if (server.current_client == NULL) return C_ERR; 2393 2394 /* It was impossible to free enough memory, and the command the client 2395 * is trying to execute is denied during OOM conditions? Error. */ 2396 if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) { 2397 flagTransaction(c); 2398 addReply(c, shared.oomerr); 2399 return C_OK; 2400 } 2401 } 2402 2403 /* Don't accept write commands if there are problems persisting on disk 2404 * and if this is a master instance. */ 2405 if (((server.stop_writes_on_bgsave_err && 2406 server.saveparamslen > 0 && 2407 server.lastbgsave_status == C_ERR) || 2408 server.aof_last_write_status == C_ERR) && 2409 server.masterhost == NULL && 2410 (c->cmd->flags & CMD_WRITE || 2411 c->cmd->proc == pingCommand)) 2412 { 2413 flagTransaction(c); 2414 if (server.aof_last_write_status == C_OK) 2415 addReply(c, shared.bgsaveerr); 2416 else 2417 addReplySds(c, 2418 sdscatprintf(sdsempty(), 2419 "-MISCONF Errors writing to the AOF file: %s\r\n", 2420 strerror(server.aof_last_write_errno))); 2421 return C_OK; 2422 } 2423 2424 /* Don't accept write commands if there are not enough good slaves and 2425 * user configured the min-slaves-to-write option. */ 2426 if (server.masterhost == NULL && 2427 server.repl_min_slaves_to_write && 2428 server.repl_min_slaves_max_lag && 2429 c->cmd->flags & CMD_WRITE && 2430 server.repl_good_slaves_count < server.repl_min_slaves_to_write) 2431 { 2432 flagTransaction(c); 2433 addReply(c, shared.noreplicaserr); 2434 return C_OK; 2435 } 2436 2437 /* Don't accept write commands if this is a read only slave. But 2438 * accept write commands if this is our master. 
*/ 2439 if (server.masterhost && server.repl_slave_ro && 2440 !(c->flags & CLIENT_MASTER) && 2441 c->cmd->flags & CMD_WRITE) 2442 { 2443 addReply(c, shared.roslaveerr); 2444 return C_OK; 2445 } 2446 2447 /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ 2448 if (c->flags & CLIENT_PUBSUB && 2449 c->cmd->proc != pingCommand && 2450 c->cmd->proc != subscribeCommand && 2451 c->cmd->proc != unsubscribeCommand && 2452 c->cmd->proc != psubscribeCommand && 2453 c->cmd->proc != punsubscribeCommand) { 2454 addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); 2455 return C_OK; 2456 } 2457 2458 /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and 2459 * we are a slave with a broken link with master. */ 2460 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && 2461 server.repl_serve_stale_data == 0 && 2462 !(c->cmd->flags & CMD_STALE)) 2463 { 2464 flagTransaction(c); 2465 addReply(c, shared.masterdownerr); 2466 return C_OK; 2467 } 2468 2469 /* Loading DB? Return an error if the command has not the 2470 * CMD_LOADING flag. */ 2471 if (server.loading && !(c->cmd->flags & CMD_LOADING)) { 2472 addReply(c, shared.loadingerr); 2473 return C_OK; 2474 } 2475 2476 /* Lua script too slow? Only allow a limited number of commands. 
*/ 2477 if (server.lua_timedout && 2478 c->cmd->proc != authCommand && 2479 c->cmd->proc != replconfCommand && 2480 !(c->cmd->proc == shutdownCommand && 2481 c->argc == 2 && 2482 tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && 2483 !(c->cmd->proc == scriptCommand && 2484 c->argc == 2 && 2485 tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) 2486 { 2487 flagTransaction(c); 2488 addReply(c, shared.slowscripterr); 2489 return C_OK; 2490 } 2491 2492 /* Exec the command */ 2493 if (c->flags & CLIENT_MULTI && 2494 c->cmd->proc != execCommand && c->cmd->proc != discardCommand && 2495 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) 2496 { 2497 queueMultiCommand(c); 2498 addReply(c,shared.queued); 2499 } else { 2500 call(c,CMD_CALL_FULL); 2501 c->woff = server.master_repl_offset; 2502 if (listLength(server.ready_keys)) 2503 handleClientsBlockedOnLists(); 2504 } 2505 return C_OK; 2506 } 2507 2508 /*================================== Shutdown =============================== */ 2509 2510 /* Close listening sockets. Also unlink the unix domain socket if 2511 * unlink_unix_socket is non-zero. */ 2512 void closeListeningSockets(int unlink_unix_socket) { 2513 int j; 2514 2515 for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]); 2516 if (server.sofd != -1) close(server.sofd); 2517 if (server.cluster_enabled) 2518 for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]); 2519 if (unlink_unix_socket && server.unixsocket) { 2520 serverLog(LL_NOTICE,"Removing the unix socket file."); 2521 unlink(server.unixsocket); /* don't care if this fails */ 2522 } 2523 } 2524 2525 int prepareForShutdown(int flags) { 2526 int save = flags & SHUTDOWN_SAVE; 2527 int nosave = flags & SHUTDOWN_NOSAVE; 2528 2529 serverLog(LL_WARNING,"User requested shutdown..."); 2530 2531 /* Kill all the Lua debugger forked sessions. */ 2532 ldbKillForkedSessions(); 2533 2534 /* Kill the saving child if there is a background saving in progress. 
2535 We want to avoid race conditions, for instance our saving child may 2536 overwrite the synchronous saving did by SHUTDOWN. */ 2537 if (server.rdb_child_pid != -1) { 2538 serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!"); 2539 kill(server.rdb_child_pid,SIGUSR1); 2540 rdbRemoveTempFile(server.rdb_child_pid); 2541 } 2542 2543 if (server.aof_state != AOF_OFF) { 2544 /* Kill the AOF saving child as the AOF we already have may be longer 2545 * but contains the full dataset anyway. */ 2546 if (server.aof_child_pid != -1) { 2547 /* If we have AOF enabled but haven't written the AOF yet, don't 2548 * shutdown or else the dataset will be lost. */ 2549 if (server.aof_state == AOF_WAIT_REWRITE) { 2550 serverLog(LL_WARNING, "Writing initial AOF, can't exit."); 2551 return C_ERR; 2552 } 2553 serverLog(LL_WARNING, 2554 "There is a child rewriting the AOF. Killing it!"); 2555 kill(server.aof_child_pid,SIGUSR1); 2556 } 2557 /* Append only file: fsync() the AOF and exit */ 2558 serverLog(LL_NOTICE,"Calling fsync() on the AOF file."); 2559 aof_fsync(server.aof_fd); 2560 } 2561 2562 /* Create a new RDB file before exiting. */ 2563 if ((server.saveparamslen > 0 && !nosave) || save) { 2564 serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting."); 2565 /* Snapshotting. Perform a SYNC SAVE and exit */ 2566 if (rdbSave(server.rdb_filename) != C_OK) { 2567 /* Ooops.. error saving! The best we can do is to continue 2568 * operating. Note that if there was a background saving process, 2569 * in the next cron() Redis will be notified that the background 2570 * saving aborted, handling special stuff like slaves pending for 2571 * synchronization... */ 2572 serverLog(LL_WARNING,"Error trying to save the DB, can't exit."); 2573 return C_ERR; 2574 } 2575 } 2576 2577 /* Remove the pid file if possible and needed. 
*/ 2578 if (server.daemonize || server.pidfile) { 2579 serverLog(LL_NOTICE,"Removing the pid file."); 2580 unlink(server.pidfile); 2581 } 2582 2583 /* Best effort flush of slave output buffers, so that we hopefully 2584 * send them pending writes. */ 2585 flushSlavesOutputBuffers(); 2586 2587 /* Close the listening sockets. Apparently this allows faster restarts. */ 2588 closeListeningSockets(1); 2589 serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", 2590 server.sentinel_mode ? "Sentinel" : "Redis"); 2591 return C_OK; 2592 } 2593 2594 /*================================== Commands =============================== */ 2595 2596 /* Return zero if strings are the same, non-zero if they are not. 2597 * The comparison is performed in a way that prevents an attacker to obtain 2598 * information about the nature of the strings just monitoring the execution 2599 * time of the function. 2600 * 2601 * Note that limiting the comparison length to strings up to 512 bytes we 2602 * can avoid leaking any information about the password length and any 2603 * possible branch misprediction related leak. 2604 */ 2605 int time_independent_strcmp(char *a, char *b) { 2606 char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN]; 2607 /* The above two strlen perform len(a) + len(b) operations where either 2608 * a or b are fixed (our password) length, and the difference is only 2609 * relative to the length of the user provided string, so no information 2610 * leak is possible in the following two lines of code. */ 2611 unsigned int alen = strlen(a); 2612 unsigned int blen = strlen(b); 2613 unsigned int j; 2614 int diff = 0; 2615 2616 /* We can't compare strings longer than our static buffers. 2617 * Note that this will never pass the first test in practical circumstances 2618 * so there is no info leak. */ 2619 if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1; 2620 2621 memset(bufa,0,sizeof(bufa)); /* Constant time. 
*/ 2622 memset(bufb,0,sizeof(bufb)); /* Constant time. */ 2623 /* Again the time of the following two copies is proportional to 2624 * len(a) + len(b) so no info is leaked. */ 2625 memcpy(bufa,a,alen); 2626 memcpy(bufb,b,blen); 2627 2628 /* Always compare all the chars in the two buffers without 2629 * conditional expressions. */ 2630 for (j = 0; j < sizeof(bufa); j++) { 2631 diff |= (bufa[j] ^ bufb[j]); 2632 } 2633 /* Length must be equal as well. */ 2634 diff |= alen ^ blen; 2635 return diff; /* If zero strings are the same. */ 2636 } 2637 2638 void authCommand(client *c) { 2639 if (!server.requirepass) { 2640 addReplyError(c,"Client sent AUTH, but no password is set"); 2641 } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) { 2642 c->authenticated = 1; 2643 addReply(c,shared.ok); 2644 } else { 2645 c->authenticated = 0; 2646 addReplyError(c,"invalid password"); 2647 } 2648 } 2649 2650 /* The PING command. It works in a different way if the client is in 2651 * in Pub/Sub mode. */ 2652 void pingCommand(client *c) { 2653 /* The command takes zero or one arguments. */ 2654 if (c->argc > 2) { 2655 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", 2656 c->cmd->name); 2657 return; 2658 } 2659 2660 if (c->flags & CLIENT_PUBSUB) { 2661 addReply(c,shared.mbulkhdr[2]); 2662 addReplyBulkCBuffer(c,"pong",4); 2663 if (c->argc == 1) 2664 addReplyBulkCBuffer(c,"",0); 2665 else 2666 addReplyBulk(c,c->argv[1]); 2667 } else { 2668 if (c->argc == 1) 2669 addReply(c,shared.pong); 2670 else 2671 addReplyBulk(c,c->argv[1]); 2672 } 2673 } 2674 2675 void echoCommand(client *c) { 2676 addReplyBulk(c,c->argv[1]); 2677 } 2678 2679 void timeCommand(client *c) { 2680 struct timeval tv; 2681 2682 /* gettimeofday() can only fail if &tv is a bad address so we 2683 * don't check for errors. 
*/ 2684 gettimeofday(&tv,NULL); 2685 addReplyMultiBulkLen(c,2); 2686 addReplyBulkLongLong(c,tv.tv_sec); 2687 addReplyBulkLongLong(c,tv.tv_usec); 2688 } 2689 2690 /* Helper function for addReplyCommand() to output flags. */ 2691 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) { 2692 if (cmd->flags & f) { 2693 addReplyStatus(c, reply); 2694 return 1; 2695 } 2696 return 0; 2697 } 2698 2699 /* Output the representation of a Redis command. Used by the COMMAND command. */ 2700 void addReplyCommand(client *c, struct redisCommand *cmd) { 2701 if (!cmd) { 2702 addReply(c, shared.nullbulk); 2703 } else { 2704 /* We are adding: command name, arg count, flags, first, last, offset */ 2705 addReplyMultiBulkLen(c, 6); 2706 addReplyBulkCString(c, cmd->name); 2707 addReplyLongLong(c, cmd->arity); 2708 2709 int flagcount = 0; 2710 void *flaglen = addDeferredMultiBulkLength(c); 2711 flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write"); 2712 flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly"); 2713 flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom"); 2714 flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin"); 2715 flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub"); 2716 flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript"); 2717 flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random"); 2718 flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script"); 2719 flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading"); 2720 flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale"); 2721 flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor"); 2722 flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); 2723 flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); 2724 if (cmd->getkeys_proc) { 2725 addReplyStatus(c, "movablekeys"); 2726 flagcount += 1; 2727 } 2728 setDeferredMultiBulkLength(c, flaglen, flagcount); 2729 2730 addReplyLongLong(c, 
cmd->firstkey); 2731 addReplyLongLong(c, cmd->lastkey); 2732 addReplyLongLong(c, cmd->keystep); 2733 } 2734 } 2735 2736 /* COMMAND <subcommand> <args> */ 2737 void commandCommand(client *c) { 2738 dictIterator *di; 2739 dictEntry *de; 2740 2741 if (c->argc == 1) { 2742 addReplyMultiBulkLen(c, dictSize(server.commands)); 2743 di = dictGetIterator(server.commands); 2744 while ((de = dictNext(di)) != NULL) { 2745 addReplyCommand(c, dictGetVal(de)); 2746 } 2747 dictReleaseIterator(di); 2748 } else if (!strcasecmp(c->argv[1]->ptr, "info")) { 2749 int i; 2750 addReplyMultiBulkLen(c, c->argc-2); 2751 for (i = 2; i < c->argc; i++) { 2752 addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr)); 2753 } 2754 } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) { 2755 addReplyLongLong(c, dictSize(server.commands)); 2756 } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) { 2757 struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr); 2758 int *keys, numkeys, j; 2759 2760 if (!cmd) { 2761 addReplyErrorFormat(c,"Invalid command specified"); 2762 return; 2763 } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) || 2764 ((c->argc-2) < -cmd->arity)) 2765 { 2766 addReplyError(c,"Invalid number of arguments specified for command"); 2767 return; 2768 } 2769 2770 keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys); 2771 addReplyMultiBulkLen(c,numkeys); 2772 for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]); 2773 getKeysFreeResult(keys); 2774 } else { 2775 addReplyError(c, "Unknown subcommand or wrong number of arguments."); 2776 return; 2777 } 2778 } 2779 2780 /* Convert an amount of bytes into a human readable string in the form 2781 * of 100B, 2G, 100M, 4K, and so forth. 
*/ 2782 void bytesToHuman(char *s, unsigned long long n) { 2783 double d; 2784 2785 if (n < 1024) { 2786 /* Bytes */ 2787 sprintf(s,"%lluB",n); 2788 return; 2789 } else if (n < (1024*1024)) { 2790 d = (double)n/(1024); 2791 sprintf(s,"%.2fK",d); 2792 } else if (n < (1024LL*1024*1024)) { 2793 d = (double)n/(1024*1024); 2794 sprintf(s,"%.2fM",d); 2795 } else if (n < (1024LL*1024*1024*1024)) { 2796 d = (double)n/(1024LL*1024*1024); 2797 sprintf(s,"%.2fG",d); 2798 } else if (n < (1024LL*1024*1024*1024*1024)) { 2799 d = (double)n/(1024LL*1024*1024*1024); 2800 sprintf(s,"%.2fT",d); 2801 } else if (n < (1024LL*1024*1024*1024*1024*1024)) { 2802 d = (double)n/(1024LL*1024*1024*1024*1024); 2803 sprintf(s,"%.2fP",d); 2804 } else { 2805 /* Let's hope we never need this */ 2806 sprintf(s,"%lluB",n); 2807 } 2808 } 2809 2810 /* Create the string returned by the INFO command. This is decoupled 2811 * by the INFO command itself as we need to report the same information 2812 * on memory corruption problems. 
*/ 2813 sds genRedisInfoString(char *section) { 2814 sds info = sdsempty(); 2815 time_t uptime = server.unixtime-server.stat_starttime; 2816 int j, numcommands; 2817 struct rusage self_ru, c_ru; 2818 unsigned long lol, bib; 2819 int allsections = 0, defsections = 0; 2820 int sections = 0; 2821 2822 if (section == NULL) section = "default"; 2823 allsections = strcasecmp(section,"all") == 0; 2824 defsections = strcasecmp(section,"default") == 0; 2825 2826 getrusage(RUSAGE_SELF, &self_ru); 2827 getrusage(RUSAGE_CHILDREN, &c_ru); 2828 getClientsMaxBuffers(&lol,&bib); 2829 2830 /* Server */ 2831 if (allsections || defsections || !strcasecmp(section,"server")) { 2832 static int call_uname = 1; 2833 static struct utsname name; 2834 char *mode; 2835 2836 if (server.cluster_enabled) mode = "cluster"; 2837 else if (server.sentinel_mode) mode = "sentinel"; 2838 else mode = "standalone"; 2839 2840 if (sections++) info = sdscat(info,"\r\n"); 2841 2842 if (call_uname) { 2843 /* Uname can be slow and is always the same output. Cache it. 
*/ 2844 uname(&name); 2845 call_uname = 0; 2846 } 2847 2848 info = sdscatprintf(info, 2849 "# Server\r\n" 2850 "redis_version:%s\r\n" 2851 "redis_git_sha1:%s\r\n" 2852 "redis_git_dirty:%d\r\n" 2853 "redis_build_id:%llx\r\n" 2854 "redis_mode:%s\r\n" 2855 "os:%s %s %s\r\n" 2856 "arch_bits:%d\r\n" 2857 "multiplexing_api:%s\r\n" 2858 "gcc_version:%d.%d.%d\r\n" 2859 "process_id:%ld\r\n" 2860 "run_id:%s\r\n" 2861 "tcp_port:%d\r\n" 2862 "uptime_in_seconds:%jd\r\n" 2863 "uptime_in_days:%jd\r\n" 2864 "hz:%d\r\n" 2865 "lru_clock:%ld\r\n" 2866 "executable:%s\r\n" 2867 "config_file:%s\r\n", 2868 REDIS_VERSION, 2869 redisGitSHA1(), 2870 strtol(redisGitDirty(),NULL,10) > 0, 2871 (unsigned long long) redisBuildId(), 2872 mode, 2873 name.sysname, name.release, name.machine, 2874 server.arch_bits, 2875 aeGetApiName(), 2876 #ifdef __GNUC__ 2877 __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__, 2878 #else 2879 0,0,0, 2880 #endif 2881 (long) getpid(), 2882 server.runid, 2883 server.port, 2884 (intmax_t)uptime, 2885 (intmax_t)(uptime/(3600*24)), 2886 server.hz, 2887 (unsigned long) server.lruclock, 2888 server.executable ? server.executable : "", 2889 server.configfile ? 
server.configfile : ""); 2890 } 2891 2892 /* Clients */ 2893 if (allsections || defsections || !strcasecmp(section,"clients")) { 2894 if (sections++) info = sdscat(info,"\r\n"); 2895 info = sdscatprintf(info, 2896 "# Clients\r\n" 2897 "connected_clients:%lu\r\n" 2898 "client_longest_output_list:%lu\r\n" 2899 "client_biggest_input_buf:%lu\r\n" 2900 "blocked_clients:%d\r\n", 2901 listLength(server.clients)-listLength(server.slaves), 2902 lol, bib, 2903 server.bpop_blocked_clients); 2904 } 2905 2906 /* Memory */ 2907 if (allsections || defsections || !strcasecmp(section,"memory")) { 2908 char hmem[64]; 2909 char peak_hmem[64]; 2910 char total_system_hmem[64]; 2911 char used_memory_lua_hmem[64]; 2912 char used_memory_rss_hmem[64]; 2913 char maxmemory_hmem[64]; 2914 size_t zmalloc_used = zmalloc_used_memory(); 2915 size_t total_system_mem = server.system_memory_size; 2916 const char *evict_policy = evictPolicyToString(); 2917 long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024; 2918 2919 /* Peak memory is updated from time to time by serverCron() so it 2920 * may happen that the instantaneous value is slightly bigger than 2921 * the peak value. This may confuse users, so we update the peak 2922 * if found smaller than the current memory usage. 
*/ 2923 if (zmalloc_used > server.stat_peak_memory) 2924 server.stat_peak_memory = zmalloc_used; 2925 2926 bytesToHuman(hmem,zmalloc_used); 2927 bytesToHuman(peak_hmem,server.stat_peak_memory); 2928 bytesToHuman(total_system_hmem,total_system_mem); 2929 bytesToHuman(used_memory_lua_hmem,memory_lua); 2930 bytesToHuman(used_memory_rss_hmem,server.resident_set_size); 2931 bytesToHuman(maxmemory_hmem,server.maxmemory); 2932 2933 if (sections++) info = sdscat(info,"\r\n"); 2934 info = sdscatprintf(info, 2935 "# Memory\r\n" 2936 "used_memory:%zu\r\n" 2937 "used_memory_human:%s\r\n" 2938 "used_memory_rss:%zu\r\n" 2939 "used_memory_rss_human:%s\r\n" 2940 "used_memory_peak:%zu\r\n" 2941 "used_memory_peak_human:%s\r\n" 2942 "total_system_memory:%lu\r\n" 2943 "total_system_memory_human:%s\r\n" 2944 "used_memory_lua:%lld\r\n" 2945 "used_memory_lua_human:%s\r\n" 2946 "maxmemory:%lld\r\n" 2947 "maxmemory_human:%s\r\n" 2948 "maxmemory_policy:%s\r\n" 2949 "mem_fragmentation_ratio:%.2f\r\n" 2950 "mem_allocator:%s\r\n", 2951 zmalloc_used, 2952 hmem, 2953 server.resident_set_size, 2954 used_memory_rss_hmem, 2955 server.stat_peak_memory, 2956 peak_hmem, 2957 (unsigned long)total_system_mem, 2958 total_system_hmem, 2959 memory_lua, 2960 used_memory_lua_hmem, 2961 server.maxmemory, 2962 maxmemory_hmem, 2963 evict_policy, 2964 zmalloc_get_fragmentation_ratio(server.resident_set_size), 2965 ZMALLOC_LIB 2966 ); 2967 } 2968 2969 /* Persistence */ 2970 if (allsections || defsections || !strcasecmp(section,"persistence")) { 2971 if (sections++) info = sdscat(info,"\r\n"); 2972 info = sdscatprintf(info, 2973 "# Persistence\r\n" 2974 "loading:%d\r\n" 2975 "rdb_changes_since_last_save:%lld\r\n" 2976 "rdb_bgsave_in_progress:%d\r\n" 2977 "rdb_last_save_time:%jd\r\n" 2978 "rdb_last_bgsave_status:%s\r\n" 2979 "rdb_last_bgsave_time_sec:%jd\r\n" 2980 "rdb_current_bgsave_time_sec:%jd\r\n" 2981 "aof_enabled:%d\r\n" 2982 "aof_rewrite_in_progress:%d\r\n" 2983 "aof_rewrite_scheduled:%d\r\n" 2984 
"aof_last_rewrite_time_sec:%jd\r\n" 2985 "aof_current_rewrite_time_sec:%jd\r\n" 2986 "aof_last_bgrewrite_status:%s\r\n" 2987 "aof_last_write_status:%s\r\n", 2988 server.loading, 2989 server.dirty, 2990 server.rdb_child_pid != -1, 2991 (intmax_t)server.lastsave, 2992 (server.lastbgsave_status == C_OK) ? "ok" : "err", 2993 (intmax_t)server.rdb_save_time_last, 2994 (intmax_t)((server.rdb_child_pid == -1) ? 2995 -1 : time(NULL)-server.rdb_save_time_start), 2996 server.aof_state != AOF_OFF, 2997 server.aof_child_pid != -1, 2998 server.aof_rewrite_scheduled, 2999 (intmax_t)server.aof_rewrite_time_last, 3000 (intmax_t)((server.aof_child_pid == -1) ? 3001 -1 : time(NULL)-server.aof_rewrite_time_start), 3002 (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", 3003 (server.aof_last_write_status == C_OK) ? "ok" : "err"); 3004 3005 if (server.aof_state != AOF_OFF) { 3006 info = sdscatprintf(info, 3007 "aof_current_size:%lld\r\n" 3008 "aof_base_size:%lld\r\n" 3009 "aof_pending_rewrite:%d\r\n" 3010 "aof_buffer_length:%zu\r\n" 3011 "aof_rewrite_buffer_length:%lu\r\n" 3012 "aof_pending_bio_fsync:%llu\r\n" 3013 "aof_delayed_fsync:%lu\r\n", 3014 (long long) server.aof_current_size, 3015 (long long) server.aof_rewrite_base_size, 3016 server.aof_rewrite_scheduled, 3017 sdslen(server.aof_buf), 3018 aofRewriteBufferSize(), 3019 bioPendingJobsOfType(BIO_AOF_FSYNC), 3020 server.aof_delayed_fsync); 3021 } 3022 3023 if (server.loading) { 3024 double perc; 3025 time_t eta, elapsed; 3026 off_t remaining_bytes = server.loading_total_bytes- 3027 server.loading_loaded_bytes; 3028 3029 perc = ((double)server.loading_loaded_bytes / 3030 (server.loading_total_bytes+1)) * 100; 3031 3032 elapsed = time(NULL)-server.loading_start_time; 3033 if (elapsed == 0) { 3034 eta = 1; /* A fake 1 second figure if we don't have 3035 enough info */ 3036 } else { 3037 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1); 3038 } 3039 3040 info = sdscatprintf(info, 3041 "loading_start_time:%jd\r\n" 
3042 "loading_total_bytes:%llu\r\n" 3043 "loading_loaded_bytes:%llu\r\n" 3044 "loading_loaded_perc:%.2f\r\n" 3045 "loading_eta_seconds:%jd\r\n", 3046 (intmax_t) server.loading_start_time, 3047 (unsigned long long) server.loading_total_bytes, 3048 (unsigned long long) server.loading_loaded_bytes, 3049 perc, 3050 (intmax_t)eta 3051 ); 3052 } 3053 } 3054 3055 /* Stats */ 3056 if (allsections || defsections || !strcasecmp(section,"stats")) { 3057 if (sections++) info = sdscat(info,"\r\n"); 3058 info = sdscatprintf(info, 3059 "# Stats\r\n" 3060 "total_connections_received:%lld\r\n" 3061 "total_commands_processed:%lld\r\n" 3062 "instantaneous_ops_per_sec:%lld\r\n" 3063 "total_net_input_bytes:%lld\r\n" 3064 "total_net_output_bytes:%lld\r\n" 3065 "instantaneous_input_kbps:%.2f\r\n" 3066 "instantaneous_output_kbps:%.2f\r\n" 3067 "rejected_connections:%lld\r\n" 3068 "sync_full:%lld\r\n" 3069 "sync_partial_ok:%lld\r\n" 3070 "sync_partial_err:%lld\r\n" 3071 "expired_keys:%lld\r\n" 3072 "evicted_keys:%lld\r\n" 3073 "keyspace_hits:%lld\r\n" 3074 "keyspace_misses:%lld\r\n" 3075 "pubsub_channels:%ld\r\n" 3076 "pubsub_patterns:%lu\r\n" 3077 "latest_fork_usec:%lld\r\n" 3078 "migrate_cached_sockets:%ld\r\n", 3079 server.stat_numconnections, 3080 server.stat_numcommands, 3081 getInstantaneousMetric(STATS_METRIC_COMMAND), 3082 server.stat_net_input_bytes, 3083 server.stat_net_output_bytes, 3084 (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, 3085 (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, 3086 server.stat_rejected_conn, 3087 server.stat_sync_full, 3088 server.stat_sync_partial_ok, 3089 server.stat_sync_partial_err, 3090 server.stat_expiredkeys, 3091 server.stat_evictedkeys, 3092 server.stat_keyspace_hits, 3093 server.stat_keyspace_misses, 3094 dictSize(server.pubsub_channels), 3095 listLength(server.pubsub_patterns), 3096 server.stat_fork_time, 3097 dictSize(server.migrate_cached_sockets)); 3098 } 3099 3100 /* Replication */ 3101 if (allsections || 
defsections || !strcasecmp(section,"replication")) { 3102 if (sections++) info = sdscat(info,"\r\n"); 3103 info = sdscatprintf(info, 3104 "# Replication\r\n" 3105 "role:%s\r\n", 3106 server.masterhost == NULL ? "master" : "slave"); 3107 if (server.masterhost) { 3108 long long slave_repl_offset = 1; 3109 3110 if (server.master) 3111 slave_repl_offset = server.master->reploff; 3112 else if (server.cached_master) 3113 slave_repl_offset = server.cached_master->reploff; 3114 3115 info = sdscatprintf(info, 3116 "master_host:%s\r\n" 3117 "master_port:%d\r\n" 3118 "master_link_status:%s\r\n" 3119 "master_last_io_seconds_ago:%d\r\n" 3120 "master_sync_in_progress:%d\r\n" 3121 "slave_repl_offset:%lld\r\n" 3122 ,server.masterhost, 3123 server.masterport, 3124 (server.repl_state == REPL_STATE_CONNECTED) ? 3125 "up" : "down", 3126 server.master ? 3127 ((int)(server.unixtime-server.master->lastinteraction)) : -1, 3128 server.repl_state == REPL_STATE_TRANSFER, 3129 slave_repl_offset 3130 ); 3131 3132 if (server.repl_state == REPL_STATE_TRANSFER) { 3133 info = sdscatprintf(info, 3134 "master_sync_left_bytes:%lld\r\n" 3135 "master_sync_last_io_seconds_ago:%d\r\n" 3136 , (long long) 3137 (server.repl_transfer_size - server.repl_transfer_read), 3138 (int)(server.unixtime-server.repl_transfer_lastio) 3139 ); 3140 } 3141 3142 if (server.repl_state != REPL_STATE_CONNECTED) { 3143 info = sdscatprintf(info, 3144 "master_link_down_since_seconds:%jd\r\n", 3145 (intmax_t)server.unixtime-server.repl_down_since); 3146 } 3147 info = sdscatprintf(info, 3148 "slave_priority:%d\r\n" 3149 "slave_read_only:%d\r\n", 3150 server.slave_priority, 3151 server.repl_slave_ro); 3152 } 3153 3154 info = sdscatprintf(info, 3155 "connected_slaves:%lu\r\n", 3156 listLength(server.slaves)); 3157 3158 /* If min-slaves-to-write is active, write the number of slaves 3159 * currently considered 'good'. 
*/ 3160 if (server.repl_min_slaves_to_write && 3161 server.repl_min_slaves_max_lag) { 3162 info = sdscatprintf(info, 3163 "min_slaves_good_slaves:%d\r\n", 3164 server.repl_good_slaves_count); 3165 } 3166 3167 if (listLength(server.slaves)) { 3168 int slaveid = 0; 3169 listNode *ln; 3170 listIter li; 3171 3172 listRewind(server.slaves,&li); 3173 while((ln = listNext(&li))) { 3174 client *slave = listNodeValue(ln); 3175 char *state = NULL; 3176 char ip[NET_IP_STR_LEN]; 3177 int port; 3178 long lag = 0; 3179 3180 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue; 3181 switch(slave->replstate) { 3182 case SLAVE_STATE_WAIT_BGSAVE_START: 3183 case SLAVE_STATE_WAIT_BGSAVE_END: 3184 state = "wait_bgsave"; 3185 break; 3186 case SLAVE_STATE_SEND_BULK: 3187 state = "send_bulk"; 3188 break; 3189 case SLAVE_STATE_ONLINE: 3190 state = "online"; 3191 break; 3192 } 3193 if (state == NULL) continue; 3194 if (slave->replstate == SLAVE_STATE_ONLINE) 3195 lag = time(NULL) - slave->repl_ack_time; 3196 3197 info = sdscatprintf(info, 3198 "slave%d:ip=%s,port=%d,state=%s," 3199 "offset=%lld,lag=%ld\r\n", 3200 slaveid,ip,slave->slave_listening_port,state, 3201 slave->repl_ack_off, lag); 3202 slaveid++; 3203 } 3204 } 3205 info = sdscatprintf(info, 3206 "master_repl_offset:%lld\r\n" 3207 "repl_backlog_active:%d\r\n" 3208 "repl_backlog_size:%lld\r\n" 3209 "repl_backlog_first_byte_offset:%lld\r\n" 3210 "repl_backlog_histlen:%lld\r\n", 3211 server.master_repl_offset, 3212 server.repl_backlog != NULL, 3213 server.repl_backlog_size, 3214 server.repl_backlog_off, 3215 server.repl_backlog_histlen); 3216 } 3217 3218 /* CPU */ 3219 if (allsections || defsections || !strcasecmp(section,"cpu")) { 3220 if (sections++) info = sdscat(info,"\r\n"); 3221 info = sdscatprintf(info, 3222 "# CPU\r\n" 3223 "used_cpu_sys:%.2f\r\n" 3224 "used_cpu_user:%.2f\r\n" 3225 "used_cpu_sys_children:%.2f\r\n" 3226 "used_cpu_user_children:%.2f\r\n", 3227 
(float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000, 3228 (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000, 3229 (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, 3230 (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000); 3231 } 3232 3233 /* cmdtime */ 3234 if (allsections || !strcasecmp(section,"commandstats")) { 3235 if (sections++) info = sdscat(info,"\r\n"); 3236 info = sdscatprintf(info, "# Commandstats\r\n"); 3237 numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 3238 for (j = 0; j < numcommands; j++) { 3239 struct redisCommand *c = redisCommandTable+j; 3240 3241 if (!c->calls) continue; 3242 info = sdscatprintf(info, 3243 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n", 3244 c->name, c->calls, c->microseconds, 3245 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls)); 3246 } 3247 } 3248 3249 /* Cluster */ 3250 if (allsections || defsections || !strcasecmp(section,"cluster")) { 3251 if (sections++) info = sdscat(info,"\r\n"); 3252 info = sdscatprintf(info, 3253 "# Cluster\r\n" 3254 "cluster_enabled:%d\r\n", 3255 server.cluster_enabled); 3256 } 3257 3258 /* Key space */ 3259 if (allsections || defsections || !strcasecmp(section,"keyspace")) { 3260 if (sections++) info = sdscat(info,"\r\n"); 3261 info = sdscatprintf(info, "# Keyspace\r\n"); 3262 for (j = 0; j < server.dbnum; j++) { 3263 long long keys, vkeys; 3264 3265 keys = dictSize(server.db[j].dict); 3266 vkeys = dictSize(server.db[j].expires); 3267 if (keys || vkeys) { 3268 info = sdscatprintf(info, 3269 "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", 3270 j, keys, vkeys, server.db[j].avg_ttl); 3271 } 3272 } 3273 } 3274 return info; 3275 } 3276 3277 void infoCommand(client *c) { 3278 char *section = c->argc == 2 ? 
c->argv[1]->ptr : "default"; 3279 3280 if (c->argc > 2) { 3281 addReply(c,shared.syntaxerr); 3282 return; 3283 } 3284 addReplyBulkSds(c, genRedisInfoString(section)); 3285 } 3286 3287 void monitorCommand(client *c) { 3288 /* ignore MONITOR if already slave or in monitor mode */ 3289 if (c->flags & CLIENT_SLAVE) return; 3290 3291 c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); 3292 listAddNodeTail(server.monitors,c); 3293 addReply(c,shared.ok); 3294 } 3295 3296 /* ============================ Maxmemory directive ======================== */ 3297 3298 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config 3299 * file to limit the max memory used by the server, before processing a 3300 * command. 3301 * 3302 * The goal of the function is to free enough memory to keep Redis under the 3303 * configured memory limit. 3304 * 3305 * The function starts calculating how many bytes should be freed to keep 3306 * Redis under the limit, and enters a loop selecting the best keys to 3307 * evict accordingly to the configured policy. 3308 * 3309 * If all the bytes needed to return back under the limit were freed the 3310 * function returns C_OK, otherwise C_ERR is returned, and the caller 3311 * should block the execution of commands that will result in more memory 3312 * used by the server. 3313 * 3314 * ------------------------------------------------------------------------ 3315 * 3316 * LRU approximation algorithm 3317 * 3318 * Redis uses an approximation of the LRU algorithm that runs in constant 3319 * memory. Every time there is a key to expire, we sample N keys (with 3320 * N very small, usually in around 5) to populate a pool of best keys to 3321 * evict of M keys (the pool size is defined by MAXMEMORY_EVICTION_POOL_SIZE). 3322 * 3323 * The N keys sampled are added in the pool of good keys to expire (the one 3324 * with an old access time) if they are better than one of the current keys 3325 * in the pool. 
*
 * After the pool is populated, the best key we have in the pool is expired.
 * However note that we don't remove keys from the pool when they are deleted
 * so the pool may contain keys that no longer exist.
 *
 * When we try to evict a key, and all the entries in the pool don't exist
 * we populate it again. This time we'll be sure that the pool has at least
 * one key that can be evicted, if there is at least one key that can be
 * evicted in the whole database. */

/* Create a new eviction pool: an array of MAXMEMORY_EVICTION_POOL_SIZE
 * entries allocated with zmalloc(), all initially empty (key NULL, idle 0).
 * The caller owns the returned array. */
struct evictionPoolEntry *evictionPoolAlloc(void) {
    struct evictionPoolEntry *ep;
    int j;

    ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
    for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
        ep[j].idle = 0;
        ep[j].key = NULL;
    }
    return ep;
}

/* This is an helper function for freeMemoryIfNeeded(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. A sampled key is added when its idle time is greater than
 * the smallest idle time currently in the pool; keys are always added if
 * there are free entries.
 *
 * We insert keys on place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right. When the pool is full, the leftmost entry (smallest idle time) is
 * discarded and its sds key freed to make room. The pool stores its own
 * copies of the keys (sdsdup). */

#define EVICTION_SAMPLES_ARRAY_SIZE 16
void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
    dictEntry **samples;

    /* Try to use a static buffer: this function is a big hit...
     * Note: it was actually measured that this helps. */
    if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
        samples = _samples;
    } else {
        samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
    }

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object.
         * NOTE(review): the dictFind() result is not NULL-checked; this
         * presumably relies on every key of sampledict also existing in
         * keydict -- confirm. */
        if (sampledict != keydict) de = dictFind(keydict, key);
        o = dictGetVal(de);
        idle = estimateObjectIdleTime(o);

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time not smaller than our idle time
         * (the scan stops at the first entry with idle >= ours). */
        k = 0;
        while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert.  */
            if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
            }
        }
        pool[k].key = sdsdup(key);
        pool[k].idle = idle;
    }
    /* Only release the sample array if it was heap allocated above. */
    if (samples != _samples) zfree(samples);
}

int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);
    mstime_t latency, eviction_latency;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = zmalloc_used_memory();
    if (slaves) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = listNodeValue(ln);
            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
            /* Clamp at zero: mem_used is unsigned. */
            if (obuf_bytes > mem_used)
                mem_used = 0;
            else
                mem_used -= obuf_bytes;
        }
    }
    /* NOTE(review): unlike the slave loop above, these subtractions are not
     * clamped at zero; presumably safe because these buffers are themselves
     * counted by zmalloc_used_memory() -- confirm. */
    if (server.aof_state != AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        return C_ERR; /* We need to free memory, but policy forbids. */

    /* Compute how much memory we need to free.
*/ 3458 mem_tofree = mem_used - server.maxmemory; 3459 mem_freed = 0; 3460 latencyStartMonitor(latency); 3461 while (mem_freed < mem_tofree) { 3462 int j, k, keys_freed = 0; 3463 3464 for (j = 0; j < server.dbnum; j++) { 3465 long bestval = 0; /* just to prevent warning */ 3466 sds bestkey = NULL; 3467 dictEntry *de; 3468 redisDb *db = server.db+j; 3469 dict *dict; 3470 3471 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU || 3472 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) 3473 { 3474 dict = server.db[j].dict; 3475 } else { 3476 dict = server.db[j].expires; 3477 } 3478 if (dictSize(dict) == 0) continue; 3479 3480 /* volatile-random and allkeys-random policy */ 3481 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || 3482 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) 3483 { 3484 de = dictGetRandomKey(dict); 3485 bestkey = dictGetKey(de); 3486 } 3487 3488 /* volatile-lru and allkeys-lru policy */ 3489 else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU || 3490 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU) 3491 { 3492 struct evictionPoolEntry *pool = db->eviction_pool; 3493 3494 while(bestkey == NULL) { 3495 evictionPoolPopulate(dict, db->dict, db->eviction_pool); 3496 /* Go backward from best to worst element to evict. */ 3497 for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) { 3498 if (pool[k].key == NULL) continue; 3499 de = dictFind(dict,pool[k].key); 3500 3501 /* Remove the entry from the pool. */ 3502 sdsfree(pool[k].key); 3503 /* Shift all elements on its right to left. */ 3504 memmove(pool+k,pool+k+1, 3505 sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1)); 3506 /* Clear the element on the right which is empty 3507 * since we shifted one position to the left. */ 3508 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL; 3509 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0; 3510 3511 /* If the key exists, is our pick. Otherwise it is 3512 * a ghost and we need to try the next element. 
*/ 3513 if (de) { 3514 bestkey = dictGetKey(de); 3515 break; 3516 } else { 3517 /* Ghost... */ 3518 continue; 3519 } 3520 } 3521 } 3522 } 3523 3524 /* volatile-ttl */ 3525 else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { 3526 for (k = 0; k < server.maxmemory_samples; k++) { 3527 sds thiskey; 3528 long thisval; 3529 3530 de = dictGetRandomKey(dict); 3531 thiskey = dictGetKey(de); 3532 thisval = (long) dictGetVal(de); 3533 3534 /* Expire sooner (minor expire unix timestamp) is better 3535 * candidate for deletion */ 3536 if (bestkey == NULL || thisval < bestval) { 3537 bestkey = thiskey; 3538 bestval = thisval; 3539 } 3540 } 3541 } 3542 3543 /* Finally remove the selected key. */ 3544 if (bestkey) { 3545 long long delta; 3546 3547 robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); 3548 propagateExpire(db,keyobj); 3549 /* We compute the amount of memory freed by dbDelete() alone. 3550 * It is possible that actually the memory needed to propagate 3551 * the DEL in AOF and replication link is greater than the one 3552 * we are freeing removing the key, but we can't account for 3553 * that otherwise we would never exit the loop. 3554 * 3555 * AOF and Output buffer memory will be freed eventually so 3556 * we only care about memory used by the key space. 
*/ 3557 delta = (long long) zmalloc_used_memory(); 3558 latencyStartMonitor(eviction_latency); 3559 dbDelete(db,keyobj); 3560 latencyEndMonitor(eviction_latency); 3561 latencyAddSampleIfNeeded("eviction-del",eviction_latency); 3562 latencyRemoveNestedEvent(latency,eviction_latency); 3563 delta -= (long long) zmalloc_used_memory(); 3564 mem_freed += delta; 3565 server.stat_evictedkeys++; 3566 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", 3567 keyobj, db->id); 3568 decrRefCount(keyobj); 3569 keys_freed++; 3570 3571 /* When the memory to free starts to be big enough, we may 3572 * start spending so much time here that is impossible to 3573 * deliver data to the slaves fast enough, so we force the 3574 * transmission here inside the loop. */ 3575 if (slaves) flushSlavesOutputBuffers(); 3576 } 3577 } 3578 if (!keys_freed) { 3579 latencyEndMonitor(latency); 3580 latencyAddSampleIfNeeded("eviction-cycle",latency); 3581 return C_ERR; /* nothing to free... */ 3582 } 3583 } 3584 latencyEndMonitor(latency); 3585 latencyAddSampleIfNeeded("eviction-cycle",latency); 3586 return C_OK; 3587 } 3588 3589 /* =================================== Main! ================================ */ 3590 3591 #ifdef __linux__ 3592 int linuxOvercommitMemoryValue(void) { 3593 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r"); 3594 char buf[64]; 3595 3596 if (!fp) return -1; 3597 if (fgets(buf,64,fp) == NULL) { 3598 fclose(fp); 3599 return -1; 3600 } 3601 fclose(fp); 3602 3603 return atoi(buf); 3604 } 3605 3606 void linuxMemoryWarnings(void) { 3607 if (linuxOvercommitMemoryValue() == 0) { 3608 serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); 3609 } 3610 if (THPIsEnabled()) { 3611 serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled."); 3612 } 3613 } 3614 #endif /* __linux__ */ 3615 3616 void createPidFile(void) { 3617 /* If pidfile requested, but no pidfile defined, use 3618 * default pidfile path */ 3619 if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE); 3620 3621 /* Try to write the pid file in a best-effort way. */ 3622 FILE *fp = fopen(server.pidfile,"w"); 3623 if (fp) { 3624 fprintf(fp,"%d\n",(int)getpid()); 3625 fclose(fp); 3626 } 3627 } 3628 3629 void daemonize(void) { 3630 int fd; 3631 3632 if (fork() != 0) exit(0); /* parent exits */ 3633 setsid(); /* create a new session */ 3634 3635 /* Every output goes to /dev/null. If Redis is daemonized but 3636 * the 'logfile' is set to 'stdout' in the configuration file 3637 * it will not log at all. */ 3638 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { 3639 dup2(fd, STDIN_FILENO); 3640 dup2(fd, STDOUT_FILENO); 3641 dup2(fd, STDERR_FILENO); 3642 if (fd > STDERR_FILENO) close(fd); 3643 } 3644 } 3645 3646 void version(void) { 3647 printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n", 3648 REDIS_VERSION, 3649 redisGitSHA1(), 3650 atoi(redisGitDirty()) > 0, 3651 ZMALLOC_LIB, 3652 sizeof(long) == 4 ? 
32 : 64, 3653 (unsigned long long) redisBuildId()); 3654 exit(0); 3655 } 3656 3657 void usage(void) { 3658 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n"); 3659 fprintf(stderr," ./redis-server - (read config from stdin)\n"); 3660 fprintf(stderr," ./redis-server -v or --version\n"); 3661 fprintf(stderr," ./redis-server -h or --help\n"); 3662 fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n"); 3663 fprintf(stderr,"Examples:\n"); 3664 fprintf(stderr," ./redis-server (run the server with default conf)\n"); 3665 fprintf(stderr," ./redis-server /etc/redis/6379.conf\n"); 3666 fprintf(stderr," ./redis-server --port 7777\n"); 3667 fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n"); 3668 fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n"); 3669 fprintf(stderr,"Sentinel mode:\n"); 3670 fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n"); 3671 exit(1); 3672 } 3673 3674 void redisAsciiArt(void) { 3675 #include "asciilogo.h" 3676 char *buf = zmalloc(1024*16); 3677 char *mode; 3678 3679 if (server.cluster_enabled) mode = "cluster"; 3680 else if (server.sentinel_mode) mode = "sentinel"; 3681 else mode = "standalone"; 3682 3683 if (server.syslog_enabled) { 3684 serverLog(LL_NOTICE, 3685 "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.", 3686 REDIS_VERSION, 3687 redisGitSHA1(), 3688 strtol(redisGitDirty(),NULL,10) > 0, 3689 (sizeof(long) == 8) ? "64" : "32", 3690 mode, server.port, 3691 (long) getpid() 3692 ); 3693 } else { 3694 snprintf(buf,1024*16,ascii_logo, 3695 REDIS_VERSION, 3696 redisGitSHA1(), 3697 strtol(redisGitDirty(),NULL,10) > 0, 3698 (sizeof(long) == 8) ? 
"64" : "32", 3699 mode, server.port, 3700 (long) getpid() 3701 ); 3702 serverLogRaw(LL_NOTICE|LL_RAW,buf); 3703 } 3704 zfree(buf); 3705 } 3706 3707 static void sigShutdownHandler(int sig) { 3708 char *msg; 3709 3710 switch (sig) { 3711 case SIGINT: 3712 msg = "Received SIGINT scheduling shutdown..."; 3713 break; 3714 case SIGTERM: 3715 msg = "Received SIGTERM scheduling shutdown..."; 3716 break; 3717 default: 3718 msg = "Received shutdown signal, scheduling shutdown..."; 3719 }; 3720 3721 /* SIGINT is often delivered via Ctrl+C in an interactive session. 3722 * If we receive the signal the second time, we interpret this as 3723 * the user really wanting to quit ASAP without waiting to persist 3724 * on disk. */ 3725 if (server.shutdown_asap && sig == SIGINT) { 3726 serverLogFromHandler(LL_WARNING, "You insist... exiting now."); 3727 rdbRemoveTempFile(getpid()); 3728 exit(1); /* Exit with an error since this was not a clean shutdown. */ 3729 } else if (server.loading) { 3730 exit(0); 3731 } 3732 3733 serverLogFromHandler(LL_WARNING, msg); 3734 server.shutdown_asap = 1; 3735 } 3736 3737 void setupSignalHandlers(void) { 3738 struct sigaction act; 3739 3740 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. 3741 * Otherwise, sa_handler is used. */ 3742 sigemptyset(&act.sa_mask); 3743 act.sa_flags = 0; 3744 act.sa_handler = sigShutdownHandler; 3745 sigaction(SIGTERM, &act, NULL); 3746 sigaction(SIGINT, &act, NULL); 3747 3748 #ifdef HAVE_BACKTRACE 3749 sigemptyset(&act.sa_mask); 3750 act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO; 3751 act.sa_sigaction = sigsegvHandler; 3752 sigaction(SIGSEGV, &act, NULL); 3753 sigaction(SIGBUS, &act, NULL); 3754 sigaction(SIGFPE, &act, NULL); 3755 sigaction(SIGILL, &act, NULL); 3756 #endif 3757 return; 3758 } 3759 3760 void memtest(size_t megabytes, int passes); 3761 3762 /* Returns 1 if there is --sentinel among the arguments or if 3763 * argv[0] is exactly "redis-sentinel". 
*/ 3764 int checkForSentinelMode(int argc, char **argv) { 3765 int j; 3766 3767 if (strstr(argv[0],"redis-sentinel") != NULL) return 1; 3768 for (j = 1; j < argc; j++) 3769 if (!strcmp(argv[j],"--sentinel")) return 1; 3770 return 0; 3771 } 3772 3773 /* Function called at startup to load RDB or AOF file in memory. */ 3774 void loadDataFromDisk(void) { 3775 long long start = ustime(); 3776 if (server.aof_state == AOF_ON) { 3777 if (loadAppendOnlyFile(server.aof_filename) == C_OK) 3778 serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); 3779 } else { 3780 if (rdbLoad(server.rdb_filename) == C_OK) { 3781 serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", 3782 (float)(ustime()-start)/1000000); 3783 } else if (errno != ENOENT) { 3784 serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); 3785 exit(1); 3786 } 3787 } 3788 } 3789 3790 void redisOutOfMemoryHandler(size_t allocation_size) { 3791 serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!", 3792 allocation_size); 3793 serverPanic("Redis aborting for OUT OF MEMORY"); 3794 } 3795 3796 void redisSetProcTitle(char *title) { 3797 #ifdef USE_SETPROCTITLE 3798 char *server_mode = ""; 3799 if (server.cluster_enabled) server_mode = " [cluster]"; 3800 else if (server.sentinel_mode) server_mode = " [sentinel]"; 3801 3802 setproctitle("%s %s:%d%s", 3803 title, 3804 server.bindaddr_count ? server.bindaddr[0] : "*", 3805 server.port, 3806 server_mode); 3807 #else 3808 UNUSED(title); 3809 #endif 3810 } 3811 3812 /* 3813 * Check whether systemd or upstart have been used to start redis. 
3814 */ 3815 3816 int redisSupervisedUpstart(void) { 3817 const char *upstart_job = getenv("UPSTART_JOB"); 3818 3819 if (!upstart_job) { 3820 serverLog(LL_WARNING, 3821 "upstart supervision requested, but UPSTART_JOB not found"); 3822 return 0; 3823 } 3824 3825 serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness"); 3826 raise(SIGSTOP); 3827 unsetenv("UPSTART_JOB"); 3828 return 1; 3829 } 3830 3831 int redisSupervisedSystemd(void) { 3832 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3833 int fd = 1; 3834 struct sockaddr_un su; 3835 struct iovec iov; 3836 struct msghdr hdr; 3837 int sendto_flags = 0; 3838 3839 if (!notify_socket) { 3840 serverLog(LL_WARNING, 3841 "systemd supervision requested, but NOTIFY_SOCKET not found"); 3842 return 0; 3843 } 3844 3845 if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) { 3846 return 0; 3847 } 3848 3849 serverLog(LL_NOTICE, "supervised by systemd, will signal readiness"); 3850 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) { 3851 serverLog(LL_WARNING, 3852 "Can't connect to systemd socket %s", notify_socket); 3853 return 0; 3854 } 3855 3856 memset(&su, 0, sizeof(su)); 3857 su.sun_family = AF_UNIX; 3858 strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1); 3859 su.sun_path[sizeof(su.sun_path) - 1] = '\0'; 3860 3861 if (notify_socket[0] == '@') 3862 su.sun_path[0] = '\0'; 3863 3864 memset(&iov, 0, sizeof(iov)); 3865 iov.iov_base = "READY=1"; 3866 iov.iov_len = strlen("READY=1"); 3867 3868 memset(&hdr, 0, sizeof(hdr)); 3869 hdr.msg_name = &su; 3870 hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) + 3871 strlen(notify_socket); 3872 hdr.msg_iov = &iov; 3873 hdr.msg_iovlen = 1; 3874 3875 unsetenv("NOTIFY_SOCKET"); 3876 #ifdef HAVE_MSG_NOSIGNAL 3877 sendto_flags |= MSG_NOSIGNAL; 3878 #endif 3879 if (sendmsg(fd, &hdr, sendto_flags) < 0) { 3880 serverLog(LL_WARNING, "Can't send notification to systemd"); 3881 close(fd); 3882 return 0; 3883 } 3884 close(fd); 3885 return 
1; 3886 } 3887 3888 int redisIsSupervised(int mode) { 3889 if (mode == SUPERVISED_AUTODETECT) { 3890 const char *upstart_job = getenv("UPSTART_JOB"); 3891 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3892 3893 if (upstart_job) { 3894 redisSupervisedUpstart(); 3895 } else if (notify_socket) { 3896 redisSupervisedSystemd(); 3897 } 3898 } else if (mode == SUPERVISED_UPSTART) { 3899 return redisSupervisedUpstart(); 3900 } else if (mode == SUPERVISED_SYSTEMD) { 3901 return redisSupervisedSystemd(); 3902 } 3903 3904 return 0; 3905 } 3906 3907 3908 int main(int argc, char **argv) { 3909 struct timeval tv; 3910 int j; 3911 3912 #ifdef REDIS_TEST 3913 if (argc == 3 && !strcasecmp(argv[1], "test")) { 3914 if (!strcasecmp(argv[2], "ziplist")) { 3915 return ziplistTest(argc, argv); 3916 } else if (!strcasecmp(argv[2], "quicklist")) { 3917 quicklistTest(argc, argv); 3918 } else if (!strcasecmp(argv[2], "intset")) { 3919 return intsetTest(argc, argv); 3920 } else if (!strcasecmp(argv[2], "zipmap")) { 3921 return zipmapTest(argc, argv); 3922 } else if (!strcasecmp(argv[2], "sha1test")) { 3923 return sha1Test(argc, argv); 3924 } else if (!strcasecmp(argv[2], "util")) { 3925 return utilTest(argc, argv); 3926 } else if (!strcasecmp(argv[2], "sds")) { 3927 return sdsTest(argc, argv); 3928 } else if (!strcasecmp(argv[2], "endianconv")) { 3929 return endianconvTest(argc, argv); 3930 } else if (!strcasecmp(argv[2], "crc64")) { 3931 return crc64Test(argc, argv); 3932 } 3933 3934 return -1; /* test not found */ 3935 } 3936 #endif 3937 3938 /* We need to initialize our libraries, and the server configuration. 
*/ 3939 #ifdef INIT_SETPROCTITLE_REPLACEMENT 3940 spt_init(argc, argv); 3941 #endif 3942 setlocale(LC_COLLATE,""); 3943 zmalloc_enable_thread_safeness(); 3944 zmalloc_set_oom_handler(redisOutOfMemoryHandler); 3945 srand(time(NULL)^getpid()); 3946 gettimeofday(&tv,NULL); 3947 dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); 3948 server.sentinel_mode = checkForSentinelMode(argc,argv); 3949 initServerConfig(); 3950 3951 /* Store the executable path and arguments in a safe place in order 3952 * to be able to restart the server later. */ 3953 server.executable = getAbsolutePath(argv[0]); 3954 server.exec_argv = zmalloc(sizeof(char*)*(argc+1)); 3955 server.exec_argv[argc] = NULL; 3956 for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]); 3957 3958 /* We need to init sentinel right now as parsing the configuration file 3959 * in sentinel mode will have the effect of populating the sentinel 3960 * data structures with master nodes to monitor. */ 3961 if (server.sentinel_mode) { 3962 initSentinelConfig(); 3963 initSentinel(); 3964 } 3965 3966 /* Check if we need to start in redis-check-rdb mode. We just execute 3967 * the program main. However the program is part of the Redis executable 3968 * so that we can easily execute an RDB check on loading errors. 
*/ 3969 if (strstr(argv[0],"redis-check-rdb") != NULL) 3970 exit(redis_check_rdb_main(argv,argc)); 3971 3972 if (argc >= 2) { 3973 j = 1; /* First option to parse in argv[] */ 3974 sds options = sdsempty(); 3975 char *configfile = NULL; 3976 3977 /* Handle special options --help and --version */ 3978 if (strcmp(argv[1], "-v") == 0 || 3979 strcmp(argv[1], "--version") == 0) version(); 3980 if (strcmp(argv[1], "--help") == 0 || 3981 strcmp(argv[1], "-h") == 0) usage(); 3982 if (strcmp(argv[1], "--test-memory") == 0) { 3983 if (argc == 3) { 3984 memtest(atoi(argv[2]),50); 3985 exit(0); 3986 } else { 3987 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); 3988 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n"); 3989 exit(1); 3990 } 3991 } 3992 3993 /* First argument is the config file name? */ 3994 if (argv[j][0] != '-' || argv[j][1] != '-') { 3995 configfile = argv[j]; 3996 server.configfile = getAbsolutePath(configfile); 3997 /* Replace the config file in server.exec_argv with 3998 * its absoulte path. */ 3999 zfree(server.exec_argv[j]); 4000 server.exec_argv[j] = zstrdup(server.configfile); 4001 j++; 4002 } 4003 4004 /* All the other options are parsed and conceptually appended to the 4005 * configuration file. For instance --port 6380 will generate the 4006 * string "port 6380\n" to be parsed after the actual file name 4007 * is parsed, if any. */ 4008 while(j != argc) { 4009 if (argv[j][0] == '-' && argv[j][1] == '-') { 4010 /* Option name */ 4011 if (!strcmp(argv[j], "--check-rdb")) { 4012 /* Argument has no options, need to skip for parsing. 
*/ 4013 j++; 4014 continue; 4015 } 4016 if (sdslen(options)) options = sdscat(options,"\n"); 4017 options = sdscat(options,argv[j]+2); 4018 options = sdscat(options," "); 4019 } else { 4020 /* Option argument */ 4021 options = sdscatrepr(options,argv[j],strlen(argv[j])); 4022 options = sdscat(options," "); 4023 } 4024 j++; 4025 } 4026 if (server.sentinel_mode && configfile && *configfile == '-') { 4027 serverLog(LL_WARNING, 4028 "Sentinel config from STDIN not allowed."); 4029 serverLog(LL_WARNING, 4030 "Sentinel needs config file on disk to save state. Exiting..."); 4031 exit(1); 4032 } 4033 resetServerSaveParams(); 4034 loadServerConfig(configfile,options); 4035 sdsfree(options); 4036 } else { 4037 serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); 4038 } 4039 4040 server.supervised = redisIsSupervised(server.supervised_mode); 4041 int background = server.daemonize && !server.supervised; 4042 if (background) daemonize(); 4043 4044 initServer(); 4045 if (background || server.pidfile) createPidFile(); 4046 redisSetProcTitle(argv[0]); 4047 redisAsciiArt(); 4048 checkTcpBacklogSettings(); 4049 4050 if (!server.sentinel_mode) { 4051 /* Things not needed when running in Sentinel mode. */ 4052 serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION); 4053 #ifdef __linux__ 4054 linuxMemoryWarnings(); 4055 #endif 4056 loadDataFromDisk(); 4057 if (server.cluster_enabled) { 4058 if (verifyClusterConfigWithData() == C_ERR) { 4059 serverLog(LL_WARNING, 4060 "You can't have keys in a DB different than DB 0 when in " 4061 "Cluster mode. 
Exiting."); 4062 exit(1); 4063 } 4064 } 4065 if (server.ipfd_count > 0) 4066 serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port); 4067 if (server.sofd > 0) 4068 serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); 4069 } else { 4070 sentinelIsRunning(); 4071 } 4072 4073 /* Warning the user about suspicious maxmemory setting. */ 4074 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { 4075 serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); 4076 } 4077 4078 aeSetBeforeSleepProc(server.el,beforeSleep); 4079 aeMain(server.el); 4080 aeDeleteEventLoop(server.el); 4081 return 0; 4082 } 4083 4084 /* The End */ 4085