1 /* 2 * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 30 #include "server.h" 31 #include "cluster.h" 32 #include "slowlog.h" 33 #include "bio.h" 34 #include "latency.h" 35 #include "atomicvar.h" 36 37 #ifdef HAVE_FF_KQUEUE 38 #include "ff_api.h" 39 #include "anet_ff.h" 40 #endif 41 42 #include <time.h> 43 #include <signal.h> 44 #include <sys/wait.h> 45 #include <errno.h> 46 #include <assert.h> 47 #include <ctype.h> 48 #include <stdarg.h> 49 #include <arpa/inet.h> 50 #include <sys/stat.h> 51 #include <fcntl.h> 52 #include <sys/time.h> 53 #include <sys/resource.h> 54 #include <sys/uio.h> 55 #include <sys/un.h> 56 #include <limits.h> 57 #include <float.h> 58 #include <math.h> 59 #include <sys/resource.h> 60 #include <sys/utsname.h> 61 #include <locale.h> 62 #include <sys/socket.h> 63 64 /* Our shared "common" objects */ 65 66 struct sharedObjectsStruct shared; 67 68 /* Global vars that are actually used as constants. The following double 69 * values are used for double on-disk serialization, and are initialized 70 * at runtime to avoid strange compiler optimizations. */ 71 72 double R_Zero, R_PosInf, R_NegInf, R_Nan; 73 74 /*================================= Globals ================================= */ 75 76 /* Global vars */ 77 struct redisServer server; /* Server global state */ 78 volatile unsigned long lru_clock; /* Server global current LRU time. */ 79 80 /* Our command table. 81 * 82 * Every entry is composed of the following fields: 83 * 84 * name: a string representing the command name. 85 * function: pointer to the C function implementing the command. 86 * arity: number of arguments, it is possible to use -N to say >= N 87 * sflags: command flags as string. See below for a table of flags. 88 * flags: flags as bitmask. Computed by Redis using the 'sflags' field. 89 * get_keys_proc: an optional function to get key arguments from a command. 90 * This is only used when the following three fields are not 91 * enough to specify what arguments are keys. 
 * first_key_index: first argument that is a key
 * last_key_index: last argument that is a key
 * key_step: step to get all the keys from first to last argument. For instance
 *           in MSET the step is two since arguments are key,val,key,val,...
 * microseconds: microseconds of total execution time for this command.
 * calls: total number of calls of this command.
 *
 * The flags, microseconds and calls fields are computed by Redis and should
 * always be set to zero.
 *
 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
 *
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command  (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
 *    but just a few.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
 *
 * Notes on the entries below:
 *
 * - Several names share one implementation: "substr" uses getrangeCommand,
 *   "smembers" uses sinterCommand, "hmset" uses hsetCommand, "psync" uses
 *   syncCommand, "slaveof" and "replicaof" both use replicaofCommand,
 *   "restore-asking" uses restoreCommand, "xreadgroup" uses xreadCommand and
 *   "post" / "host:" both use securityWarningCommand.
 * - Entries with a non-NULL get_keys_proc (zunionstore, zinterstore, sort,
 *   migrate, eval, evalsha, the georadius family, xread, xreadgroup) rely on
 *   that callback because the three key index fields cannot describe their
 *   key arguments.
 * - NOTE(review): "touch" has arity -2 but last_key_index 1, while the other
 *   multi-key commands (del, unlink, exists, mget) use -1. TOUCH is publicly
 *   documented as "TOUCH key [key ...]"; confirm against touchCommand whether
 *   the key range should be 1,-1,1.
 */
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
    {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
    {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
    {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
    {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
    {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"zpopmin",zpopminCommand,-2,"wF",0,NULL,1,1,1,0,0},
    {"zpopmax",zpopmaxCommand,-2,"wF",0,NULL,1,1,1,0,0},
    {"bzpopmin",bzpopminCommand,-3,"wsF",0,NULL,1,-2,1,0,0},
    {"bzpopmax",bzpopmaxCommand,-3,"wsF",0,NULL,1,-2,1,0,0},
    {"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"hmget",hmgetCommand,-3,"rF",0,NULL,1,1,1,0,0},
    {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"hgetall",hgetallCommand,2,"rR",0,NULL,1,1,1,0,0},
    {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
    {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
    {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
    {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
    {"swapdb",swapdbCommand,3,"wF",0,NULL,0,0,0,0,0},
    {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
    {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
    {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
    {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
    {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
    {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
    {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
    {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
    {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
    {"bgsave",bgsaveCommand,-1,"as",0,NULL,0,0,0,0,0},
    {"bgrewriteaof",bgrewriteaofCommand,1,"as",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"aslt",0,NULL,0,0,0,0,0},
    {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
    {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
    {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
    {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
    {"flushdb",flushdbCommand,-1,"w",0,NULL,0,0,0,0,0},
    {"flushall",flushallCommand,-1,"w",0,NULL,0,0,0,0,0},
    {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
    {"info",infoCommand,-1,"ltR",0,NULL,0,0,0,0,0},
    {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
    {"ttl",ttlCommand,2,"rFR",0,NULL,1,1,1,0,0},
    {"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
    {"pttl",pttlCommand,2,"rFR",0,NULL,1,1,1,0,0},
    {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"slaveof",replicaofCommand,3,"ast",0,NULL,0,0,0,0,0},
    {"replicaof",replicaofCommand,3,"ast",0,NULL,0,0,0,0,0},
    {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
    {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"config",configCommand,-2,"last",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
    {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
    {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
    {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
    {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
    {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
    {"migrate",migrateCommand,-6,"wR",0,migrateGetKeys,0,0,0,0,0},
    {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
    {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
    {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
    {"dump",dumpCommand,2,"rR",0,NULL,1,1,1,0,0},
    {"object",objectCommand,-2,"rR",0,NULL,2,2,1,0,0},
    {"memory",memoryCommand,-2,"rR",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
    {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
    {"slowlog",slowlogCommand,-2,"aR",0,NULL,0,0,0,0,0},
    {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
    {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
    {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
    {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
    {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
    {"command",commandCommand,0,"ltR",0,NULL,0,0,0,0,0},
    {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
    {"georadius",georadiusCommand,-6,"w",0,georadiusGetKeys,1,1,1,0,0},
    {"georadius_ro",georadiusroCommand,-6,"r",0,georadiusGetKeys,1,1,1,0,0},
    {"georadiusbymember",georadiusbymemberCommand,-5,"w",0,georadiusGetKeys,1,1,1,0,0},
    {"georadiusbymember_ro",georadiusbymemberroCommand,-5,"r",0,georadiusGetKeys,1,1,1,0,0},
    {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
    {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
    {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
    {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
    {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
    {"xadd",xaddCommand,-5,"wmFR",0,NULL,1,1,1,0,0},
    {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
    {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
    {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
    {"xsetid",xsetidCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
    {"xpending",xpendingCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"xclaim",xclaimCommand,-6,"wRF",0,NULL,1,1,1,0,0},
    {"xinfo",xinfoCommand,-2,"rR",0,NULL,2,2,1,0,0},
    {"xdel",xdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"xtrim",xtrimCommand,-2,"wFR",0,NULL,1,1,1,0,0},
    {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
    {"lolwut",lolwutCommand,-1,"r",0,NULL,0,0,0,0,0}
};

/*============================ Utility functions ============================ */

/* We use a private localtime implementation
which is fork-safe. The logging 338 * function of Redis may be called from other threads. */ 339 void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); 340 341 /* Low level logging. To use only for very big messages, otherwise 342 * serverLog() is to prefer. */ 343 void serverLogRaw(int level, const char *msg) { 344 const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING }; 345 const char *c = ".-*#"; 346 FILE *fp; 347 char buf[64]; 348 int rawmode = (level & LL_RAW); 349 int log_to_stdout = server.logfile[0] == '\0'; 350 351 level &= 0xff; /* clear flags */ 352 if (level < server.verbosity) return; 353 354 fp = log_to_stdout ? stdout : fopen(server.logfile,"a"); 355 if (!fp) return; 356 357 if (rawmode) { 358 fprintf(fp,"%s",msg); 359 } else { 360 int off; 361 struct timeval tv; 362 int role_char; 363 pid_t pid = getpid(); 364 365 gettimeofday(&tv,NULL); 366 struct tm tm; 367 nolocks_localtime(&tm,tv.tv_sec,server.timezone,server.daylight_active); 368 off = strftime(buf,sizeof(buf),"%d %b %Y %H:%M:%S.",&tm); 369 snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000); 370 if (server.sentinel_mode) { 371 role_char = 'X'; /* Sentinel. */ 372 } else if (pid != server.pid) { 373 role_char = 'C'; /* RDB / AOF writing child. */ 374 } else { 375 role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */ 376 } 377 fprintf(fp,"%d:%c %s %c %s\n", 378 (int)getpid(),role_char, buf,c[level],msg); 379 } 380 fflush(fp); 381 382 if (!log_to_stdout) fclose(fp); 383 if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg); 384 } 385 386 /* Like serverLogRaw() but with printf-alike support. This is the function that 387 * is used across the code. The raw version is only used in order to dump 388 * the INFO output on crash. */ 389 void serverLog(int level, const char *fmt, ...) 
{ 390 va_list ap; 391 char msg[LOG_MAX_LEN]; 392 393 if ((level&0xff) < server.verbosity) return; 394 395 va_start(ap, fmt); 396 vsnprintf(msg, sizeof(msg), fmt, ap); 397 va_end(ap); 398 399 serverLogRaw(level,msg); 400 } 401 402 /* Log a fixed message without printf-alike capabilities, in a way that is 403 * safe to call from a signal handler. 404 * 405 * We actually use this only for signals that are not fatal from the point 406 * of view of Redis. Signals that are going to kill the server anyway and 407 * where we need printf-alike features are served by serverLog(). */ 408 void serverLogFromHandler(int level, const char *msg) { 409 int fd; 410 int log_to_stdout = server.logfile[0] == '\0'; 411 char buf[64]; 412 413 if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize)) 414 return; 415 fd = log_to_stdout ? STDOUT_FILENO : 416 open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644); 417 if (fd == -1) return; 418 ll2string(buf,sizeof(buf),getpid()); 419 if (write(fd,buf,strlen(buf)) == -1) goto err; 420 if (write(fd,":signal-handler (",17) == -1) goto err; 421 ll2string(buf,sizeof(buf),time(NULL)); 422 if (write(fd,buf,strlen(buf)) == -1) goto err; 423 if (write(fd,") ",2) == -1) goto err; 424 if (write(fd,msg,strlen(msg)) == -1) goto err; 425 if (write(fd,"\n",1) == -1) goto err; 426 err: 427 if (!log_to_stdout) close(fd); 428 } 429 430 /* Return the UNIX time in microseconds */ 431 long long ustime(void) { 432 struct timeval tv; 433 long long ust; 434 435 gettimeofday(&tv, NULL); 436 ust = ((long long)tv.tv_sec)*1000000; 437 ust += tv.tv_usec; 438 return ust; 439 } 440 441 /* Return the UNIX time in milliseconds */ 442 mstime_t mstime(void) { 443 return ustime()/1000; 444 } 445 446 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of 447 * exit(), because the latter may interact with the same file objects used by 448 * the parent process. 
However if we are testing the coverage normal exit() is 449 * used in order to obtain the right coverage information. */ 450 void exitFromChild(int retcode) { 451 #ifdef COVERAGE_TEST 452 exit(retcode); 453 #else 454 _exit(retcode); 455 #endif 456 } 457 458 /*====================== Hash table type implementation ==================== */ 459 460 /* This is a hash table type that uses the SDS dynamic strings library as 461 * keys and redis objects as values (objects can hold SDS strings, 462 * lists, sets). */ 463 464 void dictVanillaFree(void *privdata, void *val) 465 { 466 DICT_NOTUSED(privdata); 467 zfree(val); 468 } 469 470 void dictListDestructor(void *privdata, void *val) 471 { 472 DICT_NOTUSED(privdata); 473 listRelease((list*)val); 474 } 475 476 int dictSdsKeyCompare(void *privdata, const void *key1, 477 const void *key2) 478 { 479 int l1,l2; 480 DICT_NOTUSED(privdata); 481 482 l1 = sdslen((sds)key1); 483 l2 = sdslen((sds)key2); 484 if (l1 != l2) return 0; 485 return memcmp(key1, key2, l1) == 0; 486 } 487 488 /* A case insensitive version used for the command lookup table and other 489 * places where case insensitive non binary-safe comparison is needed. */ 490 int dictSdsKeyCaseCompare(void *privdata, const void *key1, 491 const void *key2) 492 { 493 DICT_NOTUSED(privdata); 494 495 return strcasecmp(key1, key2) == 0; 496 } 497 498 void dictObjectDestructor(void *privdata, void *val) 499 { 500 DICT_NOTUSED(privdata); 501 502 if (val == NULL) return; /* Lazy freeing will set value to NULL. 
*/ 503 decrRefCount(val); 504 } 505 506 void dictSdsDestructor(void *privdata, void *val) 507 { 508 DICT_NOTUSED(privdata); 509 510 sdsfree(val); 511 } 512 513 int dictObjKeyCompare(void *privdata, const void *key1, 514 const void *key2) 515 { 516 const robj *o1 = key1, *o2 = key2; 517 return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 518 } 519 520 uint64_t dictObjHash(const void *key) { 521 const robj *o = key; 522 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 523 } 524 525 uint64_t dictSdsHash(const void *key) { 526 return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); 527 } 528 529 uint64_t dictSdsCaseHash(const void *key) { 530 return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key)); 531 } 532 533 int dictEncObjKeyCompare(void *privdata, const void *key1, 534 const void *key2) 535 { 536 robj *o1 = (robj*) key1, *o2 = (robj*) key2; 537 int cmp; 538 539 if (o1->encoding == OBJ_ENCODING_INT && 540 o2->encoding == OBJ_ENCODING_INT) 541 return o1->ptr == o2->ptr; 542 543 o1 = getDecodedObject(o1); 544 o2 = getDecodedObject(o2); 545 cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); 546 decrRefCount(o1); 547 decrRefCount(o2); 548 return cmp; 549 } 550 551 uint64_t dictEncObjHash(const void *key) { 552 robj *o = (robj*) key; 553 554 if (sdsEncodedObject(o)) { 555 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 556 } else { 557 if (o->encoding == OBJ_ENCODING_INT) { 558 char buf[32]; 559 int len; 560 561 len = ll2string(buf,32,(long)o->ptr); 562 return dictGenHashFunction((unsigned char*)buf, len); 563 } else { 564 uint64_t hash; 565 566 o = getDecodedObject(o); 567 hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); 568 decrRefCount(o); 569 return hash; 570 } 571 } 572 } 573 574 /* Generic hash table type where keys are Redis Objects, Values 575 * dummy pointers. 
*/
dictType objectKeyPointerValueDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictObjectDestructor,      /* key destructor */
    NULL                       /* val destructor */
};

/* Like objectKeyPointerValueDictType(), but values can be destroyed, if
 * not NULL, calling zfree(). */
dictType objectKeyHeapPointerValueDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictObjectDestructor,      /* key destructor */
    dictVanillaFree            /* val destructor */
};

/* Set dictionary type. Keys are SDS strings, values are not used. */
dictType setDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    dictSdsDestructor,         /* key destructor */
    NULL                       /* val destructor */
};

/* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
dictType zsetDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* Note: SDS string shared & freed by skiplist */
    NULL                       /* val destructor */
};

/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor   /* val destructor */
};

/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache.
 * Keys are hashed and compared case-insensitively. */
dictType shaScriptObjectDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor        /* val destructor */
};

/* Db->expires. Neither keys nor values are released by this dict (both
 * destructors are NULL).
 * NOTE(review): presumably the key SDS strings are shared with Db->dict,
 * which owns them -- confirm against the code that fills Db->expires. */
dictType keyptrDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    NULL,                       /* key destructor */
    NULL                        /* val destructor */
};

/* Command table. sds string -> command struct pointer.
 * Lookups are case-insensitive; the command structs are not freed by the
 * dict (no val destructor). */
dictType commandTableDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Hash type hash table (note that small hashes are represented with ziplists).
 * Both fields (keys) and values are SDS strings released with sdsfree(). */
dictType hashDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictSdsDestructor           /* val destructor */
};

/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) and to
 * map swapped keys to a list of clients waiting for this keys to be loaded. */
dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictListDestructor          /* val destructor */
};

/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
 * clusterNode structures. */
dictType clusterNodesDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Cluster re-addition blacklist. This maps node IDs to the time
 * we can re-add this node. The goal is to avoid readding a removed
 * node for some time. */
dictType clusterNodesBlackListDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Modules dictionary type. Keys are SDS strings hashed and compared
 * case-insensitively and released with sdsfree(); values are not freed by
 * the dict.
 * NOTE(review): presumably keys are module names and values the loaded
 * module descriptors -- confirm against the module loading code. */
dictType modulesDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Migrate cache dict type. */
dictType migrateCacheDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};

/* Replication cached script dict (server.repl_scriptcache_dict).
 * Keys are sds SHA1 strings, while values are not used at all in the current
 * implementation.
*/ 726 dictType replScriptCacheDictType = { 727 dictSdsCaseHash, /* hash function */ 728 NULL, /* key dup */ 729 NULL, /* val dup */ 730 dictSdsKeyCaseCompare, /* key compare */ 731 dictSdsDestructor, /* key destructor */ 732 NULL /* val destructor */ 733 }; 734 735 int htNeedsResize(dict *dict) { 736 long long size, used; 737 738 size = dictSlots(dict); 739 used = dictSize(dict); 740 return (size > DICT_HT_INITIAL_SIZE && 741 (used*100/size < HASHTABLE_MIN_FILL)); 742 } 743 744 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL 745 * we resize the hash table to save memory */ 746 void tryResizeHashTables(int dbid) { 747 if (htNeedsResize(server.db[dbid].dict)) 748 dictResize(server.db[dbid].dict); 749 if (htNeedsResize(server.db[dbid].expires)) 750 dictResize(server.db[dbid].expires); 751 } 752 753 /* Our hash table implementation performs rehashing incrementally while 754 * we write/read from the hash table. Still if the server is idle, the hash 755 * table will use two tables for a long time. So we try to use 1 millisecond 756 * of CPU time at every call of this function to perform some rehahsing. 757 * 758 * The function returns 1 if some rehashing was performed, otherwise 0 759 * is returned. */ 760 int incrementallyRehash(int dbid) { 761 /* Keys dictionary */ 762 if (dictIsRehashing(server.db[dbid].dict)) { 763 dictRehashMilliseconds(server.db[dbid].dict,1); 764 return 1; /* already used our millisecond for this loop... */ 765 } 766 /* Expires */ 767 if (dictIsRehashing(server.db[dbid].expires)) { 768 dictRehashMilliseconds(server.db[dbid].expires,1); 769 return 1; /* already used our millisecond for this loop... */ 770 } 771 return 0; 772 } 773 774 /* This function is called once a background process of some kind terminates, 775 * as we want to avoid resizing the hash tables when there is a child in order 776 * to play well with copy-on-write (otherwise when a resize happens lots of 777 * memory pages are copied). 
The goal of this function is to update the ability 778 * for dict.c to resize the hash tables accordingly to the fact we have o not 779 * running childs. */ 780 void updateDictResizePolicy(void) { 781 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) 782 dictEnableResize(); 783 else 784 dictDisableResize(); 785 } 786 787 /* ======================= Cron: called every 100 ms ======================== */ 788 789 /* Add a sample to the operations per second array of samples. */ 790 void trackInstantaneousMetric(int metric, long long current_reading) { 791 long long t = mstime() - server.inst_metric[metric].last_sample_time; 792 long long ops = current_reading - 793 server.inst_metric[metric].last_sample_count; 794 long long ops_sec; 795 796 ops_sec = t > 0 ? (ops*1000/t) : 0; 797 798 server.inst_metric[metric].samples[server.inst_metric[metric].idx] = 799 ops_sec; 800 server.inst_metric[metric].idx++; 801 server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; 802 server.inst_metric[metric].last_sample_time = mstime(); 803 server.inst_metric[metric].last_sample_count = current_reading; 804 } 805 806 /* Return the mean of all the samples. */ 807 long long getInstantaneousMetric(int metric) { 808 int j; 809 long long sum = 0; 810 811 for (j = 0; j < STATS_METRIC_SAMPLES; j++) 812 sum += server.inst_metric[metric].samples[j]; 813 return sum / STATS_METRIC_SAMPLES; 814 } 815 816 /* Check for timeouts. Returns non-zero if the client was terminated. 817 * The function gets the current time in milliseconds as argument since 818 * it gets called multiple times in a loop, so calling gettimeofday() for 819 * each iteration would be costly without any actual gain. 
*/ 820 int clientsCronHandleTimeout(client *c, mstime_t now_ms) { 821 time_t now = now_ms/1000; 822 823 if (server.maxidletime && 824 !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */ 825 !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ 826 !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ 827 !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ 828 (now - c->lastinteraction > server.maxidletime)) 829 { 830 serverLog(LL_VERBOSE,"Closing idle client"); 831 freeClient(c); 832 return 1; 833 } else if (c->flags & CLIENT_BLOCKED) { 834 /* Blocked OPS timeout is handled with milliseconds resolution. 835 * However note that the actual resolution is limited by 836 * server.hz. */ 837 838 if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { 839 /* Handle blocking operation specific timeout. */ 840 replyToBlockedClientTimedOut(c); 841 unblockClient(c); 842 } else if (server.cluster_enabled) { 843 /* Cluster: handle unblock & redirect of clients blocked 844 * into keys no longer served by this server. */ 845 if (clusterRedirectBlockedClientIfNeeded(c)) 846 unblockClient(c); 847 } 848 } 849 return 0; 850 } 851 852 /* The client query buffer is an sds.c string that can end with a lot of 853 * free space not used, this function reclaims space if needed. 854 * 855 * The function always returns 0 as it never terminates the client. */ 856 int clientsCronResizeQueryBuffer(client *c) { 857 size_t querybuf_size = sdsAllocSize(c->querybuf); 858 time_t idletime = server.unixtime - c->lastinteraction; 859 860 /* There are two conditions to resize the query buffer: 861 * 1) Query buffer is > BIG_ARG and too big for latest peak. 862 * 2) Query buffer is > BIG_ARG and client is idle. */ 863 if (querybuf_size > PROTO_MBULK_BIG_ARG && 864 ((querybuf_size/(c->querybuf_peak+1)) > 2 || 865 idletime > 2)) 866 { 867 /* Only resize the query buffer if it is actually wasting 868 * at least a few kbytes. 
*/ 869 if (sdsavail(c->querybuf) > 1024*4) { 870 c->querybuf = sdsRemoveFreeSpace(c->querybuf); 871 } 872 } 873 /* Reset the peak again to capture the peak memory usage in the next 874 * cycle. */ 875 c->querybuf_peak = 0; 876 877 /* Clients representing masters also use a "pending query buffer" that 878 * is the yet not applied part of the stream we are reading. Such buffer 879 * also needs resizing from time to time, otherwise after a very large 880 * transfer (a huge value or a big MIGRATE operation) it will keep using 881 * a lot of memory. */ 882 if (c->flags & CLIENT_MASTER) { 883 /* There are two conditions to resize the pending query buffer: 884 * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF. 885 * 2) Used length is smaller than pending_querybuf_size/2 */ 886 size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf); 887 if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF && 888 sdslen(c->pending_querybuf) < (pending_querybuf_size/2)) 889 { 890 c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf); 891 } 892 } 893 return 0; 894 } 895 896 /* This function is used in order to track clients using the biggest amount 897 * of memory in the latest few seconds. This way we can provide such information 898 * in the INFO output (clients section), without having to do an O(N) scan for 899 * all the clients. 900 * 901 * This is how it works. We have an array of CLIENTS_PEAK_MEM_USAGE_SLOTS slots 902 * where we track, for each, the biggest client output and input buffers we 903 * saw in that slot. Every slot correspond to one of the latest seconds, since 904 * the array is indexed by doing UNIXTIME % CLIENTS_PEAK_MEM_USAGE_SLOTS. 905 * 906 * When we want to know what was recently the peak memory usage, we just scan 907 * such few slots searching for the maximum value. 
*/ 908 #define CLIENTS_PEAK_MEM_USAGE_SLOTS 8 909 size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS]; 910 size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS]; 911 912 int clientsCronTrackExpansiveClients(client *c) { 913 size_t in_usage = sdsAllocSize(c->querybuf); 914 size_t out_usage = getClientOutputBufferMemoryUsage(c); 915 int i = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS; 916 int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS; 917 918 /* Always zero the next sample, so that when we switch to that second, we'll 919 * only register samples that are greater in that second without considering 920 * the history of such slot. 921 * 922 * Note: our index may jump to any random position if serverCron() is not 923 * called for some reason with the normal frequency, for instance because 924 * some slow command is called taking multiple seconds to execute. In that 925 * case our array may end containing data which is potentially older 926 * than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem 927 * since here we want just to track if "recently" there were very expansive 928 * clients from the POV of memory usage. */ 929 ClientsPeakMemInput[zeroidx] = 0; 930 ClientsPeakMemOutput[zeroidx] = 0; 931 932 /* Track the biggest values observed so far in this slot. */ 933 if (in_usage > ClientsPeakMemInput[i]) ClientsPeakMemInput[i] = in_usage; 934 if (out_usage > ClientsPeakMemOutput[i]) ClientsPeakMemOutput[i] = out_usage; 935 936 return 0; /* This function never terminates the client. */ 937 } 938 939 /* Return the max samples in the memory usage of clients tracked by 940 * the function clientsCronTrackExpansiveClients(). 
*/ 941 void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { 942 size_t i = 0, o = 0; 943 for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) { 944 if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j]; 945 if (ClientsPeakMemOutput[j] > o) o = ClientsPeakMemOutput[j]; 946 } 947 *in_usage = i; 948 *out_usage = o; 949 } 950 951 /* This function is called by serverCron() and is used in order to perform 952 * operations on clients that are important to perform constantly. For instance 953 * we use this function in order to disconnect clients after a timeout, including 954 * clients blocked in some blocking command with a non-zero timeout. 955 * 956 * The function makes some effort to process all the clients every second, even 957 * if this cannot be strictly guaranteed, since serverCron() may be called with 958 * an actual frequency lower than server.hz in case of latency events like slow 959 * commands. 960 * 961 * It is very important for this function, and the functions it calls, to be 962 * very fast: sometimes Redis has tens of hundreds of connected clients, and the 963 * default server.hz value is 10, so sometimes here we need to process thousands 964 * of clients per second, turning this function into a source of latency. 965 */ 966 #define CLIENTS_CRON_MIN_ITERATIONS 5 967 void clientsCron(void) { 968 /* Try to process at least numclients/server.hz of clients 969 * per call. Since normally (if there are no big latency events) this 970 * function is called server.hz times per second, in the average case we 971 * process all the clients in 1 second. */ 972 int numclients = listLength(server.clients); 973 int iterations = numclients/server.hz; 974 mstime_t now = mstime(); 975 976 /* Process at least a few clients while we are at it, even if we need 977 * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract 978 * of processing each client once per second. 
*/ 979 if (iterations < CLIENTS_CRON_MIN_ITERATIONS) 980 iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ? 981 numclients : CLIENTS_CRON_MIN_ITERATIONS; 982 983 while(listLength(server.clients) && iterations--) { 984 client *c; 985 listNode *head; 986 987 /* Rotate the list, take the current head, process. 988 * This way if the client must be removed from the list it's the 989 * first element and we don't incur into O(N) computation. */ 990 listRotate(server.clients); 991 head = listFirst(server.clients); 992 c = listNodeValue(head); 993 /* The following functions do different service checks on the client. 994 * The protocol is that they return non-zero if the client was 995 * terminated. */ 996 if (clientsCronHandleTimeout(c,now)) continue; 997 if (clientsCronResizeQueryBuffer(c)) continue; 998 if (clientsCronTrackExpansiveClients(c)) continue; 999 } 1000 } 1001 1002 /* This function handles 'background' operations we are required to do 1003 * incrementally in Redis databases, such as active key expiring, resizing, 1004 * rehashing. */ 1005 void databasesCron(void) { 1006 /* Expire keys by random sampling. Not required for slaves 1007 * as master will synthesize DELs for us. */ 1008 if (server.active_expire_enabled) { 1009 if (server.masterhost == NULL) { 1010 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); 1011 } else { 1012 expireSlaveKeys(); 1013 } 1014 } 1015 1016 /* Defrag keys gradually. */ 1017 if (server.active_defrag_enabled) 1018 activeDefragCycle(); 1019 1020 /* Perform hash tables rehashing if needed, but only if there are no 1021 * other processes saving the DB on disk. Otherwise rehashing is bad 1022 * as will cause a lot of copy-on-write of memory pages. */ 1023 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { 1024 /* We use global counters so if we stop the computation at a given 1025 * DB we'll be able to start from the successive in the next 1026 * cron loop iteration. 
*/ 1027 static unsigned int resize_db = 0; 1028 static unsigned int rehash_db = 0; 1029 int dbs_per_call = CRON_DBS_PER_CALL; 1030 int j; 1031 1032 /* Don't test more DBs than we have. */ 1033 if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; 1034 1035 /* Resize */ 1036 for (j = 0; j < dbs_per_call; j++) { 1037 tryResizeHashTables(resize_db % server.dbnum); 1038 resize_db++; 1039 } 1040 1041 /* Rehash */ 1042 if (server.activerehashing) { 1043 for (j = 0; j < dbs_per_call; j++) { 1044 int work_done = incrementallyRehash(rehash_db); 1045 if (work_done) { 1046 /* If the function did some work, stop here, we'll do 1047 * more at the next cron loop. */ 1048 break; 1049 } else { 1050 /* If this db didn't need rehash, we'll try the next one. */ 1051 rehash_db++; 1052 rehash_db %= server.dbnum; 1053 } 1054 } 1055 } 1056 } 1057 } 1058 1059 /* We take a cached value of the unix time in the global state because with 1060 * virtual memory and aging there is to store the current time in objects at 1061 * every object access, and accuracy is not needed. To access a global var is 1062 * a lot faster than calling time(NULL) */ 1063 void updateCachedTime(void) { 1064 time_t unixtime = time(NULL); 1065 atomicSet(server.unixtime,unixtime); 1066 server.mstime = mstime(); 1067 1068 /* To get information about daylight saving time, we need to call localtime_r 1069 * and cache the result. However calling localtime_r in this context is safe 1070 * since we will never fork() while here, in the main thread. The logging 1071 * function will call a thread safe version of localtime that has no locks. */ 1072 struct tm tm; 1073 localtime_r(&server.unixtime,&tm); 1074 server.daylight_active = tm.tm_isdst; 1075 } 1076 1077 /* This is our timer interrupt, called server.hz times per second. 1078 * Here is where we do a number of things that need to be done asynchronously. 
1079 * For instance: 1080 * 1081 * - Active expired keys collection (it is also performed in a lazy way on 1082 * lookup). 1083 * - Software watchdog. 1084 * - Update some statistic. 1085 * - Incremental rehashing of the DBs hash tables. 1086 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children. 1087 * - Clients timeout of different kinds. 1088 * - Replication reconnection. 1089 * - Many more... 1090 * 1091 * Everything directly called here will be called server.hz times per second, 1092 * so in order to throttle execution of things we want to do less frequently 1093 * a macro is used: run_with_period(milliseconds) { .... } 1094 */ 1095 1096 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 1097 int j; 1098 UNUSED(eventLoop); 1099 UNUSED(id); 1100 UNUSED(clientData); 1101 1102 /* Software watchdog: deliver the SIGALRM that will reach the signal 1103 * handler if we don't return here fast enough. */ 1104 if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); 1105 1106 /* Update the time cache. */ 1107 updateCachedTime(); 1108 1109 server.hz = server.config_hz; 1110 /* Adapt the server.hz value to the number of configured clients. If we have 1111 * many clients, we want to call serverCron() with an higher frequency. */ 1112 if (server.dynamic_hz) { 1113 while (listLength(server.clients) / server.hz > 1114 MAX_CLIENTS_PER_CLOCK_TICK) 1115 { 1116 server.hz *= 2; 1117 if (server.hz > CONFIG_MAX_HZ) { 1118 server.hz = CONFIG_MAX_HZ; 1119 break; 1120 } 1121 } 1122 } 1123 1124 run_with_period(100) { 1125 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); 1126 trackInstantaneousMetric(STATS_METRIC_NET_INPUT, 1127 server.stat_net_input_bytes); 1128 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, 1129 server.stat_net_output_bytes); 1130 } 1131 1132 /* We have just LRU_BITS bits per object for LRU information. 1133 * So we use an (eventually wrapping) LRU clock. 
1134 * 1135 * Note that even if the counter wraps it's not a big problem, 1136 * everything will still work but some object will appear younger 1137 * to Redis. However for this to happen a given object should never be 1138 * touched for all the time needed to the counter to wrap, which is 1139 * not likely. 1140 * 1141 * Note that you can change the resolution altering the 1142 * LRU_CLOCK_RESOLUTION define. */ 1143 unsigned long lruclock = getLRUClock(); 1144 atomicSet(server.lruclock,lruclock); 1145 1146 /* Record the max memory used since the server was started. */ 1147 if (zmalloc_used_memory() > server.stat_peak_memory) 1148 server.stat_peak_memory = zmalloc_used_memory(); 1149 1150 run_with_period(100) { 1151 /* Sample the RSS and other metrics here since this is a relatively slow call. 1152 * We must sample the zmalloc_used at the same time we take the rss, otherwise 1153 * the frag ratio calculate may be off (ratio of two samples at different times) */ 1154 server.cron_malloc_stats.process_rss = zmalloc_get_rss(); 1155 server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory(); 1156 /* Sampling the allcator info can be slow too. 
1157 * The fragmentation ratio it'll show is potentically more accurate 1158 * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc 1159 * allocations, and allocator reserved pages that can be pursed (all not actual frag) */ 1160 zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated, 1161 &server.cron_malloc_stats.allocator_active, 1162 &server.cron_malloc_stats.allocator_resident); 1163 /* in case the allocator isn't providing these stats, fake them so that 1164 * fragmention info still shows some (inaccurate metrics) */ 1165 if (!server.cron_malloc_stats.allocator_resident) { 1166 /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS, 1167 * so we must desuct it in order to be able to calculate correct 1168 * "allocator fragmentation" ratio */ 1169 size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL; 1170 server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory; 1171 } 1172 if (!server.cron_malloc_stats.allocator_active) 1173 server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident; 1174 if (!server.cron_malloc_stats.allocator_allocated) 1175 server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used; 1176 } 1177 1178 /* We received a SIGTERM, shutting down here in a safe way, as it is 1179 * not ok doing so inside the signal handler. 
*/ 1180 if (server.shutdown_asap) { 1181 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); 1182 serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); 1183 server.shutdown_asap = 0; 1184 } 1185 1186 /* Show some info about non-empty databases */ 1187 run_with_period(5000) { 1188 for (j = 0; j < server.dbnum; j++) { 1189 long long size, used, vkeys; 1190 1191 size = dictSlots(server.db[j].dict); 1192 used = dictSize(server.db[j].dict); 1193 vkeys = dictSize(server.db[j].expires); 1194 if (used || vkeys) { 1195 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); 1196 /* dictPrintStats(server.dict); */ 1197 } 1198 } 1199 } 1200 1201 /* Show information about connected clients */ 1202 if (!server.sentinel_mode) { 1203 run_with_period(5000) { 1204 serverLog(LL_VERBOSE, 1205 "%lu clients connected (%lu replicas), %zu bytes in use", 1206 listLength(server.clients)-listLength(server.slaves), 1207 listLength(server.slaves), 1208 zmalloc_used_memory()); 1209 } 1210 } 1211 1212 /* We need to do a few operations on clients asynchronously. */ 1213 clientsCron(); 1214 1215 /* Handle background operations on Redis databases. */ 1216 databasesCron(); 1217 1218 /* Start a scheduled AOF rewrite if this was requested by the user while 1219 * a BGSAVE was in progress. */ 1220 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && 1221 server.aof_rewrite_scheduled) 1222 { 1223 rewriteAppendOnlyFileBackground(); 1224 } 1225 1226 /* Check if a background saving or AOF rewrite in progress terminated. 
*/ 1227 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || 1228 ldbPendingChildren()) 1229 { 1230 int statloc; 1231 pid_t pid; 1232 1233 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { 1234 int exitcode = WEXITSTATUS(statloc); 1235 int bysignal = 0; 1236 1237 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); 1238 1239 if (pid == -1) { 1240 serverLog(LL_WARNING,"wait3() returned an error: %s. " 1241 "rdb_child_pid = %d, aof_child_pid = %d", 1242 strerror(errno), 1243 (int) server.rdb_child_pid, 1244 (int) server.aof_child_pid); 1245 } else if (pid == server.rdb_child_pid) { 1246 backgroundSaveDoneHandler(exitcode,bysignal); 1247 if (!bysignal && exitcode == 0) receiveChildInfo(); 1248 } else if (pid == server.aof_child_pid) { 1249 backgroundRewriteDoneHandler(exitcode,bysignal); 1250 if (!bysignal && exitcode == 0) receiveChildInfo(); 1251 } else { 1252 if (!ldbRemoveChild(pid)) { 1253 serverLog(LL_WARNING, 1254 "Warning, detected child with unmatched pid: %ld", 1255 (long)pid); 1256 } 1257 } 1258 updateDictResizePolicy(); 1259 closeChildInfoPipe(); 1260 } 1261 } else { 1262 /* If there is not a background saving/rewrite in progress check if 1263 * we have to save/rewrite now. */ 1264 for (j = 0; j < server.saveparamslen; j++) { 1265 struct saveparam *sp = server.saveparams+j; 1266 1267 /* Save if we reached the given amount of changes, 1268 * the given amount of seconds, and if the latest bgsave was 1269 * successful or if, in case of an error, at least 1270 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ 1271 if (server.dirty >= sp->changes && 1272 server.unixtime-server.lastsave > sp->seconds && 1273 (server.unixtime-server.lastbgsave_try > 1274 CONFIG_BGSAVE_RETRY_DELAY || 1275 server.lastbgsave_status == C_OK)) 1276 { 1277 serverLog(LL_NOTICE,"%d changes in %d seconds. 
Saving...", 1278 sp->changes, (int)sp->seconds); 1279 rdbSaveInfo rsi, *rsiptr; 1280 rsiptr = rdbPopulateSaveInfo(&rsi); 1281 rdbSaveBackground(server.rdb_filename,rsiptr); 1282 break; 1283 } 1284 } 1285 1286 /* Trigger an AOF rewrite if needed. */ 1287 if (server.aof_state == AOF_ON && 1288 server.rdb_child_pid == -1 && 1289 server.aof_child_pid == -1 && 1290 server.aof_rewrite_perc && 1291 server.aof_current_size > server.aof_rewrite_min_size) 1292 { 1293 long long base = server.aof_rewrite_base_size ? 1294 server.aof_rewrite_base_size : 1; 1295 long long growth = (server.aof_current_size*100/base) - 100; 1296 if (growth >= server.aof_rewrite_perc) { 1297 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); 1298 rewriteAppendOnlyFileBackground(); 1299 } 1300 } 1301 } 1302 1303 1304 /* AOF postponed flush: Try at every cron cycle if the slow fsync 1305 * completed. */ 1306 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); 1307 1308 /* AOF write errors: in this case we have a buffer to flush as well and 1309 * clear the AOF error in case of success to make the DB writable again, 1310 * however to try every second is enough in case of 'hz' is set to 1311 * an higher frequency. */ 1312 run_with_period(1000) { 1313 if (server.aof_last_write_status == C_ERR) 1314 flushAppendOnlyFile(0); 1315 } 1316 1317 /* Close clients that need to be closed asynchronous */ 1318 freeClientsInAsyncFreeQueue(); 1319 1320 /* Clear the paused clients flag if needed. */ 1321 clientsArePaused(); /* Don't check return value, just use the side effect.*/ 1322 1323 /* Replication cron function -- used to reconnect to master, 1324 * detect transfer failures, start background RDB transfers and so forth. */ 1325 run_with_period(1000) replicationCron(); 1326 1327 /* Run the Redis Cluster cron. */ 1328 run_with_period(100) { 1329 if (server.cluster_enabled) clusterCron(); 1330 } 1331 1332 /* Run the Sentinel timer if we are in sentinel mode. 
*/ 1333 if (server.sentinel_mode) sentinelTimer(); 1334 1335 /* Cleanup expired MIGRATE cached sockets. */ 1336 run_with_period(1000) { 1337 migrateCloseTimedoutSockets(); 1338 } 1339 1340 /* Start a scheduled BGSAVE if the corresponding flag is set. This is 1341 * useful when we are forced to postpone a BGSAVE because an AOF 1342 * rewrite is in progress. 1343 * 1344 * Note: this code must be after the replicationCron() call above so 1345 * make sure when refactoring this file to keep this order. This is useful 1346 * because we want to give priority to RDB savings for replication. */ 1347 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && 1348 server.rdb_bgsave_scheduled && 1349 (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || 1350 server.lastbgsave_status == C_OK)) 1351 { 1352 rdbSaveInfo rsi, *rsiptr; 1353 rsiptr = rdbPopulateSaveInfo(&rsi); 1354 if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) 1355 server.rdb_bgsave_scheduled = 0; 1356 } 1357 1358 server.cronloops++; 1359 return 1000/server.hz; 1360 } 1361 1362 /* This function gets called every time Redis is entering the 1363 * main loop of the event driven library, that is, before to sleep 1364 * for ready file descriptors. */ 1365 void beforeSleep(struct aeEventLoop *eventLoop) { 1366 UNUSED(eventLoop); 1367 1368 /* Call the Redis Cluster before sleep function. Note that this function 1369 * may change the state of Redis Cluster (from ok to fail or vice versa), 1370 * so it's a good idea to call it before serving the unblocked clients 1371 * later in this function. */ 1372 if (server.cluster_enabled) clusterBeforeSleep(); 1373 1374 /* Run a fast expire cycle (the called function will return 1375 * ASAP if a fast cycle is not needed). 
*/ 1376 if (server.active_expire_enabled && server.masterhost == NULL) 1377 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); 1378 1379 /* Send all the slaves an ACK request if at least one client blocked 1380 * during the previous event loop iteration. */ 1381 if (server.get_ack_from_slaves) { 1382 robj *argv[3]; 1383 1384 argv[0] = createStringObject("REPLCONF",8); 1385 argv[1] = createStringObject("GETACK",6); 1386 argv[2] = createStringObject("*",1); /* Not used argument. */ 1387 replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); 1388 decrRefCount(argv[0]); 1389 decrRefCount(argv[1]); 1390 decrRefCount(argv[2]); 1391 server.get_ack_from_slaves = 0; 1392 } 1393 1394 /* Unblock all the clients blocked for synchronous replication 1395 * in WAIT. */ 1396 if (listLength(server.clients_waiting_acks)) 1397 processClientsWaitingReplicas(); 1398 1399 /* Check if there are clients unblocked by modules that implement 1400 * blocking commands. */ 1401 moduleHandleBlockedClients(); 1402 1403 /* Try to process pending commands for clients that were just unblocked. */ 1404 if (listLength(server.unblocked_clients)) 1405 processUnblockedClients(); 1406 1407 /* Write the AOF buffer on disk */ 1408 flushAppendOnlyFile(0); 1409 1410 /* Handle writes with pending output buffers. */ 1411 handleClientsWithPendingWrites(); 1412 1413 /* Before we are going to sleep, let the threads access the dataset by 1414 * releasing the GIL. Redis main thread will not touch anything at this 1415 * time. */ 1416 if (moduleCount()) moduleReleaseGIL(); 1417 } 1418 1419 /* This function is called immadiately after the event loop multiplexing 1420 * API returned, and the control is going to soon return to Redis by invoking 1421 * the different events callbacks. 
*/
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    /* Take the GIL back, but only when moduleCount() is non-zero. */
    if (moduleCount()) moduleAcquireGIL();
}

/* =========================== Server initialization ======================== */

/* Populate the global 'shared' structure with the objects that are created
 * once and then reused: common protocol replies and error messages, the
 * SELECT commands for the first PROTO_SHARED_SELECT_CMDS databases, Pub/Sub
 * bulk headers, a few command names, the integers from 0 to
 * OBJ_SHARED_INTEGERS-1 and the first OBJ_SHARED_BULKHDR_LEN multi bulk and
 * bulk length headers. */
void createSharedObjects(void) {
    int j;

    /* Fixed replies, already encoded in the wire protocol format. */
    shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
    shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
    shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
    shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
    shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
    shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
    shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
    shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
    shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
    shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
    shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
    shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
    shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));

    /* Error replies. */
    shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
        "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
    shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
        "-ERR no such key\r\n"));
    shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
        "-ERR syntax error\r\n"));
    shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
        "-ERR source and destination objects are the same\r\n"));
    shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
        "-ERR index out of range\r\n"));
    shared.noscripterr = createObject(OBJ_STRING,sdsnew(
        "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
    shared.loadingerr = createObject(OBJ_STRING,sdsnew(
        "-LOADING Redis is loading the dataset in memory\r\n"));
    shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
        "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
    shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
        "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
    shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
        "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
    shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
        "-READONLY You can't write against a read only replica.\r\n"));
    shared.noautherr = createObject(OBJ_STRING,sdsnew(
        "-NOAUTH Authentication required.\r\n"));
    shared.oomerr = createObject(OBJ_STRING,sdsnew(
        "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
    shared.execaborterr = createObject(OBJ_STRING,sdsnew(
        "-EXECABORT Transaction discarded because of previous errors.\r\n"));
    shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
        "-NOREPLICAS Not enough good replicas to write.\r\n"));
    shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
        "-BUSYKEY Target key name already exists.\r\n"));
    shared.space = createObject(OBJ_STRING,sdsnew(" "));
    shared.colon = createObject(OBJ_STRING,sdsnew(":"));
    shared.plus = createObject(OBJ_STRING,sdsnew("+"));

    /* One pre-built "SELECT <j>" command, in multi bulk format, for each of
     * the first PROTO_SHARED_SELECT_CMDS database indexes. */
    for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
        char dictid_str[64];
        int dictid_len;

        dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
        shared.select[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, dictid_str));
    }

    /* Pub/Sub bulk items: the second argument is the total length of the
     * string, including the "$<len>\r\n" header and the trailing CRLF. */
    shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
    shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
    shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
    shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
    shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
    shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);

    /* Command names. */
    shared.del = createStringObject("DEL",3);
    shared.unlink = createStringObject("UNLINK",6);
    shared.rpop = createStringObject("RPOP",4);
    shared.lpop = createStringObject("LPOP",4);
    shared.lpush = createStringObject("LPUSH",5);
    shared.rpoplpush = createStringObject("RPOPLPUSH",9);
    shared.zpopmin = createStringObject("ZPOPMIN",7);
    shared.zpopmax = createStringObject("ZPOPMAX",7);

    /* Shared integers: the value j is stored directly in the object pointer
     * and the encoding is switched to OBJ_ENCODING_INT. */
    for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
        shared.integers[j] =
            makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
        shared.integers[j]->encoding = OBJ_ENCODING_INT;
    }

    /* Multi bulk ("*<j>\r\n") and bulk ("$<j>\r\n") length headers. */
    for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
        shared.mbulkhdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"*%d\r\n",j));
        shared.bulkhdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"$%d\r\n",j));
    }
    /* The following two shared objects, minstring and maxstring, are not
     * actually used for their value but as special objects meaning
     * respectively the minimum possible string and the maximum possible
     * string in string comparisons for the ZRANGEBYLEX command. */
    shared.minstring = sdsnew("minstring");
    shared.maxstring = sdsnew("maxstring");
}

void initServerConfig(void) {
    int j;

    pthread_mutex_init(&server.next_client_id_mutex,NULL);
    pthread_mutex_init(&server.lruclock_mutex,NULL);
    pthread_mutex_init(&server.unixtime_mutex,NULL);

    updateCachedTime();
    getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
    server.runid[CONFIG_RUN_ID_SIZE] = '\0';
    changeReplicationId();
    clearReplicationId2();
    server.timezone = getTimeZone(); /* Initialized by tzset(). */
    server.configfile = NULL;
    server.executable = NULL;
    server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
    server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    server.port = CONFIG_DEFAULT_SERVER_PORT;
    server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
    server.bindaddr_count = 0;
    server.unixsocket = NULL;
    server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd_count = 0;
    server.sofd = -1;
    server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
    server.dbnum = CONFIG_DEFAULT_DBNUM;
    server.verbosity = CONFIG_DEFAULT_VERBOSITY;
    server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
    server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
    server.active_expire_enabled = 1;
    server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
    server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
    server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER;
    server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
    server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
    server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
    server.active_defrag_max_scan_fields = CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS;
    server.proto_max_bulk_len = CONFIG_DEFAULT_PROTO_MAX_BULK_LEN;
server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN; 1564 server.saveparams = NULL; 1565 server.loading = 0; 1566 server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE); 1567 server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED; 1568 server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT); 1569 server.syslog_facility = LOG_LOCAL0; 1570 server.daemonize = CONFIG_DEFAULT_DAEMONIZE; 1571 server.supervised = 0; 1572 server.supervised_mode = SUPERVISED_NONE; 1573 server.aof_state = AOF_OFF; 1574 server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC; 1575 server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE; 1576 server.aof_rewrite_perc = AOF_REWRITE_PERC; 1577 server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; 1578 server.aof_rewrite_base_size = 0; 1579 server.aof_rewrite_scheduled = 0; 1580 server.aof_last_fsync = time(NULL); 1581 server.aof_rewrite_time_last = -1; 1582 server.aof_rewrite_time_start = -1; 1583 server.aof_lastbgrewrite_status = C_OK; 1584 server.aof_delayed_fsync = 0; 1585 server.aof_fd = -1; 1586 server.aof_selected_db = -1; /* Make sure the first time will not match */ 1587 server.aof_flush_postponed_start = 0; 1588 server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; 1589 server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; 1590 server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; 1591 server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; 1592 server.pidfile = NULL; 1593 server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); 1594 server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); 1595 server.requirepass = NULL; 1596 server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION; 1597 server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM; 1598 server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR; 1599 server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING; 1600 server.active_defrag_running = 0; 1601 server.notify_keyspace_events = 
0; 1602 server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS; 1603 server.blocked_clients = 0; 1604 memset(server.blocked_clients_by_type,0, 1605 sizeof(server.blocked_clients_by_type)); 1606 server.maxmemory = CONFIG_DEFAULT_MAXMEMORY; 1607 server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY; 1608 server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES; 1609 server.lfu_log_factor = CONFIG_DEFAULT_LFU_LOG_FACTOR; 1610 server.lfu_decay_time = CONFIG_DEFAULT_LFU_DECAY_TIME; 1611 server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES; 1612 server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE; 1613 server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE; 1614 server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH; 1615 server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES; 1616 server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES; 1617 server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE; 1618 server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES; 1619 server.stream_node_max_bytes = OBJ_STREAM_NODE_MAX_BYTES; 1620 server.stream_node_max_entries = OBJ_STREAM_NODE_MAX_ENTRIES; 1621 server.shutdown_asap = 0; 1622 server.cluster_enabled = 0; 1623 server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT; 1624 server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER; 1625 server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY; 1626 server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE; 1627 server.cluster_slave_no_failover = CLUSTER_DEFAULT_SLAVE_NO_FAILOVER; 1628 server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); 1629 server.cluster_announce_ip = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_IP; 1630 server.cluster_announce_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT; 1631 server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT; 1632 server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE; 1633 server.migrate_cached_sockets = 
dictCreate(&migrateCacheDictType,NULL); 1634 server.next_client_id = 1; /* Client IDs, start from 1 .*/ 1635 server.loading_process_events_interval_bytes = (1024*1024*2); 1636 server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION; 1637 server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE; 1638 server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL; 1639 server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO; 1640 server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; 1641 1642 unsigned int lruclock = getLRUClock(); 1643 atomicSet(server.lruclock,lruclock); 1644 resetServerSaveParams(); 1645 1646 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ 1647 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ 1648 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ 1649 1650 /* Replication related */ 1651 server.masterauth = NULL; 1652 server.masterhost = NULL; 1653 server.masterport = 6379; 1654 server.master = NULL; 1655 server.cached_master = NULL; 1656 server.master_initial_offset = -1; 1657 server.repl_state = REPL_STATE_NONE; 1658 server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; 1659 server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; 1660 server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; 1661 server.repl_slave_ignore_maxmemory = CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY; 1662 server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH; 1663 server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ 1664 server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; 1665 server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; 1666 server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; 1667 server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; 1668 server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; 1669 server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE; 1670 server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG; 1671 server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY; 1672 server.slave_announce_ip = CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP; 1673 server.slave_announce_port = CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT; 1674 server.master_repl_offset = 0; 1675 1676 /* Replication partial resync backlog */ 1677 server.repl_backlog = NULL; 1678 server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE; 1679 server.repl_backlog_histlen = 0; 1680 server.repl_backlog_idx = 0; 1681 server.repl_backlog_off = 0; 1682 server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT; 1683 server.repl_no_slaves_since = time(NULL); 1684 1685 /* Client output buffer limits */ 1686 for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) 1687 server.client_obuf_limits[j] = clientBufferLimitsDefaults[j]; 1688 1689 /* Double constants initialization */ 1690 R_Zero = 0.0; 1691 R_PosInf = 1.0/R_Zero; 1692 R_NegInf = -1.0/R_Zero; 1693 R_Nan = R_Zero/R_Zero; 1694 1695 /* Command table -- we initiialize it here as it is part of the 1696 * initial configuration, since command names may be changed via 1697 * redis.conf using the rename-command directive. 
*/ 1698 server.commands = dictCreate(&commandTableDictType,NULL); 1699 server.orig_commands = dictCreate(&commandTableDictType,NULL); 1700 populateCommandTable(); 1701 server.delCommand = lookupCommandByCString("del"); 1702 server.multiCommand = lookupCommandByCString("multi"); 1703 server.lpushCommand = lookupCommandByCString("lpush"); 1704 server.lpopCommand = lookupCommandByCString("lpop"); 1705 server.rpopCommand = lookupCommandByCString("rpop"); 1706 server.zpopminCommand = lookupCommandByCString("zpopmin"); 1707 server.zpopmaxCommand = lookupCommandByCString("zpopmax"); 1708 server.sremCommand = lookupCommandByCString("srem"); 1709 server.execCommand = lookupCommandByCString("exec"); 1710 server.expireCommand = lookupCommandByCString("expire"); 1711 server.pexpireCommand = lookupCommandByCString("pexpire"); 1712 server.xclaimCommand = lookupCommandByCString("xclaim"); 1713 server.xgroupCommand = lookupCommandByCString("xgroup"); 1714 1715 /* Slow log */ 1716 server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; 1717 server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN; 1718 1719 /* Latency monitor */ 1720 server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; 1721 1722 /* Debugging */ 1723 server.assert_failed = "<no assertion failed>"; 1724 server.assert_file = "<no file>"; 1725 server.assert_line = 0; 1726 server.bug_report_start = 0; 1727 server.watchdog_period = 0; 1728 1729 /* By default we want scripts to be always replicated by effects 1730 * (single commands executed by the script), and not by sending the 1731 * script to the slave / AOF. This is the new way starting from 1732 * Redis 5. However it is possible to revert it via redis.conf. */ 1733 server.lua_always_replicate_commands = 1; 1734 } 1735 1736 extern char **environ; 1737 1738 /* Restart the server, executing the same executable that started this 1739 * instance, with the same arguments and configuration file. 
1740 * 1741 * The function is designed to directly call execve() so that the new 1742 * server instance will retain the PID of the previous one. 1743 * 1744 * The list of flags, that may be bitwise ORed together, alter the 1745 * behavior of this function: 1746 * 1747 * RESTART_SERVER_NONE No flags. 1748 * RESTART_SERVER_GRACEFULLY Do a proper shutdown before restarting. 1749 * RESTART_SERVER_CONFIG_REWRITE Rewrite the config file before restarting. 1750 * 1751 * On success the function does not return, because the process turns into 1752 * a different process. On error C_ERR is returned. */ 1753 int restartServer(int flags, mstime_t delay) { 1754 int j; 1755 1756 /* Check if we still have accesses to the executable that started this 1757 * server instance. */ 1758 if (access(server.executable,X_OK) == -1) { 1759 serverLog(LL_WARNING,"Can't restart: this process has no " 1760 "permissions to execute %s", server.executable); 1761 return C_ERR; 1762 } 1763 1764 /* Config rewriting. */ 1765 if (flags & RESTART_SERVER_CONFIG_REWRITE && 1766 server.configfile && 1767 rewriteConfig(server.configfile) == -1) 1768 { 1769 serverLog(LL_WARNING,"Can't restart: configuration rewrite process " 1770 "failed"); 1771 return C_ERR; 1772 } 1773 1774 /* Perform a proper shutdown. */ 1775 if (flags & RESTART_SERVER_GRACEFULLY && 1776 prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) 1777 { 1778 serverLog(LL_WARNING,"Can't restart: error preparing for shutdown"); 1779 return C_ERR; 1780 } 1781 1782 /* Close all file descriptors, with the exception of stdin, stdout, strerr 1783 * which are useful if we restart a Redis server which is not daemonized. */ 1784 for (j = 3; j < (int)server.maxclients + 1024; j++) { 1785 /* Test the descriptor validity before closing it, otherwise 1786 * Valgrind issues a warning on close(). */ 1787 if (fcntl(j,F_GETFD) != -1) close(j); 1788 } 1789 1790 /* Execute the server with the original command line. 
*/ 1791 if (delay) usleep(delay*1000); 1792 zfree(server.exec_argv[0]); 1793 server.exec_argv[0] = zstrdup(server.executable); 1794 execve(server.executable,server.exec_argv,environ); 1795 1796 /* If an error occurred here, there is nothing we can do, but exit. */ 1797 _exit(1); 1798 1799 return C_ERR; /* Never reached. */ 1800 } 1801 1802 /* This function will try to raise the max number of open files accordingly to 1803 * the configured max number of clients. It also reserves a number of file 1804 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of 1805 * persistence, listening sockets, log files and so forth. 1806 * 1807 * If it will not be possible to set the limit accordingly to the configured 1808 * max number of clients, the function will do the reverse setting 1809 * server.maxclients to the value that we can actually handle. */ 1810 void adjustOpenFilesLimit(void) { 1811 rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS; 1812 struct rlimit limit; 1813 1814 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { 1815 serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.", 1816 strerror(errno)); 1817 server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS; 1818 } else { 1819 rlim_t oldlimit = limit.rlim_cur; 1820 1821 /* Set the max number of files if the current limit is not enough 1822 * for our needs. */ 1823 if (oldlimit < maxfiles) { 1824 rlim_t bestlimit; 1825 int setrlimit_error = 0; 1826 1827 /* Try to set the file limit to match 'maxfiles' or at least 1828 * to the higher value supported less than maxfiles. */ 1829 bestlimit = maxfiles; 1830 while(bestlimit > oldlimit) { 1831 rlim_t decr_step = 16; 1832 1833 limit.rlim_cur = bestlimit; 1834 limit.rlim_max = bestlimit; 1835 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break; 1836 setrlimit_error = errno; 1837 1838 /* We failed to set file limit to 'bestlimit'. 
Try with a 1839 * smaller limit decrementing by a few FDs per iteration. */ 1840 if (bestlimit < decr_step) break; 1841 bestlimit -= decr_step; 1842 } 1843 1844 /* Assume that the limit we get initially is still valid if 1845 * our last try was even lower. */ 1846 if (bestlimit < oldlimit) bestlimit = oldlimit; 1847 1848 if (bestlimit < maxfiles) { 1849 unsigned int old_maxclients = server.maxclients; 1850 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS; 1851 /* maxclients is unsigned so may overflow: in order 1852 * to check if maxclients is now logically less than 1 1853 * we test indirectly via bestlimit. */ 1854 if (bestlimit <= CONFIG_MIN_RESERVED_FDS) { 1855 serverLog(LL_WARNING,"Your current 'ulimit -n' " 1856 "of %llu is not enough for the server to start. " 1857 "Please increase your open file limit to at least " 1858 "%llu. Exiting.", 1859 (unsigned long long) oldlimit, 1860 (unsigned long long) maxfiles); 1861 exit(1); 1862 } 1863 serverLog(LL_WARNING,"You requested maxclients of %d " 1864 "requiring at least %llu max file descriptors.", 1865 old_maxclients, 1866 (unsigned long long) maxfiles); 1867 serverLog(LL_WARNING,"Server can't set maximum open files " 1868 "to %llu because of OS error: %s.", 1869 (unsigned long long) maxfiles, strerror(setrlimit_error)); 1870 serverLog(LL_WARNING,"Current maximum open files is %llu. " 1871 "maxclients has been reduced to %d to compensate for " 1872 "low ulimit. " 1873 "If you need higher maxclients increase 'ulimit -n'.", 1874 (unsigned long long) bestlimit, server.maxclients); 1875 } else { 1876 serverLog(LL_NOTICE,"Increased maximum number of open files " 1877 "to %llu (it was originally set to %llu).", 1878 (unsigned long long) maxfiles, 1879 (unsigned long long) oldlimit); 1880 } 1881 } 1882 } 1883 } 1884 1885 /* Check that server.tcp_backlog can be actually enforced in Linux according 1886 * to the value of /proc/sys/net/core/somaxconn, or warn about it. 
*/ 1887 void checkTcpBacklogSettings(void) { 1888 #ifdef HAVE_PROC_SOMAXCONN 1889 FILE *fp = fopen("/proc/sys/net/core/somaxconn","r"); 1890 char buf[1024]; 1891 if (!fp) return; 1892 if (fgets(buf,sizeof(buf),fp) != NULL) { 1893 int somaxconn = atoi(buf); 1894 if (somaxconn > 0 && somaxconn < server.tcp_backlog) { 1895 serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn); 1896 } 1897 } 1898 fclose(fp); 1899 #endif 1900 } 1901 1902 /* Initialize a set of file descriptors to listen to the specified 'port' 1903 * binding the addresses specified in the Redis server configuration. 1904 * 1905 * The listening file descriptors are stored in the integer array 'fds' 1906 * and their number is set in '*count'. 1907 * 1908 * The addresses to bind are specified in the global server.bindaddr array 1909 * and their number is server.bindaddr_count. If the server configuration 1910 * contains no specific addresses to bind, this function will try to 1911 * bind * (all addresses) for both the IPv4 and IPv6 protocols. 1912 * 1913 * On success the function returns C_OK. 1914 * 1915 * On error the function returns C_ERR. For the function to be on 1916 * error, at least one of the server.bindaddr addresses was 1917 * impossible to bind, or no bind addresses were specified in the server 1918 * configuration but the function is not able to bind * for at least 1919 * one of the IPv4 or IPv6 protocols. */ 1920 int listenToPort(int port, int *fds, int *count) { 1921 int j; 1922 1923 /* Force binding of 0.0.0.0 if no bind address is specified, always 1924 * entering the loop if j == 0. 
*/ 1925 if (server.bindaddr_count == 0) server.bindaddr[0] = NULL; 1926 for (j = 0; j < server.bindaddr_count || j == 0; j++) { 1927 if (server.bindaddr[j] == NULL) { 1928 int unsupported = 0; 1929 /* Bind * for both IPv6 and IPv4, we enter here only if 1930 * server.bindaddr_count == 0. */ 1931 fds[*count] = anetTcp6Server(server.neterr,port,NULL, 1932 server.tcp_backlog); 1933 if (fds[*count] != ANET_ERR) { 1934 anetNonBlock(NULL,fds[*count]); 1935 (*count)++; 1936 } else if (errno == EAFNOSUPPORT) { 1937 unsupported++; 1938 serverLog(LL_WARNING,"Not listening to IPv6: unsupproted"); 1939 } 1940 1941 if (*count == 1 || unsupported) { 1942 /* Bind the IPv4 address as well. */ 1943 fds[*count] = anetTcpServer(server.neterr,port,NULL, 1944 server.tcp_backlog); 1945 if (fds[*count] != ANET_ERR) { 1946 anetNonBlock(NULL,fds[*count]); 1947 (*count)++; 1948 } else if (errno == EAFNOSUPPORT) { 1949 unsupported++; 1950 serverLog(LL_WARNING,"Not listening to IPv4: unsupproted"); 1951 } 1952 } 1953 /* Exit the loop if we were able to bind * on IPv4 and IPv6, 1954 * otherwise fds[*count] will be ANET_ERR and we'll print an 1955 * error and return to the caller with an error. */ 1956 if (*count + unsupported == 2) break; 1957 } else if (strchr(server.bindaddr[j],':')) { 1958 /* Bind IPv6 address. */ 1959 fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], 1960 server.tcp_backlog); 1961 } else { 1962 /* Bind IPv4 address. */ 1963 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], 1964 server.tcp_backlog); 1965 } 1966 if (fds[*count] == ANET_ERR) { 1967 serverLog(LL_WARNING, 1968 "Could not create server TCP listening socket %s:%d: %s", 1969 server.bindaddr[j] ? 
server.bindaddr[j] : "*", 1970 port, server.neterr); 1971 if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || 1972 errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || 1973 errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL) 1974 continue; 1975 return C_ERR; 1976 } 1977 anetNonBlock(NULL,fds[*count]); 1978 (*count)++; 1979 } 1980 return C_OK; 1981 } 1982 1983 /* Resets the stats that we expose via INFO or other means that we want 1984 * to reset via CONFIG RESETSTAT. The function is also used in order to 1985 * initialize these fields in initServer() at server startup. */ 1986 void resetServerStats(void) { 1987 int j; 1988 1989 server.stat_numcommands = 0; 1990 server.stat_numconnections = 0; 1991 server.stat_expiredkeys = 0; 1992 server.stat_expired_stale_perc = 0; 1993 server.stat_expired_time_cap_reached_count = 0; 1994 server.stat_evictedkeys = 0; 1995 server.stat_keyspace_misses = 0; 1996 server.stat_keyspace_hits = 0; 1997 server.stat_active_defrag_hits = 0; 1998 server.stat_active_defrag_misses = 0; 1999 server.stat_active_defrag_key_hits = 0; 2000 server.stat_active_defrag_key_misses = 0; 2001 server.stat_active_defrag_scanned = 0; 2002 server.stat_fork_time = 0; 2003 server.stat_fork_rate = 0; 2004 server.stat_rejected_conn = 0; 2005 server.stat_sync_full = 0; 2006 server.stat_sync_partial_ok = 0; 2007 server.stat_sync_partial_err = 0; 2008 for (j = 0; j < STATS_METRIC_COUNT; j++) { 2009 server.inst_metric[j].idx = 0; 2010 server.inst_metric[j].last_sample_time = mstime(); 2011 server.inst_metric[j].last_sample_count = 0; 2012 memset(server.inst_metric[j].samples,0, 2013 sizeof(server.inst_metric[j].samples)); 2014 } 2015 server.stat_net_input_bytes = 0; 2016 server.stat_net_output_bytes = 0; 2017 server.aof_delayed_fsync = 0; 2018 } 2019 2020 void initServer(void) { 2021 int j; 2022 2023 signal(SIGHUP, SIG_IGN); 2024 signal(SIGPIPE, SIG_IGN); 2025 setupSignalHandlers(); 2026 2027 if (server.syslog_enabled) { 2028 openlog(server.syslog_ident, LOG_PID | 
LOG_NDELAY | LOG_NOWAIT, 2029 server.syslog_facility); 2030 } 2031 2032 server.hz = server.config_hz; 2033 server.pid = getpid(); 2034 server.current_client = NULL; 2035 server.clients = listCreate(); 2036 server.clients_index = raxNew(); 2037 server.clients_to_close = listCreate(); 2038 server.slaves = listCreate(); 2039 server.monitors = listCreate(); 2040 server.clients_pending_write = listCreate(); 2041 server.slaveseldb = -1; /* Force to emit the first SELECT command. */ 2042 server.unblocked_clients = listCreate(); 2043 server.ready_keys = listCreate(); 2044 server.clients_waiting_acks = listCreate(); 2045 server.get_ack_from_slaves = 0; 2046 server.clients_paused = 0; 2047 server.system_memory_size = zmalloc_get_memory_size(); 2048 2049 createSharedObjects(); 2050 adjustOpenFilesLimit(); 2051 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 2052 if (server.el == NULL) { 2053 serverLog(LL_WARNING, 2054 "Failed creating the event loop. Error message: '%s'", 2055 strerror(errno)); 2056 exit(1); 2057 } 2058 server.db = zmalloc(sizeof(redisDb)*server.dbnum); 2059 2060 /* Open the TCP listening socket for the user commands. */ 2061 if (server.port != 0 && 2062 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) 2063 exit(1); 2064 2065 /* Open the listening Unix domain socket. */ 2066 if (server.unixsocket != NULL) { 2067 unlink(server.unixsocket); /* don't care if this fails */ 2068 server.sofd = anetUnixServer(server.neterr,server.unixsocket, 2069 server.unixsocketperm, server.tcp_backlog); 2070 if (server.sofd == ANET_ERR) { 2071 serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); 2072 exit(1); 2073 } 2074 anetNonBlock(NULL,server.sofd); 2075 } 2076 2077 /* Abort if there are no listening sockets at all. 
*/ 2078 if (server.ipfd_count == 0 && server.sofd < 0) { 2079 serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); 2080 exit(1); 2081 } 2082 2083 /* Create the Redis databases, and initialize other internal state. */ 2084 for (j = 0; j < server.dbnum; j++) { 2085 server.db[j].dict = dictCreate(&dbDictType,NULL); 2086 server.db[j].expires = dictCreate(&keyptrDictType,NULL); 2087 server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); 2088 server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); 2089 server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); 2090 server.db[j].id = j; 2091 server.db[j].avg_ttl = 0; 2092 server.db[j].defrag_later = listCreate(); 2093 } 2094 evictionPoolAlloc(); /* Initialize the LRU keys pool. */ 2095 server.pubsub_channels = dictCreate(&keylistDictType,NULL); 2096 server.pubsub_patterns = listCreate(); 2097 listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); 2098 listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); 2099 server.cronloops = 0; 2100 server.rdb_child_pid = -1; 2101 server.aof_child_pid = -1; 2102 server.rdb_child_type = RDB_CHILD_TYPE_NONE; 2103 server.rdb_bgsave_scheduled = 0; 2104 server.child_info_pipe[0] = -1; 2105 server.child_info_pipe[1] = -1; 2106 server.child_info_data.magic = 0; 2107 aofRewriteBufferReset(); 2108 server.aof_buf = sdsempty(); 2109 server.lastsave = time(NULL); /* At startup we consider the DB saved. */ 2110 server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ 2111 server.rdb_save_time_last = -1; 2112 server.rdb_save_time_start = -1; 2113 server.dirty = 0; 2114 resetServerStats(); 2115 /* A few stats we don't want to reset: server startup time, and peak mem. 
*/ 2116 server.stat_starttime = time(NULL); 2117 server.stat_peak_memory = 0; 2118 server.stat_rdb_cow_bytes = 0; 2119 server.stat_aof_cow_bytes = 0; 2120 server.cron_malloc_stats.zmalloc_used = 0; 2121 server.cron_malloc_stats.process_rss = 0; 2122 server.cron_malloc_stats.allocator_allocated = 0; 2123 server.cron_malloc_stats.allocator_active = 0; 2124 server.cron_malloc_stats.allocator_resident = 0; 2125 server.lastbgsave_status = C_OK; 2126 server.aof_last_write_status = C_OK; 2127 server.aof_last_write_errno = 0; 2128 server.repl_good_slaves_count = 0; 2129 2130 /* Create the timer callback, this is our way to process many background 2131 * operations incrementally, like clients timeout, eviction of unaccessed 2132 * expired keys and so forth. */ 2133 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { 2134 serverPanic("Can't create event loop timers."); 2135 exit(1); 2136 } 2137 2138 /* Create an event handler for accepting new connections in TCP and Unix 2139 * domain sockets. */ 2140 for (j = 0; j < server.ipfd_count; j++) { 2141 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, 2142 acceptTcpHandler,NULL) == AE_ERR) 2143 { 2144 serverPanic( 2145 "Unrecoverable error creating server.ipfd file event."); 2146 } 2147 } 2148 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, 2149 acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); 2150 2151 #ifndef HAVE_FF_KQUEUE 2152 /* Register a readable event for the pipe used to awake the event loop 2153 * when a blocked client in a module needs attention. */ 2154 if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, 2155 moduleBlockedClientPipeReadable,NULL) == AE_ERR) { 2156 serverPanic( 2157 "Error registering the readable event for the module " 2158 "blocked clients subsystem."); 2159 } 2160 #endif 2161 2162 /* Open the AOF file if needed. 
*/ 2163 if (server.aof_state == AOF_ON) { 2164 server.aof_fd = open(server.aof_filename, 2165 O_WRONLY|O_APPEND|O_CREAT,0644); 2166 if (server.aof_fd == -1) { 2167 serverLog(LL_WARNING, "Can't open the append-only file: %s", 2168 strerror(errno)); 2169 exit(1); 2170 } 2171 } 2172 2173 /* 32 bit instances are limited to 4GB of address space, so if there is 2174 * no explicit limit in the user provided configuration we set a limit 2175 * at 3 GB using maxmemory with 'noeviction' policy'. This avoids 2176 * useless crashes of the Redis instance for out of memory. */ 2177 if (server.arch_bits == 32 && server.maxmemory == 0) { 2178 serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); 2179 server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ 2180 server.maxmemory_policy = MAXMEMORY_NO_EVICTION; 2181 } 2182 2183 if (server.cluster_enabled) clusterInit(); 2184 replicationScriptCacheInit(); 2185 scriptingInit(1); 2186 slowlogInit(); 2187 latencyMonitorInit(); 2188 bioInit(); 2189 server.initial_memory_usage = zmalloc_used_memory(); 2190 } 2191 2192 /* Populates the Redis Command Table starting from the hard coded list 2193 * we have on top of redis.c file. 
*/ 2194 void populateCommandTable(void) { 2195 int j; 2196 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); 2197 2198 for (j = 0; j < numcommands; j++) { 2199 struct redisCommand *c = redisCommandTable+j; 2200 char *f = c->sflags; 2201 int retval1, retval2; 2202 2203 while(*f != '\0') { 2204 switch(*f) { 2205 case 'w': c->flags |= CMD_WRITE; break; 2206 case 'r': c->flags |= CMD_READONLY; break; 2207 case 'm': c->flags |= CMD_DENYOOM; break; 2208 case 'a': c->flags |= CMD_ADMIN; break; 2209 case 'p': c->flags |= CMD_PUBSUB; break; 2210 case 's': c->flags |= CMD_NOSCRIPT; break; 2211 case 'R': c->flags |= CMD_RANDOM; break; 2212 case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break; 2213 case 'l': c->flags |= CMD_LOADING; break; 2214 case 't': c->flags |= CMD_STALE; break; 2215 case 'M': c->flags |= CMD_SKIP_MONITOR; break; 2216 case 'k': c->flags |= CMD_ASKING; break; 2217 case 'F': c->flags |= CMD_FAST; break; 2218 default: serverPanic("Unsupported command flag"); break; 2219 } 2220 f++; 2221 } 2222 2223 retval1 = dictAdd(server.commands, sdsnew(c->name), c); 2224 /* Populate an additional dictionary that will be unaffected 2225 * by rename-command statements in redis.conf. 
*/ 2226 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); 2227 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); 2228 } 2229 } 2230 2231 void resetCommandTableStats(void) { 2232 struct redisCommand *c; 2233 dictEntry *de; 2234 dictIterator *di; 2235 2236 di = dictGetSafeIterator(server.commands); 2237 while((de = dictNext(di)) != NULL) { 2238 c = (struct redisCommand *) dictGetVal(de); 2239 c->microseconds = 0; 2240 c->calls = 0; 2241 } 2242 dictReleaseIterator(di); 2243 2244 } 2245 2246 /* ========================== Redis OP Array API ============================ */ 2247 2248 void redisOpArrayInit(redisOpArray *oa) { 2249 oa->ops = NULL; 2250 oa->numops = 0; 2251 } 2252 2253 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid, 2254 robj **argv, int argc, int target) 2255 { 2256 redisOp *op; 2257 2258 oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); 2259 op = oa->ops+oa->numops; 2260 op->cmd = cmd; 2261 op->dbid = dbid; 2262 op->argv = argv; 2263 op->argc = argc; 2264 op->target = target; 2265 oa->numops++; 2266 return oa->numops; 2267 } 2268 2269 void redisOpArrayFree(redisOpArray *oa) { 2270 while(oa->numops) { 2271 int j; 2272 redisOp *op; 2273 2274 oa->numops--; 2275 op = oa->ops+oa->numops; 2276 for (j = 0; j < op->argc; j++) 2277 decrRefCount(op->argv[j]); 2278 zfree(op->argv); 2279 } 2280 zfree(oa->ops); 2281 } 2282 2283 /* ====================== Commands lookup and execution ===================== */ 2284 2285 struct redisCommand *lookupCommand(sds name) { 2286 return dictFetchValue(server.commands, name); 2287 } 2288 2289 struct redisCommand *lookupCommandByCString(char *s) { 2290 struct redisCommand *cmd; 2291 sds name = sdsnew(s); 2292 2293 cmd = dictFetchValue(server.commands, name); 2294 sdsfree(name); 2295 return cmd; 2296 } 2297 2298 /* Lookup the command in the current table, if not found also check in 2299 * the original table containing the original command names unaffected by 2300 * redis.conf 
rename-command statement. 2301 * 2302 * This is used by functions rewriting the argument vector such as 2303 * rewriteClientCommandVector() in order to set client->cmd pointer 2304 * correctly even if the command was renamed. */ 2305 struct redisCommand *lookupCommandOrOriginal(sds name) { 2306 struct redisCommand *cmd = dictFetchValue(server.commands, name); 2307 2308 if (!cmd) cmd = dictFetchValue(server.orig_commands,name); 2309 return cmd; 2310 } 2311 2312 /* Propagate the specified command (in the context of the specified database id) 2313 * to AOF and Slaves. 2314 * 2315 * flags are an xor between: 2316 * + PROPAGATE_NONE (no propagation of command at all) 2317 * + PROPAGATE_AOF (propagate into the AOF file if is enabled) 2318 * + PROPAGATE_REPL (propagate into the replication link) 2319 * 2320 * This should not be used inside commands implementation. Use instead 2321 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). 2322 */ 2323 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2324 int flags) 2325 { 2326 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) 2327 feedAppendOnlyFile(cmd,dbid,argv,argc); 2328 if (flags & PROPAGATE_REPL) 2329 replicationFeedSlaves(server.slaves,dbid,argv,argc); 2330 } 2331 2332 /* Used inside commands to schedule the propagation of additional commands 2333 * after the current command is propagated to AOF / Replication. 2334 * 2335 * 'cmd' must be a pointer to the Redis command to replicate, dbid is the 2336 * database ID the command should be propagated into. 2337 * Arguments of the command to propagte are passed as an array of redis 2338 * objects pointers of len 'argc', using the 'argv' vector. 2339 * 2340 * The function does not take a reference to the passed 'argv' vector, 2341 * so it is up to the caller to release the passed argv (but it is usually 2342 * stack allocated). 
The function autoamtically increments ref count of 2343 * passed objects, so the caller does not need to. */ 2344 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 2345 int target) 2346 { 2347 robj **argvcopy; 2348 int j; 2349 2350 if (server.loading) return; /* No propagation during loading. */ 2351 2352 argvcopy = zmalloc(sizeof(robj*)*argc); 2353 for (j = 0; j < argc; j++) { 2354 argvcopy[j] = argv[j]; 2355 incrRefCount(argv[j]); 2356 } 2357 redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target); 2358 } 2359 2360 /* It is possible to call the function forceCommandPropagation() inside a 2361 * Redis command implementation in order to to force the propagation of a 2362 * specific command execution into AOF / Replication. */ 2363 void forceCommandPropagation(client *c, int flags) { 2364 if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; 2365 if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF; 2366 } 2367 2368 /* Avoid that the executed command is propagated at all. This way we 2369 * are free to just propagate what we want using the alsoPropagate() 2370 * API. */ 2371 void preventCommandPropagation(client *c) { 2372 c->flags |= CLIENT_PREVENT_PROP; 2373 } 2374 2375 /* AOF specific version of preventCommandPropagation(). */ 2376 void preventCommandAOF(client *c) { 2377 c->flags |= CLIENT_PREVENT_AOF_PROP; 2378 } 2379 2380 /* Replication specific version of preventCommandPropagation(). */ 2381 void preventCommandReplication(client *c) { 2382 c->flags |= CLIENT_PREVENT_REPL_PROP; 2383 } 2384 2385 /* Call() is the core of Redis execution of a command. 2386 * 2387 * The following flags can be passed: 2388 * CMD_CALL_NONE No flags. 2389 * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. 2390 * CMD_CALL_STATS Populate command stats. 2391 * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset 2392 * or if the client flags are forcing propagation. 
* CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset
 * or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
 *
 * The exact propagation behavior depends on the client flags.
 * Specifically:
 *
 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 *    in the call flags, then the command is propagated even if the
 *    dataset was not affected by the command.
 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 *    are set, the propagation into AOF or to slaves is not performed even
 *    if the command modified the dataset.
 *
 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 * slaves propagation will never occur.
 *
 * Client flags are modified by the implementation of a given command
 * using the following API:
 *
 * forceCommandPropagation(client *c, int flags);
 * preventCommandPropagation(client *c);
 * preventCommandAOF(client *c);
 * preventCommandReplication(client *c);
 *
 */
void call(client *c, int flags) {
    long long dirty, start, duration;
    /* Snapshot taken before the command runs: the client flags are restored
     * from it at the end, and the stats are charged to the command that was
     * set on entry even if c->cmd changes during execution. */
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;

    /* Send the command to clients in MONITOR mode, only if the commands are
     * not generated from reading an AOF. Commands flagged CMD_SKIP_MONITOR
     * or CMD_ADMIN are never fed to monitors. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation.
     * The current also_propagate array is saved and put back before
     * returning, since call() can be executed recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. 'dirty' ends up holding how much server.dirty grew
     * while the command ran, clamped to zero. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called
     * from Lua to go into the slowlog or to populate statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate
     * the script if the command flag or client flag are forcing the
     * propagation. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats.
     * EXEC itself is skipped here. 'duration' is in microseconds and is
     * divided by 1000 for the latency sample. */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
        /* Use the real command that was executed (cmd and lastcmd may be
         * different, in case of MULTI-EXEC or re-written commands such as
         * EXPIRE, GEOADD, etc.). */
        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementations called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the old replication flags, since call() can be executed
     * recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Handle the alsoPropagate() API to handle commands that want to propagate
     * multiple separated commands. Note that alsoPropagate() is not affected
     * by CLIENT_PREVENT_PROP flag. */
    if (server.also_propagate.numops) {
        int j;
        redisOp *rop;

        if (flags & CMD_CALL_PROPAGATE) {
            for (j = 0; j < server.also_propagate.numops; j++) {
                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        /* The queued operations are released even when nothing was
         * propagated. */
        redisOpArrayFree(&server.also_propagate);
    }
    /* Put back the array that was active when this call() started. */
    server.also_propagate = prev_also_propagate;
    server.stat_numcommands++;
}

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT).
 *
 * NOTE(review): in the QUIT path below the client is only flagged with
 * CLIENT_CLOSE_AFTER_REPLY before C_ERR is returned; the actual release
 * happens outside this function.
 *
 * Every check below that rejects the command replies with an error and
 * returns C_OK; most of them also call flagTransaction() first (the
 * read-only slave, Pub/Sub context and loading checks do not). */
int processCommand(client *c) {
    moduleCallCommandFilters(c);

    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        /* Echo back the leading arguments in the error message; the loop
         * stops adding arguments once 'args' reaches 128 bytes. */
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        /* Positive arity: exact argument count required. Negative arity -N:
         * at least N arguments required. */
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            /* A redirected EXEC discards the transaction; any other
             * redirected command only flags it. */
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the
     * propagation of DELs due to eviction. */
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions or the client
         * is in MULTI/EXEC context? Error. */
        if (out_of_memory &&
            (c->cmd->flags & CMD_DENYOOM ||
             (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. PING is rejected in this state as
     * well. */
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* In the context of Pub/Sub only allow PING and the
     * (P)SUBSCRIBE / (P)UNSUBSCRIBE commands (QUIT was already handled
     * at the top of this function). */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands:
     * AUTH, REPLCONF, a two-argument SHUTDOWN whose argument starts with
     * 'n', and a two-argument SCRIPT whose argument starts with 'k'
     * (only the first character is checked, case insensitively). */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command: inside MULTI everything except EXEC, DISCARD,
     * MULTI and WATCH is queued instead of being executed. */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        /* Record the replication offset reached after this command. */
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

/*================================== Shutdown =============================== */

/* Close listening sockets. Also unlink the unix domain socket if
 * unlink_unix_socket is non-zero.
*/ 2751 void closeListeningSockets(int unlink_unix_socket) { 2752 int j; 2753 2754 for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]); 2755 if (server.sofd != -1) close(server.sofd); 2756 if (server.cluster_enabled) 2757 for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]); 2758 if (unlink_unix_socket && server.unixsocket) { 2759 serverLog(LL_NOTICE,"Removing the unix socket file."); 2760 unlink(server.unixsocket); /* don't care if this fails */ 2761 } 2762 } 2763 2764 /* Reset cpu affinity as soon as new process fork(). 2765 * For new process will use same cpu core with redis server. */ 2766 void resetCpuAffinity(const char* name) 2767 { 2768 int j = 0, s = 0; 2769 cpu_set_t cpuset_frm, cpuset_to; 2770 pthread_t thread; 2771 #ifdef HAVE_FF_KQUEUE 2772 thread = pthread_self(); 2773 CPU_ZERO(&cpuset_frm); 2774 CPU_ZERO(&cpuset_to); 2775 2776 pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset_frm); 2777 for (j = 0; j < CPU_SETSIZE; j++) 2778 { 2779 if ( CPU_ISSET(j, &cpuset_frm) ) 2780 continue; 2781 CPU_SET(j, &cpuset_to); 2782 } 2783 s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset_to); 2784 if (s != 0) 2785 serverLog(LL_WARNING,"set cpu affinity, failed."); 2786 if (name!=NULL) 2787 { 2788 pthread_setname_np(thread, name); 2789 } 2790 #endif 2791 return; 2792 } 2793 2794 int prepareForShutdown(int flags) { 2795 int save = flags & SHUTDOWN_SAVE; 2796 int nosave = flags & SHUTDOWN_NOSAVE; 2797 2798 serverLog(LL_WARNING,"User requested shutdown..."); 2799 2800 /* Kill all the Lua debugger forked sessions. */ 2801 ldbKillForkedSessions(); 2802 2803 /* Kill the saving child if there is a background saving in progress. 2804 We want to avoid race conditions, for instance our saving child may 2805 overwrite the synchronous saving did by SHUTDOWN. */ 2806 if (server.rdb_child_pid != -1) { 2807 serverLog(LL_WARNING,"There is a child saving an .rdb. 
Killing it!"); 2808 kill(server.rdb_child_pid,SIGUSR1); 2809 rdbRemoveTempFile(server.rdb_child_pid); 2810 } 2811 2812 if (server.aof_state != AOF_OFF) { 2813 /* Kill the AOF saving child as the AOF we already have may be longer 2814 * but contains the full dataset anyway. */ 2815 if (server.aof_child_pid != -1) { 2816 /* If we have AOF enabled but haven't written the AOF yet, don't 2817 * shutdown or else the dataset will be lost. */ 2818 if (server.aof_state == AOF_WAIT_REWRITE) { 2819 serverLog(LL_WARNING, "Writing initial AOF, can't exit."); 2820 return C_ERR; 2821 } 2822 serverLog(LL_WARNING, 2823 "There is a child rewriting the AOF. Killing it!"); 2824 kill(server.aof_child_pid,SIGUSR1); 2825 } 2826 /* Append only file: flush buffers and fsync() the AOF at exit */ 2827 serverLog(LL_NOTICE,"Calling fsync() on the AOF file."); 2828 flushAppendOnlyFile(1); 2829 redis_fsync(server.aof_fd); 2830 } 2831 2832 /* Create a new RDB file before exiting. */ 2833 if ((server.saveparamslen > 0 && !nosave) || save) { 2834 serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting."); 2835 /* Snapshotting. Perform a SYNC SAVE and exit */ 2836 rdbSaveInfo rsi, *rsiptr; 2837 rsiptr = rdbPopulateSaveInfo(&rsi); 2838 if (rdbSave(server.rdb_filename,rsiptr) != C_OK) { 2839 /* Ooops.. error saving! The best we can do is to continue 2840 * operating. Note that if there was a background saving process, 2841 * in the next cron() Redis will be notified that the background 2842 * saving aborted, handling special stuff like slaves pending for 2843 * synchronization... */ 2844 serverLog(LL_WARNING,"Error trying to save the DB, can't exit."); 2845 return C_ERR; 2846 } 2847 } 2848 2849 /* Remove the pid file if possible and needed. */ 2850 if (server.daemonize || server.pidfile) { 2851 serverLog(LL_NOTICE,"Removing the pid file."); 2852 unlink(server.pidfile); 2853 } 2854 2855 /* Best effort flush of slave output buffers, so that we hopefully 2856 * send them pending writes. 
*/ 2857 flushSlavesOutputBuffers(); 2858 2859 /* Close the listening sockets. Apparently this allows faster restarts. */ 2860 closeListeningSockets(1); 2861 serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", 2862 server.sentinel_mode ? "Sentinel" : "Redis"); 2863 return C_OK; 2864 } 2865 2866 /*================================== Commands =============================== */ 2867 2868 /* Sometimes Redis cannot accept write commands because there is a perstence 2869 * error with the RDB or AOF file, and Redis is configured in order to stop 2870 * accepting writes in such situation. This function returns if such a 2871 * condition is active, and the type of the condition. 2872 * 2873 * Function return values: 2874 * 2875 * DISK_ERROR_TYPE_NONE: No problems, we can accept writes. 2876 * DISK_ERROR_TYPE_AOF: Don't accept writes: AOF errors. 2877 * DISK_ERROR_TYPE_RDB: Don't accept writes: RDB errors. 2878 */ 2879 int writeCommandsDeniedByDiskError(void) { 2880 if (server.stop_writes_on_bgsave_err && 2881 server.saveparamslen > 0 && 2882 server.lastbgsave_status == C_ERR) 2883 { 2884 return DISK_ERROR_TYPE_RDB; 2885 } else if (server.aof_state != AOF_OFF && 2886 server.aof_last_write_status == C_ERR) 2887 { 2888 return DISK_ERROR_TYPE_AOF; 2889 } else { 2890 return DISK_ERROR_TYPE_NONE; 2891 } 2892 } 2893 2894 /* Return zero if strings are the same, non-zero if they are not. 2895 * The comparison is performed in a way that prevents an attacker to obtain 2896 * information about the nature of the strings just monitoring the execution 2897 * time of the function. 2898 * 2899 * Note that limiting the comparison length to strings up to 512 bytes we 2900 * can avoid leaking any information about the password length and any 2901 * possible branch misprediction related leak. 
2902 */ 2903 int time_independent_strcmp(char *a, char *b) { 2904 char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN]; 2905 /* The above two strlen perform len(a) + len(b) operations where either 2906 * a or b are fixed (our password) length, and the difference is only 2907 * relative to the length of the user provided string, so no information 2908 * leak is possible in the following two lines of code. */ 2909 unsigned int alen = strlen(a); 2910 unsigned int blen = strlen(b); 2911 unsigned int j; 2912 int diff = 0; 2913 2914 /* We can't compare strings longer than our static buffers. 2915 * Note that this will never pass the first test in practical circumstances 2916 * so there is no info leak. */ 2917 if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1; 2918 2919 memset(bufa,0,sizeof(bufa)); /* Constant time. */ 2920 memset(bufb,0,sizeof(bufb)); /* Constant time. */ 2921 /* Again the time of the following two copies is proportional to 2922 * len(a) + len(b) so no info is leaked. */ 2923 memcpy(bufa,a,alen); 2924 memcpy(bufb,b,blen); 2925 2926 /* Always compare all the chars in the two buffers without 2927 * conditional expressions. */ 2928 for (j = 0; j < sizeof(bufa); j++) { 2929 diff |= (bufa[j] ^ bufb[j]); 2930 } 2931 /* Length must be equal as well. */ 2932 diff |= alen ^ blen; 2933 return diff; /* If zero strings are the same. */ 2934 } 2935 2936 void authCommand(client *c) { 2937 if (!server.requirepass) { 2938 addReplyError(c,"Client sent AUTH, but no password is set"); 2939 } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) { 2940 c->authenticated = 1; 2941 addReply(c,shared.ok); 2942 } else { 2943 c->authenticated = 0; 2944 addReplyError(c,"invalid password"); 2945 } 2946 } 2947 2948 /* The PING command. It works in a different way if the client is in 2949 * in Pub/Sub mode. */ 2950 void pingCommand(client *c) { 2951 /* The command takes zero or one arguments. 
*/ 2952 if (c->argc > 2) { 2953 addReplyErrorFormat(c,"wrong number of arguments for '%s' command", 2954 c->cmd->name); 2955 return; 2956 } 2957 2958 if (c->flags & CLIENT_PUBSUB) { 2959 addReply(c,shared.mbulkhdr[2]); 2960 addReplyBulkCBuffer(c,"pong",4); 2961 if (c->argc == 1) 2962 addReplyBulkCBuffer(c,"",0); 2963 else 2964 addReplyBulk(c,c->argv[1]); 2965 } else { 2966 if (c->argc == 1) 2967 addReply(c,shared.pong); 2968 else 2969 addReplyBulk(c,c->argv[1]); 2970 } 2971 } 2972 2973 void echoCommand(client *c) { 2974 addReplyBulk(c,c->argv[1]); 2975 } 2976 2977 void timeCommand(client *c) { 2978 struct timeval tv; 2979 2980 /* gettimeofday() can only fail if &tv is a bad address so we 2981 * don't check for errors. */ 2982 gettimeofday(&tv,NULL); 2983 addReplyMultiBulkLen(c,2); 2984 addReplyBulkLongLong(c,tv.tv_sec); 2985 addReplyBulkLongLong(c,tv.tv_usec); 2986 } 2987 2988 /* Helper function for addReplyCommand() to output flags. */ 2989 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) { 2990 if (cmd->flags & f) { 2991 addReplyStatus(c, reply); 2992 return 1; 2993 } 2994 return 0; 2995 } 2996 2997 /* Output the representation of a Redis command. Used by the COMMAND command. 
*/ 2998 void addReplyCommand(client *c, struct redisCommand *cmd) { 2999 if (!cmd) { 3000 addReply(c, shared.nullbulk); 3001 } else { 3002 /* We are adding: command name, arg count, flags, first, last, offset */ 3003 addReplyMultiBulkLen(c, 6); 3004 addReplyBulkCString(c, cmd->name); 3005 addReplyLongLong(c, cmd->arity); 3006 3007 int flagcount = 0; 3008 void *flaglen = addDeferredMultiBulkLength(c); 3009 flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write"); 3010 flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly"); 3011 flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom"); 3012 flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin"); 3013 flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub"); 3014 flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript"); 3015 flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random"); 3016 flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script"); 3017 flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading"); 3018 flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale"); 3019 flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor"); 3020 flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); 3021 flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); 3022 if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) || 3023 cmd->flags & CMD_MODULE_GETKEYS) 3024 { 3025 addReplyStatus(c, "movablekeys"); 3026 flagcount += 1; 3027 } 3028 setDeferredMultiBulkLength(c, flaglen, flagcount); 3029 3030 addReplyLongLong(c, cmd->firstkey); 3031 addReplyLongLong(c, cmd->lastkey); 3032 addReplyLongLong(c, cmd->keystep); 3033 } 3034 } 3035 3036 /* COMMAND <subcommand> <args> */ 3037 void commandCommand(client *c) { 3038 dictIterator *di; 3039 dictEntry *de; 3040 3041 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { 3042 const char *help[] = { 3043 "(no subcommand) -- Return details about all Redis commands.", 3044 "COUNT -- Return the total 
number of commands in this Redis server.", 3045 "GETKEYS <full-command> -- Return the keys from a full Redis command.", 3046 "INFO [command-name ...] -- Return details about multiple Redis commands.", 3047 NULL 3048 }; 3049 addReplyHelp(c, help); 3050 } else if (c->argc == 1) { 3051 addReplyMultiBulkLen(c, dictSize(server.commands)); 3052 di = dictGetIterator(server.commands); 3053 while ((de = dictNext(di)) != NULL) { 3054 addReplyCommand(c, dictGetVal(de)); 3055 } 3056 dictReleaseIterator(di); 3057 } else if (!strcasecmp(c->argv[1]->ptr, "info")) { 3058 int i; 3059 addReplyMultiBulkLen(c, c->argc-2); 3060 for (i = 2; i < c->argc; i++) { 3061 addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr)); 3062 } 3063 } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) { 3064 addReplyLongLong(c, dictSize(server.commands)); 3065 } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) { 3066 struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr); 3067 int *keys, numkeys, j; 3068 3069 if (!cmd) { 3070 addReplyError(c,"Invalid command specified"); 3071 return; 3072 } else if (cmd->getkeys_proc == NULL && cmd->firstkey == 0) { 3073 addReplyError(c,"The command has no key arguments"); 3074 return; 3075 } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) || 3076 ((c->argc-2) < -cmd->arity)) 3077 { 3078 addReplyError(c,"Invalid number of arguments specified for command"); 3079 return; 3080 } 3081 3082 keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys); 3083 if (!keys) { 3084 addReplyError(c,"Invalid arguments specified for command"); 3085 } else { 3086 addReplyMultiBulkLen(c,numkeys); 3087 for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]); 3088 getKeysFreeResult(keys); 3089 } 3090 } else { 3091 addReplySubcommandSyntaxError(c); 3092 } 3093 } 3094 3095 /* Convert an amount of bytes into a human readable string in the form 3096 * of 100B, 2G, 100M, 4K, and so forth. 
*
 * The result is written with sprintf() into 's' and no buffer size is
 * passed, so the caller must provide a buffer large enough for the longest
 * output (the callers visible in this file use 64 byte buffers).
 *
 * Units are powers of 1024 and scaled values are printed with two
 * decimals. Values of 1024^6 bytes or more are printed as a plain number
 * of bytes with the 'B' suffix, like values below 1024. */
void bytesToHuman(char *s, unsigned long long n) {
    double d;

    if (n < 1024) {
        /* Bytes */
        sprintf(s,"%lluB",n);
    } else if (n < (1024*1024)) {
        /* Kilobytes */
        d = (double)n/(1024);
        sprintf(s,"%.2fK",d);
    } else if (n < (1024LL*1024*1024)) {
        /* Megabytes */
        d = (double)n/(1024*1024);
        sprintf(s,"%.2fM",d);
    } else if (n < (1024LL*1024*1024*1024)) {
        /* Gigabytes. From here on the constants are computed as long long. */
        d = (double)n/(1024LL*1024*1024);
        sprintf(s,"%.2fG",d);
    } else if (n < (1024LL*1024*1024*1024*1024)) {
        /* Terabytes */
        d = (double)n/(1024LL*1024*1024*1024);
        sprintf(s,"%.2fT",d);
    } else if (n < (1024LL*1024*1024*1024*1024*1024)) {
        /* Petabytes */
        d = (double)n/(1024LL*1024*1024*1024*1024);
        sprintf(s,"%.2fP",d);
    } else {
        /* Let's hope we never need this */
        sprintf(s,"%lluB",n);
    }
}

/* Create the string returned by the INFO command. This is decoupled
 * by the INFO command itself as we need to report the same information
 * on memory corruption problems. */
sds genRedisInfoString(char *section) {
    sds info = sdsempty();
    time_t uptime = server.unixtime-server.stat_starttime;
    int j;
    struct rusage self_ru, c_ru;
    int allsections = 0, defsections = 0;
    int sections = 0;

    if (section == NULL) section = "default";
    allsections = strcasecmp(section,"all") == 0;
    defsections = strcasecmp(section,"default") == 0;

    getrusage(RUSAGE_SELF, &self_ru);
    getrusage(RUSAGE_CHILDREN, &c_ru);

    /* Server */
    if (allsections || defsections || !strcasecmp(section,"server")) {
        static int call_uname = 1;
        static struct utsname name;
        char *mode;

        if (server.cluster_enabled) mode = "cluster";
        else if (server.sentinel_mode) mode = "sentinel";
        else mode = "standalone";

        if (sections++) info = sdscat(info,"\r\n");

        if (call_uname) {
            /* Uname can be slow and is always the same output. Cache it.
*/ 3156 uname(&name); 3157 call_uname = 0; 3158 } 3159 3160 unsigned int lruclock; 3161 atomicGet(server.lruclock,lruclock); 3162 info = sdscatprintf(info, 3163 "# Server\r\n" 3164 "redis_version:%s\r\n" 3165 "redis_git_sha1:%s\r\n" 3166 "redis_git_dirty:%d\r\n" 3167 "redis_build_id:%llx\r\n" 3168 "redis_mode:%s\r\n" 3169 "os:%s %s %s\r\n" 3170 "arch_bits:%d\r\n" 3171 "multiplexing_api:%s\r\n" 3172 "atomicvar_api:%s\r\n" 3173 "gcc_version:%d.%d.%d\r\n" 3174 "process_id:%ld\r\n" 3175 "run_id:%s\r\n" 3176 "tcp_port:%d\r\n" 3177 "uptime_in_seconds:%jd\r\n" 3178 "uptime_in_days:%jd\r\n" 3179 "hz:%d\r\n" 3180 "configured_hz:%d\r\n" 3181 "lru_clock:%ld\r\n" 3182 "executable:%s\r\n" 3183 "config_file:%s\r\n", 3184 REDIS_VERSION, 3185 redisGitSHA1(), 3186 strtol(redisGitDirty(),NULL,10) > 0, 3187 (unsigned long long) redisBuildId(), 3188 mode, 3189 name.sysname, name.release, name.machine, 3190 server.arch_bits, 3191 aeGetApiName(), 3192 REDIS_ATOMIC_API, 3193 #ifdef __GNUC__ 3194 __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__, 3195 #else 3196 0,0,0, 3197 #endif 3198 (long) getpid(), 3199 server.runid, 3200 server.port, 3201 (intmax_t)uptime, 3202 (intmax_t)(uptime/(3600*24)), 3203 server.hz, 3204 server.config_hz, 3205 (unsigned long) lruclock, 3206 server.executable ? server.executable : "", 3207 server.configfile ? 
server.configfile : ""); 3208 } 3209 3210 /* Clients */ 3211 if (allsections || defsections || !strcasecmp(section,"clients")) { 3212 size_t maxin, maxout; 3213 getExpansiveClientsInfo(&maxin,&maxout); 3214 if (sections++) info = sdscat(info,"\r\n"); 3215 info = sdscatprintf(info, 3216 "# Clients\r\n" 3217 "connected_clients:%lu\r\n" 3218 "client_recent_max_input_buffer:%zu\r\n" 3219 "client_recent_max_output_buffer:%zu\r\n" 3220 "blocked_clients:%d\r\n", 3221 listLength(server.clients)-listLength(server.slaves), 3222 maxin, maxout, 3223 server.blocked_clients); 3224 } 3225 3226 /* Memory */ 3227 if (allsections || defsections || !strcasecmp(section,"memory")) { 3228 char hmem[64]; 3229 char peak_hmem[64]; 3230 char total_system_hmem[64]; 3231 char used_memory_lua_hmem[64]; 3232 char used_memory_scripts_hmem[64]; 3233 char used_memory_rss_hmem[64]; 3234 char maxmemory_hmem[64]; 3235 size_t zmalloc_used = zmalloc_used_memory(); 3236 size_t total_system_mem = server.system_memory_size; 3237 const char *evict_policy = evictPolicyToString(); 3238 long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024; 3239 struct redisMemOverhead *mh = getMemoryOverheadData(); 3240 3241 /* Peak memory is updated from time to time by serverCron() so it 3242 * may happen that the instantaneous value is slightly bigger than 3243 * the peak value. This may confuse users, so we update the peak 3244 * if found smaller than the current memory usage. 
*/ 3245 if (zmalloc_used > server.stat_peak_memory) 3246 server.stat_peak_memory = zmalloc_used; 3247 3248 bytesToHuman(hmem,zmalloc_used); 3249 bytesToHuman(peak_hmem,server.stat_peak_memory); 3250 bytesToHuman(total_system_hmem,total_system_mem); 3251 bytesToHuman(used_memory_lua_hmem,memory_lua); 3252 bytesToHuman(used_memory_scripts_hmem,mh->lua_caches); 3253 bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss); 3254 bytesToHuman(maxmemory_hmem,server.maxmemory); 3255 3256 if (sections++) info = sdscat(info,"\r\n"); 3257 info = sdscatprintf(info, 3258 "# Memory\r\n" 3259 "used_memory:%zu\r\n" 3260 "used_memory_human:%s\r\n" 3261 "used_memory_rss:%zu\r\n" 3262 "used_memory_rss_human:%s\r\n" 3263 "used_memory_peak:%zu\r\n" 3264 "used_memory_peak_human:%s\r\n" 3265 "used_memory_peak_perc:%.2f%%\r\n" 3266 "used_memory_overhead:%zu\r\n" 3267 "used_memory_startup:%zu\r\n" 3268 "used_memory_dataset:%zu\r\n" 3269 "used_memory_dataset_perc:%.2f%%\r\n" 3270 "allocator_allocated:%zu\r\n" 3271 "allocator_active:%zu\r\n" 3272 "allocator_resident:%zu\r\n" 3273 "total_system_memory:%lu\r\n" 3274 "total_system_memory_human:%s\r\n" 3275 "used_memory_lua:%lld\r\n" 3276 "used_memory_lua_human:%s\r\n" 3277 "used_memory_scripts:%lld\r\n" 3278 "used_memory_scripts_human:%s\r\n" 3279 "number_of_cached_scripts:%lu\r\n" 3280 "maxmemory:%lld\r\n" 3281 "maxmemory_human:%s\r\n" 3282 "maxmemory_policy:%s\r\n" 3283 "allocator_frag_ratio:%.2f\r\n" 3284 "allocator_frag_bytes:%zu\r\n" 3285 "allocator_rss_ratio:%.2f\r\n" 3286 "allocator_rss_bytes:%zd\r\n" 3287 "rss_overhead_ratio:%.2f\r\n" 3288 "rss_overhead_bytes:%zd\r\n" 3289 "mem_fragmentation_ratio:%.2f\r\n" 3290 "mem_fragmentation_bytes:%zd\r\n" 3291 "mem_not_counted_for_evict:%zu\r\n" 3292 "mem_replication_backlog:%zu\r\n" 3293 "mem_clients_slaves:%zu\r\n" 3294 "mem_clients_normal:%zu\r\n" 3295 "mem_aof_buffer:%zu\r\n" 3296 "mem_allocator:%s\r\n" 3297 "active_defrag_running:%d\r\n" 3298 
"lazyfree_pending_objects:%zu\r\n", 3299 zmalloc_used, 3300 hmem, 3301 server.cron_malloc_stats.process_rss, 3302 used_memory_rss_hmem, 3303 server.stat_peak_memory, 3304 peak_hmem, 3305 mh->peak_perc, 3306 mh->overhead_total, 3307 mh->startup_allocated, 3308 mh->dataset, 3309 mh->dataset_perc, 3310 server.cron_malloc_stats.allocator_allocated, 3311 server.cron_malloc_stats.allocator_active, 3312 server.cron_malloc_stats.allocator_resident, 3313 (unsigned long)total_system_mem, 3314 total_system_hmem, 3315 memory_lua, 3316 used_memory_lua_hmem, 3317 (long long) mh->lua_caches, 3318 used_memory_scripts_hmem, 3319 dictSize(server.lua_scripts), 3320 server.maxmemory, 3321 maxmemory_hmem, 3322 evict_policy, 3323 mh->allocator_frag, 3324 mh->allocator_frag_bytes, 3325 mh->allocator_rss, 3326 mh->allocator_rss_bytes, 3327 mh->rss_extra, 3328 mh->rss_extra_bytes, 3329 mh->total_frag, /* this is the total RSS overhead, including fragmentation, */ 3330 mh->total_frag_bytes, /* named so for backwards compatibility */ 3331 freeMemoryGetNotCountedMemory(), 3332 mh->repl_backlog, 3333 mh->clients_slaves, 3334 mh->clients_normal, 3335 mh->aof_buffer, 3336 ZMALLOC_LIB, 3337 server.active_defrag_running, 3338 lazyfreeGetPendingObjectsCount() 3339 ); 3340 freeMemoryOverheadData(mh); 3341 } 3342 3343 /* Persistence */ 3344 if (allsections || defsections || !strcasecmp(section,"persistence")) { 3345 if (sections++) info = sdscat(info,"\r\n"); 3346 info = sdscatprintf(info, 3347 "# Persistence\r\n" 3348 "loading:%d\r\n" 3349 "rdb_changes_since_last_save:%lld\r\n" 3350 "rdb_bgsave_in_progress:%d\r\n" 3351 "rdb_last_save_time:%jd\r\n" 3352 "rdb_last_bgsave_status:%s\r\n" 3353 "rdb_last_bgsave_time_sec:%jd\r\n" 3354 "rdb_current_bgsave_time_sec:%jd\r\n" 3355 "rdb_last_cow_size:%zu\r\n" 3356 "aof_enabled:%d\r\n" 3357 "aof_rewrite_in_progress:%d\r\n" 3358 "aof_rewrite_scheduled:%d\r\n" 3359 "aof_last_rewrite_time_sec:%jd\r\n" 3360 "aof_current_rewrite_time_sec:%jd\r\n" 3361 
"aof_last_bgrewrite_status:%s\r\n" 3362 "aof_last_write_status:%s\r\n" 3363 "aof_last_cow_size:%zu\r\n", 3364 server.loading, 3365 server.dirty, 3366 server.rdb_child_pid != -1, 3367 (intmax_t)server.lastsave, 3368 (server.lastbgsave_status == C_OK) ? "ok" : "err", 3369 (intmax_t)server.rdb_save_time_last, 3370 (intmax_t)((server.rdb_child_pid == -1) ? 3371 -1 : time(NULL)-server.rdb_save_time_start), 3372 server.stat_rdb_cow_bytes, 3373 server.aof_state != AOF_OFF, 3374 server.aof_child_pid != -1, 3375 server.aof_rewrite_scheduled, 3376 (intmax_t)server.aof_rewrite_time_last, 3377 (intmax_t)((server.aof_child_pid == -1) ? 3378 -1 : time(NULL)-server.aof_rewrite_time_start), 3379 (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", 3380 (server.aof_last_write_status == C_OK) ? "ok" : "err", 3381 server.stat_aof_cow_bytes); 3382 3383 if (server.aof_state != AOF_OFF) { 3384 info = sdscatprintf(info, 3385 "aof_current_size:%lld\r\n" 3386 "aof_base_size:%lld\r\n" 3387 "aof_pending_rewrite:%d\r\n" 3388 "aof_buffer_length:%zu\r\n" 3389 "aof_rewrite_buffer_length:%lu\r\n" 3390 "aof_pending_bio_fsync:%llu\r\n" 3391 "aof_delayed_fsync:%lu\r\n", 3392 (long long) server.aof_current_size, 3393 (long long) server.aof_rewrite_base_size, 3394 server.aof_rewrite_scheduled, 3395 sdslen(server.aof_buf), 3396 aofRewriteBufferSize(), 3397 bioPendingJobsOfType(BIO_AOF_FSYNC), 3398 server.aof_delayed_fsync); 3399 } 3400 3401 if (server.loading) { 3402 double perc; 3403 time_t eta, elapsed; 3404 off_t remaining_bytes = server.loading_total_bytes- 3405 server.loading_loaded_bytes; 3406 3407 perc = ((double)server.loading_loaded_bytes / 3408 (server.loading_total_bytes+1)) * 100; 3409 3410 elapsed = time(NULL)-server.loading_start_time; 3411 if (elapsed == 0) { 3412 eta = 1; /* A fake 1 second figure if we don't have 3413 enough info */ 3414 } else { 3415 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1); 3416 } 3417 3418 info = sdscatprintf(info, 3419 
"loading_start_time:%jd\r\n" 3420 "loading_total_bytes:%llu\r\n" 3421 "loading_loaded_bytes:%llu\r\n" 3422 "loading_loaded_perc:%.2f\r\n" 3423 "loading_eta_seconds:%jd\r\n", 3424 (intmax_t) server.loading_start_time, 3425 (unsigned long long) server.loading_total_bytes, 3426 (unsigned long long) server.loading_loaded_bytes, 3427 perc, 3428 (intmax_t)eta 3429 ); 3430 } 3431 } 3432 3433 /* Stats */ 3434 if (allsections || defsections || !strcasecmp(section,"stats")) { 3435 if (sections++) info = sdscat(info,"\r\n"); 3436 info = sdscatprintf(info, 3437 "# Stats\r\n" 3438 "total_connections_received:%lld\r\n" 3439 "total_commands_processed:%lld\r\n" 3440 "instantaneous_ops_per_sec:%lld\r\n" 3441 "total_net_input_bytes:%lld\r\n" 3442 "total_net_output_bytes:%lld\r\n" 3443 "instantaneous_input_kbps:%.2f\r\n" 3444 "instantaneous_output_kbps:%.2f\r\n" 3445 "rejected_connections:%lld\r\n" 3446 "sync_full:%lld\r\n" 3447 "sync_partial_ok:%lld\r\n" 3448 "sync_partial_err:%lld\r\n" 3449 "expired_keys:%lld\r\n" 3450 "expired_stale_perc:%.2f\r\n" 3451 "expired_time_cap_reached_count:%lld\r\n" 3452 "evicted_keys:%lld\r\n" 3453 "keyspace_hits:%lld\r\n" 3454 "keyspace_misses:%lld\r\n" 3455 "pubsub_channels:%ld\r\n" 3456 "pubsub_patterns:%lu\r\n" 3457 "latest_fork_usec:%lld\r\n" 3458 "migrate_cached_sockets:%ld\r\n" 3459 "slave_expires_tracked_keys:%zu\r\n" 3460 "active_defrag_hits:%lld\r\n" 3461 "active_defrag_misses:%lld\r\n" 3462 "active_defrag_key_hits:%lld\r\n" 3463 "active_defrag_key_misses:%lld\r\n", 3464 server.stat_numconnections, 3465 server.stat_numcommands, 3466 getInstantaneousMetric(STATS_METRIC_COMMAND), 3467 server.stat_net_input_bytes, 3468 server.stat_net_output_bytes, 3469 (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, 3470 (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, 3471 server.stat_rejected_conn, 3472 server.stat_sync_full, 3473 server.stat_sync_partial_ok, 3474 server.stat_sync_partial_err, 3475 server.stat_expiredkeys, 3476 
server.stat_expired_stale_perc*100, 3477 server.stat_expired_time_cap_reached_count, 3478 server.stat_evictedkeys, 3479 server.stat_keyspace_hits, 3480 server.stat_keyspace_misses, 3481 dictSize(server.pubsub_channels), 3482 listLength(server.pubsub_patterns), 3483 server.stat_fork_time, 3484 dictSize(server.migrate_cached_sockets), 3485 getSlaveKeyWithExpireCount(), 3486 server.stat_active_defrag_hits, 3487 server.stat_active_defrag_misses, 3488 server.stat_active_defrag_key_hits, 3489 server.stat_active_defrag_key_misses); 3490 } 3491 3492 /* Replication */ 3493 if (allsections || defsections || !strcasecmp(section,"replication")) { 3494 if (sections++) info = sdscat(info,"\r\n"); 3495 info = sdscatprintf(info, 3496 "# Replication\r\n" 3497 "role:%s\r\n", 3498 server.masterhost == NULL ? "master" : "slave"); 3499 if (server.masterhost) { 3500 long long slave_repl_offset = 1; 3501 3502 if (server.master) 3503 slave_repl_offset = server.master->reploff; 3504 else if (server.cached_master) 3505 slave_repl_offset = server.cached_master->reploff; 3506 3507 info = sdscatprintf(info, 3508 "master_host:%s\r\n" 3509 "master_port:%d\r\n" 3510 "master_link_status:%s\r\n" 3511 "master_last_io_seconds_ago:%d\r\n" 3512 "master_sync_in_progress:%d\r\n" 3513 "slave_repl_offset:%lld\r\n" 3514 ,server.masterhost, 3515 server.masterport, 3516 (server.repl_state == REPL_STATE_CONNECTED) ? 3517 "up" : "down", 3518 server.master ? 
3519 ((int)(server.unixtime-server.master->lastinteraction)) : -1, 3520 server.repl_state == REPL_STATE_TRANSFER, 3521 slave_repl_offset 3522 ); 3523 3524 if (server.repl_state == REPL_STATE_TRANSFER) { 3525 info = sdscatprintf(info, 3526 "master_sync_left_bytes:%lld\r\n" 3527 "master_sync_last_io_seconds_ago:%d\r\n" 3528 , (long long) 3529 (server.repl_transfer_size - server.repl_transfer_read), 3530 (int)(server.unixtime-server.repl_transfer_lastio) 3531 ); 3532 } 3533 3534 if (server.repl_state != REPL_STATE_CONNECTED) { 3535 info = sdscatprintf(info, 3536 "master_link_down_since_seconds:%jd\r\n", 3537 (intmax_t)server.unixtime-server.repl_down_since); 3538 } 3539 info = sdscatprintf(info, 3540 "slave_priority:%d\r\n" 3541 "slave_read_only:%d\r\n", 3542 server.slave_priority, 3543 server.repl_slave_ro); 3544 } 3545 3546 info = sdscatprintf(info, 3547 "connected_slaves:%lu\r\n", 3548 listLength(server.slaves)); 3549 3550 /* If min-slaves-to-write is active, write the number of slaves 3551 * currently considered 'good'. 
*/ 3552 if (server.repl_min_slaves_to_write && 3553 server.repl_min_slaves_max_lag) { 3554 info = sdscatprintf(info, 3555 "min_slaves_good_slaves:%d\r\n", 3556 server.repl_good_slaves_count); 3557 } 3558 3559 if (listLength(server.slaves)) { 3560 int slaveid = 0; 3561 listNode *ln; 3562 listIter li; 3563 3564 listRewind(server.slaves,&li); 3565 while((ln = listNext(&li))) { 3566 client *slave = listNodeValue(ln); 3567 char *state = NULL; 3568 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip; 3569 int port; 3570 long lag = 0; 3571 3572 if (slaveip[0] == '\0') { 3573 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) 3574 continue; 3575 slaveip = ip; 3576 } 3577 switch(slave->replstate) { 3578 case SLAVE_STATE_WAIT_BGSAVE_START: 3579 case SLAVE_STATE_WAIT_BGSAVE_END: 3580 state = "wait_bgsave"; 3581 break; 3582 case SLAVE_STATE_SEND_BULK: 3583 state = "send_bulk"; 3584 break; 3585 case SLAVE_STATE_ONLINE: 3586 state = "online"; 3587 break; 3588 } 3589 if (state == NULL) continue; 3590 if (slave->replstate == SLAVE_STATE_ONLINE) 3591 lag = time(NULL) - slave->repl_ack_time; 3592 3593 info = sdscatprintf(info, 3594 "slave%d:ip=%s,port=%d,state=%s," 3595 "offset=%lld,lag=%ld\r\n", 3596 slaveid,slaveip,slave->slave_listening_port,state, 3597 slave->repl_ack_off, lag); 3598 slaveid++; 3599 } 3600 } 3601 info = sdscatprintf(info, 3602 "master_replid:%s\r\n" 3603 "master_replid2:%s\r\n" 3604 "master_repl_offset:%lld\r\n" 3605 "second_repl_offset:%lld\r\n" 3606 "repl_backlog_active:%d\r\n" 3607 "repl_backlog_size:%lld\r\n" 3608 "repl_backlog_first_byte_offset:%lld\r\n" 3609 "repl_backlog_histlen:%lld\r\n", 3610 server.replid, 3611 server.replid2, 3612 server.master_repl_offset, 3613 server.second_replid_offset, 3614 server.repl_backlog != NULL, 3615 server.repl_backlog_size, 3616 server.repl_backlog_off, 3617 server.repl_backlog_histlen); 3618 } 3619 3620 /* CPU */ 3621 if (allsections || defsections || !strcasecmp(section,"cpu")) { 3622 if (sections++) info = 
sdscat(info,"\r\n"); 3623 info = sdscatprintf(info, 3624 "# CPU\r\n" 3625 "used_cpu_sys:%ld.%06ld\r\n" 3626 "used_cpu_user:%ld.%06ld\r\n" 3627 "used_cpu_sys_children:%ld.%06ld\r\n" 3628 "used_cpu_user_children:%ld.%06ld\r\n", 3629 (long)self_ru.ru_stime.tv_sec, (long)self_ru.ru_stime.tv_usec, 3630 (long)self_ru.ru_utime.tv_sec, (long)self_ru.ru_utime.tv_usec, 3631 (long)c_ru.ru_stime.tv_sec, (long)c_ru.ru_stime.tv_usec, 3632 (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec); 3633 } 3634 3635 /* Command statistics */ 3636 if (allsections || !strcasecmp(section,"commandstats")) { 3637 if (sections++) info = sdscat(info,"\r\n"); 3638 info = sdscatprintf(info, "# Commandstats\r\n"); 3639 3640 struct redisCommand *c; 3641 dictEntry *de; 3642 dictIterator *di; 3643 di = dictGetSafeIterator(server.commands); 3644 while((de = dictNext(di)) != NULL) { 3645 c = (struct redisCommand *) dictGetVal(de); 3646 if (!c->calls) continue; 3647 info = sdscatprintf(info, 3648 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n", 3649 c->name, c->calls, c->microseconds, 3650 (c->calls == 0) ? 
0 : ((float)c->microseconds/c->calls)); 3651 } 3652 dictReleaseIterator(di); 3653 } 3654 3655 /* Cluster */ 3656 if (allsections || defsections || !strcasecmp(section,"cluster")) { 3657 if (sections++) info = sdscat(info,"\r\n"); 3658 info = sdscatprintf(info, 3659 "# Cluster\r\n" 3660 "cluster_enabled:%d\r\n", 3661 server.cluster_enabled); 3662 } 3663 3664 /* Key space */ 3665 if (allsections || defsections || !strcasecmp(section,"keyspace")) { 3666 if (sections++) info = sdscat(info,"\r\n"); 3667 info = sdscatprintf(info, "# Keyspace\r\n"); 3668 for (j = 0; j < server.dbnum; j++) { 3669 long long keys, vkeys; 3670 3671 keys = dictSize(server.db[j].dict); 3672 vkeys = dictSize(server.db[j].expires); 3673 if (keys || vkeys) { 3674 info = sdscatprintf(info, 3675 "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", 3676 j, keys, vkeys, server.db[j].avg_ttl); 3677 } 3678 } 3679 } 3680 return info; 3681 } 3682 3683 void infoCommand(client *c) { 3684 char *section = c->argc == 2 ? c->argv[1]->ptr : "default"; 3685 3686 if (c->argc > 2) { 3687 addReply(c,shared.syntaxerr); 3688 return; 3689 } 3690 addReplyBulkSds(c, genRedisInfoString(section)); 3691 } 3692 3693 void monitorCommand(client *c) { 3694 /* ignore MONITOR if already slave or in monitor mode */ 3695 if (c->flags & CLIENT_SLAVE) return; 3696 3697 c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); 3698 listAddNodeTail(server.monitors,c); 3699 addReply(c,shared.ok); 3700 } 3701 3702 /* =================================== Main! ================================ */ 3703 3704 #ifdef __linux__ 3705 int linuxOvercommitMemoryValue(void) { 3706 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r"); 3707 char buf[64]; 3708 3709 if (!fp) return -1; 3710 if (fgets(buf,64,fp) == NULL) { 3711 fclose(fp); 3712 return -1; 3713 } 3714 fclose(fp); 3715 3716 return atoi(buf); 3717 } 3718 3719 void linuxMemoryWarnings(void) { 3720 if (linuxOvercommitMemoryValue() == 0) { 3721 serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! 
Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); 3722 } 3723 if (THPIsEnabled()) { 3724 serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled."); 3725 } 3726 } 3727 #endif /* __linux__ */ 3728 3729 void createPidFile(void) { 3730 /* If pidfile requested, but no pidfile defined, use 3731 * default pidfile path */ 3732 if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE); 3733 3734 /* Try to write the pid file in a best-effort way. */ 3735 FILE *fp = fopen(server.pidfile,"w"); 3736 if (fp) { 3737 fprintf(fp,"%d\n",(int)getpid()); 3738 fclose(fp); 3739 } 3740 } 3741 3742 void daemonize(void) { 3743 int fd; 3744 3745 if (fork() != 0) exit(0); /* parent exits */ 3746 setsid(); /* create a new session */ 3747 3748 /* Every output goes to /dev/null. If Redis is daemonized but 3749 * the 'logfile' is set to 'stdout' in the configuration file 3750 * it will not log at all. */ 3751 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { 3752 dup2(fd, STDIN_FILENO); 3753 dup2(fd, STDOUT_FILENO); 3754 dup2(fd, STDERR_FILENO); 3755 if (fd > STDERR_FILENO) close(fd); 3756 } 3757 } 3758 3759 void version(void) { 3760 printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n", 3761 REDIS_VERSION, 3762 redisGitSHA1(), 3763 atoi(redisGitDirty()) > 0, 3764 ZMALLOC_LIB, 3765 sizeof(long) == 4 ? 
32 : 64, 3766 (unsigned long long) redisBuildId()); 3767 exit(0); 3768 } 3769 3770 void usage(void) { 3771 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n"); 3772 fprintf(stderr," ./redis-server - (read config from stdin)\n"); 3773 fprintf(stderr," ./redis-server -v or --version\n"); 3774 fprintf(stderr," ./redis-server -h or --help\n"); 3775 fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n"); 3776 fprintf(stderr,"Examples:\n"); 3777 fprintf(stderr," ./redis-server (run the server with default conf)\n"); 3778 fprintf(stderr," ./redis-server /etc/redis/6379.conf\n"); 3779 fprintf(stderr," ./redis-server --port 7777\n"); 3780 fprintf(stderr," ./redis-server --port 7777 --replicaof 127.0.0.1 8888\n"); 3781 fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n"); 3782 fprintf(stderr,"Sentinel mode:\n"); 3783 fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n"); 3784 exit(1); 3785 } 3786 3787 void redisAsciiArt(void) { 3788 #include "asciilogo.h" 3789 char *buf = zmalloc(1024*16); 3790 char *mode; 3791 3792 if (server.cluster_enabled) mode = "cluster"; 3793 else if (server.sentinel_mode) mode = "sentinel"; 3794 else mode = "standalone"; 3795 3796 /* Show the ASCII logo if: log file is stdout AND stdout is a 3797 * tty AND syslog logging is disabled. Also show logo if the user 3798 * forced us to do so via redis.conf. */ 3799 int show_logo = ((!server.syslog_enabled && 3800 server.logfile[0] == '\0' && 3801 isatty(fileno(stdout))) || 3802 server.always_show_logo); 3803 3804 if (!show_logo) { 3805 serverLog(LL_NOTICE, 3806 "Running mode=%s, port=%d.", 3807 mode, server.port 3808 ); 3809 } else { 3810 snprintf(buf,1024*16,ascii_logo, 3811 REDIS_VERSION, 3812 redisGitSHA1(), 3813 strtol(redisGitDirty(),NULL,10) > 0, 3814 (sizeof(long) == 8) ? 
"64" : "32", 3815 mode, server.port, 3816 (long) getpid() 3817 ); 3818 serverLogRaw(LL_NOTICE|LL_RAW,buf); 3819 } 3820 zfree(buf); 3821 } 3822 3823 static void sigShutdownHandler(int sig) { 3824 char *msg; 3825 3826 switch (sig) { 3827 case SIGINT: 3828 msg = "Received SIGINT scheduling shutdown..."; 3829 break; 3830 case SIGTERM: 3831 msg = "Received SIGTERM scheduling shutdown..."; 3832 break; 3833 default: 3834 msg = "Received shutdown signal, scheduling shutdown..."; 3835 }; 3836 3837 /* SIGINT is often delivered via Ctrl+C in an interactive session. 3838 * If we receive the signal the second time, we interpret this as 3839 * the user really wanting to quit ASAP without waiting to persist 3840 * on disk. */ 3841 if (server.shutdown_asap && sig == SIGINT) { 3842 serverLogFromHandler(LL_WARNING, "You insist... exiting now."); 3843 rdbRemoveTempFile(getpid()); 3844 exit(1); /* Exit with an error since this was not a clean shutdown. */ 3845 } else if (server.loading) { 3846 serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now."); 3847 exit(0); 3848 } 3849 3850 serverLogFromHandler(LL_WARNING, msg); 3851 server.shutdown_asap = 1; 3852 } 3853 3854 void setupSignalHandlers(void) { 3855 struct sigaction act; 3856 3857 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. 3858 * Otherwise, sa_handler is used. 
*/ 3859 sigemptyset(&act.sa_mask); 3860 act.sa_flags = 0; 3861 act.sa_handler = sigShutdownHandler; 3862 sigaction(SIGTERM, &act, NULL); 3863 sigaction(SIGINT, &act, NULL); 3864 3865 #ifdef HAVE_BACKTRACE 3866 sigemptyset(&act.sa_mask); 3867 act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO; 3868 act.sa_sigaction = sigsegvHandler; 3869 sigaction(SIGSEGV, &act, NULL); 3870 sigaction(SIGBUS, &act, NULL); 3871 sigaction(SIGFPE, &act, NULL); 3872 sigaction(SIGILL, &act, NULL); 3873 #endif 3874 return; 3875 } 3876 3877 void memtest(size_t megabytes, int passes); 3878 3879 /* Returns 1 if there is --sentinel among the arguments or if 3880 * argv[0] contains "redis-sentinel". */ 3881 int checkForSentinelMode(int argc, char **argv) { 3882 int j; 3883 3884 if (strstr(argv[0],"redis-sentinel") != NULL) return 1; 3885 for (j = 1; j < argc; j++) 3886 if (!strcmp(argv[j],"--sentinel")) return 1; 3887 return 0; 3888 } 3889 3890 /* Function called at startup to load RDB or AOF file in memory. */ 3891 void loadDataFromDisk(void) { 3892 long long start = ustime(); 3893 if (server.aof_state == AOF_ON) { 3894 if (loadAppendOnlyFile(server.aof_filename) == C_OK) 3895 serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); 3896 } else { 3897 rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; 3898 if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { 3899 serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", 3900 (float)(ustime()-start)/1000000); 3901 3902 /* Restore the replication ID / offset from the RDB file. */ 3903 if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&& 3904 rsi.repl_id_is_set && 3905 rsi.repl_offset != -1 && 3906 /* Note that older implementations may save a repl_stream_db 3907 * of -1 inside the RDB file in a wrong way, see more information 3908 * in function rdbPopulateSaveInfo. 
*/ 3909 rsi.repl_stream_db != -1) 3910 { 3911 memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); 3912 server.master_repl_offset = rsi.repl_offset; 3913 /* If we are a slave, create a cached master from this 3914 * information, in order to allow partial resynchronizations 3915 * with masters. */ 3916 replicationCacheMasterUsingMyself(); 3917 selectDb(server.cached_master,rsi.repl_stream_db); 3918 } 3919 } else if (errno != ENOENT) { 3920 serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); 3921 exit(1); 3922 } 3923 } 3924 } 3925 3926 void redisOutOfMemoryHandler(size_t allocation_size) { 3927 serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!", 3928 allocation_size); 3929 serverPanic("Redis aborting for OUT OF MEMORY"); 3930 } 3931 3932 void redisSetProcTitle(char *title) { 3933 #ifdef USE_SETPROCTITLE 3934 char *server_mode = ""; 3935 if (server.cluster_enabled) server_mode = " [cluster]"; 3936 else if (server.sentinel_mode) server_mode = " [sentinel]"; 3937 3938 setproctitle("%s %s:%d%s", 3939 title, 3940 server.bindaddr_count ? server.bindaddr[0] : "*", 3941 server.port, 3942 server_mode); 3943 #else 3944 UNUSED(title); 3945 #endif 3946 } 3947 3948 /* 3949 * Check whether systemd or upstart have been used to start redis. 
3950 */ 3951 3952 int redisSupervisedUpstart(void) { 3953 const char *upstart_job = getenv("UPSTART_JOB"); 3954 3955 if (!upstart_job) { 3956 serverLog(LL_WARNING, 3957 "upstart supervision requested, but UPSTART_JOB not found"); 3958 return 0; 3959 } 3960 3961 serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness"); 3962 raise(SIGSTOP); 3963 unsetenv("UPSTART_JOB"); 3964 return 1; 3965 } 3966 3967 int redisSupervisedSystemd(void) { 3968 const char *notify_socket = getenv("NOTIFY_SOCKET"); 3969 int fd = 1; 3970 struct sockaddr_un su; 3971 struct iovec iov; 3972 struct msghdr hdr; 3973 int sendto_flags = 0; 3974 3975 if (!notify_socket) { 3976 serverLog(LL_WARNING, 3977 "systemd supervision requested, but NOTIFY_SOCKET not found"); 3978 return 0; 3979 } 3980 3981 if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) { 3982 return 0; 3983 } 3984 3985 serverLog(LL_NOTICE, "supervised by systemd, will signal readiness"); 3986 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) { 3987 serverLog(LL_WARNING, 3988 "Can't connect to systemd socket %s", notify_socket); 3989 return 0; 3990 } 3991 3992 memset(&su, 0, sizeof(su)); 3993 su.sun_family = AF_UNIX; 3994 strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1); 3995 su.sun_path[sizeof(su.sun_path) - 1] = '\0'; 3996 3997 if (notify_socket[0] == '@') 3998 su.sun_path[0] = '\0'; 3999 4000 memset(&iov, 0, sizeof(iov)); 4001 iov.iov_base = "READY=1"; 4002 iov.iov_len = strlen("READY=1"); 4003 4004 memset(&hdr, 0, sizeof(hdr)); 4005 hdr.msg_name = &su; 4006 hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) + 4007 strlen(notify_socket); 4008 hdr.msg_iov = &iov; 4009 hdr.msg_iovlen = 1; 4010 4011 unsetenv("NOTIFY_SOCKET"); 4012 #ifdef HAVE_MSG_NOSIGNAL 4013 sendto_flags |= MSG_NOSIGNAL; 4014 #endif 4015 if (sendmsg(fd, &hdr, sendto_flags) < 0) { 4016 serverLog(LL_WARNING, "Can't send notification to systemd"); 4017 close(fd); 4018 return 0; 4019 } 4020 close(fd); 4021 return 
1; 4022 } 4023 4024 int redisIsSupervised(int mode) { 4025 if (mode == SUPERVISED_AUTODETECT) { 4026 const char *upstart_job = getenv("UPSTART_JOB"); 4027 const char *notify_socket = getenv("NOTIFY_SOCKET"); 4028 4029 if (upstart_job) { 4030 redisSupervisedUpstart(); 4031 } else if (notify_socket) { 4032 redisSupervisedSystemd(); 4033 } 4034 } else if (mode == SUPERVISED_UPSTART) { 4035 return redisSupervisedUpstart(); 4036 } else if (mode == SUPERVISED_SYSTEMD) { 4037 return redisSupervisedSystemd(); 4038 } 4039 4040 return 0; 4041 } 4042 4043 static int loop(void *arg) { 4044 aeMain((aeEventLoop *)arg); 4045 return 0; 4046 } 4047 4048 int main(int argc, char **argv) { 4049 struct timeval tv; 4050 int j; 4051 4052 #ifdef HAVE_FF_KQUEUE 4053 int rc = ff_init(argc, argv); 4054 assert(0 == rc); 4055 ff_mod_init(); 4056 int new_argc = argc - 4; 4057 if (new_argc <= 0) { 4058 new_argc = 1; 4059 } 4060 4061 char **new_argv = zmalloc(sizeof(char *) * new_argc); 4062 new_argv[0] = argv[0]; 4063 int i; 4064 for (i = 1; i < new_argc; i++) { 4065 new_argv[i] = argv[i + 4]; 4066 } 4067 argv = new_argv; 4068 argc = new_argc; 4069 #endif 4070 4071 4072 #ifdef REDIS_TEST 4073 if (argc == 3 && !strcasecmp(argv[1], "test")) { 4074 if (!strcasecmp(argv[2], "ziplist")) { 4075 return ziplistTest(argc, argv); 4076 } else if (!strcasecmp(argv[2], "quicklist")) { 4077 quicklistTest(argc, argv); 4078 } else if (!strcasecmp(argv[2], "intset")) { 4079 return intsetTest(argc, argv); 4080 } else if (!strcasecmp(argv[2], "zipmap")) { 4081 return zipmapTest(argc, argv); 4082 } else if (!strcasecmp(argv[2], "sha1test")) { 4083 return sha1Test(argc, argv); 4084 } else if (!strcasecmp(argv[2], "util")) { 4085 return utilTest(argc, argv); 4086 } else if (!strcasecmp(argv[2], "endianconv")) { 4087 return endianconvTest(argc, argv); 4088 } else if (!strcasecmp(argv[2], "crc64")) { 4089 return crc64Test(argc, argv); 4090 } else if (!strcasecmp(argv[2], "zmalloc")) { 4091 return zmalloc_test(argc, 
argv); 4092 } 4093 4094 return -1; /* test not found */ 4095 } 4096 #endif 4097 4098 /* We need to initialize our libraries, and the server configuration. */ 4099 #ifdef INIT_SETPROCTITLE_REPLACEMENT 4100 spt_init(argc, argv); 4101 #endif 4102 setlocale(LC_COLLATE,""); 4103 tzset(); /* Populates 'timezone' global. */ 4104 zmalloc_set_oom_handler(redisOutOfMemoryHandler); 4105 srand(time(NULL)^getpid()); 4106 gettimeofday(&tv,NULL); 4107 4108 char hashseed[16]; 4109 getRandomHexChars(hashseed,sizeof(hashseed)); 4110 dictSetHashFunctionSeed((uint8_t*)hashseed); 4111 server.sentinel_mode = checkForSentinelMode(argc,argv); 4112 initServerConfig(); 4113 moduleInitModulesSystem(); 4114 4115 /* Store the executable path and arguments in a safe place in order 4116 * to be able to restart the server later. */ 4117 server.executable = getAbsolutePath(argv[0]); 4118 server.exec_argv = zmalloc(sizeof(char*)*(argc+1)); 4119 server.exec_argv[argc] = NULL; 4120 for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]); 4121 4122 /* We need to init sentinel right now as parsing the configuration file 4123 * in sentinel mode will have the effect of populating the sentinel 4124 * data structures with master nodes to monitor. */ 4125 if (server.sentinel_mode) { 4126 initSentinelConfig(); 4127 initSentinel(); 4128 } 4129 4130 /* Check if we need to start in redis-check-rdb/aof mode. We just execute 4131 * the program main. However the program is part of the Redis executable 4132 * so that we can easily execute an RDB check on loading errors. 
*/ 4133 if (strstr(argv[0],"redis-check-rdb") != NULL) 4134 redis_check_rdb_main(argc,argv,NULL); 4135 else if (strstr(argv[0],"redis-check-aof") != NULL) 4136 redis_check_aof_main(argc,argv); 4137 4138 if (argc >= 2) { 4139 j = 1; /* First option to parse in argv[] */ 4140 sds options = sdsempty(); 4141 char *configfile = NULL; 4142 4143 /* Handle special options --help and --version */ 4144 if (strcmp(argv[1], "-v") == 0 || 4145 strcmp(argv[1], "--version") == 0) version(); 4146 if (strcmp(argv[1], "--help") == 0 || 4147 strcmp(argv[1], "-h") == 0) usage(); 4148 if (strcmp(argv[1], "--test-memory") == 0) { 4149 if (argc == 3) { 4150 memtest(atoi(argv[2]),50); 4151 exit(0); 4152 } else { 4153 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); 4154 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n"); 4155 exit(1); 4156 } 4157 } 4158 4159 /* First argument is the config file name? */ 4160 if (argv[j][0] != '-' || argv[j][1] != '-') { 4161 configfile = argv[j]; 4162 server.configfile = getAbsolutePath(configfile); 4163 /* Replace the config file in server.exec_argv with 4164 * its absolute path. */ 4165 zfree(server.exec_argv[j]); 4166 server.exec_argv[j] = zstrdup(server.configfile); 4167 j++; 4168 } 4169 4170 /* All the other options are parsed and conceptually appended to the 4171 * configuration file. For instance --port 6380 will generate the 4172 * string "port 6380\n" to be parsed after the actual file name 4173 * is parsed, if any. */ 4174 while(j != argc) { 4175 if (argv[j][0] == '-' && argv[j][1] == '-') { 4176 /* Option name */ 4177 if (!strcmp(argv[j], "--check-rdb")) { 4178 /* Argument has no options, need to skip for parsing. 
*/ 4179 j++; 4180 continue; 4181 } 4182 if (sdslen(options)) options = sdscat(options,"\n"); 4183 options = sdscat(options,argv[j]+2); 4184 options = sdscat(options," "); 4185 } else { 4186 /* Option argument */ 4187 options = sdscatrepr(options,argv[j],strlen(argv[j])); 4188 options = sdscat(options," "); 4189 } 4190 j++; 4191 } 4192 if (server.sentinel_mode && configfile && *configfile == '-') { 4193 serverLog(LL_WARNING, 4194 "Sentinel config from STDIN not allowed."); 4195 serverLog(LL_WARNING, 4196 "Sentinel needs config file on disk to save state. Exiting..."); 4197 exit(1); 4198 } 4199 resetServerSaveParams(); 4200 loadServerConfig(configfile,options); 4201 sdsfree(options); 4202 } 4203 4204 serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo"); 4205 serverLog(LL_WARNING, 4206 "Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started", 4207 REDIS_VERSION, 4208 (sizeof(long) == 8) ? 64 : 32, 4209 redisGitSHA1(), 4210 strtol(redisGitDirty(),NULL,10) > 0, 4211 (int)getpid()); 4212 4213 if (argc == 1) { 4214 serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); 4215 } else { 4216 serverLog(LL_WARNING, "Configuration loaded"); 4217 } 4218 4219 server.supervised = redisIsSupervised(server.supervised_mode); 4220 int background = server.daemonize && !server.supervised; 4221 if (background) daemonize(); 4222 4223 initServer(); 4224 if (background || server.pidfile) createPidFile(); 4225 redisSetProcTitle(argv[0]); 4226 redisAsciiArt(); 4227 checkTcpBacklogSettings(); 4228 4229 if (!server.sentinel_mode) { 4230 /* Things not needed when running in Sentinel mode. 
*/ 4231 serverLog(LL_WARNING,"Server initialized"); 4232 #ifdef __linux__ 4233 linuxMemoryWarnings(); 4234 #endif 4235 moduleLoadFromQueue(); 4236 loadDataFromDisk(); 4237 if (server.cluster_enabled) { 4238 if (verifyClusterConfigWithData() == C_ERR) { 4239 serverLog(LL_WARNING, 4240 "You can't have keys in a DB different than DB 0 when in " 4241 "Cluster mode. Exiting."); 4242 exit(1); 4243 } 4244 } 4245 if (server.ipfd_count > 0) 4246 serverLog(LL_NOTICE,"Ready to accept connections"); 4247 if (server.sofd > 0) 4248 serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); 4249 } else { 4250 sentinelIsRunning(); 4251 } 4252 4253 /* Warning the user about suspicious maxmemory setting. */ 4254 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { 4255 serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); 4256 } 4257 4258 aeSetBeforeSleepProc(server.el,beforeSleep); 4259 aeSetAfterSleepProc(server.el,afterSleep); 4260 ff_run(loop, server.el); 4261 aeDeleteEventLoop(server.el); 4262 #ifdef HAVE_FF_KQUEUE 4263 zfree(new_argv); 4264 #endif 4265 return 0; 4266 } 4267 4268 /* The End */ 4269